<img src="https://developer.download.nvidia.com/notebooks/dlsw-notebooks/merlin_merlin_01-building-recommender-systems-with-merlin/nvidia_logo.png" style="width: 90px; float: right;"> 

# Building Online Multi-Stage Recsys Components
The figure below represents a **four-stage recommender system**. This is more complex process than only training a single model and deploying it, and it is much more realistic and closer to what's happening in the real-world recommender production systems. The models and data to perform the bottom row of tasks were previously completed in the first notebook [here](...).

![img](https://raw.githubusercontent.com/RedisVentures/Redis-Recsys/master/assets/OnlineMultiStageRecsys.png)

In this notebook, we are going to prepare the assets on the bottom row to deploy a four-stage recommender system on [Triton Inference Server](https://github.com/triton-inference-server/server). 

To learn more about the four-stage recommender systems, you can listen to Even Oldridge's [Moving Beyond Recommender Models talk](https://www.youtube.com/watch?v=5qjiY-kLwFY&list=PL65MqKWg6XcrdN4TJV0K1PdLhF_Uq-b43&index=7) at KDD'21 and read more [in this blog post](https://eugeneyan.com/writing/system-design-for-discovery/).

In addition to NVIDIA Merlin libraries and the Triton Inference Server client library, we use two external libraries in these series of examples:

- [Feast](https://docs.feast.dev/): an end-to-end open source feature store library for machine learning
- [Redis](https://github.com/redis/redis-py): a low-latency key-value store and ANN index

## Steps
1) [**Feature Store Setup**](#Feature-Store-Setup)
2) [**Redis ANN Index Setup**](#Redis-ANN-Index-Setup)

### Import required libraries and functions

*These notebooks are developed and tested using `merlin-tensorflow:22.11` container on [NVIDIA's docker registry](https://catalog.ngc.nvidia.com/containers?filters=&orderBy=dateModifiedDESC&query=merlin).*

In [23]:
import warnings
warnings.filterwarnings("ignore")

import os
import feast
import merlin.models.tf as mm
import nvtabular as nvt
import numpy as np
import tensorflow as tf


from merlin.datasets.ecommerce import transform_aliccp
from merlin.schema.tags import Tags
from merlin.io.dataset import Dataset
from nvtabular.ops import *

# for running this example on CPU, comment out the line below
# os.environ["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"

First, we define our input path and feature repo path.

In [2]:
# Define output path for data
DATA_DIR = "/model-data/aliccp"
BASE_DIR = "/workdir"

Next, we need to load the previously trained assets. If you have your own great, make sure they end up in the same folder structure as the ones we will pull from the publically hosted S3 bucket below

## Feature Store Setup

We need to create a Feast feature repository. [Feast](https://feast.dev/) is an end-to-end open source feature store for machine learning. Feast (Feature Store) is a customizable operational data system that re-uses existing infrastructure to manage and serve machine learning features to real-time models.

Our feature repo will live at the defined path below:

In [3]:
# Feature repo path
feature_repo_path = os.path.join(BASE_DIR, "feature_repo")

### Prepare User and Item features

In [4]:
from merlin.models.utils.dataset import unique_rows_by_features

# Load pre-generated User features file
user_features = Dataset(os.path.join(DATA_DIR, "user_features.parquet")).to_ddf().compute()
user_features.head()

Unnamed: 0,user_id,user_shops,user_profile,user_group,user_gender,user_age,user_consumption_2,user_is_occupied,user_geography,user_intentions,user_brands,user_categories,user_id_raw
0,1,1,1,1,1,1,1,1,1,1,1,1,7
1,2,2,1,1,1,1,1,1,1,2,2,2,8
2,3,3,1,1,1,1,1,1,1,3,3,3,6
3,4,4,1,1,1,1,1,1,1,4,4,4,9
4,5,5,1,1,1,1,1,1,1,5,5,5,5


We will artificially add `datetime` and `created` timestamp columns to our user_features dataframe. This required by Feast to track the user-item features and their creation time and to determine which version to use when we query Feast.

In [5]:
from datetime import datetime

user_features["datetime"] = datetime.now()
user_features["datetime"] = user_features["datetime"].astype("datetime64[ns]")
user_features["created"] = datetime.now()
user_features["created"] = user_features["created"].astype("datetime64[ns]")
user_features.head()

Unnamed: 0,user_id,user_shops,user_profile,user_group,user_gender,user_age,user_consumption_2,user_is_occupied,user_geography,user_intentions,user_brands,user_categories,user_id_raw,datetime,created
0,1,1,1,1,1,1,1,1,1,1,1,1,7,2023-02-01 03:14:13.397906,2023-02-01 03:14:13.399280
1,2,2,1,1,1,1,1,1,1,2,2,2,8,2023-02-01 03:14:13.397906,2023-02-01 03:14:13.399280
2,3,3,1,1,1,1,1,1,1,3,3,3,6,2023-02-01 03:14:13.397906,2023-02-01 03:14:13.399280
3,4,4,1,1,1,1,1,1,1,4,4,4,9,2023-02-01 03:14:13.397906,2023-02-01 03:14:13.399280
4,5,5,1,1,1,1,1,1,1,5,5,5,5,2023-02-01 03:14:13.397906,2023-02-01 03:14:13.399280


In [6]:
# Write parquet file to feature_repo
user_features.to_parquet(
    os.path.join(feature_repo_path, "data", "user_features.parquet")
)

In [7]:
# Load pre-generated Item features file
item_features = Dataset(os.path.join(DATA_DIR, "item_features.parquet")).to_ddf().compute()
item_features.head()

Unnamed: 0,item_id,item_category,item_shop,item_brand,item_id_raw
0,1,1,1,1,7
1,2,2,2,2,6
2,3,3,3,3,8
3,4,4,4,4,9
4,5,5,5,5,5


In [8]:
# Append timestamps
item_features["datetime"] = datetime.now()
item_features["datetime"] = item_features["datetime"].astype("datetime64[ns]")
item_features["created"] = datetime.now()
item_features["created"] = item_features["created"].astype("datetime64[ns]")
item_features.head()

Unnamed: 0,item_id,item_category,item_shop,item_brand,item_id_raw,datetime,created
0,1,1,1,1,7,2023-02-01 03:14:13.446295,2023-02-01 03:14:13.447238
1,2,2,2,2,6,2023-02-01 03:14:13.446295,2023-02-01 03:14:13.447238
2,3,3,3,3,8,2023-02-01 03:14:13.446295,2023-02-01 03:14:13.447238
3,4,4,4,4,9,2023-02-01 03:14:13.446295,2023-02-01 03:14:13.447238
4,5,5,5,5,5,2023-02-01 03:14:13.446295,2023-02-01 03:14:13.447238


In [9]:
# Write parquet file to feature_repo
item_features.to_parquet(
    os.path.join(feature_repo_path, "data", "item_features.parquet")
)

### Register features
The Feast feature registry is a central catalog of all the feature definitions and their related metadata (read more [here](https://docs.feast.dev/getting-started/architecture-and-components/registry)). We have defined our user and item features definitions in the `user_features.py` and  `item_features.py` files.

With `FeatureView()` users can register data sources in their organizations into Feast, and then use those data sources for both training and online inference. In the `user_features.py` and `item_features.py` files, we are telling Feast where to find user and item features.

Before we move on to the next steps, we need to perform `feast apply`command as directed below.  With that, we register our features, we can apply the changes to create our feature registry and store all entity and feature view definitions in a local Redis database or a cloud deployed Redis if you're using the ``cloud-deployment`` folder.

> NOTE: Be sure to have updated the feature_store.yaml with the correct Redis address if deploying in the cloud

In [10]:
%cd $feature_repo_path
!feast apply

/workdir/feature_repo
[1m[94mNo changes to registry
Deploying infrastructure for [1m[32muser_features[0m
Deploying infrastructure for [1m[32mitem_features[0m


### Materialize (Load) features from Parquet into Redis

After we execute `apply` and registered our features and created our online local store, now we need to perform [materialization](https://docs.feast.dev/how-to-guides/running-feast-in-production) operation. This is done to keep our online store up to date and get it ready for prediction. For that we need to run a job that loads feature data from our feature view sources into our online store. As we add new features to our offline stores, we can continuously materialize them to keep our online store up to date by finding the latest feature values for each user. 

When you run the `feast materialize ..` command below, you will see a message <i>Materializing 2 feature views from 1995-01-01 01:01:01+00:00 to 2025-01-01 01:01:01+00:00 into the sqlite online store </i>  will be printed out.

Note that materialization step takes some time.. 

In [11]:
!feast materialize 1995-01-01T01:01:01 2025-01-01T01:01:01

Materializing [1m[32m2[0m feature views from [1m[32m1995-01-01 01:01:01+00:00[0m to [1m[32m2025-01-01 01:01:01+00:00[0m into the [1m[32mredis[0m online store.

[1m[32muser_features[0m:
100%|███████████████████████████████████████████████████████████| 654/654 [00:00<00:00, 4083.26it/s]
[1m[32mitem_features[0m:
100%|██████████████████████████████████████████████████████████| 658/658 [00:00<00:00, 11407.27it/s]


In [12]:
feature_store = feast.FeatureStore(feature_repo_path)
feature_store.get_feature_view("user_features").features

[user_shops-Int32,
 user_profile-Int32,
 user_group-Int32,
 user_gender-Int32,
 user_age-Int32,
 user_consumption_2-Int32,
 user_is_occupied-Int32,
 user_geography-Int32,
 user_intentions-Int32,
 user_brands-Int32,
 user_categories-Int32,
 user_id-Int32]

In [13]:
%%timeit
feature_store.get_online_features(
    features=["user_features:user_id", "user_features:user_age"],
    entity_rows=[{"user_id_raw": 1}]
).to_df()

# Fast feature retrieval from Redis!

3.95 ms ± 12.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


### Explore Feature Repo Structure

In [14]:
import seedir as sd

sd.seedir(
    feature_repo_path,
    style="lines",
    itemlimit=10,
    depthlimit=3,
    exclude_folders=[".ipynb_checkpoints", "__pycache__"],
    sort=True,
)

feature_repo/
├─data/
│ ├─.gitkeep
│ ├─item_features.parquet
│ ├─registry.db
│ └─user_features.parquet
├─feature_store.yaml
├─item_features.py
└─user_features.py


## Redis ANN Index Setup

### Load Item Embeddings
We will load the pre-generated Item embeddings from file in preparation for loading into the Redis Server.

In [15]:
item_embeddings = Dataset(os.path.join(DATA_DIR, "item_embeddings.parquet")).to_ddf().compute()
item_embeddings.head()

Unnamed: 0,item_id,0,1,2,3,4,5,6,7,8,...,54,55,56,57,58,59,60,61,62,63
0,1,-0.034885,-0.000131,0.018455,0.03743,0.026332,0.012729,0.00676,0.069112,0.044133,...,-0.027153,-0.02995,-0.02007,-0.067773,0.00242,-0.001353,-0.055582,0.042481,0.013875,0.021228
1,2,0.021357,-0.026375,0.06909,-0.011445,0.025277,-0.010337,0.008437,0.042574,0.060663,...,-0.03727,-0.039209,0.013558,-0.006484,-0.029601,0.073999,0.009857,-0.022534,-0.00944,-0.025069
2,3,-0.018197,0.017502,0.002263,0.008534,0.015912,0.00636,-0.00166,0.007613,0.054932,...,-0.045789,0.033707,-0.025606,-0.020231,0.068983,0.030158,-0.054312,-0.006741,0.026637,-0.040934
3,4,-0.018756,-0.057435,0.027142,0.069214,-0.014137,0.063484,0.049648,-0.000459,0.04144,...,-0.050948,-0.007804,0.001069,-0.059237,-0.018273,-0.005572,-0.017192,0.033178,0.05067,0.040354
4,5,0.044985,0.015847,-0.041081,-0.00662,-0.003196,-0.04521,-0.031615,-0.093638,0.007464,...,-0.014779,0.057923,-0.015743,-0.048929,0.000438,-0.043618,-0.137103,-0.01958,0.025585,0.028937


In [16]:
import asyncio
import redis.asyncio as redis
from redis.commands.search.query import Query
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.field import VectorField

# Connect to the Redis client
host, port = os.environ["FEATURE_STORE_ADDRESS"].split(":")
redis_conn = redis.Redis(host=host, port=port)

In [17]:
# Define Redis ANN Index Params and Fields
INDEX_NAME = "candidate_index"
VECTOR_FIELD_NAME = "item_embedding"

vector_field = VectorField(
    VECTOR_FIELD_NAME,
    "HNSW", {
        "TYPE": "FLOAT32",
        "DIM": 64,
        "DISTANCE_METRIC": "IP",
        "INITIAL_CAP": len(item_embeddings),
    }
)

# Create ANN Index
await redis_conn.ft(INDEX_NAME).create_index(
    fields = [vector_field],
    definition= IndexDefinition(prefix=["ITEM:"], index_type=IndexType.HASH)
)

b'OK'

In [18]:
# Function to write item embeddings to Redis
async def write_item_embeddings(embs, n: int, redis_conn: redis.Redis):
    semaphore = asyncio.Semaphore(n)
    async def write(row):
        async with semaphore:
            item_id = int(row.pop("item_id"))
            entry = {
                "item_id": item_id,
                VECTOR_FIELD_NAME: np.array(row.values, dtype=np.float32).tobytes()
            }
            await redis_conn.hset(f"ITEM:{item_id}", mapping=entry)
    asyncio.gather(*[write(row[1]) for row in embs.iterrows()])

In [19]:
# Write embeddings to Redis ANN Index created above
await write_item_embeddings(item_embeddings, 100, redis_conn)

In [20]:
# Verify Index Construction
await redis_conn.ft(INDEX_NAME).info()

{'index_name': 'candidate_index',
 'index_options': [],
 'index_definition': [b'key_type',
  b'HASH',
  b'prefixes',
  [b'ITEM:'],
  b'default_score',
  b'1'],
 'attributes': [[b'identifier',
   b'item_embedding',
   b'attribute',
   b'item_embedding',
   b'type',
   b'VECTOR']],
 'num_docs': '100',
 'max_doc_id': '100',
 'num_terms': '0',
 'num_records': '100',
 'inverted_sz_mb': '0',
 'vector_index_sz_mb': '0.28116989135742188',
 'total_inverted_index_blocks': '0',
 'offset_vectors_sz_mb': '0',
 'doc_table_size_mb': '0.006954193115234375',
 'sortable_values_size_mb': '0',
 'key_table_size_mb': '0.0027933120727539062',
 'records_per_doc_avg': '1',
 'bytes_per_record_avg': '0',
 'offsets_per_term_avg': '0',
 'offset_bits_per_record_avg': '-nan',
 'hash_indexing_failures': '0',
 'total_indexing_time': '1.905',
 'indexing': '0',
 'percent_indexed': '1',
 'number_of_uses': 1,
 'gc_stats': [b'bytes_collected',
  b'0',
  b'total_ms_run',
  b'0',
  b'total_cycles',
  b'0',
  b'average_cycle_

In [21]:
# Fetch an Item ID
item_ids = [key for key in await redis_conn.keys() if b"ITEM:" in key]
item_id = item_ids[0]

# Fetch a testing input vector
test_vector = await redis_conn.hget(item_id.decode("utf"), VECTOR_FIELD_NAME)

# Create a Redis VSS Query
query = Query(f"*=>[KNN 10 @{VECTOR_FIELD_NAME} $vec_param AS vector_score]")\
    .sort_by("vector_score")\
    .return_fields("id", "vector_score")\
    .dialect(2)

# Search for KNN
k_nearest_neighbors = await redis_conn.ft(INDEX_NAME).search(query, query_params={"vec_param": test_vector})

In [22]:
# Inspect results
k_nearest_neighbors.docs

[Document {'id': 'ITEM:78', 'payload': None, 'vector_score': '0.918215274811'},
 Document {'id': 'ITEM:51', 'payload': None, 'vector_score': '0.925500392914'},
 Document {'id': 'ITEM:86', 'payload': None, 'vector_score': '0.92994427681'},
 Document {'id': 'ITEM:98', 'payload': None, 'vector_score': '0.933368206024'},
 Document {'id': 'ITEM:175', 'payload': None, 'vector_score': '0.935036063194'},
 Document {'id': 'ITEM:103', 'payload': None, 'vector_score': '0.940324306488'},
 Document {'id': 'ITEM:79', 'payload': None, 'vector_score': '0.942311763763'},
 Document {'id': 'ITEM:64', 'payload': None, 'vector_score': '0.944746255875'},
 Document {'id': 'ITEM:276', 'payload': None, 'vector_score': '0.945177614689'},
 Document {'id': 'ITEM:163', 'payload': None, 'vector_score': '0.945328950882'}]

### Next Steps
In this notebook we created our Feast Feature Store and setup the Redis ANN Index. Next, we will deploy our trained models into [Triton Inference Server (TIS)](https://github.com/triton-inference-server/server).

For the next step, move on to the [`02-Deploying-Online-Multi-Stage-Recsys-with-Triton.ipynb`](./02-Deploying-Online-Multi-Stage-Redsys-with-Triton.ipynb) notebook to deploy our saved models as an ensemble to TIS and obtain prediction results for a given request.