<h1 align="center"> Airline Delays </h1>
<h2 align="center"> W261 - Final Project </h2>
<h5 align="center"> by Team 25: Adam Sohn, Chandra Shekar Bikkanur, Jayesh Parikh, Tucker Anderson</h5>

<h2 align="center"> Business Question:</h2>

Flights that arrive late because of weather, technical, security, or other delays cause economic loss and inconvenience passengers. Predicting, right after departure, whether a flight will arrive late gives the airline a head start on mitigating those losses. In this analysis, we combine airline on-time data with weather data to predict a flight's arrival delay in minutes. The predicted delay could feed other systems, such as `MIL8` (a mobile application for flight status) or `iReebook` (a rebooking/rescheduling portal), to reduce the impact of delays on the airline. The predictive model should explain as much of the variance in the dependent variable (arrival delay) as possible.

<h5>Import Libraries & Data:</h5>

For this analysis, we import the following modules and classes from `pyspark` and other data analysis libraries.

In [4]:
# Import libraries
import re
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import time
import functools
import dateutil.parser
import datetime
from math import atan2, cos, sin, radians, degrees

from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, NullType, ShortType, DateType, BooleanType, BinaryType, TimestampType
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, concat, lit, udf
from pyspark.sql import DataFrameNaFunctions
sqlContext = SQLContext(sc)

from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator, CrossValidatorModel
from pyspark.ml.stat import ChiSquareTest

Next, we load the airlines, weather, stations, and airport codes data for the analysis. The data come from the following sources:

+ Airlines: Bureau Of Transportation Statistics (https://www.transtats.bts.gov/)
+ Weather: National Centers for Environmental Information (https://www.ncei.noaa.gov/)
+ Stations: Databricks FileStore (dbfs:/mnt/mids-w261/data/DEMO8/gsod/stations.csv.gz)
+ Airport Location: Open Flights Organization (https://openflights.org/data.html)

In [6]:
#Read in airlines, weather, stations, airport codes dataset
airlines = spark.read.option("header", "true").parquet(f"dbfs:/mnt/mids-w261/data/datasets_final_project/parquet_airlines_data/201*.parquet")
weather_parquet = spark.read.option("header", "true")\
                      .parquet(f"dbfs:/mnt/mids-w261/data/datasets_final_project/new_weather_parquet_177/weather201*a.parquet")
stations = spark.read.option("header", "true").csv("dbfs:/mnt/mids-w261/data/DEMO8/gsod/stations.csv.gz")
airport_codes = spark.read.csv('/FileStore/tables/airport_codes.csv', header="true", inferSchema="true")
airport_codes = airport_codes.selectExpr("`IATA Code` as code", "Latitude as lat", "Longitude as lon")

<h2 align="center">EDA & Discussion of Challenges:</h2>

We now conduct an exploratory data analysis (EDA) of these datasets to understand their contents and structure.

In [9]:
airlines.printSchema() # Check the data structure of airlines dataset

In [10]:
airlines.count() # Check for total number of records in airlines dataset

In [11]:
len(airlines.columns) # Check for total number of columns in airlines dataset

In [12]:
display(airlines.sample(0.0000001, False)) # sample and display a fraction of records from airlines dataset

YEAR,QUARTER,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,OP_UNIQUE_CARRIER,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,ORIGIN_STATE_FIPS,ORIGIN_STATE_NM,ORIGIN_WAC,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,DEST_CITY_NAME,DEST_STATE_ABR,DEST_STATE_FIPS,DEST_STATE_NM,DEST_WAC,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_DEL15,DEP_DELAY_GROUP,DEP_TIME_BLK,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,ARR_DELAY_NEW,ARR_DEL15,ARR_DELAY_GROUP,ARR_TIME_BLK,CANCELLED,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY
2017,3,8,31,4,2017-08-31,B6,11042,1104203,30647,CLE,"Cleveland, OH",OH,39,Ohio,44,10721,1072102,30721,BOS,"Boston, MA",MA,25,Massachusetts,13,947,959,12.0,12.0,0.0,0,0900-0959,11.0,1010,1126,6.0,1125,1132,7.0,7.0,0.0,0,1100-1159,False,False,98.0,93.0,76.0,1.0,563.0,3,,,,,
2018,4,11,21,3,2018-11-21,WN,10792,1079206,30792,BUF,"Buffalo, NY",NY,36,New York,22,13232,1323202,30977,MDW,"Chicago, IL",IL,17,Illinois,41,1435,1430,-5.0,0.0,0.0,-1,1400-1459,12.0,1442,1511,3.0,1520,1514,-6.0,0.0,0.0,-1,1500-1559,False,False,105.0,104.0,89.0,1.0,468.0,2,,,,,


In [13]:
display(airlines.describe()) # descriptive statistics of the airlines dataset 

summary,YEAR,QUARTER,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,OP_UNIQUE_CARRIER,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,ORIGIN_STATE_FIPS,ORIGIN_STATE_NM,ORIGIN_WAC,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,DEST_CITY_NAME,DEST_STATE_ABR,DEST_STATE_FIPS,DEST_STATE_NM,DEST_WAC,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_DEL15,DEP_DELAY_GROUP,DEP_TIME_BLK,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,ARR_DELAY_NEW,ARR_DEL15,ARR_DELAY_GROUP,ARR_TIME_BLK,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY
count,31746841.0,31746841.0,31746841.0,31746841.0,31746841.0,31746841,31746841.0,31746841.0,31746841.0,31746841,31746841,31746841,31746841.0,31746841,31746841.0,31746841.0,31746841.0,31746841.0,31746841,31746841,31746841,31746841.0,31746841,31746841.0,31746841.0,31274521.0,31269545.0,31269545.0,31269545.0,31269545.0,31746841,31260424.0,31260429.0,31244917.0,31244917.0,31746841.0,31244919.0,31176201.0,31176201.0,31176201.0,31176201.0,31746841,31746677.0,31178799.0,31178799.0,31746841.0,31746841.0,31746841.0,5799114.0,5799114.0,5799114.0,5799114.0,5799114.0
mean,2017.1512498204152,2.51748770846208,6.552106365480585,15.749554640727876,3.9346285509162944,,12668.724409461716,1266875.803290192,31729.315288031336,,,,26.35374732245013,,54.91906164774001,12668.666651116562,1266870.0274082704,31729.2951808339,,,,26.354102948384693,,54.919218135750896,1330.0884999550035,1334.2122192375064,9.855285614165476,12.909587811399238,0.1820794322398998,0.0360368850905889,,16.830789563186986,1356.9563268309594,1464.4766360877195,7.5604571777227,1488.9034405659447,1468.8957719173477,4.615475952313754,12.966188215170924,0.1860109575249402,-0.2096807112579239,,143.2167191860742,138.22906985609035,113.8502422431345,1.0,823.2170183483768,3.765292206553717,19.98459350859459,3.2259498606166392,15.44036813209742,0.0891679315150555,25.364284785572416
stddev,1.431653281021503,1.1053295681781927,3.399430256141529,8.774238088354528,1.991763538747178,,1526.7397787182156,152673.7066902925,1289.4588026200715,,,,16.539517798596844,,26.577828324534646,1526.721213157486,152671.85014169713,1289.419206153186,,,,16.539679261968384,,26.57807966993098,489.86848319644014,503.2922887741845,43.5052029370407,42.44165318434854,0.3859099860819424,2.161932356946247,,9.488981863443778,504.9367808166726,531.9873729297799,5.92997944817499,516.8048646426242,536.3586689058154,45.59418015238943,42.14088584758871,0.3891155176322491,2.2975645036344483,,74.73117735923344,74.33716296557806,72.24024903973572,0.0,607.6826683052021,2.3923501887692864,59.30797970625764,26.81202538233581,34.73908233877254,2.9147981743398184,48.60358147038267
min,2015.0,1.0,1.0,1.0,1.0,9E,10135.0,1013503.0,30070.0,ABE,"Aberdeen, SD",AK,1.0,Alabama,1.0,10135.0,1013503.0,30070.0,ABE,"Aberdeen, SD",AK,1.0,Alabama,1.0,1.0,1.0,-234.0,0.0,0.0,-2.0,0001-0559,0.0,1.0,1.0,0.0,1.0,1.0,-238.0,0.0,0.0,-2.0,0001-0559,-99.0,14.0,4.0,1.0,21.0,1.0,0.0,0.0,0.0,0.0,0.0
max,2019.0,4.0,12.0,31.0,7.0,YX,16869.0,1686901.0,36133.0,YUM,"Yuma, AZ",WY,78.0,Wyoming,93.0,16869.0,1686901.0,36133.0,YUM,"Yuma, AZ",WY,78.0,Wyoming,93.0,2359.0,2400.0,2755.0,2755.0,1.0,12.0,2300-2359,227.0,2400.0,2400.0,414.0,2400.0,2400.0,2695.0,2695.0,1.0,12.0,2300-2359,948.0,1604.0,1557.0,1.0,5095.0,11.0,2695.0,2692.0,1848.0,1078.0,2454.0


The descriptive statistics show that the airlines dataset mixes categorical and numerical columns, and that some columns have missing values (their counts fall below the 31,746,841 total rows).

As part of the EDA, we check each column for null/NaN values and compute the proportion of records that are missing. This informs the imputation strategy for missing values.

In [15]:
def nullDataFrame(df):
  '''
  Returns a pandas dataframe with the column name, null count, and percentage of nulls for each column of the given dataframe, 'df'
  '''
  null_feature_list = []
  count = df.count()
  for column in df.columns:
    nulls = df.filter(df[column].isNull()).count()
    nulls_perct = np.round((nulls/count)*100, 2)
    null_feature_list.append([column, nulls, nulls_perct])
  # Build from the list directly; np.array() would cast the mixed-type rows to strings
  nullCounts_df = pd.DataFrame(null_feature_list, columns=['Feature_Name', 'Null_Counts', 'Percentage_Null_Counts'])
  return nullCounts_df

airlines_raw_nullCounts_df = nullDataFrame(airlines)
airlines_raw_nullCounts_df

Unnamed: 0,Feature_Name,Null_Counts,Percentage_Null_Counts
0,YEAR,0,0.0
1,QUARTER,0,0.0
2,MONTH,0,0.0
3,DAY_OF_MONTH,0,0.0
4,DAY_OF_WEEK,0,0.0
5,FL_DATE,0,0.0
6,OP_UNIQUE_CARRIER,0,0.0
7,ORIGIN_AIRPORT_ID,0,0.0
8,ORIGIN_AIRPORT_SEQ_ID,0,0.0
9,ORIGIN_CITY_MARKET_ID,0,0.0


The null counts show that most columns have only a small proportion of missing values (at most ~2%). The exceptions are `CARRIER_DELAY`, `WEATHER_DELAY`, `NAS_DELAY`, `SECURITY_DELAY` and `LATE_AIRCRAFT_DELAY`, where about 81% of the values are missing.
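The percentages can be cross-checked by hand against the `count` row of the `describe()` output shown earlier, since Spark's `describe()` reports non-null counts. The snippet below is a minimal sketch of that arithmetic; the helper name `null_percentage` is illustrative and not part of the notebook.

```python
# Non-null counts copied from the `count` row of airlines.describe()
total_rows = 31_746_841
non_null_counts = {
    "ARR_DELAY": 31_176_201,
    "CARRIER_DELAY": 5_799_114,
}

def null_percentage(non_null, total):
    """Percentage of rows that are null, rounded to two decimals."""
    return round((total - non_null) / total * 100, 2)

null_pcts = {name: null_percentage(n, total_rows) for name, n in non_null_counts.items()}
for name, pct in null_pcts.items():
    print(f"{name}: {pct}% null")
```

`ARR_DELAY` comes out just under 2% and `CARRIER_DELAY` just under 82%, in line with the figures quoted above.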

Next, we plot histograms of all numerical features in the airlines dataset to see how the data are distributed. For this, we draw a small sample (a fraction of 0.0001) of the original airlines data and plot the histograms from it.

In [17]:
sample_airlines_df = airlines.sample(False, 0.0001, 2020) # Sample a fraction of the airlines data 
airlines_pandas_df = sample_airlines_df.toPandas() # Converting spark SQL dataframe to pandas dataframe for plotting Histograms

In [18]:
numeric_features = [x[0] for x in airlines.dtypes if x[1] == 'int' or x[1] == 'double'] # Retrieving only numeric features
airlines_pandas_df[numeric_features].hist(figsize=(30,30), bins=50)
display(plt.show())

The histograms show a mix of shapes across the numeric features: some are approximately normal, some roughly uniform, and others right-skewed or left-skewed.

Next, we compute a correlation matrix to see which features are positively correlated, negatively correlated, or uncorrelated with the dependent variable, `ARR_DELAY`.

In [20]:
airlines_pandas_df.corr() # Check the correlation of all the features with ARR_DELAY

Unnamed: 0,YEAR,QUARTER,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN_STATE_FIPS,ORIGIN_WAC,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST_STATE_FIPS,DEST_WAC,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_DEL15,DEP_DELAY_GROUP,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,ARR_DELAY_NEW,ARR_DEL15,ARR_DELAY_GROUP,CANCELLED,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY
YEAR,1.0,0.027245,0.036315,0.026719,-0.025623,-0.010927,-0.010924,-0.009716,0.021722,-0.03739,0.007064,0.007067,0.028988,0.032613,-0.038472,-0.027221,-0.027685,0.028977,0.030955,0.012619,0.017931,0.072703,-0.023727,-0.012337,-0.002311,-0.004713,-0.010057,0.026492,0.032724,0.015501,0.011243,-0.010194,-0.038534,-0.017232,-0.016805,-0.025913,,-0.033147,-0.028839,0.033212,0.015593,-0.011671,,0.062875
QUARTER,0.027245,1.0,0.969916,0.01297,0.013014,0.02142,0.021421,0.050313,0.003572,-0.003701,0.013686,0.013686,0.026063,-0.021461,0.008866,0.012162,0.021252,-0.003205,-0.002406,-0.001524,0.004648,-0.000128,0.021392,0.002051,0.004912,0.010268,-5.2e-05,-0.009723,-0.004859,-0.017009,-0.00765,-0.045082,0.01504,0.007771,0.001922,0.001694,,0.011972,0.01323,-0.025513,-0.025687,0.092382,,0.001798
MONTH,0.036315,0.969916,1.0,0.007653,0.015631,0.022241,0.022241,0.057682,0.005509,-0.002387,0.019026,0.019027,0.025237,-0.018354,0.014429,0.011358,0.017888,-0.006035,-0.005224,-0.00486,-0.00063,-0.00225,0.019985,0.004916,0.005283,0.015823,0.002964,-0.01114,-0.005986,-0.020296,-0.010392,-0.039038,0.014376,0.007283,0.002319,0.002308,,0.011409,0.013342,-0.018053,-0.024879,0.088749,,-0.002919
DAY_OF_MONTH,0.026719,0.01297,0.007653,1.0,0.024595,-0.000954,-0.000954,-0.025161,-0.019787,0.003617,-0.027405,-0.027405,-0.031011,0.005575,0.00124,-0.038648,-0.039347,-0.026265,-0.023744,-0.018424,-0.017345,0.00306,-0.033409,-0.015103,0.012239,-0.011522,-0.014857,-0.027449,-0.024578,-0.029835,-0.019329,-0.032208,0.020449,0.028223,0.023105,0.022431,,0.025627,0.023623,-0.012056,-0.018533,0.046834,,-0.031233
DAY_OF_WEEK,-0.025623,0.013014,0.015631,0.024595,1.0,-0.011436,-0.011437,-0.004043,-0.005203,-0.014304,0.04346,0.043461,0.058038,-0.013205,-0.049296,0.002067,0.007483,-0.009442,-0.010738,-0.000761,-0.003261,-0.052986,0.00932,0.012883,0.019687,0.019355,0.018121,-0.025822,-0.021702,-0.005155,-0.019098,0.002249,-0.008434,0.039164,0.029569,0.035102,,0.036655,0.036791,-0.057791,-0.02837,-0.088815,,0.036059
ORIGIN_AIRPORT_ID,-0.010927,0.02142,0.022241,-0.000954,-0.011436,1.0,1.0,0.635169,-0.1017,0.251371,0.012615,0.012616,-0.003904,-0.029606,0.116429,-0.040424,-0.035854,-0.025605,-0.021149,-0.019009,-0.023874,-0.029176,-0.038927,0.012933,0.010375,0.00242,0.00925,-0.019422,-0.015105,0.002071,-0.017611,-0.017611,-0.003331,0.045404,0.048293,0.052135,,0.070723,0.074988,-0.078693,-0.024214,0.052672,,-0.004275
ORIGIN_AIRPORT_SEQ_ID,-0.010924,0.021421,0.022241,-0.000954,-0.011437,1.0,1.0,0.635168,-0.101701,0.251371,0.012616,0.012616,-0.003903,-0.029606,0.116429,-0.040424,-0.035854,-0.025605,-0.021149,-0.019009,-0.023873,-0.029176,-0.038926,0.012933,0.010374,0.00242,0.00925,-0.019422,-0.015105,0.002071,-0.017611,-0.017612,-0.003331,0.045403,0.048293,0.052135,,0.070722,0.074988,-0.078692,-0.024213,0.052672,,-0.004275
ORIGIN_CITY_MARKET_ID,-0.009716,0.050313,0.057682,-0.025161,-0.004043,0.635169,0.635168,1.0,0.00802,0.065779,-0.004904,-0.004904,-0.04166,-0.028528,0.01949,-0.06639,-0.068907,-0.065964,-0.05811,-0.069272,-0.070384,-0.092384,-0.068669,-0.03969,0.070359,-0.049016,-0.035443,-0.059496,-0.052885,-0.0526,-0.065244,-0.014584,-0.022242,-0.003427,-5e-06,0.005802,,0.008401,0.010437,-0.045065,0.006605,0.021613,,-0.060296
ORIGIN_STATE_FIPS,0.021722,0.003572,0.005509,-0.019787,-0.005203,-0.1017,-0.101701,0.00802,1.0,-0.063609,-0.057005,-0.057005,-0.060074,0.023615,-0.036584,-0.034425,-0.033198,-0.007923,-0.004532,-0.021493,-0.012665,0.061069,-0.021473,-0.034124,0.040059,-0.036603,-0.033415,-0.006366,0.00025,-0.014226,-0.002273,0.021345,0.004934,-0.026532,-0.02613,-0.037101,,-0.063116,-0.067743,0.01661,0.055179,-0.021244,,0.012027
ORIGIN_WAC,-0.03739,-0.003701,-0.002387,0.003617,-0.014304,0.251371,0.251371,0.065779,-0.063609,1.0,0.143793,0.143793,0.065635,-0.06724,0.423565,-0.00754,-0.013429,0.016886,0.015444,-0.003525,0.005243,-0.116894,-0.028284,0.048141,-0.010859,0.04597,0.044382,0.019649,0.013989,-0.006362,0.001546,0.00935,0.007872,-0.013027,-0.014081,0.000463,,0.052995,0.055903,0.070354,0.050508,-0.055702,,0.012243


From the correlation matrix, we can see that `ARR_DELAY` is highly correlated with `DEP_DELAY`, `CARRIER_DELAY`, `WEATHER_DELAY`, `NAS_DELAY`, and `LATE_AIRCRAFT_DELAY`, among other features. For our supervised machine learning model, we will train on these features along with other relevant features chosen using domain knowledge.
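One caveat when reading these numbers: pandas `DataFrame.corr()` excludes null values pairwise, so correlations involving the mostly-null delay-cause columns are computed only over rows where both values are present. The sketch below reproduces that behaviour in plain Python. The function `pairwise_pearson` and the toy values are made up for illustration and are not taken from the airlines data.

```python
from math import sqrt

def pairwise_pearson(xs, ys):
    """Pearson correlation over positions where both values are present.

    Returns the coefficient and the number of pairs actually used.
    """
    pairs = [(x, y) for x, y in zip(xs, ys) if x is not None and y is not None]
    n = len(pairs)
    mean_x = sum(x for x, _ in pairs) / n
    mean_y = sum(y for _, y in pairs) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in pairs)
    var_x = sum((x - mean_x) ** 2 for x, _ in pairs)
    var_y = sum((y - mean_y) ** 2 for _, y in pairs)
    return cov / sqrt(var_x * var_y), n

# Made-up toy values: the second column is null for half of the rows
arr_delay = [-5.0, 3.0, 20.0, 45.0, -2.0, 60.0]
carrier_delay = [None, None, 10.0, 30.0, None, 50.0]

r, n_used = pairwise_pearson(arr_delay, carrier_delay)
print(f"r = {r:.3f} computed from {n_used} of {len(arr_delay)} rows")
```

Only the rows with a recorded delay cause contribute, so a high coefficient here describes delayed flights rather than all flights.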

In [22]:
weather_parquet.printSchema() # data structure of weather data

In [23]:
weather_parquet.count() # to check the number of records in weather data

In [24]:
len(weather_parquet.columns) # Check the number of columns in weather data

In [25]:
display(weather_parquet.sample(0.000000001, False)) # sample a fraction of records for data insight

STATION,DATE,SOURCE,LATITUDE,LONGITUDE,ELEVATION,NAME,REPORT_TYPE,CALL_SIGN,QUALITY_CONTROL,WND,CIG,VIS,TMP,DEW,SLP,AW1,GA1,GA2,GA3,GA4,GE1,GF1,KA1,KA2,MA1,MD1,MW1,MW2,OC1,OD1,OD2,REM,EQD,AW2,AX4,GD1,AW5,GN1,AJ1,AW3,MK1,KA4,GG3,AN1,RH1,AU5,HL1,OB1,AT8,AW7,AZ1,CH1,RH3,GK1,IB1,AX1,CT1,AK1,CN2,OE1,MW5,AO1,KA3,AA3,CR1,CF2,KB2,GM1,AT5,AY2,MW6,MG1,AH6,AU2,GD2,AW4,MF1,AA1,AH2,AH3,OE3,AT6,AL2,AL3,AX5,IB2,AI3,CV3,WA1,GH1,KF1,CU2,CT3,SA1,AU1,KD2,AI5,GO1,GD3,CG3,AI1,AL1,AW6,MW4,AX6,CV1,ME1,KC2,CN1,UA1,GD5,UG2,AT3,AT4,GJ1,MV1,GA5,CT2,CG2,ED1,AE1,CO1,KE1,KB1,AI4,MW3,KG2,AA2,AX2,AY1,RH2,OE2,CU3,MH1,AM1,AU4,GA6,KG1,AU3,AT7,KD1,GL1,IA1,GG2,OD3,UG1,CB1,AI6,CI1,CV2,AZ2,AD1,AH1,WD1,AA4,KC1,IA2,CF3,AI2,AT1,GD4,AX3,AH4,KB3,CU1,CN4,AT2,CG1,CF1,GG1,MV2,CW1,GG4,AB1,AH5,CN3
72515004725,2017-02-09T02:53:00.000+0000,7,42.2068,-75.98,486.2,"BINGHAMTON GREATER AP, NY US",FM-15,KBGM,V030,"360,5,N,0057,5","00457,5,M,N","016093,5,N,5",-285,-565,100825,,"07,5,+00457,5,99,9","08,5,+00884,5,99,9",,,"9,AGL ,+99999,+99999",99999999999004571999999,,,100685094855,"8,9,008,9,+999,9",,,,,,MET10802/08/17 21:53:02 METAR KBGM 090253Z 36011KT 10SM BKN015 OVC029 M03/M06 A2973 RMK AO2 SLP082 T10281056 58008,,,,"3,99,1,+00457,5,9",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,"4,99,1,+00884,5,9",,,1000095,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


<h2 align="center"> Feature Engineering:</h2>

<h5>Airlines & Weather Data Merge:</h5>

We will merge airlines and weather data sets to form a composite dataset.
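The merge code that follows pairs each airport with nearby weather stations by great-circle distance and derives a compass bearing for each route. The block below is a minimal plain-Python sketch of both calculations, using the same 3959-mile Earth radius as the SQL distance query. The airport coordinates are approximate values assumed for illustration, and all function names here are hypothetical. The SQL query uses the spherical law of cosines; the haversine form is shown alongside for comparison.

```python
from math import acos, asin, atan2, cos, degrees, radians, sin, sqrt

EARTH_RADIUS_MILES = 3959  # same radius as the SQL distance query

def law_of_cosines_miles(lat_a, lon_a, lat_b, lon_b):
    """Great-circle distance via the spherical law of cosines."""
    phi_a, phi_b = radians(lat_a), radians(lat_b)
    d_lambda = radians(lon_b - lon_a)
    cos_angle = sin(phi_a) * sin(phi_b) + cos(phi_a) * cos(phi_b) * cos(d_lambda)
    # Clamp to [-1, 1] to guard against floating-point rounding
    return EARTH_RADIUS_MILES * acos(max(-1.0, min(1.0, cos_angle)))

def haversine_miles(lat_a, lon_a, lat_b, lon_b):
    """Great-circle distance via the haversine formula."""
    phi_a, phi_b = radians(lat_a), radians(lat_b)
    d_phi = phi_b - phi_a
    d_lambda = radians(lon_b - lon_a)
    a = sin(d_phi / 2) ** 2 + cos(phi_a) * cos(phi_b) * sin(d_lambda / 2) ** 2
    return 2 * EARTH_RADIUS_MILES * asin(sqrt(a))

def initial_bearing_degrees(lat_a, lon_a, lat_b, lon_b):
    """Initial bearing from A to B, clockwise from true north, in [0, 360)."""
    phi_a, phi_b = radians(lat_a), radians(lat_b)
    d_lambda = radians(lon_b - lon_a)
    x = cos(phi_b) * sin(d_lambda)
    y = cos(phi_a) * sin(phi_b) - sin(phi_a) * cos(phi_b) * cos(d_lambda)
    return degrees(atan2(x, y)) % 360  # atan2 alone yields (-180, 180]

def compass_class(bearing, sectors=("N", "NE", "E", "SE", "S", "SW", "W", "NW")):
    """Bin a bearing into equal sectors centred on the compass points."""
    width = 360 / len(sectors)
    return sectors[int(((bearing % 360) + width / 2) // width) % len(sectors)]

# Approximate coordinates (assumed for illustration)
CLE = (41.4117, -81.8498)
BOS = (42.3643, -71.0052)

dist_cos = law_of_cosines_miles(*CLE, *BOS)
dist_hav = haversine_miles(*CLE, *BOS)
outbound = compass_class(initial_bearing_degrees(*CLE, *BOS))
inbound = compass_class(initial_bearing_degrees(*BOS, *CLE))
print(round(dist_cos, 1), round(dist_hav, 1), outbound, inbound)
```

The two distance formulas agree closely at this range, and the result is within a few miles of the `DISTANCE` value of 563 reported for the sampled CLE to BOS flight. Wrapping the bearing with `% 360` matters because westbound routes otherwise produce negative angles.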

In [28]:
def is_Weekend(x):
  """
  Function to determine if a given day of the week is a weekend_day(Friday, Saturday, Sunday)
  """
  if   x < 5: 
    return 0
  else: 
    return 1

def is_RushHour(x):
  """
  Function to determine if a given time of the day is rush hour (1600-2100)
  """
  if (x is not None) and (x >= 1600) and (x <= 2100): 
    return 1
  else: 
    return 0
 
def preprocessAirlines(df):
  cols_to_keep = ['MONTH', 'DAY_OF_WEEK', 'FL_DATE', 'OP_UNIQUE_CARRIER', 'ORIGIN', 'DEST', 'DEP_DELAY', 'DEP_TIME_BLK', 'ARR_DELAY', 'ARR_TIME_BLK', 'CRS_ELAPSED_TIME', 'DISTANCE',  'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'IS_WEEKEND', 'DEP_RUSH_HOUR', 'ARR_RUSH_HOUR', 'DEP_TIME', 'CRS_DEP_TIME', 'ARR_TIME', 'CRS_ARR_TIME']
  cols_to_remove = [x for x in df.columns if x not in cols_to_keep]
  df = df.orderBy("FL_DATE") 
  df = df.filter(df.CANCELLED == False)
  df = df.filter(df.DIVERTED == False)
  df = df.withColumn('CARRIER_DELAY', f.when(df.CARRIER_DELAY.isNotNull(), 1).otherwise(0))
  df = df.withColumn('WEATHER_DELAY', f.when(df.WEATHER_DELAY.isNotNull(), 1).otherwise(0))
  df = df.withColumn('NAS_DELAY', f.when(df.NAS_DELAY.isNotNull(), 1).otherwise(0))
  df = df.withColumn('SECURITY_DELAY', f.when(df.SECURITY_DELAY.isNotNull(), 1).otherwise(0))
  df = df.withColumn('LATE_AIRCRAFT_DELAY', f.when(df.LATE_AIRCRAFT_DELAY.isNotNull(), 1).otherwise(0))
  df = df.withColumn("IS_WEEKEND", f.udf(is_Weekend, IntegerType())("DAY_OF_WEEK"))
  df = df.withColumn("DEP_RUSH_HOUR", f.udf(is_RushHour, IntegerType())("DEP_TIME"))
  df = df.withColumn("ARR_RUSH_HOUR", f.udf(is_RushHour, IntegerType())("CRS_ARR_TIME"))
  df = df.fillna(0, subset=['ARR_DELAY', 'DEP_DELAY'])  
  df = df.withColumn('ORIGIN_CARRIER', concat(col("ORIGIN"), lit("_"), col("OP_UNIQUE_CARRIER")))
  df = df.withColumn('DEST_CARRIER', concat(col("DEST"), lit("_"), col("OP_UNIQUE_CARRIER")))
  preprocessAirlines_df = df.drop(*cols_to_remove)
  return preprocessAirlines_df
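
A quick check of the two flag helpers, restated in plain Python so that it runs without Spark. It assumes the BTS convention that `DAY_OF_WEEK` runs from 1 (Monday) to 7 (Sunday), which matches `datetime.date.isoweekday()`. The lower-case function names are illustrative and not part of the notebook.

```python
from datetime import date

def is_weekend(day_of_week):
    """1 for Friday through Sunday (5, 6, 7), else 0; mirrors is_Weekend."""
    return 0 if day_of_week < 5 else 1

def is_rush_hour(hhmm):
    """1 if an hhmm integer time falls within 1600-2100 inclusive, else 0."""
    return 1 if hhmm is not None and 1600 <= hhmm <= 2100 else 0

# FL_DATE values from the two sampled rows shown in the EDA section
thursday = date(2017, 8, 31).isoweekday()    # DAY_OF_WEEK is 4 in the sample
wednesday = date(2018, 11, 21).isoweekday()  # DAY_OF_WEEK is 3 in the sample

weekend_flags = [is_weekend(d) for d in (thursday, wednesday, 5, 7)]
rush_flags = [is_rush_hour(t) for t in (959, 1600, 2100, 2101, None)]
print(weekend_flags, rush_flags)
```

Both boundaries of the rush-hour window are inclusive, and a missing time is treated as outside the window.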

In [29]:
def unionAll_fn(dfs):
    return functools.reduce(lambda df1,df2: df1.union(df2.select(df1.columns)), dfs) 

def US_fn(df):
    """
    Reduce df to US only to reduce size of dataset
    """
    # US is lat/long ranges according to format: [[(lat_low, lat_high),(long_low, long_high)], [(lat_low, lat_high),(long_low, long_high)]]
    US = [[(24,49),(-125,-67)],[(17,19),(-68,-65.5)], [(13,14),(144,145)], [(15,16),(145,146)], [(-15,-14), (-171,-170)], [(18,19),(-65.4,-64)], [(18,23),(-160,-154)], [(50,175),(-170,-103)]]  

    list_df = [] #empty list for parquet parts
    parquet_part = spark.range(0).drop("id") #empty spark df

    #Filtering for individual areas in US
    for item in US:
      parquet_part = df.filter((f.col('Latitude') > item[0][0]) & (f.col('Latitude') < item[0][1]) & (f.col('Longitude') > item[1][0]) & (f.col('Longitude') < item[1][1]))
      list_df.append(parquet_part)
    
    #Appending each individual US area
    parquet_us = unionAll_fn(list_df)

    return parquet_us

def reduce_split_cols_fn(weather_parquet_us):
    """
    Reduce weather dataset to columns of interest and return split columns with comma-separated values into multiple columns for each comma-separated value.
    """
    #Reduce weather dataset to columns of interest (high level) and return split columns with comma-separated values into multiple columns for each comma-separated value.
    weather_pre_split = weather_parquet_us.select('STATION','DATE','SOURCE','LATITUDE','LONGITUDE',f.split('WND', ',').alias('WND'),f.split('VIS', ',').alias('VIS'),f.split('SLP', ',').alias('SLP'),f.split('AA1', ',').alias('AA1'))
    df_sizes_WND = weather_pre_split.select(f.size('WND').alias('WND'))
    df_sizes_VIS = weather_pre_split.select(f.size('VIS').alias('VIS'))
    df_sizes_SLP = weather_pre_split.select(f.size('SLP').alias('SLP'))
    df_sizes_AA1 = weather_pre_split.select(f.size('AA1').alias('AA1'))
    df_max_WND = df_sizes_WND.agg(f.max('WND'))
    df_max_VIS = df_sizes_VIS.agg(f.max('VIS'))
    df_max_SLP = df_sizes_SLP.agg(f.max('SLP'))
    df_max_AA1 = df_sizes_AA1.agg(f.max('AA1'))
    nb_columns_WND = df_max_WND.collect()[0][0]
    nb_columns_VIS = df_max_VIS.collect()[0][0]
    nb_columns_SLP = df_max_SLP.collect()[0][0]
    nb_columns_AA1 = df_max_AA1.collect()[0][0]
    weather_post_split = weather_pre_split.select('STATION','DATE','SOURCE','LATITUDE','LONGITUDE',*[weather_pre_split['WND'][i] for i in range(nb_columns_WND)],*[weather_pre_split['VIS'][i] for i in range(nb_columns_VIS)],*[weather_pre_split['SLP'][i] for i in range(nb_columns_SLP)],*[weather_pre_split['AA1'][i] for i in range(nb_columns_AA1)])
  
    #Filtering out data with quality issues. All string values are indicative of quality issue
    fltr_msk = [
    f.col('WND[0]') != '999',
    f.col('WND[1]') != '2',
    f.col('WND[1]') != '3',
    f.col('WND[1]') != '6',
    f.col('WND[1]') != '7',
    f.col('WND[2]') != '9',
    f.col('WND[3]') != '9999',  
    f.col('WND[4]') != '2',
    f.col('WND[4]') != '3',
    f.col('WND[4]') != '6',
    f.col('WND[4]') != '7',
    f.col('VIS[0]') != '999999',
    f.col('VIS[1]') != '2',
    f.col('VIS[1]') != '3',
    f.col('VIS[1]') != '6',
    f.col('VIS[1]') != '7',
    f.col('VIS[2]') != '9',
    f.col('VIS[3]') != '2',
    f.col('VIS[3]') != '3',
    f.col('VIS[3]') != '6',
    f.col('VIS[3]') != '7',
    f.col('SLP[0]') != '99999',
    f.col('SLP[1]') != '2',
    f.col('SLP[1]') != '3',
    f.col('SLP[1]') != '6',
    f.col('SLP[1]') != '7',
    f.col('SLP[1]') != '9',
    f.col('AA1[0]') != '99',
    f.col('AA1[1]') != '9999',
    f.col('AA1[2]') != '9',
    f.col('AA1[3]') != '2',
    f.col('AA1[3]') != '3',
    f.col('AA1[3]') != '6',
    f.col('AA1[3]') != '7'
    ]
    weather_fltr = weather_post_split
    for i in fltr_msk:
      weather_fltr = weather_fltr.filter(i)

    #Reduce weather dataset to columns of interest (low level)
    weather_fltr_drop = weather_fltr.select('STATION','DATE','SOURCE','LATITUDE','LONGITUDE','WND[0]', 'WND[3]','VIS[0]','SLP[0]','AA1[0]')
    weather_fltr_drop = weather_fltr_drop.withColumnRenamed("DATE", "TIMESTAMP")

    return weather_fltr_drop

def distinct_station_fn(weather_fltr_drop):
    """
    For df input, return distinct stations for calculating closest stations to airports
    """
    weather_fltr_drop_distinct = weather_fltr_drop.select("STATION", "LATITUDE", "LONGITUDE").distinct()
    return weather_fltr_drop_distinct

def haversine_join_station_aircode_fn(airport_codes_df, weather_df):
    """
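    Cross join airports with weather stations and return the great-circle distance in miles
    (Earth radius 3959 mi) for every airport-station pair. Despite the function name, the SQL
    uses the spherical law of cosines rather than the haversine formula.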
    """
    airport_codes_df.createOrReplaceTempView('airport_codes_us')
    weather_df.createOrReplaceTempView('stations_all')
    distance_query = "(SELECT airport_codes_us.code, stations_all.STATION, airport_codes_us.lat AS airport_lat, airport_codes_us.lon AS airport_lon, ( 3959 * acos(cos(radians(airport_codes_us.lat) ) * cos( radians( stations_all.LATITUDE ) ) * cos( radians( stations_all.LONGITUDE ) - radians(airport_codes_us.lon) ) + sin(radians(airport_codes_us.lat) ) * sin( radians( stations_all.LATITUDE ) ) ) ) AS airport_station_distance FROM airport_codes_us CROSS JOIN stations_all)"
    airports_stations_distance_all = spark.sql(distance_query)
    return airports_stations_distance_all
  
def airports_closest_stations_fn(airports_stations_distance_all):
    """
    For df input, return df with closest weather stations to airports
    """

    airports_stations_distance_all.createOrReplaceTempView('airports_stations_distance')
    closest_query = "(SELECT code AS airport_code, STATION AS station_name, airport_lat, airport_lon, airport_station_distance FROM airports_stations_distance ORDER BY airport_station_distance)"
    airports_closest_stations = spark.sql(closest_query)
  
    min_distance_query = "(SELECT code AS airport_code, STATION AS station_code, airport_lat, airport_lon, airport_station_distance FROM (SELECT *, row_number() over (partition by code order by airport_station_distance ASC) as seqnum from airports_stations_distance) airports_stations_distance where seqnum = 1)"
    airports_closest_station = spark.sql(min_distance_query)    

    MAX_ALLOWABLE_WEATHER_DISTANCE = 50.0
    airports_closest_station_filtered = airports_closest_station.filter(airports_closest_station.airport_station_distance < MAX_ALLOWABLE_WEATHER_DISTANCE)
    return airports_closest_station_filtered
  
def bearingClass_fn(flight_bearing, denominations=8):
    """
    Bin a bearing in degrees, measured clockwise from true north, into one of 8 compass classes
    """
    denom = 360/denominations
    # atan2-based bearings fall in (-180, 180]; wrap to [0, 360) so westbound flights are not all binned as "N"
    bearing = int(flight_bearing) % 360
        
    if (bearing < 0 + denom/2) or (bearing > (7*denom) + (denom/2)):
      flight_bearing_class = "N"
    elif bearing <= denom + (denom/2):
      flight_bearing_class = "NE"
    elif bearing <= (2*denom) + (denom/2):
      flight_bearing_class = "E"
    elif bearing <= (3*denom) + (denom/2):
      flight_bearing_class = "SE"
    elif bearing <= (4*denom) + (denom/2):
      flight_bearing_class = "S"
    elif bearing <= (5*denom) + (denom/2):
      flight_bearing_class = "SW"
    elif bearing <= (6*denom) + (denom/2):
      flight_bearing_class = "W"
    elif bearing <= (7*denom) + (denom/2):
      flight_bearing_class = "NW"
    else:
      flight_bearing_class = "UNK"
      
    return flight_bearing_class
  
udfBearingClass_fn = udf(bearingClass_fn, StringType())

def bearingCalculation_fn(lat_a, lon_a, lat_b, lon_b):  
    lat_a_r, lat_b_r, lon_a_r, lon_b_r = radians(lat_a), radians(lat_b), radians(lon_a), radians(lon_b)
    delta_lon = lon_b - lon_a
    delta_lon_r = lon_b_r - lon_a_r
    X = cos(lat_b_r) * sin(delta_lon_r)
    Y = cos(lat_a_r) * sin(lat_b_r) - sin(lat_a_r) * cos(lat_b_r) * cos(delta_lon_r)
  
    flight_bearing = degrees(atan2(X, Y))
        
    flight_bearing_class = bearingClass_fn(flight_bearing)
  
    return flight_bearing_class
udfBearingCalculation_fn = udf(bearingCalculation_fn, StringType())

def join_closest_weather_airlines_fn(airlines_df, airports_closest_station_filtered):

    # add closest weather station for the origin airport to airlines dataset
    airlines_station_origin_filtered = airlines_df.join(airports_closest_station_filtered, airlines_df.ORIGIN == airports_closest_station_filtered.airport_code, how="inner")
    airlines_station_origin_filtered = airlines_station_origin_filtered.withColumnRenamed("station_code", "ORIGIN_STATION")
    airlines_station_origin_filtered = airlines_station_origin_filtered.withColumnRenamed("airport_station_distance", "ORIGIN_STATION_DISTANCE")
    airlines_station_origin_filtered = airlines_station_origin_filtered.withColumnRenamed("airport_lat", "ORIGIN_LAT")
    airlines_station_origin_filtered = airlines_station_origin_filtered.withColumnRenamed("airport_lon", "ORIGIN_LON")
    airlines_station_origin_filtered = airlines_station_origin_filtered.drop("airport_code")

    # add closest weather station for the destination airport to airlines dataset
    airlines_station_filtered = airlines_station_origin_filtered.join(airports_closest_station_filtered, airlines_station_origin_filtered.DEST == airports_closest_station_filtered.airport_code, how="inner")
    airlines_station_filtered = airlines_station_filtered.withColumnRenamed("station_code", "DEST_STATION")
    airlines_station_filtered = airlines_station_filtered.withColumnRenamed("airport_station_distance", "DEST_STATION_DISTANCE")
    airlines_station_filtered = airlines_station_filtered.withColumnRenamed("airport_lat", "DEST_LAT")
    airlines_station_filtered = airlines_station_filtered.withColumnRenamed("airport_lon", "DEST_LON")
    airlines_station_filtered = airlines_station_filtered.drop("airport_code")

    #add flight bearing angle in degrees from true north (consistent with wind direction)
    airlines_station_filtered = airlines_station_filtered.withColumn("FLIGHT_BEARING", udfBearingCalculation_fn("ORIGIN_LAT","ORIGIN_LON","DEST_LAT","DEST_LON"))
    return airlines_station_filtered

def flightDateTimeCalculation_fn(flight_date, flight_time):  
    timestamp_date = str(flight_date)
    timestamp_hour = str(flight_time).zfill(4)[:-2]
    timestamp_minute = str(flight_time).zfill(4)[-2:]
  
    timestamp = timestamp_date + 'T' + timestamp_hour + ':' + timestamp_minute# + ".000+0000"
    try:
      datetime_timestamp = dateutil.parser.isoparse(timestamp)
    except ValueError:
      timestamp = timestamp_date + 'T' + '00' + ':' + timestamp_minute# + ".000+0000"
      datetime_timestamp = dateutil.parser.isoparse(timestamp)
    
    return datetime_timestamp
  
def flightDateTimeCalculationArr_fn(flight_date, flight_time_dep, flight_time_arr):  
    timestamp_dep_date = str(flight_date)
    timestamp_arr_date = str(flight_date)
  
    
    timestamp_dep_hour = str(flight_time_dep).zfill(4)[:-2]
    timestamp_dep_minute = str(flight_time_dep).zfill(4)[-2:]
    timestamp_arr_hour = str(flight_time_arr).zfill(4)[:-2]
    timestamp_arr_minute = str(flight_time_arr).zfill(4)[-2:]
    
    timestamp_dep = timestamp_dep_hour + ':' + timestamp_dep_minute
    timestamp_arr = timestamp_arr_hour + ':' + timestamp_arr_minute
    
    timestamp_dep = timestamp_dep_date + 'T' + timestamp_dep_hour + ':' + timestamp_dep_minute# + ".000+0000"
    try:
      datetime_timestamp_dep = dateutil.parser.isoparse(timestamp_dep)
    except ValueError:
      timestamp_dep = timestamp_dep_date + 'T' + '00' + ':' + timestamp_dep_minute# + ".000+0000"
      datetime_timestamp_dep = dateutil.parser.isoparse(timestamp_dep)
    
    timestamp_arr = timestamp_arr_date + 'T' + timestamp_arr_hour + ':' + timestamp_arr_minute# + ".000+0000"
    try:
      datetime_timestamp_arr = dateutil.parser.isoparse(timestamp_arr)
    except ValueError:
      timestamp_arr = timestamp_arr_date + 'T' + '00' + ':' + timestamp_arr_minute# + ".000+0000"
      datetime_timestamp_arr = dateutil.parser.isoparse(timestamp_arr)
  
    # If the scheduled arrival is earlier than the departure, the flight lands the next day (only valid for flights shorter than 24 hours):
    if datetime_timestamp_dep > datetime_timestamp_arr:
      datetime_timestamp_arr = datetime_timestamp_arr + datetime.timedelta(days=1)

    return datetime_timestamp_arr

udfFlightDateTimeCalculation_fn = udf(flightDateTimeCalculation_fn, TimestampType())
udfFlightDateTimeCalculationArr_fn = udf(flightDateTimeCalculationArr_fn, TimestampType())

def airlines_station_datetime_fn(airlines_station_filtered):
    airlines_station_datetime = airlines_station_filtered.withColumn("CRS_DEP_TIMESTAMP", udfFlightDateTimeCalculation_fn("FL_DATE","CRS_DEP_TIME"))
    airlines_station_datetime = airlines_station_datetime.withColumn("CRS_ARR_TIMESTAMP", udfFlightDateTimeCalculationArr_fn("FL_DATE","CRS_DEP_TIME", "CRS_ARR_TIME"))
    return airlines_station_datetime

def airlines_station_datetime_unix_fn(airlines_station_datetime):
    airlines_station_datetime_unix = airlines_station_datetime.withColumn("CRS_DEP_TIMESTAMP_UNIX", f.unix_timestamp("CRS_DEP_TIMESTAMP"))
    airlines_station_datetime_unix = airlines_station_datetime_unix.withColumn("CRS_ARR_TIMESTAMP_UNIX", f.unix_timestamp("CRS_ARR_TIMESTAMP"))
    airlines_station_datetime_unix = airlines_station_datetime_unix.withColumn("DEP_HOUR", f.hour("CRS_DEP_TIMESTAMP"))
    airlines_station_datetime_unix = airlines_station_datetime_unix.withColumn("ARR_HOUR", f.hour("CRS_ARR_TIMESTAMP"))
    
    return airlines_station_datetime_unix
  
def weather_fltr_datetime_fn(weather_fltr_drop):
    weather_fltr_datetime = weather_fltr_drop.withColumn("DATE_TIMESTAMP_UNIX", f.unix_timestamp("TIMESTAMP"))
    weather_fltr_datetime = weather_fltr_datetime.withColumn('DATE', f.col("TIMESTAMP").cast(DateType()))
    weather_fltr_datetime = weather_fltr_datetime.withColumn("HOUR", f.hour("TIMESTAMP"))
    
    return weather_fltr_datetime

def weather_avg_fn(weather_fltr_datetime):
    weather_fltr_datetime.createOrReplaceTempView('weather_time')
    weather_avg_query = "(SELECT STATION, DATE, HOUR, ROUND(AVG(`WND[0]`),0) AS `WND[0]`, ROUND(AVG(`WND[3]`),0) AS `WND[3]`, ROUND(AVG(`VIS[0]`),0) AS `VIS[0]`, ROUND(AVG(`SLP[0]`),0) AS `SLP[0]`, ROUND(AVG(`AA1[0]`),0) AS `AA1[0]` FROM weather_time GROUP BY STATION, DATE, HOUR)"

    weather_avg = spark.sql(weather_avg_query)
    
    weather_avg = weather_avg.withColumn("WND_CLASS[0]", udfBearingClass_fn("WND[0]"))
    weather_avg = weather_avg.drop("WND[0]")
    
    return weather_avg
  
def weather_add_values_fn(weather_avg):
    weather_fltr_datetime_origin = weather_avg.withColumnRenamed("STATION", "ORIGIN_STATION_WEATHER")
    weather_fltr_datetime_origin = weather_fltr_datetime_origin.withColumnRenamed("DATE", "ORIGIN_STATION_DATE")
    weather_fltr_datetime_origin = weather_fltr_datetime_origin.withColumnRenamed("HOUR", "ORIGIN_STATION_HOUR")
    weather_fltr_datetime_origin = weather_fltr_datetime_origin.withColumnRenamed("WND_CLASS[0]", "ORIGIN_STATION_WND[0]")
    weather_fltr_datetime_origin = weather_fltr_datetime_origin.withColumnRenamed("WND[3]", "ORIGIN_STATION_WND[3]")
    weather_fltr_datetime_origin = weather_fltr_datetime_origin.withColumnRenamed("VIS[0]", "ORIGIN_STATION_VIS[0]")
    weather_fltr_datetime_origin = weather_fltr_datetime_origin.withColumnRenamed("SLP[0]", "ORIGIN_STATION_SLP[0]")
    weather_fltr_datetime_origin = weather_fltr_datetime_origin.withColumnRenamed("AA1[0]", "ORIGIN_STATION_AA1[0]")
    weather_fltr_datetime_dest = weather_avg.withColumnRenamed("STATION", "DEST_STATION_WEATHER")
    weather_fltr_datetime_dest = weather_fltr_datetime_dest.withColumnRenamed("DATE", "DEST_STATION_DATE")
    weather_fltr_datetime_dest = weather_fltr_datetime_dest.withColumnRenamed("HOUR", "DEST_STATION_HOUR")
    weather_fltr_datetime_dest = weather_fltr_datetime_dest.withColumnRenamed("WND_CLASS[0]", "DEST_STATION_WND[0]")
    weather_fltr_datetime_dest = weather_fltr_datetime_dest.withColumnRenamed("WND[3]", "DEST_STATION_WND[3]")
    weather_fltr_datetime_dest = weather_fltr_datetime_dest.withColumnRenamed("VIS[0]", "DEST_STATION_VIS[0]")
    weather_fltr_datetime_dest = weather_fltr_datetime_dest.withColumnRenamed("SLP[0]", "DEST_STATION_SLP[0]")
    weather_fltr_datetime_dest = weather_fltr_datetime_dest.withColumnRenamed("AA1[0]", "DEST_STATION_AA1[0]")
    return weather_fltr_datetime_origin, weather_fltr_datetime_dest
  
def departure_final_fn(airlines_station_datetime_unix):
    airlines_station_datetime_unix.createOrReplaceTempView("airports_weather")
    weather_fltr_datetime_origin.createOrReplaceTempView("origin_weather")
    origin_join_query = "(SELECT * FROM airports_weather a INNER JOIN origin_weather w ON a.ORIGIN_STATION = w.ORIGIN_STATION_WEATHER AND a.FL_DATE = w.ORIGIN_STATION_DATE AND a.DEP_HOUR = w.ORIGIN_STATION_HOUR)"

    departure_final = spark.sql(origin_join_query)
    return departure_final

def airlines_weather_final_trim_fn(departure_final):
    departure_final.createOrReplaceTempView("airports_weather_dest")
    weather_fltr_datetime_dest.createOrReplaceTempView("dest_weather")
    # Changed to join on the weather at the destination airport at departure time
    dest_join_query = "(SELECT * FROM airports_weather_dest a INNER JOIN dest_weather w ON a.DEST_STATION = w.DEST_STATION_WEATHER AND a.FL_DATE = w.DEST_STATION_DATE AND a.DEP_HOUR = w.DEST_STATION_HOUR)"

    airlines_weather_final = spark.sql(dest_join_query)
    drop_cols = ['DEST_STATION_DATE', 'DEST_STATION_HOUR', 'ORIGIN_STATION_HOUR', 'ORIGIN_STATION_DATE', 'ORIGIN_LAT', 'ORIGIN_LON', 'DEST_LAT', 'DEST_LON', 'CRS_DEP_TIMESTAMP_UNIX', 'CRS_ARR_TIMESTAMP_UNIX', 'DEP_HOUR', 'ARR_HOUR', 'ORIGIN_STATION', 'DEST_STATION', 'ORIGIN_STATION_WEATHER', 'DEST_STATION_WEATHER']
    airlines_weather_final_trim = airlines_weather_final.drop(*drop_cols)
    return airlines_weather_final_trim
  
def airlines_weather_to_parquet_fn(airlines_weather_final_trim):
    # Parquet output is a directory, so remove it recursively before rewriting
    dbutils.fs.rm("dbfs:/tmp/parquet/airlines_weather_final_4_7.parquet", True)
    airlines_weather_final_trim.write.parquet("dbfs:/tmp/parquet/airlines_weather_final_4_7.parquet")
    return None
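
The timestamp helpers above combine `FL_DATE` with an integer `HHMM` time and roll the scheduled arrival to the next day for overnight flights. The following is a minimal standalone sketch of that logic using only the standard library, so it can be checked without Spark. The names `parse_hhmm` and `scheduled_arrival` are hypothetical, and treating any hour above 23 as `00` is a simplification that may differ from how `dateutil` handles edge cases such as `24:00`.

```python
import datetime

def parse_hhmm(flight_date, hhmm):
    """Combine a 'YYYY-MM-DD' date with an integer HHMM time (e.g. 930 -> 09:30)."""
    text = str(hhmm).zfill(4)              # 930 -> '0930'
    hour, minute = int(text[:-2]), int(text[-2:])
    if hour > 23:                          # simplified fallback: out-of-range hour becomes 00
        hour = 0
    day = datetime.date.fromisoformat(str(flight_date))
    return datetime.datetime(day.year, day.month, day.day, hour, minute)

def scheduled_arrival(flight_date, dep_hhmm, arr_hhmm):
    """Return the arrival timestamp, moved to the next day for overnight flights."""
    dep = parse_hhmm(flight_date, dep_hhmm)
    arr = parse_hhmm(flight_date, arr_hhmm)
    if dep > arr:                          # arrival clock time is earlier than departure
        arr += datetime.timedelta(days=1)  # assumes the flight is shorter than 24 hours
    return arr

print(parse_hhmm("2015-03-01", 930))
print(scheduled_arrival("2015-03-01", 2330, 115))
```

The dates and times in the two `print` calls are made-up examples, not rows from the dataset.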

In [30]:
# Merging airlines and weather data within United States
airlines_df =  preprocessAirlines(airlines)
weather_parquet_us = US_fn(weather_parquet)
weather_fltr_drop = reduce_split_cols_fn(weather_parquet_us)
weather_fltr_drop_distinct = distinct_station_fn(weather_fltr_drop)
airport_codes_us = US_fn(airport_codes)
airports_stations_distance_all = haversine_join_station_aircode_fn(airport_codes_us, weather_fltr_drop_distinct)
airports_closest_station_filtered = airports_closest_stations_fn(airports_stations_distance_all)
airports_closest_station_filtered = join_closest_weather_airlines_fn(airlines_df, airports_closest_station_filtered)
airlines_station_datetime = airlines_station_datetime_fn(airports_closest_station_filtered)
airlines_station_datetime_unix = airlines_station_datetime_unix_fn(airlines_station_datetime)  
weather_fltr_datetime = weather_fltr_datetime_fn(weather_fltr_drop)
weather_avg = weather_avg_fn(weather_fltr_datetime)
weather_fltr_datetime_origin, weather_fltr_datetime_dest = weather_add_values_fn(weather_avg)
departure_final = departure_final_fn(airlines_station_datetime_unix)
airlines_weather_final_trim = airlines_weather_final_trim_fn(departure_final)
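
The merge above first averages the weather readings per station, date and hour, and then inner-joins flights to those hourly averages. The sketch below illustrates the same two steps on a few made-up records in plain Python; the station ID, the values and the `flight` field are hypothetical. It also shows one consequence of the inner join: a flight with no matching weather hour is dropped.

```python
from collections import defaultdict

# Hypothetical observations: (station, date, hour, visibility)
observations = [
    ("STATION_1", "2015-03-01", 9, 16000.0),
    ("STATION_1", "2015-03-01", 9, 12000.0),
    ("STATION_1", "2015-03-01", 10, 8000.0),
]

# Step 1: average all readings that share a (station, date, hour) key
sums = defaultdict(lambda: [0.0, 0])
for station, date, hour, vis in observations:
    bucket = sums[(station, date, hour)]
    bucket[0] += vis
    bucket[1] += 1
hourly_vis = {key: round(total / n) for key, (total, n) in sums.items()}

# Step 2: inner join flights to the hourly averages on (station, date, departure hour)
flights = [
    {"flight": "A", "ORIGIN_STATION": "STATION_1", "FL_DATE": "2015-03-01", "DEP_HOUR": 9},
    {"flight": "B", "ORIGIN_STATION": "STATION_1", "FL_DATE": "2015-03-01", "DEP_HOUR": 23},
]
joined = []
for f in flights:
    key = (f["ORIGIN_STATION"], f["FL_DATE"], f["DEP_HOUR"])
    if key in hourly_vis:  # inner join: flights without a weather match are dropped
        joined.append(dict(f, ORIGIN_STATION_VIS=hourly_vis[key]))

print(joined)
```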

In [31]:
airlines_preprocessed_nullCounts_df = nullDataFrame(airlines_df)
airlines_preprocessed_nullCounts_df

| | Feature_Name | Null_Counts | Percentage_Null_Counts |
|---|---|---|---|
| 0 | MONTH | 0 | 0.0 |
| 1 | DAY_OF_WEEK | 0 | 0.0 |
| 2 | FL_DATE | 0 | 0.0 |
| 3 | OP_UNIQUE_CARRIER | 0 | 0.0 |
| 4 | ORIGIN | 0 | 0.0 |
| 5 | DEST | 0 | 0.0 |
| 6 | CRS_DEP_TIME | 0 | 0.0 |
| 7 | DEP_TIME | 0 | 0.0 |
| 8 | DEP_DELAY | 0 | 0.0 |
| 9 | DEP_TIME_BLK | 0 | 0.0 |


In [32]:
#airlines_weather_final_trim.write.parquet("/FileStore/tables/airlines_weather_final_trim.parquet")
airlines_weather_final_trim = spark.read.parquet("/FileStore/tables/airlines_weather_final_trim.parquet")

In [33]:
airlines_weather_final_trim.printSchema()

#### Data Split:

In [35]:
airlines_train, airlines_val, airlines_test = airlines_weather_final_trim.randomSplit([0.8,0.1,0.1], seed = 2020)

In [36]:
train_cnt = airlines_train.count()
val_cnt = airlines_val.count()
test_cnt = airlines_test.count()
total_cnt = train_cnt + val_cnt + test_cnt
print('airlines_train records: {}\nairlines_val records: {}\nairlines_test records: {}\ntotal records: {}'.format(train_cnt, val_cnt, test_cnt, total_cnt))
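
`randomSplit` normalizes the weights and assigns rows by random draws, so the resulting splits are only approximately 80/10/10, which is why the counts are checked above. The following conceptual sketch shows the idea in plain Python. It is not Spark's implementation, and `random_split` is a hypothetical helper.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    bounds, running = [], 0.0
    for w in weights:                      # cumulative, normalized upper bounds
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)              # fixed seed makes the split reproducible
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # the last split also catches any floating-point remainder
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, val, test = random_split(range(10000), [0.8, 0.1, 0.1], seed=2020)
print(len(train), len(val), len(test))
```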

## Feature Engineering & Feature Selection:

In [38]:
def featureSelection(df):
  # Columns retained for modeling (weather columns use the names assigned by the renames below)
  cols_to_keep = ['MONTH', 'DAY_OF_WEEK', 'DEP_DELAY', 'DEP_TIME_BLK', 'ARR_DELAY', 'ARR_TIME_BLK', 'CRS_ELAPSED_TIME', 'DISTANCE',  'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'IS_WEEKEND', 'DEP_RUSH_HOUR', 'ARR_RUSH_HOUR','FLIGHT_BEARING', 'ORIGIN_CARRIER', 'DEST_CARRIER', 'ORIGIN_STATION_WND_DIR', 'ORIGIN_STATION_VIS', 'ORIGIN_STATION_SLP','ORIGIN_STATION_AA1',    'ORIGIN_STATION_WND', 'DEST_STATION_WND_DIR', 'DEST_STATION_VIS', 'DEST_STATION_SLP', 'DEST_STATION_AA1',  'DEST_STATION_WND']

  # Rename the indexed origin weather columns to descriptive names
  df = df.withColumnRenamed("ORIGIN_STATION_WND[0]", "ORIGIN_STATION_WND_DIR")
  df = df.withColumnRenamed("ORIGIN_STATION_VIS[0]", "ORIGIN_STATION_VIS")
  df = df.withColumnRenamed("ORIGIN_STATION_SLP[0]", "ORIGIN_STATION_SLP")
  df = df.withColumnRenamed("ORIGIN_STATION_AA1[0]", "ORIGIN_STATION_AA1")
  df = df.withColumnRenamed("ORIGIN_STATION_WND[3]", "ORIGIN_STATION_WND")

  # Rename the indexed destination weather columns to descriptive names
  df = df.withColumnRenamed("DEST_STATION_WND[0]", "DEST_STATION_WND_DIR")
  df = df.withColumnRenamed("DEST_STATION_VIS[0]", "DEST_STATION_VIS")
  df = df.withColumnRenamed("DEST_STATION_SLP[0]", "DEST_STATION_SLP")
  df = df.withColumnRenamed("DEST_STATION_AA1[0]", "DEST_STATION_AA1")
  df = df.withColumnRenamed("DEST_STATION_WND[3]", "DEST_STATION_WND")

  # Drop every column not in cols_to_keep (computed after renaming so the new names are matched)
  cols_to_remove = [x for x in df.columns if x not in cols_to_keep]
  featureSelection_df = df.drop(*cols_to_remove)
  return featureSelection_df

In [39]:
airlines_train_df =  featureSelection(airlines_train)

In [40]:
numeric_features = [x[0] for x in airlines_train_df.dtypes if x[1] == 'int' or x[1] == 'double']
numeric_features.remove('ARR_DELAY')
cat_features = ['MONTH', 'DAY_OF_WEEK'] # These are categorical features stored as 'int' type
numeric_features = [x for x in numeric_features if x not in cat_features]
numeric_features

In [41]:
categorical_features = [x[0] for x in airlines_train_df.dtypes if x[1] == 'string']
categorical_features = categorical_features + cat_features
categorical_features

In [42]:
stages = []
for categoricalCol in categorical_features:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index', handleInvalid="keep")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
assemblerInputs = [c + "classVec" for c in categorical_features] + numeric_features
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features", handleInvalid="keep")
stages += [assembler]
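
The stages above index each categorical column and then one-hot encode the index. The sketch below imitates that behaviour in plain Python for a single column, assuming the Spark defaults: the most frequent label gets index 0, and the encoder drops the last category. With `handleInvalid="keep"` an unseen label is given one extra index, which is then the dropped category and so encodes as an all-zero vector. The helper names and carrier codes are hypothetical, and breaking frequency ties alphabetically is an assumption.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

def one_hot(label, index_map):
    """One-hot encode a label, dropping the last category (the 'unseen' bucket)."""
    num_categories = len(index_map) + 1          # seen labels plus one bucket for unseen labels
    index = index_map.get(label, len(index_map))  # unseen labels take the extra index
    vector = [0.0] * (num_categories - 1)         # last category is dropped
    if index < len(vector):
        vector[index] = 1.0
    return vector

index_map = fit_string_indexer(["AA", "DL", "AA", "UA", "AA", "DL"])
print(index_map)
print(one_hot("AA", index_map), one_hot("UA", index_map), one_hot("ZZ", index_map))
```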

In [43]:
pipeline = Pipeline().setStages(stages).fit(airlines_train_df)
vector_airlines_train_df = pipeline.transform(airlines_train_df)
vector_airlines_train_df.printSchema()

In [44]:
train_df = vector_airlines_train_df.select(col("ARR_DELAY").alias("label"), col("features"))
train_df.show(2)

In [45]:
airlines_val_df =  featureSelection(airlines_val)
vector_airlines_val_df = pipeline.transform(airlines_val_df)
val_df = vector_airlines_val_df.select(col("ARR_DELAY").alias("label"), col("features"))

<h2 align="center">Algorithm Exploration:</h2>
To predict `ARR_DELAY`, we consider the following supervised regression algorithms, each tuned with 5-fold cross-validation over a small hyperparameter grid.

1. Linear Regression
2. Decision Tree Regressor
3. Random Forest Regressor
4. Gradient Boosted Tree Regressor

### Linear Regression:

In [48]:
lr = LinearRegression(featuresCol = 'features', labelCol='label')
paramGrid_lr = ParamGridBuilder() \
   .addGrid(lr.regParam, [0.1, 0.01, 0.001]) \
   .addGrid(lr.maxIter, [10, 20]) \
   .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
   .build() 

crossval_lr = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid_lr,
                          evaluator=RegressionEvaluator(),
                          numFolds=5) 

cvModel_lr = crossval_lr.fit(train_df)
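
To give a sense of the cost of this search, the sketch below enumerates the same grid in plain Python and counts the model fits. It assumes that every parameter combination is trained once per fold and that the best combination is then refit once on the full training set. Because `RegressionEvaluator()` is used with its default metric, the best combination is the one with the lowest average RMSE.

```python
from itertools import product

# Same values as the grid passed to ParamGridBuilder above
grid = {
    "regParam": [0.1, 0.01, 0.001],
    "maxIter": [10, 20],
    "elasticNetParam": [0.0, 0.5, 1.0],
}

# Cartesian product of all parameter values: one dict per candidate model
param_maps = [dict(zip(grid, combo)) for combo in product(*grid.values())]

num_folds = 5
total_fits = len(param_maps) * num_folds + 1  # +1 for the final refit on all training data

print(len(param_maps), "parameter combinations,", total_fits, "model fits")
```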

In [49]:
regression_evaluator_r2 = RegressionEvaluator(predictionCol="prediction", labelCol="label",metricName="r2")
regression_evaluator_rmse = RegressionEvaluator(predictionCol="prediction", labelCol="label",metricName="rmse")
regression_evaluator_mae = RegressionEvaluator(predictionCol="prediction", labelCol="label",metricName="mae")

regression_metrics_list = []
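
For reference, the three metrics requested from `RegressionEvaluator` follow the standard definitions. The sketch below computes them in plain Python on a handful of made-up delays in minutes; `regression_metrics` is a hypothetical helper, not part of the notebook.

```python
import math

def regression_metrics(labels, predictions):
    """Return R^2, RMSE and MAE for paired labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    sse = sum(e * e for e in errors)                 # sum of squared errors
    mean_y = sum(labels) / n
    ss_tot = sum((y - mean_y) ** 2 for y in labels)  # total variance around the mean
    return {
        "r2": 1 - sse / ss_tot,                      # share of label variance explained
        "rmse": math.sqrt(sse / n),                  # penalizes large errors more heavily
        "mae": sum(abs(e) for e in errors) / n,      # average absolute error in minutes
    }

# Made-up arrival delays (minutes) and predictions
metrics = regression_metrics([10.0, -5.0, 30.0, 0.0], [12.0, -3.0, 25.0, 1.0])
print(metrics)
```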

In [50]:
# Save and Load CrossValidatorModel
cvModel_lr.write().overwrite().save('/FileStore/tables/cvModel_lr')

In [51]:
saved_cvModel_lr = CrossValidatorModel.load('/FileStore/tables/cvModel_lr')

In [52]:
# train_df evaluation metrics
lr_predictions_train = saved_cvModel_lr.transform(train_df)
lr_train_r2 = regression_evaluator_r2.evaluate(lr_predictions_train)
lr_train_rmse = regression_evaluator_rmse.evaluate(lr_predictions_train)
lr_train_mae = regression_evaluator_mae.evaluate(lr_predictions_train)
regression_metrics_list.append(["LinearRegression_TrainData_CV", lr_train_r2, lr_train_rmse, lr_train_mae ])

#  val_df evaluation metrics
lr_predictions_val = saved_cvModel_lr.transform(val_df)
lr_val_r2 = regression_evaluator_r2.evaluate(lr_predictions_val)
lr_val_rmse = regression_evaluator_rmse.evaluate(lr_predictions_val)
lr_val_mae = regression_evaluator_mae.evaluate(lr_predictions_val)
regression_metrics_list.append(["LinearRegression_ValData_CV", lr_val_r2, lr_val_rmse, lr_val_mae ])

In [53]:
bestLRModel = cvModel_lr.bestModel
bestParams = bestLRModel.extractParamMap()
bestParams

### Decision Tree Regressor

In [55]:
dt = DecisionTreeRegressor(featuresCol="features", labelCol='label') 

paramGrid_dt = ParamGridBuilder()\
    .addGrid(dt.maxBins, [16, 32]) \
    .addGrid(dt.maxDepth, [5, 10]) \
    .addGrid(dt.minInstancesPerNode, [1, 5]) \
    .build()  

crossval_dt = CrossValidator(estimator=dt,
                          estimatorParamMaps=paramGrid_dt,
                          evaluator=RegressionEvaluator(),
                          numFolds=5) 

cvModel_dt = crossval_dt.fit(train_df)

In [56]:
# Save and Load CrossValidatorModel
cvModel_dt.write().overwrite().save('/FileStore/tables/cvModel_dt')

In [57]:
saved_cvModel_dt = CrossValidatorModel.load('/FileStore/tables/cvModel_dt')

In [58]:
# train_df evaluation metrics
dt_predictions_train = saved_cvModel_dt.transform(train_df)
dt_train_r2 = regression_evaluator_r2.evaluate(dt_predictions_train)
dt_train_rmse = regression_evaluator_rmse.evaluate(dt_predictions_train)
dt_train_mae = regression_evaluator_mae.evaluate(dt_predictions_train)
regression_metrics_list.append(["DecisionTreeRegressor_TrainData_CV", dt_train_r2, dt_train_rmse, dt_train_mae ])

# val_df evaluation metrics
dt_predictions_val = saved_cvModel_dt.transform(val_df)
dt_val_r2 = regression_evaluator_r2.evaluate(dt_predictions_val)
dt_val_rmse = regression_evaluator_rmse.evaluate(dt_predictions_val)
dt_val_mae = regression_evaluator_mae.evaluate(dt_predictions_val)
regression_metrics_list.append(["DecisionTreeRegressor_ValData_CV", dt_val_r2, dt_val_rmse, dt_val_mae ])

In [59]:
bestDTModel = cvModel_dt.bestModel
bestParams_dt = bestDTModel.extractParamMap()
bestParams_dt

### Random Forest Regressor

In [61]:
rf = RandomForestRegressor(featuresCol="features", labelCol='label')

paramGrid_rf = ParamGridBuilder()\
    .addGrid(rf.maxBins, [16, 32]) \
    .addGrid(rf.numTrees, [20, 40]) \
    .addGrid(rf.minInstancesPerNode, [1, 5]) \
    .build()  

crossval_rf = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid_rf,
                          evaluator=RegressionEvaluator(),
                          numFolds=5) 

cvModel_rf = crossval_rf.fit(train_df)

In [62]:
# Save and Load CrossValidatorModel
cvModel_rf.write().overwrite().save('/FileStore/tables/cvModel_rf')


In [63]:
saved_cvModel_rf = CrossValidatorModel.load('/FileStore/tables/cvModel_rf')

In [64]:
# train_df evaluation metrics
rf_predictions_train = saved_cvModel_rf.transform(train_df)
rf_train_r2 = regression_evaluator_r2.evaluate(rf_predictions_train)
rf_train_rmse = regression_evaluator_rmse.evaluate(rf_predictions_train)
rf_train_mae = regression_evaluator_mae.evaluate(rf_predictions_train)
regression_metrics_list.append(["RandomForestRegressor_TrainData_CV", rf_train_r2, rf_train_rmse, rf_train_mae ])

# val_df evaluation metrics
rf_predictions_val = saved_cvModel_rf.transform(val_df)
rf_val_r2 = regression_evaluator_r2.evaluate(rf_predictions_val)
rf_val_rmse = regression_evaluator_rmse.evaluate(rf_predictions_val)
rf_val_mae = regression_evaluator_mae.evaluate(rf_predictions_val)
regression_metrics_list.append(["RandomForestRegressor_ValData_CV", rf_val_r2, rf_val_rmse, rf_val_mae ])

In [65]:
bestRFModel = cvModel_rf.bestModel
bestParams_rf = bestRFModel.extractParamMap()
bestParams_rf

### Gradient-Boosted Trees

In [67]:
gbt = GBTRegressor(featuresCol="features", labelCol='label')

paramGrid_gbt = ParamGridBuilder()\
    .addGrid(gbt.maxBins, [10, 32]) \
    .addGrid(gbt.minInstancesPerNode, [1, 5]) \
    .build()  

crossval_gbt = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid_gbt,
                          evaluator=RegressionEvaluator(),
                          numFolds=5) 

cvModel_gbt = crossval_gbt.fit(train_df)

In [68]:
# Save and Load CrossValidatorModel
cvModel_gbt.write().overwrite().save('/FileStore/tables/cvModel_gbt')


In [69]:
saved_cvModel_gbt = CrossValidatorModel.load('/FileStore/tables/cvModel_gbt')

In [70]:
# train_df evaluation metrics
gbt_predictions_train = saved_cvModel_gbt.transform(train_df)
gbt_train_r2 = regression_evaluator_r2.evaluate(gbt_predictions_train)
gbt_train_rmse = regression_evaluator_rmse.evaluate(gbt_predictions_train)
gbt_train_mae = regression_evaluator_mae.evaluate(gbt_predictions_train)
regression_metrics_list.append(["GradientBoostedTreeRegressor_TrainData_CV", gbt_train_r2, gbt_train_rmse, gbt_train_mae ])

# val_df evaluation metrics
gbt_predictions_val = saved_cvModel_gbt.transform(val_df)
gbt_val_r2 = regression_evaluator_r2.evaluate(gbt_predictions_val)
gbt_val_rmse = regression_evaluator_rmse.evaluate(gbt_predictions_val)
gbt_val_mae = regression_evaluator_mae.evaluate(gbt_predictions_val)
regression_metrics_list.append(["GradientBoostedTreeRegressor_ValData_CV", gbt_val_r2, gbt_val_rmse, gbt_val_mae ])

In [71]:
bestGBTModel = cvModel_gbt.bestModel
bestParams_gbt = bestGBTModel.extractParamMap()
bestParams_gbt

## Results:

In [73]:
regression_metrics_df = pd.DataFrame(regression_metrics_list, columns = ['Model_Data' , 'R^2', 'RMSE', 'MAE']) 
display(regression_metrics_df)

| Model_Data | R^2 | RMSE | MAE |
|---|---|---|---|
| LinearRegression_TrainData_CV | 0.9556805965871404 | 13.422721977071546 | 9.542329680250475 |
| LinearRegression_ValData_CV | 0.9578701682031706 | 13.397124217962183 | 9.65773179158638 |
| DecisionTreeRegressor_TrainData_CV | 0.7762539878356634 | 29.88296529022086 | 11.616142687603576 |
| DecisionTreeRegressor_ValData_CV | 0.7428123335791165 | 33.48911533037165 | 11.402500854607558 |
| RandomForestRegressor_TrainData_CV | 0.70284374595401 | 34.934630735911 | 13.792047792327292 |
| RandomForestRegressor_ValData_CV | 0.6815277669348756 | 36.31649598510621 | 13.553556662155774 |
| GradientBoostedTreeRegressor_TrainData_CV | 0.7553940164461354 | 30.382756887520877 | 11.892415663469045 |
| GradientBoostedTreeRegressor_ValData_CV | 0.7765612911647015 | 33.29219070666156 | 12.084116229875963 |


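From the results above, Linear Regression clearly outperforms the tree-based models: its validation R^2 is about 0.96 with an RMSE of about 13.4 minutes, whereas the tree-based models reach a validation R^2 between 0.68 and 0.78 with RMSEs above 33 minutes. Its training and validation metrics are also nearly identical, which suggests that it is not overfitting. We therefore use the Linear Regression model for inference on the test data.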
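
The selection can also be made programmatically instead of by reading the table. The sketch below is one way to do that, using the validation R^2 and RMSE values reported above, rounded to four decimals; the dictionary layout is a hypothetical choice.

```python
# Validation metrics copied from the results table, rounded to four decimals
validation_metrics = {
    "LinearRegression": {"r2": 0.9579, "rmse": 13.3971},
    "DecisionTreeRegressor": {"r2": 0.7428, "rmse": 33.4891},
    "RandomForestRegressor": {"r2": 0.6815, "rmse": 36.3165},
    "GradientBoostedTreeRegressor": {"r2": 0.7766, "rmse": 33.2922},
}

# Rank the models from highest to lowest validation R^2
ranking = sorted(validation_metrics, key=lambda m: validation_metrics[m]["r2"], reverse=True)
best_model = ranking[0]

for name in ranking:
    m = validation_metrics[name]
    print("{:<30} R^2={:.4f} RMSE={:.4f}".format(name, m["r2"], m["rmse"]))
```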

<h2 align="center">Algorithm Implementation:</h2>

#### Inference:
We now predict `ARR_DELAY` for the test data using the Linear Regression model that we trained with cross-validation and hyperparameter tuning.

In [77]:
airlines_test_df =  featureSelection(airlines_test)
vector_airlines_test_df = pipeline.transform(airlines_test_df)
test_df = vector_airlines_test_df.select(col("ARR_DELAY").alias("label"), col("features"))

In [78]:
# test_df evaluation metrics
lr_predictions_test = saved_cvModel_lr.transform(test_df)
lr_test_r2 = regression_evaluator_r2.evaluate(lr_predictions_test)
lr_test_rmse = regression_evaluator_rmse.evaluate(lr_predictions_test)
lr_test_mae = regression_evaluator_mae.evaluate(lr_predictions_test)
regression_metrics_list.append(["LinearRegression_TestData_CV", lr_test_r2, lr_test_rmse, lr_test_mae ])

In [79]:
regression_metrics_test_df = pd.DataFrame(regression_metrics_list, columns = ['Model_Data' , 'R^2', 'RMSE', 'MAE']) 
display(regression_metrics_test_df)

| Model_Data | R^2 | RMSE | MAE |
|---|---|---|---|
| LinearRegression_TrainData_CV | 0.9556805965871404 | 13.422721977071546 | 9.542329680250475 |
| LinearRegression_ValData_CV | 0.9578701682031706 | 13.397124217962183 | 9.65773179158638 |
| DecisionTreeRegressor_TrainData_CV | 0.7762539878356634 | 29.88296529022086 | 11.616142687603576 |
| DecisionTreeRegressor_ValData_CV | 0.7428123335791165 | 33.48911533037165 | 11.402500854607558 |
| RandomForestRegressor_TrainData_CV | 0.70284374595401 | 34.934630735911 | 13.792047792327292 |
| RandomForestRegressor_ValData_CV | 0.6815277669348756 | 36.31649598510621 | 13.553556662155774 |
| GradientBoostedTreeRegressor_TrainData_CV | 0.7553940164461354 | 30.382756887520877 | 11.892415663469045 |
| GradientBoostedTreeRegressor_ValData_CV | 0.7765612911647015 | 33.29219070666156 | 12.084116229875963 |
| LinearRegression_TestData_CV | 0.9529835434342336 | 13.605171894321128 | 9.520017398426202 |


<h2 align="center">Conclusion:</h2>

<h2 align="center">Application of Course Concepts:</h2>

In [82]:
r = ChiSquareTest.test(train_df, "features", "label").head()
print("pValues: " + str(r.pValues))


In [83]:
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
 

In [84]:

print("statistics: " + str(r.statistics))
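
`ChiSquareTest` runs Pearson's chi-squared independence test of each feature against the label and treats both as categorical values. As a rough illustration of the statistic and degrees of freedom it reports, the sketch below computes them for a single feature from a small contingency table. The counts are made up and `chi_square` is a hypothetical helper.

```python
def chi_square(table):
    """Pearson chi-squared statistic and degrees of freedom for a contingency table."""
    row_totals = [sum(row) for row in table]
    col_totals = [sum(col) for col in zip(*table)]
    grand_total = sum(row_totals)
    statistic = 0.0
    for i, row in enumerate(table):
        for j, observed in enumerate(row):
            # expected count if the feature and the label were independent
            expected = row_totals[i] * col_totals[j] / grand_total
            statistic += (observed - expected) ** 2 / expected
    dof = (len(row_totals) - 1) * (len(col_totals) - 1)
    return statistic, dof

# Rows: two feature categories; columns: two label categories (made-up counts)
statistic, dof = chi_square([[30, 10], [20, 40]])
print("statistic:", statistic, "degreesOfFreedom:", dof)
```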