# Building a Real-time Recommendation API

This reference architecture shows the full lifecycle of building a recommendation system. It walks through creating the required Azure resources, training a recommendation model on a virtual machine or Azure Databricks, and deploying the model as an API. It uses Azure Cosmos DB, Azure Machine Learning, and Azure Kubernetes Service.

This architecture can be generalized for many recommendation engine scenarios, including recommendations for products, movies, and news.

### Architecture
![architecture](img/reco-arch.png "Architecture")

**Scenario**: A media organization wants to provide movie or video recommendations to its users. By providing personalized recommendations, the organization meets several business goals, including increased click-through rates, increased engagement on site, and higher user satisfaction.

In this reference, we train and deploy a real-time recommender service API that can provide the top 10 movie recommendations for a given user. 

### Dataflow
1. Track user behaviors. For example, a back-end service might log when a user rates a movie or clicks a product or news article.
2. Load the data into Azure Databricks from an available [data source](https://learn.microsoft.com/en-us/azure/databricks/data/data-sources/).
3. Prepare the data and split it into training and testing sets to train the model. ([This guide](https://github.com/Microsoft/Recommenders/blob/master/examples/01_prepare_data/data_split.ipynb) describes options for splitting data.)
4. Fit the [Spark Collaborative Filtering](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html) model to the data.
5. Evaluate the quality of the model using rating and ranking metrics. ([This guide](https://github.com/Microsoft/Recommenders/blob/master/examples/03_evaluate/evaluation.ipynb) provides details about the metrics that you can use to evaluate your recommender.)
6. Precompute the top 10 recommendations per user and store as a cache in Azure Cosmos DB.
7. Deploy an API service to AKS using the Machine Learning APIs to containerize and deploy the API.
8. When the back-end service gets a request from a user, call the recommendations API hosted in AKS to get the top 10 recommendations and display them to the user.

### Components
This architecture consists of the following key components:
* [Azure Databricks](https://docs.microsoft.com/en-us/azure/azure-databricks/what-is-azure-databricks) is used as a development environment to prepare input data and train the recommender model on a Spark cluster. Azure Databricks also provides an interactive workspace to run and collaborate on notebooks for any data processing or machine learning tasks.
* [Azure Kubernetes Service](https://docs.microsoft.com/en-us/azure/aks/intro-kubernetes) (AKS) is used to deploy and operationalize a machine learning model service API on a Kubernetes cluster. AKS hosts the containerized model, providing scalability that meets throughput requirements, identity and access management, and logging and health monitoring.
* [Azure Cosmos DB](https://docs.microsoft.com/en-us/azure/cosmos-db/introduction) is a globally distributed database service used to store the top 10 recommended movies for each user. Azure Cosmos DB is ideal for this scenario as it provides low latency (10 ms at 99th percentile) to read the top recommended items for a given user. 
* [Azure Machine Learning Service](https://docs.microsoft.com/en-us/azure/machine-learning/service/) is a service used to track and manage machine learning models, and then package and deploy these models to a scalable Azure Kubernetes Service environment.


### Table of Contents
0. [File Imports](#0-File-Imports)
1. [Service Creation](#1-Service-Creation)
2. [Training and evaluation](#2-Training)
3. [Operationalization](#3.-Operationalize-the-Recommender-Service)

## Setup
To run this notebook on Azure Databricks, set up Azure Databricks by following the appropriate sections of the repository [SETUP instructions](https://raw.githubusercontent.com/microsoft/recommenders/main/SETUP.md), then import this notebook into your Azure Databricks workspace (see the [import instructions](https://docs.azuredatabricks.net/user-guide/notebooks/notebook-manage.html#import-a-notebook)).

Please note: This notebook **REQUIRES** that you add the dependencies to support **operationalization**. See [SETUP](https://raw.githubusercontent.com/microsoft/recommenders/main/SETUP.md) for details.

![libraries](img/databricks.jpg "Libraries")

## 0 File Imports

In [0]:
import os
import sys
import urllib

In [0]:
from azure.common.client_factory import get_client_from_cli_profile

from azure.identity import AzureCliCredential

import azure.mgmt.cosmosdb

import azureml.core
from azureml.core import Workspace
from azureml.core.model import Model
from azureml.core.compute import AksCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.core.webservice import Webservice, AksWebservice
from azureml.exceptions import WebserviceException
from azureml.core import Environment
from azureml.core.environment import CondaDependencies
from azureml.core.model import InferenceConfig
from azureml.core.environment import SparkPackage

import pydocumentdb.document_client as document_client

from pyspark.ml.recommendation import ALS
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import FloatType, IntegerType, LongType

In [0]:
from recommenders.datasets import movielens
from recommenders.datasets.cosmos_cli import find_collection, read_collection, read_database, find_database
from recommenders.datasets.download_utils import maybe_download
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.notebook_utils import is_databricks
from recommenders.utils.timer import Timer
from recommenders.utils.spark_utils import start_or_get_spark

In [0]:
print("Azure SDK version:", azureml.core.VERSION)

Azure SDK version: 1.48.0


In [0]:
# Start spark session if needed
if not is_databricks():
    cosmos_connector = (
        "https://search.maven.org/remotecontent?filepath=com/azure/cosmos/spark/"
        "azure-cosmos-spark_3-1_2-12/4.15.0/azure-cosmos-spark_3-1_2-12-4.15.0.jar"
    )
    jar_filepath = maybe_download(url=cosmos_connector, filename="cosmos.jar")
    spark = start_or_get_spark("ALS", memory="10g", jars=[jar_filepath])
    sc = spark.sparkContext
print(sc)

<SparkContext master=local[*, 4] appName=Databricks Shell>


## 1 Service Creation
Set the **Subscription ID** to the subscription you want to deploy to, and set the resource name variables.

#### Services created by this notebook:
1. [Azure ML Service](https://azure.microsoft.com/en-us/services/machine-learning-service/)
    1. [Azure ML Workspace](https://docs.microsoft.com/en-us/azure/machine-learning/concept-workspace)
    1. [Azure Application Insights](https://azure.microsoft.com/en-us/services/monitor/)
    1. [Azure Storage](https://docs.microsoft.com/en-us/azure/storage/common/storage-account-overview)
    1. [Azure Key Vault](https://azure.microsoft.com/en-us/services/key-vault/)    

1. [Azure Cosmos DB](https://azure.microsoft.com/en-us/services/cosmos-db/)
1. [Azure Kubernetes Service (AKS)](https://azure.microsoft.com/en-us/services/kubernetes-service/)

**Add your Azure subscription ID**

In [0]:
# Add your subscription ID
subscription_id = "my-subscription-id"

# Set your workspace name
workspace_name = "databricks-project"
resource_group = "DatabricksProjectRG"  # alternatively: "{}-rg".format(workspace_name)

# Set your region to deploy Azure ML workspace
location = "eastus"

# AzureML service and Azure Kubernetes Service prefix
service_name = "mvl-als"

In [0]:
# Login for Azure CLI so that AzureML can use Azure CLI login credentials
!az login

To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code F874G9FCP to authenticate.
[
  {
    "cloudName": "AzureCloud",
    "homeTenantId": "home-tenant-id",
    "id": "my-subscription-id",
    "isDefault": true,
    "managedByTenants": [
      {
        "tenantId": "tenant-id"
      }
    ],
    "name": "Azure for Students",
    "state": "Enabled",
    "tenantId": "home-tenant-id",
    "user": {
      "name": "my-email",
      "type": "user"
    }
  }
]


In [0]:
# Change subscription if needed
!az account set --subscription {subscription_id}

In [0]:
# Check account
!az account show

{
  "environmentName": "AzureCloud",
  "homeTenantId": "home-tenant-id",
  "id": "my-subscription-id",
  "isDefault": true,
  "managedByTenants": [
    {
      "tenantId": "tenant-id"
    }
  ],
  "name": "Azure for Students",
  "state": "Enabled",
  "tenantId": "home-tenant-id",
  "user": {
    "name": "my-email",
    "type": "user"
  }
}


In [0]:
# CosmosDB
# account_name for CosmosDB cannot have "_" and needs to be less than 31 chars
account_name = "{}-ds-sql".format(workspace_name).replace("_", "-")[:31]
cosmos_database = "recommendations"
cosmos_collection = "user_recommendations_als"

# AzureML resource names
model_name = "{}-reco.mml".format(service_name)
aks_name = "{}-aks".format(service_name)

In [0]:
# top k items to recommend
TOP_K = 10

# Select MovieLens data size: 100k, 1m, 10m, or 20m
MOVIELENS_DATA_SIZE = '100k'

In [0]:
userCol = "UserId"
itemCol = "MovieId"
ratingCol = "Rating"

train_data_path = "train"
test_data_path = "test"

### 1.1 Import or create the AzureML Workspace. 
This command will check if the AzureML Workspace exists or not, and will create the workspace if it doesn't exist.

In [0]:
ws = Workspace.create(
    name=workspace_name,
    subscription_id=subscription_id,
    resource_group=resource_group, 
    location=location,
    exist_ok=True
)

If you run your code in unattended mode, i.e., where you can't give a user input, then we recommend to use ServicePrincipalAuthentication or MsiAuthentication.
Please refer to aka.ms/aml-notebook-auth for different authentication mechanisms in azureml-sdk.


### 1.2 Create a Cosmos DB to store recommendation results

This step will take some time to create CosmosDB resources.

In [0]:
# explicitly pass subscription_id in case user has multiple subscriptions
# client = get_client_from_cli_profile(
#     azure.mgmt.cosmosdb.CosmosDB,
#     subscription_id=subscription_id
# )

client = azure.mgmt.cosmosdb.CosmosDBManagementClient(AzureCliCredential(), subscription_id)

async_cosmosdb_create = client.database_accounts.begin_create_or_update(
    resource_group,
    account_name,
    {
        'location': location,
        'locations': [{
            'location_name': location
        }]
    }
)
account = async_cosmosdb_create.result()

my_keys = client.database_accounts.list_keys(resource_group, account_name)
master_key = my_keys.primary_master_key
endpoint = "https://" + account_name + ".documents.azure.com:443/"

# DB client
client = document_client.DocumentClient(endpoint, {'masterKey': master_key})

if not find_database(client, cosmos_database):
    db = client.CreateDatabase({'id': cosmos_database })
    print("Database created")
else:
    db = read_database(client, cosmos_database)
    print("Database found")

# Create collection options
options = dict(offerThroughput=11000)

# Create a collection
collection_definition = {
    'id': cosmos_collection,
    'partitionKey': {'paths': ['/id'],'kind': 'Hash'}
}
if not find_collection(client, cosmos_database, cosmos_collection):
    collection = client.CreateCollection(
        db['_self'], 
        collection_definition,
        options
    )
    print("Collection created")
else:
    collection = read_collection(client, cosmos_database, cosmos_collection)
    print("Collection found")
    


Database found
Collection found


In [0]:
# dbsecrets = dict(
#     Endpoint=endpoint, 
#     Masterkey=master_key, 
#     Database=cosmos_database, 
#     Collection=cosmos_collection, 
#     Upsert=True
# )

# dbsecrets = {
#     "accountEndpoint" : endpoint, 
#     "Masterkey" : master_key, 
#     "Database" : cosmos_database, 
#     "Collection" : cosmos_collection, 
#     "Upsert" : True
# }

## 2 Training

Next, we train an [Alternating Least Squares model](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html) on the [MovieLens](https://grouplens.org/datasets/movielens/) dataset.

### 2.1 Download the MovieLens dataset

In [0]:
# Note: The DataFrame-based API for ALS currently only supports integers for user and item ids.
schema = StructType(
    (
        StructField(userCol, IntegerType()),
        StructField(itemCol, IntegerType()),
        StructField(ratingCol, FloatType()),
    )
)

data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema, dbutils=dbutils)
data.show()

100%|██████████| 4.81k/4.81k [00:00<00:00, 17.6kKB/s]
+------+-------+------+
|UserId|MovieId|Rating|
+------+-------+------+
|   196|    242|   3.0|
|   186|    302|   3.0|
|    22|    377|   1.0|
|   244|     51|   2.0|
|   166|    346|   1.0|
|   298|    474|   4.0|
|   115|    265|   2.0|
|   253|    465|   5.0|
|   305|    451|   3.0|
|     6|     86|   3.0|
|    62|    257|   2.0|
|   286|   1014|   5.0|
|   200|    222|   5.0|
|   210|     40|   3.0|
|   224|     29|   3.0|
|   303|    785|   3.0|
|   122|    387|   5.0|
|   194|    274|   2.0|
|   291|   1042|   4.0|
|   234|   1184|   2.0|
+------+-------+------+
only showing top 20 rows



### 2.2 Split the data into train, test
There are several ways of splitting the data: random, chronological, stratified, etc., each of which favors a different real-world evaluation use case. We will split randomly in this example – for more details on which splitter to choose, consult [this guide](https://raw.githubusercontent.com/microsoft/recommenders/main/examples/01_prepare_data/data_split.ipynb).

In [0]:
train, test = spark_random_split(data, ratio=0.75, seed=42)
print("N train", train.cache().count())
print("N test", test.cache().count())

N train 74998
N test 25002
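
For intuition, the difference between a random and a chronological split can be sketched in plain Python. This is a minimal illustration, not the implementation behind `spark_random_split`: the helper names, the toy rows, and the timestamp field are assumptions made for the example. Note that this sketch cuts at exactly the requested ratio, whereas the Spark split above is only approximately 75/25 (74998 vs. 25002 rows).

```python
import random

def random_split(rows, ratio=0.75, seed=42):
    """Shuffle a copy of rows and cut it into (train, test) at the given ratio."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    cut = int(len(shuffled) * ratio)
    return shuffled[:cut], shuffled[cut:]

def chronological_split(rows, ratio=0.75):
    """Sort by timestamp (last tuple field) so the test set holds the newest ratings."""
    ordered = sorted(rows, key=lambda r: r[-1])
    cut = int(len(ordered) * ratio)
    return ordered[:cut], ordered[cut:]

# Hypothetical (user, item, rating, timestamp) rows, for illustration only.
ratings = [
    (u, i, float((u + i) % 5 + 1), 1000 + 10 * u + i)
    for u in range(1, 5)
    for i in range(1, 6)
]
train_rows, test_rows = random_split(ratings)
chrono_train, chrono_test = chronological_split(ratings)
```

A random split suits the rating-prediction setting used in this notebook; a chronological split is closer to a production setting in which the model only ever sees past interactions.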


### 2.3 Train the ALS model on the training data

To predict movie ratings, we use the rating data in the training set as users' explicit feedback. The hyperparameters used to estimate the model are set based on [this page](http://mymedialite.net/examples/datasets.html).

Under most circumstances, you would explore the hyperparameter space and choose the best set according to a validation metric. For details on this process, see the [ALS tuning deep dive](https://raw.githubusercontent.com/microsoft/recommenders/main/examples/04_model_select_and_optimize/tuning_spark_als.ipynb).

In [0]:
als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    alpha=0.1,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=True,
    userCol=userCol,
    itemCol=itemCol,
    ratingCol=ratingCol,
)

In [0]:
model = als.fit(train)
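
The hyperparameter exploration mentioned above can be as simple as a grid search. The sketch below shows only the search loop; the candidate values are illustrative assumptions, and `fake_rmse` is a stand-in for fitting ALS with the given parameters and measuring RMSE on a validation split.

```python
from itertools import product

# Candidate values are illustrative assumptions, not tuned recommendations.
param_grid = {"rank": [10, 15, 20], "regParam": [0.01, 0.05, 0.1]}

def grid(param_grid):
    """Yield one dict per combination of hyperparameter values."""
    names = sorted(param_grid)
    for values in product(*(param_grid[n] for n in names)):
        yield dict(zip(names, values))

def select_best(param_grid, evaluate):
    """Return (params, score) with the lowest score, e.g. validation RMSE."""
    return min(((p, evaluate(p)) for p in grid(param_grid)), key=lambda t: t[1])

def fake_rmse(params):
    """Stand-in for 'fit ALS with these params and return validation RMSE'."""
    return abs(params["rank"] - 15) / 10 + abs(params["regParam"] - 0.05)

best_params, best_score = select_best(param_grid, fake_rmse)
```

In this notebook, a real `evaluate` function would presumably build `ALS(...)` with the candidate values, fit it on a training split, and return `SparkRatingEvaluation(...).rmse()` computed on a held-out validation split rather than on the test set.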

### 2.4 Get top-k recommendations for our testing data

In the movie recommendation use case, recommending movies that a user has already rated does not make sense. Therefore, rated movies are removed from the recommended items.

To achieve this, we score every movie for every user and then remove the user-movie pairs that exist in the training dataset.

In [0]:
# Get the cross join of all user-item pairs and score them.
users = train.select(userCol).distinct()
items = train.select(itemCol).distinct()
user_item = users.crossJoin(items)
dfs_pred = model.transform(user_item)
dfs_pred.show()

+------+-------+----------+
|UserId|MovieId|prediction|
+------+-------+----------+
|   148|    148| 3.5317662|
|   148|    471| 3.1172194|
|   148|    496| 3.3190777|
|   148|    463|  4.757464|
|   148|    833|   3.68779|
|   148|   1238| 1.8957852|
|   148|   1088| 4.5473194|
|   148|   1342|  1.493924|
|   148|   1580|0.57647055|
|   148|   1591| 3.5548735|
|   148|    243| 2.7581022|
|   148|    392| 3.5030882|
|   148|    540|  2.205358|
|   148|    737|  3.483089|
|   148|    858| 0.9696207|
|   148|    897|0.79084826|
|   148|   1025| 3.0202923|
|   148|   1084|  4.958096|
|   148|    623|  3.010971|
|   148|   1127| 2.4659214|
+------+-------+----------+
only showing top 20 rows



In [0]:
# Remove seen items.
as_pred = dfs_pred.alias("pred")
as_train = train.alias("train")

dfs_pred_exclude_train = as_pred.join(
    as_train,
    (as_pred[userCol]==as_train[userCol]) & (as_pred[itemCol]==as_train[itemCol]),
    how='outer'
)
top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train["train."+ratingCol].isNull()) \
    .select("pred."+userCol, "pred."+itemCol, "pred.prediction")

top_all.show()

+------+-------+----------+
|UserId|MovieId|prediction|
+------+-------+----------+
|     1|      7| 4.1338468|
|     1|      9| 4.3809776|
|     1|     20| 3.6322436|
|     1|     43| 2.6116803|
|     1|     46| 3.6096683|
|     1|     63| 2.2370367|
|     1|    117|  3.333944|
|     1|    118| 2.6852312|
|     1|    119|  4.625972|
|     1|    190|  4.299953|
|     1|    193| 4.1029973|
|     1|    255| 3.2740529|
|     1|    269|  4.250825|
|     1|    276| 4.0782523|
|     1|    278| 1.9464829|
|     1|    284| 3.0049326|
|     1|    285| 4.5487285|
|     1|    293|  4.090019|
|     1|    294| 3.0005083|
|     1|    300|  3.499889|
+------+-------+----------+
only showing top 20 rows
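
The same filtering logic can be sketched without Spark. The example below drops user-item pairs seen in training and, as an extra step that the cell above leaves to the evaluator, keeps only the k highest-scoring items per user. The function name and the toy scores are assumptions made for illustration.

```python
from collections import defaultdict
import heapq

def top_k_unseen(scores, seen, k):
    """scores: {(user, item): prediction}; seen: set of (user, item) training pairs."""
    per_user = defaultdict(list)
    for (user, item), score in scores.items():
        if (user, item) not in seen:  # drop pairs the user already rated
            per_user[user].append((score, item))
    # Keep the k highest-scoring items per user, best first.
    return {
        user: [item for _, item in heapq.nlargest(k, candidates)]
        for user, candidates in per_user.items()
    }

# Hypothetical predictions and training pairs.
scores = {(1, 10): 4.5, (1, 11): 3.0, (1, 12): 4.9, (2, 10): 2.0, (2, 11): 4.0, (2, 12): 1.0}
seen = {(1, 12), (2, 11)}
recs = top_k_unseen(scores, seen, k=2)
```

Here user 1's best-scoring movie (12) is excluded because it appears in the training pairs, so the list starts with movie 10 instead.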



### 2.5 Evaluate how well ALS performs

Evaluate model performance using metrics such as Precision@K, Recall@K, [MAP@K](https://en.wikipedia.org/wiki/Evaluation_measures_(information_retrieval)) or [nDCG@K](https://en.wikipedia.org/wiki/Discounted_cumulative_gain). For a full guide to the metrics you can use to evaluate your recommender, consult [this guide](../03_evaluate/evaluation.ipynb).

In [0]:
cols = {
    'col_user': userCol,
    'col_item': itemCol,
    'col_rating': ratingCol,
    'col_prediction': "prediction",
}

test.show()

+------+-------+------+
|UserId|MovieId|Rating|
+------+-------+------+
|     1|      3|   4.0|
|     1|      7|   4.0|
|     1|      9|   5.0|
|     1|     10|   3.0|
|     1|     14|   5.0|
|     1|     20|   4.0|
|     1|     24|   3.0|
|     1|     30|   3.0|
|     1|     31|   3.0|
|     1|     33|   4.0|
|     1|     35|   1.0|
|     1|     36|   2.0|
|     1|     43|   4.0|
|     1|     46|   4.0|
|     1|     47|   4.0|
|     1|     48|   5.0|
|     1|     50|   5.0|
|     1|     52|   4.0|
|     1|     56|   4.0|
|     1|     63|   2.0|
+------+-------+------+
only showing top 20 rows



In [0]:
# Evaluate Ranking Metrics
rank_eval = SparkRankingEvaluation(
    test, 
    top_all, 
    k=TOP_K,
    **cols
)

print(
    "Model:\tALS",
    "Top K:\t%d" % rank_eval.k,
    "MAP:\t%f" % rank_eval.map_at_k(),
    "NDCG:\t%f" % rank_eval.ndcg_at_k(),
    "Precision@K:\t%f" % rank_eval.precision_at_k(),
    "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n'
)

Model:	ALS
Top K:	10
MAP:	0.002992
NDCG:	0.029724
Precision@K:	0.035737
Recall@K:	0.013052
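
To make the ranking metrics concrete, here is a minimal sketch of the textbook definitions of Precision@K and Recall@K for a single user. It is not the code behind `SparkRankingEvaluation`, which may treat edge cases (for example, users with fewer than K recommendations) differently; the function name and sample ids are assumptions.

```python
def precision_recall_at_k(recommended, relevant, k):
    """recommended: ranked list of item ids; relevant: set of item ids in the test set."""
    top_k = recommended[:k]
    hits = sum(1 for item in top_k if item in relevant)
    precision = hits / k  # share of the k recommendations that are relevant
    recall = hits / len(relevant) if relevant else 0.0  # share of relevant items retrieved
    return precision, recall

# Two of the five recommended items (9 and 46) appear in the relevant set of four.
p, r = precision_recall_at_k([7, 9, 20, 43, 46], {9, 46, 63, 117}, k=5)
```

The reported metrics are averages of such per-user values over all users in the test set.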


In [0]:
# Evaluate Rating Metrics
prediction = model.transform(test)
rating_eval = SparkRatingEvaluation(
    test, 
    prediction, 
    **cols
)

print(
    "Model:\tALS rating prediction",
    "RMSE:\t%.2f" % rating_eval.rmse(),
    "MAE:\t%f" % rating_eval.mae(),
    "Explained variance:\t%f" % rating_eval.exp_var(),
    "R squared:\t%f" % rating_eval.rsquared(), sep='\n'
)

Model:	ALS rating prediction
RMSE:	0.95
MAE:	0.741856
Explained variance:	0.293860
R squared:	0.289563
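
The two error metrics above follow directly from their definitions. A minimal sketch on hypothetical ratings (the helper names and values are assumptions, not part of the `recommenders` API):

```python
import math

def rmse(actual, predicted):
    """Root mean squared error: penalizes large errors more heavily."""
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))

def mae(actual, predicted):
    """Mean absolute error: average size of the error in rating units."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)

actual = [4.0, 3.0, 5.0, 1.0]
predicted = [3.0, 3.0, 4.0, 3.0]
rmse_value = rmse(actual, predicted)  # sqrt((1 + 0 + 1 + 4) / 4)
mae_value = mae(actual, predicted)    # (1 + 0 + 1 + 2) / 4
```

Because RMSE squares each error, the single two-star miss dominates it, which is why RMSE is always at least as large as MAE.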


### 2.6 Save the model

In [0]:
type(model)

Out[24]: pyspark.ml.recommendation.ALSModel

In [0]:
model_name

Out[25]: 'mvl-als-reco.mml'

In [0]:
(model
 .write()
 .overwrite()
 .save(model_name))

## 3. Operationalize the Recommender Service
Once the model performs well enough, we operationalize it as a REST endpoint that a real-time service can call. We use [Azure Cosmos DB](https://azure.microsoft.com/en-us/services/cosmos-db/), [Azure Machine Learning Service](https://azure.microsoft.com/en-us/services/machine-learning-service/), and [Azure Kubernetes Service](https://docs.microsoft.com/en-us/azure/aks/intro-kubernetes) to operationalize the recommender service.

### 3.1 Create a look-up for Recommendations in Cosmos DB

First, the Top-10 recommendations for each user as predicted by the model are stored as a lookup table in Cosmos DB. At runtime, the service will return the Top-10 recommendations as precomputed and stored in Cosmos DB:

In [0]:
recs = model.recommendForAllUsers(TOP_K)
recs_topk = recs.withColumn("id", recs[userCol].cast("string")) \
    .select("id", "recommendations." + itemCol)
recs_topk.show()

+---+--------------------+
| id|             MovieId|
+---+--------------------+
|  1|[1643, 1449, 408,...|
|  3|[1368, 1512, 320,...|
|  6|[1643, 1062, 1126...|
| 12|[1612, 1245, 645,...|
| 13|[1643, 1347, 1062...|
| 16|[1643, 1449, 1467...|
| 20|[394, 1427, 538, ...|
| 22|[1643, 50, 1269, ...|
| 26|[1643, 1449, 114,...|
| 27|[1368, 1085, 1449...|
| 28|[695, 1449, 169, ...|
| 31|[1154, 1062, 1643...|
| 34|[1512, 1203, 1368...|
| 40|[1643, 1612, 958,...|
| 44|[1643, 50, 408, 1...|
| 47|[838, 1643, 1591,...|
| 52|[1449, 1612, 64, ...|
| 53|[1449, 496, 318, ...|
| 54|[914, 899, 1268, ...|
| 57|[1612, 1643, 1005...|
+---+--------------------+
only showing top 20 rows
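
Each row of `recs_topk` becomes one document in the lookup table. The sketch below shows the intended document shape in plain Python; the helper name is an assumption and the item lists are shortened for illustration. The `id` is cast to a string because Cosmos DB requires string document ids, and the collection created earlier is partitioned on `/id`.

```python
import json

def to_cosmos_documents(recs_by_user, item_col="MovieId"):
    """Turn {user_id: [item ids]} into one JSON-serializable document per user."""
    return [
        {"id": str(user_id), item_col: list(items)}  # string id doubles as partition key
        for user_id, items in sorted(recs_by_user.items())
    ]

# Shortened, illustrative recommendation lists.
docs = to_cosmos_documents({496: [320, 1589, 262], 1: [1643, 1449, 408]})
print(json.dumps(docs[0]))
```

Keying each document by user id lets the serving path fetch a user's recommendations with a single point read instead of a query.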



In [0]:
dbsecrets = {
    "spark.cosmos.accountEndpoint" : endpoint, 
    "spark.cosmos.accountKey" : master_key, 
    "spark.cosmos.database" : cosmos_database, 
    "spark.cosmos.container" : cosmos_collection, 
    "Upsert" : True
}


# Save data to CosmosDB
(recs_topk.coalesce(1)
 .write
 .format("cosmos.oltp")
 .mode('Append')
 .options(**dbsecrets)
 .save())

### 3.2 Configure Azure Machine Learning

Next, Azure Machine Learning Service is used to create a model scoring image and deploy it to Azure Kubernetes Service as a scalable containerized service. To achieve this, a **scoring script** must be created. In the script, we call Cosmos DB to look up the top 10 movies to recommend for a given input user ID.

In [0]:
score_sparkml = """
import json
import pydocumentdb.document_client as document_client

def init(local=False):
    global client, collection
    try:
        client = document_client.DocumentClient('{endpoint}', dict(masterKey='{key}'))
        collection = client.ReadCollection(collection_link='dbs/{database}/colls/{collection}')
    except Exception as e:
        collection = e

def run(input_json):
    try:
        # The request body is a JSON array whose first element is a JSON-encoded object
        user_id = str(json.loads(json.loads(input_json)[0])['id'])
        # The collection is partitioned on /id, so the id doubles as the partition key
        options = dict(partitionKey=user_id)
        document_link = 'dbs/{database}/colls/{collection}/docs/' + user_id
        # Point read of the precomputed recommendations for this user
        result = client.ReadDocument(document_link, options)
    except Exception as e:
        result = str(e)
    return json.dumps(str(result))
""".format(key=dbsecrets['spark.cosmos.accountKey'], 
           endpoint=dbsecrets['spark.cosmos.accountEndpoint'], 
           database=dbsecrets['spark.cosmos.database'], 
           collection=dbsecrets['spark.cosmos.container'])

# test validity of python string
exec(score_sparkml)

with open("score_sparkml.py", "w") as file:
    file.write(score_sparkml)
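
The `run` function above expects a doubly encoded request: a JSON array whose first element is itself a JSON string. Rather than escaping such a payload by hand, it can be built with `json.dumps`. This is a minimal sketch; `build_payload` and `parse_user_id` are hypothetical helper names, and `parse_user_id` simply mirrors the parsing expression used in `run`.

```python
import json

def build_payload(user_id):
    """Encode a request body as a JSON array holding one JSON-encoded string."""
    inner = json.dumps({"id": str(user_id)})
    return json.dumps([inner]).encode("utf-8")

def parse_user_id(input_json):
    """Decode the array, then the inner string, as the scoring script does."""
    return str(json.loads(json.loads(input_json)[0])["id"])

payload = build_payload(496)
user_id = parse_user_id(payload)
```

Building the payload programmatically avoids quoting mistakes when the user ID comes from a variable.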

Register your model:

In [0]:
print(type(model))

<class 'pyspark.ml.recommendation.ALSModel'>


In [0]:
model_name

Out[31]: 'mvl-als-reco.mml'

In [0]:
display(dbutils.fs.ls('/mvl-als-reco.mml/'))

path,name,size,modificationTime
dbfs:/mvl-als-reco.mml/itemFactors/,itemFactors/,0,1672557970000
dbfs:/mvl-als-reco.mml/metadata/,metadata/,0,1672557964000
dbfs:/mvl-als-reco.mml/userFactors/,userFactors/,0,1672557968000


In [0]:
!ls /dbfs/

FileStore   databricks-datasets  mvl-als-reco	   mvl-als-recommend
databricks  databricks-results	 mvl-als-reco.mml  tmp


In [0]:
mymodel = Model.register(
    model_path='/dbfs/' + model_name + '/',  # this points to a local file
    model_name=model_name,  # this is the name the model is registered as
    description="AML trained model",
    workspace=ws
)

print(mymodel.name, mymodel.description, mymodel.version)

Registering model mvl-als-reco.mml
mvl-als-reco.mml AML trained model 3


### 3.3 Deploy the model as a Service on AKS

#### 3.3.1 Create an Environment for your model:

In [0]:
env = Environment(name='sparkmlenv')

# Specify a public image from microsoft/mmlspark as base image
env.docker.base_image="microsoft/mmlspark:0.15"

pip = [
    'azureml-defaults', 
    'numpy==1.14.2', 
    'scikit-learn==0.19.1', 
    'pandas', 
    'pydocumentdb'
]

# Add dependencies needed for inferencing
env.python.conda_dependencies = CondaDependencies.create(pip_packages=pip)
env.inferencing_stack_version = "latest"

# Add spark packages
env.spark.precache_packages = True
env.spark.repositories = ["https://mmlspark.azureedge.net/maven"]
env.spark.packages= [
    SparkPackage("com.microsoft.ml.spark", "mmlspark_2.11", "0.15"),
    SparkPackage("com.microsoft.azure", artifact="azure-storage", version="2.0.0"),
    SparkPackage(group="org.apache.hadoop", artifact="hadoop-azure", version="2.7.0")
]



#### 3.3.2 Create an AKS Cluster to run your container
This may take 20 to 30 minutes depending on the cluster size.

In [0]:
resource_group2 = "DatabricksProjectRG2"

location2 = "uksouth"

ws2 = Workspace.create(
    name=workspace_name,
    subscription_id=subscription_id,
    resource_group=resource_group2, 
    location=location2,
    exist_ok=True
)

Deploying AppInsights with name databricinsights276fe04f.
Deployed AppInsights with name databricinsights276fe04f. Took 6.24 seconds.
Deploying KeyVault with name databrickeyvaultc2154c27.
Deploying StorageAccount with name databricstorage8a15bd358.
Deployed KeyVault with name databrickeyvaultc2154c27. Took 22.24 seconds.
Deployed StorageAccount with name databricstorage8a15bd358. Took 25.35 seconds.
Deploying Workspace with name databricks-project.
Deployed Workspace with name databricks-project. Took 19.26 seconds.


In [0]:
mymodel = Model.register(
    model_path='/dbfs/' + model_name + '/',  # this points to a local file
    model_name=model_name,  # this is the name the model is registered as
    description="AML trained model",
    workspace=ws2
)

print(mymodel.name, mymodel.description, mymodel.version)

Registering model mvl-als-reco.mml
mvl-als-reco.mml AML trained model 1


In [0]:
# Verify that cluster does not exist already
try:
    aks_target = ComputeTarget(workspace=ws, name=aks_name)
    print("Found existing cluster, use it.")
except ComputeTargetException:
    # Create the cluster using the default configuration (can also provide parameters to customize)
    prov_config = AksCompute.provisioning_configuration()
    aks_target = ComputeTarget.create(
        workspace=ws, 
        name=aks_name, 
        provisioning_configuration=prov_config
    )
    aks_target.wait_for_completion(show_output = True)
    print(aks_target.provisioning_state)
    # To check any error logs, print(aks_target.provisioning_errors)

Creating.......................................................................................................
SucceededProvisioning operation finished, operation "Succeeded"
Succeeded


In [0]:
# Create an Inferencing Configuration with your environment and scoring script
inference_config = InferenceConfig(
    environment=env,
    entry_script="score_sparkml.py"
)

# Set the web service configuration (using default here with app insights)
aks_config = AksWebservice.deploy_configuration(enable_app_insights=True)

# Webservice creation using single command
try:
    aks_service = Model.deploy(
        workspace=ws,
        models=[mymodel],
        name=service_name,
        inference_config=inference_config,
        deployment_config=aks_config,
        deployment_target=aks_target
    )
    aks_service.wait_for_deployment(show_output=True)
except WebserviceException:
    # Retrieve existing service.
    aks_service = Webservice(ws, name=service_name)
    print("Retrieved existing service")

Running....................................................................................................................
SucceededAKS service creation operation finished, operation "Succeeded"


### 3.4 Call the AKS model service
After the deployment, the service can be called with a user ID – the service will then look up the top 10 recommendations for that user in Cosmos DB and send back the results.
The following script demonstrates how to call the recommendation service API and view the result for the given user ID:

In [0]:
import json

scoring_url = aks_service.scoring_uri
service_key = aks_service.get_keys()[0]

input_data = '["{\\"id\\":\\"496\\"}"]'.encode()

req = urllib.request.Request(scoring_url, data=input_data)
req.add_header("Authorization","Bearer {}".format(service_key))
req.add_header("Content-Type","application/json")

with Timer() as t: 
    with urllib.request.urlopen(req) as result:
        res = result.read()
        resj = json.loads(
            # Cleanup to parse into a json object
            res.decode("utf-8")
            .replace("\\", "")
            .replace('"', "")
            .replace("'", '"')
        )
        print(json.dumps(resj, indent=4))
    
print("Full run took %.2f seconds" % t.interval)

{
    "MovieId": [
        320,
        1589,
        262,
        1344,
        958,
        889,
        1368,
        645,
        919,
        1137
    ],
    "id": "496",
    "_rid": "34hEAIe9pterAQAAAAAACA==",
    "_self": "dbs/34hEAA==/colls/34hEAIe9ptc=/docs/34hEAIe9pterAQAAAAAACA==/",
    "_etag": "6d006b74-0000-0100-0000-5f25f0550000",
    "_attachments": "attachments/",
    "_ts": 1596321877
}
Full run took 0.05 seconds
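
The character-replacement cleanup used above would break if any stored value contained a quote or backslash. A more robust alternative is sketched below, assuming the response body consists of one or more JSON string layers wrapped around Python's `repr` of a dictionary (the scoring script returns `json.dumps(str(result))`, and the serving stack may add a further layer). The helper name and the `title` field are hypothetical.

```python
import ast
import json

def parse_response(body):
    """Decode a scoring response back into a dict."""
    text = body.decode("utf-8")
    # Peel JSON string layers until the Python dict repr is exposed.
    while text.startswith('"'):
        text = json.loads(text)
    # Safely evaluate the repr; literal_eval accepts only Python literals.
    return ast.literal_eval(text)

# Simulated bodies, built the same way the scoring script builds its return value.
record = {"MovieId": [320, 1589], "id": "496", "title": "It's a demo"}
one_layer = json.dumps(str(record)).encode("utf-8")
two_layers = json.dumps(json.dumps(str(record))).encode("utf-8")
doc = parse_response(one_layer)
```

If the scoring script returned an error message instead of a document, `ast.literal_eval` raises an exception, which surfaces the failure instead of silently producing a malformed result.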


## Appendix - Real-time scoring with AzureML

In the previous cells, we used Cosmos DB to cache the recommendation results for real-time serving. Alternatively, we can generate recommendations on demand by using the deployed model. The following scripts load the registered model and use it to produce recommendations:

* *score_sparkml.py*
    ```python
    import json
    import os
    from pyspark.ml.recommendation import ALSModel

    # Note, set `model_name`, `userCol`, and `itemCol` defined earlier.
    model_name = "mvl-als-reco.mml"
    userCol = "UserId"
    itemCol = "MovieId"

    def init(local=False):
        global model

        # Load ALS model.
        model_path = os.path.join(os.getenv('AZUREML_MODEL_DIR'), model_name)
        model = ALSModel.load(model_path)

    def run(input_json):
        js = json.loads(json.loads(input_json)[0])
        id = str(js['id'])
        k = js.get('k', 10)

        # Use the model to get recommendation.
        recs = model.recommendForAllUsers(k)
        recs_topk = recs.withColumn('id', recs[userCol].cast("string")).select(
            'id', "recommendations." + itemCol
        )
        result = recs_topk[recs_topk.id==id].collect()[0].asDict()

        return json.dumps(str(result))
    ```

* Call the AKS model service
    ```python
    # Get a recommendation of 10 movies
    input_data = '["{\\"id\\":\\"496\\",\\"k\\":10}"]'.encode()

    req = urllib.request.Request(scoring_url, data=input_data)
    req.add_header("Authorization","Bearer {}".format(service_key))
    req.add_header("Content-Type","application/json")
    
    ...
    ```
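
For intuition about what on-demand scoring computes: an ALS model predicts a rating as the dot product of a user's factor vector and an item's factor vector, so recommending for one user amounts to scoring every item against that user's vector and keeping the best k. The sketch below illustrates this in plain Python with hypothetical rank-2 factors; it is not the Spark implementation.

```python
import heapq

def recommend_for_user(user_factors, item_factors, k):
    """Score every item by dot product with the user's vector; return the top-k item ids."""
    def score(item_vec):
        return sum(u * v for u, v in zip(user_factors, item_vec))
    return heapq.nlargest(k, item_factors, key=lambda item: score(item_factors[item]))

# Hypothetical rank-2 factors, for illustration only.
user_vec = [1.0, 0.5]
items = {10: [0.2, 0.1], 11: [0.9, 0.4], 12: [0.5, 1.0], 13: [0.0, 0.3]}
top2 = recommend_for_user(user_vec, items, k=2)
```

Scoring a single user this way touches only one user vector, whereas the `run` function above recomputes recommendations for all users on every request; the precomputed Cosmos DB cache avoids that cost entirely.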