In [1]:
from pyspark.sql import SparkSession
MAX_MEMORY = "32g"
spark = (
    SparkSession.builder.appName("Test Shell")
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)

----
----
## Table of Contents
### 1. Spark's Architecture
* What is Spark
* Spark Applicaiton
* SparkSession

### 2. RDD
* What is RDD
* RDD's Operation
* RDD - Lazy Loading
* Transformation's Dependency
* small examples

### 3. Main Example
* Calling Data
* DataFrame & Spark SQL
* .explain
---

## Spark's Architecture
#### 대규모 데이터의 처리

대규모 데이터를 처리하는 등의 일은 한 대의 컴퓨터로 처리하기 힘들다. 그래서 컴퓨터 클러스터가 필요함.

* 컴퓨터 클러스터: 여러 컴퓨터의 자원을 모아 하나의 컴퓨터처럼 사용 할 수 있게 만드는 것.

### So, what is Spark?

**Spark**: 클러스터의 데이터 처리 작업을 관리하고 조율하는 프레임워크

사용자는 **클러스터 매니저**에 **스파크 애플리케이션**을 제출하고, 매니저는 애플리케이션 실행에 필요한 자원을 **할당**한다.

### Spark Application
Spark Applicaiton은 **driver 프로세스**와 다수의 **executor 프로세스**로 구성되어 있고, **클러스터 매니저**에 자원을 할당받는다.
* **Driver Process**: ``main()``함수를 실행하며, 스파크 애플리켕션 정보의 유지 관리, 사용자 프로그램에 대한 응답, executor process의 작업 분석 및 스케줄링 등 전반적으로 **관리**하는 역할을 수행. ~= **Spark Session**
* **Executor Process**: driver process가 할당한 작업을 수행. 스파크 코드를 실행하는 역할

### SparkSession
Driver Process의 일부분으로, Spark Application을 제어. 사용자가 정의한 처리 명령을 SparkSession instance가 실행

---
## RDD
(RDD에 대한 구체적인 설명은 책에 나와있지 않아 [RDD를 설명한 이 블로그](https://bcho.tistory.com/1027)와 [종속성에 대해 설명한 이 블로그](https://brocess.tistory.com/187)를 참고했다.)

**RDD**(Resilient Distributed Dataset): 여러개의 **파티션**으로 분리가 되는 변경이 불가능한 데이터의 집합(ex. DataFrame)

Immutalbe하기 때문에, 변경이 필요하다면 새로운 dataset을 생성해야 한다.

### RDD의 Operation
* Transformation: 기존 RDD의 데이터를 변경하여 새로운 RDD 데이터를 생성하는 것. 
    * filter: 특정 데이터를 뽑아 냄
    * map: 데이터를 분산 배치
* Action: RDD 값을 기반으로 무엇인가를 계산해서 결과를 생성해내는 것. action을 실행하면 스파크 job이 시작된다.
    * 최종 결과를 리턴(count)
    * 데이터를 외부 저장소에 저장
    
### RDD의 Lazy Loading
RDD에 transformation을 가해도, action이 일어나기 전까지 실제로 코드가 실행되지 않는다.

즉, 스파크는 특정 tranformation 명령이 내려지면 기존 RDD에 적용할 transformation의 **실행 계획**을 생성한다. 이를 통해 스파크는 전체 데이터 흐름을 최적화 할 수 있다.
* ``.explain()`` method로 실행 계획을 확인할 수 있다.

### Transformation의 Dependency
Transformation이란, 기존의 RDD 데이터를 변경(filter, mapping)하여 새로운 RDD 데이터를 생성하는 과정이다.

이 때, 기존의 RDD를 부모 RDD, 새로운 RDD를 자식 RDD라고 한다면,
#### 좁은 종속성
부모 RDD의 Partition이 하나의 자식 Partition을 가진다면, 좁은 종속성이라고 한다.
#### 넓은 종속성
부모 RDD의 Partition이 여러 개의 자식 Partition을 가진다면, 넓은 종속성이라고 한다.

### Short  Example
#### RDD Example: DataFrame
Python과는 다르게, 하나의 DataFrame이 수천 대의 컴퓨터에 분산되어 있다.

In [3]:
myRange= spark.range(1000).toDF("number")

In [4]:
myRange.show()

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
+------+
only showing top 20 rows



#### Transformation & Action Example

In [7]:
divisBy2= myRange.where("number % 2 = 0") # Transformation(filter): 좁은 종속성, 아직 실행되지 않음

In [8]:
divisBy2.count() # Action(count): 넓은 종속성, 이 시점에서 앞의 transformation과 본 action이 실행됨

500

---
## Main Example
#### Data 불러오기 - RDD 생성

In [10]:
flight= spark\
    .read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv("/Users/jisujung/Desktop/dev/DataScience/Spark/Spark-The-Definitive-Guide-master/data/flight-data/csv/2015-summary.csv")

Data 확인

In [11]:
flight.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

#### Transformation & Explain

In [13]:
flight.sort("count").explain()

== Physical Plan ==
*(1) Sort [count#33 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#33 ASC NULLS FIRST, 200), true, [id=#69]
   +- FileScan csv [DEST_COUNTRY_NAME#31,ORIGIN_COUNTRY_NAME#32,count#33] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/jisujung/Desktop/dev/DataScience/Spark/Spark-The-Definitive-Guide-m..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




#### Action

In [14]:
flight.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

### DataFrame과 SQL 사용하기
DataFrame을 테이블(or View)로 등록하여 Spark SQL을 사용하는 방법과 DataFrame을 직접 코딩하는 방법이 있음. **결과는 같다!**

#### DataFrame to Table
DataFrame을 Table이나 View로 만들어준다. 이렇게 생성된 table은 spark sql을 통해 처리 할 수 있다.

In [15]:
flight.createOrReplaceTempView('flight_')

In [16]:
# 1) by spark sql
sqlWay= spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_
GROUP BY DEST_COUNTRY_NAME
""")

# 2) by dataframe
dataFrameWay= flight\
    .groupby("DEST_COUNTRY_NAME")\
    .count()

In [17]:
sqlWay.explain() == dataFrameWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#31], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#31, 200), true, [id=#98]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#31], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#31] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/jisujung/Desktop/dev/DataScience/Spark/Spark-The-Definitive-Guide-m..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#31], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#31, 200), true, [id=#117]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#31], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#31] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/jisujung/Desktop/dev/DataScience/Spark/Spark-The-Definitive-Guide-m..., PartitionFilt

True

#### 조금 더 복잡한 예제
상위 5개의 도착 국가를 찾아보자

In [19]:
maxSql= spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5""")

In [20]:
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [21]:
maxSql.explain()

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[aggOrder#61L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#31,destination_total#59L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#31], functions=[sum(cast(count#33 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#31, 200), true, [id=#178]
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#31], functions=[partial_sum(cast(count#33 as bigint))])
         +- FileScan csv [DEST_COUNTRY_NAME#31,count#33] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/jisujung/Desktop/dev/DataScience/Spark/Spark-The-Definitive-Guide-m..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




DataFrame의 변환 흐름
1. Read csv file (DataFrame)
2. groupby (DataFrame group)
3. sum (DataFrame)
4. change column name (DataFrame)
5. Sort (DataFrame)
6. Limit (DataFrame)
7. show : **Action** - DataFrame의 결과를 모으는 프로세스를 이 시점에서 시작

이러한 과정이 역순으로(가장 늦게 쌓인 순서대로) ``.explain()``의 결과값(실행 계획)에서 확인된다.