<a href="https://colab.research.google.com/github/wogsim/two_tower_recmodel/blob/main/notebooks/two_tower_rank_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>



# Рекомендации на основе Трансформера

Этот проект — это система рекомендаций, использующая архитектуру **Трансформер** для анализа истории позитивных действий пользователя.

**Цель:** Научить модель эффективно ранжировать товары, которые пользователь, скорее всего, добавит в корзину.

---

## 1. Входные данные (История взаимодействий)

Мы будем использовать историю **положительных взаимодействий** пользователя (сессии):
* **Клики**
* **Добавления в корзину**
* **Покупки**

(Примечание: Просмотры временно **игнорируются** для упрощения, но могут быть добавлены позже.)

---

## 2. Двухэтапное Обучение

Обучение модели будет состоять из двух фаз: **Pretrain** (Предварительное обучение) и **Finetune** (Тонкая настройка).

### А. Pretrain (Предварительное обучение)

* **Задача:** Обучить модель решать **общую задачу**, максимально утилизируя **все имеющиеся данные** (клики, корзины, покупки).
* **Результат:** Получение сильных общих представлений (эмбеддингов) для пользователей и товаров.
* **Свойство:** Хорошо масштабируется — добавление данных повышает качество.

### Б. Finetune (Тонкая настройка)

* **Задача:** **Адаптировать** модель под **конкретную целевую задачу ранжирования** на тесте.
* **Целевая Метрика:** **$ndcg@10$** (Normalized Discounted Cumulative Gain на 10) по группам (`request_id`).
* **Метод:** Обучение на **попарную функцию потерь** (Pairwise Loss) — будем использовать **Calibrated Pairwise Logistic** (подробности ниже).

---

## 3. Целевая Задача Finetune (Ранжирование)

На этапе Finetune мы фокусируемся на ранжировании внутри групп (`request_id`).

* **Метки:**
    * **1 (Позитивный):** Товар был **добавлен в корзину**.
    * **0 (Негативный):** Товар был **просмотрен** (но не добавлен в корзину).
* **Исключения:** Клики и покупки **не** учитываются, так как их нет в итоговом тестовом наборе.
* **Цель:** **Отранжировать** единички (корзины) как можно **выше** ноликов (просмотров)

### Pretrain:

<div align="center">
  <img src="https://i.ibb.co/8GksnWD/Screenshot-2025-05-04-at-10-15-36.png" width="500" alt="pretrain">
</div>

Предварительное обучение состоит из двух взаимодополняющих задач: **Next-Positive Prediction (NPP)** и **Feedback Prediction (FP)**.

### Общая структура токена

Каждое положительное взаимодействие ($t$) кодируется как **токен**, который является суммой векторов трёх характеристик:
$$\text{Токен}_t = \mathbf{c}_t + \mathbf{i}_t + \mathbf{f}_t$$
* $\mathbf{i}_t$ — **Товар** (Item) взаимодействия.
* $\mathbf{c}_t$ — **Контекст** (Context) взаимодействия.
* $\mathbf{f}_t$ — **Фидбек** (Feedback): клик, корзина или покупка.

### Next-Positive Prediction ($\mathcal{L}_{\mathrm{NPP}}$)
* **Задача:** Предсказать вероятность следующего товара $i_t$, используя Softmax и косинусное сходство:
$$
P(\text{item}=i_t\mid \text{history}=S_{t-1},\;\text{context}=c_t)\,.
$$
* **Позднее связывание (Next-Positive):** Трансформер выдает $h_{t-1}$. Для предсказания используем:
$$
\hat h^c_t = \mathrm{MLP}\bigl(\mathrm{Concat}(h_{t-1},\mathbf{c}_t)\bigr).
$$

### Feedback Prediction ($\mathcal{L}_{\mathrm{FP}}$)
* **Задача:** Предсказать вероятность типа фидбека $f_t$ (классификация на три класса: клик, корзина, покупка):
$$
P(\text{feedback}=f_t\mid \text{history}=S_{t-1},\;\text{context}=c_t,\;\text{item}=i_t).
$$
* **Позднее связывание (Feedback):** Используем $h_{t-1}$, $\mathbf{c}_t$ и $\mathbf{i}_t$:
$$
\hat h^i_t = \mathrm{MLP}\bigl(\mathrm{Concat}(h_{t-1},\mathbf{c}_t,\mathbf{i}_t)\bigr).
$$

### Итоговый Pretrain Loss

$$\mathcal{L}_{\rm pre\text{-}train} = \mathcal{L}_{\mathrm{NPP}} + \mathcal{L}_{\mathrm{FP}}.$$

### Finetune

<p align="center">
  <img src="https://i.ibb.co/kgwt7pRb/Screenshot-2025-05-04-at-10-15-48.png" width="500" alt="finetune">
</p>


#### Постановка задачи

Пусть для пользователя с идентификатором `user_id` есть сформированная история взаимодействий и набор групп товаров, каждая из которых имеет свой `request_id`.

* **Скрытые состояния:**
    $$h_0, h_1, \dots, h_t$$
* **Группа (`request_id`):** Множество товаров, каждый из которых имеет бинарную метку:
    * **1:** Товар был добавлен в корзину (позитив).
    * **0:** Товар был просмотрен (негатив).

**Цель:** Для каждого товара внутри группы оценить его **релевантность** пользователю, используя актуальное скрытое состояние пользователя.

---

#### Учет временной задержки (Consistency with Validation)

На валидации и тесте между последним известным состоянием истории и моментом показа новой группы может быть значительный разрыв (от 2 дней до 1 месяца). Чтобы имитировать это и обеспечить консистентность, мы добавляем задержку в процесс обучения:

1.  **Выбор задержки:** Для каждой группы случайно выбирается задержка $\Delta$:
$$\Delta \sim \mathrm{Uniform}(2\ \text{дн.},\ 32 \text{дн.}).$$
2.  **Выбор состояния:** Находим самое *позднее* скрытое состояние $h_k$, которое предшествует текущей группе *не менее* чем на $\Delta$.
3.  **Вектор пользователя:** Используем это состояние **$h_k$** как итоговый вектор пользователя для расчета потерь в данной группе.

---

#### Попарная ранжирующая функция потерь

Мы обучаем модель ранжировать позитивные товары выше негативных в рамках одной группы.

1.  **Формирование пар:** Внутри каждой группы (`request_id`) генерируем все возможные пары товаров $(i, j)$, где:
    * Товар $i$ имеет метку **1** (корзина, позитив).
    * Товар $j$ имеет метку **0** (просмотр, негатив).
2.  **Расчет потерь:** Для каждой такой пары рассчитываем ранжирующую функцию потерь (например, **Calibrated Pairwise Logistic**), используя предсказанные релевантности для товаров $i$ и $j$.

## 2. Предобработка данных

In [None]:
!git clone https://github.com/wogsim/two_tower_recmodel

In [None]:
!pip install -r /content/two_tower_recmodel/notebooks/requirements.txt
!pip install -e two_tower_recmodel/grocery

In [None]:
DATA_DIR = "/content/recsys_course/data/lavka"

from grocery.utils.dataset import download_and_extract
download_and_extract(
     url="https://www.kaggle.com/api/v1/datasets/download/thekabeton/ysda-recsys-2025-lavka-dataset",
     filename="lavka.zip",
    dest_dir=DATA_DIR
)

In [None]:
DATA_DIR = "/content/recsys_course/data/lavka"

In [None]:
import polars as pl
from collections import deque
from typing import Dict, Any, Generator, Iterable, Optional
from abc import ABC, abstractmethod
from itertools import chain

 Оставляем только небольшой поднабор всех признаков. Остальные признаки буду учтены в будущем.

In [None]:
train_df = pl.read_parquet(DATA_DIR + '/train.parquet').select(['action_type', 'product_id', 'source_type', 'timestamp', 'user_id', 'request_id'])

### Разделение на трейн и валидация:

<div align="center">
  <img src="https://i.ibb.co/yBPn87t7/IMG000-19.jpg" width="500" alt="split">
</div>

Для оценки модели будем использовать train данные. Из них сформируем валидационную и обучающую части. Для того, чтобы получить корректные метрики на валидации, важно повторить все особенности тестовых данных:
- 2 дня разница между train и valid
- 1 месяц на valid
- оставляем только просмотр и корзину
- группы с >= 10 товарами
- группы с хотя бы одной корзиной и хотя бы одним просмотром

In [None]:
class ActionType:
    VIEW = 'AT_View'
    CLICK = 'AT_Click'
    CART_UPDATE = 'AT_CartUpdate'
    PURCHASE = 'AT_Purchase'


class Preprocessor:
    mapping_action_types = {
        ActionType.VIEW: 0,
        ActionType.CART_UPDATE: 1,
        ActionType.CLICK: 2,
        ActionType.PURCHASE: 3
    }
    def __init__(
        self,
        train_df: pl.DataFrame,
    ):
        self.train_df = train_df

    def _map_col(self, column: str, cast: pl.DataType = None) -> dict:
        uniques = sorted(self.train_df.select(pl.col(column)).unique().to_series().to_list())
        mapping = {val: idx for idx, val in enumerate(uniques)}

        for attr in ("train_df",):
            df = getattr(self, attr)
            df = df.with_columns(
                pl.col(column)
                .replace(mapping)
                .alias(column)
            )
            if cast is not None:
                df = df.with_columns(pl.col(column).cast(cast))
            setattr(self, attr, df)

        return mapping

    def run(self):
        self.train_df = self.train_df.with_columns(
            pl.col("source_type").fill_null("").alias("source_type")
        )

        self.mapping_product_ids = self._map_col("product_id")
        self.mapping_user_ids = self._map_col("user_id")
        self.mapping_source_types = self._map_col("source_type", cast=pl.Int8)

        self.train_df = self.train_df.with_columns(
            pl.col("action_type")
            .replace(self.mapping_action_types)
            .cast(pl.Int8)
            .alias("action_type")
        )

        self.targets = (
            self.train_df
            .filter(
                pl.col("request_id").is_not_null() &
                pl.col("action_type").is_in([0, 1]) &
                (pl.col("source_type") != self.mapping_source_types["ST_Catalog"])
            )
            .group_by([
                "user_id",
                "request_id",
                "product_id",
            ])
            .agg([
                pl.col("action_type").max(),
                pl.col("timestamp").min(),
                pl.col("source_type").mode().first()
            ])
        )

        requests_with_cartupdate_and_view = (
            self.targets
            .select(["request_id", "action_type", "timestamp"])
            .group_by("request_id")
            .agg([
                pl.col("action_type").max().alias("max_t"),
                pl.col("action_type").min().alias("min_t"),
                pl.len(),
                pl.col("timestamp").min().alias("req_ts")
            ])
            .with_columns(sum_targets=pl.col('max_t').add(pl.col('min_t')))
            .filter(pl.col('sum_targets') == 1)
            .filter(pl.col('len') >= 10)
            .select(["request_id", "req_ts"])
        )
        self.targets = (
            self.targets
            .drop("timestamp")
            .join(requests_with_cartupdate_and_view, on="request_id", how="inner")
            .with_columns(pl.col("req_ts").alias("timestamp"))
            .drop("req_ts")
        )
        self.targets = (
            self.targets
            .group_by(['user_id', 'request_id', 'timestamp', 'source_type'])
            .agg([
                pl.col('product_id'),
                pl.col('action_type'),
            ])
        )

        self.timesplit_valid_end = self.train_df["timestamp"].max()
        self.timesplit_valid_start = self.timesplit_valid_end - 30 * 24 * 60 * 60
        self.timesplit_train_end = self.timesplit_valid_start - 2 * 24 * 60 * 60
        self.timesplit_train_start = self.train_df["timestamp"].min()

        self.train_df = (
            self.train_df
            .filter(pl.col("action_type") != 0)
            .drop("request_id")
        )

        self.train_targets = self.targets.filter(
            pl.col("timestamp") <= self.timesplit_train_end
        )
        self.valid_targets = self.targets.filter(
            (pl.col("timestamp") > self.timesplit_valid_start) &
            (pl.col("timestamp") <= self.timesplit_valid_end)
        )
        self.train_history = self.train_df.filter(pl.col('timestamp') <= self.timesplit_train_end)
        self.valid_history = self.train_df.filter(pl.col('timestamp') > self.timesplit_train_end)

        return (
            self.train_history,
            self.valid_history,
            self.train_targets,
            self.valid_targets
        )

In [None]:
preprocessor = Preprocessor(train_df)
train_history, valid_history, train_targets, valid_targets = preprocessor.run()

- train_history/valid_history - позитивные взаимодействия пользователей
- train_targets/valid_targets - группы для finetune

## 3. Подготовка данных для pretrain

Для pretrain и finetune работа происходит с двумя последовательностями: последовательностью позитивных взаимодействий и последовательностью request-ов пользователя. Далее будем называть их history и candidates. На pretrain нам будет нужна только history.

#### Обрезание историй

У пользователей может быть разное количество позитивных событий в истории. Для простоты будет рассматривать последние 512 событий. Если вдруг их будет больше, то будем обрезать.

#### Схемы таблиц и пример

Схема для history имеет вид:
```python
HISTORY_SCHEMA = pl.Struct([{
    'source_type': pl.List(pl.Int64),
    'action_type': pl.List(pl.Int64),
    'product_id': pl.List(pl.Int64),
    'position': pl.List(pl.Int64),
    'targets_inds': pl.List(pl.Int64),
    'targets_lengths': pl.List(pl.Int64), # количество таргет событий в истории
    'lengths': pl.List(pl.Int64), # длина всей истории
}]).
```
`position` это индексы событий в истории (нужно будут далее для позиционных эмбеддингов), `targets_inds` - индексы тех позиций, которые будут участвовать в подсчете функции потерь. Они нужны, чтобы разделять потерю по событиям из обучающий и валидационной частей. `targets_lengths` - количество таких событий в истории.
Пример: Пусть есть некоторый пользователь с историей из позитивных взаимодействий длины 5. Пусть первые 3 события попадают в обучение, а последние 2 в валидацию. Тогда:
```python
history_train_sample = pl.DataFrame([{
    'source_type': [1, 1, 2, 3, 4],
    'action_type': [1, 0, 1, 0, 1],
    'product_id': [1, 2, 3, 4, 5],
    'position': [0, 1, 2, 3, 4],
    'targets_inds': [0, 1, 2],
    'targets_lengths': [3],
    'lengths': [5]
}])
history_valid_sample = pl.DataFrame([{
    'source_type': [1, 1, 2, 3, 4],
    'action_type': [1, 0, 1, 0, 1],
    'product_id': [1, 2, 3, 4, 5],
    'position': [0, 1, 2, 3, 4],
    'targets_inds': [3, 4],
    'targets_lengths': [2],
    'lengths': [5]
}])
```

In [None]:
def ensure_sorted_by_timestamp(group: Iterable[Dict[str, Any]]) -> Generator[Dict[str, Any], None, None]:
    """
    Ensures that the given iterable of events is sorted by the 'timestamp' field.

    This function iterates over each event in the provided iterable and checks if the
    'timestamp' of the current event is greater than or equal to the 'timestamp' of the
    previous event. If any event has a 'timestamp' that is less than the previous event's
    'timestamp', an AssertionError is raised.

    @param group: An iterable of dictionaries, where each dictionary represents an event with at least a 'timestamp' key.
    @return: A generator yielding each event from the input iterable in order, ensuring they are sorted by 'timestamp'.
    @raises AssertionError: If the events are not sorted by 'timestamp'.
    """

    events = chain(group)

    prev_timestamp = 0
    for event in events:
        if event["timestamp"] >= prev_timestamp:
            prev_timestamp = event["timestamp"]
            yield event
        else:
            raise AssertionError("Events are not sorted by timestamp")

In [None]:
class Mapper(ABC):
    HISTORY_SCHEMA = pl.Struct({
        'source_type': pl.List(pl.Int64),
        'action_type': pl.List(pl.Int64),
        'product_id': pl.List(pl.Int64),
        'position': pl.List(pl.Int64),
        'targets_inds': pl.List(pl.Int64),
        'targets_lengths': pl.List(pl.Int64), # количество таргет событий в истории
        'lengths': pl.List(pl.Int64), # длина всей истории
    })
    CANDIDATES_SCHEMA = pl.Struct({
        'source_type': pl.List(pl.Int64),
        'action_type': pl.List(pl.Int64),
        'product_id': pl.List(pl.Int64),
        'lengths': pl.List(pl.Int64), # длина каждого реквеста
        'num_requests': pl.List(pl.Int64) # общее количество реквестов у этого пользователя
    })

    def __init__(self, min_length: int, max_length: int):
        self._min_length: int = min_length
        self._max_length: int = max_length

    @abstractmethod
    def __call__(self, group: pl.DataFrame) -> pl.DataFrame:
        pass

    def get_empty_frame(self, candidates=False):
        return pl.DataFrame(schema=pl.Schema({
            'history': self.HISTORY_SCHEMA,
            **({'candidates': self.CANDIDATES_SCHEMA} if candidates else {})
        }))

Структура, которая:
- накапливает history для пользователя и оставлять последние `max_length`
- умеет обращаться по индексу к событию истории
- имеет метод `get(self, targets_ids)`, который превращает `self._data` в dict в соотвествии со схемой `HISTORY_SCHEMA`.

In [None]:
class HistoryDeque:
    def __init__(self, max_length=512):
        self._data = deque([], maxlen=max_length)

    def append(self, x):
        self._data.append(x)

    def __len__(self):
        return (len(self._data))

    def __getitem__(self, idx):
        return self._data[idx]

    def get(self, targets_inds=None):
        """
        Retrieves a dictionary containing various attributes of the dataset samples.

        If `targets_inds` is not provided, it automatically identifies indices of samples where the `target` is 1.

        @param targets_inds: List of indices of target samples. If None, it will be determined based on samples with target value 1.
        @return: Dictionary with keys ['source_type', 'action_type', 'product_id', 'position', 'targets_inds', 'targets_lengths', 'lengths']
                Each key maps to a list or value representing the respective attribute of the dataset samples.
        """
        if targets_inds is None:
            targets_inds = [i for i, value in enumerate(self._data)
                                        if value['target'] == 1]

        history = {'source_type': [stori['source_type'] for stori in self._data],
                    'action_type': [stori['action_type'] for stori in self._data],
                    'product_id': [stori['product_id'] for stori in self._data],
                    'position': list(range(len(self._data))),
                    'targets_inds': targets_inds,
                    'targets_lengths': [len(targets_inds)],
                    'lengths': [len(self._data)]}


        return history

На основе функции `get_pretrain_data` реализуем `PretrainMapper`, который будет по данном пользователю выдавать обучающий пример в нужном формате.

In [None]:
class PretrainMapper(Mapper):
    def __call__(self, group: pl.DataFrame) -> pl.DataFrame:
        """
        Processes a group of data by maintaining a history of rows up to a specified maximum length.
        If the history meets the minimum length requirement and contains at least one target, it returns
        a DataFrame with the history. Otherwise, it returns an empty DataFrame.

        @param group: A Polars DataFrame containing the group of data to process.
        @return: A Polars DataFrame containing the history if conditions are met; otherwise, an empty DataFrame.
        """
        deque = HistoryDeque(self._max_length)
        events_generator = ensure_sorted_by_timestamp(group.to_struct())

        for event in events_generator:
            deque.append(event)

        if len(deque) > self._min_length:
            return pl.DataFrame([{'history': deque.get()}], schema=pl.Schema({'history': Mapper.HISTORY_SCHEMA}))

        else:
            return self.get_empty_frame()


In [None]:
def get_pretrain_data(train_history: pl.DataFrame,
                      valid_history: pl.DataFrame,
                      min_length: int = 5,
                      max_length: int = 4096) -> pl.DataFrame:
    mapper = PretrainMapper(
        min_length=min_length,
        max_length=max_length,
    )

    train_data = (
        train_history.with_columns(target=pl.lit(1))
        .sort(['user_id', 'timestamp'])
        .group_by('user_id')
        .map_groups(mapper)
    )

    valid_data = (
        pl.concat([
            train_history.with_columns(target=pl.lit(0)),
            valid_history.with_columns(target=pl.lit(1))
        ], how='diagonal')
        .sort(['user_id', 'timestamp'])
        .group_by('user_id')
        .map_groups(mapper)
    )

    return train_data, valid_data

In [None]:
pretrain_train_data, pretrain_valid_data = get_pretrain_data(train_history, valid_history, min_length=5, max_length=512)

In [None]:
pretrain_valid_data

## 4. Реализуем свой torch.nn.utils.data.Dataset

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import polars as pl
from tqdm import tqdm

In [None]:
def convert_dict_to_tensor(data_dict):
    """
    Recursively converts lists within a dictionary to PyTorch tensors with dtype=torch.int64.

    @param data_dict: A dictionary potentially containing nested dictionaries and lists.
    @return: A new dictionary with the same structure as `data_dict`, but with lists converted to PyTorch tensors.
    """
    if not isinstance(data_dict, dict):
        if isinstance(data_dict, str):
            return data_dict
        return torch.tensor(data_dict, dtype=torch.int64)
    else:
        new_dict = {}
        for key in data_dict:
            new_dict[key] = convert_dict_to_tensor(data_dict[key])
    return new_dict


In [None]:
def test_convert_dict_to_tensor_basic():
    input_data = {
        'a': [1, 2, 3],
        'b': {
            'c': [4, 5],
            'd': 6
        },
        'e': 'text'
    }

    result = convert_dict_to_tensor(input_data)
    print(result)

    assert isinstance(result['a'], torch.Tensor)
    assert result['a'].dtype == torch.int64
    assert result['a'].tolist() == [1, 2, 3]

    assert isinstance(result['b'], dict)
    assert isinstance(result['b']['c'], torch.Tensor)
    assert result['b']['c'].dtype == torch.int64
    assert result['b']['c'].tolist() == [4, 5]

    assert result['b']['d'] == 6
    assert result['e'] == 'text'

test_convert_dict_to_tensor_basic()

In [None]:
class LavkaDataset(Dataset):
    def __init__(self, data):
        self.data = data

    @classmethod
    def from_dataframe(cls, df: pl.DataFrame) -> 'LavkaDataset':
        converted_data = [convert_dict_to_tensor(group) for group in df.to_struct()]

        return cls(converted_data)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

In [None]:
print("Creating train dataset ...")
train_ds = LavkaDataset.from_dataframe(pretrain_train_data)
print("Creating valid dataset ...")
valid_ds = LavkaDataset.from_dataframe(pretrain_valid_data)

## 5. Реализуем основной backbone модели

Реализуем класс ResNet-блока согласно следующим формулам:

Для входа $x\in\mathbb{R}^{\text{batch}\times d}$ вычислить:  
$$
    \begin{aligned}
    z &= xW + b,\\
    a &= \mathrm{ReLU}(z),\\
    d'&= \mathrm{Dropout}(a),\\
    y &= \mathrm{LayerNorm}\bigl(x + d'\bigr).
    \end{aligned}
$$  
В компактном виде:  
$$
    y = \mathrm{LayerNorm}\Bigl(x + \mathrm{Dropout}\bigl(\mathrm{ReLU}(xW + b)\bigr)\Bigr)\,.
$$


In [None]:
class ResNet(nn.Module):
    def __init__(self, embedding_dim, dropout=0.):
        super().__init__()

        self.linear = nn.Linear(embedding_dim, embedding_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=dropout)
        self.layer_norm = nn.LayerNorm(embedding_dim)

    def forward(self, x):

        identity = x
        out = self.linear(x)
        out = self.relu(out)
        out = self.dropout(out)
        out = self.layer_norm(identity + out)

        return out

In [None]:
def test_resnet_output_shape():
    embedding_dim = 32
    batch_size = 8
    model = ResNet(embedding_dim)
    x = torch.randn(batch_size, embedding_dim)
    out = model(x)
    assert out.shape == (batch_size, embedding_dim)

test_resnet_output_shape()

Реализуем `ContextEncoder`, `ItemEncoder` и `ActionEncoder`. На вход они принимают torch.tensor с индексами размера (seq_len,), а на выходе получают torch.tensor с соотвествующими векторами размера (seq_len, embedding_dim).

In [None]:
print(f'длинна action {len(preprocessor.mapping_action_types)}')
print(f'длинна item {len(preprocessor.mapping_product_ids)}')
print(f'длинна source {len(preprocessor.mapping_source_types)}')

In [None]:
class ContextEncoder(nn.Module):
    def __init__(self, embedding_dim=64):
        super().__init__()
        self.embeddings = nn.Embedding(13, embedding_dim)

    def forward(self, inputs):
        return self.embeddings(inputs)


class ItemEncoder(nn.Module):
    def __init__(self, embedding_dim=64):
        super().__init__()
        self.embeddings = nn.Embedding(26522, embedding_dim)

    def forward(self, inputs):
        return self.embeddings(inputs)


class ActionEncoder(nn.Module):
    def __init__(self, embedding_dim=64):
        super().__init__()
        self.embeddings = nn.Embedding(4, embedding_dim)

    def forward(self, inputs):
        return self.embeddings(inputs)

In [None]:
import math
class PositionalEncoding(nn.Module):
    def __init__(self, max_seq_len: int = 512, embedding_dim: int= 64):
        super().__init__()

        pe = torch.zeros(max_seq_len, embedding_dim)
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)

        div_term = torch.exp(
            torch.arange(0, embedding_dim, 2).float() * (-math.log(10000.0) / embedding_dim)
        )

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)

        self.register_buffer('pe', pe)

    def forward(self, pos: torch.Tensor) -> torch.Tensor:
        return self.pe[0].index_select(0, pos.flatten())

In [None]:
def test_context_encoder_shape():
    embedding_dim = 16
    batch_size = 5
    seq_len = 7
    model = ContextEncoder(embedding_dim=embedding_dim)
    x = torch.randint(0, 4, (batch_size, seq_len))
    out = model(x)
    assert out.shape == (batch_size, seq_len, embedding_dim)
test_context_encoder_shape()

def test_item_encoder_shape():
    embedding_dim = 20
    batch_size = 6
    seq_len = 10
    model = ItemEncoder(embedding_dim=embedding_dim)
    x = torch.randint(0, 20000, (batch_size, seq_len))
    out = model(x)
    assert out.shape == (batch_size, seq_len, embedding_dim)
test_item_encoder_shape()

def test_action_encoder_shape():
    embedding_dim = 12
    batch_size = 4
    seq_len = 3
    model = ActionEncoder(embedding_dim=embedding_dim)
    x = torch.randint(0, 4, (batch_size, seq_len))
    out = model(x)
    assert out.shape == (batch_size, seq_len, embedding_dim)
test_action_encoder_shape()

In [None]:
def get_mask(lengths):
    """
    Generates a mask tensor based on the given sequence lengths.

    The mask is a boolean tensor where each row corresponds to a sequence and contains
    True values up to the length of the sequence and False values thereafter.

    @param lengths: A 1D tensor containing the lengths of sequences.
    @return: A 2D boolean tensor where each row has True up to the corresponding sequence length.
    """
    max_length = max(lengths)
    arange_tensor = torch.arange(max_length, device=lengths.device)

    return (arange_tensor < lengths.unsqueeze(1))

In [None]:
def test_get_mask():
    lengths = torch.tensor([2, 3, 1])
    expected_mask = torch.tensor([
        [True, True, False],
        [True, True, True],
        [True, False, False]
    ])
    assert torch.equal(get_mask(lengths), expected_mask)

test_get_mask()

Реализуем `ModelBackbone`. Эта часть модели является общей для pretrain и finetune. Она кодируется входные события, преобразует их в нужный для трансформера формат `(batch_size, seq_len, embedding_dim)`, прогоняет через них трансформер и возвращает три поля: выходы трансформера, вектора товаров и вектора feedback-ов.

In [None]:
class ModelBackbone(nn.Module):
    def __init__(self,
                 embedding_dim=64,
                 num_heads=2,
                 max_seq_len=512,
                 dropout_rate=0.2,
                 num_transformer_layers=2):
        super().__init__()
        self.context_encoder = ContextEncoder(embedding_dim)
        self.item_encoder = ItemEncoder(embedding_dim)
        self.action_encoder = ActionEncoder(embedding_dim)
        self.position_embeddings = PositionalEncoding(max_seq_len, embedding_dim)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embedding_dim,
            nhead=num_heads,
            dim_feedforward=embedding_dim * 4,
            dropout=dropout_rate,
            activation='gelu',
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_transformer_layers)
        self._embedding_dim = embedding_dim

    @property
    def embedding_dim(self):
        return self._embedding_dim

    def forward(self, inputs):
        context_embeddings = self.context_encoder(inputs['history']['source_type'])
        item_embeddings = self.item_encoder(inputs['history']['product_id'])
        action_embeddings = self.action_encoder(inputs['history']['action_type'])
        position_embedding = self.position_embeddings(inputs['history']['position'])

        padding_mask = get_mask(inputs['history']['lengths'])
        batch_size, seq_len = padding_mask.shape

        token_embeddings = item_embeddings.new_zeros(
            batch_size, seq_len, self.embedding_dim, device=context_embeddings.device)

        summed_embs = (context_embeddings + item_embeddings
                       + action_embeddings + position_embedding)

        token_embeddings[padding_mask] = summed_embs

        source_embeddings = self.transformer_encoder(
            token_embeddings,
            mask=torch.triu(torch.ones(seq_len, seq_len), diagonal=1).bool().to(device=context_embeddings.device),
            src_key_padding_mask=~padding_mask)

        return {
            'source_embeddings': source_embeddings[padding_mask],
            'item_embeddings': item_embeddings.squeeze(),
            'context_embeddings': context_embeddings.squeeze()}

In [None]:
def test_model_backbone():
    sample = {
        'history': {
            'source_type': torch.tensor([[1, 1, 7, 1, 1,]]),
            'action_type': torch.tensor([[2, 2, 2, 1, 2]]),
            'product_id': torch.tensor([[19210,  8368,  5165,  5326, 12476]]),
            'position': torch.tensor([[0, 1, 2, 3, 4]]),
            'targets_inds': torch.tensor([[0, 1, 2]]),
            'targets_lengths': torch.tensor([3]),
            'lengths': torch.tensor([5])
        }
    }


    backbone = ModelBackbone()
    output = backbone(sample)
    print(output['source_embeddings'].shape)
    print(output['item_embeddings'].shape)
    print(output['context_embeddings'].shape)
    assert output['source_embeddings'].shape == (5, 64)
    assert output['item_embeddings'].shape == (5, 64)
    assert output['context_embeddings'].shape == (5, 64)

test_model_backbone()

## 6. Реализуем pretrain модель

#### Головы (heads)

- **User–context fusion**  
      Последовательность ResNet-блоков, свёртка размерности `2D → D`. На входе конкатенация `source_embeddings ∥ context_embeddings`.  
- **Candidate projector**  
      Три ResNet-блока, преобразующие эмбеддинг товара из `D → D`.  
- **Classifier**  
      Три ResNet-блока, затем линейный слой `3D → 3`, для предсказания типа следующего действия из трёх возможных (cart, click, purchase).  
- **Параметр τ**  
      Скаляp-коэффициент `τ = clip(exp(τ_raw), τ_min, τ_max)` для масштабирования скалярных произведений – температура в contrastive-лоссе.


#### Формулы лоссов

Обозначения:  
- $u_i \in \mathbb{R}^D$ – нормализованный вектор пользователя для i-го примера.  
- $c_i \in \mathbb{R}^D$ – нормализованный вектор кандидата (позитивного товара) для i-го примера.  
- $\{n_{ij}\}_{j=1}^M\subset\mathbb{R}^D$ – нормализованные векторы $M$ негативных товаров (весь каталог товаров минус один товар, позитивный).  
- $\text{temp}>0$ – «температура» (скаляр).  
- $K= M+1$ – общее число кандидатов (1 позитивный + M негативных). Количество товаров во всем каталоге будет $1 + M$.

1. **Retrieval loss** (контрастивный softmax-лосс)  
   Для каждого примера $i$ вычисляем скалярные логиты:  
   $$
     \ell_i = [\,\underbrace{u_i^\top c_i}_{\text{позитивный логит}} \,\big|\,
               \underbrace{u_i^\top n_{i1},\,u_i^\top n_{i2},\,\dots,\,u_i^\top n_{iM}}_{\text{негативные логиты}}] \;\times\;\\text{temp}
   $$
   Затем лосс  
   $$
     \mathcal{L}_{\mathrm{retr}}
     = -\frac1N \sum_{i=1}^N \log\frac{\exp\bigl(u_i^\top c_i \,\text{temp}\bigr)}
                                      {\exp\bigl(u_i^\top c_i \,\text{temp}\bigr)
                                     + \sum_{j=1}^M \exp\bigl(u_i^\top n_{ij}\,\text{temp}\bigr)}.
   $$

2. **Action loss** (кросс-энтропия)  
   Для каждого положительного шага $i$ модель выдаёт логиты $\mathbf{z}_i \in \mathbb{R}^3$ по трём классам действий, а истинная метка $y_i\in\{0,1,2\}$.  
   $$
     \mathcal{L}_{\mathrm{action}}
     = -\frac1N \sum_{i=1}^N \sum_{k=0}^2 \delta_{y_i=k}\,\log\bigl(\mathrm{softmax}(\mathbf{z}_i)_k\bigr),
   $$
   где $\mathrm{softmax}(\mathbf{z})_k = \frac{e^{z_k}}{\sum_{m=0}^2 e^{z_m}}$.

3. **Итоговый лосс**  
   $$
     \mathcal{L}
     = \mathcal{L}_{\mathrm{retr}}
       \;+\; 10 \times \mathcal{L}_{\mathrm{action}}.
   $$
   Перевзвешиваем action часть т.к. у нее сильно меньше масштаб.

In [None]:
class PretrainModel(nn.Module):
    MIN_TEMPERATURE = 0.01
    MAX_TEMPERATURE = 100

    def __init__(self,
                 backbone,
                 embedding_dim=64):
        super().__init__()
        self.backbone = backbone
        self.user_context_fusion = nn.Sequential(
            ResNet(2 * embedding_dim),
            ResNet(2 * embedding_dim),
            ResNet(2 * embedding_dim),
            nn.Linear(2 * embedding_dim, embedding_dim),
        )
        self.candidate_projector = nn.Sequential(
            ResNet(embedding_dim),
            ResNet(embedding_dim),
            ResNet(embedding_dim),
        )
        self.classifier = nn.Sequential(
            ResNet(3 * embedding_dim),
            ResNet(3 * embedding_dim),
            ResNet(3 * embedding_dim),
            nn.Linear(3 * embedding_dim, 3),
        )
        self._embedding_dim = embedding_dim
        self.tau = torch.nn.Parameter(torch.zeros(1, dtype=torch.float32))

        self.cross_entr_loss = nn.CrossEntropyLoss()

    @property
    def embedding_dim(self):
        return self._embedding_dim

    @property
    def temperature(self):
        return torch.clip(torch.exp(self.tau), min=self.MIN_TEMPERATURE, max=self.MAX_TEMPERATURE)

    def forward(self, inputs):
        backbone_outputs = self.backbone(inputs)
        source_embeddings = backbone_outputs['source_embeddings']
        item_embeddings = backbone_outputs['item_embeddings']
        context_embeddings = backbone_outputs['context_embeddings']

        lengths = inputs['history']['lengths']
        offsets = torch.cumsum(lengths, dim=0) - lengths
        target_mask = torch.full((sum(lengths).item(),), False, dtype=torch.bool, device=lengths.device)
        target_inds = torch.repeat_interleave(offsets, inputs['history']['targets_lengths']) + inputs['history']['targets_inds']
        target_mask[target_inds] = True
        non_first_element = torch.full((sum(lengths).item(),), True, dtype=torch.bool, device=lengths.device)
        non_first_element[offsets] = False
        source_mask = torch.roll(non_first_element & target_mask, -1)

        source_embeddings = source_embeddings[source_mask]
        context_embeddings = context_embeddings[non_first_element & target_mask]
        item_embeddings = item_embeddings[non_first_element & target_mask]


        # calc retrieval loss
        user_embeddings = torch.nn.functional.normalize(self.user_context_fusion(torch.cat([source_embeddings, context_embeddings], dim=-1)))
        candidate_embeddings = torch.nn.functional.normalize(
                                self.candidate_projector(
                                self.backbone.item_encoder.embeddings(inputs['history']['product_id'][target_mask & non_first_element])))
        negative_embeddings = torch.nn.functional.normalize(self.candidate_projector(self.backbone.item_encoder.embeddings.weight))
        pos_logits = torch.sum(user_embeddings * candidate_embeddings, dim=-1) * self.temperature
        neg_logits = user_embeddings @ negative_embeddings.T * self.temperature
        next_positive_prediction_loss = - torch.mean(
                                          (pos_logits) - (torch.logsumexp(neg_logits, dim=-1))
                                                  )

        # calc action loss
        logits = self.classifier(torch.cat([source_embeddings, context_embeddings, item_embeddings], dim=-1))
        targets = inputs['history']['action_type'][non_first_element & target_mask] - 1
        feedback_prediction_loss = self.cross_entr_loss(logits, targets)

        return {
            'next_positive_prediction_loss': next_positive_prediction_loss,
            'feedback_prediction_loss': feedback_prediction_loss,
            'loss': next_positive_prediction_loss + feedback_prediction_loss * 10
        }

## 7. (0.5 балл) Обучим pretrain модель

In [None]:
def collate_fn(batch):
    """
    Collates a batch of samples from a dataset.

    This function is designed to handle batches where each sample is a dictionary.
    It recursively collates values associated with the same keys across all samples in the batch.
    For tensor values, it concatenates them along the first dimension.
    For dictionary values, it applies the same collation logic recursively.
    For other types of values, it simply aggregates them into a list.

    @param batch: A list of samples, where each sample is a dictionary.
    @return: A dictionary with the same keys as the samples, where each value is either
             a concatenated tensor, a recursively collated dictionary, or a list of values.
    """
    if isinstance(batch, list) and isinstance(batch[0], dict):
        batched_dict = {}
        for key in batch[0].keys():
            list_of_values = [item[key] for item in batch]
            batched_dict[key] = collate_fn(list_of_values)
        return batched_dict
    elif isinstance(batch, list) and isinstance(batch[0], torch.Tensor):
        return torch.cat(batch, dim=0)
    elif isinstance(batch, list):
        return batch
    else:
        return batch

def move_to_device(batch, device):
    """
    Moves a batch of data to a specified device (e.g., CPU or GPU).

    Args:
        batch (torch.Tensor or dict): The batch of data to move. Can be a single tensor or a dictionary of tensors.
        device (torch.device): The target device to which the batch should be moved.

    Returns:
        torch.Tensor or dict: The batch of data moved to the specified device.
                             If the input is a dictionary, the returned value will be a dictionary with the same keys
                             and values moved to the specified device.
    """
    if isinstance(batch, torch.Tensor):
        return batch.to(device)
    elif isinstance(batch, dict):
        return {key: move_to_device(value, device) for key, value in batch.items()}
    elif isinstance(batch, list):
        return [move_to_device(item, device) for item in batch]
    elif isinstance(batch, tuple):
        return tuple(move_to_device(item, device) for item in batch)

In [None]:
def test_collate_fn_basic():
    batch = [
        {
            'x': torch.tensor([1, 2]),
            'y': {
                'z': torch.tensor([[10], [20]]),
                'w': 5
            },
            's': 'foo'
        },
        {
            'x': torch.tensor([3, 4]),
            'y': {
                'z': torch.tensor([[30], [40]]),
                'w': 6
            },
            's': 'bar'
        }
    ]

    result = collate_fn(batch)

    assert isinstance(result['x'], torch.Tensor)
    assert result['x'].tolist() == [1, 2, 3, 4]

    assert isinstance(result['y'], dict)
    assert isinstance(result['y']['z'], torch.Tensor)
    assert result['y']['z'].tolist() == [[10], [20], [30], [40]]
    assert result['y']['w'] == [5, 6]

    assert result['s'] == ['foo', 'bar']
test_collate_fn_basic()

In [None]:
from statistics import mean
from sklearn.metrics import ndcg_score


def train_pretrain_model(model, train_loader, valid_loader, optimizer, scheduler, num_epochs, device):
    global_cnt = 0
    prev_valid_loss = None
    for epoch in range(num_epochs):
        model.train()
        train_losses = []
        action_losses = []
        retrieval_losses = []
        for batch in tqdm(train_loader):
            batch = move_to_device(batch, device)
            optimizer.zero_grad()
            output = model(batch)
            loss = output['loss']
            loss.backward()
            optimizer.step()
            train_losses.append(loss.item())
            action_losses.append(output['feedback_prediction_loss'].item())
            retrieval_losses.append(output['next_positive_prediction_loss'].item())
        scheduler.step()
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {mean(train_losses):.6f}, Train Feedback Loss: {mean(action_losses):.6f}, Train NPP Loss: {mean(retrieval_losses):.6f}")


        model.eval()
        valid_losses = []
        action_losses = []
        retrieval_losses = []
        with torch.inference_mode():
            for batch in tqdm(valid_loader):
                if len(batch['history']['targets_inds']) == 0:
                    continue
                batch = move_to_device(batch, device)
                output = model(batch)
                loss = output['loss']
                valid_losses.append(loss.item())
                action_losses.append(output['feedback_prediction_loss'].item())
                retrieval_losses.append(output['next_positive_prediction_loss'].item())

        avg_valid_loss = mean(valid_losses)
        print(f"Epoch {epoch+1}/{num_epochs}, Valid Loss: {avg_valid_loss:.6f}, Valid Feedback Loss: {mean(action_losses):.6f}, Valid NPP Loss: {mean(retrieval_losses):.6f}")

        if prev_valid_loss is None or prev_valid_loss > avg_valid_loss:
            global_cnt = 0
            prev_valid_loss = avg_valid_loss
            with torch.no_grad():
                torch.save(model, './pretrain.pt')
        else:
            global_cnt += 1
            if global_cnt == 10:
                break

In [None]:
lr = 0.001
batch_size = 8
warmup_epochs = 4
start_factor = 0.1
num_epochs = 100

embedding_dim = 64
num_heads = 2
max_seq_len = 512
dropout_rate = 0.2
num_transformer_layers = 2

In [None]:
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_ds, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

In [None]:
device = torch.device('cuda')

backbone = ModelBackbone(embedding_dim=embedding_dim,
                        num_heads=num_heads,
                        max_seq_len=max_seq_len,
                        dropout_rate=dropout_rate,
                        num_transformer_layers=num_transformer_layers).to(device)
model_pretrain = PretrainModel(backbone=backbone,
                             embedding_dim=embedding_dim).to(device)
optimizer = optim.AdamW(model_pretrain.parameters(), lr=lr, weight_decay=0.01)
scheduler = optim.lr_scheduler.LinearLR(
    optimizer,
    start_factor=start_factor,
    total_iters=warmup_epochs
)

In [None]:
train_pretrain_model(model_pretrain, train_loader, valid_loader, optimizer, scheduler, num_epochs, device)

In [None]:
def best_metric_model(valid_loader):
    test_model = torch.load('./pretrain.pt', weights_only=False)
    valid_losses = []
    with torch.inference_mode():
        for batch in tqdm(valid_loader):
            if len(batch['history']['targets_inds']) == 0:
                    continue
            batch = move_to_device(batch, device)
            output = test_model(batch)
            loss = output['loss']
            valid_losses.append(loss.item())
    print(mean(valid_losses))

best_metric_model(valid_loader)

## 8. Подготовка данных для finetune

Схемы для candidates:
```python
CANDIDATES_SCHEMA = pl.Struct({
    'source_type': pl.List(pl.Int64),
    'action_type': pl.List(pl.Int64),
    'product_id': pl.List(pl.Int64),
    'lengths': pl.List(pl.Int64), # длина каждого реквеста
    'num_requests': pl.List(pl.Int64) # общее количество реквестов у этого пользователя
})
```

Пример семпла:

```python
finetune_train_sample = {
    'history': {...},
    'candidates': {
        'source_type': [1, 2, 3],
        'action_type': [1, 0, 1, 0, 1, 0, 1, 1, 1],
        'product_id': [10, 20, 30, 40, 50, 60, 70, 80, 90],
        'lengths': [3, 3, 3],
        'num_requests': 3
    }
}
```

In [None]:
class Candidates:
    def __init__(self, max_requests_size):
        self._data = deque([], maxlen=max_requests_size)

    def append(self, x):
        if x['request_id']:
            self._data.append(x)

    def popleft(self):
        return self._data.popleft()

    def __getitem__(self, idx):
        return self._data[idx]

    def __len__(self):
        return len(self._data)

    def get(self):
        """
        Aggregates data from the internal _data attribute into a structured dictionary format.

        This method constructs a dictionary with keys 'source_type', 'action_type', 'product_id', 'lengths', and 'num_requests'.
        - 'source_type' contains the source types from each sample.
        - 'action_type' contains all action types from each sample's action_type_list flattened into a single list.
        - 'product_id' contains all product IDs from each sample's product_id_list flattened into a single list.
        - 'lengths' contains the length of the product_id_list for each sample.
        - 'num_requests' contains the total number of samples.

        Returns:
            Dict[str, Any]: A dictionary with aggregated data.
        """

        candidate_deque = {'source_type':[],
                           'action_type':[],
                           'product_id':[],
                           'lengths':[],
                           'num_requests':[len(self)]}

        for x in self:
            candidate_deque['source_type'].append(x['source_type'])
            candidate_deque['action_type'].extend(x['action_type_list'])
            candidate_deque['product_id'].extend(x['product_id_list'])
            candidate_deque['lengths'].append(len(x['action_type_list']))

        return candidate_deque

In [None]:
import random
import bisect


class FinetuneTrainMapper(Mapper):
    def __call__(self, group: pl.DataFrame) -> pl.DataFrame:
        """
        Processes a group of interactions to generate history and candidate sets for recommendation.

        This method processes a DataFrame containing interaction data, separating actions into history and candidates based on the presence of 'action_type_list'.
        It ensures the data is sorted by timestamp, filters candidates based on time constraints, and selects historical interactions within a specified lag range for each candidate.
        If there are no valid candidates or insufficient history, it returns an empty DataFrame.

        @param group: A Polars DataFrame containing interaction data with at least 'timestamp' and 'action_type_list' columns.
        @return: A Polars DataFrame with 'history' and 'candidates' columns, or an empty DataFrame if no valid candidates are found.
        """
        history_deque = HistoryDeque()
        candidate_deque = Candidates(self._max_length)

        history_generator = ensure_sorted_by_timestamp(group.to_struct())
        for event in history_generator:
            if event['action_type_list'] is None:
                history_deque.append(event)

        candidate_generator = ensure_sorted_by_timestamp(group.to_struct())
        for event in candidate_generator:
            if event['action_type_list']:
                max_time = event['timestamp'] - random.randrange(2, 32) * 86400
                target_ind = bisect.bisect_right(history_deque, max_time, key=lambda x: x['timestamp'])
                if target_ind == 0:
                    continue
                event['targets_inds'] = target_ind - 1
                candidate_deque.append(event)

        targets_inds = [candidate['targets_inds'] for candidate in candidate_deque]


        if len(candidate_deque) > self._min_length and len(history_deque) > self._min_length:
            return pl.DataFrame([{'history': history_deque.get(targets_inds),
                                  'candidates': candidate_deque.get()}],
                                 schema=pl.Schema({'history': Mapper.HISTORY_SCHEMA,
                                                  'candidates': Mapper.CANDIDATES_SCHEMA}))
        else:
            return self.get_empty_frame(candidates=True)

class FinetuneValidMapper(Mapper):
    def __call__(self, group: pl.DataFrame) -> pl.DataFrame:
        """
        Differs only in the formation of target_inds
        """
        history_deque = HistoryDeque()
        candidate_deque = Candidates(self._max_length)

        history_generator = ensure_sorted_by_timestamp(group.to_struct())
        for event in history_generator:
            if event['action_type_list'] is None:
                history_deque.append(event)

        candidate_generator = ensure_sorted_by_timestamp(group.to_struct())
        for event in candidate_generator:
            if event['action_type_list']:
                max_time = event['timestamp']
                target_ind = bisect.bisect_right(history_deque, max_time, key=lambda x: x['timestamp'])
                if target_ind == 0:
                    continue
                event['targets_inds'] = target_ind - 1
                candidate_deque.append(event)

        targets_inds = [candidate['targets_inds'] for candidate in candidate_deque]


        if len(candidate_deque) > self._min_length and len(history_deque) > self._min_length:
            return pl.DataFrame([{'history': history_deque.get(targets_inds),
                                  'candidates': candidate_deque.get()}],
                                 schema=pl.Schema({'history': Mapper.HISTORY_SCHEMA,
                                                  'candidates': Mapper.CANDIDATES_SCHEMA}))
        else:
            return self.get_empty_frame(candidates=True)

In [None]:
def get_finetune_data(train_history: pl.DataFrame,
                      train_targets: pl.DataFrame,
                      valid_targets: pl.DataFrame,
                      min_length: int = 5,
                      max_length: int = 4096) -> pl.DataFrame:
    mapper = FinetuneTrainMapper(
        min_length=min_length,
        max_length=max_length,
    )

    train_data = (
        pl.concat([
            train_history,
            train_targets.with_columns([
                pl.col('product_id').alias('product_id_list'),
                pl.col('action_type').alias('action_type_list')
            ]).drop(['product_id', 'action_type'])
        ], how='diagonal')
        .sort(['user_id', 'timestamp'])
        .group_by('user_id')
        .map_groups(mapper)
    )

    mapper = FinetuneValidMapper(
        min_length=min_length,
        max_length=max_length,
    )

    valid_data = (
        pl.concat([
            train_history,
            valid_targets.with_columns([
                pl.col('product_id').alias('product_id_list'),
                pl.col('action_type').alias('action_type_list')
            ]).drop(['product_id', 'action_type'])
        ], how='diagonal')
        .sort(['user_id', 'timestamp'])
        .group_by('user_id')
        .map_groups(mapper)
    )

    return train_data, valid_data

In [None]:
finetune_train_data, finetune_valid_data = get_finetune_data(train_history, train_targets, valid_targets, min_length=5, max_length=512)

In [None]:
print("Creating train dataset ...")
train_ds = LavkaDataset.from_dataframe(finetune_train_data)
print("Creating valid dataset ...")
valid_ds = LavkaDataset.from_dataframe(finetune_valid_data)

## 9. Реализуем finetune модель

#### Функция make_groups: разметка элементов по «группам»

Дано: вектор длин последовательностей  
$$
\mathbf{l} = [\,l_1, l_2, \dots, l_B\,],\quad l_i\in\mathbb{N},\;
B=\text{batch size}.
$$  
Нужно получить вектор «номеров групп» длиной  
$$
N = \sum_{i=1}^B l_i
$$
так, чтобы первые $l_1$ элементов имели номер группы 0, следующие $l_2$ - номер 1 и т.д.  
Результат:  
$$
\mathrm{groups} = [\,\underbrace{0,\dots,0}_{l_1},\;
\underbrace{1,\dots,1}_{l_2},\;\dots\;,\underbrace{B-1,\dots,B-1}_{l_B}\,]\,.
$$


In [None]:
def make_groups(lengths: torch.Tensor) -> torch.Tensor:
    range_tensor = torch.arange(0, len(lengths), device=lengths.device)
    return torch.repeat_interleave(range_tensor, lengths)

In [None]:
def test_make_groups_basic():
    lengths = torch.tensor([2, 3, 1])
    expected = torch.tensor([0, 0, 1, 1, 1, 2])
    result = make_groups(lengths)
    assert torch.equal(result, expected)

test_make_groups_basic()

#### Функция make_pairs: построение всех упорядоченных пар внутри групп

Цель: для каждого «блока» длины $l_i$ сгенерировать всех $l_i\times l_i$ упорядоченных пар индексов  
$$
( p, q ),\quad p,q\in\{0,\dots,l_i-1\},
$$
а затем «развернуть» их по всему батчу. Результат - двумерный тензор shape $(2,\,\sum_i l_i^2)$, где

- первая строка `pairs` - индексы «первого» элемента пары в пределах своего блока,  
- вторая строка `pairs` - индексы «второго».

Математически пары нумеруются так:
$$
\{\, (p,q)\;\big|\;p=0..l_i-1,\;q=0..l_i-1\;\}\quad\forall i=1..B.
$$

In [None]:
def make_pairs(lengths: torch.Tensor) -> torch.Tensor:
    num_pairs_per_group = lengths**2
    total_pairs = torch.sum(num_pairs_per_group)

    group_idx = make_groups(num_pairs_per_group)

    pair_offsets = torch.cumsum(num_pairs_per_group, dim=0) - num_pairs_per_group
    local_pair_idx = torch.arange(total_pairs, device=lengths.device) - pair_offsets.repeat_interleave(num_pairs_per_group)

    local_p = local_pair_idx // lengths[group_idx]
    local_q = local_pair_idx % lengths[group_idx]

    offsets = torch.cumsum(lengths, dim=0) - lengths
    global_offsets = offsets[group_idx]

    pairs_first = local_p + global_offsets
    pairs_second = local_q + global_offsets

    return torch.stack([pairs_first, pairs_second], dim=0)

In [None]:
def test_make_pairs_simple():
    lengths = torch.tensor([1, 2], dtype=torch.long)
    expected = torch.tensor([
        [0, 1, 1, 2, 2],
        [0, 1, 2, 1, 2]
    ], dtype=torch.long)

    pairs = make_pairs(lengths)
    assert pairs.shape == (2, 5)
    assert torch.equal(pairs, expected)

test_make_pairs_simple()

#### Класс CalibratedPairwiseLogistic: попарная калиброванная логистическая функция потерь

Идея была предложена здесь: [Calibrated Pairwise Logistic](https://arxiv.org/pdf/2211.01494). Пусть у нас есть:

- логиты всех элементов: $\mathbf{c} \in \mathbb{R}^N$,
- таргеты $\mathbf{t}\in\mathbb{R}^N$,

Шаги:

1. Генерируем все упорядоченные пары индексов внутри групп:  
   $$
   \mathrm{pairs} = \bigl[\;I_0,\;I_1\bigr],\quad
   I_0,I_1\in\{0,\dots,N-1\}
   $$
2. Для каждой пары извлекаем  
   $$
   c_i = c_{I_0},\quad c_j = c_{I_1},\quad
   t_i = t_{I_0},\quad t_j = t_{I_1}.
   $$
3. Отбираем только «положительные» пары, где $t_i > t_j$. Вводим индикатор  
   $$
   w_{ij} =
     \begin{cases}
       1,&t_i > t_j,\\
       0,&\text{иначе}.
     \end{cases}
   $$
   И считаем $W=\sum w_{ij}$.
4. Если $W>0$, вычисляем попарный loss для каждой положительной пары:
   
   а) сначала вычисляем «калиброванную вероятность» того, что $i$ лучше $j$:
   $$
     p_{ij}
     = \frac{\sigma(c_i)}{\sigma(c_i)+\sigma(c_j)},
     \quad
     \sigma(x)=\frac1{1+e^{-x}}.
   $$
   б) берём отрицательный логарифм правдоподобия:
   $$
     \ell_{ij}
     = -\log p_{ij}
     = -\log\frac{\sigma(c_i)}{\sigma(c_i)+\sigma(c_j)}.
   $$
   
5. Итоговая loss - усреднённая:
   $$
     \mathcal{L}
     = \frac{1}{W}\sum_{i,j} w_{ij}\;\ell_{ij}.
   $$
6. Если $W=0$ (нет ни одной пары с $t_i>t_j$), возвращаем нуль.

Таким образом, CalibratedPairwiseLogistic минимизирует  
$$
-\frac{1}{W}\sum_{t_i>t_j}\log\frac{\sigma(c_i)}{\sigma(c_i)+\sigma(c_j)},
$$
то есть учит давать более высокие оценки $c_i$ элементам с большим таргетом $t_i$.

In [None]:
import torch.nn.functional as F

class CalibratedPairwiseLogistic(nn.Module):
    def forward(self, logits, targets, lengths):
        pairs = make_pairs(lengths)
        targets_pairs = targets[pairs]
        logits_pairs = logits[pairs]

        w = targets_pairs[0] > targets_pairs[1]
        ci = logits_pairs[0][w]
        cj = logits_pairs[1][w]

        if ci.numel() == 0:
            return logits.new_tensor(0.0)

        term1 = F.softplus(-ci)

        log_sig_ci = -F.softplus(-ci)
        log_sig_cj = -F.softplus(-cj)
        term2 = torch.logaddexp(log_sig_ci, log_sig_cj)

        loss = term1 + term2

        #loss = F.softplus(-(ci - cj))

        return torch.mean(loss)

В FinetuneModel к сырым логитам  
$$\ell_i = \langle u_i, v_i\rangle$$  
применяется калибровка:

1. Параметр «scale» (обозначим $s$) хранится в виде логарифма, то есть в модели он задан как $\text{scale}$, а реальный множитель берётся как  
   $$
     \alpha = \exp(\text{scale}).
   $$

2. Параметр «bias» (обозначим $b$) - это свободный смещающий коэффициент.

Калиброванный логит получается по формуле  
$$
  \hat\ell_i \;=\; \frac{\ell_i}{\alpha} \;+\; b
  \;=\;
  \frac{\langle u_i, v_i\rangle}{\exp(\text{s})} \;+\; b.
$$

Благодаря этому механизмy модель может автоматически подстраивать и жёсткость (разброс) логитов (через $\alpha$), и их среднее значение (через $b$), что важно для оптимальной работы попарной логистической функции потерь.

In [None]:
class FinetuneModel(nn.Module):
    def __init__(self,
                 backbone,
                 embedding_dim=64):
        super().__init__()
        self.backbone = backbone
        self.user_context_fusion = nn.Sequential(
            ResNet(2 * embedding_dim),
            ResNet(2 * embedding_dim),
            ResNet(2 * embedding_dim),
            nn.Linear(2 * embedding_dim, embedding_dim),
        )
        self.candidate_projector = nn.Sequential(
            ResNet(embedding_dim),
            ResNet(embedding_dim),
            ResNet(embedding_dim),
        )
        self._embedding_dim = embedding_dim
        self.scale = torch.nn.Parameter(torch.zeros(1, dtype=torch.float32))
        self.bias = torch.nn.Parameter(torch.zeros(1, dtype=torch.float32))
        self.pairwise_loss = CalibratedPairwiseLogistic()

    @property
    def embedding_dim(self):
        return self._embedding_dim

    def forward(self, inputs):
        backbone_outputs = self.backbone(inputs)
        source_embeddings = backbone_outputs['source_embeddings']

        lengths = inputs['history']['lengths']
        offsets = torch.cumsum(lengths, dim=0) - lengths

        target_inds = torch.repeat_interleave(offsets, inputs['history']['targets_lengths']) + inputs['history']['targets_inds']
        source_embeddings = source_embeddings[target_inds]
        context_embeddings = self.backbone.context_encoder(inputs['candidates']['source_type'])
        candidate_embeddings = self.backbone.item_encoder(inputs['candidates']['product_id'])

        source_embeddings = torch.nn.functional.normalize(
                                self.user_context_fusion(
                                torch.cat([source_embeddings, context_embeddings], dim=-1)))

        candidate_embeddings = torch.nn.functional.normalize(self.candidate_projector(candidate_embeddings))
        source_embeddings = torch.repeat_interleave(source_embeddings, inputs['candidates']['lengths'], dim=0)
        output_logits = torch.sum((candidate_embeddings * source_embeddings), dim=-1) / torch.exp(self.scale) + self.bias

        return {
            'logits': output_logits,
            'loss': self.pairwise_loss(output_logits,
                                       inputs['candidates']['action_type'],
                                       inputs['candidates']['lengths'])
        }

In [None]:
def test_finetune_model():
    sample = {
        'history': {
            'source_type': torch.tensor([8, 8, 8, 8, 8]),
            'action_type': torch.tensor([1, 1, 2, 2, 1]),
            'product_id': torch.tensor([ 3551, 17044, 10396, 10396, 10396]),
            'position': torch.tensor([0, 1, 2, 3, 4]),
            'targets_inds': torch.tensor([1]),
            'targets_lengths': torch.tensor([1]),
            'lengths': torch.tensor([5])
        },
        'candidates': {
            'source_type': torch.tensor([8]),
            'action_type': torch.tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]),
            'product_id': torch.tensor([18391,  6750, 21647,  5339,  3171,  6150,  3454, 20012, 19954, 10690, 24020,  5551,  5699, 17388, 10396]),
        'lengths': torch.tensor([15]),
        'num_requests': torch.tensor([1])
        }
    }
    backbone = ModelBackbone()
    model_finetune = FinetuneModel(backbone)
    output = model_finetune(sample)

    assert output['logits'].shape == (15,)
    assert output['loss'].shape == ()

test_finetune_model()

## 10. Обучаем finetune модель

In [None]:
def train_finetune_model(model, train_loader, valid_loader, optimizer, scheduler, num_epochs, device):
    prev_valid_ndcg = None
    global_cnt = 0
    for epoch in range(num_epochs):
        model.train()
        train_losses = []
        for batch in tqdm(train_loader):
            batch = move_to_device(batch, device)
            optimizer.zero_grad()
            loss = model(batch)['loss']
            loss.backward()
            optimizer.step()
            train_losses.append(loss.item())
        scheduler.step()
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {mean(train_losses):.6f}")

        model.eval()
        valid_losses = []
        valid_logits = []
        valid_targets = []
        with torch.inference_mode():
            for batch in tqdm(valid_loader):
                batch = move_to_device(batch, device)
                output = model(batch)
                loss = output['loss']
                valid_losses.append(loss.item())
                logits = output['logits']
                targets = batch['candidates']['action_type']
                lengths = batch['candidates']['lengths']
                i = 0
                for length in lengths:
                    if length > 1:
                        valid_logits.append(logits[i:i + length].cpu().numpy())
                        valid_targets.append(targets[i:i + length].cpu().numpy())
                    i += length

        avg_valid_ndcg = 0
        for logits, targets in zip(valid_logits, valid_targets):
            avg_valid_ndcg += ndcg_score(targets[None,], logits[None,], k=10, ignore_ties=True)
        avg_valid_ndcg /= len(valid_logits)

        print(f"Epoch {epoch+1}/{num_epochs}, Valid Loss: {mean(valid_losses):.6f}")
        print(f"Epoch {epoch+1}/{num_epochs}, Valid NDCG@10: {avg_valid_ndcg}")

        if prev_valid_ndcg is None or prev_valid_ndcg < avg_valid_ndcg:
            global_cnt = 0
            prev_valid_ndcg = avg_valid_ndcg
            with torch.no_grad():
                torch.save(model, './finetune.pt')
        else:
            global_cnt += 1
            if global_cnt == 10:
                break

In [None]:
lr = 0.001
batch_size = 4
warmup_epochs = 6
start_factor = 0.1
num_epochs = 100

embedding_dim = 64
num_heads = 2
max_seq_len = 512
dropout_rate = 0.1
num_transformer_layers = 2

In [None]:
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_ds, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

In [None]:
backbone = ModelBackbone(embedding_dim=embedding_dim,
                        num_heads=num_heads,
                        max_seq_len=max_seq_len,
                        dropout_rate=dropout_rate,
                        num_transformer_layers=num_transformer_layers).to(device)

model_finetune = FinetuneModel(backbone=backbone,
                             embedding_dim=embedding_dim).to(device).to(device)
optimizer = optim.AdamW(model_finetune.parameters(), lr=lr, weight_decay=0.01)
scheduler = optim.lr_scheduler.LinearLR(
    optimizer,
    start_factor=start_factor,
    total_iters=warmup_epochs
)

In [None]:
train_finetune_model(model_finetune, train_loader, valid_loader, optimizer, scheduler, num_epochs, device)

Пробуем инициализироваться предобученной моделью, только backbone:

In [None]:
model_pretrain = torch.load('./pretrain.pt', weights_only=False)
model_finetune = FinetuneModel(model_pretrain.backbone, embedding_dim).to(device)
optimizer = optim.AdamW(model_finetune.parameters(), lr=lr, weight_decay=0.01)
scheduler = optim.lr_scheduler.LinearLR(
    optimizer,
    start_factor=start_factor,
    total_iters=warmup_epochs
)

In [None]:
train_finetune_model(model_finetune, train_loader, valid_loader, optimizer, scheduler, num_epochs, device)

Попробуем еще дополнительно иницилизировать user_context_fusion и candidate_projector:

In [None]:
model_pretrain = torch.load('./pretrain.pt', weights_only=False)
model_finetune = FinetuneModel(model_pretrain.backbone, embedding_dim).to(device)
model_finetune.user_context_fusion = model_pretrain.user_context_fusion
model_finetune.candidate_projector = model_pretrain.candidate_projector

optimizer = optim.AdamW(model_finetune.parameters(), lr=lr, weight_decay=0.01)
scheduler = optim.lr_scheduler.LinearLR(
    optimizer,
    start_factor=start_factor,
    total_iters=warmup_epochs
)

In [None]:
train_finetune_model(model_finetune, train_loader, valid_loader, optimizer, scheduler, num_epochs, device)

In [None]:
def best_metrics_finetune_model(valid_loader):
    valid_logits = []
    valid_targets = []
    with torch.inference_mode():
        test_model = torch.load('./finetune.pt', weights_only=False)
        for batch in tqdm(valid_loader):
            batch = move_to_device(batch, device)
            output = test_model(batch)
            logits = output['logits']
            targets = batch['candidates']['action_type']
            lengths = batch['candidates']['lengths']
            i = 0
            for length in lengths:
                if length > 1:
                    valid_logits.append(logits[i:i + length].cpu().numpy())
                    valid_targets.append(targets[i:i + length].cpu().numpy())
                i += length

    avg_valid_ndcg = 0
    for logits, targets in zip(valid_logits, valid_targets):
        avg_valid_ndcg += ndcg_score(targets[None,], logits[None,], k=10, ignore_ties=True)
    avg_valid_ndcg /= len(valid_logits)
    print(avg_valid_ndcg)

test_finetune_model(valid_loader)