# MovieLens - Created by Jesús García García - Master's Degree in Big Data Analytics UEM


In [1]:
# Source https://spark.apache.org/docs/2.2.0/mllib-collaborative-filtering.html

In [2]:
import warnings
import time
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating


# Silence library warnings so the cell outputs stay readable
warnings.filterwarnings(action='ignore')

# NOTE: `sc` is not created in this notebook. Per the original notes it is the
# SparkContext that already exists when the notebook is started through the
# `pyspark` command. The manual set-up below is kept disabled, for reference only:
#sc.stop()
#conf=SparkConf()
#conf.set("spark.executor.memory", "3g")
#conf.set("spark.cores.max", "2")
#conf.setAppName("MovieLensJGG")
#conf.setMaster('localhost')
#sc = SparkContext('local', conf=conf)


def parseRating(line):
    """Build a Rating from the first three comma-separated fields of a line."""
    fields = line.split(',')
    return Rating(int(fields[0]), int(fields[1]), float(fields[2]))


# Load the ratings file and parse every line into a Rating
ratingsPath = "file:///home/cloudera/Downloads/ratings100k.csv"
data = sc.textFile(ratingsPath)
ratings = data.map(parseRating)

# Train 80%, Test 20% (split made with a fixed seed)
splitWeights = [0.8, 0.2]
trainData, testData = ratings.randomSplit(splitWeights, seed=42)

In [3]:
# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10
# Fixed seed (same value as the train/test split) so re-running this cell is repeatable
model = ALS.train(trainData, rank, numIterations, seed=42)

# Evaluate the model on the held-out test split (testData), not on the training data
now = time.time()
# predictAll takes (user, product) pairs, so drop the known rating from each test record
testdata = testData.map(lambda p: (p[0], p[1]))
# Key every prediction by (user, product) so it can be joined with the true rating
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
# Only test pairs were predicted, so join the test ratings rather than the full ratings RDD
ratesAndPreds = testData.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
# time.time() differences are expressed in seconds
elapsed = (time.time() - now)
print("Time for evaluating the model on test data: {:.2f}s".format(elapsed))
print("Mean Squared Error = " + str(MSE))
# Printing an RDD only shows its descriptor; use take() to look at actual records
print(predictions)
print(ratesAndPreds)

Average time for evaluating the model on training data: 94.91ms
Mean Squared Error = 0.793508422526
PythonRDD[234] at RDD at PythonRDD.scala:43
PythonRDD[235] at RDD at PythonRDD.scala:43


In [4]:
# Pair each ((userId, movieId), prediction) record with its index and preview the first 10
indexedPredictions = predictions.zipWithIndex()
indexedPredictions.take(10)

[(((384, 1197), 3.4055480000631873), 0),
 (((384, 593), 3.783229836080878), 1),
 (((4551, 162), 4.694435177008994), 2),
 (((4551, 163), 2.4677092057926626), 3),
 (((4551, 858), 4.044437707353276), 4),
 (((4551, 924), 4.824257565465284), 5),
 (((4551, 3730), 4.561968747333529), 6),
 (((4551, 3798), 1.5983559525190403), 7),
 (((4551, 1411), 3.7238974176341983), 8),
 (((4551, 2289), 4.3020726333073585), 9)]

In [5]:
# Show the 10 ((userId, movieId), prediction) records with the highest prediction first
def negatedPrediction(record):
    """Sort key: the prediction with its sign flipped, so larger predictions come first."""
    return -record[1]

predictions.takeOrdered(10, key=negatedPrediction)

[((3113, 297), 10.0076166746921),
 ((573, 2314), 9.478392936673151),
 ((3097, 3003), 8.153251639775025),
 ((4119, 3833), 8.105737864701812),
 ((3998, 3832), 7.665092479597142),
 ((2490, 3566), 7.620707026659138),
 ((606, 343), 7.531386871379306),
 ((1754, 1261), 7.494842860502503),
 ((4125, 1517), 7.452316487379109),
 ((2686, 2710), 7.444993859711412)]

In [6]:
# Show the 10 ((userId, movieId), prediction) records with the lowest prediction first
def predictionValue(record):
    """Sort key: the prediction stored in the second slot of the record."""
    return record[1]

predictions.takeOrdered(10, key=predictionValue)

[((1341, 3940), -4.495183027355824),
 ((87, 2157), -4.305969368196735),
 ((4775, 2914), -3.637732878667646),
 ((5334, 1114), -3.4658511484016152),
 ((3240, 1539), -3.016577276391475),
 ((206, 3910), -2.8203504592566313),
 ((1713, 1534), -2.7109208542104537),
 ((2640, 2041), -2.333351358340016),
 ((87, 1649), -2.053842267461786),
 ((126, 2721), -2.0169561747330285)]

In [7]:
# Pair each ((userId, movieId), (rating, prediction)) record with its index and preview the first 10
indexedRatesAndPreds = ratesAndPreds.zipWithIndex()
indexedRatesAndPreds.take(10)

[(((822, 1620), (4.0, 3.182799384909713)), 0),
 (((2680, 608), (4.0, 3.5265800880373686)), 1),
 (((3430, 1262), (3.0, 4.052250150553348)), 2),
 (((1151, 1265), (4.0, 4.317060269650934)), 3),
 (((2895, 339), (3.0, 3.0515072507260177)), 4),
 (((2934, 2334), (3.0, 3.116187265302175)), 5),
 (((4124, 118), (3.0, 3.072943194197491)), 6),
 (((4407, 3253), (3.0, 3.433521332568125)), 7),
 (((593, 1885), (4.0, 3.6045353848520922)), 8),
 (((424, 2148), (3.0, 3.2314793164247173)), 9)]

In [8]:
# Show the 10 ((userId, movieId), (rating, prediction)) records with the lowest rating first.
# The sort key is the whole (rating, prediction) pair, so records with the same
# rating are further ordered by their prediction.
def ratingAndPrediction(record):
    """Sort key: the (rating, prediction) pair stored in the second slot of the record."""
    return record[1]

ratesAndPreds.takeOrdered(10, key=ratingAndPrediction)

[((1341, 3940), (1.0, -4.495183027355824)),
 ((87, 2157), (1.0, -4.305969368196735)),
 ((206, 3910), (1.0, -2.8203504592566313)),
 ((87, 1649), (1.0, -2.053842267461786)),
 ((5016, 1983), (1.0, -1.9916753857214111)),
 ((800, 427), (1.0, -1.776090105352421)),
 ((87, 390), (1.0, -1.7690830639895512)),
 ((3662, 3531), (1.0, -1.7560758607913098)),
 ((3610, 2221), (1.0, -1.4810178791655946)),
 ((99, 2938), (1.0, -1.3855805426572285))]

In [9]:
# Show 'userId','movieId','rating','prediction' in descending order by rating.
# The sort key is the whole (rating, prediction) value, so rows with the same
# rating are further ordered by prediction. Done here through a pandas DataFrame.
import pandas as pd
# Reuse the (rating, prediction) join already built in ratesAndPreds instead of
# joining ratings and predictions a second time
dfaux = ratesAndPreds.toDF()
df = dfaux.toPandas()
df.columns = ['userId_movieId','rating_prediction']
# DataFrame.sort is no longer available in pandas; sort_values is its replacement.
# Only the first 10 rows are displayed, like the other cells of this notebook.
df.sort_values(by='rating_prediction', ascending=False).head(10)

Unnamed: 0,userId_movieId,rating_prediction
138146,"(573, 2314)","(5.0, 9.47839293667)"
161391,"(5476, 3379)","(5.0, 7.12907769945)"
65171,"(1237, 2411)","(5.0, 7.03489654233)"
169408,"(1521, 668)","(5.0, 6.91491255437)"
62751,"(46, 1982)","(5.0, 6.88094347783)"
192812,"(3032, 297)","(5.0, 6.74047418906)"
105482,"(5075, 924)","(5.0, 6.6581930658)"
24348,"(5004, 3548)","(5.0, 6.61077462356)"
135457,"(46, 1347)","(5.0, 6.55691036682)"
175700,"(128, 3073)","(5.0, 6.54277311223)"


In [10]:
# Save and load model
import os
import shutil

# Single definition of the model location; modelUri is the same
# "file:///home/cloudera/Downloads/ALSJGG100kModel" string used before
modelDir = "/home/cloudera/Downloads/ALSJGG100kModel"
modelUri = "file://" + modelDir

# Saving into a path that already exists fails, so remove the output of a previous
# run first (the file:// scheme means the path is on the local filesystem).
if os.path.isdir(modelDir):
    shutil.rmtree(modelDir)

model.save(sc, modelUri)
sameModel = MatrixFactorizationModel.load(sc, modelUri)
sameModel

<pyspark.mllib.recommendation.MatrixFactorizationModel at 0x7fde28fe7210>