# Phase 3 - End-to-end ML Pipeline

![Phase 3 ML Pipeline Flow Chart](https://i.imgur.com/3FF1OYx.png)

Imports

In [0]:
# Import required libraries
import gc
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from functools import reduce

from pyspark import keyword_only
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, RobustScaler, Imputer
from pyspark.ml import PipelineModel, Pipeline, Transformer, PipelineModel, Estimator, Model
from pyspark.sql.functions import col, when, isnan, lit, expr, row_number
from pyspark.ml.functions import vector_to_array, array_to_vector
from pyspark.sql.window import Window
from pyspark.ml.stat import Correlation
from multiprocessing.pool import ThreadPool
from pyspark.ml.param.shared import Param, Params, TypeConverters, HasParallelism, HasInputCols, HasOutputCols
from pyspark.ml.tuning import CrossValidatorModel, ParamGridBuilder
from pyspark.ml.util import MLReadable, MLWritable, DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.classification import LogisticRegression, GBTClassifier, OneVsRest, MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.storagelevel import StorageLevel

# Configure pandas display: passing None removes the row/column truncation limit,
# so every pandas DataFrame displayed in this notebook is rendered in full.
# NOTE(review): only display small summary frames, otherwise cell output can become very large.
pd.set_option('display.max_rows', None)  
pd.set_option('display.max_columns', None)  

Set Spark Config (for large dataset)

In [0]:
# Session-level Spark settings intended to make the large dataset easier to process.
# Adaptive Query Execution (AQE) re-optimizes query plans at runtime.
spark.conf.set("spark.sql.adaptive.enabled", "true")  # Enable adaptive query execution
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # let AQE merge small shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")  # let AQE split skewed join partitions
# Shuffle / parallelism sizing.
# NOTE(review): 200 was presumably tuned for this project's cluster — re-tune for a different core count.
spark.conf.set("spark.sql.shuffle.partitions", "200")  # number of partitions used for SQL shuffles
# NOTE(review): spark.default.parallelism and the spark.shuffle.* / spark.storage.* keys below are
# Spark core (non-SQL) properties; they are normally read when the SparkContext starts, so setting
# them on a running session may have no effect — confirm, or move them to the cluster configuration.
spark.conf.set("spark.default.parallelism", "200")
# Additional configuration for shuffle stability (more fetch retries, longer waits/timeouts)
spark.conf.set("spark.shuffle.io.maxRetries", "10")
spark.conf.set("spark.shuffle.io.retryWait", "30s")
spark.conf.set("spark.storage.blockManagerSlaveTimeoutMs", "300000")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

# Build the group's DBFS checkpoint folder path and register it as Spark's checkpoint directory.
# `folder_path` is a notebook-level global that the checkpoint helper functions below also read.
section = "03"
number = "01"
folder_path = f"dbfs:/student-groups/Group_{section}_{number}/pipeline_checkpoints"
spark.sparkContext.setCheckpointDir(folder_path)

Helper Methods

In [0]:
# Helper function to find missing values
def find_missing_values(df):
    """Summarise the null values of every column of a Spark DataFrame.

    Only SQL NULLs are counted (``isNull``); NaN values are not treated as missing here.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        pandas DataFrame indexed by column name with the columns ``num_missing``
        (null count) and ``percent_missing`` (0-100), sorted by ``percent_missing``
        in descending order.
    """
    total_rows = df.count()

    null_count_exprs = []
    for name in df.columns:
        # F.when yields a non-null literal only for null cells, so F.count tallies exactly the nulls
        null_flag = F.when(F.col(name).isNull(), name)
        null_count_exprs.append(F.count(null_flag).alias(name))

    # One aggregated row -> pandas -> transpose so that each Spark column becomes a row
    summary = df.select(null_count_exprs).toPandas().transpose()
    summary = summary.rename(columns={0: 'num_missing'})
    summary['percent_missing'] = summary['num_missing'] / total_rows * 100

    return summary.sort_values('percent_missing', ascending=False)

# Helper function to check for null and NaN values
def get_null_nan_percentages(df):
    """Return, per column, the fraction of rows whose value is NULL (or NaN for numeric columns).

    Despite the function name, the values are fractions in the range 0.0-1.0, not
    percentages (multiply by 100 if a percentage is needed).

    The row count and all missing-value counts are computed in a single Spark aggregation.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        dict mapping column name -> fraction of missing values. Non-numeric columns come
        first, followed by numeric columns. For a DataFrame without rows every fraction
        is 0.0; for a DataFrame without columns the dict is empty.
    """
    # NaN only exists for numeric types, so isnan() is applied to numeric columns only
    numeric_cols = [field.name for field in df.schema.fields
                    if isinstance(field.dataType, T.NumericType)]
    other_cols = [name for name in df.columns if name not in numeric_cols]
    ordered_cols = other_cols + numeric_cols
    if not ordered_cols:
        return {}

    def missing_count(name, include_nan):
        """Build the aggregate that counts missing cells of one column."""
        is_missing = F.col(name).isNull()
        if include_nan:
            is_missing = is_missing | F.isnan(F.col(name))
        return F.sum(F.when(is_missing, 1).otherwise(0))

    aggregates = [F.count(F.lit(1))]  # position 0: total number of rows
    aggregates += [missing_count(name, False) for name in other_cols]
    aggregates += [missing_count(name, True) for name in numeric_cols]

    # Results are read by position, so the auto-generated aggregate names never matter
    stats = df.select(aggregates).first()
    total_rows = stats[0]
    if not total_rows:
        # Empty input: the sums are NULL and there is nothing to divide by
        return {name: 0.0 for name in ordered_cols}

    return {name: stats[position + 1] / total_rows
            for position, name in enumerate(ordered_cols)}

In [0]:
# Helper method to generate sequential train-test splits from dataset
def train_test_split_seq(df, train_fraction=0.8):
    """Split a DataFrame chronologically into a leading train part and a trailing test part.

    Rows are numbered in the order YEAR, MONTH, DAY_OF_MONTH, sched_depart_date_time_UTC;
    the first ``int(row_count * train_fraction)`` rows form the training set and the
    remaining rows the test set.

    Notes:
        * The window has no partitioning, so Spark moves all rows through a single
          partition to number them.
        * Rows that tie on all four ordering columns have no defined relative order,
          so their row numbers may differ between evaluations of the plan.

    Args:
        df: Spark DataFrame containing the four ordering columns.
        train_fraction: share of rows (strictly between 0 and 1) assigned to the
            training set. Defaults to 0.8 (80/20 split).

    Returns:
        Tuple ``(train_df, test_df)`` without the temporary ``row_index`` column.

    Raises:
        ValueError: if ``train_fraction`` is not strictly between 0 and 1.
    """
    if not 0 < train_fraction < 1:
        raise ValueError(f"train_fraction must be between 0 and 1 (exclusive), got {train_fraction}")

    # Count the plain input so the boundary never depends on evaluating the window
    train_rows = int(df.count() * train_fraction)

    # Add a sequential row index using a window function
    chronological = Window.orderBy('YEAR', 'MONTH', 'DAY_OF_MONTH', 'sched_depart_date_time_UTC')
    indexed = df.withColumn("row_index", row_number().over(chronological))

    # Split on the boundary index and drop the helper column again
    train_df = indexed.filter(indexed.row_index <= train_rows).drop("row_index")
    test_df = indexed.filter(indexed.row_index > train_rows).drop("row_index")

    return train_df, test_df

Sampling Helper Methods

In [0]:
# Sample with time-based windows
def sample_with_time_windows(df, sample_size):
    """Draw roughly ``sample_size`` rows, spread evenly over the calendar months.

    For every MONTH value 1-12 that has data, about ``sample_size / 12`` rows are
    sampled without replacement (seed 42). Months with fewer rows than that are kept
    in full. ``DataFrame.sample`` is probabilistic, so the returned size is approximate.

    Args:
        df: Spark DataFrame with a ``MONTH`` column.
        sample_size: desired total number of sampled rows.

    Returns:
        Union of the per-month samples, or ``df.limit(sample_size)`` when no month
        1-12 contains any row.
    """
    # Calculate approximately how many rows to take per month (assumes data spans 12 months)
    rows_per_month = sample_size / 12

    monthly_samples = []
    for month in range(1, 13):
        month_df = df.filter(F.col("MONTH") == month)
        month_count = month_df.count()
        if month_count == 0:
            continue
        # Cap at 1.0: a month smaller than its quota is taken completely
        fraction = min(1.0, rows_per_month / month_count)
        monthly_samples.append(month_df.sample(fraction=fraction, seed=42))

    if monthly_samples:
        return reduce(DataFrame.unionAll, monthly_samples)
    return df.limit(sample_size)  # Fallback

In [0]:
# SMOTE-like function w/full row preservation
def fast_pseudo_smote_preserve_all(df: DataFrame, label_col: str, feature_col: str, target_count: int = None) -> DataFrame:
    """Balance classes by duplicating rows and topping up with noise-perturbed copies.

    Every class smaller than the goal size is brought up to exactly that size:
    the class is repeated as many whole times as fit, and the remaining gap is
    filled with rows of the same class whose feature vector had ``randn() * 0.01``
    added to each element. All other columns of those rows are kept unchanged.
    Classes already at or above the goal size are kept as they are.

    Args:
        df: input DataFrame.
        label_col: name of the class label column.
        feature_col: name of the ML vector column that receives the noise.
        target_count: goal size per class; when falsy (None or 0) the size of the
            largest class is used.

    Returns:
        Union (by column name) of all original, repeated and perturbed rows.
    """
    sizes_by_class = {
        record[label_col]: record['count']
        for record in df.groupBy(label_col).count().collect()
    }
    goal = target_count if target_count else max(sizes_by_class.values())

    pieces = []
    for label_value, n_rows in sizes_by_class.items():
        class_rows = df.filter(col(label_col) == label_value)

        if n_rows >= goal:
            # Large enough already: keep the class untouched
            pieces.append(class_rows)
            continue

        copies = int(goal / n_rows)          # whole repetitions that fit into the goal
        shortfall = goal - (copies * n_rows)  # rows still missing afterwards

        # The original rows plus (copies - 1) exact duplicates
        pieces.extend([class_rows] * copies)

        # Perturb the feature vector element-wise and keep only as many rows as are missing
        jittered = (
            class_rows
            .withColumn("feature_arr", vector_to_array(col(feature_col)))
            .withColumn("feature_arr", F.expr("transform(feature_arr, x -> x + randn() * 0.01)"))
            .withColumn(feature_col, array_to_vector("feature_arr"))
            .drop("feature_arr")
            .limit(shortfall)
        )
        pieces.append(jittered)

    return reduce(DataFrame.unionByName, pieces)

In [0]:
# def upsampling(df):
#     # Filter the DataFrame by each class label for DEP_DELAY_BINNED:
#     df_class0 = df.filter(col("DEP_DELAY_BINNED") == 0)
#     df_class1 = df.filter(col("DEP_DELAY_BINNED") == 1)
#     df_class2 = df.filter(col("DEP_DELAY_BINNED") == 2)

#     # Count the number of rows (samples) in each class
#     count0 = df_class0.count()
#     count1 = df_class1.count()
#     count2 = df_class2.count()

#     print("Counts:", count0, count1, count2)

#     # Define a scaling factor (alpha) that will be applied to the majority class count.
#     # Here, we're setting the target count for classes 1 and 2 as 75% of the count of class 0
#     alpha = 0.75

#     # Calculate the target count based on the scaling factor and class 0 count.
#     target_count = alpha * count0
#     # Compute sampling fractions for classes 1 and 2 to upsample them to the target_count.
#     frac1 = target_count / count1
#     frac2 = target_count / count2

#     # Perform upsampling using random sampling with replacement for classes 1 and 2.
#     # The fraction parameters determine how many times each class will be replicated.
#     df_class1_upsampled = df_class1.sample(withReplacement=True, fraction=frac1, seed=42)
#     df_class2_upsampled = df_class2.sample(withReplacement=True, fraction=frac2, seed=42)

#     # Combine class 0 with the upsampled class 1 and class 2 to form a balanced DataFrame.
#     balanced_df = df_class0.unionAll(df_class1_upsampled).unionAll(df_class2_upsampled)

#     # Order the resulting DataFrame by time-related columns
#     balanced_df = balanced_df.orderBy('YEAR', 'MONTH', 'DAY_OF_MONTH', 'sched_depart_hour_UTC', 'sched_depart_minute_UTC')

#     # Group by the DEP_DELAY_BINNED column to show the new distribution of classes.
#     balanced_df.groupBy("DEP_DELAY_BINNED").count().show()

#     return balanced_df


In [0]:
def upsampling(df):
    """Rebalance DEP_DELAY_BINNED classes 0/1/2 by resampling with replacement.

    Normal case (all three classes present): class 0 is kept as is and classes 1 and 2
    are each resampled with replacement (seed 42) to about 75% of the class 0 count.
    The sampling fraction is ``0.75 * count0 / count_k``, so a class already larger
    than that target is reduced rather than enlarged.

    Fallback (at least one class is empty): the largest class is kept as is and every
    other non-empty class is resampled to about 75% of the largest class. Each class
    appears exactly once in the result, matching the normal case. With a single
    non-empty class that class is returned unchanged; with no rows in any class an
    empty DataFrame is returned.

    Side effects: prints the class counts (and warnings) and shows the resulting class
    distribution, which triggers additional Spark jobs.

    Args:
        df: Spark DataFrame with DEP_DELAY_BINNED and the time columns YEAR, MONTH,
            DAY_OF_MONTH, sched_depart_hour_UTC and sched_depart_minute_UTC.

    Returns:
        The rebalanced DataFrame, ordered by the time columns.
    """
    alpha = 0.75  # minority classes are resampled to this share of the reference class

    # Filter the DataFrame by each class label and count the rows per class
    class_dfs = [df.filter(col("DEP_DELAY_BINNED") == label) for label in (0, 1, 2)]
    counts = [class_df.count() for class_df in class_dfs]
    print("Counts:", counts[0], counts[1], counts[2])

    def resample(class_df, class_count, target_count):
        """Sample one class with replacement so it has about target_count rows."""
        return class_df.sample(withReplacement=True, fraction=target_count / class_count, seed=42)

    if 0 in counts:
        print("Warning: One or more classes have zero samples!")
        # Use the largest class as the reference because class 0 may be the empty one
        max_class_count = max(counts)
        max_class_index = counts.index(max_class_count)
        print(f"Using class {max_class_index} with {max_class_count} samples as reference")

        populated = [index for index, count in enumerate(counts) if count > 0]
        if not populated:
            # No row carries label 0, 1 or 2: return an empty frame with the input schema
            balanced_df = class_dfs[0]
        elif len(populated) == 1:
            print("Only one class has data, returning without upsampling")
            balanced_df = class_dfs[populated[0]]
        else:
            target_count = alpha * max_class_count
            # The reference class is kept; every other class is replaced by its resampled version
            parts = [
                class_dfs[index] if index == max_class_index
                else resample(class_dfs[index], counts[index], target_count)
                for index in populated
            ]
            balanced_df = reduce(DataFrame.unionAll, parts)
    else:
        # All classes have data: class 0 is the reference
        target_count = alpha * counts[0]
        balanced_df = (
            class_dfs[0]
            .unionAll(resample(class_dfs[1], counts[1], target_count))
            .unionAll(resample(class_dfs[2], counts[2], target_count))
        )

    # Order the resulting DataFrame by time-related columns
    balanced_df = balanced_df.orderBy('YEAR', 'MONTH', 'DAY_OF_MONTH', 'sched_depart_hour_UTC', 'sched_depart_minute_UTC')

    # Group by the DEP_DELAY_BINNED column to show the new distribution of classes
    balanced_df.groupBy("DEP_DELAY_BINNED").count().show()

    return balanced_df

In [0]:
def downsampling(df):
    """Rebalance DEP_DELAY_BINNED classes 0/1/2 by sampling larger classes down.

    The target size is the row count of the smallest non-empty class. Every larger
    class is sampled without replacement (seed 42) with fraction
    ``target / class_count``; ``DataFrame.sample`` is probabilistic, so the resulting
    sizes are approximate. Empty classes are ignored when choosing the target, so one
    missing class no longer reduces the whole result to zero rows.

    Side effects: prints the class counts (plus a warning when a class is empty) and
    shows the resulting class distribution, which triggers additional Spark jobs.

    Args:
        df: Spark DataFrame with DEP_DELAY_BINNED and the time columns YEAR, MONTH,
            DAY_OF_MONTH, sched_depart_hour_UTC and sched_depart_minute_UTC.

    Returns:
        The rebalanced DataFrame, ordered by the time columns.
    """
    # Filter the DataFrame by each class label and count the rows per class
    class_dfs = [df.filter(col("DEP_DELAY_BINNED") == label) for label in (0, 1, 2)]
    counts = [class_df.count() for class_df in class_dfs]
    print("Counts:", counts[0], counts[1], counts[2])

    populated_counts = [count for count in counts if count > 0]
    if len(populated_counts) < len(counts):
        print("Warning: One or more classes have zero samples!")

    # Target = smallest class that actually has rows (0 only when there are no rows at all)
    target_count = min(populated_counts) if populated_counts else 0

    sampled_dfs = []
    for class_df, class_count in zip(class_dfs, counts):
        # Classes at or below the target (including empty ones) are kept completely
        fraction = target_count / class_count if class_count > target_count else 1.0
        sampled_dfs.append(class_df.sample(withReplacement=False, fraction=fraction, seed=42))

    # Combine the downsampled classes to form a balanced DataFrame
    balanced_df = reduce(DataFrame.unionAll, sampled_dfs)

    # Sort the resulting DataFrame by time attributes
    balanced_df = balanced_df.orderBy('YEAR', 'MONTH', 'DAY_OF_MONTH', 'sched_depart_hour_UTC', 'sched_depart_minute_UTC')

    # Show the new class counts
    balanced_df.groupBy("DEP_DELAY_BINNED").count().show()

    return balanced_df


Graph feature helper method

In [0]:
from graphframes import GraphFrame
def graph_based_feature(df):
    """Add PageRank scores of the origin and destination airports to every flight row.

    A GraphFrame is built with airports as vertices and one edge per flight row
    (origin -> destination); each edge also carries the route's total flight count
    as the ``flight_freq`` attribute. PageRank is run until convergence with
    ``tol=1e-3`` and the scores are joined back twice (inner joins), producing the
    columns ``origin_airport_pagerank`` and ``dest_airport_pagerank``.

    NOTE(review): ``pageRank`` is called without referring to ``flight_freq``;
    presumably busy routes are weighted through the repeated edges — confirm.

    Args:
        df: Spark DataFrame with ORIGIN_AIRPORT_ID and DEST_AIRPORT_ID columns.

    Returns:
        ``df`` with the two PageRank columns appended.
    """
    origin_id = col('ORIGIN_AIRPORT_ID')
    dest_id = col('DEST_AIRPORT_ID')

    # Number of flights per (origin, destination) route
    route_counts = (
        df.groupBy(origin_id.alias('src'), dest_id.alias('dst'))
        .count()
        .withColumnRenamed('count', 'flight_freq')
    )

    # Vertices: every airport id that occurs as an origin or as a destination
    airport_ids = (
        df.select(origin_id.alias('id'))
        .union(df.select(dest_id.alias('id')))
        .dropDuplicates()
    )

    # Edges: one per flight, annotated with how often the route is flown
    flight_edges = (
        df.select(origin_id.alias('src'), dest_id.alias('dst'))
        .join(route_counts, ['src', 'dst'])
    )

    # Run PageRank; tol=1e-3 stops iterating once the ranks change by no more than 1e-3
    graph = GraphFrame(airport_ids, flight_edges)
    ranked = graph.pageRank(tol=1e-3)

    # Keep only the airport ids and their PageRank
    airport_pr = ranked.vertices.select(col('id'), col('pagerank'))

    # Attach the score of the origin airport, then of the destination airport
    with_origin_rank = (
        df.join(airport_pr, df.ORIGIN_AIRPORT_ID == airport_pr.id)
        .drop('id')
        .withColumnRenamed('pagerank', 'origin_airport_pagerank')
    )
    with_both_ranks = (
        with_origin_rank.join(airport_pr, with_origin_rank.DEST_AIRPORT_ID == airport_pr.id)
        .drop('id')
        .withColumnRenamed('pagerank', 'dest_airport_pagerank')
    )

    return with_both_ranks

Partitioning/Checkpointing Helper Methods

In [0]:
def create_balanced_time_ordered_partitions(df, num_partitions=200):
    """
    Creates roughly evenly sized partitions while maintaining chronological order.

    This approach:
    1. Picks the time columns to order by (the full UTC timestamp when available,
       otherwise the extracted hour/minute columns)
    2. Range-partitions the rows on those columns, so each partition holds one
       contiguous time span and partitions follow each other chronologically
    3. Sorts within each partition to maintain overall ordering

    Range partitioning derives its boundaries from a sample of the data, so
    partition sizes are approximately (not exactly) equal. Compared with numbering
    all rows through an un-partitioned window and hash-partitioning on a bucket id,
    this avoids funnelling every row through a single partition, avoids hash
    collisions that place several buckets in one partition, and needs no row count.

    Args:
        df: Spark DataFrame with YEAR, MONTH, DAY_OF_MONTH and either
            sched_depart_date_time_UTC or sched_depart_hour_UTC / sched_depart_minute_UTC.
        num_partitions: number of output partitions.

    Returns: A DataFrame with balanced partitions in chronological order
    """
    if 'sched_depart_date_time_UTC' in df.columns:
        sort_list = ['YEAR', 'MONTH', 'DAY_OF_MONTH', 'sched_depart_date_time_UTC']
    else:
        sort_list = ['YEAR', 'MONTH', 'DAY_OF_MONTH', 'sched_depart_hour_UTC', 'sched_depart_minute_UTC']

    # Contiguous, chronologically ordered key ranges -> one range per partition
    df_balanced = df.repartitionByRange(num_partitions, *sort_list)
    # Sort within each partition to maintain chronological order
    df_balanced = df_balanced.sortWithinPartitions(*sort_list)

    return df_balanced

In [0]:
def checkpoint_save(df, name, overwrite=True, clean_previous=True):
    """Materialise a DataFrame as the parquet table ``name`` so later steps can reload it.

    Args:
        df: Spark DataFrame to save.
        name: table name passed to ``saveAsTable`` (the helpers in this notebook use
            a ``checkpoint_`` prefix).
        overwrite: when True (default) an existing table is overwritten; when False
            the write fails if the table already exists.
        clean_previous: when True (default) a table with the same name is dropped
            before writing.

    Returns:
        The string ``f"{folder_path}/{name}"``.
        NOTE(review): the data itself is written with ``saveAsTable``, i.e. to the
        table's own storage location, which is not necessarily this path — confirm
        before using the return value to read the data.

    Side effects: prints the row count, writes/replaces the table, unpersists the
    input DataFrame and runs Python garbage collection.
    """
    checkpoint_path = f"{folder_path}/{name}"

    # DROP ... IF EXISTS is a no-op for a missing table, so no separate existence check is needed
    if clean_previous:
        spark.sql(f"DROP TABLE IF EXISTS {name}")

    # Force materialization before writing to ensure computation happens now
    count = df.count()
    print(f"Checkpointing {count} rows to {name}")

    # Write with 200 partitions: coalesce (no shuffle) when shrinking a smaller dataset,
    # otherwise a full repartition to balance the data
    if df.rdd.getNumPartitions() > 200 and count < 10000000:
        df_to_write = df.coalesce(200)
    else:
        df_to_write = df.repartition(200)

    write_mode = "overwrite" if overwrite else "errorifexists"
    df_to_write.write.format("parquet").mode(write_mode).saveAsTable(name)

    # Release the caller's (possibly cached) DataFrame; the derived frame written above
    # was never cached, so unpersisting it instead would do nothing
    df.unpersist(blocking=True)
    gc.collect()

    return checkpoint_path

def checkpoint_load(name):
    """Return the saved table ``name`` as a DataFrame, announcing the load on stdout.

    Args:
        name: table name to read via ``spark.table``.

    Returns:
        The DataFrame backed by that table.
    """
    message = f"Loading checkpoint: {name}"
    print(message)
    table_df = spark.table(name)
    return table_df

def cleanup_checkpoints():
    """Drop every ``checkpoint_``-prefixed table and delete the checkpoint folder.

    Tables are taken from ``spark.catalog.listTables()``; afterwards the directory
    ``folder_path`` is removed recursively through ``dbutils.fs.rm``. The number of
    dropped tables is printed.
    """
    checkpoint_tables = []
    for entry in spark.catalog.listTables():
        if entry.name.startswith("checkpoint_"):
            checkpoint_tables.append(entry.name)

    for table_name in checkpoint_tables:
        spark.sql(f"DROP TABLE IF EXISTS {table_name}")

    print(f"Cleaned up {len(checkpoint_tables)} checkpoint tables")

    # Finally delete the physical checkpoint directory as well
    dbutils.fs.rm(folder_path, recurse=True)

def clear_cache():
    """Free memory: drop everything from Spark's cache, then collect Python garbage.

    Prints a confirmation message when done.
    """
    spark.catalog.clearCache()  # removes all cached tables/DataFrames of this session
    gc.collect()                # reclaim unreachable Python objects on the driver

    print("Cache cleared and garbage collection performed")

Custom Transformers (for feature engineering and imputation)

In [0]:
# custom transformer for preprocessing/cleaning data
class DataPreprocessor(Transformer, HasInputCols, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    """
    Custom transformer to preprocess flight delay data before feature engineering.

    ``_transform`` drops rows without DEP_DELAY, removes columns with more than 30%
    nulls, removes redundant / post-departure / multicollinear columns, casts the
    known numeric columns, removes duplicate rows, and finally saves the result as
    the table ``checkpoint_data_preprocessed`` and returns the reloaded table.

    The ``inputCols`` / ``outputCols`` params are stored but not read by ``_transform``.
    """
    def __init__(self, inputCols=None, outputCols=None):
        super(DataPreprocessor, self).__init__()
        self._setDefault(inputCols=[], outputCols=[])
        self.setParams(inputCols, outputCols)

    def setParams(self, inputCols=None, outputCols=None):
        """Set the params that are not None and return ``self`` for chaining."""
        # Params._set is used because the shared-param mixins of PySpark 3 only
        # provide getters (there is no setInputCols / setOutputCols on them)
        if inputCols is not None:
            self._set(inputCols=inputCols)
        if outputCols is not None:
            self._set(outputCols=outputCols)
        return self

    @staticmethod
    def _drop_existing(df, names):
        """Drop those of ``names`` that are actually columns of ``df``."""
        present = [name for name in names if name in df.columns]
        return df.drop(*present) if present else df

    def _transform(self, dataset):
        """
        Transforms the input DataFrame by applying preprocessing steps.

        Side effects: prints progress, runs several Spark jobs (row count, null
        statistics) and overwrites the table ``checkpoint_data_preprocessed``.

        Returns the preprocessed data as read back from that table.
        """
        print('Running DataPreprocessor...')
        df = dataset
        print(f'Preprocessing {df.count()} rows')
        # Drop records with missing target variable
        df = df.dropna(subset=["DEP_DELAY"])

        # Remove columns with high missing values (>30%)
        df = self._drop_existing(df, self._get_high_missing_cols(df))

        # Remove redundant features
        redundant_features = ['ORIGIN', 'OP_CARRIER', 'ORIGIN_AIRPORT_SEQ_ID', 'ORIGIN_CITY_MARKET_ID',
                              'ORIGIN_CITY_NAME', 'ORIGIN_STATE_NM', 'ORIGIN_WAC', 'ORIGIN_STATE_FIPS',
                              'DEST_AIRPORT_SEQ_ID', 'DEST_CITY_MARKET_ID', 'DEST', 'DEST_CITY_NAME',
                              'DEST_STATE_FIPS', 'DEST_STATE_NM', 'DEST_WAC', 'DEP_TIME', 'CRS_DEP_TIME',
                              'DEP_DELAY_NEW', 'DEP_DELAY_GROUP', 'DEP_TIME_BLK', 'DIVERTED',
                              'origin_airport_name', 'origin_station_name', 'origin_iata_code',
                              'origin_icao', 'origin_region', 'origin_station_lon', 'origin_airport_lat',
                              'origin_station_lat', 'origin_airport_lon', 'dest_airport_name',
                              'dest_station_name', 'dest_iata_code', 'dest_icao', 'dest_region',
                              'dest_station_lat', 'dest_station_lon', 'dest_airport_lat',
                              'dest_airport_lon', 'sched_depart_date_time', 'NAME', 'REM',
                              'BackupEquipment', '_row_desc', 'FL_DATE', 'REPORT_TYPE', 'SOURCE']
        df = self._drop_existing(df, redundant_features)

        # Remove unusable features (post-departure)
        post_departure_features = ["TAXI_OUT", "WHEELS_OFF", "WHEELS_ON", "TAXI_IN",
                                   "CRS_ARR_TIME", "ARR_TIME", "ARR_DELAY", "ARR_DELAY_NEW",
                                   "ARR_DEL15", "ARR_DELAY_GROUP", "ARR_TIME_BLK", "CANCELLED",
                                   "CRS_ELAPSED_TIME", "ACTUAL_ELAPSED_TIME", "AIR_TIME",
                                   "CARRIER_DELAY", "WEATHER_DELAY", "LATE_AIRCRAFT_DELAY",
                                   "NAS_DELAY", "SECURITY_DELAY", "TAIL_NUMBER", "FLIGHTS"]
        df = self._drop_existing(df, post_departure_features)

        # Convert data types
        int_cols = ['QUARTER', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'OP_CARRIER_FL_NUM',
                    'DEP_DEL15', 'YEAR', 'MONTH']
        float_cols = ['DEP_DELAY', 'DISTANCE_GROUP', 'DISTANCE', 'origin_station_dis',
                      'dest_station_dis', 'LATITUDE', 'LONGITUDE', 'ELEVATION',
                      'HourlyAltimeterSetting', 'HourlyDewPointTemperature',
                      'HourlyDryBulbTemperature', 'HourlyPrecipitation',
                      'HourlyRelativeHumidity', 'HourlySeaLevelPressure',
                      'HourlyStationPressure', 'HourlyVisibility',
                      'HourlyWetBulbTemperature', 'HourlyWindDirection', 'HourlyWindSpeed']

        for names, spark_type in ((int_cols, "int"), (float_cols, "float")):
            for col_name in names:
                if col_name in df.columns:
                    df = df.withColumn(col_name, F.col(col_name).cast(spark_type))

        # Remove duplicate records
        df = df.dropDuplicates()

        # Remove multicollinear features
        colinear_cols = ['HourlyStationPressure', 'DISTANCE_GROUP',
                         'HourlySeaLevelPressure', 'HourlyWetBulbTemperature',
                         'HourlyDryBulbTemperature']
        df = self._drop_existing(df, colinear_cols)

        # No orderBy/repartition here: checkpoint_save repartitions the data before
        # writing, so a global sort at this point would be computed and then discarded
        checkpoint_save(df, "checkpoint_data_preprocessed")
        return checkpoint_load("checkpoint_data_preprocessed")

    def _get_high_missing_cols(self, df, threshold=30):
        """
        Find columns with missing values above the threshold percentage.

        The row count and the null count of every column are computed in one Spark
        aggregation (a single job instead of one job per column).

        Args:
            df: Spark DataFrame to inspect.
            threshold: percentage (0-100); columns whose share of nulls is strictly
                greater than this are returned.

        Returns:
            List of column names, in DataFrame column order; empty for an empty DataFrame.
        """
        column_names = df.columns
        if not column_names:
            return []

        aggregates = [F.count(F.lit(1)).alias("_n_rows")]
        aggregates += [
            # F.when without otherwise is NULL for non-null cells, so count() tallies the nulls
            F.count(F.when(F.col(name).isNull(), 1)).alias(f"_n_null_{position}")
            for position, name in enumerate(column_names)
        ]
        stats = df.select(aggregates).first()  # read by position: [row count, nulls per column...]

        total_count = stats[0]
        if total_count == 0:
            return []

        return [name for position, name in enumerate(column_names)
                if (stats[position + 1] / total_count) * 100 > threshold]

# Custom Transformer for datetime processing
class DateTimeProcessor(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that replaces timestamp columns by their hour and minute parts."""
    def __init__(self):
        super(DateTimeProcessor, self).__init__()

    def _transform(self, dataset):
        """Parse the known datetime columns and derive hour/minute columns from them.

        Each known column present in ``dataset`` is parsed with the pattern
        ``yyyy-MM-dd'T'HH:mm:ss`` and yields an ``*_hour_UTC`` and an ``*_minute_UTC``
        column. All four source columns are dropped from the result.
        """
        print('Running DateTimeProcessor...')

        # (source column, derived hour column, derived minute column)
        extraction_plan = [
            ('sched_depart_date_time_UTC', 'sched_depart_hour_UTC', 'sched_depart_minute_UTC'),
            ('four_hours_prior_depart_UTC', 'four_hours_prior_depart_hour_UTC', 'four_hours_prior_depart_minute_UTC'),
            ('two_hours_prior_depart_UTC', 'two_hours_prior_depart_hour_UTC', 'two_hours_prior_depart_minute_UTC'),
            ('DATE', 'station_hour_UTC', 'station_minute_UTC'),
        ]
        applicable = [step for step in extraction_plan if step[0] in dataset.columns]

        result = dataset
        # First turn every available source column into a proper timestamp
        for source, _, _ in applicable:
            result = result.withColumn(source, F.to_timestamp(source, "yyyy-MM-dd'T'HH:mm:ss"))

        # Then extract hour and minute components
        for source, hour_name, minute_name in applicable:
            result = result.withColumn(hour_name, F.hour(source))
            result = result.withColumn(minute_name, F.minute(source))

        # Drop old datetime columns (names that are absent are simply ignored by drop)
        return result.drop(*[source for source, _, _ in extraction_plan])

# Custom Transformer for SkyConditions processing
class SkyConditionsProcessor(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that expands HourlySkyConditions into three cloud columns."""
    def __init__(self):
        super(SkyConditionsProcessor, self).__init__()

    def _transform(self, dataset):
        """Derive cloud coverage, layer amount and base height from HourlySkyConditions.

        The text is split on spaces and the last two tokens are used (for values
        with at least two tokens): the first of them is split on ``:`` into
        ``HourlyCloudCoverage`` (text) and ``HourlyCloudLayerAmount`` (int), the
        second becomes ``HourlyCloudBaseHeight`` (float value multiplied by 100).
        ``HourlySkyConditions`` itself is dropped. The dataset is returned
        unchanged when the column does not exist.
        """
        print('Running SkyConditionsProcessor...')
        if 'HourlySkyConditions' not in dataset.columns:
            return dataset

        # Tokenise, then keep the trailing pair of tokens as the cloud state
        tokenised = dataset.withColumn("split_values", F.split(col("HourlySkyConditions"), " "))
        with_state = tokenised.withColumn(
            "HourlyCloudState",
            F.expr("slice(split_values, greatest(size(split_values)-1, 1), 2)"),
        )

        layer_parts = F.split(col("HourlyCloudState")[0], ":")  # coverage code and layer amount
        height_token = col("HourlyCloudState")[1]

        expanded = with_state.withColumn("HourlyCloudCoverage", layer_parts[0])
        expanded = expanded.withColumn("HourlyCloudLayerAmount", layer_parts[1].cast("int"))
        expanded = expanded.withColumn("HourlyCloudBaseHeight", height_token.cast("float")*100)

        return expanded.drop("split_values", "HourlyCloudState", "HourlySkyConditions")

# Custom Transformer for dropping outliers processing for pre-chosen columns
class OutlierHandler(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that removes rows with extreme values in float columns."""
    def __init__(self):
        super(OutlierHandler, self).__init__()

    def _transform(self, dataset):
        """Drop rows lying more than 3 IQRs outside the quartiles of any checked column.

        Checked columns are all columns of Spark type ``float`` except those in
        ``preserve_outlier_list``. Quartiles are approximate (relative error 0.01).
        Because the filter is a comparison, rows whose value in a checked column is
        NULL are removed as well. A column without any non-null value has no
        quartiles and is skipped. After each column the number of dropped rows is
        printed, which costs one Spark count per column.
        """
        print('Running OutlierHandler...')
        df = dataset

        preserve_outlier_list = ['DEP_DELAY', 'origin_station_dis', 'dest_station_dis', 'HourlyPrecipitation', 'HourlyVisibility']

        # Exact type match: `dtype in ('float')` would be a substring test against the string 'float'
        float_cols_outliers = [name for name, dtype in df.dtypes
                               if dtype == 'float' and name not in preserve_outlier_list]
        if not float_cols_outliers:
            return df

        df_quantiles = df.approxQuantile(float_cols_outliers, [.25, .75], 0.01)

        prev_size = df.count()
        for column, quartiles in zip(float_cols_outliers, df_quantiles):
            if len(quartiles) < 2:
                # approxQuantile returns an empty list when the column has no usable values
                print(f"Skipping {column}: no non-null values to compute quartiles from")
                continue

            lower_quartile, upper_quartile = quartiles
            iqr = upper_quartile - lower_quartile
            low_outlier = lower_quartile - 3*iqr
            high_outlier = upper_quartile + 3*iqr

            df = df.filter((df[column] >= low_outlier) & (df[column] <= high_outlier))
            new_size = df.count()
            print(f"The current column is {column}, the number of rows dropped is {prev_size - new_size}, the current number of rows is {new_size}")
            prev_size = new_size

        return df

# Custom Transformer for handling missing values
class MissingValueHandler(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that fills or removes missing values."""
    def __init__(self):
        super(MissingValueHandler, self).__init__()

    def _transform(self, dataset):
        """Handle missing values in numerical and categorical columns.

        Steps:
        1. NULL HourlyPrecipitation becomes 0.0.
        2. Float columns (except those in ``preserve_outlier_list``) that contain
           nulls are imputed with their median; the Imputer is fitted on the data
           being transformed.
        3. NULL TAIL_NUM becomes the string 'missing' (when the column exists).
        4. Every row that still contains a NULL in any column is dropped.
        """
        print('Running MissingValueHandler...')
        result = dataset

        # Handle precipitation data - replace nulls with 0
        if 'HourlyPrecipitation' in result.columns:
            result = result.withColumn(
                "HourlyPrecipitation",
                F.when(F.col("HourlyPrecipitation").isNull(), 0.0)
                .otherwise(F.col("HourlyPrecipitation"))
            )

        # Impute numerical columns with median
        preserve_outlier_list = ['DEP_DELAY', 'origin_station_dis', 'dest_station_dis', 'HourlyPrecipitation', 'HourlyVisibility']

        # Exact type match: `dtype in ('float')` would be a substring test against the string 'float'
        float_cols_outliers = [name for name, dtype in result.dtypes
                               if dtype == 'float' and name not in preserve_outlier_list]

        df_missing_vals = find_missing_values(result)
        cols_with_missing = set(df_missing_vals[df_missing_vals["percent_missing"] > 0].index)

        # Keep DataFrame column order so the Imputer always receives the same column sequence
        float_cols_missing = [name for name in float_cols_outliers if name in cols_with_missing]

        if float_cols_missing:
            imputer = Imputer(strategy="median", inputCols=float_cols_missing, outputCols=float_cols_missing)
            imputer_model = imputer.fit(result)
            result = imputer_model.transform(result)

        # Fill missing TAIL_NUM with 'missing'; guarded like HourlyPrecipitation so an
        # absent column does not break the transformer. Other categorical columns are
        # not filled: their nulls are removed by the dropna below.
        if 'TAIL_NUM' in result.columns:
            result = result.fillna('missing', subset='TAIL_NUM')

        result = result.dropna()

        return result

# Custom Transformer for creating target bins
class TargetBinner(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that derives the ``DEP_DELAY_BINNED`` column from ``DEP_DELAY``."""

    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        """Add ``DEP_DELAY_BINNED`` with three classes based on ``DEP_DELAY``.

        Bins: 0 when the delay is negative, 1 when it lies in [0, 15],
        2 when it is greater than 15. Rows matching none of the conditions
        (e.g. a null delay) get a null bin because there is no ``otherwise``.
        If ``DEP_DELAY`` is not a column, the input is returned unchanged.
        """
        print('Running TargetBinner...')
        if 'DEP_DELAY' not in dataset.columns:
            return dataset

        delay = F.col("DEP_DELAY")
        on_time_window = (delay >= 0) & (delay <= 15)
        binned = (
            F.when(delay < 0, 0)
            .when(on_time_window, 1)
            .when(delay > 15, 2)
        )
        return dataset.withColumn("DEP_DELAY_BINNED", binned)

# Custom Transformer for adding graph-based features
class GraphFeatureProcessor(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that hands the dataset to ``graph_based_feature`` and returns its result."""

    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        """Log the stage name, then return ``graph_based_feature(dataset)``."""
        print('Running GraphFeatureProcessor...')
        return graph_based_feature(dataset)

# # Custom Transformer for creating target encodings of high-cardinality categorical columns
# class TargetEncoder(Transformer, DefaultParamsReadable, DefaultParamsWritable):
#     def __init__(self):
#         super(TargetEncoder, self).__init__()
    
#     def _transform(self, dataset):
#         """Create target encodings for pre-chosen columns"""
#         print('Running TargetEncoder...')
#         cat_cols_to_target_encode = ['TAIL_NUM', 'OP_CARRIER_FL_NUM', 'ORIGIN_AIRPORT_ID', 'DEST_AIRPORT_ID', 'origin_station_id', 'dest_station_id', 'STATION']

#         # remaining_cat_columns = ['OP_UNIQUE_CARRIER',
#         #     'OP_CARRIER_AIRLINE_ID',
#         #     'ORIGIN_STATE_ABR',
#         #     'DEST_STATE_ABR',
#         #     'YEAR',
#         #     'origin_type',
#         #     'dest_type',
#         #     'HourlyCloudCoverage',
#         #     'HourlyCloudLayerAmount']
        
#         targ_enc_col_names = []
#         DEP_DELAY_mean = dataset.agg(F.mean("DEP_DELAY")).collect()[0][0]
#         for i, col in enumerate(cat_cols_to_target_encode):
#             # Compute the Mean of DEP_DELAY per Category
#             train_mean_df = dataset.groupBy(col).agg(F.mean("DEP_DELAY").alias(f"{col}_mean_delay")).orderBy(F.desc(f"{col}_mean_delay"))
#             # Accumulate New Column Names
#             targ_enc_col_names.append(f"{col}_mean_delay")
#             # Join the Aggregated Means to the DataFrames
#             if i == 0:
#                 dataset_joined = dataset.join(train_mean_df, on=col, how="left")
#             else:
#                 dataset_joined = dataset_joined.join(train_mean_df, on=col, how="left")
            
#             # dataset_joined = dataset_joined.drop(col)

#         # Fill NA's with mean
#         dataset_joined = dataset_joined.fillna(value=DEP_DELAY_mean, subset=targ_enc_col_names)

#         return dataset_joined

class TargetEncoder(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Mean-target encoder for a fixed list of high-cardinality categorical columns.

    For each listed column present in the input, a ``<column>_mean_delay``
    column is added holding the mean ``DEP_DELAY`` of that category.

    Note: the means are computed from the same DataFrame that is being
    transformed, so the encoding reflects whichever rows are passed in.
    """

    def __init__(self):
        super(TargetEncoder, self).__init__()

    def _transform(self, dataset):
        """Append ``<column>_mean_delay`` for every encodable column in ``dataset``.

        Args:
            dataset: Spark DataFrame containing ``DEP_DELAY`` and any of the
                encodable categorical columns.

        Returns:
            ``dataset`` left-joined with one lookup table per encodable column.
            Unmatched keys (e.g. null categories) receive the global mean of
            ``DEP_DELAY``. If none of the columns exist, ``dataset`` is
            returned unchanged.
        """
        print('Running TargetEncoder...')
        cat_cols_to_target_encode = ['TAIL_NUM', 'OP_CARRIER_FL_NUM', 'ORIGIN_AIRPORT_ID',
                                     'DEST_AIRPORT_ID', 'origin_station_id',
                                     'dest_station_id', 'STATION']

        # Only process columns that exist in the dataset
        existing_cols = [name for name in cat_cols_to_target_encode if name in dataset.columns]

        if not existing_cols:
            return dataset

        # Global mean, used to fill categories that find no match in the join
        DEP_DELAY_mean = dataset.agg(F.mean("DEP_DELAY")).collect()[0][0]

        result_df = dataset
        for name in existing_cols:
            # One row per category. The lookup is neither sorted nor cached:
            # a sort cannot change a join result, and cache()/unpersist()
            # before any action runs means the cache would never be used.
            lookup = dataset.groupBy(name).agg(
                F.mean("DEP_DELAY").alias(f"{name}_mean_delay")
            )
            result_df = result_df.join(F.broadcast(lookup), on=name, how="left")

        # The mean is None when there are no non-null DEP_DELAY values;
        # fillna(None) would raise, so skip the fill in that case.
        if DEP_DELAY_mean is not None:
            all_mean_cols = [f"{name}_mean_delay" for name in existing_cols]
            result_df = result_df.fillna(value=DEP_DELAY_mean, subset=all_mean_cols)

        return result_df

# Custom Transformer for cyclic encoding
class CyclicEncoder(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Sine/cosine encoder for periodic calendar and clock columns.

    Each supported input column ``c`` with period ``p`` yields two new columns,
    ``c_sin = sin(2*pi*c/p)`` and ``c_cos = cos(2*pi*c/p)``. Supported columns
    and their periods are listed in ``self.ranges``.

    ``inputCols`` and ``outputPrefix`` are kept as plain Python attributes.
    ``outputPrefix`` is stored but not read by any method of this class.
    """

    def __init__(self, inputCols=None, outputPrefix=""):
        super().__init__()
        self.inputCols = inputCols if inputCols else []
        self.outputPrefix = outputPrefix
        # Period (number of distinct values in one cycle) per supported column.
        self.ranges = {
            'QUARTER': 4,
            'DAY_OF_MONTH': 31,
            'DAY_OF_WEEK': 7,
            'MONTH': 12,
            'sched_depart_hour_UTC': 24,
            'sched_depart_minute_UTC': 60,
            'four_hours_prior_depart_hour_UTC': 24,
            'four_hours_prior_depart_minute_UTC': 60,
            'two_hours_prior_depart_hour_UTC': 24,
            'two_hours_prior_depart_minute_UTC': 60,
            'station_hour_UTC': 24,
            'station_minute_UTC': 60
        }

    def setInputCols(self, value):
        """Replace the list of columns to encode; returns ``self`` for chaining."""
        self.inputCols = value
        return self

    def getInputCols(self):
        """Return the list of columns configured for encoding."""
        return self.inputCols

    def getOutputCols(self):
        """Return the ``*_sin`` / ``*_cos`` names for every configured column that has a known period."""
        encodable = (name for name in self.inputCols if name in self.ranges)
        return [out for name in encodable for out in (f"{name}_sin", f"{name}_cos")]

    def _transform(self, dataset):
        """Add sine and cosine columns for each configured column that is present and has a known period."""
        print('Running CyclicEncoder...')
        present = set(dataset.columns)
        encoded = dataset

        for feature in self.inputCols:
            # Skip anything that is absent from the input or has no defined period.
            if feature not in present or feature not in self.ranges:
                continue

            period = self.ranges[feature]
            angle = 2 * F.pi() * F.col(feature) / F.lit(period)
            encoded = encoded.withColumn(f"{feature}_sin", F.sin(angle))
            encoded = encoded.withColumn(f"{feature}_cos", F.cos(angle))

        return encoded

# Custom Transformer for smote resampling
class SMOTEResampler(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that delegates resampling to ``fast_pseudo_smote_preserve_all``."""

    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        """Return ``fast_pseudo_smote_preserve_all`` applied with target ``DEP_DELAY`` and feature column ``final_features``."""
        print('Running SMOTEResampler...')
        return fast_pseudo_smote_preserve_all(dataset, "DEP_DELAY", feature_col="final_features")
    
# Custom Transformer for upsampling-based resampling
class UpsampleResampler(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that delegates resampling to ``upsampling``."""

    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        """Log the stage name, then return ``upsampling(dataset)``."""
        print('Running UpsampleResampler...')
        return upsampling(dataset)
    
# Custom Transformer for downsampling-based resampling
class DownsampleResampler(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that delegates resampling to ``downsampling``."""

    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        """Log the stage name, then return ``downsampling(dataset)``."""
        print('Running DownsampleResampler...')
        return downsampling(dataset)

Parallelized Transformers

In [0]:
class ParallelTargetEncoder(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Mean-target encoder that builds its per-column lookup tables concurrently.

    For each listed categorical column present in the input, a
    ``<column>_mean_delay`` column is added holding the mean ``DEP_DELAY`` of
    that category. The means are computed from the DataFrame being transformed.
    """

    def __init__(self):
        super(ParallelTargetEncoder, self).__init__()

    def _transform(self, dataset):
        """Append ``<column>_mean_delay`` for every encodable column in ``dataset``.

        Args:
            dataset: Spark DataFrame containing ``DEP_DELAY`` and any of the
                encodable categorical columns.

        Returns:
            ``dataset`` left-joined with one lookup table per encodable column
            (row count unchanged). Unmatched keys receive the global mean of
            ``DEP_DELAY``. If none of the columns exist, ``dataset`` is
            returned unchanged.
        """
        print('Running ParallelTargetEncoder...')
        cat_cols_to_target_encode = ['TAIL_NUM', 'OP_CARRIER_FL_NUM', 'ORIGIN_AIRPORT_ID',
                                     'DEST_AIRPORT_ID', 'origin_station_id',
                                     'dest_station_id', 'STATION']

        # Filter to only process columns that exist
        existing_cols = [name for name in cat_cols_to_target_encode if name in dataset.columns]

        if not existing_cols:
            return dataset

        # Pre-compute global mean for filling NA values
        DEP_DELAY_mean = dataset.agg(F.mean("DEP_DELAY")).collect()[0][0]

        def compute_encoding(col_name):
            """Build, cache and materialise the one-row-per-category lookup for ``col_name``."""
            lookup = (
                dataset.groupBy(col_name)
                .agg(F.mean("DEP_DELAY").alias(f"{col_name}_mean_delay"))
                .cache()
            )
            # The action is what makes the thread pool useful: without it each
            # thread would only build a lazy plan and no Spark job would run.
            lookup.count()
            return lookup, col_name

        # At most 3 aggregations run at once to avoid overwhelming the driver.
        max_concurrent = 3
        with ThreadPool(processes=min(len(existing_cols), max_concurrent)) as pool:
            lookups = pool.map(compute_encoding, existing_cols)

        # Join the lookup tables (one row per key) rather than a frame that
        # has already been joined to the full dataset: joining two frames that
        # both repeat the key is many-to-many and multiplies rows.
        result_df = dataset
        for lookup, col_name in lookups:
            result_df = result_df.join(F.broadcast(lookup), on=col_name, how="left")

        # NOTE: the lookups stay cached on purpose. result_df is lazy, so
        # unpersisting here would drop the cache before it is ever read.
        # Release them once downstream results have been materialised.

        # The mean is None when there are no non-null DEP_DELAY values;
        # fillna(None) would raise, so skip the fill in that case.
        if DEP_DELAY_mean is not None:
            all_mean_cols = [f"{name}_mean_delay" for name in existing_cols]
            result_df = result_df.fillna(DEP_DELAY_mean, subset=all_mean_cols)

        return result_df

In [0]:
class ParallelGraphFeatureProcessor(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Adds PageRank scores of the origin and destination airports as features.

    The airport network is built from the input itself: airports are vertices
    and each distinct (origin, destination) pair is an edge carrying its
    flight count.
    """

    def __init__(self):
        super(ParallelGraphFeatureProcessor, self).__init__()

    def _transform(self, dataset):
        """Return ``dataset`` with ``origin_airport_pagerank`` and ``dest_airport_pagerank`` added.

        Intermediate frames are checkpointed to truncate lineage.
        NOTE(review): ``DataFrame.checkpoint`` needs a checkpoint directory
        configured on the SparkContext - confirm it is set before this stage
        runs, otherwise the fallback branch below is taken.

        If any step of the graph computation raises, the error is printed and
        both PageRank columns are set to the constant 1.0 instead, so the
        pipeline keeps running.

        Args:
            dataset: Spark DataFrame with ``ORIGIN_AIRPORT_ID`` and
                ``DEST_AIRPORT_ID`` columns.

        Returns:
            Spark DataFrame repartitioned to 200 partitions with the two
            PageRank columns added.
        """
        print('Running ParallelGraphFeatureProcessor...')

        # Make the dataset more resilient
        dataset = dataset.repartition(250).persist(StorageLevel.MEMORY_AND_DISK)
        dataset.count()  # Force materialization

        try:
            # Number of flights per (origin, destination) pair
            flight_freq = dataset.select(
                col('ORIGIN_AIRPORT_ID').alias('src'),
                col('DEST_AIRPORT_ID').alias('dst')
            ).groupBy('src', 'dst').count().withColumnRenamed('count', 'flight_freq')

            # checkpoint() returns a NEW DataFrame with truncated lineage; the
            # result must be kept, otherwise the checkpoint is written and
            # never used. It is eager by default, so no extra count() is needed.
            flight_freq = flight_freq.repartition(100).checkpoint()

            # Vertices: every airport that appears as an origin or a destination
            vertices_origin = dataset.select(col('ORIGIN_AIRPORT_ID').alias('id')).distinct()
            vertices_dest = dataset.select(col('DEST_AIRPORT_ID').alias('id')).distinct()
            vertices = (
                vertices_origin.union(vertices_dest)  # union() replaces the deprecated unionAll()
                .distinct()
                .repartition(20)
                .checkpoint()
            )

            # Edges: distinct routes, annotated with their flight frequency
            edges = dataset.select(
                col('ORIGIN_AIRPORT_ID').alias('src'),
                col('DEST_AIRPORT_ID').alias('dst')
            ).distinct()

            # Co-partition both sides on the join keys
            edges = edges.repartition(100, "src", "dst")
            flight_freq = flight_freq.repartition(100, "src", "dst")

            edges = edges.join(flight_freq, ['src', 'dst']).checkpoint()

            # graphframes is imported lazily so a missing package is handled
            # by the fallback below rather than breaking the notebook import cell.
            from graphframes import GraphFrame
            g = GraphFrame(vertices, edges)

            # Use maxIter instead of tol to ensure the algorithm terminates
            # in a predictable amount of time
            results = g.pageRank(
                maxIter=20
            )

            # One row per airport: (id, pagerank)
            airport_pr = (
                results.vertices.select(col('id'), col('pagerank'))
                .repartition(20)
                .checkpoint()
            )

            # Broadcast the smaller PageRank table for efficient joins
            broadcast_pr = F.broadcast(airport_pr)

            # Origin airport score
            df_graph = dataset.join(
                broadcast_pr,
                dataset.ORIGIN_AIRPORT_ID == broadcast_pr.id,
                "left"
            ).drop('id').withColumnRenamed('pagerank', 'origin_airport_pagerank')

            # Break lineage so the first join is not recomputed, and so the
            # result no longer depends on the persisted `dataset` released below.
            df_graph = df_graph.repartition(250).checkpoint()

            # Destination airport score
            df_graph = df_graph.join(
                broadcast_pr,
                df_graph.DEST_AIRPORT_ID == broadcast_pr.id,
                "left"
            ).drop('id').withColumnRenamed('pagerank', 'dest_airport_pagerank')

            # Fill any nulls that might have been created in the join
            df_graph = df_graph.fillna(0.0, subset=['origin_airport_pagerank', 'dest_airport_pagerank'])

            # Clean up the final result and make it ready for next stage
            df_graph = df_graph.repartition(200)

            # Unpersist the original dataset to free memory
            dataset.unpersist()

            return df_graph

        except Exception as e:
            # Deliberate best-effort: if graph processing fails, fall back to
            # constant values so the pipeline does not fail.
            dataset.unpersist()
            print(f"Graph processing failed with error: {e}")
            print("Falling back to default pagerank values")

            # Add default pagerank columns
            fallback_df = dataset.withColumn("origin_airport_pagerank", F.lit(1.0))
            fallback_df = fallback_df.withColumn("dest_airport_pagerank", F.lit(1.0))

            return fallback_df.repartition(200)

In [0]:
class ParallelDataPreprocessor(Transformer, HasInputCols, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    """Initial cleaning stage: drops unusable rows and columns, casts types, deduplicates and sorts.

    ``inputCols`` / ``outputCols`` are accepted as Params but are not read by
    ``_transform``.
    """

    def __init__(self, inputCols=None, outputCols=None):
        super(ParallelDataPreprocessor, self).__init__()
        self._setDefault(inputCols=[], outputCols=[])
        self.setParams(inputCols, outputCols)

    def setParams(self, inputCols=None, outputCols=None):
        """Set whichever of ``inputCols`` / ``outputCols`` is not None; returns ``self``."""
        if inputCols is not None:
            self.setInputCols(inputCols)
        if outputCols is not None:
            self.setOutputCols(outputCols)
        return self

    def setInputCols(self, value):
        """Set the ``inputCols`` Param.

        Defined here because the shared ``HasInputCols`` mixin in PySpark 3.x
        only provides the getter.
        """
        return self._set(inputCols=value)

    def setOutputCols(self, value):
        """Set the ``outputCols`` Param.

        Defined here because the shared ``HasOutputCols`` mixin in PySpark 3.x
        only provides the getter.
        """
        return self._set(outputCols=value)

    def _transform(self, dataset):
        """Clean ``dataset`` and return the version reloaded from the ``checkpoint_preprocessed`` checkpoint.

        Steps:
          1. Repartition to 200 partitions and drop rows with a null ``DEP_DELAY``.
          2. Drop redundant, post-departure and collinear columns, plus any
             remaining column with more than 30% nulls.
          3. Cast the known integer / float columns.
          4. Drop duplicate rows and sort chronologically.
          5. Save through ``checkpoint_save``, call ``clear_cache`` and return
             the result of ``checkpoint_load``.

        Args:
            dataset: Raw Spark DataFrame.

        Returns:
            Whatever ``checkpoint_load("checkpoint_preprocessed")`` returns.
        """
        print('Running ParallelDataPreprocessor...')

        # Set optimal partition count for processing
        df = dataset.repartition(200)

        # Drop records with missing target variable first for efficiency
        df = df.dropna(subset=["DEP_DELAY"])

        redundant_features = ['ORIGIN', 'OP_CARRIER', 'ORIGIN_AIRPORT_SEQ_ID', 'ORIGIN_CITY_MARKET_ID',
                             'ORIGIN_CITY_NAME', 'ORIGIN_STATE_NM', 'ORIGIN_WAC', 'ORIGIN_STATE_FIPS',
                             'DEST_AIRPORT_SEQ_ID', 'DEST_CITY_MARKET_ID', 'DEST', 'DEST_CITY_NAME',
                             'DEST_STATE_FIPS', 'DEST_STATE_NM', 'DEST_WAC', 'DEP_TIME', 'CRS_DEP_TIME',
                             'DEP_DELAY_NEW', 'DEP_DELAY_GROUP', 'DEP_TIME_BLK', 'DIVERTED',
                             'origin_airport_name', 'origin_station_name', 'origin_iata_code',
                             'origin_icao', 'origin_region', 'origin_station_lon', 'origin_airport_lat',
                             'origin_station_lat', 'origin_airport_lon', 'dest_airport_name',
                             'dest_station_name', 'dest_iata_code', 'dest_icao', 'dest_region',
                             'dest_station_lat', 'dest_station_lon', 'dest_airport_lat',
                             'dest_airport_lon', 'sched_depart_date_time', 'NAME', 'REM',
                             'BackupEquipment', '_row_desc', 'FL_DATE', 'REPORT_TYPE', 'SOURCE']

        post_departure_features = ["TAXI_OUT", "WHEELS_OFF", "WHEELS_ON", "TAXI_IN",
                                  "CRS_ARR_TIME", "ARR_TIME", "ARR_DELAY", "ARR_DELAY_NEW",
                                  "ARR_DEL15", "ARR_DELAY_GROUP", "ARR_TIME_BLK", "CANCELLED",
                                  "CRS_ELAPSED_TIME", "ACTUAL_ELAPSED_TIME", "AIR_TIME",
                                  "CARRIER_DELAY", "WEATHER_DELAY", "LATE_AIRCRAFT_DELAY",
                                  "NAS_DELAY", "SECURITY_DELAY",
                                  "TAIL_NUMBER", "FLIGHTS"]

        colinear_cols = ['HourlyStationPressure', 'DISTANCE_GROUP',
                         'HourlySeaLevelPressure', 'HourlyWetBulbTemperature',
                         'HourlyDryBulbTemperature']

        # Remove the statically known columns first so the missing-value scan
        # below does not spend time on columns that are dropped anyway. The
        # surviving column set is the same either way.
        static_drops = set(redundant_features + post_departure_features + colinear_cols)
        df = df.select(*[name for name in df.columns if name not in static_drops])

        high_missing = set(self._get_high_missing_cols(df))
        if high_missing:
            df = df.select(*[name for name in df.columns if name not in high_missing])

        # Convert data types in bulk
        int_cols = {'QUARTER', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'OP_CARRIER_FL_NUM',
                    'DEP_DEL15', 'YEAR', 'MONTH'}
        float_cols = {'DEP_DELAY', 'DISTANCE_GROUP', 'DISTANCE', 'origin_station_dis',
                      'dest_station_dis', 'LATITUDE', 'LONGITUDE', 'ELEVATION',
                      'HourlyAltimeterSetting', 'HourlyDewPointTemperature',
                      'HourlyDryBulbTemperature', 'HourlyPrecipitation',
                      'HourlyRelativeHumidity', 'HourlySeaLevelPressure',
                      'HourlyStationPressure', 'HourlyVisibility',
                      'HourlyWetBulbTemperature', 'HourlyWindDirection', 'HourlyWindSpeed'}

        # Use a single select with casting to apply all type conversions at once
        select_exprs = []
        for col_name in df.columns:
            if col_name in int_cols:
                select_exprs.append(F.col(col_name).cast("int").alias(col_name))
            elif col_name in float_cols:
                select_exprs.append(F.col(col_name).cast("float").alias(col_name))
            else:
                select_exprs.append(F.col(col_name))

        df = df.select(*select_exprs)

        # Drop duplicates and sort once at the end
        df = df.dropDuplicates()

        # Sort chronologically for the next stage
        df = df.orderBy('YEAR', 'MONTH', 'DAY_OF_MONTH', 'sched_depart_date_time_UTC')

        checkpoint_save(df, "checkpoint_preprocessed")
        clear_cache()
        return checkpoint_load("checkpoint_preprocessed")

    def _get_high_missing_cols(self, df, threshold=30):
        """Return the columns of ``df`` whose share of nulls exceeds ``threshold`` percent.

        All null counts and the row count are computed in one aggregation, so
        a single Spark job is launched instead of one per column.

        Args:
            df: Spark DataFrame to inspect.
            threshold: Percentage (0-100); a column is reported when its null
                percentage is strictly greater than this value.

        Returns:
            List of column names, in schema order. Empty when ``df`` has no
            rows or no columns.
        """
        columns = df.columns
        if not columns:
            return []

        # count(when(is null, 1)) counts only the null rows, and yields 0
        # (not null) when there are none.
        aggregates = [F.count(F.lit(1)).alias("n_rows")] + [
            F.count(F.when(F.col(name).isNull(), 1)).alias(f"nulls_{position}")
            for position, name in enumerate(columns)
        ]
        counts = df.agg(*aggregates).collect()[0]

        total_count = counts[0]
        if total_count == 0:
            return []

        # counts[0] is the row count, so column i sits at index i + 1.
        return [
            name
            for position, name in enumerate(columns)
            if (counts[position + 1] / total_count) * 100 > threshold
        ]

New custom cross-validation (CV) method

In [0]:
def _parallelFitTasks(est, train_df, evaluators, val_df, epm):
    """
    Creates tasks for each parameter map (paramMap) so that they can be run in parallel threads.
    Each task returns a tuple (paramIndex, metrics_dict), where:
      - paramIndex is the index of the parameter map in the list epm.
      - metrics_dict is a dictionary mapping each evaluatorâ€™s metric name to the computed score on the validation data.
    """
     # Define an inner function 'singleTask' that takes a parameter map index.
    def singleTask(paramIndex):
        # Retrieve the parameter map for the current index from the list of parameter maps.
        paramMap = epm[paramIndex]
        # Create a copy of the estimator with the current parameter map applied, then fit the model on the training DataFrame.
        model = est.copy(paramMap).fit(train_df)
        # Use the trained model to make predictions on the validation DataFrame.
        predictions = model.transform(val_df)
        # Initialize an empty dictionary to store the metrics computed by each evaluator.
        metrics_dict = {}
        # Loop through the list of evaluators provided.
        for evaluator in evaluators:
            # Retrieve the metric name for the current evaluator (e.g., "f1", "accuracy").
            metric_name = evaluator.getMetricName()
             # Evaluate the predictions using the current evaluator and store the score in the dictionary.
            metrics_dict[metric_name] = evaluator.evaluate(predictions)
        # Return a tuple containing the parameter map index and the dictionary of computed metrics.
        return paramIndex, metrics_dict
    
    # Create and return a list of lambda functions (tasks) for each parameter map index (for each hyperparameter combination)
    return [lambda idx=i: singleTask(idx) for i in range(len(epm))]

class CustomCrossValidator(Estimator, MLReadable, MLWritable):
    """
    A custom cross-validator (inspired by Tim Lin's blog, https://www.timlrx.com/blog/creating-a-custom-cross-validation-function-in-pyspark) to perform
    time-series or custom cross-validation in one DataFrame. It supports multiple evaluators to compute various metrics simultaneously.

    Steps:
      1) Collect distinct fold IDs from the fold column (`cvCol`).
      2) For each fold, filter rows where train_test == splitWord[0] for training,
         and train_test == splitWord[1] for testing.
      3) Train/evaluate each param map in parallel, accumulate metrics.
      4) Pick the best param map (best hyperparameter combination), optionally retrain the final model on ALL data labeled as train.
    """
    def __init__(self, estimator=None, estimatorParamMaps=None, evaluators=None,
                 splitWord=('train','test'), cvCol='fold_id', seed=None, parallelism=1, verbose=0,
                 retrainFullModel=False):
        """
        Parameters:
         - estimator: ML estimator (such as LogisticRegression, RandomForestClassifier)
         - estimatorParamMaps: list of ParamMaps for hyperparameter search
         - evaluators: evaluator or list/tuple of evaluators to compute metrics (such as MulticlassClassificationEvaluator);
           the first one decides which param map is best
         - splitWord: tuple specifying the train/test labels (default ('train','test'))
         - cvCol: name of the fold ID column in the DataFrame (default 'fold_id')
         - seed: optional random seed (stored; not read by `_fit`)
         - parallelism: number of threads for parallel tasks
         - verbose: 0 for no prints, 1 for printing fold info and metrics
         - retrainFullModel: if True, retrain final model on ALL data labeled as train (default 'False')
        """
        super().__init__()
        self.estimator = estimator
        self.estimatorParamMaps = estimatorParamMaps
        # Always store a list. None becomes an empty list (not [None]) so the
        # validation in _fit can reject it with a clear message.
        if evaluators is None:
            self.evaluators = []
        elif isinstance(evaluators, (list, tuple)):
            self.evaluators = list(evaluators)
        else:
            self.evaluators = [evaluators]
        self.splitWord = splitWord
        self.cvCol = cvCol
        self.seed = seed
        self.parallelism = parallelism
        self.verbose = verbose
        self.retrainFullModel = retrainFullModel

    def _fit(self, dataset):
        """
        Run the fold-by-fold hyperparameter search on `dataset`.

        `dataset` must contain the fold column (`cvCol`) and a `train_test` column
        holding the labels given in `splitWord`.

        Returns a CustomCrossValidatorModel holding the per-fold metrics, the averaged
        metrics, the best index/hyperparameters, and the retrained model
        (None unless retrainFullModel is True).

        Raises ValueError when estimator, param maps or evaluators are missing, or
        when the fold column has no values.
        """
        est = self.estimator
        epm = self.estimatorParamMaps
        evaluators = self.evaluators
        trainWord, testWord = self.splitWord
        foldColName = self.cvCol
        par = self.parallelism

        # Make sure required parameters are provided
        if est is None or not epm or not evaluators:
            raise ValueError("Must provide estimator, estimatorParamMaps, and evaluators.")

        # Get distinct fold IDs
        distinctFolds = dataset.select(foldColName).distinct().collect()
        foldIds = sorted(row[foldColName] for row in distinctFolds)
        nFolds = len(foldIds)
        numModels = len(epm)

        # Without folds there are no metrics to compare.
        if nFolds == 0:
            raise ValueError(f"No folds found in column '{foldColName}'.")

        # One metrics dict per param map (averages) and per param map x fold (details)
        avg_metrics = [dict() for _ in range(numModels)]
        matrix_metrics = [[dict() for _ in range(nFolds)] for _ in range(numModels)]

        # The context manager shuts the worker threads down even if a fit fails.
        # At least one worker is required by ThreadPool.
        with ThreadPool(processes=max(1, min(par, numModels))) as pool:
            # Loop over each fold and process train/test splits
            for i, fid in enumerate(foldIds):
                train_df = dataset.filter((col(foldColName) == fid) & (col("train_test") == trainWord)).cache()
                val_df   = dataset.filter((col(foldColName) == fid) & (col("train_test") == testWord)).cache()

                try:
                    if self.verbose:
                        print(f"fold {fid}: train={train_df.count()} rows, test={val_df.count()} rows")

                    # Create parallel tasks to fit and evaluate each parameter map on this fold
                    tasks = _parallelFitTasks(est, train_df, evaluators, val_df, epm)
                    for j, metrics_dict in pool.imap_unordered(lambda f: f(), tasks):
                        # Store metrics for the current parameter map and fold
                        matrix_metrics[j][i] = metrics_dict
                        # Accumulate average metrics over folds for each evaluator metric
                        for evaluator in evaluators:
                            mname = evaluator.getMetricName()
                            avg_metrics[j][mname] = avg_metrics[j].get(mname, 0.0) + (metrics_dict[mname] / nFolds)
                finally:
                    # Free the cached fold data even when a fit or evaluation raises
                    train_df.unpersist()
                    val_df.unpersist()

        # Choose best param map based on the primary evaluator's metric (the first evaluator you specify in `CustomCrossValidator()` )
        primary_metric = evaluators[0].getMetricName()
        primary_scores = [avg_metrics[j][primary_metric] for j in range(numModels)]
        if evaluators[0].isLargerBetter():
            bestIndex = int(np.argmax(primary_scores))
        else:
            bestIndex = int(np.argmin(primary_scores))

        # Extract best metric values and hyperparameter values from the best parameter map
        best_metric_values = avg_metrics[bestIndex]
        best_hyperparams = {param.name: value for param, value in epm[bestIndex].items()}

        # Optionally retrain final model on ALL data labeled as train
        if self.retrainFullModel:
            full_train_df = dataset.filter(col("train_test") == trainWord)
            bestModel = est.copy(epm[bestIndex]).fit(full_train_df)
        else:
            bestModel = None

        if self.verbose:
            print("Best Metric Values:", best_metric_values)
            print("Best Hyperparameter Values:", best_hyperparams)

        # Return a custom cross-validation model containing the best model and additional details
        return CustomCrossValidatorModel(bestModel, matrix_metrics, avg_metrics, bestIndex, best_metric_values, best_hyperparams)
    
# The CustomCrossValidatorModel class is needed to wrap the best model along with the additional cross-validation results 
# (like average metrics, best hyperparameters, etc.), so that we can easily inspect the results and use the model
# in our Spark ML pipelines.
class CustomCrossValidatorModel(Model, MLReadable, MLWritable):
    """
    Result object of the custom cross-validation search.

    Attributes:
      - bestModel: the retrained final model, or None when no retraining was requested
      - matrix_metrics: metrics dictionary for every (param map, fold) pair
      - avgMetrics: list with one dictionary of fold-averaged metrics per param map
      - bestIndex: position of the winning param map
      - bestMetricValues: averaged metrics of the winning param map
      - bestHyperParams: hyperparameter name -> value for the winning param map
    """
    def __init__(self, bestModel, matrix_metrics, avgMetrics, bestIndex, bestMetricValues, bestHyperParams):
        super().__init__()
        self.bestModel = bestModel
        self.matrix_metrics = matrix_metrics
        self.avgMetrics = avgMetrics
        self.bestIndex = bestIndex
        self.bestMetricValues = bestMetricValues
        self.bestHyperParams = bestHyperParams

    def _transform(self, dataset):
        """Apply the best model to `dataset`; raises ValueError when no best model was stored."""
        if self.bestModel is not None:
            return self.bestModel.transform(dataset)
        raise ValueError("No best model available. Consider setting retrainFullModel=True.")

    def copy(self, extra=None):
        """Return a new CustomCrossValidatorModel referencing the same attribute values; `extra` is not used."""
        return CustomCrossValidatorModel(
            bestModel=self.bestModel,
            matrix_metrics=self.matrix_metrics,
            avgMetrics=self.avgMetrics,
            bestIndex=self.bestIndex,
            bestMetricValues=self.bestMetricValues,
            bestHyperParams=self.bestHyperParams,
        )

    def explainParams(self):
        """Return a one-line summary containing bestIndex and bestHyperParams."""
        summary = "CustomCrossValidatorModel with bestIndex={} and bestHyperParams={}".format(self.bestIndex, self.bestHyperParams)
        return summary


Optimized time-based folds (doesn't repeat certain unnecessary processing)

In [0]:
def create_time_based_folds_optimized(df, n_folds=3, train_ratio=0.8, margin=0, resampling_method=None):
    """
    Split a chronologically ordered DataFrame into blocked time-series folds and
    build model features for each fold without leaking information from the
    held-out rows.

    The rows are cut into `n_folds` consecutive blocks (the last block absorbs
    any leftover rows). Inside each block the first `train_ratio` share of rows
    is labelled "train", the next `margin` rows "skip" and the rest "test".
    Then, per fold:
      1. Target encoding (mean DEP_DELAY per category, computed on train rows only)
      2. Feature scaling (RobustScaler fitted on train rows only)
      3. Assembly of the "final_features" vector
      4. Optional resampling of the train rows only

    Parameters:
      - df: PySpark DataFrame (already preprocessed with categorical and cyclic features)
      - n_folds: Number of folds (blocks), must be >= 1
      - train_ratio: Fraction of each fold used for training
      - margin: Number of rows to skip between training and test within each fold
      - resampling_method: Method for resampling ('SMOTE', 'UPSAMPLE', 'DOWNSAMPLE', None),
        case-insensitive; an unrecognised value is reported and ignored

    Assumes df already contains:
    - All relevant features, the raw target 'DEP_DELAY' and the label 'DEP_DELAY_BINNED'
    - 'cyclic_onehot_features' column (vector of one-hot encoded and cyclic features)

    Returns:
    - DataFrame with fold_id, train_test labels and a 'final_features' column,
      or None when there are too few rows to build the folds or no fold could
      be processed

    Raises:
    - ValueError: if n_folds is smaller than 1
    """
    if n_folds < 1:
        raise ValueError(f"n_folds must be >= 1, got {n_folds}")

    label_col = "DEP_DELAY_BINNED"

    # First, balance the partitions while maintaining chronological order
    df = create_balanced_time_ordered_partitions(df, num_partitions=200)

    # 1. Assign row indices in chronological order
    # NOTE(review): this relies on the partition order produced above being chronological
    w = Window.orderBy(F.monotonically_increasing_id())
    df_indexed = df.withColumn("row_index", F.row_number().over(w) - 1)

    # 2. Compute fold size
    total_count = df_indexed.count()
    fold_size = total_count // n_folds
    if fold_size == 0:
        # Fewer rows than folds: the fold arithmetic below would divide by zero
        print(f"Not enough rows ({total_count}) to build {n_folds} folds.")
        return None

    # 3. Label every row with fold_id and train/skip/test using native column
    #    expressions. The train cut-offs only take two distinct values (regular
    #    folds vs. the last fold that absorbs leftover rows), so they are
    #    computed once on the driver.
    last_fold_start = (n_folds - 1) * fold_size
    regular_cutoff = int(train_ratio * fold_size)
    last_cutoff = int(train_ratio * (total_count - last_fold_start))

    row_idx = F.col("row_index")
    in_fold_pos = F.col("_in_fold_pos")
    train_cutoff = F.col("_train_cutoff")

    df_labeled = (
        df_indexed
        .withColumn(
            "fold_id",
            # Rows beyond the last regular boundary are clamped into the last fold
            F.least(F.floor(row_idx / F.lit(fold_size)) + F.lit(1), F.lit(n_folds)).cast("int")
        )
        .withColumn("_in_fold_pos", row_idx - (F.col("fold_id") - F.lit(1)) * F.lit(fold_size))
        .withColumn(
            "_train_cutoff",
            F.when(F.col("fold_id") == n_folds, F.lit(last_cutoff)).otherwise(F.lit(regular_cutoff))
        )
        .withColumn(
            "train_test",
            F.when(in_fold_pos < train_cutoff, F.lit("train"))
             .when(in_fold_pos < train_cutoff + F.lit(margin), F.lit("skip"))
             .otherwise(F.lit("test"))
        )
        .drop("row_index", "_in_fold_pos", "_train_cutoff")
    )

    # 4. Materialise the labelled data so the per-fold work starts from a short lineage
    checkpoint_save(df_labeled, "checkpoint_fold_labeled")
    df_labeled = checkpoint_load("checkpoint_fold_labeled")

    # 5. Identify numeric features for scaling. The raw target and the binned
    #    label are excluded so the target never ends up inside the features.
    numeric_cols = [c for (c, t) in df_labeled.dtypes
                    if t in ('float', 'double') and c not in ('DEP_DELAY', label_col)]

    # 6. Columns to target encode (only those that exist in the dataframe)
    cat_cols_to_target_encode = ['TAIL_NUM', 'OP_CARRIER_FL_NUM', 'ORIGIN_AIRPORT_ID',
                                 'DEST_AIRPORT_ID', 'origin_station_id',
                                 'dest_station_id', 'STATION']
    cat_cols_to_target_encode = [c for c in cat_cols_to_target_encode
                                 if c in df_labeled.columns]

    # 7. Process each fold separately to prevent data leakage
    fold_id_list = sorted(
        df_labeled.select("fold_id").distinct().rdd.map(lambda r: r[0]).collect()
    )

    # Store processed folds for later union
    processed_folds = []

    for fid in fold_id_list:
        print(f"Processing fold {fid}...")

        # Extract train/test/skip parts
        in_fold = F.col("fold_id") == fid
        train_df = df_labeled.filter(in_fold & (F.col("train_test") == "train"))
        test_df = df_labeled.filter(in_fold & (F.col("train_test") == "test"))
        skip_df = df_labeled.filter(in_fold & (F.col("train_test") == "skip"))

        # Skip empty folds
        if train_df.count() == 0:
            print(f"Fold {fid} has empty training set. Skipping.")
            continue

        # Only non-empty parts are carried through the transformations. Each
        # emptiness check is a Spark job, so it is done once per fold here.
        parts = {"train": train_df}
        if test_df.count() > 0:
            parts["test"] = test_df
        if skip_df.count() > 0:
            parts["skip"] = skip_df

        # === TARGET ENCODING (per fold) ===
        target_cols = []
        if cat_cols_to_target_encode:
            print(f"  Performing target encoding for fold {fid}...")

            # Train-set mean of the target, used for categories unseen in train
            DEP_DELAY_mean = train_df.agg(F.mean("DEP_DELAY")).collect()[0][0]

            # Mean delay per category, always computed from the train rows only
            encodings = {
                cat_col: train_df.groupBy(cat_col).agg(
                    F.mean("DEP_DELAY").alias(f"{cat_col}_mean_delay")
                )
                for cat_col in cat_cols_to_target_encode
            }

            # Attach the encodings to every part of the fold
            for cat_col, encoding_df in encodings.items():
                parts = {
                    name: part.join(F.broadcast(encoding_df), on=cat_col, how="left")
                    for name, part in parts.items()
                }

            target_cols = [f"{cat_col}_mean_delay" for cat_col in encodings]

            # Fill missing encodings with the train mean; this is skipped when the
            # mean itself is null (no non-null DEP_DELAY in the train rows)
            if DEP_DELAY_mean is not None:
                parts = {
                    name: part.fillna(DEP_DELAY_mean, subset=target_cols)
                    for name, part in parts.items()
                }

        # === FEATURE SCALING (per fold) ===
        final_cols = []
        all_numeric = numeric_cols + target_cols
        if all_numeric:
            print(f"  Scaling features for fold {fid}...")

            # Assemble numeric features (rows with invalid values are dropped)
            assembler = VectorAssembler(
                inputCols=all_numeric,
                outputCol="numeric_features",
                handleInvalid="skip"
            )
            parts = {name: assembler.transform(part) for name, part in parts.items()}

            # Scale using train data stats only
            scaler = RobustScaler(
                inputCol="numeric_features",
                outputCol="scaled_numeric_features",
                withCentering=False,
                withScaling=True
            )
            scaler_model = scaler.fit(parts["train"])
            parts = {name: scaler_model.transform(part) for name, part in parts.items()}
            final_cols.append("scaled_numeric_features")

        # === FINAL FEATURE ASSEMBLY ===
        # Combine scaled numeric features with the existing cyclic/one-hot vector
        if "cyclic_onehot_features" in parts["train"].columns:
            final_cols.append("cyclic_onehot_features")

        if not final_cols:
            print(f"Fold {fid} has no feature columns to assemble. Skipping.")
            continue

        final_assembler = VectorAssembler(
            inputCols=final_cols,
            outputCol="final_features",
            handleInvalid="skip"
        )
        parts = {name: final_assembler.transform(part) for name, part in parts.items()}

        # === RESAMPLING (only for train) ===
        if resampling_method:
            print(f"  Applying {resampling_method} resampling for fold {fid}...")
            method = resampling_method.upper()

            if method == "SMOTE":
                parts["train"] = fast_pseudo_smote_preserve_all(
                    parts["train"], label_col, feature_col="final_features"
                )
            elif method == "UPSAMPLE":
                parts["train"] = upsampling(parts["train"])
            elif method == "DOWNSAMPLE":
                parts["train"] = downsampling(parts["train"])
            else:
                print(f"  Warning: unknown resampling method '{resampling_method}', no resampling applied")

        # Combine all parts of this fold that still contain rows
        # (train, then test, then skip)
        fold_parts = [part for part in parts.values() if part.count() > 0]

        if fold_parts:
            fold_processed = reduce(DataFrame.unionByName, fold_parts)
            checkpoint_save(fold_processed, f"checkpoint_fold_{fid}")
            fold_processed = checkpoint_load(f"checkpoint_fold_{fid}")
            processed_folds.append(fold_processed)

    # 8. Combine all processed folds
    if not processed_folds:
        print("No processed folds to return!")
        return None

    print("Combining all processed folds...")
    result_df = reduce(DataFrame.unionByName, processed_folds)

    return result_df

Method to run the custom cross-validator and find the best hyperparameters

NOTE: Pass a final feature list if you want to keep only specific post-processed columns. The custom CV parallelism is set inside this function.

Resample method options: 'smote', 'upsample', 'downsample'

In [0]:
def run_cross_validation_mod(df_train, modelType, paramGrid, final_feature_list=None, resample_method=None,
                             n_folds=2, train_ratio=0.8, parallelism=4):
    """
    Run time-based cross-validation to find the best hyperparameters for the
    provided model and parameter grid.

    Parameters:
    - df_train: Training DataFrame (raw; all non-scaling preprocessing is fitted and applied here)
    - modelType: Estimator to tune; a MultilayerPerceptronClassifier gets its
      input layer resized in place to the assembled feature size
    - paramGrid: Parameter maps to evaluate
    - final_feature_list: List of columns to keep after categorical encoding (optional)
    - resample_method: Train-fold resampling method ('smote', 'upsample', 'downsample') or None
    - n_folds: Number of time-based folds
    - train_ratio: Fraction of each fold used for training
    - parallelism: Parallelism passed to the custom cross validator

    Returns:
    - best_params: Dictionary of best hyperparameters
    - best_metrics: Dictionary of metrics for the best hyperparameters
    - preprocessing_pipeline: Dictionary with the fitted / configured preprocessing
      stages ("non_scaling_model", "category_preprocessors", "onehot_assembler",
      "cyclic_assembler", "combined_assembler"); an assembler entry is None when
      the corresponding stage was not built

    Raises:
    - ValueError: if no cross-validation fold could be created
    """
    print("Preparing data for time-based cross-validation...")

    # Apply non-scaling preprocessing
    # This includes cleaning, handling missing values, feature extraction, but NOT scaling
    print("Applying non-scaling preprocessing steps...")

    # Create a modified pipeline excluding scaling steps
    stages = [
        DataPreprocessor(),
        DateTimeProcessor(),
        SkyConditionsProcessor(),
        OutlierHandler(),
        MissingValueHandler(),
        TargetBinner(),
        GraphFeatureProcessor()
    ]

    # We'll still create the cyclic features as they don't rely on scaling
    time_features = ['QUARTER', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'MONTH',
                     'sched_depart_hour_UTC', 'sched_depart_minute_UTC',
                     'four_hours_prior_depart_hour_UTC', 'four_hours_prior_depart_minute_UTC',
                     'two_hours_prior_depart_hour_UTC', 'two_hours_prior_depart_minute_UTC',
                     'station_hour_UTC', 'station_minute_UTC']

    # Filter to only include columns that exist in the dataframe
    time_features = [c for c in time_features if c in df_train.columns]

    # Define and add cyclic encoder
    cyclic_encoder = CyclicEncoder().setInputCols(time_features)
    stages.append(cyclic_encoder)

    # Create and apply non-scaling pipeline
    non_scaling_pipeline = Pipeline(stages=stages)
    non_scaling_model = non_scaling_pipeline.fit(df_train)
    preprocessed_df = non_scaling_model.transform(df_train)

    # checkpoint and reload data
    checkpoint_save(preprocessed_df, "checkpoint_cv_preprocessed")
    preprocessed_df = checkpoint_load("checkpoint_cv_preprocessed")

    categorical_features = ['OP_UNIQUE_CARRIER', 'OP_CARRIER_AIRLINE_ID', 'ORIGIN_STATE_ABR',
                            'DEST_STATE_ABR', 'origin_type', 'dest_type',
                            'HourlyCloudCoverage', 'HourlyCloudLayerAmount']

    # Filter to only include columns that exist in the dataframe
    categorical_features = [c for c in categorical_features if c in preprocessed_df.columns]

    # Apply string indexing and one-hot encoding, keeping every fitted stage so
    # the same encoding can be re-applied when training the final model
    onehot_cols = []
    category_preprocessors = []

    for cat_col in categorical_features:
        indexer = StringIndexer(
            inputCol=cat_col,
            outputCol=f"{cat_col}_indexed",
            handleInvalid="keep"
        )
        indexer_model = indexer.fit(preprocessed_df)
        category_preprocessors.append(indexer_model)
        preprocessed_df = indexer_model.transform(preprocessed_df)

        encoder = OneHotEncoder(
            inputCol=f"{cat_col}_indexed",
            outputCol=f"{cat_col}_onehot"
        )
        encoder_model = encoder.fit(preprocessed_df)
        category_preprocessors.append(encoder_model)
        preprocessed_df = encoder_model.transform(preprocessed_df)
        onehot_cols.append(f"{cat_col}_onehot")

    checkpoint_save(preprocessed_df, "checkpoint_cv_categorical")
    preprocessed_df = checkpoint_load("checkpoint_cv_categorical")

    # Keep final features in list if provided
    if final_feature_list:
        final_feature_list = [c for c in final_feature_list if c in preprocessed_df.columns]
        preprocessed_df = preprocessed_df.select(*final_feature_list)

    # The assemblers are returned to the caller, so each one is defined even
    # when its stage is not built
    onehot_assembler = None
    cyclic_assembler = None
    combined_assembler = None

    # Assemble one-hot encoded features
    if onehot_cols:
        onehot_cols = [c for c in onehot_cols if c in preprocessed_df.columns]
        onehot_assembler = VectorAssembler(
            inputCols=onehot_cols,
            outputCol="onehot_features",
            handleInvalid="skip"
        )
        preprocessed_df = onehot_assembler.transform(preprocessed_df)

    # Assemble cyclic features if available
    cyclic_cols = cyclic_encoder.getOutputCols()

    if cyclic_cols:
        cyclic_cols = [c for c in cyclic_cols if c in preprocessed_df.columns]
        cyclic_assembler = VectorAssembler(
            inputCols=cyclic_cols,
            outputCol="cyclic_features",
            handleInvalid="skip"
        )
        preprocessed_df = cyclic_assembler.transform(preprocessed_df)

    # Combine cyclic and onehot features
    combined_features = [c for c in ("cyclic_features", "onehot_features")
                         if c in preprocessed_df.columns]

    if combined_features:
        combined_assembler = VectorAssembler(
            inputCols=combined_features,
            outputCol="cyclic_onehot_features",
            handleInvalid="skip"
        )
        # Materialisation happens at the checkpoint_save/checkpoint_load below
        preprocessed_df = combined_assembler.transform(preprocessed_df)
    else:
        # If no cyclic or onehot features, create an empty vector as placeholder
        # (an array column cannot be cast to a vector, it has to be converted)
        preprocessed_df = preprocessed_df.withColumn(
            "cyclic_onehot_features",
            array_to_vector(F.array().cast("array<double>"))
        )

    # make sure dataset is chronologically ordered for time-based cross-validation
    preprocessed_df = preprocessed_df.orderBy('YEAR', 'MONTH', 'DAY_OF_MONTH', 'sched_depart_hour_UTC', 'sched_depart_minute_UTC')

    checkpoint_save(preprocessed_df, "checkpoint_cv_combined_features")
    preprocessed_df = checkpoint_load("checkpoint_cv_combined_features")

    # Create time-based folds (WITH target encoding and scaling within each fold)
    print("Creating time-based folds with per-fold scaling...")
    df_cv = create_time_based_folds_optimized(
        preprocessed_df, n_folds=n_folds, train_ratio=train_ratio, resampling_method=resample_method
    )
    if df_cv is None:
        raise ValueError("No cross-validation folds could be created from the provided training data.")

    # Define model for cross-validation
    print("Setting up cross-validation...")

    # Adjust for MLP if needed
    # Get a sample from the processed data for feature size
    sample_features = df_cv.filter(F.col("final_features").isNotNull()).select("final_features").limit(1).collect()

    if sample_features:  # Make sure we have a valid sample
        feature_size = len(sample_features[0][0])
        print(f"Adjusted feature vector size for CV: {feature_size}")

        # The MLP input layer has to match the assembled feature vector
        if isinstance(modelType, MultilayerPerceptronClassifier):
            current_layers = modelType.getLayers()
            new_layers = [feature_size] + current_layers[1:]
            modelType = modelType.setLayers(new_layers)
            print(f"Updated MLP layers for CV: {new_layers}")

    # Define evaluators (f1, accuracy, weighted precision, weighted recall)
    evaluators = [
        MulticlassClassificationEvaluator(
            labelCol="DEP_DELAY_BINNED",
            predictionCol="prediction",
            metricName=metric_name
        )
        for metric_name in ("f1", "accuracy", "weightedPrecision", "weightedRecall")
    ]

    # Scaling already happened per fold, so the cross validator only fits and evaluates
    cv = CustomCrossValidator(
        estimator=modelType,
        estimatorParamMaps=paramGrid,
        evaluators=evaluators,
        splitWord=('train', 'test'),
        cvCol='fold_id',
        parallelism=parallelism,
        verbose=1,
        retrainFullModel=False
    )

    # Run cross-validation
    print("Running cross-validation...")
    cvModel = cv.fit(df_cv)

    # Since a big chunk of preprocessing is identical, return part of pipeline to continue in train_model_mod()
    preprocessing_pipeline = {
        "non_scaling_model": non_scaling_model,            # Initial preprocessing
        "category_preprocessors": category_preprocessors,  # Categorical variables
        "onehot_assembler": onehot_assembler,              # One-hot encoding
        "cyclic_assembler": cyclic_assembler,              # Cyclic features
        "combined_assembler": combined_assembler           # Combined categorical/cyclic
    }

    # Return best hyperparameters and metrics
    return cvModel.bestHyperParams, cvModel.bestMetricValues, preprocessing_pipeline

Complete Training Model Method

In [0]:
def train_model_mod(df_train, modelType, paramGrid, df_test=None, use_cross_validation=True, final_feature_list=None, resample_method=None, cv_sample_size=12000000):
    """
    Train a flight delay prediction model.

    Hyperparameters are tuned with time-based cross-validation on a sample of
    the training data; the preprocessing stages fitted during that step are
    then re-applied to the full train (and test) data, the numeric features are
    scaled with statistics from the training data only, and the model is
    fitted with the best hyperparameters.

    Parameters:
    - df_train: Training DataFrame
    - modelType: Estimator to train (its params are updated in place with the best hyperparameters)
    - paramGrid: Parameter maps evaluated during cross-validation
    - df_test: Testing DataFrame (optional)
    - use_cross_validation: Must be True; the preprocessing stages are fitted as
      part of cross-validation and cannot be built without it
    - final_feature_list: List of final feature columns to use (optional)
    - resample_method: 'smote', 'upsample', 'downsample' (case-insensitive) or None.
      Resampling is applied to the training data only.
    - cv_sample_size: Sample size passed to sample_with_time_windows for the tuning sample

    Returns:
    - model: PipelineModel chaining the preprocessing stages and the trained
      model (the resampler is not part of it, so it can be used for inference)
    - metrics: Evaluation metrics (if df_test is provided), otherwise None

    Raises:
    - ValueError: if resample_method is not recognised or the processed training set is empty
    - NotImplementedError: if use_cross_validation is False
    """
    # Note: data is chronologically sorted at end of first transformer (DataPreprocessor()) now

    # Validate cheap-to-check arguments before any expensive Spark work
    resample_key = resample_method.lower() if resample_method else None
    if resample_key is not None and resample_key not in ("smote", "upsample", "downsample"):
        raise ValueError(
            f"Unknown resample_method '{resample_method}'. Expected 'smote', 'upsample', 'downsample' or None."
        )
    if not use_cross_validation:
        raise NotImplementedError(
            "use_cross_validation=False is not supported: the preprocessing stages "
            "are fitted during cross-validation."
        )

    # Hyperparameter tuning with cross-validation
    print("Beginning cross-validation for hyperparameter tuning...")
    # Sample records from every month for hyperparameter tuning
    df_sample = sample_with_time_windows(df_train, cv_sample_size)
    df_sample = create_balanced_time_ordered_partitions(df_sample, num_partitions=200)
    print(f'Cross-validation using sample size: {df_sample.count()}, partition count: {df_sample.rdd.getNumPartitions()}')
    best_params, best_metrics, pp_pipeline = run_cross_validation_mod(df_sample, modelType, paramGrid, final_feature_list=final_feature_list, resample_method=resample_method)
    print("Best hyperparameters:", best_params)
    print("Best cross-validation metrics:", best_metrics)

    # Apply non-scaling preprocessing steps for the final model
    print("Building and applying non-scaling preprocessing steps for final model...")

    # Use same initial pipeline stages from cv since preprocessing of non-numeric is identical
    print("Using preprocessing pipeline from cross-validation...")
    non_scaling_model = pp_pipeline["non_scaling_model"]
    category_preprocessors = pp_pipeline["category_preprocessors"]
    onehot_assembler = pp_pipeline["onehot_assembler"]
    cyclic_assembler = pp_pipeline["cyclic_assembler"]
    combined_assembler = pp_pipeline["combined_assembler"]

    # Apply non-scaling preprocessing to train and test
    df_train_processed = non_scaling_model.transform(df_train)
    checkpoint_save(df_train_processed, "checkpoint_train_preprocessed")
    df_train_processed = checkpoint_load("checkpoint_train_preprocessed")
    df_test_processed = None
    if df_test is not None:
        df_test_processed = non_scaling_model.transform(df_test)
        checkpoint_save(df_test_processed, "checkpoint_test_preprocessed")
        df_test_processed = checkpoint_load("checkpoint_test_preprocessed")

    categorical_features = ['OP_UNIQUE_CARRIER', 'OP_CARRIER_AIRLINE_ID', 'ORIGIN_STATE_ABR',
                            'DEST_STATE_ABR', 'origin_type', 'dest_type',
                            'HourlyCloudCoverage', 'HourlyCloudLayerAmount']

    # Filter to only include columns that exist in the dataframe
    categorical_features = [c for c in categorical_features if c in df_train_processed.columns]
    onehot_cols = [f"{cat_col}_onehot" for cat_col in categorical_features]

    # Apply the already fitted categorical preprocessors to train data
    for processor in category_preprocessors:
        df_train_processed = processor.transform(df_train_processed)
    checkpoint_save(df_train_processed, "checkpoint_train_categorical")
    df_train_processed = checkpoint_load("checkpoint_train_categorical")

    # Apply categorical preprocessing to test data
    if df_test_processed is not None:
        for processor in category_preprocessors:
            df_test_processed = processor.transform(df_test_processed)
        checkpoint_save(df_test_processed, "checkpoint_test_categorical")
        df_test_processed = checkpoint_load("checkpoint_test_categorical")

    # Keep final features in list if provided
    if final_feature_list:
        final_feature_list = [c for c in final_feature_list if c in df_train_processed.columns]
        df_train_processed = df_train_processed.select(*final_feature_list)
        if df_test_processed is not None:
            df_test_processed = df_test_processed.select(*final_feature_list)

    # Assemble one-hot encoded features
    if onehot_assembler is not None and onehot_cols:
        df_train_processed = onehot_assembler.transform(df_train_processed)
        if df_test_processed is not None:
            df_test_processed = onehot_assembler.transform(df_test_processed)

    # Assemble cyclic features
    if cyclic_assembler is not None and cyclic_assembler.getInputCols():
        df_train_processed = cyclic_assembler.transform(df_train_processed)
        if df_test_processed is not None:
            df_test_processed = cyclic_assembler.transform(df_test_processed)

    # Combine cyclic and onehot features
    df_test_pretarget = None
    if combined_assembler is not None:
        df_train_pretarget = combined_assembler.transform(df_train_processed)
        checkpoint_save(df_train_pretarget, "checkpoint_train_pretarget_assembled")
        df_train_pretarget = checkpoint_load("checkpoint_train_pretarget_assembled")
        if df_test_processed is not None:
            df_test_pretarget = combined_assembler.transform(df_test_processed)
            checkpoint_save(df_test_pretarget, "checkpoint_test_pretarget_assembled")
            df_test_pretarget = checkpoint_load("checkpoint_test_pretarget_assembled")
    else:
        # If no cyclic or onehot features, create an empty vector as placeholder
        # (an array column cannot be cast to a vector, it has to be converted)
        empty_vector = array_to_vector(F.array().cast("array<double>"))
        df_train_pretarget = df_train_processed.withColumn("cyclic_onehot_features", empty_vector)
        if df_test_processed is not None:
            df_test_pretarget = df_test_processed.withColumn("cyclic_onehot_features", empty_vector)

    # Process TargetEncoder features after cv
    # NOTE(review): TargetEncoder is applied without a fit step here; confirm it
    # does not derive its encodings from the test rows.
    target_encoder = TargetEncoder()
    df_train_processed = target_encoder.transform(df_train_pretarget)
    checkpoint_save(df_train_processed, "checkpoint_train_target_encoded")
    df_train_processed = checkpoint_load("checkpoint_train_target_encoded")
    if df_test_pretarget is not None:
        df_test_processed = target_encoder.transform(df_test_pretarget)
        checkpoint_save(df_test_processed, "checkpoint_test_target_encoded")
        df_test_processed = checkpoint_load("checkpoint_test_target_encoded")

    # Numeric inputs for scaling: float columns (never the raw target or the
    # label) plus the target-encoded columns that are present, without duplicates
    # NOTE(review): the per-fold CV features use both 'float' and 'double' columns; confirm which is intended.
    target_encoded_cols = ['TAIL_NUM_mean_delay', 'OP_CARRIER_FL_NUM_mean_delay', 'ORIGIN_AIRPORT_ID_mean_delay',
                           'DEST_AIRPORT_ID_mean_delay', 'origin_station_id_mean_delay',
                           'dest_station_id_mean_delay', 'STATION_mean_delay']
    float_cols = [c for (c, t) in df_train_processed.dtypes
                  if t == 'float' and c not in ('DEP_DELAY', 'DEP_DELAY_BINNED')]
    float_cols += [c for c in target_encoded_cols
                   if c in df_train_processed.columns and c not in float_cols]

    # Create a VectorAssembler for float features
    float_assembler = VectorAssembler(
        inputCols=float_cols,
        outputCol="float_vector",
        handleInvalid="skip"
    )

    # Apply assembler to train and test
    df_train_assembled = float_assembler.transform(df_train_processed)
    df_test_assembled = None
    if df_test_processed is not None:
        df_test_assembled = float_assembler.transform(df_test_processed)

    checkpoint_save(df_train_assembled, "checkpoint_train_assembled")
    df_train_assembled = checkpoint_load("checkpoint_train_assembled")
    if df_test_assembled is not None:
        checkpoint_save(df_test_assembled, "checkpoint_test_assembled")
        df_test_assembled = checkpoint_load("checkpoint_test_assembled")

    # Fit scaler on training data only
    scaler = RobustScaler(
        inputCol="float_vector",
        outputCol="scaled_float_vector",
        withCentering=False,
        withScaling=True
    )
    scaler_model = scaler.fit(df_train_assembled)

    # Apply scaler to both train and test
    df_train_scaled = scaler_model.transform(df_train_assembled)
    df_test_scaled = None
    if df_test_assembled is not None:
        df_test_scaled = scaler_model.transform(df_test_assembled)

    checkpoint_save(df_train_scaled, "checkpoint_train_scaled")
    df_train_scaled = checkpoint_load("checkpoint_train_scaled")
    if df_test_scaled is not None:
        checkpoint_save(df_test_scaled, "checkpoint_test_scaled")
        df_test_scaled = checkpoint_load("checkpoint_test_scaled")

    # Final feature assembly for the training model
    # Combine scaled float vector with cyclic_onehot_features
    final_assembler = VectorAssembler(
        inputCols=["scaled_float_vector", "cyclic_onehot_features"],
        outputCol="final_features",
        handleInvalid="skip"
    )

    # Apply final assembler
    df_train_final = final_assembler.transform(df_train_scaled)
    df_test_final = None
    if df_test_scaled is not None:
        df_test_final = final_assembler.transform(df_test_scaled)

    checkpoint_save(df_train_final, "checkpoint_train_final")
    df_train_final = checkpoint_load("checkpoint_train_final")
    if df_test_final is not None:
        checkpoint_save(df_test_final, "checkpoint_test_final")
        df_test_final = checkpoint_load("checkpoint_test_final")

    # Resample the TRAINING data only: the test set has to keep its original
    # class distribution for the reported metrics to be meaningful
    if resample_key:
        if resample_key == "smote":
            resampler = SMOTEResampler()
        elif resample_key == "upsample":
            resampler = UpsampleResampler()
        else:
            resampler = DownsampleResampler()

        df_train_final = resampler.transform(df_train_final)
        checkpoint_save(df_train_final, "checkpoint_train_final_resampled")
        df_train_final = checkpoint_load("checkpoint_train_final_resampled")

    # Define model with optimal hyperparameters
    print("Creating final model...")

    # Look up each tuned hyperparameter by name on the estimator and set it
    for param_item in best_params.items():
        print(f'current paramMap: {param_item}')
        param_name, param_value = param_item
        param = next((p for p in modelType.params if p.name == param_name), None)

        if param is not None:
            modelType.set(param, param_value)
        else:
            print(f"Warning: Parameter {param_name} not found in model")

    # Grab a sample to determine the feature dimension
    sample_rows = df_train_final.select("final_features").limit(1).collect()
    if not sample_rows:
        raise ValueError("Training data is empty after preprocessing; cannot train the final model.")
    feature_size = len(sample_rows[0][0])
    print(f"Adjusted feature vector size: {feature_size}")

    # The MLP input layer has to match the assembled feature vector
    if isinstance(modelType, MultilayerPerceptronClassifier):
        current_layers = modelType.getLayers()
        new_layers = [feature_size] + current_layers[1:]
        modelType = modelType.setLayers(new_layers)
        print(f"Updated MLP layers: {new_layers}")

    df_train_final = df_train_final.repartition(200)  # Balanced partitions for training
    # Now train the model on the full training dataset
    model = modelType.fit(df_train_final)
    # Release cached training data (no-op when nothing was persisted)
    df_train_final.unpersist()

    # Evaluate on test data (if provided)
    metrics = None
    if df_test_final is not None:
        print("Evaluating model on test data...")
        predictions = model.transform(df_test_final)

        # Reported metric name -> evaluator metric name
        metric_names = {
            "f1": "f1",
            "accuracy": "accuracy",
            "precision": "weightedPrecision",
            "recall": "weightedRecall"
        }
        metrics = {
            key: MulticlassClassificationEvaluator(
                labelCol="DEP_DELAY_BINNED",
                predictionCol="prediction",
                metricName=metric_name
            ).evaluate(predictions)
            for key, metric_name in metric_names.items()
        }

        # Release cached test data only after all metrics have been computed
        df_test_final.unpersist()

        print("Test metrics:")
        for metric, value in metrics.items():
            print(f"  {metric}: {value:.4f}")

        print("Model Hyperparameters:")
        for param, value in model.extractParamMap().items():
            print(f"  {param.name}: {value}")

    # Create final pipeline model that combines all the preprocessing steps and
    # the trained model. The resampler is deliberately left out: it only
    # rebalances training data and must not alter data at prediction time.
    candidate_stages = [
        non_scaling_model,
        *category_preprocessors,
        onehot_assembler,
        cyclic_assembler,
        combined_assembler,
        target_encoder,
        float_assembler,
        scaler_model,
        final_assembler,
        model
    ]
    pipeline_model = PipelineModel(stages=[stage for stage in candidate_stages if stage is not None])

    return pipeline_model, metrics

Full Workflow

In [0]:
def run_flight_delay_prediction_workflow_mod(df, modelType, paramGrid, resample_method=None,
                                             train_fraction=0.8, num_partitions=200):
    """
    Run the end-to-end flight delay prediction workflow on an already-loaded DataFrame.

    Steps performed:
    1. Rebalance partitions with create_balanced_time_ordered_partitions
    2. Split chronologically into train/test on (YEAR, MONTH, DAY_OF_MONTH)
    3. Train with cross-validation via train_model_mod, passing the test split along
    4. Show predictions for a 5-row sample of the test split
    5. Remove the checkpoint folder

    Args:
        df: Spark DataFrame with at least YEAR, MONTH, DAY_OF_MONTH, DEP_DELAY and
            DEP_DELAY_BINNED columns (the columns this function references directly).
        modelType: Estimator forwarded to train_model_mod.
        paramGrid: Hyperparameter grid forwarded to train_model_mod.
        resample_method: Optional resampling strategy forwarded to train_model_mod.
        train_fraction: Share of the earliest rows used for training; must lie
            strictly between 0 and 1. Defaults to 0.8 (the original 80/20 split).
        num_partitions: Partition count passed to
            create_balanced_time_ordered_partitions. Defaults to 200.

    Returns:
        Tuple (pipeline_model, metrics) exactly as returned by train_model_mod.

    Raises:
        ValueError: If train_fraction is outside (0, 1) or the resulting training
            split would contain no rows.
    """
    if not 0.0 < train_fraction < 1.0:
        raise ValueError(f"train_fraction must be strictly between 0 and 1, got {train_fraction}")

    # First, balance the partitions while maintaining chronological order
    df = create_balanced_time_ordered_partitions(df, num_partitions=num_partitions)

    # Perform chronological train-test split
    print("Performing chronological train-test split...")
    # Add a sequential row index.
    # NOTE(review): a window without partitionBy makes Spark sort every row in a single
    # partition, and rows sharing the same date are numbered in arbitrary order. Because
    # df is not cached, the two filters below may be evaluated against different tie
    # orderings - consider persisting df or adding a unique tie-breaker column.
    windowSpec = Window.orderBy('YEAR', 'MONTH', 'DAY_OF_MONTH')
    df = df.withColumn("row_index", F.row_number().over(windowSpec))

    # Calculate the split point
    total_rows = df.count()
    train_rows = int(total_rows * train_fraction)
    if train_rows == 0:
        raise ValueError(
            f"Training split is empty ({total_rows} total rows, train_fraction={train_fraction})"
        )
    # row_index runs contiguously from 1 to total_rows, so the split sizes follow directly
    # from the split point; no extra Spark jobs are needed just to report them.
    test_rows = total_rows - train_rows

    # Split the data
    train_data = df.filter(df.row_index <= train_rows).drop("row_index")
    test_data = df.filter(df.row_index > train_rows).drop("row_index")

    print(f"Training data size: {train_rows} rows")
    print(f"Test data size: {test_rows} rows")

    # Define final input features for model (to be kept post-processing) (optional)
    final_feature_list = None

    # Train the model with cross-validation
    print("Begin model hyperparameter tuning and training...")
    pipeline_model, metrics = train_model_mod(
        train_data,
        df_test=test_data,
        modelType=modelType,
        paramGrid=paramGrid,
        use_cross_validation=True,
        final_feature_list=final_feature_list,
        resample_method=resample_method,
    )

    # Make predictions on test data (example)
    print("Making predictions on a sample of test data...")

    sample_predictions = pipeline_model.transform(test_data.limit(5))

    # Display prediction results
    print("Sample predictions:")
    sample_predictions.select("DEP_DELAY", "DEP_DELAY_BINNED", "prediction", "probability").show()

    # Clear checkpoints at end of processing.
    # folder_path is not defined in this function; it is read from notebook scope.
    dbutils.fs.rm(folder_path, recurse=True)
    print("Workflow completed successfully!")
    return pipeline_model, metrics

Testing/Experiments

In [0]:
# Location of the full OTPW 60M backup; list its contents to see the available part files
directory = "dbfs:/mnt/mids-w261/OTPW_60M_Backup"
display(dbutils.fs.ls(directory))

path,name,size,modificationTime
dbfs:/mnt/mids-w261/OTPW_60M_Backup/_SUCCESS,_SUCCESS,0,1744495305000
dbfs:/mnt/mids-w261/OTPW_60M_Backup/_committed_6865934896646357313,_committed_6865934896646357313,524,1744495306000
dbfs:/mnt/mids-w261/OTPW_60M_Backup/_started_6865934896646357313,_started_6865934896646357313,0,1744495305000
dbfs:/mnt/mids-w261/OTPW_60M_Backup/part-00000-tid-6865934896646357313-d0d2f795-a1f4-4e99-baf6-57bbbfdcabaa-133-1.c000.snappy.parquet,part-00000-tid-6865934896646357313-d0d2f795-a1f4-4e99-baf6-57bbbfdcabaa-133-1.c000.snappy.parquet,1385705174,1744495305000
dbfs:/mnt/mids-w261/OTPW_60M_Backup/part-00001-tid-6865934896646357313-d0d2f795-a1f4-4e99-baf6-57bbbfdcabaa-134-1.c000.snappy.parquet,part-00001-tid-6865934896646357313-d0d2f795-a1f4-4e99-baf6-57bbbfdcabaa-134-1.c000.snappy.parquet,1356469888,1744496514000
dbfs:/mnt/mids-w261/OTPW_60M_Backup/part-00002-tid-6865934896646357313-d0d2f795-a1f4-4e99-baf6-57bbbfdcabaa-135-1.c000.snappy.parquet,part-00002-tid-6865934896646357313-d0d2f795-a1f4-4e99-baf6-57bbbfdcabaa-135-1.c000.snappy.parquet,1092710849,1744497469000
dbfs:/mnt/mids-w261/OTPW_60M_Backup/part-00003-tid-6865934896646357313-d0d2f795-a1f4-4e99-baf6-57bbbfdcabaa-136-1.c000.snappy.parquet,part-00003-tid-6865934896646357313-d0d2f795-a1f4-4e99-baf6-57bbbfdcabaa-136-1.c000.snappy.parquet,1045217237,1744498247000
dbfs:/mnt/mids-w261/OTPW_60M_Backup/part-00004-tid-6865934896646357313-d0d2f795-a1f4-4e99-baf6-57bbbfdcabaa-137-1.c000.snappy.parquet,part-00004-tid-6865934896646357313-d0d2f795-a1f4-4e99-baf6-57bbbfdcabaa-137-1.c000.snappy.parquet,1030144723,1744499059000


In [0]:
print("Loading data...")
# Smaller alternatives used during development (kept for reference):
# df = spark.read.format("csv").option("header", "true").load("dbfs:/mnt/mids-w261/OTPW_3M_2015.csv")
# df = spark.read.option("header", "true").csv(f"dbfs:/mnt/mids-w261/OTPW_12M/OTPW_12M/OTPW_12M_2015.csv.gz").limit(100)

# The five parquet part files that make up the 60M dataset
part_paths = [
    "dbfs:/mnt/mids-w261/OTPW_60M_Backup/part-00000-tid-6865934896646357313-d0d2f795-a1f4-4e99-baf6-57bbbfdcabaa-133-1.c000.snappy.parquet",
    "dbfs:/mnt/mids-w261/OTPW_60M_Backup/part-00001-tid-6865934896646357313-d0d2f795-a1f4-4e99-baf6-57bbbfdcabaa-134-1.c000.snappy.parquet",
    "dbfs:/mnt/mids-w261/OTPW_60M_Backup/part-00002-tid-6865934896646357313-d0d2f795-a1f4-4e99-baf6-57bbbfdcabaa-135-1.c000.snappy.parquet",
    "dbfs:/mnt/mids-w261/OTPW_60M_Backup/part-00003-tid-6865934896646357313-d0d2f795-a1f4-4e99-baf6-57bbbfdcabaa-136-1.c000.snappy.parquet",
    "dbfs:/mnt/mids-w261/OTPW_60M_Backup/part-00004-tid-6865934896646357313-d0d2f795-a1f4-4e99-baf6-57bbbfdcabaa-137-1.c000.snappy.parquet",
]

# Read each part lazily, in order, and keep the per-part names available to other cells
df_list = [spark.read.parquet(part_path) for part_path in part_paths]
df_60m_part_0, df_60m_part_1, df_60m_part_2, df_60m_part_3, df_60m_part_4 = df_list

# Stack the parts (positional union) into the single DataFrame used downstream
df = reduce(DataFrame.union, df_list)

Loading data...


In [0]:
# Settings shared by every MLP candidate: vectorized feature column, label column, fixed seed
_mlp_shared = dict(
    featuresCol="final_features",
    labelCol="DEP_DELAY_BINNED",
    seed=42,
)

# `layers` lists layer sizes from input to output: every candidate takes 198 inputs and
# emits 3 classes.
# NOTE(review): 198 presumably equals the length of the final_features vector - verify
# against the preprocessing output whenever the feature set changes.

# Candidate 1: four hidden layers, longest training budget
mlp_1 = MultilayerPerceptronClassifier(
    layers=[198, 100, 50, 25, 10, 3],
    maxIter=50,
    **_mlp_shared,
)

# Candidate 2: five hidden layers, short training budget
mlp_2 = MultilayerPerceptronClassifier(
    layers=[198, 150, 75, 50, 25, 10, 3],
    maxIter=10,
    **_mlp_shared,
)

# Candidate 3: two hidden layers, short training budget, explicit block size
mlp_3 = MultilayerPerceptronClassifier(
    layers=[198, 50, 10, 3],
    maxIter=10,
    blockSize=64,
    **_mlp_shared,
)

# Multinomial logistic regression candidate
lr = LogisticRegression(
    featuresCol="final_features",  # vectorized feature column
    labelCol="DEP_DELAY_BINNED",   # label column
    family="multinomial",          # multiclass formulation
    maxIter=200,                   # iteration cap
    regParam=0,                    # no regularization by default
    elasticNetParam=0.0,           # penalty mix (1.0 = L1, 0.0 = L2)
    aggregationDepth=2,            # suggested depth for treeAggregate
    standardization=False,         # estimator-side standardization is turned OFF
                                   # (presumably features are scaled upstream - confirm)
    tol=1e-6,                      # convergence tolerance
    fitIntercept=True,             # fit an intercept term
)

In [0]:
# Hyperparameter grids, one per candidate estimator.
# NOTE(review): Spark documents MultilayerPerceptronClassifier.stepSize as applicable only
# to solver "gd"; the MLP estimators these grids reference do not set a solver, so the
# stepSize values below may have no effect - confirm before spending compute on them.

# Single-point grid for mlp_1
mlp_1_paramGrid = (
    ParamGridBuilder()
    .addGrid(mlp_1.stepSize, [0.05])
    .build()
)

# Three step sizes for mlp_2
mlp_2_paramGrid = (
    ParamGridBuilder()
    .addGrid(mlp_2.stepSize, [0.01, 0.05, 0.1])
    .build()
)

# Three step sizes for mlp_3
mlp_3_paramGrid = (
    ParamGridBuilder()
    .addGrid(mlp_3.stepSize, [0.01, 0.05, 0.1])
    .build()
)

# 2 x 3 grid over regularization strength and L1/L2 mix for logistic regression
lr_paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

In [0]:
# Baseline estimator and grid that can be fed to the workflow.
# NOTE: this rebinds `lr`, replacing the LogisticRegression created in an earlier cell.
# A grid built from that earlier instance (lr_paramGrid) refers to the earlier object's
# params, so pair each grid only with the estimator it was built from.

# Model type: logistic regression with default settings
lr = LogisticRegression(featuresCol="final_features", labelCol="DEP_DELAY_BINNED")

# Single-point grid: light L2 regularization
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.001])
    .addGrid(lr.elasticNetParam, [0.0])
    .build()
)

In [0]:
# Run the full workflow with the mlp_1 candidate, its grid, and upsampling of the training data
print("Starting flight delay prediction workflow...")
model_pipeline, metrics = run_flight_delay_prediction_workflow_mod(
    df,
    modelType=mlp_1,
    paramGrid=mlp_1_paramGrid,
    resample_method='upsample',
)

# Report the outcome
print("\nModel training and evaluation complete!")
print("Final evaluation metrics:", metrics)
print("\nThe model can now be used to predict flight delays.")

Starting flight delay prediction workflow...
Performing chronological train-test split...
Training data size: 25338495 rows
Test data size: 6334624 rows
Begin model hyperparameter tuning and training...
Beginning cross-validation for hyperparameter tuning...
Cross-validation using sample size: 11995803, partition count: 200
Preparing data for time-based cross-validation...
Applying non-scaling preprocessing steps...
Running DataPreprocessor...
Preprocessing 1 rows
Checkpointing 1 rows to checkpoint_data_preprocessed
Loading checkpoint: checkpoint_data_preprocessed
Running DateTimeProcessor...
Running SkyConditionsProcessor...
Running OutlierHandler...
The current column is DISTANCE, the number of rows dropped is 0, the current number of rows is 1
The current column is LATITUDE, the number of rows dropped is 0, the current number of rows is 1
The current column is LONGITUDE, the number of rows dropped is 0, the current number of rows is 1
The current column is ELEVATION, the number of r

Downloading artifacts:   0%|          | 0/45 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Running DataPreprocessor...
Preprocessing 11995803 rows
Checkpointing 11822718 rows to checkpoint_data_preprocessed
Loading checkpoint: checkpoint_data_preprocessed
Running DateTimeProcessor...
Running SkyConditionsProcessor...
Running OutlierHandler...
The current column is DISTANCE, the number of rows dropped is 19251, the current number of rows is 11803179
The current column is LATITUDE, the number of rows dropped is 12120, the current number of rows is 11791059
The current column is LONGITUDE, the number of rows dropped is 0, the current number of rows is 11791059
The current column is ELEVATION, the number of rows dropped is 866545, the current number of rows is 10924514
The current column is HourlyAltimeterSetting, the number of rows dropped is 455727, the current number of rows is 10468787
The current column is HourlyDewPointTemperature, the number of rows dropped is 8520, the current number of rows is 10460267
The current column is HourlyRelativeHumidity, the number of rows dro