In [1]:
from pyspark import SparkConf, SparkContext
import pyspark.sql.functions as f
import math
import random

## lower_clean_str(x)
* Clear the punctuations in the documents.

In [2]:
def lower_clean_str(x):
    punc='!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'
    lowercased_str = x.lower()
    lowercased_str = lowercased_str.replace(r'\\n', '')
    for ch in punc:
        lowercased_str = lowercased_str.replace(ch, '')
    wordlist = lowercased_str.split(" ")
    return wordlist

## kshingles(line, name)
* Split the document into 3-shingles.
* Output format is (word1, word2, word3).

In [3]:
def kshingles(line, name):
    line_len = len(line)
    maplist = []
    for i in range (line_len-2):
        maplist.append((line[i], line[i+1], line[i+2]))
    return maplist

## kshingles2(line, name)
* Add the file name in front of every shingles.
* The output format is ('00x', (word1, word2, word3)).

In [4]:
def kshingles2(line, name):
    if name < 10:
        newName = '00' + str(name)
    elif name < 100:
        newName = '0' + str(name)
    else:
        newName = str(name)
    line_len = len(line)
    maplist = []
    for i in range (line_len-2):
        maplist.append((newName, [line[i], line[i+1], line[i+2]]))
    return maplist

## Global Variables
* Define number of document and number of hash functions.

In [5]:
docSize = 101 #101
hashSize = 100 #100

## Main
* Read all the documents from folder.
* Set an index to all the shingles.

In [6]:
sc.stop()

conf = SparkConf().setMaster("local").setAppName("LSH")
sc = SparkContext(conf=conf)

fileNum = 0
fileName = 'null'

for i in range(1, docSize+1):
    
    if i < 10:
        fileName = 'athletics/00' + str(i) + '.txt'
    elif i < 100:
        fileName = 'athletics/0' + str(i) + '.txt'
    else:
        fileName = 'athletics/' + str(i) + '.txt'
    '''
    if i == 1:
        fileName = 'athletics/047.txt'
    elif i == 2:
        fileName = 'athletics/048.txt'
    elif i == 3:
        fileName = 'athletics/049.txt'
    '''
    file = sc.textFile(fileName)
    clean_text = file.map(lower_clean_str)
    shingles = clean_text.flatMap(lambda x: kshingles(x, str(i)))
    shingles_with_key = clean_text.flatMap(lambda x: kshingles2(x, i)).groupByKey().mapValues(tuple)
    #print(shingles_with_key.collect())

    if i == 1:
        main_shingles = sc.parallelize([])
        main_shingles_with_key = sc.parallelize([])

    main_shingles = main_shingles.union(shingles)
    main_shingles_with_key = main_shingles_with_key.union(shingles_with_key)

    print(i)
        
main_shingles = main_shingles.repartition(4)
main_shingles = main_shingles.distinct()
main_shingles = main_shingles.zipWithIndex()

#print(main_shingles.collect())
#print(main_shingles_with_key.collect())
#print(main_shingles_with_key.cache().take(5))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101


## mapper1(line)
* Check every shingles of a file, and change them to their corresponding index.
* If a shingle is in the file, then the value will be set as 1.
* If a shingle is not in the file, then the value will be set as 0.
* The output format is (('00x', (shingle's index, 1)) or (('00x', (shingle's index, 0)).

In [7]:
def mapper1(line):
    maplist = []
    flag = 0
    for i in range(len(line[0][1])):
        if (line[1][0][0] == line[0][1][i][0]) and (line[1][0][1] == line[0][1][i][1]) and (line[1][0][2] == line[0][1][i][2]):
            maplist.append((line[0][0], (line[1][1], 1)))
            flag = 1
            break
    if flag == 0:
        maplist.append((line[0][0], (line[1][1], 0)))
    return maplist

In [8]:
#print(len(main_shingles_with_key.collect()))
shingles_length = len(main_shingles.collect())
#print(shingles_length)
cart_shingles = main_shingles_with_key.cartesian(main_shingles)
#print(cart_shingles.collect()[386])
#print(cart_shingles.collect()[0])
change_to_num = cart_shingles.flatMap(mapper1).groupByKey().mapValues(tuple)
#print(change_to_num.collect())

## Prepare for Minhash
* Generate mod variables for hash function (ax+b)%c.
* There are total 100 hash functions, so there need to be 200 varibles.
* For c, it should be a prime number.
* isPrime(n) can check whether the number n is prime or not.

In [9]:
def isPrime(n) : 
    # Corner cases 
    if (n <= 1) : 
        return False
    if (n <= 3) : 
        return True
    # This is checked so that we can skip 
    # middle five numbers in below loop 
    if (n % 2 == 0 or n % 3 == 0) : 
        return False
    i = 5
    while(i * i <= n) : 
        if (n % i == 0 or n % (i + 2) == 0) : 
            return False
        i = i + 6
    return True


primes = [j for j in range(len(main_shingles.collect()), 3*len(main_shingles.collect())) if isPrime(j)]
mod_num = random.choice(primes)
int_mod_num = int(mod_num)

#mod_a = random.sample(range(1, len(main_shingles.collect())), 100)
#mod_b = random.sample(range(1, len(main_shingles.collect())), 100)

modd = random.sample(range(1, len(main_shingles.collect())), 200)
#print(mod_a)
#print(mod_b)


for j in range(1, docSize+1):
    if j < 10:
        file_idx = '00' + str(j)
    elif j < 100:
        file_idx = '0' + str(j)
    else:
        file_idx = str(j)
    rdd_mod_tmp = sc.parallelize([(file_idx, tuple((modd, int_mod_num)))])
    if j == 1:
        main_rdd_mod = sc.parallelize([])
        main_rdd_mod = main_rdd_mod.union(rdd_mod_tmp)
    else:
        main_rdd_mod = main_rdd_mod.union(rdd_mod_tmp)

#print(main_rdd_mod.collect())


In [10]:
#print(change_to_num.collect())
name_sig_mod = change_to_num.union(main_rdd_mod).groupByKey().mapValues(tuple)
    
#print(name_sig_mod.collect())

## doMinhash(line)
* Check if current document contains specific 3-shingles.
* If the document does contain that shingle, then find the minimum result of the hash function.
* The output format is (document's index, (signatures, k-shingles)).

In [11]:
#print(name_sig_mod.collect()[3])
#print(name_sig_mod.collect()[4])
#print(shingles_length)

def doMinhash(line):
    maplist = []
    wordlist = []
    wordlist2 = []
    for i in range(100): #第幾個hash function
        mini = math.inf
        for j in range(shingles_length): #幾個shingles要丟進hash
            if line[1][0][j][1] == 1 and ((line[1][1][0][2*i]*j + line[1][1][0][2*i+1])%line[1][1][1]) < mini:
                mini = (line[1][1][0][2*i]*j + line[1][1][0][2*i+1])%line[1][1][1]
        wordlist.append(mini)
    for k in range(shingles_length):
        if line[1][0][k][1] == 1:
            wordlist2.append(line[1][0][k][0])
    #maplist.append((line[0], tuple(wordlist)))
    maplist.append((line[0], (tuple(wordlist), tuple(wordlist2))))
    return maplist

name_sig = name_sig_mod.flatMap(doMinhash)
#print(name_sig.collect())
#print(name_sig.cache().take(5))

## hashToBuckets(line)
* Generate two variables a and b for the bucket's hash function.
* Band = 50 and row = 2, and I set my bucket number as 10000.
* Then my hash function goes like (ax + by)%10000

In [12]:
bucketNum = 10000
bucket_a = random.sample(range(50, 200), 1)
bucket_b = random.sample(range(50, 200), 1)

def hashToBuckets(line):
    wordlist = []
    maplist = []
    for i in range(50):
        bucket_idx = (line[1][0][2*i]*int(bucket_a[0]) + line[1][0][2*i+1]*int(bucket_b[0]))%bucketNum
        wordlist.append(bucket_idx)
    maplist.append((line[0], (tuple(wordlist), line[1][1])))
    return maplist

after_LSH = name_sig.flatMap(hashToBuckets)

after_LSH = after_LSH.repartition(4)
#print(after_LSH.collect())
#print(after_LSH.cache().take(5))

In [13]:
after_LSH = after_LSH.repartition(4)
combine_two_pairs = after_LSH.cartesian(after_LSH)

#print(combine_two_pairs.collect())

## calculate_jaccard(line)
* For every two documents, if they have more than one same bucket, then they are a candidate pair.
* For each candidate pair, calculate their jaccard similarity.
* The output format is ((jaccard similarity, (document 1's index, document 2's index))

In [14]:
def calculate_jaccard(line):
    maplist = []
    
    if line[0][0] != line[1][0] and int(line[0][0]) < int(line[1][0]): #去掉同樣的
        #先判斷candidate pair
        isCandidate = 0
        for i in range(50):
            if line[0][1][0][i] == line[1][1][0][i]:
                isCandidate = 1

        #計算jaccard similarity
        if isCandidate == 1:
            same_elements = len(set(line[0][1][1]).intersection(set(line[1][1][1])))
            total_shingles = len(set(line[0][1][1]).union(set(line[1][1][1])))
            jaccard_similarity = same_elements/total_shingles*100
            maplist.append((jaccard_similarity, (line[0][0], line[1][0])))
    return maplist

combine_two_pairs = combine_two_pairs.repartition(4)
two_pairs_jaccard = combine_two_pairs.flatMap(calculate_jaccard)
#print(two_pairs_jaccard.collect())

## sortBy(lambda x: x[0], ascending=False)
* Sort the result in ascending order.

In [15]:
two_pairs_jaccard = two_pairs_jaccard.repartition(4)
descending_pairs = two_pairs_jaccard.sortBy(lambda x: x[0], ascending=False)
#print(descending_pairs.cache().take(10))
#print(descending_pairs.collect()[0])

In [16]:
#descending_pairs = descending_pairs.repartition(4)

for i in range(10):
    print((int(descending_pairs.collect()[i][1][0]), int(descending_pairs.collect()[i][1][1])), str(descending_pairs.collect()[i][0])+'%')


(12, 20) 100.0%
(52, 84) 100.0%
(47, 49) 75.76301615798923%
(30, 35) 71.0%
(49, 88) 51.47579693034239%
(48, 49) 48.5781990521327%
(23, 38) 48.17708333333333%
(14, 40) 40.03392705682782%
(47, 88) 39.033124440465535%
(47, 48) 36.80430879712747%


In [17]:
sc.stop()