In [1]:
# Import Statements
from pyspark.ml import feature
from pyspark.ml import clustering
from pyspark.ml import Pipeline
from pyspark.sql import functions as fn
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml import feature, regression, evaluation, Pipeline
from pyspark.sql import functions as fn, Row
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from pyspark.sql import functions as sf
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.ml.feature import RegexTokenizer
import requests
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import concat, col, lit, lower
from pyspark.sql.functions import isnan, when, count, col, isnull
from pyspark.sql.functions import concat_ws
from  pyspark.sql.functions import abs
# Create (or reuse) the SparkSession for this notebook.
# master("local[*]") runs Spark locally with one worker thread per available logical core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext  # handle to the underlying SparkContext

In [2]:
# Do not delete or change this cell

import os

# Define a function to determine if we are running on Databricks.
# Return True if running in the Databricks environment, False otherwise.
def is_databricks() -> bool:
    """Report whether this notebook is executing on Databricks.

    Detection relies solely on the DATABRICKS_RUNTIME_VERSION environment
    variable: if it is set to any value (even an empty string), the runtime
    is treated as Databricks.

    Returns:
        True when DATABRICKS_RUNTIME_VERSION is present in the environment,
        False otherwise.
    """
    # get the databricks runtime version (None when the variable is unset)
    db_env = os.getenv("DATABRICKS_RUNTIME_VERSION")
    
    # if running on data bricks
    # NOTE(review): `db_env is not None` is the idiomatic singleton comparison;
    # the code is left unchanged because this cell is marked "do not change".
    if db_env != None:
        return True
    else:
        return False

# Define a function to build the full path of a data file.  The full path is
# constructed by checking runtime environment variables to determine whether the
# runtime environment is Databricks or a student's personal computer.
# 
# Params
#   data_file_name: The base name of the data file to load
# 
# Returns the full path file name based on the runtime env
#
def get_training_filename(data_file_name: str) -> str:
    """Return the path from which ``data_file_name`` should be read.

    On Databricks (as reported by ``is_databricks``) the file is expected under
    ``/FileStore/tables/``. Otherwise the bare name is returned unchanged, i.e.
    a relative path; the comments below assume the file sits next to this
    notebook.

    Args:
        data_file_name: Base name of the data file, e.g. ``'flights.csv'``.

    Returns:
        ``"/FileStore/tables/<data_file_name>"`` on Databricks, otherwise
        ``data_file_name`` itself.
    """
    # if running on data bricks
    if is_databricks():
        # build the full path file name assuming data brick env
        full_path_name = "/FileStore/tables/%s" % data_file_name
    # else the data is assumed to be in the same dir as this notebook
    else:
        # Assume the student is running on their own computer and load the data
        # file from the same dir as this notebook
        full_path_name = data_file_name
    
    # return the full path file name to the caller
    return full_path_name

In [3]:
# Load the raw CSV tables: each file has a header row, and Spark infers the column types.
def _read_raw_csv(file_name):
    """Read one raw CSV file (path resolved by get_training_filename) into a Spark DataFrame."""
    return spark.read.csv(get_training_filename(file_name), header=True, inferSchema=True)


airlines_df = _read_raw_csv('airlines.csv')
airports_df = _read_raw_csv('airports.csv')
flights_df = _read_raw_csv('flights.csv')

In [4]:
# Draw a reproducible 50% sample of the flights to keep the workload tractable.
# Sample WITHOUT replacement: with replacement the same flight can be drawn several
# times, and the later randomSplit can then put identical rows into both the
# training and the test set, leaking test data into training.
flights_df_sample = flights_df.sample(withReplacement=False, fraction=0.5, seed=42)

In [5]:
# Count missing values per column in a single pass. `when(isnull(c), c)` yields the
# literal string c on null rows and null otherwise, and `count` skips nulls, so each
# aggregate equals the number of null rows in that column.
null_count_exprs = [
    count(when(isnull(column_name), column_name)).alias(column_name)
    for column_name in flights_df_sample.columns
]
flights_df_sample.select(null_count_exprs).show()

In [6]:
# Keep only the columns needed downstream, then keep only rows with CANCELLED == 0
# (i.e. drop cancelled flights).
kept_columns = [
    'MONTH', 'DAY', 'DAY_OF_WEEK', 'AIRLINE', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT',
    'SCHEDULED_DEPARTURE', 'DEPARTURE_DELAY', 'DISTANCE', 'SCHEDULED_ARRIVAL', 'CANCELLED',
]
flights_df_sample = (
    flights_df_sample
    .select(*kept_columns)
    .filter(fn.col('CANCELLED') == 0)
)

In [7]:
# Cleaning and one-hot encoding
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import udf
# NOTE(review): only StringType is used in this cell; the wildcard is kept in case
# later cells rely on other names from pyspark.sql.types.
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

# --- Label: bucketize the departure delay -------------------------------------
# Buckets: [-inf, 15) -> 0.0, [15, 30) -> 1.0, [30, inf] -> 2.0.
# The lower split is -inf rather than a fixed number: Bucketizer raises an error for
# any value below the first split (handleInvalid only covers NaN), so a fixed lower
# bound such as -55 fails as soon as one flight departs earlier than that.
delay_bucketizer = Bucketizer(splits=[float('-Inf'), 15, 30, float('Inf')],
                              inputCol="DEPARTURE_DELAY", outputCol="Delay_Bucket")
# "keep" routes NaN delays into an extra bucket (3.0) instead of failing.
flights_df_sample = delay_bucketizer.setHandleInvalid("keep").transform(flights_df_sample)

d = {0.0: "No Delay", 1.0: "Delayed by more than 15mins", 2.0: "Delayed by more than 30 mins"}
# dict.get yields None (SQL null) for buckets without a label -- e.g. the extra NaN
# bucket 3.0 or a null -- where d[x] would raise KeyError inside the Python worker.
udf_foo = udf(lambda x: d.get(x), StringType())
flights_df_sample = flights_df_sample.withColumn("Flight_Delayed", udf_foo("Delay_Bucket"))

# --- Feature: bucketize the flight distance -----------------------------------
# Buckets: [0, 100) -> 0.0, [100, 1000) -> 1.0, [1000, inf] -> 2.0.
bucketizer = Bucketizer(splits=[0, 100, 1000, float('Inf')],
                        inputCol="DISTANCE", outputCol="Distance_Bucket")
flights_df_sample = bucketizer.setHandleInvalid("keep").transform(flights_df_sample)

t = {0.0: "Short", 1.0: "Medium", 2.0: "Long"}
udf_foo = udf(lambda x: t.get(x), StringType())
flights_df_sample = flights_df_sample.withColumn("Flight_Distance", udf_foo("Distance_Bucket"))

# --- Categorical features: string-index, then one-hot encode ------------------
# (source column, index column, one-hot column) for each categorical feature.
for source_col, index_col, ohe_col in [
    ("AIRLINE", "Airline_Numeric", "Airline_OHE"),
    ("ORIGIN_AIRPORT", "OA_Numeric", "Origin_Airport_OHE"),
    ("DESTINATION_AIRPORT", "DA_Numeric", "Destination_Airport_OHE"),
]:
    indexer = StringIndexer(inputCol=source_col, outputCol=index_col).fit(flights_df_sample)
    flights_df_sample = indexer.transform(flights_df_sample)

    encoder = OneHotEncoder(inputCol=index_col, outputCol=ohe_col)
    # Spark >= 3.0: OneHotEncoder is an Estimator and must be fit before transforming.
    # Spark 2.x: it is a plain Transformer with no fit(). Support both.
    encoder_model = encoder.fit(flights_df_sample) if hasattr(encoder, "fit") else encoder
    flights_df_sample = encoder_model.transform(flights_df_sample)

In [8]:
# Print the schema (column names, Spark types, nullability) of the engineered sample
# to verify the derived bucket, index and one-hot columns.
flights_df_sample.printSchema()

In [9]:
# Split the data into training (90%) and test (10%) DataFrames.
# A fixed seed makes the split -- and therefore every downstream metric --
# reproducible across kernel restarts; without it Spark draws a new split each run.
training_df, testing_df = flights_df_sample.randomSplit([0.9, 0.1], seed=42)

In [10]:
# Assemble the model inputs into a single "features" vector column.
from pyspark.ml.feature import VectorAssembler

# CANCELLED is deliberately not a feature: cancelled flights are filtered out earlier
# (only CANCELLED == 0 rows remain), so the column is a constant that carries no signal.
va = VectorAssembler(
    inputCols=["MONTH", "DAY", "DAY_OF_WEEK", "Airline_OHE", "Origin_Airport_OHE",
               "Destination_Airport_OHE", "SCHEDULED_DEPARTURE", "Distance_Bucket",
               "SCHEDULED_ARRIVAL"],
    outputCol="features")

In [11]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import evaluation

# All models predict Delay_Bucket, the bucketized departure delay (classes 0.0, 1.0, 2.0).

# NOTE(review): BinaryClassificationEvaluator is designed for binary labels. Spark scores
# rawPrediction[1] (the class-1 score) against "label > 0.5", so with the 3-class
# Delay_Bucket label this "areaUnderROC" is not a true multiclass AUC.
# TODO: compute AUC on a binary delayed / not-delayed label instead.
rf_bce = BinaryClassificationEvaluator(labelCol='Delay_Bucket', metricName='areaUnderROC')
gbt_bce = BinaryClassificationEvaluator(labelCol='Delay_Bucket', metricName='areaUnderROC')

lr_evaluator_accuracy = evaluation.MulticlassClassificationEvaluator(labelCol="Delay_Bucket", metricName="accuracy")
# Accuracy, as the name says, so the random-forest score is directly comparable with
# lr_evaluator_accuracy; F1 for the random forest is available via rf_cv_evaluator_f1.
rf_evaluator_accuracy = evaluation.MulticlassClassificationEvaluator(labelCol="Delay_Bucket", metricName="accuracy")
rf_cv_evaluator_f1 = evaluation.MulticlassClassificationEvaluator(labelCol="Delay_Bucket", metricName="f1")
gbt_evaluator_f1 = evaluation.MulticlassClassificationEvaluator(labelCol="Delay_Bucket", metricName="f1")

In [12]:
# Logistic-regression model: multinomial (softmax) over the Delay_Bucket classes with
# elastic-net regularization (regParam=0.3, elasticNetParam=0.8 i.e. mostly L1), 10 iterations.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='Delay_Bucket',
    family='multinomial',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lr_pipeline = Pipeline().setStages([va, lr])
lr_model = lr_pipeline.fit(dataset=training_df)
lr_transform = lr_model.transform(dataset=testing_df)  # predictions on the held-out split

In [13]:
lr_accuracy = lr_evaluator_accuracy.evaluate(dataset=lr_transform)  # score the LR test predictions

In [14]:
# AUC (area under the ROC curve) for the logistic-regression test predictions.
# NOTE(review): Delay_Bucket has three classes, so this binary-evaluator AUC is not a
# true multiclass AUC -- interpret with care.
lr_bce = BinaryClassificationEvaluator(metricName='areaUnderROC', labelCol='Delay_Bucket')
lr_auc = lr_bce.evaluate(dataset=lr_transform)

In [15]:
# Metrics table for the logistic-regression model. Row labels are read from the
# evaluators that produced each score rather than hard-coded, so the table cannot
# misname a metric (a fixed 'Balanced Accuracy' label was wrong: the evaluator
# computes plain accuracy, not balanced accuracy).
scores = [lr_accuracy, lr_auc]
metricName = [lr_evaluator_accuracy.getMetricName(), lr_bce.getMetricName()]
metric_df = pd.DataFrame({'Metric Name': metricName, 'Score': scores}, index=[1, 2])
metric_df

In [16]:
# Random-forest classifier on the same assembled features, using default hyperparameters.
rf = RandomForestClassifier(labelCol='Delay_Bucket', featuresCol='features')
rf_pipeline = Pipeline().setStages([va, rf])
rf_model = rf_pipeline.fit(dataset=training_df)
rf_transform = rf_model.transform(dataset=testing_df)  # predictions on the held-out split

In [17]:
rf_acc = rf_evaluator_accuracy.evaluate(dataset=rf_transform)  # metric as configured on rf_evaluator_accuracy

In [18]:
rf_auc = rf_bce.evaluate(dataset=rf_transform)  # areaUnderROC of the random-forest test predictions

In [19]:
# Metrics table for the random-forest model. Row labels are read from the evaluators
# that produced each score rather than hard-coded, so the table always names the
# metric actually computed (a fixed 'Balanced Accuracy' label was wrong: the
# evaluator's metric is accuracy or F1, never balanced accuracy).
scores = [rf_acc, rf_auc]
metricName = [rf_evaluator_accuracy.getMetricName(), rf_bce.getMetricName()]
metric_df = pd.DataFrame({'Metric Name': metricName, 'Score': scores}, index=[1, 2])
metric_df