# Spark ALS Implementation
Will Bennett & James Meredith

Inspired by the [Spark collaborative filtering example](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html).

We've taken the basic example and added a pipeline and a cross-validated grid search.

In [8]:
import os
# Run PySpark's shell bootstrap script, which creates the `spark` session and `sc` context
exec(open(os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.4.0
      /_/

Using Python version 3.8.5 (default, Sep  4 2020 02:22:02)
Spark context Web UI available at http://192.168.4.22:4040
Spark context available as 'sc' (master = local[*], app id = local-1687365561172).
SparkSession available as 'spark'.


In [9]:
# Imports
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.recommendation import ALS

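We start by creating a Spark session, then load the MovieLens ratings from `ml-latest-small/ratings.csv` and split them 80/20 into training and test sets. No seed is passed to `randomSplit`, so the exact split, and the RMSE values below, will vary slightly between runs.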

In [10]:
spark = SparkSession\
    .builder\
    .appName("ALSExample")\
    .getOrCreate()

ratings = spark.read.options(header=True, inferSchema=True).csv("ml-latest-small/ratings.csv")
(training, test) = ratings.randomSplit([0.8, 0.2])

Next, we create a baseline model using the parameters from the Spark example.

In [11]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.1002666310372953


That is fairly high. MovieLens ratings run from 0.5 to 5 stars, so an RMSE above 1 means our prediction errors are on the order of a full star.
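RMSE squares each error before averaging and then takes the square root, so it is always at least as large as the mean absolute error (MAE) and weighs big misses more heavily. A small pure-Python comparison on hypothetical prediction errors:

```python
import math


def rmse(errors):
    """Root-mean-square error: square, average, then take the square root."""
    return math.sqrt(sum(e * e for e in errors) / len(errors))


def mae(errors):
    """Mean absolute error: the plain average size of the errors."""
    return sum(abs(e) for e in errors) / len(errors)


# Hypothetical prediction errors (prediction minus true rating), in stars
errors = [0.5, -1.5, 1.0, -0.5]
print(f"RMSE = {rmse(errors):.3f}")  # RMSE = 0.968
print(f"MAE  = {mae(errors):.3f}")   # MAE  = 0.875
```

Here a single 1.5-star miss pushes RMSE almost a tenth of a star above MAE.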

Next, let's set up a pipeline and a cross-validated grid search to improve our model.
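`CrossValidator` with `numFolds=5` trains each candidate model five times, each time holding out a different fifth of the training data for validation, and averages the resulting RMSE. The plain-Python sketch below shows that k-fold loop in general terms; the helper names and the shuffling scheme are our own, and Spark assigns rows to folds in its own way. This is also where `coldStartStrategy="drop"` matters: a validation fold can contain users or movies that are absent from its training folds, and dropping their NaN predictions keeps the RMSE defined.

```python
import random


def k_fold_indices(n, k, seed=42):
    """Shuffle row indices 0..n-1 and deal them into k folds of near-equal size."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[f::k] for f in range(k)]


def cross_val_score(n, k, fit_and_score, seed=42):
    """Average fit_and_score(train_idx, val_idx) over k train/validation splits."""
    folds = k_fold_indices(n, k, seed)
    scores = []
    for f, val_idx in enumerate(folds):
        # Train on every fold except the held-out one
        train_idx = [i for g, fold in enumerate(folds) if g != f for i in fold]
        scores.append(fit_and_score(train_idx, val_idx))
    return sum(scores) / k


# Hypothetical scorer: reports the share of rows used for validation
print(cross_val_score(10, 5, lambda train, val: len(val) / (len(train) + len(val))))
```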

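We will grid search over `maxIter`, `rank`, and `alpha`. Note that `alpha` only affects ALS when `implicitPrefs=True` (implicit feedback); for explicit ratings like these it is ignored, so the three `alpha` values give the same results. Also, `regParam` is not set here, so this model uses Spark's default of 0.1 rather than the 0.01 used in the baseline.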
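`ParamGridBuilder` takes the Cartesian product of the listed values. A quick plain-Python count of what this grid costs (the dictionary simply mirrors the values in the next cell; it is not how Spark stores param maps):

```python
from itertools import product

# Candidate values, copied from the ParamGridBuilder in the next cell
grid = {
    "maxIter": [5, 10, 20],
    "rank": [5, 10, 15],
    "alpha": [0.8, 1, 1.2],
}

# Cartesian product: one dict per parameter combination
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

num_folds = 5
print(len(param_maps))              # 27
print(len(param_maps) * num_folds)  # 135
```

That is 135 ALS fits during cross-validation, plus one final refit of the best combination on the full training set. Every value added to a grid multiplies this count, so grid size is the main cost driver.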

In [12]:
# Set up Alternating Least Squares
# (maxIter, rank and alpha set here are overridden by the values in paramGrid)
als = ALS(maxIter=20,
          userCol="userId",
          itemCol="movieId",
          rank=5,
          ratingCol="rating",
          coldStartStrategy="drop",
          alpha=1.2,
          seed=42)

# Create Pipeline
pipeline = Pipeline(stages=[als])

# Create grid for gridsearch
paramGrid = ParamGridBuilder() \
    .addGrid(als.maxIter, [5, 10, 20]) \
    .addGrid(als.rank, [5, 10, 15]) \
    .addGrid(als.alpha, [0.8, 1, 1.2]) \
    .build()

# Create our evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

# Set up our cross validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

# Cross validate our model on training data
cvModel = crossval.fit(training)

# Generate our predictions for test set
predictions = cvModel.transform(test)

# Calculate RMSE for test set
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.8708752579886659


That's a clear improvement. Let's see which parameter values were selected.
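Under the hood, `CrossValidator` keeps the average validation metric for every parameter combination and picks the best one, which for RMSE is the smallest. A minimal pure-Python sketch of that selection (the helper `best_param_map` and the metric values are hypothetical) also shows that when several candidates tie, as happens when a parameter has no effect, the first one in the grid wins:

```python
def best_param_map(param_maps, avg_metrics, larger_is_better=False):
    """Return (params, metric) for the candidate with the best average metric.

    Python's min/max return the first of several equal values, so ties go to
    the earliest candidate, like an argmin/argmax over the metric list.
    """
    pick = max if larger_is_better else min
    best_idx = pick(range(len(avg_metrics)), key=avg_metrics.__getitem__)
    return param_maps[best_idx], avg_metrics[best_idx]


# Hypothetical CV results: three candidates that differ only in alpha and tie
candidates = [{"rank": 5, "alpha": 0.8}, {"rank": 5, "alpha": 1.0}, {"rank": 5, "alpha": 1.2}]
print(best_param_map(candidates, [0.88, 0.88, 0.88]))  # ({'rank': 5, 'alpha': 0.8}, 0.88)
```

With the real objects, the inputs would be `paramGrid` and `cvModel.avgMetrics`, which lists the average metric for each grid entry in the same order; `{p.name: v for p, v in paramGrid[i].items()}` turns a param map into readable names without going through `_java_obj`.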

In [17]:
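# Read the winning values from the ALS estimator that produced the best model.
# _java_obj is a private attribute: this works in Spark 3.4 but is not a public API.
best_maxiter = cvModel.bestModel.stages[-1]._java_obj.parent().getMaxIter()
best_rank = cvModel.bestModel.stages[-1]._java_obj.parent().getRank()
best_alpha = cvModel.bestModel.stages[-1]._java_obj.parent().getAlpha()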

print(f'Best Max Iter: {best_maxiter}')
print(f'Best rank: {best_rank}')
print(f'Best alpha: {best_alpha}')

Best Max Iter: 20
Best rank: 5
Best alpha: 0.8


Great! Now let's train a new model with those parameters.

In [20]:
# Create a model with the best parameters
als = ALS(maxIter=20, 
          userCol="userId", 
          itemCol="movieId", 
          ratingCol="rating",
          coldStartStrategy="drop",
          rank=5,
          alpha=0.8
         )
best_model = als.fit(training)

predictions = best_model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.8718633113662384


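This RMSE (0.872) differs slightly from the cross-validated model's 0.871, likely because this model was fit without `seed=42` and so starts from a different random initialization. With the configuration settled, we retrain it on all of our data.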

In [21]:
# Train our model on all data
best_model = als.fit(ratings)

Now that our model is ready, let's generate some recommendations.
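`recommendForAllUsers` ranks items for each user by predicted rating, and ALS predicts a rating as the dot product of the user's and the item's latent factor vectors. A pure-Python sketch of that scoring for a single user (the helper name and factor values are hypothetical; Spark computes this in blocks across the cluster):

```python
import heapq


def recommend_for_user(user_factors, item_factors, k):
    """Return the top-k (item_id, score) pairs for one user, highest score first.

    user_factors: list of floats (the user's latent vector)
    item_factors: dict mapping item_id -> list of floats of the same length
    """
    scores = (
        (item_id, sum(u * v for u, v in zip(user_factors, vec)))
        for item_id, vec in item_factors.items()
    )
    return heapq.nlargest(k, scores, key=lambda pair: pair[1])


# Hypothetical rank-2 factors
user = [1.0, 2.0]
items = {10: [2.0, 2.0], 20: [1.0, 0.5], 30: [0.5, 1.5]}
print(recommend_for_user(user, items, k=2))  # [(10, 6.0), (30, 3.5)]
```

Nothing constrains a dot product to the 0.5 to 5 rating scale, which is why some scores in the output below exceed 5.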

In [22]:
# Generate top 10 movie recommendations for each user
userRecs = best_model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = best_model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = best_model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = best_model.recommendForItemSubset(movies, 10)
userRecs.show()
movieRecs.show()
userSubsetRecs.show()
movieSubSetRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{3379, 6.0570064...|
|     2|[{67618, 4.982239...|
|     3|[{74754, 6.236243...|
|     4|[{5222, 5.759787}...|
|     5|[{132333, 5.08985...|
|     6|[{26171, 6.024038...|
|     7|[{68945, 5.338016...|
|     8|[{141718, 5.18157...|
|     9|[{26171, 5.464975...|
|    10|[{32892, 5.774617...|
|    11|[{3925, 5.6317244...|
|    12|[{26171, 6.516180...|
|    13|[{60943, 5.597091...|
|    14|[{141718, 5.28826...|
|    15|[{141718, 4.92957...|
|    16|[{3379, 4.783925}...|
|    17|[{3379, 5.426992}...|
|    18|[{3379, 5.149359}...|
|    19|[{3379, 4.182879}...|
|    20|[{26171, 5.698621...|
+------+--------------------+
only showing top 20 rows

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|      1|[{53, 5.1614575},...|
|     12|[{537, 4.3878174}...|
|     13|[{175, 4.162911},...|
|     22|[{53, 4.817724}, ...|
|     26|[{53, 4.733883}, ...|
|     

In [23]:
spark.stop()

## Conclusion

Spark's ALS is an efficient way to set up a recommendation system at scale.
It includes helpful functions for generating recommendations, such as `recommendForAllUsers` and `recommendForAllItems`,
and their output can easily be used in a web application.

While the RMSE of 0.872 is not as good as SVD from the Surprise library, Spark's ease of implementation and ability to scale make up for it.