# Building a Recommendation System with Spark ML

In this notebook we explore creating a movie recommendation system using Spark ML. We will work with the MovieLens 1M dataset (http://grouplens.org/datasets/movielens). This dataset consists of more than 1 million ratings of approximately 4000 movies, made by approximately 6000 MovieLens users who joined MovieLens in 2000. MovieLens is a recommender system and virtual community website that uses collaborative filtering to recommend movies for its users to watch based on their film preferences. This benchmark dataset was released in February 2003.

A common task of recommender systems is to improve the customer experience through personalized recommendations based on prior user feedback. Collaborative filtering is a technique commonly used for recommender systems. It takes a "wisdom of the crowd" approach, generating recommendations for each user from the preferences of many users.

Spark ML supports an implementation of matrix factorization for collaborative filtering. Matrix factorization models have consistently been shown to perform well for collaborative filtering. The type of matrix factorization we will explore in this notebook is called explicit matrix factorization. In explicit matrix factorization, preferences provided directly by the users themselves (here, star ratings) are used, in contrast to implicit matrix factorization, where only implicit feedback (e.g. views, clicks, purchases, likes, shares) is used.

Collaborative filtering aims to fill in the missing entries of a user-item association matrix (for movie recommendations, the rows correspond to user IDs and the columns to movie IDs). Users (userId) and items (movieId) are each described by a small set of latent factors that can be used to predict the missing entries. Spark ML uses the Alternating Least Squares (ALS) algorithm to learn these latent factors for this matrix factorization problem. ALS works iteratively: it holds the item factors fixed and solves a least-squares regression problem for the user factors, then holds the user factors fixed and solves for the item factors, and repeats.
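
To make the alternating idea concrete, here is a toy sketch in plain Scala (no Spark) on six made-up ratings. It is not Spark's implementation. It assumes rank-1 factors, so each regularized least-squares subproblem collapses to a single division, whereas Spark ML's ALS uses rank-k factor vectors (rank 10 by default), solves a small k-by-k system per user and item, and distributes the work across blocks. The names ToyAls, fit, and lambda are illustrative. lambda plays a role similar to regParam, although Spark's implementation differs in details such as how the penalty is weighted.

```scala
// Toy rank-1 ALS on a handful of made-up explicit ratings (plain Scala, not Spark's implementation).
object ToyAls {
  // Observed (userIndex, itemIndex) -> rating; every other pair is a missing entry to predict.
  val ratings: Map[(Int, Int), Double] = Map(
    (0, 0) -> 5.0, (0, 1) -> 4.0,
    (1, 0) -> 4.0, (1, 2) -> 2.0,
    (2, 1) -> 5.0, (2, 2) -> 1.0
  )
  val numUsers = 3
  val numItems = 3

  // RMSE of the predictions u(i) * v(j) over the observed ratings only.
  def rmse(u: Array[Double], v: Array[Double]): Double = {
    val squaredErrors = ratings.toSeq.map { case ((i, j), r) => math.pow(r - u(i) * v(j), 2) }
    math.sqrt(squaredErrors.sum / squaredErrors.size)
  }

  // Rank-1 ridge least squares: minimize sum((r - x * y)^2) + lambda * x^2 over the scalar x.
  private def solve(obs: Seq[(Double, Double)], lambda: Double): Double =
    obs.map { case (r, y) => r * y }.sum / (obs.map { case (_, y) => y * y }.sum + lambda)

  def fit(iterations: Int, lambda: Double): (Array[Double], Array[Double]) = {
    val u = Array.fill(numUsers)(1.0)
    val v = Array.fill(numItems)(1.0)
    for (_ <- 1 to iterations) {
      // Hold the item factors fixed and solve for each user factor...
      for (i <- 0 until numUsers) {
        u(i) = solve(ratings.toSeq.collect { case ((`i`, j), r) => (r, v(j)) }, lambda)
      }
      // ...then hold the user factors fixed and solve for each item factor.
      for (j <- 0 until numItems) {
        v(j) = solve(ratings.toSeq.collect { case ((i, `j`), r) => (r, u(i)) }, lambda)
      }
    }
    (u, v)
  }
}

val (userFactors, itemFactors) = ToyAls.fit(iterations = 10, lambda = 0.01)
println(f"RMSE on observed ratings: ${ToyAls.rmse(userFactors, itemFactors)}%.3f")
// User 0 never rated item 2; the learned factors fill in that missing entry.
println(f"Predicted rating for user 0, item 2: ${userFactors(0) * itemFactors(2)}%.3f")
```

Each half-step minimizes the regularized squared error exactly for the factors it updates, so the objective never increases from one half-step to the next.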

![Factorization Graphic](https://encrypted-tbn1.gstatic.com/images?q=tbn:ANd9GcSID6SakBUeYVGD4VUJ06oJwnEtqeXfnicgBWu5n7fIDTY6HsHooA)

## Please note that this notebook requires Spark 1.6.0 or greater
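See JIRA SPARK-11284. This notebook was originally written using Spark 1.6.1. The current version creates DataFrames through the Spark 2.x SparkSession (spark), so as written it needs Spark 2.0 or later; the output shown was produced with Spark 2.1.2.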

## Verify Spark version and existence of Spark context

In [1]:
println("Spark version = " + sc.version)

Spark version = 2.1.2


## Import required Spark libraries

In [2]:
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
// Spark 2.x provides a SparkSession named spark; its implicits enable the $"column" syntax used below
import spark.implicits._
import sys.process._

## Download the MovieLens 1M dataset from http://grouplens.org/datasets/movielens

In [3]:
"rm -f ./ml-1m.zip".!
"wget http://files.grouplens.org/datasets/movielens/ml-1m.zip".!

--2018-04-23 21:44:56--  http://files.grouplens.org/datasets/movielens/ml-1m.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.34.235
Connecting to files.grouplens.org (files.grouplens.org)|128.101.34.235|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5917549 (5.6M) [application/zip]
Saving to: ‘ml-1m.zip’

2018-04-23 21:44:56 (22.5 MB/s) - ‘ml-1m.zip’ saved [5917549/5917549]



0

## Show that the MovieLens 1M dataset zip file is now on the local filesystem

In [4]:
"ls ./ml-1m.zip".!

./ml-1m.zip


0

## Unzip the MovieLens 1M dataset

In [5]:
"rm -rf ./ml-1m".!   // -f: no error if the directory does not exist yet (e.g. on the first run)
"unzip ml-1m.zip".!

Archive:  ml-1m.zip
   creating: ml-1m/
  inflating: ml-1m/movies.dat        
  inflating: ml-1m/ratings.dat       
  inflating: ml-1m/README            
  inflating: ml-1m/users.dat         


0

## Show the unzipped MovieLens 1M dataset files
### In this demo we will only be using the "ratings.dat" and "movies.dat" files. We will not use the "users.dat" file.

In [6]:
"ls ./ml-1m".!

movies.dat
ratings.dat
README
users.dat


0

## Read in Ratings Dataset
### Ratings File Description

All ratings are contained in the file “ratings.dat” and are in the following format:

    UserID::MovieID::Rating::Timestamp

    UserIDs range between 1 and 6040
    MovieIDs range between 1 and 3952
    Ratings are made on a 5-star scale (whole-star ratings only)
    Timestamp is represented in seconds since the epoch (1 January 1970)
    
Each user has at least 20 ratings.
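
The notebook parses this format with Spark below. As a plain-Scala illustration of the same format, the following sketch parses one line into a typed record and ignores the timestamp. RatingRecord and RatingsParser are illustrative names, not part of Spark. Returning None for malformed lines is a design choice of this sketch, not something the notebook does.

```scala
// Illustrative record type for one line of ratings.dat (not part of Spark).
final case class RatingRecord(userId: Int, movieId: Int, rating: Float)

object RatingsParser {
  // Parse "UserID::MovieID::Rating::Timestamp"; the timestamp field is matched but ignored.
  // Lines with the wrong number of fields or non-numeric values yield None instead of throwing.
  def parse(line: String): Option[RatingRecord] = line.split("::") match {
    case Array(user, movie, rating, _) =>
      try Some(RatingRecord(user.trim.toInt, movie.trim.toInt, rating.trim.toFloat))
      catch { case _: NumberFormatException => None }
    case _ => None
  }
}

// Illustrative input: one well-formed line (timestamp value made up) and one malformed line.
Seq("3000::733::5::965000000", "not a rating line").map(RatingsParser.parse).foreach(println)
```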



## Read contents of "ratings.dat" and show sample content

In [7]:
import org.apache.spark.sql.types.{StructField, StructType, IntegerType, FloatType, StringType}
val ratingsSchema = StructType(Array(StructField("userId", StringType, true), StructField("movieId", StringType, true), StructField("rating", StringType, true)));

In [8]:
val ratings_raw = sc.textFile("./ml-1m/ratings.dat")

In [9]:
import org.apache.spark.sql.Row
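// Split each line on "::" and keep userId, movieId and rating (the trailing timestamp is dropped)
val ratings_RDD = ratings_raw.map(line => Row.fromSeq(line.split("::").take(3)))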

## Convert ratings data to a DataFrame
The timestamp is not needed, so the schema declares only the first three fields.

In [10]:
val ratings_df = spark.createDataFrame(ratings_RDD, ratingsSchema)

In [11]:
import org.apache.spark.sql.functions.col
// Cast the string columns to the numeric types ALS expects, keeping the column names and order
val ratings = ratings_df.select(
  col("userId").cast(IntegerType).as("userId"),
  col("movieId").cast(IntegerType).as("movieId"),
  col("rating").cast(FloatType).as("rating"))

In [12]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)



## Show that the number of ratings in the dataset is slightly more than one million

In [13]:
println("Number of ratings = " + ratings_raw.count())

Number of ratings = 1000209                                                     


## Show a sample of the Ratings DataFrame

In [14]:
ratings.sample(false, 0.0001, seed=0).show(10)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|    68|   2908|   5.0|
|   173|   3730|   5.0|
|   456|   2917|   2.0|
|   526|    589|   4.0|
|   533|   2348|   3.0|
|   588|   1285|   4.0|
|   711|   1206|   4.0|
|   730|   3361|   4.0|
|   779|   3203|   5.0|
|   843|   1196|   4.0|
+------+-------+------+
only showing top 10 rows



## Show a sample of the number of ratings per user

In [15]:
val grouped_ratings = ratings.groupBy("userId").count().withColumnRenamed("count", "No. of ratings")
grouped_ratings.show(10)

+------+--------------+                                                         
|userId|No. of ratings|
+------+--------------+
|   148|           624|
|   463|           123|
|   471|           105|
|   496|           119|
|   833|            21|
|  1088|          1176|
|  1238|            45|
|  1342|            92|
|  1580|            37|
|  1591|           314|
+------+--------------+
only showing top 10 rows



## Show that the number of users in the dataset is approximately 6000

In [16]:
println("Number of users = " + grouped_ratings.count())

Number of users = 6040                                                          


## Read in Movies Dataset
### Movies File Description

Movie information is in the file “movies.dat” and is in the following format:

    movieId::Title::Genres

Titles are identical to titles provided by the IMDb and include the year of release.

Genres are pipe-separated and are selected from the following genres:
* Action
* Adventure
* Animation
* Children's
* Comedy
* Crime
* Documentary
* Drama
* Fantasy
* Film-Noir
* Horror
* Musical
* Mystery
* Romance
* Sci-Fi
* Thriller
* War
* Western

Some MovieIDs do not correspond to a movie due to accidental duplicate entries and/or test entries. Movies are mostly entered by hand, so errors and inconsistencies may exist.
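
One practical detail of this format: the genre field is pipe-separated, and String.split takes a regular expression, so the pipe has to be escaped. The sketch below (plain Scala; MovieRecord and MoviesParser are illustrative names) splits a movies.dat line into its fields, splits the genres, and extracts the release year from the title. It assumes the year always appears in parentheses at the end of the title, as the description above states.

```scala
import scala.util.Try

// Illustrative record type for one line of movies.dat (not part of Spark).
final case class MovieRecord(movieId: Int, title: String, year: Option[Int], genres: List[String])

object MoviesParser {
  // Release year in parentheses at the very end of the title, e.g. "Heat (1995)".
  private val YearAtEnd = """\((\d{4})\)\s*$""".r.unanchored

  def parse(line: String): Option[MovieRecord] = line.split("::") match {
    case Array(id, title, genres) =>
      val year = title match {
        case YearAtEnd(y) => Some(y.toInt)
        case _            => None
      }
      // String.split takes a regular expression, so the pipe must be escaped;
      // split("|") would split between every character instead.
      Try(id.trim.toInt).toOption.map(movieId => MovieRecord(movieId, title, year, genres.split("\\|").toList))
    case _ => None
  }
}

val sampleMovies = Seq("1::Toy Story (1995)::Animation|Children's|Comedy", "6::Heat (1995)::Action|Crime|Thriller").flatMap(line => MoviesParser.parse(line))
sampleMovies.foreach(println)
```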



## Read contents of "movies.dat" and show sample content

In [17]:
val movies_raw = sc.textFile("./ml-1m/movies.dat")
movies_raw.takeSample(false,10, seed=0).foreach(println)

1713::Mouse Hunt (1997)::Children's|Comedy
659::Purple Noon (1960)::Crime|Thriller
914::My Fair Lady (1964)::Musical|Romance
684::Windows (1980)::Drama
129::Pie in the Sky (1995)::Comedy|Romance
219::Cure, The (1995)::Drama
1026::So Dear to My Heart (1949)::Children's|Drama
2015::Absent Minded Professor, The (1961)::Children's|Comedy|Fantasy
353::Crow, The (1994)::Action|Romance|Thriller
1891::Ugly, The (1997)::Horror|Thriller


## Convert movies data to a DataFrame

In [18]:
val moviesSchema = StructType(Array(StructField("movieId", StringType, true), StructField("Title", StringType, true), StructField("Genre", StringType, true)));
val movies_RDD = movies_raw.map(line => Row.fromSeq(line.split("::")))
val movies_df = spark.createDataFrame(movies_RDD, moviesSchema)
val movies = movies_df.withColumn("movieIdTemp", movies_df("movieId").cast(IntegerType)).drop("movieId").withColumnRenamed("movieIdTemp", "movieId")

In [19]:
movies.show(10, false)

+----------------------------------+----------------------------+-------+
|Title                             |Genre                       |movieId|
+----------------------------------+----------------------------+-------+
|Toy Story (1995)                  |Animation|Children's|Comedy |1      |
|Jumanji (1995)                    |Adventure|Children's|Fantasy|2      |
|Grumpier Old Men (1995)           |Comedy|Romance              |3      |
|Waiting to Exhale (1995)          |Comedy|Drama                |4      |
|Father of the Bride Part II (1995)|Comedy                      |5      |
|Heat (1995)                       |Action|Crime|Thriller       |6      |
|Sabrina (1995)                    |Comedy|Romance              |7      |
|Tom and Huck (1995)               |Adventure|Children's        |8      |
|Sudden Death (1995)               |Action                      |9      |
|GoldenEye (1995)                  |Action|Adventure|Thriller   |10     |
+----------------------------------+----------------------------+-------+
only showing top 10 rows

In [20]:
movies.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- movieId: integer (nullable = true)



## Show that the number of movies in the dataset is approximately 4000

In [21]:
println("Number of movies = " + movies.count())

Number of movies = 3883


## Split Ratings data into Training (80%) and Test (20%) datasets

In [22]:
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2), seed=0L)

## Show resulting Ratings dataset counts

In [23]:
val trainingRatio = training.count().toDouble/ratings.count().toDouble*100
val testRatio = test.count().toDouble/ratings.count().toDouble*100

println("Total number of ratings = " + ratings.count())
println("Training dataset count = " + training.count() + ", " + BigDecimal(trainingRatio).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble + "%")
println("Test dataset count = " + test.count() + ", " + BigDecimal(testRatio).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble+ "%")

Total number of ratings = 1000209                                               
Training dataset count = 799809, 79.96%                                         
Test dataset count = 200400, 20.04%                                             
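
Note that randomSplit does not cut the data at exactly 80%. As we understand Spark's implementation, each row is assigned independently based on a seeded random draw, which is why the counts above come out as 79.96% / 20.04% rather than exactly 80/20. The plain-Scala sketch below (RandomSplitSketch is an illustrative name) reproduces that per-row behavior. It will not select the same rows as Spark for the same seed.

```scala
import scala.util.Random

object RandomSplitSketch {
  // Draw one seeded uniform number per element; draws below trainFraction go to training.
  def split[A](data: Seq[A], trainFraction: Double, seed: Long): (Vector[A], Vector[A]) = {
    val rng = new Random(seed)
    val tagged = data.toVector.map(x => (x, rng.nextDouble()))
    val (train, test) = tagged.partition { case (_, draw) => draw < trainFraction }
    (train.map(_._1), test.map(_._1))
  }
}

// 100000 illustrative row ids; the training share lands close to, but usually not exactly at, 80%.
val (trainIds, testIds) = RandomSplitSketch.split(1 to 100000, trainFraction = 0.8, seed = 0L)
println(f"training: ${trainIds.size} (${100.0 * trainIds.size / 100000}%.2f%%), test: ${testIds.size}")
```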


## Show sample of Ratings Training dataset

In [24]:
training.sample(false, 0.0001, seed=0).show(10)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|    93|   2058|   5.0|
|   204|   2424|   3.0|
|   555|   1136|   4.0|
|   654|   3699|   2.0|
|   669|   2693|   5.0|
|   724|   2692|   4.0|
|   877|   2291|   4.0|
|   899|   1270|   4.0|
|   973|    246|   5.0|
|  1038|   3505|   5.0|
+------+-------+------+
only showing top 10 rows



## Show sample of Ratings Test dataset

In [25]:
test.sample(false, 0.0001, seed=0).show(10)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|   329|   2436|   3.0|
|   791|     34|   5.0|
|  2000|    110|   5.0|
|  2288|   1020|   2.0|
|  2386|   2093|   3.0|
|  2740|   2153|   1.0|
|  3393|   1610|   5.0|
|  3445|   3112|   2.0|
|  3547|   3260|   3.0|
|  4036|      3|   4.0|
+------+-------+------+
only showing top 10 rows



## Build the recommendation model on the training data using ALS

In [26]:
val als = new ALS().setMaxIter(10).setRegParam(0.01).setUserCol("userId").setItemCol("movieId").setRatingCol("rating")
val model = als.fit(training)

## ALS Model Parameters

### Let's take a look at all the parameters available for ALS, the default values of the parameters we did not change, and validate the values of the parameters set above.

#### We specifically set

    MaxIter (maximum number of iterations) = 10 (which is the default)
    RegParam (regularization parameter) = 0.01 (default is 0.1)
    UserCol (column name for user ids) = "userId" (default is "user")
    ItemCol (column name for item ids) = "movieId" (default is "item")
    RatingCol (column name for ratings) = "rating" (which is the default)



In [27]:
println(als.explainParams)

alpha: alpha for implicit preference (default: 1.0)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations (default: 10)
finalStorageLevel: StorageLevel for ALS model factors. (default: MEMORY_AND_DISK)
implicitPrefs: whether to use implicit preference (default: false)
intermediateStorageLevel: StorageLevel for intermediate datasets. Cannot be 'NONE'. (default: MEMORY_AND_DISK)
itemCol: column name for item ids. Ids must be within the integer value range. (default: item, current: movieId)
maxIter: maximum number of iterations (>= 0) (default: 10, current: 10)
nonnegative: whether to use nonnegative constraint for least squares (default: false)
numItemBlocks: number of item blocks (default: 10)
numUserBlocks: number of user blocks (default: 10)
predictionCol: prediction column name (default: prediction)
rank: rank of the factorization (default: 10)
ratingCol: column name for ratings (default: 

## Run the model against the Test data and show a sample of the predictions

In [28]:
val predictions = model.transform(test).na.drop()
predictions.show(10)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|  1605|    148|   2.0| 1.8921044|
|    53|    148|   5.0| 3.3562677|
|  2507|    148|   4.0| 2.9727654|
|  3717|    463|   2.0|  3.038925|
|  3650|    463|   2.0| 3.2706196|
|  3328|    463|   4.0| 2.5866077|
|  4252|    463|   3.0| 3.2075772|
|   331|    463|   4.0| 2.5212603|
|  4040|    463|   1.0| 1.9798533|
|  3753|    463|   2.0| 2.0343888|
+------+-------+------+----------+
only showing top 10 rows



## Evaluate the model by computing the RMSE on the test data

The Spark ML evaluator for regression, RegressionEvaluator, expects two input columns: prediction and label. RegressionEvaluator supports “rmse” (default), “mse”, “r2”, and “mae”. 

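We will use RMSE, which is the square root of the mean of the squared errors (the differences between the actual and predicted ratings).
RMSE is a common general-purpose error metric for numerical predictions: it is expressed in the same units as the ratings, and it penalizes large errors more heavily than MAE does.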
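
The formula is small enough to write out directly. This plain-Scala sketch (ErrorMetrics is an illustrative name, no Spark) computes RMSE and MAE over (label, prediction) pairs. The made-up example has errors of 1, 0 and 3, so RMSE = sqrt(10/3) ≈ 1.83 while MAE = 4/3 ≈ 1.33, which shows how RMSE weights the single large error more heavily.

```scala
object ErrorMetrics {
  // RMSE: square root of the mean of the squared (label - prediction) errors.
  def rmse(pairs: Seq[(Double, Double)]): Double = {
    require(pairs.nonEmpty, "need at least one (label, prediction) pair")
    math.sqrt(pairs.map { case (label, pred) => (label - pred) * (label - pred) }.sum / pairs.size)
  }

  // MAE, for comparison: the mean of the absolute errors.
  def mae(pairs: Seq[(Double, Double)]): Double = {
    require(pairs.nonEmpty, "need at least one (label, prediction) pair")
    pairs.map { case (label, pred) => math.abs(label - pred) }.sum / pairs.size
  }
}

// Made-up (rating, prediction) pairs with errors of 1, 0 and 3.
val examplePairs = Seq((4.0, 3.0), (2.0, 2.0), (5.0, 2.0))
println(f"RMSE = ${ErrorMetrics.rmse(examplePairs)}%.2f, MAE = ${ErrorMetrics.mae(examplePairs)}%.2f")
```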

In [29]:
val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println("Root-mean-square error = " + rmse)



## Show that a smaller value of RMSE is better

RMSE measures prediction error, so a lower value means the predictions are closer to the actual ratings. Thus evaluator.isLargerBetter should be 'false'.

In [30]:
evaluator.isLargerBetter

false

## Tune the Model
Build a parameter grid specifying which parameters and values will be evaluated in order to determine the best combination.

Spark ML algorithms provide many hyperparameters for tuning models. These hyperparameters are set before training and are distinct from the model parameters (for ALS, the latent factors) that are learned during training. Hyperparameter tuning chooses the best set of hyperparameters based on model performance on held-out data that the model was not trained on. Every combination of the hyperparameter values specified in the grid is tried, and the one that leads to the model with the best evaluation result is selected.

In this example, we will only tune the ALS regularization parameter, regParam. In machine learning, regularization refers to adding a penalty on model complexity (for ALS, on the size of the user and item factor vectors) in order to prevent overfitting.
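
ParamGridBuilder expands the values passed to addGrid into every combination, that is, the Cartesian product. The plain-Scala sketch below mimics that expansion, using an illustrative String -> Double map per combination (ParamGridSketch is a hypothetical name, not Spark's API). Adding a second, hypothetical "rank" axis with two values shows how the grid grows multiplicatively: 2 × 2 = 4 models to fit per fold.

```scala
object ParamGridSketch {
  // Expand named hyperparameter value lists into every combination (the Cartesian product).
  def build(grid: Seq[(String, Seq[Double])]): Seq[Map[String, Double]] =
    grid.foldLeft(Seq(Map.empty[String, Double])) { case (combos, (name, values)) =>
      for (combo <- combos; value <- values) yield combo + (name -> value)
    }
}

// The notebook's grid: one hyperparameter with two values gives 2 combinations.
val regParamOnly = ParamGridSketch.build(Seq("regParam" -> Seq(0.01, 0.1)))
// Hypothetical larger grid: a second axis ("rank", two values) gives 2 x 2 = 4 combinations.
val withRank = ParamGridSketch.build(Seq("regParam" -> Seq(0.01, 0.1), "rank" -> Seq(10.0, 20.0)))
withRank.foreach(println)
```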

In [31]:
val paramGrid = new ParamGridBuilder().addGrid(als.regParam, Array(0.01, 0.1)).build()

## Create a cross validator to tune the model with the defined parameter grid

Cross-validation fits the underlying estimator with each user-specified combination of parameters, evaluates the fitted models on held-out data, and outputs the best one.

In k-fold cross-validation, the original sample is randomly partitioned into k (roughly) equal-sized subsamples. Of the k subsamples, a single subsample is retained as the validation data for testing the model, and the remaining k − 1 subsamples are used as training data. The cross-validation process is then repeated k times (the folds), with each of the k subsamples used exactly once as the validation data. The k results from the folds are then averaged to produce a single estimate for each parameter combination. This notebook uses k = 2.
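
A plain-Scala sketch of the k-fold split described above: shuffle once with a seed, then assign the element at shuffled position i to fold i mod k. This is an illustrative assumption, not Spark's exact procedure (CrossValidator assigns rows to folds randomly, so its folds are only approximately equal in size). KFoldSketch is a hypothetical name.

```scala
import scala.util.Random

object KFoldSketch {
  // Shuffle once with a seed, then put the element at shuffled position idx into fold idx % k.
  // Returns k (training, validation) pairs; every element is used for validation exactly once.
  def kFold[A](data: Seq[A], k: Int, seed: Long): Seq[(Vector[A], Vector[A])] = {
    require(k >= 2, "k must be at least 2")
    val shuffled = new Random(seed).shuffle(data.toVector).zipWithIndex
    (0 until k).map { fold =>
      val (validation, training) = shuffled.partition { case (_, idx) => idx % k == fold }
      (training.map(_._1), validation.map(_._1))
    }
  }
}

// k = 2, as in the notebook's setNumFolds(2), on ten illustrative row ids.
val folds = KFoldSketch.kFold(1 to 10, k = 2, seed = 42L)
folds.zipWithIndex.foreach { case ((train, valid), i) =>
  println(s"fold $i: train=${train.sorted.mkString(",")} validation=${valid.sorted.mkString(",")}")
}
```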


In [32]:
val cv = new CrossValidator().setEstimator(als).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(2)

## Validate the parameter grid values

In [33]:
cv.getEstimatorParamMaps.foreach(println)

{
	als_e4383e2e19ae-regParam: 0.01
}
{
	als_e4383e2e19ae-regParam: 0.1
}


## Cross-evaluate to find the best model

The cross validator uses the RMSE evaluator and the hyperparameters specified in the parameter grid.


In [34]:
val cvModel = cv.fit(training)

In [35]:
println("Best fit root-mean-square error = " + evaluator.evaluate(cvModel.transform(test).na.drop()))

Best fit root-mean-square error = 0.8938252623053243                            


## Now let's use our model to recommend movies to a user
For this example we will be recommending movies to the user with userId = 3000.

In [36]:
val userId = 3000

## Create a DataFrame with the movies that user 3000 has rated
First let's take a look at user 3000's ratings in the ratings dataset.

In [37]:
val movies_watched = ratings.filter(ratings("userId") === userId)
movies_watched.show(10)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|  3000|   2987|   4.0|
|  3000|   2990|   3.0|
|  3000|   3793|   3.0|
|  3000|   1252|   4.0|
|  3000|   2997|   4.0|
|  3000|   1259|   3.0|
|  3000|    589|   4.0|
|  3000|      9|   1.0|
|  3000|   1265|   5.0|
|  3000|    733|   5.0|
+------+-------+------+
only showing top 10 rows



## Calculate user 3000's minimum, maximum and average movie rating

In [38]:
movies_watched.select(min($"rating"), max($"rating"), avg($"rating") ).show()

+-----------+-----------+------------------+
|min(rating)|max(rating)|       avg(rating)|
+-----------+-----------+------------------+
|        1.0|        5.0|3.2641509433962264|
+-----------+-----------+------------------+



## Show user 3000's top 10 rated movies (with movie title, genre, and rating)
To do this, we must join the ratings dataset with the movies dataset: the ratings dataset only contains the movieId, and only the movies dataset contains the movie title and genre.

We join the ratings and movies datasets on the common movieId column, filter on userId 3000, and sort in descending order by rating.
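
For intuition, the same inner join can be sketched with plain Scala collections: filter one user's ratings, look each movieId up in a movieId -> title map (dropping ids with no match, as an inner join does), and sort by rating. The case class, function, and data below are illustrative assumptions, not part of the notebook.

```scala
// Illustrative types and data; not the notebook's DataFrames.
final case class UserRating(userId: Int, movieId: Int, rating: Double)

object JoinSketch {
  // Inner join of one user's ratings with a movieId -> title lookup, highest rating first.
  // Ratings whose movieId has no title are dropped, as an inner join would drop them.
  def topRatedTitles(ratings: Seq[UserRating], titles: Map[Int, String], userId: Int, n: Int): Seq[(Int, String, Double)] =
    ratings
      .filter(_.userId == userId)
      .flatMap(r => titles.get(r.movieId).map(title => (r.movieId, title, r.rating)))
      .sortBy { case (movieId, _, rating) => (-rating, movieId) }
      .take(n)
}

val sampleRatings = Seq(UserRating(3000, 733, 5.0), UserRating(3000, 9, 1.0), UserRating(3000, 42, 4.0), UserRating(1, 733, 2.0))
val sampleTitles = Map(733 -> "Rock, The (1996)", 9 -> "Sudden Death (1995)")
JoinSketch.topRatedTitles(sampleRatings, sampleTitles, userId = 3000, n = 10).foreach(println)
```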

In [39]:
ratings.as('a).filter(ratings("userId") === userId).join(movies.as('b), $"a.movieId" === $"b.movieId").select("a.userId", "a.movieId", "b.Title", "b.Genre", "a.rating").sort($"a.rating".desc).show(10,false)

+------+-------+------------------------------+-------------------------------+------+
|userId|movieId|Title                         |Genre                          |rating|
+------+-------+------------------------------+-------------------------------+------+
|3000  |733    |Rock, The (1996)              |Action|Adventure|Thriller      |5.0   |
|3000  |3552   |Caddyshack (1980)             |Comedy                         |5.0   |
|3000  |2968   |Time Bandits (1981)           |Adventure|Fantasy|Sci-Fi       |5.0   |
|3000  |34     |Babe (1995)                   |Children's|Comedy|Drama        |5.0   |
|3000  |1197   |Princess Bride, The (1987)    |Action|Adventure|Comedy|Romance|5.0   |
|3000  |1307   |When Harry Met Sally... (1989)|Comedy|Romance                 |5.0   |
|3000  |590    |Dances with Wolves (1990)     |Adventure|Drama|Western        |5.0   |
|3000  |1199   |Brazil (1985)                 |Sci-Fi                         |5.0   |
|3000  |1653   |Gattaca (1997)                |Drama|Sci-Fi|Thriller          |5.0   |
|3000  |1265   |Groundhog Day (1993)       

## Determining what movies user 3000 has not already watched and rated so that we can make new movie recommendations

In order to make new movie recommendations from the list of movies in the movies dataset, we must first figure out which movies user 3000 has not already watched.

In [40]:
// =!= is the Spark 2.x inequality operator (!== is deprecated since Spark 2.0)
val movies_notwatched = ratings.filter(ratings("userId") =!= userId)
movies_notwatched.sample(false, 0.0001, seed=0).show(5)
println("Count = " + movies_notwatched.count())

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|    68|   2908|   5.0|
|   173|   3730|   5.0|
|   456|   2917|   2.0|
|   526|    589|   4.0|
|   533|   2348|   3.0|
+------+-------+------+
only showing top 5 rows

Count = 1000103                                                                 


## Determining what movies user 3000 has not already watched and rated so that we can make new movie recommendations - another attempt

This previous attempt at determining which movies user 3000 has not already watched did not really give us what we need. It simply returned all the ratings in the ratings dataset made by users other than user 3000. What we want instead is a list of all movies in the movies dataset that user 3000 has not rated, so that we can make new movie recommendations from those movies. We don't want to recommend movies that the user has already rated.

To do this, we again need to join the ratings and movies datasets. However, unlike the previous join, which was an inner join, this join needs to be an outer join, because we want all the movies in the movies dataset, including those the user has not rated. Since the join order is ratings then movies, we specifically need a right outer join. We again use movieId as the join column, filter the ratings on userId 3000 before the join, and keep only the rows where the ratings side of the join is null (the movies user 3000 has not rated).
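
Conceptually, the right outer join followed by the isNull filter computes a set difference (an anti-join): all movie IDs minus the IDs the user has rated. The plain-Scala sketch below shows that idea on made-up data; AntiJoinSketch is an illustrative name. In Spark 2.x the same result could also be expressed directly with the "left_anti" join type, joining movies against the user's ratings on movieId.

```scala
object AntiJoinSketch {
  // Movies the user has not rated: all movie ids minus the ids that appear in the user's ratings.
  // `ratings` holds illustrative (userId, movieId) pairs.
  def unratedMovieIds(allMovieIds: Seq[Int], ratings: Seq[(Int, Int)], userId: Int): Seq[Int] = {
    val rated = ratings.collect { case (`userId`, movieId) => movieId }.toSet
    allMovieIds.filterNot(rated).sorted
  }
}

val catalog = 1 to 12                                          // illustrative movie ids
val someRatings = Seq((3000, 9), (3000, 10), (1, 1), (1, 9))   // illustrative (userId, movieId) pairs
val unrated = AntiJoinSketch.unratedMovieIds(catalog, someRatings, userId = 3000)
val ratedCount = someRatings.count(_._1 == 3000)
println(unrated.mkString(", "))
// Same sanity check as the notebook: rated + unrated = total, when every rated id is in the catalog.
println(s"total=${catalog.size}, rated=$ratedCount, unrated=${unrated.size}")
```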

In [41]:
val movies_notwatched_movieId = ratings.filter(ratings("userId") === userId).as('a).join(movies.as('b), $"a.movieId" === $"b.movieId", "right").filter($"a.movieId".isNull).select($"b.movieId", $"b.Title").sort($"b.movieId".asc)
movies_notwatched_movieId.show(10, false)

+-------+----------------------------------+
|movieId|Title                             |
+-------+----------------------------------+
|1      |Toy Story (1995)                  |
|2      |Jumanji (1995)                    |
|3      |Grumpier Old Men (1995)           |
|4      |Waiting to Exhale (1995)          |
|5      |Father of the Bride Part II (1995)|
|6      |Heat (1995)                       |
|7      |Sabrina (1995)                    |
|8      |Tom and Huck (1995)               |
|11     |American President, The (1995)    |
|12     |Dracula: Dead and Loving It (1995)|
+-------+----------------------------------+
only showing top 10 rows



## Let's do a check on the DataFrame we just created
Let's check to see that the results of our right outer join make sense by looking at the row count of the resulting DataFrame. The number of movies not watched by the user from the movies dataset (the result of the right outer join) plus the number of movies rated by the user in the ratings dataset should equal the total number of movies in the movies dataset.


In [42]:
println("Total number of movies = " + movies.count())
println("Number of movies rated by user = " + ratings.filter(ratings("userId") === userId).count())
println("Number of movies NOT rated by user = " + movies_notwatched_movieId.count())

Total number of movies = 3883
Number of movies rated by user = 106                                            
Number of movies NOT rated by user = 3777                                       


## Create input DataFrame to use with the model to recommend new movies
The ALS model we built above requires two inputs for making predictions: userId and movieId. Look back at the test dataset we used earlier to evaluate the model to confirm this.

Remember that the rating column in the ratings dataset is only used for training and evaluating the model, not for making predictions.

To make the DataFrame resulting from the right outer join ready for input into the model to make movie predictions for user 3000, we need to add a userId column set to 3000 for every movieId. Because this is the dataset the predictions will be made on, it must contain movieId and userId columns, with values for the movies user 3000 has not already watched and rated (the result of the right outer join) and for the userId (in this case 3000) for whom we are making recommendations.


In [43]:
val data_userId = movies_notwatched_movieId.withColumn("userId", lit(userId))
data_userId.show(10, false)

+-------+----------------------------------+------+
|movieId|Title                             |userId|
+-------+----------------------------------+------+
|1      |Toy Story (1995)                  |3000  |
|2      |Jumanji (1995)                    |3000  |
|3      |Grumpier Old Men (1995)           |3000  |
|4      |Waiting to Exhale (1995)          |3000  |
|5      |Father of the Bride Part II (1995)|3000  |
|6      |Heat (1995)                       |3000  |
|7      |Sabrina (1995)                    |3000  |
|8      |Tom and Huck (1995)               |3000  |
|11     |American President, The (1995)    |3000  |
|12     |Dracula: Dead and Loving It (1995)|3000  |
+-------+----------------------------------+------+
only showing top 10 rows



## We are now ready to make movie recommendations!
We will run our model against the DataFrame we just created, outputting the movie title and the rating predicted by the model for user 3000. The results are sorted by predicted rating and limited to the top ten, so we can give the user ten movie recommendations.
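
An ALS model predicts a rating for a (user, movie) pair as the dot product of the user's and the movie's learned latent factor vectors. Ranking the candidate movies by that score is all a top-10 recommendation needs. The dot product is not clamped to the 1-5 rating scale, so predicted values above 5 are possible and are best read as ranking scores. The sketch below uses made-up rank-2 factors and illustrative names (TopNSketch, recommend). It scores every movie the user has not rated and keeps the n best.

```scala
object TopNSketch {
  // Predicted score for a (user, movie) pair: dot product of their latent factor vectors.
  def dot(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "factor vectors must have the same rank")
    a.zip(b).map { case (x, y) => x * y }.sum
  }

  // Score every movie not in `exclude` and keep the n highest scores (ties broken by movieId).
  def recommend(userFactor: Array[Double], movieFactors: Map[Int, Array[Double]],
                exclude: Set[Int], n: Int): Seq[(Int, Double)] =
    movieFactors.toSeq
      .filterNot { case (movieId, _) => exclude(movieId) }
      .map { case (movieId, factors) => (movieId, dot(userFactor, factors)) }
      .sortBy { case (movieId, score) => (-score, movieId) }
      .take(n)
}

// Made-up rank-2 factors; movie 3 is excluded because the user has already rated it.
val user = Array(1.0, 2.0)
val movieFactors = Map(1 -> Array(1.0, 0.0), 2 -> Array(0.0, 1.0), 3 -> Array(2.0, 2.0), 4 -> Array(2.0, 2.5))
val recs = TopNSketch.recommend(user, movieFactors, exclude = Set(3), n = 2)
recs.foreach(println)
```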

In [44]:
val predictions_userId = cvModel.transform(data_userId).na.drop()
// show() returns Unit, so display the top 10 directly instead of assigning the result to a val
predictions_userId.select("Title", "prediction").sort($"prediction".desc).show(10, false)

+--------------------------------------------------+----------+
|Title                                             |prediction|
+--------------------------------------------------+----------+
|Still Crazy (1998)                                |7.2666903 |
|Algiers (1938)                                    |7.1671004 |
|My Name Is Joe (1998)                             |6.7693667 |
|Hell in the Pacific (1968)                        |6.3978605 |
|Last Days, The (1998)                             |6.3157787 |
|Star Maker, The (Uomo delle stelle, L') (1995)    |6.3082347 |
|One Magic Christmas (1985)                        |6.2919774 |
|My Best Fiend (Mein liebster Feind) (1999)        |5.9928555 |
|Spirits of the Dead (Tre Passi nel Delirio) (1968)|5.991328  |
|Office Killer (1997)                              |5.9232535 |
+--------------------------------------------------+----------+
only showing top 10 rows



## Conclusion
This notebook was intended to illustrate how to use spark.ml machine learning with DataFrames to create a recommendation system for making movie recommendations to a user. The recommendations are based on that user's prior ratings feedback in relation to those of the entire user community who have rated other movies that we would like to recommend to the user. This notebook also demonstrates how to use the DataFrame API to inspect and transform the input datasets in support of the machine learning approach employed.

Please note that although this demo illustrates how to tune the model for better fit, no attempt was made to actually optimize to the best possible model. The intent was simply to show the methodology.

This notebook is self-contained. That is, all data required to run the notebook is downloaded from within the notebook itself, specifically the "ratings.dat" and "movies.dat" files from the MovieLens 1M dataset.


![IBM Logo](http://www-03.ibm.com/press/img/Large_IBM_Logo_TN.jpg)

Rich Tarro  
Big Data Architect, IBM Corporation  
email: rtarro@us.ibm.com

June 7, 2016  
Updated April 23, 2018