# Homework 1

You are to implement the stages of finding textually similar documents based on Jaccard similarity using the shingling, minhashing, and locality-sensitive hashing (LSH) techniques and corresponding algorithms. The implementation can be done using any big data processing framework, such as Apache Spark, Apache Flink, or no framework, e.g., in Java, Python, etc. To test and evaluate your implementation, write a program that uses your implementation to find similar documents in a corpus of 5-10 or more documents, such as web pages or emails.

The stages should be implemented as a collection of classes, modules, functions, or procedures depending on the framework and the language of your choice. Below, we describe sample classes implementing different stages of finding textually similar documents. You do not have to develop the exact same classes and data types described below. Feel free to use data structures that suit you best.

In [1]:
# import packages
from pyspark import SparkContext, SparkConf
import hashlib
import random
import numpy as np
import pandas as pd
import findspark

In [2]:
# initializing Spark
findspark.init()
conf = SparkConf().setAppName("SimDoc").setMaster("local[*]")
sc = SparkContext(conf=conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/07 17:02:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Pre-process data:

In [3]:
# collect texts from dataframes
df = pd.read_csv("datasets/custom_job_dataset.csv", delimiter=',')

docus = df['description'].dropna().tolist()

# creating a list of documents where we store all our texts
documents_map = {i: content for i, content in enumerate(docus)}
documents_map = dict(list(documents_map.items())[:10])

# testing text creation
#print(len(documents_map))
#print(documents_map[0])


### Parameters

In [4]:
k = 5  #Shingle length
signature_len = 100  #Length of minhash signatures
similarity_threshold = 0.8  #Similarity threshold
b = 20  #Number of bands in LSH
r = signature_len // b #Rows per band in LSH

### Shingling method

In [5]:
def create_shingles(id, text, k):
    """Creating k-shingles from a given text

    Args:
        id: ID of the document
        text: Given document
        k: Length of the shingles

    Returns:
        ID of the document with the created shingles
    """
    content = text.lower().replace('.', '').replace(',', '')
    shingles = set(text[i:i + k] for i in range(len(text) - k + 1))
    return id, shingles


def generate_hashed_shingles(text):
    """Generate hashed shingles for each document
    
    Args:
        text: Shingles of a given document

    Returns:
        ID of the document and set of the hashed shingles
    """
    id, shingles = text
    hashed_shingles = set(hash(shingle) for shingle in shingles)
    return id, hashed_shingles


### Jaccard Similarity Calculation

In [6]:
def jaccard_similarity(docu1, docu2):
    """Calculate Jaccard similarity between two sets of shingles
    
    Args:
        docu1: First document for comparison 
        docu2: Second document for comparison

    Returns:
        _type_: _description_
    """
    id1, shingles1 = docu1
    id2, shingles2 = docu2
    intersection = shingles1.intersection(shingles2)
    union = shingles1.union(shingles2)
    similarity = len(intersection) / len(union) if union else 0.0
    return (id1, id2), similarity


In [7]:
# convert documents_map to an RDD
jaccard_rdd = sc.parallelize(documents_map.items())

# create shinglings
shingles_rdd = jaccard_rdd.map(lambda doc: create_shingles(doc[0], doc[1], k))
hashed_shingles_rdd = shingles_rdd.map(generate_hashed_shingles)

# getting all pairs
jaccard_pairs = hashed_shingles_rdd.cartesian(hashed_shingles_rdd).filter(lambda x: x[0][0] < x[1][0]) #remove duplicated pairs

In [8]:
# computing Jaccard similarities 
jaccard_pairs_with_similarities = jaccard_pairs.map(lambda pair: jaccard_similarity(pair[0], pair[1]))
jaccard_pairs_threshold = jaccard_pairs_with_similarities.filter(lambda x: x[1] >= similarity_threshold)

In [9]:
# print out the pairs which are similar enough based on the threshold
result = jaccard_pairs_threshold.collect()
for item in result:
    print(item)

                                                                                

((4, 5), 0.9680111265646731)
((4, 8), 0.9789915966386554)
((5, 8), 0.9667128987517337)


### MinHash

- source of next_prime: http://compoasso.free.fr/primelistweb/page/prime/liste_online_en.php
- minHash source: https://github.com/chrisjmccormick/MinHash/blob/master/runMinHashExample.py
- hash function: h(x) = (a*x + b) % max_shingle_id
    - a, b: random coefficients - these are fixed we generate it ones
    - always choose the minimum from the hashed values


In [10]:
max_shingle_id = 2**32-1

#  h(x) = (a*x + b) % max_shingle_id
# a, b: random coefficients

def get_coefficients():
    #max_shingle_id: it can be any integer number based on the range for the coeffs
    coeffs = []
    while len(coeffs) < signature_len:
        rand_idx = random.randint(1, max_shingle_id) 
        #print('anyad')
        while rand_idx in coeffs:
            rand_idx = random.randint(1, max_shingle_id)
            #print('apad')
        coeffs.append(rand_idx)
    return coeffs

coeffs_a = get_coefficients()
coeffs_b = get_coefficients()
    
def get_minhash_signature(hashed_shingles):
    signature = []
    for i in range(signature_len):
        min_hash_code = min([(coeffs_a[i] * shingle + coeffs_b[i]) % max_shingle_id for shingle in hashed_shingles])
        signature.append(min_hash_code)
    return signature

def minhash_docu(id, hashed_shingles):
    return id, get_minhash_signature(hashed_shingles)

In [11]:
def signature_similarity(docu1, docu2, signature_len):
    id1, signature1 = docu1
    id2, signature2 = docu2
    
    agree_cnt = sum(1 for i in range(signature_len) if signature1[i] == signature2[i])
    similarity = agree_cnt / signature_len
    
    return (id1, id2), similarity

In [12]:
minhash_rdd = hashed_shingles_rdd.map(lambda doc: minhash_docu(doc[0], doc[1]))
minhash_pairs = minhash_rdd.cartesian(minhash_rdd).filter(lambda x: x[0][0] < x[1][0]) # remove duplicates
minhash_with_similarities = minhash_pairs.map(lambda doc: signature_similarity(doc[0], doc[1], signature_len))
minhash_threshold = minhash_with_similarities.filter(lambda x: x[1] >= similarity_threshold)

In [13]:
result = minhash_threshold.collect()
for item in result:
    print(item)



((4, 5), 0.98)
((4, 8), 0.98)
((5, 8), 0.96)


                                                                                

### LSH

In [14]:
def create_bands_by_signature(doc, b, r):
    """Split signature vectors into bands

    Args:
        doc : It's a pair ina  following format (docu_id, signature)
        b : Number of bands
        r: Number of row in each band

    Returns:
        A list where every element is a ((band_id, band_hash), docu_id)
    """
    docu_id, signature = doc
    bands = []
    for i in range(b):
        start_index = i * r
        end_index = start_index + r
        band = tuple(signature[start_index:end_index])  #Convert to tuple for consistency in hashing
        bands.append(((i, hash(band)), docu_id))  #Use (band_id, band_hash) as the key
    return bands


def generate_candidate_pairs(docu):
    """Generating candidate pairs from buckets and avoid duplicates

    Args:
        docu: ((band_id, band_hash), [docu_ids]), where the pair first element is a pair containing the id and the hash of the given band 
            and the pair second element is a list of document ids whose hashed value in the same band are the same

    Returns:
        Candidate pairs where every element is a (docu_id1, docu_id2)
    """
    _, docu_ids = docu
    candidates = []
    if len(docu_ids) > 1:
        for i in range(len(docu_ids)):
            for j in range(i + 1, len(docu_ids)):
                candidates.append((docu_ids[i], docu_ids[j]))
    return candidates


def compute_signature_similarity(docus_with_signitures):
    """Computing similarities between 2 documents in the same band

    Args:
        docus_with_signitures: Getting te data in the following format: ((docu_id1, docu_id2), [(docu_id1, signature), (docu_id2, signature)])

    Returns:
        Document is pairs with the similarity between the 2 documents
    """
    (docu_id1, docu_id2), docs_signatures = docus_with_signitures
    sig_dict = {doc_id: signature for doc_id, signature in docs_signatures}
    if docu_id1 in sig_dict and docu_id2 in sig_dict:
        signature1 = sig_dict[docu_id1]
        signature2 = sig_dict[docu_id2]
        # Calculate the similarity based on matching elements in the signatures
        agree_cnt = sum(1 for i in range(signature_len) if signature1[i] == signature2[i])
        similarity = agree_cnt / signature_len
        return ((docu_id1, docu_id2), similarity)
    else:
        # One of the signatures is missing; return zero similarity
        return ((docu_id1, docu_id2), 0.0)

In [15]:
#Contains ((band_id, band_hash), docu_id) pairs
band_buckets_rdd = minhash_rdd.flatMap(lambda doc: create_bands_by_signature(doc, b, r))

#Group documents by (band_id, band_hash) to get lists of doc_ids in the same bucket so we get the following format ((band_id, band_hash), [docu_ids])
bucketed_docus_rdd = band_buckets_rdd.groupByKey().mapValues(list)
# Generating candidate pairs from buckets with more than one document in the format os (docu_id1, docu_id2)
candidate_pairs_rdd = bucketed_docus_rdd.flatMap(lambda docu: generate_candidate_pairs(docu)).distinct()
# ((docu_id1, (docu_id1, docu_id2)), (docu_id2, (docu_id1, docu_id2))) to make join with the minhash_rdd
candidate_pairs_with_keys_rdd = candidate_pairs_rdd.flatMap(lambda pair: [(pair[0], pair), (pair[1], pair)])

# Attaching each document's signature to its part of the candidate pair
# Joined the candidate_pairs_with_keys with minhash_rdd and get the following format: (docu_id, ((docu_id1, docu_id2), signature))
joined_rdd = candidate_pairs_with_keys_rdd.join(minhash_rdd)

# ((docu_id1, docu_id2), (docu_id, signature)) where key is the pair and we generate 2 result where docu_id=docu_id1 and in the other one docu_id=docu_id2
signatures_by_pair_rdd = joined_rdd.map(lambda x: (x[1][0], (x[0], x[1][1])))

# Grouping by candidate pair to collect both signatures in the following format ((docu_id1, docu_id2), [(docu_id1, signature), (docu_id2, dignature)])
grouped_signatures_rdd = signatures_by_pair_rdd.groupByKey().mapValues(list)

# Calculating similarities and get the following format: ((docu_id1, docu_id2), similarity)
similarities_rdd = grouped_signatures_rdd.map(lambda pair: compute_signature_similarity(pair))
# Filtering with the given threshold
filtered_similar_docs_rdd = similarities_rdd.filter(lambda x: x[1] >= similarity_threshold)


In [16]:
# Print the results
result = filtered_similar_docs_rdd.collect()
for item in result:
    print(item)



((4, 5), 0.98)
((4, 8), 0.98)
((5, 8), 0.96)


                                                                                