### Import libraries

In [1]:
from __future__ import division
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS, Rating
import time
import json
import sys
from math import sqrt
from pyspark.sql.types import *


### Import Dataset

In [2]:
import findspark
findspark.init()

import pyspark
findspark.find()

from pyspark import SparkContext
sc = SparkContext()

In [3]:
train = sc.textFile('./Train.csv')
test = sc.textFile('./Test.csv')

In [4]:
train_header = train.first()
train_filtered = train.filter(lambda row: row != train_header)
test_header = test.first()
test_filtered = test.filter(lambda row: row != test_header)

In [5]:
def mapDict(vals):
        return_dict = {}
        for val in vals:
            return_dict[val[0]] = val[1]
        return return_dict

def findAvg(vals):
        '''Find average value for each row'''
        count = 0
        tot = 0
        for item, rating in vals.items():
            tot += rating
            count += 1
        average = float(tot) / count
        
        for item, rating in vals.items():
            vals[item] = rating - average
        vals['row_avg'] = average
        return vals

In [6]:
train_rdd = train_filtered.map(lambda x: x.split(',')).map(lambda line_split: (line_split[0], (line_split[1], float(line_split[2]))))
test_rdd = test_filtered.map(lambda x: x.split(',')).map(lambda line_split: (line_split[0], line_split[1], float(line_split[2])))

In [21]:
train_rdd.take(5)

[('276725', ('034545104X', 0.0)),
 ('276726', ('0155061224', 5.0)),
 ('276727', ('0446520802', 0.0)),
 ('276729', ('052165615X', 3.0)),
 ('276729', ('0521795028', 6.0))]

In [7]:
training_group = train_rdd.groupByKey().mapValues(list)
training_group_dict = training_group.mapValues(mapDict)
training_group_dict_avg = training_group_dict.mapValues(findAvg)

In [None]:
training_group_dict_avg

In [8]:
training_group_dict_one = training_group_dict_avg.map(lambda x: (1, x))
training_one_reduce = training_group_dict_one.groupByKey().mapValues(list).map(lambda x: x[1])

In [22]:
training_one_reduce.collect()

[[('276725', {'034545104X': 0.0, 'row_avg': 0.0}),
  ('276729', {'052165615X': -1.5, '0521795028': 1.5, 'row_avg': 4.5}),
  ('276733', {'2080674722': 0.0, 'row_avg': 0.0}),
  ('276736', {'3257224281': 0.0, 'row_avg': 8.0}),
  ('276737', {'0600570967': 0.0, 'row_avg': 6.0}),
  ('276746',
   {'0425115801': 0.0,
    '0449006522': 0.0,
    '0553561618': 0.0,
    '055356451X': 0.0,
    '0786013990': 0.0,
    '0786014512': 0.0,
    'row_avg': 0.0}),
  ('276754', {'0684867621': 0.0, 'row_avg': 8.0}),
  ('276765', {'9029716894': 0.0, 'row_avg': 0.0}),
  ('276768', {'9057868059': 0.0, 'row_avg': 4.0}),
  ('276780',
   {'8434811634': -4.666666666666667,
    '8484330478': 2.333333333333333,
    '8484332039': 2.333333333333333,
    'row_avg': 4.666666666666667}),
  ('276786',
   {'2864322102': 3.8333333333333335,
    '8402065945': -2.1666666666666665,
    '8423314901': -2.1666666666666665,
    '842333533X': -2.1666666666666665,
    '8427911769': -2.1666666666666665,
    '8433914456': -2.1666666666

In [9]:
training_data_compile = training_one_reduce.collect()
training_data_compile = training_data_compile[0]
training_compile_dict = {}
for i in range(len(training_data_compile)):
    training_compile_dict[training_data_compile[i][0]] = training_data_compile[i][1]
    
    '''Get item to user dictionary so we can use it to find rows containing certain item faster'''
item_to_user_pre = train_rdd.map(lambda x: (x[1][0], x[0])).groupByKey().mapValues(list)
item_to_user_compile = item_to_user_pre.collect()
item_to_user_dict = {}
for i in range(len(item_to_user_compile)):
    item_to_user_dict[item_to_user_compile[i][0]] = set(item_to_user_compile[i][1])

In [10]:
test_data = test_rdd.collect()
RMSE_tmp = 0
tmp_result = []
pearson_threshold = 0.3
random_pred = 0
upper_limit = 150

In [11]:
for test in test_data:
        '''Get all the rows corresponding to cur user and item of test dataset'''
        cur_user, cur_item = test[0], test[1]
        filtered_train = {}
        if cur_item not in training_compile_dict or cur_user not in item_to_user_dict:
            '''If it's an unseen business id, assign some random prediction'''
            prediction = 5
            random_pred += 1
        else:
            '''We want to attach row corresponding to current item'''
            filtered_train[cur_user] = training_compile_dict[cur_user]
            cur_user_info = filtered_train[cur_user]
            
            '''Get a list of user_id who contains the current item'''
            row_set = item_to_user_dict[cur_item]
            for row in row_set:
                if len(training_compile_dict[row]) < upper_limit:
                    filtered_train[row] = training_compile_dict[row]
                
            '''Compute Pearson for each row and add to the final result if Pearson
            passes the threshold value'''
            predict_num = 0
            predict_den = 0
            for user, item_list in filtered_train.items():
                if user != cur_user:
                    num = 0
                    den1 = 0
                    den2 = 0
                    for item, rating in item_list.items():
                        if item in cur_user_info and item != cur_item and item != 'row_avg':
                            num += rating * cur_user_info[item]
                            den1 += rating**2
                            den2 += (cur_user_info[item])**2
                    denom = sqrt(den1) * sqrt(den2)
                    if num == 0 or denom == 0:
                        pearson = 0
                    else:
                        pearson = float(num) / denom
                    if pearson > pearson_threshold:
                        predict_num += (filtered_train[user][cur_item]) * pearson
                        predict_den += abs(pearson)
            if predict_num == 0 or predict_den == 0:
                prediction = cur_user_info['row_avg']
            else:
                prediction = cur_user_info['row_avg'] + float(predict_num) / predict_den
                prediction = (prediction + cur_user_info['row_avg']) / 2.0 
        '''Save the results which consists of user_id, business_id, ground truth and predicted'''
        tmp_result.append((test, prediction))
        '''Compile results for final MSE computation'''
        RMSE_tmp += (test[2] - prediction)**2

In [12]:
RMSE = sqrt(RMSE_tmp / len(test_data))
RMSE

4.337211085478778