### Spark Model

In [244]:
#sc.stop()

In [245]:
import findspark
findspark.init()

In [246]:
from pyspark import SparkContext
sc = SparkContext(master="local[4]")
sc

In [247]:
tmp = sc.textFile('2007.csv')

In [248]:
flights = tmp.sample(False, 1000)

In [249]:
carriers = sc.textFile('carriers.csv')

In [250]:
airports = sc.textFile('airports.csv')

In [251]:
planedata = sc.textFile('planedata.csv')

In [252]:
# Helper method to remove header from rdd

def remove_header(rdd):
    rdd_header = rdd.first()
    header = sc.parallelize([rdd_header])
    rdd_out = rdd.subtract(header)
    return rdd_out, str(rdd_header)

In [253]:
flights_rdd, flights_header = remove_header(flights)

In [254]:
#Printing the header for the Flights RDD
flights_header

'Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay'

In [255]:
#Printing the first ten rows for the flights RDD
flights_rdd.take(10)

['2007,1,1,1,831,830,957,1000,WN,2278,N480,86,90,74,-3,1,SMF,PDX,479,3,9,0,,0,0,0,0,0,0',
 '2007,1,1,1,1630,1630,1746,1750,WN,1146,N707SA,76,80,60,-4,0,SMF,SNA,404,6,10,0,,0,0,0,0,0,0',
 '2007,1,1,1,1439,1440,1607,1605,WN,728,N707SA,88,85,70,2,-1,SNA,SMF,404,3,15,0,,0,0,0,0,0,0',
 '2007,1,1,1,2050,2050,2338,2335,WN,675,N637SW,108,105,97,3,0,STL,BWI,737,4,7,0,,0,0,0,0,0,0',
 '2007,1,1,1,959,1000,1222,1225,WN,1477,N390SW,83,85,70,-3,-1,STL,CLE,487,5,8,0,,0,0,0,0,0,0',
 '2007,1,1,1,2116,2110,2241,2250,WN,82,N270,85,100,77,-9,6,STL,DAL,546,2,6,0,,0,0,0,0,0,0',
 '2007,1,1,1,1657,1700,1920,1925,WN,1880,N637SW,83,85,69,-5,-3,STL,DTW,440,7,7,0,,0,0,0,0,0,0',
 '2007,1,1,1,1358,1350,1602,1600,WN,144,N619SW,124,130,108,2,8,TPA,BWI,842,4,12,0,,0,0,0,0,0,0',
 '2007,1,1,1,816,815,906,910,WN,11,N624SW,50,55,41,-4,1,TUL,DAL,237,2,7,0,,0,0,0,0,0,0',
 '2007,1,2,2,1605,1540,1707,1650,WN,2014,N464,62,70,50,17,25,ABQ,PHX,328,4,8,0,,0,2,0,0,0,15']

In [256]:
#Separating RDD and headers for remaining files
carriers_rdd, carriers_header = remove_header(carriers)
airports_rdd, airports_header = remove_header(airports)
planedata_rdd, planedata_header = remove_header(planedata)

In [257]:
#INSERT A COUPLE RDD OPERATIONS HERE TO DEMO RDD

In [258]:
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType

In [260]:
sqlContext = SQLContext(sc)
sqlContext

<pyspark.sql.context.SQLContext at 0x112bf5400>

In [261]:
flights_header

'Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay'

In [262]:
carriers_header

'Code,Description'

In [263]:
carriers_rdd.take(2)

['"0CQ","ACM AIR CHARTER GmbH"',
 '"0FQ","Maine Aviation Aircraft Charter, LLC"']

In [264]:
carriers_schema = StructType([StructField("Code", StringType(), True),
                             StructField("Description", StringType(), True)])

In [265]:
airports_header

'"iata","airport","city","state","country","lat","long"'

In [266]:
airports_rdd.take(2)

['"00M","Thigpen ","Bay Springs","MS","USA",31.95376472,-89.23450472',
 '"00V","Meadow Lake","Colorado Springs","CO","USA",38.94574889,-104.5698933']

In [267]:
airports_schema = StructType([StructField("Iata", StringType(), True),
                             StructField("Airport", StringType(), True),
                             StructField("City", StringType(), True),
                             StructField("State", StringType(), True),
                             StructField("Country", StringType(), True), 
                             StructField("Lat", IntegerType(), True),
                             StructField("Long", IntegerType(), True)])

In [268]:
planedata_header

'tailnum,type,manufacturer,issue_date,model,status,aircraft_type,engine_type,year'

In [269]:
planedata_rdd.take(3)

['N051AA', 'N057AA', 'N065AA']

In [270]:
planedata_schema = StructType([StructField("TailNum", StringType(), True),
                             StructField("PlaneType", StringType(), True),
                             StructField("PlaneManufacturer", StringType(), True),
                             StructField("PlaneIssueDate", StringType(), True),
                             StructField("PlaneModel", StringType(), True),
                             StructField("PlaneStatus", StringType(), True),
                             StructField("PlaneAircraftType", StringType(), True),
                             StructField("PlaneEngineType", StringType(), True),
                             StructField("PlaneYear", IntegerType(), True)
                             ])

Transforming RDDs into Spark Dataframes

In [271]:
carriers_rdd.map(lambda s: s.replace(',','').split('\""')).take(10)

[['"0CQ', 'ACM AIR CHARTER GmbH"'],
 ['"0FQ', 'Maine Aviation Aircraft Charter LLC"'],
 ['"0JQ', 'Vision Airlines"'],
 ['"0KQ', 'Mokulele Flight Services Inc."'],
 ['"3F', 'Pacific Airways Inc."'],
 ['"4E', 'Tanana Air Service"'],
 ['"4H', 'Belize Trans Air"'],
 ['"4S', 'Sol Air (Aero Hunduras)"'],
 ['"5G', 'Skyservice Airlines Inc."'],
 ['"5Y', 'Atlas Air Inc."']]

In [272]:
carriers_df = sqlContext.createDataFrame(
    carriers_rdd.map(lambda s: s.replace(',','')
                                        .split('\""'))
                                        .map(lambda row: (row[0].replace('\"', ''),
                                                          row[1].replace('\"', ''))), 
                                                                carriers_schema)
carriers_df.printSchema()

root
 |-- Code: string (nullable = true)
 |-- Description: string (nullable = true)



In [273]:
carriers_df.show(5)

+----+--------------------+
|Code|         Description|
+----+--------------------+
| 0CQ|ACM AIR CHARTER GmbH|
| 0FQ|Maine Aviation Ai...|
| 0JQ|     Vision Airlines|
| 0KQ|Mokulele Flight S...|
|  3F|Pacific Airways Inc.|
+----+--------------------+
only showing top 5 rows



In [274]:
from pyspark.sql.functions import substring, length

In [275]:
#carriers_df = carriers_df.select(carriers_df.Code.substr(2,4).alias('Code'), 
#                                 carriers_df.Description.alias('Description'))

In [276]:
#Creating df for airplanes:
flights_df = sqlContext.createDataFrame(flights_rdd.map(lambda s: s.split(',')), flights_header.split(','))

In [277]:
flights_df.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (nullable = true)
 |-- CarrierDelay:

In [278]:
#Next we will assign Int types to integer columns:
#First we define columns we would like to maintain type String:
st_list = ['TailNum', 'CancellationCode', 'UniqueCarrier', 'Origin', 'Dest']

#Then we remove those from the list of all columns we collected from the helper method remove_header:
int_list = [i for i in flights_header.split(',') if i not in st_list]

#Finally we use cast to modify data types:

from pyspark.sql.functions import col

for col_name in int_list:
    flights_df = flights_df.withColumn(col_name, col(col_name).cast('int'))

In [279]:
flights_df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)

In [280]:
flights_df = flights_df.drop("Year", "country_Origin", "country_Dest")

Additional Files: Airplanes and Airports

In [281]:
#Alternative way to load data - directly into PySpark DF
planedata_df = sqlContext.read.csv('planedata.csv', inferSchema=True, header=True)
planedata_df.show(10)

+-------+----+------------+----------+-----+------+-------------+-----------+----+
|tailnum|type|manufacturer|issue_date|model|status|aircraft_type|engine_type|year|
+-------+----+------------+----------+-----+------+-------------+-----------+----+
| N050AA|null|        null|      null| null|  null|         null|       null|null|
| N051AA|null|        null|      null| null|  null|         null|       null|null|
| N052AA|null|        null|      null| null|  null|         null|       null|null|
| N054AA|null|        null|      null| null|  null|         null|       null|null|
| N055AA|null|        null|      null| null|  null|         null|       null|null|
| N056AA|null|        null|      null| null|  null|         null|       null|null|
| N057AA|null|        null|      null| null|  null|         null|       null|null|
| N058AA|null|        null|      null| null|  null|         null|       null|null|
| N059AA|null|        null|      null| null|  null|         null|       null|null|
| N0

In [282]:
airports_df = sqlContext.read.csv('airports.csv', inferSchema=True, header=True)
airports_df.show(10)

+----+--------------------+----------------+-----+-------+-----------+------------+
|iata|             airport|            city|state|country|        lat|        long|
+----+--------------------+----------------+-----+-------+-----------+------------+
| 00M|            Thigpen |     Bay Springs|   MS|    USA|31.95376472|-89.23450472|
| 00R|Livingston Municipal|      Livingston|   TX|    USA|30.68586111|-95.01792778|
| 00V|         Meadow Lake|Colorado Springs|   CO|    USA|38.94574889|-104.5698933|
| 01G|        Perry-Warsaw|           Perry|   NY|    USA|42.74134667|-78.05208056|
| 01J|    Hilliard Airpark|        Hilliard|   FL|    USA| 30.6880125|-81.90594389|
| 01M|   Tishomingo County|         Belmont|   MS|    USA|34.49166667|-88.20111111|
| 02A|         Gragg-Wade |         Clanton|   AL|    USA|32.85048667|-86.61145333|
| 02C|             Capitol|      Brookfield|   WI|    USA|   43.08751|-88.17786917|
| 02G|   Columbiana County|  East Liverpool|   OH|    USA|40.67331278|-80.64

Data Exploration with Spark - Performing Analytics

In [283]:
#Step 1 - Merging into a master df
#Step 2 - Running some queries to extract insights

In [None]:
#We will have to join the dataframes. Let us first explore what we are dealing with:
print('Dataframe Schema for Airports')
airports_df.printSchema()

Dataframe Schema for Airports
root
 |-- iata: string (nullable = true)
 |-- airport: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)



In [None]:
print('Dataframe Schema for Flights')
flights_df.printSchema()

Dataframe Schema for Flights
root
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Ca

In [None]:
print('Dataframe Schema for Planes')
planedata_df.printSchema()

Dataframe Schema for Planes
root
 |-- tailnum: string (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- issue_date: string (nullable = true)
 |-- model: string (nullable = true)
 |-- status: string (nullable = true)
 |-- aircraft_type: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- year: string (nullable = true)



In [None]:
print('Dataframe Schema for Carriers')
carriers_df.printSchema()

Dataframe Schema for Carriers
root
 |-- Code: string (nullable = true)
 |-- Description: string (nullable = true)



In [None]:
#Joining flights with planedata on tailnum:
join_df = flights_df.join(planedata_df, flights_df.TailNum == planedata_df.tailnum, how='left')

In [None]:
#Next, joining thenew df with carriers on Code:
new_join_df = join_df.join(carriers_df, join_df.UniqueCarrier == carriers_df.Code, how='left')

In [None]:
new_join_df.take(4)

[Row(Month=1, DayofMonth=1, DayOfWeek=1, DepTime=831, CRSDepTime=830, ArrTime=957, CRSArrTime=1000, UniqueCarrier='WN', FlightNum=2278, TailNum='N480', ActualElapsedTime=86, CRSElapsedTime=90, AirTime=74, ArrDelay=-3, DepDelay=1, Origin='SMF', Dest='PDX', Distance=479, TaxiIn=3, TaxiOut=9, Cancelled=0, CancellationCode='', Diverted=0, CarrierDelay=0, WeatherDelay=0, NASDelay=0, SecurityDelay=0, LateAircraftDelay=0, tailnum=None, type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None, Code='WN', Description='Southwest Airlines Co.'),
 Row(Month=1, DayofMonth=1, DayOfWeek=1, DepTime=1630, CRSDepTime=1630, ArrTime=1746, CRSArrTime=1750, UniqueCarrier='WN', FlightNum=1146, TailNum='N707SA', ActualElapsedTime=76, CRSElapsedTime=80, AirTime=60, ArrDelay=-4, DepDelay=0, Origin='SMF', Dest='SNA', Distance=404, TaxiIn=6, TaxiOut=10, Cancelled=0, CancellationCode='', Diverted=0, CarrierDelay=0, WeatherDelay=0, NASDelay=0, SecurityD

In [None]:
#Finally, we merge twice with the airports dataset. Once for Departing airport, once for Arrival airport:
third = new_join_df.join(airports_df, new_join_df.Origin == airports_df.iata, how='left')

In [None]:
third.take(2)

[Row(Month=1, DayofMonth=1, DayOfWeek=1, DepTime=831, CRSDepTime=830, ArrTime=957, CRSArrTime=1000, UniqueCarrier='WN', FlightNum=2278, TailNum='N480', ActualElapsedTime=86, CRSElapsedTime=90, AirTime=74, ArrDelay=-3, DepDelay=1, Origin='SMF', Dest='PDX', Distance=479, TaxiIn=3, TaxiOut=9, Cancelled=0, CancellationCode='', Diverted=0, CarrierDelay=0, WeatherDelay=0, NASDelay=0, SecurityDelay=0, LateAircraftDelay=0, tailnum=None, type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None, Code='WN', Description='Southwest Airlines Co.', iata='SMF', airport='Sacramento International', city='Sacramento', state='CA', country='USA', lat=38.69542167, long=-121.5907669),
 Row(Month=1, DayofMonth=1, DayOfWeek=1, DepTime=1630, CRSDepTime=1630, ArrTime=1746, CRSArrTime=1750, UniqueCarrier='WN', FlightNum=1146, TailNum='N707SA', ActualElapsedTime=76, CRSElapsedTime=80, AirTime=60, ArrDelay=-4, DepDelay=0, Origin='SMF', Dest='SNA', Dista

In [None]:
new_cols = [i for i in third.columns]

In [None]:
new_cols = [
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'DepTime',
 'CRSDepTime',
 'ArrTime',
 'CRSArrTime',
 'UniqueCarrier',
 'FlightNum',
 'TailNum',
 'ActualElapsedTime',
 'CRSElapsedTime',
 'AirTime',
 'ArrDelay',
 'DepDelay',
 'Origin',
 'Dest',
 'Distance',
 'TaxiIn',
 'TaxiOut',
 'Cancelled',
 'CancellationCode',
 'Diverted',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay',
 'tailnum',
 'type',
 'manufacturer',
 'issue_date',
 'model',
 'status',
 'aircraft_type',
 'engine_type',
 'year',
 'Code',
 'Description',
 'iata_Origin',
 'airport_Origin',
 'city_Origin',
 'state_Origin',
 'country_Origin',
 'lat_Origin',
 'long_Origin']

In [None]:
def rename_columns(df, old_cols, new_cols):
    for old_col, new_col in zip(old_cols, new_cols):
        df= df.withColumnRenamed(old_col, new_col)
    return df

In [None]:
third_updated = rename_columns(third, third.columns, new_cols)

In [None]:
fourth = third_updated.join(airports_df, third_updated.Dest == airports_df.iata, how='left')

In [None]:
new_cols = [i for i in fourth.columns]

In [None]:
new_cols = [
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'DepTime',
 'CRSDepTime',
 'ArrTime',
 'CRSArrTime',
 'UniqueCarrier',
 'FlightNum',
 'tailnum',
 'ActualElapsedTime',
 'CRSElapsedTime',
 'AirTime',
 'ArrDelay',
 'DepDelay',
 'Origin',
 'Dest',
 'Distance',
 'TaxiIn',
 'TaxiOut',
 'Cancelled',
 'CancellationCode',
 'Diverted',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay',
 'tailnum',
 'type',
 'manufacturer',
 'issue_date',
 'model',
 'status',
 'aircraft_type',
 'engine_type',
 'plane_year',
 'Code',
 'Description',
 'iata_Origin',
 'airport_Origin',
 'city_Origin',
 'state_Origin',
 'country_Origin',
 'lat_Origin',
 'long_Origin',
 'iata_Dest',
 'airport_Dest',
 'city_Dest',
 'state_Dest',
 'country_Dest',
 'lat_Dest',
 'long_Dest']

In [None]:
fourth_updated = rename_columns(fourth, fourth.columns, new_cols)

In [None]:
fourth_updated.take(1)

[Row(Month=1, DayofMonth=14, DayOfWeek=7, DepTime=1015, CRSDepTime=1010, ArrTime=1519, CRSArrTime=1530, UniqueCarrier='UA', FlightNum=1, tailnum='N212UA', ActualElapsedTime=544, CRSElapsedTime=560, AirTime=518, ArrDelay=-11, DepDelay=5, Origin='ORD', Dest='HNL', Distance=4243, TaxiIn=3, TaxiOut=23, Cancelled=0, CancellationCode='', Diverted=0, CarrierDelay=0, WeatherDelay=0, NASDelay=0, SecurityDelay=0, LateAircraftDelay=0, tailnum='N212UA', type='Corporation', manufacturer='BOEING', issue_date='07/31/2000', model='777-222', status='Valid', aircraft_type='Fixed Wing Multi-Engine', engine_type='Turbo-Fan', plane_year='2000', Code='UA', Description='United Air Lines Inc.', iata_Origin='ORD', airport_Origin="Chicago O'Hare International", city_Origin='Chicago', state_Origin='IL', country_Origin='USA', lat_Origin=41.979595, long_Origin=-87.90446417, iata_Dest='HNL', airport_Dest='Honolulu International', city_Dest='Honolulu', state_Dest='HI', country_Dest='USA', lat_Dest=21.31869111, long_

In [None]:
fourth_updated.count()

7453215

In [None]:
fourth_updated.printSchema()

root
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- CarrierDelay: integer (nullable

Analytics with Spark SQL

In [None]:
#Now that we have the final dataframe we can start querying it to obtain some insights:

In [None]:
# Register this DataFrame as a table.
fourth_updated.registerTempTable("master")

In [None]:
# SQL statements can be run by using the sql methods provided by sqlContext
delays_by_engine = sqlContext.sql(
    "SELECT engine_type, SUM(CarrierDelay), SUM(WeatherDelay), SUM(NASDelay), SUM(SecurityDelay),SUM(LateAircraftDelay)  \
     FROM master \
     GROUP BY engine_type")

In [None]:
result1 = delays_by_engine.collect()
#result.toPandas()

In [None]:
delays_by_engine.toPandas()[delays_by_engine.toPandas()['engine_type']!='None']

Unnamed: 0,engine_type,sum(CarrierDelay),sum(WeatherDelay),sum(NASDelay),sum(SecurityDelay),sum(LateAircraftDelay)
1,,3588878,761304,3152224,12446,4286969
2,4 Cycle,12418,3164,15880,21,16683
3,Turbo-Shaft,18717,4063,14399,15,14380
4,Turbo-Fan,15974603,3464986,16244698,105844,22912142
5,Turbo-Prop,1445301,253248,515721,7403,980772
6,Reciprocating,121252,41700,152748,174,187766
7,Turbo-Jet,7634591,1206516,8085555,51002,9585065


In [None]:
avg_delay_by_carrier = sqlContext.sql(
    "SELECT Description, AVG(ArrDelay), AVG(AirTime), AVG(CRSElapsedTime), AVG(ActualElapsedTime)  \
     FROM master \
     GROUP BY Description")

In [None]:
result2 = avg_delay_by_carrier.collect()

In [None]:
avg_delay_by_carrier.toPandas()

Unnamed: 0,Description,avg(ArrDelay),avg(AirTime),avg(CRSElapsedTime),avg(ActualElapsedTime)
0,US Airways Inc. (Merged with America West 9/05...,11.527269,116.944477,142.296284,142.089904
1,Pinnacle Airlines Inc.,8.138337,71.223194,98.74204,97.790544
2,Aloha Airlines Inc.,-1.376896,62.185804,75.813568,73.913615
3,Skywest Airlines Inc.,8.838887,68.75508,89.916295,90.613354
4,American Eagle Airlines Inc.,12.992794,67.674432,90.825297,90.812536
5,United Air Lines Inc.,12.753122,144.69157,170.446041,169.692381
6,Comair Inc.,13.135051,79.218251,106.261807,106.611888
7,Expressjet Airlines Inc.,10.007052,85.350256,112.446327,110.620772
8,Frontier Airlines Inc.,7.431885,122.335277,144.363854,144.39862
9,Southwest Airlines Co.,5.472958,90.05629,110.132786,105.270136


In [None]:
cancellations_by_destination = sqlContext.sql(
    "SELECT city_Dest, iata_Dest, SUM(Cancelled)  \
     FROM master \
     GROUP BY city_Dest, iata_Dest \
     ORDER BY SUM(Cancelled)")

In [None]:
result3 = cancellations_by_destination.collect()

In [None]:
cancellations_by_destination.toPandas()

Unnamed: 0,city_Dest,iata_Dest,sum(Cancelled)
0,Cedar City,CDC,0
1,Pueblo,PUB,0
2,Provo,PVU,0
3,Pierre,PIR,0
4,Cheyenne,CYS,0
5,Lewiston,LWS,0
6,Ogden,OGD,0
7,Greenville,GLH,0
8,Kansas City,MKC,0
9,Scottsbluff,BFF,0


In [None]:
early_by_carrier = sqlContext.sql(
    "SELECT Description, SUM(ArrDelay)  \
     FROM master \
     WHERE (ArrDelay < 0) GROUP BY Description ORDER BY SUM(ArrDelay) ASC ")

In [None]:
result4 = early_by_carrier.collect()

In [None]:
early_by_carrier.toPandas()

Unnamed: 0,Description,sum(ArrDelay)
0,Southwest Airlines Co.,-5783814
1,American Airlines Inc.,-2891332
2,Expressjet Airlines Inc.,-2817869
3,Skywest Airlines Inc.,-2674892
4,United Air Lines Inc.,-2628170
5,Delta Air Lines Inc.,-2495262
6,American Eagle Airlines Inc.,-2488111
7,US Airways Inc. (Merged with America West 9/05...,-2258630
8,Continental Air Lines Inc.,-1948909
9,Mesa Airlines Inc.,-1547762


In [None]:
#, SUM(WeatherDelay), SUM(NASDelay), SUM(SecurityDelay),SUM(LateAircraftDelay)

In [None]:
total_count = sqlContext.sql('SELECT COUNT(*) FROM MASTER')

In [None]:
total_count.collect()

In [None]:
#Adding an ID field to use the FeatureTools4S library
from pyspark.sql.functions import monotonically_increasing_id

In [None]:
df = fourth_updated.withColumn("id", monotonically_increasing_id())

In [None]:
#Removing info just not relevant for the predictive model
#Year, Cancelled, CancellationCode, country_Origin, country_Dest- sure
#potential: cancellations - 

In [None]:
df = df.drop("Cancelled", "CancellationCode")

In [None]:
df = df.na.drop(subset=["ArrDelay"])

In [None]:
#Investigate Nulls:
from pyspark.sql.functions import isnan, when, count, col

In [None]:
df = df.drop('tailnum')

In [None]:
#The df is clean from nulls
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

In [None]:
#Dropping attributes that will not be helpful to us e.g. country is always US
df = df.drop('country_Dest', 'country_Origin', 'lat_Origin', 'long_Origin', 'lat_Dest', 'long_Dest', 'Code')

In [None]:
#Removing additional attributes to avoid sparsity:
df_ml = df.drop('type', 'status', 'model',  'engine_type', 'city_Dest', 'state_Origin',
                   'airport_Origin', 'city_Origin', 'iata_Dest', 'iata_Origin', 'state_Dest', 'issue_date', 
                  'Origin', 'Description', 'Origin', 'Dest',  'issue_date', 'id', 'airport_Dest', 'Month', 'DayofMonth')

In [None]:
df_ml.dtypes

In [None]:
#Plane year should be int:
df_ml = df_ml.withColumn('plane_year', col('plane_year').cast('int'))

In [None]:
df_ml = df_ml.withColumn('DayOfWeek', col('DayOfWeek').cast('string'))

In [None]:
df_ml = df_ml.na.drop()

In [None]:
from pyspark.sql.functions import substring
df_ml = df_ml.withColumn("manufacturer", substring(col("manufacturer"), 0, 5))
df_ml = df_ml.withColumn("aircraft_type", substring(col("aircraft_type"), 0,2))

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

In [None]:
#Run exploration here

In [None]:
cat_cols = [item[0] for item in df_ml.dtypes if item[1].startswith('string')]
num_cols = [item[0] for item in df_ml.dtypes if item[1].startswith('in')]

In [None]:
cat_cols

In [None]:
def make_pipeline(df):        
    stages= []

    for col in cat_cols:

        stringIndexer = StringIndexer(inputCol = col , outputCol = col + '_StringIndex')
        encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[col + '_ClassVect'])
        stages += [stringIndexer, encoder]

    #Assembling mixed data type transformations:
    assemblerInputs = [c + "_ClassVect" for c in cat_cols] + num_cols
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

    stages += [assembler]

    #Creating and running the pipeline:
    pipeline = Pipeline(stages=stages)
    pipelineModel = pipeline.fit(df_model)
    out_df = pipelineModel.transform(df_model)
    
    return out_df

In [None]:
out_df = make_pipeline(df_ml)

Linear Regression to Predict Delays

In [None]:
#Creating Baseline Linear Regression Model:
lr = LinearRegression(maxIter=10, featuresCol="features", labelCol="ArrDelay")

In [None]:
train, test = df_model.randomSplit([0.8, 0.2], seed=1)

In [None]:
#Fitting the model to the training data
lrmodel = lr.fit(train)

In [None]:
#Creating Model Summary to evaluate performance:
trainingSummary = lrmodel.summary

print("Training Set R2: ", trainingSummary.r2)
print("Training Set RMSE: ", trainingSummary.rootMeanSquaredError)

In [None]:
#Evaluating model with the held out test set:
predictions = lrmodel.transform(test)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol = "ArrDelay", predictionCol="prediction", metricName="rmse")

In [None]:
print("Test set R2: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "r2"})))
print("Test set RMSE: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})))

These results are very good. Too good to be true?

In [None]:
from pyspark.ml.stat import Correlation

In [None]:
print("Pearson Correlation between Carrier Delay and Arrival Delay: ", df_model.stat.corr("CarrierDelay", "ArrDelay"))
print("Pearson Correlation between WeatherDelay Delay and Arrival Delay: ", df_model.stat.corr("WeatherDelay", "ArrDelay"))
print("Pearson Correlation between NASDelay Delay and Arrival Delay: ", df_model.stat.corr("NASDelay", "ArrDelay"))
print("Pearson Correlation between SecurityDelay Delay and Arrival Delay: ", df_model.stat.corr("SecurityDelay", "ArrDelay"))

We need to remove attributes that can only be populated upon the flight's arrival

In [None]:
#Attributes causing data leakage
df_dl = df_model.drop("ArrTime", "ActualElapsedTime", "AirTime", "TaxiIn", "Diverted", "CarrierDelay", 
                "WeatherDelay", "NASDelay", "SecurityDelay")

In [None]:
df_dl = make_pipeline(df_dl)

In [None]:
#Replicating Baseline Linear Regression Model without Data Leakage Attributes:
lr_dl = LinearRegression(maxIter=10, featuresCol="features", labelCol="ArrDelay")

In [None]:
train, test = df_model.randomSplit([0.8, 0.2], seed=1)

In [None]:
#Fitting the model to the training data
lrmodel_dl = lr_dl.fit(train)

In [None]:
#Creating Model Summary to evaluate performance:
trainingSummary_dl = lrmodel_dl.summary

print("Training Set R2: ", trainingSummary_dl.r2)
print("Training Set RMSE: ", trainingSummary_dl.rootMeanSquaredError)

The model is performing worse. Let's examine Feature Importance with the help of Random Forest Regression Model

In [None]:
from pyspark.ml import RandomForestRegressor

In [None]:
rf = RandomForestRegressor(labelCol="ArrDelay", featuresCol="features", seed=1, numTrees=5, subsamplingRate = 1)

In [None]:
rf_model_dl = rf.fit(train)

In [None]:
rf_model_dl.featureImportances

In [None]:
def ExtractFeatureImp(featureImp, df, featuresCol):
    extract = []
    for i in df.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        extract = extract + df.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending=False))

In [None]:
extracted = ExtractFeatureImp(rf_model_dl.featureImportances, df_dl, "features")

In [None]:
extracted

Rather than using all available variables, we can discard the ones that have little importance and construct new features with those

In [None]:
import featuretools4s as fts

In [None]:
drop = [col for col not in extracted['name']]

In [None]:
df_fe = df.drop(drop)

In [None]:
es = fts.EntitySetSpark(id="Flights")

In [None]:
es.entity_from_dataframe("Flights", df_fe, index="id")

In [None]:
features = fts.dfs(spark, entityset=es, target_entity="Flights",
#                   agg_primitives=["mean"],
#                   primary_col="id", num_partition=3)

In [None]:
#Run improved model here

This time we will use hyperparameter tuning to train the model

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
#Replicating Baseline Linear Regression Model without Data Leakage Attributes:
lr_dl = LinearRegression(maxIter=10, featuresCol="features", labelCol="ArrDelay")

In [None]:
paramGrid = ParamGridBuilder().addGrid(lr_dl.)