In [8]:
from pyspark import RDD, SparkContext

from itertools import combinations
import sys
import math
import json
import time
from copy import deepcopy
import csv

In [3]:
train_file = "yelp_train.csv"
#train_file = "train_easy_2.csv"
#test_file = "test_easy_2.csv"
test_file = "yelp_val.csv"
output = "pred.csv"

In [2]:
test_partitions = 2
sc = SparkContext("local", "HW3").getOrCreate()
sc.setLogLevel("WARN")

In [12]:
min_ratings_for_biz = 40
min_corated = 40

In [13]:
#input_RDD = sc.textFile(train_file, test_partitions).map(lambda line: line.split(','))
input_RDD = sc.textFile(train_file).map(lambda line: line.split(','))
header = input_RDD.first()
input_RDD = input_RDD.filter(lambda data: data != header).map(lambda x: (x[0], x[1], float(x[2])))

In [14]:
test_RDD = sc.textFile(test_file).map(lambda line: line.split(','))
header = test_RDD.first()
test_RDD = test_RDD.filter(lambda data: data != header).map(lambda x: (x[0], x[1], x[2]))

In [15]:
biz_map = input_RDD.map(lambda x: x[1]).distinct().sortBy(lambda x: x).zipWithIndex().collectAsMap()

In [16]:
user_map = input_RDD.map(lambda x:x[0]).distinct().sortBy(lambda x:x).zipWithIndex().collectAsMap()

In [17]:
user_biz_RDD = input_RDD.map(lambda x: (user_map[x[0]], [biz_map[x[1]],x[2]]))

In [19]:
biz_user_RDD = input_RDD.map(lambda x: (biz_map[x[1]], [user_map[x[0]],x[2]]))

In [20]:
ratings_RDD = input_RDD.map(lambda x: ( (user_map[x[0]],biz_map[x[1]]),x[2])).collectAsMap()

In [21]:
set_of_users_that_reviewed_biz = biz_user_RDD.map(lambda x:(x[0],{x[1][0]})).reduceByKey(lambda x,y: x|y).collectAsMap()

In [22]:
average_biz_rating = biz_user_RDD.map(lambda x: (x[0],(int(x[1][1]),1))).reduceByKey(lambda a,b:(a[0]+b[0],a[1]+b[1])).map(lambda x: (x[0], (x[1][0]/x[1][1]))).collectAsMap()

In [23]:
biz_for_pearson_score = biz_user_RDD.groupByKey().map(lambda x:(x[0],len(x[1]))).filter(lambda x:x[1]>min_ratings_for_biz).map(lambda x:x[0]).collect()

Pearson weights is for Business

In [24]:
def Get_pearson_correlation(x):

    biz_i = x[0][0]
    r_i_dash = average_biz_rating[biz_i]
    biz_j = x[0][1]
    r_j_dash = average_biz_rating[biz_j]
    users = list(x[1])
    numerator = 0
    denom_i = 0
    denom_j = 0
    for user in users:
        numerator += (ratings_RDD[(user, biz_i)] - r_i_dash) * (ratings_RDD[(user, biz_j)] - r_j_dash)
        denom_i += pow(ratings_RDD[(user, biz_i)] - r_i_dash,2)
        denom_j += pow(ratings_RDD[(user, biz_j)] - r_j_dash,2)

    denominator = math.sqrt(denom_i) * math.sqrt(denom_j)
    try:
        w = numerator/denominator
        # Case Amplification
        w = w * pow(abs(w),1.5)
    except:
        return 0

    return w


In [25]:
pearson_weights = sc.parallelize(combinations(biz_for_pearson_score,2)).\
    map(lambda x: (x,set_of_users_that_reviewed_biz[x[0]]&set_of_users_that_reviewed_biz[x[1]])).\
    filter(lambda x:len(x[1])>50).\
    map(lambda x:(x[0],Get_pearson_correlation(x))).sortBy(lambda x:x[0])

In [26]:
pearson_weights.collect()

[((2090, 3453), 0.0006190587415607064),
 ((2090, 8200), 0.02182894633887842),
 ((2090, 10109), 6.042134126708259e-05),
 ((2090, 10929), 2.9214555357314523e-05),
 ((2090, 17852), -0.0027457066941605564),
 ((2500, 2090), 0.014623943849308747),
 ((2500, 8200), 0.00017853032114144082),
 ((2500, 16577), -0.0003423460759391244),
 ((2500, 17852), 4.545661310742922e-06),
 ((4788, 8200), 0.0008786941132097012),
 ((5709, 2090), 0.006277147642771002),
 ((5709, 8200), 0.00036541361064433255),
 ((5709, 10109), -0.003370565213565528),
 ((5709, 10929), 0.001168328364736924),
 ((5709, 16577), 0.00269095561283379),
 ((5709, 17852), 0.0040297056631606),
 ((6119, 6420), 0.022963385764412547),
 ((6420, 1967), 0.12351120321614137),
 ((6420, 2090), 0.12443468391347169),
 ((6420, 2500), 0.07846003435901536),
 ((6420, 8200), -0.011036752477523948),
 ((6420, 9353), 0.02541984273703853),
 ((6420, 10929), -0.0012521631026232592),
 ((6420, 13140), 0.006433917634271589),
 ((6420, 16577), 0.01662611870899385),
 ((6

In [30]:
def calc_rating(row):

    biz = row[0][1]
    row = row[1]
    numerator = 0
    denominator = 0
    for rating in row:
        numerator += rating[1] * rating[2]
        denominator += abs(rating[2])
    try:
        rating = numerator/denominator
    except:
        rating = average_biz_rating[biz]

    return rating



In [31]:
predictions = test_RDD.map(lambda x: (user_map.get(x[0]),biz_map.get(x[1]))).leftOuterJoin(user_biz_RDD) \
    .map(lambda x: ((x[1][0],x[1][1][0]),((x[0],x[1][0]),x[1][1][1]))) \
    .leftOuterJoin(pearson_weights)

In [None]:
predictions= predictions.filter(lambda x:x[1][1] is not None).filter(lambda x:x[1][1]>0).\
    map(lambda x:(x[1][0][0], (x[0], x[1][0][1],x[1][1])))\
    .groupByKey().map(lambda x: (x[0], calc_rating(x)))

In [32]:
predictions.collect()

[((20004, 14570), (((5514, 20004), 3.0), None)),
 ((20004, 14570), (((8412, 20004), 2.0), None)),
 ((1684, 7018), (((5514, 1684), 5.0), None)),
 ((1684, 7018), (((10132, 1684), 1.0), None)),
 ((16385, 14570), (((5514, 16385), 3.0), None)),
 ((7777, 20823), (((5514, 7777), 2.0), None)),
 ((23844, 13724), (((5514, 23844), 4.0), None)),
 ((18224, 2255), (((5514, 18224), 4.0), None)),
 ((5982, 16614), (((5514, 5982), 5.0), None)),
 ((6650, 12328), (((9822, 6650), 2.0), 0.029502993942247056)),
 ((18248, 8124), (((4388, 18248), 5.0), None)),
 ((18248, 8124), (((250, 18248), 5.0), None)),
 ((22699, 9187), (((9706, 22699), 5.0), None)),
 ((6256, 13236), (((6328, 6256), 5.0), None)),
 ((6256, 13236), (((804, 6256), 4.0), None)),
 ((18540, 1535), (((11122, 18540), 5.0), None)),
 ((16176, 3938), (((11122, 16176), 4.0), None)),
 ((22416, 9824), (((4572, 22416), 5.0), None)),
 ((14619, 8440), (((8206, 14619), 4.0), None)),
 ((14619, 8440), (((6444, 14619), 3.0), None)),
 ((14619, 8440), (((9366, 14

In [3]:
from pyspark import RDD, SparkContext

from itertools import combinations
import sys
import math
import json
import time
from copy import deepcopy
import csv

train_file = "yelp_train.csv"
test_file = "yelp_val.csv"
output = "output.json"


#sc = SparkContext("local", "HW3").getOrCreate()
#sc.setLogLevel("WARN")

min_ratings_for_biz = 55
min_corated = 55


start_time = time.time()

train_RDD = sc.textFile(train_file).map(lambda line: line.split(','))
header = train_RDD.first()
train_RDD = train_RDD.filter(lambda data: data != header).map(lambda x: (x[0], x[1], float(x[2])))

test_RDD = sc.textFile(test_file).map(lambda line: line.split(','))
header = test_RDD.first()
test_RDD = test_RDD.filter(lambda data: data != header).map(lambda x: (x[0], x[1], x[2]))

biz_map = train_RDD.map(lambda x: x[1]).distinct().sortBy(lambda x: x).zipWithIndex().collectAsMap()

biz_map_inversion = {v: k for k, v in biz_map.items()}

user_map = train_RDD.map(lambda x:x[0]).distinct().sortBy(lambda x:x).zipWithIndex().collectAsMap()

user_map_inversion = {v: k for k, v in user_map.items()}

user_biz_RDD = train_RDD.map(lambda x: (user_map[x[0]], [biz_map[x[1]],x[2]])).groupByKey().mapValues(lambda x:dict(x))

biz_user_RDD = train_RDD.map(lambda x: (biz_map[x[1]], [user_map[x[0]],x[2]]))

filtered_biz_user_RDD = train_RDD.map(lambda x: (biz_map[x[1]], [user_map[x[0]],x[2]])).groupByKey()\
    .filter(lambda x:len(x[1])>=min_ratings_for_biz).mapValues(lambda x: dict(x))

average_biz_rating = biz_user_RDD.map(lambda x: (x[0],(int(x[1][1]),1))).reduceByKey(lambda a,b:(a[0]+b[0],a[1]+b[1])).map(lambda x: (x[0], (x[1][0]/x[1][1]))).collectAsMap()

sc.broadcast(average_biz_rating)




def Get_pearson_correlation(x):
    #min_corated = 50
    ratings_i = x[0][1]
    ratings_j = x[1][1]
    corated_items = set(ratings_i).intersection(set(ratings_j))
    """
    Check if corated score is better or average score
    #r_j_dash = average_biz_rating[biz_i]
    #r_j_dash = average_biz_rating[biz_j]
    """
    if len(corated_items)<min_corated: return 0
    r_i_dash = 0.0
    r_j_dash = 0.0

    for biz in corated_items:
        r_i_dash+= ratings_i[biz]
        r_j_dash+= ratings_j[biz]
    r_i_dash /= len(corated_items)
    r_j_dash /= len(corated_items)

    numerator = 0.0
    denom_i = 0.0
    denom_j = 0.0
    for biz in corated_items:
        numerator += (ratings_i[biz] - r_i_dash) * (ratings_j[biz] - r_j_dash)
        denom_i += pow((ratings_i[biz] - r_i_dash),2)
        denom_j += pow((ratings_j[biz] - r_j_dash),2)

    denominator = math.sqrt(denom_i) * math.sqrt(denom_j)
    try:
        w = numerator/denominator
        # Case Amplification
        #w = w * pow(abs(w),1.5)
    except:
        return 0

    return w



def filter_pearson_weights(cartesian):
    for combination in cartesian:
        similarity_score = Get_pearson_correlation(combination)
        if similarity_score>0: yield (combination[0][0], combination[1][0]), similarity_score


def two_way_relationship(partition):
    for i in partition:
        yield i[0][0], (i[0][1], i[1])
        yield i[0][1], (i[0][0], i[1])

filtered_pearson_values = filtered_biz_user_RDD.cartesian(filtered_biz_user_RDD)\
        .filter(lambda x:x[0][0] < x[1][0])\
        .mapPartitions(filter_pearson_weights)

p_corr_map = filtered_pearson_values.mapPartitions(lambda x: two_way_relationship(x)).groupByKey().mapValues(lambda x: dict(x))


def recommend_ratings(x):


    user_ratings = x[1][0][1]
    correlation_map = x[1][1]
    if correlation_map is None or len(correlation_map)==0:
        try:
            return(average_biz_rating[x[0]])
        except:
            return(3)

    corated_biz = user_ratings.keys() & correlation_map.keys()



    numerator = 0
    denominator = 0
    for i in corated_biz:
        numerator += user_ratings[i]*correlation_map[i]
        denominator += abs(correlation_map[i])
    if denominator == 0:
        try:
            return(average_biz_rating[x[0]])
        except:
            return(3)
    return numerator/denominator

test_dataset = test_RDD.map(lambda x: (user_map.get(x[0]), biz_map.get(x[1],x[1])))
co_rated_users = test_dataset.leftOuterJoin(user_biz_RDD)
user_combined_w_pearson_map = co_rated_users.map(lambda x:(x[1][0],(x[0],x[1][1]))).leftOuterJoin(p_corr_map)

predictions = user_combined_w_pearson_map.map(lambda x: (user_map_inversion.get(x[1][0][0]), biz_map_inversion.get(x[0],x[0]), recommend_ratings(x))).collect()



0
1.0633933776725741
43.575334787368774


In [5]:
predictions

[('wf1GqnKQuvH-V3QN80UOOQ', 'LJwS9epPXXvzel7bZ616Gg', 4.5),
 ('do99OWeyyueHnjuHN-QjVQ', 'LJwS9epPXXvzel7bZ616Gg', 4.5),
 ('8RK_89OgXbeG4ohAdViggw', 'LJwS9epPXXvzel7bZ616Gg', 4.5),
 ('XhtMmcvMN4h9NF4C-eGD0g', 'LJwS9epPXXvzel7bZ616Gg', 4.5),
 ('RylA6VZUTRuMGBu4nHKbCw', 'LJwS9epPXXvzel7bZ616Gg', 4.5),
 ('U2N1EymyF7f20UVfR6k5Xg', 'LJwS9epPXXvzel7bZ616Gg', 4.5),
 ('wf1GqnKQuvH-V3QN80UOOQ', 'fja2EnaSxKu1D7uctMhw-w', 4.310344827586207),
 ('z0R494JwBcMDVt41_B2jPg', 'fja2EnaSxKu1D7uctMhw-w', 4.310344827586207),
 ('J2ypbaPCF34BJ9IY7W0PLA', 'fja2EnaSxKu1D7uctMhw-w', 4.310344827586207),
 ('TjXqTqYOXAs8VKo_otj3lw', 'fja2EnaSxKu1D7uctMhw-w', 4.310344827586207),
 ('wf1GqnKQuvH-V3QN80UOOQ', 'knJh7agyWMdPyndI9dURjw', 2.4878048780487805),
 ('VLy6l8L72adD7iTBNqiPXw', 'knJh7agyWMdPyndI9dURjw', 2.4878048780487805),
 ('_fpHwIiEJOp79qED9YSE7Q', 'knJh7agyWMdPyndI9dURjw', 2.4878048780487805),
 ('WL_U3MBUfTkeHL9lYxoleg', 'knJh7agyWMdPyndI9dURjw', 2.4878048780487805),
 ('vFcW_gM7-NyBv3xUYACTAQ', 'knJh7agyWMdPynd

In [4]:
import pickle

In [21]:
with open(output, 'w+') as f:
    f.write('user_id, business_id, prediction\n')

    for i in predictions:
        f.write(str(i[0]) + "," + str(i[1]) + "," + str(i[2]) + "\n")

In [15]:
y = user_combined_w_pearson_map.filter(lambda x: biz_map_inversion.get(x[0],x[0]) is None)

In [16]:
y.collect()

[(None,
  ((11122,
    {6631: 5.0,
     10415: 2.0,
     4818: 3.0,
     12243: 4.0,
     22663: 3.0,
     5590: 3.0,
     17731: 4.0,
     7947: 3.0,
     24013: 4.0,
     22841: 3.0,
     11823: 2.0,
     7980: 5.0,
     5336: 2.0,
     7708: 4.0,
     12175: 3.0,
     4326: 3.0,
     19011: 4.0,
     12866: 4.0,
     1349: 5.0,
     16289: 4.0,
     13666: 5.0,
     11559: 4.0,
     1356: 5.0,
     3242: 3.0,
     19840: 3.0,
     12025: 4.0,
     17908: 4.0,
     8889: 5.0,
     17934: 3.0,
     10997: 5.0,
     1230: 4.0,
     19994: 3.0,
     10128: 3.0,
     4238: 5.0,
     2709: 4.0,
     4302: 5.0,
     14207: 4.0,
     19711: 3.0,
     15592: 4.0,
     1886: 5.0,
     17550: 3.0,
     22464: 3.0,
     14741: 3.0,
     4220: 4.0,
     11018: 3.0,
     6006: 2.0,
     1535: 5.0,
     4841: 4.0,
     20114: 4.0,
     14962: 4.0,
     5700: 4.0,
     23022: 3.0,
     17158: 4.0,
     19885: 3.0,
     20354: 4.0,
     10767: 1.0,
     1378: 3.0,
     19001: 1.0,
     11206: 3.0,
 

In [13]:
x = user_combined_w_pearson_map.collect()

In [14]:
x

[(8661,
  ((10670,
    {21547: 5.0,
     14117: 5.0,
     19739: 5.0,
     14342: 4.0,
     15203: 5.0,
     3520: 5.0,
     23761: 5.0,
     20035: 5.0,
     11170: 5.0,
     22085: 1.0,
     14598: 4.0,
     567: 4.0,
     1327: 5.0,
     17307: 4.0,
     20128: 5.0,
     9119: 3.0,
     14356: 4.0,
     11860: 3.0,
     10915: 4.0,
     7875: 3.0,
     12034: 4.0,
     15663: 5.0,
     6570: 5.0,
     21413: 3.0,
     21326: 3.0,
     17183: 5.0,
     21897: 3.0,
     8498: 5.0,
     3487: 5.0,
     23300: 5.0,
     23536: 4.0,
     20145: 5.0,
     9373: 5.0,
     635: 4.0}),
   None)),
 (8661,
  ((7428,
    {10514: 5.0,
     6717: 5.0,
     17975: 4.0,
     23867: 5.0,
     4024: 5.0,
     17026: 3.0,
     8902: 5.0,
     10638: 4.0,
     17683: 4.0,
     11753: 5.0,
     20001: 4.0,
     3999: 5.0,
     15087: 4.0,
     15687: 5.0,
     7414: 5.0,
     9085: 4.0,
     24220: 4.0,
     6684: 5.0,
     11936: 4.0,
     11860: 3.0,
     23310: 5.0,
     16074: 5.0,
     4521: 5.0,
 

In [11]:
rmse = 0
skipped = 0
for p in predictions:
    try:
        rmse += (float(p[2])-gt[p[0]+p[1]])**2
    except:
        print(p)
        skipped+=1

('zBi_JWB5uUdVuz3JLoAxGQ', None, 3)
('FTZyZbr1eF7s3Ss7uSLcXQ', None, 3)
('rCWrxuRC8_pfagpchtHp6A', None, 3)
('ZLS7cwa1UplSB8nRrwrHIQ', None, 3)
('p8eHROuRDiStavK6iEkS8g', None, 3)
('OLn8EvPsu4hNug8V5PF2jA', None, 3)
('haSh72Q0MsQZUpWPeVgp0Q', None, 3)
('3egcdazws_x1wW35jgXfNw', None, 3)
('NfU0zDaTMEQ4-X9dbQWd9A', None, 3)
('YCkRZrpcLi177y5wUSQX2w', None, 3)
('RlpkcJqctkKXl-LO1IAtig', None, 3)
('0FMte0z-repSVWSJ_BaQTg', None, 3)
('7fZu8ud7JXFthU0jPxVf4g', None, 3)
('VLy6l8L72adD7iTBNqiPXw', None, 3)
('BbwldZQM1i89cAvV7fce3A', None, 3)
('hqmnMdDS-Opjp3BfBJA8qA', None, 3)
('B_4cl4IGRqYb0wQSe-rYjQ', None, 3)
('3KEHmthOP_mSk40zJ4gANA', None, 3)
('O3q-nwYZykMmacxjru01Zg', None, 3)
('VVimjPD7f8O5D1fhy5vSpw', None, 3)
('_O8yXAtCODPoKkO79lnYXw', None, 3)
('h1KzT38UnfzR7maSi7oVUw', None, 3)
('xqUn2yqxQRq5MrthbRb-7Q', None, 3)
('Reuq65EOFI938Yg8xgff9g', None, 3)
('H4W1CyyrU3Ru3rcSMkCG7g', None, 3)
('zsZBYWYEmLLs81_f-HHM8w', None, 3)
('xN-xoLhTHUFfTS_BMGG0xg', None, 3)
('WAb5zPq711dVH7GzxZpLNw', N

In [8]:
x = biz_map_inversion.get("owLXnEhlMLsFpemYoOxNkg")
x

In [2]:
predictions

[('wf1GqnKQuvH-V3QN80UOOQ', 'LJwS9epPXXvzel7bZ616Gg', 4.5),
 ('do99OWeyyueHnjuHN-QjVQ', 'LJwS9epPXXvzel7bZ616Gg', 4.5),
 ('8RK_89OgXbeG4ohAdViggw', 'LJwS9epPXXvzel7bZ616Gg', 4.5),
 ('XhtMmcvMN4h9NF4C-eGD0g', 'LJwS9epPXXvzel7bZ616Gg', 4.5),
 ('RylA6VZUTRuMGBu4nHKbCw', 'LJwS9epPXXvzel7bZ616Gg', 4.5),
 ('U2N1EymyF7f20UVfR6k5Xg', 'LJwS9epPXXvzel7bZ616Gg', 4.5),
 ('wf1GqnKQuvH-V3QN80UOOQ', 'fja2EnaSxKu1D7uctMhw-w', 4.310344827586207),
 ('z0R494JwBcMDVt41_B2jPg', 'fja2EnaSxKu1D7uctMhw-w', 4.310344827586207),
 ('J2ypbaPCF34BJ9IY7W0PLA', 'fja2EnaSxKu1D7uctMhw-w', 4.310344827586207),
 ('TjXqTqYOXAs8VKo_otj3lw', 'fja2EnaSxKu1D7uctMhw-w', 4.310344827586207),
 ('wf1GqnKQuvH-V3QN80UOOQ', 'knJh7agyWMdPyndI9dURjw', 2.4878048780487805),
 ('VLy6l8L72adD7iTBNqiPXw', 'knJh7agyWMdPyndI9dURjw', 2.4878048780487805),
 ('_fpHwIiEJOp79qED9YSE7Q', 'knJh7agyWMdPyndI9dURjw', 2.4878048780487805),
 ('WL_U3MBUfTkeHL9lYxoleg', 'knJh7agyWMdPyndI9dURjw', 2.4878048780487805),
 ('vFcW_gM7-NyBv3xUYACTAQ', 'knJh7agyWMdPynd

In [4]:
with open(output, 'w+') as f:
    f.write('user_id, business_id, prediction\n')

    for i in predictions:
        f.write(str(i[0][0]) + "," + str(i[0][1]) + "," + str(i[1]) + "\n")