## Exploratory Data Analysis for 2009–2015 Flight Delays and Cancellations

####  Data Profiling 

##### Import required libraries

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql.session import SparkSession
# Obtain the session through the builder: getOrCreate() reuses a session that is
# already running, so this cell can be re-executed safely. Constructing
# SparkContext(...) directly fails on a second run because only one context may
# be active per process.
spark = SparkSession.builder.appName("myAppName2").getOrCreate()
# Keep the `sc` name available for code that expects the raw SparkContext.
sc = spark.sparkContext

In [2]:
from pyspark.sql.types import *
from pyspark.sql import functions as F 
from pyspark.sql.functions import isnan, when, count, col
import pandas as pd 

##### Create Schema

##### Flight Profile

##### Chunking Data

Our null analysis shows that several features contain a large number of null values. We will delete the affected rows and omit some unnecessary columns as well.

Verifying that there are no more null columns

##### Flight delays

In [3]:
# Explicit schema for the delays CSV: every column is nullable and only
# dep_delay is typed as an integer; the remaining columns are read as strings.
delay_schema = (
    StructType()
    .add('date', StringType(), True)
    .add('flight_identifier', StringType(), True)
    .add('plan_time', StringType(), True)
    .add('actual_time', StringType(), True)
    .add('dep_delay', IntegerType(), True)
)

In [4]:
# Load the CSV with the explicit schema, drop the two time columns that are not
# used afterwards, keep a ~10% sample and discard rows containing nulls.
# The sample is seeded so that the row count and every metric derived from this
# frame are the same on each Restart & Run All (an unseeded sample changes per run).
delay_df = (
    spark.read.format('csv')
    .load('flight-delays.csv', header='true', schema=delay_schema)
    .drop('actual_time', 'plan_time')
    .sample(withReplacement=False, fraction=0.1, seed=2018)
    .dropna()
)
delay_df.printSchema()

root
 |-- date: string (nullable = true)
 |-- flight_identifier: string (nullable = true)
 |-- dep_delay: integer (nullable = true)



In [5]:
# Preview the first ten rows of the sampled, null-free frame.
delay_df.show(n=10)

+----+-----------------+---------+
|date|flight_identifier|dep_delay|
+----+-----------------+---------+
|2009|               XE|       -5|
|2009|               XE|       -4|
|2009|               XE|       -7|
|2009|               XE|       -5|
|2009|               XE|       25|
|2009|               XE|       -6|
|2009|               XE|       -1|
|2009|               XE|       -2|
|2009|               XE|       -4|
|2009|               XE|       -1|
+----+-----------------+---------+
only showing top 10 rows



In [6]:
# Number of rows left in delay_df after sampling and dropping nulls.
delay_df.count()

5461231

In [7]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
# String-valued columns of delay_df that are treated as categorical features.
categoricalColumns = [
    "date",
    "flight_identifier",
]

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
# Map each categorical column to a numeric "<column>_index" column.
# The StringIndexer stages are passed to the Pipeline un-fitted so that they are
# fitted exactly once by pipeline.fit(); fitting each indexer up front and then
# fitting the pipeline again only added redundant passes over the data.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in categoricalColumns
]
pipeline = Pipeline(stages=indexers)
delay2 = pipeline.fit(delay_df).transform(delay_df)
delay2.show(10)

+----+-----------------+---------+----------+-----------------------+
|date|flight_identifier|dep_delay|date_index|flight_identifier_index|
+----+-----------------+---------+----------+-----------------------+
|2009|               XE|       -5|       1.0|                   10.0|
|2009|               XE|       -4|       1.0|                   10.0|
|2009|               XE|       -7|       1.0|                   10.0|
|2009|               XE|       -5|       1.0|                   10.0|
|2009|               XE|       25|       1.0|                   10.0|
|2009|               XE|       -6|       1.0|                   10.0|
|2009|               XE|       -1|       1.0|                   10.0|
|2009|               XE|       -2|       1.0|                   10.0|
|2009|               XE|       -4|       1.0|                   10.0|
|2009|               XE|       -1|       1.0|                   10.0|
+----+-----------------+---------+----------+-----------------------+
only showing top 10 

In [9]:
#from pyspark.ml.feature import OneHotEncoder, StringIndexer
#indexers_ON = [OneHotEncoder(inputCol=column, outputCol=column+"_Vec") for column in filter(lambda x: x.endswith('_index'), delay2.columns) ]
#pipeline = Pipeline(stages=indexers_ON)
#delay2 = pipeline.fit(delay2).transform(delay2)
#delay2.show(10)

In [10]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# Pack the two index columns into a single "features" vector and keep only the
# columns the regressors need: the features and the dep_delay target.
assembler = VectorAssembler(
    inputCols=['date_index', 'flight_identifier_index'],
    outputCol="features",
)
delay3 = assembler.transform(delay2).select('features', 'dep_delay')
delay3.show(10)


+----------+---------+
|  features|dep_delay|
+----------+---------+
|[1.0,10.0]|       -5|
|[1.0,10.0]|       -4|
|[1.0,10.0]|       -7|
|[1.0,10.0]|       -5|
|[1.0,10.0]|       25|
|[1.0,10.0]|       -6|
|[1.0,10.0]|       -1|
|[1.0,10.0]|       -2|
|[1.0,10.0]|       -4|
|[1.0,10.0]|       -1|
+----------+---------+
only showing top 10 rows



In [11]:
# 70/30 train/test split. The seed is fixed (same value as the classification
# split further down) so that the reported RMSE values are reproducible.
splits = delay3.randomSplit([0.7, 0.3], seed=2018)
train_df, test_df = splits

In [21]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression

# Linear regression of dep_delay on the assembled features, regularised with
# regParam=0.3 and elasticNetParam=0.8, limited to 10 iterations.
lr = LinearRegression(
    featuresCol='features',
    labelCol='dep_delay',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

# Score the held-out split and preview a few predictions next to the true delay.
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction", "dep_delay", "features").show(5)

# Report the root mean squared error on the test split.
lr_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="dep_delay",
    predictionCol="prediction",
)
print("Root Mean Squared Error (RMSE) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

Coefficients: [0.0,-0.08711292664978186]
Intercept: 9.594737106457627


Exception ignored in: <function JavaWrapper.__del__ at 0x7f913005b200>
Traceback (most recent call last):
  File "/opt/spark/python/pyspark/ml/wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'RandomForestClassifier' object has no attribute '_java_obj'


+-----------------+---------+---------+
|       prediction|dep_delay| features|
+-----------------+---------+---------+
|9.594737106457627|      -15|[1.0,0.0]|
|9.594737106457627|      -13|[1.0,0.0]|
|9.594737106457627|      -11|[1.0,0.0]|
|9.594737106457627|      -10|[1.0,0.0]|
|9.594737106457627|      -10|[1.0,0.0]|
+-----------------+---------+---------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 37.4283


In [18]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Fit a decision-tree regressor on the training split, score the test split and
# report the root mean squared error.
dt = DecisionTreeRegressor(labelCol='dep_delay', featuresCol='features')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    metricName="rmse",
    predictionCol="prediction",
    labelCol="dep_delay",
)
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 37.3293


In [None]:
# Metrics recorded in the linear-regression model's summary object.
trainingSummary = lr_model.summary
summary_rmse = trainingSummary.rootMeanSquaredError
summary_r2 = trainingSummary.r2
print("RMSE: %f" % summary_rmse)
print("r2: %f" % summary_r2)

In [None]:
# The classifiers below read a `features` vector and a `label` column. delay_df
# has neither, so splitting it directly makes every classifier fit fail. Build
# both columns from the indexed frame (delay2), which still carries `date` and
# `flight_identifier` for the prediction previews.
# NOTE(review): a flight is labelled "delayed" (1.0) when dep_delay >= 15 minutes —
# this threshold is an assumption; confirm it matches the intended definition.
classification_df = (
    VectorAssembler(
        inputCols=['date_index', 'flight_identifier_index'],
        outputCol='features',
    )
    .transform(delay2)
    .withColumn('label', (F.col('dep_delay') >= 15).cast('double'))
)
train, test = classification_df.randomSplit([0.7, 0.3], seed=2018)

In [None]:
from pyspark.ml.classification import LogisticRegression
# Fit a logistic-regression classifier on the training split.
lr = LogisticRegression(labelCol='label', featuresCol='features')
lrModel = lr.fit(train)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
# Plot the fitted logistic-regression coefficients in ascending order.
beta = np.sort(lrModel.coefficients)
ax = plt.gca()
ax.plot(beta)
ax.set_ylabel('Beta Coefficients')
plt.show()

In [None]:
# ROC curve taken from the logistic-regression model's summary.
trainingSummary = lrModel.summary
roc = trainingSummary.roc.toPandas()
# FPR is plotted on the x-axis and TPR on the y-axis, so the axis labels must
# follow that order (they were previously swapped).
plt.plot(roc['FPR'], roc['TPR'])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training set area under ROC: ' + str(trainingSummary.areaUnderROC))

In [None]:
# Precision-recall curve taken from the model summary: recall on x, precision on y.
pr = trainingSummary.pr.toPandas()
ax = plt.gca()
ax.plot(pr['recall'], pr['precision'])
ax.set_ylabel('Precision')
ax.set_xlabel('Recall')
plt.show()

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Score the test split with the logistic-regression model and preview the result.
predictions = lrModel.transform(test)

# delay_schema defines no `flight_number` column, so selecting it would raise an
# analysis error; only columns that actually exist are shown.
predictions.select(
    'date',
    'flight_identifier',
    'prediction',
    'probability',
).show(10)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate the current test predictions with the evaluator's default settings.
evaluator = BinaryClassificationEvaluator()
test_auc = evaluator.evaluate(predictions)
print('Test Area Under ROC', test_auc)

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
# Depth-3 decision-tree classifier: fit on the training split, score the test split.
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label', maxDepth=3)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)
# delay_schema defines no `flight_number` column, so selecting it would raise an
# analysis error; only columns that actually exist are shown.
predictions.select(
    'date',
    'flight_identifier',
    'prediction',
    'probability',
).show(10)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate the current test predictions with the evaluator's default settings.
evaluator = BinaryClassificationEvaluator()
test_auc = evaluator.evaluate(predictions)
print('Test Area Under ROC', test_auc)

In [None]:
from pyspark.ml.classification import RandomForestClassifier
# Random-forest classifier: fit on the training split, score the test split.
rf = RandomForestClassifier(featuresCol='features', labelCol='label')
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
# delay_schema defines no `flight_number` column, so selecting it would raise an
# analysis error; only columns that actually exist are shown.
predictions.select(
    'date',
    'flight_identifier',
    'prediction',
    'probability',
).show(10)

In [None]:
# Area under the ROC curve for the current test predictions; the metric is
# requested explicitly through a param-map override.
evaluator = BinaryClassificationEvaluator()
roc_params = {evaluator.metricName: "areaUnderROC"}
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, roc_params)))

In [None]:
from pyspark.ml.classification import GBTClassifier
# Gradient-boosted trees (10 boosting iterations): fit on the training split,
# score the test split.
gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)
# delay_schema defines no `flight_number` column, so selecting it would raise an
# analysis error; only columns that actually exist are shown.
predictions.select(
    'date',
    'flight_identifier',
    'prediction',
    'probability',
).show(10)

In [None]:
# Area under the ROC curve for the current test predictions; the metric is
# requested explicitly through a param-map override.
evaluator = BinaryClassificationEvaluator()
roc_params = {evaluator.metricName: "areaUnderROC"}
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, roc_params)))

##### Flight Delays

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Hyper-parameter search for the GBT classifier: 3 depths x 2 bin counts gives
# 6 candidate settings, each fitted on 3 folds, so this cell is slow to run
# (the original note estimated about 6 minutes).
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4, 6])
             .addGrid(gbt.maxBins, [20, 60])
             .build())
# A fixed seed keeps the fold assignment, and therefore the selected model,
# reproducible across runs.
cv = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=2018,
)
cvModel = cv.fit(train)
predictions = cvModel.transform(test)
evaluator.evaluate(predictions)

In [None]:
# Display the best model selected by the cross-validation run.
cvModel.bestModel

In [None]:
# Parameter map whose average cross-validation metric is the highest.
best_index = np.argmax(cvModel.avgMetrics)
cvModel.getEstimatorParamMaps()[best_index]
