# Data Split

In [None]:
from datetime import timedelta
import polars as pl
import os
DATA_DIR = 'data'
SAVE_PATH = os.path.join(DATA_DIR, 'val')
os.makedirs(os.path.join(DATA_DIR, 'val'), exist_ok=True)
EVAL_DAYS_TRESHOLD = 14

df_test_users = pl.read_parquet(os.path.join(DATA_DIR, 'test_users.pq'))
df_clickstream = pl.read_parquet(os.path.join(DATA_DIR, 'clickstream.pq'))

#df_cat_features = pl.read_parquet(os.path.join(DATA_DIR, 'cat_features.pq'))
#df_text_features = pl.read_parquet(os.path.join(DATA_DIR, 'clickstream.pq'))
df_event = pl.read_parquet(os.path.join(DATA_DIR, 'events.pq'))

In [6]:
treshhold = df_clickstream['event_date'].max() - timedelta(days=EVAL_DAYS_TRESHOLD)

df_train = df_clickstream.filter(df_clickstream['event_date']<= treshhold)
df_eval = df_clickstream.filter(df_clickstream['event_date']> treshhold)[['cookie', 'node', 'event']]

df_eval = (
        df_eval
        .join(df_train, on=['cookie', 'node'], how='anti')
        .filter(
                pl.col('event').is_in(
                    df_event.filter(pl.col('is_contact')==1)['event'].unique()
                )
            )
        .filter(
        pl.col('cookie').is_in(df_train['cookie'].unique())
        ).filter(
            pl.col('node').is_in(df_train['node'].unique())
        )
)
df_eval = df_eval.unique(['cookie', 'node'])

In [13]:
df_train.write_parquet(os.path.join(SAVE_PATH, 'clickstream.pq'))
df_eval.write_parquet(os.path.join(SAVE_PATH, 'gt.pq'))

# Retrieval Train

## Autoencoders

### EASE_DAN

In [1]:
from autoencoders.model import EASE_DAN
from utils import Enc, convert_to_sparse, process_in_batches, recall_at
import polars as pl
import os
import numpy as np

DATA_DIR = 'data'
VAL_PATH = os.path.join(DATA_DIR, 'val')
PREDICTION_PATH = 'predictions'
MODEL_NAME = 'EASE_DAN'
EVAL_DAYS_TRESHOLD = 14
N_ITEMS = 30_000

df_clickstream = pl.read_parquet(os.path.join(VAL_PATH, 'clickstream.pq'))
df_eval = pl.read_parquet(os.path.join(VAL_PATH, 'gt.pq')).join(df_clickstream, on='cookie', how='semi')
df_cat_features = pl.read_parquet(os.path.join(DATA_DIR, 'cat_features.pq')) 
df_event = pl.read_parquet(os.path.join(DATA_DIR, 'events.pq'))

df_train = df_clickstream
eval_users = df_eval['cookie'].unique().to_list()

In [None]:
enc = Enc(item_key='node', user_key='cookie')
enc_eval_users = [enc.user_id_dict.get(i) for i in eval_users]

df_train = df_train.join(df_train.unique(subset=['node', 'cookie']).select('node').group_by('node').len().sort('len').tail(N_ITEMS).drop('len'),
                    on='node')


df_train = df_train.with_columns(
        (pl.lit(1)).alias("event_weight")
    )
df_eval = df_eval.join(df_train, on='cookie', how='semi')
df_eval = df_eval.with_columns(pl.col('node').cast(pl.Int64))
result = enc.fit(train_df=df_train, event_weight='event_weight')


X = (convert_to_sparse(result, enc) > 0).astype(np.float32)
ease = EASE_DAN(num_items=N_ITEMS)
ease.fit(X)
recommendations_df = process_in_batches(
    enc_eval_users=enc_eval_users,
    X=X,
    G=ease.W,
    k=300, # top_k
    batch_size=1000,
    fill_value=-1000
)
recs = enc.inverse_transform(recommendations_df) 
recs = recs.with_columns(
    pl.col('score').rank(descending=True).over('cookie').alias(f'rank_rd'),
    pl.col('cookie').cast(pl.Int64),
    pl.col('node').cast(pl.Int64)
)
print('UNSEEN-RECALL@40', recall_at(df_eval, recs, k=40))
print('UNSEEN-RECALL@100', recall_at(df_eval, recs, k=100))

UNSEEN-RECALL@40 0.17348943701276365
UNSEEN-RECALL@100 0.2819826797115451


In [None]:
os.makedirs(os.path.join(PREDICTION_PATH, MODEL_NAME), exist_ok=True)
recs.write_parquet(os.path.join(PREDICTION_PATH, MODEL_NAME, 'EASE_DAN_val.pq'))

### RDLAE

In [None]:
from autoencoders.model import RDLAE
from utils import truncate, process_batch_w_weight
from utils import Enc, convert_to_sparse, process_in_batches, recall_at
import polars as pl
import os
import numpy as np
from utils import convert

DATA_DIR = 'data'
VAL_PATH = os.path.join(DATA_DIR, 'val')
PREDICTION_PATH = 'predictions'
MODEL_NAME = 'RDLAE'
N_ITEMS = 50_000
DECAY_RATE_POS = 0.01
DECAY_RATE_TIME = 0.05
BAYESSIAN_C = 100
SMOOTHED_ALPHA = 1
SMOOTHED_BETA = 2
NOISE_INJECTION = 0.2
RATIO_COLUMN = 'bayesian_ratio_C'

df_clickstream = pl.read_parquet(os.path.join(VAL_PATH, 'clickstream.pq'))
df_eval = pl.read_parquet(os.path.join(VAL_PATH, 'gt.pq')).join(df_clickstream, on='cookie', how='semi')
df_cat_features = pl.read_parquet(os.path.join(DATA_DIR, 'cat_features.pq')) 
df_event = pl.read_parquet(os.path.join(DATA_DIR, 'events.pq'))

df_train = df_clickstream.join(df_event, on='event', how='left')
df_train = df_train.join(df_train.filter(pl.col('is_contact')==1).unique(subset=['node', 'cookie']).select('node').group_by('node').len().sort('len').tail(N_ITEMS).drop('len'),
                            on='node')
eval_users = df_eval['cookie'].unique().to_list()

#### Making bayessian columns to boost contact ration info into collaborative filtering

In [2]:
time_diff = (df_train.select('cookie', 'node', 'event_date', 'is_contact')
                           .with_columns(pl.col('event_date').dt.truncate("1d").alias('date'))
                           .group_by('cookie', 'node').agg(pl.col('event_date').max().alias('node_last_visit'),
                                                           pl.col('event_date').min().alias('node_first_visit'),
                                                           (pl.col('date').n_unique()-1).alias('n_days_clicks'),
                                                           pl.col('date').filter(pl.col('is_contact') > 0).n_unique().alias('n_days_contacts'),
                                                           pl.col('is_contact').sum().cast(pl.Int32),
                                                           pl.len().alias('cnt'),
                                                           ))
time_diff = time_diff.with_columns(
    ( -DECAY_RATE_POS * (pl.col('node_last_visit').rank(descending=True).over('cookie')-1)).exp().cast(pl.Float32).alias(f'exp_pos'),
    ( DECAY_RATE_TIME * ( pl.col('node_last_visit') - df_train.select('event_date').max() ).dt.total_days().cast(pl.Int64)).exp().cast(pl.Float32).alias('exp_time')
)

In [3]:
enc = Enc(item_key='node', user_key='cookie')
train = enc.fit(train_df=df_train.with_columns(event_weight=1.), event_weight='event_weight')
num_users, num_items = enc.get_num()
enc_eval_users = [enc.user_id_dict.get(i) for i in eval_users]

X = convert_to_sparse(train, enc)

n2n = pl.DataFrame({'node':enc.item_id_dict.keys(),   'le_node':enc.item_id_dict.values()})
c2c = pl.DataFrame({'cookie':enc.user_id_dict.keys(), 'le_cookie':enc.user_id_dict.values()})

train_sum = time_diff.join(n2n, on='node').join(c2c, on='cookie').drop('node_last_visit', 'node_first_visit')
train_sum.filter(pl.col('cookie')==1).tail(2)

cookie,node,n_days_clicks,n_days_contacts,is_contact,cnt,exp_pos,exp_time,le_node,le_cookie
i64,u32,u32,u32,i32,u32,f32,f32,i64,i64
1,262019,0,0,0,2,0.103312,0.740818,36530,1
1,214338,1,0,0,4,0.177284,0.778801,28726,1


In [4]:
extra_user_features = time_diff.group_by('cookie').agg(pl.col('n_days_clicks').sum().alias('sum_n_days_clicks'),
                                 pl.col('n_days_clicks').max().alias('max_n_days_clicks'),
                                 pl.col('n_days_contacts').sum().alias('sum_n_days_contacts'),
                                 pl.col('n_days_contacts').max().alias('max_n_days_contacts'),
                                 pl.col('is_contact').sum().alias('sum_is_contact'),
                                (pl.col('is_contact').sum() / pl.col('cnt').sum()).alias('user_contact_ratio'),
                                (pl.col('exp_pos') * pl.col('is_contact')).sum().alias('exp_pos_contact'),
                                (pl.col('exp_time') * pl.col('is_contact')).sum().alias('exp_time_contact'),
                                )
extra_user_features = extra_user_features.with_columns(
    pl.col('user_contact_ratio').cast(pl.Float32),
    pl.col('exp_pos_contact').cast(pl.Float32),
    pl.col('exp_time_contact').cast(pl.Float32),
)

In [None]:
default_contact_ratio =(df_train
                .select('cookie', 'node', 'is_contact', 'event_date').sort('is_contact')
                .unique(subset=['cookie', 'node', 'is_contact'], keep='last')
                .select('node', 'is_contact').with_columns(value=pl.lit(1))
                .pivot(
                    values="value",
                    index="node",
                    columns="is_contact",
                    aggregate_function="sum",
                ).fill_null(0)).with_columns(pl.col('0').cast(pl.Int32).alias('node_contacts_0'),
                                            pl.col('1').cast(pl.Int32).alias('node_contacts_1'),
                                            ).drop('0', '1')

train_nodes = train_sum.select('node').unique()
node_ratio = default_contact_ratio.join(train_nodes, on='node').with_columns(
    bayesian_ratio = (BAYESSIAN_C * (pl.col('node_contacts_1').sum() / pl.col('node_contacts_0').sum()) + pl.col('node_contacts_1')) / (BAYESSIAN_C + pl.col('node_contacts_0')),
    bayesian_ratio_C = (pl.col('node_contacts_0').mean() * (pl.col('node_contacts_1').sum() / pl.col('node_contacts_0').sum()) + pl.col('node_contacts_1')) / (pl.col('node_contacts_0').mean() + pl.col('node_contacts_0')),
    smoothed_ratio=(pl.col('node_contacts_1') + SMOOTHED_ALPHA) / (pl.col('node_contacts_0') + SMOOTHED_BETA),
    noisy_contacts_1 = (pl.Series(np.random.normal(0, NOISE_INJECTION, len(default_contact_ratio.join(train_nodes, on='node')))) *\
          pl.col('node_contacts_1').sqrt() + pl.col('node_contacts_1')).clip(0).round(),
).with_columns(
    noisy_bayesian_ratio_C = (pl.col('node_contacts_0').mean() * (pl.col('noisy_contacts_1').sum() / pl.col('node_contacts_0').sum()) + pl.col('noisy_contacts_1')) / (pl.col('node_contacts_0').mean() + pl.col('node_contacts_0')),
    noisy_bayesian_ratio = (BAYESSIAN_C * (pl.col('noisy_contacts_1').sum() / pl.col('node_contacts_0').sum()) + pl.col('noisy_contacts_1')) / (BAYESSIAN_C + pl.col('node_contacts_0')),
    noisy_smoothed_ratio = (pl.col('noisy_contacts_1') + SMOOTHED_ALPHA) / (pl.col('node_contacts_0') + SMOOTHED_BETA),
)

try:
    node_ratio = node_ratio.join(pl.DataFrame(
        {'node':list(enc.item_id_dict.keys()),
        'le_node':list(enc.item_id_dict.values())}),
        on='node')
except Exception:
    node_ratioc = node_ratio.drop('le_node').join(pl.DataFrame(
        {'node':list(enc.item_id_dict.keys()),
        'le_node':list(enc.item_id_dict.values())}),
        on='node')

  default_contact_ratio =(df_train


In [6]:
train_sum = train_sum.join(node_ratio.select('le_node', RATIO_COLUMN).rename({RATIO_COLUMN:'ratio_column'}) , on='le_node')

In [7]:
x_features = ['cnt','is_contact', 'n_days_clicks','n_days_contacts', 'exp_pos', 'exp_time', 'ratio_column']

x_dict = {}
for feature in x_features:
    x_dict[feature] = convert(train_sum, col=feature, enc=enc)

In [9]:
rdlae = RDLAE()
rdlae.fit((X>0).astype(np.float32))

In [10]:
weights = node_ratio.select(RATIO_COLUMN).to_numpy().reshape(-1)
weights = np.log1p(weights).astype(np.float32)

In [None]:
Gt = truncate(rdlae.G.T, k=300) 

In [19]:
recommendations_df = process_batch_w_weight(
    enc_eval_users=enc_eval_users,
    G = (rdlae.G),
    X = ((X>0) + 2*(x_dict['is_contact']>0)).astype(np.float32),
    Gt = Gt, # доп фичи?
    features_dict=x_dict,
    weights=(weights).astype(np.float32),
    k=300,
    batch_size=10_000,
    fill_value=-1000 ,
    use_torch = True
)

In [21]:
rdlae_recs

cookie,node,score
i32,i64,f32
0,1923,0.050749
0,2650,0.049023
0,116118,0.041513
0,187851,0.028902
0,214199,0.02602
…,…,…
149998,5799,0.001721
149998,17188,0.0017
149998,152032,0.001697
149998,122277,0.001696


In [24]:
rdlae_recs = enc.inverse_transform(recommendations_df) 
print('UNSEEN-RECALL@40', recall_at(df_eval, rdlae_recs.with_columns(pl.col('cookie').cast(pl.Int64), pl.col('node').cast(pl.UInt32)), k=40))
print('UNSEEN-RECALL@100', recall_at(df_eval, rdlae_recs.with_columns(pl.col('cookie').cast(pl.Int64), pl.col('node').cast(pl.UInt32)), k=100))

UNSEEN-RECALL@40 0.19937169213163403
UNSEEN-RECALL@100 0.31355099952500437


In [25]:
os.makedirs(os.path.join(PREDICTION_PATH, MODEL_NAME), exist_ok=True)
rdlae_recs.write_parquet(os.path.join(PREDICTION_PATH, MODEL_NAME, '{MODEL_NAME}_val.pq'))

## SasRec Replay

### SasRec over node

In [1]:
import polars as pl
import lightning as L
from lightning.pytorch.loggers import CSVLogger
from lightning.pytorch.callbacks import ModelCheckpoint
from torch.utils.data import DataLoader
import torch
import os

from replay.metrics import OfflineMetrics, Recall, Precision, MAP, NDCG, HitRate, MRR
from replay.metrics.torch_metrics_builder import metrics_to_df
from replay.splitters import LastNSplitter
from replay.data import (
    FeatureHint,
    FeatureInfo,
    FeatureSchema,
    FeatureSource,
    FeatureType,
    Dataset,
)
from replay.models.nn.optimizer_utils import FatOptimizerFactory
from replay.models.nn.sequential.callbacks import (
    ValidationMetricsCallback,
    SparkPredictionCallback,
    PandasPredictionCallback,
    TorchPredictionCallback,
    QueryEmbeddingsPredictionCallback,
)
from replay.models.nn.sequential.postprocessors import RemoveSeenItems
from replay.data.nn import SequenceTokenizer, SequentialDataset, TensorFeatureSource, TensorSchema, TensorFeatureInfo
from replay.models.nn.sequential import SasRec
from replay.models.nn.sequential.sasrec import (
    SasRecPredictionDataset,
    SasRecTrainingDataset,
    SasRecValidationDataset,
    SasRecPredictionBatch,
    SasRecModel,
)
import pandas as pd
import polars as pl

DATA_DIR = 'data'
VAL_DIR = 'val'


df_clickstream = pl.read_parquet(os.path.join(DATA_DIR, VAL_DIR, 'clickstream.pq'))
df_eval = pl.read_parquet(os.path.join(DATA_DIR, VAL_DIR, 'gt.pq')).join(df_clickstream, on='cookie', how='semi')
df_cat_features = pl.read_parquet(os.path.join(DATA_DIR, 'cat_features.pq'))
df_event = pl.read_parquet(os.path.join(DATA_DIR, 'events.pq'))

df_train = df_clickstream
eval_users = df_eval['cookie'].unique().to_list()

n_nodes = 30_000
train_small = df_train.join(df_train.unique(subset=['node', 'cookie']).select('node').group_by('node').len().sort('len').tail(n_nodes).drop('len'),
                        on='node')
df_eval_small = df_eval.join(df_train, on='cookie', how='semi')
df_eval_small = df_eval_small.with_columns(pl.col('node').cast(pl.Int64))

In [2]:
def prepare_feature_schema(is_ground_truth: bool) -> FeatureSchema:
    base_features = FeatureSchema(
        [
            FeatureInfo(
                column="user_id",
                feature_hint=FeatureHint.QUERY_ID,
                feature_type=FeatureType.CATEGORICAL,
            ),
            FeatureInfo(
                column="item_id",
                feature_hint=FeatureHint.ITEM_ID,
                feature_type=FeatureType.CATEGORICAL,
            ),
        ]
    )
    if is_ground_truth:
        return base_features

    all_features = base_features + FeatureSchema(
        [
            FeatureInfo(
                column="timestamp",
                feature_type=FeatureType.NUMERICAL,
                feature_hint=FeatureHint.TIMESTAMP,
            ),
        ]
    )

    return all_features


def filter_n_count(df, column='item_id', n=5):
    to_save = df.group_by(column).agg(pl.count().alias('count')).filter(pl.col('count')>n)
    return df.join(to_save, on=column, how='semi')

def allign_gt(df, gt, item_col='item_id', user_col='user_id'):
    gt = gt.join(df, on=user_col, how='semi')
    gt = gt.join(df, on=item_col, how='semi')
    return gt

def make_replay_format(df, is_gt=False):
    df = df.with_columns(
        [
            pl.col('cookie').alias('user_id').cast(pl.Int64), 
            pl.col('node').alias('item_id').cast(pl.Int64)
        ])
    if not is_gt:
        df = df.with_columns(pl.col("event_date").dt.timestamp("ms").alias("timestamp") // 1000)
    return df

validation_gt = make_replay_format(df_eval_small, is_gt=True)['user_id', 'item_id']
train_events = make_replay_format(train_small)['user_id', 'item_id','timestamp']
item_before = train_events['item_id'].unique().len()
item_after = train_events['item_id'].unique().len()

print(f'Before: {item_before}')
print(f'After: {item_after}')
print(f'Items save {100 * item_after/item_before} %')
validation_gt = allign_gt(train_events, validation_gt)
user_features = train_events[['user_id']].unique()
item_features = train_events[['item_id']].unique()
validation_events = train_events

Before: 30000
After: 30000
Items save 100.0 %


In [None]:
MAX_SEQ_LEN = 100
BATCH_SIZE = 128
NUM_WORKERS = 9
MAX_EPOCHS = 10

train_dataset = Dataset(
    feature_schema=prepare_feature_schema(is_ground_truth=False),
    interactions=train_events,
    query_features=user_features,
    item_features=item_features,
    check_consistency=True,
    categorical_encoded=False,
)

validation_dataset = Dataset(
    feature_schema=prepare_feature_schema(is_ground_truth=False),
    interactions=validation_events,
    query_features=user_features,
    item_features=item_features,
    check_consistency=True,
    categorical_encoded=False,
)
validation_gt = Dataset(
    feature_schema=prepare_feature_schema(is_ground_truth=True),
    interactions=validation_gt,
    check_consistency=True,
    categorical_encoded=False,
)

ITEM_FEATURE_NAME = "item_id_seq"

tensor_schema = TensorSchema(
    [
        TensorFeatureInfo(
            name=ITEM_FEATURE_NAME,
            is_seq=True,
            feature_type=FeatureType.CATEGORICAL,
            feature_sources=[TensorFeatureSource(FeatureSource.INTERACTIONS, train_dataset.feature_schema.item_id_column)],
            feature_hint=FeatureHint.ITEM_ID,
        )
    ]
)

tokenizer = SequenceTokenizer(tensor_schema, allow_collect_to_master=True)
tokenizer.fit(train_dataset)

sequential_train_dataset = tokenizer.transform(train_dataset)

sequential_validation_dataset = tokenizer.transform(validation_dataset)
sequential_validation_gt = tokenizer.transform(validation_gt, [tensor_schema.item_id_feature_name])

sequential_validation_dataset, sequential_validation_gt = SequentialDataset.keep_common_query_ids(
    sequential_validation_dataset, sequential_validation_gt
)

model = SasRec(
    tensor_schema,
    block_count=2,
    head_count=2,
    max_seq_len=MAX_SEQ_LEN,
    hidden_size=128,
    dropout_rate=0.3,
    optimizer_factory=FatOptimizerFactory(learning_rate=0.001),
    loss_sample_count=4_000,
    negatives_sharing=True
)

csv_logger = CSVLogger(save_dir=".logs/train", name="SASRec")

checkpoint_callback = ModelCheckpoint(
    dirpath=".checkpoints/sasrec_replay",
    save_top_k=1,
    verbose=True,
    monitor="recall@40",
    mode="max",
)

train_dataloader = DataLoader(
    dataset=SasRecTrainingDataset(
        sequential_train_dataset,
        max_sequence_length=MAX_SEQ_LEN,
    ),
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
    pin_memory=True,
)

validation_dataloader = DataLoader(
    dataset=SasRecValidationDataset(
        sequential_validation_dataset,
        sequential_validation_gt,
        sequential_train_dataset,
        max_sequence_length=MAX_SEQ_LEN,
    ),
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    pin_memory=True,
)

validation_metrics_callback = ValidationMetricsCallback(
    metrics=["recall", 
             # "ndcg", "map", "coverage"
             ],
    ks=[20, 40, 100],
    item_count=train_dataset.item_count,
    postprocessors=[RemoveSeenItems(sequential_validation_dataset)],
)

trainer = L.Trainer(
    max_epochs=MAX_EPOCHS,
    callbacks=[checkpoint_callback, validation_metrics_callback],
    logger=csv_logger,
    accelerator='cuda',
)

trainer.fit(
    model,
    train_dataloaders=train_dataloader,
    val_dataloaders=validation_dataloader,
)

  dataset=SasRecTrainingDataset(
  self._inner = TorchSequentialDataset(
  dataset=SasRecValidationDataset(
  self._inner = TorchSequentialValidationDataset(
  self._inner = TorchSequentialDataset(
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
You are using a CUDA device ('NVIDIA GeForce RTX 4070') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
c:\Users\Grig\miniconda3\envs\my_env_py3.9\lib\site-packages\lightning\pytorch\callbacks\model_checkpoint.py:654: Checkpoint directory C:\code\avito_hack_clear\.checkpoints\sasrec_replay exists and is not empty.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name   | Type             | Params | Mode 
--------------------

Sanity Checking: |          | 0/? [00:00<?, ?it/s]

c:\Users\Grig\miniconda3\envs\my_env_py3.9\lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:420: Consider setting `persistent_workers=True` in 'val_dataloader' to speed up the dataloader worker initialization.
c:\Users\Grig\miniconda3\envs\my_env_py3.9\lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:420: Consider setting `persistent_workers=True` in 'train_dataloader' to speed up the dataloader worker initialization.


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 0, global step 989: 'recall@40' reached 0.14474 (best 0.14474), saving model to 'C:\\code\\avito_hack_clear\\.checkpoints\\sasrec_replay\\epoch=0-step=989.ckpt' as top 1


k            100        20        40
recall  0.234696  0.097424  0.144743



Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 1, global step 1978: 'recall@40' reached 0.15222 (best 0.15222), saving model to 'C:\\code\\avito_hack_clear\\.checkpoints\\sasrec_replay\\epoch=1-step=1978.ckpt' as top 1


k            100        20        40
recall  0.246902  0.101545  0.152224



Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 2, global step 2967: 'recall@40' reached 0.15333 (best 0.15333), saving model to 'C:\\code\\avito_hack_clear\\.checkpoints\\sasrec_replay\\epoch=2-step=2967.ckpt' as top 1


k            100        20        40
recall  0.248557  0.102813  0.153327



Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 3, global step 3956: 'recall@40' reached 0.15561 (best 0.15561), saving model to 'C:\\code\\avito_hack_clear\\.checkpoints\\sasrec_replay\\epoch=3-step=3956-v1.ckpt' as top 1


k            100       20       40
recall  0.254591  0.10464  0.15561



Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 4, global step 4945: 'recall@40' reached 0.15818 (best 0.15818), saving model to 'C:\\code\\avito_hack_clear\\.checkpoints\\sasrec_replay\\epoch=4-step=4945.ckpt' as top 1


k            100        20        40
recall  0.258511  0.104967  0.158176



Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 5, global step 5934: 'recall@40' reached 0.16291 (best 0.16291), saving model to 'C:\\code\\avito_hack_clear\\.checkpoints\\sasrec_replay\\epoch=5-step=5934.ckpt' as top 1


k            100        20        40
recall  0.266703  0.108921  0.162912



Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 6, global step 6923: 'recall@40' was not in top 1


k           100        20        40
recall  0.26048  0.104784  0.158601



Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 7, global step 7912: 'recall@40' was not in top 1


k            100        20        40
recall  0.263805  0.107246  0.161621



Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 8, global step 8901: 'recall@40' was not in top 1


k            100       20        40
recall  0.264808  0.10683  0.161539



Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 9, global step 9890: 'recall@40' was not in top 1
`Trainer.fit` stopped: `max_epochs=10` reached.


k            100        20        40
recall  0.261147  0.105352  0.159423



In [10]:
best_model = SasRec.load_from_checkpoint(checkpoint_callback.best_model_path).eval()

PREDICTION_PATH = 'predictions'
MODEL_NAME = 'SASREC_REPLAY_NODE'

prediction_dataloader = DataLoader(
    dataset=SasRecPredictionDataset(
        sequential_validation_dataset,
        max_sequence_length=MAX_SEQ_LEN,
    ),
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    pin_memory=True,
)

csv_logger = CSVLogger(save_dir=".logs/test", name="SASRec")

TOPK = [300]

postprocessors = [RemoveSeenItems(sequential_validation_dataset)]

pandas_prediction_callback = PandasPredictionCallback(
    top_k=max(TOPK),
    query_column="user_id",
    item_column="item_id",
    rating_column="score",
    postprocessors=postprocessors,
)


trainer = L.Trainer(
    callbacks=[
        pandas_prediction_callback,
    ],
    logger=csv_logger,
    inference_mode=True,
)
trainer.predict(best_model, dataloaders=prediction_dataloader, return_predictions=False)

pandas_res = pandas_prediction_callback.get_result()
recommendations = tokenizer.query_and_item_id_encoder.inverse_transform(pandas_res)
recommendations = pl.from_pandas(recommendations)

os.makedirs(os.path.join(PREDICTION_PATH, MODEL_NAME), exist_ok=True)
recommendations.rename({'user_id':'cookie', 'item_id':'node'}).write_parquet(os.path.join(PREDICTION_PATH, MODEL_NAME, 'SASREC_NODE_REPLAY_VAL.pq'))

  dataset=SasRecPredictionDataset(
  self._inner = TorchSequentialDataset(
You are using the plain ModelCheckpoint callback. Consider using LitModelCheckpoint which with seamless uploading to Model registry.
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
c:\Users\Grig\miniconda3\envs\my_env_py3.9\lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:420: Consider setting `persistent_workers=True` in 'predict_dataloader' to speed up the dataloader worker initialization.


Predicting: |          | 0/? [00:00<?, ?it/s]

### SasRec Replay over category

In [1]:
import polars as pl
import lightning as L
import os
from lightning.pytorch.loggers import CSVLogger
from lightning.pytorch.callbacks import ModelCheckpoint
from torch.utils.data import DataLoader
import torch

from replay.metrics import OfflineMetrics, Recall, Precision, MAP, NDCG, HitRate, MRR
from replay.metrics.torch_metrics_builder import metrics_to_df
from replay.splitters import LastNSplitter
from replay.data import (
    FeatureHint,
    FeatureInfo,
    FeatureSchema,
    FeatureSource,
    FeatureType,
    Dataset,
)
from replay.models.nn.optimizer_utils import FatOptimizerFactory
from replay.models.nn.sequential.callbacks import (
    ValidationMetricsCallback,
    SparkPredictionCallback,
    PandasPredictionCallback,
    TorchPredictionCallback,
    QueryEmbeddingsPredictionCallback,
)
from replay.models.nn.sequential.postprocessors import RemoveSeenItems
from replay.data.nn import SequenceTokenizer, SequentialDataset, TensorFeatureSource, TensorSchema, TensorFeatureInfo
from replay.models.nn.sequential import SasRec
from replay.models.nn.sequential.sasrec import (
    SasRecPredictionDataset,
    SasRecTrainingDataset,
    SasRecValidationDataset,
    SasRecPredictionBatch,
    SasRecModel,
)
import pandas as pd
import polars as pl

DATA_DIR = 'data'
VAL_DIR = 'val'


df_clickstream = pl.read_parquet(os.path.join(DATA_DIR, VAL_DIR, 'clickstream.pq'))
df_eval = pl.read_parquet(os.path.join(DATA_DIR, VAL_DIR, 'gt.pq')).join(df_clickstream, on='cookie', how='semi')
df_cat_features = pl.read_parquet(os.path.join(DATA_DIR, 'cat_features.pq'))
df_event = pl.read_parquet(os.path.join(DATA_DIR, 'events.pq'))

df_train = df_clickstream
eval_users = df_eval['cookie'].unique().to_list()

n_nodes = 30_000
train_small = df_train.join(df_train.unique(subset=['node', 'cookie']).select('node').group_by('node').len().sort('len').tail(n_nodes).drop('len'),
                        on='node')
df_eval_small = df_eval.join(df_train, on='cookie', how='semi')
df_eval_small = df_eval_small.with_columns(pl.col('node').cast(pl.Int64))

In [2]:
def prepare_feature_schema(is_ground_truth: bool) -> FeatureSchema:
    base_features = FeatureSchema(
        [
            FeatureInfo(
                column="user_id",
                feature_hint=FeatureHint.QUERY_ID,
                feature_type=FeatureType.CATEGORICAL,
            ),
            FeatureInfo(
                column="item_id",
                feature_hint=FeatureHint.ITEM_ID,
                feature_type=FeatureType.CATEGORICAL,
            ),
        ]
    )
    if is_ground_truth:
        return base_features

    all_features = base_features + FeatureSchema(
        [
            FeatureInfo(
                column="timestamp",
                feature_type=FeatureType.NUMERICAL,
                feature_hint=FeatureHint.TIMESTAMP,
            ),
        ]
    )

    return all_features


def filter_n_count(df, column='item_id', n=5):
    to_save = df.group_by(column).agg(pl.count().alias('count')).filter(pl.col('count')>n)
    return df.join(to_save, on=column, how='semi')

def allign_gt(df, gt, item_col='item_id', user_col='user_id'):
    gt = gt.join(df, on=user_col, how='semi')
    gt = gt.join(df, on=item_col, how='semi')
    return gt

def make_replay_format(df, is_gt=False):
    df = df.with_columns(
        [
            pl.col('cookie').alias('user_id').cast(pl.Int64), 
            pl.col('category').alias('item_id').cast(pl.Int64)
        ])
    if not is_gt:
        df = df.with_columns(pl.col("event_date").dt.timestamp("ms").alias("timestamp") // 1000)
    return df

validation_gt = make_replay_format(df_eval_small.join(df_cat_features.with_columns(pl.col('node').cast(pl.Int64)).drop_nulls().select('node', 'category').unique(), on='node', how='left'), is_gt=True)['user_id', 'item_id'].unique()
train_events = make_replay_format(train_small.join(df_cat_features.drop_nulls().select('node', 'category').unique(), on='node', how='left'))['user_id', 'item_id','timestamp']
item_before = train_events['item_id'].unique().len()
item_after = train_events['item_id'].unique().len()

print(f'Before: {item_before}')
print(f'After: {item_after}')
print(f'Items save {100 * item_after/item_before} %')
validation_gt = allign_gt(train_events, validation_gt)
item_features = train_events[['item_id']].unique()
validation_events = train_events

user_features = make_replay_format(df_clickstream.join(df_cat_features.drop_nulls().select('node', 'category').unique(), on='node', how='left'))[['user_id']].unique()

Before: 51
After: 51
Items save 100.0 %


In [3]:
MAX_SEQ_LEN = 100
BATCH_SIZE = 512
NUM_WORKERS = 9
MAX_EPOCHS = 5

train_dataset = Dataset(
    feature_schema=prepare_feature_schema(is_ground_truth=False),
    interactions=train_events,
    query_features=user_features,
    item_features=item_features,
    check_consistency=True,
    categorical_encoded=False,
)

validation_dataset = Dataset(
    feature_schema=prepare_feature_schema(is_ground_truth=False),
    interactions=validation_events,
    query_features=user_features,
    item_features=item_features,
    check_consistency=True,
    categorical_encoded=False,
)
validation_gt = Dataset(
    feature_schema=prepare_feature_schema(is_ground_truth=True),
    interactions=validation_gt,
    check_consistency=True,
    categorical_encoded=False,
)

ITEM_FEATURE_NAME = "item_id_seq"

tensor_schema = TensorSchema(
    [
        TensorFeatureInfo(
            name=ITEM_FEATURE_NAME,
            is_seq=True,
            feature_type=FeatureType.CATEGORICAL,
            feature_sources=[TensorFeatureSource(FeatureSource.INTERACTIONS, train_dataset.feature_schema.item_id_column)],
            feature_hint=FeatureHint.ITEM_ID,
        )
    ]
)

tokenizer = SequenceTokenizer(tensor_schema, allow_collect_to_master=True)
tokenizer.fit(train_dataset)

sequential_train_dataset = tokenizer.transform(train_dataset)

sequential_validation_dataset = tokenizer.transform(validation_dataset)
sequential_validation_gt = tokenizer.transform(validation_gt, [tensor_schema.item_id_feature_name])

sequential_validation_dataset, sequential_validation_gt = SequentialDataset.keep_common_query_ids(
    sequential_validation_dataset, sequential_validation_gt
)

model = SasRec(
    tensor_schema,
    block_count=2,
    head_count=2,
    max_seq_len=MAX_SEQ_LEN,
    hidden_size=128,
    dropout_rate=0.3,
    optimizer_factory=FatOptimizerFactory(learning_rate=0.001),
    loss_sample_count=50,
    negatives_sharing=True
)

csv_logger = CSVLogger(save_dir=".logs/train", name="SASRec_category")

checkpoint_callback = ModelCheckpoint(
    dirpath=".checkpoints/sasrec_category",
    save_top_k=1,
    verbose=True,
    monitor="ndcg@5",
    mode="max",
)

train_dataloader = DataLoader(
    dataset=SasRecTrainingDataset(
        sequential_train_dataset,
        max_sequence_length=MAX_SEQ_LEN,
    ),
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
    pin_memory=True,
)

validation_dataloader = DataLoader(
    dataset=SasRecValidationDataset(
        sequential_validation_dataset,
        sequential_validation_gt,
        sequential_train_dataset,
        max_sequence_length=MAX_SEQ_LEN,
    ),
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    pin_memory=True,
)

validation_metrics_callback = ValidationMetricsCallback(
    metrics=["recall", 'map', 'ndcg'],
    ks=[1, 5, 10],
    item_count=train_dataset.item_count,
    postprocessors=[],
)

trainer = L.Trainer(
    max_epochs=MAX_EPOCHS,
    callbacks=[checkpoint_callback, validation_metrics_callback],
    logger=csv_logger,
    accelerator='cuda',
)

trainer.fit(
    model,
    train_dataloaders=train_dataloader,
    val_dataloaders=validation_dataloader,
)

  dataset=SasRecTrainingDataset(
  self._inner = TorchSequentialDataset(
  dataset=SasRecValidationDataset(
  self._inner = TorchSequentialValidationDataset(
  self._inner = TorchSequentialDataset(
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
You are using a CUDA device ('NVIDIA GeForce RTX 4070') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
c:\Users\Grig\miniconda3\envs\my_env_py3.9\lib\site-packages\lightning\pytorch\callbacks\model_checkpoint.py:654: Checkpoint directory C:\code\avito_hack_clear\.checkpoints\sasrec_category exists and is not empty.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name   | Type             | Params | Mode 
------------------

Sanity Checking: |          | 0/? [00:00<?, ?it/s]

c:\Users\Grig\miniconda3\envs\my_env_py3.9\lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:420: Consider setting `persistent_workers=True` in 'val_dataloader' to speed up the dataloader worker initialization.
c:\Users\Grig\miniconda3\envs\my_env_py3.9\lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:420: Consider setting `persistent_workers=True` in 'train_dataloader' to speed up the dataloader worker initialization.


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 0, global step 248: 'ndcg@5' reached 0.46160 (best 0.46160), saving model to 'C:\\code\\avito_hack_clear\\.checkpoints\\sasrec_category\\epoch=0-step=248-v1.ckpt' as top 1


k              1        10         5
map     0.298366  0.411104  0.381566
ndcg    0.298366  0.518872  0.461601
recall  0.212957  0.765572  0.609367



Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 1, global step 496: 'ndcg@5' reached 0.46626 (best 0.46626), saving model to 'C:\\code\\avito_hack_clear\\.checkpoints\\sasrec_category\\epoch=1-step=496.ckpt' as top 1


k              1        10         5
map     0.304509  0.415716  0.386477
ndcg    0.304509  0.522943  0.466262
recall  0.217589  0.767836  0.613223



Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 2, global step 744: 'ndcg@5' was not in top 1


k              1        10         5
map     0.304745  0.415012  0.385685
ndcg    0.304745  0.522484  0.465802
recall  0.217629  0.768077  0.613735



Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 3, global step 992: 'ndcg@5' was not in top 1


k              1        10         5
map     0.301401  0.412559  0.382953
ndcg    0.301401  0.520390  0.463110
recall  0.214911  0.767287  0.611289



Validation: |          | 0/? [00:00<?, ?it/s]

Epoch 4, global step 1240: 'ndcg@5' was not in top 1
`Trainer.fit` stopped: `max_epochs=5` reached.


k              1        10         5
map     0.298312  0.410138  0.380102
ndcg    0.298312  0.518359  0.460306
recall  0.212801  0.766803  0.608767



In [4]:
best_model = SasRec.load_from_checkpoint(checkpoint_callback.best_model_path).eval()

PREDICTION_PATH = 'predictions'
MODEL_NAME = 'SASREC_REPLAY_CATEGORY'

prediction_dataloader = DataLoader(
    dataset=SasRecPredictionDataset(
        sequential_validation_dataset,
        max_sequence_length=MAX_SEQ_LEN,
    ),
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    pin_memory=True,
)

csv_logger = CSVLogger(save_dir=".logs/test", name="SASRec_category")

TOPK = [50]

postprocessors = []

pandas_prediction_callback = PandasPredictionCallback(
    top_k=max(TOPK),
    query_column="user_id",
    item_column="item_id",
    rating_column="score",
    postprocessors=postprocessors,
)


trainer = L.Trainer(
    callbacks=[
        pandas_prediction_callback,
    ],
    logger=csv_logger,
    inference_mode=True,
)
trainer.predict(best_model, dataloaders=prediction_dataloader, return_predictions=False)

pandas_res = pandas_prediction_callback.get_result()
recommendations = tokenizer.query_and_item_id_encoder.inverse_transform(pandas_res)
recommendations = pl.from_pandas(recommendations)

os.makedirs(os.path.join(PREDICTION_PATH, MODEL_NAME), exist_ok=True)
recommendations.rename({'user_id':'cookie', 'item_id':'node'}).write_parquet(os.path.join(PREDICTION_PATH, MODEL_NAME, 'SASREC_CATEGORY_REPLAY_VAL.pq'))

  dataset=SasRecPredictionDataset(
  self._inner = TorchSequentialDataset(
You are using the plain ModelCheckpoint callback. Consider using LitModelCheckpoint which with seamless uploading to Model registry.
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
c:\Users\Grig\miniconda3\envs\my_env_py3.9\lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:420: Consider setting `persistent_workers=True` in 'predict_dataloader' to speed up the dataloader worker initialization.


Predicting: |          | 0/? [00:00<?, ?it/s]