# Table of contents<a class="anchor" id="table"></a>

* [1 Working with RDD](#1)
* [1.1 Data Preparation and Loading](#1.1)
* [1.1.1 Creating SparkSession & SparkContext](#OneOneOne)
* [1.1.2 Read CSV files, Preprocessing, and final(formatted data) RDD for each file](#OneOneTwo)
* [1.1.2.1 Flights RDD](#1.1.2.1)
* [1.1.2.2 Airports RDD](#1.1.2.2)
* [1.1.3 Show RDD number of columns, and number of records](#1.1.3)
* [1.2 Dataset Partitioning](#1.2)
* [1.2.1 Obtain the maximum arrival delay](#1.2.1)
* [1.2.2 Obtain the minimum arrival delay](#1.2.2)
* [1.2.3 Define hash partitioning](#1.2.3)
* [1.2.4 Display the records in each partition](#1.2.4)
* [1.3 Query RDD](#1.3)
* [1.3.1 Collect a total number of flights for each month for all flights](#1.3.1)
* [1.3.2 Collect the average delay for each month for all flights](#1.3.2)
* [2 Working with DataFrames](#2)
* [2.1 Data Preparation and Loading](#2.1)
* [2.1.1 Define DataFrames](#2.1.1)
* [2.1.2 Display the Schema of DataFrames](#2.1.2)
* [2.1.3 Transform date-time and location column](#2.1.3)
* [2.2 Query Analysis](#2.2)
* [2.2.1 January Flights Events with ANC airport](#2.2.1)
* [2.2.2 Average Arrival Delay From Origin to Destination](#2.2.2)
* [2.2.3 Join Query with Airports DataFrame](#2.2.3)
* [2.3 Analysis](#2.3)
* [2.3.1 Relationship between day of week with mean arrival delay, total time delay, and count flights](#2.3.1)
* [2.3.2 Display mean arrival delay each month](#2.3.2)
* [2.3.3 Relationship between mean departure delay and mean arrival delay](#2.3.3)
* [3 RDDs vs DataFrame vs Spark SQL](#3)
* [3.1 RDD Operation](#3.1)
* [3.2 DataFrame Operation](#3.2)
* [3.3 Spark SQL Operation](#3.3)
* [3.4 Discussion](#3.4)


# 1 Working with RDD<a class="anchor" id="1"></a>
## 1.1 Data Preparation and Loading<a class="anchor" id="1.1"></a>
### 1.1.1 Create SparkSession and SparkContext<a class="anchor" id="OneOneOne"></a>
[Back to top](#table)

In [3]:
# Import SparkConf class into program
from pyspark import SparkConf

# local[*]: run Spark in local mode with as many worker threads as logical cores on your machine
# If we want Spark to run locally with 'k' worker threads, we can specify as "local[k]".
master = "local[*]"
# The `appName` field is a name to be shown on the Spark cluster UI page
app_name = "31395996_Assignment 1"
# Setup configuration parameters for Spark
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

# Import SparkSession classes 
from pyspark.sql import SparkSession # Spark SQL

# Initialize the SparkSession and get its SparkContext
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

### 1.1.2 Import CSV files and Make RDD for each file<a class="anchor" id="OneOneTwo"></a>
[Back to top](#table)

In [98]:
from pyspark.sql import Row

#### 1.1.2.1 Flights RDD <a class="anchor" id="1.1.2.1"></a>
[Back to top](#table)

NOTE: Some flights were cancelled or diverted, and their delay fields are empty strings that cannot be converted to float. These rows are therefore dropped.
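
The conversion problem mentioned in the note is easy to reproduce in plain Python. The following is a minimal sketch, not part of the original pipeline: `to_float_or_none` is a hypothetical helper that shows why `float('')` fails and how an empty field could be mapped to `None` if one preferred to keep such rows instead of dropping them.

```python
def to_float_or_none(field):
    """Convert a CSV field to float; return None for empty or blank fields.

    Hypothetical helper for illustration only; the notebook drops such rows instead.
    """
    text = field.strip()
    if text == '':
        return None
    return float(text)


# float('') raises ValueError, which is why rows with empty delay fields break the parser.
try:
    float('')
    empty_string_converts = True
except ValueError:
    empty_string_converts = False

# Sample field values (made up): a normal delay, an empty field, a padded number.
parsed = [to_float_or_none(f) for f in ['-15', '', ' 7.5 ']]
print(empty_string_converts, parsed)
```

Dropping the rows, as done in this notebook, keeps every remaining delay a plain float, which simplifies the later `max`, `min`, and average computations.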

In [99]:
def load_flights_rdd():
    raw_flights_rdd = sc.textFile('flight-delays/flight*.csv')
    header=raw_flights_rdd.first()
    flights_rdd=raw_flights_rdd.filter(lambda x:x!=header)\
                               .map(lambda x:x.split(','))\
                               .filter(lambda x:x[23]!='1' and x[24]!='1')\
                               .map(lambda x:Row(YEAR=int(x[0]),
                                MONTH=int(x[1]),DAY=int(x[2]),DAY_OF_WEEK=int(x[3]),
                                AIRLINE=x[4],FLIGHT_NUMBER=int(x[5]),TAIL_NUMBER=x[6],
                                ORIFIN_AIRPORT=x[7],DESTINATION_AIRPORT=x[8],
                                SHEDULED_DEPARTURE=x[9],DEPARTURE_TIME=x[10],
                                DEPARTURE_DELAY=float(x[11]),TAXI_OUR=float(x[12]),
                                WHEELS_OFF=x[13],SCHEDULED_TIME=x[14],ELAPSED_TIME=float(x[15]),
                                AIR_TIME=float(x[16]),DISTANCE=float(x[17]),
                                WHEELS_ON=x[18],TAXI_IN=float(x[19]),
                                SHEDULED_ARRIVAL=x[20],ARRIVAL_TIME=x[21],
                                ARRIVAL_DELAY=float(x[22]),DIVERTED=x[23],
                                CANCELLED=x[24],CANCELLATION_REASON=x[25],
                                AIR_SYSTEM_DELAY=x[26],SECURITY_DELAY=x[27],
                                AIRLINE_DELAY=x[28],LATE_AIRCRAFT_DELAY=x[29],
                                WEATHER_DELAY=x[30]))
    return flights_rdd
                                
flights_rdd=load_flights_rdd()   
flights_rdd.take(5)



[Row(YEAR=2015, MONTH=6, DAY=26, DAY_OF_WEEK=5, AIRLINE='EV', FLIGHT_NUMBER=4951, TAIL_NUMBER='N707EV', ORIFIN_AIRPORT='BHM', DESTINATION_AIRPORT='LGA', SHEDULED_DEPARTURE='630', DEPARTURE_TIME='629', DEPARTURE_DELAY=-1.0, TAXI_OUR=13.0, WHEELS_OFF='642', SCHEDULED_TIME='155', ELAPSED_TIME=141.0, AIR_TIME=113.0, DISTANCE=866.0, WHEELS_ON='935', TAXI_IN=15.0, SHEDULED_ARRIVAL='1005', ARRIVAL_TIME='950', ARRIVAL_DELAY=-15.0, DIVERTED='0', CANCELLED='0', CANCELLATION_REASON='', AIR_SYSTEM_DELAY='', SECURITY_DELAY='', AIRLINE_DELAY='', LATE_AIRCRAFT_DELAY='', WEATHER_DELAY=''),
 Row(YEAR=2015, MONTH=12, DAY=19, DAY_OF_WEEK=6, AIRLINE='WN', FLIGHT_NUMBER=3589, TAIL_NUMBER='N764SW', ORIFIN_AIRPORT='MKE', DESTINATION_AIRPORT='DCA', SHEDULED_DEPARTURE='1630', DEPARTURE_TIME='1627', DEPARTURE_DELAY=-3.0, TAXI_OUR=13.0, WHEELS_OFF='1640', SCHEDULED_TIME='115', ELAPSED_TIME=96.0, AIR_TIME=79.0, DISTANCE=634.0, WHEELS_ON='1859', TAXI_IN=4.0, SHEDULED_ARRIVAL='1925', ARRIVAL_TIME='1903', ARRIVAL_DE

#### 1.1.2.2 Airports RDD <a class="anchor" id="1.1.2.2"></a>
[Back to top](#table)

In [100]:
def load_airports_rdd():
    raw_airports_rdd = sc.textFile('flight-delays/airports.csv')
    header=raw_airports_rdd.first()
    airports_rdd=raw_airports_rdd.filter(lambda x:x!=header)\
                                    .map(lambda x:x.split(','))\
                                    .map(lambda x:Row(IATA_CODE=x[0],
                                                     AIRPORT=x[1],
                                                     CITY=x[2],
                                                     STATE=x[3],
                                                     COUNTRY=x[4],
                                                     LATITUDE=x[5],
                                                     LONGITUDE=x[6]))
    
    return airports_rdd
    
airports_rdd=load_airports_rdd()
airports_rdd.take(5)

[Row(IATA_CODE='ABE', AIRPORT='Lehigh Valley International Airport', CITY='Allentown', STATE='PA', COUNTRY='USA', LATITUDE='40.65236', LONGITUDE='-75.44040'),
 Row(IATA_CODE='ABI', AIRPORT='Abilene Regional Airport', CITY='Abilene', STATE='TX', COUNTRY='USA', LATITUDE='32.41132', LONGITUDE='-99.68190'),
 Row(IATA_CODE='ABQ', AIRPORT='Albuquerque International Sunport', CITY='Albuquerque', STATE='NM', COUNTRY='USA', LATITUDE='35.04022', LONGITUDE='-106.60919'),
 Row(IATA_CODE='ABR', AIRPORT='Aberdeen Regional Airport', CITY='Aberdeen', STATE='SD', COUNTRY='USA', LATITUDE='45.44906', LONGITUDE='-98.42183'),
 Row(IATA_CODE='ABY', AIRPORT='Southwest Georgia Regional Airport', CITY='Albany', STATE='GA', COUNTRY='USA', LATITUDE='31.53552', LONGITUDE='-84.19447')]
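
Splitting each line on every comma is only safe when no field itself contains a comma. As a hedged aside, the sketch below uses Python's standard `csv` module on a made-up line (the quoted airport name is hypothetical, not taken from `airports.csv`) to show how a quoted comma changes the column count.

```python
import csv

# Hypothetical line: the airport name contains a comma and is therefore quoted.
line = 'XYZ,"Example Field, Regional Airport",Sampletown,ZZ,USA,10.0,-20.0'

naive_fields = line.split(',')          # also splits inside the quoted name
csv_fields = next(csv.reader([line]))   # honours the quotes

print(len(naive_fields), len(csv_fields))
```

If such lines ever appeared in the input, the same `csv.reader` call could be applied per line inside the RDD `map` step in place of `split(',')`, at some cost in parsing speed.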

### 1.1.3 Show RDD number of columns, and number of records <a class="anchor" id="1.1.3"></a>
[Back to top](#table)

In [7]:
print('airports_rdd\'s length:', len(airports_rdd.take(1)[0]))
print('total number of records: ', airports_rdd.count())
print('total number of partitions:', airports_rdd.getNumPartitions())

airports_rdd's length: 7
total number of records:  322
total number of partitions: 2


In [116]:
print('flights_rdd\'s length:', len(flights_rdd.take(1)[0]))
print('total number of records: ', flights_rdd.count())
print('total number of partitions:', flights_rdd.getNumPartitions())


flights_rdd's length: 31
total number of records:  571729
total number of partitions: 20


NOTE: The total number of flight records excludes cancelled and diverted flights.

## 1.2 Dataset Partitioning <a class="anchor" id="1.2"></a>
### 1.2.1 Obtain the maximum arrival delay <a class="anchor" id="1.2.1"></a>
[Back to top](#table)

In [9]:
flights_rdd.max(lambda x: x['ARRIVAL_DELAY'])['ARRIVAL_DELAY']

1665.0

### 1.2.2 Obtain the minimum arrival delay <a class="anchor" id="1.2.2"></a>
[Back to top](#table)

In [10]:
flights_rdd.min(lambda x: x['ARRIVAL_DELAY'])['ARRIVAL_DELAY']

-82.0

### 1.2.3 Define hash partitioning function <a class="anchor" id="1.2.3"></a>
[Back to top](#table)

A closer look at ARRIVAL_DELAY shows float values, negative values, and possibly None, so the partition function first checks the key type.

In [16]:
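def key_type_check(key):
    # Map a missing key to 0; otherwise use the integer part of the delay
    if key is None:
        return 0
    return int(key)

def hash_partition(rdd):
    # Key each record by its ARRIVAL_DELAY, then split into 4 partitions
    kv = rdd.map(lambda x: (x['ARRIVAL_DELAY'], x))
    return kv.partitionBy(4, key_type_check)

flights_rdd_partition = hash_partition(flights_rdd)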

In [17]:
flights_rdd_partition.take(2)

[(-4.0,
  Row(YEAR=2015, MONTH=11, DAY=1, DAY_OF_WEEK=7, AIRLINE='WN', FLIGHT_NUMBER=2081, TAIL_NUMBER='N461WN', ORIFIN_AIRPORT='PDX', DESTINATION_AIRPORT='LAS', SHEDULED_DEPARTURE='1345', DEPARTURE_TIME='1346', DEPARTURE_DELAY=1.0, TAXI_OUR=9.0, WHEELS_OFF='1355', SCHEDULED_TIME='120', ELAPSED_TIME=115.0, AIR_TIME=97.0, DISTANCE=763.0, WHEELS_ON='1532', TAXI_IN=9.0, SHEDULED_ARRIVAL='1545', ARRIVAL_TIME='1541', ARRIVAL_DELAY=-4.0, DIVERTED='0', CANCELLED='0', CANCELLATION_REASON='', AIR_SYSTEM_DELAY='', SECURITY_DELAY='', AIRLINE_DELAY='', LATE_AIRCRAFT_DELAY='', WEATHER_DELAY='')),
 (40.0,
  Row(YEAR=2015, MONTH=4, DAY=8, DAY_OF_WEEK=3, AIRLINE='WN', FLIGHT_NUMBER=1170, TAIL_NUMBER='N408WN', ORIFIN_AIRPORT='MDW', DESTINATION_AIRPORT='PHL', SHEDULED_DEPARTURE='1700', DEPARTURE_TIME='1725', DEPARTURE_DELAY=25.0, TAXI_OUR=32.0, WHEELS_OFF='1757', SCHEDULED_TIME='120', ELAPSED_TIME=135.0, AIR_TIME=94.0, DISTANCE=668.0, WHEELS_ON='2031', TAXI_IN=9.0, SHEDULED_ARRIVAL='2000', ARRIVAL_TIME=

### 1.2.4 Display the records in each partition <a class="anchor" id="1.2.4"></a>
[Back to top](#table)

In [18]:
for p in flights_rdd_partition.glom().collect():
    print(len(p))

143370
143070
142335
142954


- The number of partitions influences how many records land in each partition. In my tests, 4 partitions gave the most even split, with roughly the same number of records in each. Other partition counts produced partitions that differed from each other by up to about one thousand records.

- The remaining imbalance is probably caused by the uneven distribution of the data. ARRIVAL_DELAY only ranges from -82 to 1665, yet there are far more records than distinct key values. If some delay values occur very frequently, the partitions that receive them end up with more records.

- int() only turns the float key into an integer (for example, -1.0 becomes -1). PySpark then takes that result modulo the number of partitions, and in Python -1 % 4 is 3, so negative delays still map to a valid partition. Different delays can share a partition this way, but it does not affect the balance much.
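
The partition assignment from 1.2.3 can be mimicked in plain Python. This is a minimal sketch under one assumption: that PySpark takes the partition function's result modulo the number of partitions. `partition_index` and the sample delays are hypothetical and only illustrate how negative, positive, and missing keys are mapped.

```python
from collections import Counter


def key_type_check(key):
    # Same logic as the notebook's function: None goes to 0, otherwise truncate to int.
    if key is None:
        return 0
    return int(key)


def partition_index(key, num_partitions=4):
    # Assumption: mirrors PySpark taking the partition function's result
    # modulo the number of partitions.
    return key_type_check(key) % num_partitions


# Hypothetical sample of keys covering the observed range of ARRIVAL_DELAY.
sample_delays = [-82.0, -4.0, -1.0, 0.0, 1.0, 40.0, 1665.0, None]
assignment = {delay: partition_index(delay) for delay in sample_delays}
sizes = Counter(assignment.values())

print(assignment)
print(sizes)
```

Because Python's `%` returns a non-negative result for a positive modulus, a delay of -1.0 goes to partition 3 and a delay of 1.0 goes to partition 1; the keys -4.0 and 40.0 both fall into partition 0, which matches the two records returned by `take(2)` above.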

## 1.3 Query RDD  <a class="anchor" id="1.3"></a>
### 1.3.1 Collect a total number of flights for each month <a class="anchor" id="1.3.1"></a>
[Back to top](#table)

In [103]:
flights_rdd.map(lambda x:(x['MONTH'],1))\
           .reduceByKey(lambda x,y:x+y).collect()

[(1, 45900),
 (2, 40684),
 (3, 49580),
 (4, 48221),
 (5, 48977),
 (6, 49158),
 (7, 51415),
 (8, 49866),
 (9, 46459),
 (10, 48357),
 (11, 46203),
 (12, 46909)]

These counts exclude cancelled and diverted flights.

### 1.3.2 Collect the average delay for each month <a class="anchor" id="1.3.2"></a>
[Back to top](#table)

In [104]:
flights_rdd.map(lambda x:(x['MONTH'],(x['ARRIVAL_DELAY'],1)))\
           .reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))\
           .mapValues(lambda x:x[0]/x[1]).collect()

[(1, 5.804357298474946),
 (2, 8.123906203913085),
 (3, 5.011173860427592),
 (4, 3.173803944339603),
 (5, 4.7121097658084405),
 (6, 9.747630090727856),
 (7, 6.786093552465234),
 (8, 4.713893233866763),
 (9, -0.8498676252179341),
 (10, -0.541989784312509),
 (11, 0.8313745860658399),
 (12, 6.15837046195826)]
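
The average above is built from a (sum, count) pair per month rather than by averaging directly, because a reduce step can only merge two partial results at a time. The following plain-Python sketch replays the same three steps on a few made-up records (the values are hypothetical, not taken from the dataset).

```python
from collections import defaultdict
from functools import reduce

# Hypothetical (month, arrival_delay) records.
records = [(1, 10.0), (1, -4.0), (1, 6.0), (2, 30.0), (2, 0.0)]

# map step: (month, (delay, 1))
pairs = [(month, (delay, 1)) for month, delay in records]

# reduceByKey step: group values by key, then merge them pairwise
grouped = defaultdict(list)
for month, value in pairs:
    grouped[month].append(value)


def combine(x, y):
    # Add the running sums and the running counts.
    return (x[0] + y[0], x[1] + y[1])


totals = {month: reduce(combine, values) for month, values in grouped.items()}

# mapValues step: sum / count
averages = {month: total / count for month, (total, count) in totals.items()}
print(averages)
```

Carrying the count alongside the sum keeps the merge associative, so partial results from different partitions can be combined in any order.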

# 2 Working with DataFrames <a class="anchor" id="2"></a>
## 2.1 Data Preparation and Loading <a class="anchor" id="2.1"></a>
### 2.1.1 Define DataFrames and load the schema<a class="anchor" id="2.1.1"></a>
[Back to top](#table)

In [66]:
flightsDf = spark.read.format('csv').option("header",True)\
                                   .option("inferSchema", True)\
                                   .load('flight-delays/flight*.csv')

airportsDf = spark.read.format('csv').option("header",True)\
                                   .option("inferSchema", True)\
                                   .load('flight-delays/airports.csv')

### 2.1.2 Display the schema of the final two dataframes<a class="anchor" id="2.1.2"></a>
[Back to top](#table)

In [67]:
flightsDf.printSchema()
airportsDf.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

## 2.2 Query Analysis <a class="anchor" id="2.2"></a>
### 2.2.1 January flight events with ANC airport <a class="anchor" id="2.2.1"></a>
[Back to top](#table)

In [68]:
# Filter first (YEAR is not among the selected columns), then keep the columns of interest
janFlightEventsAncDf = flightsDf.filter(flightsDf.YEAR == 2015)\
                                .filter(flightsDf.MONTH == 1)\
                                .filter(flightsDf.ORIGIN_AIRPORT == 'ANC')\
                                .select('MONTH','ORIGIN_AIRPORT','DESTINATION_AIRPORT','DISTANCE','ARRIVAL_DELAY')

janFlightEventsAncDf.show()

+-----+--------------+-------------------+--------+-------------+
|MONTH|ORIGIN_AIRPORT|DESTINATION_AIRPORT|DISTANCE|ARRIVAL_DELAY|
+-----+--------------+-------------------+--------+-------------+
|    1|           ANC|                SEA|    1448|          -13|
|    1|           ANC|                SEA|    1448|           -4|
|    1|           ANC|                JNU|     571|           17|
|    1|           ANC|                CDV|     160|           20|
|    1|           ANC|                BET|     399|          -20|
|    1|           ANC|                SEA|    1448|          -15|
|    1|           ANC|                SEA|    1448|          -11|
|    1|           ANC|                ADQ|     253|          -16|
|    1|           ANC|                SEA|    1448|           17|
|    1|           ANC|                BET|     399|           -9|
|    1|           ANC|                SEA|    1448|           15|
|    1|           ANC|                FAI|     261|           -6|
|    1|   

### 2.2.2 Average Arrival Delay From Origin to Destination <a class="anchor" id="2.2.2"></a>
[Back to top](#table)

In [69]:
import pyspark.sql.functions as F
janFlightEventsAncAvgDf = janFlightEventsAncDf.groupBy(['ORIGIN_AIRPORT','DESTINATION_AIRPORT'])\
                                           .agg(F.avg('ARRIVAL_DELAY').alias('AVERAGE_DELAY'))\
                                           .orderBy('AVERAGE_DELAY')
janFlightEventsAncAvgDf.show()                                           

+--------------+-------------------+-------------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|      AVERAGE_DELAY|
+--------------+-------------------+-------------------+
|           ANC|                ADK|              -27.0|
|           ANC|                HNL|              -20.0|
|           ANC|                MSP|             -19.25|
|           ANC|                BET| -9.090909090909092|
|           ANC|                SEA| -6.490196078431373|
|           ANC|                BRW| -4.333333333333333|
|           ANC|                OME|               -3.0|
|           ANC|                ADQ|-2.6666666666666665|
|           ANC|                CDV|                1.0|
|           ANC|                OTZ|               1.25|
|           ANC|                PHX|                2.0|
|           ANC|                DEN| 3.3333333333333335|
|           ANC|                PDX|                3.5|
|           ANC|                JNU|                5.0|
|           ANC|               

### 2.2.3 Join Query with Airports DataFrame <a class="anchor" id="2.2.3"></a>
[Back to top](#table)

In [70]:
joinedSqlDf = janFlightEventsAncAvgDf.join(airportsDf,janFlightEventsAncAvgDf.ORIGIN_AIRPORT== airportsDf.IATA_CODE)

joinedSqlDf.show()

+--------------+-------------------+-------------------+---------+--------------------+---------+-----+-------+--------+----------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|      AVERAGE_DELAY|IATA_CODE|             AIRPORT|     CITY|STATE|COUNTRY|LATITUDE| LONGITUDE|
+--------------+-------------------+-------------------+---------+--------------------+---------+-----+-------+--------+----------+
|           ANC|                BRW| -4.333333333333333|      ANC|Ted Stevens Ancho...|Anchorage|   AK|    USA|61.17432|-149.99619|
|           ANC|                ADK|              -27.0|      ANC|Ted Stevens Ancho...|Anchorage|   AK|    USA|61.17432|-149.99619|
|           ANC|                OME|               -3.0|      ANC|Ted Stevens Ancho...|Anchorage|   AK|    USA|61.17432|-149.99619|
|           ANC|                JNU|                5.0|      ANC|Ted Stevens Ancho...|Anchorage|   AK|    USA|61.17432|-149.99619|
|           ANC|                LAS|                9.0|      ANC|Ted Steven

## 2.3 Analysis <a class="anchor" id="2.3"></a>
### 2.3.1 Relationship between day of week with mean arrival delay, total time delay, and count flights <a class="anchor" id="2.3.1"></a>
[Back to top](#table)

In [81]:
flightsDf.createOrReplaceTempView('sql_flights')
delay_vs_no_of_flights = spark.sql('''
    SELECT DAY_OF_WEEK,
        AVG(ARRIVAL_DELAY) AS MeanArrivalDelay,
        SUM(ARRIVAL_DELAY) AS TotalTimeDelay,
        COUNT(*) AS NumberOfFlights
    FROM sql_flights
    WHERE YEAR =2015
    GROUP BY DAY_OF_WEEK
    ORDER BY COUNT(*) DESC
            
''')
delay_vs_no_of_flights.show()

+-----------+------------------+--------------+---------------+
|DAY_OF_WEEK|  MeanArrivalDelay|TotalTimeDelay|NumberOfFlights|
+-----------+------------------+--------------+---------------+
|          4| 5.684831897201573|        490186|          87683|
|          1| 5.883000999381335|        494478|          86317|
|          5| 4.715112525093624|        401638|          86253|
|          3|3.9745505431431147|        335150|          85607|
|          2| 4.391518272706391|        363262|          84449|
|          7| 4.299206488272548|        343498|          81422|
|          6| 1.813841449342257|        125750|          70453|
+-----------+------------------+--------------+---------------+



The highest mean arrival delays occur on Monday (day 1) and Thursday (day 4). According to the table above, these are also the two days with the most flights, which is a plausible reason. Saturday (day 6) has both the fewest flights and the lowest mean arrival delay.
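
One detail in the table is worth a quick check: in SQL, `COUNT(*)` counts every row, while `AVG` and `SUM` skip NULL values. Dividing the total delay by the mean therefore gives the number of flights that actually have an arrival delay. The sketch below does this in plain Python with the figures copied from the table above; the variable names are illustrative.

```python
# (DAY_OF_WEEK, MeanArrivalDelay, TotalTimeDelay, NumberOfFlights) from the table above.
rows = [
    (4, 5.684831897201573, 490186, 87683),
    (1, 5.883000999381335, 494478, 86317),
    (5, 4.715112525093624, 401638, 86253),
    (3, 3.9745505431431147, 335150, 85607),
    (2, 4.391518272706391, 363262, 84449),
    (7, 4.299206488272548, 343498, 81422),
    (6, 1.813841449342257, 125750, 70453),
]

# Number of flights with a non-NULL arrival delay, implied by SUM / AVG.
implied = {day: round(total / mean) for day, mean, total, _ in rows}

# Flights counted by COUNT(*) but ignored by AVG and SUM.
without_delay = {day: flights - implied[day] for day, _, _, flights in rows}

print(implied)
print(without_delay)
print(sum(implied.values()))
```

The implied counts should add up to 571729, the number of records left in `flights_rdd` after cancelled and diverted flights were removed in Section 1. This suggests that the gap in each row comes from flights with no recorded arrival delay.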

### 2.3.2 Display mean arrival delay each month <a class="anchor" id="2.3.2"></a>
[Back to top](#table)

In [85]:
delay_each_month = spark.sql('''
SELECT MONTH,
    AVG(ARRIVAL_DELAY) AS MeanArrivalDelay,
    SUM(ARRIVAL_DELAY) AS TotalTimeDelay,
    COUNT(*) AS NumberOfFlights
    FROM sql_flights
    GROUP BY MONTH
    ORDER BY MeanArrivalDelay
''')
delay_each_month.show()

+-----+-------------------+--------------+---------------+
|MONTH|   MeanArrivalDelay|TotalTimeDelay|NumberOfFlights|
+-----+-------------------+--------------+---------------+
|    9|-0.8498676252179341|        -39484|          46733|
|   10| -0.541989784312509|        -26209|          48680|
|   11| 0.8313745860658399|         38412|          46809|
|    4|  3.173803944339603|        153044|          48810|
|    5| 4.7121097658084405|        230785|          49691|
|    8|  4.713893233866763|        235063|          50524|
|    3|  5.011173860427592|        248454|          50816|
|    1|  5.804357298474946|        266420|          47136|
|   12|   6.15837046195826|        288883|          47866|
|    7|  6.786093552465234|        348907|          52065|
|    2|  8.123906203913085|        330513|          42798|
|    6|  9.747630090727856|        479174|          50256|
+-----+-------------------+--------------+---------------+



- February and June have the highest mean arrival delays, but the number of flights does not explain this: February has the fewest flights of any month. September and October have the lowest delays even though their flight counts are comparable to those of other months.
- Overall, autumn has the lowest delays.

### 2.3.3 Relationship between mean departure delay and mean arrival delay <a class="anchor" id="2.3.3"></a>
[Back to top](#table)

In [92]:
dept_and_arr_delay = spark.sql('''
SELECT MONTH,
    AVG(DEPARTURE_DELAY) AS MeanDeptDelay,
    AVG(ARRIVAL_DELAY) AS MeanArrivalDelay
    FROM sql_flights
    GROUP BY MONTH
    ORDER BY MeanDeptDelay DESC
''')
dept_and_arr_delay.show()

+-----+------------------+-------------------+
|MONTH|     MeanDeptDelay|   MeanArrivalDelay|
+-----+------------------+-------------------+
|    6|  13.9730063585922|  9.747630090727856|
|   12|11.821651454043728|   6.15837046195826|
|    7|11.708608758020432|  6.786093552465234|
|    2|11.620796080832823|  8.123906203913085|
|    8|10.086906141367324|  4.713893233866763|
|    1|  9.75401499511029|  5.804357298474946|
|    3| 9.718308159530178|  5.011173860427592|
|    5| 9.550310180006102| 4.7121097658084405|
|    4| 7.737554783759199|  3.173803944339603|
|   11| 6.630585898709037| 0.8313745860658399|
|   10| 5.243436261558784| -0.541989784312509|
|    9| 4.728506981740065|-0.8498676252179341|
+-----+------------------+-------------------+



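- Months with a higher mean departure delay also tend to have a higher mean arrival delay, so departure delays appear to carry over to arrivals.
- June, December, and July have the highest mean departure delays and also rank high on arrival delay.
- September, October, and November have the lowest delays, so autumn puts the least delay pressure on airports.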
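
The link between departure and arrival delay can be checked numerically. As a rough sketch, the code below computes the Pearson correlation coefficient over the twelve monthly means copied from the table above; `pearson` is a small helper written for this illustration, and twelve aggregated points are too few to support a causal claim.

```python
from math import sqrt

# (MONTH, MeanDeptDelay, MeanArrivalDelay) from the table above.
monthly = [
    (6, 13.9730063585922, 9.747630090727856),
    (12, 11.821651454043728, 6.15837046195826),
    (7, 11.708608758020432, 6.786093552465234),
    (2, 11.620796080832823, 8.123906203913085),
    (8, 10.086906141367324, 4.713893233866763),
    (1, 9.75401499511029, 5.804357298474946),
    (3, 9.718308159530178, 5.011173860427592),
    (5, 9.550310180006102, 4.7121097658084405),
    (4, 7.737554783759199, 3.173803944339603),
    (11, 6.630585898709037, 0.8313745860658399),
    (10, 5.243436261558784, -0.541989784312509),
    (9, 4.728506981740065, -0.8498676252179341),
]


def pearson(xs, ys):
    # Pearson correlation: covariance divided by the product of the standard deviations.
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


dept = [row[1] for row in monthly]
arr = [row[2] for row in monthly]
r = pearson(dept, arr)
print(r)
```

A coefficient close to 1 is consistent with the first observation above: months with longer departure delays also show longer arrival delays.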

# 3 RDDs vs DataFrame vs Spark SQL <a class="anchor" id="3"></a>


Implement the following queries using RDDs, DataFrames and SparkSQL separately. Log the time taken for each query in each approach using the “%%time” built-in magic command in Jupyter Notebook and discuss the performance difference of these 3 approaches.

<strong>Find the MONTH and DAY_OF_WEEK, number of flights, and average delay where TAIL_NUMBER = ‘N407AS’. Note number of flights and average delay should be aggregated separately. The average delay should be grouped by both MONTH and DAYS_OF_WEEK.</strong>

## 3.1 RDD Operation<a class="anchor" id="3.1"></a>
[Back to top](#table)

In [134]:
%%time
flights_rdd.filter(lambda x: x['TAIL_NUMBER'] == 'N407AS')\
          .map(lambda x:((x['MONTH'],x['DAY_OF_WEEK']),(1,x['DEPARTURE_DELAY'])))\
          .reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))\
          .map(lambda kv:Row(MONTH=kv[0][0],DAY_OF_WEEK=kv[0][1],NumOfFlights=kv[1][0],AvgDeptDelay=kv[1][1]/kv[1][0]))\
          .collect()

CPU times: user 21.5 ms, sys: 4.27 ms, total: 25.8 ms
Wall time: 4.31 s


[Row(MONTH=11, DAY_OF_WEEK=7, NumOfFlights=3, AvgDeptDelay=-5.0),
 Row(MONTH=3, DAY_OF_WEEK=3, NumOfFlights=1, AvgDeptDelay=28.0),
 Row(MONTH=4, DAY_OF_WEEK=6, NumOfFlights=1, AvgDeptDelay=1.0),
 Row(MONTH=12, DAY_OF_WEEK=2, NumOfFlights=2, AvgDeptDelay=1.0),
 Row(MONTH=5, DAY_OF_WEEK=5, NumOfFlights=1, AvgDeptDelay=17.0),
 Row(MONTH=6, DAY_OF_WEEK=3, NumOfFlights=3, AvgDeptDelay=-5.0),
 Row(MONTH=7, DAY_OF_WEEK=4, NumOfFlights=2, AvgDeptDelay=-2.0),
 Row(MONTH=8, DAY_OF_WEEK=1, NumOfFlights=2, AvgDeptDelay=-14.0),
 Row(MONTH=9, DAY_OF_WEEK=2, NumOfFlights=1, AvgDeptDelay=8.0),
 Row(MONTH=6, DAY_OF_WEEK=2, NumOfFlights=1, AvgDeptDelay=33.0),
 Row(MONTH=9, DAY_OF_WEEK=3, NumOfFlights=5, AvgDeptDelay=-5.2),
 Row(MONTH=7, DAY_OF_WEEK=5, NumOfFlights=1, AvgDeptDelay=9.0),
 Row(MONTH=8, DAY_OF_WEEK=7, NumOfFlights=2, AvgDeptDelay=66.5),
 Row(MONTH=10, DAY_OF_WEEK=1, NumOfFlights=2, AvgDeptDelay=12.5),
 Row(MONTH=9, DAY_OF_WEEK=4, NumOfFlights=3, AvgDeptDelay=-8.333333333333334),
 Row(MONTH=

## 3.2 DataFrame Operation<a class="anchor" id="3.2"></a>
[Back to top](#table)

In [135]:
%%time
flightsDf.filter(flightsDf.TAIL_NUMBER == 'N407AS')\
          .groupBy(['MONTH','DAY_OF_WEEK'])\
          .agg(F.count('DEPARTURE_DELAY').alias('NumofFlights'),F.avg('DEPARTURE_DELAY').alias('AvgDeptDelay'))\
          .show()

+-----+-----------+------------+------------------+
|MONTH|DAY_OF_WEEK|NumofFlights|      AvgDeptDelay|
+-----+-----------+------------+------------------+
|    6|          1|           4|               4.5|
|    3|          1|           1|              40.0|
|    7|          4|           2|              -2.0|
|    2|          2|           2|              -3.5|
|    9|          4|           3|-8.333333333333334|
|   12|          7|           2|              -2.0|
|    8|          3|           1|               2.0|
|    4|          7|           1|              -8.0|
|    2|          3|           2|             -12.5|
|    7|          1|           1|              -3.0|
|   12|          2|           2|               1.0|
|    1|          2|           2|              12.5|
|   11|          1|           1|              57.0|
|    9|          1|           2|              -5.5|
|    5|          7|           3| 4.666666666666667|
|    5|          6|           2|               0.5|
|   12|     

## 3.3 Spark SQL Operation<a class="anchor" id="3.3"></a>
[Back to top](#table)

In [136]:
%%time
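flightsDf.createOrReplaceTempView('sql_flights')
# Restrict the query to the same aircraft as the RDD and DataFrame versions
Spark_op = spark.sql('''
    SELECT MONTH, DAY_OF_WEEK,
        COUNT(*) AS NumberOfFlights,
        AVG(DEPARTURE_DELAY) AS MeanDeptDelay
    FROM sql_flights
    WHERE TAIL_NUMBER = 'N407AS'
    GROUP BY MONTH, DAY_OF_WEEK
''')
Spark_op.show()

NOTE: The output recorded below was produced without the TAIL_NUMBER filter, so its counts cover all aircraft.
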
+-----+-----------+---------------+------------------+
|MONTH|DAY_OF_WEEK|NumberOfFlights|     MeanDeptDelay|
+-----+-----------+---------------+------------------+
|   10|          2|           6406|1.5475445730372224|
|    6|          1|           8611|15.902471209213052|
|    3|          1|           8357|11.533674339300937|
|    7|          4|           8824|12.438158497480531|
|    2|          2|           6204|  11.3463986599665|
|    9|          4|           6623| 8.057355849688118|
|   12|          7|           6333|  13.8415681192289|
|    8|          3|           6723| 7.469204255956841|
|    1|          7|           6043|15.087175188600167|
|    2|          3|           6277|  8.78494271685761|
|   11|          6|           5426| 6.624717407686511|
|    7|          1|           6912|12.489106594531364|
|    4|          7|           6284| 5.053471667996807|
|   12|          2|           7889|13.592482562645312|
|    1|          2|           6087| 8.703690807799443|
|   11|   

## 3.4 Discussion<a class="anchor" id="3.4"></a>
[Back to top](#table)

- For this task, the DataFrame and Spark SQL operations are much faster than the RDD operation (974 ms and 968 ms vs 4.31 s).

- The DataFrame and Spark SQL operations take about the same time.

- According to a quick search online, this is because DataFrames and Spark SQL work differently from RDDs while being essentially the same as each other: Spark SQL exists for people who want to use Spark but are mainly familiar with SQL. Both are optimized by the Catalyst optimizer (https://databricks.com/glossary/catalyst-optimizer), which makes them faster than the equivalent RDD code.
- Of course, this speed advantage only applies to structured data, which is what a DataFrame holds. RDDs can also handle semi-structured data such as MongoDB documents or JSON, so they are more versatile.