In [None]:
import random
from typing import List, Optional, Tuple

import optuna
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from tqdm.autonotebook import tqdm

%matplotlib inline

In [None]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Load the corpus, one name per line. The context manager closes the file
# handle deterministically, and the explicit encoding keeps decoding
# independent of the platform's default locale.
with open("../../data/names.txt", "r", encoding="utf-8") as names_file:
    words = names_file.read().splitlines()
words[:8]

In [None]:
# Number of names loaded from the corpus file.
len(words)

In [None]:
# Character vocabulary: every distinct character found in the corpus gets an
# id starting at 1 (in sorted order); id 0 is reserved for the "." marker.
chars = sorted(set("".join(words)))
STOI = {ch: idx for idx, ch in enumerate(chars, start=1)}
STOI["."] = 0
# Reverse lookup, id -> character.
ITOS = {idx: ch for ch, idx in STOI.items()}
print(ITOS)

In [None]:
class WordTokensDataset(Dataset):
    """Next-character prediction samples built from a list of words.

    Every word is terminated with "." and scanned left to right. For each
    character, the ids of the ``block_size`` preceding characters form the
    input row and the character's own id is the target. Positions before the
    start of a word are filled with the id of ".".

    Args:
        words: Words to turn into (context, next character) samples.
        block_size: Number of context characters per sample.
        stoi: Optional character -> id mapping; it must contain ".". When
            omitted, the notebook-level ``STOI`` mapping is used, which is
            the original behaviour.

    Attributes:
        X: ``(num_samples, block_size)`` int64 tensor of context ids.
        Y: ``(num_samples,)`` int64 tensor of target ids.
    """

    def __init__(
        self,
        words: List[str],
        block_size: int,
        stoi: Optional[dict] = None,
    ):
        # Fall back to the module-level vocabulary so existing callers that
        # pass only (words, block_size) keep working unchanged.
        if stoi is None:
            stoi = STOI
        pad_ix = stoi["."]  # 0 for the notebook's STOI

        X, Y = [], []
        for w in words:
            context = [pad_ix] * block_size
            for ch in w + ".":
                ix = stoi[ch]
                X.append(context)
                Y.append(ix)
                # Slide the window: drop the oldest id, append the newest.
                # A new list is built each time, so rows already stored in X
                # are never mutated.
                context = context[1:] + [ix]

        # Explicit dtype and shape keep the tensors well-formed even when
        # `words` is empty (otherwise X would be a 1-D float tensor).
        self.X = torch.tensor(X, dtype=torch.long).reshape(len(X), block_size)
        self.Y = torch.tensor(Y, dtype=torch.long)

    def __getitem__(self, idx):
        """Return the ``(context, target)`` pair at position ``idx``."""
        return self.X[idx], self.Y[idx]

    def __len__(self):
        """Return the number of samples."""
        return self.X.shape[0]

In [None]:
random.seed(42)
# DataLoader(shuffle=True) draws its permutation seed from torch's global
# RNG, so seed it as well to make the training batches reproducible.
torch.manual_seed(42)
BLOCK_SIZE = 3

# Shuffle a copy instead of `words` itself: shuffling in place made the split
# depend on how many times this cell had been run, which moves names between
# the train / validation / test sets on a re-run. On a fresh kernel the
# permutation is the same as before.
shuffled_words = list(words)
random.shuffle(shuffled_words)
n1 = int(0.8 * len(shuffled_words))
n2 = int(0.9 * len(shuffled_words))

# 80% / 10% / 10% split by word.
train_dataset = WordTokensDataset(shuffled_words[:n1], BLOCK_SIZE)
validation_dataset = WordTokensDataset(shuffled_words[n1:n2], BLOCK_SIZE)
test_dataset = WordTokensDataset(shuffled_words[n2:], BLOCK_SIZE)

In [None]:
# Shapes of the training inputs (context ids) and targets (next-character ids).
train_dataset.X.shape, train_dataset.Y.shape

In [None]:
class WordTokenModel(nn.Module):
    """Character-level MLP: embedding lookup -> tanh hidden layer -> logits.

    All parameters are drawn from a standard normal distribution using
    ``generator``, in the order C, W1, b1, W2, b2.

    Args:
        token_count: Vocabulary size (rows of the embedding table and number
            of output logits).
        block_size: Number of context tokens per sample.
        embedding_layer_size: Width of each token embedding.
        hidden_layer_size: Number of hidden units.
        generator: Random generator used for initialisation. When ``None``
            (the default) a fresh ``torch.Generator()`` is created.
    """

    def __init__(
        self,
        token_count: int,
        block_size: int,
        embedding_layer_size: int,
        hidden_layer_size: int,
        generator: Optional[torch.Generator] = None,
    ):
        super().__init__()

        # Test for absence explicitly rather than relying on truthiness.
        if generator is None:
            generator = torch.Generator()

        self.token_count = token_count
        self.block_size = block_size
        self.embedding_layer_size = embedding_layer_size
        self.hidden_layer_size = hidden_layer_size

        # Embedding table: one row per token.
        self.C = nn.Parameter(
            torch.randn((token_count, embedding_layer_size), generator=generator)
        )
        # Hidden layer operating on the concatenated context embeddings.
        self.W1 = nn.Parameter(
            torch.randn(
                (embedding_layer_size * block_size, hidden_layer_size),
                generator=generator,
            )
        )
        self.b1 = nn.Parameter(torch.randn(hidden_layer_size, generator=generator))
        # Output layer producing one logit per token.
        self.W2 = nn.Parameter(
            torch.randn((hidden_layer_size, token_count), generator=generator)
        )
        self.b2 = nn.Parameter(torch.randn(token_count, generator=generator))

    def forward(self, X: torch.Tensor):
        """Map integer context ids to next-token logits.

        Args:
            X: Integer tensor of token ids, indexed into the embedding table;
                the lookup result is flattened to rows of
                ``block_size * embedding_layer_size`` values.

        Returns:
            Logits of shape ``(BATCH_SIZE, token_count)``.
        """
        emb = self.C[X]  # (BATCH_SIZE, block_size, embedding_layer_size)
        h = torch.tanh(
            emb.view(-1, self.embedding_layer_size * self.block_size) @ self.W1
            + self.b1
        )  # (BATCH_SIZE, hidden_layer_size)
        logits = h @ self.W2 + self.b2  # (BATCH_SIZE, token_count)
        return logits

In [None]:
# Baseline network: 10-dimensional embeddings and 200 hidden units,
# initialised from a fixed seed so the starting weights are repeatable.
model = WordTokenModel(
    token_count=len(STOI),
    block_size=BLOCK_SIZE,
    embedding_layer_size=10,
    hidden_layer_size=200,
    generator=torch.Generator().manual_seed(2147483647),
)
# Move the parameters to the selected compute device.
model = model.to(device)

In [None]:
class StepBasedLrGDOptimizer(torch.optim.Optimizer):
    """Plain gradient descent with a piecewise-constant, step-indexed learning rate.

    The schedule is a list of ``(max_step, lr)`` pairs checked in order: the
    first pair whose ``max_step`` is ``None`` or greater than the current step
    count supplies the learning rate. If no pair matches (the schedule has no
    ``None`` terminator and has been exhausted), the last pair's rate is kept.

    Args:
        params: Parameters (or parameter groups) to optimise.
        max_step_to_lr: Non-empty list of ``(max_step, lr)`` pairs.

    Raises:
        ValueError: If ``max_step_to_lr`` is empty.
    """

    def __init__(self, params, max_step_to_lr: List[Tuple[Optional[int], float]]):
        schedule = list(max_step_to_lr)
        if not schedule:
            raise ValueError(
                "max_step_to_lr must contain at least one (max_step, lr) entry"
            )

        # The group key keeps its historical name ("max_epoch_to_lr") even
        # though the thresholds are counted in optimizer steps.
        defaults = dict(max_epoch_to_lr=schedule)
        super().__init__(params, defaults)
        # Store the counter in the state mapping created by the base class
        # instead of replacing that mapping with a plain dict.
        self.state["step"] = 0

    @torch.no_grad()
    def step(self, closure=None):
        """Apply one gradient-descent update to every parameter.

        Args:
            closure: Optional callable that re-evaluates the model and returns
                the loss; it is invoked with gradients enabled.

        Returns:
            The value returned by ``closure``, or ``None`` when no closure is given.

        Raises:
            ValueError: If any parameter has no gradient.
        """
        loss = None
        if closure is not None:
            with torch.enable_grad():
                loss = closure()

        step = self.state["step"]

        for group in self.param_groups:
            schedule = group["max_epoch_to_lr"]
            # First matching entry wins; fall back to the final rate rather
            # than letting a bare StopIteration escape from next().
            lr = next(
                (
                    rate
                    for max_step, rate in schedule
                    if max_step is None or step < max_step
                ),
                schedule[-1][1],
            )

            for p in group["params"]:
                if p.grad is None:
                    raise ValueError("Invalid None gradient")
                # p <- p - lr * grad, done in place under no_grad.
                p.add_(p.grad, alpha=-lr)

        self.state["step"] += 1
        return loss

In [None]:
def train_model(
    model: WordTokenModel,
    dataset: Dataset,
    optimizer: torch.optim.Optimizer,
    epochs: int,
    batch_size: int,
) -> Tuple[List[int], List[float]]:
    """Train ``model`` on ``dataset`` with mini-batch cross-entropy.

    Batches are reshuffled every epoch and moved to the notebook-level
    ``device`` before the forward pass.

    Args:
        model: Network to train; it is switched to training mode.
        dataset: Dataset yielding ``(inputs, targets)`` pairs.
        optimizer: Optimizer already bound to the model's parameters.
        epochs: Number of full passes over ``dataset``.
        batch_size: Number of samples per mini-batch.

    Returns:
        ``(stepi, lossi)``: the epoch indices and, for each of them, log10 of
        the sample-weighted mean training loss over that epoch.
    """
    model.train()

    # Pinned host memory only speeds up host-to-GPU copies; requesting it on
    # a CPU-only machine just produces a warning.
    use_cuda = device.type == "cuda"
    dataloader = DataLoader(
        dataset, batch_size=batch_size, shuffle=True, pin_memory=use_cuda
    )

    lossi = []
    stepi = []

    for epoch in tqdm(range(epochs)):
        # Accumulate on the device so no per-batch host sync is needed.
        loss_sum = torch.zeros((), device=device)
        sample_count = 0

        for X_batch, Y_batch in dataloader:

            X_batch = X_batch.to(device, non_blocking=use_cuda)
            Y_batch = Y_batch.to(device, non_blocking=use_cuda)

            # Call the module (not .forward) so registered hooks still run.
            logits = model(X_batch)
            loss = F.cross_entropy(logits, Y_batch)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            batch_len = Y_batch.shape[0]
            loss_sum += loss.detach() * batch_len
            sample_count += batch_len

        # An epoch that produced no batch has no loss to report.
        if sample_count == 0:
            continue

        # track stats: the epoch mean is logged rather than the last batch's
        # loss, which is noisy and may come from a very small final batch.
        stepi.append(epoch)
        lossi.append((loss_sum / sample_count).log10().item())

    return stepi, lossi

In [None]:
# Larger batches and a larger sample budget when a GPU is available.
# Comparing `device.type` also matches indexed devices such as "cuda:0".
BATCH_SIZE = 512 if device.type == "cuda" else 32
TOTAL_SAMPLES_TO_TRAIN = 20_000_000 if device.type == "cuda" else 5_000_000
EPOCHS = TOTAL_SAMPLES_TO_TRAIN // len(train_dataset)
# Ceiling division: the DataLoader keeps the final partial batch, so floor
# division would undercount the optimizer steps taken per epoch.
BATCHES_BY_EPOCH = (len(train_dataset) + BATCH_SIZE - 1) // BATCH_SIZE

In [None]:
# Learning-rate plan over the BATCHES_BY_EPOCH * EPOCHS planned updates:
# 0.1 for the first half, 0.01 up to the three-quarter mark, 0.001 afterwards.
optimizer = StepBasedLrGDOptimizer(
    model.parameters(),
    max_step_to_lr=[
        (BATCHES_BY_EPOCH * EPOCHS * 0.5, 0.1),
        (BATCHES_BY_EPOCH * EPOCHS * 0.75, 0.01),
        (None, 0.001),
    ],
)

# Train the baseline model and keep the per-epoch loss history for plotting.
stepi, lossi = train_model(model, train_dataset, optimizer, EPOCHS, BATCH_SIZE)

In [None]:
# Training curve. The recorded values are log10 of the loss, so label the
# axes explicitly to keep the figure readable on its own.
plt.plot(stepi, lossi)
plt.xlabel("epoch")
plt.ylabel("log10(training loss)")
plt.title("Training loss per epoch");

In [None]:
def calculate_loss(model: WordTokenModel, dataset: WordTokensDataset):
    """Return the cross-entropy loss of ``model`` over the whole ``dataset``.

    The full ``dataset.X`` / ``dataset.Y`` tensors are moved to the
    notebook-level ``device`` and evaluated in a single forward pass.

    Args:
        model: Network to evaluate. It is put in eval mode for the
            computation and restored to its previous mode afterwards.
        dataset: Dataset exposing ``X`` (inputs) and ``Y`` (targets) tensors.

    Returns:
        The loss as a Python float.
    """
    X = dataset.X.to(device)
    Y = dataset.Y.to(device)

    was_training = model.training
    model.eval()
    try:
        # Evaluation needs no gradients; skipping autograd avoids building a
        # graph over the entire dataset.
        with torch.no_grad():
            logits = model(X)
            loss = F.cross_entropy(logits, Y)
    finally:
        model.train(was_training)
    return loss.item()

In [None]:
# Compare the fit on the training data with the held-out validation data.
training_loss = calculate_loss(model, train_dataset)
validation_loss = calculate_loss(model, validation_dataset)
# ".4f" prints four decimals; the previous "4f" only set a minimum width.
print(f"{training_loss = :.4f}, {validation_loss = :.4f}")

In [None]:
# visualize dimensions 0 and 1 of the embedding matrix C for all characters
# detach() gives a gradient-free view without going through `.data`.
C_cpu = model.C.detach().cpu()

plt.figure(figsize=(8, 8))
plt.scatter(C_cpu[:, 0].numpy(), C_cpu[:, 1].numpy(), s=200)
# Write each character on top of its own marker.
for i in range(C_cpu.shape[0]):
    plt.text(
        C_cpu[i, 0].item(),
        C_cpu[i, 1].item(),
        ITOS[i],
        ha="center",
        va="center",
        color="white",
    )
# The first positional argument of grid() is the on/off flag, so the former
# plt.grid("minor") simply enabled the major grid. Request that explicitly.
plt.grid(True, which="major")

In [None]:
# sample from the model
g = torch.Generator(device).manual_seed(2147483647 + 10)

# Sampling needs no gradients, so skip autograd bookkeeping for every
# forward pass.
with torch.no_grad():
    for _ in range(20):

        out = []
        context = [0] * BLOCK_SIZE  # initialize with all ...
        while True:
            # Call the module (not .forward) so registered hooks still run.
            logits = model(torch.tensor([context], device=device))
            probs = F.softmax(logits, dim=1)
            ix = torch.multinomial(probs, num_samples=1, generator=g).item()
            context = context[1:] + [ix]  # slide the context window
            out.append(ix)
            if ix == 0:  # id 0 is "." and ends the name
                break

        print("".join(ITOS[i] for i in out))

### Hyperparameter tuning

In [None]:
def objective(trial: optuna.Trial) -> float:
    """Optuna objective: train one candidate model and score it.

    The trial chooses the embedding width, the hidden-layer width and the
    starting learning rate. A freshly initialised model is trained for
    ``EPOCHS`` epochs on the training set with a three-phase learning-rate
    schedule (start rate, then a tenth, then a hundredth of it).

    Args:
        trial: Optuna trial used to draw the hyperparameters.

    Returns:
        Cross-entropy loss of the trained candidate on the validation set.
    """
    embedding_dim = trial.suggest_int("embedding_layer_size", 2, 15)
    hidden_units = trial.suggest_int("hidden_layer_size", 100, 500)
    base_lr = trial.suggest_float("lr", 1e-2, 1e-1)

    # Every candidate starts from the same seed.
    init_generator = torch.Generator().manual_seed(2147483647)
    candidate = WordTokenModel(
        token_count=len(STOI),
        block_size=BLOCK_SIZE,
        embedding_layer_size=embedding_dim,
        hidden_layer_size=hidden_units,
        generator=init_generator,
    ).to(device)

    # Rate drops at 50% and 75% of the planned number of updates.
    lr_schedule = [
        (BATCHES_BY_EPOCH * EPOCHS * 0.5, base_lr),
        (BATCHES_BY_EPOCH * EPOCHS * 0.75, base_lr * 0.1),
        (None, base_lr * 0.01),
    ]
    candidate_optimizer = StepBasedLrGDOptimizer(
        candidate.parameters(), max_step_to_lr=lr_schedule
    )

    train_model(
        model=candidate,
        dataset=train_dataset,
        optimizer=candidate_optimizer,
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
    )

    return calculate_loss(candidate, validation_dataset)

In [None]:
# Seed the sampler so the sequence of suggested hyperparameters is repeatable
# across runs. TPE is Optuna's default sampler, so only the seed is new.
study = optuna.create_study(
    direction="minimize",
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=15)

# Trial with the lowest validation loss.
trial = study.best_trial

print(f"Loss: {trial.value}")
print(f"Best hyperparameters: {trial.params}")

In [None]:
# Rebuild the model with the best hyperparameters found by the study.
n_embedding_layer_size = trial.params["embedding_layer_size"]
n_hidden_layer_size = trial.params["hidden_layer_size"]
n_lr_start = trial.params["lr"]

# Train the final model a bit more
FINAL_EPOCHS = EPOCHS * 3
# Scale the schedule with the longer run so the rate still drops at 50% and
# 75% of training. With boundaries derived from EPOCHS, the drops would land
# at about 1/6 and 1/4 of this run instead.
FINAL_TOTAL_STEPS = BATCHES_BY_EPOCH * FINAL_EPOCHS

model = WordTokenModel(
    token_count=len(STOI),
    block_size=BLOCK_SIZE,
    embedding_layer_size=n_embedding_layer_size,
    hidden_layer_size=n_hidden_layer_size,
    generator=torch.Generator().manual_seed(2147483647),
).to(device)

optimizer = StepBasedLrGDOptimizer(
    model.parameters(),
    max_step_to_lr=[
        (FINAL_TOTAL_STEPS * 0.5, n_lr_start),
        (FINAL_TOTAL_STEPS * 0.75, n_lr_start * 0.1),
        (None, n_lr_start * 0.01),
    ],
)

_ = train_model(
    model=model,
    dataset=train_dataset,
    optimizer=optimizer,
    epochs=FINAL_EPOCHS,
    batch_size=BATCH_SIZE,
)

In [None]:
# Final scores on all three splits; the test set is only evaluated here.
training_loss = calculate_loss(model, train_dataset)
validation_loss = calculate_loss(model, validation_dataset)
test_loss = calculate_loss(model, test_dataset)
# ".4f" prints four decimals; the previous "4f" only set a minimum width.
print(f"{training_loss = :.4f}, {validation_loss = :.4f}, {test_loss = :.4f}")