In [1]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("EndToEndApp")
sc = SparkContext(conf = conf)

In [2]:
from pyspark.sql import SparkSession 
spark = SparkSession(sc)

In [3]:
# Creating a Spark DataFrame by reading from a csv
flightData2015 = spark\
        .read\
        .option("inferSchema", "true")\
        .option("header", "true")\
        .csv("../data/flight-data/csv/2015-summary.csv")

In [6]:
flightData2015.show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [5]:
flightData2015.show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [12]:
# sort and looking at the explain plan
flightData2015.sort("count").explain()

== Physical Plan ==
*(2) Sort [count#12 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#12 ASC NULLS FIRST, 200)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>


In [13]:
# setting number of partion
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [14]:
flightData2015.sort("count").show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows



In [15]:
# creating a temporary view from DF
flightData2015.createOrReplaceTempView("flight_data_2015")

In [26]:
# Groups by dest_country_name and count how many using SQL
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

In [27]:
sqlWay.show(10)

+--------------------+--------+
|   DEST_COUNTRY_NAME|count(1)|
+--------------------+--------+
|             Moldova|       1|
|             Bolivia|       1|
|             Algeria|       1|
|Turks and Caicos ...|       1|
|            Pakistan|       1|
|    Marshall Islands|       1|
|            Suriname|       1|
|              Panama|       1|
|         New Zealand|       1|
|             Liberia|       1|
+--------------------+--------+
only showing top 10 rows



In [28]:
# Same using DataFrame operations
dataFrameWay = flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .count()

In [31]:
# using SQL
spark.sql("SELECT max(count) from flight_data_2015").take(1)

[Row(max(count)=370002)]

In [32]:
# using DataFrame operations
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

In [34]:
# top five destination countries in the data  

# SQL Way
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [35]:
# DataFrame way
from pyspark.sql.functions import desc

flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .sum("count")\
  .withColumnRenamed("sum(count)", "destination_total")\
  .sort(desc("destination_total"))\
  .limit(5)\
  .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [36]:
sc.stop()