<a href="https://colab.research.google.com/github/sabarishraja/Transformer-pirate-style-generator/blob/main/Transformer_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip -q install -U spacy==3.7.4 textacy==0.13.0 spacy-lookups-data
!python -m spacy download en_core_web_sm

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.5/6.5 MB[0m [31m88.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m210.7/210.7 kB[0m [31m17.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m98.5/98.5 MB[0m [31m9.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m85.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m321.6/321.6 kB[0m [31m30.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m355.9/355.9 kB[0m [31m30.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m90.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [1]:
import os, re, json, csv, textwrap, requests
import spacy
from spacy.tokenizer import Tokenizer
from spacy.util import compile_infix_regex
from spacy.symbols import ORTH
import textacy
import textacy.preprocessing as tprep
from textacy.extract import ngrams
from collections import Counter
import numpy as np
import pandas as pd
from pathlib import Path

In [2]:
BOOKS = [
    ("Treasure Island (Stevenson)", "https://www.gutenberg.org/cache/epub/120/pg120.txt"),
    ("Captain Blood (Sabatini)", "https://www.gutenberg.org/ebooks/1965.txt.utf-8"),
    ("Captain Singleton (Defoe)", "https://www.gutenberg.org/ebooks/6422.txt.utf-8")
]
print("Books loaded:", len(BOOKS))


Books loaded: 3


# Strip to the start content of book

In [3]:
def fetch_text(url):
    r = requests.get(url, timeout=60); r.raise_for_status(); return r.text

def strip_gutenberg(txt):
    s = re.search(r"\*\*\* START OF(.*)\*\*\*", txt)
    e = re.search(r"\*\*\* END OF(.*)\*\*\*", txt)
    if s and e and s.end() < e.start():
        txt = txt[s.end():e.start()]
    return txt.replace("\r\n","\n").strip()

def light_preprocess(s):
    s = tprep.normalize.unicode(s, form="NFKC")
    s = tprep.normalize.quotation_marks(s)
    s = tprep.normalize.hyphenated_words(s)
    s = tprep.normalize.whitespace(s)
    return s.strip()

# TEST
raw = fetch_text(BOOKS[2][1])
core = strip_gutenberg(raw)
clean = light_preprocess(core)
print("Sample clean excerpt:\n", clean[:300])
print("Chars:", len(clean))


Sample clean excerpt:
 Produced by Tom Allen, Charles Franks and the Online
Distributed Proofreading Team
CAPTAIN SINGLETON
By Daniel Defoe
With An Introduction By Edward Garnett
[Transcriber's Note: In the print copy, the following words and those of
the title page are written in intricate, illuminated calligraphy.]
A TA
Chars: 592374


In [4]:
# Build combined corpus (mentor expects a single input.txt)
clean_texts = []
for title, url in BOOKS:
    raw   = fetch_text(url)
    core  = strip_gutenberg(raw)
    clean = light_preprocess(core)
    clean_texts.append(clean)

combined = "\n\n".join(clean_texts)

Path("input.txt").write_text(combined, encoding="utf-8")
print("Wrote input.txt with", len(combined), "characters and", len(BOOKS), "books.")

Wrote input.txt with 1593384 characters and 3 books.


# Building the Transformer model

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [6]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Using device:", device)

Using device: cuda


In [7]:
with open("input.txt", "r", encoding="utf-8") as f:
    text = f.read()

In [8]:
characters = sorted(list(set(text)))
vocab_size = len(characters)
print("Vocab size:", vocab_size)

char_to_idx = { ch:i for i,ch in enumerate(characters) }
idx_to_char = { i:ch for i,ch in enumerate(characters) }
encode = lambda xs: [char_to_idx[x] for x in xs]
decode = lambda xs: ''.join([idx_to_char[x] for x in xs])


Vocab size: 82


In [9]:
data = torch.tensor(encode(text), dtype=torch.long)
n = int(len(text) * 0.9)
train_data = data[:n]
val_data   = data[n:]

In [10]:
def get_batch(split, batch_size, context_size):
    data_src = train_data if split == 'train' else val_data
    ix = torch.randint(len(data_src) - context_size, (batch_size,))
    x = torch.stack([data_src[i:i+context_size] for i in ix])
    y = torch.stack([data_src[i+1:i+context_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

In [11]:
@torch.no_grad()
def generate(model, context_size, start_idx, number_of_tokens, temperature=0.9, top_p=0.9):
    import torch
    import torch.nn.functional as F
    idx = start_idx
    model.eval()
    for _ in range(number_of_tokens):
        idx_cond = idx[:, -context_size:]
        logits, _ = model(idx_cond)
        logits = logits[:, -1, :] / max(1e-6, temperature)
        probs = F.softmax(logits, dim=-1)

        # nucleus sampling
        sorted_probs, sorted_idx = torch.sort(probs, descending=True)
        cum = torch.cumsum(sorted_probs, dim=-1)
        mask = cum > top_p
        mask[..., 0] = False
        sorted_probs[mask] = 0
        sorted_probs = sorted_probs / sorted_probs.sum(dim=-1, keepdim=True)
        next_local = torch.multinomial(sorted_probs, 1)
        next_token = sorted_idx.gather(-1, next_local)

        idx = torch.cat((idx, next_token), dim=1)
    return idx

In [12]:
@torch.no_grad()
def estimate_loss(model, batch_size, context_size, eval_iters=100):
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split, batch_size, context_size)
            _, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

In [13]:
def train(model, steps, batch_size, context_size, report_frequency=1000, lr=1e-3):
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
    for step in range(steps):
        xb, yb = get_batch('train', batch_size, context_size)
        _, loss = model(xb, yb)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()
        if step % report_frequency == 0 or step == steps - 1:
            losses = estimate_loss(model, batch_size, context_size)
            print(f"Step {step}, train loss: {losses['train']:.4f} | val loss: {losses['val']:.4f}")


In [14]:
def train_generate_print(model, steps=5000, batch_size=32, context_size=8, lr=1e-3):
    train(model, steps, batch_size, context_size, lr=lr)
    start_idx = torch.zeros((1, 1), dtype=torch.long, device=device)
    max_tokens = 300
    out = generate(model, context_size, start_idx=start_idx, number_of_tokens=max_tokens)[0].tolist()
    print("\n=== TRANSFORMER SAMPLE OUTPUT ===\n")
    print(decode(out))

In [15]:
class Head(nn.Module):
    def __init__(self, head_size, n_embd, context_size):
        super().__init__()
        self.key   = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.dropout = nn.Dropout(0.2)
        self.register_buffer('tril', torch.tril(torch.ones(context_size, context_size)))

    def forward(self, x):
        B, T, C = x.shape
        k = self.key(x)   # (B,T,hd)
        q = self.query(x) # (B,T,hd)
        wei = q @ k.transpose(-2, -1) * C**-0.5   # (B,T,T)   # mentor-style scaling
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        v = self.value(x)                         # (B,T,hd)
        out = wei @ v                             # (B,T,hd)
        return out

In [16]:
class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size, n_embd, context_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size, n_embd, context_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.proj(out)
        out = self.dropout(out)
        return out

In [17]:
class FeedFoward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, n_embd * 4),
            nn.ReLU(),
            nn.Linear(n_embd * 4, n_embd),
            nn.Dropout(0.2),
        )
    def forward(self, x): return self.net(x)

In [18]:
class Block(nn.Module):
    def __init__(self, n_embd, n_head, context_size):
        super().__init__()
        head_size = n_embd // n_head
        self.sa   = MultiHeadAttention(n_head, head_size, n_embd, context_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1  = nn.LayerNorm(n_embd)
        self.ln2  = nn.LayerNorm(n_embd)
    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

In [19]:
class GPTLanguageModel(nn.Module):
    def __init__(self, vocab_size, n_embd=32, context_size=8, n_head=4, n_layer=4):
        super().__init__()
        self.token_embedding_table    = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(context_size, n_embd)
        self.blocks = nn.Sequential(*[
            Block(n_embd, n_head=n_head, context_size=context_size) for _ in range(n_layer)
        ])
        self.ln_f    = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        B, T = idx.shape
        emb     = self.token_embedding_table(idx)                                   # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=idx.device)) # (T,C)
        x = emb + pos_emb
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)                                                    # (B,T,V)
        loss = None
        if targets is not None:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss

In [20]:
batch_size   = 32
context_size = 256
lr           = 3e-4   # note: train_generate_print default is 1e-3 unless passed explicitly
n_embd       = 384
n_heads      = 6
n_layer      = 6

# 5) Init, train, and sample
m = GPTLanguageModel(
    vocab_size,
    n_embd=n_embd,
    context_size=context_size,
    n_head=n_heads,
    n_layer=n_layer
).to(device)

In [21]:
train_generate_print(m, steps=5000, batch_size=batch_size, context_size=context_size)

Step 0, train loss: 3.8937 | val loss: 3.8212
Step 1000, train loss: 1.4751 | val loss: 1.4510
Step 2000, train loss: 1.2539 | val loss: 1.2798
Step 3000, train loss: 1.1471 | val loss: 1.2152
Step 4000, train loss: 1.0873 | val loss: 1.1898
Step 4999, train loss: 1.0397 | val loss: 1.1859

=== TRANSFORMER SAMPLE OUTPUT ===


the great number of a boat flew and the ship in the morning of the side
that the next morning spread and his torch, which was out of the great
company we found the ship and the work of the night we found him where he
had all lost this fellow shot, he might be found ourselves, and the lions
of the de


In [22]:
total_params = sum(p.numel() for p in m.parameters())
print(f"Total parameters: {total_params/1e6:.2f}M ({total_params:,})")

Total parameters: 10.80M (10,802,002)


In [31]:
m.eval()
with torch.no_grad():
    start_idx = torch.zeros((1, 1), dtype=torch.long, device=device)
    long_ids = generate(
        m, context_size, start_idx=start_idx, number_of_tokens=2000,
        temperature=0.95, top_p=0.95
    )[0].tolist()

In [32]:
prompt = (
    "PIRATE DIALOGUE (use slang: avast, aye, yarrr, matey, scallywag, grog, booty, keelhaul, cutlass, brig, plunder, buccaneer)\n"
    "CAPTAIN: Avast ye, lads—pass the grog and mind yer cutlasses!\n"
    "CREW: Aye, cap'n—booty in sight off th' port bow! Yarrr!\n"
    "CAPTAIN: "
)

In [33]:
missing = [c for c in prompt if c not in char_to_idx]
if missing:
    print("Warning: characters not in vocab replaced with space:", set(missing))
    prompt = "".join(c if c in char_to_idx else " " for c in prompt)

prompt_tokens = torch.tensor([[char_to_idx[c] for c in prompt]], dtype=torch.long, device=device)

In [34]:
print(decode(out))

PIRATE DIALOGUE (use slang: avast, aye, yarrr, matey, scallywag, grog, booty, keelhaul, cutlass, brig, plunder, buccaneer)
CAPTAIN: Avast ye, lads—pass the grog and mind yer cutlasses!
CREW: Aye, cap'n—booty in sight off th' port bow! Yarrr!
CAPTAIN: what if I do not think I was the powers and risked with me. It
was the prisoners at last ship of the thicket of the most following more
relief. The buccaneers should be so all as if I could not take the best
country of the rest. The rest of the whole place in the boat sand in the
same strange of the woods and the river Niger ships which was no company
of the country and so much as the only sound o
