## Lab: Building Structured Streaming Pipelines
---
### Pre-Lab Activity
#### Refer to the Word Document "Pre-Lab or Pre-Demo Instructions for Module 4" to ensure everything is configured correctly
---

**What's Required for this Lab?**
- Azure Event Hubs Namespace
- Azure Event Hub
  - This refers to an event hub created within the namespace
- Connection String for the created Azure Event Hub
- Python installed in your environment

___

### Pre-Lab Steps   
If you have already completed the steps in the "Pre-Lab or Pre-Demo Instructions for Module 4" Word Document, you can skip these steps.
___

### Step 1: Install necessary libraries
----
Running `sender.py` on your local computer requires more than the standard Python library. The sender application uses `azure.servicebus`, so install the `azure-servicebus` package by running `pip install azure-servicebus==0.21.1`.
  
**NOTE**: Please use version 0.21.1 when installing azure-servicebus
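
To confirm that the pinned version is the one actually installed, a quick check along the following lines may help. This is a minimal sketch using only the standard library; the helper names `installed_version` and `matches_pin` are made up for this illustration and are not part of the lab files.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of `package`, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def matches_pin(package, pinned):
    """True only when `package` is installed at exactly the pinned version."""
    return installed_version(package) == pinned


if __name__ == "__main__":
    found = installed_version("azure-servicebus")
    if matches_pin("azure-servicebus", "0.21.1"):
        print("azure-servicebus 0.21.1 is installed")
    else:
        print(f"Expected azure-servicebus 0.21.1, found {found}")
```

If the check reports a different version, rerun the `pip install` command above with the pinned version.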
  
### Step 2: Provision Azure Event Hubs
---
1. Go to the Azure Portal
2. Click "Create a Resource"
3. Look for Azure Event Hubs
4. Create an Event Hubs namespace with the following specifications:
    - Name: {choose your own name}
    - Pricing Tier: Basic
    - Make this namespace zone redundant: Leave Disabled/Unchecked
    - Subscription: Use auto-populated subscription, unless directed to use another by instructor
    - Resource Group: {choose your own}
    - Location: {choose whichever is most appropriate}
    - Throughput Units: 1
    - Enable Auto-Inflate: Leave Disabled/Unchecked
5. Within the provisioned Azure Event Hubs Namespace, create an Event Hub
6. Get the connection string and the name of the Event Hub
### Step 3: Edit the `sender.py` File
---
Open `sender.py`, which can be found in the Lab Folder, in a code editor of your choice.  
1. On line 8, which reads `sbs = ServiceBusService(service_namespace='<hub_name>', shared_access_key_name='<key_name>', shared_access_key_value='<key_value>')`, replace the placeholders with the correct values: the Event Hubs namespace name for `service_namespace` (shown as `<hub_name>` in the file), the key name, and the key value. All of these can be found in the Azure Portal under the Event Hubs resource that you provisioned in Step 2.
2. In line 47, put your hub name in place of `<hub-name>`
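
The values needed for these two edits can also be read out of the connection string collected in Step 2. The sketch below assumes the usual `Endpoint=...;SharedAccessKeyName=...;SharedAccessKey=...;EntityPath=...` layout. The function names and the placeholder values are hypothetical, and `EntityPath` is only present when the string was copied from the hub rather than from the namespace.

```python
def parse_connection_string(connection_string):
    """Split an Event Hubs connection string into a dict of its parts."""
    parts = {}
    for segment in connection_string.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing semicolon
        # Split on the first "=" only: base64 keys often end in "=" padding.
        name, _, value = segment.partition("=")
        parts[name] = value
    return parts


def namespace_from_endpoint(endpoint):
    """Extract the namespace name from sb://<namespace>.servicebus.windows.net/."""
    host = endpoint.split("://", 1)[-1].strip("/")
    return host.split(".", 1)[0]


# Placeholder values only; substitute the string copied from the Azure Portal.
example = (
    "Endpoint=sb://my-namespace.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey=PLACEHOLDERKEY=;"
    "EntityPath=my-hub"
)
parts = parse_connection_string(example)
print("namespace:", namespace_from_endpoint(parts["Endpoint"]))
print("key name: ", parts["SharedAccessKeyName"])
print("hub name: ", parts.get("EntityPath"))
```

Splitting each segment on the first `=` only matters here because the key value itself may end in `=`.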
### Step 4: Run the `sender.py` program
---
1. Open Command Prompt, or some environment that allows you to run Python files
2. Navigate to the directory where `sender.py` resides
3. Run the following command: `python sender.py`
    - This will send messages to the Event Hub for a little more than 2 hours, giving you enough time to complete this lab

### Lab Activities  
Follow the setup guide "Pre-Lab or Pre-Demo Instructions for Module 4" before starting these activities.

---

### Step 1: Establish connection between Databricks `readStream` event and Azure Event Hub
---
In the following cell, create the configuration object and the `readStream` event that is necessary to create the streaming DataFrame for data coming into the Azure Event Hub.  
  
**Notes**:  
- `format` option should be `org.apache.spark.sql.eventhubs.EventHubsSourceProvider`
- Use the double asterisks to unpack the configuration object, as in `.options(**eventHubConfigurationObject)`, in the `readStream` event. Note that the method is `.options` (plural), not `.option`

In [3]:
dbutils.fs.mount(
  source = "wasbs://productswithreference@productstoragestreaming.blob.core.windows.net",
  mount_point = "/mnt/products",
  extra_configs = {"fs.azure.account.key.productstoragestreaming.blob.core.windows.net":dbutils.secrets.get(scope = "key-vault-secrets", key = "secretproduct")})


In [4]:
connectionString = "Endpoint=sb://demoeventhubtwitter.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=QVtgInlF0mNVrMXpQoddywlPVueBL6evPRmH1wRz+ZQ=;EntityPath=demotwittereh"

# Event Hubs Connection Configuration
ehConf = {
  'eventhubs.connectionString' : connectionString
}

productsSoldStream = spark \
  .readStream \
  .format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider") \
  .options(**ehConf) \
  .load()
display(productsSoldStream)
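
The double asterisks in `.options(**ehConf)` are ordinary Python dictionary unpacking. The sketch below shows the mechanism without Spark; `collect_options` is a hypothetical stand-in for any method that accepts keyword options, and the connection string is a placeholder.

```python
def collect_options(**options):
    """Hypothetical stand-in for a method that accepts keyword options."""
    return dict(options)


ehConf = {"eventhubs.connectionString": "<connection-string>"}

# Unpacking passes each dict entry as a keyword argument, even though
# "eventhubs.connectionString" could not be typed as a literal keyword
# because of the dot in its name.
received = collect_options(**ehConf)
print(received == ehConf)
```

This is why the configuration is built as a dictionary first: keys that contain dots cannot be written directly as keyword arguments.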

### Step 2: Import necessary libraries for using `StructType`, `StructField`, and the `from_json` function
---
The modules are `pyspark.sql.types` and `pyspark.sql.functions`.

In [6]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import from_json

### Step 3: Create the schema for the incoming data
---
Take a look at the data coming into the Azure Event Hub and create an appropriate schema. **Note**: The timestamp field must be declared as a string, not as a timestamp type.

In [8]:
productsSchema = StructType([
  StructField("room_id", IntegerType(), True),
  StructField("survey_id", IntegerType(), True),
  StructField("host_id", IntegerType(), True),
  StructField("room_type", StringType(), True),
  StructField("neighborhood", StringType(), True),
  StructField("reviews", IntegerType(), True),
  StructField("overall_satisfaction", DoubleType(), True),
  StructField("accommodates", IntegerType(), True),
  StructField("bedrooms", IntegerType(), True),
  StructField("price", DoubleType(), True),
  StructField("name", StringType(), True),
  StructField("last_modified", StringType(), True),
  StructField("latitude", DoubleType(), True),
  StructField("longitude", DoubleType(), True)
])
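
Before relying on the schema, it can help to compare it against one message by hand. The following is a plain-Python sketch, not Spark code: the `EXPECTED_TYPES` table, the `mismatched_fields` helper, and the sample values are all made up for illustration. It also treats a missing or null field as a problem, which is stricter than the nullable schema above.

```python
import json

# Field name -> acceptable Python types after json.loads.
# Integer JSON numbers are accepted for the double fields as well.
EXPECTED_TYPES = {
    "room_id": (int,),
    "survey_id": (int,),
    "host_id": (int,),
    "room_type": (str,),
    "neighborhood": (str,),
    "reviews": (int,),
    "overall_satisfaction": (int, float),
    "accommodates": (int,),
    "bedrooms": (int,),
    "price": (int, float),
    "name": (str,),
    "last_modified": (str,),
    "latitude": (int, float),
    "longitude": (int, float),
}


def mismatched_fields(message):
    """Return the names of fields that are missing or hold an unexpected type."""
    record = json.loads(message)
    problems = []
    for field, types in EXPECTED_TYPES.items():
        value = record.get(field)
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, types):
            problems.append(field)
    return problems


# Made-up sample values for illustration only.
sample = json.dumps({
    "room_id": 1, "survey_id": 2, "host_id": 3, "room_type": "Private room",
    "neighborhood": "Example", "reviews": 4, "overall_satisfaction": 4.5,
    "accommodates": 2, "bedrooms": 1, "price": 80.0, "name": "Sample listing",
    "last_modified": "2017-01-01 00:00:00", "latitude": 0.0, "longitude": 0.0,
})
print(mismatched_fields(sample))
```

An empty list means every field in the sample has a type the schema can accept. `last_modified` is checked as a string, in line with the note above.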

### Step 4: Grab the body and the enqueuedTime
---
Select the body of the message as well as the enqueuedTime of the message from the streaming DataFrame created by the `readStream` event.  
  
**Notes**  
- The body of an Event Hubs message arrives as binary data, so it must be `cast` to another data type (such as string) before it can be parsed
- Consider giving `enqueuedTime` an alias, such as `ArrivalTime`

In [10]:
productsRaw = productsSoldStream.select(
  productsSoldStream.body.cast('string'),
  productsSoldStream.enqueuedTime.alias('ArrivalTime')
)
display(productsRaw)

### Step 5: Use `from_json` to break down the JSON data
---
Remember to pass the schema to the `from_json` function, and remember to select the `enqueuedTime` (aliased as `ArrivalTime` in Step 4) along with the JSON data.

In [12]:
productsJson = productsRaw.select(
  from_json(productsRaw.body, schema=productsSchema).alias("json"),
  productsRaw.ArrivalTime
)
display(productsJson)

### Step 6: Break down the `struct` into individual columns
---
Use the `"<col_name>.*"` syntax to grab all the values in the `struct` object and put them into individual columns. Remember to also select the `enqueuedTime` (now named `ArrivalTime`).

In [14]:
products = productsJson.select("json.*", productsJson.ArrivalTime)
display(products)
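
Steps 4 through 6 can be followed on a single message in plain Python. This is only an analogue of what the Spark transformations do to each row, and the event values below are made up.

```python
import json

# One hypothetical event as the connector would deliver it: a binary body
# plus metadata such as the enqueued time. The values are made up.
event = {
    "body": b'{"room_id": 1, "neighborhood": "Example", "price": 80.0}',
    "enqueuedTime": "2017-01-01T00:00:00Z",
}

# Step 4: cast the binary body to a string and rename enqueuedTime.
raw = {"body": event["body"].decode("utf-8"), "ArrivalTime": event["enqueuedTime"]}

# Step 5: parse the JSON string into a nested structure under "json".
parsed = {"json": json.loads(raw["body"]), "ArrivalTime": raw["ArrivalTime"]}

# Step 6: promote every field of the struct to a top-level column ("json.*").
flat = {**parsed["json"], "ArrivalTime": parsed["ArrivalTime"]}

print(flat)
```

The final dictionary has one entry per schema field plus `ArrivalTime`, which mirrors the column layout of the `products` DataFrame.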

### Step 7: Create and Import `products` table
---
In the directory with `sender.py` and this notebook you will find a small CSV file named `products.csv`. Drag this into your Databricks workspace and create a table.  
  
Once the table has been imported, read it into this notebook.

In [16]:
productsInfo = spark.read.table("neigbourhoodinfo")

### Step 8: Join the Streaming DataFrame and the Static DataFrame
---
Using the streaming DataFrame created in **Step 6** and the static DataFrame created in **Step 7**, join them on the key column they share (`neighborhood` in the code below).

In [18]:
products = products.join(productsInfo, on="neighborhood")
display(products)
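
When no join type is given, `join` performs an inner join, so streaming rows without a matching key in the static table are dropped. The plain-Python sketch below illustrates that behaviour. The `inner_join` helper and the rows are made up, and placing `CostOfProduction` in the static table is an assumption based on the query in Step 10.

```python
def inner_join(stream_rows, static_rows, key):
    """Join each streaming row to the static rows that share its key value."""
    lookup = {}
    for row in static_rows:
        lookup.setdefault(row[key], []).append(row)
    joined = []
    for row in stream_rows:
        for match in lookup.get(row[key], []):
            # Later keys win on name clashes; the key itself is identical.
            joined.append({**row, **match})
    return joined


# Made-up rows; "CostOfProduction" mirrors the column used later in the lab.
stream_rows = [
    {"neighborhood": "North", "price": 80.0},
    {"neighborhood": "Unknown", "price": 50.0},
]
static_rows = [{"neighborhood": "North", "CostOfProduction": 30.0}]

print(inner_join(stream_rows, static_rows, "neighborhood"))
```

Only the `North` row survives the join. If rows seem to go missing after this step, check that the key values in the stream match those in the static table exactly.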

### Step 9: Create the `writeStream` event
---
Create a `writeStream` event that uses Databricks Delta. Add a trigger so that the engine writes a micro-batch every two seconds of processing time. Remember to specify a location to store checkpoints. Have the event write directly to a table named `storeSalesDelta`.  
  
**Notes**  
- The `format` option should be `delta`
- The trigger is set using the `processingTime` value

In [20]:
products.writeStream \
  .format("delta") \
  .trigger(processingTime = "2 seconds") \
  .option("checkpointLocation", "/mod4/lab1/sink/checkpoints/") \
  .table("storeSalesDelta")


In [21]:
checkpoint_path_silver = '/mnt/products/checkpoints/silver/'
silver_path = '/mnt/products/silver/'
products.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", checkpoint_path_silver) \
  .start(silver_path)

In [22]:
checkpoint_path_gold = '/mnt/products/checkpoints/gold/'
gold_path = '/mnt/products/gold/'
products.writeStream \
  .format("csv") \
  .outputMode("append") \
  .option("checkpointLocation", checkpoint_path_gold) \
  .start(gold_path)

### Step 10: Find the Average Profit for Every 5 Minute Interval
---
Using SQL and the expression `AVG((price-CostOfProduction)) as AverageProfit`, find the average profit grouped by a 5-minute window based on `enqueuedTime` (stored in the table as `ArrivalTime`).

In [24]:
%sql
SELECT AVG((price-CostOfProduction)) as AverageProfit
FROM storeSalesDelta
GROUP BY WINDOW(ArrivalTime, '5 minutes')
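
The grouping performed by `WINDOW(ArrivalTime, '5 minutes')` can be sketched in plain Python. Each timestamp is floored to the start of its 5-minute window, with the start inclusive and the end exclusive, and the profits are averaged per window. The helper names and the rows are made up for illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(timestamp):
    """Floor a timezone-aware timestamp to the start of its 5-minute window."""
    elapsed = timestamp - EPOCH
    return EPOCH + (elapsed // WINDOW) * WINDOW


def average_profit_by_window(rows):
    """Average (price - CostOfProduction) per tumbling window."""
    profits = defaultdict(list)
    for row in rows:
        profit = row["price"] - row["CostOfProduction"]
        profits[window_start(row["ArrivalTime"])].append(profit)
    return {start: sum(vals) / len(vals) for start, vals in sorted(profits.items())}


def at(minute, second=0):
    """Build a made-up arrival time on an arbitrary day."""
    return datetime(2017, 1, 1, 12, minute, second, tzinfo=timezone.utc)


rows = [
    {"ArrivalTime": at(1), "price": 80.0, "CostOfProduction": 30.0},
    {"ArrivalTime": at(4, 59), "price": 60.0, "CostOfProduction": 30.0},
    {"ArrivalTime": at(5), "price": 100.0, "CostOfProduction": 40.0},
]
result = average_profit_by_window(rows)
for start, average in result.items():
    print(start.isoformat(), average)
```

The first two rows fall into the window starting at 12:00 and the third into the window starting at 12:05. The SQL query above returns only the averages. Adding the window column to the `SELECT` list is one way to see which interval each average belongs to.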

### Step 11: Run an `OPTIMIZE` and `VACUUM` job on the `storeSalesDelta` table
---
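Using SQL, run the `OPTIMIZE` and `VACUUM` jobs on the table created in **Step 9**. `OPTIMIZE` compacts the table's small files into larger ones, and `VACUUM` removes data files that the table no longer references and that are older than the retention threshold.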

In [26]:
%sql
OPTIMIZE storeSalesDelta

In [27]:
%sql
VACUUM storeSalesDelta