# Recommendation System with Spark ML

In this notebook we explore creating a movie recommendation system using Spark ML. We will work with the MovieLens 20M dataset (http://grouplens.org/datasets/movielens). This dataset consists of more than 20 million ratings of approximately 27,000 movies made by 138,000 MovieLens users who joined MovieLens in 2015. Includes tag genome data with 12 million relevance scores across 1,100 tags.MovieLens is a recommender system and virtual community website that recommends movies for its users to watch, based on their film preferences using collaborative filtering. This benchmark dataset was released April 2015.

Spark ML supports an implementation of matrix factorization for collaborative filtering. Matrix factorization models have consistently shown to perform extremely well for collaborative filtering. The type of matrix factorization we will explore in this notebook is called explicit matrix factorization. In explicit matrix factorization, preferences provided by users themselves are utilized - as contrasted with implicit matrix factorization, where only implicit feedback (e.g. views, clicks, purchases, likes, shares etc.) is utilized. Collaborative filtering aims to fill in the missing entries of a user-item (in the case of movie recommendations, this consists of user and movie IDs) association matrix in which users (userID) and items (movieID) are described by a small set of latent factors that can be used to predict missing entries. Spark ML uses the Alternating Least Squares (ALS) algorithm to learn these latent factors for this matix factorization problem. ALS works by iteratively solving a series of least square regression problems to derive a model.

![Factorization Graphic](https://encrypted-tbn1.gstatic.com/images?q=tbn:ANd9GcSID6SakBUeYVGD4VUJ06oJwnEtqeXfnicgBWu5n7fIDTY6HsHooA)

## Verify Spark version and existence of Spark context

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Movie Recommendation Engine using Spark").config("spark.some.config.option","some-value").getOrCreate()
sc = spark.sparkContext

## Import Spark libraries

In [5]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
from pyspark.sql import SQLContext 
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
import pyspark.sql.functions
from pyspark.sql.functions import lit
import sys


## Create the Spark SQL Context

In [7]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

## Download and load the MovieLens 20 Million dataset

## Read in Ratings Dataset
### Ratings File Description

All ratings are contained in the file “ratings.dat” and are in the following format:

    UserID::MovieID::Rating::Timestamp

    UserIDs range between 1 and 138,000
    MovieIDs range between 1 and 27,000
    Ratings are made on a 5-star scale (whole-star ratings only)
    Timestamp is represented in seconds 
    
Each user has at least 20-30 ratings

## Read contents of "ratings.dat" into a dataframe

In [11]:
df = spark.read.csv("/FileStore/tables/ratings.csv" , header= 'true')

## Examine the ratings dataframe schema

In [13]:
df.printSchema()

In [14]:
df.head()

## Examine the total ratings dataframe record count

In [16]:
df.count()

## Now lets us find standard deviation, mean, minimum & maximum values

In [18]:
df.describe().show()

## Now lets see a sample of the Ratings DataFrame

In [20]:
df.show(3)

# Convert all datatypes into integer values

In [22]:
ratings = df.select('userId','movieId','rating')
from pyspark.sql.types import IntegerType
ratings=ratings.withColumn("userId", ratings["userId"].cast(IntegerType()))
ratings=ratings.withColumn("movieId", ratings["movieId"].cast(IntegerType()))
ratings=ratings.withColumn("rating", ratings["rating"].cast(IntegerType()))
ratings.dtypes

## Now let us take a small fraction of the dataset for our recommendation engine

In [24]:
ratings.sample(fraction=0.1, seed=4).count()

## Show sample number of ratings per user

In [26]:
grouped_ratings = ratings.groupBy("userId").count().withColumnRenamed("count", "No. of ratings")
grouped_ratings.show(10)

## Show the number of users in the dataset is approximately 138500

In [28]:
print(grouped_ratings.count())

## Read in Movies Dataset
### Movies File Description

Movie information is in the file “movies.dat” and is in the following format:

    movieId::Title::Genres

Titles are identical to titles provided by the IMDb and include the year of release

Genres are pipe-separated and are selected from the following genres:
* Action
* Adventure
* Animation
* Children's
* Comedy
* Crime
* Documentary
* Drama
* Fantasy
* Film-Noir
* Horror
* Musical
* Mystery
* Romance
* Sci-Fi
* Thriller
* War
* Western

Some MovieIDs do not correspond to a movie due to accidental duplicate entries and/or test entries. Movies are mostly entered by hand, so errors and inconsistencies may exist

## Read contents of "movies.dat" and show sample content

In [31]:
movies = spark.read.csv("/FileStore/tables/movies.csv",header="true")
from pyspark.sql.types import IntegerType
movies=movies.withColumn("movieId", movies["movieId"].cast(IntegerType()))
movies.show(5)

## Show the number of movies in the dataset is approximately 27000

In [33]:
movies.count()

## Split Ratings data into Training (80%) and Test (20%) datasets

In [35]:
train, test= ratings.randomSplit([0.8,0.2], seed=26)


## Show resulting Ratings dataset counts

In [37]:
print("Total no of records in ratings the dataset sample: ")
print(ratings.count())
print("Total no of train ratings records in the dataset sample:  ")
print(train.count())
print("Total no of test ratings records in the dataset sample: ")
print(test.count())

## Show sample of Ratings Training dataset

In [39]:
train.sample(withReplacement=False, fraction=0.0001, seed=3).show(5)

## Show sample of Ratings Test dataset

In [41]:
test.sample(withReplacement=False, fraction=0.0001, seed=3).show(5)

## Build the recommendation model on the training data using ALS

In [43]:
als = ALS().setMaxIter(10).setRegParam(0.01).setUserCol("userId").setItemCol("movieId").setRatingCol("rating")
model = als.fit(train)

## ALS Model Parameters

### Let's take a look at all the paramaters available for ALS, the default values of those parameters which we did not change, and valide the values of those parameters set above.

#### We specifically set

    MaxIter (maximum number of iterations) = 10 (which is the default)
    RegParam (regularization parameter) = 0.01 (default is 0.1)
    UserCol (column name for user ids) = “userID” (default is "user")
    ItemCol (column name for item ids) = “movieID” (default is "item")
    RatingCol (column for ratings) = “rating” (which is the default)

## Show the ALS explained parameters

In [46]:
print(als.explainParams())

## Run the model against the Test data and show a sample of the predictions

In [48]:
predictions = model.transform(test).na.drop()
predictions.show(10)

## Evaluate the model by computing the RMSE on the test data

The Spark ML evaluator for regression, RegressionEvaluator, expects two input columns: prediction and label. RegressionEvaluator supports “rmse” (default), “mse”, “r2”, and “mae”. 

We will use RMSE, which is the square root of the average of the square of all of the error. 
RMSE is an excellent general purpose error metric for numerical predictions.

In [51]:
evaluator = RegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction")
rmse = evaluator.evaluate(predictions)
print("Root Mean Square Error : ")
print(rmse)

## Show that a smaller value of rmse is better

This is obviously the case since RMSE is an aggregation of all the error. Thus evaluator.isLargerBetter should be 'false'.

In [53]:
evaluator.isLargerBetter()

## Tune the Model
Build a Parameter Grid specifying what parameters and values will be evaluated in order to determine the best combination.

Spark ML algorithms provide many hyperparameters for tuning models. These hyperparameters are distinct from the model parameters being optimized by ML itself. Hyperparameter tuning is accomplished by choosing the best set of parameters based on model performance on test data that the model was not trained with. All combinations of hyperparameters specified will be tried in order to find the one that leads to the model with the best evaluation result.

In this example, we will only be evaluating the ALS regularization parameter, regParam. In machine learning, regularization refers to a process of introducing additional information in order to prevent overfitting.

In [55]:
paramGrid = ParamGridBuilder().addGrid(als.regParam, [0.01, 0.1]).build()

## Create a cross validator to tune the model with the defined parameter grid

Cross-validation attempts to fit the underlying estimator with user-specified combinations of parameters, cross-evaluate the fitted models, and output the best one.

In k-fold cross-validation, the original sample is randomly partitioned into k equal sized subsamples. Of the k subsamples, a single subsample is retained as the validation data for testing the model, and the remaining k − 1 subsamples are used as training data. The cross-validation process is then repeated k times (the folds), with each of the k subsamples used exactly once as the validation data. The k results from the folds can then combined to produce a single estimation.

In [57]:
cv = CrossValidator().setEstimator(als).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(2)

## Validate the parameter grid values

In [59]:
cv.getEstimatorParamMaps()


## Cross-evaluate to find the best model

using the RMSE evaluator and hyperparameters specified in the parameter grid

In [61]:
cvModel = cv.fit(train)

In [62]:
predictions = cvModel.transform(test).na.drop()
predictions.show(10,False)

In [63]:
print(evaluator.evaluate(cvModel.transform(test).na.drop()))

## Now lets use our model to recommend movies to a random user
For this example we will be recommending movies to user with userID = 237.

In [65]:
userId = 237

## Create a DataFrame with the movies that user 237 has rated
First let's take a look at user 237's ratings in the ratings dataset.

In [67]:
movies_watched = ratings.filter(ratings.userId == userId)
movies_watched.show(10,False)

## Calculate  user 237's minimum, maximum and average movie rating

In [69]:
movies_watched.describe().show()

## Show user 237's top 10 rated movies (with movie title, genre, and rating)
To do this, we must join the ratings dataset with the movies dataset. The ratings dataset only contains the movieID. Only the movies dataset contains the movie title and genre.

We are joining the ratings and movies datasets based on the common movieID column, filtering on userID 237, and sorting in descending order by rating.

In [71]:
a = ratings.alias('a')
b = movies.alias('b')

In [72]:
a.filter(ratings.userId == userId).join(b, a.movieId == b.movieId).select(a.userId, a.movieId, b.title, b.genres, a.rating).sort(a.rating,ascending=False).show(10,False)

## Determining what movies user 237 has not already watched and rated so that we can make new movie recommendations

In order to make new movie recommendations from the list of movies in the movies dataset, we must first figure out which movies user 237 has not already watched.

In [74]:
movies_notwatched = ratings.filter(ratings.userId != userId)
movies_notwatched.sample(False, 0.0001, seed=0).show(10,False)
print("Total movie ratings in the ratings dataset not rated and not watched by user 237 = ")
print(movies_notwatched.count())

## Determining what movies user 237 has not already watched and rated so that we can make new movie recommendations - another attempt

This previous attempt at determining what movies user 237 has not already watched, did not really give us what we need. It simply provided all the movie ratings in the ratings dataset not rated by user 237. What we want, instead, is a list of all movies in the movies dataset that user 237 has not rated so that we can make new movie recommendations based on these movies which user 237 has not yet watched. We don't want to recommend movies that the user has already rated.

In order to do this, we will again need to join the ratings and movies datasets. However, unlike the the previous join we did, which was an inner join, this join needs to be an outer join as we want all the movies in the movies dataset that the user has not rated in the ratings dataset. Since the join order is ratings then movies, we specifically need to employ a right outer join. We will again use movieId as the join column, filter on userId 237

In [76]:
a = ratings.alias('a')
b = movies.alias('b')
movies_notwatched_movieId = a.filter(ratings.userId == userId).join(b, a.movieId == b.movieId, 'right').filter(a.movieId.isNull()).select(b.movieId,b.title).sort(b.movieId,ascending = True)
movies_notwatched_movieId.show(10,False)

## Let's check dataframe we just created
Let's check to see that the results of our right outer join make sense by looking at the row count of the resulting DataFrame. The number of movies not watched by the user from the movies dataset (the result of the right outer join) plus the number of movies rated by the user in the ratings dataset should equal the total number of movies in the movies dataset.

In [78]:
print("Total number of movies = ")
print(movies.count())
print("Number of movies rated by user = ")
print(ratings.filter(ratings.userId == userId).count())
print("Number of movies NOT rated & NOT watched by user = ")
print(movies_notwatched_movieId.count())

## Create input DataFrame to use with the model to recommend new movies
The ALS algorithm we used above to build the recommendation model requires two inputs for making predictions - userID and movieId. Look back at the ratings dataset we used earlier to test the model to confirm this.

Remember that the rating column in the ratings dataset is only used for training and evaluating the model, not for making predicitons.

In order to make the DataFrame resulting from the right outer join ready for input into the model to make movie predicitons for user 237, we need to add a userId column to the DataFrame with userId set to 237 for every movieId. As this is the dataset that predicitons will be made on, it must contain movieId and userID columns with values for the movies (those which the user has not already watched and rated - the result of the right outer join) and the userId (in this case 237) for whom we are making recommendations.

In [80]:

data_userId = movies_notwatched_movieId.withColumn("userId", lit(userId))
data_userId.show(10, False)
print(" Total number of movies not watched and not rated by user 237 = ")
print(data_userId.count())


## Generate top 10 movie recommendations for each user

In [82]:
userRecs = model.recommendForAllUsers(10)
print(userRecs.show())

##  Generate top 10 user recommendations for each movie

In [84]:
movieRecs = model.recommendForAllItems(10)
print(movieRecs.show())

## Generate top 10 movie recommendations for userID 237

In [86]:
#users = ratings.select(als.getUserCol()).distinct().limit(3)
users = ratings.filter(ratings.userId==userId)
userSubsetRecs = model.recommendForUserSubset(users,10)
print("Top 10 Movie Recommendations for 237 user ID is as follows in [MovieID, Prediction] format :")
print(userSubsetRecs.select("recommendations").show(10,False))


##  Conclusion:
As you can find above the top 10 movie recommendations for user 237 depicted in the form of Movie Id and Prediction Score

##### MovieId             Prediction

79410  ||  7.897084

73529  ||  7.3702297

107434 ||  6.6337686 

116155  || 6.584987

66927  ||  6.52591

80480  ||  6.5221

88313   || 6.5021305

103753  || 6.3608503

109253  || 6.3608503

113790  || 6.3608503