In [0]:
from pyspark.sql.functions import from_json, col, current_timestamp
from pyspark.sql.types import (
    StructType, StringType, StructField, TimestampNTZType, TimestampType, IntegerType, FloatType, ArrayType, LongType
)

In [0]:
# Record schema mirroring the columns of azure_weather_db.silver.weather_data
# (same names, order and types).
# - sunrise_time / sunset_time use LongType, like data.sys.sunrise/sunset in
#   raw_schema and the LONG columns of the silver table.
# - Timestamps use TimestampType to match the table's TIMESTAMP columns and
#   the type returned by current_timestamp().
# - Columns declared NOT NULL in the silver table are non-nullable here;
#   recording_timestamp is non-nullable as well because it is always set
#   from current_timestamp().
schema = StructType([
    StructField("city", StringType(), False),
    StructField("temperature", FloatType(), False),
    StructField("feel_temperature", FloatType(), False),
    StructField("temp_min", FloatType(), True),
    StructField("temp_max", FloatType(), True),
    StructField("humidity", IntegerType(), True),
    StructField("pressure", FloatType(), True),
    StructField("cloudiness", IntegerType(), True),
    StructField("wind_speed", FloatType(), True),
    StructField("wind_direction", IntegerType(), True),
    StructField("weather_category", StringType(), True),
    StructField("weather_description", StringType(), True),
    StructField("weather_icon", StringType(), True),
    StructField("sunrise_time", LongType(), True),
    StructField("sunset_time", LongType(), True),
    StructField("enqueued_time", TimestampType(), True),
    StructField("recording_timestamp", TimestampType(), False),
])

In [0]:
# Diagnostic only: lists the secret scopes visible to this workspace so the
# scope name used in the next cell can be checked. Nothing below uses the result.
dbutils.secrets.listScopes()

In [0]:
# Event Hubs connection string, fetched from the Databricks secret scope
# rather than being written into the notebook.
# NOTE(review): the separate "weather-eventhub" secret is not read; presumably
# the connection string already carries the EntityPath — confirm.
weather_secret_scope = "db-weather-vault"
conn_str = dbutils.secrets.get(key="weather-conn-str", scope=weather_secret_scope)

In [0]:
# Options for the Event Hubs source. The connector's JVM helper encrypts the
# connection string before it is passed as the connectionString option.
encrypted_conn_str = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn_str)
event_hub_config = {"eventhubs.connectionString": encrypted_conn_str}

In [0]:
# Streaming source: raw Event Hubs events (the bronze cell below reads the
# connector's `body` and `enqueuedTime` columns from it).
df_raw = spark.readStream.format("eventhubs").options(**event_hub_config).load()

In [0]:
# Optional full reset of the bronze layer, e.g. to recreate the table with a
# new schema. Disabled by default: unconditionally removing the table location
# meant every "Run All" wiped all bronze data.
# When enabled, the table entry is dropped together with its files (including
# the old _delta_log), so the CREATE TABLE cell below starts clean, and the
# bronze stream checkpoint is cleared so ingestion restarts consistently.
RESET_BRONZE = False
if RESET_BRONZE:
    spark.sql("DROP TABLE IF EXISTS azure_weather_db.bronze.raw_weather_data")
    dbutils.fs.rm("abfss://unity-catalog-storage@dbstorage7pezjm3fdwfxa.dfs.core.windows.net/weather_data/bronze/raw_weather_data", recurse=True)
    dbutils.fs.rm("abfss://unity-catalog-storage@dbstorage7pezjm3fdwfxa.dfs.core.windows.net/weather_data/checkpoints/bronze/raw_weather_data", recurse=True)


In [0]:
# Create the bronze schema and the bronze table (both idempotent).
# The table is external, not managed: it is registered at an explicit LOCATION,
# which is the same path the bronze stream writes to.
# The schema must exist first, just like the silver schema created further down.
spark.sql("CREATE SCHEMA IF NOT EXISTS azure_weather_db.bronze")
spark.sql("""
          CREATE TABLE IF NOT EXISTS azure_weather_db.bronze.raw_weather_data (
              body BINARY,
              recording_timestamp TIMESTAMP
          )
          USING DELTA
          LOCATION 'abfss://unity-catalog-storage@dbstorage7pezjm3fdwfxa.dfs.core.windows.net/weather_data/bronze/raw_weather_data'
""")

In [0]:
# Sanity check: show the registered bronze table schema.
spark.read.table("azure_weather_db.bronze.raw_weather_data").printSchema()

In [0]:
# Bronze projection: keep the raw binary payload as-is and expose the
# Event Hubs enqueue time under the name recording_timestamp.
df_bronze = df_raw.selectExpr("body", "enqueuedTime AS recording_timestamp")

In [0]:
# Start the Event Hubs -> bronze Delta stream (append only).
# The checkpoint lives under weather_data/checkpoints/bronze/..., which is the
# path the checkpoint reset/inspection cells use. Previously it sat inside the
# table directory itself, where Delta's VACUUM treats the untracked checkpoint
# files as removable, and a table-location cleanup also wiped the checkpoint.
# NOTE: the first run after moving the checkpoint starts without the previously
# committed Event Hubs offsets.
bronze_query = (
    df_bronze.writeStream
        .format("delta")
        .option("checkpointLocation", "abfss://unity-catalog-storage@dbstorage7pezjm3fdwfxa.dfs.core.windows.net/weather_data/checkpoints/bronze/raw_weather_data")
        .outputMode("append")
        .start("abfss://unity-catalog-storage@dbstorage7pezjm3fdwfxa.dfs.core.windows.net/weather_data/bronze/raw_weather_data")
)

In [0]:
# Create the silver schema in this catalog (no-op if it already exists);
# it must exist before the silver table below can be created.
spark.sql("CREATE SCHEMA IF NOT EXISTS azure_weather_db.silver;")

In [0]:
# Create the silver table (external, at an explicit LOCATION).
# IF NOT EXISTS keeps the cell re-runnable; a plain CREATE TABLE failed with
# "table already exists" on every run after the first.
# NOTE: Delta enforces the NOT NULL columns on write, so a parsed record missing
# name / temp / feels_like will make the silver stream's write fail.
spark.sql("""
          CREATE TABLE IF NOT EXISTS azure_weather_db.silver.weather_data (
              city STRING NOT NULL,
              temperature FLOAT NOT NULL,
              feel_temperature FLOAT NOT NULL,
              temp_min FLOAT,
              temp_max FLOAT,
              humidity INT,
              pressure FLOAT,
              cloudiness INT,
              wind_speed FLOAT,
              wind_direction INT,
              weather_category STRING,
              weather_description STRING,
              weather_icon STRING,
              sunrise_time LONG,
              sunset_time LONG,
              enqueued_time TIMESTAMP,
              recording_timestamp TIMESTAMP
          )
          USING DELTA
          LOCATION 'abfss://unity-catalog-storage@dbstorage7pezjm3fdwfxa.dfs.core.windows.net/weather_data/silver/weather_data'
""")

In [0]:
# Optional reset of the silver table: drop the table entry and delete its files.
# Disabled by default. This cell sits directly after the CREATE TABLE cell, so
# running it unconditionally removed the table that was just defined, and the
# later .toTable(...) call then re-created it from the DataFrame schema without
# the declared NOT NULL columns or LOCATION.
# After enabling and running this, re-run the CREATE TABLE cell above.
RESET_SILVER = False
if RESET_SILVER:
    spark.sql("DROP TABLE IF EXISTS azure_weather_db.silver.weather_data")
    dbutils.fs.rm("abfss://unity-catalog-storage@dbstorage7pezjm3fdwfxa.dfs.core.windows.net/weather_data/silver/weather_data", recurse=True)

In [0]:
# Silver input: stream the bronze Delta table.
bronze_table_name = "azure_weather_db.bronze.raw_weather_data"
df_bronze = spark.readStream.table(bronze_table_name)

In [0]:
# Schema of the raw JSON payload carried in the Event Hubs message body.
# Nested objects are declared first and then assembled; every field uses the
# StructField / ArrayType defaults, i.e. everything is nullable.
coord_struct = StructType([
    StructField("lon", FloatType()),
    StructField("lat", FloatType()),
])

weather_entry_struct = StructType([
    StructField("id", IntegerType()),
    StructField("main", StringType()),
    StructField("description", StringType()),
    StructField("icon", StringType()),
])

main_struct = StructType([
    StructField("temp", FloatType()),
    StructField("feels_like", FloatType()),
    StructField("temp_min", FloatType()),
    StructField("temp_max", FloatType()),
    StructField("pressure", FloatType()),
    StructField("humidity", IntegerType()),
    StructField("sea_level", FloatType()),
    StructField("grnd_level", FloatType()),
])

wind_struct = StructType([
    StructField("speed", FloatType()),
    StructField("deg", IntegerType()),
    StructField("gust", FloatType()),
])

clouds_struct = StructType([StructField("all", IntegerType())])

sys_struct = StructType([
    StructField("sunrise", LongType()),
    StructField("sunset", LongType()),
])

raw_schema = StructType([
    StructField("coord", coord_struct),
    StructField("weather", ArrayType(weather_entry_struct)),
    StructField("main", main_struct),
    StructField("wind", wind_struct),
    StructField("clouds", clouds_struct),
    StructField("dt", LongType()),
    StructField("sys", sys_struct),
    StructField("name", StringType()),
])

In [0]:
# Decode the binary Event Hubs body into text so it can be parsed as JSON.
df_bronze = df_bronze.withColumn("json_str", col("body").cast(StringType()))

In [0]:
# Parsing schema: raw_schema plus a `_corrupt_record` string column, which
# from_json in PERMISSIVE mode fills with the input text of malformed rows.
# A new StructType is built instead of calling raw_schema.add(...): add()
# mutates raw_schema in place, so re-running this cell appended a second
# `_corrupt_record` field to the same schema object.
raw_schema_and_corrupt = StructType(
    raw_schema.fields + [StructField("_corrupt_record", StringType(), True)]
)

In [0]:
# Parse the JSON text into a struct column `data`. PERMISSIVE mode keeps
# malformed rows instead of failing, flagging them via data._corrupt_record.
json_parse_options = {"mode": "PERMISSIVE"}
parsed_df = df_bronze.withColumn(
    "data",
    from_json(col("json_str"), raw_schema_and_corrupt, options=json_parse_options),
)

In [0]:
# Split parsed rows into successfully parsed and failed (corrupt) ones.
is_corrupt = col("data._corrupt_record").isNotNull()
df_parsed_invalid = parsed_df.where(is_corrupt)
df_parsed_valid = parsed_df.where(~is_corrupt)

In [0]:
# Bronze -> silver projection: flatten the parsed payload into the silver
# columns (in silver-table order) and stamp each row with processing time.
first_weather = col("data.weather").getItem(0)

silver_projection = [
    (col("data.name"), "city"),
    (col("data.main.temp"), "temperature"),
    (col("data.main.feels_like"), "feel_temperature"),
    (col("data.main.temp_min"), "temp_min"),
    (col("data.main.temp_max"), "temp_max"),
    (col("data.main.humidity"), "humidity"),
    (col("data.main.pressure"), "pressure"),
    (col("data.clouds.all"), "cloudiness"),
    (col("data.wind.speed"), "wind_speed"),
    (col("data.wind.deg"), "wind_direction"),
    (first_weather.getField("main"), "weather_category"),
    (first_weather.getField("description"), "weather_description"),
    (first_weather.getField("icon"), "weather_icon"),
    (col("data.sys.sunrise"), "sunrise_time"),
    (col("data.sys.sunset"), "sunset_time"),
    # bronze recording_timestamp holds the Event Hubs enqueue time
    (col("recording_timestamp"), "enqueued_time"),
    (current_timestamp(), "recording_timestamp"),
]

processed_df = df_parsed_valid.select(
    *[source.alias(target) for source, target in silver_projection]
)

In [0]:
# Append parsed rows to the silver table, with its own checkpoint directory.
silver_checkpoint_path = "abfss://unity-catalog-storage@dbstorage7pezjm3fdwfxa.dfs.core.windows.net/weather_data/checkpoints/silver/weather_data"
(
    processed_df.writeStream
        .format("delta")
        .option("checkpointLocation", silver_checkpoint_path)
        .outputMode("append")
        .toTable("azure_weather_db.silver.weather_data")
)

In [0]:
# Optional: empty the bronze and silver tables before a "real" run.
# Disabled by default; unconditionally, this cell truncated both tables on
# every "Run All", right after the streams above had been started.
RESET_TABLE_DATA = False
if RESET_TABLE_DATA:
    spark.sql("truncate table azure_weather_db.bronze.raw_weather_data")
    spark.sql("truncate table azure_weather_db.silver.weather_data")

In [0]:
# Show the current silver table contents.
spark.table("azure_weather_db.silver.weather_data").display()

In [0]:
# Row count of the silver table.
display(spark.sql("SELECT count(*) FROM azure_weather_db.silver.weather_data"))

In [0]:
# Optional: delete the bronze and silver stream checkpoints so the streams
# start from scratch on their next start.
# Disabled by default; unconditionally, this cell deleted the checkpoints of
# streams started earlier in the notebook while they were still running.
# The silver path was also listed twice; each directory is now removed once.
RESET_CHECKPOINTS = False
if RESET_CHECKPOINTS:
    # Stop running queries first so none of them writes into a deleted checkpoint.
    for active_query in spark.streams.active:
        active_query.stop()
    for checkpoint_dir in (
        "abfss://unity-catalog-storage@dbstorage7pezjm3fdwfxa.dfs.core.windows.net/weather_data/checkpoints/silver/weather_data",
        "abfss://unity-catalog-storage@dbstorage7pezjm3fdwfxa.dfs.core.windows.net/weather_data/checkpoints/bronze/raw_weather_data",
    ):
        dbutils.fs.rm(checkpoint_dir, recurse=True)

In [0]:
# Inspect what is stored under the bronze checkpoint root.
bronze_checkpoint_root = "abfss://unity-catalog-storage@dbstorage7pezjm3fdwfxa.dfs.core.windows.net/weather_data/checkpoints/bronze"
dbutils.fs.ls(bronze_checkpoint_root)

In [0]:
# Persist rows whose JSON could not be parsed (data._corrupt_record is set)
# to a separate Delta location for later inspection.
invalid_records_path = "abfss://unity-catalog-storage@dbstorage7pezjm3fdwfxa.dfs.core.windows.net/weather_data/silver/invalid_records"
invalid_checkpoint_path = "abfss://unity-catalog-storage@dbstorage7pezjm3fdwfxa.dfs.core.windows.net/weather_data/checkpoints/silver/corrupt_data"
(
    df_parsed_invalid.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", invalid_checkpoint_path)
        .start(invalid_records_path)
)

In [0]:
# Inspect the silver stream checkpoint directory.
silver_checkpoint_dir = "abfss://unity-catalog-storage@dbstorage7pezjm3fdwfxa.dfs.core.windows.net/weather_data/checkpoints/silver/weather_data"
display(dbutils.fs.ls(silver_checkpoint_dir))