In [1]:
import os
import sys
# --- Runtime environment for PySpark ---------------------------------------
# Path to the JDK that Spark should launch. Replace the placeholder with a real
# JDK directory. While it does not point at an existing directory it is ignored,
# so a JAVA_HOME already exported by the shell is not overwritten with an
# unusable value.
JAVA_HOME_OVERRIDE = "change location"
if os.path.isdir(JAVA_HOME_OVERRIDE):
    os.environ["JAVA_HOME"] = JAVA_HOME_OVERRIDE

# JVM option picked up by the JDK launcher (echoed in the Spark start-up log).
os.environ["JDK_JAVA_OPTIONS"] = "-Djava.security.manager=allow"

# Point both the PySpark driver and its Python workers at the interpreter that
# is running this kernel, so they share one environment.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg, floor, concat_ws, count
from pyspark.sql.functions import to_date, month, dayofmonth
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import pickle

In [3]:
# Create (or reuse) a local Spark session named "FlightDelayClassification"
# with master "local[*]" and spark.driver.memory set to "4g".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("FlightDelayClassification")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Lower the log verbosity so only ERROR-level messages reach the cell output.
spark.sparkContext.setLogLevel("ERROR")

NOTE: Picked up JDK_JAVA_OPTIONS: -Djava.security.manager=allow
NOTE: Picked up JDK_JAVA_OPTIONS: -Djava.security.manager=allow
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/11 19:14:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load the raw flight records; the first row holds the column names and the
# column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("Flight_delay.csv")
)


                                                                                

In [5]:
# Keep rows where both Cancelled and Diverted equal 0, drop those two flag
# columns, and make sure both delay columns are doubles.
df_clean = (
    df
    .where(col("Cancelled") == 0)
    .where(col("Diverted") == 0)
    .drop("Cancelled", "Diverted")
    .withColumn("ArrDelay", col("ArrDelay").cast("double"))
    .withColumn("DepDelay", col("DepDelay").cast("double"))
)

In [6]:
# Derive calendar, time-of-day, route and distance features in one chain.
# Each step sees the columns produced by the steps above it.
df_clean = (
    df_clean
    # Calendar parts parsed from the dd-MM-yyyy date string.
    .withColumn("Date", to_date(col("Date"), "dd-MM-yyyy"))
    .withColumn("Month", month(col("Date")))
    .withColumn("Day", dayofmonth(col("Date")))
    # DepTime / CRSArrTime are divided by 100 to get the hour part
    # (presumably HHMM-encoded — TODO confirm against the data dictionary).
    .withColumn("DepHour", (col("DepTime") / 100).cast("int"))
    .withColumn("IsWeekend", when(col("DayOfWeek").isin(6, 7), 1).otherwise(0))
    .withColumn("CRSArrHour", floor(col("CRSArrTime") / 100))
    # Origin and destination joined as "ORIGIN-DEST".
    .withColumn("Route", concat_ws("-", col("Origin"), col("Dest")))
    # Coarse departure period; anything outside 6-21 (or null) is "Night".
    .withColumn(
        "TimeOfDay",
        when(col("DepHour").between(6, 11), "Morning")
        .when(col("DepHour").between(12, 17), "Afternoon")
        .when(col("DepHour").between(18, 21), "Evening")
        .otherwise("Night"),
    )
    # Distance bucket: below 500 is short, 500-1500 inclusive is medium.
    .withColumn(
        "DistanceCategory",
        when(col("Distance") < 500, "Short")
        .when(col("Distance").between(500, 1500), "Medium")
        .otherwise("Long"),
    )
    # Flag departures in the 6-9 or 17-20 hour windows.
    .withColumn(
        "IsRushHour",
        when(col("DepHour").between(6, 9), 1)
        .when(col("DepHour").between(17, 20), 1)
        .otherwise(0),
    )
)

In [7]:
# Group-level delay statistics. Each lookup table is built with a groupBy on
# the current frame and attached back to every row via a left join on the
# grouping key(s).
#
# NOTE(review): these averages are computed from ArrDelay over all rows, while
# the train/test split happens later in the notebook, so test-set outcomes
# feed the features (target leakage). Consider deriving them from the training
# rows only.

# Per carrier: mean arrival delay and number of flights.
carrier_stats = (
    df_clean
    .groupBy("UniqueCarrier")
    .agg(
        avg("ArrDelay").alias("AvgCarrierDelay"),
        count("*").alias("CarrierFlightCount"),
    )
)
df_clean = df_clean.join(carrier_stats, on="UniqueCarrier", how="left")

# Per departure hour: mean arrival delay.
hour_stats = (
    df_clean
    .groupBy("DepHour")
    .agg(avg("ArrDelay").alias("AvgHourDelay"))
)
df_clean = df_clean.join(hour_stats, on="DepHour", how="left")

# Per route: mean arrival delay and number of flights.
route_stats = (
    df_clean
    .groupBy("Route")
    .agg(
        avg("ArrDelay").alias("AvgRouteDelay"),
        count("*").alias("RouteFlightCount"),
    )
)
df_clean = df_clean.join(route_stats, on="Route", how="left")

# Per origin airport: mean arrival delay.
origin_stats = (
    df_clean
    .groupBy("Origin")
    .agg(avg("ArrDelay").alias("AvgOriginDelay"))
)
df_clean = df_clean.join(origin_stats, on="Origin", how="left")

# Per destination airport: mean arrival delay.
dest_stats = (
    df_clean
    .groupBy("Dest")
    .agg(avg("ArrDelay").alias("AvgDestDelay"))
)
df_clean = df_clean.join(dest_stats, on="Dest", how="left")

# Per (carrier, departure hour) pair: mean arrival delay.
carrier_hour_stats = (
    df_clean
    .groupBy("UniqueCarrier", "DepHour")
    .agg(avg("ArrDelay").alias("AvgCarrierHourDelay"))
)
df_clean = df_clean.join(carrier_hour_stats, on=["UniqueCarrier", "DepHour"], how="left")

# Rows whose key found no match (or whose group had no delay values) end up
# with a null average; replace those nulls with 0.
agg_cols = [
    "AvgCarrierDelay",
    "AvgHourDelay",
    "AvgRouteDelay",
    "AvgOriginDelay",
    "AvgDestDelay",
    "AvgCarrierHourDelay",
]
for stat_col in agg_cols:
    df_clean = df_clean.withColumn(
        stat_col,
        when(col(stat_col).isNotNull(), col(stat_col)).otherwise(0),
    )

In [9]:
# Binary targets: 1 when the arrival delay exceeds the threshold, else 0.
# Rows missing the delay or any of the key identifiers are dropped afterwards,
# so a null ArrDelay never survives as a 0 label.
df_clean = (
    df_clean
    .withColumn("DelayOver30", when(col("ArrDelay") > 30, 1).otherwise(0))
    .withColumn("DelayOver60", when(col("ArrDelay") > 60, 1).otherwise(0))
    .dropna(subset=["ArrDelay", "UniqueCarrier", "Origin", "Dest"])
)
print(f"Row count after cleaning: {df_clean.count()}")

# Class balance of each target.
for label_name in ("DelayOver30", "DelayOver60"):
    df_clean.groupBy(label_name).count().show()

Row count after cleaning: 484551
+-----------+------+
|DelayOver30| count|
+-----------+------+
|          1|316994|
|          0|167557|
+-----------+------+

+-----------+------+
|DelayOver60| count|
+-----------+------+
|          1|163362|
|          0|321189|
+-----------+------+



In [10]:
# Columns fed to the assembler as plain numbers.
numeric_features = [
    # calendar / time of departure
    "DayOfWeek",
    "Month",
    "Day",
    "DepHour",
    "IsWeekend",
    "IsRushHour",
    # schedule
    "Distance",
    "CRSElapsedTime",
    "CRSArrHour",
    # group-level average delays
    "AvgCarrierDelay",
    "AvgHourDelay",
    "AvgRouteDelay",
    "AvgOriginDelay",
    "AvgDestDelay",
    "AvgCarrierHourDelay",
    # group sizes
    "CarrierFlightCount",
    "RouteFlightCount",
]

# String columns that are indexed and one-hot encoded before assembly.
categorical_features = [
    "UniqueCarrier",
    "Origin",
    "Dest",
    "TimeOfDay",
    "DistanceCategory",
]

In [11]:
def create_classification_pipeline(label_col, num_trees=50, max_depth=10, seed=42):
    """Build an unfitted Spark ML pipeline that classifies a binary delay label.

    Stages, in order:
      1. For every name in the module-level ``categorical_features``: a
         ``StringIndexer`` (unseen values are kept, not rejected) followed by a
         ``OneHotEncoder``.
      2. A ``VectorAssembler`` combining the encoded vectors with the
         module-level ``numeric_features`` into ``features_raw``.
      3. A ``MinMaxScaler`` producing ``features``.
      4. A ``RandomForestClassifier`` trained on ``features``.

    Args:
        label_col: Name of the label column the classifier is fitted against.
        num_trees: Number of trees in the forest. Defaults to 50.
        max_depth: Maximum depth of each tree. Defaults to 10.
        seed: Random seed passed to the classifier. Defaults to 42.

    Returns:
        The unfitted ``Pipeline``; call ``.fit(train_df)`` to train it.
    """
    stages = []

    # Index then one-hot encode each categorical column, keeping the
    # indexer/encoder pair adjacent so the stage order stays per column.
    for cat_col in categorical_features:
        index_col = f"{cat_col}Index"
        stages.append(
            StringIndexer(inputCol=cat_col, outputCol=index_col, handleInvalid="keep")
        )
        stages.append(OneHotEncoder(inputCol=index_col, outputCol=f"{cat_col}Vec"))

    # Encoded categorical vectors come first, followed by the numeric columns.
    assembler_inputs = [f"{c}Vec" for c in categorical_features] + numeric_features
    stages.append(VectorAssembler(inputCols=assembler_inputs, outputCol="features_raw"))

    stages.append(MinMaxScaler(inputCol="features_raw", outputCol="features"))

    stages.append(
        RandomForestClassifier(
            featuresCol="features",
            labelCol=label_col,
            numTrees=num_trees,
            maxDepth=max_depth,
            seed=seed,
        )
    )

    return Pipeline(stages=stages)

In [12]:
# Columns carried into modelling: every feature plus both label columns.
# dict.fromkeys() removes duplicates while keeping first-seen order. A set is
# deliberately avoided: string hashing is randomised per interpreter process,
# so set iteration order (and therefore the column order of df_model, and
# anything downstream that is sensitive to it) would differ between runs.
all_features = list(dict.fromkeys(
    numeric_features + categorical_features + ["DelayOver30", "DelayOver60"]
))
df_model = df_clean.select(all_features)

In [13]:
# Seeded random 80/20 partition of the modelling frame.
split_frames = df_model.randomSplit(weights=[0.8, 0.2], seed=42)
train_data, test_data = split_frames

# Report the size of each partition (each count triggers a Spark job).
print(f"Training set: {train_data.count()}")
print(f"Test set: {test_data.count()}")

                                                                                

Training set: 387868


                                                                                

Test set: 96683


In [14]:
# --- Classifier for the DelayOver30 label ---

# Evaluators for the held-out predictions: ROC AUC and plain accuracy.
evaluator_auc = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    labelCol="DelayOver30",
)
evaluator_acc = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="DelayOver30",
)

# Fit on the training partition, then score the test partition.
pipeline_30 = create_classification_pipeline("DelayOver30")
model_30 = pipeline_30.fit(train_data)
predictions_30 = model_30.transform(test_data)

auc_30 = evaluator_auc.evaluate(predictions_30)
acc_30 = evaluator_acc.evaluate(predictions_30)

# NOTE(review): roughly 65% of the cleaned rows are positive for this label
# (see the class counts printed earlier), so read accuracy against that
# majority-class baseline.
print(f"  AUC-ROC: {auc_30:.4f}")
print(f"  Accuracy: {acc_30:.4f}")



  AUC-ROC: 0.6521
  Accuracy: 0.6600


                                                                                

In [15]:
# --- Classifier for the DelayOver60 label ---

# Evaluators for the held-out predictions: ROC AUC and plain accuracy.
evaluator_auc_60 = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    labelCol="DelayOver60",
)
evaluator_acc_60 = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="DelayOver60",
)

# Fit on the training partition, then score the test partition.
pipeline_60 = create_classification_pipeline("DelayOver60")
model_60 = pipeline_60.fit(train_data)
predictions_60 = model_60.transform(test_data)

auc_60 = evaluator_auc_60.evaluate(predictions_60)
acc_60 = evaluator_acc_60.evaluate(predictions_60)

# NOTE(review): roughly 66% of the cleaned rows are negative for this label
# (see the class counts printed earlier), so read accuracy against that
# majority-class baseline.
print(f"  AUC-ROC: {auc_60:.4f}")
print(f"  Accuracy: {acc_60:.4f}")

                                                                                

  AUC-ROC: 0.6935
  Accuracy: 0.6956


                                                                                

In [16]:
# Persist both fitted pipelines to directories relative to the working
# directory, replacing any copy saved by a previous run.
writer_30 = model_30.write().overwrite()
writer_30.save("model_classification_30")

writer_60 = model_60.write().overwrite()
writer_60.save("model_classification_60")


In [17]:
# Peek at five scored test rows: true label, predicted class, class probabilities.
sample = (
    predictions_30
    .select("DelayOver30", "prediction", "probability")
    .limit(5)
)
# truncate=False prints the full probability vectors instead of clipping them.
sample.show(truncate=False)

                                                                                

+-----------+----------+----------------------------------------+
|DelayOver30|prediction|probability                             |
+-----------+----------+----------------------------------------+
|0          |1.0       |[0.37930321590333205,0.620696784096668] |
|0          |1.0       |[0.38137250081101925,0.6186274991889807]|
|0          |1.0       |[0.3838303969595463,0.6161696030404538] |
|0          |1.0       |[0.3477424063913042,0.6522575936086958] |
|0          |1.0       |[0.3990349388566149,0.6009650611433851] |
+-----------+----------+----------------------------------------+



In [18]:
# Shut down the Spark session now that all results are computed and both
# models are saved. DataFrames created above (df_clean, predictions_30, ...)
# cannot be evaluated after this point without re-running the notebook.
spark.stop()