In [0]:
import os
#from datetime import timedelta, date
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import (StructType,
                               StructField,
                               DoubleType,
                               IntegerType,
                               StringType,
                               BooleanType)

In [0]:
# add a boolean column that indicates whether flight delayed or not (threshold 15 mins)
was_delayed_udf = udf(lambda x: float(x >= 15), DoubleType())

# convert hours, e.g. 1430 --> 14
get_hour_udf = udf(lambda x: float(x // 100), DoubleType())

# add column that indicates how close a flight is to a holiday
#nearest_holiday_udf = udf(nearest_holiday, DoubleType())

In [0]:

if __name__ == "__main__":

    spark = SparkSession.builder \
        .master('local') \
        .appName('Flight Delay') \
        .getOrCreate()

In [0]:
from pyspark import SparkConf, SparkContext, SQLContext

conf = SparkConf().setAppName("Flight Delay").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
print(sc)
print(sqlContext)

In [0]:
 flight_data = spark.read \
        .format('com.databricks.spark.csv') \
        .csv('/FileStore/tables/FlightData.csv',
             inferSchema='true', nanValue="", header='true', mode='PERMISSIVE')

In [0]:
# there is a PR to accept multiple `nanValue`s, until then, however, the schema
# must be manually cast (due to the way the DOT stores the data)
flight_data = flight_data \
        .withColumn('Year', flight_data['Year'].cast('int')) \
        .withColumn('Month', flight_data['Month'].cast('Double')) \
        .withColumn('Day', flight_data['Day'].cast('Double')) \
        .withColumn('CRSDepTime', flight_data['CRSDepTime'].cast('Double')) \
        .withColumn('Dow', flight_data['Dow'].cast('Double')) \
        .withColumn('DepTime', flight_data['DepTime'].cast('Double')) \
        .withColumn('DepDelay', flight_data['DepDelay'].cast('Double')) \
        .withColumn('TaxiOut', flight_data['TaxiOut'].cast('int')) \
        .withColumn('TaxiIn', flight_data['TaxiIn'].cast('int')) \
        .withColumn('CRSArrTime', flight_data['CRSArrTime'].cast('int')) \
        .withColumn('ArrTime', flight_data['ArrTime'].cast('int')) \
        .withColumn('ArrDelay', flight_data['ArrDelay'].cast('int')) \
        .withColumn('Cancelled', flight_data['Cancelled'].cast('int')) \
        .withColumn('Diverted', flight_data['Diverted'].cast('int')) \
        .withColumn('CRSElapsedTime', flight_data['CRSElapsedTime'].cast('int')) \
        .withColumn('ActualElapsedTime', flight_data['ActualElapsedTime'].cast('int')) \
        .withColumn('AirTime', flight_data['AirTime'].cast('int')) \
        .withColumn('Distance', flight_data['Distance'].cast('Double')) \
        .withColumn('CarrierDelay', flight_data['CarrierDelay'].cast('int')) \
        .withColumn('WeatherDelay', flight_data['WeatherDelay'].cast('int')) \
        .withColumn('NASDelay', flight_data['NASDelay'].cast('int')) \
        .withColumn('SecurityDelay', flight_data['SecurityDelay'].cast('int')) \
        .withColumn('LateAircraftDelay', flight_data['LateAircraftDelay'].cast('int'))\
        .withColumn('OP_CARRIER', flight_data['OP_CARRIER'].cast('string'))

In [0]:
# drop cancelled flights, and flights where there is no departure delay data
flight_data = flight_data \
        .dropna(subset=['DepDelay']) \
        .filter(flight_data['Cancelled'] == 0)

In [0]:
# add new udf computed columns
flight_data = flight_data \
        .withColumn('Delayed', was_delayed_udf(flight_data['DepDelay'])) \
        .withColumn('CRSDepTime', get_hour_udf(flight_data['CRSDepTime']))

In [0]:
flight_data.show(2)

In [0]:
flight_data.registerTempTable("airlinedf")

In [0]:
display(sqlContext.sql("SELECT sum(WeatherDelay) Weather,sum(NASDelay) NAS,sum(SecurityDelay) Security,sum(LateAircraftDelay) lateAircraft,sum(CarrierDelay) Carrier\
                              FROM airlinedf "))

Weather,NAS,Security,lateAircraft,Carrier
378095,1173609,7493,1691916,2032137


In [0]:
display(sqlContext.sql("SELECT OP_CARRIER, avg(DepDelay) \
                                FROM airlinedf \
                                GROUP BY OP_CARRIER"))

OP_CARRIER,avg(DepDelay)
NK,6.2132734185966125
AA,7.631834477649455
DL,3.8986329442931047
F9,4.596818698263269
YV,10.011800923550538
MQ,7.691121974296302
OH,10.923067326485633
HA,0.4726417782844115
WN,4.336155179927491
UA,5.546129220023283


In [0]:
display(sqlContext.sql("SELECT OP_CARRIER, count(DepDelay) \
                                FROM airlinedf \
                                Where DepDelay > 15 \
                                GROUP BY OP_CARRIER"))

OP_CARRIER,count(DepDelay)
NK,2309
AA,10553
DL,7720
F9,1836
YV,2732
MQ,4157
OH,4510
HA,525
WN,12641
UA,5949


In [0]:
display(sqlContext.sql("SELECT Dow, count(DepDelay) \
                                FROM airlinedf \
                                Where DepDelay > 15 \
                                GROUP BY Dow"))

Dow,count(DepDelay)
7.0,10858
1.0,10810
4.0,13420
3.0,9193
2.0,7259
6.0,12049
5.0,15172


In [0]:
display(sqlContext.sql("SELECT ORIGIN_AIRPORT_ID, count(DepDelay) \
                                FROM airlinedf \
                                Where DepDelay > 15 \
                                GROUP BY ORIGIN_AIRPORT_ID"))


ORIGIN_AIRPORT_ID,count(DepDelay)
14570,143
11146,67
13795,31
10257,110
12264,585
14771,2234
11057,2941
13377,34
13830,274
10994,203


In [0]:
display(sqlContext.sql("SELECT ORIGIN_AIRPORT_ID, count(*) conFlight,avg(Delayed) delay\
                                FROM airlinedf \
                                GROUP BY ORIGIN_AIRPORT_ID"))


ORIGIN_AIRPORT_ID,conFlight,delay
14570,1649,0.0915706488781079
11146,371,0.1886792452830188
11630,357,0.1288515406162464
13795,247,0.1295546558704453
10257,898,0.1236080178173719
12264,5073,0.1186674551547407
14771,13521,0.1715109829154648
11057,19902,0.1534519143804642
13830,2383,0.1229542593369702
13377,261,0.1340996168582375


In [0]:
display(sqlContext.sql("SELECT ORIGIN, DEST, count(1) traffic,avg(Distance) Dist, avg(DepDelay) Delay\
                                FROM airlinedf \
                                GROUP BY ORIGIN,DEST\
                                Order By 5 desc"))

ORIGIN,DEST,traffic,Dist,Delay
RST,FAR,1,295.0,399.0
CHA,PHL,6,641.0,216.33333333333331
PHL,CHA,6,641.0,207.33333333333331
GFK,LAS,9,1230.0,200.22222222222223
LAS,GFK,9,1230.0,179.11111111111111
LCK,SAV,2,539.0,149.5
CID,LAS,9,1319.0,147.77777777777777
SAV,LCK,2,539.0,142.5
MCI,PGD,9,1125.0,120.66666666666669
SJC,PSP,1,390.0,119.0


In [0]:
display(sqlContext.sql("SELECT  OP_CARRIER,ORIGIN, avg(Cancelled) Cancelled from airlinedf \
                            WHERE Delayed=True \
                            GROUP BY OP_CARRIER,ORIGIN"))

OP_CARRIER,ORIGIN,Cancelled
WN,ALB,0.0
WN,BWI,0.0
DL,STL,0.0
DL,MSY,0.0
WN,MAF,0.0
MQ,HSV,0.0
OH,MKE,0.0
OH,SAV,0.0
NK,RIC,0.0
WN,BNA,0.0


In [0]:
# columns used in the predictive models
cols = ['DepDelay', 'Month', 'Day', 'Dow', 'CRSDepTime', 'Distance', 'OP_CARRIER',
            'ORIGIN', 'Dest', 'Delayed']

In [0]:
flight_data

In [0]:
# rename columns
flights = flight_data \
        .select(*cols) \
        .withColumnRenamed('OP_CARRIER', 'Carrier')\
        .withColumnRenamed('DepDelay', 'Delay') \
        .withColumnRenamed('CRSDepTime', 'Hour')

In [0]:
flights.groupBy("Carrier").count().show()

In [0]:
#flights.createGlobalTempView("flight19")

In [0]:
print("Table before storing")
flights.show(5)

In [0]:
import os
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
# categorical columns that will be OneHotEncoded
cat_cols = ['Month', 'Day', 'Dow', 'Hour', 'Carrier', 'Dest']

In [0]:
# numeric columns that will be a part of features used for prediction
non_cat_cols = ['Delay', 'Distance']

In [0]:
cat_indexers = [ StringIndexer(inputCol=col, outputCol=col+'_Index')
                     for col in cat_cols ]

In [0]:
# OneHotEncode each categorical feature after being StringIndexed
encoders = [ OneHotEncoder(dropLast=False, inputCol=indexer.getOutputCol(),
                               outputCol=indexer.getOutputCol()+'_Encoded')
                 for indexer in cat_indexers ]

In [0]:
# Assemble all feature columns (numeric + categorical) into `features` col
assembler = VectorAssembler(inputCols=[encoder.getOutputCol()
                                           for encoder in encoders] + non_cat_cols,
                                outputCol='Features')


In [0]:
# Train a random forest model
rf = RandomForestClassifier(labelCol='Delayed',featuresCol='Features', numTrees=10)

In [0]:
# Chain indexers, encoders, and forest into one pipeline
pipeline = Pipeline(stages=[ *cat_indexers, *encoders, assembler, rf ] )

In [0]:
# split the data into training and testing splits (70/30 rn)
(trainingData, testData) = flights.randomSplit([0.7, 0.3])

In [0]:
# Train the model -- which also runs indexers and coders
model = pipeline.fit(trainingData)

In [0]:
# use model to make predictions
predictions = model.transform(testData)

In [0]:
display(predictions.select('Delayed', 'prediction', 'probability', 'Features' ))

Delayed,prediction,probability,Features
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8883431072793447, 0.11165689272065518))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 9, 32, 55, 76, 81, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -28.0, 1011.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8608071256287069, 0.13919287437129318))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 29, 38, 48, 71, 95, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -27.0, 769.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8906466431208117, 0.10935335687918828))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 4, 33, 55, 76, 106, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -27.0, 616.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8638061536214862, 0.13619384637851395))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 30, 38, 46, 71, 84, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -25.0, 241.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8946583780727092, 0.10534162192729073))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 25, 37, 41, 76, 129, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -24.0, 985.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.894689688404083, 0.10531031159591686))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 1, 35, 52, 64, 80, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -23.0, 526.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8666723153931327, 0.13332768460686723))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 6, 32, 57, 76, 125, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -23.0, 879.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8621172313561057, 0.1378827686438942))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 21, 33, 50, 76, 83, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -23.0, 455.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8906466431208117, 0.10935335687918828))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 9, 32, 55, 76, 106, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -23.0, 616.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9000828081948699, 0.09991719180513019))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 16, 34, 42, 76, 88, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -22.0, 1090.0))"


In [0]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
labelCol='Delayed', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print('Accuracy = %g' % (accuracy))

In [0]:
rf_model = model.stages[-1]
print(rf_model) # summary only

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer 

In [0]:
# categorical columns that will be OneHotEncoded
cat_cols = ['Month', 'Day', 'Dow', 'Hour', 'Carrier', 'Dest']

In [0]:
# numeric columns that will be a part of features used for prediction
non_cat_cols = ['Delay', 'Distance']

In [0]:
cat_indexers = [ StringIndexer(inputCol=col, outputCol=col+'_Index')
                     for col in cat_cols ]

In [0]:
# OneHotEncode each categorical feature after being StringIndexed
encoders = [ OneHotEncoder(dropLast=False, inputCol=indexer.getOutputCol(),
                               outputCol=indexer.getOutputCol()+'_Encoded')
                 for indexer in cat_indexers ]

In [0]:
# Assemble all feature columns (numeric + categorical) into `features` col
assembler = VectorAssembler(inputCols=[encoder.getOutputCol()
                                           for encoder in encoders] + non_cat_cols,
                                outputCol='Features')


In [0]:
# Train a Logistic Regression model
lr = LogisticRegression(labelCol='Delayed',featuresCol='Features', maxIter=10)

In [0]:
# Train the model -- which also runs indexers and coders
model = pipeline.fit(trainingData)

In [0]:
# use model to make predictions
predictions = model.transform(testData)

In [0]:
display(predictions.select('Delayed', 'prediction', 'probability', 'Features' ))

Delayed,prediction,probability,Features
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8883431072793447, 0.11165689272065518))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 9, 32, 55, 76, 81, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -28.0, 1011.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8608071256287069, 0.13919287437129318))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 29, 38, 48, 71, 95, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -27.0, 769.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8906466431208117, 0.10935335687918828))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 4, 33, 55, 76, 106, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -27.0, 616.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8638061536214862, 0.13619384637851395))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 30, 38, 46, 71, 84, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -25.0, 241.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8946583780727092, 0.10534162192729073))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 25, 37, 41, 76, 129, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -24.0, 985.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.894689688404083, 0.10531031159591686))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 1, 35, 52, 64, 80, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -23.0, 526.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8666723153931327, 0.13332768460686723))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 6, 32, 57, 76, 125, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -23.0, 879.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8621172313561057, 0.1378827686438942))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 21, 33, 50, 76, 83, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -23.0, 455.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8906466431208117, 0.10935335687918828))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 9, 32, 55, 76, 106, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -23.0, 616.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9000828081948699, 0.09991719180513019))","Map(vectorType -> sparse, length -> 432, indices -> List(0, 16, 34, 42, 76, 88, 430, 431), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -22.0, 1090.0))"


In [0]:
evaluator = MulticlassClassificationEvaluator(
labelCol='Delayed', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print('Accuracy = %g' % (accuracy))