## References

##### General
[MLlib](https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier)

[MLlib Classification](https://people.eecs.berkeley.edu/~jegonzal/pyspark/_modules/pyspark/ml/classification.html)

[Feature Engineering](https://docs.databricks.com/applications/machine-learning/preprocess-data/mllib.html) 

[Feature Transformers](https://spark.apache.org/docs/latest/ml-features)

[Feature Importance](https://spark.apache.org/docs/2.1.0/api/python/pyspark.ml.html?highlight=featureimportance)

##### Tree Algorithms
[Decision Tree](https://spark.apache.org/docs/1.5.2/ml-decision-tree.html)

[Gradient Boosted Trees](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.GBTClassifier.html)

[Xgboost](https://databricks.github.io/spark-deep-learning/_modules/sparkdl/xgboost/xgboost.html)

## Libraries

In [0]:
# general
import re
import time
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# sql 
from pyspark.sql import functions as f
from pyspark.sql import SQLContext

# ml pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.feature import OneHotEncoder

# models
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier

from sparkdl.xgboost import XgboostClassifier

# metrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import classification_report

sqlContext = SQLContext(sc)

## Helper Functions

In [0]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
  '''
  Locates the indices from the pipeline transformations for each feature using the schema
  Maps through each index to find its feature importance
  
  output:                Pandas dataframe
  '''
  list_extract = []
  for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
      list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
  varlist = pd.DataFrame(list_extract)
  varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
  return(varlist.sort_values('score', ascending = False))

In [0]:
def eval_metrics(prediction):
  '''
  Calculates roc_auc and pr_auc when given the predictions
  '''
  
  # initialize evaluators
  eval_roc_auc = BinaryClassificationEvaluator(metricName='areaUnderROC')
  eval_pr_auc = BinaryClassificationEvaluator(metricName='areaUnderPR')
  
  # calculate metrics
  roc_auc = eval_roc_auc.evaluate(prediction)
  pr_auc = eval_pr_auc.evaluate(prediction)
  
  # return metrics
  metrics = {'roc_auc':roc_auc, 'pr_auc': pr_auc}
  return metrics

In [0]:
def cv_scores(cvModel):
  '''
  Shows scores for each iteration during cross validation and param grid search
  '''
  params = [{p.name: v for p, v in m.items()} for m in cvModel.getEstimatorParamMaps()]

  scores = pd.DataFrame.from_dict([
      {cvModel.getEvaluator().getMetricName(): metric, **ps} 
      for ps, metric in zip(params, cvModel.avgMetrics)
  ])
  
  return scores

## Import data

In [0]:
data = spark.read.option("header", "true").parquet(f"dbfs:/tmp/out/final_air_weather.parquet")

In [0]:
display(data)

MONTH,DAY_OF_WEEK,OP_CARRIER,ORIGIN,DEST,CRS_DEP_TIME,DEP_DEL15,DEP_TIME_BLK,CRS_ARR_TIME,CRS_ELAPSED_TIME,DISTANCE,WND_SPD,VIS_DIST,TMP_TEMP,DEW_TEMP,SLP_PRESSURE,PRECIPITATION,CLOUD_COVERAGE,CLOUD_BASE_HEIGHT,ALTIMETER_SET
7,7,AA,ABQ,CLT,40,0.0,0001-0559,605,205.0,1449.0,31,16093,153,87,10167,0,2,1036,10169
1,1,F9,ABQ,MCO,40,0.0,0001-0559,618,218.0,1553.0,31,16093,153,87,10167,0,2,1036,10169
1,5,F9,ABQ,MCO,40,1.0,0001-0559,618,218.0,1553.0,31,16093,153,87,10167,0,2,1036,10169
11,3,AS,ADK,ANC,1631,1.0,1600-1659,2015,164.0,1192.0,31,16093,153,87,10167,0,2,1036,10169
11,3,AS,ANC,SEA,245,0.0,0001-0559,709,204.0,1448.0,31,16093,153,87,10167,0,2,1036,10169
11,3,AS,ANC,SEA,45,0.0,0001-0559,506,201.0,1448.0,31,16093,153,87,10167,0,2,1036,10169
11,3,AS,ANC,SEA,145,0.0,0001-0559,604,199.0,1448.0,31,16093,153,87,10167,0,2,1036,10169
11,3,DL,ANC,SEA,50,0.0,0001-0559,515,205.0,1448.0,31,16093,153,87,10167,0,2,1036,10169
11,3,AS,ANC,PDX,55,0.0,0001-0559,529,214.0,1542.0,31,16093,153,87,10167,0,2,1036,10169
1,1,AS,ANC,PDX,55,0.0,0001-0559,525,210.0,1542.0,31,16093,153,87,10167,0,2,1036,10169


In [0]:
print('shape: ', (data.count(), len(data.columns)))

## Undersampling - Optional

In [0]:
delayed_df = data.filter(f.col('DEP_DEL15')==1)
ontime_df = data.filter(f.col('DEP_DEL15')==0)
sampleRatio = 1.0* delayed_df.count() / data.count()
new_ontime_df = ontime_df.sample(False, sampleRatio)
new_data = delayed_df.unionAll(new_ontime_df)

print(new_data.count(), len(new_data.columns))

## Prep Data

In [0]:
def data_pipeline(data, label_col): 
  '''
  This funcion transforms input data into two columns: label and features.
  StringIndexer and one-hot encoding are applied to categorical features.  
  VectorAssembler combines both categorical and numeric features into one column.
  '''
  
  # stages in pipeline
  stages = []

  # convert label into label indices using the StringIndexer
  label_stringIdx = StringIndexer(inputCol=label_col, outputCol="label")
  stages += [label_stringIdx]

  # one hot encode categorical variables
  categoricalColumns =  [i[0] for i in data.dtypes if i[1]=='string']

  for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + ",classVec"])
    stages += [stringIndexer, encoder]

  # grabs numeric features - excluding our output
  numericCols = [i[0] for i in data.drop(label_col).dtypes if i[1]!='string']

  # transform all features into a single column called features using VectorAssembler
  assemblerInputs = [c + ",classVec" for c in categoricalColumns] + numericCols
  assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
  stages += [assembler]

  # puts data through all the feature transformations
  partialPipeline = Pipeline().setStages(stages)
  pipelineModel = partialPipeline.fit(data)
  preppedDataDF = pipelineModel.transform(data)

  # rename
  dataset = preppedDataDF
  
  return dataset

In [0]:
# prepped dataset for train/test split
dataset_6m = data_pipeline(data_6m, "DEP_DEL15")

In [0]:
new_dataset = data_pipeline(new_data, "DEP_DEL15")

## Train/Test Split

In [0]:
# train test split
(trainingData_6m, testData_6m) = dataset_6m.randomSplit([0.8, 0.2], seed=2021)

In [0]:
# train test split - for undersampling
(trainingData, testData) = new_dataset.randomSplit([0.8, 0.2], seed=2021)

In [0]:
# train distribution
print('train')
print(trainingData.filter(f.col('label')==0).count()/trainingData.count())
print(trainingData.filter(f.col('label')==1).count()/trainingData.count())

# test distribution
print('test')
print(testData.filter(f.col('label')==0).count()/testData.count())
print(testData.filter(f.col('label')==1).count()/testData.count())

## Decision Tree

##### Train

In [0]:
# create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)

In [0]:
# create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [5, 10])
             .addGrid(dt.maxBins, [20, 40])
             .build())

# create 5-fold CrossValidator
evaluator = MulticlassClassificationEvaluator(metricName='f1')
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
 
# run cross validations - 2 minutes
dt_cvModel = cv.fit(trainingData_6m)

# tree
print()
print("numNodes = ", dt_cvModel.bestModel.numNodes)
print("depth = ", dt_cvModel.bestModel.depth)

In [0]:
# scores during training
cv_scores(dt_cvModel)

Unnamed: 0,f1,maxDepth,maxBins
0,0.60652,5,20
1,0.612756,5,40
2,0.621208,10,20
3,0.62178,10,40


##### Score

In [0]:
# use test set to measure the accuracy of the model on new data
dtPred = dt_cvModel.bestModel.transform(testData_6m)

# evaluate predictions
dtScore = eval_metrics(dtPred)
print(dtScore)

# classification report
print(classification_report(testData_6m.select(f.col('label')).toPandas(), dtPred.select(f.col('prediction')).toPandas()))

##### Feature Importance

In [0]:
# get feature importance with helper function
ExtractFeatureImp(dt_cvModel.bestModel.featureImportances, new_dataset, "features").head(10)

Unnamed: 0,idx,name,score
2,774,CRS_DEP_TIME,0.611667
15,0,"OP_CARRIER,classVec_WN",0.052149
0,772,MONTH,0.04509
13,785,CLOUD_BASE_HEIGHT,0.04401
9,781,DEW_TEMP,0.037325
11,783,PRECIPITATION,0.029255
3,775,CRS_ARR_TIME,0.024279
30,15,"OP_CARRIER,classVec_HA",0.022387
21,6,"OP_CARRIER,classVec_B6",0.019799
17,2,"OP_CARRIER,classVec_DL",0.019392


## Random Forest

##### Train

In [0]:
# create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

In [0]:
# create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [5, 10])
             .addGrid(rf.maxBins, [20, 80])
             .addGrid(rf.numTrees, [10, 20, 30])
             .build())

# create 5-fold CrossValidator
evaluator = MulticlassClassificationEvaluator(metricName='f1')
rf_cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
 
# run cross validations - 10 minutes
rf_cvModel = rf_cv.fit(trainingData)

# RF
print()
print("num trees = ", rf_cvModel.bestModel.getNumTrees)
print("maxdepth = ", rf_cvModel.bestModel.getOrDefault('maxDepth'))
print("maxbins = ", rf_cvModel.bestModel.getMaxBins())

In [0]:
# scores during training
cv_scores(rf_cvModel)

Unnamed: 0,f1,maxDepth,maxBins,numTrees
0,0.581921,5,20,10
1,0.574647,5,20,20
2,0.582041,5,20,30
3,0.590175,5,80,10
4,0.578191,5,80,20
5,0.578348,5,80,30
6,0.600207,10,20,10
7,0.604202,10,20,20
8,0.603553,10,20,30
9,0.603217,10,80,10


##### Score

In [0]:
# make predictions on test data 
rfPred = rf_cvModel.bestModel.transform(testData)

# evaluate predictions
rfScore = eval_metrics(rfPred)
print(rfScore)

# classification report
print(classification_report(testData.select(f.col('label')).toPandas(), rfPred.select(f.col('prediction')).toPandas()))

##### Feature Importance

In [0]:
# get feature importance with helper function
ExtractFeatureImp(rf_cvModel.bestModel.featureImportances, new_dataset, "features").head(10)

Unnamed: 0,idx,name,score
3,775,CRS_ARR_TIME,0.201791
2,774,CRS_DEP_TIME,0.163993
13,785,CLOUD_BASE_HEIGHT,0.078594
783,768,"DEP_TIME_BLK,classVec_0600-0659",0.068557
781,766,"DEP_TIME_BLK,classVec_0700-0759",0.062517
8,780,TMP_TEMP,0.032765
11,783,PRECIPITATION,0.023791
12,784,CLOUD_COVERAGE,0.023555
770,755,"DEP_TIME_BLK,classVec_1800-1859",0.022324
9,781,DEW_TEMP,0.022269


## Gradient Boosted Tree

##### Train

In [0]:
# create an initial RandomForest model.
gb = GBTClassifier(labelCol="label", featuresCol="features", seed=2021)

In [0]:
# create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(gb.maxDepth, [5, 10])
             .addGrid(gb.maxBins, [20, 80])
             .addGrid(gb.maxIter, [10, 20, 50])
             .build())

# create 5-fold CrossValidator
evaluator = MulticlassClassificationEvaluator(metricName='f1')
gb_cv = CrossValidator(estimator=gb, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
 
# run cross validations - 5hrs
gb_cvModel = gb_cv.fit(trainingData)

# GBT
print()
print("num trees = ", gb_cvModel.bestModel.getMaxIter())
print("maxdepth = ", gb_cvModel.bestModel.getMaxDepth())

##### Score

In [0]:
# make predictions on test data
gbPred = gb_cvModel.bestModel.transform(testData)

# evaluate predictions
gbScore = eval_metrics(gbPred)
print(gbScore)

# classification report
print(classification_report(testData.select(f.col('label')).toPandas(), gbPred.select(f.col('prediction')).toPandas()))

## Save models

In [0]:
# save models
dt_cvModel.bestModel.write().overwrite().save("dbfs:/tmp/out/sl_dt_cv")
rf_cvModel.bestModel.write().overwrite().save("dbfs:/tmp/out/sl_rf_cv")
gb_cvModel.bestModel.write().overwrite().save("dbfs:/tmp/out/sl_gb_cv")