Initializing SparkSession and Loading Flight Data

In [1]:
# Attach Google Drive to this Colab runtime so the data stored there can be read.
from google.colab import drive

drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

Mounted at /content/drive


In [2]:
!pip install pyspark


import pyspark
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import SQLTransformer


import matplotlib.pyplot as plt
import numpy as np

# Start (or reuse) a Spark session running in local mode ('local[*]').
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Flight Delay Prediction')
    .getOrCreate()
)

# Load the flight CSV from Drive: the first row is the header, column types
# are inferred from the data, and the literal text 'NA' is read as null.
flights_csv_path = '/content/drive/MyDrive/532/DelayedFlights.csv'
flights_df = spark.read.csv(
    flights_csv_path,
    sep=',',
    header=True,
    inferSchema=True,
    nullValue='NA',
)

# Preview the first five rows, then list every column with its inferred type.
flights_df.show(5)
print(flights_df.dtypes)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=cce825f336dc4904b02b8c8171afe0b888867cecfbe03f28e6427f599c7567d0
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----

Pre-processing the flight data 

In [3]:
# Removing rows with missing data.
# NOTE(review): this runs before the column pruning below, so rows are also
# discarded because of nulls in columns that are about to be removed --
# confirm that this ordering is intended.
flights_df = flights_df.dropna()

# Names of all columns currently in the frame.
cols = flights_df.columns

# Removing columns which don't have much relation with the 'delay'.
# The columns are listed by name rather than taken positionally (last five),
# so re-running this cell cannot silently remove further columns.
delay_cause_cols = [
    'CarrierDelay',
    'WeatherDelay',
    'NASDelay',
    'SecurityDelay',
    'LateAircraftDelay',
]

# Report only the columns that are actually still present.
for c in delay_cause_cols:
    if c in cols:
        print(f"dropping column{c}")

# '_c0' is presumably the unnamed index column of the CSV -- confirm.
# DataFrame.drop is a no-op for names missing from the schema, which keeps
# this cell safe to execute more than once.
flights_df = flights_df.drop('_c0', *delay_cause_cols)
flights_df.show(5)



dropping columnCarrierDelay
dropping columnWeatherDelay
dropping columnNASDelay
dropping columnSecurityDelay
dropping columnLateAircraftDelay
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+
|2008|    1|         3|        4| 1829.0|      1755| 1959.0|      1925|           WN|     3920| N464WN|             90.0|          90.0|   77.0|    34.0|

In [4]:
# Converting non-numeric data into integers for various columns.
# A single multi-column StringIndexer replaces four copy-pasted fit/transform
# calls; each input column still gets its own independent label index.
indexed_columns = {
    'UniqueCarrier': 'carrier_transformed',
    'Origin': 'org_transformed',
    'Dest': 'dest_transformed',
    'TailNum': 'tailnum_transformed',
}
string_indexer = StringIndexer(
    inputCols=list(indexed_columns.keys()),
    outputCols=list(indexed_columns.values()),
)
flights_idx = string_indexer.fit(flights_df).transform(flights_df)

flights_idx.show(5)

# collecting relevant columns: every non-string column except the two delay
# columns (presumably the quantities to be predicted -- confirm).
relevantCols = [name for name, dtype in flights_idx.dtypes if dtype != 'string']
for excluded in ('ArrDelay', 'DepDelay'):
    # list.remove raises ValueError if the column is unexpectedly absent
    relevantCols.remove(excluded)
print(relevantCols)

# storing the processed data in drive for further usage.
# The save mode is set to overwrite because the default mode raises an error
# when the target directory already exists, which made every re-run of this
# cell fail.
flights_idx.write.mode("overwrite").option("header", True).csv("/content/drive/MyDrive/532/ProcessedFlights")

print(" -------------------- Data is processed and saved --------------")

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+-------------------+---------------+----------------+-------------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|carrier_transformed|org_transformed|dest_transformed|tailnum_transformed|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+-------------------+---------------+----------------+-------------------+
|2008|    1|         3|        4| 1829.0|      1755| 1959.0|      1925|     