# NN MLP model baseline - 5 years

## Imports

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from xgboost.spark import SparkXGBRegressor

from mlflow.models import infer_signature


import random
import numpy as np
import pandas as pd

import mlflow
print(mlflow.__version__)

import os

# Ask Databricks to track MLlib fits in MLflow.
spark.conf.set("spark.databricks.mlflow.trackMLlib.enabled", "true")

RANDOM_SEED = 0
# Workspace path of the shared MLflow experiment used by this notebook
EXPERIMENT_NAME = "/Shared/team_2_2/mlflow-nn-classifier"

# Select the shared experiment, creating it first when it is missing.
# Any failure falls back to a per-user "default" experiment.
try:
    experiment = mlflow.get_experiment_by_name(EXPERIMENT_NAME)
    if experiment is not None:
        print(f"Using existing experiment: {experiment.name}")
    else:
        experiment_id = mlflow.create_experiment(EXPERIMENT_NAME)
        print(f"Created new experiment with ID: {experiment_id}")
    mlflow.set_experiment(EXPERIMENT_NAME)
except Exception as setup_error:
    print(f"Error with experiment setup: {setup_error}")
    # Fallback to default experiment in workspace, keyed by the notebook user
    current_user = (
        dbutils.notebook.entry_point.getDbutils()
        .notebook()
        .getContext()
        .userName()
        .get()
    )
    mlflow.set_experiment(f"/Users/{current_user}/default")



## Helper Functions


In [0]:
def checkpoint_dataset(dataset, file_path, section="2", number="2"):
    """Write ``dataset`` as a parquet checkpoint in the group's DBFS folder.

    The target is ``dbfs:/student-groups/Group_{section}_{number}/{file_path}.parquet``
    and is written with ``mode("overwrite")``, so an existing checkpoint at
    that path is replaced.

    Args:
        dataset: Object exposing the Spark ``DataFrame.write`` API.
        file_path: Path relative to the group folder, without the
            ``.parquet`` suffix. May contain ``/`` to target a subfolder.
        section: Section part of the group folder name (default ``"2"``).
        number: Group-number part of the group folder name (default ``"2"``).
    """
    base_folder = f"dbfs:/student-groups/Group_{section}_{number}"
    full_path = f"{base_folder}/{file_path}.parquet"

    # Parent directory of the parquet target; this is base_folder itself when
    # file_path has no "/" in it.
    parent_folder = full_path.rsplit("/", 1)[0]

    # mkdirs creates any missing parent directories, so a single call covers
    # both the base folder and nested subfolders.
    dbutils.fs.mkdirs(parent_folder)

    # Save dataset as a parquet file
    dataset.write.mode("overwrite").parquet(full_path)
    print(f"Checkpointed {file_path}")

## Datasets - custom join
- get checkpoint data
  - 5 year combined join, with feature engineering

In [0]:
# %fs ls dbfs:/student-groups/Group_2_2/1_year_custom_joined/feature_eng

# Show the contents of each checkpoint folder, one listing per folder.
for _folder in (
    "dbfs:/student-groups/Group_2_2/",
    # 3-month dataset
    "dbfs:/student-groups/Group_2_2/3_month_custom_joined/fe_graph_and_holiday/",
    "dbfs:/student-groups/Group_2_2/3_month_custom_joined/fe_graph_and_holiday/cv_splits/",
    "dbfs:/student-groups/Group_2_2/3_month_custom_joined/fe_graph_and_holiday/training_splits/",
    # saved models
    "dbfs:/student-groups/Group_2_2/models/",
    # 5-year dataset
    "dbfs:/student-groups/Group_2_2/5_year_custom_joined/",
    "dbfs:/student-groups/Group_2_2/5_year_custom_joined/cv_splits/",
    "dbfs:/student-groups/Group_2_2/5_year_custom_joined/fe_graph_and_holiday/training_splits/",
    "dbfs:/student-groups/Group_2_2/5_year_custom_joined/feature_eng_ph3/training_splits/",
):
    display(dbutils.fs.ls(_folder))

In [0]:
# Preview the first 100 rows of the 5-year fe_graph_and_holiday test.parquet split.
display(
    spark.read
    .parquet("dbfs:/student-groups/Group_2_2/5_year_custom_joined/fe_graph_and_holiday/training_splits/test.parquet/")
    .limit(100)
)

# display(spark.read.parquet("dbfs:/student-groups/Group_2_2/5_year_custom_joined/feature_eng_ph3/training_splits/test.parquet/").limit(100))

# Feature Selection

In [0]:
# Columns kept for the baseline models, grouped by origin. The concatenation
# order is the column order.
baselines_columns = (
    # calendar, carrier, route, labels and timestamp
    [
        "QUARTER",
        "MONTH",
        "YEAR",
        "DAY_OF_MONTH",
        "DAY_OF_WEEK",
        "OP_CARRIER",
        # "TAIL_NUM",
        "ORIGIN_AIRPORT_SEQ_ID",
        "DEST_AIRPORT_SEQ_ID",
        "CRS_ELAPSED_TIME",
        "DISTANCE",
        "DEP_DELAY_NEW",
        "DEP_DEL15",
        "utc_timestamp",
    ]
    # engineered features
    + [
        "CRS_DEP_MINUTES",
        "prev_flight_delay_in_minutes",
        "prev_flight_delay",
        "origin_delays_4h",
        "delay_origin_7d",
        "delay_origin_carrier_7d",
        "delay_route_7d",
        "flight_count_24h",
        "LANDING_TIME_DIFF_MINUTES",
        "AVG_ARR_DELAY_ORIGIN",
        "AVG_TAXI_OUT_ORIGIN",
    ]
    # hourly weather
    + [
        "HourlyDryBulbTemperature",
        "HourlyDewPointTemperature",
        "HourlyRelativeHumidity",
        "HourlyAltimeterSetting",
        "HourlyVisibility",
        "HourlyStationPressure",
        "HourlyWetBulbTemperature",
        "HourlyPrecipitation",
        "HourlyCloudCoverage",
        "HourlyCloudElevation",
        "HourlyWindSpeed",
    ]
    # phase 3 new features
    + [
        "page_rank",
        "out_degree",
        "in_degree",
        "weighted_out_degree",
        "weighted_in_degree",
        "N_RUNWAYS",
        "betweenness_unweighted",
        "closeness",
        "betweenness",
        "avg_origin_dep_delay",
        "avg_dest_arr_delay",
        "avg_daily_route_flights",
        "avg_route_delay",
        "avg_hourly_flights",
        "IS_HOLIDAY",
        "IS_HOLIDAY_WINDOW",
        "AIRPORT_HUB_CLASS",
        "RATING",
        "AIRLINE_CATEGORY",
    ]
)

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor

# Categorical encoding
# (source column, prefix) pairs: each column is string-indexed into
# "<prefix>_idx" (invalid values kept) and one-hot encoded into "<prefix>_vec".
_categorical_specs = [
    ("OP_CARRIER", "carrier"),
    ("ORIGIN_AIRPORT_SEQ_ID", "origin"),
    ("DEST_AIRPORT_SEQ_ID", "dest"),
    ("TAIL_NUM", "tail_num"),
    ("IS_HOLIDAY", "holiday"),
    ("IS_HOLIDAY_WINDOW", "holiday_window"),
    ("AIRPORT_HUB_CLASS", "airport_hub"),
    ("AIRLINE_CATEGORY", "airline_category"),
]

(
    carrier_indexer,
    origin_indexer,
    dest_indexer,
    tail_num_indexer,
    holiday_indexer,
    holiday_window_indexer,
    airport_hub_indexer,
    airline_category_indexer,
) = [
    StringIndexer(inputCol=source_col, outputCol=f"{prefix}_idx", handleInvalid="keep")
    for source_col, prefix in _categorical_specs
]

(
    carrier_encoder,
    origin_encoder,
    dest_encoder,
    tail_num_encoder,
    holiday_encoder,
    holiday_window_encoder,
    airport_hub_encoder,
    airline_category_encoder,
) = [
    OneHotEncoder(inputCol=f"{prefix}_idx", outputCol=f"{prefix}_vec")
    for _, prefix in _categorical_specs
]


In [0]:
# Assemble all features into a single "features" vector.
# The input groups are concatenated in order; "tail_num_vec" stays excluded.
assembler = VectorAssembler(
    inputCols=(
        # calendar, one-hot carrier / route, schedule
        [
            "QUARTER",
            "MONTH",
            "YEAR",
            "DAY_OF_MONTH",
            "DAY_OF_WEEK",
            "carrier_vec",
            "origin_vec",
            "dest_vec",
            "CRS_ELAPSED_TIME",
            "DISTANCE",
            # "tail_num_vec",
        ]
        # engineered features
        + [
            "CRS_DEP_MINUTES",
            "prev_flight_delay_in_minutes",
            "prev_flight_delay",
            "origin_delays_4h",
            "delay_origin_7d",
            "delay_origin_carrier_7d",
            "delay_route_7d",
            "flight_count_24h",
            "LANDING_TIME_DIFF_MINUTES",
            "AVG_ARR_DELAY_ORIGIN",
            "AVG_TAXI_OUT_ORIGIN",
        ]
        # hourly weather
        + [
            "HourlyDryBulbTemperature",
            "HourlyDewPointTemperature",
            "HourlyRelativeHumidity",
            "HourlyAltimeterSetting",
            "HourlyVisibility",
            "HourlyStationPressure",
            "HourlyWetBulbTemperature",
            "HourlyPrecipitation",
            "HourlyCloudCoverage",
            "HourlyCloudElevation",
            "HourlyWindSpeed",
        ]
        # phase 3 new features
        + [
            "page_rank",
            "out_degree",
            "in_degree",
            "weighted_out_degree",
            "weighted_in_degree",
            "N_RUNWAYS",
            "betweenness_unweighted",
            "closeness",
            "betweenness",
            "avg_origin_dep_delay",
            "avg_dest_arr_delay",
            "avg_daily_route_flights",
            "avg_route_delay",
            "avg_hourly_flights",
            "holiday_vec",
            "holiday_window_vec",
            "airport_hub_vec",
            "RATING",
            "airline_category_vec",
        ]
    ),
    outputCol="features",
)

# Training with Best Hyperparameters

In [0]:
# --- Preprocessing stages ---
# Every "*_vec" column consumed by `assembler` must be produced by an
# indexer/encoder pair listed here. The holiday, holiday-window, airport-hub
# and airline-category pairs were missing, so a Pipeline built from this list
# would fail at the assembler on the absent columns.
preprocessing_stages = [
    carrier_indexer, origin_indexer, dest_indexer,
    holiday_indexer, holiday_window_indexer,
    airport_hub_indexer, airline_category_indexer,
    carrier_encoder, origin_encoder, dest_encoder,
    holiday_encoder, holiday_window_encoder,
    airport_hub_encoder, airline_category_encoder,
    assembler
]

# --- Regression evaluators (label: delay in minutes, DEP_DELAY_NEW) ---
rmse_evaluator = RegressionEvaluator(
    labelCol="DEP_DELAY_NEW",
    predictionCol="prediction",
    metricName="rmse"
)

mae_evaluator = RegressionEvaluator(
    labelCol="DEP_DELAY_NEW",
    predictionCol="prediction",
    metricName="mae"
)

# --- Classification evaluators (label: DEP_DEL15) ---
precision_evaluator = MulticlassClassificationEvaluator(
    labelCol="DEP_DEL15",
    predictionCol="prediction",
    metricName="weightedPrecision"
)

recall_evaluator = MulticlassClassificationEvaluator(
    labelCol="DEP_DEL15",
    predictionCol="prediction",
    metricName="weightedRecall"
)

f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="DEP_DEL15",
    predictionCol="prediction",
    metricName="f1"
)

# Weighted F-beta across classes with beta=2 (recall weighted over precision).
f2_evaluator = MulticlassClassificationEvaluator(
    labelCol="DEP_DEL15",
    predictionCol="prediction",
    metricName="weightedFMeasure",
    beta=2.0
)

# F-beta (beta=2) for label 1.0 only.
f2_evaluator_label = MulticlassClassificationEvaluator(
    labelCol="DEP_DEL15",
    predictionCol="prediction",
    metricName="fMeasureByLabel",
    metricLabel=1.0,
    beta=2.0
)

auc_evaluator = BinaryClassificationEvaluator(
    labelCol="DEP_DEL15",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

acc_evaluator = MulticlassClassificationEvaluator(
    labelCol="DEP_DEL15",
    predictionCol="prediction",
    metricName="accuracy"
)

In [0]:
def read_specific_fold(path: str, fold_id: int, split_type: str):
    """
    Read one fold/split of the partitioned cross-validation parquet data.

    The partition directory ``{path}/fold_id={fold_id}/split_type={split_type}``
    is read directly first. If that read raises, the whole dataset at ``path``
    is read and filtered on its ``fold_id`` and ``split_type`` columns.

    Args:
        path: Root of the partitioned parquet dataset.
        fold_id: Value of the ``fold_id`` partition to load.
        split_type: Value of the ``split_type`` partition to load.

    Returns:
        The DataFrame for the requested fold and split.

    NOTE(review): Spark normally omits partition columns when a leaf partition
    directory is read directly, so the two code paths may return different
    schemas. Confirm before relying on ``fold_id`` / ``split_type`` in the result.
    """
    fold_path = f"{path}/fold_id={fold_id}/split_type={split_type}"

    try:
        # Try direct partition read
        return spark.read.parquet(fold_path)
    except Exception as exc:
        # Catch Exception rather than using a bare except, so that
        # KeyboardInterrupt / SystemExit still propagate, and report why the
        # direct read failed.
        print(f"Direct read failed for fold {fold_id}, using filter method...")
        print(f"  Reason: {type(exc).__name__}: {exc}")

    # Fallback: read all data and filter
    all_data = spark.read.parquet(path)
    return all_data.filter(
        (all_data.fold_id == fold_id) &
        (all_data.split_type == split_type)
    )

In [0]:
# Dataset variant to train on, and the location of its cross-validation splits.
month_or_year = "5_year_custom_joined"
# Alternative (feature-engineered) split location, kept for reference:
# cv_path = f"dbfs:/student-groups/Group_2_2/{month_or_year}/fe_graph_and_holiday/cv_splits"
cv_path = "/".join(["dbfs:/student-groups/Group_2_2", month_or_year, "cv_splits"])

# Plain MLP Model

In [0]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import BooleanType
import mlflow
import numpy as np

# --- 1. Aggressive Optimization Config ---
# Partition count used for every repartition in this cell: 48 cores * 10 tasks
OPTIMAL_PARTITIONS = 48 * 10
# Fold ids run from 1 to n_folds in the training loop
n_folds = 10
month_or_year = "5_year_custom_joined"
cv_path = "/".join(["dbfs:/student-groups/Group_2_2", month_or_year, "cv_splits"])

# --- 2. Safety UDF (Prevent Crashing on Inf/NaN) ---
@F.udf(returnType=BooleanType())
def vector_is_valid(v):
    """Return True when every stored entry of ``v`` is finite and at most 1e30 in magnitude.

    Only ``v.values`` is inspected. A null vector is reported as invalid.

    Args:
        v: An ML vector exposing ``.values``, or None.

    Returns:
        bool: False for None, or for any NaN, +/-Inf or ``|x| > 1e30`` entry;
        True otherwise, including a vector with no stored values.
    """
    if v is None:
        return False

    stored = np.asarray(v.values, dtype=float)

    # A vector with no stored entries has nothing to reject, and np.max would
    # raise ValueError on the empty array.
    if stored.size == 0:
        return True

    # isfinite rejects NaN and both infinities in a single pass.
    if not np.isfinite(stored).all():
        return False

    # Cast to a built-in bool so the UDF returns a plain Python value.
    return bool(np.max(np.abs(stored)) <= 1e30)

# --- 3. Pipeline Definition ---
# Since this is the BASELINE, we process raw features (no XGBoost input)
# Each categorical column is string-indexed (invalid values kept rather than
# raising) and then one-hot encoded.
carrier_indexer = StringIndexer(inputCol="OP_CARRIER", outputCol="carrier_idx", handleInvalid="keep")
origin_indexer = StringIndexer(inputCol="ORIGIN_AIRPORT_SEQ_ID", outputCol="origin_idx", handleInvalid="keep")
dest_indexer = StringIndexer(inputCol="DEST_AIRPORT_SEQ_ID", outputCol="dest_idx", handleInvalid="keep")
# The TAIL_NUM indexer/encoder stay defined for compatibility with other cells
# but are NOT added to the pipeline below: "tail_num_vec" is not an assembler
# input, so fitting them was wasted work.
tail_num_indexer = StringIndexer(inputCol="TAIL_NUM", outputCol="tail_num_idx", handleInvalid="keep")
holiday_indexer = StringIndexer(inputCol="IS_HOLIDAY", outputCol="holiday_idx", handleInvalid="keep")
holiday_window_indexer = StringIndexer(inputCol="IS_HOLIDAY_WINDOW", outputCol="holiday_window_idx", handleInvalid="keep")
airport_hub_indexer = StringIndexer(inputCol="AIRPORT_HUB_CLASS", outputCol="airport_hub_idx", handleInvalid="keep")
airline_category_indexer = StringIndexer(inputCol="AIRLINE_CATEGORY", outputCol="airline_category_idx", handleInvalid="keep")

carrier_encoder = OneHotEncoder(inputCol="carrier_idx", outputCol="carrier_vec")
origin_encoder = OneHotEncoder(inputCol="origin_idx", outputCol="origin_vec")
dest_encoder = OneHotEncoder(inputCol="dest_idx", outputCol="dest_vec")
tail_num_encoder = OneHotEncoder(inputCol="tail_num_idx", outputCol="tail_num_vec")
holiday_encoder = OneHotEncoder(inputCol="holiday_idx", outputCol="holiday_vec")
holiday_window_encoder = OneHotEncoder(inputCol="holiday_window_idx", outputCol="holiday_window_vec")
airport_hub_encoder = OneHotEncoder(inputCol="airport_hub_idx", outputCol="airport_hub_vec")
airline_category_encoder = OneHotEncoder(inputCol="airline_category_idx", outputCol="airline_category_vec")

# Unscaled feature vector; the scaler below turns it into "scaled_features".
assembler = VectorAssembler(
    inputCols=[
        "QUARTER",
        "MONTH",
        "YEAR",
        "DAY_OF_MONTH",
        "DAY_OF_WEEK",
        "carrier_vec",
        "origin_vec",
        "dest_vec",
        "CRS_ELAPSED_TIME",
        "DISTANCE",
        # "tail_num_vec",
        "CRS_DEP_MINUTES",                 # feature eng start
        "prev_flight_delay_in_minutes",
        "prev_flight_delay",
        "origin_delays_4h",
        "delay_origin_7d",
        "delay_origin_carrier_7d",
        "delay_route_7d",
        "flight_count_24h",
        "LANDING_TIME_DIFF_MINUTES",
        "AVG_ARR_DELAY_ORIGIN",
        "AVG_TAXI_OUT_ORIGIN",              # feature eng end
        'HourlyDryBulbTemperature',         # weather start
        'HourlyDewPointTemperature',
        'HourlyRelativeHumidity',
        'HourlyAltimeterSetting',
        'HourlyVisibility',
        'HourlyStationPressure',
        'HourlyWetBulbTemperature',
        'HourlyPrecipitation',
        'HourlyCloudCoverage',
        'HourlyCloudElevation',
        'HourlyWindSpeed',                   # weather end
        'page_rank',               # phase 3 new features start
        'out_degree',
        'in_degree',
        'weighted_out_degree',
        'weighted_in_degree',
        'N_RUNWAYS',
        'betweenness_unweighted',
        'closeness',
        'betweenness',
        'avg_origin_dep_delay',
        'avg_dest_arr_delay',
        'avg_daily_route_flights',
        'avg_route_delay',
        'avg_hourly_flights',
        "holiday_vec",
        "holiday_window_vec",
        "airport_hub_vec",
        "RATING",
        "airline_category_vec"               # phase 3 new features end
    ],
    outputCol="raw_features"
)

# Scale to unit standard deviation only; no mean-centering.
scaler = StandardScaler(inputCol="raw_features", outputCol="scaled_features", withStd=True, withMean=False)

# Only stages whose output feeds the assembler are included.
baseline_pipeline = Pipeline(stages=[
    carrier_indexer, origin_indexer, dest_indexer,
    holiday_indexer, holiday_window_indexer,
    airport_hub_indexer, airline_category_indexer,

    carrier_encoder, origin_encoder, dest_encoder,
    holiday_encoder, holiday_window_encoder,
    airport_hub_encoder, airline_category_encoder,

    assembler,
    scaler
])

# --- 4. Global Loading & Preprocessing ---
print(f"Loading 20% Sample from {month_or_year}...")

# Read the CV splits, keep a seeded 20% sample, spread it over the configured
# partitions and cache it: it is used for both the pipeline fit and the transform.
full_cv_df = (
    spark.read.parquet(cv_path)
    .sample(withReplacement=False, fraction=0.20, seed=RANDOM_SEED)
    .repartition(OPTIMAL_PARTITIONS)
    .cache()
)

print("Fitting Global Feature Pipeline...")
feat_model = baseline_pipeline.fit(full_cv_df)

print("Transforming, Cleaning & Persisting Vectors...")
# Keep only the columns the training loop needs, drop rows whose feature
# vector fails the validity UDF, and persist the result on disk.
featurized_df = (
    feat_model.transform(full_cv_df)
    .select("scaled_features", "DEP_DEL15", "fold_id", "split_type")
    .filter(vector_is_valid(F.col("scaled_features")))
    .persist(StorageLevel.DISK_ONLY)
)

# The count() action materializes the persisted DataFrame.
print(f"Materialized {featurized_df.count()} rows for Baseline Training.")

# --- 5. Training Loop (With Class Balancing) ---
# For each fold: downsample the negative class of the training split to the
# size of the positive class, fit an MLP, and score it on the balanced training
# data and on the untouched validation split. Fold averages are logged to the
# parent MLflow run, then a final model is fit on the balanced full sample.
f1_evaluator = MulticlassClassificationEvaluator(labelCol="DEP_DEL15", metricName="f1")
f2_evaluator = MulticlassClassificationEvaluator(labelCol="DEP_DEL15", metricName="weightedFMeasure", beta=2.0)
f2_label_evaluator = MulticlassClassificationEvaluator(labelCol="DEP_DEL15", metricName="fMeasureByLabel", metricLabel=1.0, beta=2.0)
precision_evaluator = MulticlassClassificationEvaluator(labelCol="DEP_DEL15", metricName="weightedPrecision")
recall_evaluator = MulticlassClassificationEvaluator(labelCol="DEP_DEL15", metricName="weightedRecall")

print(f"Starting Baseline MLP Training (Balanced)...")

with mlflow.start_run(run_name="POST_PRESENTATION_MLP_BASELINE_5_YR_BALANCED_10CV") as run:

    mlflow.log_param("model", "MLP Classifier")
    mlflow.log_param("type", "Baseline (No Stack)")
    mlflow.log_param("strategy", "Balanced Training + Global Prep")

    fold_metrics = {
        'train_f1': [], 'train_f2': [], 'train_f2_label': [], 'train_precision': [], 'train_recall': [],
        'val_f1': [], 'val_f2': [], 'val_f2_label': [], 'val_precision': [], 'val_recall': []
    }

    # The MLP input layer width is the length of the assembled feature vector.
    input_dim = len(featurized_df.first()["scaled_features"])
    print(f"Detected Input Dimension: {input_dim}")
    mlflow.log_param("input_dim", input_dim)

    for fold_id in range(1, n_folds + 1):
        with mlflow.start_run(run_name=f"fold_{fold_id}", nested=True):
            print(f"  Processing Fold {fold_id}/{n_folds}...")

            # 1. Split Data
            # Note: For baseline, we use the pre-defined 'split_type' from your CV generation
            train_raw = featurized_df.filter((F.col("fold_id") == fold_id) & (F.col("split_type") == "train"))
            val_vec = featurized_df.filter((F.col("fold_id") == fold_id) & (F.col("split_type") == "validation"))

            # 2. BALANCE TRAINING DATA (Critical Step)
            train_pos = train_raw.filter(F.col("DEP_DEL15") == 1.0)
            train_neg = train_raw.filter(F.col("DEP_DEL15") == 0.0)

            pos_count = train_pos.count()
            neg_count = train_neg.count()

            # Fail with a clear message instead of a ZeroDivisionError (no
            # negatives) or a fit on an empty or single-class training set.
            if pos_count == 0 or neg_count == 0:
                raise ValueError(
                    f"Fold {fold_id}: cannot balance training data "
                    f"({pos_count} positive / {neg_count} negative rows)"
                )

            # Downsample negatives to match positives. Sampling without
            # replacement needs a fraction in [0, 1], so clamp for the case
            # where positives outnumber negatives.
            fraction = min(1.0, pos_count / neg_count)
            train_neg_sampled = train_neg.sample(withReplacement=False, fraction=fraction, seed=42)

            train_balanced = train_pos.union(train_neg_sampled).repartition(OPTIMAL_PARTITIONS)

            print(f"    Fold {fold_id}: Training on Balanced Data ({pos_count} Pos)")

            # 3. Define & Train MLP (one hidden layer of 64 units, 2 output classes)
            layers = [input_dim, 64, 2]
            mlp = MultilayerPerceptronClassifier(
                featuresCol="scaled_features",
                labelCol="DEP_DEL15",
                maxIter=100,
                layers=layers,
                blockSize=128,
                stepSize=0.03
            )

            mlp_model = mlp.fit(train_balanced)

            # 4. Predict & Evaluate
            # Predictions are cached because five evaluators each scan them.
            # Predict on Training Set (Balanced)
            train_preds = mlp_model.transform(train_balanced).select("prediction", "DEP_DEL15").cache()
            train_preds.count() # Materialize

            # Predict on Validation Set (Unbalanced/Real)
            val_preds = mlp_model.transform(val_vec).select("prediction", "DEP_DEL15").cache()
            val_preds.count() # Materialize

            metrics = {
                # Training Metrics
                "train_f1": f1_evaluator.evaluate(train_preds),
                "train_f2": f2_evaluator.evaluate(train_preds),
                "train_f2_label": f2_label_evaluator.evaluate(train_preds),
                "train_precision": precision_evaluator.evaluate(train_preds),
                "train_recall": recall_evaluator.evaluate(train_preds),

                # Validation Metrics
                "val_f1": f1_evaluator.evaluate(val_preds),
                "val_f2": f2_evaluator.evaluate(val_preds),
                "val_f2_label": f2_label_evaluator.evaluate(val_preds),
                "val_precision": precision_evaluator.evaluate(val_preds),
                "val_recall": recall_evaluator.evaluate(val_preds),
            }

            # mlflow.log_metrics(metrics)
            print(f"    Fold {fold_id}: Train F2-Delay={metrics['train_f2_label']:.4f}, Val F2-Delay={metrics['val_f2_label']:.4f}")

            for k in fold_metrics.keys():
                fold_metrics[k].append(metrics[k])

            # Release BOTH cached prediction sets. Previously only val_preds
            # was released, leaving one cached training-prediction set per fold.
            train_preds.unpersist()
            val_preds.unpersist()

    # Log Averages
    avg_metrics = {f"avg_{k}": np.mean(v) for k, v in fold_metrics.items()}
    mlflow.log_metrics(avg_metrics)

    print("\n" + "="*50)
    print(f"Average Train Delay-F2: {avg_metrics['avg_train_f2_label']:.4f}")
    print(f"Average Val Delay-F2:   {avg_metrics['avg_val_f2_label']:.4f}")
    print("="*50)

    # --- 6. RETRAIN FINAL MODEL ON FULL DATA ---
    print("Retraining Final Model on Full Balanced Data...")

    # Balance the entire dataset (using the global sample)
    # NOTE(review): featurized_df holds the rows of every fold_id/split_type;
    # if the CV splits repeat the same flights across folds, those rows are
    # repeated here too. Confirm against the CV split generation.
    full_pos = featurized_df.filter(F.col("DEP_DEL15") == 1.0)
    full_neg = featurized_df.filter(F.col("DEP_DEL15") == 0.0)

    full_pos_count = full_pos.count()
    full_neg_count = full_neg.count()
    if full_pos_count == 0 or full_neg_count == 0:
        raise ValueError(
            "Cannot balance the full dataset "
            f"({full_pos_count} positive / {full_neg_count} negative rows)"
        )

    # Same clamp as in the fold loop: the fraction must stay within [0, 1].
    global_fraction = min(1.0, full_pos_count / full_neg_count)
    full_balanced = full_pos.union(
        full_neg.sample(withReplacement=False, fraction=global_fraction, seed=42)
    ).repartition(OPTIMAL_PARTITIONS)

    final_mlp = MultilayerPerceptronClassifier(
        featuresCol="scaled_features",
        labelCol="DEP_DEL15",
        layers=[input_dim, 64, 2],
        blockSize=128,
        maxIter=100,
        stepSize=0.03
    )

    final_model = final_mlp.fit(full_balanced)

    # Log Model Artifact
    mlflow.spark.log_model(final_model, "model")
    print(f"Final Model Saved to Run: {run.info.run_id}")

# Cleanup
featurized_df.unpersist()
full_cv_df.unpersist()


# stacked approach
- build an XGBoost model and do hyperparameter tuning to find the best hyperparameters
- generate the XGBoost regression delay field and output it using the held out data
- use the XGBoost delay field as the input for the NN/MLP model while doing hyperparameter tuning

In [0]:
from pyspark.ml.tuning import ParamGridBuilder
from xgboost.spark import SparkXGBRegressor
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import BooleanType
import itertools
import mlflow
import numpy as np
import pandas as pd
import shutil

# --- 1. Config ---
# Number of partitions the per-fold training data is written and read with
TRAIN_PARTITIONS = 6
# Fold ids run from 1 to n_folds in the tuning loop
n_folds = 10
month_or_year = "5_year_custom_joined"
# CV splits are read from cv_path; per-fold training data is materialized
# under temp_materialize_path.
cv_path = "/".join(["dbfs:/student-groups/Group_2_2", month_or_year, "cv_splits"])
temp_materialize_path = "/".join(["dbfs:/student-groups/Group_2_2", month_or_year, "temp_xgb_training"])

# --- 2. Robust Safety Checks ---
# Repairs bad entries instead of rejecting the whole row
@F.udf(returnType=VectorUDT())
def sanitize_vector(v):
    """Return a dense copy of ``v`` with non-finite and extreme values repaired.

    NaN, +Inf and -Inf become 0.0, and the remaining values are clipped to
    [-1e30, 1e30]. A null input yields an empty dense vector.
    """
    if v is None:
        return Vectors.dense([])

    # Densify, zero out the non-finite entries, then bound the magnitude.
    repaired = np.clip(
        np.nan_to_num(v.toArray(), nan=0.0, posinf=0.0, neginf=0.0),
        -1e30,
        1e30,
    )
    return Vectors.dense(repaired)

# --- 3. Data Loading & Pipeline Definition ---
print(f"Loading and optimizing data from {month_or_year}...")

# Seeded 20% sample of the CV splits, nulls filled with 0,
# spread over 480 partitions and cached for the fit/transform that follow.
full_cv_df = (
    spark.read.parquet(cv_path)
    .sample(withReplacement=False, fraction=0.20, seed=RANDOM_SEED)
    .fillna(0)
    .repartition(480)
    .cache()
)

# --- Feature Definitions ---
# Each categorical column is string-indexed (invalid values kept rather than
# raising) and then one-hot encoded.
carrier_indexer = StringIndexer(inputCol="OP_CARRIER", outputCol="carrier_idx", handleInvalid="keep")
origin_indexer = StringIndexer(inputCol="ORIGIN_AIRPORT_SEQ_ID", outputCol="origin_idx", handleInvalid="keep")
dest_indexer = StringIndexer(inputCol="DEST_AIRPORT_SEQ_ID", outputCol="dest_idx", handleInvalid="keep")
# The TAIL_NUM indexer/encoder stay defined for compatibility with other cells
# but are NOT added to the pipeline below: "tail_num_vec" is not an assembler
# input, so fitting them was wasted work.
tail_num_indexer = StringIndexer(inputCol="TAIL_NUM", outputCol="tail_num_idx", handleInvalid="keep")
holiday_indexer = StringIndexer(inputCol="IS_HOLIDAY", outputCol="holiday_idx", handleInvalid="keep")
holiday_window_indexer = StringIndexer(inputCol="IS_HOLIDAY_WINDOW", outputCol="holiday_window_idx", handleInvalid="keep")
airport_hub_indexer = StringIndexer(inputCol="AIRPORT_HUB_CLASS", outputCol="airport_hub_idx", handleInvalid="keep")
airline_category_indexer = StringIndexer(inputCol="AIRLINE_CATEGORY", outputCol="airline_category_idx", handleInvalid="keep")

carrier_encoder = OneHotEncoder(inputCol="carrier_idx", outputCol="carrier_vec")
origin_encoder = OneHotEncoder(inputCol="origin_idx", outputCol="origin_vec")
dest_encoder = OneHotEncoder(inputCol="dest_idx", outputCol="dest_vec")
tail_num_encoder = OneHotEncoder(inputCol="tail_num_idx", outputCol="tail_num_vec")
holiday_encoder = OneHotEncoder(inputCol="holiday_idx", outputCol="holiday_vec")
holiday_window_encoder = OneHotEncoder(inputCol="holiday_window_idx", outputCol="holiday_window_vec")
airport_hub_encoder = OneHotEncoder(inputCol="airport_hub_idx", outputCol="airport_hub_vec")
airline_category_encoder = OneHotEncoder(inputCol="airline_category_idx", outputCol="airline_category_vec")

# Dedicated assembler for the XGBoost path. It emits "raw_features"; the
# "features" column given to XGBoost is produced afterwards by sanitizing
# "raw_features".
assembler_xgb = VectorAssembler(
    inputCols=[
        "QUARTER",
        "MONTH",
        "YEAR",
        "DAY_OF_MONTH",
        "DAY_OF_WEEK",
        "carrier_vec",
        "origin_vec",
        "dest_vec",
        "CRS_ELAPSED_TIME",
        "DISTANCE",
        # "tail_num_vec",
        "CRS_DEP_MINUTES",                 # feature eng start
        "prev_flight_delay_in_minutes",
        "prev_flight_delay",
        "origin_delays_4h",
        "delay_origin_7d",
        "delay_origin_carrier_7d",
        "delay_route_7d",
        "flight_count_24h",
        "LANDING_TIME_DIFF_MINUTES",
        "AVG_ARR_DELAY_ORIGIN",
        "AVG_TAXI_OUT_ORIGIN",              # feature eng end
        'HourlyDryBulbTemperature',         # weather start
        'HourlyDewPointTemperature',
        'HourlyRelativeHumidity',
        'HourlyAltimeterSetting',
        'HourlyVisibility',
        'HourlyStationPressure',
        'HourlyWetBulbTemperature',
        'HourlyPrecipitation',
        'HourlyCloudCoverage',
        'HourlyCloudElevation',
        'HourlyWindSpeed',                   # weather end
        'page_rank',               # phase 3 new features start
        'out_degree',
        'in_degree',
        'weighted_out_degree',
        'weighted_in_degree',
        'N_RUNWAYS',
        'betweenness_unweighted',
        'closeness',
        'betweenness',
        'avg_origin_dep_delay',
        'avg_dest_arr_delay',
        'avg_daily_route_flights',
        'avg_route_delay',
        'avg_hourly_flights',
        "holiday_vec",
        "holiday_window_vec",
        "airport_hub_vec",
        "RATING",
        "airline_category_vec"               # phase 3 new features end
    ],
    outputCol="raw_features" # Explicit output name
)

# Re-build pipeline stages list to use this specific assembler
# We exclude the Scaler (not needed for XGBoost) and include only stages
# whose output feeds the assembler.
xgb_pipeline_stages = [
    carrier_indexer, origin_indexer, dest_indexer,
    holiday_indexer, holiday_window_indexer,
    airport_hub_indexer, airline_category_indexer,

    carrier_encoder, origin_encoder, dest_encoder,
    holiday_encoder, holiday_window_encoder,
    airport_hub_encoder, airline_category_encoder,

    assembler_xgb
]

global_pipeline = Pipeline(stages=xgb_pipeline_stages)

print("Fitting Global Feature Pipeline...")
feat_model = global_pipeline.fit(full_cv_df)

print("Transforming & Cleaning...")
raw_transformed = feat_model.transform(full_cv_df)

# Clean both the features and the label:
#   - features: repaired in place by sanitize_vector (an earlier variant
#     dropped rows failing vector_is_valid instead)
#   - label: rows with a null or NaN DEP_DELAY_NEW are removed
# The result is persisted on disk for the tuning loop.
clean_df = (
    raw_transformed
    .select(
        sanitize_vector(F.col("raw_features")).alias("features"),
        "DEP_DELAY_NEW",
        "fold_id",
        "split_type",
    )
    .filter(F.col("DEP_DELAY_NEW").isNotNull() & ~F.isnan(F.col("DEP_DELAY_NEW")))
    .persist(StorageLevel.DISK_ONLY)
)

# The count() action materializes the persisted DataFrame.
print(f"Materialized {clean_df.count()} CLEAN rows for Tuning.")

# --- 4. Define Parameter Grid ---
# Candidate values per XGBoost hyperparameter; every combination is tried.
grid_search_params = {
    "max_depth": [6],
    "n_estimators": [50, 100],
    "learning_rate": [0.05, 0.1],
}

# Parameter names and their candidate lists, in matching order.
keys = tuple(grid_search_params)
values = tuple(grid_search_params.values())

# Cartesian product of the candidate lists, one dict per combination.
param_combinations = [
    dict(zip(keys, combo)) for combo in itertools.product(*values)
]
print(f"Total Parameter Combinations: {len(param_combinations)}")

# --- 5. Run Tuning Loop (Fold-First) ---
# Folds form the outer loop so each fold's training data is written and read
# back once and then reused for every parameter combination.
results_list = []
# One bucket per parameter combination; each list gains one value per fold.
fold_scores = {
    combo_idx: {metric: [] for metric in ("train_mae", "train_rmse", "val_mae", "val_rmse")}
    for combo_idx, _ in enumerate(param_combinations)
}

print(f"Starting Robust XGBoost Tuning...")

with mlflow.start_run(run_name="POST_PRESENTATION_XGB_STACKED_5_YR_10CV") as parent_run:
    mlflow.log_param("n_combinations", len(param_combinations))

    for fold_id in range(1, n_folds + 1):
        print(f"\n=== Processing Fold {fold_id}/{n_folds} ===")

        # A. Write this fold's training rows to parquet, then read them back.
        fold_train_path = f"{temp_materialize_path}/fold_{fold_id}"
        print(f"  Materializing training data to {fold_train_path}...")

        in_this_fold = F.col("fold_id") == fold_id
        fold_train_rows = clean_df.filter(in_this_fold & (F.col("split_type") == "train"))
        fold_train_rows.repartition(TRAIN_PARTITIONS).write.mode("overwrite").parquet(fold_train_path)

        train_vec = spark.read.parquet(fold_train_path).repartition(TRAIN_PARTITIONS)
        val_vec = clean_df.filter(in_this_fold & (F.col("split_type") == "validation"))

        # B. Try every parameter combination on this fold.
        for idx, params in enumerate(param_combinations):
            param_str = f"depth{params['max_depth']}_est{params['n_estimators']}_lr{params['learning_rate']}"

            xgb = SparkXGBRegressor(
                features_col="features",
                label_col="DEP_DELAY_NEW",
                num_workers=6,
                tree_method="hist",
                max_depth=params['max_depth'],
                n_estimators=params['n_estimators'],
                learning_rate=params['learning_rate'],
                missing=0.0,  # treat 0.0 as the missing-value marker
            )
            model = xgb.fit(train_vec)

            # Training-set scores (to spot overfitting) ...
            train_preds = model.transform(train_vec)
            t_mae = mae_evaluator.evaluate(train_preds)
            t_rmse = rmse_evaluator.evaluate(train_preds)

            # ... and validation-set scores (generalization).
            val_preds = model.transform(val_vec)
            v_mae = mae_evaluator.evaluate(val_preds)
            v_rmse = rmse_evaluator.evaluate(val_preds)

            # Record this fold's scores for the combination.
            combo_scores = fold_scores[idx]
            for metric_name, metric_value in (
                ("train_mae", t_mae),
                ("train_rmse", t_rmse),
                ("val_mae", v_mae),
                ("val_rmse", v_rmse),
            ):
                combo_scores[metric_name].append(metric_value)

            print(f"  Combo {idx+1}: Train MAE={t_mae:.2f} / Val MAE={v_mae:.2f}")
            print(f"  Combo {idx+1}: Train RMSE={t_rmse:.2f} / Val RMSE={v_rmse:.2f}")

    # --- 6. Aggregate Results ---
    print("\n=== Aggregating Results ===")
    for idx, params in enumerate(param_combinations):
        # Mean of each metric across folds
        avg_scores = {name: np.mean(per_fold) for name, per_fold in fold_scores[idx].items()}

        # One nested run per combination holds its parameters and averages.
        with mlflow.start_run(run_name=f"combo_{idx}_summary", nested=True):
            mlflow.log_params(params)
            mlflow.log_metrics(avg_scores)

        # Summary row: the parameters first, then the averaged metrics.
        summary_row = dict(params)
        summary_row.update(
            {f"avg_{name}": avg_scores[name] for name in ("train_mae", "val_mae", "train_rmse", "val_rmse")}
        )
        results_list.append(summary_row)

        print(f"Combo {idx+1} Final: Val MAE={avg_scores['val_mae']:.4f}, Train MAE={avg_scores['train_mae']:.4f}")
        print(f"Combo {idx+1} Final: Val RMSE={avg_scores['val_rmse']:.4f}, Train RMSE={avg_scores['train_rmse']:.4f}")

# --- 8. Identify Best Parameters ---
# The winner is the combination with the lowest averaged validation MAE.
results_df = pd.DataFrame(results_list)
best_idx = results_df['avg_val_mae'].idxmin()
best_row = results_df.loc[best_idx]

banner = "=" * 50
print("\n" + banner)
print(f"WINNER FOUND: {best_row.to_dict()}")
print(banner)

# Cast back to plain Python scalars before handing them to XGBoost.
best_depth, best_estimators = int(best_row['max_depth']), int(best_row['n_estimators'])
best_lr = float(best_row['learning_rate'])

# # --- 9. Retrain & Log BEST Model ---
# print("Retraining Final Model with Best Parameters...")

# # Define Final Model
# final_xgb = SparkXGBRegressor(
#     features_col="features",
#     label_col="DEP_DELAY_NEW",
#     num_workers=6,
#     tree_method="hist",
#     max_depth=best_depth,
#     n_estimators=best_estimators,
#     learning_rate=best_lr,
#     missing=0.0
# )

# # Pipeline includes the Assembler + Model (Indexers were pre-applied, but safer to include if needed for inference)
# # Since we used 'clean_df' which already has vectors, we fit on that directly.
# # BUT for a reusable model, we usually want the whole pipeline.
# # For simplicity here, we log the model trained on the processed vectors.
# final_model = final_xgb.fit(clean_df.repartition(TRAIN_PARTITIONS))

# with mlflow.start_run(run_name="POST_PRESENTATION_FINAL_BEST_XGB_MODEL") as run:
#     mlflow.log_params(best_row.to_dict())
#     mlflow.spark.log_model(final_model, "model")
#     print(f"Final model saved to MLflow Run: {run.info.run_id}")

# # Cleanup
# dbutils.fs.rm(temp_materialize_path, recurse=True)
# clean_df.unpersist()
# full_cv_df.unpersist()

In [0]:
# --- 9. Retrain & Log BEST Model ---
# Fits one XGBoost regressor with the winning hyper-parameters on all of
# `clean_df`, logs it to a fresh MLflow run, then removes the temporary fold
# files and releases the cached DataFrames.
print("Retraining Final Model with Best Parameters...")

# Define Final Model
final_xgb = SparkXGBRegressor(
    features_col="features",
    label_col="DEP_DELAY_NEW",
    num_workers=6,
    tree_method="hist",
    max_depth=best_depth,
    n_estimators=best_estimators,
    learning_rate=best_lr,
    missing=0.0
)

# `clean_df` already carries the assembled "features" vectors, so only the
# regressor is fitted and logged here -- no indexer/assembler stages are part of
# the logged model, which therefore expects pre-vectorized input at inference.
#
# Keep a handle on the repartitioned + cached copy: it is a separate cache entry
# from `clean_df`, so `clean_df.unpersist()` below would not release it.
final_train_df = clean_df.repartition(TRAIN_PARTITIONS).cache()
try:
    final_model = final_xgb.fit(final_train_df)
finally:
    final_train_df.unpersist()

with mlflow.start_run(run_name="POST_PRESENTATION_FINAL_BEST_XGB_MODEL") as run:
    # NOTE(review): if `best_row` is still the row selected from the tuning
    # results, this also records its averaged avg_* scores as *params*.
    mlflow.log_params(best_row.to_dict())
    mlflow.spark.log_model(final_model, "model")
    print(f"Final model saved to MLflow Run: {run.info.run_id}")

# Cleanup
dbutils.fs.rm(temp_materialize_path, recurse=True)
clean_df.unpersist()
full_cv_df.unpersist()



# # --- 9. Retrain & Log BEST Model (STABILITY FIX) ---
# print("Preparing data for Final Model...")

# # FIX 1: Materialize to disk first. 
# # This separates the "Shuffle" (heavy) from the "Training" (fragile).
# temp_final_train_path = f"dbfs:/student-groups/Group_2_2/{month_or_year}/temp_xgb_final_train"
# clean_df.repartition(TRAIN_PARTITIONS).write.mode("overwrite").parquet(temp_final_train_path)
# final_train_data = spark.read.parquet(temp_final_train_path)

# print("Retraining Final Model...")

# # FIX 2: Disable Autologging to prevent worker crashes
# # XGBoost's barrier execution hates external hooks.
# mlflow.spark.autolog(disable=True)

# final_xgb = SparkXGBRegressor(
#     features_col="features",
#     label_col="DEP_DELAY_NEW",
#     num_workers=6,
#     tree_method="hist",
#     max_depth=best_depth,
#     n_estimators=best_estimators,
#     learning_rate=best_lr,
#     missing=0.0
# )

# # Fit on the stable, materialized data
# final_model = final_xgb.fit(final_train_data)

# # Re-enable autologging (optional, for future cells)
# mlflow.spark.autolog(disable=False)

# with mlflow.start_run(run_name="POST_PRESENTATION_FINAL_BEST_XGB_MODEL") as run:
#     mlflow.log_params(best_row.to_dict())
    
#     # Log the model artifact manually
#     mlflow.spark.log_model(final_model, "model")
#     print(f"Final model saved to MLflow Run: {run.info.run_id}")

# # Cleanup
# dbutils.fs.rm(temp_materialize_path, recurse=True)
# dbutils.fs.rm(temp_final_train_path, recurse=True)
# clean_df.unpersist()
# full_cv_df.unpersist()

In [0]:
from pyspark.ml.tuning import ParamGridBuilder
from xgboost.spark import SparkXGBRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import BooleanType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from functools import reduce
import mlflow
import numpy as np
import pandas as pd
import shutil

# --- 1. Config ---
# Partition count used when repartitioning data right before XGBoost training.
TRAIN_PARTITIONS = 6
# Number of cross-validation folds (fold ids run from 1 to n_folds).
n_folds = 10
month_or_year = "5_year_custom_joined"

# Input (CV splits) and output (stacked dataset) share one per-dataset folder.
_dataset_root = f"dbfs:/student-groups/Group_2_2/{month_or_year}"
cv_path = f"{_dataset_root}/cv_splits"
output_path = f"{_dataset_root}/stacked_input_optimized"

# --- 2. Safety UDF ---
@F.udf(returnType=VectorUDT())
def sanitize_vector(v):
    """Return a dense copy of *v* with non-finite entries zeroed and values capped.

    NaN, +inf and -inf are replaced by 0.0, and every remaining value is
    limited to the range [-1e30, 1e30]. The result is always a dense vector
    because the input is expanded with ``toArray()``.

    A null input yields an empty dense vector.
    NOTE(review): that empty vector has a different length from every other
    row -- confirm downstream consumers never actually receive a null here.
    """
    if v is None:
        return Vectors.dense([])

    finite_values = np.nan_to_num(v.toArray(), nan=0.0, posinf=0.0, neginf=0.0)
    capped_values = np.clip(finite_values, -1e30, 1e30)
    return Vectors.dense(capped_values)

# --- 4. Global Data Loading ---
print(f"Loading 20% Sample from {month_or_year}...")

# 20% Sample (0.20) is the proven stable size.
# Steps: read the CV splits, draw a seeded 20% sample, replace nulls with 0
# via na.fill(0), spread over 480 partitions, and mark the result for caching.
cv_splits_raw = spark.read.parquet(cv_path)
cv_splits_sample = cv_splits_raw.sample(withReplacement=False, fraction=0.20, seed=RANDOM_SEED)
full_cv_df = cv_splits_sample.na.fill(0).repartition(480).cache()

# --- 5. Feature Pipeline ---
# Declares (but does not fit) the feature-engineering stages for the XGBoost
# stacking run: string indexers -> one-hot encoders -> a single vector assembler.

# Categorical columns -> numeric index columns (*_idx). Every indexer uses
# handleInvalid="keep" (see the Spark StringIndexer docs for how invalid values
# are bucketed) instead of the default error behaviour.
carrier_indexer = StringIndexer(inputCol="OP_CARRIER", outputCol="carrier_idx", handleInvalid="keep")
origin_indexer = StringIndexer(inputCol="ORIGIN_AIRPORT_SEQ_ID", outputCol="origin_idx", handleInvalid="keep")
dest_indexer = StringIndexer(inputCol="DEST_AIRPORT_SEQ_ID", outputCol="dest_idx", handleInvalid="keep")
tail_num_indexer = StringIndexer(inputCol="TAIL_NUM", outputCol="tail_num_idx", handleInvalid="keep")
holiday_indexer = StringIndexer(inputCol="IS_HOLIDAY", outputCol="holiday_idx", handleInvalid="keep")
holiday_window_indexer = StringIndexer(inputCol="IS_HOLIDAY_WINDOW", outputCol="holiday_window_idx", handleInvalid="keep")
airport_hub_indexer = StringIndexer(inputCol="AIRPORT_HUB_CLASS", outputCol="airport_hub_idx", handleInvalid="keep")
airline_category_indexer = StringIndexer(inputCol="AIRLINE_CATEGORY", outputCol="airline_category_idx", handleInvalid="keep")

# Index columns -> one-hot vector columns (*_vec).
carrier_encoder = OneHotEncoder(inputCol="carrier_idx", outputCol="carrier_vec")
origin_encoder = OneHotEncoder(inputCol="origin_idx", outputCol="origin_vec")
dest_encoder = OneHotEncoder(inputCol="dest_idx", outputCol="dest_vec")
tail_num_encoder = OneHotEncoder(inputCol="tail_num_idx", outputCol="tail_num_vec")
holiday_encoder = OneHotEncoder(inputCol="holiday_idx", outputCol="holiday_vec")
holiday_window_encoder = OneHotEncoder(inputCol="holiday_window_idx", outputCol="holiday_window_vec")
airport_hub_encoder = OneHotEncoder(inputCol="airport_hub_idx", outputCol="airport_hub_vec")
airline_category_encoder = OneHotEncoder(inputCol="airline_category_idx", outputCol="airline_category_vec")

# Concatenates the listed columns, in this order, into "raw_features".
# "tail_num_vec" is deliberately left out (commented below) even though the
# tail-number indexer/encoder still run as pipeline stages; the *_idx columns
# they produce are read again by the encoders of the later MLP cell.
assembler_xgb = VectorAssembler(
    inputCols=[
        "QUARTER",
        "MONTH", 
        "YEAR",
        "DAY_OF_MONTH",
        "DAY_OF_WEEK",
        "carrier_vec",
        "origin_vec",
        "dest_vec",
        "CRS_ELAPSED_TIME",
        "DISTANCE",
        # "tail_num_vec",
        "CRS_DEP_MINUTES",                 # feature eng start
        "prev_flight_delay_in_minutes",
        "prev_flight_delay",
        "origin_delays_4h",
        "delay_origin_7d",
        "delay_origin_carrier_7d",
        "delay_route_7d",
        "flight_count_24h",
        "LANDING_TIME_DIFF_MINUTES",
        "AVG_ARR_DELAY_ORIGIN",
        "AVG_TAXI_OUT_ORIGIN",              # feature eng end
        'HourlyDryBulbTemperature',         # weather start
        'HourlyDewPointTemperature',
        'HourlyRelativeHumidity',
        'HourlyAltimeterSetting',
        'HourlyVisibility',
        'HourlyStationPressure',
        'HourlyWetBulbTemperature',
        'HourlyPrecipitation',
        'HourlyCloudCoverage',
        'HourlyCloudElevation',
        'HourlyWindSpeed',                   # weather end
        'page_rank',               # phase 3 new features start
        'out_degree',
        'in_degree',
        'weighted_out_degree',
        'weighted_in_degree',
        'N_RUNWAYS',
        'betweenness_unweighted',
        'closeness',
        'betweenness',
        'avg_origin_dep_delay',
        'avg_dest_arr_delay',
        'avg_daily_route_flights',
        'avg_route_delay',
        'avg_hourly_flights',
        "holiday_vec",
        "holiday_window_vec",
        "airport_hub_vec",
        "RATING",
        "airline_category_vec"               # phase 3 new features end
    ],
    outputCol="raw_features"
)

# Stage order matters: all indexers first, then all encoders (which read the
# *_idx columns), then the assembler (which reads the *_vec columns).
pipeline = Pipeline(stages=[
    carrier_indexer, origin_indexer, dest_indexer, 
    tail_num_indexer, holiday_indexer, holiday_window_indexer, 
    airport_hub_indexer, airline_category_indexer,
    
    carrier_encoder, origin_encoder, dest_encoder, 
    tail_num_encoder, holiday_encoder, holiday_window_encoder, 
    airport_hub_encoder, airline_category_encoder,
    
    assembler_xgb
])

print("Fitting Global Feature Pipeline...")
feat_model = pipeline.fit(full_cv_df)

print("Transforming & Cleaning...")
# Build the training frame step by step:
#   1. run the fitted feature pipeline (adds "raw_features"),
#   2. derive "features" by passing "raw_features" through sanitize_vector,
#   3. drop rows whose label DEP_DELAY_NEW is null or NaN,
#   4. persist to disk only.
# (An earlier variant filtered rows with a vector_is_valid check instead of
# sanitizing the vectors.)
label_col = F.col("DEP_DELAY_NEW")
assembled_df = feat_model.transform(full_cv_df)
sanitized_df = assembled_df.withColumn("features", sanitize_vector(F.col("raw_features")))
labelled_df = sanitized_df.filter(label_col.isNotNull()).filter(~F.isnan(label_col))
clean_df = labelled_df.persist(StorageLevel.DISK_ONLY)

# count() is the action that triggers the persist.
clean_row_count = clean_df.count()
print(f"Materialized {clean_row_count} CLEAN rows for Stacking.")

# --- 6. Stacking Generation Loop ---
# Builds the stacked dataset written to `output_path`: for each fold an XGBoost
# regressor is fitted on a 20% sample of the rows of all *other* folds, and its
# predictions for the fold's own rows are kept as "xgb_predicted_delay".
# A final model fitted on all of `clean_df` is logged to the same MLflow run.
#
# NOTE(review): rows are split by fold_id only; split_type is ignored here,
# whereas the tuning cell trains on split_type == "train" within a fold. If the
# folds stored under `cv_path` overlap (same flights in several folds), these
# predictions are not strictly out-of-fold -- confirm against how the splits
# were built.
out_of_fold_predictions = []
# Per-fold history of both training and validation scores.
metrics_history = {
    'train_mae': [], 'train_rmse': [],
    'val_mae': [], 'val_rmse': []
}

print(f"\nStarting Stacked Feature Generation (Depth={best_depth}, Est={best_estimators})...")

# The model saved by the previous cell was fitted on the whole dataset, so the
# same configuration is refitted per fold here in order to predict each fold
# with a model that was not fitted on that fold's rows.
with mlflow.start_run(run_name="POST_PRESENTATION_FINAL_BEST_XGB_STACKED_OUTPUT_5_YR_10CV") as run:
    # Log Parameters
    mlflow.log_param("xgb_max_depth", best_depth)
    mlflow.log_param("xgb_n_estimators", best_estimators)
    mlflow.log_param("xgb_learning_rate", best_lr)
    mlflow.log_param("sample_fraction", 0.20)

    # Define Base Model (one estimator, refitted for every fold)
    xgb = SparkXGBRegressor(
        features_col="features",
        label_col="DEP_DELAY_NEW",
        num_workers=6,
        tree_method="hist",
        max_depth=best_depth,
        n_estimators=best_estimators,
        learning_rate=best_lr,
        missing=0.0
    )

    for fold_id in range(1, n_folds + 1):
        print(f"\nProcessing Fold {fold_id}/{n_folds}...")

        # 1. Prepare Training Data
        # The sample is seeded (like every other sample in this notebook) so
        # that the training subset, and therefore the stacked predictions, are
        # reproducible across runs.
        train_vec = clean_df.filter((F.col("fold_id") != fold_id)) \
            .sample(withReplacement=False, fraction=0.20, seed=RANDOM_SEED) \
            .repartition(TRAIN_PARTITIONS)

        # 2. Prepare Validation Data (all rows of the current fold)
        val_vec = clean_df.filter((F.col("fold_id") == fold_id))

        # 3. Fit Model
        model = xgb.fit(train_vec)

        # 4. TRAINING METRICS
        train_preds = model.transform(train_vec)
        t_mae = mae_evaluator.evaluate(train_preds)
        t_rmse = rmse_evaluator.evaluate(train_preds)

        metrics_history['train_mae'].append(t_mae)
        metrics_history['train_rmse'].append(t_rmse)

        # 5. VALIDATION METRICS
        val_preds = model.transform(val_vec)
        v_mae = mae_evaluator.evaluate(val_preds)
        v_rmse = rmse_evaluator.evaluate(val_preds)

        metrics_history['val_mae'].append(v_mae)
        metrics_history['val_rmse'].append(v_rmse)

        print(f"  Fold {fold_id}: Train MAE={t_mae:.2f} | Val MAE={v_mae:.2f}")
        print(f"  Fold {fold_id}: Train RMSE={t_rmse:.2f} | Val RMSE={v_rmse:.2f}")

        # Per-fold metric logging is intentionally disabled:
        # mlflow.log_metric(f"fold_{fold_id}_train_mae", t_mae)
        # mlflow.log_metric(f"fold_{fold_id}_val_mae", v_mae)
        # mlflow.log_metric(f"fold_{fold_id}_train_rmse", t_rmse)
        # mlflow.log_metric(f"fold_{fold_id}_val_rmse", v_rmse)

        # 6. Format Output (drop the feature vector to save space). The frame
        #    stays lazy; it is only computed when the union is written below.
        val_preds_clean = val_preds \
            .withColumnRenamed("prediction", "xgb_predicted_delay") \
            .drop("features")

        out_of_fold_predictions.append(val_preds_clean)

    # Log Average Metrics (mean over folds)
    mlflow.log_metric("avg_train_mae", np.mean(metrics_history['train_mae']))
    mlflow.log_metric("avg_train_rmse", np.mean(metrics_history['train_rmse']))
    mlflow.log_metric("avg_val_mae", np.mean(metrics_history['val_mae']))
    mlflow.log_metric("avg_val_rmse", np.mean(metrics_history['val_rmse']))

    # --- 7. Train & Log FINAL Model ---
    print("\nTraining Final XGBoost Model on Full Data...")

    final_train = clean_df.repartition(TRAIN_PARTITIONS)
    final_model = xgb.fit(final_train)

    mlflow.spark.log_model(final_model, "stacked_xgb_model")
    print(f"Final Model Saved to MLflow Run: {run.info.run_id}")

    # --- 8. Save Stacked Dataset ---
    print("\nUnioning and Saving Stacked Dataset...")

    # Positional union of the per-fold prediction frames (identical schemas).
    stacked_dataset = reduce(lambda df1, df2: df1.union(df2), out_of_fold_predictions)

    # Vector columns are dropped; the *_idx columns are kept in the output.
    cols_to_drop = [
        "features", "raw_features",
        "carrier_vec", "origin_vec", "dest_vec", "tail_num_vec",
        "holiday_vec", "holiday_window_vec", "airport_hub_vec", "airline_category_vec"
    ]
    final_output = stacked_dataset.drop(*cols_to_drop)

    final_output.write.mode("overwrite").parquet(output_path)
    print(f"SUCCESS: Stacked dataset saved to {output_path}")

# Cleanup
clean_df.unpersist()
full_cv_df.unpersist()

In [0]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import BooleanType
import mlflow
import numpy as np
import pandas as pd

# --- 1. Optimization Config ---
# The same partition count is used for the global repartition and for the
# repartition inside the training loop (keeps tasks small and fast for MLP).
OPTIMAL_PARTITIONS = TRAIN_PARTITIONS = 480
# Number of cross-validation folds (fold ids run from 1 to n_folds).
n_folds = 10
month_or_year = "5_year_custom_joined"
# Location of the stacked dataset that carries the XGBoost predictions.
stacked_input_path = f"dbfs:/student-groups/Group_2_2/{month_or_year}/stacked_input_optimized"

# --- 2. Safety UDF ---
@F.udf(returnType=BooleanType())
def vector_is_valid(v):
    """Return True when *v* is non-null and all of its stored values are usable.

    A vector is rejected when it is null, contains +/-inf or NaN, or holds a
    value whose magnitude exceeds 1e30. Only ``v.values`` is inspected, i.e.
    the stored entries of the vector.

    A vector with no stored entries has nothing to reject and is accepted;
    without the size check ``np.max`` would raise ValueError on the empty
    array and fail the whole Spark job.
    """
    if v is None:
        return False

    stored_values = v.values
    if np.any(np.isinf(stored_values)) or np.any(np.isnan(stored_values)):
        return False
    if stored_values.size and np.max(np.abs(stored_values)) > 1e30:
        return False
    return True

# --- 3. Global Data Loading & Preprocessing ---
print(f"Loading and optimizing data from {month_or_year}...")

# Read the stacked dataset, keep a seeded 20% sample, spread it over
# OPTIMAL_PARTITIONS partitions and mark it for caching.
stacked_df = spark.read.parquet(stacked_input_path)
stacked_sample = stacked_df.sample(withReplacement=False, fraction=0.20, seed=RANDOM_SEED)
full_cv_df = stacked_sample.repartition(OPTIMAL_PARTITIONS).cache()

# Define Pipeline Stages
# Declares (but does not fit) the feature stages for the stacked MLP:
# one-hot encoders -> vector assembler -> standard scaler.
# There are no StringIndexer stages in this pipeline, so the *_idx input
# columns must already be present in the loaded stacked dataset.
carrier_encoder = OneHotEncoder(inputCol="carrier_idx", outputCol="carrier_vec")
origin_encoder = OneHotEncoder(inputCol="origin_idx", outputCol="origin_vec")
dest_encoder = OneHotEncoder(inputCol="dest_idx", outputCol="dest_vec")
tail_num_encoder = OneHotEncoder(inputCol="tail_num_idx", outputCol="tail_num_vec")
holiday_encoder = OneHotEncoder(inputCol="holiday_idx", outputCol="holiday_vec")
holiday_window_encoder = OneHotEncoder(inputCol="holiday_window_idx", outputCol="holiday_window_vec")
airport_hub_encoder = OneHotEncoder(inputCol="airport_hub_idx", outputCol="airport_hub_vec")
airline_category_encoder = OneHotEncoder(inputCol="airline_category_idx", outputCol="airline_category_vec")

# Concatenates the listed columns, in this order, into "raw_features".
# "tail_num_vec" is left out (commented below) although its encoder still runs.
# The last input, "xgb_predicted_delay", is the stacked XGBoost prediction.
assembler = VectorAssembler(
    inputCols=[
        "QUARTER",
        "MONTH", 
        "YEAR",
        "DAY_OF_MONTH",
        "DAY_OF_WEEK",
        "carrier_vec",
        "origin_vec",
        "dest_vec",
        "CRS_ELAPSED_TIME",
        "DISTANCE",
        # "tail_num_vec",
        "CRS_DEP_MINUTES",                 # feature eng start
        "prev_flight_delay_in_minutes",
        "prev_flight_delay",
        "origin_delays_4h",
        "delay_origin_7d",
        "delay_origin_carrier_7d",
        "delay_route_7d",
        "flight_count_24h",
        "LANDING_TIME_DIFF_MINUTES",
        "AVG_ARR_DELAY_ORIGIN",
        "AVG_TAXI_OUT_ORIGIN",              # feature eng end
        'HourlyDryBulbTemperature',         # weather start
        'HourlyDewPointTemperature',
        'HourlyRelativeHumidity',
        'HourlyAltimeterSetting',
        'HourlyVisibility',
        'HourlyStationPressure',
        'HourlyWetBulbTemperature',
        'HourlyPrecipitation',
        'HourlyCloudCoverage',
        'HourlyCloudElevation',
        'HourlyWindSpeed',                   # weather end
        'page_rank',               # phase 3 new features start
        'out_degree',
        'in_degree',
        'weighted_out_degree',
        'weighted_in_degree',
        'N_RUNWAYS',
        'betweenness_unweighted',
        'closeness',
        'betweenness',
        'avg_origin_dep_delay',
        'avg_dest_arr_delay',
        'avg_daily_route_flights',
        'avg_route_delay',
        'avg_hourly_flights',
        "holiday_vec",
        "holiday_window_vec",
        "airport_hub_vec",
        "RATING",
        "airline_category_vec",               # phase 3 new features end
        "xgb_predicted_delay" 
    ],
    outputCol="raw_features"
)

# Scales "raw_features" into "scaled_features": standard-deviation scaling is
# on (withStd=True), mean-centering is off (withMean=False).
scaler = StandardScaler(inputCol="raw_features", outputCol="scaled_features", withStd=True, withMean=False)

# Build & Fit Pipeline ONCE
# Stage order matters: encoders, then the assembler that reads their *_vec
# outputs, then the scaler that reads "raw_features".
global_pipeline = Pipeline(stages=[
    carrier_encoder, origin_encoder, dest_encoder, 
    tail_num_encoder, holiday_encoder, holiday_window_encoder, 
    airport_hub_encoder, airline_category_encoder,
    
    assembler, 
    scaler
])

print("Fitting Global Feature Pipeline (One-Time Fit)...")
feat_model = global_pipeline.fit(full_cv_df)

print("Transforming & Caching Vectors...")
# Keep only what the architecture search needs, discard rows whose scaled
# feature vector fails vector_is_valid, and persist the result to disk only.
kept_columns = ["scaled_features", "DEP_DEL15", "fold_id", "split_type"]
transformed_df = feat_model.transform(full_cv_df).select(*kept_columns)
valid_rows_df = transformed_df.filter(vector_is_valid(F.col("scaled_features")))
featurized_df = valid_rows_df.persist(StorageLevel.DISK_ONLY)

# count() is the action that triggers the persist.
featurized_row_count = featurized_df.count()
print(f"Materialized {featurized_row_count} rows for Architecture Search.")

# --- 4. Define Architectures ---
# Hidden-layer sizes only; the input and output layer sizes are added when the
# classifier is built.
hidden_layer_grid = [
    # [32],
    [64, 32],
    [128, 64],
    [64, 32, 16],
]


# --- 5. Evaluators ---
def _delay_evaluator(metric_name, **metric_options):
    """Build a MulticlassClassificationEvaluator that scores against DEP_DEL15."""
    return MulticlassClassificationEvaluator(
        labelCol="DEP_DEL15", metricName=metric_name, **metric_options
    )


f1_evaluator = _delay_evaluator("f1")
# F-measure with beta=2.0, weighted across labels.
f2_evaluator = _delay_evaluator("weightedFMeasure", beta=2.0)
# F-measure with beta=2.0 for label 1.0 only.
f2_label_evaluator = _delay_evaluator("fMeasureByLabel", metricLabel=1.0, beta=2.0)
precision_evaluator = _delay_evaluator("weightedPrecision")
recall_evaluator = _delay_evaluator("weightedRecall")

# --- 6. Tuning Loop ---
# Cross-validated search over `hidden_layer_grid`. For each architecture and
# each fold, an MLP is fitted on a class-balanced version of all other folds
# (negatives downsampled to roughly the number of positives) and scored on both
# that balanced training set and the held-out fold. Fold scores are averaged
# per architecture, logged to a nested MLflow run, and collected in
# `results_list`.
#
# NOTE(review): folds are separated by fold_id only; split_type is not used.
# If the stored folds overlap, validation rows may also appear in training --
# confirm against how the CV splits were built.
results_list = []

print("Starting MLP Architecture Search (Target: Delay-Class F2)...")

# (pos_count, neg_count) of each fold's training rows. These depend only on the
# fold, not on the architecture, so they are counted once and reused instead of
# launching two Spark count jobs per (architecture, fold) pair.
fold_class_counts = {}

with mlflow.start_run(run_name="POST_PRESENTATION_MLP_5YR_STACKED_BALANCED_10CV") as parent_run:

    # The MLP input layer size is the length of the scaled feature vector.
    input_dim = len(featurized_df.first()["scaled_features"])
    print(f"Detected Input Dimension: {input_dim}")
    mlflow.log_param("input_dim", input_dim)

    for idx, hidden_config in enumerate(hidden_layer_grid):

        config_str = "-".join(map(str, hidden_config))
        print(f"\n--- Testing Arch {idx+1}/{len(hidden_layer_grid)}: Hidden=[{config_str}] ---")

        # Track both Train and Val metrics, one entry per fold.
        fold_metrics = {
            'train_f1': [], 'train_f2': [], 'train_f2_label': [], 'train_precision': [], 'train_recall': [],
            'val_f1': [], 'val_f2': [], 'val_f2_label': [], 'val_precision': [], 'val_recall': []
        }

        with mlflow.start_run(run_name=f"arch_{config_str}", nested=True) as child_run:
            mlflow.log_param("hidden_layers", str(hidden_config))

            for fold_id in range(1, n_folds + 1):
                # 1. Split Data
                train_raw = featurized_df.filter((F.col("fold_id") != fold_id))
                val_vec = featurized_df.filter((F.col("fold_id") == fold_id))

                # 2. BALANCE TRAINING DATA
                train_pos = train_raw.filter(F.col("DEP_DEL15") == 1.0)
                train_neg = train_raw.filter(F.col("DEP_DEL15") == 0.0)

                if fold_id not in fold_class_counts:
                    fold_class_counts[fold_id] = (train_pos.count(), train_neg.count())
                pos_count, neg_count = fold_class_counts[fold_id]

                # Sampling without replacement only accepts fractions in [0, 1]:
                # clamp when positives outnumber negatives, and avoid dividing
                # by zero when there are no negatives at all.
                fraction = min(1.0, pos_count / neg_count) if neg_count else 1.0

                train_neg_sampled = train_neg.sample(withReplacement=False, fraction=fraction, seed=42)
                train_balanced = train_pos.union(train_neg_sampled).repartition(TRAIN_PARTITIONS)

                print(f"    Fold {fold_id}: Training on Balanced Data ({pos_count} Pos)")

                # 3. Define & Fit (layers = input, hidden..., 2 output classes)
                full_layers = [input_dim] + hidden_config + [2]

                mlp = MultilayerPerceptronClassifier(
                    featuresCol="scaled_features",
                    labelCol="DEP_DEL15",
                    layers=full_layers,
                    blockSize=128,
                    maxIter=100,
                    stepSize=0.03
                )

                mlp_model = mlp.fit(train_balanced)

                # 4./5. Predict on TRAIN (overfitting check) and VAL (performance).
                # Each frame is evaluated five times below, so both are cached
                # (as the final-model section does) instead of recomputing the
                # whole transform for every metric.
                train_preds = mlp_model.transform(train_balanced).select("prediction", "DEP_DEL15").cache()
                val_preds = mlp_model.transform(val_vec).select("prediction", "DEP_DEL15").cache()

                try:
                    metrics = {
                        "val_f1": f1_evaluator.evaluate(val_preds),
                        "val_f2": f2_evaluator.evaluate(val_preds),
                        "val_f2_label": f2_label_evaluator.evaluate(val_preds),
                        "val_precision": precision_evaluator.evaluate(val_preds),
                        "val_recall": recall_evaluator.evaluate(val_preds),
                        "train_f1": f1_evaluator.evaluate(train_preds),
                        "train_f2": f2_evaluator.evaluate(train_preds),
                        "train_f2_label": f2_label_evaluator.evaluate(train_preds),
                        "train_precision": precision_evaluator.evaluate(train_preds),
                        "train_recall": recall_evaluator.evaluate(train_preds)
                    }
                finally:
                    # Release the per-fold caches even if an evaluation fails.
                    train_preds.unpersist()
                    val_preds.unpersist()

                # Per-fold metric logging is intentionally disabled:
                # mlflow.log_metrics(metrics)

                print(f"    Result: Train F2-Delay={metrics['train_f2_label']:.4f} | Val F2-Delay={metrics['val_f2_label']:.4f}")

                for k in fold_metrics:
                    fold_metrics[k].append(metrics[k])

            # Aggregate Results (mean over folds, keys prefixed with "avg_")
            avg_metrics = {f"avg_{k}": np.mean(v) for k, v in fold_metrics.items()}
            mlflow.log_metrics(avg_metrics)

            results_list.append({
                "hidden_config": str(hidden_config),
                "avg_train_f2_label": avg_metrics['avg_train_f2_label'],
                "avg_val_f2_label": avg_metrics['avg_val_f2_label']
            })

            print(f"  Arch [{config_str}] Final Train Delay F2 = {avg_metrics['avg_train_f2_label']:.4f}")
            print(f"  Arch [{config_str}] Final Val Delay F2 = {avg_metrics['avg_val_f2_label']:.4f}")

# --- 7. Select Winner & Log Final Model ---
# The winning architecture is the one with the highest averaged validation F2
# for the delay class.
import ast

results_df = pd.DataFrame(results_list)
best_row = results_df.loc[results_df['avg_val_f2_label'].idxmax()]
best_config_str = best_row['hidden_config'] # e.g. "[64, 32]"
# The configuration was stored as the str() of a list of ints; parse it back
# with literal_eval, which only accepts Python literals, rather than eval(),
# which would execute arbitrary code.
best_config_list = ast.literal_eval(best_config_str)

print("\n" + "="*50)
print(f"WINNER FOUND: {best_config_str}")
print("="*50)

# Refit the winning architecture on a class-balanced version of the whole
# featurized dataset, compute its training metrics, and log parameters,
# metrics and the model to a new MLflow run.
print("Retraining Final Stacked Model...")

# 1. Balance Full Dataset (Downsample global majority)
full_pos = featurized_df.filter(F.col("DEP_DEL15") == 1.0)
full_neg = featurized_df.filter(F.col("DEP_DEL15") == 0.0)
full_pos_count = full_pos.count()
full_neg_count = full_neg.count()
# Sampling without replacement only accepts fractions in [0, 1]: clamp when
# positives outnumber negatives, and avoid dividing by zero when there are no
# negatives at all.
fraction = min(1.0, full_pos_count / full_neg_count) if full_neg_count else 1.0

# Re-combine to get the Training Set for the Final Model
full_balanced = full_pos.union(full_neg.sample(False, fraction, seed=42)).repartition(OPTIMAL_PARTITIONS)

# 2. Train Final Model (layers = input, winning hidden layers, 2 output classes)
final_layers = [input_dim] + best_config_list + [2]
final_mlp = MultilayerPerceptronClassifier(
    featuresCol="scaled_features",
    labelCol="DEP_DEL15",
    layers=final_layers,
    blockSize=128,
    maxIter=100,
    stepSize=0.03
)
final_model = final_mlp.fit(full_balanced)

# --- Calculate Final Training Metrics ---
# These are scores on the data the model was fitted on, not held-out scores.
print("Calculating Final Training Metrics...")
final_train_preds = final_model.transform(full_balanced).select("prediction", "DEP_DEL15").cache()
final_train_preds.count() # Materialize the cache before the five evaluations

final_metrics = {
    "final_train_f1": f1_evaluator.evaluate(final_train_preds),
    "final_train_f2": f2_evaluator.evaluate(final_train_preds),
    "final_train_f2_label": f2_label_evaluator.evaluate(final_train_preds),
    "final_train_precision": precision_evaluator.evaluate(final_train_preds),
    "final_train_recall": recall_evaluator.evaluate(final_train_preds)
}

print(f"Final Model Training F2 (Delay Class): {final_metrics['final_train_f2_label']:.4f}")

# 3. Log to MLflow
with mlflow.start_run(run_name="POST_PRESENTATION_FINAL_BEST_STACKED_MLP") as run:
    # Log Params
    mlflow.log_param("hidden_layers", best_config_str)

    # Log Metrics
    mlflow.log_metrics(final_metrics)

    # Log Model Artifact
    mlflow.spark.log_model(final_model, "model")
    print(f"Final Stacked Model & Metrics saved to Run: {run.info.run_id}")

# Cleanup
final_train_preds.unpersist()
featurized_df.unpersist()
full_cv_df.unpersist()