In [1]:
import os
import sys
from pyspark import SparkContext, SparkConf
import json
import itertools
import math
import numpy as np
import time

In [2]:
appName = 'assignment3'
master = 'local[*]'
conf = SparkConf().setAppName(appName).setMaster(master)
# conf = SparkConf().setAll([('spark.executor.memory', '8g'), ('spark.executor.cores', '3'), ('spark.cores.max', '3'), ('spark.driver.memory','8g')])
sc = SparkContext(conf=conf)
sc.setLogLevel("INFO")

In [3]:
sc

In [7]:
def read_csv_line(line):
    line = line.split(',')
    return (line[0].strip(), line[1].strip(), line[2].strip())

def read_business(data_path):
    rdd = sc.textFile(data_path)
    header = rdd.first()
    rdd = rdd.filter(lambda x: x != header).map(read_csv_line)
    unique_users = rdd.map(lambda x: x[0]).distinct().collect()    
    userid_index = {}
    for i, uid in enumerate(unique_users):
        userid_index[uid] = i
    
    business_users = rdd.map(lambda x: (x[1], userid_index[x[0]])).groupByKey().map(lambda x: (x[0], list(x[1])))
    
    return business_users

In [36]:
out = read_business('./data/yelp_train.csv').take(5)

In [8]:
# Total number of users 11270
p = 13591
m = 64
number_of_hashes = 64
a = np.random.randint(1, high=p, size = number_of_hashes)
b = np.random.randint(0, high=p, size = number_of_hashes)

def minhash(vector, a, b, p, m):
    vector = np.array(vector)
    return list(np.min((a.reshape(1, a.shape[0]) * vector.reshape(vector.shape[0], 1) + b) %p %m, axis=0))

In [21]:
rdd = read_business('./data/yelp_train.csv')
st = time.time()
business_sig = rdd.map(lambda x: (x[0], minhash(x[1], a, b, p, m)))
print(time.time() - st)

0.00021839141845703125


In [18]:
from itertools import combinations

def hash_bands(x):
    doc_id, sig = x
    b = 16
    r = 4
    output = []
    for i in range(0, b):
        output.append(((i, hash(frozenset(sig[r*i:(i+1)*r]))), doc_id))
    return output


def jaccard_sim(x, y):
    x = set(x)
    y = set(y)
    return len(x.intersection(y)) / len(x.union(y))


def index_signatures(signatures):
    index = {}
    for x, y in signatures:
        index[x] = y
        
    return index

def find_similarity(candidate, index):
    bucket = candidate[1]
    output = []
    for candidate1, candidate2 in combinations(bucket, 2):
        sim = jaccard_sim(index[candidate1], index[candidate2])
        if(sim >= 0.5):
            output.append((candidate1, candidate2, sim))
            
    return output


rdd = read_business('./data/yelp_train.csv')
signatures = rdd.collect()
index = index_signatures(signatures)
business_sig = sc.parallelize(signatures).map(lambda x: (x[0], minhash(x[1], a, b, p, m)))
candidates = business_sig.flatMap(hash_bands).groupByKey().map(lambda x: (x[0], list(x[1]))).filter(lambda x: len(x[1]) > 1)
print(candidates.take(10))
similar_pairs = candidates.flatMap(lambda x: find_similarity(x, index)).collect()

[((10, 5129871794670698270), ['3MntE_HWbNNoyiLGxywjYA', 'S_JZst0IGCads_KhoueMCw', 'MYD64NGYbF0n7sQZ-I4o5g', 'q3Z-jPBok8tky9vBGqGHtg', '618yZxC6stz-Un4xLsp5vw', 'mqIpIqxnT7f94eilrNkZOw', 'fOKeVUHC7nb6c8G5sILdHQ', '3M9zgL7r7wDcrx6MP63Vpg', 'XK7_B2moEchOdg8UQQc6Bg', '6WyTW2XS6NMlcU04aA1bYA']), ((14, 9000494312501845833), ['XO2hZb0xC8jTexSHG4SxFg', 'xVEtGucSRLk5pxxN0t4i6g', 'cgoHLKJLsAK1ww0_ezNFnw', 'ciDXNLbcwI7qt5guNKZPIw', 'MwCai0x6GILtK4KkwufZig', 'ii_zDR8crbQVJehMMWoCkQ', 'kRhjWeAPs-U5RmakIKz0Pg', 'xaJS4YP0lISl2V5z2qA3GQ', 'M51gw2cz_vXarNBLbKkOxQ', 'jXHmmircEbhQmZjWDR1xIA', 'f_eiOrEcMnkHB7GvQVOHkQ', '8xPmlVJy2o6x0J04CBpEMQ', 'upB0RQl-l529IVwgOpwOQQ', 'ugLqbAvBdRDc-gS4hpslXw', 'aY0Y058p2_iWxMAZron2mg', 'odyYR1Bg2bqmp1kVlm8OYw', 'jobP3ywRd3QNZ_GCoPG2DQ', 'dm6sO_Y8JdKTE1ZM955yug', '04kZ5CSh6oKhI5huU5bLdg', '68vGFIH94olrtPi1kAEYVw', 'a0v7Si0DK4cIko7AQY4YXg', '56_j_lcGj5X9SpM2KzLm4A', 'U9aA5H13y7t9xWnoQslV0Q', 'gwYt_q3Ob7aQJIefIBsZiQ', 'AercPW1B2YVNK3UsiPDkTg', 'ajoqEHnCZTD8-8GqGLq9-Q', 'kG

In [10]:
similar_pairs[:2]

[('dOEMnyvSk8njKJfqsDaqMA', 'XNFA-aJFX8IQjol8Dg6GnA', 1.0),
 ('H8mq-5oLkF9jlfyYi3vvOw', '4XGjbI2Ggi-kdgt9eZR83w', 0.6)]

In [15]:
def save_output(output, file_path):
    with open(file_path, "wt") as f:
        f.write('business_id_1, business_id_2, similarity\n')
        for pairs in output:
            f.write(','.join([str(x) for x in pairs])+'\n')
    return 

save_output(similar_pairs, 'output.csv')

In [None]:
['3', '4'].join()

In [13]:
# b*r = n
# (1/b)^(1/r) = threshold


24732

In [None]:
sc.parallelize