# Predicting flight delays using PySpark

In this notebook we will predict flight delays using PySpark. We will use two basic machine learning models, Decision Tree and Logistic Regression, to achieve this task.

This notebook is based on [RASHID60's notebook](https://www.kaggle.com/code/rashid60/ml-with-pyspark-predicting-flight-delays) and is used just for learning purposes.

## Input Data

Data columns:
- mon: month (int between 1 and 12)
- dom: day of month (int between 1 and 31)
- dow: day of week (int; 1 = Monday and 7 = Sunday)
- carrier: carrier (str; IATA code)
- flight: flight number (int)
- org: origin airport (str; IATA code)
- mile: distance (int; miles)
- depart: departure time (float; decimal hour)
- duration: expected duration (int; minutes)
- delay: delay (int; minutes)

IATA: International Air Transport Association.
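To make the "decimal hour" format concrete, here is a small sketch that turns a `depart` value into clock time. It assumes (the data description does not say so explicitly) that the fractional part is a fraction of an hour, so 8.18 means 8.18 hours after midnight; `decimal_hour_to_hhmm` is a hypothetical helper, not part of the notebook.

```python
import math


def decimal_hour_to_hhmm(depart: float) -> str:
    """Convert a decimal hour such as 8.18 into an "HH:MM" string."""
    # Convert to whole minutes first (rounding halves up) so that values
    # like 12.995 roll over to 13:00 instead of producing 12:60
    total_minutes = math.floor(depart * 60 + 0.5)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}"


# depart values copied from the first rows of flights-larger.csv
for value in [8.18, 15.5, 21.17, 13.0]:
    print(value, "->", decimal_hour_to_hhmm(value))
```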

### Importing packages and libs

In [1]:
from pyspark.sql import SparkSession

# Note: importing `round` this way shadows Python's built-in round() in this notebook
from pyspark.sql.functions import round
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

In [2]:
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('FlightDelayPred') \
                    .getOrCreate()

# Get Spark Version
print(spark.version)

3.1.2


### Data Exploration

In [3]:
flights_df = spark.read.csv(
                            "flights-larger.csv",
                            sep=",",
                            header=True,
                            inferSchema=True,
                            nullValue="NA")

In [5]:
# Get number of records
print(f"The data contains {flights_df.count()} records.")

The data contains 275000 records.


In [6]:
# Check first 10 rows
flights_df.show(10)

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 10| 10|  1|     OO|  5836|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|  5866|ORD| 466|  15.5|     102| null|
| 11| 22|  1|     OO|  6016|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|   199|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|  1675|SJC| 386| 12.92|      85|   22|
|  3| 28|  1|     B6|   377|LGA|1076| 13.33|     182|   70|
|  5| 28|  6|     B6|   904|ORD| 740|  9.58|     130|   47|
|  1| 19|  2|     UA|   820|SFO| 679| 12.75|     123|  135|
|  8|  5|  5|     US|  2175|LGA| 214|  13.0|      71|  -10|
|  5| 27|  5|     AA|  1240|ORD|1197| 14.42|     195|  -11|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 10 rows



In [11]:
# Check data types
print(flights_df.dtypes)

[('mon', 'int'), ('dom', 'int'), ('dow', 'int'), ('carrier', 'string'), ('flight', 'int'), ('org', 'string'), ('mile', 'int'), ('depart', 'double'), ('duration', 'int'), ('delay', 'int')]


## Data preparation

**Data Cleaning**
- Remove the uninformative "flight" column
- Remove rows with missing values

**Column/Data manipulation**
- Convert the distance from miles to kilometers (new column "km")
- A flight is considered "delayed" when it arrives 15 minutes or more after its scheduled time
- Create a new integer column "label" (1 = delayed, 0 = not delayed)
- Convert the columns that hold categorical data ("carrier", "org") into indexed numerical values

**Assembling columns**
- Consolidate all predictor columns into a single vector column ("features")
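The cleaning and labeling rules listed above can be sketched row by row in plain Python on a few records copied from the table in the Data Exploration section. This is only an illustration: `prepare_row` and `round_half_up` are hypothetical helpers, while the notebook does the same work with DataFrame operations. The sketch reuses the notebook's choices: the 1.609 conversion factor, rounding to whole kilometers, and the `delay >= 15` threshold.

```python
import math
from typing import Optional

DELAY_THRESHOLD_MIN = 15  # same cut-off as the notebook's `delay >= 15`
MILES_TO_KM = 1.609       # same conversion factor as the notebook


def round_half_up(x: float) -> float:
    # Spark's round() rounds halves up, unlike Python's built-in round()
    # (banker's rounding); valid for the non-negative distances used here
    return float(math.floor(x + 0.5))


def prepare_row(row: dict) -> Optional[dict]:
    """Hypothetical row-level version of the notebook's preparation steps."""
    # 1) Drop the uninformative "flight" column
    row = {k: v for k, v in row.items() if k != "flight"}
    # 2) Discard records with any missing value
    if any(v is None for v in row.values()):
        return None
    # 3) Replace "mile" with "km", rounded to whole kilometers
    out = {k: v for k, v in row.items() if k != "mile"}
    out["km"] = round_half_up(row["mile"] * MILES_TO_KM)
    # 4) label = 1 if the flight is delayed by 15 minutes or more, else 0
    out["label"] = int(row["delay"] >= DELAY_THRESHOLD_MIN)
    return out


# Three records copied from the first rows shown in the Data Exploration section
rows = [
    {"mon": 10, "dom": 10, "dow": 1, "carrier": "OO", "flight": 5836, "org": "ORD",
     "mile": 157, "depart": 8.18, "duration": 51, "delay": 27},
    {"mon": 1, "dom": 4, "dow": 1, "carrier": "OO", "flight": 5866, "org": "ORD",
     "mile": 466, "depart": 15.5, "duration": 102, "delay": None},
    {"mon": 11, "dom": 22, "dow": 1, "carrier": "OO", "flight": 6016, "org": "ORD",
     "mile": 738, "depart": 7.17, "duration": 127, "delay": -19},
]

for r in rows:
    print(prepare_row(r))
```

The second record has no `delay` value, so it is discarded, just as it disappears between the first table and the labeled table in the notebook.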

In [15]:
# Removing "flight" column
flights_drop_flight_df = flights_df.drop("flight")

# Removing records with missing "delay" values
flights_valid_delay_df = flights_drop_flight_df.filter("delay IS NOT NULL")

# Remove records with missing values
flights_no_missing_df = flights_valid_delay_df.dropna()
print(f"The data now contains {flights_no_missing_df.count()} records.")

The data now contains 258289 records.


In [17]:
# Convert distance from miles to km (rounded to whole km) and drop the "mile" column
flights_km_df = flights_no_missing_df.withColumn("km", round(flights_no_missing_df.mile * 1.609, 0)) \
                                .drop("mile")

In [18]:
# Creating "label" column for identifying whether a flight is delayed or not
flights_label_df = flights_km_df.withColumn("label", (flights_km_df.delay >= 15).cast("integer"))

# Show first 5 records to check the result
flights_label_df.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1187.0|    0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3617.0|    1|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1731.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



Check [StringIndexer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html#stringindexer) in Spark docs.
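By default `StringIndexer` orders categories by descending frequency (`stringOrderType="frequencyDesc"`), so the most common value gets index 0.0; the Spark documentation states that ties are then broken alphabetically. The `org_idx` of 0.0 assigned to ORD in the output below therefore means ORD is the most frequent origin airport in the data. A plain-Python sketch of that ordering on a hypothetical toy sample (`frequency_desc_index` is an illustrative helper, not a Spark API):

```python
from collections import Counter


def frequency_desc_index(values):
    """Map each distinct string to a float index, most frequent first.

    Ties are broken alphabetically, mirroring StringIndexer's documented
    behaviour for its default stringOrderType="frequencyDesc".
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical toy sample of origin airports (not the real frequencies)
sample_org = ["ORD", "SFO", "ORD", "JFK", "SFO", "ORD", "LGA"]
mapping = frequency_desc_index(sample_org)
print(mapping)
print([mapping[o] for o in sample_org])
```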

In [26]:
# Create an indexer, which identifies categories and then creates a new column
# with numeric index values
flights_indexed_df = StringIndexer(inputCol="carrier", outputCol="carrier_idx") \
                                    .fit(flights_label_df) \
                                    .transform(flights_label_df)
# Repeating the process for org column
flights_indexed_df = StringIndexer(inputCol="org", outputCol="org_idx") \
                                    .fit(flights_indexed_df) \
                                    .transform(flights_indexed_df)

In [27]:
flights_indexed_df.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|        2.0|    0.0|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1187.0|    0|        2.0|    0.0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3617.0|    1|        4.0|    2.0|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|        3.0|    5.0|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1731.0|    1|        4.0|    3.0|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
only showing top 5 rows



Check [VectorAssembler](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) in Spark docs.
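Conceptually, `VectorAssembler` reads the listed input columns of each row, in order, and packs them into a single vector. The plain-Python sketch below (the `assemble` helper is hypothetical) reproduces the first `features` value printed by the next cell. It also makes one design choice visible: `delay` is not among the inputs, which matters because `label` is derived directly from it, and including it would leak the answer into the features. In Spark the result is an ML `Vector`, which may be stored in sparse form when that is more compact.

```python
# Same column order as `assembler_cols` in the next cell; the order fixes each feature's position
ASSEMBLER_COLS = ["mon", "dom", "dow", "carrier_idx", "org_idx", "km", "depart", "duration"]


def assemble(row: dict, cols=ASSEMBLER_COLS) -> list:
    """Concatenate the listed numeric columns, in order, into one list of floats."""
    return [float(row[c]) for c in cols]


# First indexed record, copied from the table above; "delay" and "label" are
# present in the row but deliberately left out of the features
row = {"mon": 10, "dom": 10, "dow": 1, "carrier": "OO", "org": "ORD",
       "depart": 8.18, "duration": 51, "delay": 27, "km": 253.0, "label": 1,
       "carrier_idx": 2.0, "org_idx": 0.0}
print(assemble(row))
```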

In [28]:
# Creating an assembler object
assembler_cols = ['mon', 'dom', 'dow', 'carrier_idx', 'org_idx', 'km', 'depart', 'duration']
assembler = VectorAssembler(inputCols=assembler_cols, outputCol="features")

# Consolidate predictor columns
flights_assembled_df = assembler.transform(flights_indexed_df)

# Checking the resulting column
flights_assembled_df.select("features", "delay").show(5, truncate=False)

+-----------------------------------------+-----+
|features                                 |delay|
+-----------------------------------------+-----+
|[10.0,10.0,1.0,2.0,0.0,253.0,8.18,51.0]  |27   |
|[11.0,22.0,1.0,2.0,0.0,1187.0,7.17,127.0]|-19  |
|[2.0,14.0,5.0,4.0,2.0,3617.0,21.17,365.0]|60   |
|[5.0,25.0,3.0,3.0,5.0,621.0,12.92,85.0]  |22   |
|[3.0,28.0,1.0,4.0,3.0,1731.0,13.33,182.0]|70   |
+-----------------------------------------+-----+
only showing top 5 rows



## ML Models

### 1) Decision Tree

Offers inherent simplicity and explainability.

In [36]:
# Split into training and testing sets with 80-20 ratio
flights_train, flights_test = flights_assembled_df.randomSplit([0.8, 0.2], seed=2022)

# Check the share of records in each split
total_count = flights_assembled_df.count()
training_ratio = flights_train.count() / total_count
testing_ratio = flights_test.count() / total_count

print(f"Training ratio: {training_ratio}\nTesting ratio: {testing_ratio}")

Training ratio: 0.799352663102184
Testing ratio: 0.20064733689781603
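The ratios above are close to, but not exactly, 80/20. That is expected, because `randomSplit` assigns rows to splits by random sampling rather than cutting the data at a fixed position, so the split sizes only approximate the weights. The plain-Python sketch below illustrates the idea with an independent random draw per row; it is a simplification, not Spark's actual implementation, and `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries in [0, 1], e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point rounding at the top end
    return splits


n_rows = 258289  # same number of rows as the cleaned flights data
train, test = random_split(range(n_rows), [0.8, 0.2], seed=2022)
print(f"Training ratio: {len(train) / n_rows}\nTesting ratio: {len(test) / n_rows}")
```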


In [38]:
# Create a Decision Tree classifier and fit it to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(flights_train)

# Creating predictions on test data
tree_prediction = tree_model.transform(flights_test)
tree_prediction.select("label", "prediction", "probability").show(5, truncate=False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|0    |1.0       |[0.3587724540373326,0.6412275459626674] |
|1    |0.0       |[0.6453918288098484,0.3546081711901516] |
|0    |1.0       |[0.44187343183718986,0.5581265681628101]|
|0    |1.0       |[0.3587724540373326,0.6412275459626674] |
|0    |1.0       |[0.3587724540373326,0.6412275459626674] |
+-----+----------+----------------------------------------+
only showing top 5 rows



**Evaluate the model**

A confusion matrix gives a useful breakdown of predictions versus known values. It has four cells which represent the counts of:
- True Negatives (TN) — prediction is negative & label is negative
- True Positives (TP) — prediction is positive & label is positive
- False Negatives (FN) — prediction is negative & label is positive
- False Positives (FP) — prediction is positive & label is negative

Using these four measures, we can then calculate the accuracy of the model as follows:

$$\text{Accuracy} = \frac{TN + TP}{TN + TP + FN + FP}$$
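The formula is plain arithmetic on the four counts, so it can be checked outside Spark. This sketch plugs in the decision tree counts from the confusion matrix printed by the next cell and, for context, also computes the accuracy of a trivial baseline that always predicts "delayed" (every true 1 becomes a TP and every true 0 an FP). The `accuracy` helper is hypothetical.

```python
def accuracy(tn: int, tp: int, fn: int, fp: int) -> float:
    """Accuracy = (TN + TP) / (TN + TP + FN + FP)."""
    return (tn + tp) / (tn + tp + fn + fp)


# Decision tree counts, copied from the confusion matrix printed by the next cell
tree_acc = accuracy(tn=15215, tp=17517, fn=8800, fp=10293)

# Baseline: always predict "delayed" (1), so all true 1s are TPs and all true 0s are FPs
baseline_acc = accuracy(tn=0, tp=8800 + 17517, fn=0, fp=15215 + 10293)

print(f"Decision tree accuracy: {tree_acc:.4f}")
print(f"Always-'delayed' baseline: {baseline_acc:.4f}")
```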

In [41]:
# Creating confusion matrix
tree_prediction.groupBy("label", "prediction").count().show()

# Calculating the elements of the confusion matrix
tree_TN = tree_prediction.filter('prediction = 0 AND label = prediction').count()
tree_TP = tree_prediction.filter('prediction = 1 AND label = prediction').count()
tree_FN = tree_prediction.filter('prediction = 0 AND label != prediction').count()
tree_FP = tree_prediction.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
tree_accuracy = (tree_TN + tree_TP) / (tree_TN + tree_TP + tree_FN + tree_FP)
print(f"Accuracy: {tree_accuracy}")

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 8800|
|    0|       0.0|15215|
|    1|       1.0|17517|
|    0|       1.0|10293|
+-----+----------+-----+

Accuracy: 0.6315870718765074


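An accuracy of about 0.63 is only moderate: 19,093 of the 51,825 test flights (8,800 false negatives and 10,293 false positives) are misclassified. For comparison, 26,317 of the test flights are delayed, so always predicting "delayed" would already reach an accuracy of about 0.51.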

### 2) Logistic Regression

Simple and easy to train.
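Part of what makes logistic regression simple is that a fitted model is just one weight per feature plus an intercept: the predicted probability of a delay is the sigmoid of their weighted sum, and the class is obtained by thresholding that probability (Spark's default threshold is 0.5). The plain-Python sketch below uses made-up weights purely for illustration; for a fitted Spark model the learned values are exposed as `coefficients` and `intercept`.

```python
import math


def predict_proba(features, weights, intercept):
    """P(label = 1 | x) = sigmoid(w . x + b) for binary logistic regression."""
    z = sum(w * x for w, x in zip(weights, features)) + intercept
    return 1.0 / (1.0 + math.exp(-z))


def predict(features, weights, intercept, threshold=0.5):
    # Turn the probability into a 0/1 class using the given threshold
    return 1.0 if predict_proba(features, weights, intercept) > threshold else 0.0


# Hypothetical two-feature model, NOT coefficients fitted on the flight data
weights, intercept = [0.5, -0.25], 0.0
for x in ([4.0, 2.0], [0.0, 4.0]):
    p1 = predict_proba(x, weights, intercept)
    # Like Spark's "probability" column, show [P(0), P(1)]
    print(x, [1 - p1, p1], predict(x, weights, intercept))
```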

In [43]:
# Create a logistic regression classifier and train it on the training data
logistic = LogisticRegression().fit(flights_train)

# Creating predictions for the testing data and show confusion matrix
logistic_prediction = logistic.transform(flights_test)
logistic_prediction.groupBy("label", "prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 9747|
|    0|       0.0|14753|
|    1|       1.0|16570|
|    0|       1.0|10755|
+-----+----------+-----+



In [45]:
# Calculating the elements of the confusion matrix
logistic_TN = logistic_prediction.filter('prediction = 0 AND label = prediction').count()
logistic_TP = logistic_prediction.filter('prediction = 1 AND label = prediction').count()
logistic_FN = logistic_prediction.filter('prediction = 0 AND label != prediction').count()
logistic_FP = logistic_prediction.filter('prediction = 1 AND label != prediction').count()

# Calculate precision and recall
logistic_precision = logistic_TP / (logistic_TP + logistic_FP)
logistic_recall = logistic_TP / (logistic_TP + logistic_FN)
print('precision = {:.2f}\nrecall    = {:.2f}'.format(logistic_precision, logistic_recall))

# Find weighted precision
multi_evaluator = MulticlassClassificationEvaluator()
weighted_precision = multi_evaluator.evaluate(logistic_prediction,
                                              {multi_evaluator.metricName: "weightedPrecision"})

# Find AUC
binary_evaluator = BinaryClassificationEvaluator()
auc = binary_evaluator.evaluate(logistic_prediction, {binary_evaluator.metricName: "areaUnderROC"})

precision = 0.61
recall    = 0.63
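Precision is the share of flights predicted as delayed that really were delayed, TP / (TP + FP), and recall is the share of truly delayed flights the model caught, TP / (TP + FN). Because both follow directly from a confusion matrix, the sketch below recomputes them for each model from the counts in the two `groupBy` outputs above, together with a hand-rolled weighted precision. The weighted version assumes Spark's definition (per-class precision averaged with weights equal to each true label's share of the rows), and the helper names are hypothetical. For the logistic regression counts this gives precision 0.61 and recall 0.63; for the decision tree, 0.63 and 0.67.

```python
def calc_precision(tp, fp):
    return tp / (tp + fp)


def calc_recall(tp, fn):
    return tp / (tp + fn)


def calc_weighted_precision(tn, tp, fn, fp):
    """Per-class precision averaged with weights = share of each true label."""
    n = tn + tp + fn + fp
    prec_pos = tp / (tp + fp)   # precision of the "delayed" (1) predictions
    prec_neg = tn / (tn + fn)   # precision of the "not delayed" (0) predictions
    support_pos = tp + fn       # rows whose true label is 1
    support_neg = tn + fp       # rows whose true label is 0
    return (prec_pos * support_pos + prec_neg * support_neg) / n


# Confusion-matrix counts copied from the two groupBy outputs above
matrices = {
    "decision tree": {"tn": 15215, "tp": 17517, "fn": 8800, "fp": 10293},
    "logistic regression": {"tn": 14753, "tp": 16570, "fn": 9747, "fp": 10755},
}

for name, m in matrices.items():
    print(f"{name}: precision = {calc_precision(m['tp'], m['fp']):.2f}, "
          f"recall = {calc_recall(m['tp'], m['fn']):.2f}, "
          f"weighted precision = {calc_weighted_precision(**m):.2f}")
```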


Again, the metrics are decent but not good, so improving the models' predictive performance is left as potential future work.

In [46]:
# Close the Spark session
spark.stop()