In [None]:
"""Notebook for preprocessing input text and training a bigram language model."""

import torch
import torch.nn as nn
from torch.nn import functional as F

In [None]:
# read file
# The encoding is pinned to UTF-8 so that the decoded text does not depend on
# the platform's default locale encoding.
with open("input.txt", "r", encoding="utf-8") as file:
    text = file.read()

print(f"Dataset length (in characters): {len(text)}")

In [None]:
# preview data: show the first 1000 characters of the text
preview = text[:1000]
print(preview)

In [None]:
# collect the distinct characters of the text, in sorted order
chars = sorted(set(text))
vocab_size = len(chars)
print(f"Characters: {''.join(chars)}\nVocabulary size: {vocab_size}")

In [None]:
# create basic encoder and decoder
# stoi maps each character to its integer id; itos is the inverse mapping
stoi = {c: i for i, c in enumerate(chars)}
itos = dict(enumerate(chars))


def encode(s):
    """Return the list of integer ids for the characters of ``s``.

    Raises:
        KeyError: if ``s`` contains a character that is not a key of ``stoi``.
    """
    return [stoi[c] for c in s]


def decode(v):
    """Return the string spelled by the iterable of integer ids ``v``.

    Raises:
        KeyError: if ``v`` contains an id that is not a key of ``itos``.
    """
    return "".join(itos[i] for i in v)


print(encode("Hello world!"))
print(decode(encode("Hello world!")))

# This is a very simple character-level encoder/decoder. More advanced,
# popular tokenizers include:
# - OpenAI's tiktoken: https://github.com/openai/tiktoken
# - Google's SentencePiece: https://github.com/google/sentencepiece

In [None]:
# encode the whole text and store the ids as a 1-D int64 torch tensor
token_ids = encode(text)
data = torch.tensor(token_ids, dtype=torch.long)
print(data.shape, data.dtype)
print(data[:1000])

In [None]:
# train/validation split: the first 90% of the tokens are used for training,
# the remainder is held out for validation
split = int(0.9 * len(data))
train_data, val_data = data[:split], data[split:]

In [None]:
# define block for data loading: BLOCK_SIZE is the number of tokens per example
BLOCK_SIZE = 8
# a chunk of BLOCK_SIZE + 1 tokens, so every position also has a next token
first_chunk = train_data[: BLOCK_SIZE + 1]
print(first_chunk)

In [None]:
# showcase the inputs and targets of a transformer
# targets is the same slice offset by one token, so targets[t] is the token
# that follows the context inputs[: t + 1]
inputs = train_data[:BLOCK_SIZE]
targets = train_data[1 : BLOCK_SIZE + 1]
for t in range(BLOCK_SIZE):
    context, target = inputs[: t + 1], targets[t]
    print(f"For input {context} the target is {target}")

In [None]:
# showcase batches
BATCH_SIZE = 4

# Seed torch's global RNG before any random operation in this cell, so that
# the torch random calls that follow (batch sampling here, and any later
# sampling that uses the global generator) are repeatable on
# Restart Kernel -> Run All.
torch.manual_seed(1337)


def get_batch(split):
    """Sample a random batch of (input, target) token windows.

    Args:
        split: ``"train"`` selects ``train_data``; any other value selects
            ``val_data``.

    Returns:
        A tuple ``(x, y)`` of tensors of shape ``(BATCH_SIZE, BLOCK_SIZE)``.
        ``y`` holds the same windows as ``x`` offset by one token.

    ``BATCH_SIZE`` and ``BLOCK_SIZE`` are read from module scope at call time.
    """
    source = train_data if split == "train" else val_data
    # randint's upper bound is exclusive, so every start index leaves room for
    # the BLOCK_SIZE + 1 tokens needed by the input and its offset target
    starts = torch.randint(len(source) - BLOCK_SIZE, (BATCH_SIZE,))
    x = torch.stack([source[i : i + BLOCK_SIZE] for i in starts])
    y = torch.stack([source[i + 1 : i + BLOCK_SIZE + 1] for i in starts])
    return x, y


get_batch("train")

In [None]:
class BigramLanguageModel(nn.Module):
    """Simple model of P(c_t | c_t-1): next-token logits depend only on the current token.

    The only parameters are a ``vocab_size x vocab_size`` embedding table whose
    row ``i`` holds the logits for the token following token ``i``.
    """

    def __init__(self, vocab_size):
        """Create the ``vocab_size x vocab_size`` logit lookup table."""
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, if targets are given, the loss.

        Args:
            idx: integer tensor of token ids, shape (Batch, Time).
            targets: optional integer tensor of token ids with Batch * Time
                elements.

        Returns:
            ``(logits, loss)``. Without targets, logits has shape
            (Batch, Time, Channels) and loss is ``None``. With targets, logits
            is flattened to (Batch * Time, Channels) and loss is the
            cross-entropy between those logits and the flattened targets.
        """
        # returns (Batch, Time, Channels)
        logits = self.token_embedding_table(idx)

        if targets is None:
            return logits, None

        # F.cross_entropy is called with (N, C) logits and (N,) targets here,
        # so the batch and time axes are merged before computing the loss
        B, T, C = logits.shape
        logits = logits.view(B * T, C)
        # reshape (not view) so non-contiguous targets are accepted as well
        targets = targets.reshape(B * T)
        loss = F.cross_entropy(logits, targets)
        return logits, loss

    @torch.no_grad()  # sampling needs no gradients; avoids building a graph per step
    def generate(self, idx, max_new_tokens):
        """Extend each sequence in ``idx`` by ``max_new_tokens`` sampled tokens.

        Args:
            idx: integer tensor of token ids, shape (Batch, Time).
            max_new_tokens: number of tokens to append to every sequence.

        Returns:
            Integer tensor of shape (Batch, Time + max_new_tokens).
        """
        for _ in range(max_new_tokens):
            logits, _ = self(idx)
            # get predictions only for last character (latest time)
            logits = logits[:, -1, :]
            # convert to probabilities
            probs = F.softmax(logits, dim=-1)
            # get 1 sample from multinomial distribution defined by probabilities
            idx_next = torch.multinomial(probs, num_samples=1)
            # append the result
            idx = torch.cat((idx, idx_next), dim=1)
        return idx


# build the model and evaluate the loss on one training batch before training
model = BigramLanguageModel(vocab_size)
xb, yb = get_batch("train")
logits, loss = model(xb, yb)

print(f"Current loss: {loss}")
# Print text generated by model, starting from a single token with id 0
start_idx = torch.zeros((1, 1), dtype=torch.long)
generated = model.generate(start_idx, max_new_tokens=100)
print(decode(generated[0].tolist()))

In [None]:
# create optimizer: AdamW over all parameters of the model
LEARNING_RATE = 1e-3
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)

In [None]:
# get_batch reads BATCH_SIZE at call time, so this takes effect for training
BATCH_SIZE = 32

for steps in range(10000):
    # sample a fresh batch of training data
    xb, yb = get_batch("train")
    # drop the gradients left over from the previous step
    optimizer.zero_grad(set_to_none=True)
    # forward pass: logits and loss for this batch
    logits, loss = model(xb, yb)
    # get all gradients for parameters
    loss.backward()
    # update the parameters using those gradients
    optimizer.step()

# loss on the last training batch only
print(loss.item())

In [None]:
# print model results after training, starting from a single token with id 0
start_idx = torch.zeros((1, 1), dtype=torch.long)
generated = model.generate(start_idx, max_new_tokens=100)
print(decode(generated[0].tolist()))