In [None]:
# This is where the Movie Lens data is - updated every month, the last update was in October 2016 from where the
# the data was pulled
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

In [1]:
# specifies where to place the downloaded data
import os

datasets_path = os.path.join('/home/kswamy/Documents/data_analysis', 'datasets')

complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

In [None]:
# This retrieves the zipped data from the url location and places it in the data set path specified above
import urllib

small_f = urllib.urlretrieve (small_dataset_url, small_dataset_path)
complete_f = urllib.urlretrieve (complete_dataset_url, complete_dataset_path)

In [None]:
# unzips the zip file
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)

In [2]:
# assigns the ratings raw data variable of the small movie ratings
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]


In [3]:
# filters to remove the header from the data, splits the data into columns and assigns the three columns
# The four columns contained in the ratings data are userId,movieId,rating,timestamp
# We ignore timestamp in the modeling here
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [4]:
# The columns are userId,movieId,rating
small_ratings_data.take(3)

[(u'1', u'31', u'2.5'), (u'1', u'1029', u'3.0'), (u'1', u'1061', u'3.0')]

In [5]:
# Assigns the movies title file of the small movie data set
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()

small_movies_data.take(3)

[(u'1', u'Toy Story (1995)'),
 (u'2', u'Jumanji (1995)'),
 (u'3', u'Grumpier Old Men (1995)')]

In [6]:
small_movies_titles = small_movies_data.map(lambda x: (int(x[0]),x[1]))
small_movies_titles.take(3)

[(1, u'Toy Story (1995)'),
 (2, u'Jumanji (1995)'),
 (3, u'Grumpier Old Men (1995)')]

In [113]:
# create training, validation and test data sets
small_training_RDD, small_validation_RDD, small_test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0L)
small_validation_for_predict_RDD = small_validation_RDD.map(lambda x: (x[0], x[1]))
small_test_for_predict_RDD = small_test_RDD.map(lambda x: (x[0], x[1]))

In [114]:
# Train the model using the training dataset and predict the error using the validation dataset
# Rank 40 gave the lowest error and this rank will be used to train the large dataset
from pyspark.mllib.recommendation import ALS
import math

seed = 5L
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12,25,30,40,50]
errors = [0, 0, 0,0,0,0,0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(small_training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(small_validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = small_validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < min_error:
        min_error = error
        best_rank = rank
print 'The best model was trained with rank %s' % best_rank

For rank 4 the RMSE is 0.952401795348
For rank 8 the RMSE is 0.959968407203
For rank 12 the RMSE is 0.953274504895
For rank 25 the RMSE is 0.952975573491
For rank 30 the RMSE is 0.94888354937
For rank 40 the RMSE is 0.944372594307
For rank 50 the RMSE is 0.945498085497
The best model was trained with rank 40


In [10]:
# The ALS trained model has userFeatures and productFeatures matrix
(model.userFeatures().count())

671

In [11]:
# The productFeatures matrix will be used to create an item-item collaborative filtering recommendation model
model.productFeatures().count()

7556

In [12]:
model.productFeatures().lookup(1084)[0]

array('d', [-0.27228254079818726, 0.43617916107177734, 0.27535906434059143, 0.25024840235710144, 0.7473196387290955, -0.2839663624763489, -0.1463722586631775, 0.2846594750881195, 1.190673589706421, 1.3351117372512817, -0.3633521497249603, -1.066757082939148])

In [13]:
import numpy as np
from numpy import linalg as LA

In [14]:
def cosineSimilarity(vec1, vec2):
  return vec1.dot(vec2) / (LA.norm(vec1) * LA.norm(vec2))

In [32]:
# Choose a movie ID to predict the similar movies to it
itemId = 1084

itemFactor = np.asarray(model.productFeatures().lookup(itemId))[0]

In [16]:
itemFactor

array([-0.27228254,  0.43617916,  0.27535906,  0.2502484 ,  0.74731964,
       -0.28396636, -0.14637226,  0.28465948,  1.19067359,  1.33511174,
       -0.36335215, -1.06675708])

In [17]:
cosineSimilarity(itemFactor,itemFactor)

1.0

In [33]:
# similarity matrix built for a particular movie item using the product features matrix
sims = model.productFeatures().map(lambda products:(products[0],
                                        cosineSimilarity(np.asarray(products[1]), itemFactor)))\
                                .join(small_movies_titles).map(lambda r: (r[1][1], r[1][0], r[0]))

In [34]:
sims.take(3)

[(u'Beyond the Valley of the Dolls (1970)', 0.82334611930333124, 8196),
 (u'Heat (1995)', 0.76750135402878283, 6),
 (u'Dracula: Dead and Loving It (1995)', 0.39088161722647613, 12)]

In [35]:
sortedSims = sims.takeOrdered(20, key=lambda x: -x[1])

In [36]:
# Sorted Similarity matrix to predict the movies most similar to the chosen movie
sortedSims

[(u'Bonnie and Clyde (1967)', 1.0, 1084),
 (u'"Andromeda Strain', 0.9727425984780248, 6303),
 (u'Monsieur Verdoux (1947)', 0.96915349847514232, 3632),
 (u'"Public Enemy', 0.96535496116070552, 7056),
 (u'"Immigrant', 0.96382595368313073, 8511),
 (u'Death to Smoochy (2002)', 0.96246929945464077, 5265),
 (u'"Day of the Jackal', 0.96147569443342451, 8207),
 (u'"Lavender Hill Mob', 0.96057896116625829, 5603),
 (u'Broadway Danny Rose (1984)', 0.9574629902906906, 7983),
 (u'And Then There Were None (1945)', 0.95667523623821327, 4969),
 (u'Kind Hearts and Coronets (1949)', 0.95641880421324321, 6650),
 (u'"Awful Truth', 0.9564187996238841, 6254),
 (u'"Sorry', 0.9564187996238841, 5434),
 (u'"Crazies', 0.95596062207871346, 6395),
 (u'71 Fragments of a Chronology of Chance (71 Fragmente einer Chronologie des Zufalls) (1994)',
  0.95526845145991002,
  26850),
 (u'Where the Sidewalk Ends (1950)', 0.95526845145991002, 59832),
 (u"Mon oncle d'Am\xe9rique (1980)", 0.95526845145991002, 5202),
 (u'"Road'

In [14]:
predictions.take(3)

[((452, 1084), 3.0789427124972235),
 ((472, 1084), 3.469723768504703),
 ((529, 1084), 3.6833522903175013)]

In [15]:
rates_and_preds.take(3)

[((368, 2664), (4.0, 3.9869699179369316)),
 ((153, 3825), (3.0, 2.406334801407196)),
 ((148, 1208), (5.0, 4.332034819234105))]

In [37]:
# Load the complete ratings dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()

print "There are %s recommendations in the complete dataset" % (complete_ratings_data.count())

There are 24404096 recommendations in the complete dataset


In [115]:
# Training and Test data from the complete movie dataset
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0L)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [116]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print 'For testing data the RMSE is %s' % (error)

For testing data the RMSE is 0.818180394334


In [40]:
# Load the complete movie titles file
complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))

print "There are %s movies in the complete dataset" % (complete_movies_titles.count())

There are 40110 movies in the complete dataset


In [41]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))


In [110]:
complete_model.productFeatures().count()

36909

In [111]:
movie_ID_with_avg_ratings_RDD.count()

39443

In [105]:
# Chosen MovieID for calculating similar movies
itemId = 25800

complete_itemFactor = np.asarray(complete_model.productFeatures().lookup(itemId))[0]

In [106]:
# calculating the similarity coefficients of all the movies against the chosen movie ID
complete_sims = complete_model.productFeatures().map(lambda products:(products[0],
                                        cosineSimilarity(np.asarray(products[1]), complete_itemFactor)))\
                                .join(complete_movies_titles).join(movie_rating_counts_RDD)\
                                .join(movie_ID_with_avg_ratings_RDD)
complete_sims = complete_sims.map(lambda r: (r[1][0][0][1], r[1][0][0][0], r[0], r[1][0][1], r[1][1][1]))

In [49]:
# Filter the data by movieIDs that have the count of user ratings higher than five
# Sort the similarity data by decreasing order of the similarity coefficient, and then take the top 20 from the ordered
# list
complete_sortedSims = complete_sims.filter(lambda r: r[3]>=5).takeOrdered(20, key=lambda x: -x[1])
complete_sortedSims

[(u'Bonnie and Clyde (1967)', 1.0, 1084, 10729, 3.8564171870631),
 (u'Animal Crackers (1930)',
  0.99993380386717889,
  7706,
  998,
  3.9398797595190382),
 (u'You Only Live Once (1937)',
  0.99989432912252063,
  6515,
  69,
  3.463768115942029),
 (u'"Graduate', 0.99975585418568769, 1247, 19276, 4.050140070554057),
 (u'"Ladykillers', 0.99971463184957576, 5602, 1119, 3.772117962466488),
 (u'Creature Comforts (1989)',
  0.99954742167054844,
  3429,
  2697,
  4.103262884686689),
 (u'"Hard Day\'s Night', 0.99952498733217099, 2863, 4627, 3.8240760752107197),
 (u'Vertigo (1958)', 0.99952031000213459, 903, 16545, 4.1249622242369295),
 (u'On the Ropes (1999)', 0.9994926801856886, 2824, 148, 3.543918918918919),
 (u'"Harder They Fall', 0.99947801225433885, 6064, 170, 3.5764705882352943),
 (u'"Apartment', 0.99935385557621415, 909, 5911, 4.0219083065471155),
 (u'Angels with Dirty Faces (1938)',
  0.99927936779293147,
  8600,
  477,
  3.7536687631027252),
 (u'This Gun for Hire (1942)',
  0.99926158

In [117]:
# A test variable is created for pickling the productFeatures matrix data, combined with movie titles
# and average ratings.  
test2 = complete_model.productFeatures().join(complete_movies_titles)\
                                .join(movie_ID_with_avg_ratings_RDD)

In [118]:
# pickle the productFeatures matrix
test2.saveAsPickleFile('/home/kswamy/Documents/data_analysis/spark_models/movie_lens_als_complete/product_40feature_rating')

In [None]:
# Load the pickled model
from pyspark.mllib.recommendation import MatrixFactorizationModel

model_path = os.path.join('/home/kswamy/Documents/data_analysis', 'spark_models', 'movie_lens_als_complete')

# Save and load model
complete_model.save(sc, model_path)
#same_model = MatrixFactorizationModel.load(sc, model_path)
