In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import types 

# Build (or reuse) a Spark session running in local mode, and keep a handle
# on its underlying SparkContext.
spark = (
    SparkSession.builder
    .master('local')
    .getOrCreate()
)
sc = spark.sparkContext

In [41]:
# Directory holding the raw flight CSV files; all CSV files in it are read.
# TODO: point this at the actual data input directory.
INPUT_DIR = "original_data/"

# The first line of each file is a header. Schema inference is off, so columns
# are read as strings, and the literal "NA" is treated as null.
flightDF = spark.read.csv(INPUT_DIR, header=True, sep=",", inferSchema=False, nullValue="NA")
flightDF.show(2)
flightDF.printSchema()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|        20|        7|   1706|      1655|   1807|      1758|           OO|     5703

In [42]:
# Keep only the columns used further down, then turn ArrDelay into an integer column.
columnList = [
    "Year", "Month", "DayofMonth", "DepTime", "UniqueCarrier",
    "FlightNum", "ArrDelay", "Origin", "Dest",
]
flightDF = flightDF.select(columnList)
arrDelayAsInt = flightDF["ArrDelay"].cast("int")
flightDF = flightDF.withColumn("ArrDelay", arrDelayAsInt)
flightDF.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)



In [43]:
# Persist the trimmed flight data as Parquet, partitioned by destination airport.
# The path has to be "flights.parquet" because that is the location the "flights"
# view is created from in the exploration section. Overwrite mode keeps this cell
# re-runnable (the default mode raises an error when the path already exists).
flightDF.write.partitionBy("Dest").mode("overwrite").parquet("flights.parquet")

Analytics exploration: this part would usually live in a separate project/repo.

In [44]:
# Expose the Parquet output to Spark SQL as the temporary view "flights".
# No separate spark.read.parquet(...) call is needed: the view is defined
# directly on the path.
# CREATE OR REPLACE keeps the cell re-runnable without a preceding DROP TABLE,
# which could also remove a persistent catalog table that happens to be named "flights".
spark.sql('CREATE OR REPLACE TEMPORARY VIEW flights USING parquet OPTIONS (path "flights.parquet")')

DataFrame[]

In [48]:
# Compute the average ArrDelay of flights landing in LAX.
# The row filter belongs in WHERE: with GROUP BY Dest ... HAVING, differently-cased
# Dest values (e.g. 'lax' and 'LAX') would each produce their own average row
# instead of one combined result.
spark.sql("SELECT AVG(ArrDelay) FROM flights WHERE upper(Dest) = 'LAX'").show()

+------------------+
|     avg(ArrDelay)|
+------------------+
|13.859731660568043|
+------------------+



In [52]:
# Compute the average ArrDelay of flights whose origin is DEN and whose destination is SFO.
# The aggregate yields exactly one row, so no LIMIT is needed; the airport codes are
# compared with '=' because no wildcard matching is involved.
spark.sql("SELECT AVG(ArrDelay) FROM flights WHERE upper(Origin) = 'DEN' AND upper(Dest) = 'SFO'").show()

+-------------+
|avg(ArrDelay)|
+-------------+
|      40.4375|
+-------------+



In [79]:
# Find the destination airport with the highest average ArrDelay:
# rank every Dest by its average delay (descending) and keep rank 1.
# Destinations tied for first place are all returned.
worstDestQuery = (
    "SELECT Dest FROM ("
    "SELECT Dest, RANK() OVER (ORDER BY AVG(ArrDelay) DESC) AS rank "
    "FROM flights GROUP BY Dest"
    ") WHERE rank=1"
)
spark.sql(worstDestQuery).show()

+----+
|Dest|
+----+
| MLB|
+----+



In [85]:
# Scenario: a new file arrives every hour, and a script that runs on an hourly
# basis should calculate and store the average delay data.
# Open question: is this meant to be an overall average or a running (hourly) average?
# What is computed here is the overall average ArrDelay across all rows of "flights".
avgDelayQuery = "SELECT AVG(ArrDelay) AS AvgDelay FROM flights"
avgDelayDF = spark.sql(avgDelayQuery)
avgDelayDF.show()

+------------------+
|          AvgDelay|
+------------------+
|10.353448631124909|
+------------------+



In [91]:
# Store the computed average delay (avgDelayDF), not the raw flight rows in flightDF.
# Append mode adds this run's result to whatever is already stored at the path.
# avgDelayDF only has the AvgDelay column, so there is no Dest column to partition by.
# NOTE(review): if avgFlightDelay.parquet already contains flight rows written by an
# earlier version of this cell, remove it first so the stored schema stays consistent.
avgDelayDF.write.mode('append').parquet("avgFlightDelay.parquet")