In [112]:
import os
import urllib
import urllib.request
import zipfile
import pyspark
from pyspark.sql import SQLContext
from pyspark.mllib.recommendation import ALS
import math
sc = pyspark.SparkContext('local[*]')

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-1-268736d1964f>:9 

In [2]:
# getting data sources from url
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

#create dataset path
datasets_path = os.path.join('..', 'work')
complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

small_f = urllib.request.urlretrieve(small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve (complete_dataset_url, complete_dataset_path)

In [3]:
# Extract zipfile
with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)

In [4]:
# getting small rating file
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')
small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]
small_ratings_raw_data.take(5)

['userId,movieId,rating,timestamp',
 '1,1,4.0,964982703',
 '1,3,4.0,964981247',
 '1,6,4.0,964982224',
 '1,47,5.0,964983815']

In [92]:
# get data from rating and movies files
def map_rating(path, folder, file):
    ratings_file = os.path.join(path, folder, file)
    
    ratings_raw_data = sc.textFile(ratings_file)
    
    ratings_raw_data_header = ratings_raw_data.take(1)[0]
    
    ratings_raw_data.take(5)
    
    ratings_data = ratings_raw_data.filter(lambda line: line!=ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    return(ratings_data)

def map_movie(path, folder, file):
    movies_file = os.path.join(path, folder, file)
    
    movies_raw_data = sc.textFile(movies_file)
    
    movies_raw_data_header = movies_raw_data.take(1)[0]
    
    movies_data = movies_raw_data.filter(lambda line: line!= movies_raw_data_header)\
   .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()
    return(movies_data)


In [7]:
small_ratings_data = map_rating(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_data.take(10)

[(1, 1, 4.0),
 (1, 3, 4.0),
 (1, 6, 4.0),
 (1, 47, 5.0),
 (1, 50, 5.0),
 (1, 70, 3.0),
 (1, 101, 5.0),
 (1, 110, 4.0),
 (1, 151, 5.0),
 (1, 157, 5.0)]

In [11]:
# process movie file
small_movies_data =  map_movie(datasets_path, 'ml-latest-small', 'movies.csv')
small_movies_data.take(10)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)'),
 ('4', 'Waiting to Exhale (1995)'),
 ('5', 'Father of the Bride Part II (1995)'),
 ('6', 'Heat (1995)'),
 ('7', 'Sabrina (1995)'),
 ('8', 'Tom and Huck (1995)'),
 ('9', 'Sudden Death (1995)'),
 ('10', 'GoldenEye (1995)')]

## Collaborative Filtering

### Selecting ALS parameters using the small dataset

In [12]:
# split dataset into train, validate, and test data set
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [13]:
# training phrase
# fix parameter.
seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank %s' % best_rank)

For rank 4 the RMSE is 0.908078105265682
For rank 8 the RMSE is 0.916462973348527
For rank 12 the RMSE is 0.917665030756129
The best model was trained with rank 4


In [14]:
predictions.take(3)

[((372, 1084), 3.42419871162954),
 ((4, 1084), 3.866749726695713),
 ((402, 1084), 3.4099577968422152)]

In [15]:
rates_and_preds.take(3)

[((1, 457), (5.0, 4.381060760461434)),
 ((1, 1025), (5.0, 4.705295366590298)),
 ((1, 1089), (5.0, 4.979982471805129))]

In [121]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.9113780946334407


## Using the complete dataset to build the final model

In [16]:
# loaf complete rating data
complete_ratings_data = map_rating(datasets_path, 'ml-latest', 'ratings.csv')
complete_movies_data =  map_movie(datasets_path, 'ml-latest', 'movies.csv')    
print("There are %s recommendations in the complete dataset" % (complete_ratings_data.count()))

There are 100836 recommendations in the complete dataset


In [17]:
#train model
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)
complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [18]:
# testing
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.8949959237223808


## make recommendations

In [20]:
complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))

print("There are %s movies in the complete dataset" % (complete_movies_titles.count()))


There are 58098 movies in the complete dataset


In [21]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))


In [22]:
movie_rating_counts_RDD.take(10)

[(6, 102),
 (50, 204),
 (70, 55),
 (110, 237),
 (216, 49),
 (260, 251),
 (296, 307),
 (316, 140),
 (356, 329),
 (362, 34)]

## Adding new user ratings

In [23]:
new_user_ID = 0
# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,111,1), # 'Taxi Driver (1976)'
     (0,165,4), # 165, 'Die Hard: With a Vengeance (1995)'
     (0,112556,5), # Gone Girl (2014)'
     (0,122892,5), # 122892,Avengers: Age of Ultron (2015)
     (0,122898,3), # 122898,Justice League (2017)
     (0,362,2), # 362, '"Jungle Book'
     (0,125916,3), # 125916,Fifty Shades of Grey '
     (0,539,3), # 539, 'Sleepless in Seattle (1993)
     (0,329,5) , # 329, 'Star Trek: Generations (1994)'
     (0,858,5) # 858, '"Godfather'
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 111, 1), (0, 165, 4), (0, 112556, 5), (0, 122892, 5), (0, 122898, 3), (0, 362, 2), (0, 125916, 3), (0, 539, 3), (0, 329, 5), (0, 858, 5)]


In [24]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [25]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ("New model trained in %s seconds" % round(tt,3))

New model trained in 2.45 seconds


## Getting top recommendations

In [26]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [27]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(4992, ((4.065143061238492, 'Kate & Leopold (2001)'), 17)),
 (7572, ((5.405579736703817, 'Wit (2001)'), 1)),
 (5688, ((3.562487664595012, 'Tully (2000)'), 2))]

### create function

In [51]:
# define userfunction for all steps

# split dataset
def split_dataset(dataset):
    training_RDD, validation_RDD, test_RDD = dataset.randomSplit([6, 2, 2], seed=0)
    validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
    test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
    return(training_RDD,validation_RDD,test_RDD,validation_for_predict_RDD,test_for_predict_RDD)

# select best ranks:
def select_bestranks(training, validation, validation_for_predict):
    seed = 5
    iterations = 10
    regularization_parameter = 0.1
    ranks = [4, 8, 12]
    errors = [0, 0, 0]
    err = 0
    tolerance = 0.02

    min_error = float('inf')
    best_rank = -1
    best_iteration = -1
    for rank in ranks:
        model = ALS.train(training, rank, seed= seed, iterations= iterations, lambda_= regularization_parameter)
        predictions = model.predictAll(validation_for_predict).map(lambda r: ((r[0], r[1]), r[2]))
        rates_and_preds = validation.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
        error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
        errors[err] = error
        err += 1
        if error < min_error:
            min_error = error
            best_rank = rank

    return(best_rank)

# train model
#seed = 5
#iterations = 10
#regularization_parameter = 0.1
#ranks = [4, 8, 12]
#errors = [0, 0, 0]
#err = 0
#tolerance = 0.02
def train_model(data, best_rank):
    model = ALS.train(data, best_rank, seed=seed, iterations=iterations, lambda_= regularization_parameter)
    return(model)
# testing
def test_model(model, test_for_predict, test):
    predictions = model.predictAll(test_for_predict).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = test.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    return(error)

# movie rating count
def move_ratcnt(ratings_data):
    movie_ID_with_ratings = (ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
    movie_ID_with_avg_ratings = movie_ID_with_ratings.map(get_counts_and_averages)
    movie_rating_counts = movie_ID_with_avg_ratings.map(lambda x: (x[0], x[1][0]))
    return(movie_rating_counts)


# recommendation
def recommendation(model, new_user_ratings, movies_data, movies_titles, movie_rating_counts, new_user_ID):
    new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
    
    # keep just those not on the ID list (thanks Lei Li for spotting the error!)
    new_user_unrated_movies_RDD = (movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))
    
    # Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
    new_user_recommendations_RDD = model.predictAll(new_user_unrated_movies_RDD)
    
    # Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
    new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
    new_user_recommendations_rating_title_and_count = new_user_recommendations_rating_RDD.join(movies_titles).join(movie_rating_counts)
    new_user_recommendations_rating_title_and_count = \
    new_user_recommendations_rating_title_and_count.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))
    top_movies = new_user_recommendations_rating_title_and_count.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])
    return(top_movies)

In [111]:
#Full function
def full_recom(datasets_path, folder, rating_file, movie_file):
    # get data
    ratings_data = map_rating(datasets_path, folder, rating_file)
    movies_data =  map_movie(datasets_path, folder, movie_file)
    movies_titles = movies_data.map(lambda x: (int(x[0]),x[1]))
    new_user_ID = 0
    training, validation, test, validation_for_predict, test_for_predict = split_dataset(ratings_data)
    # best rank
    best_rank = select_bestranks(training, validation, validation_for_predict)
    # union user data + traindata
    data_with_new_ratings_RDD = training.union(new_user_ratings_RDD)
    # training model
    model = train_model(data_with_new_ratings_RDD, best_rank)
    # model error rate
    RMSE = test_model(model, test_for_predict, test)

    movie_rating_counts = move_ratcnt(ratings_data)

    recom = recommendation(model, new_user_ratings, movies_data ,movies_titles, movie_rating_counts, 0)
    return(RMSE, recom)

In [32]:
#create new data file 30%
import random

def split_file(percentage=0.70,isShuffle=True,seed=123):
    random.seed(seed)
    with open("ml-latest/ratings.csv", 'r',encoding="utf-8") as fin, \
         open("ml-latest/ratings70.csv", 'w') as fout70, \
         open("ml-latest/ratings30.csv", 'w') as fout30:

        nLines = sum(1 for line in fin)
        fin.seek(0)

        nTrain = int(nLines*percentage) 
        nValid = nLines - nTrain

        i = 0
        for line in fin:
            r = random.random() if isShuffle else 0 # so that always evaluated to true when not isShuffle
            if (i < nTrain and r < percentage) or (nLines - i > nValid):
                fout70.write(line)
                i += 1
            else:
                fout30.write(line)


In [33]:
split_file()

# Trying 4 scenarios

  ## Small dataset
   

In [97]:
RMSE_small, recom_small = full_recom(datasets_path,'ml-latest-small','ratings.csv','movies.csv')

In [98]:
RMSE_small

0.9132018474248642

In [99]:
recom_small

[('Red Dragon (2002)', 5.762509985863026, 31),
 ('Slumdog Millionaire (2008)', 5.625623588879715, 71),
 ('1408 (2007)', 5.61796692330184, 25),
 ('Star Wars: Episode III - Revenge of the Sith (2005)',
  5.547503826871905,
  78),
 ('Ex Machina (2015)', 5.541494116931921, 28),
 ('Lord of War (2005)', 5.459302189567019, 35),
 ('"I', 5.3714544836466445, 61),
 ("Pirates of the Caribbean: At World's End (2007)", 5.369481763618821, 56),
 ('Big Fish (2003)', 5.3260223641138875, 69),
 ('Transformers (2007)', 5.30981342942797, 39),
 ('"World Is Not Enough', 5.239654752861686, 39),
 ('Mystery Science Theater 3000: The Movie (1996)', 5.233780094884777, 36),
 ('Punch-Drunk Love (2002)', 5.227305198631046, 33),
 ('"Bone Collector', 5.200058372585449, 25),
 ('Star Wars: Episode II - Attack of the Clones (2002)',
  5.170406443449441,
  92),
 ('True Grit (2010)', 5.166600519131521, 28),
 ('Captain America: The Winter Soldier (2014)', 5.130851164377033, 31),
 ('Remember the Titans (2000)', 5.098682129497

## 30% complete dataset

In [100]:
RMSE_30, recom_30 = full_recom(datasets_path,'ml-latest','ratings30.csv','movies.csv')

In [101]:
RMSE_30

0.8265084703240173

In [102]:
recom_30

[("Harrison's Flowers (2000)", 5.253354784163777, 35),
 ('"Stoning of Soraya M.', 5.210474668335228, 55),
 ('Dragon Ball Z: Bardock - The Father of Goku (Doragon bôru Z: Tatta hitori no saishuu kessen - Furiiza ni itonda Z senshi Kakarotto no chichi) (1990)',
  5.014820129537917,
  27),
 ('Instructions Not Included (No se Aceptan Devoluciones) (2013)',
  4.882053133720186,
  32),
 ('The Hobbit: The Battle of the Five Armies (2014)', 4.814544513662654, 1547),
 ('"Hobbit: The Desolation of Smaug', 4.808711867630247, 2418),
 ('Fallen Art (Sztuka spadania) (2004)', 4.7999751960012444, 44),
 ('Stargate SG-1 Children of the Gods - Final Cut (2009)',
  4.785991065095895,
  55),
 ('August Rush (2007)', 4.784916995693632, 416),
 ('Doctor Who: The Waters of Mars (2009)', 4.783987321918712, 115),
 ('Barbarians at the Gate (1993)', 4.762796649930198, 27),
 ('Only the Brave (2017)', 4.7429898128138355, 62),
 ('"Lord of the Rings: The Two Towers', 4.736673685726939, 17064),
 ('"Lord of the Rings: Th

## 70% complete dataset

In [103]:
RMSE_70, recom_70 = full_recom(datasets_path,'ml-latest','ratings70.csv','movies.csv')

In [104]:
RMSE_70

0.8166786814742976

In [105]:
recom_70

[('Foo Fighters: Back and Forth (2011)', 5.107764176493012, 35),
 ('Two Rabbits (2 Coelhos) (2012)', 5.047183258092536, 34),
 ('Who Am I (Kein System Ist Sicher) (2014)', 4.986711935609485, 249),
 ('The Reichenbach Fall (2012)', 4.935866879148645, 30),
 ('Jim Jefferies: Freedumb (2016)', 4.852225367396554, 62),
 ('Rock the Kasbah (2015)', 4.808769405690816, 51),
 ('The Lost Room (2006)', 4.796616582134121, 196),
 ('Cranford (2007)', 4.785157644628979, 27),
 ('"Fuck You', 4.7785663022519556, 75),
 ('The Glass Castle', 4.748164410795484, 54),
 ('Sherlock - A Study in Pink (2010)', 4.737348106967002, 153),
 ('"Matrix', 4.735215369795464, 59186),
 ('Till Human Voices Wake Us (2001)', 4.73009809265276, 34),
 ('Inception (2010)', 4.729145095434363, 28982),
 ('Bunraku (2010)', 4.7234350159123775, 79),
 ('The Matrix Revisited (2001)', 4.713674298908891, 40),
 ('We Are The Night (Wir sind die Nacht) (2010)', 4.709029632750187, 28),
 ('Gladiator (2000)', 4.696818055627896, 34027),
 ('Bill Hicks:

## complete dataset

In [None]:
RMSE_100, recom_100 = full_recom(datasets_path,'ml-latest','ratings.csv','movies.csv')


In [109]:
recom_100

[('Cosmos: A Spacetime Odissey', 5.127218844239509, 37),
 ('Sinatra: All or Nothing at All (2015)', 4.842528159331449, 35),
 ('The Butterfly Circus (2009)', 4.817608714502535, 28),
 ('Dragon Ball: Episode of Bardock (2011)', 4.813921035431139, 30),
 ('Dragon Ball Z: Super Android 13! (Doragon bôru Z 7: Kyokugen batoru!! San dai sûpâ saiyajin) (1992)',
  4.79990103247374,
  44),
 ('Dragon Ball Z: Broly Second Coming (Doragon bôru Z 10: Kiken na futari! Sûpâ senshi wa nemurenai) (1994)',
  4.795481041673521,
  38),
 ('Loose Change 9/11: An American Coup (2009)', 4.790218227567256, 46),
 ('Dragon Ball Z: Resurrection of F (2015)', 4.747191956158441, 90),
 ('The Lost Room (2006)', 4.698187527796, 280),
 ('DMB (2000)', 4.639759076980306, 29),
 ('"Hobbit: The Desolation of Smaug', 4.634019648143463, 7995),
 ('Doctor Who: Twice Upon A Time (2017)', 4.633733277492968, 65),
 ('Yolki 2 (2011)', 4.625842867926236, 31),
 ('Iron Jawed Angels (2004)', 4.621041018653384, 39),
 ('Adrift (2018)', 4.592

In [110]:
RMSE_100

0.8199886721277755