Adapted from Karpathy - Let's build GPT from scratch, in code, spelled out ([Video](https://www.youtube.com/watch?v=kCc8FmEb1nY&t=797s))

In [15]:
import torch

# Hyperparameters shared by the cells below.
# NOTE(review): in the cells visible in this notebook the training cell sets its own
# batch size (32), step count (10000) and learning rate (1e-3), and builds the model
# with the constructor defaults, so batch_size, max_iters, learning_rate, n_head and
# n_layer as written here do not drive that run -- confirm which values are intended.
batch_size = 64 # how many independent sequences will we process in parallel?
block_size = 256 # what is the maximum context length for predictions?
max_iters = 5000 # how many training iterations to run for
eval_interval = 500 # how often to evaluate the model on the validation set
learning_rate = 3e-4 # what learning rate to use
device = 'cuda' if torch.cuda.is_available() else 'cpu' # use the GPU when PyTorch reports one, otherwise the CPU
eval_iters = 200 # how many batches to use for evaluation
n_embed = 384 # dimensionality of the token embeddings
n_head = 6 # how many heads to use in the multi-head attention
n_layer = 6 # how many layers to use in the transformer
dropout = 0.2 # drop probability handed to nn.Dropout


In [16]:
import os
import urllib.request

# Show the working directory so it is clear where the relative data/ path resolves.
print(os.getcwd())
# Where to save it (match nanoGPT structure if you want)
out_dir = os.path.join("data", "shakespeare_char")
os.makedirs(out_dir, exist_ok=True)

url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
out_path = os.path.join(out_dir, "input.txt")

if os.path.exists(out_path):
    # Re-running the cell (e.g. Restart & Run All) should not hit the network again.
    print(f"Using cached copy at {out_path}")
else:
    print("Downloading Tiny Shakespeare...")
    # Download under a temporary name and rename only on success, so an interrupted
    # transfer never leaves a truncated file that later looks like a valid cache.
    tmp_path = out_path + ".part"
    urllib.request.urlretrieve(url, tmp_path)
    os.replace(tmp_path, out_path)
    print(f"Saved to {out_path}")

C:\Users\wadka\Documents\GitHub\pytorch-gpu-anatomy\notebooks
Downloading Tiny Shakespeare...
Saved to data\shakespeare_char\input.txt


In [17]:
# Read the whole corpus into memory as a single UTF-8 decoded string.
with open(out_path, encoding="utf-8") as f:
    text = f.read()

In [18]:
# Sanity check: report the corpus size and preview its first 1000 characters.
print("length of dataset in characters: ",len(text))
print(text[:1000])

length of dataset in characters:  1115394
First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

All:
We know't, we know't.

First Citizen:
Let us kill him, and we'll have corn at our own price.
Is't a verdict?

All:
No more talking on't; let it be done: away, away!

Second Citizen:
One word, good citizens.

First Citizen:
We are accounted poor citizens, the patricians good.
What authority surfeits on would relieve us: if they
would yield us but the superfluity, while it were
wholesome, we might guess they relieved us humanely;
but they think we are too dear: the leanness that
afflicts us, the object of our misery, is as an
inventory to particularise their abundance; our
sufferance is a gain to them Let us revenge this with
our pikes, ere we become rakes: for the gods know I
speak this in hung

In [19]:
# Character-level vocabulary: every distinct character in the corpus. Sorting fixes
# the order, so the character -> id assignment built from it is reproducible.
chars = sorted(set(text))
vocab_size = len(chars)
print("all the unique characters:", chars)
print(''.join(chars))
print("vocab size:", vocab_size)
# %%

all the unique characters: ['\n', ' ', '!', '$', '&', "'", ',', '-', '.', '3', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']

 !$&',-.3:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz
vocab size: 65


In [20]:
# Lookup tables between characters and integer token ids; ids follow the order of `chars`.
stoi = dict(zip(chars, range(len(chars))))
itos = {i: ch for i, ch in enumerate(chars)}

def encode(mystr):
    """Translate a string into a list of integer token ids, one per character.

    Each character is looked up in the `stoi` table; a character that is not
    in the table raises KeyError.
    """
    token_ids = []
    for ch in mystr:
        token_ids.append(stoi[ch])
    return token_ids

def decode(tokens):
    """Translate an iterable of integer token ids back into a string.

    Each id is looked up in the `itos` table; an id that is not in the table
    raises KeyError.
    """
    pieces = []
    for token in tokens:
        pieces.append(itos[token])
    return ''.join(pieces)

# Round-trip demo: show the ids for a sample string, then decode them back to text.
print(encode("hii there"))
print(decode(encode("hii there")))

[46, 47, 47, 1, 58, 46, 43, 56, 43]
hii there


In [21]:
# Encode the full corpus once and keep it as a 1-D tensor of int64 token ids.
data = torch.tensor(encode(text), dtype=torch.long)
print(data.shape, data.dtype)
# Preview the first 1000 token ids.
print(data[:1000])
# %%

torch.Size([1115394]) torch.int64
tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43, 52, 10,  0, 14, 43, 44,
        53, 56, 43,  1, 61, 43,  1, 54, 56, 53, 41, 43, 43, 42,  1, 39, 52, 63,
         1, 44, 59, 56, 58, 46, 43, 56,  6,  1, 46, 43, 39, 56,  1, 51, 43,  1,
        57, 54, 43, 39, 49,  8,  0,  0, 13, 50, 50, 10,  0, 31, 54, 43, 39, 49,
         6,  1, 57, 54, 43, 39, 49,  8,  0,  0, 18, 47, 56, 57, 58,  1, 15, 47,
        58, 47, 64, 43, 52, 10,  0, 37, 53, 59,  1, 39, 56, 43,  1, 39, 50, 50,
         1, 56, 43, 57, 53, 50, 60, 43, 42,  1, 56, 39, 58, 46, 43, 56,  1, 58,
        53,  1, 42, 47, 43,  1, 58, 46, 39, 52,  1, 58, 53,  1, 44, 39, 51, 47,
        57, 46, 12,  0,  0, 13, 50, 50, 10,  0, 30, 43, 57, 53, 50, 60, 43, 42,
         8,  1, 56, 43, 57, 53, 50, 60, 43, 42,  8,  0,  0, 18, 47, 56, 57, 58,
         1, 15, 47, 58, 47, 64, 43, 52, 10,  0, 18, 47, 56, 57, 58,  6,  1, 63,
        53, 59,  1, 49, 52, 53, 61,  1, 15, 39, 47, 59, 57,  1, 25, 39, 56, 41,
      

In [22]:
# Split the token stream into train and validation parts: the first 90% of the
# tokens are used for training, the remaining tail is held out as validation data.
n = int(0.9*len(data))
train_data = data[:n]
val_data = data[n:]
# %%

In [24]:
def get_batch(split):
    """Sample a random batch of (input, target) sequences for next-token prediction.

    Args:
      split: 'train' draws from train_data; any other value draws from val_data.

    Returns:
      (x, y): two tensors of shape (batch_size, block_size). Row r of y holds the
      same window as row r of x, shifted one position forward in the source data.
    """
    source = train_data if split == 'train' else val_data

    # One random start offset per sequence. The upper bound leaves room for the
    # target window, which extends one token past the input window.
    starts = torch.randint(len(source) - block_size, (batch_size,))

    inputs, targets = [], []
    for start in starts:
        inputs.append(source[start:start + block_size])
        targets.append(source[start + 1:start + block_size + 1])

    return torch.stack(inputs), torch.stack(targets)

# Draw one training batch and print the shapes and contents of inputs and targets.
xb, yb = get_batch('train')
print('inputs:')
print(xb.shape)
print(xb)
print('targets:')
print(yb.shape)
print(yb)

print('----')

inputs:
torch.Size([64, 256])
tensor([[ 1, 41, 46,  ..., 59, 50, 42],
        [56, 58, 59,  ..., 46, 39, 58],
        [53,  1, 46,  ..., 56, 59, 52],
        ...,
        [45, 53, 53,  ..., 56, 43,  1],
        [52, 42,  1,  ..., 58, 46, 59],
        [39,  1, 50,  ..., 39,  1, 52]])
targets:
torch.Size([64, 256])
tensor([[41, 46, 39,  ..., 50, 42,  1],
        [58, 59, 56,  ..., 39, 58,  6],
        [ 1, 46, 47,  ..., 59, 52, 41],
        ...,
        [53, 53, 42,  ..., 43,  1, 63],
        [42,  1, 39,  ..., 46, 59, 52],
        [ 1, 50, 53,  ...,  1, 52, 47]])
----


In [25]:
import torch
from torch import nn
from torch.nn import functional as F

# Fix the global PyTorch RNG seed.
# NOTE(review): this runs after the sample batch in the earlier cell was already drawn
# with torch.randint, so that batch is not covered by the seed.
torch.manual_seed(1337)

<torch._C.Generator at 0x203ce698d10>

In [30]:
class Head(nn.Module):
    """
    A single head of masked (causal) self-attention.

    Shapes:
      input  x:   (B, T, n_embed)    B = batch, T = sequence length
      output out: (B, T, head_size)

    Each position t emits a query that is scored against the keys of every position.
    Scores that belong to positions after t are masked out, the rest are normalised
    with softmax, and the resulting weights form a weighted sum of the value vectors.
    """

    def __init__(self, head_size: int):
        super().__init__()

        def projection() -> nn.Linear:
            # Bias-free learned map from the model width down to this head's width.
            return nn.Linear(n_embed, head_size, bias=False)

        # Created in the order key, query, value so that seeded initialisation
        # produces the same weights regardless of how this constructor is written.
        self.key = projection()      # what each token offers to be matched against
        self.query = projection()    # what each token is looking for
        self.value = projection()    # the content that gets mixed together

        # Lower-triangular matrix of ones, (block_size, block_size): entry [i, j] is 1
        # when j <= i. Stored as a buffer: not trained, but it follows the module
        # across devices and is part of the state_dict.
        causal = torch.ones(block_size, block_size).tril()
        self.register_buffer("tril", causal)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Apply causal self-attention to a batch of sequences.

        Args:
          x: (B, T, n_embed)

        Returns:
          (B, T, head_size) tensor; row t mixes value vectors of positions <= t only.
        """
        # The middle dimension is the sequence length; the last one is n_embed,
        # not head_size.
        _, seq_len, _ = x.shape

        queries = self.query(x)   # (B, T, head_size)
        keys = self.key(x)        # (B, T, head_size)
        values = self.value(x)    # (B, T, head_size)

        # Pairwise query/key dot products: (B, T, hs) @ (B, hs, T) -> (B, T, T).
        # Dividing by sqrt(head_size) keeps the score magnitude independent of the
        # head width so the softmax below does not saturate.
        scale = keys.shape[-1] ** -0.5
        scores = (queries @ keys.transpose(-2, -1)) * scale

        # Entries above the diagonal would let row t look at a later position;
        # setting them to -inf makes softmax assign them exactly zero weight.
        is_future = self.tril[:seq_len, :seq_len] == 0
        scores = scores.masked_fill(is_future, float("-inf"))

        # Normalise each row over the key positions, then randomly drop some of the
        # attention weights (active in training mode only).
        attn = self.dropout(F.softmax(scores, dim=-1))   # (B, T, T)

        # Weighted sum of values: (B, T, T) @ (B, T, hs) -> (B, T, hs).
        return attn @ values

In [None]:
class MultiHeadAttention(nn.Module):
    """
    Several causal self-attention heads run side by side, followed by a mixing layer.

    Every head receives the same input, attends in its own learned subspace and
    returns (B, T, head_size). The head outputs are concatenated along the channel
    axis and a linear layer maps the result back to the model width n_embed.

    NOTE(review): no dropout is applied after the output projection in this class,
    although each Head applies dropout to its attention weights -- confirm intended.
    """

    def __init__(self, num_heads: int, head_size: int):
        super().__init__()

        # nn.ModuleList (rather than a plain list) registers the heads as submodules,
        # so their parameters are tracked and they follow .to(device).
        heads = []
        for _ in range(num_heads):
            heads.append(Head(head_size))
        self.heads = nn.ModuleList(heads)

        # Output projection: (num_heads * head_size) -> n_embed. This is the only
        # place where information from different heads is combined.
        concat_width = num_heads * head_size
        self.proj = nn.Linear(concat_width, n_embed)


    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Run every head on `x`, concatenate, and project.

        Args:
          x: (B, T, n_embed)

        Returns:
          (B, T, n_embed)
        """
        # Each entry is (B, T, head_size); heads are evaluated one after another.
        per_head = []
        for head in self.heads:
            per_head.append(head(x))

        # Place the head outputs next to each other: (B, T, num_heads * head_size).
        merged = torch.cat(per_head, dim=-1)

        # Mix across heads and return to the model width.
        return self.proj(merged)

In [None]:
class FeedForward(nn.Module):
    """
    Per-token two-layer MLP used after attention.

    Role
    ----
    Attention lets tokens exchange information; this block then transforms each
    token's vector on its own. The same weights are used at every position and no
    information moves between positions.

    Shapes
    ------
      input:  (B, T, n_embed)
      output: (B, T, n_embed)

    Internally the width goes n_embed -> 4 * n_embed -> n_embed with a ReLU between
    the two linear layers.
    """

    def __init__(self, n_embed: int):
        super().__init__()

        # Hidden layer is 4x the model width, the customary Transformer expansion.
        hidden_width = 4 * n_embed

        widen = nn.Linear(n_embed, hidden_width)     # expand
        activation = nn.ReLU()                       # non-linearity
        narrow = nn.Linear(hidden_width, n_embed)    # project back

        # Positions 0/1/2 inside the Sequential determine the state_dict keys.
        self.net = nn.Sequential(widen, activation, narrow)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Apply the MLP along the last dimension of `x`.

        Args:
          x: (B, T, n_embed)
             Token representations after attention and (usually) layer normalization.

        Returns:
          (B, T, n_embed) tensor; each position is transformed independently.
        """
        transformed = self.net(x)
        return transformed


In [None]:
class Block(nn.Module):
    """
    One pre-LayerNorm Transformer block.

    Two residual sub-layers are applied in sequence:
      1. x <- x + MultiHeadAttention(LayerNorm(x))   (tokens exchange information)
      2. x <- x + FeedForward(LayerNorm(x))          (per-token computation)

    Input and output both have shape (B, T, n_embed).
    """
    def __init__(self, n_embed: int, num_heads: int):
        super().__init__()

        # The model width is divided evenly between the heads.
        assert n_embed % num_heads == 0
        per_head_width = n_embed // num_heads

        self.sa = MultiHeadAttention(num_heads=num_heads, head_size=per_head_width)
        self.ffwd = FeedForward(n_embed)

        # One LayerNorm in front of each sub-layer.
        self.ln1 = nn.LayerNorm(n_embed)
        self.ln2 = nn.LayerNorm(n_embed)

    def forward(self, x):
        # Normalise, transform, then add back onto the residual stream -- twice.
        after_attention = x + self.sa(self.ln1(x))
        return after_attention + self.ffwd(self.ln2(after_attention))


In [26]:

class BigramLanguageModel4(nn.Module):
    def __init__(self, num_heads: int = 4, n_layers: int = 3):
        super().__init__()
        assert n_embed % num_heads == 0, "n_embed must be divisible by num_heads"

        self.token_embedding_table = nn.Embedding(vocab_size, n_embed)
        self.position_embedding_table = nn.Embedding(block_size, n_embed)

        self.blocks = nn.Sequential(*[
            Block(n_embed=n_embed, num_heads=4)
            for _ in range(n_layers)
        ])
        self.ln_f = nn.LayerNorm(n_embed)
        self.lm_head = nn.Linear(n_embed, vocab_size)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        tok_emb = self.token_embedding_table(idx)              # (B,T,n_embed)
        pos = torch.arange(T, device=idx.device)               # (T,)
        pos_emb = self.position_embedding_table(pos)           # (T,n_embed)
        x = tok_emb + pos_emb                                  # (B,T,n_embed)

        x = self.blocks(x)                                     # (B,T,n_embed)
        x = self.ln_f(x)                                       # (B,T,n_embed)
        logits = self.lm_head(x)                               # (B,T,vocab)

        loss = None
        if targets is not None:
            loss = F.cross_entropy(
                logits.reshape(B*T, logits.size(-1)),
                targets.reshape(B*T),
            )
        return logits, loss

    def generate(self, idx, max_new_tokens, temperature: float = 1.0, top_k: int | None = None):
        self.eval()
        with torch.no_grad():
            for _ in range(max_new_tokens):
                idx_cond = idx[:, -block_size:]
                logits, _ = self(idx_cond)
                last_logits = logits[:, -1, :]

                if temperature != 1.0:
                    last_logits = last_logits / temperature

                if top_k is not None:
                    v, _ = torch.topk(last_logits, k=top_k, dim=-1)
                    cutoff = v[:, -1].unsqueeze(-1)
                    last_logits = last_logits.masked_fill(last_logits < cutoff, float("-inf"))

                probs = F.softmax(last_logits, dim=-1)
                idx_next = torch.multinomial(probs, num_samples=1)
                idx = torch.cat((idx, idx_next), dim=1)

        # Optional: don't force train mode here; let caller control it.
        # self.train()
        return idx


In [29]:
# Build the model with its constructor defaults and train it with AdamW.
# NOTE(review): lr=1e-3, the 10000-step loop and batch_size=32 are set right here
# rather than taken from the hyperparameter cell (learning_rate, max_iters, batch_size).
m = BigramLanguageModel4().to(device)
optimizer = torch.optim.AdamW(m.parameters(), lr=1e-3)

# Rebinds the notebook-global batch_size that get_batch reads, so every later call
# to get_batch also uses 32.
batch_size = 32
for _steps in range(10000):
    # Sample a fresh random training batch and move it to the model's device.
    xb, yb = get_batch('train')
    xb, yb = xb.to(device), yb.to(device)

    # Forward pass, clear old gradients, backward pass, parameter update.
    logits, loss = m(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

# Loss of the final training batch only (not an average, not a validation loss).
print(loss.item())
# Recorded run time: 13m 52s
# Recorded final-batch loss: 0.9423200488090515

0.9423200488090515


In [31]:
# Prompt with a single token of id 0 (a batch of one sequence), sample 1000 more
# tokens, and decode the first (only) sequence back to text.
start = torch.zeros((1,1), dtype=torch.long, device=device)
print(decode(m.generate(start, max_new_tokens=1000)[0].tolist()))


To find me thy greatest friends that must seem,
The raren of the souls had not the heart to say
Yet thus exquisite: 'twere as the mad joy
That balling to the higher-degree;
But that thou with bright forwardly slain:
High as we will devotion his chince;
How he could make a lisping, without-book;
The world doth each one first of the name
To have look what seems unsuspicious and
When in the which shame to win our fox?

AEdile:
It is not.

AEdile:
Do not yet, with these few wonder wonder on thee.

GREGORY:
No, masters, my lord-spearers.

SAMPSON:
If; but we thrive now the benefits not
speak against help; but to be sworth in law,
Whether receive be not stoop'd by the wind.

GREGORY:
They say, bethink me, but not prosper in the guilty
Than I am coat, this right way but drown;
And hastely dived as incle York it did.

KING HENRY VI:
You slew in his charge; bulk as never wear
Thus it makes us the wealth upon this clothes?
And so well I; and this blood upon this eyes,
They were a but sutter'd i