In [1]:
# this command makes pyspark importable as a regular library
import findspark

findspark.init()

In [2]:
# initializing Spark and importingg classes

import pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()  #starts a spark session

from pyspark import SparkContext 
sc = SparkContext.getOrCreate()  #establishes a connection to a Spark execution environment

df = spark.sql("select 'spark' as hello ")

df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [3]:
#setting the datasets path

small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'

In [4]:
import os

datasets_path = os.path.join('Desktop', 'datasets')

small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')
complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')

In [5]:
# opening the url

import urllib

small_f = urllib.request.urlretrieve (small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve (complete_dataset_url, complete_dataset_path)

In [6]:
#extracting the datasets

import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)

In [7]:
#Loading dataset

small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [8]:
#Parsing the dataset

small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print ("There are %s recommendations in the complete dataset" % (small_ratings_data.count()))

There are 100836 recommendations in the complete dataset


In [9]:
#Testing the RDD

small_ratings_data.take(5)

[(1, 1, 4.0), (1, 3, 4.0), (1, 6, 4.0), (1, 47, 5.0), (1, 50, 5.0)]

In [10]:
#Loading and parsing the movies dataset

small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(5)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)'),
 ('4', 'Waiting to Exhale (1995)'),
 ('5', 'Father of the Bride Part II (1995)')]

In [11]:
#Splitting the dataset into train, validation, and test datasets

training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [12]:
#Training phase

from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = +1
best_iteration = -1
for rank in ranks:
    small_model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                        lambda_=regularization_parameter)
    predictions = small_model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print ("For rank %s the RMSE is %s" % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print ('The best model was trained with rank %s' % best_rank)

For rank 4 the RMSE is 0.9144576028987792
For rank 8 the RMSE is 0.9196881273681683
For rank 12 the RMSE is 0.9180712492654134
The best model was trained with rank 4


In [13]:
#Testing the model

small_model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = small_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.9179382254323948


In [14]:
# Load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print ("There are %s recommendations in the complete dataset" % (complete_ratings_data.count()))

There are 27753444 recommendations in the complete dataset


In [15]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [16]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.8318265262101795


In [17]:
#Loading the movies dataset

complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parsing the movies dataset

complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print ("There are %s movies in the complete dataset" % (complete_movies_titles.count()))

There are 58098 movies in the complete dataset


In [18]:
#Creating a function that counts the number of votes

def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [19]:
#New User Ratings

new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,167876,9), # The Last of the Mohicans (1932)
     (0,1222,10), # Full Metal Jacket (1987)
     (0,1227,10), # Once Upon A Time in America (1984)
     (0,64614,9), # Gran Torino (2008)
     (0,32,5), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,2), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,9), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 167876, 9), (0, 1222, 10), (0, 1227, 10), (0, 64614, 9), (0, 32, 5), (0, 335, 2), (0, 379, 3), (0, 296, 9), (0, 858, 10), (0, 50, 8)]


In [20]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [21]:
#Training the model for the new user

from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ("New model trained in %s seconds" % round(tt,3))

New model trained in 56.259 seconds


In [22]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [23]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)

new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(5)

[(122304, ((4.560097531470719, 'The Freebie (2010)'), 5)),
 (135408, ((5.674147552775725, 'Intangible Asset Number 82 (2008)'), 2)),
 (8736, ((6.090366845265251, 'Out of the Blue (1980)'), 36)),
 (104832, ((6.4571569861442875, '"Glass Web'), 3)),
 (148512, ((1.2730116226141703, 'Seattle Superstorm (2012)'), 2))]

In [24]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [25]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
('Cosmos', 8.984181544985391, 157)
('Music for One Apartment and Six Drummers (2001)', 8.949775118063627, 31)
('Slaying the Badger', 8.943893815462454, 25)
("Long Night's Journey Into Day (2000)", 8.933831897832928, 35)
('Demetri Martin. Person. (2007)', 8.812392261407073, 26)
('"Godfather', 8.789430397443065, 60904)
('Ghost in the Shell Arise - Border 5: Pyrophoric Cult (2015)', 8.786744357760611, 26)
('"Lord of the Rings: The Fellowship of the Ring', 8.7730360156105, 61883)
('"Lord of the Rings: The Return of the King', 8.728540306905806, 57378)
('"Godfather: Part II', 8.70354765733596, 38875)
('Frozen Planet (2011)', 8.702414964549305, 402)
('Planet Earth II (2016)', 8.699848681364546, 853)
('Wow! A Talking Fish! (1983)', 8.698775771423916, 47)
('Star Wars: Episode V - The Empire Strikes Back (1980)', 8.698242056436044, 65822)
('Pulp Fiction (1994)', 8.686014889180504, 92406)
("Jim Henson's The Storyteller (1989)", 8.67980645383174