In [0]:
# mount Azure Blob container to /mnt/silver using account name + key
from pyspark.sql.functions import col, from_json, expr, to_timestamp, when, lit, year, month, dayofmonth
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, LongType, TimestampType, BooleanType
from delta.tables import DeltaTable
import base64, urllib.parse


storage_account_name = "saflightdelays"  
storage_account_key  = ""
silver_container_name = "silver"
mount_point = "/mnt/silver"


# Build wasbs URL and config keys to mount the blob directory
wasbs_url = f"wasbs://{silver_container_name}@{storage_account_name}.blob.core.windows.net"
conf_key = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"
configs = {conf_key: storage_account_key}

print("wasbs_url:", wasbs_url)
print("config key:", conf_key)

# Mount if not already mounted
existing_mounts = [m.mountPoint for m in dbutils.fs.mounts()]
if mount_point in existing_mounts:
    print(f"{mount_point} is already mounted")
else:
    print(f"Mounting {wasbs_url} -> {mount_point} ...")
    dbutils.fs.mount(
        source = wasbs_url,
        mount_point = mount_point,
        extra_configs = configs
    )
    print("Mounted.")


if dbutils.fs.ls(mount_point): display(dbutils.fs.ls(mount_point))

wasbs_url: wasbs://silver@saflightdelays.blob.core.windows.net
config key: fs.azure.account.key.saflightdelays.blob.core.windows.net
/mnt/silver is already mounted


path,name,size,modificationTime
dbfs:/mnt/silver/delta/,delta/,0,0


In [0]:
from pyspark.sql.functions import from_json, col, to_timestamp, expr, window
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, TimestampType


# full Event Hubs connection string
eh_connection_string = ""

# Event Hub name only needed if not in conn string
eventhub_name = "airline-delays"

# Where to write Silver Delta
silver_delta_path = "/mnt/silver/delta/airline_delays"
checkpoint_path = "/mnt/silver/checkpoints/silver_airline_eh"
microbatch_trigger = "1 minute"   # or ""



In [0]:
# Build the connection string in EventHubs format expected by the connector
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

eh_conf = {
  "eventhubs.connectionString": eh_connection_string
}

# read as streaming DataFrame (raw body in 'body' column as binary)
raw = (spark.readStream
       .format("eventhubs")
       .options(**eh_conf)
       .load())

# inspect schema
display(raw.printSchema())

root
 |-- body: binary (nullable = true)
 |-- partition: string (nullable = true)
 |-- offset: string (nullable = true)
 |-- sequenceNumber: long (nullable = true)
 |-- enqueuedTime: timestamp (nullable = true)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- systemProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [0]:
bronze_container     = "bronze"
bronze_mount_point   = "/mnt/bronze"

wasbs_url = f"wasbs://{bronze_container}@{storage_account_name}.blob.core.windows.net"
configs = {f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key}

# mount if not mounted
mounts = [m.mountPoint for m in dbutils.fs.mounts()]
if bronze_mount_point in mounts:
    print(f"{bronze_mount_point} already mounted")
else:
    dbutils.fs.mount(source=wasbs_url, mount_point=bronze_mount_point, extra_configs=configs)
    print(f"Mounted {wasbs_url} at {bronze_mount_point}")

# quick listing
display(dbutils.fs.ls(bronze_mount_point))

/mnt/bronze already mounted


path,name,size,modificationTime
dbfs:/mnt/bronze/eh-flight-delays/,eh-flight-delays/,0,0


In [0]:
wildcard_path = f"{bronze_mount_point}/eh-flight-delays/airline-delays/*/*/*/*/*/*/*.avro"
print(wildcard_path)


df = spark.read.format("avro").load(wildcard_path)
df.printSchema()
display(df.limit(5))





/mnt/bronze/eh-flight-delays/airline-delays/*/*/*/*/*/*/*.avro
root
 |-- SequenceNumber: long (nullable = true)
 |-- Offset: string (nullable = true)
 |-- EnqueuedTimeUtc: string (nullable = true)
 |-- SystemProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- member0: long (nullable = true)
 |    |    |-- member1: double (nullable = true)
 |    |    |-- member2: string (nullable = true)
 |    |    |-- member3: binary (nullable = true)
 |-- Properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- member0: long (nullable = true)
 |    |    |-- member1: double (nullable = true)
 |    |    |-- member2: string (nullable = true)
 |    |    |-- member3: binary (nullable = true)
 |-- Body: binary (nullable = true)



SequenceNumber,Offset,EnqueuedTimeUtc,SystemProperties,Properties,Body
20015,12884901888,11/7/2025 11:08:26 PM,"Map(x-opt-enqueued-time -> List(1762556906690, null, null, null))",Map(),eyJ5ZWFyIjogIjIwMjUiLCAibW9udGgiOiAiNyIsICJjYXJyaWVyIjogIllWIiwgImNhcnJpZXJfbmFtZSI6ICJNZXNhIEFpcmxpbmVzIEluYy4iLCAiYWlycG9ydCI6ICJDUlAiLCAiYWlycG9ydF9uYW1lIjogIkNvcnB1cyA= (truncated)
20016,12884904768,11/7/2025 11:08:26 PM,"Map(x-opt-enqueued-time -> List(1762556906690, null, null, null))",Map(),eyJ5ZWFyIjogIjIwMjUiLCAibW9udGgiOiAiNyIsICJjYXJyaWVyIjogIllWIiwgImNhcnJpZXJfbmFtZSI6ICJNZXNhIEFpcmxpbmVzIEluYy4iLCAiYWlycG9ydCI6ICJDVkciLCAiYWlycG9ydF9uYW1lIjogIkNpbmNpbm4= (truncated)
20017,12884904840,11/7/2025 11:08:26 PM,"Map(x-opt-enqueued-time -> List(1762556906690, null, null, null))",Map(),eyJ5ZWFyIjogIjIwMjUiLCAibW9udGgiOiAiNyIsICJjYXJyaWVyIjogIllWIiwgImNhcnJpZXJfbmFtZSI6ICJNZXNhIEFpcmxpbmVzIEluYy4iLCAiYWlycG9ydCI6ICJERlciLCAiYWlycG9ydF9uYW1lIjogIkRhbGxhcy8= (truncated)
20018,12884904912,11/7/2025 11:08:26 PM,"Map(x-opt-enqueued-time -> List(1762556906690, null, null, null))",Map(),eyJ5ZWFyIjogIjIwMjUiLCAibW9udGgiOiAiNyIsICJjYXJyaWVyIjogIllWIiwgImNhcnJpZXJfbmFtZSI6ICJNZXNhIEFpcmxpbmVzIEluYy4iLCAiYWlycG9ydCI6ICJHVUMiLCAiYWlycG9ydF9uYW1lIjogIkd1bm5pc28= (truncated)
20019,12884904984,11/7/2025 11:08:26 PM,"Map(x-opt-enqueued-time -> List(1762556906690, null, null, null))",Map(),eyJ5ZWFyIjogIjIwMjUiLCAibW9udGgiOiAiNyIsICJjYXJyaWVyIjogIllWIiwgImNhcnJpZXJfbmFtZSI6ICJNZXNhIEFpcmxpbmVzIEluYy4iLCAiYWlycG9ydCI6ICJJQUQiLCAiYWlycG9ydF9uYW1lIjogIldhc2hpbmc= (truncated)


In [0]:
from pyspark.sql.functions import col, expr

sample = df.select(col("body")).limit(15).collect()
for r in sample:
    b = r['body']
    print(type(b), len(b))
    try:
        print(b.decode('utf-8')[:400])   # try text
    except Exception as e:
        print("not UTF-8 text:", e)

<class 'bytearray'> 533
{"year": "2025", "month": "7", "carrier": "YV", "carrier_name": "Mesa Airlines Inc.", "airport": "CRP", "airport_name": "Corpus Christi, TX: Corpus Christi International", "arr_flights": "19.00", "arr_del15": "4.00", "carrier_ct": "0.71", "weather_ct": "0.30", "nas_ct": "0.70", "security_ct": "0.00", "late_aircraft_ct": "2.29", "arr_cancelled": "1.00", "arr_diverted": "0.00", "arr_delay": "823.00"
<class 'bytearray'> 546
{"year": "2025", "month": "7", "carrier": "YV", "carrier_name": "Mesa Airlines Inc.", "airport": "CVG", "airport_name": "Cincinnati, OH: Cincinnati/Northern Kentucky International", "arr_flights": "75.00", "arr_del15": "14.00", "carrier_ct": "7.64", "weather_ct": "1.00", "nas_ct": "3.57", "security_ct": "0.00", "late_aircraft_ct": "1.79", "arr_cancelled": "0.00", "arr_diverted": "0.00", "arr_delay
<class 'bytearray'> 544
{"year": "2025", "month": "7", "carrier": "YV", "carrier_name": "Mesa Airlines Inc.", "airport": "DFW", "airport_name": "Dalla

In [0]:
df_bronze = spark.read.format("avro").load(wildcard_path)
print(df_bronze.count(), "records loaded")
df_bronze.printSchema()

47155 records loaded
root
 |-- SequenceNumber: long (nullable = true)
 |-- Offset: string (nullable = true)
 |-- EnqueuedTimeUtc: string (nullable = true)
 |-- SystemProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- member0: long (nullable = true)
 |    |    |-- member1: double (nullable = true)
 |    |    |-- member2: string (nullable = true)
 |    |    |-- member3: binary (nullable = true)
 |-- Properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- member0: long (nullable = true)
 |    |    |-- member1: double (nullable = true)
 |    |    |-- member2: string (nullable = true)
 |    |    |-- member3: binary (nullable = true)
 |-- Body: binary (nullable = true)



Drop Metadata and retrieving data from body

In [0]:
# Imports 
from pyspark.sql.functions import col, from_json, udf
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, IntegerType
)

# --- 1) Define schema for the JSON payload 
schema = StructType([
    StructField("year", StringType(), True),
    StructField("month", StringType(), True),
    StructField("carrier", StringType(), True),
    StructField("carrier_name", StringType(), True),
    StructField("airport", StringType(), True),
    StructField("airport_name", StringType(), True),
    StructField("arr_flights", StringType(), True),
    StructField("arr_del15", StringType(), True),
    StructField("carrier_ct", StringType(), True),
    StructField("weather_ct", StringType(), True),
    StructField("nas_ct", StringType(), True),
    StructField("security_ct", StringType(), True),
    StructField("late_aircraft_ct", StringType(), True),
    StructField("arr_cancelled", StringType(), True),
    StructField("arr_diverted", StringType(), True),
    StructField("arr_delay", StringType(), True),
    StructField("carrier_delay", StringType(), True)
 
])

#  2) Safe decode 
def decode_and_trim_bytes(b):
    """Turn bytes/bytearray into UTF-8 string and chop to last complete JSON '}' if truncated."""
    if b is None:
        return None
    try:
        s = b.decode('utf-8', errors='replace') if isinstance(b, (bytes, bytearray)) else str(b)
    except Exception:
        s = str(b)
    
    idx = s.rfind('}')
    return s[:idx+1] if idx != -1 else s

udf_decode_trim = udf(decode_and_trim_bytes, StringType())



print("Bronze columns:", df_bronze.columns)

# 4) Detect the 'body' column name robustly 
body_col_candidates = [c for c in df_bronze.columns if c.lower() == "body"]
if not body_col_candidates:
    raise RuntimeError("No column named 'body' or 'Body' found in bronze. Columns: {}".format(df_bronze.columns))
body_col = body_col_candidates[0]   # take first matching column name (e.g. 'Body' or 'body')

# 5) Decode body into a safe string column (body_str) 
df_decoded = df_bronze.withColumn("body_str", udf_decode_trim(col(body_col)))


display(df_decoded.select("body_str").limit(5))

#  6) Parse JSON into structured columns, but ONLY keep the parsed fields
# We call from_json on body_str and then select data.* to exclude metadata columns
df_parsed = df_decoded.select(from_json(col("body_str"), schema).alias("data"))

# flatten the struct into top-level columns and drop rows where parsing failed 
df_flat = df_parsed.select("data.*").na.drop(subset=["year"])  # drop rows where JSON parse likely failed

#  7) Cast numeric columns to numeric types for downstream processing 
from pyspark.sql.types import DoubleType, IntegerType
df_clean = (df_flat
    .withColumn("year", col("year").cast(IntegerType()))
    .withColumn("month", col("month").cast(IntegerType()))
    .withColumn("arr_flights", col("arr_flights").cast(DoubleType()))
    .withColumn("arr_del15", col("arr_del15").cast(DoubleType()))
    .withColumn("carrier_ct", col("carrier_ct").cast(DoubleType()))
    .withColumn("weather_ct", col("weather_ct").cast(DoubleType()))
    .withColumn("nas_ct", col("nas_ct").cast(DoubleType()))
    .withColumn("security_ct", col("security_ct").cast(DoubleType()))
    .withColumn("late_aircraft_ct", col("late_aircraft_ct").cast(DoubleType()))
    .withColumn("arr_cancelled", col("arr_cancelled").cast(DoubleType()))
    .withColumn("arr_diverted", col("arr_diverted").cast(DoubleType()))
    .withColumn("arr_delay", col("arr_delay").cast(DoubleType()))
    .withColumn("carrier_delay", col("carrier_delay").cast(DoubleType()))
)

# final preview
display(df_clean.limit(10))
print("Parsed records (approx):", df_clean.count())



Bronze columns: ['SequenceNumber', 'Offset', 'EnqueuedTimeUtc', 'SystemProperties', 'Properties', 'Body']


body_str
"{""year"": ""2025"", ""month"": ""7"", ""carrier"": ""YV"", ""carrier_name"": ""Mesa Airlines Inc."", ""airport"": ""CRP"", ""airport_name"": ""Corpus Christi, TX: Corpus Christi International"", ""arr_flights"": ""19.00"", ""arr_del15"": ""4.00"", ""carrier_ct"": ""0.71"", ""weather_ct"": ""0.30"", ""nas_ct"": ""0.70"", ""security_ct"": ""0.00"", ""late_aircraft_ct"": ""2.29"", ""arr_cancelled"": ""1.00"", ""arr_diverted"": ""0.00"", ""arr_delay"": ""823.00"", ""carrier_delay"": ""45.00"", ""weather_delay"": ""6.00"", ""nas_delay"": ""14.00"", ""security_delay"": ""0.00"", ""late_aircraft_delay"": ""758.00""}"
"{""year"": ""2025"", ""month"": ""7"", ""carrier"": ""YV"", ""carrier_name"": ""Mesa Airlines Inc."", ""airport"": ""CVG"", ""airport_name"": ""Cincinnati, OH: Cincinnati/Northern Kentucky International"", ""arr_flights"": ""75.00"", ""arr_del15"": ""14.00"", ""carrier_ct"": ""7.64"", ""weather_ct"": ""1.00"", ""nas_ct"": ""3.57"", ""security_ct"": ""0.00"", ""late_aircraft_ct"": ""1.79"", ""arr_cancelled"": ""0.00"", ""arr_diverted"": ""0.00"", ""arr_delay"": ""722.00"", ""carrier_delay"": ""485.00"", ""weather_delay"": ""24.00"", ""nas_delay"": ""81.00"", ""security_delay"": ""0.00"", ""late_aircraft_delay"": ""132.00""}"
"{""year"": ""2025"", ""month"": ""7"", ""carrier"": ""YV"", ""carrier_name"": ""Mesa Airlines Inc."", ""airport"": ""DFW"", ""airport_name"": ""Dallas/Fort Worth, TX: Dallas/Fort Worth International"", ""arr_flights"": ""97.00"", ""arr_del15"": ""23.00"", ""carrier_ct"": ""8.09"", ""weather_ct"": ""0.73"", ""nas_ct"": ""4.85"", ""security_ct"": ""0.00"", ""late_aircraft_ct"": ""9.33"", ""arr_cancelled"": ""1.00"", ""arr_diverted"": ""0.00"", ""arr_delay"": ""2125.00"", ""carrier_delay"": ""655.00"", ""weather_delay"": ""65.00"", ""nas_delay"": ""437.00"", ""security_delay"": ""0.00"", ""late_aircraft_delay"": ""968.00""}"
"{""year"": ""2025"", ""month"": ""7"", ""carrier"": ""YV"", ""carrier_name"": ""Mesa Airlines Inc."", ""airport"": ""GUC"", ""airport_name"": ""Gunnison, CO: Gunnison-Crested Butte Regional"", ""arr_flights"": ""8.00"", ""arr_del15"": ""1.00"", ""carrier_ct"": ""0.00"", ""weather_ct"": ""1.00"", ""nas_ct"": ""0.00"", ""security_ct"": ""0.00"", ""late_aircraft_ct"": ""0.00"", ""arr_cancelled"": ""0.00"", ""arr_diverted"": ""0.00"", ""arr_delay"": ""150.00"", ""carrier_delay"": ""0.00"", ""weather_delay"": ""150.00"", ""nas_delay"": ""0.00"", ""security_delay"": ""0.00"", ""late_aircraft_delay"": ""0.00""}"
"{""year"": ""2025"", ""month"": ""7"", ""carrier"": ""YV"", ""carrier_name"": ""Mesa Airlines Inc."", ""airport"": ""IAD"", ""airport_name"": ""Washington, DC: Washington Dulles International"", ""arr_flights"": ""978.00"", ""arr_del15"": ""234.00"", ""carrier_ct"": ""52.94"", ""weather_ct"": ""14.27"", ""nas_ct"": ""67.41"", ""security_ct"": ""0.00"", ""late_aircraft_ct"": ""99.39"", ""arr_cancelled"": ""48.00"", ""arr_diverted"": ""6.00"", ""arr_delay"": ""23417.00"", ""carrier_delay"": ""3756.00"", ""weather_delay"": ""2148.00"", ""nas_delay"": ""8302.00"", ""security_delay"": ""0.00"", ""late_aircraft_delay"": ""9211.00""}"


year,month,carrier,carrier_name,airport,airport_name,arr_flights,arr_del15,carrier_ct,weather_ct,nas_ct,security_ct,late_aircraft_ct,arr_cancelled,arr_diverted,arr_delay,carrier_delay
2025,7,YV,Mesa Airlines Inc.,CRP,"Corpus Christi, TX: Corpus Christi International",19.0,4.0,0.71,0.3,0.7,0.0,2.29,1.0,0.0,823.0,45.0
2025,7,YV,Mesa Airlines Inc.,CVG,"Cincinnati, OH: Cincinnati/Northern Kentucky International",75.0,14.0,7.64,1.0,3.57,0.0,1.79,0.0,0.0,722.0,485.0
2025,7,YV,Mesa Airlines Inc.,DFW,"Dallas/Fort Worth, TX: Dallas/Fort Worth International",97.0,23.0,8.09,0.73,4.85,0.0,9.33,1.0,0.0,2125.0,655.0
2025,7,YV,Mesa Airlines Inc.,GUC,"Gunnison, CO: Gunnison-Crested Butte Regional",8.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,150.0,0.0
2025,7,YV,Mesa Airlines Inc.,IAD,"Washington, DC: Washington Dulles International",978.0,234.0,52.94,14.27,67.41,0.0,99.39,48.0,6.0,23417.0,3756.0
2025,5,OO,SkyWest Airlines Inc.,BFL,"Bakersfield, CA: Meadows Field",133.0,30.0,12.31,3.0,8.98,0.0,5.71,0.0,0.0,2589.0,563.0
2025,5,OO,SkyWest Airlines Inc.,BGM,"Binghamton, NY: Greater Binghamton/Edwin A. Link Field",25.0,2.0,2.0,0.0,0.0,0.0,0.0,0.0,1.0,60.0,60.0
2025,5,OO,SkyWest Airlines Inc.,BHM,"Birmingham, AL: Birmingham-Shuttlesworth International",176.0,54.0,31.88,6.83,7.12,0.0,8.18,1.0,0.0,3970.0,2103.0
2025,5,OO,SkyWest Airlines Inc.,BIL,"Billings, MT: Billings Logan International",175.0,24.0,10.29,1.68,4.63,0.0,7.4,1.0,1.0,1579.0,470.0
2025,5,OO,SkyWest Airlines Inc.,BIS,"Bismarck/Mandan, ND: Bismarck Municipal",144.0,29.0,8.59,4.71,7.3,0.0,8.4,2.0,1.0,4274.0,1089.0


Parsed records (approx): 47155


Writing Data

In [0]:
# 8) Write to Silver as Delta  

(df_clean.write
    .format("delta")
    .mode("overwrite")                 
    .option("overwriteSchema", "true")
    .save(silver_delta_path)
)
print("Written silver delta to:", silver_delta_path)


Written silver delta to: /mnt/silver/delta/airline_delays
