Cell 1: Setup Environment

In [11]:
# Install dependencies
!pip install pyspark pandas scikit-learn nltk plotly powerbiclient psutil

# Import libraries
import os
import pandas as pd
import numpy as np
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, avg, to_date, length, hour, month, lag
from pyspark.sql.window import Window
from google.colab import drive
from datetime import datetime
from typing import Tuple, List
import traceback
import psutil

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/content/drive/MyDrive/flight_data/pipeline.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Mount Google Drive
drive.mount('/content/drive')

# Initialize Spark with optimized configs
try:
    spark = SparkSession.builder \
        .appName("FlightDelayPrediction") \
        .config("spark.driver.memory", "6g") \
        .config("spark.executor.memory", "6g") \
        .config("spark.sql.shuffle.partitions", "50") \
        .config("spark.default.parallelism", "50") \
        .config("spark.memory.fraction", "0.6") \
        .getOrCreate()
    logger.info(f"Spark initialized. Version: {spark.version}")
except Exception as e:
    logger.error(f"Failed to initialize Spark: {str(e)}")
    raise

print("Environment setup complete!")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Environment setup complete!


Cell 2: Load and Prepare the Dataset

In [7]:
# Install dependencies
!pip install pyspark pandas scikit-learn nltk plotly powerbiclient

# Import libraries
import os
import time
import logging
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, when, to_date, hour, month, concat_ws, lpad, expr
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

# Create directory if it doesn't exist
log_dir = '/content/drive/MyDrive/flight_data'
os.makedirs(log_dir, exist_ok=True)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(log_dir, 'pipeline.log')),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Initialize Spark
try:
    spark = SparkSession.builder \
        .appName("FlightDelayPrediction") \
        .config("spark.driver.memory", "8g") \
        .config("spark.executor.memory", "8g") \
        .getOrCreate()
    logger.info(f"Spark initialized. Version: {spark.version}")
except Exception as e:
    logger.error(f"Failed to initialize Spark: {str(e)}")
    raise

def load_and_validate_csv(path: str, name: str) -> DataFrame:
    """Load and validate CSV file."""
    try:
        df = spark.read.csv(path, header=True, inferSchema=True)
        row_count = df.count()
        if row_count == 0:
            raise ValueError(f"Empty dataset: {name}")
        logger.info(f"Loaded {name} with {row_count:,} rows")
        return df
    except Exception as e:
        logger.error(f"Error loading {name}: {str(e)}")
        raise

# Load datasets
base_path = '/content/drive/MyDrive/flight_data/'
try:
    flights_df = load_and_validate_csv(base_path + 'flights.csv', 'flights')
    airlines_df = load_and_validate_csv(base_path + 'airlines.csv', 'airlines')
    airports_df = load_and_validate_csv(base_path + 'airports.csv', 'airports')
except Exception as e:
    logger.error(f"Data loading failed: {str(e)}")
    spark.stop()
    raise

def enrich_data(flights: DataFrame, airlines: DataFrame, airports: DataFrame) -> DataFrame:
    """Join flights with airlines and airports."""
    try:
        # Rename for clarity
        flights = flights.withColumnRenamed('DAY_OF_WEEK', 'DayOfWeek') \
                        .withColumnRenamed('AIRLINE', 'AirlineCode') \
                        .withColumnRenamed('ORIGIN_AIRPORT', 'OriginCode') \
                        .withColumnRenamed('DESTINATION_AIRPORT', 'DestCode') \
                        .withColumnRenamed('DEPARTURE_DELAY', 'DepDelay') \
                        .withColumnRenamed('ARRIVAL_DELAY', 'ArrDelay')

        # Join with airlines
        flights = flights.join(airlines.select('IATA_CODE', 'AIRLINE'),
                              flights.AirlineCode == airlines.IATA_CODE, 'left') \
                        .drop('IATA_CODE') \
                        .withColumnRenamed('AIRLINE', 'AirlineName')

        # Join with airports (origin)
        flights = flights.join(airports.select('IATA_CODE', 'AIRPORT', 'CITY', 'LATITUDE', 'LONGITUDE'),
                              flights.OriginCode == airports.IATA_CODE, 'left') \
                        .drop('IATA_CODE') \
                        .withColumnRenamed('AIRPORT', 'OriginAirport') \
                        .withColumnRenamed('CITY', 'OriginCity') \
                        .withColumnRenamed('LATITUDE', 'OriginLat') \
                        .withColumnRenamed('LONGITUDE', 'OriginLon')

        # Join with airports (destination)
        flights = flights.join(airports.select('IATA_CODE', 'AIRPORT', 'CITY'),
                              flights.DestCode == airports.IATA_CODE, 'left') \
                        .drop('IATA_CODE') \
                        .withColumnRenamed('AIRPORT', 'DestAirport') \
                        .withColumnRenamed('CITY', 'DestCity')

        logger.info("Data enriched successfully")
        return flights
    except Exception as e:
        logger.error(f"Data enrichment failed: {str(e)}")
        raise

# Enrich and scale
start_time = time.time()
flights_df = enrich_data(flights_df, airlines_df, airports_df)
original_count = flights_df.count()
flights_scaled = flights_df
for _ in range(1):  # 2x to ~11.6M
    flights_scaled = flights_scaled.union(flights_df)
logger.info(f"Scaled data from {original_count:,} to {flights_scaled.count():,} rows in {time.time() - start_time:.2f} seconds")
df = flights_scaled

def clean_data(df: DataFrame) -> DataFrame:
    """Clean and preprocess data."""
    try:
        # Step 1: Basic cleaning
        df = df.dropDuplicates(['FLIGHT_NUMBER', 'YEAR', 'MONTH', 'DAY'])
        df = df.na.drop(subset=['DepDelay', 'ArrDelay', 'AirlineName', 'OriginCode', 'DestCode'])

        # Step 2: Add IsDelayed flag
        df = df.withColumn('IsDelayed', when(col('ArrDelay') > 15, 1).otherwise(0))

        # Step 3: Create FlightDate from YEAR/MONTH/DAY with padded values
        df = df.withColumn(
            'FlightDate',
            to_date(
                concat_ws('-',
                          col('YEAR'),
                          lpad(col('MONTH'), 2, '0'),
                          lpad(col('DAY'), 2, '0')
                ),
                'yyyy-MM-dd'
            )
        )

        # Step 4: Convert SCHEDULED_DEPARTURE to timestamp and extract hour
        df = df.withColumn(
            'DepHour',
            hour(expr("to_timestamp(lpad(cast(SCHEDULED_DEPARTURE as string), 4, '0'), 'HHmm')"))
        ).withColumn('Month', month('FlightDate'))

        logger.info(f"Cleaned data. Rows after cleaning: {df.count():,}")
        return df
    except Exception as e:
        logger.error(f"Data cleaning failed: {str(e)}")
        raise

# Clean data
df = clean_data(df)

# Save cleaned data
output_path = '/content/drive/MyDrive/flight_data/cleaned_flights.csv'
try:
    df.coalesce(1).write.csv(output_path, header=True, mode='overwrite')
    logger.info("Cleaned data saved as single CSV!")
    df.show(5)
except Exception as e:
    logger.error(f"Error saving CSV: {str(e)}")
    df.coalesce(1).write.parquet('/content/drive/MyDrive/flight_data/cleaned_flights.parquet', mode='overwrite')
    logger.info("Saved as Parquet instead!")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
+----+-----+---+---------+-----------+-------------+-----------+----------+--------+-------------------+--------------+--------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+--------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+--------------------+--------------------+----------+---------+---------+--------------------+--------+---------+----------+-------+
|YEAR|Month|DAY|DayOfWeek|AirlineCode|FLIGHT_NUMBER|TAIL_NUMBER|OriginCode|DestCode|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DepDelay|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ArrDelay|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DE

Cell 3: Feature Engineering

In [8]:
def engineer_features(df: SparkSession) -> SparkSession:
    """Create advanced features."""
    try:
        # Route congestion
        route_counts = df.groupBy('OriginCode', 'DestCode').count() \
                        .withColumnRenamed('count', 'RouteFrequency')
        df = df.join(route_counts, ['OriginCode', 'DestCode'], 'left')

        # Previous flight delay (lag feature)
        window = Window.partitionBy('AirlineName').orderBy('FlightDate')
        df = df.withColumn('PrevDelay', lag('ArrDelay').over(window))
        df = df.na.fill({'PrevDelay': 0})

        # Time-based features
        df = df.withColumn('IsWeekend', when(col('DayOfWeek').isin([6, 7]), 1).otherwise(0))
        df = df.withColumn('IsPeakHour', when(col('DepHour').isin([7, 8, 17, 18]), 1).otherwise(0))

        logger.info("Feature engineering completed")
        return df
    except Exception as e:
        logger.error(f"Feature engineering failed: {str(e)}")
        raise

df = engineer_features(df)
logger.info("Features added. Sample:")
display(df.select('FlightDate', 'AirlineName', 'RouteFrequency', 'PrevDelay', 'IsWeekend', 'IsPeakHour').show(5))

+----------+--------------------+--------------+---------+---------+----------+
|FlightDate|         AirlineName|RouteFrequency|PrevDelay|IsWeekend|IsPeakHour|
+----------+--------------------+--------------+---------+---------+----------+
|2015-01-01|American Eagle Ai...|          1203|        0|        0|         0|
|2015-01-01|American Eagle Ai...|           461|       -2|        0|         0|
|2015-01-01|American Eagle Ai...|           461|       50|        0|         1|
|2015-01-01|American Eagle Ai...|           461|       41|        0|         0|
|2015-01-01|American Eagle Ai...|          1053|       13|        0|         0|
+----------+--------------------+--------------+---------+---------+----------+
only showing top 5 rows



None

Cell 4: Exploratory Data Analysis (EDA)

In [9]:
def run_eda(df: SparkSession) -> None:
    """Perform comprehensive EDA."""
    try:
        df.createOrReplaceTempView('flights')

        # Delay stats by airline
        airline_stats = spark.sql("""
            SELECT AirlineName,
                   COUNT(*) as TotalFlights,
                   AVG(IsDelayed) as DelayRate,
                   AVG(ArrDelay) as AvgDelayMinutes,
                   AVG(DISTANCE) as AvgDistance
            FROM flights
            GROUP BY AirlineName
            ORDER BY DelayRate DESC
        """)
        logger.info("Delay Stats by Airline:")
        display(airline_stats.show(10))

        # Delay by month
        monthly_stats = spark.sql("""
            SELECT Month,
                   COUNT(*) as TotalFlights,
                   AVG(IsDelayed) as DelayRate
            FROM flights
            GROUP BY Month
            ORDER BY Month
        """)
        logger.info("Delay Stats by Month:")
        display(monthly_stats.show(12))

        # High-risk routes
        route_stats = spark.sql("""
            SELECT OriginAirport, DestAirport,
                   COUNT(*) as TotalFlights,
                   AVG(IsDelayed) as DelayRate
            FROM flights
            GROUP BY OriginAirport, DestAirport
            HAVING TotalFlights > 100
            ORDER BY DelayRate DESC
            LIMIT 10
        """)
        logger.info("High-Risk Routes:")
        display(route_stats.show())

        # Correlation analysis
        corr_cols = ['DepDelay', 'ArrDelay', 'DISTANCE', 'RouteFrequency', 'PrevDelay']
        corr_matrix = []
        for c1 in corr_cols:
            row = []
            for c2 in corr_cols:
                corr = df.select(c1, c2).na.drop().stat.corr(c1, c2)
                row.append(corr)
            corr_matrix.append(row)
        corr_df = pd.DataFrame(corr_matrix, columns=corr_cols, index=corr_cols)
        logger.info("Correlation Matrix:")
        display(corr_df)
    except Exception as e:
        logger.error(f"EDA failed: {str(e)}")
        raise

run_eda(df)

+--------------------+------------+-------------------+-------------------+------------------+
|         AirlineName|TotalFlights|          DelayRate|    AvgDelayMinutes|       AvgDistance|
+--------------------+------------+-------------------+-------------------+------------------+
|    Spirit Air Lines|       21758| 0.2204246713852376|  8.484833164812942| 968.2922143579373|
|American Eagle Ai...|      159245|0.20313353637476844|  6.089993406386386| 432.2181983735753|
|Frontier Airlines...|       18649| 0.1906268432623733|  8.184835648024023|  966.588396160652|
|Atlantic Southeas...|      326184| 0.1777677629804037|  5.595323498393545| 475.7029345400142|
|Skywest Airlines ...|      345777| 0.1732301454405585|  5.592419391688863|510.32096408957216|
|      Virgin America|       11453| 0.1441543700340522| 1.2172356587793591|1382.9048284292326|
|     US Airways Inc.|       45840|0.13863438045375218| 0.6334424083769633|  879.129537521815|
|     JetBlue Airways|       60648|0.1353548344545

None

+-----+------------+-------------------+
|Month|TotalFlights|          DelayRate|
+-----+------------+-------------------+
|    1|      158864| 0.1847051566119448|
|    2|      141745|0.20449398567850718|
|    3|      163779| 0.1574621899022463|
|    4|      158156|0.13003616682263083|
|    5|      159733| 0.1351067093211797|
|    6|      157123|0.17144530081528483|
|    7|      164005| 0.1480869485686412|
|    8|      161855|0.13424361311050015|
|    9|      154128|0.09917081905948302|
|   10|      160870|0.09998135140175297|
|   11|      153889|0.12072337853907687|
|   12|      161036|0.16707444298169352|
+-----+------------+-------------------+



None

+--------------------+--------------------+------------+-------------------+
|       OriginAirport|         DestAirport|TotalFlights|          DelayRate|
+--------------------+--------------------+------------+-------------------+
|LaGuardia Airport...|Louisville Intern...|         224| 0.5267857142857143|
|Dallas/Fort Worth...|Aspen-Pitkin Coun...|         215| 0.4186046511627907|
|LaGuardia Airport...|Washington Dulles...|         172| 0.4069767441860465|
|Chicago O'Hare In...|Boise Airport (Bo...|         229| 0.4017467248908297|
|Aspen-Pitkin Coun...|Dallas/Fort Worth...|         107| 0.3925233644859813|
|Trenton Mercer Ai...|Orlando Internati...|         105| 0.3904761904761905|
|Philadelphia Inte...|Myrtle Beach Inte...|         131| 0.3816793893129771|
|Chicago O'Hare In...|Aspen-Pitkin Coun...|         395| 0.3721518987341772|
|LaGuardia Airport...|Birmingham-Shuttl...|         370| 0.3675675675675676|
|Newark Liberty In...|McGhee Tyson Airport|         211|0.36492890995260663|

None

Unnamed: 0,DepDelay,ArrDelay,DISTANCE,RouteFrequency,PrevDelay
DepDelay,1.0,0.942628,0.004176,0.027492,0.061385
ArrDelay,0.942628,1.0,-0.042204,0.033276,0.076061
DISTANCE,0.004176,-0.042204,1.0,-0.134862,-0.033173
RouteFrequency,0.027492,0.033276,-0.134695,1.0,0.015883
PrevDelay,0.061385,0.076061,-0.033173,0.015883,1.0


Cell 5: Machine Learning Pipeline

In [15]:
import logging
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, when
from pyspark.ml.functions import vector_to_array
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from typing import Tuple, List
import time
import psutil
import gc
import numpy as np
from sklearn.metrics import roc_curve

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/content/drive/MyDrive/flight_data/pipeline.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def monitor_resources() -> None:
    """Log CPU and memory usage."""
    cpu_percent = psutil.cpu_percent()
    memory = psutil.virtual_memory()
    logger.info(f"CPU Usage: {cpu_percent}% | Memory Usage: {memory.percent}% | Available: {memory.available / (1024**3):.2f} GB")

def build_ml_pipeline() -> Tuple[Pipeline, ParamGridBuilder, List[str]]:
    """Create optimized ML pipeline, parameter grid, and feature list."""
    try:
        # Index categorical variables
        indexer = StringIndexer(
            inputCols=['AirlineName', 'OriginCode', 'DestCode'],
            outputCols=['AirlineIdx', 'OriginIdx', 'DestIdx'],
            handleInvalid='keep'
        )

        # Assemble features
        feature_cols = ['DayOfWeek', 'DepHour', 'Month', 'DISTANCE', 'RouteFrequency',
                        'PrevDelay', 'IsWeekend', 'IsPeakHour', 'AirlineIdx', 'OriginIdx', 'DestIdx']
        assembler = VectorAssembler(
            inputCols=feature_cols,
            outputCol='raw_features',
            handleInvalid='skip'
        )

        # Scale features
        scaler = StandardScaler(inputCol='raw_features', outputCol='features')

        # Models (will add weightCol later)
        rf = RandomForestClassifier(labelCol='IsDelayed', featuresCol='features', seed=42)
        lr = LogisticRegression(labelCol='IsDelayed', featuresCol='features')

        # Pipeline with RF
        pipeline = Pipeline(stages=[indexer, assembler, scaler, rf])

        # Simplified parameter grid
        param_grid = ParamGridBuilder() \
            .addGrid(rf.numTrees, [50]) \
            .addGrid(rf.maxDepth, [5]) \
            .build()

        logger.info("Optimized ML pipeline configured with Random Forest")
        monitor_resources()
        return pipeline, param_grid, feature_cols
    except Exception as e:
        logger.error(f"ML pipeline setup failed: {str(e)}")
        raise

def train_lr_model(train: SparkSession, test: SparkSession, feature_cols: List[str], weight_col: str = 'weight') -> dict:
    """Train and evaluate Logistic Regression separately."""
    try:
        indexer = StringIndexer(
            inputCols=['AirlineName', 'OriginCode', 'DestCode'],
            outputCols=['AirlineIdx', 'OriginIdx', 'DestIdx'],
            handleInvalid='keep'
        )
        assembler = VectorAssembler(
            inputCols=feature_cols,
            outputCol='raw_features',
            handleInvalid='skip'
        )
        scaler = StandardScaler(inputCol='raw_features', outputCol='features')
        lr = LogisticRegression(labelCol='IsDelayed', featuresCol='features', weightCol=weight_col, regParam=0.01)
        pipeline = Pipeline(stages=[indexer, assembler, scaler, lr])

        start_time = time.time()
        model = pipeline.fit(train)
        logger.info(f"Logistic Regression trained in {(time.time() - start_time)/60:.2f} minutes")

        predictions = model.transform(test)
        evaluator = BinaryClassificationEvaluator(labelCol='IsDelayed', metricName='areaUnderROC')
        auc = evaluator.evaluate(predictions)
        multi_evaluator = MulticlassClassificationEvaluator(
            labelCol='IsDelayed',
            predictionCol='prediction',
            metricName='f1'
        )
        f1 = multi_evaluator.evaluate(predictions)
        precision = multi_evaluator.setMetricName('precisionByLabel').evaluate(predictions)
        recall = multi_evaluator.setMetricName('recallByLabel').evaluate(predictions)
        logger.info(f"Logistic Regression - AUC: {auc:.3f}, F1: {f1:.3f}, Precision: {precision:.3f}, Recall: {recall:.3f}")

        return {'predictions': predictions, 'auc': auc, 'f1': f1, 'precision': precision, 'recall': recall}
    except Exception as e:
        logger.error(f"Logistic Regression training failed: {str(e)}")
        raise

def train_and_evaluate(df: SparkSession, sample_fraction: float = 0.1) -> SparkSession:
    """Train and evaluate ML models with optimizations and class imbalance handling."""
    try:
        # Sample data
        logger.info(f"Sampling {sample_fraction*100}% of data for training")
        df_sample = df.sample(fraction=sample_fraction, seed=42)
        df_sample.cache()
        total_rows = df_sample.count()
        logger.info(f"Sampled rows: {total_rows:,}")

        # Compute class weights for imbalance
        class_counts = df_sample.groupBy('IsDelayed').count().collect()
        total = sum(row['count'] for row in class_counts)
        weights = {row['IsDelayed']: total / (len(class_counts) * row['count']) for row in class_counts}
        df_sample = df_sample.withColumn(
            'weight',
            when(col('IsDelayed') == 1, weights[1]).otherwise(weights[0])
        )
        logger.info(f"Class weights: {weights}")

        # Split data
        train, test = df_sample.randomSplit([0.8, 0.2], seed=42)
        train_rows, test_rows = train.count(), test.count()
        logger.info(f"Train rows: {train_rows:,}, Test rows: {test_rows:,}")

        # Checkpoint
        spark.sparkContext.setCheckpointDir('/content/drive/MyDrive/flight_data/checkpoints')
        train.checkpoint()
        test.checkpoint()
        logger.info("Data checkpointed to disk")

        # Train Random Forest
        pipeline, param_grid, feature_cols = build_ml_pipeline()
        # Update RF to use weightCol
        rf = RandomForestClassifier(labelCol='IsDelayed', featuresCol='features', weightCol='weight', seed=42)
        pipeline.setStages(pipeline.getStages()[:-1] + [rf])

        evaluator = BinaryClassificationEvaluator(labelCol='IsDelayed', metricName='areaUnderROC')
        cv = CrossValidator(
            estimator=pipeline,
            estimatorParamMaps=param_grid,
            evaluator=evaluator,
            numFolds=2,
            seed=42,
            parallelism=2
        )

        start_time = time.time()
        monitor_resources()
        rf_model = cv.fit(train)
        rf_training_time = (time.time() - start_time) / 60
        logger.info(f"Random Forest training completed in {rf_training_time:.2f} minutes")

        rf_predictions = rf_model.transform(test)
        rf_auc = evaluator.evaluate(rf_predictions)
        multi_evaluator = MulticlassClassificationEvaluator(
            labelCol='IsDelayed',
            predictionCol='prediction',
            metricName='f1'
        )
        rf_f1 = multi_evaluator.evaluate(rf_predictions)
        rf_precision = multi_evaluator.setMetricName('precisionByLabel').evaluate(rf_predictions)
        rf_recall = multi_evaluator.setMetricName('recallByLabel').evaluate(rf_predictions)
        logger.info(f"Random Forest - AUC: {rf_auc:.3f}, F1: {rf_f1:.3f}, Precision: {rf_precision:.3f}, Recall: {rf_recall:.3f}")

        # Train LR
        lr_results = train_lr_model(train, test, feature_cols, weight_col='weight')
        lr_predictions, lr_auc, lr_f1, lr_precision, lr_recall = (
            lr_results['predictions'], lr_results['auc'], lr_results['f1'],
            lr_results['precision'], lr_results['recall']
        )

        # Model comparison
        model_comparison = pd.DataFrame({
            'Model': ['Random Forest', 'Logistic Regression'],
            'AUC': [rf_auc, lr_auc],
            'F1 Score': [rf_f1, lr_f1],
            'Precision': [rf_precision, lr_precision],
            'Recall': [rf_recall, lr_recall]
        })
        logger.info("Model Comparison:")
        display(model_comparison)

        # Select best model
        best_predictions = rf_predictions if rf_auc >= lr_auc else lr_predictions
        logger.info(f"Selected {'Random Forest' if rf_auc >= lr_auc else 'Logistic Regression'} as best model")

        # Feature importance (RF)
        try:
            if rf_auc >= lr_auc:
                rf_stage = rf_model.bestModel.stages[-1]
                importance = pd.DataFrame({
                    'Feature': feature_cols,
                    'Importance': rf_stage.featureImportances.toArray()
                }).sort_values('Importance', ascending=False)
                logger.info("Feature Importance:")
                display(importance)

                fig_importance = px.bar(
                    importance,
                    x='Feature',
                    y='Importance',
                    title='Feature Importance in Delay Prediction',
                    labels={'Importance': 'Importance Score'}
                )
                fig_importance.write_html('/content/drive/MyDrive/flight_data/feature_importance.html')
                fig_importance.show()
        except Exception as e:
            logger.warning(f"Feature importance failed: {str(e)}. Continuing with predictions.")

        # Confusion matrix
        try:
            confusion = best_predictions.groupBy('IsDelayed', 'prediction').count().toPandas()
            logger.info("Confusion Matrix:")
            display(confusion.pivot(index='IsDelayed', columns='prediction', values='count').fillna(0))
        except Exception as e:
            logger.warning(f"Confusion matrix failed: {str(e)}. Continuing with predictions.")

        # ROC curve
        try:
            probs = best_predictions.select(
                vector_to_array(col('probability'))[1].cast('double').alias('delay_probability'),
                'IsDelayed'
            ).toPandas()
            fpr, tpr, _ = roc_curve(probs['IsDelayed'], probs['delay_probability'])
            roc_df = pd.DataFrame({'FPR': fpr, 'TPR': tpr})
            fig_roc = px.line(
                roc_df,
                x='FPR',
                y='TPR',
                title='ROC Curve',
                labels={'FPR': 'False Positive Rate', 'TPR': 'True Positive Rate'}
            )
            fig_roc.add_trace(go.Scatter(x=[0, 1], y=[0, 1], mode='lines', line=dict(dash='dash'), name='Random'))
            fig_roc.write_html('/content/drive/MyDrive/flight_data/roc_curve.html')
            fig_roc.show()
        except Exception as e:
            logger.warning(f"ROC curve failed: {str(e)}. Continuing with predictions.")

        # Full dataset predictions
        logger.info("Applying best model to full dataset")
        try:
            full_predictions = rf_model.transform(df) if rf_auc >= lr_auc else lr_results['predictions'].union(rf_model.transform(df))
            full_predictions.cache()
        except Exception as e:
            logger.error(f"Full dataset prediction failed: {str(e)}")
            raise
            logger.error(f"Full dataset prediction failed: {str(e)}")
            raise

        # Log schema before saving
        logger.info("Predictions schema:")
        full_predictions.printSchema()

        # Save predictions
        output_path = '/content/drive/MyDrive/flight_data/predictions.csv'
        try:
            full_predictions.select(
                'FlightDate', 'AirlineName', 'OriginAirport', 'DestAirport',
                'IsDelayed', 'prediction',
                vector_to_array(col('probability'))[1].cast('double').alias('delay_probability')
            ).coalesce(1).write.csv(output_path, header=True, mode='overwrite')
            logger.info("Full predictions saved as single CSV!")
            display(full_predictions.select('FlightDate', 'AirlineName', 'IsDelayed', 'prediction').show(5))
        except Exception as e:
            logger.error(f"Saving predictions failed: {str(e)}")
            full_predictions.select(
                'FlightDate', 'AirlineName', 'OriginAirport', 'DestAirport',
                'IsDelayed', 'prediction',
                vector_to_array(col('probability'))[1].cast('double').alias('delay_probability')
            ).coalesce(1).write.parquet('/content/drive/MyDrive/flight_data/predictions.parquet', mode='overwrite')
            logger.info("Saved predictions as Parquet instead!")

        # Clean up
        df_sample.unpersist()
        full_predictions.unpersist()
        gc.collect()
        monitor_resources()

        return full_predictions
    except Exception as e:
        logger.error(f"ML training failed: {str(e)}")
        raise
    finally:
        df_sample.unpersist()

# Run pipeline
predictions = train_and_evaluate(df, sample_fraction=0.1)

Unnamed: 0,Model,AUC,F1 Score,Precision,Recall
0,Random Forest,0.659054,0.638718,0.907475,0.562008
1,Logistic Regression,0.638925,0.6881,0.897375,0.647638


Unnamed: 0,Feature,Importance
1,DepHour,0.620677
8,AirlineIdx,0.112266
5,PrevDelay,0.096751
2,Month,0.07708
9,OriginIdx,0.035594
10,DestIdx,0.026978
4,RouteFrequency,0.010026
7,IsPeakHour,0.007356
0,DayOfWeek,0.006269
6,IsWeekend,0.005085


prediction,0.0,1.0
IsDelayed,Unnamed: 1_level_1,Unnamed: 2_level_1
0,18272,14240
1,1863,3671


root
 |-- OriginCode: string (nullable = true)
 |-- DestCode: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- AirlineCode: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DIVERTE

None

Cell 6: Business Insights and Recommendations

In [17]:
def generate_insights(predictions: SparkSession) -> None:
    """Extract business insights."""
    try:
        # Delay impact by airline
        from pyspark.ml.functions import vector_to_array  # Import required for vector access
        delay_impact = predictions.groupBy('AirlineName').agg(
            count(when(col('IsDelayed') == 1, True)).alias('ActualDelays'),
            count(when(col('prediction') == 1, True)).alias('PredictedDelays'),
            avg(vector_to_array(col('probability'))[1].cast('double')).alias('AvgDelayProb') # Access the delay probability element
        ).withColumn('Accuracy', col('ActualDelays') / col('PredictedDelays'))
        logger.info("Delay Impact by Airline:")
        display(delay_impact.show())

        # High-risk periods
        risk_periods = predictions.groupBy('Month', 'DepHour').agg(
            avg('IsDelayed').alias('DelayRate')
        ).orderBy('DelayRate', ascending=False).limit(10)
        logger.info("High-Risk Periods:")
        display(risk_periods.show())

        # Cost estimation
        total_flights = predictions.count()
        delay_rate = predictions.filter(col('IsDelayed') == 1).count() / total_flights
        cost_per_delay = 5000  # $5,000 per delay
        potential_savings = total_flights * delay_rate * 0.2 * cost_per_delay  # 20% reduction
        logger.info(f"Delay rate: {delay_rate:.1%}")
        logger.info(f"Potential savings with 20% reduction: ${potential_savings:,.2f}")

        # Recommendations
        recommendations = [
            "Optimize scheduling during high-risk periods (peak hours, specific months).",
            "Target high-delay airlines with operational improvements.",
            "Use predictive model to preemptively reroute flights on high-risk routes."
        ]
        logger.info("Recommendations:")
        for rec in recommendations:
            print(rec)
    except Exception as e:
        logger.error(f"Insights generation failed: {str(e)}")
        raise

generate_insights(predictions)

+--------------------+------------+---------------+-------------------+-------------------+
|         AirlineName|ActualDelays|PredictedDelays|       AvgDelayProb|           Accuracy|
+--------------------+------------+---------------+-------------------+-------------------+
|Alaska Airlines Inc.|        3405|           8557| 0.4399967204109929| 0.3979198317167231|
|American Eagle Ai...|       32367|          98359| 0.5202838273790709| 0.3290700393456623|
|Atlantic Southeas...|       58021|         228330| 0.5319472198663772| 0.2541102789821749|
|Frontier Airlines...|        3564|           5978| 0.4743262050865678| 0.5961860153897625|
|Skywest Airlines ...|       59932|         239430| 0.5303084569679315| 0.2503111556613624|
|American Airlines...|       25406|          57487| 0.4546036368150073| 0.4419433958982031|
|Delta Air Lines Inc.|       30648|          98405| 0.4470371232062101| 0.3114475890452721|
|     JetBlue Airways|        8247|          17909|0.45011068164005474|0.4604947

None

+-----+-------+-------------------+
|Month|DepHour|          DelayRate|
+-----+-------+-------------------+
|    2|      3|0.34782608695652173|
|    6|     18| 0.3427299703264095|
|    6|     19| 0.3403351427897097|
|    2|     19| 0.3344239308249591|
|   10|      4| 0.3333333333333333|
|    9|      4| 0.3333333333333333|
|    2|     18| 0.3269677419354839|
|   12|      4| 0.3230769230769231|
|    6|     20|0.32254368375823544|
|    6|     17| 0.3178877426123448|
+-----+-------+-------------------+



None

Optimize scheduling during high-risk periods (peak hours, specific months).
Target high-delay airlines with operational improvements.
Use predictive model to preemptively reroute flights on high-risk routes.


Cell 7: Advanced Visualization

In [18]:
def create_visualizations(predictions: SparkSession) -> None:
    """Create advanced Plotly visualizations."""
    try:
        # Delay rate by airline
        airline_delays = predictions.groupBy('AirlineName').agg(
            avg('IsDelayed').alias('DelayRate')
        ).toPandas()
        fig1 = px.bar(airline_delays, x='AirlineName', y='DelayRate',
                     title='Flight Delay Rate by Airline',
                     labels={'DelayRate': 'Delay Rate (%)'})
        fig1.write_html('/content/drive/MyDrive/flight_data/airline_delays.html')

        # Delay probability distribution
        prob_dist = predictions.select('probability').toPandas()
        prob_dist['DelayProb'] = prob_dist['probability'].apply(lambda x: x[1])
        fig2 = px.histogram(prob_dist, x='DelayProb', nbins=50,
                           title='Distribution of Delay Probabilities',
                           labels={'DelayProb': 'Probability of Delay'})
        fig2.write_html('/content/drive/MyDrive/flight_data/prob_dist.html')

        # Delay trends by month
        monthly_delays = predictions.groupBy('Month').agg(
            avg('IsDelayed').alias('DelayRate')
        ).toPandas()
        fig3 = px.line(monthly_delays, x='Month', y='DelayRate',
                      title='Delay Rate by Month',
                      labels={'DelayRate': 'Delay Rate (%)'})
        fig3.write_html('/content/drive/MyDrive/flight_data/monthly_delays.html')

        logger.info("Visualizations saved as HTML")
        fig1.show()
    except Exception as e:
        logger.error(f"Visualization failed: {str(e)}")
        raise

create_visualizations(predictions)

Cell 8: Stop Spark

In [19]:
spark.stop()
logger.info("Spark session stopped.")