##### reference https://www.kaggle.com/code/yamsam/recbole-gru4rec-sample-code

### Generate the RecBole atomic file (生成原子文件)

In [2]:
import os
import cudf
import gc
import datetime
import polars as pl
IS_TRAIN = True

# Select the input parquet files and the working directory for the chosen mode.
if IS_TRAIN:
    train_file = '/root/autodl-tmp/ottodata/valid/train.parquet'
    test_file = '/root/autodl-tmp/ottodata/valid/test.parquet'
    data_dir = '/root/autodl-tmp/ottodata/tmp/gru_train/'
else:
    train_file = '/root/autodl-tmp/ottodata/train.parquet'
    test_file = '/root/autodl-tmp/ottodata/test.parquet'
    data_dir = '/root/autodl-tmp/ottodata/tmp/gru_test/'

# Keep only the trailing window of the training log: rows whose `ts` is within
# 7 * 24 * 3600 of the maximum `ts` (one week if `ts` is in seconds -- TODO confirm).
train = cudf.read_parquet(train_file)
last_week_time = train['ts'].max() - 7 * 24 * 3600
train = train[train['ts'] > last_week_time]
test = cudf.read_parquet(test_file)

# Train and test interactions are stacked and handed over to polars via pandas.
df = pl.from_pandas(cudf.concat([train, test]).to_pandas())
# test = pl.read_parquet('../data/test.parquet')
# valid2 = pl.read_parquet('../data/valid2.parquet')
# valid3 = pl.read_parquet('../data/valid3.parquet')

# print (f"       test  : before={datetime.datetime.fromtimestamp(test['ts'].min())} - {datetime.datetime.fromtimestamp(test['ts'].max())}")
# print (f"     valid2  : before={datetime.datetime.fromtimestamp(valid2['ts'].min())} - {datetime.datetime.fromtimestamp(valid2['ts'].max())}")
# print (f"     valid3  : before={datetime.datetime.fromtimestamp(valid3['ts'].min())} - {datetime.datetime.fromtimestamp(valid3['ts'].max())}")

# df = pl.concat([valid2, valid3, test])  # concatenate them and train the model on all of it
# del test, valid2, valid3

# Sort, rescale `ts` by 1e9 and rename the columns to the `name:type` headers
# written into the .inter file.
df = (
    df.sort(['session', 'aid', 'ts'])
    .with_columns((pl.col('ts') * 1e9).alias('ts'))
    .rename({'session': 'session:token', 'aid': 'aid:token', 'ts': 'ts:float'})
)

# Write the tab-separated interaction file into <data_dir>/gru/.
os.makedirs(f'{data_dir}/gru', exist_ok=True)
df.select(['session:token', 'aid:token', 'ts:float']).write_csv(f'{data_dir}/gru/gru.inter', sep='\t')
del df

### Parameter settings for training the GRU model (训练gru模型的参数设置)

In [3]:
# !!! Key setting, referenced by the model config and by the inference cells below.
MAX_ITEM = 20   # number of aids per user (session); used for coarse ranking (candidate generation) and embedding computation

In [4]:

# RecBole configuration overrides passed to Config(config_dict=...).
parameter_dict = dict(
    # --- data location and field mapping ---
    data_path=data_dir,
    USER_ID_FIELD='session',
    ITEM_ID_FIELD='aid',
    TIME_FIELD='ts',
    # interaction-count intervals used to filter users / items
    user_inter_num_interval="[5,Inf)",
    item_inter_num_interval="[5,Inf)",
    load_col=dict(inter=['session', 'aid', 'ts']),
    # train_neg_sample_args=None,

    # --- persistence ---
    save_dataset=True,
    save_dataloaders=True,
    # dataloaders_save_path='../data/gru',
    # dataset_save_path='../data/gru',
    checkpoint_dir=f'{data_dir}/gru/',

    # --- training ---
    epochs=10,
    stopping_step=3,
    loss_type='BPR',
    eval_batch_size=1024,
    # train_batch_size=1024,
    # enable_amp=True,
    MAX_ITEM_LIST_LENGTH=MAX_ITEM,  # sequence length fed to the model

    # --- evaluation ---
    eval_args=dict(
        split=dict(RS=[9, 1, 0]),
        group_by='user',
        order='TO',
        mode='full',
    ),
    topk=[20, 200],
    valid_metric='Recall@200',
)

In [5]:
from recbole.quick_start import load_data_and_model
from typing import List, Tuple
from pydantic import BaseModel
import torch
from recbole.data.interaction import Interaction
import pandas as pd
import numpy as np
import logging
from logging import getLogger
from collections import defaultdict
from recbole.config import Config
from recbole.utils import init_seed, init_logger
from recbole.data import create_dataset, data_preparation
from recbole.trainer import Trainer
from recbole.quick_start import load_data_and_model
from recbole.model.sequential_recommender import GRU4Rec

# Build the RecBole config; the dataset name must match the folder name under data_path.
config = Config(model='GRU4Rec', dataset='gru', config_dict=parameter_dict)
init_seed(config['seed'], config['reproducibility'])

# logger initialization
init_logger(config)
logger = getLogger()

# Only attach a console handler when the root logger does not have one yet.
# The recorded output of this notebook shows every record twice (once formatted,
# once bare), i.e. a console handler was already installed when the extra one was
# added; re-running the cell would stack one more each time.
# FileHandler subclasses StreamHandler, so it is excluded explicitly.
has_console_handler = any(
    isinstance(handler, logging.StreamHandler)
    and not isinstance(handler, logging.FileHandler)
    for handler in logger.handlers
)
if not has_console_handler:
    c_handler = logging.StreamHandler()
    c_handler.setLevel(logging.INFO)
    logger.addHandler(c_handler)

# write config info into log
logger.info(config)

28 Jan 17:32    INFO  
General Hyper Parameters:
gpu_id = 0
use_gpu = True
seed = 2020
state = INFO
reproducibility = True
data_path = /root/autodl-tmp/ottodata/tmp/gru_train/gru
checkpoint_dir = /root/autodl-tmp/ottodata/tmp/gru_train//gru/
show_progress = True
save_dataset = True
dataset_save_path = None
save_dataloaders = True
dataloaders_save_path = None
log_wandb = False

Training Hyper Parameters:
epochs = 10
train_batch_size = 2048
learner = adam
learning_rate = 0.001
neg_sampling = {'uniform': 1}
eval_step = 1
stopping_step = 3
clip_grad_norm = None
weight_decay = 0.0
loss_decimal_place = 4

Evaluation Hyper Parameters:
eval_args = {'split': {'RS': [9, 1, 0]}, 'group_by': 'user', 'order': 'TO', 'mode': 'full'}
repeatable = True
metrics = ['Recall', 'MRR', 'NDCG', 'Hit', 'Precision']
topk = [20, 200]
valid_metric = Recall@200
valid_metric_bigger = True
eval_batch_size = 1024
metric_decimal_place = 4

Dataset Hyper Parameters:
field_separator = 	
seq_separator =  
USER_ID_FIELD =

In [6]:
# Build the RecBole dataset described by `config` and log its summary
# (user / item / interaction counts, see the output below).
dataset = create_dataset(config)
logger.info(dataset)

28 Jan 17:42    INFO  Saving filtered dataset into [/root/autodl-tmp/ottodata/tmp/gru_train//gru/gru-dataset.pth]
Saving filtered dataset into [/root/autodl-tmp/ottodata/tmp/gru_train//gru/gru-dataset.pth]
28 Jan 17:42    INFO  gru
The number of users: 2874632
Average actions of users: 18.130091827438026
The number of items: 957239
Average actions of items: 54.44552347483071
The number of inters: 52117324
The sparsity of the dataset: 99.99810600220839%
Remain Fields: ['session', 'aid', 'ts']
gru
The number of users: 2874632
Average actions of users: 18.130091827438026
The number of items: 957239
Average actions of items: 54.44552347483071
The number of inters: 52117324
The sparsity of the dataset: 99.99810600220839%
Remain Fields: ['session', 'aid', 'ts']


In [7]:
# dataset splitting: produce the train / valid / test data objects from the dataset
# (the split and evaluation settings come from `eval_args` in the config).
train_data, valid_data, test_data = data_preparation(config, dataset)

28 Jan 17:53    INFO  Saving split dataloaders into: [/root/autodl-tmp/ottodata/tmp/gru_train//gru/gru-for-GRU4Rec-dataloader.pth]
Saving split dataloaders into: [/root/autodl-tmp/ottodata/tmp/gru_train//gru/gru-for-GRU4Rec-dataloader.pth]
28 Jan 17:53    INFO  [Training]: train_batch_size = [2048] negative sampling: [{'uniform': 1}]
[Training]: train_batch_size = [2048] negative sampling: [{'uniform': 1}]
28 Jan 17:53    INFO  [Evaluation]: eval_batch_size = [1024] eval_args: [{'split': {'RS': [9, 1, 0]}, 'group_by': 'user', 'order': 'TO', 'mode': 'full'}]
[Evaluation]: eval_batch_size = [1024] eval_args: [{'split': {'RS': [9, 1, 0]}, 'group_by': 'user', 'order': 'TO', 'mode': 'full'}]


In [8]:
# model loading and initialization: build GRU4Rec on the training dataset and
# move it to the device selected by the config
model = GRU4Rec(config, train_data.dataset).to(config['device'])
logger.info(model)

# trainer loading and initialization
trainer = Trainer(config, model)

# model training
# NOTE(review): the recorded run below ends with KeyboardInterrupt after epoch 0,
# so best_valid_score / best_valid_result were not assigned in that run.
best_valid_score, best_valid_result = trainer.fit(train_data, valid_data)

28 Jan 17:53    INFO  GRU4Rec(
  (item_embedding): Embedding(957239, 64, padding_idx=0)
  (emb_dropout): Dropout(p=0.3, inplace=False)
  (gru_layers): GRU(64, 128, bias=False, batch_first=True)
  (dense): Linear(in_features=128, out_features=64, bias=True)
  (loss_fct): BPRLoss()
)
Trainable parameters: 61345280
GRU4Rec(
  (item_embedding): Embedding(957239, 64, padding_idx=0)
  (emb_dropout): Dropout(p=0.3, inplace=False)
  (gru_layers): GRU(64, 128, bias=False, batch_first=True)
  (dense): Linear(in_features=128, out_features=64, bias=True)
  (loss_fct): BPRLoss()
)
Trainable parameters: 61345280
28 Jan 18:15    INFO  epoch 0 training [time: 1308.71s, train loss: 3511.2605]
epoch 0 training [time: 1308.71s, train loss: 3511.2605]
28 Jan 19:38    INFO  epoch 0 evaluating [time: 4937.23s, valid_score: 0.207300]
epoch 0 evaluating [time: 4937.23s, valid_score: 0.207300]
28 Jan 19:38    INFO  valid result: 
recall@20 : 0.0624    recall@200 : 0.2073    mrr@20 : 0.0167    mrr@200 : 0.0191 

KeyboardInterrupt: 

In [None]:
# Drop the trainer and the data objects and trigger a garbage-collection pass.
# `model` and `dataset` are kept: pred_user_to_item below reads both.
del trainer, train_data, valid_data, test_data
gc.collect()

### Use the trained model (使用训练好的模型)

In [None]:
# https://qiita.com/fufufukakaka/items/e03df3a7299b2b8f99cf
from typing import List, Tuple
import numpy as np
import torch

from pydantic import BaseModel
from recbole.data import create_dataset
from recbole.data.dataset.sequential_dataset import SequentialDataset
from recbole.data.interaction import Interaction
from recbole.model.sequential_recommender.sine import SINE
from recbole.utils import get_model, init_seed

class ItemHistory(BaseModel):
    """Input of pred_user_to_item: one item history plus the number of items to return."""
    sequence: List[str]  # item tokens (AIDs) of the history
    topk: int  # how many top-scoring items to select

class RecommendedItems(BaseModel):
    """Schema of a recommendation result: scores and the matching item tokens.

    NOTE(review): not instantiated in the visible code; pred_user_to_item
    returns a plain dict with the same two keys.
    """
    score_list: List[float]  # scores of the recommended items
    item_list: List[str]  # item tokens of the recommended items

def pred_user_to_item(item_history: ItemHistory, get_emb=None):
    """Run the trained sequential model on one item history.

    Relies on the notebook-level globals ``model``, ``dataset`` and ``MAX_ITEM``.

    Args:
        item_history: history to score; ``sequence`` holds the item tokens (AIDs)
            and ``topk`` the number of items to return.
        get_emb: when equal to ``True`` the sequence embedding is returned
            instead of recommendations. Any other value (including the default
            ``None``) selects the recommendation branch.

    Returns:
        * ``get_emb == True``: the model output for the sequence as a nested
          list with one row (batch size 1).
        * otherwise: ``{"score_list": [...], "item_list": [...]}`` with the
          ``topk`` best scores and the corresponding item tokens, both flat lists.

    Raises:
        ValueError: if the history is empty (raised here), or, presumably, by
            ``dataset.token2id`` for tokens it does not know -- TODO confirm.

    Side effects:
        Switches ``model`` to evaluation mode.
    """
    pad_length = MAX_ITEM

    # Keep at most MAX_ITEM trailing entries (the most recent ones if the caller
    # passes the history oldest-first -- TODO confirm). Without this, a longer
    # history made the pad amount negative, which crops the tensor while
    # item_length stayed larger than the tensor, so the model gathered out of range.
    full_sequence = list(item_history.sequence)
    item_sequence = full_sequence[max(0, len(full_sequence) - pad_length):]
    item_length = len(item_sequence)
    if item_length == 0:
        raise ValueError("item_history.sequence must contain at least one item")

    # Convert external item tokens to internal ids with token2id(), then
    # right-pad with 0 up to MAX_ITEM entries (e.g. 4 items -> 16 zeros when
    # MAX_ITEM is 20).
    padded_item_sequence = torch.nn.functional.pad(
        torch.tensor(dataset.token2id(dataset.iid_field, item_sequence)),
        (0, pad_length - item_length),
        "constant",
        0,
    )

    # The model consumes the sequence through an Interaction object.
    input_interaction = Interaction(
        {
            "aid_list": padded_item_sequence.reshape(1, -1),
            "item_length": torch.tensor([item_length]),
        }
    )

    # Inference only: disable dropout and skip autograd bookkeeping.
    model.eval()
    with torch.no_grad():
        # `== True` (not plain truthiness) is kept on purpose: existing callers
        # may pass a non-boolean second positional argument and still expect
        # the recommendation branch.
        if get_emb == True:  # noqa: E712
            seq_output = model(
                input_interaction['aid_list'].to(model.device),
                input_interaction['item_length'].to(model.device),
            )
            # shape: (batch, emb_size) with batch == 1
            return seq_output.detach().cpu().numpy().tolist()

        # full_sort_predict yields one score per item for the sequence;
        # a higher score means the item is closer to the predicted embedding.
        scores = model.full_sort_predict(input_interaction.to(model.device))
        scores = scores.view(-1, dataset.item_num)
        scores[:, 0] = -np.inf  # pad item score -> -inf

        # Select the topk best scores and their internal item ids.
        topk_score, topk_iid_list = torch.topk(scores, item_history.topk)

    # Take row 0 before converting: indexing id2token with a nested list gives a
    # nested result on recent NumPy versions, which callers iterating over
    # item_list cannot convert to int.
    predicted_score_list = topk_score[0].tolist()
    # Translate internal ids back into the original item tokens.
    predicted_item_list = dataset.id2token(
        dataset.iid_field, topk_iid_list[0].tolist()
    ).tolist()

    recommended_items = {
        "score_list": predicted_score_list,
        "item_list": predicted_item_list,
    }
    return recommended_items



In [None]:
# !!! Whether to also generate (and save) embeddings
get_emb = True

In [None]:
# Score the sessions of the same test file that was merged into the atomic file
# for this mode (the previous hard-coded '../data/test.parquet' ignored IS_TRAIN).
if IS_TRAIN:
    test_path = '/root/autodl-tmp/ottodata/valid/test.parquet'
else:
    test_path = '/root/autodl-tmp/ottodata/test.parquet'
test_df = pl.read_parquet(test_path).to_pandas().reset_index(drop=True)
# test_df = test_df[:1000]  # debug

# One list per session, in row order; the pandas conversion is done only once.
# session_types = ['clicks', 'carts', 'orders']
sessions = test_df.groupby('session')
test_session_AIDs = sessions['aid'].apply(list)
test_session_types = sessions['type'].apply(list)
del test_df

labels = []
embedding = []
# Kept for the type-weighted re-ranking variant, which is currently disabled.
type_weight_multipliers = {0: 1, 1: 6, 2: 3}
n_failed_pred = 0
n_failed_emb = 0

for AIDs in test_session_AIDs:
    # De-duplicate while preserving first-occurrence order.
    AIDs = list(dict.fromkeys(AIDs))
    # ItemHistory declares List[str]; convert explicitly instead of relying on
    # implicit int -> str coercion.
    item = ItemHistory(sequence=[str(aid) for aid in AIDs], topk=MAX_ITEM)

    # Model candidates for this session. Only `item` is passed: the second
    # positional parameter of pred_user_to_item is get_emb, not a length.
    try:
        predicted = pred_user_to_item(item)['item_list']
        # ravel() accepts both a flat and a one-row nested item_list.
        nns = [int(v) for v in np.asarray(predicted).ravel()]
    except Exception:
        # Best effort: a session the model cannot score keeps only its own aids.
        n_failed_pred += 1
        nns = []

    # Fill the session up to MAX_ITEM aids with model candidates not seen yet.
    seen = set(AIDs)
    for word in nns:
        if len(AIDs) >= MAX_ITEM:
            break
        if word not in seen:
            AIDs.append(word)
            seen.add(word)

    labels.append(AIDs[:MAX_ITEM])

    if get_emb:
        # Previously called with both a positional and a keyword get_emb, which
        # raised TypeError every time and was swallowed, leaving all embeddings empty.
        try:
            emb = pred_user_to_item(item, get_emb=True)
        except Exception:
            n_failed_emb += 1
            emb = []
        # The model output has one row per sequence; keep that row, or the
        # empty list when no embedding could be computed.
        embedding.append(emb[0] if len(emb) != 0 else emb)

print(f'sessions without model candidates: {n_failed_pred}; without embedding: {n_failed_emb}')

split_name = 'train' if IS_TRAIN else 'test'

if get_emb:
    emb_df = pd.DataFrame(data={'session': test_session_AIDs.index, 'emb': embedding})
    emb_df.to_parquet(f'/root/autodl-tmp/ottodata/tmp/gru_{MAX_ITEM}_emb_{split_name}.parquet')

# One row per (session, candidate aid): the candidates for coarse ranking.
data = pd.DataFrame(data={'session': test_session_AIDs.index, 'aid': labels})
df = pl.DataFrame(data)
df = df.explode('aid')
df = df.select([pl.col('session').cast(pl.Int32), pl.col('aid').cast(pl.Int32)])
df.write_parquet(f'/root/autodl-tmp/ottodata/tmp/gru_{MAX_ITEM}_{split_name}.parquet')