In [9]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark import SparkConf,SparkContext
from pyspark.sql import SQLContext
import math

In [2]:
conf = SparkConf().setAppName("modelBased")
sc = SparkContext(conf = conf)

In [3]:
# Load and parse the data
train_data = sc.textFile("train_data.txt")
train_rdd = train_data.map(lambda l: l.split(',')).persist()
test_data = sc.textFile("test_data.txt")
test_rdd = test_data.map(lambda l: l.split(',')).persist()

In [4]:
user_id_table = sc.broadcast(train_rdd.map(lambda line:line[0]).union(test_rdd.map(lambda line:line[0]))\
                            .distinct().zipWithIndex().collectAsMap())
busiess_id_table = sc.broadcast(train_rdd.map(lambda line:line[1]).union(test_rdd.map(lambda line:line[1]))\
                            .distinct().zipWithIndex().collectAsMap())

In [5]:
train_ratings = train_rdd.map(lambda line:(user_id_table.value[line[0]],busiess_id_table.value[line[1]],float(line[2]))).map(lambda l: Rating(l[0], l[1], l[2])).persist()

In [6]:
train_ratings.take(10)

[Rating(user=282273, product=0, rating=5.0),
 Rating(user=282273, product=1, rating=1.0),
 Rating(user=282273, product=131548, rating=1.0),
 Rating(user=282273, product=98666, rating=1.0),
 Rating(user=282273, product=2, rating=5.0),
 Rating(user=282273, product=98667, rating=1.0),
 Rating(user=282273, product=98668, rating=5.0),
 Rating(user=282273, product=65794, rating=5.0),
 Rating(user=282273, product=131549, rating=5.0),
 Rating(user=282273, product=32924, rating=2.0)]

In [14]:
rank = 3
numIterations = 15
lambda_value = 0.3
model = ALS.train(train_ratings, rank, numIterations,lambda_value)

In [15]:
test_ratings = test_rdd.map(lambda line:(user_id_table.value[line[0]],busiess_id_table.value[line[1]],float(line[2]))).map(lambda l: Rating(l[0], l[1], l[2])).persist()
test_data = test_ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = test_ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)

In [16]:
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
RMSE = math.sqrt(MSE)
print("Mean Squared Error = " + str(RMSE))


Mean Squared Error = 1.9017059066505093
