# Stage 5 (3.2 data_preprocessing_for_2_stage_model)

# Импортируем библиотеки

In [1]:
from typing import Optional

import warnings

# ----------------
# Data processing
# ----------------
import dill

import numpy as np
import polars as pl


# Silence every warning category for the rest of the session
warnings.filterwarnings("ignore")

# Seed constant; it is not referenced anywhere in the visible part of the file
RANDOM_STATE = 42

# Импортируем пути

In [2]:
# Base locations from which every file path below is built
data_path = "../data/closed/"
models_path = "../data/models/"
candidates_data_path = f"{models_path}candidates_data/"


In [3]:
# The items table is taken from stage 2.1: its categorical features are
# already encoded and it already carries "item_pop" and "item_avg_hist".
# Both columns are dropped here because they have to be recalculated.
# The file is fully collected before it is overwritten in place.
(
    pl.scan_parquet(data_path + "df_items_mod.parquet")
    .drop(["item_pop", "item_avg_hist"])
    .collect()
    .write_parquet(data_path + "df_items_mod.parquet")
)

# Build the interactions dataset `ranker_data_bNr` by stacking the same
# five columns of the base-model and ranker interaction tables.
pl.concat(
    [
        pl.scan_parquet(data_path + file_name).select(
            ["user_id", "item_id", "dt", "ui_inter", "u_total_inter"]
        )
        for file_name in ("base_models_data.parquet", "ranker_data.parquet")
    ],
    how="vertical",
).collect().write_parquet(data_path + "ranker_data_bNr.parquet")

5 s

# Feature Engineering

In [4]:
def func_feature_engineering_with_interactions(
    df: pl.LazyFrame | pl.DataFrame,
) -> pl.LazyFrame | pl.DataFrame:
    """
    Derive popularity and history features from an interactions frame.

    The frame must provide "user_id", "item_id", "dt" and "u_total_inter".
    "u_total_inter" is renamed to "user_hist" and four columns are added:

    * "item_pop": count of "user_id" values within each "item_id"
    * "user_avg_pop": mean "item_pop" within each "user_id"
    * "item_avg_hist": mean "user_hist" within each "item_id"
    * "user_last_pop": "item_pop" of the first row of each user after
      sorting by "dt" descending, i.e. of the most recent interaction

    Returns:
        The input frame (same lazy/eager kind) with the columns above,
        sorted by "user_id" ascending and "dt" descending.
    """
    # Shorter name for convenience
    renamed = df.rename({"u_total_inter": "user_hist"})

    # Item popularity has to exist before the per-user mean can read it
    with_item_pop = renamed.with_columns(
        item_pop=pl.col("user_id").count().over("item_id")
    )

    # Per-user mean popularity and per-item mean history length
    with_means = with_item_pop.with_columns(
        user_avg_pop=pl.col("item_pop").mean().over("user_id"),
        item_avg_hist=pl.col("user_hist").mean().over("item_id"),
    )

    # Newest interaction first within each user, so that `first()` below
    # picks the popularity of the last item the user interacted with
    ordered = with_means.sort(["user_id", "dt"], descending=[False, True])
    return ordered.with_columns(
        user_last_pop=pl.col("item_pop").first().over("user_id")
    )

In [5]:
def get_tables_with_users_and_items_features(
    interactions_path: str,
    users_path: Optional[str] = None,
    items_path: Optional[str] = None,
    save_users_path: Optional[str] = None,
    save_items_path: Optional[str] = None,
) -> None:
    """
    Write user and item tables enriched with interaction-based features.

    Args:
        interactions_path: Parquet file with the interactions; it is passed
            to `func_feature_engineering_with_interactions`.
        users_path: Optional parquet file with an existing user table. When
            omitted, the unique "user_id" values of the interactions are used.
        items_path: Optional parquet file with an existing item table. When
            omitted, the unique "item_id" values of the interactions are used.
        save_users_path: Destination parquet file for the user table.
        save_items_path: Destination parquet file for the item table.

    Raises:
        ValueError: If `save_users_path` or `save_items_path` is missing.
            Both tables are always written, so both destinations are
            required.
    """
    # Validate before any work is done: previously a missing destination
    # surfaced only as a failure while writing (possibly after the item
    # table had already been written) or as a TypeError from raising a str.
    if not save_users_path or not save_items_path:
        raise ValueError(
            "save_users_path and save_items_path should be passed to function"
        )

    interactions = pl.scan_parquet(interactions_path)

    if users_path:
        df_users = pl.scan_parquet(users_path)
    else:
        df_users = interactions.select("user_id").unique()

    if items_path:
        df_items = pl.scan_parquet(items_path)
    else:
        df_items = interactions.select("item_id").unique()

    # Build the lazy feature plan once and reuse it for both tables
    features = func_feature_engineering_with_interactions(interactions)

    # Item table: attach item features; fill_null(0) applies to every
    # column of the joined table, not only to the new ones
    df_items.join(
        other=features.select(["item_id", "item_pop", "item_avg_hist"]).unique(),
        how="left",
        on="item_id",
    ).fill_null(0).collect().write_parquet(save_items_path)

    # User table: attach user features, same null handling as above
    df_users.join(
        other=features.select(
            ["user_id", "user_hist", "user_avg_pop", "user_last_pop"]
        ).unique(),
        how="left",
        on="user_id",
    ).fill_null(0).collect().write_parquet(save_users_path)

In [6]:
# The item table is read from and written back to the same file; the user
# table is derived from the interactions because no `users_path` is given.
get_tables_with_users_and_items_features(
    interactions_path=f"{data_path}ranker_data_bNr.parquet",
    items_path=f"{data_path}df_items_mod.parquet",
    save_users_path=f"{data_path}df_users.parquet",
    save_items_path=f"{data_path}df_items_mod.parquet",
)

27 s ~4 GB RAM

# Модель второго уровня (ранкер)

## Ranker Data

### Load

In [7]:
# Users from test_df who will receive targeted recommendations.
# NOTE: dill.load can execute arbitrary code stored in the file, so only
# load files produced by trusted earlier stages.
with open(data_path + "bNr2t_users.dill", "rb") as users_f:
    bNr2t_users = dill.load(users_f)

# Both frames stay lazy and are restricted to the users loaded above
ranker_data = (
    pl.scan_parquet(data_path + "ranker_data.parquet")
    .filter(pl.col("user_id").is_in(bNr2t_users))
    .select(["user_id", "item_id", "ui_inter"])
)
candidates_full = (
    pl.scan_parquet(candidates_data_path + "candidates_full.parquet")
    .filter(pl.col("user_id").is_in(bNr2t_users))
)

In [8]:
candidates_list = ["cos", "bm25", "tfidf", "lfm"]

# Per candidate model: the smallest score and the largest rank found in
# `candidates_full`. `users_filter` uses them to fill missing values.
default_values_candidates = {}
for cand in candidates_list:
    default_values_candidates.update(
        {
            f"{cand}_score": (
                candidates_full.select(pl.col(f"{cand}_score").min())
                .collect()
                .item()
            ),
            f"{cand}_rank": (
                candidates_full.select(pl.col(f"{cand}_rank").max())
                .collect()
                .item()
            ),
        }
    )

6.5 s

### Test data

In [9]:
# Combine interactions with candidates for the selected users and keep only
# the pairs that at least one candidate model ranked highly enough
def users_filter(
    user_list: np.ndarray,
    candidates_df: pl.LazyFrame,
    df: pl.LazyFrame,
    max_rank: int = 15,
) -> pl.DataFrame:
    """
    Join user interactions with candidate recommendations and filter by rank.

    Both inputs are restricted to `user_list`, outer-joined on
    ("user_id", "item_id"), and missing values are filled: "ui_inter" with 0
    and the candidate columns with the module-level
    `default_values_candidates`. The result is sorted by
    ("user_id", "item_id") and reduced to rows where at least one of
    "cos_rank", "bm25_rank", "lfm_rank", "tfidf_rank" is below `max_rank`.

    Args:
        user_list (np.ndarray): User IDs to include.
        candidates_df (pl.LazyFrame): Candidate item recommendations with
            the rank columns named above.
        df (pl.LazyFrame): User-item interactions with at least
            'user_id', 'item_id' and 'ui_inter'.
        max_rank (int): Exclusive upper bound a rank has to stay below for
            the row to be kept. Defaults to 15, the previously fixed value.

    Returns:
        pl.DataFrame: Collected (eager) table of interactions and candidates
            with missing values filled, sorted and rank-filtered.
    """
    # Values used to fill nulls produced by the outer join
    fill_values = {
        "ui_inter": 0,
        **default_values_candidates,
    }

    # Keep only the requested users on both sides
    df = df.filter(pl.col("user_id").is_in(user_list))
    candidates_df = candidates_df.filter(pl.col("user_id").is_in(user_list))

    # Outer join of interactions and candidates. The join keys of rows that
    # exist only on the candidate side arrive in the "*_right" columns, so
    # they are copied over before those helper columns are dropped.
    # NOTE(review): newer polars releases appear to call this join "full";
    # confirm against the pinned polars version before renaming.
    df = (
        df.join(
            other=candidates_df,
            how="outer",
            on=["user_id", "item_id"],
        )
        .with_columns(
            pl.col("user_id").fill_null(pl.col("user_id_right")),
            pl.col("item_id").fill_null(pl.col("item_id_right")),
        )
        .drop(["user_id_right", "item_id_right"])
    )
    df = df.collect().with_columns(
        [
            pl.col(col_name).fill_null(fill_value)
            for col_name, fill_value in fill_values.items()
        ]
    )

    # Sort by user, then item
    df = df.sort(by=["user_id", "item_id"])

    # Rows whose ranks were all filled with the default (largest) rank only
    # survive if that default is itself below `max_rank`
    return df.filter(
        (pl.col("cos_rank") < max_rank)
        | (pl.col("bm25_rank") < max_rank)
        | (pl.col("lfm_rank") < max_rank)
        | (pl.col("tfidf_rank") < max_rank)
    )

In [10]:
# Overwrite `ranker_data_bNr` with the joined and filtered table
users_filter(
    user_list=bNr2t_users,
    candidates_df=candidates_full,
    df=ranker_data,
).write_parquet(f"{data_path}ranker_data_bNr.parquet")

12 s ~2GB RAM

## Добавим фичи предметов и пользователей

### Пользователей

In [11]:
# Lazily open the user feature table
df_users = pl.scan_parquet(f"{data_path}df_users.parquet")


# Fill values for the user features: zero history and the column median
# for the two popularity features
default_values_users = {"user_hist": 0}
default_values_users.update(
    {
        feature: df_users.select(pl.col(feature).median()).collect().item()
        for feature in ("user_avg_pop", "user_last_pop")
    }
)

In [12]:
# Attach user features
def add_users_features(
    df: pl.LazyFrame,
    users: pl.LazyFrame,
) -> pl.DataFrame:
    """
    Left-join user features onto `df` and fill the resulting gaps.

    Args:
        df (pl.LazyFrame): Frame with a 'user_id' column.
        users (pl.LazyFrame): User features keyed by 'user_id'.

    Returns:
        pl.DataFrame: Collected frame with the user feature columns added;
            nulls in the columns listed in the module-level
            `default_values_users` are replaced by those defaults.
    """
    # Only the users that occur in `df` are needed on the right-hand side;
    # this collects the unique ids of `df` eagerly
    present_users = df.select("user_id").unique().collect()
    relevant_users = users.filter(pl.col("user_id").is_in(present_users))

    joined = df.join(
        other=relevant_users,
        how="left",
        on=["user_id"],
    ).collect()

    # Users missing from `users` come out of the left join as nulls
    return joined.with_columns(
        [
            pl.col(col_name).fill_null(fallback)
            for col_name, fallback in default_values_users.items()
        ]
    )

In [13]:
# Read `ranker_data_bNr`, add the user features and write it back in place
add_users_features(
    df=pl.scan_parquet(f"{data_path}ranker_data_bNr.parquet"),
    users=df_users,
).write_parquet(f"{data_path}ranker_data_bNr.parquet")

5 s ~2GB RAM

### Предметов

In [14]:
# Lazily open the item table
df_items = pl.scan_parquet(f"{data_path}df_items_mod.parquet")

In [15]:
# Lazily open the item table
df_items = pl.scan_parquet(f"{data_path}df_items_mod.parquet")

ITEM_NUM_FEATURES = [
    "item_pop",
    "item_avg_hist",
    # ---------------
    "title_len",
    "descr_len",
    "title_word_len",
    "descr_word_len",
    # txt_emb_pca_0 ... txt_emb_pca_9
    *[f"txt_emb_pca_{idx}" for idx in range(10)],
    # ---------------
    # img_emb_pca_0 ... img_emb_pca_9
    *[f"img_emb_pca_{idx}" for idx in range(10)],
]

ITEM_CATEGORIAL_FEATURES = [
    "brand",
    "color",
    "closure",
    "country",
    "cut",
    "height",
    "length",
    "material",
    "model",
    "neckline",
    "pattern",
    "pocket",
    "purpose",
    "sleeve",
]

# Fill values for the item features
default_values_items = {}

# Numeric features fall back to the column median
for num in ITEM_NUM_FEATURES:
    default_values_items.update(
        {num: df_items.select(pl.col(num).median()).collect().item()}
    )

# Categorical features fall back to the value with the highest count
for cat in ITEM_CATEGORIAL_FEATURES:
    default_values_items.update(
        {
            cat: (
                df_items.group_by(cat)
                .agg(pl.col(cat).count().alias("count"))
                .sort("count", descending=True)
                .select(cat)
                .first()
                .collect()
                .item()
            )
        }
    )

In [16]:
# Attach item features
def add_items_features(
    df: pl.LazyFrame,
    items: pl.LazyFrame,
) -> pl.DataFrame:
    """
    Left-join item features onto `df` and fill the resulting gaps.

    Args:
        df (pl.LazyFrame): Frame with an 'item_id' column.
        items (pl.LazyFrame): Item features keyed by 'item_id'.

    Returns:
        pl.DataFrame: Collected frame with the item feature columns added;
            nulls in the columns listed in the module-level
            `default_values_items` are replaced by those defaults.
    """
    # Only the items that occur in `df` are needed on the right-hand side;
    # this collects the unique ids of `df` eagerly
    present_items = df.select("item_id").unique().collect()
    relevant_items = items.filter(pl.col("item_id").is_in(present_items))

    joined = df.join(
        other=relevant_items,
        how="left",
        on=["item_id"],
    ).collect()

    # Items missing from `items` come out of the left join as nulls
    return joined.with_columns(
        [
            pl.col(col_name).fill_null(fallback)
            for col_name, fallback in default_values_items.items()
        ]
    )

In [17]:
# Read `ranker_data_bNr`, add the item features and write it back in place
add_items_features(
    df=pl.scan_parquet(f"{data_path}ranker_data_bNr.parquet"),
    items=df_items,
).write_parquet(f"{data_path}ranker_data_bNr.parquet")

32 s ~7GB RAM

## Добавим таргет

In [18]:
def add_target(df: pl.LazyFrame) -> pl.DataFrame:
    """
    Add an integer "target" column derived from "ui_inter" and collect.

    Mapping, first matching rule wins:
    > 6 -> 10, > 4 -> 8, > 2 -> 4, > 1 -> 2, anything else -> 1.

    NOTE(review): "ui_inter" values of 0 and 1 both receive target 1;
    confirm that these two cases are meant to share a label.
    """
    interactions = pl.col("ui_inter")

    # Build the when/then chain from the strictest threshold downwards
    target = pl.when(interactions > 6).then(10)
    for lower_bound, label in ((4, 8), (2, 4), (1, 2)):
        target = target.when(interactions > lower_bound).then(label)

    return df.with_columns(target.otherwise(1).alias("target")).collect()

In [19]:
# Read `ranker_data_bNr`, add the target column and write it back in place
add_target(
    df=pl.scan_parquet(f"{data_path}ranker_data_bNr.parquet"),
).write_parquet(f"{data_path}ranker_data_bNr.parquet")

32 s ~7GB RAM