Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

## Deploying a Real-Time Content Based Personalization Model

This notebook provides an example of how a business can use machine learning to automate content-based personalization for its customers by using a recommendation system. Azure Databricks is used to train a model that predicts the probability that a user will engage with an item. In turn, this estimate can be used to rank items based on the content that a user is most likely to consume.<br><br>
This notebook creates a scalable real-time scoring service for Spark-based models such as the Content Based Personalization model trained in the [MMLSpark-LightGBM-Criteo notebook](../02_model_content_based_filtering/mmlspark_lightgbm_criteo.ipynb).
<br><br>
### Architecture
<img src="https://recodatasets.z20.web.core.windows.net/images/lightgbm_criteo_arch.svg" alt="Architecture">

### Components
The following components are used in this architecture:<br>
- [Azure Blob Storage](https://azure.microsoft.com/en-us/services/storage/blobs/) is a storage service optimized for storing massive amounts of unstructured data. In this case, the input data is stored here.<br>
- [Azure Databricks](https://azure.microsoft.com/en-us/services/databricks/) is a managed Apache Spark cluster where model training and evaluation are performed.<br>
- [Azure Machine Learning service](https://azure.microsoft.com/en-us/services/machine-learning-service/) is used in this scenario to register the machine learning model. <br>
- [Azure Container Registry](https://azure.microsoft.com/en-us/services/container-registry/) is used to package the scoring script as a container image which is used to serve the model in production. <br>
- [Azure Kubernetes Service](https://azure.microsoft.com/en-us/services/kubernetes-service/) is used to deploy the trained models to web or app services. <br>

## Assumptions
In order to execute this notebook the following items are assumed:

1. A model has previously been trained as shown in the [mmlspark_lightgbm_criteo](../02_model_content_based_filtering/mmlspark_lightgbm_criteo.ipynb) notebook
2. This notebook is running in the same Azure Databricks workspace used to run the notebook in Assumption 1.
3. The Databricks cluster used has been prepared for operationalization (MML Spark and reco_utils are both installed)
  - See [Setup](../../SETUP.md) instructions for details
4. An Azure Machine Learning Service workspace has been set up in the same region as the Azure Databricks workspace used for model training
  - See [Create A Workspace](https://docs.microsoft.com/en-us/azure/machine-learning/service/setup-create-workspace) for more details
5. The Azure ML Workspace config.json has been uploaded to Databricks at `dbfs:/aml_config/config.json`
  - See [Configure Environment](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-configure-environment) and [Databricks CLI](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html#access-dbfs-with-the-databricks-cli)
6. An Azure Container Instance (ACI) has been registered for use in your Azure subscription
  - See [Supported Services](https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-manager-supported-services#portal) for more details
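
Assumption 5 can be checked before running the rest of the notebook. The following is a minimal sketch, assuming the config file is plain JSON containing the `subscription_id`, `resource_group`, and `workspace_name` keys; the helper name `missing_config_keys` is hypothetical and not part of the Azure ML SDK.

```python
import json

# Keys that an Azure ML workspace config.json is expected to contain
REQUIRED_KEYS = ("subscription_id", "resource_group", "workspace_name")


def missing_config_keys(path):
    """Return the required keys that are absent from the config file at `path`."""
    with open(path) as f:
        config = json.load(f)
    return [key for key in REQUIRED_KEYS if key not in config]
```

On Databricks, calling `missing_config_keys('/dbfs/aml_config/config.json')` should return an empty list if the file was uploaded correctly.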

## Score Service Steps
In this example, a "scoring service" is a function that is executed by a Docker container. It takes in a POST request with a JSON-formatted payload and produces a score based on a previously estimated model. In our case, we will use the model we estimated earlier that predicts the probability of a user-item interaction based on a set of numeric and categorical features. Because that model was trained using PySpark, we will create a Spark session on a single instance (within the Docker container) which will use [MML Spark Serving](https://github.com/Azure/mmlspark/blob/master/docs/mmlspark-serving.md) to execute the model on the received input data and return the probability of interaction. We will use Azure Machine Learning to create and run the Docker container.

In order to create a scoring service, we will do the following steps:

1. Set up and authorize the Azure Machine Learning Workspace
2. Serialize the previously trained model and add it to the Azure Model Registry
3. Define the 'scoring service' script to execute the model
4. Define all the pre-requisites that the script requires
5. Use the model, the driver script, and the pre-requisites to create an Azure Container Image
6. Deploy the container image on a scalable platform, Azure Kubernetes Service
7. Test the service
7. Test the service

### Setup libraries and variables

The next few cells initialize the environment and variables: we import relevant libraries and set variables.

In [9]:
import os
import json
import shutil
import requests  # used later in this notebook to call the deployed service

from reco_utils.dataset.criteo import get_spark_schema, load_spark_df
from reco_utils.common.k8s_utils import qps_to_replicas, replicas_to_qps, nodes_to_replicas

from azureml.core import Workspace
from azureml.core import VERSION as azureml_version

from azureml.core.model import Model
from azureml.core.conda_dependencies import CondaDependencies 
from azureml.core.webservice import Webservice, AksWebservice
from azureml.core.image import ContainerImage
from azureml.core.compute import AksCompute, ComputeTarget

from math import floor

# Check core SDK version number
print("Azure ML SDK version: {}".format(azureml_version))

Azure ML SDK version: 1.0.18


## Configure Scoring Service Variables

In [10]:
MODEL_NAME = 'lightgbm_criteo.mml'  # this name must exactly match the name used to save the pipeline model in the estimation notebook
MODEL_DESCRIPTION = 'LightGBM Criteo Model'

# Setup AzureML assets (names must be lowercase alphanumeric characters or dashes, without spaces, and between 3 and 32 characters)
# Azure ML Webservice
SERVICE_NAME = 'lightgbm-criteo'
# Azure ML Container Image
CONTAINER_NAME = SERVICE_NAME
CONTAINER_RUN_TIME = 'spark-PY'
# Azure Kubernetes Service (AKS)
AKS_NAME = 'predict-aks'

# Names of other files that are used below
CONDA_FILE = "deploy_conda.yaml"
DRIVER_FILE = "mmlspark_serving.py"
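
The naming constraint mentioned in the comment above can be checked up front, before any resources are created. This is a rough sketch under an assumed rule (3 to 32 characters; lowercase letters, digits, or dashes; starting with a letter and ending with a letter or digit). The function `is_valid_asset_name` is hypothetical, and individual Azure resource types may apply stricter limits.

```python
import re

# Assumed rule: 3-32 characters, lowercase letters, digits, or dashes,
# starting with a letter and ending with a letter or digit.
NAME_PATTERN = re.compile(r"[a-z][a-z0-9-]{1,30}[a-z0-9]")


def is_valid_asset_name(name):
    """Check a candidate service, image, or cluster name against the assumed rule."""
    return NAME_PATTERN.fullmatch(name) is not None


for candidate in ("lightgbm-criteo", "predict-aks", "Bad Name"):
    print(candidate, is_valid_asset_name(candidate))
```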

## Setup AzureML Workspace
Workspace configuration can be retrieved from the portal and uploaded to Databricks<br>
See [AzureML on Databricks](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-configure-environment#azure-databricks)

In [None]:
ws = Workspace.from_config('/dbfs/aml_config/config.json')

## Prepare the Serialized Model

In order to create the docker container, the first thing we will do is to prepare the model we estimated in a prior step so that the docker container we are creating will be able to access it. We do this by *registering* the model to the workspace (see the Azure ML [documentation](https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-model-management-and-deployment) for additional details).

The model has been stored as a directory on dbfs, and before we register it, we do a few additional steps to facilitate the process.

### Input Schema

Spark Serving requires the schema of the raw input data. Therefore, we get the schema and 
store it as an additional file in the model directory.


In [11]:
raw_schema = get_spark_schema()
with open(os.path.join('/dbfs', MODEL_NAME, 'schema.json'), 'w') as f:
  f.write(raw_schema.json())

### Copy the model from dbfs to local

While you can access files on DBFS with local file APIs, it is safer to explicitly copy saved models to and from dbfs, because the local file APIs can only access files smaller than 2 GB (see details [here](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html#access-dbfs-using-local-file-apis)).

In [13]:
model_local = os.path.join(os.getcwd(), MODEL_NAME)
dbutils.fs.cp('dbfs:/' + MODEL_NAME, 'file:' + model_local, recurse=True)

### Register the Model

Now we are ready to register the model in the Azure Machine Learning Workspace.

In [15]:
# First the model directory is compressed to minimize data transfer
zip_file = shutil.make_archive(base_name=MODEL_NAME, format='zip', root_dir=model_local)

# Register the model
model = Model.register(model_path=zip_file,  # this points to a local file
                       model_name=MODEL_NAME,  # this is the name the model is registered as
                       description=MODEL_DESCRIPTION,
                       workspace=ws)

print(model.name, model.description, model.version)
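
Because `root_dir` points at the model directory itself, the archive stores the model files at its top level, so extracting it into a folder later yields the model files directly inside that folder. The sketch below illustrates this with a small stand-in directory; the file names and the helper `archive_round_trip` are hypothetical.

```python
import os
import shutil
import tempfile
from zipfile import ZipFile


def archive_round_trip(model_dir, work_dir):
    """Zip the contents of `model_dir`, extract them into work_dir/model, and list the result."""
    os.makedirs(work_dir, exist_ok=True)
    base_name = os.path.join(work_dir, os.path.basename(model_dir))
    zip_file = shutil.make_archive(base_name=base_name, format="zip", root_dir=model_dir)
    target = os.path.join(work_dir, "model")
    with ZipFile(zip_file, "r") as f:
        f.extractall(target)
    return sorted(os.listdir(target))


# Hypothetical stand-in for a saved pipeline model directory
with tempfile.TemporaryDirectory() as tmp:
    model_dir = os.path.join(tmp, "example.mml")
    os.makedirs(os.path.join(model_dir, "stages"))
    with open(os.path.join(model_dir, "stages", "part-00000"), "w") as f:
        f.write("")
    with open(os.path.join(model_dir, "schema.json"), "w") as f:
        f.write("{}")
    print(archive_round_trip(model_dir, os.path.join(tmp, "work")))
```

The listing contains the model files themselves rather than a nested `example.mml` folder, which is why a loader can be pointed straight at the extraction folder.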

## Define the Scoring Script

Next, we need to create the driver script that will be executed when the service is called. The functions that need to be defined for scoring are `init()` and `run()`. The `init()` function is run when the service is created, and the `run()` function is run each time the service is called.

In our example, we use the `init()` function to load all the libraries, initialize the Spark session, start the Spark streaming service, and load the model pipeline. We use the `run()` function to route the input to the Spark streaming service to generate predictions (in this case the probability of an interaction), then return the output.

In [17]:
driver_file = '''
import os
import json
from time import sleep
from uuid import uuid4
from zipfile import ZipFile

from azureml.core.model import Model
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
import requests


def init():
    """One time initialization of pyspark and model server"""

    spark = SparkSession.builder.appName("Model Server").getOrCreate()
    import mmlspark  # this is needed to load mmlspark libraries

    # extract and load model
    model_path = Model.get_model_path('{model_name}')
    with ZipFile(model_path, 'r') as f:
        f.extractall('model')
    model = PipelineModel.load('model')

    # load data schema saved with model
    with open(os.path.join('model', 'schema.json'), 'r') as f:
        schema = StructType.fromJson(json.load(f))

    input_df = (
        spark.readStream.continuousServer()
        .address("localhost", 8089, "predict")
        .load()
        .parseRequest(schema)
    )

    output_df = (
        model.transform(input_df)
        .makeReply("probability")
    )

    checkpoint = os.path.join('/tmp', 'checkpoints', uuid4().hex)
    server = (
        output_df.writeStream.continuousServer()
        .trigger(continuous="30 seconds")
        .replyTo("predict")
        .queryName("prediction")
        .option("checkpointLocation", checkpoint)
        .start()
    )

    # let the server finish starting
    sleep(1)


def run(input_json):
    try:
        response = requests.post(data=input_json, url='http://localhost:8089/predict')
        result = response.json()['probability']['values'][1]
    except Exception as e:
        result = str(e)
    
    return json.dumps({{"result": result}})
    
'''.format(model_name=MODEL_NAME)

# check syntax
exec(driver_file)

with open(DRIVER_FILE, "w") as f:
    f.write(driver_file)
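
Two details of the cell above are easy to miss: literal braces in the template must be doubled so that `str.format` leaves them in place, and `exec` runs the generated script's imports and definitions in the notebook's namespace. The sketch below uses a shortened, hypothetical template to show the escaping, and shows `compile()` as a lighter alternative when only a syntax check is wanted.

```python
# Hypothetical, shortened template that follows the same pattern as the driver script
template = '''
import json

def run(input_json):
    model_name = '{model_name}'
    return json.dumps({{"model": model_name, "echo": json.loads(input_json)}})
'''

# Single braces are substituted; doubled braces become literal braces
source = template.format(model_name="example.mml")

# compile() parses the generated source and raises SyntaxError on invalid code,
# without importing anything or running the script body
code = compile(source, "generated_driver.py", "exec")

# Executing in a separate namespace keeps the notebook's globals untouched
namespace = {}
exec(code, namespace)
print(namespace["run"]('{"x": 1}'))
```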

## Define Dependencies

Next, we define the dependencies that are required by the driver script.

In [19]:
# azureml-sdk is required to load the registered model
conda_file = CondaDependencies.create(pip_packages=['azureml-sdk', 'requests']).serialize_to_string()

with open(CONDA_FILE, "w") as f:
    f.write(conda_file)

## Create the Image

We use the `ContainerImage` class to first configure the image with the defined driver and dependencies, then to create the image for use later.<br>
Building the image allows it to be downloaded and debugged locally using Docker; see the [troubleshooting instructions](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-troubleshoot-deployment).

In [21]:
image_config = ContainerImage.image_configuration(execution_script=DRIVER_FILE, 
                                                  runtime=CONTAINER_RUN_TIME,
                                                  conda_file=CONDA_FILE,
                                                  tags={"runtime":CONTAINER_RUN_TIME, "model": MODEL_NAME})

image = ContainerImage.create(name=CONTAINER_NAME,
                              models=[model],
                              image_config=image_config,
                              workspace=ws)

image.wait_for_creation(show_output=True)

## Create the Service

Once we have created an image, we configure an Azure Kubernetes Service (AKS) and deploy the image as an AKS Webservice.

**NOTE** We *can* create a service directly from the registered model and image_configuration with the `Webservice.deploy_from_model()` function. 
 We create the image here explicitly and use `deploy_from_image()` for three reasons:

1. It provides more transparency in terms of the actual steps that are taking place
2. It provides more flexibility and control. For example, you can create images with names that are independent of the service that you are creating. This can be useful in cases where your images are used across multiple services.
3. It has potential for faster iteration and for more portability. Once we have an image, we can create a new deployment with the exact same code.

### Setup and Planning

When we are setting up a production service, we should start by estimating the load we would like to support. In order to estimate that, we need to estimate how long a single call is likely to take. In this example, we have done some local tests, and we have estimated that a single query may take approximately 100 ms to process. 

Based on a few additional assumptions, we can estimate how many replicas are required to support a targeted number of queries per second (qps). 

**Note:** This estimate should be used as a ballpark figure to get started, and we can verify performance with subsequent load testing to home in on better estimates. See this [documentation](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-deploy-and-where#aks) for more details.


We have written some helper functions to support this type of calculation, and we will use them to estimate the number of replicas required to support loads of 25, 50, 100, 200, and 350 queries per second, using 100 ms as our estimate of the time to complete a single query.

In [11]:
all_target_qps = [25, 50, 100, 200, 350]
query_processing_time = 0.1  ## in seconds
replica_estimates = {t: qps_to_replicas(t, query_processing_time) for t in all_target_qps}
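
To make the arithmetic behind such an estimate concrete, here is a minimal sketch of a utilization-based calculation. It assumes each replica handles one query at a time and that replicas should run at about 70% utilization; both values, and the function `estimate_replicas` itself, are illustrative assumptions rather than the actual implementation of `qps_to_replicas`.

```python
from math import ceil


def estimate_replicas(target_qps, processing_time, max_queries_per_replica=1, target_utilization=0.7):
    """Hypothetical stand-in for the helper: replicas needed to serve `target_qps`."""
    # Average number of queries in flight, inflated so replicas run below full load
    concurrent_queries = target_qps * processing_time / target_utilization
    return ceil(concurrent_queries / max_queries_per_replica)


print(estimate_replicas(100, 0.1))
```

Under these assumptions, 100 queries per second at 100 ms each means about 10 queries in flight, or roughly 14.3 after allowing for 70% utilization, which rounds up to 15 replicas.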

Based on the size of our customer base and other considerations (e.g. upcoming announcements that may boost traffic, etc), we make a decision on the maximum load we want to support. In this example, we will say we want to support 100 queries per second, and that will indicate that we should use the corresponding number of replicas (15 based on the estimates above). 

Once we have the number of replicas, we then need to make sure we have enough resources (Cores and Memory) within our Azure Kubernetes Service to support that number of replicas. In order to estimate that number, we need to know how many cores are going to be assigned to each replica. This number can be fractional, because there are many use-cases where there are multiple replicas per core. You can see additional details [here](https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units). When we create the Webservice below, we will allocate 0.3 `cpu_cores` and 0.5 GB of memory to each replica. To support 15 replicas, we need `15*0.3` cores and `15*0.5` GB of memory. 


In [12]:
cpu_cores_per_replica = 0.3
print('{} cores required'.format(replica_estimates[100]*cpu_cores_per_replica))
print('{} GB of memory required'.format(replica_estimates[100]*0.5))

4.5 cores required
7.5 GB of memory required


### Provision Azure Kubernetes Service

Now that we have an estimate of the number of cores and amount of memory we need, we will configure and create the AKS cluster. By default, `AksCompute.provisioning_configuration()` will create a configuration that has 3 agents with `vm_size='Standard_D3_v2'`. Each Standard_D3_v2 virtual machine has 4 cores and 14 GB of memory, so the defaults result in a cluster with a combined 12 cores and 42 GB of memory, which are both sufficient to meet our estimated load requirements. 

**Note**: In this particular case, even though our load requirements are just 4.5 cores, we should **not** go below 12 cores in the AKS cluster. 12 cores is the minimum number of cores in AKS required for web services. See documentation for [details](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-deploy-and-where#aks). We can use the `agent_count` and `vm_size` parameters to increase the number of cores above 12 if our load requirements demand it, but we should not use them to go below.
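
The comparison between the estimated requirements and the default cluster size can be written out as a small check. This is a simplified sketch: the function `cluster_fits` is hypothetical, it compares totals only, and it ignores per-node overhead and how replicas are packed onto individual nodes.

```python
def cluster_fits(replicas, cpu_per_replica, memory_gb_per_replica,
                 n_nodes=3, cores_per_node=4, memory_gb_per_node=14):
    """Return True when total replica requests fit within the raw cluster capacity."""
    cores_needed = replicas * cpu_per_replica
    memory_needed = replicas * memory_gb_per_replica
    return (cores_needed <= n_nodes * cores_per_node
            and memory_needed <= n_nodes * memory_gb_per_node)


# 15 replicas at 0.3 cores and 0.5 GB each, against 3 nodes with 4 cores and 14 GB each
print(cluster_fits(replicas=15, cpu_per_replica=0.3, memory_gb_per_replica=0.5))
```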

In [23]:
# Create AKS compute first

# Use the default configuration (can also provide parameters to customize)
prov_config = AksCompute.provisioning_configuration()

# Create the cluster
aks_target = ComputeTarget.create(
  workspace=ws, 
  name=AKS_NAME, 
  provisioning_configuration=prov_config
)

aks_target.wait_for_completion(show_output=True)

print(aks_target.provisioning_state)
print(aks_target.provisioning_errors)

### Consideration

Because our estimated load requirements are less than the minimums set by Azure Machine Learning, we should consider an alternate approach to estimating the number of replicas to use for the web service. If this is the only service that will run on the AKS cluster, then we are potentially wasting resources by not leveraging all of the compute resources. Initially, we used the expected load to estimate the number of replicas that should be used. Instead of that approach, we can also use the number of cores in our cluster to estimate the maximum number of replicas that could be supported.

In order to estimate the maximum number of replicas, we do need to consider that there is some overhead on each node for the base Kubernetes operations as well as the node's operating system and core functionality. We assume 10\% overhead in this case, but you can find more details [here](https://docs.microsoft.com/en-us/azure/aks/concepts-clusters-workloads).

**Note** we are using cores in this example, but we could also leverage memory requirements instead.
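
As a worked version of the flat 10% overhead assumption, the sketch below does the calculation in integer millicores (1 core = 1000 millicores, the CPU unit Kubernetes uses) to avoid floating-point rounding. The function `max_replicas` is hypothetical and only illustrates this simple model; the helper used in the next cell may account for overhead differently.

```python
def max_replicas(n_nodes, cores_per_node, millicores_per_replica, overhead_percent=10):
    """Replicas that fit after reserving a flat share of CPU for system overhead."""
    total_millicores = n_nodes * cores_per_node * 1000
    usable_millicores = total_millicores * (100 - overhead_percent) // 100
    return usable_millicores // millicores_per_replica


# 3 nodes with 4 cores each, 0.3 cores (300 millicores) per replica
print(max_replicas(n_nodes=3, cores_per_node=4, millicores_per_replica=300))
```

With 12 cores and 10% reserved, 10.8 cores remain, which is enough for 36 replicas at 0.3 cores each.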


In [13]:
max_replicas_12_cores = nodes_to_replicas(
    n_cores_per_node=4, n_nodes=3, cpu_cores_per_replica=cpu_cores_per_replica
)

Once we have the number of replicas our cluster will support, we can then estimate the queries per second we believe the AKS cluster could support.

In [14]:
replicas_to_qps(max_replicas_12_cores, query_processing_time)

140

### Create the Webservice

Next, we will configure and create the webservice. In this configuration, we will say each replica will set `cpu_cores=cpu_cores_per_replica` (default `cpu_cores=0.1`). We are adjusting this value based on experience and prior testing with this service. 

If no arguments are passed to `AksWebservice.deploy_configuration()`, it uses `autoscale_enabled=True` with `autoscale_min_replicas=1` and `autoscale_max_replicas=10`. The max value does not meet our minimum requirements to support 100 queries per second, so we need to adjust it. We can adjust this value to either our estimate based on load (15) or our estimate based on the number that can be supported by the AKS cluster (36). In this example, we will set it to the value based on load to allow the AKS cluster to be used for other tasks or services.

In [24]:
webservice_config = AksWebservice.deploy_configuration(cpu_cores=cpu_cores_per_replica,
                                                       autoscale_enabled=True,
                                                       autoscale_max_replicas=replica_estimates[100])

# Deploy service using created image
aks_service = Webservice.deploy_from_image(
  workspace=ws, 
  name=SERVICE_NAME,
  deployment_config=webservice_config,
  image=image,
  deployment_target=aks_target
)

aks_service.wait_for_deployment(show_output=True)

## Test the Service

Next, we can use the `sample` data to test the service.

The service expects JSON as its payload, so we take the sample data, convert to a dictionary, then submit to the service endpoint.

In [26]:
# View the URI
url = aks_service.scoring_uri
print('AKS URI: {}'.format(url))

# Setup authentication using one of the keys from aks_service
headers = dict(Authorization='Bearer {}'.format(aks_service.get_keys()[0]))

In [27]:
# Grab some sample data
df = load_spark_df(size='sample', spark=spark, dbutils=dbutils)
data = df.head().asDict()
print(data)

In [28]:
# Send a request to the AKS cluster
response = requests.post(url=url, json=data, headers=headers)
print(response.json())
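
When a request fails, it can help to inspect exactly what is being sent. The sketch below assembles an equivalent POST request with the standard library without sending it, so the method, headers, and body can be examined offline. The URL, key, record, and the function `build_scoring_request` are all hypothetical placeholders.

```python
import json
from urllib.request import Request


def build_scoring_request(url, key, record):
    """Assemble (but do not send) a POST request with a JSON body and a bearer key."""
    body = json.dumps(record).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer {}".format(key),
    }
    return Request(url, data=body, headers=headers, method="POST")


# Hypothetical URL, key, and record, for illustration only
request = build_scoring_request("http://example.invalid/score", "not-a-real-key", {"feature": 1.0})
print(request.get_method(), request.get_header("Authorization"))
print(request.data)
```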

### Delete the Service

When you are done, you can delete the service to minimize costs. You can always redeploy from the image using the same command as above.

In [30]:
# Uncomment the following line to delete the web service
# aks_service.delete()

In [31]:
aks_service.state