#### Create SparkContext

In [6]:
import pyspark
sc = pyspark.SparkContext('local[*]')

#### File download

In [7]:
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

#### Download Location

In [8]:
import os

datasets_path = os.path.join('/home/jovyan', 'work')

complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

#### Getting files

In [9]:
import urllib
small_f = urllib.request.urlretrieve(small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve(complete_dataset_url, complete_dataset_path)


#### Extracting zip files

In [10]:
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)

### Load and parse the datasets

#### ratings.csv

In [11]:
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [12]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

#### movies.csv 

In [13]:
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

### Collaborative Filtering

#### Selecting ALS parameteres 

In [14]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [15]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print ('For rank {} the RMSE is {}'.format(rank,error))
    if error < min_error:
        min_error = error
        best_rank = rank

print ('The best model was trained with rank {}'.format(best_rank))


For rank 4 the RMSE is 0.908078105265682
For rank 8 the RMSE is 0.916462973348527
For rank 12 the RMSE is 0.917665030756129
The best model was trained with rank 4


In [16]:
predictions.take(3)


[((372, 1084), 3.42419871162954),
 ((4, 1084), 3.866749726695713),
 ((402, 1084), 3.4099577968422152)]

In [17]:
rates_and_preds.take(3)


[((1, 457), (5.0, 4.381060760461434)),
 ((1, 1025), (5.0, 4.705295366590298)),
 ((1, 1089), (5.0, 4.979982471805129))]

In [18]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.9113780946334407


#### Using the complete dataset to build the final model 

In [19]:
# Load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print ("There are %s recommendations in the complete dataset" % (complete_ratings_data.count()))


There are 27753444 recommendations in the complete dataset


#### Training the recommender model 

In [20]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

#### Testing on test model 

In [21]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is {}'.format(error))

For testing data the RMSE is 0.8318265262101795


#### How to make recommendations

In [22]:
complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print ("There are %s movies in the complete dataset" % (complete_movies_titles.count()))


There are 58098 movies in the complete dataset


In [23]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

#### Adding new user ratings

In [24]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,4), # Star Wars (1977)
     (0,1,3), # Toy Story (1995)
     (0,16,3), # Casino (1995)
     (0,25,4), # Leaving Las Vegas (1995)
     (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,1), # Flintstones, The (1994)
     (0,379,1), # Timecop (1994)
     (0,296,3), # Pulp Fiction (1994)
     (0,858,5) , # Godfather, The (1972)
     (0,50,4) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: {}'.format(new_user_ratings_RDD.take(10)))


New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]


In [25]:
sample_70 = complete_ratings_data.sample(False, 0.7, 42)
complete_data_with_new_ratings_RDD = sample_70.union(new_user_ratings_RDD)

In [26]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ("New model trained in {} seconds".format(round(tt,3)))

New model trained in 118.107 seconds


#### Getting top recommendations

In [27]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list 
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [28]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(6216,
  ((3.464152601182734, 'Nowhere in Africa (Nirgendwo in Afrika) (2001)'),
   717)),
 (83916, ((3.0367799007762226, 'Blues in the Night (1941)'), 9)),
 (146076, ((2.901215057590658, 'Time Please (2013)'), 1))]

In [29]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [34]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(10, key=lambda x: -x[1])

print ('TOP 10 recommended movies (with more than 25 reviews):\n{}'.format('\n'.join(map(str, top_movies))))

TOP 10 recommended movies (with more than 25 reviews):
('Slaying the Badger', 4.335636079521507, 25)
('Eight Deadly Shots (Kahdeksan surmanluotia) (1972)', 4.2562934871694065, 30)
('Он вам не Димон (2017)', 4.222122199861215, 26)
('Hamlet (Gamlet) (1964)', 4.203159958686045, 37)
('Madagascar (2011)', 4.199562581570549, 32)
('"Dylan Moran: Yeah', 4.174523546688804, 81)
('"Human Condition III', 4.12884967197042, 91)
('"Sandglass', 4.127023345894943, 38)
('Strangers in Good Company (1990)', 4.114607046834268, 26)
('Landscape in the Mist (Topio stin omichli) (1988)', 4.112073211803821, 58)


In [31]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=100).takeOrdered(10, key=lambda x: -x[1])

print ('TOP 10 recommended movies (with more than 100 reviews):\n{}'.format('\n'.join(map(str, top_movies))))

TOP 10 recommended movies (with more than 100 reviews):
('"Decalogue', 4.111964226177889, 547)
('Harakiri (Seppuku) (1962)', 4.101194870888274, 679)
('"Century of the Self', 4.077190923632896, 213)
("Smiley's People (1982)", 4.058489734175244, 116)
('Ikiru (1952)', 4.051591917900783, 1551)
('Planet Earth II (2016)', 4.0312370633117185, 853)
('Sunless (Sans Soleil) (1983)', 4.015547424917147, 312)
('Blue Planet II (2017)', 4.015402360579362, 349)
('Olive Kitteridge (2014)', 4.006381444457638, 211)
('Song of the Little Road (Pather Panchali) (1955)', 4.003156354421398, 906)


#### Getting individual ratings

In [32]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=116688, rating=0.45235868442114674)]

#### Adding second user ratings

In [53]:
fam_user_ID = 1989

# The format of each line is (userID, movieID, rating)
fam_user_ratings = [
     (0,260,2), # Star Wars (1977)
     (0,3,5), # Grumper Old Men (1995)
     (0,16,1), # Casino (1995)
     (0,25,4), # Leaving Las Vegas (1995)
     (0,2,1), # Jumanji (1995)
     (0,335,1), # Flintstones, The (1994)
     (0,344,2), 
     (0,296,5), # Pulp Fiction (1994)
     (0,858,5) , # Godfather, The (1972)
     (0,50,4) # Usual Suspects, The (1995)
    ]
fam_user_ratings_RDD = sc.parallelize(fam_user_ratings)
print ('Second user ratings: {}'.format(fam_user_ratings_RDD.take(10)))

Second user ratings: [(0, 260, 2), (0, 3, 5), (0, 16, 1), (0, 25, 4), (0, 2, 1), (0, 335, 1), (0, 344, 2), (0, 296, 5), (0, 858, 5), (0, 50, 4)]


In [61]:
sample_80 = complete_ratings_data.sample(False, 0.8, 17)
complete_data_with_fam_ratings_RDD = sample_80.union(fam_user_ratings_RDD)

In [62]:
from time import time

t0 = time()
fam_ratings_model = ALS.train(complete_data_with_fam_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ("Second new model trained in {} seconds".format(round(tt,3)))

Second new model trained in 187.679 seconds


#### Getting second user's top recommendations

In [63]:
fam_user_ratings_ids = map(lambda x: x[1], fam_user_ratings) # get just movie IDs
# keep just those not on the ID list 
fam_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in fam_user_ratings_ids).map(lambda x: (fam_user_ID, x[0])))

# Use the input RDD, fam_user_unrated_movies_RDD, with fam_ratings_model.predictAll() to predict fafm ratings for the movies
fam_user_recommendations_RDD = fam_ratings_model.predictAll(fam_user_unrated_movies_RDD)

In [64]:
# Transform fam_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
fam_user_recommendations_rating_RDD = fam_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
fam_user_recommendations_rating_title_and_count_RDD = \
    fam_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
fam_user_recommendations_rating_title_and_count_RDD.take(3)

[(6216,
  ((3.1537494259231797, 'Nowhere in Africa (Nirgendwo in Afrika) (2001)'),
   717)),
 (124320, ((3.1112767390471396, 'Once a Thief (1965)'), 1)),
 (83916, ((3.942969439184698, 'Blues in the Night (1941)'), 9))]

In [65]:
fam_user_recommendations_rating_title_and_count_RDD = \
    fam_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [70]:
fam25 = fam_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(10, key=lambda x: -x[1])

print ('TOP 10 recommended movies (with more than 25 reviews):\n{}'.format('\n'.join(map(str, fam25))))

TOP 10 recommended movies (with more than 25 reviews):
("Satan's Tango (Sátántangó) (1994)", 4.653193652704272, 87)
('"Sandglass', 4.6369677521259165, 38)
('Le Plaisir (1952)', 4.584083159008987, 29)
('Eight Deadly Shots (Kahdeksan surmanluotia) (1972)', 4.556402199526614, 30)
('"Human Condition I', 4.485184475501852, 151)
('Sunless (Sans Soleil) (1983)', 4.466655732036177, 312)
('Mafioso (1962)', 4.458834627808075, 25)
('"Saragossa Manuscript', 4.4511903422437396, 159)
('"Beaches of Agnes', 4.434394569849746, 40)
("Vive L'Amour (Ai qing wan sui) (1994)", 4.4332663770356655, 72)


In [69]:
fam100 = fam_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=100).takeOrdered(10, key=lambda x: -x[1])

print ('TOP 10 recommended movies (with more than 100 reviews):\n{}'.format('\n'.join(map(str, fam100))))

TOP 10 recommended movies (with more than 100 reviews):
('"Human Condition I', 4.485184475501852, 151)
('Sunless (Sans Soleil) (1983)', 4.466655732036177, 312)
('"Saragossa Manuscript', 4.4511903422437396, 159)
('"Mirror', 4.391705343494355, 826)
('"Man Escaped', 4.37165284350748, 440)
('"Eclisse', 4.3680769774358215, 340)
('Harakiri (Seppuku) (1962)', 4.354270857690135, 679)
('"Night', 4.353725124221909, 336)
('Persona (1966)', 4.344078111410672, 1799)
('"Passion of Joan of Arc', 4.341445910577617, 1036)
