# 2교시 기본 연산자 다루기

> 스파크의 "기본 연산자" 와 "데이터프레임"에 대해 학습합니다

## 목차
* [1. 기본 연산자](#1.-기본-연산자)
* [2. RDD 의 특징과 데이터 변환의 종류](#3.-RDD-의-특징과-데이터-변환의-종류)
* [3. 데이터 타입](#4.-데이터-타입)
* [4. 핵심 데이터 프레임 연산자](#2.-핵심-데이터-프레임-연산자)
* [5. 기타 데이터 프레임 연산자](#5.-기타-데이터-프레임-연산자)
* [6. 데이터셋 API](#6.-데이터셋-API)
* [참고자료](#참고자료)

In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from IPython.display import display, display_pretty, clear_output, JSON

spark = (
    SparkSession
    .builder
    .config("spark.sql.session.timeZone", "Asia/Seoul")
    .getOrCreate()
)

# 노트북에서 테이블 형태로 데이터 프레임 출력을 위한 설정을 합니다
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # display enabled
spark.conf.set("spark.sql.repl.eagerEval.truncate", 100) # display output columns size

# 공통 데이터 위치
home_jovyan = "/home/jovyan"
work_data = f"{home_jovyan}/work/data"
work_dir=!pwd
work_dir = work_dir[0]

# 로컬 환경 최적화
spark.conf.set("spark.sql.shuffle.partitions", 5) # the number of partitions to use when shuffling data for joins or aggregations.
spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
spark

22/04/14 01:43:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## 1. 기본 연산자
### A. RDD
* RDD(Rsilient Distributed Dataset): 불변성을 가지며 병렬로 처리할 수 있는 파티셔닝된 레코드의 모음
  - RDD: 불변성, 한번 생성하면 변경할 수 없음
  - Transformation: 원하는 변경 방법을 알려주는 명령. 추상적인 변경 방법이며, 실제 수행X
  - Lazy Evaluation(지연 연산): 특정 연산 명령을 하면 즉시 데이터를 수정하지 않고, 원시 데이터에 적용할 트랜스포메이션 실행계획 생성하고, 실제로 연산 그래프를 처리하기 직전까지 기다림

### B. Structured API

+ <strong>DataFrame</strong> 구성요소: 레코드, 컬럼
    + 레코드: Row타입(Table의 로우)
    + 컬럼: 레코드에 수행할 연산 표현식(테이블의 컬럼)
+ <strong>스키마</strong>: 각 컬럼명과 데이터 타입을 정의
+ <strong>파티셔닝</strong>: DataFrame이나 Dataset이 클러스터에서 물리적으로 배치되는 형태를 정의
+ <strong>파티셔닝 스키마</strong>: 파티션을 구성하는 방법을 정의 (테이블 Root Directory 의 Sub Directories 구성을 말합니다)

---
### 1.1 데이터 프레임 함수
| 함수 | 설명 | 기타 |
| - | - | - |
| df.printSchema() | 스키마 정보를 출력합니다. | - |
| df.schema | StructType 스키마를 반환합니다 | - |
| df.columns | 컬럼명 정보를 반환합니다 | - |
| df.show(n) | 데이터 n 개를 출력합니다 | - |
| df.first() | 데이터 프레임의 첫 번째 Row 를 반환합니다 | - |
| df.head(n) | 데이터 프레임의 처음부터 n 개의 Row 를 반환합니다 | - |
| df.createOrReplaceTempView | 임시 뷰 테이블을 생성합니다 | - |
| df.union(newdf) | 데이터프레임 간의 유니온 연산을 수행합니다 | - |
| df.limit(n) | 추출할 로우수 제한 | T |
| df.repartition(n) | 파티션 재분배, 셔플발생 | - |
| df.coalesce() | 셔플하지 않고 파티션을 병합 | 마지막 스테이지의 reduce 수가 줄어드는 효과로 성능저하에 유의해야 합니다 |
| df.collect() | 모든 데이터 수집, 반환 | A |
| df.take(n) | 상위 n개 로우 반환 | A |


### 1.2 컬럼 함수

#### 컬럼 이란?
> <strong>컬럼: 표현식을 사용해 레코드 단위로 계산한 값을 단순하게 나타내는 논리적인 구조</strong> (테이블의 컬럼으로 생각할 수 있음)
* 컬럼의 실제값을 얻으려면 로우가 필요하고, 로우를 얻으려면 DataFrame이 필요 (로우는 데이터 레코드)
  - DataFrame을 통하지 않으면 외부에서 컬럼에 접근 불가
  - DataFrame -> Row -> Column
* 컬럼의 특징
  - 컬럼의 내용을 수정하려면 반드시 DataFrame의 스파크 트랜포메이션을 사용
  - col, column 함수를 사용하는 것이 가장 간편
  - 컬럼은 컬럼명을 카탈로그에 저장된 정보와 비교하기 전까지 미확인 상태
  - 분석기가 동작하는 단계에서 컬럼과 테이블을 분석

| 함수 | 설명 | 기타 |
| - | - | - |
| df.select | 컬럼이나 표현식 사용  | - |
| df.selectExpr | 문자열 표현식 사용 = df.select(expr()) | - |
| df.withColumn(컬럼명, 표현식) | 컬럼 추가, 비교, 컬럼명 변경 | - |
| df.withColumnRenamed(old_name, new_name) | 컬럼명 변경 | - |
| df.drop() | 컬럼 삭제 | - |
| df.where | 로우 필터링 | - |
| df.filter | 로우 필터링 | - |
| df.sort, df.orderBy | 정렬 | - |
| df.sortWithinPartitions | 파티션별 정렬 | - |

### 1.3 기타 함수

#### 표현식
> <strong>표현식: DataFrame 레코드의 여러 값에 대한 트랜스포메이션 집합을 의미</strong>

* 여러 컬럼명을 입력받아 식별하고 단일 값을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수
  - <strong>표현식은 expr함수로 사용</strong>
  - <strong>컬럼은 단지 표현식일 뿐</strong>
  - <strong>표현식은 연산 순서를 지정하는 논리적 트리로 컴파일됨</strong>

---

| 함수 | 설명 | 기타 |
| - | - | - |
| expr("someCol - 5") | 표현식 | - |
| lit() | 리터럴 | - |
| cast() | 컬럼 데이터 타입 변경 | - |
| distinct() | unique row | - |
| desc(), asc() | 정렬 순서 | - |



#### DataFrame 에 대해서는 밑에서 더 실습을 하고 먼저 RDD 부터 보겠습니다.

## 2. RDD 의 특징과 데이터 변환의 종류

| 특징 | 설명 | 기타 |
|---|---|---|
| dependencies | resilency | 리니지를 통해 의존성 정보를 유지함으로써 언제든 다시 수행할 수 있는 회복력을 가집니다 |
| partitions | parallelize computation | 파티션 단위로 데이터를 저장 관리하므로써 병렬 처리를 가능하게 합니다 |
| compute function | Iterator\[T\] | RDD로 저장되는 모든 데이터는 반복자를 통해 함수를 적용할 수 있습니다 |

* 반면에 compute function 의 내부를 spark 가 알 수 없기 때문에 오류를 찾아내가 어려우며, Python 과 같은 스크립트 언어는 generic object 로만 인식이 되므로 호환하기 어려우며, T 타입의 객체는 직렬화되어 전달되기만 할 뿐 스파크는 해당 데이터 타입 T 에 대해 알 수 없습니다

> RDD 를 통해 데이터 처리하는 방법과, 구조화된 API 를 통해 처리하는 방법을 비교해 보고, 이러한 고수준의 DSL 연산자를 통해 보다 단순하게 표현이 가능합니다.

### 2.1 RDD 통한 데이터 변환

In [7]:
# parallelize 로 RDD 형태로 만들어줌
dataRDD = spark.sparkContext.parallelize([("Cat", 30), ("Dog", 28), ("Monkey", 28), ("Cat", 24), ("Dog", 10)])
# dataRDD.toDF().show(5) 

# map
map_rdd = dataRDD.map(lambda x: (x[0], (x[1], 1)))
# map_rdd.toDF().show()

# map - reduce
# reduce = (
#     dataRDD.map(lambda x: (x[0], (x[1], 1)))
#      .reduceByKey(lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1])))
# reduce.toDF().show()

# map - reduce - map
agesRDD = (
    dataRDD.map(lambda x: (x[0], (x[1], 1)))
     .reduceByKey(lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1]))
     .map(lambda v: (v[0], v[1][0]/v[1][1]))
)

agesRDD.toDF(["Name", "Age"]).show()

+------+----+
|  Name| Age|
+------+----+
|Monkey|28.0|
|   Cat|27.0|
|   Dog|19.0|
+------+----+



### 2.2 Structured API 통한 데이터 변환

In [8]:
spark = SparkSession.builder.appName("동물의 평균 수명").getOrCreate()
animal = spark.createDataFrame([("Cat", 30), ("Dog", 28), ("Monkey", 28), ("Cat", 24), ("Dog", 10)], ["Name", "Age"])
ages = animal.select("Name", "Age").groupBy("Name").agg(avg("Age").alias("Age"))
ages.show(truncate=False)

+------+----+
|Name  |Age |
+------+----+
|Monkey|28.0|
|Cat   |27.0|
|Dog   |19.0|
+------+----+



### 1.4 Structured API 활용 가이드

+ <strong>DataFrame</strong> 구성요소: 레코드, 컬럼
    + 레코드: Row타입(Table의 로우)
    + 컬럼: 레코드에 수행할 연산 표현식(테이블의 컬럼)
+ <strong>스키마</strong>: 각 컬럼명과 데이터 타입을 정의
+ <strong>파티셔닝</strong>: DataFrame이나 Dataset이 클러스터에서 물리적으로 배치되는 형태를 정의
+ <strong>파티셔닝 스키마</strong>: 파티션을 구성하는 방법을 정의 (테이블 Root Directory 의 Sub Directories 구성을 말합니다)

---
### 1.1 데이터 프레임 함수
| 함수 | 설명 | 기타 |
| - | - | - |
| df.printSchema() | 스키마 정보를 출력합니다. | - |
| df.schema | StructType 스키마를 반환합니다 | - |
| df.columns | 컬럼명 정보를 반환합니다 | - |
| df.show(n) | 데이터 n 개를 출력합니다 | - |
| df.first() | 데이터 프레임의 첫 번째 Row 를 반환합니다 | - |
| df.head(n) | 데이터 프레임의 처음부터 n 개의 Row 를 반환합니다 | - |
| df.createOrReplaceTempView | 임시 뷰 테이블을 생성합니다 | - |
| df.union(newdf) | 데이터프레임 간의 유니온 연산을 수행합니다 | - |
| df.limit(n) | 추출할 로우수 제한 | T |
| df.repartition(n) | 파티션 재분배, 셔플발생 | - |
| df.coalesce() | 셔플하지 않고 파티션을 병합 | 마지막 스테이지의 reduce 수가 줄어드는 효과로 성능저하에 유의해야 합니다 |
| df.collect() | 모든 데이터 수집, 반환 | A |
| df.take(n) | 상위 n개 로우 반환 | A |


### 1.2 컬럼 함수

#### 컬럼 이란?
> <strong>컬럼: 표현식을 사용해 레코드 단위로 계산한 값을 단순하게 나타내는 논리적인 구조</strong> (테이블의 컬럼으로 생각할 수 있음)
* 컬럼의 실제값을 얻으려면 로우가 필요하고, 로우를 얻으려면 DataFrame이 필요 (로우는 데이터 레코드)
  - DataFrame을 통하지 않으면 외부에서 컬럼에 접근 불가
  - DataFrame -> Row -> Column
* 컬럼의 특징
  - 컬럼의 내용을 수정하려면 반드시 DataFrame의 스파크 트랜포메이션을 사용
  - col, column 함수를 사용하는 것이 가장 간편 (책에서는 col 함수 사용)
  - 컬럼은 컬럼명을 카탈로그에 저장된 정보와 비교하기 전까지 미확인 상태
  - 분석기가 동작하는 단계에서 컬럼과 테이블을 분석

| 함수 | 설명 | 기타 |
| - | - | - |
| df.select | 컬럼이나 표현식 사용  | - |
| df.selectExpr | 문자열 표현식 사용 = df.select(expr()) | - |
| df.withColumn(컬럼명, 표현식) | 컬럼 추가, 비교, 컬럼명 변경 | - |
| df.withColumnRenamed(old_name, new_name) | 컬럼명 변경 | - |
| df.drop() | 컬럼 삭제 | - |
| df.where | 로우 필터링 | - |
| df.filter | 로우 필터링 | - |
| df.sort, df.orderBy | 정렬 | - |
| df.sortWithinPartitions | 파티션별 정렬 | - |

### 1.3 기타 함수

#### 표현식
> <strong>표현식: DataFrame 레코드의 여러 값에 대한 트랜스포메이션 집합을 의미</strong>

* 여러 컬럼명을 입력받아 식별하고 단일 값을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수
  - <strong>표현식은 expr함수로 사용</strong>
  - <strong>컬럼은 단지 표현식일 뿐</strong>
  - <strong>표현식은 연산 순서를 지정하는 논리적 트리로 컴파일됨</strong>

---

| 함수 | 설명 | 기타 |
| - | - | - |
| expr("someCol - 5") | 표현식 | - |
| lit() | 리터럴 | - |
| cast() | 컬럼 데이터 타입 변경 | - |
| distinct() | unique row | - |
| desc(), asc() | 정렬 순서 | - |



---
### 1.1 데이터 프레임 함수
| 함수 | 설명 | 기타 |
| - | - | - |
| df.printSchema() | 스키마 정보를 출력합니다. | - |
| df.schema | StructType 스키마를 반환합니다 | - |
| df.columns | 컬럼명 정보를 반환합니다 | - |
| df.show(n) | 데이터 n 개를 출력합니다 | - |
| df.first() | 데이터 프레임의 첫 번째 Row 를 반환합니다 | - |
| df.head(n) | 데이터 프레임의 처음부터 n 개의 Row 를 반환합니다 | - |
| df.createOrReplaceTempView | 임시 뷰 테이블을 생성합니다 | - |
| df.union(newdf) | 데이터프레임 간의 유니온 연산을 수행합니다 | - |
| df.limit(n) | 추출할 로우수 제한 | T |
| df.repartition(n) | 파티션 재분배, 셔플발생 | - |
| df.coalesce() | 셔플하지 않고 파티션을 병합 | 마지막 스테이지의 reduce 수가 줄어드는 효과로 성능저하에 유의해야 합니다 |
| df.collect() | 모든 데이터 수집, 반환 | A |
| df.take(n) | 상위 n개 로우 반환 | A |


### 1.2 컬럼 함수

#### 컬럼 이란?
> <strong>컬럼: 표현식을 사용해 레코드 단위로 계산한 값을 단순하게 나타내는 논리적인 구조</strong> (테이블의 컬럼으로 생각할 수 있음)
* 컬럼의 실제값을 얻으려면 로우가 필요하고, 로우를 얻으려면 DataFrame이 필요 (로우는 데이터 레코드)
  - DataFrame을 통하지 않으면 외부에서 컬럼에 접근 불가
  - DataFrame -> Row -> Column
* 컬럼의 특징
  - 컬럼의 내용을 수정하려면 반드시 DataFrame의 스파크 트랜포메이션을 사용
  - col, column 함수를 사용하는 것이 가장 간편 (책에서는 col 함수 사용)
  - 컬럼은 컬럼명을 카탈로그에 저장된 정보와 비교하기 전까지 미확인 상태
  - 분석기가 동작하는 단계에서 컬럼과 테이블을 분석

| 함수 | 설명 | 기타 |
| - | - | - |
| df.select | 컬럼이나 표현식 사용  | - |
| df.selectExpr | 문자열 표현식 사용 = df.select(expr()) | - |
| df.withColumn(컬럼명, 표현식) | 컬럼 추가, 비교, 컬럼명 변경 | - |
| df.withColumnRenamed(old_name, new_name) | 컬럼명 변경 | - |
| df.drop() | 컬럼 삭제 | - |
| df.where | 로우 필터링 | - |
| df.filter | 로우 필터링 | - |
| df.sort, df.orderBy | 정렬 | - |
| df.sortWithinPartitions | 파티션별 정렬 | - |

### 1.3 기타 함수

#### 표현식
> <strong>표현식: DataFrame 레코드의 여러 값에 대한 트랜스포메이션 집합을 의미</strong>

* 여러 컬럼명을 입력받아 식별하고 단일 값을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수
  - <strong>표현식은 expr함수로 사용</strong>
  - <strong>컬럼은 단지 표현식일 뿐</strong>
  - <strong>표현식은 연산 순서를 지정하는 논리적 트리로 컴파일됨</strong>

---

| 함수 | 설명 | 기타 |
| - | - | - |
| expr("someCol - 5") | 표현식 | - |
| lit() | 리터럴 | - |
| cast() | 컬럼 데이터 타입 변경 | - |
| distinct() | unique row | - |
| desc(), asc() | 정렬 순서 | - |



### 1.4 Structured API 활용 가이드

+ <strong>DataFrame</strong> 구성요소: 레코드, 컬럼
    + 레코드: Row타입(Table의 로우)
    + 컬럼: 레코드에 수행할 연산 표현식(테이블의 컬럼)
+ <strong>스키마</strong>: 각 컬럼명과 데이터 타입을 정의
+ <strong>파티셔닝</strong>: DataFrame이나 Dataset이 클러스터에서 물리적으로 배치되는 형태를 정의
+ <strong>파티셔닝 스키마</strong>: 파티션을 구성하는 방법을 정의 (테이블 Root Directory 의 Sub Directories 구성을 말합니다)

> immutable 하며, 모든 transformation 들의 lineage 를 유지합니다. 또한 컬럼을 변경, 추가 등을 통해 새로운 데이터프레임을 생성합니다.

## 4 핵심 데이터 프레임 연산자

### 4.1 파일로 부터 테이블 만들어 사용하기

In [9]:
print("# 원시 데이터로 부터 읽거나, Spark SQL 통한 결과는 항상 데이터프레임이 생성됩니다")
df = spark.read.json(f"{work_data}/flight-data/json/2015-summary.json")
df.createOrReplaceTempView("2015_summary")

sql_result = spark.sql("SELECT * FROM 2015_summary").show(5)

# 원시 데이터로 부터 읽거나, Spark SQL 통한 결과는 항상 데이터프레임이 생성됩니다
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



### 4.2 특정 컬럼 선택 (select, selectExpr)
> 아래의 모든 예제에서 컬럼 선택 시에 select(col("컬럼명")) 으로 접근할 수도 있지만 **selectExpr("컬럼명") 이 간결하기 때문에 앞으로는 가능한 표현식으로 사용**하겠습니다 <br>
컬럼 표현식의 경우 반드시 하나의 컬럼은 하나씩 표현되어야만 합니다.  <br>
잘된예 : "컬럼1", "컬럼2" <br>
잘못된예: "컬럼1, 컬럼2"

In [11]:
from pyspark.sql.functions import *

print("# select 표현은 컬럼만 입력이 가능하며, 함수나 기타 표현식을 사용할 수 없습니다. 사용하기 위해서는 functions 를 임포트 하고, 개별 함수의 특징을 잘 이해하고 사용해야 합니다")
df.select(upper(col("DEST_COUNTRY_NAME")), "ORIGIN_COUNTRY_NAME").show(2)

print("# selectExpr 별도의 임포트 없이, 모든 표현식을 사용할 수 있습니다")
df.selectExpr("upper(DEST_COUNTRY_NAME)", "ORIGIN_COUNTRY_NAME").show(2)

# select 표현은 컬럼만 입력이 가능하며, 함수나 기타 표현식을 사용할 수 없습니다. 사용하기 위해서는 functions 를 임포트 하고, 개별 함수의 특징을 잘 이해하고 사용해야 합니다
+------------------------+-------------------+
|upper(DEST_COUNTRY_NAME)|ORIGIN_COUNTRY_NAME|
+------------------------+-------------------+
|           UNITED STATES|            Romania|
|           UNITED STATES|            Croatia|
+------------------------+-------------------+
only showing top 2 rows

# selectExpr 별도의 임포트 없이, 모든 표현식을 사용할 수 있습니다
+------------------------+-------------------+
|upper(DEST_COUNTRY_NAME)|ORIGIN_COUNTRY_NAME|
+------------------------+-------------------+
|           UNITED STATES|            Romania|
|           UNITED STATES|            Croatia|
+------------------------+-------------------+
only showing top 2 rows



In [12]:
print("# 컬럼의 앨리어스 혹은 전체 컬럼을 위한 * 도 사용할 수 있습니다")
df.selectExpr("DEST_COUNTRY_NAME as newColmnName", "DEST_COUNTRY_NAME").show(2)

df.selectExpr("*", "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(2)

# 컬럼의 앨리어스 혹은 전체 컬럼을 위한 * 도 사용할 수 있습니다
+-------------+-----------------+
| newColmnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



### <font color=green>1. [기본]</font> "/home/jovyan/work/data/flight-data/json/2015-summary.json" 경로의 JSON 데이터를 읽고
#### 1. 스키마를 출력하세요
#### 2. 10건의 데이터를 출력하세요
#### 3. selectExpr 구문 혹은 spark sql 구문을 이용하여 DEST_COUNTRY_NAME 컬럼은 대문자로, ORIGIN_COUNTRY_NAME 컬럼을 소문자로 2개의 컬럼을 조회하세요

<details><summary>[실습1] 출력 결과 확인 </summary>

> 아래와 유사하게 방식으로 작성 되었다면 정답입니다


```python
df1 = (
    spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .json(f"{work_data}/flight-data/json/2015-summary.json")
)
    
df1.printSchema()
answer = df1.createOrReplaceTempView("2015_summary")
spark.sql("select upper(DEST_COUNTRY_NAME), lower(ORIGIN_COUNTRY_NAME) from 2015_summary").show(10)
```

</details>


In [9]:
# 여기에 실습 코드를 작성하고 실행하세요 (Shift+Enter)
df1 = (
    spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .json(f"{work_data}/flight-data/json/2015-summary.json")
)

df1.printSchema()
answer = df1.createOrReplaceTempView("2015_summary")
spark.sql("select upper(DEST_COUNTRY_NAME), lower(ORIGIN_COUNTRY_NAME) from 2015_summary").show(10)

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)

+------------------------+--------------------------+
|upper(DEST_COUNTRY_NAME)|lower(ORIGIN_COUNTRY_NAME)|
+------------------------+--------------------------+
|           UNITED STATES|                   romania|
|           UNITED STATES|                   croatia|
|           UNITED STATES|                   ireland|
|                   EGYPT|             united states|
|           UNITED STATES|                     india|
|           UNITED STATES|                 singapore|
|           UNITED STATES|                   grenada|
|              COSTA RICA|             united states|
|                 SENEGAL|             united states|
|                 MOLDOVA|             united states|
+------------------------+--------------------------+
only showing top 10 rows



### 4.3 상수값 사용하기

In [13]:
# 리터럴(literal)을 사용한 리터럴 상수 값 컬럼 추가
from pyspark.sql.functions import lit

# df.select(expr("*"), lit(1).alias("One")).show(2)
df.selectExpr("*", "1 as One").show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



### 4.4 컬럼 추가하기

In [14]:
print("# withColumn(컬럼명, 표현식) 으로 컬럼 추가")
df.withColumn("numberOne", lit(1)).show(2)

# withColumn(컬럼명, 표현식) 으로 컬럼 추가
+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [16]:
print("# 컬럼의 대소 비교를 통한 불리언 값 반환")
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(2)

# 컬럼의 대소 비교를 통한 불리언 값 반환
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [17]:
print("# 존재하는 컬럼을 표현식을 통해 새로운 컬럼 생성, 기존 컬럼을 삭제")
before = df
before.printSchema()

after = before.withColumn("Destination", expr("DEST_COUNTRY_NAME"))
after.printSchema()

# 존재하는 컬럼을 표현식을 통해 새로운 컬럼 생성, 기존 컬럼을 삭제
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)
 |-- Destination: string (nullable = true)



### <font color=green>2. [기본]</font> f"{work_data}/flight-data/json/2015-summary.json" 경로의 JSON 데이터를 읽고
#### 1. 스키마를 출력하세요
#### 2. 10건의 데이터를 출력하세요
#### 3. 기존의 컬럼은 그대로 두고, ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME 2개의 컬럼을 각각 소문자, 대문자로 변경한 컬럼 ORIGIN_COUNTRY_NAME_LOWER, DEST_COUNTRY_NAME_UPPER 총 4개의 컬럼을 출력하세요

<details><summary>[실습2] 출력 결과 확인 </summary>

> 아래와 유사하게 방식으로 작성 되었다면 정답입니다


```python
df2 = (
    spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .json(f"{work_data}/flight-data/json/2015-summary.json")
)
    
df2.printSchema()
answer = df2.createOrReplaceTempView("2015_summary")
spark.sql("""select 
    ORIGIN_COUNTRY_NAME, 
    DEST_COUNTRY_NAME, 
    lower(ORIGIN_COUNTRY_NAME) as ORIGIN_COUNTRY_NAME_LOWER, 
    upper(DEST_COUNTRY_NAME) as DEST_COUNTRY_NAME_UPPER 
    from 2015_summary"""
).show(10)
```

</details>


In [29]:
# 여기에 실습 코드를 작성하고 실행하세요 (Shift+Enter)
df2 = (
    spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .json(f"{work_data}/flight-data/json/2015-summary.json")
)
df2.show()

df2.printSchema()
answer = df2.createOrReplaceTempView("2015_summary")
spark.sql("""select 
    ORIGIN_COUNTRY_NAME, 
    DEST_COUNTRY_NAME, 
    lower(ORIGIN_COUNTRY_NAME) as ORIGIN_COUNTRY_NAME_LOWER, 
    upper(DEST_COUNTRY_NAME) as DEST_COUNTRY_NAME_UPPER 
    from 2015_summary"""
).show(10)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

### 4.5 컬럼명 바꾸기

In [15]:
print("# 컬럼 명 변경하기")
df.withColumnRenamed("DEST_COUNTRY_NAME", "Destination").columns

# 컬럼 명 변경하기


['Destination', 'ORIGIN_COUNTRY_NAME', 'count']

### 4.6 컬럼 제거하기

In [16]:
print("# 특정 컬럼을 제거합니다")
df.printSchema()
df.drop("ORIGIN_COUNTRY_NAME").columns

# 특정 컬럼을 제거합니다
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



['DEST_COUNTRY_NAME', 'count']

In [18]:
print("# 기본적으로 스파크는 대소문자를 가리지 않지만, 옵션을 통해서 구분이 가능합니다")
spark.conf.set('spark.sql.caseSensitive', True)
caseSensitive = df.drop("dest_country_name")
caseSensitive.printSchema()

spark.conf.set('spark.sql.caseSensitive', False)
caseInsensitive = df.drop("dest_country_name")
caseInsensitive.printSchema()

# 기본적으로 스파크는 대소문자를 가리지 않지만, 옵션을 통해서 구분이 가능합니다
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)

root
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [20]:
print("# 한 번에 여러 컬럼도 삭제할 수 있습니다")
df.printSchema()
df.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").columns # 여러 컬럼을 지우기

# 한 번에 여러 컬럼도 삭제할 수 있습니다
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



['count']

### <font color=blue>3. [중급]</font> f"{work_data}/flight-data/json/2015-summary.json" 경로의 JSON 데이터를 읽고
#### 1. ORIGIN_COUNTRY_NAME 컬럼은 Origin 으로 이름을 변경하고
#### 2. DEST_COUNTRY_NAME 컬럼은 DestUpper 으로 대문자로 변경한 컬럼을 추가하고
#### 3. DEST_COUNTRY_NAME 컬럼은 Drop 하고, Origin, DestUpper 2개의 컬럼만 남은 DataFrame 의 데이터를 출력하세요
#### 4. 최종 스키마를 출력하세요

<details><summary>[실습3] 출력 결과 확인 </summary>

> 아래와 유사하게 방식으로 작성 되었다면 정답입니다


```python
df3 = (
    spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .json(f"{work_data}/flight-data/json/2015-summary.json")
)
    
df3.printSchema()
answer = (df3
    .withColumnRenamed("ORIGIN_COUNTRY_NAME", "Origin")
    .withColumn("DestUpper", upper("DEST_COUNTRY_NAME"))
    .drop("DEST_COUNTRY_NAME", "count")
)
answer.show()
answer.printSchema()
```

</details>


In [31]:
# 여기에 실습 코드를 작성하고 실행하세요 (Shift+Enter)
df3 = (
    spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .json(f"{work_data}/flight-data/json/2015-summary.json")
)

df3.printSchema()
answer = (df3
    .withColumnRenamed("ORIGIN_COUNTRY_NAME", "Origin")
    .withColumn("DestUpper", upper("DEST_COUNTRY_NAME"))
    .drop("DEST_COUNTRY_NAME", "count")
)
answer.show()
answer.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)

+----------------+--------------------+
|          Origin|           DestUpper|
+----------------+--------------------+
|         Romania|       UNITED STATES|
|         Croatia|       UNITED STATES|
|         Ireland|       UNITED STATES|
|   United States|               EGYPT|
|           India|       UNITED STATES|
|       Singapore|       UNITED STATES|
|         Grenada|       UNITED STATES|
|   United States|          COSTA RICA|
|   United States|             SENEGAL|
|   United States|             MOLDOVA|
|    Sint Maarten|       UNITED STATES|
|Marshall Islands|       UNITED STATES|
|   United States|              GUYANA|
|   United States|               MALTA|
|   United States|            ANGUILLA|
|   United States|             BOLIVIA|
|        Paraguay|       UNITED STATES|
|   United States|             ALGERIA|
|   United States|T

### 4.7 컬럼의 데이터 타입 변경하기

In [21]:
print("# 컬럼의 데이터 유형을 변경합니다")
df.printSchema()

int2str = df.withColumn("str_count", col("count").cast("string"))
int2str.show(5)
int2str.printSchema()

str2int = int2str.withColumn("int_count", col("str_count").cast("int"))
str2int.show(5)
str2int.printSchema()

# 컬럼의 데이터 유형을 변경합니다
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|str_count|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|       15|
|    United States|            Croatia|    1|        1|
|    United States|            Ireland|  344|      344|
|            Egypt|      United States|   15|       15|
|    United States|              India|   62|       62|
+-----------------+-------------------+-----+---------+
only showing top 5 rows

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)
 |-- str_count: string (nullable = true)

+-----------------+-------------------+-----+---------+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|str_count|int_cou

### 4.8 레코드 필터링

In [22]:
print("# Where 와 Filter 는 동일합니다")
df.where("count < 2").show(2)
df.filter("count < 2").show(2)

print("# 같은 표현식에 여러 필터를 적용하는 것도 가능합니다")
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(2)

# Where 와 Filter 는 동일합니다
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

# 같은 표현식에 여러 필터를 적용하는 것도 가능합니다
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



### 4.9 유일 값 (DISTINCT)

In [23]:
""" distinct 함수 """
print(df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count())
print(df.select("ORIGIN_COUNTRY_NAME").distinct().count())
# distinctcount?

256
125


### <font color=green>4. [기본] </font> f"{work_data}/flight-data/json/2015-summary.json" 경로의 JSON 데이터를 읽고
#### 1. 스키마를 출력하세요
#### 2. 데이터 10건을 출력하세요
#### 3. count 가 5000 이상, 100000 보다 미만인 ORIGIN_COUNTRY_NAME 를 출력하되 중복을 제거해 주세요

<details><summary>[실습4] 출력 결과 확인 </summary>

> 아래와 유사하게 방식으로 작성 되었다면 정답입니다


```python
df4 = (
    spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .json(f"{work_data}/flight-data/json/2015-summary.json")
)
    
df4.printSchema()
df4.show(10)
df4.selectExpr("min(count)", "max(count)").show()
answer = df4.where(expr("count >= 5000 and count < 100000")).select("ORIGIN_COUNTRY_NAME")
answer.distinct()
```

</details>


In [32]:
# 여기에 실습 코드를 작성하고 실행하세요 (Shift+Enter)
df4 = (
    spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .json(f"{work_data}/flight-data/json/2015-summary.json")
)

df4.printSchema()
df4.show(10)
df4.selectExpr("min(count)", "max(count)").show()
answer = df4.where(expr("count >= 5000 and count < 100000")).select("ORIGIN_COUNTRY_NAME")
answer.distinct()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 10 rows

+----------+----------+
|min(count)|max(count)|
+----------+----------+
|         1|    370002|
+----------+----------+



ORIGIN_COUNTRY_NAME
Mexico
Canada
United States


### 4.10 정렬 (SORT)

In [24]:
print("# sort 와 orderBy 함수는 동일한 효과를 가집니다")
df.sort("count").show(2)
df.orderBy("count", "DEST_COUNTRY_NAME").show(2)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(2)

# sort 와 orderBy 함수는 동일한 효과를 가집니다
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [25]:
from pyspark.sql.functions import *
print("# asc_nulls_first, desc_nulls_first, asc_nulls_last, desc_nulls_last 메서드로 null의 정렬 순서를 지정")
df.sort("DEST_COUNTRY_NAME").show(1)
df.sort(df["DEST_COUNTRY_NAME"].asc_nulls_first()).show(1)
df.sort(df.DEST_COUNTRY_NAME.asc_nulls_first()).show(1)

# asc_nulls_first, desc_nulls_first, asc_nulls_last, desc_nulls_last 메서드로 null의 정렬 순서를 지정
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Algeria|      United States|    4|
+-----------------+-------------------+-----+
only showing top 1 row

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Algeria|      United States|    4|
+-----------------+-------------------+-----+
only showing top 1 row

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Algeria|      United States|    4|
+-----------------+-------------------+-----+
only showing top 1 row



In [26]:
print("# 정렬의 경우 예약어 컬럼명에 유의해야 하므로, expr 을 사용하거나, 명시적으로 구조화 API 를 사용하는 것도 좋습니다") 
from pyspark.sql.functions import desc, asc
df.orderBy(df["count"].desc()).show(2)
df.orderBy(df.ORIGIN_COUNTRY_NAME.desc(), df.DEST_COUNTRY_NAME.asc()).show(2)
df.orderBy(expr("ORIGIN_COUNTRY_NAME DESC"), expr("DEST_COUNTRY_NAME ASC")).show(2)

# 정렬의 경우 예약어 컬럼명에 유의해야 하므로, expr 을 사용하거나, 명시적으로 구조화 API 를 사용하는 것도 좋습니다
+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Vietnam|    2|
|    United States|          Venezuela|  246|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|             Angola|   13|
|    United States|           Anguilla|   38|
+-----------------+-------------------+-----+
only showing top 2 rows



### 4.11 로우 수 제한 (LIMIT)

In [27]:
df.limit(5).show()
df.orderBy(expr("count desc")).limit(6).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
+--------------------+-------------------+-----+



### 4.12 예약 문자와 키워드

> 공백이나 하이픈(-) 같은 예약 문자를 컬럼명에 사용하려면 백틱(`) 문자를 사용해야 합니다

In [28]:
""" withColumn, selectExpr, select 차이점 """
dfWithLongColName = df.withColumn("This Long Column-Name", expr("ORIGIN_COUNTRY_NAME")) # 첫 번째 인수에서 사용하지 않음
dfWithLongColName.show(2)

dfWithLongColName.selectExpr("`This Long Column-Name`", "`This Long Column-Name` as `new col`").show(2) # 사용함
dfWithLongColName.select(expr("`This Long Column-Name`")).show(2)

+-----------------+-------------------+-----+---------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|This Long Column-Name|
+-----------------+-------------------+-----+---------------------+
|    United States|            Romania|   15|              Romania|
|    United States|            Croatia|    1|              Croatia|
+-----------------+-------------------+-----+---------------------+
only showing top 2 rows

+---------------------+-------+
|This Long Column-Name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows

+---------------------+
|This Long Column-Name|
+---------------------+
|              Romania|
|              Croatia|
+---------------------+
only showing top 2 rows



### 4.13 임의 분할하기

In [27]:
""" randomSplit 함수 """
seed = 42
dataFrames = df.randomSplit([0.25, 0.75], seed)
print(dataFrames[0].count())
print(dataFrames[1].count())

63
193


### 4.14 로우 합치기와 추가하기
+ 동일한 스키마와 컬럼 수를 가져야 함

In [30]:
""" union 함수 """
from pyspark.sql import Row

schema = df.schema
newRows = [
    Row("New Country", "Other Country", 5),
    Row("New Country 2", "Other Country 3", 1)
]

# Parallelized Collections :
# Parallelized collections are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program.
# The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. 

# 병렬화된 컬렉션:
# 병렬화된 컬렉션은 드라이버 프로그램의 기존 iterable 또는 컬렉션에 대한 SparkContext의 병렬화 메서드를 호출하여 생성됩니다.
# 컬렉션의 요소는 병렬로 작동할 수 있는 분산 데이터 세트를 형성하기 위해 복사됩니다.

parallelizedRows = spark.sparkContext.parallelize(newRows) 
newDF = spark.createDataFrame(parallelizedRows, schema)

newDF.show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|      New Country|      Other Country|    5|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



In [31]:
df_union = (
    df.union(newDF)
    .where("count = 1")
    .where(col("ORIGIN_COUNTRY_NAME") != "United States")
    .show(5)
)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



### 4.15 repartition과 cocalesce
> 향후에 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에 사용(repartition, 셔플이 필수로 발생)
* 자주 필터링되는 컬럼을 기준으로 파티션 재분배를 권장
* repartition: 물리적인 데이터 구성 제어

In [33]:
""" 파티션 나누기 """
df.rdd.getNumPartitions()
repart_1 = df.repartition(5)
repart_2 = df.repartition(col("DEST_COUNTRY_NAME"))
repart_3 = df.repartition(5, col("DEST_COUNTRY_NAME"))

# repartition 숫자 변경 등 실험내용 공유
print(repart_1.rdd.getNumPartitions())
print(repart_2.rdd.getNumPartitions())
print(repart_3.rdd.getNumPartitions())

5
5
5


+ 셔플하지 않고 파티션을 병합

In [33]:
""" 파티션 합치기 """
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
Moldova,United States,1
Bolivia,United States,30
Algeria,United States,4
Turks and Caicos Islands,United States,230
Pakistan,United States,12
Marshall Islands,United States,42
Suriname,United States,1
Panama,United States,510
New Zealand,United States,111
Liberia,United States,2


### 4.16 드라이버로 로우 데이터 수집하기
> 대규모 데이터셋에 collect 명령을 수행하면 드라이버 비정상 종료 우려

In [34]:
collectDF = df.limit(5)
collectDF.take(5) # 정수를 인수값으로 사용

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]

In [35]:
collectDF.show()  # 결과를 정돈된 형태로 출력

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



In [36]:
collectDF.show(5, False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
+-----------------+-------------------+-----+



In [37]:
collectDF.collect() # 전체 모든 테이터를 수집, 반환

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]

In [38]:
collectDF.toLocalIterator()

<generator object _local_iterator_from_socket.<locals>.PyLocalIterable.__iter__ at 0x7f14fc013660>

+ 대규모 데이터셋에 collect, toLocalIterator 수행하면 매우 큰 비용(cpu, 메모리, 네트워크) 발생, 드라이버 비정상적 종료 가능성

### <font color=red>5. [고급]</font> f"{work_data}/flight-data/json/2015-summary.json" 경로의 JSON 데이터를 읽고
#### 1. 스키마를 출력하세요
#### 2. 10건의 데이터를 출력하세요
#### 3. count 를 100으로 나눈 몫을 가지는 cnt 컬럼을 추가합니다 표현식은 다음과 같습니다 `expr("floor(count / 100)")` - 힌트: withColumn("컬럼명", "표현식")
#### 4. cnt 컬럼을 기준으로 내림차순 정렬하되, 동순위가 발생하는 경우 ORIGIN_COUNTRY_NAME 오름차순, DEST_COUNTRY_NAME 내림차순으로 정렬하여 출력하세요
#### 5. 출력된 결과의 상위 10개만 제한하여 (limit) display 함수로 출력하세요

<details><summary>[실습5] 출력 결과 확인 </summary>

> 아래와 유사하게 방식으로 작성 되었다면 정답입니다


```python
df5 = (
    spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .json(f"{work_data}/flight-data/json/2015-summary.json")
)
    
df5.printSchema()
answer = df5.withColumn("cnt", expr("floor(count / 100)")).orderBy(desc("cnt"), asc("ORIGIN_COUNTRY_NAME"), desc("DEST_COUNTRY_NAME")).limit(10)
display(answer)
```

</details>


In [39]:
# 여기에 실습 코드를 작성하고 실행하세요 (Shift+Enter)
df5 = (
    spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .json(f"{work_data}/flight-data/json/2015-summary.json")
)

df5.printSchema()
answer = df5.withColumn("cnt", expr("floor(count / 100)")).orderBy(desc("cnt"), asc("ORIGIN_COUNTRY_NAME"), desc("DEST_COUNTRY_NAME")).limit(10)
display(answer)

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count,cnt
United States,United States,370002,3700
United States,Canada,8483,84
Canada,United States,8399,83
United States,Mexico,7187,71
Mexico,United States,7140,71
United Kingdom,United States,2025,20
United States,United Kingdom,1970,19
Japan,United States,1548,15
United States,Dominican Republic,1420,14
United States,Japan,1496,14


## 5. 기타 데이터 프레임 연산자

### 5.1 사전에 스키마를 정의하는 장점
* 데이터 타입을 추론에 대한 신경을 쓸 필요가 없다
* 스키마 추론을 위한 별도의 작업에 드는 리소스를 줄일 수 있다
* 스키마에 맞지 않는 데이터의 오류를 빠르게 인지할 수 있다

### 5.2 스키마를 정의하는 두 가지 방법
* 1. 프로그래밍 방식으로 정의하는 방법
  - 스키마: 여러 개의 StructField 타입 필드로 구성된 StructType 객체
  - [StructField](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/sql/types/StructField.html): (이름), (데이터 타입), (컬럼이 값이 없거나 null일 수 있는지 지정하는 불리언 값) 으로 구성
    - StructField(String name, DataType dataType, boolean nullable, Metadata metadata)
  - Metadata: 해당 컬럼과 관련된 정보이며, 스파크의 머신러닝 라이브러리에서 사용
* 2. DDL 구문을 이용하는 방법

In [38]:
from pyspark.sql.types import *
from pyspark.sql import Row

data = [
    ["정휘센", "안녕하세요 정휘센 입니다", 300],
    ["김싸이언", "안녕하세요 김싸이언 입니다", 200],
    ["유코드제로", "안녕하세요 유코드제로 입니다", 100]
]

print("# 1. Programming Style")
schema1 = StructType([
    StructField("author", StringType(), False),
    StructField("title", StringType(), False),
    StructField("pages", IntegerType(), False),
])
print(schema1)
df1 = spark.createDataFrame(data, schema1)
df1.printSchema()
df1.show(truncate=False)

rows = [
    Row("정휘센", "안녕하세요 정휘센 입니다", 300),
    Row("김싸이언", "안녕하세요 김싸이언 입니다", 200),
    Row("유코드제로", "안녕하세요 유코드제로 입니다", 100)
]

print("\n# 2. DDL Style")
schema2 = "`author` string, `title` string, `pages` int"
print(schema2)
df2 = spark.createDataFrame(rows, schema2)
df2.printSchema()
df2.show(truncate=False)

assert(df1.subtract(df2).count() == 0)
assert(df2.subtract(df1).count() == 0)

# 1. Programming Style
StructType(List(StructField(author,StringType,false),StructField(title,StringType,false),StructField(pages,IntegerType,false)))
root
 |-- author: string (nullable = false)
 |-- title: string (nullable = false)
 |-- pages: integer (nullable = false)

+----------+----------------------------+-----+
|author    |title                       |pages|
+----------+----------------------------+-----+
|정휘센    |안녕하세요 정휘센 입니다    |300  |
|김싸이언  |안녕하세요 김싸이언 입니다  |200  |
|유코드제로|안녕하세요 유코드제로 입니다|100  |
+----------+----------------------------+-----+


# 2. DDL Style
`author` string, `title` string, `pages` int
root
 |-- author: string (nullable = true)
 |-- title: string (nullable = true)
 |-- pages: integer (nullable = true)

+----------+----------------------------+-----+
|author    |title                       |pages|
+----------+----------------------------+-----+
|정휘센    |안녕하세요 정휘센 입니다    |300  |
|김싸이언  |안녕하세요 김싸이언 입니다  |200  |
|유코드제로|안녕하세요 유코드제로 입니다|100  |
+----------+------

### <font color=red>6. [고급]</font> Row 와 문자열을 통한 스키마 구현을 통해 데이터 프레임을 생성하세요
#### 1. 스키마 : id int, name string, payment int
#### 2. 임의의 데이터를 3건 정도 생성해서 데이터프레임을 만들어 보세요
#### 3. 스키마를 출력하세요
#### 4. 데이터를 출력하세요

<details><summary>[실습6] 출력 결과 확인 </summary>

> 아래와 유사하게 방식으로 작성 되었다면 정답입니다


```python
df6 = [
    Row(1, "엘지전자", 1000),
    Row(2, "엘지화학", 2000),
    Row(3, "엘지디스플레이", 3000)
]
sc6 = "`id` int, `name` string, `payment` int"
answer = spark.createDataFrame(df6, sc6)
answer.printSchema()
display(answer)
```

</details>


In [2]:
# 여기에 실습 코드를 작성하고 실행하세요 (Shift+Enter)
df6 = [
    Row(1, "엘지전자", 1000),
    Row(2, "엘지화학", 2000),
    Row(3, "엘지디스플레이", 3000)
]
sc6 = "`id` int, `name` string, `payment` int"
answer = spark.createDataFrame(df6, sc6)
answer.printSchema()
display(answer)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- payment: integer (nullable = true)



                                                                                

id,name,payment
1,엘지전자,1000
2,엘지화학,2000
3,엘지디스플레이,3000


### 5.3 중첩된 배열 스키마

In [42]:
schema = StructType([
    StructField("Id", IntegerType(), False),
    StructField("First", StringType(), False),
    StructField("Last", StringType(), False),
    StructField("Url", StringType(), False),
    StructField("Published", StringType(), False),
    StructField("Hits", IntegerType(), False),
    StructField("Campaigns", ArrayType(StringType()), False),
])
path=f"{work_data}/learning-spark/blogs.json"
blogDF = spark.read.schema(schema).json(path)
blogDF.printSchema()
blogDF.show(1, truncate=False)

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- Hits: integer (nullable = true)
 |-- Campaigns: array (nullable = true)
 |    |-- element: string (containsNull = true)

+---+-----+-----+-----------------+---------+----+-------------------+
|Id |First|Last |Url              |Published|Hits|Campaigns          |
+---+-----+-----+-----------------+---------+----+-------------------+
|1  |Jules|Damji|https://tinyurl.1|1/4/2016 |4535|[twitter, LinkedIn]|
+---+-----+-----+-----------------+---------+----+-------------------+
only showing top 1 row



### 5.4 컬럼과 표현식
> 컬럼은 공용 메소드들을 가진 객체들이며, pyspark.sql.functions.expr() 함수를 이용하여 표현식을 그대로 사용할 수 있습니다

* 특히 컬럼 함수를 통해 다양한 연산자를 확인할 수 있습니다.

In [43]:
from pyspark.sql.functions import Column
print(blogDF.columns)
# help(Column)

['Id', 'First', 'Last', 'Url', 'Published', 'Hits', 'Campaigns']


In [44]:
blogDF.withColumn("AuthorsId", (concat(expr("First"), lit("."), expr("Last"), lit("@"), expr("Id")))) \
.select(col("AuthorsId")) \
.show(4)

+---------------+
|      AuthorsId|
+---------------+
|  Jules.Damji@1|
| Brooke.Wenig@2|
|    Denny.Lee@3|
|Tathagata.Das@4|
+---------------+
only showing top 4 rows



In [45]:
blogDF.select(expr("Hits")).show(2)
blogDF.select(col("Hits")).show(2)
blogDF.select("Hits").show(2)

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows



In [46]:
blogDF.sort(col("Id").desc()).show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



### 5.5 로우 생성 및 다루기
> 로우의 경우 컬럼을 인덱스를 기준으로 접근할 수 있습니다.

#### 레코드와 로우
+ 스파크는 레코드를 Row 객체로 표현
  - (각 로우는 하나의 레코드, '로우'와 '레코드'를 같은 의미로 사용)
  - (대문자로 시작하는 Row는 Row 객체를 의미)
+ Row 객체는 내부에 바이트 배열을 가지며, 오직 컬럼 표현식으로만 다룰 수 있으므로 사용자에게 노출되지 않음
+ DataFrame을 사용해 드라이버에게 개별 로우를 반환하는 명령은 항상 하나 이상의 Row 타입을 반환
+ Row 객체는 스키마 정보를 가지고 있지 않음 (DataFrame만 유일하게 스키마를 가지고 있음)
+ Row 객체를 직접 생성하려면 DataFrame의 스키마와 같은 순서로 값을 명시

In [47]:
""" Row를 확인하는 예문 """
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

In [48]:
df.head(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1)]

In [49]:
from pyspark.sql import Row
blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015",
["twitter", "LinkedIn"])
print(blog_row[1])

rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")]
authors_df = spark.createDataFrame(rows, ["Authors", "State"])
authors_df.show()

Reynold
+-------------+-----+
|      Authors|State|
+-------------+-----+
|Matei Zaharia|   CA|
|  Reynold Xin|   CA|
+-------------+-----+



In [7]:
# In Python, define a schema
from pyspark.sql.types import *

# Programmatic way to define a schema
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
StructField('UnitID', StringType(), True),
StructField('IncidentNumber', IntegerType(), True),
StructField('CallType', StringType(), True),
StructField('CallDate', StringType(), True),
StructField('WatchDate', StringType(), True),
StructField('CallFinalDisposition', StringType(), True),
StructField('AvailableDtTm', StringType(), True),
StructField('Address', StringType(), True),
StructField('City', StringType(), True),
StructField('Zipcode', IntegerType(), True),
StructField('Battalion', StringType(), True),
StructField('StationArea', StringType(), True),
StructField('Box', StringType(), True),
StructField('OriginalPriority', StringType(), True),
StructField('Priority', StringType(), True),
StructField('FinalPriority', IntegerType(), True),
StructField('ALSUnit', BooleanType(), True),
StructField('CallTypeGroup', StringType(), True),
StructField('NumAlarms', IntegerType(), True),
StructField('UnitType', StringType(), True),
StructField('UnitSequenceInCallDispatch', IntegerType(), True),
StructField('FirePreventionDistrict', StringType(), True),
StructField('SupervisorDistrict', StringType(), True),
StructField('Neighborhood', StringType(), True),
StructField('Location', StringType(), True),
StructField('RowID', StringType(), True),
StructField('Delay', FloatType(), True)])

# Use the DataFrameReader interface to read a CSV file
sf_fire_file = f"{work_data}/learning-spark/sf-fire-calls.csv"
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)
fire_df.select("CallNumber", "UnitID", "IncidentNumber", "CallType", "CallDate", "RowID").show(10, truncate=False)

+----------+------+--------------+----------------+----------+-------------+
|CallNumber|UnitID|IncidentNumber|CallType        |CallDate  |RowID        |
+----------+------+--------------+----------------+----------+-------------+
|20110016  |T13   |2003235       |Structure Fire  |01/11/2002|020110016-T13|
|20110022  |M17   |2003241       |Medical Incident|01/11/2002|020110022-M17|
|20110023  |M41   |2003242       |Medical Incident|01/11/2002|020110023-M41|
|20110032  |E11   |2003250       |Vehicle Fire    |01/11/2002|020110032-E11|
|20110043  |B04   |2003259       |Alarms          |01/11/2002|020110043-B04|
|20110072  |T08   |2003279       |Structure Fire  |01/11/2002|020110072-T08|
|20110125  |E33   |2003301       |Alarms          |01/11/2002|020110125-E33|
|20110130  |E36   |2003304       |Alarms          |01/11/2002|020110130-E36|
|20110197  |E05   |2003343       |Medical Incident|01/11/2002|020110197-E05|
|20110215  |E06   |2003348       |Medical Incident|01/11/2002|020110215-E06|

### 5.6 파케이 파일 혹은 테이블 저장
* save 저장 시에는 해당 경로에 파케이 파일이 저장되고, saveAsTable 저장 시에는 "spark.sql.warehouse.dir" 의 위치에 생성됩니다

In [48]:
parquetPath="target/sf_fire_calls"
fire_df.write.format("parquet").mode("overwrite").save(parquetPath)

22/04/13 07:33:48 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [8]:
!pwd
!rm -rf "spark-warehouse/sf_fire_calls"

/home/jovyan/work/answer


In [9]:
parquetTable="sf_fire_calls"
# spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation","true")
fire_df.write.format("parquet").saveAsTable(parquetTable)

22/04/14 01:28:39 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

### 5.7 프로젝션과 필터
> *Projection*은 특정 관계형 조건 혹은 필터에 매칭되는 로우에 대해서만 반환하는 것을 말합니다.

In [29]:
few_fire_df = (
    fire_df
    .select("IncidentNumber", "AvailableDtTm", "CallType")
    .where(col("CallType") != "Medical Incident")
)
print(few_fire_df)
few_fire_df.show(5, truncate=False)

new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
(
    new_fire_df
    .select("ResponseDelayedinMins")
    .where(col("ResponseDelayedinMins") > 5)
    .show(5, False)
)

+--------------+----------------------+-----------------------------+
|IncidentNumber|         AvailableDtTm|                     CallType|
+--------------+----------------------+-----------------------------+
|       2003235|01/11/2002 01:51:44 AM|               Structure Fire|
|       2003250|01/11/2002 04:16:46 AM|                 Vehicle Fire|
|       2003259|01/11/2002 06:01:58 AM|                       Alarms|
|       2003279|01/11/2002 08:03:26 AM|               Structure Fire|
|       2003301|01/11/2002 09:46:44 AM|                       Alarms|
|       2003304|01/11/2002 09:58:53 AM|                       Alarms|
|       2003382|01/11/2002 02:59:04 PM|               Structure Fire|
|       2003408|01/11/2002 04:09:08 PM|               Structure Fire|
|       2003408|01/11/2002 04:09:08 PM|               Structure Fire|
|       2003408|01/11/2002 04:09:08 PM|               Structure Fire|
|       2003429|01/11/2002 05:17:15 PM|     Odor (Strange / Unknown)|
|       2003453|01/1

### 5.8 날짜 관련 함수
* 날짜의 경우 문자열로 전달되고 있기 때문에 표현 및 활용을 위해서는 to_timestamp(), to_date() 와 같은 날짜관련 함수를 사용할 수 있습니다.
  - 한번 timestamp 형태로 변경된 컬럼에 대해서는 year, month, dayofmonth 와 같은 일자관련 함수를 통해 다양한 예제를 실습할 수 있습니다

In [30]:
fire_ts_df = (new_fire_df
    .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
    .drop("CallDate")
    .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
    .drop("WatchDate")
    .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"),"MM/dd/yyyy hh:mm:ss a"))
    .drop("AvailableDtTm")
)
fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(5, truncate=False)

from pyspark.sql.functions import *
(
    fire_ts_df
    .select(year('IncidentDate'), month('IncidentDate'), dayofmonth("IncidentDate"))
    .distinct()
    .orderBy(year('IncidentDate'), month('IncidentDate'), dayofmonth("IncidentDate"))
    .show(5)
)

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows



[Stage 158:>                                                        (0 + 4) / 4]

+------------------+-------------------+------------------------+
|year(IncidentDate)|month(IncidentDate)|dayofmonth(IncidentDate)|
+------------------+-------------------+------------------------+
|              2000|                  4|                      12|
|              2000|                  4|                      13|
|              2000|                  4|                      14|
|              2000|                  4|                      15|
|              2000|                  4|                      16|
+------------------+-------------------+------------------------+
only showing top 5 rows



                                                                                

### <font color=green>7. [기본]</font> 아래와 같이 제공된 데이터를 통해 생성한 데이터를 파케이 포맷으로 저장하세요
#### 1. "target/lgde_user" 경로에 파케이 파일로 저장하세요
#### 2. "lgde_user" 테이블로 저장하세요

<details><summary>[실습7] 출력 결과 확인 </summary>

> 아래와 유사하게 방식으로 작성 되었다면 정답입니다

```python
df6 = [
    Row(1, "엘지전자", 1000),
    Row(2, "엘지화학", 2000),
    Row(3, "엘지디스플레이", 3000)
]
sc6 = "`id` int, `name` string, `payment` int"
answer = spark.createDataFrame(df6, sc6)
answer.printSchema()
display(answer)

answer.write.format("parquet").save("target/lgde_user")
answer.write.format("parquet").saveAsTable("lgde_user")
```

</details>


In [3]:
# 여기에 실습 코드를 작성하고 실행하세요 (Shift+Enter)
df6 = [
    Row(1, "엘지전자", 1000),
    Row(2, "엘지화학", 2000),
    Row(3, "엘지디스플레이", 3000)
]
sc6 = "`id` int, `name` string, `payment` int"
answer = spark.createDataFrame(df6, sc6)
answer.printSchema()
display(answer)

answer.write.format("parquet").mode("overwrite").save("target/lgde_user")

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- payment: integer (nullable = true)



id,name,payment
1,엘지전자,1000
2,엘지화학,2000
3,엘지디스플레이,3000


                                                                                

In [4]:
!rm -rf $work_dir/spark-warehouse/lgde_user
answer.write.format("parquet").saveAsTable("lgde_user")

### <font color=green>8. [기본]</font> '실습7' 에서 생성한 데이터를 읽어서 출력하세요
#### 1. "target/lgde_user" 경로에 파케이 파일을 읽어서 스키마와 데이터를 출력하세요
#### 2. "lgde_user" 테이블로 저장된 테이블을 spark sql 로 읽어서 스키마와 데이터를 출력하세요

<details><summary>[실습8] 출력 결과 확인 </summary>

> 아래와 유사하게 방식으로 작성 되었다면 정답입니다

```python
df8 = (
    spark
    .read.parquet("target/lgde_user")
)
df8.printSchema()
display(df8)

answer = spark.sql("select * from lgde_user")
display(answer)
```

</details>


In [5]:
# 여기에 실습 코드를 작성하고 실행하세요 (Shift+Enter)
df8 = (
    spark
    .read.parquet("target/lgde_user")
)
df8.printSchema()
display(df8)

answer = spark.sql("select * from lgde_user")
display(answer)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- payment: integer (nullable = true)



id,name,payment
3,엘지디스플레이,3000
1,엘지전자,1000
2,엘지화학,2000


id,name,payment
3,엘지디스플레이,3000
2,엘지화학,2000
1,엘지전자,1000


## 6. 데이터셋 API
> Python 과 R 은 compile-time type-safe 하지 않기 때문에, Datasets 통한 Typed 데이터 타입을 사용할 수 없습니다. Datasets 을 이용하는 경우에도 Spark SQL 엔진이 객체를 생성, 변환, 직렬화, 역직렬화를 수행하며, **Dataframe 의 경우와 마찬가지로 Off-heap 을 통한 메모리 관리를 수행**하게 되며, Dataset encoders 를 이용합니다

### 6.1 데이터셋과 데이터프레임 비교

| Structured APIs | SQL vs. Dataframe vs. Datasets |
|---|---|
| ![structured-api](images/structured-api.png) | ![sql-vs-dataframes-vs-datasets-type-safety-spectrum](images/sql-vs-dataframes-vs-datasets-type-safety-spectrum.png) |

* 언어별 타입 객체 비교
![typed-untyped](images/typed-untyped.png)

* Scala: Case Class 를 통해 선언
```scala
case class DeviceIoTData (
    battery_level: Long, 
    c02_level: Long,
    cca2: String, 
    cca3: String, 
    cn: String, 
    device_id: Long,
    device_name: String, 
    humidity: Long, 
    ip: String, 
    latitude: Double,
    lcd: String, 
    longitude: Double, 
    scale:String, 
    temp: Long,
    timestamp: Long)
```

* 데이터를 읽고 DeviceIoTData 클래스로 변환을 수행합니다
```scala
val ds = spark.read.json("/databricks-datasets/learning-spark-v2/iot-devices/iot_devices.json").as[DeviceIoTData]
val filterTempDS = ds.filter({d => {d.temp > 30 && d.humidity > 70})
```
* Datasets 이용 시에는 filter(), map(), groupBy(), select(), take() 등의 일반적인 함수를 사용합니다
```scala                              
case class DeviceTempByCountry(temp: Long, device_name: String, device_id: Long, cca3: String)
val dsTemp = ds.filter(d => {d.temp > 25})
    .map(d => (d.temp, d.device_name, d.device_id, d.cca3))
    .toDF("temp", "device_name", "device_id", "cca3")
    .as[DeviceTempByCountry]
```

### 6.2 데이터셋 데이터프레임 그리고 RDD
* Datasets
  - compile-time 의 type safety 가 필요한 경우
* Dataframe
  - SQL-like 쿼리를 이용하고자 하는 경우
  - 통합, 코드 최적화 그리고 API를 활용한 모듈화를 원하는 경우
  - R 혹은 Python 을 이용해야 하는 경우
  - 공간, 속도 효율성을 고려해야 하는 경우
* RDD
  - 별도의 RDD를 이용하는 써드파티 패키지를 사용하는 경우
  - 코드, 공간, 속도 최적화 등을 원하지 않는 경우
  - 스파크가 수행할 쿼리를 정확히 지시해야만 할 때


* RDD와 데이터프레임과 데이터셋은 서로 다른가?
  - 데이터프레임과 데이터셋은 RDD 위에서 구현됩니다. 즉, whole-stage code generation 단계에서 압축된 RDD 코드로 분해됩니다.

> DataFrames and Datasets are
built on top of RDDs, and they get decomposed to compact RDD code during wholestage
code generation, which we discuss in the next section

* Spark SQL
![spark-sql](images/spark-sql.png)

### 6.3 데이터프레임

![transform](images/transform.png)

* 데이터프레임을 다루는 연산자의 특징
  - 로우나 컬럼 추가
  - 로우나 컬럼 제거
  - 로우를 컬럼으로 변환하거나, 그 반대로 변환
  - 컬럼값을 기준으로 로우 순서 변경


In [66]:
""" 원시 데이터소스 활용 """
df = spark.read.format("json").load(f"{work_data}/flight-data/json/2015-summary.json")
df.createOrReplaceTempView("2015_summary")

sql_result = spark.sql("SELECT * FROM 2015_summary").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



### <font color=blue>9. [중급]</font> f"{work_data}/tbl_user.csv" 에 저장된 CSV 파일을 읽고
#### 1. 스키마를 출력하세요
#### 2. 데이터를 10건 출력하세요
#### 3. 가입일자 컬럼(u_signup)을 이용하여 가장 최근에 가입한 5명을 출력하세요

<details><summary>[실습9] 출력 결과 확인 </summary>

> 아래와 유사하게 방식으로 작성 되었다면 정답입니다

```python
df9 = (
    spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(f"{work_data}/tbl_user.csv")
)
df9.printSchema()
df9.show(10)
df9.createOrReplaceTempView("tbl_user")
answer = spark.sql("select * from tbl_user order by u_signup desc limit 5")
display(answer)
```

</details>


In [7]:
# 여기에 실습 코드를 작성하고 실행하세요 (Shift+Enter)
df9 = (
    spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(f"{work_data}/tbl_user.csv")
)
df9.printSchema()
df9.show(10)
df9.createOrReplaceTempView("tbl_user")
answer = spark.sql("select * from tbl_user order by u_signup desc limit 5")
display(answer)

root
 |-- u_id: integer (nullable = true)
 |-- u_name: string (nullable = true)
 |-- u_gender: string (nullable = true)
 |-- u_signup: integer (nullable = true)

+----+----------+--------+--------+
|u_id|    u_name|u_gender|u_signup|
+----+----------+--------+--------+
|   1|    정휘센|      남|19700808|
|   2|  김싸이언|      남|19710201|
|   3|    박트롬|      여|19951030|
|   4|    청소기|      남|19770329|
|   5|유코드제로|      여|20021029|
|   6|  윤디오스|      남|20040101|
|   7|  임모바일|      남|20040807|
|   8|  조노트북|      여|20161201|
|   9|  최컴퓨터|      남|20201124|
+----+----------+--------+--------+



u_id,u_name,u_gender,u_signup
9,최컴퓨터,남,20201124
8,조노트북,여,20161201
7,임모바일,남,20040807
6,윤디오스,남,20040101
5,유코드제로,여,20021029


### <font color=blue>10. [중급]</font> f"{work_data}/tbl_purchase.csv" 에 저장된 CSV 파일을 읽고
#### 1. 스키마를 출력하세요
#### 2. 데이터를 10건 출력하세요
#### 3. 상품가격 컬럼(p_amount)을 이용하여 200만원 이상 금액의 상품 가운데 상위 3개를 출력하세요

<details><summary>[실습10] 출력 결과 확인 </summary>

> 아래와 유사하게 방식으로 작성 되었다면 정답입니다

```python
df10 = (
    spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(f"{work_data}/tbl_purchase.csv")
)
df10.printSchema()
df10.show(10)
df10.createOrReplaceTempView("tbl_purchase")
answer = spark.sql("select * from tbl_purchase where p_amount > 2000000 order by p_amount desc limit 3")
display(answer)
```

</details>


In [8]:
# 여기에 실습 코드를 작성하고 실행하세요 (Shift+Enter)
df10 = (
    spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(f"{work_data}/tbl_purchase.csv")
)
df10.printSchema()
df10.show(10)

df10.createOrReplaceTempView("tbl_purchase")
answer = spark.sql("select * from tbl_purchase where p_amount > 2000000 order by p_amount desc limit 3")
display(answer)

root
 |-- p_time: integer (nullable = true)
 |-- p_uid: integer (nullable = true)
 |-- p_id: integer (nullable = true)
 |-- p_name: string (nullable = true)
 |-- p_amount: integer (nullable = true)

+----------+-----+----+-----------+--------+
|    p_time|p_uid|p_id|     p_name|p_amount|
+----------+-----+----+-----------+--------+
|1603651550|    0|1000|GoldStar TV|  100000|
|1603651550|    1|2000|    LG DIOS| 2000000|
|1603694755|    1|2001|    LG Gram| 1800000|
|1603673500|    2|2002|    LG Cyon| 1400000|
|1603652155|    3|2003|      LG TV| 1000000|
|1603674500|    4|2004|LG Computer| 4500000|
|1603665955|    5|2001|    LG Gram| 3500000|
|1603666155|    5|2003|      LG TV| 2500000|
+----------+-----+----+-----------+--------+



p_time,p_uid,p_id,p_name,p_amount
1603674500,4,2004,LG Computer,4500000
1603665955,5,2001,LG Gram,3500000
1603666155,5,2003,LG TV,2500000


## 참고자료

#### 1. [Spark Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)
#### 2. [PySpark SQL Modules Documentation](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html)
#### 3. <a href="https://spark.apache.org/docs/3.0.1/api/sql/" target="_blank">PySpark 3.0.1 Builtin Functions</a>