# MDA Term Report - Team 6
107062130 陳奕君 110024503 戴仲廷

- 題目： **Finding Similar Articles**
- 說明：Given a set of BBCSports articles, Implement LSH using MapReduce to find out articles similarity.

首先將所有文章讀取進來，並格外分出它們的 id 當作日後紀錄 document ID 使用。

`txtpreprocess` 利用 regular expression 去除標點符號和節選出單詞，並把所有單字變小寫避免重複計算 shingle。

In [20]:
import re
import os
from pyspark import SparkConf, SparkContext
from random import shuffle, randint

In [21]:
try:
    sc.stop()
except:
    pass

In [22]:
conf = SparkConf().setMaster("local").setAppName("LSH")
sc = SparkContext(conf = conf)

In [23]:
def txtpreprocess(line):
    txt = []
    b = re.findall('[a-zA-Z\'-0123456789]+', line)
    for i in b:
        txt.append(i.lower())
    return txt

In [24]:
path = "./athletics"
files = os.listdir(path)
txts = []
for file in files:
    if 'txt' in file:
        position = path +'/'+ file
        with open(position,"r") as f:
            data = f.read()
            num = file.split('.')[0]
            txts.append(num+data)

#print(len(txts), txts)

## Shingling
這裏我們把文章單詞一個個拆解出來，以 3-shingle 的方式合在一起，由於我們實驗了兩種 min-hashing 的方式，`mapper_shingle_3` 會以 document ID 為 key，該 document 擁有的 shingle IDs 為 value 來表示 shingle-document boolean matrix，而 `new_mapper_shingle_3` 會以 shingle ID 為 key，擁有該 shingle 的 document IDs 為 value 來表示 boolean matrix。

`mapper_shingle_1` 將每篇文章處理後，把文章ID當成key，文章中每3個單字形成一個tuple，再將所有tuple收集起來當成value。

After `mapper_shingle_1`: `[(docId, [s1, s2, ..., ]), (docId, [s1, s2...,]), ...]` (s = shingle)

In [25]:
def mapper_shingle_1(line):
    num = line[0:3]
    wordlist = txtpreprocess(line[3:])
    maplist = []
    for i in range(len(wordlist)-2):
        maplist.append((wordlist[i],wordlist[i+1],wordlist[i+2]))
    return (num, maplist)

lines = sc.parallelize(txts).map(mapper_shingle_1)

#print(sorted(lines.collect()))

`mapper_shingle_2` 將上一個部分所有可能的shingle抓出當成key，擁有這個shingle的文章收集起來當成value。

After mapper_shingle_2: `[(s1, [doc1, doc2, ...]), (s2, [doc1, doc3...]), ...]` (s = shingle)

In [26]:
def mapper_shingle_2(line):
    # line = (docId, [s1, s2, ..., ])
    # [(s1, docID), (s2, docID)...]
    res = []
    for sg in line[1]:
        res.append((sg, line[0]))
    return res

shingle_doc = lines.flatMap(mapper_shingle_2).groupByKey().mapValues(list)
SHINGLES_SIZE = shingle_doc.count()

#sd = shingle_doc.collect()
#print(len(sd), SHINGLES_SIZE)
#print(sd)

                                                                                

`mapper_shingle_3` 將上一個部分再轉變回以文章ID當成key，把所有shingle編號，並將每篇文章擁有的shingle ID收集起來當成value。

After mapper_shingle_3: `[(docID, [sID1, sID2...]), ...]` (sID = shingle ID)

In [27]:
shingle_size = 0
def mapper_shingle_3(line):
    # input: (s1, [doc1, doc2, ...])
    # output: [(doc1, sID), (doc2, sID)]
    global shingle_size
    shingle_size += 1
    res = []
    for doc in line[1]:
        res.append((doc, shingle_size))
    return res

doc_shingleID = shingle_doc.flatMap(mapper_shingle_3).groupByKey().mapValues(list)

#print(doc_shingleID.collect())

In [28]:
doc_shingleID_new = doc_shingleID.sortByKey().collect()

After new_mapper_shingle_3: `[(sID1, [doc1, doc2, ...]), (sID2, [doc1, doc3...]), ...]` (s = shingle)

In [29]:
shingle_size = 0
def new_mapper_shingle_3(line):
    # input: (s1, [doc1, doc2, ...])
    global shingle_size
    shingle_size += 1
    
    return (shingle_size, line[1])

shingleID_doc = shingle_doc.map(new_mapper_shingle_3)
# print(len(shingleID_doc.collect()))

## Min-hashing
### Method 1 - Permuting rows

`build_minhash_func` 會產生 100 種 0 ~ `SHINGLES_SIZE` 的排列，當作 hash function 使用。

`mapper_minhash` 把 `doc_shingleID` 作為 input，再根據 `minhash_func` 去看該 document 有沒有那些 shingle，最後生成 `[(docID, [min-hashv1, min-hashv2, ..., min-hashv100])...]`。

After mapper_minhash: `[(docID, [min-hashv1, min-hashv2, ..., min-hashv100])...]`

In [30]:
# def create_hash_func(size: int):
#     # function for creating the hash vector/function
#     hash_ex = list(range(0, size))
#     shuffle(hash_ex)
#     return hash_ex

def build_minhash_func(nbits: int):
    # function for building multiple minhash vectors
    hashes = []
    for i in range(nbits):
        hash_ex = list(range(0, SHINGLES_SIZE))
        shuffle(hash_ex)
        hashes.append(hash_ex)
    return hashes

# we create 100 minhash vectors
minhash_func = build_minhash_func(100)

In [31]:
def mapper_minhash(line):
    # (docID, [sID, sID...])
    signature = []
    for func in minhash_func:
        for i in range(0, SHINGLES_SIZE):
            idx = func.index(i)
            if idx in line[1]:
                signature.append(i)
                break
    return (line[0], signature)

min_hash = doc_shingleID.map(mapper_minhash)
# mh = min_hash.sortByKey().collect()
# print(mh)

### Method 2 - Row hashing
- p = MAGIC_NUMBER = 500009 (一個比 N 大的質數）
- N = SHINGLES_SIZE
- h(x) = ((ax + b) % p) % N    （參考講義 P.41 公式）

`create_hash_func` 創造出 100 種不同排列組合的參數 a, b，a 為 1 ~ 100，b 為 `randint(1, N)`。


`mapper_minhash_1` 以前面的 `shingleID_doc` 為 input，首先先算出 $h_i(r)$，再來 iterate 所有的 document 來創造 `((docID, hashIdx), h_r[hashIdx]`，到這邊就算出來所有可能的 M(i, c)，接著用 `reducer_minhash_1` 來拿取最小的 M(i, c)，接著用 `mapper_minhash_2` 和 `mapper_minhash_3` 來把 rdd 整理成方便 LSH 處理的 `[(docID, [min-hashv1, min-hashv2, ..., min-hashv100])...]`。

實作過程比較：method 2 比 method 1 速度快上非常多，同樣都是把 min_hash 完的值 collect 出來看，method 2 只需數秒，method 1 卻要長達好幾分鐘，可見優化是奏效的。

After mapper_minhash: `[(docID, [min-hashv1, min-hashv2, ..., min-hashv100])...]`

In [32]:
MAGIC_NUMBER = 500009

In [33]:
def create_hash_func():
    hash_func = []
    for a in range(1, 101):
        b = randint(1, SHINGLES_SIZE)
        hash_func.append([a, b])
    return hash_func

def mapper_minhash_1(line):
    # Input: (sID1, [doc1, doc2, ...])
    # Output: ((docID, i), M(i, c))
    signature = []
    h_r = []
    for func in minhash_func:
        h_r.append(((func[0] * line[0] + func[1]) % MAGIC_NUMBER) % SHINGLES_SIZE)
    for c in line[1]:
        for hashIdx in range(100):
            signature.append(((c, hashIdx), h_r[hashIdx]))
    return signature

def reducer_minhash_1(x, y):
    # if hi (r) < M(i, c) then M(i, c) := hi (r);
    if x < y:
        return x
    return y
        
def mapper_minhash_2(line):
    # Input: ((docID, i), M(i, c))
    # Output: (docID, (i, M(i, c)))
    return (line[0][0], (line[0][1], line[1]))

def mapper_minhash_3(line):
    # Input: (docID, (i, M(i, c)), (i, M(i, c))....)
    M = sorted(line[1], key = lambda x : x[0])
    new_M = []
    for m in M:
        new_M.append(m[1])
    return (line[0], new_M)
    
minhash_func = create_hash_func()
new_min_hash_1 = shingleID_doc.flatMap(mapper_minhash_1).reduceByKey(reducer_minhash_1)
new_min_hash_2 = new_min_hash_1.map(mapper_minhash_2).groupByKey().mapValues(list).map(mapper_minhash_3)
# new_1 = new_min_hash_2.collect()
# print(len(new_1), len(new_1[0][1]))
# print(new_1)
min_hash = new_min_hash_2

## Locality-sensitive-hashing
`LSH_band` 會把原本 100 個 min-hash value 兩個兩個 row 一組分成 50 個 band，再經過 `LSH_groupBand` 來整理成以 bandID 為 key，(docID, band-value) 為 value 的 rdd，到這裡就做完了 partition matrix into bands。

在 `LSH_pushBucket` 裡我們設計了一個 hash function: $(3000 b_1 + b_2) \% \text{SHINGLES_SIZE})$，$b_1, b_2$ 為 band 的第一個和第二個值，這樣每個 document 的 band 都被分配到一個 bucketID，就能把它整理成 `(bucketID, docID)`，再使用 `groupByKey().mapValues(list)` 就能把同一個 bucketID 的 documents 串在一起。

After LSH_band: `[(docID, [band1, band2..., band50])...]` (band = [min-hashv1, min-hashv2])

In [34]:
def LSH_band(line): 
    signature = line[1]
    r = int(len(signature) / 50)
    # code splitting signature in b parts
    subvecs = []
    for i in range(0, len(signature), 2):
        subvecs.append(signature[i : i+r])
    return (line[0], subvecs)

lsh_1 = min_hash.map(LSH_band)
#lsh = lsh_1.collect()
#print(len(lsh), lsh)

After LSH_groupBand: `[(bandID,  [(docID, band), (docID, band)])...]` each value of pair with len = 50

In [35]:
def LSH_groupBand(line):
    # Input: (docID, [band1, band2, ...])
    # Output: (bandID, (docID, band))
    # (bandID, [(docID, band), (docID, band), ...])
    res = []
    for i, b in enumerate(line[1]):
        res.append((i, (line[0], b)))
    return res

lsh_2 = lsh_1.flatMap(LSH_groupBand).groupByKey().mapValues(list)
#lsh2 = lsh_2.collect()
#print(lsh2)
    

After LSH_putBucket: `[(bucketID, docID), (bucketID, docID)...]`

Then use groupByKey and mapValues: `[(bucketID, [docID, docID, ...]),...]`

In [36]:
# (a * (3000 + b)) % shingle_size
def LSH_putBucket(line):
    # Input: (bandID, [(docID, band), (docID, band), ...])
    # (bucketID, docID)
    bucket = []
    for idx, pair in enumerate(line[1]):
        b = pair[1]
        bucket.append(((b[0] * 3000 + b[1]) % SHINGLES_SIZE, pair[0]))
    return bucket

In [37]:
lsh_3 = lsh_2.flatMap(LSH_putBucket).groupByKey().mapValues(list)
# lsh3 = lsh_3.collect()
# print(len(lsh3))

`find_candidate` 如果遇到少於一個 documentID 的 bucket 就會回傳 (-1, -1)，其他狀況就會回傳裡面的 document 的兩個兩個排列組合為 key，value 為 1 的 key-value pair，並在外面把 (-1, -1) 用 `filter` 過濾掉，並用 `reduceByKey` 去算 candidate pair 被 match 到的數量再用 `sortBy` 讓它照多寡排序。

After find_candidate: `[((docID, docID), 1), ((docID, docID), 1)` (candidate_pair, pair_count)

In [38]:
def find_candidate(line):
    # Input: (bucketID, [docID, docID])
    if len(line[1]) <= 1:
        return ([(-1, -1)])
    if len(line[1]) == 2:
        if (line[1][0] < line[1][1]):
            return ([((line[1][0], line[1][1]), 1)])
        return  ([((line[1][1], line[1][0]), 1)])
    doc = line[1]
    res = []
    for i in range(len(doc)):
        for j in range(i+1, len(doc)):
            if doc[i] < doc[j]:
                res.append(((doc[i], doc[j]), 1))
            else:
                res.append(((doc[j], doc[i]), 1))
    return res

In [39]:
candidate_pairs = lsh_3.flatMap(find_candidate).filter(lambda x: x != (-1, -1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False)
cp = candidate_pairs.collect()
print(len(cp))
for p in cp:
    print(p)

[Stage 6:>                                                          (0 + 1) / 1]

697
(('052', '084'), 50)
(('012', '020'), 50)
(('030', '035'), 27)
(('047', '049'), 27)
(('023', '038'), 17)
(('049', '088'), 14)
(('048', '049'), 11)
(('047', '088'), 8)
(('047', '048'), 5)
(('045', '091'), 5)
(('014', '040'), 4)
(('043', '059'), 3)
(('034', '060'), 3)
(('030', '081'), 3)
(('050', '051'), 3)
(('028', '061'), 3)
(('034', '079'), 3)
(('057', '058'), 3)
(('057', '070'), 3)
(('035', '090'), 3)
(('040', '070'), 3)
(('040', '066'), 3)
(('074', '098'), 2)
(('098', '099'), 2)
(('084', '095'), 2)
(('052', '095'), 2)
(('035', '068'), 2)
(('030', '068'), 2)
(('042', '097'), 2)
(('016', '077'), 2)
(('059', '072'), 2)
(('009', '045'), 2)
(('035', '084'), 2)
(('035', '052'), 2)
(('013', '060'), 2)
(('034', '035'), 2)
(('027', '057'), 2)
(('002', '095'), 2)
(('058', '070'), 2)
(('008', '094'), 2)
(('067', '100'), 2)
(('033', '042'), 2)
(('015', '073'), 2)
(('057', '093'), 2)
(('025', '031'), 2)
(('045', '099'), 2)
(('021', '030'), 2)
(('041', '094'), 2)
(('003', '035'), 2)
(('010', 

                                                                                

In [37]:
candidate_pairs = lsh_3.flatMap(find_candidate).filter(lambda x: x != (-1, -1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False)
cp = candidate_pairs.collect()
print(len(cp))
for p in cp:
    print(p)

728
(('052', '084'), 50)
(('012', '020'), 50)
(('047', '049'), 30)
(('030', '035'), 20)
(('048', '049'), 16)
(('023', '038'), 16)
(('049', '088'), 14)
(('047', '088'), 12)
(('047', '048'), 11)
(('014', '040'), 6)
(('030', '082'), 3)
(('073', '077'), 3)
(('023', '047'), 3)
(('014', '061'), 3)
(('032', '076'), 3)
(('085', '088'), 3)
(('047', '066'), 3)
(('049', '066'), 3)
(('050', '063'), 3)
(('045', '091'), 3)
(('059', '062'), 2)
(('038', '042'), 2)
(('031', '083'), 2)
(('058', '060'), 2)
(('058', '066'), 2)
(('044', '071'), 2)
(('050', '051'), 2)
(('051', '082'), 2)
(('050', '082'), 2)
(('060', '082'), 2)
(('007', '023'), 2)
(('069', '089'), 2)
(('057', '066'), 2)
(('068', '088'), 2)
(('040', '066'), 2)
(('036', '077'), 2)
(('011', '088'), 2)
(('057', '093'), 2)
(('059', '070'), 2)
(('028', '056'), 2)
(('013', '087'), 2)
(('068', '080'), 2)
(('047', '068'), 2)
(('049', '068'), 2)
(('023', '068'), 2)
(('049', '080'), 2)
(('040', '047'), 2)
(('023', '049'), 2)
(('040', '049'), 2)
(('023'

jaccard函數將兩個集合的jaccard similarity算出

In [40]:
def jaccard(a: set, b: set):
    return len(a.intersection(b)) / len(a.union(b))

將所有candidate pair hash到超過兩個bucket以上的similarity算出

In [41]:
result = []
for i,v in enumerate(cp):
    if v[1] >= 3:
        result.append((v[0],jaccard(set(doc_shingleID_new[int(v[0][0])-1][1]),set(doc_shingleID_new[int(v[0][1])-1][1]))))


將前10名similarity高的結果print出。

In [42]:
result = sorted(result,key=lambda s: s[1], reverse=True)[0:10]


In [43]:
for i,v in enumerate(result):
    print(v[0],":",round(v[1]*100,2),"%")

('052', '084') : 100.0 %
('012', '020') : 100.0 %
('047', '049') : 75.72 %
('030', '035') : 70.73 %
('049', '088') : 51.31 %
('023', '038') : 48.22 %
('048', '049') : 48.05 %
('014', '040') : 39.77 %
('047', '088') : 38.91 %
('047', '048') : 36.38 %


In [42]:
for i,v in enumerate(result):
    print(v[0],":",round(v[1]*100,2),"%")

('052', '084') : 100.0 %
('012', '020') : 100.0 %
('047', '049') : 75.72 %
('030', '035') : 70.73 %
('049', '088') : 51.31 %
('023', '038') : 48.22 %
('048', '049') : 48.05 %
('014', '040') : 39.77 %
('047', '088') : 38.91 %
('047', '048') : 36.38 %
