By Mike Silva

## Introduction

The goal of this project is to practice working with a distributed recommender system. For this project I will be using data scraped from BoardGameGeek.com (BGG). This is part two of the project. Previously I built an ALS-based model on my laptop and measured how long it took to train and evaluate. This part repeats that process on a Spark platform.

### About the BGG Dataset
I collected the BoardGameGeek dataset by scraping the API that forms the backend of [BoardGameGeek's website](https://boardgamegeek.com/). Data scraping is ongoing, but this particular dataset has over 1.9 million ratings (implicit and explicit) for about 88,000 games by 219,000 users. I previously exported the ratings from the SQLite database and uploaded them to the [Databricks Community Platform](https://community.cloud.databricks.com/).

## Data Wrangling

This is the same process used in the first part of this project. It fills in the implicit ratings with values sampled from the distribution of the explicit ratings of 7 and above. For a more thorough explanation of the method, I refer the reader to [Part A's notebook on GitHub](https://github.com/mikeasilva/CUNY-SPS/blob/master/DATA612/Project5a.ipynb).
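The fill-in idea can be illustrated without pandas or Spark. This is a minimal sketch, not the notebook's code: the helper `fill_implicit` and the toy ratings are hypothetical, ratings are assumed to be already rounded to integers, and `min_rating=7` mirrors the `rounded_rating > 6` filter in the code cell that follows.

```python
import random
from collections import Counter


def fill_implicit(explicit, n_implicit, min_rating=7, seed=42):
    """Hypothetical helper: draw n_implicit ratings from the empirical
    distribution of explicit ratings >= min_rating."""
    counts = Counter(r for r in explicit if r >= min_rating)
    choices = sorted(counts)
    # random.choices accepts relative weights, so raw counts work directly
    weights = [counts[c] for c in choices]
    rng = random.Random(seed)
    return rng.choices(choices, weights=weights, k=n_implicit)


explicit = [8, 7, 9, 7, 3, 10, 8, 7]  # toy data; the 3 is ignored by the cut-off
filled = fill_implicit(explicit, 5)
print(filled)
```

Because 7 makes up three of the seven qualifying ratings here, roughly 43% of the filled values should be 7s over a large sample, which is the "same distribution" property the method relies on.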

```python
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import random
import seaborn as sns
import time

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

my_seed = 42
random.seed(my_seed)
np.random.seed(my_seed)

# Read in the raw data with Spark, then bring it into pandas for wrangling
df = (spark.read.format("csv")
      .option("inferSchema", True)
      .option("header", True)
      .option("sep", ",")
      .load("/FileStore/tables/bgg_ratings.csv"))
df = df.toPandas()

# Wrangle the ratings
df["rounded_rating"] = df["rating"].round().astype(int)

# Extract explicit ratings: any rating that rounds to a value above zero
is_an_explicit_rating = df["rounded_rating"] > 0
explicit_rating = df[is_an_explicit_rating]

# Count how often each explicit rating of 7 or higher occurs
explicit_rating = explicit_rating.assign(n=1)
fill_with = explicit_rating[explicit_rating.rounded_rating > 6].groupby("rounded_rating")[["n"]].agg("sum").reset_index()

# Turn those counts into sampling probabilities
temp = dict()
total = 0
for item in fill_with.to_dict("records"):
    temp[item["rounded_rating"]] = item["n"]
    total += item["n"]
choices = list(temp.keys())
probs = [val / total for val in temp.values()]

# Fill implicit ratings by sampling from that distribution
implicit_rating = df[~is_an_explicit_rating]
implicit_rating = implicit_rating.assign(rounded_rating=np.random.choice(choices, len(implicit_rating.index), p=probs))

# Combine the explicit and implicit data frames
cols = ["user_id", "item_id", "rounded_rating"]
bgg_ratings = pd.concat([explicit_rating[cols], implicit_rating[cols]])

# Remove sparse items (fewer than 25 ratings)
counts_by_item = bgg_ratings.assign(n=1).groupby("item_id")[["n"]].agg("sum").reset_index()
item_filter = counts_by_item[counts_by_item.n >= 25]["item_id"].tolist()
bgg_ratings = bgg_ratings[bgg_ratings.item_id.isin(item_filter)].reset_index(drop=True)

# Remove sparse users (fewer than 10 ratings)
counts_by_user = bgg_ratings.assign(n=1).groupby("user_id")[["n"]].agg("sum").reset_index()
user_filter = counts_by_user[counts_by_user.n >= 10]["user_id"].tolist()
bgg_ratings = bgg_ratings[bgg_ratings.user_id.isin(user_filter)].reset_index(drop=True)

# Convert back to a Spark DataFrame for ALS
bgg_ratings = spark.createDataFrame(bgg_ratings)
bgg_ratings.show()
```
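The sparse-item and sparse-user filters above run once each, in that order. A small pure-Python sketch (the helpers `filter_sparse` and `filter_until_stable` and the toy data are hypothetical, not part of the notebook) shows a side effect of a single pass: removing sparse users can push an item back below its threshold. Repeating the pass until nothing changes is one way to enforce both thresholds exactly, at the cost of possibly dropping more data.

```python
from collections import Counter


def filter_sparse(ratings, min_item=25, min_user=10):
    """One pass: drop items with < min_item ratings, then users with
    < min_user of the remaining ratings. ratings: (user, item, rating)."""
    item_counts = Counter(item for _, item, _ in ratings)
    kept = [r for r in ratings if item_counts[r[1]] >= min_item]
    user_counts = Counter(user for user, _, _ in kept)
    return [r for r in kept if user_counts[r[0]] >= min_user]


def filter_until_stable(ratings, min_item=25, min_user=10):
    """Hypothetical variant: repeat the pass until both thresholds hold."""
    while True:
        filtered = filter_sparse(ratings, min_item, min_user)
        if len(filtered) == len(ratings):  # nothing removed, so we are done
            return filtered
        ratings = filtered


ratings = [("u1", "a", 8), ("u1", "b", 7), ("u2", "a", 9), ("u2", "b", 6),
           ("u3", "a", 7), ("u3", "c", 8), ("u4", "c", 5)]
one_pass = filter_sparse(ratings, min_item=2, min_user=2)
stable = filter_until_stable(ratings, min_item=2, min_user=2)
print(len(one_pass), len(stable))  # prints: 6 4
```

Here item `c` passes the item filter with two ratings, but once `u4` is dropped it has only one, which the single pass keeps.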

Now that I have the data in Spark, I will replicate the process I used on my laptop. I will train the model using a vanilla ALS algorithm, repeat the process 5 times with a fresh random 80/20 train/test split each time, and average every evaluation metric across those five runs.
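ALS factorizes the user-item rating matrix by alternating two closed-form steps: hold the item factors fixed and solve a regularized least-squares problem for each user, then swap roles. The sketch below is a deliberately simplified, pure-Python illustration, not Spark's implementation: it assumes rank 1 and a plain L2 penalty, whereas Spark's `ALS` defaults to `rank=10` and scales `regParam` by each user's or item's number of ratings. The function name `als_rank1` and the toy ratings are hypothetical.

```python
import random


def als_rank1(ratings, n_iters=10, reg=0.1, seed=42):
    """Rank-1 ALS with a plain L2 penalty.

    ratings: list of (user, item, rating) triples.
    Returns user factors p, item factors q, and the objective after each sweep.
    """
    rng = random.Random(seed)
    users = sorted({u for u, _, _ in ratings})
    items = sorted({i for _, i, _ in ratings})
    p = {u: rng.uniform(0.5, 1.5) for u in users}
    q = {i: rng.uniform(0.5, 1.5) for i in items}

    def objective():
        # Squared error on observed ratings plus L2 penalty on all factors
        err = sum((r - p[u] * q[i]) ** 2 for u, i, r in ratings)
        return err + reg * (sum(v * v for v in p.values()) + sum(v * v for v in q.values()))

    losses = [objective()]
    for _ in range(n_iters):
        # Hold q fixed: each p[u] = sum(r * q[i]) / (reg + sum(q[i]^2))
        num = dict.fromkeys(users, 0.0)
        den = dict.fromkeys(users, reg)
        for u, i, r in ratings:
            num[u] += r * q[i]
            den[u] += q[i] * q[i]
        for u in users:
            p[u] = num[u] / den[u]
        # Hold p fixed: same closed form with users and items swapped
        num = dict.fromkeys(items, 0.0)
        den = dict.fromkeys(items, reg)
        for u, i, r in ratings:
            num[i] += r * p[u]
            den[i] += p[u] * p[u]
        for i in items:
            q[i] = num[i] / den[i]
        losses.append(objective())
    return p, q, losses


ratings = [("u1", "a", 5), ("u1", "b", 3), ("u2", "a", 4),
           ("u2", "c", 1), ("u3", "b", 2), ("u3", "c", 5)]
p, q, losses = als_rank1(ratings)
print([round(x, 3) for x in losses[:3]])
print("predicted u1/c:", round(p["u1"] * q["c"], 2))
```

Each half-step exactly minimizes the objective over one set of factors, so the loss never increases. With non-negative ratings and positive starting values the factors also stay non-negative, which loosely parallels `nonnegative=True`, although Spark enforces that constraint with a dedicated solver.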

```python
def spark_k_fold_als(algo, k, data):
    """Average RMSE and per-stage timings over k random 80/20 train/test splits.

    Each iteration draws an independent random split, so this is repeated
    hold-out validation rather than strict k-fold cross-validation.
    """
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rounded_rating", predictionCol="prediction")
    total_rmse = 0
    train_time = 0
    test_time = 0
    rmse_time = 0
    for _ in range(k):
        training_set, test_set = data.randomSplit([0.8, 0.2])
        # Train the model on the training set
        train_start_time = time.time()
        als_model = algo.fit(training_set)
        train_time += time.time() - train_start_time
        # Get the predictions on the test set (transform() is lazy, so the
        # actual scoring is deferred until evaluate() runs below)
        test_start_time = time.time()
        algo_predictions = als_model.transform(test_set)
        test_time += time.time() - test_start_time
        # Compute the RMSE, timing only the evaluation step
        rmse_start_time = time.time()
        total_rmse += evaluator.evaluate(algo_predictions)
        rmse_time += time.time() - rmse_start_time
    return (total_rmse / k, train_time / k, test_time / k, rmse_time / k)


vanilla_als = ALS(userCol="user_id", itemCol="item_id", ratingCol="rounded_rating",
                  coldStartStrategy="drop", nonnegative=True)

average_rmse, average_train_time, average_test_time, average_rmse_time = spark_k_fold_als(vanilla_als, 5, bgg_ratings)
print("Training Model: %s seconds" % average_train_time)
print("Predictions: %s seconds" % average_test_time)
print("Evaluation: %s seconds" % average_rmse_time)
print("RMSE: %s" % average_rmse)
```
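Because `spark_k_fold_als` calls `randomSplit` afresh on each iteration, it performs repeated random hold-out (sometimes called Monte Carlo cross-validation) rather than k-fold cross-validation: some ratings land in several test sets and others in none. The index-based sketch below contrasts the two schemes; `repeated_holdout` and `k_fold` are hypothetical helpers, and unlike Spark's `randomSplit`, whose split sizes are only approximately 80/20, it cuts exactly 20%.

```python
import random


def repeated_holdout(n, k, test_frac=0.2, seed=42):
    """k independent random train/test splits of row indices 0..n-1."""
    rng = random.Random(seed)
    splits = []
    for _ in range(k):
        idx = list(range(n))
        rng.shuffle(idx)
        cut = int(n * test_frac)
        splits.append((sorted(idx[cut:]), sorted(idx[:cut])))
    return splits


def k_fold(n, k, seed=42):
    """True k-fold: every row is in exactly one test fold."""
    rng = random.Random(seed)
    idx = list(range(n))
    rng.shuffle(idx)
    folds = [idx[f::k] for f in range(k)]
    splits = []
    for f in range(k):
        test = sorted(folds[f])
        train = sorted(i for g in range(k) if g != f for i in folds[g])
        splits.append((train, test))
    return splits


never_tested = set(range(100)) - {i for _, test in repeated_holdout(100, 5) for i in test}
print(len(never_tested), "of 100 rows never appear in a test set")
```

With five 20% test draws, each row has a 0.8^5 ≈ 33% chance of never being tested, whereas `k_fold` guarantees every row is tested exactly once.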

## Conclusion

I have evaluated an ALS recommender model over five train/test rounds on both my laptop and Databricks Community Edition. For each environment I recorded the RMSE and the execution time of the three stages of the process (model training, test set predictions, prediction evaluation); the per-stage times are averages over the five rounds. Here are the results:

| Metric               | My Laptop          | Databricks           |
|----------------------|--------------------|----------------------|
| RMSE                 | 1.297831628696796  | 1.4212333949235902   |
| Training (seconds)   | 3.6788469791412353 | 56.30785298347473    |
| Prediction (seconds) | 1.3248661518096925 | 0.035971784591674806 |
| Evaluation (seconds) | 1.493586826324463  | 66.98323187828063    |
| Total Execution Time | 31.5 seconds       | 10.28 minutes        |

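My laptop outperformed Spark on overall execution time, training, and evaluation. Spark came out ahead only on the prediction step (0.036 vs. 1.32 seconds), and that figure should be read with care: Spark's `transform()` is lazy, so the measured time mostly reflects building the query plan, while the actual scoring work is counted under evaluation.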
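How lazy evaluation shifts cost between timed stages can be illustrated with Python's built-in `map`, which is also lazy. This is an analogy, not Spark code: `score` and its 1 ms sleep are hypothetical stand-ins for per-row model scoring.

```python
import time

calls = 0


def score(x):
    """Hypothetical stand-in for scoring one row with a trained model."""
    global calls
    calls += 1
    time.sleep(0.001)
    return x * 2


rows = range(200)

start = time.perf_counter()
predictions = map(score, rows)  # lazy: no row has been scored yet
plan_time = time.perf_counter() - start
calls_after_map = calls

start = time.perf_counter()
total = sum(predictions)  # consuming the iterator forces all the scoring
eval_time = time.perf_counter() - start

print(f"'predict' step:  {plan_time:.4f}s, rows scored: {calls_after_map}")
print(f"'evaluate' step: {eval_time:.4f}s, rows scored: {calls}")
```

In Spark, one way to time prediction on its own would be to trigger an action such as `count()` on the predictions DataFrame inside the timed section; unless that DataFrame is cached, the later `evaluate()` call would then compute the predictions a second time.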

It is interesting to me that the RMSE differs this much (1.30 on my laptop vs. 1.42 on Databricks). I expected the gap to be smaller.
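One factor that can move RMSE between runs is which test rows get scored at all. RMSE is $\sqrt{\frac{1}{n}\sum_{(u,i)}(r_{ui}-\hat r_{ui})^2}$ over the $n$ rows that receive a prediction, and with `coldStartStrategy="drop"` Spark removes rows whose prediction is NaN (a user or item absent from that training split) before the evaluator runs. Each random split is therefore scored on a slightly different subset; the random imputation and any difference in ALS hyperparameters between the two environments could also contribute, and which factor dominates here is untested. The helper `rmse_drop_missing` below is a hypothetical pure-Python sketch of that scoring rule.

```python
import math


def rmse_drop_missing(pairs):
    """RMSE over (actual, predicted) pairs, skipping pairs whose prediction is
    None or NaN, similar in spirit to coldStartStrategy="drop"."""
    kept = [(a, p) for a, p in pairs if p is not None and not math.isnan(p)]
    if not kept:
        raise ValueError("no scorable predictions")
    return math.sqrt(sum((a - p) ** 2 for a, p in kept) / len(kept))


pairs = [(8, 7.5), (7, 7.0), (9, None), (6, 8.0)]
print(rmse_drop_missing(pairs))  # about 1.1902: sqrt((0.25 + 0 + 4) / 3)
```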

I can see this difference becoming increasingly important as the dataset grows. If a new set of data (e.g., a user's session) flows in, getting quick predictions would be paramount. There is also a point where the data will be too large for my laptop's resources; that is definitely when I would need to move to a distributed platform.