### streaming_data

- ####flights data

In [0]:
#CONFIGURATION

BRONZE_PATH = "dbfs:/FileStore/tables/sahan_project/bronze/streaming_data"
SILVER_PATH = "dbfs:/FileStore/tables/sahan_project/silver/flights"

In [0]:
# IMPORTS

from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    col, lit, concat_ws, to_date, when, expr
)
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window


In [0]:
#READ BRONZE DATA

bronze_df = spark.read.format("delta").load(BRONZE_PATH)

In [0]:
#CREATE AUTHORITATIVE FLIGHT DATE

silver_df = bronze_df.withColumn(
    "FLIGHT_DATE",
    to_date(concat_ws("-", col("YEAR"), col("MONTH"), col("DAY")), "yyyy-M-d")
)


In [0]:
#TIMESTAMP CONVERSION (HHmm)

def hhmm_to_timestamp_safe(df: DataFrame, time_col: str) -> DataFrame:
    return df.withColumn(
        f"{time_col}_TS",
        expr(f"""
            CASE
                WHEN {time_col} IS NULL THEN NULL
                WHEN trim({time_col}) = '' THEN NULL
                WHEN length(lpad({time_col}, 4, '0')) != 4 THEN NULL
                ELSE try_to_timestamp(
                    concat(FLIGHT_DATE, ' ', lpad({time_col}, 4, '0')),
                    'yyyy-MM-dd HHmm'
                )
            END
        """)
    )

TIME_COLUMNS = [
    "SCHEDULED_DEPARTURE",
    "DEPARTURE_TIME",
    "WHEELS_OFF",
    "WHEELS_ON",
    "SCHEDULED_ARRIVAL",
    "ARRIVAL_TIME"
]

for c in TIME_COLUMNS:
    if c in silver_df.columns:
        silver_df = hhmm_to_timestamp_safe(silver_df, c)

# Drop raw HHmm columns (never keep them in Silver)
silver_df = silver_df.drop(*[c for c in TIME_COLUMNS if c in silver_df.columns])


In [0]:
# CAST DURATION / DELAY COLUMNS

DURATION_COLUMNS = [
    "DEPARTURE_DELAY",
    "TAXI_OUT",
    "TAXI_IN",
    "AIR_TIME",
    "ELAPSED_TIME",
    "ARRIVAL_DELAY",
    "AIR_SYSTEM_DELAY",
    "SECURITY_DELAY",
    "AIRLINE_DELAY",
    "LATE_AIRCRAFT_DELAY",
    "WEATHER_DELAY"
]

existing_columns = set(silver_df.columns)

# Cast existing columns
for c in DURATION_COLUMNS:
    if c in existing_columns:
        silver_df = silver_df.withColumn(c, col(c).cast(IntegerType()))

# Add missing duration columns as NULL (stable schema)
existing_columns = set(silver_df.columns)
for c in DURATION_COLUMNS:
    if c not in existing_columns:
        silver_df = silver_df.withColumn(c, lit(None).cast(IntegerType()))


In [0]:
#NORMALIZE NEGATIVE DELAYS

silver_df = (
    silver_df
    .withColumn(
        "DEPARTURE_DELAY",
        when(col("DEPARTURE_DELAY").isNull(), None)
        .when(col("DEPARTURE_DELAY") < 0, 0)
        .otherwise(col("DEPARTURE_DELAY"))
    )
    .withColumn(
        "ARRIVAL_DELAY",
        when(col("ARRIVAL_DELAY").isNull(), None)
        .when(col("ARRIVAL_DELAY") < 0, 0)
        .otherwise(col("ARRIVAL_DELAY"))
    )
)

In [0]:
#DATA QUALITY FILTERS

silver_df = silver_df.filter(
    col("FLIGHT_DATE").isNotNull() &
    col("AIRLINE").isNotNull() &
    col("FLIGHT_NUMBER").isNotNull() &
    col("ORIGIN_AIRPORT").isNotNull() &
    col("DESTINATION_AIRPORT").isNotNull()
)

In [0]:
#DEDUPLICATION 

DEDUP_KEYS = [
    "FLIGHT_DATE",
    "AIRLINE",
    "FLIGHT_NUMBER",
    "ORIGIN_AIRPORT",
    "DESTINATION_AIRPORT"
]

silver_df = (
    silver_df
    .withColumn(
        "rn",
        expr("""
            row_number() OVER (
                PARTITION BY FLIGHT_DATE, AIRLINE, FLIGHT_NUMBER,
                             ORIGIN_AIRPORT, DESTINATION_AIRPORT
                ORDER BY INGESTED_AT DESC
            )
        """)
    )
    .filter(col("rn") == 1)
    .drop("rn")
)


In [0]:
#FINAL SILVER COLUMN

FINAL_COLUMNS = [
    "FLIGHT_DATE",
    "YEAR", "MONTH", "DAY", "DAY_OF_WEEK",
    "AIRLINE", "FLIGHT_NUMBER", "TAIL_NUMBER",
    "ORIGIN_AIRPORT", "DESTINATION_AIRPORT",

    "SCHEDULED_DEPARTURE_TS",
    "DEPARTURE_TIME_TS",
    "WHEELS_OFF_TS",
    "WHEELS_ON_TS",
    "SCHEDULED_ARRIVAL_TS",
    "ARRIVAL_TIME_TS",

    "DEPARTURE_DELAY", "ARRIVAL_DELAY",
    "TAXI_OUT", "TAXI_IN", "AIR_TIME", "ELAPSED_TIME",

    "CANCELLED", "DIVERTED", "CANCELLATION_REASON",

    "INGESTED_AT", "SOURCE_FILE"
]

existing_columns = set(silver_df.columns)
final_columns_safe = [c for c in FINAL_COLUMNS if c in existing_columns]

silver_df = silver_df.select(*final_columns_safe)


In [0]:
#WRITE SILVER TABLE

(
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("YEAR", "MONTH")
    .option("overwriteSchema", "true")
    .save(SILVER_PATH)
)

In [0]:
#VALIDATION

spark.read.format("delta").load(SILVER_PATH).printSchema()
spark.read.format("delta").load(SILVER_PATH).limit(5).show(truncate=False)

root
 |-- FLIGHT_DATE: date (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE_TS: timestamp (nullable = true)
 |-- DEPARTURE_TIME_TS: timestamp (nullable = true)
 |-- WHEELS_OFF_TS: timestamp (nullable = true)
 |-- WHEELS_ON_TS: timestamp (nullable = true)
 |-- SCHEDULED_ARRIVAL_TS: timestamp (nullable = true)
 |-- ARRIVAL_TIME_TS: timestamp (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- 

### Batch_data

In [0]:
#imports

from pyspark.sql.functions import col, trim, upper, current_timestamp, row_number
from pyspark.sql.window import Window

In [0]:
# Paths
bronze_airlines_path = "dbfs:/FileStore/tables/sahan_project/bronze/batch_data/airlines"
bronze_airports_path = "dbfs:/FileStore/tables/sahan_project/bronze/batch_data/airports"
 
silver_airlines_path = "dbfs:/FileStore/tables/sahan_project/silver/airlines"
silver_airports_path = "dbfs:/FileStore/tables/sahan_project/silver/airports"

- ####airlines data

In [0]:
airlines_df = spark.read.format("delta").load(bronze_airlines_path)
 
# --- Clean Columns ---
airlines_df = (
    airlines_df
    .withColumn("IATA_CODE", upper(trim(col("IATA_CODE"))))
    .withColumn("AIRLINE", trim(col("AIRLINE")))
)
 
# --- Deduplicate ---
w_airl = Window.partitionBy("IATA_CODE").orderBy(col("LOAD_TIMESTAMP").desc())
 
airlines_silver = (
    airlines_df
    .withColumn("rn", row_number().over(w_airl))
    .filter(col("rn") == 1)
    .drop("rn")
)
 
# --- Add Silver Metadata ---
airlines_silver = airlines_silver.withColumn("INGESTED_AT", current_timestamp())
 
# --- Write Silver Table ---
(
    airlines_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(silver_airlines_path)
)

- #### airports data

In [0]:
airports_df = spark.read.format("delta").load(bronze_airports_path)
 
# --- Clean Columns ---
airports_df = (
    airports_df
    .withColumn("IATA_CODE", upper(trim(col("IATA_CODE"))))
    .withColumn("AIRPORT", trim(col("AIRPORT")))
    .withColumn("CITY", trim(col("CITY")))
    .withColumn("STATE", trim(col("STATE")))
    .withColumn("COUNTRY", trim(col("COUNTRY")))
    .withColumn("LATITUDE", trim(col("LATITUDE")))
    .withColumn("LONGITUDE", trim(col("LONGITUDE")))
)
 
# --- Deduplicate ---
w_airp = Window.partitionBy("IATA_CODE").orderBy(col("LOAD_TIMESTAMP").desc())
 
airports_silver = (
    airports_df
    .withColumn("rn", row_number().over(w_airp))
    .filter(col("rn") == 1)
    .drop("rn")
)
 
# --- Add Silver Metadata ---
airports_silver = airports_silver.withColumn("INGESTED_AT", current_timestamp())
 
# --- Write Silver Table ---
(
    airports_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(silver_airports_path)
)
 

In [0]:
df = spark.read.format("delta").load("dbfs:/FileStore/tables/sahan_project/silver/flights")
display(df.limit(5))

FLIGHT_DATE,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE_TS,DEPARTURE_TIME_TS,WHEELS_OFF_TS,WHEELS_ON_TS,SCHEDULED_ARRIVAL_TS,ARRIVAL_TIME_TS,DEPARTURE_DELAY,ARRIVAL_DELAY,TAXI_OUT,TAXI_IN,AIR_TIME,ELAPSED_TIME,CANCELLED,DIVERTED,CANCELLATION_REASON,INGESTED_AT,SOURCE_FILE
2015-01-01,2015,1,1,4,AA,1005,N3JXAA,DFW,FLL,2015-01-01T14:25:00Z,2015-01-01T14:28:00Z,2015-01-01T14:41:00Z,2015-01-01T17:55:00Z,2015-01-01T18:05:00Z,2015-01-01T18:03:00Z,3,0,13,8,134,155,0,0,,2025-12-30T16:03:21.082Z,dbfs:/FileStore/tables/sahan_project/raw/flights.csv
2015-01-01,2015,1,1,4,AA,1008,N3HSAA,ORD,RSW,2015-01-01T18:45:00Z,2015-01-01T20:57:00Z,2015-01-01T21:12:00Z,2015-01-01T00:37:00Z,2015-01-01T22:30:00Z,2015-01-01T00:42:00Z,132,132,15,5,145,165,0,0,,2025-12-30T16:03:21.082Z,dbfs:/FileStore/tables/sahan_project/raw/flights.csv
2015-01-01,2015,1,1,4,AA,102,N376AA,HNL,DFW,2015-01-01T19:20:00Z,2015-01-01T23:56:00Z,2015-01-01T00:13:00Z,2015-01-01T11:10:00Z,2015-01-01T06:45:00Z,2015-01-01T11:21:00Z,276,276,17,11,417,445,0,0,,2025-12-30T16:03:21.082Z,dbfs:/FileStore/tables/sahan_project/raw/flights.csv
2015-01-01,2015,1,1,4,AA,1022,N4YUAA,OKC,DFW,2015-01-01T07:05:00Z,2015-01-01T07:10:00Z,2015-01-01T07:45:00Z,2015-01-01T08:22:00Z,2015-01-01T08:10:00Z,2015-01-01T08:32:00Z,5,22,35,10,37,82,0,0,,2025-12-30T16:03:21.082Z,dbfs:/FileStore/tables/sahan_project/raw/flights.csv
2015-01-01,2015,1,1,4,AA,1028,N3BFAA,DEN,ORD,2015-01-01T06:00:00Z,2015-01-01T06:01:00Z,2015-01-01T06:15:00Z,2015-01-01T09:01:00Z,2015-01-01T09:35:00Z,2015-01-01T09:08:00Z,1,0,14,7,106,127,0,0,,2025-12-30T16:03:21.082Z,dbfs:/FileStore/tables/sahan_project/raw/flights.csv
