In [1]:
from itertools import groupby

# Cell 1: Import necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import joblib
from fontTools.ttLib.tables.S__i_l_f import assemble
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, ConfusionMatrixDisplay
from imblearn.over_sampling import SMOTE
from xgboost import XGBClassifier



In [2]:
from pyspark.sql import SparkSession

In [3]:
# Start (or reuse) a Spark session that runs locally on all available cores.
session_builder = SparkSession.builder.appName("FlightDelayRF").master("local[*]")
session_builder = session_builder.config("spark.driver.memory", "6g")   # or 8g if you have it
session_builder = session_builder.config("spark.executor.memory", "6g")
spark = session_builder.getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/13 22:31:26 WARN Utils: Your hostname, RAMKRISHNAs-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.68.100 instead (on interface en0)
25/12/13 22:31:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/13 22:31:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Display the SparkSession object as this cell's output.
spark

In [5]:
import os

# Location of the 2024 flight CSV. Set the FLIGHT_DATA_CSV environment variable
# to run the notebook on another machine; when it is unset the original local
# path is used, so existing behaviour is unchanged.
flight_csv_path = os.environ.get(
    "FLIGHT_DATA_CSV",
    "/Users/ramkrishnakhan/PycharmProjects/airlines_delay_prediction(Spark)/flight_data_2024.csv",
)

# header=True takes column names from the first line; inferSchema=True lets
# Spark derive the column types from the data instead of reading all strings.
df = spark.read.csv(flight_csv_path, header=True, inferSchema=True)
df.printSchema()




root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- fl_date: date (nullable = true)
 |-- op_unique_carrier: string (nullable = true)
 |-- op_carrier_fl_num: double (nullable = true)
 |-- origin: string (nullable = true)
 |-- origin_city_name: string (nullable = true)
 |-- origin_state_nm: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- dest_city_name: string (nullable = true)
 |-- dest_state_nm: string (nullable = true)
 |-- crs_dep_time: integer (nullable = true)
 |-- dep_time: double (nullable = true)
 |-- dep_delay: double (nullable = true)
 |-- taxi_out: double (nullable = true)
 |-- wheels_off: double (nullable = true)
 |-- wheels_on: double (nullable = true)
 |-- taxi_in: double (nullable = true)
 |-- crs_arr_time: integer (nullable = true)
 |-- arr_time: double (nullable = true)
 |-- arr_delay: double (nullable = true)
 |-- cancelled: intege

                                                                                

In [25]:
# Print a single row to eyeball the column layout.
df.show(1)

+----+-----+------------+-----------+----------+-----------------+-----------------+------+----------------+---------------+----+--------------+-------------+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+
|year|month|day_of_month|day_of_week|   fl_date|op_unique_carrier|op_carrier_fl_num|origin|origin_city_name|origin_state_nm|dest|dest_city_name|dest_state_nm|crs_dep_time|dep_time|dep_delay|taxi_out|wheels_off|wheels_on|taxi_in|crs_arr_time|arr_time|arr_delay|cancelled|cancellation_code|diverted|crs_elapsed_time|actual_elapsed_time|air_time|distance|carrier_delay|weather_delay|nas_delay|security_delay|late_aircraft_delay|
+----+-----+------------+-----------+----------+-----------------+-----------------+------+----------------+---------------+----+--------------+----

In [26]:
# Number of rows in the loaded dataset (count() is a Spark action, so it runs a job).
print(df.count())



7079081


                                                                                

In [27]:
# Print the summary-statistics table produced by describe().
df.describe().show()



+-------+-------+-----------------+------------------+------------------+-----------------+------------------+-------+----------------+---------------+-------+--------------+-------------+------------------+------------------+------------------+------------------+------------------+------------------+----------------+------------------+------------------+------------------+--------------------+-----------------+--------------------+-----------------+-------------------+------------------+-----------------+------------------+------------------+------------------+--------------------+-------------------+
|summary|   year|            month|      day_of_month|       day_of_week|op_unique_carrier| op_carrier_fl_num| origin|origin_city_name|origin_state_nm|   dest|dest_city_name|dest_state_nm|      crs_dep_time|          dep_time|         dep_delay|          taxi_out|        wheels_off|         wheels_on|         taxi_in|      crs_arr_time|          arr_time|         arr_delay|           can

                                                                                

In [28]:
### Count the null entries in every column
from pyspark.sql.functions import col, sum as spark_sum

# isNull() cast to int gives 1 for a missing value and 0 otherwise, so summing
# it yields the number of nulls; each result keeps its column's name.
null_counters = []
for column_name in df.columns:
    is_missing = col(column_name).isNull().cast("int")
    null_counters.append(spark_sum(is_missing).alias(column_name))

missing_count = df.select(null_counters)
missing_count.show()



+----+-----+------------+-----------+-------+-----------------+-----------------+------+----------------+---------------+----+--------------+-------------+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+
|year|month|day_of_month|day_of_week|fl_date|op_unique_carrier|op_carrier_fl_num|origin|origin_city_name|origin_state_nm|dest|dest_city_name|dest_state_nm|crs_dep_time|dep_time|dep_delay|taxi_out|wheels_off|wheels_on|taxi_in|crs_arr_time|arr_time|arr_delay|cancelled|cancellation_code|diverted|crs_elapsed_time|actual_elapsed_time|air_time|distance|carrier_delay|weather_delay|nas_delay|security_delay|late_aircraft_delay|
+----+-----+------------+-----------+-------+-----------------+-----------------+------+----------------+---------------+----+--------------+-------------

                                                                                

In [6]:
### Drop rows that have a null in any of the critical columns
# arr_delay is what the label is derived from, so rows without it cannot be labelled.
required_cols = ['dep_time', 'dep_delay', 'arr_time', 'arr_delay']
df = df.na.drop(subset=required_cols)

In [7]:
# Columns removed before modelling. They are grouped by theme here; the order of
# the resulting list is the same as before.
cols_to_exclude = (
    # actual departure / taxi / arrival / airborne measurements
    [
        "dep_time", "dep_delay", "taxi_out", "wheels_off", "wheels_on",
        "taxi_in", "arr_time", "actual_elapsed_time", "air_time",
    ]
    # per-cause delay columns
    + [
        "carrier_delay", "weather_delay", "nas_delay",
        "security_delay", "late_aircraft_delay",
    ]
    # cancellation / diversion columns
    + ["cancellation_code", "cancelled", "diverted"]
    # date, flight number and city-name columns that are not used as features
    + ["fl_date", "op_carrier_fl_num", "origin_city_name", "dest_city_name"]
)


In [8]:
### Drop unnecessary/post-departure columns: the goal is to predict the delay
### before the flight takes off, so those values would not be available.
# Binary target: 1 when arr_delay is strictly greater than 15, else 0.
# NOTE(review): an arrival delay of exactly 15 is labelled 0 here — confirm that
# this is the intended threshold.
late_arrival = (df["arr_delay"] > 15).cast("int")
df = df.drop(*cols_to_exclude).withColumn("label", late_arrival)

In [9]:
from pyspark.sql.functions import col, floor

# Integer-divide the scheduled departure time by 100 to keep only the leading
# digits (presumably the hour of an HHMM clock value — TODO confirm).
scheduled_hour = floor(col("crs_dep_time") / 100).cast("int")
df = df.withColumn("crs_dep_hour", scheduled_hour)

In [10]:
# arr_delay has already been turned into `label` and crs_dep_time into
# `crs_dep_hour`, so the raw columns are removed; leaving arr_delay in the frame
# would expose the target to the model.
df=df.drop("arr_delay","crs_dep_time")

In [11]:
# Confirm which columns and types remain after the drops above.
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- op_unique_carrier: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- origin_state_nm: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- dest_state_nm: string (nullable = true)
 |-- crs_arr_time: integer (nullable = true)
 |-- crs_elapsed_time: double (nullable = true)
 |-- distance: double (nullable = true)
 |-- label: integer (nullable = true)
 |-- crs_dep_hour: integer (nullable = true)



In [12]:
# Sanity check: list the distinct values present in the label column.
df.select('label').distinct().show()



+-----+
|label|
+-----+
|    1|
|    0|
+-----+



                                                                                

In [13]:
# String-typed columns; each one is index-encoded before being fed to the model.
cat_cols = [
    "op_unique_carrier",                  # carrier column
    "origin", "dest",                     # origin / destination columns
    "origin_state_nm", "dest_state_nm",   # origin / destination state columns
]


In [38]:
# Numeric model inputs, in the order they enter the feature vector.
num_cols = (
    # calendar / schedule columns already present in df
    [
        "year", "month", "day_of_month", "day_of_week",
        "crs_dep_hour", "crs_elapsed_time", "distance",
    ]
    # historical delay rates; these columns are joined on by add_hist_features
    + [
        "carrier_delay_rate", "origin_delay_rate",
        "dest_delay_rate", "route_delay_rate",
    ]
)



In [39]:
from pyspark.ml.feature import StringIndexer

# One StringIndexer per categorical column, writing to "<column>_idx".
# handleInvalid="keep" is set so values not seen during fit do not raise.
indexers = []
for cat_col in cat_cols:
    indexers.append(
        StringIndexer(
            inputCol=cat_col,
            outputCol=f"{cat_col}_idx",
            handleInvalid="keep",
        )
    )

In [40]:
from pyspark.ml.feature import VectorAssembler

# Indexed categorical columns first, then the numeric columns. feature_cols is
# reused later to label the feature importances, so its order matters.
indexed_cat_cols = [f"{name}_idx" for name in cat_cols]
feature_cols = indexed_cat_cols + num_cols

# NOTE: this rebinds `assemble`, which the first cell imported from fontTools.
assemble = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [41]:
# NOTE: this import rebinds RandomForestClassifier, which the first cell
# imported from sklearn; from here on the name refers to the Spark ML class.
from pyspark.ml.classification import RandomForestClassifier

rf_params = dict(
    labelCol="label",
    featuresCol="features",
    numTrees=30,
    maxDepth=8,
    maxBins=347,  # presumably sized for the largest indexed categorical column — TODO confirm
    minInstancesPerNode=100,
    seed=42,      # fixed seed for repeatable training
)
rf = RandomForestClassifier(**rf_params)

In [42]:
from pyspark.ml import Pipeline

# Stage order: every StringIndexer, then the vector assembler, then the forest.
rf_stages = [*indexers, assemble, rf]
pipeline = Pipeline(stages=rf_stages)

In [19]:
# Split by month: months up to and including 9 are used for training,
# months after 9 are held out for testing.
train_df = df.filter(df["month"] <= 9)
test_df = df.filter(df["month"] > 9)


In [20]:
# Number of rows in the training split (count() is a Spark action, so it runs a job).
train_df.count()


                                                                                

5199988

In [None]:
### Add historical delay-rate features (computed from the training split only)

In [32]:
from pyspark.sql.functions import avg

### Carrier delay rate: mean of the 0/1 label per carrier over the training split
carrier_rate = avg("label").alias("carrier_delay_rate")
carries_hist = train_df.groupby("op_unique_carrier").agg(carrier_rate)

In [33]:
### Origin delay rate: mean of the 0/1 label per `origin` over the training split
origin_rate = avg("label").alias("origin_delay_rate")
origin_hist = train_df.groupby("origin").agg(origin_rate)

In [35]:
### Destination delay rate: mean of the 0/1 label per `dest` over the training split
dest_rate = avg("label").alias("dest_delay_rate")
dest_hist = train_df.groupby("dest").agg(dest_rate)

In [36]:
### Route delay rate: mean of the 0/1 label per (origin, dest) pair over the training split
route_rate = avg("label").alias("route_delay_rate")
route_hist = train_df.groupBy("origin", "dest").agg(route_rate)


In [37]:
def add_hist_features(df):
    """Attach the historical delay-rate columns to ``df``.

    Left-joins the notebook-level lookup tables onto ``df``:
    ``carries_hist`` on ``op_unique_carrier``, ``origin_hist`` on ``origin``,
    ``dest_hist`` on ``dest`` and ``route_hist`` on ``origin`` + ``dest``.
    This adds ``carrier_delay_rate``, ``origin_delay_rate``,
    ``dest_delay_rate`` and ``route_delay_rate``.

    The function is safe to call more than once on the same frame: rate
    columns left by an earlier call are dropped before joining, so re-running
    the cell does not produce duplicate (ambiguous) column names.

    Args:
        df: Spark DataFrame containing the join-key columns named above.

    Returns:
        A new DataFrame with the four rate columns added and nulls replaced
        by 0.0.
    """
    hist_cols = (
        "carrier_delay_rate",
        "origin_delay_rate",
        "dest_delay_rate",
        "route_delay_rate",
    )
    return (
        df
        # Dropping a column that is absent is a no-op, so this only has an
        # effect when the features were already attached by a previous call.
        .drop(*hist_cols)
        .join(carries_hist, "op_unique_carrier", "left")
        .join(origin_hist, "origin", "left")
        .join(dest_hist, "dest", "left")
        .join(route_hist, ["origin", "dest"], "left")
        # A key with no match in the lookup tables gets a rate of 0.0.
        # NOTE(review): no `subset` is given, so the fill is not limited to
        # the four rate columns — nulls in other numeric columns are replaced
        # by 0 as well. Confirm that this is intended.
        .fillna(0.0)
    )

# Attach the historical delay rates to both splits. The lookup tables were
# aggregated from train_df, so the test rows only receive training-period rates.
# NOTE(review): the training rows are joined to rates computed from their own
# labels, i.e. each row's label contributes to its own feature value. This can
# make the rates look more predictive during training than they are on the
# test split — consider out-of-fold or prior-period rates.
train_df = add_hist_features(train_df)
test_df  = add_hist_features(test_df)


In [43]:
# Fit every pipeline stage (indexers, assembler, classifier) on the training split.
rf_model = pipeline.fit(train_df)


25/12/13 23:03:57 WARN DAGScheduler: Broadcasting large task binary with size 1184.2 KiB
25/12/13 23:04:06 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB

In [44]:
# Score the held-out split with the fitted pipeline.
predictions = rf_model.transform(test_df)

# Show a few rows of the true label next to the model output, untruncated.
preview_cols = ["label", "prediction", "probability"]
predictions.select(*preview_cols).show(5, truncate=False)




+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|0    |0.0       |[0.8453732755886184,0.1546267244113816] |
|0    |0.0       |[0.8348296035388334,0.16517039646116657]|
|0    |0.0       |[0.8461021290972329,0.15389787090276708]|
|0    |0.0       |[0.7835242052669878,0.21647579473301215]|
|0    |0.0       |[0.790391494706715,0.20960850529328495] |
+-----+----------+----------------------------------------+
only showing top 5 rows


                                                                                

In [45]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve of the random-forest predictions on the test split.
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label")
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc:.4f}")

                                                                                

Test AUC: 0.6013


In [29]:
# Class balance of the test split: number of rows per true label value.
predictions.groupBy("label").count().show()




+-----+-------+
|label|  count|
+-----+-------+
|    1| 277371|
|    0|1487908|
+-----+-------+



                                                                                

In [46]:
# The fitted classifier is the last stage of the pipeline model.
rf_stage = rf_model.stages[-1]

# Pair each assembled feature name with its importance and list them from the
# most to the least important.
ranked_importances = sorted(
    zip(feature_cols, rf_stage.featureImportances),
    key=lambda pair: pair[1],
    reverse=True,
)
for name, score in ranked_importances:
    print(f"{name:25s} {score:.4f}")


route_delay_rate          0.3660
crs_dep_hour              0.3592
month                     0.0998
carrier_delay_rate        0.0781
origin_delay_rate         0.0396
dest_delay_rate           0.0148
op_unique_carrier_idx     0.0130
day_of_week               0.0087
origin_idx                0.0060
day_of_month              0.0054
origin_state_nm_idx       0.0046
dest_idx                  0.0032
crs_elapsed_time          0.0011
distance                  0.0003
dest_state_nm_idx         0.0002
year                      0.0000


In [53]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees reading the same "features" / "label" columns as the
# random forest, with the same maxBins and seed.
gbt_params = dict(
    labelCol="label",
    featuresCol="features",
    maxDepth=6,
    maxIter=40,
    maxBins=347,
    seed=42,
)
gbt = GBTClassifier(**gbt_params)


In [54]:
from pyspark.ml import Pipeline

# Same indexers and assembler as before, with the GBT classifier as the final
# stage. NOTE: this rebinds `pipeline`, replacing the random-forest pipeline.
gbt_stages = [*indexers, assemble, gbt]
pipeline = Pipeline(stages=gbt_stages)


In [55]:
# Mark the training split for caching, then force it to be computed with an
# action so the fit below can reuse the cached data.
train_df = train_df.cache()
train_df.count()   # materialize

# Fit the current `pipeline` (indexers, assembler, GBT classifier) on the training split.
model = pipeline.fit(train_df)


25/12/13 23:09:53 WARN CacheManager: Asked to cache already cached data.
25/12/13 23:15:27 WARN DAGScheduler: Broadcasting large task binary with size 1038.3 KiB
25/12/13 23:15:35 WARN DAGScheduler: Broadcasting large task binary with size 1038.8 KiB
25/12/13 23:15:40 WARN DAGScheduler: Broadcasting large task binary with size 1043.5 KiB
25/12/13 23:15:44 WARN DAGScheduler: Broadcasting large task binary with size 1046.9 KiB
25/12/13 23:15:49 WARN DAGScheduler: Broadcasting large task binary with size 1052.2 KiB
25/12/13 23:15:54 WARN DAGScheduler: Broadcasting large task binary with size 1069.9 KiB
25/12/13 23:16:00 WARN DAGScheduler: Broadcasting large task binary with size 1089.4 KiB
25/12/13 23:16:08 WARN DAGScheduler: Broadcasting large task binary with size 1089.9 KiB
25/12/13 23:16:12 WARN DAGScheduler: Broadcasting large task binary with size 1094.7 KiB
25/12/13 23:16:17 WARN DAGScheduler: Broadcasting large task binary with size 1097.5 KiB
25/12/13 23:16:22 WARN DAGScheduler: 

In [56]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the held-out split with the fitted GBT pipeline.
preds = model.transform(test_df)

# Area under the ROC curve on the test split.
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label")
auc = evaluator.evaluate(preds)
print("AUC:", auc)


25/12/13 23:36:01 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
25/12/13 23:36:01 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS

AUC: 0.6313798711285877


                                                                                