In [1]:
import json 

# Read the corpus from disk. Later cells iterate over `data` and read the
# 'title' and 'text' fields of each record.
with open("./save/dataset.json", encoding="utf-8") as f:
    data = json.loads(f.read())

In [7]:
import re
import unicodedata 

def preprocess(s: str):
    """Normalise raw text into lower-case, ASCII-only, single-spaced form.

    Steps, in order:
      1. NFC-normalise (canonical composition) and lower-case.
      2. Replace every run of hyphen/dash characters with one space.
      3. Fold accents: NFKD-decompose, then drop everything that is not ASCII
         (combining marks disappear, and so does any non-Latin script).
      4. Collapse whitespace runs to a single space and trim both ends.

    Args:
        s: the text to normalise.

    Returns:
        The normalised string (possibly empty).
    """
    # Compose first so that lower-casing operates on precomposed characters.
    lowered = unicodedata.normalize("NFC", s).lower()

    # Hyphenated compounds are split into separate words rather than fused.
    dehyphenated = re.sub(r"[-‐-‒–—]+", " ", lowered)

    # Decompose "é" into "e" + combining accent, then discard the non-ASCII part.
    decomposed = unicodedata.normalize("NFKD", dehyphenated)
    ascii_only = decomposed.encode("ascii", "ignore").decode("ascii")

    # The substitutions above can leave doubled or edge whitespace behind.
    return re.sub(r"\s+", " ", ascii_only).strip()

def tokenize(text):
    """Split text into lower-cased word tokens.

    A token is a run of letters, optionally followed by one apostrophe and
    another run of letters (e.g. "don't"). Because the pattern is anchored on
    word boundaries, letter runs that touch a digit or an underscore are
    skipped entirely rather than truncated.

    Args:
        text: the string to tokenise.

    Returns:
        A list of lower-cased tokens in order of appearance.
    """
    word_pattern = r"(?u)\b[^\W\d_]+(?:'[^\W\d_]+)?\b"
    return [match.group(0).lower() for match in re.finditer(word_pattern, text)]

# Build the tokenised corpus: docs[i] holds the tokens of data[i], with the
# title and the body normalised separately and then joined by one space.
docs = []
for d in data:
    doc = " ".join((preprocess(d['title']), preprocess(d['text'])))
    docs.append(tokenize(doc))

In [None]:
from collections import Counter, defaultdict
from heapq import nlargest
import math 

class BM25:
    """BM25 ranking over a corpus of pre-tokenised documents.

    The constructor builds an inverted index, per-document lengths and an IDF
    table; ``score`` ranks documents against a free-text query.

    Attributes:
        k1: term-frequency saturation parameter of the BM25 formula.
        b: document-length normalisation parameter of the BM25 formula.
        N: number of documents in the corpus.
        doc_len: list with the token count of each document.
        avgdl: mean token count per document (0.0 for an empty corpus).
        df: term -> number of documents containing the term.
        postings: term -> list of ``(doc_id, term_frequency)`` pairs.
        idf: term -> ``log((N - df + 0.5) / (df + 0.5) + 1)``.
    """

    def __init__(self, docs, k1=1.2, b=0.75):
        """Index the corpus.

        Args:
            docs: sized sequence of token lists; position ``i`` becomes
                document id ``i``.
            k1: BM25 ``k1`` parameter.
            b: BM25 ``b`` parameter.
        """
        self.k1, self.b = k1, b
        self.N = len(docs)
        self.doc_len = [0] * self.N
        self.avgdl = 0.0
        self.df = defaultdict(int)
        self.postings = defaultdict(list)

        for i, tokens in enumerate(docs):
            self.doc_len[i] = len(tokens)
            self.avgdl += len(tokens)
            tf = Counter(tokens)
            for term, f in tf.items():
                self.postings[term].append((i, f))
                self.df[term] += 1
        # max(1, N) keeps an empty corpus from dividing by zero.
        self.avgdl /= max(1, self.N)

        # The "+ 1.0" inside the log keeps every IDF strictly positive, even
        # for terms that occur in more than half of the documents.
        self.idf = {
            t: math.log((self.N - df + 0.5) / (df + 0.5) + 1.0)
            for t, df in self.df.items()
        }

    def score(self, query, top_k=10):
        """Rank documents for a free-text query.

        The query goes through ``preprocess`` and then ``tokenize``, i.e. the
        same normalisation the corpus received before indexing. Without the
        ``preprocess`` step, accented query terms (e.g. "Söhnke") could never
        match their accent-folded index entries. Each distinct query term is
        counted once, however often it is repeated in the query.

        Args:
            query: raw query string.
            top_k: maximum number of results to return.

        Returns:
            Up to ``top_k`` ``(doc_id, score)`` pairs, highest score first.
            Documents sharing no term with the query are omitted, so the list
            may be shorter than ``top_k`` or empty.
        """
        q_terms = tokenize(preprocess(query))
        scores = defaultdict(float)

        for t in set(q_terms):
            # .get() does not insert into the defaultdict on a miss.
            term_postings = self.postings.get(t)
            if not term_postings:
                continue

            idf = self.idf[t]
            for doc_id, f in term_postings:
                dl = self.doc_len[doc_id]
                # avgdl > 0 here: a posting exists, so some document is non-empty.
                denom = f + self.k1 * (1.0 - self.b + self.b * dl / self.avgdl)
                scores[doc_id] += idf * (f * (self.k1 + 1.0)) / denom

        return nlargest(top_k, scores.items(), key=lambda x: x[1])

In [11]:
# Index the tokenised corpus once; later cells reuse the `bm25` object.
bm25 = BM25(docs, k1=1.2, b=0.75)

# Sanity check with a hand-written query: show the titles of the ten best hits.
nn_docs = bm25.score(
    "quantitative finance option pricing black scholes",
    top_k=10,
)

for idx, _ in nn_docs:
    print(data[idx]['title'])

Robert C. Merton
Riccardo Rebonato
Risk-neutral measure
Jim Gatheral
Index of international trade articles
John Y. Campbell
Söhnke M. Bartram
The Oxford Companion to Music
Gregory D. Scholes
Discounted cash flow


In [20]:
def search_bm25(idx, top_k=10, prune_size=200):
    """Find the documents most similar to document ``idx`` using BM25.

    The query is built from the document's own vocabulary: its terms are
    ranked by tf * idf and only the ``prune_size`` heaviest ones are kept,
    which shortens the search and leaves out low-weight terms.

    Prints the query document's title followed by the titles of the hits.

    Args:
        idx: position of the query document in ``docs`` / ``data``.
        top_k: maximum number of neighbours to return.
        prune_size: maximum number of query terms to keep.

    Returns:
        Up to ``top_k`` ``(doc_id, score)`` pairs, best first, never
        including ``idx`` itself. Empty when nothing matches.
    """
    tf = Counter(docs[idx])

    # pick `prune_size` largest tf-idf to prune search (and ignore spurious/outliers)
    weighted = sorted(
        ((t, f * bm25.idf.get(t, 0.0)) for t, f in tf.items()),
        key=lambda x: x[1],
        reverse=True,
    )
    q = " ".join(t for t, _ in weighted[:prune_size])

    # Ask for one extra hit: the query document normally matches itself and
    # is filtered out below. It is removed by id, not by rank, because another
    # document can outscore it.
    res = bm25.score(q, top_k=top_k + 1)
    neighbours = [(d, s) for d, s in res if d != idx][:top_k]

    print(f"Query: {data[idx]['title']}", end = "\n\n")
    # Use a distinct loop variable so the `idx` parameter is never rebound.
    for doc_id, _ in neighbours:
        print(data[doc_id]['title'])

    return neighbours

In [22]:
# Look up the neighbours of document 1. The titles are printed by the function
# itself; binding the returned list to `_` keeps the cell from echoing it.
_ = search_bm25(1)

Query: Four Pillars of Destiny

Records of Kangxi's Travel Incognito
Solar term
List of Chinese actors
Huang Tsung-hsing
The First Half of My Life
List of Chinese philosophers
Tseng Chang
1911 (film)
Eastern Wu family trees
Liao Hua
