# Importing Spark and its machine learning packages

In [1]:
# Order matters in this cell: findspark.init() is called before `import pyspark`.
# NOTE(review): presumably init() locates the local Spark installation and makes
# the pyspark package importable — confirm against the findspark documentation.
import findspark
findspark.init()
import pyspark

In [2]:
# Import modules
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session used by every cell below.
# getOrCreate() hands back an already-running session when there is one, so
# re-running this cell does not start a second session.
# The placeholder setting "spark.some.config.option" = "some-value" (boilerplate
# from the Spark docs that configures nothing) has been removed.
spark = (
    SparkSession.builder
    .appName("Classification in Apache Spark")
    .getOrCreate()
)

# Loading flights data

In [3]:
# Flights schema described as (column name, Spark type) pairs
flightColumns = [
    ("DayofMonth", IntegerType()),
    ("DayOfWeek", IntegerType()),
    ("Carrier", StringType()),
    ("OriginAirportID", IntegerType()),
    ("DestAirportID", IntegerType()),
    ("DepDelay", IntegerType()),
    ("ArrDelay", IntegerType()),
]

# Every field is declared with nullable=False (third StructField argument).
# NOTE(review): a later cell still drops rows containing nulls, so this flag is
# presumably not enforced by the CSV reader — confirm with df.printSchema().
flightSchema = StructType(
    [StructField(name, dataType, False) for name, dataType in flightColumns]
)

# Load the CSV (its first line is a header) with the explicit schema and preview 3 rows
df = spark.read.csv('flights.csv', header=True, schema=flightSchema)
df.show(3)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 3 rows



# Handling missing data

In [4]:
# Discard every row that has a null in any of the columns listed below
requiredColumns = [
    "DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID",
    "DestAirportID", "ArrDelay", "DepDelay",
]
df_non_na = df.na.drop(how="any", subset=requiredColumns)

# Select the feature columns and derive a binary "Late" label: 1 when "ArrDelay" is greater than 15, otherwise 0

In [28]:
# Binary label: the boolean (ArrDelay > 15) cast to an integer and named "Late"
lateLabel = (col("ArrDelay") > 15).cast("Int").alias("Late")

# Keep the four feature columns plus the derived label, then preview the result
featureColumns = ["DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID"]
data = df_non_na.select(*featureColumns, lateLabel)
data.show()

+----------+---------+---------------+-------------+----+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|Late|
+----------+---------+---------------+-------------+----+
|        19|        5|          11433|        13303|   0|
|        19|        5|          14869|        12478|   0|
|        19|        5|          14057|        14869|   0|
|        19|        5|          15016|        11433|   1|
|        19|        5|          11193|        12892|   0|
|        19|        5|          10397|        15016|   0|
|        19|        5|          15016|        10397|   0|
|        19|        5|          10397|        14869|   1|
|        19|        5|          10397|        10423|   1|
|        19|        5|          11278|        10397|   1|
|        19|        5|          14107|        13487|   0|
|        19|        5|          11433|        11298|   1|
|        19|        5|          11298|        11433|   1|
|        19|        5|          11433|        12892|   0|
|        19|  

# Split the data into training and testing

In [None]:
# Divide data: 70% for training, 30% for testing.
# A fixed seed keeps the split identical across kernel restarts, so the row
# counts, the trained model and the reported accuracy are reproducible.
# Without a seed every run draws a different random split.
SPLIT_SEED = 42
dividedData = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
trainingData, testingData = dividedData  # index 0 = training set, index 1 = testing set

# Each count() is a Spark action that scans the corresponding split
train_rows = trainingData.count()
test_rows = testingData.count()
print("Training data rows:", train_rows, "; Testing data rows:", test_rows)

Training data rows: 1893012 ; Testing data rows: 809206


# Prepare the training data

In [11]:
# The assembler packs the four input columns into one vector column called "features"
featureCols = ["DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID"]
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

# Assemble the training rows; the "Late" column is exposed under the name "label"
assembledTraining = assembler.transform(trainingData)
trainingDataFinal = assembledTraining.select("features", col("Late").alias("label"))

trainingDataFinal.show(truncate=False)

+-------------------------+-----+
|features                 |label|
+-------------------------+-----+
|[1.0,1.0,10140.0,10397.0]|0    |
|[1.0,1.0,10140.0,11298.0]|0    |
|[1.0,1.0,10140.0,11298.0]|0    |
|[1.0,1.0,10140.0,11298.0]|0    |
|[1.0,1.0,10140.0,11298.0]|0    |
|[1.0,1.0,10140.0,11298.0]|0    |
|[1.0,1.0,10140.0,11298.0]|0    |
|[1.0,1.0,10140.0,12266.0]|0    |
|[1.0,1.0,10140.0,12266.0]|1    |
|[1.0,1.0,10140.0,13487.0]|0    |
|[1.0,1.0,10299.0,13487.0]|0    |
|[1.0,1.0,10299.0,13830.0]|1    |
|[1.0,1.0,10299.0,14057.0]|0    |
|[1.0,1.0,10299.0,14747.0]|0    |
|[1.0,1.0,10299.0,14747.0]|0    |
|[1.0,1.0,10299.0,14747.0]|0    |
|[1.0,1.0,10299.0,14747.0]|0    |
|[1.0,1.0,10299.0,14747.0]|0    |
|[1.0,1.0,10299.0,14747.0]|0    |
|[1.0,1.0,10299.0,14747.0]|0    |
+-------------------------+-----+
only showing top 20 rows



# Train the model

In [12]:
# Configure a multinomial Naive Bayes estimator that reads the "features" vector
# and the "label" column, with smoothing set to 1.0.
# NOTE(review): the feature vector contains raw airport ID numbers; a multinomial
# model presumably interprets feature values as magnitudes/counts, so encoding
# these IDs as categories may be worth evaluating — confirm against the
# NaiveBayes documentation.
classifier = NaiveBayes(
    modelType="multinomial",
    smoothing=1.0,
    featuresCol="features",
    labelCol="label",
)

# Fit the estimator on the assembled training set
model = classifier.fit(trainingDataFinal)
print("Model successfully trained!")

Model successfully trained!


# Prepare the testing data

In [22]:
# Reuse the assembler built for the training data so the test rows get the same
# "features" layout; the "Late" column is kept under the name "trueLabel"
assembledTesting = assembler.transform(testingData)
testingDataFinal = assembledTesting.select("features", col("Late").alias("trueLabel"))

testingDataFinal.show(truncate=False)

+-------------------------+---------+
|features                 |trueLabel|
+-------------------------+---------+
|[1.0,1.0,10140.0,10397.0]|0        |
|[1.0,1.0,10140.0,11292.0]|0        |
|[1.0,1.0,10140.0,11292.0]|0        |
|[1.0,1.0,10140.0,11298.0]|0        |
|[1.0,1.0,10140.0,11298.0]|1        |
|[1.0,1.0,10140.0,12266.0]|1        |
|[1.0,1.0,10299.0,12173.0]|0        |
|[1.0,1.0,10299.0,13487.0]|0        |
|[1.0,1.0,10299.0,13930.0]|0        |
|[1.0,1.0,10299.0,14057.0]|0        |
|[1.0,1.0,10299.0,14747.0]|0        |
|[1.0,1.0,10299.0,14747.0]|0        |
|[1.0,1.0,10299.0,14747.0]|0        |
|[1.0,1.0,10299.0,14747.0]|0        |
|[1.0,1.0,10299.0,14747.0]|0        |
|[1.0,1.0,10299.0,14869.0]|0        |
|[1.0,1.0,10397.0,10423.0]|0        |
|[1.0,1.0,10397.0,10529.0]|0        |
|[1.0,1.0,10397.0,10529.0]|0        |
|[1.0,1.0,10397.0,10529.0]|1        |
+-------------------------+---------+
only showing top 20 rows



# Predict testing data using the trained model

In [None]:
# Generate the model predictions for the testing data.
# model.transform() returns the input rows with extra columns appended; the
# output below shows rawPrediction, probability and prediction next to the
# original features and trueLabel columns.
prediction = model.transform(testingDataFinal)

# Display the scored rows without truncating the long vector columns
prediction.show(truncate=False)

+-------------------------+---------+-----------------------------------------+------------------------------------------+----------+
|features                 |trueLabel|rawPrediction                            |probability                               |prediction|
+-------------------------+---------+-----------------------------------------+------------------------------------------+----------+
|[1.0,1.0,10140.0,10397.0]|0        |[-14267.533819553835,-14268.395736949447]|[0.7030610975119547,0.2969389024880454]   |0.0       |
|[1.0,1.0,10140.0,11292.0]|0        |[-14889.066369033728,-14887.452500792004]|[0.16605225151295977,0.8339477484870403]  |1.0       |
|[1.0,1.0,10140.0,11292.0]|0        |[-14889.066369033728,-14887.452500792004]|[0.16605225151295977,0.8339477484870403]  |1.0       |
|[1.0,1.0,10140.0,11298.0]|0        |[-14893.23306768946,-14891.602602002122] |[0.16376657671365463,0.8362334232863454]  |1.0       |
|[1.0,1.0,10140.0,11298.0]|1        |[-14893.23306768946,-1489

In [19]:
# Keep just the columns needed for evaluation (rawPrediction is left out)
relevantColumns = ["features", "prediction", "probability", "trueLabel"]
predictionFinal = prediction.select(*relevantColumns)

predictionFinal.show(truncate=False)

+-------------------------+----------+------------------------------------------+---------+
|features                 |prediction|probability                               |trueLabel|
+-------------------------+----------+------------------------------------------+---------+
|[1.0,1.0,10140.0,10397.0]|0.0       |[0.7030610975119547,0.2969389024880454]   |0        |
|[1.0,1.0,10140.0,11292.0]|1.0       |[0.16605225151295977,0.8339477484870403]  |0        |
|[1.0,1.0,10140.0,11292.0]|1.0       |[0.16605225151295977,0.8339477484870403]  |0        |
|[1.0,1.0,10140.0,11298.0]|1.0       |[0.16376657671365463,0.8362334232863454]  |0        |
|[1.0,1.0,10140.0,11298.0]|1.0       |[0.16376657671365463,0.8362334232863454]  |1        |
|[1.0,1.0,10140.0,12266.0]|1.0       |[0.013279217135528374,0.9867207828644717] |1        |
|[1.0,1.0,10299.0,12173.0]|1.0       |[0.026385522003500486,0.9736144779964996] |0        |
|[1.0,1.0,10299.0,13487.0]|1.0       |[7.146229984800281E-4,0.99928537700152]   

# Calculate model performance and accuracy

In [20]:
# Compute the number of correct predictions and the total row count in a single
# aggregation, so the prediction lineage is evaluated once instead of once per
# count() action.
# `col`, `count` and `when` come from the notebook's
# `from pyspark.sql.functions import *` import.
# count(when(cond, 1)) counts only matching rows: `when` without `otherwise`
# yields NULL for non-matching rows, and count() skips NULLs.
isCorrect = col("prediction") == col("trueLabel")
counts = predictionFinal.agg(
    count(when(isCorrect, 1)).alias("correct"),
    count("*").alias("total"),
).first()

# Get the correct predictions
correctPrediction = counts["correct"]

# Get the total data count
totalData = counts["total"]

# An empty test set reports NaN instead of raising ZeroDivisionError
accuracy = correctPrediction / totalData if totalData else float("nan")

print("correct prediction:", correctPrediction, ", total data:", totalData, ", accuracy:", accuracy)

correct prediction: 452333 , total data: 809206 , accuracy: 0.5589837445594817
