<i>Copyright (c) Microsoft Corporation. All rights reserved.</i>

<i>Licensed under the MIT License.</i>

# Building a Real-time Recommendation API

This reference architecture shows the full lifecycle of building a recommendation system. It walks through creating the required Azure resources, training a recommendation model with Azure Databricks, and deploying it as an API. It uses Azure Cosmos DB, Azure Machine Learning, and Azure Kubernetes Service.

This architecture can be generalized for many recommendation engine scenarios, including recommendations for products, movies, and news. 

### Architecture
![architecture](https://recodatasets.blob.core.windows.net/images/reco-arch.png "Architecture")

**Scenario**: A media organization wants to provide movie or video recommendations to its users. By providing personalized recommendations, the organization meets several business goals, including increased click-through rates, increased engagement on site, and higher user satisfaction.

In this reference, we train and deploy a real-time recommender service API that can provide the top 10 movie recommendations for a given user. 

### Components
This architecture consists of the following key components:
* [Azure Databricks](https://docs.microsoft.com/en-us/azure/azure-databricks/what-is-azure-databricks) is used as a development environment to prepare input data and train the recommender model on a Spark cluster. Azure Databricks also provides an interactive workspace to run and collaborate on notebooks for any data processing or machine learning tasks. 
* [Azure Kubernetes Service](https://docs.microsoft.com/en-us/azure/aks/intro-kubernetes) (AKS) is used to deploy and operationalize a machine learning model service API on a Kubernetes cluster. AKS hosts the containerized model, providing scalability that meets throughput requirements, identity and access management, and logging and health monitoring.
* [Azure Cosmos DB](https://docs.microsoft.com/en-us/azure/cosmos-db/introduction) is a globally distributed database service used to store the top 10 recommended movies for each user. Azure Cosmos DB is well suited to this scenario because it provides low-latency reads (10 ms at the 99th percentile) of the top recommended items for a given user.
* [Azure Machine Learning Service](https://docs.microsoft.com/en-us/azure/machine-learning/service/) is used to track and manage machine learning models, and to package and deploy them to a scalable Azure Kubernetes Service environment.
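In this design the API never runs the model at request time: recommendations are computed offline, stored per user, and each request becomes a single key lookup. A minimal sketch of that serving pattern follows, with a plain Python dict standing in for Cosmos DB. `PrecomputedRecommender` and its methods are hypothetical names for illustration, not part of any Azure SDK, and the movie IDs are made up.

```python
from typing import Dict, List


class PrecomputedRecommender:
    """Serves top-k lists that were computed ahead of time.

    A plain dict stands in for the Cosmos DB collection used in this notebook.
    """

    def __init__(self) -> None:
        self._store: Dict[str, List[int]] = {}

    def load(self, user_id: str, items: List[int]) -> None:
        # Offline (batch) step: write or overwrite a user's ranked item list
        self._store[user_id] = list(items)

    def top_k(self, user_id: str, k: int = 10) -> List[int]:
        # Online step: one key lookup, no model scoring at request time.
        # Users with no stored list get an empty result.
        return self._store.get(user_id, [])[:k]


# Example with made-up movie IDs
reco = PrecomputedRecommender()
reco.load("496", [50, 172, 181, 100, 258, 1, 127, 98, 56, 174, 7])
print(reco.top_k("496"))
print(reco.top_k("unknown"))
```

The trade-off is freshness: new ratings only affect the served lists after the next batch recomputation, in exchange for request latency that does not depend on the model.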


### Table of Contents
0. [File Imports](#0-File-Imports)
1. [Service Creation](#1-Service-Creation)
2. [Training](#2-Training)
3. [Operationalization](#3.-Operationalize-the-Recommender-Service)

## Setup

This notebook should be run on Azure Databricks. To import this notebook into your Azure Databricks Workspace, see instructions [here](https://docs.azuredatabricks.net/user-guide/notebooks/notebook-manage.html#import-a-notebook).

Setup for Azure Databricks should be completed by following the appropriate sections in the repository's [SETUP file](../../SETUP.md). 

Please note: This notebook **REQUIRES** that you add the dependencies to support operationalization. See the [SETUP file](../../SETUP.md) for details.

## 0 File Imports

In [None]:
import numpy as np
import os
import pandas as pd
import pprint
import shutil
import time, timeit
import urllib.request  # urllib.request is used to call the deployed service
import yaml
import json
import uuid
import matplotlib
import matplotlib.pyplot as plt

from azure.common.client_factory import get_client_from_cli_profile
from azure.mgmt.compute import ComputeManagementClient
import azure.mgmt.cosmosdb
import azureml.core
from azureml.core import Workspace
from azureml.core.run import Run
from azureml.core.experiment import Experiment
from azureml.core.model import Model
from azureml.core.image import ContainerImage
from azureml.core.compute import AksCompute, ComputeTarget
from azureml.core.webservice import Webservice, AksWebservice


import pydocumentdb
import pydocumentdb.document_client as document_client

import pyspark
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType

from reco_utils.dataset import movielens
from reco_utils.dataset.cosmos_cli import find_collection, read_collection, read_database, find_database
from reco_utils.dataset.spark_splitters import spark_random_split
from reco_utils.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation

print("PySpark version:", pyspark.__version__)
print("Azure ML SDK version:", azureml.core.VERSION)

## 1 Service Creation
Set the **Subscription ID** (`subscription_id` below) to the subscription you would like to deploy to.

#### Services created by this notebook:
1. [Azure ML Service](https://docs.microsoft.com/en-us/azure/machine-learning/service/)
1. [Azure Cosmos DB](https://azure.microsoft.com/en-us/services/cosmos-db/)
1. [Azure Container Registry](https://docs.microsoft.com/en-us/azure/container-registry/)
1. [Azure Container Instances](https://docs.microsoft.com/en-us/azure/container-instances/)
1. [Azure Application Insights](https://azure.microsoft.com/en-us/services/monitor/)
1. [Azure Storage](https://docs.microsoft.com/en-us/azure/storage/common/storage-account-overview)
1. [Azure Key Vault](https://azure.microsoft.com/en-us/services/key-vault/)
1. [Azure Kubernetes Service (AKS)](https://azure.microsoft.com/en-us/services/kubernetes-service/)

In [None]:
# Service name prefix: a short random suffix keeps resource names unique
short_uuid = str(uuid.uuid4())[:4]
prefix = "reco" + short_uuid
data = "mvl"
algo = "als"

# Locations of the Cosmos DB secrets file and the AzureML workspace config
secrets_path = '/dbfs/FileStore/dbsecrets.json'
ws_config_path = '/dbfs/FileStore'

# Add your subscription ID
subscription_id = ""


In [None]:
# Resource group and workspace
resource_group = prefix + "_" + data
workspace_name = prefix + "_"+data+"_aml"
workspace_region = "westus2"
print("Resource group:", resource_group)

# Columns
userCol = "UserId"
itemCol = "MovieId"
ratingCol = "Rating"

# CosmosDB
location = workspace_region
account_name = resource_group + "-ds-sql"
# account_name for CosmosDB cannot contain "_" and must be less than 31 chars
account_name = account_name.replace("_", "-")[:30]
DOCUMENTDB_DATABASE = "recommendations"
DOCUMENTDB_COLLECTION = "user_recommendations_" + algo

# AzureML
history_name = 'spark-ml-notebook'
model_name = data+"-"+algo+"-reco.mml" # NOTE: asset names must not contain spaces and must be under 30 characters
service_name = data + "-" + algo
experiment_name = data + "_"+ algo +"_Experiment"
# Name here must be <= 16 chars and only include letters, numbers and "-"
aks_name = prefix.replace("_","-")[0:min(12,len(prefix))] + '-aks'
# add a name for the container
container_image_name = '-'.join([data, algo])

train_data_path = data + "Train"
test_data_path = data + "Test"
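The naming constraints noted in the comments above (no `_`, length limits, and only letters, numbers and `-` for the AKS name) can be checked in one place. The helper below is a hypothetical sketch based only on those comments; actual Azure naming rules differ per resource type, so check them before relying on it.

```python
import re


def safe_resource_name(raw: str, max_len: int) -> str:
    """Hypothetical helper: adapt `raw` to the constraints noted in the comments above.

    Replaces "_" with "-", drops characters other than letters, digits and "-",
    truncates to `max_len` characters, then strips any trailing "-" left by the cut.
    """
    name = raw.replace("_", "-")
    name = re.sub(r"[^A-Za-z0-9-]", "", name)
    return name[:max_len].rstrip("-")


# Example names built the same way as in the cell above (the prefix is made up)
print(safe_resource_name("reco1a2b_mvl-ds-sql", 30))       # Cosmos DB account: under 31 chars
print(safe_resource_name("reco1a2b_mvl_aks_cluster", 16))  # AKS name: at most 16 chars
```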

### 1.1 Import or create the AzureML Workspace
This command checks whether the AzureML Workspace exists and creates it if it does not (`exist_ok=True`).

In [None]:
ws = Workspace.create(name = workspace_name,
                      subscription_id = subscription_id,
                      resource_group = resource_group, 
                      location = workspace_region,
                      exist_ok=True)

# persist the subscription id, resource group name, and workspace name in aml_config/config.json.
ws.write_config(ws_config_path)

### 1.2 Create a Cosmos DB resource to store recommendation results:

In [None]:
# Explicitly pass subscription_id in case the user has multiple subscriptions
client = get_client_from_cli_profile(azure.mgmt.cosmosdb.CosmosDB,
                                    subscription_id=subscription_id)

async_cosmosdb_create = client.database_accounts.create_or_update(
    resource_group,
    account_name,
    {
        'location': location,
        'locations': [{
            'location_name': location
        }]
    }
)
account = async_cosmosdb_create.result()

my_keys = client.database_accounts.list_keys(
    resource_group,
    account_name
)

master_key = my_keys.primary_master_key
endpoint = "https://" + account_name + ".documents.azure.com:443/"

# Cosmos DB data-plane client (reuses the name `client` from the management client above)
client = document_client.DocumentClient(endpoint, {'masterKey': master_key})

# Create the database if it does not exist yet, otherwise read it
if not find_database(client, DOCUMENTDB_DATABASE):
    db = client.CreateDatabase({'id': DOCUMENTDB_DATABASE})
else:
    db = read_database(client, DOCUMENTDB_DATABASE)

# Collection options: provisioned throughput in request units per second (RU/s)
options = {
    'offerThroughput': 11000
}

# Create the collection, partitioned on /id (the user ID), if it does not exist yet
collection_definition = {'id': DOCUMENTDB_COLLECTION, 'partitionKey': {'paths': ['/id'], 'kind': 'Hash'}}
if not find_collection(client, DOCUMENTDB_DATABASE, DOCUMENTDB_COLLECTION):
    collection = client.CreateCollection(db['_self'], collection_definition, options)
else:
    collection = read_collection(client, DOCUMENTDB_DATABASE, DOCUMENTDB_COLLECTION)

In [None]:
secrets = {
  "Endpoint": endpoint,
  "Masterkey": master_key,
  "Database": DOCUMENTDB_DATABASE,
  "Collection": DOCUMENTDB_COLLECTION,
  "Upsert": "true"
}
with open(secrets_path, "w") as file:
    json.dump(secrets, file)

## 2 Training

Next, we train an [Alternating Least Squares model](https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html) using the [MovieLens](https://grouplens.org/datasets/movielens/) dataset.

In [None]:
# top k items to recommend
TOP_K = 10

# Select Movielens data size: 100k, 1m, 10m, or 20m
MOVIELENS_DATA_SIZE = '100k'

### 2.1 Download the MovieLens dataset

In [None]:
# Note: The DataFrame-based API for ALS currently only supports integers for user and item ids.
schema = StructType(
    (
        StructField("UserId", IntegerType()),
        StructField("MovieId", IntegerType()),
        StructField("Rating", FloatType()),
        StructField("Timestamp", LongType()),
    )
)

data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema, dbutils=dbutils)
data.show()

### 2.2 Split the data into train, test
There are several ways of splitting the data: random, chronological, stratified, etc., each of which favors a different real-world evaluation use case. We will split randomly in this example – for more details on which splitter to choose, consult [this guide](https://github.com/Microsoft/Recommenders/blob/master/notebooks/01_data/data_split.ipynb).

In [None]:
train, test = spark_random_split(data, ratio=0.75, seed=42)
print ("N train", train.cache().count())
print ("N test", test.cache().count())
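A random split shuffles the rating rows and assigns roughly `ratio` of them to training. The plain-Python sketch below shows the idea with an exact cut after a seeded shuffle; `random_split` is a hypothetical helper, not the `reco_utils` implementation. Spark's `DataFrame.randomSplit`, which `spark_random_split` presumably builds on, samples rows independently, so its split sizes are only approximately 75/25.

```python
import random
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def random_split(rows: Sequence[T], ratio: float = 0.75, seed: int = 42) -> Tuple[List[T], List[T]]:
    """Shuffle a copy of `rows` with a fixed seed and cut it at `ratio`.

    The result is reproducible for a given seed, and every row lands in exactly
    one of the two lists.
    """
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    cut = int(round(len(shuffled) * ratio))
    return shuffled[:cut], shuffled[cut:]


# Toy (user, movie, rating) triples
ratings = [(u, i, float((u * i) % 5 + 1)) for u in range(1, 5) for i in range(1, 6)]
train_rows, test_rows = random_split(ratings, ratio=0.75, seed=42)
print(len(train_rows), len(test_rows))
```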

### 2.3 Train the ALS model on the training data, and get the top-k recommendations for our testing data

To predict movie ratings, we use the ratings in the training set as explicit feedback from users. The hyperparameters used to estimate the model are set based on [this page](http://mymedialite.net/examples/datasets.html).

In practice, you would usually explore the hyperparameters and choose an optimal set based on some criterion, such as a ranking metric on a validation set. For details on this process, see the [hyperparameter tuning deep dive](../04_model_select_and_optimize/hypertune_spark_deep_dive.ipynb).

In [None]:
header = {
    "userCol": "UserId",
    "itemCol": "MovieId",
    "ratingCol": "Rating",
}


als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    alpha=0.1,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=True,
    **header
)

In [None]:
model = als.fit(train)
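To make the `rank`, `maxIter` and `regParam` settings concrete, here is a minimal NumPy sketch of explicit-feedback ALS on a small dense matrix. It alternates between solving a ridge regression for every user with item factors fixed and for every item with user factors fixed and, following the Spark MLlib collaborative filtering guide, scales the regularization by the number of ratings in each solve. It is an illustration, not Spark's implementation: it ignores `nonnegative`, `implicitPrefs`/`alpha`, `coldStartStrategy` and the distributed block structure, and `als_explicit` is a hypothetical name.

```python
import numpy as np


def als_explicit(R, mask, rank=10, reg=0.05, n_iter=15, seed=0):
    """Explicit-feedback ALS on a small dense rating matrix.

    R    : (n_users, n_items) array of ratings; entries where `mask` is False are ignored.
    mask : boolean array of the same shape marking observed ratings.
    Returns user factors U (n_users, rank) and item factors V (n_items, rank),
    so predicted ratings are U @ V.T.
    """
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    eye = np.eye(rank)
    for _ in range(n_iter):
        # Item factors fixed: one ridge regression per user over that user's ratings
        for u in range(n_users):
            idx = mask[u]
            if idx.any():
                Vu = V[idx]
                A = Vu.T @ Vu + reg * idx.sum() * eye  # reg scaled by rating count
                U[u] = np.linalg.solve(A, Vu.T @ R[u, idx])
        # User factors fixed: one ridge regression per item over that item's ratings
        for i in range(n_items):
            idx = mask[:, i]
            if idx.any():
                Ui = U[idx]
                A = Ui.T @ Ui + reg * idx.sum() * eye
                V[i] = np.linalg.solve(A, Ui.T @ R[idx, i])
    return U, V


# Toy matrix: 0 marks a missing rating
R = np.array([
    [5, 3, 0, 1],
    [4, 0, 0, 1],
    [1, 1, 0, 5],
    [1, 0, 0, 4],
    [0, 1, 5, 4],
], dtype=float)
mask = R > 0
U, V = als_explicit(R, mask, rank=2, reg=0.05, n_iter=15)
pred = U @ V.T
train_rmse = np.sqrt(np.mean((pred[mask] - R[mask]) ** 2))
print("training RMSE:", round(float(train_rmse), 3))
```

Because each half-step exactly minimizes the regularized squared error over one factor matrix, running more iterations never makes the training objective worse; `maxIter` simply caps the work.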

In the movie recommendation use case, recommending movies that a user has already rated makes little sense. Therefore, the rated movies are removed from the recommended items.

To achieve this, we score every movie for every user and then remove the user-movie pairs that exist in the training dataset.

In [None]:
# Get the cross join of all user-item pairs and score them.
users = train.select('UserId').distinct()
items = train.select('MovieId').distinct()
user_item = users.crossJoin(items)
dfs_pred = model.transform(user_item)

In [None]:
dfs_pred.show()

In [None]:
# Remove seen items.
dfs_pred_exclude_train = dfs_pred.alias("pred").join(
    train.alias("train"),
    (dfs_pred['UserId'] == train['UserId']) & (dfs_pred['MovieId'] == train['MovieId']),
    how='outer'
)

top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train["train.Rating"].isNull()) \
    .select('pred.' + 'UserId', 'pred.' + 'MovieId', 'pred.' + "prediction")

top_all.show()
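Conceptually, the join above plus the top-k cut that the ranking evaluator applies amount to: drop every (user, movie) pair seen in training, then keep each user's highest-scoring remaining movies. The plain-Python sketch below shows that logic on a handful of made-up scores; `top_k_unseen` is a hypothetical helper.

```python
import heapq
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def top_k_unseen(
    scores: Iterable[Tuple[int, int, float]],
    seen: Iterable[Tuple[int, int]],
    k: int = 10,
) -> Dict[int, List[int]]:
    """Return each user's k highest-scoring items, skipping (user, item) pairs in `seen`."""
    seen_pairs = set(seen)
    candidates: Dict[int, List[Tuple[float, int]]] = defaultdict(list)
    for user, item, score in scores:
        if (user, item) not in seen_pairs:  # same role as the null filter above
            candidates[user].append((score, item))
    # heapq.nlargest orders by score (ties broken by the larger item id)
    return {user: [item for _, item in heapq.nlargest(k, pairs)] for user, pairs in candidates.items()}


# Made-up (user, movie, predicted score) triples and training pairs
scores = [(1, 10, 4.5), (1, 11, 3.9), (1, 12, 4.8), (2, 10, 2.0), (2, 12, 4.1)]
seen = [(1, 12), (2, 10)]
print(top_k_unseen(scores, seen, k=2))
```

In PySpark, the exclusion step can also be written as a left anti-join, `dfs_pred.join(train, ["UserId", "MovieId"], "left_anti")`, which keeps only prediction rows with no matching training row and avoids the null filter.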

### 2.4 Evaluate how well ALS performs

Evaluate model performance using metrics such as Precision@K, Recall@K, [MAP](https://en.wikipedia.org/wiki/Evaluation_measures_\(information_retrieval\)) or [nDCG](https://en.wikipedia.org/wiki/Discounted_cumulative_gain). For a full guide on what metrics to evaluate your recommender with, consult [this guide](https://github.com/Microsoft/Recommenders/blob/master/notebooks/03_evaluate/evaluation.ipynb).

In [None]:
test.show()

In [None]:
rank_eval = SparkRankingEvaluation(test, top_all, k = TOP_K, col_user="UserId", col_item="MovieId", 
                                    col_rating="Rating", col_prediction="prediction", 
                                    relevancy_method="top_k")

In [None]:
# Evaluate Ranking Metrics

print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')
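For reference, the per-user definitions behind these ranking metrics fit in a few lines. The sketch assumes binary relevance (a movie in the user's test set counts as relevant regardless of its rating) and the common log2 position discount for nDCG; the evaluator reports averages over users, and its exact conventions may differ in edge cases. The function names are hypothetical.

```python
import math
from typing import List, Set


def precision_at_k(recommended: List[int], relevant: Set[int], k: int) -> float:
    # Share of the k recommendation slots filled by relevant items
    return sum(1 for item in recommended[:k] if item in relevant) / k


def recall_at_k(recommended: List[int], relevant: Set[int], k: int) -> float:
    # Share of the relevant items that appear in the top k
    if not relevant:
        return 0.0
    return sum(1 for item in recommended[:k] if item in relevant) / len(relevant)


def ndcg_at_k(recommended: List[int], relevant: Set[int], k: int) -> float:
    # Binary-relevance nDCG: a hit at 0-based position i contributes 1 / log2(i + 2),
    # normalized by the best achievable score (all relevant items ranked first)
    dcg = sum(1.0 / math.log2(i + 2) for i, item in enumerate(recommended[:k]) if item in relevant)
    ideal = sum(1.0 / math.log2(i + 2) for i in range(min(len(relevant), k)))
    return dcg / ideal if ideal > 0 else 0.0


recommended = [3, 1, 7, 5]  # ranked recommendations for one user (made up)
relevant = {1, 5, 9}        # that user's movies in the test set (made up)
print(precision_at_k(recommended, relevant, 4))  # 2 hits in 4 slots
print(recall_at_k(recommended, relevant, 4))     # 2 of 3 relevant items found
print(ndcg_at_k(recommended, relevant, 4))
```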

In [None]:
# Evaluate Rating Metrics

prediction = model.transform(test)
rating_eval = SparkRatingEvaluation(test, prediction, col_user="UserId", col_item="MovieId", 
                                    col_rating="Rating", col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%.2f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')
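RMSE and MAE compare predicted and actual ratings on the test pairs. A plain-Python sketch of both follows (hypothetical helper names). Note that with `coldStartStrategy='drop'`, `model.transform(test)` drops test rows whose user or movie was not seen in training, so these metrics only cover pairs the model can score.

```python
import math
from typing import Sequence


def rmse(actual: Sequence[float], predicted: Sequence[float]) -> float:
    # Root mean squared error: squares each miss, so large errors weigh more
    if len(actual) != len(predicted) or not actual:
        raise ValueError("need two non-empty sequences of equal length")
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))


def mae(actual: Sequence[float], predicted: Sequence[float]) -> float:
    # Mean absolute error: average size of a miss, in rating units
    if len(actual) != len(predicted) or not actual:
        raise ValueError("need two non-empty sequences of equal length")
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


actual = [4.0, 3.0, 5.0, 2.0]     # made-up test ratings
predicted = [3.5, 3.0, 4.0, 3.0]  # made-up model predictions
print(rmse(actual, predicted), mae(actual, predicted))
```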

### 2.5 Save the model

In [None]:
model.write().overwrite().save(model_name)
model_local = "file:" + os.getcwd() + "/" + model_name
dbutils.fs.cp(model_name, model_local, True)

## 3. Operationalize the Recommender Service
Once the model performs well enough, we operationalize it as a REST endpoint that a real-time service can call. We use [Azure Cosmos DB](https://azure.microsoft.com/en-us/services/cosmos-db/), [Azure Machine Learning Service](https://azure.microsoft.com/en-us/services/machine-learning-service/), and [Azure Kubernetes Service](https://docs.microsoft.com/en-us/azure/aks/intro-kubernetes) to operationalize the recommender service.

### 3.1 Create a look-up for Recommendations in Cosmos DB

First, we store each user's top 10 recommendations, as predicted by the model, in a Cosmos DB lookup table. At request time, the service simply returns these precomputed recommendations:

In [None]:
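# Write each user's top-k recommendations to Cosmos DB, keyed by the user ID.
# Note: recommendForAllUsers scores all items, so these lists can include movies
# the user already rated in the training data.
with open(secrets_path) as json_data:
    writeConfig = json.load(json_data)
    recs = model.recommendForAllUsers(TOP_K)
    recs.withColumn("id", recs[userCol].cast("string")).select("id", "recommendations." + itemCol)\
        .write.format("com.microsoft.azure.cosmosdb.spark").mode('overwrite').options(**writeConfig).save()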
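Each row written by the cell above has an `id` column (the user ID cast to a string) and a `MovieId` column holding the user's ranked movie IDs, since selecting `recommendations.MovieId` from an array of structs yields an array. Assuming the connector stores each row as one JSON document, the stored shape can be sketched in plain Python as below; `to_cosmos_documents` is a hypothetical helper, and Cosmos DB adds its own system properties such as `_rid` and `_ts` on write.

```python
import json
from typing import Dict, List


def to_cosmos_documents(recs: Dict[int, List[int]], item_col: str = "MovieId") -> List[dict]:
    """Hypothetical helper: one lookup document per user.

    `id` is the user ID as a string (the collection is partitioned on /id), and
    the item list keeps the model's ranking order.
    """
    return [{"id": str(user), item_col: list(items)} for user, items in recs.items()]


example_docs = to_cosmos_documents({496: [50, 172, 181]})  # made-up recommendations
print(json.dumps(example_docs[0]))
```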

### 3.2 Configure Azure Machine Learning

Next, Azure Machine Learning Service is used to create a model scoring image and deploy it to Azure Kubernetes Service as a scalable containerized service. This requires a **scoring script** and an **environment config** file; the following cells create both.

In the scoring script, we call Cosmos DB to look up the top 10 movies to recommend for a given input user ID:

In [None]:
score_sparkml = """

import json

def init(local=False):
    global client, collection
    try:
        # Connect to the Cosmos DB collection that holds the precomputed recommendations
        import pydocumentdb.document_client as document_client

        MASTER_KEY = '{key}'
        HOST = '{endpoint}'
        DATABASE_ID = "{database}"
        COLLECTION_ID = "{collection}"
        database_link = 'dbs/' + DATABASE_ID
        collection_link = database_link + '/colls/' + COLLECTION_ID

        client = document_client.DocumentClient(HOST, {'masterKey': MASTER_KEY})
        collection = client.ReadCollection(collection_link=collection_link)
    except Exception as e:
        # Keep the exception so that init() failures can be inspected
        collection = e

def run(input_json):
    try:
        # The body is a JSON list whose first element is a JSON-encoded object with an "id" field
        user_id = json.loads(json.loads(input_json)[0])['id']

        # Point read of the user's document; the collection is partitioned on /id
        options = {'partitionKey': str(user_id)}
        document_link = 'dbs/{DOCUMENTDB_DATABASE}/colls/{DOCUMENTDB_COLLECTION}/docs/{0}'.format(user_id)
        result = client.ReadDocument(document_link, options)
    except Exception as e:
        # Return the error message instead of failing the request
        result = str(e)
    return json.dumps(str(result))
"""


# Fill the placeholders in the scoring script with the Cosmos DB connection details
with open(secrets_path) as json_data:
    writeConfig = json.load(json_data)

score_sparkml = (score_sparkml
                 .replace("{key}", writeConfig['Masterkey'])
                 .replace("{endpoint}", writeConfig['Endpoint'])
                 .replace("{database}", writeConfig['Database'])
                 .replace("{collection}", writeConfig['Collection'])
                 .replace("{DOCUMENTDB_DATABASE}", DOCUMENTDB_DATABASE)
                 .replace("{DOCUMENTDB_COLLECTION}", DOCUMENTDB_COLLECTION))

# Execute the script in this notebook as a quick check that it is valid Python
exec(score_sparkml)

with open("score_sparkml.py", "w") as file:
    file.write(score_sparkml)
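Because `run` depends on a live Cosmos DB client, a quick way to check its request parsing and error path before building the image is to exercise the same logic against a stub. The sketch below copies the body of `run` into `run_with_client`, which takes the client as a parameter; `FakeDocumentClient` and `run_with_client` are hypothetical names, and the stub imitates only the `ReadDocument(document_link, options)` call the script uses. The database and collection defaults match the names set earlier in this notebook.

```python
import json


class FakeDocumentClient:
    """Hypothetical stand-in for pydocumentdb's DocumentClient, backed by a dict."""

    def __init__(self, docs):
        self._docs = docs  # maps document link -> document

    def ReadDocument(self, document_link, options=None):
        # Only the call used by the scoring script is imitated
        if document_link not in self._docs:
            raise LookupError("Resource Not Found: " + document_link)
        return self._docs[document_link]


def run_with_client(client, input_json,
                    database="recommendations", collection="user_recommendations_als"):
    """Same request handling as `run` in the scoring script, with the client passed in."""
    try:
        user_id = json.loads(json.loads(input_json)[0])["id"]
        options = {"partitionKey": str(user_id)}
        document_link = "dbs/{}/colls/{}/docs/{}".format(database, collection, user_id)
        result = client.ReadDocument(document_link, options)
    except Exception as e:
        result = str(e)
    return json.dumps(str(result))


fake_docs = {
    "dbs/recommendations/colls/user_recommendations_als/docs/496": {"id": "496", "MovieId": [50, 172]},
}
fake_client = FakeDocumentClient(fake_docs)
print(run_with_client(fake_client, '["{\\"id\\":\\"496\\"}"]'))  # known user
print(run_with_client(fake_client, '["{\\"id\\":\\"1\\"}"]'))    # missing user: error text
```

The first call shows a property of the script worth knowing: `run` returns `json.dumps(str(result))`, so the response body is a JSON string containing Python's `repr` of the document (single quotes), not a JSON object. Clients need to parse it accordingly, or the script could return `json.dumps(result)` instead.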

Next, create an environment config file with the dependencies needed:

In [None]:
%%writefile myenv_sparkml.yml

name: myenv
channels:
  - defaults
dependencies:
  - pip:
    - numpy==1.14.2
    - scikit-learn==0.19.1
    - pandas
    - azureml-core
    - pydocumentdb

Register your model:

In [None]:
mymodel = Model.register(model_path=model_name,  # local copy of the saved Spark model
                         model_name=model_name,  # name to register the model under (same as the path here)
                         description="ADB trained model",
                         workspace=ws)

print(mymodel.name, mymodel.description, mymodel.version)

### 3.3 Deploy the model as a Service on AKS

#### 3.3.1 Create a container for your model service:

In [None]:
# Create Image for Web Service
models = [mymodel]
runtime = "spark-py"
conda_file = 'myenv_sparkml.yml'
driver_file = "score_sparkml.py"

# image creation
from azureml.core.image import ContainerImage
myimage_config = ContainerImage.image_configuration(execution_script = driver_file, 
                                    runtime = runtime, 
                                    conda_file = conda_file)

image = ContainerImage.create(name = container_image_name,
                                # this is the model object
                                models = [mymodel],
                                image_config = myimage_config,
                                workspace = ws)

# Wait for the create process to complete
image.wait_for_creation(show_output = True)

#### 3.3.2 Create an AKS Cluster to run your container (this may take 20-25 minutes):

In [None]:
from azureml.core.compute import AksCompute, ComputeTarget

# Use the default configuration (can also provide parameters to customize)
prov_config = AksCompute.provisioning_configuration()

# Create the cluster
aks_target = ComputeTarget.create(workspace = ws, 
                                  name = aks_name, 
                                  provisioning_configuration = prov_config)

aks_target.wait_for_completion(show_output = True)

print(aks_target.provisioning_state)
print(aks_target.provisioning_errors)

#### 3.3.3 Deploy the container image to AKS:

In [None]:
#Set the web service configuration (using default here with app insights)
aks_config = AksWebservice.deploy_configuration(enable_app_insights=True)

# Create the web service from the container image built above
try:
    aks_service = Webservice.deploy_from_image(
        workspace=ws,
        name=service_name,
        deployment_config=aks_config,
        image=image,
        deployment_target=aks_target
    )
    aks_service.wait_for_deployment(show_output=True)
except Exception as e:
    # Deployment failed, e.g. because a service with this name already exists:
    # report the error and reuse the existing service with the same name
    print("Deployment failed:", e)
    aks_service = Webservice(ws, name=service_name)

### 3.4 Call the AKS model service
After the deployment, the service can be called with a user ID – the service will then look up the top 10 recommendations for that user in Cosmos DB and send back the results.
The following script demonstrates how to call the recommendation service API and view the result for the given user ID:

In [None]:
scoring_url = aks_service.scoring_uri
service_key = aks_service.get_keys()[0]

input_data = '["{\\"id\\":\\"496\\"}"]'.encode()

req = urllib.request.Request(scoring_url,data=input_data)
req.add_header("Authorization","Bearer {}".format(service_key))
req.add_header("Content-Type","application/json")

tic = time.time()
with urllib.request.urlopen(req) as result:
    res = result.readlines()
    print(res)
    
toc = time.time()
t2 = toc - tic
print("Full run took %.2f seconds" % (toc - tic))
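Hand-escaping the request body, as in `input_data` above, is easy to get wrong. A hedged alternative is to build the double-encoded body with `json.dumps`, which produces a payload that `run` parses the same way. The sketch only builds the request; `build_scoring_request` is a hypothetical helper, and the URL and key in the example are placeholders.

```python
import json
import urllib.request


def build_scoring_request(scoring_url: str, service_key: str, user_id: str) -> urllib.request.Request:
    """Build (but do not send) a POST request in the format `run` expects.

    The body is a JSON list holding one JSON-encoded object with an "id" field,
    matching the two json.loads calls in the scoring script.
    """
    body = json.dumps([json.dumps({"id": str(user_id)})]).encode("utf-8")
    return urllib.request.Request(
        scoring_url,
        data=body,
        headers={
            "Authorization": "Bearer {}".format(service_key),
            "Content-Type": "application/json",
        },
    )


req_example = build_scoring_request("http://example.invalid/score", "<service-key>", "496")
print(req_example.get_method(), req_example.data)
```

To send it, pass the request to `urllib.request.urlopen` with the real `aks_service.scoring_uri` and key, as in the cell above.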