In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz

!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os

# Environment variables naming the Java 8 and Spark 2.4.7 install locations.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-2.4.7-bin-hadoop2.7",
    }
)

In [None]:
# Initialise findspark before `pyspark` is imported in the next cell.
# NOTE(review): presumably this locates the Spark install via the SPARK_HOME
# variable set earlier and makes `pyspark` importable — confirm against the
# findspark documentation.
import findspark
findspark.init()

In [None]:
import pyspark

# getOrCreate() returns the already-running SparkContext when there is one, so
# re-running this cell no longer fails with "Cannot run multiple SparkContexts
# at once"; on a fresh kernel it creates the context with the given app name.
sc = pyspark.SparkContext.getOrCreate(
    pyspark.SparkConf().setAppName("yourAppName")
)

In [None]:
 from google.colab import files

files.upload()

Saving flights.csv to flights.csv


In [None]:
 from google.colab import files

files.upload()

Saving airports.csv to airports.csv
Saving raw-flight-data.csv to raw-flight-data.csv


In [None]:
!cd ..
!ls

airports.csv		      mnist_test.csv
anscombe.json		      mnist_train_small.csv
california_housing_test.csv   raw-flight-data.csv
california_housing_train.csv  README.md
flights.csv


In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
#from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Obtain the SparkSession used by all DataFrame and ML cells below, requesting
# a local master that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
import os

# The upload cell saves flights.csv into the notebook's current working
# directory, which on a fresh runtime is not necessarily `sample_data/`.
# Look in both places and hand Spark an absolute path so the read does not
# depend on which directory the notebook (or the JVM) happens to be in.
flights_candidates = ("sample_data/flights.csv", "flights.csv")
flights_path = next(
    (os.path.abspath(p) for p in flights_candidates if os.path.isfile(p)),
    None,
)
if flights_path is None:
    raise FileNotFoundError(
        "flights.csv not found; looked for: " + ", ".join(flights_candidates)
    )

# inferSchema=True makes Spark derive column types from the data;
# header=True takes the column names from the first line.
csv = spark.read.csv(flights_path, inferSchema=True, header=True)
csv.show(10)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

In [None]:
# Binary target: 1 when ArrDelay exceeds 15, otherwise 0.
late_label = (col("ArrDelay") > 15).cast("Int").alias("label")

# Keep the predictor columns and replace ArrDelay with the derived label.
data = csv.select(
    "DayofMonth",
    "DayOfWeek",
    "Carrier",
    "OriginAirportID",
    "DestAirportID",
    "DepDelay",
    late_label,
)
data.show(10)

+----------+---------+-------+---------------+-------------+--------+-----+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|label|
+----------+---------+-------+---------------+-------------+--------+-----+
|        19|        5|     DL|          11433|        13303|      -3|    0|
|        19|        5|     DL|          14869|        12478|       0|    0|
|        19|        5|     DL|          14057|        14869|      -4|    0|
|        19|        5|     DL|          15016|        11433|      28|    1|
|        19|        5|     DL|          11193|        12892|      -6|    0|
|        19|        5|     DL|          10397|        15016|      -1|    0|
|        19|        5|     DL|          15016|        10397|       0|    0|
|        19|        5|     DL|          10397|        14869|      15|    1|
|        19|        5|     DL|          10397|        10423|      33|    1|
|        19|        5|     DL|          11278|        10397|     323|    1|
+----------+

In [None]:
# Fixed seed so the 70/30 split (and every metric computed from it) is the
# same on each Restart & Run All instead of changing from run to run.
SPLIT_SEED = 42
splits = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
train = splits[0]
# The held-out label is renamed so it cannot be mistaken for the `label`
# column the estimator trains on.
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 1890688  Testing Rows: 811530


In [None]:
# Categorical branch: index the carrier string, assemble it with the other
# categorical columns, then run the vector through VectorIndexer.
strIdx = StringIndexer(inputCol="Carrier", outputCol="CarrierIdx")
catVect = VectorAssembler(
    inputCols=["CarrierIdx", "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID"],
    outputCol="catFeatures",
)
catIdx = VectorIndexer(inputCol="catFeatures", outputCol="idxCatFeatures")

# Numeric branch: DepDelay is wrapped in a vector and min-max scaled.
numVect = VectorAssembler(inputCols=["DepDelay"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol="numFeatures", outputCol="normFeatures")

# Both branches are merged into the single `features` vector the model reads.
featVect = VectorAssembler(
    inputCols=[catIdx.getOutputCol(), minMax.getOutputCol()],
    outputCol="features",
)

# Final stage: logistic regression. (A DecisionTreeClassifier on the same
# label/features columns was previously tried here as an alternative.)
lr = LogisticRegression(
    labelCol="label",
    featuresCol="features",
    maxIter=10,
    regParam=0.3,
)

pipeline_stages = [strIdx, catVect, catIdx, numVect, minMax, featVect, lr]
pipeline = Pipeline(stages=pipeline_stages)



In [None]:
# Fit all pipeline stages on the training split. The variable name is
# misspelled ("pipline") but is kept because the scoring cell refers to it.
piplineModel = pipeline.fit(train)

In [None]:
# Score the held-out split with the fitted pipeline, then show the assembled
# feature vector next to the predicted and true labels.
prediction = piplineModel.transform(test)
predicted = prediction.select(["features", "prediction", "trueLabel"])
predicted.show(n=30, truncate=False)

+---------------------------------------------------+----------+---------+
|features                                           |prediction|trueLabel|
+---------------------------------------------------+----------+---------+
|[10.0,1.0,0.0,10397.0,12191.0,0.03115264797507788] |0.0       |0        |
|[10.0,1.0,0.0,10397.0,12264.0,0.0446521287642783]  |0.0       |0        |
|[10.0,1.0,0.0,10397.0,12264.0,0.08151609553478713] |0.0       |1        |
|[10.0,1.0,0.0,10397.0,13851.0,0.03374870197300104] |0.0       |0        |
|[10.0,1.0,0.0,10423.0,11433.0,0.030114226375908618]|0.0       |0        |
|[10.0,1.0,0.0,10423.0,13487.0,0.03167185877466251] |0.0       |0        |
|[10.0,1.0,0.0,10423.0,14869.0,0.027518172377985463]|0.0       |0        |
|[10.0,1.0,0.0,10529.0,11193.0,0.030114226375908618]|0.0       |0        |
|[10.0,1.0,0.0,10529.0,11193.0,0.037383177570093455]|0.0       |0        |
|[10.0,1.0,0.0,10529.0,13487.0,0.03063343717549325] |0.0       |1        |
|[10.0,1.0,0.0,10693.0,11

In [None]:
# Confusion-matrix counts from ONE grouped aggregation instead of four
# separate filtered count() jobs over the predictions.
outcome_counts = {
    (row["prediction"], row["trueLabel"]): row["count"]
    for row in predicted.groupBy("prediction", "trueLabel").count().collect()
}
# Missing combinations (e.g. the model never predicts 1) count as zero.
tp = float(outcome_counts.get((1.0, 1), 0))
fp = float(outcome_counts.get((1.0, 0), 0))
tn = float(outcome_counts.get((0.0, 0), 0))
fn = float(outcome_counts.get((0.0, 1), 0))

# Guard every ratio: with no predicted positives (or no actual positives) the
# denominator is 0 and the unguarded division raised ZeroDivisionError.
# An undefined ratio is reported as 0.0.
pr = tp / (tp + fp) if (tp + fp) else 0.0
re = tp / (tp + fn) if (tp + fn) else 0.0
f1 = 2 * pr * re / (re + pr) if (re + pr) else 0.0

metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", f1)],["metric", "value"])
metrics.show()

+---------+-------------------+
|   metric|              value|
+---------+-------------------+
|       TP|            18603.0|
|       FP|               89.0|
|       TN|           650072.0|
|       FN|           142766.0|
|Precision| 0.9952386047506955|
|   Recall|0.11528236526222509|
|       F1| 0.2066299753972265|
+---------+-------------------+



In [None]:
# Area under the ROC curve for the first model, computed from the
# rawPrediction column against the held-out trueLabel column.
evaluator = BinaryClassificationEvaluator(
    labelCol="trueLabel",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
aur = evaluator.evaluate(prediction)
print("AUR = ", aur)

AUR =  0.9130514669622778


In [None]:
# Inspect the raw scores and probabilities behind the first 20 predictions.
(
    prediction
    .select("rawPrediction", "probability", "prediction", "trueLabel")
    .show(20, truncate=False)
)

+----------------------------------------+----------------------------------------+----------+---------+
|rawPrediction                           |probability                             |prediction|trueLabel|
+----------------------------------------+----------------------------------------+----------+---------+
|[1.4781123770013846,-1.4781123770013846]|[0.814287297091043,0.18571270290895708] |0.0       |0        |
|[1.125136054455616,-1.125136054455616]  |[0.7549401585653928,0.24505984143460727]|0.0       |0        |
|[0.1536032679392647,-0.1536032679392647]|[0.5383254924094819,0.4616745075905181] |0.0       |1        |
|[1.4732804695583774,-1.4732804695583774]|[0.813555489255756,0.18644451074424392] |0.0       |0        |
|[1.4773925830810661,-1.4773925830810661]|[0.8141784227137258,0.18582157728627421]|0.0       |0        |
|[1.5150198701218598,-1.5150198701218598]|[0.8198039601428392,0.1801960398571608] |0.0       |0        |
|[1.6774255254556656,-1.6774255254556656]|[0.8425633272

In [None]:
# Hyper-parameter grid: 2 x 2 x 2 = 8 candidate settings for the logistic
# regression stage of the pipeline.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.3, 0.1])
    .addGrid(lr.maxIter, [10, 5])
    .addGrid(lr.threshold, [0.4, 0.3])
    .build()
)

# NOTE(review): the default BinaryClassificationEvaluator scores
# areaUnderROC from rawPrediction, which should not change with
# `lr.threshold`; the threshold values above are therefore not really being
# ranked by this evaluator — confirm and consider a threshold-sensitive metric.
cv = CrossValidator(
    estimator=pipeline,
    evaluator=BinaryClassificationEvaluator(),
    estimatorParamMaps=paramGrid,
    numFolds=2,
    # Fixed seed so the fold assignment, and hence the selected model, is
    # reproducible across runs.
    seed=42,
)

model = cv.fit(train)


In [None]:
# Score the held-out split with the cross-validated model.
newPrediction = model.transform(test)
# Select from newPrediction: the original selected from `prediction`, i.e. the
# FIRST model's output, so this cell displayed the old predictions again.
newPredicted = newPrediction.select("features", "prediction", "trueLabel")
newPredicted.show()

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|[10.0,1.0,0.0,103...|       0.0|        0|
|[10.0,1.0,0.0,103...|       0.0|        0|
|[10.0,1.0,0.0,103...|       0.0|        1|
|[10.0,1.0,0.0,103...|       0.0|        0|
|[10.0,1.0,0.0,104...|       0.0|        0|
|[10.0,1.0,0.0,104...|       0.0|        0|
|[10.0,1.0,0.0,104...|       0.0|        0|
|[10.0,1.0,0.0,105...|       0.0|        0|
|[10.0,1.0,0.0,105...|       0.0|        0|
|[10.0,1.0,0.0,105...|       0.0|        1|
|[10.0,1.0,0.0,106...|       0.0|        0|
|[10.0,1.0,0.0,106...|       0.0|        0|
|[10.0,1.0,0.0,106...|       0.0|        0|
|[10.0,1.0,0.0,107...|       0.0|        0|
|[10.0,1.0,0.0,107...|       0.0|        1|
|[10.0,1.0,0.0,107...|       0.0|        0|
|[10.0,1.0,0.0,107...|       1.0|        1|
|[10.0,1.0,0.0,108...|       0.0|        0|
|[10.0,1.0,0.0,108...|       0.0|        1|
|[10.0,1.0,0.0,108...|       0.0

In [None]:
# Confusion-matrix counts for the tuned model from ONE grouped aggregation
# instead of four separate filtered count() jobs over the predictions.
outcome_counts2 = {
    (row["prediction"], row["trueLabel"]): row["count"]
    for row in newPrediction.groupBy("prediction", "trueLabel").count().collect()
}
# Missing combinations (e.g. the model never predicts 1) count as zero.
tp2 = float(outcome_counts2.get((1.0, 1), 0))
fp2 = float(outcome_counts2.get((1.0, 0), 0))
tn2 = float(outcome_counts2.get((0.0, 0), 0))
fn2 = float(outcome_counts2.get((0.0, 1), 0))

# Guard every ratio: with no predicted positives (or no actual positives) the
# denominator is 0 and the unguarded division raised ZeroDivisionError.
# An undefined ratio is reported as 0.0.
pr2 = tp2 / (tp2 + fp2) if (tp2 + fp2) else 0.0
re2 = tp2 / (tp2 + fn2) if (tp2 + fn2) else 0.0
f1_2 = 2 * pr2 * re2 / (re2 + pr2) if (re2 + pr2) else 0.0

metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", f1_2)],["metric", "value"])
metrics2.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           57650.0|
|       FP|             201.0|
|       TN|          649960.0|
|       FN|          103719.0|
|Precision|0.9965255570344506|
|   Recall|0.3572557306545867|
|       F1|0.5259556609798376|
+---------+------------------+



In [None]:
# Area under ROC for the cross-validated model.
# NOTE(review): rawPredictionCol="prediction" scores the hard 0/1 predictions
# rather than the rawPrediction vector, so this figure is not directly
# comparable with the first model's AUR — kept as authored; confirm intent.
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
# Evaluate the NEW predictions with the NEW evaluator: the original called
# evaluator.evaluate(prediction), which merely recomputed the first model's AUR.
aur2 = evaluator2.evaluate(newPrediction)
print( "AUR2 = ", aur2)

AUR2 =  0.9130514669622887
