# 0. Loading Essential Libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import when, col
from pyspark.sql.utils import AnalysisException
import json
import os

# 1. Initial Data Loading and Understanding

In [None]:
# Build (or reuse) the Spark session for this notebook.
# spark.driver.memory is raised to 4g to give the driver headroom on large inputs.
# NOTE(review): getOrCreate() returns an already-running session if one exists,
# in which case this memory setting is presumably not re-applied — confirm.
spark = (
    SparkSession.builder
    .appName("FlightDelayPrediction")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

In [3]:
# Path to the flight CSV. Set the FLIGHT_DATA_PATH environment variable to run the
# notebook non-interactively (e.g. Restart & Run All); otherwise prompt for it.
file_path1 = os.environ.get("FLIGHT_DATA_PATH") or input("Please enter the path for the file (without quotes!): ")
# Tolerate whitespace or quotes pasted together with the path.
file_path1 = file_path1.strip().strip('"').strip("'")

try:
    # Load the dataset; the literal string "NA" is read as a missing value.
    df = spark.read.csv(file_path1, header=True, inferSchema=True, nullValue="NA")
except (FileNotFoundError, AnalysisException):
    # Spark surfaces a missing path as AnalysisException rather than
    # FileNotFoundError. Re-raise so the notebook stops here instead of failing
    # in the next cell with a NameError on `df`.
    print("File not found. Please check the path.")
    raise

## 1.1 Data Cleaning and Checking NULL values

In [4]:
# Variables that are only known after take-off; they must not be used as inputs.
forbidden_cols = [
    "ArrTime",
    "ActualElapsedTime",
    "AirTime",
    "TaxiIn",
    "Diverted",
    "CarrierDelay",
    "WeatherDelay",
    "NASDelay",
    "SecurityDelay",
    "LateAircraftDelay",
]

df_cleaned = df.drop(*forbidden_cols)

# Sanity check: an empty list means every forbidden column is gone.
remaining_columns = set(df_cleaned.columns)
existing_forbidden_cols = [name for name in forbidden_cols if name in remaining_columns]
existing_forbidden_cols

[]

In [5]:
# Keep only flights that operated (Cancelled == 0) and have a positive scheduled
# duration; the cancellation columns carry no information after that filter,
# and exact duplicate rows are removed.
operated = F.col("Cancelled") == 0
positive_schedule = F.col("CRSElapsedTime") > 0

df_cleaned = (
    df_cleaned
    .filter(operated)
    .drop("CancellationCode", "Cancelled")
    .filter(positive_schedule)
    .distinct()
)

In [6]:
# Check and handle repeated values between scheduled and actual times
def handle_repeated_times(df, threshold=70):
    """
    Drop ``DepTime`` when it mostly repeats the scheduled ``CRSDepTime``.

    Computes the percentage of rows where DepTime == CRSDepTime (rows where
    either side is null count as non-matches) and drops DepTime if that
    percentage is strictly greater than ``threshold``.

    Args:
        df: Spark DataFrame containing DepTime and CRSDepTime columns.
        threshold: Match percentage (0-100) above which DepTime is treated as
            redundant. Defaults to 70, the previous hard-coded cut-off.

    Returns:
        ``df`` unchanged, or a new DataFrame without DepTime. The input is never
        modified, so callers must use the returned value.
    """
    # Idempotent: once DepTime is gone there is nothing left to compare.
    if "DepTime" not in df.columns:
        return df

    # Use the F. namespace rather than the bare col/when/avg globals, which the
    # star import and notebook-level loop variables can shadow.
    is_match = F.when(F.col("DepTime") == F.col("CRSDepTime"), 1).otherwise(0)
    match_pct = df.agg(F.avg(is_match) * 100).first()[0]

    # avg() over an empty DataFrame is null: treat that as "not redundant".
    if match_pct is not None and match_pct > threshold:
        return df.drop("DepTime")
    return df

In [7]:
# handle_repeated_times() returns a new DataFrame rather than modifying its input,
# so its result must be assigned for the check to have any effect.
# NOTE: the feature-engineering cells below derive DepTime_Hour / TimeOfDay from
# DepTime, so they assume this check keeps DepTime (as it did on this dataset).
df_cleaned = handle_repeated_times(df_cleaned)
df_cleaned

DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: int, CRSDepTime: int, CRSArrTime: int, UniqueCarrier: string, FlightNum: int, TailNum: string, CRSElapsedTime: int, ArrDelay: int, DepDelay: int, Origin: string, Dest: string, Distance: int, TaxiOut: string]

In [8]:
# Function to analyze missing values
def analyze_null_values(df):
    """
    Print the null count and null percentage of every column in ``df``.

    All per-column null counts are gathered in a single aggregation job,
    instead of one filter + count job per column.

    Args:
        df: Spark DataFrame to profile.

    Returns:
        None. Prints one line per column, formatted as
        ``"<column>: <n> nulls (<pct>%)"``. An empty DataFrame reports
        ``0 nulls (0.00%)`` for each column instead of raising
        ZeroDivisionError.
    """
    columns = df.columns
    total_rows = df.count()

    if total_rows == 0 or not columns:
        null_counts = [0] * len(columns)
    else:
        # F.when(...) is null unless the value is null, and F.count skips nulls,
        # so each expression counts exactly the null entries of one column.
        row = df.agg(*[
            F.count(F.when(F.col(column).isNull(), 1)) for column in columns
        ]).first()
        null_counts = list(row)

    for column, null_count in zip(columns, null_counts):
        percentage = (null_count / total_rows) * 100 if total_rows else 0.0
        print(f"{column}: {null_count} nulls ({percentage:.2f}%)")

In [9]:
# Null-value profile of the cleaned data, one line per column
analyze_null_values(df=df_cleaned)

Year: 0 nulls (0.00%)
Month: 0 nulls (0.00%)
DayofMonth: 0 nulls (0.00%)
DayOfWeek: 0 nulls (0.00%)
DepTime: 0 nulls (0.00%)
CRSDepTime: 0 nulls (0.00%)
CRSArrTime: 0 nulls (0.00%)
UniqueCarrier: 0 nulls (0.00%)
FlightNum: 0 nulls (0.00%)
TailNum: 5010652 nulls (100.00%)
CRSElapsedTime: 0 nulls (0.00%)
ArrDelay: 10333 nulls (0.21%)
DepDelay: 0 nulls (0.00%)
Origin: 0 nulls (0.00%)
Dest: 0 nulls (0.00%)
Distance: 6794 nulls (0.14%)
TaxiOut: 5010652 nulls (100.00%)


## 1.2 Feature Engineering

In [10]:
# Split each hhmm-encoded time column into separate hour and minute features,
# e.g. DepTime -> DepTime_Hour, DepTime_Minute.
df_time = df_cleaned
for time_col in ("DepTime", "CRSDepTime", "CRSArrTime"):
    df_time = (
        df_time
        .withColumn(f"{time_col}_Hour", F.floor(F.col(time_col) / 100))
        .withColumn(f"{time_col}_Minute", F.col(time_col) % 100)
    )

In [11]:
# Weekend flag, coarse period of day (from the actual departure hour), and a
# binary "departed late" indicator.
dep_hour = F.col("DepTime_Hour")
time_of_day = (
    F.when((dep_hour >= 5) & (dep_hour < 12), "morning")
    .when((dep_hour >= 12) & (dep_hour < 17), "afternoon")
    .when((dep_hour >= 17) & (dep_hour < 22), "evening")
    .otherwise("night")
)
is_weekend = F.when(F.col("DayOfWeek").isin([6, 7]), 1).otherwise(0)
is_delayed = F.when(F.col("DepDelay") > 0, 1).otherwise(0)

df_categorical = (
    df_time
    .withColumn("IsWeekend", is_weekend)
    .withColumn("TimeOfDay", time_of_day)
    .withColumn("IsDelayed", is_delayed)
)

In [12]:
# Create distance-based features, source: https://www.airliners.net/forum/viewtopic.php?t=461349
# Bands (miles): short <= 750, medium (750, 2500], long > 2500.
# The "long" band is an explicit condition rather than `otherwise`: a catch-all
# branch would also label rows with a null Distance as "long", because every
# comparison against null is null. Null distances now get a null category.
df_distance = df_categorical \
    .withColumn("DistanceCategory",
        F.when(F.col("Distance") <= 750, "short")
        .when(F.col("Distance") <= 2500, "medium")
        .when(F.col("Distance") > 2500, "long"))

In [13]:
# Route frequency: number of flights on each (Origin, Dest) pair in the loaded data.
# NOTE: computed before the train/test split, so test-set flights also
# contribute to these counts.
route_frequencies = (
    df_distance
    .groupBy("Origin", "Dest")
    .agg(F.count(F.lit(1)).alias("RouteFrequency"))
)

In [14]:
# Attach each flight's route frequency (inner join on the route key)
df_final = df_distance.join(route_frequencies, on=["Origin", "Dest"], how="inner")

## 1.3 Selected Features for Modeling

In [15]:
# Columns carried into the modelling dataset, grouped by theme.
# The concatenation order below is the column order of `model_data`.
time_features = [
    "Month",              # Month of flight
    "DayOfWeek",          # Day of week (1=Monday, 7=Sunday)
    "DepTime_Hour",       # Hour of departure
    "DepTime_Minute",     # Minute of departure
    "CRSDepTime_Hour",    # Scheduled departure hour
    "CRSDepTime_Minute",  # Scheduled departure minute
]
flight_features = [
    "UniqueCarrier",      # Airline identifier
    "Distance",           # Flight distance in miles
    "CRSElapsedTime",     # Scheduled flight time
]
derived_features = [
    "IsWeekend",          # Weekend indicator
    "TimeOfDay",          # Time period of day
    "DistanceCategory",   # Flight distance category
]
route_features = [
    "Origin",             # Origin airport
    "Dest",               # Destination airport
    "RouteFrequency",     # Frequency of route
]
delay_features = [
    "DepDelay",           # Departure delay in minutes
    "IsDelayed",          # Binary delay indicator
]
target_column = ["ArrDelay"]  # Target variable: arrival delay in minutes

features = (
    time_features
    + flight_features
    + derived_features
    + route_features
    + delay_features
    + target_column
)

In [16]:
# Project the engineered data onto the modelling columns (in `features` order)
model_data = df_final.select(*features)

# Inspect the resulting column types and nullability
model_data.printSchema()

root
 |-- Month: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime_Hour: long (nullable = true)
 |-- DepTime_Minute: integer (nullable = true)
 |-- CRSDepTime_Hour: long (nullable = true)
 |-- CRSDepTime_Minute: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- IsWeekend: integer (nullable = false)
 |-- TimeOfDay: string (nullable = false)
 |-- DistanceCategory: string (nullable = false)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- RouteFrequency: long (nullable = false)
 |-- DepDelay: integer (nullable = true)
 |-- IsDelayed: integer (nullable = false)
 |-- ArrDelay: integer (nullable = true)



In [17]:
# Additional exploratory analysis
# Display summary statistics for numerical columns
print("Summary statistics for numerical columns:")
df_final.select("Distance", "CRSElapsedTime", "DepDelay", "ArrDelay").summary().show()

# Loop variables are deliberately NOT named `col`: a module-level `for col in ...`
# would rebind the `col` function imported from pyspark.sql.functions, and any
# earlier cell that calls `col(...)` would then fail when re-run.

# Calculate correlations with target variable
eda_numeric_cols = ["Distance", "CRSElapsedTime", "DepDelay", "DepTime_Hour", "RouteFrequency"]
for col_name in eda_numeric_cols:
    correlation = df_final.stat.corr(col_name, "ArrDelay")
    print(f"Correlation between {col_name} and ArrDelay: {correlation:.3f}")

# Analyze categorical variables distribution
eda_categorical_cols = ["TimeOfDay", "DistanceCategory", "UniqueCarrier"]
for col_name in eda_categorical_cols:
    print(f"\nDistribution of {col_name}:")
    df_final.groupBy(col_name).agg(
        F.avg("ArrDelay").alias("avg_delay"),
        F.count("*").alias("count")
    ).show()

Summary statistics for numerical columns:
+-------+-----------------+------------------+------------------+------------------+
|summary|         Distance|    CRSElapsedTime|          DepDelay|          ArrDelay|
+-------+-----------------+------------------+------------------+------------------+
|  count|          5003858|           5010652|           5010652|           5000319|
|   mean|670.8173859050356|115.09698917426314| 6.122558301793858| 5.344012252018321|
| stddev|522.8019881679623|  64.4886595947797|21.395056670055883|22.994638451256556|
|    min|               11|                 2|              -675|              -829|
|    25%|              297|                68|                -1|                -6|
|    50%|              510|                97|                 0|                 0|
|    75%|              913|               147|                 5|                 9|
|    max|             4502|              1613|              1439|              1291|
+-------+--------------

## 1.4 Saving Processed Data

In [18]:
# Persist the modelling dataset as Parquet under <cwd>/processed_data,
# replacing any previous output (the notebook has no script directory,
# so the current working directory stands in for it).
current_dir = os.getcwd()
save_path = os.path.join(current_dir, "processed_data")
os.makedirs(save_path, exist_ok=True)
model_data.write.mode("overwrite").parquet(save_path)

# 2. Model Development

## 2.1 Prepare features for modelling

In [19]:
# Drop rows missing the target or the departure-time/delay inputs
required_cols = ["ArrDelay", "DepDelay", "DepTime_Hour", "DepTime_Minute"]
model_data_cleaned = model_data.na.drop(subset=required_cols)

In [20]:
# Map each categorical column to a numeric index: <name> -> <name>_idx.
# handleInvalid="keep" gives unseen labels their own extra index instead of failing.
categorical_cols = ["TimeOfDay", "DistanceCategory", "UniqueCarrier", "Origin", "Dest"]
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_idx", handleInvalid="keep")
    for name in categorical_cols
]

In [21]:
# One-hot encode each indexed column: <name>_idx -> <name>_encoded
encoders = [
    OneHotEncoder(inputCol=name + "_idx", outputCol=name + "_encoded")
    for name in categorical_cols
]

In [22]:
# Numeric model inputs, fed to the assembler as-is (order matters for the vector layout)
numeric_cols = (
    "Month DayOfWeek Distance CRSElapsedTime "
    "DepTime_Hour DepTime_Minute RouteFrequency DepDelay"
).split()

In [23]:
# Ordered assembler inputs: raw numeric columns first, then one one-hot vector
# per categorical column.
encoded_cols = [name + "_encoded" for name in categorical_cols]
feature_cols = numeric_cols + encoded_cols

In [24]:
# Assemble all model inputs into a single "features" vector column.
# Reuses `feature_cols` from the previous cell instead of rebuilding the same list.
# handleInvalid="skip" drops any row with a null/NaN input. Distance has nulls and
# is not part of the earlier dropna, so those flights are silently excluded from
# both training and evaluation.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="skip",
)

## 2.2 Create and configure regression models (Linear Regression and Decision Tree)

In [25]:
# Candidate regressors: both read the assembled "features" vector and predict ArrDelay.
shared_params = {"featuresCol": "features", "labelCol": "ArrDelay"}

# Linear Regression, capped at 10 optimizer iterations
lr = LinearRegression(maxIter=10, **shared_params)

# Decision Tree, limited to depth 5
dt = DecisionTreeRegressor(maxDepth=5, **shared_params)

## 2.3 Create Pipelines

In [26]:
# Both pipelines share the preprocessing stages and differ only in the final regressor.
preprocessing_stages = [*indexers, *encoders, assembler]
pipeline_lr = Pipeline(stages=preprocessing_stages + [lr])
pipeline_dt = Pipeline(stages=preprocessing_stages + [dt])

## 2.4 Split data for training

In [27]:
# Reproducible 80/20 split into train_data / test_data (seed fixed at 42).
train_data, test_data = model_data_cleaned.randomSplit(weights=[0.8, 0.2], seed=42)

## 2.5 Train and evaluate models

In [28]:
# Candidate pipelines keyed by display name; insertion order is training order.
models = dict([
    ("Linear Regression", pipeline_lr),
    ("Decision Tree", pipeline_dt),
])

In [29]:
def train_and_evaluate_models(models, train_data, test_data):

    """
    Fits every pipeline in ``models`` and scores it on ``test_data``.

    Each pipeline is fit on ``train_data``. Its predictions on ``test_data`` are
    cached once and reused for RMSE, MAE and R2, so the three evaluator passes
    do not each recompute the whole pipeline.

    Args:
        models: Dictionary mapping a display name to an unfitted model pipeline.
        train_data: Training dataset.
        test_data: Test dataset for evaluation; must contain ``ArrDelay``.
    Returns:
        model_metrics: Dictionary name -> {"RMSE", "MAE", "R2", "model"}, where
            "model" is the fitted pipeline model.
        best_model_name: Name of the model with the lowest test RMSE.
    Raises:
        ValueError: If ``models`` is empty (there would be no best model).
    """
    if not models:
        raise ValueError("train_and_evaluate_models() requires at least one model")

    # Print header for training process
    print("\nTraining and Evaluating Models...")
    print("-" * 50)

    # Initialize evaluator for regression metrics
    evaluator = RegressionEvaluator(
        labelCol="ArrDelay",        # Target variable
        predictionCol="prediction"  # Predicted values column
    )

    # Initialize storage for metrics and tracking best model
    model_metrics = {}
    best_model_name = None
    best_rmse = float('inf')

    # Train and evaluate each model in the dictionary
    for name, pipeline in models.items():
        print(f"\nTraining {name}...")

        # Fit model pipeline on training data
        model = pipeline.fit(train_data)

        # Generate predictions on test data; cache them so the three metric
        # computations below share a single transform pass.
        predictions = model.transform(test_data).cache()
        try:
            rmse = evaluator.setMetricName("rmse").evaluate(predictions)  # Root Mean Square Error
            mae = evaluator.setMetricName("mae").evaluate(predictions)    # Mean Absolute Error
            r2 = evaluator.setMetricName("r2").evaluate(predictions)      # R-squared score
        finally:
            predictions.unpersist()

        # Store all metrics and model object
        model_metrics[name] = {
            "RMSE": rmse,
            "MAE": mae,
            "R2": r2,
            "model": model
        }

        # Update best model if current RMSE is lower.
        # NOTE: the selection uses the test set, so the winner's test metrics
        # are an optimistic estimate of its generalization error.
        if rmse < best_rmse:
            best_rmse = rmse
            best_model_name = name

        # Print metrics for current model
        print(f"{name} Metrics:")
        print(f"RMSE: {rmse:.2f}")
        print(f"MAE: {mae:.2f}")
        print(f"R2: {r2:.3f}")

    return model_metrics, best_model_name

In [30]:
def validate_model(model, data, evaluator):

    """
    Performs model validation with standard regression metrics and
    business-oriented accuracy metrics.

    Predictions are computed once and cached. RMSE/MAE/R2 come from
    ``evaluator``, and the business counts come from one aggregation pass
    instead of several separate count jobs.

    Args:
        model: Trained ML model (must support ``transform``) to evaluate
        data: Dataset to validate against; must contain ``ArrDelay``
        evaluator: RegressionEvaluator instance for calculating metrics

    Returns:
        Dictionary with keys "RMSE", "MAE", "R2", "15min_Accuracy" (share of
        predictions within 15 minutes of the actual delay; 0.0 when there are
        no predictions) and "Severe_Delay_Accuracy" (share of flights with
        ArrDelay > 60 that were also predicted > 60; 0 when there are none).
    """
    # Generate predictions on validation data; cached because they are read
    # by three evaluator passes plus the aggregation below.
    predictions = model.transform(data).cache()
    try:
        # Calculate standard metrics
        rmse = evaluator.setMetricName("rmse").evaluate(predictions)
        mae = evaluator.setMetricName("mae").evaluate(predictions)
        r2 = evaluator.setMetricName("r2").evaluate(predictions)

        # Calculate business counts in a single pass over the predictions
        within_15 = F.abs(F.col("prediction") - F.col("ArrDelay")) <= 15
        is_severe = F.col("ArrDelay") > 60
        severe_hit = is_severe & (F.col("prediction") > 60)
        counts = predictions.agg(
            F.count(F.lit(1)).alias("total"),
            F.sum(F.when(within_15, 1).otherwise(0)).alias("accurate"),
            F.sum(F.when(is_severe, 1).otherwise(0)).alias("severe"),
            F.sum(F.when(severe_hit, 1).otherwise(0)).alias("severe_correct"),
        ).first()
    finally:
        predictions.unpersist()

    # F.sum over zero rows yields null, hence the `or 0` fallbacks.
    total_flights = counts["total"]
    accurate_predictions = counts["accurate"] or 0
    severe_total = counts["severe"] or 0
    severe_correct = counts["severe_correct"] or 0

    # Percentage of predictions within 15 minutes of actual delay
    prediction_accuracy = accurate_predictions / total_flights if total_flights else 0.0

    # Accuracy for severe delays (>60 minutes)
    severe_delay_accuracy = severe_correct / severe_total if severe_total else 0

    return {
        "RMSE": rmse,
        "MAE": mae,
        "R2": r2,
        "15min_Accuracy": prediction_accuracy,
        "Severe_Delay_Accuracy": severe_delay_accuracy
    }

In [31]:
# Train and evaluate all models using the helper function
# Returns metrics dictionary and name of best performing model
model_metrics, best_model_name = train_and_evaluate_models(models, train_data, test_data)
best_model = model_metrics[best_model_name]["model"]
best_metrics = model_metrics[best_model_name]

# Display results header for best model
print("\n" + "=" * 50)
print(f"Best Performing Model: {best_model_name}")
print(f"Best RMSE: {best_metrics['RMSE']:.2f}")
print(f"Best MAE: {best_metrics['MAE']:.2f}")
print(f"Best R2: {best_metrics['R2']:.3f}")
print("=" * 50)

# Perform detailed validation on both training and test sets.
# NOTE: the best model was selected by its test-set RMSE, so the "Test" column
# is not an independent hold-out estimate; that would need a separate
# validation split for model selection.
print("\nPerforming Detailed Model Validation...")
print("-" * 50)

# Initialize regression evaluator for metrics calculation
evaluator = RegressionEvaluator(labelCol="ArrDelay", predictionCol="prediction")

# Get detailed metrics for both training and test data
train_metrics = validate_model(best_model, train_data, evaluator)
test_metrics = validate_model(best_model, test_data, evaluator)

# Display validation results. The label column is padded to the longest metric
# name so that rows such as "Severe_Delay_Accuracy" stay aligned.
label_width = max(len("Metric"), *(len(name) for name in train_metrics))
print("\nDetailed Validation Results:")
print("-" * 50)
print(f"{'Metric':<{label_width}}\tTraining\tTest")
print("-" * 50)
for metric, train_value in train_metrics.items():
    print(f"{metric:<{label_width}}\t{train_value:.3f}\t{test_metrics[metric]:.3f}")

# Save the fitted pipeline; overwrite() replaces any previous save at MODEL_DIR
MODEL_DIR = "best_model"
best_model.write().overwrite().save(MODEL_DIR)

# Make sure the local directory exists before writing the JSON file next to the
# model (a no-op when the save above already created it).
os.makedirs(MODEL_DIR, exist_ok=True)

# Prepare validation results dictionary
validation_results = {
    "model_type": best_model_name,
    "training_metrics": train_metrics,
    "test_metrics": test_metrics
}

# Save metrics as JSON file
with open(os.path.join(MODEL_DIR, "metrics.json"), "w") as f:
    json.dump(validation_results, f, indent=4)

print("\nModel and validation metrics saved to 'best_model' directory")


Training and Evaluating Models...
--------------------------------------------------

Training Linear Regression...
Linear Regression Metrics:
RMSE: 14.66
MAE: 8.04
R2: 0.592

Training Decision Tree...
Decision Tree Metrics:
RMSE: 14.29
MAE: 8.53
R2: 0.612

Best Performing Model: Decision Tree
Best RMSE: 14.29
Best MAE: 8.53
Best R2: 0.612

Performing Detailed Model Validation...
--------------------------------------------------

Detailed Validation Results:
--------------------------------------------------
Metric			Training	Test
--------------------------------------------------
RMSE           	14.351	14.294
MAE            	8.548	8.526
R2             	0.611	0.612
15min_Accuracy 	0.861	0.861
Severe_Delay_Accuracy	0.801	0.797

Model and validation metrics saved to 'best_model' directory
