In [None]:
"""
Date   : Nov 14 2017
@author: Archana Neelipalayam Masilamani
Task Description:
Recommend movies by Collaborative Filtering method for users by 
using Matrix Factorizarion - Alternating Least Square method. 
This project is implemented using  spark MLlib library and Python

The dataset used is MovieLens data
"""

In [None]:
#Load the data and parse it using spark

In [6]:
from pyspark import SparkContext,SparkConf
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

sc  = SparkContext()
#Load the ratings file 
data = sc.textFile("/Users/archana/Documents/Projects/Recommender_System/MovieLens/ml-latest-small/ratings.csv")
rdd =  data.map(lambda line: line.split(','))
#Remove the header from the csv file read
header = rdd.first()
rdd = rdd.filter(lambda line: line != header)
#REmoving Timestamp column and haveing only userId, MovieId and Ratings
rating_data = rdd.map(lambda line : (line[0],line[1],line[2]))
rating_data.take(5)

[('1', '31', '2.5'),
 ('1', '1029', '3.0'),
 ('1', '1061', '3.0'),
 ('1', '1129', '2.0'),
 ('1', '1172', '4.0')]

In [7]:
# Reference : https://spark.apache.org/docs/2.2.0/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.ALS
# Parameters: 
# ratings – RDD of Rating or (userID, productID, rating) tuple.
# rank – Number of features to use (also referred to as the number of latent factors).
# iterations – Number of iterations of ALS. (default: 5)
# lambda – Regularization parameter. (default: 0.01)
# blocks – Number of blocks used to parallelize the computation. A value of -1 will use an auto-configured number of blocks. (default: -1)
# alpha – A constant used in computing confidence. (default: 0.01)
# nonnegative – A value of True will solve least-squares with nonnegativity constraints. (default: False)
# seed – Random seed for initial matrix factorization model. A value of None will use system time as the seed. (default: None)

# Initialize rank to 10 and iterations as 10

rank = 10
iterations = 10

#Train a model for the given data
model = ALS.train(rating_data, rank, iterations)

In [8]:
#Displays the first row of the product feature matrix
model.productFeatures().first()

(4,
 array('d', [-0.912808895111084, 0.6975384950637817, 0.44138476252555847, 0.5536988973617554, 1.851621389389038, 0.7033200263977051, 0.08659005910158157, -0.7737082839012146, -1.4310253858566284, -0.3459320068359375]))

In [9]:
#Displays the first row of the user feature matrix
model.userFeatures().first()

(4,
 array('d', [-0.5362852811813354, 0.625486433506012, -0.002939474070444703, -0.5402898788452148, 1.0157285928726196, -0.054121218621730804, 0.2876712381839752, -0.48325034976005554, -0.5564104318618774, -0.03544832393527031]))

In [101]:
#Displays the top 10 recommended movies for user 5 
recommendMovies = model.recommendProducts(5,10)
recommendMovies

[Rating(user=5, product=5959, rating=6.452139027697633),
 Rating(user=5, product=1034, rating=6.000732588789519),
 Rating(user=5, product=89904, rating=5.956621352876272),
 Rating(user=5, product=1649, rating=5.801531782627663),
 Rating(user=5, product=1411, rating=5.729648083007195),
 Rating(user=5, product=3365, rating=5.711824594402292),
 Rating(user=5, product=1050, rating=5.620523538861128),
 Rating(user=5, product=7064, rating=5.573826659634433),
 Rating(user=5, product=122904, rating=5.529775110354646),
 Rating(user=5, product=2120, rating=5.496883618869416)]

In [33]:
# To evaluate the model, take only user and product from the 
# training data. (i.e drop the rating)
testdata = rating_data.map(lambda p: (p[0], p[1]))
testdata.take(5)

[('1', '31'), ('1', '1029'), ('1', '1061'), ('1', '1129'), ('1', '1172')]

In [39]:
#Predict the ratings for all the Test Data
predictions = model.predictAll(testdata)
predictions.filter(lambda l: l[0]==5).take(5)

[Rating(user=5, product=344, rating=3.729826668276227),
 Rating(user=5, product=5816, rating=4.1657680595849005),
 Rating(user=5, product=3408, rating=3.7520828214155433),
 Rating(user=5, product=2424, rating=4.151401735855002),
 Rating(user=5, product=8464, rating=3.6188992045703596)]

In [67]:
#Load the movie data 
data = sc.textFile("/Users/archana/Documents/Projects/Recommender_System/MovieLens/ml-latest-small/movies.csv")
rdd =  data.map(lambda line: line.split(','))
#Remove the header from the csv file read
header = rdd.first()
rdd = rdd.filter(lambda line: line != header)
#REmoving Timestamp column and haveing only userId, MovieId and Ratings
movie_data = rdd.map(lambda line : ((line[0],line[1])))
movie_data.take(5)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)'),
 ('4', 'Waiting to Exhale (1995)'),
 ('5', 'Father of the Bride Part II (1995)')]

In [102]:
#Mapping movie names with the result of recommended products(line 10)recommendMovies_movieID = recommendMovies.map(lambda l: l[1])
recommendedMovieID = sc.parallelize(recommendMovies).map(lambda l: l[1])
recommendedMovieID.collect()

[5959, 1034, 89904, 1649, 1411, 3365, 1050, 7064, 122904, 2120]

In [103]:
#Print the top movies
print("Top ten movie recommendation for User ID 5 is: \n")
for i in recommendedMovieID.toLocalIterator():
    print(str(movie_data.lookup(str(i))).strip('[\'\"]'));

Top ten movie recommendation for user 5 are: 

Narc (2002)
Freeway (1996)
The Artist (2011)
Fast
Hamlet (1996)
Searchers
Looking for Richard (1996)
Beauty and the Beast (La belle et la bête) (1946)
Deadpool (2016)
Needful Things (1993)


In [None]:
#End