In [0]:
df = spark.read.parquet(f"dbfs:/FileStore/Group_4_3/Data/Pipeline_Output/OTPW_60M_PARQUET")


In [0]:
from pyspark.sql.functions import col, count, avg, when, floor
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# === Step 0: Cast relevant columns and engineer features ===
cast_cols = [
    "DEP_DELAY", "PREV_DEP_DELAY", "PREV2_DEP_DELAY", "PREV2_TIME_BETWEEN_ARR_AND_SCHEDULED_DEP",
    "TIME_BETWEEN_ARR_AND_SCHEDULED_DEP", "CRS_DEP_TIME_MINUTES", "TIME_BETWEEN_SCHEDULED_ARR_AND_SCHEDULED_NEXT_DEP",
    "HOURLYVISIBILITY_2H", "HOURLYVISIBILITY_3H", "HOURLYVISIBILITY_4H", "HOURLYVISIBILITY_5H", "HOURLYVISIBILITY_6H",
    "HOURLYWINDSPEED_2H", "HOURLYWINDSPEED_3H", "HOURLYWINDSPEED_4H", "HOURLYWINDSPEED_5H", "HOURLYWINDSPEED_6H",
    "HOURLYWINDGUSTSPEED_2H", "HOURLYWINDGUSTSPEED_3H", "HOURLYWINDGUSTSPEED_4H", "HOURLYWINDGUSTSPEED_5H", "HOURLYWINDGUSTSPEED_6H",
    "HOURLYPRECIPITATION_2H", "HOURLYPRECIPITATION_3H", "HOURLYPRECIPITATION_4H", "HOURLYPRECIPITATION_5H", "HOURLYPRECIPITATION_6H",
    "HOURLYDRYBULBTEMPERATURE_2H", "HOURLYDRYBULBTEMPERATURE_3H", "HOURLYDRYBULBTEMPERATURE_4H", "HOURLYDRYBULBTEMPERATURE_5H", "HOURLYDRYBULBTEMPERATURE_6H",
    "HOURLYSEALEVELPRESSURE_2H", "HOURLYSEALEVELPRESSURE_3H", "HOURLYSEALEVELPRESSURE_4H", "HOURLYSEALEVELPRESSURE_5H", "HOURLYSEALEVELPRESSURE_6H",
    "ORIGIN_CLOSENESS", "TAIL_NUM_USE", "ORIGIN_RELATIVE_TRAFFIC"
]

df_feat = df
for c in cast_cols:
    df_feat = df_feat.withColumn(c, col(c).cast("double"))

# Feature engineering
df_feat = df_feat \
    .withColumn("CRS_DEP_X_PREV_DELAY", col("CRS_DEP_TIME_MINUTES") * col("PREV_DEP_DELAY")) \
    .withColumn("ARR_TIME_GAP_X_CRS_DEP", col("TIME_BETWEEN_ARR_AND_SCHEDULED_DEP") * col("CRS_DEP_TIME_MINUTES")) \
    .withColumn("TRIPLE_PRODUCT", col("TIME_BETWEEN_ARR_AND_SCHEDULED_DEP") * col("HOURLYVISIBILITY_2H") * col("ORIGIN_CLOSENESS")) \
    .withColumn("DEP_HOUR", floor(col("CRS_DEP_TIME_MINUTES") / 60))

# === Step 1: Time-based split for encodings ===
train_data = df_feat.filter(col("YEAR") < 2019)
test_data = df_feat.filter(col("YEAR") == 2019)

# === Step 2: Target encodings ===
mean_delay_carrier = train_data.groupBy("OP_UNIQUE_CARRIER").agg(avg("DEP_DELAY").alias("MEAN_DELAY_PER_CARRIER"))
mean_delay_hour = train_data.groupBy("DEP_HOUR").agg(avg("DEP_DELAY").alias("MEAN_DELAY_PER_HOUR"))
mean_delay_dest = train_data.groupBy("DEST").agg(avg("DEP_DELAY").alias("MEAN_DELAY_PER_DEST"))

df_joined = df_feat \
    .join(mean_delay_carrier, on="OP_UNIQUE_CARRIER", how="left") \
    .join(mean_delay_hour, on="DEP_HOUR", how="left") \
    .join(mean_delay_dest, on="DEST", how="left")

# === Step 3: Rare category flags ===
origin_counts = train_data.groupBy("ORIGIN").agg(count("*").alias("origin_count"))
dest_counts = train_data.groupBy("DEST").agg(count("*").alias("dest_count"))

df_flagged = df_joined \
    .join(origin_counts, on="ORIGIN", how="left") \
    .join(dest_counts, on="DEST", how="left") \
    .withColumn("IS_RARE_ORIGIN", when(col("origin_count") < 50, 1).otherwise(0)) \
    .withColumn("IS_RARE_DEST", when(col("dest_count") < 50, 1).otherwise(0))

# === Step 4: Define features ===
target = "DEP_DELAY"
exclude = {target, "origin_count", "dest_count"}
all_features = [c for c in df_flagged.columns if c not in exclude]
categorical_cols = [c for c in all_features if dict(df_flagged.dtypes)[c] == "string"]
numeric_cols = [c for c in all_features if dict(df_flagged.dtypes)[c] != "string"]

# === Step 5: Index + assemble features ===
indexers = [StringIndexer(inputCol=c, outputCol=c + "_idx", handleInvalid="keep") for c in categorical_cols]
indexed_features = [c + "_idx" for c in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=indexed_features, outputCol="features_unscaled")
scaler = StandardScaler(inputCol="features_unscaled", outputCol="features")

# === Step 6: Model and pipeline ===
rf = RandomForestRegressor(featuresCol="features", labelCol=target, maxDepth=10, numTrees=100)
pipeline = Pipeline(stages=indexers + [assembler, scaler, rf])

# === Step 7: Final split ===
df_final = df_flagged.dropna(subset=[target] + all_features)
train = df_final.filter(col("YEAR") < 2019)
test = df_final.filter(col("YEAR") == 2019)

# === Step 8: Fit model and evaluate ===
model = pipeline.fit(train)
predictions = model.transform(test)

evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction")
rmse = evaluator.setMetricName("rmse").evaluate(predictions)
mae = evaluator.setMetricName("mae").evaluate(predictions)
r2 = evaluator.setMetricName("r2").evaluate(predictions)

print(f"📉 RMSE (2019): {rmse:.2f}")
print(f"📉 MAE  (2019): {mae:.2f}")
print(f"📈 R²   (2019): {r2:.4f}")


📉 RMSE (2019): 58.33
📉 MAE  (2019): 25.35
📈 R²   (2019): 0.0683


In [0]:
from pyspark.sql.functions import col, count, avg, when, floor, log1p, expm1
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# === Step 0: Cast columns ===
cast_cols = [
    "DEP_DELAY", "PREV_DEP_DELAY", "TIME_BETWEEN_ARR_AND_SCHEDULED_DEP",
    "CRS_DEP_TIME_MINUTES", "HOURLYVISIBILITY_2H", "HOURLYVISIBILITY_6H",
    "HOURLYWINDSPEED_2H", "HOURLYWINDSPEED_6H", "ORIGIN_CLOSENESS",
    "TAIL_NUM_USE", "ORIGIN_RELATIVE_TRAFFIC"
]

df_feat = df
for c in cast_cols:
    df_feat = df_feat.withColumn(c, col(c).cast("double"))

# === Step 1: Log-transform target ===
df_feat = df_feat.withColumn("DEP_DELAY_LOG", log1p(col("DEP_DELAY")))

# === Step 2: Feature engineering ===
df_feat = df_feat \
    .withColumn("DEP_HOUR", floor(col("CRS_DEP_TIME_MINUTES") / 60)) \
    .withColumn("CRS_DEP_X_PREV_DELAY", col("CRS_DEP_TIME_MINUTES") * col("PREV_DEP_DELAY")) \
    .withColumn("TRIPLE_PRODUCT", col("TIME_BETWEEN_ARR_AND_SCHEDULED_DEP") * col("HOURLYVISIBILITY_2H") * col("ORIGIN_CLOSENESS")) \
    .withColumn("VISIBILITY_TREND", col("HOURLYVISIBILITY_6H") - col("HOURLYVISIBILITY_2H")) \
    .withColumn("WINDSPEED_TREND", col("HOURLYWINDSPEED_6H") - col("HOURLYWINDSPEED_2H")) \
    .withColumn("IS_PEAK_HOUR", when((col("DEP_HOUR").between(6, 9)) | (col("DEP_HOUR").between(16, 19)), 1).otherwise(0))

# === Step 3: Time-based split for encodings ===
train_data = df_feat.filter(col("YEAR") < 2019)
test_data = df_feat.filter(col("YEAR") == 2019)

# === Step 4: Target encodings ===
mean_delay_carrier = train_data.groupBy("OP_UNIQUE_CARRIER").agg(avg("DEP_DELAY").alias("MEAN_DELAY_PER_CARRIER"))
mean_delay_hour = train_data.groupBy("DEP_HOUR").agg(avg("DEP_DELAY").alias("MEAN_DELAY_PER_HOUR"))
mean_delay_dest = train_data.groupBy("DEST").agg(avg("DEP_DELAY").alias("MEAN_DELAY_PER_DEST"))

df_joined = df_feat \
    .join(mean_delay_carrier, on="OP_UNIQUE_CARRIER", how="left") \
    .join(mean_delay_hour, on="DEP_HOUR", how="left") \
    .join(mean_delay_dest, on="DEST", how="left")

# === Step 5: Rare category flags ===
origin_counts = train_data.groupBy("ORIGIN").agg(count("*").alias("origin_count"))
dest_counts = train_data.groupBy("DEST").agg(count("*").alias("dest_count"))

df_flagged = df_joined \
    .join(origin_counts, on="ORIGIN", how="left") \
    .join(dest_counts, on="DEST", how="left") \
    .withColumn("IS_RARE_ORIGIN", when(col("origin_count") < 50, 1).otherwise(0)) \
    .withColumn("IS_RARE_DEST", when(col("dest_count") < 50, 1).otherwise(0))

# === Step 6: Feature selection ===
target = "DEP_DELAY_LOG"
exclude = {"DEP_DELAY", target, "origin_count", "dest_count"}
all_features = [c for c in df_flagged.columns if c not in exclude]
categorical_cols = [c for c in all_features if dict(df_flagged.dtypes)[c] == "string"]
numeric_cols = [c for c in all_features if dict(df_flagged.dtypes)[c] != "string"]

# === Step 7: Index + assemble features ===
indexers = [StringIndexer(inputCol=c, outputCol=c + "_idx", handleInvalid="keep") for c in categorical_cols]
indexed_features = [c + "_idx" for c in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=indexed_features, outputCol="features_unscaled")
scaler = StandardScaler(inputCol="features_unscaled", outputCol="features")

# === Step 8: Random Forest model ===
rf = RandomForestRegressor(featuresCol="features", labelCol=target)

# === Step 9: Build pipeline ===
pipeline = Pipeline(stages=indexers + [assembler, scaler, rf])

# === Step 10: Prepare training/testing data ===
df_final = df_flagged.dropna(subset=[target] + all_features)
train = df_final.filter(col("YEAR") < 2019)
test = df_final.filter(col("YEAR") == 2019)

# === Step 11: Hyperparameter tuning ===
paramGrid = (ParamGridBuilder()
    .addGrid(rf.maxDepth, [10, 15])
    .addGrid(rf.numTrees, [100, 150])
    .build())

evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

# === Step 12: Fit and evaluate ===
cv_model = cv.fit(train)
predictions = cv_model.transform(test)

# Inverse transform the log prediction for reporting
predictions = predictions.withColumn("prediction_exp", expm1(col("prediction")))
predictions = predictions.withColumn("DEP_DELAY", col("DEP_DELAY").cast("double"))

# === Step 13: Final evaluation ===
final_evaluator = RegressionEvaluator(labelCol="DEP_DELAY", predictionCol="prediction_exp")
rmse = final_evaluator.setMetricName("rmse").evaluate(predictions)
mae = final_evaluator.setMetricName("mae").evaluate(predictions)
r2 = final_evaluator.setMetricName("r2").evaluate(predictions)

print(f"📉 RMSE (2019): {rmse:.2f}")
print(f"📉 MAE  (2019): {mae:.2f}")
print(f"📈 R²   (2019): {r2:.4f}")


📉 RMSE (2019): 87.04
📉 MAE  (2019): 36.57
📈 R²   (2019): -0.0751
