# Real-time Data Processing with Azure Databricks (and Event Hubs)

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
from pyspark.shell import spark

# Create the Unity Catalog objects used by the bronze/silver/gold pipeline.
# `IF NOT EXISTS` makes these statements idempotent, so re-running the
# notebook is safe without a bare `except:`. The previous bare excepts also
# hid genuine failures (missing privileges, no metastore attached, typos)
# behind an "already exists" message; those errors now surface.
spark.sql("CREATE CATALOG IF NOT EXISTS streaming")
spark.sql("CREATE SCHEMA IF NOT EXISTS streaming.bronze")
spark.sql("CREATE SCHEMA IF NOT EXISTS streaming.silver")
spark.sql("CREATE SCHEMA IF NOT EXISTS streaming.gold")

#### Bronze Layer

In [None]:
# Replace with your Event Hub connection string and Event Hub name.
# NOTE(review): avoid committing a real connection string to source control;
# consider reading it from a Databricks secret scope instead.
connectionString = "Paste connection string here"
eventHubName = "Type event hub name here"

# The Event Hubs connector expects the connection string to be encrypted by
# its JVM-side EventHubsUtils helper. SparkSession exposes no `eventhubs`
# attribute (the previous `spark.eventhubs...` raised AttributeError), so the
# helper is reached through the Py4J gateway on the SparkContext.
# NOTE(review): `sparkContext._jvm` is unavailable on Spark Connect / shared
# access mode clusters; a single-user cluster is presumably required here.
ehConf = {
    'eventhubs.connectionString': spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
    'eventhubs.eventHubName': eventHubName
}

Read the raw Event Hubs stream and write it, unchanged, to the bronze Delta table.

In [None]:
# Bronze: stream raw Event Hubs records, untransformed, using the `ehConf`
# options built in the previous cell.
df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)
# df.display()

# Append every micro-batch to the Delta table `streaming.bronze.weather`.
# The checkpoint directory records stream progress so a restarted query can
# resume from where it stopped.
(
    df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/streaming/bronze/weather")
    .toTable("streaming.bronze.weather")
)

#### Silver Layer

In [None]:
# Schema used by from_json to parse the message body in the silver layer.
# NOTE(review): temperature, humidity, windSpeed and precipitation are declared
# IntegerType; if the producer sends fractional values they will parse as null.
# Confirm against the producer's payload.
json_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("temperature", IntegerType()),
        ("humidity", IntegerType()),
        ("windSpeed", IntegerType()),
        ("windDirection", StringType()),
        ("precipitation", IntegerType()),
        ("conditions", StringType()),
    )
])

In [None]:
# Silver: incrementally read the bronze Delta table and turn each message body
# into typed weather columns plus the enqueue timestamp.
df = (
    spark.readStream
    .format("delta")
    .table("streaming.bronze.weather")
    # Cast the raw body to text so it can be parsed as JSON.
    .withColumn("body", col("body").cast("string"))
    # Parse the JSON text into a struct; fields not matching json_schema
    # come back as null.
    .withColumn("body", from_json(col("body"), json_schema))
    .select(
        "body.temperature",
        "body.humidity",
        "body.windSpeed",
        "body.windDirection",
        "body.precipitation",
        "body.conditions",
        col("enqueuedTime").alias('timestamp'),
    )
)
# df.display()

# Append the parsed rows to `streaming.silver.weather`, checkpointing progress
# so the query can recover after a restart.
(
    df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/streaming/silver/weather")
    .toTable("streaming.silver.weather")
)

#### Gold Layer

In [None]:
# Gold: average the numeric weather metrics over 5-minute tumbling windows of
# the `timestamp` column from the silver table. The 5-minute watermark bounds
# how late an event may arrive and still be counted. In append mode, a window's
# row is written only once the watermark has passed that window's end.
df = (
    spark.readStream
    .format("delta")
    .table("streaming.silver.weather")
    .withWatermark("timestamp", "5 minutes")
    .groupBy(window("timestamp", "5 minutes"))
    .agg(
        avg("temperature").alias('temperature'),
        avg("humidity").alias('humidity'),
        avg("windSpeed").alias('windSpeed'),
        avg("precipitation").alias('precipitation'),
    )
    # Flatten the window struct into separate start/end columns.
    .select('window.start', 'window.end', 'temperature', 'humidity', 'windSpeed', 'precipitation')
)
# df.display()

# Append finalized windows to `streaming.gold.weather_summary`.
# NOTE(review): unlike the other layers, this checkpoint is not under
# /mnt/streaming/<layer>/...; it is left as-is because moving it would discard
# the existing aggregation state.
(
    df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/streaming/weather_summary")
    .toTable("streaming.gold.weather_summary")
)