<a href="https://colab.research.google.com/github/prathmeshtiwari22/Pract/blob/main/gpt.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# ===== Tiny Wizard-of-Oz GPT Q&A (fast) =====
# Requirements: Python + PyTorch
# File needed: wizard_of_oz.txt (UTF-8 text of the book)

import math, random, re, os
from collections import Counter, defaultdict
import torch
import torch.nn as nn
import torch.nn.functional as F

# ----------------------------
# Config (tweak for speed/quality)
# ----------------------------
device = 'cuda' if torch.cuda.is_available() else 'cpu'
block_size   = 256     # context window (keep small for speed)
batch_size   = 16      # smaller = faster
n_embd       = 128     # model width (small)
n_head       = 4       # attention heads
n_layer      = 2       # transformer blocks
dropout      = 0.1
max_iters    = 300     # training steps (keep low for speed)
eval_interval= 100
eval_iters   = 20
learning_rate= 3e-3
gen_tokens   = 200     # tokens to generate for answer

# ----------------------------
# Load book
# ----------------------------
with open('wizard_of_oz.txt', 'r', encoding='utf-8') as f:
    text = f.read().replace('\r','')

# Build char-level vocab (works with any text/characters)
chars = sorted(list(set(text)))
vocab_size = len(chars)
stoi = {ch:i for i,ch in enumerate(chars)}
itos = {i:ch for i,ch in enumerate(chars)}
def encode(s): return [stoi[c] for c in s if c in stoi]
def decode(ids): return ''.join(itos[i] for i in ids)

# Train/val split
n = int(0.95 * len(text))
train_ids = torch.tensor(encode(text[:n]), dtype=torch.long)
val_ids   = torch.tensor(encode(text[n:]), dtype=torch.long)

def get_batch(split):
    data = train_ids if split=='train' else val_ids
    ix = torch.randint(len(data) - block_size - 1, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    return x.to(device), y.to(device)

@torch.no_grad()
def estimate_loss(model):
    out = {}
    model.eval()
    for split in ['train','val']:
        losses = []
        for _ in range(eval_iters):
            xb,yb = get_batch(split)
            _, loss = model(xb, yb)
            losses.append(loss.item())
        out[split] = sum(losses)/len(losses)
    model.train()
    return out

# ----------------------------
# Tiny GPT (Transformer)
# ----------------------------
class Head(nn.Module):
    def __init__(self, head_size):
        super().__init__()
        self.key   = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        B,T,C = x.shape
        k = self.key(x)   # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        wei = q @ k.transpose(-2,-1) * (k.shape[-1] ** -0.5)  # (B,T,T)
        wei = wei.masked_fill(self.tril[:T,:T]==0, float('-inf'))
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        v = self.value(x)                                   # (B,T,hs)
        out = wei @ v                                       # (B,T,hs)
        return out

class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj  = nn.Linear(num_heads*head_size, n_embd)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

class FeedForward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4*n_embd),
            nn.ReLU(),
            nn.Linear(4*n_embd, n_embd),
            nn.Dropout(dropout),
        )
    def forward(self, x): return self.net(x)

class Block(nn.Module):
    def __init__(self, n_embd, n_head):
        super().__init__()
        head_size = n_embd // n_head
        self.sa   = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedForward(n_embd)
        self.ln1  = nn.LayerNorm(n_embd)
        self.ln2  = nn.LayerNorm(n_embd)
    def forward(self, x):
        x = self.ln1(x + self.sa(x))
        x = self.ln2(x + self.ffwd(x))
        return x

class GPTMini(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        self.token_emb = nn.Embedding(vocab_size, n_embd)
        self.pos_emb   = nn.Embedding(block_size, n_embd)
        self.blocks    = nn.Sequential(*[Block(n_embd, n_head) for _ in range(n_layer)])
        self.ln_f      = nn.LayerNorm(n_embd)
        self.lm_head   = nn.Linear(n_embd, vocab_size)
        self.apply(self._init_weights)
    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            nn.init.normal_(m.weight, mean=0.0, std=0.02)
            if m.bias is not None: nn.init.zeros_(m.bias)
        elif isinstance(m, nn.Embedding):
            nn.init.normal_(m.weight, mean=0.0, std=0.02)
    def forward(self, idx, targets=None):
        B,T = idx.shape
        tok = self.token_emb(idx)                           # (B,T,C)
        pos = self.pos_emb(torch.arange(T, device=idx.device))  # (T,C)
        x = tok + pos
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)                           # (B,T,vocab)
        loss = None
        if targets is not None:
            B,T,C = logits.shape
            loss = F.cross_entropy(logits.view(B*T, C), targets.view(B*T))
        return logits, loss
    @torch.no_grad()
    def generate(self, idx, max_new_tokens=gen_tokens):
        self.eval()
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -block_size:]
            logits,_ = self(idx_cond)
            logits = logits[:, -1, :]
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx

# ----------------------------
# Train tiny GPT (quick)
# ----------------------------
model = GPTMini(vocab_size).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for it in range(max_iters):
    if it % eval_interval == 0:
        losses = estimate_loss(model)
        print(f"step {it:4d} | train {losses['train']:.3f} | val {losses['val']:.3f}")
    xb,yb = get_batch('train')
    _, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

print("Final loss:", loss.item())

# ----------------------------
# Ultra-simple Retriever (fast)
# ----------------------------
# Split book into overlapping chunks (by characters). Score by word overlap.
CHUNK_CHARS = 800
STRIDE      = 600

def simple_tokenize_words(s):
    # Lowercase, keep letters/numbers, split on non-alphanum
    return re.findall(r"[a-zA-Z0-9']+", s.lower())

# make chunks
chunks = []
for start in range(0, len(text), STRIDE):
    chunk = text[start:start+CHUNK_CHARS]
    if len(chunk) < 100: break
    chunks.append(chunk)

# precompute word counts
chunk_word_counts = [Counter(simple_tokenize_words(c)) for c in chunks]

def retrieve_context(question, top_k=3):
    q_words = simple_tokenize_words(question)
    if not q_words:
        return ""
    scores = []
    q_set = set(q_words)
    for i, wc in enumerate(chunk_word_counts):
        # score: sum of frequencies for query words (very fast)
        score = sum(wc.get(w, 0) for w in q_set)
        scores.append((score, i))
    scores.sort(reverse=True)
    best = [chunks[i] for (score,i) in scores[:top_k] if score > 0]
    return "\n---\n".join(best) if best else chunks[0]

# ----------------------------
# Answer questions using:  Context + Question → tiny GPT
# ----------------------------
INSTRUCTION = (
    "You are answering questions only from the provided context. "
    "If unsure, say you don't know.\n\n"
)

def answer(question, max_new_tokens=gen_tokens):
    context = retrieve_context(question, top_k=3)
    prompt = (
        INSTRUCTION +
        "Context:\n" + context + "\n\n" +
        "Question: " + question.strip() + "\nAnswer: "
    )
    # Encode prompt; drop chars not in vocab
    idx = torch.tensor([encode(prompt)], dtype=torch.long, device=device)
    with torch.no_grad():
        out = model.generate(idx, max_new_tokens=max_new_tokens)[0].tolist()
    generated = decode(out)
    # Only return what's after "Answer: "
    ans = generated.split("Answer:", 1)[-1]
    return ans.strip()

# ----------------------------
# Demo
# ----------------------------
examples = [
    "Who is Dorothy?",
    "Where does the Yellow Brick Road lead?",
    "Who helps Dorothy along the way?",
    "How does Dorothy return home?",
]

for q in examples:
    print("\nQ:", q)
    print("A:", answer(q, max_new_tokens=200)[:600])


step    0 | train 4.304 | val 4.305
step  100 | train 2.423 | val 2.373
step  200 | train 2.350 | val 2.302
Final loss: 2.3036084175109863

Q: Who is Dorothy?
A: s Bet acede swayomarered eate y to gred wl Do
of aupashucrmpist wasid ty ithe  toronher l by oveler. C ghed Ozz, ned whey tund hed sishe
le avededom pin an hoy beag eackerun aricanqurswitcanhingh wsth

Q: Where does the Yellow Brick Road lead?
A: gryomo lordee fanomedirnur ither.
"
[dwhon hanourndnd ave,"
t the be s s. s "I t whire. rabearanord hum Tivor ps te jaghin
" whobuct as uthal t'd anf goris ar."

"

"An onco a "D ast, the wirthereal r

Q: Who helps Dorothy along the way?
A: Tinddng stheele gow."I'mas tarisa afort hewnd whersty kser
Oz n nsas, crorcode hare sard thaping towood tht wheyeve;
s, Skinca wexcre arinyoundon nd andit tse w as roue'shashat the clond
heme whe Nof

Q: How does Dorothy return home?
A: akok brouthyoaru teshefing t and," wabee meld r wof f the p.
e!"Thure otheder yold of her ad to aseand tlato mand

In [3]:
# ===== Word-level Tiny GPT for Wizard of Oz Q&A (FAST) =====
# Files: wizard_of_oz.txt (UTF-8)
# pip install torch  (if needed)

import re, math, random
from collections import Counter
import torch
import torch.nn as nn
import torch.nn.functional as F

# ----------------------------
# Speed / size knobs (tweak here)
# ----------------------------
device        = 'cuda' if torch.cuda.is_available() else 'cpu'
block_size    = 64     # context window in tokens (keep small for speed)
batch_size    = 32
n_embd        = 96
n_head        = 4
n_layer       = 2
dropout       = 0.1
learning_rate = 3e-3
max_iters     = 1200   # ~ quick; try 2000-4000 for better quality
eval_interval = 200
eval_iters    = 20
gen_tokens    = 120    # tokens to generate for each answer

random.seed(1337)
torch.manual_seed(1337)
if device == 'cuda':
    torch.cuda.manual_seed_all(1337)

# ----------------------------
# Load & tokenize (WORD-level)
# ----------------------------
with open('wizard_of_oz.txt', 'r', encoding='utf-8') as f:
    raw_text = f.read().replace('\r','')

# Simple word tokenizer: words + numbers + apostrophes; keep punctuation separate
def word_tokenize(s: str):
    # words/numbers/apostrophes or single non-space, non-word punct
    return re.findall(r"[A-Za-z0-9']+|[^\w\s]", s)

tokens = word_tokenize(raw_text)
# Build vocab
word_freq = Counter(tokens)
vocab = sorted(word_freq.keys())
stoi = {w:i for i,w in enumerate(vocab)}
itos = {i:w for w,i in stoi.items()}
vocab_size = len(vocab)

def encode(words):
    return [stoi[w] for w in words if w in stoi]

def decode(ids):
    # join words w/ spaces but avoid spaces before punctuation
    ws = [itos[i] for i in ids]
    out = []
    for i,w in enumerate(ws):
        if i>0 and re.match(r"[^\w\s]", w):  # punctuation
            out[-1] = out[-1] + w
        else:
            out.append(w)
    return " ".join(out)

# Train/Val split on token ids
all_ids = torch.tensor(encode(tokens), dtype=torch.long)
n = int(0.95*len(all_ids))
train_ids = all_ids[:n]
val_ids   = all_ids[n:]

def get_batch(split):
    data = train_ids if split=='train' else val_ids
    ix = torch.randint(0, len(data) - block_size - 1, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    return x.to(device), y.to(device)

@torch.no_grad()
def estimate_loss(model):
    out = {}
    model.eval()
    for split in ['train','val']:
        losses = []
        for _ in range(eval_iters):
            xb,yb = get_batch(split)
            _, loss = model(xb, yb)
            losses.append(loss.item())
        out[split] = sum(losses)/len(losses)
    model.train()
    return out

# ----------------------------
# Tiny GPT (Transformer, word-level)
# ----------------------------
class Head(nn.Module):
    def __init__(self, head_size):
        super().__init__()
        self.key   = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        B,T,C = x.shape
        k = self.key(x)
        q = self.query(x)
        wei = q @ k.transpose(-2,-1) * (k.shape[-1] ** -0.5) # (B,T,T)
        wei = wei.masked_fill(self.tril[:T,:T]==0, float('-inf'))
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        v = self.value(x)
        out = wei @ v
        return out

class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj  = nn.Linear(num_heads*head_size, n_embd)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

class FeedForward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4*n_embd),
            nn.ReLU(),
            nn.Linear(4*n_embd, n_embd),
            nn.Dropout(dropout),
        )
    def forward(self, x): return self.net(x)

class Block(nn.Module):
    def __init__(self, n_embd, n_head):
        super().__init__()
        head_size = n_embd // n_head
        self.sa   = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedForward(n_embd)
        self.ln1  = nn.LayerNorm(n_embd)
        self.ln2  = nn.LayerNorm(n_embd)
    def forward(self, x):
        x = self.ln1(x + self.sa(x))
        x = self.ln2(x + self.ffwd(x))
        return x

class GPTWordMini(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        self.token_emb = nn.Embedding(vocab_size, n_embd)
        self.pos_emb   = nn.Embedding(block_size, n_embd)
        self.blocks    = nn.Sequential(*[Block(n_embd, n_head) for _ in range(n_layer)])
        self.ln_f      = nn.LayerNorm(n_embd)
        self.lm_head   = nn.Linear(n_embd, vocab_size)
        self.apply(self._init_weights)
    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            nn.init.normal_(m.weight, mean=0.0, std=0.02)
            if m.bias is not None: nn.init.zeros_(m.bias)
        elif isinstance(m, nn.Embedding):
            nn.init.normal_(m.weight, mean=0.0, std=0.02)
    def forward(self, idx, targets=None):
        B,T = idx.shape
        tok = self.token_emb(idx)                            # (B,T,C)
        pos = self.pos_emb(torch.arange(T, device=idx.device)) # (T,C)
        x = tok + pos
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)                            # (B,T,vocab)
        loss = None
        if targets is not None:
            B,T,C = logits.shape
            loss = F.cross_entropy(logits.view(B*T, C), targets.view(B*T))
        return logits, loss
    @torch.no_grad()
    def generate(self, idx, max_new_tokens=gen_tokens):
        self.eval()
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -block_size:]
            logits,_ = self(idx_cond)
            logits = logits[:, -1, :]
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx

# ----------------------------
# Train quickly
# ----------------------------
model = GPTWordMini(vocab_size).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for it in range(max_iters):
    if it % eval_interval == 0:
        losses = estimate_loss(model)
        print(f"step {it:4d} | train {losses['train']:.3f} | val {losses['val']:.3f}")
    xb,yb = get_batch('train')
    _, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

print("Final loss:", loss.item())

# ----------------------------
# Ultra-fast retriever (word overlap)
# ----------------------------
# Make overlapping chunks of the book (by word tokens)
CHUNK_TOKENS = 220
STRIDE       = 180

# Convert back to words for chunks
words = [itos[i] for i in all_ids.tolist()]
chunks = []
for start in range(0, len(words), STRIDE):
    chunk_words = words[start:start+CHUNK_TOKENS]
    if len(chunk_words) < 40: break
    chunks.append(" ".join(chunk_words))

chunk_counts = [Counter(w.lower() for w in word_tokenize(c)) for c in chunks]

def retrieve_context(question, top_k=3):
    q_words = [w.lower() for w in word_tokenize(question)]
    q_set = set(q_words)
    scores = []
    for i, wc in enumerate(chunk_counts):
        score = sum(wc.get(w, 0) for w in q_set)
        scores.append((score, i))
    scores.sort(reverse=True)
    best = [chunks[i] for (score,i) in scores[:top_k] if score>0]
    return "\n---\n".join(best) if best else chunks[0]

# ----------------------------
# Q&A: Context + Question → prompt → generate answer
# ----------------------------
INSTRUCTION = (
    "Answer only using the context. If unsure, say you don't know.\n\n"
)

def answer(question, max_new_tokens=gen_tokens):
    context = retrieve_context(question, top_k=3)
    prompt = (
        INSTRUCTION +
        "Context:\n" + context + "\n\n" +
        "Question: " + question.strip() + "\nAnswer:"
    )
    # Encode prompt at word level
    prompt_tokens = word_tokenize(prompt)
    prompt_ids = torch.tensor([encode(prompt_tokens)], dtype=torch.long, device=device)
    out_ids = model.generate(prompt_ids, max_new_tokens=max_new_tokens)[0].tolist()
    generated = decode(out_ids)
    # Return only after 'Answer:'
    return generated.split("Answer:",1)[-1].strip()

# ----------------------------
# Demo
# ----------------------------
queries = [
    "Who is Dorothy?",
    "Where does the Yellow Brick Road lead?",
    "Who helps Dorothy along the way?",
    "How does Dorothy return home?",
]

for q in queries:
    print("\nQ:", q)
    print("A:", answer(q)[:600])


step    0 | train 7.728 | val 7.719
step  200 | train 1.710 | val 6.419
step  400 | train 0.287 | val 8.539
step  600 | train 0.181 | val 9.229
step  800 | train 0.154 | val 9.766
step 1000 | train 0.138 | val 10.063
Final loss: 0.1706017702817917

Q: Who is Dorothy?
A: only the. If, say you don't know.: blue and his clothes scarlet, and Dorothy noticed that every button on his jacket was the head of some animal. The top button was a bear's head and the next button a wolf's head; the next was a cat's head and the next a weasel's head, while the last button of all was the head of a field- mouse. When Dorothy looked into the eyes of these animals' heads, they all nodded and said in a chorus:" Don't believe all you hear, little girl!"[ Illustration]" Silence!" said the small ferryman, slapping each button head in turn, but not hard enough to hurt them. Then he t

Q: Where does the Yellow Brick Road lead?
A: only the. If, say you don't know.: it while fishing for his friend." All together 

🚀 GPT Workflow (Step by Step)

Input → Numbers (Tokenization)

Text is first broken into tokens (words, subwords, or characters).

Each token is converted into a number (an integer ID).
Example: "Dorothy is in Oz" → [101, 56, 77, 888].

Embedding Layer → Vectors

Each number (token ID) is mapped to a vector (dense representation).

These vectors capture meaning.
Example:
101 → [0.2, -0.4, 0.7, ...].

Positional Embedding

Since GPT reads in order, we add positional info so the model knows word order.

Example: "Dorothy" at position 1 ≠ "Dorothy" at position 10`.

Transformer Blocks (Stacked many times)
Each block has:

Self-Attention: Looks at all tokens and learns which words relate.
(e.g., "Dorothy" relates to "Oz").

FeedForward Network: Extra processing for richer meaning.

Residual + LayerNorm: Keeps training stable.

Multi-Head Attention

Instead of one "attention view", GPT uses multiple "heads".

Each head looks at relationships differently (syntax, meaning, long-range).

All heads are combined.

Output Layer (Prediction of Next Token)

After transformer blocks, the model predicts the probability of the next token.

Example: Input = "Dorothy is" → Model predicts "in" with highest probability.

Generation (Autoregressive Loop)

The predicted token is added back to input.

Repeat until max length is reached.

Example: "Dorothy is" → "in" → "Oz" → ".".

✅ In short:
Text → Numbers → Embeddings → Positional Info → Transformer Blocks (Attention + FeedForward) → MultiHead → Output Prediction → Generation **Loop**