# Архитектура сервиса для этапа применения бизнес-правил и повторного ранжирования кандидатов

### Описание
Для пакетной рекомендательной системы характерны 3 этапа работы:
1. Генерация кандидатов. На этом этапе происходит отбор сотен или тысяч объектов-кандидатов из множества объектов, которых могут быть миллиарды. Цель заключается в том, чтобы включить в этот набор макси-мально релевантные пользователю объекты и исключить нерелевантные.
2. Скоринг и ранжирование. Здесь система оценивает кандидатов, полученных на предыдущем этапе, и присваивает каждому из них баллы, основанные на их релевантности для пользователя. Т.к. количество объек-тов на этом этапе ограничено, то можно использовать больший набор при-знаков объектов, контекстные факторы, например время и геопозицию.
3. Повторное ранжирование. В рамках данного этапа проводится корректировка списка кандидатов, применяются бизнес-правила, используются метрики разнообразия и новизны для уточнения позиций объектов-кандидатов в итоговом списке рекомендаций.

В рамках этой задачи выполняется подбор моделей для этапа скоринга и ранжирования.
Ограничение: использование только CPU.

### Имеющиеся данные
Социально-демографическая информация о пользователях и список их покупок - дата, название, цена, количество.
Список сгенерированных кандидатов в разрезе покупателей.

Разделение на трейн и тест: даты покупок. Самые последние покупки в тесте.
Названия датафреймов с данными: train_data, test_data, candidates.

**Состав train_data и test_data**:
```
 0   buyer_id                           object  идентификатор покупателя
 1   buyer_birth_date                   object  дата рождения покупателя
 2   buyer_sms_allowed                  int64   покупатель разрешил присылать СМС
 3   buyer_emails_allowed               int64   покупатель разрешил присылать email
 4   buyer_account_status_loyal         int64   покупатель в статусе "Лояльный" (0 или 1)
 5   buyer_account_status_unregistered  int64   покупатель в статусе "Не зарегистрирован" (0 или 1)
 6   buyer_account_status_new           int64   покупатель в статусе "Новый" (0 или 1)
 7   buyer_account_status_potential     int64   покупатель в статусе "Потенциальный" (0 или 1)
 8   buyer_account_status_lost          int64   покупатель в статусе "Потерянный" (0 или 1)
 9   buyer_account_status_sleeping      int64   покупатель в статусе "Спящий" (0 или 1)
 10  buyer_is_female                    int64   покупатель - женщина (0 или 1)
 11  buyer_age                          int64   возраст покупателя
 12  order_id                           object  идентификатор покупки
 13  order_date                         object  дата покупки
 14  order_day_of_week                  int64   день недели, в который была совершена покупка
 15  order_hour_of_day                  int64   час дня, в который была совершена покупка
 16  product_id                         object  идентификатор товара
 17  product_name                       object  название товара
 18  product_group                      object  название группы товара
 19  product_count                      float64 количество товара
 20  product_sum                        float64 сумма за товар
```

**Состав списка отранжированных рекомендаций по 1000 для каждого покупателя**:
```
 0   buyer_id                           object  идентификатор покупателя
 1   product_id                         object  идентификатор товара
```

**Состав списка отранжированных рекомендаций по 1000 для топа по дням и часам**:
```
product_id                         object  идентификатор товара
```

**Состав списка отранжированных рекомендаций по 1000 для топа по персональным признакам**:
```
product_id                         object  идентификатор товара
```



### Метрики
- NDCG@K
- Precision@K
- Recall@K
- Diversity@K

Контроллируем, чтобы NDCG@K не падала значительно относительно этапа ранжирования кандидатов.

## Импорты и конфигурация

In [47]:
import pandas as pd
import numpy as np
from typing import List, Tuple, Dict, Optional
from sklearn.metrics import ndcg_score
import logging
import warnings
from multiprocessing import Pool
import optuna
from typing import List, Dict, Tuple, Optional
import logging
from tqdm import tqdm
from optuna.trial import Trial
import multiprocessing
from functools import partial


In [2]:
ORGANIZATION_ID = ""
PROCESSING_DATE = ""
RANDOM_STATE = 42
DATA_PATH = ""

In [3]:
warnings.filterwarnings("ignore")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    force=True,
)

## Классы-утилиты и общие методы

In [4]:
class TopRecommender:
    """
    Класс для рекомендации товаров на основе их популярности с учетом дня недели и часа.

    Attributes
    ---
    top_products_by_time : Dict
        Словарь с популярными товарами по временным срезам.
        Ключи: кортежи (день_недели, час) или (день_недели, -1) или (-1, -1).
        Значения: списки кортежей (product_id, количество_покупок).
    max_items : int
        Максимальное количество товаров для хранения в каждом временном срезе.
    """

    def __init__(self, max_items: int = 1000):
        """
        Parameters
        ---
        max_items : int, опционально
            Максимальное количество товаров для хранения в каждом временном срезе,
            по умолчанию 1000.
        """
        self.top_products_by_time = {}
        self.max_items = max_items

    def fit(self, train_data: pd.DataFrame) -> None:
        """
        Обучает рекомендатель на исторических данных.

        Parameters
        ---
        train_data : pd.DataFrame
            Датафрейм с историческими данными о покупках.
            Должен содержать колонки: product_id, order_day_of_week, order_hour_of_day.

        Returns
        ---
        None
        """
        # Формируем популярные товары по дню недели и часу
        hourly_counts = (
            train_data.groupby(
                ["order_day_of_week", "order_hour_of_day", "product_id"]
            )["product_count"]
            .sum()
            .reset_index(name="total_count")
        )

        # Для каждой комбинации день-час сохраняем топ товаров
        for day in hourly_counts["order_day_of_week"].unique():
            for hour in hourly_counts["order_hour_of_day"].unique():
                products = (
                    hourly_counts[
                        (hourly_counts["order_day_of_week"] == day)
                        & (hourly_counts["order_hour_of_day"] == hour)
                    ]
                    .sort_values("total_count", ascending=False)
                    .head(self.max_items)
                )

                self.top_products_by_time[(day, hour)] = list(
                    zip(products["product_id"], products["total_count"])
                )

        # Формируем популярные товары по дням недели (без учета часа)
        daily_counts = (
            train_data.groupby(["order_day_of_week", "product_id"])["product_count"]
            .sum()
            .reset_index(name="total_count")
        )

        for day in daily_counts["order_day_of_week"].unique():
            products = (
                daily_counts[daily_counts["order_day_of_week"] == day]
                .sort_values("total_count", ascending=False)
                .head(self.max_items)
            )

            self.top_products_by_time[(day, -1)] = list(
                zip(products["product_id"], products["total_count"])
            )

        # Формируем общий топ товаров
        total_counts = (
            train_data.groupby("product_id")["product_count"]
            .sum()
            .reset_index(name="total_count")
            .sort_values("total_count", ascending=False)
            .head(self.max_items)
        )

        self.top_products_by_time[(-1, -1)] = list(
            zip(total_counts["product_id"], total_counts["total_count"])
        )

    def recommend(self, max_items: int, day: int = -1, hour: int = -1) -> List[str]:
        """
        Возвращает список рекомендованных товаров.

        Parameters
        ---
        max_items : int
            Максимальное количество рекомендаций для возврата.
        day : int, опционально
            День недели (-1 = неизвестен, 1-7 где 1=понедельник, 7=воскресенье).
        hour : int, опционально
            Час дня (-1 = неизвестен, 0-23), по умолчанию -1.

        Returns
        ---
        List[str]
            Список идентификаторов товаров, отсортированный по популярности.

        Raises
        ---
        ValueError
            Если указан некорректный день недели или час.

        """
        if not (-1 <= day <= 7 and day != 0):
            raise ValueError("day должен быть от -1 до 7, исключая 0")

        if not (-1 <= hour <= 23):
            raise ValueError("hour должен быть от -1 до 23")

        # Пробуем получить рекомендации для конкретного часа и дня
        if (day, hour) in self.top_products_by_time:
            products = self.top_products_by_time[(day, hour)]
        # Если нет, пробуем получить рекомендации для дня
        elif (day, -1) in self.top_products_by_time:
            products = self.top_products_by_time[(day, -1)]
        # Если и это не получилось, берем общий топ
        else:
            products = self.top_products_by_time[(-1, -1)]

        # Возвращаем только product_id в нужном количестве
        return list(zip(*products[:max_items]))[0]

In [5]:
class PersonalizedTopRecommender:
    """
    Класс для персонализированной рекомендации товаров на основе их популярности
    с учетом дня недели, часа, пола и возраста покупателя.

    Attributes
    ----------
    top_products_by_features : Dict
        Словарь с популярными товарами по различным срезам.
        Ключи: кортежи (день_недели, час, пол, возраст).
        Значения: списки кортежей (product_id, суммарное_количество).
    max_items : int
        Максимальное количество товаров для хранения в каждом срезе.
    """

    def __init__(self, max_items: int = 1000):
        """
        Parameters
        ----------
        max_items : int, опционально
            Максимальное количество товаров для хранения в каждом срезе,
            по умолчанию 1000.
        """
        self.top_products_by_features = {}
        self.max_items = max_items

    def _get_age_group(self, age: int) -> int:
        """
        Определяет возрастную группу для заданного возраста.

        Parameters
        ----------
        age : int
            Возраст покупателя.

        Returns
        -------
        int
            Возрастная группа (-1 если возраст неизвестен).
        """
        if age == -1:
            return -1
        # Группируем по десятилетиям: 0-9, 10-19, 20-29 и т.д.
        return age // 10

    def _convert_to_is_male(self, is_female: int) -> int:
        """
        Конвертирует значение is_female в is_male.

        Parameters
        ----------
        is_female : int
            Признак женского пола (-1 = неизвестно, 0 = мужчина, 1 = женщина).

        Returns
        -------
        int
            Признак мужского пола (-1 = неизвестно, 0 = женщина, 1 = мужчина).
        """
        if is_female == -1:
            return -1
        return 1 - is_female

    def _calculate_gender_stats(self, train_data: pd.DataFrame) -> None:
        """
        Подсчет статистик по полу.

        Parameters
        ----------
        train_data : pd.DataFrame
            Датафрейм с историческими данными о покупках.
            Должен содержать колонки: product_id, order_day_of_week,
            order_hour_of_day, is_male, age_group, product_count.

        Returns
        -------
        None
        """
        for is_male in [0, 1]:
            gender_mask = train_data["is_male"] == is_male
            gender_counts = (
                train_data[gender_mask]
                .groupby("product_id")["product_count"]
                .sum()
                .reset_index(name="total_count")
                .sort_values("total_count", ascending=False)
                .head(self.max_items)
            )

            if not gender_counts.empty:
                self.top_products_by_features[(-1, -1, is_male, -1)] = list(
                    zip(gender_counts["product_id"], gender_counts["total_count"])
                )

    def _calculate_age_stats(self, train_data: pd.DataFrame) -> None:
        """
        Подсчет статистик по возрастным группам.

        Parameters
        ----------
        train_data : pd.DataFrame
            Датафрейм с историческими данными о покупках.
            Должен содержать колонки: product_id, order_day_of_week,
            order_hour_of_day, is_male, age_group, product_count.

        Returns
        -------
        None
        """
        for age_group in train_data["age_group"].unique():
            if age_group != -1:
                age_mask = train_data["age_group"] == age_group
                age_counts = (
                    train_data[age_mask]
                    .groupby("product_id")["product_count"]
                    .sum()
                    .reset_index(name="total_count")
                    .sort_values("total_count", ascending=False)
                    .head(self.max_items)
                )

                if not age_counts.empty:
                    self.top_products_by_features[(-1, -1, -1, age_group)] = list(
                        zip(age_counts["product_id"], age_counts["total_count"])
                    )

    def _calculate_gender_age_stats(self, train_data: pd.DataFrame) -> None:
        """
        Подсчет статистик по комбинации пола и возрастной группы.

        Parameters
        ----------
        train_data : pd.DataFrame
            Датафрейм с историческими данными о покупках.
            Должен содержать колонки: product_id, order_day_of_week,
            order_hour_of_day, is_male, age_group, product_count.

        Returns
        -------
        None
        """
        for is_male in [0, 1]:
            for age_group in train_data["age_group"].unique():
                if age_group != -1:
                    mask = (train_data["is_male"] == is_male) & (
                        train_data["age_group"] == age_group
                    )
                    counts = (
                        train_data[mask]
                        .groupby("product_id")["product_count"]
                        .sum()
                        .reset_index(name="total_count")
                        .sort_values("total_count", ascending=False)
                        .head(self.max_items)
                    )

                    if not counts.empty:
                        self.top_products_by_features[(-1, -1, is_male, age_group)] = (
                            list(zip(counts["product_id"], counts["total_count"]))
                        )

    def _calculate_time_gender_stats(self, train_data: pd.DataFrame) -> None:
        """
        Подсчет статистик по времени и полу.

        Parameters
        ----------
        train_data : pd.DataFrame
            Датафрейм с историческими данными о покупках.
            Должен содержать колонки: product_id, order_day_of_week,
            order_hour_of_day, is_male, age_group, product_count.

        Returns
        -------
        None
        """
        for day in train_data["order_day_of_week"].unique():
            for hour in train_data["order_hour_of_day"].unique():
                for is_male in [0, 1]:
                    mask = (
                        (train_data["order_day_of_week"] == day)
                        & (train_data["order_hour_of_day"] == hour)
                        & (train_data["is_male"] == is_male)
                    )
                    counts = (
                        train_data[mask]
                        .groupby("product_id")["product_count"]
                        .sum()
                        .reset_index(name="total_count")
                        .sort_values("total_count", ascending=False)
                        .head(self.max_items)
                    )

                    if not counts.empty:
                        self.top_products_by_features[(day, hour, is_male, -1)] = list(
                            zip(counts["product_id"], counts["total_count"])
                        )

    def _calculate_time_age_stats(self, train_data: pd.DataFrame) -> None:
        """
        Подсчет статистик по времени и возрастной группе.

        Parameters
        ----------
        train_data : pd.DataFrame
            Датафрейм с историческими данными о покупках.
            Должен содержать колонки: product_id, order_day_of_week,
            order_hour_of_day, is_male, age_group, product_count.

        Returns
        -------
        None
        """
        for day in train_data["order_day_of_week"].unique():
            for hour in train_data["order_hour_of_day"].unique():
                for age_group in train_data["age_group"].unique():
                    if age_group != -1:
                        mask = (
                            (train_data["order_day_of_week"] == day)
                            & (train_data["order_hour_of_day"] == hour)
                            & (train_data["age_group"] == age_group)
                        )
                        counts = (
                            train_data[mask]
                            .groupby("product_id")["product_count"]
                            .sum()
                            .reset_index(name="total_count")
                            .sort_values("total_count", ascending=False)
                            .head(self.max_items)
                        )

                        if not counts.empty:
                            self.top_products_by_features[
                                (day, hour, -1, age_group)
                            ] = list(zip(counts["product_id"], counts["total_count"]))

    def _calculate_time_gender_age_stats(self, train_data: pd.DataFrame) -> None:
        """
        Подсчет статистик по времени, полу и возрастной группе.

        Parameters
        ----------
        train_data : pd.DataFrame
            Датафрейм с историческими данными о покупках.
            Должен содержать колонки: product_id, order_day_of_week,
            order_hour_of_day, is_male, age_group, product_count.

        Returns
        -------
        None
        """
        for day in train_data["order_day_of_week"].unique():
            for hour in train_data["order_hour_of_day"].unique():
                for is_male in [0, 1]:
                    for age_group in train_data["age_group"].unique():
                        if age_group != -1:
                            mask = (
                                (train_data["order_day_of_week"] == day)
                                & (train_data["order_hour_of_day"] == hour)
                                & (train_data["is_male"] == is_male)
                                & (train_data["age_group"] == age_group)
                            )
                            counts = (
                                train_data[mask]
                                .groupby("product_id")["product_count"]
                                .sum()
                                .reset_index(name="total_count")
                                .sort_values("total_count", ascending=False)
                                .head(self.max_items)
                            )

                            if not counts.empty:
                                self.top_products_by_features[
                                    (day, hour, is_male, age_group)
                                ] = list(
                                    zip(counts["product_id"], counts["total_count"])
                                )

    def fit(self, train_data: pd.DataFrame) -> None:
        """
        Обучает рекомендатель на исторических данных.

        Parameters
        ----------
        train_data : pd.DataFrame
            Датафрейм с историческими данными о покупках.
            Должен содержать колонки: product_id, order_day_of_week,
            order_hour_of_day, buyer_is_female, buyer_age, product_count.

        Returns
        -------
        None
        """
        # Подготавливаем данные
        train_data = train_data.copy()
        train_data["age_group"] = train_data["buyer_age"].apply(self._get_age_group)
        train_data["is_male"] = train_data["buyer_is_female"].apply(
            self._convert_to_is_male
        )

        # Подсчет статистик по отдельным признакам
        self._calculate_gender_stats(train_data)
        self._calculate_age_stats(train_data)

        # Подсчет статистик по комбинациям признаков
        self._calculate_gender_age_stats(train_data)

        # Подсчет статистик с учетом времени
        self._calculate_time_gender_stats(train_data)
        self._calculate_time_age_stats(train_data)
        self._calculate_time_gender_age_stats(train_data)

    def recommend(
        self,
        max_items: int,
        day_of_week: int = -1,
        hour: int = -1,
        age: int = -1,
        is_male: int = -1,
    ) -> List[str]:
        """
        Возвращает список рекомендованных товаров.

        Parameters
        ----------
        max_items : int
            Максимальное количество рекомендаций для возврата.
        day_of_week : int
            День недели (-1 = неизвестен, 1-7 где 1=понедельник, 7=воскресенье).
        hour : int, опционально
            Час дня (-1 = неизвестен, 0-23), по умолчанию -1.
        age : int, опционально
            Возраст покупателя (-1 = неизвестен), по умолчанию -1.
        is_male : int, опционально
            Пол покупателя (-1 = неизвестен, 0 = женщина, 1 = мужчина), по умолчанию -1.

        Returns
        -------
        List[str]
            Список идентификаторов товаров, отсортированный по популярности.

        Raises
        ------
        ValueError
            Если входные параметры имеют недопустимые значения.
        """
        if not (-1 <= day_of_week <= 7 and day_of_week != 0):
            raise ValueError("day_of_week должен быть от -1 до 7, исключая 0")
        
        if not (-1 <= hour <= 23):
            raise ValueError("hour должен быть от -1 до 23")
            
        if not (-1 <= is_male <= 1):
            raise ValueError("is_male должен быть -1, 0 или 1")
            
        if age < -1:
            raise ValueError("age должен быть -1 или положительным числом")

        if is_male == -1 and age == -1:
            raise ValueError("Должен быть известен хотя бы один персональный признак (пол или возраст)")

        age_group = self._get_age_group(age)

        # Пробуем получить рекомендации, последовательно обобщая параметры
        feature_combinations = []
        
        # Если известны оба признака
        if is_male != -1 and age != -1:
            feature_combinations.extend([
                (day_of_week, hour, is_male, age_group),  # все признаки
                (-1, -1, is_male, age_group),            # только пол и возраст
            ])
        
        # Если известен только пол
        if is_male != -1:
            feature_combinations.extend([
                (day_of_week, hour, is_male, -1),  # день, час и пол
                (-1, -1, is_male, -1),            # только пол
            ])
        
        # Если известен только возраст
        if age != -1:
            feature_combinations.extend([
                (day_of_week, hour, -1, age_group),  # день, час и возраст
                (-1, -1, -1, age_group),            # только возраст
            ])

        for features in feature_combinations:
            if features in self.top_products_by_features:
                products = self.top_products_by_features[features]
                return list(zip(*products[:max_items]))[0]

        return []  # пустой список, если ничего не найдено

In [6]:
class MetricsCalculator:
    """
    Калькулятор метрик для оценки качества рекомендаций.

    Args:
        k_values: список значений K для расчета метрик @K
    """

    def __init__(self, k_values: List[int]):
        self.k_values = sorted(k_values)

    @staticmethod
    def _calculate_ndcg_for_user(
        data_tuple: Tuple[str, Dict[str, np.ndarray], int],
    ) -> Optional[float]:
        """
        Вспомогательная функция для расчета NDCG одного пользователя.

        Args:
            data_tuple: кортеж (user_id, user_data, k) с данными пользователя и параметром k

        Returns:
            Optional[float]: значение NDCG для пользователя или None в случае ошибки
        """
        user_id, user_data, k = data_tuple
        try:
            return ndcg_score(
                y_true=[user_data["ideal"]], y_score=[user_data["relevance"]], k=k
            )
        except Exception as e:
            logging.error(f"Ошибка при расчете NDCG для пользователя {user_id}: {e}")
            return None

    def _calculate_ndcg_at_k(
        self,
        k: int,
        prepared_data: Dict[str, Dict[str, np.ndarray]],
    ) -> float:
        """
        Расчет метрики NDCG@K (Normalized Discounted Cumulative Gain) для заданного значения K.

        NDCG - это метрика, которая оценивает качество ранжирования рекомендаций с учетом
        их позиции в списке. В отличие от Precision и Recall, NDCG учитывает порядок
        рекомендаций, придавая больший вес релевантным товарам на верхних позициях.

        Формула расчета:
        DCG@K = rel₁ + Σᵢ₌₂ᵏ (relᵢ / log₂(i + 1))
        NDCG@K = DCG@K / IDCG@K
        где:
        - relᵢ - релевантность i-го товара (в нашем случае 0 или 1)
        - IDCG@K - максимально возможное значение DCG@K (идеальное ранжирование)

        Args:
            k: количество рекомендаций для оценки
            prepared_data: словарь подготовленных данных для расчета NDCG
                {
                    user_id: {
                        "relevance": np.ndarray,  # вектор релевантности рекомендаций
                        "ideal": np.ndarray       # вектор идеального ранжирования
                    }
                }

        Returns:
            float: среднее значение NDCG@K по всем пользователям.
            Возвращает 0.0 в следующих случаях:
            - если нет данных для расчета
            - если все попытки расчета NDCG завершились ошибкой
            - если prepared_data пуст

        Example:
            >>> prepared_data = {
            ...     'user1': {
            ...         'relevance': np.array([1, 0, 1, 0]),
            ...         'ideal': np.array([1, 1, 0, 0])
            ...     }
            ... }
            >>> calculator._calculate_ndcg_for_k(2, prepared_data)
            0.6309  # NDCG@2 для данного примера

        Notes:
            - Особенности метрики:
            * Учитывает порядок рекомендаций
            * Нормализована на [0, 1]
            * Чувствительна к позициям релевантных товаров
            * Использует логарифмическое дисконтирование

            - Интерпретация значений:
            * 1.0 - идеальное ранжирование
            * 0.0 - наихудшее возможное ранжирование
            * Типичные значения в реальных системах: 0.3-0.7

            - Важные замечания:
            * Метод использует готовые векторы релевантности
            * Требует предварительной подготовки данных
            * Обрабатывает ошибки для каждого пользователя отдельно
            * Использует реализацию из sklearn.metrics
        """
        if not prepared_data:
            return 0.0

        # Подготавливаем данные для параллельной обработки
        data_for_parallel = [
            (user_id, user_data, k) for user_id, user_data in prepared_data.items()
        ]

        # Используем контекстный менеджер для автоматического закрытия пула
        with Pool() as pool:
            # Запускаем параллельные вычисления
            results = pool.map(
                MetricsCalculator._calculate_ndcg_for_user, data_for_parallel
            )

            # Фильтруем None значения и считаем среднее
            valid_results = [r for r in results if r is not None]

            return float(np.mean(valid_results)) if valid_results else 0.0

    def _calculate_precision_for_user(
        self,
        pred_items_at_k: List[str],
        true_items: set[str],
        k: int,
    ) -> Optional[float]:
        """
        Расчет Precision@K для одного пользователя.

        Precision (точность) показывает, какая доля рекомендованных системой товаров
        в топ-K оказалась релевантной для пользователя. Эта метрика оценивает
        способность системы давать точные рекомендации без "шума".

        Формула расчета:
        Precision@K = |релевантные товары ∩ рекомендованные товары в топ-K| / K

        Args:
            pred_items_at_k: список первых K рекомендованных товаров
            true_items: множество фактически купленных товаров
            k: количество рекомендаций для оценки

        Returns:
            float: значение Precision@K для пользователя
            None: если нет данных для расчета

        Example:
            >>> pred_items_at_k = ['item1', 'item2']
            >>> true_items = {'item1', 'item4'}
            >>> calculator._calculate_precision_for_user(pred_items_at_k, true_items, k=2)
            0.5  # из двух рекомендованных товаров только один оказался релевантным

        Notes:
            - Высокое значение Precision означает, что большинство рекомендаций релевантны
            - Особенности метрики:
            * Всегда нормализована на K
            * При увеличении K обычно уменьшается
            * Не учитывает общее количество релевантных товаров
            * Чувствительна к порядку рекомендаций (в топ-K попадают первые K товаров)
        """
        if not pred_items_at_k:
            return None

        relevant_count = len(set(pred_items_at_k) & true_items)
        return relevant_count / k

    def _calculate_recall_for_user(
        self,
        pred_items_at_k: List[str],
        true_items: set[str],
    ) -> Optional[float]:
        """
        Расчет метрики Recall@K (полноты) для одного пользователя.

        Recall (полнота) показывает, какую долю от всех релевантных товаров пользователя
        система смогла рекомендовать в топ-K рекомендациях. Эта метрика оценивает
        способность системы находить все интересные пользователю товары.

        Формула расчета:
        Recall@K = |релевантные товары ∩ рекомендованные товары в топ-K| / |релевантные товары|

        Args:
            pred_items_at_k: список первых K рекомендованных товаров
            true_items: множество фактически купленных товаров

        Returns:
            float: значение Recall@K для пользователя
            None: если нет данных для расчета (пустой список рекомендаций или нет релевантных товаров)

        Example:
            >>> pred_items_at_k = ['item1', 'item2']
            >>> true_items = {'item1', 'item2', 'item4'}
            >>> calculator._calculate_recall_for_user(pred_items_at_k, true_items)
            0.667  # из трех релевантных товаров (item1, item2, item4)
                # в топ-2 рекомендациях найдено два (item1, item2)

        Notes:
            - Особенности метрики для одного пользователя:
            * Чувствительна к количеству релевантных товаров
            * При увеличении K обычно растет
            * Может быть низкой, если у пользователя много релевантных товаров
            * Не учитывает порядок рекомендаций в топ-K
            * Не учитывает нерелевантные рекомендации

            - Важные замечания:
            * Возвращает None для пустого списка рекомендаций
            * Возвращает None если нет релевантных товаров
            * Значение всегда в диапазоне [0, 1]
            * Равен 1.0, если все релевантные товары найдены
            * Равен 0.0, если не найдено ни одного релевантного товара
        """
        if not pred_items_at_k or not true_items:
            return None

        relevant_count = len(set(pred_items_at_k) & true_items)
        return relevant_count / len(true_items)

    def _calculate_diversity_for_user(
        self,
        pred_items_at_k: List[str],
        item_categories: Dict[str, str],
    ) -> Optional[float]:
        """
        Расчет метрики Diversity (разнообразия) для одного пользователя.

        Diversity (разнообразие) измеряет насколько разнообразны рекомендации с точки зрения
        категорий товаров. Высокое разнообразие означает, что система способна рекомендовать
        товары из разных категорий, а не концентрируется только на нескольких популярных
        категориях.

        Формула расчета:
        Diversity = количество уникальных категорий в рекомендациях / общее количество категорий

        Args:
            pred_items_at_k: список рекомендованных товаров
            item_categories: словарь соответствия товаров и их категорий {item_id: category_id}

        Returns:
            float: значение Diversity для пользователя
            None: если нет данных для расчета (пустой список рекомендаций или нет категорий)

        Example:
            >>> pred_items_at_k = ['item1', 'item2', 'item3']
            >>> item_categories = {
            ...     'item1': 'category1',
            ...     'item2': 'category1',
            ...     'item3': 'category2'
            ... }
            >>> calculator._calculate_diversity_for_user(pred_items_at_k, item_categories)
            0.5  # рекомендации содержат товары из 2 категорий из 4 возможных

        Notes:
            - Высокое значение разнообразия может указывать на то, что система
            предлагает пользователям широкий спектр товаров
            - Низкое значение может говорить о том, что система "зациклилась"
            на определенных категориях
            - Особенности метрики для одного пользователя:
            * Нормализована на общее количество доступных категорий
            * Не зависит от порядка рекомендаций
            * Учитывает только уникальные категории
            * Игнорирует товары без категорий

            - Важные замечания:
            * Возвращает None для пустого списка рекомендаций
            * Возвращает None если нет информации о категориях
            * Значение всегда в диапазоне [0, 1]
            * Равен 1.0, если рекомендации охватывают все категории
            * Равен 0.0, если все рекомендации из одной категории

        """
        if not pred_items_at_k or not item_categories:
            return None

        total_categories = len(set(item_categories.values()))
        if total_categories == 0:
            return None

        recommended_categories = {
            item_categories[item] for item in pred_items_at_k if item in item_categories
        }

        if not recommended_categories:
            return None

        return len(recommended_categories) / total_categories

    def _prepare_data(
        self,
        recommendations: Dict[str, List[str]],
        test_data: pd.DataFrame,
    ) -> Dict[str, Dict[str, np.ndarray]]:
        """
        Подготовка данных для расчета NDCG.

        Args:
            recommendations: словарь {user_id: list of recommended items}
            test_data: тестовый датафрейм с колонками 'buyer_id' и 'product_id'

        Returns:
            Dict с подготовленными данными для каждого пользователя
        """
        # Получаем фактические покупки пользователей
        true_items = test_data.groupby("buyer_id")["product_id"].agg(list).to_dict()

        # Собираем все уникальные товары
        all_items = sorted(
            list(set(item for items in recommendations.values() for item in items))
        )
        item_to_idx = {item: idx for idx, item in enumerate(all_items)}

        # Готовим структуры для расчета NDCG
        prepared_data = {}

        for user_id, pred_items in recommendations.items():
            if user_id not in true_items:
                continue

            # Создаем вектор релевантности для всех товаров
            true_set = set(true_items[user_id])
            n_items = len(all_items)

            # Создаем полный вектор релевантности
            relevance = np.zeros(n_items)
            for item in pred_items:
                if item in item_to_idx:
                    relevance[item_to_idx[item]] = 1 if item in true_set else 0

            # Создаем идеальный вектор релевантности
            ideal = np.zeros(n_items)
            for item in true_set:
                if item in item_to_idx:
                    ideal[item_to_idx[item]] = 1

            prepared_data[user_id] = {"relevance": relevance, "ideal": ideal}

        return prepared_data

    def calculate(
        self,
        recommendations: Dict[str, List[str]],
        test_data: pd.DataFrame,
        item_categories: Optional[Dict[str, str]] = None,
    ) -> Dict[str, float]:
        """
        Расчет всех метрик

        Args:
            recommendations: словарь {user_id: list of recommended items}
            test_data: тестовый датафрейм с колонками 'buyer_id' и 'product_id'
            item_categories: словарь {item_id: category_id} для расчета разнообразия

        Returns:
            Dict[str, float]: словарь с метриками NDCG@K, Precision@K, Recall@K
            и Diversity@K для каждого значения K
        """
        # Подготавливаем общие данные
        prepared_data = self._prepare_data(recommendations, test_data)
        true_items = test_data.groupby("buyer_id")["product_id"].agg(set).to_dict()

        # Инициализируем словарь с метриками
        metrics = {}

        # Инициализируем списки для хранения значений метрик
        precision_values = []
        recall_values = []
        diversity_values = []

        # Рассчитываем все метрики
        for k in self.k_values:
            metrics[f"ndcg_{k}"] = self._calculate_ndcg_at_k(k, prepared_data)

            for user_id, pred_items in recommendations.items():
                pred_items_at_k = pred_items[:k]
                user_id_in_true_items = user_id in true_items

                # Precision
                if user_id_in_true_items:
                    precision_value = self._calculate_precision_for_user(
                        pred_items_at_k=pred_items_at_k,
                        true_items=true_items[user_id],
                        k=k,
                    )
                    if precision_value is not None:
                        precision_values.append(precision_value)

                # Recall
                if user_id_in_true_items:
                    recall_value = self._calculate_recall_for_user(
                        pred_items_at_k=pred_items_at_k, true_items=true_items[user_id]
                    )
                    if recall_value is not None:
                        recall_values.append(recall_value)

                # Diversity
                diversity_value = self._calculate_diversity_for_user(
                    pred_items_at_k=pred_items_at_k,
                    item_categories=item_categories or {},
                )
                if diversity_value is not None:
                    diversity_values.append(diversity_value)

            # Сохраняем средние значения метрик
            metrics[f"precision_{k}"] = (
                float(np.mean(precision_values)) if precision_values else 0.0
            )
            metrics[f"recall_{k}"] = (
                float(np.mean(recall_values)) if recall_values else 0.0
            )
            metrics[f"diversity_{k}"] = (
                float(np.mean(diversity_values)) if diversity_values else 0.0
            )

        return metrics

## Бейзлайн

In [9]:
train_data = pd.read_csv(f"{DATA_PATH}/{ORGANIZATION_ID}_{PROCESSING_DATE}_train.csv")
test_data = pd.read_csv(f"{DATA_PATH}/{ORGANIZATION_ID}_{PROCESSING_DATE}_test.csv")
ranked_recommendations = pd.read_csv(
    f"{DATA_PATH}/{ORGANIZATION_ID}_{PROCESSING_DATE}_xgb_recommendations.csv"
)
item_categories = {
    row["product_id"]: row["product_group"] for _, row in train_data.iterrows()
}

In [92]:
class BaselinePostprocessor:
    """
    Класс для постобработки и объединения рекомендаций из разных источников.

    Объединяет рекомендации от разных рекомендательных систем в соответствии
    с заданными пропорциями, исключая дубликаты.
    """

    def fit(self, ranked_recommendations: pd.DataFrame) -> None:
        """
        Создает индекс рекомендаций.

        Parameters
        ----------
        ranked_recommendations : pd.DataFrame
            Датафрейм с персонализированными отранжированными рекомендациями.
            Должен содержать колонки: buyer_id, product_id.
        """
        self._ranked_recommendations_dict = dict(
            ranked_recommendations.groupby("buyer_id")["product_id"].apply(list)
        )

    def recommend(
        self,
        buyer_id: str,
        max_items: int,
        top_recommendations: List[str],
        personalized_top_recommendations: List[str],
    ) -> List[str]:
        """
        Формирует итоговый список рекомендаций для пользователя.

        Parameters
        ----------
        buyer_id : str
            Идентификатор покупателя.
        max_items : int
            Максимальное количество рекомендаций для возврата.
        top_recommendations : List[str]
            Список идентификаторов товаров от TopRecommender.
        personalized_top_recommendations : List[str]
            Список идентификаторов товаров от PersonalizedTopRecommender.

        Returns
        -------
        List[str]
            Список идентификаторов рекомендованных товаров.

        Notes
        -----
        Распределение рекомендаций:
        - 40% из ranked_recommendations
        - 30% из personalized_top_recommendations
        - 30% из top_recommendations
        При нехватке рекомендаций из одного источника, добираются из других.
        """
        # Получаем персонализированные рекомендации для пользователя из предварительно созданного словаря
        user_ranked_recs = self._ranked_recommendations_dict.get(buyer_id, [])

        # Создаем множества для быстрого поиска
        ranked_set = set(user_ranked_recs)
        personalized_set = set(personalized_top_recommendations)
        top_set = set(top_recommendations)

        # Рассчитываем целевые размеры для каждого источника
        ranked_size = int(max_items * 0.4)
        personalized_size = int(max_items * 0.3)
        top_size = max_items - ranked_size - personalized_size

        # Инициализируем результирующий список и множество использованных рекомендаций
        final_recommendations = []
        used_items = set()

        # Функция для добавления уникальных рекомендаций из источника
        def add_unique_recommendations(
            source: List[str], target_size: int
        ) -> List[str]:
            added = []
            for item in source:
                if len(added) >= target_size:
                    break
                if item not in used_items:
                    added.append(item)
                    used_items.add(item)
            return added

        # Добавляем рекомендации из ranked_recommendations
        final_recommendations.extend(
            add_unique_recommendations(user_ranked_recs, ranked_size)
        )

        # Добавляем рекомендации из personalized_top
        remaining_personalized = personalized_size + (
            ranked_size - len(final_recommendations)
        )
        final_recommendations.extend(
            add_unique_recommendations(
                personalized_top_recommendations, remaining_personalized
            )
        )

        # Добавляем рекомендации из top
        remaining_top = max_items - len(final_recommendations)
        final_recommendations.extend(
            add_unique_recommendations(top_recommendations, remaining_top)
        )

        # Если все еще не хватает рекомендаций, добираем из оставшихся уникальных товаров
        if len(final_recommendations) < max_items:
            # Объединяем все оставшиеся уникальные товары
            remaining_items = (ranked_set | personalized_set | top_set) - used_items
            final_recommendations.extend(
                list(remaining_items)[: max_items - len(final_recommendations)]
            )

        return final_recommendations[:max_items]

In [108]:
def process_single_buyer(
    buyer_data: Tuple[str, pd.Series],
    top_recommendations: List[str],
    personalized_top_recommender: PersonalizedTopRecommender,
    baseline_postprocessor: BaselinePostprocessor,
    max_items: int = 1000,
) -> Tuple[str, List[str]]:
    """
    Обрабатывает одного покупателя и возвращает его рекомендации.

    Parameters
    ----------
    buyer_data : Tuple[str, pd.Series]
        Кортеж, содержащий id покупателя и его данные.
    top_recommendations : List[str]
        Общие топовые рекомендации.
    personalized_top_recommender : PersonalizedTopRecommender
        Экземпляр персонализированного рекомендера.
    baseline_postprocessor : BaselinePostprocessor
        Экземпляр постпроцессора.
    max_items : int, optional
        Максимальное количество рекомендаций, по умолчанию 1000.

    Returns
    -------
    Tuple[str, List[str]]
        Кортеж из id покупателя и списка рекомендаций.
    """
    buyer_id, buyer_info = buyer_data

    # Получаем персонализированные рекомендации
    personalized_top_recommendations = personalized_top_recommender.recommend(
        max_items=max_items,
        age=(
            buyer_info["buyer_age"]
            if buyer_info["buyer_age"] != 0 and buyer_info["buyer_age"] >= -1
            else -1
        ),
        is_male=1 - buyer_info["buyer_is_female"],
    )

    # Формируем финальные рекомендации
    final_recommendations = baseline_postprocessor.recommend(
        buyer_id=buyer_id,
        max_items=max_items,
        top_recommendations=top_recommendations,
        personalized_top_recommendations=personalized_top_recommendations,
    )

    return buyer_id, final_recommendations


def get_recommendations_parallel(
    test_buyers: List[str],
    train_data: pd.DataFrame,
    top_recommendations: List[str],
    personalized_top_recommender: PersonalizedTopRecommender,
    baseline_postprocessor: BaselinePostprocessor,
    max_items: int = 1000,
    n_jobs: int = None,
) -> Dict[str, List[str]]:
    """
    Получает рекомендации для списка покупателей, используя параллельную обработку.

    Parameters
    ----------
    test_buyers : List[str]
        Список идентификаторов покупателей.
    train_data : pd.DataFrame
        Тренировочные данные.
    top_recommendations : List[str]
        Общие топовые рекомендации.
    personalized_top_recommender : PersonalizedTopRecommender
        Экземпляр персонализированного рекомендера.
    baseline_postprocessor : BaselinePostprocessor
        Экземпляр постпроцессора.
    max_items : int, optional
        Максимальное количество рекомендаций, по умолчанию 1000.
    n_jobs : int, optional
        Количество процессов для параллельной обработки.
        Если None, используется количество доступных ядер.

    Returns
    -------
    Dict[str, List[str]]
        Словарь рекомендаций для каждого покупателя.
    """
    # Если n_jobs не указан, используем количество доступных ядер
    if n_jobs is None:
        n_jobs = multiprocessing.cpu_count()

    # Получаем информацию о покупателях
    buyers_info = (
        train_data[train_data["buyer_id"].isin(test_buyers)]
        .groupby("buyer_id")
        .first()[["buyer_is_female", "buyer_age"]]
    )

    # Создаем список кортежей (buyer_id, buyer_info) для обработки
    buyers_data = list(buyers_info.iterrows())

    # Создаем частичную функцию с предустановленными параметрами
    process_buyer = partial(
        process_single_buyer,
        top_recommendations=top_recommendations,
        personalized_top_recommender=personalized_top_recommender,
        baseline_postprocessor=baseline_postprocessor,
        max_items=max_items,
    )

    # Запускаем параллельную обработку
    with multiprocessing.Pool(processes=n_jobs) as pool:
        results = pool.map(process_buyer, buyers_data)

    # Преобразуем результаты в словарь
    return dict(results)


logging.info("Создаем экземпляры всех необходимых классов")
top_recommender = TopRecommender(max_items=1000)
personalized_top_recommender = PersonalizedTopRecommender(max_items=1000)
baseline_postprocessor = BaselinePostprocessor()
metrics_calculator = MetricsCalculator(k_values=[10, 100, 1000])

logging.info("Обучаем")
top_recommender.fit(train_data)
personalized_top_recommender.fit(train_data)
baseline_postprocessor.fit(ranked_recommendations=ranked_recommendations)

logging.info("Рекомендации от TopRecommender")
top_recommendations = top_recommender.recommend(max_items=1000)

# Возьмем первых 10 уникальных покупателей из тестовых данных
# test_buyers = test_data['buyer_id'].unique()[:10]
test_buyers = test_data["buyer_id"].unique()

logging.info("Для каждого покупателя сформируем рекомендации")
final_recommendations_dict = get_recommendations_parallel(
    test_buyers=test_buyers,
    train_data=train_data,
    top_recommendations=top_recommendations,
    personalized_top_recommender=personalized_top_recommender,
    baseline_postprocessor=baseline_postprocessor,
    max_items=1000,
    n_jobs=multiprocessing.cpu_count(),
)


logging.info("Рассчитаем метрики")
# Рассчитаем метрики
metrics = metrics_calculator.calculate(
    recommendations=final_recommendations_dict,
    test_data=test_data,
    item_categories=item_categories,
)

# Вывод результатов
logging.info("Результаты:")
for k in metrics_calculator.k_values:
    logging.info(f"Метрики для K={k}:")
    logging.info(f"NDCG@{k}: {metrics[f'ndcg_{k}']:.4f}")
    logging.info(f"Precision@{k}: {metrics[f'precision_{k}']:.4f}")
    logging.info(f"Recall@{k}: {metrics[f'recall_{k}']:.4f}")
    logging.info(f"Diversity@{k}: {metrics[f'diversity_{k}']:.4f}")
    logging.info("--------------------------------")

2025-04-29 10:07:26 - INFO - Создаем экземпляры всех необходимых классов


2025-04-29 10:07:26 - INFO - Обучаем
2025-04-29 10:08:52 - INFO - Рекомендации от TopRecommender
2025-04-29 10:08:52 - INFO - Для каждого покупателя сформируем рекомендации
2025-04-29 10:10:46 - INFO - Рассчитаем метрики
2025-04-29 10:15:35 - INFO - Результаты:
2025-04-29 10:15:35 - INFO - Метрики для K=10:
2025-04-29 10:15:35 - INFO - NDCG@10: 0.8859
2025-04-29 10:15:35 - INFO - Precision@10: 0.0962
2025-04-29 10:15:35 - INFO - Recall@10: 0.2067
2025-04-29 10:15:35 - INFO - Diversity@10: 0.0046
2025-04-29 10:15:35 - INFO - --------------------------------
2025-04-29 10:15:35 - INFO - Метрики для K=100:
2025-04-29 10:15:35 - INFO - NDCG@100: 0.8758
2025-04-29 10:15:35 - INFO - Precision@100: 0.0538
2025-04-29 10:15:35 - INFO - Recall@100: 0.2220
2025-04-29 10:15:35 - INFO - Diversity@100: 0.0142
2025-04-29 10:15:35 - INFO - --------------------------------
2025-04-29 10:15:35 - INFO - Метрики для K=1000:
2025-04-29 10:15:35 - INFO - NDCG@1000: 0.8800
2025-04-29 10:15:35 - INFO - Precis

## Продвинутый постпроцессинг

In [7]:
train_data = pd.read_csv(f"{DATA_PATH}/{ORGANIZATION_ID}_{PROCESSING_DATE}_train.csv")
test_data = pd.read_csv(f"{DATA_PATH}/{ORGANIZATION_ID}_{PROCESSING_DATE}_test.csv")
ranked_recommendations = pd.read_csv(
    f"{DATA_PATH}/{ORGANIZATION_ID}_{PROCESSING_DATE}_xgb_recommendations.csv"
)
item_categories = {
    row["product_id"]: row["product_group"] for _, row in train_data.iterrows()
}

In [17]:
class AdvancedPostprocessor:
    """
    Класс для продвинутой постобработки и объединения рекомендаций из разных источников
    с настраиваемыми пропорциями для разных диапазонов количества рекомендаций.

    Attributes
    ----------
    _ranked_recommendations_dict : Dict[str, List[str]]
        Словарь с предварительно отранжированными рекомендациями.
    """

    def __init__(self):
        """
        Инициализация постпроцессора.
        """
        self._ranked_recommendations_dict = {}

    def fit(self, ranked_recommendations: pd.DataFrame) -> None:
        """
        Создает индекс рекомендаций.

        Parameters
        ----------
        ranked_recommendations : pd.DataFrame
            Датафрейм с персонализированными отранжированными рекомендациями.
            Должен содержать колонки: buyer_id, product_id.
        """
        self._ranked_recommendations_dict = dict(
            ranked_recommendations.groupby("buyer_id")["product_id"].apply(list)
        )

    def _get_recommendations_for_range(
        self,
        start_idx: int,
        end_idx: int,
        user_ranked_recs: List[str],
        personalized_top_recommendations: List[str],
        top_recommendations: List[str],
        proportions: Dict[str, float],
        used_items: set
    ) -> List[str]:
        """
        Формирует рекомендации для заданного диапазона с учетом пропорций.

        Parameters
        ----------
        start_idx : int
            Начальный индекс диапазона.
        end_idx : int
            Конечный индекс диапазона.
        user_ranked_recs : List[str]
            Список рекомендаций из ranked_recommendations.
        personalized_top_recommendations : List[str]
            Список рекомендаций из personalized_top.
        top_recommendations : List[str]
            Список рекомендаций из top.
        proportions : Dict[str, float]
            Пропорции для текущего диапазона.
        used_items : set
            Множество уже использованных рекомендаций.

        Returns
        -------
        List[str]
            Список рекомендаций для диапазона.
        """
        range_size = end_idx - start_idx
        recommendations = []

        def add_unique_recommendations(
            source: List[str],
            target_size: int,
            start_from: int = 0
        ) -> List[str]:
            added = []
            for item in source[start_from:]:
                if len(added) >= target_size:
                    break
                if item not in used_items:
                    added.append(item)
                    used_items.add(item)
            return added

        # Добавляем рекомендации из ranked_recommendations
        ranked_size = int(range_size * proportions["ranked"])
        recommendations.extend(
            add_unique_recommendations(
                user_ranked_recs,
                ranked_size,
                start_idx
            )
        )

        # Добавляем рекомендации из personalized_top
        personalized_size = int(range_size * proportions["personalized"])
        remaining_personalized = personalized_size + (ranked_size - len(recommendations))
        recommendations.extend(
            add_unique_recommendations(
                personalized_top_recommendations,
                remaining_personalized,
                start_idx
            )
        )

        # Добавляем рекомендации из top
        remaining_size = range_size - len(recommendations)
        recommendations.extend(
            add_unique_recommendations(
                top_recommendations,
                remaining_size,
                start_idx
            )
        )

        return recommendations

    def recommend(
        self,
        buyer_id: str,
        max_items: int,
        top_recommendations: List[str],
        personalized_top_recommendations: List[str],
        proportions_by_range: Optional[Dict[str, Dict[str, float]]] = None,
    ) -> List[str]:
        """
        Формирует итоговый список рекомендаций для пользователя с учетом
        настроенных пропорций для каждого диапазона.

        Parameters
        ----------
        buyer_id : str
            Идентификатор покупателя.
        max_items : int
            Максимальное количество рекомендаций для возврата.
        top_recommendations : List[str]
            Список идентификаторов товаров от TopRecommender.
        personalized_top_recommendations : List[str]
            Список идентификаторов товаров от PersonalizedTopRecommender.
        proportions_by_range : Dict[str, Dict[str, float]], optional
            Пропорции для разных диапазонов рекомендаций.
            Пример: {
                "1_10": {"ranked": 0.5, "personalized": 0.3, "top": 0.2},
                "11_100": {"ranked": 0.4, "personalized": 0.4, "top": 0.2},
                "101_1000": {"ranked": 0.3, "personalized": 0.4, "top": 0.3}
            }

        Returns
        -------
        List[str]
            Список идентификаторов рекомендованных товаров.

        Raises
        ------
        ValueError
            Если сумма пропорций не равна 1.0 или указаны некорректные пропорции.
        """
        if max_items <= 0:
            return []

        # Пропорции по умолчанию для каждого диапазона
        default_proportions = {
            "1_10": {"ranked": 0.5, "personalized": 0.3, "top": 0.2},
            "11_100": {"ranked": 0.4, "personalized": 0.4, "top": 0.2},
            "101_1000": {"ranked": 0.3, "personalized": 0.4, "top": 0.3}
        }

        # Используем переданные пропорции или пропорции по умолчанию
        proportions = proportions_by_range or default_proportions

        # Проверяем корректность пропорций для каждого диапазона
        for range_key, props in proportions.items():
            if not all(0 <= p <= 1 for p in props.values()):
                raise ValueError(f"Все пропорции для диапазона {range_key} должны быть в диапазоне [0, 1]")

            if abs(sum(props.values()) - 1.0) > 1e-10:
                raise ValueError(f"Сумма пропорций для диапазона {range_key} должна быть равна 1.0")

            if not all(key in props for key in ["ranked", "personalized", "top"]):
                raise ValueError(
                    f'Пропорции для диапазона {range_key} должны содержать ключи "ranked", "personalized" и "top"'
                )

        # Получаем рекомендации пользователя
        user_ranked_recs = self._ranked_recommendations_dict.get(buyer_id, [])
        used_items = set()
        final_recommendations = []

        # Формируем рекомендации по диапазонам
        if max_items >= 1:
            # Диапазон 1-10
            range_size = min(10, max_items)
            recommendations = self._get_recommendations_for_range(
                0, range_size,
                user_ranked_recs,
                personalized_top_recommendations,
                top_recommendations,
                proportions["1_10"],
                used_items
            )
            final_recommendations.extend(recommendations)

        if max_items > 10:
            # Диапазон 11-100
            range_size = min(100, max_items) - 10
            recommendations = self._get_recommendations_for_range(
                10, min(100, max_items),
                user_ranked_recs,
                personalized_top_recommendations,
                top_recommendations,
                proportions["11_100"],
                used_items
            )
            final_recommendations.extend(recommendations)

        if max_items > 100:
            # Диапазон 101-1000
            range_size = max_items - 100
            recommendations = self._get_recommendations_for_range(
                100, max_items,
                user_ranked_recs,
                personalized_top_recommendations,
                top_recommendations,
                proportions["101_1000"],
                used_items
            )
            final_recommendations.extend(recommendations)

        # Если все еще не хватает рекомендаций, добираем из оставшихся уникальных товаров
        if len(final_recommendations) < max_items:
            all_items = set(user_ranked_recs) | set(personalized_top_recommendations) | set(top_recommendations)
            remaining_items = all_items - used_items
            final_recommendations.extend(
                list(remaining_items)[: max_items - len(final_recommendations)]
            )

        return final_recommendations[:max_items]

In [44]:
metrics_calculator = MetricsCalculator(k_values=[10, 100, 1000])
test_buyers = test_data["buyer_id"].unique()[:10]
relevant_items_dict = {
    buyer_id: set(test_data[test_data["buyer_id"] == buyer_id]["product_id"])
    for buyer_id in test_buyers
}

In [48]:
logging.info("Создаем экземпляры всех необходимых классов")
top_recommender = TopRecommender(max_items=1000)
personalized_top_recommender = PersonalizedTopRecommender(max_items=1000)
metrics_calculator = MetricsCalculator(k_values=[10, 100, 1000])

logging.info("Обучаем")
top_recommender.fit(train_data)
personalized_top_recommender.fit(train_data)

logging.info("Рекомендации от TopRecommender")
top_recommendations = top_recommender.recommend(max_items=1000)


2025-04-30 10:06:11 - INFO - Создаем экземпляры всех необходимых классов
2025-04-30 10:06:11 - INFO - Обучаем
2025-04-30 10:08:01 - INFO - Рекомендации от TopRecommender


In [49]:
# Создаем экземпляр постпроцессора
advanced_postprocessor = AdvancedPostprocessor()

# Обучаем постпроцессор
advanced_postprocessor.fit(ranked_recommendations)


In [None]:
def evaluate_recommendations_for_sample(
    postprocessor,
    test_data: pd.DataFrame,
    top_recommender,
    personalized_top_recommender,
    n_users: int = 100,
    k_list: List[int] = [10, 100, 1000],
    proportions_by_range: Optional[Dict[str, Dict[str, float]]] = None,
) -> Dict[str, Dict[str, float]]:
    """
    Оценка качества рекомендаций на выборке пользователей.

    Parameters
    ----------
    postprocessor : AdvancedPostprocessor
        Обученный постпроцессор
    test_data : pd.DataFrame
        Тестовый датасет с колонками buyer_id, product_id
    top_recommender : BaseRecommender
        Рекомендер для получения топовых рекомендаций
    personalized_top_recommender : BaseRecommender
        Рекомендер для получения персонализированных рекомендаций
    n_users : int
        Количество пользователей для оценки
    k_list : List[int]
        Список значений k для оценки метрик
    proportions_by_range : Dict[str, Dict[str, float]], optional
        Пропорции для разных диапазонов рекомендаций

    Returns
    -------
    Dict[str, Dict[str, float]]
        Словарь с метриками для каждого k
    """
    # Создаем калькулятор метрик
    metrics_calculator = MetricsCalculator(k_values=k_list)

    # Получаем случайную выборку пользователей
    # test_users = test_data["buyer_id"].unique()[:n_users]
    test_users = test_data["buyer_id"].unique()

    # Получаем общие топовые рекомендации (они одинаковые для всех пользователей)
    top_recommendations = top_recommender.recommend(max_items=max(k_list))

    # Словарь для хранения рекомендаций всех пользователей
    all_recommendations = {}

    # Получаем и оцениваем рекомендации для каждого пользователя
    for buyer_id in tqdm(test_users, desc="Evaluating users"):
        # Получаем демографические данные пользователя
        user_data = test_data[test_data['buyer_id'] == buyer_id][['buyer_is_female', 'buyer_age']].iloc[0]
        is_male = 0 if user_data["buyer_is_female"] else 1
        age = user_data["buyer_age"] if user_data["buyer_age"] != 0 and user_data["buyer_age"] >= -1 else -1

        # Получаем персонализированные рекомендации для пользователя
        personalized_top_recommendations = personalized_top_recommender.recommend(
            is_male=is_male,
            age=age,
            max_items=max(k_list)
        )

        # Получаем итоговые рекомендации через постпроцессор
        recommendations = postprocessor.recommend(
            buyer_id=buyer_id,
            max_items=max(k_list),
            top_recommendations=top_recommendations,
            personalized_top_recommendations=personalized_top_recommendations,
            proportions_by_range=proportions_by_range
        )
        
        # Сохраняем рекомендации пользователя
        all_recommendations[buyer_id] = recommendations

    # Рассчитываем метрики
    final_metrics = metrics_calculator.calculate(
        recommendations=all_recommendations,
        test_data=test_data[test_data['buyer_id'].isin(test_users)],
        item_categories=item_categories
    )

    return final_metrics

# Пример использования
if __name__ == "__main__":
    # Задаем пропорции для разных диапазонов
    proportions_by_range = {
        "1_10": {"ranked": 0.7, "personalized": 0.2, "top": 0.1},
        "11_100": {"ranked": 0.4, "personalized": 0.3, "top": 0.3},
        "101_1000": {"ranked": 0.4, "personalized": 0.3, "top": 0.3}
    }
    
    # Оцениваем рекомендации
    metrics = evaluate_recommendations_for_sample(
        postprocessor=advanced_postprocessor,
        test_data=test_data,
        top_recommender=top_recommender,
        personalized_top_recommender=personalized_top_recommender,
        n_users=100,
        k_list=[10, 100, 1000],
        proportions_by_range=proportions_by_range
    )

    # Вывод результатов
    logging.info("Результаты:")
    for k in metrics_calculator.k_values:
        logging.info(f"Метрики для K={k}:")
        logging.info(f"NDCG@{k}: {metrics[f'ndcg_{k}']:.4f}")
        logging.info(f"Precision@{k}: {metrics[f'precision_{k}']:.4f}")
        logging.info(f"Recall@{k}: {metrics[f'recall_{k}']:.4f}")
        logging.info(f"Diversity@{k}: {metrics[f'diversity_{k}']:.4f}")
        logging.info("--------------------------------")

Evaluating users: 100%|██████████| 60050/60050 [29:36<00:00, 33.79it/s]  
2025-04-30 16:29:05 - INFO - Результаты:
2025-04-30 16:29:05 - INFO - Метрики для K=10:
2025-04-30 16:29:05 - INFO - NDCG@10: 0.8376
2025-04-30 16:29:05 - INFO - Precision@10: 0.0965
2025-04-30 16:29:05 - INFO - Recall@10: 0.2128
2025-04-30 16:29:05 - INFO - Diversity@10: 0.0053
2025-04-30 16:29:05 - INFO - --------------------------------
2025-04-30 16:29:05 - INFO - Метрики для K=100:
2025-04-30 16:29:05 - INFO - NDCG@100: 0.8262
2025-04-30 16:29:05 - INFO - Precision@100: 0.0569
2025-04-30 16:29:05 - INFO - Recall@100: 0.2946
2025-04-30 16:29:05 - INFO - Diversity@100: 0.0198
2025-04-30 16:29:05 - INFO - --------------------------------
2025-04-30 16:29:05 - INFO - Метрики для K=1000:
2025-04-30 16:29:05 - INFO - NDCG@1000: 0.8316
2025-04-30 16:29:05 - INFO - Precision@1000: 0.0391
2025-04-30 16:29:05 - INFO - Recall@1000: 0.4463
2025-04-30 16:29:05 - INFO - Diversity@1000: 0.0755
2025-04-30 16:29:05 - INFO - 

In [70]:
def optimize_proportions_for_diversity(
    postprocessor,
    test_data: pd.DataFrame,
    top_recommender,
    personalized_top_recommender,
    n_trials: int = 50,
    n_users: int = 100,
    timeout: int = 3600,
    random_state: int = 42,
) -> Tuple[Dict[str, float], Dict[str, float]]:
    """
    Оптимизация пропорций для диапазона 1-10 с фокусом на метрику Diversity.

    Parameters
    ----------
    postprocessor : AdvancedPostprocessor
        Обученный постпроцессор
    test_data : pd.DataFrame
        Тестовый датасет
    top_recommender : BaseRecommender
        Рекомендер для получения топовых рекомендаций
    personalized_top_recommender : BaseRecommender
        Рекомендер для получения персонализированных рекомендаций
    n_trials : int
        Количество итераций оптимизации
    n_users : int
        Количество пользователей для оценки в каждой итерации
    timeout : int
        Максимальное время оптимизации в секундах
    random_state : int
        Фиксация случайности

    Returns
    -------
    Tuple[Dict[str, float], Dict[str, float]]
        (лучшие пропорции, метрики на лучших пропорциях)
    """
    # Создаем калькулятор метрик
    metrics_calculator = MetricsCalculator(k_values=[10, 100, 1000])

    # Получаем случайную выборку пользователей
    test_users = test_data["buyer_id"].unique()[:n_users]

    # Получаем общие топовые рекомендации (они одинаковые для всех пользователей)
    top_recommendations = top_recommender.recommend(
        max_items=max(metrics_calculator.k_values)
    )

    test_data_sample = test_data[test_data["buyer_id"].isin(test_users)]

    def objective(trial: Trial) -> float:
        """
        Целевая функция для оптимизации.
        Максимизируем Diversity при условии, что остальные метрики не сильно ухудшаются.
        """
        # Определяем пропорции через optuna с шагом 0.1
        ranked_prop = trial.suggest_categorical(
            "ranked", [0.2, 0.3, 0.4, 0.5, 0.6]  # 20%-60% с шагом 10%
        )

        personalized_prop = trial.suggest_categorical(
            "personalized", [0.2, 0.3, 0.4, 0.5, 0.6]  # 20%-60% с шагом 10%
        )

        # Оставшееся отдаем под top рекомендации
        top_prop = 1.0 - (ranked_prop + personalized_prop)

        # Проверяем корректность пропорций
        if top_prop < 0.1:  # Минимум 10% для топовых рекомендаций
            return -1.0

        proportions = {
            "1_10": {"ranked": 0.7, "personalized": 0.2, "top": 0.1},
            "11_100": {
                "ranked": ranked_prop,
                "personalized": personalized_prop,
                "top": top_prop,
            },
            "101_1000": {"ranked": 0.3, "personalized": 0.4, "top": 0.3},
        }

        # Словарь для хранения рекомендаций всех пользователей
        all_recommendations = {}

        # Получаем рекомендации для каждого пользователя
        for buyer_id in test_users:
            # Получаем демографические данные пользователя
            user_data = test_data_sample[test_data_sample["buyer_id"] == buyer_id][
                ["buyer_is_female", "buyer_age"]
            ].iloc[0]

            is_male = 0 if user_data["buyer_is_female"] else 1
            age = user_data["buyer_age"] if user_data["buyer_age"] != 0 and user_data["buyer_age"] >= -1 else -1

            # Получаем персонализированные рекомендации
            personalized_top_recommendations = personalized_top_recommender.recommend(
                is_male=is_male, age=age, max_items=10
            )

            # Получаем итоговые рекомендации
            recommendations = postprocessor.recommend(
                buyer_id=buyer_id,
                max_items=10,
                top_recommendations=top_recommendations,
                personalized_top_recommendations=personalized_top_recommendations,
                proportions_by_range=proportions,
            )

            all_recommendations[buyer_id] = recommendations

        # Рассчитываем метрики
        metrics = metrics_calculator.calculate(
            recommendations=all_recommendations,
            test_data=test_data_sample,
            item_categories=item_categories,
        )

        # # Целевая функция: максимизируем Diversity с учетом других метрик
        # diversity = metrics['diversity_10']
        # precision = metrics['precision_10']
        # recall = metrics['recall_10']
        # ndcg = metrics['ndcg_10']

        # # Штрафуем, если основные метрики сильно падают
        # penalty = 0.0
        # if precision < 0.01:  # минимальный порог для precision
        #     penalty += 0.2
        # if recall < 0.01:    # минимальный порог для recall
        #     penalty += 0.2
        # if ndcg < 0.01:      # минимальный порог для ndcg
        #     penalty += 0.2

        # # Возвращаем взвешенную сумму метрик с акцентом на diversity
        # return diversity * 0.7 + (precision + recall + ndcg) * 0.1 - penalty
        return metrics["diversity_100"]

    # Создаем исследование
    study = optuna.create_study(
        direction="maximize", sampler=optuna.samplers.TPESampler(seed=random_state)
    )

    # Запускаем оптимизацию
    study.optimize(
        objective, n_trials=n_trials, timeout=timeout, show_progress_bar=True
    )

    # Получаем лучшие параметры
    best_ranked = study.best_params["ranked"]
    best_personalized = study.best_params["personalized"]
    best_top = 1.0 - (best_ranked + best_personalized)

    best_proportions = {
        "ranked": best_ranked,
        "personalized": best_personalized,
        "top": best_top,
    }

    # Оцениваем метрики на лучших параметрах
    proportions = {
        "1_10": {"ranked": 0.7, "personalized": 0.2, "top": 0.1},
        "11_100": best_proportions,
        "101_1000": {"ranked": 0.3, "personalized": 0.4, "top": 0.3},
    }

    # Получаем финальные рекомендации на лучших параметрах
    final_recommendations = {}
    for buyer_id in test_users:
        user_data = test_data_sample[test_data_sample["buyer_id"] == buyer_id][
            ["buyer_is_female", "buyer_age"]
        ].iloc[0]

        is_male = 0 if user_data["buyer_is_female"] else 1
        age = user_data["buyer_age"] if user_data["buyer_age"] != 0 and user_data["buyer_age"] >= -1 else -1

        personalized_top_recommendations = personalized_top_recommender.recommend(
            is_male=is_male, age=age, max_items=10
        )

        recommendations = postprocessor.recommend(
            buyer_id=buyer_id,
            max_items=10,
            top_recommendations=top_recommendations,
            personalized_top_recommendations=personalized_top_recommendations,
            proportions_by_range=proportions,
        )

        final_recommendations[buyer_id] = recommendations

    # Рассчитываем финальные метрики
    final_metrics = metrics_calculator.calculate(
        recommendations=final_recommendations,
        test_data=test_data_sample,
        item_categories=item_categories,
    )

    return best_proportions, final_metrics


# Пример использования
if __name__ == "__main__":
    # Запускаем оптимизацию
    best_proportions, final_metrics = optimize_proportions_for_diversity(
        postprocessor=advanced_postprocessor,
        test_data=test_data,
        top_recommender=top_recommender,
        personalized_top_recommender=personalized_top_recommender,
        n_trials=50,
        n_users=len(test_data["buyer_id"].unique()),
        timeout=3600,
        random_state=42,
    )

    # Выводим результаты
    print("\nЛучшие пропорции для диапазона 1-10:")
    for source, prop in best_proportions.items():
        print(f"{source}: {prop:.3f}")

    # Вывод результатов
    logging.info("Результаты:")
    for k in [10, 100, 1000]:
        logging.info(f"Метрики для K={k}:")
        logging.info(f"NDCG@{k}: {final_metrics[f'ndcg_{k}']:.4f}")
        logging.info(f"Precision@{k}: {final_metrics[f'precision_{k}']:.4f}")
        logging.info(f"Recall@{k}: {final_metrics[f'recall_{k}']:.4f}")
        logging.info(f"Diversity@{k}: {final_metrics[f'diversity_{k}']:.4f}")
        logging.info("--------------------------------")

[I 2025-04-30 14:15:39,658] A new study created in memory with name: no-name-6a987a67-48d1-4dc3-9eda-aea7042af12e
Best trial: 0. Best value: 0.00533395:   2%|▏         | 1/50 [30:32<24:56:29, 1832.45s/it, 1832.45/3600 seconds]

[I 2025-04-30 14:46:12,098] Trial 0 finished with value: 0.005333952286565522 and parameters: {'ranked': 0.3, 'personalized': 0.4}. Best is trial 0 with value: 0.005333952286565522.


Best trial: 0. Best value: 0.00533395:   4%|▍         | 2/50 [1:01:21<24:32:40, 1840.85s/it, 3681.69/3600 seconds]


[I 2025-04-30 15:17:01,348] Trial 1 finished with value: 0.005333952286565522 and parameters: {'ranked': 0.3, 'personalized': 0.4}. Best is trial 0 with value: 0.005333952286565522.

Лучшие пропорции для диапазона 1-10:
ranked: 0.300
personalized: 0.400
top: 0.300


2025-04-30 15:47:53 - INFO - Результаты:
2025-04-30 15:47:53 - INFO - Метрики для K=10:
2025-04-30 15:47:53 - INFO - NDCG@10: 0.2977
2025-04-30 15:47:53 - INFO - Precision@10: 0.0965
2025-04-30 15:47:53 - INFO - Recall@10: 0.2128
2025-04-30 15:47:53 - INFO - Diversity@10: 0.0053
2025-04-30 15:47:53 - INFO - --------------------------------
2025-04-30 15:47:53 - INFO - Метрики для K=100:
2025-04-30 15:47:53 - INFO - NDCG@100: 0.2951
2025-04-30 15:47:53 - INFO - Precision@100: 0.0531
2025-04-30 15:47:53 - INFO - Recall@100: 0.2128
2025-04-30 15:47:53 - INFO - Diversity@100: 0.0053
2025-04-30 15:47:53 - INFO - --------------------------------
2025-04-30 15:47:53 - INFO - Метрики для K=1000:
2025-04-30 15:47:53 - INFO - NDCG@1000: 0.3297
2025-04-30 15:47:53 - INFO - Precision@1000: 0.0357
2025-04-30 15:47:53 - INFO - Recall@1000: 0.2128
2025-04-30 15:47:53 - INFO - Diversity@1000: 0.0053
2025-04-30 15:47:53 - INFO - --------------------------------


## Итоги

Результатом работы является конфигурируемый и адаптируемый под различные требования с помощью параметров программный код, который позволяет выполнять повторное ранжирование и сбор итогового набора рекомендаций с учётом требований бизнеса.

Конфигурировать можно источники рекомендаций и их пропорции в разных диапазонах.