## <span style="color:#ff5f27">👨🏻‍🏫 Create Deployment </span>

In this notebook, you'll create a deployment for your recommendation system.

**NOTE: The transformer scripts are not implemented yet.**

## <span style="color:#ff5f27">📝 Imports </span>

In [1]:
# !pip install -r requirements.txt

In [1]:
import os

## <span style="color:#ff5f27">🔮 Connect to Hopsworks Feature Store </span>

In [2]:
import hopsworks

project = hopsworks.login()

# Connect to Hopsworks Model Registry
mr = project.get_model_registry()

dataset_api = project.get_dataset_api()


Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/17565
Connected. Call `.close()` to terminate connection gracefully.


## <span style="color:#ff5f27">🚀 Ranking Model Deployment </span>


You start by deploying your ranking model. Since it is a CatBoost model, you need to implement a `Predict` class that tells Hopsworks how to load the model and how to use it.

In [3]:
ranking_model = mr.get_best_model(
    name="ranking_model", 
    metric="fscore", 
    direction="max",
)
ranking_model

Model(name: 'ranking_model', version: 1)

In [4]:
%%writefile ranking_transformer.py

import os
import pandas as pd

import hopsworks

import logging


class Transformer(object):
    
    def __init__(self):
        # Connect to Hopsworks
        project = hopsworks.connection().get_project()
        self.fs = project.get_feature_store()
        
        # Retrieve the 'videos' feature view
        self.videos_fv = self.fs.get_feature_view(
            name="videos", 
            version=1,
        )
        
        # Get list of feature names for videos
        self.video_features = [feat.name for feat in self.videos_fv.schema]
        
        # Retrieve the 'users' feature view
        self.users_fv = self.fs.get_feature_view(
            name="users", 
            version=1,
        )

        # Retrieve the 'candidate_embeddings' feature view
        self.candidate_index = self.fs.get_feature_view(
            name="candidate_embeddings", 
            version=1,
        )

        # Retrieve ranking model
        mr = project.get_model_registry()
        model = mr.get_model(
            name="ranking_model", 
            version=1,
        )
        
        # Extract input schema from the model
        input_schema = model.model_schema["input_schema"]["columnar_schema"]
        
        # Get the names of features expected by the ranking model
        self.ranking_model_feature_names = [feat["name"] for feat in input_schema]
            
    def preprocess(self, inputs):
        # Extract the input instance
        inputs = inputs["instances"][0]

        # Extract user_id from inputs
        user_id = inputs["user_id"]
        
        # Search for candidate items
        neighbors = self.candidate_index.find_neighbors(
            inputs["query_emb"], 
            k=100,
        )
        neighbors = [neighbor[0] for neighbor in neighbors]
        
        # Get IDs of videos the user has already seen
        already_seen_videos_ids = self.fs.sql(
            f"SELECT video_id from interactions_1 WHERE user_id = '{user_id}'"
        ).values.reshape(-1).tolist()
        
        # Filter candidates to exclude videos the user has already seen
        video_id_list = [
            video_id
            for video_id 
            in neighbors 
            if video_id
            not in already_seen_videos_ids
        ]
        video_id_df = pd.DataFrame({"video_id" : video_id_list})
        
        # Retrieve video features for candidate items
        videos_data = [
            self.videos_fv.get_feature_vector({"video_id": video_id}) 
            for video_id 
            in video_id_list
        ]

        videos_df = pd.DataFrame(
            data=videos_data, 
            columns=self.video_features,
        )
        
        # Join candidate items with their features
        ranking_model_inputs = video_id_df.merge(
            videos_df, 
            on="video_id", 
            how="inner",
        )        
        
        # Add user features
        user_features = self.users_fv.get_feature_vector(
            {"user_id": user_id}, 
            return_type="pandas",
        )
        
        # user_id comes from the request itself, not from the 'age' column
        ranking_model_inputs["user_id"] = user_id
        ranking_model_inputs["gender"] = user_features["gender"].values[0] 
        ranking_model_inputs["age"] = user_features["age"].values[0] 
        ranking_model_inputs["country"] = user_features["country"].values[0] 
        
        # Select only the features required by the ranking model
        ranking_model_inputs = ranking_model_inputs[self.ranking_model_feature_names]
                
        return { 
            "inputs" : [{"ranking_features": ranking_model_inputs.values.tolist(), "video_ids": video_id_list}]
        }

    def postprocess(self, outputs):
        # Extract predictions from the outputs
        preds = outputs["predictions"]
        
        # Merge prediction scores and corresponding video IDs into a list of tuples
        ranking = list(zip(preds["scores"], preds["video_ids"]))
        
        # Sort the ranking list by score in descending order
        ranking.sort(reverse=True)
        
        # Return the sorted ranking list
        return { 
            "ranking": ranking,
        }

Overwriting ranking_transformer.py
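
The filtering and ranking steps in the transformer can be exercised locally before deployment. The following is a minimal sketch in plain Python, assuming made-up video IDs and scores; `filter_seen` and `rank` are hypothetical helper names that mirror the logic in `preprocess` and `postprocess` and are not part of the Hopsworks API.

```python
def filter_seen(candidates, seen):
    """Drop candidates the user has already seen, preserving order."""
    seen = set(seen)
    return [video_id for video_id in candidates if video_id not in seen]


def rank(scores, video_ids):
    """Pair each score with its video ID and sort by score, highest first."""
    return sorted(zip(scores, video_ids), reverse=True)


# Made-up data for illustration only
candidates = ["v1", "v2", "v3", "v4"]
already_seen = ["v2"]

video_ids = filter_seen(candidates, already_seen)  # "v2" is removed
scores = [0.2, 0.9, 0.5]                           # one score per remaining video
ranking = rank(scores, video_ids)
print(ranking)
```

Because the tuples are `(score, video_id)`, sorting in reverse order puts the highest-scoring video first, which is the shape `postprocess` returns under the `"ranking"` key.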


In [5]:
# Copy transformer file into Hopsworks File System 
uploaded_file_path = dataset_api.upload(
    "ranking_transformer.py",    # File name to be uploaded
    "Resources",                 # Destination directory in Hopsworks File System 
    overwrite=True,              # Overwrite the file if it already exists
) 

# Construct the path to the uploaded transformer script
transformer_script_path = os.path.join(
    "/Projects",                 # Root directory for projects in Hopsworks
    project.name,                # Name of the current project
    uploaded_file_path,          # Path to the uploaded file within the project
)

Uploading: 100.000%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 4244/4244 elapsed<00:01 remaining<00:00


In [6]:
%%writefile ranking_predictor.py

import os
import joblib
import numpy as np

import logging

class Predict(object):
    
    def __init__(self):
        self.model = joblib.load(os.environ["ARTIFACT_FILES_PATH"] + "/ranking_model.pkl")

    def predict(self, inputs):
        # Extract ranking features and video IDs from the inputs
        features = inputs[0].pop("ranking_features")
        video_ids = inputs[0].pop("video_ids")
        
        # Log the extracted features
        logging.info("predict -> " + str(features))

        # Predict probabilities for the positive class
        scores = self.model.predict_proba(features).tolist()
        
        # Get scores of positive class
        scores = np.asarray(scores)[:,1].tolist() 

        # Return the predicted scores along with the corresponding video IDs
        return {
            "scores": scores, 
            "video_ids": video_ids,
        }

Overwriting ranking_predictor.py
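
To see the payload shape the predictor expects and returns, the same logic can be run against a stand-in model. This is a rough sketch under stated assumptions: `StubModel` is a hypothetical replacement for the CatBoost classifier, its probabilities are invented, and the standalone `predict` function only mirrors the method in `ranking_predictor.py`.

```python
class StubModel:
    """Hypothetical stand-in for the trained classifier (not CatBoost)."""

    def predict_proba(self, features):
        # Fake probabilities: the positive-class score is the first feature,
        # clipped to the range [0, 1]
        rows = []
        for row in features:
            p = min(max(row[0], 0.0), 1.0)
            rows.append([1.0 - p, p])
        return rows


def predict(model, inputs):
    """Mirror of Predict.predict: score candidates and keep their IDs."""
    features = inputs[0]["ranking_features"]
    video_ids = inputs[0]["video_ids"]

    # Keep only the positive-class probability (second column)
    scores = [row[1] for row in model.predict_proba(features)]
    return {"scores": scores, "video_ids": video_ids}


# Same structure the transformer's preprocess step produces
payload = [{"ranking_features": [[0.25, 1], [0.75, 0]], "video_ids": ["a", "b"]}]
result = predict(StubModel(), payload)
print(result)
```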


In [7]:
# Upload predictor file to Hopsworks
uploaded_file_path = dataset_api.upload(
    "ranking_predictor.py", 
    "Resources", 
    overwrite=True,
)

# Construct the path to the uploaded script
predictor_script_path = os.path.join(
    "/Projects", 
    project.name, 
    uploaded_file_path,
)

Uploading: 100.000%|████████████████████████████████████████████████████████████████████████████████████████████████████████████| 891/891 elapsed<00:01 remaining<00:00


With that in place, you can finally deploy your model.

In [10]:
from hsml.transformer import Transformer

ranking_deployment_name = "rankingdeployment"

# Define transformer
ranking_transformer=Transformer(
    script_file=transformer_script_path, 
    resources={"num_instances": 0},
)

# Deploy ranking model
ranking_deployment = ranking_model.deploy(
    name=ranking_deployment_name,
    description="Deployment that searches for video candidates and scores them based on user metadata",
    script_file=predictor_script_path,
    resources={"num_instances": 0},
    transformer=ranking_transformer,
)

Deployment created, explore it at https://c.app.hopsworks.ai:443/p/17565/deployments/240641
Before making predictions, start the deployment by using `.start()`


In [11]:
# Start the deployment
ranking_deployment.start()

Deployment is ready: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████| 6/6 [00:31<00:00,  5.23s/it]

Start making predictions by using `.predict()`





In [None]:
# Check logs in case of failure
# ranking_deployment.get_logs(component="predictor", tail=200)

In [12]:
def get_top_recommendations(ranked_candidates, k=3):
    return [candidate[-1] for candidate in ranked_candidates['ranking'][:k]]
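
As an illustration of what this helper returns, here is a small sketch that calls it on a mocked response. The scores and video IDs are invented, and the response shape is assumed to match what the transformer's `postprocess` step returns.

```python
def get_top_recommendations(ranked_candidates, k=3):
    # Each candidate is a (score, video_id) pair; keep only the video ID
    return [candidate[-1] for candidate in ranked_candidates["ranking"][:k]]


# Mocked response, already sorted by score in descending order
mock_response = {
    "ranking": [[0.91, "v7"], [0.64, "v2"], [0.33, "v9"], [0.10, "v4"]],
}

top = get_top_recommendations(mock_response, k=2)
print(top)
```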

In [13]:
# Define a test input example
test_ranking_input = {"instances": [{
    "user_id": "NT203T",
    "query_emb": [0.214135289,
     0.571055949,
     0.330709577,
     -0.225899458,
     -0.308674961,
     -0.0115124583,
     0.0730511621,
     -0.495835781,
     0.625569344,
     -0.0438038409,
     0.263472944,
     -0.58485353,
     -0.307070434,
     0.0414443575,
     -0.321789205,
     0.966559],
}]}

In [14]:
# Test ranking deployment
ranked_candidates = ranking_deployment.predict(test_ranking_input)

# Retrieve video IDs of the top recommended items
recommendations = get_top_recommendations(ranked_candidates, k=3)
recommendations

RestAPIError: Metadata operation error: (url: http://acfdd5a4a839249e8bb85d8b9651a20b-928877002.us-east-2.elb.amazonaws.com/v1/models/rankingdeployment:predict). Server response: 
HTTP code: 500, HTTP reason: Internal Server Error, body: b'{"error":"HTTPError : HTTP 500: Feature \'user_id\' is missing from vector.Possible reasons: 1. There is no match in the given entry. Please check if the entry exists in the online feature store or provide the feature as passed_feature. 2. Required entries [user_id] or [user_id] are not provided."}', error code: , error msg: , user msg: 

 Check the model server logs by using `.get_logs()`
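
A failed request like the one above reaches the client as an exception. If you want the notebook to keep running so you can inspect the failure, one option is a small wrapper around `.predict()`. This is a minimal sketch, not part of the Hopsworks API: `safe_predict` and `FailingDeployment` are hypothetical names, and the stub only imitates the error message shown above.

```python
def safe_predict(deployment, payload):
    """Return (result, error); exactly one of the two is None."""
    try:
        return deployment.predict(payload), None
    except Exception as exc:  # the real client raises RestAPIError
        return None, exc


class FailingDeployment:
    """Hypothetical stub that imitates the failing deployment."""

    def predict(self, payload):
        raise RuntimeError("HTTP 500: Feature 'user_id' is missing from vector.")


result, error = safe_predict(
    FailingDeployment(),
    {"instances": [{"user_id": "NT203T"}]},
)

if error is not None:
    print(f"Prediction failed: {error}")
```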

In [None]:
# Check logs in case of failure
# ranking_deployment.get_logs(component="transformer",tail=200)

---

## <span style="color:#ff5f27">🚀 Query Model Deployment </span>

Next, you'll deploy your query model.

In [15]:
# Retrieve the 'query_model' from the Model Registry
query_model = mr.get_model(
    name="query_model",
    version=1,
)

In [16]:
%%writefile querymodel_transformer.py

import os
import numpy as np
import pandas as pd
from datetime import datetime

import hopsworks

import logging


class Transformer(object):
    
    def __init__(self):            
        # Connect to Hopsworks
        project = hopsworks.connection().get_project()
        ms = project.get_model_serving()
    
        # Retrieve the 'users' feature view
        fs = project.get_feature_store()
        self.users_fv = fs.get_feature_view(
            name="users", 
            version=1,
        )
        # Retrieve the ranking deployment 
        self.ranking_server = ms.get_deployment("rankingdeployment")
        
        
    def preprocess(self, inputs):
        # Check if the input data contains a key named "instances"
        # and extract the actual data if present
        inputs = inputs["instances"] if "instances" in inputs else inputs

        # Extract user_id from the inputs
        user_id = inputs["user_id"]

        # Get user features
        user_features = self.users_fv.get_feature_vector(
            {"user_id": user_id}, 
            return_type="pandas",
        )

        # Enrich inputs with user gender, age, and country
        inputs["gender"] = user_features['gender'].values[0]
        inputs["age"] = user_features['age'].values[0] 
        inputs["country"] = user_features['country'].values[0]
        
        return {
            "instances" : [inputs]
        }
    
    def postprocess(self, outputs):
        # Return ordered ranking predictions
        return {
            "predictions": self.ranking_server.predict({ "instances": outputs["predictions"]}),
        }

Writing querymodel_transformer.py
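
The `preprocess` step accepts the request with or without the `"instances"` wrapper and returns a single enriched instance. The sketch below reproduces that behaviour with a plain dictionary in place of the feature view lookup; `unwrap_and_enrich` is a hypothetical helper and the feature values are invented.

```python
def unwrap_and_enrich(inputs, user_features):
    """Unwrap the request, add user features, and re-wrap as one instance."""
    # Accept both {"instances": {...}} and a bare {...} request
    inputs = inputs["instances"] if "instances" in inputs else inputs

    # Copy so the caller's request is left untouched
    enriched = dict(inputs)
    for key in ("gender", "age", "country"):
        enriched[key] = user_features[key]

    return {"instances": [enriched]}


# Made-up values standing in for the 'users' feature view lookup
mock_user_features = {"gender": "F", "age": 31, "country": "SE"}

request = {"instances": {"user_id": "LW988J"}}
payload = unwrap_and_enrich(request, mock_user_features)
print(payload)
```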


In [17]:
# Copy transformer file into Hopsworks File System
uploaded_file_path = dataset_api.upload(
    "querymodel_transformer.py", 
    "Models", 
    overwrite=True,
)

# Construct the path to the uploaded script
transformer_script_path = os.path.join(
    "/Projects", 
    project.name, 
    uploaded_file_path,
)

Uploading: 100.000%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 1620/1620 elapsed<00:01 remaining<00:00


In [19]:
from hsml.transformer import Transformer

query_model_deployment_name = "querydeployment"

# Define transformer
query_model_transformer=Transformer(
    script_file=transformer_script_path, 
    resources={"num_instances": 0},
)

# Deploy the query model
query_model_deployment = query_model.deploy(
    name=query_model_deployment_name,
    description="Deployment that generates query embeddings from user and video features using the query model",
    resources={"num_instances": 0},
    transformer=query_model_transformer,
)

Deployment created, explore it at https://c.app.hopsworks.ai:443/p/17565/deployments/240642
Before making predictions, start the deployment by using `.start()`


At this point, you have registered your deployment. To start it, run:

In [24]:
# Start the deployment
query_model_deployment.start()

Deployment is already running


In [None]:
# Check logs in case of failure
# query_model_deployment.get_logs(component="transformer", tail=20)

In [22]:
# Define a test input example
data = {
    "instances": {
        "user_id": "LW988J",
    }
}

# Test the deployment
ranked_candidates = query_model_deployment.predict(data)

# Retrieve video IDs of the top recommended items
recommendations = get_top_recommendations(
    ranked_candidates['predictions'], 
    k=3,
)
recommendations

RestAPIError: Metadata operation error: (url: http://acfdd5a4a839249e8bb85d8b9651a20b-928877002.us-east-2.elb.amazonaws.com/v1/models/querydeployment:predict). Server response: 
HTTP code: 500, HTTP reason: Internal Server Error, body: b'{"error":"HTTPError : HTTP 500: Feature \'user_id\' is missing from vector.Possible reasons: 1. There is no match in the given entry. Please check if the entry exists in the online feature store or provide the feature as passed_feature. 2. Required entries [user_id] or [user_id] are not provided."}', error code: , error msg: , user msg: 

 Check the model server logs by using `.get_logs()`

In [25]:
query_model_deployment.get_logs()

Explore all the logs and filters in the Kibana logs at https://c.app.hopsworks.ai:443/p/17565/deployments/240642



RestAPIError: Metadata operation error: (url: https://c.app.hopsworks.ai/hopsworks-api/api/project/17565/serving/240642/logs). Server response: 
HTTP code: 404, HTTP reason: Not Found, body: b'{"errorCode":240027,"errorMsg":"Server logs not available"}', error code: 240027, error msg: Server logs not available, user msg: 

In [None]:
# Check logs in case of failure
# query_model_deployment.get_logs(component="transformer",tail=200)

Stop the deployment when you're not using it.

In [None]:
# Stop the ranking model deployment
ranking_deployment.stop()

# Stop the query model deployment
query_model_deployment.stop()

---