In [None]:
# read corpus index
# read docs in corpus one by one
# build unigram and bigram histograms
# also build sets of UUIDs of docs where words are present

# as an end result, we have a mapping: bigram -> freq, set(docs)
# and the same mapping for unigrams
# based on these, we can select most important bigrams and drop others
# also threshold unigrams based on frequency?
# we can also filter stopwords in place

# do stemming?

# new objective:
# for each doc, build an index of words (also remove stop-words and too specific ones?)

In [None]:
import pathlib

import pandas as pd
import plotly.express as px

In [None]:
# Load the corpus index (JSON Lines: one JSON record per line) and preview its first rows.
corpus_index = pd.read_json("../data/index.json", lines=True)
corpus_index.head()

In [None]:
def read_doc(corpus_dir: pathlib.Path, doc_id: str):
    """Return the full text of a single corpus document.

    The document is read from ``<corpus_dir>/<doc_id>.txt`` and decoded as UTF-8.
    """
    doc_path = corpus_dir / f"{doc_id}.txt"
    return doc_path.read_text(encoding="utf-8")

In [None]:
# All document ids from the index's "uuid" column, as a plain Python list.
doc_ids = corpus_index["uuid"].to_list()

In [None]:
# Small sample of document ids for interactive experiments.
# NOTE(review): `.loc` slices by label and includes the end label, so with a
# default integer index this selects 6 rows (labels 0..5), not 5 — confirm intent.
head_doc_ids = corpus_index.loc[:5, "uuid"]
head_doc_ids

In [None]:
# Directory that read_doc looks in for "<uuid>.txt" files.
CORPUS_DIR = pathlib.Path("../data/pages")

# Texts of the sample documents, in the same order as head_doc_ids.
# (The name is a typo of "doc_contents"; other cells read it under this spelling.)
doc_contens = tuple(read_doc(CORPUS_DIR, d) for d in head_doc_ids)

In [None]:
# Show the raw text of the first sample document.
doc_contens[0]

In [None]:
import re
import collections
import itertools


# A token is a run of two or more word characters (`\w`) between word
# boundaries, so single-character tokens are never produced by this pattern.
WORD_PATTERN = re.compile(r"(?u)\b\w\w+\b")

# Lower-case words dropped during tokenization. Entries shorter than two
# characters ("a", "i") can never be matched by WORD_PATTERN and are listed
# only for completeness.
STOP_WORDS = {
    # articles
    "a",
    "an",
    "the",
    # pronouns and demonstratives
    "i",
    "me",
    "you",
    "your",
    "it",
    "he",
    "she",
    "this",
    "that",
    # prepositions
    "to",
    "of",
    "in",
    "at",
    "by",
    "from",
    "out",
    "on",
    # common verbs
    "be",
    "is",
    "was",
    "were",
    "have",
    "been",
    # conjunctions
    "and",
    "or",
    # other
    "if",
    "also",
    "for",
    # modals
    "can",
    "could",
}


def build_collocations(text: str):
    """Compute word/bigram relative frequencies for ``text`` and rank its bigrams.

    The text is lower-cased and tokenized with ``WORD_PATTERN``; tokens that are
    in ``STOP_WORDS`` or are not purely alphabetic are dropped. Bigrams are
    pairs of adjacent *kept* tokens, so a bigram may span a removed word.

    Each bigram ``(x, y)`` is scored with the pointwise-mutual-information
    ratio ``p(x, y) / (p(x) * p(y))``. No logarithm is taken; the ordering is
    the same as for PMI.

    Args:
        text: Raw document text.

    Returns:
        A 4-tuple ``(unigrams, bigrams, mi_index, collocations)``:

        * ``unigrams`` -- word -> relative frequency among the kept tokens.
        * ``bigrams`` -- (word, next_word) -> relative frequency among all
          adjacent token pairs.
        * ``mi_index`` -- list of ``(bigram, score)`` sorted by ascending score.
        * ``collocations`` -- dict with the ``5 * (len(mi_index) // 10)``
          highest-scoring bigrams (roughly the top half) mapped to their score.
          It is empty when the text yields fewer than ten distinct bigrams.
    """
    lowered = text.lower()
    words = tuple(
        token
        for token in (match.group() for match in WORD_PATTERN.finditer(lowered))
        if token not in STOP_WORDS and token.isalpha()
    )

    # build unigram index (raw counts first)
    unigrams = collections.defaultdict(int)
    for w in words:
        unigrams[w] += 1

    # Normalize by the total number of tokens (not the number of distinct
    # words) so the values are relative frequencies that sum to 1.
    total_unigrams = len(words)
    for ug in unigrams:
        unigrams[ug] /= total_unigrams

    # build bigram index (raw counts first)
    bigrams = collections.defaultdict(int)
    for word, collocant in itertools.pairwise(words):
        bigrams[(word, collocant)] += 1

    # Same normalization for bigrams: divide by the total number of pairs.
    total_bigrams = sum(bigrams.values())
    for bg in bigrams:
        bigrams[bg] /= total_bigrams

    def bigram_mi(pair):
        x, y = pair
        # Parentheses matter: `a / b * c` would evaluate as `(a / b) * c`.
        return bigrams[pair] / (unigrams[x] * unigrams[y])

    # rank bigrams by association score, weakest first (stable for ties)
    mi_index = sorted(((p, bigram_mi(p)) for p in bigrams), key=lambda item: item[-1])

    # Keep the top five tenths. Slice from an explicit start index because
    # `mi_index[-0:]` would return the whole list when there is nothing to keep.
    cutoff = len(mi_index) // 10
    keep = 5 * cutoff
    collocations = dict(mi_index[len(mi_index) - keep:])

    return unigrams, bigrams, mi_index, collocations

In [None]:
# Tokenize the first sample document by hand to inspect what the filters keep:
# lower-case, drop stop words, drop tokens that are not purely alphabetic.
text = doc_contens[0]

words = tuple(
    token
    for token in (match.group() for match in WORD_PATTERN.finditer(text.lower()))
    if token not in STOP_WORDS and token.isalpha()
)

print(words[:50])

In [None]:
# Analyse the first sample document.
# NOTE: this unpacks four values, so it relies on the four-value
# build_collocations defined above; the later redefinition in this notebook
# returns only two values, so this cell fails if re-run after that one.
unigrams, bigrams, mi_index, bigram_collocations = build_collocations(doc_contens[0])

In [None]:
# Number of distinct words kept from the first sample document.
len(unigrams)

In [None]:
# Number of distinct adjacent word pairs in the first sample document.
len(bigrams)

In [None]:
# One (bigram, score) entry per distinct bigram, so this equals len(bigrams).
len(mi_index)

In [None]:
# Number of bigrams that were kept as collocations.
len(bigram_collocations)

In [None]:
# Preview the first entries of the unigram index.
# The break fires after the entry with i == 11 has been printed, so up to
# 12 entries are shown.
for i, (k, v) in enumerate(unigrams.items()):
    print(f"{k} -> {v:.4f}")
    if i > 10:
        break

# Same preview (up to 12 entries) for the selected bigram collocations.
for i, (k, v) in enumerate(bigram_collocations.items()):
    print(f"{k} -> {v:.4f}")
    if i > 10:
        break

In [None]:
# Collect the bigram scores into a frame and flag the ones above the
# 60th percentile so they can be coloured separately when plotted.
mi_index_values = tuple(score for _pair, score in mi_index)

df = pd.DataFrame({"mi": mi_index_values}).assign(
    freq=lambda frame: frame["mi"] > frame["mi"].quantile(q=0.6)
)

In [None]:
# Histogram of bigram scores, coloured by the above-60th-percentile flag.
px.histogram(df, x="mi", color="freq")

In [None]:
def build_collocations(text: str):
    """Compute word relative frequencies and the strongest bigram collocations.

    This redefinition replaces the four-value ``build_collocations`` defined
    earlier in the notebook. Unlike that version it keeps every token matched
    by ``WORD_PATTERN`` that is not in ``STOP_WORDS`` (no alphabetic-only
    filter), and it returns only two values.

    The text is lower-cased before tokenization. Bigrams are pairs of adjacent
    *kept* tokens, so a bigram may span a removed stop word. Each bigram
    ``(x, y)`` is scored with the pointwise-mutual-information ratio
    ``p(x, y) / (p(x) * p(y))`` (no logarithm; same ordering as PMI).

    Args:
        text: Raw document text.

    Returns:
        A 2-tuple ``(unigrams, bigram_collocations)``:

        * ``unigrams`` -- word -> relative frequency among the kept tokens.
        * ``bigram_collocations`` -- dict with the ``len(scores) // 10``
          highest-scoring bigrams (the top tenth) mapped to their score. It is
          empty when the text yields fewer than ten distinct bigrams.
    """
    lowered = text.lower()
    words = tuple(
        token
        for token in (match.group() for match in WORD_PATTERN.finditer(lowered))
        if token not in STOP_WORDS
    )

    # build unigram index (raw counts first)
    unigrams = collections.defaultdict(int)
    for w in words:
        unigrams[w] += 1

    # Normalize by the total number of tokens (not the number of distinct
    # words) so the values are relative frequencies that sum to 1.
    total_unigrams = len(words)
    for ug in unigrams:
        unigrams[ug] /= total_unigrams

    # build bigram index (raw counts first)
    bigrams = collections.defaultdict(int)
    for word, collocant in itertools.pairwise(words):
        bigrams[(word, collocant)] += 1

    # Same normalization for bigrams: divide by the total number of pairs.
    total_bigrams = sum(bigrams.values())
    for bg in bigrams:
        bigrams[bg] /= total_bigrams

    def bigram_mi(pair):
        x, y = pair
        # Parentheses matter: `a / b * c` would evaluate as `(a / b) * c`.
        return bigrams[pair] / (unigrams[x] * unigrams[y])

    # rank bigrams by association score, weakest first (stable for ties)
    mi_index = sorted(((p, bigram_mi(p)) for p in bigrams), key=lambda item: item[-1])

    # Keep the top tenth. Slice from an explicit start index because
    # `mi_index[-0:]` would return the whole list when cutoff is 0.
    cutoff = len(mi_index) // 10
    bigram_collocations = dict(mi_index[len(mi_index) - cutoff:])

    return unigrams, bigram_collocations

In [None]:
from dataclasses import dataclass
from typing import Mapping, Sequence, Tuple


@dataclass(frozen=True, slots=True)
class DocPlainIndex:
    """Frozen per-document record of the words and bigrams indexed for one document.

    Membership checks look only at the mapping keys; the float values are
    whatever scores the index builder stored.
    """

    # identifier of the indexed document
    doc_id: str
    # word -> score for this document
    unigrams: Mapping[str, float]
    # (word, next_word) -> score for this document
    bigrams: Mapping[Tuple[str, str], float]

    def has_word(self, word: str) -> bool:
        """Return True if ``word`` is a key of ``unigrams`` (exact match)."""
        return word in self.unigrams
    
    def has_bigram(self, bigram: Tuple[str, str]) -> bool:
        """Return True if the ``(word, next_word)`` tuple is a key of ``bigrams``."""
        return bigram in self.bigrams


def build_plain_index(corpus_dir: pathlib.Path, doc_ids: Sequence[str]):
    """Index every listed document and return the per-document indexes as a tuple.

    Each document is read with ``read_doc`` and analysed with
    ``build_collocations``; the two mappings it returns become the ``unigrams``
    and ``bigrams`` of a ``DocPlainIndex``. The output order follows ``doc_ids``.
    """
    entries = []
    for doc_id in doc_ids:
        text = read_doc(corpus_dir, doc_id)
        word_index, bigram_index = build_collocations(text)
        entries.append(DocPlainIndex(doc_id, unigrams=word_index, bigrams=bigram_index))
    return tuple(entries)


In [None]:
# Index every document listed in the corpus index (reads each file from CORPUS_DIR).
plain_index = build_plain_index(CORPUS_DIR, doc_ids=doc_ids)


In [None]:
def search_plain_index(idx: Sequence["DocPlainIndex"], query: str) -> Sequence["DocPlainIndex"]:
    """Return the indexed documents that contain the queried word or bigram.

    The query is lower-cased and split on whitespace. Lower-casing mirrors
    ``build_collocations``, which lower-cases document text before indexing,
    so a query such as ``"Tea"`` matches documents indexed under ``"tea"``.

    Args:
        idx: Per-document indexes to scan.
        query: One word, or two words separated by whitespace (a bigram).

    Returns:
        A list of the entries of ``idx`` (in their original order) whose
        ``has_word`` / ``has_bigram`` check succeeds for the query.

    Raises:
        ValueError: If the query is empty or contains more than two words.
    """
    words = tuple(query.lower().split())

    # An empty query would otherwise fail with an opaque tuple-unpacking error.
    if not words:
        raise ValueError("query must not be empty")

    if len(words) > 2:
        raise ValueError("only single-word and bigram search is supported")

    if len(words) == 2:
        return [doc for doc in idx if doc.has_bigram(words)]

    (word,) = words
    return [doc for doc in idx if doc.has_word(word)]


In [None]:
# Ad-hoc smoke test: a mix of single-word and two-word queries.
# NOTE: this rebinds the notebook-level name `words`, which an earlier cell
# used for the tokens of the first sample document.
words = (
    "game",
    "chess",
    "tea",
    "coffee",
    "cup tea",
    "cup coffee",
    "drink",
    "drink tea",
    "drink coffee",
    "sun",
    "look",
    "support",
    "coins",
    "play with",
    "ends with",
    "destroy",
    "frog",
    "fog",
    "knife",
    "cut",
    "watts",
    "power source",
)

print(f"total documents: {len(plain_index)}")

# For each query, report how many indexed documents match it.
for word in words:
    print(f"{word} -> {len(search_plain_index(plain_index, word))}")