# Real-time Data Processing with Azure Databricks and Event Hubs

In [0]:
%pip install azure-eventhub


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json

In [0]:
# Create the `streaming` catalog and its bronze/silver/gold schemas.
# IF NOT EXISTS makes the cell safe to re-run, so an exception that still
# surfaces is a genuine failure (for example missing privileges). It is
# reported together with its cause, and the remaining statements are still
# attempted, keeping the cell best-effort.
for ddl_statement in (
    "create catalog if not exists streaming",
    "create schema if not exists streaming.bronze",
    "create schema if not exists streaming.silver",
    "create schema if not exists streaming.gold",
):
    try:
        spark.sql(ddl_statement)
    except Exception as error:  # not a bare except: Ctrl-C must still interrupt
        print(f"'{ddl_statement}' failed: {error}")

#### Bronze Layer

In [0]:
# Event Hubs source settings consumed by the bronze readStream cell.
# NOTE(review): both values are placeholders; consider reading the real
# connection string from a secret store instead of pasting it here.
connectionString = "Listen accesss policy connection string of event hub"
eventHubName = "Name of event hub"

# The connection string is passed through the JVM-side EventHubsUtils.encrypt
# helper before it is placed in the connector options.
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
ehConf['eventhubs.eventHubName'] = eventHubName

In [0]:
# Bronze: read the Event Hubs stream as-is using the options in ehConf.
df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

# Preview the incoming stream in the notebook.
df.display()

# Append the unmodified events to the bronze Delta table.
(
    df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/streaming/bronze/weather")
    .toTable("streaming.bronze.weather")
)

#### Silver Layer

In [0]:
# Schema used to parse the JSON event body in the silver layer:
# four integer readings plus two string fields.
json_schema = StructType([
    StructField(field_name, field_type())
    for field_name, field_type in (
        ("temperature", IntegerType),
        ("humidity", IntegerType),
        ("windSpeed", IntegerType),
        ("windDirection", StringType),
        ("precipitation", IntegerType),
        ("conditions", StringType),
    )
])

In [0]:
# Silver: stream from the bronze table, parse the event body with json_schema
# and flatten it into one column per reading, plus the enqueue time.
df = (
    spark.readStream
    .format("delta")
    .option("skipChangeCommits", "true")
    .table("streaming.bronze.weather")
    # Cast body to string, then parse it as JSON into a struct column.
    .withColumn("body", from_json(col("body").cast("string"), json_schema))
    .select(
        "body.temperature",
        "body.humidity",
        "body.windSpeed",
        "body.windDirection",
        "body.precipitation",
        "body.conditions",
        col("enqueuedTime").alias('timestamp'),
    )
)

# Preview the parsed readings in the notebook.
df.display()

# Append the flattened readings to the silver Delta table.
(
    df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/streaming/silver/weather")
    .toTable("streaming.silver.weather")
)

#### Gold Layer

In [0]:
# Gold: average the numeric readings of the silver table over 5-minute
# windows on `timestamp`, with a 5-minute watermark on the same column.
df = (
    spark.readStream
    .format("delta")
    .option("skipChangeCommits", "true")
    .table("streaming.silver.weather")
    .withWatermark("timestamp", "5 minutes")
    .groupBy(window("timestamp", "5 minutes"))
    .agg(
        avg("temperature").alias('temperature'),
        avg("humidity").alias('humidity'),
        avg("windSpeed").alias('windSpeed'),
        avg("precipitation").alias('precipitation'),
    )
    # Expose the window bounds as plain start/end columns.
    .select(
        'window.start',
        'window.end',
        'temperature',
        'humidity',
        'windSpeed',
        'precipitation',
    )
)

# Preview the windowed averages in the notebook.
df.display()

# Append the window summaries to the gold Delta table.
(
    df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/streaming/gold/weather_summary")
    .toTable("streaming.gold.weather_summary")
)

In [0]:
%python
from azure.eventhub import EventHubProducerClient, EventData
import asyncio
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import random
import json

# Load-generator sizing.
TOTAL_EVENTS = 10_000_000
NUM_EVENTS_PER_BATCH = 100
NUM_WORKERS = 20

# Values that generate_event callers pick from for the windDirection field.
PARTITIONS = ["NW", "NE", "SE", "SW", "EE"]

# Producer client used by the send_* coroutines below.
# NOTE(review): eventHubName is also assigned in the Bronze cell; this rebinds
# it, so confirm it names the hub the bronze stream actually reads from.
send_connection_string = "send access policy connection string of event hub"
eventHubName = "mp-car-sales-event-hub"
producer = EventHubProducerClient.from_connection_string(
    send_connection_string,
    eventhub_name=eventHubName,
)

def generate_event(i, partition):
    """Return one randomly generated weather reading serialized as JSON.

    Args:
        i: Accepted for call-site compatibility; not used by the body.
        partition: Value stored in the ``windDirection`` field.

    Returns:
        A JSON string with the keys temperature, humidity, windSpeed,
        windDirection, precipitation and conditions.
    """
    reading = {}
    reading["temperature"] = random.randint(-10, 40)
    reading["humidity"] = random.randint(10, 100)
    reading["windSpeed"] = random.randint(0, 50)
    reading["windDirection"] = partition
    reading["precipitation"] = random.randint(0, 5)
    reading["conditions"] = random.choice(["Sunny", "Rainy", "Cloudy", "Stormy"])
    return json.dumps(reading)

async def send_batch(batch_events):
    """Send every event in ``batch_events`` through the module-level producer.

    Events are packed into an ``EventDataBatch``. When the batch reports that
    it is full (``ValueError`` from ``add``), it is sent and a fresh batch is
    started, so no event is silently dropped. The success message reports the
    number of events actually sent.

    Works with either producer flavour: the blocking client returns plain
    values, while the ``azure.eventhub.aio`` client returns awaitables, and
    only the latter are awaited. Awaiting the blocking client's ``None``
    result is what previously turned every successful send into a reported
    error.

    Args:
        batch_events: Iterable of serialized event payloads.

    Returns:
        None. Failures are printed rather than raised, so one bad batch does
        not cancel the others.
    """
    import inspect

    if not batch_events:
        return  # nothing to send; do not create or send an empty batch

    async def _settle(result):
        # Await only when the producer call handed back an awaitable.
        return await result if inspect.isawaitable(result) else result

    try:
        sent = 0
        pending = 0  # events added to the current, not yet sent, batch
        event_data_batch = await _settle(producer.create_batch())
        for event in batch_events:
            try:
                event_data_batch.add(EventData(event))
            except ValueError:
                if pending == 0:
                    # A single event that does not fit an empty batch can
                    # never be sent; let the outer handler report it.
                    raise
                # Current batch is full: send it and continue in a new one.
                await _settle(producer.send_batch(event_data_batch))
                sent += pending
                pending = 0
                event_data_batch = await _settle(producer.create_batch())
                event_data_batch.add(EventData(event))
            pending += 1

        if pending:
            await _settle(producer.send_batch(event_data_batch))
            sent += pending
        print(f"Sent batch of {sent} events")
    except Exception as e:
        print(f"Error sending batch: {e}")

async def send_events():
    """Generate and send ``TOTAL_EVENTS`` events in bounded waves.

    At most ``NUM_WORKERS`` batches are generated and awaited at a time, so
    memory use stays proportional to ``NUM_WORKERS * NUM_EVENTS_PER_BATCH``
    instead of materializing every payload up front. A final short batch
    carries any remainder when ``TOTAL_EVENTS`` is not a multiple of
    ``NUM_EVENTS_PER_BATCH``.

    Raises:
        ValueError: If ``NUM_EVENTS_PER_BATCH`` is smaller than 1.
    """
    if NUM_EVENTS_PER_BATCH < 1:
        raise ValueError("NUM_EVENTS_PER_BATCH must be at least 1")

    wave_size = max(1, NUM_WORKERS)  # always make progress
    remaining = TOTAL_EVENTS
    while remaining > 0:
        wave = []
        while remaining > 0 and len(wave) < wave_size:
            batch_size = min(NUM_EVENTS_PER_BATCH, remaining)
            batch_events = [
                generate_event(i, random.choice(PARTITIONS))
                for i in range(batch_size)
            ]
            wave.append(send_batch(batch_events))
            remaining -= batch_size
        # Finish this wave before generating the next set of payloads.
        await asyncio.gather(*wave)

async def main():
    """Send all events, then close the module-level producer.

    The producer is closed in a ``finally`` block so the connection is
    released even when sending fails. ``close()`` is awaited only if it
    returns an awaitable: the blocking client returns ``None``, and awaiting
    that raises ``TypeError``.
    """
    import inspect

    try:
        await send_events()
    finally:
        closing = producer.close()
        if inspect.isawaitable(closing):
            await closing
# Top-level await: valid only where the cell is executed inside a running
# event loop (as notebook kernels typically do); a plain Python script would
# need asyncio.run(main()) instead.
await main()