# text-analysis

> Useful code for analyzing text.

In [None]:
#| default_exp common.text_analysis

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| hide
from fastcore.test import *

In [None]:
#| export
from collections import defaultdict
from typing import Dict, Iterable, Sequence, Tuple

In [None]:
#| export
import torch

In [None]:
#| export
def build_next_token_map(
    text: str, prefix_len: int, vocab_size: int, stoi: Dict[str, int]
) -> Dict[str, torch.Tensor]:
    """Map each substring of length `prefix_len` to its next-token counts.

    Args:
        text: The body of text to scan.
        prefix_len: Length of the prefixes used as keys. Must be >= 0.
        vocab_size: Length of each count tensor (one slot per token index).
        stoi: Maps each token (a single character of `text`) to its index
            in the count tensors.

    Returns:
        A `defaultdict` keyed by prefix. Each value is a `torch.long` tensor
        of length `vocab_size` whose entry `stoi[t]` is the number of times
        token `t` immediately followed that prefix in `text`. Because the
        result is a `defaultdict`, looking up a prefix that never occurred
        inserts (and returns) an all-zero tensor for it.

    Raises:
        ValueError: If `prefix_len` is negative.
        KeyError: If a character following a prefix is missing from `stoi`.
    """
    if prefix_len < 0:
        raise ValueError(f"prefix_len must be non-negative, got {prefix_len}")

    next_token_map: Dict[str, torch.Tensor] = defaultdict(
        lambda: torch.zeros(vocab_size, dtype=torch.long)
    )

    # Start index of the final substring of length `prefix_len`. Negative
    # when the text is shorter than a single prefix.
    last_start = len(text) - prefix_len

    # Stopping before `last_start` ensures every prefix visited here has a
    # next token.
    for i in range(last_start):
        prefix = text[i : i + prefix_len]
        next_token = text[i + prefix_len]
        next_token_map[prefix][stoi[next_token]] += 1

    # The loop above will have added every substring of length
    # `prefix_len` to the map, except for the very last one,
    # because it has no next token. But, it is useful to have
    # this last string in the map, with zeros for all the next
    # token counts (it is a valid substring of the right length
    # and calling code might want to look it up). We add it here.
    #
    # When the text is shorter than `prefix_len` there is no such
    # substring, so nothing is added. The slice uses an explicit start
    # index rather than `text[-prefix_len:]`, which would return the
    # whole text when `prefix_len` is 0.
    if last_start >= 0:
        last_prefix = text[last_start:]

        # Adding zero ensures the entry is unchanged if it exists, but
        # will add it (via the defaultdict's default factory) if it
        # doesn't.
        next_token_map[last_prefix] += 0

    return next_token_map

In [None]:
# Tests for build_next_token_map
test_text = "abcabcc"
test_prefix_len, test_vocab_size = 2, 3
test_stoi = {"a": 0, "b": 1, "c": 2}
test_next_token_map = build_next_token_map(
    text=test_text,
    prefix_len=test_prefix_len,
    vocab_size=test_vocab_size,
    stoi=test_stoi,
)

# The text contains four distinct substrings of length 2.
test_eq(len(test_next_token_map), 4)

# Expected next-token counts per prefix, in (a, b, c) order. "cc" only
# occurs at the very end of the text, so it has no next token and should
# be in the map with all zeros.
for _prefix, _expected_counts in [
    ("ab", [0, 0, 2]),
    ("bc", [1, 0, 1]),
    ("ca", [0, 1, 0]),
    ("cc", [0, 0, 0]),
]:
    test_eq(test_next_token_map[_prefix], torch.tensor(_expected_counts))

# When the final substring ("bc") already appeared earlier in the text,
# its existing counts must be left untouched.
test_next_token_map = build_next_token_map(
    text="abcabc",
    prefix_len=test_prefix_len,
    vocab_size=test_vocab_size,
    stoi=test_stoi,
)
test_eq(test_next_token_map["bc"], torch.tensor([1, 0, 0]))

In [None]:
#| export
def top_nonzero_tokens(freqs: torch.Tensor, itos: Dict[int, str]) -> Iterable[Tuple[str, float]]:
    """Return `(token, frequency)` pairs for the largest entries of `freqs`.

    As many entries are selected as `freqs` has nonzero values, ordered from
    largest to smallest. Each selected index is translated to its token via
    `itos`. An all-zero `freqs` yields an empty list.

    NOTE(review): the selection equals "exactly the nonzero entries" only if
    `freqs` has no negative values -- presumably always true for counts.
    """
    nonzero_count = int(torch.count_nonzero(freqs))
    ranked = torch.topk(freqs, k=nonzero_count)

    pairs = []
    for index, value in zip(ranked.indices.tolist(), ranked.values.tolist()):
        pairs.append((itos[index], value))
    return pairs

In [None]:
# Tests for top_nonzero_tokens
itos = dict(enumerate('abcd'))

# Each case pairs an input frequency tensor with the expected
# (token, frequency) list, ordered from most to least frequent.
for freqs, _expected_pairs in [
    # All zeros
    (torch.tensor([0, 0, 0, 0]), []),
    # All non-zeros
    (torch.tensor([1, 2, 3, 4]), [('d', 4), ('c', 3), ('b', 2), ('a', 1)]),
    # Some zeros
    (torch.tensor([0, 2, 0, 4]), [('d', 4), ('b', 2)]),
]:
    test_eq(top_nonzero_tokens(freqs, itos), _expected_pairs)

In [None]:
#| export
class SubstringFrequencyAnalysis:
    """Class that performs frequency analysis on a body of text for a set of substrings.

    For each requested substring the analysis records the next-token
    frequencies, the combined (cumulative) frequencies over all requested
    substrings, and the tokens with nonzero frequency ranked from most to
    least frequent.
    """

    def __init__(
        self,
        substrs: Sequence[str],
        next_token_map: Dict[str, torch.Tensor],
        itos: Dict[int, str],
    ):
        """Compute the per-substring and cumulative next-token statistics.

        Args:
            substrs: The substrings to analyse. Must not be empty.
            next_token_map: Maps a substring to its tensor of next-token
                frequencies. Every entry of `substrs` is looked up in it
                with `next_token_map[s]`.
            itos: Maps a token index to the token it stands for.

        Raises:
            ValueError: If `substrs` is empty.
        """
        # Need at least one string to determine the length
        # and for this to be useful. Raised explicitly (rather than
        # asserted) so the check survives `python -O`.
        if len(substrs) == 0:
            raise ValueError("substrs must contain at least one substring")

        # Next-token frequencies for each requested substring. The values
        # are the tensors held by `next_token_map` itself, not copies.
        self.freq_map = {
            s: next_token_map[s]
            for s in substrs
        }

        # Compute the cumulative frequencies. The vocabulary size is taken
        # from the length of the first frequency tensor.
        vocab_size = len(next(iter(self.freq_map.values())))
        self.cumulative_freqs = torch.zeros(vocab_size, dtype=torch.long)
        for freqs in self.freq_map.values():
            self.cumulative_freqs += freqs

        # Normalize the cumulative frequencies so they sum to one. If none
        # of the substrings has any next token the total is zero; dividing
        # would produce NaN in every slot, so the all-zero vector is kept.
        total = self.cumulative_freqs.sum()
        self.norm_cumulative_freqs = self.cumulative_freqs.float()
        if total.item() > 0:
            self.norm_cumulative_freqs = self.norm_cumulative_freqs / total

        # Figure out the top tokens for each substring
        self.top_tokens = {
            s: top_nonzero_tokens(freqs, itos)
            for s, freqs in self.freq_map.items()
        }
        # Same ranking, applied to the normalized cumulative frequencies.
        self.top_tokens_cumulative = top_nonzero_tokens(
            self.norm_cumulative_freqs, itos
        )

    def print_summary(self):
        """Print the substrings, their top tokens, and the cumulative top tokens."""
        substr_list = ', '.join(repr(s) for s in self.freq_map)
        print(f"Substrings: {substr_list}")

        print("Top Tokens for each substring:")
        s_len = max(len(s) for s in self.freq_map)
        # Column width for the right-aligned substring labels.
        # NOTE(review): presumably sized to leave room for the quotes and
        # escape sequences that repr() adds -- confirm.
        label_width = 2 * s_len + 2
        for s, tokens in self.top_tokens.items():
            formatted_tokens = ', '.join(
                f'{repr(token):>4} ({freq:>4})' for token, freq in tokens
            )
            print(f"{repr(s):>{label_width}}: {formatted_tokens}")

        print("Cumulative Top Tokens:")
        print(
            ', '.join(
                f'{repr(token):>4} ({freq:.2f})'
                for token, freq in self.top_tokens_cumulative
            )
        )

In [None]:
# Tests for SubstringFrequencyAnalysis
itos = dict(enumerate('abcd'))
stoi = {token: index for index, token in itos.items()}
text = 'aaabbbcccddd'
substrs = ['aaa', 'bbb', 'ccc', 'ddd']

next_token_map = build_next_token_map(
    text, len(substrs[0]), len(itos), stoi
)
sfa = SubstringFrequencyAnalysis(
    substrs=substrs,
    next_token_map=next_token_map,
    itos=itos,
)

# Each run of three identical letters is followed once by the next letter;
# 'ddd' ends the text and therefore has no next token at all.
for _substr, _expected_freqs in [
    ('aaa', [0, 1, 0, 0]),
    ('bbb', [0, 0, 1, 0]),
    ('ccc', [0, 0, 0, 1]),
    ('ddd', [0, 0, 0, 0]),
]:
    test_eq(sfa.freq_map[_substr].tolist(), _expected_freqs)

# Three next tokens were observed in total, one each of b, c and d.
test_close(sfa.norm_cumulative_freqs.tolist(), [0, 1/3, 1/3, 1/3])

for _substr, _expected_top in [
    ('aaa', [('b', 1)]),
    ('bbb', [('c', 1)]),
    ('ccc', [('d', 1)]),
]:
    test_eq(sfa.top_tokens[_substr], _expected_top)

In [None]:
#| hide
# Export the cells marked `#| export` in this notebook via nbdev.
import nbdev

nbdev.nbdev_export()