### Consume data from Event Hubs

Set up the variables for this job

In [3]:
# Job configuration: the Event Hubs connection string plus the storage locations
# used by the cells below.
# Make sure to use the connection string of the event hub that was created
# (entity level), not the namespace-level one. Rather than pasting it here, it can
# be read from a secret scope:
#   dbutils.secrets.get(scope="your scope",key="your scope key to the eventhubs connection")
eh_connection = "put your event hubs connection here"

tableName = "raw.events"
lake = "/mnt/lake/raw/"
deltaDataPath = f"{lake}{tableName}"           # target path of the streaming write
checkpointPath = f"/checkpoint/{tableName}"    # checkpoint location of the streaming write

Clean up checkpoint and table data (to start with a clean environment)

In [5]:
# Recursively delete the streaming checkpoint and the table's data directory so the
# job starts from scratch. This is destructive: everything under both paths is removed.
# NOTE(review): only run this in a demo/dev environment — confirm before pointing
# `lake` / `checkpointPath` at shared storage.
dbutils.fs.rm(checkpointPath, recurse=True)
dbutils.fs.rm(deltaDataPath, recurse=True)

Create the Structured Streaming job

In [7]:
import json

# Position in the event hub from which the stream starts reading. It is passed to
# the connector as a JSON string.
# NOTE(review): "-1" presumably means "start of the stream" and "@latest" would mean
# "only new events" — confirm against the Event Hubs Spark connector docs.
startingEventPosition = {
  "offset": "-1",          # alternative: "@latest"
  "seqNo": -1,
  "enqueuedTime": None,
  "isInclusive": False,
}

# Event Hubs connector options: connection string, starting position and the cap
# on the number of events per trigger.
# NOTE(review): some connector versions expect the connection string to be
# encrypted before it is passed in — verify for the installed version.
ehConf = {
  "eventhubs.connectionString": eh_connection,
  "eventhubs.startingPosition": json.dumps(startingEventPosition),
  "maxEventsPerTrigger": 100000,
}

# Define the streaming source; nothing is read until a query is started on it.
streamingInputDF = spark.readStream.format("eventhubs").options(**ehConf).load()

Parse the input JSON data and save it into a Delta table

In [9]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
from pyspark.sql.functions import substring,col,from_json

# Schema used to parse the JSON event payload: (field name, Spark type) pairs,
# all declared nullable.
inputSchema = StructType([
  StructField(fieldName, fieldType, True)
  for fieldName, fieldType in [
    ("messageId", LongType()),
    ("deviceId", IntegerType()),
    ("temperature", IntegerType()),
    ("genTimestamp", StringType()),
  ]
])

In [10]:
# Cast the `body` column to a string, parse it as JSON with `inputSchema`, and
# flatten the resulting struct into top-level columns.
parsedEventsDF = (streamingInputDF
  .select(from_json(col("body").cast("string"), inputSchema).alias("value"))
  .select("value.*"))

# Partition column: the first 10 characters of genTimestamp.
# NOTE(review): assumes genTimestamp starts with a yyyy-MM-dd date — confirm with the producer.
datedEventsDF = parsedEventsDF.withColumn("genDate", substring("genTimestamp", 1, 10))

# Append the parsed events to the Delta location, partitioned by genDate.
# The trigger runs the query once; for a continuously running stream use
# .trigger(processingTime='30 seconds') instead.
query = (datedEventsDF.writeStream
  .format("delta")
  .outputMode("append")
  .partitionBy("genDate")
  .option("checkpointLocation", checkpointPath)
  .trigger(once=True)
  .start(deltaDataPath))

Wait until the stream has started writing data, then create a table definition that maps to the Delta location

In [12]:
import time

# The streaming query is started asynchronously, so under "Run All" this cell can run
# before any data has been written. Creating a Delta table over a location that has
# no Delta log yet fails, so wait (bounded) until the query has reported its first
# progress or has stopped.
firstBatchTimeoutSec = 300
waitDeadline = time.time() + firstBatchTimeoutSec
while query.isActive and query.lastProgress is None and time.time() < waitDeadline:
    time.sleep(1)

# Register (or re-register) the table in the metastore on top of the Delta files.
# The database name is the part of `tableName` before the dot.
databaseName = tableName.split(".")[0]
spark.sql(f"CREATE DATABASE IF NOT EXISTS {databaseName}")
spark.sql(f"DROP TABLE IF EXISTS {tableName}")
spark.sql(f"CREATE TABLE {tableName} USING DELTA LOCATION '{deltaDataPath}'")

Check that data is coming in

In [14]:
# Sanity check: number of ingested events per device. Uses spark.sql like the
# other cells instead of the bare `sql` helper.
eventsPerDeviceDF = spark.sql(f"select deviceId, count(*) from {tableName} group by deviceId")
display(eventsPerDeviceDF)

deviceId,count(1)
31.0,995
85.0,984
65.0,984
53.0,977
78.0,992
34.0,954
81.0,944
28.0,974
76.0,939
27.0,955


Check the folder for this table in ADLS Gen2

Then optimize to see compaction take place

In [16]:
# Optimize the table (compact small files), then clean up old snapshots.
retentionCheckKey = "spark.databricks.delta.retentionDurationCheck.enabled"

spark.sql(f"OPTIMIZE {tableName}")

# VACUUM with a retention below the default is rejected unless the safety check is
# turned off. Turn it off only for this statement and restore the previous value
# afterwards, so the rest of the session keeps the protection.
previousRetentionCheck = spark.conf.get(retentionCheckKey, "true")
spark.conf.set(retentionCheckKey, False)
try:
    # 0 HOURS is risky if streaming is running: files still needed by a concurrent
    # writer or reader can be deleted.
    spark.sql(f"VACUUM {tableName} RETAIN 0 HOURS")
finally:
    spark.conf.set(retentionCheckKey, previousRetentionCheck)

Check how the Delta table keeps a history of the changes made to it

In [18]:
# List the table's commit history (one row per version, as in the output below).
tableHistoryDF = spark.sql(f"DESCRIBE HISTORY {tableName}")
display(tableHistoryDF)

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
20,2020-05-26T21:46:00.000+0000,4627966444400556,nacoloss@microsoft.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 20e4bd40-5e68-4266-a9a1-563165fa83ce, epochId -> 19)",,List(4491530867301522),0526-164620-mace887,19.0,WriteSerializable,True,"Map(numFiles -> 1, numRemovedFiles -> 0, numOutputRows -> 300, numParts -> 1, numOutputBytes -> 4964, numAddedFiles -> 1)"
19,2020-05-26T21:45:32.000+0000,4627966444400556,nacoloss@microsoft.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 20e4bd40-5e68-4266-a9a1-563165fa83ce, epochId -> 18)",,List(4491530867301522),0526-164620-mace887,18.0,WriteSerializable,True,"Map(numFiles -> 1, numRemovedFiles -> 0, numOutputRows -> 300, numParts -> 1, numOutputBytes -> 4961, numAddedFiles -> 1)"
18,2020-05-26T21:45:29.000+0000,4627966444400556,nacoloss@microsoft.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [], batchId -> 0, auto -> false)",,List(4491530867301522),0526-164620-mace887,17.0,SnapshotIsolation,False,"Map(numFiles -> 1, numRemovedFiles -> 18, numRemovedBytes -> 83014, p25FileSize -> 49940, minFileSize -> 49940, numOutputRows -> 4880, numParts -> 1, numOutputBytes -> 49939, numAddedFiles -> 1, maxFileSize -> 49940, p75FileSize -> 49940, p50FileSize -> 49940, numAddedBytes -> 49940)"
17,2020-05-26T21:45:00.000+0000,4627966444400556,nacoloss@microsoft.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 20e4bd40-5e68-4266-a9a1-563165fa83ce, epochId -> 17)",,List(4491530867301522),0526-164620-mace887,16.0,WriteSerializable,True,"Map(numFiles -> 1, numRemovedFiles -> 0, numOutputRows -> 300, numParts -> 1, numOutputBytes -> 4949, numAddedFiles -> 1)"
16,2020-05-26T21:44:31.000+0000,4627966444400556,nacoloss@microsoft.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 20e4bd40-5e68-4266-a9a1-563165fa83ce, epochId -> 16)",,List(4491530867301522),0526-164620-mace887,15.0,WriteSerializable,True,"Map(numFiles -> 1, numRemovedFiles -> 0, numOutputRows -> 300, numParts -> 1, numOutputBytes -> 4969, numAddedFiles -> 1)"
15,2020-05-26T21:44:00.000+0000,4627966444400556,nacoloss@microsoft.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 20e4bd40-5e68-4266-a9a1-563165fa83ce, epochId -> 15)",,List(4491530867301522),0526-164620-mace887,14.0,WriteSerializable,True,"Map(numFiles -> 1, numRemovedFiles -> 0, numOutputRows -> 300, numParts -> 1, numOutputBytes -> 4961, numAddedFiles -> 1)"
14,2020-05-26T21:43:30.000+0000,4627966444400556,nacoloss@microsoft.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 20e4bd40-5e68-4266-a9a1-563165fa83ce, epochId -> 14)",,List(4491530867301522),0526-164620-mace887,13.0,WriteSerializable,True,"Map(numFiles -> 1, numRemovedFiles -> 0, numOutputRows -> 300, numParts -> 1, numOutputBytes -> 4968, numAddedFiles -> 1)"
13,2020-05-26T21:43:00.000+0000,4627966444400556,nacoloss@microsoft.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 20e4bd40-5e68-4266-a9a1-563165fa83ce, epochId -> 13)",,List(4491530867301522),0526-164620-mace887,12.0,WriteSerializable,True,"Map(numFiles -> 1, numRemovedFiles -> 0, numOutputRows -> 300, numParts -> 1, numOutputBytes -> 4963, numAddedFiles -> 1)"
12,2020-05-26T21:42:30.000+0000,4627966444400556,nacoloss@microsoft.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 20e4bd40-5e68-4266-a9a1-563165fa83ce, epochId -> 12)",,List(4491530867301522),0526-164620-mace887,11.0,WriteSerializable,True,"Map(numFiles -> 1, numRemovedFiles -> 0, numOutputRows -> 300, numParts -> 1, numOutputBytes -> 4969, numAddedFiles -> 1)"
11,2020-05-26T21:42:00.000+0000,4627966444400556,nacoloss@microsoft.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 20e4bd40-5e68-4266-a9a1-563165fa83ce, epochId -> 11)",,List(4491530867301522),0526-164620-mace887,10.0,WriteSerializable,True,"Map(numFiles -> 1, numRemovedFiles -> 0, numOutputRows -> 300, numParts -> 1, numOutputBytes -> 4946, numAddedFiles -> 1)"
