# DEMO: Using Reference Data in a Streaming Pipeline
---

Author: Max Fisher (mafis)

---
  
In this demo we will show how to incorporate static reference data to enrich incoming real-time data. For example, you may need to tag certain messages with a particular identifier, or the reference data may hold values you need to check to decide whether an incoming record is relevant.

Whatever your case may be, reference data can enrich a real-time pipeline, so let's demonstrate how to handle it with Structured Streaming.

Before starting, complete the following setup:

1. Provision an Azure Event Hubs namespace and event hub, then come back to this notebook.
2. Take a quick look at the code in the Python producer, `sender.py`, in the Demo folder.
3. Enter your Event Hubs credentials into `sender.py` and run it so that messages start flowing into the event hub.
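
The contents of `sender.py` are not reproduced in this notebook. To give a rough idea of what it produces, here is a minimal sketch that builds one JSON sale event in the same shape as the messages shown later. `build_message` and `CATALOG` are hypothetical names, the catalog entries are copied from the sample output in this notebook, the random choices are assumptions, and nothing is actually sent to Event Hubs.

```python
import json
import random
import time

# Hypothetical catalog: producttype -> (name, category, price), copied from the
# sample output in this notebook. This is NOT the real catalog in sender.py.
CATALOG = {
    1: ("Chef's Knife", "Kitchen", 15.99),
    2: ("Spatula", "Kitchen", 2.99),
    3: ("Wooden Spoon", "Kitchen", 1.99),
    9: ("Rice Cooker", "Kitchen", 29.99),
    11: ("Grill", "Outdoors", 159.99),
}


def build_message(rng: random.Random) -> str:
    """Build one JSON sale event shaped like the stream's messages (hypothetical helper)."""
    producttype = rng.choice(sorted(CATALOG))
    name, category, price = CATALOG[producttype]
    event = {
        "storeId": rng.randint(1000, 1010),  # store IDs in the sample fall in 1000-1010
        "timestamp": time.time(),            # epoch seconds as a float
        "producttype": producttype,
        "name": name,
        "category": category,
        "price": price,
        "quantity": rng.randint(1, 3),       # sample quantities range from 1 to 3
    }
    return json.dumps(event)


if __name__ == "__main__":
    rng = random.Random(42)
    for _ in range(3):
        # A real producer would send this string as the Event Hubs message body.
        print(build_message(rng))
```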

Next, let's look at the reference data itself, which should already be sitting in DBFS. If you have not imported `stores.csv` yet, drag and drop it from this Demo's Data folder into the Databricks Workspace, and make sure it is uploaded to the path used below (`/FileStore/tables/stores.csv`).

In [3]:
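# Read the static store reference data. header=True takes column names from the first row,
# and inferSchema=True lets Spark detect numeric columns such as storeId.
storesDF = spark.read.csv("/FileStore/tables/stores.csv", header=True, inferSchema=True)
storesDF.show()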
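
To make the two read options concrete, here is a plain-Python sketch of roughly what they do. The CSV text is an excerpt reconstructed from the joined output later in this notebook (columns `storeId`, `StoreLocation`, `StoreMangerId`); the real file may contain more rows. `infer` and `read_stores` are hypothetical helpers, and unlike Spark, which infers one type per column across all rows, this sketch infers a type per value.

```python
import csv
import io

# Excerpt of stores.csv reconstructed from the joined output in this notebook.
STORES_CSV = """storeId,StoreLocation,StoreMangerId
1001,10014,2
1002,10015,3
1004,10002,5
1006,10005,7
"""


def infer(value: str):
    """Rough stand-in for inferSchema: try int, then float, otherwise keep the string."""
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            pass
    return value


def read_stores(text: str) -> list[dict]:
    # header=True equivalent: the first line supplies the column names.
    reader = csv.DictReader(io.StringIO(text))
    return [{key: infer(val) for key, val in row.items()} for row in reader]


stores = read_stores(STORES_CSV)
print(stores[0])  # {'storeId': 1001, 'StoreLocation': 10014, 'StoreMangerId': 2}
```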

Now let's connect to the stream of data in Azure Event Hubs. The next two cells define the connection string and configuration dictionary, then create the streaming DataFrame with `readStream`.

In [5]:
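# The connection string to your Event Hubs namespace, with the event hub name appended as EntityPath
connectionString = "<connection-string>;EntityPath=<hub-name>"
# Event Hubs connection configuration
ehConf = {
  'eventhubs.connectionString' : connectionString
}
# Note: azure-event-hubs-spark connector versions 2.3.15 and later expect the connection string
# to be encrypted, for example:
# ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)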
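
An Event Hubs connection string is a list of `Key=Value` pairs separated by semicolons. A namespace-level string copied from the Azure portal typically has no `EntityPath`, which is why the cell above appends `;EntityPath=<hub-name>`. The sketch below, using hypothetical helpers `parse_connection_string` and `with_entity_path` and made-up namespace, key, and hub names, shows that structure. Because shared access keys are base64 and may end in `=`, each pair is split only on its first `=`.

```python
def parse_connection_string(conn: str) -> dict[str, str]:
    """Split 'Key=Value;Key=Value' pairs, splitting each pair on its first '=' only."""
    parts = {}
    for segment in conn.split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts


def with_entity_path(namespace_conn: str, hub_name: str) -> str:
    """Append EntityPath (the event hub name) unless the string already has one."""
    if "EntityPath" in parse_connection_string(namespace_conn):
        return namespace_conn
    return namespace_conn.rstrip(";") + ";EntityPath=" + hub_name


# Made-up values for illustration only.
example = ("Endpoint=sb://my-namespace.servicebus.windows.net/;"
           "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=")
conn = with_entity_path(example, "products-sold")
print(parse_connection_string(conn)["EntityPath"])  # products-sold
```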

In [6]:
productsSoldStream = spark \
  .readStream \
  .format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider") \
  .options(**ehConf) \
  .load()

In [7]:
display(productsSoldStream)

Now let's import the PySpark types and functions we need, such as `StructType`, `StructField`, and `from_json`.

In [9]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import from_json

The following cell casts the message `body`, which arrives as binary, to a string and keeps the `enqueuedTime` that Event Hubs records for each message, renamed to `ArrivalTime`.

In [11]:
productsRaw = productsSoldStream.select(productsSoldStream.body.cast('string'), productsSoldStream.enqueuedTime.alias('ArrivalTime'))
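
Per message, that `select` roughly amounts to decoding the body bytes as UTF-8 text and renaming the enqueued timestamp. Here is a plain-Python sketch of that per-row step; the raw event dictionary and `to_products_raw` are hypothetical, with the body copied from the first sample row below.

```python
from datetime import datetime, timezone

# Hypothetical raw event; the body mirrors the first sample message in this notebook.
raw_event = {
    "body": b'{"storeId": 1007, "timestamp": 1541695193.6862552, "producttype": 9, '
            b'"name": "Rice Cooker", "category": "Kitchen", "price": 29.99, "quantity": 1}',
    "enqueuedTime": datetime(2018, 11, 8, 16, 39, 42, 586000, tzinfo=timezone.utc),
}


def to_products_raw(event: dict) -> dict:
    """Decode the binary body as UTF-8 text and rename enqueuedTime to ArrivalTime."""
    return {"body": event["body"].decode("utf-8"), "ArrivalTime": event["enqueuedTime"]}


row = to_products_raw(raw_event)
print(type(row["body"]).__name__, row["ArrivalTime"].isoformat())
```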

Now let's take a look at the initial data.

In [13]:
display(productsRaw)

| body | ArrivalTime |
|---|---|
| {"storeId": 1007, "timestamp": 1541695193.6862552, "producttype": 9, "name": "Rice Cooker", "category": "Kitchen", "price": 29.99, "quantity": 1} | 2018-11-08T16:39:42.586+0000 |
| {"storeId": 1005, "timestamp": 1541695198.7693238, "producttype": 1, "name": "Chef's Knife", "category": "Kitchen", "price": 15.99, "quantity": 2} | 2018-11-08T16:39:47.656+0000 |
| {"storeId": 1001, "timestamp": 1541695506.1699357, "producttype": 3, "name": "Wooden Spoon", "category": "Kitchen", "price": 1.99, "quantity": 3} | 2018-11-08T16:44:55.493+0000 |
| {"storeId": 1007, "timestamp": 1541695511.715615, "producttype": 6, "name": "Saucepan", "category": "Kitchen", "price": 21.99, "quantity": 3} | 2018-11-08T16:45:00.643+0000 |
| {"storeId": 1001, "timestamp": 1541695516.8080485, "producttype": 1, "name": "Chef's Knife", "category": "Kitchen", "price": 15.99, "quantity": 3} | 2018-11-08T16:45:05.729+0000 |
| {"storeId": 1001, "timestamp": 1541695521.9041686, "producttype": 5, "name": "Roasting Pan", "category": "Kitchen", "price": 17.99, "quantity": 3} | 2018-11-08T16:45:10.816+0000 |
| {"storeId": 1008, "timestamp": 1541695526.9835958, "producttype": 2, "name": "Spatula", "category": "Kitchen", "price": 2.99, "quantity": 1} | 2018-11-08T16:45:15.885+0000 |
| {"storeId": 1002, "timestamp": 1541695532.0606434, "producttype": 11, "name": "Grill", "category": "Outdoors", "price": 159.99, "quantity": 1} | 2018-11-08T16:45:20.962+0000 |
| {"storeId": 1006, "timestamp": 1541695537.138222, "producttype": 9, "name": "Rice Cooker", "category": "Kitchen", "price": 29.99, "quantity": 1} | 2018-11-08T16:45:26.027+0000 |
| {"storeId": 1002, "timestamp": 1541695542.2193782, "producttype": 1, "name": "Chef's Knife", "category": "Kitchen", "price": 15.99, "quantity": 2} | 2018-11-08T16:45:31.150+0000 |


To use the reference data, we need to break that JSON structure apart. The first step is to define a schema for its fields. We then use the `from_json` function, imported earlier from `pyspark.sql.functions`, which parses each JSON string into a struct whose fields follow the schema. `ArrivalTime` is not part of the JSON, so we select it alongside the parsed struct to carry it through.

In [15]:
productsSchema = StructType([
  StructField("storeId", IntegerType(), True),
  StructField("timestamp", StringType(), True),
  StructField("producttype", IntegerType(), True),
  StructField("name", StringType(), True),
  StructField("category", StringType(), True),
  StructField("price", DoubleType(), True),
  StructField("quantity", IntegerType(), True)
])

productsJson = productsRaw.select(from_json(productsRaw.body, schema=productsSchema).alias("json"), productsRaw.ArrivalTime)
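
As a rough mental model of what `from_json` does with `productsSchema`, here is a plain-Python approximation: only fields named in the schema are kept, fields missing from the message become null (`None`), and values are coerced to the schema's types. `PRODUCTS_SCHEMA` and `parse_with_schema` are hypothetical stand-ins, not Spark APIs, and Spark's handling of malformed JSON is not modeled here.

```python
import json

# (field name, Python type) pairs standing in for productsSchema's StructFields.
PRODUCTS_SCHEMA = [
    ("storeId", int), ("timestamp", str), ("producttype", int),
    ("name", str), ("category", str), ("price", float), ("quantity", int),
]


def parse_with_schema(body: str, schema=PRODUCTS_SCHEMA) -> dict:
    """Approximate from_json: keep only schema fields; missing fields become None."""
    # parse_float=str keeps numbers such as 1541695193.6862552 as their original text,
    # so the string-typed timestamp field sees the literal digits.
    record = json.loads(body, parse_float=str)
    parsed = {}
    for name, typ in schema:
        value = record.get(name)
        parsed[name] = None if value is None else typ(value)
    return parsed


body = ('{"storeId": 1007, "timestamp": 1541695193.6862552, "producttype": 9, '
        '"name": "Rice Cooker", "category": "Kitchen", "price": 29.99, "quantity": 1}')
print(parse_with_schema(body))
print(parse_with_schema('{"storeId": 1001, "extra": true}')["name"])  # None
```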

Let's look at the stream now that the body has been parsed.

In [17]:
display(productsJson)

| json | ArrivalTime |
|---|---|
| List(1007, 1.5416951936862552E9, 9, Rice Cooker, Kitchen, 29.99, 1) | 2018-11-08T16:39:42.586+0000 |
| List(1005, 1.5416951987693238E9, 1, Chef's Knife, Kitchen, 15.99, 2) | 2018-11-08T16:39:47.656+0000 |
| List(1001, 1.5416955061699357E9, 3, Wooden Spoon, Kitchen, 1.99, 3) | 2018-11-08T16:44:55.493+0000 |
| List(1007, 1.541695511715615E9, 6, Saucepan, Kitchen, 21.99, 3) | 2018-11-08T16:45:00.643+0000 |
| List(1001, 1.5416955168080485E9, 1, Chef's Knife, Kitchen, 15.99, 3) | 2018-11-08T16:45:05.729+0000 |
| List(1001, 1.5416955219041686E9, 5, Roasting Pan, Kitchen, 17.99, 3) | 2018-11-08T16:45:10.816+0000 |
| List(1008, 1.5416955269835958E9, 2, Spatula, Kitchen, 2.99, 1) | 2018-11-08T16:45:15.885+0000 |
| List(1002, 1.5416955320606434E9, 11, Grill, Outdoors, 159.99, 1) | 2018-11-08T16:45:20.962+0000 |
| List(1006, 1.541695537138222E9, 9, Rice Cooker, Kitchen, 29.99, 1) | 2018-11-08T16:45:26.027+0000 |
| List(1002, 1.5416955422193782E9, 1, Chef's Knife, Kitchen, 15.99, 2) | 2018-11-08T16:45:31.150+0000 |


From this struct we now select the fields we want. Here we want all of them, so we use the star syntax `<field_name>.*`, which expands every field of a struct into its own column. The struct column is named `json`, so we write `json.*`.

In [19]:
products = productsJson.select("json.*", productsJson.ArrivalTime)

Now let's take a look.

In [21]:
display(products)

| storeId | timestamp | producttype | name | category | price | quantity | ArrivalTime |
|---|---|---|---|---|---|---|---|
| 1007 | 1541695193.6862552 | 9 | Rice Cooker | Kitchen | 29.99 | 1 | 2018-11-08T16:39:42.586+0000 |
| 1005 | 1541695198.7693238 | 1 | Chef's Knife | Kitchen | 15.99 | 2 | 2018-11-08T16:39:47.656+0000 |
| 1001 | 1541695506.1699357 | 3 | Wooden Spoon | Kitchen | 1.99 | 3 | 2018-11-08T16:44:55.493+0000 |
| 1007 | 1541695511.715615 | 6 | Saucepan | Kitchen | 21.99 | 3 | 2018-11-08T16:45:00.643+0000 |
| 1001 | 1541695516.8080485 | 1 | Chef's Knife | Kitchen | 15.99 | 3 | 2018-11-08T16:45:05.729+0000 |
| 1001 | 1541695521.9041686 | 5 | Roasting Pan | Kitchen | 17.99 | 3 | 2018-11-08T16:45:10.816+0000 |
| 1008 | 1541695526.9835958 | 2 | Spatula | Kitchen | 2.99 | 1 | 2018-11-08T16:45:15.885+0000 |
| 1002 | 1541695532.0606434 | 11 | Grill | Outdoors | 159.99 | 1 | 2018-11-08T16:45:20.962+0000 |
| 1006 | 1541695537.138222 | 9 | Rice Cooker | Kitchen | 29.99 | 1 | 2018-11-08T16:45:26.027+0000 |
| 1002 | 1541695542.2193782 | 1 | Chef's Knife | Kitchen | 15.99 | 2 | 2018-11-08T16:45:31.150+0000 |
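
The `select("json.*", productsJson.ArrivalTime)` step can be pictured per row as promoting the struct's fields to top-level columns and carrying the other columns along. `flatten` below is a hypothetical plain-Python illustration using one row from the output above. One difference to keep in mind: a Python dict overwrites a duplicated key, whereas Spark would keep two columns with the same name.

```python
def flatten(row: dict, struct_col: str = "json") -> dict:
    """Mimic select("json.*", ArrivalTime): promote struct fields, keep other columns."""
    flat = dict(row[struct_col])  # every field of the struct, in schema order
    for key, value in row.items():
        if key != struct_col:
            flat[key] = value  # carry ArrivalTime (and any other column) along
    return flat


row = {
    "json": {"storeId": 1005, "timestamp": "1541695198.7693238", "producttype": 1,
             "name": "Chef's Knife", "category": "Kitchen", "price": 15.99, "quantity": 2},
    "ArrivalTime": "2018-11-08T16:39:47.656+0000",
}
print(list(flatten(row)))
# ['storeId', 'timestamp', 'producttype', 'name', 'category', 'price', 'quantity', 'ArrivalTime']
```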


Now that each field is in its own column, we can join the stream with the static `storesDF` DataFrame. The syntax `streamingDf.join(staticDf, "storeId")` performs an inner join on the column the two DataFrames share, here `storeId`.

In [23]:
joinedDF = products.join(storesDF, "storeId")

Let's see if the join was successful.

In [25]:
display(joinedDF)

| storeId | timestamp | producttype | name | category | price | quantity | ArrivalTime | StoreLocation | StoreMangerId |
|---|---|---|---|---|---|---|---|---|---|
| 1002 | 1541695542.2193782 | 1 | Chef's Knife | Kitchen | 15.99 | 2 | 2018-11-08T16:45:31.150+0000 | 10015 | 3 |
| 1006 | 1541695537.138222 | 9 | Rice Cooker | Kitchen | 29.99 | 1 | 2018-11-08T16:45:26.027+0000 | 10005 | 7 |
| 1002 | 1541695547.36186 | 1 | Chef's Knife | Kitchen | 15.99 | 1 | 2018-11-08T16:45:36.264+0000 | 10015 | 3 |
| 1005 | 1541695552.438633 | 9 | Rice Cooker | Kitchen | 29.99 | 1 | 2018-11-08T16:45:41.355+0000 | 10004 | 6 |
| 1006 | 1541695557.5297103 | 7 | Blender | Kitchen | 25.99 | 3 | 2018-11-08T16:45:46.421+0000 | 10005 | 7 |
| 1009 | 1541695562.604473 | 4 | Knife Block | Kitchen | 10.99 | 2 | 2018-11-08T16:45:51.512+0000 | 10008 | 10 |
| 1004 | 1541695567.72709 | 10 | Cutting Board | Kitchen | 12.99 | 1 | 2018-11-08T16:45:56.626+0000 | 10002 | 5 |
| 1007 | 1541695572.8029454 | 8 | Toaster | Kitchen | 30.99 | 2 | 2018-11-08T16:46:01.700+0000 | 10006 | 8 |
| 1006 | 1541695577.8798635 | 2 | Spatula | Kitchen | 2.99 | 2 | 2018-11-08T16:46:06.799+0000 | 10005 | 7 |
| 1003 | 1541695582.9763372 | 4 | Knife Block | Kitchen | 10.99 | 1 | 2018-11-08T16:46:11.872+0000 | 10001 | 4 |


As you can see, the join was successful: each streamed transaction now carries the store's `StoreLocation` and `StoreMangerId` from the reference data. Enriching real-time data with datasets like `stores` lets you learn much more about each event as it streams into Azure.
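
Conceptually, a stream-static inner join looks up each incoming event in the static table and drops events whose key has no match. The plain-Python sketch below illustrates that semantics on one micro-batch; `STORES` holds values taken from the joined output above, while `enrich` and the unknown store `9999` are hypothetical. If you would rather keep unmatched events with null store columns, Structured Streaming also supports a left outer stream-static join with the stream on the left, for example `products.join(storesDF, "storeId", "left")`.

```python
# Static reference data keyed by storeId (values taken from the joined output above).
STORES = {
    1002: {"StoreLocation": 10015, "StoreMangerId": 3},
    1006: {"StoreLocation": 10005, "StoreMangerId": 7},
}


def enrich(batch: list[dict], stores: dict[int, dict]) -> list[dict]:
    """Inner-join one micro-batch of events to the static store table on storeId."""
    joined = []
    for event in batch:
        store = stores.get(event["storeId"])
        if store is None:
            continue  # inner join: events with no matching store are dropped
        # Column order mirrors join(..., "storeId"): key, stream columns, store columns.
        out = {"storeId": event["storeId"]}
        out.update({k: v for k, v in event.items() if k != "storeId"})
        out.update(store)
        joined.append(out)
    return joined


batch = [
    {"storeId": 1002, "name": "Chef's Knife", "price": 15.99, "quantity": 2},
    {"storeId": 9999, "name": "Spatula", "price": 2.99, "quantity": 1},  # hypothetical unknown store
]
print(enrich(batch, STORES))
```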

Now we can add a **totalRevenuePerTransaction** column by using the `withColumn()` function, which creates a new column on the streaming dataset holding each row's item price multiplied by the quantity sold.

In [28]:
joinedDFwithRev = joinedDF.withColumn("totalRevenuePerTransaction", joinedDF.price*joinedDF.quantity)

In [29]:
display(joinedDFwithRev)

| storeId | timestamp | producttype | name | category | price | quantity | ArrivalTime | StoreLocation | StoreMangerId | totalRevenuePerTransaction |
|---|---|---|---|---|---|---|---|---|---|---|
| 1004 | 1541695623.742716 | 6 | Saucepan | Kitchen | 21.99 | 2 | 2018-11-08T16:46:52.640+0000 | 10002 | 5 | 43.98 |
| 1004 | 1541695628.8179677 | 2 | Spatula | Kitchen | 2.99 | 2 | 2018-11-08T16:46:57.712+0000 | 10002 | 5 | 5.98 |
| 1010 | 1541695633.9085925 | 5 | Roasting Pan | Kitchen | 17.99 | 1 | 2018-11-08T16:47:02.828+0000 | 10009 | 11 | 17.99 |
| 1010 | 1541695639.006253 | 3 | Wooden Spoon | Kitchen | 1.99 | 1 | 2018-11-08T16:47:07.902+0000 | 10009 | 11 | 1.99 |
| 1005 | 1541695644.079338 | 8 | Toaster | Kitchen | 30.99 | 3 | 2018-11-08T16:47:12.985+0000 | 10004 | 6 | 92.97 |
| 1007 | 1541695649.147064 | 8 | Toaster | Kitchen | 30.99 | 2 | 2018-11-08T16:47:18.043+0000 | 10006 | 8 | 61.98 |
| 1007 | 1541695654.2300937 | 4 | Knife Block | Kitchen | 10.99 | 2 | 2018-11-08T16:47:23.173+0000 | 10006 | 8 | 21.98 |
| 1001 | 1541695659.3771 | 4 | Knife Block | Kitchen | 10.99 | 3 | 2018-11-08T16:47:28.278+0000 | 10014 | 2 | 32.97 |
| 1004 | 1541695664.4725137 | 5 | Roasting Pan | Kitchen | 17.99 | 2 | 2018-11-08T16:47:33.378+0000 | 10002 | 5 | 35.98 |
| 1000 | 1541695669.5534933 | 7 | Blender | Kitchen | 25.99 | 1 | 2018-11-08T16:47:38.451+0000 | 10013 | 1 | 25.99 |
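
Per row, `withColumn("totalRevenuePerTransaction", price * quantity)` just adds one derived field. The plain-Python sketch below (with a hypothetical `with_revenue` helper and a sale taken from the output above) mirrors that and also shows a design consideration: `price` is a double, so products can carry binary floating-point noise. If exact cents matter, a decimal type avoids this; in PySpark that could be a cast such as `col("price").cast("decimal(10,2)")` before multiplying, shown here with Python's `Decimal` as an analogue.

```python
import math
from decimal import Decimal


def with_revenue(row: dict) -> dict:
    """Return a copy of the row with totalRevenuePerTransaction = price * quantity."""
    return {**row, "totalRevenuePerTransaction": row["price"] * row["quantity"]}


sale = {"storeId": 1005, "name": "Toaster", "price": 30.99, "quantity": 3}
revenue = with_revenue(sale)["totalRevenuePerTransaction"]
print(revenue)  # a double, so it may not be exactly 92.97

# Decimal arithmetic keeps exact cents.
exact = Decimal("30.99") * 3
print(exact)  # 92.97
```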
