In [None]:
# https://github.com/jadianes/spark-movie-lens/blob/master/notebooks/building-recommender.ipynb

In [1]:
import findspark
findspark.init()
import pyspark

In [2]:
sc = pyspark.SparkContext()

In [3]:
small_ratings_raw_data = sc.textFile('ratings.csv')

In [4]:
small_ratings_raw_data.take(5)[:3]

['userId,movieId,rating,timestamp',
 '1,31,2.5,1260759144',
 '1,1029,3.0,1260759179']

In [5]:
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

#### Remove the header and timestamp

In [9]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
                                           .map(lambda line: line.split(","))\
                                           .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))).cache()

In [7]:
small_ratings_data.take(5)

[(1, 31, 2.5), (1, 1029, 3.0), (1, 1061, 3.0), (1, 1129, 2.0), (1, 1172, 4.0)]

In [8]:
small_movies_raw_data = sc.textFile('movies.csv')
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_raw_data.take(3)

['movieId,title,genres',
 '1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy',
 '2,Jumanji (1995),Adventure|Children|Fantasy']

In [11]:
small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
                                         .map(lambda line: line.split(","))\
                                         .map(lambda tokens: (tokens[0],tokens[1])).cache()

In [12]:
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

#### Separate the data into training, validation, testing data

In [13]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)

In [14]:
# remove the ratings
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

## Training phase

In [15]:
from pyspark.mllib.recommendation import ALS
import math

#### 1.numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
#### 2.rank is the number of latent factors in the model.
#### 3.iterations is the number of iterations to run.
#### 4.lambda specifies the regularization parameter in ALS.
#### 5.implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
#### 6.alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations

In [16]:
seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD)\
                       .map(lambda r: ((r[0], r[1]), r[2]))
        
    rates_and_preds = validation_RDD.map(lambda r: ( (int(r[0]), int(r[1])), float(r[2]) )).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    
    print ('For rank %s the RMSE is %s' % (rank, error))
    
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank %s'%best_rank)

For rank 4 the RMSE is 0.9405925542574991
For rank 8 the RMSE is 0.9451745059144594
For rank 12 the RMSE is 0.943590394737689
The best model was trained with rank 4


## Make recommendations

In [17]:
complete_movies_data = small_movies_data

In [18]:
complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
print ("There are %s movies in the complete dataset" % (complete_movies_titles.count()))

There are 9125 movies in the complete dataset


In [19]:
complete_ratings_data = small_ratings_data
complete_ratings_data.take(1)

[(1, 31, 2.5)]

#### Count the number of ratings per movie

In [77]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [91]:
# after function groupByKey, we can get every ratings for a single movieID
print(movie_ID_with_ratings_RDD.map(lambda x : (x[0], x[1])).take(1))

[(1172, <pyspark.resultiterable.ResultIterable object at 0x000001BC392400F0>)]


In [92]:
# after function groupByKey, we can get every ratings for a single movieID
print(movie_ID_with_ratings_RDD.map(lambda x : (x[0], list(x[1]))).take(1))

[(1172, [4.0, 5.0, 4.5, 2.0, 3.5, 4.0, 4.0, 4.0, 5.0, 5.0, 3.5, 4.5, 2.0, 3.0, 5.0, 3.5, 4.0, 5.0, 5.0, 2.0, 5.0, 4.0, 5.0, 3.0, 5.0, 5.0, 5.0, 4.5, 5.0, 4.0, 3.5, 3.0, 4.0, 5.0, 5.0, 4.0, 4.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 4.0, 5.0, 4.5])]


In [90]:
movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1])).take(1)

[(1172, (46, 4.260869565217392))]

In [89]:
movie_rating_counts_RDD.map(lambda x: (x[0], x[1])).take(1)

[(1172, 46)]

In [95]:
# no one's id is 0, so we assign 0 to new user
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,16,7), # Casino (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,9), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,7), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]


In [96]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [97]:
from time import time

In [98]:
t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ("New model trained in %s seconds" % round(tt,3))

New model trained in 26.067 seconds


#### Getting top recommendations

In [100]:
complete_movies_data.take(1)

[('1', 'Toy Story (1995)')]

In [99]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs

new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids)\
                                                   .map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [102]:
new_user_unrated_movies_RDD.take(10)

[(0, '1'),
 (0, '2'),
 (0, '3'),
 (0, '4'),
 (0, '5'),
 (0, '6'),
 (0, '7'),
 (0, '8'),
 (0, '9'),
 (0, '10')]

In [103]:
new_user_recommendations_RDD.take(10)

[Rating(user=0, product=57972, rating=8.97712400398538),
 Rating(user=0, product=1084, rating=7.733498813537221),
 Rating(user=0, product=3456, rating=8.430838408537497),
 Rating(user=0, product=6400, rating=6.797203837496145),
 Rating(user=0, product=3272, rating=6.342372974087468),
 Rating(user=0, product=142192, rating=6.562134703263556),
 Rating(user=0, product=81132, rating=7.008927197290522),
 Rating(user=0, product=6308, rating=4.216882794514294),
 Rating(user=0, product=44828, rating=6.6684528947495725),
 Rating(user=0, product=52328, rating=6.167889490127285)]

In [None]:
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))

In [105]:
new_user_recommendations_rating_RDD.take(5)

[(57972, 8.97712400398538),
 (1084, 7.733498813537221),
 (3456, 8.430838408537497),
 (6400, 6.797203837496145),
 (3272, 6.342372974087468)]

In [109]:
complete_movies_titles.take(1), movie_rating_counts_RDD.take(1)

([(1, 'Toy Story (1995)')], [(1172, 46)])

In [None]:
# join by key
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)

In [106]:
new_user_recommendations_rating_title_and_count_RDD.take(5)

[(3456, ((8.430838408537497, '"Color of Paradise'), 4)),
 (69720, ((3.208269796036223, 'Hood of Horror (2006)'), 1)),
 (912, ((8.448746826843543, 'Casablanca (1942)'), 117)),
 (4992, ((3.7333967240131116, 'Kate & Leopold (2001)'), 10)),
 (160656, ((6.562134703263556, 'Tallulah (2016)'), 1))]

In [110]:
# flat this down and remove movieID
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [111]:
new_user_recommendations_rating_title_and_count_RDD.take(1)

[('"Color of Paradise', 8.430838408537497, 4)]

In [114]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25)\
                                                                .takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %'\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
('Cinema Paradiso (Nuovo cinema Paradiso) (1989)', 9.091297353139804, 46)
('Ran (1985)', 9.001436362745064, 26)
('All About Eve (1950)', 8.977068919296402, 38)
('Modern Times (1936)', 8.919219242514382, 32)
('Sunset Blvd. (a.k.a. Sunset Boulevard) (1950)', 8.90551847973807, 39)
('Hamlet (1996)', 8.781169586470195, 25)
('"Godfather', 8.774708535972755, 200)
('There Will Be Blood (2007)', 8.761357663614609, 26)
('Annie Hall (1977)', 8.72696335322896, 80)
('And Your Mother Too (Y tu mamá también) (2001)', 8.6527620624353, 26)
('Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)', 8.64813781548837, 105)
('It Happened One Night (1934)', 8.647597648260657, 25)
('On the Waterfront (1954)', 8.637991683707142, 29)
('"Third Man', 8.634315353567942, 38)
('Roman Holiday (1953)', 8.626193827872093, 26)
('Pulp Fiction (1994)', 8.580314246570065, 324)
('Duck Soup (1933)', 8.571521258274593, 34)
('His Girl Friday (1940)', 8.5

In [120]:
a = sc.parallelize([(0,1), (11, 2)])

In [121]:
a.take(2)

[(0, 1), (11, 2)]

In [131]:
b = sc.parallelize([(0,'Suck'), (11, 'Kidding')])

In [134]:
c = a.join(b).cache()

In [130]:
c.map(lambda x: (x[0], list(x[1]))).take(3)

[(0, [(1, 3)]), (11, [(2, 4), (2, 5)])]

In [136]:
c.take(3)

[(0, (1, 'Suck')), (11, (2, 'Kidding'))]

In [20]:
sc.stop()