# What is a token and why?

Our goal is to model language using fundamental components and the relationships between them. We want the model to be dense in terms of relationships: a node that sits out by itself is very unlikely to get used, even though it may be very important to a topic. At the same time, we want to minimize the number of components and maximize the relationships between those components.

"Words" will make very bad fundamental components for our model. Why? Let's just focus on English. How many words do we have?


In [None]:
import sys

# import for tokenizing and for the dictionary of english words
import tiktoken
from wordfreq import top_n_list

In [None]:
# Grab the 100,000 most frequently used English words and join them into a
# single space-separated string.
words = top_n_list('en', 100000)
sentence = ' '.join(words)
# Size of the joined string object in bytes; this is the cell's output.
sys.getsizeof(sentence)

In [None]:
# Encode that word list with tiktoken's "cl100k_base" BPE encoding.
# NOTE(review): this comment used to say "GPT-2", but the encoding loaded here
# is cl100k_base; tiktoken exposes GPT-2's vocabulary under the name "gpt2".
# Confirm which encoding was intended.
enc = tiktoken.get_encoding("cl100k_base")
tokens = enc.encode(sentence)
# sys.getsizeof is shallow: for a container it reports the container object
# only, not the objects it references, so this number is not directly
# comparable to the size of the string.
sys.getsizeof(tokens)

In [None]:
# Token ids the encoder produces for a sample sentence (shown as cell output).
enc.encode('I am writing this to demonstrate how a BPE encoder works')

In [None]:
# Print every token id next to the text fragment it decodes back to.
bpe_demo_text = 'I am writing this to demonstrate how a BPE encoder works!'
bpe_demo_ids = enc.encode(bpe_demo_text)
for t in bpe_demo_ids:
    print(f"{t:>8d} - '{enc.decode([t])}'")

In [None]:
# Same breakdown for a sentence built from words that share the stem "book",
# to see which pieces the encoder reuses.
book_demo_text = 'bookkeepers keep the books for bookies who do the booking for bookers'
book_demo_ids = enc.encode(book_demo_text)
for t in book_demo_ids:
    print(f"{t:>8d} - '{enc.decode([t])}'")

## Building our own LLM from scratch
...well, using PyTorch

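We're going to build a very small LLM using lines from country music lyrics and see how well it does writing a new line for us! (The lyrics corpus appears in the code below as a commented-out alternative; as written, the notebook trains on a tiny cat-and-dog toy corpus.)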

In [None]:
# setup
import math
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim

# Use the GPU when PyTorch reports one is available; otherwise run on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
device

In [None]:
# Toy training corpus: six short sentences that share most of their words.
corpus = [
    "the cat sat on the mat",
    "the dog sat on the rug",
    "the cat chased the mouse",
    "the dog chased the ball",
    "the cat slept on the mat",
    "the dog barked at the mailman",
]

# Alternative corpus (country music lyrics). Uncomment to train on it instead.
# corpus = [
#     "Picture-perfect memories scattered all around the floor",
#     "Busted flat in Baton Rouge, headin' for a train. Feelin' nearly faded as my jeans",
#     "I can make anybody pretty. I can make you believe any lie",
#     "Stuck at a red light outside an adult bookstore. He said, 'Daddy, what are all those XXXs for?'",
#     "Well, I woke up Sunday morning. With no way to hold my head that didn't hurt",
#     "Becky was a beauty from South Alabama. Her daddy had a heart like a nine-pound hammer",
#     "I'd gladly walk across a desert with no shoes upon my feet",
#     "He had plastic bags wrapped 'round his shoes. He was covered with the evening news",
#     "A candy-colored clown they call the Sandman. Tiptoes to my room every night",
#     "I said, 'Grandpa, what's this picture here?  It's all black and white and ain't real clear'",
#     "Maybe I didn't love you quite as often as I should have",
#     "Well I'm an eight ball-shooting, double-fisted-drinking son of a gun",
#     "She put him out like the burnin' end of a midnight cigarette",
#     "Hello, walls. How'd things go for you today?",
#     "Tumble outta bed and stumble to the kitchen. Pour myself a cup of ambition",
#     "He said, 'I'll love you 'til I die'",
#     "I hear the train a comin'. It's rollin' 'round the bend. And I ain't seen the sunshine since I don't know when",
#     "Blame it all on my roots, I showed up in boots",
#     "I wandered so aimless, life filled with sin. I wouldn't let my dear Savior in",
#     "The only two things in life that make it worth livin'. Is guitars that tune good and firm-feelin' women"
# ]

# Word-level vocabulary: split every sentence on whitespace, then give each
# distinct word an integer id in sorted order.
tokens = [word for line in corpus for word in line.split()]
vocab = sorted(set(tokens))
stoi = dict(zip(vocab, range(len(vocab))))  # word -> id
itos = dict(enumerate(vocab))               # id -> word
vocab_size = len(vocab)

print("Vocab:", vocab)
print("Vocab size:", vocab_size)

In [None]:
# Number of context tokens fed to the model per example. Read as a global by
# build_dataset() and generate().
block_size = 5  # context length (tiny on purpose)

def encode(sentence):
    """Convert a whitespace-separated sentence into a list of vocabulary ids.

    Each word is looked up in the module-level ``stoi`` mapping; a word that
    is missing from it raises ``KeyError``.
    """
    words = sentence.split()
    return [stoi[word] for word in words]

def decode(ids):
    """Convert an iterable of vocabulary ids back into a space-joined string.

    Ids are looked up in the module-level ``itos`` mapping.
    """
    words = [itos[token_id] for token_id in ids]
    return " ".join(words)

# Turn the corpus into fixed-width (context, next-token) training pairs
def build_dataset(corpus):
    """Build next-token training examples from a list of sentences.

    For every token after the first in each sentence, the example's input is
    the up-to-``block_size`` tokens that precede it, left-padded to exactly
    ``block_size`` ids with ``vocab_size`` (used as the PAD id), and the
    example's target is the token itself.

    Returns:
        A pair ``(X, Y)`` of ``torch.long`` tensors: one row of context ids
        per example, and the matching target ids.
    """
    contexts, targets = [], []
    for line in corpus:
        token_ids = encode(line)
        for pos, target in enumerate(token_ids):
            if pos == 0:
                # The first token has nothing before it to predict from.
                continue
            window = token_ids[max(0, pos - block_size):pos]
            padding = [vocab_size] * (block_size - len(window))
            contexts.append(padding + window)
            targets.append(target)
    inputs = torch.tensor(contexts, dtype=torch.long)
    labels = torch.tensor(targets, dtype=torch.long)
    return inputs, labels

# Materialize the training tensors: X holds the padded context windows and
# Y the token id that follows each window.
X, Y = build_dataset(corpus)
print("X shape:", X.shape, "Y shape:", Y.shape)

In [None]:
# PAD reuses the first id past the real vocabulary (the same value
# build_dataset() pads with), so the embedding table needs one extra row.
pad_token_id = vocab_size
vocab_size_with_pad = vocab_size + 1
print("Vocab size incl. PAD:", vocab_size_with_pad)

In [None]:
class MiniTransformer(nn.Module):
    """A single transformer block with token and learned position embeddings.

    The block is: embeddings -> LayerNorm -> multi-head self-attention (with
    residual) -> LayerNorm -> feed-forward (with residual) -> linear head that
    maps every position to logits over the vocabulary.

    Args:
        vocab_size: Number of rows in the token embedding and number of output
            logits (include any PAD id here).
        d_model: Embedding / hidden width.
        n_heads: Number of attention heads.
        block_size: Maximum sequence length; size of the position embedding.
    """

    def __init__(self, vocab_size, d_model=32, n_heads=2, block_size=5):
        super().__init__()
        self.d_model = d_model
        self.block_size = block_size

        self.token_embed = nn.Embedding(vocab_size, d_model)
        self.pos_embed = nn.Embedding(block_size, d_model)

        self.attn = nn.MultiheadAttention(
            embed_dim=d_model,
            num_heads=n_heads,
            batch_first=True,
        )
        self.ff = nn.Sequential(
            nn.Linear(d_model, 4 * d_model),
            nn.ReLU(),
            nn.Linear(4 * d_model, d_model),
        )
        self.ln1 = nn.LayerNorm(d_model)
        self.ln2 = nn.LayerNorm(d_model)

        self.lm_head = nn.Linear(d_model, vocab_size)

    def forward(self, x, average_attn_weights=True):
        """Run the block on a batch of token ids.

        Args:
            x: ``(batch, T)`` tensor of token ids with ``T <= block_size``.
            average_attn_weights: Passed through to ``nn.MultiheadAttention``.
                ``True`` (the default) returns attention weights averaged
                over heads, shape ``(batch, T, T)``; ``False`` returns one map
                per head, shape ``(batch, n_heads, T, T)``.

        Returns:
            ``(logits, attn_weights)`` where ``logits`` has shape
            ``(batch, T, vocab_size)``.

        Raises:
            ValueError: If ``x`` is not 2-D or ``T`` exceeds ``block_size``.
        """
        _, T = x.shape  # unpacking fails with ValueError unless x is 2-D
        if T > self.block_size:
            # Without this check the position embedding lookup fails with an
            # opaque index error.
            raise ValueError(
                f"sequence length {T} exceeds block_size {self.block_size}"
            )

        tok_emb = self.token_embed(x)                   # (B, T, d_model)
        pos_ids = torch.arange(T, device=x.device)      # (T,)
        pos_emb = self.pos_embed(pos_ids)[None, :, :]   # (1, T, d_model)

        h = tok_emb + pos_emb                           # add positional info

        # Self-attention. No attention mask is applied, so every position
        # attends to every other position (PAD positions included). That is
        # fine for callers that only read the last position's logits; a
        # causal mask would be needed to train on every position.
        h_norm = self.ln1(h)
        attn_output, attn_weights = self.attn(
            h_norm,
            h_norm,
            h_norm,
            need_weights=True,
            average_attn_weights=average_attn_weights,
        )
        h = h + attn_output                             # residual

        # Feed-forward
        h_norm = self.ln2(h)
        ff_output = self.ff(h_norm)                     # (B, T, d_model)
        h = h + ff_output                               # residual

        logits = self.lm_head(h)                        # (B, T, vocab_size)
        return logits, attn_weights

In [None]:
# Seed PyTorch's RNG before the model is constructed so the randomly
# initialized weights are the same on every run.
torch.manual_seed(42)

model = MiniTransformer(
    vocab_size=vocab_size_with_pad, # include PAD
    d_model=32,
    n_heads=2,
    block_size=block_size,
).to(device)

# AdamW over all model parameters; cross-entropy between the predicted logits
# and the target token id. train() reads both of these as globals.
optimizer = optim.AdamW(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

In [None]:
# Move the full dataset to the same device as the model; it is small enough
# to be used as a single batch.
X_train = X.to(device)
Y_train = Y.to(device)

def train(model, X, Y, epochs=300, opt=None, loss_fn=None):
    """Train ``model`` full-batch on next-token prediction.

    Each epoch runs one forward pass over all of ``X``, takes the logits at
    the last position of every sequence, and takes one optimizer step on the
    loss against ``Y``. The loss is printed on epoch 1 and every 50th epoch.

    Args:
        model: Module whose forward returns ``(logits, extra)`` with logits
            of shape ``(B, T, vocab_size)``.
        X: ``(B, T)`` tensor of input token ids.
        Y: ``(B,)`` tensor of target token ids.
        epochs: Number of full-batch steps.
        opt: Optimizer to step. Defaults to the module-level ``optimizer``.
            Pass one explicitly when training any model other than the one
            that global optimizer was built for, otherwise the wrong
            parameters are updated.
        loss_fn: Loss callable ``(logits, targets) -> scalar``. Defaults to
            the module-level ``criterion``.

    Returns:
        None.
    """
    # Fall back to the notebook-level objects so existing calls keep working.
    opt = optimizer if opt is None else opt
    loss_fn = criterion if loss_fn is None else loss_fn

    model.train()
    for epoch in range(1, epochs + 1):
        logits, _ = model(X)              # (B, T, vocab_size)

        # We only care about the **last** position in each sequence,
        # which predicts the "next token" after the context window.
        logits_last = logits[:, -1, :]    # (B, vocab_size)
        loss = loss_fn(logits_last, Y)

        opt.zero_grad()
        loss.backward()
        opt.step()

        if epoch % 50 == 0 or epoch == 1:
            print(f"Epoch {epoch:3d} | loss = {loss.item():.4f}")

# Run 300 full-batch training steps on the toy dataset.
train(model, X_train, Y_train, epochs=300)

In [None]:
def generate(model, prompt, max_new_tokens=5):
    """Extend ``prompt`` by sampling up to ``max_new_tokens`` words.

    On every step the known words generated so far are mapped to ids (words
    missing from ``stoi`` are skipped for the model input but kept in the
    returned text), the last ``block_size`` ids are left-padded with
    ``pad_token_id``, and the next id is sampled from the softmax of the
    model's last-position logits. Generation stops early if PAD is sampled.

    Returns:
        The prompt words plus the generated words, joined by single spaces.
    """
    model.eval()
    words = prompt.split()
    with torch.no_grad():
        for _step in range(max_new_tokens):
            # Build the fixed-width context window for this step.
            known_ids = [stoi[word] for word in words if word in stoi]
            window = known_ids[-block_size:]
            padded = [pad_token_id] * (block_size - len(window)) + window
            batch = torch.tensor(
                padded, dtype=torch.long, device=device
            ).unsqueeze(0)                                   # (1, T)

            logits, _ = model(batch)
            probs = F.softmax(logits[:, -1, :], dim=-1)      # (1, vocab_size)

            # Sample the next token from the predicted distribution.
            sampled = torch.multinomial(probs, num_samples=1).item()
            if sampled == pad_token_id:
                # PAD is not a real word; stop instead of emitting it.
                break

            words.append(itos[sampled])
    return " ".join(words)

# Prompts made of words from the toy corpus's vocabulary.
prompts = [
    "the cat",
    "the dog",
    "the cat chased",
    "the dog sat",
    "the mouse",
    "the mailman"
]


# Alternative prompts (presumably meant for the commented-out lyrics corpus).
# prompts = [
#     "my heart",
#     "your head"
# ]

# Sample a continuation for each prompt. generate() samples randomly, so the
# output can differ from run to run.
for p in prompts:
    print(f"Prompt: {p:20s} -> {generate(model, p, max_new_tokens=15)}")

In [None]:
# Inspect the attention pattern the trained model produces for one example.
model.eval()
with torch.no_grad():
    # Take one example batch row
    sample_x = X_train[4:5]            # shape (1, block_size)
    logits, attn_weights = model(sample_x)

# Real (non-PAD) token ids in the sampled context. The loop variable is named
# tok_id rather than id so the builtin id() is not shadowed for the rest of
# the notebook session.
context_ids = [tok_id for tok_id in sample_x[0].tolist() if tok_id != pad_token_id]
print("Context tokens:", decode(context_ids))

for tok_id in context_ids:
    print(f"{tok_id:>5d} - {decode([tok_id])}")

# nn.MultiheadAttention averages the weights over heads by default, so the
# shape is (B, T, T), not one matrix per head.
print("Attention weights shape:", attn_weights.shape)  # (B, T, T)
attn_weights[0]  # head-averaged attention matrix for the first (only) sample

In [None]:
# Display the full attention-weights tensor returned by the last forward pass.
attn_weights