## A 4-stage FeedAI recommender system based on the Ali-CCP dataset and the NVIDIA Merlin framework

This notebook shows how to build a 4-stage recommender system following the pattern developed by NVIDIA and implemented on top of the NVIDIA Merlin framework. The stages are:

- **Retrieval:** narrowing down the pool of candidate items, e.g. from millions to thousands or hundreds, with a deep neural network, the so-called Two-Tower model
- **Filtering:** removing candidates with business logic, e.g. dropping items that are out of stock
- **Scoring:** assigning each candidate a score that reflects its relevance for the recommendation, using the so-called DLRM model
- **Ordering:** setting the final order of the recommendations with business logic, e.g. to boost sales in a given product category
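The flow of the four stages can be illustrated with a toy, self-contained NumPy sketch. It is not the Merlin implementation: a plain dot product stands in for the Two-Tower model, a lookup table stands in for DLRM, and the helper names (`retrieve`, `filter_candidates`, `score`, `order`, `recommend`) are hypothetical.

```python
import numpy as np

def retrieve(user_emb, item_embs, k):
    # Retrieval: score every item by dot product and keep the k best (Two-Tower stand-in)
    scores = item_embs @ user_emb
    return [int(i) for i in np.argsort(-scores)[:k]]

def filter_candidates(candidates, in_stock):
    # Filtering: business rule, drop items that are out of stock
    return [i for i in candidates if in_stock[i]]

def score(candidates, ranker):
    # Scoring: one relevance score per candidate (DLRM stand-in)
    return {i: ranker(i) for i in candidates}

def order(scores, boosted, boost=0.5, n=3):
    # Ordering: business rule, add a bonus to promoted items, then sort by final score
    final = {i: s + (boost if i in boosted else 0.0) for i, s in scores.items()}
    return sorted(final, key=final.get, reverse=True)[:n]

def recommend(user_emb, item_embs, in_stock, ranker, boosted, k=4, n=3):
    candidates = retrieve(user_emb, item_embs, k)
    candidates = filter_candidates(candidates, in_stock)
    return order(score(candidates, ranker), boosted, n=n)

# Toy data: 5 items with 2-d embeddings, item 1 is out of stock, item 4 is promoted
item_embs = np.array([[1.0, 0.0], [0.9, 0.1], [0.0, 1.0], [0.8, 0.2], [0.5, 0.5]])
user_emb = np.array([1.0, 0.0])
in_stock = [True, False, True, True, True]
ranker_scores = [0.2, 0.7, 0.1, 0.6, 0.4]

top = recommend(user_emb, item_embs, in_stock, lambda i: ranker_scores[i], boosted={4})
print(top)  # [4, 3, 0]
```

Retrieval keeps items 0, 1, 3 and 4, filtering removes the out-of-stock item 1, and the ordering bonus moves the promoted item 4 to the top even though the scorer ranked item 3 higher.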

Useful links: the [Moving Beyond Recommender Models talk](https://www.youtube.com/watch?v=5qjiY-kLwFY&list=PL65MqKWg6XcrdN4TJV0K1PdLhF_Uq-b43&index=7) from KDD '21 and a [blog post](https://eugeneyan.com/writing/system-design-for-discovery/).

Additional libraries/tools used:

- [Feast](https://docs.feast.dev/): an end-to-end open source feature store library for machine learning
- [Faiss](https://github.com/facebookresearch/faiss): a library for efficient similarity search and clustering of dense vectors

**Compatibility:**

The code was developed in the Docker image defined in `docker\python_env\Dockerfile`. It is a patched version of the `nvcr.io/nvidia/merlin/merlin-tensorflow-inference:22.05` image that makes it possible to build the recommender system on a GPU. The original Merlin framework images are publicly available in [NVIDIA's docker registry](https://catalog.ngc.nvidia.com/containers?filters=&orderBy=dateModifiedDESC&query=merlin).



## Requirements

### Hardware

Building and running the recommender system requires a GPU with the NVIDIA Ampere architecture, such as the RTX A6000 used here.

### Software

- Docker
- NVIDIA drivers compatible with CUDA 11.6

### Importing the required libraries and functions

In [1]:
import os
import glob
import gc
import nvtabular as nvt
from nvtabular.ops import *
from merlin.models.utils.example_utils import workflow_fit_transform
from merlin.schema.tags import Tags
import merlin.models.tf as mm
from merlin.io.dataset import Dataset
import tensorflow as tf


In [2]:
import logging
logging.disable(logging.WARNING)

The system is built on the public [Ali-CCP: Alibaba Click and Conversion Prediction](https://tianchi.aliyun.com/dataset/dataDetail?dataId=408#1) dataset, which is described in more detail in its documentation.

The dataset was downloaded and unpacked into an arbitrary local path `<local_path>/ali-ccp/raw_data_unpacked/` and mounted into the Docker container mentioned above with the flag `-v <local_path>/:/workspace/data/`.


Defining the dataset location:

In [4]:
import os
DATA_FOLDER = os.environ.get("DATA_FOLDER", "/workspace/data/ali-ccp/raw_data_unpacked/")
output_path = os.path.join(DATA_FOLDER, 'processed/ranking')

Loading the dataset and converting it to `.parquet` format:

In [5]:
from merlin.datasets.ecommerce.aliccp.dataset import get_aliccp

# to re-run the conversion from scratch, delete the `raw` directory under DATA_FOLDER

train, valid = get_aliccp(DATA_FOLDER, overwrite=False)


### Extracting item and user features with NVTabular

Categorical features are encoded with the `Categorify()` operator and tagged as `user` or `item` features; the `click` column is tagged as the binary classification target. More in [NVTabular](https://github.com/NVIDIA-Merlin/NVTabular).
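To illustrate what categorical encoding does, here is a simplified pandas sketch. It assumes that id 0 is reserved for missing or unseen values and that ids are assigned in sorted value order; NVTabular's `Categorify` may order and reserve ids differently depending on its version, and the function names are hypothetical.

```python
import pandas as pd

def fit_categories(values: pd.Series) -> dict:
    """Map each distinct non-null value to an integer id starting at 1 (0 is reserved)."""
    uniques = sorted(values.dropna().unique())
    return {value: idx for idx, value in enumerate(uniques, start=1)}

def encode(values: pd.Series, mapping: dict) -> pd.Series:
    """Apply the mapping; values unseen during fitting and missing values become 0."""
    return values.map(mapping).fillna(0).astype("int32")

train_shops = pd.Series(["shop_b", "shop_a", "shop_b", None])
mapping = fit_categories(train_shops)  # {'shop_a': 1, 'shop_b': 2}
encoded = encode(pd.Series(["shop_a", "shop_c", None, "shop_b"]), mapping)
print(encoded.tolist())  # [1, 0, 0, 2]
```

The `dtype='int32'` argument in the cell below plays the same role as the final `astype("int32")` here: the encoded ids are stored as compact integers that can index embedding tables.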

In [7]:
%%time

user_id = ["user_id"] >> Categorify(dtype='int32') >> TagAsUserID()
item_id = ["item_id"] >> Categorify(dtype='int32') >> TagAsItemID()

item_features = ["item_category", "item_shop", "item_brand"] >> Categorify(dtype='int32') >> TagAsItemFeatures() 

user_features = ['user_shops', 'user_profile', 'user_group', 
       'user_gender', 'user_age', 'user_consumption_2', 'user_is_occupied',
       'user_geography', 'user_intentions', 'user_brands', 'user_categories'] \
    >> Categorify(dtype='int32') >> TagAsUserFeatures() 

targets = ["click"] >> AddMetadata(tags=[Tags.BINARY_CLASSIFICATION, "target"])

outputs = user_id+item_id+item_features+user_features+targets

CPU times: user 241 µs, sys: 5 µs, total: 246 µs
Wall time: 260 µs


Run `transform_aliccp` to apply the processing defined above; the processed dataset is saved as `parquet` files under `output_path`.

In [8]:
from merlin.datasets.ecommerce import transform_aliccp

transform_aliccp((train, valid), output_path, nvt_workflow=outputs, workflow_name='workflow_ranking')

### `Scoring` stage: training the DLRM model described in the documentation

In [6]:
# define train and valid dataset objects
train = Dataset(os.path.join(output_path, 'train', '*.parquet'), part_size="500MB")
valid = Dataset(os.path.join(output_path, 'valid', '*.parquet'), part_size="500MB")

# define schema object
schema = train.schema

In [7]:
target_column = schema.select_by_tag(Tags.TARGET).column_names[0]
target_column

'click'

In [11]:
model = mm.DLRMModel(
    schema,
    embedding_dim=64,
    bottom_block=mm.MLPBlock([128, 64]),
    top_block=mm.MLPBlock([128, 64, 32]),
    prediction_tasks=mm.BinaryClassificationTask(target_column, metrics=[tf.keras.metrics.AUC()])
)
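The characteristic step of DLRM is its feature interaction: each categorical feature is embedded (here with `embedding_dim=64`), the dot product of every distinct pair of feature vectors is computed, and the result is concatenated with the bottom MLP output and passed to the top MLP. A NumPy sketch of the interaction alone, assuming the embeddings are already stacked into a `(batch, n_features, dim)` array; this is not Merlin's code and `dot_interaction` is a hypothetical name.

```python
import numpy as np

def dot_interaction(embeddings):
    """Pairwise dot-product interaction in the style of DLRM.

    embeddings: array of shape (batch, n_features, dim), one vector per feature.
    Returns shape (batch, n_features * (n_features - 1) // 2): the dot product
    of every distinct pair of feature vectors.
    """
    n_features = embeddings.shape[1]
    # all pairwise dot products per example: (batch, n_features, n_features)
    gram = np.einsum("bid,bjd->bij", embeddings, embeddings)
    # keep the strict upper triangle (i < j): no self-products, no duplicate pairs
    rows, cols = np.triu_indices(n_features, k=1)
    return gram[:, rows, cols]

# One example with three 2-d feature vectors
example = np.array([[[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]])
print(dot_interaction(example))  # [[0. 1. 1.]]
```

If all 16 categorical inputs of this dataset (`user_id`, `item_id`, 3 item features and 11 user features) were interacted this way, each example would yield 16 · 15 / 2 = 120 interaction values.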

In [12]:
model.compile(optimizer='adam', run_eagerly=False)
model.fit(train, validation_data=valid, batch_size=16*1024)

2022-11-22 18:42:04.721557: I tensorflow/stream_executor/cuda/cuda_blas.cc:1786] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.




2022-11-22 18:45:28.285271: W tensorflow/core/grappler/optimizers/loop_optimizer.cc:907] Skipping loop optimization for Merge node with control input: cond/branch_executed/_13




<keras.callbacks.History at 0x7f70971e3d90>

In [13]:
# base directory for the feature repository and the exported models
BASE_DIR = os.environ.get("BASE_DIR", "/Merlin/examples/Building-and-deploying-multi-stage-RecSys/")

Save the DLRM model; it will be loaded again for deployment.

In [14]:
model.save(os.path.join(BASE_DIR, 'dlrm'))

### Training the Two-Tower model for the candidate retrieval stage (`Retrieval`)

In [6]:
output_path = os.path.join(DATA_FOLDER, 'processed/retrieval')

In [17]:
user_id = ["user_id"] >> Categorify(dtype='int32') >> TagAsUserID()
item_id = ["item_id"] >> Categorify(dtype='int32') >> TagAsItemID()

item_features = ["item_category", "item_shop", "item_brand"] >> Categorify(dtype='int32') >> TagAsItemFeatures()

user_features = ['user_shops', 'user_profile', 'user_group', 
       'user_gender', 'user_age', 'user_consumption_2', 'user_is_occupied',
       'user_geography', 'user_intentions', 'user_brands', 'user_categories'] \
        >> Categorify(dtype='int32') >> TagAsUserFeatures() 

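inputs = user_id + item_id + item_features + user_features + ['click']

outputs = inputs >> Filter(f=lambda df: df["click"] == 1)

# `train` and `valid` were reassigned above to the processed ranking data,
# so reload the raw Ali-CCP data before applying the retrieval workflow
train_raw, valid_raw = get_aliccp(DATA_FOLDER, overwrite=False)
transform_aliccp((train_raw, valid_raw), output_path, nvt_workflow=outputs, workflow_name='workflow_retrieval')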
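The `Filter` keeps only clicked rows because the Two-Tower model is trained on positive user-item pairs; with `InBatchSampler`, the other items in the same batch act as negatives, so the non-clicked rows are not needed. The same predicate applied with pandas to made-up data:

```python
import pandas as pd

# Made-up interactions: one row per impression, click = 1 marks a positive
interactions = pd.DataFrame({
    "user_id": [1, 1, 2, 3, 3],
    "item_id": [10, 11, 10, 12, 13],
    "click":   [1, 0, 1, 0, 1],
})

# Same predicate as the NVTabular Filter above: keep clicked rows only
positives = interactions[interactions["click"] == 1].reset_index(drop=True)
print(positives[["user_id", "item_id"]].values.tolist())  # [[1, 10], [2, 10], [3, 13]]
```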



In [7]:
train_tt = Dataset(os.path.join(output_path, 'train', '*.parquet'))
valid_tt = Dataset(os.path.join(output_path, 'valid', '*.parquet'))

schema = train_tt.schema
schema = schema.select_by_tag([Tags.ITEM_ID, Tags.USER_ID, Tags.ITEM, Tags.USER])



In [8]:
model = mm.TwoTowerModel(
    schema,
    query_tower=mm.MLPBlock([128, 64], no_activation_last_layer=True),
    loss="categorical_crossentropy",
    samplers=[mm.InBatchSampler()],
    embedding_options=mm.EmbeddingOptions(infer_embedding_sizes=True),
    metrics=[mm.RecallAt(10), mm.NDCGAt(10)]
)
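With `loss="categorical_crossentropy"` and `InBatchSampler`, each user in a batch is scored against every item in that batch and the clicked item is treated as the correct class. A minimal NumPy sketch of that loss under these assumptions; it is not the Merlin implementation and `in_batch_softmax_loss` is a hypothetical name.

```python
import numpy as np

def in_batch_softmax_loss(query_embs, item_embs):
    """Categorical cross-entropy with in-batch negatives.

    Row i of query_embs and row i of item_embs form a positive (user, item) pair;
    all other items in the batch serve as negatives for user i.
    """
    logits = query_embs @ item_embs.T                     # (batch, batch) scores
    logits = logits - logits.max(axis=1, keepdims=True)   # numerical stability
    log_probs = logits - np.log(np.exp(logits).sum(axis=1, keepdims=True))
    # the positive item of row i sits on the diagonal
    return -np.mean(np.diag(log_probs))

queries = np.eye(2)
aligned = np.eye(2)                        # each user's positive matches its query
swapped = np.array([[0.0, 1.0], [1.0, 0.0]])
print(round(in_batch_softmax_loss(queries, aligned), 4))   # 0.3133
print(round(in_batch_softmax_loss(queries, swapped), 4))   # 1.3133
```

The loss is lower when each user's embedding is closest to its own clicked item, which is what training pushes the two towers towards.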

In [9]:
model.compile(optimizer='adam', run_eagerly=False)
model.fit(train_tt, validation_data=valid_tt, batch_size=1024*8, epochs=13)

Epoch 1/13


2022-12-05 12:37:07.867350: I tensorflow/stream_executor/cuda/cuda_blas.cc:1786] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.




2022-12-05 12:37:34.637443: W tensorflow/core/grappler/optimizers/loop_optimizer.cc:907] Skipping loop optimization for Merge node with control input: cond/branch_executed/_24


Epoch 2/13
Epoch 3/13
Epoch 4/13
Epoch 5/13
Epoch 6/13
Epoch 7/13
Epoch 8/13
Epoch 9/13
Epoch 10/13
Epoch 11/13
Epoch 12/13
Epoch 13/13


<keras.callbacks.History at 0x7fa200503100>
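The `RecallAt(10)` and `NDCGAt(10)` metrics passed to the model measure how high the clicked item ranks among the scored items. A sketch of both metrics for a single query with exactly one relevant item (an assumption that matches the in-batch setup; averaging over queries is omitted and the function names are hypothetical):

```python
import numpy as np

def recall_at_k(scores, target, k):
    """1.0 if the relevant item is among the k highest-scored items, else 0.0."""
    top_k = np.argsort(-scores)[:k]
    return float(target in top_k)

def ndcg_at_k(scores, target, k):
    """NDCG@k with one relevant item: the ideal DCG is 1, so NDCG = 1 / log2(rank + 1)."""
    top_k = np.argsort(-scores)[:k].tolist()
    if target not in top_k:
        return 0.0
    rank = top_k.index(target) + 1  # 1-based position of the relevant item
    return 1.0 / np.log2(rank + 1)

# Scores of 4 items for one user; item 2 is the clicked one
scores = np.array([0.1, 0.9, 0.5, 0.3])
print(recall_at_k(scores, 2, k=2), round(ndcg_at_k(scores, 2, k=2), 4))  # 1.0 0.6309
```

Recall only asks whether the item made the top k, while NDCG also rewards placing it higher: rank 1 gives 1.0, rank 2 about 0.63.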

### Feature store: Feast

Creating the repository:

In [24]:
!cd $BASE_DIR && feast init feature_repo

Feast is an open source project that collects anonymized error reporting and usage statistics. To opt out or learn more see https://docs.feast.dev/reference/usage
--------------------------------------------------------------------------------

  CuPy may not function correctly because multiple CuPy packages are installed
  in your environment:

    cupy-cuda116, cupy-cuda11x

  Follow these steps to resolve this issue:

    1. For all packages listed above, run the following command to remove all
       existing CuPy installations:

         $ pip uninstall <package_name>

      If you previously installed CuPy via conda, also run the following:

         $ conda uninstall cupy

    2. Install the appropriate CuPy package.
       Refer to the Installation Guide for detailed instructions.

         https://docs.cupy.dev/en/stable/install.html

--------------------------------------------------------------------------------


Creating a new Feast repository in /Merlin/examples/

You should see a message like *Creating a new Feast repository in ...* printed above. Now go to the `feature_repo` folder and remove the demo `driver_stats.parquet` file created by default, as well as the `example.py` file.

In [25]:
os.remove(os.path.join(BASE_DIR, 'feature_repo', 'example.py'))
os.remove(os.path.join(BASE_DIR, 'feature_repo/data', 'driver_stats.parquet'))

### Exporting the Two-Tower query tower with the Merlin API so it can be used to propose items for a user

In [26]:
query_tower = model.retrieval_block.query_block()
query_tower.save(os.path.join(BASE_DIR, 'query_tower'))

### Exporting user and item features

In [27]:
from merlin.models.utils.dataset import unique_rows_by_features
user_features = unique_rows_by_features(train, Tags.USER, Tags.USER_ID).compute().reset_index(drop=True)
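Conceptually, `unique_rows_by_features` keeps the columns carrying the given tag and one row per id. A rough pandas analogue on made-up data, with the user columns listed by hand instead of being selected through schema tags (an assumption for illustration; the Merlin helper works on the tagged schema):

```python
import pandas as pd

# Made-up processed rows: several interactions per user
rows = pd.DataFrame({
    "user_id":  [1, 1, 2, 2, 3],
    "user_age": [2, 2, 3, 3, 5],
    "item_id":  [10, 11, 10, 12, 13],
    "click":    [1, 0, 1, 0, 1],
})

# Columns that would carry the USER / USER_ID tags (chosen by hand here)
user_columns = ["user_id", "user_age"]
user_rows = (
    rows[user_columns]
    .drop_duplicates(subset="user_id")  # one row per user id
    .reset_index(drop=True)
)
print(user_rows.values.tolist())  # [[1, 2], [2, 3], [3, 5]]
```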

In [28]:
user_features.head()

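|   | user_id | user_shops | user_profile | user_group | user_gender | user_age | user_consumption_2 | user_is_occupied | user_geography | user_intentions | user_brands | user_categories |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 3 | 1 | 6 | 2 | 2 | 3 | 1 | 1 | 56 | 2 | 20 |
| 1 | 2 | 1 | 1 | 2 | 1 | 2 | 1 | 1 | 1 | 1 | 12 | 105 |
| 2 | 3 | 1602 | 24 | 8 | 2 | 3 | 1 | 1 | 2 | 5 | 64 | 3 |
| 3 | 4 | 112 | 2 | 3 | 3 | 5 | 2 | 2 | 1 | 69 | 134 | 9 |
| 4 | 5 | 982 | 2 | 3 | 3 | 5 | 2 | 2 | 1 | 676 | 286 | 17 |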


Feast requires the `datetime` (event timestamp) and `created` (creation timestamp) columns, so we add them artificially using the current time:

In [29]:
from datetime import datetime
user_features["datetime"] = datetime.now()
user_features["datetime"] = user_features["datetime"].astype("datetime64[ns]")
user_features["created"] = datetime.now()
user_features["created"] = user_features["created"].astype("datetime64[ns]")

In [30]:
user_features.head()

|   | user_id | user_shops | user_profile | user_group | user_gender | user_age | user_consumption_2 | user_is_occupied | user_geography | user_intentions | user_brands | user_categories | datetime | created |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 3 | 1 | 6 | 2 | 2 | 3 | 1 | 1 | 56 | 2 | 20 | 2022-11-22 18:55:19.239479 | 2022-11-22 18:55:19.536387 |
| 1 | 2 | 1 | 1 | 2 | 1 | 2 | 1 | 1 | 1 | 1 | 12 | 105 | 2022-11-22 18:55:19.239479 | 2022-11-22 18:55:19.536387 |
| 2 | 3 | 1602 | 24 | 8 | 2 | 3 | 1 | 1 | 2 | 5 | 64 | 3 | 2022-11-22 18:55:19.239479 | 2022-11-22 18:55:19.536387 |
| 3 | 4 | 112 | 2 | 3 | 3 | 5 | 2 | 2 | 1 | 69 | 134 | 9 | 2022-11-22 18:55:19.239479 | 2022-11-22 18:55:19.536387 |
| 4 | 5 | 982 | 2 | 3 | 3 | 5 | 2 | 2 | 1 | 676 | 286 | 17 | 2022-11-22 18:55:19.239479 | 2022-11-22 18:55:19.536387 |


In [31]:
user_features.to_parquet(os.path.join(BASE_DIR, 'feature_repo/data', 'user_features.parquet'))

In [32]:
item_features = unique_rows_by_features(train, Tags.ITEM, Tags.ITEM_ID).compute().reset_index(drop=True)

In [33]:
item_features.shape

(3168666, 4)

In [34]:
item_features["datetime"] = datetime.now()
item_features["datetime"] = item_features["datetime"].astype("datetime64[ns]")
item_features["created"] = datetime.now()
item_features["created"] = item_features["created"].astype("datetime64[ns]")

In [35]:
item_features.head()

|   | item_id | item_category | item_shop | item_brand | datetime | created |
|---|---|---|---|---|---|---|
| 0 | 1 | 446 | 435 | 466 | 2022-11-22 18:55:37.551002 | 2022-11-22 18:55:37.554488 |
| 1 | 2 | 193 | 1202 | 124 | 2022-11-22 18:55:37.551002 | 2022-11-22 18:55:37.554488 |
| 2 | 3 | 1722 | 528 | 796 | 2022-11-22 18:55:37.551002 | 2022-11-22 18:55:37.554488 |
| 3 | 4 | 227 | 2641 | 2712 | 2022-11-22 18:55:37.551002 | 2022-11-22 18:55:37.554488 |
| 4 | 5 | 618 | 1762 | 2692 | 2022-11-22 18:55:37.551002 | 2022-11-22 18:55:37.554488 |


In [36]:
# save to disk
item_features.to_parquet(os.path.join(BASE_DIR, 'feature_repo/data', 'item_features.parquet'))

### Extracting and saving item embeddings

In [37]:
item_embs = model.item_embeddings(Dataset(item_features, schema=schema), batch_size=1024)
item_embs_df = item_embs.compute(scheduler="synchronous")

In [38]:
# select only item_id together with embedding columns 
item_embeddings = item_embs_df.drop(columns=['item_category', 'item_shop', 'item_brand'])

In [39]:
item_embeddings.head()

|   | item_id | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... | 54 | 55 | 56 | 57 | 58 | 59 | 60 | 61 | 62 | 63 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | -0.020003 | -0.075518 | -0.087866 | 0.151562 | -0.110037 | 0.495973 | 0.079019 | -0.025561 | 0.254261 | ... | -0.140148 | -0.005431 | 0.007796 | 0.039577 | 0.253102 | -0.441789 | -0.221366 | 0.143352 | -0.163019 | 0.468177 |
| 1 | 2 | 0.153656 | -0.047635 | 0.195094 | -0.237609 | -0.25457 | -0.554159 | 0.135774 | 0.182235 | 0.155593 | ... | 0.360716 | -0.165115 | 0.313495 | -0.251373 | -0.316715 | 0.149595 | 0.530286 | -0.156772 | 0.133417 | -0.466848 |
| 2 | 3 | 0.041907 | 0.208076 | -0.007657 | 0.036008 | -0.052415 | 0.113364 | 0.145278 | -0.07368 | 0.20054 | ... | 0.025312 | -0.061955 | -0.03442 | -0.077453 | 0.177896 | -0.243942 | -0.116789 | 0.04972 | -0.222127 | 0.227698 |
| 3 | 4 | -0.065537 | 0.010116 | -0.010827 | -0.091568 | -0.14368 | -0.358728 | 0.037551 | 0.113641 | -0.01071 | ... | 0.351133 | -0.181788 | 0.167121 | -0.298541 | -0.103837 | 0.075279 | 0.450471 | -0.030042 | 0.103019 | -0.351265 |
| 4 | 5 | -0.111307 | 0.130277 | -0.084017 | 0.154907 | 0.043098 | 0.265775 | 0.039192 | -0.113902 | 0.134627 | ... | -0.171411 | 0.020445 | -0.022748 | 0.037004 | 0.150457 | -0.411173 | -0.179397 | 0.193402 | -0.31911 | 0.373385 |


In [40]:
# save to disk
item_embeddings.to_parquet(os.path.join(BASE_DIR,'item_embeddings.parquet'))
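At serving time the retrieval stage compares a user embedding produced by the exported query tower with these item embeddings. Faiss, listed at the top, handles this search at scale; an exact index such as `faiss.IndexFlatIP` computes the inner product against every stored vector. The NumPy sketch below performs the same exact maximum inner-product search so it runs without Faiss; the function name and the toy data are hypothetical.

```python
import numpy as np

def top_k_inner_product(query, item_ids, item_vectors, k):
    """Exact maximum inner-product search: score every item, return the k best ids and scores."""
    scores = item_vectors @ query                  # (n_items,)
    k = min(k, len(scores))
    best = np.argpartition(-scores, k - 1)[:k]     # the k best items, in arbitrary order
    best = best[np.argsort(-scores[best])]         # sort those k by descending score
    return item_ids[best], scores[best]

item_ids = np.array([101, 102, 103, 104])
item_vectors = np.array([[1.0, 0.0], [0.0, 1.0], [0.7, 0.7], [-1.0, 0.0]])
user_vector = np.array([1.0, 0.0])

ids, scores = top_k_inner_product(user_vector, item_ids, item_vectors, k=2)
print(ids.tolist(), scores.tolist())  # [101, 103] [1.0, 0.7]
```

With the DataFrame above, the inputs could be prepared as `item_embeddings["item_id"].to_numpy()` and `item_embeddings.drop(columns=["item_id"]).to_numpy(dtype="float32")`. For the roughly 3.17 million items here, a Faiss index would typically replace the full NumPy pass per query.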

### Defining user and item features in code for the feature store
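The two cells below write nearly identical modules that differ only in names, paths and the feature list. A sketch of a hypothetical `render_feature_view` helper that generates such a module from those parameters. It reproduces the older Feast API used in this notebook (`Feature`, `ValueType`, `input=`, `event_timestamp_column`); newer Feast releases replaced these names, so the template would have to follow the installed Feast version.

```python
def render_feature_view(view_name, entity_name, features, parquet_path, ttl_days=7):
    """Return the source of a Feast feature-definition module with INT32 features."""
    feature_lines = "\n".join(
        f'        Feature(name="{feature}", dtype=ValueType.INT32),' for feature in features
    )
    description = entity_name.replace("_", " ")
    return f'''
from google.protobuf.duration_pb2 import Duration
from feast import Entity, Feature, FeatureView, ValueType
from feast.infra.offline_stores.file_source import FileSource

{view_name} = FileSource(
    path="{parquet_path}",
    event_timestamp_column="datetime",
    created_timestamp_column="created",
)

entity = Entity(name="{entity_name}", value_type=ValueType.INT32, description="{description}",)

{view_name}_view = FeatureView(
    name="{view_name}",
    entities=["{entity_name}"],
    ttl=Duration(seconds=86400 * {ttl_days}),
    features=[
{feature_lines}
    ],
    online=True,
    input={view_name},
    tags=dict(),
)
'''

# Example: the item module (written to feature_repo/item_features.py in the notebook)
item_source = render_feature_view(
    "item_features", "item_id",
    ["item_category", "item_shop", "item_brand"],
    "/path/to/feature_repo/data/item_features.parquet",  # hypothetical path
)
```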

In [41]:
file = open(os.path.join(BASE_DIR, 'feature_repo/','user_features.py'), "w")
file.write(
'''
from google.protobuf.duration_pb2 import Duration
import datetime 
from feast import Entity, Feature, FeatureView, ValueType
from feast.infra.offline_stores.file_source import FileSource

user_features = FileSource(
    path="{}",
    event_timestamp_column="datetime",
    created_timestamp_column="created",
)

user = Entity(name="user_id", value_type=ValueType.INT32, description="user id",)

user_features_view = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=Duration(seconds=86400 * 7),
    features=[
        Feature(name="user_shops", dtype=ValueType.INT32),
        Feature(name="user_profile", dtype=ValueType.INT32),
        Feature(name="user_group", dtype=ValueType.INT32),
        Feature(name="user_gender", dtype=ValueType.INT32),
        Feature(name="user_age", dtype=ValueType.INT32),
        Feature(name="user_consumption_2", dtype=ValueType.INT32),
        Feature(name="user_is_occupied", dtype=ValueType.INT32),
        Feature(name="user_geography", dtype=ValueType.INT32),
        Feature(name="user_intentions", dtype=ValueType.INT32),
        Feature(name="user_brands", dtype=ValueType.INT32),
        Feature(name="user_categories", dtype=ValueType.INT32),
    ],
    online=True,
    input=user_features,
    tags=dict(),
)
'''.format(os.path.join(BASE_DIR, 'feature_repo/data/','user_features.parquet'))
)
file.close()

In [42]:
with open(os.path.join(BASE_DIR, 'feature_repo/','item_features.py'), "w") as f:
    f.write(
'''
from google.protobuf.duration_pb2 import Duration
import datetime 
from feast import Entity, Feature, FeatureView, ValueType
from feast.infra.offline_stores.file_source import FileSource

item_features = FileSource(
    path="{}",
    event_timestamp_column="datetime",
    created_timestamp_column="created",
)

item = Entity(name="item_id", value_type=ValueType.INT32, description="item id",)

item_features_view = FeatureView(
    name="item_features",
    entities=["item_id"],
    ttl=Duration(seconds=86400 * 7),
    features=[
        Feature(name="item_category", dtype=ValueType.INT32),
        Feature(name="item_shop", dtype=ValueType.INT32),
        Feature(name="item_brand", dtype=ValueType.INT32),
    ],
    online=True,
    input=item_features,
    tags=dict(),
)
'''.format(os.path.join(BASE_DIR, 'feature_repo/data/','item_features.parquet'))
    )

Structure of the feature repository:

In [43]:
# install seedir
# !pip install seedir


In [44]:
import seedir as sd
feature_repo_path = os.path.join(BASE_DIR, 'feature_repo')
sd.seedir(feature_repo_path, style='lines', itemlimit=10, depthlimit=3, exclude_folders='.ipynb_checkpoints', sort=True)

feature_repo/
├─__init__.py
├─data/
│ ├─item_features.parquet
│ └─user_features.parquet
├─feature_store.yaml
├─item_features.py
└─user_features.py


### Next steps

The models for the Retrieval and Scoring stages have been trained and exported, and the feature repository for items and users has been created.
Next, the models will be deployed on [Triton Inference Server (TIS)](https://github.com/triton-inference-server/server) for inference.
