In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()

### Load csv data into a spark DataFrame

In [9]:
flightData2015 = spark.read.option("inferSchema", "true")\
.option("header", "true")\
.csv(r"C:\Users\quang\OneDrive\Documents\teaching\big_data_fundamental\week9\lab_w9\2015-summary.csv")

In [10]:
flightData2015

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: int]

In [11]:
flightData2015.schema

StructType([StructField('DEST_COUNTRY_NAME', StringType(), True), StructField('ORIGIN_COUNTRY_NAME', StringType(), True), StructField('count', IntegerType(), True)])

### Perform the take action on the DataFrame,

In [13]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

### Check the explain plan of transformation

In [15]:
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#48 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#48 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=80]
      +- FileScan csv [DEST_COUNTRY_NAME#46,ORIGIN_COUNTRY_NAME#47,count#48] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/quang/OneDrive/Documents/teaching/big_data_fundamental/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [17]:
flightData2015.sort("count").take(5)

[Row(DEST_COUNTRY_NAME='Malta', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Saint Vincent and the Grenadines', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Gibraltar', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]

### DataFrames and SQL

In [18]:
flightData2015.createOrReplaceTempView("flight_data_2015")

In [19]:
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

In [20]:
dataFrameWay = flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.count()

In [21]:
sqlWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#46], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#46, 200), ENSURE_REQUIREMENTS, [plan_id=108]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#46], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#46] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/quang/OneDrive/Documents/teaching/big_data_fundamental/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [22]:
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#46], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#46, 200), ENSURE_REQUIREMENTS, [plan_id=121]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#46], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#46] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/quang/OneDrive/Documents/teaching/big_data_fundamental/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




### Maximum number of flights

In [23]:
spark.sql("SELECT max(count) from flight_data_2015").take(1)

[Row(max(count)=370002)]

In [24]:
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

### Top five destination countries

In [25]:
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [28]:
from pyspark.sql.functions import desc
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [29]:
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#148L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#46,destination_total#148L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#46], functions=[sum(count#48)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#46, 200), ENSURE_REQUIREMENTS, [plan_id=291]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#46], functions=[partial_sum(count#48)])
            +- FileScan csv [DEST_COUNTRY_NAME#46,count#48] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/quang/OneDrive/Documents/teaching/big_data_fundamental/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>


