In [61]:
import sentencepiece as spm

# Load the trained SentencePiece model from "sp.model" (path is relative to the
# working directory). `sp` is used later to encode the corpus and to
# encode/decode text during generation.
sp = spm.SentencePieceProcessor()
sp.load("sp.model")

# Number of pieces in the tokenizer's vocabulary; passed to the model as the
# size of its embedding table and output layer.
vocab_size=sp.get_piece_size()
print(vocab_size)

8000


In [62]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F

# Load the training corpus: any plain-text file (e.g. a book) saved as
# "input.txt" relative to the working directory.
with open("input.txt", "r", encoding="utf-8") as f:
    text = f.read()

# Encode the whole corpus to integer piece ids with the SentencePiece model and
# keep the ids as a single int64 tensor that batches are later sliced from.
tokens = sp.encode(text, out_type=int)
print(len(tokens))  # corpus length in tokens, not characters
data = torch.tensor(tokens, dtype=torch.long)

279427


In [63]:
# Flip cpu_only to True to force CPU execution even when a GPU is present.
cpu_only = False

# CUDA is chosen only when it is both allowed and actually available.
device = "cuda" if (not cpu_only and torch.cuda.is_available()) else "cpu"
print("device:", device)

device: cpu


In [64]:
# Hold out the final 10% of the token stream for validation; the split is
# positional (no shuffling), so validation text comes from the end of the corpus.
n = int(len(data) * 0.9)
train_data, val_data = data[:n], data[n:]

In [65]:
# Context length in tokens: the window size sampled by get_batch, and the value
# passed to the model as its block_size (positional table / causal mask size).
block_size = 128
# Number of windows sampled per call to get_batch.
batch_size = 32

def get_batch(split):
    """Sample a random batch of (input, target) token windows.

    Args:
        split: "train" selects ``train_data``; any other value selects
            ``val_data``.

    Returns:
        A tuple ``(x, y)`` of long tensors, each of shape
        ``(batch_size, block_size)`` and moved to ``device``. ``y`` is ``x``
        shifted one position to the right, i.e. the next-token targets.

    Raises:
        ValueError: if the selected split holds fewer than ``block_size + 1``
            tokens, so that not even one full window can be drawn.
    """
    data_src = train_data if split == "train" else val_data

    # Each window needs block_size input tokens plus one extra token for the
    # final target, so valid start offsets are 0 .. len - block_size - 1.
    # torch.randint's upper bound is exclusive, hence len - block_size.
    n_starts = len(data_src) - block_size
    if n_starts <= 0:
        raise ValueError(
            f"split {split!r} has {len(data_src)} tokens; "
            f"need at least {block_size + 1} for block_size={block_size}"
        )
    starts = torch.randint(n_starts, (batch_size,))

    # (batch_size, block_size) grid of absolute positions; one gather per tensor
    # replaces the per-row Python slicing.
    positions = starts.unsqueeze(1) + torch.arange(block_size)

    # Input tokens
    x = data_src[positions]
    # Targets: the token that follows each input position
    y = data_src[positions + 1]

    return x.to(device), y.to(device)

In [66]:
# Smoke test: draw one training batch and display the (inputs, targets) pair;
# each target row is the matching input row shifted left by one token.
get_batch("train")

(tensor([[7979,  269,  518,  ...,   45,  429,  141],
         [  35, 3894,   91,  ..., 7951,   13, 2553],
         [ 389,   19,  224,  ..., 7959,  387,  277],
         ...,
         [ 276, 7965, 7942,  ...,  762, 1505,  281],
         [ 493, 7959,  111,  ..., 6974,   10,  354],
         [  79,   41,  143,  ...,  255, 7979,   13]]),
 tensor([[ 269,  518,  765,  ...,  429,  141, 2337],
         [3894,   91, 7951,  ...,   13, 2553, 4979],
         [  19,  224, 7959,  ...,  387,  277, 7965],
         ...,
         [7965, 7942,    5,  ..., 1505,  281,    5],
         [7959,  111,  790,  ...,   10,  354, 5227],
         [  41,  143,  318,  ..., 7979,   13, 6341]]))

In [67]:
class CharEmbedding(nn.Module):
    """Token embedding plus learned absolute position embedding.

    Despite the name, the ids are whatever vocabulary the caller uses (this
    notebook feeds SentencePiece subword ids); the class name is kept so
    existing references keep working.

    Args:
        vocab_size: number of rows in the token embedding table.
        d_model: embedding width.
        block_size: number of positions in the positional table, i.e. the
            longest sequence ``forward`` can embed.
    """

    def __init__(self, vocab_size, d_model, block_size):
        super().__init__()
        self.token_emb = nn.Embedding(vocab_size, d_model)
        self.pos_emb = nn.Embedding(block_size, d_model)

    def forward(self, x):
        """Embed a batch of token ids.

        Args:
            x: integer tensor of shape (B, T).

        Returns:
            Tensor of shape (B, T, d_model): token embedding plus the embedding
            of each position 0..T-1, broadcast over the batch.
        """
        B, T = x.shape
        tok = self.token_emb(x)              # (B, T, d_model)
        # Build the position indices on the input's own device instead of a
        # notebook-level global, so the module works wherever it is moved.
        positions = torch.arange(T, device=x.device)
        pos = self.pos_emb(positions)        # (T, d_model)
        return tok + pos


In [68]:
class MultiHeadSelfAttention(nn.Module):
    """Causal multi-head self-attention.

    Args:
        d_model: width of the input and output features; must be divisible by
            ``n_heads``.
        n_heads: number of attention heads.
        block_size: side length of the stored lower-triangular causal mask,
            i.e. the longest sequence ``forward`` can handle.
    """

    def __init__(self, d_model, n_heads, block_size):
        super().__init__()
        assert d_model % n_heads == 0

        self.n_heads = n_heads
        self.d_head = d_model // n_heads

        # Bias-free input projections followed by a biased output projection.
        self.key = nn.Linear(d_model, d_model, bias=False)
        self.query = nn.Linear(d_model, d_model, bias=False)
        self.value = nn.Linear(d_model, d_model, bias=False)
        self.proj = nn.Linear(d_model, d_model)

        # True where attention is allowed (position j <= position i).
        allowed = torch.ones(block_size, block_size).tril().bool()
        self.register_buffer("mask", allowed)

    def _split_heads(self, t):
        """Reshape (B, T, C) into (B, n_heads, T, d_head)."""
        B, T, _ = t.shape
        return t.view(B, T, self.n_heads, self.d_head).permute(0, 2, 1, 3)

    def forward(self, x):
        """Apply masked self-attention to ``x`` of shape (B, T, C); returns (B, T, C)."""
        B, T, C = x.shape

        # One projection per role, then fan out into heads.
        k = self._split_heads(self.key(x))
        q = self._split_heads(self.query(x))
        v = self._split_heads(self.value(x))

        # Scaled dot-product scores: (B, n_heads, T, T)
        scores = torch.matmul(q, k.transpose(-2, -1)) / (self.d_head ** 0.5)

        # Hide future positions before normalising over the key axis.
        hidden = self.mask[:T, :T].logical_not()
        weights = torch.softmax(scores.masked_fill(hidden, float("-inf")), dim=-1)

        # Mix values per head, then stitch the heads back together: (B, T, C)
        mixed = torch.matmul(weights, v)
        merged = mixed.permute(0, 2, 1, 3).reshape(B, T, C)

        return self.proj(merged)

In [69]:
class TransformerBlock(nn.Module):
    """Pre-LayerNorm transformer block: self-attention then a 4x-wide ReLU MLP.

    Both sub-layers are wrapped in residual connections, with LayerNorm applied
    to the sub-layer input rather than its output.

    Args:
        d_model: feature width.
        block_size: maximum sequence length, forwarded to the attention layer.
        head_n: number of attention heads.
    """

    def __init__(self, d_model, block_size, head_n):
        super().__init__()
        hidden = 4 * d_model

        self.ln1 = nn.LayerNorm(d_model)
        self.attn = MultiHeadSelfAttention(
            d_model, n_heads=head_n, block_size=block_size
        )
        self.ln2 = nn.LayerNorm(d_model)
        self.ff = nn.Sequential(
            nn.Linear(d_model, hidden),
            nn.ReLU(),
            nn.Linear(hidden, d_model),
        )

    def forward(self, x):
        """Run one block over ``x``; the output has the same shape as the input."""
        # Residual around attention
        attended = self.attn(self.ln1(x))
        x = x + attended
        # Residual around the feed-forward network
        return x + self.ff(self.ln2(x))

In [70]:
class SubwordLM(nn.Module):
    """Single-block transformer language model over subword token ids.

    Pipeline: embedding -> one TransformerBlock -> LayerNorm -> linear head
    producing one logit per vocabulary entry.

    Args:
        vocab_size: number of token ids (embedding rows and output logits).
        d_model: feature width.
        block_size: maximum sequence length.
        head_n: number of attention heads in the block.
    """

    def __init__(self, vocab_size, d_model, block_size,head_n):
        super().__init__()
        self.embed = CharEmbedding(vocab_size, d_model, block_size)
        self.block = TransformerBlock(d_model, block_size, head_n)
        self.ln = nn.LayerNorm(d_model)
        self.head = nn.Linear(d_model, vocab_size)

    def forward(self, x, targets=None):
        """Compute logits, and the cross-entropy loss when targets are given.

        Args:
            x: integer token ids of shape (B, T).
            targets: optional integer token ids of shape (B, T).

        Returns:
            ``logits`` of shape (B, T, vocab_size) when ``targets`` is None,
            otherwise the tuple ``(logits, loss)`` with a scalar loss.
        """
        features = self.ln(self.block(self.embed(x)))   # (B, T, d_model)
        logits = self.head(features)                    # (B, T, vocab_size)

        if targets is not None:
            B, T, V = logits.shape
            # cross_entropy expects (N, classes) raw logits and (N,) class ids,
            # so the batch and time axes are flattened together.
            flat_logits = logits.view(B * T, V)
            flat_targets = targets.view(B * T)
            loss = F.cross_entropy(flat_logits, flat_targets)
            return logits, loss

        return logits

In [71]:
# Build the language model (width 128, 4 attention heads, context of
# block_size tokens) and place it on the selected device.
model = SubwordLM(
    vocab_size,
    d_model=128,
    block_size=block_size,
    head_n=4,
)
model = model.to(device)

def train(steps=5000, lr=3e-4, log_every=500, save_path='subword.pth'):
    """Train the module-level ``model`` with AdamW and save its weights.

    Args:
        steps: number of optimisation steps (one random batch each).
        lr: AdamW learning rate.
        log_every: print the current batch loss every this many steps.
        save_path: file the final ``state_dict`` is written to.

    The printed loss is the loss of the single training batch used at that
    step, not an average and not a validation loss.
    """
    # Make sure the model is in training mode; generate() switches it to eval,
    # and that would otherwise carry over into a later training run.
    model.train()

    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
    for step in range(steps):
        xb, yb = get_batch("train")

        _, loss = model(xb, yb)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if step % log_every == 0:
            print(f"step {step}, loss {loss.item():.4f}")

    torch.save(model.state_dict(), save_path)

# Reuse cached weights when a checkpoint exists; otherwise train from scratch
# (train() writes the checkpoint to this same file name).
# NOTE: a checkpoint produced with different hyperparameters or a different
# tokenizer will make load_state_dict raise; delete the file to retrain.
model_path="subword.pth"
if os.path.exists(model_path):
    # map_location remaps tensors saved on another device (e.g. a CUDA
    # checkpoint opened on a CPU-only machine) onto the device in use now.
    state_dict = torch.load(model_path, map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
else:
    train()

In [72]:
@torch.no_grad()
def generate(model, start, max_new_tokens=200):
    """Sample a continuation of ``start`` from ``model``.

    Args:
        model: language model that maps (B, T) token ids to (B, T, vocab)
            logits when called without targets.
        start: prompt text; it must encode to at least one token.
        max_new_tokens: number of tokens to sample and append.

    Returns:
        The decoded string for the prompt followed by the sampled tokens.

    Raises:
        ValueError: if ``start`` encodes to no tokens (there would be no last
            position to predict from).

    The model is put in eval mode while sampling and returned to whatever mode
    it was in before the call.
    """
    prompt_ids = sp.encode(start)
    if not prompt_ids:
        raise ValueError("start must encode to at least one token")

    # Put the prompt on the model's own device; fall back to the notebook-level
    # device only for a model that has no parameters.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else device

    was_training = model.training
    model.eval()
    try:
        idx = torch.tensor([prompt_ids], device=target_device, dtype=torch.long)

        for _ in range(max_new_tokens):
            # The model only supports block_size positions: keep the latest ones.
            idx_cond = idx[:, -block_size:]
            # Only the final position's logits predict the next token.
            logits = model(idx_cond)[:, -1, :]
            probs = F.softmax(logits, dim=-1)
            next_id = torch.multinomial(probs, num_samples=1)
            idx = torch.cat([idx, next_id], dim=1)
    finally:
        # Do not leave a model that was being trained stuck in eval mode.
        model.train(was_training)

    return sp.decode(idx[0].tolist())


In [74]:
# Sample 200 new tokens (the default) after the prompt. Tokens are drawn with
# torch.multinomial and no seed is set in the visible cells, so the text
# differs from run to run.
print(generate(model, "CORIOLANUS:"))

CORIOLANUS: hands blush daughter Montague betteriod remembimble coll friar good proper extremity hold years France furnishims zeal M Nor by durst battle wrong Edward seated Calais gladlyubs dugIV forbid una gro jotgs Ha true company saw hideBRO Thank Bound wrong AEd Whatia Ban western gone disin glass crown Had ag die chee Being Pardon extrem doubt lendsct bodes tilllong whistle Wiltshire dischargedud chap greaterGE sirs noseTwas brainsumbling Min gent lod shinesout poison rose counteruselve Mont as nail religellow woes Pit timesGHBYhich Whereof object beha quoth toooon chee woningers drag BIONDELLOocate him trothrs way Right ves sle achieve lives stumble sleicer Many conditions drunkard sway Vienna ca gatecience pleasuretheonst many device song NORTHUMBERLAND landia amen gown waken doesotted dearer helps wakenelve Un yieldratchimo Lartius roundly CyER Welcome exped Hie Unb,anuth MONTAGUE admiriny fig PARIS chaste selfsamehear tree dwell under lower Jupnithen Cob pri served MercyENES M