## Create the Spark Session

In [27]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType

In [28]:
spark = (
    SparkSession.builder
    .appName("Flights Data Pipeline")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.port", "4040")
    .getOrCreate()
)
# setLogLevel() returns None, so keep the SparkContext first and set the level on it
sc = spark.sparkContext
sc.setLogLevel("WARN")

## Read CSV and Create `flights` Temporary View

In [29]:
flights = spark.read.csv("flights.csv", header=True, inferSchema=True)
flights.createOrReplaceTempView("flights")

## Data Exploration Using SQL Queries

In [30]:
spark.sql("SELECT * FROM flights LIMIT 20").show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1|  1|       1|       96|     235|       70|     AS| N508AS|   145|   PDX| ANC|     194|    1542|   0|     1|
|2014|    1|  1|       4|       -6|     738|      -23|     US| N195UW|  1830|   SEA| CLT|     252|    2279|   0|     4|
|2014|    1|  1|       8|       13|     548|       -4|     UA| N37422|  1609|   PDX| IAH|     201|    1825|   0|     8|
|2014|    1|  1|      28|       -2|     800|      -23|     US| N547UW|   466|   PDX| CLT|     251|    2282|   0|    28|
|2014|    1|  1|      34|       44|     325|       43|     AS| N762AS|   121|   SEA| ANC|     201|    1448|   0|    34|
|2014|    1|  1|      37|       82|     

In [31]:
spark.sql("DESCRIBE flights").show()

+---------+---------+-------+
| col_name|data_type|comment|
+---------+---------+-------+
|     year|      int|   NULL|
|    month|      int|   NULL|
|      day|      int|   NULL|
| dep_time|   string|   NULL|
|dep_delay|   string|   NULL|
| arr_time|   string|   NULL|
|arr_delay|   string|   NULL|
|  carrier|   string|   NULL|
|  tailnum|   string|   NULL|
|   flight|      int|   NULL|
|   origin|   string|   NULL|
|     dest|   string|   NULL|
| air_time|   string|   NULL|
| distance|      int|   NULL|
|     hour|   string|   NULL|
|   minute|   string|   NULL|
+---------+---------+-------+



In [32]:
spark.sql("SELECT * FROM flights WHERE dep_time == 'NA'").show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1|  1|      NA|       NA|      NA|       NA|     AS| N763AS|    61|   SEA| JNU|      NA|     909|  NA|    NA|
|2014|    1|  1|      NA|       NA|      NA|       NA|     OO| N237SW|  5428|   PDX| SEA|      NA|     129|  NA|    NA|
|2014|    1|  1|      NA|       NA|      NA|       NA|     OO| N237SW|  5325|   SEA| PDX|      NA|     129|  NA|    NA|
|2014|    1|  1|      NA|       NA|      NA|       NA|     OO| N585SW|  5353|   PDX| EUG|      NA|     106|  NA|    NA|
|2014|    1|  1|      NA|       NA|      NA|       NA|     OO| N295SW|  5438|   SEA| PDX|      NA|     129|  NA|    NA|
|2014|    1|  1|      NA|       NA|     

In [33]:
spark.sql("SELECT COUNT(dep_time) AS empty_dep_time FROM flights WHERE dep_time = 'NA'").show()

+--------------+
|empty_dep_time|
+--------------+
|           857|
+--------------+



## A Bit of Cleaning

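In the cells below, I keep only the rows that **do not** contain the `"NA"` placeholder in any of the columns listed in `columns_to_clean`. I then cast those columns from `string` to `int` so the data can be used to train a model later; `arr_delay` is re-cast to `double`, which suits the `Binarizer` that consumes it further down.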

In [34]:
columns_to_clean = ["dep_time", "dep_delay", "arr_time", "arr_delay", "air_time", "hour", "minute"]

for col in columns_to_clean:
    flights = flights.where(F.col(col) != "NA")

In [35]:
# Column types before casting
flights.dtypes

[('year', 'int'),
 ('month', 'int'),
 ('day', 'int'),
 ('dep_time', 'string'),
 ('dep_delay', 'string'),
 ('arr_time', 'string'),
 ('arr_delay', 'string'),
 ('carrier', 'string'),
 ('tailnum', 'string'),
 ('flight', 'int'),
 ('origin', 'string'),
 ('dest', 'string'),
 ('air_time', 'string'),
 ('distance', 'int'),
 ('hour', 'string'),
 ('minute', 'string')]

In [36]:
casted_flights = flights

for c in columns_to_clean:
    casted_flights = casted_flights.withColumn(c, F.col(c).cast(IntegerType()))
    
casted_flights = casted_flights.withColumn("arr_delay", F.col("arr_delay").cast(DoubleType()))
casted_flights.dtypes

[('year', 'int'),
 ('month', 'int'),
 ('day', 'int'),
 ('dep_time', 'int'),
 ('dep_delay', 'int'),
 ('arr_time', 'int'),
 ('arr_delay', 'double'),
 ('carrier', 'string'),
 ('tailnum', 'string'),
 ('flight', 'int'),
 ('origin', 'string'),
 ('dest', 'string'),
 ('air_time', 'int'),
 ('distance', 'int'),
 ('hour', 'int'),
 ('minute', 'int')]
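The filter-then-cast logic above can be mirrored in plain Python to make the row-level behavior explicit. This is only a conceptual sketch, not Spark code: `clean_rows` is a hypothetical helper, and the two sample rows are a subset of columns copied from the outputs above.

```python
# Conceptual sketch (plain Python, not Spark): drop any row where one of the
# cleaned columns holds the "NA" sentinel, then cast those columns to int,
# with arr_delay finally converted to float (mirroring the DoubleType cast).
COLUMNS_TO_CLEAN = ["dep_time", "dep_delay", "arr_time", "arr_delay",
                    "air_time", "hour", "minute"]


def clean_rows(rows):
    """Return only complete rows, with the numeric columns converted."""
    cleaned = []
    for row in rows:
        # Equivalent of chaining .where(F.col(c) != "NA") for every column.
        if any(row[c] == "NA" for c in COLUMNS_TO_CLEAN):
            continue
        new_row = dict(row)
        for c in COLUMNS_TO_CLEAN:
            new_row[c] = int(new_row[c])
        # arr_delay ends up as a floating-point value, like DoubleType.
        new_row["arr_delay"] = float(new_row["arr_delay"])
        cleaned.append(new_row)
    return cleaned


# Two rows taken from the outputs above (subset of columns only).
sample = [
    {"dep_time": "1", "dep_delay": "96", "arr_time": "235", "arr_delay": "70",
     "air_time": "194", "hour": "0", "minute": "1", "dest": "ANC"},
    {"dep_time": "NA", "dep_delay": "NA", "arr_time": "NA", "arr_delay": "NA",
     "air_time": "NA", "hour": "NA", "minute": "NA", "dest": "JNU"},
]
result = clean_rows(sample)
print(result)
```

Only the ANC row survives, with `dep_time` as an `int` and `arr_delay` as a `float`.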

## A Bit More of Data Exploration

In [37]:
spark.sql("SELECT origin, COUNT(origin) FROM flights GROUP BY origin").show()

+------+-------------+
|origin|count(origin)|
+------+-------------+
|   SEA|       108714|
|   PDX|        53335|
+------+-------------+



In [38]:
spark.sql("SELECT DISTINCT origin from flights").show()

+------+
|origin|
+------+
|   SEA|
|   PDX|
+------+



In [39]:
casted_flights.createOrReplaceTempView("casted_flights")
spark.sql("DESCRIBE casted_flights").show()

+---------+---------+-------+
| col_name|data_type|comment|
+---------+---------+-------+
|     year|      int|   NULL|
|    month|      int|   NULL|
|      day|      int|   NULL|
| dep_time|      int|   NULL|
|dep_delay|      int|   NULL|
| arr_time|      int|   NULL|
|arr_delay|   double|   NULL|
|  carrier|   string|   NULL|
|  tailnum|   string|   NULL|
|   flight|      int|   NULL|
|   origin|   string|   NULL|
|     dest|   string|   NULL|
| air_time|      int|   NULL|
| distance|      int|   NULL|
|     hour|      int|   NULL|
|   minute|      int|   NULL|
+---------+---------+-------+



In [40]:
delayed_flights = spark.sql("SELECT * FROM casted_flights WHERE arr_delay > 0")
delayed_flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1|  1|       1|       96|     235|     70.0|     AS| N508AS|   145|   PDX| ANC|     194|    1542|   0|     1|
|2014|    1|  1|      34|       44|     325|     43.0|     AS| N762AS|   121|   SEA| ANC|     201|    1448|   0|    34|
|2014|    1|  1|      37|       82|     747|     88.0|     DL| N806DN|  1823|   SEA| DTW|     224|    1927|   0|    37|
|2014|    1|  1|     346|      227|     936|    219.0|     UA| N14219|  1481|   SEA| ORD|     202|    1721|   3|    46|
|2014|    1|  1|     526|       -4|    1148|     15.0|     UA| N813UA|   229|   PDX| IAH|     217|    1825|   5|    26|
|2014|    1|  1|     527|        7|     

In [41]:
delayed_flights.createOrReplaceTempView("delayed_flights")
# .show() returns None, so keep the DataFrame and display it separately
avg_delay = spark.sql("SELECT dest, AVG(arr_delay) AS avg_delay FROM delayed_flights GROUP BY dest ORDER BY avg_delay DESC")
avg_delay.show()

+----+------------------+
|dest|         avg_delay|
+----+------------------+
| BOI|             64.75|
| HDN|              46.8|
| SFO|41.193768844221104|
| CLE| 35.74193548387097|
| SBA|35.391752577319586|
| COS| 35.05607476635514|
| BWI|34.585798816568044|
| EWR| 33.52972258916777|
| DFW| 33.27519181585678|
| MIA| 32.66187050359712|
| ORD| 32.47909024211299|
| BNA| 31.94871794871795|
| JFK|31.255884586180713|
| JAC|             30.25|
| PHL|29.245989304812834|
| OGG|27.511111111111113|
| IAD|27.430875576036865|
| HOU| 27.33009708737864|
| LGB| 27.07634730538922|
| FAT|26.852589641434264|
+----+------------------+
only showing top 20 rows
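The `GROUP BY dest ... ORDER BY avg_delay DESC` query boils down to a per-key sum and count followed by a descending sort. The sketch below mirrors that on just the five delayed flights visible in the `In [40]` output, so its numbers are illustrative and will not match the full table above; `avg_delay_by_dest` is a hypothetical helper.

```python
from collections import defaultdict

# The five delayed flights visible in the In [40] output: (dest, arr_delay).
delayed = [("ANC", 70.0), ("ANC", 43.0), ("DTW", 88.0), ("ORD", 219.0), ("IAH", 15.0)]


def avg_delay_by_dest(rows):
    """Average arr_delay per destination, sorted from highest to lowest."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for dest, delay in rows:
        totals[dest] += delay
        counts[dest] += 1
    averages = {dest: totals[dest] / counts[dest] for dest in totals}
    # Equivalent of ORDER BY avg_delay DESC.
    return sorted(averages.items(), key=lambda kv: kv[1], reverse=True)


ranking = avg_delay_by_dest(delayed)
print(ranking)  # [('ORD', 219.0), ('DTW', 88.0), ('ANC', 56.5), ('IAH', 15.0)]
```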



## Preparing the Data for the Model

In [42]:
casted_flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1|  1|       1|       96|     235|     70.0|     AS| N508AS|   145|   PDX| ANC|     194|    1542|   0|     1|
|2014|    1|  1|       4|       -6|     738|    -23.0|     US| N195UW|  1830|   SEA| CLT|     252|    2279|   0|     4|
|2014|    1|  1|       8|       13|     548|     -4.0|     UA| N37422|  1609|   PDX| IAH|     201|    1825|   0|     8|
|2014|    1|  1|      28|       -2|     800|    -23.0|     US| N547UW|   466|   PDX| CLT|     251|    2282|   0|    28|
|2014|    1|  1|      34|       44|     325|     43.0|     AS| N762AS|   121|   SEA| ANC|     201|    1448|   0|    34|
|2014|    1|  1|      37|       82|     

In [43]:
data_inputs = ["month", "day", "dep_time", "arr_time", "carrier", "distance", "air_time", "arr_delay"]
training_data = casted_flights.select(data_inputs)

In [44]:
training_data.show()

+-----+---+--------+--------+-------+--------+--------+---------+
|month|day|dep_time|arr_time|carrier|distance|air_time|arr_delay|
+-----+---+--------+--------+-------+--------+--------+---------+
|    1|  1|       1|     235|     AS|    1542|     194|     70.0|
|    1|  1|       4|     738|     US|    2279|     252|    -23.0|
|    1|  1|       8|     548|     UA|    1825|     201|     -4.0|
|    1|  1|      28|     800|     US|    2282|     251|    -23.0|
|    1|  1|      34|     325|     AS|    1448|     201|     43.0|
|    1|  1|      37|     747|     DL|    1927|     224|     88.0|
|    1|  1|     346|     936|     UA|    1721|     202|    219.0|
|    1|  1|     526|    1148|     UA|    1825|     217|     15.0|
|    1|  1|     527|     917|     UA|    1024|     136|     24.0|
|    1|  1|     536|    1334|     UA|    2402|     268|     -6.0|
|    1|  1|     541|     911|     UA|     991|     130|      4.0|
|    1|  1|     549|     907|     US|    1009|     122|     12.0|
|    1|  1

In [45]:
from pyspark.ml.feature import StringIndexer
indexer_month = StringIndexer(inputCol="month", outputCol="month_indexed")
indexer_carrier = StringIndexer(inputCol="carrier", outputCol="carrier_indexed")

In [46]:
# Fit and transform 'month' and 'carrier'
flights_indexed = indexer_month.fit(training_data).transform(training_data)
flights_indexed = indexer_carrier.fit(flights_indexed).transform(flights_indexed)

# Check the result
flights_indexed.select("month", "month_indexed", "carrier", "carrier_indexed").show(5)


+-----+-------------+-------+---------------+
|month|month_indexed|carrier|carrier_indexed|
+-----+-------------+-------+---------------+
|    1|         10.0|     AS|            0.0|
|    1|         10.0|     US|            6.0|
|    1|         10.0|     UA|            4.0|
|    1|         10.0|     US|            6.0|
|    1|         10.0|     AS|            0.0|
+-----+-------------+-------+---------------+
only showing top 5 rows
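`StringIndexer` uses frequency-descending ordering by default (`stringOrderType="frequencyDesc"`): the most frequent label gets `0.0`, the next `1.0`, and so on, and numeric inputs such as `month` are indexed by their string form. That is why `AS` maps to `0.0` and month `1` lands at `10.0`, meaning January is the eleventh most frequent month after cleaning. The sketch below reproduces that ordering in plain Python; `frequency_desc_index` is a hypothetical helper, the carrier list is made up, and the alphabetical tie-break is an assumption based on Spark's documented behavior for equal frequencies.

```python
from collections import Counter


def frequency_desc_index(values):
    """Map each distinct label to a float index, most frequent label first.

    Labels are compared by their string form, and ties in frequency are
    broken alphabetically.
    """
    counts = Counter(str(v) for v in values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical carrier column, not the real dataset counts.
carriers = ["AS", "AS", "AS", "UA", "UA", "US", "DL"]
mapping = frequency_desc_index(carriers)
print(mapping)  # {'AS': 0.0, 'UA': 1.0, 'DL': 2.0, 'US': 3.0}
```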



In [47]:
from pyspark.ml.feature import VectorAssembler
columns = ["day", "dep_time", "arr_time", "distance", "air_time", "month_indexed", "carrier_indexed"]
assembler = VectorAssembler(inputCols=columns, outputCol="features")
input_variables_transformed = assembler.transform(flights_indexed)
input_variables_transformed.show()

+-----+---+--------+--------+-------+--------+--------+---------+-------------+---------------+--------------------+
|month|day|dep_time|arr_time|carrier|distance|air_time|arr_delay|month_indexed|carrier_indexed|            features|
+-----+---+--------+--------+-------+--------+--------+---------+-------------+---------------+--------------------+
|    1|  1|       1|     235|     AS|    1542|     194|     70.0|         10.0|            0.0|[1.0,1.0,235.0,15...|
|    1|  1|       4|     738|     US|    2279|     252|    -23.0|         10.0|            6.0|[1.0,4.0,738.0,22...|
|    1|  1|       8|     548|     UA|    1825|     201|     -4.0|         10.0|            4.0|[1.0,8.0,548.0,18...|
|    1|  1|      28|     800|     US|    2282|     251|    -23.0|         10.0|            6.0|[1.0,28.0,800.0,2...|
|    1|  1|      34|     325|     AS|    1448|     201|     43.0|         10.0|            0.0|[1.0,34.0,325.0,1...|
|    1|  1|      37|     747|     DL|    1927|     224|     88.0
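`VectorAssembler` concatenates the listed input columns, in the order given by `columns`, into a single `features` vector. A plain-Python sketch of the same step for the first row shown above (the hypothetical `assemble` helper always builds a dense list, whereas Spark may choose a sparse representation when many values are zero):

```python
# Column order used by the VectorAssembler above.
FEATURE_COLUMNS = ["day", "dep_time", "arr_time", "distance", "air_time",
                   "month_indexed", "carrier_indexed"]


def assemble(row, columns=FEATURE_COLUMNS):
    """Concatenate the listed columns, in order, into one list of floats."""
    return [float(row[c]) for c in columns]


# First row of flights_indexed, as shown in the output above.
first_row = {"month": 1, "day": 1, "dep_time": 1, "arr_time": 235,
             "carrier": "AS", "distance": 1542, "air_time": 194,
             "arr_delay": 70.0, "month_indexed": 10.0, "carrier_indexed": 0.0}
features = assemble(first_row)
print(features)  # [1.0, 1.0, 235.0, 1542.0, 194.0, 10.0, 0.0]
```

The result agrees with the truncated `[1.0,1.0,235.0,15...` vector in the table; `month` and `carrier` only enter through their indexed versions, and `arr_delay` is left out because it is the source of the label.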

In [48]:
from pyspark.ml.feature import Binarizer
binarizer = Binarizer(inputCol="arr_delay", outputCol="arr_delay_binary", threshold=15)
input_variables_transformed = binarizer.transform(input_variables_transformed)
input_variables_transformed.show()

+-----+---+--------+--------+-------+--------+--------+---------+-------------+---------------+--------------------+----------------+
|month|day|dep_time|arr_time|carrier|distance|air_time|arr_delay|month_indexed|carrier_indexed|            features|arr_delay_binary|
+-----+---+--------+--------+-------+--------+--------+---------+-------------+---------------+--------------------+----------------+
|    1|  1|       1|     235|     AS|    1542|     194|     70.0|         10.0|            0.0|[1.0,1.0,235.0,15...|             1.0|
|    1|  1|       4|     738|     US|    2279|     252|    -23.0|         10.0|            6.0|[1.0,4.0,738.0,22...|             0.0|
|    1|  1|       8|     548|     UA|    1825|     201|     -4.0|         10.0|            4.0|[1.0,8.0,548.0,18...|             0.0|
|    1|  1|      28|     800|     US|    2282|     251|    -23.0|         10.0|            6.0|[1.0,28.0,800.0,2...|             0.0|
|    1|  1|      34|     325|     AS|    1448|     201|     43
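`Binarizer` maps values strictly greater than `threshold` to `1.0` and everything else to `0.0`, so with `threshold=15` a flight arriving exactly 15 minutes late still counts as on time. A minimal plain-Python equivalent (the `binarize` helper is hypothetical):

```python
def binarize(value, threshold=15.0):
    """Return 1.0 if value is strictly greater than threshold, else 0.0."""
    return 1.0 if value > threshold else 0.0


# arr_delay values from the table above, plus the 15-minute boundary case.
delays = [70.0, -23.0, -4.0, 15.0, 43.0]
labels = [binarize(d) for d in delays]
print(labels)  # [1.0, 0.0, 0.0, 0.0, 1.0]
```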

In [49]:
model_df = input_variables_transformed.select("features", "arr_delay_binary")
model_df.show()

+--------------------+----------------+
|            features|arr_delay_binary|
+--------------------+----------------+
|[1.0,1.0,235.0,15...|             1.0|
|[1.0,4.0,738.0,22...|             0.0|
|[1.0,8.0,548.0,18...|             0.0|
|[1.0,28.0,800.0,2...|             0.0|
|[1.0,34.0,325.0,1...|             1.0|
|[1.0,37.0,747.0,1...|             1.0|
|[1.0,346.0,936.0,...|             1.0|
|[1.0,526.0,1148.0...|             0.0|
|[1.0,527.0,917.0,...|             1.0|
|[1.0,536.0,1334.0...|             0.0|
|[1.0,541.0,911.0,...|             0.0|
|[1.0,549.0,907.0,...|             0.0|
|[1.0,550.0,837.0,...|             0.0|
|[1.0,557.0,1134.0...|             0.0|
|[1.0,557.0,825.0,...|             0.0|
|[1.0,558.0,801.0,...|             0.0|
|[1.0,559.0,916.0,...|             0.0|
|[1.0,600.0,1151.0...|             0.0|
|[1.0,600.0,842.0,...|             0.0|
|[1.0,602.0,943.0,...|             0.0|
+--------------------+----------------+
only showing top 20 rows



In [50]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
dt = DecisionTreeClassifier(labelCol="arr_delay_binary", featuresCol="features")
dt_model = dt.fit(model_df)


In [51]:
predictions_df = dt_model.transform(model_df)
predictions_df.show()

+--------------------+----------------+----------------+--------------------+----------+
|            features|arr_delay_binary|   rawPrediction|         probability|prediction|
+--------------------+----------------+----------------+--------------------+----------+
|[1.0,1.0,235.0,15...|             1.0|     [28.0,67.0]|[0.29473684210526...|       1.0|
|[1.0,4.0,738.0,22...|             0.0|[49783.0,4287.0]|[0.92071388940262...|       0.0|
|[1.0,8.0,548.0,18...|             0.0|[49783.0,4287.0]|[0.92071388940262...|       0.0|
|[1.0,28.0,800.0,2...|             0.0|[49783.0,4287.0]|[0.92071388940262...|       0.0|
|[1.0,34.0,325.0,1...|             1.0|     [28.0,67.0]|[0.29473684210526...|       1.0|
|[1.0,37.0,747.0,1...|             1.0|[49783.0,4287.0]|[0.92071388940262...|       0.0|
|[1.0,346.0,936.0,...|             1.0|[49783.0,4287.0]|[0.92071388940262...|       0.0|
|[1.0,526.0,1148.0...|             0.0|[49783.0,4287.0]|[0.92071388940262...|       0.0|
|[1.0,527.0,917.0,...
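For a decision tree, `rawPrediction` holds the per-class training counts in the leaf a row falls into, and `probability` is those counts normalized to sum to 1. The sketch below recomputes the two leaves visible above. The `predict` function is a hypothetical, simplified binary cutoff rule (Spark picks the class with the highest probability, optionally rescaled by its `thresholds` parameter); it shows why lowering the cutoff would flag more delays: the large leaf gives P(delayed) ≈ 0.079, so it is labeled on time at 0.5 but delayed at 0.05.

```python
def leaf_probabilities(raw_counts):
    """Normalize per-class training counts in a leaf into probabilities."""
    total = sum(raw_counts)
    return [count / total for count in raw_counts]


def predict(probabilities, cutoff=0.5):
    """Hypothetical rule: predict 1.0 (delayed) when P(delayed) > cutoff."""
    return 1.0 if probabilities[1] > cutoff else 0.0


# Leaf counts copied from the rawPrediction column above.
small_leaf = leaf_probabilities([28.0, 67.0])
big_leaf = leaf_probabilities([49783.0, 4287.0])
print(small_leaf, predict(small_leaf))
print(big_leaf, predict(big_leaf))
print(predict(big_leaf, cutoff=0.05))
```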

## Pipeline

In [52]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[indexer_month, indexer_carrier, binarizer, assembler, dt])
pipeline_model = pipeline.fit(training_data)
predictions = pipeline_model.transform(training_data)
# Evaluate the pipeline's own predictions (not the earlier predictions_df)
accuracy = MulticlassClassificationEvaluator(labelCol="arr_delay_binary", predictionCol="prediction", metricName="accuracy").evaluate(predictions)
precision = MulticlassClassificationEvaluator(labelCol="arr_delay_binary", predictionCol="prediction", metricName="weightedPrecision").evaluate(predictions)
recall = MulticlassClassificationEvaluator(labelCol="arr_delay_binary", predictionCol="prediction", metricName="weightedRecall").evaluate(predictions)
print(f"Accuracy Score: {accuracy}")
print(f"Precision Score: {precision}")

Accuracy Score: 0.8530370517829149
Precision Score: 0.8604016122218203


## Confusion Matrix

In [53]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import col
import pandas as pd

# Step 1: Select prediction and actual label columns
prediction_and_labels = predictions.select(
    col("prediction").cast("double"),
    col("arr_delay_binary").cast("double")
)

# Step 2: Convert to RDD for MulticlassMetrics
preds_and_labels_rdd = prediction_and_labels.rdd.map(tuple)

# Step 3: Initialize MulticlassMetrics
metrics = MulticlassMetrics(preds_and_labels_rdd)

# Step 4: Print the Confusion Matrix
confusion_matrix = metrics.confusionMatrix().toArray()

cm_df = pd.DataFrame(
    confusion_matrix,
    index=["Actual On-Time (0)", "Actual Delayed (1)"],
    columns=["Predicted On-Time (0)", "Predicted Delayed (1)"]
)

print("Confusion Matrix:\n", cm_df)




Confusion Matrix:
                     Predicted On-Time (0)  Predicted Delayed (1)
Actual On-Time (0)               136424.0                   75.0
Actual Delayed (1)                23549.0                  700.0



## Confusion Matrix Breakdown

- **True Negatives (TN):** 136,424 → Flights correctly predicted as on time
- **False Positives (FP):** 75 → Flights predicted as delayed but actually on time
- **False Negatives (FN):** 23,549 → Flights predicted as on time but actually delayed
- **True Positives (TP):** 700 → Flights correctly predicted as delayed
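The reported scores can be recomputed from these four counts. The sketch below assumes the usual definitions: accuracy is the share of correct predictions, and Spark's weighted precision and recall average the per-class values weighted by each class's number of actual instances (which makes weighted recall equal to accuracy). `binary_metrics` is a hypothetical helper.

```python
def binary_metrics(tn, fp, fn, tp):
    """Accuracy plus per-class and support-weighted precision/recall."""
    total = tn + fp + fn + tp
    accuracy = (tn + tp) / total
    # Per-class precision: correct predictions of a class / all predictions of it.
    precision_0 = tn / (tn + fn)
    precision_1 = tp / (tp + fp)
    # Per-class recall: correct predictions of a class / all actual members of it.
    recall_0 = tn / (tn + fp)
    recall_1 = tp / (tp + fn)
    # Support = number of actual instances of each class.
    support_0 = tn + fp
    support_1 = fn + tp
    weighted_precision = (precision_0 * support_0 + precision_1 * support_1) / total
    weighted_recall = (recall_0 * support_0 + recall_1 * support_1) / total
    return {
        "accuracy": accuracy,
        "precision_1": precision_1,
        "recall_1": recall_1,
        "weighted_precision": weighted_precision,
        "weighted_recall": weighted_recall,
    }


m = binary_metrics(tn=136424, fp=75, fn=23549, tp=700)
for name, value in m.items():
    print(f"{name}: {value:.4f}")
```

This reproduces the printed accuracy (0.8530) and weighted precision (0.8604), and shows that the delayed class alone has precision ≈ 0.903 but recall ≈ 0.029.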


## **Conclusion**

The model predicts on-time flights well but struggles to identify delayed ones. While the **overall accuracy** is high (~85.3%), the **recall** for delayed flights is critically low (700 / 24,249 ≈ 2.9%), so the model misses almost all actual delays. The **precision** for delayed flights is high (700 / 775 ≈ 90.3%): when the model does predict a delay it is usually right, but it predicts one so rarely that it produces a large number of **false negatives**. For context, always predicting "on time" would already score about 84.9% accuracy (136,499 / 160,748), and all of these metrics are computed on the same data the model was trained on, so they likely overstate performance on unseen flights.

To improve the model's performance, especially in detecting delays, several strategies should be considered:
- **Address Class Imbalance:** Implement oversampling or class weighting to balance the representation of delayed and on-time flights.  
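- **Threshold Adjustment:** Lower the decision threshold on the predicted delay probability so that more flights are flagged as delayed (lowering the `Binarizer` threshold instead would change what counts as a delay, not how the model decides).  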
- **Feature Engineering:** Incorporate additional features like weather conditions, holidays, and day-of-week trends.  
- **Model Tuning:** Experiment with more complex models (e.g., Random Forest, Gradient Boosting) and perform hyperparameter optimization.
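One common way to set up class weighting is the "balanced" heuristic, `weight_c = n_total / (n_classes * n_c)`, which gives each class the same total weight. The sketch below computes these weights from the actual-class totals in the confusion matrix; `balanced_class_weights` is a hypothetical helper. In recent Spark versions such per-row weights could be stored in a column and passed through the classifier's `weightCol` parameter, though that wiring is not shown here.

```python
def balanced_class_weights(class_counts):
    """weight_c = n_total / (n_classes * n_c), so every class sums to the same weight."""
    total = sum(class_counts.values())
    n_classes = len(class_counts)
    return {label: total / (n_classes * count) for label, count in class_counts.items()}


# Actual-class totals from the confusion matrix:
# on time = 136,424 + 75, delayed = 23,549 + 700.
class_counts = {0.0: 136499, 1.0: 24249}
weights = balanced_class_weights(class_counts)
print(weights)  # delayed flights weigh roughly 5.6x as much as on-time flights
```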

With these improvements, the model could strike a better balance between **precision** and **recall**, improving its ability to flag delayed flights and making its output more useful for decision-making.


## Improving Model

### To Do:

- Check for other models to improve metrics