<a href="https://colab.research.google.com/github/sofi-avalos/PySpark-Air-Traffic-Analysis/blob/main/PySpark_Air_Traffic_Passenger_Statistics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
"""
Air Traffic Passenger Statistics Analysis using PySpark

This notebook performs data ingestion, exploratory data analysis (EDA),
data cleaning, feature engineering, and regression modeling
(Decision Tree and Random Forest) on the 'Air Traffic Passenger Statistics' dataset.

It is designed to run in Google Colab with access to Google Drive.
"""

# ==============================================================================
# 1. SETUP AND DATA INGESTION
# ==============================================================================

## Install PySpark
!pip install PySpark

## Import necessary libraries for setup
from google.colab import drive
from os import path
from pyspark.sql import SparkSession

## Mount Google Drive so the dataset stored there becomes readable
print("Mounting Google Drive...")
drive.mount('/content/drive')

## Location of the dataset on Google Drive
# IMPORTANT: Adjust this path to your specific file location
CSV_PATH = "/content/drive/MyDrive/Colab Notebooks/PROYECTO_FINAL/Air_Traffic_Passenger_Statistics.csv"

## Stop early with a clear message when nothing exists at that path
dataset_found = path.exists(CSV_PATH)
if not dataset_found:
    raise FileNotFoundError(f"File not found at: {CSV_PATH}. Please check the path.")

## Create (or reuse) the Spark session used by the rest of the notebook
print("Creating Spark Session...")
session_builder = SparkSession.builder.appName("Air Traffic Analysis")
spark = session_builder.getOrCreate()

## Load the CSV into a Spark DataFrame
# The first row is read as the header and column types are inferred.
print(f"Reading data from: {CSV_PATH}")
csv_reader = spark.read.option("header", True).option("inferSchema", True)
df_spark = csv_reader.csv(CSV_PATH)

## Print the schema so the inferred data types can be checked
print("\n--- DataFrame Schema ---")
df_spark.printSchema()

# ------------------------------------------------------------------------------

# ==============================================================================
# 2. EXPLORATORY DATA ANALYSIS (EDA)
# ==============================================================================

from pyspark.sql.functions import sum, desc, count, col

## A) Records of a single airline (Air China)
print("\n--- Air China Records Analysis ---")
air_china_df = df_spark.filter(df_spark["Operating Airline"] == "Air China")
air_china_records_count = air_china_df.count()
print(f"Total number of 'Air China' records: {air_china_records_count}")

# First five 'Air China' rows
print("\nSample of 'Air China' records:")
air_china_df.show(5)

## B) 'Air Berlin' records whose Boarding Area is 'G'
print("\n--- Air Berlin at Gate 'G' Analysis ---")
df_air_berlin = df_spark.filter(df_spark["Operating Airline"] == "Air Berlin")
air_berlin_g_df = df_air_berlin.filter(df_air_berlin["Boarding Area"] == "G")
air_berlin_g_count = air_berlin_g_df.count()
print(f"Total number of 'Air Berlin' records in Boarding Area 'G': {air_berlin_g_count}")

# First five 'Air Berlin' rows from Boarding Area 'G'
print("\nSample of 'Air Berlin' at 'G' records:")
air_berlin_g_df.show(5)

## C) Number of distinct operating airlines
print("\n--- Unique Operating Airlines Count ---")
distinct_companies = df_spark.select("Operating Airline").distinct()
unique_companies_count = distinct_companies.count()
print(f"Number of different operating airlines: {unique_companies_count}")

# Show the first 20 distinct airline names without truncating them
print("\nList of unique operating airlines (partial):")
distinct_companies.show(20, truncate=False)

## D) Total adjusted passengers per operating airline, largest first
print("\n--- Top 20 Airlines by Total Adjusted Passenger Count ---")
passengers_by_airline = df_spark.groupBy("Operating Airline").agg(
    sum("Adjusted Passenger Count").alias("Total_Passengers")
)
passengers_by_airline.orderBy(desc("Total_Passengers")).show(20, truncate=False)

# ------------------------------------------------------------------------------

# ==============================================================================
# 3. DATA CLEANING AND PREPARATION
# ==============================================================================

from pyspark.sql.functions import col, avg, round, when, abs as abs_col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, lit
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.stat import Correlation

## A) Drop the legacy United Airlines label for data consistency
print("\n--- Data Cleaning: Filtering out 'United Airlines - Pre 07/01/2013' ---")
FILTER_OUT_AIRLINE = "United Airlines - Pre 07/01/2013"

# A row is kept only when neither airline column carries the legacy label
operating_is_kept = col("Operating Airline") != FILTER_OUT_AIRLINE
published_is_kept = col("Published Airline") != FILTER_OUT_AIRLINE
df_clean = df_spark.filter(operating_is_kept & published_is_kept)

# Verification: totals per (operating, published) airline pair after cleaning
print("\nTop 20 Airlines by Total Passengers (Post-Cleaning Verification):")
totals_after_cleaning = df_clean.groupBy("Operating Airline", "Published Airline").agg(
    sum("Adjusted Passenger Count").alias("Total_Passengers")
)
totals_after_cleaning.orderBy(desc("Total_Passengers")).show(20, truncate=False)

cleaned_companies_count = df_clean.select("Operating Airline").distinct().count()
print(f"Number of distinct airlines in the cleaned dataset: {cleaned_companies_count}")

## B) Average passenger count per operating airline
print("\n--- Average Passenger Count per Operating Airline ---")

# Average per airline, rounded to two decimals, largest average first.
# `round` is the Spark column function imported above, not the builtin.
rounded_average = round(avg(col("Passenger Count")), 2).alias("Average_Passengers")
df_avg_passengers = (
    df_clean.groupBy("Operating Airline")
    .agg(rounded_average)
    .orderBy(col("Average_Passengers").desc())
)

df_avg_passengers.show(20, truncate=False)  # only the first 20 airlines

## C) Keep one row per 'GEO Region': the one with the highest 'Passenger Count'
print("\n--- Deduplication: Unique Geo Regions by Max Passenger Count ---")
# Rows are numbered inside each region, highest passenger count first
by_region = Window.partitionBy("GEO Region")
window_spec = by_region.orderBy(desc("Passenger Count"))

# Attach the per-region row number as a helper column
df_ranked = df_clean.withColumn("row_num", row_number().over(window_spec))

print("Sample of ranked data:")
df_ranked.show(5)

# Row number 1 is the top row of its region; the helper column is dropped afterwards
is_top_row = col("row_num") == 1
df_unique_geo_region = df_ranked.filter(is_top_row).drop("row_num")

print("\nUnique 'GEO Region' results with maximum passenger count:")
df_unique_geo_region.show(5)

## D) Export both result sets to a single CSV output
print("\n--- Exporting Results to CSV ---")

# Tag every row with the result set it comes from
df_avg_passengers_csv = df_avg_passengers.withColumn("Result_Type", lit("Average_Passengers_by_Airline"))
df_unique_geo_region_csv = df_unique_geo_region.withColumn("Result_Type", lit("Unique_Geo_Region"))

# Project each result set onto the same three-column layout
airline_rows = df_avg_passengers_csv.select(
    col("Operating Airline").alias("Key_Attribute_1"),
    col("Average_Passengers").alias("Value_Attribute"),
    "Result_Type",
)
geo_region_rows = df_unique_geo_region_csv.select(
    col("GEO Region").alias("Key_Attribute_1"),
    col("Passenger Count").alias("Value_Attribute"),
    "Result_Type",
)

# Stack the airline rows on top of the geo-region rows
df_combined_results = airline_rows.union(geo_region_rows)

# Output location on Google Drive
OUTPUT_CSV_PATH = "/content/drive/MyDrive/Colab Notebooks/PROYECTO_FINAL/results/final_project_results_csv"

# coalesce(1) collapses the data to one partition so a single file is written;
# "overwrite" lets the cell be run more than once against the same path.
csv_writer = df_combined_results.coalesce(1).write.mode("overwrite").option("header", True)
csv_writer.csv(OUTPUT_CSV_PATH)

print(f"Results successfully saved to: {OUTPUT_CSV_PATH}")

# ------------------------------------------------------------------------------

# ==============================================================================
# 4. STATISTICAL ANALYSIS: MEAN AND STANDARD DEVIATION
# ==============================================================================

from pyspark.sql.functions import avg, stddev

TARGET_COL = "Adjusted Passenger Count" # The target column for all calculations

def calculate_stats(df, group_col, target_col=None, top_n=20):
    """Print the mean and standard deviation of a column for each group.

    Args:
        df: Spark DataFrame to aggregate.
        group_col: Name of the column to group by.
        target_col: Name of the column to summarize. When omitted, the
            module-level ``TARGET_COL`` is used (looked up at call time).
        top_n: Number of groups to display, taken from the top of the
            ranking by mean. Defaults to 20.

    The result is only displayed with ``show``; nothing is returned.
    """
    if target_col is None:
        target_col = TARGET_COL
    print(f"\n--- Mean and Standard Deviation for '{group_col}' ---")
    # `round` is the Spark column function imported earlier in this file,
    # not the Python builtin; both statistics are rounded to one decimal.
    stats = df.groupBy(group_col).agg(
        round(avg(col(target_col)), 1).alias("Mean_Passengers"),
        round(stddev(col(target_col)), 1).alias("StdDev_Passengers"),
    )
    stats.orderBy(desc("Mean_Passengers")).show(top_n, truncate=False)

# Columns whose groups are summarized, in the order they are reported
ANALYSIS_COLS = [
    "Activity Period",
    "Month",
    "Year",
    "GEO Region",
    "GEO Summary",
    "Terminal",
    "Boarding Area",
    "Activity Type Code",
    "Adjusted Activity Type Code",
    "Operating Airline",
    "Published Airline",
    "Operating Airline IATA Code",
    "Published Airline IATA Code",
    "Price Category Code",
]

# 'Year' is summarized on the raw data because the cleaning step removed some
# historical rows; every other column is summarized on the cleaned data.
stats_source_overrides = {"Year": df_spark}

# Run the analysis once per column
for column in ANALYSIS_COLS:
    df_to_use = stats_source_overrides.get(column, df_clean)
    calculate_stats(df_to_use, column)

# ------------------------------------------------------------------------------

# ==============================================================================
# 5. FEATURE ENGINEERING AND CORRELATION ANALYSIS
# ==============================================================================

## Index the categorical features and correlate them with the numeric ones

# 1. Columns to leave out, columns to index, and columns used as they are
IGNORE_COLS = [
    'Passenger Count',
    'Activity Period',
    "Published Airline IATA Code",
    "Operating Airline IATA Code",
    "Adjusted Activity Type Code",
]
COLS_TO_INDEX = ['Month', 'Terminal', 'Boarding Area', 'GEO Region']
NUMERIC_COLS = ['Adjusted Passenger Count', 'Year']

# Keep every column of the cleaned data that is not on the ignore list
kept_columns = [c for c in df_clean.columns if c not in IGNORE_COLS]
df_corr = df_clean.select(*kept_columns)

# 2. String indexing: each indexer is fitted on, and applied to, the output of the previous one
print("\n--- Feature Indexing for Correlation ---")
indexers_corr = [
    StringIndexer(inputCol=c, outputCol=c + "_index", handleInvalid="skip")
    for c in COLS_TO_INDEX
]
df_indexed = df_corr
for indexer in indexers_corr:
    fitted_indexer = indexer.fit(df_indexed)
    df_indexed = fitted_indexer.transform(df_indexed)

# 3. Assemble the numeric and the indexed columns into one feature vector
FINAL_COLS = NUMERIC_COLS + [c + "_index" for c in COLS_TO_INDEX]
vector_assembler_corr = VectorAssembler(inputCols=FINAL_COLS, outputCol="features")
df_vector = vector_assembler_corr.transform(df_indexed).select("features")

# 4. Correlation matrix (Pearson); head() yields None when there is no result row
print("Calculating correlation matrix...")
corr_matrix_head = Correlation.corr(df_vector, "features").head()
if corr_matrix_head is None:
    print("Could not calculate correlation matrix (data may be empty or contain non-finite values).")
    corr_matrix = None
else:
    corr_matrix = corr_matrix_head[0].toArray()

# 5. Flatten the upper triangle of the matrix and show the TOP 10 correlations
if corr_matrix is None:
    corr_df = None
else:
    n = len(FINAL_COLS)
    # One (Column 1, Column 2, Correlation Value) tuple per unordered column pair
    corr_list = [
        (FINAL_COLS[i], FINAL_COLS[j], float(corr_matrix[i, j]))
        for i in range(n)
        for j in range(i + 1, n)
    ]

    corr_df = spark.createDataFrame(corr_list, ["Column_1", "Column_2", "Correlation"])
    corr_df = corr_df.withColumn("Abs_Correlation", abs_col(col("Correlation")))

    print("\nTop 10 Strongest Correlations (Absolute Value):")
    strongest_first = corr_df.orderBy(col("Abs_Correlation").desc())
    strongest_first.select("Column_1", "Column_2", "Correlation").show(10, truncate=False)
# ------------------------------------------------------------------------------

# ==============================================================================
# 6. MISSING VALUE IMPUTATION AND DUPLICATE REMOVAL
# ==============================================================================

## A) Look for missing values (nulls)
print("\n--- Missing Value Check ---")
print("Data Summary (pre-imputation):")
df_clean.summary().show()

# One output column per input column, holding that column's null count.
# `sum` is the Spark aggregate imported earlier in this file, not the builtin.
null_counters = [
    sum(col(c).isNull().cast("integer")).alias(c)
    for c in df_clean.columns
]
missings_count = df_clean.select(null_counters)
print("\nMissing values count per column:")
missings_count.show()

## B) Impute Missing IATA Codes with the Mode
print("\n--- Imputation using Mode ---")


def _most_frequent_row(df, column):
    """Return the (value, count) row of the most frequent non-null value.

    Null is excluded from the candidates: it is the value being imputed, so
    it must never be picked as the replacement. Ties on the count are broken
    by the value itself (ascending) so repeated runs pick the same mode.
    Returns None when the column holds no non-null value.
    """
    return (
        df.dropna(subset=[column])
        .groupBy(column)
        .agg(count("*").alias("count"))
        .orderBy(desc("count"), column)
        .first()
    )


# Mode of 'Operating Airline IATA Code' ("N/A" when the column is entirely null)
mode_op_iata = _most_frequent_row(df_clean, "Operating Airline IATA Code")
operating_airline_iata_mode = mode_op_iata[0] if mode_op_iata is not None else "N/A"

# Mode of 'Published Airline IATA Code' ("N/A" when the column is entirely null)
mode_pub_iata = _most_frequent_row(df_clean, "Published Airline IATA Code")
published_airline_iata_mode = mode_pub_iata[0] if mode_pub_iata is not None else "N/A"

print(f"Mode for 'Operating Airline IATA Code': {operating_airline_iata_mode}")
print(f"Mode for 'Published Airline IATA Code': {published_airline_iata_mode}")

# Fill null values with the calculated modes
df_filled = df_clean.fillna({
    "Operating Airline IATA Code": operating_airline_iata_mode,
    "Published Airline IATA Code": published_airline_iata_mode
})

print("\nData Summary (post-imputation verification):")
df_filled.summary().show()

## C) Drop exact duplicate rows
print("\n--- Duplicate Removal ---")
df_unique = df_filled.dropDuplicates()

# Sum of the target column over the cleaned, de-duplicated data
total_alias = "Total_Adjusted_Passengers_Unique"
totals_row = df_unique.agg(sum(TARGET_COL).alias(total_alias)).collect()[0]
total_passengers = totals_row[total_alias]
print(f"Total Adjusted Passengers (After Deduplication): {total_passengers:,.0f}")

# ------------------------------------------------------------------------------

# ==============================================================================
# 7. REGRESSION MODELING: DECISION TREE REGRESSOR
# ==============================================================================

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

## A) Prepare data for modeling
print("\n--- Decision Tree Regressor Model Preparation ---")

# Columns carried into the modeling DataFrame; rows without a target value are dropped
model_columns_dt = [
    TARGET_COL, "Operating Airline", "Month", "GEO Region", "Activity Type Code",
    "Terminal", "Published Airline", "Boarding Area", "Year",
    "Operating Airline IATA Code", "Published Airline IATA Code",
]
df_model_dt = df_unique.select(*model_columns_dt).filter(col(TARGET_COL).isNotNull())

# Columns turned into numeric indices by StringIndexer.
# NOTE(review): "Month" is selected above but is not listed here, so it never
# reaches the feature vector - confirm that this is intended.
categorical_cols_dt = [
    "Operating Airline",
    "GEO Region",
    "Activity Type Code",
    "Terminal",
    "Published Airline",
    "Boarding Area",
    "Year",
    "Operating Airline IATA Code",
    "Published Airline IATA Code",
]

# One indexer per categorical column; rows with invalid values are skipped
indexers_dt = [
    StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid="skip")
    for name in categorical_cols_dt
]

# The feature vector is made of the indexed columns only
feature_cols_dt = [name + "_index" for name in categorical_cols_dt]
assembler_dt = VectorAssembler(inputCols=feature_cols_dt, outputCol="features")

# Indexers first, assembler last; fitted and applied on the same DataFrame
pipeline_dt = Pipeline(stages=[*indexers_dt, assembler_dt])
fitted_pipeline_dt = pipeline_dt.fit(df_model_dt)
df_final_dt = fitted_pipeline_dt.transform(df_model_dt)

## B) Split data (80% train / 20% test, fixed seed)
train_data_dt, test_data_dt = df_final_dt.randomSplit([0.8, 0.2], seed=42)
train_count_dt = train_data_dt.count()
test_count_dt = test_data_dt.count()
print(f"Training records: {train_count_dt}, Test records: {test_count_dt}")

## C) Train and predict
print("\nTraining Decision Tree Regressor...")
dec_tree = DecisionTreeRegressor(
    featuresCol="features", labelCol=TARGET_COL,
    maxDepth=10, minInstancesPerNode=50, maxBins=80,
)

model_dt = dec_tree.fit(train_data_dt)
predictions_dt = model_dt.transform(test_data_dt)

print("\nSample Predictions:")
predictions_dt.select(TARGET_COL, "prediction").show(5)

## D) Evaluate model
# One evaluator serves both metrics: the metric name is overridden per call
evaluator_dt = RegressionEvaluator(labelCol=TARGET_COL, predictionCol="prediction")
metric_param_dt = evaluator_dt.metricName
r2_dt = evaluator_dt.evaluate(predictions_dt, {metric_param_dt: "r2"})
rmse_dt = evaluator_dt.evaluate(predictions_dt, {metric_param_dt: "rmse"})

print(f"\nDecision Tree - R2: {r2_dt:.4f}")
print(f"Decision Tree - RMSE: {rmse_dt:,.1f}")

# ------------------------------------------------------------------------------

# ==============================================================================
# 8. REGRESSION MODELING: RANDOM FOREST REGRESSOR
# ==============================================================================

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import MinMaxScaler

## A) Feature engineering for Month (manual indexing)
print("\n--- Random Forest Regressor Model Preparation ---")

# Month names in calendar order; a name's 1-based position is its number
month_names = [
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
]

# Build the when(...).when(...) chain one month at a time
month_num_expr = None
for month_number, month_name in enumerate(month_names, start=1):
    is_this_month = col("Month") == month_name
    if month_num_expr is None:
        month_num_expr = when(is_this_month, month_number)
    else:
        month_num_expr = month_num_expr.when(is_this_month, month_number)

# Anything that is not one of the twelve names maps to 0
df_with_month_num = df_unique.withColumn("Month_Num", month_num_expr.otherwise(0))

# Columns carried into the Random Forest modeling DataFrame;
# rows without a target value are dropped
model_columns_rf = [
    TARGET_COL, "Operating Airline", "Month_Num", "GEO Region",
    "Activity Type Code", "Terminal", "Published Airline", "Year",
    "Boarding Area", "Operating Airline IATA Code", "Published Airline IATA Code",
]
df_model_rf = df_with_month_num.select(*model_columns_rf).filter(col(TARGET_COL).isNotNull())

## B) Pipeline for indexing and assembling
categorical_cols_rf = [
    "Operating Airline",
    "GEO Region",
    "Activity Type Code",
    "Terminal",
    "Published Airline",
    "Boarding Area",
    "Operating Airline IATA Code",
    "Published Airline IATA Code",
]

# One StringIndexer per categorical column
indexers_rf = [
    StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid="skip")
    for name in categorical_cols_rf
]

# Feature columns: numeric Month_Num and Year first, then the indexed categoricals
indexed_cols_rf = [col_name + "_index" for col_name in categorical_cols_rf]
feature_cols_rf = ["Month_Num", "Year"] + indexed_cols_rf
vector_assembler_rf = VectorAssembler(
    inputCols=feature_cols_rf, outputCol="features", handleInvalid="skip"
)

# Full pipeline: indexers first, assembler last; fitted and applied on the same DataFrame
pipeline_rf = Pipeline(stages=[*indexers_rf, vector_assembler_rf])
fitted_pipeline_rf = pipeline_rf.fit(df_model_rf)
df_final_rf = fitted_pipeline_rf.transform(df_model_rf)


## C) Split data (80% train / 20% test, fixed seed)
train_data_rf, test_data_rf = df_final_rf.randomSplit([0.8, 0.2], seed=42)
train_count_rf = train_data_rf.count()
print(f"Training records: {train_count_rf}")
test_count_rf = test_data_rf.count()
print(f"Test records: {test_count_rf}")

## D) Train and predict (Random Forest)
print("\nTraining Random Forest Regressor...")
rf = RandomForestRegressor(
    featuresCol="features", labelCol=TARGET_COL,
    numTrees=50, maxDepth=10, maxBins=128, seed=42,
)

model_rf = rf.fit(train_data_rf)
predictions_rf = model_rf.transform(test_data_rf)

print("\nSample Predictions:")
predictions_rf.select(TARGET_COL, "prediction").show(5)

## E) Evaluate model
# Both evaluators read the same label and prediction columns; only the metric differs
evaluator_columns_rf = {"labelCol": TARGET_COL, "predictionCol": "prediction"}
evaluator_rmse_rf = RegressionEvaluator(metricName="rmse", **evaluator_columns_rf)
evaluator_r2_rf = RegressionEvaluator(metricName="r2", **evaluator_columns_rf)

rmse_rf = evaluator_rmse_rf.evaluate(predictions_rf)
r2_rf = evaluator_r2_rf.evaluate(predictions_rf)

print(f"\nRandom Forest RMSE: {rmse_rf:,.1f}")
print(f"Random Forest R2: {r2_rf:.4f}")

# ------------------------------------------------------------------------------

# ==============================================================================
# 9. ADVANCED RANDOM FOREST REGRESSOR (HYPERPARAMETER TUNING/REFINEMENT)
# ==============================================================================

# Note: This section re-runs a Random Forest model with different hyperparameters
# (numTrees=100, maxDepth=15) and a refined pipeline.

print("\n--- Advanced Random Forest Regressor (Refined) ---")

# Re-use df_with_month_num from the previous section
df_clean_for_advanced_rf = df_with_month_num

# 1. Define columns for indexing
categorical_cols_adv = [
    "Operating Airline", "Published Airline", "Activity Type Code", "Terminal",
    "Boarding Area", "GEO Region", "Operating Airline IATA Code",
    "Published Airline IATA Code"
]
numeric_cols_adv = ["Year", "Month_Num"] # Using the Month_Num column

# Output columns produced by this section's indexers (note the capital "I")
index_cols_adv = [c + "_Index" for c in categorical_cols_adv]

# 2. Drop temporary index/feature columns if they exist from previous runs.
# Both suffix spellings are checked: "_Index" is what this section creates,
# "_index" is what the earlier modeling sections create. Checking only the
# lowercase spelling would leave this section's own leftovers in place.
stale_candidates = index_cols_adv + [c + "_index" for c in categorical_cols_adv] + ["features"]
cols_to_drop = [c for c in stale_candidates if c in df_clean_for_advanced_rf.columns]

# Rows without a target value are removed as well, as in the Decision Tree and
# first Random Forest sections, so all three models see the same kind of rows.
df_clean_adv = df_clean_for_advanced_rf.drop(*cols_to_drop).dropna(subset=[TARGET_COL])

# 3. Define the full Pipeline: Indexing + Assembling
indexers_adv = [
    StringIndexer(inputCol=c, outputCol=c + "_Index", handleInvalid="skip")
    for c in categorical_cols_adv
]
feature_cols_adv = numeric_cols_adv + index_cols_adv
# handleInvalid="skip" matches the assembler of the first Random Forest
# section: rows with an invalid (e.g. null) feature value are filtered out
# instead of making the transform fail.
assembler_adv = VectorAssembler(
    inputCols=feature_cols_adv, outputCol="features", handleInvalid="skip"
)
full_pipeline_adv = Pipeline(stages=indexers_adv + [assembler_adv])

# Apply transformations (the pipeline is fitted and applied on the same DataFrame)
df_prepared_adv = full_pipeline_adv.fit(df_clean_adv).transform(df_clean_adv)

# 4. Split Data (80% train / 20% test, fixed seed)
train_df_adv, test_df_adv = df_prepared_adv.randomSplit([0.8, 0.2], seed=42)
print(f"Training records: {train_df_adv.count()}")
print(f"Test records: {test_df_adv.count()}")

# 5. Define and Train the Random Forest Model
rf_adv = RandomForestRegressor(
    featuresCol="features",
    labelCol=TARGET_COL,
    numTrees=100,  # Increased trees
    maxDepth=15,   # Increased depth
    maxBins=128,
    seed=42)

model_rf_adv = rf_adv.fit(train_df_adv)

# 6. Predict and Evaluate
predictions_adv = model_rf_adv.transform(test_df_adv)
predictions_adv.select(TARGET_COL, "prediction").show(5)

evaluator_rmse_adv = RegressionEvaluator(labelCol=TARGET_COL, predictionCol="prediction", metricName="rmse")
evaluator_r2_adv = RegressionEvaluator(labelCol=TARGET_COL, predictionCol="prediction", metricName="r2")

rmse_adv = evaluator_rmse_adv.evaluate(predictions_adv)
r2_adv = evaluator_r2_adv.evaluate(predictions_adv)

print(f"\nRefined Random Forest RMSE: {rmse_adv:,.2f}")
print(f"Refined Random Forest R2: {r2_adv:.4f}")

# Stop Spark Session
spark.stop()
print("\nSpark Session stopped.")

Mounting Google Drive...
Mounted at /content/drive
Creating Spark Session...
Reading data from: /content/drive/MyDrive/Colab Notebooks/PROYECTO_FINAL/Air_Traffic_Passenger_Statistics.csv

--- DataFrame Schema ---
root
 |-- Activity Period: integer (nullable = true)
 |-- Operating Airline: string (nullable = true)
 |-- Operating Airline IATA Code: string (nullable = true)
 |-- Published Airline: string (nullable = true)
 |-- Published Airline IATA Code: string (nullable = true)
 |-- GEO Summary: string (nullable = true)
 |-- GEO Region: string (nullable = true)
 |-- Activity Type Code: string (nullable = true)
 |-- Price Category Code: string (nullable = true)
 |-- Terminal: string (nullable = true)
 |-- Boarding Area: string (nullable = true)
 |-- Passenger Count: integer (nullable = true)
 |-- Adjusted Activity Type Code: string (nullable = true)
 |-- Adjusted Passenger Count: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)


--- Air