In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
spark = SparkSession.builder \
    .appName("FlightDelay_Full_Models") \
    .getOrCreate()

In [3]:
fact = spark.read.csv("../output/CleanFactFlight.csv", header=True, inferSchema=True)

dim_time = spark.read.csv("../output/Dim_Time.csv", header=True, inferSchema=True)
dim_date = spark.read.csv("../output/Dim_Date.csv", header=True, inferSchema=True)
dim_airport = spark.read.csv("../output/Dim_Airport.csv", header=True, inferSchema=True)
dim_marketing = spark.read.csv("../output/Dim_Marketing_Airline.csv", header=True, inferSchema=True)
dim_operating = spark.read.csv("../output/Dim_Operating_Airline.csv", header=True, inferSchema=True)

In [4]:
df = fact.select(
    "flight_id",
    "date_id",
    "departure_time_id",
    "origin_airport_id",
    "destination_airport_id",
    "marketing_airline_id",
    "operating_airline_id",
    "dep_delay_minutes",
    "crs_elapsed_time",
    "distance",
    "flight_complexity_score"
)

In [5]:
df = df.filter(
    (col("dep_delay_minutes").isNotNull()) &
    (col("dep_delay_minutes") > -60) &
    (col("dep_delay_minutes") < 300)
)


In [6]:
df = df.join(
    dim_date.select(
        "date_id",
        "year",
        "quarter",
        "month",
        "day_of_month",
        "week_of_year",
        "is_weekend"
    ),
    on="date_id",
    how="left"
)

df = df.withColumn(
    "is_weekend_int",
    when((col("is_weekend") == True) | (col("is_weekend") == "true"), 1).otherwise(0)
)

In [7]:
df = df.join(
    dim_time.select(
        col("time_id").alias("dep_tid"),
        col("hour").alias("dep_hour"),
        col("minute").alias("dep_minute"),
        col("period_of_day").alias("dep_period_of_day")
    ),
    df.departure_time_id == col("dep_tid"),
    "left"
)

In [8]:
# origin airport
df = df.join(
    dim_airport.select(
        col("airport_id").alias("origin_id"),
        col("airport_code").alias("origin_airport_code"),
        col("state_name").alias("origin_state"),
        col("country").alias("origin_country")
    ),
    df.origin_airport_id == col("origin_id"),
    "left"
)

# destination airport
df = df.join(
    dim_airport.select(
        col("airport_id").alias("dest_id"),
        col("airport_code").alias("dest_airport_code"),
        col("state_name").alias("dest_state"),
        col("country").alias("dest_country")
    ),
    df.destination_airport_id == col("dest_id"),
    "left"
)

In [9]:
df = df.join(
    dim_marketing.select(
        col("marketing_airline_id").alias("mkt_id"),
        col("iata_code").alias("marketing_iata")
    ),
    df.marketing_airline_id == col("mkt_id"),
    "left"
)

df = df.join(
    dim_operating.select(
        col("operating_airline_id").alias("op_id"),
        col("iata_code").alias("operating_iata")
    ),
    df.operating_airline_id == col("op_id"),
    "left"
)

In [10]:
df = df.withColumn("delay_flag", when(col("dep_delay_minutes") > 30, 1).otherwise(0))

In [11]:
features_df = df.select(
    # classification target
    "delay_flag",

    # regression target
    "dep_delay_minutes",

    # numeric
    "crs_elapsed_time",
    "distance",
    "flight_complexity_score",
    "year",
    "quarter",
    "month",
    "day_of_month",
    "week_of_year",
    "is_weekend_int",
    "dep_hour",
    "dep_minute",

    # categorical
    "dep_period_of_day",
    "origin_airport_code",
    "origin_state",
    "origin_country",
    "dest_airport_code",
    "dest_state",
    "dest_country",
    "marketing_iata",
    "operating_iata"
).na.drop()

In [12]:
train, val, test = features_df.randomSplit([0.7, 0.15, 0.15], seed=42)

In [13]:
cat_cols = [
    "dep_period_of_day",
    "origin_airport_code",
    "origin_state",
    "origin_country",
    "dest_airport_code",
    "dest_state",
    "dest_country",
    "marketing_iata",
    "operating_iata"
]

num_cols = [
    "crs_elapsed_time",
    "distance",
    "flight_complexity_score",
    "year",
    "quarter",
    "month",
    "day_of_month",
    "week_of_year",
    "is_weekend_int",
    "dep_hour",
    "dep_minute"
]

indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep") for c in cat_cols]

encoder = OneHotEncoder(
    inputCols=[f"{c}_idx" for c in cat_cols],
    outputCols=[f"{c}_ohe" for c in cat_cols]
)

assembler = VectorAssembler(
    inputCols=num_cols + [f"{c}_ohe" for c in cat_cols],
    outputCol="features"
)

In [14]:
rf = RandomForestClassifier(
    labelCol="delay_flag",
    featuresCol="features",
    numTrees=120,
    maxDepth=12,
    featureSubsetStrategy="auto"
)

pipeline_rf = Pipeline(stages=indexers + [encoder, assembler, rf])
model_rf = pipeline_rf.fit(train)

val_pred_rf = model_rf.transform(val)
test_pred_rf = model_rf.transform(test)


accuracy_eval = MulticlassClassificationEvaluator(
    labelCol="delay_flag", predictionCol="prediction", metricName="accuracy"
)

precision_eval = MulticlassClassificationEvaluator(
    labelCol="delay_flag", predictionCol="prediction", metricName="weightedPrecision"
)

recall_eval = MulticlassClassificationEvaluator(
    labelCol="delay_flag", predictionCol="prediction", metricName="weightedRecall"
)

f1_eval = MulticlassClassificationEvaluator(
    labelCol="delay_flag", predictionCol="prediction", metricName="f1"
)


print("RANDOM FOREST CLASSIFICATION METRICS")

print("VALIDATION:")
print("Accuracy :", accuracy_eval.evaluate(val_pred_rf))
print("Precision:", precision_eval.evaluate(val_pred_rf))
print("Recall   :", recall_eval.evaluate(val_pred_rf))
print("F1-score :", f1_eval.evaluate(val_pred_rf))

print("TEST:")
print("Accuracy :", accuracy_eval.evaluate(test_pred_rf))
print("Precision:", precision_eval.evaluate(test_pred_rf))
print("Recall   :", recall_eval.evaluate(test_pred_rf))
print("F1-score :", f1_eval.evaluate(test_pred_rf))


RANDOM FOREST CLASSIFICATION METRICS
VALIDATION:
Accuracy : 0.8703044980953102
Precision: 0.8872343319184874
Recall   : 0.8703044980953102
F1-score : 0.8158372146731713
TEST:
Accuracy : 0.8724666462979936
Precision: 0.8883510164080474
Recall   : 0.8724666462979938
F1-score : 0.8191764935673751


In [15]:
gbt = GBTRegressor(
    labelCol="dep_delay_minutes",
    featuresCol="features",
    maxDepth=7,
    maxIter=120,
    stepSize=0.05
)

pipeline_gbt = Pipeline(stages=indexers + [encoder, assembler, gbt])
model_gbt = pipeline_gbt.fit(train)

val_pred_gbt = model_gbt.transform(val)
test_pred_gbt = model_gbt.transform(test)


eval_rmse = RegressionEvaluator(labelCol="dep_delay_minutes", predictionCol="prediction", metricName="rmse")
eval_mae  = RegressionEvaluator(labelCol="dep_delay_minutes", predictionCol="prediction", metricName="mae")
eval_mse  = RegressionEvaluator(labelCol="dep_delay_minutes", predictionCol="prediction", metricName="mse")
eval_r2   = RegressionEvaluator(labelCol="dep_delay_minutes", predictionCol="prediction", metricName="r2")


print("GBT REGRESSION METRICS - VALIDATION")
print("RMSE:", eval_rmse.evaluate(val_pred_gbt))
print("MAE :", eval_mae.evaluate(val_pred_gbt))
print("MSE :", eval_mse.evaluate(val_pred_gbt))
print("R2  :", eval_r2.evaluate(val_pred_gbt))
print()
print("GBT REGRESSION METRICS - TEST")
print("RMSE:", eval_rmse.evaluate(test_pred_gbt))
print("MAE :", eval_mae.evaluate(test_pred_gbt))
print("MSE :", eval_mse.evaluate(test_pred_gbt))
print("R2  :", eval_r2.evaluate(test_pred_gbt))


GBT REGRESSION METRICS - VALIDATION
RMSE: 5.826144752059633
MAE : 1.7937801158964246
MSE : 33.943962671952
R2  : 0.9704518558998735

GBT REGRESSION METRICS - TEST
RMSE: 5.7728332775565345
MAE : 1.7897958586886582
MSE : 33.325604050464136
R2  : 0.9707467528516769
