# DIS Project 1: Document Retrieval
EPFL CS-423 (Distributed Information Systems)

Team: "TIIM" - André Santo (376762), Vincent Fiszbin (394790), Rasmus Veski (395667)

# Imports

In [1]:
%pip install -U PyStemmer
%pip install -U konlpy

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
from concurrent.futures import ThreadPoolExecutor
from collections import Counter
from functools import partial
import os
from typing import Dict, List, NamedTuple
import jax.lax
from tqdm.notebook import tqdm
import numpy as np
import math
import scipy.sparse as sp
import re
from typing import Dict, List, Union, NamedTuple, Tuple
import Stemmer
from konlpy.tag import Kkma
import pickle
import gc
import json
import pandas as pd
import sys
import time

sys.path.append(os.path.abspath('/kaggle/input/stopwords'))

from stopwords import (
    STOPWORDS_EN,
    STOPWORDS_EN_PLUS,
    STOPWORDS_GERMAN,
    STOPWORDS_DUTCH,
    STOPWORDS_FRENCH,
    STOPWORDS_SPANISH,
    STOPWORDS_PORTUGUESE,
    STOPWORDS_ITALIAN,
    STOPWORDS_RUSSIAN,
    STOPWORDS_SWEDISH,
    STOPWORDS_NORWEGIAN,
    STOPWORDS_CHINESE,
    STOPWORDS_ARABIC,
    STOPWORDS_KOREAN
)

# BM25 Implementation

In [3]:
# NumPy dtype names shared by the BM25 code below. 32-bit types are used
# for memory efficiency and performance.
FLOAT_TYPE = "float32"  # dtype for term counts and BM25 scores
INT_TYPE = "int32"  # dtype for token IDs and document indices

class TokenData(NamedTuple):
    """Tokenized texts in ID form together with the vocabulary defining the IDs."""
    ids: List[List[int]]  # one list of token IDs per text
    vocab: Dict[str, int]  # token string -> token ID

class SearchResults(NamedTuple):
    """Return value of BM25.search: retrieved document indices and their scores."""
    documents: np.ndarray  # retrieved document indices, one entry per query
    scores: np.ndarray  # scores aligned with `documents`

def compute_doc_frequencies(tokenized_corpus, unique_tokens, show_progress=True) -> dict:
    """
    Count, for every known token, how many documents contain it.

    Parameters:
    tokenized_corpus (List[List[int]]): Documents, each given as a list of token IDs.
    unique_tokens (List[int]): Token IDs to count; tokens outside this collection are ignored.
    show_progress (bool): Whether to show a progress bar.

    Returns:
    dict: Maps each token ID in `unique_tokens` to the number of documents it occurs in
          (0 for tokens that never occur).
    """
    known_tokens = set(unique_tokens)
    # Start every known token at zero so unseen tokens are still reported.
    frequencies = dict.fromkeys(known_tokens, 0)
    progress = tqdm(tokenized_corpus, disable=not show_progress, desc="Counting Tokens")
    for document in progress:
        # Deduplicate within the document: a token counts at most once per document.
        for token in known_tokens & set(document):
            frequencies[token] += 1
    return frequencies

def create_idf_array(doc_freqs: dict, total_docs: int) -> np.ndarray:
    """
    Build the array of Inverse Document Frequency (IDF) values, indexed by token ID.

    Token IDs are used directly as array positions, so they must lie in
    range(len(doc_freqs)).

    Parameters:
    doc_freqs (dict): Maps token IDs to their document frequencies.
    total_docs (int): Total number of documents in the corpus.

    Returns:
    np.ndarray: IDF value of each token, at the position given by its token ID.
    """
    idf_values = np.zeros(len(doc_freqs), dtype=FLOAT_TYPE)
    for token_id, doc_freq in doc_freqs.items():
        # Lucene variant: log(1 + (N - df + 0.5) / (df + 0.5))
        odds = (total_docs - doc_freq + 0.5) / (doc_freq + 0.5)
        idf_values[token_id] = math.log(1 + odds)
    return idf_values

def compute_term_frequency(tf_array, doc_len, avg_doc_len, k1, b):
    """
    Apply the BM25 term-frequency saturation (Robertson variant).

    Parameters:
    tf_array (np.ndarray): Raw term frequencies.
    doc_len (int): Length of the document.
    avg_doc_len (float): Average document length in the corpus.
    k1 (float): BM25 saturation parameter.
    b (float): BM25 length-normalization parameter.

    Returns:
    np.ndarray: tf / (k1 * ((1 - b) + b * doc_len / avg_doc_len) + tf), element-wise.
    """
    # Length normalization: exactly 1 when b == 0 or the document has average length.
    length_norm = (1 - b) + b * doc_len / avg_doc_len
    saturation = k1 * length_norm + tf_array
    return tf_array / saturation

def get_token_counts(token_ids):
    """
    Count how often each token ID occurs in a document.

    Parameters:
    token_ids (List[int]): Token IDs of one document.

    Returns:
    Tuple[np.ndarray, np.ndarray]: The distinct token IDs (in first-occurrence order) and,
    aligned with them, their occurrence counts.
    """
    occurrences = Counter(token_ids)
    distinct_ids = np.array(list(occurrences), dtype=INT_TYPE)
    counts = np.array([occurrences[token_id] for token_id in occurrences], dtype=FLOAT_TYPE)
    return distinct_ids, counts

def create_score_matrix(corpus_token_ids, idf_array, avg_doc_len, doc_freqs, k1, b, show_progress=True):
    """
    Compute the BM25 score of every token in every document, in coordinate form.

    The three returned arrays are aligned: entry j states that token `vocab_indices[j]`
    has score `scores[j]` in document `doc_indices[j]`.

    Parameters:
    corpus_token_ids (List[List[int]]): Documents, each given as a list of token IDs.
    idf_array (np.ndarray): IDF values indexed by token ID.
    avg_doc_len (float): Average document length in the corpus.
    doc_freqs (dict): Maps token IDs to document frequencies; only used to size the output.
    k1 (float): BM25 parameter.
    b (float): BM25 parameter.
    show_progress (bool): Whether to show a progress bar.

    Returns:
    Tuple[np.ndarray, np.ndarray, np.ndarray]: Arrays of scores, document indices, and vocabulary indices.
    """
    # Each document contributes one entry per distinct token; the output is sized by the
    # sum of the document frequencies, which is expected to equal that total.
    total_entries = sum(doc_freqs.values())

    scores = np.empty(total_entries, dtype=FLOAT_TYPE)
    doc_indices = np.empty(total_entries, dtype=INT_TYPE)
    vocab_indices = np.empty(total_entries, dtype=INT_TYPE)

    cursor = 0  # next free position in the output arrays
    progress = tqdm(corpus_token_ids, desc="Computing Scores", disable=not show_progress)
    for doc_idx, token_ids in enumerate(progress):
        # Distinct tokens of this document and how often each occurs.
        doc_vocab, term_counts = get_token_counts(token_ids)

        # BM25 score per distinct token: IDF times the saturated term frequency.
        # The document length is the raw token count, duplicates included.
        doc_idf = idf_array[doc_vocab]
        doc_scores = doc_idf * compute_term_frequency(term_counts, len(token_ids), avg_doc_len, k1, b)

        # This document occupies the next len(doc_scores) slots.
        window = slice(cursor, cursor + len(doc_scores))
        doc_indices[window] = doc_idx
        vocab_indices[window] = doc_vocab
        scores[window] = doc_scores
        cursor = window.stop

    return scores, doc_indices, vocab_indices

def tokens_to_strings(token_data: TokenData) -> List[List[str]]:
    """
    Decode token IDs back into their token strings.

    Parameters:
    token_data (TokenData): Token IDs per document plus the token -> ID vocabulary.

    Returns:
    List[List[str]]: The same documents with every token ID replaced by its string.
    """
    vocab = token_data.vocab
    # Invert the vocabulary: ID -> token string.
    id_to_token = dict(zip(vocab.values(), vocab.keys()))
    lookup = id_to_token.__getitem__
    return [list(map(lookup, doc_ids)) for doc_ids in token_data.ids]

def get_top_k(query_scores, k):
    """
    Select the k highest scores and the positions they came from.

    Parameters:
    query_scores (np.ndarray): Array of query scores.
    k (int): Number of top scores to retrieve.

    Returns:
    Tuple[np.ndarray, np.ndarray]: The top k scores and their indices in `query_scores`,
    both converted to NumPy arrays.
    """
    best = jax.lax.top_k(query_scores, k)
    # Convert the JAX results to plain NumPy arrays for the callers.
    top_scores, top_positions = (np.asarray(part) for part in best)
    return top_scores, top_positions

class BM25:
    """
    BM25 retriever backed by a precomputed sparse score matrix.

    `self.scores` holds the CSC components ("data", "indices", "indptr") of a
    (documents x vocabulary) matrix of per-token BM25 scores plus "num_docs";
    `self.vocab_dict` maps token strings to token IDs (matrix columns). Both are
    set by `index_corpus` or `load`, one of which must be called before querying.
    """

    def __init__(self, k1=1.5, b=0.75):
        """
        Initialize the BM25 object with parameters.

        Parameters:
        k1 (float): BM25 parameter.
        b (float): BM25 parameter.
        """
        self.k1 = k1
        self.b = b

    def compute_relevance_scores(self, query_token_ids):
        """
        Compute relevance scores for a query using the precomputed BM25 scores.

        For every query token, the token's column of the score matrix is added
        onto the per-document totals.

        Parameters:
        query_token_ids (np.ndarray): Array of query token IDs.

        Returns:
        np.ndarray: Array of relevance scores for the query, one per document.
        """
        data = self.scores["data"]
        indptr = self.scores["indptr"]
        indices = self.scores["indices"]
        num_docs = self.scores["num_docs"]
        scores = np.zeros(num_docs, dtype=FLOAT_TYPE)
        for token_id in query_token_ids:
            # CSC layout: entries of column `token_id` live in [indptr[id], indptr[id + 1]).
            start, end = indptr[token_id], indptr[token_id + 1]
            np.add.at(scores, indices[start:end], data[start:end])
        return scores

    def build_index(self, unique_token_ids, corpus_token_ids, show_progress=True):
        """
        Build the index for the corpus.

        Parameters:
        unique_token_ids (List[int]): List of unique token IDs.
        corpus_token_ids (List[List[int]]): List of tokenized documents.
        show_progress (bool): Whether to show a progress bar.

        Returns:
        dict: Dictionary containing score matrix data, indices, index pointer, and number of documents.
        """
        avg_doc_len = np.mean([len(doc_ids) for doc_ids in corpus_token_ids])
        total_docs = len(corpus_token_ids)
        doc_freqs = compute_doc_frequencies(corpus_token_ids, unique_token_ids, show_progress)
        idf_array = create_idf_array(doc_freqs, total_docs)
        scores, doc_indices, vocab_indices = create_score_matrix(
            corpus_token_ids, idf_array, avg_doc_len, doc_freqs, self.k1, self.b, show_progress
        )
        # CSC (column = token) so that a query token's scores are one contiguous slice.
        score_matrix = sp.csc_matrix(
            (scores, (doc_indices, vocab_indices)),
            shape=(total_docs, len(unique_token_ids)),
            dtype=FLOAT_TYPE,
        )
        return {
            "data": score_matrix.data,
            "indices": score_matrix.indices,
            "indptr": score_matrix.indptr,
            "num_docs": total_docs,
        }

    def index_corpus(self, corpus: TokenData, show_progress=True):
        """
        Index the corpus.

        Parameters:
        corpus (TokenData): TokenData object containing tokenized documents and vocabulary.
        show_progress (bool): Whether to show a progress bar.
        """
        self.scores = self.build_index(list(corpus.vocab.values()), corpus.ids, show_progress)
        self.vocab_dict = corpus.vocab

    def dump(self, scores_path, vocab_path):
        """
        Save the BM25 index and vocabulary to disk.

        Parameters:
        scores_path (str): Path to save the BM25 scores (compressed .npz).
        vocab_path (str): Path to save the vocabulary dictionary (JSON).
        """
        # Save self.scores
        np.savez_compressed(scores_path,
                            data=self.scores['data'],
                            indices=self.scores['indices'],
                            indptr=self.scores['indptr'],
                            num_docs=self.scores['num_docs'])

        # Save self.vocab_dict; explicit encoding so the result does not depend on the locale
        with open(vocab_path, 'w', encoding='utf-8') as vocab_file:
            json.dump(self.vocab_dict, vocab_file)

    def load(self, scores_path, vocab_path):
        """
        Load the BM25 index and vocabulary from disk.

        Parameters:
        scores_path (str): Path to load the BM25 scores from.
        vocab_path (str): Path to load the vocabulary dictionary from.
        """
        # Load self.scores. The archive is closed once all members have been read.
        with np.load(scores_path) as loaded:
            self.scores = {
                "data": loaded['data'],
                "indices": loaded['indices'],
                "indptr": loaded['indptr'],
                # savez stores the scalar as a 0-d array; restore a plain int
                "num_docs": int(loaded['num_docs']),
            }

        # Load self.vocab_dict
        with open(vocab_path, 'r', encoding='utf-8') as vocab_file:
            self.vocab_dict = json.load(vocab_file)

    def get_token_ids(self, query_tokens: List[str]) -> List[int]:
        """
        Get token IDs for a query.

        Parameters:
        query_tokens (List[str]): List of query tokens.

        Returns:
        List[int]: List of token IDs; tokens missing from the vocabulary are skipped.
        """
        return [self.vocab_dict[token] for token in query_tokens if token in self.vocab_dict]

    def retrieve_top_k(self, query_tokens: List[str], lang: str, k: int = 10, doc_index_to_lang = None, filter_by_lang: bool = False):
        """
        Retrieve the top k documents for a query.

        If fewer than k candidate documents exist, all candidates are returned
        (ordered by score) instead of failing.

        Parameters:
        query_tokens (List[str]): List of query tokens.
        lang (str): Language of the documents to search.
        k (int): Number of top documents to retrieve.
        doc_index_to_lang (dict): Dictionary mapping document indices to their languages.
        filter_by_lang (bool): Whether to filter documents by language.

        Returns:
        Tuple[np.ndarray, np.ndarray]: Arrays of top k scores and their document indices.

        Raises:
        TypeError: If language filtering is requested without `doc_index_to_lang`.
        """
        query_tokens_ids = self.get_token_ids(query_tokens)
        scores = self.compute_relevance_scores(np.asarray(query_tokens_ids, dtype=INT_TYPE))

        if not (filter_by_lang and lang is not None):
            # No language restriction: rank all documents.
            return get_top_k(scores, min(k, len(scores)))

        if doc_index_to_lang is None:
            raise TypeError("doc_index_to_lang must be provided when filter_by_lang is True")

        # Indices of the documents written in the requested language.
        candidate_indices = np.array(
            [idx for idx in range(len(scores)) if doc_index_to_lang[idx] == lang],
            dtype=INT_TYPE,
        )
        if candidate_indices.size == 0:
            return np.array([], dtype=FLOAT_TYPE), np.array([], dtype=INT_TYPE)

        # Rank only the candidates; top_k rejects k larger than the candidate count.
        top_k_scores, positions = get_top_k(scores[candidate_indices], min(k, candidate_indices.size))

        # Map positions within the candidate list back to original document indices.
        return top_k_scores, candidate_indices[positions]

    def search(self, queries: TokenData, langs: List[str], k: int = 10, show_progress: bool = True, n_threads: int = 0, chunksize: int = 50, doc_index_to_lang = None, filter_by_lang: bool = False):
        """
        Retrieve the top-k documents for each query.

        Parameters:
        queries (TokenData): A TokenData object with tokenized queries.
        langs (List[str]): List of languages corresponding to each query.
        k (int): Number of documents to retrieve for each query.
        show_progress (bool): Whether to show a progress bar.
        n_threads (int): Number of threads. If -1, os.cpu_count() is used. If 0, queries run sequentially.
        chunksize (int): Forwarded to Executor.map; it has no effect with a thread pool.
        doc_index_to_lang (dict): Dictionary mapping document indices to their languages.
        filter_by_lang (bool): Whether to filter documents by language.

        Returns:
        SearchResults: `documents` holds the retrieved document indices and `scores`
        their scores, one row per query. Both are empty with shape (0, k) when
        there are no queries.
        """
        if n_threads == -1:
            n_threads = os.cpu_count()
        # Queries carry their own vocabulary; decode to strings and re-encode per query
        # against the index vocabulary inside retrieve_top_k.
        queries = tokens_to_strings(queries)

        topk_fn = partial(self.retrieve_top_k, k=k, doc_index_to_lang=doc_index_to_lang, filter_by_lang=filter_by_lang)
        progress_options = {"total": len(queries), "desc": "Retrieving Documents", "disable": not show_progress}
        if n_threads == 0:
            out = list(tqdm(map(topk_fn, queries, langs), **progress_options))
        else:
            with ThreadPoolExecutor(max_workers=n_threads) as executor:
                out = list(tqdm(executor.map(topk_fn, queries, langs, chunksize=chunksize), **progress_options))

        if not out:
            # zip(*out) cannot be unpacked for zero queries; return empty results instead.
            return SearchResults(
                documents=np.empty((0, k), dtype=INT_TYPE),
                scores=np.empty((0, k), dtype=FLOAT_TYPE),
            )
        scores, indices = zip(*out)
        return SearchResults(documents=np.array(indices), scores=np.array(scores))

# Preprocessing functions

In [4]:
class KoreanStemmer:
    """
    Thin wrapper around konlpy's Kkma analyzer.

    NOTE(review): `tokenize` only accepts stemmers that expose `stemWords` or are
    callable. This class offers `stem_words` only, and it returns one list per
    word rather than one string per word - confirm how Korean is meant to be handled.
    """

    def __init__(self):
        self.kkma = Kkma()

    def stem_words(self, words):
        """Return, for each word in `words`, the result of `Kkma.morphs(word)`."""
        analyze = self.kkma.morphs
        return list(map(analyze, words))

# Stemmer registered for each language code. PyStemmer stemmers are used for
# every language except Korean, which uses the Kkma-based KoreanStemmer.
# Languages without an entry fall back to the English stemmer in `tokenize`.
stemmers = {
    "en": Stemmer.Stemmer("english"),
    "fr": Stemmer.Stemmer("french"),
    # "es": Stemmer.Stemmer("spanish"),
    "de": Stemmer.Stemmer("german"),
    "it": Stemmer.Stemmer("italian"),
    "ar": Stemmer.Stemmer("arabic"),
    "ko": KoreanStemmer()
}

# NOTE(review): this re-declares the TokenData class already defined next to the
# BM25 code, with identical fields; from here on the name refers to this class.
class TokenData(NamedTuple):
    ids: List[List[int]]  # one list of token IDs per text
    vocab: Dict[str, int]  # token string -> token ID

def get_stopwords(language: str) -> List[str]:
    """
    Return the stopword list registered for a language name or code.

    Both the full name (e.g. "french") and the short code (e.g. "fr") are
    accepted. Spanish is not mapped; it and any other unrecognized language
    fall back to the English stopwords.
    """
    # (accepted names, stopword list) pairs, checked in order.
    stopword_table = (
        (("english", "en"), STOPWORDS_EN),
        (("english_plus", "en_plus"), STOPWORDS_EN_PLUS),
        (("german", "de"), STOPWORDS_GERMAN),
        (("dutch", "nl"), STOPWORDS_DUTCH),
        (("french", "fr"), STOPWORDS_FRENCH),
        (("portuguese", "pt"), STOPWORDS_PORTUGUESE),
        (("italian", "it"), STOPWORDS_ITALIAN),
        (("russian", "ru"), STOPWORDS_RUSSIAN),
        (("swedish", "sv"), STOPWORDS_SWEDISH),
        (("norwegian", "no"), STOPWORDS_NORWEGIAN),
        (("chinese", "zh"), STOPWORDS_CHINESE),
        (("arabic", "ar"), STOPWORDS_ARABIC),
        (("korean", "ko"), STOPWORDS_KOREAN),
    )
    for names, stopwords in stopword_table:
        if language in names:
            return stopwords
    return STOPWORDS_EN

def tokenize(
    texts: List[Tuple[str, str]],  # list of tuples (text, language)
    lower: bool = True,
    return_ids: bool = True,
    show_progress: bool = True,
    leave: bool = False,
) -> Union[List[List[str]], TokenData]:
    """
    Tokenize a list of texts with stopword removal and stemming.

    Each text is split into word tokens of at least two characters, the
    stopwords of the text's own language are dropped, and the remaining
    tokens are stemmed.

    NOTE(review): stopwords are chosen per text, but a single stemmer is used
    for the whole batch: the one registered for the language of the LAST text
    (English if that language has no stemmer, or if `texts` is empty). This
    behavior is kept as-is because vocabularies built with it would otherwise
    stop matching; confirm whether per-language stemming is intended.

    Parameters
    ----------
    texts : List[Tuple[str, str]]
        A list of tuples where each tuple contains a text and its language.

    lower : bool, optional
        Convert text to lowercase before tokenization.

    return_ids : bool, optional
        Return token IDs and vocabulary if True, else return tokenized strings.

    show_progress : bool, optional
        Show progress bar if True.

    leave : bool, optional
        Leave progress bar after completion if True.

    Returns
    -------
    Union[List[List[str]], TokenData]
        Stemmed texts as strings, or as stem IDs with the stem -> ID vocabulary.
        Stem IDs are assigned in order of first appearance.

    Raises
    ------
    ValueError
        If the selected stemmer neither has a `stemWords` method nor is callable.
    """
    token_pattern = r"(?u)\b\w\w+\b"
    split_fn = re.compile(token_pattern).findall

    corpus_ids = []
    token_to_index = {}
    stopword_sets = {}  # language -> stopword set, built once per language
    language = None  # language of the last text; stays None when `texts` is empty

    for text, language in tqdm(texts, desc="Tokenizing texts", leave=leave, disable=not show_progress):
        stopwords_set = stopword_sets.get(language)
        if stopwords_set is None:
            stopwords_set = stopword_sets[language] = set(get_stopwords(language))
        if lower:
            text = text.lower()

        doc_ids = []
        for token in split_fn(text):
            if token in stopwords_set:
                continue
            # First sighting of a token assigns it the next free ID.
            doc_ids.append(token_to_index.setdefault(token, len(token_to_index)))

        corpus_ids.append(doc_ids)

    # Token IDs were assigned 0..n-1 in insertion order, so position == token ID.
    unique_tokens = list(token_to_index)

    stemmer = stemmers.get(language)
    if stemmer is None:
        stemmer = Stemmer.Stemmer("english")
    if hasattr(stemmer, "stemWords"):
        stemmer_fn = stemmer.stemWords
    elif callable(stemmer):
        stemmer_fn = stemmer
    else:
        raise ValueError("Stemmer must have a `stemWords` method or be callable")

    stemmed_tokens = stemmer_fn(unique_tokens)

    # Number the distinct stems in first-appearance order so that IDs are
    # reproducible across runs (set iteration order of strings is not).
    vocab_dict = {stem: i for i, stem in enumerate(dict.fromkeys(stemmed_tokens))}
    token_id_to_stem_id = [vocab_dict[stem] for stem in stemmed_tokens]

    for i, doc_ids in enumerate(tqdm(corpus_ids, desc="Stemming tokens", leave=leave, disable=not show_progress)):
        corpus_ids[i] = [token_id_to_stem_id[doc_id] for doc_id in doc_ids]

    if return_ids:
        return TokenData(ids=corpus_ids, vocab=vocab_dict)

    # Stem IDs are positions in the vocabulary's insertion order.
    stem_id_to_stem = list(vocab_dict)
    for i, token_ids in enumerate(tqdm(corpus_ids, desc="Reconstructing token strings", leave=leave, disable=not show_progress)):
        corpus_ids[i] = [stem_id_to_stem[token_id] for token_id in token_ids]
    return corpus_ids

# Load model

Two objects are loaded into the model:
- Scores: a dictionary that contains the BM25 score matrix components. This sparse matrix is a way to efficiently store BM25 scores for each term in each document, allowing for fast retrieval and ranking of documents.
- Vocab: a dictionary mapping each token to its ID. It is used to convert query tokens into their corresponding token IDs, which are then used to look up BM25 scores in the score matrix.

In [5]:
# Load model from disk
start_time = time.time()
retriever = BM25() 
retriever.load('/kaggle/input/saved-objects4/bm25_scores.npz', '/kaggle/input/saved-objects4/bm25_vocab.json')
end_time = time.time()
print(f"Loaded model from disk. It took {end_time - start_time} seconds")

Loaded model from disk. It took 25.250093936920166 seconds


In [6]:
# Load dict to match document index and corresponding docid
with open('/kaggle/input/saved-objects/doc_index_to_docid.json', 'r') as f:
    doc_index_to_docid = json.load(f)
doc_index_to_docid = {int(key): value for key, value in doc_index_to_docid.items()} # reconvert keys to int

# Load dict to match document index and corresponding lang
with open('/kaggle/input/saved-objects/doc_index_to_lang.json', 'r') as f:
    doc_index_to_lang = json.load(f)
doc_index_to_lang = {int(key): value for key, value in doc_index_to_lang.items()} # reconvert keys to int

# Predictions on test set

In [7]:
# Load test set
test_set_path = '/kaggle/input/dis-project-1-document-retrieval/test.csv'
test_set = pd.read_csv(test_set_path)

In [8]:
# Retrieve docs for each query in test set
queries_and_lang = []
langs = []
for index, row in test_set.iterrows():
    queries_and_lang.append((row['query'], row['lang'])) # to preprocess according to lang
    langs.append(row['lang']) # to associate query with its lang during retrieval

start_time = time.time()
queries_tokens = tokenize(queries_and_lang)
end_time = time.time()
print(f"Tokenized queries. It took {end_time - start_time} seconds")

start_time = time.time()
retrieved_docs_indices, scores = retriever.search(queries_tokens, langs, k=10, n_threads=-1, doc_index_to_lang=doc_index_to_lang, filter_by_lang=False)
end_time = time.time()
print(f"Searched and retrieved similar documents. It took {end_time - start_time} seconds")

Tokenizing texts:   0%|          | 0/2000 [00:00<?, ?it/s]

Stemming tokens:   0%|          | 0/2000 [00:00<?, ?it/s]

Tokenized queries. It took 0.14459896087646484 seconds


Retrieving Documents:   0%|          | 0/2000 [00:00<?, ?it/s]

Searched and retrieved similar documents. It took 5.620001792907715 seconds


In [9]:
# Get documents retrieved for each query in test set
predicted_docs = []
for index, row in test_set.iterrows():
    query_id = row['id']
    retrieve_docs_ids = [doc_index_to_docid[doc_index] for doc_index in retrieved_docs_indices[index]]
    predicted_docs.append((query_id, retrieve_docs_ids))

# Create Dataframe with results
results_df = pd.DataFrame(predicted_docs, columns=['id', 'docids'])

# Save to csv
results_df.to_csv('submission.csv', index=False)