# E2E solution of recommendation system

## Introduction
In this notebook, we'll demonstrate a data engineering and data science workflow with an end-to-end (E2E) sample. The scenario is to build a recommender for online book recommendations.


There are different types of recommendation algorithms. In this notebook, we'll use a model-based collaborative filtering algorithm named Alternating Least Squares (ALS) matrix factorization.
<img src="https://negustpublicblob.blob.core.windows.net/public/recommenders.png" style="width:600px;"/>

ALS attempts to estimate the ratings matrix R as the product of two lower-rank matrices, X and Y, i.e. R ≈ X * Yt. These lower-rank matrices are typically called ‘factor’ matrices. 
The general approach is iterative. During each iteration, one of the factor matrices is held constant while the other is solved for using least squares. The newly solved factor matrix is 
then held constant while solving for the other factor matrix.

<img src="https://negustpublicblob.blob.core.windows.net/public/Matrixfactor.svg" style="width:600px;"/>
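
The alternating updates described above can be sketched in a few lines of NumPy. This is a simplified illustration, not Spark's implementation: it assumes every entry of R is observed (Spark's ALS fits only the observed ratings), and the helper name `als_factorize`, the toy matrix, and the `rank`, `reg` and `n_iters` values are assumptions chosen for the example.

```python
import numpy as np


def als_factorize(R, rank=2, reg=0.1, n_iters=20, seed=0):
    """Toy ALS on a fully observed matrix R (hypothetical helper)."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    X = rng.normal(size=(n_users, rank))  # user factors
    Y = rng.normal(size=(n_items, rank))  # item factors
    ridge = reg * np.eye(rank)            # L2 regularization term
    for _ in range(n_iters):
        # Hold Y constant and solve the regularized least squares problem for X
        X = np.linalg.solve(Y.T @ Y + ridge, Y.T @ R.T).T
        # Hold the newly solved X constant and solve for Y
        Y = np.linalg.solve(X.T @ X + ridge, X.T @ R).T
    return X, Y


# Build a small rank-2 "ratings" matrix so that a rank-2 factorization can fit it
U = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [2.0, 1.0]])
V = np.array([[5.0, 1.0], [4.0, 1.0], [1.0, 5.0], [1.0, 4.0]])
R = U @ V.T

X, Y = als_factorize(R)
rmse = float(np.sqrt(np.mean((R - X @ Y.T) ** 2)))
print(f"reconstruction RMSE: {rmse:.4f}")
```

Each half-step is an ordinary ridge regression with a closed-form solution, which is why the method alternates instead of optimizing X and Y jointly.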


## Step 1. Load the Data

```
+--- Book-Recommendation-Dataset
|   +--- Books.csv
|   +--- Ratings.csv
|   +--- Users.csv

==> Books.csv <==
ISBN,Book-Title,Book-Author,Year-Of-Publication,Publisher,Image-URL-S,Image-URL-M,Image-URL-L
0195153448,Classical Mythology,Mark P. O. Morford,2002,Oxford University Press,http://images.amazon.com/images/P/0195153448.01.THUMBZZZ.jpg,http://images.amazon.com/images/P/0195153448.01.MZZZZZZZ.jpg,http://images.amazon.com/images/P/0195153448.01.LZZZZZZZ.jpg
0002005018,Clara Callan,Richard Bruce Wright,2001,HarperFlamingo Canada,http://images.amazon.com/images/P/0002005018.01.THUMBZZZ.jpg,http://images.amazon.com/images/P/0002005018.01.MZZZZZZZ.jpg,http://images.amazon.com/images/P/0002005018.01.LZZZZZZZ.jpg

==> Ratings.csv <==
User-ID,ISBN,Book-Rating
276725,034545104X,0
276726,0155061224,5

==> Users.csv <==
User-ID,Location,Age
1,"nyc, new york, usa",
2,"stockton, california, usa",18.0
```

**By defining the parameters below, we can easily apply this notebook to different datasets.**

In [None]:
IS_CUSTOM_DATA = False  # if True, dataset has to be uploaded manually

USER_ID_COL = "User-ID"  # must not be '_user_id' for this notebook to run successfully
ITEM_ID_COL = "ISBN"  # must not be '_item_id' for this notebook to run successfully
ITEM_INFO_COL = (
    "Book-Title"  # must not be '_item_info' for this notebook to run successfully
)
RATING_COL = (
    "Book-Rating"  # must not be '_rating' for this notebook to run successfully
)
IS_SAMPLE = True  # if True, use only <SAMPLE_ROWS> rows of data for training, otherwise use all data
SAMPLE_ROWS = 5000  # if IS_SAMPLE is True, use only this number of rows for training

DATA_FOLDER = "Files/book-recommendation/"  # folder containing the dataset
ITEMS_FILE = "Books.csv"  # file containing the items information
USERS_FILE = "Users.csv"  # file containing the users information
RATINGS_FILE = "Ratings.csv"  # file containing the ratings information

### Download dataset and Upload to lakehouse

In [None]:
if not IS_CUSTOM_DATA:
    # Download demo data files into lakehouse if not exist
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Book-Recommendation-Dataset"
    file_list = ["Books.csv", "Ratings.csv", "Users.csv"]

    # For this demo, we first check if the dataset files are already prepared in the default lakehouse. If not, we'll download the dataset.
    import os
    import requests

    if not os.path.exists("/lakehouse/default"):
        # ask user to add a lakehouse if no default lakehouse added to the notebook.
        # a new notebook will not link to any lakehouse by default.
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse for the notebook."
        )
    else:
        # check if the needed files are already in the lakehouse, try to download if not.
        # print an error message if downloading failed.
        os.makedirs(f"/lakehouse/default/{DATA_FOLDER}/raw/", exist_ok=True)
        for fname in file_list:
            if not os.path.exists(f"/lakehouse/default/{DATA_FOLDER}/raw/{fname}"):
                try:
                    r = requests.get(f"{remote_url}/{fname}", timeout=30)
                    # treat HTTP errors (e.g. 404) as failures instead of saving the error page
                    r.raise_for_status()
                    with open(
                        f"/lakehouse/default/{DATA_FOLDER}/raw/{fname}", "wb"
                    ) as f:
                        f.write(r.content)
                    print(f"Downloaded {fname} into {DATA_FOLDER}/raw/.")
                except Exception as e:
                    print(f"Failed on downloading {fname}, error message: {e}")
            else:
                print(f"{fname} already exists in {DATA_FOLDER}/raw/.")

In [None]:
# to record the notebook running time
import time

ts = time.time()

### Read data from lakehouse

In [None]:
df_items = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"{DATA_FOLDER}/raw/{ITEMS_FILE}")
)

df_ratings = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"{DATA_FOLDER}/raw/{RATINGS_FILE}")
)

df_users = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"{DATA_FOLDER}/raw/{USERS_FILE}")
)

## Step 2. Exploratory Data Analysis

### Display Raw Data

We can explore the raw data with `display`, compute some basic statistics, or even show chart views.

In [None]:
import pyspark.sql.functions as F

In [None]:
display(df_items, summary=True)

Add an `_item_id` column for later use. `_item_id` must be an integer.

Calling `.coalesce(1)` puts the DataFrame in a single partition, so `monotonically_increasing_id()` produces a consecutive, monotonically increasing index column.

In [None]:
df_items = df_items.coalesce(1).withColumn("_item_id", F.monotonically_increasing_id())
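
To see why the single partition matters, here is a pure-Python sketch that mimics the ID layout described in Spark's documentation for `monotonically_increasing_id` (partition index in the upper bits, row position within the partition in the lower 33 bits). The helper `simulated_monotonic_ids` is hypothetical and only imitates that layout; it does not call Spark.

```python
def simulated_monotonic_ids(rows_per_partition):
    """Imitate the documented ID layout for a list of partition sizes."""
    ids = []
    for partition_index, n_rows in enumerate(rows_per_partition):
        for row_index in range(n_rows):
            # partition index is shifted into the upper bits of the 64-bit ID
            ids.append((partition_index << 33) + row_index)
    return ids


single = simulated_monotonic_ids([4])    # all rows in one partition
split = simulated_monotonic_ids([2, 2])  # same rows spread over two partitions

print(single)
print(split)
# IDs from the second partition no longer fit in a 32-bit integer
print(max(split) > 2**31 - 1)
```

With one partition the IDs are consecutive starting at 0. With two partitions the second one starts at 2^33, which is outside the integer range that ALS accepts for user and item IDs.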

In [None]:
display(df_items.sort(F.col("_item_id").desc()))

In [None]:
display(df_users, summary=True)

There is a missing value in `User-ID`. We'll drop the row that contains it.

In [None]:
df_users = df_users.dropna(subset=[USER_ID_COL])

In [None]:
display(df_users, summary=True)

Add a `_user_id` column for later use. `_user_id` must be an integer. In this book dataset, 
we already have a `User-ID` column, which is an integer. We still add a `_user_id` column 
for compatibility with different datasets, which makes this notebook more robust.

In [None]:
df_users = df_users.coalesce(1).withColumn("_user_id", F.monotonically_increasing_id())

In [None]:
display(df_users.sort(F.col("_user_id").desc()))

In [None]:
display(df_ratings, summary=True)

In [None]:
ratings = df_ratings.select(RATING_COL).distinct().rdd.flatMap(lambda x: x).collect()
print(ratings)

### Merge data
Merge raw dataframes into one dataframe for more comprehensive analysis.

In [None]:
from pyspark.sql import functions as F

In [None]:
df_tmp = df_ratings.join(df_users, USER_ID_COL, "inner")
df_all = df_tmp.join(df_items, ITEM_ID_COL, "inner")
df_all_columns = df_all.columns
df_all_columns.remove("_user_id")
df_all_columns.remove("_item_id")
df_all_columns.remove(RATING_COL)
df_all = df_all.select(["_user_id", "_item_id", RATING_COL] + df_all_columns)
df_all = df_all.withColumn("id", F.monotonically_increasing_id())

display(df_all)

In [None]:
print(f"Total Users: {df_users.select('_user_id').distinct().count()}")
print(f"Total Items: {df_items.select('_item_id').distinct().count()}")
print(f"Total User-Item Interactions: {df_all.count()}")

### Compute and Plot most popular items

In [None]:
# import libs

import pandas as pd  # dataframes
import matplotlib.pyplot as plt  # plotting
import seaborn as sns  # plotting

color = sns.color_palette()  # adjusting plotting style
import warnings

warnings.filterwarnings("ignore")  # silence annoying warnings

In [None]:
# compute top popular products
df_top_items = (
    df_all.groupby(["_item_id"])
    .count()
    .join(df_items, "_item_id", "inner")
    .sort(["count"], ascending=[0])
)

In [None]:
# find top <topn> popular items
topn = 10
pd_top_items = df_top_items.limit(topn).toPandas()
pd_top_items.head()

Top `<topn>` popular items, which can be used for **recommendation section "Popular"** or **"Top purchased"**.

In [None]:
# Plot top <topn> items
f, ax = plt.subplots(figsize=(12, 10))
plt.xticks(rotation="vertical")
sns.barplot(x=ITEM_INFO_COL, y="count", data=pd_top_items)
plt.ylabel("Number of Ratings for the Item")
plt.xlabel("Item Name")
plt.show()

## Step 3. Model development and deployment
We have explored the dataset, gained some insights from the exploratory data analysis, and even produced a most-popular recommendation.

Next, we'll develop the ALS model mentioned in the Introduction section for more personalized recommendations.

### Prepare training and testing data

In [None]:
if IS_SAMPLE:
    # sort by '_user_id' before limit so that ALS works normally:
    # if the train and test datasets have no common _user_id, ALS will fail
    _df_all = df_all.sort("_user_id").limit(SAMPLE_ROWS)
else:
    # use the full dataset when sampling is disabled
    _df_all = df_all

fractions_train = {0: 0}
fractions_test = {0: 0}
for i in ratings:
    if i == 0:
        continue
    fractions_train[i] = 0.8
    fractions_test[i] = 1
train = _df_all.sampleBy(RATING_COL, fractions=fractions_train)
test = _df_all.join(train, on="id", how="leftanti").sampleBy(  # not in
    RATING_COL, fractions=fractions_test
)
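
The split above samples each rating value as its own stratum: rows with rating 0 get fraction 0 and are dropped, each remaining row goes to `train` with probability 0.8, and `test` takes the rows that were not sampled into `train`. A pure-Python sketch of the same idea, assuming rows are `(id, rating)` tuples; the helper `stratified_split`, the seed, and the toy data are assumptions for illustration only.

```python
import random


def stratified_split(rows, train_fraction=0.8, seed=42):
    """Split (id, rating) rows, skipping rating 0 (hypothetical helper)."""
    rng = random.Random(seed)
    train, test = [], []
    for row_id, rating in rows:
        if rating == 0:
            continue  # fraction 0: never sampled into train or test
        if rng.random() < train_fraction:
            train.append((row_id, rating))
        else:
            test.append((row_id, rating))  # the "not in train" remainder
    return train, test


# 600 toy rows with ratings cycling through 0..5
rows = [(i, i % 6) for i in range(600)]
train_rows, test_rows = stratified_split(rows)

print(len(train_rows), len(test_rows))
```

Because every row is drawn independently, the train share is only approximately 80%, which is also how per-stratum fraction sampling behaves.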

In [None]:
# cast column into the correct types
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

train = (
    train.withColumn("_item_id", col("_item_id").cast(IntegerType()))
    .withColumn("_user_id", col("_user_id").cast(IntegerType()))
    .withColumn(RATING_COL, col(RATING_COL).cast(IntegerType()))
)

test = (
    test.withColumn("_item_id", col("_item_id").cast(IntegerType()))
    .withColumn("_user_id", col("_user_id").cast(IntegerType()))
    .withColumn(RATING_COL, col(RATING_COL).cast(IntegerType()))
)

In [None]:
# compute the sparsity of the dataset
def get_mat_sparsity(ratings):
    # Count the total number of ratings in the dataset
    count_nonzero = ratings.select(RATING_COL).count()
    print(f"Number of rows: {count_nonzero}")

    # Count the number of distinct users and distinct items (size of the full user-item matrix)
    total_elements = (
        ratings.select("_user_id").distinct().count()
        * ratings.select("_item_id").distinct().count()
    )

    # Divide the numerator by the denominator
    sparsity = (1.0 - (count_nonzero * 1.0) / total_elements) * 100
    print("The ratings dataframe is ", "%.4f" % sparsity + "% sparse.")


get_mat_sparsity(_df_all)
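
As a worked example of the same formula, sparsity = (1 - rows / (distinct users × distinct items)) × 100, the following sketch computes it for a tiny hand-made interaction list. The helper `matrix_sparsity` and the sample data are hypothetical and assume each `(user, item)` pair appears at most once.

```python
def matrix_sparsity(interactions):
    """Percentage of empty cells in the user-item matrix."""
    n_rows = len(interactions)
    n_users = len({user for user, _ in interactions})
    n_items = len({item for _, item in interactions})
    return (1.0 - n_rows / (n_users * n_items)) * 100


# 4 interactions among 3 users and 3 items: 4 of 9 cells are filled
interactions = [(0, 0), (0, 1), (1, 1), (2, 2)]
sparsity = matrix_sparsity(interactions)
print(f"The ratings matrix is {sparsity:.4f}% sparse.")
```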

In [None]:
# check the id range
# ALS only supports values in Integer range
print(f'max user_id: {_df_all.select("_user_id").rdd.max()[0]}')
print(f'max item_id: {_df_all.select("_item_id").rdd.max()[0]}')

### Define the Model

With our data in place, we can now define the recommendation model. We'll apply the Alternating Least Squares (ALS) 
model in this notebook. 

Spark ML provides a convenient API for building the model. However, the model does not handle problems like 
data sparsity and cold start well on its own. We'll combine cross validation and automatic hyperparameter tuning 
to improve the performance of the model.

In [None]:
# Specify training parameters
num_epochs = 1
rank_size_list = [64, 128]
reg_param_list = [0.01, 0.1]
model_tuning_method = "TrainValidationSplit"  # TrainValidationSplit or CrossValidator

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(
    maxIter=num_epochs,
    userCol="_user_id",
    itemCol="_item_id",
    ratingCol=RATING_COL,
    coldStartStrategy="drop",
    implicitPrefs=False,
    nonnegative=True,
)

### Model training and hyperparameter tuning

In [None]:
# Define tuning parameters
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, rank_size_list)
    .addGrid(als.regParam, reg_param_list)
    .build()
)

print("Number of models to be tested: ", len(param_grid))
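
The grid is the Cartesian product of the candidate values, so two ranks and two regularization values give four parameter combinations. A plain-Python sketch of that expansion (the dictionary keys are illustrative labels, not Spark `Param` objects):

```python
from itertools import product

rank_size_list = [64, 128]
reg_param_list = [0.01, 0.1]

# every (rank, regParam) pair becomes one candidate model configuration
grid = [
    {"rank": rank, "regParam": reg}
    for rank, reg in product(rank_size_list, reg_param_list)
]

for params in grid:
    print(params)
print("Number of models to be tested:", len(grid))
```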

In [None]:
# Define evaluator, set rmse as loss
evaluator = RegressionEvaluator(
    metricName="rmse", labelCol=RATING_COL, predictionCol="prediction"
)

In [None]:
# Build cross validation using CrossValidator and TrainValidationSplit
cv = CrossValidator(
    estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5
)

tvs = TrainValidationSplit(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    # 80% of the data will be used for training, 20% for validation.
    trainRatio=0.8,
)
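
The two tuning strategies differ mainly in how many models they fit while searching. As a rough sketch, counting only the fits made during the search and using a hypothetical helper `tuning_fits`: `CrossValidator` fits every parameter combination once per fold, while `TrainValidationSplit` fits each combination once on a single train/validation split.

```python
def tuning_fits(n_param_maps, method, num_folds=5):
    """Number of model fits made during the parameter search."""
    if method == "CrossValidator":
        return n_param_maps * num_folds  # one fit per combination per fold
    if method == "TrainValidationSplit":
        return n_param_maps              # one fit per combination
    raise ValueError(f"Unsupported tuning method: {method}")


n_param_maps = 4  # 2 ranks x 2 regularization values
cv_fits = tuning_fits(n_param_maps, "CrossValidator", num_folds=5)
tvs_fits = tuning_fits(n_param_maps, "TrainValidationSplit")
print(f"CrossValidator: {cv_fits} fits, TrainValidationSplit: {tvs_fits} fits")
```

Cross validation gives a less noisy estimate of each configuration at roughly `numFolds` times the cost, which is the trade-off behind the `model_tuning_method` switch.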

In [None]:
if model_tuning_method == "CrossValidator":
    # Fit cross validator to the 'train' dataset
    # With numFolds set to 5, training takes about 5 times as long as with TrainValidationSplit
    models = cv.fit(train)
elif model_tuning_method == "TrainValidationSplit":
    # Fit train validator
    models = tvs.fit(train)
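else:
    # fail early with a clear message instead of hitting an AttributeError on None later
    raise ValueError(f"Unsupported model_tuning_method: {model_tuning_method}")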

In [None]:
import numpy as np
import pandas as pd

# Extract best model
model = models.bestModel

if model_tuning_method == "CrossValidator":
    best_params = models.getEstimatorParamMaps()[np.argmin(models.avgMetrics)]
    pd_metrics = pd.DataFrame(data={"Metric": models.avgMetrics})
elif model_tuning_method == "TrainValidationSplit":
    best_params = models.getEstimatorParamMaps()[np.argmin(models.validationMetrics)]
    pd_metrics = pd.DataFrame(data={"Metric": models.validationMetrics})
else:
    best_params = None
    pd_metrics = None

if best_params:
    print("** Best Model **")
    for k in best_params:
        print(f"{k.name}: {best_params[k]}")

    # collect metrics
    tmp_list = []
    for params in models.getEstimatorParamMaps():
        tmp = ""
        for k in params:
            tmp += k.name + "=" + str(params[k]) + " "
        tmp_list.append(tmp)

    pd_metrics["Params"] = tmp_list

In [None]:
# Plot metrics of different submodels
f, ax = plt.subplots(figsize=(12, 5))
sns.lineplot(x=pd_metrics["Params"], y=pd_metrics["Metric"])
plt.ylabel("Loss: RMSE")
plt.xlabel("Params")
plt.title("Loss of SubModels")
plt.show()

### Model Evaluation

Now that we have the best model, we can evaluate it further on the test data. 
If the model is trained well, its metrics should be good (low RMSE and MAE, high R2) on both the train and test datasets.
If the metrics are good only on the training data, the model is overfitted and we may need to increase the training data size.
If the metrics are bad on both datasets, the model is not well defined, 
and we may need to change the model architecture or at least fine-tune the hyperparameters.

Evaluation on training data

In [None]:
# get the prediction
predictions = model.transform(train)
predictions = predictions.withColumn(
    "prediction", predictions.prediction.cast("double")
)
predictions.select("_user_id", "_item_id", RATING_COL, "prediction").limit(10).show()

# initialize the regression evaluator
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=RATING_COL)
print(f'RMSE score = {evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})}')
print(f'MAE score = {evaluator.evaluate(predictions, {evaluator.metricName: "mae"})}')
print(f'R2 score = {evaluator.evaluate(predictions, {evaluator.metricName: "r2"})}')
print(
    f'Explained variance = {evaluator.evaluate(predictions, {evaluator.metricName: "var"})}'
)

Evaluation on test data.

If R2 is negative, the trained model performs worse than a baseline that always predicts the mean rating (a horizontal straight line).

In [None]:
# get the prediction
predictions = model.transform(test)
predictions = predictions.withColumn(
    "prediction", predictions.prediction.cast("double")
)
predictions.select("_user_id", "_item_id", RATING_COL, "prediction").limit(10).show()

# initialize the regression evaluator
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=RATING_COL)
print(f'RMSE score = {evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})}')
print(f'MAE score = {evaluator.evaluate(predictions, {evaluator.metricName: "mae"})}')
print(f'R2 score = {evaluator.evaluate(predictions, {evaluator.metricName: "r2"})}')
print(
    f'Explained variance = {evaluator.evaluate(predictions, {evaluator.metricName: "var"})}'
)
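
To make the metrics concrete, here is a small pure-Python sketch of RMSE, MAE and R2 using their standard definitions. The helper `regression_metrics` and the toy labels and predictions are made up for illustration. The second set of predictions is deliberately reversed to show how R2 turns negative when a model does worse than always predicting the mean label.

```python
import math


def regression_metrics(labels, preds):
    """RMSE, MAE and R2 for paired labels and predictions."""
    n = len(labels)
    errors = [p - y for y, p in zip(labels, preds)]
    mean_label = sum(labels) / n
    ss_res = sum(e * e for e in errors)                   # residual sum of squares
    ss_tot = sum((y - mean_label) ** 2 for y in labels)   # variance around the mean
    return {
        "rmse": math.sqrt(ss_res / n),
        "mae": sum(abs(e) for e in errors) / n,
        "r2": 1.0 - ss_res / ss_tot,
    }


labels = [1, 2, 3, 4, 5]
good = regression_metrics(labels, [1.1, 1.9, 3.2, 3.8, 5.0])
bad = regression_metrics(labels, [5, 4, 3, 2, 1])

print("good:", good)
print("bad: ", bad)
```

For the reversed predictions the residual sum of squares (40) is four times the total sum of squares (10), so R2 = 1 - 4 = -3.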

### Save Model
Now that we have a trained model, we can save it for later use.

In [None]:
import time

saved_model_path = f"{DATA_FOLDER}/als_model/{int(time.time())}"
model.save(saved_model_path)
print(f"model is saved in: {saved_model_path}")

## Step 4. Save Prediction Results

### Model Deploy and Prediction

Load Model back

In [None]:
# from pyspark.ml.recommendation import ALSModel
# `saved_model_path` is the path printed by the "Save Model" cell above
# model1 = ALSModel.load(saved_model_path)

#### Offline Recommendation
Recommend 10 items for each user

##### Save offline recommendation results

In [None]:
# Generate top 10 product recommendations for each user
userRecs = model.recommendForAllUsers(10)

In [None]:
# convert recommendations into interpretable format
userRecs = (
    userRecs.withColumn("rec_exp", F.explode("recommendations"))
    .select("_user_id", F.col("rec_exp._item_id"), F.col("rec_exp.rating"))
    .join(df_items.select(["_item_id", ITEM_INFO_COL]), on="_item_id")
)
userRecs.limit(10).show()
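
Conceptually, a top-N recommendation from a factorization model scores every user-item pair as the dot product of the user's and the item's factor vectors and keeps the highest-scoring items per user. The NumPy sketch below illustrates that idea with made-up factor matrices. The helper `top_k_items` is hypothetical and is not how Spark computes the result internally.

```python
import numpy as np


def top_k_items(user_factors, item_factors, k):
    """Return the indices and scores of the k best items for each user."""
    scores = user_factors @ item_factors.T           # predicted rating for every pair
    top = np.argsort(-scores, axis=1)[:, :k]         # item indices, best first
    top_scores = np.take_along_axis(scores, top, axis=1)
    return top, top_scores


# 2 users and 3 items, each described by 2 latent factors (toy values)
user_factors = np.array([[1.0, 0.0], [0.0, 1.0]])
item_factors = np.array([[0.9, 0.1], [0.2, 0.8], [0.5, 0.5]])

top, top_scores = top_k_items(user_factors, item_factors, k=2)
print(top)
print(top_scores)
```

User 0 weights only the first factor, so items 0 and 2 rank highest. User 1 weights only the second factor, so items 1 and 2 rank highest.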

In [None]:
# code for saving userRecs into lakehouse
userRecs.write.mode("overwrite").parquet(f"{DATA_FOLDER}/predictions/userRecs")

In [None]:
print(f"Full run cost {int(time.time() - ts)} seconds.")