# End to End Industrial IoT (IIoT) on Azure Databricks 
## Part 2 - Machine Learning
This notebook demonstrates the following architecture for IIoT Ingest, Processing and Analytics on Azure. The following architecture is implemented for the demo. 
<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/end_to_end_architecture.png" width=800>

The notebook is broken into sections following these steps:
3. **Machine Learning** - train XGBoost regression models using distributed ML to predict power output and asset remaining life on historical sensor data
4. **Model Deployment** - deploy trained models for real-time serving in Azure ML services 
5. **Model Inference** - score real data instantly against hosted models via REST API

In [0]:
# AzureML Workspace info (name, region, resource group and subscription ID) for model deployment
dbutils.widgets.text("Subscription ID","<your Azure subscription ID>","Subscription ID")
dbutils.widgets.text("Resource Group","<your Azure resource group name>","Resource Group")
dbutils.widgets.text("Region","<your Azure region>","Region")
dbutils.widgets.text("Storage Account","<your ADLS Gen 2 account name>","Storage Account")

## Step 1 - Environment Setup

The pre-requisites are listed below:

### Azure Services Required
* ADLS Gen 2 Storage account with a container called `iot`
* (Optional) Azure Machine Learning Workspace called `iot`

### Azure Databricks Configuration Required
* 3-node (min) Databricks Cluster running **DBR 7.0ML+** and the following libraries:
 * **MLflow[AzureML]** - PyPI library `azureml-mlflow` (optional, if using AzureML)
* The following Secrets defined in scope `iot`
 * `adls_key` - Access Key to ADLS storage account **(Important - use the [Access Key](https://raw.githubusercontent.com/tomatoTomahto/azure_databricks_iot/master/bricks.com/blog/2020/03/27/data-exfiltration-protection-with-azure-databricks.html))**
* (Optional - if using AzureML) The following notebook widgets populated:
 * `Subscription ID` - subscription ID of your Azure ML Workspace
 * `Resource Group` - resource group name of your Azure ML Workspace
 * `Region` - Azure region of your Azure ML Workspace
 * `Storage Account` - Name of your storage account
* **Part 1 Notebook Run to generate and process the data** (this can be found [here](https://databricks.com/notebooks/iiot/iiot-end-to-end-part-1.html)). Ensure the following tables have been created:
 * **turbine_maintenance** - Maintenance dates for each Wind Turbine
 * **turbine_power** - Hourly power output for each Wind Turbine
 * **turbine_enriched** - Hourly turbine sensor readinigs (RPM, Angle) enriched with weather readings (temperature, wind speed/direction, humidity)
 * **gold_readings** - Combined view containing all 3 tables

In [0]:
# Setup access to storage account for temp data when pushing to Synapse
storage_account = dbutils.widgets.get("Storage Account")
spark.conf.set(f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", dbutils.secrets.get("iot","adls_key"))

# Setup storage locations for all data
ROOT_PATH = f"abfss://iot@{storage_account}.dfs.core.windows.net/"

# Pyspark and ML Imports
import os, json, requests
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
import numpy as np 
import pandas as pd
import xgboost as xgb
import mlflow.xgboost
import mlflow.azureml
# from azureml.core import Workspace
# from azureml.core.webservice import AciWebservice, Webservice
import random, string

# Random String generator for ML models served in AzureML
random_string = lambda length: ''.join(random.SystemRandom().choice(string.ascii_lowercase) for _ in range(length))

## Step 3 - Machine Learning
Now that our data is flowing reliably from our sensor devices into an enriched Delta table in Data Lake storage, we can start to build ML models to predict power output and remaining life of our assets using historical sensor, weather, power and maintenance data. 

We create two models ***for each Wind Turbine***:
1. Turbine Power Output - using current readings for turbine operating parameters (angle, RPM) and weather (temperature, humidity, etc.), predict the expected power output 6 hours from now
2. Turbine Remaining Life - predict the remaining life in days until the next maintenance event

<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/turbine_models.png" width=800>

We will use the XGBoost framework to train regression models. Due to the size of the data and number of Wind Turbines, we will use Spark UDFs to distribute training across all the nodes in our cluster.

### 3a. Feature Engineering
In order to predict power output 6 hours ahead, we need to first time-shift our data to create our label column. We can do this easily using Spark Window partitioning. 

In order to predict remaining life, we need to backtrace the remaining life from the maintenance events. We can do this easily using cross joins. The following diagram illustrates the ML Feature Engineering pipeline:

<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/ml_pipeline.png" width=800>

In [0]:
%sql
-- Calculate the age of each turbine and the remaining life in days
CREATE OR REPLACE VIEW iot.turbine_age AS
WITH reading_dates AS (SELECT distinct date, deviceid FROM iot.turbine_power),
  maintenance_dates AS (
    SELECT d.*, datediff(nm.date, d.date) as datediff_next, datediff(d.date, lm.date) as datediff_last 
    FROM reading_dates d LEFT JOIN iot.turbine_maintenance nm ON (d.deviceid=nm.deviceid AND d.date<=nm.date)
    LEFT JOIN iot.turbine_maintenance lm ON (d.deviceid=lm.deviceid AND d.date>=lm.date ))
SELECT date, deviceid, ifnull(min(datediff_last),0) AS age, ifnull(min(datediff_next),0) AS remaining_life
FROM maintenance_dates 
GROUP BY deviceid, date;

-- Calculate the power 6 hours ahead using Spark Windowing and build a feature_table to feed into our ML models
CREATE OR REPLACE VIEW iot.feature_table AS
SELECT r.*, age, remaining_life,
  LEAD(power, 72, power) OVER (PARTITION BY r.deviceid ORDER BY window) as power_6_hours_ahead
FROM iot.gold_readings r JOIN iot.turbine_age a ON (r.date=a.date AND r.deviceid=a.deviceid)
WHERE r.date < CURRENT_DATE();

In [0]:
%sql
SELECT window, power, power_6_hours_ahead FROM iot.feature_table WHERE deviceid='WindTurbine-1'

window,power,power_6_hours_ahead
2020-03-05T00:00:00.000+0000,166.00524927526,181.572853131252
2020-03-05T00:05:00.000+0000,147.34510413737854,248.94219397830972
2020-03-05T00:10:00.000+0000,178.39670420601195,145.2832512297918
2020-03-05T00:15:00.000+0000,165.64688907490947,193.88194602795616
2020-03-05T00:20:00.000+0000,168.44869592270456,119.94487188054808
2020-03-05T00:25:00.000+0000,171.5850307172491,182.8221866810804
2020-03-05T00:30:00.000+0000,173.8008020703204,219.3128048302275
2020-03-05T00:35:00.000+0000,177.5455988737502,224.87071589934723
2020-03-05T00:40:00.000+0000,197.677395332042,162.131372609168
2020-03-05T00:45:00.000+0000,210.6861246227178,168.7735359495599


In [0]:
%sql
SELECT date, avg(age) as age, avg(remaining_life) as life FROM iot.feature_table WHERE deviceid='WindTurbine-1' GROUP BY date ORDER BY date

date,age,life
2020-03-05,0.0,9.0
2020-03-06,0.0,8.0
2020-03-07,0.0,7.0
2020-03-08,0.0,6.0
2020-03-09,0.0,5.0
2020-03-10,0.0,4.0
2020-03-11,0.0,3.0
2020-03-12,0.0,2.0
2020-03-13,0.0,1.0
2020-03-14,0.0,0.0


### 3b. Distributed Model Training - Predict Power Output
[Pandas UDFs](https://docs.microsoft.com/en-us/azure/databricks/spark/latest/spark-sql/udf-python-pandas?toc=https%3A%2F%2Fdocs.microsoft.com%2Fen-us%2Fazure%2Fazure-databricks%2Ftoc.json&bc=https%3A%2F%2Fdocs.microsoft.com%2Fen-us%2Fazure%2Fbread%2Ftoc.json) allow us to vectorize Pandas code across multiple nodes in a cluster. Here we create a UDF to train an XGBoost Regressor model against all the historic data for a particular Wind Turbine. We use a Grouped Map UDF as we perform this model training on the Wind Turbine group level.

In [0]:
# Create a function to train a XGBoost Regressor on a turbine's data
def train_distributed_xgb(readings_pd, model_type, label_col, prediction_col):
  mlflow.xgboost.autolog()
  with mlflow.start_run():
    # Log the model type and device ID
    mlflow.log_param('deviceid', readings_pd['deviceid'][0])
    mlflow.log_param('model', model_type)

    # Train an XGBRegressor on the data for this Turbine
    alg = xgb.XGBRegressor() 
    train_dmatrix = xgb.DMatrix(data=readings_pd[feature_cols].astype('float'),label=readings_pd[label_col])
    params = {'learning_rate': 0.5, 'alpha':10, 'colsample_bytree': 0.5, 'max_depth': 5}
    model = xgb.train(params=params, dtrain=train_dmatrix, evals=[(train_dmatrix, 'train')])

    # Make predictions on the dataset and return the results
    readings_pd[prediction_col] = model.predict(train_dmatrix)
  return readings_pd

# Create a Spark Dataframe that contains the features and labels we need
non_feature_cols = ['date','window','deviceid','winddirection','remaining_life']
feature_cols = ['angle','rpm','temperature','humidity','windspeed','power','age']
label_col = 'power_6_hours_ahead'
prediction_col = label_col + '_predicted'

# Read in our feature table and select the columns of interest
feature_df = spark.table('iot.feature_table').selectExpr(non_feature_cols + feature_cols + [label_col] + [f'0 as {prediction_col}'])

# Register a Pandas UDF to distribute XGB model training using Spark
@pandas_udf(feature_df.schema, PandasUDFType.GROUPED_MAP)
def train_power_models(readings_pd):
  return train_distributed_xgb(readings_pd, 'power_prediction', label_col, prediction_col)

# Run the Pandas UDF against our feature dataset - this will train 1 model for each turbine
power_predictions = feature_df.groupBy('deviceid').apply(train_power_models)

# Save predictions to storage
power_predictions.write.format("delta").mode("overwrite").partitionBy("date").saveAsTable("iot.turbine_power_predictions")

In [0]:
%sql 
-- Plot actuals vs. predicted
SELECT date, deviceid, avg(power_6_hours_ahead) as actual, avg(power_6_hours_ahead_predicted) as predicted FROM iot.turbine_power_predictions GROUP BY date, deviceid

date,deviceid,actual,predicted
2019-08-12,WindTurbine-7,175.90816477257303,165.40277777777777
2019-11-16,WindTurbine-5,144.78358812705568,149.98263888888889
2019-10-07,WindTurbine-6,165.56640621995112,155.34027777777777
2020-03-26,WindTurbine-0,154.6328803723859,154.37847222222223
2019-11-15,WindTurbine-4,144.75647333463564,149.84027777777777
2020-02-20,WindTurbine-7,154.87931667174772,151.32638888888889
2019-09-16,WindTurbine-2,173.06422587348453,153.46180555555554
2020-07-06,WindTurbine-2,151.1895332307547,154.59722222222223
2020-08-07,WindTurbine-1,192.0223990647813,164.33333333333334
2020-05-06,WindTurbine-0,153.14875361488828,153.02777777777777


#### Automated Model Tracking in Databricks
As you train the models, notice how Databricks-managed MLflow automatically tracks each run in the "Runs" tab of the notebook. You can open each run and view the parameters, metrics, models and model artifacts that are captured by MLflow Autologging. For XGBoost Regression models, MLflow tracks: 
1. Any model parameters (alpha, colsample, learning rate, etc.) passed to the `params` variable
2. Metrics specified in `evals` (RMSE by default)
3. The trained XGBoost model file
4. Feature importances

<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/iiot_mlflow_tracking.gif" width=800>

### 3c. Distributed Model Training - Predict Remaining Life
Our second model predicts the remaining useful life of each Wind Turbine based on the current operating conditions. We have historical maintenance data that indicates when a replacement activity occured - this will be used to calculate the remaining life as our training label. 

Once again, we train an XGBoost model for each Wind Turbine to predict the remaining life given a set of operating parameters and weather conditions

In [0]:
# Create a Spark Dataframe that contains the features and labels we need
non_feature_cols = ['date','window','deviceid','winddirection','power_6_hours_ahead_predicted']
label_col = 'remaining_life'
prediction_col = label_col + '_predicted'

# Read in our feature table and select the columns of interest
feature_df = spark.table('iot.turbine_power_predictions').selectExpr(non_feature_cols + feature_cols + [label_col] + [f'0 as {prediction_col}'])

# Register a Pandas UDF to distribute XGB model training using Spark
@pandas_udf(feature_df.schema, PandasUDFType.GROUPED_MAP)
def train_life_models(readings_pd):
  return train_distributed_xgb(readings_pd, 'life_prediction', label_col, prediction_col)

# Run the Pandas UDF against our feature dataset - this will train 1 model per turbine and write the predictions to a table
life_predictions = (
  feature_df.groupBy('deviceid').apply(train_life_models)
    .write.format("delta").mode("overwrite")
    .partitionBy("date")
    .saveAsTable("iot.turbine_life_predictions")
)

In [0]:
%sql 
SELECT date, avg(remaining_life) as Actual_Life, avg(remaining_life_predicted) as Predicted_Life 
FROM iot.turbine_life_predictions 
WHERE deviceid='WindTurbine-1' 
GROUP BY date ORDER BY date

date,Actual_Life,Predicted_Life
2020-03-05,9.0,2.0902777777777777
2020-03-06,8.0,2.149305555555556
2020-03-07,7.0,2.2916666666666665
2020-03-08,6.0,2.201388888888889
2020-03-09,5.0,1.9479166666666667
2020-03-10,4.0,2.055555555555556
2020-03-11,3.0,2.180555555555556
2020-03-12,2.0,2.076388888888889
2020-03-13,1.0,2.076388888888889
2020-03-14,0.0,2.138888888888889


The models to predict remaining useful life have been trained and logged by MLflow. We can now move on to model deployment in AzureML.

## Step 4 - Model Deployment to MLflow
Now that our models have been trained, we can deploy them in an automated way directly to a model serving environment like Azure ML or MLflow. Below, we register the best performing models to the Databricks-native hosted MLflow model registry for deployment tracking and serving. Once registered, we can enable Serving to expose a REST API to the model. 

<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/mlflow_register_serve.gif" width=800>

In [0]:
turbine = "WindTurbine-1"
power_model = "power_prediction"
life_model = "life_prediction"

# Retrieve the remaining_life and power_output experiments on WindTurbine-1, and get the best performing model (min RMSE)
best_life_model = mlflow.search_runs(filter_string=f'params.deviceid="{turbine}" and params.model="{life_model}"')\
  .dropna().sort_values("metrics.train-rmse")['artifact_uri'].iloc[0] + '/model'
best_power_model = mlflow.search_runs(filter_string=f'params.deviceid="{turbine}" and params.model="{power_model}"')\
  .dropna().sort_values("metrics.train-rmse")['artifact_uri'].iloc[0] + '/model'

# Register our best performing models in the Databricks model registry
mlflow.register_model(best_life_model, life_model)
mlflow.register_model(best_power_model, power_model)

## Step 4 (Optional) - Model Deployment to AzureML
Now that our models have been trained, we can deploy them in an automated way directly to a model serving environment like Azure ML. Below, we connect to an AzureML workspace, build a container image for the model, and deploy that image to Azure Container Instances (ACI) to be hosted for REST API calls. 

**Note:** This step can take up to 10 minutes to run due to images being created and deplyed in Azure ML.

**Important:** This step requires authentication to Azure - open the link provided in the output of the cell in a new browser tab and use the code provided.

In [0]:
# AML Workspace Information - replace with your workspace info
aml_resource_group = dbutils.widgets.get("Resource Group")
aml_subscription_id = dbutils.widgets.get("Subscription ID")
aml_region = dbutils.widgets.get("Region")
aml_workspace_name = "iot"
turbine = "WindTurbine-1"
power_model = "power_prediction"
life_model = "life_prediction"

# Connect to a workspace (replace widgets with your own workspace info)
workspace = Workspace.create(name = aml_workspace_name,
                             subscription_id = aml_subscription_id,
                             resource_group = aml_resource_group,
                             location = aml_region,
                             exist_ok=True)

# Retrieve the remaining_life and power_output experiments on WindTurbine-1, and get the best performing model (min RMSE)
best_life_model = mlflow.search_runs(filter_string=f'params.deviceid="{turbine}" and params.model="{life_model}"')\
  .dropna().sort_values("metrics.train-rmse")['artifact_uri'].iloc[0] + '/model'
best_power_model = mlflow.search_runs(filter_string=f'params.deviceid="{turbine}" and params.model="{power_model}"')\
  .dropna().sort_values("metrics.train-rmse")['artifact_uri'].iloc[0] + '/model'

scoring_uris = {}
for model, path in [('life',best_life_model),('power',best_power_model)]:
  # Build images for each of our two models in Azure Container Instances
  print(f"-----Building image for {model} model-----")
  model_image, azure_model = mlflow.azureml.build_image(model_uri=path, 
                                                        workspace=workspace, 
                                                        model_name=model,
                                                        image_name=model,
                                                        description=f"XGBoost model to predict {model} of a turbine", 
                                                        synchronous=True)
  model_image.wait_for_creation(show_output=True)

  # Deploy web services to host each model as a REST API
  print(f"-----Deploying image for {model} model-----")
  dev_webservice_name = model + random_string(10)
  dev_webservice_deployment_config = AciWebservice.deploy_configuration()
  dev_webservice = Webservice.deploy_from_image(name=dev_webservice_name, image=model_image, deployment_config=dev_webservice_deployment_config, workspace=workspace)
  dev_webservice.wait_for_deployment()

  # Get the URI for sending REST requests to
  scoring_uris[model] = dev_webservice.scoring_uri

In [0]:
print(f"-----Model URIs for Scoring:-----")
print(f"Life Prediction URL: {scoring_uris['life']}")
print(f"Power Prediction URL: {scoring_uris['power']}")

You can view your model, it's deployments and URL endpoints by navigating to https://ml.azure.com/.

<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/iiot_azureml.gif" width=800>

## Step 5 - Model Inference: Real-time Scoring
We can now make HTTP REST calls from a web app, PowerBI, or directly from Databricks to the hosted model URI to score data directly

In [0]:
json.dumps([payload])

In [0]:
import os
import requests
import pandas as pd

payload = [{
  'angle':8,
  'rpm':6,
  'temperature':25,
  'humidity':50,
  'windspeed':5,
  'power':150,
  'age':10
}]

life_prediction_uri = "https://adb-5016390217096892.12.azuredatabricks.net/model/life_prediction/1/invocations" # Replace with your model URI from MLflow or AzureML
power_prediction_uri = "https://adb-5016390217096892.12.azuredatabricks.net/model/power_prediction/1/invocations" # Replace with your model URI from MLflow or AzureML
databricks_token = "????" # Replace with your Databricks Personal Access Token

def score_model(uri, payload):
  headers = {'Authorization': f'Bearer {databricks_token}'}
  data_json = payload
  response = requests.request(method='POST', headers=headers, url=uri, json=data_json)
  if response.status_code != 200:
    raise Exception(f'Request failed with status {response.status_code}, {response.text}')
  return response.json()

print(f'Predicted remaining life (in days) from model: {score_model(life_prediction_uri, payload)}')
print(f'Predicted power (in kwh) from model: {score_model(power_prediction_uri, payload)}')

### Step 6: Asset Optimization
We can now identify the optimal operating conditions for maximizing power output while also maximizing asset useful life. 

\\(Revenue = Price\displaystyle\sum_1^{365} Power_t\\)

\\(Cost = {365 \over Life_{rpm}} Price \displaystyle\sum_1^{24} Power_t \\)

Price\displaystyle\sum_{t=1}^{24})\\)

\\(Profit = Revenue - Cost\\)

\\(Power_t\\) and \\(Life\\) will be calculated by scoring many different RPM values in AzureML. The results can be visualized to identify the RPM that yields the highest profit.

In [0]:
# Construct a payload to send with the request
payload = [{
  'angle':8,
  'rpm':6,
  'temperature':25,
  'humidity':50,
  'windspeed':5,
  'power':150,
  'age':10
}]

# Iterate through 50 different RPM configurations and capture the predicted power and remaining life at each RPM
results = []
for rpm in range(1,15):
  payload[0]['rpm'] = rpm
  expected_power = score_model(power_prediction_uri, payload)[0]
  payload[0]['power'] = expected_power
  expected_life = -score_model(life_prediction_uri, payload)[0]
  results.append((rpm, expected_power, expected_life))
  
# Calculalte the Revenue, Cost and Profit generated for each RPM configuration
optimization_df = pd.DataFrame(results, columns=['RPM', 'Expected Power', 'Expected Life'])
optimization_df['Revenue'] = optimization_df['Expected Power'] * 24 * 365
optimization_df['Cost'] = optimization_df['Expected Power'] * 24 * 365 / optimization_df['Expected Life']
optimization_df['Profit'] = optimization_df['Revenue'] + optimization_df['Cost']

display(optimization_df)

RPM,Expected Power,Expected Life,Revenue,Cost,Profit
1,140.5946044921875,-15.44410514831543,1231608.7353515625,-79746.20241988578,1151862.5329316766
2,140.5946044921875,-15.44410514831543,1231608.7353515625,-79746.20241988578,1151862.5329316766
3,140.5946044921875,-15.44410514831543,1231608.7353515625,-79746.20241988578,1151862.5329316766
4,140.5946044921875,-15.44410514831543,1231608.7353515625,-79746.20241988578,1151862.5329316766
5,140.5946044921875,-22.364524841308597,1231608.7353515625,-55069.747472421535,1176538.987879141
6,143.43505859375,-22.364524841308597,1256491.11328125,-56182.32992638578,1200308.7833548642
7,145.69522094726562,-22.790855407714844,1276290.1354980469,-56000.097963239015,1220290.0375348078
8,147.95968627929688,-23.290658950805664,1296126.8518066406,-55650.07218320053,1240476.77962344
9,150.9168243408203,-23.776859283447266,1322031.381225586,-55601.598405637385,1266429.7828199486
10,150.9168243408203,-23.776859283447266,1322031.381225586,-55601.598405637385,1266429.7828199486


The optimal operating parameters for **WindTurbine-1** given the specified weather conditions is **11 RPM** for generating a maximum profit of **$1.4M**! Your results may vary due to the random nature of the sensor readings.

### Step 7: Data Serving and Visualization (not included in notebook)
Now that our models are created and the data is scored, we can use Azure Synapse with PowerBI to perform data warehousing and analyltic reporting to generate a report like the one below. 
<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/PBI_report.gif" width=800>