In [None]:
import re
from collections import Counter, defaultdict
from typing import Dict, Iterable, List, Tuple

import ipywidgets as widgets
import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
from ipywidgets import interact
from sklearn.model_selection import train_test_split

# Apply the "ggplot" matplotlib style to every figure drawn in this notebook.
plt.style.use("ggplot")

In [None]:
# Relative path of the dataset file that pd.read_json loads further down.
DATA_PATH = "./data/lm_dataset_jsonl.gz"

# A single seed: applied to NumPy's global RNG here and passed to
# train_test_split as random_state later in the notebook.
SEED = 42
np.random.seed(SEED)

Uncomment the cell below if you are running this notebook in Google Colab:

In [None]:
# Colab only: create the data directory (-p keeps the cell re-runnable when the
# directory already exists) and download the dataset to DATA_PATH.
# !mkdir -p ./data
# !wget https://raw.githubusercontent.com/vadim0912/ML2023/main/lecture08/data/lm_dataset_jsonl.gz -O $DATA_PATH

# Dataset

In [None]:
# Read the dataset as JSON Lines (lines=True: one JSON record per line).
# The "timestamp" and "text" columns are used by the cells below.
df = pd.read_json(DATA_PATH, lines=True)

In [None]:
# Number of records per calendar year, sorted by year for plotting.
text_by_year = (
    df["timestamp"]
    .apply(lambda stamp: stamp.year)
    .value_counts()
    .sort_index()
)

In [None]:
# Bar chart: how many pages the dataset contains for each year.
fig, ax = plt.subplots()
ax.bar(text_by_year.index, text_by_year.values)
ax.set_xlabel("year")
ax.set_ylabel("number of pages")
plt.show()

In [None]:
# Split every page on newlines and flatten: one Series row per line of text.
texts = (
    df["text"]
    .apply(lambda page: page.split("\n"))
    .explode()
    .reset_index(drop=True)
)

In [None]:
# How many lines there are of each character length. `texts` holds individual
# lines (pages were split on "\n" above), so the y axis counts lines, not pages.
text_by_length = texts.str.len().value_counts().sort_index()

plt.plot(text_by_length.index, text_by_length.values)
plt.xlabel("length")
plt.ylabel("number of lines")
# Log scale on x: lengths span several orders of magnitude.
plt.xscale("log")
plt.show()

In [None]:
def normalize_and_tokenize(text: str) -> List[str]:
    """Normalize one line of text and split it into tokens.

    Steps, in order:
      1. lower-case the text and replace "ё" with "е";
      2. replace every character outside the whitelist (Cyrillic а-я, digits,
         Latin a-z, the punctuation listed in the pattern, and space) with a
         space;
      3. collapse runs of spaces and strip the ends;
      4. tokenize with ``nltk.wordpunct_tokenize``.

    Args:
        text: raw input string.

    Returns:
        The list of tokens; empty when nothing survives normalization.
    """
    normalized = text.lower().replace("ё", "е")
    # Raw string literal: in a regular literal "\-" is an invalid escape
    # sequence, which Python reports with a warning. The pattern itself is
    # unchanged.
    # NOTE(review): the doubled `"` inside the class is redundant; it may be a
    # pair of typographic quotes lost in an earlier edit -- confirm.
    normalized = re.sub(r'[^а-я0-9a-z,.\-?!–«»"": ]', " ", normalized)
    normalized = re.sub(r" +", " ", normalized).strip()
    return nltk.wordpunct_tokenize(normalized)

In [None]:
# Tokenize every line; the result is a plain Python list of token lists.
tokenized_texts = texts.apply(normalize_and_tokenize).tolist()

In [None]:
# Hold out 5% of the tokenized lines for validation; SEED fixes the shuffle.
train_set, val_set = train_test_split(tokenized_texts, random_state=SEED, test_size=0.05)

# N-gram Language Model

$$
    P(w_1, \dots, w_k) = \prod_t P(w_t \mid w_{t - n + 1}, \dots, w_{t - 1}).
$$

$$
    P(w_t \mid w_{t - n + 1}, \dots, w_{t - 1}) \approx \frac{\text{count}(w_{t - n + 1}, \dots, w_{t - 1}, w_t)}{\displaystyle \sum_w \text{count}(w_{t - n + 1}, \dots, w_{t - 1}, w)} = \frac{\text{count}(w_{t - n + 1}, \dots, w_{t - 1}, w_t)}{\text{count}(w_{t - n + 1}, \dots, w_{t - 1})}
$$

In [None]:
BOS = "<BOS>"
EOS = "<EOS>"

# Padding options in the keyword form accepted by nltk.ngrams(...).
ngrams_config = {
    "pad_left": True,
    "pad_right": True,
    "left_pad_symbol": BOS,
    "right_pad_symbol": EOS,
}


def build_ngram_counts(
    tokenized_texts: Iterable[Iterable[str]], n: int
) -> Dict[Tuple[str, ...], Dict[str, int]]:
    """
    Count how often each token follows every (n - 1)-token context.

    Each text is padded with (n - 1) BOS symbols on the left and exactly one
    EOS on the right, which is the same convention perplexity() uses when it
    scores a text.

    For n = 2:
        {
            ('добрый',): {'день': 273, 'вечер': 55, 'путь': 8, ...},
            ('машинное',): {'масло': 2, 'отделение': 6, 'обучение': 4, ...}
            ...
        }

    For n = 3:
         {
            ('<BOS>', '<BOS>'): {'мэр': 22, 'выпуск': 40, ...},
            ('<BOS>', 'мэр'): {'москвы': 3, 'перми': 3, ...},
            ...
        }

    Args:
        tokenized_texts: iterable of token sequences.
        n: n-gram order; the context has n - 1 tokens (empty tuple for n = 1).

    Returns:
        A defaultdict mapping each context tuple to a Counter of next tokens.

    Raises:
        ValueError: if n is smaller than 1.
    """
    if n < 1:
        raise ValueError(f"n must be a positive integer, got {n}")

    counts = defaultdict(Counter)
    context_size = n - 1

    # NOTE(review): nltk.ngrams(tokens, n, **ngrams_config) is deliberately not
    # used here. As far as I can tell it pads n - 1 symbols on *both* sides, so
    # it would emit no EOS at all for n = 1 and several EOS for n > 2, which
    # disagrees with the single EOS that perplexity() scores.
    for tokens in tokenized_texts:
        padded = [BOS] * context_size + list(tokens) + [EOS]
        for position in range(context_size, len(padded)):
            context = tuple(padded[position - context_size : position])
            counts[context][padded[position]] += 1

    return counts

In [None]:
class LanguageModel:
    """Maximum-likelihood n-gram language model.

    ``probs`` maps an (n - 1)-token context tuple to the distribution
    ``{token: P(token | context)}``, estimated as relative frequencies of the
    counts returned by the module-level ``build_ngram_counts``.
    """

    def __init__(self, tokenized_texts: Iterable[Iterable[str]], n: int) -> None:
        """Estimate the conditional distributions from ``tokenized_texts``.

        Args:
            tokenized_texts: iterable of token sequences used for counting.
            n: n-gram order (context length is n - 1).
        """
        self.n: int = n
        # Kept as defaultdict(Counter) so direct lookups such as
        # model.probs[("word",)] still yield an empty distribution for a
        # context that was never observed.
        self.probs: Dict[Tuple[str, ...], Dict[str, float]] = defaultdict(Counter)

        ngram_counts = build_ngram_counts(tokenized_texts, n)
        for context, next_token_counts in ngram_counts.items():
            total = sum(next_token_counts.values())
            # Counter (not dict): a missing token then reads as probability 0.
            self.probs[context] = Counter(
                {token: count / total for token, count in next_token_counts.items()}
            )

    def get_token_distribution(self, prefix: List[str]) -> Dict[str, float]:
        """Return the next-token distribution for the given prefix.

        Only the last n - 1 tokens of ``prefix`` are used; a shorter prefix is
        left-padded with BOS. An unseen context yields an empty distribution.
        """
        context_size = self.n - 1
        context = list(prefix)[max(0, len(prefix) - context_size) :]
        context = [BOS] * (context_size - len(context)) + context
        # .get() rather than indexing: indexing the defaultdict would insert a
        # new empty entry for every unseen context that is queried.
        known = self.probs.get(tuple(context))
        return known if known is not None else Counter()

    def get_next_token_prob(self, prefix: List[str], token: str) -> float:
        """Return P(token | prefix) as stored in the context's distribution."""
        return self.get_token_distribution(prefix)[token]

In [None]:
%%time

# Construct the bigram (n=2) LanguageModel from the training split;
# the %%time cell magic reports how long the cell takes.
model = LanguageModel(train_set, n=2)

In [None]:
def plot_distribution(
    distribution: Dict[str, float],
    top_k: int = None,
    title: str = None,
    xlim: bool = True,
) -> None:
    """Draw a horizontal bar chart of a token distribution, largest value on top.

    Args:
        distribution: mapping from token to its value (probability).
        top_k: when truthy, only the ``top_k`` largest entries are drawn.
        title: optional figure title.
        xlim: when true, the x axis is fixed to the range [0, 1].
    """
    ranked = sorted(distribution.items(), key=lambda item: -item[1])
    if top_k:
        ranked = ranked[:top_k]

    labels = [token for token, _ in ranked]
    bar_lengths = [value for _, value in ranked]

    with plt.xkcd():
        plt.barh(labels, bar_lengths)
        if xlim:
            plt.xlim([0, 1])
        if title:
            plt.title(title)
        # barh draws the first item at the bottom; flip so the largest is on top.
        plt.gca().invert_yaxis()
        plt.xlabel("probability")
        plt.grid()
        plt.show()


# Show the 15 highest-probability continuations for two sample context words.
for word in ("вряд", "хочу"):
    plot_distribution(model.probs[(word,)], title=f"{word} ...", top_k=15)

# Generation

$$
p_T(i) = \frac{p(i)^{\frac{1}{T}}}{\displaystyle \sum_j p(j)^{\frac{1}{T}}}
$$

In [None]:
def get_next_token(
    lm: "LanguageModel", prefix: List[str], temperature: float = 1.0
) -> str:
    """
    Sample the next token given a prefix.

    * obtain the next-token distribution for the prefix from ``lm``;
    * re-weight it with temperature, p_T(i) proportional to p(i) ** (1 / T),
      and sample one token from the result.

    Args:
        lm: object providing ``get_token_distribution(prefix)``.
        prefix: tokens generated so far.
        temperature: T > 0 samples from the re-weighted distribution; T == 0
            returns the most probable token (the limit of T -> 0).

    Returns:
        The sampled token, or EOS when the distribution has no token with
        positive probability (e.g. an unseen context in an unsmoothed model).

    Raises:
        ValueError: if temperature is negative.
    """
    if temperature < 0:
        raise ValueError(f"temperature must be non-negative, got {temperature}")

    distribution: Dict[str, float] = lm.get_token_distribution(prefix)

    # Zero-probability tokens can never be sampled; dropping them also keeps
    # log() finite.
    candidates = [(token, prob) for token, prob in distribution.items() if prob > 0]
    if not candidates:
        return EOS

    tokens = [token for token, _ in candidates]
    log_probs = np.log(np.array([prob for _, prob in candidates], dtype=float))

    if temperature == 0:
        return tokens[int(np.argmax(log_probs))]

    # Work in log space and subtract the maximum before exponentiating:
    # computing p ** (1 / T) directly underflows to all zeros for small T.
    scaled = log_probs / temperature
    weights = np.exp(scaled - scaled.max())
    weights /= weights.sum()

    # Uses NumPy's global RNG, so results follow np.random.seed(...).
    return tokens[int(np.random.choice(len(tokens), p=weights))]

In [None]:
@interact(
    word="компьютер",
    temperature=widgets.FloatSlider(
        value=1, min=0.01, max=3.0, step=0.2, description="Temperature:"
    ),
    top_k=widgets.IntSlider(value=10, min=5, max=20, step=1, description="top_k:"),
)
def plot_with_temperature(word: str, temperature: float, top_k: int):
    """Plot the next-token distribution after ``word`` re-weighted by temperature.

    Every probability is raised to the power 1 / temperature and the result is
    renormalized to sum to one; the ``top_k`` largest entries are drawn.
    """
    distr = model.get_token_distribution(prefix=[word])

    # Divide by the largest probability before exponentiating. The factor
    # cancels in the normalization, but without it p ** (1 / T) can underflow
    # to zero for every token at the low end of the slider (exponent 100),
    # making the normalizer zero.
    peak = max(distr.values(), default=0.0)
    if peak > 0:
        exponent = 1.0 / temperature
        distr = {k: (v / peak) ** exponent for k, v in distr.items()}
        norm = sum(distr.values())
        distr = {k: v / norm for k, v in distr.items()}

    title = f"{word} (T = {temperature:.2f})"
    plot_distribution(distr, top_k=top_k, title=title)

# Perplexity

$$
P(w_1, \dots, w_N)^{-\frac1N} = \left( \prod_t P(w_t \mid w_{t - n + 1}, \dots, w_{t - 1})\right)^{-\frac1N} = \frac{1}{\sqrt[\leftroot{-2}\uproot{2}N]{\displaystyle \prod_t P(w_t \mid w_{t - n + 1}, \dots, w_{t - 1})}}
$$

$$
e^{\displaystyle \log P(w_1, \dots, w_N)^{-\frac1N}} = e^{\displaystyle -\frac1N \log P(w_1, \dots, w_N)} = e^{\displaystyle -\frac1N \log \left( \prod_t P(w_t \mid w_{t - n + 1}, \dots, w_{t - 1}) \right)} = e^{\displaystyle -\frac1N \sum_t \log P(w_t \mid w_{t - n + 1}, \dots, w_{t - 1})}
$$

In [None]:
def perplexity(
    lm: "LanguageModel",
    tokenized_texts: Iterable[Iterable[str]],
    min_logprob: float = np.log(10**-50.0),
) -> float:
    """Compute the per-token perplexity of ``lm`` on ``tokenized_texts``.

    Each text is scored token by token, followed by one EOS token, starting
    from a prefix of (lm.n - 1) BOS symbols. The result is
    exp(-(sum of log-probabilities) / N), where N counts all scored tokens
    including the EOS of every text.

    Args:
        lm: model providing ``n`` and ``get_next_token_prob(prefix, token)``.
        tokenized_texts: iterable of token sequences (any iterable of str).
        min_logprob: floor applied to every log-probability, so that tokens
            with zero probability contribute a finite penalty.

    Returns:
        The perplexity.

    Raises:
        ZeroDivisionError: if ``tokenized_texts`` contains no texts.
    """
    logprobs_sum: float = 0.0
    N: int = 0
    for tokens in tokenized_texts:
        prefix = [BOS] * (lm.n - 1)
        # list(...) so tuples and generators work, as the annotation promises.
        padded_tokens = list(tokens) + [EOS]
        for token in padded_tokens:
            prob = lm.get_next_token_prob(prefix, token)
            # Skip log() for non-positive probabilities: np.log(0) yields -inf
            # with a RuntimeWarning, and the floor would replace it anyway.
            logprob = np.log(prob) if prob > 0 else min_logprob
            logprobs_sum += max(logprob, min_logprob)
            prefix = prefix[1:] + [token]
        N += len(padded_tokens)
    return np.exp(-logprobs_sum / N)

In [None]:
# Validation perplexity of `model`, the bigram LanguageModel built above.
perplexity(model, val_set)

# Laplace Smoothing

Maximum Likelihood Estimation: $$
    P(w_t \mid w_{t - n + 1}, \dots, w_{t - 1}) \approx \frac{\text{count}(w_{t - n + 1}, \dots, w_{t - 1}, w_t)}{\displaystyle \sum_w \text{count}(w_{t - n + 1}, \dots, w_{t - 1}, w)} = \frac{\text{count}(w_{t - n + 1}, \dots, w_{t - 1}, w_t)}{\text{count}(w_{t - n + 1}, \dots, w_{t - 1})}
$$

Laplace Smoothing: $$
P(w_t \mid w_{t - n + 1}, \dots, w_{t - 1}) \approx \frac{\text{count}(w_{t - n + 1}, \dots, w_{t - 1}, w_t) + 1}{\displaystyle \sum_w \left( \text{count}(w_{t - n + 1}, \dots, w_{t - 1}, w) + 1 \right)} = \frac{\text{count}(w_{t - n + 1}, \dots, w_{t - 1}, w_t) + 1}{\text{count}(w_{t - n + 1}, \dots, w_{t - 1}) + \lvert V \rvert}
$$

$\delta$-Smoothing: $$
P(w_t \mid w_{t - n + 1}, \dots, w_{t - 1}) \approx \frac{\text{count}(w_{t - n + 1}, \dots, w_{t - 1}, w_t) + \delta}{\displaystyle \sum_w \left( \text{count}(w_{t - n + 1}, \dots, w_{t - 1}, w) + \delta \right)} = \frac{\text{count}(w_{t - n + 1}, \dots, w_{t - 1}, w_t) + \delta}{\text{count}(w_{t - n + 1}, \dots, w_{t - 1}) + \delta \lvert V \rvert}
$$

In [None]:
class LaplaceLanguageModel(LanguageModel):
    """N-gram language model with additive (delta) smoothing.

    For every observed context, ``probs`` stores
    (count + delta) / (context_count + delta * |V|) for the tokens that were
    seen after that context. The mass that remains is shared equally between
    all tokens that were not seen after it.
    """

    def __init__(
        self, tokenized_texts: Iterable[Iterable[str]], n: int, delta: float = 1.0
    ):
        """Build the smoothed distributions.

        Args:
            tokenized_texts: iterable of token sequences used for counting.
            n: n-gram order (context length is n - 1).
            delta: pseudo-count added to every token; 1.0 is Laplace smoothing.

        Raises:
            ValueError: if delta is negative (it would give negative
                probabilities).
        """
        if delta < 0:
            raise ValueError(f"delta must be non-negative, got {delta}")

        self.n = n
        ngram_counts = build_ngram_counts(tokenized_texts, n)

        # Vocabulary: every token that occurs as a continuation of any context.
        self.vocab = {
            token for distribution in ngram_counts.values() for token in distribution
        }

        self.probs = defaultdict(Counter)

        smoothing_mass: float = delta * len(self.vocab)
        for prefix, distribution in ngram_counts.items():
            norm: float = sum(distribution.values()) + smoothing_mass
            self.probs[prefix] = {
                token: (count + delta) / norm for token, count in distribution.items()
            }

    def _unseen_token_prob(self, distribution: Dict[str, float]) -> float:
        """Probability of one token that is absent from ``distribution``.

        The leftover mass is clamped at zero, because rounding can push
        1 - sum(...) slightly below zero, and then split evenly between the
        vocabulary tokens missing from the distribution.
        """
        missing_prob_total: float = max(0.0, 1.0 - sum(distribution.values()))
        return missing_prob_total / max(1, len(self.vocab) - len(distribution))

    def get_token_distribution(self, prefix: List[str]) -> Dict[str, float]:
        """Return the smoothed distribution over the whole vocabulary."""
        distribution: Dict[str, float] = super().get_token_distribution(prefix)
        missing_prob = self._unseen_token_prob(distribution)
        return {token: distribution.get(token, missing_prob) for token in self.vocab}

    def get_next_token_prob(self, prefix: List[str], next_token: str):
        """Return the smoothed P(next_token | prefix).

        Avoids materializing the full-vocabulary distribution. Any token absent
        from the stored distribution receives the unseen-token probability;
        membership in the vocabulary is not checked.
        """
        distribution: Dict[str, float] = super().get_token_distribution(prefix)
        if next_token in distribution:
            return distribution[next_token]
        return self._unseen_token_prob(distribution)

Перплексия снизилась, но остаётся большой, поскольку сглаживание сильно искажает вероятностные распределения.

In [None]:
# Validation perplexity of the smoothed model (default delta=1.0) for
# unigram, bigram and trigram orders.
# Note: `n` and `laplace_model` stay bound to the last iteration (n=3) afterwards.
for n in (1, 2, 3):
    laplace_model = LaplaceLanguageModel(train_set, n=n)
    print(f"{n}: {perplexity(laplace_model, val_set)}")

In [None]:
# Rebuild the bigram smoothed model (default delta=1.0) for the cells below.
laplace_model = LaplaceLanguageModel(train_set, n=2)

In [None]:
# Same context word under both models: unsmoothed estimate vs. smoothed one.
for lm, title in ((model, "mle lm"), (laplace_model, "laplace smoothing lm")):
    plot_distribution(lm.get_token_distribution(["машинное"]), top_k=10, title=title)

Попробуем более слабое сглаживание (с меньшим $\delta$), которое меньше искажает распределения.

In [None]:
# Bigram model with a much smaller pseudo-count (delta=1e-5) than the default 1.0.
delta_model = LaplaceLanguageModel(train_set, n=2, delta=1e-5)

In [None]:
# Validation perplexity of the delta=1e-5 model.
perplexity(delta_model, val_set)

In [None]:
prefix = "мама мыла".split()

top_k = 5  # number of independent continuations to sample
max_tokens = 10  # upper bound on generated tokens per continuation
temperature = 1.1

# Named loop variables instead of `_`: the two nested loops previously shared
# one variable, and binding `_` at the top level of a notebook also overrides
# IPython's "last result" variable.
for sample_index in range(top_k):
    generated = prefix[:]
    for step in range(max_tokens):
        next_token = get_next_token(laplace_model, generated, temperature=temperature)
        generated.append(next_token)
        # Stop as soon as the model emits the end-of-sequence token.
        if next_token == EOS:
            break
    print(" ".join(generated))
    print("#" * 100)

# Transformers

In [None]:
# Uncomment to install the pinned transformers version if it is not available.
# !pip install transformers==4.12.3

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer

# Load the tokenizer and the causal language model for the named checkpoint.
# NOTE: this rebinds `model`, which until this cell referred to the n-gram
# LanguageModel; earlier cells that read `model` will see this object instead
# if they are re-run afterwards.
tokenizer = AutoTokenizer.from_pretrained("ai-forever/rugpt3small_based_on_gpt2")
model = AutoModelForCausalLM.from_pretrained("ai-forever/rugpt3small_based_on_gpt2")

In [None]:
# Encode the prompt as PyTorch tensors ("pt").
prompt = "Сегодня состоялась лекция по machine learning"
tokenized_tensors_dict = tokenizer(text=prompt, return_tensors="pt")

# Sample a continuation; max_length caps the total sequence length.
generated = model.generate(
    **tokenized_tensors_dict, do_sample=True, temperature=1.0, max_length=100
)

# Decode the first (and only) generated sequence back into text.
first_sequence = generated.numpy()[0]
print(tokenizer.decode(first_sequence))