In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [12]:
# Create (or reuse) the notebook's SparkSession, running locally under the app name "stdg".
spark = (
    SparkSession.builder
    .master("local")
    .appName("stdg")
    .getOrCreate()
)

In [13]:
# Load the 2015 flight summary CSV: the first row supplies column names and
# Spark infers column types (the plans below show `count` read back as int).
flightData2015 = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("data/flight-data/csv/2015-summary.csv")
)

In [14]:
flightData2015.head(3)  # first three rows as a list of Row objects (same as take(3))

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [15]:
# Inspect the physical plan for a global sort on `count` (orderBy is an alias of sort).
flightData2015.orderBy("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#42 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#42 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#77]
      +- FileScan csv [DEST_COUNTRY_NAME#40,ORIGIN_COUNTRY_NAME#41,count#42] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/zach/Projects/stdg-pyspark-notebooks/data/flight-data/csv/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [18]:
# Lower the shuffle partition count from the 200 seen in the previous plan's
# Exchange to 5, then re-inspect the same sort plan to see the change.
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.orderBy("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#42 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#42 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [id=#99]
      +- FileScan csv [DEST_COUNTRY_NAME#40,ORIGIN_COUNTRY_NAME#41,count#42] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/zach/Projects/stdg-pyspark-notebooks/data/flight-data/csv/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [19]:
# Two rows with the smallest `count`. Several rows share count=1, so which
# two are returned among those ties is not pinned down by this sort alone.
flightData2015.orderBy("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [28]:
# Expose the DataFrame to spark.sql() queries under the table name `flight_data_2015`.
flightData2015.createOrReplaceTempView(name="flight_data_2015")

In [21]:
# The same per-destination row count expressed two ways; their physical plans
# are compared via explain() in the following cells.

# DataFrame API version (groupby is an alias of groupBy).
df = flightData2015.groupby("DEST_COUNTRY_NAME").count()

# SQL version, against the `flight_data_2015` temp view.
sql = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

In [22]:
sql.explain(mode="simple")  # physical plan only (the default explain mode)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#40], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#40, 5), ENSURE_REQUIREMENTS, [id=#121]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#40], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#40] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/zach/Projects/stdg-pyspark-notebooks/data/flight-data/csv/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [23]:
df.explain(mode="simple")  # physical plan only (the default explain mode)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#40], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#40, 5), ENSURE_REQUIREMENTS, [id=#134]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#40], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#40] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/zach/Projects/stdg-pyspark-notebooks/data/flight-data/csv/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [32]:
# Largest single `count` value in the table, via SQL; head(1) == take(1).
spark.sql("""
SELECT max(count) FROM flight_data_2015
""").head(1)

[Row(max(count)=370002)]

In [33]:
# DataFrame-API equivalent of the SQL max() query in the previous cell.
# `F` (pyspark.sql.functions) is imported with the other imports in the first cell.
flightData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [34]:
# Top five destinations by total flight count, expressed in SQL (lazy until shown).
maxSql = spark.sql(
    """
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
"""
)

In [35]:
maxSql.show(n=20)  # print the result table (20 is show()'s default row cap)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [36]:
# Top five destinations by total flight count, via the DataFrame API
# (the DataFrame counterpart of the `maxSql` query above).
# The aggregate is named directly with .alias() instead of renaming Spark's
# auto-generated "sum(count)" column: withColumnRenamed is a silent no-op when
# that generated name does not match, which makes the original chain brittle.
df = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .agg(F.sum("count").alias("destination_total"))
    .sort(F.desc("destination_total"))
    .limit(5)
)

In [37]:
df.show(n=20)  # print the result table (20 is show()'s default row cap)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [38]:
df.explain(mode="simple")  # physical plan only (the default explain mode)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#165L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#40,destination_total#165L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#40], functions=[sum(count#42)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#40, 5), ENSURE_REQUIREMENTS, [id=#529]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#40], functions=[partial_sum(count#42)])
            +- FileScan csv [DEST_COUNTRY_NAME#40,count#42] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/zach/Projects/stdg-pyspark-notebooks/data/flight-data/csv/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>


