In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import when
from pyspark.ml.feature import VectorAssembler, StandardScaler

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
# load data
df_training = spark.read.option("inferSchema", "true").csv("./PlaneDelayTraining.csv", header=True)
df_test = spark.read.option("inferSchema", "true").csv("./PlaneDelayTesting.csv", header=True)

# df_training.show()

In [4]:
#select features
# df_training = df_training.select("Airline", "DayOfWeek", "IsDelayed", "TicketPrice")
# df_test = df_test.select("Airline", "DayOfWeek", "IsDelayed", "TicketPrice")

# df_training = df_training.select("FlightID", "TimeDeparture", "IsDelayed")
# df_test = df_test.select("FlightID", "TimeDeparture", "IsDelayed")

df_training = df_training.select("TimeDeparture", "LengthOfFlight", "DayOfWeek", "IsDelayed")
df_test = df_test.select("TimeDeparture", "LengthOfFlight", "DayOfWeek", "IsDelayed")

df_test.show()

+-------------+--------------+---------+---------+
|TimeDeparture|LengthOfFlight|DayOfWeek|IsDelayed|
+-------------+--------------+---------+---------+
|          650|           313|        1|        0|
|          515|           285|        3|        0|
|          515|           285|        4|        0|
|         1184|           161|        3|        0|
|         1184|           161|        7|        0|
|          775|           159|        3|        0|
|          775|           159|        4|        0|
|          700|           159|        5|        1|
|         1230|            91|        4|        1|
|         1135|            96|        7|        1|
|         1135|            98|        7|        1|
|          965|            88|        6|        1|
|          965|            88|        5|        1|
|          697|           193|        5|        0|
|          697|           193|        4|        0|
|          697|           193|        3|        0|
|          870|            95| 

In [5]:
# data preprocessing
df_training = df_training.na.drop()
df_test = df_test.na.drop()

df_test.show()

+-------------+--------------+---------+---------+
|TimeDeparture|LengthOfFlight|DayOfWeek|IsDelayed|
+-------------+--------------+---------+---------+
|          650|           313|        1|        0|
|          515|           285|        3|        0|
|          515|           285|        4|        0|
|         1184|           161|        3|        0|
|         1184|           161|        7|        0|
|          775|           159|        3|        0|
|          775|           159|        4|        0|
|          700|           159|        5|        1|
|         1230|            91|        4|        1|
|         1135|            96|        7|        1|
|         1135|            98|        7|        1|
|          965|            88|        6|        1|
|          965|            88|        5|        1|
|          697|           193|        5|        0|
|          697|           193|        4|        0|
|          697|           193|        3|        0|
|          870|            95| 

In [6]:
# transform
# def transform(df):
#     df = df.withColumn("Airline", when(df["Airline"] == "DL", 1)\
#                       .when(df["Airline"] == "FL", 2)\
#                       .when(df["Airline"] == "EV", 3))
#     return df

# df_training = transform(df_training)
# df_test = transform(df_test)
# df_training.show()

In [7]:
# normalization

cols = df_training.columns
cols.remove("IsDelayed")

assembler = VectorAssembler(inputCols=cols, outputCol="Features")
scaler = StandardScaler(inputCol="Features", outputCol="ScaledFeatures")

df_training = assembler.transform(df_training)
df_training = scaler.fit(df_training).transform(df_training)

df_test = assembler.transform(df_test)
df_test = scaler.fit(df_test).transform(df_test)

df_test.show()

+-------------+--------------+---------+---------+------------------+--------------------+
|TimeDeparture|LengthOfFlight|DayOfWeek|IsDelayed|          Features|      ScaledFeatures|
+-------------+--------------+---------+---------+------------------+--------------------+
|          650|           313|        1|        0| [650.0,313.0,1.0]|[2.32676835297385...|
|          515|           285|        3|        0| [515.0,285.0,3.0]|[1.84351646427928...|
|          515|           285|        4|        0| [515.0,285.0,4.0]|[1.84351646427928...|
|         1184|           161|        3|        0|[1184.0,161.0,3.0]|[4.23829804603237...|
|         1184|           161|        7|        0|[1184.0,161.0,7.0]|[4.23829804603237...|
|          775|           159|        3|        0| [775.0,159.0,3.0]|[2.77422380546882...|
|          775|           159|        4|        0| [775.0,159.0,4.0]|[2.77422380546882...|
|          700|           159|        5|        1| [700.0,159.0,5.0]|[2.50575053397184...|

In [8]:
# generate model
model = LogisticRegression(featuresCol="ScaledFeatures", labelCol="IsDelayed", maxIter=1000).fit(df_training)

prediction = model.transform(df_test)
# prediction.select("Airline", "DayOfWeek", "IsDelayed", "TicketPrice").show(10)
# prediction.select("FlightID", "TimeDeparture","IsDelayed").show(10)


In [9]:
# model testing & evaluation
evaluator = BinaryClassificationEvaluator(labelCol="IsDelayed")
accuracy = round(evaluator.evaluate(prediction) * 100, 2)

print(f"{accuracy}%")

81.01%
