
# RandomForest Phase 3


#### 1. setup

In [0]:

# The following blob storage is accessible to team members only (read and write)
# access key is valid til TTL
# after that you will need to create a new SAS key and authenticate access again via DataBrick command line
blob_container = "261-prrjckaw-container"  # The name of your container created in https://portal.azure.com
storage_account = "261prrjckaw"  # The name of your Storage account created in https://portal.azure.com
secret_scope = "261-project-scope"  # The name of the scope created in your local computer using the Databricks CLI
secret_key = "261-project-key"  # The name of the secret key created in your local computer using the Databricks CLI
team_blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"  # points to the root of your team storage bucket


# the 261 course blob storage is mounted here.
mids261_mount_path = "/mnt/mids-w261"

# SAS Token: Grant the team limited access to Azure Storage resources
spark.conf.set(
    f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
    dbutils.secrets.get(scope=secret_scope, key=secret_key),
)


# see what's in the blob storage root folder
display(dbutils.fs.ls(f"{team_blob_url}"))

path,name,size,modificationTime
wasbs://261-prrjckaw-container@261prrjckaw.blob.core.windows.net/TP/,TP/,0,1702403305000


In [0]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, regexp_replace

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import (
    LogisticRegression,
    RandomForestClassifier,
    LinearSVC,
    DecisionTreeClassifier,
    GBTClassifier,
)
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.sql.types import FloatType
from pyspark.sql.window import Window

import numpy as np
from pprint import pprint
from copy import deepcopy

#### 1a. Global Vars

In [0]:
random_seed = 100

raw_cols = [
    "STATION",
    "DATE",
    "LATITUDE",
    "ELEVATION",
    "LONGITUDE",
    "HourlyAltimeterSetting",
    "HourlyDewPointTemperature",
    "HourlyDryBulbTemperature",
    "HourlyPrecipitation",
    "HourlyPresentWeatherType",
    "HourlyPressureChange",
    "HourlyPressureTendency",
    "HourlyRelativeHumidity",
    "HourlySkyConditions",
    "HourlySeaLevelPressure",
    "HourlyStationPressure",
    "HourlyVisibility",
    "HourlyWetBulbTemperature",
    "HourlyWindDirection",
    "HourlyWindGustSpeed",
    "HourlyWindSpeed",
    #"w_year",
    #"w_month",
    #"w_day",
    "DAY_OF_MONTH",
    "DAY_OF_WEEK",
    "YEAR",
    "FL_DATE",
    "OP_CARRIER",
    #"TAIL_NUM",
    #"OP_CARRIER_FL_NUM",
    "ORIGIN",
    #"ORIGIN_CITY_NAME",
    #"ORIGIN_STATE_NM",
    #"ORIGIN_WAC",
    #"DEST_AIRPORT_ID",
    "DEST",
    #"DEST_CITY_NAME",
    #"DEST_STATE_NM",
    "CRS_ARR_TIME",
    "CRS_DEP_TIME",
    "CRS_ELAPSED_TIME",
    "DISTANCE",
    #"DISTANCE_GROUP",
    "DEP_DEL15",
    #"station_id",
    #"utc_departure",
    #"airport_code",
    #"before_utc_departure",
    #"f_year",
    #"f_month",
    #"f_day",
    #"time_distance",
    #"rank",
    "QUARTER",
    "MONTH",
    "RecentHourlyPrecipitationDelta",
    "RecentHourlyRelativeHumidityDelta",
    "HolidayTravel",
    "ORIGIN_DELAY_SUM",
    "TAIL_NUM_PREV_DELAY",
]


feature_interact_columns = [
    "Precipitation_Carrier",
    "Temperature_Month",
    "WindSpeed_Origin",
    "Visibility_OriginAirport",
    "Precipitation_at_DepTime_and_Origin",
    "Origin_CRS_DEP_TIME",
    "Carrier_DayOfWeek",
    "Month_CRS_DEP_TIME",
    "Carrier_CRS_DEP_TIME",
    "DayOfWeek_CRS_DEP_TIME",
]

weather_interactions = [
    "WindSpeed_Visibility",
    "DewPoint_Humidity",
    "Temp_WindSpeed",
    "PressureTendency_Altimeter",
    "Visibility_SkyConditions",
    "WindDir_GustSpeed",
    "SeaLevel_StationPressure",
    "Precipitation_WetBulbTemp",
    "PressureChange_DewPoint",
    "Humidity_Temperature",
]

float_cols = [
    "DATE",
    "LATITUDE",
    "ELEVATION",
    "LONGITUDE",
    "CRS_ARR_TIME",
    "CRS_DEP_TIME",
    "QUARTER",
    "MONTH",
    "DAY_OF_MONTH",
    "DAY_OF_WEEK",
    "YEAR",
    "FL_DATE",
    "CRS_ELAPSED_TIME",
    "DISTANCE",
    "DEP_DEL15",
    "HourlyAltimeterSetting",
    "HourlyDewPointTemperature",
    "HourlyDryBulbTemperature",
    "HourlyPrecipitation",
    "HourlyPresentWeatherType",
    "HourlyPressureChange",
    "HourlyPressureTendency",
    "HourlyRelativeHumidity",
    "HourlySkyConditions",
    "HourlySeaLevelPressure",
    "HourlyStationPressure",
    "HourlyVisibility",
    "HourlyWetBulbTemperature",
    "HourlyWindDirection",
    "HourlyWindGustSpeed",
    "HourlyWindSpeed",
    #"utc_departure",
    #"before_utc_departure",
    #"time_distance",
    "RecentHourlyPrecipitationDelta",
    "RecentHourlyRelativeHumidityDelta",
    "ORIGIN_DELAY_SUM",
    "TAIL_NUM_PREV_DELAY",
]

cat_cols = [
    "STATION",
    "OP_CARRIER",
    #"TAIL_NUM",
    #"OP_CARRIER_FL_NUM",
    "ORIGIN",
    #"ORIGIN_CITY_NAME",
    #"ORIGIN_STATE_NM",
    #"ORIGIN_WAC",
    #"DEST_AIRPORT_ID",
    "DEST",
    #"DEST_CITY_NAME",
    #"DEST_STATE_NM",
    #"DISTANCE_GROUP",
    #"station_id",
    #"airport_code",
    #"w_year",
    #"w_month",
    #"w_day",
    #"f_year",
    #"f_month",
    #"f_day",
    #"rank",
    "HolidayTravel",
]

float_cols_no_y = [column for column in float_cols if column != "DEP_DEL15"] 
cat_cols_mean = [column + "_mean" for column in cat_cols]

final_cols = (
    cat_cols_mean + float_cols_no_y + weather_interactions + feature_interact_columns
)

#### 1b. Helper Functions

In [0]:
# DOWNSAMPLING
random_seed = 100

def downsample_majority_class(df, label_col, majority_label, minority_label, order_by_cols, random_seed):
    # count number of instances in each class
    major_df = df.filter(col(label_col) == majority_label)
    minor_df = df.filter(col(label_col) == minority_label)
    ratio = major_df.count() / minor_df.count()
    sampled_majority_df = major_df.sample(False, 1.0 / ratio, seed=random_seed)

    sampled_df = sampled_majority_df.unionAll(minor_df)
    return sampled_df.orderBy(order_by_cols)

In [0]:
# FEATURE ENGINEERING

# ------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------
# Mean encode training data and apply to validatioon
def mean_encoding(train_df, val_df, cat_cols):
    mappers = {}

    for colm in cat_cols:
        # Create the mapper using training data
        mapper = train_df.groupBy(colm).agg(F.mean("DEP_DEL15").alias("mean")).collect()
        mappers[colm] = {row[colm]: row["mean"] for row in mapper}

        # Broadcast the mapper
        broadcasted_mapper = spark.sparkContext.broadcast(mappers[colm])

        # Map function
        map_udf = F.udf(
            lambda category: broadcasted_mapper.value.get(category, 0), FloatType()
        )

        # Apply mapper to both training and validation data
        train_df = train_df.withColumn(colm + "_mean", map_udf(F.col(colm)))
        val_df = val_df.withColumn(colm + "_mean", map_udf(F.col(colm)))

    return train_df, val_df, mappers


# ------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------
# cast columns
def cast_columns(df, float_cols, cat_cols, cat_cols_mean, float_cols_no_y):

    # Iterate over each column and replace 'T' with 0.001 for columns containing 'Hourly'
    for col_name in df.columns:
        if "Hourly" in col_name:
            df = df.withColumn(
                col_name,
                F.when(F.col(col_name) == "T", 0.001).otherwise(F.col(col_name)),
            )

    # Prep dates to become numbers
    df = (
        df.withColumn("FL_DATE", regexp_replace("FL_DATE", "-", "").cast(FloatType()))
        .withColumn("DATE", regexp_replace("DATE", "-", ""))
        .withColumn("DATE", regexp_replace("DATE", "T", ""))
        .withColumn("DATE", regexp_replace("DATE", ":", "").cast(FloatType()))
    )
    # Cast float columns to float type
    for float_col in float_cols:
        df = df.withColumn(float_col, col(float_col).cast("float"))

    # Feed forward populating of nulls
    window = (
        Window.partitionBy("STATION")
        .orderBy("DATE")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )

    # Fill null values in numerical columns with 0
    for c in float_cols:
        if c == "DEP_DEL15":  # !!!!!!add if statement if it is also cancelled!!!!!!!!
            df = df.na.fill({c: 1})
        else:
            df = df.withColumn(c, F.last(col(c), ignorenulls=True).over(window))

    new_columns = cat_cols_mean + float_cols
    for c in new_columns:
        df = df.na.fill({c: 0.0})

    return df


# ------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------
# create interactions
def create_interactions(df):
    df = df.withColumns(
        {
            "DayOfWeek_CRS_DEP_TIME": col("DAY_OF_WEEK") * col("CRS_DEP_TIME"),
            "Carrier_CRS_DEP_TIME": col("OP_CARRIER_mean") * col("CRS_DEP_TIME"),
            "Month_CRS_DEP_TIME": col("MONTH") * col("CRS_DEP_TIME"),
            "Carrier_DayOfWeek": col("OP_CARRIER_mean") * col("DAY_OF_WEEK"),
            "Origin_CRS_DEP_TIME": col("ORIGIN_mean") * col("CRS_DEP_TIME"),
            "Precipitation_at_DepTime_and_Origin": col("HourlyPrecipitation") * col("CRS_DEP_TIME") * col("ORIGIN_mean"),
            "Visibility_OriginAirport": col("HourlyVisibility") * col("ORIGIN_mean"),
            "WindSpeed_Origin": col("HourlyWindSpeed") * col("ORIGIN_mean"),
            "Temperature_Month": col("HourlyDryBulbTemperature") * col("MONTH"),
            "Precipitation_Carrier": col("HourlyPrecipitation") * col("OP_CARRIER_mean"),
            "WindSpeed_Visibility": col("HourlyWindSpeed") * col("HourlyVisibility"),
            "DewPoint_Humidity": col("HourlyDewPointTemperature") * col("HourlyRelativeHumidity"),
            "Temp_WindSpeed": col("HourlyDryBulbTemperature") * col("HourlyWindSpeed"),
            "PressureTendency_Altimeter": col("HourlyPressureTendency") * col("HourlyAltimeterSetting"),
            "Visibility_SkyConditions": col("HourlyVisibility") * col("HourlySkyConditions"),
            "WindDir_GustSpeed": col("HourlyWindDirection") * col("HourlyWindGustSpeed"),
            "SeaLevel_StationPressure": col("HourlySeaLevelPressure") * col("HourlyStationPressure"),
            "Precipitation_WetBulbTemp": col("HourlyPrecipitation") * col("HourlyWetBulbTemperature"),
            "PressureChange_DewPoint": col("HourlyPressureChange") * col("HourlyDewPointTemperature"),
            "Humidity_Temperature": col("HourlyRelativeHumidity") * col("HourlyDryBulbTemperature"),
        }
    )
    return df

In [0]:
#PIPELINE

def build_pipeline(model):
    assembler = VectorAssembler(inputCols=final_cols, outputCol="features")
    scaler = StandardScaler(
        inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True
    )
    return Pipeline(stages=[assembler, scaler, model])

In [0]:
# EVALUATION
metric_evaluators = {
    "precision": MulticlassClassificationEvaluator(
        labelCol="DEP_DEL15", predictionCol="prediction", metricName="precisionByLabel"
    ),
    "recall": MulticlassClassificationEvaluator(
        labelCol="DEP_DEL15", predictionCol="prediction", metricName="recallByLabel"
    ),
    "f2": MulticlassClassificationEvaluator(
        labelCol="DEP_DEL15",
        predictionCol="prediction",
        metricName="fMeasureByLabel",
        beta=2.0,
    ),
}


def evaluate(predictions):
    evaluation_metric = {}
    for metric_name, evaluator in metric_evaluators.items():
        evaluation_metric[metric_name] = evaluator.evaluate(predictions)
    return evaluation_metric


def avg_of_metrics(metric_list, decay=0.1):
    # calculate exponential weights per fold
    weights = list(reversed(np.exp(-decay * np.arange(len(metric_list)))))

    avg_metric = {
        "train": {metric_name: 0 for metric_name in metric_evaluators.keys()},
        "val": {metric_name: 0 for metric_name in metric_evaluators.keys()},
    }
    for i,metric in enumerate(metric_list):
        for df_type in ["train", "val"]:
            for metric_name in metric_evaluators.keys():
                avg_metric[df_type][metric_name] += metric[df_type][metric_name] * weights[i]

    for df_type in ["train", "val"]:
        for metric_name in metric_evaluators.keys():
            avg_metric[df_type][metric_name] /= sum(weights)
    return avg_metric

In [0]:
# SIMPLE VALIDATION
def simple_validate(pipeline, train, val, split_ratio=0.8):
    model = pipeline.fit(train)
    return {
        "train": evaluate(model.transform(train)),
        "val": evaluate(model.transform(val)),
    }

In [0]:
# FINAL VALIDATION
def final_validate(pipeline, train, val):
    # downsample train data
    order_by_cols = ["DATE", "CRS_DEP_TIME"]
    train = downsample_majority_class(train, "DEP_DEL15", 0, 1, order_by_cols, random_seed)

    #mean_encoding
    train, val, mapper = mean_encoding(train, val, cat_cols)

    # cast columns
    train = cast_columns(train, float_cols, cat_cols, cat_cols_mean, float_cols_no_y)
    val = cast_columns(val, float_cols, cat_cols,cat_cols_mean, float_cols_no_y)
    
    # create interactions
    train = create_interactions(train)
    val = create_interactions(val)

    #!!!!!!!ADD CHECKPOINT STEP HERE FOR TRAIN AND VAL DFs!!!!!!!!
    model_name = str(pipeline.getStages()[-1]).split('_')[0]
    final_model_name = f"{model_name}-final"
    train.write.partitionBy("YEAR","QUARTER","MONTH").parquet(f"{team_blob_url}/TP/checkpoint/{final_model_name}", mode='overwrite')
    train = spark.read.parquet(f"{team_blob_url}/TP/checkpoint/{final_model_name}")

    model = pipeline.fit(train)
    return {
        "train": evaluate(model.transform(train)),
        "val": evaluate(model.transform(val)),
    }

In [0]:
# CROSS-VALIDATION

def cross_validate(pipeline, df, folds=4):
    total = df.count()
    fold_size = total // folds

    metrics = []
    models = []

    model_name = str(pipeline.getStages()[-1]).split('_')[0]
    for i in range(folds-1):
        cv_model_name = f"{model_name}-fold-{i + 1}"
        start_index = i * fold_size
        end_index = start_index + fold_size if i < folds - 1 else total
        train_size = int((end_index - start_index) * 0.8)
        val_size = (end_index - start_index) - train_size

        #train/val split
        train = df.offset(start_index).limit(train_size)
        val = df.offset(start_index + train_size).limit(end_index - (start_index + train_size))

        # downsample train data
        order_by_cols = ["DATE", "CRS_DEP_TIME"]
        train = downsample_majority_class(train, "DEP_DEL15", 0, 1, order_by_cols, random_seed)

        #mean_encoding
        train, val, mapper = mean_encoding(train, val, cat_cols)

        # cast columns
        train = cast_columns(train, float_cols, cat_cols, cat_cols_mean, float_cols_no_y)
        val = cast_columns(val, float_cols, cat_cols,cat_cols_mean, float_cols_no_y)
        
        # create interactions
        train = create_interactions(train)
        val = create_interactions(val)

        #!!!!!!!ADD CHECKPOINT STEP HERE FOR TRAIN AND VAL DFs!!!!!!!!
        train.write.partitionBy("YEAR","QUARTER","MONTH").parquet(f"{team_blob_url}/TP/checkpoint/{cv_model_name}", mode='overwrite')
        train = spark.read.parquet(f"{team_blob_url}/TP/checkpoint/{cv_model_name}")

        model = pipeline.fit(train)
        models.append(model)
        metrics.append({"train": evaluate(model.transform(train)),
                        "val": evaluate(model.transform(val)),})

    return metrics, avg_of_metrics(metrics), models

In [0]:
#GRID-SEARCH

success_metric = "f2"

def grid_search(
    model_class,
    param_grid,
    train,
    val=None,
    split_ratio=0.8,  # missing cv_folds will lead to simple validation which uses this train-val split ratio
    cv_folds=None,  # needed only for cross-validation
    decay=0.1,
    ):
    # initialize these to store the diff metrics
    best_hyper_params = None
    best_metrics = {
        df_type: {
            metric_name: -float("inf") for metric_name in metric_evaluators.keys()
        }
        for df_type in ["train", "val"]
    }
    results = []

    for params in param_grid:
        # print(f"params: {params}")
        hyper_params = {k.name: v for k, v in params.items()}
        print(f"\nfor hyper_params: \n{hyper_params}")

        # build actual model
        actual_model = model_class(
            featuresCol="scaledFeatures", labelCol="DEP_DEL15", **hyper_params
        )
        # build pipeline with model
        model_pipeline = build_pipeline(actual_model)
        # call simple/cross validate
        print("Attempting evaluation")
        if cv_folds is None:
            cur_metrics = simple_validate(model_pipeline, train, val, split_ratio)
        else:
            cur_metrics, weighted_avg, _ = cross_validate(model_pipeline, train, cv_folds)
        print("Completed evaluation")

        # get best hyper-params best of f1
        if weighted_avg["val"][success_metric] > best_metrics["val"][success_metric]:
            best_metrics = weighted_avg
            best_hyper_params = params

        result = {}
        result.update(hyper_params)
        result.update(weighted_avg)
        results.append(result)

    print("Best Model Parameters:", best_hyper_params)
    print("Best Model Evaluation:", best_metrics)
    return best_hyper_params, best_metrics, results

#### 2. load in dataset

In [0]:
# save `blind_5yr_df` for last test after final model is chosen and all tinkering is complete
# blind_5yr_df = spark.read.parquet(f"{team_blob_url}/TP/phase3data/five_year_joined_df".filter((col("YEAR") == "2019"))

train_5yr_df = spark.read.parquet(f"{team_blob_url}/TP/phase3data/five_year_joined_df")\
    .filter((col("YEAR").isin("2015", "2016", "2017"))|
            ((col("YEAR").isin("2018")) & (col("MONTH").isin("1", "2", "3", "4"))))\
    .select(raw_cols)
train_5yr_df = train_5yr_df.withColumn("DEP_DEL15", col("DEP_DEL15").cast("int"))

test_blind_5yr_df = spark.read.parquet(f"{team_blob_url}/TP/phase3data/five_year_joined_df")\
    .filter((col("YEAR") == "2018") & ~(col("MONTH").isin("1", "2", "3", "4")))\
    .select(raw_cols)
test_blind_5yr_df = test_blind_5yr_df.withColumn("DEP_DEL15", col("DEP_DEL15").cast("int"))


#### 2b. add airport-size column

In [0]:


#Creates a new column that splits airport of origin into five size buckets - features seem to have different importances depending on size
from pyspark.sql.functions import count as sql_count, col, when
tiny_airports = ['HOB', 'ROW', 'LWS', 'DLG', 'STC', 'MQT', 'CDV', 'IAG', 'BLI', 'RKS', 'ORH', 'LBE', 'SUN', 'SWF', 'ADQ', 'PUB', 'CNY', 'MTJ', 'ABY', 'WRG', 'MVY', 'SAF', 'PSG', 'PLN', 'LSE', 'WYS', 'DHN', 'GRI', 'CSG', 'GGG', 'AVP', 'RDD', 'PAH', 'VEL', 'JMS', 'DLH', 'OME', 'ILG', 'BQK', 'JLN', 'BTM', 'SMX', 'HDN', 'FAY', 'BRD', 'CWA', 'YAK', 'GCK', 'MMH', 'SUX', 'LAW', 'BPT', 'PPG', 'TOL', 'ACT', 'DAB', 'MHK', 'OTZ', 'SPS', 'BGM', 'HYS', 'SGU', 'ESC', 'UST', 'GUC', 'RHI', 'EGE', 'AKN', 'GCC', 'COD', 'CEC', 'BGR', 'ERI', 'BJI', 'SCC', 'PIB', 'TWF', 'GFK', 'DIK', 'AZO', 'EWN', 'MKG', 'ACK', 'PBG', 'PIH', 'MLB', 'CLD', 'ACV', 'FLG', 'DVL', 'HIB', 'LAR', 'GTR', 'OAJ', 'CDC', 'IMT', 'MBS', 'SIT', 'COU', 'GST', 'ALO', 'CIU', 'DBQ', 'ITH', 'HYA', 'APN', 'TXK', 'HLN', 'VLD', 'BET', 'EAU', 'CMX', 'SCE', 'EKO', 'PHF', 'ABR', 'ADK', 'INL', 'CPR', 'MEI', 'SJT', 'STX', 'OTH', 'SPI', 'BQN', 'BRW']  # Add all codes for 'tiny'

small_airports = ['ISP', 'ECP', 'GNV', 'TYR', 'RAP', 'FSM', 'LNK', 'KTN', 'LAN', 'BTV', 'GTF', 'TRI', 'EYW', 'EVV', 'MGM', 'GJT', 'GRK', 'LCH', 'CRW', 'AGS', 'FAI', 'CHO', 'MLI', 'HSV', 'RST', 'CHA', 'YUM', 'ROA', 'MSO', 'SRQ', 'MFE', 'BIL', 'TLH', 'BRO', 'BIS', 'ASE', 'BMI', 'MOT', 'PSC', 'ATW', 'SBP', 'IDA', 'AVL', 'ISN', 'MFR', 'GPT', 'BFL', 'JAC', 'MRY', 'AMA', 'STT', 'ILM', 'DRO', 'CMI', 'ABE', 'TTN', 'ACY', 'MDT', 'AEX', 'LRD', 'HRL', 'SBN', 'RDM', 'BZN', 'FCA', 'ABI', 'EUG', 'JNU', 'ELM', 'TVC', 'CLL', 'MLU']  # Add all codes for 'small'

large_airports = ['ANC', 'FAT', 'GEG', 'BDL', 'ONT', 'HPN', 'JAN', 'LEX', 'BTR', 'GSO', 'BUR', 'PWM', 'OGG', 'TUL', 'ELP', 'OKC', 'RNO', 'DAY', 'DSM', 'LIT', 'ABQ', 'CAK', 'FSD', 'BHM', 'PBI', 'MEM', 'FNT', 'SYR', 'LGB', 'MOB', 'PVD', 'CVG', 'BOI', 'PNS', 'KOA', 'ORF', 'SHV', 'PSP', 'CID', 'MHT', 'MAF', 'GRR', 'JAX', 'CRP', 'BUF', 'ALB', 'VPS', 'FWA', 'GRB', 'XNA', 'ICT', 'LFT', 'SAV', 'SBA', 'CHS', 'CAE', 'SDF', 'COS', 'SGF', 'MSN', 'RIC', 'TUS', 'LIH', 'LBB', 'TYS', 'MYR', 'FAR', 'ITO', 'GSP', 'SJU', 'OMA', 'ROC', 'PIA']

unsure=['CGI', 'YNG', 'AZA', 'HTS', 'STS', 'LYH', 'EFD', 'OWB', 'OGD', 'ENV', 'FLO', 'SWO', 'BLV', 'PSM', 'IFP', 'PGV', 'BFF', 'TKI', 'SLN', 'SCK', 'PVU', 'PGD', 'HVN', 'UIN', 'PIE', 'HGR', 'LBF', 'LBL', 'OGS', 'SFB', 'SHD', 'RFD', 'PSE', 'LWB', 'CKB', 'USA', 'LCK']


# Add a new column 'AirportSize' based on the ORIGIN code
train_5yr_df = train_5yr_df.withColumn('AirportSize',
                             when(col('ORIGIN').isin(tiny_airports), 'tiny')
                             .when(col('ORIGIN').isin(small_airports), 'small')
                             .when(col('ORIGIN').isin(large_airports), 'large')
                             .when(col('ORIGIN').isin(unsure), 'unsure')
                             .otherwise('huge'))

test_blind_5yr_df = test_blind_5yr_df.withColumn('AirportSize',
                             when(col('ORIGIN').isin(tiny_airports), 'tiny')
                             .when(col('ORIGIN').isin(small_airports), 'small')
                             .when(col('ORIGIN').isin(large_airports), 'large')
                             .when(col('ORIGIN').isin(unsure), 'unsure')
                             .otherwise('huge'))

In [0]:
display(train_5yr_df.limit(10))

STATION,DATE,LATITUDE,ELEVATION,LONGITUDE,HourlyAltimeterSetting,HourlyDewPointTemperature,HourlyDryBulbTemperature,HourlyPrecipitation,HourlyPresentWeatherType,HourlyPressureChange,HourlyPressureTendency,HourlyRelativeHumidity,HourlySkyConditions,HourlySeaLevelPressure,HourlyStationPressure,HourlyVisibility,HourlyWetBulbTemperature,HourlyWindDirection,HourlyWindGustSpeed,HourlyWindSpeed,DAY_OF_MONTH,DAY_OF_WEEK,YEAR,FL_DATE,OP_CARRIER,ORIGIN,DEST,CRS_ARR_TIME,CRS_DEP_TIME,CRS_ELAPSED_TIME,DISTANCE,DEP_DEL15,QUARTER,MONTH,RecentHourlyPrecipitationDelta,RecentHourlyRelativeHumidityDelta,HolidayTravel,ORIGIN_DELAY_SUM,TAIL_NUM_PREV_DELAY,AirportSize
72219013874,2015-01-01T07:52:00,33.6301,307.8,-84.4418,30.33,26,32,0.0,,,,79,FEW:02 150 SCT:04 200 SCT:04 250,30.36,29.22,10.0,30,0,,0,1,4,2015,2015-01-01,EV,ATL,IAD,725,540,105.0,534.0,1.0,1,1,,,0,,,huge
72219013874,2015-01-01T08:52:00,33.6301,307.8,-84.4418,30.33,28,37,0.0,,,,70,FEW:02 150 SCT:04 200 SCT:04 250,30.35,29.22,10.0,33,0,,0,1,4,2015,2015-01-01,F9,ATL,TTN,750,600,110.0,701.0,1.0,1,1,,,0,,,huge
72219013874,2015-01-01T08:52:00,33.6301,307.8,-84.4418,30.33,28,37,0.0,,,,70,FEW:02 150 SCT:04 200 SCT:04 250,30.35,29.22,10.0,33,0,,0,1,4,2015,2015-01-01,AA,ATL,DFW,755,625,150.0,731.0,,1,1,,,0,,,huge
72219013874,2015-01-01T08:52:00,33.6301,307.8,-84.4418,30.33,28,37,0.0,,,,70,FEW:02 150 SCT:04 200 SCT:04 250,30.35,29.22,10.0,33,0,,0,1,4,2015,2015-01-01,DL,ATL,LGA,853,645,128.0,762.0,0.0,1,1,,,0,,,huge
72219013874,2015-01-01T08:52:00,33.6301,307.8,-84.4418,30.33,28,37,0.0,,,,70,FEW:02 150 SCT:04 200 SCT:04 250,30.35,29.22,10.0,33,0,,0,1,4,2015,2015-01-01,OO,ATL,IAH,802,646,136.0,689.0,0.0,1,1,,,0,,,huge
72219013874,2015-01-01T09:52:00,33.6301,307.8,-84.4418,30.34,27,43,0.0,,-0.01,3.0,53,FEW:02 150 SCT:04 200 BKN:07 250,30.36,29.23,10.0,37,0,,0,1,4,2015,2015-01-01,DL,ATL,MCO,822,655,87.0,404.0,0.0,1,1,,,0,,,huge
72219013874,2015-01-01T09:52:00,33.6301,307.8,-84.4418,30.34,27,43,0.0,,-0.01,3.0,53,FEW:02 150 SCT:04 200 BKN:07 250,30.36,29.23,10.0,37,0,,0,1,4,2015,2015-01-01,AA,ATL,MIA,852,700,112.0,594.0,0.0,1,1,,,0,,,huge
72219013874,2015-01-01T09:52:00,33.6301,307.8,-84.4418,30.34,27,43,0.0,,-0.01,3.0,53,FEW:02 150 SCT:04 200 BKN:07 250,30.36,29.23,10.0,37,0,,0,1,4,2015,2015-01-01,DL,ATL,SLC,940,720,260.0,1590.0,0.0,1,1,,,0,,,huge
72219013874,2015-01-01T09:52:00,33.6301,307.8,-84.4418,30.34,27,43,0.0,,-0.01,3.0,53,FEW:02 150 SCT:04 200 BKN:07 250,30.36,29.23,10.0,37,0,,0,1,4,2015,2015-01-01,WN,ATL,DAL,845,725,140.0,721.0,0.0,1,1,,,0,,,huge
72219013874,2015-01-01T09:52:00,33.6301,307.8,-84.4418,30.34,27,43,0.0,,-0.01,3.0,53,FEW:02 150 SCT:04 200 BKN:07 250,30.36,29.23,10.0,37,0,,0,1,4,2015,2015-01-01,UA,ATL,SFO,1004,729,335.0,2139.0,0.0,1,1,,,0,,,huge


#### 3. balance classes

#### 4. RandomForest pipeline


##### 4b. Model Finetuning with grid-search

In [0]:
from pyspark.ml.tuning import ParamGridBuilder
 

rf = RandomForestClassifier(featuresCol="scaledFeatures", labelCol="DEP_DEL15")

dummy_rfc = RandomForestClassifier(
    featuresCol="scaledFeatures", labelCol="DEP_DEL15", seed=random_seed
)
param_grid_rfc = (
    ParamGridBuilder()
    .addGrid(RandomForestClassifier.maxBins, [32, 64]) \
    .addGrid(RandomForestClassifier.numTrees, [5, 10]) \
    .addGrid(RandomForestClassifier.maxDepth, [3, 7]) \
    .build()
)

from pyspark.ml.classification import RandomForestClassifier

best_hyper_params, best_metrics, results = grid_search(
    RandomForestClassifier,  # model class
    param_grid_rfc,              # parameter grid
    train_5yr_df,
    cv_folds=4
)


for hyper_params: 
{'maxBins': 32, 'numTrees': 5, 'maxDepth': 3}
Attempting evaluation
Completed evaluation

for hyper_params: 
{'maxBins': 32, 'numTrees': 5, 'maxDepth': 7}
Attempting evaluation
Completed evaluation

for hyper_params: 
{'maxBins': 32, 'numTrees': 10, 'maxDepth': 3}
Attempting evaluation
Completed evaluation

for hyper_params: 
{'maxBins': 32, 'numTrees': 10, 'maxDepth': 7}
Attempting evaluation
Completed evaluation

for hyper_params: 
{'maxBins': 64, 'numTrees': 5, 'maxDepth': 3}
Attempting evaluation
Completed evaluation

for hyper_params: 
{'maxBins': 64, 'numTrees': 5, 'maxDepth': 7}
Attempting evaluation
Completed evaluation

for hyper_params: 
{'maxBins': 64, 'numTrees': 10, 'maxDepth': 3}
Attempting evaluation
Completed evaluation

for hyper_params: 
{'maxBins': 64, 'numTrees': 10, 'maxDepth': 7}
Attempting evaluation
Completed evaluation
Best Model Parameters: {Param(parent='undefined', name='maxBins', doc='Max number of bins for discretizing continuous featur


##### 4c. Finetuned model eval on 2018 data

In [0]:
from pyspark.ml.classification import RandomForestClassifier

# Set the best parameters in the RandomForestClassifier
rf_best = RandomForestClassifier(featuresCol="scaledFeatures", 
                            labelCol="DEP_DEL15", 
                            maxBins=100, 
                            numTrees=5, 
                            maxDepth=15)

In [0]:
from pyspark.ml.classification import RandomForestClassifier

# Set the best parameters in the RandomForestClassifier
rf_best = RandomForestClassifier(featuresCol="scaledFeatures", 
                            labelCol="DEP_DEL15", 
                            maxBins=100, 
                            numTrees=5, 
                            maxDepth=15)


pipeline = build_pipeline(rf_best)

# downsample train data
order_by_cols = ["DATE", "CRS_DEP_TIME"]
train = downsample_majority_class(train_5yr_df, "DEP_DEL15", 0, 1, order_by_cols, random_seed)
val = test_blind_5yr_df
#mean_encoding
train, val, mapper = mean_encoding(train, val, cat_cols)

# cast columns
train = cast_columns(train, float_cols, cat_cols, cat_cols_mean, float_cols_no_y)
val = cast_columns(val, float_cols, cat_cols,cat_cols_mean, float_cols_no_y)

# create interactions
train = create_interactions(train)
val = create_interactions(val)


# Fit the pipeline to the training data
model = pipeline.fit(train)

# Make predictions on the validation set
predictions = model.transform(val)

# Evaluate the model using your custom evaluation function
evaluation_metrics = evaluate(predictions)

# Print the evaluation metrics
print("Evaluation Metrics:", evaluation_metrics)

# Extract the RandomForest model from the pipeline
rf_model = model.stages[-1] 

# Get feature importances
importances = rf_model.featureImportances



Evaluation Metrics: {'precision': 0.8752550807985665, 'recall': 0.6097212601432628, 'f2': 0.6491048562172768}



### 4d. analysis of feature importance

In [0]:
assembler = [stage for stage in model.stages if isinstance(stage, VectorAssembler)][0]

In [0]:
assembler = [stage for stage in model.stages if isinstance(stage, VectorAssembler)][0]

feature_names = assembler.getInputCols()

# Match feature names with their importances
feature_importances = [(feature, importance) for feature, importance in zip(feature_names, importances)]

# Sort and display top features
sorted_features = sorted(feature_importances, key=lambda x: x[1], reverse=True)
for feature, importance in sorted_features:
    print(f"Feature: {feature}, Importance: {importance}")

Feature: Carrier_CRS_DEP_TIME, Importance: 0.37607869413481637
Feature: Origin_CRS_DEP_TIME, Importance: 0.16235424819471167
Feature: TAIL_NUM_PREV_DELAY, Importance: 0.060750948425244165
Feature: OP_CARRIER_mean, Importance: 0.045179601719206576
Feature: ORIGIN_DELAY_SUM, Importance: 0.04259633035896489
Feature: DEST_mean, Importance: 0.03295979358421199
Feature: ORIGIN_mean, Importance: 0.025085984661393945
Feature: CRS_DEP_TIME, Importance: 0.02479471050771934
Feature: HourlyRelativeHumidity, Importance: 0.023385730432489676
Feature: CRS_ARR_TIME, Importance: 0.022389866292051027
Feature: MONTH, Importance: 0.017885830841879207
Feature: LONGITUDE, Importance: 0.013621744328048548
Feature: STATION_mean, Importance: 0.013430338519917743
Feature: Humidity_Temperature, Importance: 0.011409261923828032
Feature: HourlyDryBulbTemperature, Importance: 0.010959095965721899
Feature: HourlyWetBulbTemperature, Importance: 0.010627897591181686
Feature: DewPoint_Humidity, Importance: 0.0103426582