In [1]:
# Import Libraries
import findspark
findspark.init()

import pandas as pd
from pyspark.ml.classification import RandomForestClassifier, NaiveBayes, GBTClassifier, LinearSVC, LogisticRegression
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [2]:
def testing_metrics(bestModel, testData):
    lrPredictions = bestModel.transform(testData)
    eval_accuracy = MulticlassClassificationEvaluator(metricName="accuracy")
    eval_precision = MulticlassClassificationEvaluator(metricName="precisionByLabel")
    eval_recall = MulticlassClassificationEvaluator(metricName="recallByLabel")
    eval_f1 = MulticlassClassificationEvaluator(metricName="f1")

    accuracy = eval_accuracy.evaluate(lrPredictions)
    precision = eval_precision.evaluate(lrPredictions)
    recall = eval_recall.evaluate(lrPredictions)
    f1score = eval_f1.evaluate(lrPredictions)

    print(f"Testing Metrics\nAccuracy: {accuracy}\nPrecision: {precision}\nRecall: {recall}\nF1 Score: {f1score}")

In [3]:
def training_metrics(bestModel, trainingData):
    lrPredictions = bestModel.transform(trainingData)
    eval_accuracy = MulticlassClassificationEvaluator(metricName="accuracy")
    eval_precision = MulticlassClassificationEvaluator(metricName="precisionByLabel")
    eval_recall = MulticlassClassificationEvaluator(metricName="recallByLabel")
    eval_f1 = MulticlassClassificationEvaluator(metricName="f1")

    accuracy = eval_accuracy.evaluate(lrPredictions)
    precision = eval_precision.evaluate(lrPredictions)
    recall = eval_recall.evaluate(lrPredictions)
    f1score = eval_f1.evaluate(lrPredictions)

    print(f"Training Metrics\nAccuracy: {accuracy}\nPrecision: {precision}\nRecall: {recall}\nF1 Score: {f1score}\n")

In [4]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

# ExtractFeatureImp(featureImportances, trainingData, "features")


In [5]:
spark = SparkSession.builder \
    .appName("Airline Delay") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config('spark.driver.memory', '30g') \
    .config('spark.executor.memory', '30g') \
    .getOrCreate()
spark.sparkContext.setLogLevel('ERROR')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/29 12:22:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:

sdf = spark.read.csv('/mydata/wss-project-files/flight-delay-model/data/clean/clean_full.csv', inferSchema=True, header=True)

                                                                                

In [8]:
merge_sdf = spark.read.csv('/mydata/wss-project-files/flight-delay-model/data/clean/clean_merge_full.csv', inferSchema=True, header=True)

                                                                                

In [11]:
try:
    sdf.drop('_c0')
    merge_sdf.drop('_c0')
except:
    pass

In [15]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(sdf)

assembler = VectorAssembler(
    inputCols=sdf.columns[:-1], outputCol="features")

final_df = assembler.transform(sdf)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = final_df.randomSplit([0.7, 0.3])

                                                                                

In [16]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
merge_labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(merge_sdf)

merge_assembler = VectorAssembler(
    inputCols=merge_sdf.columns[:-1], outputCol="features")

merge_final_df = merge_assembler.transform(merge_sdf)

# Split the data into training and test sets (30% held out for testing)
(merge_trainingData, merge_testData) = merge_final_df.randomSplit([0.7, 0.3])

                                                                                

In [11]:
nb_classifier = NaiveBayes()
nb_paramGrid = (ParamGridBuilder()\
            .addGrid(nb_classifier.modelType, ['multinomial', 'gaussian'])
            .build())
nb_crossval = CrossValidator(estimator=nb_classifier,
                         estimatorParamMaps=nb_paramGrid,
                         evaluator=MulticlassClassificationEvaluator(),
                         numFolds=3)
nb_fitModel = nb_crossval.fit(trainingData)
nb_BestModel= nb_fitModel.bestModel
training_metrics(nb_BestModel, trainingData)
testing_metrics(nb_BestModel, testData)

                                                                                

Training Metrics
Accuracy: 0.5435598729649441
Precision: 0.6701962045264254
Recall: 0.5255765298607141
F1 Score: 0.5504483736860252





Testing Metrics
Accuracy: 0.5449431060845437
Precision: 0.670681275106715
Recall: 0.5288123333936081
F1 Score: 0.5518394817252527


                                                                                

In [12]:
nb_BestModel._java_obj.getModelType()

'multinomial'

## Random Forest

In [18]:
rf_classifier = RandomForestClassifier()
paramGrid = (ParamGridBuilder()\
            .addGrid(rf_classifier.maxDepth, [2, 5, 10])
            .addGrid(rf_classifier.maxBins, [5, 10 , 20])
            .addGrid(rf_classifier.numTrees,[5, 20, 50])
            .build())
rf_crossval = CrossValidator(estimator=rf_classifier,
                         estimatorParamMaps=paramGrid,
                         evaluator=MulticlassClassificationEvaluator(),
                         numFolds=3)

In [19]:
rf_fitModel = rf_crossval.fit(trainingData)

                                                                                

In [20]:
rf_BestModel= rf_fitModel.bestModel
rf_featureImportances= rf_BestModel.featureImportances.toArray()
training_metrics(rf_BestModel, trainingData)
testing_metrics(rf_BestModel, testData)
print(ExtractFeatureImp(rf_featureImportances, trainingData, "features"))
print(f'Max Depth: {rf_BestModel._java_obj.getMaxDepth()}')
print(f'Max Bins: {rf_BestModel._java_obj.getMaxBins()}')
print(f'Num Trees: {rf_BestModel._java_obj.getNumTrees()}')

                                                                                

Training Metrics
Accuracy: 0.6473340849889195
Precision: 0.6506608242826687
Recall: 0.9366082531375007
F1 Score: 0.5786964066312357





Testing Metrics
Accuracy: 0.6472048156923647
Precision: 0.6506727039615691
Recall: 0.935091135963429
F1 Score: 0.5795859466296087
     idx                              name     score
6      6                        CRSDepTime  0.253428
51    51  DOT_ID_Reporting_Airline - 19805  0.123497
45    45  IATA_CODE_Reporting_Airline - WN  0.118791
40    40  IATA_CODE_Reporting_Airline - MQ  0.070471
2      2                             Month  0.052469
..   ...                               ...       ...
180  180                        Dest - LBB  0.000000
181  181                        Dest - LBE  0.000000
182  182                        Dest - LCH  0.000000
183  183                        Dest - LEX  0.000000
68    68                        Dest - ACT  0.000000

[299 rows x 3 columns]
Max Depth: 10
Max Bins: 20
Num Trees: 5


                                                                                

## Random Forest Merged

In [21]:
merge_rf_classifier = RandomForestClassifier()
merge_rf_paramGrid = (ParamGridBuilder()\
            .addGrid(merge_rf_classifier.maxDepth, [2, 5, 10])
            .addGrid(merge_rf_classifier.maxBins, [5, 10 , 20])
            .addGrid(merge_rf_classifier.numTrees,[5, 20, 50])
            .build())
merge_rf_crossval = CrossValidator(estimator=merge_rf_classifier,
                         estimatorParamMaps=merge_rf_paramGrid,
                         evaluator=MulticlassClassificationEvaluator(),
                         numFolds=3)

In [22]:
merge_rf_fitModel = merge_rf_crossval.fit(merge_trainingData)

                                                                                

In [23]:
merge_rf_BestModel= merge_rf_fitModel.bestModel
merge_rf_featureImportances= merge_rf_BestModel.featureImportances.toArray()
training_metrics(merge_rf_BestModel, merge_trainingData)
testing_metrics(merge_rf_BestModel, merge_testData)
print(ExtractFeatureImp(merge_rf_featureImportances, merge_trainingData, "features"))
print(f'Max Depth: {merge_rf_BestModel._java_obj.getMaxDepth()}')
print(f'Max Bins: {merge_rf_BestModel._java_obj.getMaxBins()}')
print(f'Max Bins: {merge_rf_BestModel._java_obj.getNumTrees()}')

                                                                                

Training Metrics
Accuracy: 0.6606027181249478
Precision: 0.6628219297212584
Recall: 0.9313232884122766
F1 Score: 0.6023238372127822



                                                                                

Testing Metrics
Accuracy: 0.6583738932925416
Precision: 0.6617791275189017
Recall: 0.9292943502113766
F1 Score: 0.5997096511915421
     idx                              name     score
6      6                        CRSDepTime  0.197723
51    51  DOT_ID_Reporting_Airline - 19805  0.083522
298  298                              PRCP  0.051362
48    48  DOT_ID_Reporting_Airline - 19393  0.040958
296  296                         ELEVATION  0.032215
..   ...                               ...       ...
176  176                        Dest - KTN  0.000000
181  181                        Dest - LBE  0.000000
184  184                        Dest - LFT  0.000000
187  187                        Dest - LIH  0.000000
165  165                        Dest - ILM  0.000000

[330 rows x 3 columns]
Max Depth: 10
Max Bins: 10
Max Bins: 5


## Gradient Boosted Tree

In [19]:
gbt_classifier = GBTClassifier()
gbt_paramGrid = (ParamGridBuilder()\
            .addGrid(gbt_classifier.maxDepth, [2, 5, 10])
            .addGrid(gbt_classifier.maxBins, [5, 10 , 20])
            .build())
gbt_crossval = CrossValidator(estimator=gbt_classifier,
                         estimatorParamMaps=gbt_paramGrid,
                         evaluator=MulticlassClassificationEvaluator(),
                         numFolds=3)

In [20]:
gbt_fitModel = gbt_crossval.fit(trainingData)

                                                                                

In [None]:
gbt_BestModel= gbt_fitModel.bestModel
gbt_featureImportances = gbt_BestModel.featureImportances.toArray()
training_metrics(gbt_BestModel, trainingData)
testing_metrics(gbt_BestModel, testData)
print(ExtractFeatureImp(gbt_featureImportances, trainingData, "features"))
print(f'Max Depth: {gbt_BestModel._java_obj.getMaxDepth()}')
print(f'Max Bins: {gbt_BestModel._java_obj.getMaxBins()}')

## Gradient Boosted Tree Merged

In [None]:
merge_gbt_classifier = GBTClassifier()
merge_gbt_paramGrid = (ParamGridBuilder()\
            .addGrid(merge_gbt_classifier.maxDepth, [2, 5, 10])
            .addGrid(merge_gbt_classifier.maxBins, [5, 10 , 20])
            .build())
merge_gbt_crossval = CrossValidator(estimator=merge_gbt_classifier,
                         estimatorParamMaps=merge_gbt_paramGrid,
                         evaluator=MulticlassClassificationEvaluator(),
                         numFolds=3)

In [None]:
merge_gbt_fitModel = merge_gbt_crossval.fit(merge_trainingData)

In [None]:
merge_gbt_BestModel= merge_gbt_fitModel.bestModel
merge_gbt_featureImportances= merge_gbt_BestModel.featureImportances.toArray()
training_metrics(merge_gbt_BestModel, merge_trainingData)
testing_metrics(merge_gbt_BestModel, merge_testData)
print(ExtractFeatureImp(merge_gbt_featureImportances, merge_trainingData, "features"))
print(f'Max Depth: {merge_gbt_BestModel._java_obj.getMaxDepth()}')
print(f'Max Bins: {merge_gbt_BestModel._java_obj.getMaxBins()}')

## Linear SVC

In [None]:
svc_classifier = LinearSVC()
svc_paramGrid = (ParamGridBuilder()\
            .addGrid(svc_classifier.aggregationDepth, [2, 5, 10])
            .addGrid(svc_classifier.maxIter, [50, 100, 200])
            .build())
svc_crossval = CrossValidator(estimator=svc_classifier,
                         estimatorParamMaps=svc_paramGrid,
                         evaluator=MulticlassClassificationEvaluator(),
                         numFolds=3)

In [None]:
svc_fitModel = svc_crossval.fit(trainingData)

In [None]:
svc_BestModel= svc_fitModel.bestModel
training_metrics(svc_BestModel, trainingData)
testing_metrics(svc_BestModel, testData)
print(f'Aggregation Depth: {svc_BestModel._java_obj.getAggregationDepth()}')
print(f'Max Iterations: {svc_BestModel._java_obj.getMaxIter()}')

## Linear SVC Merged

In [None]:
merge_svc_classifier = LinearSVC()
merge_svc_paramGrid = (ParamGridBuilder()\
            .addGrid(svc_classifier.aggregationDepth, [2, 5, 10])
            .addGrid(svc_classifier.maxIter, [10, 50, 100])
            .build())
merge_svc_crossval = CrossValidator(estimator=merge_svc_classifier,
                         estimatorParamMaps=merge_svc_paramGrid,
                         evaluator=MulticlassClassificationEvaluator(),
                         numFolds=3)

In [None]:
merge_svc_fitModel = merge_svc_crossval.fit(merge_trainingData)

In [None]:
merge_svc_BestModel= merge_svc_fitModel.bestModel
training_metrics(merge_svc_BestModel, merge_trainingData)
testing_metrics(merge_svc_BestModel, merge_testData)
print(f'Aggregation Depth: {merge_svc_BestModel._java_obj.getAggregationDepth()}')
print(f'Max Iterations: {merge_svc_BestModel._java_obj.getMaxIter()}')

## Logistic Regression

In [None]:
lr_classifier = LogisticRegression()
lr_paramGrid = (ParamGridBuilder()\
            .addGrid(lr_classifier.regParam, [0.01, 0.5, 2.0])
            .addGrid(lr_classifier.maxIter, [10, 50, 100])
            .addGrid(lr_classifier.elasticNetParam, [0.0, 0.5, 1.0])
            .build())
lr_crossval = CrossValidator(estimator=lr_classifier,
                         estimatorParamMaps=lr_paramGrid,
                         evaluator=MulticlassClassificationEvaluator(),
                         numFolds=3)

In [None]:
lr_fitModel = lr_crossval.fit(trainingData)

In [None]:
lr_BestModel= lr_fitModel.bestModel
training_metrics(lr_BestModel, trainingData)
testing_metrics(lr_BestModel, testData)
print(f'Reg Param: {lr_BestModel._java_obj.getRegParam()}')
print(f'Max Iterations: {lr_BestModel._java_obj.getMaxIter()}')
print(f'Elastic Net Param: {lr_BestModel._java_obj.getElasticNetParam()}')

## Logistic Regression Merged

In [None]:
merge_lr_classifier = LogisticRegression()
merge_lr_paramGrid = (ParamGridBuilder()\
            .addGrid(merge_lr_classifier.regParam, [0.01, 0.5, 2.0])
            .addGrid(merge_lr_classifier.maxIter, [10, 50, 100])
            .addGrid(merge_lr_classifier.elasticNetParam, [0.0, 0.5, 1.0])
            .build())
merge_lr_crossval = CrossValidator(estimator=merge_lr_classifier,
                         estimatorParamMaps=merge_lr_paramGrid,
                         evaluator=MulticlassClassificationEvaluator(),
                         numFolds=3)

In [None]:
merge_lr_fitModel = merge_lr_crossval.fit(merge_trainingData)

In [None]:
merge_lr_BestModel= merge_lr_fitModel.bestModel
training_metrics(merge_lr_BestModel, merge_trainingData)
testing_metrics(merge_lr_BestModel, merge_testData)
print(f'Reg Param: {merge_lr_BestModel._java_obj.getRegParam()}')
print(f'Max Iterations: {merge_lr_BestModel._java_obj.getMaxIter()}')
print(f'Elastic Net Param: {merge_lr_BestModel._java_obj.getElasticNetParam()}')