In [1]:
import os
#from datetime import timedelta, date
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import (StructType,
                               StructField,
                               DoubleType,
                               IntegerType,
                               StringType,
                               BooleanType)

In [2]:
# add a boolean column that indicates whether flight delayed or not (threshold 15 mins)
was_delayed_udf = udf(lambda x: float(x >= 15), DoubleType())

# convert hours, e.g. 1430 --> 14
get_hour_udf = udf(lambda x: float(x // 100), DoubleType())

# add column that indicates how close a flight is to a holiday
#nearest_holiday_udf = udf(nearest_holiday, DoubleType())

In [3]:

if __name__ == "__main__":

    spark = SparkSession.builder \
        .master('local') \
        .appName('Flight Delay') \
        .getOrCreate()

In [4]:
 flight_data = spark.read \
        .format('com.databricks.spark.csv') \
        .csv('/FileStore/tables/flightdelay.csv',
             inferSchema='true', nanValue="", header='true', mode='PERMISSIVE')

In [5]:
# there is a PR to accept multiple `nanValue`s, until then, however, the schema
# must be manually cast (due to the way the DOT stores the data)
flight_data = flight_data \
        .withColumn('Year', flight_data['Year'].cast('int')) \
        .withColumn('Month', flight_data['Month'].cast('Double')) \
        .withColumn('Day', flight_data['Day'].cast('Double')) \
        .withColumn('CRSDepTime', flight_data['CRSDepTime'].cast('Double')) \
        .withColumn('Dow', flight_data['Dow'].cast('Double')) \
        .withColumn('DepTime', flight_data['DepTime'].cast('Double')) \
        .withColumn('DepDelay', flight_data['DepDelay'].cast('Double')) \
        .withColumn('TaxiOut', flight_data['TaxiOut'].cast('int')) \
        .withColumn('TaxiIn', flight_data['TaxiIn'].cast('int')) \
        .withColumn('CRSArrTime', flight_data['CRSArrTime'].cast('int')) \
        .withColumn('ArrTime', flight_data['ArrTime'].cast('int')) \
        .withColumn('ArrDelay', flight_data['ArrDelay'].cast('int')) \
        .withColumn('Cancelled', flight_data['Cancelled'].cast('int')) \
        .withColumn('Diverted', flight_data['Diverted'].cast('int')) \
        .withColumn('CRSElapsedTime', flight_data['CRSElapsedTime'].cast('int')) \
        .withColumn('ActualElapsedTime', flight_data['ActualElapsedTime'].cast('int')) \
        .withColumn('AirTime', flight_data['AirTime'].cast('int')) \
        .withColumn('Distance', flight_data['Distance'].cast('Double')) \
        .withColumn('CarrierDelay', flight_data['CarrierDelay'].cast('int')) \
        .withColumn('WeatherDelay', flight_data['WeatherDelay'].cast('int')) \
        .withColumn('NASDelay', flight_data['NASDelay'].cast('int')) \
        .withColumn('SecurityDelay', flight_data['SecurityDelay'].cast('int')) \
        .withColumn('LateAircraftDelay', flight_data['LateAircraftDelay'].cast('int'))

In [7]:
# drop cancelled flights, and flights where there is no departure delay data
flight_data = flight_data \
        .dropna(subset=['DepDelay']) \
        .filter(flight_data['Cancelled'] == 0)

In [8]:
# add new udf computed columns
flight_data = flight_data \
        .withColumn('Delayed', was_delayed_udf(flight_data['DepDelay'])) \
        .withColumn('CRSDepTime', get_hour_udf(flight_data['CRSDepTime']))

In [9]:
flight_data.show(2)

In [10]:
flight_data.registerTempTable("airlinedf")

In [11]:
display(sqlContext.sql("SELECT sum(WeatherDelay) Weather,sum(NASDelay) NAS,sum(SecurityDelay) Security,sum(LateAircraftDelay) lateAircraft,sum(CarrierDelay) Carrier\
                              FROM airlinedf "))

Weather,NAS,Security,lateAircraft,Carrier
465981,1947490,8366,2625351,2115602


In [12]:
display(sqlContext.sql("SELECT OPCarrier, avg(DepDelay) \
                                FROM airlinedf \
                                GROUP BY OPCarrier"))

OPCarrier,avg(DepDelay)
UA,15.738500151456142
NK,11.933501963655727
AA,11.010886552062088
EV,20.101299975472163
B6,24.05749477901377
DL,10.057150242150517
OO,20.844456968038
F9,16.869313395113732
YV,12.65885052325256
MQ,12.899621212121213


In [13]:
display(sqlContext.sql("SELECT OPCarrier, count(DepDelay) \
                                FROM airlinedf \
                                Where DepDelay > 15 \
                                GROUP BY OPCarrier"))

OPCarrier,count(DepDelay)
UA,8181
NK,2109
AA,11550
EV,2477
B6,6184
DL,9063
OO,12434
F9,2260
YV,2814
MQ,4570


In [14]:
display(sqlContext.sql("SELECT Dow, count(DepDelay) \
                                FROM airlinedf \
                                Where DepDelay > 15 \
                                GROUP BY Dow"))

Dow,count(DepDelay)
7.0,12144
1.0,13114
4.0,18533
3.0,16269
2.0,13836
6.0,8478
5.0,13075


In [15]:
display(sqlContext.sql("SELECT Origin, count(DepDelay) \
                                FROM airlinedf \
                                Where DepDelay > 15 \
                                GROUP BY Origin"))


Origin,count(DepDelay)
BGM,14
INL,5
PSE,15
MSY,654
GEG,117
DRT,7
BUR,328
SNA,404
GRB,69
GTF,21


In [16]:
display(sqlContext.sql("SELECT Origin, count(*) conFlight,avg(Delayed) delay\
                                FROM airlinedf \
                                GROUP BY Origin"))


Origin,conFlight,delay
BGM,54,0.2592592592592592
INL,53,0.0943396226415094
PSE,64,0.25
MSY,4544,0.1492077464788732
PPG,11,0.0
GEG,919,0.1294885745375408
DRT,61,0.1475409836065573
BUR,2294,0.1477768090671316
SNA,3383,0.1247413538279633
GRB,329,0.2127659574468085


In [17]:
display(sqlContext.sql("SELECT Origin, Dest, count(*) traffic,avg(Distance) Dist, avg(DepDelay) Delay\
                                FROM airlinedf \
                                GROUP BY Origin,Dest\
                                Order By 5 desc"))

Origin,Dest,traffic,Dist,Delay
BWI,VPS,2,819.0,237.0
VPS,BWI,2,819.0,235.0
CMH,AUS,1,1073.0,218.0
AUS,CMH,1,1073.0,217.0
EWR,VPS,2,988.0,208.5
TWF,SFO,7,536.0,203.14285714285717
RFD,PGD,10,1127.0,182.6
SLC,EGE,1,279.0,163.0
EGE,SLC,1,279.0,155.0
VPS,EWR,2,988.0,152.5


In [18]:
display(sqlContext.sql("SELECT  OPCarrier,Origin, avg(Cancelled) Cancelled from airlinedf \
                            WHERE Delayed=True \
                            GROUP BY OPCarrier,Origin"))

OPCarrier,Origin,Cancelled
WN,BWI,0.0
WN,ALB,0.0
OO,TUL,0.0
G4,IND,0.0
G4,BNA,0.0
B6,SRQ,0.0
AS,SLC,0.0
AS,IAH,0.0
DL,MSY,0.0
DL,STL,0.0


In [19]:
# columns used in the predictive models
cols = ['DepDelay', 'Month', 'Day', 'Dow', 'CRSDepTime', 'Distance', 'OPCarrier',
            'Origin', 'Dest', 'Delayed']

In [20]:
flight_data

In [21]:
# rename columns
flights = flight_data \
        .select(*cols) \
        .withColumnRenamed('OPCarrier', 'Carrier')\
        .withColumnRenamed('DepDelay', 'Delay') \
        .withColumnRenamed('CRSDepTime', 'Hour')

In [22]:
flights.groupBy("Carrier").count().show()

In [23]:
#flights.createGlobalTempView("flight19")

In [24]:
print("Table before storing")
flights.show(5)

In [25]:
import os
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [26]:
# categorical columns that will be OneHotEncoded
cat_cols = ['Month', 'Day', 'Dow', 'Hour', 'Carrier', 'Dest']

In [27]:
# numeric columns that will be a part of features used for prediction
non_cat_cols = ['Delay', 'Distance']

In [28]:
cat_indexers = [ StringIndexer(inputCol=col, outputCol=col+'_Index')
                     for col in cat_cols ]

In [29]:
# OneHotEncode each categorical feature after being StringIndexed
encoders = [ OneHotEncoder(dropLast=False, inputCol=indexer.getOutputCol(),
                               outputCol=indexer.getOutputCol()+'_Encoded')
                 for indexer in cat_indexers ]

In [30]:
# Assemble all feature columns (numeric + categorical) into `features` col
assembler = VectorAssembler(inputCols=[encoder.getOutputCol()
                                           for encoder in encoders] + non_cat_cols,
                                outputCol='Features')


In [31]:
# Train a random forest model
rf = RandomForestClassifier(labelCol='Delayed',featuresCol='Features', numTrees=10)

In [32]:
# Chain indexers, encoders, and forest into one pipeline
pipeline = Pipeline(stages=[ *cat_indexers, *encoders, assembler, rf ] )

In [33]:
# split the data into training and testing splits (70/30 rn)
(trainingData, testData) = flights.randomSplit([0.7, 0.3])

In [34]:
# Train the model -- which also runs indexers and coders
model = pipeline.fit(trainingData)

In [35]:
# use model to make predictions
predictions = model.transform(testData)

In [36]:
display(predictions.select('Delayed', 'prediction', 'probability', 'Features' ))

Delayed,prediction,probability,Features
0.0,0.0,"List(1, 2, List(), List(0.8505992310079871, 0.1494007689920129))","List(0, 428, List(0, 22, 34, 58, 70, 97, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2475.0))"
0.0,0.0,"List(1, 2, List(), List(0.8505992310079871, 0.1494007689920129))","List(0, 428, List(0, 22, 34, 62, 70, 95, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1674.0))"
0.0,0.0,"List(1, 2, List(), List(0.8505992310079871, 0.1494007689920129))","List(0, 428, List(0, 22, 34, 61, 70, 92, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1129.0))"
0.0,0.0,"List(1, 2, List(), List(0.8505992310079871, 0.1494007689920129))","List(0, 428, List(0, 22, 34, 55, 70, 97, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 264.0))"
0.0,0.0,"List(1, 2, List(), List(0.8505992310079871, 0.1494007689920129))","List(0, 428, List(0, 22, 34, 55, 70, 95, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 265.0))"
0.0,0.0,"List(1, 2, List(), List(0.8596739789831119, 0.14032602101688801))","List(0, 428, List(0, 22, 34, 55, 71, 83, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 414.0))"
0.0,0.0,"List(1, 2, List(), List(0.8653093361431432, 0.13469066385685674))","List(0, 428, List(0, 22, 34, 55, 69, 81, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 501.0))"
0.0,0.0,"List(1, 2, List(), List(0.8596739789831119, 0.14032602101688801))","List(0, 428, List(0, 22, 34, 55, 71, 83, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 562.0))"
0.0,0.0,"List(1, 2, List(), List(0.8662115514354891, 0.13378844856451094))","List(0, 428, List(0, 22, 34, 55, 73, 80, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 743.0))"
0.0,0.0,"List(1, 2, List(), List(0.8505992310079871, 0.1494007689920129))","List(0, 428, List(0, 22, 34, 55, 70, 97, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 828.0))"


In [37]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
labelCol='Delayed', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print('Accuracy = %g' % (accuracy))

In [38]:
rf_model = model.stages[-1]
print(rf_model) # summary only

In [39]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer 

In [40]:
# categorical columns that will be OneHotEncoded
cat_cols = ['Month', 'Day', 'Dow', 'Hour', 'Carrier', 'Dest']

In [41]:
# numeric columns that will be a part of features used for prediction
non_cat_cols = ['Delay', 'Distance']

In [42]:
cat_indexers = [ StringIndexer(inputCol=col, outputCol=col+'_Index')
                     for col in cat_cols ]

In [43]:
# OneHotEncode each categorical feature after being StringIndexed
encoders = [ OneHotEncoder(dropLast=False, inputCol=indexer.getOutputCol(),
                               outputCol=indexer.getOutputCol()+'_Encoded')
                 for indexer in cat_indexers ]

In [44]:
# Assemble all feature columns (numeric + categorical) into `features` col
assembler = VectorAssembler(inputCols=[encoder.getOutputCol()
                                           for encoder in encoders] + non_cat_cols,
                                outputCol='Features')


In [45]:
# Train a Logistic Regression model
lr = LogisticRegression(labelCol='Delayed',featuresCol='Features', maxIter=10)

In [46]:
# Train the model -- which also runs indexers and coders
model = pipeline.fit(trainingData)

In [47]:
# use model to make predictions
predictions = model.transform(testData)

In [48]:
display(predictions.select('Delayed', 'prediction', 'probability', 'Features' ))

Delayed,prediction,probability,Features
0.0,0.0,"List(1, 2, List(), List(0.8505992310079871, 0.1494007689920129))","List(0, 428, List(0, 22, 34, 58, 70, 97, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2475.0))"
0.0,0.0,"List(1, 2, List(), List(0.8505992310079871, 0.1494007689920129))","List(0, 428, List(0, 22, 34, 62, 70, 95, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1674.0))"
0.0,0.0,"List(1, 2, List(), List(0.8505992310079871, 0.1494007689920129))","List(0, 428, List(0, 22, 34, 61, 70, 92, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1129.0))"
0.0,0.0,"List(1, 2, List(), List(0.8505992310079871, 0.1494007689920129))","List(0, 428, List(0, 22, 34, 55, 70, 97, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 264.0))"
0.0,0.0,"List(1, 2, List(), List(0.8505992310079871, 0.1494007689920129))","List(0, 428, List(0, 22, 34, 55, 70, 95, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 265.0))"
0.0,0.0,"List(1, 2, List(), List(0.8596739789831119, 0.14032602101688801))","List(0, 428, List(0, 22, 34, 55, 71, 83, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 414.0))"
0.0,0.0,"List(1, 2, List(), List(0.8653093361431432, 0.13469066385685674))","List(0, 428, List(0, 22, 34, 55, 69, 81, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 501.0))"
0.0,0.0,"List(1, 2, List(), List(0.8596739789831119, 0.14032602101688801))","List(0, 428, List(0, 22, 34, 55, 71, 83, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 562.0))"
0.0,0.0,"List(1, 2, List(), List(0.8662115514354891, 0.13378844856451094))","List(0, 428, List(0, 22, 34, 55, 73, 80, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 743.0))"
0.0,0.0,"List(1, 2, List(), List(0.8505992310079871, 0.1494007689920129))","List(0, 428, List(0, 22, 34, 55, 70, 97, 427), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 828.0))"


In [49]:
evaluator = MulticlassClassificationEvaluator(
labelCol='Delayed', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print('Accuracy = %g' % (accuracy))