# Streaming Demo

Before running this notebook, ensure you have:
- Installed the following Maven library on your cluster: `com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.6`
- Created an [Azure Key Vault-backed secret scope](https://docs.microsoft.com/en-us/azure/databricks/security/secrets/secret-scopes#--create-an-azure-key-vault-backed-secret-scope)

Below, you will find details on reading data from an Event Hub stream and producing an exploratory dashboard, all within Databricks.

1. Configure Event Hub and add notebook parameters
2. Read stream from Event Hub & write to a temporary directory in Delta format
3. Read stream from temporary directory and apply JSON schema
4. Query data and add cell output to dashboard

### Configure Event Hub and add notebook parameters

For help setting up an Azure Key Vault-backed secret scope, [reference the documentation located here](https://docs.microsoft.com/en-us/azure/databricks/security/secrets/secret-scopes#--create-an-azure-key-vault-backed-secret-scope).

In the next two cells, we pull the Event Hub connection string from Azure Key Vault, create an Event Hub configuration object and prepare a temporary directory for writing the Event Hub stream in Delta format.

Databricks integrates with Azure Key Vault through secret scopes, so connection strings never have to appear in plain text in the notebook.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime as dt
import json

# Retrieve the Event Hub connection string from Azure Key Vault: dbutils.secrets.get(<SecretScopeName>, <SecretName>)
event_hub_connection_string = dbutils.secrets.get("key-vaults-secrets", "ioteventhubreader")
# Connector versions 2.3.15 and later expect the connection string to be encrypted with EventHubsUtils.encrypt
event_hub_connection_string = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(event_hub_connection_string)

# Use a dedicated consumer group so this notebook does not interfere with the
# Stream Analytics job, which reads from the '$default' consumer group
consumer_group = "databricks"

ehConf = {
  'eventhubs.connectionString' : event_hub_connection_string,
  'eventhubs.consumerGroup' : consumer_group
}

# Temp directory (a DBFS path) for storing the streaming data from the hub. Spark creates it if it does not exist.
# Note: Spark and dbutils.fs take DBFS paths; the /dbfs/... prefix is only for local file APIs.
tempDir = "dbfs:/tmp/streamingTemp"

# Clean up the tempDir, including the stream checkpoint, from any previous runs
dbutils.fs.rm(tempDir, True)

In [0]:
# Create dropdown widgets used to filter the queries below by city and sensor
dbutils.widgets.dropdown("City", "Pittsburgh", ["Pittsburgh", "Cleveland", "Rochester", "Detroit"])
dbutils.widgets.dropdown("Sensor", "temperature", ["temperature", "pressure", "humidity"])

city = dbutils.widgets.get("City")
sensor = dbutils.widgets.get("Sensor")

### Read stream from Event Hub and write to temporary directory in Delta format

First, we read the stream from Event Hub using the configuration we created in the previous cells.
Then, we write the stream directly to a temporary directory in Delta format.

We do this because an Event Hub stream is read through a cursor (offsets tracked per partition for a consumer group), and the connector does not handle multiple queries reading the same stream through one consumer group. Landing the data once in Delta format lets several queries read it independently.
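To illustrate why an intermediate table helps, here is a plain-Python analogy, not Spark or Event Hubs code: records land once in an append-only log, and each reader keeps its own offset, so one reader's progress never moves another's. The `AppendOnlyLog` and `Reader` classes are hypothetical, and the in-memory list only stands in for the Delta table.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class AppendOnlyLog:
    """Stand-in for the Delta table: records are only ever appended."""
    records: List[str] = field(default_factory=list)

    def append(self, record: str) -> None:
        self.records.append(record)


@dataclass
class Reader:
    """Each downstream query tracks its own position in the log."""
    log: AppendOnlyLog
    offset: int = 0

    def poll(self) -> List[str]:
        # Return everything appended since this reader's last poll,
        # then advance only this reader's offset
        batch = self.log.records[self.offset:]
        self.offset = len(self.log.records)
        return batch


log = AppendOnlyLog()
table_view = Reader(log)
chart_view = Reader(log)

log.append("event-1")
log.append("event-2")
print(table_view.poll())  # ['event-1', 'event-2']

log.append("event-3")
print(table_view.poll())  # ['event-3']
print(chart_view.poll())  # ['event-1', 'event-2', 'event-3']
```

Because the log is written once and never consumed destructively, adding another reader costs nothing for the existing ones.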

In [0]:
# Read Event Hub stream
streamingInputDF = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()


In [0]:
# Write the stream to tempDir in Delta format as soon as it arrives,
# so downstream queries can work with the data there
query = streamingInputDF.writeStream \
  .format("delta") \
  .queryName("streaming_query") \
  .option("checkpointLocation", tempDir + "/_checkpoint") \
  .outputMode("append") \
  .start(tempDir)

### Read stream from temporary directory (in Delta format) and apply JSON schema to extract data from payload

In the next few cells, we read data from the temporary directory in delta format, explore the content of the payload and then apply a JSON schema to the payload to extract data values.

In [0]:
# Delta supports multiple concurrent readers, so open a stream on the temp directory where we wrote the data in Delta format.
streamingData = spark.readStream \
    .format("delta") \
    .load(tempDir)

In [0]:
# Display the raw data. The Event Hubs body column is binary, so cast it to a string to read the JSON payload
display(streamingData.withColumn("body", streamingData["body"].cast("string")))
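The `body` column holds raw bytes. As a plain-Python illustration (the sample payload below is hypothetical, shaped like the schema used later in this notebook), decoding the bytes as UTF-8 yields the JSON text, whereas base64 is only one way a viewer might render binary data:

```python
import base64
import json

# Hypothetical message body as it might arrive from a device: raw UTF-8 bytes
body = b'{"city": "Pittsburgh", "room": "kitchen", "sensor": "temperature", "value": 21.5}'

# Rough equivalent of cast(body AS string): decode the bytes as UTF-8 text
json_text = body.decode("utf-8")
print(json_text)

# A binary column may be rendered as base64, but that is only a display
# encoding of the same bytes, not how the data is stored
rendered = base64.b64encode(body).decode("ascii")
assert base64.b64decode(rendered) == body

payload = json.loads(json_text)
print(payload["sensor"], payload["value"])  # temperature 21.5
```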

In [0]:
# Apply schema to JSON payload to extract values

# JSON schema
schema = StructType([
  StructField("city", StringType()),
  StructField("room", StringType()),
  StructField("sensor", StringType()), 
  StructField("value", DoubleType())
])

# Cast body to a string, apply the JSON schema, then select and filter columns
payloadDF = streamingData \
  .selectExpr("cast(body as string) AS jsonData", "enqueuedTime") \
  .select(from_json("jsonData", schema).alias("payload"), "enqueuedTime") \
  .select("payload.city", "payload.room", "payload.sensor", "payload.value", "enqueuedTime") \
  .where(col("enqueuedTime") > "2020-05-14")

display(payloadDF)
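For readers less familiar with `from_json`, the cell above can be approximated in plain Python: decode each body, parse it, keep only the schema's fields, and keep events enqueued after the cutoff. Fields missing from a payload become `None`, roughly mirroring the nulls Spark produces, and an unparseable payload yields a row of `None`s. This is an illustrative sketch over hypothetical in-memory events; it does not emulate Spark's type conversion or execution model.

```python
import json
from datetime import datetime
from typing import Dict, List

# Field names mirror the StructType schema in the cell above
SCHEMA_FIELDS = ("city", "room", "sensor", "value")
# Same cutoff date as the .where() filter
CUTOFF = datetime(2020, 5, 14)


def apply_schema(body: bytes) -> Dict[str, object]:
    """Decode and parse one message body, keeping only the schema's fields."""
    try:
        parsed = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        parsed = None
    if not isinstance(parsed, dict):
        # Unparseable payload: every extracted column ends up null
        return dict.fromkeys(SCHEMA_FIELDS)
    # Fields absent from the payload become None; types are not coerced
    return {name: parsed.get(name) for name in SCHEMA_FIELDS}


def payload_rows(events: List[Dict[str, object]]) -> List[Dict[str, object]]:
    """Apply the schema to each event and keep those enqueued after CUTOFF."""
    rows = []
    for event in events:
        if event["enqueuedTime"] > CUTOFF:
            rows.append({**apply_schema(event["body"]), "enqueuedTime": event["enqueuedTime"]})
    return rows


# Hypothetical events shaped like the body and enqueuedTime columns
events = [
    {"body": b'{"city": "Cleveland", "room": "lab", "sensor": "humidity", "value": 40.0}',
     "enqueuedTime": datetime(2020, 5, 15, 9, 30)},
    {"body": b'{"city": "Detroit", "sensor": "pressure"}',
     "enqueuedTime": datetime(2020, 5, 16)},
    {"body": b"not json", "enqueuedTime": datetime(2020, 5, 16)},
    {"body": b'{"city": "Rochester"}', "enqueuedTime": datetime(2020, 5, 1)},
]

for row in payload_rows(events):
    print(row["city"], row["room"], row["value"])
# Cleveland lab 40.0
# Detroit None None
# None None None
```

The Rochester event is dropped because it was enqueued before the cutoff.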

### Query data for exploratory analysis and creating charts for a dashboard

To add a cell's output to a dashboard, click the graph icon at the top right of the cell.

In [0]:
# Filter data by the selected city and sensor, then show room and value as a table
display(payloadDF.where((col("city") == city) & (col("sensor") == sensor)).select("room", "value"))
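The widget-driven filter is ordinary predicate logic. A plain-Python equivalent over hypothetical rows shaped like `payloadDF` (the sample values are made up, and `filter_rows` is an illustrative helper) shows the `city`/`sensor` match followed by the `room`/`value` projection:

```python
from typing import Dict, List, Tuple

# Hypothetical rows with the same columns as payloadDF
rows: List[Dict[str, object]] = [
    {"city": "Pittsburgh", "room": "kitchen", "sensor": "temperature", "value": 21.5},
    {"city": "Pittsburgh", "room": "garage", "sensor": "humidity", "value": 55.0},
    {"city": "Cleveland", "room": "office", "sensor": "temperature", "value": 19.0},
    {"city": "Pittsburgh", "room": "office", "sensor": "temperature", "value": 22.1},
]


def filter_rows(rows: List[Dict[str, object]], city: str, sensor: str) -> List[Tuple[object, object]]:
    """Keep rows for the selected city and sensor, then project room and value."""
    return [
        (row["room"], row["value"])
        for row in rows
        if row["city"] == city and row["sensor"] == sensor
    ]


# Values a user might pick in the City and Sensor dropdowns
print(filter_rows(rows, "Pittsburgh", "temperature"))
# [('kitchen', 21.5), ('office', 22.1)]
```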

In [0]:
# Run the same filter, but this time plot the result as a line graph
display(payloadDF.where((col("city") == city) & (col("sensor") == sensor)))

In [0]:
# Run the same filter one more time, but display the result as a bar chart
display(payloadDF.where((col("city") == city) & (col("sensor") == sensor)))

In [0]:
# Click on the "View:" and under "Dashboards:" select Streaming Dashboard

Don't forget to click "Stop Execution" in the notebook so that the streams are stopped. You can also stop them from a cell with `for q in spark.streams.active: q.stop()`.