In [2]:
from pyspark import SparkContext
sc = SparkContext("local","Recommendation")

In [76]:
small_ratings_raw_data = sc.textFile("/root/RecommendationEngine/data/smalldata/ml-1m/ratings.dat")
small_ratings_data = small_ratings_raw_data.map(lambda line: line.split("::")).map(lambda tokens: (tokens[0], tokens[1], tokens[2])).cache()

In [77]:
small_ratings_data.take(1)

[(u'1', u'1193', u'5')]

In [78]:
small_movies_raw_data = sc.textFile("/root/RecommendationEngine/data/smalldata/ml-1m/movies.dat")
small_movies_data = small_movies_raw_data.map(lambda line: line.split("::")).map(lambda tokens: (tokens[0],tokens[1])).cache()

In [79]:
small_movies_data.take(3)

[(u'1', u'Toy Story (1995)'),
 (u'2', u'Jumanji (1995)'),
 (u'3', u'Grumpier Old Men (1995)')]

In [80]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0L)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [81]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5L
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < min_error:
        min_error = error
        best_rank = rank

print 'The best model was trained with rank %s' % best_rank

For rank 4 the RMSE is 0.884074601493
For rank 8 the RMSE is 0.873017141495
For rank 12 the RMSE is 0.876433682033
The best model was trained with rank 8


In [82]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print 'For testing data the RMSE is %s' % (error)

For testing data the RMSE is 0.873829457299


In [90]:
test_for_predict_RDD.take(1)
training_RDD.take(1)

[(u'1', u'661', u'3')]

In [146]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings)#, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (small_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD

In [148]:
movie_rating_counts_RDD.take(10)

[(u'593', 2578),
 (u'1200', 1820),
 (u'3724', 195),
 (u'3725', 112),
 (u'643', 3),
 (u'344', 766),
 (u'345', 469),
 (u'346', 120),
 (u'347', 46),
 (u'340', 84)]

In [70]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,16,7), # Casino (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,9), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,7), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print 'New user ratings: %s' % new_user_ratings_RDD.take(10)

New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]


In [91]:
complete_data_with_new_ratings_RDD = small_ratings_data.union(new_user_ratings_RDD)
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)

In [94]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (small_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [177]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(small_movies_data.map(lambda x: (int(x[0]),x[1]))).join(movie_rating_counts_RDD.map(lambda x: (int(x[0]),x[1])))
    
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(2049, ((4.562199322393095, u'Happiest Millionaire, The (1967)'), 37)),
 (3, ((4.739298416418869, u'Grumpier Old Men (1995)'), 478)),
 (2052, ((3.1871852002088388, u'Hocus Pocus (1993)'), 183))]

In [178]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

#top_movies = new_user_recommendations_rating_title_and_count_RDD.takeOrdered(25, key=lambda x: -x[1])

In [183]:
top_movies = \
new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2] > 25).takeOrdered(25, lambda x: -x[1])

In [190]:
print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
(u'Godfather, The (1972)', 8.618782676279423, 2223)
(u'Sanjuro (1962)', 8.583727613437148, 69)
(u'Shawshank Redemption, The (1994)', 8.459625459203405, 2227)
(u'Usual Suspects, The (1995)', 8.43257649446354, 1783)
(u'Pulp Fiction (1994)', 8.323246582380175, 2171)
(u"One Flew Over the Cuckoo's Nest (1975)", 8.311948459529157, 1725)
(u"Schindler's List (1993)", 8.268551082592634, 2304)
(u'American Beauty (1999)', 8.235519216836375, 3428)
(u'Seven Samurai (The Magnificent Seven) (Shichinin no samurai) (1954)', 8.21146216421041, 628)
(u'For All Mankind (1989)', 8.200458923903248, 27)
(u'Godfather: Part II, The (1974)', 8.198687179698142, 1692)
(u'Last Days, The (1998)', 8.186190178011733, 27)
(u'Star Wars: Episode IV - A New Hope (1977)', 8.17872551517652, 2991)
(u'GoodFellas (1990)', 8.15863408999893, 1657)
(u'Life Is Beautiful (La Vita \ufffd bella) (1997)', 8.150404721272066, 1152)
(u'Casablanca (1942)', 8.130743454698232, 1669)
(u'Dr.