In [1]:
# -*- coding: utf-8 -*-
# -*- authors : Vincent Roduit -*-
# -*- date : 2024-09-30 -*-
# -*- Last revision: 2024-09-30 by Vincent Roduit -*-
# -*- python version : 3.9.19 -*-
# -*- Description: BM25 document retrieval pipeline (constants, utilities, corpus classes) -*-

# <center> CS - 423: Distributed Information Systems </center>
## <center> Ecole Polytechnique Fédérale de Lausanne </center>
### <center>Project 1: Document Retrieval </center>
---

In [2]:
#import files
import os
import multiprocessing
import pickle as pkl
import pandas as pd
from tqdm import tqdm
from collections import Counter, defaultdict
import math
import numpy as np
from time import time
from nltk.tokenize import RegexpTokenizer
from nltk.stem import PorterStemmer

# 1. Constants

In [3]:
# Number of CPU cores reported by the operating system
CORES = multiprocessing.cpu_count()

# Root folder that contains every input dataset
DATA_FOLDER = "/kaggle/input/"

# Folder from which the precomputed pickle files are loaded
PICKLES_FOLDER = os.path.join(DATA_FOLDER, "pickle-files-dis")

# Folder that holds one stopword file per language
STOPWORDS_FOLDER = os.path.join(DATA_FOLDER, "stopwords")

def load_stopwords(path):
    """Load a stopword list from a text file holding one stopword per line.

    Args:
        * path (str): the path to the stopword file.

    Returns:
        * list: the stopwords in file order, without line terminators.
    """
    # The lists cover non-ASCII languages (e.g. Arabic, Korean), so decode
    # explicitly instead of relying on the platform default encoding.
    # NOTE(review): the files are assumed to be UTF-8 encoded - confirm.
    with open(path, 'r', encoding='utf-8') as f:
        stopwords = f.read().splitlines()
    return stopwords

# Stopword sets keyed by language code; each set is read from the file named
# after the language inside STOPWORDS_FOLDER.
STOP_WORDS = {
    code: set(load_stopwords(os.path.join(STOPWORDS_FOLDER, language)))
    for code, language in (
        ("en", "english"),
        ("fr", "french"),
        ("de", "german"),
        ("es", "spanish"),
        ("it", "italian"),
        ("ko", "korean"),
        ("ar", "arabic"),
    )
}

# Paths to the corpus file and to the test queries file
CORPUS = os.path.join(DATA_FOLDER, "dis-project-1-document-retrieval", "corpus.json")
QUERIES = os.path.join(DATA_FOLDER,"dis-project-1-document-retrieval", "test.csv")

# 2. Util functions

In [4]:
def save_data(data: any, file_name: str, folder: str = os.path.join(DATA_FOLDER, "pickles")):
    """Pickle the data to a file, creating the destination folder if needed.

    Args:
        * data (any): the data to save

        * file_name (str): the name of the file

        * folder (str): the folder where to save the file
    """
    # exist_ok=True replaces the separate existence check, which could fail
    # if the folder was created between the check and the makedirs call.
    os.makedirs(folder, exist_ok=True)

    file_path = os.path.join(folder, file_name)

    with open(file_path, 'wb') as handle:
        pkl.dump(data, handle)

def load_data(file_name: str, folder: str = os.path.join(DATA_FOLDER, "pickles")) -> any:
    """Read back an object that was pickled to a file

    Args:
        * file_name (str): the name of the file

        * folder (str): the folder that contains the file

    Returns:
        * any: the unpickled data
    """
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    with open(os.path.join(folder, file_name), 'rb') as handle:
        return pkl.load(handle)

def create_term_to_id(tokens_list):
    """Assign an integer ID to every distinct term of the tokenized documents.

    IDs start at 0 and are handed out in order of first appearance.

    Args:
        * tokens_list (list): the documents, each one an iterable of terms.

    Returns:
        * dict: the mapping from term to integer ID.
    """
    vocabulary = {}
    for doc_tokens in tqdm(tokens_list):
        for token in doc_tokens:
            # setdefault only inserts unseen terms, so the new ID is the
            # vocabulary size at the time of insertion.
            vocabulary.setdefault(token, len(vocabulary))
    return vocabulary

def transform_query_to_int(query, term_to_id):
    """Translate a list of terms into their integer IDs.

    Terms that are absent from the mapping are dropped; order and duplicates
    of the remaining terms are preserved.

    Args:
        * query (list): the terms to translate.

        * term_to_id (dict): the mapping from term to integer ID.

    Returns:
        * list: the IDs of the known terms.
    """
    return [term_to_id[term] for term in query if term in term_to_id]

# 3. Processing functions

In [5]:
# Tokenizer built from the regex \w+ (runs of word characters); shared by every call to tokenize()
tokenizer = RegexpTokenizer(r'\w+')
# Stemmer shared by every call to tokenize()
# NOTE(review): the same Porter stemmer is applied whatever the language of the text - confirm this is intended
stemmer = PorterStemmer()

def tokenize(text:str, lang:str="en") -> list:
    """Tokenize, lowercase, stem and stopword-filter the input text.

    A token is kept only when neither its lowercased form nor its stem is a
    stopword of the given language.

    Args:
        * text(str): The text to tokenize.

        * lang(str): The language of the text. Defaults to "en".

    Returns:
        * list: The list of stemmed tokens.

    Raises:
        * KeyError: if lang has no entry in STOP_WORDS.
    """
    stop_words = STOP_WORDS[lang]

    stems = []
    for token in tokenizer.tokenize(text):
        word = token.lower()
        if word in stop_words:
            continue
        # Stem once per token (the previous version stemmed twice).
        stem = stemmer.stem(word)
        # The previous filter joined the two checks with `or`, so a stopword
        # whose stem is not itself in the list (e.g. "this" -> "thi") was kept.
        # NOTE: data cached with the previous filter should be regenerated.
        if stem not in stop_words:
            stems.append(stem)
    return stems

def tokenize_documents(file_name:str, corpus:pd.DataFrame, drop_text:bool=True, verbose:bool=False) -> tuple:
    """Tokenize the corpus, or load the tokenized version from the pickles.

    The pickles are used when both `<file_name>_tokenized.pkl` and
    `<file_name>_tokens_list.pkl` exist in PICKLES_FOLDER. Otherwise the
    `text` column of every row is tokenized with the language given in the
    `lang` column, and the result is stored in a new `tokens` column. The
    input DataFrame is modified in place in that case.

    Args:
        * file_name(str): The name prefix of the pickles of the tokenized corpus.

        * corpus(pd.DataFrame): The corpus to tokenize.

        * drop_text(bool): Whether to drop the `text` column after tokenizing. Defaults to True.

        * verbose(bool): Whether to print the progress or not. Defaults to False.

    Returns:
        * tuple: The tokenized corpus (pd.DataFrame) and the list of token lists, one per row.
    """
    tokenized_pickle = file_name + "_tokenized.pkl"
    tokens_list_pickle = file_name + "_tokens_list.pkl"

    if os.path.exists(os.path.join(PICKLES_FOLDER, tokenized_pickle)) \
        and os.path.exists(os.path.join(PICKLES_FOLDER, tokens_list_pickle)):
        if verbose:
            print("Loading tokenized corpus from pickle")
        corpus = load_data(tokenized_pickle, PICKLES_FOLDER)
        tokens_list = load_data(tokens_list_pickle, PICKLES_FOLDER)
        return corpus, tokens_list

    if verbose:
        print("Tokenizing corpus")
    # Register progress_apply on pandas objects; only needed on this branch.
    tqdm.pandas()
    corpus["tokens"] = corpus.progress_apply(lambda row: tokenize(row['text'], lang=row['lang']), axis=1)
    if drop_text:
        corpus.drop(columns=["text"], inplace=True)
    tokens_list = corpus["tokens"].tolist()
    # Writing the pickles is disabled in this notebook:
    #save_data(corpus, file_name + "_tokenized.pkl", PICKLES_FOLDER)
    #save_data(tokens_list, file_name + "_tokens_list.pkl", PICKLES_FOLDER)
    return corpus, tokens_list

# 4. Score function

In [6]:
def bm25_score(
    query:list,
    document_id:int,
    idf:dict,
    tf:dict,
    length_norm:float,
    k1:float=1.5
) -> float:
    """Score one document against a query with the BM25 formula.

    Each distinct query term found in the document contributes
    `idf * tf * (k1 + 1) / (tf + length_norm)` to the score.

    Args:
        * query(list): The list of query terms.

        * document_id(int): The key of the document in `tf`.

        * idf(dict): The inverse document frequency of the terms.

        * tf(dict): The term frequencies, as a mapping document -> term -> count.

        * length_norm(float): The length normalization of the document.

        * k1(float): The BM25 parameter k1. Defaults to 1.5.

    Returns:
        * float: The BM25 score (0 when the document is not in `tf`).
    """
    if document_id not in tf:
        return 0

    doc_term_freqs = tf[document_id]
    total = 0

    # Repeated query terms are only counted once
    for term in set(query):
        if term not in doc_term_freqs:
            continue
        term_idf = idf[term]
        frequency = doc_term_freqs[term]
        total += term_idf * (frequency * (k1 + 1) / (frequency + length_norm))
    return total

# 5. CorpusBase class

In [7]:
class CorpusBase:
    """Load and tokenize a corpus and the queries to run against it."""

    def __init__(self, corpus_path: str, query_path: str, verbose: bool=False):
        """ Initialize the CorpusBase object.
        Args:
            * corpus_path (str): the path to the corpus file.

            * query_path (str): the path to the query file.

            * verbose (bool): whether to print the progress or not. Defaults to False.

        Class attributes:
            * corpus (pd.DataFrame): the corpus.

            * corpus_path (str): the path to the corpus file.

            * corpus_file_name (str): the name of the corpus file, up to its first dot.

            * query_file_name (str): the name of the query file, up to its first dot.

            * tokens_list (list): the tokens list.

            * results (list): the results of the queries.

            * query (pd.DataFrame): the query.

            * query_tokens_list (list): the query tokens list.

            * query_path (str): the path to the query file.

            * verbose (bool): whether to print the progress or not.
        """
        self.corpus = None
        self.corpus_path = corpus_path
        # basename handles the path separators of the running platform
        self.corpus_file_name = os.path.basename(corpus_path).split('.')[0]
        self.query_file_name = os.path.basename(query_path).split('.')[0]
        self.tokens_list = None
        self.results = None
        self.query = None
        self.query_tokens_list = None
        self.query_path = query_path
        self.verbose = verbose

    @staticmethod
    def _read_table(path: str) -> pd.DataFrame:
        """Read a CSV or JSON file into a DataFrame.

        The format is chosen by looking for '.csv' or '.json' in the path.

        Args:
            * path (str): the path to the file.

        Returns:
            * pd.DataFrame: the content of the file.

        Raises:
            * ValueError: if the path matches neither format.
        """
        if '.csv' in path:
            return pd.read_csv(path)
        if '.json' in path:
            return pd.read_json(path)
        raise ValueError("The file format is not supported")

    def load_corpus(self):
        """Load the corpus, from its pickle when available, else from the corpus file

        Raises:
            * ValueError: if the corpus file is neither a CSV nor a JSON file.
        """
        pickle_name = self.corpus_file_name + ".pkl"
        if os.path.exists(os.path.join(PICKLES_FOLDER, pickle_name)):
            if self.verbose:
                print("Loading corpus from pickle")
            self.corpus = load_data(pickle_name, PICKLES_FOLDER)
        else:
            if self.verbose:
                print(f"Loading corpus from {self.corpus_path}")
            self.corpus = self._read_table(self.corpus_path)
            #save_data(self.corpus, self.corpus_file_name + ".pkl", PICKLES_FOLDER)

    def load_query(self):
        """Load the query, from its pickle when available, else from the query file

        Raises:
            * ValueError: if the query file is neither a CSV nor a JSON file.
        """
        pickle_name = self.query_file_name + ".pkl"
        if os.path.exists(os.path.join(PICKLES_FOLDER, pickle_name)):
            if self.verbose:
                print("Loading query from pickle")
            self.query = load_data(pickle_name, PICKLES_FOLDER)
        else:
            if self.verbose:
                print(f"Loading query from {self.query_path}")
            # An unsupported format used to leave self.query set to None and
            # only failed later; it now raises like load_corpus does.
            self.query = self._read_table(self.query_path)

    def tokenize_corpus(self, drop_text:bool=True):
        """Tokenize the corpus, loading it first if needed

        Args:
            * drop_text (bool): whether to drop the text column or not.
        """
        if self.corpus is None:
            self.load_corpus()
        self.corpus, self.tokens_list = tokenize_documents(self.corpus_file_name, self.corpus, drop_text, self.verbose)

    def tokenize_query(self, drop_text:bool=True):
        """Tokenize the query, loading it first if needed

        Args:
            * drop_text (bool): whether to drop the text column or not.
        """
        if self.query is None:
            self.load_query()
        # tokenize_documents reads the text from a column named 'text'
        if 'query' in self.query.columns:
            self.query.rename(columns={'query': 'text'}, inplace=True)
        self.query, self.query_tokens_list = tokenize_documents(self.query_file_name, self.query, drop_text, self.verbose)

# 6. CorpusBm25 class

In [8]:
class CorpusBm25(CorpusBase):
    """Rank the documents of a corpus against queries with BM25."""

    def __init__(
            self, corpus_path: str,
            query_path: str,
            k1:float=1.5,
            b:float=0.75,
            filter:bool=False,
            filt_docs:int=10000,
            verbose:bool=True):
        """
        Initialize the CorpusBM25 object.

        Args:
            * corpus_path: str, the path to the corpus file.

            * query_path: str, the path to the query file.

            * k1: float, the BM25 parameter k1. Defaults to 1.5.

            * b: float, the BM25 parameter b. Defaults to 0.75.

            * filter: bool, whether to pre-select candidate documents with the inverted index. Defaults to False.

            * filt_docs: int, the maximum number of candidate documents kept per query when filtering. Defaults to 10000.

            * verbose: bool, whether to print the progress or not. Defaults to True.

        Class attributes:
            * tf (dict): the term frequency for each term in each document.

            * idf (dict): the inverse document frequency for each term.

            * df (dict): the document frequency for each term.

            * avg_doc_len (float): the average document length.

            * doc_len (list): the length of each document in the corpus.

            * length_norm (list): the BM25 length normalization of each document.

            * results (list): the results of the queries.

            * k1 (float): the BM25 parameter k1.

            * b (float): the BM25 parameter b.

            * filter (bool): whether to filter the candidate documents or not.

            * filt_docs (int): the maximum number of candidate documents per query.

            * inverted_index (dict): the inverted index of the corpus.

            * term_to_id (dict): the mapping of terms to IDs.

            * time (float): the time taken to process the queries and calculate the BM25 scores.

        """
        super().__init__(corpus_path, query_path, verbose)
        self.tf = None
        self.idf = None
        self.df = None
        self.avg_doc_len = None
        self.doc_len = None
        self.length_norm = None
        self.results = None
        self.inverted_index = None
        self.term_to_id = None
        self.k1 = k1
        self.b = b
        self.filter = filter
        self.filt_docs = int(filt_docs)
        self.time = None

    def _has_pickle(self, suffix: str) -> bool:
        """Tell whether the pickle `<corpus_file_name><suffix>` exists in PICKLES_FOLDER."""
        return os.path.exists(os.path.join(PICKLES_FOLDER, self.corpus_file_name + suffix))

    def _load_pickle(self, suffix: str):
        """Load the pickle `<corpus_file_name><suffix>` from PICKLES_FOLDER."""
        return load_data(self.corpus_file_name + suffix, PICKLES_FOLDER)

    def _ensure_tokens(self):
        """Make sure the corpus is loaded and has a `tokens` column."""
        if self.corpus is None:
            self.load_corpus()
        if 'tokens' not in self.corpus.columns:
            self.tokenize_corpus()

    def _compute_df(self):
        """Compute the document frequency for each term in the corpus (i.e., the number of documents in which the term appears).

        When computed (not loaded), the `tokens` column of the corpus is
        replaced by the integer IDs of the terms.
        """
        if self.df is not None:
            return

        if self._has_pickle("_df.pkl") and self._has_pickle("_term_to_id.pkl"):
            if self.verbose:
                print("Loading df from pickle")
            self.df = self._load_pickle("_df.pkl")
            self.term_to_id = self._load_pickle("_term_to_id.pkl")
            return

        self._ensure_tokens()
        if self.term_to_id is None:
            self.term_to_id = create_term_to_id(self.corpus['tokens'].tolist())
            #save_data(self.term_to_id, self.corpus_file_name + "_term_to_id.pkl", PICKLES_FOLDER)
        self.corpus['tokens'] = self.corpus['tokens'].apply(lambda x: transform_query_to_int(x, self.term_to_id))
        if self.verbose:
            print("Computing df")
        corpus_tokenized = self.corpus['tokens'].tolist()
        # set(document) makes every term count once per document
        self.df = Counter(term for document in corpus_tokenized for term in set(document))
        #save_data(self.df, self.corpus_file_name + "_df.pkl", PICKLES_FOLDER)

    def _compute_idf(self):
        """Compute the inverse document frequency for each term in the corpus.
        """
        if self.idf is not None:
            return

        if self._has_pickle("_idf.pkl"):
            if self.verbose:
                print("Loading idf from pickle")
            self.idf = self._load_pickle("_idf.pkl")
            return

        if self.df is None:
            self._compute_df()
        if self.corpus is None:
            self.load_corpus()
        if self.verbose:
            print("Computing idf")
        num_documents = len(self.corpus)
        self.idf = {
            term: math.log(1 + (num_documents - doc_freq + 0.5) / (doc_freq + 0.5))
            for term, doc_freq in self.df.items()
        }
        #save_data(self.idf, self.corpus_file_name + "_idf.pkl", PICKLES_FOLDER)

    def _compute_tf(self):
        """Compute the term frequency for each term in each document.

        The result maps the position of each document in the corpus to a
        mapping term -> number of occurrences.
        """
        if self.tf is not None:
            return

        if self._has_pickle("_tf.pkl"):
            if self.verbose:
                print("Loading tf from pickle")
            self.tf = self._load_pickle("_tf.pkl")
            return

        # NOTE(review): the terms counted here are integer IDs only if
        # _compute_df computed (not loaded) its result beforehand - confirm
        # the pickles are always available together.
        self._ensure_tokens()
        if self.verbose:
            print("Computing tf")
        corpus_tokenized = self.corpus['tokens'].tolist()
        self.tf = {i: dict(Counter(document)) for i, document in enumerate(corpus_tokenized)}
        #save_data(self.tf, self.corpus_file_name + "_tf.pkl", PICKLES_FOLDER)

    def _compute_doc_len(self):
        """Compute the length of each document in the corpus and the average length.
        """
        if self.doc_len is not None and self.avg_doc_len is not None:
            return

        if self._has_pickle("_doc_len.pkl") and self._has_pickle("_avg_doc_len.pkl"):
            if self.verbose:
                print("Loading doc_len from pickle")
            self.doc_len = self._load_pickle("_doc_len.pkl")
            self.avg_doc_len = self._load_pickle("_avg_doc_len.pkl")
            return

        self._ensure_tokens()
        if self.verbose:
            print("Computing doc_len")
        self.doc_len = [len(document) for document in self.corpus['tokens'].tolist()]
        # An empty corpus has no average; avoid dividing by zero
        self.avg_doc_len = sum(self.doc_len) / len(self.doc_len) if self.doc_len else 0.0
        #save_data(self.doc_len, self.corpus_file_name + "_doc_len.pkl", PICKLES_FOLDER)
        #save_data(self.avg_doc_len, self.corpus_file_name + "_avg_doc_len.pkl", PICKLES_FOLDER)

    def _compute_length_norm(self):
        """Compute the length normalization factor of every document.

        The factor of a document is `k1 * (1 - b + b * doc_len / avg_doc_len)`;
        the result is stored in `self.length_norm`, in corpus order.
        """
        # No-op when both doc_len and avg_doc_len are already available
        self._compute_doc_len()
        if self.verbose:
            print("Computing length_norm")
        self.length_norm = [self.k1 * (1 - self.b + self.b * doc_length / self.avg_doc_len) for doc_length in self.doc_len]

    def _compute_inverted_index(self):
        """Build the inverted index: for each term, the list of documents that contain it.
        """
        if self.inverted_index is not None:
            return

        if self._has_pickle("_inverted_index.pkl"):
            if self.verbose:
                print("Loading inverted index from pickle")
            self.inverted_index = self._load_pickle("_inverted_index.pkl")
            return

        if self.tf is None:
            self._compute_tf()
        self.inverted_index = {}
        # The progress bar is only displayed in verbose mode
        documents = tqdm(self.tf.items(), total=len(self.tf), desc="Computing inverted index", disable=not self.verbose)
        for doc_id, doc in documents:
            for word in doc:
                self.inverted_index.setdefault(word, []).append(doc_id)
        #save_data(self.inverted_index, self.corpus_file_name + "_inverted_index.pkl", PICKLES_FOLDER)

    def _BM25_search(self,query, docid,relevant_docs, k=10):
        """Compute the BM25 score of the candidate documents for a given query and return the top-k documents

        Args:
            * query (list): the tokenized query.

            * docid (list): the list of document IDs, in corpus order.

            * relevant_docs (list): the positions in the corpus of the candidate documents.

            * k (int): the number of documents to return.

        Returns:
            * top_doc_ids (list): the list of document IDs with the highest BM25 scores.
        """

        # Calculate scores only for the candidate documents
        scores = []
        for i in relevant_docs:
            score = bm25_score(query, i, self.idf, self.tf, self.length_norm[i], self.k1)
            scores.append((score, docid[i]))

        # Sort by decreasing score (ties keep the candidate order) and keep the top-k
        scores.sort(key=lambda x: -x[0])
        top_doc_ids = [doc_id for _, doc_id in scores[:k]]

        return top_doc_ids

    def _get_relevant_docs(self, query, relevant_docs, k=10000):
        """Select the candidate documents that share the most terms with the query.

        Args:
            * query (list): the tokenized query.

            * relevant_docs (np.ndarray): the positions of the documents allowed for this query.

            * k (int): the maximum number of candidates to return.

        Returns:
            * np.ndarray: the positions of at most k allowed documents that contain at least one
              query term, ordered by decreasing number of distinct query terms they contain.
        """
        match_counts = defaultdict(int)
        for word in set(query):
            for doc_id in self.inverted_index.get(word, ()):
                match_counts[doc_id] += 1

        if not match_counts:
            return np.array([], dtype=np.int64)

        doc_ids = np.fromiter(match_counts.keys(), dtype=np.int64, count=len(match_counts))
        counts = np.fromiter(match_counts.values(), dtype=np.int64, count=len(match_counts))

        # The previous version passed (doc_id, count) pairs to np.intersect1d,
        # which flattens its inputs and returns sorted unique values: counts
        # were treated as document positions and the ranking was lost before
        # truncating to k. Rank first, then filter while preserving the order.
        ranked = doc_ids[np.argsort(-counts, kind="stable")]
        ranked = ranked[np.isin(ranked, relevant_docs)]
        return ranked[:k]

    def _load_docid_and_lang(self):
        """Return the document IDs and the languages of the corpus, in corpus order.

        Both lists are loaded from the pickles when the two of them exist,
        else both are extracted from the corpus so they always match.
        """
        if self._has_pickle("_docid.pkl") and self._has_pickle("_lang.pkl"):
            if self.verbose:
                print("Loading docid from pickle")
            docid = self._load_pickle("_docid.pkl")
            if self.verbose:
                print("Loading lang from pickle")
            lang = self._load_pickle("_lang.pkl")
            return docid, lang

        if self.corpus is None:
            self.load_corpus()
        docid = self.corpus['docid'].tolist()
        #save_data(docid, self.corpus_file_name + "_docid.pkl", PICKLES_FOLDER)
        lang = self.corpus['lang'].tolist()
        #save_data(lang, self.corpus_file_name + "_lang.pkl", PICKLES_FOLDER)
        return docid, lang

    def get_results(self):
        """Get the results of the queries

        For each query, the 10 best documents written in the language of the
        query are stored in `self.results`; the processing time is stored in
        `self.time`.
        """
        self.results = []

        #initialize the idf, tf, avg_doc_len, doc_len
        if self.verbose:
            print("Computing idf, tf, avg_doc_len, doc_len")
        self._compute_df()
        self._compute_idf()
        self._compute_tf()
        self._compute_doc_len()
        self._compute_length_norm()
        if self.filter:
            self._compute_inverted_index()

        docid, lang = self._load_docid_and_lang()

        # Positions of the documents of each language, built in one pass and
        # converted to arrays once instead of once per query
        positions_by_lang = defaultdict(list)
        for position in range(len(docid)):
            positions_by_lang[lang[position]].append(position)
        dict_relevant_docs = {code: np.array(positions) for code, positions in positions_by_lang.items()}
        no_docs = np.array([], dtype=np.int64)

        start = time()

        # Load the queries
        if self.query is None:
            self.load_query()
        if 'tokens' not in self.query.columns:
            self.tokenize_query()
            self.query['tokens'] = self.query['tokens'].apply(lambda x: transform_query_to_int(x, self.term_to_id))

        #extract list of tokenized text and lang from the test queries
        list_test_queries = self.query["tokens"].tolist()
        list_lang_test_queries = self.query["lang"].tolist()

        # Loop over each query and calculate the BM25 scores; the progress bar
        # is only displayed in verbose mode
        queries = tqdm(
            enumerate(list_test_queries),
            total=len(list_test_queries),
            desc="Calculating BM25 scores",
            disable=not self.verbose,
        )
        for idx, query in queries:
            # A query in a language absent from the corpus has no candidates
            relevant_docs = dict_relevant_docs.get(list_lang_test_queries[idx], no_docs)
            if self.filter:
                relevant_docs = self._get_relevant_docs(query, relevant_docs, self.filt_docs)

            # Get the top 10 documents for the current query
            top_docs = self._BM25_search(query, docid,relevant_docs, k=10)

            self.results.append({
                'id': idx,  # You may replace idx with actual query ID if available
                'docids': top_docs
            })

        end = time()
        if self.verbose:
            print(f"Time taken to process queries and compute BM25 scores: {int((end - start) / 60)} min {int((end - start) % 60)} sec")
        self.time = end - start

    def create_submission(self, output_path: str):
        """ Create a submission file for the BM25 model.

        The results are computed first if they are not available yet.

        Args:
            * output_path (str): the path to the output file.
        """
        if self.results is None:
            self.get_results()
        results_df = pd.DataFrame(self.results)
        results_df.to_csv(output_path, index=False)

# 7. Final Part: Computing results

In [9]:
# BM25 retriever with candidate filtering enabled (at most 10000 candidate documents per query), k1=2 and b=0.9
documents = CorpusBm25(CORPUS, QUERIES, filter=True, filt_docs=10000, k1=2, b=0.9)

In [10]:
# Compute the results (if not available yet) and write them to a CSV file
documents.create_submission(output_path='submission_bm25_v.csv')

Computing idf, tf, avg_doc_len, doc_len
Loading df from pickle
Loading idf from pickle
Loading tf from pickle
Loading doc_len from pickle
Computing length_norm
Loading inverted index from pickle
Loading docid from pickle
Loading lang from pickle
Loading query from /kaggle/input/dis-project-1-document-retrieval/test.csv
Tokenizing corpus


100%|██████████| 2000/2000 [00:01<00:00, 1661.68it/s]
Calculating BM25 scores: 100%|██████████| 2000/2000 [09:23<00:00,  3.55it/s]

Time taken to process queries and compute BM25 scores: 9 min 24 sec



