# Import

In [1]:
from pyspark import SparkConf, SparkContext
import pyspark
import random
import binascii

# Mapper

## shingling

將文字 3 個 3 個連再一起，每個是一個 shingle

In [2]:
def shingling(line):
    shingle = []
    for i in range(len(line[1])-2):
        shingle.append( (line[1][i]+line[1][i+1]+line[1][i+2]) )
    print(shingle)
    return (line[0], shingle)

## hash_shingling

這裡把原本 3 個字合再一起的 shingle 經過 hash function 輸出

In [3]:
def hash_shingling(line):
    hash_shingle = []
    
    for i in line[1]:
        hash_shingle.append( binascii.crc32(i.encode()) & 0xffffffff )

    return (line[0], hash_shingle)

## min_hashing

這裡將每個文件的 shingle 轉成 signature，因為需要 100 個 hash function，所以我們需要 a b 各 100 個亂數，同時又不重複，所以前面宣告了 randomA 和 randomB 以免亂數的數字重複，再用 shingle 的總數 N 以及質數 17159 做hash function (後面會解釋)

In [4]:
def min_hashing(line):
    signatures = []

    for i in range(0, 100):
        minhashing = 17159 + 1
        for j in range(len(line[1])):
            hashing = ((randomA[i] * line[1][j] + randomB[i]) % 17159) % 17141
            if hashing < minhashing:
                minhashing = hashing
        signatures.append(minhashing)

    return (line[0], signatures)

## deleteDuplicatedElementFromList

因為之後 candidate pair 會重複計算，所以這裡把重複的 candidate pair 刪除掉

In [5]:
def deleteDuplicatedElementFromList(listA):
    return sorted(set(listA), key = listA.index)

## 讀取資料

因為這次的檔案是要讀取資料夾裡面的資料，所以用 wholeTextFiles 這個 function 來讀全部的資料，然後將文章分成每個字為一個字串

In [6]:
conf = SparkConf().set("spark.default.parallelism", 4).setAppName("LSH")
sc = SparkContext(conf=conf)
data = sc.wholeTextFiles("./athletics").map(lambda line: (line[0][len(line[0])-7:len(line[0])-4], line[1]))\
        .map(lambda line: (line[0], line[1].split())) 


sorted(data.collect()) 

[('001',
  ['Claxton',
   'hunting',
   'first',
   'major',
   'medal',
   'British',
   'hurdler',
   'Sarah',
   'Claxton',
   'is',
   'confident',
   'she',
   'can',
   'win',
   'her',
   'first',
   'major',
   'medal',
   'at',
   'next',
   "month's",
   'European',
   'Indoor',
   'Championships',
   'in',
   'Madrid.',
   'The',
   '25-year-old',
   'has',
   'already',
   'smashed',
   'the',
   'British',
   'record',
   'over',
   '60m',
   'hurdles',
   'twice',
   'this',
   'season,',
   'setting',
   'a',
   'new',
   'mark',
   'of',
   '7.96',
   'seconds',
   'to',
   'win',
   'the',
   'AAAs',
   'title.',
   '"I',
   'am',
   'quite',
   'confident,"',
   'said',
   'Claxton.',
   '"But',
   'I',
   'take',
   'each',
   'race',
   'as',
   'it',
   'comes.',
   '"As',
   'long',
   'as',
   'I',
   'keep',
   'up',
   'my',
   'training',
   'but',
   'not',
   'do',
   'too',
   'much',
   'I',
   'think',
   'there',
   'is',
   'a',
   'chance',
   'of',
  

## 3-shingle

這裡將 3 個字合成一個 shingle

In [7]:
shingle = data.map(shingling)
sorted(shingle.collect()) 

[('001',
  ['Claxtonhuntingfirst',
   'huntingfirstmajor',
   'firstmajormedal',
   'majormedalBritish',
   'medalBritishhurdler',
   'BritishhurdlerSarah',
   'hurdlerSarahClaxton',
   'SarahClaxtonis',
   'Claxtonisconfident',
   'isconfidentshe',
   'confidentshecan',
   'shecanwin',
   'canwinher',
   'winherfirst',
   'herfirstmajor',
   'firstmajormedal',
   'majormedalat',
   'medalatnext',
   "atnextmonth's",
   "nextmonth'sEuropean",
   "month'sEuropeanIndoor",
   'EuropeanIndoorChampionships',
   'IndoorChampionshipsin',
   'ChampionshipsinMadrid.',
   'inMadrid.The',
   'Madrid.The25-year-old',
   'The25-year-oldhas',
   '25-year-oldhasalready',
   'hasalreadysmashed',
   'alreadysmashedthe',
   'smashedtheBritish',
   'theBritishrecord',
   'Britishrecordover',
   'recordover60m',
   'over60mhurdles',
   '60mhurdlestwice',
   'hurdlestwicethis',
   'twicethisseason,',
   'thisseason,setting',
   'season,settinga',
   'settinganew',
   'anewmark',
   'newmarkof',
   'markof7

把原本 3 個字合再一起的 shingle 經過 hash function 輸出

In [8]:
hash_shingle = shingle.map(hash_shingling)
hash_shingle_collect = sorted(hash_shingle.collect())
sorted(hash_shingle.collect())

[('001',
  [2750093452,
   3760738465,
   1564725011,
   2082462026,
   1083243625,
   508890344,
   1725749241,
   942240670,
   163059214,
   1690900502,
   3790971227,
   1273459375,
   4254499661,
   3455381212,
   961734091,
   1564725011,
   3224076784,
   2084611627,
   700519753,
   3888751149,
   1733927853,
   1802718430,
   1569286053,
   3096735205,
   996933242,
   1815209242,
   1187485715,
   3219063005,
   2624421854,
   3415413325,
   3261277580,
   3809408771,
   2538275825,
   4206988877,
   1137006215,
   2488779765,
   1378878566,
   3614104546,
   2321308013,
   1394879431,
   2921760489,
   4128360367,
   273986376,
   2593026064,
   3268876054,
   4072810375,
   2805643321,
   2856214670,
   494040254,
   2678050378,
   325056601,
   2495175397,
   1286175939,
   1896739923,
   790583714,
   1993628137,
   9578444,
   2289133157,
   1278165023,
   3931507013,
   2616443445,
   1237077518,
   1331603640,
   160705287,
   2230914647,
   1464894723,
   895925766,
 

這裡算出 shingle 總共的數量 N 為 17141，而計算 hash function 所需要的 P 是稍大於 N 的質數，算出 N 後找到 P 為 17159

In [9]:
N = []
for i in range(50):
    N.append( len(hash_shingle_collect[i][1]) )
print(sum(N))
P = 17159

17141


宣告 randomA randomB 以用來製造 100 個不同的亂數，用來製造不同的 hash function

In [10]:
randomA = []
time = 100

while time > 0:
    num = random.randint(0, 17159)
    while num in randomA:
        num = random.randint(0, 17159)
    randomA.append(num)
    time = time - 1

randomB = []
time = 100

while time > 0:
    num = random.randint(0, 17159)
    while num in randomB:
        num = random.randint(0, 17159)
    randomB.append(num)
    time = time - 1

## MinHash

將每個文件的 shingle 降維轉成 100 個 signature，利用前面算出的 randomA randomB 和 N P 來對 shingle 做 min hash

In [11]:
signature = hash_shingle.map(min_hashing)
signature_collect = signature.collect()
sorted(signature_collect)

[('001',
  [62,
   145,
   4,
   25,
   11,
   0,
   4,
   23,
   56,
   92,
   0,
   224,
   56,
   4,
   8,
   11,
   0,
   8,
   58,
   156,
   6,
   112,
   12,
   88,
   196,
   19,
   68,
   148,
   219,
   24,
   99,
   64,
   11,
   25,
   12,
   11,
   63,
   71,
   394,
   48,
   15,
   108,
   14,
   288,
   0,
   0,
   136,
   181,
   115,
   2,
   63,
   302,
   173,
   15,
   115,
   54,
   180,
   213,
   17,
   81,
   101,
   129,
   28,
   90,
   31,
   13,
   9,
   130,
   87,
   53,
   203,
   3,
   24,
   18,
   72,
   241,
   13,
   0,
   218,
   42,
   48,
   255,
   10,
   233,
   168,
   111,
   226,
   17,
   76,
   16,
   54,
   152,
   226,
   12,
   16,
   5,
   54,
   120,
   48,
   12]),
 ('002',
  [97,
   55,
   3,
   255,
   99,
   699,
   408,
   134,
   107,
   31,
   58,
   65,
   76,
   551,
   13,
   26,
   88,
   230,
   548,
   60,
   99,
   15,
   295,
   123,
   13,
   46,
   236,
   17,
   70,
   198,
   36,
   193,
   2,
   98,
   104,
   131,

## LSH

因為 band 有 50 個，r 有 2 個，而每個文件有 100 個 signature，所以每個 band 總共有 2 個 signature，r 總共有 1 個
而只要 band 裡面兩個 r 都一樣就可以說是 candidate pair，所以這裡判斷方式只要將 signature 兩兩讀出來，若有兩個文件讀出來的兩個 signature 都一樣我們就 append 到 candidate 裡面，同時也加上兩個文件的 similarity 以便後面拿來比較用，最後再用 deleteDuplicatedElementFromList 將 candidate 裡重複的 candidate pair 刪除掉就好。

In [12]:
candidate = []

for i in range(49):
    for j in range(i+1, 50):
        for k in range(50):
            if( (signature_collect[i][1][2*k], signature_collect[i][1][2*k+1]) ) == ( (signature_collect[j][1][2*k], signature_collect[j][1][2*k+1]) ):
                s1 = set(signature_collect[i][1])
                s2 = set(signature_collect[j][1])
                sim = len(s1.intersection(s2)) / float(len(s1.union(s2)))
                candidate.append( (signature_collect[i][0], signature_collect[j][0], sim) )
final = deleteDuplicatedElementFromList(candidate)
    
print(final)

[('048', '049', 0.5306122448979592), ('048', '028', 0.27631578947368424), ('048', '035', 0.49019607843137253), ('048', '030', 0.4666666666666667), ('048', '047', 0.425531914893617), ('049', '028', 0.3442622950819672), ('049', '035', 0.5641025641025641), ('049', '030', 0.4897959183673469), ('049', '047', 0.6774193548387096), ('003', '038', 0.44285714285714284), ('003', '023', 0.44642857142857145), ('003', '044', 0.3258426966292135), ('017', '016', 0.3), ('014', '040', 0.4107142857142857), ('028', '035', 0.38333333333333336), ('028', '030', 0.484375), ('028', '047', 0.2982456140350877), ('011', '033', 0.2358490566037736), ('005', '035', 0.26582278481012656), ('005', '043', 0.22580645161290322), ('038', '023', 0.43859649122807015), ('012', '020', 1.0), ('013', '021', 0.3333333333333333), ('037', '045', 0.27184466019417475), ('035', '030', 0.7209302325581395), ('035', '047', 0.5142857142857142), ('030', '047', 0.41304347826086957)]


## 輸出

最後我們用 similarity 由大往小 sort 之後，再輸出前 10 大的即可

In [13]:
def sort(line):
    return (float(line[2]),line[1],line[0])
test_rdd = sc.parallelize(final).map(lambda line : sort(line))

result = sorted(test_rdd.collect(),reverse = True)
print(result)

[(1.0, '020', '012'), (0.7209302325581395, '030', '035'), (0.6774193548387096, '047', '049'), (0.5641025641025641, '035', '049'), (0.5306122448979592, '049', '048'), (0.5142857142857142, '047', '035'), (0.49019607843137253, '035', '048'), (0.4897959183673469, '030', '049'), (0.484375, '030', '028'), (0.4666666666666667, '030', '048'), (0.44642857142857145, '023', '003'), (0.44285714285714284, '038', '003'), (0.43859649122807015, '023', '038'), (0.425531914893617, '047', '048'), (0.41304347826086957, '047', '030'), (0.4107142857142857, '040', '014'), (0.38333333333333336, '035', '028'), (0.3442622950819672, '028', '049'), (0.3333333333333333, '021', '013'), (0.3258426966292135, '044', '003'), (0.3, '016', '017'), (0.2982456140350877, '047', '028'), (0.27631578947368424, '028', '048'), (0.27184466019417475, '045', '037'), (0.26582278481012656, '035', '005'), (0.2358490566037736, '033', '011'), (0.22580645161290322, '043', '005')]


## Output

In [14]:
for i in range(10):
    print( str((result[i][1], result[i][2]))+' : '+str(result[i][0])+'\n' )


('020', '012') : 1.0

('030', '035') : 0.7209302325581395

('047', '049') : 0.6774193548387096

('035', '049') : 0.5641025641025641

('049', '048') : 0.5306122448979592

('047', '035') : 0.5142857142857142

('035', '048') : 0.49019607843137253

('030', '049') : 0.4897959183673469

('030', '028') : 0.484375

('030', '048') : 0.4666666666666667

