# PySpark ETL Pipeline - Exploration & Development

This notebook documents the exploration and development of our PySpark-based ETL pipeline for the Flight Airfare Prediction project.

## 1. Environment Setup

In [1]:
# Standard imports
import sys
import os
from pathlib import Path

# Add project root to path.
# Compare the working directory's *name* instead of searching the whole path
# string: a substring test also matches any ancestor folder that merely
# contains "notebooks" (e.g. ~/notebooks-archive/project) and would then pick
# the wrong root.
project_root = Path.cwd().parent if Path.cwd().name == 'notebooks' else Path.cwd()
# Only insert when the root is not already first, so re-running this cell does
# not stack duplicate entries on sys.path.
if not sys.path or sys.path[0] != str(project_root):
    sys.path.insert(0, str(project_root))
print(f"Project root: {project_root}")

Project root: /Users/proxim/projects/Flight-Airfare-Prediction-on-Azure-with-WebUI


In [2]:
# PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Data processing
import pandas as pd
import numpy as np

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Suppress warnings
# NOTE(review): filterwarnings('ignore') with no category silences every
# warning raised by later cells, including deprecation notices — consider
# narrowing it to specific categories/modules once the noisy source is known.
import warnings
warnings.filterwarnings('ignore')

print("✓ Libraries imported successfully")

✓ Libraries imported successfully


In [None]:
# Build the notebook's Spark session, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("FlightETL-Exploration")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Limit Spark's own logging to WARN and above
spark.sparkContext.setLogLevel("WARN")

print(f"Spark Version: {spark.version}")
print(f"App Name: {spark.sparkContext.appName}")

## 2. Data Exploration with PySpark

In [3]:
# Read the raw training CSV: first row is the header, column types are inferred
data_path = str(project_root / "data" / "train.csv")

raw_df = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv(data_path)
)

print(f"Loaded {raw_df.count()} rows from {data_path}")
print(f"Columns: {raw_df.columns}")

NameError: name 'spark' is not defined

In [None]:
# Schema inspection: print the column names and the types Spark inferred on load
raw_df.printSchema()

In [None]:
# Sample data preview: first 5 rows, with cell values printed in full (truncate=False)
raw_df.show(5, truncate=False)

In [None]:
# Data statistics: display the summary table produced by describe()
raw_df.describe().show()

In [None]:
# Null analysis
# All per-column null counts are computed in ONE aggregation. The earlier loop
# ran filter().count() plus a full raw_df.count() for every column, i.e. two
# Spark jobs per column.
total_rows = raw_df.count()

# F.when(...) without otherwise() yields NULL for non-null cells, and F.count
# skips NULLs, so each aggregate counts exactly the null cells of its column.
null_row = raw_df.agg(*[
    F.count(F.when(F.col(name).isNull(), 1)).alias(name)
    for name in raw_df.columns
]).first()

# A Row is a tuple in column order, so it can be zipped with the column names.
# The percentage falls back to 0.0 on an empty frame instead of dividing by zero.
null_counts = [
    (name, n_null, n_null / total_rows * 100 if total_rows else 0.0)
    for name, n_null in zip(raw_df.columns, null_row)
]

null_df = pd.DataFrame(null_counts, columns=['Column', 'Null_Count', 'Null_Percentage'])
print("Null Analysis:")
print(null_df.to_string(index=False))

## 3. Bronze Layer - Raw Data Ingestion

The Bronze layer stores raw data exactly as received from the source, with added metadata for lineage tracking.

In [None]:
# Experiment: Adding ingestion metadata
import uuid
from datetime import datetime

# Short batch identifier: the first 8 characters of a random UUID4 string
batch_id = str(uuid.uuid4())[:8]

# Append the lineage columns (all prefixed with "_") to the raw frame
bronze_df = (
    raw_df
    .withColumn("_ingestion_timestamp", F.current_timestamp())
    .withColumn("_source_file", F.lit(data_path))
    .withColumn("_batch_id", F.lit(batch_id))
    .withColumn("_ingestion_date", F.current_date())
)

print(f"Bronze layer columns: {bronze_df.columns}")
lineage_preview = bronze_df.select("_ingestion_timestamp", "_source_file", "_batch_id")
lineage_preview.show(3, truncate=False)

In [None]:
# Experiment: Schema definition for consistent data types
# Every source field is a nullable string, except Price which is a nullable integer.
BRONZE_SCHEMA = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in (
            "Airline",
            "Date_of_Journey",
            "Source",
            "Destination",
            "Route",
            "Dep_Time",
            "Arrival_Time",
            "Duration",
            "Total_Stops",
            "Additional_Info",
        )
    ]
    + [StructField("Price", IntegerType(), True)]
)

print("Bronze schema defined for consistent ingestion")

## 4. Silver Layer - Data Cleaning Experiments

In [None]:
# Experiment 1: Date parsing strategies
print("Testing date parsing...")
print(f"Sample dates: {raw_df.select('Date_of_Journey').distinct().take(5)}")

# Try different date formats
date_test = raw_df.withColumn(
    "parsed_date",
    F.to_date(F.col("Date_of_Journey"), "d/M/yyyy")
)

# Count successful parses. The row total is fetched once: every count() is a
# separate Spark job, and the previous f-string evaluated it twice.
# Rows whose Date_of_Journey is null also produce a null parsed_date, so they
# are counted as unparsed here.
total_rows = raw_df.count()
successful = date_test.filter(F.col("parsed_date").isNotNull()).count()
# Guard against an empty frame instead of raising ZeroDivisionError
success_pct = successful / total_rows * 100 if total_rows else 0.0
print(f"Successfully parsed: {successful}/{total_rows} ({success_pct:.1f}%)")

In [None]:
# Experiment 2: Duration parsing
print("Testing duration parsing...")
print(f"Sample durations: {raw_df.select('Duration').distinct().take(10)}")

# Column expressions: take the digits directly in front of "h" / "m".
# When the marker letter is missing (or Duration is null) the value is 0.
duration_col = F.col("Duration")
hours_expr = F.when(
    duration_col.contains("h"),
    F.regexp_extract(duration_col, r"(\d+)h", 1).cast("int"),
).otherwise(0)
mins_expr = F.when(
    duration_col.contains("m"),
    F.regexp_extract(duration_col, r"(\d+)m", 1).cast("int"),
).otherwise(0)

# Parse duration to minutes: hours * 60 + minutes
duration_test = (
    raw_df
    .withColumn("Duration_Hours", hours_expr)
    .withColumn("Duration_Mins", mins_expr)
    .withColumn("Duration_Minutes", F.col("Duration_Hours") * 60 + F.col("Duration_Mins"))
)

duration_test.select("Duration", "Duration_Hours", "Duration_Mins", "Duration_Minutes").show(10)

In [None]:
# Experiment 3: Stops parsing
print("Testing stops parsing...")
print(f"Unique stops: {raw_df.select('Total_Stops').distinct().collect()}")

# Build the CASE expression step by step: "non-stop" -> 0, then the known
# "<n> stop(s)" labels -> n, and finally a regex fallback that casts the first
# run of digits found in any other label.
stops_col = F.col("Total_Stops")
stops_num = F.when(stops_col == "non-stop", 0)
for stop_count, label in enumerate(["1 stop", "2 stops", "3 stops", "4 stops"], start=1):
    stops_num = stops_num.when(stops_col == label, stop_count)
stops_num = stops_num.otherwise(F.regexp_extract(stops_col, r"(\d+)", 1).cast("int"))

stops_test = raw_df.withColumn("Total_Stops_Num", stops_num)

stops_test.groupBy("Total_Stops", "Total_Stops_Num").count().show()

In [None]:
# Experiment 4: String cleaning
print("Testing string standardization...")

# Add trimmed Airline and trimmed + init-capped Source next to the originals
clean_test = (
    raw_df
    .withColumn("Airline_Clean", F.trim(F.col("Airline")))
    .withColumn("Source_Clean", F.initcap(F.trim(F.col("Source"))))
)

# Compare before/after: rows where trimming changed the Airline value
airline_changed = F.col("Airline") != F.col("Airline_Clean")
differences = clean_test.filter(airline_changed).count()
print(f"Records with whitespace issues in Airline: {differences}")

## 5. Gold Layer - Feature Engineering Experiments

In [None]:
# Create silver-like clean data for feature experiments

# "HH:MM" departure time split into its two parts
dep_time_parts = F.split(F.col("Dep_Time"), ":")

# Duration pieces, each defaulting to 0 when its marker letter is absent
duration_hours_as_minutes = F.when(
    F.col("Duration").contains("h"),
    F.regexp_extract(F.col("Duration"), r"(\d+)h", 1).cast("int") * 60,
).otherwise(0)
duration_extra_minutes = F.when(
    F.col("Duration").contains("m"),
    F.regexp_extract(F.col("Duration"), r"(\d+)m", 1).cast("int"),
).otherwise(0)

# "non-stop" -> 0, otherwise the first run of digits in the label
stops_as_int = F.when(F.col("Total_Stops") == "non-stop", 0).otherwise(
    F.regexp_extract(F.col("Total_Stops"), r"(\d+)", 1).cast("int")
)

silver_df = (
    raw_df
    .withColumn("Journey_Date", F.to_date(F.col("Date_of_Journey"), "d/M/yyyy"))
    .withColumn("Journey_Day", F.dayofmonth("Journey_Date"))
    .withColumn("Journey_Month", F.month("Journey_Date"))
    .withColumn("Journey_Year", F.year("Journey_Date"))
    .withColumn("Journey_DayOfWeek", F.dayofweek("Journey_Date"))
    .withColumn("Dep_Hour", dep_time_parts.getItem(0).cast("int"))
    .withColumn("Dep_Minute", dep_time_parts.getItem(1).cast("int"))
    .withColumn("Duration_Minutes", duration_hours_as_minutes + duration_extra_minutes)
    .withColumn("Total_Stops_Num", stops_as_int)
)

print("Silver data created for feature experiments")
silver_df.printSchema()

In [None]:
# Experiment 5: Time-based features
print("Creating time-based features...")

# Boolean condition behind each 0/1 flag, in the order the columns are added.
# Weekend uses day-of-week values 1 and 7; the night bucket wraps around
# midnight (hour >= 21 or hour < 5).
dep_hour = F.col("Dep_Hour")
time_flags = {
    "IsWeekend": F.col("Journey_DayOfWeek").isin([1, 7]),
    "IsMorningFlight": (dep_hour >= 5) & (dep_hour < 12),
    "IsAfternoonFlight": (dep_hour >= 12) & (dep_hour < 17),
    "IsEveningFlight": (dep_hour >= 17) & (dep_hour < 21),
    "IsNightFlight": (dep_hour >= 21) | (dep_hour < 5),
    "Is_Direct": F.col("Total_Stops_Num") == 0,
}

gold_df = silver_df
for flag_name, condition in time_flags.items():
    # 1 when the condition holds, 0 otherwise (including when it evaluates to null)
    gold_df = gold_df.withColumn(flag_name, F.when(condition, 1).otherwise(0))

# Verify feature distributions
print("\nFeature distributions:")
gold_df.groupBy("IsMorningFlight", "IsAfternoonFlight", "IsEveningFlight", "IsNightFlight").count().show()

In [None]:
# Experiment 6: Route-based features
print("Creating route features...")

# Number of " → "-separated pieces in Route; rows with a null Route get 2
route_segments = F.when(
    F.col("Route").isNotNull(),
    F.size(F.split(F.col("Route"), " → ")),
).otherwise(2)
# "<Source>_<Destination>" key
city_pair = F.concat_ws("_", F.col("Source"), F.col("Destination"))

gold_df = (
    gold_df
    .withColumn("Route_Segments", route_segments)
    .withColumn("City_Pair", city_pair)
)

# Route complexity analysis
print("\nRoute complexity distribution:")
route_complexity = (
    gold_df
    .groupBy("Route_Segments")
    .agg(
        F.count("*").alias("count"),
        F.mean("Price").alias("avg_price"),
        F.mean("Duration_Minutes").alias("avg_duration"),
    )
    .orderBy("Route_Segments")
)
route_complexity.show()

In [None]:
# Experiment 7: Statistical features (price aggregations)
print("Creating statistical features...")

# NOTE(review): these aggregates are derived from Price, which appears to be
# the prediction target, over the whole dataset. Price_Deviation_Airline is a
# direct function of each row's own Price. Confirm how these are used before
# feeding them to a model (compute on the training split only).

# Start from a frame WITHOUT this cell's own output columns. gold_df is
# reassigned below, so on a second run of the cell the joins would otherwise
# add the statistics twice and leave ambiguous duplicate column names.
# DataFrame.drop() ignores names that are not present, so the first run is
# unaffected.
features_df = gold_df.drop(
    "Price_mean_by_airline",
    "Price_std_by_airline",
    "Price_mean_by_route",
    "Route_frequency",
)

# Average price by airline
airline_stats = features_df.groupBy("Airline").agg(
    F.mean("Price").alias("Price_mean_by_airline"),
    F.stddev("Price").alias("Price_std_by_airline")
)

# Average price by route
route_stats = features_df.groupBy("City_Pair").agg(
    F.mean("Price").alias("Price_mean_by_route"),
    F.count("*").alias("Route_frequency")
)

# Join back to main dataframe
gold_df = (
    features_df
    .join(airline_stats, on="Airline", how="left")
    .join(route_stats, on="City_Pair", how="left")
)

# Price deviation from airline mean (z-score). Falls back to 0 when the
# airline's standard deviation is null or not positive.
gold_df = gold_df.withColumn(
    "Price_Deviation_Airline",
    F.when(F.col("Price_std_by_airline") > 0,
        (F.col("Price") - F.col("Price_mean_by_airline")) / F.col("Price_std_by_airline")
    ).otherwise(0)
)

print("Statistical features created")
gold_df.select("Airline", "Price", "Price_mean_by_airline", "Price_Deviation_Airline").show(10)

## 6. Feature Correlation Analysis

In [None]:
# Convert to Pandas for correlation analysis
numeric_cols = [
    "Journey_Day", "Journey_Month", "Dep_Hour", "Dep_Minute",
    "Duration_Minutes", "Total_Stops_Num", "IsWeekend",
    "IsMorningFlight", "IsEveningFlight", "Is_Direct",
    "Route_Segments", "Price"
]

# Sample for visualization. The fixed seed keeps the sampled rows — and so the
# heatmap and the correlations printed in the next cell — stable between runs
# on the same input (previously every run drew a different 30% sample).
sample_pd = gold_df.select(numeric_cols).sample(fraction=0.3, seed=42).toPandas()

# Correlation matrix, drawn on an explicit figure/axes pair
correlation = sample_pd.corr()
fig, ax = plt.subplots(figsize=(12, 10))
sns.heatmap(correlation, annot=True, cmap='coolwarm', center=0, fmt='.2f', ax=ax)
ax.set_title('Feature Correlation Matrix')
fig.tight_layout()
plt.show()

In [None]:
# Price correlations: every feature's correlation with Price, largest value first
print("Correlation with Price:")
price_corr = correlation.loc[:, 'Price'].sort_values(ascending=False)
print(price_corr)

## 7. Conclusions & Next Steps

### Key Findings:

1. **Date Parsing**: Format `d/M/yyyy` works correctly for all records
2. **Duration**: Regex extraction handles all variations including edge cases
3. **Stops**: Categorical to numeric mapping is straightforward
4. **Time Features**: Morning/evening flight indicators show correlation with price
5. **Route Complexity**: More segments = higher price (correlation confirmed)

### Medallion Architecture Benefits:
- Bronze: Preserves raw data for audit/replay
- Silver: Clean, typed data for reliable transformations
- Gold: ML-ready features with no additional preprocessing needed

### Recommended Features for ML:
- Duration_Minutes
- Total_Stops_Num
- Time-based flags (IsWeekend, IsMorningFlight, etc.)
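- Statistical features (Price_mean_by_airline, Price_mean_by_route) — these are derived from the target `Price`, so compute them on the training split only and apply the learned values to validation/test data to avoid target leakage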

In [None]:
# Cleanup: stop the Spark session now that the exploration is finished.
# After this cell, any cell that uses `spark` needs the session cell re-run first.
spark.stop()
print("✓ Spark session stopped")