In [0]:

from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
try:
    spark.sql("create catalog demo_databricks_bf;")
except Exception as e:
    print('check if catalog already exists')
    print(e)

try:
    spark.sql("create schema demo_databricks_bf.bronze;")
except:
    print('check if bronze schema already exists')

try:
    spark.sql("create schema demo_databricks_bf.silver")
except:
    print('check if silver schema already exists')

try:
    spark.sql("create schema demo_databricks_bf.gold;")
except:
    print('check if gold schema already exists')

check if catalog already exists
[RequestId=53eaa235-e219-4c95-a350-8b0b5f6f0270 ErrorClass=INVALID_STATE] Metastore storage root URL does not exist. Please provide a storage location for the catalog (for example 'CREATE CATALOG myCatalog MANAGED LOCATION '<location-path>'). Alternatively set up a metastore root storage location to provide a storage location for all catalogs in the metastore.
check if bronze schema already exists
check if silver schema already exists
check if gold schema already exists


In [0]:
# Configuration
connection_string = "CONNECTION_STRING"
event_hub_name = "EVENT_HUB_NAME"

eh_conf = {
  'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string),
  'eventhubs.eventHubName': event_hub_name
}

In [0]:
# Lire un stream event hub 
df = spark.readStream \
    .format("eventhubs") \
    .options(**eh_conf) \
    .load() \

# Afficher les informations du stream
df.display()

body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties
eyJpZCI6IDEsICJuYW1lIjogIkFsaWNlIiwgImFnZSI6IDMwLCAiY2l0eSI6ICJOZXcgWW9yayJ9,0,4294967296,4,2025-09-22T13:08:29.644+0000,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJpZCI6IDIsICJuYW1lIjogIkJvYiIsICJhZ2UiOiAyNSwgImNpdHkiOiAiTG9zIEFuZ2VsZXMifQ==,0,4294967416,5,2025-09-22T13:08:29.644+0000,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJpZCI6IDMsICJuYW1lIjogIkNoYXJsaWUiLCAiYWdlIjogMzUsICJjaXR5IjogIkNoaWNhZ28ifQ==,0,4294967536,6,2025-09-22T13:08:29.644+0000,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJpZCI6IDQsICJuYW1lIjogIkRpYW5hIiwgImFnZSI6IDI4LCAiY2l0eSI6ICJNaWFtaSJ9,0,4294967656,7,2025-09-22T13:08:29.644+0000,,,Map(),Map(x-opt-sequence-number-epoch -> -1)


In [0]:
df.writeStream\
    .option("checkpointLocation", "/mnt/streaming/bronze/weather")\
    .outputMode("append")\
    .format("delta")\
    .toTable("demo_databricks_bf.bronze.weather")

Out[14]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fbf3c2143a0>

In [0]:
# Defining the schema for the JSON object

json_schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("age", IntegerType()),
    StructField("city", StringType())
])

In [0]:
# Reading and Transforming: Load streaming data from the 'streaming.bronze.weather' Delta table, cast 'body' to string, parse JSON, and select specific fields
df = spark.readStream\
    .format("delta")\
    .table("demo_databricks_bf.bronze.weather")\
    .withColumn("body", col("body").cast("string"))\
    .withColumn("body",from_json(col("body"), json_schema))\
    .select("body.id", "body.name", "body.age", "body.city", col("enqueuedTime").alias('timestamp'))

# Displaying stream: Visualize the transformed data in the DataFrame for verification and analysis
df.display()

# Writing stream: Save the transformed data to the 'streaming.silver.weather' Delta table in 'append' mode with checkpointing for data reliability
df.writeStream\
    .option("checkpointLocation", "/mnt/streaming/silver/weather")\
    .outputMode("append")\
    .format("delta")\
    .toTable("demo_databricks_bf.silver.weather")

id,name,age,city,timestamp
1,Alice,30,New York,2025-09-22T13:08:29.644+0000
2,Bob,25,Los Angeles,2025-09-22T13:08:29.644+0000
3,Charlie,35,Chicago,2025-09-22T13:08:29.644+0000
4,Diana,28,Miami,2025-09-22T13:08:29.644+0000


In [0]:
# Aggregating Stream: Read from 'streaming.silver.weather', apply watermarking and windowing, and calculate average weather metrics
df = spark.readStream\
    .format("delta")\
    .table("demo_databricks_bf.silver.weather")\
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window("timestamp", "5 minutes")) \
    .agg(avg("temperature").alias('temperature'), avg("humidity").alias('humidity'), avg("windSpeed").alias('windSpeed'), avg("precipitation").alias('precipitation'))\
	.select('window.start', 'window.end', 'temperature', 'humidity', 'windSpeed', 'precipitation')

# Displaying Aggregated Stream: Visualize aggregated data for insights into weather trends
df.display()

# Writing Aggregated Stream: Store the aggregated data in 'streaming.gold.weather_aggregated' with checkpointing for data integrity
df.writeStream\
    .option("checkpointLocation", "/mnt/streaming/weather_summary")\
    .outputMode("append")\
    .format("delta")\
    .toTable("demo_databricks_bf.gold.weather_summary")