# 미국 교통통계국의 항공 운항 데이터 분석

[data/flight-data](https://github.com/FVbros/Spark-The-Definitive-Guide/tree/master/data/flight-data)

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Create Spark session
spark = SparkSession.builder.appName("MyApp").getOrCreate()

# Load a DataFrame
df = spark.read.csv("../data/flight-data/csv/2015-summary.csv", header=True, inferSchema=True)

# Show first 5 rows
df.show(5)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/02 20:51:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



SparkSession 
- DataFrameReader 클래스
- 스파크는 다양한 데이터 소스를 읽을 수 있다.
- 옵션
  - `.option("inferSchema", "true")`: 스키마 추론 기능 사용
  - `.option("header", "true")`: 파일의 첫 로우를 헤더로 지정

In [4]:
flightData2015 = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("../data/flight-data/csv/2015-summary.csv")

In [5]:
flightData2015

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: int]

데이터를 읽는 과정
- 지연 연산 형태의 트랜스포메이션: 로우의 수를 알 수 없다. 
- 스파크는 각 칼럼의 데이터 타입을 추론하기 위해 적은 양의 데이터를 읽는다.

In [6]:
flightData2015.count()

256

In [7]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

`take()` 
- head() 명령과 같은 결과를 얻을 수 있다.

`sort()`
- 데이터프레임을 변경하지 않는다.
- 이전의 데이터프레임을 사용해 새로운 데이터프레임을 생성해 반환한다.
- 트랜스포메이션 이기 때문에 호출 시 데이터에는 아무런 변화도 일어나지 않는다. (실행 계획만 세움)

CSV 파일 -> DF -> DF -> Arrary(...)
- `read()`: 좁은 트랜스포메이션 
- `sort()`: 넓은 트랜스포메이션 
- `take(3)` 

`explain()`
- 데이터페이스의 계보(lineage)나 스파크 쿼리 실행 계획을 확인 할 수 있다.

데이터 계보(Data Lineage)
- 데이터가 어떻게 생성, 변형, 전파되었는지를 추적하는 개념
- 데이터의 출처와 변형 과정을 기록하여 데이터의 흐름을 파알할 수 있도록 한다.

In [8]:
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#59 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#59 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=94]
      +- FileScan csv [DEST_COUNTRY_NAME#57,ORIGIN_COUNTRY_NAME#58,count#59] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/park/Desktop/Spark - The Definitive Guide/data/flight-data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [9]:
# 액션 호출
# 셔플 파티션 생성 (기본값: 200)
# 셔풀의 출력 파티션을 5로 설정

spark.conf.set("spark.sql.shuffle.partitions", "5")

flightData2015.sort("count").take(5)

[Row(DEST_COUNTRY_NAME='Malta', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Saint Vincent and the Grenadines', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Gibraltar', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]

트랜스포메이션의 논리적 실행 계획
- 데이터프레임의 계보를 정의
- 계보를 통해 입력 데이터에 수행한 연산을 전체 파티션에서 어떻게 재연산하는지 확인
- 함수형 프로그래밍의 핵심 (같은 입력에 대해 항상 같은 출력을 생성)

사용자는 물리적 데이터를 직접 다루지 않는다. 대신 속성(ex. 셔플 파티션 파라미터)으로 물리적 실행 특성을 제어한다.

스파크 UI에 접속해 잡의 실행 상태와 스파크의 물리적, 논리적 실행 특정을 확인 할 수 있다.

In [10]:
# 데이터프레임을 테이블로 변환
flightData2015.createOrReplaceTempView("flight_data_2015")

In [11]:
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.count()

sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#57], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#57, 5), ENSURE_REQUIREMENTS, [plan_id=116]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#57], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#57] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/park/Desktop/Spark - The Definitive Guide/data/flight-data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#57], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#57, 5), ENSURE_REQUIREMENTS, [plan_id=129]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#57], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#57] Batched: false, DataFilters: [], Format: CSV, Location: InMe

In [12]:
# 특정 위치를 왕래하는 최대 비행 횟수를 구한다

# using spark sql
spark.sql("SELECT max(count) from flight_data_2015").take(1)

# using data frame
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

In [13]:
# 상위 5개의 도착 국가를 찾아내는 코드

maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [14]:
from pyspark.sql.functions import desc

flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [15]:
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#161L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#57,destination_total#161L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#57], functions=[sum(count#59)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#57, 5), ENSURE_REQUIREMENTS, [plan_id=299]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#57], functions=[partial_sum(count#59)])
            +- FileScan csv [DEST_COUNTRY_NAME#57,count#59] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/park/Desktop/Spark - The Definitive Guide/data/flight-data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




25/03/03 06:18:15 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 440238 ms exceeds timeout 120000 ms
25/03/03 06:18:15 WARN SparkContext: Killing executors is not supported by current scheduler.
25/03/03 06:18:20 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o