In [1]:
import sys
import os
import glob
import gc

### Install requirements

In [2]:
# Install conda if not available
# curl https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -o /tmp/miniconda-installer.sh
# bash /tmp/miniconda-installer.sh

# Setup conda environment
# conda create -n rapids-22.02 -c rapidsai -c nvidia -c conda-forge python=3.8 cudatoolkit=11.2 cudf=22.02 dask-cudf=22.02
# conda activate rapids-22.02
# pip install nvtabular==0.11.0 tensorflow-gpu==2.8.0 merlin-models==0.2.0 transformers4rec==0.1.4 scipy==1.8.0 pynvml==11.4.1 ipykernel
# python -m ipykernel install --user --name=rapids-22.02

In [3]:
import pandas as pd
import numpy as np
import tensorflow as tf

import cudf
import dask_cudf

import nvtabular as nvt
from nvtabular.ops import *

from merlin.schema.tags import Tags
from merlin.schema import Schema

import merlin.models.tf as mm
import merlin.models.tf.dataset as tf_dataloader

from merlin.io.dataset import Dataset
from merlin.schema.io.tensorflow_metadata import TensorflowMetadata
from merlin.models.tf.blocks.core.aggregation import CosineSimilarity

from utils.fit_transform import workflow_fit_transform
from utils.save_visualize import save_results, print_results

2022-04-03 16:40:07.219763: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1525] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 8080 MB memory:  -> device: 0, name: Tesla V100-PCIE-16GB, pci bus id: 0001:00:00.0, compute capability: 7.0


In [4]:
output_path = './data/processed'
train_path = './data/train/*.parquet'
test_path = './data/test/*.parquet'

In [5]:
# Create data folders
!mkdir -p ./data/train ./data/test

In [6]:
# Define variables
TIME_KEY = "timestamp"
USER_KEY = "visitorid"
ITEM_KEY = "itemid"
SESSION_KEY = "sessionid"
TRANSACTION_KEY = "transactionid"
TARGET_KEY = "target"

### Load datasets

In [7]:
# Convert to parquet
train_sessions = pd.read_pickle("data/02_train_sessions.pkl")

In [8]:
# Introduce target variable
train_sessions[TARGET_KEY] = train_sessions[TRANSACTION_KEY].fillna(0)
train_sessions.loc[train_sessions[TARGET_KEY] != 0, TARGET_KEY] = 1
train_sessions[TARGET_KEY].value_counts()

0.0    260620
1.0      8697
Name: target, dtype: int64

In [9]:
train_sessions.to_parquet("data/train/02_train_sessions.parquet")

In [10]:
# Convert to parquet
test_sessions = pd.read_pickle("data/02_test_sessions.pkl")

In [11]:
# Introduce target variable
test_sessions[TARGET_KEY] = test_sessions[TRANSACTION_KEY].fillna(0)
test_sessions.loc[test_sessions[TARGET_KEY] != 0, TARGET_KEY] = 1
test_sessions[TARGET_KEY].value_counts()

0.0    34715
1.0     1308
Name: target, dtype: int64

In [12]:
test_sessions.to_parquet("data/test/02_test_sessions.parquet")

In [13]:
df = dask_cudf.read_parquet(train_path)
df.head()

Unnamed: 0,timestamp,visitorid,event,itemid,transactionid,sessionid,target
0,1435607175,75,view,257575,,98,0.0
1,1435607242,75,view,257575,,98,0.0
2,1435609434,75,view,257575,,99,0.0
3,1435609596,75,view,257575,,99,0.0
4,1435609771,75,view,257575,,99,0.0


In [14]:
df.columns

Index(['timestamp', 'visitorid', 'event', 'itemid', 'transactionid',
       'sessionid', 'target'],
      dtype='object')

In [15]:
df[[USER_KEY, ITEM_KEY, TRANSACTION_KEY, TARGET_KEY]].head()

Unnamed: 0,visitorid,itemid,transactionid,target
0,75,257575,,0.0
1,75,257575,,0.0
2,75,257575,,0.0
3,75,257575,,0.0
4,75,257575,,0.0


In [16]:
# Check the total number or rows
len(df)

269317

In [17]:
# Print item stats
print('Minimum item id: ' + str(df[ITEM_KEY].min().compute()))
print('Maximum item id: ' + str(df[ITEM_KEY].max().compute()))
print('Unique item id: ' + str(df[ITEM_KEY].unique().shape[0].compute()))

Minimum item id: 15
Maximum item id: 466864
Unique item id: 39974


In [18]:
# Print user stats
print('Minimum user id: ' + str(df[USER_KEY].min().compute()))
print('Maximum user id: ' + str(df[USER_KEY].max().compute()))
print('Unique user id: ' + str(df[USER_KEY].unique().shape[0].compute()))

Minimum user id: 75
Maximum user id: 1407573
Unique user id: 9474


In [19]:
# Check the distribution of the target class. We can see that the dataset is imbalanced.
df[TARGET_KEY].value_counts().compute()

0.0    260620
1.0      8697
Name: target, dtype: int64

### Merlin Models

[Merlin Models](https://github.com/NVIDIA-Merlin/models) is a library to make it easy for users in industry or academia to train and deploy recommender models with best practices baked into the library. This will let users in industry easily train standard models against their own dataset, getting high performance GPU accelerated models into production. This will also let researchers to build custom models by incorporating standard components of deep learning recommender models, and then benchmark their new models on example offline datasets.

In [20]:
%%time

## Define NVT the pipeline
# We add tags for user_id, item_id and target. NVTabular will provide an output file
# We categorify the user_id and item_id to be continuous integers 0, ..., |C|
user_id = [USER_KEY] >> AddMetadata(tags=[Tags.USER_ID]) >> Categorify(freq_threshold=5)
item_id = [ITEM_KEY] >> AddMetadata(tags=[Tags.ITEM_ID]) >> Categorify(freq_threshold=5)
targets = [TARGET_KEY] >> AddMetadata(
    tags=[str(Tags.BINARY_CLASSIFICATION), TARGET_KEY]
)

# Add more features
add_feat = ["event"] >> nvt.ops.Categorify()

# Add target encoding
te_feat = (
    [USER_KEY, ITEM_KEY] + add_feat >>
    TargetEncoding(
        [TARGET_KEY],
        kfold=1,
        p_smooth=20
    ) >>
    Normalize()
)

outputs = user_id + item_id + targets + add_feat + te_feat

etl_description = 'etl'

CPU times: user 272 µs, sys: 95 µs, total: 367 µs
Wall time: 373 µs


In [21]:
%%time

# Run NVTabular workflow
workflow_fit_transform(outputs, train_path, test_path, output_path)



CPU times: user 3.33 s, sys: 183 ms, total: 3.51 s
Wall time: 3.51 s


In [22]:
# Load schema
schema = TensorflowMetadata.from_proto_text_file(output_path + '/train/').to_merlin_schema()

In [23]:
target_column = schema.select_by_tag(Tags.TARGET).column_names[0]

In [24]:
# Define model
model = mm.DCNModel(
    schema,
    depth=2,
    deep_block=mm.MLPBlock([64, 32]),
    prediction_tasks=mm.BinaryClassificationTask(target_column, 
                                                 metrics=[tf.keras.metrics.AUC(), tf.keras.metrics.Accuracy(),
                                                         tf.keras.metrics.Precision(), tf.keras.metrics.Recall()])
)
model_description = "DCN"

In [25]:
model.compile(optimizer="adam", run_eagerly=False)
model_description = "DCN_adam"

In [26]:
batch_size = 16 * 1024

train_dl = tf_dataloader.BatchedDataset(
    Dataset(output_path + '/train/*.parquet', part_size="500MB"),
    batch_size = batch_size,
    label_names = [target_column],
    shuffle= True,
    schema = schema,
)

test_dl = tf_dataloader.BatchedDataset(
    Dataset(output_path + '/test/*.parquet', part_size="500MB"),
    batch_size = batch_size,
    label_names = [target_column],
    shuffle = False,
    schema = schema,
)



In [27]:
%%time
model.fit(train_dl, validation_data=test_dl)

2022-04-03 16:40:14.522822: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.




2022-04-03 16:40:20.387020: W tensorflow/core/grappler/optimizers/loop_optimizer.cc:907] Skipping loop optimization for Merge node with control input: cond/then/_0/cond/cond/branch_executed/_122


CPU times: user 6.8 s, sys: 442 ms, total: 7.25 s
Wall time: 6.95 s


<keras.callbacks.History at 0x7f7c847ddfa0>

In [28]:
# model_des = model_description + '_' + etl_description
for key, value in  model.history.history.items():
    print('%s:%s' % (key, value[0]))

auc:0.8660605549812317
accuracy:0.0
precision:0.12029604613780975
recall:0.6634471416473389
loss:0.3338724672794342
regularization_loss:0.0
total_loss:0.3338724672794342
val_auc:0.994050145149231
val_accuracy:0.0
val_precision:1.0
val_recall:0.3845565617084503
val_loss:0.3123938739299774
val_regularization_loss:0.0
val_total_loss:0.3123938739299774


### Cleanup

In [29]:
# Let's delete the dataframe to free GPU-memory
del df
gc.collect()

143

In [30]:
# We now shutdown the notebook to free GPU-memory
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}