# Bigram LM as a basis for a Transformer LM

In [2]:
# Read the whole corpus into memory as a single string.
# The encoding is pinned to UTF-8 so the character vocabulary built from `text`
# does not depend on the platform's default locale encoding.
with open("../input/fortune-messages.txt", encoding="utf-8") as corpus_file:
  text = corpus_file.read()

In [3]:
# Vocabulary: every distinct character of the corpus, sorted so that the
# character -> token id assignment is deterministic.
chars = sorted(set(text))
vocab_size = len(chars)

In [4]:
# Lookup tables between characters and integer token ids.
# A token id is simply the character's position in `chars`.
token_to_char = dict(enumerate(chars))
char_to_token = {c: i for i, c in token_to_char.items()}

def encode(s):
  """Return the list of token ids for the characters of string `s`."""
  return [char_to_token[c] for c in s]

def decode(l):
  """Return the string spelled by the token ids in `l`."""
  return "".join(token_to_char[t] for t in l)

print(encode("hello"))
print(decode(encode("hello")))

[46, 43, 50, 50, 53]
hello


In [5]:
import torch
# Encode the full corpus as a 1-D tensor of token ids, then keep the first 90%
# for training and the remaining 10% for validation.
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9 * len(data))
train_data, val_data = data[:n], data[n:]
print(data.shape)

torch.Size([13083])


In [6]:
# One chunk of block_size + 1 tokens yields block_size training samples:
# every prefix of x is a context and the token right after it is the target.
block_size = 8
x = train_data[:block_size]
y = train_data[1:block_size+1]
print("For this chunk (x)", x)
for length in range(1, block_size + 1):
  context, target = x[:length], y[length - 1]
  print(f"When the input is {context}, we expect {target}")

For this chunk (x) tensor([37, 47, 58, 46,  1, 47, 52, 58])
When the input is tensor([37]), we expect 47
When the input is tensor([37, 47]), we expect 58
When the input is tensor([37, 47, 58]), we expect 46
When the input is tensor([37, 47, 58, 46]), we expect 1
When the input is tensor([37, 47, 58, 46,  1]), we expect 47
When the input is tensor([37, 47, 58, 46,  1, 47]), we expect 52
When the input is tensor([37, 47, 58, 46,  1, 47, 52]), we expect 58
When the input is tensor([37, 47, 58, 46,  1, 47, 52, 58]), we expect 43


In [7]:
# Number of independent sequences drawn per batch.
batch_size = 4

# Prefer the GPU when PyTorch can see one; otherwise stay on the CPU.
if torch.cuda.is_available():
  device = torch.device("cuda")
else:
  device = torch.device("cpu")

def get_batch(source):
  """Sample a random batch of (input, target) windows from one data split.

  `source` selects the split: "train" reads train_data, any other value reads
  val_data. Returns a two-element list [inputs, outputs]; both are
  (batch_size, block_size) tensors on `device`, and `outputs` is `inputs`
  shifted one token to the right.
  """
  split = val_data if source != "train" else train_data

  # Random window starts; the upper bound leaves room for the shifted targets.
  starts = torch.randint(len(split) - block_size, (batch_size,))

  inputs = torch.stack([split[s:s + block_size] for s in starts]).to(device)
  outputs = torch.stack([split[s + 1:s + block_size + 1] for s in starts]).to(device)

  return [inputs, outputs]

# Draw one training batch and keep it around (xb: inputs, yb: targets).
xb, yb = get_batch("train")
# Draw a second, independent batch; as the cell's last expression it is displayed.
get_batch("train")

[tensor([[43,  1, 58, 46, 43,  1, 51, 53],
         [41, 53, 51, 54, 50, 47, 57, 46],
         [58,  1, 42, 53,  1, 63, 53, 59],
         [39, 52, 42,  1, 58, 46, 43,  1]]),
 tensor([[ 1, 58, 46, 43,  1, 51, 53, 57],
         [53, 51, 54, 50, 47, 57, 46, 51],
         [ 1, 42, 53,  1, 63, 53, 59, 56],
         [52, 42,  1, 58, 46, 43,  1, 39]])]

In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as f

class BigramLM(nn.Module):
  """Bigram language model: next-token scores depend only on the current token.

  The whole model is one (vocab_size, vocab_size) embedding table. Row i holds
  the unnormalized scores (logits) for the token that follows token i.
  """

  def __init__(self, vocab_size):
    super().__init__()
    self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)

  def forward(self, idx, targets=None):
    """Compute next-token logits and, when targets are given, the loss.

    idx: tensor of token ids with shape (B, T) - batch size B, sequence length T.
    targets: optional tensor of token ids with the same shape as idx.

    Returns (logits, cost):
      - without targets: logits has shape (B, T, C) with C = vocab_size, cost is None;
      - with targets: logits is flattened to (B * T, C) and cost is the mean
        cross-entropy over all B * T positions.
    """
    # For each token in the input, retrieve its row of the table.
    # Since this is a bigram model, that row directly holds the logits
    # (unnormalized possibilities) of the following token.
    logits = self.token_embedding_table(idx)

    # Identity check: `== None` would be routed through Tensor.__eq__.
    if targets is None:
      return logits, None

    B, T, C = logits.shape

    # cross_entropy expects logits of shape (N, C) and class indices of shape (N,),
    # so the batch and time dimensions are merged into a single sample dimension.
    logits = logits.view(B * T, C)
    target = targets.view(B * T)
    cost = f.cross_entropy(logits, target)

    return logits, cost

  # Sampling never needs gradients; disabling them avoids building an autograd
  # graph on every generation step.
  @torch.no_grad()
  def generate(self, idx, new_tokens):
    """Append `new_tokens` sampled tokens to every sequence in idx.

    idx: tensor of token ids with shape (B, T).
    Returns a tensor of shape (B, T + new_tokens).
    """
    for _ in range(new_tokens):
      logits, _ = self(idx)

      # Keep only the logits of the last time step: shape (B, C).
      # They score the candidates for the next token.
      logits = logits[:, -1, :]

      # Softmax across the last dimension (C) converts logits to probabilities
      probs = f.softmax(logits, dim=-1)

      # Sample the next token from the distribution: shape (B, 1)
      idx_next = torch.multinomial(probs, num_samples=1)

      # Append the sampled token to the running sequence: shape (B, T+1)
      idx = torch.cat((idx, idx_next), dim=-1)
    return idx

# Instantiate the bigram model with one table row per vocabulary entry.
# The name `model` is reassigned to the Transformer further down the notebook.
model = BigramLM(vocab_size)

In [9]:
batch_size = 30

# get_batch() returns tensors on `device`, so the model has to live there too;
# otherwise the forward pass fails with a device mismatch when CUDA is available.
# nn.Module.to moves the parameters in place.
model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)

# A single start token (id 0) seeds generation, created on the model's device.
start_context = torch.zeros((1, 1), dtype=torch.long, device=device)

print("-- Text generation before training a Bigram model: ",
  decode(model.generate(start_context, 200)[0].tolist()))

for steps in range(10000):
  xb, yb = get_batch("train")

  logits, cost = model(xb, yb)

  optimizer.zero_grad(set_to_none=True)
  cost.backward()
  optimizer.step()

# This sample is drawn after the optimization loop, so it is labelled "after".
print("-- Text generation after training a Bigram model: ",
  decode(model.generate(start_context, 200)[0].tolist()))
# Loss of the last training batch only (not an average).
print(f"Cost: {cost.item()}")


-- Text generation before training a Bigram model:  
z:yqRIEkp'D1qv3dWjsFBu (ayBrjstp,rzWYkp??A.AnyUG))NMpnCDAl wLxHKx?1HnRrYhG!!nR)C"SnhTP
PYH)"Vcu'qvCGNp'etT
?'LvvGH'2CII?CDyjtrKE.Njvt")dUDeTxUgdlErKq
"gWki(DECbqvDsVDss;ollednJMCzHJyi-(GdjhrzWYhpVEzL3
-- Text generation before training a Bigram model:  
Man t.
Dou ned is ondve gisely ferityon derssstu yoou thespou tisithoveniorin ibilo ubofingonecer tuesthawimiour's.
Be t ing taveilly.
M'st u"Sal y.
Thmilfe tis o es tthan s s a es.
Yotearunge plece o
Cost: 2.2598772048950195


# Mathematical trick in self-attention

In [10]:
B, T, C = 4, 8, 2
x = torch.randn(B, T, C)

# xbow ("bag of words") stores, for every position t, the average of the
# embeddings at positions 0..t (the current one included)
xbow = torch.zeros((B, T, C))

for b, sequence in enumerate(x):
  for t in range(T):
    # sequence[:t+1] has shape (t + 1, C); averaging over dimension 0 (rows)
    # collapses it into a single embedding of size C
    xbow[b, t] = sequence[:t+1].mean(dim=0)

print(x[0], xbow[0])

tensor([[ 0.2185,  0.6711],
        [ 0.5332, -1.0323],
        [ 0.2328,  0.2747],
        [-1.2962,  1.7230],
        [ 0.0458,  0.6589],
        [-1.3700,  1.9989],
        [ 0.6677,  0.6635],
        [-1.2354,  0.0185]]) tensor([[ 0.2185,  0.6711],
        [ 0.3758, -0.1806],
        [ 0.3282, -0.0289],
        [-0.0779,  0.4091],
        [-0.0532,  0.4591],
        [-0.2726,  0.7157],
        [-0.1383,  0.7082],
        [-0.2754,  0.6220]])


In [11]:
# Python for-loops are not efficient here.
# The same running average can be obtained with a single matrix multiplication.

# Start from a lower-triangular matrix of ones: row t selects time steps 0..t
wei = torch.tril(torch.ones(3, 3))
print(wei)

# Normalize every row by its number of ones (the number of elements being averaged),
# so that `wei @ x` averages the selected rows of x
row_counts = wei.sum(dim=1, keepdim=True)
wei = wei / row_counts

# Example matrix x: each row is a "time step" holding an embedding of size 2
x = torch.randint(0, 10, (3, 2)).float()

print(wei)
print(x)
print(wei @ x)

tensor([[1., 0., 0.],
        [1., 1., 0.],
        [1., 1., 1.]])
tensor([[1.0000, 0.0000, 0.0000],
        [0.5000, 0.5000, 0.0000],
        [0.3333, 0.3333, 0.3333]])
tensor([[3., 2.],
        [6., 5.],
        [6., 4.]])
tensor([[3.0000, 2.0000],
        [4.5000, 3.5000],
        [5.0000, 3.6667]])


In [12]:
# A single head of self-attention used as a decoder, step by step

B, T, C = 4, 8, 2
head_size = 16

x = torch.randn(B, T, C)
key = nn.Linear(C, head_size, bias=False)
query = nn.Linear(C, head_size, bias=False)
value = nn.Linear(C, head_size, bias=False)

# It is *self*-attention because queries, keys and values are all projections
# of the same input x. Each projection has shape (B, T, head_size).

# Queries: "What should I pay attention to?"
q = query(x)

# Keys: "What information do I contain?"
k = key(x)

# Values: "What information should actually be passed on?"
# A learned projection gives the model more flexibility than the raw embeddings.
v = value(x)

# Attention scores: dot product of every query with every key.
# Swapping the last two dims of k gives (B, head_size, T), so the result is
# (B, T, T) - how much each token should attend to every other token.
wei = torch.matmul(q, k.transpose(-2, -1))

# Large scores make the softmax very peaky: nearly all attention lands on a
# single token, which can slow down or destabilize training.
# Dividing by sqrt(head_size) keeps the scores in a reasonable range.
wei = wei / (head_size ** 0.5)

# Mask the upper triangle (future tokens) to turn this into a decoder:
# we are trying to predict the next token from the sequence so far,
# so a position must not look at tokens that come after it.
tri = torch.tril(torch.ones(T, T))
wei = wei.masked_fill(tri == 0, float('-inf'))

# Masked entries are -inf and therefore get zero weight after the softmax
wei = f.softmax(wei, dim=-1)

# Weighted sum of the values according to the attention weights: (B, T, head_size)
out = torch.matmul(wei, v)

# Transformer Language Model

In [None]:
# Hyperparameters of the Transformer language model and its training run.

# Sequences per batch (read by get_batch).
batch_size = 120
# Context length in tokens; also sizes the position embedding table and the causal mask.
block_size = 16
# The training loop runs max_steps + 1 iterations.
max_steps = 5000
# Used twice: the number of batches averaged per split in estimate_loss,
# and the interval (in steps) between evaluations in the training loop.
eval_iters = 500
# Learning rate passed to AdamW.
learning_rate = 1e-3
# Width of the token/position embeddings used throughout the model.
n_embeddings = 32

class Transformer(nn.Module):
  """Decoder-only Transformer language model over the character vocabulary.

  Token and position embeddings are summed, passed through four
  TransformerBlock layers and projected to vocabulary logits by `lm_head`.
  Sizes come from the module-level vocab_size, block_size and n_embeddings.
  """

  def __init__(self):
    super().__init__()

    self.token_embedding_table = nn.Embedding(vocab_size, n_embeddings)
    self.position_embedding_table = nn.Embedding(block_size, n_embeddings)
    # The second argument of TransformerBlock is the number of attention heads.
    self.blocks = nn.Sequential(
      TransformerBlock(n_embeddings, n_embeddings // 4),
      TransformerBlock(n_embeddings, n_embeddings // 4),
      TransformerBlock(n_embeddings, n_embeddings // 4),
      TransformerBlock(n_embeddings, n_embeddings // 4),
    )

    # Maps the output embeddings back to a distribution over the vocabulary.
    # "Head" refers to the final part of a model that produces the actual output (logits).
    self.lm_head = nn.Linear(n_embeddings, vocab_size)

  def forward(self, idx, targets=None):
    """Compute next-token logits and, when targets are given, the loss.

    idx: tensor of token ids with shape (B, T).
    targets: optional tensor of token ids with the same shape as idx.

    Returns (logits, cost):
      - without targets: logits has shape (B, T, vocab_size), cost is None;
      - with targets: logits is flattened to (B * T, vocab_size) and cost is
        the mean cross-entropy over all B * T positions.
    """
    B, T = idx.shape

    tokens_embeddings = self.token_embedding_table(idx) # B, T, n_embeddings
    pos_embeddings = self.position_embedding_table(torch.arange(T, device=idx.device)) # T, n_embeddings

    # Adding the two lets the model know not just what each token is,
    # but also where it sits in the sequence.
    x = tokens_embeddings + pos_embeddings # B, T, n_embeddings
    x = self.blocks(x)

    logits = self.lm_head(x) # B, T, vocab_size

    # Identity check: `== None` would be routed through Tensor.__eq__.
    if targets is None:
      return logits, None

    B, T, C = logits.shape

    # cross_entropy expects (N, C) logits and (N,) class indices.
    logits = logits.view(B*T, C)
    target = targets.view(B*T)
    cost = f.cross_entropy(logits, target)

    return logits, cost

  # Sampling never needs gradients; disabling them avoids building an autograd
  # graph on every generation step.
  @torch.no_grad()
  def generate(self, idx, new_tokens):
    """Append `new_tokens` sampled tokens to every sequence in idx.

    idx: tensor of token ids with shape (B, T).
    Returns a tensor of shape (B, T + new_tokens).
    """
    for _ in range(new_tokens):
      # Feed at most the last block_size tokens: the position embedding table
      # only has block_size rows.
      logits, _ = self(idx[:, -block_size:])
      # Logits of the last time step: shape (B, vocab_size)
      logits = logits[:, -1, :]

      probs = f.softmax(logits, dim=-1)
      idx_next = torch.multinomial(probs, num_samples=1)

      idx = torch.cat((idx, idx_next), dim=1)
    return idx

# Attention: “What should I know from others?”
class SingleHeadOfAttention(nn.Module):
  """One head of causal (masked) scaled dot-product self-attention.

  Projects the input from the module-level n_embeddings down to `head_size`
  for keys, queries and values, and lets every position attend only to itself
  and to earlier positions.
  """

  def __init__(self, head_size):
    super().__init__()
    self.key = nn.Linear(n_embeddings, head_size, bias=False)
    self.query = nn.Linear(n_embeddings, head_size, bias=False)
    self.value = nn.Linear(n_embeddings, head_size, bias=False)
    # Registering the mask as a buffer means it is:
    #  - included in model.state_dict() when the model is saved;
    #  - moved to GPU/CPU automatically along with the model.
    self.register_buffer("tril", torch.tril(torch.ones(block_size, block_size)))

  def forward(self, x):
    """Apply the attention head.

    x: embeddings of shape (B, T, C), one vector per position.
    Returns a tensor of shape (B, T, head_size).
    """
    B, T, C = x.shape

    k = self.key(x) # B, T, head_size
    q = self.query(x) # B, T, head_size
    v = self.value(x) # B, T, head_size

    # Scale by sqrt of THIS head's size, read from the projection output.
    # A bare `head_size` here would resolve to a module-level variable that is
    # unrelated to this head (and may not exist on a fresh kernel).
    scale = k.shape[-1] ** -0.5

    wei = q @ k.transpose(-2, -1) * scale # B, T, T
    # Hide future positions: they get -inf and thus zero weight after softmax.
    wei = wei.masked_fill(self.tril[:T, :T] == 0, float("-inf"))
    wei = f.softmax(wei, dim=-1)

    return wei @ v # B, T, head_size

# A single attention head can only focus on one type of relationship between tokens at a time.
# Multiple heads allow the model to learn multiple perspectives: syntactic structure,
# semantic meaning, local vs. global dependencies.
class MultiHeadOfAttention(nn.Module):
  """Several SingleHeadOfAttention heads run in parallel and then recombined.

  The head outputs are concatenated along the feature dimension
  (num_heads * head_size features) and projected to the module-level n_embeddings.
  """

  def __init__(self, num_heads, head_size):
    super().__init__()
    self.heads = nn.ModuleList([SingleHeadOfAttention(head_size) for _ in range(num_heads)])
    # Learns how to combine the outputs of the individual heads in a meaningful way.
    # Its input width is the width of the concatenated heads, which only equals
    # n_embeddings when n_embeddings is divisible by num_heads.
    self.proj = nn.Linear(num_heads * head_size, n_embeddings)

  def forward(self, x):
    """Map x of shape (B, T, n_embeddings) to a tensor of the same shape."""
    res = torch.cat([h(x) for h in self.heads], dim=-1) # B, T, num_heads * head_size
    return self.proj(res)

# "What should I do with the knowledge I gained from self-attention?"
# Attention mixes information between tokens, but it does not transform a
# token's representation in a deep, nonlinear way.
# That is the job of the feed-forward layer.
class FeedForward(nn.Module):
  """Per-position MLP: Linear -> ReLU -> Linear with a 4x wider hidden layer."""

  def __init__(self, n_embeddings):
    super().__init__()
    self.n_embeddings = n_embeddings

    # Expand to 4x the width (richer representation), then project back
    hidden = 4 * n_embeddings
    layers = [
      nn.Linear(n_embeddings, hidden),
      nn.ReLU(),
      nn.Linear(hidden, n_embeddings),
    ]
    self.net = nn.Sequential(*layers)

  def forward(self, x):
    """Apply the MLP to the last dimension of x; the shape is unchanged."""
    return self.net(x)

# Stacking blocks lets the model learn increasingly abstract representations of the input.
# First block: each token gathers some context.
# Second block: tokens gather context from already context-aware representations.
# Third block: further refined interactions, more abstract patterns.
class TransformerBlock(nn.Module):
  """Pre-norm Transformer block: self-attention followed by a feed-forward layer.

  n_embeddings is the embedding width and n_head the number of attention heads;
  each head works on n_embeddings // n_head features.
  """

  def __init__(self, n_embeddings, n_head):
    super().__init__()
    per_head = n_embeddings // n_head
    self.sa = MultiHeadOfAttention(n_head, per_head)
    self.ffwd = FeedForward(n_embeddings)
    self.n1 = nn.LayerNorm(n_embeddings)
    self.n2 = nn.LayerNorm(n_embeddings)

  def forward(self, x):
    """Apply the block to x; the output has the same shape as the input."""
    # Both sub-layers are wrapped in a residual connection (x + sublayer(norm(x))),
    # which helps against vanishing gradients.

    # Communication: tokens exchange information through self-attention
    attended = self.sa(self.n1(x))
    x = x + attended

    # Computation: each token processes what it gathered
    transformed = self.ffwd(self.n2(x))
    return x + transformed

@torch.no_grad()
def estimate_loss():
  """Estimate the current loss of the global `model` on both data splits.

  For "train" and "val", averages the loss over eval_iters random batches.
  The model is put in eval mode for the measurement and back in train mode
  before returning. Returns a dict mapping the split name to a 0-dim tensor
  holding the mean loss.
  """
  model.eval()
  out = {}
  for split in ("train", "val"):
    losses = torch.zeros(eval_iters)
    for i in range(eval_iters):
      # Only the loss is needed here; the logits are discarded
      _, cost = model(*get_batch(split))
      losses[i] = cost.item()
    out[split] = losses.mean()
  model.train()
  return out

# Build the Transformer and place its parameters on the selected device
# (nn.Module.to returns the module itself).
model = Transformer().to(device)

In [16]:
import time

# Train the Transformer with AdamW and report how long the loop took.
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
start = time.time()
print("device", device)
print("batch_size", batch_size)

for steps in range(max_steps + 1):
  xb, yb = get_batch("train")

  # Every eval_iters steps (step 0 included), report the averaged train/val loss
  if not steps % eval_iters:
    losses = estimate_loss()
    print(f"step {steps}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

  # One optimization step on the batch drawn above
  logits, cost = model(xb, yb)
  optimizer.zero_grad(set_to_none=True)
  cost.backward()
  optimizer.step()

end = time.time()
# Elapsed wall-clock time, converted from seconds to minutes
print("Training time:", (end - start) / 60)

device cpu
batch_size 120
step 0: train loss 4.6570, val loss 4.6366
step 500: train loss 2.1040, val loss 2.2226
step 1000: train loss 1.8534, val loss 2.0706
step 1500: train loss 1.6827, val loss 2.0235
step 2000: train loss 1.5512, val loss 2.0072
step 2500: train loss 1.4477, val loss 2.0370


KeyboardInterrupt: 

In [None]:
# Seed generation with a single token (id 0) on the model's device,
# sample 1000 further tokens and print the decoded text.
start_tokens = torch.zeros((1, 1), dtype=torch.long, device=device)
context = model.generate(start_tokens, 1000)
print(decode(context[0].tolist()))