In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, Bucketizer, SQLTransformer, VectorAssembler
from pyspark.ml.classification import GBTClassifier

from ml_utils import evaluate
from TargetEncoder import TargetEncoder

import mlflow
import mlflow.spark
from mlflow.models import infer_signature

from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
import numpy as np
    
random_seed = 69

In [0]:
def get_processing_pipeline(train_df):
    print("Number of initial features:", len(train_df.columns) - 2)     # -1 for label col, -1 for class weight col

    bool_cols = ["DepartureExtremeWeather", "ArrivalExtremeWeather"]
    # Aggregate features: Temperature: Max deviation from 10; Precipitation: Max; WindSpeed: Max
    OPTIMAL_TEMP = 10

    sql = "SELECT *,\n"
    for col in bool_cols:
        sql += f"\tCAST({col} AS int) AS {col}Int,\n"
    
    for prefix in ["Departure", "Arrival"]:
        sql += f"\tGREATEST(ABS({prefix}TemperatureLag2 - {OPTIMAL_TEMP}), ABS({prefix}TemperatureLag1 - {OPTIMAL_TEMP}), ABS({prefix}Temperature - {OPTIMAL_TEMP}), ABS({prefix}TemperatureLead1 - {OPTIMAL_TEMP}), ABS({prefix}TemperatureLead2 - {OPTIMAL_TEMP})) AS {prefix}TemperatureMaxDeviation,\n"
        sql += f"\tGREATEST({prefix}PrecipitationLag2, {prefix}PrecipitationLag1, {prefix}Precipitation, {prefix}PrecipitationLead1, {prefix}PrecipitationLead2) AS {prefix}PrecipitationMax,\n"
        sql += f"\tGREATEST({prefix}WindSpeedLag2, {prefix}WindSpeedLag1, {prefix}WindSpeed, {prefix}WindSpeedLead1, {prefix}WindSpeedLead2) AS {prefix}WindSpeedMax,\n"

    sql = sql[:-2] + "\nFROM __THIS__"

    sql_transformer = SQLTransformer(statement=sql)
    
    bucketizer_cols = ["DayOfMonth", "ScheduledDepartureTime", "ScheduledArrivalTime"]
    bucketizer_splits = [
        [-float("inf"), 10, 20, 31, float("inf")],
        [-float("inf"), 300, 600, 900, 1200, 1500, 1800, 2100, float("inf")],
        [-float("inf"), 300, 600, 900, 1200, 1500, 1800, 2100, float("inf")],
    ]

    bucketizer = Bucketizer(
        splitsArray=bucketizer_splits,
        inputCols=bucketizer_cols,
        outputCols=[f"{col}Bucket" for col in bucketizer_cols],
        handleInvalid="keep"
    )
    string_indexers_cols = ["Quarter", "Month", "DayOfWeek", "AirlineName", "OriginState", "DestinationState", "DepartureWeatherCondition", "ArrivalWeatherCondition"]

    string_indexer = StringIndexer(
        inputCols=string_indexers_cols,
        outputCols=[f"{col}Index" for col in string_indexers_cols],
        handleInvalid="keep"
    )
    target_encoder = TargetEncoder(
        inputCols=["OriginAirport", "DestinationAirport"],
        outputCols=["OriginAirportTargetEncoded", "DestinationAirportTargetEncoded"],
        labelCol="DepartureDelayed",
        handleInvalid="keep"
    )
    numeric_cols = [col for col in train_df.columns if any(base_col in col for base_col in ["Temperature", "DewPoint", "Humidity", "Precipitation", "WindSpeed", "Pressure"])] + \
        ["DepartureTemperatureMaxDeviation", "DeparturePrecipitationMax", "DepartureWindSpeedMax", "ArrivalTemperatureMaxDeviation", "ArrivalPrecipitationMax", "ArrivalWindSpeedMax"] + \
        ["ScheduledElapsedTime"]

    final_cols = ["DepartureExtremeWeatherInt", "ArrivalExtremeWeatherInt"] \
        + [f"{col}Bucket" for col in bucketizer_cols] \
        + [f"{col}Index" for col in string_indexers_cols] \
        + numeric_cols
        
    print("Number of final features:", len(final_cols))
        
    vector_assembler = VectorAssembler(
        inputCols=final_cols,
        outputCol="features"
    )

    pipeline_stages = [
        sql_transformer,
        bucketizer,
        string_indexer,
        target_encoder,
        vector_assembler,
    ]
    return pipeline_stages

In [0]:
def add_class_weight(train_df):
    pos_count = train_df.filter(F.col("DepartureDelayed") == 1).count()
    neg_count = train_df.filter(F.col("DepartureDelayed") != 1).count()
    balancing_ratio = neg_count / float(pos_count)

    train_df_weighted = train_df \
        .withColumn( 
            "ClassWeight",
            F.when(F.col("DepartureDelayed") == 1, balancing_ratio).otherwise(1.0)
        )
    return train_df_weighted, balancing_ratio

In [0]:
train_df = spark.read.parquet("gs://flight-delay-pred-data/ml/dataset_delayed_2014_to_2024/train.parquet")
val_df = spark.read.parquet("gs://flight-delay-pred-data/ml/dataset_delayed_2014_to_2024/val.parquet")

train_df_weighted, balancing_ratio = add_class_weight(train_df)
processing_pipeline_stages = get_processing_pipeline(train_df_weighted) 

Number of initial features: 76
Number of final features: 80


In [0]:
def train_eval(params):
    maxDepth = params["maxDepth"]
    maxIter = params["maxIter"]
    stepSize = params["stepSize"]

    gbt = GBTClassifier(
        featuresCol="features",
        labelCol="DepartureDelayed",
        stepSize=stepSize,
        maxDepth=maxDepth,
        maxIter=maxIter,
        maxBins=64,
        seed=random_seed,
        weightCol="ClassWeight"
    )
    model_pipeline_stages = processing_pipeline_stages + [gbt]
    pipeline = Pipeline(stages=model_pipeline_stages)
    
    with mlflow.start_run(run_name=f"gbt-{stepSize}-{maxIter}-{maxDepth}") as run:
        model = pipeline.fit(train_df_weighted)
        train_predictions = model.transform(train_df)
        val_predictions = model.transform(val_df)
        
        train_metrics = evaluate(train_predictions)
        val_metrics = evaluate(val_predictions)

        signature = infer_signature(train_df.drop('DepartureDelayed'), train_predictions.select("prediction"))
        mlflow.spark.log_model(
            model, 
            f"gbt-{stepSize}-{maxIter}-{maxDepth}", 
            signature=signature, 
            code_paths=["./TargetEncoder.py"],
            dfs_tmpdir="dbfs:/tmp/mlflow"
        )
        
        mlflow.log_params({
            "stepSize": stepSize,
            "maxIter": maxIter,
            "maxDepth": maxDepth,
            "balancing_ratio": balancing_ratio
        })
        
        mlflow.log_metrics({
            "train_accuracy": train_metrics["accuracy"],
            "train_precision": train_metrics["precision"],
            "train_recall": train_metrics["recall"],
            "train_f2": train_metrics["f2"],
            "train_prauc": train_metrics["prauc"],
            "val_accuracy": val_metrics["accuracy"],
            "val_precision": val_metrics["precision"],
            "val_recall": val_metrics["recall"],
            "val_f2": val_metrics["f2"],
            "val_prauc": val_metrics["prauc"]
        })

        loss = -val_metrics["recall"]
        return {"loss": loss, "status": STATUS_OK}



In [0]:
search_space = {
    "maxDepth": hp.choice("maxDepth", [5,6 , 7, 8]),
    "maxIter": hp.choice("maxIter", [50, 75, 100, 125, 150]),
    "stepSize": hp.uniform("stepSize", 0.07, 0.15)
}

In [0]:
username = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().get("user").get()
mlflow.set_experiment(f"/Users/{username}/experiments/gbt-2014_to_2024-classweight100")

def train_eval_wrapper(params):
    return train_eval(params)

best_params = fmin(
    fn=train_eval_wrapper,
    space=search_space, 
    algo=tpe.suggest,
    max_evals=1,
    trials=Trials(),
    rstate=np.random.default_rng(random_seed),
)