In [1]:
# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

# Session-based Recommendation with XLNET

In this notebook we introduce the [Transformers4Rec](https://github.com/NVIDIA-Merlin/Transformers4Rec) library for sequential and session-based recommendation. This notebook uses the PyTorch API, but a TensorFlow API is also available. Transformers4Rec integrates with the popular [HuggingFace’s Transformers](https://github.com/huggingface/transformers) and make it possible to experiment with cutting-edge implementation of the latest NLP Transformer architectures.  

We demonstrate how to build a session-based recommendation model with the [XLNET](https://arxiv.org/abs/1906.08237) Transformer architecture. The XLNet architecture was designed to leverage the best of both auto-regressive language modeling and auto-encoding with its Permutation Language Modeling training method. In this example we will use XLNET with masked language modeling (MLM) training method, which showed very promising results in the experiments conducted in our [ACM RecSys'21 paper](https://github.com/NVIDIA-Merlin/publications/blob/main/2021_acm_recsys_transformers4rec/recsys21_transformers4rec_paper.pdf).

In the previous notebook we went through our ETL pipeline with NVTabular library, and created sequential features to be used in training a session-based recommendation model. In this notebook we will learn:

- Accelerating data loading of parquet files with multiple features on PyTorch using NVTabular library
- Training and evaluating a Transformer-based (XLNET-MLM) session-based recommendation model with multiple features

## Build a DL model with Transformers4Rec library  

Transformers4Rec supports multiple input features and provides configurable building blocks that can be easily combined for custom architectures:

- [TabularSequenceFeatures](https://nvidia-merlin.github.io/Transformers4Rec/main/api/transformers4rec.tf.html#transformers4rec.torch.TabularSequenceFeatures) class that reads from schema and creates an input block. This input module combines different types of features (continuous, categorical & text) to a sequence.
-  [MaskSequence](https://github.com/NVIDIA-Merlin/Transformers4Rec/blob/main/transformers4rec/tf/masking.py) to define masking schema and prepare the masked inputs and labels for the selected LM task.
-  [TransformerBlock](https://nvidia-merlin.github.io/Transformers4Rec/main/api/transformers4rec.tf.html#transformers4rec.torch.TransformerBlock) class that supports HuggingFace Transformers for session-based and sequential-based recommendation models.
-  [SequentialBlock](https://nvidia-merlin.github.io/Transformers4Rec/main/api/transformers4rec.tf.html#transformers4rec.torch.SequentialBlock) creates the body by mimicking [tf.keras.Sequential](https://www.tensorflow.org/api_docs/python/tf/keras/Sequential) class. It is designed to define our model as a sequence of layers.
-  [Head](https://nvidia-merlin.github.io/Transformers4Rec/main/api/transformers4rec.tf.html#transformers4rec.tf.Head) where we define the prediction task of the model.
-  [NextItemPredictionTask](https://nvidia-merlin.github.io/Transformers4Rec/main/api/transformers4rec.tf.html#transformers4rec.tf.NextItemPredictionTask) is the class to support next item prediction task.

You can check the [full documentation](https://nvidia-merlin.github.io/Transformers4Rec/main/index.html) of Transformers4Rec if needed.

Figure 1 illustrates Transformers4Rec meta-architecture and how each module/block interacts with each other.

![tf4rec_meta](images/tf4rec_meta2.png)

### Imports required libraries

In [2]:
import os
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 
import glob

from nvtabular.loader.tensorflow import KerasSequenceLoader

from transformers4rec import tf as tr
from transformers4rec.tf.ranking_metric import NDCGAt, AvgPrecisionAt, RecallAt

In [3]:
import cudf
import dask_cudf
from cudf.io.parquet import ParquetWriter as pwriter_cudf
from dask_cudf.io.parquet import CudfEngine

Transformers4Rec library relies on a schema object to automatically build all necessary layers to represent, normalize and aggregate input features. As you can see below, `schema.pb` is a protobuf file that contains metadata including statistics about features such as cardinality, min and max values and also tags features based on their characteristics and dtypes (e.g., categorical, continuous, list, integer).

### Manually set the schema 

In [4]:
from merlin_standard_lib import Schema
SCHEMA_PATH = "schema.pb"
schema = Schema().from_proto_text(SCHEMA_PATH)
# !cat $SCHEMA_PATH

In [5]:
# You can select a subset of features for training
schema = schema.select_by_name(['item_id-list_trim', 
                                'category-list_trim'])

# 'timestamp/weekday/sin-list_trim', 'timestamp/age_days-list_trim'

In [6]:
import nvtabular as nvt
workflow = nvt.Workflow.load('workflow_etl')

In [7]:
workflow.output_dtypes

{'item_id-count': dtype('int32'),
 'day-first': dtype('int64'),
 'session_id': dtype('int64'),
 'category-list_trim': ListDtype(int64),
 'item_id-list_trim': ListDtype(int64),
 'timestamp/age_days-list_trim': ListDtype(float64),
 'timestamp/weekday/sin-list_trim': ListDtype(float64)}

### Define the sequential input module

Below we define our `input` block using the `TabularSequenceFeatures` [class](https://github.com/NVIDIA-Merlin/Transformers4Rec/blob/main/transformers4rec/tf/features/sequence.py#L121). The `from_schema()` method processes the schema and creates the necessary layers to represent features and aggregate them. It keeps only features tagged as `categorical` and `continuous` and supports data aggregation methods like `concat` and `elementwise-sum` techniques. It also support data augmentation techniques like stochastic swap noise. It outputs an interaction representation after combining all features and also the input mask according to the training task (more on this later).


The `max_sequence_length` argument defines the maximum sequence length of our sequential input, and if `continuous_projection` argument is set, all numerical features are concatenated and projected by an MLP block so that continuous features are represented by a vector of size defined by user, which is `64` in this example.

In [8]:
inputs = tr.TabularSequenceFeatures.from_schema(
        schema,
        max_sequence_length=20,
        d_output=100,
        masking="mlm",
)

The output of the `TabularSequenceFeatures` module is the sequence of interactions embeddings vectors defined in the following steps:
- 1. Create sequence inputs: If the schema contains non sequential features, expand each feature to a sequence by repeating the value as many as the `max_sequence_length` value.  
- 2. Get a representation vector of categorical features: Project each sequential categorical feature using the related embedding table. The resulting tensor is of shape (bs, max_sequence_length, embed_dim).
- 3. Project scalar values if `continuous_projection` is set : Apply an MLP layer with hidden size equal to `continuous_projection` vector size value. The resulting tensor is of shape (batch_size, max_sequence_length, continuous_projection).
- 4. Aggregate the list of features vectors to represent each interaction in the sequence with one vector: For example, `concat` will concat all vectors based on the last dimension `-1` and the resulting tensor will be of shape (batch_size, max_sequence_length, D) where D is the sum over all embedding dimensions and the value of continuous_projection. 
- 5. If masking schema is set (needed only for the NextItemPredictionTask training), the masked labels are derived from the sequence of raw item-ids and the sequence of interactions embeddings are processed to mask information about the masked positions.

### Define the Transformer Block

In the next cell, the whole model is build with a few lines of code. 
Here is a brief explanation of the main classes:  
- [XLNetConfig](https://github.com/NVIDIA-Merlin/Transformers4Rec/blob/main/transformers4rec/config/transformer.py#L261) - We have injected in the HF transformers config classes like `XLNetConfig` the `build()` method, that provides default configuration to Transformer architectures for session-based recommendation. Here we use it to instantiate and configure an XLNET architecture.  
- [TransformerBlock](https://github.com/NVIDIA-Merlin/Transformers4Rec/blob/main/transformers4rec/tf/block/transformer.py#L42) class integrates with HF Transformers, which are made available as a sequence processing module for session-based and sequential-based recommendation models.  
- [NextItemPredictionTask](https://github.com/NVIDIA-Merlin/Transformers4Rec/blob/405e3142f1274b1b0d642f4834ac437f2549cd33/transformers4rec/tf/model/prediction_task.py#82) supports the next-item prediction task. We also support other predictions [tasks](https://github.com/NVIDIA-Merlin/Transformers4Rec/blob/main/transformers4rec/tf/model/prediction_task.py), like classification and regression for the whole sequence. 

In [9]:
# Define XLNetConfig class and set default parameters for HF XLNet config  
transformer_config = tr.XLNetConfig.build(
    d_model=64, n_head=4, n_layer=2, total_seq_length=20
)
# Define the model block including: inputs, masking, projection and transformer block.
body = tr.SequentialBlock(
    [inputs, tr.MLPBlock([64]), tr.TransformerBlock(transformer_config, masking=inputs.masking)]
)

# Defines the evaluation top-N metrics and the cut-offs
metrics = (
    NDCGAt(top_ks=[1, 5, 20, 40], labels_onehot=True),  
    RecallAt(top_ks=[1, 5, 20, 40], labels_onehot=True)
          )

# link task to body and generate the end-to-end keras model
task = tr.NextItemPredictionTask(weight_tying=True, metrics=metrics)
 
model = task.to_model(body=body)

### Train the model 

We use the NVTabular `KerasSequenceLoader` Dataloader for optimized loading of multiple features from input parquet files. You can learn more about this data loader [here](https://nvidia-merlin.github.io/NVTabular/main/training/tensorflow.html).

### **Set DataLoader**

In [10]:
x_cat_names, x_cont_names = ['category-list_trim', 'item_id-list_trim'], ['timestamp/age_days-list_trim', 'timestamp/weekday/sin-list_trim']

# dictionary representing max sequence length for column
sparse_features_max = {
    fname: 20
    for fname in x_cat_names + x_cont_names
}

def get_dataloader(paths_or_dataset, batch_size=64):
    dataloader = KerasSequenceLoader(
        paths_or_dataset,
        batch_size=batch_size,
        label_names=None,
        cat_names=x_cat_names,
        cont_names=x_cont_names,
        sparse_names=list(sparse_features_max.keys()),
        sparse_max=sparse_features_max,
        sparse_as_dense=True,
    )
    return dataloader.map(lambda X, y: (X, []))

## Set training params

In [11]:
import tensorflow as tf
lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
    initial_learning_rate=0.0005,
    decay_steps=1.5)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
model.compile(optimizer="adam", run_eagerly=True)

## Daily Fine-Tuning: Training over a time window

Here we do daily fine-tuning meaning that we use the first day to train and second day to evaluate, then we use the second day data to train the model by resuming from the first step, and evaluate on the third day, so on so forth.

- Define the output folder of the processed parquet files

In [12]:
OUTPUT_DIR = os.environ.get("OUTPUT_DIR", "./sessions_by_day")

In [13]:
import warnings
warnings.filterwarnings('ignore')

In [14]:
start_time_window_index = 1
final_time_window_index = 2
#Iterating over days of one week
for time_index in range(start_time_window_index, final_time_window_index):
    # Set data 
    time_index_train = time_index
    time_index_eval = time_index + 1
    train_paths = glob.glob(os.path.join(OUTPUT_DIR, f"{time_index_train}/train.parquet"))
    eval_paths = glob.glob(os.path.join(OUTPUT_DIR, f"{time_index_eval}/valid.parquet"))
    print(train_paths)

    # Train on day related to time_index 
    print('*'*20)
    print("Launch training for day %s are:" %time_index)
    print('*'*20 + '\n')
    train_loader = get_dataloader(train_paths) 
    losses = model.fit(train_loader, epochs=1, verbose=0, )
    model.reset_metrics()
    print('finished')
    # Evaluate on the following day
    eval_loader = get_dataloader(eval_paths) 
    eval_metrics = model.evaluate(eval_loader, return_dict=True)
    print('*'*20)
    print("Eval results for day %s are:\t" %time_index_eval)
    print('\n' + '*'*20 + '\n')
    for key in sorted(eval_metrics.keys()):
        print(" %s = %s" % (key, str(eval_metrics[key]))) 

['./sessions_by_day/1/train.parquet']
********************
Launch training for day 1 are:
********************

finished
********************
Eval results for day 2 are:	

********************

 eval_ndcg_at_11 = 0.5714285969734192
 eval_ndcg_at_120 = 0.5714285969734192
 eval_ndcg_at_140 = 0.5714285969734192
 eval_ndcg_at_15 = 0.5714285969734192
 eval_recall_at_11 = 0.4196428656578064
 eval_recall_at_120 = 0.5625
 eval_recall_at_140 = 0.5714285969734192
 eval_recall_at_15 = 0.5535714030265808
 loss = 8.135071754455566
 regularization_loss = 0
 total_loss = 8.135071754455566


In [15]:
# Evaluate on the following day
time_index_eval = 5
eval_paths = glob.glob(os.path.join(OUTPUT_DIR, f"{time_index_eval}/valid.parquet"))
eval_loader = get_dataloader(eval_paths) 
eval_metrics = model.evaluate(eval_loader, return_dict=True, )



In [16]:
eval_metrics

{'eval_ndcg_at_11': 0.6571428775787354,
 'eval_ndcg_at_15': 0.6571428775787354,
 'eval_ndcg_at_120': 0.6571428775787354,
 'eval_ndcg_at_140': 0.6571428775787354,
 'eval_recall_at_11': 0.4952380955219269,
 'eval_recall_at_15': 0.5523809790611267,
 'eval_recall_at_120': 0.6000000238418579,
 'eval_recall_at_140': 0.6571428775787354,
 'loss': 8.021478652954102,
 'regularization_loss': 0,
 'total_loss': 8.021478652954102}

In [17]:
print('*'*20)
print("Eval results for day %s are:\t" %time_index_eval)
print('\n' + '*'*20 + '\n')
for key in sorted(eval_metrics.keys()):
    print(" %s = %s" % (key, str(eval_metrics[key]))) 

********************
Eval results for day 5 are:	

********************

 eval_ndcg_at_11 = 0.6571428775787354
 eval_ndcg_at_120 = 0.6571428775787354
 eval_ndcg_at_140 = 0.6571428775787354
 eval_ndcg_at_15 = 0.6571428775787354
 eval_recall_at_11 = 0.4952380955219269
 eval_recall_at_120 = 0.6000000238418579
 eval_recall_at_140 = 0.6571428775787354
 eval_recall_at_15 = 0.5523809790611267
 loss = 8.021478652954102
 regularization_loss = 0
 total_loss = 8.021478652954102


### Saves the model

In [18]:
model.save('./tmp/tensorflow')





INFO:tensorflow:Assets written to: ./tmp/tensorflow/assets


INFO:tensorflow:Assets written to: ./tmp/tensorflow/assets


### Reloads the model

In [19]:
custom_objects = dict([(e.__class__.__name__, e) for e in task.metrics])
custom_objects.update({"Model": tr.model.base.Model})
model = tf.keras.models.load_model('./tmp/tensorflow', custom_objects=custom_objects)

In [20]:
batch = next(iter(eval_loader))

In [24]:
batch[0]['category-list_trim'].shape

TensorShape([64, 20])

In [22]:
model(batch[0])

<tf.Tensor: shape=(64, 50006), dtype=float32, numpy=
array([[-11.441303 , -11.963867 , -10.763562 , ..., -10.943857 ,
        -11.11692  , -10.279627 ],
       [-10.86855  , -11.575258 , -10.653306 , ..., -10.151318 ,
        -10.895669 , -11.588299 ],
       [-11.242266 , -11.753831 ,  -9.920208 , ..., -10.659088 ,
        -11.2164345, -13.153706 ],
       ...,
       [-10.279788 , -11.126778 , -11.503899 , ..., -10.39446  ,
        -12.894978 , -11.764042 ],
       [-11.198521 , -10.784124 ,  -9.615355 , ..., -11.636454 ,
        -11.982885 ,  -9.753237 ],
       [-11.469727 , -11.97579  , -10.524342 , ..., -11.250589 ,
        -11.533768 , -10.358003 ]], dtype=float32)>

## TRITON

In [1]:
import nvtabular as nvt

In [2]:
workflow= nvt.Workflow.load('workflow_etl')

In [3]:
workflow.output_dtypes

{'item_id_count': dtype('int32'),
 'day_first': dtype('int64'),
 'session_id': dtype('int64'),
 'category_list_trim': ListDtype(int64),
 'item_id_list_trim': ListDtype(int64),
 'timestamp/age_days_list_trim': ListDtype(float32),
 'timestamp/weekday/sin_list_trim': ListDtype(float32)}

In [4]:
import tritonhttpclient
try:
    triton_client = tritonhttpclient.InferenceServerClient(url="192.168.0.4:8000", verbose=True)
    print("client created.")
except Exception as e:
    print("channel creation failed: " + str(e))
triton_client.is_server_live()

client created.
GET /v2/health/live, headers None
<HTTPSocketPoolResponse status=200 headers={'content-length': '0', 'content-type': 'text/plain'}>




True

In [5]:
modelName = "tf4rec_tf"
print(triton_client.is_server_live())
print(triton_client.is_server_ready())
print(triton_client.is_model_ready(modelName,"1"))

GET /v2/health/live, headers None
<HTTPSocketPoolResponse status=200 headers={'content-length': '0', 'content-type': 'text/plain'}>
True
GET /v2/health/ready, headers None
<HTTPSocketPoolResponse status=200 headers={'content-length': '0', 'content-type': 'text/plain'}>
True
GET /v2/models/tf4rec_tf/versions/1/ready, headers None
<HTTPSocketPoolResponse status=200 headers={'content-length': '0', 'content-type': 'text/plain'}>
True


In [6]:
triton_client.get_model_repository_index()

POST /v2/repository/index, headers None

<HTTPSocketPoolResponse status=200 headers={'content-type': 'application/json', 'content-length': '52'}>
bytearray(b'[{"name":"tf4rec_tf","version":"1","state":"READY"}]')


[{'name': 'tf4rec_tf', 'version': '1', 'state': 'READY'}]

In [None]:
# triton_client.unload_model(model_name="movielens")
# triton_client.unload_model(model_name="movielens_nvt")
# triton_client.unload_model(model_name="movielens_tf")

In [7]:
triton_client.get_model_metadata(modelName)

GET /v2/models/tf4rec_tf, headers None
<HTTPSocketPoolResponse status=200 headers={'content-type': 'application/json', 'content-length': '430'}>
bytearray(b'{"name":"tf4rec_tf","versions":["1"],"platform":"tensorflow_savedmodel","inputs":[{"name":"category_list_trim","datatype":"INT64","shape":[-1,20]},{"name":"item_id_list_trim","datatype":"INT64","shape":[-1,20]},{"name":"timestamp/age_days_list_trim","datatype":"FP32","shape":[-1,20]},{"name":"timestamp/weekday/sin_list_trim","datatype":"FP32","shape":[-1,20]}],"outputs":[{"name":"output_1","datatype":"FP32","shape":[-1,506]}]}')


{'name': 'tf4rec_tf',
 'versions': ['1'],
 'platform': 'tensorflow_savedmodel',
 'inputs': [{'name': 'category_list_trim',
   'datatype': 'INT64',
   'shape': [-1, 20]},
  {'name': 'item_id_list_trim', 'datatype': 'INT64', 'shape': [-1, 20]},
  {'name': 'timestamp/age_days_list_trim',
   'datatype': 'FP32',
   'shape': [-1, 20]},
  {'name': 'timestamp/weekday/sin_list_trim',
   'datatype': 'FP32',
   'shape': [-1, 20]}],
 'outputs': [{'name': 'output_1', 'datatype': 'FP32', 'shape': [-1, 506]}]}

In [8]:
import pandas as pd
import cudf
interactions_merged_df=cudf.read_parquet('./sessions_by_day/1/train.parquet')
batch = interactions_merged_df[:20][['category_list_trim', 'item_id_list_trim', 'timestamp/age_days_list_trim', 'timestamp/weekday/sin_list_trim']]
batch.head()

Unnamed: 0,category_list_trim,item_id_list_trim,timestamp/age_days_list_trim,timestamp/weekday/sin_list_trim
0,"[21, 6, 2, 11, 39, 30, 22, 8, 2, 4, 4, 10, 9, ...","[61, 34, 13, 27, 121, 87, 71, 20, 16, 5, 4, 33...","[0.1853242, 0.36186182, 0.4854968, 0.13430548,...","[0.62441665, 0.5125444, 0.37621656, 0.8952086,..."
1,"[4, 4, 2, 4, 4, 26, 7, 6, 2, 7, 5, 6, 10, 10, ...","[7, 4, 12, 4, 4, 78, 18, 8, 13, 19, 10, 21, 31...","[0.32196924, 0.9394915, 0.40144306, 0.4010484,...","[0.06354862, 0.757689, 0.8743491, 0.47241625, ..."
2,"[11, 3, 7, 65, 12, 3, 71, 8, 19, 4, 3, 8, 3, 1...","[27, 3, 19, 203, 35, 6, 206, 22, 58, 5, 3, 20,...","[0.14342074, 0.9007808, 0.7347703, 0.04563174,...","[0.6583806, 0.9247837, 0.61639905, 0.8016512, ..."
4,"[4, 3, 2, 8, 14, 4, 8, 2, 6, 3, 11, 13, 4, 2, ...","[7, 2, 15, 22, 46, 4, 22, 13, 8, 2, 27, 38, 4,...","[0.33066002, 0.46732098, 0.67253935, 0.9536914...","[0.5335744, 0.8635724, 0.7746976, 0.14082146, ..."
5,"[3, 21, 4, 28, 2, 12, 10, 8, 2, 30, 3, 9, 16, ...","[6, 61, 7, 88, 13, 37, 31, 23, 15, 91, 3, 24, ...","[0.12875848, 0.97819775, 0.1008913, 0.50742507...","[0.03943971, 0.12544261, 0.41058296, 0.2909649..."


In [9]:
batch.shape

(20, 4)

In [10]:
batch.dtypes

category_list_trim                 list
item_id_list_trim                  list
timestamp/age_days_list_trim       list
timestamp/weekday/sin_list_trim    list
dtype: object

In [15]:
df = cudf.DataFrame({"x": [[1.,2,3], [4,5,6], [7,8,9]], })


df.x.list.leaves[0: 6].values_host.reshape(2, 3)

array([[1., 2., 3.],
       [4., 5., 6.]])

In [16]:
df.x.list.leaves[0: 6].values_host.reshape(2, 3)

array([[1., 2., 3.],
       [4., 5., 6.]])

In [12]:
batch['category_list_trim'].list.leaves[:200].values_host.reshape(10, 20)

array([[21,  6,  2, 11, 39, 30, 22,  8,  2,  4,  4, 10,  9,  6, 12,  5,
         6,  6,  5,  7],
       [ 4,  4,  2,  4,  4, 26,  7,  6,  2,  7,  5,  6, 10, 10,  3, 18,
        11, 12, 49,  5],
       [11,  3,  7, 65, 12,  3, 71,  8, 19,  4,  3,  8,  3, 16,  5, 34,
        10,  4,  5,  4],
       [ 4,  3,  2,  8, 14,  4,  8,  2,  6,  3, 11, 13,  4,  2, 34,  8,
         8,  8,  9,  3],
       [21,  4, 28,  2, 12, 10,  8,  2, 30,  3,  9, 16, 39, 17,  7,  3,
        29,  6, 16,  4],
       [ 8,  9,  6,  6, 27,  3,  2, 11,  8,  8,  7, 29,  2,  7, 13,  4,
        16,  3, 13,  3],
       [31, 18,  8,  5, 10, 12,  6,  5,  5, 33,  5, 14,  2, 12, 37,  8,
         7,  4, 15, 13],
       [ 3, 12, 39, 13,  8,  6,  4, 15,  5,  2, 11,  2,  2,  3,  8, 41,
         4,  4,  2,  3],
       [ 5,  4,  2, 30,  3, 11,  7,  6, 10, 38,  8, 10, 42,  4,  8,  7,
        16,  4, 29, 11],
       [ 3,  6, 10, 11, 49,  3, 11,  4,  5,  2,  7,  4,  6,  7,  5,  6,
        17,  4,  7,  9]])

In [13]:
import nvtabular.inference.triton as nvt_triton
import tritonclient.grpc as grpcclient

# inputs = nvt_triton.convert_df_to_triton_input(batch.columns, batch, grpcclient.InferInput)

In [14]:
type(batch)

cudf.core.dataframe.DataFrame

In [15]:
categ_ids = batch['category_list_trim'].list.leaves[:200].values_host.reshape(10, 20)
item_ids = batch['item_id_list_trim'].list.leaves[:200].values_host.reshape(10, 20)
age_days = batch['timestamp/age_days_list_trim'].list.leaves[:200].values_host.reshape(10, 20)
week_days = batch['timestamp/weekday/sin_list_trim'].list.leaves[:200].values_host.reshape(10, 20)

In [16]:
categ_ids.shape, item_ids.shape

((10, 20), (10, 20))

In [17]:
categ_ids.shape, week_days.shape

((10, 20), (10, 20))

In [34]:
# import numpy as np
# dtype = np.int64
# categ_ids = np.array(batch['inputs/category-list_trim'][0:10], dtype=dtype)[None,...] # make bs=1
# item_ids = np.array(batch['inputs/item_id-list_trim'][0:10], dtype=dtype)[None,...] # make bs=1

In [20]:
inputs = []
inputs.append(tritonhttpclient.InferInput('category_list_trim', categ_ids.shape, "INT64"))
inputs.append(tritonhttpclient.InferInput('item_id_list_trim', item_ids.shape, "INT64"))
inputs.append(tritonhttpclient.InferInput('timestamp/age_days_list_trim', age_days.shape, "FP32"))
inputs.append(tritonhttpclient.InferInput('timestamp/weekday/sin_list_trim', week_days.shape, "FP32"))

In [21]:
inputs[0].set_data_from_numpy(categ_ids, binary_data=False)
inputs[1].set_data_from_numpy(item_ids, binary_data=False)
inputs[2].set_data_from_numpy(age_days, binary_data=False)
inputs[3].set_data_from_numpy(week_days, binary_data=False)

In [22]:
inputs[2]._get_tensor()

{'name': 'timestamp/age_days_list_trim',
 'shape': (10, 20),
 'datatype': 'FP32',
 'data': [0.18532420694828033,
  0.36186182498931885,
  0.48549678921699524,
  0.13430547714233398,
  0.8259202241897583,
  0.7516406178474426,
  0.5529733300209045,
  0.5101205110549927,
  0.7000038623809814,
  0.011482291854918003,
  0.4896935224533081,
  0.19191522896289825,
  0.773910403251648,
  0.6394434571266174,
  0.8883877992630005,
  0.29751238226890564,
  0.47845229506492615,
  0.016086123883724213,
  0.3428962826728821,
  0.7435528635978699,
  0.32196924090385437,
  0.9394915103912354,
  0.4014430642127991,
  0.4010483920574188,
  0.9949255585670471,
  0.8156757354736328,
  0.4450187087059021,
  0.0044811926782131195,
  0.2978813648223877,
  0.5207319855690002,
  0.7596309185028076,
  0.17658932507038116,
  0.6271448135375977,
  0.198658749461174,
  0.7599001526832581,
  0.30431076884269714,
  0.35886406898498535,
  0.557636559009552,
  0.8161512613296509,
  0.9738840460777283,
  0.14342074096

In [23]:
outputs = []
outputs.append(
        tritonhttpclient.InferRequestedOutput('output_1', binary_data=False))

In [24]:
results = triton_client.infer(modelName, inputs, outputs=outputs)

POST /v2/models/tf4rec_tf/infer, headers None
{"inputs":[{"name":"category_list_trim","shape":[10,20],"datatype":"INT64","data":[21,6,2,11,39,30,22,8,2,4,4,10,9,6,12,5,6,6,5,7,4,4,2,4,4,26,7,6,2,7,5,6,10,10,3,18,11,12,49,5,11,3,7,65,12,3,71,8,19,4,3,8,3,16,5,34,10,4,5,4,4,3,2,8,14,4,8,2,6,3,11,13,4,2,34,8,8,8,9,3,21,4,28,2,12,10,8,2,30,3,9,16,39,17,7,3,29,6,16,4,8,9,6,6,27,3,2,11,8,8,7,29,2,7,13,4,16,3,13,3,31,18,8,5,10,12,6,5,5,33,5,14,2,12,37,8,7,4,15,13,3,12,39,13,8,6,4,15,5,2,11,2,2,3,8,41,4,4,2,3,5,4,2,30,3,11,7,6,10,38,8,10,42,4,8,7,16,4,29,11,3,6,10,11,49,3,11,4,5,2,7,4,6,7,5,6,17,4,7,9]},{"name":"item_id_list_trim","shape":[10,20],"datatype":"INT64","data":[61,34,13,27,121,87,71,20,16,5,4,33,25,8,37,11,14,34,10,19,7,4,12,4,4,78,18,8,13,19,10,21,31,33,6,54,28,35,162,10,27,3,19,203,35,6,206,22,58,5,3,20,3,47,11,107,30,5,9,7,7,2,15,22,46,4,22,13,8,2,27,38,4,12,104,20,20,23,24,6,61,7,88,13,37,31,23,15,91,3,24,45,112,51,18,6,100,21,45,7,20,25,14,8,92,6,13,29,22,22,19,100,13,18,38,5,

In [26]:
import numpy as np
np.array(results.get_response()['outputs'][0]['data']).shape

(5060,)

In [27]:
output0_data = results.as_numpy('output_1')

In [28]:
output0_data.shape

(10, 506)

In [31]:
output0_data

array([[ -8.485235 ,  -9.017825 ,  -6.982795 , ...,  -8.269795 ,
         -9.366974 ,  -8.534415 ],
       [ -9.220784 ,  -9.690337 ,  -7.0150924, ...,  -8.307517 ,
         -9.336738 ,  -8.855139 ],
       [ -9.577528 ,  -7.2755556,  -6.906867 , ...,  -7.2030134,
         -8.946048 ,  -9.925276 ],
       ...,
       [ -8.45824  ,  -7.5913186,  -7.6724224, ...,  -8.509451 ,
         -8.716337 ,  -9.1536875],
       [ -7.4180007,  -8.073153 ,  -8.186111 , ..., -10.251942 ,
         -9.014874 ,  -8.478142 ],
       [ -8.361289 ,  -8.316622 ,  -6.1037326, ...,  -9.193757 ,
         -9.545865 ,  -9.474741 ]], dtype=float32)

In [74]:
from tritonclient.utils import *
import tritonclient.http as httpclient
import nvtabular
import cudf

# # read in the workflow (to get input/output schema to call triton with)
# workflow = nvtabular.Workflow.load("/NVTabular/NVTabular/examples/rossman/repo/rossmann/1/workflow")
cols = ['inputs/category-list_trim', 'inputs/item_id-list_trim']
# read in a batch of data to get transforms for
batch = cudf.read_parquet("./sessions_by_day/1/train.parquet", num_rows=20)[['category-list_trim', 'item_id-list_trim']]


In [75]:
batch.columns= cols

In [76]:
batch.head()

Unnamed: 0,inputs/category-list_trim,inputs/item_id-list_trim
0,"[8, 3, 16, 5, 2, 2, 2, 25, 2, 7, 7, 8, 2, 5, 4...","[32, 8, 70, 18, 2, 5, 2, 120, 4, 26, 29, 34, 2..."
1,"[16, 8, 2, 3, 2, 2, 5, 12, 14, 7, 9, 15, 3, 2,...","[72, 35, 4, 11, 2, 2, 17, 54, 57, 26, 43, 68, ..."
2,"[9, 20, 2, 4, 4, 7, 17, 7, 4, 4, 2, 2, 3, 7, 3...","[39, 99, 5, 21, 6, 28, 81, 29, 14, 33, 3, 5, 1..."
4,"[5, 20, 5, 3, 3, 4, 7, 4, 3, 3, 7, 5, 3, 6, 2,...","[16, 96, 16, 10, 8, 33, 26, 9, 12, 10, 28, 16,..."
5,"[7, 6, 10, 3, 7, 2, 2, 49, 2, 14, 5, 3, 5, 2, ...","[28, 19, 38, 10, 26, 4, 7, 205, 7, 56, 15, 8, ..."


In [77]:
# convert the batch to a triton inputs
columns = [(col, batch[col]) for col in batch.columns]
inputs = [httpclient.InferInput(name, col.shape, np_to_triton_dtype(col.dtype)) for name, col in columns]

In [78]:
inputs[0]._get_tensor()

{'name': 'inputs/category-list_trim', 'shape': (20,), 'datatype': None}

In [50]:
# col = col.reshape(len(col), 1)
# input_tensor = input_class(name, col.shape, np_to_triton_dtype(col.dtype))
# input_tensor.set_data_from_numpy(col)

AttributeError: 'Series' object has no attribute 'reshape'

In [None]:
# placeholder variables for the output
outputs = [httpclient.InferRequestedOutput(name) for name in workflow.column_group.columns]

# make the request
with httpclient.InferenceServerClient("localhost:8000") as client:
    response = client.infer("rossmann", inputs, request_id="1",outputs=outputs)

# convert output from triton back to a nvt dataframe  
output = cudf.DataFrame({col: response.as_numpy(col) for col in workflow.column_group.columns})
print(output)

In [None]:
from tritonclient.utils import *
import tritonclient.http as httpclient
import nvtabular
import cudf
from timeit import default_timer as timer
from datetime import timedelta


# # read in a batch of data to get transforms for
# batch = cudf.read_parquet("/working_dir/Models_TF/Data/movielens/train.parquet", nrows=2)



inputs = [] 

for i, col in enumerate(batch.columns):
    print(col)
    d = batch[col].values_host.astype(np.int64)
    d = d.reshape(len(d),1)
    inputs.append(httpclient.InferInput(col, d.shape, np_to_triton_dtype(np.int64)))
    inputs[i].set_data_from_numpy(d)


In [107]:
import cupy
batch[col]

0     [8, 3, 16, 5, 2, 2, 2, 25, 2, 7, 7, 8, 2, 5, 4...
1     [16, 8, 2, 3, 2, 2, 5, 12, 14, 7, 9, 15, 3, 2,...
2     [9, 20, 2, 4, 4, 7, 17, 7, 4, 4, 2, 2, 3, 7, 3...
4     [5, 20, 5, 3, 3, 4, 7, 4, 3, 3, 7, 5, 3, 6, 2,...
5     [7, 6, 10, 3, 7, 2, 2, 49, 2, 14, 5, 3, 5, 2, ...
6     [34, 3, 3, 2, 6, 11, 2, 8, 4, 8, 11, 2, 4, 24,...
8     [8, 2, 10, 14, 2, 7, 35, 3, 3, 6, 6, 3, 7, 8, ...
9     [5, 8, 2, 8, 35, 4, 2, 2, 2, 19, 2, 37, 3, 9, ...
10    [3, 3, 6, 7, 8, 11, 7, 2, 9, 2, 7, 7, 5, 119, ...
11    [16, 24, 21, 5, 2, 2, 4, 2, 10, 3, 4, 61, 34, ...
12    [8, 4, 6, 2, 6, 8, 3, 3, 9, 9, 12, 5, 7, 6, 13...
13    [4, 7, 6, 8, 2, 2, 2, 2, 7, 3, 9, 2, 5, 11, 2,...
15    [6, 8, 12, 6, 64, 7, 4, 10, 6, 17, 11, 2, 13, ...
16    [2, 7, 15, 28, 2, 3, 2, 6, 2, 9, 2, 18, 3, 3, ...
17    [2, 3, 2, 6, 2, 6, 8, 12, 10, 24, 9, 2, 26, 8,...
18    [11, 10, 4, 5, 3, 2, 7, 6, 11, 7, 3, 4, 13, 3,...
19    [2, 5, 2, 2, 6, 11, 10, 18, 3, 14, 3, 4, 5, 4,...
20    [43, 12, 3, 2, 3, 2, 4, 8, 4, 15, 24, 7, 2

In [None]:
outputs = [httpclient.InferRequestedOutput("dense_3")]

with httpclient.InferenceServerClient("localhost:8000") as client:
    response = client.infer("movielens", inputs, request_id="1",outputs=outputs)

print(response.as_numpy("dense_3"))