# Random Forest

In [0]:
from pyspark.sql.functions import col, when, count
from pyspark.sql.functions import rand, date_format, current_date, datediff
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql import SparkSession

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler, Imputer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array
from sklearn.metrics import classification_report, precision_recall_fscore_support

import matplotlib.pyplot as plt
# import seaborn as sns
import pandas as pd
import numpy as np

print("Welcome to the W261 final project!") 

Welcome to the W261 final project!


In [0]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

from sklearn.metrics import classification_report, precision_recall_fscore_support

%md

# Data Mount

In [0]:
## Place this cell in any team notebook that needs access to the team cloud storage.


# The following blob storage is accessible to team members only (read and write).
# The access key is valid until its TTL expires; after that, create a new SAS key
# and authenticate access again via the Databricks command line.
blob_container  = "261-final-project"       # The name of your container created in https://portal.azure.com
storage_account = "ansonbquon"  # The name of your Storage account created in https://portal.azure.com
secret_scope    = "final_project"           # The name of the scope created in your local computer using the Databricks CLI
secret_key      = "project_key"             # The name of the secret key created in your local computer using the Databricks CLI
team_blob_url   = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"  # Points to the root of your team storage bucket


# The 261 course blob storage is mounted here.
mids261_mount_path      = "/mnt/mids-w261"

# SAS token: grants the team limited access to Azure Storage resources.
# The token is read from the Databricks secret scope, so it never appears in the notebook.
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

# Navigate back to your Storage account in https://portal.azure.com to inspect the partitions/files.
# df.write.parquet(f"{team_blob_url}/TP")



# List what is in the blob storage root folder.
display(dbutils.fs.ls(f"{team_blob_url}"))

In [0]:
# List the contents of the `base` folder in the team container.
display(dbutils.fs.ls(f"{team_blob_url}/base"))

## Load training and testing data

In [0]:
# List the contents of the `PRD` folder; the train/test parquet paths used below live under it.
display(dbutils.fs.ls(f"{team_blob_url}/PRD/"))

## Baseline features

In [0]:
# Name of the target column.
# NOTE(review): presumably a "departure delayed by 15+ minutes" flag — confirm against the data dictionary.
label = 'depDel15'

# Baseline feature set (19 columns; the next cell checks the count).
features_columns = [
    'distance',
    'quarterIndex',
    'crsDepTimeSine',
    'monthSine',
    'dayOfWeekSine',
    'originIndex',
    'originTypeIndex',
    'originPageRank',
    'destPageRank',
    'timeBtwFlightsPlanned',
    'depDel15PrevFIndex',
    'depDelayGroupPrevFIndex',
    'depTimeBlkPrevFIndex',
    'arrDel15PrevFIndex',
    'arrDelayGroupPrevFIndex',
    'arrTimeBlkPrevFIndex',
    'originTypePrevFIndex',
    'distancePrevF',
    'daysToNearestHoliday'
]



In [0]:
# Sanity check: number of columns in the feature list defined above.
len(features_columns)

19

## Experimental features

In [0]:
# Experimental feature set. Running this cell REBINDS `label` and `features_columns`
# from the baseline cell above, so the active list depends on which cell ran last.
# Commented-out entries are candidate features that are currently disabled.
# NOTE(review): `df_fit_pipeline` below assembles `baseline_features`, not this list —
# confirm whether this list is still meant to drive the model.
label = 'depDel15'

features_columns = [
    'distance',
    #'quarterIndex',
    'crsDepTimeSine',
    'monthSine',
    #'dayOfWeekSine',
    #'dayOfMonthIndex',
    'originIndex',
    #'originTypeIndex',
    'originPageRank',
    'destPageRank',
    #'opCarrierFlNumBinIndex1',
    # 'hourlyAltimeterSetting',
    # 'hourlyDewPointTemperature',
    # 'hourlyDryBulbTemperature',
    # 'hourlyPrecipitation',
    # 'hourlyPressureChange',
    # 'hourlyPressureTendency',
    # 'hourlyRelativeHumidity',
    # 'hourlySeaLevelPressure',
    # 'hourlyStationPressure',
    # 'hourlyVisibility',
    # 'hourlyWetBulbTemperature',
    # 'hourlyWindDirection',
    # 'hourlyWindGustSpeed',
    # 'hourlyWindSpeed',
    'daysToNearestHoliday',
    'timeBtwFlightsPlanned',
    'depDel15PrevFIndex',
    'depDelayGroupPrevFIndex',
    'depTimeBlkPrevFIndex',
    'arrDel15PrevFIndex',
    'arrDelayGroupPrevFIndex',
    'arrTimeBlkPrevFIndex',
    #'originTypePrevFIndex',
    'distancePrevF',
    #'daysToNearestHoliday'  (already listed above)
]



In [0]:
# Columns intended for the VectorAssembler. The entries match, in order, the enabled
# (uncommented) entries of the experimental `features_columns` list above.
# NOTE(review): no cell visible in this notebook reads `assembler_columns` — confirm it is still needed.
assembler_columns = [
    'distance',
    'crsDepTimeSine',
    'monthSine',
    'originIndex',
    'originPageRank',
    'destPageRank',
    'daysToNearestHoliday',
    'timeBtwFlightsPlanned',
    'depDel15PrevFIndex',
    'depDelayGroupPrevFIndex',
    'depTimeBlkPrevFIndex',
    'arrDel15PrevFIndex',
    'arrDelayGroupPrevFIndex',
    'arrTimeBlkPrevFIndex',
    'distancePrevF'
]

# 2015-2019 Data

In [0]:
# Column roles for the 2015-2019 data.

# LABEL
label = 'depDel15'

# NUMERIC COLUMNS
# min/max scaling to be applied
num_scale_columns = [
    'distance', 'timeBtwFlights',
    'hourlyAltimeterSetting', 'hourlyDewPointTemperature', 'hourlyDryBulbTemperature',
    'hourlyPrecipitation', 'hourlyPressureChange', 'hourlyPressureTendency',
    'hourlyRelativeHumidity', 'hourlyVisibility', 'hourlyWetBulbTemperature',
    'hourlyWindDirection', 'hourlyWindGustSpeed', 'hourlyWindSpeed',
    'daysToNearestHoliday', 'originPageRank', 'destPageRank',
    'predictedMeanHourlyPrecipitation', 'predictedMeanHourlyVisibility',
    'predictedMeanHourlyDewPointTemp',
]

# NO-CHANGE COLUMNS
# Used as-is: the individually named columns first, then each numbered
# "<family><n>" column family expanded for n = 0 .. count-1.
no_change_columns = [
    'monthIndex', 'dayOfWeekIndex', 'crsDepTimeSine', 'quarterIndex',
    'opUniqueCarrierIndex', 'yearIndex', 'originTypeIndex', 'destTypeIndex',
    'depDel15PrevFIndex', 'depDelayGroupPrevFIndex',
] + [
    f'{family}{idx}'
    for family, count in (
        ('tailNumBinIndex', 13),
        ('originBinIndex', 9),
        ('destBinIndex', 9),
        ('originIsoRegionBinIndex', 6),
        ('destIsoRegionBinIndex', 6),
    )
    for idx in range(count)
]

# OMITTED COLUMNS
# not used (currently); same layout as above: named columns, then numbered families.
omit_columns = [
    'crsDepTime', 'crsElapsedTime', 'crsDepTimePrevF', 'depTimePrevF',
    'crsArrTimePrevF', 'arrTimePrevF', 'crsElapsedTimePrevF', 'actualElapsedTimePrevF',
    'distancePrevF', 'timeBtwFlightsPlanned', 'hourlySeaLevelPressure', 'hourlyStationPressure',
    'monthSine', 'dayOfWeekSine', 'dayOfMonthIndex',
    'depTimeBlkPrevFIndex', 'arrDel15PrevFIndex', 'arrDelayGroupPrevFIndex', 'arrTimeBlkPrevFIndex',
    'originTypePrevFIndex', 'tailNumIndex', 'opCarrierFlNumIndex',
    'originIndex', 'destIndex', 'originIsoRegionIndex', 'destIsoRegionIndex', 'originPrevFIndex',
] + [
    f'{family}{idx}'
    for family, count in (
        ('opCarrierFlNumBinIndex', 13),
        ('originPrevFBinIndex', 9),
    )
    for idx in range(count)
]


In [0]:
# Copy scaled values from the `scaled_features` vector back into their source columns
def replace_scaled_columns(df, input_cols):
    """Overwrite each column in `input_cols` with its entry from `scaled_features`.

    The `scaled_features` vector column is first expanded into an array column
    named `scaled_array`; the i-th name in `input_cols` then receives element i
    of that array. The helper column `scaled_array` is left on the result.

    Args:
        df: Spark DataFrame that contains a `scaled_features` vector column.
        input_cols: column names, in the same order as the vector's elements.

    Returns:
        The DataFrame with the listed columns replaced and `scaled_array` added.
    """
    result = df.withColumn('scaled_array', vector_to_array(col('scaled_features')))
    for position, column_name in enumerate(input_cols):
        scaled_value = F.col('scaled_array').getItem(position)
        result = result.withColumn(column_name, scaled_value)
    return result

In [0]:
# Target column and the feature list that `df_fit_pipeline` actually assembles.
# The entries repeat the baseline `features_columns` list defined earlier in the notebook.
label = 'depDel15'

# Order matters: feature-importance printouts pair these names with the model's
# importance vector by position.
baseline_features = [
    'distance',
    'quarterIndex',
    'crsDepTimeSine',
    'monthSine',
    'dayOfWeekSine',
    'originIndex',
    'originTypeIndex',
    'originPageRank',
    'destPageRank',
    'timeBtwFlightsPlanned',
    'depDel15PrevFIndex',
    'depDelayGroupPrevFIndex',
    'depTimeBlkPrevFIndex',
    'arrDel15PrevFIndex',
    'arrDelayGroupPrevFIndex',
    'arrTimeBlkPrevFIndex',
    'originTypePrevFIndex',
    'distancePrevF',
    'daysToNearestHoliday'
]



In [0]:
# Build and fit the preprocessing pipeline (label indexing + feature assembly)
def df_fit_pipeline(df):
    """Fit the preprocessing pipeline on `df` and return the fitted pipeline.

    Stages, in order:
      1. StringIndexer: indexes the notebook-level `label` column into `label`.
      2. VectorAssembler: packs the notebook-level `baseline_features` columns
         into a single `features` vector column.

    Nothing is transformed here; call `.transform(...)` on the returned object.

    Args:
        df: Spark DataFrame used to fit the stages.

    Returns:
        The fitted pipeline model.
    """
    stages = [
        StringIndexer(inputCol=label, outputCol='label'),
        VectorAssembler(inputCols=baseline_features, outputCol='features'),
    ]
    return Pipeline(stages=stages).fit(df)

In [0]:
# Train a single random forest classifier
def train_model(df, num_trees, max_depth, max_bins=4745):
    """Fit a RandomForestClassifier on `df`.

    Args:
        df: Spark DataFrame with a `features` vector column and a `label` column.
        num_trees: number of trees in the forest (`numTrees`).
        max_depth: maximum depth of each tree (`maxDepth`).
        max_bins: value for `maxBins`. Spark requires this to be at least the
            number of categories of any categorical feature.
            NOTE(review): the default 4745 presumably covers the largest indexed
            categorical column in the feature set — confirm before changing it.

    Returns:
        The fitted random forest classification model.
    """
    rf_classifier = RandomForestClassifier(
        featuresCol="features",
        labelCol="label",
        numTrees=num_trees,
        maxDepth=max_depth,
        maxBins=max_bins,
    )
    return rf_classifier.fit(df)

In [0]:
def model_metrics(model, df):
    """Score `df` with `model` and return precision / recall / F-beta for class 1.

    Label and prediction are collected to the driver in a single pass, so the
    scoring job runs once and each prediction stays on the same row as its label.

    Args:
        model: fitted Spark ML model whose `transform` adds a `prediction` column.
        df: Spark DataFrame with the model's input columns and a `label` column.

    Returns:
        The tuple returned by sklearn's `precision_recall_fscore_support` with
        `beta=2.0`, `average='binary'` and `pos_label=1`; callers read items
        0, 1 and 2 as precision, recall and F-beta.
    """
    scored = model.transform(df).select("label", "prediction").toPandas()

    return precision_recall_fscore_support(
        scored["label"],
        scored["prediction"],
        beta=2.0,
        average='binary',
        pos_label=1,
    )

In [0]:
# Parquet folder names for the cross-validation folds; the train and test lists are paired by position.
df_train_names = ["df_xval1_train_processed", "df_xval2_train_processed", "df_xval3_train_processed"]
df_test_names = ["df_xval1_test_processed", "df_xval2_test_processed", "df_xval3_test_processed"]
# Folder (relative to the team blob root) that holds the fold datasets.
data_folder = "PRD/xval_corrected"
# Hyperparameter grid: every numTrees value is combined with every maxDepth value.
num_trees_params = [5, 10]
max_depth_params = [5, 10]

In [0]:
def train_random_forest_models(df_train_names, df_test_names, data_folder, num_trees_params, max_depth_params):
    """Train and evaluate a random forest for every fold / hyperparameter combination.

    For each (train, test) fold pair the parquet data is loaded, the
    preprocessing pipeline is fitted on the training split and applied to both
    splits, and one model is trained per (numTrees, maxDepth) combination.

    Args:
        df_train_names: parquet folder names of the training splits.
        df_test_names: parquet folder names of the test splits, paired by
            position with `df_train_names`.
        data_folder: folder under `team_blob_url` that contains the splits.
        num_trees_params: `numTrees` values to try.
        max_depth_params: `maxDepth` values to try.

    Returns:
        dict keyed by `xval{fold}_param_{numTrees}_{maxDepth}`; each value holds
        `datasplit`, `params`, `model`, `model_its`, `loss`, `train_metrics`
        and `test_metrics`.

    Raises:
        ValueError: if the two name lists differ in length.
    """
    if len(df_train_names) != len(df_test_names):
        raise ValueError(
            f"Expected one test split per train split, got "
            f"{len(df_train_names)} train and {len(df_test_names)} test names"
        )

    model_summary = {}

    for fold, (train_name, test_name) in enumerate(zip(df_train_names, df_test_names), start=1):
        df_train = spark.read.parquet(f"{team_blob_url}/{data_folder}/{train_name}/")
        df_test = spark.read.parquet(f"{team_blob_url}/{data_folder}/{test_name}/")

        # Fit the preprocessing on the training split only, then apply it to both splits.
        pipeline = df_fit_pipeline(df_train)

        # Every grid point re-reads these frames (one fit plus two metric passes),
        # so cache them for the duration of the fold instead of rebuilding them
        # from parquet each time.
        df_fit_train = pipeline.transform(df_train).select('features', 'label').cache()
        df_fit_test = pipeline.transform(df_test).select('features', 'label').cache()

        try:
            for num_trees in num_trees_params:
                for max_depth in max_depth_params:
                    model = train_model(df_fit_train, num_trees, max_depth)

                    train_metrics = model_metrics(model, df_fit_train)
                    test_metrics = model_metrics(model, df_fit_test)

                    model_summary[f'xval{fold}_param_{num_trees}_{max_depth}'] = {
                        'datasplit': f'xval{fold}',
                        'params': {
                            'numTrees': num_trees,
                            'maxDepth': max_depth,
                        },
                        # Keeping every fitted model can be memory-heavy; drop this
                        # entry if feature importances are not needed afterwards.
                        'model': model,
                        # NOTE(review): a random forest is not fitted by iterative
                        # loss minimisation, so these two training-summary fields
                        # are presumably placeholders — confirm they carry meaning
                        # before reporting them.
                        'model_its': model.summary.totalIterations,
                        'loss': model.summary.objectiveHistory[-1],
                        'train_metrics': {
                            'Precision': train_metrics[0],
                            'Recall': train_metrics[1],
                            'Fbeta': train_metrics[2],
                        },
                        'test_metrics': {
                            'Precision': test_metrics[0],
                            'Recall': test_metrics[1],
                            'Fbeta': test_metrics[2],
                        },
                    }
        finally:
            # Release the cached fold data before moving on to the next fold.
            df_fit_train.unpersist()
            df_fit_test.unpersist()

    return model_summary

        

In [0]:
def get_feature_importance(model_summary, model_name, selected_features):
    """Print each feature name next to its importance, rounded to two decimals.

    Names and importances are paired by position, and pairing stops at the
    shorter of the two sequences.

    Args:
        model_summary: dict of run records; each record stores a fitted model
            under the `'model'` key.
        model_name: key of the run whose model should be inspected.
        selected_features: feature names, in the order the model's feature
            vector was assembled.

    Returns:
        None. Output is written with `print`.
    """
    importances = model_summary[model_name]['model'].featureImportances

    for name, score in zip(selected_features, importances):
        print(name, f"{score:.2f}")


In [0]:
#print test results

# Weighted mean used to combine per-fold cross-validation results
def weighted_average(errors, weights):
    """Return the weighted mean of `errors` using `weights`.

    Values and weights are paired by position (pairing stops at the shorter
    sequence); the weighted sum is divided by the sum of all `weights`.
    """
    weighted_total = sum([value * share for value, share in zip(errors, weights)])
    weight_total = sum(weights)
    return weighted_total / weight_total

# Per-fold weights, in fold order (xval1, xval2, xval3), used when averaging results across folds.
xval_wavg_vals = [0.2, 0.3, 0.5]

def get_model_metrics(model_summary, num_trees_params, max_depth_params, xval_wavg_vals):
    """Print fold-weighted averages of loss and metrics for every grid point.

    For each (numTrees, maxDepth) combination the runs stored under
    `xval{k}_param_{numTrees}_{maxDepth}` are looked up for k = 1 .. number of
    weights, and the weighted average of the loss and of the train/test
    `Fbeta`, `Recall` and `Precision` values is printed.

    Args:
        model_summary: dict produced by `train_random_forest_models`.
        num_trees_params: `numTrees` values that were trained.
        max_depth_params: `maxDepth` values that were trained.
        xval_wavg_vals: one weight per fold, in fold order; its length sets how
            many folds are read.

    Returns:
        None. Output is written with `print`.

    Raises:
        KeyError: if a run for an expected fold / parameter combination is missing.
    """
    fold_numbers = range(1, len(xval_wavg_vals) + 1)

    for num_trees in num_trees_params:
        for max_depth in max_depth_params:
            print(f'Model Params: Number of Trees: {num_trees}, Max Depth: {max_depth}')
            print()

            # One run record per fold for this parameter combination.
            runs = [model_summary[f'xval{fold}_param_{num_trees}_{max_depth}'] for fold in fold_numbers]

            loss_wavg = weighted_average([run['loss'] for run in runs], xval_wavg_vals)
            print(f'Loss Weighted Average: {loss_wavg}')
            print()

            for metric in ('Fbeta', 'Recall', 'Precision'):
                train_wavg = weighted_average(
                    [run['train_metrics'][metric] for run in runs], xval_wavg_vals)
                test_wavg = weighted_average(
                    [run['test_metrics'][metric] for run in runs], xval_wavg_vals)

                print(f'Train {metric} Weighted Average: {train_wavg}')
                print(f'Test {metric} Weighted Average: {test_wavg}')
                print()
                print("*"*50)

In [0]:
# List the cross-validation fold datasets available under PRD/xval_corrected.
display(dbutils.fs.ls(f"{team_blob_url}/PRD/xval_corrected/"))

In [0]:
# Train and evaluate one random forest per (fold, numTrees, maxDepth) combination.
model_summary = train_random_forest_models(df_train_names, df_test_names, data_folder, num_trees_params, max_depth_params)

# 2020 Predictions

In [0]:
# Load the data for the final model: the training set and the hold-out set scored in this section.
# NOTE(review): the folder names suggest the combined xval1-3 training data and a 2020
# validation set processed with 2015-2018 statistics — confirm against the data-prep notebook.
df_train = spark.read.parquet(f'{team_blob_url}/PRD/xval_corrected/df_xval1-3_train_processed')
df_test = spark.read.parquet(f'{team_blob_url}/PRD/xval_2020/df_final_val_processed_2015-2018/')

In [0]:
# Preview the training data.
display(df_train)

In [0]:
# Preview the hold-out data.
display(df_test)

In [0]:
# Fit the preprocessing pipeline on the training data, then apply it to both sets.
# NOTE(review): this cell rebinds `df_train` / `df_test` to their transformed versions, so it is
# not idempotent — running it a second time feeds already-transformed frames back into the
# pipeline (likely to fail because the output columns already exist; TODO confirm). Re-run the
# load cell first.
final_pipeline = df_fit_pipeline(df_train)
df_train = final_pipeline.transform(df_train)
df_test = final_pipeline.transform(df_test)

In [0]:
# Train the final model with numTrees=10 and maxDepth=5.
final_rf_model = train_model(df_train, 10, 5)

In [0]:
# Precision / recall / F-beta (beta=2) of the final model on the training data.
model_metrics(final_rf_model, df_train)

In [0]:
# Precision / recall / F-beta (beta=2) of the final model on the hold-out data.
model_metrics(final_rf_model, df_test)

In [0]:
# Print each feature's importance in the final model, rounded to two decimals.
final_rf_importances = final_rf_model.featureImportances

# Names and importances are paired by position; `baseline_features` is the list
# the pipeline's VectorAssembler was built from.
for feature, importance in zip(baseline_features, final_rf_importances):
    print(feature, "{:.2f}".format(importance))

In [0]:
# Persist the fitted preprocessing pipeline. overwrite() replaces any copy left by an
# earlier run, so this cell can be re-executed — the same policy as the overwrite mode
# used when the predictions are written.
final_pipeline.write().overwrite().save(f'{team_blob_url}/AQ/models/dt_2015-2018_rf_pipeline')

In [0]:
# Persist the fitted random forest. A plain save() fails when the path already exists;
# write().overwrite() replaces any copy left by an earlier run, so this cell can be
# re-executed — the same policy as the overwrite mode used when the predictions are written.
final_rf_model.write().overwrite().save(f'{team_blob_url}/AQ/models/dt_2015-2018_2020_rf_model')

In [0]:
# Score the hold-out set with the final model.
predictions = final_rf_model.transform(df_test)

# Collect label and prediction in ONE pass so the scoring job runs once and every
# prediction stays on the same row as its label.
label_and_prediction = predictions.select("label", "prediction").toPandas()
y_true = label_and_prediction[["label"]]
y_pred = label_and_prediction[["prediction"]]

# Precision / recall / F-beta (beta=2) for the positive class.
precision_recall_fscore_support(y_true, y_pred, beta=2.0, average='binary', pos_label=1)

In [0]:
# Save the 2020 predictions to team storage, replacing any previous copy.
predictions.write.mode('overwrite').parquet(f"{team_blob_url}/AQ/2020_rf_predictions.parquet")

In [0]:
# List the AQ folder to confirm the predictions were written.
display(dbutils.fs.ls(f"{team_blob_url}/AQ/"))

In [0]:
# Read the saved 2020 predictions back and display them as a check on the write.
df_prediction_verification = spark.read.parquet(f"{team_blob_url}/AQ/2020_rf_predictions.parquet")
display(df_prediction_verification)

# 2019 Predictions

In [0]:
# Load the validation set for this section; this rebinds `df_test`, replacing the set used above.
df_test = spark.read.parquet(f'{team_blob_url}/PRD/xval_corrected/df_final_val_processed/')

In [0]:
# Apply the already-fitted preprocessing pipeline to the newly loaded set.
# NOTE(review): `df_test` is rebound in place, so re-running this cell without re-running the
# load cell above passes an already-transformed frame through the pipeline again.
df_test = final_pipeline.transform(df_test)

In [0]:
# Score the validation set with the final model.
predictions = final_rf_model.transform(df_test)

# Collect label and prediction in ONE pass so the scoring job runs once and every
# prediction stays on the same row as its label.
label_and_prediction = predictions.select("label", "prediction").toPandas()
y_true = label_and_prediction[["label"]]
y_pred = label_and_prediction[["prediction"]]

# Precision / recall / F-beta (beta=2) for the positive class.
precision_recall_fscore_support(y_true, y_pred, beta=2.0, average='binary', pos_label=1)

In [0]:
# Save the 2019 predictions to team storage, replacing any previous copy.
predictions.write.mode('overwrite').parquet(f"{team_blob_url}/AQ/2019_rf_predictions.parquet")

In [0]:
# List the AQ folder to confirm the predictions were written.
display(dbutils.fs.ls(f"{team_blob_url}/AQ/"))