In [1]:
import sys

import numpy as np

# from recbole.data.dataloader import *
import torch
from pandas import DataFrame
from torch import Tensor
from recbole.config import Config
from recbole.data.dataset import Dataset, SequentialDataset
# from recbole.quick_start import run_recbole
from recbole.data import (
    create_dataset,
     data_preparation,
)
from logging import getLogger

from recbole.data.transform import construct_transform
from recbole.utils import (
    init_logger,
    get_model,
    get_trainer,
    init_seed,
    FeatureType,
    set_color,
    get_flops,
    get_environment,
    ModelType
)

# 1. Define new dataset: SimulatedOnlineDataset

In [2]:
class SimulatedOnlineDataset(Dataset):
    def __init__(self, config):
        self.timestamp_max, self.timestamp_min = 0.0, 0.0
        self.cutoff, self.cutoff_conv = 0.0, 0.0

        super().__init__(config)

    def _fill_nan(self):
        """Missing value imputation.

        For fields with type :obj:`~recbole.utils.enum_type.FeatureType.TOKEN`, missing value will be filled by
        ``[PAD]``, which indexed as 0.

        For fields with type :obj:`~recbole.utils.enum_type.FeatureType.FLOAT`, missing value will be filled by
        the average of original data.

        Note:
            This is similar to the recbole's original implementation. The difference is the change in inplace operation to suit the pandas 3.0
        """
        self.logger.debug(set_color("Filling nan", "green"))

        for feat_name in self.feat_name_list:
            feat = getattr(self, feat_name)
            for field in feat:
                ftype = self.field2type[field]
                if ftype == FeatureType.TOKEN:
                    feat[field] = feat[field].fillna(value=0)
                elif ftype == FeatureType.FLOAT:
                    feat[field] = feat[field].fillna(value=feat[field].mean())
                else:
                    dtype = np.int64 if ftype == FeatureType.TOKEN_SEQ else np.float64
                    feat[field] = feat[field].apply(
                        lambda x: (
                            np.array([], dtype=dtype) if isinstance(x, float) else x
                        )
                    )

    def build(self):
        self._change_feat_format()

        if self.benchmark_filename_list is not None:
            super().build()

        # ordering
        ordering_args = self.config["eval_args"]["order"]
        if ordering_args == "TO":
            self.sort(by=self.time_field)
        else:
            raise AssertionError("The ordering_method must be 'TO.")

        # splitting & grouping
        split_args = self.config["eval_args"]["split"]
        if split_args is None:
            raise ValueError("The split_args in eval_args should not be None.")
        if not isinstance(split_args, dict):
            raise ValueError(f"The split_args [{split_args}] should be a dict.")

        split_mode = list(split_args.keys())[0]
        assert len(split_args.keys()) == 1
        if split_mode != "CO":
            raise NotImplementedError("The split_mode must be 'CO'.")
        elif split_mode == "CO":
            cutoff = split_args["CO"]
            # NOTE: HoangLe [Jun-05]: cutoff may come with different types: string, int

            group_by = self.config["eval_args"]["group_by"]
            datasets = self.split_by_cuttoff(cutoff=cutoff, group_by=group_by)

        return datasets

    def split_by_cuttoff(self, cutoff: str | int, group_by: str) -> list[Dataset]:
        """Split the interations by cutoff date

        Args:
            cutoff (str | int): cutoff date in Unix timestamp format
            group_by (str): field to group by, usually the user_id

        Returns:
            list[Dataset]: list of training/validation/testing dataset, whose interaction features has been split.

        Notes:
            cutoff may be different types: string of Unix timestamp (e.g. '1717923174'), integer of Unix timestamp (e.g. 1717923174)
        """

        self.logger.debug(f"split by cutoff date = '{cutoff}', group_by=[{group_by}]")

        assert self.inter_feat

        # Convert cutoff to suitable format and apply 0-1 normalization with max/min timestamp
        cutoff_conv = float(cutoff)

        is_normalized = (
            self.config["normalize_field"]
            and self.time_field in self.config["normalize_field"]
        ) or self.config["normalize_all"]
        if is_normalized:

            def norm_timestamp(timestamp: float):
                mx, mn = self.timestamp_max, self.timestamp_min
                if mx == mn:
                    arr = 1.0
                else:
                    arr = (timestamp - mn) / (mx - mn)
                return arr

            cutoff_conv = norm_timestamp(cutoff_conv)
        self.cutoff_conv = cutoff_conv

        match self.inter_feat[group_by]:
            case DataFrame():
                inter_feat_grouby_numpy = self.inter_feat[group_by].to_numpy()
            case Tensor():
                inter_feat_grouby_numpy = self.inter_feat[group_by].numpy()
            case _:
                raise TypeError(
                    f"self.inter_feat[group_by] has type: {type(self.inter_feat[group_by])} - which must be either DataFrame() or Tensor()"
                )

        grouped_inter_feat_index = self._grouped_index(inter_feat_grouby_numpy)

        indices_train, indices_val, indices_test = [], [], []
        for grouped_index in grouped_inter_feat_index:
            df_each_user = self.inter_feat[grouped_index]

            n_trainval = torch.sum(
                (df_each_user[self.time_field] <= self.cutoff_conv).to(
                    dtype=torch.int32
                )
            )
            n_test = len(df_each_user) - n_trainval

            if n_trainval == 0:
                continue

            if n_trainval >= 1:
                indices_train.extend(grouped_index[: n_trainval - 1])
            if n_trainval >= 2:
                indices_val.append(grouped_index[n_trainval - 1])
            if n_test > 0:
                indices_test.append(grouped_index[n_trainval])

        self._drop_unused_col()
        next_df = [
            self.inter_feat[index]
            for index in [indices_train, indices_val, indices_test]
        ]
        next_ds = [self.copy(_) for _ in next_df]
        return next_ds


class SimulatedOnlineSequentialDataset(SimulatedOnlineDataset, SequentialDataset):
    """Cutoff-split dataset for sequential models.

    Methods defined on ``SimulatedOnlineDataset`` (first in the MRO) take precedence
    over ``SequentialDataset``'s. Anything ``SimulatedOnlineDataset`` does not define
    falls through to ``SequentialDataset``.
    """


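## Check validity of SimulatedOnlineDataset
For every user that appears in the training split, check that:

- timestamps are ordered as train <= validation <= `cutoff_conv` <= test, where `cutoff_conv` is the cutoff on the same (possibly normalized) scale as the stored timestamps;
- the user has exactly one validation interaction and at most one test interaction.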

In [5]:

model_name = 'FM'
dataset_name = 'amazon-digital-music'

config_dict = {
    'normalize_all': False,
    'load_col': {"inter": ['user_id', 'item_id', 'timestamp']},
    'metrics': ["NDCG", "Precision", "Recall", "MRR", "Hit", "MAP"],
    'eval_args': {
        "order": "TO",
        "split": {"CO": '1403568000'},
        "group_by": 'user_id',
        'mode': 'full'
    },
    'train_neg_sample_args': None
}

config = Config(
    model=model_name,
    dataset=dataset_name,
    config_dict=config_dict
)

match (config["MODEL_TYPE"]):
    case ModelType.GENERAL | ModelType.CONTEXT:
        ds_name = "SimulatedOnlineDataset"
    case ModelType.SEQUENTIAL:
        ds_name = "SimulatedOnlineSequentialDataset"

dataset = eval(ds_name)(config)
print(f"dataset type: {type(dataset)}")

train_dataset, val_dataset, test_dataset = dataset.build()

cutoff_conv = dataset.cutoff_conv


def get_df(ds):
    """Collect ``user_id``, ``item_id`` and ``timestamp`` of a dataset into a DataFrame.

    ``ds`` is either a ``SimulatedOnlineDataset`` (its ``inter_feat`` is used) or an
    interaction-like object whose columns expose ``.numpy()``.
    """
    inter = ds.inter_feat if isinstance(ds, SimulatedOnlineDataset) else ds
    return DataFrame(
        {col: inter[col].numpy() for col in ('user_id', 'item_id', 'timestamp')}
    )

# Flatten each split into a plain DataFrame for the checks below.
df_train, df_val, df_test = (
    get_df(split) for split in (train_dataset, val_dataset, test_dataset)
)

def check_greater_ts(df_check, df_train):
    """Assert that the single timestamp in ``df_check`` is >= every timestamp in ``df_train``.

    Args:
        df_check: frame with exactly one row (``.item()`` raises otherwise).
        df_train: frame of interactions expected to come no later; may be empty.

    Raises:
        AssertionError: if any row of ``df_train`` is later than ``df_check`` (or is NaN).
    """
    ts_check = df_check['timestamp'].item()
    # Vectorised comparison; NaN compares False and is therefore reported as a violation.
    in_order = df_train['timestamp'] <= ts_check
    assert in_order.all(), (
        f"{int((~in_order).sum())} timestamp(s) after {ts_check}: "
        f"{df_train.loc[~in_order, 'timestamp'].tolist()}"
    )

for user_id in df_train['user_id'].unique():
    df_train_user = df_train[df_train['user_id'] == user_id]
    df_val_user = df_val[df_val['user_id'] == user_id]
    df_test_user = df_test[df_test['user_id'] == user_id]

    # Every user present in train has exactly one validation interaction, and it
    # does not precede any of the user's training interactions.
    assert len(df_val_user) == 1
    check_greater_ts(df_val_user, df_train_user)

    # The validation interaction lies at or before the cutoff. Compare in the column's
    # own dtype: the split compares the stored (presumably float32) timestamps with the
    # cutoff in that dtype, so a float64 comparison could disagree right at the cutoff.
    val_ts = df_val_user['timestamp'].to_numpy()
    assert (val_ts <= val_ts.dtype.type(cutoff_conv)).all()

    # At most one test interaction; it is later than train/val and not before the cutoff.
    assert len(df_test_user) <= 1
    if len(df_test_user) == 1:
        check_greater_ts(df_test_user, df_train_user)
        check_greater_ts(df_test_user, df_val_user)

        assert df_test_user['timestamp'].item() >= cutoff_conv

dataset type: <class '__main__.SimulatedOnlineDataset'>


# 2. Define model, dataset and start training

In [4]:
def run_recbole_with_TimeCutoff(
    model=None,
    dataset=None,
    config_file_list=None,
    config_dict=None,
    saved=True,
    queue=None,
):
    r"""A fast running api, which includes the complete process of
    training and testing a model on a specified dataset

    Args:
        model (str, optional): Model name. Defaults to ``None``.
        dataset (str, optional): Dataset name. Defaults to ``None``.
        config_file_list (list, optional): Config files used to modify experiment parameters. Defaults to ``None``.
        config_dict (dict, optional): Parameters dictionary used to modify experiment parameters. Defaults to ``None``.
        saved (bool, optional): Whether to save the model. Defaults to ``True``.
        queue (torch.multiprocessing.Queue, optional): The queue used to pass the result to the main process. Defaults to ``None``.
    """
    # configurations initialization
    config = Config(
        model=model,
        dataset=dataset,
        config_file_list=config_file_list,
        config_dict=config_dict,
    )
    init_seed(config["seed"], config["reproducibility"])
    # logger initialization
    init_logger(config)
    logger = getLogger()
    logger.info(sys.argv)
    logger.info(config)

    # dataset filtering
    dataset = TimeCutoffDataset(config)
    logger.info(dataset)

    # dataset splitting
    train_data, valid_data, test_data = data_preparation(config, dataset)

    # model loading and initialization
    init_seed(config["seed"] + config["local_rank"], config["reproducibility"])
    model = get_model(config["model"])(config, train_data._dataset).to(config["device"])
    logger.info(model)

    transform = construct_transform(config)
    flops = get_flops(model, dataset, config["device"], logger, transform)
    logger.info(set_color("FLOPs", "blue") + f": {flops}")

    # trainer loading and initialization
    trainer = get_trainer(config["MODEL_TYPE"], config["model"])(config, model)

    # model training
    best_valid_score, best_valid_result = trainer.fit(
        train_data, valid_data, saved=saved, show_progress=config["show_progress"]
    )

    # model evaluation
    test_result = trainer.evaluate(
        test_data, load_best_model=saved, show_progress=config["show_progress"]
    )

    environment_tb = get_environment(config)
    logger.info(
        "The running environment of this training is as follows:\n"
        + environment_tb.draw()
    )

    logger.info(set_color("best valid ", "yellow") + f": {best_valid_result}")
    logger.info(set_color("test result", "yellow") + f": {test_result}")

    result = {
        "best_valid_score": best_valid_score,
        "valid_score_bigger": config["valid_metric_bigger"],
        "best_valid_result": best_valid_result,
        "test_result": test_result,
    }

    if not config["single_spec"]:
        dist.destroy_process_group()

    if config["local_rank"] == 0 and queue is not None:
        queue.put(result)  # for multiprocessing, e.g., mp.spawn

    return result  # for the single process


In [5]:
model_name = 'NPE'
dataset_name = 'ml-100k'

# "CO" = one global cutoff in Unix seconds (886349689 == 1998-02-01 16:14:49 UTC).
config_dict = dict(
    eval_args=dict(
        order="TO",
        split={"CO": '886349689'},
        group_by='user_id',
    ),
    train_neg_sample_args=None,
)

config = Config(model=model_name, dataset=dataset_name, config_dict=config_dict)

run_recbole_with_TimeCutoff(model=model_name, dataset=dataset_name, config_dict=config_dict)

13 Jun 22:48    INFO  ['/Users/macos/miniforge3/envs/py/lib/python3.10/site-packages/ipykernel_launcher.py', '--f=/Users/macos/Library/Jupyter/runtime/kernel-v2-554001krL4tqekY7i.json']
13 Jun 22:48    INFO  
General Hyper Parameters:
gpu_id = 0
use_gpu = True
seed = 2020
state = INFO
reproducibility = True
data_path = /Users/macos/miniforge3/envs/py/lib/python3.10/site-packages/recbole/config/../dataset_example/ml-100k
checkpoint_dir = saved
show_progress = True
save_dataset = False
dataset_save_path = None
save_dataloaders = False
dataloaders_save_path = None
log_wandb = False

Training Hyper Parameters:
epochs = 300
train_batch_size = 2048
learner = adam
learning_rate = 0.001
train_neg_sample_args = {'distribution': 'none', 'sample_num': 'none', 'alpha': 'none', 'dynamic': False, 'candidate_num': 0}
eval_step = 1
stopping_step = 10
clip_grad_norm = None
weight_decay = 0.0
loss_decimal_place = 4

Evaluation Hyper Parameters:
eval_args = {'split': {'CO': '886349689'}, 'order': 'TO', '

{'best_valid_score': 0.024,
 'valid_score_bigger': True,
 'best_valid_result': OrderedDict([('recall@10', 0.0526),
              ('mrr@10', 0.024),
              ('ndcg@10', 0.0306),
              ('hit@10', 0.0526),
              ('precision@10', 0.0053)]),
 'test_result': OrderedDict([('recall@10', 0.0),
              ('mrr@10', 0.0),
              ('ndcg@10', 0.0),
              ('hit@10', 0.0),
              ('precision@10', 0.0)])}