In [0]:
# Set up access to the storage account used for temp data when pushing to Synapse.
# The account key is read from a Databricks secret scope instead of being written
# into the notebook, so it does not end up in source control or revision history.
# NOTE(review): the key that was previously hard-coded here should be rotated.
# NOTE(review): the secret key name "storage-account-key" is an assumption — point it
# at wherever the key is actually stored in your workspace.

storage_account = "temphumstorage"
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.blob.core.windows.net",
    dbutils.secrets.get("iot", "storage-account-key"),
)

In [0]:
# Base blob-storage locations that every dataset path below is derived from
ROOT_PATH = "wasbs://iot@{}.blob.core.windows.net/".format(storage_account)
WEATHER_PATH = f"{ROOT_PATH}weather/"

# List the container root; the listing is shown as the cell output
dbutils.fs.ls(ROOT_PATH)

Out[38]: []

In [0]:
# Bronze (raw) and gold (aggregated) Delta locations, each with its own checkpoint folder
BRONZE_PATH = f"{WEATHER_PATH}bronze/"
GOLD_PATH = f"{WEATHER_PATH}gold/"
CHECKPOINT_BRONZE = f"{BRONZE_PATH}checkpoint/"
CHECKPOINT_GOLD = f"{GOLD_PATH}checkpoint/"

# Currently disabled locations, kept for reference:
#SILVER_PATH = ROOT_PATH + "silver/"
#CHECKPOINT_PATH = ROOT_PATH + "checkpoints/"

In [0]:
# Recursively delete everything under the weather root (bronze/gold data and their
# checkpoints live below it); the boolean returned by rm is shown as the cell output.
dbutils.fs.rm(WEATHER_PATH, recurse=True)

Out[41]: False

In [0]:
# IoT Hub connection string (Event Hub compatible), read from the 'iot' secret scope
# so the shared access key is never stored in the notebook itself.
# NOTE(review): the access key that was previously hard-coded here should be rotated.
IOT_CS = dbutils.secrets.get('iot','iothub-cs')
ehConf = {
  # Encrypt the connection string before handing it to the Event Hubs connector
  'eventhubs.connectionString':sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(IOT_CS),
  'ehName':"iothub-ehub-temphumhub-24669055-edb9e46156"
}

In [0]:
# Turn on Delta optimized writes and auto compaction for this Spark session
delta_write_settings = {
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true",
}
for conf_key, conf_value in delta_write_settings.items():
    spark.conf.set(conf_key, conf_value)

In [0]:
# Pyspark and ML Imports
import json
import os
import time

import requests
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [0]:
# DDL-style schema of the JSON payload carried in each IoT Hub message body
schema = "timestamp timestamp, deviceId string, temperature double, humidity double"
# Fields that are not parsed at the moment:
#windspeed double, winddirection string, rpm double, angle double"

# Stream from IoT Hub through the Event Hubs connector configured in ehConf
raw_events = spark.readStream.format("eventhubs").options(**ehConf).load()

# The message payload sits in the binary "body" column: cast it to a string and parse it as JSON
parsed_events = raw_events.withColumn(
    'reading',
    F.from_json(F.col('body').cast('string'), schema),
)

# Flatten the parsed struct and derive a "date" column that is used for partitioning downstream
iot_stream = parsed_events.select(
    'reading.*',
    F.to_date('reading.timestamp').alias('date'),
)

In [0]:
# Write the weather telemetry from the IoT Hub stream into its bronze Delta location
bronze_target = BRONZE_PATH + "weather_raw"
bronze_checkpoint = CHECKPOINT_BRONZE + "weather_raw"

(
    iot_stream
    .filter(F.col('temperature').isNotNull())               # keep only rows that carry a temperature reading
    .select('date', 'deviceid', 'timestamp', 'temperature', 'humidity')
    .writeStream
    .format('delta')                                        # store the stream in Delta format
    .option("mergeSchema", "true")
    .option("checkpointLocation", bronze_checkpoint)        # checkpoint so the stream can restart gracefully
    .partitionBy('date')                                    # one partition per calendar date
    .start(bronze_target)                                   # the started query is shown as the cell output
)

Out[46]: <pyspark.sql.streaming.query.StreamingQuery at 0x7ff4460169d0>

In [0]:
# Create the external table once data starts to stream in.
# CREATE TABLE is retried until it succeeds (presumably it fails until the stream has
# written its first Delta commit to the location — confirm). Between attempts we sleep
# instead of busy-spinning, only ordinary exceptions are retried so the cell can still
# be cancelled, and after a bounded wait the last error is surfaced instead of looping
# forever on a permanent failure (bad path, missing permissions, ...).
table_wait_timeout_s = 30 * 60
table_poll_interval_s = 10

table_wait_deadline = time.monotonic() + table_wait_timeout_s
while True:
    try:
        spark.sql(f'CREATE TABLE IF NOT EXISTS weather_raw USING DELTA LOCATION "{BRONZE_PATH + "weather_raw"}"')
        break
    except Exception as create_error:
        if time.monotonic() >= table_wait_deadline:
            raise TimeoutError(
                f"weather_raw could not be created within {table_wait_timeout_s} seconds"
            ) from create_error
        time.sleep(table_poll_interval_s)

In [0]:
%sql
-- Unregister the weather_raw table. IF EXISTS keeps the cell from failing when the
-- table has not been created yet (e.g. on a fresh workspace).
-- NOTE(review): in top-to-bottom order this cell runs after the CREATE TABLE cell and
-- before cells that query weather_raw; it looks like a manual reset step that belongs
-- before the CREATE TABLE cell — confirm and move it, or skip it on "Run All".
DROP TABLE IF EXISTS weather_raw;

In [0]:
%sql
-- Count the bronze rows received from the device with id temphum
SELECT COUNT(*)
FROM weather_raw
WHERE deviceid = 'temphum';

count(1)
6


In [0]:
# Create functions to merge weather data into their target Delta tables
def merge_delta(incremental, target):
    """Upsert one micro-batch into the Delta table stored at ``target``.

    The batch is de-duplicated on (date, window, deviceid), registered as the
    temp view ``incremental`` and merged into the target on the same three
    columns: matching rows are updated, new rows are inserted. If the MERGE
    fails, the de-duplicated batch is written to ``target`` as a new Delta
    table partitioned by ``date``.

    Args:
        incremental: DataFrame holding the rows to merge; it must provide the
            ``date``, ``window`` and ``deviceid`` columns.
        target: Path of the Delta table to merge into.

    Raises:
        Exception: whatever the fallback write raises when the MERGE failed
            and the table could not be created either.
    """
    deduped = incremental.dropDuplicates(['date','window','deviceid'])
    deduped.createOrReplaceTempView("incremental")
    try:
        # MERGE records into the target table using the specified join key.
        # The statement is issued through the session that owns the batch,
        # since that is where the temp view was just registered.
        deduped._jdf.sparkSession().sql(f"""
      MERGE INTO delta.`{target}` t
      USING incremental i
      ON i.date=t.date AND i.window = t.window AND i.deviceId = t.deviceid
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    """)
    except Exception:
        # The MERGE is expected to fail when the target table does not exist yet;
        # in that case create it from this batch. Only ordinary exceptions are
        # handled here, so KeyboardInterrupt/SystemExit still stop the stream.
        # Write the de-duplicated rows so the new table honours the merge key.
        deduped.write.format("delta").partitionBy("date").save(target)

In [0]:
# Aggregate the bronze readings into 5-minute averages per device and date, then upsert
# every micro-batch into the gold Delta location. Note that weather_df ends up holding
# the started streaming query (the result of .start()), not a DataFrame.
weather_df = (
    spark.readStream.format('delta').table("weather_raw")            # stream from the bronze Delta table
    .groupBy('deviceid', 'date', F.window('timestamp', '5 minutes')) # 5-minute windows
    .agg(
        F.avg('temperature').alias('avg_temperature'),
        F.avg('humidity').alias('avg_humidity'),
    )
    .select('date', 'window', 'deviceid', 'avg_temperature', 'avg_humidity')
    .writeStream
    .outputMode("update")                                            # merge works with update mode
    .option("checkpointLocation", CHECKPOINT_GOLD + "weather_agg")   # checkpoint so the stream can restart gracefully
    .foreachBatch(lambda batch_df, batch_id: merge_delta(batch_df, GOLD_PATH + "weather_agg"))
    .start()
)

In [0]:
%sql
-- Unregister the weather_agg table before it is re-created in the next cell.
-- IF EXISTS keeps the cell from failing when the table has not been created yet.
DROP TABLE IF EXISTS weather_agg;

In [0]:
# Register the gold table once the aggregation stream has produced data.
# CREATE TABLE is retried until it succeeds (presumably it fails until the first
# micro-batch has been written to the location — confirm). Between attempts we sleep
# instead of busy-spinning, only ordinary exceptions are retried so the cell can still
# be cancelled, and after a bounded wait the last error is surfaced instead of looping
# forever on a permanent failure.
table_wait_timeout_s = 30 * 60
table_poll_interval_s = 10

table_wait_deadline = time.monotonic() + table_wait_timeout_s
while True:
    try:
        spark.sql(f'CREATE TABLE IF NOT EXISTS weather_agg USING DELTA LOCATION "{GOLD_PATH + "weather_agg"}"')
        break
    except Exception as create_error:
        if time.monotonic() >= table_wait_deadline:
            raise TimeoutError(
                f"weather_agg could not be created within {table_wait_timeout_s} seconds"
            ) from create_error
        time.sleep(table_poll_interval_s)

In [0]:
%sql
-- Show the aggregated rows stored in the gold table
SELECT *
FROM weather_agg;

date,window,deviceid,avg_temperature,avg_humidity
2023-03-09,"List(2023-03-09T05:50:00.000+0000, 2023-03-09T05:55:00.000+0000)",temphum,26.655916594659335,69.84137210623767
2023-03-09,"List(2023-03-09T05:45:00.000+0000, 2023-03-09T05:50:00.000+0000)",temphum,24.276380985021543,67.0970103601317


In [0]:
# Expose the bronze Delta location as a streaming temp view so it can be queried from %sql cells
weather_bronze_stream = spark.readStream.format("delta").load(BRONZE_PATH + "weather_raw")
weather_bronze_stream.createOrReplaceTempView("weather_table")


In [0]:
%sql
-- Query the weather_table temp view, which is backed by a streaming read of the
-- bronze weather_raw Delta location.
SELECT * FROM weather_table;

date,deviceid,timestamp,temperature,humidity
2023-03-09,temphum,2023-03-09T05:47:11.000+0000,31.367398746293677,74.24725126120478
2023-03-09,temphum,2023-03-09T05:48:11.000+0000,21.048240893139265,60.40791703061616
2023-03-09,temphum,2023-03-09T05:49:11.000+0000,20.413503315631683,66.6358627885742
2023-03-09,temphum,2023-03-09T05:52:11.000+0000,26.23898347765408,74.54795335917834
2023-03-09,temphum,2023-03-09T05:53:11.000+0000,29.16460223639552,61.68478937998501
2023-03-09,temphum,2023-03-09T05:54:11.000+0000,26.334466472860264,66.78721489113478
2023-03-09,temphum,2023-03-09T05:55:11.000+0000,28.71931042475876,79.3273291216309
2023-03-09,temphum,2023-03-09T05:57:11.000+0000,21.36794620451833,62.17112092488283
2023-03-09,temphum,2023-03-09T05:50:11.000+0000,29.584431232723865,77.49574782307202
2023-03-09,temphum,2023-03-09T05:51:11.000+0000,21.635649431863904,65.63699786271529
