In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

In [2]:
import os
import time
import itertools
import collections
import numpy as np
import matplotlib.pyplot as plt

In [3]:
import os
TARGET = 'sampledocs/'
print(os.listdir(TARGET))
documents = []
for article in os.listdir(TARGET):
    if article == 'stopwords.txt':
        continue
    if article == '.ipynb_checkpoints':
        continue
    path = os.path.join(TARGET, article)
    with open(path, 'r') as file:
        documents.append(file.read())
        
stopwords = []
with open(os.path.join(TARGET, 'stopwords.txt'), 'r') as file:
    for line in file:
        stopwords.append(line.strip())
        
        
for i, doc in enumerate(documents):
    doc = doc.strip().replace('\n', ' ').lower()
    for word in stopwords:
        doc = doc.replace(' '+word+' ', ' ')
    documents[i] = doc

print(f"Average char-length: \
{np.mean(np.array([len(x) for x in documents]))}")
print(f"Min char-length: {min(len(x) for x in documents)}")
print(f"Max char-length: {max(len(x) for x in documents)}")

['article5.txt', 'article2.txt', 'article1.txt', 'article6.txt', '.ipynb_checkpoints', 'stopwords.txt', 'article3.txt', 'article4.txt']
Average char-length: 3627.1666666666665
Min char-length: 2412
Max char-length: 5873


In [4]:
def getShingles(str1, K=5):
    d1 = set()
    for i in range(len(str1)-K):
        d1.add(str1[i:i+K])
    print(f"Found {len(d1)} unique shingles, out of {len(str1)} possible.")
    return d1
doc_shingles = [getShingles(s, 5) for s in documents]

Found 3953 unique shingles, out of 5873 possible.
Found 2091 unique shingles, out of 3291 possible.
Found 2060 unique shingles, out of 3009 possible.
Found 2245 unique shingles, out of 3505 possible.
Found 2782 unique shingles, out of 3673 possible.
Found 1918 unique shingles, out of 2412 possible.


In [6]:
def jaccardSim(d1,d2):
    return len(d1.intersection(d2))/len(d1.union(d2))

# itertools.combinations finds all (,n) n-pairs
# then we use a map op on the tuples with jaccardSim
pairs = itertools.combinations(documents, 2)
pair_labels = []
pair_sims = []
for x1, x2 in itertools.combinations(zip(range(len(doc_shingles)),doc_shingles), 2):
    pair_labels.append((x1[0],x2[0]))
    pair_sims.append(jaccardSim(x1[1],x2[1]))
    
print(f"**~~~~~~ True similarity scores ~~~~~~**")
print("Pair\tScore")
print("-"*14)
for pair, score in zip(pair_labels, pair_sims):
    print(f"{pair}\t{score:.3f}")

**~~~~~~ True similarity scores ~~~~~~**
Pair	Score
--------------
(0, 1)	0.294
(0, 2)	0.336
(0, 3)	0.384
(0, 4)	0.083
(0, 5)	0.400
(1, 2)	0.093
(1, 3)	0.704
(1, 4)	0.081
(1, 5)	0.051
(2, 3)	0.090
(2, 4)	0.069
(2, 5)	0.050
(3, 4)	0.079
(3, 5)	0.178
(4, 5)	0.052


In [7]:
# Take union of all sets. Convert to an array and assign
# each element an integer based on position in array
fullset = set.union(*doc_shingles)
shingle_dict = dict(zip(list(fullset),range(len(fullset))))
print(f"There are {len(shingle_dict)} shingles")

There are 7538 shingles


In [8]:
# Create a hash function
# define as a callable class, so that we only
# intialize random functions once
class HashManager():
    def __init__(self, shingle_dict):
        self.shingle_dict = shingle_dict
        self.N = len(shingle_dict)
        self.params = None
        
    def _initParams(self, n_sig):
        self.params = np.random.randint(self.N, size=[n_sig,2])
    
    def _permuteRow(self, row):
        return (self.params@np.array([1,row]))%self.N
    
    def __call__(self, docs, n_sig, init=True):
        # Initialize if we change signature matrix length
        # or if we request to re-initialize
        if self.params is None or len(self.params) != n_sig or init:
            self._initParams(n_sig)
            
        #initialize signature matrix
        sig = np.full((n_sig, len(docs)), np.inf)
        
        # each doc in docs is assumed to be an iterable object
        for j, doc in enumerate(docs):
            for shingle in doc:
                orig_row = shingle_dict[shingle]
                curr_col = self._permuteRow(orig_row)
                sig[:,j] = np.minimum(sig[:,j],curr_col)
        return sig.astype(int)
    
# run some tests:
try:
    print("Initialization test: ", end="")
    hm = HashManager(shingle_dict)
    print("passed")

    print("Set parameters to right size: ", end="")
    hm._initParams(n_sig=4)
    assert(hm.params.shape == (4,2))
    print("passed")

    print("Permuting a row integer returns array: ", end="")
    curr_col = hm._permuteRow(3)
    assert(curr_col.shape == (4,))
    print("passed")

    print("Compute minhashed signature matrix: ", end="")
    hm(doc_shingles, 4)
    print("passed")
except Exception as e:
    print("failure")
    print(e.args)

Initialization test: passed
Set parameters to right size: passed
Permuting a row integer returns array: passed
Compute minhashed signature matrix: passed


In [9]:
hm = HashManager(shingle_dict)

In [10]:
def trueSimScores(doc_shingles):
    pair_labels = []
    pair_sims = []
    idxs = range(len(doc_shingles))
    for x1, x2 in itertools.combinations(zip(idxs,doc_shingles), 2):
        pair_labels.append((x1[0], x2[0]))
        pair_sims.append(jaccardSim(x1[1], x2[1]))
    return dict(zip(pair_labels, pair_sims))
    
def sigSimScores(sig_mat):
#     cols = [sig_mat[:,i] for i in range(sig_mat.shape[1])]
    cols = sig_mat.T
    idxs = range(sig_mat.shape[1])
    
    pair_labels = []
    pair_sims = []
    for (i,col1), (j,col2) in itertools.combinations(zip(idxs, cols),2):
        pair_labels.append((i,j))
        pair_sims.append(np.mean(col1==col2))
    
    return dict(zip(pair_labels, pair_sims))

def printScoreComparison(true_dict, approx_dict):
    print(f"**~~~~~~ Similarity score comparison ~~~~~~**")
    print("Pair\t\tApprox\t\tTrue\t\t%Error")
    for pair, true_value in true_dict.items():
        approx_value = approx_dict[pair]
        err = 100*abs(true_value-approx_value)/true_value
        print(f"{pair}\t\t{approx_value:.3f}\t\t{true_value:.3f}\t\t{err:.2f}")

def candidatePairs(score_dict, threshold):
    return set(pair for pair, scr in score_dict.items() if scr>=threshold)

def accMatrix(true_dict, approx_dict, threshold):
    true_pairs = candidatePairs(true_dict, threshold)
    approx_pairs = candidatePairs(approx_dict, threshold)
    false_negatives = len(true_pairs - approx_pairs)
    false_positives = len(approx_pairs - true_pairs)
    print(f"False negatives: {false_negatives}")
    print(f"Potential false positives: {false_positives}")

sig_mat = hm(doc_shingles, 10)
true_score_dict = trueSimScores(doc_shingles)
approx_score_dict = sigSimScores(sig_mat)
printScoreComparison(true_score_dict, approx_score_dict)

print("True pairs:",candidatePairs(true_score_dict, 0.25))
print("Candidate pairs:",candidatePairs(approx_score_dict, 0.25))
accMatrix(true_score_dict, approx_score_dict, 0.4)

**~~~~~~ Similarity score comparison ~~~~~~**
Pair		Approx		True		%Error
(0, 1)		0.500		0.294		69.78
(0, 2)		0.400		0.336		18.97
(0, 3)		0.500		0.384		30.17
(0, 4)		0.200		0.083		141.05
(0, 5)		0.600		0.400		49.93
(1, 2)		0.300		0.093		221.78
(1, 3)		0.800		0.704		13.68
(1, 4)		0.200		0.081		147.01
(1, 5)		0.200		0.051		289.08
(2, 3)		0.300		0.090		233.80
(2, 4)		0.000		0.069		100.00
(2, 5)		0.100		0.050		100.48
(3, 4)		0.200		0.079		152.47
(3, 5)		0.400		0.178		125.16
(4, 5)		0.200		0.052		286.93
True pairs: {(0, 1), (1, 3), (0, 5), (0, 3), (0, 2)}
Candidate pairs: {(0, 1), (1, 2), (1, 3), (0, 5), (2, 3), (0, 3), (0, 2), (3, 5)}
False negatives: 0
Potential false positives: 4


In [73]:
def bandedCandidatePair(col1, col2, b, r):
    """Returns a boolean if the two columns are a candidate pair
    inputs must obey n=len(col1)=len(col2)=b*r"""
    n = len(col1)
    assert(n==b*r)
    assert(n==len(col2))
    truth_array = (col1==col2)
    return any(all(band) for band in np.array_split(truth_array,b))

def bandedCandidatePairs(sig_mat, b, r):
    d = sig_mat.shape[1]
    idxs = range(d)
    cols = [sig_mat[:,i] for i in range(d)]
    pairs = set()
    for (i,col1), (j,col2) in itertools.combinations(zip(idxs,cols),2):
        if bandedCandidatePair(col1,col2,b,r):
            pairs.add((i,j))
    return pairs

# set p = 0.3 arbitrarily
p = 0.3
n = 120
b = 30
r = 4

# see how many candidate pairs we got right!
sig_mat = hm(doc_shingles, n)
true_score_dict = trueSimScores(doc_shingles)
approx_score_dict = sigSimScores(sig_mat)
print("True pairs:",candidatePairs(true_score_dict, p))
print("LSH pairs:",bandedCandidatePairs(sig_mat, b, r))
print("Vanilla MinHash pairs:",candidatePairs(approx_score_dict, p))
# accMatrix(true_score_dict, approx_score_dict, 0.4)

# sig_mat = hm(doc_shingles, n)
# true_score_dict = trueSimScores(doc_shingles)
printScoreComparison(true_score_dict, approx_score_dict)

True pairs: {(0, 2), (0, 4)}
LSH pairs: {(0, 1), (0, 2), (0, 4)}
Vanilla MinHash pairs: {(0, 1), (0, 2), (0, 4)}
**~~~~~~ Similarity score comparison ~~~~~~**
Pair		Approx		True		%Error
(0, 1)		0.383		0.294		30.17
(0, 2)		0.400		0.336		18.97
(0, 3)		0.175		0.083		110.92
(0, 4)		0.508		0.400		27.02
(1, 2)		0.175		0.093		87.70
(1, 3)		0.125		0.081		54.38
(1, 4)		0.183		0.051		256.66
(2, 3)		0.167		0.069		141.99
(2, 4)		0.150		0.050		200.71
(3, 4)		0.133		0.052		157.95
