# Flight Delay Status Classification

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, NullType
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StandardScaler, Imputer
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import SQLContext
import pyspark.ml.feature as ftr
import pyspark.ml as ml
 
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml import PipelineModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import PCA
 
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.ml import Pipeline

#### Data

#### Load Data

In [33]:
spark = SparkSession.builder.appName("Load_Data").getOrCreate()

flight_path = 'gs://msca-bdp-student-gcs/Group4_Final_Project/archive/Combined_Flights_2018-2022.parquet'

flight = spark.read.parquet(flight_path)

weather_path = 'gs://msca-bdp-student-gcs/Group4_Final_Project/archive/combined_weather_data'

weather = spark.read.parquet(weather_path)

airport_path = 'gs://msca-bdp-student-gcs/Group4_Final_Project/archive/Airports'

airport = spark.read.parquet(airport_path)

##### 1. Flight Data

In [34]:
flight.printSchema()

root
 |-- FlightDate: timestamp (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: long (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: long (nullable = true)
 |-- Quarter: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- DayofMonth: long (nullable = true)
 |-- DayOfWeek: long (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)
 |-- DOT_ID_Mar

In [35]:
# Check the number of records loaded
print('Number of flight records loaded: ' + f'{flight.count():,}')



Number of flight records loaded: 29,193,782


                                                                                

In [36]:
yearly_counts = flight.groupBy("Year").agg(F.count("*").alias("Count of Records")).orderBy("Year")

yearly_counts.show()



+----+----------------+
|Year|Count of Records|
+----+----------------+
|2018|         5689512|
|2019|         8091684|
|2020|         5022397|
|2021|         6311871|
|2022|         4078318|
+----+----------------+



                                                                                

##### 2. Weather Data

In [37]:
weather.show(5)

[Stage 17:>                                                         (0 + 1) / 1]

+-----------+----------+----------+-----------+---------+-----------------+----+---------------+----+---------------+------+--------------+-----+--------------+-----+----------------+----+---------------+-----+-----+----+--------------+----+--------------+-----+---------------+-----+------+
|    STATION|      DATE|  LATITUDE|  LONGITUDE|ELEVATION|             NAME|TEMP|TEMP_ATTRIBUTES|DEWP|DEWP_ATTRIBUTES|   SLP|SLP_ATTRIBUTES|  STP|STP_ATTRIBUTES|VISIB|VISIB_ATTRIBUTES|WDSP|WDSP_ATTRIBUTES|MXSPD| GUST| MAX|MAX_ATTRIBUTES| MIN|MIN_ATTRIBUTES| PRCP|PRCP_ATTRIBUTES| SNDP|FRSHTT|
+-----------+----------+----------+-----------+---------+-----------------+----+---------------+----+---------------+------+--------------+-----+--------------+-----+----------------+----+---------------+-----+-----+----+--------------+----+--------------+-----+---------------+-----+------+
|71817099999|2021-12-28|      52.3|-55.8333333|     12.0|MARYS HARBOUR, CA|30.4|             24|27.4|             24|1006.0|

                                                                                

In [38]:
weather.printSchema()

root
 |-- STATION: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- NAME: string (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- TEMP_ATTRIBUTES: integer (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- DEWP_ATTRIBUTES: integer (nullable = true)
 |-- SLP: double (nullable = true)
 |-- SLP_ATTRIBUTES: integer (nullable = true)
 |-- STP: double (nullable = true)
 |-- STP_ATTRIBUTES: integer (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- VISIB_ATTRIBUTES: integer (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- WDSP_ATTRIBUTES: integer (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- MAX: double (nullable = true)
 |-- MAX_ATTRIBUTES: string (nullable = true)
 |-- MIN: double (nullable = true)
 |-- MIN_ATTRIBUTES: string (nullable = true)
 |-- PRCP: double (nullab

In [39]:
# Check the number of records loaded
print('Number of weather records loaded: ' + f'{weather.count():,}')



Number of weather records loaded: 20,437,888


                                                                                

##### 3. Airport Data

In [45]:
airport.show(5)

+--------------+----------+-------+--------------------+------------------------------+-------------------+-----------+--------------------+------------------------+------------------+------------------+------------------+------------------+--------------+-----------------------------+-----------------------+---------------+-----------+--------------+-----------+-----------+-----------+-----------+--------------+-----------+-----------+-------------+------------------------+------------------+-----------------+-----------------+-----------------+----+
|AIRPORT_SEQ_ID|AIRPORT_ID|AIRPORT|DISPLAY_AIRPORT_NAME|DISPLAY_AIRPORT_CITY_NAME_FULL|AIRPORT_WAC_SEQ_ID2|AIRPORT_WAC|AIRPORT_COUNTRY_NAME|AIRPORT_COUNTRY_CODE_ISO|AIRPORT_STATE_NAME|AIRPORT_STATE_CODE|AIRPORT_STATE_FIPS|CITY_MARKET_SEQ_ID|CITY_MARKET_ID|DISPLAY_CITY_MARKET_NAME_FULL|CITY_MARKET_WAC_SEQ_ID2|CITY_MARKET_WAC|LAT_DEGREES|LAT_HEMISPHERE|LAT_MINUTES|LAT_SECONDS|   LATITUDE|LON_DEGREES|LON_HEMISPHERE|LON_MINUTES|LON_SECONDS|  

In [46]:
airport.printSchema()

root
 |-- AIRPORT_SEQ_ID: integer (nullable = true)
 |-- AIRPORT_ID: integer (nullable = true)
 |-- AIRPORT: string (nullable = true)
 |-- DISPLAY_AIRPORT_NAME: string (nullable = true)
 |-- DISPLAY_AIRPORT_CITY_NAME_FULL: string (nullable = true)
 |-- AIRPORT_WAC_SEQ_ID2: integer (nullable = true)
 |-- AIRPORT_WAC: integer (nullable = true)
 |-- AIRPORT_COUNTRY_NAME: string (nullable = true)
 |-- AIRPORT_COUNTRY_CODE_ISO: string (nullable = true)
 |-- AIRPORT_STATE_NAME: string (nullable = true)
 |-- AIRPORT_STATE_CODE: string (nullable = true)
 |-- AIRPORT_STATE_FIPS: integer (nullable = true)
 |-- CITY_MARKET_SEQ_ID: integer (nullable = true)
 |-- CITY_MARKET_ID: integer (nullable = true)
 |-- DISPLAY_CITY_MARKET_NAME_FULL: string (nullable = true)
 |-- CITY_MARKET_WAC_SEQ_ID2: integer (nullable = true)
 |-- CITY_MARKET_WAC: integer (nullable = true)
 |-- LAT_DEGREES: integer (nullable = true)
 |-- LAT_HEMISPHERE: string (nullable = true)
 |-- LAT_MINUTES: integer (nullable = true)


In [47]:
# Check the number of records loaded
print('Number of airport records loaded: ' + f'{airport.count():,}')

Number of airport records loaded: 13,386


##### 4. Airlines Data

In [52]:
airline.show()

+----+--------------------+
|Code|         Description|
+----+--------------------+
| 02Q|       Titan Airways|
| 04Q|  Tradewind Aviation|
| 05Q| Comlux Aviation, AG|
| 06Q|Master Top Linhas...|
| 07Q| Flair Airlines Ltd.|
| 09Q|      Swift Air, LLC|
| 0BQ|                 DCA|
| 0CQ|ACM AIR CHARTER GmbH|
| 0GQ|Inter Island Airw...|
| 0HQ|Polar Airlines de...|
|  0J|          JetClub AG|
| 0JQ|     Vision Airlines|
| 0LQ|   Metropix UK, LLP.|
| 0MQ|Multi-Aero, Inc. ...|
| 0OQ|          Open Skies|
|  0Q| Flying Service N.V.|
| 0QQ|TAG Aviation (UK)...|
| 0RQ|TAG Aviation Espa...|
| 0TQ|  Corporatejets, XXI|
| 0UQ|  Comlux Malta, Ltd.|
+----+--------------------+
only showing top 20 rows



In [53]:
# Check the number of records loaded
print('Number of airline records loaded: ' + f'{airline.count():,}')

Number of airline records loaded: 1,571


#### Data Transformation

##### 1. Airport Data

In [71]:
# Filter US airport only 
def airport_transform(dataframe):
  return (
    dataframe
    .filter("AIRPORT_COUNTRY_CODE_ISO = 'US'")
    .dropDuplicates(['AIRPORT'])
    )

In [72]:
airport = airport_transform(airport)
airport.show(5)

+--------------+----------+-------+--------------------+------------------------------+-------------------+-----------+--------------------+------------------------+------------------+------------------+------------------+------------------+--------------+-----------------------------+-----------------------+---------------+-----------+--------------+-----------+-----------+-----------+-----------+--------------+-----------+-----------+-------------+------------------------+------------------+-----------------+-----------------+-----------------+----+
|AIRPORT_SEQ_ID|AIRPORT_ID|AIRPORT|DISPLAY_AIRPORT_NAME|DISPLAY_AIRPORT_CITY_NAME_FULL|AIRPORT_WAC_SEQ_ID2|AIRPORT_WAC|AIRPORT_COUNTRY_NAME|AIRPORT_COUNTRY_CODE_ISO|AIRPORT_STATE_NAME|AIRPORT_STATE_CODE|AIRPORT_STATE_FIPS|CITY_MARKET_SEQ_ID|CITY_MARKET_ID|DISPLAY_CITY_MARKET_NAME_FULL|CITY_MARKET_WAC_SEQ_ID2|CITY_MARKET_WAC|LAT_DEGREES|LAT_HEMISPHERE|LAT_MINUTES|LAT_SECONDS|   LATITUDE|LON_DEGREES|LON_HEMISPHERE|LON_MINUTES|LON_SECONDS|  

In [73]:
# Keep airports that co-exist in the airline dataset
flight.createOrReplaceTempView('flight')
airport.createOrReplaceTempView('airport')

filtered_airport = spark.sql(
  """
  SELECT * 
  FROM airport 
  WHERE 
    AIRPORT IN (
      SELECT DISTINCT ORIGIN FROM flight AS airline_subquery
      UNION
      SELECT DISTINCT DEST FROM flight AS airline_subquery
    )
  """
)

In [83]:
airport = spark.read.option("header", "true").parquet("gs://msca-bdp-student-gcs/Group4_Final_Project/archive/Flight Delay Status Classification/filtered_airport")

airport.createOrReplaceTempView('airport')

print('Number of airports that co-exist in the airline dataset: ' + f'{airport.count():,}')

Number of airports that co-exist in the airline dataset: 387


##### 2. Flight Data

In [94]:
def flight_transform(dataframe):
    selected_col = ["Year", "Quarter", "Month", "DayofMonth", "DayOfWeek", "FlightDate",
                    "Operating_Airline", "Tail_Number", "Flight_Number_Operating_Airline", 
                    "Origin", "OriginCityName", "OriginState", "Dest", "DestCityName", 
                    "DestState", "CRSDepTime", "DepTime", "DepDelayMinutes", "DepTimeBlk", 
                    "CRSArrTime", "ArrTime", "ArrDelayMinutes", "ArrTimeBlk", "Distance", 
                    "DistanceGroup", "DepDel15", "ArrDel15", "OriginAirportID", 
                    "DestAirportID", "PR_ARR_DEL15"]

    # Creating a window partition to extract prior arrival delay
    windowSpec = Window.partitionBy("Tail_Number").orderBy("CRSDepTime")

    return (
        dataframe
        .filter("Cancelled != 1 AND Diverted != 1")
        .withColumn("FlightDate", F.col("FlightDate").cast("date"))
        .withColumn("Flight_Number_Operating_Airline", F.col("Flight_Number_Operating_Airline").cast("string"))
        .withColumn("DepTime", F.col("DepTimeBlk").substr(1, 2).cast("int")) 
        .withColumn("ArrTime", F.col("ArrTimeBlk").substr(1, 2).cast("int"))
        .withColumn("DistanceGroup", F.col("DistanceGroup").cast("string"))
        .withColumn("DepDel15", F.col("DepDel15").cast("string"))
        .withColumn("ArrDel15", F.col("ArrDel15").cast("string"))
        .withColumn("Year", F.col("Year").cast("string"))
        .withColumn("Quarter", F.col("Quarter").cast("string"))
        .withColumn("Month", F.col("Month").cast("string"))
        .withColumn("DayofMonth", F.col("DayofMonth").cast("string"))
        .withColumn("DayofWeek", F.col("DayofWeek").cast("string"))
        .withColumn("PR_ARR_DEL15", F.lag(F.col("ArrDel15"), 1).over(windowSpec).cast("string"))
        .select(selected_col)
    )

In [95]:
cleaned_flight = flight_transform(flight)

In [97]:
cleaned_flight.show(10)

[Stage 128:>                                                        (0 + 1) / 1]

+----+-------+-----+----------+---------+----------+-----------------+-----------+-------------------------------+------+----------------+-----------+----+--------------------+---------+----------+-------+---------------+----------+----------+-------+---------------+----------+--------+-------------+--------+--------+---------------+-------------+------------+
|Year|Quarter|Month|DayofMonth|DayOfWeek|FlightDate|Operating_Airline|Tail_Number|Flight_Number_Operating_Airline|Origin|  OriginCityName|OriginState|Dest|        DestCityName|DestState|CRSDepTime|DepTime|DepDelayMinutes|DepTimeBlk|CRSArrTime|ArrTime|ArrDelayMinutes|ArrTimeBlk|Distance|DistanceGroup|DepDel15|ArrDel15|OriginAirportID|DestAirportID|PR_ARR_DEL15|
+----+-------+-----+----------+---------+----------+-----------------+-----------+-------------------------------+------+----------------+-----------+----+--------------------+---------+----------+-------+---------------+----------+----------+-------+---------------+-------

                                                                                

In [99]:
print('Total flight data number: ' + f'{cleaned_flight.count():,}')



Total flight data number: 28,348,168


                                                                                

In [9]:
target_features = ["CRSDepTime", "DepDelayMinutes", "CRSArrTime", "ArrDelay", "Distance"]

# Select the target features from the DataFrame
selected_df = flight.select(target_features)

# Convert the selected DataFrame to Pandas
selected_pandas_df = selected_df.toPandas()

# Calculate the correlation matrix
corr_matrix = selected_pandas_df.corr()

# Display the correlation matrix
print(corr_matrix)

23/11/14 23:38:16 ERROR org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Sending KillExecutors(ArrayBuffer(5)) to AM was unsuccessful
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from hub-msca-bdp-dphub-students-jiaweixu-m.c.msca-bdp-student-ap.internal:39427 in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
	at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at scala.util.Failure.recover(Try.scala:234)
	at scala.concurrent.Future.$anonfun$recover$1(Future.scala:395)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$tra

Py4JJavaError: An error occurred while calling o133.collectToPython.
: java.lang.OutOfMemoryError: GC overhead limit exceeded


In [None]:
mask = np.zeros_like(corr, dtype=np.bool)
mask[np.triu_indices_from(mask)] = True
cmap = sns.diverging_palette(240, 10, as_cmap=True)
fig, ax = plt.subplots(figsize=(20,10))
sns.heatmap(corr, mask=mask, cmap=cmap, center=0, linewidths=.5)