In [4]:
!pip install opendatasets

Collecting opendatasets
  Downloading opendatasets-0.1.22-py3-none-any.whl (15 kB)
Installing collected packages: opendatasets
Successfully installed opendatasets-0.1.22


In [9]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=cdc523935349ca98e3d82ac8c84398d44968207470c9efc4c87f29cfe02a4525
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [5]:
import opendatasets as od
url = "https://www.kaggle.com/datasets/tylerx/flights-and-airports-data"
od.download(url)

Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username: shacodes
Your Kaggle Key: ··········
Dataset URL: https://www.kaggle.com/datasets/tylerx/flights-and-airports-data
Downloading flights-and-airports-data.zip to ./flights-and-airports-data


100%|██████████| 27.1M/27.1M [00:00<00:00, 62.1MB/s]





In [16]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [17]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [18]:
data_path = "/content/flights-and-airports-data/flights.csv"
spark_df=spark.read.csv(data_path,inferSchema=True,header=True)

In [19]:
spark_df.show(2)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 2 rows



In [50]:
data = spark_df.select("DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID", "DestAirportID", "DepDelay", ((col("ArrDelay") > 45).cast("Int").alias("label")))
data.show(10)

+----------+---------+-------+---------------+-------------+--------+-----+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|label|
+----------+---------+-------+---------------+-------------+--------+-----+
|        19|        5|     DL|          11433|        13303|      -3|    0|
|        19|        5|     DL|          14869|        12478|       0|    0|
|        19|        5|     DL|          14057|        14869|      -4|    0|
|        19|        5|     DL|          15016|        11433|      28|    0|
|        19|        5|     DL|          11193|        12892|      -6|    0|
|        19|        5|     DL|          10397|        15016|      -1|    0|
|        19|        5|     DL|          15016|        10397|       0|    0|
|        19|        5|     DL|          10397|        14869|      15|    0|
|        19|        5|     DL|          10397|        10423|      33|    0|
|        19|        5|     DL|          11278|        10397|     323|    1|
+----------+

In [51]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_data = train.count()
test_data = test.count()
print("Training Rows:", train_data, " Testing Rows:", test_data)

Training Rows: 1892385  Testing Rows: 809833


In [52]:
carrierIndexer = StringIndexer(inputCol="Carrier", outputCol="CarrierIndex")
featureAssembler = VectorAssembler(inputCols=["CarrierIndex", "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID"],
                                   outputCol="catFeatures")
featureIndexer = VectorIndexer(inputCol=featureAssembler.getOutputCol(), outputCol="indexedCatFeatures")
numericAssembler = VectorAssembler(inputCols=["DepDelay"], outputCol="numFeatures")
minMaxScaler = MinMaxScaler(inputCol=numericAssembler.getOutputCol(), outputCol="scaledNumFeatures")
finalAssembler = VectorAssembler(inputCols=["indexedCatFeatures", "scaledNumFeatures"], outputCol="features")

logisticRegression = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3)

pipeline = Pipeline(stages=[carrierIndexer, featureAssembler, featureIndexer, numericAssembler, minMaxScaler, finalAssembler, logisticRegression])


In [53]:
piplineModel = pipeline.fit(train)

In [54]:
prediction = piplineModel.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(50, truncate=False)

+---------------------------------------------------+----------+---------+
|features                                           |prediction|trueLabel|
+---------------------------------------------------+----------+---------+
|[10.0,1.0,0.0,10397.0,12264.0,0.03115264797507788] |0.0       |0        |
|[10.0,1.0,0.0,10397.0,13851.0,0.03167185877466251] |0.0       |0        |
|[10.0,1.0,0.0,10397.0,13851.0,0.03374870197300104] |0.0       |0        |
|[10.0,1.0,0.0,10423.0,11433.0,0.030114226375908618]|0.0       |0        |
|[10.0,1.0,0.0,10423.0,11433.0,0.03115264797507788] |0.0       |0        |
|[10.0,1.0,0.0,10423.0,13244.0,0.029595015576323987]|0.0       |0        |
|[10.0,1.0,0.0,10423.0,13487.0,0.027518172377985463]|0.0       |0        |
|[10.0,1.0,0.0,10423.0,13487.0,0.030633437175493248]|0.0       |0        |
|[10.0,1.0,0.0,10423.0,14869.0,0.027518172377985463]|0.0       |0        |
|[10.0,1.0,0.0,10423.0,14869.0,0.029595015576323987]|0.0       |0        |
|[10.0,1.0,0.0,10529.0,11

In [55]:
truePositive = float(predicted.filter("prediction == 1.0 AND truelabel == 1").count())
falsePositive = float(predicted.filter("prediction == 1.0 AND truelabel == 0").count())
trueNegative = float(predicted.filter("prediction == 0.0 AND truelabel == 0").count())
falseNegative = float(predicted.filter("prediction == 0.0 AND truelabel == 1").count())
precision = truePositive / (truePositive + falsePositive)
recall = truePositive / (truePositive + falseNegative)
metrics = spark.createDataFrame([
 ("TP", truePositive),
 ("FP", falsePositive),
 ("TN", trueNegative),
 ("FN", falseNegative),
 ("Precision", precision),
 ("Recall", recall),
("F1", 2*precision*recall/(recall+precision))],["metric", "value"])
metrics.show()

+---------+-------------------+
|   metric|              value|
+---------+-------------------+
|       TP|             4718.0|
|       FP|               32.0|
|       TN|           741302.0|
|       FN|            63781.0|
|Precision| 0.9932631578947368|
|   Recall|0.06887691791121038|
|       F1|0.12882087127469316|
+---------+-------------------+



In [56]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print ("AUR = ", aur)

AUR =  0.9750475415879148


In [57]:
paramGrid = ParamGridBuilder() \
    .addGrid(logisticRegression.regParam, [0.7, 0.1, 0.5]) \
    .addGrid(logisticRegression.maxIter, [10, 5, 15]) \
    .addGrid(logisticRegression.threshold, [0.4, 0.5, 0.6]) \
    .build()

cv = CrossValidator(estimator=pipeline,
                    evaluator=BinaryClassificationEvaluator(),
                    estimatorParamMaps=paramGrid,
                    numFolds=2)

model = cv.fit(train)


In [58]:
newPrediction = model.transform(test)
newPredicted = prediction.select("features", "prediction", "trueLabel")
newPredicted.show()

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|[10.0,1.0,0.0,103...|       0.0|        0|
|[10.0,1.0,0.0,103...|       0.0|        0|
|[10.0,1.0,0.0,103...|       0.0|        0|
|[10.0,1.0,0.0,104...|       0.0|        0|
|[10.0,1.0,0.0,104...|       0.0|        0|
|[10.0,1.0,0.0,104...|       0.0|        0|
|[10.0,1.0,0.0,104...|       0.0|        0|
|[10.0,1.0,0.0,104...|       0.0|        0|
|[10.0,1.0,0.0,104...|       0.0|        0|
|[10.0,1.0,0.0,104...|       0.0|        0|
|[10.0,1.0,0.0,105...|       0.0|        0|
|[10.0,1.0,0.0,105...|       0.0|        0|
|[10.0,1.0,0.0,105...|       0.0|        0|
|[10.0,1.0,0.0,105...|       0.0|        0|
|[10.0,1.0,0.0,106...|       0.0|        0|
|[10.0,1.0,0.0,106...|       0.0|        1|
|[10.0,1.0,0.0,107...|       0.0|        0|
|[10.0,1.0,0.0,107...|       0.0|        0|
|[10.0,1.0,0.0,107...|       0.0|        0|
|[10.0,1.0,0.0,107...|       0.0

In [59]:
truePositive2 = float(predicted.filter("prediction == 1.0 AND truelabel == 1").count())
falsePositive2 = float(predicted.filter("prediction == 1.0 AND truelabel == 0").count())
trueNegative2 = float(predicted.filter("prediction == 0.0 AND truelabel == 0").count())
falseNegative2 = float(predicted.filter("prediction == 0.0 AND truelabel == 1").count())
precision2 = truePositive2 / (truePositive2 + falsePositive2)
recall2 = truePositive2 / (truePositive2 + falseNegative2)
metrics = spark.createDataFrame([
 ("TP", truePositive),
 ("FP", falsePositive),
 ("TN", trueNegative),
 ("FN", falseNegative),
 ("Precision", precision),
 ("Recall", recall),
("F1", 2*precision2*recall2/(recall2+precision2))],["metric", "value"])
metrics.show()

+---------+-------------------+
|   metric|              value|
+---------+-------------------+
|       TP|             4718.0|
|       FP|               32.0|
|       TN|           741302.0|
|       FN|            63781.0|
|Precision| 0.9932631578947368|
|   Recall|0.06887691791121038|
|       F1|0.12882087127469316|
+---------+-------------------+



In [60]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(newPrediction)
print ("AUR = ", aur)

AUR =  0.9750696069075243
