In [None]:
from collections import defaultdict

import pandas as pd
import numpy as np
import scipy.stats as ss

import lightfm
import lightfm.data as ld
import lightfm.evaluation as lv

import tqdm
import json
import glob
import faiss
import typing
#import optuna

from sklearn.metrics.pairwise import cosine_similarity

import matplotlib.pyplot as plt

np.random.seed(31337)

# Этап 1. Обучение вспомогательной модели

### Возьмем данные с предыдущего семинара (можете сгенерить самостоятельно)

In [None]:
data = pd.concat([
    pd.read_json(data_path, lines=True)
    for data_path
    in glob.glob("./stage_1/train/*/*")
])

In [None]:
random_sample = np.random.uniform(size=data.shape[0])

In [None]:
data_stage_1 = data[random_sample < 0.4]
data_stage_2 = data[random_sample >= 0.4]

In [None]:
positives = data_stage_1[data_stage_1["time"] > 0.7].copy()
positives["test"] = np.random.uniform(size=len(positives)) >= 0.95
positives.drop_duplicates(["user", "track"], inplace=True)

user_counts = positives[~positives["test"]].groupby("user").size()
users = set(user_counts[user_counts >= 2].index.values)

track_counts = positives[~positives["test"]].groupby("track").size()
tracks = set(track_counts[track_counts >= 2].index.values)

len(users), len(tracks)

### Снова обучим LightFM

В данном случае LightFM - это наша вспомогательная модель, которая поможет нам рассчитать IPS.

Для обучения возьмем небольшой сабсет данных.

In [None]:
train_data = positives[~positives["test"] & positives["user"].isin(users) & positives["track"].isin(tracks)]
test_data = positives[positives["test"] & positives["user"].isin(users) & positives["track"].isin(tracks)]

len(train_data), len(test_data)

In [None]:
dataset = ld.Dataset()
dataset.fit(users, tracks)

In [None]:
train_interactions, _ = dataset.build_interactions(train_data[["user", "track"]].itertuples(index=False, name=None))
test_interactions, _ = dataset.build_interactions(test_data[["user", "track"]].itertuples(index=False, name=None))

In [None]:
def fit_model(
    epochs=1, 
    at=10,
    loss="warp",
    no_components=30,
    learning_rate=0.01, 
    max_sampled=10,
    user_alpha=0.0, 
    item_alpha=0.0, 
    threads=30, 
    verbose=False,
    patience=3,
    epsilon=1e-6,
):
    model = lightfm.LightFM(
        no_components=no_components,
        loss=loss,
        learning_rate=learning_rate,
        max_sampled=max_sampled,
        user_alpha=user_alpha,
        item_alpha=item_alpha,
    )

    precisions_at = []
    
    for epoch in range(epochs):
        model = model.fit_partial(train_interactions, num_threads=threads)
        
        precision_at = lv.precision_at_k(model, test_interactions, train_interactions=train_interactions, k=at, num_threads=threads)
        
        if verbose:
            print(f"{epoch}:\t{np.mean(precision_at)} +/- {ss.sem(precision_at) * 1.96}")
            
        precisions_at.append(np.mean(precision_at))
            
        if epoch > patience and all([precisions_at[-j] - precisions_at[-patience-1] < epsilon for j in range(1, patience + 1)]):
            if verbose:
                print("Early stopiing!")
            break
        
    else:
        if verbose:
            print("No early stopiing happened: increase epochs maybe?")
        
    return model, precisions_at

In [None]:
model, precisions_at = fit_model(
    epochs=100,
    at=10,
    loss='warp',
    no_components=50, 
    learning_rate=0.01,
    max_sampled=100,
    user_alpha=0.0,
    item_alpha=0.0001,
    patience=15,
    verbose=True,
)

In [None]:
figure, ax = plt.subplots()
ax.plot(np.arange(len(precisions_at)), precisions_at)
pass

### Сохраняем эмбеддинги и готовим рекомендации

In [None]:
item_biases, item_embeddings = model.get_item_representations()
user_biases, user_embeddings = model.get_user_representations()

np.save("./stage_1/item_biases", item_biases)
np.save("./stage_1/item_embeddings", item_embeddings)
np.save("./stage_1/user_biases", user_biases)
np.save("./stage_1/user_embeddings", user_embeddings)

In [None]:
ITEM_BIASES = np.load("./stage_1/item_biases.npy")
ITEM_EMBEDDINGS = np.load("./stage_1/item_embeddings.npy")
USER_BIASES = np.load("./stage_1/user_biases.npy")
USER_EMBEDDINGS = np.load("./stage_1/user_embeddings.npy")

### Делаем маппинги индекс трека -> айди трека

In [None]:
TRACK_META = pd.read_json("./data/tracks.json", lines=True)
TRACK_META["track_index"] = TRACK_META["track"].map(lambda t: dataset.mapping()[2].get(t))
TRACK_META = TRACK_META[~np.isnan(TRACK_META["track_index"])]
TRACK_META["track_index"] = TRACK_META["track_index"].astype(int)

In [None]:
TRACK_META[["artist", "album", "title", "track", "track_index"]].to_csv("track_meta.csv", index=False)

In [None]:
TRACK_META = pd.read_csv("track_meta.csv")
TRACK_META = TRACK_META.set_index("track_index")

### Делаем маппинги айди юзера -> индекс юзера

In [None]:
import pickle

user_mapping_raw = dataset.mapping()[0]
user_mapping = {int(k):int(v) for k, v in user_mapping_raw.items()}

with open('user_mapping.pickle', 'wb') as f:
    pickle.dump(user_mapping, f, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
import pickle

user_mapping_raw = dataset.mapping()[0]

with open('user_mapping.pickle', 'wb') as f:
    pickle.dump({int(v):int(k) for k, v in user_mapping_raw.items()}, f, protocol=pickle.HIGHEST_PROTOCOL)

with open('user_mapping_inverse.pickle', 'wb') as f:
    pickle.dump({int(k):int(v) for k, v in user_mapping_raw.items()}, f, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
USER_MAPPING = dict()

with open('user_mapping.pickle', 'rb') as f:
    USER_MAPPING = pickle.load(f)

with open('user_mapping_inverse.pickle', 'rb') as f:
    USER_MAPPING_INVERSE = pickle.load(f)

# Этап 2. IPS

# IPS: Оценим $P(O = 1|user, track)$

Зафиксируем, что элементарное событие - это факт того что пользователь "увидел" трек. 
Иными словами - это любое не-пустое значение в матрице user-item. 
Существует два фундаментально отличающихся процесса, которые могут порождать данное событие:
1) Пользователь сам выбрал первый трек для прослушивания, в данном случае $P(O=1|user, track) = P(O=1| X_{user}, X_{track}, X_{hidden})$, где
* $X_{user}$ - это признаки пользователя,
* $X_{track}$ - признаки трека,
* $X_{hidden}$ - ненаблюдаемые признаки, например - пользователь случайно услышал трек на радио, либо ему этот трек порекомендовал друг.

2) Пользователь слушал наши рекомендации. Например, для Sequential рекоммендера, вероятность может быть записана как $P(O=1|user, track) = P(O=1| user, track, t)$, где $t$ - позиция трека в выдаче. Сама вероятность может грубо задаваться рекуррентной формулой $P(O=1| user, track, t) = P(O=1| X_u, X_{track}) * P(O=1| user, track, t-1)$.

Как можно заметить, задача оценки $P(O=1|user,track)$, вообще говоря, не является тривиальной.

## Оценка на базе первого прослушивания пользователя

Сделаем очень грубое допущение, и попробуем оценивать $P(O=1|user, track)$ только по первому процессу. Иными словами, построим модель $P(O=1|user, track) = P(O=1| X_{user}, X_{track}, X_{hidden})$.

Чтобы не усложнять себе жизнь - проигнорируем признаки $X_{hidden}$, и получим модель $P(O=1| X_{user}, X_{track})$, которую можно попытаться оценить логистической регрессией, например,

$P(O=1| X_{user}, X_{track}) = \sigma (b_1 + b_2 * (e_u, e_i) + b_3 * b_u + b_4 * b_i)$.

Величины $e_u, e_i, b_u, b_i$ мы возьмем из ранее обученной лайтфм.

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from collections import namedtuple

Подготовим датасет для обучения, в качестве true лейблов - возьмем прослушки инициированныме пользователем.

In [None]:
Entry = namedtuple("session", ["user", "track"])

user_observations = []

for k, user_data in data_stage_1.groupby("user"):
    first = None
    for _, row in user_data.sort_values("timestamp").iterrows():
        if first is None:
            first = row["track"]
            user_observations.append(Entry(row["user"], first))

        if row["message"] == "last":
            first = None

In [None]:
track_meta_inverse = TRACK_META.copy()
track_meta_inverse['track_index'] = track_meta_inverse.index
track_meta_inverse.set_index("track", inplace=True)

Подготовим датасет, описательные признаки будут следующие,

$[b_u, b_i, (e_u, e_i)]$

Лейблы будут такие:
* true - пользователь инициировал прослушку трека,
* false - пользователь не инициировал прослушку трека.

In [None]:
NUM_NEGATIVE_OBSERVATIONS = 20
train_data = list()
train_target = list()
all_observed_tracks = track_meta_inverse.track_index

for entry in user_observations:
    if entry.track not in track_meta_inverse.index:
        continue

    if entry.user not in USER_MAPPING_INVERSE.keys():
        continue

    # собираем эмбедды
    
    u_ix = USER_MAPPING_INVERSE[entry.user]
    oi_ix = track_meta_inverse.loc[entry.track]['track_index']
    
    e_u = USER_EMBEDDINGS[u_ix]
    b_u = USER_BIASES[u_ix]
    
    e_oi = ITEM_EMBEDDINGS[oi_ix]
    b_oi = ITEM_BIASES[oi_ix]

    item_unobserved_ix = np.random.choice(all_observed_tracks, size=NUM_NEGATIVE_OBSERVATIONS)

    e_ui = ITEM_EMBEDDINGS[item_unobserved_ix, :]
    b_ui = ITEM_BIASES[item_unobserved_ix]

    # считаем фичи

    e_uoi_dot = e_u.dot(e_oi)
    e_uui_dot = e_u.dot(e_ui.T)

    train_data.append(np.concat([
        np.repeat(b_u, 21)[np.newaxis, :], 
        np.append(b_oi, b_ui)[np.newaxis, :],
        np.append(e_uoi_dot, e_uui_dot)[np.newaxis, :],
    ], axis=0).T)

    train_target.append([1] + [0] * NUM_NEGATIVE_OBSERVATIONS)

train_data = np.concat(train_data, axis=0)
train_target = np.concat(train_target, axis=0)

In [None]:
logreg = LogisticRegression()
logreg.fit(train_data, train_target)

roc_auc_score(train_target, logreg.predict_proba(train_data)[:, 1])

При значении ROC > 0.5 перфоманс уже должен быть лучше подхода без IPS

> One might be worried that
we need to perfectly reconstruct all propensities for effective learning. However, as we will show, **we merely need
estimated propensities that are “better” than the naive assumption of observations being revealed uniformly**.

# Этап 3. Обучим SVD с IPS

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as td

import pytorch_lightning as pl

import pandas as pd
import numpy as np
import typing
import json

from tqdm import tqdm

tqdm.pandas()

In [None]:
# если код будет крашиться - можно снизить
NUM_NEGATIVE_SAMPLES = 20

positives = data_stage_2[data_stage_2["time"] > 0.7].copy()

user_counts = positives.groupby("user").size()
users = set(user_counts[user_counts >= 4].index.values)

track_counts = positives.groupby("track").size()
tracks = set(track_counts[track_counts >= 4].index.values)

In [None]:
triplets = positives[positives["user"].isin(users) & positives["track"].isin(tracks)]
triplets = triplets[["user", "track"]].rename(columns={"track": "track_pos"})

In [None]:
triplets =  pd.concat([triplets] * NUM_NEGATIVE_SAMPLES).sort_index().reset_index(drop=True)
triplets["track_neg"] = np.random.choice(range(50000+1), len(triplets))

Рассчитаем $log P(O=1|user, track)$

In [None]:
# Если для определенного трека / пользователя у нас нет эмбедда, используем дефолтное значение
DEFAULT_LOG_PROBA = float(np.log(1e-3))

def compute_proba(user: int, item: int) -> float:
    if user not in USER_MAPPING_INVERSE.keys():
        return DEFAULT_LOG_PROBA

    if item not in track_meta_inverse.index:
        return DEFAULT_LOG_PROBA

    user_ix = USER_MAPPING_INVERSE[user]
    
    u_e = USER_EMBEDDINGS[user_ix, :]
    u_b = USER_BIASES[user_ix]

    item_ix = track_meta_inverse.loc[item]['track_index']
    
    i_e = ITEM_EMBEDDINGS[item_ix, :]
    i_b = ITEM_BIASES[item_ix]

    ui_score = u_e.dot(i_e)
    return float(logreg.predict_log_proba(
        np.array([u_b, i_b, ui_score])[np.newaxis, :]
    )[0, 1])

In [None]:
triplets["pos_proba"] = triplets.progress_apply(lambda x: compute_proba(x['user'], x['track_pos']), axis=1)
triplets["neg_proba"] = triplets.progress_apply(lambda x: compute_proba(x['user'], x['track_neg']), axis=1)

In [None]:
triplets.to_csv("./stage_2/dataset.csv", index=False)

In [None]:
triplets = pd.read_csv("./stage_2/dataset.csv")

In [None]:
rdm = np.random.random(len(triplets))
train_data = triplets[rdm < 0.9]
val_data = triplets[rdm >= 0.9]

len(train_data), len(val_data)

In [None]:
class SVDData(pl.LightningDataModule):
  def __init__(self, train_triplets, val_triplets):
      super().__init__()
      self.train_triplets = train_triplets
      self.val_triplets = val_triplets

  def _collect_data(self, triplets):
      users = triplets["user"].values
      positives = triplets["track_pos"].values
      negatives = triplets["track_neg"].values
      pos_proba = triplets["pos_proba"].values
      neg_proba = triplets["neg_proba"].values

      return td.TensorDataset(
            torch.from_numpy(users).long(),
            torch.from_numpy(positives).long(),
            torch.from_numpy(negatives).long(),
            torch.from_numpy(pos_proba).float(),
            torch.from_numpy(neg_proba).float(),
      )

  def prepare_data(self, stage=None):
      if stage == "fit" or stage is None:
        self.train_dataset = self._collect_data(self.train_triplets)
        self.val_dataset = self._collect_data(self.val_triplets)
      elif stage == "test" or stage is None:
        self.test_dataset = self._collect_data(self.test_triplets)

  def train_dataloader(self):
      return td.DataLoader(self.train_dataset, batch_size=2048, shuffle=True, num_workers=0)

  def val_dataloader(self):
      return td.DataLoader(self.val_dataset, batch_size=2048, num_workers=0)

SVD будем оптимизировать через BPR loss с использованием и без использования IPS, иными словами
1. Vanilla: $P((e_u, e_p) - (e_u, e_n))$
2. IPS: $P((e_u, e_p) - (e_u, e_n)) / P(O_{u,p} = 1, O_{u,n} = 1)$.

Считаем, что события $O_{u,p}$ и $O_{u,n}$ независимы, поэтому $P(O_{u,p} = 1, O_{u,n} = 1) = P(O_{u,p} = 1) P(O_{u,n} = 1)$

In [None]:
class SVD(pl.LightningModule):
    def __init__(
        self,
        user_size: int,
        item_size: int,
        embedding_dim: int = 100,
        use_ips: bool = True,
        lr: float = 1e-3,
        weight_decay: float = 1e-6,
        log_to_prog_bar: bool = True,
    ) -> None:
        super().__init__()
        self.lr = lr
        self.weight_decay = weight_decay
        self.log_to_prog_bar = log_to_prog_bar
        self.use_ips = use_ips
        
        self.user_embeddings = nn.Embedding(user_size, embedding_dim)
        
        self.item_embeddings = nn.Embedding(item_size, embedding_dim)
        self.item_bias = nn.Embedding(item_size, 1)

    def forward(
        self,
        user_id: torch.Tensor,
        item_id: torch.Tensor
    ) -> torch.Tensor:
        user_embedding = self.user_embeddings(user_id)
        item_embedding = self.item_embeddings(item_id)
        item_bias = self.item_bias(item_id)

        # User bias использовать не будем, поскольку он сократится в формуле BPR
        return (user_embedding * item_embedding).sum(axis=1) + item_bias

    def _step(self, batch, batch_idx, metric, prog_bar=False):
        user, pos, neg, pos_proba, neg_proba = batch
        
        pos_score = self(user, pos)
        neg_score = self(user, neg)
        
        loss = F.logsigmoid(pos_score - neg_score)
        if self.use_ips:
            loss = -(loss - pos_proba - neg_proba).mean()
        else:
            loss = -loss.mean()
        self.log(metric, loss, prog_bar=prog_bar)
        return loss

    def training_step(self, batch: typing.Sequence[torch.Tensor], batch_idx: int) -> torch.Tensor:
        return self._step(batch, batch_idx, "train_loss")

    def validation_step(self, batch: typing.Sequence[torch.Tensor], batch_idx: int) -> torch.Tensor:
        return self._step(batch, batch_idx, "val_loss", self.log_to_prog_bar)

    def test_step(self, batch, batch_idx, prog_bar=False):
        return self._step(batch, batch_idx, "test_loss", self.log_to_prog_bar)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr, weight_decay=self.weight_decay)
        lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=3, verbose=True)
        scheduler = {
            'scheduler': lr_scheduler,
            'monitor': 'val_loss'
        }
        return [optimizer], [scheduler]

In [None]:
def compute_recommendations(svd: pl.LightningModule, users: np.array):
    user_embeddings = svd.user_embeddings(torch.from_numpy(users))
    item_embeddings = svd.item_embeddings(torch.from_numpy(np.arange(0, 50000, dtype=np.int32)))
    item_biases = svd.item_bias(torch.from_numpy(np.arange(0, 50000, dtype=np.int32)))
    scores = ((user_embeddings @ item_embeddings.T) + item_biases.T)
    recs = scores.argsort(descending=True, axis=1)[:, :50].cpu().numpy()

    result = dict()
    for u, r in zip(users, recs):
        result[int(u)] = r.tolist()
    return result

In [None]:
def save_recommendations(recommendations: dict, fhandle):
    result = list()
    for user, tracks in recommendations.items():
        result.append(json.dumps({"user": user, "tracks": tracks}))
        
    fhandle.write("\n".join(result))

In [None]:
data_module = SVDData(train_data, val_data)
svd_ips = SVD(10001, 50001, embedding_dim=50, use_ips=True, lr=1e-2).float()

trainer = pl.Trainer(
    max_epochs=50,
    accelerator='gpu',
    devices=1,
    callbacks=[
        pl.callbacks.early_stopping.EarlyStopping(monitor="val_loss", patience=5),
        pl.callbacks.LearningRateMonitor(logging_interval="step")
    ])

In [None]:
trainer.fit(svd_ips, data_module)

In [None]:
with open("./stage_2/recommendations_svd_ips.json", "w") as f:
    recommendations = compute_recommendations(svd_ips, triplets['user'].unique())
    save_recommendations(recommendations, f)

In [None]:
data_module = SVDData(train_data, val_data)
svd = SVD(10001, 50001, embedding_dim=50, use_ips=False, lr=1e-2).float()

trainer = pl.Trainer(
    max_epochs=30,
    accelerator='gpu',
    devices=1,
    callbacks=[
        pl.callbacks.early_stopping.EarlyStopping(monitor="val_loss", patience=5),
        pl.callbacks.LearningRateMonitor(logging_interval="step")
    ])

In [None]:
trainer.fit(svd, data_module)

In [None]:
with open("./stage_2/recommendations_svd.json", "w") as f:
    recommendations = compute_recommendations(svd, triplets['user'].unique())
    save_recommendations(recommendations, f)