In [1]:
""" 
<< Implementing ALS for yelp data set >>
1. Filtered the dataset to consider only Tempe businesses
2. Created two files, tempe_train and tempe_test using df.randomSplit()
3. Refer script (tempe_als_data_prep)
4. Trained ALS using tempe_train
"""

import math

import findspark
findspark.init()

import pyspark
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

import collections
import time
import sys



sc = pyspark.SparkContext("local", "Recommendation system")

training_data = sc.textFile("tempe_train.csv")
test_data = sc.textFile("tempe_test.csv")

#training_data = sc.textFile(sys.argv[1])
#test_data = sc.textFile(sys.argv[2])

header = training_data.first()
header_test = test_data.first()

train_rdd = training_data.filter(lambda z: z != header) \
    .map(lambda x: x.split(',')).map(lambda x: (x[0], x[1], x[2]))

test_rdd = test_data.filter(lambda z: z != header) \
    .map(lambda x: x.split(',')).map(lambda x: (x[0], x[1], x[2]))

all_dist_busId_test = test_rdd.map(lambda x: x[1]).distinct()
all_dist_busId_train = train_rdd.map(lambda x: x[1]).distinct()

all_dist_busId_final = all_dist_busId_test.union(all_dist_busId_train).distinct()  # .collect()

all_busId_dict = {}
uniqueId_forAll_busId = all_dist_busId_final.zipWithUniqueId()
all_busId_dict = uniqueId_forAll_busId.collectAsMap()

##########################################################################################################

all_dist_userId_test = test_rdd.map(lambda x: x[0]).distinct()
all_dist_userId_train = train_rdd.map(lambda x: x[0]).distinct()

all_dist_userId_final = all_dist_userId_test.union(all_dist_userId_train).distinct()

all_userId_dict = {}
uniqueId_forAll_userId = all_dist_userId_final.zipWithUniqueId()
all_userId_dict = uniqueId_forAll_userId.collectAsMap()

final_train_rdd2 = train_rdd.map(lambda x:((all_userId_dict[x[0]], all_busId_dict[x[1]], x[2])))

final_train_rdd2.collect()

final_ratings = final_train_rdd2.map(lambda x: Rating(x[0], x[1], x[2]))

rank = 3
numIterations = 20

model = ALS.train(final_ratings, rank, numIterations, 0.3, seed=300)

final_test_rdd = test_rdd.map(lambda x: ((all_userId_dict[x[0]], x[0]), (all_busId_dict[x[1]], x[1]), x[2]))

final_test_rdd2 = test_rdd.map(lambda x: (all_userId_dict[x[0]], all_busId_dict[x[1]]))

predictions = model.predictAll(final_test_rdd2).map(lambda r: ((r[0], r[1]), r[2]))

final_test_rdd = test_rdd.map(lambda x: ((all_userId_dict[x[0]], all_busId_dict[x[1]]), (x[0], x[1], x[2])))

ratesAndPreds = final_test_rdd.join(predictions)

MSE = ratesAndPreds.map(lambda x: ((float(x[1][0][2]) - x[1][1]) ** 2)).mean()

#print(MSE)


abs_diff_between_rate_and_pred = ratesAndPreds.map(lambda x: abs(float(x[1][0][2]) - x[1][1]))

abs_diff_between_rate_and_pred.take(10)

zero_to_one = abs_diff_between_rate_and_pred.filter(lambda x: 0 <= x <= 1.0).count()
one_to_two = abs_diff_between_rate_and_pred.filter(lambda x: 1.0 <= x <= 2.0).count()
two_to_three = abs_diff_between_rate_and_pred.filter(lambda x: 2.0 <= x <= 3.0).count()
three_to_four = abs_diff_between_rate_and_pred.filter(lambda x: 3.0 <= x <= 4.0).count()
greater_than_four = abs_diff_between_rate_and_pred.filter(lambda x: x >= 1.0).count()

print("Root Mean Squared Error = " + str(math.sqrt(MSE)))
print(">=0 and <1: " + str(zero_to_one))
print(">=1 and <2: " + str(one_to_two))
print(">=2 and <3: " + str(two_to_three))
print(">=3 and <4: " + str(three_to_four))
print(">=4: " + str(greater_than_four))

final_results_dict = ratesAndPreds.map(lambda x: ((x[1][0][0], x[1][0][1]), x[1][1])).collectAsMap()

test_final_results_dict = collections.OrderedDict(sorted(final_results_dict.items()))


f = open("tempe_ModelBasedCF.txt", 'w+')

for k, v in test_final_results_dict.items():
    f.write(str(k[0]) + "," + str(k[1]) + "," + str(v) + '\n')






Root Mean Squared Error = 1.58434144893
>=0 and <1: 11640
>=1 and <2: 6893
>=2 and <3: 2713
>=3 and <4: 980
>=4: 11127
