# RePlay Tutorial
This notebook introduces the RePlay library and covers:
- creating a SparkSession or passing your own session to RePlay
- data preprocessing
- re-indexing of dataset users and items
- data splitting
- model training and inference
- model optimization
- model saving and loading
- model comparison

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
%config Completer.use_jedi = False

In [None]:
import warnings
from optuna.exceptions import ExperimentalWarning
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=ExperimentalWarning)

In [None]:
import pandas as pd

from pyspark.sql import SparkSession

from replay.preprocessing.data_preparator import DataPreparator, Indexer
from replay.metrics import Coverage, HitRate, NDCG, MAP, Experiment
from replay.utils.model_handler import save, load, save_indexer, load_indexer
from replay.utils.session_handler import get_spark_session, State 
from replay.splitters import UserSplitter
from replay.utils import convert2spark, get_log_info

from replay.data import Dataset, FeatureHint, FeatureInfo, FeatureSchema, FeatureSource, FeatureType
from replay.data.dataset_utils import DatasetLabelEncoder

from replay.models import (
    ALSWrap, 
    ADMMSLIM, 
    ItemKNN,
    LightFMWrap, 
    MultVAE, 
    NeuroMF, 
    SLIM, 
    PopRec, 
    RandomRec,
    UCB,
    UserPopRec,
    Wilson, 
    Word2VecRec,
)

from replay.models.base_rec import HybridRecommender

In [None]:
K = 5
SEED=42

## Managing SparkSession

RePlay uses Spark as a backend, so a `SparkSession` must exist before RePlay runs. Depending on your needs, you can handle the `SparkSession` in one of the following ways.

- Option 1: use default RePlay `SparkSession`
- You can pass your own session to RePlay. The `State` class stores the current session. Here you also have two options: 
    - Option 2: call `get_spark_session` to use default RePlay `SparkSession` with the custom driver memory and number of partitions 
    - Option 3: create `SparkSession` from scratch


### Option 1: use the default RePlay SparkSession
This is the simplest way and the recommended one for local execution. RePlay gets the existing SparkSession or creates a new one with the default configuration. The default session parameters are set in the `replay/utils/session_handler.py` file. The driver memory and the number of partitions depend on the available RAM and the number of cores, respectively.

You can create the default session explicitly, e.g. to preprocess Spark DataFrames, get a link to the Spark UI, or set the logging level. If you do not create it yourself, RePlay creates it anyway.

In [None]:
spark = State().session
spark.sparkContext.setLogLevel('ERROR')
spark

In [None]:
def print_config_param(session, conf_name):
    # collect the current Spark session configuration as (name, value) pairs
    conf = session.sparkContext.getConf().getAll()
    # print the value of the requested parameter
    print(f'{conf_name}: {dict(conf)[conf_name]}')

In [None]:
print_config_param(spark, 'spark.sql.shuffle.partitions')

### Option 2: call `get_spark_session` to customize driver memory and number of partitions
The `get_spark_session` function lets you set the driver memory (`spark.driver.memory`) and the number of partitions (`spark.sql.shuffle.partitions`) while keeping the default RePlay settings for the other configuration parameters. As an example, we specify 16 partitions and 4 GB of driver memory, then pass the created session to RePlay `State`.

In [None]:
spark.stop()
session = get_spark_session(spark_memory=4, shuffle_partitions=16)
spark = State(session).session

In [None]:
print_config_param(spark, 'spark.sql.shuffle.partitions')

### Option 3: Create your own session
Pass the created session to RePlay `State`.

In [None]:
spark.stop()
session = (
        SparkSession.builder.config("spark.driver.memory", "8g")
        .config("spark.sql.shuffle.partitions", "50")
        .config("spark.driver.bindAddress", "127.0.0.1")
        .config("spark.driver.host", "localhost")
        .master("local[*]")
        .enableHiveSupport()
        .getOrCreate()
    )
spark = State(session).session
print_config_param(spark, 'spark.sql.shuffle.partitions')

#### Return to the default session config

In [None]:
spark.stop()
spark = State(get_spark_session()).session
spark.sparkContext.setLogLevel('ERROR')
spark

## 0. Data preprocessing <a name='data-preparator'></a>
We will use MovieLens 1m as an example.

In [None]:
df = pd.read_csv("data/ml1m_ratings.dat", sep="\t", names=["user_id", "item_id", "relevance", "timestamp"])
users = pd.read_csv("data/ml1m_users.dat", sep="\t", names=["user_id", "gender", "age", "occupation", "zip_code"])

In [None]:
df.head(2)

In [None]:
df.user_id.nunique(), df.item_id.nunique()

In [None]:
users.head(2)

In [None]:
users.user_id.nunique()

### 0.1. DataPreparator

RePlay uses Spark DataFrames as its internal data format.

The interaction log requires columns with user and item identifiers. The original user and item identifiers should be named `user_id` and `item_id`. In section [0.3. Indexing](#indexing) these identifiers are converted to integer identifiers named `user_idx` and `item_idx`. Optional columns of the interaction log are ``relevance`` and the interaction ``timestamp``.

DataFrames with user or item features should have a `user_id` or `item_id` column, respectively.

The DataPreparator class converts pandas DataFrames to Spark format and preprocesses the data: it renames or creates the required and optional interaction columns, checks for nulls, and parses dates. This step is optional. If your data is already in Spark DataFrame format, the columns are named as described above, and you are confident in the completeness and quality of the data, skip it.
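
To make the preprocessing steps above concrete, here is a minimal plain-Python sketch of renaming columns, checking for nulls, and parsing timestamps. It is not RePlay's implementation: `prepare_rows` and the raw column names (`userId`, `movieId`, `rating`, `ts`) are hypothetical, and the mapping is assumed to go from target column name to raw column name.

```python
from datetime import datetime, timezone


def prepare_rows(rows, columns_mapping):
    """Rename columns, reject nulls, and parse Unix timestamps.

    Hypothetical helper. `columns_mapping` is assumed to map the target
    column name (key) to the column name in the raw data (value).
    """
    prepared = []
    for row in rows:
        renamed = {target: row[source] for target, source in columns_mapping.items()}
        missing = [name for name, value in renamed.items() if value is None]
        if missing:
            raise ValueError(f"null values in columns {missing}: {row}")
        if "timestamp" in renamed:
            # Interpret the raw value as seconds since the Unix epoch (UTC).
            renamed["timestamp"] = datetime.fromtimestamp(renamed["timestamp"], tz=timezone.utc)
        prepared.append(renamed)
    return prepared


raw_rows = [
    {"userId": 1, "movieId": 1193, "rating": 5, "ts": 978307200},
    {"userId": 1, "movieId": 661, "rating": 3, "ts": 978307260},
]
mapping = {"user_id": "userId", "item_id": "movieId", "relevance": "rating", "timestamp": "ts"}
prepared = prepare_rows(raw_rows, mapping)
print(prepared[0])
```

A row with a `None` value raises `ValueError` in this sketch; a real pipeline might prefer to drop or report such rows instead.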

In [None]:
preparator = DataPreparator()

#### Interactions log preprocessing

In [None]:
%%time
log = preparator.transform(columns_mapping={'user_id': 'user_id',
                                      'item_id': 'item_id',
                                      'relevance': 'relevance',
                                      'timestamp': 'timestamp'
                                     }, 
                           data=df)

In [None]:
log.show(2)

In [None]:
log.printSchema()

In [None]:
get_log_info(log, user_col='user_id', item_col='item_id')

As you see, the columns have the required names and `timestamp` was converted to `TimestampType`.

#### Feature dataframe preprocessing
To transform feature dataframes you could also use DataPreparator:

In [None]:
user_features = preparator.transform(columns_mapping={'user_id': 'user_id'},
                                     data=users)
user_features.show(2)

Using the DataPreparator is optional: you can convert a DataFrame to Spark with ``convert2spark`` from ``replay.utils`` and rename the columns manually.

In [None]:
# the same result without DataPreparator
convert2spark(users).show(2)

### 0.2. Filtering
It is common to filter the interaction log by interaction date or rating value, or to remove items or users with a small number of interactions. RePlay offers several filters in the `replay.preprocessing.filters` module.
We will keep ratings greater than or equal to 3 and remove users with 4 or fewer interactions.
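
The two filtering rules can be sketched in plain Python on a toy log. This only illustrates the logic described above, not how RePlay's filters are implemented; the helper names are hypothetical.

```python
from collections import Counter

# Toy interaction log: (user_id, item_id, relevance)
toy_log = [
    ("u1", "i1", 5), ("u1", "i2", 4), ("u1", "i3", 3), ("u1", "i4", 2),
    ("u1", "i5", 5), ("u1", "i6", 4),
    ("u2", "i1", 1), ("u2", "i2", 5), ("u2", "i3", 4),
]


def keep_ratings_at_least(rows, value):
    """Keep interactions whose relevance is greater than or equal to `value`."""
    return [row for row in rows if row[2] >= value]


def keep_users_with_min_count(rows, num_entries):
    """Keep interactions of users that have at least `num_entries` rows."""
    counts = Counter(user for user, _, _ in rows)
    return [row for row in rows if counts[row[0]] >= num_entries]


rated = keep_ratings_at_least(toy_log, value=3)
filtered = keep_users_with_min_count(rated, num_entries=5)
print(len(toy_log), len(rated), len(filtered))
```

Note that the order of the filters matters: user counts are computed after low ratings are removed, so a user can drop below the threshold only because of the first filter.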

In [None]:
from replay.preprocessing.filters import filter_by_min_count, filter_out_low_ratings

In [None]:
log = filter_out_low_ratings(log, value=3)
get_log_info(log, user_col='user_id', item_col='item_id')

In [None]:
%%time
log = filter_by_min_count(log, num_entries=5, group_by='user_id')
get_log_info(log, user_col='user_id', item_col='item_id')

<a id='indexing'></a>
### 0.3. Indexing

RePlay models require the columns with user and item identifiers _(ids)_ to be named `user_idx` and `item_idx`. These _ids_ should be integers starting at zero without gaps. This is important for models that use sparse matrices and define the matrix size by the largest seen user and item index. Storing _ids_ as integers also helps to reduce memory usage compared to string _ids_.

You should convert user and item _ids_ in the interaction log and in the feature dataframes. RePlay offers the Indexer class to convert the _ids_ and to convert them back after recommendations are generated (predict). The Indexer stores label encoders for users and items and can also transform ids of users and items that appear after the Indexer was fitted.

In [None]:
indexer = Indexer(user_col='user_id', item_col='item_id')

Take all available user and item ids from the log and features and pass them to the Indexer. The _ids_ may repeat. Indexes are ordered by label frequency, so the most frequent label gets index 0.
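
As an illustration of frequency-ordered indexing, here is a small plain-Python label encoder. It is a sketch of the idea only: `FrequencyIndexer` is a hypothetical class, not RePlay's Indexer, and the alphabetical tie-breaking rule is an assumption made to keep the output deterministic.

```python
from collections import Counter


class FrequencyIndexer:
    """Toy label encoder: the most frequent label gets index 0."""

    def fit(self, labels):
        counts = Counter(labels)
        # Sort by descending frequency; break ties by label for determinism.
        ordered = sorted(counts, key=lambda label: (-counts[label], label))
        self.label_to_idx = {label: idx for idx, label in enumerate(ordered)}
        self.idx_to_label = ordered
        return self

    def transform(self, labels):
        """Map original labels to zero-based integer indexes without gaps."""
        return [self.label_to_idx[label] for label in labels]

    def inverse_transform(self, indices):
        """Map integer indexes back to the original labels."""
        return [self.idx_to_label[idx] for idx in indices]


user_ids = ["bob", "alice", "bob", "carol", "bob", "alice"]
toy_indexer = FrequencyIndexer().fit(user_ids)
encoded = toy_indexer.transform(user_ids)
print(encoded)
print(toy_indexer.inverse_transform(encoded) == user_ids)
```

Unlike the Indexer described above, this sketch raises `KeyError` for labels that were not seen during `fit`.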

In [None]:
%%time
indexer.fit(users=log.select('user_id').unionByName(user_features.select('user_id')),
            items=log.select('item_id'))

In [None]:
%%time
log_replay = indexer.transform(df=log)
log_replay.show(2)

In [None]:
%%time
user_features_replay = indexer.transform(df=user_features)
user_features_replay.show(2)

### 0.4. Split

RePlay provides data splitters that reproduce validation schemes widely used in recommender systems. Splitters return cached dataframes, so they are computed once and re-used for model training, inference, and metric calculation.

`UserSplitter` moves ``item_test_size`` items of each of ``user_test_size`` users to the test dataset.
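
The user-based split can be sketched in plain Python as follows. This is an assumption-laden illustration rather than RePlay's implementation: `user_split` is a hypothetical function, it always takes the most recent interactions (no shuffling), and it does not drop cold users or items.

```python
import random
from collections import defaultdict


def user_split(rows, item_test_size, user_test_size, seed):
    """Move the last `item_test_size` interactions of `user_test_size`
    randomly sampled users to the test set.

    `rows` are (user, item, timestamp) tuples; `item_test_size` must be > 0.
    """
    by_user = defaultdict(list)
    for row in rows:
        by_user[row[0]].append(row)

    rng = random.Random(seed)
    test_users = set(rng.sample(sorted(by_user), user_test_size))

    train, test = [], []
    for user, interactions in by_user.items():
        interactions.sort(key=lambda row: row[2])  # oldest first
        if user in test_users:
            train.extend(interactions[:-item_test_size])
            test.extend(interactions[-item_test_size:])
        else:
            train.extend(interactions)
    return train, test


# 4 users with 6 interactions each; the timestamp equals the item number.
rows = [(f"u{u}", f"i{i}", i) for u in range(4) for i in range(6)]
train_rows, test_rows = user_split(rows, item_test_size=2, user_test_size=3, seed=42)
print(len(train_rows), len(test_rows))
```

Every test user keeps their earlier interactions in train, so a model can still build a profile for the users it is evaluated on.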

In [None]:
%%time
splitter = UserSplitter(
    drop_cold_items=True,
    drop_cold_users=True,
    item_test_size=K,
    user_test_size=500,
    seed=SEED,
    shuffle=False
)
train, test = splitter.split(log_replay)
print(train.count(), test.count())

In [None]:
test.is_cached

### 0.5. Alternative preparations with Dataset and DatasetLabelEncoder

In [None]:
from replay.preprocessing.filters import filter_by_min_count, filter_out_low_ratings

In [None]:
df_spark = convert2spark(df)
df_spark.show(2)

In [None]:
users_spark = convert2spark(users)
users_spark.show(2)

In [None]:
filtered_df = filter_out_low_ratings(df_spark, value=3, rating_column="relevance")
get_log_info(filtered_df, user_col='user_id', item_col='item_id')

In [None]:
filtered_df = filter_by_min_count(filtered_df, num_entries=5, group_by='user_id')
get_log_info(filtered_df, user_col='user_id', item_col='item_id')

In [None]:
filtered_df.show(10)

In [None]:
splitter = UserSplitter(
    user_col="user_id",
    item_col="item_id",
    date_col="timestamp",
    drop_cold_items=True,
    drop_cold_users=True,
    item_test_size=K,
    user_test_size=500,
    seed=SEED,
    shuffle=True
)
train_df, test_df = splitter.split(filtered_df)
print(train_df.count(), test_df.count())

In [None]:
total_user_count = filtered_df.select("user_id").distinct().count()
total_item_count = filtered_df.select("item_id").distinct().count()
print(total_user_count, total_item_count)

In [None]:
feature_schema = FeatureSchema(
    [
        FeatureInfo(
            column="user_id",
            feature_type=FeatureType.CATEGORICAL,
            feature_hint=FeatureHint.QUERY_ID,
            cardinality=total_user_count,
        ),
        FeatureInfo(
            column="item_id",
            feature_type=FeatureType.CATEGORICAL,
            feature_hint=FeatureHint.ITEM_ID,
            cardinality=total_item_count,
        ),
        FeatureInfo(
            column="relevance",
            feature_type=FeatureType.NUMERICAL,
            feature_hint=FeatureHint.RATING,
        ),
        FeatureInfo(
            column="timestamp",
            feature_type=FeatureType.NUMERICAL,
            feature_hint=FeatureHint.TIMESTAMP,
        ),
        FeatureInfo(
            column="gender",
            feature_type=FeatureType.CATEGORICAL,
        ),
        FeatureInfo(
            column="zip_code",
            feature_type=FeatureType.CATEGORICAL,
        ),
    ]
)

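# This definition overrides the schema above: no user features are passed to
# Dataset below, so only the interaction columns are described.
feature_schema = FeatureSchema(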
    [
        FeatureInfo(
            column="user_id",
            feature_type=FeatureType.CATEGORICAL,
            feature_hint=FeatureHint.QUERY_ID,
            cardinality=total_user_count,
        ),
        FeatureInfo(
            column="item_id",
            feature_type=FeatureType.CATEGORICAL,
            feature_hint=FeatureHint.ITEM_ID,
            cardinality=total_item_count,
        ),
        FeatureInfo(
            column="relevance",
            feature_type=FeatureType.NUMERICAL,
            feature_hint=FeatureHint.RATING,
        ),
        FeatureInfo(
            column="timestamp",
            feature_type=FeatureType.NUMERICAL,
            feature_hint=FeatureHint.TIMESTAMP,
        ),
    ]
)

In [None]:
train_dataset = Dataset(
    feature_schema=feature_schema,
    interactions=train_df,
    query_features=None,
    item_features=None,
    check_consistency=True,
    categorical_encoded=False,
)

test_dataset = Dataset(
    feature_schema=feature_schema,
    interactions=test_df,
    query_features=None,
    item_features=None,
    check_consistency=True,
    categorical_encoded=False,
)

In [None]:
encoder = DatasetLabelEncoder()
train_dataset = encoder.fit_transform(train_dataset)
test_dataset = encoder.transform(test_dataset)

In [None]:
{
    train_dataset.feature_schema.item_id_column,
    train_dataset.feature_schema.query_id_column,
    train_dataset.feature_schema.interactions_rating_column,
    train_dataset.feature_schema.interactions_timestamp_column,
}

## 1. Models training

#### SLIM

In [None]:
slim = SLIM(seed=SEED)

In [None]:
%%time
slim.fit(log=train)

In [None]:
%%time

recs = slim.predict(
    k=K,
    users=test.select('user_idx').distinct(),
    log=train,
    filter_seen_items=False
)

In [None]:
recs.show(2)

## 2. Models evaluation

RePlay implements popular quality metrics for recommender systems. Use the metrics on their own, or calculate a set of chosen metrics and compare models with the ``Experiment`` class.
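
For intuition, two of the metrics used below can be computed by hand for a single user. The sketch assumes the common binary-relevance definitions of HitRate@K and NDCG@K; it is not RePlay's code, and a full evaluation would average these values over all test users.

```python
import math


def hit_rate_at_k(recommended, relevant, k):
    """1.0 if at least one of the top-k recommended items is relevant, else 0.0."""
    return float(any(item in relevant for item in recommended[:k]))


def ndcg_at_k(recommended, relevant, k):
    """NDCG@k with binary gains: DCG of the list divided by DCG of an ideal list."""
    dcg = sum(
        1.0 / math.log2(rank + 1)
        for rank, item in enumerate(recommended[:k], start=1)
        if item in relevant
    )
    ideal_hits = min(k, len(relevant))
    idcg = sum(1.0 / math.log2(rank + 1) for rank in range(1, ideal_hits + 1))
    return dcg / idcg if idcg > 0 else 0.0


recommended = ["i3", "i7", "i1", "i9", "i4"]  # ranked, best first
relevant = {"i1", "i4"}                       # ground truth from the test set

print(hit_rate_at_k(recommended, relevant, k=1))
print(hit_rate_at_k(recommended, relevant, k=5))
print(round(ndcg_at_k(recommended, relevant, k=5), 4))
```

HitRate only checks whether any relevant item appears in the top K, while NDCG also rewards placing relevant items closer to the top of the list.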

In [None]:
col_names = {
    "item_col": train_dataset.feature_schema.item_id_column,
    "query_col": train_dataset.feature_schema.query_id_column,
    "rating_col": train_dataset.feature_schema.interactions_rating_column,
}

In [None]:
metrics = Experiment(test_dataset, {NDCG(**col_names): K,
                            MAP(**col_names) : K,
                            HitRate(**col_names): [1, K],
                            Coverage(
                                log=train_dataset.interactions,
                                **col_names
                            ): K
                           })

In [None]:
%%time
metrics.add_result("SLIM", recs)
metrics.results

## 3. Hyperparameters optimization

#### 3.1 Search

In [None]:
# data split for hyperparameters optimization
# train_opt, val_opt = splitter.split(train)

In [None]:
# %%time
# best_params = slim.optimize(train_opt, val_opt, criterion=NDCG(), k=K, budget=15)

In [None]:
# best_params

#### 3.2 Compare with previous

In [None]:
def fit_predict_evaluate(model, experiment, name):
    model.fit(train_dataset)

    recs = model.predict(
        dataset=train_dataset,
        k=K,
        users=test_dataset.query_ids,
        filter_seen_items=False
    )

    experiment.add_result(name, recs)
    return recs

In [None]:
# %%time
# recs = fit_predict_evaluate(SLIM(**best_params, seed=SEED), metrics, 'SLIM_optimized')
# recs.cache() #caching for further processing
# metrics.results.sort_values('NDCG@5', ascending=False)

The optimized model was better on the validation dataset, but shows comparable quality on test (better by HitRate@5 and worse by the other quality metrics). 

## 4. Getting final recommendations 

### Return to original user and item identifiers

In [None]:
# %%time
# recs = indexer.inverse_transform(recs)
# recs.show(2)

### Convert to pandas or save

In [None]:
# recs_pd = recs.toPandas()
# recs_pd.head(2)

In [None]:
# %%time
# recs.write.parquet(path='./slim_recs.parquet', mode='overwrite')

## 4. Save and load

RePlay allows saving and loading fitted models with the `save` and `load` functions of the `model_handler` module. A model is saved as a folder with all necessary parameters and data.
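
The folder-based layout can be illustrated with a toy model in plain Python. Everything here is hypothetical (`ToyPopularityModel`, `save_model`, `load_model`, and the file names); the sketch only shows the general idea of storing constructor arguments and fitted state side by side, not RePlay's actual on-disk format.

```python
import json
import tempfile
from pathlib import Path


class ToyPopularityModel:
    """Hypothetical stand-in for a fitted recommender; not a RePlay class."""

    def __init__(self, top_n=2):
        self.top_n = top_n
        self.item_counts = {}

    def fit(self, items):
        for item in items:
            self.item_counts[item] = self.item_counts.get(item, 0) + 1
        return self

    def predict(self):
        ranked = sorted(self.item_counts, key=lambda i: (-self.item_counts[i], i))
        return ranked[: self.top_n]


def save_model(model, path):
    """Write constructor arguments and fitted state as separate files in one folder."""
    folder = Path(path)
    folder.mkdir(parents=True, exist_ok=True)
    (folder / "init_args.json").write_text(json.dumps({"top_n": model.top_n}))
    (folder / "state.json").write_text(json.dumps(model.item_counts))


def load_model(path):
    """Rebuild the model from its constructor arguments, then restore the fitted state."""
    folder = Path(path)
    model = ToyPopularityModel(**json.loads((folder / "init_args.json").read_text()))
    model.item_counts = json.loads((folder / "state.json").read_text())
    return model


with tempfile.TemporaryDirectory() as tmp:
    fitted = ToyPopularityModel(top_n=2).fit(["a", "b", "a", "c", "b", "a"])
    save_model(fitted, Path(tmp) / "toy_model")
    restored = load_model(Path(tmp) / "toy_model")

print(restored.predict() == fitted.predict())
```

Keeping the constructor arguments separate from the fitted state means the loader can recreate the object first and then attach the learned data, which is why a restored model can predict without refitting.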

In [None]:
# %%time
# save_indexer(indexer, './indexer_ml1')
# indexer = load_indexer('./indexer_ml1')

In [None]:
# %%time
# save(slim, path='./slim_best_params')
# slim_loaded = load('./slim_best_params')

In [None]:
# slim_loaded.beta, slim_loaded.lambda_

In [None]:
# %%time
# pred_from_loaded = slim_loaded.predict(k=K,
#     users=test.select('user_idx').distinct(),
#     log=train,
#     filter_seen_items=True)
# pred_from_loaded.show(2)

In [None]:
# %%time
# recs = indexer.inverse_transform(pred_from_loaded)
# recs.show(2)

## 5. Other RePlay models

#### ALS
Commonly-used matrix factorization algorithm.

In [None]:
recs.show(2)

#### ItemKNN
Commonly-used item-based recommender.

In [None]:
recs = fit_predict_evaluate(UserPopRec(), metrics, 'UserPopRec')
metrics.results.sort_values('NDCG@5', ascending=False)

In [None]:
# %%time
recs = fit_predict_evaluate(ItemKNN(num_neighbours=100), metrics, 'ItemKNN')
metrics.results.sort_values('NDCG@5', ascending=False)

In [None]:
recs = fit_predict_evaluate(SLIM(seed=SEED), metrics, 'SLIM')
metrics.results.sort_values('NDCG@5', ascending=False)

In [None]:
recs = fit_predict_evaluate(Word2VecRec(seed=SEED), metrics, 'Word2VecRec')
metrics.results.sort_values('NDCG@5', ascending=False)

In [None]:
recs = fit_predict_evaluate(ADMMSLIM(seed=SEED), metrics, 'ADMM SLIM')
metrics.results.sort_values('NDCG@5', ascending=False)

In [None]:
recs = fit_predict_evaluate(ALSWrap(seed=SEED), metrics, 'Implicit ALS')
metrics.results.sort_values('NDCG@5', ascending=False)

In [None]:
recs = fit_predict_evaluate(ALSWrap(seed=SEED, implicit_prefs=False), metrics, 'Explicit ALS')
metrics.results.sort_values('NDCG@5', ascending=False)

In [None]:
recs = fit_predict_evaluate(PopRec(), metrics, 'PopRec')
metrics.results.sort_values('NDCG@5', ascending=False)

In [None]:
recs = fit_predict_evaluate(Wilson(), metrics, 'Wilson')
metrics.results.sort_values('NDCG@5', ascending=False)

In [None]:
recs = fit_predict_evaluate(RandomRec(seed=SEED, distribution='uniform'), metrics, 'RandomRec')
metrics.results.sort_values('NDCG@5', ascending=False)

In [None]:
recs = fit_predict_evaluate(LightFMWrap(random_state=SEED), metrics, 'LightFM')
metrics.results.sort_values('NDCG@5', ascending=False)

In [None]:
recs.show(10)

## 6. Compare RePlay models with others
To evaluate recommendations obtained from other sources, read them and pass them to ``Experiment``.

In [None]:
import pyspark.sql.functions as sf

In [None]:
# metrics.add_result("my_model", recs.withColumn("relevance", sf.rand()))
# metrics.results.sort_values("NDCG@5", ascending=False)