# Table of contents<a class="anchor" id="table"></a>

* [1 Data Loading, Cleaning, Labelling, and Exploration](#1)
    * [1.1 Data Loading](#1.1)
        * [1.1.1 Create Spark Session](#1.1.1)
        * [1.1.2 Load CSV files & display total number of rows](#1.1.2)
        * [1.1.3 Obtain column list](#1.1.3)

    * [1.2 Data Cleaning](#1.2)
        * [1.2.1 Check for missing values in columns](#1.2.1)
        * [1.2.2.a Remove selected columns](#1.2.2.a)
        * [1.2.2.b Drop rows with Null & NaN values](#1.2.2.b)

    * [1.3 Data Labelling](#1.3)
        * [1.3.1 Add label columns for binary classification](#1.3.1)
        * [1.3.2 Add label columns for multiclass classification](#1.3.2)

    * [1.4 Data Exploration](#1.4)
        * [1.4.1 Show statistics of numeric columns](#1.4.1)
        * [1.4.2 Show number of records in each class (binary and multiclass)](#1.4.2)

* [2 Feature extraction and ML Training](#2)
    * [2.1 Prepare the feature columns](#2.1)
        * [2.1.1 Add additional feature column "DEPT_TIME_FLAG"](#2.1.1)
        * [2.1.2 Select relevant columns (both feature & label columns)](#2.1.2)
    * [2.2 Preparing any Spark ML Transformers/ Estimators for features and models](#2.2) 
        * [2.2.1 Create Transformers/Estimators for feature columns](#2.2.1)
        * [2.2.2 Create ML model Estimators for DT & Gradient Boosted tree for binary classification](#2.2.2)
        * [2.2.3 Create ML model Estimators for Naïve Bayes for multi-class classification](#2.2.3)
        * [2.2.4 Create Pipeline Model](#2.2.4)
        
    * [2.3 Preparing the training and testing data](#2.3)
    * [2.4 Training and evaluating models](#2.4)
        * [2.4.1 Binary classification tasks](#2.4.1)
        * [2.4.2 Multiclass Classification tasks](#2.4.2)





# 1 Data Loading, Cleaning, Labelling, and Exploration<a class="anchor" id="1"></a>
In this section, you will prepare the data (loading and cleaning) and performing data exploration.

## 1.1 Data Loading<a class="anchor" id="1.1"></a>
[Back to top](#table)

### 1.1.1 Create Spark Session <a class="anchor" id="1.1.1"></a>
[Back to top](#table)

Write the code to get an object using SparkSession, which tells Spark how to access a cluster. To create a SparkSession you first need to build a SparkConf object that contains information about your application. Give an appropriate name for your application and run Spark locally with as many working processors as logical cores on your machine.

In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Import SparkContext 
from pyspark import SparkContext # Spark

# local[*]: run Spark in local mode with as many working processors as logical cores on your machine
master = "local[*]"
# Giving the app name of Assignment 2 to be shown on the Spark cluster UI page
app_name = "Assignment 2"
# Setup configuration parameters for Spark
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

# Method 1: Using SparkSession
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')

24/10/22 14:20:56 WARN Utils: Your hostname, Sarvagyas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.104.24 instead (on interface en0)
24/10/22 14:20:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/22 14:20:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/22 14:20:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### 1.1.2 Load CSV files & display total number of rows <a class="anchor" id="1.1.2"></a>
[Back to top](#table)

Read the 20 files of “flight*.csv” file into a single Spark DataFrame namely flightsRawDf using spark.read.csv with both header and inferSchema attributes are set to True. Display the total number of flightsRawDf rows.

In [2]:
#reading the files into flightsRawDf
flightsRawDf = spark.read.csv("/Users/sarvagyasinghs/Downloads/Analysing-flight-delay-data/flight-delays/flights/", header = True, inferSchema = True)
#displaying the total number of rows
print("The total number of rows is " + str(flightsRawDf.count()))

[Stage 1:>                                                          (0 + 7) / 7]                                                                                

The total number of rows is 582184


### 1.1.3 Obtain column list  <a class="anchor" id="1.1.3"></a>
[Back to top](#table)

Obtain the list of columns from flightsRawDf and name this variable as allColumnFlights.

In [3]:
# get a list of columns
allColumnFlights = [ x for x in flightsRawDf.columns]
print(allColumnFlights) # to be removed

['YEAR', 'MONTH', 'DAY', 'DAY_OF_WEEK', 'AIRLINE', 'FLIGHT_NUMBER', 'TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'SCHEDULED_DEPARTURE', 'DEPARTURE_TIME', 'DEPARTURE_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'SCHEDULED_TIME', 'ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 'WHEELS_ON', 'TAXI_IN', 'SCHEDULED_ARRIVAL', 'ARRIVAL_TIME', 'ARRIVAL_DELAY', 'DIVERTED', 'CANCELLED', 'CANCELLATION_REASON', 'AIR_SYSTEM_DELAY', 'SECURITY_DELAY', 'AIRLINE_DELAY', 'LATE_AIRCRAFT_DELAY', 'WEATHER_DELAY']


## 1.2 Data Cleaning<a class="anchor" id="1.2"></a>

### 1.2.1 Check for missing values in columns <a class="anchor" id="1.2.1"></a>
[Back to top](#table)

Check for missing values (NaN and Null) in all columns, display the number of missing values for each column. Since there are many columns in the data, it is best to visualize the number of null values for a selected set of columns at one snap shot.

In [4]:
from pyspark.sql.functions import isnan, when, count, col
flightsRawDf.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in flightsRawDf.columns]).show()



+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-



### 1.2.2.a Remove selected columns <a class="anchor" id="1.2.2.a"></a>
[Back to top](#table)

Remove these column list ['CANCELLATION_REASON', 'AIR_SYSTEM_DELAY', 'SECURITY_DELAY', 'AIRLINE_DELAY', 'LATE_AIRCRAFT_DELAY', 'WEATHER_DELAY'] from the flightsRawDf . These columns have high number of missing values. Then, display the number of retained columns in flightsRawDf.

In [5]:
#list down the columns to be removed
columnsToDrop = ['CANCELLATION_REASON', 'AIR_SYSTEM_DELAY', 'SECURITY_DELAY', 'AIRLINE_DELAY', 'LATE_AIRCRAFT_DELAY', 'WEATHER_DELAY'] 
#dropping the columns from the list
flightsRawDf = flightsRawDf.drop(*columnsToDrop)
#display the number of retained columns
print("The number of retained columns is " + str(len(flightsRawDf.columns)))

The number of retained columns is 25



### 1.2.2.b Drop rows with Null & NaN values <a class="anchor" id="1.2.2.b"></a>
[Back to top](#table)

Drop rows with Null and Nan values from resulting flightsRawDf. Please name it as flightsDf. Display the number of rows (or number of records) in flightsDf.

In [6]:
flightsDf=flightsRawDf.na.drop()
print("The number of rows after dropping is " + str(flightsDf.count()))

The number of rows after dropping is 571729


## 1.3 Data Labelling<a class="anchor" id="1.3"></a>

### 1.3.1 Add label columns for binary classification <a class="anchor" id="1.3.1"></a>
[Back to top](#table)

The new binary labels generated from arrival delay and departure delay columns for the binary classification task purpose. The new column names are binaryArrDelay and binaryDeptDelay, which are generated from arrival delay and departure delay respectively. Label the data as 1 (late) if the delay value is positive, otherwise label it as 0 (not late).

In [7]:
import pyspark.sql.functions as F
# for arrival delay
flightsDf = flightsDf.withColumn("binaryArrDelay", F.when(F.col("ARRIVAL_DELAY") > 0, 1).otherwise(0))
# for departure delay
flightsDf = flightsDf.withColumn("binaryDeptDelay", F.when(F.col("DEPARTURE_DELAY") > 0, 1).otherwise(0))

### 1.3.2 Add label columns for multiclass classification <a class="anchor" id="1.3.2"></a>
[Back to top](#table)

The new multiclass labels generated from arrival delay and departure delay columns for the multiclass classification task purpose. The new column names are multiClassArrDelay and multiClassDeptDelay, which are generated from arrival delay and departure delay values respectively. For multiclass labels. Please make it three classes as early, on time, and late which are represented by the labels 0, 1, and 2, respectively. The values below 5 are regarded as early, 5 to 20 are regarded as on time, and above 20 are regarded as late.

In [8]:
# for arrival delay
flightsDf = flightsDf.withColumn('multiClassArrDelay', F.when(F.col('ARRIVAL_DELAY') < 5, 0).when((F.col('ARRIVAL_DELAY') >= 5) & (F.col('ARRIVAL_DELAY') <= 20), 1).otherwise(2))
# for departure delay
flightsDf = flightsDf.withColumn('multiClassDeptDelay', F.when(F.col('DEPARTURE_DELAY') < 5, 0).when((F.col('DEPARTURE_DELAY') >= 5) & (F.col('DEPARTURE_DELAY') <= 20), 1).otherwise(2))

## 1.4 Data Exploration <a class="anchor" id="1.4"></a>

### 1.4.1 Show statistics of numeric columns<a class="anchor" id="1.4.1"></a>
[Back to top](#table)

Show the basic statistics (count, mean, stddev, min, max, 25 and 75 percentile) for all numerical columns (integer-type data) of the flightsDf, except for the class label columns: binaryArrDelay, binaryDeptDelay, multiClassArrDelay and multiClassDeptDelay.

In [9]:
# choosing only the int type columns and excluding the class label columns
flightsDf.select([c for c in flightsDf.columns if (dict(flightsDf.dtypes)[c] == "int") & (c not in {'binaryArrDelay', 'binaryDeptDelay', 'multiClassArrDelay', 'multiClassDeptDelay'})]).summary("count","mean","stddev", "min", "25%", "75%", "max").show()

[Stage 13:>                                                         (0 + 1) / 1]

+-------+--------------------+-----------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+--------+---------+
|summary|                YEAR|            MONTH|               DAY|       DAY_OF_WEEK|     FLIGHT_NUMBER|SCHEDULED_DEPARTURE|    DEPARTURE_TIME|   DEPARTURE_DELAY|         TAXI_OUT|        WHEELS_OFF|    SCHEDULED_TIME|     ELAPSED_TIME|          AIR_TIME|         DISTANCE|         WHEELS_ON|          TAXI_IN| SCHEDULED_ARRIVAL|      ARRIVAL_TIME|    ARRIVAL_DELAY|DIVERTED|CANCELLED|
+-------+--------------------+-----------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+------------------+------------------+-------

                                                                                

### 1.4.2 Show number of records in each class (binary and multiclass)<a class="anchor" id="1.4.2"></a>
[Back to top](#table)

Write code to count and show number of records in each class for the binary classification (i.e., class labels 0 and 1) and the multiclass classification (i.e., class labels 0, 1, 2) based on the columns binaryArrDelay, binaryDeptDelay, multiClassArrDelay and multiClassDeptDelay. Do you see any class imbalance? Describe what you observe and discuss how it could impact classification.

In [10]:
# count the number of records in binaryArrDelay
flightsDf.groupBy('binaryArrDelay').count().show()

# count the number of records in binaryDeptDelay
flightsDf.groupBy('binaryDeptDelay').count().show()

# count the number of records in multiClassArrDelay
flightsDf.groupBy('multiClassArrDelay').count().show()

# count the number of records in multiClassDeptDelay
flightsDf.groupBy('multiClassDeptDelay').count().show()

+--------------+------+
|binaryArrDelay| count|
+--------------+------+
|             1|209055|
|             0|362674|
+--------------+------+

+---------------+------+
|binaryDeptDelay| count|
+---------------+------+
|              1|211228|
|              0|360501|
+---------------+------+

+------------------+------+
|multiClassArrDelay| count|
+------------------+------+
|                 1| 82428|
|                 2| 85692|
|                 0|403609|
+------------------+------+

+-------------------+------+
|multiClassDeptDelay| count|
+-------------------+------+
|                  1| 78843|
|                  2| 84831|
|                  0|408055|
+-------------------+------+



In the binary classification, it could be observed that the number of records for label 0 (+-360000) is higher than that for label 1 (+-210000), which is a difference of approximately 1.7. As for the multiclass classification, while number of records for lables 1 and 2 are relatively the same, the number of record for label 0 is almost 5 times larger, which creates a significant class imbalance. 
Obviously, the impact of the class imbalance is negative, since it leads to the machine learning models over-classify the larger classes, which is 0 for both binary and multiclass classification, because of their increaed prior probability. Thus, since the machine learning classifier is more biased towards the majority class, the instances of the smaller classes are missclassifies.

# 2 Feature extraction and ML Training <a class="anchor" id="2"></a>
In this section, we will need to use PySpark DataFrame functions and ML packages for data preparation, model building and evaluation.

## 2.1 Prepare the feature columns <a class="anchor" id="2.1"></a>
### 2.1.1 Add additional feature column "DEPT_TIME_FLAG"<a class="anchor" id="2.1.1"></a>
[Back to top](#table)

Write code to prepare an additional feature column called “DEPT_TIME_FLAG” (in string type), by mapping the values in the column “SCHEDULED_DEPARTURE” (in integer-type) to one of the five categories in DEPT_TIME_FLAG, according to the following ranges:

SCHEDULED_DEPARTURE DEPT_TIME_FLAG
    * 0000 - 0500 Midnight
    * 0500 - 1100 Morning
    * 1100 - 1600 Afternoon
    * 1600 - 2000 Evening
    * 2000 - 2359 Night

In [11]:
#mapping the values
flightsDf = flightsDf.withColumn('DEPT_TIME_FLAG', F.when((F.col('SCHEDULED_DEPARTURE') >= 0) & (F.col('SCHEDULED_DEPARTURE') < 500), "Midnight")
                                 .when((F.col('SCHEDULED_DEPARTURE') >= 500) & (F.col('SCHEDULED_DEPARTURE') < 1100), "Morning")
                                 .when((F.col('SCHEDULED_DEPARTURE') >= 1100) & (F.col('SCHEDULED_DEPARTURE') < 1600), "Afternoon")
                                 .when((F.col('SCHEDULED_DEPARTURE') >= 1600) & (F.col('SCHEDULED_DEPARTURE') < 2000), "Evening")
                                 .when((F.col('SCHEDULED_DEPARTURE') >= 2000) & (F.col('SCHEDULED_DEPARTURE') <= 2359), "Night")
                                 .otherwise("Invalid value"))
#showing the result
flightsDf.show()

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+--------------+---------------+------------------+-------------------+--------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|binaryArrDelay|binaryDeptDelay|multiClassArrDelay|multiClassDeptDelay|DEPT_TIME_FLAG|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+----

### 2.1.2 Select relevant columns (both feature & label columns)<a class="anchor" id="2.1.2"></a>
[Back to top](#table)

Write the code to create a new dataframe consisting of relevant columns, and name it as ‘flightsDf_data’. The relevant columns are given as follows: 'MONTH', 'DAY', 'DAY_OF_WEEK', 'AIRLINE', 'DEPT_TIME_FLAG', 'ELAPSED_TIME','DISTANCE', 'TAXI_IN', 'TAXI_OUT', 'AIR_TIME', 'SCHEDULED_TIME', 'binaryArrDelay', 'binaryDeptDelay', 'multiClassDeptDelay', 'multiClassArrDelay'.

In [12]:
# select the columns and place them into the new DF
flightsDf_data = flightsDf.select('MONTH', 'DAY', 'DAY_OF_WEEK', 'AIRLINE', 'DEPT_TIME_FLAG', 'ELAPSED_TIME','DISTANCE', 'TAXI_IN', 'TAXI_OUT', 'AIR_TIME', 'SCHEDULED_TIME', 'binaryArrDelay', 'binaryDeptDelay', 'multiClassDeptDelay', 'multiClassArrDelay')
# showing the result
flightsDf_data.show()

+-----+---+-----------+-------+--------------+------------+--------+-------+--------+--------+--------------+--------------+---------------+-------------------+------------------+
|MONTH|DAY|DAY_OF_WEEK|AIRLINE|DEPT_TIME_FLAG|ELAPSED_TIME|DISTANCE|TAXI_IN|TAXI_OUT|AIR_TIME|SCHEDULED_TIME|binaryArrDelay|binaryDeptDelay|multiClassDeptDelay|multiClassArrDelay|
+-----+---+-----------+-------+--------------+------------+--------+-------+--------+--------+--------------+--------------+---------------+-------------------+------------------+
|    6| 26|          5|     EV|       Morning|         141|     866|     15|      13|     113|           155|             0|              0|                  0|                 0|
|   12| 19|          6|     WN|       Evening|          96|     634|      4|      13|      79|           115|             0|              0|                  0|                 0|
|    1| 10|          6|     WN|       Morning|          80|     371|      6|      11|      63|      

## 2.2. Preparing any Spark ML Transformers/ Estimators for features and models<a class="anchor" id="2.2"></a>
### 2.2.1 Create Transformers/Estimators for feature columns <a class="anchor" id="2.2.1"></a>
[Back to top](#table)

Write code to create Transformers/Estimators for transforming/assembling the columns selected above in 2.1.2.

In [13]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder

# the categorical columns 
catCols = ['MONTH', 'DAY', 'DAY_OF_WEEK', 'AIRLINE', 'DEPT_TIME_FLAG']
# the numerical columns
intCols = [ 'ELAPSED_TIME','DISTANCE', 'TAXI_IN','TAXI_OUT', 'AIR_TIME', 'SCHEDULED_TIME']

# label columns to exclude 
labelCols = ['binaryArrDelay', 'binaryDeptDelay', 'multiClassDeptDelay', 'multiClassArrDelay']
# get all the input columns 
inputCols = [x for x in flightsDf_data.columns if x not in labelCols] 


# 1. Indexer only for categorical:
outputCols_index=[f'{x}_index' for x in catCols] # defining the output columns
indexer = StringIndexer(inputCols=catCols, outputCols=outputCols_index) #initializing the string indexer
indexer = indexer.setHandleInvalid("skip")

# 2.1 Encoder only for categorical:
outputCols_enc=[f'{x}_vec' for x in catCols] # defining the output columns
encoder = OneHotEncoder(inputCols=outputCols_index, outputCols=outputCols_enc) # initializing the encoder

# make the input for assembler both categorical and numerical
inputColsAssembler = outputCols_enc + intCols

# 3. Assembler:
assembler = VectorAssembler(inputCols= inputColsAssembler, outputCol="features")


### 2.2.2 Create ML model Estimators for DT & Gradient Boosted tree for binary classification<a class="anchor" id="2.2.2"></a>
[Back to top](#table)

Create ML model Estimators for Decision Tree and Gradient Boosted Tree model for binary classification for both arrival and departure delays from the labels that you have created at task 1.3.1 (a) (PLEASE DO NOT fit/transform the data yet).

In [14]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import GBTClassifier

# binary arrival delay
dt_arrival = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'binaryArrDelay') # Decision Tree 
gbt_arrival = GBTClassifier(featuresCol="features",labelCol="binaryArrDelay") # Gradient Boosted Tree 

#binary departure delay
dt_departure = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'binaryDeptDelay') # Decision Tree 
gbt_departure = GBTClassifier(featuresCol="features",labelCol="binaryDeptDelay") # Gradient Boosted Tree 

### 2.2.3 Create ML model Estimators for Naïve Bayes for multi-class classification<a class="anchor" id="2.2.3"></a>
[Back to top](#table)

Create ML model Estimators for Naive Bayes model for multiclass classification for only arrival delay from the labels that you have created at task 1.3.1 (b) (PLEASE DO NOT fit/transform the data yet).

In [15]:
from pyspark.ml.classification import NaiveBayes

# creating the naive bayes model for arrival delay only
nb_arrival = NaiveBayes(labelCol= "multiClassArrDelay", featuresCol ='features', smoothing=1.0, modelType="multinomial")

### 2.2.4 Create pipeline model <a class="anchor" id="2.2.4"></a>
[Back to top](#table)

Write code to include the above Transformers/Estimators into pipelines for all tasks (PLEASE DO NOT fit/transform the data yet).

In [16]:
from pyspark.ml import Pipeline

#getting the stages from 2.2.1
stage_1 = indexer
stage_2 = encoder
stage_3 = assembler


#Pipeline for binary arrival delay Decision Tree
stage_4 = dt_arrival
pipeline_dt_arr = Pipeline(stages=[stage_1,stage_2,stage_3, stage_4 ])

#Pipeline for binary arrival delay Gradient Boosted Tree 
stage_4 = gbt_arrival
pipeline_gbt_arr = Pipeline(stages=[stage_1,stage_2,stage_3, stage_4 ])

#Pipeline for binary departure delay Decision Tree
stage_4 = dt_departure
pipeline_dt_dept = Pipeline(stages=[stage_1,stage_2,stage_3, stage_4 ])

#Pipeline for binary departure delay Gradient Boosted Tree 
stage_4 = gbt_departure
pipeline_gbt_dept = Pipeline(stages=[stage_1,stage_2,stage_3, stage_4 ])

#Pipeline for Multiclass Arrival Delay
stage_4 = nb_arrival
pipeline_nb_arr = Pipeline(stages=[stage_1,stage_2,stage_3, stage_4 ])

## 2.3. Preparing the training and testing data <a class="anchor" id="2.3"></a>
Write code to randomly split the data into 80 percent and 20 percent proportion as training and testing data. You may use seed for the analysis purposes. This training and testing data will be used for model evaluation of all tasks in 2.4.
[Back to top](#table)

In [17]:
train, test = flightsDf_data.randomSplit([0.2, 0.8], seed = 567) #567 because it is prime

## 2.4. Training and evaluating models <a class="anchor" id="2.4"></a>
For three use cases below, please follow the instructions

[Back to top](#table)

### 2.4.1 Binary Classification tasks<a class="anchor" id="2.4.1"></a>
Binary classification task (Using Decision Tree and Gradient Boosted Tree) for both arrival and departure delay classification (4 models).

a. Write code to use the corresponding ML Pipelines to train the models on the training data. (Use fit/transform here)


In [18]:
# For the decision tree arrival
pipelineModel_dt_arr = pipeline_dt_arr.fit(train)
predictions_dt_arr = pipelineModel_dt_arr.transform(test)

# For the Gradient Boosted Tree arrival
pipelineModel_gbt_arr = pipeline_gbt_arr.fit(train)
predictions_gbt_arr = pipelineModel_gbt_arr.transform(test)

# For the decision tree departure
pipelineModel_dt_dept = pipeline_dt_dept.fit(train)
predictions_dt_dept = pipelineModel_dt_dept.transform(test)

# For the Gradient Boosted Tree departure 
pipelineModel_gbt_dept = pipeline_gbt_dept.fit(train)
predictions_gbt_dept = pipelineModel_gbt_dept.transform(test)

                                                                                


b. For both models and for both delays, code to display the count of each combination of late/not late label and prediction label in formats as shown in the example Fig.1.


In [19]:
# For the decision tree arrival
print("Decision tree")
combination_dt_arr = predictions_dt_arr.groupby('binaryArrDelay', 'prediction')
combination_dt_arr = combination_dt_arr.agg(F.count('binaryArrDelay').alias('count'))
combination_dt_arr.withColumnRenamed('binaryArrDelay', 'late (arrival)').show()

# For the decision tree departure
combination_dt_dept = predictions_dt_dept.groupby('binaryDeptDelay', 'prediction')
combination_dt_dept = combination_dt_dept.agg(F.count('binaryDeptDelay').alias('count'))
combination_dt_dept.withColumnRenamed('binaryDeptDelay', 'late (departure)').show()

print("Gradient Boosted Tree")
# For the Gradient Boosted Tree arrival
combination_gbt_arr = predictions_gbt_arr.groupby('binaryArrDelay', 'prediction')
combination_gbt_arr = combination_gbt_arr.agg(F.count('binaryArrDelay').alias('count'))
combination_gbt_arr.withColumnRenamed('binaryArrDelay', 'late (arrival)').show()

# For the Gradient Boosted Tree departure 
combination_gbt_dept = predictions_gbt_dept.groupby('binaryDeptDelay', 'prediction')
combination_gbt_dept = combination_gbt_dept.agg(F.count('binaryDeptDelay').alias('count'))
combination_gbt_dept.withColumnRenamed('binaryDeptDelay', 'late (departure)').show()

Decision tree


                                                                                

+--------------+----------+------+
|late (arrival)|prediction| count|
+--------------+----------+------+
|             1|       0.0|123486|
|             0|       0.0|273902|
|             1|       1.0| 43944|
|             0|       1.0| 16147|
+--------------+----------+------+



                                                                                

+----------------+----------+------+
|late (departure)|prediction| count|
+----------------+----------+------+
|               1|       0.0|122337|
|               0|       0.0|254375|
|               1|       1.0| 46774|
|               0|       1.0| 33993|
+----------------+----------+------+

Gradient Boosted Tree


                                                                                

+--------------+----------+------+
|late (arrival)|prediction| count|
+--------------+----------+------+
|             1|       0.0|104173|
|             0|       0.0|270033|
|             1|       1.0| 63257|
|             0|       1.0| 20016|
+--------------+----------+------+





+----------------+----------+------+
|late (departure)|prediction| count|
+----------------+----------+------+
|               1|       0.0|120282|
|               0|       0.0|254933|
|               1|       1.0| 48829|
|               0|       1.0| 33435|
+----------------+----------+------+



                                                                                

c. Computing the AUC, accuracy, recall, and precision for the late/not late label from each model testing result using pyspark MLlib/ML APIs.


In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

predictions = [predictions_dt_arr, predictions_dt_dept, predictions_gbt_arr, predictions_gbt_dept]
namings = ['Decision tree arrival', 'Decision tree departure', 'Gradient Boosted Tree arrival','Gradient Boosted Tree departure']
labels = ['binaryArrDelay', 'binaryDeptDelay', 'binaryArrDelay', 'binaryDeptDelay']

for i in range(4):
    print(namings[i])
    
    #calculating AUC
    evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol = labels[i] , metricName = 'areaUnderROC')
    auc = evaluator.evaluate(predictions[i])
    print("AUC: "+str(auc))
    
    # calculating accuracy
    evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol=labels[i], metricName='accuracy')
    accuracy = evaluator.evaluate(predictions[i])
    print("Accuracy: "+str(accuracy))
    
    #calculating precision
    evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol=labels[i], metricName='precisionByLabel')
    precision = evaluator.evaluate(predictions[i])
    print("Precision: "+str(precision))
    
    #calculating recall
    evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol=labels[i], metricName='recallByLabel')
    recall = evaluator.evaluate(predictions[i])
    print("Recall: "+str(recall))
    
    print("-----------------------------------------------------------------------------")
    

Decision tree arrival


                                                                                

AUC: 0.3955740101191193


                                                                                

Accuracy: 0.6947772466058552


                                                                                

Precision: 0.6892558406393752


                                                                                

Recall: 0.9443300959493052
-----------------------------------------------------------------------------
Decision tree departure


                                                                                

AUC: 0.453202115095796


                                                                                

Accuracy: 0.6582793964313116




Precision: 0.6752505893096052




Recall: 0.8821193752427454
-----------------------------------------------------------------------------
Gradient Boosted Tree arrival


                                                                                

AUC: 0.7573592611653716


                                                                                

Accuracy: 0.7285361732451107


                                                                                

Precision: 0.7216159014019016


                                                                                

Recall: 0.9309909704911928
-----------------------------------------------------------------------------
Gradient Boosted Tree departure


                                                                                

AUC: 0.6670489927417441


                                                                                

Accuracy: 0.6639911340192665


                                                                                

Precision: 0.6794317924390016




Recall: 0.8840544027076513
-----------------------------------------------------------------------------


                                                                                

d. For each classification task, we discuss which is the better model based on performance metric above, and persist the better model and name the persisted models as ‘arrival_delay_prediction_model’ and ‘departure_delay_prediction_model’

Based on the performance metric above, it may be observed that for the arrival delay value, GBT has slightly higher values for accuracy (+-0.727) and precision (+-0.720) than the DT with accuracy of +-0.693 and precision +-0.685; and significantly higher value of AUC, which is +-0.756 for GBT and +-0.400 for DT. However, recall for DT is slightly higher (+-0.956) than that of GBT (+-0.932). Accuracy, precision and AUC are vital, when assesing the model performance, since precision states how many classes are actually positive out of all the correctly predicted positive classes and recall evaluates how many predictions are correct out of all the positive classes , while AUC measurea of the ability of a classifier to distinguish between classes. Thus, it may be concluded that the GBT model is better than the DT model for the arrival delay. 

As for the departure delay, all the values of performance metric for GBT are noticeably higher than those for DT, thus for the DT, GBT is is better model.

In [21]:
arrival_delay_prediction_model = predictions_gbt_arr
departure_delay_prediction_model = predictions_gbt_dept

e. For each classification task and for each classifier (i.e. Decision Tree and Gradient Boosted Tree),we print out the tree structure, and the top-3 features with each corresponding feature importance.

In [22]:
import pandas as pd
# the function below was taken from https://www.timlrx.com/blog/feature-selection-using-feature-importance-score-creating-a-pyspark-estimator
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

print("Decision tree arrival")
print("---------------------")
# print out tree for Decision tree arrival
print(pipelineModel_dt_arr.stages[-1].toDebugString)
# for i in range(10):
print(ExtractFeatureImp(pipelineModel_dt_arr.stages[-1].featureImportances, predictions_dt_arr, "features").head(3))
print("-----------------------------------------------------------------------------")

print("Decision tree departure")
print("------------------------")
# print out tree for Decision tree departure
print(pipelineModel_dt_dept.stages[-1].toDebugString)
print(ExtractFeatureImp(pipelineModel_dt_dept.stages[-1].featureImportances, predictions_dt_dept, "features").head(3))
print("-----------------------------------------------------------------------------")

print("Gradient Boosted Tree arrival")
print("------------------------------")
# print out tree for Gradient Boosted Tree arrival
print(pipelineModel_gbt_arr.stages[-1].toDebugString)
print(ExtractFeatureImp(pipelineModel_gbt_arr.stages[-1].featureImportances, predictions_gbt_arr, "features").head(3))
print("-----------------------------------------------------------------------------")

print("Gradient Boosted Tree departure")
print("------------------------------")
# print out tree for Gradient Boosted Tree departure
print(pipelineModel_gbt_dept.stages[-1].toDebugString)
print(ExtractFeatureImp(pipelineModel_gbt_dept.stages[-1].featureImportances, predictions_gbt_dept, "features").head(3))


Decision tree arrival
---------------------
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_27460062f268, depth=5, numNodes=23, numClasses=2, numFeatures=70
  If (feature 67 <= 24.5)
   If (feature 66 <= 18.5)
    Predict: 0.0
   Else (feature 66 > 18.5)
    If (feature 66 <= 27.5)
     If (feature 67 <= 14.5)
      If (feature 47 in {1.0})
       Predict: 1.0
      Else (feature 47 not in {1.0})
       Predict: 0.0
     Else (feature 67 > 14.5)
      Predict: 1.0
    Else (feature 66 > 27.5)
     Predict: 1.0
  Else (feature 67 > 24.5)
   If (feature 67 <= 33.5)
    If (feature 69 <= 159.5)
     Predict: 1.0
    Else (feature 69 > 159.5)
     If (feature 64 <= 166.5)
      Predict: 0.0
     Else (feature 64 > 166.5)
      Predict: 1.0
   Else (feature 67 > 33.5)
    If (feature 69 <= 159.5)
     Predict: 1.0
    Else (feature 69 > 159.5)
     If (feature 64 <= 166.5)
      If (feature 51 in {1.0})
       Predict: 1.0
      Else (feature 51 not in {1.0})
       Predict: 0.0

### 2.4.2 Multiclass Classification tasks<a class="anchor" id="2.4.2"></a>
Multiclass classification task (using Naive Bayes) for only arrival delay classification (1 model).

a. Code to use the corresponding ML Pipelines to train the model on the training data (fit/transform). This label is generated from task 1.3.1.b.

In [23]:
# For the Multiclass classification arrival
pipelineModel_nb_arr = pipeline_nb_arr.fit(train)
predictions_nb_arr = pipelineModel_nb_arr.transform(test)



b. Code to display the count of each combination of early/on-time/late label and prediction label.

In [24]:
# For the Multiclass classification arrival
print("Multiclass classification arrival")
combination_nb_arr = predictions_nb_arr.groupby('multiClassArrDelay', 'prediction')
combination_nb_arr = combination_nb_arr.agg(F.count('multiClassArrDelay').alias('count'))
combination_nb_arr.withColumnRenamed('multiClassArrDelay', 'late (arrival)').show()

Multiclass classification arrival




+--------------+----------+------+
|late (arrival)|prediction| count|
+--------------+----------+------+
|             2|       1.0|  1124|
|             1|       0.0| 31292|
|             0|       2.0|120589|
|             0|       0.0|199176|
|             1|       2.0| 33170|
|             2|       2.0| 35482|
|             1|       1.0|  1525|
|             2|       0.0| 32007|
|             0|       1.0|  3114|
+--------------+----------+------+





c. Computing the AUC, accuracy, recall, and precision for the early/on-time/late label from Naive Bayes model from pyspark MLlib/ML APIs.

In [25]:
print("Multiclass classification arrival")

'''
#calculating AUC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol = 'multiClassArrDelay' , metricName = 'areaUnderROC')
auc = evaluator.evaluate(predictions_nb_arr)
print("AUC: "+str(auc))
'''    
# calculating accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='multiClassArrDelay', metricName='accuracy')
accuracy = evaluator.evaluate(predictions_nb_arr)
print("Accuracy: "+str(accuracy))
    
#calculating precision
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='multiClassArrDelay', metricName='precisionByLabel')
precision = evaluator.evaluate(predictions_nb_arr)
print("Precision: "+str(precision))
    
#calculating recall
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='multiClassArrDelay', metricName='recallByLabel')
recall = evaluator.evaluate(predictions_nb_arr)
print("Recall: "+str(recall))

Multiclass classification arrival


                                                                                

Accuracy: 0.5162706922066368


                                                                                

Precision: 0.758837984569959




Recall: 0.6168750522641608




d. Discussing which metric is more proper for measuring the model performance on predicting early/on-time/late events, in order to give the performers good recommendations:

 In my opinion, precision is the most proper metric for measuring the model performance on predicting early/on-time/late events. Precision is the measurement of how many classes are actually positive out of all the positive classes we have predicted. In other words, it measures how many of predicted as 'late' events are actually late.

e. Discussing the ways the classification performance can be improved:

As it may be observed from the results is (c), even though the accuracy, precision and recall values are relatively high, the AUC, which is the measure of the ability of a classifier to distinguish between classes, is as low as +-0.442. AUC is an important metric in measuring the performance of a model at distinguishing between the positive and negative classes. 
In order to improve the AUC metric, and the classification performance overall there are several possible approaches:
1. Feature normalization and scaling: makes the values of each feature in the data have zero-mean and unit-variance. Improves the performance of the linear model
2. Improve class imbalance: as it may be noticed from part (b), occurence of some classes, such as [0, 0.0] is extremely high compared to other classes. The approaches such as setting class weights or performing the data sampling may significantly improve the performance
3. Hyperparameter tuning: use algorithms such as grid search or random search to tune the data and hyper-parameters to get a best fit. 

In [26]:
pipelineModel_dt_arr.save("binary_arrival_pipeline_model_dt")  # Save Decision Tree model
pipelineModel_gbt_arr.save("binary_arrival_pipeline_model_gbt")  # Save GBT model

24/10/22 14:22:07 ERROR Instrumentation: java.io.IOException: Path binary_arrival_pipeline_model_dt already exists. To overwrite it, please use write.overwrite().save(path) for Scala and use write().overwrite().save(path) for Java and Python.
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:683)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.super$save(Pipeline.scala:344)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$4(Pipeline.scala:344)
	at org.apache.spark.ml.MLEvents.withSaveInstanceEvent(events.scala:174)
	at org.apache.spark.ml.MLEvents.withSaveInstanceEvent$(events.scala:169)
	at org.apache.spark.ml.util.Instrumentation.withSaveInstanceEvent(Instrumentation.scala:42)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$3(Pipeline.scala:344)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$3$adapted(Pipeline

Py4JJavaError: An error occurred while calling o2440.save.
: java.io.IOException: Path binary_arrival_pipeline_model_dt already exists. To overwrite it, please use write.overwrite().save(path) for Scala and use write().overwrite().save(path) for Java and Python.
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:683)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.super$save(Pipeline.scala:344)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$4(Pipeline.scala:344)
	at org.apache.spark.ml.MLEvents.withSaveInstanceEvent(events.scala:174)
	at org.apache.spark.ml.MLEvents.withSaveInstanceEvent$(events.scala:169)
	at org.apache.spark.ml.util.Instrumentation.withSaveInstanceEvent(Instrumentation.scala:42)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$3(Pipeline.scala:344)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$3$adapted(Pipeline.scala:344)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.save(Pipeline.scala:344)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)


In [None]:
import shutil

# Zip the saved model directory (for Decision Tree)
shutil.make_archive('binary_arrival_pipeline_model_dt', 'zip', 'binary_arrival_pipeline_model_dt')

# Zip the saved model directory (for Gradient Boosted Tree)
shutil.make_archive('binary_arrival_pipeline_model_gbt', 'zip', 'binary_arrival_pipeline_model_gbt')