In [2]:
sc

### DataFrame 기본 개념

- Row 타입의 레코드와 각 레코드에 수행할 연산 표현식을 나타내는 여러 컬럼으로 구성된다
- 스키마는 각 컬럼명과 데이터 타입을 정의

In [2]:
# df 생성
df = spark.read.format("json").load("./structured-data/flight-data/json/2015-summary.json")

In [6]:
# 10개만  보기
df.show(10)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 10 rows



### 스키마

- 여러 개의 StructField 타입 필드로 구성된 SturctType 객체
- StructField는 이름 , 데이터타입, 컬럼이 값이 없거난 null일 수 있는지 지정하는 불리언값을 가진다.  (필요한 경우 컬럼과 관련된 메타데이터를 지정할 수 있다)
- 스키마는 복합 데이터 타입인 StructType를 가질 수 있다.
- 스파크는 런타입에 데이터 타입이 스키마의 데이터 타입과 일치하지 않으면 오류 발생
- 스파크는 자체 데이터 타입 정보를 사용 → 프로그래밍 언어의 데이터 타입을 스파크의 데이터 타입으로 설정 불가능하다.

In [7]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



- 스키마를 직접 만들어보고 적용시켜보자

In [24]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

# (이름, 데이터타입, null?, (null=false 시 metadata 지정 가능))
myManualSchema = StructType([
    StructField('DEST_COUNTRY_NAME', StringType(), True),
    StructField('ORIGIN_COUNTRY_NAME', StringType(), True),
    StructField('count', LongType(), False, metadata={"hello":"world"})
])

# 스키마를 기반으로 df 생성
df = spark.read.format('json').schema(myManualSchema)\
     .load("./structured-data/flight-data/json/2015-summary.json")

In [22]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



### 컬럼과 표현식
- 스파크의 컬럼은 우리가 흔히 하는 그 컬럼과 유사하다
- 사용자는 표현식으로 DataFrame의 컬럼을 선택, 조작, 제거할 수 있다
- 컬럼 : 표현식을 사용해 레코드 단위로 계산한 값을 단순하게 나타내는 논리적인 구조
- DataFrame을 이용해서 접근해야한다. (외부 접근 , 컬럼 내용 수정)


In [25]:
df.count

<bound method DataFrame.count of DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]>

In [26]:
df.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

### 로우 (레코드)

In [27]:
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

In [28]:
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)

# DataFrame의 트랜스포메이션


DataFrame 생성하기

In [35]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
  StructField("some", StringType(), True),
  StructField("col", StringType(), True),
  StructField("names", LongType(), False)
])
myRow1 = Row("Hello", None, 1)
myRow2 = Row("Bye", None,4)
rowList = [myRow1,myRow2]
myDf = spark.createDataFrame(rowList, myManualSchema)
myDf.show()

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
|  Bye|null|    4|
+-----+----+-----+



### select , selectExpr
- 두 메서드를 사용하면 데이터 테이블에 SQL을 실행하는 것처럼 DataFrame에서도 SQL을 사용할 수 있다.
- select Expr는 select와 Expr 기능을 합친 메서드이고 , 새로운 DataFrame을 생성하는 복잡한 표현식을 간단하게 만드는 도구이다.

In [38]:
df.select('DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME').show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [39]:
from pyspark.sql.functions import expr

df.select(expr('DEST_COUNTRY_NAME AS destination')).show(2)
df.select(expr('DEST_COUNTRY_NAME AS destination').alias('DEST_COUNTRY_NAME')).show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [40]:
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)


# COMMAND ----------

df.selectExpr(
  "*", # all original columns
  "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
  .show(2)


# COMMAND ----------

df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

+-------------+-----------------+
|newColumnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



### literal

In [41]:
from pyspark.sql.functions import lit
df.select(expr("*"), lit(1).alias("One")).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



### 컬럼 관련 메서드
- withColumn 메서드를 사용한다
- withColumnRenamed 메서드를 이요하여 컬럼명 변경 가능하다
- drop 메서드를 통해 특정 컬럼을 제거할 수 있다.
- withColumn 메서드에 cast 메서드를 넣어서 다른 데이터 타입으로 형변환할 수 있다.

In [46]:
df.withColumn("numberOne", lit(1)).show(2)


# COMMAND ----------

df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
  .show(2)


# COMMAND ----------

df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns



+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



['dest', 'ORIGIN_COUNTRY_NAME', 'count']

In [49]:
from pyspark.sql.functions import expr, col, column
# 컬럼 형변환
df.withColumn("count2",col("count").cast("string")).show(2)

+-----------------+-------------------+-----+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|count2|
+-----------------+-------------------+-----+------+
|    United States|            Romania|   15|    15|
|    United States|            Croatia|    1|     1|
+-----------------+-------------------+-----+------+
only showing top 2 rows



In [50]:
# 컬럼 제거
df.drop('ORIGIN_COUNTRY_NAME').columns

['DEST_COUNTRY_NAME', 'count']

### 로우 필터링하기
- where 메서드나 filter 메서드로 필터링 할 수 있다. (같은 기능 수행)
- 필터 메서드를 연결시켜 여러 필터를 적용할 수 있다.

In [52]:
df.where(col("count") < 2).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [55]:
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [56]:
# 고유한 로우 얻기
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().show()


+-------------------+--------------------+
|ORIGIN_COUNTRY_NAME|   DEST_COUNTRY_NAME|
+-------------------+--------------------+
|            Romania|       United States|
|            Croatia|       United States|
|            Ireland|       United States|
|      United States|               Egypt|
|              India|       United States|
|          Singapore|       United States|
|            Grenada|       United States|
|      United States|          Costa Rica|
|      United States|             Senegal|
|      United States|             Moldova|
|       Sint Maarten|       United States|
|   Marshall Islands|       United States|
|      United States|              Guyana|
|      United States|               Malta|
|      United States|            Anguilla|
|      United States|             Bolivia|
|           Paraguay|       United States|
|      United States|             Algeria|
|      United States|Turks and Caicos ...|
|          Gibraltar|       United States|
+----------

### 로우 합치기와 추가하기
- DataFrame은 불변성을 가진다. → 레코드를 추가하려면 union 해야 한다.
- union 조건 : 두개의 DataFrame은 반드시 동리한 스키마와 컬럼 수를 가져야 한다.
- union은 순서를 보장하지 않는다.

In [58]:
# 새로운 DataFrame 만들기
from pyspark.sql import Row
schema = df.schema
newRows = [
  Row("New Country", "Other Country", 5 ),
  Row("New Country 2", "Other Country 3", 1 )
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)

In [59]:
newDF.show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|      New Country|      Other Country|    5|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



In [60]:
# union 메서드로 합치고 필터링하기
df.union(newDF)\
  .where("count = 1")\
  .where(col("ORIGIN_COUNTRY_NAME") != "United States")\
  .show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



In [61]:
# 로우 정렬하기 
from pyspark.sql.functions import desc, asc
df.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows



In [62]:
# 로우 수 제한하기
df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



### 집계함수

In [1]:
# 파티션의 수를 줄이고 빠르게 접근할 수 있도록 캐싱했다. 
df = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("./structured-data/retail-data/all/*.csv")\
  .coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")

In [3]:
# 레코드 수 확인
from pyspark.sql.functions import count
df.select(count('StockCode')).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [4]:
# 정환한 고유 개수
from pyspark.sql.functions import countDistinct
df.select(countDistinct('StockCode')).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [5]:
# 근사치 카운트 (최대 추정 요류율을 파라미터로 가진다)
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



In [6]:
# first와 last
# 첫 열과 마지막 열
from pyspark.sql.functions import first, last
df.select(first('StockCode'), last('StockCode')).show()

+----------------+---------------+
|first(StockCode)|last(StockCode)|
+----------------+---------------+
|          85123A|          22138|
+----------------+---------------+



In [7]:
# sumDistinct: 고유값의 합

from pyspark.sql.functions import min, max, sum, avg, sumDistinct

df.select(min("Quantity"), max("Quantity"), sum("Quantity"), avg("Quantity"), sumDistinct("Quantity")).show()

+-------------+-------------+-------------+----------------+----------------------+
|min(Quantity)|max(Quantity)|sum(Quantity)|   avg(Quantity)|sum(DISTINCT Quantity)|
+-------------+-------------+-------------+----------------+----------------------+
|       -80995|        80995|      5176450|9.55224954743324|                 29310|
+-------------+-------------+-------------+----------------+----------------------+



### 그룹화
- 그룹화 작업은 하나 이상의 컬럼을 그룹화(RelationalGroupedDataSet 반환)하고 집계 연산을 수행(DataFrame 반환) 하는 두 단계로 이뤄진다.

In [8]:
from pyspark.sql.functions import count ,expr

df.groupBy("InvoiceNo").agg(
    count("Quantity").alias("quan"),
    expr("count(Quantity)")).show()

+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   536596|   6|              6|
|   536938|  14|             14|
|   537252|   1|              1|
|   537691|  20|             20|
|   538041|   1|              1|
|   538184|  26|             26|
|   538517|  53|             53|
|   538879|  19|             19|
|   539275|   6|              6|
|   539630|  12|             12|
|   540499|  24|             24|
|   540540|  22|             22|
|  C540850|   1|              1|
|   540976|  48|             48|
|   541432|   4|              4|
|   541518| 101|            101|
|   541783|  35|             35|
|   542026|   9|              9|
|   542375|   6|              6|
|  C542604|   8|              8|
+---------+----+---------------+
only showing top 20 rows



### join

In [10]:
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])])\
  .toDF("id", "name", "graduate_program", "spark_status")

graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley")])\
  .toDF("id", "degree", "department", "school")

sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")])\
  .toDF("id", "status")

In [11]:
joinExpression = person['graduate_program'] == graduateProgram['id']
# 어떤 join을 할 지 지정해야 함. inner은 기본값이라 생략 가능
joinType = "inner" # outer, left_outer, right_outer, left_semi, left_anti, cross
person.join(graduateProgram, joinExpression, joinType).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



# 과제 
### 이론 정리 하나와 간단한 dataFrame 문제 2개입니다. (3문제)
### 총 캡쳐 3장해서 드라이브에 올려주세요 


### 이론 정리 문제
- spark에서의 join 수행방식인 셔플 조인과 브로드케스트 조인의 특징 대해 각각 3줄 이하로 정리해주세요!
- 캡쳐해주세요!!

### 실습 문제 1번
- (structured-data/retail-data/by-day/2011-12-01.csv 를 읽어옵니다)
- 주어진 함수들을 모두 사용해서 자유롭게 원하는 dataFrame을 출력하는 것입니다. (실습 코드와 겹치지 않게 해주세요!!)

-> groupBy , orderBy , expr , filter (4개를 모두 사용한 DataFrame)

In [14]:
hw_df = spark.read.format('csv').option("header","true").load("./structured-data/retail-data/by-day/2011-12-01.csv")

In [15]:
hw_df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|  C579889|    23245|SET OF 3 REGENCY ...|      -8|2011-12-01 08:12:00|     4.15|   13853.0|United Kingdom|
|  C579890|    84947|ANTIQUE SILVER TE...|      -1|2011-12-01 08:14:00|     1.25|   15197.0|United Kingdom|
|  C579890|    23374|RED SPOT PAPER GI...|      -1|2011-12-01 08:14:00|     0.82|   15197.0|United Kingdom|
|  C579890|    84945|MULTI COLOUR SILV...|      -2|2011-12-01 08:14:00|     0.85|   15197.0|United Kingdom|
|  C579891|    23485|BOTANICAL GARDENS...|      -1|2011-12-01 08:18:00|     25.0|   13644.0|United Kingdom|
|  C579891|    23186|FRENCH STYLE STOR...|      -6|2011-12-01 08:18:00|     0.29|   13644.0|United Kingdom|
|  C579892|    23461|SWEETHE

### 실습 문제 2번
- 위에서 사용한 Join함수의 예제 (person, graduateProgram, sparkStatus)를 이용합니다
- inner join을 제외한 서로 다른 join을 적용한 dataFrame을 출력하는 것입니다. (실습 코드와 겹치지 않게 해주세요!!)