<a href="https://colab.research.google.com/github/sehab1611251/Pairwise-Document-Similarity-Computation-for-Big-Data/blob/main/Documents_All_Pairs_Similarity.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive (Colab only).
# NOTE(review): nothing later in this notebook reads from /content/drive (the
# dataset is downloaded to the relative "datasets" folder), so this may be removable.
from google.colab import drive
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [None]:
!pip install --upgrade beir

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting beir
  Downloading beir-1.0.1.tar.gz (50 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.3/50.3 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting sentence-transformers (from beir)
  Downloading sentence-transformers-2.2.2.tar.gz (85 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m86.0/86.0 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pytrec_eval (from beir)
  Downloading pytrec_eval-0.5.tar.gz (15 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting faiss_cpu (from beir)
  Downloading faiss_cpu-1.7.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (17.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.6/17.6 MB[0m [31m68.4 MB/s[0m eta [36m0:00:00[0m

In [None]:
from beir import util, LoggingHandler
from beir.datasets.data_loader import GenericDataLoader
from beir.retrieval.evaluation import EvaluateRetrieval
from beir.retrieval.search.dense import DenseRetrievalExactSearch as DRES
import logging
import os

# Send INFO-and-above log records through BEIR's LoggingHandler, with timestamps.
logging.basicConfig(
    format='%(asctime)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
    handlers=[LoggingHandler()],
)

  from tqdm.autonotebook import tqdm


In [None]:
# Fetch the BEIR "trec-covid" archive into out_dir; data_path is the folder
# handed to GenericDataLoader in the next cell.
dataset = "trec-covid"
url = f"https://public.ukp.informatik.tu-darmstadt.de/thakur/BEIR/datasets/{dataset}.zip"
out_dir = "datasets"
data_path = util.download_and_unzip(url, out_dir)

time: 1.1 ms (started: 2023-05-17 13:23:53 +00:00)


In [None]:
# Load the "test" split. Only `corpus` is used in the rest of this notebook;
# `queries` and `qrels` are not referenced again.
corpus, queries, qrels = GenericDataLoader(
    data_folder=data_path,
).load(split="test")

  0%|          | 0/171332 [00:00<?, ?it/s]

time: 1.84 s (started: 2023-05-17 13:24:05 +00:00)


**Because the dataset is large and computing all pairwise similarities is expensive, I work with a subset of the data: the first 500 documents of the corpus.**

In [None]:
from itertools import islice

subset_size = 500  # number of documents to keep

# Keep the first `subset_size` documents in the corpus's iteration order
# (a deterministic prefix, not a random sample). islice handles 0 correctly;
# a loop that only checks len() after inserting would copy the entire corpus
# for subset_size == 0. A negative size now raises ValueError instead of
# silently keeping everything.
corpus_subset = dict(islice(corpus.items(), subset_size))

In [None]:
# Sanity check: the subset holds subset_size documents (or the whole corpus,
# if the corpus is smaller than subset_size).
assert len(corpus_subset) == min(subset_size, len(corpus))
len(corpus_subset)

500

time: 3.02 ms (started: 2023-05-17 13:24:19 +00:00)


In [None]:
# Show the first 5 documents of corpus_subset (ID and its field dict).
print("Corpus:")
for doc_id, doc_text in list(corpus_subset.items())[:5]:
    print(f"  Document ID: {doc_id}")
    print(f"  Document text: {doc_text}")

Corpus:
  Document ID: ug7v899j
  Document text: {'text': 'OBJECTIVE: This retrospective chart review describes the epidemiology and clinical features of 40 patients with culture-proven Mycoplasma pneumoniae infections at King Abdulaziz University Hospital, Jeddah, Saudi Arabia. METHODS: Patients with positive M. pneumoniae cultures from respiratory specimens from January 1997 through December 1998 were identified through the Microbiology records. Charts of patients were reviewed. RESULTS: 40 patients were identified, 33 (82.5%) of whom required admission. Most infections (92.5%) were community-acquired. The infection affected all age groups but was most common in infants (32.5%) and pre-school children (22.5%). It occurred year-round but was most common in the fall (35%) and spring (30%). More than three-quarters of patients (77.5%) had comorbidities. Twenty-four isolates (60%) were associated with pneumonia, 14 (35%) with upper respiratory tract infections, and 2 (5%) with bronchioli

**Pre-processing the corpus (TF-IDF vectorization)**

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np

In [None]:
# Create the TF-IDF vectorizer; it is fitted in a later cell. norm="l2" is
# scikit-learn's default, spelled out because L2-normalised rows make the cosine
# similarity of two documents equal to the dot product of their rows.
vectorizer = TfidfVectorizer(norm="l2")

In [None]:
# Flatten corpus_subset into a list of {'id', 'text'} records, keeping only the
# 'text' field of each document.
corpus_list = [
    {'id': doc_id, 'text': fields['text']}
    for doc_id, fields in corpus_subset.items()
]

In [None]:
# Fit the vocabulary/IDF weights on the subset and build the sparse
# document-term TF-IDF matrix, one row per record in corpus_list order.
X = vectorizer.fit_transform([record['text'] for record in corpus_list])

In [None]:
# Map each document ID to its TF-IDF row. Iterating the sparse matrix yields
# one row per document, in the same order as corpus_list.
corpus_vectorized = {
    record['id']: row
    for record, row in zip(corpus_list, X)
}

In [None]:
# Sanity check: one dict entry per TF-IDF row. Fewer entries would mean rows
# were overwritten under a shared document ID.
assert len(corpus_vectorized) == X.shape[0]
len(corpus_vectorized)

500

time: 2.93 ms (started: 2023-05-17 13:24:35 +00:00)


In [None]:
# Print the first 3 entries of the vectorized corpus. Each value is a 1 x n_terms
# sparse row in CSR format; printing it lists the non-zero entries as
# "(row, column)  weight". For example, "(0, 4338)  0.0273..." means column 4338
# (a vocabulary term) of that document's row has TF-IDF weight 0.0273...
print("Corpus:")
for doc_id, doc_vec in list(corpus_vectorized.items())[:3]:
    print(f"  Document ID: {doc_id}")
    print(f"  Document vector: {doc_vec}")

Corpus:
  Document ID: ug7v899j
  Document text:   (0, 4338)	0.027321845857352013
  (0, 7603)	0.03589175763829515
  (0, 5983)	0.036594973612808814
  (0, 7206)	0.06955036177702395
  (0, 9118)	0.027273805227828955
  (0, 3690)	0.05137505815384527
  (0, 3773)	0.013250170882229637
  (0, 3437)	0.06216031950650572
  (0, 2514)	0.025462963736577804
  (0, 7473)	0.04880389023467086
  (0, 8436)	0.03873443975147449
  (0, 6534)	0.02580971226014836
  (0, 2134)	0.02896533525167126
  (0, 6529)	0.027220787495635945
  (0, 9968)	0.041801584414995326
  (0, 9548)	0.046124482021992715
  (0, 2984)	0.03635537752218948
  (0, 1221)	0.038442323469250805
  (0, 2753)	0.1643108317079625
  (0, 51)	0.0793242365526593
  (0, 2085)	0.054770277235987506
  (0, 8542)	0.03502094099401403
  (0, 3765)	0.03709095035715525
  (0, 7704)	0.05137505815384527
  (0, 7705)	0.05351452429251094
  :	:
  (0, 4423)	0.040678275129186876
  (0, 9598)	0.05619393250518908
  (0, 407)	0.06955036177702395
  (0, 5210)	0.06955036177702395
  (0, 1073)

**Selecting the Threshold**

In [None]:
# Minimum cosine similarity for two documents to be reported as a similar pair.
threshold = 0.3

In [None]:
!pip install ipython-autotime

%load_ext autotime

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting ipython-autotime
  Downloading ipython_autotime-0.3.1-py2.py3-none-any.whl (6.8 kB)
Collecting jedi>=0.16 (from ipython->ipython-autotime)
  Downloading jedi-0.18.2-py2.py3-none-any.whl (1.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m19.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: jedi, ipython-autotime
Successfully installed ipython-autotime-0.3.1 jedi-0.18.2
time: 274 µs (started: 2023-05-17 13:01:07 +00:00)


**Sequential Solution**

In [None]:
import numpy as np
from scipy.sparse import csr_matrix

def cosine_similarity(d1, d2):
    """Cosine similarity between two sparse 1 x n row vectors d1 and d2.

    Returns 0.0 if either vector has zero norm (e.g. a document whose text
    produced no TF-IDF features) instead of evaluating 0 / 0, which would
    give nan plus a RuntimeWarning.
    """
    # .toarray() rather than the .A shorthand: .A on sparse matrices was
    # deprecated in SciPy 1.11 and removed in SciPy 1.14.
    v1 = d1.toarray()
    v2 = d2.toarray()
    dot_product = np.dot(v1, v2.T)[0, 0]
    norm1 = np.linalg.norm(v1)  # L2 norm of the first vector
    norm2 = np.linalg.norm(v2)  # L2 norm of the second vector
    if norm1 == 0 or norm2 == 0:
        return 0.0
    return dot_product / (norm1 * norm2)

def find_similar_documents(corpus_vectorized, threshold):
    """Brute-force all-pairs similarity search over a vectorized corpus.

    Every ordered pair of distinct documents is compared, so each similar pair
    is reported twice: once as (a, b) and once as (b, a).

    Args:
        corpus_vectorized: dict mapping document ID -> sparse 1 x n row vector
            (here, the rows of the TF-IDF matrix).
        threshold: minimum cosine similarity for a pair to be reported.

    Returns:
        List of (id1, id2) tuples with cosine_similarity >= threshold, in the
        iteration order of corpus_vectorized. (It returns the pairs themselves,
        not a count.)
    """
    # Convert each vector once up front instead of inside the O(n^2) inner loop.
    docs = [(doc_id, csr_matrix(vec)) for doc_id, vec in corpus_vectorized.items()]
    sim_docs = []
    for id1, d1 in docs:
        for id2, d2 in docs:
            # Skip self-pairs (same document ID).
            if id1 != id2 and cosine_similarity(d1, d2) >= threshold:
                sim_docs.append((id1, id2))
    return sim_docs

In [None]:
# Brute-force search over all ordered pairs of distinct documents.
S_result = find_similar_documents(
    corpus_vectorized=corpus_vectorized,
    threshold=threshold,
)

In [None]:
print(f"The similar document pairs: {S_result}")

The similar document pairs: [('ug7v899j', '00rk8fb5'), ('ejv2xln0', 'niii78fr'), ('bbvxu8op', 'c8yzoen7'), ('zowp10ts', 'cgl34ykt'), ('zowp10ts', '4tvrhpss'), ('754nln40', 'ipwm9uob'), ('p34ezktf', 'fite9vs8'), ('p34ezktf', '05ppugs7'), ('cgl34ykt', 'zowp10ts'), ('cgl34ykt', 'x7wva1ax'), ('cgl34ykt', '4tvrhpss'), ('ajlctjeb', 'boto4h8x'), ('oluq7v0h', '094d0rn6'), ('tw6wusxe', '05ppugs7'), ('tw6wusxe', 'utmt5sva'), ('58czem0j', '1dus0u4m'), ('5eqdrd52', 'tex6bgab'), ('xgwbl8em', '69gftii4'), ('xgwbl8em', '8crvjzz4'), ('xgwbl8em', 'pc4x5e24'), ('69gftii4', 'xgwbl8em'), ('5fl0rk90', 'ze511t38'), ('1n0rg5vd', 'iy4c7404'), ('1n0rg5vd', 'vexwisnz'), ('nzh87aux', 'yxtdqjay'), ('nzh87aux', '8d4f4gmw'), ('fvfjz7al', 'fite9vs8'), ('fvfjz7al', '05ppugs7'), ('s4y6uxsb', 'z5klydpi'), ('9zmyojbu', '7gk8uzo0'), ('z5klydpi', 's4y6uxsb'), ('2ks9iimj', 'sasijnks'), ('2ks9iimj', '87mjdccj'), ('2ks9iimj', 'yxtdqjay'), ('1n69h3i3', 'epkw6222'), ('yfn8sy1m', 'wcyv6w47'), ('9zm4per4', 'wtvfow2f'), ('9zm4per

In [None]:
print(f"The number of similar document pairs: {len(S_result)}")

The number of similar document pairs: 156
time: 569 µs (started: 2023-05-17 13:27:17 +00:00)


**Parallel Solution (MapReduce & Apache Spark)**

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=b0c40d1ecaf87d8ba251ab5ab657200d2baf2d236de880cf164e7beae8bc1e6a
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0
time: 38 s (started: 2023-05-17 13:28:50 +00:00)


In [None]:
from pyspark.sql import SparkSession
from scipy.sparse import find
from collections import defaultdict

In [None]:
def map(doc_id, document, threshold):
    """Map step: emit (term, (doc_id, document)) for each indexed term of a document.

    b(document, threshold) returns the first term at which the running sum of
    document[t] * d_star[t] reaches the threshold. For a pair whose cosine
    similarity reaches the threshold, that prefix sum up to the pair's largest
    shared term is already >= threshold, so the largest shared term is at or
    after each document's boundary. The comparison must therefore be `>=`: a
    strict `>` drops the boundary term itself and silently loses pairs whose
    largest shared term is exactly that term.

    NOTE: this name shadows the builtin map() for the rest of the notebook.

    Returns:
        List of (term, (doc_id, document)) tuples, in the order find() yields terms.
    """
    boundary = b(document, threshold)  # computed once per document, not once per term
    return [(term, (doc_id, document)) for term in find(document)[1] if term >= boundary]

def reduce(t, values, threshold):
    """Reduce step: score the candidate document pairs grouped under term `t`.

    A pair is scored here only if `t` is the largest term the two documents
    share. A pair that co-occurs under several keys is therefore reported at
    most once per ordering.

    NOTE: this name shadows functools.reduce if that is ever imported by name.

    Args:
        t: term (column) index used as the grouping key.
        values: iterable of (doc_id, sparse 1 x n row vector) pairs.
        threshold: minimum cosine similarity.

    Returns:
        List of (id1, id2, similarity) for every ordered pair of distinct
        documents whose largest shared term is `t` and whose similarity is
        >= threshold.
    """
    # Compute each document's term set once, not once per pair.
    docs = [(doc_id, doc, set(find(doc)[1])) for doc_id, doc in values]
    result = []
    for id1, d1, terms1 in docs:
        for id2, d2, terms2 in docs:
            if id1 == id2:
                continue
            shared = terms1 & terms2
            # Only the largest shared term "owns" the pair. An empty
            # intersection (possible only if called with documents that do not
            # contain t) is skipped instead of crashing max().
            if not shared or max(shared) != t:
                continue
            sim = cosine_similarity(d1, d2)
            if sim >= threshold:
                result.append((id1, id2, sim))
    return result

def cosine_similarity(d1, d2):
    """Cosine similarity between two sparse 1 x n row vectors d1 and d2.

    Another cosine_similarity is also defined in the sequential section;
    whichever cell runs last wins. Returns 0.0 if either vector has zero
    norm, instead of evaluating 0 / 0 (nan plus a RuntimeWarning).
    """
    # .toarray() rather than the .A shorthand: .A on sparse matrices was
    # deprecated in SciPy 1.11 and removed in SciPy 1.14.
    v1 = d1.toarray()
    v2 = d2.toarray()
    dot_product = np.dot(v1, v2.T)[0, 0]  # dot product between d1 and d2
    norm1 = np.linalg.norm(v1)
    norm2 = np.linalg.norm(v2)
    if norm1 == 0 or norm2 == 0:
        return 0.0
    return dot_product / (norm1 * norm2)

def b(document, threshold):  # Prefix Filtering function
    """Prefix-filtering boundary term for `document`.

    Walks the document's terms in ascending column order and accumulates
    document[t] * d_star[t], where d_star[t] is the largest weight of term t
    in the corpus. For non-negative weights such as TF-IDF, this bounds how
    much term t can contribute to the document's similarity with any other
    document.

    Returns:
        The first term index at which the running sum reaches `threshold`, or
        float('inf') if it never does. In that case the document cannot reach
        the threshold with any other document.

    Reads the module-level `d_star` mapping, which find_similar_documents_parallel
    assigns before the map phase runs.
    """
    _, cols, vals = find(document)
    # find() already returns columns in ascending order; sorting makes the
    # term order that callers compare against explicit.
    order = np.argsort(cols, kind="stable")
    running = 0
    # Use the values find() already returned instead of document[0, t]
    # element lookups.
    for t, weight in zip(cols[order], vals[order]):
        running += weight * d_star[t]
        if running >= threshold:
            return t
    return float('inf')

def _term_weights(document):
    """(term, weight) pairs of a sparse row, as plain Python ints/floats."""
    _, cols, vals = find(document)
    return zip(cols.tolist(), vals.tolist())


def find_similar_documents_parallel(corpus_vectorized_rdd, threshold):
    """All-pairs similarity search with prefix filtering as a Spark map/reduce job.

    1. d_star[t] = largest weight of term t across the corpus (clamped at 0),
       which b() uses to find each document's prefix-filtering boundary.
    2. map() emits (term, (doc_id, document)) for the terms it selects via b().
    3. groupByKey + reduce() compare the documents sharing each term and keep
       pairs whose largest shared term is that key and whose cosine similarity
       is >= threshold.

    Args:
        corpus_vectorized_rdd: RDD of (doc_id, sparse 1 x n row vector) pairs.
        threshold: minimum cosine similarity.

    Returns:
        List of (id1, id2, similarity) tuples collected to the driver.
    """
    global d_star  # b() reads d_star as a module-level global
    # Per-term maximum weight, computed on the executors. The previous approach
    # collected every document to the driver, which defeats the purpose of
    # distributing the corpus.
    term_max = (
        corpus_vectorized_rdd
        .flatMap(lambda kv: _term_weights(kv[1]))
        .reduceByKey(max)
        .collectAsMap()
    )
    # Clamp at 0.0 to match a running max that starts from defaultdict(float)'s 0.0.
    d_star = defaultdict(float, {t: max(0.0, w) for t, w in term_max.items()})
    # NOTE(review): d_star presumably reaches the executors because Spark
    # serializes the map closure together with the globals it references; a
    # broadcast variable would make that explicit.

    # Map phase: candidate (term, document) pairs.
    pairs_rdd = corpus_vectorized_rdd.flatMap(lambda x: map(x[0], x[1], threshold))

    # Reduce phase: group documents by term and score the candidate pairs.
    result_rdd = pairs_rdd.groupByKey().flatMap(lambda x: reduce(x[0], list(x[1]), threshold))

    return result_rdd.collect()

In [None]:
# Start (or reuse) a SparkSession and grab its SparkContext.
# NOTE(review): no master URL is set, so in Colab Spark presumably runs in local
# mode, where spark.executor.cores likely has no effect — confirm.
spark = (
    SparkSession.builder
    .appName("MySimilaritySearch")
    .config("spark.executor.cores", "16")
    .getOrCreate()
)
sc = spark.sparkContext

In [None]:
# Distribute the (doc_id, TF-IDF row) pairs as an RDD.
corpus_vectorized_rdd = sc.parallelize(list(corpus_vectorized.items()))

In [None]:
# Run the Spark job; each result is an (id1, id2, similarity) tuple.
P_result = find_similar_documents_parallel(
    corpus_vectorized_rdd,
    threshold=threshold,
)

time: 17min 6s (started: 2023-05-17 13:29:37 +00:00)


In [None]:
print(f"The similar document pairs: {P_result}")


The similar document pairs: [('ug7v899j', '00rk8fb5', 0.32579619694172535), ('ejv2xln0', 'niii78fr', 0.5300750651914734), ('zowp10ts', 'cgl34ykt', 0.38022471904420285), ('754nln40', 'ipwm9uob', 0.34639129893658316), ('p34ezktf', 'fite9vs8', 0.3040123496349497), ('cgl34ykt', 'zowp10ts', 0.38022471904420285), ('cgl34ykt', 'x7wva1ax', 0.41214688251661163), ('tw6wusxe', '05ppugs7', 0.32757212270821123), ('58czem0j', '1dus0u4m', 0.31120839923843807), ('5eqdrd52', 'tex6bgab', 0.3182368926634537), ('5fl0rk90', 'ze511t38', 0.3053971064147807), ('1n0rg5vd', 'iy4c7404', 0.4275541982910774), ('s4y6uxsb', 'z5klydpi', 0.3085262235954564), ('9zmyojbu', '7gk8uzo0', 0.30536684043101203), ('z5klydpi', 's4y6uxsb', 0.3085262235954564), ('2ks9iimj', 'sasijnks', 0.3184332283921504), ('2ks9iimj', '87mjdccj', 0.37752865496002624), ('2ks9iimj', 'yxtdqjay', 0.32512236385033877), ('yfn8sy1m', 'wcyv6w47', 0.32220267902434263), ('9zm4per4', 'q26f8pv4', 0.3532136255448324), ('noscodew', '5j496cx0', 0.3530251884180

In [None]:
print(f"The number of similar document pairs: {len(P_result)}")

# Cross-check against the sequential brute-force result: the parallel job must
# report exactly the same ordered pairs, each exactly once.
assert len(P_result) == len(S_result)
assert {(id1, id2) for id1, id2, _ in P_result} == set(S_result), \
    "parallel and sequential results differ"

The number of similar document pairs: 156
time: 632 µs (started: 2023-05-17 13:46:58 +00:00)
