## Collaborative Filtering: Alternating Least Squares (ALS)

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark import SparkConf, SparkContext

In [2]:
# Column names for the dataset
COL_USER = "userId"
COL_ITEM = "movieId"
COL_RATING = "rating"
COL_TIMESTAMP = "timestamp"

In [3]:
spark = SparkSession.builder.master("local[1]").appName('movielens2').getOrCreate()
# spark.sparkContext.stop()
sc = spark.sparkContext

## Loading and parsing datasets


Each line in the ratings dataset (`ratings.csv`) is formatted as:
`userId,movieId,rating,timestamp`

Each line in the movies dataset (`movies.csv`) is formatted as:
`movieId,title,genres`

The `genres` field has the format:
`Genre1|Genre2|Genre3...`

The tags file (`tags.csv`) has the format:
`userId,movieId,tag,timestamp`

And finally, the `links.csv` file has the format:
`movieId,imdbId,tmdbId`
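The formats above can be parsed into typed tuples with a couple of small helpers. This is a minimal sketch: the rating line is copied from the output further below, while the movie line is an illustrative example in the `movieId,title,genres` format. The function names `parse_rating` and `parse_movie` are hypothetical. Using Python's `csv` module for the movies file handles titles that are wrapped in double quotes because they contain a comma, which a plain `split(",")` does not.

```python
import csv

def parse_rating(line):
    """'userId,movieId,rating,timestamp' -> (int, int, float, int)."""
    # ratings.csv fields are all numeric, so a plain split is safe here
    user, movie, rating, ts = line.split(",")
    return int(user), int(movie), float(rating), int(ts)

def parse_movie(line):
    """'movieId,title,genres' -> (int, str, list of genre strings)."""
    # csv.reader honours double quotes around titles that contain commas
    movie_id, title, genres = next(csv.reader([line]))
    return int(movie_id), title, genres.split("|")

print(parse_rating("1,1,4.0,964982703"))
print(parse_movie('11,"American President, The (1995)",Comedy|Drama|Romance'))
```

In Spark, the same functions could be applied after the header filter, e.g. `raw.filter(lambda line: line != header).map(parse_movie)`.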

### *Ratings* data

In [4]:
small_ratings_file = './ml-latest-small/ratings.csv'
small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data.take(5)  # peek at the first lines instead of collecting the whole file

['userId,movieId,rating,timestamp',
 '1,1,4.0,964982703',
 '1,3,4.0,964981247',
 '1,6,4.0,964982224',
 '1,47,5.0,964983815']

In [5]:
small_ratings_raw_data_header = 'userId,movieId,rating,timestamp'

In [6]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [7]:
small_ratings_data.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

### *Movies* data

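Proceed with the `movies.csv` file. Splitting each line on `,` is a simplification: titles that contain a comma are quoted in the CSV, and the naive split truncates them (see movies 11 and 29 in the output below).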

In [8]:
small_movies_file = './ml-latest-small/movies.csv'

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
# small_movies_data.take(3)

In [9]:
small_movies_data.take(30)  # first 30 movies rather than the whole dataset

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)'),
 ('4', 'Waiting to Exhale (1995)'),
 ('5', 'Father of the Bride Part II (1995)'),
 ('6', 'Heat (1995)'),
 ('7', 'Sabrina (1995)'),
 ('8', 'Tom and Huck (1995)'),
 ('9', 'Sudden Death (1995)'),
 ('10', 'GoldenEye (1995)'),
 ('11', '"American President'),
 ('12', 'Dracula: Dead and Loving It (1995)'),
 ('13', 'Balto (1995)'),
 ('14', 'Nixon (1995)'),
 ('15', 'Cutthroat Island (1995)'),
 ('16', 'Casino (1995)'),
 ('17', 'Sense and Sensibility (1995)'),
 ('18', 'Four Rooms (1995)'),
 ('19', 'Ace Ventura: When Nature Calls (1995)'),
 ('20', 'Money Train (1995)'),
 ('21', 'Get Shorty (1995)'),
 ('22', 'Copycat (1995)'),
 ('23', 'Assassins (1995)'),
 ('24', 'Powder (1995)'),
 ('25', 'Leaving Las Vegas (1995)'),
 ('26', 'Othello (1995)'),
 ('27', 'Now and Then (1995)'),
 ('28', 'Persuasion (1995)'),
 ('29', '"City of Lost Children'),
 ('30', 'Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)')]

## Collaborative Filtering

### Selecting ALS parameters using the small dataset
In collaborative filtering, we make predictions (filtering) about a user's interests by collecting preference or taste information from many users (collaborating). The underlying assumption is that if user A has the same opinion as user B on one issue, A is more likely than a randomly chosen user to share B's opinion on a different issue x.

The image below (from [Wikipedia](https://en.wikipedia.org/?title=Collaborative_filtering)) shows an example of collaborative filtering. First, people rate different items (such as videos, images, or games). Then the system predicts a user's rating for an item that user has not rated yet, based on the existing ratings of other users whose ratings are similar to the active user's. In the image, the system predicts that the user will not like the video.
<p><a href="https://commons.wikimedia.org/wiki/File:Collaborative_filtering.gif#/media/File:Collaborative_filtering.gif"><img src="https://upload.wikimedia.org/wikipedia/commons/5/52/Collaborative_filtering.gif" alt="Collaborative filtering.gif" width="498" height="480"></a><br><a href="https://creativecommons.org/licenses/by-sa/3.0" title="Creative Commons Attribution-Share Alike 3.0">CC BY-SA 3.0</a>, <a href="https://commons.wikimedia.org/w/index.php?curid=24097346">Link</a></p>
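The idea described above can be illustrated with a tiny user-based neighborhood method. Note that this is a different collaborative filtering technique from the ALS model used in this notebook, and the users, items, and like/dislike scores (+1/-1, as in the animation) are made up for illustration. A prediction is the similarity-weighted average of other users' ratings for the item, with cosine similarity computed over the items both users rated.

```python
import math

def cosine(a, b):
    """Cosine similarity of two users' rating dicts over co-rated items."""
    common = set(a) & set(b)
    if not common:
        return 0.0
    dot = sum(a[i] * b[i] for i in common)
    na = math.sqrt(sum(a[i] ** 2 for i in common))
    nb = math.sqrt(sum(b[i] ** 2 for i in common))
    return dot / (na * nb)

def predict(ratings, user, item):
    """Similarity-weighted average of other users' ratings for `item` (None if nobody rated it)."""
    num = den = 0.0
    for other, r in ratings.items():
        if other == user or item not in r:
            continue
        s = cosine(ratings[user], r)
        num += s * r[item]
        den += abs(s)
    return num / den if den else None

# Hypothetical like (+1) / dislike (-1) data
ratings = {
    "alice": {"v1": 1, "v2": -1, "v3": 1},
    "bob":   {"v1": 1, "v2": -1, "v3": 1, "v4": -1},
    "carol": {"v1": -1, "v2": 1, "v4": 1},
}
print(predict(ratings, "alice", "v4"))  # negative: predicted dislike
```

Alice agrees with Bob and disagrees with Carol on every shared item, so the prediction for `v4` follows Bob's dislike.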


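Spark MLlib provides a collaborative filtering implementation based on Alternating Least Squares (ALS), which factorizes the user-item rating matrix into user and item latent factor matrices by alternately solving least-squares problems for one set of factors while holding the other fixed. The implementation in MLlib has the following parameters (in the PySpark RDD API used below, they correspond to the `blocks`, `rank`, `iterations`, and `lambda_` arguments of `ALS.train`; implicit feedback is handled by `ALS.trainImplicit`, which also takes `alpha`):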
- `numBlocks` is the number of blocks used to parallelize computation (set to -1 to auto-configure).
- `rank` is the number of latent factors in the model.
- `iterations` is the number of iterations to run.
- `lambda` specifies the regularization parameter in ALS.
- `implicitPrefs` specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
- `alpha` is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.
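To make `rank`, `iterations`, and `lambda` concrete, here is a minimal single-machine sketch of explicit-feedback ALS in NumPy. It is illustrative only and is not Spark's implementation: MLlib distributes the work over blocks and, per its documentation, scales the regularization term by the number of ratings of each user or item (ALS-WR), whereas this sketch uses a plain `lam`. The function name `als` and the toy matrix are hypothetical.

```python
import numpy as np

def als(R, mask, rank=2, iterations=10, lam=0.1, seed=0):
    """Explicit-feedback ALS on a small dense matrix R (observed where mask is True)."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))  # user factors
    V = rng.normal(scale=0.1, size=(n_items, rank))  # item factors
    I = np.eye(rank)
    for _ in range(iterations):
        # Fix V: each user's factors solve a ridge regression on that user's ratings
        for u in range(n_users):
            Vu = V[mask[u]]
            U[u] = np.linalg.solve(Vu.T @ Vu + lam * I, Vu.T @ R[u, mask[u]])
        # Fix U: each item's factors solve a ridge regression on that item's ratings
        for i in range(n_items):
            Ui = U[mask[:, i]]
            V[i] = np.linalg.solve(Ui.T @ Ui + lam * I, Ui.T @ R[mask[:, i], i])
    return U, V

# Toy 4 users x 4 items; 0 marks a missing rating
R = np.array([[5, 3, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)
mask = R > 0
U, V = als(R, mask, rank=2, iterations=20, lam=0.1)
pred = U @ V.T  # predicted rating for every (user, item) cell
rmse = np.sqrt(np.mean((pred[mask] - R[mask]) ** 2))
print(f"training RMSE: {rmse:.3f}")
```

`rank` is the number of columns of `U` and `V`, `iterations` is the number of outer alternations, and `lam` penalizes large factor values.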

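First, split the dataset into training, validation, and test sets. `randomSplit` normalizes the weights `[6, 2, 2]`, so the splits receive roughly 60%, 20%, and 20% of the ratings.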

In [10]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
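The split above assigns each record to exactly one split, with probability proportional to its weight. The plain-Python sketch below illustrates that idea; it is a simplification (Spark samples each partition independently with its own seeded generator), and the function name `random_split` is hypothetical. Because assignment is random, split sizes are only approximately 60/20/20.

```python
import bisect
import itertools
import random

def random_split(records, weights, seed=0):
    """Assign each record to exactly one split with probability w_k / sum(w)."""
    cum = list(itertools.accumulate(weights))  # e.g. [6, 2, 2] -> [6, 8, 10]
    total = cum[-1]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for rec in records:
        # Draw a point in [0, total) and find the weight bucket it falls into
        k = bisect.bisect_right(cum, rng.random() * total)
        splits[min(k, len(splits) - 1)].append(rec)
    return splits

train, val, test = random_split(range(10_000), [6, 2, 2], seed=0)
print(len(train), len(val), len(test))
```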

Train one model per candidate rank and keep the rank with the lowest RMSE on the validation set:

In [11]:
from pyspark.mllib.recommendation import ALS  # RDD-based API; shadows pyspark.ml.recommendation.ALS imported earlier
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = []  # validation RMSE for each rank

min_error = float('inf')
best_rank = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    # predictAll returns Rating(user, product, rating); key by (user, product)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    # IDs were parsed as strings, so convert them before joining with the predictions
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors.append(error)
    print(f'For rank {rank} the RMSE is {error}')
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank', best_rank)

For rank 4 the RMSE is 0.8973056095511496
For rank 8 the RMSE is 0.9143149057530106
For rank 12 the RMSE is 0.9141049213609704
The best model was trained with rank 4
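The score used above is the root-mean-square error, `sqrt(mean((actual - predicted)^2))`, over the (user, movie) pairs in `rates_and_preds`. Because that RDD is an inner join, pairs for which `predictAll` returns no prediction (for instance, a user or movie that did not appear in the training split) are simply left out of the average. A plain-Python sketch over records of the same shape, `((user, movie), (actual, predicted))`, with made-up sample values:

```python
import math

def rmse(rates_and_preds):
    """RMSE over records shaped like the joined RDD: ((user, movie), (actual, predicted))."""
    sq_errors = [(actual - pred) ** 2 for _, (actual, pred) in rates_and_preds]
    return math.sqrt(sum(sq_errors) / len(sq_errors))

sample = [((1, 1), (4.0, 3.5)), ((1, 3), (3.0, 3.0))]
print(rmse(sample))  # sqrt((0.25 + 0.0) / 2), about 0.354
```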


In [12]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
# model.save(sc, './models')
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print(f'For testing data the RMSE is {error}')

For testing data the RMSE is 0.9041156142840086


In [13]:
predictions.take(10)

[((140, 37739), 2.9152140567715112),
 ((232, 37739), 2.78983291683084),
 ((28, 37739), 2.656870373312786),
 ((492, 667), 4.358831537079135),
 ((307, 44828), 2.249519832233183),
 ((387, 44828), 2.6234492031819654),
 ((307, 5618), 3.293970646082535),
 ((318, 5618), 3.8430326028648616),
 ((434, 5618), 4.542502274348248),
 ((98, 5618), 4.864687846094984)]

### Using the 1M dataset to build the final model
To build the final recommender model, we use the larger dataset and parse it the same way as the small one, this time converting the fields to `int` and `float` while parsing.

In [14]:
complete_ratings_file = './ml-1m/ratings.csv'
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
print(f"There are {complete_ratings_data.count()} ratings in the complete dataset")

There are 1016822 ratings in the complete dataset


In [15]:
complete_ratings_data.take(10)

[(1, 2, 3.5),
 (1, 29, 3.5),
 (1, 32, 3.5),
 (1, 47, 3.5),
 (1, 50, 3.5),
 (1, 112, 3.5),
 (1, 151, 4.0),
 (1, 223, 4.0),
 (1, 253, 4.0),
 (1, 260, 4.0)]

Split the data into training and test RDDs, then train the model:

In [16]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)

In [17]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print(f'For testing data the RMSE is {error}')

For testing data the RMSE is 0.8366257480512765


## Making recommendations
With collaborative filtering, recommending movies to a new user is not as simple as feeding new entries to an existing model: ALS learns a latent factor vector only for users present in the training data, so we must retrain the model with the new user's ratings included in order to relate them to the other users. In other words, the recommender has to be retrained whenever new user ratings arrive (although, of course, one trained model serves many users). Retraining is expensive and becomes a scalability bottleneck, which is one reason to run it on a distributed engine such as Spark. Once the model is trained, however, it can be reused to obtain top recommendations for a particular user or predicted ratings for particular movies, operations that are much cheaper than training from scratch.
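Once user and item factors exist, a top-N list for a known user amounts to scoring every item with the dot product of the user's factor vector and each item's factor vector, then dropping items the user already rated. The NumPy sketch below uses hypothetical factor values and a hypothetical function name; in MLlib, `MatrixFactorizationModel.recommendProducts(user, num)` provides top-N recommendations for a user directly.

```python
import numpy as np

def top_n_for_user(user_vec, item_factors, rated_items, n=10):
    """Rank all items by the dot product u . v_i and skip ones already rated."""
    scores = item_factors @ user_vec  # predicted rating for every item
    order = np.argsort(-scores)       # item indices, highest score first
    picks = [(int(i), float(scores[i])) for i in order if int(i) not in rated_items]
    return picks[:n]

item_factors = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [0.5, 0.5]])
user_vec = np.array([2.0, 1.0])
print(top_n_for_user(user_vec, item_factors, rated_items={2}, n=2))
# [(0, 2.0), (3, 1.5)]
```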

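Next, load the movies file from the `ml-1m` directory for later use. Like the small movies file, it is parsed with a plain comma split, so titles that contain commas are truncated.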

In [18]:
complete_movies_file = './ml-1m/movies.csv'

complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print (f"There are {complete_movies_titles.count()} movies in the complete dataset")

There are 27278 movies in the complete dataset


In [19]:
complete_movies_data.take(10)

[(1, 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy'),
 (2, 'Jumanji (1995)', 'Adventure|Children|Fantasy'),
 (3, 'Grumpier Old Men (1995)', 'Comedy|Romance'),
 (4, 'Waiting to Exhale (1995)', 'Comedy|Drama|Romance'),
 (5, 'Father of the Bride Part II (1995)', 'Comedy'),
 (6, 'Heat (1995)', 'Action|Crime|Thriller'),
 (7, 'Sabrina (1995)', 'Comedy|Romance'),
 (8, 'Tom and Huck (1995)', 'Adventure|Children'),
 (9, 'Sudden Death (1995)', 'Action'),
 (10, 'GoldenEye (1995)', 'Action|Adventure|Thriller')]

We also want to recommend only movies that have at least a minimum number of ratings, so we count the ratings for each movie.

In [21]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [22]:
movie_rating_counts_RDD.take(10)

[(2, 1123),
 (29, 437),
 (32, 2241),
 (47, 2165),
 (50, 2396),
 (112, 605),
 (151, 679),
 (223, 1212),
 (253, 1410),
 (260, 2778)]
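The cell above groups all ratings of each movie and then computes the count and the average. The same result can be built from a running `(count, sum)` pair per movie, which is what the plain-Python sketch below does (function name and sample triples are hypothetical). In Spark, that pattern would map to something like `complete_ratings_data.map(lambda x: (x[1], (1, x[2]))).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))`, which avoids materializing every movie's full list of ratings as `groupByKey` does.

```python
from collections import defaultdict

def counts_and_averages(ratings):
    """(user, movie, rating) triples -> {movie: (n_ratings, mean_rating)}."""
    totals = defaultdict(lambda: [0, 0.0])  # movie -> [count, running sum]
    for _, movie, rating in ratings:
        totals[movie][0] += 1
        totals[movie][1] += rating
    return {movie: (n, s / n) for movie, (n, s) in totals.items()}

sample = [(1, 2, 3.5), (2, 2, 4.5), (1, 29, 3.5)]
print(counts_and_averages(sample))  # {2: (2, 4.0), 29: (1, 3.5)}
```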

## Adding new user ratings
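Now we rate some movies for a new user. We put the ratings in a new RDD and use user ID 0, which is not assigned in the MovieLens dataset. Note that these ratings use a 1 to 10 scale, whereas MovieLens ratings range from 0.5 to 5 stars; this mismatch explains why the predicted ratings for this user further below exceed 5.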

In [23]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
    (0, 260, 9),  # Star Wars (1977)
    (0, 1, 8),  # Toy Story (1995)
    (0, 16, 7),  # Casino (1995)
    (0, 25, 8),  # Leaving Las Vegas (1995)
    (0, 32, 9),  # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
    (0, 335, 4),  # Flintstones, The (1994)
    (0, 379, 3),  # Timecop (1994)
    (0, 296, 7),  # Pulp Fiction (1994)
    (0, 858, 10),  # Godfather, The (1972)
    (0, 50, 8)  # Usual Suspects, The (1995)
]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print(f'New user ratings: {new_user_ratings_RDD.take(10)}')

New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]
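If the new user's scores should be on the same scale as the rest of the data, they could be mapped from 1-10 to half stars before the `union()`. The helper below is hypothetical and assumes a linear mapping (divide by 2, snap to the nearest half star, clamp to 0.5-5); rerunning the notebook with it would change the predictions shown later.

```python
def to_star_scale(ratings, scale_max=10, star_max=5.0):
    """Map (user, movie, score) on a 1..scale_max scale to half stars in [0.5, star_max]."""
    out = []
    for user, movie, score in ratings:
        stars = score * star_max / scale_max
        stars = round(stars * 2) / 2  # snap to the nearest half star
        out.append((user, movie, min(max(stars, 0.5), star_max)))
    return out

print(to_star_scale([(0, 260, 9), (0, 379, 3), (0, 858, 10)]))
# [(0, 260, 4.5), (0, 379, 1.5), (0, 858, 5.0)]
```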


Add them to the data used to train the recommendation model with the `union()` method:

In [24]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

Retrain the ALS model with the parameters selected on the small dataset:

In [25]:
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)

## Getting top recommendations

In [26]:
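# Use a set: a Python 3 map() iterator would be consumed by the first membership test
new_user_ratings_ids = {x[1] for x in new_user_ratings}  # movie IDs the new user already rated
# Keep only the movies the new user has not rated yet, as (userID, movieID) pairs
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))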

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [27]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(91902, ((8.276510704669155, 'Elena (2011)'), 3)),
 (4926,
  ((5.689454821306413, "Everybody's Famous! (Iedereen beroemd!) (2000)"), 1)),
 (4992, ((4.9502469603492045, 'Kate & Leopold (2001)'), 161))]

In [28]:
new_user_recommendations_rating_title_and_count_RDD = new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [29]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2] >= 25).takeOrdered(25, key=lambda x: -x[1])
print('TOP recommended movies (with at least 25 reviews):\n' + '\n'.join(map(str, top_movies)))

TOP recommended movies (with at least 25 reviews):
('Cosmos (1980)', 9.560979904358788, 56)
('"Best of Youth', 9.287058750381005, 29)
('Baraka (1992)', 9.070479205528232, 73)
('"Lives of Others', 8.914468033558713, 290)
('Before the Rain (Pred dozhdot) (1994)', 8.893510719376897, 50)
('"Strada', 8.846726983222451, 41)
('Black Mirror (2011)', 8.804826720003058, 35)
('Senna (2010)', 8.796618628800461, 40)
('Snowpiercer (2013)', 8.786108439241591, 32)
('"General', 8.752642034300312, 120)
('City Lights (1931)', 8.74763028401696, 135)
('Grand Illusion (La grande illusion) (1937)', 8.693028223917253, 85)
('Spirited Away (Sen to Chihiro no kamikakushi) (2001)', 8.681895804050143, 692)
('"Children of Heaven', 8.670061393748052, 36)
('"Amelie (Fabuleux destin d\'Amélie Poulain', 8.657056505339037, 1212)
('"Passion of Joan of Arc', 8.651484987487667, 31)
('Yi Yi (2000)', 8.649924334306235, 26)
('"Wings of Desire (Himmel über Berlin', 8.649756401696067, 206)
('12 Angry Men (1957)', 8.5830374727

In [30]:
my_movie = sc.parallelize([(0, 1196)])  # Star Wars: Episode V - The Empire Strikes Back (1980)
# Predict the new user's rating for this single movie
individual_movie_rating_RDD = new_ratings_model.predictAll(my_movie)
individual_movie_rating_RDD.take(1)

In [32]:
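from pyspark.mllib.recommendation import MatrixFactorizationModel

model_path = './models'
# Persist the rank-selected model trained on the small dataset; saving fails if the path already exists
model.save(sc, model_path)
# Reload it later, e.g. in another session
same_model = MatrixFactorizationModel.load(sc, model_path)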