In [4]:
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

In [5]:
import os

datasets_path = os.path.join('..', 'datasets')

complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')


In [7]:
import urllib

small_f = urllib.urlretrieve (small_dataset_url, small_dataset_path)
complete_f = urllib.urlretrieve (complete_dataset_url, complete_dataset_path)


In [8]:
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)


In [9]:
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [10]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [11]:
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()

small_movies_data.take(3)

[(u'1', u'Toy Story (1995)'),
 (u'2', u'Jumanji (1995)'),
 (u'3', u'Grumpier Old Men (1995)')]

In [116]:
small_ratings_raw_data.take(3)

[u'userId,movieId,rating,timestamp',
 u'1,31,2.5,1260759144',
 u'1,1029,3.0,1260759179']

In [114]:
#test
def test(line):
    attributes = line[2].split("|")
    time = line[1][-5:-1]
    attributes.append(time)
    return (line[0], attributes)
movies_attribute = small_movies_raw_data.map(lambda line:line.split(","))\
.map(lambda tokens: (tokens[0], tokens[1], tokens[2]))\
.map(test)
movie_rating_number = small_ratings_data.map(lambda line: (int(line[1]), 1)).reduceByKey(lambda a, b: a + b)
movie_average_rating = small_ratings_data.map(lambda line: (int(line[1]), float(line[2])))\
.reduceByKey(lambda a, b: a + b)\
 .join(movie_rating_number).map(lambda line: (line[0], line[1][0] / line[1][1]))
# movie_average_rating.take(40)
# .map(lambda line: (line[0], line[1][-5:-1], line[2].split("|")))\
# movies_attribute.join(movie_rating_number).take(3)

In [115]:
small_ratings_data.take(3)

[(u'1', u'31', u'2.5'), (u'1', u'1029', u'3.0'), (u'1', u'1061', u'3.0')]

In [77]:
movie_rating_number = small_ratings_data.map(lambda line: (line[0], 1)).reduceByKey(lambda a, b: a + b).sortByKey()
# movie_rating_number.sortByKey(ascending = False)
movie_rating_number.take(20)
movie_rating_number.join(movies_attribute).take(15)

[(u'344', (177, u'Comedy')),
 (u'340', (39, u' The (1994)"')),
 (u'348', (53, u'Comedy')),
 (u'296', (20, u'Comedy|Crime|Drama|Thriller')),
 (u'292', (278, u'Action|Drama|Sci-Fi|Thriller')),
 (u'586', (57, u'Children|Comedy')),
 (u'274', (39, u'Comedy')),
 (u'278', (53, u'Comedy')),
 (u'582', (138, u'Comedy|Drama')),
 (u'102', (678, u'Comedy')),
 (u'99', (188, u'Documentary')),
 (u'95', (299, u'Action|Adventure|Thriller')),
 (u'559', (129, u' France (1993)"')),
 (u'551', (85, u' The (1993)"')),
 (u'511', (31, u' The (1993)"'))]

In [12]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0L)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [13]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5L
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < min_error:
        min_error = error
        best_rank = rank

print 'The best model was trained with rank %s' % best_rank

For rank 4 the RMSE is 0.947397387831
For rank 8 the RMSE is 0.957024708311
For rank 12 the RMSE is 0.954850413563
The best model was trained with rank 4


In [14]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print 'For testing data the RMSE is %s' % (error)

For testing data the RMSE is 0.937620917297


In [15]:
# Load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()

print "There are %s recommendations in the complete dataset" % (complete_ratings_data.count())


There are 24404096 recommendations in the complete dataset


In [16]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0L)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [17]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print 'For testing data the RMSE is %s' % (error)

For testing data the RMSE is 0.831954381788


In [29]:
complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))

print "There are %s movies in the complete dataset" % (complete_movies_titles.count())

There are 40110 movies in the complete dataset


In [30]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [31]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,4), # Star Wars (1977)
     (0,1,3), # Toy Story (1995)
     (0,16,3), # Casino (1995)
     (0,25,4), # Leaving Las Vegas (1995)
     (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,1), # Flintstones, The (1994)
     (0,379,1), # Timecop (1994)
     (0,296,3), # Pulp Fiction (1994)
     (0,858,5) , # Godfather, The (1972)
     (0,50,4) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print 'New user ratings: %s' % new_user_ratings_RDD.take(10)

New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]


In [32]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [33]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print "New model trained in %s seconds" % round(tt,3)


New model trained in 95.749 seconds


In [34]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [37]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(135168, ((2.673367646553945, u'Smithereens (1982)'), 5)),
 (65538, ((1.8030351861956486, u'"Young Visiters'), 9)),
 (30723, ((2.4026795772335334, u'Vincent & Theo (1990)'), 30))]

In [38]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [39]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
(u"Long Night's Journey Into Day (2000)", 4.059913721183614, 35)
(u'Frozen Planet (2011)', 4.0407705022659215, 239)
(u'Death on the Staircase (Soup\xe7ons) (2004)', 3.9845210795473873, 63)
(u'Beastie Boys: Sabotage (1994)', 3.9772592886532956, 28)
(u'Mei and the Kittenbus (2002)', 3.953108502689679, 25)
(u'Long Way Round (2004)', 3.9440116890655434, 29)
(u'Milius (2013)', 3.9346396405885726, 29)
(u'"Godfather: Part II', 3.8924631308171693, 34508)
(u'Heimat - A Chronicle of Germany (Heimat - Eine deutsche Chronik) (1984)', 3.8746458118970244, 31)
(u'The War (2007)', 3.851656248176492, 34)
(u'Band of Brothers (2001)', 3.8364625879590832, 7833)
(u'"Lord of the Rings: The Fellowship of the Ring', 3.822754409966089, 51871)
(u'Cosmos (1980)', 3.8128094454858243, 1740)
(u'"Civil War', 3.805263407628287, 380)
(u'Duck Amuck (1953)', 3.802050204028244, 120)
(u'"Lord of the Rings: The Return of the King', 3.799644723311997, 46446)
(u'The Hound o

In [40]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=116688, rating=0.7394535070613866)]

In [41]:
from pyspark.mllib.recommendation import MatrixFactorizationModel

model_path = os.path.join('..', 'models', 'movie_lens_als')

# Save and load model
model.save(sc, model_path)
same_model = MatrixFactorizationModel.load(sc, model_path)
