### Connection & Import

In [75]:
rdd = sc.textFile("wasb://datasets@ahdn.blob.core.windows.net/air_transit.csv").map(lambda line: line.split(",")).cache()

In [1]:
#test using 100000 data
data = sc.textFile("wasb://datasets@ahdn.blob.core.windows.net/air_transit.csv").map(lambda line: line.split(","))\
.takeSample(False, 100000, 0)
rdd = sc.parallelize(data)  

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1526481398570_0005,pyspark,idle,Link,Link,✔


SparkContext available as 'sc'.
HiveContext available as 'sqlContext'.


### Q1

In [320]:
q1 = rdd.count() # count number

print('The number of records is ' + str(q1))

The number of records is 123534969

### Q2

In [321]:
q21 = rdd.map(lambda x: (x[0],1)) # select year
q22 = q21.reduceByKey(lambda x, y: x+y) # aggregate on year by counting the number of flight
q23 = q22.sortBy(lambda x: x[0]) # order by year
q2 = q23.collect()

sqlContext.createDataFrame(q2, ['year', 'number of flights']).show(n=100)

+----+-----------------+
|year|number of flights|
+----+-----------------+
|1987|          1311826|
|1988|          5202096|
|1989|          5041200|
|1990|          5270893|
|1991|          5076925|
|1992|          5092157|
|1993|          5070501|
|1994|          5180048|
|1995|          5327435|
|1996|          5351983|
|1997|          5411843|
|1998|          5384721|
|1999|          5527884|
|2000|          5683047|
|2001|          5967780|
|2002|          5271359|
|2003|          6488540|
|2004|          7129270|
|2005|          7140596|
|2006|          7141922|
|2007|          7453215|
|2008|          7009728|
+----+-----------------+

### Q3

In [3]:
q31 = rdd.filter(lambda x: (x[10]!='UNKNOW')&(x[10]!='NA')&(x[10]!='')&(x[10]!='0')) # deal with missing values in flight number
q32 = q31.map(lambda x: (x[10],1)) # select flight number
q33 = q32.reduceByKey(lambda x, y: x+y) # aggregate on flight number by counting the number of flight
q34 = q33.sortBy(lambda x: -x[1]) # order by number of flight
q3 = q34.top(1) # select top 1

sqlContext.createDataFrame(q3, ['filght', 'number of flights']).show()

+------+-----------------+
|filght|number of flights|
+------+-----------------+
|�NKNO�|           184310|
+------+-----------------+

### Q4

In [324]:
# Assumption: we use air time to calculate total flight time
q41 = rdd.filter(lambda x: (x[10]!='UNKNOW')&(x[10]!='NA')&(x[10]!='')&(x[10]!='0')) # deal with missing values in flight number
q42 = q41.filter(lambda x: x[13]!='NA')  # deal with missing values in air time
q43 = q42.map(lambda x: (x[10], int(x[13]))) # select flight number and flight time; convert air time into integer
q44 = q43.reduceByKey(lambda x, y: x + y) # aggregated on flight number by summing up airtime
q45 = q44.sortBy(lambda x: -x[1]) # order by total air time
q4 = q45.collect()

sqlContext.createDataFrame(q4, ['filght', 'total air time']).show(10)

+------+--------------+
|filght|total air time|
+------+--------------+
|N551UA|       2968897|
|N543UA|       2959090|
|N552UA|       2945347|
|N548UA|       2940997|
|N545UA|       2939408|
|N550UA|       2938828|
|N549UA|       2938181|
|N544UA|       2932148|
|N546UA|       2890061|
|N547UA|       2864442|
+------+--------------+
only showing top 10 rows

### Q5

In [131]:
# Assumption: count the times(origin + destination)
q51 = rdd.map(lambda x: ((x[0], x[16]),1)).reduceByKey(lambda x, y: x+y) # select year and original airport, count the number of flights
q52 = rdd.map(lambda x: ((x[0], x[17]),1)).reduceByKey(lambda x, y: x+y) # select year and dest airport, count the number of flights
q53 = q51.join(q52).map(lambda x: (x[0][0], (x[1][0]+x[1][1], x[0][1]))) # sum up the times as origin and dest
q53 = q53.reduceByKey(max).map(lambda x: (x[0], x[1][1], x[1][0])) # for each year, select the busiest airport
q54 = q53.sortBy(lambda x: x[0]) # order by year
q5 = q54.collect()

sqlContext.createDataFrame(q5, ['year', 'busiest airport', 'number of flights(origin + dest)']).show(100)

+----+---------------+--------------------------------+
|year|busiest airport|number of flights(origin + dest)|
+----+---------------+--------------------------------+
|1987|            ORD|                          135046|
|1988|            ORD|                          546260|
|1989|            ORD|                          528136|
|1990|            ORD|                          544122|
|1991|            ORD|                          531638|
|1992|            ORD|                          557156|
|1993|            ORD|                          578747|
|1994|            ORD|                          561461|
|1995|            ORD|                          586894|
|1996|            ORD|                          592481|
|1997|            ORD|                          595017|
|1998|            ORD|                          589806|
|1999|            ORD|                          596531|
|2000|            ORD|                          593639|
|2001|            ORD|                          

### Q6

In [132]:
q61 = rdd.filter(lambda x: (x[0]=='2006')&(x[24]!='NA')&(x[25]!='NA')&(x[26]!='NA')&(x[27]!='NA')&(x[28]!='NA'))\
.map(lambda x: (x[8], x[24], x[25], x[26], x[27], x[28])) # select records from 2006 with no NA
# CarrierDelay
q62_1 = q61.map(lambda x: (x[0], (1, int(x[1]))))\
.reduceByKey(lambda x, y: (x[0]+y[0],x[1]+y[1])).map(lambda x: (x[0], x[1][1]/x[1][0])) # avg delay
q63_1 = q62_1.sortBy(lambda x: -x[1])
q64_1 = q63_1.top(1) # select the top 1
# WeatherDelay
q62_2 = q61.map(lambda x: (x[0], (1, int(x[2]))))\
.reduceByKey(lambda x, y: (x[0]+y[0],x[1]+y[1])).map(lambda x: (x[0], x[1][1]/x[1][0])) # select airline and the arrival delay
q63_2 = q62_2.sortBy(lambda x: -x[1])
q64_2 = q63_2.top(1)
# NASDelay
q62_3 = q61.map(lambda x: (x[0], (1, int(x[3]))))\
.reduceByKey(lambda x, y: (x[0]+y[0],x[1]+y[1])).map(lambda x: (x[0], x[1][1]/x[1][0])) # select airline and the arrival delay
q63_3 = q62_3.sortBy(lambda x: -x[1])
q64_3 = q63_3.top(1)
# SecurityDelay
q62_4 = q61.map(lambda x: (x[0], (1, int(x[4]))))\
.reduceByKey(lambda x, y: (x[0]+y[0],x[1]+y[1])).map(lambda x: (x[0], x[1][1]/x[1][0])) # select airline and the arrival delay
q63_4 = q62_4.sortBy(lambda x: -x[1])
q64_4 = q63_4.top(1)
# LateAircraftDelay
q62_5 = q61.map(lambda x: (x[0], (1, int(x[5]))))\
.reduceByKey(lambda x, y: (x[0]+y[0],x[1]+y[1])).map(lambda x: (x[0], x[1][1]/x[1][0])) # select airline and the arrival delay
q63_5 = q62_5.sortBy(lambda x: -x[1])
q64_5 = q63_5.top(1)

q64_1='CarrierDelay', q64_1[0][0], q64_1[0][1]
q64_2='WeatherDelay', q64_2[0][0], q64_2[0][1]
q64_3='NASDelay', q64_3[0][0], q64_3[0][1]
q64_4='SecurityDelay', q64_4[0][0], q64_4[0][1]
q64_5='LateAircraftDelay', q64_5[0][0], q64_5[0][1]
q6 = [q64_1, q64_2, q64_3, q64_4, q64_5]
sqlContext.createDataFrame(q6, ['type', 'airline', 'avg carrier delay']).show(100)

+-----------------+-------+-----------------+
|             type|airline|avg carrier delay|
+-----------------+-------+-----------------+
|     CarrierDelay|     YV|                5|
|     WeatherDelay|     YV|                0|
|         NASDelay|     YV|                2|
|    SecurityDelay|     YV|                0|
|LateAircraftDelay|     YV|                6|
+-----------------+-------+-----------------+

### Q7

In [76]:
# for mean
from pyspark.mllib.stat import Statistics
import numpy as np
from scipy import stats
q71 = rdd.filter(lambda x: (x[11]!='NA')&(x[12]!='NA')&(x[13]!='NA')&(x[14]!='NA')&(x[15]!='NA')&(x[18]!='NA')&(x[19]!='NA')\
                 &(x[20]!='NA')&(x[24]!='NA')&(x[25]!='NA')&(x[26]!='NA')&(x[27]!='NA')&(x[28]!='NA')).cache()
q72 = q71.map(lambda x: (int(x[11]), int(x[12]),int(x[13]),int(x[14]),int(x[15]),int(x[18]),int(x[19]),int(x[20]),int(x[24]),int(x[25]),int(x[26]),int(x[27]),int(x[28])))
summary = Statistics.colStats(q72)
q73=[int(i) for i in summary.mean()]

q7_mean=tuple(['mean']+q73)

In [77]:
# for median
col_list = [11, 12, 13, 14, 15, 18, 19, 20, 24, 25, 26, 27, 28]
median_list=[]
for i in col_list:
    q_median = q71.map(lambda x: int(x[i])).sortBy(lambda x: x)
    median_element=q_median.take(int(round(q_median.count()/2)+1))[-1]
    median_list.append(median_element)
q7_median=tuple(['median']+median_list)

In [78]:
# for mode
mode_list = []
for i in col_list:
    q_mode = q71.map(lambda x: (int(x[i]), 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: -x[1]).map(lambda x: x[0])
    mode_element=q_mode.take(1)[0]
    mode_list.append(mode_element)
q7_mode=tuple(['mode']+mode_list) 

In [79]:
sqlContext.createDataFrame([q7_mean, q7_median, q7_mode], ['Type', 'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Distance', 'TaxiIn', 'TaxiOut', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']).show()

+------+-----------------+--------------+-------+--------+--------+--------+------+-------+------------+------------+--------+-------------+-----------------+
|  Type|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Distance|TaxiIn|TaxiOut|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+------+-----------------+--------------+-------+--------+--------+--------+------+-------+------------+------------+--------+-------------+-----------------+
|  mean|              125|           126|    103|       9|      10|     725|     7|     16|           3|           0|       4|            0|                4|
|median|              107|           108|     85|       0|       0|     569|     5|     14|           0|           0|       0|            0|                0|
|  mode|               70|            70|     45|      -5|       0|     337|     5|     10|           0|           0|       0|            0|                0|
+------+-----------------+--------------+-----

### Q8

In [147]:
q81 = rdd2.filter(lambda x: (x[0]=='2007')&(x[1]=='2')&(x[2]=='11'))\
.filter(lambda x: (x[10]!='UNKNOW')&(x[10]!='NA')&(x[10]!='')&(x[10]!='0')) # select records in 2017-02-11 and filght number is not NA
q821 = q81.filter(lambda x: x[17]=='ATL').map(lambda x: (x[17], (x[16], x[5], x[7], x[10]))) # create rdd includes destination
q822 = q81.filter(lambda x: x[16]=='ATL').map(lambda x: (x[16], (x[17], x[5], x[7], x[10]))) # create rdd includes origin 
q82 = q821.join(q822).filter(lambda x: x[1][0][0]!=x[1][1][0])\
.map(lambda x: (x[0], x[1][0][1], x[1][0][2], x[1][0][3], x[1][1][1],x[1][1][2], x[1][1][3])) # join them on origin = destination, filter out return flight and reset
q83 = q82.filter(lambda x: ((int(x[4][:-2])*60+int(x[4][-2:]))-(int(x[2][:-2])*60+int(x[2][-2:]))<120)\
        & ((int(x[4][:-2])*60+int(x[4][-2:]))-(int(x[2][:-2])*60+int(x[2][-2:]))>60)) # select stopover between 60 and 120
q8 = q83.map(lambda x: (x[0], x[3], x[6])).take(100)

sqlContext.createDataFrame(q8, ['airline', '1st flight num', '2nd flight num']).show(100)

+-------+--------------+--------------+
|airline|1st flight num|2nd flight num|
+-------+--------------+--------------+
|    ATL|        N722EV|        N681BR|
|    ATL|        N722EV|        N881AS|
|    ATL|        N722EV|        N932EV|
|    ATL|        N722EV|        N675DL|
|    ATL|        N722EV|        N902DL|
|    ATL|        N722EV|        N605DL|
|    ATL|        N722EV|        N661DN|
|    ATL|        N722EV|        N924EV|
|    ATL|        N722EV|        N672DL|
|    ATL|        N722EV|        N629DL|
|    ATL|        N722EV|        N684DA|
|    ATL|        N722EV|        N396DA|
|    ATL|        N722EV|        N971DL|
|    ATL|        N722EV|        N976DL|
|    ATL|        N722EV|        N813SK|
|    ATL|        N722EV|        N760EV|
|    ATL|        N722EV|        N438AA|
|    ATL|        N722EV|        N950DL|
|    ATL|        N722EV|        N913DL|
|    ATL|        N722EV|        N905EV|
|    ATL|        N722EV|        N708EV|
|    ATL|        N722EV|        N837AS|


### Q9

In [4]:
q911 = rdd.filter(lambda x: (x[0]=='2007') & (x[16]=='PHL') & (x[17]=='LAX'))\
.map(lambda x: ((x[8], x[1], x[2]), (x[5], x[7]))).filter(lambda x: int(x[1][0])>559) # select records from PHL to LAX in 2007 and dept time after 5:59
q912 = rdd.filter(lambda x: (x[0]=='2007') & (x[16]=='PHL') & (x[17]=='LAX'))\
.map(lambda x: ((x[8], x[1], x[2]), (x[5], x[7]))).filter(lambda x: int(x[1][1])<=2300) # select records from LAX to PHL in 2007 and arrive time before 23:00 
q92 = q911.join(q912).map(lambda x: (x[0][0], x[1][0], x[1][1])) # merge the two flight
q93 = q92.filter(lambda x: (int(x[2][0][:-2])*60+int(x[2][0][-2:]))-(int(x[1][1][:-2])*60+int(x[1][1][-2:]))<300) # select the records whose staying time within 5 hours and 
q94 = q93.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: -x[1])
q9 = q94.collect() # get the carrier with highest count

sqlContext.createDataFrame(q9, ['carrier', 'count']).show(1)

+-------+-----+
|carrier|count|
+-------+-----+
|     US| 5020|
+-------+-----+
only showing top 1 row

### Q10

In [322]:
q01 = rdd.filter(lambda x:(x[0]=='2007')&(x[1]=='1')&(x[2]=='12')&(x[4]!='NA'))\
.filter(lambda x: (int(x[4])>1200) & (int(x[4])<1400)) # select flight on 2007-01-12, depart time between 12:00 and 14:00
def if_leave(x):
    # if before 13:00, exclude actual time
    if int(x) > 1300:
        re='tbd'
    else:
        re = x
    return re
q02 = q01.map(lambda x: (x[5], if_leave(x[4]), x[8], x[17])) # select focal collumns
q03 = q02.sortBy(lambda x: x[0]) # order by scheduled departure time
q10 = q02.collect()

sqlContext.createDataFrame(q10, ['scheduled departure time', 'actual departure time', 'airline code', 'destination']).show(100)

+------------------------+---------------------+------------+-----------+
|scheduled departure time|actual departure time|airline code|destination|
+------------------------+---------------------+------------+-----------+
|                    1145|                 1255|          YV|        SBA|
|                    1220|                 1217|          YV|        IAD|
|                    1235|                 1230|          YV|        PHX|
|                    1235|                 1224|          YV|        HNL|
|                    1136|                 1238|          YV|        IAD|
|                    1344|                  tbd|          NW|        ORF|
|                    1245|                 1245|          YV|        CLT|
|                    1235|                 1225|          YV|        CLT|
|                    1240|                 1241|          AA|        BDL|
|                    1225|                 1221|          XE|        EWR|
|                    1226|            