In [1]:
from pyspark import sql
from pyspark.sql import functions as f, udf

In [2]:
# Download the data files from AWS 
# !wget -q  https://s3.amazonaws.com/utcs378/flights.csv.bz2
!wget -q https://www.cs.utexas.edu/~kiat/datasets/flights.csv.bz2
    
# !wget -q  https://s3.amazonaws.com/utcs378/airports.csv.bz2
!wget -q https://www.cs.utexas.edu/~kiat/datasets/airports.csv

In [3]:
sqlContext = sql.SparkSession.builder \
    .master("local") \
    .appName("Flight DF") \
    .getOrCreate()

flights = sqlContext.read.format('csv')\
    .options(header='true', inferSchema='true')\
    .load("flights.csv.bz2")

airport = sqlContext.read.format('csv')\
    .options(header='true', inferSchema='true')\
    .load("airports.csv")

25/10/14 12:29:24 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

In [4]:
flights.printSchema()
airport.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

# Questions 

## Q1 Find a list of Origin Airports

In [5]:
#Q1 Find a list of Origin Airports
flights.select("ORIGIN_AIRPORT").distinct().show()



+--------------+
|ORIGIN_AIRPORT|
+--------------+
|           BGM|
|           PSE|
|           INL|
|           MSY|
|           PPG|
|           GEG|
|           SNA|
|           BUR|
|           GRB|
|           GTF|
|           IDA|
|           GRR|
|           JLN|
|           EUG|
|           PSG|
|           GSO|
|           PVD|
|           MYR|
|           OAK|
|           MSN|
+--------------+
only showing top 20 rows


                                                                                

## Q2 Find a list of (Origin, Destination) pairs


In [6]:
#Q2 Find a list of (Origin, Destination) pairs
flights.select("ORIGIN_AIRPORT", "DESTINATION_AIRPORT").distinct().show()



+--------------+-------------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|
+--------------+-------------------+
|           BQN|                MCO|
|           PHL|                MCO|
|           MCI|                IAH|
|           SPI|                ORD|
|           SNA|                PHX|
|           LBB|                DEN|
|           ORD|                PDX|
|           EWR|                STT|
|           ATL|                GSP|
|           MCI|                MKE|
|           PBI|                DCA|
|           SMF|                BUR|
|           MDW|                MEM|
|           LAS|                LIT|
|           TPA|                ACY|
|           DSM|                EWR|
|           FSD|                ATL|
|           SJC|                LIH|
|           CLE|                SJU|
|           CPR|                DEN|
+--------------+-------------------+
only showing top 20 rows


                                                                                

## Q3 Find the Origin airport which had the largest departure delay in the month of January


In [7]:
flights.where(flights.MONTH == 1)\
    .orderBy("DEPARTURE_DELAY", ascending=False)\
    .limit(1)

DataFrame[YEAR: int, MONTH: int, DAY: int, DAY_OF_WEEK: int, AIRLINE: string, FLIGHT_NUMBER: int, TAIL_NUMBER: string, ORIGIN_AIRPORT: string, DESTINATION_AIRPORT: string, SCHEDULED_DEPARTURE: int, DEPARTURE_TIME: int, DEPARTURE_DELAY: int, TAXI_OUT: int, WHEELS_OFF: int, SCHEDULED_TIME: int, ELAPSED_TIME: int, AIR_TIME: int, DISTANCE: int, WHEELS_ON: int, TAXI_IN: int, SCHEDULED_ARRIVAL: int, ARRIVAL_TIME: int, ARRIVAL_DELAY: int, DIVERTED: int, CANCELLED: int, CANCELLATION_REASON: string, AIR_SYSTEM_DELAY: int, SECURITY_DELAY: int, AIRLINE_DELAY: int, LATE_AIRCRAFT_DELAY: int, WEATHER_DELAY: int]

In [8]:
#Q3 Find the Origin airport which had the largest departure delay in the month of January
flights.where(flights.MONTH == 1)\
    .orderBy("DEPARTURE_DELAY", ascending=False)\
    .limit(1)\
    .select("ORIGIN_AIRPORT")\
    .show()



+--------------+
|ORIGIN_AIRPORT|
+--------------+
|           BHM|
+--------------+



                                                                                

## Q4 Find out which Airline has the largest delay on Weekends. 


In [9]:
#Q4 Find out which Airline has the largest delay on Weekends. 
flights.filter("DAY_OF_WEEK = 6 OR DAY_OF_WEEK = 7" )\
    .orderBy("DEPARTURE_DELAY", ascending=False)\
    .limit(1)\
    .select("AIRLINE")\
    .show()

[Stage 11:=====>                                                   (1 + 9) / 10]

+-------+
|AIRLINE|
+-------+
|     AA|
+-------+



                                                                                

In [10]:
# flights.show()

In [11]:
flights.filter("CANCELLED = 1")\
    .withColumn("COUNT", f.lit(1))\
    .groupBy("ORIGIN_AIRPORT")\
    .agg(f.sum("COUNT").alias("COUNT")).show()



+--------------+-----+
|ORIGIN_AIRPORT|COUNT|
+--------------+-----+
|           BGM|    3|
|           PSE|   11|
|           MSY|  411|
|           GEG|   49|
|           SNA|  386|
|           BUR|  315|
|           GTF|   14|
|           GRB|  151|
|           IDA|   19|
|           GRR|  276|
|           JLN|   33|
|           EUG|   67|
|           PSG|    9|
|           GSO|  256|
|           PVD|  267|
|           MYR|   98|
|           OAK|  573|
|           FSM|   58|
|           FAR|  111|
|           MSN|  284|
+--------------+-----+
only showing top 20 rows


                                                                                

## Q5 Which airport has the most cancellation of flights?


In [12]:
#Q5 Which airport has the most cancellation of flights?
flights.filter("CANCELLED = 1")\
    .withColumn("COUNT", f.lit(1))\
    .groupBy("ORIGIN_AIRPORT")\
    .agg(f.sum("COUNT").alias("COUNT"))\
    .orderBy("COUNT", ascending=False)\
    .limit(2).show()
#     .select("ORIGIN_AIRPORT", "COUNT")\
#     .show()

[Stage 15:=====>                                                   (1 + 9) / 10]

+--------------+-----+
|ORIGIN_AIRPORT|COUNT|
+--------------+-----+
|           ORD| 8548|
|           DFW| 6254|
+--------------+-----+



                                                                                

## Q6 Find the percent of flights cancelled for each Airline.


In [13]:
#Q6 Find the percent of flights cancelled for each Airline.
flights.withColumn("TOTAL", f.lit(1))\
    .groupBy("AIRLINE")\
    .agg(f.sum("CANCELLED").alias("TOTAL_CANCELLED"), f.sum("TOTAL").alias("TOTAL"))\
    .withColumn("CANCEL_RATE", f.col("TOTAL_CANCELLED")/f.col("TOTAL")*100)\
    .show()



+-------+---------------+-------+-------------------+
|AIRLINE|TOTAL_CANCELLED|  TOTAL|        CANCEL_RATE|
+-------+---------------+-------+-------------------+
|     UA|           6573| 515723|  1.274521400053905|
|     NK|           2004| 117379| 1.7072900604026275|
|     AA|          10919| 725984| 1.5040276369727157|
|     EV|          15231| 571977| 2.6628693111785964|
|     B6|           4276| 267048| 1.6012102693148795|
|     DL|           3824| 875881| 0.4365889886868193|
|     OO|           9960| 588353| 1.6928612584621818|
|     F9|            588|  90836| 0.6473204456382932|
|     US|           4067| 198715| 2.0466497244797823|
|     MQ|          15025| 294632| 5.0995818512585185|
|     HA|            171|  76272|0.22419760855884205|
|     AS|            669| 172521|   0.38777887909298|
|     VX|            534|  61903| 0.8626399366751207|
|     WN|          16043|1261855| 1.2713822111098343|
+-------+---------------+-------+-------------------+



                                                                                

## Q7 Find the largest departure delay for each Airline


In [14]:
#Q7 Find the largest departure delay for each Airline
flights.groupBy("AIRLINE")\
    .agg(f.max("DEPARTURE_DELAY").alias("MAX_DEPARTURE_DELAY"))\
    .show()

[Stage 21:=====>                                                   (1 + 9) / 10]

+-------+-------------------+
|AIRLINE|MAX_DEPARTURE_DELAY|
+-------+-------------------+
|     UA|               1314|
|     NK|                836|
|     AA|               1988|
|     EV|               1274|
|     B6|               1006|
|     DL|               1289|
|     OO|               1378|
|     F9|               1112|
|     US|                759|
|     MQ|               1544|
|     HA|               1433|
|     AS|                963|
|     VX|                644|
|     WN|                665|
+-------+-------------------+



                                                                                

In [15]:
#Q8 Find the largest departure delay for each carrier for each month
flights.groupBy("AIRLINE", "MONTH")\
    .agg(f.max("DEPARTURE_DELAY").alias("MAX_DEPARTURE_DELAY"))\
    .show()

[Stage 24:=====>                                                   (1 + 9) / 10]

+-------+-----+-------------------+
|AIRLINE|MONTH|MAX_DEPARTURE_DELAY|
+-------+-----+-------------------+
|     F9|    2|                852|
|     WN|    2|                506|
|     AS|    1|                538|
|     NK|    1|                557|
|     F9|    1|                696|
|     UA|    1|                886|
|     AA|    2|               1587|
|     VX|    1|                397|
|     UA|    2|                858|
|     B6|    2|                942|
|     US|    2|                536|
|     EV|    2|                600|
|     HA|    1|               1003|
|     NK|    2|                546|
|     WN|    1|                604|
|     VX|    2|                577|
|     OO|    1|                931|
|     DL|    2|               1159|
|     AA|    1|               1988|
|     HA|    2|               1433|
+-------+-----+-------------------+
only showing top 20 rows


                                                                                

## Q8 Find the largest departure delay for each carrier for each month
 

In [16]:
#Q9 For each carrier find the average Departure delay
flights.withColumn("TOTAL", f.lit(1))\
    .groupBy("AIRLINE")\
    .agg(f.sum("DEPARTURE_DELAY").alias("TOTAL_DEPARTURE_DELAY"), f.sum("TOTAL").alias("TOTAL"))\
    .withColumn("AVG_DEPARTURE_DELAY", f.col("TOTAL_DEPARTURE_DELAY")/f.col("TOTAL"))\
    .show()

[Stage 27:=====>                                                   (1 + 9) / 10]

+-------+---------------------+-------+-------------------+
|AIRLINE|TOTAL_DEPARTURE_DELAY|  TOTAL|AVG_DEPARTURE_DELAY|
+-------+---------------------+-------+-------------------+
|     UA|              7355348| 515723|  14.26220664969373|
|     NK|              1840887| 117379|  15.68327383944317|
|     AA|              6369435| 725984|  8.773519802089302|
|     EV|              4857338| 571977|   8.49219111957299|
|     B6|              3026467| 267048| 11.333044995656211|
|     DL|              6427294| 875881|  7.338090448359994|
|     OO|              4517510| 588353|   7.67823058605973|
|     F9|              1205449|  90836|  13.27060856928971|
|     US|              1196447| 198715| 6.0209194071912036|
|     MQ|              2837908| 294632|   9.63204268375465|
|     HA|                36972|  76272|0.48473882945248586|
|     AS|               306997| 172521| 1.7794761217474975|
|     VX|               553852|  61903|  8.947094648078446|
|     WN|             13186520|1261855| 

                                                                                

## Q10 For each Airline find the average Departure delay for each month


In [17]:
#Q10 For each Airline find the average Departure delay for each month
flights.withColumn("TOTAL", f.lit(1))\
    .groupBy("AIRLINE","MONTH")\
    .agg(f.sum("DEPARTURE_DELAY").alias("TOTAL_DEPARTURE_DELAY"), f.sum("TOTAL").alias("TOTAL"))\
    .withColumn("AVG_DEPARTURE_DELAY", f.col("TOTAL_DEPARTURE_DELAY")/f.col("TOTAL"))\
    .select("AIRLINE", "MONTH", "AVG_DEPARTURE_DELAY")\
    .show()



+-------+-----+-------------------+
|AIRLINE|MONTH|AVG_DEPARTURE_DELAY|
+-------+-----+-------------------+
|     F9|    2| 25.258564296780857|
|     WN|    2|  9.487024797054518|
|     AS|    1| 3.1640642679339215|
|     NK|    1| 13.001944412672996|
|     F9|    1| 17.763215697759556|
|     UA|    1| 13.666987889048054|
|     AA|    2|  9.613731643027489|
|     VX|    1|  6.793912492073558|
|     UA|    2| 14.501504070649924|
|     B6|    2|  18.38879044099033|
|     US|    2|  8.661360395317216|
|     EV|    2| 10.498648588772209|
|     HA|    1| 1.3055900621118013|
|     NK|    2| 16.419458523921374|
|     WN|    1|  9.349463225445312|
|     VX|    2| 12.152971820980346|
|     OO|    1| 11.860082304526749|
|     DL|    2| 11.892286971946653|
|     AA|    1| 10.380512494609501|
|     HA|    2|   2.57916594566534|
+-------+-----+-------------------+
only showing top 20 rows


                                                                                

## Q11 Which date of year has the highest rate  of flight cancellations? Rate of flight cancellation is calculated by deviding number of canceled flights by total number of flights.


In [18]:
#Q11 Which date of year has the highest rate  of flight cancellations?
# Rate of flight cancellation is calculated by deviding number of canceled flights by total number of flights.

flights.withColumn("TOTAL", f.lit(1))\
    .groupBy("YEAR","MONTH","DAY")\
    .agg(f.sum("CANCELLED").alias("TOTAL_CANCELLED"), f.sum("TOTAL").alias("TOTAL"))\
    .withColumn("CANCEL_RATE", f.col("TOTAL_CANCELLED")/f.col("TOTAL")*100)\
    .orderBy("CANCEL_RATE", ascending=False)\
    .limit(1)\
    .select("YEAR","MONTH","DAY")\
    .show()




+----+-----+---+
|YEAR|MONTH|DAY|
+----+-----+---+
|2015|    1| 27|
+----+-----+---+



                                                                                

## Q12 Calculate the number of flights to each destination state For each Airline, 
## Q13 To which state do they have the largest average delay?

You will need the airline and airport data sets for this question.

In [19]:
#Q12 Calculate the number of flights to each destination state
# For each Airline, for which state do they have the largest average delay? 
# You will need the airline and airport data sets for this question.

from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql.functions import udf
fold_list = udf(lambda x,y: sorted(zip(x,y))[-1][1],StringType())

#Q8 Find the largest departure delay for each carrier for each month
flights.withColumn("COUNT", f.lit(1))\
    .groupBy("AIRLINE", "DESTINATION_AIRPORT")\
    .agg(f.sum("DEPARTURE_DELAY").alias("TOTAL_DEPARTURE_DELAY"),f.sum("COUNT").alias("COUNT"))\
    .withColumn("DEPARTURE_AVG_DELAY", f.col("TOTAL_DEPARTURE_DELAY")/f.col("COUNT"))\
    .join(airport, flights.DESTINATION_AIRPORT == airport.IATA_CODE)\
    .select("AIRLINE", "TOTAL_DEPARTURE_DELAY", "STATE")\
    .groupBy("AIRLINE")\
    .agg(
        f.collect_list("TOTAL_DEPARTURE_DELAY").alias("delay"),
        f.collect_list("STATE").alias("state")
    )\
    .withColumn("MAX_AVGDELAY_STATE", fold_list(f.col("delay"), f.col("state")))\
    .select("AIRLINE", "MAX_AVGDELAY_STATE")\
    .show()




+-------+------------------+
|AIRLINE|MAX_AVGDELAY_STATE|
+-------+------------------+
|     UA|                IL|
|     NK|                IL|
|     AA|                TX|
|     EV|                GA|
|     B6|                NY|
|     DL|                GA|
|     OO|                IL|
|     F9|                CO|
|     US|                NC|
|     MQ|                IL|
|     HA|                HI|
|     AS|                WA|
|     VX|                CA|
|     WN|                IL|
+-------+------------------+



                                                                                