
# Lab8

- Implement a complete ML pipeline that predicts future departure delays for at least two airports, using the departure delays dataset.
- Interpret the model evaluation results.

## Preprocessing

This boilerplate is reused from Lab 6.

### Loading datasets

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("Lab8").getOrCreate()

prefix = "./data/"

tripdelaysFilePath = prefix + "departuredelays.csv"
airportsnaFilePath = prefix + "airport-codes-na.txt"

# Load the airports dataset (tab-separated, with a header row)
airportsna = (spark.read.format("csv").options(header="true", inferSchema="true", sep="\t").load(airportsnaFilePath))
airportsna.createOrReplaceTempView("airports_na")

# Load the departure delays dataset; without inferSchema every column is a string,
# so cast the numeric columns explicitly
departureDelays = (spark.read.format("csv").options(header="true").load(tripdelaysFilePath))
departureDelays = departureDelays.withColumn("delay", col("delay").cast("int"))
departureDelays = departureDelays.withColumn("distance", col("distance").cast("int"))
departureDelays.createOrReplaceTempView("departureDelays")
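
Because `departuredelays.csv` is loaded without `inferSchema`, Spark reads every column as a string, which is why `delay` and `distance` are cast to integers. A small plain-Python illustration, using hypothetical delay values, of why ordering numbers that are still stored as strings goes wrong:

```python
# Hypothetical delay values as they would arrive from a CSV read without inferSchema
delays_as_strings = ["6", "-8", "10", "9"]

# String ordering is lexicographic (character by character), so "10" sorts before "6"
lexicographic = sorted(delays_as_strings)

# After casting to integers the ordering is numeric, as with the columns cast to int
numeric = sorted(int(d) for d in delays_as_strings)

print(lexicographic)  # ['-8', '10', '6', '9']
print(numeric)        # [-8, 6, 9, 10]
```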


### Join Delays with Airport Data

In [None]:
from pyspark.sql.functions import col

# Left-join departureDelays with airportsna twice to attach origin and destination city names
combined = departureDelays \
    .join(airportsna.alias("o"), departureDelays["origin"] == col("o.IATA"), "left") \
    .join(airportsna.alias("d"), departureDelays["destination"] == col("d.IATA"), "left") \
    .select(
        "date", "delay", "distance",
        col("o.city").alias("origin_city"),
        col("d.city").alias("destination_city")
    )

# Create temp view
combined.createOrReplaceTempView("combined")
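
Both joins are left joins, so every flight is kept even when its airport code has no row in `airport-codes-na.txt`; the city then comes back as null. A plain-Python sketch of that behaviour, assuming a small hypothetical lookup table and a made-up code `XXX` for the unmatched case:

```python
# Hypothetical lookup standing in for airport-codes-na.txt (IATA code -> City)
airports = {"ABE": "Allentown", "ATL": "Atlanta", "DTW": "Detroit"}

# Hypothetical flights: (date, delay, distance, origin, destination)
flights = [
    ("01011245", 6, 602, "ABE", "ATL"),
    ("01020600", -8, 369, "ABE", "DTW"),
    ("01030700", 3, 500, "ABE", "XXX"),  # destination code with no match
]

def left_join_cities(rows, lookup):
    """Keep every flight; an unmatched code yields None, like a null after a left join."""
    return [
        (date, delay, distance, lookup.get(origin), lookup.get(dest))
        for date, delay, distance, origin, dest in rows
    ]

joined = left_join_cities(flights, airports)
for row in joined:
    print(row)
```

Rows with a null city are part of the "invalid data" that the `StringIndexer` stages later skip.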

Show the first 5 rows:

In [None]:
combined.show(5)

+--------+-----+--------+-----------+----------------+
|    date|delay|distance|origin_city|destination_city|
+--------+-----+--------+-----------+----------------+
|01011245|    6|     602|  Allentown|         Atlanta|
|01020600|   -8|     369|  Allentown|         Detroit|
|01021245|   -2|     602|  Allentown|         Atlanta|
|01020605|   -4|     602|  Allentown|         Atlanta|
|01031245|   -4|     602|  Allentown|         Atlanta|
+--------+-----+--------+-----------+----------------+
only showing top 5 rows

Count the rows before cleaning:

In [None]:
combined.count()

743245

Clean the data by removing duplicate rows:

In [None]:
combined_cleaned = combined.dropDuplicates()

Count the rows after cleaning:

In [None]:
combined_cleaned.count()

742958
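
`dropDuplicates()` with no column arguments treats two rows as duplicates only when every column matches. A plain-Python sketch of that rule on hypothetical rows shaped like `combined`, followed by the number of rows removed in this notebook (743245 minus 742958):

```python
# Hypothetical rows shaped like `combined`: (date, delay, distance, origin_city, destination_city)
rows = [
    ("01011245", 6, 602, "Allentown", "Atlanta"),
    ("01011245", 6, 602, "Allentown", "Atlanta"),  # identical in every column
    ("01020600", -8, 369, "Allentown", "Detroit"),
]

def drop_duplicates(records):
    """Keep one copy of each fully identical row (all columns compared)."""
    seen = set()
    unique = []
    for r in records:
        if r not in seen:
            seen.add(r)
            unique.append(r)
    return unique

deduped = drop_duplicates(rows)
print(len(rows), "->", len(deduped))  # 3 -> 2

# Rows removed by combined.dropDuplicates() above
removed = 743245 - 742958
print(removed)  # 287
```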

## Step-by-Step ML Pipeline

### Step 1: Data Preparation & Feature Engineering


In [None]:
from pyspark.sql.functions import col, substring, when
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# `date` is laid out as MMddHHmm (e.g. 01011245), so characters 5-6 hold the departure hour
combined_cleaned = combined_cleaned.withColumn("hour", substring(col("date"), 5, 2).cast("int"))

# Keep only flights departing from the two selected airports (matched by city)
filtered_df = combined_cleaned.filter(col("origin_city").isin("Allentown", "Atlanta"))

# Use delay as a double label and clip early departures (negative delays) to 0
filtered_df = filtered_df.withColumn("delay", col("delay").cast("double"))
filtered_df = filtered_df.withColumn("delay", when(col("delay") < 0, 0.0).otherwise(col("delay")))
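
The feature engineering above relies on the `date` string being laid out as `MMddHHmm` (for example `01011245` is January 1 at 12:45) and on Spark's `substring` being 1-based. A plain-Python equivalent of the two row-level transformations, written only to make the indexing explicit (the function names are hypothetical):

```python
def hour_from_date(date_str: str) -> int:
    """Mirror of substring(col("date"), 5, 2).cast("int"): Spark's 1-based
    position 5 is Python index 4, so characters 5-6 are the slice [4:6]."""
    return int(date_str[4:6])

def clip_delay(delay: float) -> float:
    """Mirror of the when(...).otherwise(...) clipping: negative delays become 0."""
    return 0.0 if delay < 0 else float(delay)

for date_str, delay in [("01011245", 6), ("01020600", -8)]:
    print(date_str, hour_from_date(date_str), clip_delay(delay))
```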


### Step 2: Indexing & Feature Encoding

In [None]:
# Encode origin and destination cities as numeric indices.
# handleInvalid="skip" drops rows whose city is null (no match in the left join) or unseen during fitting.
origin_indexer = StringIndexer(inputCol="origin_city", outputCol="origin_index", handleInvalid="skip")
dest_indexer = StringIndexer(inputCol="destination_city", outputCol="dest_index", handleInvalid="skip")

# One-hot encode the indices so the model does not treat city indices as ordered quantities
origin_encoder = OneHotEncoder(inputCol="origin_index", outputCol="origin_vec")
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_vec")
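
By default, `StringIndexer` gives index 0.0 to the most frequent label (`stringOrderType="frequencyDesc"`), and `OneHotEncoder` drops the last category (`dropLast=True`), encoding it as an all-zero vector. The plain-Python sketch below mimics both on hypothetical data; the alphabetical tie-breaking is an assumption of the sketch, and Spark emits sparse vectors, shown dense here for readability. With only two origin cities, `origin_vec` ends up with a single element.

```python
from collections import Counter

def string_index(values):
    """Map each label to an index, most frequent first (ties broken alphabetically here)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector; with drop_last=True the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec

cities = ["Atlanta", "Atlanta", "Atlanta", "Allentown"]  # hypothetical origin column
mapping = string_index(cities)
print(mapping)                                      # {'Atlanta': 0.0, 'Allentown': 1.0}
print(one_hot(mapping["Atlanta"], len(mapping)))    # [1.0]
print(one_hot(mapping["Allentown"], len(mapping)))  # [0.0]
```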


### Step 3: Assemble Features

In [None]:
# Assemble all features into a single vector
assembler = VectorAssembler(
    inputCols=["distance", "hour", "origin_vec", "dest_vec"],
    outputCol="features"
)
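
`VectorAssembler` concatenates its input columns into one vector in the order given by `inputCols`, flattening vector-valued inputs such as `origin_vec` in place. A plain-Python sketch of the resulting layout for one hypothetical row (the one-hot values are made up):

```python
def assemble(distance, hour, origin_vec, dest_vec):
    """Flatten inputs in inputCols order: distance, hour, origin_vec, dest_vec."""
    return [float(distance), float(hour), *origin_vec, *dest_vec]

# Hypothetical row: distance 351, hour 6, origin one-hot [1.0], destination one-hot of length 3
features = assemble(351, 6, [1.0], [0.0, 1.0, 0.0])
print(features)  # [351.0, 6.0, 1.0, 0.0, 1.0, 0.0]
```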

### Step 4: Model & Pipeline

In [None]:
# Use RandomForestRegressor to predict delay
rf = RandomForestRegressor(featuresCol="features", labelCol="delay", predictionCol="prediction")

# ML Pipeline
pipeline = Pipeline(stages=[
    origin_indexer,
    dest_indexer,
    origin_encoder,
    dest_encoder,
    assembler,
    rf
])
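
For regression, a random forest predicts the mean of its individual trees' predictions, which is one reason the predictions further down are smooth values (roughly 8 to 12 minutes) rather than exact delays. A plain-Python sketch with three hand-written stand-in "trees" whose thresholds are hypothetical; Spark instead learns its trees from bootstrapped samples (20 trees by default):

```python
# Hypothetical stand-in trees: each returns the mean delay of the leaf a flight lands in
def tree_a(x):
    return 12.0 if x["hour"] >= 7 else 6.0

def tree_b(x):
    return 10.0 if x["distance"] > 500 else 8.0

def tree_c(x):
    return 9.0

def forest_predict(trees, x):
    """Regression forest prediction: the average of the trees' predictions."""
    return sum(t(x) for t in trees) / len(trees)

trees = [tree_a, tree_b, tree_c]
flight = {"hour": 8, "distance": 599}
print(forest_predict(trees, flight))  # (12.0 + 10.0 + 9.0) / 3
```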


### Step 5: Train & Evaluate

In [None]:
# Split into training and test sets
train_data, test_data = filtered_df.randomSplit([0.8, 0.2], seed=42)

# Fit the pipeline
model = pipeline.fit(train_data)

# Predict on test data
predictions = model.transform(test_data)

# Evaluate model
evaluator = RegressionEvaluator(
    labelCol="delay", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE) on test data: {rmse:.2f}")


Root Mean Squared Error (RMSE) on test data: 36.52
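
`randomSplit([0.8, 0.2], seed=42)` assigns each row independently, so the split sizes are only approximately 80/20, and the split is reproducible for a given seed on the same data and partitioning. A plain-Python sketch of that per-row assignment; it will not reproduce Spark's actual split, because Python's random generator differs from Spark's:

```python
import random

def random_split(rows, weights, seed=42):
    """Assign each row independently to one split with probability proportional
    to its weight, so split sizes are only approximately proportional."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0.0, 1.0)
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point rounding of the last bound
    return splits

train, test = random_split(list(range(10_000)), [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to, but generally not exactly, 8000 and 2000
```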


In [None]:
predictions.select("origin_city", "destination_city", "distance", "hour", "delay", "prediction").show(10)


+-----------+----------------+--------+----+-----+------------------+
|origin_city|destination_city|distance|hour|delay|        prediction|
+-----------+----------------+--------+----+-----+------------------+
|    Atlanta|         Orlando|     351|   6|  0.0| 8.326138587145183|
|    Atlanta|       Charlotte|     197|   7|  1.0| 7.552062363974947|
|    Atlanta|        New York|     662|   7| 18.0|11.687346074627168|
|    Atlanta|           Tampa|     353|   7|  0.0| 8.264301834126815|
|    Atlanta|          Boston|     822|   8|  2.0|10.694192596222884|
|    Atlanta|         Houston|     599|   8|  0.0|11.275137982404708|
|    Atlanta| West Palm Beach|     473|   8|  0.0|  9.20799319467891|
|    Atlanta|         Houston|     605|   8| 11.0|11.275137982404708|
|    Atlanta|         Detroit|     516|   8|  0.0|  9.36688623573643|
|    Atlanta| Fort Lauderdale|     505|   8|  0.0| 8.264301834126815|
+-----------+----------------+--------+----+-----+------------------+
only showing top 10 rows

## Results

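The model's root mean squared error (RMSE) on the test set is 36.52 minutes. RMSE is not a plain average error: errors are squared before averaging, so large misses count more heavily than small ones, and the mean absolute error is never larger than the RMSE.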

The lower the RMSE, the better the predictions.

A perfect model would have an RMSE of 0 (no difference between predicted and actual delays).
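
Formally, $\text{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)^2}$, where $y_i$ is the actual delay and $\hat{y}_i$ the prediction, while the mean absolute error is $\text{MAE} = \frac{1}{n}\sum_{i=1}^{n}|y_i - \hat{y}_i|$. The plain-Python sketch below computes both on the ten prediction rows printed above, purely as an illustration; those ten rows are not the full test set.

```python
import math

# (actual delay, prediction) pairs copied from the 10 rows shown above
pairs = [
    (0.0, 8.326138587145183),
    (1.0, 7.552062363974947),
    (18.0, 11.687346074627168),
    (0.0, 8.264301834126815),
    (2.0, 10.694192596222884),
    (0.0, 11.275137982404708),
    (0.0, 9.20799319467891),
    (11.0, 11.275137982404708),
    (0.0, 9.36688623573643),
    (0.0, 8.264301834126815),
]

def rmse(pairs):
    """Root mean squared error: square each error, average, then take the square root."""
    return math.sqrt(sum((y - p) ** 2 for y, p in pairs) / len(pairs))

def mae(pairs):
    """Mean absolute error: the plain average of |actual - predicted|."""
    return sum(abs(y - p) for y, p in pairs) / len(pairs)

print(f"RMSE: {rmse(pairs):.2f}  MAE: {mae(pairs):.2f}")
```

In the notebook itself, MAE on the full test set can be obtained from the existing evaluator with `evaluator.evaluate(predictions, {evaluator.metricName: "mae"})`.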

### Is 36.52 good?
That depends on the range of your delay values:

| Scenario | Interpretation |
|---|---|
| Most delays are < 10 minutes | An RMSE of 36.52 is high; the model is struggling. |
| Delays range from 0 to 200+ minutes | An RMSE of 36.52 is reasonable, especially for basic features. |


In [None]:
# Get summary statistics for the 'delay' column
filtered_df.select("delay").describe().show()


+-------+------------------+
|summary|             delay|
+-------+------------------+
|  count|             59012|
|   mean|15.432234121873517|
| stddev|38.044117365421805|
|    min|               0.0|
|    max|             925.0|
+-------+------------------+
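
A useful reference point is a baseline that always predicts the mean delay: its RMSE equals the (population) standard deviation of the labels. The `describe()` output above reports a standard deviation of about 38.04 minutes over the 59,012 filtered rows, so the comparison below is approximate, since the model's RMSE was measured on the test split only. A plain-Python sketch:

```python
import math

def rmse(actuals, preds):
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actuals, preds)) / len(actuals))

def mean_baseline_rmse(actuals):
    """RMSE of always predicting the mean, i.e. the population standard deviation."""
    m = sum(actuals) / len(actuals)
    return rmse(actuals, [m] * len(actuals))

# Figures reported in this notebook (stddev covers all filtered rows, not just the test split)
model_rmse = 36.52
label_stddev = 38.044117365421805

improvement = 1 - model_rmse / label_stddev       # relative RMSE reduction vs. the mean baseline
approx_r2 = 1 - (model_rmse / label_stddev) ** 2  # rough share of variance explained
print(f"{improvement:.1%} lower RMSE than the mean baseline, R^2 ~ {approx_r2:.2f}")
```

With these figures the model's RMSE is about 4% below the mean baseline, and the implied $R^2$ is roughly 0.08. `RegressionEvaluator` also accepts `metricName="r2"` if an exact figure on the test set is wanted.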



## Conclusion

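Given the spread of the delays (0 to 925 minutes, mean 15.43, standard deviation 38.04), an RMSE of 36.52 is reasonable for a model that uses only distance, departure hour, and origin and destination city. It is, however, only slightly below the standard deviation, so richer features would likely be needed for substantially better predictions. We also tried other models and found that Random Forest gave the best balance of training speed and accuracy.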