# Flight Radar DLT Pipeline - Medallion Architecture

This Delta Live Tables (DLT) pipeline implements the **Medallion Architecture** with three data layers:

## Bronze Layer (Raw Data)
- Raw flight radar data as ingested from source systems
- Minimal processing; data-validation flags are added in a separate validated bronze table
- Includes ingestion metadata

## Silver Layer (Cleaned Data)  
- Cleaned, validated, and standardized flight data
- Data quality rules applied
- Derived fields and business logic applied
- Ready for analysis

## Gold Layer (Business Ready)
- Aggregated business metrics and KPIs
- Flight summaries, airport metrics, aircraft performance
- Optimized for reporting and analytics

This pipeline is executed using the configuration in `resources/flightradar_databricks.pipeline.yml`.

In [0]:
# Import dependencies
import dlt
from pyspark.sql.functions import col, explode, current_timestamp, lit, to_timestamp, when, count, avg
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

In [None]:
def get_flight_schema():
    """Return the Spark ``StructType`` describing a single flight record.

    All fields are declared nullable. Columns appear in this order:
    identifiers, position/motion, transponder/source metadata, aircraft
    details, and route (origin, destination, ETA) fields.
    """
    # (column name, Spark data type) pairs, listed in schema column order.
    field_specs = [
        ("fr24_id", StringType()),
        ("flight", StringType()),
        ("callsign", StringType()),
        ("lat", DoubleType()),
        ("lon", DoubleType()),
        ("track", IntegerType()),
        ("alt", IntegerType()),
        ("gspeed", IntegerType()),
        ("vspeed", IntegerType()),
        ("squawk", StringType()),
        ("timestamp", StringType()),
        ("source", StringType()),
        ("hex", StringType()),
        ("type", StringType()),
        ("reg", StringType()),
        ("painted_as", StringType()),
        ("operating_as", StringType()),
        ("orig_iata", StringType()),
        ("orig_icao", StringType()),
        ("dest_iata", StringType()),
        ("dest_icao", StringType()),
        ("eta", StringType()),
    ]
    return StructType(
        [StructField(field_name, field_type, True) for field_name, field_type in field_specs]
    )

def create_bronze_flights(spark, json_path):
    """Bronze Layer: read raw multi-line JSON and flatten its ``data`` array.

    Args:
        spark: SparkSession used for reading.
        json_path: File or directory path handed to ``spark.read.json``.

    Returns:
        A DataFrame with one row per element of each document's ``data``
        array (the element's fields promoted to top-level columns), plus
        ``ingestion_timestamp`` and ``data_layer`` (= "bronze").
    """
    raw_docs = spark.read.option("multiline", "true").json(json_path)
    # explode (not explode_outer): documents whose ``data`` is null/empty yield no rows.
    exploded = raw_docs.select(explode(col("data")).alias("flight_data"))
    flat_records = exploded.select("flight_data.*")
    with_ingest_time = flat_records.withColumn("ingestion_timestamp", current_timestamp())
    return with_ingest_time.withColumn("data_layer", lit("bronze"))

def validate_bronze_data(bronze_df):
    """Append boolean data-quality flag columns to a bronze flights DataFrame.

    Added columns (in this order):

    * ``is_valid_record`` - False when any of ``fr24_id``, ``lat``, ``lon`` is null.
    * ``has_valid_coordinates`` - ``lat`` in [-90, 90] and ``lon`` in [-180, 180].
    * ``has_valid_altitude`` - ``alt`` in [0, 50000].
    * ``has_valid_speed`` - ``gspeed`` in [0, 1000].

    No rows are removed; this function only adds columns.
    """
    missing_key_field = (
        col("fr24_id").isNull() | col("lat").isNull() | col("lon").isNull()
    )
    coords_in_range = col("lat").between(-90, 90) & col("lon").between(-180, 180)
    altitude_in_range = col("alt").between(0, 50000)
    speed_in_range = col("gspeed").between(0, 1000)

    # Range checks on a null input evaluate to null; routing them through
    # when(...).otherwise(False) turns that null into an explicit False.
    flag_columns = [
        ("is_valid_record", when(missing_key_field, False).otherwise(True)),
        ("has_valid_coordinates", when(coords_in_range, True).otherwise(False)),
        ("has_valid_altitude", when(altitude_in_range, True).otherwise(False)),
        ("has_valid_speed", when(speed_in_range, True).otherwise(False)),
    ]

    flagged = bronze_df
    for flag_name, flag_expr in flag_columns:
        flagged = flagged.withColumn(flag_name, flag_expr)
    return flagged

def create_silver_flights(bronze_df):
    """Silver Layer: keep valid bronze rows and derive analysis columns.

    Expects the flag columns produced by ``validate_bronze_data``. Only rows
    where both ``is_valid_record`` and ``has_valid_coordinates`` are True are
    kept. Added columns:

    * ``timestamp_parsed`` - ``timestamp`` parsed with pattern
      ``yyyy-MM-dd'T'HH:mm:ssX`` (e.g. ``2024-01-01T12:00:00Z``), with the
      trailing ``Z`` honoured as a zero (UTC) offset.
    * ``altitude_ft`` / ``ground_speed_kts`` / ``vertical_speed_fpm`` - integer
      casts of ``alt`` / ``gspeed`` / ``vspeed``.
    * ``is_climbing`` / ``is_descending`` - ``vspeed`` > 0 / < 0; False when
      ``vspeed`` is null.
    * ``flight_phase`` - altitude band: "takeoff_landing" (< 10000),
      "climb_descent" (10000 to < 30000), "cruise" (>= 30000), or null when
      ``alt`` is null.
    * ``processing_timestamp`` and ``data_layer`` (= "silver").
    """
    alt = col("alt")
    vspeed = col("vspeed")
    # Equivalent to chaining `== True` filters: a null or False flag drops the row.
    valid_rows = bronze_df.filter(col("is_valid_record") & col("has_valid_coordinates"))
    return (
        valid_rows
        # Pattern letter 'X' parses "Z" as a UTC offset. A quoted literal 'Z'
        # would be skipped, and the wall-clock time would then be interpreted
        # in the Spark session time zone.
        .withColumn("timestamp_parsed", to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ssX"))
        .withColumn("altitude_ft", alt.cast(IntegerType()))
        .withColumn("ground_speed_kts", col("gspeed").cast(IntegerType()))
        .withColumn("vertical_speed_fpm", vspeed.cast(IntegerType()))
        .withColumn("is_climbing", when(vspeed > 0, True).otherwise(False))
        .withColumn("is_descending", when(vspeed < 0, True).otherwise(False))
        # Deliberately no `otherwise`: a null altitude matches no branch and
        # yields null instead of falling through to "cruise" (which would
        # inflate cruise counts downstream). Any non-null altitude matches
        # exactly one branch.
        .withColumn(
            "flight_phase",
            when(alt < 10000, "takeoff_landing")
            .when(alt < 30000, "climb_descent")
            .when(alt >= 30000, "cruise"),
        )
        .withColumn("processing_timestamp", current_timestamp())
        .withColumn("data_layer", lit("silver"))
    )

def create_gold_airline_summary(silver_df):
    """Gold Layer: per-airline aggregates over silver flight rows.

    Groups by ``operating_as`` (rows with a null operator form their own
    group) and returns, per group: row count, average altitude, ground and
    vertical speed, and counts of climbing, descending and cruise-phase
    rows. It also adds ``analysis_timestamp`` and ``data_layer`` (= "gold").

    NOTE(review): the counts are counts of silver rows, not distinct
    ``fr24_id`` values. Confirm whether "flights" should be de-duplicated.
    """
    metric_exprs = [
        count("*").alias("total_flights"),
        avg("altitude_ft").alias("avg_altitude_ft"),
        avg("ground_speed_kts").alias("avg_ground_speed_kts"),
        avg("vertical_speed_fpm").alias("avg_vertical_speed_fpm"),
        # count() skips nulls, and when() without otherwise() is null for non-matches.
        count(when(col("is_climbing"), 1)).alias("climbing_flights"),
        count(when(col("is_descending"), 1)).alias("descending_flights"),
        count(when(col("flight_phase") == "cruise", 1)).alias("cruise_flights"),
    ]
    per_airline = silver_df.groupBy("operating_as").agg(*metric_exprs)
    stamped = per_airline.withColumn("analysis_timestamp", current_timestamp())
    return stamped.withColumn("data_layer", lit("gold"))

def create_gold_airport_summary(silver_df):
    """Gold Layer: per-destination traffic aggregates.

    Groups by (``dest_iata``, ``dest_icao``), excluding any group whose
    ``dest_iata`` is null. For each group it reports the row count, the
    average ``altitude_ft``, and the number of rows in the
    "takeoff_landing" phase. It also adds ``analysis_timestamp`` and
    ``data_layer`` (= "gold").
    """
    # dest_iata is a grouping key, so dropping null-key rows before the
    # aggregation removes exactly the groups a post-aggregation filter would.
    with_destination = silver_df.filter(col("dest_iata").isNotNull())
    per_airport = with_destination.groupBy("dest_iata", "dest_icao").agg(
        count("*").alias("incoming_flights"),
        avg("altitude_ft").alias("avg_approach_altitude"),
        count(when(col("flight_phase") == "takeoff_landing", 1)).alias("approaching_flights"),
    )
    return (
        per_airport
        .withColumn("analysis_timestamp", current_timestamp())
        .withColumn("data_layer", lit("gold"))
    )

def create_gold_aircraft_performance(silver_df):
    """Gold Layer: per-aircraft (type, registration) aggregates.

    For each (``type``, ``reg``) pair, returns the number of silver rows and
    the average altitude, ground speed and vertical speed. It also adds
    ``analysis_timestamp`` and ``data_layer`` (= "gold").
    """
    grouped = silver_df.groupBy("type", "reg")
    metrics = grouped.agg(
        count("*").alias("total_observations"),
        avg("altitude_ft").alias("avg_altitude"),
        avg("ground_speed_kts").alias("avg_ground_speed"),
        avg("vertical_speed_fpm").alias("avg_vertical_speed"),
    )
    metrics = metrics.withColumn("analysis_timestamp", current_timestamp())
    metrics = metrics.withColumn("data_layer", lit("gold"))
    return metrics

def filter_by_destination(df, dest_iata_code):
    """Return only rows whose ``dest_iata`` equals *dest_iata_code*."""
    destination_matches = col("dest_iata") == dest_iata_code
    return df.where(destination_matches)

def filter_by_airline(df, airline_code):
    """Return only rows whose ``operating_as`` equals *airline_code*."""
    airline_matches = col("operating_as") == airline_code
    return df.where(airline_matches)

def filter_by_altitude_range(df, min_alt, max_alt):
    """Keep rows whose ``altitude_ft`` lies in [min_alt, max_alt] (inclusive, feet).

    Either bound may be ``None`` to leave that side of the range open.
    Rows with a null ``altitude_ft`` are always excluded, which matches
    the behaviour of ``Column.between`` for two concrete bounds.
    """
    altitude = col("altitude_ft")
    # Build the predicate piecewise: passing None into between() would compare
    # against NULL, evaluate to null for every row, and filter everything out.
    condition = altitude.isNotNull()
    if min_alt is not None:
        condition = condition & (altitude >= min_alt)
    if max_alt is not None:
        condition = condition & (altitude <= max_alt)
    return df.filter(condition)

In [None]:
# =============================================================================
# BRONZE LAYER - Raw Data Ingestion
# =============================================================================

@dlt.table(
    name="bronze_flights",
    comment="Raw flight radar data with ingestion metadata",
    table_properties={
        'quality': 'bronze'
    }
)
def bronze_flights():
    """Bronze Layer: raw flight records read from the source JSON location.

    The source path can be overridden with the pipeline configuration key
    ``flightradar.json_path``. When the key is unset, the original S3
    location is used, so existing deployments behave as before.
    """
    # NOTE(review): `spark` is not defined in this file; presumably it is the
    # session injected by the notebook/DLT runtime.
    json_path = spark.conf.get(
        "flightradar.json_path", "s3://flightradardata/flight-data/"
    )
    return create_bronze_flights(spark, json_path=json_path)

@dlt.table(
    name="validated_bronze_flights",
    comment="Validated raw flight radar data",
    table_properties={'quality': 'bronze'},
)
def validated_bronze_flights():
    """Bronze table: ``bronze_flights`` with data-quality flag columns appended."""
    return validate_bronze_data(dlt.read("bronze_flights"))

# =============================================================================
# SILVER LAYER - Cleaned Data
# =============================================================================

@dlt.table(
    name="silver_flights",
    comment="Cleaned and validated flight data",
    table_properties={'quality': 'silver'},
)
def silver_flights():
    """Silver table: cleaned and enriched rows built from ``validated_bronze_flights``."""
    return create_silver_flights(dlt.read("validated_bronze_flights"))

# =============================================================================
# GOLD LAYER - Business Analytics
# =============================================================================

@dlt.table(
    name="gold_airline_summary",
    comment="Flight summary analytics by airline",
    table_properties={'quality': 'gold'},
)
def gold_airline_summary():
    """Gold table: per-airline aggregates computed from ``silver_flights``."""
    return create_gold_airline_summary(dlt.read("silver_flights"))