# Ngrams & Dependency Analysis

In [None]:
import functools
import json
import math
import os
from collections import defaultdict
from collections.abc import Callable, Sequence

import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
import nltk
import pandas as pd
import pyrootutils
import seaborn as sns
from nltk.lm.preprocessing import padded_everygram_pipeline
from nltk.parse import ViterbiParser

from formal_gym import grammar as fg_grammar

In [None]:
# Resolve the repository root from its ".project-root" marker file, starting
# the search at the current working directory.
PROJECT_ROOT = pyrootutils.find_root(
    indicator=".project-root",
    search_from=os.path.abspath(""),
)

# Every grammar lives in its own sub-directory of data/grammars.
grammars_dir = PROJECT_ROOT / "data" / "grammars"
grammar_stats_filename = "grammar_stats.json"
samples_stats_filename = "filtered_samples_stats.json"

# Keep only grammar directories that contain both statistics files.
grammar_dirs = [
    candidate
    for candidate in grammars_dir.iterdir()
    if candidate.is_dir()
    and all(
        (candidate / required).exists()
        for required in (grammar_stats_filename, samples_stats_filename)
    )
]

# Merge the per-grammar statistics and the per-sample statistics of each
# grammar into one record; keys from the samples file win on collision.
grammar_stats = []
for g in grammar_dirs:
    # Context managers close each file promptly instead of leaking the handle.
    with open(g / grammar_stats_filename, encoding="utf-8") as g_file:
        g_stats = json.load(g_file)
    with open(g / samples_stats_filename, encoding="utf-8") as s_file:
        s_stats = json.load(s_file)
    merged = {**g_stats, **s_stats}
    grammar_stats.append(merged)
grammar_stats_df = pd.DataFrame(grammar_stats)

# Restrict the table to grammars with coverage above 98% whose size
# statistics (terminals, nonterminals, lexical and non-lexical productions)
# are each at most 100.
size_columns = [
    "n_terminals",
    "n_nonterminals",
    "n_lexical_productions",
    "n_nonlexical_productions",
]
keep_rows = (grammar_stats_df["coverage"] > 0.98) & (
    grammar_stats_df[size_columns] <= 100
).all(axis=1)

grammar_stats_df = (
    grammar_stats_df.loc[keep_rows]
    .drop(columns=["median_positive_parses", "mean_positive_parses"])
    .sort_values("grammar_name")
    .reset_index(drop=True)
)

# grammar_stats_df["grammar"] = grammar_stats_df["grammar_name"].apply(
#     lambda x: fg_grammar.Grammar.from_file(grammars_dir / x / f"{x}.cfg")
# )

# Display the filtered table.
grammar_stats_df

In [None]:
# Use the first row of the filtered table (sorted by grammar_name) as the
# running example grammar for the cells below.
test_g_name = grammar_stats_df["grammar_name"].iloc[0]
test_g_name

In [None]:
# Load the example grammar from <grammars_dir>/<name>/<name>.cfg.
test_grammar = fg_grammar.Grammar.from_file(
    grammars_dir / test_g_name / f"{test_g_name}.cfg"
)

# Display the grammar's `as_pcfg` attribute (presumably an NLTK PCFG, since
# the same attribute is handed to ViterbiParser further down — confirm).
test_grammar.as_pcfg

In [None]:
def get_ngram(
    grammar: str,
    n: int = 1,
) -> tuple[Callable[[tuple[str, ...]], float], int]:
    """Build an add-one-smoothed n-gram model from a grammar's sample file.

    Reads ``<grammars_dir>/<grammar>/filtered_positive_samples.txt``, one
    space-separated string per line, and counts every contiguous window of
    ``n`` tokens. Blank lines are skipped so they do not contribute an
    empty-string token to the vocabulary.

    Args:
        grammar: Name of the grammar directory under ``grammars_dir``.
        n: Order of the n-grams to count; must be at least 1.

    Returns:
        A pair ``(get_prob, V)``. ``get_prob(ngram)`` returns the smoothed
        joint probability ``(count + 1) / (N + V**n)`` of an n-token tuple,
        where ``N`` is the number of observed n-grams; ``V`` is the number of
        distinct tokens seen in the file.

    Raises:
        ValueError: If ``n`` is smaller than 1.
    """
    if n < 1:
        raise ValueError(f"n must be >= 1, got {n}")

    counts = defaultdict(int)
    vocab = set()

    with open(grammars_dir / grammar / "filtered_positive_samples.txt") as f:
        for line in f:
            stripped = line.strip()
            if not stripped:
                continue
            tokens = stripped.split(" ")
            vocab.update(tokens)
            # The range is empty when the line has fewer than n tokens.
            for i in range(len(tokens) - n + 1):
                counts[tuple(tokens[i : i + n])] += 1

    V = len(vocab)
    # Add-one smoothing adds one pseudo-count per possible event. There are
    # V**n possible n-grams, so that is what makes the probabilities sum to 1
    # (for n == 1 this is simply V).
    total = sum(counts.values()) + V**n

    def get_prob(ngram: tuple[str, ...]) -> float:
        """Return the smoothed probability of ``ngram``."""
        return (counts.get(ngram, 0) + 1) / total

    return get_prob, V


def string_logprob(s: str, get_prob, n: int) -> float:
    """Score a space-separated string as a sum of n-gram log-probabilities.

    Args:
        s: The string to score; surrounding whitespace is stripped and the
            rest is split on single spaces.
        get_prob: Callable mapping an n-token tuple to its probability.
        n: Window size used to slide over the tokens.

    Returns:
        The sum of ``math.log(get_prob(window))`` over every contiguous
        window of ``n`` tokens, or ``-inf`` when the string has fewer than
        ``n`` tokens.
    """
    words = s.strip().split(" ")
    window_count = len(words) - n + 1

    # Fewer than n tokens means there is not a single full window to score.
    if window_count <= 0:
        return float("-inf")

    total = 0.0
    for start in range(window_count):
        window = tuple(words[start : start + n])
        total += math.log(get_prob(window))
    return total

In [None]:
# Score one example string with the hand-rolled 4-gram model built from the
# example grammar's samples.
test_string = "t14 t18 t4 t20 t14 t24 t12"
N = 4
get_p, V = get_ngram(test_g_name, n=N)
test_string_logprob = string_logprob(test_string, get_p, N)

# Display the resulting log-probability.
test_string_logprob

In [None]:
def get_nltk_ngram(
    grammar: str, n: int = 1
) -> tuple[nltk.lm.Laplace, nltk.lm.Vocabulary]:
    """Fit an order-``n`` NLTK Laplace language model on a grammar's samples.

    Reads ``<grammars_dir>/<grammar>/filtered_positive_samples.txt``, one
    space-separated string per line; blank lines are skipped.

    Args:
        grammar: Name of the grammar directory under ``grammars_dir``.
        n: Order of the language model.

    Returns:
        A pair ``(lm, vocab)``: the fitted model and its vocabulary
        (``lm.vocab``).
    """
    with open(grammars_dir / grammar / "filtered_positive_samples.txt") as f:
        tokenized_lines = [line.strip().split(" ") for line in f if line.strip()]

    # The training everygrams must go up to the model order. A fixed order of
    # 2 would leave every model with n > 2 without counts for its own
    # highest-order n-grams.
    train, padded_vocab = padded_everygram_pipeline(n, tokenized_lines)
    lm = nltk.lm.models.Laplace(n)
    lm.fit(train, padded_vocab)

    # `padded_vocab` is a lazy iterator that `fit` has just consumed, so
    # return the model's own vocabulary object instead.
    return lm, lm.vocab


def get_ngram_string_logprob(s: str, lm: nltk.lm.api.LanguageModel) -> float:
    """Return the natural-log probability of ``s`` under an NLTK n-gram model.

    Each token is scored given up to ``lm.order - 1`` preceding tokens. No
    padding symbols are added to ``s``, so the first tokens are scored with
    shorter contexts and no end-of-sentence event is scored.

    Args:
        s: Space-separated string to score.
        lm: A fitted NLTK language model exposing ``order`` and ``logscore``.

    Returns:
        The sum of per-token log-probabilities in nats, the same base as the
        ``math.log``-based ``string_logprob`` and ``get_pcfg_string_logprob``.
    """
    tokens = s.strip().split(" ")
    history = lm.order - 1

    log2_total = 0.0
    for i, token in enumerate(tokens):
        context = tuple(tokens[max(0, i - history) : i])
        log2_total += lm.logscore(token, context)

    # NLTK's `logscore` is base 2; rescale to natural log so the value can be
    # compared with the other log-probabilities in this file.
    return log2_total * math.log(2)

In [None]:
# Fit a unigram (n=1) NLTK model on the example grammar's samples and score
# the example string with it.
lm, vocab = get_nltk_ngram(test_g_name, n=1)
get_ngram_string_logprob(test_string, lm)

In [None]:
@functools.lru_cache(maxsize=32)
def _viterbi_parser_for(grammar: str) -> "ViterbiParser":
    """Load a grammar's ``.cfg`` file once and return a Viterbi parser for it.

    Results are cached per grammar name; call
    ``_viterbi_parser_for.cache_clear()`` if a grammar file changes on disk.
    """
    grammar_object = fg_grammar.Grammar.from_file(
        grammars_dir / grammar / f"{grammar}.cfg"
    )
    return ViterbiParser(grammar_object.as_pcfg)


def get_pcfg_string_logprob(s: str, grammar: str) -> float:
    """Return the natural-log probability of ``s`` under a grammar's PCFG.

    Args:
        s: Space-separated string to parse.
        grammar: Name of the grammar directory under ``grammars_dir``.

    Returns:
        ``math.log`` of the summed probability of the parses returned by the
        parser, or ``-inf`` when the string has no parse, has zero
        probability, or contains a token the grammar does not cover.

    NOTE(review): NLTK's ViterbiParser yields only the most probable parse,
    so for ambiguous strings this is the best-parse probability, not the
    total string probability — confirm that this is intended.
    """
    # Reuse the parser across calls; rebuilding it would re-read and re-parse
    # the grammar file for every string.
    parser = _viterbi_parser_for(grammar)

    try:
        trees = list(parser.parse(s.strip().split(" ")))
    except ValueError:
        # NLTK raises ValueError when a token is not covered by the grammar;
        # such a string has probability zero under the PCFG.
        return float("-inf")

    if not trees:
        return float("-inf")

    total_prob = sum(tree.prob() for tree in trees)
    return math.log(total_prob) if total_prob > 0 else float("-inf")

In [None]:
# Score the same example string under the example grammar's PCFG.
get_pcfg_string_logprob(test_string, test_g_name)

In [None]:
def kl_divergence(
    grammar: str, ns: Sequence[int] = (1, 2, 3, 4, 5, 6)
) -> dict[int, float]:
    """Compare a grammar's PCFG with n-gram models fitted on its samples.

    For every order ``n`` in ``ns`` the function computes, over the non-blank
    lines of ``<grammars_dir>/<grammar>/filtered_positive_samples.txt``, the
    sum of ``exp(p) * (p - q)`` for all strings with a finite PCFG
    log-probability ``p`` (``q`` is the n-gram log-probability), divided by
    the total number of strings.

    NOTE(review): the estimator weights each term by ``exp(p)`` and also
    divides by the number of samples; confirm which of "sum over the support"
    or "mean over samples drawn from the PCFG" is intended.
    NOTE(review): ``p - q`` is only meaningful if both log-probabilities use
    the same base — confirm that ``get_ngram_string_logprob`` returns natural
    logs like ``get_pcfg_string_logprob``.

    Args:
        grammar: Name of the grammar directory under ``grammars_dir``.
        ns: N-gram orders to evaluate.

    Returns:
        A dict mapping each order ``n`` to its divergence value.

    Raises:
        ValueError: If the sample file contains no non-blank lines.
    """
    # Materialise so that one-shot iterables can be traversed more than once.
    ns = list(ns)

    strings_file = grammars_dir / grammar / "filtered_positive_samples.txt"
    with open(strings_file) as f:
        strings = [line.strip() for line in f if line.strip()]

    if not strings:
        # Fail with a clear message instead of a ZeroDivisionError below.
        raise ValueError(f"No samples found in {strings_file}")

    ngrams = {n: get_nltk_ngram(grammar, n=n)[0] for n in ns}

    print("Calculating PCFG log probabilities...")
    p_logprobs = [get_pcfg_string_logprob(s, grammar) for s in strings]

    qs_logprobs = {}
    for n in ns:
        print(f"Calculating {n}-gram log probabilities...")
        qs_logprobs[n] = [get_ngram_string_logprob(s, ngrams[n]) for s in strings]

    kl_divs = {}
    for n in ns:
        # Strings the PCFG cannot generate (p == -inf) contribute nothing.
        kl_divs[n] = sum(
            math.exp(p) * (p - q)
            for q, p in zip(qs_logprobs[n], p_logprobs)
            if p > float("-inf")
        ) / len(strings)

    return kl_divs

In [None]:
# Compute the divergence for one specific grammar, using the default n-gram
# orders of kl_divergence.
g2 = "grammar_20250319112222_327647"
kl_divs = kl_divergence(g2)

In [None]:
# Reshape the {n: divergence} mapping into one record per n-gram order.
kl_divs_fmt = [{"n": n, "kl_divergence": div} for n, div in kl_divs.items()]
kl_divs_df = pd.DataFrame(kl_divs_fmt)

# Plot the divergence as a function of the n-gram order.
line_style = {"marker": "o", "markersize": 8, "linewidth": 2}
sns.lineplot(data=kl_divs_df, x="n", y="kl_divergence", **line_style)