## 환경 세팅

In [1]:
from pyspark.sql import SparkSession

# Create the notebook's Spark session (or reuse one that already exists),
# with master "local" and application name "flight_data".
spark = (
    SparkSession.builder
    .master("local")
    .appName("flight_data")
    .getOrCreate()
)

24/09/12 15:40:02 WARN Utils: Your hostname, gimsehyeon-ui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.76 instead (on interface en0)
24/09/12 15:40:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/12 15:40:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 파일 읽어오기 

In [5]:
# Load the 2015 flight summary CSV: the first row is treated as the header
# and column types are inferred from the data.
flightData2015 = spark.read.csv(
    "./data/2015-summary.csv",
    header=True,
    inferSchema=True,
)

In [6]:
# Print the physical plan for sorting the flights by the "count" column.
flights_sorted_by_count = flightData2015.sort("count")
flights_sorted_by_count.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#59 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#59 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=58]
      +- FileScan csv [DEST_COUNTRY_NAME#57,ORIGIN_COUNTRY_NAME#58,count#59] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/seilylook/Development/Projects/chapter2/data/2015-summary...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




## Sorting

In [7]:
# Set the number of shuffle partitions to 5. The plans printed after this cell
# show 5 partitions in their Exchange step, where the earlier sort plan showed 200.
spark.conf.set("spark.sql.shuffle.partitions", "5")

# First two rows of the flights ordered by ascending "count".
flights_by_count = flightData2015.sort("count")
flights_by_count.take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

## DataFrame과 SQL 

In [10]:
# Register the DataFrame as the temporary view "flight_data_2015" so that
# spark.sql() queries can refer to it by that name.
flightData2015.createOrReplaceTempView(name="flight_data_2015")

In [12]:
# The same per-destination row count expressed twice: once as a SQL query
# against the temp view, once with the DataFrame API.
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = flightData2015.groupBy("DEST_COUNTRY_NAME").count()

# Print both physical plans (SQL first, then DataFrame) for side-by-side comparison.
for query_frame in (sqlWay, dataFrameWay):
    query_frame.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#57], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#57, 5), ENSURE_REQUIREMENTS, [plan_id=86]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#57], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#57] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/seilylook/Development/Projects/chapter2/data/2015-summary...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#57], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#57, 5), ENSURE_REQUIREMENTS, [plan_id=99]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#57], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#57] Batched: false, DataFilters: [], Format: CSV, Location: InMemo

In [18]:
from pyspark.sql import functions as F

# Largest single "count" value in the data set.
# The aggregate is reached through the `F` namespace instead of
# `from pyspark.sql.functions import max`, because that import rebinds the
# name `max` and hides Python's built-in max() from every later cell.
flightData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [19]:
# Top five destination countries by summed "count", computed with Spark SQL
# against the "flight_data_2015" temp view.
top_destinations_query = """
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
"""

maxSql = spark.sql(top_destinations_query)
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [21]:
from pyspark.sql.functions import desc

# DataFrame-API version of the top-five query: sum "count" per destination,
# rename the aggregate column, sort descending, keep five rows and display them.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .show()
)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [22]:
# Print the physical plan of the DataFrame-API top-five query
# (group by destination, sum "count", sort descending, limit 5).
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .explain()
)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#206L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#57,destination_total#206L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#57], functions=[sum(count#59)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#57, 5), ENSURE_REQUIREMENTS, [plan_id=429]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#57], functions=[partial_sum(count#59)])
            +- FileScan csv [DEST_COUNTRY_NAME#57,count#59] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/seilylook/Development/Projects/chapter2/data/2015-summary...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>


