In [1]:
from pyspark import SparkConf, SparkContext
import sys
import csv
import math
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.pyplot import MultipleLocator
import binascii

# Module-level state shared between the driver cells and the Spark mapper functions.
globalShingleList = list()                    # distinct shingle (CRC32) values of all documents, sorted after building
globalShingleRDD = [0 for _ in range(101)]    # one [docId, shingle, ...] row per document; later parallelized into an RDD
globalShingleLen = 0                          # len(globalShingleList); used as the modulus of the minhash functions


### 計算word weight

因為檔案中的文字混雜了英文字母、數字和其他符號  
為了確保取出的word有意義，因此設計一個計算word weight的方式：英文字母計 1、數字計 0.5、其他字元計 0.1  
只有在weight大於word長度的 2/3 時，才把它當成有意義的字（例如 "a1b" 的weight為 2.5 > 2，保留；"123" 的weight為 1.5 < 2，捨棄）  
並建立(1, [word])的tuple，加入wordList中

In [2]:
def mapper_filter_word(line):
    """Split one text line into words and keep only the "meaningful" ones.

    Every character contributes to the word's weight:
      * ASCII letter (A-Z, a-z) -> 1
      * ASCII digit (0-9)       -> 0.5
      * any other character     -> 0.1
    A word is kept when its weight is strictly greater than 2/3 of its length.

    Args:
        line: one line of text (as delivered by sc.textFile).

    Returns:
        A list of (1, [word]) pairs. The constant key 1 lets a later
        reduceByKey concatenate every kept word of the document into one list.
    """
    wordList = []

    for word in line.split(" "):
        if word == "":
            continue  # consecutive spaces produce empty tokens
        weight = 0
        for c in word:
            code = ord(c)
            if 65 <= code <= 90 or 97 <= code <= 122:  # 65~90 uppercase letters, 97~122 lowercase letters
                weight += 1
            elif 48 <= code <= 57:  # 48~57 digits
                weight += 0.5
            else:
                weight += 0.1

        if weight > (len(word) / 3) * 2:
            wordList.append((1, [word]))

    return wordList

### 建立Shingle  

輸入line = (1, [words])，[words]是包含所有word的list  
每連續三個字串接成一個shingle，用binascii.crc32計算其CRC32雜湊值（checksum，並非加密）作為shingle value  
並存成shingleList=[shingle1, shingle2...]  

In [3]:
def mapper_encode(line, k=3):
    """Turn a document's word list into CRC32 shingle values.

    Every run of `k` consecutive words (default 3) is concatenated into one
    string, UTF-8 encoded and hashed with binascii.crc32.

    Args:
        line: (key, [word1, word2, ...]) as produced by reducing the output of
            mapper_filter_word.
        k: number of consecutive words per shingle (must be >= 1).

    Returns:
        A list of CRC32 values, one per window, in document order. A document
        with fewer than `k` words yields an empty list.

    Raises:
        ValueError: if k < 1.
    """
    if k < 1:
        raise ValueError("k must be >= 1")
    words = line[1]
    shingleList = []

    # NOTE(review): words are joined without a separator, so ("ab", "c", "d")
    # and ("a", "bc", "d") give the same shingle; kept as-is so existing shingle
    # values do not change.
    for start in range(len(words) - k + 1):
        shingle = "".join(words[start:start + k]).encode()
        shingleList.append(binascii.crc32(shingle))

    return shingleList

### shingle value轉換成對應shingle id
對輸入的line=[docID, shingle1, shingle2, ...]  
尋找該shingle在globalShingleList中對應到的index  
存成(docID, [shingleIndex])形式

In [4]:
def mapper_setUpShingleId(line, shingle_list=None):
    """Map each shingle value of a document to its index in the shingle list.

    Args:
        line: [docId, shingle1, shingle2, ...] -- a document id followed by
            that document's shingle (CRC32) values.
        shingle_list: lookup table; defaults to the module-level
            globalShingleList. Expected to be sorted ascending (the shingle
            cell sorts it before this mapper runs).

    Returns:
        A list of (docId, [shingleIndex]) pairs, one per shingle, ready to be
        concatenated per document with reduceByKey.

    Raises:
        ValueError: if a shingle value is not in the lookup table (the same
            exception type list.index raises).
    """
    from bisect import bisect_left

    if shingle_list is None:
        shingle_list = globalShingleList
    docId = line[0]
    size = len(shingle_list)
    shingleList = []

    for value in line[1:]:
        # Binary search: O(log n) per lookup instead of list.index's O(n).
        pos = bisect_left(shingle_list, value)
        # The equality check means a wrong index is never returned; if the
        # table were unsorted the worst case is a ValueError, not a bad index.
        if pos == size or shingle_list[pos] != value:
            raise ValueError("{!r} is not in shingle list".format(value))
        shingleList.append((docId, [pos]))

    return shingleList

### minhash 計算hash
line = (docID, [shingleList])  
對shingleList中所有shingleID計算hash的結果
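需要使用100種hash function進行計算  
第 j 個 hash function（j = 0, 1, …, 99）為 $h_j(s) = \big(j(j+1) + s\,(j+2) + j\big) \bmod N$，其中 $s$ 為shingle ID，$N$ 為globalShingleLen  
hashVal[j] 的初始值為 $N$，對每個shingle持續以較小的 $h_j(s)$ 更新，最後即為第 j 個minhash值  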
最終產生minhashList = (docID, [hash1, ..., hash100])

In [5]:
def mapper_minhash(line, num_hashes=100, modulus=None):
    """Compute the minhash signature of one document.

    Hash function j (j = 0 .. num_hashes-1) is
        h_j(s) = (j*(j+1) + s*(j+2) + j) % modulus
    and signature[j] is the minimum of h_j over all shingle ids s of the
    document. Every entry starts at `modulus`, a value no h_j can reach, so a
    document without shingles keeps `modulus` in every slot.

    Args:
        line: (docId, [shingleId, ...]).
        num_hashes: number of hash functions / signature length (default 100).
        modulus: modulus of the hash functions; defaults to the module-level
            globalShingleLen.

    Returns:
        A one-element list [(docId, signature)] so the function can be used
        with flatMap.
    """
    if modulus is None:
        modulus = globalShingleLen
    docId = line[0]
    shingleIds = line[1]

    signature = [modulus] * num_hashes

    for s in shingleIds:
        for j in range(num_hashes):
            val = (j * (j + 1) + s * (j + 2) + j) % modulus
            if val < signature[j]:
                signature[j] = val

    return [(docId, signature)]

### LSH hash
line = (0, [hash1~hash100])  
因為 r=2, b=50  
對每兩個item進行division hash, hash function = (item1*item2)%10007，共計算50次  
算出來的結果與對應的document ID建立成一個((bandID, hash_value)， [docID])的tuple, 存進LSHList中

In [6]:
def mapper_LSH(line, rows=2, bands=50, num_buckets=10007):
    """Hash every band of a minhash signature into an LSH bucket.

    The signature is cut into `bands` consecutive bands of `rows` values each
    (defaults r=2, b=50 for a 100-value signature). Each band is hashed with a
    polynomial (Horner) hash modulo `num_buckets`, so the bucket depends on
    every value and on its position in the band. A product hash such as
    (item1*item2) % p would put every band containing a 0 into bucket 0 and
    would treat (a, b) / (b, a) or (2, 6) / (3, 4) as the same band, which
    creates spurious candidate pairs.

    Args:
        line: (docId, signature) as produced by mapper_minhash.
        rows: signature values per band.
        bands: number of bands.
        num_buckets: number of hash buckets per band.

    Returns:
        A list of ((bandID, bucket), [docId]) pairs, bandID running 1..bands.

    Raises:
        ValueError: if the signature has fewer than rows * bands values.
    """
    base = 1000003  # large multiplier for the polynomial band hash
    docId = line[0]
    signature = line[1]
    if len(signature) < rows * bands:
        raise ValueError("signature has {} values, need rows*bands = {}".format(len(signature), rows * bands))

    LSHlist = []
    for band in range(bands):
        bucket = 0
        for value in signature[band * rows:(band + 1) * rows]:
            bucket = (bucket * base + value) % num_buckets
        LSHlist.append(((band + 1, bucket), [docId]))

    return LSHlist

### 計算candidate pair
line = ((bandID, hash_value), [doc1, doc2, ...])  
檢查hash到一樣value的document數量，兩個以上的，其中所有doc為candidate pair  
並回傳((candidate1, candidate2, ...), 1)

In [7]:
def mapper_filter_candidate(line):
    """Turn an LSH bucket into a candidate group if it holds two or more documents.

    Args:
        line: ((bandID, bucket), [doc1, doc2, ...]).

    Returns:
        [(tuple_of_docs, 1)] when the bucket contains at least two documents,
        otherwise an empty list. The trailing 1 is a counter summed later by
        reduceByKey.
    """
    members = line[1]
    # A bucket with a single (or no) document yields no candidate pair.
    if len(members) < 2:
        return []
    return [(tuple(members), 1)]

### 計算similarity
line = (candidate list, 同一組candidate list出現的頻率)  
對candidateList中所有candidate pair  
透過先前算好的sigMatrix（包含所有document的100個hash value）  
計算similarity，也就是兩份document的signature中hash value相同的比例（可作為Jaccard similarity的估計值）  
並將結果((candidate1, candidate2), similarity)存進simList

In [8]:
def mapper_calSim(line, sig_matrix=None):
    """Estimate the similarity of every document pair inside one candidate group.

    Args:
        line: (candidates, count) where candidates is a tuple of document ids
            that shared an LSH bucket; count is not used here.
        sig_matrix: signature matrix; defaults to the module-level sigMatrix.
            The signature of document d is read from sig_matrix[d - 1] (so
            document 0 wraps to the last row), the same offset the
            signature-matrix cell uses when filling it.

    Returns:
        A list of ((docA, docB), similarity) with docA <= docB. The similarity
        is the fraction of signature positions on which both documents agree.
    """
    if sig_matrix is None:
        sig_matrix = sigMatrix
    candidates = line[0]
    simList = []

    for pos, docA in enumerate(candidates):
        sigA = sig_matrix[docA - 1]
        for docB in candidates[pos + 1:]:
            sigB = sig_matrix[docB - 1]
            match = sum(1 for a, b in zip(sigA, sigB) if a == b)
            sim = match / len(sigA) if len(sigA) else 0.0
            # Canonical (smaller, larger) key: the same pair found in buckets
            # with a different member order must merge in reduceByKey.
            pair = (docA, docB) if docA <= docB else (docB, docA)
            simList.append((pair, sim))

    return simList

In [9]:
def mapper_switch(line):
    """Swap a (key, value) pair to (value, key) so sortByKey orders by the value."""
    first, second = line[0], line[1]
    return (second, first)

### Step1 : Shingle

讀進所有document  
對每一個document，把document中的單詞建立成[(1, [word1, word2, ...])]的rdd  
將document rdd轉換成[shingle1, shingle2, ...]的shingles rdd，  
再將shingles rdd轉成shingle list  

接著檢查所有不重複的shingle，暫存在localShingleList中，進行排序後存進globalShingleRDD  
globalShingleRDD是準備轉成rdd的2d array
對每個document i，我們將該document的localShingleList存成[docID, shingle1, shingle2, ...]的格式
並存入globalShingleRDD中  
並且另外建立globalShingleList，存入所有document的shingles  

建立好所有需要的list後，把globalShingleRDD轉成rdd  
並且用mapper_setUpShingleId把其中所有shingle value轉換成shingle index形式
最終產生一個[(0, [shingleList]), (1, [shingleList]), ....] 格式的doc_shingleId

In [10]:
conf = SparkConf().setMaster("local").setAppName("TermProject")
sc = SparkContext.getOrCreate(conf=conf)

# Every distinct shingle seen so far. Seeded from globalShingleList so that
# re-running this cell does not introduce duplicates.
globalShingleSet = set(globalShingleList)

for i in range(101):
    # Document i (0-based) is stored in ./athletics/<i+1>.txt, zero-padded to three digits.
    fileName = "./athletics/{:03d}.txt".format(i + 1)

    document = sc.textFile(fileName).flatMap(mapper_filter_word).reduceByKey(lambda x, y: x+y)

    # encode shingle; collect() is a single Spark job (count() + take() ran two)
    shingleList = document.flatMap(mapper_encode).collect()

    # Distinct shingle values of this document, sorted ascending. The document
    # id is prepended AFTER sorting so it always stays at index 0 (sorting it
    # together with the CRC32 values could move it if some CRC32 were < i).
    uniqueShingles = sorted(set(shingleList))
    globalShingleSet.update(uniqueShingles)

    globalShingleRDD[i] = [i] + uniqueShingles

# Update the list in place; mapper_setUpShingleId looks it up by global name.
globalShingleList[:] = sorted(globalShingleSet)
globalShingleLen = len(globalShingleList)

doc_shingleId = sc.parallelize(globalShingleRDD).flatMap(mapper_setUpShingleId).reduceByKey(lambda x, y: x+y)


                                                                                

### Step2 : Min hashing

對doc_shingleId內每個document的所有shingle用mapper_minhash進行hash
產生minhashResult = [(0, [hash1, ..., hash100]), (1, [hash1, ..., hash100]),...]  
轉換成signature matrix=[  
hashList0,  
hashList1,  
...  
]

In [11]:
# minhash: one (docId, signature) pair per document that produced shingles.
# cache(): the LSH cell reuses this RDD, so avoid recomputing the minhashes.
minhashResult = doc_shingleId.flatMap(mapper_minhash).cache()
minhashResultList = minhashResult.collect()

# The signature of docId is stored at sigMatrix[docId - 1] (doc 0 wraps to the
# last slot); mapper_calSim reads it back with the same offset. Iterating the
# collected result (instead of range(101)) tolerates documents that produced
# no shingles and are therefore missing from doc_shingleId.
sigMatrix = [0]*101
for docId, signature in minhashResultList:
    sigMatrix[docId - 1] = signature


                                                                                

### Step3 : LSH
用mapper_LSH對minhashResult = [(0, [hash1, ..., hash100]),...] 計算會hash到哪一個bucket  
產生LSHresult，其中每一組key-value pair為((bandID, hash_value), [doc1, doc2, ...])  
以紀錄哪一些doc會hash到一樣的值  
並進一步用mapper_filter_candidate算出hash到同一個value的candidate list  
key-value pair = (candidate list, 同一組candidate list出現的次數)  

In [12]:
# LSH: bucket each band of every signature, gather the documents per
# (bandID, bucket), then keep buckets with 2+ documents as candidate groups
# keyed by the group tuple and counted across buckets.
LSHresult = (
    minhashResult
    .flatMap(mapper_LSH)
    .reduceByKey(lambda left, right: left + right)
    .flatMap(mapper_filter_candidate)
    .reduceByKey(lambda left, right: left + right)
)

### Step4 : 算出最高similarity的documents
用mapper_calSim算出所有candidate pair和他們的similarity,  
因為重複的candidate pair可能出現，reduceByKey用來把重複的candidate pair刪掉  
最終產生sim的key-value pair = ((docID1, docID2), similarity)  
用mapper_switch將key-value pair轉成(similarity, (docID1, docID2))後，由大到小排序  
並且印出前10大的similarity及document name，即為最終結果  

In [13]:
def reducer_sim(x, y):
    """Keep the larger of two similarity values (y wins on ties)."""
    return x if x > y else y

In [14]:
# Estimate similarity (fraction of matching minhash values) and write the top pairs to file.
sim = LSHresult.flatMap(mapper_calSim).reduceByKey(reducer_sim)
sim = sim.map(mapper_switch).sortByKey(False)

# Only the 10 most similar pairs are written, so fetch just those; take()
# also works when fewer than 10 pairs exist.
similarity = sim.take(10)

with open("Outputfile.txt", "w") as fp:
    for score, (docA, docB) in similarity:
        # Document id d corresponds to file d+1, zero-padded to three digits.
        doc1 = "{:03d}".format(docA + 1)
        doc2 = "{:03d}".format(docB + 1)

        num = round(score*100, 2)

        fp.write("(" + doc1 + ", " + doc2 + "): " + "{:.2f}".format(num) + "%\n")


In [15]:
# Shut down the SparkContext created in the setup cell; sc cannot run jobs after this.
sc.stop()