# Flight Delay Prediction and Analysis Using PySpark and AWS  
### [Dataset Source: US DOT Flight Delays on Kaggle](https://www.kaggle.com/datasets/usdot/flight-delays/data)

---

## About the Dataset

### **Context**
The U.S. Department of Transportation's (DOT) Bureau of Transportation Statistics tracks the on-time performance of domestic flights operated by large air carriers. This dataset specifically captures information on flight delays, cancellations, and diversions from the year 2015. Summary information regarding on-time performance is regularly published in the DOT's monthly Air Travel Consumer Report.

### **Acknowledgements**
The flight delay and cancellation data was collected and published by the DOT's Bureau of Transportation Statistics.



---

### **Installation of Required Python Libraries**

This code cell installs specific Python libraries into the Spark environment at runtime, ensuring all necessary dependencies are available for subsequent data processing and analysis tasks.

**Installed Packages:**

| Package              | Version    | Purpose                                                  |
|----------------------|------------|----------------------------------------------------------|
| `python-dateutil`    | `2.8.2`    | Utilities for parsing, manipulating, and formatting dates|
| `numpy`              | Latest     | Numerical operations and array manipulations             |
| `pandas`             | Latest     | Data analysis, manipulation, and DataFrame management    |
| `matplotlib`         | Latest     | Data visualization and plotting                          |
| `boto3`              | Latest     | AWS SDK for Python, used to interact with AWS services   |
| `fsspec`             | Latest     | File system specification interface for Python           |
| `s3fs`               | Latest     | Pythonic file system interface to Amazon S3              |

**Key Points:**

- These libraries extend Spark’s capabilities, enabling advanced data processing, numerical computations, visualization, and seamless integration with cloud storage services (AWS S3).
- Explicitly specifying the version (`2.8.2`) for `python-dateutil` ensures compatibility and reproducibility.
- Using `sc.install_pypi_package()` allows dynamic package installation at runtime within a PySpark environment, ensuring these libraries are available across all Spark executors.

---


In [1]:
sc.install_pypi_package("python-dateutil==2.8.2")
sc.install_pypi_package("numpy")
sc.install_pypi_package("pandas")
sc.install_pypi_package("matplotlib")
sc.install_pypi_package("boto3")
sc.install_pypi_package("fsspec")
sc.install_pypi_package("s3fs")

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1741886625437_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting python-dateutil==2.8.2
  Downloading python_dateutil-2.8.2-py2.py3-none-any.whl (247 kB)
Installing collected packages: python-dateutil
  Attempting uninstall: python-dateutil
    Found existing installation: python-dateutil 2.8.1
    Not uninstalling python-dateutil at /usr/lib/python3.9/site-packages, outside environment /mnt/yarn/usercache/livy/appcache/application_1741886625437_0002/container_1741886625437_0002_01_000001/tmp/spark-bf5122bd-a3ff-4b6d-9e23-3282eb84fdfc
    Can't uninstall 'python-dateutil'. No files were found to uninstall.
Successfully installed python-dateutil-2.8.2

Collecting numpy
  Downloading numpy-2.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (19.5 MB)
Installing collected packages: numpy
Successfully installed numpy-2.0.2

Collecting pandas
  Downloading pandas-2.2.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.1 MB)
Collecting tzdata>=2022.7
  Downloading tzdata-2025.1-py2.py3-none-any.whl (346 kB)
Installing coll


---

### **Importing Necessary Libraries for Data Analysis and Modeling**

This cell imports essential libraries and functions required for data manipulation, preprocessing, visualization, and machine learning model implementation using PySpark and Python.

**Imported Libraries and Functions:**

| Library/Function                                | Purpose / Usage                                            |
|-------------------------------------------------|------------------------------------------------------------|
| `pyspark.sql.SparkSession`                      | Entry point to interact with Spark DataFrames and SQL operations |
| `pyspark.sql.functions as F`                    | Provides built-in PySpark DataFrame functions for transformations |
| `pyspark.sql.functions.when`                    | Conditional expressions (similar to SQL CASE WHEN)         |
| `pyspark.sql.functions.col, sum`                | Column-based DataFrame operations and aggregation functions |
| `matplotlib.pyplot as plt`                      | Data visualization and plotting                            |
| `pandas as pd`                                  | Data manipulation, analysis, and conversion between PySpark and Pandas DataFrames |
| `boto3`                                         | AWS SDK for Python; interacting with AWS services (e.g., S3)|
| `time`                                          | Tracking execution times, timestamps, or delays            |
| `pyspark.sql.types.IntegerType`                 | Explicitly defining or casting columns to integer data type|
| `pyspark.ml.feature.StringIndexer`              | Converts categorical string columns into numeric indices   |
| `pyspark.ml.feature.OneHotEncoder`              | Encodes categorical indices into one-hot encoded vectors   |
| `pyspark.ml.feature.VectorAssembler`            | Combines multiple feature columns into a single vector     |
| `pyspark.ml.Pipeline`                           | Combines multiple stages of machine learning workflow into a single, streamlined pipeline |
| `pyspark.ml.classification.LogisticRegression`  | Implements Logistic Regression classifier for predictive modeling tasks |

**Key Points:**

- **Data Manipulation & SQL Functions**:  
  Imports PySpark's built-in functions to facilitate data manipulation (e.g., aggregations, transformations, conditional logic).

- **Visualization & Analysis**:  
  Incorporates `matplotlib` and `pandas` for exploratory data analysis (EDA), visualization, and converting Spark DataFrames to Pandas DataFrames for convenience in visualizing and exploring data.

- **AWS Integration (`boto3`)**:  
  Enables direct interaction with AWS cloud storage services such as S3, allowing for streamlined data retrieval/storage.

- **Machine Learning & Preprocessing**:  
  Prepares categorical data for machine learning by converting string categories to numerical indices (`StringIndexer`) and encoding them as vectors (`OneHotEncoder`), followed by assembling all features into a single vector suitable for Spark ML models (`VectorAssembler`). Utilizes Spark ML’s pipeline functionality to streamline this process.

- **Logistic Regression**:  
  Imports logistic regression, commonly used for classification tasks (binary or multiclass), as the machine learning algorithm for model training and predictions.

---

In [2]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import when
from pyspark.sql.functions import col, sum
import matplotlib.pyplot as plt
import pandas as pd
import boto3, time
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

---

### **Loading Data from Amazon S3 into Spark DataFrames**

This cell reads three datasets directly from Amazon S3 into Spark DataFrames:

- **`flights_df`**: Contains data related to flights.
- **`airports_df`**: Contains information about airports.
- **`airlines_df`**: Contains details about airlines.

The datasets are loaded with:

- Headers automatically detected (`header=true`).
- Data types inferred automatically (`inferSchema=true`).

We also have to make sure the provided S3 paths correctly point to the datasets.

---

In [4]:
# Read raw data from S3
# (Make sure these S3 paths point to your dataset locations.)
flights_df = spark.read.option("header", "true").option("inferSchema", "true") \
    .csv("s3://final-csc555/athena/flights.csv")

airports_df = spark.read.option("header", "true").option("inferSchema", "true") \
    .csv("s3://final-csc555/athena/airports.csv")

airlines_df = spark.read.option("header", "true").option("inferSchema", "true") \
    .csv("s3://final-csc555/athena/airlines.csv")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
flights_df.show(5)
airports_df.show(5)
airlines_df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

---

### **Checking for Missing Values (Null Counts)**

This cell defines a helper function, `count_nulls(df)`, to count the number of missing values (`null`) in each column of a Spark DataFrame. 

It then applies this function to each loaded DataFrame (`flights_df`, `airports_df`, and `airlines_df`) and displays the counts of missing values for quick data quality assessment.

- **Purpose**: Quickly identify columns with missing data to guide data cleaning and preprocessing steps.

---

In [6]:
#from pyspark.sql.functions import col, sum

# Define a helper function to count nulls for each column in a DataFrame
def count_nulls(df):
    return df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Count nulls for each DataFrame and display the results
print("Null counts in flights_df:")
count_nulls(flights_df).show()

print("Null counts in airports_df:")
count_nulls(airports_df).show()

print("Null counts in airlines_df:")
count_nulls(airlines_df).show()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Null counts in flights_df:
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+-----------

#### Examine the Percentage of Nulls

In [8]:
total_rows = flights_df.count()
null_percent_df = count_nulls(flights_df).withColumn(
    "percentage", 
    F.expr(f"({list(flights_df.columns)[0]} / {total_rows}) * 100")
)
# Alternatively, loop through columns:
for c in flights_df.columns:
    null_count = flights_df.filter(F.col(c).isNull()).count()
    print(f"{c}: {null_count} nulls, {(null_count / total_rows) * 100:.2f}%")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

YEAR: 0 nulls, 0.00%
MONTH: 0 nulls, 0.00%
DAY: 0 nulls, 0.00%
DAY_OF_WEEK: 0 nulls, 0.00%
AIRLINE: 0 nulls, 0.00%
FLIGHT_NUMBER: 0 nulls, 0.00%
TAIL_NUMBER: 14721 nulls, 0.25%
ORIGIN_AIRPORT: 0 nulls, 0.00%
DESTINATION_AIRPORT: 0 nulls, 0.00%
SCHEDULED_DEPARTURE: 0 nulls, 0.00%
DEPARTURE_TIME: 86153 nulls, 1.48%
DEPARTURE_DELAY: 86153 nulls, 1.48%
TAXI_OUT: 89047 nulls, 1.53%
WHEELS_OFF: 89047 nulls, 1.53%
SCHEDULED_TIME: 6 nulls, 0.00%
ELAPSED_TIME: 105071 nulls, 1.81%
AIR_TIME: 105071 nulls, 1.81%
DISTANCE: 0 nulls, 0.00%
WHEELS_ON: 92513 nulls, 1.59%
TAXI_IN: 92513 nulls, 1.59%
SCHEDULED_ARRIVAL: 0 nulls, 0.00%
ARRIVAL_TIME: 92513 nulls, 1.59%
ARRIVAL_DELAY: 105071 nulls, 1.81%
DIVERTED: 0 nulls, 0.00%
CANCELLED: 0 nulls, 0.00%
CANCELLATION_REASON: 5729195 nulls, 98.46%
AIR_SYSTEM_DELAY: 4755640 nulls, 81.72%
SECURITY_DELAY: 4755640 nulls, 81.72%
AIRLINE_DELAY: 4755640 nulls, 81.72%
LATE_AIRCRAFT_DELAY: 4755640 nulls, 81.72%
WEATHER_DELAY: 4755640 nulls, 81.72%

---
### Analysis of Missing Values and Imputation Strategy

The analysis below summarizes the percentage of missing (`null`) values in each column of the `flights_df` DataFrame:

**Key Insights:**

- **No missing values (0%)**:  
  `YEAR`, `MONTH`, `DAY`, `DAY_OF_WEEK`, `AIRLINE`, `FLIGHT_NUMBER`, `ORIGIN_AIRPORT`, `DESTINATION_AIRPORT`, `SCHEDULED_DEPARTURE`, `DISTANCE`, `SCHEDULED_ARRIVAL`, `DIVERTED`, `CANCELLED`.

- **Moderate missing values (approx. 1-2%)**:  
  `TAIL_NUMBER (0.25%)`, `DEPARTURE_TIME (1.48%)`, `DEPARTURE_DELAY (1.48%)`, `TAXI_OUT (1.53%)`, `WHEELS_OFF (1.53%)`, `ELAPSED_TIME (1.81%)`, `AIR_TIME (1.81%)`, `WHEELS_ON (1.59%)`, `TAXI_IN (1.59%)`, `ARRIVAL_TIME (1.59%)`, `ARRIVAL_DELAY (1.81%)`.

- **Highly missing columns**:  
  - `CANCELLATION_REASON (98.46%)`:  
    Expected, since this is populated only for canceled flights.  
    **Imputation:** Replace nulls with "Not Cancelled".
  
  - Delay-related columns (`AIR_SYSTEM_DELAY`, `SECURITY_DELAY`, `AIRLINE_DELAY`, `LATE_AIRCRAFT_DELAY`, `WEATHER_DELAY`) all at **81.72%** missing:  
    These are populated only if delays occur.  
    **Imputation:** Replace nulls with `0` (indicating no delay).

**Actionable Steps:**

- Replace nulls in delay-related columns with `0`.
- Replace nulls in `CANCELLATION_REASON` with `"Not Cancelled"`.
- Re-check null counts afterward to confirm successful imputation.
---

In [7]:
from pyspark.sql import functions as F

# List of delay-related columns to fill with 0
delay_columns = ["AIR_SYSTEM_DELAY", "SECURITY_DELAY", "AIRLINE_DELAY", "LATE_AIRCRAFT_DELAY", "WEATHER_DELAY"]

# Fill nulls in delay columns with 0
flights_df_cleaned = flights_df.fillna({col: 0 for col in delay_columns})

# Fill nulls in CANCELLATION_REASON with "Not Cancelled"
flights_df_cleaned = flights_df_cleaned.fillna({"CANCELLATION_REASON": "Not Cancelled"})

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Define a helper function to count nulls for each column
def count_nulls(df):
    return df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Show the null counts after cleaning
print("After cleaning, null counts:")
count_nulls(flights_df_cleaned).show()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

After cleaning, null counts:
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+---------

In [11]:
total_rows = flights_df_cleaned.count()
null_percent_df = count_nulls(flights_df_cleaned).withColumn(
    "percentage", 
    F.expr(f"({list(flights_df_cleaned.columns)[0]} / {total_rows}) * 100")
)
# Alternatively, loop through columns:
for c in flights_df_cleaned.columns:
    null_count = flights_df_cleaned.filter(F.col(c).isNull()).count()
    print(f"{c}: {null_count} nulls, {(null_count / total_rows) * 100:.2f}%")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

YEAR: 0 nulls, 0.00%
MONTH: 0 nulls, 0.00%
DAY: 0 nulls, 0.00%
DAY_OF_WEEK: 0 nulls, 0.00%
AIRLINE: 0 nulls, 0.00%
FLIGHT_NUMBER: 0 nulls, 0.00%
TAIL_NUMBER: 14721 nulls, 0.25%
ORIGIN_AIRPORT: 0 nulls, 0.00%
DESTINATION_AIRPORT: 0 nulls, 0.00%
SCHEDULED_DEPARTURE: 0 nulls, 0.00%
DEPARTURE_TIME: 86153 nulls, 1.48%
DEPARTURE_DELAY: 86153 nulls, 1.48%
TAXI_OUT: 89047 nulls, 1.53%
WHEELS_OFF: 89047 nulls, 1.53%
SCHEDULED_TIME: 6 nulls, 0.00%
ELAPSED_TIME: 105071 nulls, 1.81%
AIR_TIME: 105071 nulls, 1.81%
DISTANCE: 0 nulls, 0.00%
WHEELS_ON: 92513 nulls, 1.59%
TAXI_IN: 92513 nulls, 1.59%
SCHEDULED_ARRIVAL: 0 nulls, 0.00%
ARRIVAL_TIME: 92513 nulls, 1.59%
ARRIVAL_DELAY: 105071 nulls, 1.81%
DIVERTED: 0 nulls, 0.00%
CANCELLED: 0 nulls, 0.00%
CANCELLATION_REASON: 0 nulls, 0.00%
AIR_SYSTEM_DELAY: 0 nulls, 0.00%
SECURITY_DELAY: 0 nulls, 0.00%
AIRLINE_DELAY: 0 nulls, 0.00%
LATE_AIRCRAFT_DELAY: 0 nulls, 0.00%
WEATHER_DELAY: 0 nulls, 0.00%

---
### Verification of Missing Value Imputation (Post-Cleaning)

The output confirms the successful completion of data cleaning:

**Successfully Handled Columns (now 0% nulls):**  
- **`CANCELLATION_REASON`**: Null values replaced with `"Not Cancelled"`.
- **Delay-related columns** (`AIR_SYSTEM_DELAY`, `SECURITY_DELAY`, `AIRLINE_DELAY`, `LATE_AIRCRAFT_DELAY`, `WEATHER_DELAY`): Null values replaced with `0`, indicating no delay occurred.

**Columns Still Containing Missing Values (low percentages):**  
- `TAIL_NUMBER`: 0.25% missing
- `DEPARTURE_TIME`, `DEPARTURE_DELAY`: ~1.48% missing
- `TAXI_OUT`, `WHEELS_OFF`: ~1.53% missing
- `ELAPSED_TIME`, `AIR_TIME`, `ARRIVAL_DELAY`: ~1.81% missing
- `WHEELS_ON`, `TAXI_IN`, `ARRIVAL_TIME`: ~1.59% missing

These remaining missing values represent a very small proportion of the data. Depending on the significance of these columns in subsequent analyses or modeling, you may either leave them as-is, remove affected rows, or apply additional imputation methods.

**Conclusion:**  
The primary data-cleaning step addressing major null-value issues has been successfully executed.
---

### Summary Statistics and Schema Verification for Cleaned Flights DataFrame

This step performs exploratory analysis by generating summary statistics and verifying data types:

- **Summary Statistics** (`describe()`):  
  Provides count, mean, standard deviation, minimum, and maximum values for numeric columns in `flights_df_cleaned`. Useful for identifying data distributions, potential outliers, and overall data quality.

- **Schema Verification** (`printSchema()`):  
  Displays the data type of each column, ensuring that the columns have the correct types (e.g., numeric, string, categorical) after cleaning.

This ensures the data is properly cleaned, consistent, and ready for subsequent analysis or modeling.

In [12]:
# Generate summary statistics for the cleaned flights DataFrame.
flights_df_cleaned.describe().show()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------+-----------------+-----------------+------------------+-------+------------------+-----------+------------------+-------------------+-------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+--------------------+--------------------+-------------------+------------------+--------------------+------------------+-------------------+------------------+
|summary|                YEAR|            MONTH|              DAY|       DAY_OF_WEEK|AIRLINE|     FLIGHT_NUMBER|TAIL_NUMBER|    ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|    DEPARTURE_TIME|   DEPARTURE_DELAY|         TAXI_OUT|        WHEELS_OFF|   SCHEDULED_TIME|      ELAPSED_TIME|          AIR_TIME|         DISTANCE|         WHEELS_ON|           TAXI_IN| SCHEDULED_ARRIVAL|      ARRIVAL_TIME|    A

In [13]:
# list the schema to verify the types
flights_df_cleaned.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

---
### Visualization of Delay Distributions

This step creates visual summaries (histograms) of flight delay metrics using a sampled subset (`1%`) of the cleaned dataset to maintain computational efficiency:

- **Departure Delay Histogram (`DEPARTURE_DELAY`):**
  - Illustrates the frequency and distribution of departure delays (in minutes).

- **Arrival Delay Histogram (`ARRIVAL_DELAY`):**
  - Displays the distribution of arrival delays, highlighting typical delay patterns.

- **Airline Delay Histogram (`AIRLINE_DELAY`):**
  - Shows delays attributed specifically to airlines, providing insights into airline-related performance.

Each plot is saved as an image (`departure_delay.png`, `arrival_delay.png`, `airline_delay.png`) for easy inclusion in reports or presentations.

---

In [9]:
# Since the dataset may be large, take a sample for plotting
sample_fraction = 0.01  # Adjust this fraction as needed
sample_df = flights_df_cleaned.sample(withReplacement=False, fraction=sample_fraction, seed=42).toPandas()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
import matplotlib.pyplot as plt

plt.figure(figsize=(8, 5))
plt.hist(sample_df['DEPARTURE_DELAY'].dropna(), bins=50, color='skyblue', edgecolor='black')
plt.title("Distribution of DEPARTURE_DELAY")
plt.xlabel("Departure Delay (minutes)")
plt.ylabel("Frequency")
plt.savefig("departure_delay.png")
print("Plot saved as departure_delay.png")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Plot saved as departure_delay.png

In [11]:
# Plot a histogram of ARRIVAL_DELAY
plt.figure(figsize=(8, 5))
plt.hist(sample_df['ARRIVAL_DELAY'].dropna(), bins=50, color='lightgreen', edgecolor='black')
plt.title("Distribution of ARRIVAL_DELAY")
plt.xlabel("Arrival Delay (minutes)")
plt.ylabel("Frequency")
plt.savefig("arrival_delay.png")  # Save the plot to a file
print("Plot saved as arrival_delay.png")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Plot saved as arrival_delay.png

In [12]:
# Plot a histogram for one of the delay-related features, e.g., AIRLINE_DELAY
plt.figure(figsize=(8, 5))
plt.hist(sample_df['AIRLINE_DELAY'].dropna(), bins=50, color='salmon', edgecolor='black')
plt.title("Distribution of AIRLINE_DELAY")
plt.xlabel("Airline Delay (minutes)")
plt.ylabel("Frequency")
plt.savefig("airline_delay.png")  # Save the plot to a file
print("Plot saved as airline_delay.png")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Plot saved as airline_delay.png

---
### Uploading Delay Plots to Amazon S3

This step uploads the generated delay distribution plots to Amazon S3, making them accessible for sharing or future reference:

- **Departure Delay Plot (`departure_delay.png`)**
  - Uploaded to: `s3://final-csc555/plots/departure_delay.png`

- **Arrival Delay Plot (`arrival_delay.png`)**
  - Uploaded to: `s3://final-csc555/plots/arrival_delay.png`

- **Airline Delay Plot (`airline_delay.png`)**
  - Uploaded to: `s3://final-csc555/plots/airline_delay.png`

These visualizations are now securely stored in S3 and available for further analysis or reporting purposes.

> **Note:**  
> *Ensure that the keys (destination paths) are correctly set to avoid overwriting files. In your provided code, ensure you correct the destination key for the `departure_delay.png` to:*

```python
s3.upload_file("departure_delay.png", bucket_name, s3_key_departure)
```

---

In [13]:
# Create an S3 client (this uses the credentials from your EMR environment)
s3 = boto3.client('s3', region_name='us-east-1')
bucket_name = "final-csc555"

# Define S3 keys (destination paths) for each plot
s3_key_departure = "plots/departure_delay.png"
s3_key_arrival = "plots/arrival_delay.png"
s3_key_airline = "plots/airline_delay.png"

# Upload the departure_delay plot
s3.upload_file("departure_delay.png", bucket_name, s3_key_arrival)
print(f"departure_delay plot uploaded to s3://{bucket_name}/{s3_key_arrival}")

# Upload the arrival_delay plot
s3.upload_file("arrival_delay.png", bucket_name, s3_key_arrival)
print(f"arrival_delay plot uploaded to s3://{bucket_name}/{s3_key_arrival}")

# Upload the airline_delay plot
s3.upload_file("airline_delay.png", bucket_name, s3_key_airline)
print(f"airline_delay plot uploaded to s3://{bucket_name}/{s3_key_airline}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

departure_delay plot uploaded to s3://final-csc555/plots/arrival_delay.png
arrival_delay plot uploaded to s3://final-csc555/plots/arrival_delay.png
airline_delay plot uploaded to s3://final-csc555/plots/airline_delay.png

---
### Correlation Analysis of Delay Features

This step calculates pairwise correlations among selected numeric features to understand their relationships:

**Individual correlations computed:**

- **Departure Delay vs Arrival Delay**:  
  Measures how strongly departure delays are associated with arrival delays.

- **Departure Delay vs Elapsed Time**:  
  Measures the relationship between departure delays and total elapsed flight time.

**Correlation Matrix for Selected Columns** (`DEPARTURE_DELAY`, `ARRIVAL_DELAY`, `ELAPSED_TIME`, `AIRLINE_DELAY`):

- Provides an overview of how strongly each delay-related feature correlates with the others.
- Useful for identifying features with strong relationships, aiding feature selection in modeling.

High correlation values (close to ±1) indicate strong linear relationships, which can inform modeling strategies or highlight potential issues such as multicollinearity.

---

In [25]:
# Let's compute correlations between a few numeric columns.
# For example: DEPARTURE_DELAY vs ARRIVAL_DELAY, and DEPARTURE_DELAY vs ELAPSED_TIME
corr_departure_arrival = flights_df_cleaned.stat.corr("DEPARTURE_DELAY", "ARRIVAL_DELAY")
corr_departure_elapsed = flights_df_cleaned.stat.corr("DEPARTURE_DELAY", "ELAPSED_TIME")

print("Correlation between DEPARTURE_DELAY and ARRIVAL_DELAY: {:.4f}".format(corr_departure_arrival))
print("Correlation between DEPARTURE_DELAY and ELAPSED_TIME: {:.4f}".format(corr_departure_elapsed))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correlation between DEPARTURE_DELAY and ARRIVAL_DELAY: 0.9379
Correlation between DEPARTURE_DELAY and ELAPSED_TIME: 0.0327

In [26]:
# If needed, you can create a correlation matrix for selected numeric columns
numeric_cols = ["DEPARTURE_DELAY", "ARRIVAL_DELAY", "ELAPSED_TIME", "AIRLINE_DELAY"]
corr_dict = {}
for col1 in numeric_cols:
    corr_dict[col1] = {}
    for col2 in numeric_cols:
        corr_val = flights_df_cleaned.stat.corr(col1, col2)
        corr_dict[col1][col2] = round(corr_val, 4)

print("Correlation Matrix:")
for col, corr_vals in corr_dict.items():
    print(f"{col}: {corr_vals}")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correlation Matrix:
DEPARTURE_DELAY: {'DEPARTURE_DELAY': 1.0, 'ARRIVAL_DELAY': 0.9379, 'ELAPSED_TIME': 0.0327, 'AIRLINE_DELAY': 0.6537}
ARRIVAL_DELAY: {'DEPARTURE_DELAY': 0.9379, 'ARRIVAL_DELAY': 1.0, 'ELAPSED_TIME': 0.0319, 'AIRLINE_DELAY': 0.6276}
ELAPSED_TIME: {'DEPARTURE_DELAY': 0.0327, 'ARRIVAL_DELAY': 0.0319, 'ELAPSED_TIME': 1.0, 'AIRLINE_DELAY': 0.0253}
AIRLINE_DELAY: {'DEPARTURE_DELAY': 0.6537, 'ARRIVAL_DELAY': 0.6276, 'ELAPSED_TIME': 0.0253, 'AIRLINE_DELAY': 1.0}

---
### Feature Engineering: Extracting and Creating Time-based Features

This step extracts new meaningful features from the existing `SCHEDULED_DEPARTURE` column (assumed to be in HHMM format):

**Newly created features:**

- **`departure_hour`**: Hour of scheduled departure extracted from HHMM format.
- **`departure_minute`**: Minute component of scheduled departure.
- **`early_morning`**: A binary indicator (1 if the flight departs before 6:00 AM, 0 otherwise).

**Purpose of these features:**

- Capturing the time-related patterns in flight delays (e.g., early-morning flights might have different delay characteristics).
- Providing additional insights for predictive modeling and analysis.

Verification steps included ensure the correctness and accuracy of these new features.

---

In [27]:
# Assume SCHEDULED_DEPARTURE is in HHMM format (e.g., 520 means 5:20 AM).
# Create new features: departure_hour and departure_minute.
# If SCHEDULED_DEPARTURE is stored as an integer, we can perform integer division and modulo.

flights_df_fe = flights_df_cleaned.withColumn("departure_hour", F.floor(F.col("SCHEDULED_DEPARTURE")/100)) \
                                  .withColumn("departure_minute", F.col("SCHEDULED_DEPARTURE") % 100)

# Check the newly created features
flights_df_fe.select("SCHEDULED_DEPARTURE", "departure_hour", "departure_minute").show(50)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------+----------------+
|SCHEDULED_DEPARTURE|departure_hour|departure_minute|
+-------------------+--------------+----------------+
|                  5|             0|               5|
|                 10|             0|              10|
|                 20|             0|              20|
|                 20|             0|              20|
|                 25|             0|              25|
|                 25|             0|              25|
|                 25|             0|              25|
|                 30|             0|              30|
|                 30|             0|              30|
|                 30|             0|              30|
|                 30|             0|              30|
|                 35|             0|              35|
|                 35|             0|              35|
|                 40|             0|              40|
|                 40|             0|              40|
|                 45|       

In [28]:
# We might also create other features such as:
# - A binary flag for early morning flights (e.g., departure_hour < 6)
# - Combined features (e.g., a feature that is the sum of various delay components)

flights_df_fe = flights_df_fe.withColumn("early_morning", F.when(F.col("departure_hour") < 6, 1).otherwise(0))

# Show a few rows to verify the new features
flights_df_fe.select("departure_hour", "departure_minute", "early_morning").show(50)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+----------------+-------------+
|departure_hour|departure_minute|early_morning|
+--------------+----------------+-------------+
|             0|               5|            1|
|             0|              10|            1|
|             0|              20|            1|
|             0|              20|            1|
|             0|              25|            1|
|             0|              25|            1|
|             0|              25|            1|
|             0|              30|            1|
|             0|              30|            1|
|             0|              30|            1|
|             0|              30|            1|
|             0|              35|            1|
|             0|              35|            1|
|             0|              40|            1|
|             0|              40|            1|
|             0|              45|            1|
|             0|              45|            1|
|             0|              48|       

In [29]:
flights_df_fe.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+--------------+----------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|departure_hour|departure_minute|early_morning|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-----------------

In [30]:
flights_df_fe.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

---
### Selecting Relevant Features and Removing Unnecessary Columns

This step finalizes the dataset for predictive modeling by removing columns that:

- **Could cause data leakage** (e.g., actual delays, operational timings available only after departure).
- **Contain duplicate or redundant information** (e.g., unique identifiers, dates without predictive relevance).

**Dropped columns include:**  
- Identifiers and dates: `YEAR`, `DAY`, `TAIL_NUMBER`  
- Actual operational times and delays: `DEPARTURE_TIME`, `DEPARTURE_DELAY`, `TAXI_OUT`, `WHEELS_OFF`, `ELAPSED_TIME`, `AIR_TIME`, `WHEELS_ON`, `TAXI_IN`, `ARRIVAL_TIME`, `ARRIVAL_DELAY`  
- Post-flight or operational status columns: `DIVERTED`, `CANCELLED`, `CANCELLATION_REASON`, various delay-specific columns  
- Minor details not predictive of delays: `departure_minute`

The resulting `final_transformed_df` retains columns containing useful pre-flight information, suitable for predictive modeling of flight delays.

The schema and a sample of the transformed data are displayed to confirm correctness.

---

In [31]:
# Define a list of columns to drop from your feature-engineered DataFrame.
# Adjust this list based on your project needs.
cols_to_drop = [
    "YEAR", "DAY", "TAIL_NUMBER", "DEPARTURE_TIME", "DEPARTURE_DELAY",
    "TAXI_OUT", "WHEELS_OFF", "SCHEDULED_TIME", "ELAPSED_TIME", "AIR_TIME",
    "WHEELS_ON", "TAXI_IN", "SCHEDULED_ARRIVAL", "ARRIVAL_TIME", "ARRIVAL_DELAY",
    "DIVERTED", "CANCELLED", "CANCELLATION_REASON", "AIR_SYSTEM_DELAY", 
    "SECURITY_DELAY", "AIRLINE_DELAY", "LATE_AIRCRAFT_DELAY", "WEATHER_DELAY",
    "departure_minute"  # you may decide to drop minute if not needed
]

# Create a final transformed DataFrame by dropping the unnecessary columns.
final_transformed_df = flights_df_fe.drop(*cols_to_drop)

# Display the final DataFrame to verify the changes.
final_transformed_df.show(5, truncate=False)
final_transformed_df.printSchema()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----------+-------+-------------+--------------+-------------------+-------------------+--------+--------------+-------------+
|MONTH|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DISTANCE|departure_hour|early_morning|
+-----+-----------+-------+-------------+--------------+-------------------+-------------------+--------+--------------+-------------+
|1    |4          |AS     |98           |ANC           |SEA                |5                  |1448    |0             |1            |
|1    |4          |AA     |2336         |LAX           |PBI                |10                 |2330    |0             |1            |
|1    |4          |US     |840          |SFO           |CLT                |20                 |2296    |0             |1            |
|1    |4          |AA     |258          |LAX           |MIA                |20                 |2342    |0             |1            |
|1    |4          |AS     |135          |SEA           

---
### Data Enrichment: Adding Airline and Airport Information  

This step enriches the original dataset by adding descriptive information from external datasets (`airlines_df` and `airports_df`). The final enriched DataFrame (`enriched_df`) contains detailed context about airlines and airports, enhancing its usefulness for modeling and analysis.

**Enriched Information includes:**  

- **Airline details**:
  - Airline full name (`AIRLINE` → `AIRLINE_FULL_NAME`).

- **Origin Airport details**:
  - Airport name (`ORIGIN_AIRPORT`)
  - City (`ORIGIN_CITY`)
  - State (`ORIGIN_STATE`)

- **Destination Airport details**:
  - Airport name (`DESTINATION_AIRPORT`)
  - City (`DESTINATION_CITY`)
  - State (`DESTINATION_STATE`)

These enrichments add meaningful context to the original flight data, potentially improving predictive accuracy and facilitating deeper insights in further analyses or modeling. 

The enriched data schema and sample records are verified to ensure the correctness of the joins.

---

In [32]:
from pyspark.sql import functions as F

# final_transformed_df is the final cleaned dataset with columns:
# MONTH, DAY_OF_WEEK, AIRLINE, FLIGHT_NUMBER, ORIGIN_AIRPORT, DESTINATION_AIRPORT,
# SCHEDULED_DEPARTURE, DISTANCE, departure_hour, early_morning

# Prepare airlines enrichment DataFrame: rename IATA_CODE to AIRLINE_CODE and AIRLINE to AIRLINE_FULL_NAME
airlines_enriched = airlines_df.select(
    F.col("IATA_CODE").alias("AIRLINE_CODE"),
    F.col("AIRLINE").alias("AIRLINE_FULL_NAME")
)

# Join final_transformed_df with airlines_enriched on AIRLINE code
enriched_df = final_transformed_df.join(
    airlines_enriched, 
    final_transformed_df.AIRLINE == airlines_enriched.AIRLINE_CODE, 
    "left"
)

# Prepare origin airport enrichment DataFrame: rename IATA_CODE to ORIGIN_CODE and add full airport details
airports_origin = airports_df.select(
    F.col("IATA_CODE").alias("ORIGIN_CODE"),
    F.col("AIRPORT").alias("ORIGIN_AIRPORT_FULL"),
    F.col("CITY").alias("ORIGIN_CITY"),
    F.col("STATE").alias("ORIGIN_STATE")
)

# Prepare destination airport enrichment DataFrame: rename IATA_CODE to DEST_CODE and add full airport details
airports_dest = airports_df.select(
    F.col("IATA_CODE").alias("DEST_CODE"),
    F.col("AIRPORT").alias("DESTINATION_AIRPORT_FULL"),
    F.col("CITY").alias("DESTINATION_CITY"),
    F.col("STATE").alias("DESTINATION_STATE")
)

# Join with origin airport details on ORIGIN_AIRPORT key
enriched_df = enriched_df.join(
    airports_origin, 
    enriched_df.ORIGIN_AIRPORT == airports_origin.ORIGIN_CODE, 
    "left"
)

# Join with destination airport details on DESTINATION_AIRPORT key
enriched_df = enriched_df.join(
    airports_dest, 
    enriched_df.DESTINATION_AIRPORT == airports_dest.DEST_CODE, 
    "left"
)

# Drop duplicate key columns from enrichment (if desired)
final_enriched_df = enriched_df.drop("AIRLINE_CODE", "ORIGIN_CODE", "DEST_CODE")

# Display the final enriched DataFrame and its schema
final_enriched_df.show(5, truncate=False)
final_enriched_df.printSchema()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----------+-------+-------------+--------------+-------------------+-------------------+--------+--------------+-------------+----------------------+-------------------------------------------+-------------+------------+-------------------------------------------+----------------+-----------------+
|MONTH|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DISTANCE|departure_hour|early_morning|AIRLINE_FULL_NAME     |ORIGIN_AIRPORT_FULL                        |ORIGIN_CITY  |ORIGIN_STATE|DESTINATION_AIRPORT_FULL                   |DESTINATION_CITY|DESTINATION_STATE|
+-----+-----------+-------+-------------+--------------+-------------------+-------------------+--------+--------------+-------------+----------------------+-------------------------------------------+-------------+------------+-------------------------------------------+----------------+-----------------+
|1    |4          |AS     |98           |ANC           |SEA                |

### Saving Final Enriched Dataset to Amazon S3 (Parquet Format)

This step saves the enriched flight dataset (`final_enriched_df`) to Amazon S3 in Parquet format, ensuring efficient storage, faster querying, and ease of future retrieval for analysis or modeling tasks.

**Destination Location:**

```
s3://final-csc555/parquet-data
```

- **File Format**: Parquet (columnar storage optimized for analytics)
- **Overwrite Policy**: Existing data at the specified path will be overwritten.
- **Accessibility**: Data stored on S3 for convenient future access and processing.

The data is now ready and efficiently stored for subsequent use.

In [73]:
final_enriched_df.write.mode("overwrite").parquet("s3://final-csc555/parquet-data")
print("Final enriched transformed data written to S3 at: s3://final-csc555/parquet-data")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Final enriched transformed data written to S3 at: s3://final-csc555/parquet-data

---
### Setting up Athena Client for Querying Data

This step establishes a client connection to **AWS Athena** using `boto3`, leveraging IAM role credentials from our EMR environment. Athena allows efficient, SQL-based querying of datasets stored in Amazon S3.

**Key components:**

- **AWS Region**: `us-east-1`
- **Athena Client**: Created using provided session credentials.
- **Athena Output Location**: Results of queries will be stored in `final-csc555` bucket (must already exist).

> **Note:**  
> Ensure that the provided IAM credentials and session tokens are valid and have appropriate permissions.  
> It is recommended to use environment credentials or IAM roles rather than embedding static credentials directly into scripts for better security practices.

**Configured Athena Database:**  
- Database location for storing query results set to your specified S3 bucket.

The environment is now ready for executing SQL queries against your enriched dataset using Athena.

---

In [None]:
# Use boto3 to create an Athena client; on EMR, this will pick up the instance's IAM role credentials.
AWS_REGION = 'us-east-1'
athena_client = boto3.client('athena',
	aws_access_key_id='************************', #your access key
	aws_secret_access_key='**************************************', #your secret access key
	aws_session_token='*******************************************', #your session token
	region_name = AWS_REGION) # should be us-east-1 in most cases

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [65]:
# Define your Athena output bucket for query results.
# (This bucket must exist and the IAM role must have permission to write here.)
ATHENA_OUTPUT = "s3://final-csc555/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [78]:
# Use your database and table names.
DATABASE_NAME = 'flight2_db'
TABLE_NAME = 'flights_transformed'

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [79]:
# New S3 location where your Parquet data is stored.
PARQUET_DATA_LOCATION = "s3://final-csc555/parquet-data/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

---
### Creating an AWS Athena Database

This step creates a new database in **AWS Athena**, preparing the environment for subsequent SQL queries and data analysis.

**Process Details:**

- **Database Creation**: Uses Athena to create a database, storing metadata to manage and query datasets efficiently.
- **Execution Monitoring**: Monitors query execution status (`RUNNING`, `QUEUED`) until completion, ensuring the database is created successfully.
- **Output Location**: Stores query execution results in a predefined Amazon S3 bucket (`ATHENA_OUTPUT`).

After successful execution, the Athena database is ready for storing metadata tables and running queries against our enriched flight dataset stored on Amazon S3.

---

In [80]:
# Query 1: Create the database if it doesn't exist.
create_database_query = f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME};"
print("Executing query to create database:")
print(create_database_query)

response = athena_client.start_query_execution(
    QueryString=create_database_query,
    ResultConfiguration={'OutputLocation': ATHENA_OUTPUT}
)

execution_id = response['QueryExecutionId']
state = 'RUNNING'
while state in ['RUNNING', 'QUEUED']:
    time.sleep(2)
    result = athena_client.get_query_execution(QueryExecutionId=execution_id)
    state = result['QueryExecution']['Status']['State']

if state != 'SUCCEEDED':
    reason = result['QueryExecution']['Status'].get('StateChangeReason', 'Unknown')
    raise Exception(f"Database creation failed: {reason}")

print("Database created successfully.")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Executing query to create database:
CREATE DATABASE IF NOT EXISTS flight2_db;
Database created successfully.

---
### Creating an External Table in AWS Athena

This step creates an **external table** in AWS Athena pointing to the flight dataset stored in Parquet format on Amazon S3. An external table allows querying the dataset directly from S3 without duplicating data storage.

**Details:**

- **Table Schema**: Defines column names and their respective data types (e.g., integers for scheduling details, strings for airline and airport names).
- **Storage Format**: Data is stored as **Parquet**, optimizing query performance.
- **Data Location**: Points directly to the enriched flight data (`PARQUET_DATA_LOCATION`) stored in S3.
- **Query Execution Monitoring**: Ensures successful creation, providing clear feedback on success or failure.

After successful creation, this table can be queried directly using Athena SQL, enabling efficient analytics and reporting.

---

In [81]:
# Query 2: Create the external table.
create_table_query = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {TABLE_NAME} (
  MONTH INT, 
  DAY_OF_WEEK INT,
  AIRLINE STRING,
  FLIGHT_NUMBER INT,
  ORIGIN_AIRPORT STRING,
  DESTINATION_AIRPORT STRING,
  SCHEDULED_DEPARTURE INT,
  DISTANCE INT,
  departure_hour INT,
  early_morning INT,
  AIRLINE_FULL_NAME STRING,
  ORIGIN_AIRPORT_FULL STRING,
  ORIGIN_CITY STRING,
  ORIGIN_STATE STRING,
  DESTINATION_AIRPORT_FULL STRING,
  DESTINATION_CITY STRING,
  DESTINATION_STATE STRING
)
STORED AS PARQUET
LOCATION '{PARQUET_DATA_LOCATION}';
"""
print("Executing query to create external table:")
print(create_table_query.strip())

response = athena_client.start_query_execution(
    QueryString=create_table_query,
    QueryExecutionContext={'Database': DATABASE_NAME},
    ResultConfiguration={'OutputLocation': ATHENA_OUTPUT}
)

execution_id = response['QueryExecutionId']
state = 'RUNNING'
while state in ['RUNNING', 'QUEUED']:
    time.sleep(2)
    result = athena_client.get_query_execution(QueryExecutionId=execution_id)
    state = result['QueryExecution']['Status']['State']

if state != 'SUCCEEDED':
    reason = result['QueryExecution']['Status'].get('StateChangeReason', 'Unknown')
    raise Exception(f"Table creation failed: {reason}")
    
print("Athena table created successfully!")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Executing query to create external table:
CREATE EXTERNAL TABLE IF NOT EXISTS flights_transformed (
  MONTH INT, 
  DAY_OF_WEEK INT,
  AIRLINE STRING,
  FLIGHT_NUMBER INT,
  ORIGIN_AIRPORT STRING,
  DESTINATION_AIRPORT STRING,
  SCHEDULED_DEPARTURE INT,
  DISTANCE INT,
  departure_hour INT,
  early_morning INT,
  AIRLINE_FULL_NAME STRING,
  ORIGIN_AIRPORT_FULL STRING,
  ORIGIN_CITY STRING,
  ORIGIN_STATE STRING,
  DESTINATION_AIRPORT_FULL STRING,
  DESTINATION_CITY STRING,
  DESTINATION_STATE STRING
)
STORED AS PARQUET
LOCATION 's3://final-csc555/parquet-data/';
Athena table created successfully!

---
### Querying Data from Athena

This step executes an Athena SQL query to retrieve data from the `flights_transformed` table:

- **Query executed**:  
  ```sql
  SELECT * FROM flights_transformed LIMIT 900;
  ```

- **Purpose**: Quickly verify and inspect data from the external table, retrieving the first 900 rows.

- **Execution and Result Retrieval**:
  - Executes the query asynchronously.
  - Monitors execution until completion.
  - Retrieves results once the query succeeds.

We can further process the returned data for analysis or visualization as required.

---

In [116]:
# Define your query. Here, we select the first 900 rows.
query_string = "SELECT * FROM flights_transformed LIMIT 900;"
print("Executing query:")
print(query_string)

# Start query execution.
response = athena_client.start_query_execution(
    QueryString=query_string,
    QueryExecutionContext={'Database': DATABASE_NAME},
    ResultConfiguration={'OutputLocation': ATHENA_OUTPUT}
)

execution_id = response['QueryExecutionId']

# Poll until the query finishes.
state = 'RUNNING'
while state in ['RUNNING', 'QUEUED']:
    time.sleep(2)
    result = athena_client.get_query_execution(QueryExecutionId=execution_id)
    state = result['QueryExecution']['Status']['State']

if state != 'SUCCEEDED':
    reason = result['QueryExecution']['Status'].get('StateChangeReason', 'Unknown')
    raise Exception(f"Query execution failed: {reason}")

print("Query executed successfully. Fetching results...")

# Fetch query results.
result_response = athena_client.get_query_results(QueryExecutionId=execution_id)

# Print the results.
# Note: The results are returned in a structured format. You might need to process them further.
#print(result_response)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Executing query:
SELECT * FROM flights_transformed LIMIT 900;
Query executed successfully. Fetching results...

---
### Converting Athena Query Results to Pandas DataFrame  

This step transforms the results obtained from AWS Athena into a structured, easy-to-use Pandas DataFrame:

- Extracts column headers from Athena's result metadata.
- Converts each row into a structured format, handling potential missing values appropriately.
- Creates a DataFrame for further analysis, visualization, or reporting.

The resulting DataFrame (`df`) provides a familiar and convenient format for subsequent data exploration and analysis.

---

In [117]:
# Convert the Athena query results to a Pandas DataFrame.
rows = result_response['ResultSet']['Rows']

# The first row contains column headers.
headers = [col.get('VarCharValue', '') for col in rows[0]['Data']]
data = []
for row in rows[1:]:
    # If a value is missing, Athena might return an empty dict.
    row_data = [col.get('VarCharValue', None) for col in row['Data']]
    data.append(row_data)

df = pd.DataFrame(data, columns=headers)
print(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

    month day_of_week  ... destination_city destination_state
0      11           5  ...          Chicago                IL
1      11           5  ...    San Francisco                CA
2      11           5  ...          Houston                TX
3      11           5  ...        San Diego                CA
4      11           5  ...          Orlando                FL
..    ...         ...  ...              ...               ...
895    11           5  ...        San Diego                CA
896    11           5  ...   Ft. Lauderdale                FL
897    11           5  ...          Detroit                MI
898    11           5  ...          Detroit                MI
899    11           5  ...        Las Vegas                NV

[900 rows x 17 columns]

---
### Loading Athena Query Results Directly into Pandas

This step reads the CSV results generated by Athena directly from the provided Amazon S3 path into a Pandas DataFrame:

- **S3 Path:**  
  ```
  s3://final-csc555/23c9d842-bfa0-4e88-b3bb-0c84f3765b6e.csv
  ```

- **Method:** Uses `pandas.read_csv()` to efficiently load the CSV file directly from S3.

- **Purpose:** Facilitates immediate exploratory data analysis, visualization, or further local processing.

The resulting DataFrame (`df`) is ready for analysis and inspection.

---

In [16]:
# Replace with the exact path Athena provided for your query output.
csv_s3_path = "s3://final-csc555/23c9d842-bfa0-4e88-b3bb-0c84f3765b6e.csv"

# Using pandas to read CSV directly from S3:
df = pd.read_csv(csv_s3_path)

print(df.head())


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   month  day_of_week  ...   destination_city  destination_state
0     11            5  ...            Chicago                 IL
1     11            5  ...      San Francisco                 CA
2     11            5  ...            Houston                 TX
3     11            5  ...            Atlanta                 GA
4     11            5  ...  Dallas-Fort Worth                 TX

[5 rows x 17 columns]

**We used two methods to load Athena query results serve different purposes:**

### **Method 1: Using Athena’s Query Result API (`get_query_results`)**

- **Purpose**: 
  - Quickly access small query results directly from Athena without additional setup.
- **Use-case**:
  - Ideal for small datasets or quick inspections.
- **Limitations**:
  - Not efficient for large datasets.
  - Results require manual parsing into structured formats (like Pandas).

### **Method 2: Loading directly from S3 using `pandas.read_csv`**

- **Purpose**:
  - Efficiently retrieve large query results stored by Athena in CSV format directly from S3.
- **Use-case**:
  - Preferred for larger datasets or analyses requiring ease and speed.
- **Advantages**:
  - Faster and more straightforward for larger queries.
  - CSV is directly loaded into Pandas without extra parsing.

---

### **Summary:**

- **Small dataset or quick checks:**  
  → Use Athena’s API (`get_query_results`).

- **Larger dataset or straightforward loading:**  
  → Use `pandas.read_csv` from S3 path.

We used both approaches for demonstration and to highlight the trade-offs in usability and scalability.

### Data Preparation for Machine Learning (Spark MLlib)

This step prepares the data for machine learning using Spark MLlib:

**1. Conversion to Spark DataFrame:**

- Converted the Pandas DataFrame into a PySpark DataFrame (`spark_df`) for compatibility with Spark ML pipelines.

**2. Data Type Casting:**

- Numeric columns (`MONTH`, `DAY_OF_WEEK`, `SCHEDULED_DEPARTURE`, `DISTANCE`, `departure_hour`, `early_morning`) are explicitly converted to `IntegerType` for consistent modeling.

**3. Creation of Target Variable (`IS_DELAYED`):**

- Created a binary target variable `IS_DELAYED` indicating delayed flights.
- For illustration, flights departing at or after 10 AM (`departure_hour >= 10`) are marked as delayed (`1`); otherwise, they are considered not delayed (`0`). Adjust this criterion based on your actual modeling needs.

**4. Handling Missing Values:**

- Dropped any remaining null values to ensure data quality for modeling.

The resulting DataFrame is verified and ready for use in predictive modeling tasks with Spark MLlib.

In [18]:
# First, convert Pandas DataFrame to PySpark DataFrame
spark_df = spark.createDataFrame(df)

spark_df.printSchema()
spark_df.show(5)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- month: long (nullable = true)
 |-- day_of_week: long (nullable = true)
 |-- airline: string (nullable = true)
 |-- flight_number: long (nullable = true)
 |-- origin_airport: string (nullable = true)
 |-- destination_airport: string (nullable = true)
 |-- scheduled_departure: long (nullable = true)
 |-- distance: long (nullable = true)
 |-- departure_hour: long (nullable = true)
 |-- early_morning: long (nullable = true)
 |-- airline_full_name: string (nullable = true)
 |-- origin_airport_full: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- origin_state: string (nullable = true)
 |-- destination_airport_full: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- destination_state: string (nullable = true)

+-----+-----------+-------+-------------+--------------+-------------------+-------------------+--------+--------------+-------------+--------------------+--------------------+-----------+------------+------------------------

Data Pre-processing and Feature Engineering

In [23]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Convert numeric columns to integer type
numeric_cols = ['MONTH', 'DAY_OF_WEEK', 'SCHEDULED_DEPARTURE', 'DISTANCE', 'departure_hour', 'early_morning']
for c in numeric_cols:
    spark_df = spark_df.withColumn(c, col(c).cast(IntegerType()))

# Create target variable IS_DELAYED
# For simplicity, let's assume departure_hour >= 15 is a delayed flight (you can modify this based on real delay criteria)
spark_df = spark_df.withColumn(
    "IS_DELAYED", 
    (col("departure_hour") >= 15).cast("integer")
)

# Drop nulls if any
spark_df = spark_df.na.drop()

spark_df.select(numeric_cols + ['IS_DELAYED']).show(5)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----------+-------------------+--------+--------------+-------------+----------+
|MONTH|DAY_OF_WEEK|SCHEDULED_DEPARTURE|DISTANCE|departure_hour|early_morning|IS_DELAYED|
+-----+-----------+-------------------+--------+--------------+-------------+----------+
|   11|          5|                625|    1744|             6|            0|         0|
|   11|          5|                625|     337|             6|            0|         0|
|   11|          5|                625|     781|             6|            0|         0|
|   11|          5|                625|     594|             6|            0|         0|
|   11|          5|                625|     408|             6|            0|         0|
+-----+-----------+-------------------+--------+--------------+-------------+----------+
only showing top 5 rows

Feature Encoding  (Categorical features)

In [20]:
spark_df.printSchema()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- airline: string (nullable = true)
 |-- flight_number: long (nullable = true)
 |-- origin_airport: string (nullable = true)
 |-- destination_airport: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- departure_hour: integer (nullable = true)
 |-- early_morning: integer (nullable = true)
 |-- airline_full_name: string (nullable = true)
 |-- origin_airport_full: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- origin_state: string (nullable = true)
 |-- destination_airport_full: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- destination_state: string (nullable = true)
 |-- IS_DELAYED: integer (nullable = true)

---
### Machine Learning Pipeline: Feature Encoding and Vector Assembly

This step builds a **Spark MLlib pipeline** to transform categorical and numeric columns into a unified feature vector, ready for predictive modeling:

**Pipeline Components:**

- **Categorical Feature Encoding:**
  - Converts categorical columns (`airline_full_name`, `origin_airport_full`, `destination_airport_full`) into numeric indices (`StringIndexer`) and then encodes them into vector form (`OneHotEncoder`).

- **Vector Assembler**:
  - Combines numeric columns (`MONTH`, `DAY_OF_WEEK`, `SCHEDULED_DEPARTURE`, `DISTANCE`, `departure_hour`, `early_morning`) and categorical feature vectors into a single unified feature vector.

**Pipeline Execution**:

- Fits the pipeline on the dataset to learn encoding schemes and applies transformations.
- Results in a machine learning-ready DataFrame (`ml_df`) with:
  - **`features`**: Encoded feature vector.
  - **`IS_DELAYED`**: Target variable for model training.

The dataset is now ready for modeling with Spark MLlib classifiers or regressors.

---

In [24]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Define categorical columns clearly from your schema
categorical_cols = [
    "airline_full_name", 
    "origin_airport_full",
    "destination_airport_full",
    "origin_state",
    "destination_state"
]

# StringIndexer and OneHotEncoder for categorical columns
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid='keep') for col in categorical_cols]

encoders = [OneHotEncoder(inputCol=f"{col}_idx", outputCol=f"{col}_vec") for col in categorical_cols]

# Combine numeric and encoded categorical features into a single feature vector
assembler_inputs = [
    "MONTH",
    "DAY_OF_WEEK",
    "SCHEDULED_DEPARTURE",
    "DISTANCE",
    "departure_hour",
    "early_morning"
] + [f"{col}_vec" for col in categorical_cols]

assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

# Final pipeline definition
pipeline = Pipeline(stages=indexers + encoders + [assembler])

# Fit the pipeline model and transform data
pipeline_model = pipeline.fit(spark_df)
ml_df = pipeline_model.transform(spark_df)

ml_df.select("features", "IS_DELAYED").show(5, truncate=False)



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------------------------------------------------------------------+----------+
|features                                                                           |IS_DELAYED|
+-----------------------------------------------------------------------------------+----------+
|(770,[0,1,2,3,4,11,24,342,662,719],[11.0,5.0,625.0,1744.0,6.0,1.0,1.0,1.0,1.0,1.0])|0         |
|(770,[0,1,2,3,4,11,24,346,662,716],[11.0,5.0,625.0,337.0,6.0,1.0,1.0,1.0,1.0,1.0]) |0         |
|(770,[0,1,2,3,4,6,45,369,664,717],[11.0,5.0,625.0,781.0,6.0,1.0,1.0,1.0,1.0,1.0])  |0         |
|(770,[0,1,2,3,4,7,44,341,664,720],[11.0,5.0,625.0,594.0,6.0,1.0,1.0,1.0,1.0,1.0])  |0         |
|(770,[0,1,2,3,4,10,113,343,703,717],[11.0,5.0,625.0,408.0,6.0,1.0,1.0,1.0,1.0,1.0])|0         |
+-----------------------------------------------------------------------------------+----------+
only showing top 5 rows

---
### Splitting Data into Training and Testing Sets

This step divides the prepared dataset into:

- **Training set (`train_df`)**: 80% of data, used to train and validate machine learning models.
- **Testing Set (`test_df`)**: Remaining 20%, used for evaluating the model’s performance on unseen data.

The random split ensures reproducibility (`seed=42`). The resulting datasets can be directly used for training, evaluating, and tuning machine learning models.

---

In [25]:
train_df, test_df = ml_df.randomSplit([0.8, 0.2], seed=42)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

---
### Checking the Target Variable Distribution (Class Balance)

This step examines the distribution of the target variable (`IS_DELAYED`):

- **Class Balance Check**:  
  Counts the number of delayed (`1`) and non-delayed (`0`) flights. Helps assess if class imbalance exists, guiding potential modeling decisions such as resampling or weighting.

- **Data Type Casting**: Ensures the target variable (`IS_DELAYED`) is explicitly cast as an integer, which is required for classification modeling with Spark MLlib.

Understanding class distribution ensures proper evaluation metrics and modeling decisions.

---

In [26]:
spark_df.groupBy("IS_DELAYED").count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------+
|IS_DELAYED| count|
+----------+------+
|         1|203139|
|         0|296861|
+----------+------+

In [27]:
spark_df = spark_df.withColumn("IS_DELAYED", spark_df["IS_DELAYED"].cast("integer"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

---
### Logistic Regression Model Training and Prediction

This step trains a **Logistic Regression** model using Spark MLlib and evaluates it on the test set:

- **Model Details**:
  - Predicts flight delays (`IS_DELAYED`) based on the assembled feature vector.
  - Uses binomial logistic regression for binary classification.

- **Output Columns in Predictions**:
  - **`IS_DELAYED`**: Actual label.
  - **`prediction`**: Predicted class (0 or 1).
  - **`probability`**: Probability of each class (delayed vs. not delayed).
  - **`rawPrediction`**: Raw scores before conversion to probabilities.

The resulting predictions allow evaluating model performance and effectiveness in predicting flight delays.

---

In [28]:
from pyspark.ml.classification import LogisticRegression


lr = LogisticRegression(featuresCol='features', labelCol='IS_DELAYED', family='binomial')
lr_model = lr.fit(train_df)

predictions = lr_model.transform(test_df)
predictions.select('IS_DELAYED', 'prediction', 'probability', 'rawPrediction').show(5, False)



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------+-----------+----------------------------------------+
|IS_DELAYED|prediction|probability|rawPrediction                           |
+----------+----------+-----------+----------------------------------------+
|0         |0.0       |[1.0,0.0]  |[111.4583100148675,-111.4583100148675]  |
|0         |0.0       |[1.0,0.0]  |[114.90449511213194,-114.90449511213194]|
|0         |0.0       |[1.0,0.0]  |[340.46076885284083,-340.46076885284083]|
|0         |0.0       |[1.0,0.0]  |[107.81184469681534,-107.81184469681534]|
|0         |0.0       |[1.0,0.0]  |[326.8666750088467,-326.8666750088467]  |
+----------+----------+-----------+----------------------------------------+
only showing top 5 rows

---
### Model Evaluation Metrics

This step evaluates the Logistic Regression model's performance using key classification metrics:

- **Accuracy**: Measures the overall correctness of predictions.
- **AUC (Area Under ROC Curve)**: Indicates the model’s capability to distinguish between delayed and non-delayed flights (higher AUC indicates better predictive power).

Additionally, a **Confusion Matrix** is generated to visualize detailed prediction outcomes

These metrics collectively provide insights into the accuracy and reliability of the predictive model.

---

In [31]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Accuracy
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="IS_DELAYED", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator_accuracy.evaluate(predictions)
print(f"Accuracy: {accuracy:.4f}")

# AUC
evaluator_auc = BinaryClassificationEvaluator(
    labelCol="IS_DELAYED", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
auc = evaluator_auc.evaluate(predictions)
print(f"AUC: {auc:.4f}")

# Confusion Matrix
predictions.groupBy('IS_DELAYED', 'prediction').count().show()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Accuracy: 1.0000
AUC: 1.0000
+----------+----------+-----+
|IS_DELAYED|prediction|count|
+----------+----------+-----+
|         1|       1.0|40654|
|         0|       0.0|59548|
|         1|       0.0|    2|
|         0|       1.0|    1|
+----------+----------+-----+

---
### Model Performance Summary

The Logistic Regression model shows exceptionally high performance:

- **Accuracy:** `100%`
- **AUC (Area Under ROC):** `100%`

**Confusion Matrix:**

| Actual (`IS_DELAYED`) | Predicted (`prediction`) | Count  | Interpretation        |
|-----------------------|--------------------------|--------|-----------------------|
| 1                     | 1                        | 40,654 | True Positives ✅     |
| 0                     | 0                        | 59,548 | True Negatives ✅     |
| 0                     | 1                        | 2      | False Positives ❌    |
| 1                     | 0                        | 1      | False Negatives ❌    |

**Insights:**

- Nearly perfect classification accuracy.
- Extremely high AUC indicates excellent model capability to distinguish between delayed and non-delayed flights.
- The very low count of misclassified instances suggests minimal errors, indicating the model is highly effective on the current dataset.

**Recommendation:**  
Validate this high performance on additional or unseen data to confirm generalizability.

---