In [None]:
# Spark setup: master "local[2]" runs Spark locally with two worker threads (ideally one per core)
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *

# Start (or reuse) a local Spark session with two worker threads.
# getOrCreate() keeps this cell re-runnable: constructing SparkContext(...)
# directly raises ValueError when a context is already active in the kernel.
spark = SparkSession.builder.master("local[2]").getOrCreate()
# Expose the underlying SparkContext under the name `sc`.
sc = spark.sparkContext

import pandas as pd 
import numpy as np 

from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

In [2]:
# Read the flights CSV: the first row is the header and column types are inferred.
csv = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("data/flights.csv")
)

# Keep the day, airport and departure-delay columns and derive the binary
# label: 1 when ArrDelay is greater than 15, otherwise 0
# (a null ArrDelay yields a null label).
data = csv.select(
    "DayofMonth",
    "DayOfWeek",
    "OriginAirportID",
    "DestAirportID",
    "DepDelay",
    (col("ArrDelay") > 15).cast("Int").alias("label"),
)
data.show(3)

+----------+---------+---------------+-------------+--------+-----+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|label|
+----------+---------+---------------+-------------+--------+-----+
|        19|        5|          11433|        13303|      -3|    0|
|        19|        5|          14869|        12478|       0|    0|
|        19|        5|          14057|        14869|      -4|    0|
+----------+---------+---------------+-------------+--------+-----+
only showing top 3 rows



In [3]:
# Split the data: roughly 70% for training and 30% for testing.
# A fixed seed makes the split (and every metric computed from it) repeatable
# across kernel restarts, given the same input file and partitioning.
splits = data.randomSplit([0.7, 0.3], seed=42)
train = splits[0]
# The test set carries the ground truth under the name "trueLabel".
test = splits[1].withColumnRenamed("label", "trueLabel")

In [4]:
# Stage 1: pack the five input columns into a single "features" vector column.
assembler = VectorAssembler(
    inputCols=[
        "DayofMonth",
        "DayOfWeek",
        "OriginAirportID",
        "DestAirportID",
        "DepDelay",
    ],
    outputCol="features",
)

# Stage 2: logistic regression on the assembled features.
lr = LogisticRegression(
    labelCol="label",
    featuresCol="features",
    maxIter=10,
    regParam=0.3,
)

# Chain both stages and fit them on the training split.
pipeline = Pipeline(stages=[assembler, lr])
model = pipeline.fit(train)

In [5]:
# Score the test split with the fitted pipeline.
prediction = model.transform(test)

# Keep the assembled features next to the predicted and the actual label.
predicted = prediction.select(["features", "prediction", "trueLabel"])
predicted.show(n=100, truncate=False)

+-------------------------------+----------+---------+
|features                       |prediction|trueLabel|
+-------------------------------+----------+---------+
|[1.0,1.0,10140.0,11259.0,-5.0] |0.0       |0        |
|[1.0,1.0,10140.0,11259.0,-5.0] |0.0       |0        |
|[1.0,1.0,10140.0,11259.0,0.0]  |0.0       |0        |
|[1.0,1.0,10140.0,11259.0,12.0] |0.0       |0        |
|[1.0,1.0,10140.0,11292.0,2.0]  |0.0       |0        |
|[1.0,1.0,10140.0,11292.0,3.0]  |0.0       |0        |
|[1.0,1.0,10140.0,11292.0,41.0] |0.0       |1        |
|[1.0,1.0,10140.0,11298.0,-10.0]|0.0       |0        |
|[1.0,1.0,10140.0,11298.0,-6.0] |0.0       |0        |
|[1.0,1.0,10140.0,11298.0,-5.0] |0.0       |0        |
|[1.0,1.0,10140.0,11298.0,-1.0] |0.0       |0        |
|[1.0,1.0,10140.0,11298.0,34.0] |0.0       |1        |
|[1.0,1.0,10140.0,12191.0,-3.0] |0.0       |0        |
|[1.0,1.0,10140.0,12191.0,1.0]  |0.0       |0        |
|[1.0,1.0,10140.0,12264.0,2.0]  |0.0       |0        |
|[1.0,1.0,

In [6]:
# Confusion matrix at the default decision threshold (0.5).
# One grouped aggregation (a single Spark job) replaces four separate
# filter/count jobs over the same predictions.
confusion_counts = {
    (row["prediction"], row["trueLabel"]): float(row["count"])
    for row in predicted.groupBy("prediction", "trueLabel").count().collect()
}
# Combinations that never occur are absent from the grouped result, hence 0.0.
tp = confusion_counts.get((1.0, 1), 0.0)
fp = confusion_counts.get((1.0, 0), 0.0)
tn = confusion_counts.get((0.0, 0), 0.0)
fn = confusion_counts.get((0.0, 1), 0.0)

# Precision and recall are undefined when their denominator is zero (no
# positive predictions / no positive labels); report null instead of raising
# ZeroDivisionError.
precision = tp / (tp + fp) if (tp + fp) > 0 else None
recall = tp / (tp + fn) if (tp + fn) > 0 else None

metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", precision),
 ("Recall", recall)],["metric", "value"])
metrics.show()

+---------+-------------------+
|   metric|              value|
+---------+-------------------+
|       TP|            19333.0|
|       FP|               79.0|
|       TN|           649969.0|
|       FN|           142555.0|
|Precision| 0.9959303523593653|
|   Recall|0.11942206957896817|
+---------+-------------------+



In [7]:
# Inspect the raw scores and class probabilities behind each prediction.
(
    prediction
    .select("rawPrediction", "probability", "prediction", "trueLabel")
    .show(100, truncate=False)
)

+----------------------------------------+----------------------------------------+----------+---------+
|rawPrediction                           |probability                             |prediction|trueLabel|
+----------------------------------------+----------------------------------------+----------+---------+
|[1.6119207104679714,-1.6119207104679714]|[0.833677881069122,0.16632211893087803] |0.0       |0        |
|[1.6119207104679714,-1.6119207104679714]|[0.833677881069122,0.16632211893087803] |0.0       |0        |
|[1.5419538951770808,-1.5419538951770808]|[0.8237485845854853,0.17625141541451467]|0.0       |0        |
|[1.3740335384789437,-1.3740335384789437]|[0.7980310507618131,0.20196894923818684]|0.0       |0        |
|[1.5140748524212153,-1.5140748524212153]|[0.819664314806139,0.1803356851938611]  |0.0       |0        |
|[1.5000814893630372,-1.5000814893630372]|[0.8175866297284948,0.18241337027150523]|0.0       |0        |
|[0.9683336931522695,-0.9683336931522695]|[0.7247872429

In [11]:
# Area under the ROC curve on the test predictions, computed from the
# "rawPrediction" column against the "trueLabel" column.
evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    rawPredictionCol="rawPrediction",
    labelCol="trueLabel",
)
aur = evaluator.evaluate(prediction)
print("AUR = ", aur)

AUR =  0.9232296859342777


In [12]:
# Change the decision threshold: fit a second pipeline whose logistic
# regression uses threshold=0.35.
lr2 = LogisticRegression(
    labelCol="label",
    featuresCol="features",
    maxIter=10,
    regParam=0.3,
    threshold=0.35,
)
pipeline2 = Pipeline(stages=[assembler, lr2])
model2 = pipeline2.fit(train)

# Score the same test split with the new model and inspect the result.
newPrediction = model2.transform(test)
(
    newPrediction
    .select("rawPrediction", "probability", "prediction", "trueLabel")
    .show(100, truncate=False)
)

+----------------------------------------+----------------------------------------+----------+---------+
|rawPrediction                           |probability                             |prediction|trueLabel|
+----------------------------------------+----------------------------------------+----------+---------+
|[1.6119207104679714,-1.6119207104679714]|[0.833677881069122,0.16632211893087803] |0.0       |0        |
|[1.6119207104679714,-1.6119207104679714]|[0.833677881069122,0.16632211893087803] |0.0       |0        |
|[1.5419538951770808,-1.5419538951770808]|[0.8237485845854853,0.17625141541451467]|0.0       |0        |
|[1.3740335384789437,-1.3740335384789437]|[0.7980310507618131,0.20196894923818684]|0.0       |0        |
|[1.5140748524212153,-1.5140748524212153]|[0.819664314806139,0.1803356851938611]  |0.0       |0        |
|[1.5000814893630372,-1.5000814893630372]|[0.8175866297284948,0.18241337027150523]|0.0       |0        |
|[0.9683336931522695,-0.9683336931522695]|[0.7247872429

In [13]:
# Recalculate the confusion matrix for the threshold = 0.35 model.
# One grouped aggregation (a single Spark job) replaces four separate
# filter/count jobs over the same predictions.
confusion_counts2 = {
    (row["prediction"], row["trueLabel"]): float(row["count"])
    for row in newPrediction.groupBy("prediction", "trueLabel").count().collect()
}
# Combinations that never occur are absent from the grouped result, hence 0.0.
tp2 = confusion_counts2.get((1.0, 1), 0.0)
fp2 = confusion_counts2.get((1.0, 0), 0.0)
tn2 = confusion_counts2.get((0.0, 0), 0.0)
fn2 = confusion_counts2.get((0.0, 1), 0.0)

# Precision and recall are undefined when their denominator is zero (no
# positive predictions / no positive labels); report null instead of raising
# ZeroDivisionError.
precision2 = tp2 / (tp2 + fp2) if (tp2 + fp2) > 0 else None
recall2 = tp2 / (tp2 + fn2) if (tp2 + fn2) > 0 else None

metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", precision2),
 ("Recall", recall2)],["metric", "value"])
metrics2.show()

+---------+-------------------+
|   metric|              value|
+---------+-------------------+
|       TP|            42049.0|
|       FP|              124.0|
|       TN|           649924.0|
|       FN|           119839.0|
|Precision| 0.9970597301591065|
|   Recall|0.25974130262897804|
+---------+-------------------+



In [17]:
# Parameter tuning with a single train/validation split.
# Grid: 3 regParam values x 2 maxIter values x 2 thresholds = 12 candidates.
# NOTE(review): BinaryClassificationEvaluator() is used with its defaults; if
# the default metric is areaUnderROC computed from rawPrediction, the
# threshold axis cannot change the score and only doubles the number of
# fits. Confirm, then consider tuning the threshold with a
# threshold-sensitive metric instead.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.3, 0.1, 0.01])
    .addGrid(lr.maxIter, [10, 5])
    .addGrid(lr.threshold, [0.35, 0.30])
    .build()
)

# 80% of `train` is used for fitting and the rest for validation; the fixed
# seed makes that internal split repeatable between runs.
tvs = TrainValidationSplit(
    estimator=pipeline,
    evaluator=BinaryClassificationEvaluator(),
    estimatorParamMaps=paramGrid,
    trainRatio=0.8,
    seed=42,
)

# Note: this rebinds `model`, replacing the pipeline model fitted earlier.
model = tvs.fit(train)

In [23]:
# Parameter tuning with k-fold cross-validation.
# Grid: 2 regParam values x 2 maxIter values = 4 candidates, each evaluated
# on 2 folds.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.3, 0.01])
    .addGrid(lr.maxIter, [10, 5])
    .build()
)

# The fixed seed makes the fold assignment repeatable between runs.
cv = CrossValidator(
    estimator=pipeline,
    evaluator=BinaryClassificationEvaluator(),
    estimatorParamMaps=paramGrid,
    numFolds=2,
    seed=42,
)

# Note: this rebinds `model` (and `paramGrid`) used by earlier cells.
model = cv.fit(train)