#Final Project - Predicting Airline Delays
__`MIDS w261: Machine Learning at Scale | UC Berkeley School of Information | Spring 2020`__

Team 3:
* Tonya Di Sera
* Ammara Essa
* Andy Hoopengardner
* Lee Moore

[Presentation slides](https://docs.google.com/presentation/d/1pxUoSHfAK64we-ReFgVAXdrOS0Q8xGsTSj_c7_Ryoq4/edit#slide=id.g8347663c4a_0_486)

<img src="https://github.com/tonydisera/261-final-project-images/blob/master/title.png?raw=true" width="60%" >

# Table of Contents
### 1. Question Formulation
### 2. EDA and Discussion of Challenges
### 3. Feature Engineering
### 4. Algorithm Exploration
### 5. Algorithm Implementation
### 6. Conclusions
### 7. Application of Course Concepts

#1. Question Formulation <a class="anchor" id="first-bullet"></a>

<img src="https://github.com/tonydisera/261-final-project-images/blob/master/question_formation.png?raw=true" width="60%" >

###1.1 Study objective and dataset

Our study focused on the objective of predicting detrimental flight outcomes, specifically flight delays, cancellations or diverted flights, utilising the airline dataset combined with station and weather data. This airline dataset includes 31,746,841 flights from 2015-2019 and joins in available data on location and time-specific weather. We have defined our binary classification task as no delay (=0) versus delay, diverted or cancelled flights (=1), where we applied the national standard definition of delay which is greater than 15 minutes delay to the arrival of the flight. This kind of analysis could prove useful to both travellers and airline companies as an early detection system that customer travel plans may be disrupted, so that alternative plans can be put into place sooner. We aim to build a model that was widely applicable as possible instead of narrowly focusing on specific airports, airlines or specific flight outcomes (i.e. ignoring cancelled or diverted flights). This represents an unbalanced classification task given the rarity of the "positive" outcome (defined as delayed, diverted or cancelled in this instance, and subsequently referred to just as "delayed flights") which represents roughly 20% of the flights in our dataset. 

###1.2 Testing approach and Evaluation Metrics

Given the time series nature of the study, we chose to reserve the older data from 2015-2018 for our training set and to utilise the most recent data from 2019 for dev and test sets. We randomly sampled 50% of the 2019 data for our dev set and the remaining 50% formed our test set which we have reserved for only the final fine-tuned model predictions. A balanced sampling approach was utilised so we maintained the sampe proportions of our minority/majority class labaels in each datset. We considered it important that our unseen test data should not be sampled from the same dates that our training data comes from, given that the evidence that a model is practically useful depends on its ability to predict delays on future flights.  

A review of the literature on airport delay prediction studies suggest that accuracy is the most frequently reported metric but we believe that this is not sufficient since it puts greater weight on correct predictions within the majority class (in this case "no delays" which represents 80% of the data) (Etani 2019, Patgiri 2020). We believe that recall is the key metric in this analysis given the opportunity to action on correct predictions regarding delayed flights. However, since precision is also important, given the risk of unnecessary preemptive investment into alternative travel arrangements would cause great frustration to customers, we would argue that the composite measure of F1-score should be our secondary target evaluation metric for optimization, following our primary target of recall optimization. To be practically useful, we believe (based on our collective subjective view and a review of the literature) that recall should be at least .8, meaning that at least 80% of all delayed flights are correctly predicted and precision should be at least .8, meaning that no more than 20% of predicted delays are actually on time. This would mean we are targeting an F1-score of 0.8.

###1.3 Baseline and Modelling Approach

For our baseline, we selected the naive classifier based on a simple logistic regression model which produces the crude prediction of "No Delay" for every flight. Given the unbalanced classification task, this still represents a reasonably strong accuracy of 80% and therefore underlines again the reason that accuracy cannot be the sole metric of interest. This baseline produces a zero value for precision and for recall.  For our algorithms of interest, we will explore optimizations of logistic regression as well as various formulations of decision trees.

###1.4 Study Limitations

Intuitively, weather will be a critical component to predicting likelihood of a flight delay, however real weather patterns at the time of flight are not available ahead of the flight time when the flight delay predictions would be most useful, e.g. at least several hours ahead of the flight departure. This highlights a disconnect between the data we have available and the reality of this use case. For a machine learning model to be practically useful, it would need to rely on weather _forecasts_ rather than on realized weather patterns. How different weather forecasts are from realized weather patterns will inform how well our training data will support accurate flight delay prediction, though this is not something that can be evaluated through our unseen test data (which again utilises actual realised weather).

Additionally, in the real world, a system for predicting delays would rely on a real-time system taking into account delays at connected airports where the relevant aircraft may be transiting through. This real-time system may also take into account evolving weather patterns over the flight paths of the relevant aircraft. These would involve utilising dynamic graph algorithms since the flight paths planned on a specific day may different significantly from the flight path planned the prior week or indeed the prior year. Therefore our traditional approach to training set / test set evaluation would not work in this situation. 

Another limitation is that for our base model we selected the simple binary task of predicting delay or no delay. However, a more meaningful outcome may require more granular classes such as 'no delay', 'modest delay', 'severe delay' given that different plans may be required in the case of a 1 hour delay compared to a 3 hours delay and additionally knowing the probability of having cancelled or diverted flight outcome would be important for customers. We believe that a two stage approach similar to that identified in the literature (Thiagarajan 2017), where we first predict delay or no delay (as we are doing in this analysis), and then for delayed flights we further predict duration of delay would be appropriate. Due to time constraints we focused just on the first stage of this model.

#2. EDA and Discussion of Challenges

<img src="https://github.com/tonydisera/261-final-project-images/blob/master/eda.png?raw=true" width="60%" >

Our exploratory data analysis began with building an understanding of each of the variables, their definitions and their summary statistics (including any missing data). Correlation matrices were created to understand bivariate relationships between continuous variables and various histograms, bar charts, and box plots were created to see how various potential features changed over time and over outcome status (delay or not delayed).

For additional work on EDA [see separate notebook here](https://dbc-b1c912e7-d804.cloud.databricks.com/?o=7564214546094626#notebook/1239491467269575/command/814519033145399)

In [5]:
# Import relevant libraries
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, NullType, ShortType, DateType, BooleanType, BinaryType
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, col, lit, when
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.types import *
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from functools import reduce
from operator import add


import matplotlib.pyplot as plt
import seaborn as sns; sns.set()
import pandas as pd
import numpy as np

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.evaluation import BinaryClassificationMetrics

import time
import numpy as np
import pandas as pd
import statistics
%matplotlib inline
sqlContext = SQLContext(sc)

In [6]:
# Load datasets to perform EDA
airlines = spark.read.option("header", "true").parquet(f"dbfs:/mnt/mids-w261/data/datasets_final_project/parquet_airlines_data/201*.parquet")
airlines.createOrReplaceTempView("airlines")

airlines_and_weather = spark.read.option("header", "true").parquet(f"dbfs:/user/tonydisera@ischool.berkeley.edu/final_project/data_new_weather/*")
airlines_and_weather.createOrReplaceTempView("airlines_and_weather")
airlines_and_weather = spark.sql("select YEAR(FL_DATE) as YEAR, * from airlines_and_weather")


### 2.1 EDA: How balanced is our outcome of interest?

We decided to focus on arrival delay rather than depature delay due to the more significant impact that this can have on customers. In many instances, a delayed departure can be counteracted by "making up time" in the air, in which case the travellers still reach their destination on time. Departure delay was therefore considered less important than an arrival delay. Our EDA showed a classic skewed curve for total minues of delay. For our classification task, we focused on delays of 15 minutes or more and also included cancelled and diverted flights in the definition of our positive outcome. We considered it important to consider all three types of detrimental flight outcomes in our model because they all occur in the real world, and consequently our model trained on this data should be able to correctly identify all of these real life events. 

Given that the positive outcome is underepresented, this has potential significance for our algorithms of interest. Specifically, due to this finding, we will consider methods to weight or rebalance the training set prior to training.

In [8]:
# Display histogram of arrival delays in minutes
plt.rcParams["figure.figsize"] = (15,4)
bins, counts = airlines.filter("ARR_DELAY_NEW  IS NOT NULL") \
                            .filter("ARR_DELAY_NEW  < 120") \
                            .filter("ARR_DELAY_NEW  > 0") \
                            .selectExpr("ARR_DELAY_NEW").rdd.flatMap(lambda x: x).histogram(30)
plt.hist(bins[:-1], bins=bins, weights=counts)

In [9]:
# EDA on binary outcome variable
airlines = airlines.withColumn('ARRIVAL_OUTCOME', f.when((f.col("ARR_DELAY_NEW") > 15) | \
                                          (f.col("CANCELLED") == "true") | \
                                          (f.col("DIVERTED") == "true"), 'delayed')\
                                          .otherwise('on time'))
airlines.createOrReplaceTempView("airlines")
display(spark.sql("select ARRIVAL_OUTCOME, count(*) as count from airlines group by ARRIVAL_OUTCOME order by count desc"))

ARRIVAL_OUTCOME,count
on time,25586218
delayed,6160623


### 2.1.1 EDA: Is it justified to create a composite outcome of flight delay, cancelled and diverted flights?

We justified our composite outcome measure by examining various features with respect to each outcome separately - specifically flight delays greater than 15 minutes, cancelled flights and diverted flights. Our belief was that the attributes which caused cancelled or diverted flights are primarily extreme versions of the reasons that cause flight delays (i.e. heavy rain versus hurricanes). Our EDA on this topic did align with our hypothesis. Specifically, where a weather feature tended to be higher/lower for delayed flights, it tended to be even higher/lower for cancelled and diverted flights. By combining these three outcomes, we were also able to increase the total representation of our 'positive' class. In the original dataset, there are 6,160,623 delayed, cancelled or diverted flights representing 19.4% of all successful flights.

In [11]:
# EDA on cancelled and diverted flight outcomes - to justify composite outcome approach

airlines_and_weather = airlines_and_weather.withColumn('OUTCOME', f.when((f.col("ARR_DELAY_NEW") > 15) | \
                                          (f.col("CANCELLED") == "true") | \
                                          (f.col("DIVERTED") == "true"), 1)\
                                          .otherwise(0))
airlines_and_weather.createOrReplaceTempView("airlines_and_weather")
df1 = spark.sql("select OUTCOME, ORIGIN_CIG_AVG, DEST_SLP_AVG from airlines_and_weather where CANCELLED = 'false' and DIVERTED = 'false'  ").toPandas()
df2 = spark.sql("select OUTCOME, ORIGIN_CIG_AVG, DEST_SLP_AVG from airlines_and_weather where CANCELLED = 'true' and DIVERTED = 'false'  ").toPandas()
df3 = spark.sql("select OUTCOME, ORIGIN_CIG_AVG, DEST_SLP_AVG from airlines_and_weather where CANCELLED = 'false' and DIVERTED = 'true'  ").toPandas()

fig = plt.figure(figsize=(12,12))
ax1a = fig.add_subplot(2, 3, 1)
ax1b = fig.add_subplot(2, 3, 2)
ax1c = fig.add_subplot(2, 3, 3)
ax2a = fig.add_subplot(2, 3, 4)
ax2b = fig.add_subplot(2, 3, 5)
ax2c = fig.add_subplot(2, 3, 6)

df1.boxplot(column ='ORIGIN_CIG_AVG', by='OUTCOME', ax=ax1a, showfliers=False)
df2.boxplot(column ='ORIGIN_CIG_AVG', ax=ax1b, showfliers=False)
df3.boxplot(column ='ORIGIN_CIG_AVG', ax=ax1c, showfliers=False)


ax1a.set_ylim(0, 20000)
ax1b.set_ylim(0, 20000)
ax1c.set_ylim(0, 20000)
ax1a.set_title("ontime vs delayed")
ax1b.set_title("cancelled")
ax1c.set_title("diverted")


df1.boxplot(column ='DEST_SLP_AVG', by='OUTCOME', ax=ax2a, showfliers=False)
df2.boxplot(column ='DEST_SLP_AVG', ax=ax2b, showfliers=False)
df3.boxplot(column ='DEST_SLP_AVG', ax=ax2c, showfliers=False)

ax2a.set_ylim(10000, 10300)
ax2b.set_ylim(10000, 10300)
ax2c.set_ylim(10000, 10300)
ax2a.set_title("ontime vs delayed")
ax2b.set_title("cancelled")
ax2c.set_title("diverted")

fig.suptitle('Weather - On time, Delay, Cancelled, Diverted')
plt.subplots_adjust(hspace=.5)

display(plt.show())

### 2.2 EDA: Are there seasonal and time-of-day trends for delays?

We explored temporal factors to determine if there would be value in building a multi-temporal model.

#### 2.2.1 EDA: Seasonal trends

While flight frequency is relatively steady throughout the year, we noticed that flight delay times varied by 'season', specifically in summer and winter. For this analysis we define the Seasons as:  
* **Spring**: March (3) , April (4) , May (5)  
* **Summer**: June (6) , July (7) , August (8)
* **Fall**: September (9) , October (10) , November (11)
* **Winter**: December (12) , January (1) , February (2)  

This seaonal cycle could be because of a variety of intuitive factors such as weather, increased holiday travels etc. Since we recongnize there is a clear indicator that delays vary by time of year and thus predicting a flight occuring in the fall on a model that is trained on data from summer may not be optimal. This insight from the EDA has helped inform our decision to use an ensemble model separated in part by seasons for our final algorithm.

In [14]:
# Plot delays and trip counts by month
delay_by = spark.sql("select month, avg(ARR_DELAY_NEW) as delay, count(*) as trip_count from airlines group by month order by month")
result_df = delay_by.select("*").toPandas()

fig = plt.figure(figsize=(12,10))
ax1 = fig.add_subplot(2, 1, 1)
ax2 = fig.add_subplot(2, 1, 2)

sns.barplot(x="month", y="delay", data=result_df, ax=ax1,  dodge=False)
sns.barplot(x="month", y="trip_count", data=result_df, ax=ax2,  dodge=False)

display(plt.show())

#### 2.2.2 EDA: Time of day of flight departure

Arrival time also appears to have an impact on delay time, with flights arriving later in the evening experiencing the longest delays. Literature (Kafle et all, 2016) suggests that this may be due to a delay propagation, where some delay originating from an upstream flight spreads to downstream flights. In other words, previously delayed flights contribute to the delays in subsequent flights. To that end, this could be a reason why flights arriving later in the day experiecne higher delays. Crucially, as with seasonality, the variabiliy in delay based on time of day motivated us to include arrival time of day as an element in our ensemble model structure. For this analysis we define the Arrival Time of Day as:  
* **MORNING**: 0600 <= "ARR_TIME_BLK" <= 1359
* **EVENING**: 1400 <= "ARR_TIME_BLK" <= 2159
* **LATE_NIGHT**: 2200 <= "ARR_TIME_BLK" <= 0559

In [16]:
#EDA of arrival delay in minutes by time of flight arrival
plt.rcParams["figure.figsize"] = (25,6)
arrive_delay_df = spark.sql("select ARR_DELAY_NEW, ARR_TIME_BLK from airlines where ARR_DELAY_NEW > 0 and ARR_DELAY_NEW < 120").toPandas()
ax1 = arrive_delay_df.boxplot(column=['ARR_DELAY_NEW'], by=['ARR_TIME_BLK'])
ax1.set_title('Time of day of flight arrival')
ax1.set_xlabel('ARR_TIME_BLK'); ax1.set_ylabel('ARR_DELAY_NEW');
plt.show()

### 2.3 EDA: Are there continuous features that are highly correlated?

This analysis was important in order to determine the risk of multicollinearity within our logistic regression model.  Due to high correlation, we removed the following features:
QUARTER, CRS_ARR_TIME, DISTANCE.  

Even though they are correlated, we left in TEMP, DEW, and SLP (sea level air pressure) given that seasonality and general weather patterns are critical factors in airline delays. We want to ensure that multi-collinarity doesn't impact our Logistic Regression model, so we did some exploratory runs comparing our metrics for data with weather, data without any weather and data without TEMP.  This exploratory work demonstrated that weather has significant predictive power and that leaving TEMP in the model is well tolerated (see EDA notebook link above for code).

<img src="https://github.com/tonydisera/261-final-project-images/blob/master/correlations_continuous_variables.png?raw=true" width="60%" >

#3. Feature Engineering

<img src="https://github.com/tonydisera/261-final-project-images/blob/master/fe.png?raw=true" width="60%" >

###3.1 Feature selection process

Along our EDA, our team adopted a systematic procedure to review and determine which variables should be potential features and which should be excluded, either due to relevance in predicting the outcome and practical considerations of capturing the data in a real-world use case. Details of our collaborative effort to determine which features to include/exclude are provided in [this Google spreadsheet](https://docs.google.com/spreadsheets/d/1O-BqLHvE9cyPvAKEmtWlk9OBhoGCAhXaeJuuIPRCI3o/edit?)

From the original 2 datasets, this involved the following:
- Airlines dataset: we removed variables that provided 
  - Duplicative information (i.e. different encodings for the airport station, city, and time variables such as quarter given that we were already including month), 
  - Variables that were linked too closely to the outcome (i.e. taxi in, taxi out time, and all the delay status variables) and finally 
  - Variables that based on our EDA and our intuition did not appear to be a meaningful predictor of delays (i.e. day of month, US State of origin or destination airport)
- Weather dataset:
  - See [separate notebook for joining weather to airlines](https://dbc-b1c912e7-d804.cloud.databricks.com/?o=7564214546094626#notebook/1652976813635466/command/1652976813635476)
  - Given a large number of potential variables that all described a certain kind of weather event (i.e. wind, precipitation, or visibility), for simplicity we utilised only the available continuous variables and ignored all potential categorical variables
  - We also simplified the available continuous variables by taking a daily average of their output. While we know this means that we applied 'future weather' to predict delays to flights, we justified this on the basis that the real-world use case would use weather forecasts to support predictions and used this future weather as a proxy for these weather forecasts (see Section 1.5 for a more details discussion of this study limitation)
  
  <img src="https://github.com/tonydisera/261-final-project-images/blob/master/join_weather.png?raw=true" width="50%" >
  
- Creating new features:
  - We created a directed graph of all the flights in the training set and used this to calculate the pagerank of each of the stations, with the hypothesis that the importance of a airport station may be associated with its likelihood of delayed flights. We therefore created 2 new variables for both the origin airport pagerank and the destination airport pagerank. Additional centrality measures were also considered (betweenness and closeness) but due to high correlation between these and pagerank, we chose to focus on just pagerank.
  
###3.2 Complete list of features included

The below is the exhaustive list of features which were included in all algorithms. Most features are self-explanatory, where they are not self-explanatory, a brief description follows. Additionally, all features were numeric with the exception of some which are noted as categorical. 

- Airport Station Characteristics 
  - ORIGIN (Origin airport code - categorical), DEST (Destination airport code - categorical), DEST_LATITUDE, DEST_LONGITUDE, DEST_ELEV, ORIGIN_LATITUDE, ORIGIN_LONGITUDE, ORIGIN_ELEV
- Flight-specific features
  - OP_UNIQUE_CARRIER (Flight company code - categorial), CRS_ELAPSED_TIME (expected duration of flight)
- Time-specific features
  - YEAR (categorial), MONTH (categorial), DAY_OF_MONTH (categorical), DAY_OF_WEEK (categorical), CRS_DEP_TIME (time of departure in minutes)
- Graph-related (airport importance) features
  - DEST_PAGERANK, ORIGIN_PAGERANK
- Daily average Weather
  - ORIGIN_WND_ANGLE_AVG / DEST_WND_ANGLE_AVG (observed wind speed rate in meters per second), 
  - ORIGIN_WND_SPEED_AVG /  DEST_WND_ANGLE_AVG (observed wind speed rate in meters per second), 
  - ORIGIN_CIG_AVG / DEST_CIG_AVG (ceiling height in meters), 
  - ORIGIN_VIS_AVG / DEST_VIS_AVG (visibility distance in meters), 
  - ORIGIN_TMP_AVG / DEST_TMP_AVG (air temperature in Celsius, factors of 10), 
  - ORIGIN_DEW_AVG / DEST_DEW_AVG (dew point temperature in Celsius, factors of 10),  
  - ORIGIN_SLP_AVG / DEST_SLP_AVG (sea level pressure in Hectopascals, factor by 10), 
  - ORIGIN_PRECIP_RATE_AVG / DEST_PRECIP_RATE_AVG (depth of liquid precipitation at time of observation, in millimeters),
  - ORIGIN_SNOW_DEPTH_AVG / DEST_SNOW_DEPTH_AVG (snow depth in millimeters)

Additionally, for our tree-based models, we were able to incorporate additional features that were highly correlated with some of the features above. This is because tree-based models are not limited by multicollinearity concerns, so by including these features we can potentially identify the best potential predictor features. Additionally, categorical features were transformed to ordinal features using Brieman's methods and are noted accordingly below (as above, all other features are continuous).  

- Airport Station Characteristics
  - ORIGIN STATE FIPS / DEST STATE FIPS (US State identifier - Ordinal)
  - ORIGIN CITY MARKET ID / DEST CITY MARKET ID (US City identifier - Ordinal)
- Flight-specific features
  - CRS_ARR_TIME
  - DISTANCE
- Time-specific features
  - QUARTER, CRS_ARR_TIME (expected arrival time)
  
###3.3 Missing values treatment and Encoding process

- Missing values. We spent considerable effort recovering all of the weather stations (except for one ambiguous airport in North Dakota) and we selected only those weather fields that were core, mandatory fields from the weather data collection.  In addition, we included one optional weather field, the snow depth, since it was approximately 75% non-null. (We set the value to zero for the 25% missing snow depth entries.)  The query provided below **(see 3.3.1)**, shows the percent of missing data for each feature.  Outside of snow depth, other features were present in **99.5%** of the observations, so we choose to skip those rows (handleInvalid="skip" in data encoding pipeline) with missing values. This resulted in minimal dropout from 31,746,841 observations down to 30,999,177
 observations (2.3% dropout).

- Treatment of categorical variables (one hot encoding). For logistic regression, we one-hot encoded several categorical features.  However, for decision trees algorithms (including Random Forest and Gradient Boosted Trees), we created a separate data encoding pipeline, treating ordered categorical features (e.g. DAY_OF_WEEK, QUARTER, DAY_OF_MONTH) as numerical and converting unordered categorical features (e.g. origin and destination airports and states, airlines carrier id, etc) to ordinals using Brieman's method.

- Standardization. For the logistic regression data encoding pipeline, all numeric features were standardized.  This is necessary given that some features in terms of their mean and standard deviation vary wildly from other features depending on how they were measured.  For example, wind speed rate (WND) ranges from 0 to 80 meters per second where as ceiling height (CIG), measured in meters, ranges from 0 to 20,000. Without standardization, gradient descent would likely suffer from non-convergence given that the highest magnitude of values will distort the direction of the gradient.  Also, for L1 and L2 regularization, standardization is critical; otherwise regularization will tend to penalize those features in smaller scales.

- Two Data Encoding Pipelines.  Decision trees don't require standardization of numeric values and can handle missing data. Also, given that one-hot encoding removes important signal related to the categorical features makes it unlikely that such features will be considered at a top level node, we decided to create a different data encoding pipeline for decision trees that would forgo standardization, treat ordinal categorical features as numeric, and convert unordered categorical features to ordinals using Brieman's method.

In [20]:
#
# Create the binary outcome variable
#
data_raw = airlines_and_weather.withColumn('OUTCOME', f.when((f.col("ARR_DELAY_NEW") > 15) | \
                                          (f.col("CANCELLED") == "true") | \
                                          (f.col("DIVERTED") == "true"), 1)\
                                          .otherwise(0))
data_raw.createOrReplaceTempView("data_raw")

In [21]:
# Add id
data_raw = data_raw.withColumn('id', f.monotonically_increasing_id())
data_raw.createOrReplaceTempView("data_raw")

In [22]:
#
# Build graph to calculate pagerank on just the data_raw_train dataset (not on unseen data)
#
import networkx as nx
data_raw_train    = spark.sql("select * from data_raw where YEAR < 2019 or (YEAR = 2019 and MONTH = 1)")
# create the airline graph as a list of nodes and edges (this has no weights, meaning we are looking at unique paths and not frequency of flights)
AIRLINE_GRAPH = {'nodes': data_raw_train.select('ORIGIN', 'DEST').rdd.flatMap(list).distinct().collect(),
             'edges': data_raw_train.select('ORIGIN', 'DEST').rdd.map(tuple).collect()}

# create directed graph object
G = nx.DiGraph()
G.add_nodes_from(AIRLINE_GRAPH['nodes'])
G.add_edges_from(AIRLINE_GRAPH['edges'])

# Spark Dataframe of all stations and their caculated pagerank
calculated_rank = nx.pagerank(G, alpha=0.85)
df_pagerank = pd.DataFrame(calculated_rank.items(), columns=['Station', 'PageRank'])
df_pagerank = spark.createDataFrame(df_pagerank)
df_pagerank.createOrReplaceTempView("df_pagerank")

# create two new features ORIGIN_PAGERANK and DEST_PAGERANK 
data_raw = spark.sql("select * from data_raw left join df_pagerank on data_raw.DEST == df_pagerank.Station").drop('Station')
data_raw = data_raw.withColumnRenamed('PageRank','DEST_PAGERANK')
data_raw.createOrReplaceTempView("data_raw")
data_raw = spark.sql("select * from data_raw left join df_pagerank on data_raw.ORIGIN == df_pagerank.Station").drop('Station')
data_raw = data_raw.withColumnRenamed('PageRank','ORIGIN_PAGERANK')
data_raw.createOrReplaceTempView("data_raw")

#### 3.3.1 Missing values

In [24]:
#
# Compute percent of missing data for each feature
#
numericCols = [
'ORIGIN_LATITUDE','ORIGIN_LONGITUDE', 'ORIGIN_ELEV',
'DEST_LATITUDE',  'DEST_LONGITUDE',   'DEST_ELEV',
'CRS_ELAPSED_TIME','CRS_DEP_TIME',
'ORIGIN_WND_ANGLE_AVG', 'ORIGIN_WND_SPEED_AVG', 'ORIGIN_CIG_AVG', 'ORIGIN_VIS_AVG', 'ORIGIN_TMP_AVG', 'ORIGIN_DEW_AVG', 'ORIGIN_SLP_AVG', 
'DEST_WND_ANGLE_AVG',   'DEST_WND_SPEED_AVG',   'DEST_CIG_AVG',   'DEST_VIS_AVG',   'DEST_TMP_AVG',   'DEST_DEW_AVG',   'DEST_SLP_AVG',
'ORIGIN_PRECIP_RATE_AVG', 'ORIGIN_SNOW_DEPTH_AVG',
'DEST_PRECIP_RATE_AVG',   'DEST_SNOW_DEPTH_AVG',
'DEST_PAGERANK',
'ORIGIN_PAGERANK']
null_counts = data_raw.select([count(when(col(c).isNull(), c)).alias(c) for c in numericCols])
null_counts_df = null_counts.toPandas()
null_counts_df = null_counts_df.loc[:, (null_counts_df > 0).all(axis=0)]
total_count = data_raw.count()
cols = null_counts_df.columns
df = null_counts_df.T
df.columns = ['count_missing']
df['column_name'] = cols
df['percent_missing'] = np.round((df['count_missing'] / total_count) * 100, 2)
df = df.reindex(columns=['column_name','count_missing', 'percent_missing'])
display(df)

column_name,count_missing,percent_missing
ORIGIN_LATITUDE,8895,0.03
ORIGIN_LONGITUDE,8895,0.03
ORIGIN_ELEV,8895,0.03
DEST_LATITUDE,8899,0.03
DEST_LONGITUDE,8899,0.03
DEST_ELEV,8899,0.03
CRS_ELAPSED_TIME,164,0.0
ORIGIN_WND_ANGLE_AVG,140124,0.44
ORIGIN_WND_SPEED_AVG,139428,0.44
ORIGIN_CIG_AVG,139684,0.44


In [25]:
#
# Zero missing snow depth average
#
data_raw = data_raw.fillna({'ORIGIN_SNOW_DEPTH_AVG':0})
data_raw = data_raw.fillna({'DEST_SNOW_DEPTH_AVG':0})
data_raw.createOrReplaceTempView("data_raw")

#### 3.3.2  Data Prep Pipeline for Logistic Regression

In [27]:
#
# Encode the data
#
def create_encoding_stages_lr():
  stages = [] # stages in our Pipeline

  # Convert label into label indices using the StringIndexer
  label_stringIdx = StringIndexer(inputCol="OUTCOME", outputCol="label")
  stages += [label_stringIdx]

  # Perform one-hot encoding on categorical features
  categoricalColumns = ['YEAR', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'OP_UNIQUE_CARRIER', 'ORIGIN', 'DEST']
  for categoricalCol in categoricalColumns:
      # Category Indexing with StringIndexer
      stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index",  handleInvalid="skip")
      # Use OneHotEncoder to convert categorical variables into binary SparseVectors
      # encoder = OneHotEncoderEstimator(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
      encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
      # Add stages.  These are not run here, but will run all at once later on.
      stages += [stringIndexer, encoder]

  assemblerInputs = [c + "classVec" for c in categoricalColumns] 
  assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features_categorical",  handleInvalid="skip")
  stages += [assembler]    

  # Scale numeric features
  numericCols = [
  'ORIGIN_LATITUDE','ORIGIN_LONGITUDE', 'ORIGIN_ELEV',
  'DEST_LATITUDE',  'DEST_LONGITUDE',   'DEST_ELEV',
  'CRS_ELAPSED_TIME','CRS_DEP_TIME',
  'ORIGIN_WND_ANGLE_AVG', 'ORIGIN_WND_SPEED_AVG', 'ORIGIN_CIG_AVG', 'ORIGIN_VIS_AVG', 'ORIGIN_TMP_AVG', 'ORIGIN_DEW_AVG', 'ORIGIN_SLP_AVG', 
  'DEST_WND_ANGLE_AVG',   'DEST_WND_SPEED_AVG',   'DEST_CIG_AVG',   'DEST_VIS_AVG',   'DEST_TMP_AVG',   'DEST_DEW_AVG',   'DEST_SLP_AVG',
  'ORIGIN_PRECIP_RATE_AVG', 'ORIGIN_SNOW_DEPTH_AVG',
  'DEST_PRECIP_RATE_AVG',   'DEST_SNOW_DEPTH_AVG',
  'DEST_PAGERANK',
  'ORIGIN_PAGERANK']
  assembler = VectorAssembler(inputCols=numericCols, outputCol="features_numeric",  handleInvalid="skip")
  scaler_encoder = StandardScaler(inputCol="features_numeric", outputCol="features_scaled",
                          withStd=True, withMean=True)
  stages += [assembler, scaler_encoder]

  # Assemble categorical (one-hot-encoded features) and scaled numeric features
  final_assembler = VectorAssembler(inputCols=["features_categorical", "features_scaled"], outputCol="features",  handleInvalid="skip")
  stages += [final_assembler]
  
  return (stages, categoricalColumns + numericCols)

In [28]:
#
# Cast columns to appropriate data type
#
data_raw.createOrReplaceTempView("data_raw")
data_raw = data_raw.withColumn("DEST_LATITUDE", data_raw["DEST_LATITUDE"].cast('double'))
data_raw = data_raw.withColumn("DEST_LONGITUDE", data_raw["DEST_LONGITUDE"].cast('double'))
data_raw = data_raw.withColumn("DEST_ELEV", data_raw["DEST_ELEV"].cast('double'))
data_raw = data_raw.withColumn("ORIGIN_LATITUDE", data_raw["ORIGIN_LATITUDE"].cast('double'))
data_raw = data_raw.withColumn("ORIGIN_LONGITUDE", data_raw["ORIGIN_LONGITUDE"].cast('double'))
data_raw = data_raw.withColumn("ORIGIN_ELEV", data_raw["ORIGIN_ELEV"].cast('double'))
data_raw = data_raw.withColumn("CRS_DEP_TIME", data_raw["CRS_DEP_TIME"].cast('double'))

# cast 'categories' to be one-hote encoded into strings
data_raw = data_raw.withColumn("YEAR", data_raw["YEAR"].cast('string'))
data_raw = data_raw.withColumn("MONTH", data_raw["MONTH"].cast('string'))
data_raw = data_raw.withColumn("DAY_OF_MONTH", data_raw["DAY_OF_MONTH"].cast('string'))
data_raw = data_raw.withColumn("DAY_OF_WEEK", data_raw["DAY_OF_WEEK"].cast('string'))
data_raw = data_raw.withColumn("OP_UNIQUE_CARRIER", data_raw["OP_UNIQUE_CARRIER"].cast('string'))
data_raw = data_raw.withColumn("ORIGIN", data_raw["ORIGIN"].cast('string'))
data_raw = data_raw.withColumn("DEST", data_raw["DEST"].cast('string'))
data_raw.createOrReplaceTempView("data_raw")

#
# Split into train and test
#
data_raw_train    = spark.sql("select * from data_raw where YEAR < 2019 or (YEAR = 2019 and MONTH = 1)")
data_raw_test     = spark.sql("select * from data_raw where YEAR = 2019 and MONTH > 1")
print('data raw train', f'{data_raw_train.count():,}')
print('data raw test ', f'{data_raw_test.count():,}')

data_raw_train.createOrReplaceTempView("data_raw_train")
data_raw_test.createOrReplaceTempView("data_raw_test")

#
# Run Pipeline to encode data for logistic regression
#
# Create pipeline for one-hot encoding, standardizing, and creating label for outcome variable
stages, orig_cols = create_encoding_stages_lr()
data_prep_pipeline       = Pipeline().setStages(stages)
# Fit based on the training data
data_prep_model          = data_prep_pipeline.fit(data_raw_train)
# Transform (encode) all of the raw data
data_prepped             = data_prep_model.transform(data_raw)
data_prepped.count()
data_prepped.createOrReplaceTempView('data_prepped')

#
# Split the encoded data into train, test
#
data_prepped_train    = spark.sql("select * from data_prepped where YEAR < 2019 or (YEAR = 2019 and MONTH = 1)")
data_prepped_test     = spark.sql("select * from data_prepped where YEAR = 2019 and MONTH > 1")

#
# Keep relevant columns
#
selected_cols = ["label", "features", "OUTCOME", "id"] + orig_cols 
data_prepped_train          = data_prepped_train.select(selected_cols)
data_prepped_test           = data_prepped_test.select(selected_cols)

#
# Separate train and test into smaller subsets
#
data_train          = data_prepped_train

# Taking 50% of both 0,1 outcomes from test to create test data set.  this is our hold-out set
data_test         = data_prepped_test.sampleBy("outcome", fractions={0: 0.50, 1: 0.50}, seed=10)

# Remaining 50% (unique set) will be our dev set for performance tuning
data_dev          = data_prepped_test.subtract(data_test)

# Take 5% sample of train for train_small
data_train_small    = data_train.sampleBy("outcome", fractions={0: 0.05, 1: 0.05}, seed=10)

# Take 5% of from dev to for dev small
data_dev_small    = data_dev.sampleBy("outcome", fractions={0: 0.05, 1: 0.05}, seed=10)

#### 3.3.3 Data Pipeline for Decision Trees

In [30]:
#
# Determine the frequency of each category (sum of outcome / total observations)
#
def calculate_ordinal(categorical_col, total_count):
  ordinal_col    = categorical_col + "_ORDINAL"
  id_col         = categorical_col + "_X"
  count_col      = categorical_col + "_COUNT"
  
  the_sql = f"""select ROW_NUMBER() OVER(ORDER BY count(*) ASC) AS {ordinal_col}, 
  {categorical_col} AS {id_col}, 
  sum(OUTCOME) / {total_count} as {count_col}
  from data_raw
  group by {id_col}
  order by {count_col}"""
  
  df_xref = spark.sql(the_sql)

  data_raw1 = data_raw.join(df_xref, data_raw[categorical_col] == df_xref[id_col], how="left")
  data_raw1 = data_raw1.drop(count_col)
  data_raw1 = data_raw1.drop(id_col)
  
  return data_raw1

In [31]:
def create_encoding_stages_dt():
  #
  # Encode the data
  #
  stages = [] # stages in our Pipeline

  # Convert label into label indices using the StringIndexer
  label_stringIdx = StringIndexer(inputCol="OUTCOME", outputCol="label")
  stages += [label_stringIdx]

  # ordinal features
  ordinalCols= ['YEAR', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'QUARTER', 'OP_UNIQUE_CARRIER_ORDINAL', 'ORIGIN_ORDINAL', 'DEST_ORDINAL', 'ORIGIN_STATE_FIPS_ORDINAL',
                'ORIGIN_CITY_MARKET_ID_ORDINAL', 'DEST_STATE_FIPS_ORDINAL', 'DEST_CITY_MARKET_ID_ORDINAL']
  #  numeric features
  numericCols = [
  'DEST_LATITUDE', 'DEST_LONGITUDE', 'DEST_ELEV',
  'ORIGIN_LATITUDE', 'ORIGIN_LONGITUDE', 'ORIGIN_ELEV',
  'CRS_ELAPSED_TIME','CRS_DEP_TIME',
  'ORIGIN_WND_ANGLE_AVG', 'ORIGIN_WND_SPEED_AVG', 'ORIGIN_CIG_AVG', 'ORIGIN_VIS_AVG', 'ORIGIN_TMP_AVG', 'ORIGIN_DEW_AVG', 'ORIGIN_SLP_AVG',
  'DEST_WND_ANGLE_AVG',   'DEST_WND_SPEED_AVG',   'DEST_CIG_AVG',   'DEST_VIS_AVG',   'DEST_TMP_AVG',   'DEST_DEW_AVG',   'DEST_SLP_AVG',
  'ORIGIN_PRECIP_RATE_AVG', 'ORIGIN_SNOW_DEPTH_AVG',
  'DEST_PRECIP_RATE_AVG',   'DEST_SNOW_DEPTH_AVG',
  'CRS_ARR_TIME','DISTANCE',
  'DEST_PAGERANK', 'ORIGIN_PAGERANK']

  input_cols = []
  input_cols += ordinalCols
  input_cols += numericCols
  assembler = VectorAssembler(inputCols=input_cols, outputCol="features", handleInvalid="skip")
  stages += [assembler]
  
  return (stages, input_cols)

In [32]:
#
# Cast columns to appropriate data type
#
# Cast numerics to doubles
data_raw = data_raw.withColumn("DEST_LATITUDE", data_raw["DEST_LATITUDE"].cast('double'))
data_raw = data_raw.withColumn("DEST_LONGITUDE", data_raw["DEST_LONGITUDE"].cast('double'))
data_raw = data_raw.withColumn("DEST_ELEV", data_raw["DEST_ELEV"].cast('double'))
data_raw = data_raw.withColumn("ORIGIN_LATITUDE", data_raw["ORIGIN_LATITUDE"].cast('double'))
data_raw = data_raw.withColumn("ORIGIN_LONGITUDE", data_raw["ORIGIN_LONGITUDE"].cast('double'))
data_raw = data_raw.withColumn("ORIGIN_ELEV", data_raw["ORIGIN_ELEV"].cast('double'))
data_raw = data_raw.withColumn("CRS_DEP_TIME", data_raw["CRS_DEP_TIME"].cast('double'))
data_raw = data_raw.withColumn("CRS_ARR_TIME", data_raw["CRS_ARR_TIME"].cast('double'))
data_raw = data_raw.withColumn("DISTANCE", data_raw["DISTANCE"].cast('double'))
data_raw = data_raw.withColumn("YEAR", data_raw["YEAR"].cast('double'))
data_raw = data_raw.withColumn("MONTH", data_raw["MONTH"].cast('double'))
data_raw = data_raw.withColumn("DAY_OF_MONTH", data_raw["DAY_OF_MONTH"].cast('double'))
data_raw = data_raw.withColumn("DAY_OF_WEEK", data_raw["DAY_OF_WEEK"].cast('double'))
data_raw = data_raw.withColumn("QUARTER", data_raw["QUARTER"].cast('double'))
data_raw = data_raw.withColumn("MONTH", data_raw["MONTH"].cast('double'))
# cast 'categories' to be one-hote encoded into strings
data_raw = data_raw.withColumn("OP_UNIQUE_CARRIER", data_raw["OP_UNIQUE_CARRIER"].cast('string'))
data_raw = data_raw.withColumn("ORIGIN", data_raw["ORIGIN"].cast('string'))
data_raw = data_raw.withColumn("DEST", data_raw["DEST"].cast('string'))
data_raw = data_raw.withColumn("ORIGIN_STATE_FIPS", data_raw["ORIGIN_STATE_FIPS"].cast('string'))
data_raw = data_raw.withColumn("ORIGIN_CITY_MARKET_ID", data_raw["ORIGIN_CITY_MARKET_ID"].cast('string'))
data_raw = data_raw.withColumn("DEST_STATE_FIPS", data_raw["DEST_STATE_FIPS"].cast('string'))
data_raw = data_raw.withColumn("DEST_CITY_MARKET_ID", data_raw["DEST_CITY_MARKET_ID"].cast('string'))



#
#  Use Brieman's method to create ordinals for unordered categorical data
#
data_raw.createOrReplaceTempView("data_raw")
total_count = data_raw.count()
data_raw = calculate_ordinal('OP_UNIQUE_CARRIER', total_count)
data_raw = calculate_ordinal('ORIGIN_STATE_FIPS', total_count)
data_raw = calculate_ordinal('DEST_STATE_FIPS', total_count)
data_raw = calculate_ordinal('ORIGIN_CITY_MARKET_ID', total_count)
data_raw = calculate_ordinal('DEST_CITY_MARKET_ID', total_count)
data_raw = calculate_ordinal('DEST', total_count)
data_raw = calculate_ordinal('ORIGIN', total_count)
data_raw.createOrReplaceTempView("data_raw")

#
# Split into train and test
#
data_raw_train    = spark.sql("select * from data_raw where YEAR < 2019 or (YEAR = 2019 and MONTH = 1)")
data_raw_test     = spark.sql("select * from data_raw where YEAR = 2019 and MONTH > 1")
print('data raw train', f'{data_raw_train.count():,}')
print('data raw test ', f'{data_raw_test.count():,}')

data_raw_train.createOrReplaceTempView("data_raw_train")
data_raw_test.createOrReplaceTempView("data_raw_test")

#
# Run Pipeline to encode data for logistic regression
#

# Create pipeline for one-hot encoding, standardizing, and creating label for outcome variable
stages, orig_cols        = create_encoding_stages_dt()
data_prep_pipeline       = Pipeline().setStages(stages)
# Fit based on the training data
data_prep_model          = data_prep_pipeline.fit(data_raw_train)
# Transform (encode) all of the raw data
data_prepped_dt          = data_prep_model.transform(data_raw)
data_prepped_dt.count()
data_prepped_dt.createOrReplaceTempView('data_prepped_dt')


#
# Split the encoded data into train, test
#
data_prepped_train_dt    = spark.sql("select * from data_prepped_dt where YEAR < 2019 or (YEAR = 2019 and MONTH = 1)")
data_prepped_test_dt     = spark.sql("select * from data_prepped_dt where YEAR = 2019 and MONTH > 1")

#
# Keep relevant columns
#
selected_cols = ["label", "features", "OUTCOME", "id"] + orig_cols
data_prepped_train_dt          = data_prepped_train_dt.select(selected_cols)
data_prepped_test_dt           = data_prepped_test_dt.select(selected_cols)

data_prepped_train_dt.createOrReplaceTempView('data_prepped_train_dt')
data_prepped_test_dt.createOrReplaceTempView('data_prepped_test_dt')

#
# Separate train and test into smaller subsets
#
data_train_tree          = data_prepped_train_dt

# Taking 50% of both 0,1 outcomes from test to create test data set.  this is our hold-out set
data_test_tree         = data_prepped_test_dt.sampleBy("outcome", fractions={0: 0.50, 1: 0.50}, seed=10)

# Remaining 50% (unique set) will be our dev set for performance tuning
data_dev_tree          = data_prepped_test_dt.subtract(data_test_tree)

# Take 5% sample of train for train_small
data_train_small_tree    = data_train_tree.sampleBy("outcome", fractions={0: 0.05, 1: 0.05}, seed=10)

# Take 5% of from dev to for dev small
data_dev_small_tree    = data_dev_tree.sampleBy("outcome", fractions={0: 0.05, 1: 0.05}, seed=10)

#4. Algorithm Exploration

<img src="https://github.com/tonydisera/261-final-project-images/blob/master/algo_explore1.png?raw=true" width="60%" >

For our analysis, we selected 2 traditional classification algorithms, specifically logistic regression and decision trees, with the intention to further consider various regularization options for logistic regression and bagging, boosting and random forest options for decision trees. Due to the fact that logistic regression is not suitable in the case of multicollinearity, our EDA efforts noted above were specifically designed to ensure the data pipeline informing our logistic regression model removed features which were highly correlated. We also ensured that our continuous features were normalized with mean = 0 and standard deviation = 1 so that gradient descent process would be optimized. 

For our decision tree-based models, we were able to incorporate more features without concern about multicollinearity and generally include all features with limited preprocessing (no normalization or one-hot encoding of categorical features). Our expectation is that ensemble decision tree methods (specifically bagging with random forest and bagging with gradient-boosted trees) would produce the strongest results, as these models train on the most data, use balanced training sets, handle missing values more easily, and balance the bias-variance tradeoff through ensembles. In terms of computational cost (or time required), logistic regression and decision trees should be the most performant.

###4.1 Logistic Regression

<img src="https://github.com/tonydisera/261-final-project-images/blob/master/lr.png?raw=true" align="left" width="10%" >


For logistic regression, we first started with a simple model trained on the full dataset. We then proceeded to manually explore L1 and L2 regularization (code not included for brevity). We used a gridsearch and crossfold validation to further explore hyperparameter tuning. After some initial attempts to perform these operations on the full training set, we concluded that we would need to sample in order to perform the crossfold validation in a reasonable amount of time.

#### Functions to evaluate model performance

We were most interested in recall and F1 as our primary performance metrics. These metrics were not accessible via the API, so we wrote a custom function to calculate the true positive, true negative, false positive, and false negative counts needed to calculate recall and F1. We also monitored the area under the precision-recall curve, or AUC PR. This metric closely followed F1 and was accessible from the model object returned from the API.

In [36]:
# Dataframe to store final results
predictionDFcolumns =['accuracy','precision', 'recall', 'f1score', 'areaUnderPR', 'tp', 'tn', 'fp', 'fn']
predictionDF = pd.DataFrame(columns=predictionDFcolumns)
# Save results to csv
metrics_filename = "/dbfs/user/ammara.essa@ischool.berkeley.edu/W261_Final_Project/predictions_df.csv"

In [37]:

# Function to calculate evaluation metrics
def evaluatePerformance(predictions, modelName = 'Test'):
  """
  Function that takes in a dataframe that includes a column of 'labels' and 'predictions' and
  parses these labels and predictions to calculate, print, and return as a list the suite of accuracy measures
  we are interested in. "model_name" is a string that provides the model name and is used for saving results to dataframe.
  """
  predictions.createOrReplaceTempView("predictions")
  # manual counts
  tp = spark.sql("select count(*) from predictions where label = 1 and prediction = 1").first()[0]
  tn = spark.sql("select count(*) from predictions where label = 0 and prediction = 0").first()[0]
  fp = spark.sql("select count(*) from predictions where label = 0 and prediction = 1").first()[0]
  fn = spark.sql("select count(*) from predictions where label = 1 and prediction = 0").first()[0]
  

  # Overall statistics
  accuracy = (tp+tn)/(tp+tn+fp+fn)
  recall = tp / (tp+fn)
  
  precision = 0
  if tp+fp != 0:
    precision = tp / (tp+fp)

  f1score = 0
  if recall + precision != 0:
    f1score = 2*recall*precision/(recall + precision)

  
  # Area under precision-recall curve
  pl_rdd = predictions.select(predictions.label.cast("float"), predictions.prediction.cast("float")).rdd
  pr_metrics = BinaryClassificationMetrics(pl_rdd)
  auc = pr_metrics.areaUnderPR

  print("Summary Stats")
  print("tp (correct   pred as delay) \t%s" % f'{tp:,}')
  print("tn (correct   pred as ontime)\t%s" % f'{tn:,}')
  print("fp (incorrect pred as delay) \t%s" % f'{fp:,}')
  print("fn (incorrect pred as ontime)\t%s" % f'{fn:,}, "\n"')

  print("accuracy \t%s"  % np.round(accuracy,4),  "\tout of all observations, how many were predicted correctly?")
  print("precision\t%s"  % np.round(precision,4), "\tout of all delays predicted, how many are correct?")
  print("recall   \t%s"  % np.round(recall,4),    "\tout of all actual delays, how many are correctly predicted?")
  print("f1       \t%s"  % np.round(f1score,4))
  print("Area under PR = %s" % auc)
  
  #Store the metrics in a dataframe to return & write output to csv file
#   predictionDF = pd.read_csv(metrics_filename, index_col=0)
  predictionDF.loc[modelName] = [accuracy, precision, recall, f1score, auc, tp, tn, fp, fn]
#   predictionDF.to_csv(metrics_filename)
  
  #reset the predictions for the next iteration
  predictions = tp = tn = fp = fn = None

# Function to show curves
def showCurve(modelSummary):
  """
  Function that takes in the modelSummary object from a trained model and calls 
  two additional plot functions to return Area under the ROC and Precision/Recall curves
  """
  fig = plt.figure(figsize=(10,5))
  ax1 = fig.add_subplot(1, 2, 1)
  ax2 = fig.add_subplot(1, 2, 2)
  showROCCurve(modelSummary, ax1)
  showPRCurve(modelSummary, ax2)
  display(plt.show())
  
def showROCCurve(modelSummary, ax):
  roc = modelSummary.roc.toPandas()
  ax.plot(roc['FPR'],roc['TPR'])
  ax.set_ylabel('False Positive Rate')
  ax.set_xlabel('True Positive Rate')
  ax.set_title('ROC Curve')

def showPRCurve(modelSummary, ax):
  pr = modelSummary.pr.toPandas()
  ax.plot(pr['recall'],pr['precision'])
  ax.set_xlabel('Precision')
  ax.set_ylabel('Recall')
  ax.set_title('Precision Recall')
  
# Function to show confusion matrix
def showConfusionMatrix(modelSummary):
  """
  Function that takes in the modelSummary object from a trained model (where the tp, tn, fp, fn have already been calculated
  and returns a confusion matrix in the form of a dataframe
  """
  confMatrix = pd.DataFrame(columns=['Delay_(Predicted)','No_Delay_(Predicted)'])
  confMatrix.loc['Delay_(Actual)','Delay_(Predicted)'] = modelSummary['tp']
  confMatrix.loc['No_Delay_(Actual)','No_Delay_(Predicted)'] = modelSummary['tn']
  confMatrix.loc['No_Delay_(Actual)','Delay_(Predicted)'] = modelSummary['fp']
  confMatrix.loc['Delay_(Actual)','No_Delay_(Predicted)'] = modelSummary['fn']
  cm = sns.light_palette("blue", as_cmap=True)
  s = confMatrix.style.background_gradient(cmap=cm)
  display(s)

#### Functions to show feature importance

One important aspect of logistic regression models is their explainability. By extracting features and feature weights and signs, we can understand the features that have the largest impact on the model's predictons, as well as the direction of impact. We create custom functions to extract this information.

In [39]:
#
# Show feature importance
#
def display_feature_importance_lr(sorted_feature_scores):
  seen = {}
  lookup = {
  'features_scaled_0': 'DEST_LATITUDE',
  'features_scaled_1': 'DEST_LONGITUDE',
  'features_scaled_2': 'DEST_ELEV',
  'features_scaled_3': 'ORIGIN_LATITUDE',
  'features_scaled_4': 'ORIGIN_LONGITUDE',
  'features_scaled_5': 'ORIGIN_ELEV',
  'features_scaled_6': 'CRS_ELAPSED_TIME',
  'features_scaled_7': 'CRS_DEP_TIME',
  'features_scaled_8': 'ORIGIN_WND_ANGLE_AVG',
  'features_scaled_9': 'ORIGIN_WND_SPEED_AVG',
  'features_scaled_10': 'ORIGIN_CIG_AVG',
  'features_scaled_11': 'ORIGIN_VIS_AVG',
  'features_scaled_12': 'ORIGIN_TMP_AVG',
  'features_scaled_13': 'ORIGIN_DEW_AVG',
  'features_scaled_14': 'ORIGIN_SLP_AVG',
  'features_scaled_15': 'ORIGIN_PRECIP_RATE_AVG',
  'features_scaled_16': 'ORIGIN_SNOW_DEPTH_AVG',
  'features_scaled_17': 'DEST_WND_ANGLE_AVG',
  'features_scaled_18': 'DEST_WND_SPEED_AVG',
  'features_scaled_19': 'DEST_CIG_AVG',
  'features_scaled_20': 'DEST_VIS_AVG',
  'features_scaled_21': 'DEST_TMP_AVG',
  'features_scaled_22': 'DEST_DEW_AVG',
  'features_scaled_23': 'DEST_SLP_AVG',
  'features_scaled_24': 'DEST_PRECIP_RATE_AVG',
  'features_scaled_25': 'DEST_SNOW_DEPTH_AVG',
  'features_scaled_26': 'DEST_PAGERANK',
  'features_scaled_27': 'ORIGIN_PAGERANK'}
  
  
  for name, score in sorted_feature_scores:
    trimmed = name
    try:
      idx = name.index('classVec_')
      if idx >= 0:
        trimmed = name[:idx]
    except:
      trimmed = name
    try:
      idx = trimmed.index('features_categorical_')
      if idx >= 0:
         trimmed = trimmed[len('features_categorical_'):]
    except:
      trimmed = trimmed
    if trimmed not in seen:
      if trimmed in lookup:
        print(lookup[trimmed], np.round(score,3))
      else:
        print(trimmed, np.round(score,3))
      seen[trimmed] = True

def determine_feature_importance_lr(predictions):      
  features = [x["name"] for x in sorted(predictions.schema["features"].metadata["ml_attr"]["attrs"]["binary"]+ predictions.schema["features"].metadata["ml_attr"]["attrs"]["numeric"], key=lambda x: x["idx"])]

  #features = [ x['name'] for x in predictions.schema["features"].metadata["ml_attr"]["attrs"]["numeric"]]
  cf = lrModel.coefficientMatrix.toArray().tolist()[0]

  feature_cf =  list(zip(features, cf))
  
  #
  # Feature Importance (ordered by absolute value of coefficient)
  #
  sorted_feature_scores_abs_class = sorted(feature_cf, key=lambda tup:(-abs(tup[1]), tup[0]))
  display_feature_importance_lr(sorted_feature_scores_abs_class)
  
  
def display_feature_importance_tree(model, predictions):
  featureImp = model.featureImportances
  list_extract = []
  for i in predictions.schema['features'].metadata["ml_attr"]["attrs"]:
      list_extract = list_extract + predictions.schema['features'].metadata["ml_attr"]["attrs"][i]
  varlist = pd.DataFrame(list_extract)
  varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
  return varlist.sort_values('score', ascending = False)


#### 4.1.1 Logistic Regression - Baseline Model

Our initial exploration of logistic regression is performed on the complete dataset. We have not (yet) taken any actions to balance the dataset between classes and simply accept the default parameters for the model. We explored a number of different combinations of hyperparameters (examples not shown), primarily focusing on comparing regularization routines. The MLLib API has a hyperparameter (ElasticNet) that sets the regularization to either L1 or L2, but it can also blend the two types of regularization. Despite efforts at tuning, we saw little performance improvement when training on the full training set.

In [42]:
# Create logistic regression model
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label')
print("\033[1m Logistic Regression Model [unweighted] with default parameters \033[0m")
start = time.time()
# Train model with Training Data
lrModel = lr.fit(data_train)
print(f"\nTrained model in {(time.time() - start)/60} minutes")

print("Evaluation against dev set")
print("========================")
start = time.time()

# Generate predictions on the dev set
predictions = lrModel.transform(data_dev).cache()
predictions.createOrReplaceTempView("predictions")

# Evaluate the performance of the dev set predictions
evaluatePerformance(predictions, 'LR_Unweighted_DefaultParms')

print(f"\nEvaluated metrics in {(time.time() - start)/60} minutes")

In [43]:
# Sample output the dataframe with model results. The model name is the index.
pd.DataFrame(predictionDF.loc['LR_Unweighted_DefaultParms']).transpose()
# predictionDF.loc['LR_Unweighted_DefaultParms','tp']

Unnamed: 0,accuracy,precision,recall,f1score,areaUnderPR,tp,tn,fp,fn
LR_Unweighted_DefaultParms,0.8009,0.559851,0.057988,0.105091,0.049822,38712.0,2613418.0,30435.0,628874.0


In [44]:
# Show confusion matrix
print('\033[1m Confusion matrix for model : LR_Unweighted_DefaultParms \033[0m')
showConfusionMatrix(predictionDF.loc['LR_Unweighted_DefaultParms'])

Unnamed: 0,Delay_(Predicted),No_Delay_(Predicted)
Delay_(Actual),38712,628874
No_Delay_(Actual),30435,2613418


####4.1.2 Logistic Regression - with Weighting

Due to the imbalance between delayed and on-time observations in the training set( ~80% ontime / 20 delayed), we employed a technique called “Class Weighing”. Using this method, we ensure that the logistic loss objective function treats the positive (delayed) class (Outcome == 1) with higher weight. The functions below described how the balancing ratio for the training data is calculated and applied to each observation of the training, dev and test data (in a new column 'weight').

#### Functions to calculate and assign weights

In [48]:
#helper functions to set up the weighted model
def getWeight(df):
  """
  Takes in a dataframe and returns class weights
  """
  df.createOrReplaceTempView("df")
  outcome_counts_nodelay  = spark.sql("select  count(*) from df where OUTCOME = 0")

  outcome_counts_delay    = spark.sql("select  count(*) from df where OUTCOME = 1")

  total = spark.sql("select  count(*) from df")

  tot     = int(total.first()[0])
  nodelay = int(outcome_counts_nodelay.first()[0])
  delay   = int(outcome_counts_delay.first()[0])

  balancing_ratio = (tot - nodelay) / tot
  weight_nodelay  = 1 * balancing_ratio
  weight_delay    = 1 * (1 - balancing_ratio)

  print("no delay", nodelay, '\tratio', nodelay/tot, '\tweight', weight_nodelay)
  print("delay.  ", delay, '\tratio', delay/tot, '\tweight', weight_delay)
#   print("training set lost records check:", (data_train_count - tot) )
  return (weight_nodelay,weight_delay)

def setOutcome(label):
  """
  Takes in a label, returns no_delay_weight to underweight the on time flights if label = 0
  """
  if label == 0: return weight_nodelay
  else: return weight_delay


def setWeight(df):
  """
  Takes in an unweighted dataframe and returns a dataframe with a weights column added
  """
  udfSetOutcome = udf(setOutcome, DoubleType())
  df_with_weight = df.withColumn("weight", udfSetOutcome("label"))
  return df_with_weight

#### Calculate and assign weights to data

In [50]:
# calculate and assign weights to data

print('\033[1m Class weights calculated from training data \033[0m')
weight_nodelay,weight_delay = getWeight(data_train)
data_train       = setWeight(data_train)
data_train_small = setWeight(data_train_small)

data_train.createOrReplaceTempView("data_train")
data_train_small.createOrReplaceTempView("data_train_small")

In [51]:
# Create logistic regression model and train on training set, create predictions on the dev set and evaluate performance
lr = LogisticRegression(labelCol="label", featuresCol="features", weightCol="weight")

print("\033[1m Logistic Regression Model [weighted] with default parameters) \033[0m")
start = time.time()
# Train model with Training Data
lrModel = lr.fit(data_train)
print(f"\nTrained model in {(time.time() - start)/60} minutes")

print("Evaluation against dev set")
print("========================")
start = time.time()
predictions = lrModel.transform(data_dev).cache()
predictions.createOrReplaceTempView("predictions")
evaluatePerformance(predictions, "LR_Weighted_DefaultParms")
print(f"\nEvaluated metrics in {(time.time() - start)/60} minutes")

#### In order to explore feature importance in our weighted logistic regresion model, we then extract the model coefficients.

For binary classification, the coefficients represent the odds.  Positive coefficients for a feature indicate that the delay becomes more likely and negative coefficients indicate that the delay becomes less likely.   We sort by absolute value to rank both strongly positive and strongly negative features in terms of their predictive power.

In [53]:
# Display feature importance
print("\033[1mFeature Coefficients: Logistic Regression Model [weighted] with default parameters) \033[0m")
determine_feature_importance_lr(predictions)

##4.2 Tree-Based Models

<img src="https://github.com/tonydisera/261-final-project-images/blob/master/dt.png?raw=true" width="15%" >

We explored the following classification algorithms:  decision trees, random forests, and gradient-boosted trees. For each of the models, we first explored performance with the complete, unbalanced dataset. Next we explored the models with a balanced dataset created from undersampling the majority class. Finally, we implemented bootstrap aggregation (bagging), which we applied in conjunction with a balanced dataset (in terms of majority and minority classes) on all three algorithms.

We performed hyperparameter tuning with gridsearch using crossfold validation, but have omitted that here because these baseline models performed so poorly that no amount of hyperparameter tuning made a meaningful difference in model performance.

####4.2.1 Baseline Tree-Based Models

####4.2.1.1 Decision Tree - Baseline

Decision trees can be advantageous for several reasons. They handle numeric, categorical, and ordinal features without requiring pre-processing (standardization or scaling). Further, decision trees handle missing values naturally, which was a challenge for our dataset, especially the weather data. They are also explainable, as we can inspect the tree structure and infer the relative importance of features based on where they are in the tree hierarchy. (The display of the tree is quite large, so we show the code but do not run it here.)

In [57]:
#Specify the decision tree classifier model and fit to the training data

dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 10)
start = time.time()
dtModel = dt.fit(data_train_tree)
print("\033[1m Decision Tree Model (Baseline) \033[0m")
print(f"\nTrained model in {(time.time() - start)/60} minutes")

print("\nEvaluation against dev set")
print("========================")
start = time.time()
# Generate predictions on the dev set and evaluate model performance
predictions = dtModel.transform(data_dev_tree).cache()
predictions.createOrReplaceTempView("predictions")
evaluatePerformance(predictions, "DecisionTree_Baseline")
print(f"\nEvaluated metrics in {(time.time() - start)/60} minutes")

In [58]:
#display(dtModel)

####4.2.1.2 Random Forest - Baseline

<img src="https://github.com/tonydisera/261-final-project-images/blob/master/rf.png?raw=true" width="15%" >

Random forests are built from a number of trees that are built on sub-samples of the features (columns). By only taking a subsection of the possible features, the individual trees are forced to consider different combinations of features. For classification, the mode prediction is returned. By creating a number of individual trees and returning the mode, random forests correct for the tendency of decision trees to overfit (James, Witten, Hastie, Tibshirani, 2017). We performed gridsearch and crossfold validation on these models (ommited here), but the results did not improve very much from those shown below.

In [60]:
# Create random forest model and train on the training set
rf = RandomForestClassifier(labelCol="label", featuresCol="features",  
                                     numTrees=50, featureSubsetStrategy="auto",
                                     impurity='gini', subsamplingRate=0.8, maxDepth=15, maxBins=32)
start = time.time()
rfModel = rf.fit(data_train_tree)
print("\033[1m Random Forest (Baseline) \033[0m")

print(f"\nTrained model in {(time.time() - start)/60} minutes")

print("\nEvaluation against dev set")
print("========================")
start = time.time()
# Generate predictions on the dev set and evaluate model performance
predictions = rfModel.transform(data_dev_tree).cache()
predictions.createOrReplaceTempView("predictions")
evaluatePerformance(predictions, "RandomForest_Baseline")
print(f"\nEvaluated metrics in {(time.time() - start)/60} minutes")

####4.2.1.3 Gradient-Boosted Trees - Baseline

<img src="https://github.com/tonydisera/261-final-project-images/blob/master/gbt.png?raw=true" width="10%" >

Gradient-boosted trees are a subset of the class of gradient-boosted models that use decision trees as base learners. These base learner trees are are weak models that learn direction vectors with both direction and magnitude (Parr and Howard, 2020). Subsequent learners build off previous ones (much like a good golfer hits progressively shorter shots as she moves from tee to green to the hole). We experimented with tuning the number of iterations, but observed little difference in model performance when the full training set was used.

In [62]:
# Create gradient-boosted tree model and train on the training set
gbt = GBTClassifier(labelCol="label", featuresCol="features",maxIter=20)


start = time.time()
gbtModel = gbt.fit(data_train_tree)
print("\033[1m Gradient Boosted Trees (Baseline) \033[0m")
print(f"\nTrained model in {(time.time() - start)/60} minutes")

print("\nEvaluation against dev set")
print("========================")
start = time.time()
# Generate predictions on the dev set and evaluate model performance
predictions = gbtModel.transform(data_dev_tree).cache()
predictions.createOrReplaceTempView("predictions")
evaluatePerformance(predictions, "GBT_Baseline")
print(f"\nEvaluated metrics in {(time.time() - start)/60} minutes")

####4.2.2 Tree-Based Models Using a Balanced Dataset

<img src="https://github.com/tonydisera/261-final-project-images/blob/master/bagging.png?raw=true" width="40%" >

Next, we attempted to improve the model performance by creating a balanced dataset by randomly undersampling the majority class. This had the effect of reducing our training set from ~24M records to ~9M records. We explored each of the three models again and experimented further with hyperparameter tuning, both manually and through gridsearch and crossfold validation (this was a very computationally expensive operation, especially for forests and gradient-boosted trees).

In [64]:
# Helper functions to create data sets balanced on the outcome variable

def split_data(data):
  """
  Function that takes in dataframe and splits it into two based on the outcome variable and returns two dataframes
  """
  data_ontime = data.filter('OUTCOME = 0')
  data_delayed = data.filter('OUTCOME = 1')
  return data_ontime, data_delayed

def create_balanced_set(data):
  """
  This function takes in a dataframe, splits the dataframe by its outcome variable, calculates a balancing ratio and
  creates a new dataset balanced by the outcome variable by under-sampling the majority class.
  """
  on_time, delayed = split_data(data)
  on_time_count = on_time.count()
  delayed_count = delayed.count()
  ratio = delayed_count / on_time_count
      
  #take a random sample from the data
  current_seed = np.random.randint(low=1, high=100, size=1)[0]
  on_time_samp = on_time.sample(False, ratio, seed=int(current_seed))
  delayed_samp = delayed.sample(True, 1.0, seed=int(current_seed))
      
  #combine into single dataframe
  balanced_train = on_time_samp.union(delayed_samp).cache()
  return balanced_train

In [65]:
# Create the balanced training set based on the tree pipeline
balanced_training = create_balanced_set(data_train_tree)
balanced_training.count()

####4.2.1.1 Decision Tree - Balanced Outcomes

In [67]:
#Specify the decision tree classifier model and fit to the training data

dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 10)
start = time.time()
dtModel = dt.fit(balanced_training)
print("\033[1m Decision Tree Model (Balanced Outcome) \033[0m")
print(f"\nTrained model in {(time.time() - start)/60} minutes")

print("\nEvaluation against dev set")
print("========================")
start = time.time()
# Generate predictions on the dev set and evaluate model performance
predictions = dtModel.transform(data_dev_tree).cache()
predictions.createOrReplaceTempView("predictions")
evaluatePerformance(predictions, "Decision_Tree_Balanced")
print(f"\nEvaluated metrics in {(time.time() - start)/60} minutes")

In [68]:
# Display feature imporance
print("\033[1mFeature Importance : Decision Tree Model [weighted]) \033[0m")
display_feature_importance_tree(dtModel, predictions)

Unnamed: 0,idx,name,score
19,19,CRS_DEP_TIME,0.270109
27,27,ORIGIN_PRECIP_RATE_AVG,0.149896
36,36,DEST_PRECIP_RATE_AVG,0.129083
23,23,ORIGIN_VIS_AVG,0.080427
5,5,OP_UNIQUE_CARRIER_ORDINAL,0.069378
24,24,ORIGIN_TMP_AVG,0.056768
1,1,MONTH,0.039007
14,14,DEST_ELEV,0.028351
11,11,DEST_CITY_MARKET_ID_ORDINAL,0.022225
9,9,ORIGIN_CITY_MARKET_ID_ORDINAL,0.021505


In [69]:
# Visualize the decision tree (output omitted due to large size)
#display(dtModel)

####4.2.1.2 Random Forest - Balanced Outcomes

The Random Forest model trained on balanced training data uses hyperparameters that were based on the results of the gridsearch crossfold validation performed on the small sample set of data. We displayed the feature importance so that we could compare how the different models treated features differently.

In [71]:
# Pull out the parameters from the gridsearch and re-run the RF model on the full training set to re-train the weights
best_rf = RandomForestClassifier(labelCol="label", featuresCol="features",  
                                     numTrees=25, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=20, maxBins=32)

start = time.time()
rfModel_bal = best_rf.fit(balanced_training)
print("\033[1m Random Forest Model (Balanced Outcome) \033[0m")
print("\033[1m hyperparameters tuned with smaller gridsearch \033[0m")
print(f"\nTrained model in {(time.time() - start)/60} minutes")


print("\nEvaluation against dev set")
print("========================")
start = time.time()
# Generate predictions on the dev set and evaluate model performance
predictions = rfModel_bal.transform(data_dev_tree).cache()
predictions.createOrReplaceTempView("predictions")
evaluatePerformance(predictions,"Random_Forest_Balanced")
print(f"\nEvaluated metrics in {(time.time() - start)/60} minutes")

In [72]:
# Display feature importance
print("\033[1mFeature Importance : Random Forest Model [weighted]) \033[0m")
display_feature_importance_tree(rfModel_bal, predictions)

Unnamed: 0,idx,name,score
19,19,CRS_DEP_TIME,0.092413
38,38,CRS_ARR_TIME,0.059856
36,36,DEST_PRECIP_RATE_AVG,0.051311
27,27,ORIGIN_PRECIP_RATE_AVG,0.049372
23,23,ORIGIN_VIS_AVG,0.046293
24,24,ORIGIN_TMP_AVG,0.03513
32,32,DEST_VIS_AVG,0.032334
5,5,OP_UNIQUE_CARRIER_ORDINAL,0.0312
31,31,DEST_CIG_AVG,0.029228
25,25,ORIGIN_DEW_AVG,0.028917


####4.2.1.3 Gradient-Boosted Trees - Balanced Outcomes

In [74]:
# Create GBT model and train on the balanced training set
gbt = GBTClassifier(labelCol="label", featuresCol="features",maxIter=20)


start = time.time()
gbtModel_bal = gbt.fit(balanced_training)
print("\033[1m Gradient Boosted Tree Model (Balanced Outcome) \033[0m")
print(f"\nTrained model in {(time.time() - start)/60} minutes")

print("\nEvaluation against dev set")
print("========================")
start = time.time()
# Generate predictions on the dev set and evaluate model performance
predictions = gbtModel_bal.transform(data_dev_tree).cache()
predictions.createOrReplaceTempView("predictions")
evaluatePerformance(predictions,"GBT_Balanced")
print(f"\nEvaluated metrics in {(time.time() - start)/60} minutes")

In [75]:
# Display feature importance
print("\033[1mFeature Importance : Gradient Boosted Tree Model [weighted]) \033[0m")
display_feature_importance_tree(gbtModel_bal, predictions)

Unnamed: 0,idx,name,score
5,5,OP_UNIQUE_CARRIER_ORDINAL,0.093186
19,19,CRS_DEP_TIME,0.063047
24,24,ORIGIN_TMP_AVG,0.051811
32,32,DEST_VIS_AVG,0.045909
33,33,DEST_TMP_AVG,0.044994
38,38,CRS_ARR_TIME,0.043897
34,34,DEST_DEW_AVG,0.039952
3,3,DAY_OF_WEEK,0.037668
27,27,ORIGIN_PRECIP_RATE_AVG,0.035862
16,16,ORIGIN_LONGITUDE,0.035392


####4.2.3 Bootstrap Aggregation (Bagging)

Bagging implements a logic similar to that of random forests, but it operates on the rows (data observations) instead of the columns (features). It's underlying principle is to build complex, high variance trees (that individually might be overfit) and then overcome the high variance by averaging. We developed the bagging framework as a way to continue to train on balanced training sets, but be able to take advantage of all of our training data. We implemented bagging for decision trees, random forests, and gradient boosted trees. These models were expensive to train, so we implemented a human-based gridsearch whereby we split the different parameter settings we wanted to run between our four team members and had us each train different models. Depending on the performance of the databricks cluster at runtime, these models took up to ten hours to train. (We incorporated interim print statements to help us keep track of model training progress.)

In [77]:
# This is a set of helper functions that are used in running the bootstrap aggregation (bagging) on the tree-based models (decision tree, random forest, gradient-boosted trees)


def eval_tree(the_data_train, the_data_eval, nDepth):
  """
  Function that takes in a trainig dataframe, a test dataframe, and a depth hyperparameter, 
  then trains a decision tree on it and returns
  a new dataframe with columns 'id' and 'prediction'
  """
  predictions = dt = dtModel = tree_eval = None
  
  # Define the decision tree model, fit on the training data, generate and cache predictions on the eval data
  dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = nDepth)
  dtModel = dt.fit(the_data_train)
  predictions = dtModel.transform(the_data_eval).cache()

  #Create new 'predictions' and 'one_prediction' df each time through
  #Create new dataframe taht only captures the row 'id' and prediction
  predictions.createOrReplaceTempView("predictions")
  
  one_prediction = predictions.select('id', 'prediction')
  one_prediction.createOrReplaceTempView("one_prediction")

  return one_prediction


def eval_forest(the_data_train, the_data_eval, nDepth):
  """
  Function that takes in a trainig dataframe, a test dataframe, and a depth hyperparameter, 
  then trains a random forest on it and returns
  a new dataframe with columns 'id' and 'prediction'
  """
  predictions = dt = dtModel = tree_eval = None
  nTrees = 25
  
  # Define the random forest model, fit on the training data, generate and cache predictions on the eval data
  rf = RandomForestClassifier(labelCol="label", featuresCol="features",  
                                     numTrees=nTrees, featureSubsetStrategy="auto",
                                     impurity='gini', subsamplingRate=0.8, maxDepth=nDepth, maxBins=32)
  rfModel = rf.fit(the_data_train)
  predictions = rfModel.transform(the_data_eval).cache()

  #Create new 'predictions' and 'one_prediction' df each time through
  #Create new dataframe taht only captures the row 'id' and prediction
  predictions.createOrReplaceTempView("predictions")
  
  one_prediction = predictions.select('id', 'prediction')
  one_prediction.createOrReplaceTempView("one_prediction")

  return one_prediction


def eval_gbt(the_data_train, the_data_eval):
  """
  Function that takes in a trainig dataframe, a test dataframe, and a depth hyperparameter, 
  then trains a gradient-boosted tree model on it and returns
  a new dataframe with columns 'id' and 'prediction'
  """
  predictions = dt = dtModel = tree_eval = None
    
  # Define the GBT model, fit on the training data, generate and cache predictions on the eval data
  gbt = GBTClassifier(labelCol="label", featuresCol="features",maxIter=20)
  
  gbtModel = gbt.fit(the_data_train)
  predictions = gbtModel.transform(the_data_eval).cache()

  #Create new 'predictions' and 'one_prediction' df each time through
  #Create new dataframe taht only captures the row 'id' and prediction
  predictions.createOrReplaceTempView("predictions")
  
  one_prediction = predictions.select('id', 'prediction')
  one_prediction.createOrReplaceTempView("one_prediction")


  return one_prediction

In [78]:
# There is a slight modification to the evaluate performance function to make it work more cleanly when called multiple times.

def evaluateTreePerformance(modelName, predictions):
  """
  Function that takes in a dataframe that includes a column of 'labels' and 'predictions' and
  parses these labels and predictions to calculate, print, and return as a list the suite of accuracy measures
  we are interested in.
  """
  
  # manual counts
  tp = spark.sql("select count(*) from predictions where label = 1 and prediction = 1").first()[0]
  tn = spark.sql("select count(*) from predictions where label = 0 and prediction = 0").first()[0]
  fp = spark.sql("select count(*) from predictions where label = 0 and prediction = 1").first()[0]
  fn = spark.sql("select count(*) from predictions where label = 1 and prediction = 0").first()[0]
  

  # Overall statistics
  accuracy = (tp+tn)/(tp+tn+fp+fn)
  recall = tp / (tp+fn)
  
  precision = 0
  if tp+fp != 0:
    precision = tp / (tp+fp)

  f1score = 0
  if recall + precision != 0:
    f1score = 2*recall*precision/(recall + precision)

  
  # Area under precision-recall curve
  pl_rdd = predictions.select(predictions.label.cast("float"), predictions.prediction.cast("float")).rdd
  pr_metrics = BinaryClassificationMetrics(pl_rdd)
  auc = pr_metrics.areaUnderPR

  print("Summary Stats")
  print("tp (correct   pred as delay) \t%s" % f'{tp:,}')
  print("tn (correct   pred as ontime)\t%s" % f'{tn:,}')
  print("fp (incorrect pred as delay) \t%s" % f'{fp:,}')
  print("fn (incorrect pred as ontime)\t%s" % f'{fn:,}, "\n"')

  print("accuracy \t%s"  % np.round(accuracy,4),  "\tout of all observations, how many were predicted correctly?")
  print("precision\t%s"  % np.round(precision,4), "\tout of all delays predicted, how many are correct?")
  print("recall   \t%s"  % np.round(recall,4),    "\tout of all actual delays, how many are correctly predicted?")
  print("f1       \t%s"  % np.round(f1score,4))
  print("Area under PR = %s" % auc)
  
  #Store the metrics in a list to return
  tree_metrics = [accuracy,precision,recall,f1score, auc]
  
  #Store the metrics in a list to return
  predictionDF = pd.read_csv(metrics_filename, index_col=0)
  predictionDF.loc[modelName] = [accuracy, precision, recall, f1score, auc, tp, tn, fp, fn]
  predictionDF.to_csv(metrics_filename)
  
  #reset the predictions for the next iteration
  predictions = tp = tn = fp = fn = None
  return tree_metrics

In [79]:
def bagging(nTrees, nDepth, bagFrac, data_train, data_eval, model_type):
  """
  Function that performs bagging on a balanced dataset for a specified number of trees/random forests / GBTs.
  'nTrees' is an integer that specifies the number of bagged trees to train
  'nDepth' is an integer that specifies the depth for trees/forests
  'data_train' is a dataframe of training data
  'data_eval' is a dataframe of evaluation data
  'model_type' is a string that accepts 'tree', 'forest', or 'gbt' and calls the specified model for each bagged tree
  
  Function returns the performance metrics for the bagged models.
  """
  #create new version of data_train
  data_train.createOrReplaceTempView("data_train")
  
  #split training data into on-time and delayed, generate counts and create a ratio
  on_time, delayed = split_data(data_train)
  on_time_count = on_time.count()
  delayed_count = delayed.count()
  ratio = delayed_count / on_time_count

  ## create dataframe to hold the predictions
  data_eval.createOrReplaceTempView("data_eval")
  prediction_df = data_eval.select('id', 'label')
    

  #loop to generate individual trees in ensemble
  for i in range(nTrees):
    start = time.time()
        
    #take a random sample from the data for each tree
    current_seed = np.random.randint(low=1, high=100, size=1)[0]
    on_time_samp = on_time.sample(False, bagFrac*ratio, seed=int(current_seed))
    delayed_samp = delayed.sample(True, bagFrac, seed=int(current_seed))
      
    #combine into single dataframe, replace old version in memory and cache
    balanced_train = on_time_samp.union(delayed_samp).cache()
    balanced_train.createOrReplaceTempView("balanced_train")   
        
    #build a tree on the sampled data
    if model_type == 'tree':
      one_prediction = eval_tree(balanced_train, data_eval, nDepth)
    elif model_type == 'forest':
      one_prediction = eval_forest(balanced_train, data_eval, nDepth)
    elif model_type == 'gbt':
      one_prediction = eval_gbt(balanced_train, data_eval)
    else:
      print('Do not recognize model type')
      break
    
    #join predictions from latest tree to dataframe of predictions
    prediction_df = prediction_df.join(one_prediction, 'id','left')
    prediction_df = prediction_df.withColumnRenamed('prediction',f'prediction_{i}')


      
    # Clear the sample and metrics variables for next tree
    on_time_samp = delayed_samp = tree_metrics = None
    print('Completed bag ', i)
    print(f"\nCompleted bag in {(time.time() - start)/60} minutes")
  
  #once all bagged predictions complete, calculate majority vote prediction
  n = lit(len(prediction_df.columns) - 2.0)
  rowMean  = (reduce(add, (col(x) for x in prediction_df.columns[2:])) / n)
  prediction_df = prediction_df.withColumn("prediction_mean", rowMean)
  rowVote = when(col("prediction_mean") >= 0.5, 1).otherwise(0)
  prediction_df = prediction_df.withColumn("prediction", rowVote)
  prediction_df.createOrReplaceTempView("prediction_df")
  LabelsandPredictions    = spark.sql("select label, prediction from prediction_df")
    
  #return evaluation metrics 
  modelDisplayName = model_type + "_" + str(nTrees) + "bags_" + str(nDepth) + "deep"
  metrics = evaluateTreePerformance(modelDisplayName, LabelsandPredictions)


  return metrics

####4.2.3.1 Decision Tree - Bootstrap Aggregation (Bagging)

In [81]:
# Save hyperparameters that are fed into the bagging model
nTrees = 12
nDepth = 12
bagFrac = 0.9

# Run the bagging algorithm, including generating predictions and evaluating model performance
print("\033[1m Bagged Tree Ensemble \033[0m")
print("\nEvaluation against dev set")
print("========================")
print('Number of trees: ', nTrees)
print('Tree depth: ', nDepth)
start = time.time()
bagging(nTrees, nDepth, bagFrac, data_train_tree, data_dev_tree, 'tree')
print(f"\nTrained Model in {(time.time() - start)/60} minutes")

####4.2.3.2 Random Forest - Bootstrap Aggregation (Bagging)

In [83]:
# Establish hyperparameters
nTrees = 12
nDepth = 12
bagFrac = 0.9

# Run the bagging algorithm, generate predictions and evaluate model performance
print("\033[1m Bagged Random Forest Ensemble \033[0m")
print("\nEvaluation against dev set")
print("========================")
print('Number of trees: ', nTrees)
print('Tree depth: ', nDepth)
start = time.time()
bagging(nTrees, nDepth, bagFrac, data_train_tree, data_dev_tree, 'forest')
print(f"\nTrained Model in {(time.time() - start)/60} minutes")

####4.2.3.3 Gradient-Boosted Tree - Bootstrap Aggregation (Bagging)

In [85]:
# Set hyperparameters
nTrees = 12
nDepth = 12
bagFrac = 0.9

# Run bagging algorithm, generate predictions and evaluate model performance
print("\033[1m Bagged Gradient Boosted Tree Ensemble \033[0m")
print("\nEvaluation against dev set")
print("========================")
print('Number of trees: ', nTrees)
print('Tree depth: ', nDepth)
start = time.time()
bagging(nTrees, nDepth, bagFrac, data_train_tree, data_dev_tree, 'gbt')
print(f"\nTrained Model in {(time.time() - start)/60} minutes")

###4.3 Feature Significance review

In each of our model types above, we extracted feature significance metrics and we summarize these in the table below. This was a helpful exercise to help us build intuition into which were the most relevant features - and in particular which features were consistently relevant across different algorithm types. We see that departure time, month of flight, and airline carrier code are consistently important features, whereas other features expected to be consistently important such as rainfall and visibility, are not consistently top ranking features across these algorithm types. 

As we examined feature importance for the different models, it was interesting to note how the ordered list of features changed from model to model. As an example, the departure time of day was the most influential feature for decison tree, random forest, and gradient boosted tree examples shown, but the fifth most important for logistic regression. Similarly, the rate of precipitation at the origin ranged from 18th (gradient boosted tree) to second (decision tree). These results were puzzling given how similarly the different models performed.


<img src="https://github.com/leebean337/w261_images/blob/master/feature_importance.png?raw=true" width="60%" >

###4.4 Final Algorithm: Logistic Regression Theory and Toy Example

Despite considerable effort dedicated to our tree-based modelling approaches, we ultimately selected logistic regression for our final algorithm given that it provided the best performance, both in term of f1-score and recall as well as runtime. In our case, there was little trade-off required in this decision. This section provides more background on the theory behind logistic regression, the gradient descent algorithm by which it is applied and a toy example outlining the steps involved in one iteration of gradient descent.

Logistic regression is similar to linear regression in as far as it begins by fitting a linear relationship between the features and the outcome variable by determining the optimal weights associated with each feature. The key difference is that the outcome variable is binary and the resulting prediction from a logistic regression model must therefore be a binary classification as well. In terms of interpretation, a unit change in a feature will multiply the odds of the outcome variables by a constant factor (the associated weight). This technique relies on the use of a sigmoid function (g(z)) which translates the raw output of the linear regression (z) into a probability value between 0 and 1. This results in the predicted probability (hΘ(x)) that a particular example belongs to a specific class (usually the positive class).

<img src="https://github.com/tonydisera/261-final-project-images/raw/master/LR%20sigmoid.jpg?raw=true"/>

Using the toy example below, we will demonstrate how our algorithm will apply logistic regression using standard gradient descent. Our goal is to find the optimal parameters (Θ), which will minimize our logistic loss function J(Θ) where m is the number of training examples and for each training example, the true value of y determines whether the log of the predicted probability (log(hΘ(x))) or log of one minus the predicted probability (log(1-(hΘ(x))) will factor into the total loss calculation.

<img src="https://github.com/tonydisera/261-final-project-images/raw/master/LR%20cost%20function.jpg?raw=true"/>

Gradient descent is an iterative optimisation technique to find the minimum of the loss function, by taking steps down the gradient of the function until a set of parameters (Θ) which are sufficiently close to the minimum of the loss are identified. To determine the gradient step, we take the derivative of the loss function \\( \nabla\_{\boldsymbol{\theta}} J(\boldsymbol{\theta}) \\)

<img src="https://github.com/tonydisera/261-final-project-images/raw/master/LR%20gradient%20calculation.jpg?raw=true"/>

This array is then multiplied by the learning rate and then subtracted from the prior set of parameters (Θ) and the process is repeated again. This is continued until Θ_old and Θ_new are sufficiently close to one another that it doesn’t make sense to continue updating. 

\\(  \theta\_{\text{new}} = \theta\_{\text{old}} - \eta \cdot \nabla\_{\boldsymbol{\theta}} J(\boldsymbol{\theta}) \\)

The below presents a toy example illustrating how an iteration of the algorithm would operate in practice. Five examples are provided in this toy training set with three features, two continuous variables representing the total amount of rainfall on the day of the flight and average visibility on the day of the flight (both normalized to values between 0 and 1) and a dummy variable representing time of flight (=1 for PM, =0 for AM). As in our dataset, a label = 1 means a delayed flight and a label = 0 means an ontime flight.

| \\(i\\) | \\(x\_{1i}\\) |    \\(x\_{2i}\\)  |    \\(x\_{3i}\\)  |   \\(y\_i\\) | 
|:---:|:--------:|:---------------:|
|  -  | PM |  Rain  | Visibility | label |  
| 1     |1     |     0.0     |     1     |     0     |       
| 2     |0     |     0.0     |     0.9     |     0     |       
| 3     |1     |     0.1     |     0.5     |     1     |      
| 4     |0     |     0.1     |     1     |     0     |     
| 5     |1     |     1.0     |     0.2     |     1     |       

The parameter vector \\(\theta\\) for our initial line \\(y=x \\) is  \\(\theta\\) = \\(\begin{bmatrix} 1 \ \quad 1 \ \quad 1 \ \quad 0 \ \end{bmatrix} \\) where the last value is the constant. 
         
The (augmented) data points \\(x_j\\) are:
\\( \begin{bmatrix} 1 \\\ 0 \\\ 1 \\\ 1 \\\ \end{bmatrix} \ \begin{bmatrix} 0 \\\ 0 \\\ .9 \\\ 1 \\\ \end{bmatrix} \ \begin{bmatrix} 1 \\\ .1 \\\ .5 \\\ 1 \\\ \end{bmatrix} \ \begin{bmatrix} 0 \\\ .1 \\\ 1 \\\ 1 \\\ \end{bmatrix}\ \begin{bmatrix} 1 \\\ 1 \\\ .2 \\\ 1 \\\ \end{bmatrix} \\)

The following table provides the values required to calculate the loss as well as the gradient update required in the first iteration of gradient descent. 

| \\(i\\) |  \\(x\_j'\\)  | \\(y\_j\\) |   \\(\boldsymbol{\theta}^T\cdot\mathbf{x}'\_j\\) | \\(h_ \theta (x) = g(\boldsymbol{\theta}^T\cdot\mathbf{x}'\_j\\)) | <img src="https://github.com/tonydisera/261-final-project-images/raw/master//LR%20individual%20loss.jpg?raw=true" width=60%/> | <img src="https://github.com/tonydisera/261-final-project-images/raw/master/LR%20individual%20gradient.jpg?raw=true" width=60%/> |
|:----:|:-----:|:----------------:|:------------------------:|:------------------------:|
|  -  |  input   | true \\(y\\)   |   linear weights    |  predicted probability       |loss  component for \\(x\_j\\)       |gradient  component for \\(x\_j\\)       
| 1     | \\( \begin{bmatrix} 1 \\\ 0 \\\ 1 \\\ 1 \\\ \end{bmatrix}\\)   |  0   |     2.0             |    0.1192    |    0.1269    |\\( \begin{bmatrix} 0.12 \\\ 0 \\\ 0.12 \\\ 0.12 \\\ \end{bmatrix}\\)   |      
| 2     | \\( \begin{bmatrix} 0 \\\ 0 \\\ .9 \\\ 1 \\\ \end{bmatrix}\\)   |  0   |     0.9             |    0.2891  |    0.3412    |\\( \begin{bmatrix} 0 \\\ 0 \\\ 0.26 \\\ 0.29 \\\ \end{bmatrix}\\)   |   
| 3     | \\( \begin{bmatrix} 1 \\\ .1 \\\ .5 \\\ 1 \\\ \end{bmatrix}\\)   |  1   |     1.6             |   0.1680  |    1.7839    |\\( \begin{bmatrix} -0.83 \\\ -0.08 \\\ -0.42 \\\ -0.83 \\\ \end{bmatrix}\\)   |   
| 4     | \\( \begin{bmatrix} 0 \\\ .1 \\\ 1 \\\ 1 \\\ \end{bmatrix}\\)   |  0   |     1.1             |    0.2497  |    0.2873    |\\( \begin{bmatrix} 0 \\\ 0.02 \\\ 0.25 \\\ 0.25 \\\ \end{bmatrix}\\)   |   
| 5     | \\( \begin{bmatrix} 1 \\\ 1 \\\ .2 \\\ 1 \\\ \end{bmatrix}\\)   |  1   |     2.2             |    0.0998  |    2.3051    |\\( \begin{bmatrix} -0.90 \\\ -0.90 \\\ -0.18 \\\ -0.90 \\\ \end{bmatrix}\\)   | 


The training loss \\(f(\boldsymbol{\theta})\\) for this data and these weights is: 0.1269 + 0.3412 + 1.7839 + 0.2873 +  2.3051 = 4.8444 / 5 = 0.96888

Our gradient update is \\( \begin{bmatrix} -1.61 \\\ -0.96 \\\ 0.03 \\\ -1.07 \\\ \end{bmatrix} \\) and assuming a learning rate of \\(\eta = \\)  0.1, we would adjust our initial weight matrix by \\( \begin{bmatrix} -0.16 \\\ -0.10 \\\ 0.003 \\\ -0.11 \\\ \end{bmatrix} \\)

Our new weight matrix would therefore be \\( \begin{bmatrix} 1.16 \\\ 1.10 \\\ 1.00 \\\ 0.11 \\\ \end{bmatrix} \\)

This process would continue until the the difference between the new weight matrix and the prior weight matrix becomes sufficiently small (i.e. where the derivative reaches nearly zero) that we can assume the minimum of the loss is reached and the optimal weight matrix has been approximated. The application of logistic regression in Spark MLlib differs from standard gradient descent which was applied above, since it would be extremely computational expensive to perform these calculations on our entire training set. Therefore, in practice stochastic gradient descent is applied. In this case only some of the examples, rather than the entire training set, are used to calculate the gradient that informs each step change.

#5. Algorithm Implementation

<img src="https://github.com/tonydisera/261-final-project-images/blob/master/ensemble.png?raw=true" width="30%" >

### 5.1 Logistic Regression - Hyper Parameter Tuning using Gridsearch and Crossfold Validation

Once both the weighted and unweighted models were established, we then explored different versions of hyperparameter tuning using gridsearch and crossfold validation. We set the validation metric to be the area under the precision-recall curve (AUC PR). This metric has the dual advantages of both being easily accessible by the API (and thus suitable to implement with crossfold validation) and similar in terms of scale and the directions of movement to the F1 metric that is our primary outcome metric. The crossfold validation exercise is computationally expensive and takes a long time, so we created a small sample (5%) of the training set and performed the crossfold validation on this smaller dataset.  
The code blocks are included below but commented out since this is a very time consuming operation. From previous work, the optimal parameters as identified by gridsearch with crossfold validation are:  
* **elasticNetParam (\\(\alpha)\\)** : 0.25
* **regParam (\\(\lambda)\\)** : 0.001
* **maxIter** : 20

<img src="https://github.com/tonydisera/261-final-project-images/blob/master/reg.png?raw=true" width="30%" >

In [90]:
### CODE COMMENTED TO AVOID RE-RUNNING
# #For this iteration, we had already determined that weighted models performed better so we did not search on that hyperparameter to save computation time.
# # Set the baseline model off of which the parameter grid can be builtbest_LRparams = cvModel.bestModel.extractParamMap()
# lr = LogisticRegression(labelCol="label", featuresCol="features",  weightCol="weight", standardization=False)

# # Create ParamGrid for Cross Validation
# paramGrid = (ParamGridBuilder()
#              .addGrid(lr.regParam, [0, .001, 0.01])
#              .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 1.0])
#              .addGrid(lr.maxIter, [20, 50])
#              .build())

In [91]:
### CODE COMMENTED TO AVOID RE-RUNNING

# # Set the evaluator function for the CrossValidator
# evaluator = BinaryClassificationEvaluator()
# evaluator = evaluator.setMetricName('areaUnderPR')

# # Create 5-fold CrossValidator
# cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# print("LR Model - evaluation against dev set")
# print("========================")
# start = time.time()

# # Run cross validations on the small training set
# cvModel = cv.fit(data_train_small)
# print(f"\nTrained gridsearch crossfold validation model in {(time.time() - start)/60} minutes")
# predictions = cvModel.transform(data_dev).cache()
# predictions.createOrReplaceTempView("predictions")
# evaluatePerformance(predictions, "Gridsearch_Crossfold_Validation_LR")
# print(f"\nEvaluated metrics in {(time.time() - start)/60} minutes")

In [92]:
### CODE COMMENTED TO AVOID RE-RUNNING

# # Extract and view the set of parameters returned for the best model in the crossfold validation set.
# best_LRparams = cvModel.bestModel.extractParamMap()
# best_LRparams

### 5.2 Logistic Regression with Optimal Parameters (Identified by Gridsearch and Crossfold Validation)

Now that we've found the optimal hyper paramters from the previous section, we can now use these parameter (regParam, elasticNetParam, maxIter) in subsequent models, starting with a single logistic regression model trained on the full dataset (with class weights). Results will show a slight improvement over the previous models.

In [95]:
# Create LogisticRegression model with class weighted on full training dataset (no temporal data)
# Use optimal hyper parameters as identified by gridsearch with crossfold

lrBestParams = LogisticRegression(labelCol="label", featuresCol="features", weightCol="weight", maxIter=20, standardization=False, regParam=.001, elasticNetParam=0.25 )

# Train model with Training Data

start = time.time()
lrBestParamsModel = lrBestParams.fit(data_train)
print("\033[1m Logistic Regression Model [weighted] with optimal parameters \033[0m")

print(f"\nTrained model in {(time.time() - start)/60} minutes")

print("\nEvaluation against dev set")
print("========================")
start = time.time()
predictions = lrBestParamsModel.transform(data_dev).cache()
predictions.createOrReplaceTempView("predictions")
evaluatePerformance(predictions, "LR_Weighted_OptimalParms")
print(f"\nEvaluated metrics in {(time.time() - start)/60} minutes")

### 5.3 Final Model Selected : Ensemble Model using Logistic Regression and Temporal Data

<img src="https://github.com/tonydisera/261-final-project-images/blob/master/multimodel.png?raw=true" width="60%" >

At this point we have gained insight into the fact that logistic regression with class weights and optimal parameters found using gridsearch and cross validation have outperformed other models, including unweighted logistic regression and, to our great surprise, tree based models. Thus having selected logistic regression as our algorithm of choice, we now shift our focus towards getting additional model performance gains.  
Given that there is evidence of delays varying based on seasonality and arrival time of day as described in the EDA section 2.2 above, we tried ensemble models that combine the decisions from multiple time-stratified models to improve the overall performance.  We decided to train individual models based on one or more aspects of temporal data and feed dev/test data to the appropriate model for delay or no delay prediction. The temporal properties of interest are: 
  
**TIME-OF-DAY** = ['LATE_NIGHT', 'MORNING', 'EVENING']  
**SEASONS** = ['WINTER', 'SPRING', 'SUMMER', 'FALL']  

In order to train the multiple models, we first need to add the appropriate flags to the data (train, dev and test) and divide the training set into appropriate subsets. These subsets will then be used to train the indivdual models that make up the ensemble. Similarly, the dev (and test) observations need to be sent to the corresponding model for prediction. And finally, results from each model will be collected to give overall model metrics.  
  
We tried several variants of ensemble models e.g.
- Season based : 4 models --> 'WINTER', 'SPRING', 'SUMMER', 'FALL'  
- Time based: 3 models --> 'LATE_NIGHT', 'MORNING', 'EVENING'  
- Season + Time based: 12 models --> 'WINTER-LATE_NIGHT', 'WINTER-MORNING', 'WINTER-EVENING', 'SPRING-LATE_NIGHT', 'SPRING-MORNING', 'SPRING-EVENING', 'SUMMER-LATE_NIGHT', 'SUMMER-MORNING', 'SUMMER-EVENING', 'FALL-LATE_NIGHT', 'FALL-MORNING', 'FALL-EVENING'  

In the interest of brevity, we have only provided code (output) for the final ensemble model [ Season + Time based: 12 models] as shown in the figure below.  
This model provided better results than ensemble models of just Seasons or just Time-of-day and of course, all the previous models shown above.  
  
From a scalability perspective, we believe that the parallel nature of Spark's machine learing API (ML Lib) works very well for such large datasets. The models train fairly quickly, with the ensemble only taking a few minutes. This is despite the fact that while the underlying MLLib data processing engine is distributed in nature, in our current implemenation, we train and evaluate the individual models themselves in series. Given more time, we would have liked to use the Spark framework to further parallelize the training and evaluation of the individual models as well.

#### 5.3.1 Add Season and Arrival Time of Day column to datasets

In [99]:
# Add relevant columns to train, dev and test dataset
# Train Data
data_train = data_train.withColumn('SEASON', f.when((f.col("MONTH") >= 3) & (f.col("MONTH") <= 5), "SPRING")\
                                          .when((f.col("MONTH") >= 6) & (f.col("MONTH") <= 8), "SUMMER")\
                                          .when((f.col("MONTH") >= 9) & (f.col("MONTH") <= 11), "FALL")\
                                          .otherwise("WINTER"))

data_train = data_train.withColumn('ARR_TIME_OF_DAY', f.when((f.col("ARR_TIME_BLK") == '0001-0559') | (f.col("ARR_TIME_BLK") == '2200-2259')| (f.col("ARR_TIME_BLK") == '2300-2359'), "LATE_NIGHT")\
                                          .when((f.col("ARR_TIME_BLK") == '0600-0659') | (f.col("ARR_TIME_BLK") == '0700-0759')| (f.col("ARR_TIME_BLK") == '0800-0859'), "MORNING")\
										  .when((f.col("ARR_TIME_BLK") == '0900-0959') | (f.col("ARR_TIME_BLK") == '1000-1059')| (f.col("ARR_TIME_BLK") == '1100-1159'), "MORNING")\
										  .when((f.col("ARR_TIME_BLK") == '1200-1259') | (f.col("ARR_TIME_BLK") == '1300-1359'), "MORNING")\
                                          .otherwise("EVENING"))
# Dev Data
data_dev = data_dev.withColumn('SEASON', f.when((f.col("MONTH") >= 3) & (f.col("MONTH") <= 5), "SPRING")\
                                          .when((f.col("MONTH") >= 6) & (f.col("MONTH") <= 8), "SUMMER")\
                                          .when((f.col("MONTH") >= 9) & (f.col("MONTH") <= 11), "FALL")\
                                          .otherwise("WINTER"))

data_dev = data_dev.withColumn('ARR_TIME_OF_DAY', f.when((f.col("ARR_TIME_BLK") == '0001-0559') | (f.col("ARR_TIME_BLK") == '2200-2259')| (f.col("ARR_TIME_BLK") == '2300-2359'), "LATE_NIGHT")\
                                          .when((f.col("ARR_TIME_BLK") == '0600-0659') | (f.col("ARR_TIME_BLK") == '0700-0759')| (f.col("ARR_TIME_BLK") == '0800-0859'), "MORNING")\
										  .when((f.col("ARR_TIME_BLK") == '0900-0959') | (f.col("ARR_TIME_BLK") == '1000-1059')| (f.col("ARR_TIME_BLK") == '1100-1159'), "MORNING")\
										  .when((f.col("ARR_TIME_BLK") == '1200-1259') | (f.col("ARR_TIME_BLK") == '1300-1359'), "MORNING")\
                                          .otherwise("EVENING"))
# Test Data
data_test = data_test.withColumn('SEASON', f.when((f.col("MONTH") >= 3) & (f.col("MONTH") <= 5), "SPRING")\
                                          .when((f.col("MONTH") >= 6) & (f.col("MONTH") <= 8), "SUMMER")\
                                          .when((f.col("MONTH") >= 9) & (f.col("MONTH") <= 11), "FALL")\
                                          .otherwise("WINTER"))

data_test = data_test.withColumn('ARR_TIME_OF_DAY', f.when((f.col("ARR_TIME_BLK") == '0001-0559') | (f.col("ARR_TIME_BLK") == '2200-2259')| (f.col("ARR_TIME_BLK") == '2300-2359'), "LATE_NIGHT")\
                                          .when((f.col("ARR_TIME_BLK") == '0600-0659') | (f.col("ARR_TIME_BLK") == '0700-0759')| (f.col("ARR_TIME_BLK") == '0800-0859'), "MORNING")\
										  .when((f.col("ARR_TIME_BLK") == '0900-0959') | (f.col("ARR_TIME_BLK") == '1000-1059')| (f.col("ARR_TIME_BLK") == '1100-1159'), "MORNING")\
										  .when((f.col("ARR_TIME_BLK") == '1200-1259') | (f.col("ARR_TIME_BLK") == '1300-1359'), "MORNING")\
                                          .otherwise("EVENING"))

data_train.createOrReplaceTempView("data_train")
data_dev.createOrReplaceTempView("data_dev")
data_test.createOrReplaceTempView("data_test")


#### 5.3.2 Split Train, Dev and Test Data by Season-Time

In [101]:
# Initialize empty dictionary to store dataset names & reference dataframes [train, dev, test]
trainSetDictionaryCombined = {}
devSetDictionaryCombined = {}
testSetDictionaryCombined = {}

# Training data is split into multiple datasets based on Season and Arrival Time of Day 
# Since there 12 dataset names based on combinations, the dataset names are stored in a dictionary referecing the appropriate dataframe

TIME = ["LATE_NIGHT", "MORNING", "EVENING"]
SEASONS = ['WINTER', 'SPRING', 'SUMMER', 'FALL']

print("\033[1m ROW COUNTS PER DATA SUBSET \033[0m")
for s in SEASONS:
  for t in TIME:
    trainQuery = "select * from data_train where SEASON = '{}' and ARR_TIME_OF_DAY = '{}'".format(s,t)
    trainSetName = 'data_train' + s + "_" + t
    trainSetNameStr = str(trainSetName)
    trainSetName = spark.sql(trainQuery)
    trainSetName.createOrReplaceTempView(trainSetNameStr)
    print("\033[1m {}-{} \033[0m".format(s,t))
    print('{}:  {:,}'.format(trainSetNameStr, trainSetName.count()))
    trainSetDictionaryCombined[trainSetNameStr] = trainSetName
    # Get delay / no delay weights specifically for subset of Season-Time training data. If commented out then weights from full training set will be applied to data subsets
#     weight_nodelay,weight_delay = getWeight(trainSetDictionaryCombined[trainSetNameStr])
#     trainSetDictionaryCombined[trainSetNameStr] = setWeight(trainSetDictionaryCombined[trainSetNameStr])
    devQuery = "select * from data_dev where SEASON = '{}' and ARR_TIME_OF_DAY = '{}'".format(s,t)
    devSetName = 'data_dev' + s + "_" + t
    devSetNameStr = str(devSetName)
    devSetName = spark.sql(devQuery)
    devSetName.createOrReplaceTempView(devSetNameStr)
    print('{}:  {:,}'.format(devSetNameStr, devSetName.count()))
    devSetDictionaryCombined[devSetNameStr] = devSetName
#     devSetDictionaryCombined[devSetNameStr] = setWeight(devSetDictionaryCombined[devSetNameStr])

    testQuery = "select * from data_test where SEASON = '{}' and ARR_TIME_OF_DAY = '{}'".format(s,t)
    testSetName = 'data_test' + s + "_" + t
    testSetNameStr = str(testSetName)
    testSetName = spark.sql(testQuery)
    testSetName.createOrReplaceTempView(testSetNameStr)
    print('{}:  {:,}'.format(testSetNameStr, testSetName.count()))
    testSetDictionaryCombined[testSetNameStr] = testSetName
#     testSetDictionaryCombined[testSetNameStr] = setWeight(testSetDictionaryCombined[testSetNameStr])


#### 5.3.3 Final Ensemble Model

#### 5.3.3.1 Evaluation on  Dev Data

In [104]:
# Create weighted LogisticRegression models by Season-Time with class weights 
# Use optimal hyper parameters as identified by gridsearch with crossfold

newDF_Flag = 0
lrWeighted = LogisticRegression(labelCol="label", featuresCol="features", weightCol="weight", maxIter=20, standardization=False, regParam=0.001, elasticNetParam=0.25 )
print("\033[1m Ensemble Model based on Season-Time \033[0m")
print("\033[1m --- Logistic Regression Model [weighted] with optimal parameters \033[0m")
for s in SEASONS:
  for t in TIME:
    start = time.time()
  # Train model with Training Data
    lrModel = lrWeighted.fit(trainSetDictionaryCombined['data_train' + s + "_" + t])
    print(s + "_" + t)
    print(f"\nTrained model in {(time.time() - start)/60} minutes")
    print("========================")
    start = time.time()
    predictions = lrModel.transform(devSetDictionaryCombined['data_dev' + s + "_" + t]).cache()
    # Must create predictionsAll DF to append results from each individual model in the ensemble. Empty DF must be created with correct schema matching predictions DF from model
    if newDF_Flag == 0:
      schema = predictions.schema
      predictionsAll = spark.createDataFrame([], schema)
      newDF_Flag = 1
    predictionsAll = predictionsAll.union(predictions)
    predictions.createOrReplaceTempView("predictions")
    modelName = 'LR_Weighted_CV_Parms' + s.capitalize() + t.capitalize()
#     evaluatePerformance(predictions, modelName+'_0', 'LR_Weighted_CV_Parms_SeasonTime_regParm_0')
#     print(f"\nEvaluated metrics in {(time.time() - start)/60} minutes")

print("Evaluation against dev set")
print("========================")
evaluatePerformance(predictionsAll, "LR_Weighted_OptimalParms_MultiModel")


In [105]:
# pd.DataFrame(predictionDF.loc['LR_Weighted_OptimalParms_MultiModel']).transpose()
# predictionDF.head()

#### 5.3.3.2 Evaluation on Unseen Test Data

In [107]:
# Evaluation on Test Data
# Create weighted LogisticRegression models by Season-Time with class weights 
# Use optimal hyper parameters as identified by gridsearch with crossfold

newDF_Flag = 0

lrWeighted = LogisticRegression(labelCol="label", featuresCol="features", weightCol="weight", maxIter=20, standardization=False, regParam=0.001, elasticNetParam=0.25 )
print("\033[1m Ensemble Model based on Season-Time \033[0m")
print("\033[1m --- Logistic Regression Model [weighted] with optimal parameters \033[0m")
for s in SEASONS:
  for t in TIME:
    start = time.time()
  # Train model with Training Data
    lrModel = lrWeighted.fit(trainSetDictionaryCombined['data_train' + s + "_" + t])
    print(s + "_" + t)
    print(f"\nTrained model in {(time.time() - start)/60} minutes")
    print("========================")
    start = time.time()
    predictions = lrModel.transform(testSetDictionaryCombined['data_test' + s + "_" + t]).cache()
    # Must create predictionsAll DF to append results from each individual model in the ensemble. Empty DF must be created with correct schema matching predictions DF from model
    if newDF_Flag == 0:
      schema = predictions.schema
      predictionsFinal = spark.createDataFrame([], schema)
      newDF_Flag = 1
    predictionsFinal = predictionsFinal.union(predictions)
    predictions.createOrReplaceTempView("predictions")
    modelName = 'Test Data_' + s.capitalize() + t.capitalize()
#     evaluatePerformance(predictions, modelName+'_0', 'LR_Weighted_CV_Parms_SeasonTime_regParm_0')
#     print(f"\nEvaluated metrics in {(time.time() - start)/60} minutes")

print("Evaluation against test set")
print("========================")
evaluatePerformance(predictionsFinal, "TestData_Eval_LR_Weighted_OptimalParms_MultiModel")

#6. Conclusions

In conclusion, our models are able to correctly predict (true positive) a negative arrival experience (including delayed arrivals >15 minutes, diverted or cancelled flights) with a recall of .67 and an f1-score of .42. This result holds for all flights across the United States, from the busiest airports to the smallest. Our model struggles the most with over-predicting delays, which is apparent in the relatively poor precision scores. We continue to believe that customers are most interested in correctly predicting when a flight might be delayed, and would react to our error in precision as "getting lucky", much the way one might feel when rain is predicted but never materializes. While there are clearly examples in the literature that perform better, closer inspection frequently reveals that those problem sets have been more finely specified by only focusing on select cities or select airlines. Moreover, performance comparison between models and confusion matrix for the final ensemble model are shown in the sections below.

In terms of machine learning items, some of our team's most significant learnings include:
- The tuning decision we made that had the largest impact across all models was creating a balanced training set.
- The process of writing the bagging algorithm from scratch really helped us understand how it worked.
- We seemed to get "stuck" in a relatively narrow performance range with all of our tree-based models. Spark's MLLib API is relatively limited, and by the time we got our bagging algorithm working we had limited time to troubleshoot this. Given more time, we could have pulled a sample of the data out of Spark and experimented with the more versatile tools in SKLearn and then applied that knowledge to the larger dataset. Additionally, we could have explored our feature engineering approach to see if this potential "underfitting" could be resolved with additional features (see point below on weather).
- The weather data was extremely complex and quite messy. We had to make a number of decisions along the way to get the model working. Our intent had been to get a base level of weather data incorporated, then circle back and add more. We only had time to add in snow data during that second pass.
- We would have liked to further refine our outcome model. While signaling to a customer than an adverse effect is likely is a good thing, there remains a large gap in outcomes between a 16 minute delay and a four hour delay (or diverted flight!). Given more time, we would have liked to try to refine that outcome prediction with an additional model.

In terms of scalability, some of our most significant learnings include:
 - The use of the MLLib API abstracts away much of the thought (and frustration!) we experienced during the course working with RDDs. Indeed, in the rare occasions during this project where we considered utilising RDDs, out of memory errors quickly refocused us on finding solutions within the Dataframe API.
 - There remained a couple of instances, namely in creating and training our bagged models and in training the logistic regression ensembles, where we did not take advantage of the distributed environment. Given more time, we might have realized performance gains by parallelizing those computations, rather than iterating through them.

In [110]:
# load savaed model metrics from CSV

metrics_df = pd.read_csv("/dbfs/user/ammara.essa@ischool.berkeley.edu/W261_Final_Project/predictions_df.csv")
metrics_df.set_index('Unnamed: 0', inplace=True)
metrics_df.rename(index={'Unnamed: 0' : 'Model Name'}, inplace=True)

### 6.1 Confusion Matrix for Test Data  
Below is the confusion matrix for predictions on the unseen test data. As we can see, there are still quite a few false negatives in the model, indicating that fligts that are delayed are still being classified as not delayed.

In [112]:
print('\033[1m Confusion matrix for model : TestData_Eval_LR_Weighted_OptimalParms_MultiModel \033[0m')
showConfusionMatrix(metrics_df.loc['TestData_Eval_LR_Weighted_OptimalParms_MultiModel'])

Unnamed: 0,Delay_(Predicted),No_Delay_(Predicted)
Delay_(Actual),444123,222257
No_Delay_(Actual),986377,1659791


### 6.2 Comparison of Models  
We've also provided a comparison of all the aforementioned models (in both tabular and graphical form), concluding with our final model ***LR_Weighted_OptimalParms_MultiModel*** for which metrics against dev and test data are provided.

In [114]:
# Dataframe model performance on f1, precision and recall
s = metrics_df[['precision','recall','f1score']].style.background_gradient(cmap="RdYlGn")
display(s)


Unnamed: 0_level_0,precision,recall,f1score
Unnamed: 0,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
DecisionTree_Baseline,0.600631,0.0896759,0.156053
RandomForest_Baseline,0.7042,0.0744293,0.134629
GBT_Baseline,0.637856,0.0662527,0.120037
Decision_Tree_Balanced,0.317958,0.606644,0.417233
Random_Forest_Balanced,0.356423,0.576526,0.440511
GBT_Balanced,0.319931,0.626333,0.423526
tree_12bags_12deep,0.322393,0.598598,0.419078
forest_12bags_12deep,0.330183,0.627119,0.4326
gbt_12bags_12deep,0.321464,0.622663,0.424019
LR_Unweighted_DefaultParms,0.559851,0.057988,0.105091


In [115]:
# Plot comparing model performance on f1, precision and recall

fig = plt.figure(figsize=(16,14))
#plt.rcParams["figure.figsize"] = (16,14)
#ax = fig.add_subplot(1, 1, 1)

# labels = metrics_df['Unnamed: 0']
labels = metrics_df.index
plt.plot(labels, metrics_df.precision, color='slateblue', 
                 linewidth=2, label="precision", marker='o' )
plt.plot(labels, metrics_df.recall, color='crimson', 
                 linewidth=2, label="recall", marker='o' )
plt.plot(labels, metrics_df.f1score, color='gray', 
                 linewidth=2, label="f1", marker='o' )

plt.yticks(np.arange(0, .80, .01))
plt.ylabel('Performance', fontsize=14)
plt.xticks(labels)
plt.xticks(rotation='vertical')
plt.xlabel('Models', fontsize=14, labelpad=20)
plt.legend()
plt.title("Model Performance")
plt.grid(b=True, which='major', color='lightgray', linestyle='-')
display(plt.show())


#7. Application of Course Concepts

There are several course concepts in which our final project helped further expand and embed our understanding. This included concepts around caching, scalability, model complexity, vector embedding, and model assumptions.

We learned the importance of caching when attempting to build summary statistics from a dataframe. After producing a 'predictions' dataframe with each row representing a prediction for our dev/test set, we would attempt to use sql queries to count the number of true/false positives and true/false negatives. It was a revelation to us that without caching, the sql queries would produce slightly different numbers from each run, despite our expectation that this would be a static exercise. Caching our predictions dataframe resolvd this issue, and several other similar issues that arose during our efforts to compile results (during bagging, etc). <br>

Working with a dataset with 32 million observations and joining to the weather dataset of 600 million observations was a valuable exercise. I/O scaled very well in the Spark Databricks cluster. The speed at which Spark is able to read and write parquet files was astounding.  In addition, our only out-of-memory errors occurred when we switched back to Pandas for some EDA.  We were concerned with possible scaling problems joining to the weather data, but the Spark dataframe API handled left joins from airlines to weather in under 30 minutes clock time with no out-of-memory errors.  The basic MLLib classifiers all scaled well with this dataset, with Logistic Regression typically training in under 2 minutes and Decision trees in under 5 minutes.  Once we elaborated on the models, introducing bagging and boosting, we were at the mercy of cluster load, with some training times exceeding 2 hours.  But on the whole, all of the models scaled very well.

Writing the bagging algorithm from scratch really helped us think through and understand the different ways that tree models can be adapted to balance the bias variance tradeoff. While we did not see large gains in our metrics of interest from doing so, implementing this algortithm helped us understand the ways that random forests create high variance trees that are then balanced out through averaging, and that this reduction in variance can be extended by bagging deep trees or deep forests. We thought a lot about model complexity and regularization in the context of the logistic regression models, as well. The gridsearch crossfold validation routines that we ran would sometimes return a "regParam" equal to 0, effectively telling us that a model with no regularization was the best choice. On some iterations, though, a small entry would be returned, as well as a "elasticNetParam" setting indicating that a regularization blending L1 and L2 was the best implementation.

We utilized MLLib's pipeline API for one-hot encoding, standardization, and vector embedding.  In particular, the vector embedding made troubleshooting more challenging as the feature names were obscured.  However; we were able to obtain coefficients for the features in the Logistic Regression model to evaluate the odds ratios for the features and we were able to obtain the feature importance scores from the tree-based models.  Because the airlines dataset has a large number of observations, feature selection did not play a dominant role; however, there are still open questions regarding the appropriateness of one-hot encoding destination and arrival airports given such high cardinality. As such, we expected the tree-based models to perform better since categorical data converted to ordinals is well-tolerated by these models. Indeed, it was a bit of a disappointment to not see much improvement with recall scores, even with bagging and boosting.  

Assumptions underlying different algorithms played an important role in our determination of our model choice and subsequently feature engineering efforts. For example, multicollinearity reduces the precision of the estimated weights in our logistic regression model which means we need to place special efforts to remove some features which are highly correlated to other features included in our model. However, a decision tree's ability to utilise highly correlated features effectively and even model interactions between features by the very nature of the structure of trees meant that far less care was required in determining which features to include and exclude.  Due to time limitations, we were not able to fully explore the limits of tree-based models ability to optimize predictions given our set of features - the fact that our ensembled trees utilitisng bagging, random forest and gradient-boosted trees did not result in notably improved performance over our standard decision tree works against our intuition and brings suspect to our feature engineering efforts.

## References

Etani, Noriko (2019). "Development of a predictive model for on‑time arrival flight of airliner by discovering correlation between flight and weather data." Journal of Big Data, 6:85.

James, Witten, Hastie, and Tibshirani (2017). "An Introduction to Statistical Learning," Springer, chapters 4, 5, 6, and 8.

Kafle, Nabin, and Bo Zou (2016). “Modeling Flight Delay Propagation: A New Analytical-Econometric Approach.” Transportation Research Part B: Methodological, Pergamon, 1 Sept. 2016, www.sciencedirect.com/science/article/pii/S0191261515302010.

Parr, Terence and Howard, Jeremy (2020). "How to explain gradient boosting." Accessed in 2020: https://explained.ai/gradient-boosting/index.html.

Patgiri, Ripon, Sajid Hussain and Aditya Nongmeikapam (2020). "Empirical Study on Airline Delay Analysis and Prediction."" arXiv. Accessed at: https://arxiv.org/abs/2002.10254

Thiagarajan, B., L. Srinivasan, A. V. Sharma, D. Sreekanthan and V. Vijayaraghavan (2017)."A machine learning approach for prediction of on-time performance of flights," IEEE/AIAA 36th Digital Avionics Systems Conference (DASC), St. Petersburg, FL, 2017, pp. 1-6.  

The following document contains additional references to our research/literature review  
https://docs.google.com/document/d/1AzO84jtXgSrKxz9oSo1jpRQUxAApW1fJcQDHvYE1Jgw/edit?usp=sharing