# Step 4: Model operationalization & Deployment

In this notebook, we load the model built in the `Code/3_model_building.ipynb` Jupyter notebook and the labeled feature data set constructed in the `Code/2_feature_engineering.ipynb` notebook to build the model deployment artifacts.


The remainder of this notebook details the steps required to deploy and operationalize the model using the Azure Machine Learning service SDK.

In [2]:
## setup our environment by importing required libraries
import json
import os
import shutil
import time

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

# for creating pipelines and model
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer

# setup the pyspark environment
from pyspark.sql import SparkSession

# AML SDK libraries
from azureml.core import Workspace, Run
from azureml.core.model import Model
from azureml.core.image import ContainerImage
from azureml.core.conda_dependencies import CondaDependencies 
from azureml.core.webservice import AciWebservice,Webservice



We need to load the feature data set from DBFS storage to construct the operationalization schema.

In [4]:
features_file = 'featureengineering_files.parquet'
target_dir = "dbfs:/dataset/"

feat_data = spark.read.parquet(os.path.join(target_dir,features_file))
feat_data.limit(5).toPandas().head(5)

The artifacts that make up model operationalization are:

- The model you trained in notebook `3_model_building`
- A scoring script that shows how to use the model
- A Conda YAML file listing the packages that need to be installed
- A configuration definition object used to build the Azure Container Instances (ACI) deployment


For more details, go to https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-model-management-and-deployment

Here, we copy the model saved in notebook `3_model_building` to a local directory.

In [7]:
model_name = 'pdmrfull.model'
model_local = "file:" + os.getcwd() + "/" + model_name
model_dir = os.path.join("dbfs:/model/", model_name)
dbutils.fs.cp(model_dir, model_local, True)
display(dbutils.fs.ls(model_local))
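
The `file:` prefix in the copy target points `dbutils.fs.cp` at the driver's local filesystem rather than DBFS, and the registration step that follows refers to the model by its bare relative name, which resolves against the same working directory. A minimal sketch of that relationship using only the standard library; the helper name `local_copy_target` is hypothetical and not part of the notebook:

```python
import os

def local_copy_target(model_name, base_dir=None):
    """Build the 'file:' destination used for the local copy (hypothetical helper)."""
    base_dir = base_dir or os.getcwd()
    return "file:" + base_dir + "/" + model_name

model_name = "pdmrfull.model"
target = local_copy_target(model_name)

# Stripping the scheme leaves a plain path under the current working
# directory, which is where a relative reference to model_name also resolves.
local_path = target[len("file:"):]
print(target)
print(os.path.basename(local_path) == model_name)
```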

We register the model in the Azure Machine Learning service workspace.

In [9]:
ws = Workspace.from_config()
model_name = 'pdmrfull.model'
model = Model.register(model_path= model_name, model_name=model_name , workspace=ws)
print("Registered:", model.name)

Here we define the conda dependencies used by the scoring script.

In [11]:
conda_env = CondaDependencies.create(conda_packages=['pyspark'])
with open("conda_env.yml","w") as f:
    f.write(conda_env.serialize_to_string())

Here we define the scoring script that will be baked into the Docker image for prediction serving.

In [13]:
%%writefile score.py

from azureml.core.model import Model
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer
from pyspark.ml import PipelineModel
import pyspark
import json

def init():
    
    global pipeline,spark
        
    spark = pyspark.sql.SparkSession.builder.appName("Predictive maintenance service").getOrCreate()
    model_path = Model.get_model_path('pdmrfull.model')
    pipeline = PipelineModel.load(model_path)
    

def run(raw_data):
    
    try:
        sc = spark.sparkContext
        input_list = json.loads(raw_data)
        input_rdd = sc.parallelize(input_list)
        input_df = spark.read.json(input_rdd)
        
        key_cols =['label_e','machineID','dt_truncated', 'failure','model_encoded','model']
        input_features = input_df.columns
        
        # Drop key and label columns that the model did not see as features during training
        input_features = [x for x in input_features if x not in key_cols]
        
        
        va = VectorAssembler(inputCols=(input_features), outputCol='features')
        data = va.transform(input_df).select('machineID','features')
        score = pipeline.transform(data)
        predictions = score.collect()
        
        preds = [str(x['prediction']) for x in predictions]
        result = preds
    except Exception as e:
        result = str(e)
        
    return json.dumps({"result":result})
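
The `run` function expects `raw_data` to be a JSON-encoded list whose elements are themselves JSON strings, one per record, which is what `toJSON().collect()` followed by `json.dumps` produces on the client side. The sketch below mimics that round trip and the key-column filtering in plain Python, without Spark. The helper names, the record values, and the two feature column names are made up for illustration; the real feature names come from the feature data set.

```python
import json

# Columns that run() excludes before assembling the feature vector.
KEY_COLS = {"label_e", "machineID", "dt_truncated", "failure", "model_encoded", "model"}

def parse_payload(raw_data):
    """Decode the double-encoded payload into a list of record dicts."""
    return [json.loads(row) for row in json.loads(raw_data)]

def feature_columns(record):
    """Return the non-key column names, sorted for a deterministic result."""
    return sorted(col for col in record if col not in KEY_COLS)

# Hypothetical record standing in for one row of the feature data set.
rows = [json.dumps({"machineID": 27, "model": "model3",
                    "volt_rollingmean_12": 168.2,
                    "error1sum_rollingmean_24": 0.0})]
raw_data = json.dumps(rows)  # what the client passes to the service

records = parse_payload(raw_data)
print(feature_columns(records[0]))
```

The sorting here is only to keep the illustration deterministic; in the deployed service, the column order is whatever Spark infers when it reads the JSON records.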

### Provision ACI for image deployment

Note that the ACI deployment may take a couple of minutes to complete.

In [15]:
image_config = ContainerImage.image_configuration(runtime= "spark-py",
                                 execution_script="score.py",
                                 conda_file="conda_env.yml")

aci_config = AciWebservice.deploy_configuration(cpu_cores = 2, 
                                               memory_gb = 4, 
                                               tags = {'type': "predictive_maintenance"}, 
                                               description = "Predictive maintenance classifier")



aci_service_name = 'pred-maintenance-service'
print(aci_service_name)

aci_service = Webservice.deploy_from_model(workspace=ws, 
                                        name=aci_service_name,
                                        deployment_config = aci_config,
                                        models = [model],
                                        image_config = image_config
                                          )


aci_service.wait_for_deployment(True)
print(aci_service.state)
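
If the reported state is not `Healthy`, the container logs are usually the quickest way to find the cause. A minimal sketch, assuming the `state`, `scoring_uri`, and `get_logs()` members of the AML SDK `Webservice` class. The helper `report_deployment` and the stub class used to exercise it offline are hypothetical.

```python
def report_deployment(service):
    """Print the scoring URI on success, or the state and container logs otherwise."""
    if service.state == "Healthy":
        print("Scoring URI:", service.scoring_uri)
        return True
    print("Deployment state:", service.state)
    print(service.get_logs())
    return False


class _StubService:
    """Stand-in for a deployed web service so the helper can run offline."""

    def __init__(self, state):
        self.state = state
        self.scoring_uri = "http://example.invalid/score"

    def get_logs(self):
        return "(container logs would appear here)"


ok = report_deployment(_StubService("Healthy"))
failed = report_deployment(_StubService("Failed"))
```

In the notebook itself, the call would be `report_deployment(aci_service)` once `wait_for_deployment` has returned.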

### Invoke web service endpoint for prediction

First we get a sample observation that we can score. For this, we randomly select a single record from the feature data set.

In [17]:
test_sample = (feat_data.sample(False, .8).limit(1))
excluded_cols = {'label_e','machineID','dt_truncated','failure','model_encoded','model'}
input_features = set(test_sample.columns)- excluded_cols


raw_input = test_sample.toJSON().collect()
prediction = aci_service.run(json.dumps(raw_input))

print(prediction)
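
Because `run` in `score.py` returns the same `{"result": ...}` envelope for both successes and failures, the caller has to inspect the payload to tell them apart: a list holds predictions, while a string holds the exception message. A minimal sketch of such a check follows. The helper name `parse_prediction` is hypothetical, and it accepts either a JSON string or an already-decoded dict, on the assumption that the exact return type of `aci_service.run` may vary.

```python
import json

def parse_prediction(response):
    """Return predictions as floats, or raise if the service reported an error."""
    payload = json.loads(response) if isinstance(response, str) else response
    result = payload["result"]
    if isinstance(result, str):
        # score.py stores str(exception) here when scoring fails
        raise RuntimeError("Scoring failed: " + result)
    return [float(p) for p in result]

# Example envelope shaped like the one score.py builds on success.
sample_predictions = parse_prediction('{"result": ["0.0", "2.0"]}')
print(sample_predictions)
```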

Delete the ACI service to free up resources.

In [19]:
aci_service.delete()