# Flight Analysis with Spark and ML

### Steps
- we will train a LogisticRegression model with a Pipleline preparing the data, 
- a CrossValidator to tune the parameters of the model, and 
- a BinaryClassificationEvaluator to evaluate our trained model

In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
#from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/09/19 19:29:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/19 19:29:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Loading the data

In [3]:
csv = spark.read.csv('flights.csv', inferSchema=True, header=True)
csv.show(3)

                                                                                

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 3 rows



### Prepare the Data for a Classification Model (Decision Tree Learning Model)

I select a subset of columns to use as features and create a Boolean label field named label with values 1 or 0. Specifically, 1 for flight that arrived late, 0 for flight was early or on-time.

In [5]:
data = csv.select("DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID", "DestAirportID", "DepDelay", ((col("ArrDelay") > 15).cast("Int").alias("label")))
data.show(3)

+----------+---------+-------+---------------+-------------+--------+-----+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|label|
+----------+---------+-------+---------------+-------------+--------+-----+
|        19|        5|     DL|          11433|        13303|      -3|    0|
|        19|        5|     DL|          14869|        12478|       0|    0|
|        19|        5|     DL|          14057|        14869|      -4|    0|
+----------+---------+-------+---------------+-------------+--------+-----+
only showing top 3 rows



### Split the Data

I will use 70% of the data for training, and reserve 30% for testing. In the testing data, the label column is renamed to trueLabel so I can use it later to compare predicted labels with known actual values.

In [6]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)



Training Rows: 1890737  Testing Rows: 811481


                                                                                

In [7]:
train.show(3)

[Stage 10:>                                                         (0 + 1) / 1]

+----------+---------+-------+---------------+-------------+--------+-----+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|label|
+----------+---------+-------+---------------+-------------+--------+-----+
|         1|        1|     9E|          10397|        10693|      -2|    0|
|         1|        1|     9E|          10397|        12264|      -5|    0|
|         1|        1|     9E|          10397|        12264|      -3|    0|
|         1|        1|     9E|          10397|        13851|      -2|    0|
|         1|        1|     9E|          10423|        11433|      -5|    0|
|         1|        1|     9E|          10423|        13244|      -6|    0|
|         1|        1|     9E|          10423|        13487|      -7|    0|
|         1|        1|     9E|          10423|        14869|     -10|    0|
|         1|        1|     9E|          10423|        14869|      34|    1|
|         1|        1|     9E|          10529|        11193|      -5|    0|
|         1|

                                                                                

Define the Pipeline

A pipeline consists of a series of transformer and estimator stages that typically prepare a DataFrame for modeling and then train a predictive model. In this case, you will create a pipeline with seven stages:

* A StringIndexer estimator that converts string values to indexes for categorical features

* A VectorAssembler that combines categorical features into a single vector

* A VectorIndexer that creates indexes for a vector of categorical features

* A VectorAssembler that creates a vector of continuous numeric features

* A MinMaxScaler that normalizes continuous numeric features

* A VectorAssembler that creates a vector of categorical and continuous features

* A DecisionTreeClassifier that trains a classification model.


In [8]:
strIdx = StringIndexer(inputCol = "Carrier", 
                        outputCol = "CarrierIdx")

catVect = VectorAssembler(inputCols = ["CarrierIdx", "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID"], 
                          outputCol="catFeatures")

catIdx = VectorIndexer( inputCol = catVect.getOutputCol(), 
                        outputCol = "idxCatFeatures")

numVect = VectorAssembler(inputCols = ["DepDelay"], 
                          outputCol="numFeatures")

minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), 
                      outputCol="normFeatures")

featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], 
                            outputCol="features")

lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)
#dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

In [9]:
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, lr])

Run the Pipeline to train a model

Run the pipeline as an Estimator on the training data to train a model.

In [10]:
piplineModel = pipeline.fit(train)

[Stage 18:>                                                         (0 + 4) / 4]

23/09/18 19:52:51 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/09/18 19:52:51 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

Generate label predictions

Transform the test data with all of the stages and the trained model in the pipeline to generate label predictions.

In [11]:
prediction = piplineModel.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(100, truncate=False)

[Stage 27:>                                                         (0 + 1) / 1]

+---------------------------------------------------+----------+---------+
|features                                           |prediction|trueLabel|
+---------------------------------------------------+----------+---------+
|[10.0,1.0,0.0,10397.0,12191.0,0.03167185877466251] |0.0       |0        |
|[10.0,1.0,0.0,10397.0,12264.0,0.030114226375908618]|0.0       |0        |
|[10.0,1.0,0.0,10397.0,12264.0,0.04205607476635514] |0.0       |0        |
|[10.0,1.0,0.0,10397.0,12264.0,0.04465212876427829] |0.0       |0        |
|[10.0,1.0,0.0,10423.0,11433.0,0.030114226375908618]|0.0       |0        |
|[10.0,1.0,0.0,10423.0,13487.0,0.029075804776739357]|0.0       |0        |
|[10.0,1.0,0.0,10529.0,11193.0,0.030114226375908618]|0.0       |0        |
|[10.0,1.0,0.0,10529.0,11193.0,0.030114226375908618]|0.0       |0        |
|[10.0,1.0,0.0,10529.0,11433.0,0.030114226375908618]|0.0       |0        |
|[10.0,1.0,0.0,10529.0,11433.0,0.030114226375908618]|0.0       |0        |
|[10.0,1.0,0.0,10693.0,11

                                                                                

#### Evaluating a Classification Model
We'll calculate a Confusion Matrix and the Area Under ROC (Receiver Operating Characteristic) to evaluate the model.

In [12]:
tp = float(predicted.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND truelabel == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([ ("TP", tp),
                                  ("FP", fp),
                                  ("TN", tn),
                                  ("FN", fn),
                                  ("Precision", pr),
                                  ("Recall", re),
                                  ("F1", 2*pr*re/(re+pr))],
                                  
                                  ["metric", "value"]
                                )
metrics.show()

[Stage 40:>                                                         (0 + 1) / 1]

+---------+-------------------+
|   metric|              value|
+---------+-------------------+
|       TP|            19267.0|
|       FP|               77.0|
|       TN|           649692.0|
|       FN|           141797.0|
|Precision| 0.9960194375516956|
|   Recall| 0.1196232553519098|
|       F1|0.21359363221143188|
+---------+-------------------+



                                                                                

Review the Area Under ROC

Another way to assess the performance of a classification model is to measure the area under a ROC (Receiver Operating Characteristic) curve for the model. the spark.ml library includes a BinaryClassificationEvaluator class that we can use to compute this. The ROC curve shows the True Positive and False Positive rates plotted for varying thresholds.

In [14]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print ("AUR = ", aur)

[Stage 52:>                                                         (0 + 4) / 4]

AUR =  0.9234766455007042


                                                                                

View the Raw Prediction and Probability

The prediction is based on a raw prediction score that describes a labelled point in a logistic function. This raw prediction is then converted to a predicted label of 0 or 1 based on a probability vector that indicates the confidence for each possible label value (in this case, 0 and 1). The value with the highest confidence is selected as the prediction.

In [15]:
prediction.select("rawPrediction", "probability", "prediction", "trueLabel").show(100, truncate=False)

[Stage 53:>                                                         (0 + 1) / 1]

+----------------------------------------+----------------------------------------+----------+---------+
|rawPrediction                           |probability                             |prediction|trueLabel|
+----------------------------------------+----------------------------------------+----------+---------+
|[1.5845250832649063,-1.5845250832649063]|[0.8298444257652681,0.17015557423473193]|0.0       |0        |
|[1.6265469242135246,-1.6265469242135246]|[0.8356960527373434,0.16430394726265662]|0.0       |0        |
|[1.3047439961546696,-1.3047439961546696]|[0.78663230716005,0.21336769283995005]  |0.0       |0        |
|[1.2347868378810056,-1.2347868378810056]|[0.774655284560153,0.22534471543984702] |0.0       |0        |
|[1.6262266984653366,-1.6262266984653366]|[0.8356520784022183,0.16434792159778167]|0.0       |0        |
|[1.655547362487114,-1.655547362487114]  |[0.8396393835574494,0.16036061644255062]|0.0       |0        |
|[1.6269714517853375,-1.6269714517853375]|[0.8357543355

                                                                                

> Note that the results include rows where the probability for 0 (the first value in the probability vector) is only slightly higher than the probability for 1 (the second value in the probability vector). The default discrimination threshold (the boundary that decides whether a probability is predicted as a 1 or a 0) is set to 0.5; so the prediction with the highest probability is always used, no matter how close to the threshold.
> And we can see from the results above that for those truelabel 1s that we predicted 0s, many of them the problibilty of 1 is just slightly less than the threshold 0.5.

#### Tune Parameters
To find the best performing parameters, we can use the CrossValidator class to evaluate each combination of parameters defined in a ParameterGrid against multiple folds of the data split into training and validation datasets. Note that this can take a long time to run because every parameter combination is tried multiple times.

#### Change the Discrimination Threshold

The AUC score seems to indicate a reasonably good model, but the performance metrics seem to indicate that it predicts a high number of False Negative labels (i.e. it predicts 0 when the true label is 1), leading to a low Recall. We can improve this by lowering the threshold. Conversely, sometimes we may want to address a large number of False Positive by raising the threshold.

In this case, I'll let the CrossValidator find the best threshold from 0.45, 0.4 and 0.35, regularization parameter from 0.3 and 0.1, and the maximum number of iterations allowed from 10 and 5.

In [16]:
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.3, 0.1]).addGrid(lr.maxIter, [10, 5]).addGrid(lr.threshold, 
                                                                                            [0.4, 0.3]).build()
cv = CrossValidator(estimator=pipeline, evaluator=BinaryClassificationEvaluator(), 
                    estimatorParamMaps=paramGrid, numFolds=2)

model = cv.fit(train)

                                                                                

In [17]:
newPrediction = model.transform(test)
newPredicted = prediction.select("features", "prediction", "trueLabel")
newPredicted.show()

[Stage 488:>                                                        (0 + 1) / 1]

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|[10.0,1.0,0.0,103...|       0.0|        0|
|[10.0,1.0,0.0,103...|       0.0|        0|
|[10.0,1.0,0.0,103...|       0.0|        0|
|[10.0,1.0,0.0,103...|       0.0|        0|
|[10.0,1.0,0.0,104...|       0.0|        0|
|[10.0,1.0,0.0,104...|       0.0|        0|
|[10.0,1.0,0.0,105...|       0.0|        0|
|[10.0,1.0,0.0,105...|       0.0|        0|
|[10.0,1.0,0.0,105...|       0.0|        0|
|[10.0,1.0,0.0,105...|       0.0|        0|
|[10.0,1.0,0.0,106...|       0.0|        0|
|[10.0,1.0,0.0,107...|       0.0|        1|
|[10.0,1.0,0.0,107...|       0.0|        0|
|[10.0,1.0,0.0,107...|       0.0|        1|
|[10.0,1.0,0.0,107...|       0.0|        0|
|[10.0,1.0,0.0,108...|       0.0|        0|
|[10.0,1.0,0.0,108...|       0.0|        0|
|[10.0,1.0,0.0,110...|       0.0|        0|
|[10.0,1.0,0.0,110...|       0.0|        0|
|[10.0,1.0,0.0,110...|       0.0

                                                                                

In [18]:
# Recalculate confusion matrix
tp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", 2*pr2*re2/(re2+pr2))],["metric", "value"])
metrics2.show()



+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           88146.0|
|       FP|            1650.0|
|       TN|          648119.0|
|       FN|           72918.0|
|Precision|0.9816250167045303|
|   Recall|0.5472731336611534|
|       F1|0.7027505381487682|
+---------+------------------+



                                                                                

In [19]:
# Recalculate the Area Under ROC
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur2 = evaluator.evaluate(prediction)
print( "AUR2 = ", aur2)



AUR2 =  0.9234771719238939


                                                                                