# Flight Prediction Data Pipeline
This data pipeline code ingests the flight and weather data from blob storage and processes it so it is ready for training with a machine learning estimator. It transforms the data: one-hot encodes categorical features, normalizes numeric features, and imputes missing values with the mean for numeric features. It also engineers new features, such as airport PageRank, airport triangle count, and a lagged outcome variable. The code creates cross-validation folds for the training set and processes the full training and test datasets. In the data pipeline image below, this code completes the first half: stage 1 data cleaning, splitting the data into train and test datasets, and stage 2 data cleaning.

**Pipeline Image**
![Pipeline Image](https://i.imgur.com/wq62T0E.png)
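
As a rough illustration of the airport triangle count feature named in the overview, the sketch below counts triangles on a tiny, made-up route list in plain Python. The pipeline itself uses GraphFrames on Spark; the route list, the `triangle_counts` helper, and the choice to treat routes as undirected are assumptions made only for this example.

```python
from itertools import combinations

# Hypothetical route list: (origin, destination)
routes = [
    ("SFO", "LAX"), ("LAX", "SEA"), ("SEA", "SFO"),
    ("SFO", "JFK"), ("JFK", "LAX"), ("SEA", "ANC"),
]

def triangle_counts(edges):
    """Count, per airport, the triangles it belongs to in the undirected route graph."""
    neighbors = {}
    for a, b in edges:
        if a == b:
            continue  # ignore self-loops
        neighbors.setdefault(a, set()).add(b)
        neighbors.setdefault(b, set()).add(a)

    counts = {node: 0 for node in neighbors}
    for node, adj in neighbors.items():
        # A triangle through `node` exists when two of its neighbors are also connected
        for u, v in combinations(sorted(adj), 2):
            if v in neighbors[u]:
                counts[node] += 1
    return counts

counts = triangle_counts(routes)
print(counts)
```

An airport that sits in many triangles is connected to airports that are also connected to each other, which is one way to describe how central it is in the route network.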

### Project Description
This is a group project conducted for course w261: Machine Learning at Scale at the University of California, Berkeley in Summer 2023. This project develops a machine learning model that predicts flight delays based on historical flight, airport station, and weather data spanning the five years from 2015 to 2019 in the United States.

### Group members
Jessica Stockham, Chase Madison, Kisha Kim, Eric Danforth

In [0]:
import numpy as np
import re
import pandas as pd
from collections import namedtuple
from datetime import datetime, timedelta, date
import holidays

from pyspark.sql.types import *
from pyspark.sql.functions import udf, col,isnan,when,count, lag, to_timestamp, to_date, first, row_number
from pyspark.sql import Window
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler,StandardScaler, Imputer, Bucketizer
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

from sklearn.metrics import confusion_matrix

from graphframes import GraphFrame
from hyperopt import fmin, tpe, Trials, SparkTrials, hp


In [0]:
## Place this cell in any team notebook that needs access to the team cloud storage
mids261_mount_path = '/mnt/mids-w261'  # 261 course blob storage is mounted here
secret_scope = 'sec5-team1-scope'  # Name of the secret scope Chase created in Databricks CLI
secret_key = 'sec5-team1-key'  # Name of the secret key Chase created in Databricks CLI
storage_account = 'sec5team1storage'  # Name of the Azure Storage Account Chase created
blob_container = 'sec5-team1-container'  # Name of the container Chase created in Azure Storage Account
team_blob_url = f'wasbs://{blob_container}@{storage_account}.blob.core.windows.net'  # Points to the root of your team storage bucket
spark.conf.set(  # SAS Token: Grant the team limited access to Azure Storage resources
  f'fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net',
  dbutils.secrets.get(scope=secret_scope, key=secret_key)
)

### Cross Validation Folds
The training set is further divided into folds, each containing a training dataframe and a validation dataframe, using the Block Splits cross-validation method. The folds do not overlap. Within each fold, the first chunk of days is used for training and the next chunk of days is used for validation.

This arrangement also ensures that our validation folds are representative of different days of year and months, which is important because we have temporal features that we want to evaluate.
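
To make the point about calendar coverage concrete, here is a small standalone sketch that recomputes the block boundaries with the standard library and lists the month in which each validation window starts. It uses the 200-day and 92-day widths from the run configuration in this notebook; the `block_windows` helper is a hypothetical stand-in for `make_block_splits()`, not the course-provided code.

```python
from datetime import date, timedelta

def block_windows(min_date, max_date, train_width, val_width):
    """Yield (train_start, train_cut, val_cut) for consecutive, non-overlapping blocks."""
    start = min_date
    while True:
        train_cut = start + train_width
        val_cut = train_cut + val_width
        if val_cut > max_date:
            break
        yield start, train_cut, val_cut
        start = val_cut  # the next block begins where this validation window ends

windows = list(block_windows(
    date(2015, 1, 1), date(2018, 12, 31),
    timedelta(days=200), timedelta(days=92),
))

for i, (start, train_cut, val_cut) in enumerate(windows):
    print(f"Fold {i}: train {start} <= x < {train_cut}, val {train_cut} <= x < {val_cut}")

# Month in which each validation window begins
val_start_months = [train_cut.month for _, train_cut, _ in windows]
print(val_start_months)
```

Because each block spans 292 days rather than a full year, the validation windows drift through the calendar instead of always landing on the same season.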

In [0]:
# Citation: make_block_splits() code provided by Course Instructor as part of a lab demonstration

BlockSplit = namedtuple("BlockSplit", ["min_date", "train_cut", "val_cut"])

def make_block_splits(min_date, max_date, train_width, val_width):
    """
    Return a list of breakpoints for building the train and validation folds.

    Each item in the list contains a namedtuple of dates: (min_date, train_cut, val_cut).
    The folds can be generated using these cuts as follows: 
    
    * train data is (min_date <= flight_date < train_cut)
    * val data is (train_cut <= flight_date < val_cut)

    Args:
        min_date - the first date to include in the first fold
        max_date - the validation cut of the last fold (i.e. this is the first day
            of the test set and is not included in the folds)
        train_width - the timespan covered by each training set
        val_width - the timespan covered by each validation set

    Returns:
        a list of ``BlockSplit`` instances
    """
    blocks = list()
    train_min = min_date
    val_cut = min_date
    
    # Loop over and create blocks
    while True:
        train_cut = train_min + train_width
        val_cut = train_cut + val_width
        if train_cut > max_date or val_cut > max_date:
            break
        blocks.append(BlockSplit(train_min, train_cut, val_cut))
        train_min = val_cut
    
    return blocks

def make_folds(df, date_col, splits):
    """
    Make folds using the specified block splits.

    Args:
        df - the dataframe to make folds from
        date_col - the name of a date column to split on
        splits - a list of ``BlockSplit`` instances
    
    Returns:
        a list of (train_df, val_df) tuples
    """
    folds = list()
    for split in splits:
        train_df = df.filter((df[date_col] >= split.min_date) & (df[date_col] < split.train_cut))
        val_df = df.filter((df[date_col] >= split.train_cut) & (df[date_col] < split.val_cut))
        folds.append((train_df, val_df))
    return folds

def save_folds_to_blob(folds, blob_url, fold_name):
    for i, (train_df, val_df) in enumerate(folds):
        dbutils.fs.rm(f"{blob_url}/{fold_name}/train_{i}_df", recurse=True)
        dbutils.fs.rm(f"{blob_url}/{fold_name}/val_{i}_df", recurse=True)
        train_df.write.parquet(f"{blob_url}/{fold_name}/train_{i}_df")
        val_df.write.parquet(f"{blob_url}/{fold_name}/val_{i}_df")

def load_folds_from_blob_and_cache(blob_url, fold_name):
    folds = list()
    DEFAULT_PARTITION_COUNT = 36

    # Compute the fold count
    files = dbutils.fs.ls(f"{blob_url}/{fold_name}")
    fold_names = [f.name for f in files if f.name.startswith("train")]
    # Take the largest index numerically; a lexicographic sort would place train_10 before train_2
    fold_count = max(int(re.match(r"train_(\d+)_df", name).group(1)) for name in fold_names) + 1
    print(f"Loading {fold_count} folds...")

    # Load folds
    for i in range(fold_count):
        train_df = (
            spark.read.parquet(f"{blob_url}/{fold_name}/train_{i}_df")
            .repartition(DEFAULT_PARTITION_COUNT)
            .cache()
        )
        val_df = (
            spark.read.parquet(f"{blob_url}/{fold_name}/val_{i}_df")
            .repartition(DEFAULT_PARTITION_COUNT)
            .cache()
        )
        folds.append((train_df, val_df))
    return folds

**Save Folds Data**

Save the folds (each fold contains a training dataframe and a validation dataframe) to blob storage, then load them back. This is a recovery point if we need to rerun part of the pipeline.

In [0]:
# Citation: Code written by Jessica Stockham

def foldsWork(i, train_df, val_df):

    print(f'CV FOLD START: {i}: {datetime.now()}')

    display(train_df.count())
    display(val_df.count())

    # Downsample Training Set
    train_downsampled = downsample(train_df)

    display(train_downsampled.count())

    print(f'Done with downsampling: {datetime.now()}')

    # Add graph features to each fold
    df_train_graph_downsampled, df_test_graph = get_graph_features(train_downsampled, val_df)      

    print(f'Done with graph features: {datetime.now()}') 

    # CV Pipeline
    pipelineModel, cv_features = cvPipeline(df_train_graph_downsampled)
    train_split = pipelineModel.transform(df_train_graph_downsampled)
    test_split = pipelineModel.transform(df_test_graph)

    print(f'Done with CV Pipeline: {datetime.now()}')
    print(f'cv_features: {cv_features}')
                
    features_all = features + cv_features
    
    print(f'features_all: {features_all}')
    assembler = VectorAssembler(inputCols=features_all, outputCol="features")           
    train = assembler.transform(train_split)
    test = assembler.transform(test_split)

    display(train.count())
    display(test.count())

    print(f'Done with Assembling Features: {datetime.now()}')

    return train, test


def make_folds_and_save(df_in, splits, fold_name="folds"):
    """
    Create training folds and save to blob storage.

    Args:
        df_in - the dataframe to split into folds
        splits - the block splits as returned by make_block_splits()
    """

    global features

    if input(f"Are you sure you want to overwrite '{fold_name}'? (y/n)").lower().strip() == "y":
        print("Making train/val folds...")
        df = df_in
        folds = make_folds(df, "FL_DATE", splits)

        folds_transformed = []

        # Apply data processing within each fold to avoid data leakage
        for i, (train_df, val_df) in enumerate(folds):

            train, test = foldsWork(i, train_df, val_df)

            folds_transformed.append((train, test))

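        # Save once after every fold is transformed; saving inside the loop rewrote all earlier folds on each pass
        save_folds_to_blob(folds_transformed, team_blob_url, fold_name)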
        return folds_transformed
    else:
        print("Cancelled")
    

### Clean Data to Prepare for Training
Clean and process the full dataset: drop canceled flights, one-hot encode categorical variables, and create new features.
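
One of the new features is the departure delay of the same aircraft's previous flight. The sketch below shows the idea in plain Python on a few made-up rows; the pipeline does the same thing with a Spark window partitioned by `TAIL_NUM` and ordered by scheduled departure time. The sample flights and the `add_lagged_delay` helper are assumptions for illustration only.

```python
from collections import defaultdict

# Hypothetical toy flights: (tail_num, scheduled_departure_utc, dep_delay_minutes)
flights = [
    ("N101", "2015-01-01T08:00", 5.0),
    ("N202", "2015-01-01T09:00", 0.0),
    ("N101", "2015-01-01T12:00", 30.0),
    ("N101", "2015-01-01T17:00", 12.0),
    ("N202", "2015-01-01T15:00", 45.0),
]

def add_lagged_delay(rows, default=0.0):
    """Attach each aircraft's previous departure delay to its next flight."""
    by_tail = defaultdict(list)
    for tail, sched, delay in rows:
        by_tail[tail].append((sched, delay))

    out = []
    for tail, legs in by_tail.items():
        legs.sort()  # ISO-8601 strings sort chronologically
        prev = default  # the first flight in a sequence has no prior delay
        for sched, delay in legs:
            out.append({"tail": tail, "sched": sched,
                        "dep_delay": delay, "dep_delay_lag": prev})
            prev = delay
    return out

lagged = add_lagged_delay(flights)
for row in lagged:
    print(row)
```

As in the pipeline, the first flight of each aircraft falls back to a lag of 0 because there is no earlier flight to look at.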

In [0]:
# Citation: Cleaning code in this block was written collaboratively by Chase Madison, Kisha Kim, and Jessica Stockham.

# HOLIDAY PROCESSING

# Create a broadcast variable for a list of US holidays
us_holidays = spark.sparkContext.broadcast(
    holidays.US(years=list(range(2015, 2021)))
)

# Define a UDF that uses the broadcast variable to check if a given date is a US holiday 
def is_holiday(date):
    return date in us_holidays.value

# Define a UDF to check if a given date is adjacent to a holiday
def is_holiday_adjacent(date):
    day_before = date - timedelta(days=1)
    day_after = date + timedelta(days=1)
    return is_holiday(day_before) or is_holiday(day_after)
    
def timeFeatures(df_otpw):
    
    ## KISHA CODE

    ## Avg flight delay in minutes from previous day

    df_otpw.createOrReplaceTempView("my_temp_view")
    # WHERE OP_CARRIER_FL_NUM==1
    sql_query1 = "SELECT DISTINCT OP_CARRIER_FL_NUM, FL_DATE, sum(DEP_DELAY) over (PARTITION BY OP_CARRIER_FL_NUM ORDER BY FL_DATE) AS CURRENT_DAY_DELAY, COUNT(DEP_DELAY) over (PARTITION BY OP_CARRIER_FL_NUM ORDER BY FL_DATE) AS NUM_FLIGHT_TODAY FROM my_temp_view ORDER BY 1,2"
    result_df1 = spark.sql(sql_query1)
    result_df1.createOrReplaceTempView("my_temp_view2")

    sql_query2 = "SELECT DISTINCT OP_CARRIER_FL_NUM, FL_DATE, CURRENT_DAY_DELAY, NUM_FLIGHT_TODAY, CURRENT_DAY_DELAY/NUM_FLIGHT_TODAY as DAILY_AVG_DEP_DELAY FROM my_temp_view2"
    result_df2 = spark.sql(sql_query2)
    result_df2.createOrReplaceTempView("my_temp_view3")

    sql_query3 = "SELECT OP_CARRIER_FL_NUM, FL_DATE, CURRENT_DAY_DELAY, NUM_FLIGHT_TODAY, DAILY_AVG_DEP_DELAY, LAG(DAILY_AVG_DEP_DELAY, 1) OVER (PARTITION BY OP_CARRIER_FL_NUM ORDER BY FL_DATE) AS PRIOR_DAILY_AVG_DEP_DELAY FROM my_temp_view3 ORDER BY 1,2"
    result_df3 = spark.sql(sql_query3)
    # Show the result DataFrame

    # display(result_df3.limit(50).toPandas())
    df_avg_delay_by_flightnum = result_df3.fillna(0, subset="PRIOR_DAILY_AVG_DEP_DELAY")

    # Rename multiple columns
    df_avg_delay_by_flightnum = df_avg_delay_by_flightnum.withColumnRenamed(
        "OP_CARRIER_FL_NUM", "FLIGHT_NUM"
    ).withColumnRenamed("FL_DATE", "FLIGHT_DATE")

    #Joining to main table 
    df_otpw = df_otpw.join(
            df_avg_delay_by_flightnum.withColumnRenamed('DAILY_AVG_DEP_DELAY', 'FE_DAILY_AVG_DEP_DELAY') \
                .withColumnRenamed('PRIOR_DAILY_AVG_DEP_DELAY', 'FE_PRIOR_DAILY_AVG_DEP_DELAY'),
            on=((df_otpw["OP_CARRIER_FL_NUM"] == df_avg_delay_by_flightnum["FLIGHT_NUM"]) & (df_otpw["FL_DATE"] == df_avg_delay_by_flightnum["FLIGHT_DATE"]) ) ,
            how="left"
        ) .drop("FLIGHT_NUM", "FLIGHT_DATE")
    
    ## Avg flight duration from previous day
    df_otpw.createOrReplaceTempView("my_temp_view")
    # WHERE OP_CARRIER_FL_NUM==1
    a_sql_query1 = "SELECT DISTINCT OP_CARRIER_FL_NUM, FL_DATE, sum(AIR_TIME) over (PARTITION BY OP_CARRIER_FL_NUM ORDER BY FL_DATE) AS SUM_DURATION_BY_FLIGHT, COUNT(AIR_TIME) over (PARTITION BY OP_CARRIER_FL_NUM ORDER BY FL_DATE) AS NUM_FLIGHT_TODAY FROM my_temp_view ORDER BY 1,2"
    a_result_df1 = spark.sql(a_sql_query1)
    a_result_df1.createOrReplaceTempView("my_temp_view2")

    a_sql_query2 = "SELECT OP_CARRIER_FL_NUM, FL_DATE, SUM_DURATION_BY_FLIGHT, NUM_FLIGHT_TODAY, SUM_DURATION_BY_FLIGHT/NUM_FLIGHT_TODAY as AVG_DURATION FROM my_temp_view2"
    a_result_df2 = spark.sql(a_sql_query2)
    a_result_df2.createOrReplaceTempView("my_temp_view3")

    a_sql_query3 = "SELECT OP_CARRIER_FL_NUM, FL_DATE, SUM_DURATION_BY_FLIGHT, NUM_FLIGHT_TODAY, AVG_DURATION, LAG(AVG_DURATION, 1) OVER (PARTITION BY OP_CARRIER_FL_NUM ORDER BY FL_DATE) AS PRIOR_AVG_DURATION FROM my_temp_view3 ORDER BY 1,2"
    a_result_df3 = spark.sql(a_sql_query3)
    df_avg_flight_duration = a_result_df3.fillna(0, subset="PRIOR_AVG_DURATION")

    # Rename multiple columns
    df_avg_flight_duration = df_avg_flight_duration.withColumnRenamed(
        "OP_CARRIER_FL_NUM", "FLIGHT_NUM"
    ).withColumnRenamed("FL_DATE", "FLIGHT_DATE")

    display(df_avg_flight_duration.limit(1000).toPandas())
    keep = ["FLIGHT_NUM", "FLIGHT_DATE", "AVG_DURATION", "PRIOR_AVG_DURATION"]
    df_avg_flight_duration = df_avg_flight_duration.select(keep)

    # join back
    df_otpw = df_otpw.join(
        df_avg_flight_duration.withColumnRenamed('AVG_DURATION', 'FE_AVG_DURATION') \
            .withColumnRenamed('PRIOR_AVG_DURATION', 'FE_PRIOR_AVG_DURATION'),
        on=((df_otpw["OP_CARRIER_FL_NUM"] == df_avg_flight_duration["FLIGHT_NUM"]) & (df_otpw["FL_DATE"] == df_avg_flight_duration["FLIGHT_DATE"]) ) ,
        how="left"
    ) .drop("FLIGHT_NUM", "FLIGHT_DATE")

    ## Number of Flight Scheduled

    df_otpw.createOrReplaceTempView("my_temp_view")
    b_sql_query1 = "SELECT DISTINCT FL_DATE, ORIGIN, DEP_TIME_BLK, COUNT(OP_CARRIER_FL_NUM) over (PARTITION BY ORIGIN, DEP_TIME_BLK ORDER BY FL_DATE) AS FLIGHT_SCHEDULE FROM my_temp_view ORDER BY 1,2"
    b_result_df1 = spark.sql(b_sql_query1)

    df_num_scheduled_flight = b_result_df1.fillna(0, subset="FLIGHT_SCHEDULE")

    # Rename multiple columns
    df_num_scheduled_flight = df_num_scheduled_flight.withColumnRenamed(
        "ORIGIN", "ORIGIN_AIRPORT").withColumnRenamed("FL_DATE", "FLIGHT_DATE").withColumnRenamed("DEP_TIME_BLK", "Departure_Time_Block")
    #display(df_num_scheduled_flight.limit(1000).toPandas())

    #Joining to main table 
    df_otpw = df_otpw.join(
            df_num_scheduled_flight.withColumnRenamed('FLIGHT_SCHEDULE', 'FE_NUM_FLIGHT_SCHEDULED'),
            on=((df_otpw["ORIGIN"] == df_num_scheduled_flight["ORIGIN_AIRPORT"]) & (df_otpw["FL_DATE"] == df_num_scheduled_flight["FLIGHT_DATE"]) & (df_otpw["DEP_TIME_BLK"] == df_num_scheduled_flight["Departure_Time_Block"])) ,
            how="left"
        ) .drop("ORIGIN_AIRPORT", "FLIGHT_DATE","Departure_Time_Block")

    return df_otpw

# Prior Flight Delay in Minutes
def get_flights_lagged_delay(df):
    """Adds the departure delay of the same aircraft's immediately preceding flight."""
    # Partition the data by tail number, then order by datetime of flight in UTC
    window_spec = Window.partitionBy('TAIL_NUM').orderBy('sched_depart_date_time_UTC')
    # Create the new column 'DEP_DELAY_LAG' using the lag function, and set the default value to 0 for the first flight in the sequence.
    df_with_lag = df.withColumn('DEP_DELAY_LAG', lag(col('DEP_DELAY')).over(window_spec)) \
        .na.fill({'DEP_DELAY_LAG': 0})
    return df_with_lag

# First Flight Indicator Variable
def get_first_flight_of_day(df):
    """Value of 1 indicates that it's the plane's first flight of the day"""
    # Create the window specification to partition by TAIL_NUM and order by sched_depart_date_time_UTC
    window_spec = Window.partitionBy('TAIL_NUM', 'FL_DATE').orderBy('sched_depart_date_time_UTC')
    # Use row_number() to assign a row number to each record within the partition
    df_with_first_flights = df.withColumn('row_number', row_number().over(window_spec)) \
        .withColumn('IS_FIRST_FLIGHT_OF_DAY', col('row_number') == 1) \
        .drop('row_number')

    # Cast boolean to double for ingestion in MLlib
    df_with_first_flights = df_with_first_flights\
        .withColumn("IS_FIRST_FLIGHT_OF_DAY_double", col("IS_FIRST_FLIGHT_OF_DAY").cast("integer").cast("double"))
    
    return df_with_first_flights

# DATA CLEANING PIPELINE: ALL DATA
def pipelineAllData(df):

    """Pipeline steps we apply uniformly to all training and held-out data. Only run this once
    """

    global features
    global numericFeatures
    global categoricalFeatures
    global ordinalFeatures
    global bucketFeatures
    global binaryfeatures
    global processingFeatures
    global labelCol

    # ADD NEW FEATURES

    # Cast Data to Needed Data Types
    cast_df = df\
        .withColumn('DEP_DEL15',col('DEP_DEL15').cast(DoubleType()))\
        .withColumn('DEP_DELAY',col('DEP_DELAY').cast(DoubleType()))\
        .withColumn('AIR_TIME',col('AIR_TIME').cast(DoubleType()))\
        .withColumn('DISTANCE',col('DISTANCE').cast(DoubleType()))\
        .withColumn('ELEVATION',col('ELEVATION').cast(DoubleType()))\
        .withColumn('CRS_DEP_TIME',col('CRS_DEP_TIME').cast(IntegerType()))\
        .withColumn("DATE",to_timestamp("DATE"))\
        .withColumn("FL_DATE",to_date("FL_DATE"))\
        .withColumn("sched_depart_date_time_UTC",to_timestamp("sched_depart_date_time_UTC"))\
        .withColumn('OP_CARRIER_FL_NUM',col('OP_CARRIER_FL_NUM').cast(IntegerType()))\
        .withColumn('MONTH',col('MONTH').cast(IntegerType()))\
        .withColumn('YEAR',col('YEAR').cast(IntegerType()))\
        .withColumn('DAY_OF_WEEK',col('DAY_OF_WEEK').cast(IntegerType()))\
        .withColumn('CANCELLED',col('CANCELLED').cast(IntegerType()))

    print("count before drop")
    display(cast_df.count())

    # Drop canceled flights
    df_dropped = cast_df.where(col('CANCELLED') == 0).drop('CANCELLED')
    print(f"count after drop")
    display(df_dropped.count())

    # Add First Flight Indicator [categorical feature]
    df_firstFlight = get_first_flight_of_day(df_dropped)

    # Add lagged Departure Delay (Prior Flight) [numeric feature]
    df_lag_delay = get_flights_lagged_delay(df_firstFlight)

    # Add Time features (numeric) [numeric features]
    time_df = timeFeatures(df_lag_delay)

    # Add Holiday Features (binary 0/1) [categorical features]

    udf_is_holiday = udf(is_holiday, BooleanType())
    udf_is_holiday_adjacent = udf(is_holiday_adjacent, BooleanType())
    holiday_df = time_df.withColumn('is_holiday', udf_is_holiday('FL_DATE')).withColumn('is_holiday_adjacent', udf_is_holiday_adjacent(col('FL_DATE')))

    holiday_df = holiday_df.withColumn("is_holiday_double",holiday_df.is_holiday.cast("integer").cast("double"))
    holiday_df = holiday_df.withColumn("is_holiday_adjacent_double",holiday_df.is_holiday_adjacent.cast("integer").cast("double"))

    # Create a Missing Category for 

     # Create column "label" to represent our outcome variable
    labeled_df = holiday_df.withColumn("label", col(labelCol))

    print(f"count after label")
    display(labeled_df.count())

    #Filter to just the features we need for training
    slim_df = labeled_df.na.drop(subset=["DEP_DEL15"]).select(['label'] + numericFeatures + categoricalFeatures_orig + processingFeatures)

    slim_df.printSchema()  # printSchema() prints directly and returns None, so display() is not needed

    ### PIPELINE (one-hot encode categorical) ###
    stages = []

    # Departure Time Buckets
    bucketizer = Bucketizer(splits=[ 0, 900, 1200, 1600, 2000, 2359],inputCol="CRS_DEP_TIME", outputCol="CRS_DEP_BUCKET")
    stages += [bucketizer]

    # Categorical: StringIndex and One-Hot Encode
    ix_output_cols = [f"{c}_ix" for c in categoricalFeatures_plus_bin]
    cat_ix = StringIndexer(inputCols=categoricalFeatures_plus_bin, outputCols=ix_output_cols, handleInvalid="keep")
    hot_output_cols = [f"{c}_hot" for c in categoricalFeatures_plus_bin]
    hot_ix = OneHotEncoder(inputCols=ix_output_cols, outputCols=hot_output_cols, handleInvalid="keep")
    stages += [cat_ix, hot_ix]
    features += hot_output_cols

    # Create Pipeline
    pipeline = Pipeline(stages = stages)
    
    # Fit the Pipeline to Dataset
    pipelineModel = pipeline.fit(slim_df)
        
    return pipelineModel, slim_df
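
The departure-time bucketing in the cell above can be mimicked without Spark to see which bucket a given `CRS_DEP_TIME` lands in. This is a minimal sketch, assuming the usual Bucketizer convention that each bucket is closed on the left and open on the right, except the last bucket, which also includes its upper bound. The `dep_time_bucket` helper is hypothetical.

```python
from bisect import bisect_right

SPLITS = [0, 900, 1200, 1600, 2000, 2359]  # same cut points as the Bucketizer stage

def dep_time_bucket(hhmm):
    """Return the bucket index (0-4) for an HHMM scheduled departure time."""
    if not SPLITS[0] <= hhmm <= SPLITS[-1]:
        raise ValueError(f"{hhmm} is outside [{SPLITS[0]}, {SPLITS[-1]}]")
    if hhmm == SPLITS[-1]:
        return len(SPLITS) - 2  # the last bucket is closed on the right
    return bisect_right(SPLITS, hhmm) - 1

for t in (0, 859, 900, 1559, 2000, 2359):
    print(t, "->", dep_time_bucket(t))
```

With these cut points, early-morning departures (before 09:00) share bucket 0 and everything from 20:00 onward shares bucket 4.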

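### Apply Meta-Data from Training Dataset to Validation or Test Dataset
Create graph features, normalize numeric features, and impute missing values for numeric features. Each of these is fit on the training data only and then applied to the validation or test data.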
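
The key idea is that the imputation mean and the scaling parameters come from the training split and are reused, unchanged, on the validation split. Here is a minimal plain-Python sketch of that pattern for a single numeric column. The sample values and both helper functions are hypothetical, and the use of the sample standard deviation is an assumption of this sketch.

```python
from statistics import mean, stdev

def fit_numeric_stats(train_values):
    """Learn the imputation mean and scaling parameters from training data only."""
    observed = [v for v in train_values if v is not None]
    fill = mean(observed)
    imputed = [fill if v is None else v for v in train_values]
    return {"fill": fill, "mean": mean(imputed), "std": stdev(imputed)}

def apply_numeric_stats(values, stats):
    """Impute and standardize any split using statistics learned on the training split."""
    return [
        ((stats["fill"] if v is None else v) - stats["mean"]) / stats["std"]
        for v in values
    ]

train_distance = [100.0, 200.0, None, 300.0]  # hypothetical training values
val_distance = [None, 400.0]                  # hypothetical validation values

stats = fit_numeric_stats(train_distance)
train_scaled = apply_numeric_stats(train_distance, stats)
val_scaled = apply_numeric_stats(val_distance, stats)

print(stats)
print(val_scaled)
```

A missing validation value is filled with the training mean, so it maps to 0 after scaling. No statistic is ever computed from the validation rows, which is what keeps the fold free of leakage.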

In [0]:
# Citation: Cleaning code in this block was written collaboratively by Chase Madison and Jessica Stockham
# Chase wrote get_graph_features() and downsample(). Jessica wrote cvPipeline()

flight_composite_key = [
    'OP_UNIQUE_CARRIER',
    'OP_CARRIER_FL_NUM',
    'origin_iata_code',
    'dest_iata_code',
    'FL_DATE',
    'TAIL_NUM',
    'CRS_DEP_TIME']

def get_graph_features(df_train, df_test):
    """
    Takes as input the train and test Spark dataframes.
    Calculates PageRank and Triangle Count on the train dataframe only (LPA is currently disabled).
    Returns both dataframes joined with the graph features twice: once on ORIGIN and once on DEST.
    """
    ### 1. Create a GraphFrame ###
    vertices = df_train.select(col('ORIGIN').alias('id')) \
        .distinct() \
        .union(df_train.select(col('DEST').alias('id')).distinct()) \
        .distinct()
    edges = df_train.select(
        col('ORIGIN').alias('src'), 
        col('DEST').alias('dst'), 
        'OP_UNIQUE_CARRIER', 'OP_CARRIER_FL_NUM', 'TAIL_NUM'
    )
    f = GraphFrame(vertices, edges)
    ### 2. Compute Graph Algorithms ###
    # Page Rank
    page_rank = f.pageRank(maxIter=10, resetProbability=0.15).vertices \
                .withColumnRenamed("id", "airport_iata_code") \
                .withColumnRenamed("pagerank", "pagerank")
    # Triangle Count
    triangle_count = f.triangleCount() \
             .withColumnRenamed("id", "airport_iata_code") \
             .withColumnRenamed("count", "triangle_count")
    # # Label Propagation Algorithm (LPA)
    # lpa = f.labelPropagation(maxIter=4) \
    #    .withColumnRenamed("id", "airport_iata_code") \
    #    .withColumnRenamed("label", "lpa_label")
    graph_features = page_rank.join(triangle_count, on="airport_iata_code") \
       # .join(lpa, on="airport_iata_code")

    ### 3. Join Back to DataFrames on Origin and Dest Airports ###
    # Join to Training Set
    df_train = df_train.join(
        graph_features.withColumnRenamed('pagerank', 'pagerank_origin') \
            .withColumnRenamed('triangle_count', 'triangle_count_origin'), \
            # .withColumnRenamed('lpa_label', 'lpa_label_origin'),
        on=df_train["ORIGIN"] == graph_features["airport_iata_code"],
        how="left_outer"
    ).drop('airport_iata_code') \
    .join(
        graph_features.withColumnRenamed('pagerank', 'pagerank_dest') \
            .withColumnRenamed('triangle_count', 'triangle_count_dest'), \
            # .withColumnRenamed('lpa_label', 'lpa_label_dest'),
        on=df_train["DEST"] == graph_features["airport_iata_code"],
        how="left_outer"
    ).drop('airport_iata_code')
    
    # Join to Validation Set
    df_test = df_test.join(
        graph_features.withColumnRenamed('pagerank', 'pagerank_origin') \
            .withColumnRenamed('triangle_count', 'triangle_count_origin'), \
            # .withColumnRenamed('lpa_label', 'lpa_label_origin'),
        on=df_test["ORIGIN"] == graph_features["airport_iata_code"],
        how="left_outer"
    ).drop('airport_iata_code') \
    .join(
        graph_features.withColumnRenamed('pagerank', 'pagerank_dest') \
            .withColumnRenamed('triangle_count', 'triangle_count_dest'), \
            # .withColumnRenamed('lpa_label', 'lpa_label_dest'),
        on=df_test["DEST"] == graph_features["airport_iata_code"],
        how="left_outer"
    ).drop('airport_iata_code')

    return df_train, df_test
    
def downsample(df_train):
    """Takes the training set and downsamples to return a balanced set based on DEP_DEL15"""
    # Count the rows in the training set
    num_flights = df_train.count()
    # Filter down to just the delayed flights
    df_flights_delayed = df_train.where(col('label') == 1).cache()
    # Count the number of delayed flights
    num_flights_delayed = df_flights_delayed.count()
    # Sample a little more than we need, then limit to the number of delayed flights and combine
    flights_downsampled = df_train.where(col('label') == 0) \
        .sample(
            withReplacement=False,
            fraction=1.1 * num_flights_delayed / (num_flights - num_flights_delayed),
            seed=261
        ) \
        .limit(num_flights_delayed) \
        .union(df_flights_delayed)
    return flights_downsampled


def cvPipeline(df_train_cv):
    """Pipeline steps we apply individually to each CV split.
    1. Impute missing for numeric values
    2. Normalize numeric data    
    """

    stages = []

    # cv_features are features transformed for this particular CV split
    cv_features = []

    # Numeric: Impute missing: mean (faster for big data)
    imputer = Imputer(
        strategy='mean',inputCols = numeric_all_features,
        outputCols=["{}_imputed".format(c) for c in numeric_all_features]
    )
    stages += [imputer]

    # Numeric: Scale the features (first assemble)
    num_assembler = VectorAssembler(
        inputCols=[c + "_imputed" for c in numeric_all_features], 
        outputCol="vectorized_numeric_features")
    
    stages += [num_assembler]
    
    scaler = StandardScaler(
        inputCol="vectorized_numeric_features", withStd=True, withMean=True, 
        outputCol="scaled_numeric")
    
    stages += [scaler]
    
    cv_features.append("scaled_numeric")

    # Create Pipeline
    pipeline = Pipeline(stages = stages)
    
    # Fit the Pipeline to Dataset
    pipelineModel = pipeline.fit(df_train_cv)
    
    return pipelineModel, cv_features
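
The class-balancing step in `downsample()` can be sketched on an in-memory list. The Spark version samples slightly more on-time flights than needed (a fraction of 1.1 times delayed over on-time) and then limits to the number of delayed flights. The sketch below takes an exact-size random sample instead, which is a simplification, and the `downsample_majority` helper and toy rows are hypothetical.

```python
import random

def downsample_majority(rows, label_key="label", seed=261):
    """Keep every delayed row plus an equal-sized random sample of on-time rows."""
    delayed = [r for r in rows if r[label_key] == 1]
    on_time = [r for r in rows if r[label_key] == 0]
    rng = random.Random(seed)  # fixed seed so the sample is repeatable
    kept = rng.sample(on_time, k=min(len(delayed), len(on_time)))
    return kept + delayed

# Hypothetical toy training set: 80 on-time flights, 20 delayed flights
rows = [{"id": i, "label": 0} for i in range(80)] + \
       [{"id": 80 + i, "label": 1} for i in range(20)]

balanced = downsample_majority(rows)

# Fraction the Spark version would pass to sample() for the same counts
spark_fraction = 1.1 * 20 / (100 - 20)

print(len(balanced), spark_fraction)
```

Only the training split of each fold is balanced this way; the validation split keeps its natural class mix.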

In [0]:
# STORE A GLOBAL FINAL FEATURES LIST
def featuresList(features):
    '''Store features list'''
    features_df = pd.DataFrame(features)
    features_spark_df=spark.createDataFrame(features_df)

    features_spark_df.write.format("csv").mode("overwrite")\
        .option("path", (f"{team_blob_url}/features"))\
        .save()
    print(features)

    #Read features from disk
    features_in = spark.read.format("csv")\
        .option("path", (f"{team_blob_url}/features"))\
        .load()
    features_in.show()
    return

### Run Data Pipeline
This code runs the functions defined above.

In [0]:
# Citation: Code written by Jessica Stockham

############## 0. PIPELINE SETUP ####################

## PARAMETERS WE SET

# # Dataset Setting
DEFAULT_PARTITION_COUNT = 36
timeInterval = '60mo'  
#timeInterval = '3mo'


# Splits for 60 Months
#5 years

# Determine split points: 5 folds.
splits = make_block_splits(
    min_date = date(2015,1,1), 
    max_date = date(2018,12,31), 
    train_width = timedelta(days=200),
    val_width = timedelta(days=92),
)

# Splits for 3 Months

# # Determine split points (creates 3 folds)
# splits = make_block_splits(
#     min_date = date(2015,1,1), 
#     max_date = date(2015,3,31), 
#     train_width = timedelta(days=20),
#     val_width = timedelta(days=9),
# )

for i, split in enumerate(splits):
    print(f"Fold {i} Splits:")
    print(f"  Train: {split.min_date} <= x < {split.train_cut}")
    print(f"    Val: {split.train_cut} <= x < {split.val_cut}")

# Features
labelCol = 'DEP_DEL15'

graph_features_numeric = [
    'pagerank_origin',       # Numeric float value typically ranging from 0-10
    'pagerank_dest', 
    'triangle_count_origin', # Numeric integer values typically ranging from 0-500
    'triangle_count_dest'
    ]

# Special Feature Categories 
time_features = ['FE_PRIOR_DAILY_AVG_DEP_DELAY', 'FE_PRIOR_AVG_DURATION', 'FE_NUM_FLIGHT_SCHEDULED', "DEP_DELAY_LAG"]
binaryfeatures = ["is_holiday_double", "is_holiday_adjacent_double", "IS_FIRST_FLIGHT_OF_DAY_double"]
bucketFeatures = ["CRS_DEP_BUCKET"] # Result of binning CRS_DEP_TIME
processingFeatures = ["DATE", "FL_DATE", "OP_CARRIER_FL_NUM", "DEP_DELAY", "AIR_TIME", "DEP_TIME_BLK", 'origin_iata_code', 'dest_iata_code', 'TAIL_NUM', 'sched_depart_date_time_UTC', "CRS_DEP_TIME", "ORIGIN", "DEST"]

# Cleaning Stage 1 Numeric.
numericFeatures = ["DISTANCE", "ELEVATION"] + time_features 

# Cleaning Stage 1: Categorical Features Within Original Dataset. 
categoricalFeatures_orig = ["DAY_OF_WEEK", "MONTH", "YEAR", "OP_UNIQUE_CARRIER",
 "origin_type", "dest_type"] + binaryfeatures

# Categorical with the newly created "CRS_DEP_BUCKET" feature: Will one-hot encode
categoricalFeatures_plus_bin = categoricalFeatures_orig + bucketFeatures

# Cleaning Stage 2 Numeric: CV Fold Processing Category: impute missing, standardize. 
# Graph features are added to the numeric list because we create the graph features on each fold
# (time=4, graph=4, elevation and distance=2: 10 scaled features)
numeric_all_features = numericFeatures + graph_features_numeric

Fold 0 Splits:
  Train: 2015-01-01 <= x < 2015-07-20
    Val: 2015-07-20 <= x < 2015-10-20
Fold 1 Splits:
  Train: 2015-10-20 <= x < 2016-05-07
    Val: 2016-05-07 <= x < 2016-08-07
Fold 2 Splits:
  Train: 2016-08-07 <= x < 2017-02-23
    Val: 2017-02-23 <= x < 2017-05-26
Fold 3 Splits:
  Train: 2017-05-26 <= x < 2017-12-12
    Val: 2017-12-12 <= x < 2018-03-14
Fold 4 Splits:
  Train: 2018-03-14 <= x < 2018-09-30
    Val: 2018-09-30 <= x < 2018-12-31


In [0]:
########## PIPELINE STAGE 1 DATA CLEANING: CLEAN ALL DATA ###########
# Run this whenever we get a new time span of data or change our data cleaning process (pipelineAllData())

####### 1. Pipeline Step 1: Data Transformation ########
# (drop unneeded rows, add new features that do not leak, one-hot encode categorical).

# Load Dataset
df_otpw60 = spark.read.parquet(f'{team_blob_url}/OTPW_60M')
#df_optw3m = spark.read.parquet(f'{team_blob_url}/OTPW_3M_2015')

# Clean the Data
features = []  # Features list will hold all final feature column names.  
pipelineModel, df_slim = pipelineAllData(df_otpw60)
cleanDF = pipelineModel.transform(df_slim)
featuresList(features)  # Store global features list after cleaning data
cleanDF.cache()

# Checkpoint the Clean Data
fold_name_clean = 'train_clean'
cleanDF.write.format("parquet").mode("overwrite")\
    .option("path", (f"{team_blob_url}/{fold_name_clean}/baseline" + timeInterval))\
    .save()

count before drop


1401363

count after drop


1357914

FLIGHT_NUM,FLIGHT_DATE,SUM_DURATION_BY_FLIGHT,NUM_FLIGHT_TODAY,AVG_DURATION,PRIOR_AVG_DURATION
1,2015-01-01,1477.0,5,295.4,0.0
1,2015-01-02,2983.0,11,271.1818181818182,295.4
1,2015-01-03,4486.0,17,263.88235294117646,271.1818181818182
1,2015-01-04,5974.0,23,259.7391304347826,263.88235294117646
1,2015-01-05,7492.0,29,258.3448275862069,259.7391304347826
1,2015-01-06,9012.0,35,257.48571428571427,258.3448275862069
1,2015-01-07,10516.0,41,256.4878048780488,257.48571428571427
1,2015-01-08,11979.0,47,254.87234042553192,256.4878048780488
1,2015-01-09,13490.0,53,254.5283018867925,254.87234042553192
1,2015-01-10,14786.0,57,259.4035087719298,254.5283018867925


count after label


1357914

root
 |-- label: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- FE_PRIOR_DAILY_AVG_DEP_DELAY: double (nullable = true)
 |-- FE_PRIOR_AVG_DURATION: double (nullable = true)
 |-- FE_NUM_FLIGHT_SCHEDULED: long (nullable = true)
 |-- DEP_DELAY_LAG: double (nullable = false)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- OP_UNIQUE_CARRIER: string (nullable = true)
 |-- origin_type: string (nullable = true)
 |-- dest_type: string (nullable = true)
 |-- is_holiday_double: double (nullable = true)
 |-- is_holiday_adjacent_double: double (nullable = true)
 |-- IS_FIRST_FLIGHT_OF_DAY_double: double (nullable = false)
 |-- DATE: timestamp (nullable = true)
 |-- FL_DATE: date (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DEP_TIME_BLK: string (nullable =
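
The sample table in the output above shows how the prior-average-duration feature avoids leakage: `AVG_DURATION` is a running average through the current day, while `PRIOR_AVG_DURATION` is that average shifted back one day (0.0 when there is no earlier day). The following is a minimal pure-Python sketch of that logic, not the pipeline's Spark implementation. The `prior_running_average` helper is hypothetical, and the per-day totals are obtained by differencing the first three cumulative rows printed for flight number 1.

```python
from itertools import accumulate

def prior_running_average(daily_sums, daily_counts, default=0.0):
    """For each day, return (running_avg, prior_running_avg).

    running_avg uses all flights up to and including the day;
    prior_running_avg is the previous day's running_avg, so it never
    includes information from the day being predicted.
    """
    cum_sums = list(accumulate(daily_sums))
    cum_counts = list(accumulate(daily_counts))
    running = [s / n for s, n in zip(cum_sums, cum_counts)]
    prior = [default] + running[:-1]  # shift by one day (a one-row lag)
    return list(zip(running, prior))

# Per-day values derived from the printed cumulative rows for flight 1:
# sums 1477, 2983 - 1477, 4486 - 2983 and counts 5, 11 - 5, 17 - 11.
daily_sums = [1477.0, 1506.0, 1503.0]
daily_counts = [5, 6, 6]

rows = prior_running_average(daily_sums, daily_counts)
for running_avg, prior_avg in rows:
    print(running_avg, prior_avg)
```

Because the prior value for a given day depends only on earlier days, it can be used as a model input without exposing that day's outcome.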

In [0]:
########## PIPELINE STAGE 2 DATA CLEANING: CREATE CV FOLDS ###########
# Run this whenever we get a new time span of data (or we want to change our data processing within each fold)

# 1. Load the feature list saved during stage 1
features_in = spark.read.format("csv")\
    .option("path", (f"{team_blob_url}/features"))\
    .load()
features = features_in.toPandas().iloc[:, 0].tolist()

# 2. Read the clean dataset (stage 1 output, saved under 'train_clean') from blob storage
fold_name_clean = 'train_clean'
cleanDF = spark.read.format("parquet")\
    .option("path", (f"{team_blob_url}/{fold_name_clean}/baseline" + timeInterval))\
    .load()

# 3. Split data by year. 60 MONTH: train = 2015-2018, test = 2019
train = cleanDF.filter(col("YEAR") < 2019)
test = cleanDF.filter(col("YEAR") == 2019)

print(train.count())
print(test.count())

# 4. Make folds and save them to blob storage
# ANSWER y to overwrite folds on disk
fold_name = "folds" + timeInterval   # all features
folds = make_folds_and_save(train, splits, fold_name=fold_name)
display(dbutils.fs.ls(f"{team_blob_url}/{fold_name}"))

# 5. Load folds back into memory
fold_name = "folds" + timeInterval   # all features
foldsloaded = load_folds_from_blob_and_cache(team_blob_url, fold_name)

23920802
7259155


Are you sure you want to overwrite 'folds60mo'? (y/n) y

Making train/val folds...
CV FOLD START: 0: 2023-08-11 17:15:55.306373


3135321

1469648

1264186

Done with downsampling: 2023-08-11 17:17:23.588876


23920802

7259155

Job Start: 2023-08-10 19:20:30.192144
CV FOLD START: 0: 2023-08-10 19:20:33.037139


23920802

7259155

8625718

Done with downsampling: 2023-08-10 19:20:59.157733
Done with graph features: 2023-08-10 19:24:00.481814
Done with CV Pipeline: 2023-08-10 19:43:11.959040
cv_features: ['scaled_numeric']
features_all: ['DAY_OF_WEEK_hot', 'MONTH_hot', 'YEAR_hot', 'OP_UNIQUE_CARRIER_hot', 'origin_type_hot', 'dest_type_hot', 'is_holiday_double_hot', 'is_holiday_adjacent_double_hot', 'IS_FIRST_FLIGHT_OF_DAY_double_hot', 'CRS_DEP_BUCKET_hot', 'scaled_numeric']


8625718

7259155

Done with Assembling Features: 2023-08-10 19:46:12.925602


DataFrame[label: double, DISTANCE: double, ELEVATION: double, FE_PRIOR_DAILY_AVG_DEP_DELAY: double, FE_PRIOR_AVG_DURATION: double, FE_NUM_FLIGHT_SCHEDULED: bigint, DEP_DELAY_LAG: double, DAY_OF_WEEK: int, MONTH: int, YEAR: int, OP_UNIQUE_CARRIER: string, origin_type: string, dest_type: string, is_holiday_double: double, is_holiday_adjacent_double: double, IS_FIRST_FLIGHT_OF_DAY_double: double, DATE: timestamp, FL_DATE: date, OP_CARRIER_FL_NUM: int, DEP_DELAY: double, AIR_TIME: double, DEP_TIME_BLK: string, origin_iata_code: string, dest_iata_code: string, TAIL_NUM: string, sched_depart_date_time_UTC: timestamp, CRS_DEP_TIME: int, ORIGIN: string, DEST: string, CRS_DEP_BUCKET: double, DAY_OF_WEEK_ix: double, MONTH_ix: double, YEAR_ix: double, OP_UNIQUE_CARRIER_ix: double, origin_type_ix: double, dest_type_ix: double, is_holiday_double_ix: double, is_holiday_adjacent_double_ix: double, IS_FIRST_FLIGHT_OF_DAY_double_ix: double, CRS_DEP_BUCKET_ix: double, DAY_OF_WEEK_hot: vector, MONTH_hot:
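
The log above reports a downsampling step before the graph features and the CV pipeline, and the training row count drops from 23,920,802 to 8,625,718 while the 7,259,155-row held-out set is unchanged. The downsampling function itself is not shown in this section, so the sketch below is only an illustration of the general technique: keep every minority-class row and randomly keep a matching number of majority-class rows. The `downsample_majority` helper, the 1:1 target ratio, and the toy labels are all assumptions.

```python
import random

def downsample_majority(rows, label_key="label", seed=42):
    """Balance a binary-labelled list of dicts by dropping majority rows.

    All minority-class rows are kept; the majority class is randomly
    reduced to the same size. Only the training split should be passed in,
    so validation and test data keep their natural class balance.
    """
    positives = [r for r in rows if r[label_key] == 1.0]
    negatives = [r for r in rows if r[label_key] == 0.0]
    minority, majority = sorted([positives, negatives], key=len)
    rng = random.Random(seed)  # fixed seed for a repeatable sample
    kept_majority = rng.sample(majority, len(minority))
    return minority + kept_majority

# Toy data (hypothetical): 2 delayed flights and 8 on-time flights.
toy_train = [{"id": i, "label": 1.0 if i < 2 else 0.0} for i in range(10)]
balanced = downsample_majority(toy_train)
print(len(balanced))
```

In Spark the same idea is usually expressed with `DataFrame.sample` and a fraction rather than an exact count, so the resulting classes are only approximately balanced.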

In [0]:
# PIPELINE FINAL DATA CLEANING: STAGE 2 DATA CLEANING
# Assumes you already did stage 1 cleaning (pipelineAllData()) on full dataset (before it was split into train and test)

def createTrainTest(train_clean, test_clean):
    '''Applies the per-fold (CV) data processing to train and test sets that were
    already cleaned by pipelineAllData(). Checkpoints both to blob storage.
    Returns the processed train and test DataFrames.'''

    print(f'Job Start: {datetime.now()}')

    # 1. Load feature list
    # NOTE: this assignment is local to the function and is not passed to foldsWork(),
    # so foldsWork() presumably relies on the notebook-level `features` list.
    features_in = spark.read.format("csv")\
        .option("path", (f"{team_blob_url}/features"))\
        .load()
    features = features_in.toPandas().iloc[:, 0].tolist()

    # 2. Apply CV data processing to train and test (missing-value imputation, normalization, graph features)
    train, test = foldsWork(0, train_clean, test_clean)

    # 3. Checkpoint the rapid-pipeline data, ready for machine learning estimation
    fold_name_clean = 'train_clean_downsampled'
    train.write.format("parquet").mode("overwrite")\
        .option("path", (f"{team_blob_url}/{fold_name_clean}/rapid" + timeInterval))\
        .save()

    fold_name_clean = 'test_clean_downsampled'
    test.write.format("parquet").mode("overwrite")\
        .option("path", (f"{team_blob_url}/{fold_name_clean}/rapid" + timeInterval))\
        .save()

    return train, test

# 1. Load the clean dataset (stage 1 output, saved under 'train_clean') from blob storage
timeInterval = '60mo' 
fold_name_clean = 'train_clean'
cleanDF = spark.read.format("parquet")\
    .option("path", (f"{team_blob_url}/{fold_name_clean}/baseline" + timeInterval))\
    .load()

# 2. Cache
df = cleanDF.cache()

# 3. Split Data: train = 2015-2018. test = 2019
train_df = df.filter(col("YEAR") < 2019).cache()
test_df = df.filter(col("YEAR") == 2019).cache()

display(train_df.count())
display(test_df.count())

train, test = createTrainTest(train_df, test_df)

train_df.unpersist()
test_df.unpersist()
df.unpersist()
    
