In [9]:
from pyspark.sql import Row
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
from pyspark.sql.functions import lit
from pyspark.sql.window import Window
from pyspark.sql.functions import lit, row_number


In [10]:
spark = SparkSession.builder.appName(
"Analyzing the vocabulary of Pride and Prejudice."
).getOrCreate()

In [13]:
class Shingler:
    def __init__(self, k):
        self.shinglings_hashes = dict() # hash function is just  converting the shingle to a unique integer
        self.hash_to_shingle = {}
        self.shingle_hash_start_number = 0
        self.k = k
        
    def hash_shingle(self,shingle):
        if shingle in self.shinglings_hashes:
            return self.shinglings_hashes[shingle]
        else:
            self.shinglings_hashes[shingle] = self.shingle_hash_start_number
            self.hash_to_shingle[self.shingle_hash_start_number] = shingle
            self.shingle_hash_start_number += 1
            return self.shingle_hash_start_number - 1
            
    def to_binary_shingle(self, document):
        all_shingles = self.get_all_shingles()
        binary_shingles = []
        for shingle in all_shingles:
            if shingle in document:
                binary_shingles.append(1)
            else:
                binary_shingles.append(0)
        assert len(binary_shingles) == len(all_shingles)
        return binary_shingles

    def get_all_shingles(self):
        return list(range(0, self.shingle_hash_start_number))
    
    def create_shingles(self, document): 
        shingles = set()
        un_shingles = set()
        for i in range(0, len(document)-self.k+1 ):
            un_shingles.add(document[i:i+self.k])
            shingles.add(self.hash_shingle(document[i:i+self.k]))
        return shingles

    def to_text(self, s):
        return self.hash_to_shingle[s]

In [14]:
def find_jaccard_similarity(A,B):
    return len(A & B) / len(A | B)

## Definitions

In [15]:
k = 3

## Part 1: Represent all documents as a  vector of ordered set of shingles

In [16]:
def find_jaccard_similarity_matrix(signatures):
    jaccard_similarity_matrix = np.full((len(signatures), len(signatures)), -1.0)
    for i in range(0, len(signatures)):
        doci = signatures[i]
        for j in range(0, len(signatures)):
            if i==j:
                jaccard_similarity_matrix[i,j]=None
            else:
                docj = signatures[j]
                jaccard_similarity_matrix[i,j] = find_jaccard_similarity(doci, docj)
    return jaccard_similarity_matrix

def to_spark_df(matrix, elements=False):
    transposed_matrix = np.array(matrix).T.tolist()

    # Create the Spark DataFrame
    data = [Row(*row) for row in transposed_matrix]
    columns = [f'col_{i}' for i in range(0,len(matrix))]
    if elements:
        columns[-1] = "elements"
    df = spark.createDataFrame(data, columns)
    return df

In [17]:
# Read and shingle documents
all_documents = []
documents_path = "../news_dataset"
shingler = Shingler(k)
all_files = []
for i in range(1,8):
    with open(f"{documents_path}/{i}.txt", "r") as file:
        file_contents = file.read()
        all_files.append(file_contents)
        shingles = shingler.create_shingles(file_contents)
        all_documents.append(shingles)


In [18]:
find_jaccard_similarity(all_documents[5], all_documents[4]), find_jaccard_similarity(all_documents[4], all_documents[6]), find_jaccard_similarity(all_documents[5], all_documents[6])

(0.5397653194263363, 0.20153550863723607, 0.19412878787878787)

## Find Jaccard similarity between sets 

In [19]:
jaccard_similarity_matrix = find_jaccard_similarity_matrix(all_documents)
jaccard_df = to_spark_df(jaccard_similarity_matrix)

jaccard_df.show()

+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|              col_0|              col_1|              col_2|              col_3|              col_4|              col_5|              col_6|
+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|                NaN|0.32629399585921326|0.22456575682382135| 0.2202583276682529|0.21645313553607554|0.22005383580080753| 0.2551632245169887|
|0.32629399585921326|                NaN|0.21068904593639576|0.19218241042345277|0.18698060941828254|0.18566176470588236|0.23187732342007436|
|0.22456575682382135|0.21068904593639576|                NaN|0.20633484162895926| 0.2111913357400722| 0.2138364779874214|0.21227621483375958|
| 0.2202583276682529|0.19218241042345277|0.20633484162895926|                NaN| 0.5548128342245989| 0.5044929396662388| 0.1998069498069498|
|0.216

## MinHashing Similarity

In [20]:
import hashlib

In [26]:
class MyHash:

    def __init__(self, a, b):
        self.a= a
        self.b = b
        self.c = 21323

    def hash(self, element):
        return (self.a * element + self.b) % self.c

    def get_min_hash(self, document):
        min_hash = np.inf
        for shingle in document:
            hash_value = self.hash(shingle)
            if hash_value < min_hash:
                min_hash = hash_value
        return min_hash

class Signature:
    def __init__(self, hash_funcs):
        self.hash_funcs = hash_funcs

    def get_signature(self, document):
        doc_signature = []
        for hashf in self.hash_funcs:
            min_hash = hashf.get_min_hash(document)
            doc_signature.append(min_hash)
        return doc_signature

In [35]:
#create signature matrix


def create_signatures(all_documents, k):
    hash_funcs = [MyHash(i+10, i+30) for i in  range(0, k)]
    signature_builder = Signature(hash_funcs)
    signatures = []
    for document in all_documents:
        doc_signature = signature_builder.get_signature(document)
        signatures.append(doc_signature)
    assert len(signatures) == len(all_documents)
    assert len(signatures[0]) == len(signatures[1]) == k
    return signatures

def calculate_min_hash_similarity(signatures):
    signatures = np.array(signatures)
    similarity_matrix = np.full((len(signatures), len(signatures)), -1.0)
    for i, doc1 in enumerate(signatures):
        for j, doc2 in enumerate(signatures):
            if i==j:
                similarity_matrix[i,j] = 1.0
            else:
                doc1 = np.array(doc1)
                doc2 = np.array(doc2)
                similarity =  sum(doc1 == doc2)/len(doc1)
                similarity_matrix[i,j] = similarity
    return similarity_matrix

def mean_error(signatures, similarity_matrix, jaccard_similarity_matrix ):
    mean_squared_error = 0
    counter=0
    for i in range(0, len(signatures)):
        for j in range(0, len(signatures)):
            if i>=j:
                continue
            minhash = similarity_matrix[i,j]
    
            jaccard = jaccard_similarity_matrix[i][j]
            mean_squared_error = mean_squared_error + (minhash-jaccard)**2
            counter +=1
    error = mean_squared_error/counter
    return error

In [44]:
# compare documents
def tune_num_hash_funcs(all_documents): 
    num_hash_func = [100,200,300,400,500]
    min_error = np.inf
    for k in num_hash_func:
        signatures = create_signatures(all_documents, k)
        similarity_matrix = calculate_min_hash_similarity(signatures)
        error = mean_error(signatures, similarity_matrix, jaccard_similarity_matrix )
        if error<min_error:
            best_num = k
            min_error = error
    
    print(f"min error for {best_num}: {min_error}") 
    return best_num
    
num_hash = tune_num_hash_funcs(all_documents)  

min error for 500: 0.0005424679368489426


### Compare

In [45]:
signatures = create_signatures(all_documents, k)
similarity_matrix = calculate_min_hash_similarity(signatures)
similarity_matrix_df = to_spark_df(similarity_matrix)
similarity_matrix_df.show()
jaccard_df.show()

+-----+-----+-----+-----+-----+-----+-----+
|col_0|col_1|col_2|col_3|col_4|col_5|col_6|
+-----+-----+-----+-----+-----+-----+-----+
|  1.0|0.342|0.236|0.266| 0.25| 0.24|0.238|
|0.342|  1.0|0.254|0.222|0.218|0.216|0.264|
|0.236|0.254|  1.0|0.204|  0.2|0.212| 0.21|
|0.266|0.222|0.204|  1.0|0.544|0.474|0.196|
| 0.25|0.218|  0.2|0.544|  1.0|0.526|0.206|
| 0.24|0.216|0.212|0.474|0.526|  1.0|0.198|
|0.238|0.264| 0.21|0.196|0.206|0.198|  1.0|
+-----+-----+-----+-----+-----+-----+-----+

+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|              col_0|              col_1|              col_2|              col_3|              col_4|              col_5|              col_6|
+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|                NaN|0.32629399585921326|0.22456575682382135| 0.2202583276682529|0.216453

## LHS

In [53]:
def build_bands(signature, b):
    step = len(signature) // b
    signature_in_bands = []   
    for i in range(0, len(signature), step):
        signature_in_bands.append(signature[i:i+step])
    if i+k < len(signature):
        signature_in_bands.append(signature[i+step:-1]) 
    return signature_in_bands

class CustomHash:
    def __init__(self, num_buckets):
        self.num_buckets = num_buckets
        
    def hash(self, value):
        return hash(frozenset(value)) % self.num_buckets
        
def are_docs_possibly_similar(sign1, sign2):  
    pairs_to_compare = []
    myhash = CustomHash(100)
    for band1, band2 in zip(sign1,sign2):
        b1 = myhash.hash(band1)
        b2 = myhash.hash(band2)
        if b1 == b2:
            return True
    return False

# def which_bucket(sign1, sign2):  
#     pairs_to_compare = []
#     myhash = CustomHash(100)
#     for band1, band2 in zip(sign1,sign2):
#         b1 = myhash.hash(band1)
#         b2 = myhash.hash(band2)
#         if b1 == b2:
#             return b1,b2
#     return False

In [54]:
all_signatures_in_bands = list(map(lambda x: build_bands(x, 100), signatures))
pairs_to_compare = []
for i, doc1_sign in enumerate(all_signatures_in_bands):
    for j, doc2_sign in enumerate(all_signatures_in_bands):
        if i>=j:
            continue
        if are_docs_possibly_similar(doc1_sign, doc2_sign):
            pairs_to_compare.append(((i, signatures[i]), (j, signatures[j])))

In [55]:
list(map(lambda x: (x[0][0], x[1][0]), pairs_to_compare))

[(0, 1),
 (0, 2),
 (0, 3),
 (0, 4),
 (0, 6),
 (1, 2),
 (1, 3),
 (1, 4),
 (1, 5),
 (1, 6),
 (2, 3),
 (2, 5),
 (2, 6),
 (3, 4),
 (3, 5),
 (4, 5),
 (4, 6)]

# End

### Notes

In [42]:
all_docs_encoded = list(map(lambda x: shingler.to_binary_shingle(x), all_documents))
shingles = shingler.get_all_shingles()
all_docs_encoded.append(shingles)
docs_df = to_spark_df(all_docs_encoded, elements=True)

In [17]:
# elements_data = docs_df.select("elements").rdd.flatMap(lambda x: x).collect()
# elements_data

In [10]:
# shingles_order = range(0,len(shingles))
# shingles_order

range(0, 16529)

In [18]:
from pyspark.sql.functions import rand

In [57]:
signatures = []
df = docs_df
rows = df.collect()
signature_size = len(all_documents)
signatures = []
num_permutations = 3
for n in range(0, num_permutations):
    permuted_df = df.orderBy(rand())
    permuted_rows = permuted_df.collect()
    signature = [-1] * signature_size 
    print(permuted_rows)
    for (num_row, row) in enumerate(permuted_rows):
        # print("signature", signature)
        for i in range(0, signature_size):
            col = row[f"col_{i}"]
            # print(col, signature[col])
            if signature[col]<0 and col==1:
                signature[i]=num_row
                
            
        # if np.all(signature)>0:
        #     break
    signatures.append(signature)   

[Row(col_0=0, col_1=0, col_2=0, col_3=0, col_4=0, col_5=1, elements=15108), Row(col_0=0, col_1=0, col_2=1, col_3=0, col_4=0, col_5=0, elements=8578), Row(col_0=0, col_1=1, col_2=0, col_3=0, col_4=0, col_5=0, elements=4781), Row(col_0=0, col_1=1, col_2=0, col_3=0, col_4=0, col_5=0, elements=6140), Row(col_0=0, col_1=1, col_2=0, col_3=0, col_4=0, col_5=0, elements=4966), Row(col_0=0, col_1=0, col_2=0, col_3=0, col_4=1, col_5=1, elements=11627), Row(col_0=0, col_1=0, col_2=0, col_3=1, col_4=0, col_5=0, elements=9714), Row(col_0=1, col_1=0, col_2=0, col_3=0, col_4=1, col_5=0, elements=24), Row(col_0=0, col_1=0, col_2=0, col_3=0, col_4=0, col_5=1, elements=12836), Row(col_0=0, col_1=1, col_2=0, col_3=0, col_4=0, col_5=0, elements=5600), Row(col_0=0, col_1=0, col_2=1, col_3=0, col_4=0, col_5=0, elements=8455), Row(col_0=0, col_1=0, col_2=0, col_3=0, col_4=0, col_5=1, elements=16444), Row(col_0=1, col_1=0, col_2=0, col_3=0, col_4=0, col_5=0, elements=3110), Row(col_0=0, col_1=0, col_2=0, col_

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)

