In [None]:
import os
import time

import dill
import mlflow
import requests
import numpy as np
import pandas as pd

from tqdm import tqdm
from lightfm import LightFM
from scipy import sparse
from copy import deepcopy
from rectools import Columns
from rectools.dataset import Dataset, Features
from rectools.models import LightFMWrapperModel
from rectools.models.vector import VectorModel, Factors
from rectools.metrics import Precision, Recall, MAP, calc_metrics

import warnings
warnings.filterwarnings(action='ignore', category=UserWarning)

In [None]:
# Restrict OpenBLAS to a single thread.
# NOTE(review): numpy is imported in the cell above, before this assignment.
# If OpenBLAS only reads the variable when the library is first loaded, this
# may come too late to take effect — confirm, or move it above the imports.
os.environ["OPENBLAS_NUM_THREADS"] = "1"

# Load Data

In [None]:
# Download the KION training archive, streaming it to disk in 1 MiB chunks.
url = "https://storage.yandexcloud.net/itmo-recsys-public-data/kion_train.zip"

# The timeout keeps the cell from hanging forever on a stalled connection, and
# the context manager releases the connection once the download is finished.
with requests.get(url, stream=True, timeout=60) as req:
    # Fail loudly on 4xx/5xx instead of saving an error page as the archive.
    req.raise_for_status()
    total_size_in_bytes = int(req.headers.get('Content-Length', 0))

    # Using tqdm as a context manager closes the bar when the loop ends, so its
    # final refresh does not leak into the output of later cells.
    with open('kion_train.zip', "wb") as fd, tqdm(
        desc='kion dataset download', total=total_size_in_bytes, unit='iB', unit_scale=True
    ) as progress_bar:
        for chunk in req.iter_content(chunk_size=2 ** 20):
            fd.write(chunk)
            progress_bar.update(len(chunk))

kion dataset download:  97%|█████████▋| 76.5M/78.8M [00:04<00:00, 22.9MiB/s]

In [None]:
!unzip kion_train.zip

Archive:  kion_train.zip
replace kion_train/interactions.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: 


kion dataset download: 100%|██████████| 78.8M/78.8M [00:19<00:00, 21.2MiB/s][A

In [None]:
# Read the three tables extracted from the archive.
interactions, users, items = (
    pd.read_csv(csv_path)
    for csv_path in (
        'kion_train/interactions.csv',
        'kion_train/users.csv',
        'kion_train/items.csv',
    )
)

# Override the name stored in Columns.Datetime with this dataset's timestamp
# column, so later cells can address that column through Columns.Datetime.
Columns.Datetime = 'last_watch_dt'

kion dataset download: 100%|██████████| 78.8M/78.8M [00:19<00:00, 22.9MiB/s]

In [None]:
# Remove rows whose date string is not exactly 10 characters long (the length
# of a 'YYYY-MM-DD' value), then parse the remaining dates.
malformed_dates = interactions[Columns.Datetime].str.len() != 10
interactions.drop(index=interactions.index[malformed_dates], inplace=True)
interactions[Columns.Datetime] = pd.to_datetime(
    interactions[Columns.Datetime],
    format='%Y-%m-%d',
)

# Latest date in the data; the train/test split is derived from it.
max_date = interactions[Columns.Datetime].max()

# Interaction weight: 3 when more than 10% was watched, otherwise 1.
watched_over_10_pct = interactions['watched_pct'] > 10
interactions[Columns.Weight] = np.where(watched_over_10_pct, 3, 1)

In [None]:
# Time-based split: the last 7 days (inclusive of the boundary) form the test
# set, everything earlier is train.
split_date = max_date - pd.Timedelta(days=7)
before_split = interactions[Columns.Datetime] < split_date
from_split = interactions[Columns.Datetime] >= split_date
train = interactions[before_split].copy()
test = interactions[from_split].copy()

# Remove train interactions with total_dur below 300.
too_short = train.query("total_dur < 300").index
train.drop(index=too_short, inplace=True)

# Drop cold users: users that appear in test but have no interactions left in train.
cold_users = set(test[Columns.User]).difference(train[Columns.User])
cold_rows = test.index[test[Columns.User].isin(cold_users)]
test.drop(index=cold_rows, inplace=True)

# Feature preparation


### User features


In [None]:
# Replace missing user attributes with a placeholder value, then keep only
# the users that have interactions in train.
users = users.fillna('Unknown')
users_in_train = users[Columns.User].isin(train[Columns.User])
users = users.loc[users_in_train].copy()

In [None]:
# Build a long ("flatten") user feature table with columns id / value / feature:
# one sub-frame per attribute, stacked in the order sex, age, income.
user_features = pd.concat(
    [
        users.reindex(columns=[Columns.User, feature])
        .rename(columns={Columns.User: "id", feature: "value"})
        .assign(feature=feature)
        for feature in ["sex", "age", "income"]
    ]
)
user_features.head()

Unnamed: 0,id,value,feature
0,973171,М,sex
1,962099,М,sex
3,721985,Ж,sex
4,704055,Ж,sex
5,1037719,М,sex


### Item features


In [None]:
# Replace missing item attributes with a placeholder value, then keep only
# the items that occur in train.
items = items.fillna('Unknown')
items_in_train = items[Columns.Item].isin(train[Columns.Item])
items = items.loc[items_in_train].copy()

In [None]:
# Normalise the comma-separated genre string (lower-case, no space after the
# comma) and split it into a list of genres per item.
genre_lists = (
    items["genres"]
    .str.lower()
    .str.replace(", ", ",", regex=False)
    .str.split(",")
)
items["genre"] = genre_lists

# One row per (item, genre) pair in the id / value / feature layout.
genre_feature = (
    items[["item_id", "genre"]]
    .explode("genre")
    .rename(columns={"item_id": "id", "genre": "value"})
    .assign(feature="genre")
)
genre_feature.head()

Unnamed: 0,id,value,feature
0,10711,драмы,genre
0,10711,зарубежные,genre
0,10711,детективы,genre
0,10711,мелодрамы,genre
1,2508,зарубежные,genre


In [None]:
# Content type of every item in the id / value / feature layout.
content_feature = (
    items.reindex(columns=[Columns.Item, "content_type"])
    .rename(columns={Columns.Item: "id", "content_type": "value"})
    .assign(feature="content_type")
)

In [None]:
# Countries string of every item in the id / value / feature layout
# (the raw value is used as-is; it is not split into individual countries).
countries_feature = (
    items.reindex(columns=[Columns.Item, "countries"])
    .rename(columns={Columns.Item: "id", "countries": "value"})
    .assign(feature="countries")
)

In [None]:
# Stack all item feature frames into a single long table.
item_feature_frames = [genre_feature, content_feature, countries_feature]
item_features = pd.concat(item_feature_frames)

In [None]:
# Metric classes keyed by the prefix used in the metric names.
metrics_name = {
    'Precision': Precision,
    'Recall': Recall,
    'MAP': MAP,
}

# One metric instance per (metric, k) pair for k = 1..10, keyed as '<name>@<k>'.
metrics = {
    f'{metric_name}@{k}': metric(k=k)
    for metric_name, metric in metrics_name.items()
    for k in range(1, 11)
}

In [None]:
# Users to produce recommendations for: everyone left in the test split.
TEST_USERS = test[Columns.User].unique()

# rectools dataset built from the train interactions plus the user and item
# feature tables; every listed feature is declared as categorical.
dataset = Dataset.construct(
    interactions_df=train,
    user_features_df=user_features,
    item_features_df=item_features,
    cat_user_features=["sex", "age", "income"],
    cat_item_features=["genre", "content_type", "countries"],
)

# MLflow

In [None]:
# Subclass written to override _fit so that it calls fit_partial instead of fit.
# The method name is kept unchanged because ModelBase invokes _fit.
class LightFMWrapperFitPart(LightFMWrapperModel):
    """LightFM wrapper whose ``_fit`` trains via ``LightFM.fit_partial``."""

    def __init__(
        self,
        model: LightFM,
        epochs: int = 1,
        num_threads: int = 16,
        verbose: int = 0,
    ):
        """Store the LightFM model and the training settings.

        Parameters
        ----------
        model : LightFM
            Model instance that ``_fit`` trains in place.
        epochs : int
            Number of epochs passed to every ``fit_partial`` call.
        num_threads : int
            Number of threads passed to every ``fit_partial`` call.
        verbose : int
            Verbosity level; values above 0 turn on LightFM's own verbose mode.
        """
        super().__init__(model, epochs, num_threads, verbose)

        # Train the passed instance directly, so `self.model` is set here.
        self.model: LightFM = model
        self.n_epochs = epochs
        self.n_threads = num_threads

    def _fit(self, dataset: Dataset) -> None:  # type: ignore
        """Run ``fit_partial`` on ``self.model`` using the dataset's weighted interactions."""
        interactions_coo = dataset.get_user_item_matrix(include_weights=True).tocoo(copy=False)

        fit_kwargs = dict(
            user_features=self._prepare_features(dataset.user_features),
            item_features=self._prepare_features(dataset.item_features),
            # The weighted interaction matrix doubles as the per-sample weights.
            sample_weight=interactions_coo,
            epochs=self.n_epochs,
            num_threads=self.n_threads,
            verbose=self.verbose > 0,
        )
        self.model.fit_partial(interactions_coo, **fit_kwargs)

In [None]:
def get_model_size_mb(model):
    """Serialize ``model`` with dill and return the size of the dump in megabytes.

    The dump is written to ``model.dill`` in the current working directory
    (overwriting any previous dump) and is left on disk.

    Returns
    -------
    float
        File size in MB (bytes * 1e-6), rounded to two decimals.
    """
    dump_path = 'model.dill'
    with open(dump_path, 'wb') as dump_file:
        dill.dump(model, dump_file)

    size_in_bytes = os.path.getsize(dump_path)
    return round(size_in_bytes * 1e-6, 2)

def get_metrics(model, metrics, test, train):
    """Recommend for the test users and score the recommendations.

    Uses the notebook-level ``TEST_USERS`` and ``dataset`` objects to request
    10 recommendations per user with already viewed items filtered out.

    Parameters
    ----------
    model
        Fitted model exposing ``recommend``.
    metrics
        Metrics forwarded to ``calc_metrics``.
    test, train
        Interaction frames forwarded to ``calc_metrics`` after the recommendations.

    Returns
    -------
    Whatever ``calc_metrics`` returns for the given metrics.
    """
    recommend_kwargs = dict(
        users=TEST_USERS,
        dataset=dataset,
        k=10,
        filter_viewed=True,
    )
    recommendations = model.recommend(**recommend_kwargs)
    return calc_metrics(metrics, recommendations, test, train)

In [47]:
mlflow.set_tracking_uri('http://213.202.219.36:5000')
mlflow.set_experiment('lightfm')

# Grid search over the embedding size and the learning rate; every combination
# is tracked as a separate MLflow run.
for num in [64, 128, 256]:
    for lr in [0.009, 0.01, 0.011]:
        # The learning rate is part of the run name: otherwise the three runs
        # that share an embedding size get the same name and cannot be told
        # apart in the MLflow UI.
        with mlflow.start_run(run_name=f'no_components{num}_lr{lr}'):
            model = LightFMWrapperFitPart(LightFM(
                no_components=num,
                learning_rate=lr,
                loss='warp',
                random_state=42
                )
            )

            mlflow.log_params({'no_components': num, 'learning_rate': lr})

            # Five fit() calls on the same wrapper (its default is one epoch
            # per call), logging metrics after each call with the call index
            # as the MLflow step.
            for i in range(5):
                start = time.time()
                model.fit(dataset)
                # Only the fit() call is timed; the value is in minutes.
                time_per_epoch = (time.time() - start) / 60
                metrics_ = get_metrics(model, metrics, test, train)
                mlflow.log_metrics({
                    'Precision': round(metrics_['Precision@10'], 4),
                    'Recall': round(metrics_['Recall@10'], 4),
                    'MAP': round(metrics_['MAP@10'], 4),
                    'Model size MB': get_model_size_mb(model),
                    'Training time per epoch min': round(time_per_epoch, 2)
                }, step=i)

## MLflow Tracking API

In [11]:
# Fetch the logged runs and show the parameters of the best model
mlflow.set_tracking_uri('http://213.202.219.36:5000')

mlflow_metrics = ['metrics.MAP', 'metrics.Precision', 'metrics.Recall']
params = ['params.no_components', 'params.learning_rate']

# Query the tracking server once (the result was previously fetched twice);
# experiment_ids is documented as a list of experiment IDs.
runs = mlflow.search_runs(experiment_ids=['1'])

# Best run first: sort by MAP, then Precision, then Recall, all descending.
runs[[*params, *mlflow_metrics]].sort_values(by=mlflow_metrics, ascending=False).head(1)

Unnamed: 0,params.no_components,params.learning_rate,metrics.MAP,metrics.Precision,metrics.Recall
8,64,0.009,0.0775,0.0348,0.1615
