# Collaborative Filtering

In [348]:
from pyspark.sql import *
from pyspark import SQLContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [349]:
sqlContext = SQLContext(sc)
movies_rdd = sc.textFile("/home/sarthak/Jupyter/movies.csv")

In [350]:
movies_rdd.take(10)

['movieId,title,genres',
 '1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy',
 '2,Jumanji (1995),Adventure|Children|Fantasy',
 '3,Grumpier Old Men (1995),Comedy|Romance',
 '4,Waiting to Exhale (1995),Comedy|Drama|Romance',
 '5,Father of the Bride Part II (1995),Comedy',
 '6,Heat (1995),Action|Crime|Thriller',
 '7,Sabrina (1995),Comedy|Romance',
 '8,Tom and Huck (1995),Adventure|Children',
 '9,Sudden Death (1995),Action']

In [351]:
ratings_rdd = sc.textFile("/home/sarthak/Jupyter/ratings.csv")
ratings_rdd.take(10)

['userId,movieId,rating,timestamp',
 '1,31,2.5,1260759144',
 '1,1029,3.0,1260759179',
 '1,1061,3.0,1260759182',
 '1,1129,2.0,1260759185',
 '1,1172,4.0,1260759205',
 '1,1263,2.0,1260759151',
 '1,1287,2.0,1260759187',
 '1,1293,2.0,1260759148',
 '1,1339,3.5,1260759125']

In [352]:
header_ratings = ratings_rdd.take(1)[0]
ratings_rdd_withoutheader = ratings_rdd.filter(lambda line : line!=header_ratings)
header_ratings

'userId,movieId,rating,timestamp'

In [353]:
ratings_rdd_withoutheader.take(10)

['1,31,2.5,1260759144',
 '1,1029,3.0,1260759179',
 '1,1061,3.0,1260759182',
 '1,1129,2.0,1260759185',
 '1,1172,4.0,1260759205',
 '1,1263,2.0,1260759151',
 '1,1287,2.0,1260759187',
 '1,1293,2.0,1260759148',
 '1,1339,3.5,1260759125',
 '1,1343,2.0,1260759131']

In [354]:
new_ratings_data = ratings_rdd_withoutheader.map(lambda line: line.split(",")).map(lambda r: (r[0],r[1],r[2]))

In [355]:
new_ratings_data.take(3)

[('1', '31', '2.5'), ('1', '1029', '3.0'), ('1', '1061', '3.0')]

In [356]:
header_movies = movies_rdd.take(1)[0]
movies_rdd_withoutheader = movies_rdd.filter(lambda line : line!=header_movies)

In [357]:
new_movies_data = movies_rdd_withoutheader.map(lambda line: line.split(",")).map(lambda r: (r[0],r[1]))
new_movies_data.take(10)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)'),
 ('4', 'Waiting to Exhale (1995)'),
 ('5', 'Father of the Bride Part II (1995)'),
 ('6', 'Heat (1995)'),
 ('7', 'Sabrina (1995)'),
 ('8', 'Tom and Huck (1995)'),
 ('9', 'Sudden Death (1995)'),
 ('10', 'GoldenEye (1995)')]

In [358]:
# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10

# Model used for prediction

In [359]:
model = ALS.train(new_ratings_data,rank, numIterations)

In [360]:
test_data = new_ratings_data.map(lambda p: (p[0], p[1]))

In [361]:
predictions = model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))

In [362]:
ratingNprediction = new_ratings_data.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)

# The user rating and the prediction

In [412]:
ratingNprediction.take(50)

[((1, 1029), (3.0, 3.0557497917770418)),
 ((1, 1061), (3.0, 2.823217917849904)),
 ((1, 1129), (2.0, 2.288753176305595)),
 ((1, 1293), (2.0, 1.9354866281027512)),
 ((1, 1405), (1.0, 0.9401619423416037)),
 ((1, 1953), (4.0, 3.831853348871155)),
 ((1, 2105), (4.0, 3.5630839376749526)),
 ((1, 2193), (2.0, 2.362233369790772)),
 ((2, 52), (3.0, 3.0278998146530105)),
 ((2, 144), (3.0, 2.967256783771995)),
 ((2, 168), (3.0, 2.6946708144642946)),
 ((2, 208), (3.0, 2.6097532874776546)),
 ((2, 248), (3.0, 2.831833964075469)),
 ((2, 272), (3.0, 3.21321483143364)),
 ((2, 292), (3.0, 2.761842954676365)),
 ((2, 296), (4.0, 4.27799145583791)),
 ((2, 300), (3.0, 3.2202414200148652)),
 ((2, 356), (3.0, 3.129291020223235)),
 ((2, 364), (3.0, 4.1045698782628)),
 ((2, 372), (3.0, 2.2916931715052504)),
 ((2, 468), (4.0, 3.9743703251135494)),
 ((2, 480), (4.0, 4.135193541903432)),
 ((2, 500), (4.0, 3.6509360191637206)),
 ((2, 508), (4.0, 3.3441404820760896)),
 ((2, 552), (3.0, 2.999221047500358)),
 ((2, 588)