In [1]:
# Copyright 2023 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ================================

# Each user is responsible for checking the content of datasets and the
# applicable licenses and determining if suitable for the intended use.

<img src="https://developer.download.nvidia.com/notebooks/dlsw-notebooks/merlin_merlin_01-building-recommender-systems-with-merlin/nvidia_logo.png" style="width: 90px; float: right;"> 

## Building Intelligent Recommender Systems with Merlin

This notebook is created using the latest stable [merlin-tensorflow](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-tensorflow/tags) container. 

### Overview

Recommender Systems (RecSys) are the engine of the modern internet and the catalyst for human decisions. Building a recommendation system is challenging because it requires multiple stages (data preprocessing, offline training, item retrieval, filtering, ranking, ordering, etc.) to work together seamlessly and efficiently. The biggest challenges for new practitioners are the lack of understanding around what RecSys look like in the real world, and the gap between examples of simple models and a production-ready end-to-end recommender systems.

The figure below represents a four-stage recommender systems. This is a more complex process than only training a single model and deploying it, and it is much more realistic and closer to what's happening in the real-world recommender production systems.

![fourstage](../images/fourstages.png)

In these series of notebooks, we are going to showcase how we can deploy a four-stage recommender systems using Merlin Systems library easily on [Triton Inference Server](https://github.com/triton-inference-server/server). Let's go over the concepts in the figure briefly. 
- **Retrieval:** This is the step to narrow down millions of items into thousands of candidates. We are going to train a Two-Tower item retrieval model to retrieve the relevant top-K candidate items.
- **Filtering:** This step is to exclude the already interacted  or undesirable items from the candidate items set or to apply business logic rules. Although this is an important step, for this example we skip this step.
- **Scoring:** This is also known as ranking. Here the retrieved and filtered candidate items are being scored. We are going to train a ranking model to be able to use at our scoring step. 
- **Ordering:** At this stage, we can order the final set of items that we want to recommend to the user. Here, we’re able to align the output of the model with business needs, constraints, or criteria.

To learn more about the four-stage recommender systems, you can listen to Even Oldridge's [Moving Beyond Recommender Models talk](https://www.youtube.com/watch?v=5qjiY-kLwFY&list=PL65MqKWg6XcrdN4TJV0K1PdLhF_Uq-b43&index=7) at KDD'21 and read more [in this blog post](https://eugeneyan.com/writing/system-design-for-discovery/).

### Learning objectives
- Understanding four stages of recommender systems
- Training retrieval and ranking models with Merlin Models
- Setting up feature store and approximate nearest neighbours (ANN) search libraries
- Deploying trained models to Triton Inference Server with Merlin Systems

In addition to NVIDIA Merlin libraries and the Triton Inference Server client library, we use two external libraries in these series of examples:

- [Feast](https://docs.feast.dev/): an end-to-end open source feature store library for machine learning
- [Faiss](https://github.com/facebookresearch/faiss): a library for efficient similarity search and clustering of dense vectors

You can find more information about `Feast feature store` and `Faiss` libraries in the next notebook.

### Import required libraries and functions

**Compatibility:**

This notebook is developed and tested using the latest `merlin-tensorflow` container from the NVIDIA NGC catalog. To find the tag for the most recently-released container, refer to the [Merlin TensorFlow](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-tensorflow) page.

In [2]:
!pip install "feast==0.31"

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.2[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [3]:
# CHOOSE AN OPTION - GPU or CPU:
# ------------------------------
# for running this example on GPU, uncomment the following lines
# !pip install faiss-gpu
# import os
# os.environ["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"
# ------------------------------
# for running this example on CPU, uncomment the following lines
# ATTENTION: The installation of "tensorflow-cpu==2.12.0" will overwrite the installation of tensorflow, which ruin the abillity for Triton Inference Server to serve the models.
#            So to serve the models, you would need to use a new container of `merlin-tensorflow` that has the original "tensorflow" package.
!pip install "tensorflow-cpu==2.12.0" faiss-cpu
!pip uninstall --yes cudf horovod
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"  # Disable GPU usage

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.2[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Found existing installation: cudf 23.4.0
Can't uninstall 'cudf'. No files were found to uninstall.
Found existing installation: horovod 0.28.0+nv23.6
Can't uninstall 'horovod'. No files were found to uninstall.
[0m

In [4]:
import os
import nvtabular as nvt
from nvtabular.ops import Rename, Filter, Dropna, LambdaOp, Categorify, \
    TagAsUserFeatures, TagAsUserID, TagAsItemFeatures, TagAsItemID, AddMetadata

from merlin.schema.tags import Tags
from merlin.dag.ops.subgraph import Subgraph
import merlin.models.tf as mm
from merlin.io.dataset import Dataset
from merlin.datasets.ecommerce import transform_aliccp
import tensorflow as tf
import numpy as np
import pandas as pd
import shutil

2025-06-28 18:04:46.404586: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.




  warn(f"PyTorch dtype mappings did not load successfully due to an error: {exc.msg}")
  from .autonotebook import tqdm as notebook_tqdm


In [5]:
print("GPUs:", tf.config.list_physical_devices("GPU"))

GPUs: []


In [6]:
# disable INFO and DEBUG logging everywhere
import logging

logging.disable(logging.WARNING)

In this example notebook, we will generate the synthetic train and test datasets mimicking the real [Ali-CCP: Alibaba Click and Conversion Prediction](https://tianchi.aliyun.com/dataset/dataDetail?dataId=408#1) dataset to build our recommender system models.

First, we define our input path and feature repo path.

In [7]:
# set up the base dir for feature store
BASE_DIR = os.environ.get("BASE_DIR", "/Merlin/examples/Building-and-deploying-multi-stage-RecSys")
DATA_FOLDER = os.path.join(BASE_DIR, os.environ.get("DATA_FOLDER", "workspace/data"))

Then, we use `generate_data` utility function to generate synthetic dataset. 

In [8]:
from merlin.datasets.synthetic import generate_data

NUM_ROWS = os.environ.get("NUM_ROWS", 100_000)
train_raw, valid_raw = generate_data("aliccp-raw", int(NUM_ROWS), set_sizes=(0.7, 0.3))



In [9]:
train_raw.schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.freq_threshold,properties.max_size,properties.start_index,properties.cat_path,properties.embedding_sizes.cardinality,properties.embedding_sizes.dimension,properties.domain.min,properties.domain.max,properties.domain.name
0,user_id,"(Tags.CATEGORICAL, Tags.ID, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.user_id.parquet,294736.0,512.0,0,294736,user_id
1,item_id,"(Tags.CATEGORICAL, Tags.ITEM, Tags.ID)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.item_id.parquet,3078306.0,512.0,0,3078306,item_id
2,item_category,"(Tags.CATEGORICAL, Tags.ITEM)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.item_category.parquet,8581.0,255.0,0,8581,item_category
3,item_shop,"(Tags.CATEGORICAL, Tags.ITEM)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.item_shop.parquet,604498.0,512.0,0,604498,item_shop
4,item_brand,"(Tags.CATEGORICAL, Tags.ITEM)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.item_brand.parquet,208179.0,512.0,0,208179,item_brand
5,item_intention,"(Tags.CATEGORICAL, Tags.ITEM)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.item_intention.parquet,96258.0,512.0,0,96258,item_intention
6,user_shops,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.user_shops.parquet,116741.0,512.0,0,116741,user_shops
7,user_profile,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.user_profile.parquet,98.0,21.0,0,98,user_profile
8,user_group,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.user_group.parquet,14.0,16.0,0,14,user_group
9,user_gender,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.user_gender.parquet,3.0,16.0,0,3,user_gender


See `Schema` [documentation](https://nvidia-merlin.github.io/core/v0.2.0/api/merlin.schema.html)

In [10]:
from nvtabular import ColumnSchema, Schema
from merlin.schema import Tags

# ── any custom tags you used ──────────────────────────────────────────────
USER_ITEM = "user_item"        # appears on several “user_item_*” columns

# ── build the schema column‑by‑column ─────────────────────────────────────
schema = Schema([
    # --- ID columns -------------------------------------------------------
    ColumnSchema("user_id", dtype="int32", tags=[Tags.ID, Tags.USER, Tags.CATEGORICAL]),
    ColumnSchema("item_id", dtype="int32", tags=[Tags.ID, Tags.ITEM, Tags.CATEGORICAL]),

    # --- item features ----------------------------------------------------
    ColumnSchema("item_category",  dtype="int32", tags=[Tags.ITEM, Tags.CATEGORICAL]),
    ColumnSchema("item_shop",      dtype="int32", tags=[Tags.ITEM, Tags.CATEGORICAL]),
    ColumnSchema("item_brand",     dtype="int32", tags=[Tags.ITEM, Tags.CATEGORICAL]),
    ColumnSchema("item_intention", dtype="int32", tags=[Tags.ITEM, Tags.CATEGORICAL]),

    # --- user features ----------------------------------------------------
    ColumnSchema("user_shops",          dtype="int32", tags=[Tags.USER, Tags.CATEGORICAL]),
    ColumnSchema("user_profile",        dtype="int32", tags=[Tags.USER, Tags.CATEGORICAL]),
    ColumnSchema("user_group",          dtype="int32", tags=[Tags.USER, Tags.CATEGORICAL]),
    ColumnSchema("user_gender",         dtype="int32", tags=[Tags.USER, Tags.CATEGORICAL]),
    ColumnSchema("user_age",            dtype="int32", tags=[Tags.USER, Tags.CATEGORICAL]),
    ColumnSchema("user_consumption_1",  dtype="int32", tags=[Tags.USER, Tags.CATEGORICAL]),
    ColumnSchema("user_consumption_2",  dtype="int32", tags=[Tags.USER, Tags.CATEGORICAL]),
    ColumnSchema("user_is_occupied",    dtype="int32", tags=[Tags.USER, Tags.CATEGORICAL]),
    ColumnSchema("user_geography",      dtype="int32", tags=[Tags.USER, Tags.CATEGORICAL]),
    ColumnSchema("user_intentions",     dtype="int32", tags=[Tags.USER, Tags.CATEGORICAL]),
    ColumnSchema("user_brands",         dtype="int32", tags=[Tags.USER, Tags.CATEGORICAL]),
    ColumnSchema("user_categories",     dtype="int32", tags=[Tags.USER, Tags.CATEGORICAL]),

    # --- user‑item interaction features (custom tag) ----------------------
    ColumnSchema("user_item_categories",  dtype="int32", tags=[USER_ITEM, Tags.CATEGORICAL]),
    ColumnSchema("user_item_shops",       dtype="int32", tags=[USER_ITEM, Tags.CATEGORICAL]),
    ColumnSchema("user_item_brands",      dtype="int32", tags=[USER_ITEM, Tags.CATEGORICAL]),
    ColumnSchema("user_item_intentions",  dtype="int32", tags=[USER_ITEM, Tags.CATEGORICAL]),

    # --- context feature --------------------------------------------------
    ColumnSchema("position", dtype="int32", tags=[Tags.CONTEXT, Tags.CATEGORICAL]),

    # --- labels / targets -------------------------------------------------
    ColumnSchema("click",      dtype="int64"),   # no tags => plain label
    ColumnSchema("conversion", dtype="int64"),
])

Add `user_history_item_ids` column, which stores the `item_id`s that a user had "already-seen".

In [11]:
# import numpy as np

# def ensure_history_column(df, col="user_history_item_ids", dtype=np.int32):
#     """Make sure *col* exists and every row contains a NumPy‐backed list."""
#     if col not in df.columns:
#         # Column was never created → start everyone with an empty list
#         df[col] = [np.asarray([1, 2, 3], dtype=dtype) for _ in range(len(df))]
#     else:
#         # Column exists → only touch the rows that aren’t already lists
#         df[col] = [
#             np.asarray(v, dtype=dtype) if isinstance(v, list)        # keep real lists
#             else np.asarray([], dtype=dtype)                         # NaN / None / scalar → []
#             for v in df[col]
#         ]
#     return df


# # Update the schema
# schema["user_history_item_ids"] = ColumnSchema("user_history_item_ids", dtype="int32", is_list=True, tags=[Tags.USER, Tags.CATEGORICAL])

# # Convert from NVTabular datasets back to DataFrame, so the data could be manipulated.
# train_df = ensure_history_column(train_raw.compute())
# valid_df = ensure_history_column(valid_raw.compute())

# # wrap as NVTabular datasets (safe now), and load Schema
# train_raw = Dataset(train_df, schema=schema)
# valid_raw = Dataset(valid_df, schema=schema)

In [12]:
train_raw.schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.freq_threshold,properties.max_size,properties.start_index,properties.cat_path,properties.embedding_sizes.cardinality,properties.embedding_sizes.dimension,properties.domain.min,properties.domain.max,properties.domain.name
0,user_id,"(Tags.CATEGORICAL, Tags.ID, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.user_id.parquet,294736.0,512.0,0,294736,user_id
1,item_id,"(Tags.CATEGORICAL, Tags.ITEM, Tags.ID)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.item_id.parquet,3078306.0,512.0,0,3078306,item_id
2,item_category,"(Tags.CATEGORICAL, Tags.ITEM)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.item_category.parquet,8581.0,255.0,0,8581,item_category
3,item_shop,"(Tags.CATEGORICAL, Tags.ITEM)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.item_shop.parquet,604498.0,512.0,0,604498,item_shop
4,item_brand,"(Tags.CATEGORICAL, Tags.ITEM)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.item_brand.parquet,208179.0,512.0,0,208179,item_brand
5,item_intention,"(Tags.CATEGORICAL, Tags.ITEM)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.item_intention.parquet,96258.0,512.0,0,96258,item_intention
6,user_shops,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.user_shops.parquet,116741.0,512.0,0,116741,user_shops
7,user_profile,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.user_profile.parquet,98.0,21.0,0,98,user_profile
8,user_group,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.user_group.parquet,14.0,16.0,0,14,user_group
9,user_gender,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,0.0,.//categories/unique.user_gender.parquet,3.0,16.0,0,3,user_gender


In [13]:
train_raw.head()

Unnamed: 0,user_id,user_shops,user_profile,user_group,user_gender,user_age,user_consumption_1,user_consumption_2,user_is_occupied,user_geography,...,item_shop,item_brand,item_intention,user_item_categories,user_item_shops,user_item_brands,user_item_intentions,position,click,conversion
0,20,1992,2,1,1,1,1,1,1,1,...,20685,7124,3294,4526,143833,108178,72284,0,0,0
1,8,734,1,1,1,1,1,1,1,1,...,19615,6756,3124,6562,185989,80686,39008,0,0,1
2,12,1153,1,1,1,1,1,1,1,1,...,1070,369,171,5694,114310,85620,41219,3,1,1
3,256,26723,23,3,1,2,1,1,1,1,...,6420,2211,1023,2751,63275,42093,13467,0,1,1
4,17,1677,2,1,1,1,1,1,1,1,...,2140,737,341,2769,108771,97043,69257,0,1,0


If you would like to use the real ALI-CCP dataset, you can use [get_aliccp()](https://github.com/NVIDIA-Merlin/models/blob/stable/merlin/datasets/ecommerce/aliccp/dataset.py) function instead. This function takes the raw csv files, and generate parquet files that can be directly fed to NVTabular workflow above.

### Set up a feature store with Feast

Before we move onto the next step, we need to create a Feast feature repository. [Feast](https://feast.dev/) is an end-to-end open source feature store for machine learning. Feast (Feature Store) is a customizable operational data system that re-uses existing infrastructure to manage and serve machine learning features to real-time models.

We will create the feature repo in the current working directory, which is `BASE_DIR` for us.

In [14]:
!rm -rf $BASE_DIR/feast_repo
!cd $BASE_DIR && feast init feast_repo

  from distutils.dir_util import copy_tree

Creating a new Feast repository in [1m[32m/Merlin/examples/Building-and-deploying-multi-stage-RecSys/feast_repo[0m.



You should be seeing a message like <i>Creating a new Feast repository in ... </i> printed out above. Now, navigate to the `feature_repo` folder and remove the demo parquet file created by default, and `examples.py` file.

In [15]:
feature_repo_path = os.path.join(BASE_DIR, "feast_repo/feature_repo")
if os.path.exists(f"{feature_repo_path}/example_repo.py"):
    os.remove(f"{feature_repo_path}/example_repo.py")
if os.path.exists(f"{feature_repo_path}/data/driver_stats.parquet"):
    os.remove(f"{feature_repo_path}/data/driver_stats.parquet")

### Exporting user and item features

In [16]:
from merlin.models.utils.dataset import unique_rows_by_features

user_features = (
    unique_rows_by_features(train_raw, Tags.USER, Tags.USER_ID)
    .compute()
    .reset_index(drop=True)
)



We will artificially add `datetime` and `created` timestamp columns to our user_features dataframe. This required by Feast to track the user-item features and their creation time and to determine which version to use when we query Feast.

In [17]:
from datetime import datetime

user_features["datetime"] = datetime.now()
user_features["datetime"] = user_features["datetime"].astype("datetime64[ns]")
user_features["created"] = datetime.now()
user_features["created"] = user_features["created"].astype("datetime64[ns]")

In [18]:
user_features.head()

Unnamed: 0,user_id,user_shops,user_profile,user_group,user_gender,user_age,user_consumption_1,user_consumption_2,user_is_occupied,user_geography,user_intentions,user_brands,user_categories,datetime,created
0,20,1992,2,1,1,1,1,1,1,1,577,990,104,2025-06-28 18:04:52.720971,2025-06-28 18:04:52.722287
1,8,734,1,1,1,1,1,1,1,1,213,365,39,2025-06-28 18:04:52.720971,2025-06-28 18:04:52.722287
2,12,1153,1,1,1,1,1,1,1,1,334,573,61,2025-06-28 18:04:52.720971,2025-06-28 18:04:52.722287
3,256,26723,23,3,1,2,1,1,1,1,7734,13280,1393,2025-06-28 18:04:52.720971,2025-06-28 18:04:52.722287
4,17,1677,2,1,1,1,1,1,1,1,486,834,88,2025-06-28 18:04:52.720971,2025-06-28 18:04:52.722287


In [19]:
# save to disk
if os.path.exists(os.path.join(feature_repo_path, "data", "user_features.parquet")):
    os.remove(os.path.join(feature_repo_path, "data", "user_features.parquet"))
user_features.to_parquet(
    os.path.join(feature_repo_path, "data", "user_features.parquet")
)

In [20]:
item_features = (
    unique_rows_by_features(train_raw, Tags.ITEM, Tags.ITEM_ID)
    .compute()
    .reset_index(drop=True)
)



In [21]:
item_features["datetime"] = datetime.now()
item_features["datetime"] = item_features["datetime"].astype("datetime64[ns]")
item_features["created"] = datetime.now()
item_features["created"] = item_features["created"].astype("datetime64[ns]")

In [22]:
item_features.head()

Unnamed: 0,item_id,item_category,item_shop,item_brand,item_intention,datetime,created
0,59,294,20685,7124,3294,2025-06-28 18:04:53.037481,2025-06-28 18:04:53.038693
1,56,279,19615,6756,3124,2025-06-28 18:04:53.037481,2025-06-28 18:04:53.038693
2,4,16,1070,369,171,2025-06-28 18:04:53.037481,2025-06-28 18:04:53.038693
3,19,92,6420,2211,1023,2025-06-28 18:04:53.037481,2025-06-28 18:04:53.038693
4,7,31,2140,737,341,2025-06-28 18:04:53.037481,2025-06-28 18:04:53.038693


In [23]:
# save to disk
if os.path.exists(os.path.join(feature_repo_path, "data", "item_features.parquet")):
    os.remove(os.path.join(feature_repo_path, "data", "item_features.parquet"))
item_features.to_parquet(
    os.path.join(feature_repo_path, "data", "item_features.parquet")
)

### Feature Engineering with NVTabular

In [24]:
output_path = os.path.join(DATA_FOLDER, "processed_nvt")

In the following NVTabular workflow, notice that we apply the `Dropna()` Operator at the end. We add the Operator to remove rows with missing values in the final DataFrame after the preceding transformations. Although, the synthetic dataset that we generate and use in this notebook does not have null entries, you might have null entries in your `user_id` and `item_id` columns in your own custom dataset. Therefore, while applying `Dropna()` we will not be registering null `user_id_raw` and `item_id_raw` values in the feature store, and will be avoiding potential issues that can occur because of any null entries.

See the possible Feast `dtype`s [documentation](https://github.com/feast-dev/feast/blob/master/docs/specs/offline_store_format.md#type-mappings)

See the possible Merlin `dtype`s [documentation](https://github.com/NVIDIA-Merlin/core/blob/main/merlin/dtypes/aliases.py)

In [25]:
#     """
#     Build a Subgraph automatically from column tags in `schema`.

#     include_tags : list[Tag] – mandatory tags the column must have
#     exclude_tags : list[Tag] – tags that disqualify a column (optional)
#     id_op        : Operator   – pipeline for ID columns   (optional)
#     feat_op      : Operator   – pipeline for feature cols (optional)
#     """

# def subgraph_from_schema(name, schema, include_tags, exclude_tags=None,
#                          id_op=None, feat_op=None, dtype="int32"):
#     exclude_tags = set(exclude_tags or [])

#     def _select(tags_required):
#         names = [
#             c.name
#             for c in schema.column_schemas.values()  # ← FIXED HERE
#             if tags_required.issubset(set(c.tags))
#             and exclude_tags.isdisjoint(set(c.tags))
#         ]
#         return ColumnSelector(names)

#     ids   = _select(set(include_tags) | {Tags.ID})
#     feats = _select(set(include_tags) - {Tags.ID})

#     graph = None
#     if len(ids):   graph =  ids >> (id_op   or Categorify(dtype=dtype))
#     if len(feats): graph = (graph + feats >> (feat_op or Categorify(dtype=dtype))) if graph else feats >> (feat_op or Categorify(dtype=dtype))

#     return Subgraph(name, graph)


In [26]:
from merlin.dag import ColumnSelector

user_id_raw = ["user_id"] >> Rename(postfix='_raw') >> LambdaOp(lambda col: col.astype("int32")) >> TagAsUserFeatures()
item_id_raw = ["item_id"] >> Rename(postfix='_raw') >> LambdaOp(lambda col: col.astype("int32")) >> TagAsItemFeatures()


subgraph_item = Subgraph(
     "item", 
    (["item_id"] >> Categorify(dtype="int32") >> TagAsItemID()) + 
    (["item_category", "item_shop", "item_brand"] >> Categorify(dtype="int32") >> TagAsItemFeatures())
)
subgraph_user = Subgraph(
    "user",
    (["user_id"] >> Categorify(dtype="int32") >> TagAsUserID()) +
    (
        [
            "user_shops",
            "user_profile",
            "user_group",
            "user_gender",
            "user_age",
            "user_consumption_2",
            "user_is_occupied",
            "user_geography",
            "user_intentions",
            "user_brands",
            "user_categories"
        ] >> Categorify(dtype="int32") >> TagAsUserFeatures()
    )
    # +
    # (
    #     ["user_history_item_ids"]
    #     >> Categorify(dtype="int32")   # element type of the encoded list
    #     >> TagAsUserFeatures()
    # )
)

targets = ["click"] >> AddMetadata(tags=[Tags.BINARY_CLASSIFICATION, "target"])
outputs = subgraph_user + subgraph_item + targets

# add dropna op to filter rows with nulls
outputs = outputs >> Dropna()
nvt_wkflow = nvt.Workflow(outputs)

Let's call `transform_aliccp` utility function to be able to perform `fit` and `transform` steps on the raw dataset applying the operators defined in the NVTabular workflow pipeline below, and also save our workflow model. After fit and transform, the processed parquet files are saved to output_path.

In [27]:
print(output_path)

/Merlin/examples/Building-and-deploying-multi-stage-RecSys/workspace/data/processed_nvt


In [28]:
# transform_aliccp(
#     (train_raw, valid_raw), output_path, nvt_workflow=nvt_wkflow, workflow_name="workflow"
# )

if os.path.exists(os.path.join(output_path, "train")):
    shutil.rmtree(os.path.join(output_path, "train"))
nvt_wkflow.fit_transform(train_raw).to_parquet(
    output_path=os.path.join(output_path, "train")
)

if os.path.exists(os.path.join(output_path, "valid")):
    shutil.rmtree(os.path.join(output_path, "valid"))
nvt_wkflow.transform(valid_raw).to_parquet(
    output_path=os.path.join(output_path, "valid")
)



### Training a Retrieval Model with Two-Tower Model

We start with the offline candidate retrieval stage. We are going to train a Two-Tower model for item retrieval. To learn more about the Two-tower model you can visit [05-Retrieval-Model.ipynb](https://github.com/NVIDIA-Merlin/models/blob/stable/examples/05-Retrieval-Model.ipynb).

#### Feature Engineering with NVTabular

We are going to process our raw categorical features by encoding them using `Categorify()` operator and tag the features with `user` or `item` tags in the schema file. To learn more about [NVTabular](https://github.com/NVIDIA-Merlin/NVTabular) and the schema object visit this example [notebook](https://github.com/NVIDIA-Merlin/models/blob/stable/examples/02-Merlin-Models-and-NVTabular-integration.ipynb) in the Merlin Models repo.

Define a new output path to store the filtered datasets and schema files.

In [29]:
output_path2 = os.path.join(DATA_FOLDER, "processed/retrieval")

In [30]:
train_tt = Dataset(os.path.join(output_path, "train", "*.parquet"))
valid_tt = Dataset(os.path.join(output_path, "valid", "*.parquet"))



We select only positive interaction rows where `click==1` in the dataset with `Filter()` operator.

In [32]:
inputs = train_tt.schema.column_names
outputs = inputs >> Filter(f=lambda df: df["click"] == 1)

nvt_wkflow.fit(train_tt)

if os.path.exists(os.path.join(output_path2, "train")):
    shutil.rmtree(os.path.join(output_path2, "train"))
nvt_wkflow.transform(train_tt).to_parquet(
    output_path=os.path.join(output_path2, "train")
)

if os.path.exists(os.path.join(output_path2, "valid")):
    shutil.rmtree(os.path.join(output_path2, "valid"))
nvt_wkflow.transform(valid_tt).to_parquet(
    output_path=os.path.join(output_path2, "valid")
)



NVTabular exported the schema file, `schema.pbtxt` a protobuf text file, of our processed dataset. To learn more about the schema object and schema file you can explore [02-Merlin-Models-and-NVTabular-integration.ipynb](https://github.com/NVIDIA-Merlin/models/blob/stable/examples/02-Merlin-Models-and-NVTabular-integration.ipynb) notebook.

**Read filtered parquet files as Dataset objects.**

In [33]:
train_tt = Dataset(os.path.join(output_path2, "train", "*.parquet"), part_size="500MB")
valid_tt = Dataset(os.path.join(output_path2, "valid", "*.parquet"), part_size="500MB")



In [34]:
schema = train_tt.schema.select_by_tag([Tags.ITEM_ID, Tags.USER_ID, Tags.ITEM, Tags.USER]).without(['click'])
train_tt.schema = schema
valid_tt.schema = schema

In [35]:
model_tt = mm.TwoTowerModel(
    schema,
    query_tower=mm.MLPBlock([128, 64], no_activation_last_layer=True),
    samplers=[mm.InBatchSampler()],
    embedding_options=mm.EmbeddingOptions(infer_embedding_sizes=True),
)

In [36]:
model_tt.compile(
    optimizer="adam",
    run_eagerly=False,
    loss="categorical_crossentropy",
    metrics=[mm.RecallAt(10), mm.NDCGAt(10)],
)
model_tt.fit(train_tt, validation_data=valid_tt, batch_size=1024 * 8, epochs=1)

2025-06-28 18:05:36.550921: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]




2025-06-28 18:05:55.386320: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]




<keras.callbacks.History at 0x7f497c671d80>

### Exporting query (user) model

We export the query tower to use it later during the model deployment stage with Merlin Systems.

In [37]:
query_tower = model_tt.retrieval_block.query_block()
query_tower.save(os.path.join(BASE_DIR, "query_tower"))

### Training a Ranking Model with DLRM

Now we will move onto training an offline ranking model. This ranking model will be used for scoring our retrieved items.

Read processed parquet files. We use the `schema` object to define our model.

In [38]:
# define train and valid dataset objects
train = Dataset(os.path.join(output_path, "train", "*.parquet"), part_size="500MB")
valid = Dataset(os.path.join(output_path, "valid", "*.parquet"), part_size="500MB")

# define schema object
schema = train.schema



In [39]:
target_column = schema.select_by_tag(Tags.TARGET).column_names[0]
target_column

'click'

In [40]:
schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.freq_threshold,properties.max_size,properties.cat_path,properties.embedding_sizes.cardinality,properties.embedding_sizes.dimension,properties.domain.min,properties.domain.max,properties.domain.name
0,user_id,"(Tags.CATEGORICAL, Tags.ID, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,.//categories/unique.user_id.parquet,459.0,50.0,0,458,user_id
1,user_shops,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,.//categories/unique.user_shops.parquet,459.0,50.0,0,458,user_shops
2,user_profile,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,.//categories/unique.user_profile.parquet,69.0,17.0,0,68,user_profile
3,user_group,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,.//categories/unique.user_group.parquet,15.0,16.0,0,14,user_group
4,user_gender,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,.//categories/unique.user_gender.parquet,5.0,16.0,0,4,user_gender
5,user_age,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,.//categories/unique.user_age.parquet,10.0,16.0,0,9,user_age
6,user_consumption_2,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,.//categories/unique.user_consumption_2.parquet,6.0,16.0,0,5,user_consumption_2
7,user_is_occupied,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,.//categories/unique.user_is_occupied.parquet,5.0,16.0,0,4,user_is_occupied
8,user_geography,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,.//categories/unique.user_geography.parquet,7.0,16.0,0,6,user_geography
9,user_intentions,"(Tags.CATEGORICAL, Tags.USER)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,.//categories/unique.user_intentions.parquet,459.0,50.0,0,458,user_intentions


In [41]:
train.to_ddf().compute()

Unnamed: 0,user_id,user_shops,user_profile,user_group,user_gender,user_age,user_consumption_2,user_is_occupied,user_geography,user_intentions,user_brands,user_categories,item_id,item_category,item_shop,item_brand,click
0,20,20,4,3,3,3,3,3,3,20,20,20,63,63,63,63,0
1,6,6,3,3,3,3,3,3,3,6,6,6,58,58,58,58,0
2,11,11,3,3,3,3,3,3,3,11,11,11,10,10,10,10,1
3,231,231,26,5,3,4,3,3,3,231,231,231,18,18,18,18,1
4,17,17,4,3,3,3,3,3,3,17,17,17,6,6,6,6,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
69995,13,13,4,3,3,3,3,3,3,13,13,13,9,9,9,9,0
69996,16,16,4,3,3,3,3,3,3,16,16,16,29,29,29,29,1
69997,3,3,3,3,3,3,3,3,3,3,3,3,42,42,42,42,1
69998,45,45,6,3,3,3,3,3,3,45,45,45,41,41,41,41,0


In [42]:
valid.to_ddf().compute()

Unnamed: 0,user_id,user_shops,user_profile,user_group,user_gender,user_age,user_consumption_2,user_is_occupied,user_geography,user_intentions,user_brands,user_categories,item_id,item_category,item_shop,item_brand,click
0,8,8,3,3,3,3,3,3,3,8,8,8,45,45,45,45,1
1,36,36,5,3,3,3,3,3,3,36,36,36,11,11,11,11,1
2,4,4,3,3,3,3,3,3,3,4,4,4,12,12,12,12,0
3,41,41,6,3,3,3,3,3,3,41,41,41,23,23,23,23,0
4,8,8,3,3,3,3,3,3,3,8,8,8,39,39,39,39,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
29995,7,7,3,3,3,3,3,3,3,7,7,7,39,39,39,39,1
29996,15,15,3,3,3,3,3,3,3,15,15,15,11,11,11,11,1
29997,22,22,4,3,3,3,3,3,3,22,22,22,82,82,82,82,0
29998,55,55,7,3,3,3,3,3,3,55,55,55,84,84,84,84,0


Deep Learning Recommendation Model [(DLRM)](https://arxiv.org/abs/1906.00091) architecture is a popular neural network model originally proposed by Facebook in 2019. The model was introduced as a personalization deep learning model that uses embeddings to process sparse features that represent categorical data and a multilayer perceptron (MLP) to process dense features, then interacts these features explicitly using the statistical techniques proposed in [here](https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=5694074). To learn more about DLRM architetcture please visit `Exploring-different-models` [notebook](https://github.com/NVIDIA-Merlin/models/blob/stable/examples/04-Exporting-ranking-models.ipynb) in the Merlin Models GH repo.

In [43]:
model = mm.DLRMModel(
    schema,
    embedding_dim=64,
    bottom_block=mm.MLPBlock([128, 64]),
    top_block=mm.MLPBlock([128, 64, 32]),
    prediction_tasks=mm.BinaryClassificationTask(target_column),
)

In [44]:
model.compile(optimizer="adam", run_eagerly=False, metrics=[tf.keras.metrics.AUC()])
model.fit(train, validation_data=valid, batch_size=16 * 1024)

2025-06-28 18:06:04.959701: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]




2025-06-28 18:06:09.458472: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]




<keras.callbacks.History at 0x7f494a6894b0>

Let's save our DLRM model to be able to load back at the deployment stage. 

In [45]:
model.save(os.path.join(BASE_DIR, "dlrm"))

In the following cells we are going to export the required user and item features files, and save the query (user) tower model and item embeddings to disk. If you want to read more about exporting retrieval models, please visit [05-Retrieval-Model.ipynb](https://github.com/NVIDIA-Merlin/models/blob/stable/examples/05-Retrieval-Model.ipynb) notebook in Merlin Models library repo.

### Extract and save Item embeddings

In [46]:
from merlin.systems.dag.ops.tensorflow import PredictTensorflow
from merlin.systems.dag.ops.workflow import TransformWorkflow

workflow =  nvt.Workflow(["item_id"] + (['item_id', 'item_brand', 'item_category', 'item_shop'] >> TransformWorkflow(nvt_wkflow.get_subworkflow("item")) >> PredictTensorflow(model_tt.first.item_block())))
item_embeddings = workflow.fit_transform(Dataset(item_features)).to_ddf().compute()



In [47]:
item_embeddings.tail()

Unnamed: 0,item_id,output_1
452,476,"[0.024547405540943146, -0.011547492817044258, ..."
453,442,"[0.012859153561294079, -0.03099684603512287, -..."
454,477,"[0.024547405540943146, -0.011547492817044258, ..."
455,333,"[0.015834156423807144, -0.005540055222809315, ..."
456,291,"[0.027397993952035904, -0.036829207092523575, ..."


In [48]:
# save to disk
item_embeddings.to_parquet(os.path.join(BASE_DIR, "item_embeddings.parquet"))

### Create feature definitions 

Now we will create our user and item features definitions in the user_features.py and item_features.py files and save these files in the feature_repo.

See the possible `dtype`s [documentation](https://github.com/feast-dev/feast/blob/40d25c62e19283396c410cb0ea1ca8eb119b6002/sdk/python/feast/types.py#L116)

In [49]:
file = open(os.path.join(feature_repo_path, "user_features.py"), "w")
file.write(
    """
from datetime import timedelta
from feast import Entity, Field, FeatureView, ValueType
from feast.types import Int32, Array
from feast.infra.offline_stores.file_source import FileSource

user_features = FileSource(
    path="{}",
    timestamp_field="datetime",
    created_timestamp_column="created",
)

user = Entity(name="user_id", value_type=ValueType.INT32, join_keys=["user_id"],)

user_features_view = FeatureView(
    name="user_features",
    entities=[user],
    ttl=timedelta(0),
    schema=[
        Field(name="user_shops", dtype=Int32),
        Field(name="user_profile", dtype=Int32),
        Field(name="user_group", dtype=Int32),
        Field(name="user_gender", dtype=Int32),
        Field(name="user_age", dtype=Int32),
        Field(name="user_consumption_2", dtype=Int32),
        Field(name="user_is_occupied", dtype=Int32),
        Field(name="user_geography", dtype=Int32),
        Field(name="user_intentions", dtype=Int32),
        Field(name="user_brands", dtype=Int32),
        Field(name="user_categories", dtype=Int32),
        Field(name="user_history_item_ids", dtype=Array(Int32)),  # Any item that the user has already seen (or that your business rules mark as "don’t-show-again") is removed before you fetch item features or run the ranking model.
    ],
    online=True,
    source=user_features,
    tags=dict(),
)
""".format(
        os.path.join(feature_repo_path, "data/", "user_features.parquet")
    )
)
file.close()

In [50]:
with open(os.path.join(feature_repo_path, "item_features.py"), "w") as f:
    f.write(
        """
from datetime import timedelta
from feast import Entity, Field, FeatureView, ValueType
from feast.types import Int32
from feast.infra.offline_stores.file_source import FileSource

item_features = FileSource(
    path="{}",
    timestamp_field="datetime",
    created_timestamp_column="created",
)

item = Entity(name="item_id", value_type=ValueType.INT32, join_keys=["item_id"],)

item_features_view = FeatureView(
    name="item_features",
    entities=[item],
    ttl=timedelta(0),
    schema=[
        Field(name="item_category", dtype=Int32),
        Field(name="item_shop", dtype=Int32),
        Field(name="item_brand", dtype=Int32),
    ],
    online=True,
    source=item_features,
    tags=dict(),
)
""".format(
            os.path.join(feature_repo_path, "data/", "item_features.parquet")
        )
    )
file.close()

### Optionally, create a `FeatureService`

A `FeatureService` is an abstraction that groups one or more `FeatureView`s. It allows you to request features using a single name rather than referencing each feature manually.

In [51]:
with open(os.path.join(feature_repo_path, "user_feature_service.py"), "w") as f:
    f.write(
        """
from feast import FeatureService
from user_features import user_features_view

# Define a feature service that includes user features
user_feature_service = FeatureService(
    name="user_feature_service",
    features=[
        user_features_view,
    ]
)
""")
file.close()

In [52]:
with open(os.path.join(feature_repo_path, "item_feature_service.py"), "w") as f:
    f.write(
        """
from feast import FeatureService
from item_features import item_features_view

# Define a feature service that includes item features
item_feature_service = FeatureService(
    name="item_feature_service",
    features=[
        item_features_view,
    ]
)
""")
file.close()

In [53]:
with open(os.path.join(feature_repo_path, "user_item_feature_service.py"), "w") as f:
    f.write(
        """
from feast import FeatureService
from user_features import user_features_view
from item_features import item_features_view

# Define a feature service that includes both user and item features
user_item_feature_service = FeatureService(
    name="user_item_feature_service",
    features=[
        user_features_view,
        item_features_view,
    ]
)
""")
file.close()

Let's checkout our Feast feature repository structure.

In [54]:
# install seedir
!pip install seedir

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.2[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [55]:
import seedir as sd

feature_repo_path = os.path.join(BASE_DIR, "feast_repo")
sd.seedir(
    feature_repo_path,
    style="lines",
    itemlimit=10,
    depthlimit=3,
    exclude_folders=".ipynb_checkpoints",
    sort=True,
)

feast_repo/
├─README.md
├─__init__.py
└─feature_repo/
  ├─__init__.py
  ├─__pycache__/
  │ ├─__init__.cpython-310.pyc
  │ ├─example_repo.cpython-310.pyc
  │ └─test_workflow.cpython-310.pyc
  ├─data/
  │ ├─item_features.parquet
  │ └─user_features.parquet
  ├─feature_store.yaml
  ├─item_feature_service.py
  ├─item_features.py
  ├─test_workflow.py
  ├─user_feature_service.py
  ├─user_features.py
  └─user_item_feature_service.py


### Next Steps
We trained and exported our ranking and retrieval models and NVTabular workflows. In the next step, we will learn how to deploy our trained models into [Triton Inference Server (TIS)](https://github.com/triton-inference-server/server) with Merlin Systems library.

For the next step, move on to the `02-Deploying-multi-stage-Recsys-with-Merlin-Systems.ipynb` notebook to deploy our saved models as an ensemble to TIS and obtain prediction results for a given request.