In [1]:
!pip install beir
!pip install pandas
!pip install sklearn
!pip install -U pip setuptools wheel
!pip install -U spacy
!python -m spacy download en_core_web_sm
!pip install ipywidgets

Collecting en-core-web-sm==3.5.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.5.0/en_core_web_sm-3.5.0-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')


In [2]:
from typing import Dict, List, Tuple

from beir import util
from beir.datasets.data_loader import GenericDataLoader

import os
'''try:
    import ipywidgets
    from tqdm.auto import tqdm
except ModuleNotFoundError:
    from tqdm import tqdm'''
from tqdm import tqdm

import spacy

import time
import numpy as np
import pandas as pd
import pyspark.pandas as ps
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

  from tqdm.autonotebook import tqdm


# Available Datasets

| Dataset   | Website| BEIR-Name | Domain     | Relevancy| Queries  | Documents | Avg. Docs/Q | Download | 
| -------- | -----| ---------| ----------- | ---------| ---------| --------- | ------| ------------| 
| MSMARCO    | [``Homepage``](https://microsoft.github.io/msmarco/)| ``msmarco`` | Misc.       |  Binary  |  6,980   |  8.84M     |    1.1 | Yes |  
| TREC-COVID |  [``Homepage``](https://ir.nist.gov/covidSubmit/index.html)| ``trec-covid``| Bio-Medical |  3-level|50|  171K| 493.5 | Yes | 
| NFCorpus   | [``Homepage``](https://www.cl.uni-heidelberg.de/statnlpgroup/nfcorpus/) | ``nfcorpus``  | Bio-Medical |  3-level |  323     |  3.6K     |  38.2 | Yes |
| BioASQ     | [``Homepage``](http://bioasq.org) | ``bioasq``| Bio-Medical |  Binary  |   500    |  14.91M    |  8.05 | No | 
| NQ         | [``Homepage``](https://ai.google.com/research/NaturalQuestions) | ``nq``| Wikipedia   |  Binary  |  3,452   |  2.68M  |  1.2 | Yes | 
| HotpotQA   | [``Homepage``](https://hotpotqa.github.io) | ``hotpotqa``| Wikipedia   |  Binary  |  7,405   |  5.23M  |  2.0 | Yes |
| FiQA-2018  | [``Homepage``](https://sites.google.com/view/fiqa/) | ``fiqa``    | Finance     |  Binary  |  648     |  57K    |  2.6 | Yes | 
| Signal-1M (RT) | [``Homepage``](https://research.signal-ai.com/datasets/signal1m-tweetir.html)| ``signal1m`` | Twitter     |  3-level  |   97   |  2.86M  |  19.6 | No |
| TREC-NEWS  | [``Homepage``](https://trec.nist.gov/data/news2019.html) | ``trec-news``    | News     |  5-level  |   57    |  595K    |  19.6 | No |
| ArguAna    | [``Homepage``](http://argumentation.bplaced.net/arguana/data) | ``arguana`` | Misc.       |  Binary  |  1,406     |  8.67K    |  1.0 | Yes |
| Touche-2020| [``Homepage``](https://webis.de/events/touche-20/shared-task-1.html) | ``webis-touche2020``| Misc.       |  6-level  |  49     |  382K    |  49.2 |  Yes |
| CQADupstack| [``Homepage``](http://nlp.cis.unimelb.edu.au/resources/cqadupstack/) | ``cqadupstack``| StackEx.      |  Binary  |  13,145 |  457K  |  1.4 |  Yes |
| Quora| [``Homepage``](https://www.quora.com/q/quoradata/First-Quora-Dataset-Release-Question-Pairs) | ``quora``| Quora  | Binary  |  10,000     |  523K    |  1.6 |  Yes | 
| DBPedia | [``Homepage``](https://github.com/iai-group/DBpedia-Entity/) | ``dbpedia-entity``| Wikipedia |  3-level  |  400    |  4.63M    |  38.2 |  Yes | 
| SCIDOCS| [``Homepage``](https://allenai.org/data/scidocs) | ``scidocs``| Scientific |  Binary  |  1,000     |  25K    |  4.9 |  Yes | 
| FEVER| [``Homepage``](http://fever.ai) | ``fever``| Wikipedia     |  Binary  |  6,666     |  5.42M    |  1.2|  Yes | 
| Climate-FEVER| [``Homepage``](http://climatefever.ai) | ``climate-fever``| Wikipedia |  Binary  |  1,535     |  5.42M |  3.0 |  Yes |
| SciFact| [``Homepage``](https://github.com/allenai/scifact) | ``scifact``| Scientific |  Binary  |  300     |  5K    |  1.1 |  Yes |


# Dataset Download & Pre-Processing

In [3]:
def download_dataset(dataset: str) -> Dict[str, List[str]]:
	'''
	PURPOSE: download the dataset
	ARGUMENTS:
		- dataset (str): string describing the beir dataset
	RETURN:
		- (List[str]) list of documents
	'''
	data_path = f'datasets/{dataset}'
	if not os.path.isdir(data_path):
		url = f'https://public.ukp.informatik.tu-darmstadt.de/thakur/BEIR/datasets/{dataset}.zip'
		out_dir = os.path.join(os.getcwd(), 'datasets')
		data_path = util.download_and_unzip(url, out_dir)
		print(f'Dataset downloaded here: {data_path}')
	corpus, _, _ = GenericDataLoader(data_path).load(split="test")
	return {doc_id: title_text['title'] + ' ' + title_text['text'] for doc_id, title_text in corpus.items()}

datasets = ['scifact'] # Choosen datasets #, 'nfcorpus'
threshold = 0.8

datasets_data = {dataset: download_dataset(dataset) for dataset in datasets}

  0%|          | 0/5183 [00:00<?, ?it/s]

## Pre-Processing with Spacy

In [4]:
nlp = spacy.load('en_core_web_sm')
stopwords = nlp.Defaults.stop_words
clean_tokens = lambda tokens : ' '.join([token.lemma_.lower() for token in tokens if token not in stopwords and not token.is_punct])
# Lambda for text pre-processing

In [19]:
def pre_process(dictionary):
	'''
	PURPOSE: preprocess the text using spaCy
	ARGUMENTS:
		- corpus (str): string of document to pre-process
	RETURN:
		- str: cleaned document
	'''
	key, value = dictionary
	return {key: clean_tokens(nlp(value))}



def documents_preprocessing(dataset_name: str, documents: Dict[str, str]) -> Dict[str, str]:
	'''
	PURPOSE: preprocess all the documents and query for the relative dataset
	ARGUMENTS:
		- dataset_name (str): string describing the dataset name
		- documents (Dict[str, List[str]]): doc_id, document_text dictionary
	RETURN: 
		- new_documents (Dict[str, List[str]]): dictionary of cleaned documents
	'''
 
	path_datasets = os.path.join(os.getcwd(), 'datasets')
	if os.path.exists(os.path.join(path_datasets, dataset_name, 'pre_processed_corpus.parquet')):
		return pd.read_parquet(os.path.join(path_datasets, dataset_name, 'pre_processed_corpus.parquet')).to_dict()[0]
	
 
	new_documents = {}

	with ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor:
		results = list(
				tqdm(
					executor.map(pre_process, documents.items()),
					total=len(documents),
					desc=f'{dataset_name} - Documents Pre-Processing',
				)
			)

	for result in results:
		new_documents |= result

	write_pd = pd.DataFrame.from_dict(new_documents, orient='index')
	write_pd.to_parquet(os.path.join(path_datasets, dataset_name, 'pre_processed_corpus.parquet'))

	return new_documents

In [20]:
# Dictionary of dataset: pre-processed documents
pre_processed_data = {dataset: documents_preprocessing(dataset, docs_dict) for dataset, docs_dict in datasets_data.items()}

# Sequential Version - All Pairs Documents Similarity

In [22]:
def classic_all_pairs_docs_sim(docs_list: List[str], threshold: float):
    count = 0
    doc_similaritis = []
    vectorizer = TfidfVectorizer()
    features = vectorizer.fit_transform(docs_list)
    
    
    start = time.time()
    similarities = cosine_similarity(features)
    for doc_1, doc_sims in enumerate(similarities):
        for doc_2, doc_sim in enumerate(doc_sims[(doc_1+1):], start=doc_1+1):
            if doc_sim >= threshold:
                count += 1
                doc_similaritis.append((doc_1, doc_2, doc_sim))
    end = time.time()
    
    
    return doc_similaritis, {'threshold': threshold, 'similar_doc': count, 'elapsed': end-start}

In [23]:
def npargwhere_all_pairs_docs_sim(docs_list: List[str], threshold: float):
    print()
    vectorizer = TfidfVectorizer()
    features = vectorizer.fit_transform(docs_list)
    
    
    start = time.time()
    similarities = cosine_similarity(features)
    np.fill_diagonal(similarities, 0.0)
    idx_doc_similaritis = np.argwhere(similarities > threshold)
    end = time.time()
    

    return [(similar.tolist(), similarities[similar[0], similar[1]]) for similar in idx_doc_similaritis], \
        {'threshold': threshold, 'similar_doc': int(len(idx_doc_similaritis)/2), 'elapsed': end-start}

In [24]:
def perform_all_pairs_docs_sim(data, threshold):
    result = {}
    for datasets_name, docs_list in data.items():
        print(f'All Documents Pairs Similarities - {datasets_name}')
        similar_list, stat = npargwhere_all_pairs_docs_sim(list(docs_list.values()), threshold)
        for tuple in similar_list: print(tuple)
        result[datasets_name] = stat
    return result

In [25]:
res = perform_all_pairs_docs_sim(pre_processed_data, threshold) 
res

All Documents Pairs Similarities - scifact

([331, 3623], 0.8165055418303958)
([450, 4681], 0.8038672910026508)
([1576, 4295], 0.8486930447199021)
([1578, 5026], 0.8631119628654668)
([2131, 3236], 0.8750903273705578)
([3236, 2131], 0.8750903273705578)
([3623, 331], 0.8165055418303958)
([4295, 1576], 0.8486930447199021)
([4491, 4588], 0.8982349824099298)
([4588, 4491], 0.8982349824099298)
([4681, 450], 0.8038672910026508)
([5026, 1578], 0.8631119628654668)


{'scifact': {'threshold': 0.8, 'similar_doc': 6, 'elapsed': 2.455399751663208}}

In [27]:
def perform_all_pairs_docs_sim(data, threshold):
    result = {}
    for datasets_name, docs_list in data.items():
        print(f'All Documents Pairs Similarities - {datasets_name}')
        similar_list, stat = classic_all_pairs_docs_sim(list(docs_list.values()), threshold)
        for tuple in similar_list: print(tuple)
        result[datasets_name] = stat
    return result

In [28]:
res = perform_all_pairs_docs_sim(pre_processed_data, threshold) 
res

All Documents Pairs Similarities - scifact
(331, 3623, 0.8165055418303958)
(450, 4681, 0.8038672910026508)
(1576, 4295, 0.8486930447199021)
(1578, 5026, 0.8631119628654668)
(2131, 3236, 0.8750903273705578)
(4491, 4588, 0.8982349824099298)


{'scifact': {'threshold': 0.8, 'similar_doc': 6, 'elapsed': 4.336603403091431}}

# Parallel Version with Map Reduce from PySpark - All Pairs Documents Similarity

## Download PySpark 

In [29]:
!pip install pyspark



## Active PySpark

In [None]:
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder \
    .master('local[1]') \
    .config("spark.driver.memory", "15g") \
    .appName("all_pairs_docs_similarity.com") \
    .getOrCreate()

sc = spark.sparkContext

## PySpark Dataset Creation

In [None]:
# Create the features and columns vectors
vectorizer = TfidfVectorizer()
tfidf_features = vectorizer.fit_transform(pre_processed_data['scifact'].values())
tfidf_columns = vectorizer.get_feature_names_out()

In [None]:
# Create a dictionary of key document ID and value the list of TF-IDF values
scifact_dict = dict(
    zip(pre_processed_data['scifact'].keys(), tfidf_features.toarray())
)

scifact_rdd = sc.parallelize(scifact_dict) #tfidf_features.toarray()
scifact_rdd.collect()

# Get the d_star with the maximum TF-IDF value of each term from any documents
# todo

In [None]:
# Create the pandas Dataframe and convert it into a PySpark Dataframe
scifact_tfidf_pdf  = pd.DataFrame(data=tfidf_features.toarray(), index=pre_processed_data['scifact'].keys(), columns=tfidf_columns)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled","true")
scifact_tfidf_df = spark.createDataFrame(scifact_tfidf_pdf)
scifact_rdd = scifact_tfidf_df.rdd # Obtain the rrd from the dataframe
print(scifact_rdd.take(5))

# Get the d_star with the maximum TF-IDF value of each term from any documents
d_star = scifact_tfidf_pdf.max(axis='rows')

In [None]:
# Map function
def map_fun(doc_id, tf_idf_list):
    for idx, _ in enumerate(tf_idf_list): 
        return (scifact_tfidf_df.columns[idx], [doc_id, tf_idf_list])

In [None]:
# Reduce function
def reduce_fun(doc_id_doc_list):
    threshold = 0.9
    for id1, d1 in doc_id_doc_list:
        for id2, d2 in doc_id_doc_list:
            if cosine_similarity(d1, d2) >= threshold:
                print([id1, id2, cosine_similarity(d1, d2)])

In [None]:
start = time.time()
result = scifact_rdd.map(map_fun).reduceByKey(reduce_fun)
end = time.time()
result.collect(), (start-end)