# Import Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import round,col,desc,count
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Session Creation
<br>
https://www.kaggle.com/code/rashid60/ml-with-pyspark-predicting-flight-delays/notebook


In [2]:
spark = SparkSession.builder.master("local[*]").config("spark.executor.memory", "70g")\
     .config("spark.driver.memory", "50g").config("spark.memory.offHeap.enabled",True)\
     .config("spark.memory.offHeap.size","16g").appName("Flight_delay").getOrCreate()


# spark = SparkSession.builder.appName('Flight_delay').getOrCreate()

22/12/04 13:21:12 WARN Utils: Your hostname, Jaminurs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.4 instead (on interface en0)
22/12/04 13:21:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/04 13:21:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/04 13:21:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Read Data

In [3]:
Flight_Spark_frame  = spark.read.option('recursiveFileLookup','True').option("header", "true")\
                        .csv("dataverse_files/*")
# Flight_Spark_frame.printSchema()

In [4]:
print(Flight_Spark_frame.dtypes)

[('Year', 'string'), ('Month', 'string'), ('DayofMonth', 'string'), ('DayOfWeek', 'string'), ('DepTime', 'string'), ('CRSDepTime', 'string'), ('ArrTime', 'string'), ('CRSArrTime', 'string'), ('UniqueCarrier', 'string'), ('FlightNum', 'string'), ('TailNum', 'string'), ('ActualElapsedTime', 'string'), ('CRSElapsedTime', 'string'), ('AirTime', 'string'), ('ArrDelay', 'string'), ('DepDelay', 'string'), ('Origin', 'string'), ('Dest', 'string'), ('Distance', 'string'), ('TaxiIn', 'string'), ('TaxiOut', 'string'), ('Cancelled', 'string'), ('CancellationCode', 'string'), ('Diverted', 'string'), ('CarrierDelay', 'string'), ('WeatherDelay', 'string'), ('NASDelay', 'string'), ('SecurityDelay', 'string'), ('LateAircraftDelay', 'string')]


In [5]:
carrier_Spark_frame  = spark.read.option("header", "true").csv("carriers.csv")
plane_Spark_frame  = spark.read.option("header", "true").csv("plane-data.csv")
airport_Spark_frame  = spark.read.option("header", "true").csv("airports.csv")

In [6]:
print(carrier_Spark_frame.dtypes)
plane_Spark_frame = plane_Spark_frame.withColumnRenamed("year","Pyear")
print(plane_Spark_frame.dtypes)
print(airport_Spark_frame.dtypes)

[('Code', 'string'), ('Description', 'string')]
[('tailnum', 'string'), ('type', 'string'), ('manufacturer', 'string'), ('issue_date', 'string'), ('model', 'string'), ('status', 'string'), ('aircraft_type', 'string'), ('engine_type', 'string'), ('Pyear', 'string')]
[('iata', 'string'), ('airport', 'string'), ('city', 'string'), ('state', 'string'), ('country', 'string'), ('lat', 'string'), ('long', 'string')]


In [7]:
plane_Spark_frame

DataFrame[tailnum: string, type: string, manufacturer: string, issue_date: string, model: string, status: string, aircraft_type: string, engine_type: string, Pyear: string]

# String Transfromation to Integer For Prediction

In [8]:
Flight_Spark_frame = Flight_Spark_frame.withColumn("WeatherDelay",col("WeatherDelay").cast(IntegerType())) \
    .withColumn("NASDelay",col("NASDelay").cast(IntegerType())) \
    .withColumn("SecurityDelay",col("SecurityDelay").cast(IntegerType()))\
    .withColumn("LateAircraftDelay",col("LateAircraftDelay").cast(IntegerType()))\
    .withColumn("CarrierDelay",col("CarrierDelay").cast(IntegerType()))\
    .withColumn("ArrDelay",col("ArrDelay").cast(IntegerType()))\
    .withColumn("DepDelay",col("DepDelay").cast(IntegerType()))\
    .withColumn("Month",col("Month").cast(IntegerType()))\
    .withColumn("Year",col("Year").cast(IntegerType()))\
    .withColumn("DayOfWeek",col("DayOfWeek").cast(IntegerType()))\
    .withColumn("DayofMonth",col("DayofMonth").cast(IntegerType()))

# MERGE flight data with plane and airport 

In [9]:
plane_flight_data = Flight_Spark_frame.join(plane_Spark_frame,"TailNum","inner")

In [10]:
# Departure Merge takes origin information 
plane_flight_dep_airport_data = plane_flight_data.withColumn("Origin", col("Origin")).\
                                join(airport_Spark_frame.withColumn("Origin", col("iata")), on="Origin")
# Arrival Merge takes destination information 
plane_flight_arr_airport_data = plane_flight_data.withColumn("Dest", col("Dest")).\
                                join(airport_Spark_frame.withColumn("Dest", col("iata")), on="Dest")

# Label Creation for Prediction Model Learning

In [11]:
dep_delay_Lab_f = plane_flight_dep_airport_data.withColumn('label', (plane_flight_dep_airport_data.DepDelay >= 15).cast('integer'))

# Check first five records
# dep_delay_Lab_f.select(col("label")).where(dep_delay_Lab_f.label == 1).show(5)

# dep_delay_Lab_f.show(5)

In [1]:
# dep_delay_Lab_f.columns

# Some String col to Index creation for the model
<br>
String column that are string neumeric values that does not require the transformation but 
<br>Values like origin string "ORD" requires to transform to some equivalent indexer by mapping the following code is doing this

In [13]:
flights_indexed = StringIndexer(inputCol='UniqueCarrier', outputCol='UniqueCarrier_idx').fit(dep_delay_Lab_f).transform(dep_delay_Lab_f)

# Repeat the process for org column
flights_indexed = StringIndexer(inputCol='Origin', outputCol='Origin_idx').fit(flights_indexed).transform(flights_indexed)
# flights_indexed.show(5)

                                                                                

In [14]:
#the following line transforming distance to integer as it will be used for ML model
flights_indexed = flights_indexed.withColumn("Distance",col("Distance").cast(IntegerType()))

# Feature Vector Creation for Departure Delay Prediction Model Learning

In [15]:
# Create an assembler object
assembler = VectorAssembler(inputCols=['Month', 'DayofMonth', 'DayOfWeek',
'UniqueCarrier_idx', 'Distance', 'DepDelay'], outputCol='features')
# Consolidate predictor columns
flights_assembled = assembler.setHandleInvalid("skip").transform(flights_indexed)
# Check the resulting column
flights_assembled.select('features', 'DepDelay').show(5, truncate=False)

+----------------------------+--------+
|features                    |DepDelay|
+----------------------------+--------+
|[1.0,1.0,1.0,6.0,479.0,10.0]|10      |
|[1.0,1.0,1.0,6.0,647.0,9.0] |9       |
|[1.0,1.0,1.0,6.0,647.0,3.0] |3       |
|[1.0,1.0,1.0,6.0,480.0,0.0] |0       |
|[1.0,1.0,1.0,6.0,480.0,2.0] |2       |
+----------------------------+--------+
only showing top 5 rows



In [21]:
flights_assembled['features']

Column<'features'>

# Train Test Split

In [16]:
flights_train, flights_test = flights_assembled.randomSplit([0.8, 0.2], seed=42)
# Check that training set has around 80% of records
training_ratio = flights_train.count() / flights_assembled.count()
print(training_ratio)

22/12/04 13:24:34 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.




0.7999771516368166


                                                                                

# Logistic Regression Model 
<br>
Takes huge time to train. Timer can be added to get the time to train

In [20]:
# Create a classifier object and train on training data
logistic = LogisticRegression().fit(flights_train)
# Create predictions for the testing data and show confusion matrix
prediction = logistic.transform(flights_test)
prediction.groupBy('label', 'prediction').count().show()

[Stage 76:>                                                        (0 + 8) / 17]

22/11/27 21:37:55 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/11/27 21:37:55 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


[Stage 150:>                                                       (0 + 8) / 17]

22/11/28 04:54:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/11/28 04:54:34 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/11/28 04:54:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.


[Stage 150:>                                                       (0 + 8) / 17]

22/11/28 04:54:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/11/28 04:55:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.


[Stage 150:===>                                                    (1 + 8) / 17]

22/11/28 04:55:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.


[Stage 150:===>                                                    (1 + 8) / 17]

22/11/28 04:56:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.




22/11/28 04:56:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.




22/11/28 04:57:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/11/28 04:57:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.




+-----+----------+-------+
|label|prediction|  count|
+-----+----------+-------+
|    0|       0.0|7131354|
|    1|       1.0|1544219|
+-----+----------+-------+



                                                                                

# Precision and Recall based Performance

In [21]:
# Calculate precision and recall
precision = TP / (TP + FP)
recall = TP / (TP + FN)
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision, recall))

# Find weighted precision
multi_evaluator = MulticlassClassificationEvaluator()
weighted_precision = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "weightedPrecision"})

# Find AUC
binary_evaluator = BinaryClassificationEvaluator()
auc = binary_evaluator.evaluate(prediction, {binary_evaluator.metricName: "areaUnderROC"})

precision = 1.00
recall    = 1.00


                                                                                

In [2]:
# !pip install dtreeviz
# from dtreeviz.models.spark_decision_tree import ShadowSparkTree
# from dtreeviz import trees
# features = ['Month', 'DayofMonth', 'DayOfWeek','UniqueCarrier_idx', 'Distance', 'DepDelay']
# target = "DepDelay"
# spark_dtree = ShadowSparkTree(tree_model, flights_assembled[features], flights_assembled[target], feature_names= features, target_name=target, class_names=[0, 1])
# trees.dtreeviz(spark_dtree, fancy=True)