Setting up the environment

In [None]:
import math
import random
import requests
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt


1. Dataset

In [None]:
# Location of the plain-text corpus (an absolute path; presumably a Colab
# upload location -- adjust when running elsewhere).
file_path = "/content/Roman_Empire.txt"

# Read the whole corpus into memory as one UTF-8 decoded string.
with open(file_path, mode="r", encoding="utf-8") as corpus_file:
    text = corpus_file.read()

print("text length:", len(text))

text length: 99423


2. Tokenization (character-level), batching, model, training, and generation

In [None]:
# The vocabulary is every distinct character of the corpus. Sorting makes the
# character -> id assignment deterministic for a given text.
chars = sorted(set(text))
vocab_size = len(chars)

# Lookup tables in both directions: character -> id and id -> character.
str_to_int = {ch: idx for idx, ch in enumerate(chars)}
int_to_str = dict(enumerate(chars))


def encode(string):
    """Convert a string into a list of integer token ids, one per character.

    Every character is looked up in the module-level ``str_to_int`` table, so
    a character that is missing from that table raises ``KeyError``.
    """
    return [str_to_int[c] for c in string]


def decode(ids):
    """Convert an iterable of integer token ids back into a string.

    Every id is looked up in the module-level ``int_to_str`` table and the
    resulting characters are concatenated in order.
    """
    return "".join(int_to_str[i] for i in ids)


# Encode the entire corpus once into a 1-D tensor of token ids.
data = torch.tensor(encode(text), dtype=torch.long)
print("vocab_size:", vocab_size, "data_len:", len(data))


# The first `train_ratio` share of the tokens is used for training and the
# remaining tail is held out for validation.
train_ratio = 0.9
split_index = int(train_ratio * len(data))
train_data, val_data = data[:split_index], data[split_index:]


# Batching hyperparameters: tokens per sequence and sequences per batch.
block_size = 64
batch_size = 16


def get_batch(split):
    """Sample a random batch of (input, target) windows for next-token prediction.

    Args:
        split: ``"train"`` selects ``train_data``; any other value selects
            ``val_data``.

    Returns:
        A tuple ``(x, y)`` of tensors shaped ``(batch_size, block_size)``.
        ``y`` holds the same windows as ``x`` shifted one position to the
        right in the source data, i.e. the next token for every input token.

    Raises:
        ValueError: if the selected split holds fewer than ``block_size + 1``
            tokens, so that not even a single window fits.
    """
    d = train_data if split == "train" else val_data

    # A window starting at s reads d[s : s + block_size + 1], so the valid
    # starts are 0 .. len(d) - block_size - 1 inclusive. torch.randint treats
    # its upper bound as exclusive, hence the count of valid starts is passed
    # as `high` (the previous bound was one too small and never sampled the
    # last window).
    num_starts = len(d) - block_size
    if num_starts <= 0:
        raise ValueError(
            f"split {split!r} has {len(d)} tokens; need at least "
            f"{block_size + 1} for block_size={block_size}"
        )
    start_positions = torch.randint(0, num_starts, (batch_size,))

    # Gather all windows with one indexing operation: row r of `positions`
    # holds start_r, start_r + 1, ..., start_r + block_size - 1.
    positions = start_positions.unsqueeze(1) + torch.arange(block_size)
    x = d[positions]
    y = d[positions + 1]
    return x, y


def causal_mask(T):
    """Return a ``(T, T)`` boolean lower-triangular mask.

    Entry ``[i, j]`` is ``True`` exactly when ``j <= i``: position ``i`` may
    look at itself and at earlier positions, never at later ones.
    """
    positions = torch.arange(T)
    # Compare a column of row indices against a row of column indices.
    return positions.unsqueeze(1) >= positions.unsqueeze(0)


class SelfAttention(nn.Module):
    """Single-head causal self-attention.

    Queries, keys and values are bias-free linear projections of the input,
    each of width ``d_model``. Every position attends only to itself and to
    earlier positions.
    """

    def __init__(self, d_model):
        super().__init__()
        self.q_proj = nn.Linear(d_model, d_model, bias=False)
        self.k_proj = nn.Linear(d_model, d_model, bias=False)
        self.v_proj = nn.Linear(d_model, d_model, bias=False)

    def forward(self, x):
        """Apply causal attention to ``x`` of shape ``(B, T, C)``.

        Returns a tensor of the same shape in which position ``t`` is a
        weighted average of the value vectors at positions ``0 .. t``.
        """
        _, T, C = x.shape

        q = self.q_proj(x)
        k = self.k_proj(x)
        v = self.v_proj(x)

        # Scaled dot-product scores, shape (B, T, T).
        scores = (q @ k.transpose(-2, -1)) / math.sqrt(C)

        # causal_mask allocates on the default device; move the mask next to
        # the scores so masked_fill also works when the module lives on
        # another device.
        mask = causal_mask(T).to(scores.device)
        # -inf scores become exactly zero weight after the softmax.
        scores = scores.masked_fill(~mask, float("-inf"))

        weights = F.softmax(scores, dim=-1)
        return weights @ v


class FeedForward(nn.Module):
    """Position-wise two-layer MLP with a GELU non-linearity.

    The hidden layer is four times as wide as the input; the output has the
    same width as the input (``d_model``).
    """

    def __init__(self, d_model):
        super().__init__()
        hidden_dim = 4 * d_model
        self.fc1 = nn.Linear(d_model, hidden_dim)
        self.act = nn.GELU()
        self.fc2 = nn.Linear(hidden_dim, d_model)

    def forward(self, x):
        """Expand to the hidden width, apply GELU, project back to ``d_model``."""
        return self.fc2(self.act(self.fc1(x)))


class DecoderBlock(nn.Module):
    """Pre-norm transformer block: self-attention, then a feed-forward MLP.

    Each sub-layer receives a layer-normalised copy of its input, and its
    output is added back onto the un-normalised input (residual connection).
    """

    def __init__(self, d_model):
        super().__init__()
        self.ln1 = nn.LayerNorm(d_model)
        self.attn = SelfAttention(d_model)
        self.ln2 = nn.LayerNorm(d_model)
        self.ffn = FeedForward(d_model)

    def forward(self, x):
        """Run both residual sub-layers on ``x`` and return the result."""
        attn_out = self.attn(self.ln1(x))
        after_attn = x + attn_out
        ffn_out = self.ffn(self.ln2(after_attn))
        return after_attn + ffn_out


# Model hyperparameters: number of stacked decoder blocks and embedding width.
n_layers = 4
d_model = 128


class MiniGPT(nn.Module):
    """Small decoder-only language model.

    Token and learned position embeddings are summed, passed through
    ``n_layers`` decoder blocks and a final LayerNorm, and projected to one
    logit per vocabulary entry.

    Args:
        vocab_size: number of distinct token ids.
        block_size: maximum sequence length (size of the position table).
        d_model: embedding / hidden width.
        n_layers: number of stacked ``DecoderBlock`` modules.
    """

    def __init__(self, vocab_size, block_size, d_model, n_layers):
        super().__init__()
        self.block_size = block_size

        self.token_emb = nn.Embedding(vocab_size, d_model)
        self.pos_emb = nn.Embedding(block_size, d_model)

        self.blocks = nn.Sequential(
            *(DecoderBlock(d_model) for _ in range(n_layers))
        )

        self.ln_f = nn.LayerNorm(d_model)
        self.lm_head = nn.Linear(d_model, vocab_size)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, optionally, the cross-entropy loss.

        Args:
            idx: integer tensor of token ids, shape ``(B, T)`` with
                ``T <= block_size``.
            targets: optional integer tensor with ``B * T`` elements holding
                the expected next token for every position of ``idx``.

        Returns:
            ``(logits, loss)`` where ``logits`` has shape ``(B, T, vocab)``
            and ``loss`` is a scalar tensor, or ``None`` when ``targets`` is
            not given.

        Raises:
            ValueError: if ``T`` exceeds the size of the position table.
        """
        B, T = idx.shape
        if T > self.block_size:
            raise ValueError(
                f"sequence length {T} exceeds block_size {self.block_size}"
            )

        # Create the position indices on the same device as the token ids so
        # the embedding lookup works wherever the model has been moved.
        positions = torch.arange(T, device=idx.device).unsqueeze(0)

        x = self.token_emb(idx) + self.pos_emb(positions)
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)

        loss = None
        if targets is not None:
            # Take the vocabulary width from the logits themselves rather than
            # from a module-level variable, so the model does not depend on a
            # global that merely happens to match its constructor argument.
            logits_2d = logits.reshape(B * T, logits.size(-1))
            # reshape (unlike view) also accepts non-contiguous targets.
            targets_1d = targets.reshape(B * T)
            loss = F.cross_entropy(logits_2d, targets_1d)

        return logits, loss


# Seed PyTorch's global RNG before anything random happens, so that weight
# initialisation, batch sampling (torch.randint) and text sampling
# (torch.multinomial) repeat on a fresh "Restart & Run All" with the same
# machine and PyTorch version.
seed = 1337
torch.manual_seed(seed)

model = MiniGPT(vocab_size, block_size, d_model, n_layers)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)

# Training schedule: total optimisation steps, and how often (in steps) the
# validation loss is measured and logged.
max_steps = 2000
eval_every = 100


@torch.no_grad()
def evaluate_on_val(num_batches=50):
    """Estimate the validation loss as the mean over random validation batches.

    Args:
        num_batches: how many batches from ``get_batch("val")`` to average
            over. Defaults to 50.

    Returns:
        The mean loss over the sampled batches as a Python float.

    Raises:
        ValueError: if ``num_batches`` is not positive.
    """
    if num_batches <= 0:
        raise ValueError(f"num_batches must be positive, got {num_batches}")

    # Remember the current mode and restore it afterwards, instead of forcing
    # train mode on a model that the caller had put into eval mode.
    was_training = model.training
    model.eval()
    try:
        total = 0.0
        for _ in range(num_batches):
            xb, yb = get_batch("val")
            _, loss = model(xb, yb)
            total += loss.item()
    finally:
        model.train(was_training)

    return total / num_batches


# History of the periodic evaluations, consumed by the plotting code below:
# the step number, the loss of that step's training batch, and the averaged
# validation loss.
steps_log, train_losses_log, val_losses_log = [], [], []


for step in range(1, max_steps + 1):
    # One optimisation step on a freshly sampled training batch.
    xb, yb = get_batch("train")
    optimizer.zero_grad()
    _, loss = model(xb, yb)
    loss.backward()
    optimizer.step()

    # Only every `eval_every`-th step is evaluated and logged.
    if step % eval_every != 0:
        continue

    # The training figure is the loss of this single batch (not an average);
    # the validation figure is averaged inside evaluate_on_val.
    val_loss = evaluate_on_val()
    train_loss = loss.item()

    steps_log.append(step)
    train_losses_log.append(train_loss)
    val_losses_log.append(val_loss)

    print(f"step {step} | train loss {train_loss:.3f} | val loss {val_loss:.3f}")


# Plot the logged training and validation losses against the step count,
# save the figure as a PNG named after the hyperparameters, then display it.
fig, ax = plt.subplots()
ax.plot(steps_log, train_losses_log, label="train loss")
ax.plot(steps_log, val_losses_log, label="val loss")
ax.set_xlabel("step")
ax.set_ylabel("loss")
ax.set_title(f"Loss curves | n_layers={n_layers}, d_model={d_model}, block_size={block_size}")
ax.legend()
ax.grid(True)

plot_name = f"loss_curve_layers{n_layers}_dmodel{d_model}_block{block_size}.png"
fig.savefig(plot_name, dpi=200, bbox_inches="tight")
plt.show()


@torch.no_grad()
def generate(prompt, max_new=200, temperature=1.0):
    """Extend ``prompt`` by ``max_new`` characters sampled from the model.

    Args:
        prompt: non-empty seed text. Every character must be known to
            ``encode``.
        max_new: number of characters to append.
        temperature: the logits are divided by this value before the softmax.
            ``0`` selects greedy decoding (always the highest-scoring
            character) instead of dividing by zero.

    Returns:
        The prompt followed by the generated characters.

    Raises:
        ValueError: if ``prompt`` is empty or ``temperature`` is negative.

    Note:
        The model is switched to eval mode and left in that mode.
    """
    if not prompt:
        raise ValueError("prompt must contain at least one character")
    if temperature < 0:
        raise ValueError(f"temperature must be non-negative, got {temperature}")

    model.eval()

    # Build the running sequence on the device that holds the model weights.
    device = next(model.parameters()).device
    idx = torch.tensor([encode(prompt)], dtype=torch.long, device=device)

    for _step in range(max_new):
        # The model's position table covers only model.block_size positions,
        # so condition on at most that many of the most recent tokens.
        idx_cond = idx[:, -model.block_size:]

        logits, _ = model(idx_cond)
        next_logits = logits[:, -1, :]

        if temperature == 0:
            next_id = next_logits.argmax(dim=-1, keepdim=True)
        else:
            probs = F.softmax(next_logits / temperature, dim=-1)
            next_id = torch.multinomial(probs, num_samples=1)

        idx = torch.cat([idx, next_id], dim=1)

    return decode(idx[0].tolist())


# Generate 300 new characters after the prompt at temperature 0.9 and show them.
sample = generate("Rome was built", max_new=300, temperature=0.9)
print(sample)


vocab_size: 81 data_len: 99423
step 100 | train loss 2.494 | val loss 2.505
step 200 | train loss 2.378 | val loss 2.364
step 300 | train loss 2.150 | val loss 2.235
step 400 | train loss 2.156 | val loss 2.123
step 500 | train loss 2.034 | val loss 2.074
step 600 | train loss 2.049 | val loss 2.006
step 700 | train loss 1.897 | val loss 1.965
step 800 | train loss 1.777 | val loss 1.896
step 900 | train loss 1.944 | val loss 1.882
step 1000 | train loss 1.759 | val loss 1.820
step 1100 | train loss 1.714 | val loss 1.815
step 1200 | train loss 1.580 | val loss 1.826
step 1300 | train loss 1.822 | val loss 1.799
step 1400 | train loss 1.701 | val loss 1.759
step 1500 | train loss 1.554 | val loss 1.759
step 1600 | train loss 1.578 | val loss 1.735
step 1700 | train loss 1.646 | val loss 1.715
step 1800 | train loss 1.506 | val loss 1.706
step 1900 | train loss 1.404 | val loss 1.717
step 2000 | train loss 1.464 | val loss 1.697
Rome was built becaulthood, and milition tencondenced Chri