In [1]:
import re
import numpy as np

In [2]:
# Toy corpus: ten one-sentence descriptions of Asian dishes, one string per document.
# A document's position in this list is the doc_id used by the BM25 class below.
docs = [
    "Pad Thai is a popular Thai stir-fried noodle dish with tamarind sauce, peanuts, and lime",
    "Sushi is a Japanese dish featuring vinegared rice combined with raw fish and vegetables",
    "Biryani is a fragrant Indian rice dish cooked with aromatic spices, meat, and saffron",
    "Pho is a Vietnamese soup with rice noodles, herbs, and beef or chicken broth",
    "Ramen consists of Chinese-style wheat noodles served in a flavorful meat or fish broth",
    "Dim sum includes various Chinese small bite-sized portions served in steamer baskets",
    "Kimchi is a traditional Korean fermented vegetable dish made primarily with napa cabbage",
    "Tom Yum is a hot and sour Thai soup with lemongrass, lime leaves, and shrimp",
    "Dumplings are filled dough pockets found across Asian cuisines with meat or vegetable fillings",
    "Butter chicken is a creamy Indian curry dish with tender chicken in tomato-based sauce"
]

In [3]:
# Example free-text queries for the corpus above.
# NOTE: the word "spicy" in the first query does not occur in any entry of `docs`,
# so that query exercises the out-of-vocabulary path.
# NOTE(review): `queries` is not referenced by the later cells visible here; they
# score a separate `query` string instead.
queries = [
    "Thai soup spicy",
    "noodles broth Asian",
    "Indian rice spices"
]

In [4]:
def render_matrix(matrix, precision=2):
    """Pretty print a 2D array as right-aligned, fixed-precision columns.

    Each row is printed on its own line with cells separated by a single
    space. Every cell is padded to the width of the widest formatted value
    so that columns line up.

    Args:
        matrix: 2D numpy array (or array-like convertible with ``np.asarray``).
        precision: Number of digits after the decimal point.

    Raises:
        ValueError: If ``matrix`` is not two-dimensional.
    """
    matrix = np.asarray(matrix)
    if matrix.ndim != 2:
        raise ValueError(f"render_matrix expects a 2D array, got {matrix.ndim}D")

    # Format every cell exactly once; the strings are reused for both the
    # width computation and the final output.
    cells = [[f"{val:.{precision}f}" for val in row] for row in matrix]

    # `default=0` keeps an empty matrix (zero rows or zero columns) from
    # raising "max() arg is an empty sequence".
    max_width = max((len(cell) for row in cells for cell in row), default=0)

    for row in cells:
        print(" ".join(cell.rjust(max_width) for cell in row))


In [5]:
class Vocab:
    """Sorted vocabulary built from a collection of tokenized documents.

    Attributes set by the constructor:
        vocab: Alphabetically sorted list of the distinct tokens.
        vocab_size: Number of distinct tokens.
        token2id: Mapping from token to its index in ``vocab``.
    """

    def __init__(self, tokenized_docs) -> None:
        terms = self.compute_vocab(tokenized_docs)
        self.vocab = terms
        self.vocab_size = len(terms)
        self.token2id = dict(zip(terms, range(len(terms))))

    def compute_vocab(self, tokenized_docs):
        """Return the sorted list of unique tokens across ``tokenized_docs``.

        Also prints the resulting vocabulary size.
        """
        # Union of all documents' tokens; an empty input yields an empty set.
        unique_tokens = set().union(*tokenized_docs)
        ordered = sorted(unique_tokens)
        print(f"Vocab created of size: {len(ordered)}")
        return ordered

# Sentinel id for tokens that are absent from the vocabulary. It must never be
# used to index `dxv_matrix`: -1 is a valid numpy index (the last column).
UNKNOWN_TOKEN_ID = -1

class BM25:
    """BM25 ranking over a small in-memory corpus.

    The score of document ``d`` for a query is the sum over query tokens ``q`` of

        idf(q) * f(q, d) * (k1 + 1) / (f(q, d) + k1 * norm(d))

    where ``f(q, d)`` is the count of ``q`` in ``d``,
    ``norm(d) = 1 - b + b * len(d) / avg_doclen`` and
    ``idf(q) = ln((N - n_q + 0.5) / (n_q + 0.5) + 1)`` with ``n_q`` the number
    of documents containing ``q`` and ``N`` the corpus size.

    Query tokens that do not occur in the corpus contribute nothing to any
    document's score.

    Args:
        docs: Iterable of raw document strings.
        k1: Term-frequency saturation parameter.
        b: Length-normalisation strength (0 disables length normalisation).
        verbose: When True (default), ``compute_idf_term`` prints the document
            frequency of every token it is asked about.
    """

    def __init__(self, docs, k1=1.2, b=0.75, verbose=True) -> None:
        self.k1 = k1
        self.b = b
        self.verbose = verbose
        self.docs = [self.tokenize(doc) for doc in docs]
        self.doc_lengths = [len(doc) for doc in self.docs]
        # np.mean([]) is NaN (with a RuntimeWarning); use 0.0 for an empty corpus.
        self.avg_doclen = float(np.mean(self.doc_lengths)) if self.doc_lengths else 0.0
        self.vocab = Vocab(tokenized_docs=self.docs)
        self.n_docs = len(self.docs)
        self.dxv_matrix = self._compute_doc_x_vocab_matrix()

    def tokenize(self, text):
        """Lowercase ``text`` and return its runs of word characters as a list."""
        return re.findall(r'\b\w+\b', text.lower())

    def _token_id(self, token):
        """Return the vocabulary id of ``token`` or ``UNKNOWN_TOKEN_ID`` if unseen."""
        return self.vocab.token2id.get(token, UNKNOWN_TOKEN_ID)

    def _compute_doc_x_vocab_matrix(self):
        """Build the (n_docs, vocab_size) matrix of raw term counts."""
        dxv_matrix = np.zeros(shape=(self.n_docs, self.vocab.vocab_size))
        for doc_id, doc_tokens in enumerate(self.docs):
            for token in doc_tokens:
                # Increase the occurrence count of `token` in document `doc_id`.
                dxv_matrix[doc_id, self.vocab.token2id[token]] += 1
        return dxv_matrix

    def compute_idf_term(self, token):
        """Return the inverse document frequency of ``token``.

        A token that is not in the vocabulary is treated as appearing in zero
        documents, so it goes through the same formula as every other token
        (and therefore yields a positive value rather than an arbitrary one).
        """
        token_id = self._token_id(token)
        if token_id == UNKNOWN_TOKEN_ID:
            n_q = 0
            if self.verbose:
                print(f"`{token}` not found in vocab.")
        else:
            # n_q: number of documents that contain the token at least once.
            n_q = np.sum(self.dxv_matrix[:, token_id] != 0)
            if self.verbose:
                print(f"`{token}` appeared in {n_q}/{self.n_docs} docs")
        return np.log((self.n_docs - n_q + 0.5) / (n_q + 0.5) + 1)

    def compute_tf_term(self, token, doc_id):
        """Return how many times ``token`` occurs in document ``doc_id``.

        Returns 0.0 for a token that is not in the vocabulary instead of
        raising ``KeyError``.
        """
        token_id = self._token_id(token)
        if token_id == UNKNOWN_TOKEN_ID:
            return 0.0
        return self.dxv_matrix[doc_id][token_id]

    def compute_doc_norm(self, doc_id):
        """Return the length-normalisation factor for document ``doc_id``."""
        if not self.avg_doclen:
            # Every document is empty: use a neutral factor rather than
            # dividing by zero.
            return 1.0
        return (1 - self.b + (self.b * self.doc_lengths[doc_id] / self.avg_doclen))

    def compute_bm25(self, query: str):
        """Score every document against ``query``.

        Returns:
            A tuple ``(sorted_results, score_final)`` where ``sorted_results``
            is a list of ``{"doc": tokens, "bm25_score": score}`` dicts ordered
            by descending score and ``score_final`` is a numpy array of scores
            in corpus order.
        """
        query_tokens = self.tokenize(query)

        # IDF depends only on the token, so compute it once per query token.
        idf_out = [self.compute_idf_term(token=token) for token in query_tokens]

        doc_norms = np.array(
            [self.compute_doc_norm(doc_id=doc_id) for doc_id in range(self.n_docs)],
            dtype=float,
        )

        score_final = np.zeros(self.n_docs)
        for idf, token in zip(idf_out, query_tokens):
            token_id = self._token_id(token)
            if token_id == UNKNOWN_TOKEN_ID:
                # Term frequency is zero in every document: nothing to add.
                continue
            f_qd = self.dxv_matrix[:, token_id]  # counts of this token per document
            score_final += idf * (f_qd * (self.k1 + 1)) / (f_qd + (self.k1 * doc_norms))

        results = [
            {"doc": self.docs[doc_id], "bm25_score": score_final[doc_id]}
            for doc_id in range(self.n_docs)
        ]
        sorted_results = sorted(results, key=lambda res: res["bm25_score"], reverse=True)
        return sorted_results, score_final

In [None]:
# Index the toy corpus with the default parameters (k1=1.2, b=0.75).
# Construction builds the vocabulary, which prints its size.
bm25 = BM25(docs=docs)

Vocab created of size: 92


In [7]:
# Score every document against a query whose three tokens all occur in the corpus.
# compute_bm25 returns (results sorted by descending score, raw scores in corpus order).
query = "thai soup chicken"
scores_with_docs, scores_only = bm25.compute_bm25(query=query)

`thai` appeared in 2/10 docs
`soup` appeared in 2/10 docs
`chicken` appeared in 2/10 docs


In [8]:
# Raw (unsorted) scores: position i is the score of docs[i].
print(scores_only)

[1.97129536 0.         0.         2.98886046 0.         0.
 0.         2.90503452 0.         2.00953994]


In [14]:
from rank_bm25 import BM25Okapi


def tokenize(text):
    """Lowercase ``text`` and return its word tokens as a list.

    A token is a maximal run of word characters (letters, digits, underscore),
    so punctuation and hyphens act as separators. This is the same rule as
    ``BM25.tokenize``.
    """
    lowered = text.lower()
    return re.findall(r'\b\w+\b', lowered)

# Reference implementation for cross-checking the hand-written BM25 class.
# The corpus is tokenized with the same regex, and k1 is set to 1.2 to match
# the custom class's default.
# NOTE(review): rank_bm25's BM25Okapi computes IDF as
# log(N - n + 0.5) - log(n + 0.5), i.e. without the "+1" inside the log that the
# custom class uses, so absolute scores are expected to differ even with
# identical k1/b. Confirm against the installed rank_bm25 version.
tokenized_corpus = [tokenize(text=doc) for doc in docs]

bm25_kapi = BM25Okapi(tokenized_corpus, k1=1.2)


In [15]:
# Show the query string that is about to be scored by the reference implementation.
print(query)

thai soup chicken


In [16]:
# BM25Okapi is given a token list here, so tokenize the query with the same
# rule that was used for the corpus.
tokenized_query = tokenize(query)

In [17]:
# One score per corpus document from the reference implementation.
doc_scores = bm25_kapi.get_scores(tokenized_query)

In [18]:
# Compare with `scores_only` from the custom class: the same documents are
# non-zero and in the same order, but every value here is smaller by a constant
# factor of about 1.21 (e.g. 2.98886046 / 2.46873838). That ratio equals
# ln(4.4) / ln(3.4), which is what the two IDF formulas give for a term that
# appears in 2 of 10 documents (with vs. without the "+1" inside the log).
print(doc_scores)

[1.62825016 0.         0.         2.46873838 0.         0.
 0.         2.39949985 0.         1.65983941]
