# Get the article bodies with Spark

In [1]:
import re

# Read every file matching the glob; only the second element of each record
# (the file's text) is used below.
data = sc.wholeTextFiles('data/reut2-*')
fileContents = data.map(lambda pathAndText: pathAndText[1])

# For each file keep the text between every <BODY> tag and its closing </BODY>.
rawBodies = fileContents.flatMap(
    lambda text: [piece.split('</BODY>')[0] for piece in text.split('<BODY>')[1:]])

# Turn newlines into spaces, then squeeze runs of spaces down to a single space.
newsArticles = rawBodies.map(lambda body: re.sub(' +', ' ', body.replace('\n', ' ')))

newsArticles.take(3)

['Paxar Corp said it has acquired Thermo-Print GmbH of Lohn, West Germany, a distributor of Paxar products, for undisclosed terms. Reuter &#3;',
 'Shr 10 cts vs 32 cts Net 975,000 vs 3,145,000 Sales 159.1 mln vs 147.3 mln Reuter &#3;',
 'Key Tronic corp said it has received contracts to provide seven original equipment manufacturers with which it has not done business recently with over 300,000 computer keyboards for delivery within the next 12 months. The company said "The new contracts represent an annual increase of approximately 25 pct in unit volume over last year." Reuter &#3;']

# (1) Given the Reuters-21578 dataset, please calculate all k-shingles and output the set representation of the text dataset as a matrix.

In [2]:
k = 3  # shingle length, in characters

# Slide a window of k characters over every article, then drop duplicates so
# each distinct shingle appears once.
allShingles = newsArticles.flatMap(
    lambda text: [text[start:start + k] for start in range(len(text) - k + 1)])
shingles = allShingles.distinct()

shingles.take(5)

['Pax', 'r C', 'Cor', 'rp ', 'p s']

In [3]:
# Sizes of the two axes of the shingle/article matrix built in the next cells.
articles_count = newsArticles.count()
shingles_count = shingles.count()
for total, label in ((shingles_count, 'different shingles.'),
                     (articles_count, 'different articles.')):
    print(total, label)

35077 different shingles.
19043 different articles.


In [4]:
# Bring the article texts to the driver as a plain list. The isinstance guard
# makes this cell safe to re-run: after the first run `newsArticles` is already
# a list and has no .collect().
if not isinstance(newsArticles, list):
    newsArticles = newsArticles.collect()

# Ship the article list to the executors once as a broadcast variable instead
# of serialising it inside the lambda's closure.
articlesBroadcast = sc.broadcast(newsArticles)

# One row per shingle, one column per article: 1 when the shingle occurs in
# the article text, 0 otherwise.
kShinglesMatrix = shingles.map(
    lambda s: [1 if s in article else 0 for article in articlesBroadcast.value])

In [5]:
# Merge everything into one partition so the matrix is written as a single
# part file under 'tmp'.
singlePartitionMatrix = kShinglesMatrix.coalesce(1)
singlePartitionMatrix.saveAsTextFile('tmp')

In [6]:
# Make sure the destination directory exists before moving the part file into
# it, then remove the temporary Spark output directory.
!mkdir -p result
!mv tmp/part-00000 result/kShinglesMatrix.txt
!rm -rf tmp

# (2) Given the set representation, compute the minhash signatures of all documents using MapReduce.

In [7]:
def biggerThanNFirstPrime(N):
    """Return the smallest prime strictly greater than N.

    Parameters:
        N: a number; any value below 2 yields 2.

    Returns:
        int: the first prime p with p > N.
    """
    # No prime is smaller than 2, and nothing at or below N can be the answer,
    # so start the search at the first integer above N.
    candidate = max(2, int(N) + 1)
    while True:
        isPrime = True
        divisor = 2
        # A composite number always has a divisor no larger than its square root.
        while divisor * divisor <= candidate:
            if candidate % divisor == 0:
                isPrime = False
                break
            divisor += 1
        if isPrime:
            return candidate
        candidate += 1

In [8]:
import random

# Fix the seed so the hash coefficients (and therefore the signatures) are the
# same on every Restart & Run All.
random.seed(42)

h = 100  # number of hash functions, i.e. rows of the signature matrix
p = biggerThanNFirstPrime(articles_count)
N = articles_count
# NOTE(review): rowHash is later applied to the row index of kShinglesMatrix
# (one row per shingle), yet p and N are derived from the article count --
# confirm whether they should be based on shingles_count instead. If N is
# changed, the "no shingle" sentinel used when reducing the signatures must
# stay larger than every possible hash value.

# Coefficients of the linear hashes (a*row + b) mod p. `a` is drawn from
# [1, p-1] because a == 0 would send every row to the same value; `b` is drawn
# from [0, p-1].
a = [random.randint(1, p - 1) for i in range(h)]
b = [random.randint(0, p - 1) for i in range(h)]

def rowHash(row, a, b, p, N):
    """Hash a row index with the linear function ((a*row + b) mod p) mod N.

    Parameters:
        row: index being hashed.
        a, b: multiplier and offset of the linear function.
        p: first modulus.
        N: second modulus; for positive N the result lies in [0, N-1].
    """
    linear = a * row + b
    return linear % p % N

In [9]:
minHashSignatures = []
# Pair every matrix row with its index and keep the result cached, since it is
# scanned once per hash function below.
kShinglesMatrixZipWithIndex = kShinglesMatrix.zipWithIndex().cache()
for i in range(h):
    # Replace each 1 by the hash of the row index under the i-th hash function
    # and each 0 by a sentinel (articles_count + 10).
    hashedRows = kShinglesMatrixZipWithIndex.map(
        lambda indexedRow: [rowHash(indexedRow[1], a[i], b[i], p, N) if bit == 1 else (articles_count + 10)
                            for bit in indexedRow[0]])
    # The column-wise minimum over all rows is the i-th signature row.
    signature = hashedRows.reduce(
        lambda left, right: [min(u, v) for u, v in zip(left, right)])
    minHashSignatures.append(signature)

In [10]:
# Write the signature matrix, one hash function (row) per line, using the
# Python list representation of each row.
with open('result/minHashSignatures.txt', 'w') as result:
    result.writelines(str(row) + '\n' for row in minHashSignatures)

In [11]:
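# NOTE: disabled draft -- an RDD-based version of the candidate-pair search
# that the following cell implements with NumPy; kept for reference only.
# import numpy as np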
# from operator import add
# bands = 20
# r = 5
# similarRate = 0.8
# buckets = articles_count
# hashFuct = [[random.randint(0, 100) for i in range(r + 1)] for j in range(bands)]

# for i in range(articles_count):
#     candidatePairs = sc.emptyRDD()
#     for j in range(bands):
#         band = sc.parallelize(np.array(minHashSignatures[j*r:j*r+r]).T)\
#                  .zipWithIndex().filter(lambda x:x[1] >= i)\
#                  .map(lambda x:(x[1], (np.array(x[0]).dot(np.array(hashFuct[j][:r])) + hashFuct[j][-1]) % buckets))\
#                  .cache()
#         bucket = band.first()[1]
#         candidatePairs = candidatePairs.union(band.filter(lambda x:x[0] > i and x[1] == bucket).map(lambda x:((i, x[0]), 1)))
#     candidatePairs = candidatePairs.reduceByKey(add).filter(lambda x:x[1] >= bands*similarRate)
#     print(candidatePairs.collect())

In [12]:
import numpy as np
from operator import add
bands = 20          # number of bands the signature matrix is split into
r = 5               # signature rows per band
similarRate = 0.8   # fraction of bands that must agree for a pair to be reported
buckets = articles_count
# Per band: r weights followed by one offset for the linear bucket hash.
hashFuct = [[random.randint(0, 100) for i in range(r + 1)] for j in range(bands)]

# Bucket of every article in every band, computed once up front. It does not
# depend on which article is being compared, so there is no reason to redo it
# inside the per-article loop. Resulting shape: (bands, number of articles).
signatureMatrix = np.array(minHashSignatures)
bandBuckets = np.array([
    (signatureMatrix[j*r:j*r+r].T.dot(np.array(hashFuct[j][:r])) + hashFuct[j][-1]) % buckets
    for j in range(bands)
])

# A pair is reported when it shares a bucket in at least bands*similarRate
# bands; the floor of 1 keeps articles that never collide out of the list.
minSharedBands = max(bands * similarRate, 1)

with open('result/candidatePairs.txt', 'w') as result:
    for i in range(articles_count):
        # For every later article, count the bands where it lands in the same
        # bucket as article i.
        sharedBands = (bandBuckets[:, i+1:] == bandBuckets[:, i:i+1]).sum(axis=0)
        # Offsets inside the slice -> absolute article indices (ascending),
        # converted to plain ints so the written list looks like [3, 17, ...].
        candidatePairsTreshold = (np.flatnonzero(sharedBands >= minSharedBands) + i + 1).tolist()
        result.write('Articles' + str(i) + ':' + str(candidatePairsTreshold) + '\n')