In [1]:
from pathlib import Path
from typing import Any, Protocol, List, TypeVar, Generic, Optional, Tuple
from pprint import pprint

import polars as pl
import numpy as np
from scipy.sparse import coo_matrix, csr_matrix
from implicit.als import AlternatingLeastSquares

from catboost import CatBoostClassifier, Pool
from catboost.metrics import (
    Logloss
)
from sklearn.model_selection import train_test_split

from my_recsys_metrics import compute_metrics
from my_utils import make_submission


In [2]:
# Raw inputs: play events, per-track metadata, and the submission user list.
# NOTE(review): users_for_submission is not referenced elsewhere in this section;
# presumably it is consumed when building a final submission — TODO confirm.
data_path = Path("../data/music_recsys")
train_events = pl.read_parquet(data_path.joinpath("train_events.parquet"))
track_features = pl.read_parquet(data_path.joinpath("track_features.parquet"))
users_for_submission = pl.read_parquet(data_path.joinpath("users_for_submission.parquet"))


In [3]:
# One row per (track_id, artist_id) pair: multi-artist tracks repeat with the
# same cluster_id and embedding vector, so track_id is not unique here.
track_features.head(10)


track_id,artist_id,cluster_id,vector
i32,i32,i32,list[f32]
780567,119273,52,"[-0.16186, 0.005062, … -0.012133]"
679759,159956,93,"[-0.148054, -0.00309, … -0.013735]"
679759,119273,93,"[-0.148054, -0.00309, … -0.013735]"
652834,119273,52,"[-0.177701, 0.01612, … -0.007356]"
652834,105420,52,"[-0.177701, 0.01612, … -0.007356]"
951680,21163,40,"[-0.160207, 0.024698, … -0.021361]"
951680,119273,40,"[-0.160207, 0.024698, … -0.021361]"
370197,119273,4,"[-0.164728, 0.009855, … -0.036365]"
370197,131894,4,"[-0.164728, 0.009855, … -0.036365]"
1450872,152607,86,"[-0.144641, -0.016293, … 0.037072]"


In [4]:
# One row per play event. play_ratio looks like the fraction of the track that
# was played (the values shown lie in [0, 1]) — TODO confirm with dataset docs.
train_events.head(10)


user_id,track_id,session_id,datetime,play_ratio
i32,i32,i32,datetime[ms],f32
4810600,1401235,10316065,2023-05-13 07:28:06.428,0.446721
4810600,873282,10316065,2023-05-13 07:36:22.449,1.0
4810600,230422,10316065,2023-05-13 07:36:37.923,0.182609
4810600,795841,10316065,2023-05-13 07:41:18.818,1.0
4810600,1457475,10316065,2023-05-13 07:41:49.042,0.154206
4810600,522206,10316065,2023-05-13 07:46:40.852,1.0
4810600,400737,10316065,2023-05-13 07:48:41.085,0.762821
4810600,373450,10316065,2023-05-13 07:55:08.769,1.0
4810600,967007,10316065,2023-05-13 08:00:35.971,1.0
4810600,613489,10316065,2023-05-13 08:03:36.837,0.767347


In [5]:
_T = TypeVar("_T")
_U = TypeVar("_U")


class TransformerLike(Protocol):
    """Structural type for a pipeline step: anything with a ``fit_transform``."""

    def fit_transform(self, input: Any) -> Any: ...


class Pipeline(Generic[_T, _U]):
    """Runs ``fit_transform`` on each step in order, feeding outputs forward.

    If a step returns a tuple, the tuple is splatted into the next step's
    positional arguments; any other value is passed as the single argument.
    This is what lets ``AddInput`` attach extra inputs for the following step.
    """

    def __init__(self, transformers: List[TransformerLike]) -> None:
        self.transformers = transformers

    def fit_transform(self, x: _T) -> _U:
        current: Any = x
        for step in self.transformers:
            print(f"Fit-transform with {step.__class__.__name__}")
            step_args = current if isinstance(current, tuple) else (current,)
            current = step.fit_transform(*step_args)
        return current


class AddInput(Generic[_T, _U]):
    """Pipeline step that appends a fixed ``value`` to the arguments it receives."""

    def __init__(self, value: _U) -> None:
        self.value = value

    def fit_transform(self, *x: _T) -> Tuple[_T, _U]:
        # `x` is already a tuple of the incoming positional arguments.
        return x + (self.value,)


class OrdinalEncoder:
    """Replaces the values of one column with dense Int32 codes ``0..n-1``.

    Codes follow the sorted order of the distinct values seen in ``fit``.
    ``transform`` uses a left join, so values unseen during ``fit`` become null,
    and the encoded column ends up as the last column of the frame.
    ``inverse_transform`` maps codes back to the original values.
    """

    def __init__(self, column: str) -> None:
        self.column = column

    def fit(self, df: pl.DataFrame) -> "OrdinalEncoder":
        distinct_sorted = df[[self.column]].unique().sort(self.column)
        # NOTE(review): newer polars renames `with_row_count` to `with_row_index`.
        self._mapper = distinct_sorted.with_row_count("__index__").with_columns(
            pl.col("__index__").cast(pl.Int32)
        )
        return self

    def transform(self, df: pl.DataFrame) -> pl.DataFrame:
        with_codes = df.join(self._mapper, on=self.column, how="left")
        return with_codes.drop(self.column).rename({"__index__": self.column})

    def inverse_transform(self, df: pl.DataFrame) -> pl.DataFrame:
        as_codes = df.rename({self.column: "__index__"})
        with_values = as_codes.join(self._mapper, on="__index__", how="left")
        return with_values.drop("__index__")

    def fit_transform(self, df: pl.DataFrame) -> pl.DataFrame:
        self.fit(df)
        return self.transform(df)


class FrequencyEncoder:
    """Converts raw events into per-user relative interaction frequencies.

    Output has one row per distinct (user, item) pair, with ``value_column``
    set to that pair's event count divided by the user's total counted events.
    """

    def __init__(self, user_column: str, item_column: str, value_column: str) -> None:
        self.user_column = user_column
        self.item_column = item_column
        self.value_column = value_column

    def fit_transform(self, events: pl.DataFrame) -> pl.DataFrame:
        pair_count = pl.col(self.item_column).count().alias("n_interactions_per_user")
        user_total = (
            pl.col("n_interactions_per_user").sum().over(self.user_column).alias("n_interactions_total")
        )
        pair_share = (
            pl.col("n_interactions_per_user") / pl.col("n_interactions_total")
        ).alias(self.value_column)

        counts = events.group_by(self.user_column, self.item_column).agg(pair_count)
        with_totals = counts.with_columns(user_total)
        return with_totals.with_columns(pair_share).drop("n_interactions_per_user", "n_interactions_total")


class CSRConverter:
    """Builds a sparse float32 user x item CSR matrix from long-format rows.

    Each row of the input supplies a user index, an item index and a value.
    Indices must be non-negative integer codes (e.g. from ``OrdinalEncoder``)
    because they are used directly as matrix coordinates. Repeated
    (user, item) pairs are summed.

    Args:
        user_column, item_column, value_column: input column names.
        n_users, n_items: optional explicit matrix shape. When ``None``
            (default), each axis is sized ``max index + 1``, which drops
            trailing users/items absent from the input; pass the full encoder
            vocabulary size to keep matrices from different splits aligned.
            scipy rejects indices that fall outside an explicit shape.
    """

    def __init__(
        self,
        user_column: str,
        item_column: str,
        value_column: str,
        n_users: Optional[int] = None,
        n_items: Optional[int] = None,
    ) -> None:
        self.user_column = user_column
        self.item_column = item_column
        self.value_column = value_column
        self.n_users = n_users
        self.n_items = n_items

    def fit_transform(self, coo: "pl.DataFrame") -> csr_matrix:
        user_idx = coo[self.user_column].to_numpy()
        item_idx = coo[self.item_column].to_numpy()
        values = coo[self.value_column].to_numpy().astype(np.float32)

        shape = (
            self._axis_size(user_idx, self.n_users),
            self._axis_size(item_idx, self.n_items),
        )

        user_item_coo = coo_matrix(
            (values, (user_idx, item_idx)),
            shape=shape,
            dtype=np.float32,
        )
        # Merge repeated (user, item) entries into one summed value.
        user_item_coo.sum_duplicates()
        return user_item_coo.tocsr()

    @staticmethod
    def _axis_size(indices: np.ndarray, explicit: Optional[int]) -> int:
        """Size of one matrix axis: ``explicit`` if given, else ``max index + 1``."""
        if explicit is not None:
            return explicit
        # `ndarray.max()` raises on an empty array; an empty input gives an empty axis.
        return int(indices.max()) + 1 if indices.size else 0


In [6]:
# Encode raw ids as dense Int32 codes; the sparse ALS matrix indexes rows/cols by them.
encoding_pipeline: Pipeline[pl.DataFrame, pl.DataFrame] = Pipeline(
    [OrdinalEncoder(column=id_column) for id_column in ("user_id", "track_id")]
)

encoded_train_events = encoding_pipeline.fit_transform(train_events)

# Reuse the fitted track encoder so metadata shares the event codes; tracks
# never seen in train_events get a null track_id (left join in transform).
item_encoder: OrdinalEncoder = encoding_pipeline.transformers[-1]
track_features_encoded = item_encoder.transform(track_features)


Fit-transform with OrdinalEncoder
Fit-transform with OrdinalEncoder


In [7]:
class LeaveLastOutSplit:
    """Per-user temporal split that holds out each user's most recent positive listens.

    A positive event has ``play_ratio > min_ratio``. For every user with more
    than ``n_last`` positive events, the split point is the datetime of that
    user's ``n_last``-th most recent positive event. Events strictly before it
    go to fold 1; events at or after it (positive or not) go to fold 2.
    Users with ``n_last`` or fewer positives are dropped, and only users
    present in both folds are kept.

    Args:
        min_ratio: exclusive ``play_ratio`` threshold for a positive event.
        n_last: number of most recent positive events to hold out (default 5,
            the value this split previously hard-coded). Must be >= 1.

    Raises:
        ValueError: if ``n_last < 1``.
    """

    def __init__(
        self,
        min_ratio: float,
        n_last: int = 5,
    ) -> None:
        if n_last < 1:
            raise ValueError(f"n_last must be >= 1, got {n_last}")
        self.min_ratio = min_ratio
        self.n_last = n_last

    def fit_transform(self, events: pl.DataFrame) -> Tuple[pl.DataFrame, pl.DataFrame]:
        positive_events = events.filter(pl.col("play_ratio") > self.min_ratio)
        split_df_per_user = (
            positive_events
            .group_by("user_id")
            .agg(pl.col("datetime").alias("split_dt_per_user"))
            # Strictly more than n_last positives, so fold 1 keeps at least one
            # positive (barring identical timestamps).
            .filter(pl.col("split_dt_per_user").list.len() > self.n_last)
            # One-element list holding the n_last-th most recent positive datetime.
            # NOTE(review): newer polars renames `list.take` to `list.gather`.
            .with_columns(pl.col("split_dt_per_user").list.sort().list.take(-self.n_last))
            .explode("split_dt_per_user")
        )

        events_with_split_dt = events.join(split_df_per_user, on="user_id", how="inner")

        fold1_events = (
            events_with_split_dt
            .filter(pl.col("datetime") < pl.col("split_dt_per_user"))
            .drop("split_dt_per_user")
        )
        fold2_events = (
            events_with_split_dt
            .filter(pl.col("datetime") >= pl.col("split_dt_per_user"))
            .drop("split_dt_per_user")
        )

        return self._keep_user_intersection(fold1_events, fold2_events)

    def _keep_user_intersection(self, df1: pl.DataFrame, df2: pl.DataFrame) -> Tuple[pl.DataFrame, pl.DataFrame]:
        """Restrict both frames to the users that occur in both of them."""
        common_users = (
            df1.select("user_id").unique()
            .join(df2.select("user_id").unique(), on="user_id", how="inner")
        )
        return (
            df1.join(common_users, on="user_id", how="inner"),
            df2.join(common_users, on="user_id", how="inner"),
        )


# Split each user's history at their 5th-from-last positive listen (play_ratio > 0.3):
# earlier events form fold 1 (training), that listen and everything after form fold 2.
encoded_train_events_fold1, encoded_train_events_fold2 = LeaveLastOutSplit(
    min_ratio=0.3,
).fit_transform(encoded_train_events)


In [8]:
class ALS:
    """Candidate generator built on ``implicit``'s AlternatingLeastSquares.

    ``fit_predict`` turns events into a user x item matrix of per-user relative
    interaction frequencies, fits ALS on it, and returns the ``top_k``
    highest-scored items (items the user already interacted with are filtered
    out) for every user row of that matrix. The result is a long frame with
    columns ``user_column``, ``item_column`` and ``score_column``.

    User/item columns must hold dense integer codes (e.g. from
    ``OrdinalEncoder``) because they are used directly as matrix indices.

    Args:
        user_column, item_column, score_column: input/output column names.
        n_factors: latent dimension passed as ``factors``.
        n_iterations: number of ALS iterations.
        top_k: recommendations returned per user.
        alpha, regularization: forwarded to ``AlternatingLeastSquares``; the
            defaults are the values this class previously hard-coded.
        random_state: seed forwarded to ``AlternatingLeastSquares`` for
            reproducible factors; ``None`` (default) leaves it unseeded as before.
    """

    def __init__(
        self,
        user_column: str,
        item_column: str,
        score_column: str,
        n_factors: int,
        n_iterations: int,
        top_k: int,
        alpha: float = 40.0,
        regularization: float = 0.001,
        random_state: Optional[int] = None,
    ) -> None:
        self.user_column = user_column
        self.item_column = item_column
        self.score_column = score_column
        self.n_factors = n_factors
        self.n_iterations = n_iterations
        self.top_k = top_k
        self.alpha = alpha
        self.regularization = regularization
        self.random_state = random_state

    def fit_predict(self, events: pl.DataFrame) -> pl.DataFrame:
        user_item = self._preprocess_events(events)

        als = AlternatingLeastSquares(
            factors=self.n_factors,
            iterations=self.n_iterations,
            alpha=self.alpha,
            regularization=self.regularization,
            calculate_training_loss=True,
            random_state=self.random_state,
        )
        als.fit(user_item)

        # Recommend for every row of the matrix (row index == encoded user id);
        # `recommend` expects the interaction rows of exactly these users.
        user_ids = np.arange(user_item.shape[0])
        recommended_item_indices, recommended_scores = als.recommend(
            user_ids,
            user_item,
            N=self.top_k,
            filter_already_liked_items=True,
        )

        # One row per user holding list columns, then exploded to long format.
        scores_df = pl.DataFrame({
            self.user_column: pl.Series(user_ids, dtype=pl.Int32),
            self.item_column: pl.Series(recommended_item_indices, dtype=pl.List(pl.Int32)),
            self.score_column: pl.Series(recommended_scores, dtype=pl.List(pl.Float32)),
        })

        return scores_df.explode(self.item_column, self.score_column)

    def _preprocess_events(self, events: pl.DataFrame) -> csr_matrix:
        """Events -> per-user frequency values -> CSR user x item matrix."""
        user_item_pipeline: Pipeline[pl.DataFrame, csr_matrix] = Pipeline([
            FrequencyEncoder(user_column=self.user_column, item_column=self.item_column, value_column="value"),
            CSRConverter(user_column=self.user_column, item_column=self.item_column, value_column="value"),
        ])

        return user_item_pipeline.fit_transform(events)


In [9]:
# First stage: ALS on fold 1 yields 200 candidates per user for the reranker.
als_fold1 = ALS(
    top_k=200,
    n_factors=128,
    n_iterations=10,
    user_column="user_id",
    item_column="track_id",
    score_column="score",
)
als_fold1_recommendations = als_fold1.fit_predict(encoded_train_events_fold1)


Fit-transform with FrequencyEncoder
Fit-transform with CSRConverter


100%|██████████| 10/10 [00:44<00:00,  4.43s/it, loss=0.000288]


In [10]:

user_encoder: OrdinalEncoder = encoding_pipeline.transformers[0]
item_encoder: OrdinalEncoder = encoding_pipeline.transformers[1]

# Decode dense codes back to raw ids before building the submission.
# NOTE(review): these candidates come from ALS fit on fold 1 only (each user's
# most recent positive listens withheld) — confirm that is intended when
# scoring against ground_truth.parquet.
als_recommendations_decoded = item_encoder.inverse_transform(
    user_encoder.inverse_transform(als_fold1_recommendations)
)

als_submission = make_submission(als_recommendations_decoded)
compute_metrics(als_submission, pl.read_parquet(data_path / "ground_truth.parquet"))


{'ndcg@10': 0.015397905343272917, 'recall@10': 0.023803214329530118}

In [11]:
class LabelCandidateRecommendations:
    """Attaches a binary ``label`` to candidate (user_id, track_id) rows.

    A candidate is labeled 1 if the same pair appears in ``ground_truth`` with
    ``play_ratio >= min_ratio``, otherwise 0. Candidate rows are all kept.
    """

    def __init__(self, min_ratio: float) -> None:
        self.min_ratio = min_ratio

    def fit_transform(
        self,
        recommendations: pl.DataFrame,
        ground_truth: pl.DataFrame,
    ) -> pl.DataFrame:
        pair_key = ["user_id", "track_id"]
        is_positive = pl.col("play_ratio") >= self.min_ratio

        positive_pairs = (
            ground_truth.filter(is_positive)
            .select(pair_key)
            .unique()
            .with_columns(pl.lit(1).alias("label"))
        )

        with_labels = recommendations.join(positive_pairs, on=pair_key, how="left")
        return with_labels.with_columns(pl.col("label").fill_null(0))


# Positives are fold-2 listens with play_ratio >= 0.3 for the candidate pair.
als_fold1_recommendations_labeled = LabelCandidateRecommendations(
    min_ratio=0.3,
).fit_transform(
    recommendations=als_fold1_recommendations,
    ground_truth=encoded_train_events_fold2,
)


In [12]:
# Candidates with their relevance label (1 = positive fold-2 listen, else 0).
als_fold1_recommendations_labeled.head(10)


user_id,track_id,score,label
i32,i32,f32,i32
0,110961,0.53156,0
0,11237,0.359287,0
0,18762,0.333765,0
0,17440,0.320054,0
0,7592,0.279366,0
0,63402,0.256453,0
0,23826,0.247641,0
0,74189,0.244757,0
0,84451,0.236919,0
0,32314,0.235172,0


In [13]:
class TrackPopularityFeature:
    """Adds ``track_popularity`` to candidates (left join on ``track_id``).

    Value: number of events with ``play_ratio >= 0.3`` for the track, divided
    by the number of distinct tracks in all of ``events``. The divisor is
    constant per call, so this is a rescaled positive-play count. Tracks with
    no such event get null.
    """

    def fit_transform(self, candidates: pl.DataFrame, events: pl.DataFrame) -> pl.DataFrame:
        n_distinct_tracks = events["track_id"].n_unique()

        positive_plays = events.filter(pl.col("play_ratio") >= 0.3)
        popularity = positive_plays.group_by("track_id").agg(
            (pl.col("user_id").count() / n_distinct_tracks).alias("track_popularity")
        )

        return candidates.join(popularity, on="track_id", how="left")


class TrackListenRatio:
    """Adds ``track_listen_ratio``: per-track share of events with ``play_ratio > 0.9``.

    Joined onto candidates by ``track_id`` (left join); tracks without events get null.
    """

    def fit_transform(self, candidates: pl.DataFrame, events: pl.DataFrame) -> pl.DataFrame:
        is_listened = (pl.col("play_ratio") > 0.9).alias("is_listened")

        track_listen_ratio = (
            events.select("track_id", is_listened)
            .group_by("track_id")
            .agg(pl.col("is_listened").mean().alias("track_listen_ratio"))
        )

        return candidates.join(track_listen_ratio, on="track_id", how="left")


class TrackSkipRatio:
    """Adds ``track_skip_ratio``: per-track share of events with ``play_ratio < 0.1``.

    Joined onto candidates by ``track_id`` (left join); tracks without events get null.
    """

    def fit_transform(self, candidates: pl.DataFrame, events: pl.DataFrame) -> pl.DataFrame:
        is_skip = (pl.col("play_ratio") < 0.1).alias("is_skip")

        skip_ratio_per_track = (
            events.select("track_id", is_skip)
            .group_by("track_id")
            .agg(pl.col("is_skip").mean().alias("track_skip_ratio"))
        )

        return candidates.join(skip_ratio_per_track, on="track_id", how="left")


class TrackMeanPlayRatioFeature:
    """Adds per-track play-ratio statistics to candidates (left join on ``track_id``).

    Columns:
        track_mean_play_ratio: mean ``play_ratio`` over the track's events.
        track_mean_play_ratio_diff: that mean minus the mean over all events.
    """

    def fit_transform(self, candidates: pl.DataFrame, events: pl.DataFrame) -> pl.DataFrame:
        global_mean = events["play_ratio"].mean()

        per_track = events.group_by("track_id").agg(
            pl.col("play_ratio").mean().alias("track_mean_play_ratio")
        )
        per_track = per_track.with_columns(
            (pl.col("track_mean_play_ratio") - global_mean).alias("track_mean_play_ratio_diff")
        )

        return candidates.join(per_track, on="track_id", how="left")


class UserAlreadyListenedArtistFeature:
    """Adds ``user_listened_track_artist`` to candidates.

    For each candidate (user, track), this counts how many of the track's
    artists the user has at least one event with in ``events``. Tracks can
    have several artists, so the count may exceed 1. Candidates whose track
    has no known artist get 0.
    """

    def fit_transform(
        self,
        candidates: pl.DataFrame,
        events: pl.DataFrame,
        track_features: pl.DataFrame,
    ) -> pl.DataFrame:
        pair_key = ["user_id", "track_id"]
        track_to_artist = track_features.select("track_id", "artist_id").unique()

        # Every (user, artist) pair with at least one event, flagged with 1.
        heard_artists = (
            events.select(pair_key).unique()
            .join(track_to_artist, on="track_id", how="inner")
            .select("user_id", "artist_id")
            .unique()
            .with_columns(pl.lit(1).alias("user_listened_track_artist"))
        )

        # Per distinct candidate pair: number of the track's artists already heard.
        heard_count = (
            candidates.select(pair_key).unique()
            .join(track_to_artist, on="track_id", how="inner")
            .join(heard_artists, on=["user_id", "artist_id"], how="left")
            .with_columns(pl.col("user_listened_track_artist").fill_null(0))
            .group_by(pair_key)
            .agg(pl.col("user_listened_track_artist").sum())
        )

        with_feature = candidates.join(heard_count, on=pair_key, how="left")
        return with_feature.with_columns(pl.col("user_listened_track_artist").fill_null(0))


class ArtistListenRatio:
    """Adds ``artist_listen_ratio``: the listen rate of the candidate track's artist.

    Per artist, the rate is the share of events (over all of that artist's
    tracks) with ``play_ratio >= 0.3``. ``track_features`` has one row per
    (track, artist) pair, so a track can have several artists; one is used per
    track — the smallest ``artist_id`` — which makes the choice deterministic.

    All candidate rows are preserved (left joins). Tracks without a known
    artist, or artists without events, get null, matching the other feature
    transformers.
    """

    def fit_transform(
        self,
        candidates: pl.DataFrame,
        events: pl.DataFrame,
        track_features: pl.DataFrame,
    ) -> pl.DataFrame:
        track_artist_mapping = track_features[["track_id", "artist_id"]].unique()

        artist_ratio = (
            events[["user_id", "track_id", "play_ratio"]]
            .join(track_artist_mapping, on="track_id", how="inner")
            .with_columns((pl.col("play_ratio") >= 0.3).alias("is_listened"))
            .group_by("artist_id").agg(pl.col("is_listened").mean().alias("artist_listen_ratio"))
        )

        # Exactly one artist per track so the join cannot duplicate candidates.
        # min() is reproducible, unlike keep="first" after an unordered unique().
        primary_artist = (
            track_artist_mapping
            .group_by("track_id")
            .agg(pl.col("artist_id").min())
        )

        # Left joins: a feature step must never drop candidate rows.
        features = (
            candidates
            .join(primary_artist, on="track_id", how="left")
            .join(artist_ratio, on="artist_id", how="left")
            .drop("artist_id")
        )
        return features



In [14]:
# Each per-track feature step receives (candidates, fold-1 events); the artist
# feature additionally receives the encoded track metadata.
candidates_features_pipeline = Pipeline[pl.DataFrame, pl.DataFrame]([
    step
    for feature in (
        TrackPopularityFeature(),
        TrackSkipRatio(),
        TrackListenRatio(),
        TrackMeanPlayRatioFeature(),
    )
    for step in (AddInput(encoded_train_events_fold1), feature)
] + [
    AddInput(encoded_train_events_fold1),
    AddInput(track_features_encoded),
    UserAlreadyListenedArtistFeature(),
])

als_fold1_recommendations_featurized = candidates_features_pipeline.fit_transform(
    als_fold1_recommendations_labeled
)
als_fold1_recommendations_featurized


Fit-transform with AddInput
Fit-transform with TrackPopularityFeature
Fit-transform with AddInput
Fit-transform with TrackSkipRatio
Fit-transform with AddInput
Fit-transform with TrackListenRatio
Fit-transform with AddInput
Fit-transform with TrackMeanPlayRatioFeature
Fit-transform with AddInput
Fit-transform with AddInput
Fit-transform with UserAlreadyListenedArtistFeature


user_id,track_id,score,label,track_popularity,track_skip_ratio,track_listen_ratio,track_mean_play_ratio,track_mean_play_ratio_diff,user_listened_track_artist
i32,i32,f32,i32,f64,f64,f64,f32,f32,i32
0,110961,0.53156,0,0.008165,0.264764,0.511612,0.599892,0.072317,0
0,11237,0.359287,0,0.007603,0.273834,0.48952,0.576922,0.049346,1
0,18762,0.333765,0,0.017919,0.333798,0.45845,0.551915,0.024339,0
0,17440,0.320054,0,0.011984,0.295839,0.515775,0.601671,0.074096,0
0,7592,0.279366,0,0.005789,0.304007,0.45122,0.553364,0.025788,2
0,63402,0.256453,0,0.001149,0.395349,0.445736,0.511266,-0.016309,0
0,23826,0.247641,0,0.007491,0.383446,0.342905,0.456235,-0.07134,0
0,74189,0.244757,0,0.005193,0.343659,0.496612,0.567318,0.039743,0
0,84451,0.236919,0,0.001218,0.466238,0.324759,0.42392,-0.103656,0
0,32314,0.235172,0,0.043044,0.311679,0.462861,0.563517,0.035941,0


In [15]:
class CatboostRanker:
    """Re-ranks candidates with a CatBoost binary classifier.

    Input frames hold ``user_id``, ``track_id``, optionally ``label`` (0/1,
    required by ``fit``), plus feature columns. Every other column is a model
    feature, in frame order. The feature layout is recorded in ``fit`` and
    reused by ``transform``, so ``transform`` also works on unlabeled candidates.

    ``transform`` returns ``user_id``, ``track_id`` and ``score`` (predicted
    positive-class probability), keeping the ``top_k`` best tracks per user.

    Args:
        top_k: tracks kept per user by ``transform``.
        random_state: seed for the train/eval split and for CatBoost; ``None``
            (default) leaves both unseeded, as before.
    """

    _NON_FEATURE_COLUMNS = ("user_id", "track_id", "label")

    def __init__(self, top_k: int, random_state: Optional[int] = None) -> None:
        self.top_k = top_k
        self.random_state = random_state

    def fit_transform(self, recommendations_labeled: pl.DataFrame) -> pl.DataFrame:
        # NOTE(review): scores the same rows the model was trained on.
        return self.fit(recommendations_labeled).transform(recommendations_labeled)

    def fit(self, recommendations_labeled: pl.DataFrame) -> "CatboostRanker":
        """Train on labeled candidates; 20% of rows are held out for early stopping.

        Raises:
            ValueError: if the training split does not contain both classes.
        """
        # NOTE(review): row-level split, so one user's candidates can land in
        # both train and eval; a per-user split would give a stricter eval signal.
        X_train, X_eval = train_test_split(
            recommendations_labeled,
            test_size=0.2,
            random_state=self.random_state,
        )

        positive_classes_rate = X_train["label"].sum() / len(X_train)
        print("Positive rate:", positive_classes_rate)
        if not 0 < positive_classes_rate < 1:
            raise ValueError(
                f"Training split must contain both classes; positive rate is {positive_classes_rate}"
            )

        self._feature_columns = self._feature_columns_of(X_train)
        X_train_pd, X_eval_pd = [
            X.select(self._feature_columns).to_pandas()
            for X in (X_train, X_eval)
        ]

        train_pool = Pool(X_train_pd, X_train["label"].to_numpy())
        eval_pool = Pool(X_eval_pd, X_eval["label"].to_numpy())

        catboost_cls = CatBoostClassifier(
            iterations=100,
            # Inverse-frequency weighting: class 0 gets the positive rate,
            # class 1 gets the negative rate.
            class_weights=[positive_classes_rate, 1 - positive_classes_rate],
            eval_metric=Logloss(),
            objective=Logloss(),
            early_stopping_rounds=20,
            random_seed=self.random_state,
            verbose=1,
        )

        catboost_cls.fit(
            train_pool,
            eval_set=eval_pool,
        )

        pprint(sorted(zip(X_train_pd.columns, catboost_cls.feature_importances_), key=lambda x: -x[1]))

        self._model = catboost_cls
        return self

    def transform(self, recommendations: pl.DataFrame) -> pl.DataFrame:
        """Score candidates and keep the ``top_k`` per user; ``fit`` must run first."""
        features_pd = recommendations.select(self._feature_columns).to_pandas()
        probas = self._model.predict_proba(Pool(features_pd))[:, 1]

        recommendations_ranked = recommendations.select("user_id", "track_id").with_columns(
            pl.Series("score", probas)
        )

        recommendations_head = (
            recommendations_ranked
            .group_by("user_id")
            .agg(pl.col("track_id", "score").sort_by("score", descending=True).head(self.top_k))
            .explode("track_id", "score")
        )

        return recommendations_head

    @classmethod
    def _feature_columns_of(cls, df: pl.DataFrame) -> List[str]:
        """All columns except the id and label columns, in frame order."""
        return [column for column in df.columns if column not in cls._NON_FEATURE_COLUMNS]



# Second stage: rerank the featurized ALS candidates and keep 10 per user.
catboost_fold2_recommendations = CatboostRanker(top_k=10).fit_transform(
    als_fold1_recommendations_featurized
)


Positive rate: 0.0029586161773125052
Learning rate set to 0.5
0:	learn: 0.6291182	test: 0.6290583	best: 0.6290583 (0)	total: 278ms	remaining: 27.6s
1:	learn: 0.6071553	test: 0.6077790	best: 0.6077790 (1)	total: 427ms	remaining: 20.9s
2:	learn: 0.5982370	test: 0.5992341	best: 0.5992341 (2)	total: 529ms	remaining: 17.1s
3:	learn: 0.5932117	test: 0.5954932	best: 0.5954932 (3)	total: 623ms	remaining: 14.9s
4:	learn: 0.5903936	test: 0.5937505	best: 0.5937505 (4)	total: 703ms	remaining: 13.4s
5:	learn: 0.5877511	test: 0.5924777	best: 0.5924777 (5)	total: 791ms	remaining: 12.4s
6:	learn: 0.5854918	test: 0.5912804	best: 0.5912804 (6)	total: 878ms	remaining: 11.7s
7:	learn: 0.5843424	test: 0.5909076	best: 0.5909076 (7)	total: 968ms	remaining: 11.1s
8:	learn: 0.5823998	test: 0.5912815	best: 0.5909076 (7)	total: 1.08s	remaining: 10.9s
9:	learn: 0.5806569	test: 0.5909510	best: 0.5909076 (7)	total: 1.16s	remaining: 10.5s
10:	learn: 0.5791124	test: 0.5903127	best: 0.5903127 (10)	total: 1.26s	remaini

In [16]:

user_encoder: OrdinalEncoder = encoding_pipeline.transformers[0]
item_encoder: OrdinalEncoder = encoding_pipeline.transformers[1]

# Decode dense codes back to raw ids, then score the reranked lists.
catboost_recommendations_decoded = item_encoder.inverse_transform(
    user_encoder.inverse_transform(catboost_fold2_recommendations)
)

catboost_submission = make_submission(catboost_recommendations_decoded)
# ALS-only baseline for comparison:
# {'ndcg@10': 0.015397905343272917, 'recall@10': 0.023803214329530118}
compute_metrics(catboost_submission, pl.read_parquet(data_path / "ground_truth.parquet"))


{'ndcg@10': 0.01583349889611419, 'recall@10': 0.024534412955465587}