In [1]:
import sys
from collections import defaultdict
from itertools import *
import random
import numpy as np
import pdb

from pyspark import SparkContext
sc=SparkContext.getOrCreate()

In [2]:
#lines = sc.parallelize([(1,1,7),(1,2,6),(1,3,7),(1,4,4),(1,5,5),(1,6,4),(2,1,6),(2,2,7),(2,4,4),(2,5,3),(2,6,4)\
 #                      ,(3,2,3),(3,3,3),(3,4,1),(3,5,1),(4,1,1),(4,2,2),(4,3,2),(4,4,3),(4,5,3),(4,6,4),(5,1,1)\
  #                     ,(5,3,1),(5,4,2),(5,5,3),(5,6,3)])

def parseVectorOnUser(line):
    '''
    Parse each line of the specified data file, assuming a "|" delimiter.
    Key is user_id, converts each rating to a float.
    '''
    return line[0],(line[1],float(line[2]))

def parseVectorOnItem(line):
    '''
    Parse each line of the specified data file, assuming a "|" delimiter.
    Key is item_id, converts each rating to a float.
    '''
    return line[1],(line[0],float(line[2]))

def sampleInteractions(item_id,users_with_rating,n):
    '''
    For items with # interactions > n, replace their interaction history
    with a sample of n users_with_rating
    '''
    if len(users_with_rating) > n:
        return item_id, random.sample(users_with_rating,n)
    else:
        return item_id, users_with_rating
    
def findUserPairs(item_id,users_with_rating):
    '''
    For each item, find all user-user pairs combos. (i.e. users with the same item) 
    '''
    l = []
    for user1,user2 in permutations(users_with_rating,2):
        l.append(((user1[0],user2[0]),(user1[1],user2[1])))
    return l

def keyOnFirstUser(user_pair,item_sim_data):
    '''
    For each user-user pair, make the first user's id the key
    '''
    (user1_id,user2_id) = user_pair
    return user1_id,(user2_id,item_sim_data)

def calcSim(user_pair,rating_pairs, shrink, similarity="cosine"):
    ''' 
    For each user-user pair, return the specified similarity measure,
    along with co_raters_count.
    '''
    sum_xx, sum_xy, sum_yy, sum_x, sum_y, n = (0.0, 0.0, 0.0, 0.0, 0.0, 0)
    
    if(similarity=="cosine"):
        for rating_pair in rating_pairs:
            sum_xx += np.float(rating_pair[0]) * np.float(rating_pair[0])
            sum_yy += np.float(rating_pair[1]) * np.float(rating_pair[1])
            sum_xy += np.float(rating_pair[0]) * np.float(rating_pair[1])
            # sum_y += rt[1]
            # sum_x += rt[0]
            n += 1

        cos_sim = cosine(sum_xy,np.sqrt(sum_xx),np.sqrt(sum_yy), shrink)
        return user_pair, (cos_sim,n)
    if(similarity=="jaccard"):
        for rating_pair in rating_pairs:
            sum_xx += np.float(rating_pair[0]) * np.float(rating_pair[0])
            sum_yy += np.float(rating_pair[1]) * np.float(rating_pair[1])
            sum_xy += np.float(rating_pair[0]) * np.float(rating_pair[1])
            # sum_y += rt[1]
            # sum_x += rt[0]
            n += 1
        jac_sim = jaccard(sum_xy, sum_xx, sum_yy, shrink)
        return user_pair, (jac_sim,n)

def cosine(dot_product,rating_norm_squared,rating2_norm_squared, shrink):
    '''
    The cosine between two vectors A, B
       dotProduct(A, B) / (norm(A) * norm(B))
    '''
    numerator = dot_product
    denominator = rating_norm_squared * rating2_norm_squared + shrink

    return (numerator / (float(denominator))) if denominator else 0.0

def jaccard(dot_product, den1, den2, shrink):
    '''
    The jaccard similarity between two vectors A, B
       dotProduct(A, B) / (dotProduct(A, A^t) + dotProduct(B, B^t) dotProduct(A, B)- 
    '''
    numerator = dot_product
    denominator = den1 + den2 - dot_product + shrink

    return (numerator / (float(denominator))) if denominator else 0.0
    

def nearestNeighbors(user,users_and_sims,n):
    '''
    Sort the predictions list by similarity and select the top-N neighbors
    '''
    
    users_and_sims.sort(key=lambda x: x[1][0],reverse=True)
    return user, users_and_sims[:n]

def topNRecommendations(user_id,user_sims,users_with_rating,seenDict,n, shrink):
    '''
    Calculate the top-N item recommendations for each user using the 
    weighted sums method
    '''

    # initialize dicts to store the score of each individual item,
    # since an item can exist in more than one item neighborhood
    totals = defaultdict(int)
    sim_sums = defaultdict(int)

    for (neighbor,(sim,count)) in user_sims:

        # lookup the item predictions for this neighbor
        unscored_items = users_with_rating.get(neighbor,None)

        if unscored_items:
            for (item,rating) in unscored_items:
                if item not in seenDict[user_id]:

                    # update totals and sim_sums with the rating data
                    totals[item] += sim * rating
                    sim_sums[item] += sim

    # create the normalized list of scored items 
    scored_items = [(total/(sim_sums[item]+shrink),item) for item,total in totals.items()]

    # sort the scored items in ascending order
    scored_items.sort(reverse=True)

    # take out the item score
    ranked_items = [x[1] for x in scored_items]

    return user_id,ranked_items[:n]

In [3]:
trainSet = sc.textFile("../train.csv")
trainSet = trainSet.map(lambda l: l.split(','))
trainSet = trainSet.filter(lambda line: 'userId' not in line)
trainSet = trainSet.map(lambda line: (int(line[0]), int(line[1]), int(line[2])))
meanVotePerUser = trainSet.map(lambda x: (x[0], (x[2], 1)))\
                        .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]))\
                        .map(lambda x: (x[0], x[1][0]/x[1][1]))
meanVotePerUserDict = meanVotePerUser.collectAsMap()
lines = trainSet.map(lambda x: (x[0], x[1], x[2] - meanVotePerUserDict[x[0]]))
lines.take(5)

[(2738, 1, -5.666666666666667),
 (4716, 1, -2.9283333333333337),
 (13298, 1, 0.9902439024390244),
 (15122, 1, -0.829787234042553),
 (11326, 2, -0.833333333333333)]

In [4]:
item_user_pairs = lines.map(parseVectorOnItem).groupByKey().cache()
print("Item: ",item_user_pairs.collect()[0][0], "- User Pairs: ", list(item_user_pairs.collect()[0][1]))

Item:  32768 - User Pairs:  [(7287, -2.8645640074211505)]


In [5]:
pairwise_users = item_user_pairs.filter(
        lambda p: len(p[1]) > 1).map(
        lambda p: findUserPairs(p[0],p[1])).flatMap(lambda x: x).groupByKey()
pairwise_users.take(5)

[((5676, 12406), <pyspark.resultiterable.ResultIterable at 0x269856a7b00>),
 ((7571, 7615), <pyspark.resultiterable.ResultIterable at 0x269856a79b0>),
 ((4669, 1191), <pyspark.resultiterable.ResultIterable at 0x269856a7898>),
 ((15085, 9453), <pyspark.resultiterable.ResultIterable at 0x269856a7438>),
 ((1956, 6120), <pyspark.resultiterable.ResultIterable at 0x269856a70b8>)]

In [6]:
user_sims = pairwise_users.map(lambda p: calcSim(p[0],p[1],7,"jaccard"))\
                        .map(lambda p: keyOnFirstUser(p[0],p[1])).groupByKey()\
                        .map(lambda x : (x[0], list(x[1])))\
                        .map(lambda p: nearestNeighbors(p[0],p[1],50))
user_sims.take(5)

[(2,
  [(7586, (0.0, 1)),
   (6258, (0.0, 1)),
   (842, (0.0, 1)),
   (2436, (0.0, 1)),
   (8228, (0.0, 1)),
   (520, (0.0, 1)),
   (6590, (0.0, 1)),
   (4806, (0.0, 1)),
   (1710, (0.0, 1)),
   (8406, (0.0, 1)),
   (7040, (0.0, 1)),
   (6438, (0.0, 1)),
   (9278, (0.0, 1)),
   (5321, (0.0, 1)),
   (3809, (0.0, 1)),
   (3169, (0.0, 1)),
   (2107, (0.0, 1)),
   (1481, (0.0, 1)),
   (9243, (0.0, 1)),
   (4813, (0.0, 1)),
   (9177, (0.0, 1)),
   (1793, (0.0, 1)),
   (11501, (0.0, 1)),
   (10165, (0.0, 1)),
   (9195, (0.0, 1)),
   (10651, (0.0, 1)),
   (13857, (0.0, 1))]),
 (4,
  [(7232, (0.0, 1)),
   (12366, (0.0, 1)),
   (2124, (0.0, 1)),
   (1242, (0.0, 1)),
   (3576, (0.0, 1)),
   (5682, (0.0, 1)),
   (9996, (0.0, 1)),
   (3748, (0.0, 1)),
   (14632, (0.0, 1)),
   (2696, (0.0, 1)),
   (6438, (0.0, 1)),
   (924, (0.0, 1)),
   (206, (0.0, 1)),
   (1726, (0.0, 1)),
   (9846, (0.0, 1)),
   (12822, (0.0, 1)),
   (14198, (0.0, 1)),
   (2854, (0.0, 1)),
   (13338, (0.0, 1)),
   (1426, (0.0, 1

In [17]:
user_sims_modified = user_sims.map(lambda x: x[1])
user_item_hist = lines.map(parseVectorOnUser).groupByKey().collect()
ui_dict = {}
for (user,items) in user_item_hist: 
    ui_dict[user] = items

uib = sc.broadcast(ui_dict)
seen_item_dict = lines.map(lambda x: (x[0], [x[1]])).reduceByKey(lambda x,y: x+y).collectAsMap()
'''
Calculate the top-N item recommendations for each user
    user_id -> [item1,item2,item3,...]
'''
user_item_recs = user_sims.map(lambda p: topNRecommendations(p[0],p[1],uib.value,seen_item_dict,5,7)).collect()
user_item_recs

[(2, [37132, 37130, 37122, 37112, 37088]),
 (4, [37140, 37132, 37130, 37113, 37111]),
 (8, [29915, 28966, 25363, 18016, 28109]),
 (10, [32287, 11717, 28196, 9893, 33500]),
 (12, [26686, 22963, 17001, 11792, 10607]),
 (14, [4381, 35121, 22991, 16001, 26889]),
 (16, [6239, 4558, 4236, 33589, 16171]),
 (18, [3431, 18615, 2041, 7759, 23240]),
 (20, [6461, 36599, 28688, 9913, 35300]),
 (22, [12340, 18546, 2143, 27950, 30964]),
 (24, [22583, 24421, 33710, 33088, 31973]),
 (26, [37132, 37078, 37065, 37053, 37024]),
 (28, [9913, 25363, 30653, 2049, 10701]),
 (30, [13687, 30325, 11618, 37038, 36938]),
 (32, [6461, 9735, 33173, 10701, 13726]),
 (34, [18546, 30982, 8939, 8205, 5572]),
 (36, [30142, 32038, 32806, 5050, 12866]),
 (38, [12997, 15086, 30142, 20635, 10082]),
 (40, [2227, 17380, 11936, 5834, 19419]),
 (42, [29464, 29301, 33818, 8519, 36872]),
 (44, [37132, 37113, 37076, 37056, 37053]),
 (46, [19752, 22637, 20294, 9408, 4851]),
 (48, [36501, 18803, 30964, 17856, 13574]),
 (50, [6821, 25