# Sample Notebook: IoT Ingestion to Delta
This Databricks sample notebook shows how to ingest data directly from an IoT Hub (Event Hub-compatible endpoint) and land it in an ADLS Gen2-backed Delta table.

This notebook was adapted from the fantastic blog post and accompanying notebook linked below:
- https://brentonblogs.com/structured-streaming-in-azure-synapse/
- https://github.com/BrentonAD/blog-synapse-streaming/blob/main/code/StreamIngestion.ipynb

Successful execution of this notebook requires a registered secret scope containing the following named secrets:
- `storageaccount`: Name of the Azure storage account (Blob or ADLS Gen2) where the Delta table data will be stored
- `storagekey`: Storage key for that storage account. Note: you may instead use a SAS token here and configure your connection to storage accordingly.
- `eventhub`: Name of the Event Hub endpoint from which events will be read (for IoT Hub deployments, this can be retrieved from the IoT Hub resource)
- `iothubconnstr`: Connection string for your IoT Hub or Event Hub resource. For IoT Hub, this should be the connection string for the Event Hub-compatible endpoint.
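
Before running the cells below, it can help to confirm that all four secrets are readable. The following is a minimal sketch, not part of the original notebook: `find_missing_secrets` and `REQUIRED_SECRETS` are hypothetical names, and the getter is passed in as a callable so the check can be exercised outside Databricks with an in-memory stand-in.

```python
from typing import Callable, Iterable, List

# Secret names listed above (assumed to all live in one secret scope)
REQUIRED_SECRETS = ("storageaccount", "storagekey", "eventhub", "iothubconnstr")


def find_missing_secrets(
    get_secret: Callable[[str, str], str],
    scope: str,
    names: Iterable[str] = REQUIRED_SECRETS,
) -> List[str]:
    """Return the names of secrets that cannot be read or are empty."""
    missing = []
    for name in names:
        try:
            value = get_secret(scope, name)
        except Exception:
            # The getter is assumed to raise when a secret does not exist
            missing.append(name)
            continue
        if not value:
            missing.append(name)
    return missing


# Stand-in secret store for illustration only (placeholder values)
_demo_store = {
    ("demo-scope", "storageaccount"): "mystorageaccount",
    ("demo-scope", "storagekey"): "placeholder-key",
    ("demo-scope", "eventhub"): "my-event-hub",
}


def _demo_getter(scope: str, key: str) -> str:
    return _demo_store[(scope, key)]


demo_missing = find_missing_secrets(_demo_getter, "demo-scope")
```

In a Databricks notebook the same function could presumably be called as `find_missing_secrets(dbutils.secrets.get, secret_scope)`, failing early if the returned list is not empty.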


In [None]:
secret_scope = '<YOUR-SECRET-SCOPE-NAME>'

In [None]:
# Get names of storage account and event hub resources
storage_account = dbutils.secrets.get(secret_scope, 'storageaccount')
event_hub = dbutils.secrets.get(secret_scope, 'eventhub')

In [None]:
# Set up access to the storage account that holds the Delta table data
spark.conf.set(f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", dbutils.secrets.get(secret_scope, 'storagekey'))

# Set up storage locations for all data
ROOT_PATH = f"abfss://iot@{storage_account}.dfs.core.windows.net/"
BRONZE_PATH = ROOT_PATH + "raw/"
CHECKPOINT_PATH = ROOT_PATH + "checkpoints/"

# Retrieve event hub connection string and create configuration
connectionString = dbutils.secrets.get(secret_scope, 'iothubconnstr')

ehConf = {
  'eventhubs.connectionString': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
  'ehName': event_hub
}

# Enable Delta optimized writes and auto compaction to limit small files from streaming writes
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

# PySpark and utility imports
import os, json, requests
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
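
The `iothubconnstr` secret is expected to be an Event Hub-compatible connection string. As a minimal sketch, assuming the usual `Key=Value;Key=Value` layout of such strings, the helper below splits one into its parts so that the presence of an `EntityPath` segment can be checked before the stream starts. `parse_connection_string` and `has_entity_path` are hypothetical helpers, not part of the connector, and the sample value is a placeholder.

```python
from typing import Dict


def parse_connection_string(conn_str: str) -> Dict[str, str]:
    """Split a 'Key=Value;Key=Value' connection string into a dict."""
    parts: Dict[str, str] = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        # Split on the first '=' only: base64 keys may end with '=' padding
        key, sep, value = segment.partition("=")
        if not sep:
            # Do not echo the segment itself, it may contain a secret
            raise ValueError("Malformed connection string segment (missing '=')")
        parts[key.strip()] = value.strip()
    return parts


def has_entity_path(conn_str: str) -> bool:
    """True if the connection string names a specific Event Hub entity."""
    return bool(parse_connection_string(conn_str).get("EntityPath"))


# Placeholder connection string for illustration only
sample = (
    "Endpoint=sb://example.servicebus.windows.net/;"
    "SharedAccessKeyName=iothubowner;"
    "SharedAccessKey=abc123==;"
    "EntityPath=my-event-hub"
)
sample_parts = parse_connection_string(sample)
```

Avoid printing the parsed dictionary in a real notebook, since it contains the shared access key.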

In [None]:
# Define schema for uploaded telemetry
schema = 'timestamp timestamp, sensor_00 float,sensor_01 float,sensor_02 float,sensor_03 float,sensor_04 float,sensor_05 float,sensor_06 float,sensor_07 float,sensor_08 float,sensor_09 float,sensor_10 float,sensor_11 float,sensor_12 float,sensor_13 float,sensor_14 float,sensor_15 float,sensor_16 float,sensor_17 float,sensor_18 float,sensor_19 float,sensor_20 float,sensor_21 float,sensor_22 float,sensor_23 float,sensor_24 float,sensor_25 float,sensor_26 float,sensor_27 float,sensor_28 float,sensor_29 float,sensor_30 float,sensor_31 float,sensor_32 float,sensor_33 float,sensor_34 float,sensor_35 float,sensor_36 float,sensor_37 float,sensor_38 float,sensor_39 float,sensor_40 float,sensor_41 float,sensor_42 float,sensor_43 float,sensor_44 float,sensor_45 float,sensor_46 float,sensor_47 float,sensor_48 float,sensor_49 float, machine_status string'

iot_stream = (
    # Read from IoT Hub directly
    spark.readStream.format("eventhubs")
    # Use the Event Hub-compatible connection string
    .options(**ehConf)
    # Load the data
    .load()
    # Extract the "body" payload from the messages
    .withColumn('reading', F.from_json(F.col('body').cast('string'), schema))
    # Create a "date" field for partitioning
    .select('reading.*', F.to_date('reading.timestamp').alias('date'))
)
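
The schema string in the cell above repeats the same pattern for 50 sensor columns. As a possible alternative, sketched here under the assumption that the sensors stay named `sensor_00` through `sensor_49`, the DDL string could be generated rather than typed out. `build_sensor_schema` is a hypothetical helper, not part of the original notebook.

```python
def build_sensor_schema(num_sensors: int = 50) -> str:
    """Build a DDL-style schema string: timestamp, N float sensors, status."""
    sensor_fields = [f"sensor_{i:02d} float" for i in range(num_sensors)]
    fields = ["timestamp timestamp", *sensor_fields, "machine_status string"]
    return ", ".join(fields)


generated_schema = build_sensor_schema()
```

The result differs from the handwritten string only in whitespace after the commas, so it should be usable in the same `F.from_json(..., schema)` call.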

In [None]:
# Read stream and write to Delta
water_pump_data_to_delta = (
  iot_stream                                                                    # Parsed telemetry stream defined above
    .select('*')                                                                # Keep all parsed fields
    .writeStream.format('delta')                                                # Write our stream to the Delta format
    .partitionBy('date')                                                        # Partition our data by date for performance
    .option("checkpointLocation", CHECKPOINT_PATH + "mfg001")                   # Checkpoint so we can restart streams gracefully
    .start(BRONZE_PATH + "mfg001")                                              # Stream the data into an ADLS path
)

In [None]:
# Sleep 60s to give the stream time to start writing data - implemented for batch jobs
import time
time.sleep(60)

In [None]:
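# Create the Delta table if it does not exist.
# Retry until the stream has written its first files to the location.
import time
while True:
  try:
    spark.sql(f'CREATE TABLE IF NOT EXISTS mfg001 USING DELTA LOCATION "{BRONZE_PATH + "mfg001"}"')
    break
  except Exception:
    # Pause briefly instead of retrying in a tight loop
    time.sleep(5)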
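
The cell above retries indefinitely, which can hang a scheduled job if the table location never becomes valid. One possible alternative is a bounded retry, sketched below. The `retry` helper, its parameter names, and its default values are assumptions for illustration, not part of the original notebook.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def retry(
    action: Callable[[], T],
    max_attempts: int = 10,
    delay_seconds: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `action` until it succeeds or `max_attempts` is reached."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            return action()
        except Exception as error:
            last_error = error
            # Wait before the next attempt, but not after the final one
            if attempt < max_attempts:
                sleep(delay_seconds)
    raise RuntimeError(f"Gave up after {max_attempts} attempts") from last_error
```

In this notebook the table creation could then presumably be wrapped as `retry(lambda: spark.sql(...))`, so that a persistent failure surfaces as an error instead of looping forever.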

In [None]:
%sql
SELECT COUNT(*) FROM mfg001

In [None]:
%sql
SELECT * FROM mfg001 ORDER BY timestamp DESC LIMIT 10

In [None]:
# Stop streaming job and resume on next notebook execution
water_pump_data_to_delta.stop()