# Building Recommender

### File download

In [1]:
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

In [2]:
import os

!mkdir -p datasets
datasets_path = os.path.join('./', 'datasets')

complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

In [3]:
import urllib

small_f = urllib.request.urlretrieve (small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve (complete_dataset_url, complete_dataset_path)

In [4]:
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)

### Install Spark

In [5]:
%%capture
!sudo apt-get update --fix-missing

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
#!wget -q https://downloads.apache.org/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

!mv spark-3.0.0-bin-hadoop3.2.tgz sparkkk
!tar xf sparkkk
!pip install -q findspark
!pip install pyspark

In [6]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName('building-recommender') \
    .getOrCreate()

spark   

### Loading and parsing datasets

In [7]:
sc = spark.sparkContext
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [8]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [9]:
small_ratings_data.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

In [10]:
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

### Selecting ALS parameters using the small dataset

In [11]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [19]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = [5, 10, 20]
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = []
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1

for rank in ranks:
    for iteration in iterations:
        model = ALS.train(training_RDD, rank, seed=seed, iterations=iteration,
                          lambda_=regularization_parameter)
        predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
        rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
        error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
        errors.append(error)
        print("For rank %s and iteration %s the RMSE is %s" % (rank, iteration, error))
        if error < min_error:
            min_error = error
            best_rank = rank
            best_iteration = iteration

print("The best model was trained with rank %s and iteration %s" % (best_rank, best_iteration))

For rank 4 and iteration 5 the RMSE is 0.9216828115540059
For rank 4 and iteration 10 the RMSE is 0.9121002114021121
For rank 4 and iteration 20 the RMSE is 0.9084344829867742
For rank 8 and iteration 5 the RMSE is 0.9252257022633836
For rank 8 and iteration 10 the RMSE is 0.9184327213070025
For rank 8 and iteration 20 the RMSE is 0.9154033053129291
For rank 12 and iteration 5 the RMSE is 0.9251850957336817
For rank 12 and iteration 10 the RMSE is 0.9160151537868968
For rank 12 and iteration 20 the RMSE is 0.9109898805756902
The best model was trained with rank 4 and iteration 20


In [17]:
predictions.take(3)

[((372, 1084), 3.6273558036170312),
 ((4, 1084), 3.806848948516306),
 ((402, 1084), 3.4207522972274793)]

In [18]:
rates_and_preds.take(3)

[((1, 457), (5.0, 4.505973514879727)),
 ((1, 1025), (5.0, 4.598382643002677)),
 ((1, 1089), (5.0, 4.855986245944868))]

In [22]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=best_iteration,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ("For testing data the RMSE is %s" % (error))

For testing data the RMSE is 0.9098996087060645


### Using the complete dataset to build the final model

In [24]:
# Load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print("There are %s recommendations in the complete dataset" % (complete_ratings_data.count()))

There are 27753444 recommendations in the complete dataset


In [26]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=best_iteration, lambda_=regularization_parameter)

In [27]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print("For testing data the RMSE is %s" % (error))

For testing data the RMSE is 0.8320316099023749


### How to make recommendations

In [28]:
complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print("There are %s movies in the complete dataset" % (complete_movies_titles.count()))

There are 58098 movies in the complete dataset


In [29]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

### Adding new user ratings

In [30]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,16,7), # Casino (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,9), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,7), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print("New user ratings: %s" % new_user_ratings_RDD.take(10))
     

New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]


In [31]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [38]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=best_iteration, lambda_=regularization_parameter)
tt = time() - t0

print("New model trained in %s seconds" % round(tt,3))

New model trained in 364.207 seconds


### Getting top recommendations

In [39]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [40]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(6216,
  ((7.026445662645672, 'Nowhere in Africa (Nirgendwo in Afrika) (2001)'),
   717)),
 (124320, ((7.492523897937501, 'Once a Thief (1965)'), 1)),
 (83916, ((6.498638531131836, 'Blues in the Night (1941)'), 9))]

In [41]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [42]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print("TOP recommended movies (with more than 25 reviews):\n%s" %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
('Music for One Apartment and Six Drummers (2001)', 9.127790682513762, 31)
('Rabbit of Seville (1950)', 8.981667585474511, 30)
('"Human Condition III', 8.924352265935553, 91)
('Baseball (1994)', 8.825348145082458, 42)
('Harakiri (Seppuku) (1962)', 8.82350612380764, 679)
('Connections (1978)', 8.80328925260546, 49)
('"I', 8.761593869238741, 85)
("Jim Henson's The Storyteller (1989)", 8.737766561534556, 36)
('Wow! A Talking Fish! (1983)', 8.733375962558078, 47)
('"Last Lions', 8.731667090276083, 38)
('Duck Amuck (1953)', 8.720967892241077, 226)
('Elway To Marino (2013)', 8.720128372049018, 25)
('"Lonely Wife', 8.71044688710834, 43)
('Cosmos', 8.701890553829813, 157)
('The Garden of Sinners - Chapter 5: Paradox Paradigm (2008)', 8.689368679793624, 27)
('Rabbit Fire (1951)', 8.678015012489311, 46)
('Dimensions of Dialogue (Moznosti dialogu) (1982)', 8.67314498984205, 65)
('Crooks in Clover (a.k.a. Monsieur Gangster) (Les tontons flingueur

### Getting individual ratings

In [43]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=116688, rating=2.0178150675852966)]