# PySpark - ML model

This notebook is based on https://github.com/jadianes/spark-py-notebooks

It is about to build a movie recommendation model (Collaborative Filtering) using public MovieLens dataset.

## Getting data

GroupLens Research has collected and made available rating data sets from the MovieLens website. The data sets were collected over various periods of time, depending on the size of the set. 

In our case, we will use the lastest datasets:
- Small dataset 
- Full dataset

In [None]:
# links of datasets
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

# locations' paths store datasets
import os
datasets_path = os.path.join('C:/Users/xuand', 'datasets')

complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

# download datasets
import urllib.request

small_f = urllib.request.urlretrieve (small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve (complete_dataset_url, complete_dataset_path)

# Extract these downloaded files into its individual folders
import zipfile

with zipfile.ZipFile(small_dataset_path, 'r') as z:
    z.extractall(datasets_path)
with zipfile.ZipFile(complete_dataset_path, 'r') as z:
    z.extractall(datasets_path)    

## Create RDD files

### Create a PySpark session

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('MLmodel-PySpark').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

### Create RDD files for ratings file

In [5]:
# create RDD files
import os
datasets_path = os.path.join('C:/Users/xuand', 'datasets')
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

small_ratings_data = small_ratings_raw_data.filter(lambda line: line != small_ratings_raw_data_header).map(lambda line: line.split(',')).map(lambda tokens: (tokens[0], tokens[1], tokens[2])).cache()
small_ratings_data.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

### Create RDD files for movies file

In [7]:
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line != small_movies_raw_data_header).map(lambda line: line.split(',')).map(lambda tokens: (tokens[0], tokens[1])).cache()
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

## Collaborative filtering (CF)
In CF, we make predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption is that if a user A has the same option as a user B on an issue, A is more likely to have B's opinion on a different issue x than to have the opinion on x of a user chosen randomly.

The image below shows an example of CF. At first, people rate different items, then the system makes predictions about a user's rating for an item not rated yet. The new predictions are built upon the existing ratings of other users with similar ratings with the active user. In the image, the system predicts that the user will not like the video.

### CF in MLlib 
Spark MLlib library for ML provides CF implementation by using Alternating Least Squares (ALS). The implementation in MLlib has the following parameters:

- numBlocks is the number of blocks used to parallelize computaiton (set to -1 to auto-configure)
- rank is the number of latent factors in the model
- iterations is the number of iterations to run
- lambda specifies the regularization parameter in ALS
- implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data
- alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.

In [8]:
from pyspark.mllib.recommendation import ALS
import math

### Using small dataset to select ALS parameters

In [9]:
# split dataset into train, validation, and test datasets
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6,2,2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank %s' %best_rank)

For rank 4 the RMSE is 0.8973056100718643
For rank 8 the RMSE is 0.9143149069672253
For rank 12 the RMSE is 0.9141049207539428
The best model was trained with rank 4


In [11]:
# test the selected model
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print('for testing data, the RMSE is %s' %error)

for testing data, the RMSE is 0.9041156142407426


### Using the complete dataset to build the final model

In [12]:
# load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line != complete_ratings_raw_data_header).map(lambda line: line.split(',')).map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))).cache()
print('There are %s recommendations in the complete dataset' %(complete_ratings_data.count()))

# split dataset into train and test datasets
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed,iterations=iterations, lambda_=regularization_parameter)

# test the selected model
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is %s' % (error))

There are 27753444 recommendations in the complete dataset
For testing data the RMSE is 0.8334782410156314


### Adding new user ratings

We will put them in a new RDD and we will use the user ID 0, that is not assigned in the MovieLens dataset. 

In [13]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,16,7), # Casino (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,9), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,7), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print('New user ratings: %s' % new_user_ratings_RDD.take(10))

# add new user to the data
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

# train ALS model using new data set
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)

New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]


It took some time. We will need to repeat that every time a user add new ratings. Ideally we will do this in batches, and not for every single rating that comes into the system for every user.

### Getting top recommendations
Let's now get some recommendations! For that we will get an RDD with all the movies the new user hasn't rated yet. We will them together with the model to predict ratings.

In [15]:
# load and process movies file
complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print("There are %s movies in the complete dataset" % (complete_movies_titles.count()))

def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

# new_user_recommendations_RDD
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

There are 58098 movies in the complete dataset


Finally, get the highest rated recommendations for the new user, filtering out movies with less than 10 ratings.

In [16]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(10, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
('Rabbit of Seville (1950)', 8.60283332885939, 30)
("Jim Henson's The Storyteller (1989)", 8.544291232021493, 36)
('"Godfather', 8.52628915469041, 60904)
('Cosmos', 8.52467364564589, 157)
('Death on the Staircase (SoupÃ§ons) (2004)', 8.521656039332541, 130)
('Music for One Apartment and Six Drummers (2001)', 8.502821262499474, 31)
('"Human Condition III', 8.495127363853982, 91)
('Harakiri (Seppuku) (1962)', 8.494578606156566, 679)
('"I', 8.424360308543928, 85)
('Planet Earth (2006)', 8.424323803290726, 1384)


Getting individual rating for a particular movie for a given user.

In [17]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=116688, rating=2.070225567265668)]