In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
torchrec_root = '../'

import sys
sys.path.append(torchrec_root)

In [3]:
import random
import numpy as np
import pandas as pd

import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn

import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torch.optim import Adam

In [4]:
pd.options.mode.chained_assignment = None  # default='warn'
pd.options.display.max_rows = 999
pd.options.display.max_columns = 100

In [5]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

cpu


## Load data

In [6]:
import joblib

df_samples = joblib.load(f'{torchrec_root}/data/amazon_data.pkl')

In [7]:
len( df_samples['reviewerID'].unique() )

192403

In [8]:
len( df_samples['asin'].unique() )

63001

In [9]:
len( df_samples['brand'].unique() )

3526

In [10]:
len( df_samples['categories'].unique() )

801

In [11]:
# sequence features from latest to oldest
df_samples['his_asin_seq'] = df_samples['his_asin_seq'].map(lambda x: x[::-1])

## Create pytorch dataset

In [12]:
## Hash buckets
# feat_configs = [
#     {"name": "reviewerID", "dtype": "category", "emb_dim": 12, "min_freq": 3, "hash_buckets": 1000000},
#     {"name": "asin", "dtype": "category", "emb_dim": 12, "min_freq": 3, "hash_buckets": 1000000},
    
#     {"name": "price", "dtype": "numerical", "norm": "std"},
#     {"name": "brand", "dtype": "category", "min_freq": 3, "emb_dim": 12},
#     {"name": "categories", "dtype": "category", "min_freq": 3, "emb_dim": 12},

#     {"name": "his_asin_seq", "dtype": "category", "islist": True, "min_freq": 3, "emb_dim": 12, "hash_buckets": 1000000},
# ]

## Dynamic Embedding
# feat_configs = [
#     {"name": "reviewerID", "dtype": "category", "emb_dim": 12, "min_freq": 3},
#     {"name": "asin", "dtype": "category", "emb_dim": 12, "min_freq": 3},
    
#     {"name": "price", "dtype": "numerical", "norm": "std"},
#     {"name": "brand", "dtype": "category", "min_freq": 3, "emb_dim": 12},
#     {"name": "categories", "dtype": "category", "min_freq": 3, "emb_dim": 12},

#     {"name": "his_asin_seq", "dtype": "category", "islist": True, "min_freq": 3, "emb_dim": 12},
# ]

## Auto generate feat_configs
from torchrec.utils import auto_generate_feature_configs
feat_configs = auto_generate_feature_configs(
    df_samples[['reviewerID', 'asin', 'price', 'brand', 'categories', 'his_asin_seq']]
)

print(feat_configs)

target_cols = ['label', ]

[{'name': 'reviewerID', 'dtype': 'category', 'emb_dim': 17, 'min_freq': 3}, {'name': 'asin', 'dtype': 'category', 'emb_dim': 15, 'min_freq': 3}, {'name': 'price', 'dtype': 'numerical', 'norm': 'std', 'mean': 74.40153304932919, 'std': 123.75264929566384}, {'name': 'brand', 'dtype': 'category', 'emb_dim': 11, 'min_freq': 3}, {'name': 'categories', 'dtype': 'category', 'emb_dim': 9, 'min_freq': 3}, {'name': 'his_asin_seq', 'dtype': 'category', 'islist': True, 'emb_dim': 15, 'min_freq': 3, 'max_len': 256}]


In [13]:
from torchrec.sample import traintest_split

df_train, df_test = traintest_split(df_samples, test_size=0.2, shuffle=True, group_id='reviewerID')
print(len(df_train), len(df_test))

1352538 336650


In [14]:
# from torchrec.dataset import FeatureTransformer

# transformer = FeatureTransformer(feat_configs)

# df_train = transformer.transform(df_train, is_train=True, n_jobs=4)
# df_test = transformer.transform(df_test, is_train=False, n_jobs=4)

In [15]:
import polars as pl

df_train = pl.from_pandas(df_train)
df_test = pl.from_pandas(df_test)

In [16]:
from torchrec.dataset import DataFrameDataset

train_dataset = DataFrameDataset(df_train, feat_configs, target_cols, is_raw=True, is_train=True, n_jobs=1, verbose=True)

feat_configs = train_dataset.transformer.get_feat_configs()
test_dataset = DataFrameDataset(df_test, feat_configs, target_cols, is_raw=True, is_train=False, n_jobs=3)

==> Feature transforming (is_train=True), note that feat_configs will be updated when is_train=True...
Input dataFrame type: <class 'polars.dataframe.frame.DataFrame'>, transform it by FeatureTransformerPolars
Processing feature reviewerID...
Converting category reviewerID to indices...
Feature reviewerID vocab size: None -> 153923
Processing feature asin...
Converting category asin to indices...
Feature asin vocab size: None -> 62384
Processing feature price...
Feature price mean: 74.40153304932919, std: 123.75264929566384, min: 0.01, max: 999.99
Processing feature brand...
Converting category brand to indices...
Feature brand vocab size: None -> 3503
Processing feature categories...
Converting category categories to indices...
Feature categories vocab size: None -> 800
Processing feature his_asin_seq...
Converting category his_asin_seq to indices...
Feature his_asin_seq vocab size: None -> 61925
==> Feature transforming (is_train=True) done...
==> Dense features: ['price']
==> Sparse

In [17]:
# max([v['idx'] for k,v in feat_configs[3]['vocab'].items()])
# feat_configs[3]['num_embeddings']

In [18]:
df_train.head()

reviewerID,asin,unixReviewTime,overall,title,price,brand,categories,label,his_asin_seq
str,str,i64,f64,str,f64,str,str,i64,list[str]
"""A1PC34OXBBHDEF""","""B0000BZL5A""",1375660800,4.0,"""B+W 58mm Kaesemann Circular Po…",89.0,"""B+W""","""Polarizing Filters""",1,"[""B0000BZL1P""]"
"""A27A0U9O1HUSC8""","""B007IV7KRU""",1342137600,5.0,"""OtterBox Defender Series Case …",64.97,"""OtterBox""","""Cases""",1,"[""B004JMZZE6"", ""B005O22Y7G"", ""B004RDWVUS""]"
"""A1FMND912KUYSX""","""B006ZW4HY2""",1397088000,5.0,"""Olympus VN-702PC Voice Recorde…",58.98,"""Olympus""","""Digital Voice Recorders""",1,"[""B003ANVQWU"", ""B000RT77I2"", ""B000652M6Y""]"
"""A27LEATCCMJJF4""","""B001D2LJ3Q""",1357516800,4.0,"""Manfrotto 701HDV Pro Fluid Vid…",299.99,,"""Tripod Heads""",1,"[""B005FYNSPK"", ""B003D5MZUW""]"
"""A1SMJ3J49A59FX""","""B008DWYBZM""",1365724800,4.0,"""Bear Motion Luxury Buffalo Hid…",49.99,"""Bear Motion""","""Cases""",1,"[""B005CLPP84"", ""B005U0M9B8"", ""B002L6HDTC""]"


In [19]:
train_dataloader = DataLoader(train_dataset, batch_size=512, num_workers=2, shuffle=True, collate_fn=train_dataset.collate_fn)
test_dataloader = DataLoader(test_dataset, batch_size=512, num_workers=2, shuffle=False, collate_fn=test_dataset.collate_fn)

In [20]:
print( len(train_dataloader) )
for features, labels in DataLoader(train_dataset, batch_size=2, num_workers=0, shuffle=True, collate_fn=train_dataset.collate_fn):
    print(features)
    print(labels)
    break

2642
{'dense_features': tensor([[-0.2781],
        [-0.5528]]), 'reviewerID': tensor([[76537],
        [78104]], dtype=torch.int32), 'asin': tensor([[19008],
        [26286]], dtype=torch.int32), 'brand': tensor([[1961],
        [1961]], dtype=torch.int32), 'categories': tensor([[351],
        [184]], dtype=torch.int32), 'his_asin_seq': tensor([[42381,  5008],
        [21211,  -100]], dtype=torch.int32)}
tensor([[1.],
        [1.]])


## Train Model

In [21]:
from model import DNN

dnn_hidden_units = [128,64,32]
model = DNN(feat_configs, hidden_units=dnn_hidden_units)
model = model.to(device)
print(model)

==> Model Input: dense_size=1, sparse_size=67
DNN(
  (embeddings): ModuleDict(
    (reviewerID): Embedding(153923, 17)
    (asin): Embedding(62384, 15)
    (brand): Embedding(3504, 11)
    (categories): Embedding(800, 9)
    (his_asin_seq): Embedding(61926, 15)
  )
  (tower): Sequential(
    (0): Linear(in_features=68, out_features=128, bias=True)
    (1): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Dropout(p=0.5, inplace=False)
    (4): Linear(in_features=128, out_features=64, bias=True)
    (5): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU()
    (7): Dropout(p=0.5, inplace=False)
    (8): Linear(in_features=64, out_features=32, bias=True)
    (9): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU()
    (11): Dropout(p=0.5, inplace=False)
    (12): Linear(in_features=32, out_features=1, bias=True)
  )
)


In [22]:
optimizer = Adam(model.parameters(),  lr = 0.002, weight_decay = 1e-9)
lr_scd = lr_scheduler.StepLR(optimizer, step_size=len(train_dataloader), gamma=0.8)

In [23]:
from torchrec.trainer import Trainer

trainer = Trainer(
    model, 
    optimizer=optimizer,
    lr_scheduler=lr_scd,
    max_epochs=5,
    early_stopping_rounds=3,
    save_ckpt_path='./ckpt/'
)

model = trainer.fit(train_dataloader, eval_dataloader = test_dataloader, ret_model = 'final') #, init_ckpt_path='./ckpt/')

INFO:Trainer:[Validation] Epoch: 0/5, Validation Loss: {'loss': 0.7081270164450614}
INFO:Trainer:Learning rate: 0.002
INFO:Trainer:[Training] Epoch: 1/5 iter 0/2642, Training Loss: {'loss': 0.7866648435592651}
INFO:Trainer:[Training] Epoch: 1/5 iter 100/2642, Training Loss: {'loss': 0.4818296018242836}
INFO:Trainer:[Training] Epoch: 1/5 iter 200/2642, Training Loss: {'loss': 0.43139001533389093}
INFO:Trainer:[Training] Epoch: 1/5 iter 300/2642, Training Loss: {'loss': 0.4124824810028076}
INFO:Trainer:[Training] Epoch: 1/5 iter 400/2642, Training Loss: {'loss': 0.4021803920716047}
INFO:Trainer:[Training] Epoch: 1/5 iter 500/2642, Training Loss: {'loss': 0.39550615698099134}
INFO:Trainer:[Training] Epoch: 1/5 iter 600/2642, Training Loss: {'loss': 0.39065128356218337}
INFO:Trainer:[Training] Epoch: 1/5 iter 700/2642, Training Loss: {'loss': 0.3867773515837533}
INFO:Trainer:[Training] Epoch: 1/5 iter 800/2642, Training Loss: {'loss': 0.3838881490752101}
INFO:Trainer:[Training] Epoch: 1/5 

In [24]:
ckpt = trainer.load_ckpt('./ckpt')
model.load_state_dict(ckpt['model'].state_dict())

INFO:Trainer:Loaded model state_dict from checkpoint.
INFO:Trainer:Loaded model.training from checkpoint.
INFO:Trainer:Loaded model.feat_configs from checkpoint.
INFO:Trainer:Loaded optimizer = Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    differ... from checkpoint.
INFO:Trainer:Loaded lr_scheduler = <torch.optim.lr_scheduler.StepLR object at 0x31ddc0700> from checkpoint.
INFO:Trainer:Loaded logger = <Logger Trainer (INFO)> from checkpoint.
INFO:Trainer:Loaded ckpt_file_prefix = checkpoint from checkpoint.
INFO:Trainer:Loaded num_epoch = 4 from checkpoint.
INFO:Trainer:Loaded global_steps = 10568 from checkpoint.
INFO:Trainer:Loaded save_ckpt_path = ./ckpt/ from checkpoint.
INFO:Trainer:Loaded metadata_fn = ./ckpt//metadata.json from checkpoint.
INFO:Trainer:Loaded max_epochs = 5 from checkpoint.
INFO:Trainer:Loaded early_stopping_rounds = 3 from checkpoint.
INFO:Trainer:Checkpoint loaded from ./ckpt/checkpoint.010568.ckpt.


<All keys matched successfully>

In [25]:
test_preds = []
test_labels = []
model.eval()

for features, labels in test_dataloader:
    outputs = model(features)
    test_preds.append(outputs[:,0])
    test_labels.append(labels[:,0])
test_preds = torch.concat(test_preds, dim=0).detach().cpu().numpy()
test_labels = torch.concat(test_labels, dim=0).detach().cpu().numpy()

In [26]:
print(test_preds.shape, test_labels.shape)

(336650,) (336650,)


In [27]:
from sklearn.metrics import roc_auc_score

auc_score = roc_auc_score(test_labels, test_preds)
print("AUC Score:", auc_score)

AUC Score: 0.673544966279805


# Generate and Test service

In [28]:
# Run this on terminal under root of project
# !python -m torchrec.serve --name dnn --path examples/ckpt/checkpoint.010568.ckpt --dep_paths examples

import os
ret = os.system('curl http://localhost:8000/dnn/health')

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
curl: (7) Failed to connect to localhost port 8000 after 2 ms: Connection refused


In [29]:
from torchrec.serve import test_predict

if ret == 0:
    test_predict(df_samples.sample(3), name='dnn')