# Introduction

One common machine learning task is to take user ratings for some product and then try to recommend new products. A common way to do this is by matrix decomposition of a user/product rating matrix. This notebook uses the ratings from the Grouplens MovieLens dataset (https://grouplens.org/datasets/movielens/), which I downloaded from Kaggle at https://www.kaggle.com/rounakbanik/the-movies-dataset. Here, I use the small dataset, which has approximately 100,000 ratings from 700 users. There is a much larger dataset with 26,000,000 ratings available that will likely give much stronger results. I'm just using the smaller one so that things will run quickly. 

A different approach, such as the Restricted Boltzmann Machine recommender described by Salakhutdinov, Mnih, and Hinton (http://www.machinelearning.org/proceedings/icml2007/papers/407.pdf) might be interesting to test out on the full dataset. This method will tend to have many fewer parameters and will scale better for very large datasets. However, I don't think RBMs are included in the most common neural net frameworks, so you'd have to do the implementation on your own or find something on Github. I may try my own C++ implementation at some point.

Returning to the point on scaling, if we use 10 latent features, we will fit for (700 users x 10 features) + (10 features x 9000 movies) = 97,000 matrix elements. For the full dataset, the number is (270,000 users x 10 features) + (10 features x 45,000 movies) = 3.15M elements. There are maybe some interesting implications of these numbers. The smaller dataset has many more movies than users but the opposite is true for the full dataset. We might expect that the latent features could be very different due to this.

To create the recommender system, I will use the Spark machine learning library, which has an implementation of an alternating least squares recommender. This notebook includes the following tools techniques:
  - PySpark
  - Basic mapreduce operations
  - Pipeline
  - Alternating Least Squares
  - k-Fold Cross Validation
  - Training-test split
  - Hyperparameter grid search
  - Recommendations of items for different users
  - Recommendations of users for different items

First, we should start Spark. I'm just using 2 cores, but you should increase this to speed things up. I'll also start an SQLContext since I'll be using Spark DataFrames later.

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import re

sc = SparkContext('local[2]','Tutorial')
sqlContext = SQLContext(sc)

Now import the remaining modules that we'll need

In [2]:
import numpy as np
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.sql import functions


# Reading the data

I have the ratings file saved in a text format in my home directory on my Hadoop server. I'll load it as an RDD first. Then, I need to do some cleaning before I can convert it into a Row of numeric types.

In [3]:
rdd = sc.textFile('ratings_small.csv')
def toList(x):
    x = x.split(',')
    return (x[0],[x[1],x[2]])

def has_data(x):
    if re.match(r'[A-Za-z]',x[0]):
        return False
    return True

def toNumeric(x):
    return Row(userId=int(x[0]),movieId=int(x[1][0]),rating=float(x[1][1]))


Before I create a recommender system, I'll look a bit at my data. First, I need to use the functions I created above to get a new RDD with the data that I want. Each entry is now a row with a movie ID, a rating, and a user ID. Everything is numeric here, so I don't know anything else about the users or the movies. There is additional information in other files, but I am not trying to use that here.

In [4]:
users = rdd.map(toList).filter(has_data).map(toNumeric)
users.take(5)

[Row(movieId=31, rating=2.5, userId=1),
 Row(movieId=1029, rating=3.0, userId=1),
 Row(movieId=1061, rating=3.0, userId=1),
 Row(movieId=1129, rating=2.0, userId=1),
 Row(movieId=1172, rating=4.0, userId=1)]

In [5]:
rate = users.map(lambda x : x[1] )
rate.distinct().sortBy(keyfunc=lambda x : x).collect()

[0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0]

There are 10 distinct ratings, from 0.5 to 5.0

In [6]:
user = users.map(lambda x : x[2])
user.distinct().count()

671

There are 671 distinct users in the dataset.

In [7]:
movies = users.map(lambda x : x[0] )
movies.distinct().count()

9066

There are ratings for 9066 distinct movies.

In [8]:
users.count()

100004

There are also just over 100,000 ratings, so an average of 11 ratings per movie and 149 ratings per user. I want to do some aggregations, so I'll convert the dataset into a DataFrame now.

In [6]:
ratings = sqlContext.createDataFrame(users)


In [10]:
gbm = ratings.groupBy(ratings.movieId).count()
avg = gbm.agg(functions.avg('count').alias('avg')).collect()[0].avg
maxN = gbm.agg(functions.max('count').alias('maxN')).collect()[0].maxN
minN = gbm.agg(functions.min('count').alias('minN')).collect()[0].minN
std = gbm.agg(functions.stddev('count').alias('std')).collect()[0].std

print('# of Ratings per Movie')
print('Mean: {:0.4}'.format(avg))
print('Min: {}'.format(minN))
print('Max: {}'.format(maxN))
print('Std. Dev: {:0.4}'.format(std))

# of Ratings per Movie
Mean: 11.03
Min: 1
Max: 341
Std. Dev: 24.05


In [11]:
gbu = ratings.groupBy(ratings.userId).count()
avg = gbu.agg(functions.avg('count').alias('avg')).collect()[0].avg
maxN = gbu.agg(functions.max('count').alias('maxN')).collect()[0].maxN
minN = gbu.agg(functions.min('count').alias('minN')).collect()[0].minN
std = gbu.agg(functions.stddev('count').alias('std')).collect()[0].std

print('# of Ratings per User')
print('Mean: {:0.4}'.format(avg))
print('Min: {}'.format(minN))
print('Max: {}'.format(maxN))
print('Std. Dev: {:0.4}'.format(std))

# of Ratings per User
Mean: 149.0
Min: 20
Max: 2391
Std. Dev: 231.2


In [12]:
gbr = ratings.groupBy(ratings.rating).count() \
             .sort(functions.col('rating')).collect()
mean = 0
tot = 0
std = 0
for rat in gbr:
    print('Rating: {}, Count: {}'.format(rat.rating,rat['count']))
    c = rat['count']
    mean += c * rat.rating
    std += c * rat.rating*rat.rating
    tot += c
mean /= tot
std /= tot
std = np.sqrt(tot/(tot-1))* np.sqrt(std - mean*mean)
print('Mean Rating: {:0.3}'.format(mean))
print('Std. Dev: {:0.3}'.format(std))

Rating: 0.5, Count: 1101
Rating: 1.0, Count: 3326
Rating: 1.5, Count: 1687
Rating: 2.0, Count: 7271
Rating: 2.5, Count: 4449
Rating: 3.0, Count: 20064
Rating: 3.5, Count: 10538
Rating: 4.0, Count: 28750
Rating: 4.5, Count: 7723
Rating: 5.0, Count: 15095
Mean Rating: 3.54
Std. Dev: 1.06


From these aggregations we see several things.

First, the counts for both movies and users are highly right-skewed. Movies have as few as a single rating or as many as several hundred. When we run cross validation or use a test set, we expect that the movies with very few ratings will not generalize well. In many cases, they will not even be used in the training. Users have as few as 20 or as many as thousands of ratings, so at least each user should have several movies in a random split.

The ratings data shows that the ratings have a fairly high average, around 3.5 of 5, and a standard deviation of 1.06. So, if we just guess an average of 3.54 for all ratings, we'd expect an RMSE of 1.06. We have to beat this if our recommender system is doing anything useful.

You can also see that users are much more likely to give integer ratings than half-integer ratings, and there are few very low ratings. Movie watchers tend to only pick movies that they want to see, so it may not be surprising that there are so few ratings where users truly hated a movie.

# Building a Recommender System

The recommender system that I use here is the Alternating Least Squares implementation in the Spark ML package. Suppose we have a matrix $\mathbf{R}$ where $R_{ij}$ corresponds to a rating of movie $i$ from user $j$. Then, we can attempt to decompose $\mathbf{R}$ as:

$$ \mathbf{R} = \mathbf{M}\mathbf{U} $$

where $\mathbf{M}$ is an $m$ by $k$ matrix and $\mathbf{U}$ is a $k$ by $n$ matrix. Here, $m$ is the number of movies, $n$ is the number of users, and $k$ is a number of latent features that we wish to learn.

If the number of latent features is large, we can get an exact representation of $\mathbf{R}$. However, this would not be very useful. $\mathbf{R}$ is a sparse matrix in most cases and we wish to guess values for the unfilled elements. What we instead want to do is use a small number of latent features so that the number of elements in $\mathbf{R}$ is much greater than the number of elements in $\mathbf{M}$ and $\mathbf{U}$. Now, we can try to fit by minimizing the squared error between the MU decomposition and R:

$$ E = \frac{1}{2N}\sum\limits_{r=0}^N \left(R_{i_r j_r} - M_{i_r k}U_{k j_r}\right)^2 $$

where $N$ is the total number of input ratings and $r$ is an index for each rating. There is a problem here, though. This error function is quartic in our fit parameters. We can't guarantee that this is a convex function, so minimization could find a local but not global minimum. 

This is where alternating least squares comes in. We can run an iterative process to find a solution. To do this, we can fix either U or M and minimize the error for just one of our matrices. This would be a quadratic function, so methods such as gradient descent are guaranteed to find the solution if run with reasonable parameters. If the matrix is small enough, we could even just get an exact solution using matrix inversion.

Once we do one fit, we fix the newly-fit matrix and fit again for the other matrix. After some number of iterations, we stop this and look at the result. So, we are running least squares many times by alternating which of the two matrices we are optimizing.

To actually implement this, we first want to run a training/test split on our DataFrame.

In [7]:
(training,test) = ratings.randomSplit([0.8,0.2],seed=123)

The ALS object in Spark has several different parameters. I am leaving the rank (the number of latent features) at 10, but we realistically would want to optimize this. There is also a regularization parameter, and the number of iterations. I am just setting the number of iterations to 10 and optimizing only the regularization. I am also setting the coldStartStrategy to "drop." This sort of recommender needs to be fit on all users, so if a user is absent from the training set, the output is nan. This will suppress those values. The RBM recommender I mentioned at the beginning does not have this issue, which is one thing that makes it an intriguing possibility.

If you really want to get recommendations for a new user without retraining, you could try to identify the most similar users (maybe by using cosine similarity or even unsupervised clustering) and calculate ratings from those users.

In [8]:
als = ALS(maxIter=10,regParam=0.2,userCol='userId',
          itemCol='movieId',ratingCol='rating',
          coldStartStrategy='drop')

pipeline = Pipeline(stages=[als])
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
param = ParamGridBuilder().addGrid(als.regParam,[0.01,0.03,0.05,0.1,0.2,0.3,0.5]).build()
cv = CrossValidator(estimator=pipeline,estimatorParamMaps=param,
                    evaluator=evaluator,numFolds=5)
cvModel = cv.fit(training)

I've now fit a cross validation model and trained it on my training set. First, I should see what parameter I chose.

In [9]:
for par,score in zip(param,cvModel.avgMetrics):
    print("Reg Param: {}, RMSE: {:0.3}".format(list(par.values())[0],score))

Reg Param: 0.01, RMSE: 1.23
Reg Param: 0.03, RMSE: 1.1
Reg Param: 0.05, RMSE: 1.03
Reg Param: 0.1, RMSE: 0.942
Reg Param: 0.2, RMSE: 0.917
Reg Param: 0.3, RMSE: 0.944
Reg Param: 0.5, RMSE: 1.03


I see that a regularization parameter of 0.2 gave the optimal result. With this amount of regularization, the RMSE error shows that I can accurately estimate users' scores to less than one point (of 5). Now, we can run on our test set to see how the result compares on new data from the same users.

In [10]:
predictions = cvModel.transform(test)
predictions = predictions.withColumn('trivial',functions.lit(3.54))
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="trivial")
rmse = evaluator.evaluate(predictions)

print('Trivial model RMSE = ' + str(rmse))

Root-mean-square error = 0.904858797400974
Trivial model RMSE = 1.056236423550511


Ther result on the test set is very close to the cross-validation result. This shows that our model is generalizing well, so it should be effective to use for making recommendations. Furthermore, the error is about 15% less than what we would get if we just always pick the mean rating. So, it's not amazing, but we have at least improved upon a trivial model using a fairly small data set. Now, we should see what the predictions look like.

In [11]:
predictions.take(20)

[Row(movieId=463, rating=4.0, userId=534, prediction=3.5110599994659424, trivial=3.54),
 Row(movieId=471, rating=3.0, userId=588, prediction=3.7625296115875244, trivial=3.54),
 Row(movieId=471, rating=5.0, userId=460, prediction=3.823929786682129, trivial=3.54),
 Row(movieId=471, rating=3.0, userId=350, prediction=3.84967041015625, trivial=3.54),
 Row(movieId=471, rating=5.0, userId=285, prediction=3.5324244499206543, trivial=3.54),
 Row(movieId=471, rating=3.0, userId=440, prediction=3.685539722442627, trivial=3.54),
 Row(movieId=471, rating=3.0, userId=491, prediction=3.87979793548584, trivial=3.54),
 Row(movieId=471, rating=4.0, userId=659, prediction=3.603774070739746, trivial=3.54),
 Row(movieId=471, rating=4.0, userId=487, prediction=3.867652654647827, trivial=3.54),
 Row(movieId=471, rating=4.0, userId=105, prediction=3.5243277549743652, trivial=3.54),
 Row(movieId=471, rating=4.0, userId=529, prediction=3.451523542404175, trivial=3.54),
 Row(movieId=471, rating=4.0, userId=30, 

As we might expect, each prediction gives a movie and user ID, the actual rating, and the predicted rating. From here, we can get the top recommendations for each user.

I'll look at the top 10 recommended movies for ten users.

In [13]:
userRecs = cvModel.bestModel.stages[0].recommendForAllUsers(10)
recs = userRecs.take(10)
for row in recs:
    print("User: {}".format(row.userId))
    for movie in row.recommendations:
        print('\tMovie: {:06}, Rating: {:0.3}'.format(movie.movieId,movie.rating))

User: 471
	Movie: 067504, Rating: 5.21
	Movie: 132333, Rating: 4.98
	Movie: 065037, Rating: 4.93
	Movie: 008535, Rating: 4.86
	Movie: 144976, Rating: 4.85
	Movie: 005765, Rating: 4.83
	Movie: 004630, Rating: 4.83
	Movie: 106471, Rating: 4.82
	Movie: 003030, Rating: 4.77
	Movie: 093320, Rating: 4.75
User: 463
	Movie: 067504, Rating: 5.16
	Movie: 072647, Rating: 4.64
	Movie: 004405, Rating: 4.64
	Movie: 031547, Rating: 4.64
	Movie: 005059, Rating: 4.64
	Movie: 025764, Rating: 4.64
	Movie: 007074, Rating: 4.64
	Movie: 080599, Rating: 4.64
	Movie: 008535, Rating: 4.62
	Movie: 132333, Rating: 4.62
User: 496
	Movie: 065037, Rating: 5.38
	Movie: 067504, Rating: 5.38
	Movie: 008535, Rating: 5.34
	Movie: 008908, Rating: 5.33
	Movie: 090061, Rating: 5.32
	Movie: 132333, Rating: 5.22
	Movie: 106471, Rating: 5.19
	Movie: 144976, Rating: 5.18
	Movie: 004630, Rating: 5.17
	Movie: 005765, Rating: 5.17
User: 148
	Movie: 067504, Rating: 5.83
	Movie: 031547, Rating: 5.25
	Movie: 005059, Rating: 5.25
	Mo

We actually see here that many of top the predicted ratings are greater than 5, which is an impossible score. This isn't necessarily a terrible thing, as it gives us some way to order even very highly-rated movies. If we reduce these back to 5, we would see an improvement in our error. However, if our regularization is very small, we start seeing some very large predictions such as a score of 9. We would expect the same at the low end, where the worst movies for each user may well drop below the minimum score of 0.5.

Finally, we can recommend users to each item. Mathematically, there is no difference but there may also be good reasons to look at this. Perhaps you have some stock of an item that you need to sell. Even if it's not anyone's top recommended item, you could still recommend it to the top users to increase sales.

In [14]:
itemRecs = cvModel.bestModel.stages[0].recommendForAllItems(10)
recs = itemRecs.take(10)
for row in recs:
    print("Movie: {}".format(row.movieId))
    for user in row.recommendations:
        print('\tUser: {}, Rating: {:0.3}'.format(user.userId,user.rating))

Movie: 1580
	User: 46, Rating: 4.83
	User: 113, Rating: 4.76
	User: 543, Rating: 4.52
	User: 287, Rating: 4.44
	User: 656, Rating: 4.41
	User: 89, Rating: 4.39
	User: 145, Rating: 4.38
	User: 646, Rating: 4.35
	User: 517, Rating: 4.33
	User: 464, Rating: 4.32
Movie: 5300
	User: 227, Rating: 5.74
	User: 46, Rating: 5.72
	User: 113, Rating: 5.56
	User: 123, Rating: 5.43
	User: 290, Rating: 5.32
	User: 89, Rating: 5.23
	User: 78, Rating: 5.21
	User: 592, Rating: 5.18
	User: 177, Rating: 5.16
	User: 145, Rating: 5.14
Movie: 6620
	User: 123, Rating: 4.54
	User: 357, Rating: 4.53
	User: 113, Rating: 4.41
	User: 401, Rating: 4.39
	User: 156, Rating: 4.37
	User: 242, Rating: 4.36
	User: 280, Rating: 4.36
	User: 298, Rating: 4.35
	User: 4, Rating: 4.34
	User: 264, Rating: 4.34
Movie: 32460
	User: 46, Rating: 4.88
	User: 113, Rating: 4.84
	User: 298, Rating: 4.81
	User: 89, Rating: 4.77
	User: 401, Rating: 4.75
	User: 443, Rating: 4.73
	User: 287, Rating: 4.72
	User: 357, Rating: 4.7
	User: 543,

# Conclusions

In this notebook, I've used collaborative filtering in Spark to produce movie recommendations based on user rating data. The model used a fairly small dataset but was able to achieve a substantial improvement over what we would expect from a trivial rating model.