In [77]:
filepath = '/Users/navi/Desktop/'

ratings = filepath + 'ml-latest-small/ratings.csv'
movies = filepath + 'ml-latest-small/movies.csv'

In [5]:
ratings_raw_data = sc.textFile(ratings)

In [13]:
ratings_header = ratings_raw_data.take(1)[0]

In [14]:
ratings_data = ratings_raw_data.filter(lambda line: line!= ratings_header)\
.map(lambda line: line.split(",")).map(lambda tokens : (tokens[0], tokens[1], tokens[2])).cache()

In [15]:
ratings_data.take(4)

[('1', '31', '2.5'),
 ('1', '1029', '3'),
 ('1', '1061', '3'),
 ('1', '1129', '2')]

In [16]:
movies_raw_data = sc.textFile(movies)

In [21]:
movies_header = movies_raw_data.take(1)[0]

In [26]:
movies_data = movies_raw_data.filter(lambda line: line != movies_header)\
.map(lambda line: line.split(",")).map(lambda tokens : (tokens[0], tokens[1])).cache()


In [27]:
movies_data.take(4)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)'),
 ('4', 'Waiting to Exhale (1995)')]

In [75]:
training_RDD, validation_RDD, test_RDD = ratings_data.randomSplit([6, 2, 2])
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [30]:
from pyspark.mllib.recommendation import ALS
import math

In [33]:
seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02


In [35]:
min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print ('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print ('The best model was trained with rank %s' % best_rank)

For rank 4 the RMSE is 0.9508883651995287
For rank 8 the RMSE is 0.9560731348412872
For rank 12 the RMSE is 0.9544416427577839
The best model was trained with rank 4


In [36]:
predictions.take(3)

[((608, 1084), 4.324410439126544),
 ((472, 1084), 4.018760403690836),
 ((436, 1084), 3.8793227009760365)]

In [37]:
rates_and_preds.take(3)

[((2, 272), (3.0, 2.856199679232424)),
 ((2, 474), (2.0, 3.56038179699224)),
 ((4, 10), (4.0, 3.753283107803004))]

In [39]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.9473274359242615


In [78]:
# Load the complete dataset file
complete_ratings_file = filepath + 'ml-latest/ratings.csv'
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print ("There are %s recommendations in the complete dataset" % (complete_ratings_data.count()))

There are 26024289 recommendations in the complete dataset


In [80]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3])

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [82]:

test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))


For testing data the RMSE is 0.8328719334961197


In [83]:
complete_movies_file = filepath + 'ml-latest/movies.csv'
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print ("There are %s movies in the complete dataset" % (complete_movies_titles.count()))

There are 45843 movies in the complete dataset


In [40]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

In [84]:
movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [42]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,16,7), # Casino (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,9), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,7), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]


In [85]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [86]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ("New model trained in %s seconds" % round(tt,3))

New model trained in 111.525 seconds


In [87]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [49]:
movies_titles = movies_data.map(lambda x: (int(x[0]),x[1]))

In [88]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(144300, ((5.946452608106953, 'As the Light Goes Out (2014)'), 9)),
 (7215, ((7.750161004267109, 'To Have and Have Not (1944)'), 1026)),
 (165945, ((5.218340373876387, 'The Last Ronin (2010)'), 2))]

In [89]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [90]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
('"Godfather', 8.72253087608388, 57070)
('"I', 8.676032031409026, 63)
('Baseball (1994)', 8.590651113426288, 29)
('"Godfather: Part II', 8.584168164223634, 36679)
('Planet Earth (2006)', 8.538458821157345, 754)
('Duck Amuck (1953)', 8.507706063230337, 178)
('Death on the Staircase (Soupçons) (2004)', 8.48977406184169, 95)
('Seven Samurai (Shichinin no samurai) (1954)', 8.475140274558461, 13994)
('Harakiri (Seppuku) (1962)', 8.474783937715731, 610)
('The War (2007)', 8.455334803335798, 44)
('"Civil War', 8.426721058431154, 400)
('Band of Brothers (2001)', 8.426353333118199, 284)
('Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)', 8.41203209568879, 28280)
('"Last Lions', 8.40557102848036, 27)
('Frozen Planet (2011)', 8.375006317846855, 322)
('Paths of Glory (1957)', 8.37437729788749, 4271)
('Heimat - A Chronicle of Germany (Heimat - Eine deutsche Chronik) (1984)', 8.36192667670944, 32)
('Yojimbo (1961)', 8.33

In [91]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=116688, rating=1.9130054533578615)]