# Building a Recommendation System in PySpark 
# Stage 1: Data Collection and Pre-processing

## Objectives:
* Demonstrate a basic understanding of how recommendation systems are used to personalize services.
* Download and store MovieLens data from within the Python environment.
* Parse and filter the dataset, storing the required features in RDDs.
* Save the preprocessed dataset for later use.

## Introduction

Recommender (or recommendation) systems are among the most successful applications of machine learning in the Big Data domain. Such systems are integral to the success of Amazon (books, items), Pandora/Spotify (music), Google (news, search), YouTube (videos) and others. For Amazon, these systems reportedly bring in more than 30% of revenue. For Netflix, reportedly 75% of what people watch comes from some sort of recommendation.

> The goal of Recommendation Systems is to find what is likely to be of interest to the user. This enables organizations to offer a high level of personalization and customer tailored services.

For online video content services like Netflix and Hulu, building robust movie recommendation systems is extremely important. A simple example of how such a system works:

    User A watches Game of Thrones and Breaking Bad.
    User B performs a search query for Game of Thrones.
    The system suggests Breaking Bad from data collected about user A.
    

This lab guides you step by step through using the MovieLens dataset to build a movie recommendation system based on collaborative filtering, with Spark's Alternating Least Squares implementation.

## Movie-lens Data Collection

GroupLens Research, a social computing research lab at the University of Minnesota, has collected and made available rating datasets from the [MovieLens website](http://movielens.org/). The datasets were collected over various periods of time and can be downloaded directly from [this location](http://grouplens.org/datasets/movielens/).

A data dictionary with details on the individual datasets and their features can be viewed [HERE](http://files.grouplens.org/datasets/movielens/ml-20m-README.html).

For our experiment, we shall download the latest datasets directly from the website in zip format.

* **Small Dataset**: 100,000 ratings and 1,300 tag applications applied to 9,000 movies by 700 users. Last updated 10/2016.
* **Complete Dataset**: 26,000,000 ratings and 750,000 tag applications applied to 45,000 movies by 270,000 users. Last updated 8/2017.

Let's first define the URLs for both datasets.

In [1]:
smallURL = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'
completeURL = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'

Let's also import PySpark into our Python environment and initiate a local SparkContext.

In [2]:
import pyspark
sc = pyspark.SparkContext('local[*]') # 'local[*]' runs Spark locally (no cluster) using all available cores

### Create Data Path 

We shall now define the download locations and filenames for both datasets. Let's create a folder "datasets" to hold the zip files. [os.mkdir(path)](https://www.tutorialspoint.com/python/os_mkdir.htm) creates it, and `os.makedirs(path, exist_ok=True)` does the same without failing when the folder already exists. We also need paths for the individual zip files of the complete and small datasets, which can be built with the [os.path.join()](https://code-maven.com/slides/python-programming/os-path-join) method. Import the `os` module, which provides file system access, and save the zip files as **ml-latest.zip** and **ml-latest-small.zip**.



In [None]:
import os

datasetsPath = "datasets"

# Create the "datasets" directory (no error if it already exists)
os.makedirs(datasetsPath, exist_ok=True)

In [4]:
# Create path variables with filenames for both datasets
completePath = os.path.join(datasetsPath, 'ml-latest.zip')
smallPath = os.path.join(datasetsPath, 'ml-latest-small.zip')

### Download Datasets
We can now move on and download both files from the server using the URLs provided above.

Import the `urllib.request` module and call `urllib.request.urlretrieve(url, path)` with the URLs and paths given above for both datasets. A good resource on downloading files in Python can be found [HERE](http://stackabuse.com/download-files-with-python/).

In [None]:
import urllib.request

# Download the datasets, providing the URL and local destination for each one
small = urllib.request.urlretrieve(smallURL, smallPath)
complete = urllib.request.urlretrieve(completeURL, completePath)
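
Because the complete dataset is large, it can be convenient to skip the download when the zip file is already on disk. The following is a minimal sketch of that idea, not part of the original lab; `download_if_missing` is a hypothetical helper name, and it only wraps the `urllib.request.urlretrieve(url, path)` call described above.

```python
import os
import urllib.request

def download_if_missing(url, path):
    """Download url to path unless a file already exists at path.

    Returns True if a download happened, False if it was skipped.
    """
    if os.path.exists(path):
        return False
    urllib.request.urlretrieve(url, path)
    return True
```

With the variables defined earlier, a call would look like `download_if_missing(smallURL, smallPath)`. Note that this check only looks for the file's presence, so a partially downloaded file would still be treated as complete.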

### Extract Datasets to Folders 

We need to unzip the small and complete datasets into their own folders. Let's use Python's `zipfile` module to perform this task. Details on the `zipfile` module can be found in [THIS](https://pymotw.com/2/zipfile/) resource.

In [None]:
import zipfile

with zipfile.ZipFile(smallPath, "r") as z:
    z.extractall(datasetsPath)

with zipfile.ZipFile(completePath, "r") as z:
    z.extractall(datasetsPath)

## Dataset Parsing, Selection and Filtering

With our SparkContext initialized and the datasets in their respective locations, we can now parse these csv files and read them into RDDs. Each dataset contains the following files:

#### ratings.csv
Each line in the ratings dataset (ratings.csv) is formatted as: **userId, movieId, rating, timestamp**

#### movies.csv
Each line in the movies dataset (movies.csv) is formatted as: **movieId, title, genres**

Genres has the format: **Genre1|Genre2|Genre3...**

#### tags.csv
The tags file (tags.csv) has the format: **userId, movieId, tag, timestamp**

#### links.csv
Contains links to IMDB and has the format: **movieId, imdbId, tmdbId**

The complete dataset also contains other files which are not needed for this experiment. 

The format of these files is uniform and simple, so such comma-delimited files can be parsed line by line using Python's `split()` once they are loaded into RDDs. One caveat: some titles in movies.csv contain commas and are wrapped in double quotes, so a plain `split(",")` can cut those titles short.

Let's first work with the small dataset to get the parsing code in place. We shall parse the ratings and movies files into two RDDs, filtering out the header row in each file. We shall only use ratings.csv and movies.csv for building a simple recommendation system.

>For each line in the ratings dataset, we create a tuple of (UserID, MovieID, Rating). We drop the timestamp because we do not need it for this recommender.

>For each line in the movies dataset, we create a tuple of (MovieID, Title). We drop the genres because we do not use them for this recommender.
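
The line-by-line parsing described above can be tried outside Spark. The sketch below contrasts `str.split(",")` with the standard library's `csv.reader` on two sample lines. The sample lines are illustrative assumptions about how rows in movies.csv look, and `parse_line` is a hypothetical helper name.

```python
import csv

def parse_line(line):
    """Parse a single CSV line, keeping double-quoted fields in one piece."""
    return next(csv.reader([line]))

# Illustrative sample rows (assumed layout: movieId,title,genres)
plain = "1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy"
quoted = '11,"American President, The (1995)",Comedy|Drama|Romance'

naive_plain = plain.split(",")     # fine: no commas inside fields
naive_quoted = quoted.split(",")   # splits the quoted title in two
parsed_quoted = parse_line(quoted) # keeps the title intact

print(naive_plain[1])
print(naive_quoted[1])
print(parsed_quoted[1])
```

For ratings.csv, which holds only numeric fields, `split(",")` is sufficient. For movies.csv, a quote-aware parser avoids truncated titles.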

In [5]:
# Create a path for identifying the ratings file 

smallRatingsPath = os.path.join(datasetsPath, 'ml-latest-small', 'ratings.csv')
smallRatingsPath

# 'datasets/ml-latest-small/ratings.csv'

'datasets/ml-latest-small/ratings.csv'

In [6]:
# Use .textFile() to read the raw contents of ratings file into an RDD
# read the first line of this RDD as a header and view header contents

smallRatingsRaw = sc.textFile(smallRatingsPath)
smallRatingsHeader = smallRatingsRaw.take(1)[0]
smallRatingsHeader

# 'userId,movieId,rating,timestamp'

'userId,movieId,rating,timestamp'

Now we can parse the raw data into a new RDD. Perform the following transformations on `smallRatingsRaw`:

1. Use `.filter()` to exclude the header line collected above
2. Split each line of the csv file with `split()`, using `,` as the separator
3. Keep the first three elements of each row (UserID, MovieID, Rating) and discard the timestamp field
4. Cache the final RDD

In [7]:
smallRatingsNoHeader = smallRatingsRaw.filter(lambda line: line != smallRatingsHeader)
smallRatingsSplit = smallRatingsNoHeader.map(lambda line: line.split(","))
smallRatingsRDD = smallRatingsSplit.map(lambda tokens: (tokens[0],tokens[1],tokens[2]))
smallRatingsRDD.cache()

PythonRDD[3] at RDD at PythonRDD.scala:49

In [8]:
# Show first five rows of the final RDD
smallRatingsRDD.take(5)

# [('1', '31', '2.5'),
#  ('1', '1029', '3.0'),
#  ('1', '1061', '3.0'),
#  ('1', '1129', '2.0'),
#  ('1', '1172', '4.0')]

[('1', '31', '2.5'),
 ('1', '1029', '3.0'),
 ('1', '1061', '3.0'),
 ('1', '1129', '2.0'),
 ('1', '1172', '4.0')]

We shall now proceed in a similar way with the movies.csv file. Repeat the steps performed above:
1. Create a path variable identifying the location of movies.csv
2. Read the text file into an RDD
3. Exclude the header line
4. Split the line contents of the csv file
5. Map the resulting RDD to (MovieID, Title) tuples, discarding the genres

In [9]:
import csv

smallMoviesPath = os.path.join(datasetsPath, 'ml-latest-small', 'movies.csv')

smallMoviesRaw = sc.textFile(smallMoviesPath)
smallMoviesHeader = smallMoviesRaw.take(1)[0]

# csv.reader keeps quoted titles that contain commas in one piece
smallMoviesRDD = smallMoviesRaw.filter(lambda line: line != smallMoviesHeader)\
                               .map(lambda line: next(csv.reader([line])))\
                               .map(lambda tokens: (tokens[0],tokens[1]))\
                               .cache()
    
smallMoviesRDD.take(5)

# [('1', 'Toy Story (1995)'),
#  ('2', 'Jumanji (1995)'),
#  ('3', 'Grumpier Old Men (1995)'),
#  ('4', 'Waiting to Exhale (1995)'),
#  ('5', 'Father of the Bride Part II (1995)')]

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)'),
 ('4', 'Waiting to Exhale (1995)'),
 ('5', 'Father of the Bride Part II (1995)')]

### Saving Pre-Processed Data (optional)

We can now save our preprocessed datasets. Create a folder "processed" and save `smallMoviesRDD` and `smallRatingsRDD` using `RDD.saveAsTextFile(path)`. Each RDD is written as a directory of part files, and the target directory must not already exist.

In [None]:
# Create a directory "processed" and store the preprocessed dataset RDDs as text files using the .saveAsTextFile() method.

processedPath = 'processed'
os.mkdir(processedPath)

smallMoviesRDD.saveAsTextFile(os.path.join(processedPath, 'smallMoviesRDD'))
smallRatingsRDD.saveAsTextFile(os.path.join(processedPath, 'smallRatingsRDD'))
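
When a non-string element such as a tuple is saved with `saveAsTextFile`, PySpark writes its `str()` representation, one element per line. A later lab that reloads these files with `sc.textFile` therefore gets plain strings back and has to parse them. The following is a minimal sketch of one way to do that with `ast.literal_eval`; the sample line is constructed here for illustration rather than read from disk.

```python
import ast

# A line as it would appear in a part file for the element ('1', 'Toy Story (1995)')
saved_line = str(('1', 'Toy Story (1995)'))

# literal_eval safely turns the text back into a Python tuple
restored = ast.literal_eval(saved_line)

print(saved_line)
print(restored)
```

In Spark, the same idea would be applied with something like `sc.textFile(path).map(ast.literal_eval)`, assuming every line is a valid Python literal.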

## SUMMARY SECTION 1

In this lab, we performed data collection: we downloaded the MovieLens datasets from the internet and stored them in their respective folders from within the Python environment. We also looked at some basic filtering steps for feature selection for the movie recommendation system. The lab concludes by saving the preprocessed dataset into text files, which will be used for training and evaluation in later labs.

# Stage 2: Alternating Least Squares: Model Training and Evaluation

### Splitting the Data into Training, Validation and Testing Sets

We can now go ahead and split the data for training the recommendation system. We can use Spark's `randomSplit()` transformation, which uses the given weights to split an RDD into any number of sub-RDDs. The standard usage of this transformation is:
> `RDD.randomSplit(weights, seed=None)`

where:
weights – weights for splits, will be normalized if they don’t sum to 1

seed – random seed

Remember that the exact number of entries in each dataset varies slightly due to the random nature of the randomSplit() transformation.
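
To make the weight normalization and the "slightly varying counts" concrete, here is a small pure-Python simulation of a weighted random split. It is an illustrative sketch only, not Spark's actual implementation; `normalize` and `random_split` are hypothetical helper names.

```python
import random

def normalize(weights):
    """Scale weights so that they sum to 1."""
    total = float(sum(weights))
    return [w / total for w in weights]

def random_split(items, weights, seed=None):
    """Assign each item to one split with probability proportional to its weight."""
    rng = random.Random(seed)
    bounds, acc = [], 0.0
    for p in normalize(weights):
        acc += p
        bounds.append(acc)              # cumulative upper bound of each split
    splits = [[] for _ in weights]
    for item in items:
        r = rng.random()
        for i, bound in enumerate(bounds):
            if r < bound:
                splits[i].append(item)
                break
        else:
            splits[-1].append(item)     # guard against floating-point rounding
    return splits

fractions = normalize([6, 2, 2])
train, valid, test = random_split(range(10000), [6, 2, 2], seed=100)

print(fractions)
print(len(train), len(valid), len(test))
```

Each item is assigned independently, so the split sizes land close to 60/20/20 percent of the data without matching those proportions exactly.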

Let's split `smallRatingsRDD` into training, validation and testing RDDs using weights 6, 2, 2 respectively. Call `.count()` on the resulting RDDs to view the size of each.

In [10]:
# Split ratingsRDD into training, validation and testing RDDs.
trainRDD, validRDD, testRDD = smallRatingsRDD.randomSplit([6, 2, 2], seed=100)

trainRDD.count(), testRDD.count(), validRDD.count()

# Counts are returned in the order (trainRDD, testRDD, validRDD)

(59907, 19993, 20104)

For prediction, we need the userID and movieID pairs from the validation and test RDDs. Let's map these values into two new RDDs, which will be used for validation and testing respectively. We ignore the actual rating values in these RDDs.

In [11]:
validPredictionRDD = validRDD.map(lambda x: (x[0], x[1]))
testPredictionRDD = testRDD.map(lambda x: (x[0], x[1]))

validPredictionRDD.take(3), testPredictionRDD.take(3)

([('1', '31'), ('1', '1061'), ('1', '1129')],
 [('1', '1287'), ('1', '1293'), ('1', '1339')])

## Collaborative Filtering

Collaborative filtering allows us to make predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). 

The underlying assumption is that if user A has the same opinion as user B on one issue, A is more likely to share B's opinion on a different issue x than the opinion of a randomly chosen user. The following gif (from [Wikipedia](https://en.wikipedia.org/wiki/Collaborative_filtering)) shows an example of collaborative filtering. At first, people rate different items (like videos, images, games). Then, the system makes predictions about a user's rating for an item they have not rated yet. The new predictions are built upon the existing ratings of other users whose ratings are similar to those of the active user. In the image, the system predicts that the user will not like the video.
![](https://upload.wikimedia.org/wikipedia/commons/5/52/Collaborative_filtering.gif)
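
The prediction idea described above can be sketched in a few lines of plain Python. This is a simple neighbourhood-based (user-to-user) illustration, which is a different technique from the ALS matrix factorization used later in this lab. The ratings, user names and the helpers `cosine_similarity` and `predict` are all made up for the example.

```python
import math

# Hypothetical ratings: user -> {item: rating}
ratings = {
    "A": {"Game of Thrones": 5.0, "Breaking Bad": 5.0, "Timecop": 1.0},
    "B": {"Game of Thrones": 5.0, "Timecop": 1.0},
    "C": {"Game of Thrones": 1.0, "Breaking Bad": 2.0, "Timecop": 5.0},
}

def cosine_similarity(u, v):
    """Cosine similarity of two users over the items both have rated."""
    common = set(u) & set(v)
    if not common:
        return 0.0
    dot = sum(u[i] * v[i] for i in common)
    norm_u = math.sqrt(sum(u[i] ** 2 for i in common))
    norm_v = math.sqrt(sum(v[i] ** 2 for i in common))
    return dot / (norm_u * norm_v)

def predict(user, item):
    """Similarity-weighted average of other users' ratings for item."""
    num, den = 0.0, 0.0
    for other, other_ratings in ratings.items():
        if other == user or item not in other_ratings:
            continue
        sim = cosine_similarity(ratings[user], other_ratings)
        num += sim * other_ratings[item]
        den += abs(sim)
    return num / den if den else None

predicted = predict("B", "Breaking Bad")
print(predicted)
```

User B's tastes match user A's far more closely than user C's, so the predicted rating for B is pulled towards A's high rating.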


Spark's MLlib machine learning library provides a collaborative filtering implementation based on Alternating Least Squares (ALS).

## Alternating Least Squares in Spark

The ALS implementation for collaborative filtering in MLlib has the following parameters:

* `numBlocks` is the number of blocks used to parallelize computation. **(set to -1 to auto-configure)**
* `rank` is the number of latent factors in the model. **(use the list [2,4,6,8,10] as rank values)**
* `iterations` is the number of iterations to run. **(set to 15)**
* `lambda` specifies the regularization parameter in ALS; in PySpark the argument is named `lambda_`. **(set to 0.1)**
* `implicitPrefs` specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data. **(use default)**
* `alpha` is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations. **(use default)**

Details on spark's ALS implementation can be viewed [HERE](https://spark.apache.org/docs/2.2.0/mllib-collaborative-filtering.html)
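
To give a feel for what `rank`, `iterations` and `lambda` control, here is a toy pure-Python version of alternating least squares with a single latent factor (rank 1). It is a simplified sketch under stated assumptions, not Spark's implementation: the ratings are made up, the regularization is a plain constant (the MLlib documentation describes scaling it by the number of ratings), and `solve_users`, `solve_items` and `loss` are hypothetical helper names.

```python
# Hypothetical observed ratings: (user index, item index, rating)
observed = [(0, 0, 5.0), (0, 1, 4.0), (1, 0, 4.0),
            (1, 2, 1.0), (2, 1, 2.0), (2, 2, 5.0)]
n_users, n_items = 3, 3
lam = 0.1          # regularization strength (role of lambda)
iterations = 15    # number of alternating sweeps

def loss(u, v):
    """Regularized squared error over the observed ratings."""
    err = sum((r - u[i] * v[j]) ** 2 for i, j, r in observed)
    reg = lam * (sum(x * x for x in u) + sum(x * x for x in v))
    return err + reg

def solve_users(v):
    """With item factors fixed, each user factor has a closed-form solution."""
    new_u = []
    for i in range(n_users):
        num = sum(r * v[j] for ui, j, r in observed if ui == i)
        den = lam + sum(v[j] ** 2 for ui, j, r in observed if ui == i)
        new_u.append(num / den)
    return new_u

def solve_items(u):
    """With user factors fixed, each item factor has a closed-form solution."""
    new_v = []
    for j in range(n_items):
        num = sum(r * u[i] for i, vj, r in observed if vj == j)
        den = lam + sum(u[i] ** 2 for i, vj, r in observed if vj == j)
        new_v.append(num / den)
    return new_v

u = [1.0] * n_users
v = [1.0] * n_items
history = [loss(u, v)]
for _ in range(iterations):
    u = solve_users(v)      # fix items, solve for users
    v = solve_items(u)      # fix users, solve for items
    history.append(loss(u, v))

print(history[0], history[-1])
```

Each half-step solves one least squares problem exactly, so the regularized loss never increases from one sweep to the next. A higher rank replaces the single numbers in `u` and `v` with short vectors, which lets the model capture more than one taste dimension.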



We shall now import the ALS algorithm from Spark's machine learning library `mllib` and set the learning and training parameter values as shown above.

### Model Training and Validation for hyper-parameter optimization

We can now start our training process using the above parameter values. It includes the following steps:

* Run the training for each of the rank values in our `ranks` list inside a for loop.
* Train the model using trainRDD, rank, seed, iterations and lambda value as model parameters.
* Validate the trained model by predicting ratings for `validPredictionRDD`.
* Compare predicted ratings to actual ratings.
* Calculate the error as RMSE for each rank.

For the sake of simplicity, we shall repeat the training process while changing the rank value **only**. Other parameters can also be varied in a more detailed grid search for improved predictive performance.

In [15]:
# Import ALS from spark's mllib
from pyspark.mllib.recommendation import ALS
import math

# set learning parameters 
seed = 20
iterations = 15
lambdaRegParam = 0.1
ranks = [2, 4, 6, 8, 10]
errors = [0, 0, 0, 0, 0] # list for storing the error value of each rank
err = 0 # index into the errors list
#tolerance = 0.02

# Track the best result found so far
minError = float('inf')
bestRank = -1
bestIteration = -1


for rank in ranks:
    
    # Train the model using trainRDD, rank, seed, iterations and lambda value as model parameters
    movieRecModel = ALS.train(trainRDD, 
                      rank = rank, 
                      seed = seed, 
                      iterations = iterations,
                      lambda_ = lambdaRegParam)
    
    # Use the trained model to predict the ratings from validPredictionRDD using model.predictAll()
    predictions = movieRecModel.predictAll(validPredictionRDD).map(lambda r: ((r[0], r[1]), r[2]))

    # Compare predicted ratings and actual ratings in validRDD
    ratingsAndPreds = validRDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    
    # Calculate RMSE error for the difference between ratings and predictions
    error = math.sqrt(ratingsAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
    # save error into errors array
    errors[err] = error
    err += 1
    
    print ('For rank %s the RMSE is %s' % (rank, error))
    
    # Check for best error and rank values
    if error < minError:
        minError = error
        bestRank = rank

print ('The best model was trained with rank %s' % bestRank)

For rank 2 the RMSE is 0.9373037986511492
For rank 4 the RMSE is 0.9379199229901556
For rank 6 the RMSE is 0.937083572495462
For rank 8 the RMSE is 0.9418894910915557
For rank 10 the RMSE is 0.9476290426992048
The best model was trained with rank 6


### The Learning Phase

Let's have a look at the predictions the model generated during last validation stage. 

In [16]:
# take 3 elements from the predictions RDD
predictions.take(3)

# [((86, 1084), 3.86742553170992),
#  ((242, 1084), 4.535501401736902),
#  ((575, 1084), 3.6610691865669773)]

[((86, 1084), 3.86742553170992),
 ((242, 1084), 4.535501401736902),
 ((575, 1084), 3.6610691865669773)]

The output shows `((UserID, MovieID), Rating)` tuples, similar to the ratings dataset. The `Rating` field in the predictions RDD refers to the rating predicted by the trained ALS model.

Then we join these predictions with our validation data and the result looks as follows:

In [17]:
# take 3 elements from the ratingsAndPredsRDD
ratingsAndPreds.take(3)

# [((1, 1061), (3.0, 2.699500607996266)),
#  ((1, 1129), (2.0, 2.3323779150221435)),
#  ((2, 144), (3.0, 2.7568272196178905))]

[((1, 1061), (3.0, 2.699500607996266)),
 ((1, 1129), (2.0, 2.3323779150221435)),
 ((2, 144), (3.0, 2.7568272196178905))]

This output has the format `((UserId, MovieId), (Rating, PredictedRating))`.

We then calculated the RMSE as our `error` value by squaring the difference between each actual and predicted rating, taking the mean of those squares, and then taking the square root.
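
As a worked example, the RMSE calculation can be reproduced in plain Python on the three joined rows shown above. This is only an illustration on three samples, so the result differs from the RMSE over the full validation set.

```python
import math

# The three ((UserId, MovieId), (Rating, PredictedRating)) rows shown above
ratings_and_preds = [
    ((1, 1061), (3.0, 2.699500607996266)),
    ((1, 1129), (2.0, 2.3323779150221435)),
    ((2, 144), (3.0, 2.7568272196178905)),
]

# Squared difference between actual and predicted rating for each row
squared_errors = [(actual - predicted) ** 2
                  for _, (actual, predicted) in ratings_and_preds]

# Root of the mean squared error
rmse = math.sqrt(sum(squared_errors) / len(squared_errors))
print(rmse)
```

The Spark version does the same thing, with `.map()` producing the squared errors and `.mean()` averaging them across the cluster.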

### Testing the Model

We shall now test the model with the test dataset, which has been kept away from the learning phase up to this point.
Use the following settings:
* Use `trainRDD` for training the model.
* Use the `bestRank` value learnt during the validation phase.
* Keep the other parameter values the same as above.
* Generate predictions with `testPredictionRDD`.
* Calculate the error between predicted values and ground truth.

In [19]:
# Train and test the model with selected parameter bestRank

movieRecModel = ALS.train(trainRDD, 
                           bestRank, 
                           seed=seed, 
                           iterations=iterations,
                           lambda_=lambdaRegParam)

# Calculate predictions for testPredictionRDD
predictions = movieRecModel.predictAll(testPredictionRDD).map(lambda r: ((r[0], r[1]), r[2]))

# Combine real ratings and predictions
ratingsAndPreds = testRDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)

# Calculate RMSE
error = math.sqrt(ratingsAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.9477724241906258


We can see that our testing error is slightly higher than our validation error, which is expected behaviour. Because ALS starts from randomly initialized factors, changing the seed value will also cause some fluctuation in these values.

## Training with the Complete MovieLens Dataset

After identifying the optimum parameter values (the rank in our case), we will use the complete dataset to build our final model. Therefore, we need to process it the same way we did with the small dataset.

In [30]:
# Load the complete dataset ratings file (will be slow)
MlRatingsFile = os.path.join(datasetsPath, 'ml-latest', 'ratings.csv')
MlRatingsRaw= sc.textFile(MlRatingsFile)

MlRatingsHeader = MlRatingsRaw.take(1)[0]

# Parse the Ratings into the RDD
MlRatingsRDD = MlRatingsRaw.filter(lambda line: line != MlRatingsHeader)\
                           .map(lambda line: line.split(","))\
                           .map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2])))\
                           .cache()
    
print ("There are %s recommendations in the complete dataset" % (MlRatingsRDD.count()))

There are 26024289 recommendations in the complete dataset


Perform a 70/30 train/test split on `MlRatingsRDD` using `.randomSplit()` and train the model with the complete MovieLens dataset.

In [36]:
# Perform a 70/30 train/test split on the full ratings RDD
trainRDD, testRDD = MlRatingsRDD.randomSplit([7, 3], seed=200)

# Train the model with trainRDD
MlModel = ALS.train(trainRDD, 
                    rank = bestRank, 
                    seed = seed, 
                    iterations = iterations, 
                    lambda_ = lambdaRegParam)

We can now perform testing similar to that shown above, using the complete test dataset.



In [37]:
# Use the model trained on the complete dataset to predict for the complete testRDD and calculate RMSE exactly as before.
testPredictionRDD = testRDD.map(lambda x: (x[0], x[1]))

predictions = MlModel.predictAll(testPredictionRDD).map(lambda r: ((r[0], r[1]), r[2]))
ratingsAndPreds = testRDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(ratingsAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))

# For testing data the RMSE is 0.8213246944974424

For testing data the RMSE is 0.8213246944974424


We can see that the RMSE (about 0.82) is noticeably lower than on the small dataset (about 0.95). The most likely reason for this improvement is the much larger amount of data used for training the model.

# Stage 3: Making Recommendations

When using collaborative filtering, getting recommendations is not as simple as predicting for new entries using a previously generated model. Instead, we need to train the model again, including the new user's preferences, so that they can be compared with those of other users in the dataset. That is, the recommender needs to be retrained every time we have new user ratings (although a single model can of course be used by multiple users). This makes the process expensive, and it is one of the reasons why scalability is a problem (and Spark a solution). Once we have our model trained, we can reuse it to obtain top recommendations for a given user or an individual rating for a particular movie. These are less costly operations than training the model itself.

So let's first load the complete movies file for later use.

In [None]:
import csv

# datasetsPath is the "datasets" directory defined in Stage 1
complete_movies_file = os.path.join(datasetsPath, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse (csv.reader keeps quoted titles that contain commas in one piece)
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: next(csv.reader([line]))).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print ("There are %s movies in the complete dataset" % (complete_movies_titles.count()))

In [None]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

# MlRatingsRDD is the parsed complete ratings RDD built in Stage 2
movie_ID_with_ratings_RDD = (MlRatingsRDD.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))


In [None]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)

new_user_ratings = [
     (0,260,4), # Star Wars (1977)
     (0,1,3), # Toy Story (1995)
     (0,16,3), # Casino (1995)
     (0,25,4), # Leaving Las Vegas (1995)
     (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,1), # Flintstones, The (1994)
     (0,379,1), # Timecop (1994)
     (0,296,3), # Pulp Fiction (1994)
     (0,858,5) , # Godfather, The (1972)
     (0,50,4) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: %s' % new_user_ratings_RDD.take(10))

In [None]:
# Add the new user's ratings to the complete ratings RDD (MlRatingsRDD from Stage 2)
complete_data_with_new_ratings_RDD = MlRatingsRDD.union(new_user_ratings_RDD)

In [None]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, bestRank, seed=seed, 
                              iterations=iterations, lambda_=lambdaRegParam)
tt = time() - t0

print ("New model trained in %s seconds" % round(tt,3))

In [None]:
new_user_ratings_ids = set(x[1] for x in new_user_ratings) # get just movie IDs (a set, so it can be checked repeatedly)
# keep just those not on the ID list 
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [None]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

In [None]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [None]:
# Keep movies with at least 25 ratings, then take the 25 with the highest predicted rating
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with at least 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))
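
The filter-then-`takeOrdered` step used to obtain the top recommendations can be mimicked in plain Python. `takeOrdered(n, key)` returns the `n` elements with the smallest key, so negating the rating yields the highest-rated rows first. The rows below are made-up `(title, predicted rating, number of ratings)` tuples used purely for illustration.

```python
import heapq

# Hypothetical (title, predicted rating, number of ratings) rows
rows = [
    ("Movie A", 4.8, 12),
    ("Movie B", 4.5, 300),
    ("Movie C", 3.9, 25),
    ("Movie D", 4.7, 40),
]

# Same condition as the Spark filter: keep rows with at least 25 ratings
popular = [r for r in rows if r[2] >= 25]

# Plain-Python counterpart of takeOrdered(2, key=lambda x: -x[1])
top_two = heapq.nsmallest(2, popular, key=lambda r: -r[1])
print(top_two)
```

"Movie A" has the highest predicted rating but is dropped by the count filter, which is the point of the threshold: predictions for rarely rated movies are less trustworthy.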

In [None]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
# Predict the new user's rating for this single movie
individual_movie_rating_RDD = new_ratings_model.predictAll(my_movie)
individual_movie_rating_RDD.take(5)