Users can select, manipulate, and remove DataFrame columns with **expressions**. In Spark, a column's contents can be accessed only through DataFrame transformations.

# 1. Columns
---

The simplest way to create a column is with the `col` or `column` function.

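In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, column

# col()/column() need an active SparkSession (and SparkContext), so create it first
spark = SparkSession.builder.getOrCreate()
col("someColumnName")
column("someColumnName")

Without the `spark = ...` line, this cell failed with `AssertionError:`. The most likely cause is that `col` and `column` ask the active SparkContext for the JVM-side function and assert that such a context exists. At this point in the notebook no SparkSession had been created yet, because that only happens in section 4. Creating the session first, as above, avoids the error.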

# 2. Expressions
---

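An expression is a set of transformations on one or more values in a record of a DataFrame. For example, `expr("count * 2")` describes doubling each record's `count` value.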
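The sketch below is a plain-Python model, not Spark code. The `Expr` class and the `column_ref`/`literal` helpers are hypothetical stand-ins. It shows the key idea: an expression such as `(((someCol + 5) * 200) - 6) < otherCol` is first built as a tree of operations and only evaluated later, record by record. Spark's `Column` objects are similarly unevaluated descriptions, although their internals are quite different.

```python
import operator
from typing import Any, Callable, Dict

Record = Dict[str, Any]


class Expr:
    """Hypothetical expression node: wraps a function from a record to a value."""

    def __init__(self, fn: Callable[[Record], Any]) -> None:
        self.fn = fn

    def evaluate(self, record: Record) -> Any:
        return self.fn(record)

    def _binary(self, other: Any, op: Callable[[Any, Any], Any]) -> "Expr":
        # Operators build a new, larger expression instead of computing a value now.
        rhs = other if isinstance(other, Expr) else literal(other)
        return Expr(lambda r: op(self.evaluate(r), rhs.evaluate(r)))

    def __add__(self, other: Any) -> "Expr":
        return self._binary(other, operator.add)

    def __sub__(self, other: Any) -> "Expr":
        return self._binary(other, operator.sub)

    def __mul__(self, other: Any) -> "Expr":
        return self._binary(other, operator.mul)

    def __lt__(self, other: Any) -> "Expr":
        return self._binary(other, operator.lt)


def column_ref(name: str) -> Expr:
    """Column reference: looks the value up in each record."""
    return Expr(lambda r: r[name])


def literal(value: Any) -> Expr:
    """Literal: the same value for every record."""
    return Expr(lambda r: value)


# Building the expression touches no data yet.
condition = (((column_ref("someCol") + 5) * 200) - 6) < column_ref("otherCol")

print(condition.evaluate({"someCol": 1, "otherCol": 2000}))   # True  (1194 < 2000)
print(condition.evaluate({"someCol": 10, "otherCol": 2000}))  # False (2994 is not < 2000)
```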

# 3. Records and Rows
---


In [4]:
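from pyspark.sql import Row
# A Row is an ordered collection of values; fields are accessed by position, e.g. myRow[0] == "hello"
myRow = Row("hello", None, 1, False)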

# 4. DataFrame Transformations
---

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .appName("session4")\
    .config("spark.some.config.option","some-value")\
    .getOrCreate()
df = spark.read.format("json").load("../data/flight-data/json/2015-summary.json")
df.createOrReplaceTempView("dfTable")

In [4]:
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [6]:
from pyspark.sql import Row
from pyspark.sql.types import StructField,StructType,StringType,LongType

myManualSchema = StructType([
    StructField("some",StringType(),True),
    StructField("col",StringType(),True),
    StructField("names",LongType(),False)
])
myRow = Row("hello",None,1)
myDf = spark.createDataFrame([myRow],myManualSchema)
myDf.show()

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|hello|null|    1|
+-----+----+-----+
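The schema above pairs each positional value of `myRow` with a `StructField`. The plain-Python sketch below models that pairing; it is not PySpark code. `to_record` and the `(name, type, nullable)` tuples are hypothetical stand-ins for `StructType`/`StructField`. The sketch also models the nullability check: `names` is declared non-nullable, so a `None` there is rejected. PySpark's `createDataFrame` performs a comparable check when schema verification (`verifySchema`, on by default) is enabled.

```python
from typing import Any, Dict, List, Sequence, Tuple

# Hypothetical stand-in for StructType: (field name, Python type, nullable)
Schema = List[Tuple[str, type, bool]]

my_manual_schema: Schema = [
    ("some", str, True),
    ("col", str, True),
    ("names", int, False),  # like LongType() with nullable=False
]


def to_record(values: Sequence[Any], schema: Schema) -> Dict[str, Any]:
    """Pair positional row values with schema fields, checking type and nullability."""
    if len(values) != len(schema):
        raise ValueError(f"expected {len(schema)} values, got {len(values)}")
    record: Dict[str, Any] = {}
    for value, (name, typ, nullable) in zip(values, schema):
        if value is None:
            if not nullable:
                raise ValueError(f"field {name!r} is not nullable but got None")
        elif not isinstance(value, typ):
            raise TypeError(f"field {name!r} expects {typ.__name__}, got {type(value).__name__}")
        record[name] = value
    return record


print(to_record(("hello", None, 1), my_manual_schema))
# {'some': 'hello', 'col': None, 'names': 1}
```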



In [7]:
# 4.2 select and selectExpr

df.select("DEST_COUNTRY_NAME").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [8]:
from pyspark.sql.functions import expr,col,column

df.select(expr("DEST_COUNTRY_NAME"),col("DEST_COUNTRY_NAME"),column("DEST_COUNTRY_NAME")).show(2)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows



In [9]:
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [19]:
# alias() overrides the name that AS assigned inside expr()
df.select(expr("DEST_COUNTRY_NAME AS destination").alias("DEST_COUNTRY_NAME_new")).show(2)

+---------------------+
|DEST_COUNTRY_NAME_new|
+---------------------+
|        United States|
|        United States|
+---------------------+
only showing top 2 rows



In [18]:
# selectExpr = select + expr: every argument is parsed as a SQL expression string
df.selectExpr("DEST_COUNTRY_NAME as destination","DEST_COUNTRY_NAME").show(2) # several expressions can be selected in one call

+-------------+-----------------+
|  destination|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



In [20]:
# Comparison (boolean) expressions work too
df.selectExpr(
    "*",
    "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry"
).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [23]:
# Aggregate functions work too (the result is a single row)
df.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME))").show(2)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+
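As a sanity check on what `avg(count)` and `count(DISTINCT DEST_COUNTRY_NAME)` compute, here are the same two aggregates in plain Python. The sketch uses only the first five rows printed by `df.show()` earlier, so its numbers naturally differ from 1770.77 and 132. Like SQL aggregates, it skips null values.

```python
# First five (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count) rows from the df.show() output
rows = [
    ("United States", "Romania", 15),
    ("United States", "Croatia", 1),
    ("United States", "Ireland", 344),
    ("Egypt", "United States", 15),
    ("United States", "India", 62),
]

# avg(count): mean of the non-null values
counts = [count for _, _, count in rows if count is not None]
avg_count = sum(counts) / len(counts)

# count(DISTINCT DEST_COUNTRY_NAME): number of different non-null destinations
distinct_dest = len({dest for dest, _, _ in rows if dest is not None})

print(avg_count, distinct_dest)  # 87.4 2
```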



In [26]:
# lit() wraps a literal value as a column, giving the same value on every row
from pyspark.sql.functions import lit
df.select(expr("*"),lit(1).alias("One")).show(5)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
|    United States|            Ireland|  344|  1|
|            Egypt|      United States|   15|  1|
|    United States|              India|   62|  1|
+-----------------+-------------------+-----+---+
only showing top 5 rows



In [27]:
# withColumn(name, expression): the standard way to add a column

df.withColumn("numberOne",lit(1)).show(2)


+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [29]:
# The new column can also be defined by an expression
df.withColumn("withinCountry",expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(5)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
|    United States|            Ireland|  344|        false|
|            Egypt|      United States|   15|        false|
|    United States|              India|   62|        false|
+-----------------+-------------------+-----+-------------+
only showing top 5 rows



In [30]:
# Renaming a column: withColumnRenamed(existing, new)
df.withColumnRenamed("DEST_COUNTRY_NAME","destination").columns

['destination', 'ORIGIN_COUNTRY_NAME', 'count']

In [35]:
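# If a column name contains reserved characters such as spaces or hyphens ("-"), always wrap it
# in backticks (`) when referring to it inside an expression string.
# (The plain name passed as withColumn's first argument needs no backticks.)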
df_with_long_col_name = df.withColumn("This Long Column-Name",expr("ORIGIN_COUNTRY_NAME"))
df_with_long_col_name.show(5)

+-----------------+-------------------+-----+---------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|This Long Column-Name|
+-----------------+-------------------+-----+---------------------+
|    United States|            Romania|   15|              Romania|
|    United States|            Croatia|    1|              Croatia|
|    United States|            Ireland|  344|              Ireland|
|            Egypt|      United States|   15|        United States|
|    United States|              India|   62|                India|
+-----------------+-------------------+-----+---------------------+
only showing top 5 rows



In [38]:
df_with_long_col_name.selectExpr("`This Long Column-Name`").show(5)

+---------------------+
|This Long Column-Name|
+---------------------+
|              Romania|
|              Croatia|
|              Ireland|
|        United States|
|                India|
+---------------------+
only showing top 5 rows
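When column names are generated or come from user input, it can help to quote them programmatically before putting them into a `selectExpr` or `expr` string. The sketch below assumes a hypothetical helper, `quote_identifier`. It follows the Spark SQL rule that a backtick which is part of the name is escaped by doubling it.

```python
def quote_identifier(name: str) -> str:
    """Wrap a column name in backticks for use inside a Spark SQL expression string.

    A backtick that is part of the name is escaped by doubling it.
    """
    return "`" + name.replace("`", "``") + "`"


quoted = quote_identifier("This Long Column-Name")
print(quoted)  # `This Long Column-Name`
# Hypothetical usage in the notebook:
# df_with_long_col_name.selectExpr(f"{quoted} as longName").show(5)
```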



In [40]:
# 4.7 Spark is case-insensitive by default (spark.sql.caseSensitive is false)

# 4.8 Removing columns with drop()
df_with_long_col_name.drop("ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME").show(5)

+-----+---------------------+
|count|This Long Column-Name|
+-----+---------------------+
|   15|              Romania|
|    1|              Croatia|
|  344|              Ireland|
|   15|        United States|
|   62|                India|
+-----+---------------------+
only showing top 5 rows
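Case-insensitive resolution is the default (`spark.sql.caseSensitive=false`). It means that `df.select("dest_country_name")` finds `DEST_COUNTRY_NAME`. The hypothetical `resolve_column` function below is a plain-Python model of that lookup, not Spark's analyzer. In this model, two columns whose names differ only by case make a reference ambiguous.

```python
from typing import List


def resolve_column(requested: str, columns: List[str], case_sensitive: bool = False) -> str:
    """Return the actual column name that `requested` refers to."""
    if case_sensitive:
        matches = [c for c in columns if c == requested]
    else:
        matches = [c for c in columns if c.lower() == requested.lower()]
    if not matches:
        raise KeyError(f"cannot resolve column {requested!r}")
    if len(matches) > 1:
        raise KeyError(f"ambiguous reference {requested!r}: {matches}")
    return matches[0]


cols = ["DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count"]
print(resolve_column("dest_country_name", cols))  # DEST_COUNTRY_NAME
```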



In [45]:
# 4.9 Changing a column's data type with cast()
df.withColumn("count2",col("count").cast("string")).show(5)


+-----------------+-------------------+-----+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|count2|
+-----------------+-------------------+-----+------+
|    United States|            Romania|   15|    15|
|    United States|            Croatia|    1|     1|
|    United States|            Ireland|  344|   344|
|            Egypt|      United States|   15|    15|
|    United States|              India|   62|    62|
+-----------------+-------------------+-----+------+
only showing top 5 rows



In [46]:
# 4.10 Filtering rows: filter() and where() are equivalent

df.filter(col("count")<2).show(2)
df.where("count < 2").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [50]:
# Chaining several filters: the conditions are combined with AND

df.where("count < 2").where(col("ORIGIN_COUNTRY_NAME") == "Croatia").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
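Chaining `where` calls keeps only the rows that satisfy every condition, so the conditions are combined with AND. In PySpark, the single-call equivalent is `df.where((col("count") < 2) & (col("ORIGIN_COUNTRY_NAME") == "Croatia"))`, and Spark's optimizer typically merges consecutive filters anyway. The plain-Python model below uses a hypothetical `filter_rows` helper and rows taken from earlier output. It checks that the chained and combined forms give the same result.

```python
from typing import Callable, List, Tuple

FlightRow = Tuple[str, str, int]  # (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count)

rows: List[FlightRow] = [
    ("United States", "Romania", 15),
    ("United States", "Croatia", 1),
    ("United States", "Singapore", 1),
    ("Egypt", "United States", 15),
]


def filter_rows(rows: List[FlightRow], predicate: Callable[[FlightRow], bool]) -> List[FlightRow]:
    """Keep only the rows for which predicate(row) is true, like DataFrame.where/filter."""
    return [row for row in rows if predicate(row)]


def count_lt_2(row: FlightRow) -> bool:
    return row[2] < 2


def from_croatia(row: FlightRow) -> bool:
    return row[1] == "Croatia"


chained = filter_rows(filter_rows(rows, count_lt_2), from_croatia)
combined = filter_rows(rows, lambda row: count_lt_2(row) and from_croatia(row))
print(chained == combined, chained)  # True [('United States', 'Croatia', 1)]
```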



In [51]:
# 4.11 Getting unique rows with distinct()

df.select("ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME").distinct().count()

256
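Here `distinct()` runs after `select`, so uniqueness is judged only on the (`ORIGIN_COUNTRY_NAME`, `DEST_COUNTRY_NAME`) pair. Rows that differ only in `count` collapse into one. The plain-Python model below uses toy rows; the last row is a deliberately added duplicate pair and does not come from the dataset.

```python
# Toy (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count) rows; the last one deliberately
# repeats the first pair with a different count
rows = [
    ("United States", "Romania", 15),
    ("United States", "Croatia", 1),
    ("Egypt", "United States", 15),
    ("United States", "Romania", 7),
]

# select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME"): keep only those two columns
projected = [(origin, dest) for dest, origin, _ in rows]

# distinct().count(): number of different rows left after the projection
distinct_count = len(set(projected))
print(len(projected), distinct_count)  # 4 3
```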

In [52]:
# 4.12 Random samples: sample(withReplacement, fraction, seed)

seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement,fraction,seed).count()

138
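With `withReplacement=False`, `sample` keeps each row independently with probability `fraction` (Bernoulli sampling). The result size is therefore only approximately `fraction` times the row count, and need not equal exactly half of the rows here. The plain-Python model below, a hypothetical `bernoulli_sample`, illustrates this. Its random number generator differs from Spark's, so it will not reproduce 138.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def bernoulli_sample(rows: Sequence[T], fraction: float, seed: int) -> List[T]:
    """Keep each row independently with probability `fraction` (no replacement)."""
    rng = random.Random(seed)  # seeded, so the same call always returns the same sample
    return [row for row in rows if rng.random() < fraction]


rows = list(range(1000))  # toy data: 1000 row ids
sample = bernoulli_sample(rows, fraction=0.5, seed=5)
print(len(sample))  # close to 500, but generally not exactly 500
```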

In [57]:
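# 4.13 Random splits: randomSplit returns a list of DataFrames (weights are normalized if they do not sum to 1)
splits = df.randomSplit([0.25,0.75],seed)
splits[0].count() > splits[1].count()  # the 25% split is expected to be smaller, hence False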

False
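`randomSplit` assigns every row to exactly one of the resulting DataFrames, with probabilities proportional to the weights. Weights that do not sum to 1 are normalized. With weights `[0.25, 0.75]`, the first split is expected to be about a third the size of the second, hence `False`.

The plain-Python model below illustrates the idea with a hypothetical `random_split`. It draws one uniform number per row and compares it against cumulative weight bounds.

```python
import random
from bisect import bisect_right
from itertools import accumulate
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def random_split(rows: Sequence[T], weights: Sequence[float], seed: int) -> List[List[T]]:
    """Assign each row to exactly one split, with probability proportional to its weight."""
    total = sum(weights)
    # Normalized cumulative upper bounds, e.g. [0.25, 0.75] -> [0.25, 1.0]
    bounds = list(accumulate(w / total for w in weights))
    rng = random.Random(seed)
    splits: List[List[T]] = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        idx = min(bisect_right(bounds, u), len(weights) - 1)  # clamp guards float rounding
        splits[idx].append(row)
    return splits


rows = list(range(1000))  # toy data: 1000 row ids
first, second = random_split(rows, [0.25, 0.75], seed=5)
print(len(first) > len(second))  # False: expected sizes are about 250 and 750
```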

In [69]:
# 4.14 Concatenating and appending rows with union
# union matches columns by position, so the new rows must follow df's column order (hence df.schema)
from pyspark.sql import Row
schema = df.schema
newRows = [
    Row("new 1","other 1",5),
    Row("new 2","other 2",2),
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows,schema)

df.union(newDF)\
.where("count = 1")\
.where(col("ORIGIN_COUNTRY_NAME") != "United States")\
.show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
+-----------------+-------------------+-----+
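`union` behaves like SQL `UNION ALL`: it keeps duplicates and matches columns by position, not by name. That is why `newDF` is built with `df.schema`, so that its values line up with `df`'s column order. When two DataFrames have the same columns in a different order, `unionByName` matches them by name instead. The plain-Python model below shows the difference, using hypothetical `(column names, rows)` tuples in place of DataFrames.

```python
from typing import List, Tuple

Table = Tuple[List[str], List[tuple]]  # (column names, rows): a toy stand-in for a DataFrame


def union(left: Table, right: Table) -> Table:
    """Like DataFrame.union: UNION ALL, matching columns by position."""
    (left_cols, left_rows), (right_cols, right_rows) = left, right
    if len(left_cols) != len(right_cols):
        raise ValueError("union needs the same number of columns")
    return left_cols, left_rows + right_rows  # right's column names are ignored


def union_by_name(left: Table, right: Table) -> Table:
    """Like DataFrame.unionByName: UNION ALL, matching columns by name."""
    (left_cols, left_rows), (right_cols, right_rows) = left, right
    if sorted(left_cols) != sorted(right_cols):
        raise ValueError("unionByName needs the same column names")
    order = [right_cols.index(c) for c in left_cols]
    reordered = [tuple(row[i] for i in order) for row in right_rows]
    return left_cols, left_rows + reordered


df_t = (["DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count"],
        [("United States", "Croatia", 1)])
# Same columns, but ORIGIN and DEST appear in the opposite order
new_t = (["ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME", "count"],
         [("other 1", "new 1", 5)])

print(union(df_t, new_t)[1][-1])          # ('other 1', 'new 1', 5): values land under the wrong names
print(union_by_name(df_t, new_t)[1][-1])  # ('new 1', 'other 1', 5)
```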



In [70]:
# 4.15 Sorting rows

df.orderBy(col("count").desc(),col("DEST_COUNTRY_NAME").asc()).show(2)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows
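Sorting by several keys works like a tuple sort key: `count` descending first, then `DEST_COUNTRY_NAME` ascending to break ties. The plain-Python model below uses rows from the earlier output. Negating `count` to get descending order only works for numeric keys.

```python
rows = [
    ("United States", "Romania", 15),
    ("Egypt", "United States", 15),
    ("United States", "Croatia", 1),
    ("Costa Rica", "United States", 588),
]

# orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()):
# sort by count descending, then by destination ascending to break ties
ordered = sorted(rows, key=lambda r: (-r[2], r[0]))
for row in ordered:
    print(row)
```

The model ignores nulls. In Spark, `asc()` places nulls first and `desc()` places them last by default; `asc_nulls_last()` and `desc_nulls_first()` change that.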

