In [None]:
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

In [10]:
# define download locations
import os

datasets_path = os.path.join('datasets')

complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

In [11]:
# proceed with downloads
import urllib.request

small_f = urllib.request.urlretrieve(small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve(complete_dataset_url, complete_dataset_path)

In [12]:
# extract rating data into their invididual folders
import zipfile

with zipfile.ZipFile(small_dataset_path, 'r') as z:
    z.extractall(datasets_path)
with zipfile.ZipFile(complete_dataset_path, 'r') as z:
    z.extractall(datasets_path)

In [19]:
# laod the raw ratings data and filter out the header
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [29]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
.map(lambda line: line.split(',')).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [31]:
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

In [32]:
small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
.map(lambda line: line.split(',')).map(lambda tokens: (tokens[0], tokens[1])).cache()

In [33]:
small_movies_raw_data_header

'movieId,title,genres'

In [35]:
small_ratings_data.take(5)

[('1', '1', '4.0'),
 ('1', '3', '4.0'),
 ('1', '6', '4.0'),
 ('1', '47', '5.0'),
 ('1', '50', '5.0')]

In [38]:
# split the small dataset into train, validation and test datasets
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [40]:
# model training with small dataset
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                     lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean())
    errors[err] = error
    err += 1
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank
print('The best model was trained with rank %s' % best_rank)

For rank 4 the RMSE is 0.9135506292157884
For rank 8 the RMSE is 0.9133253198173535
For rank 12 the RMSE is 0.9194236820985827
The best model was trained with rank 8


In [41]:
predictions.take(3)

[((4, 1084), 3.6227087809721867),
 ((156, 1084), 3.8102899221717266),
 ((372, 1084), 3.5483496000160977)]

In [42]:
rates_and_preds.take(3)

[((1, 47), (5.0, 4.779416113993275)),
 ((1, 457), (5.0, 4.505201388257333)),
 ((1, 527), (5.0, 5.17322824176339))]

In [43]:
# test the selected model
model = ALS. train(training_RDD, best_rank, seed=seed, iterations=iterations, 
                  lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean())
print('For testing data the RMSE is %s' % error)

For testing data the RMSE is 0.9134290435791891


In [47]:
# using the complete dataset to build the final model
# load the complete datast file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# parse the data
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print('There are %s recommendations in the complete dataset' % (complete_ratings_data.count()))

There are 27753444 recommendations in the complete dataset


In [45]:
# train the recommender model
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)

In [48]:
# test the complete model on testing set
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean())

print('For testing data the RMSE is %s' % error)

For testing data the RMSE is 0.8213568307148696


In [49]:
# load the movies complete file for later use
complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_data_header = complete_movies_raw_data.take(1)[0]

# parse the data
complete_movies_data = complete_movies_raw_data.filter(lambda line: line != complete_movies_data_header)\
.map(lambda line: line.split(',')).map(lambda tokens: (int(tokens[0]), tokens[1], tokens[2])).cache()

In [52]:
complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]), x[1]))
print('There are %s movies in the complete datasets' % (complete_movies_titles.count()))

There are 58098 movies in the complete datasets


In [1]:
# calculate average ratings and total number of ratings for each movie
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1])) / nratings)

movie_ID_with_ratings_RDD = complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey()
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

NameError: name 'complete_ratings_data' is not defined

In [55]:
# add new user ratings and put them in a new RDD
new_user_ID = 0
# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,16,7), # Casino (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,9), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,7), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]


In [57]:
# add the new user ratings to the training data for recommender model
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [58]:
# train the ALS model using all the parameters seleted before
from time import time
t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed,
                             iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print('New model trained in %s seconds' % round(tt, 3))

New model trained in 156.641 seconds


In [59]:
complete_movies_data.take(1)

[(1, 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')]

In [61]:
# getting top recommendations
# get an RDD with all the movies that the new user hasn't rated yet
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list
new_user_unrated_movies_RDD = complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0]))

In [62]:
new_user_unrated_movies_RDD.take(2)

[(0, 2), (0, 3)]

In [63]:
# use the input RDD, new_users_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [64]:
new_user_recommendations_RDD.take(2)

[Rating(user=0, product=116688, rating=1.942818887104374),
 Rating(user=0, product=57044, rating=5.872604571679518)]

In [31]:
# transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(125970, ((4.0670946283444085, 'Halloweentown (1998)'), 148)),
 (7410, ((5.169659204023402, '"Osterman Weekend'), 177)),
 (163020, ((5.2543450405208345, 'The Fits (2016)'), 37))]

In [66]:
complete_movies_titles.take(1)

[(1, 'Toy Story (1995)')]

In [67]:
movie_rating_counts_RDD.take(1)

[(1449, 6867)]

In [32]:
new_user_recommendations_rating_title_and_count_RDD = \
new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [33]:
new_user_recommendations_rating_title_and_count_RDD.take(1)

[('Halloweentown (1998)', 4.0670946283444085, 148)]

In [34]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2] >= 25).takeOrdered(25, key=lambda x: -x[1])
print('Top recommended movies (with more than 25 reviews):\n %s' % '\n'.join(map(str, top_movies)))

Top recommended movies (with more than 25 reviews):
 ('Cosmos', 8.77380602282307, 157)
('"Godfather', 8.673892677504824, 60904)
('Music for One Apartment and Six Drummers (2001)', 8.660283657976892, 31)
('Planet Earth II (2016)', 8.572289265784786, 853)
('Planet Earth (2006)', 8.548089885641577, 1384)
('Band of Brothers (2001)', 8.525198248300295, 984)
('Frozen Planet (2011)', 8.512755610097, 402)
('"Godfather: Part II', 8.497728955132377, 38875)
('Seven Samurai (Shichinin no samurai) (1954)', 8.420076488054455, 14578)
('Star Wars: Episode V - The Empire Strikes Back (1980)', 8.379513705645964, 65822)
('"Civil War', 8.370763863325083, 431)
('The Godfather Trilogy: 1972-1990 (1992)', 8.364873099117656, 421)
('Star Wars: Episode IV - A New Hope (1977)', 8.362069511992544, 81815)
('"Lord of the Rings: The Fellowship of the Ring', 8.358713928000869, 61883)
('Casablanca (1942)', 8.353733164606588, 31095)
('Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)', 8.35122

In [35]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.filter(lambda x: x[1] == 500).take(1)

[Rating(user=0, product=500, rating=5.266544001099173)]

In [36]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(my_movie)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=500, rating=5.266544001099173)]

In [54]:
# persisting the model
from pyspark.mllib.recommendation import MatrixFactorizationModel

model_path = os.path.join('models', 'movie_lens_als')

# Save and load model
model.save(sc, model_path)
same_model = MatrixFactorizationModel.load(sc, model_path)

Py4JJavaError: An error occurred while calling o352.save.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/C:/Users/Xiaoman/Desktop/spark-movie-recommendation/models/movie_lens_als/metadata already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:287)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:957)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1499)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1478)
	at org.apache.spark.mllib.recommendation.MatrixFactorizationModel$SaveLoadV1_0$.save(MatrixFactorizationModel.scala:367)
	at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.save(MatrixFactorizationModel.scala:205)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
