# PicoGPT Inference Notebook

This notebook demonstrates how to load and run inference with GPT-2 using the picoGPT implementation.

picoGPT is a minimal implementation of GPT-2 whose forward pass is written in pure NumPy, making it easy to understand the core concepts of transformer-based language models. TensorFlow is used only to read the pre-trained checkpoint files.

**Features:**
- Load pre-trained GPT-2 weights
- BPE tokenization
- Text generation with greedy decoding

## 1. Setup and Imports

In [None]:
import os
import json
import re
from functools import lru_cache

import numpy as np
import regex
import requests
import tensorflow as tf
from tqdm import tqdm

## 2. BPE Tokenizer (Encoder)

The GPT-2 tokenizer uses Byte Pair Encoding (BPE) to convert text into tokens.

In [None]:
@lru_cache()
def bytes_to_unicode():
    """
    Build the reversible byte -> unicode-character table used by the BPE code.

    Bytes in the ranges "!".."~", "¡".."¬" and "®".."ÿ" map to the character
    with the same code point. Every other byte value is assigned the next
    unused code point starting at 256. Returns a dict with one entry for each
    of the 256 byte values; the result is memoised by ``lru_cache``.
    """
    printable = (
        list(range(ord("!"), ord("~") + 1))
        + list(range(ord("¡"), ord("¬") + 1))
        + list(range(ord("®"), ord("ÿ") + 1))
    )
    byte_values = list(printable)
    code_points = list(printable)
    shifted = 0
    for byte in range(2**8):
        if byte in printable:
            continue
        # Remaining bytes are remapped above the single-byte range.
        byte_values.append(byte)
        code_points.append(2**8 + shifted)
        shifted += 1
    return {byte: chr(point) for byte, point in zip(byte_values, code_points)}


def get_pairs(word):
    """Return the set of adjacent symbol pairs in a word.

    ``word`` is a sliceable sequence of symbols (e.g. a tuple of strings or a
    plain string). A word with fewer than two symbols has no adjacent pairs,
    so an empty set is returned for it (including for an empty word).
    """
    # Pair each symbol with its successor; zip stops at the shorter sequence.
    return set(zip(word, word[1:]))


class Encoder:
    """BPE Encoder/Decoder for GPT-2.

    Args:
        encoder: mapping from BPE token string to integer id.
        bpe_merges: sequence of merge pairs; a pair's position in the sequence
            is its rank, and the lowest-ranked pair present is merged first.
        errors: error-handling mode passed to ``bytearray.decode`` in
            :meth:`decode`.
    """

    def __init__(self, encoder, bpe_merges, errors="replace"):
        self.encoder = encoder
        self.decoder = {v: k for k, v in self.encoder.items()}
        self.errors = errors
        self.byte_encoder = bytes_to_unicode()
        self.byte_decoder = {v: k for k, v in self.byte_encoder.items()}
        self.bpe_ranks = dict(zip(bpe_merges, range(len(bpe_merges))))
        # Memo of already-merged tokens: token string -> space-joined BPE symbols.
        self.cache = {}
        self.pat = regex.compile(r"""'s|'t|'re|'ve|'m|'ll|'d| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+""")

    def bpe(self, token):
        """Apply the ranked merges to ``token`` and return the symbols joined by spaces.

        Tokens with fewer than two symbols are returned unchanged (and are not
        cached). All other results are stored in ``self.cache``.
        """
        if token in self.cache:
            return self.cache[token]
        word = tuple(token)
        pairs = get_pairs(word)

        if not pairs:
            return token

        while True:
            # Pick the pair with the lowest merge rank; unknown pairs rank as infinity.
            bigram = min(pairs, key=lambda pair: self.bpe_ranks.get(pair, float("inf")))
            if bigram not in self.bpe_ranks:
                break
            first, second = bigram
            new_word = []
            i = 0
            while i < len(word):
                try:
                    j = word.index(first, i)
                except ValueError:
                    # `first` does not occur again: copy the tail unchanged.
                    new_word.extend(word[i:])
                    break
                new_word.extend(word[i:j])
                i = j

                if word[i] == first and i < len(word) - 1 and word[i + 1] == second:
                    new_word.append(first + second)
                    i += 2
                else:
                    new_word.append(word[i])
                    i += 1
            word = tuple(new_word)
            if len(word) == 1:
                break
            pairs = get_pairs(word)
        word = " ".join(word)
        self.cache[token] = word
        return word

    def encode(self, text):
        """Encode text to token IDs."""
        bpe_tokens = []
        for token in self.pat.findall(text):
            # Re-express the token's UTF-8 bytes as the unicode stand-ins from byte_encoder.
            token = "".join(self.byte_encoder[b] for b in token.encode("utf-8"))
            bpe_tokens.extend(self.encoder[bpe_token] for bpe_token in self.bpe(token).split(" "))
        return bpe_tokens

    def decode(self, tokens):
        """Decode token IDs back to text."""
        text = "".join([self.decoder[token] for token in tokens])
        # Map each stand-in character back to its byte, then decode the bytes as UTF-8.
        text = bytearray([self.byte_decoder[c] for c in text]).decode("utf-8", errors=self.errors)
        return text


def get_encoder(model_name, models_dir):
    """Load the BPE encoder from files.

    Reads ``encoder.json`` (token -> id mapping) and ``vocab.bpe`` (merge
    list) from ``models_dir/model_name`` and returns an ``Encoder`` built
    from them. Both files are read as UTF-8 so the result does not depend on
    the platform's default encoding.
    """
    model_path = os.path.join(models_dir, model_name)
    with open(os.path.join(model_path, "encoder.json"), "r", encoding="utf-8") as f:
        encoder = json.load(f)
    with open(os.path.join(model_path, "vocab.bpe"), "r", encoding="utf-8") as f:
        bpe_data = f.read()
    # Skip the first line and the final segment after the last newline.
    bpe_merges = [tuple(merge_str.split()) for merge_str in bpe_data.split("\n")[1:-1]]
    return Encoder(encoder=encoder, bpe_merges=bpe_merges)

## 3. Model Loading Utilities

Functions to download GPT-2 weights from OpenAI and load them from TensorFlow checkpoints.

In [None]:
def download_gpt2_files(model_size, model_dir):
    """Download GPT-2 model files from OpenAI.

    Fetches the checkpoint, tokenizer and hyperparameter files for
    ``model_size`` and writes them into ``model_dir`` (which must already
    exist), showing one progress bar per file.

    Raises:
        AssertionError: if ``model_size`` is not one of the known sizes.
        requests.HTTPError: if the server answers with an error status.
    """
    assert model_size in ["124M", "355M", "774M", "1558M"]
    url = "https://openaipublic.blob.core.windows.net/gpt-2/models"
    chunk_size = 1000
    for filename in [
        "checkpoint",
        "encoder.json",
        "hparams.json",
        "model.ckpt.data-00000-of-00001",
        "model.ckpt.index",
        "model.ckpt.meta",
        "vocab.bpe",
    ]:
        # The context manager releases the streamed connection even on error.
        with requests.get(f"{url}/{model_size}/{filename}", stream=True) as r:
            r.raise_for_status()

            # The header may be absent; tqdm accepts total=None (unknown length).
            content_length = r.headers.get("content-length")
            file_size = int(content_length) if content_length is not None else None

            with open(os.path.join(model_dir, filename), "wb") as f:
                with tqdm(
                    ncols=100,
                    desc="Fetching " + filename,
                    total=file_size,
                    unit_scale=True,
                    unit="b",
                ) as pbar:
                    for chunk in r.iter_content(chunk_size=chunk_size):
                        f.write(chunk)
                        # Advance by the bytes actually received; the last chunk is usually short.
                        pbar.update(len(chunk))


def load_gpt2_params_from_tf_ckpt(tf_ckpt_path, hparams):
    """Read every variable of a TensorFlow checkpoint into a nested dict.

    Each variable is loaded, squeezed with ``np.squeeze``, and stored under
    the path given by its name with the first ``len("model/")`` characters
    dropped and the rest split on "/". Names of the form ``h<N>/...`` go into
    ``params["blocks"][N]``; all other names go into ``params`` directly.
    ``params["blocks"]`` is pre-sized from ``hparams["n_layer"]``.
    """
    def set_in_nested_dict(d, keys, val):
        # Recursively descend one key at a time, creating dicts as needed.
        if not keys:
            return val
        head, rest = keys[0], keys[1:]
        d[head] = set_in_nested_dict(d.setdefault(head, {}), rest, val)
        return d

    prefix_len = len("model/")
    params = {"blocks": [{} for _ in range(hparams["n_layer"])]}
    for full_name, _ in tf.train.list_variables(tf_ckpt_path):
        array = np.squeeze(tf.train.load_variable(tf_ckpt_path, full_name))
        name = full_name[prefix_len:]
        if not name.startswith("h"):
            set_in_nested_dict(params, name.split("/"), array)
            continue
        # Per-block variable: pull out the block index and the remaining path.
        m = re.match(r"h([0-9]+)/(.*)", name)
        layer = int(m[1])
        set_in_nested_dict(params["blocks"][layer], m[2].split("/"), array)

    return params


def load_encoder_hparams_and_params(model_size, models_dir):
    """Load encoder, hyperparameters, and model parameters.

    Looks for a checkpoint in ``models_dir/model_size``; when none is found
    the directory is created and the model files are downloaded first.

    Returns:
        A tuple ``(encoder, hparams, params)``: the BPE ``Encoder``, the dict
        parsed from ``hparams.json``, and the nested parameter dict read from
        the checkpoint.
    """
    assert model_size in ["124M", "355M", "774M", "1558M"]

    model_dir = os.path.join(models_dir, model_size)
    tf_ckpt_path = tf.train.latest_checkpoint(model_dir)
    if not tf_ckpt_path:  # download files if necessary
        os.makedirs(model_dir, exist_ok=True)
        download_gpt2_files(model_size, model_dir)
        tf_ckpt_path = tf.train.latest_checkpoint(model_dir)

    encoder = get_encoder(model_size, models_dir)
    # Use a context manager so the file handle is closed deterministically.
    with open(os.path.join(model_dir, "hparams.json"), encoding="utf-8") as f:
        hparams = json.load(f)
    params = load_gpt2_params_from_tf_ckpt(tf_ckpt_path, hparams)

    return encoder, hparams, params

## 4. GPT-2 Model Components

These are the core building blocks of the GPT-2 architecture, implemented in pure NumPy.

In [None]:
def gelu(x):
    """GELU activation, computed with the tanh-based formula below."""
    inner = np.sqrt(2 / np.pi) * (x + 0.044715 * x**3)
    return 0.5 * x * (1 + np.tanh(inner))


def softmax(x):
    """Softmax over the last axis, shifted by the row maximum for numerical stability."""
    shifted = x - np.max(x, axis=-1, keepdims=True)
    weights = np.exp(shifted)
    normaliser = np.sum(weights, axis=-1, keepdims=True)
    return weights / normaliser


def layer_norm(x, g, b, eps: float = 1e-5):
    """Normalise ``x`` over its last axis, then scale by ``g`` and shift by ``b``.

    ``eps`` is added to the variance before the square root to avoid division by zero.
    """
    mu = np.mean(x, axis=-1, keepdims=True)
    var = np.var(x, axis=-1, keepdims=True)
    normalised = (x - mu) / np.sqrt(var + eps)  # mean 0, variance ~1 over the last axis
    return g * normalised + b  # gamma/beta scale and offset


def linear(x, w, b):  # [m, in], [in, out], [out] -> [m, out]
    """Affine map: matrix-multiply ``x`` by ``w`` and add the bias ``b``."""
    projected = np.matmul(x, w)
    return projected + b

In [None]:
def ffn(x, c_fc, c_proj):  # [n_seq, n_embd] -> [n_seq, n_embd]
    """Position-wise MLP: linear up-projection, GELU, linear down-projection.

    ``c_fc`` and ``c_proj`` are keyword dicts forwarded to ``linear``.
    """
    hidden = linear(x, **c_fc)  # [n_seq, n_embd] -> [n_seq, 4*n_embd]
    activated = gelu(hidden)
    return linear(activated, **c_proj)  # [n_seq, 4*n_embd] -> [n_seq, n_embd]


def attention(q, k, v, mask):  # [n_q, d_k], [n_k, d_k], [n_k, d_v], [n_q, n_k] -> [n_q, d_v]
    """Scaled dot-product attention with an additive mask applied before the softmax."""
    scale = np.sqrt(q.shape[-1])
    scores = q @ k.T / scale + mask
    return softmax(scores) @ v


def mha(x, c_attn, c_proj, n_head):  # [n_seq, n_embd] -> [n_seq, n_embd]
    """Causal multi-head self-attention.

    ``c_attn`` and ``c_proj`` are keyword dicts forwarded to ``linear`` for
    the fused q/k/v projection and the output projection respectively.
    """
    # fused q/k/v projection
    fused = linear(x, **c_attn)  # [n_seq, n_embd] -> [n_seq, 3*n_embd]

    # separate q, k and v, then cut each into n_head equal slices along the last axis
    q_all, k_all, v_all = np.split(fused, 3, axis=-1)
    q_heads = np.split(q_all, n_head, axis=-1)
    k_heads = np.split(k_all, n_head, axis=-1)
    v_heads = np.split(v_all, n_head, axis=-1)

    # large negative values above the diagonal stop a position attending to later ones
    causal_mask = (1 - np.tri(fused.shape[0], dtype=fused.dtype)) * -1e10  # [n_seq, n_seq]

    # attention per head
    out_heads = [
        attention(q, k, v, causal_mask)
        for q, k, v in zip(q_heads, k_heads, v_heads)
    ]

    # stitch the heads back together side by side
    merged = np.hstack(out_heads)  # -> [n_seq, n_embd]

    # output projection
    return linear(merged, **c_proj)  # [n_seq, n_embd] -> [n_seq, n_embd]

In [None]:
def transformer_block(x, mlp, attn, ln_1, ln_2, n_head):  # [n_seq, n_embd] -> [n_seq, n_embd]
    """One transformer block: two residual sub-layers, each fed a layer-normed input."""
    # attention sub-layer, added back onto the input
    attn_out = mha(layer_norm(x, **ln_1), **attn, n_head=n_head)
    x = x + attn_out

    # feed-forward sub-layer, added back onto the result
    mlp_out = ffn(layer_norm(x, **ln_2), **mlp)
    return x + mlp_out


def gpt2(inputs, wte, wpe, blocks, ln_f, n_head):  # [n_seq] -> [n_seq, n_vocab]
    """Run the GPT-2 forward pass and return one row of vocabulary logits per input position."""
    # embed: token embedding plus the embedding of each position 0..n_seq-1
    positions = range(len(inputs))
    hidden = wte[inputs] + wpe[positions]  # [n_seq] -> [n_seq, n_embd]

    # run the stack of transformer blocks in order
    for block in blocks:
        hidden = transformer_block(hidden, **block, n_head=n_head)

    # final layer norm, then project onto the vocabulary by reusing wte
    hidden = layer_norm(hidden, **ln_f)
    return hidden @ wte.T  # [n_seq, n_embd] -> [n_seq, n_vocab]

## 5. Text Generation Function

In [None]:
def generate(inputs, params, n_head, n_tokens_to_generate):
    """Generate tokens autoregressively using greedy decoding.

    Args:
        inputs: sequence of prompt token ids. It is copied, so the caller's
            sequence is left untouched.
        params: keyword dict forwarded to ``gpt2``.
        n_head: number of attention heads, forwarded to ``gpt2``.
        n_tokens_to_generate: how many tokens to produce.

    Returns:
        A list holding only the newly generated token ids.
    """
    # Work on a copy so the caller's prompt ids are not silently extended.
    tokens = list(inputs)
    for _ in tqdm(range(n_tokens_to_generate), desc="Generating"):
        logits = gpt2(tokens, **params, n_head=n_head)  # model forward pass
        next_id = np.argmax(logits[-1])  # greedy: highest-scoring token at the last position
        tokens.append(int(next_id))  # feed the prediction back in on the next step

    return tokens[len(inputs):]  # only return generated ids

## 6. Load Model Weights and Tokenizer

We'll load the pre-trained GPT-2 model. The model will be downloaded automatically if not present.

Available model sizes:
- `124M` - Small (default)
- `355M` - Medium
- `774M` - Large
- `1558M` - XL

In [None]:
# Configuration
MODEL_SIZE = "124M"  # Choose from: "124M", "355M", "774M", "1558M"
MODELS_DIR = "models"  # Directory to store downloaded models

# Load encoder (tokenizer), hyperparameters, and model parameters.
# When no checkpoint is found under MODELS_DIR/MODEL_SIZE, the loader
# creates that directory and downloads the model files first.
print(f"Loading GPT-2 {MODEL_SIZE} model...")
encoder, hparams, params = load_encoder_hparams_and_params(MODEL_SIZE, MODELS_DIR)
print("Model loaded successfully!")

In [None]:
# Display model hyperparameters (hparams is the dict parsed from hparams.json)
print("Model Hyperparameters:")
print(f"  - Number of layers (n_layer): {hparams['n_layer']}")
print(f"  - Number of attention heads (n_head): {hparams['n_head']}")
print(f"  - Embedding dimension (n_embd): {hparams['n_embd']}")
print(f"  - Vocabulary size (n_vocab): {hparams['n_vocab']}")
print(f"  - Context length (n_ctx): {hparams['n_ctx']}")

## 7. Run Inference

Now let's generate some text! You can modify the prompt and the number of tokens to generate.

In [None]:
# Input prompt (read by the encoding and generation cells that follow)
prompt = "Alan Turing theorized that computers would one day become"

# Number of tokens to generate
n_tokens_to_generate = 40

print(f"Prompt: {prompt}")
print(f"Generating {n_tokens_to_generate} tokens...")
print()

In [None]:
# Encode the input prompt into a list of token ids
input_ids = encoder.encode(prompt)
print(f"Input token IDs: {input_ids}")
print(f"Number of input tokens: {len(input_ids)}")

# Make sure we don't exceed the context length.
# The comparison is strict, so a total equal to n_ctx is rejected as well.
assert len(input_ids) + n_tokens_to_generate < hparams["n_ctx"], \
    f"Total tokens ({len(input_ids) + n_tokens_to_generate}) exceeds context length ({hparams['n_ctx']})"

In [None]:
# Generate output tokens.
# Pass a copy so input_ids always remains exactly the encoded prompt,
# regardless of how generate treats its argument; this keeps the cell
# safe to re-run without re-encoding.
output_ids = generate(list(input_ids), params, hparams["n_head"], n_tokens_to_generate)

# Decode the generated tokens back to text
output_text = encoder.decode(output_ids)

print("\n" + "="*50)
print("Generated Text:")
print("="*50)
print(f"{prompt}{output_text}")

## 8. Interactive Generation

Try different prompts below!

In [None]:
def generate_text(prompt, n_tokens=40):
    """Helper function to generate text from a prompt.

    Encodes ``prompt`` with the notebook-level ``encoder``, generates up to
    ``n_tokens`` tokens with the loaded model, and returns the prompt with the
    decoded continuation appended. If the prompt plus ``n_tokens`` would reach
    the context length ``hparams["n_ctx"]``, a warning is printed and the
    token count is reduced (never below zero).
    """
    input_ids = encoder.encode(prompt)

    if len(input_ids) + n_tokens >= hparams["n_ctx"]:
        print("Warning: Reducing tokens to fit context length")
        # Clamp at zero: a prompt that already fills the context leaves no room.
        n_tokens = max(0, hparams["n_ctx"] - len(input_ids) - 1)

    output_ids = generate(input_ids, params, hparams["n_head"], n_tokens)
    output_text = encoder.decode(output_ids)

    return prompt + output_text

In [None]:
# Try your own prompts!
# generate_text returns the prompt followed by the generated continuation.
my_prompt = "The future of artificial intelligence is"
result = generate_text(my_prompt, n_tokens=50)
print(result)

In [None]:
# Another example (rebinds my_prompt and result)
my_prompt = "In a world where robots"
result = generate_text(my_prompt, n_tokens=50)
print(result)

## 9. Understanding the Model Architecture

Let's explore the model's parameters to better understand its structure.

In [None]:
# Explore the parameter structure: list every top-level entry except the
# per-layer 'blocks' list, which is only summarised by its length below.
print("Top-level parameters:")
for key, value in params.items():
    if key == 'blocks':
        continue
    if isinstance(value, dict):
        # nested group: show the names it contains
        print(f"  {key}: {list(value)}")
    else:
        print(f"  {key}: shape = {value.shape}")

print(f"\nNumber of transformer blocks: {len(params['blocks'])}")

In [None]:
# Explore a single transformer block (the first one) and print the shape
# of every entry, descending one level into nested groups.
block = params['blocks'][0]
print("Structure of a transformer block:")
for key, value in block.items():
    if not isinstance(value, dict):
        print(f"  {key}: shape = {value.shape}")
        continue
    print(f"  {key}:")
    for k, v in value.items():
        print(f"    {k}: shape = {v.shape}")

In [None]:
# Calculate total number of parameters
def count_params(d):
    """Return the total number of array elements stored anywhere inside ``d``.

    ``d`` is a (possibly nested) container of dicts, lists/tuples and NumPy
    arrays. Every ``np.ndarray`` contributes its ``size``, including arrays
    placed directly inside a list or inside nested lists. Values of any other
    type contribute nothing.
    """
    def count(node):
        if isinstance(node, np.ndarray):
            return node.size
        if isinstance(node, dict):
            return sum(count(child) for child in node.values())
        if isinstance(node, (list, tuple)):
            return sum(count(child) for child in node)
        return 0  # scalars, strings, None, ... are not parameters

    return count(d)

# Sum the sizes of all arrays held in params and report the total,
# both exactly and rounded to millions.
total_params = count_params(params)
print(f"Total number of parameters: {total_params:,}")
print(f"Approximately: {total_params / 1e6:.1f}M parameters")

## 10. Tokenizer Exploration

Let's see how the BPE tokenizer works.

In [None]:
# Encode and decode examples
test_texts = [
    "Hello, world!",
    "GPT-2 is a large language model.",
    "The quick brown fox jumps over the lazy dog.",
]

# Round-trip each sample: text -> token ids -> text, printing every stage
# so the original and decoded strings can be compared by eye.
for text in test_texts:
    tokens = encoder.encode(text)
    decoded = encoder.decode(tokens)
    print(f"Original: '{text}'")
    print(f"Tokens: {tokens}")
    print(f"Decoded: '{decoded}'")
    print(f"Number of tokens: {len(tokens)}")
    print()