In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, to_timestamp, lit, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DecimalType, BooleanType, TimestampType

# Obtain (or reuse) the Spark session used by this Bronze -> Silver job.
spark = (
    SparkSession.builder
    .appName("BronzeToSilver_FactTable")
    .getOrCreate()
)

# Register the storage-account access key on the session configuration.
# NOTE(review): account_name and account_key are not defined in the visible
# cells of this notebook - confirm they are provided before this cell runs.
storage_key_setting = f"fs.azure.account.key.{account_name}.dfs.core.windows.net"
spark.conf.set(storage_key_setting, account_key)

In [0]:
# Show the Spark version of the active session as the cell output.
spark.version

'3.5.0'

In [0]:
# Root URIs of the "bronze" (input) and "silver" (output) containers.
# Both containers are addressed on the same storage account host.
adls_host = f"{account_name}.dfs.core.windows.net"
input_base_path = f"abfss://bronze@{adls_host}"
output_base_path = f"abfss://silver@{adls_host}"

In [0]:
from datetime import datetime, timezone

# Take a single UTC snapshot so year, month and day always describe the same
# instant; the parts are zero-padded strings used to build the partition path.
today = datetime.now(timezone.utc)
year, month, day = (today.strftime(part_format) for part_format in ('%Y', '%m', '%d'))

# Flights table

In [0]:
# Load every JSON file found under today's flight partition of the Bronze layer.
bronze_flights_glob = f"{input_base_path}/flight/{year}/{month}/{day}/*.json"
bronze_flights_df = spark.read.json(bronze_flights_glob)

#### Step 1: Explode JSON fields to flatten nested structures
##### Extract the "data" array from the JSON

In [0]:
# Turn each element of the "data" array into its own row, exposed as a
# single column named "flight".
flight_records = explode(col("data")).alias("flight")
flights_data_df = bronze_flights_df.select(flight_records)

#### Step 2: Schema enforcement

##### Select relevant fields and apply the schema 

In [0]:
# Each entry pairs a path below the "flight" column with the Spark type it is
# cast to. The output column name is that path with "." replaced by "_".
# The list order is the column order of the resulting DataFrame.
flight_field_types = [
    ("flight_date", StringType()),
    ("flight_status", StringType()),

    # Departure fields
    ("departure.airport", StringType()),
    ("departure.timezone", StringType()),
    ("departure.iata", StringType()),
    ("departure.icao", StringType()),
    ("departure.terminal", StringType()),
    ("departure.gate", StringType()),
    ("departure.delay", IntegerType()),
    ("departure.scheduled", StringType()),
    ("departure.estimated", StringType()),
    ("departure.actual", StringType()),
    ("departure.estimated_runway", StringType()),
    ("departure.actual_runway", StringType()),

    # Arrival fields
    ("arrival.airport", StringType()),
    ("arrival.timezone", StringType()),
    ("arrival.iata", StringType()),
    ("arrival.icao", StringType()),
    ("arrival.terminal", StringType()),
    ("arrival.gate", StringType()),
    ("arrival.baggage", StringType()),
    ("arrival.delay", IntegerType()),
    ("arrival.scheduled", StringType()),
    ("arrival.estimated", StringType()),
    ("arrival.actual", StringType()),
    ("arrival.estimated_runway", StringType()),
    ("arrival.actual_runway", StringType()),

    # Airline fields
    ("airline.name", StringType()),
    ("airline.iata", StringType()),
    ("airline.icao", StringType()),

    # Flight fields
    ("flight.number", IntegerType()),
    ("flight.iata", StringType()),
    ("flight.icao", StringType()),
    ("flight.codeshared", StringType()),

    # Aircraft fields
    ("aircraft.registration", StringType()),
    ("aircraft.iata", StringType()),
    ("aircraft.icao", StringType()),
    ("aircraft.icao24", StringType()),

    # Live fields
    ("live.updated", TimestampType()),
    ("live.latitude", DoubleType()),
    ("live.longitude", DoubleType()),
    ("live.altitude", DoubleType()),
    ("live.direction", DoubleType()),
    ("live.speed_horizontal", DoubleType()),
    ("live.speed_vertical", DoubleType()),
    ("live.is_ground", BooleanType()),
]

# Flatten the nested "flight" struct into typed, flat columns.
flights_data_df_enforced = flights_data_df.select(
    *[
        col(f"flight.{field_path}").cast(spark_type).alias(field_path.replace(".", "_"))
        for field_path, spark_type in flight_field_types
    ]
)

#### Step 3: Handle null values and standardize formats
##### Replace null values with appropriate defaults and drop records that have a null flight number or neither a flight IATA nor ICAO code


In [0]:
# Expose the typed DataFrame to Spark SQL under the view name "silver_flights".
flights_data_df_enforced.createOrReplaceTempView("silver_flights")

In [0]:
# Build the cleaned Silver DataFrame from the "silver_flights" temp view:
#   * NULLs are replaced by per-column defaults ('N/A' / 'Unknown' for strings,
#     0 / 0.0 for numerics, false for the boolean).
#   * flight_status is reported as 'landed' whenever arrival_actual is present.
#   * Rows with a NULL flight_number, or with both flight_icao and flight_iata
#     NULL, are dropped.
# live_updated is a TIMESTAMP in the view, so it is cast to STRING explicitly
# before being coalesced with 'N/A'. Relying on implicit coercion makes the
# result depend on spark.sql.ansi.enabled: with ANSI off the column silently
# becomes STRING, with ANSI on the 'N/A' literal is coerced to TIMESTAMP and the
# query errors. The explicit cast yields a STRING column in both modes.
silver_flights_df = spark.sql("""
    SELECT
        COALESCE(flight_date, 'N/A') AS flight_date,
        CASE
            WHEN arrival_actual IS NOT NULL THEN 'landed'
            ELSE COALESCE(flight_status, 'unknown') 
        END AS flight_status,
        
        -- Departure fields
        COALESCE(departure_airport, 'Unknown') AS departure_airport,
        COALESCE(departure_timezone, 'Unknown') AS departure_timezone,
        COALESCE(departure_iata, 'N/A') AS departure_iata,
        COALESCE(departure_icao, 'N/A') AS departure_icao,
        COALESCE(departure_terminal, 'N/A') AS departure_terminal,
        COALESCE(departure_gate, 'N/A') AS departure_gate,
        COALESCE(departure_delay, 0) AS departure_delay,
        COALESCE(departure_scheduled, 'N/A') AS departure_scheduled,
        COALESCE(departure_estimated, 'N/A') AS departure_estimated,
        COALESCE(departure_actual, 'N/A') AS departure_actual,
        COALESCE(departure_estimated_runway, 'N/A') AS departure_estimated_runway,
        COALESCE(departure_actual_runway, 'N/A') AS departure_actual_runway,
        
        -- Arrival fields
        COALESCE(arrival_airport, 'Unknown') AS arrival_airport,
        COALESCE(arrival_timezone, 'Unknown') AS arrival_timezone,
        COALESCE(arrival_iata, 'N/A') AS arrival_iata,
        COALESCE(arrival_icao, 'N/A') AS arrival_icao, --pk
        COALESCE(arrival_terminal, 'N/A') AS arrival_terminal,
        COALESCE(arrival_gate, 'N/A') AS arrival_gate,
        COALESCE(arrival_baggage, 'N/A') AS arrival_baggage,
        COALESCE(arrival_delay, 0) AS arrival_delay,
        COALESCE(arrival_scheduled, 'N/A') AS arrival_scheduled,
        COALESCE(arrival_estimated, 'N/A') AS arrival_estimated,
        COALESCE(arrival_actual, 'N/A') AS arrival_actual,
        COALESCE(arrival_estimated_runway, 'N/A') AS arrival_estimated_runway,
        COALESCE(arrival_actual_runway, 'N/A') AS arrival_actual_runway,
        
        -- Airline fields
        COALESCE(airline_name, 'Unknown') AS airline_name,
        COALESCE(airline_iata, 'N/A') AS airline_iata,
        COALESCE(airline_icao, 'N/A') AS airline_icao,
        
        -- Flight fields
        COALESCE(flight_number, 0) AS flight_number,
        COALESCE(flight_iata, 'N/A') AS flight_iata,
        COALESCE(flight_icao, 'N/A') AS flight_icao,
        COALESCE(flight_codeshared, 'N/A') AS flight_codeshared,
        
        -- Aircraft fields
        COALESCE(aircraft_registration, 'N/A') AS aircraft_registration,
        COALESCE(aircraft_iata, 'N/A') AS aircraft_iata, --pk
        COALESCE(aircraft_icao, 'N/A') AS aircraft_icao,
        COALESCE(aircraft_icao24, 'N/A') AS aircraft_icao24,
        
        -- Live fields
        COALESCE(CAST(live_updated AS STRING), 'N/A') AS live_updated,
        COALESCE(live_latitude, 0.0) AS live_latitude,
        COALESCE(live_longitude, 0.0) AS live_longitude,
        COALESCE(live_altitude, 0.0) AS live_altitude,
        COALESCE(live_direction, 0.0) AS live_direction,
        COALESCE(live_speed_horizontal, 0.0) AS live_speed_horizontal,
        COALESCE(live_speed_vertical, 0.0) AS live_speed_vertical,
        COALESCE(live_is_ground, false) AS live_is_ground
        
    FROM 
        silver_flights
    WHERE 
        flight_number IS NOT NULL AND NOT (flight_icao IS NULL AND flight_iata IS NULL)
""")

#### Step 4: Deduplication (if needed)
##### Drop duplicates based on unique identifiers

In [0]:
# Keep a single row per (flight_number, flight_iata, flight_icao, flight_status).
# To see how many rows are removed, compare silver_flights_df.count() before
# and after running this cell.
flight_dedup_keys = ["flight_number", "flight_iata", "flight_icao", "flight_status"]
silver_flights_df = silver_flights_df.dropDuplicates(flight_dedup_keys)


#### Step 5: Write into the Silver layer as a Delta table

In [0]:
# Persist the cleaned flights to the Silver layer in Delta format.
# coalesce(1) reduces the DataFrame to a single partition before writing, and
# "overwrite" mode replaces whatever is currently stored at the target path.
silver_flights_path = f"{output_base_path}/flights/"
(
    silver_flights_df
    .coalesce(1)
    .write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .save(silver_flights_path)
)
