In [1]:
!pip install replay-rec[all] --quiet
!pip install pyspark
!pip install rs_datasets

In [2]:
import numpy as np
import pandas as pd
from replay.splitters import TimeSplitter
from rs_datasets import MovieLens

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, max, min
from rs_datasets import MovieLens
import polars as pl

import warnings
warnings.filterwarnings("ignore")

##Spark pipeline

### Скачивание датасета

In [3]:
ml = MovieLens("1m")
pandas_data = ml.ratings
pandas_data.head()

INFO:rs_datasets:Downloading ml-1m from grouplens...
5.93MB [00:00, 9.01MB/s]                            


Unnamed: 0,user_id,item_id,rating,timestamp
0,1,1193,5,978300760
1,1,661,3,978302109
2,1,914,3,978301968
3,1,3408,4,978300275
4,1,2355,5,978824291


### Создание Spark сессии и открытие датасета

In [4]:
spark = SparkSession.builder \
    .appName("Spark DataFrame Examples") \
    .getOrCreate()

In [5]:
from replay.utils.spark_utils import convert2spark

In [6]:
data = convert2spark(pandas_data)

Предобработка датасета

In [7]:
from replay.preprocessing.filters import MinCountFilter

In [8]:
data = MinCountFilter(num_entries=20).transform(data)

### Анализ датасета на Spark

Схема датасета

In [9]:
data.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)



In [10]:
data.show(5)

+-------+-------+------+---------+
|user_id|item_id|rating|timestamp|
+-------+-------+------+---------+
|      2|   1357|     5|978298709|
|      2|   3068|     4|978299000|
|      2|   1537|     4|978299620|
|      2|    647|     3|978299351|
|      2|   2194|     4|978299297|
+-------+-------+------+---------+
only showing top 5 rows



In [11]:
unique_users_count = data.select("user_id").distinct().count()
unique_items_count = data.select("item_id").distinct().count()
total_rows_count = data.count()

print(f"Количество уникальных пользователей: {unique_users_count}")
print(f"Количество уникальных айтемов: {unique_items_count}")
print(f"Общее количество взаимодействий: {total_rows_count}")

Количество уникальных пользователей: 6040
Количество уникальных айтемов: 3706
Общее количество взаимодействий: 1000209


In [12]:
user_item_counts = data.groupBy("user_id").agg(count("item_id").alias("item_count"))
average_item_count = user_item_counts.agg(avg("item_count")).first()[0]
density = total_rows_count / (unique_items_count * unique_users_count)

print(f"Среднее количество взаимодействий: {int(average_item_count)}")
print(f"Плотность данных: {round(density,3)}")

Среднее количество взаимодействий: 165
Плотность данных: 0.045


### Разбиение данных

In [13]:
train, test = TimeSplitter(time_threshold=0.2,
                           drop_cold_users=True,
                           drop_cold_items=True,
                           query_column="user_id").split(data)

оставим в test взаимодействия с оценкой не ниже 4

In [14]:
from replay.preprocessing.filters import LowRatingFilter

In [15]:
test = LowRatingFilter(value=4, rating_column="rating").transform(test)

In [16]:
unique_users_count = train.select("user_id").distinct().count()
unique_items_count = train.select("item_id").distinct().count()
total_rows_count = train.count()

print(f"Количество уникальных пользователей в train: {unique_users_count}")
print(f"Количество уникальных айтемов в train: {unique_items_count}")
print(f"Общее количество строк в train: {total_rows_count}")

Количество уникальных пользователей в train: 5400
Количество уникальных айтемов в train: 3662
Общее количество строк в train: 800164


In [17]:
unique_users_count = test.select("user_id").distinct().count()
unique_items_count = test.select("item_id").distinct().count()
total_rows_count = test.count()

print(f"Количество уникальных пользователей в test: {unique_users_count}")
print(f"Количество уникальных айтемов в test: {unique_items_count}")
print(f"Общее количество строк в test: {total_rows_count}")

Количество уникальных пользователей в test: 1122
Количество уникальных айтемов в test: 3021
Общее количество строк в test: 56249


### Подготовка данных

In [18]:
from replay.data.dataset import Dataset, FeatureSchema, FeatureInfo, FeatureHint, FeatureType
from replay.data.dataset_utils import DatasetLabelEncoder

In [19]:
feature_schema = FeatureSchema(
     [
         FeatureInfo(
             column="user_id",
             feature_type=FeatureType.CATEGORICAL,
             feature_hint=FeatureHint.QUERY_ID,
         ),
         FeatureInfo(
             column="item_id",
             feature_type=FeatureType.CATEGORICAL,
             feature_hint=FeatureHint.ITEM_ID,
         ),
        FeatureInfo(
            column="rating",
            feature_type=FeatureType.NUMERICAL,
            feature_hint=FeatureHint.RATING,
         ),
     ]
)
train_dataset = Dataset(feature_schema, train)
test_dataset = Dataset(feature_schema, test)

Применим LabelEncoder

In [20]:
encoder = DatasetLabelEncoder()
encoder.fit(train_dataset)

<replay.data.dataset_utils.dataset_label_encoder.DatasetLabelEncoder at 0x7c9f846c79d0>

In [21]:
train_dataset = encoder.transform(train_dataset)
test_dataset = encoder.transform(test_dataset)

### ALS

In [22]:
from replay.models import ALSWrap

In [23]:
model = ALSWrap(rank=32)

In [24]:
model.fit(train_dataset)

In [25]:
candidates = model.predict(dataset=train_dataset, k=10)
candidates.show(5)

+-------+-------+------------------+
|user_id|item_id|            rating|
+-------+-------+------------------+
|      2|   3176|0.9856088757514954|
|      2|   3200|0.9761657118797302|
|      2|   1878|0.9660410284996033|
|      2|   1949|0.9466403722763062|
|      2|   3196|0.9440043568611145|
+-------+-------+------------------+
only showing top 5 rows



### Подсчет метрик

In [26]:
from replay.metrics import HitRate, NDCG, MAP, Coverage, Experiment

In [27]:
metrics = [
     HitRate(topk=10),
     NDCG(topk=10),
     MAP(topk=10),
     Coverage(topk=10)]

In [28]:
exp = Experiment(
    metrics,
    test_dataset.interactions,
    train_dataset.interactions,
    query_column=train_dataset.feature_schema.query_id_column,
    item_column=train_dataset.feature_schema.item_id_column,
    rating_column=train_dataset.feature_schema.interactions_rating_column,
)

In [29]:
exp.add_result(name="ALS", recommendations=candidates)
exp.results

Unnamed: 0,HitRate@10,NDCG@10,MAP@10,Coverage@10
ALS,0.632799,0.20383,0.116949,0.358547


## Polars pipeline

In [30]:
data = pl.from_pandas(pandas_data)

Предобработка датасета

In [31]:
from replay.preprocessing.filters import MinCountFilter

In [32]:
data = MinCountFilter(num_entries=20, groupby_column="item_id").transform(data)

### Анализ датасета на Polars

In [33]:
data.head(5)

user_id,item_id,rating,timestamp
i32,i32,i32,i32
1,1193,5,978300760
1,661,3,978302109
1,914,3,978301968
1,3408,4,978300275
1,2355,5,978824291


In [34]:
unique_users_count = data.select(pl.col("user_id").unique()).height
unique_items_count = data.select(pl.col("item_id").unique()).height
total_rows_count = data.height

print(f"Количество уникальных пользователей: {unique_users_count}")
print(f"Количество уникальных айтемов: {unique_items_count}")
print(f"Общее количество строк: {total_rows_count}")

Количество уникальных пользователей: 6040
Количество уникальных айтемов: 3043
Общее количество строк: 995492


In [35]:
user_item_counts = data.groupby("user_id").agg(pl.count("item_id").alias("item_count"))
average_item_count = user_item_counts.select(pl.mean("item_count")).to_numpy()[0][0]

density = total_rows_count / (unique_users_count * unique_items_count)

print(f"Среднее количество взаимодействий: {int(average_item_count)}")
print(f"Плотность данных: {round(density,3)}")

Среднее количество взаимодействий: 164
Плотность данных: 0.054


### Разбиение данных

In [36]:
train, test = TimeSplitter(time_threshold=0.2,
                           drop_cold_users=True,
                           drop_cold_items=True,
                           query_column='user_id').split(data)

оставим в test взаимодействия с оценкой не ниже 4

In [37]:
from replay.preprocessing.filters import LowRatingFilter

In [38]:
test = LowRatingFilter(value=4, rating_column="rating").transform(test)

In [39]:
unique_users_count = train.select(pl.col("user_id").unique()).height
unique_items_count = train.select(pl.col("item_id").unique()).height
total_rows_count = train.height

print(f"Количество уникальных пользователей в train: {unique_users_count}")
print(f"Количество уникальных айтемов в train: {unique_items_count}")
print(f"Общее количество строк в train: {total_rows_count}")

Количество уникальных пользователей в train: 5399
Количество уникальных айтемов в train: 3041
Общее количество строк в train: 796393


In [40]:
unique_users_count = test.select(pl.col("user_id").unique()).height
unique_items_count = test.select(pl.col("item_id").unique()).height
total_rows_count = test.height

print(f"Количество уникальных пользователей в test: {unique_users_count}")
print(f"Количество уникальных айтемов в test: {unique_items_count}")
print(f"Общее количество строк в test: {total_rows_count}")

Количество уникальных пользователей в test: 1120
Количество уникальных айтемов в test: 2816
Общее количество строк в test: 55987


### Подготовка данных

In [41]:
from replay.data.dataset import Dataset, FeatureSchema, FeatureInfo, FeatureHint, FeatureType

In [42]:
feature_schema = FeatureSchema(
     [
         FeatureInfo(
             column="user_id",
             feature_type=FeatureType.CATEGORICAL,
             feature_hint=FeatureHint.QUERY_ID,
         ),
         FeatureInfo(
             column="item_id",
             feature_type=FeatureType.CATEGORICAL,
             feature_hint=FeatureHint.ITEM_ID,
         ),
        FeatureInfo(
            column="rating",
            feature_type=FeatureType.NUMERICAL,
            feature_hint=FeatureHint.RATING,
         ),
     ]
)
train_dataset = Dataset(feature_schema, train)
test_dataset = Dataset(feature_schema, test)

Применяем LabelEncoder

In [43]:
encoder = DatasetLabelEncoder()
encoder.fit(train_dataset)

<replay.data.dataset_utils.dataset_label_encoder.DatasetLabelEncoder at 0x7c9f79638220>

In [44]:
train_dataset = encoder.transform(train_dataset)
test_dataset = encoder.transform(test_dataset)

Далее train_dataset можно будет использовать в различных моделях из RePlay на основе pytorch.

В качестве построенной модели будет рекомендация случайных айтемов.

In [48]:
np.random.seed = 17

all_items = data["item_id"].unique().to_list()
random_items = np.random.choice(all_items, 10)

unique_users = data["user_id"].unique().to_list()
predict = [{"user_id": user, "item_id": item} for user in unique_users for item in random_items]
predict = pl.DataFrame(predict)
rating = np.tile(range(10), len(unique_users))
predict = predict.with_columns(pl.Series("rating", rating),
                               pl.col("user_id").cast(pl.Int64),
                               pl.col("item_id").cast(pl.Int64))

predict.head()

user_id,item_id,rating
i64,i64,i64
1,2611,0
1,2045,1
1,478,2
1,3397,3
1,27,4


In [49]:
exp = Experiment(
    metrics,
    test_dataset.interactions,
    train_dataset.interactions,
    query_column=train_dataset.feature_schema.query_id_column,
    item_column=train_dataset.feature_schema.item_id_column,
    rating_column=train_dataset.feature_schema.interactions_rating_column,
)

In [50]:
exp.add_result(name='Random', recommendations=predict)
exp.results

Unnamed: 0,HitRate@10,NDCG@10,MAP@10,Coverage@10
Random,0.124107,0.014696,0.004454,0.00296
