## 스파크의 기본 아키텍쳐
- 스파크는 사용 가능한 자원을 파악하기 위해 스파크 standalone, 하둡 YARN, Mesos 같은 클러스터 매니저를 사용함
- 연산 처리 과정: 사용자가 클러스터 매니저에 스파크 애플리케이션을 submit함 -> 클러스터 매니저는 필요한 자원을 할당하여 작업을 처리함
- 스파크 애플리케이션: 드라이버 프로세스와 익스큐터 프로세스로 구성됨
- 드라이버 프로세스: 사용자의 입력에 대한 응답, 전반적인 작업 분석, 배포 및 스케쥴링 담당
- 익스큐터 프로세스: 드라이버 프로세스가 할당한 작업을 수행하고 다시 드라이버에 보고함

### SparkSession
- 스파크 애플리케이션은 SparkSession이라고 불리는 드라이버 프로세스로 제어함
- SparkSession 객체는 사용자의 처리 명령을 클러스터에서 실행함
- 하나의 SparkSession이 하나의 스파크 애플리케이션에 대응됨

### DataFrame
- 가장 대표적인 구조적 API
- 테이블의 데이터를 row, column으로 단순하게 표현함. 데이터는 클러스터에 분산 저장됨
- 파티션: 데이터 병렬 처리를 위해 청크 단위로 나누어진 데이터의 집합 (클러스터의 각 노드에 존재하는 row의 집합)

### Transformation
- DataFrame을 변경하는 것
- 좁은 Transformation: 각 입력 파티션이 하나의 출력 파티션에만 영향을 미침. where 구문 등
- 넓은 Transformation: 하나의 입력 파티션이 여러 출력 파티션에 영향을 미침. shuffle시 일어남 (sort() 등)
- 지연 실행: 스파크는 연산 명령이 내려지기 전에는 데이터를 수정하지 않고 transformation 실행 계획만 생성함 -> 연산 명령이 떨어진 시점에서 실행 계획을 최적화한 상태로 연산 수행
- 실행 계획: transformation의 지향성 비순환 그래프 (directed acyclic graph)이며 action 호출시 결과를 만들어냄. 각 단계마다 불변성을 지닌 새로운 DataFrame이 생성됨
- 스파크 UI: 스파크 job의 상태, 환경 설정, 클러스터 상태 등 모니터링 가능. 드라이버 노드의 4040 port로 접속
- 사용자는 물리적 데이터를 직접 다루지 않고, 속성값을 변경하여 데이터 처리의 실행 특성을 제어함

### Action
- 일련의 transformation으로부터 결과를 계산하도록 지시하는 연산 명령
- ex) 콘솔에서 데이터를 봄, 각 언어의 native 객체에 데이터를 모음, 출력 데이터 소스에 저장 등

### [예제] 미국 교통통계국의 항공운항 데이터 분석
- semi-structured csv file

In [2]:
import findspark
findspark.init()

In [3]:
# SparkSession 객체 생성
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.sql("select 'spark' as hello ")
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [4]:
spark

### Data loading
- 스키마 정보를 자동으로 알아내는 schema inference 기능 사용 (시간이 걸리기 때문에 실제 운영 환경에서는 직접 스키마를 지정하여 로딩함)
- 첫 row를 헤더로 지정함

In [6]:
flightData2015 = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("c:/SparkDG/data/flight-data/csv/2015-summary.csv")

In [7]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

### sort (count column 기준)
- .sort(): 단지 transformation임으로 실제 데이터에는 아무런 변화는 일어나지 않고 실행 계획만 생성됨
- .explain(): 스파크의 쿼리 실행 계획을 보여주는 method
- 실행 계획은 역순으로 출력됨. Sort / Exchange / FileScan

In [8]:
flightData2015.sort("count").explain()

== Physical Plan ==
*(2) Sort [count#19 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#19 ASC NULLS FIRST, 200)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#17,ORIGIN_COUNTRY_NAME#18,count#19] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/c:/SparkDG/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>


In [9]:
# shuffle의 출력 partition 수를 5로 줄이고 sort action을 수행
spark.conf.set("spark.sql.shuffle.partitions", "5")

flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

### DataFrame과 SQL
- DataFrame을 view로 등록한 후 SQL 쿼리 수행
- SQL쿼리는 DataFrame 코드와 같은 실행 계획으로 컴파일되므로 둘 사이의 성능 차이는 없음

In [10]:
flightData2015.createOrReplaceTempView("flight_data_2015")

In [11]:
sqlWay = spark.sql("""
select dest_country_name, count(1)
from flight_data_2015
group by dest_country_name
""")

sqlWay.take(5)

[Row(dest_country_name='Moldova', count(1)=1),
 Row(dest_country_name='Bolivia', count(1)=1),
 Row(dest_country_name='Algeria', count(1)=1),
 Row(dest_country_name='Turks and Caicos Islands', count(1)=1),
 Row(dest_country_name='Pakistan', count(1)=1)]

In [12]:
sql2 = spark.sql("""
select dest_country_name, count(*) as cnt
from flight_data_2015
group by dest_country_name
order by cnt desc
""")

sql2.take(10)

[Row(dest_country_name='United States', cnt=125),
 Row(dest_country_name='Taiwan', cnt=1),
 Row(dest_country_name='Latvia', cnt=1),
 Row(dest_country_name='Ecuador', cnt=1),
 Row(dest_country_name='Venezuela', cnt=1),
 Row(dest_country_name='Hungary', cnt=1),
 Row(dest_country_name='Martinique', cnt=1),
 Row(dest_country_name='Croatia', cnt=1),
 Row(dest_country_name='Cyprus', cnt=1),
 Row(dest_country_name='Qatar', cnt=1)]

In [13]:
# 최대값 구하기
spark.sql("select max(count) from flight_data_2015").take(1)

[Row(max(count)=370002)]

In [17]:
# 목적지 별 count 합계 구하기 (SQL 구문)
maxSql = spark.sql("""
select dest_country_name, sum(count) as destination_total
from flight_data_2015
group by dest_country_name
order by sum(count) desc
limit 5
""")

maxSql.show()

+-----------------+-----------------+
|dest_country_name|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [15]:
# 목적지 별 count 합계 구하기 (DataFrame 구문)
from pyspark.sql.functions import desc

flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .sum("count")\
  .withColumnRenamed("sum(count)", "destination_total")\
  .sort(desc("destination_total"))\
  .limit(5)\
  .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



### DataFrame의 변환 흐름
- read -> groupBy -> sum -> column rename -> sort -> limit -> collect

In [16]:
maxSql.explain()

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[aggOrder#59L DESC NULLS LAST], output=[dest_country_name#17,sum(count)#58L])
+- *(2) HashAggregate(keys=[dest_country_name#17], functions=[sum(cast(count#19 as bigint))])
   +- Exchange hashpartitioning(dest_country_name#17, 5)
      +- *(1) HashAggregate(keys=[dest_country_name#17], functions=[partial_sum(cast(count#19 as bigint))])
         +- *(1) FileScan csv [DEST_COUNTRY_NAME#17,count#19] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/c:/SparkDG/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>


#### [스파크 완벽 가이드] - chapter 2 정리