### Supporting functions

Supporting functions for loading files into datasets, evaluating models, and resampling the training data.

In [0]:
# Import Functions to be utilized throughout Workbook
from pyspark.sql.functions import col, to_timestamp, unix_timestamp, from_unixtime, expr,lpad,lag, row_number,concat, lit,count,substr,substring,coalesce,when,hour
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

import mlflow
import json
import os

from pyspark.sql.types import FloatType, IntegerType, DateType, StringType
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, cos, sin, radians, col, explode, array, lit

from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml import Pipeline, Estimator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, MultilayerPerceptronClassifier
from pyspark.ml.tuning import ParamGridBuilder
from graphframes import *

def mount_to_storage(storage_account='vbui',
                     blob_container='team62container',
                     secret_key='cso',
                     secret_scope='vbui'):
    '''
    Mount the blob container at /mnt/blob_storage unless it is already
    mounted, then return the directory listing of the mount point.

    storage_account / blob_container identify the wasbs source.
    secret_scope / secret_key locate the SAS token in the Databricks
    secret store.
    '''
    target = "/mnt/blob_storage"

    already_mounted = any(m.mountPoint == target for m in dbutils.fs.mounts())
    if not already_mounted:
        sas_token = dbutils.secrets.get(scope=secret_scope, key=secret_key)
        sas_config_key = f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net"
        dbutils.fs.mount(
            source = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net",
            mount_point = target,
            extra_configs = {sas_config_key: sas_token})

    return dbutils.fs.ls(target)


def import_file(file_name,
                file_type,
                mount_point='/mnt/blob_storage/'):
    '''
    Read a file from the mounted blob storage into a Spark DataFrame.

    file_name:   name of the file relative to mount_point.
    file_type:   'parquet' or 'csv' (csv files are read with a header row).
    mount_point: DBFS mount directory; it is concatenated directly with
                 file_name, so it should end with '/'.

    Raises ValueError for any other file_type instead of silently
    returning None.
    '''
    path = f"dbfs:{mount_point}{file_name}"
    if file_type == 'parquet':
        return spark.read.parquet(path)
    if file_type == 'csv':
        return spark.read.csv(path, header=True)
    raise ValueError(
        f"Unsupported file_type {file_type!r}; expected 'parquet' or 'csv'")


def cal_fbeta_score(predictions, beta = 0.5):
  '''
  Compute precision/recall based scores for a predictions DataFrame.

  predictions: DataFrame with a "label" and a "prediction" column.
  beta:        weight of recall relative to precision in the F-beta scores.

  Returns a dict with:
    'precision'   - Spark's weightedPrecision
    'recall'      - Spark's weightedRecall
    'fbeta-score' - F-beta combined from the two weighted values above
                    (0.0 when both are zero, to avoid dividing by zero)
    'fMeasure'    - Spark's built-in weightedFMeasure for the given beta
  '''
  def _evaluate(metric_name, **extra):
    evaluator = MulticlassClassificationEvaluator(
      labelCol="label", predictionCol="prediction", metricName=metric_name, **extra)
    return evaluator.evaluate(predictions)

  precision = _evaluate("weightedPrecision")
  recall = _evaluate("weightedRecall")
  fmeasure = _evaluate("weightedFMeasure", beta=beta)

  # Precision and recall can both be 0 (e.g. every prediction is wrong);
  # the F-beta formula is undefined there, so report 0.0 instead of raising.
  denominator = beta ** 2 * precision + recall
  if denominator:
    fbeta = (1 + beta ** 2) * (precision * recall) / denominator
  else:
    fbeta = 0.0

  results = {
    'precision' : precision,
    'recall': recall,
    'fbeta-score': fbeta,
    'fMeasure': fmeasure
  }
  return results


def time_series_cv(preprocessing: list, model, data, total_rows, metric_function = cal_fbeta_score, num_folds: int = 3):
  """
  Perform time series cross-validation and return a weighted average score.

  The data is cut into `num_folds` consecutive folds of
  `total_rows // num_folds` rows each. For every adjacent pair of folds the
  pipeline (preprocessing stages + model) is fitted on the earlier fold and
  scored on the later one. Later pairs get a larger weight (1, 2, 3, ...)
  in the final average.

  :param preprocessing: List of pipeline stages applied before the model.
  :param model: The estimator appended as the last pipeline stage.
  :param data: The dataset to split; assumed to already be sorted by time.
  :param total_rows: Number of rows in `data` (used to size the folds).
  :param metric_function: Callable taking a predictions DataFrame and
      returning a dict that contains the key 'fMeasure'.
  :param num_folds: Number of folds; must be at least 2.
  :return: Weighted average of the per-pair 'fMeasure' values.
  :raises ValueError: If `num_folds` is smaller than 2.
  """
  if num_folds < 2:
    # With fewer than two folds there is no train/test pair to score.
    raise ValueError("num_folds must be at least 2")

  # Assume the data is sorted by time during preprocessing.
  # NOTE(review): limit() only yields a stable prefix if `data` has a
  # deterministic ordering - confirm against the caller.
  mlflow.autolog(disable=True)
  folds = []
  try:
    print('============= Starting cross validation =============')
    # Split the data into folds respecting the temporal order.
    fold_size = total_rows // num_folds
    # exceptAll keeps duplicate rows; subtract() is EXCEPT DISTINCT and would
    # silently remove rows whose values also occur elsewhere.
    folds = [
      data.limit(fold_size * (i + 1)).exceptAll(data.limit(fold_size * i)).cache()
      for i in range(num_folds)
    ]
    print('============= Finished spliting into folds =============')

    pipeline = Pipeline(stages=preprocessing + [model])
    metrics = []
    # Train on fold i-1, evaluate on fold i.
    for train, test in zip(folds, folds[1:]):
      fitted_pipeline = pipeline.fit(train)
      predictions = fitted_pipeline.transform(test)
      metrics.append(metric_function(predictions)['fMeasure'])
  finally:
    # Release cached folds and re-enable autologging even if a fit fails.
    for fold in folds:
      fold.unpersist()
    mlflow.autolog(disable=False)

  # Weighted average: the k-th train/test pair has weight k.
  weights = np.arange(1, len(metrics) + 1)
  return float(np.average(metrics, weights=weights))

def upsample(train_df, label_col = 'label', verbose=False):
  '''
  Upsample the label == 1 rows of train_df to balance the classes.

  Every label == 1 row is replicated floor(n_label0 / n_label1) times
  (at least once, so label == 1 rows are never dropped). Label == 0 rows
  are kept as they are. Rows with any other label value are not part of
  the result.

  train_df:  DataFrame to balance.
  label_col: name of the 0/1 label column.
  verbose:   currently unused.
  '''
  positives = train_df.filter(F.col(label_col) == 1)
  negatives = train_df.filter(F.col(label_col) == 0)

  positive_count = positives.count()
  negative_count = negatives.count()

  # A replication factor of 0 would explode an empty array and remove every
  # label == 1 row; also avoid dividing by zero when there are none.
  if positive_count:
    copies = max(1, negative_count // positive_count)
  else:
    copies = 1

  # Exploding an array of `copies` literals duplicates each row `copies` times.
  replicated_positives = (
    positives
    .withColumn("temp", explode(array([lit(x) for x in range(copies)])))
    .drop('temp')
  )
  return negatives.union(replicated_positives)


def downsample(train_df, label_col = 'label', verbose=False):
  '''
  Downsample the label == 0 rows of train_df to balance the classes.

  Label == 0 rows are sampled without replacement (seed 42) with fraction
  n_label1 / n_label0, capped at 1.0. Label == 1 rows are all kept. Rows
  with any other label value are not part of the result.

  train_df:  DataFrame to balance.
  label_col: name of the 0/1 label column.
  verbose:   currently unused.
  '''
  positives = train_df.filter(F.col(label_col) == 1)
  negatives = train_df.filter(F.col(label_col) == 0)

  positive_count = positives.count()
  negative_count = negatives.count()

  # Sampling without replacement needs a fraction in [0, 1]; cap it when
  # label == 0 is already the smaller class and avoid dividing by zero.
  if negative_count:
    keep_fraction = min(1.0, positive_count / negative_count)
  else:
    keep_fraction = 1.0

  sampled_negatives = negatives.sample(withReplacement=False, fraction=keep_fraction, seed=42)
  return positives.union(sampled_negatives)

def find_pagerank(df, reset_probability=0.15, max_iter=20):
  '''
  Compute a PageRank score per origin from the ORIGIN -> DEST rows of df.

  df:                DataFrame with "ORIGIN" and "DEST" columns.
  reset_probability: passed to GraphFrame.pageRank as resetProbability.
  max_iter:          passed to GraphFrame.pageRank as maxIter.

  Returns a DataFrame with the columns "ORIGIN" and "pagerank".
  '''
  # Vertices are the distinct ORIGIN values.
  # NOTE(review): values that only ever appear as DEST are not added as
  # vertices - confirm this is intended.
  vertices = df.selectExpr("ORIGIN as id").distinct()

  # One edge per input row, pointing from ORIGIN to DEST.
  edges = df.selectExpr("ORIGIN as src", "DEST as dst")

  graph = GraphFrame(vertices, edges)
  results = graph.pageRank(resetProbability=reset_probability, maxIter=max_iter)

  # Rename "id" back to "ORIGIN" so the result can be joined on that column.
  return results.vertices.select("id", "pagerank").withColumnRenamed("id", "ORIGIN")



def join_pagerank(df,pagerank_df):
  '''Left-join pagerank_df onto df using the shared "ORIGIN" column.'''
  return df.join(pagerank_df, how="left", on="ORIGIN")

In [0]:
# big file
# file_name = 'Draft_Final_DF_1Y_2.00'
df = import_file('Draft_Final_DF_1Y_3.00', 'parquet')

# Cast FL_DATE to a date and order the rows by it.
df = df.withColumn('FL_DATE', F.col('FL_DATE').cast(DateType())).sort('FL_DATE')
df.display()

MONTH,FL_DATE,OP_CARRIER_AIRLINE_ID,ORIGIN,DEST,DEP_DEL15,PREVIOUS_FLIGHT_ARRIVED_LATE,PREVIOUS_DIVERTED,PLANE_FORECAST_TURNAROUND_TIME,FLIGHTS_SCHEDULED_2HRS_OR_LESS_BEFORE_CRS_DEP,FLIGHTS_DEPARTED_2HRS_BEFORE_PREDICTION,FLIGHTS_DELAYED_2HRS_BEFORE_PREDICITION,YEAR,QUARTER,DAY_OF_MONTH,DAY_OF_WEEK
1,2019-01-01,20304,DHN,ATL,0,,,,0,0,0,2019,1,1,3
1,2019-01-01,19805,SAV,CLT,0,0.0,0.0,42.0,4,6,1,2019,1,1,3
1,2019-01-01,20304,DHN,ATL,0,1.0,0.0,-1305.0,0,0,0,2019,1,1,3
1,2019-01-01,20363,ABE,ATL,0,,,,1,0,0,2019,1,1,3
1,2019-01-01,19977,DLH,ORD,0,,,,0,0,0,2019,1,1,3
1,2019-01-01,20368,ABE,SFB,0,0.0,0.0,63.0,0,0,0,2019,1,1,3
1,2019-01-01,20304,DLH,MSP,1,1.0,0.0,-827.0,1,0,0,2019,1,1,3
1,2019-01-01,20368,ABE,PIE,0,0.0,0.0,76.0,0,2,0,2019,1,1,3
1,2019-01-01,20304,DLH,MSP,0,0.0,0.0,41.0,0,0,0,2019,1,1,3
1,2019-01-01,20304,ABE,DTW,0,0.0,0.0,30.0,1,1,0,2019,1,1,3


## Modelling

### Preprocessing

In [0]:
# Columns selected from the raw data for modelling.
# NOTE(review): PREVIOUS_FLIGHT_ARRIVED_LATE and PREVIOUS_DIVERTED are selected
# here but are in neither numerical_cols nor used_features below, so they do
# not reach the assembled 'features' vector - confirm this is intended.
features = [
    'MONTH', 'OP_CARRIER_AIRLINE_ID', 'ORIGIN', 'DEST',
    'PREVIOUS_FLIGHT_ARRIVED_LATE', 'PREVIOUS_DIVERTED',
    'PLANE_FORECAST_TURNAROUND_TIME',
    'FLIGHTS_DEPARTED_2HRS_BEFORE_PREDICTION',
    'FLIGHTS_DELAYED_2HRS_BEFORE_PREDICITION',
    'FLIGHTS_SCHEDULED_2HRS_OR_LESS_BEFORE_CRS_DEP',
]

# DEP_DEL15 becomes the target column.
df = df.withColumnRenamed(existing='DEP_DEL15', new='label')

# Rows with QUARTER < 4 form the training set; QUARTER == 4 is the test set.
train_set = df.filter(df['QUARTER'] < 4)
test_set = df.filter(df['QUARTER'] == 4)
# train_set = upsample(train_set)

# Keep only the modelling columns and replace missing values with 0.
train_set = train_set.select(*features, 'label').fillna(0)
test_set = test_set.select(*features, 'label').fillna(0)

# page_rank_df = find_pagerank(train_set)

# train_set = join_pagerank(train_set, page_rank_df).fillna(0)
# test_set = join_pagerank(test_set, page_rank_df).fillna(0)

string_cols = ['ORIGIN', 'DEST', 'OP_CARRIER_AIRLINE_ID']

numerical_cols = [
    'PLANE_FORECAST_TURNAROUND_TIME',
    'FLIGHTS_DEPARTED_2HRS_BEFORE_PREDICTION',
    'FLIGHTS_DELAYED_2HRS_BEFORE_PREDICITION',
    'FLIGHTS_SCHEDULED_2HRS_OR_LESS_BEFORE_CRS_DEP',
]

# Stage list: index + one-hot encode every string column ...
preprocessing = []
for string_col in string_cols:
    string_index = StringIndexer(inputCol=string_col, outputCol="indexed_" + string_col, handleInvalid='keep')
    list_onehot = OneHotEncoder(inputCol="indexed_" + string_col, outputCol="encoded_" + string_col, handleInvalid='keep')
    preprocessing += [string_index, list_onehot]

# ... one-hot encode MONTH directly ...
encoded_MONTH = 'encoded_MONTH'
preprocessing.append(OneHotEncoder(inputCol='MONTH', outputCol=encoded_MONTH, handleInvalid='keep'))

used_features = [f'encoded_{name}' for name in string_cols] + [encoded_MONTH]

# ... assemble and scale the numerical columns, then combine everything
# into the single 'features' vector.
numerical_ass = VectorAssembler(inputCols = numerical_cols, outputCol = 'numerical_features')
standard_sc = StandardScaler(inputCol = 'numerical_features', outputCol = 'scaled_numerical_features')
vector_ass = VectorAssembler(inputCols=['scaled_numerical_features', *used_features], outputCol='features')
preprocessing.extend([numerical_ass, standard_sc, vector_ass])

preprocess_pipeline = Pipeline(stages = preprocessing)

### Train test split data and transform

In [0]:
# Fit the preprocessing pipeline on the training set only, then apply the
# fitted pipeline to both splits.
fitted_preprocess_pipeline = preprocess_pipeline.fit(train_set)
processed_train_set, processed_test_set = (
    fitted_preprocess_pipeline.transform(split) for split in (train_set, test_set)
)

# Row count of the training set, used to size the cross-validation folds.
total_rows = train_set.count()

# mlflow.autolog(disable=True)

Downloading artifacts:   0%|          | 0/62 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

## Models

### Logistic Regression

$$
\text{Weighted Avg} = \frac{\sum_{i=1}^{n} i \cdot F_{\beta, i}}{\sum_{i=1}^{n} i}
$$

In [0]:
# Logistic regression with elasticNetParam = 0.4153.
# NOTE(review): regParam is not set here; if it is left at 0 the elastic-net
# mixing parameter presumably has no effect - confirm against the Spark docs.
lr = LogisticRegression(labelCol='label', featuresCol='features', elasticNetParam=0.4153)

# Start MLflow run
with mlflow.start_run():
    # Train on the full preprocessed training set.
    model = lr.fit(processed_train_set)

    # Weighted time-series cross-validation score on the training data.
    train_cv = time_series_cv(preprocessing, lr, train_set, total_rows, num_folds=4)
    mlflow.log_metric('cv_train', train_cv)

    # Score the held-out test set and prefix every metric name with 'test_'.
    test_predictions = model.transform(processed_test_set)
    test_eval = cal_fbeta_score(test_predictions)
    result = {'test_' + k: v for k, v in test_eval.items()}

    # Record the test metrics and the run description in MLflow.
    mlflow.log_metrics(result)
    mlflow.log_param('features', features)
    mlflow.log_param('ML Algo', 'Logistic Regression')
    # mlflow.spark.log_model(model, 'model')


2024/04/20 01:03:56 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]



Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]



2024/04/20 01:08:14 INFO mlflow.tracking.fluent: Autologging successfully enabled for pyspark.
2024/04/20 01:08:14 INFO mlflow.tracking.fluent: Autologging successfully enabled for pyspark.ml.


### Random Forest

In [0]:
# Hyperparameters for the random forest; defined once and passed to the
# constructor so the values cannot drift apart.
# NOTE(review): minInfoGain is close to 1 - verify that trees can still split
# with the impurity measure in use; otherwise each tree may stay a single leaf.
rf_params = {'featuresCol': 'features', 'labelCol' : 'label',
             'maxBins': 46, 'maxDepth': 20, 'minInfoGain': 0.9670683775019292,
             'minInstancesPerNode': 2, 'numTrees': 239, 'bootstrap' : False}

rf = RandomForestClassifier(**rf_params)
#rf_pipeline = Pipeline(stages=preprocessing + [rf])

# Start MLflow run
with mlflow.start_run():
    # Train on the full preprocessed training set.
    model = rf.fit(processed_train_set)

    # Weighted time-series cross-validation score on the training data.
    train_cv = time_series_cv(preprocessing, rf, train_set, total_rows, num_folds=4)
    mlflow.log_metric('cv_train', train_cv)

    # Score the held-out test set and prefix every metric name with 'test_'.
    test_predictions = model.transform(processed_test_set)
    test_eval = cal_fbeta_score(test_predictions)
    result = {'test_' + k: v for k, v in test_eval.items()}

    mlflow.log_metrics(result)
    mlflow.log_param('features', features)
    mlflow.log_param('ML Algo', 'RandomForestClassifier')

    # Save the model
    # mlflow.spark.log_model(model, 'model')

2024/04/20 01:11:27 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Downloading artifacts:   0%|          | 0/14 [00:00<?, ?it/s]

2024/04/20 01:11:31 INFO mlflow.store.artifact.artifact_repo: The progress bar can be disabled by setting the environment variable MLFLOW_ENABLE_ARTIFACTS_PROGRESS_BAR to false


Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]



2024/04/20 01:16:04 INFO mlflow.tracking.fluent: Autologging successfully enabled for pyspark.
2024/04/20 01:16:04 INFO mlflow.tracking.fluent: Autologging successfully enabled for pyspark.ml.


### GBT

In [0]:
# Hyperparameters for the gradient-boosted trees classifier.
gbt_params = {
    'featuresCol': 'features',
    'labelCol': 'label',
    'maxBins': 126,
    'maxDepth': 11,
    'minInstancesPerNode': 3,
    'stepSize': 0.27286063089767754,
    'subsamplingRate': 0.6445718010290064,
}
gbt = GBTClassifier(**gbt_params)

# Start MLflow run
with mlflow.start_run():
    # Train on the full preprocessed training set.
    model = gbt.fit(processed_train_set)

    # Weighted time-series cross-validation score on the training data.
    train_cv = time_series_cv(preprocessing, gbt, train_set, total_rows, num_folds=4)
    mlflow.log_metric('cv_train', train_cv)

    # Score the held-out test set and prefix every metric name with 'test_'.
    test_predictions = model.transform(processed_test_set)
    test_eval = cal_fbeta_score(test_predictions)
    result = {'test_' + k: v for k, v in test_eval.items()}

    mlflow.log_metrics(result)
    mlflow.log_param('features', features)
    mlflow.log_param('ML Algo', 'GBTClassifier')

    # Save the model
    # mlflow.spark.log_model(model, 'model')



2024/04/21 02:02:16 INFO mlflow.tracking.fluent: Autologging successfully enabled for pyspark.
2024/04/21 02:02:16 INFO mlflow.tracking.fluent: Autologging successfully enabled for pyspark.ml.
