# Phase 1 Experiment — Dataset 2: AG News

**Task:** Topic Classification  
**Sequence Type:** MEDIUM (~200 tokens)  
**Classes:** 4 (World / Sports / Business / Sci-Tech)  
**Source:** HuggingFace `ag_news`

**Goal:** Compare all five positional-encoding (PE) methods — sinusoidal, binary, RoPE, learned, and DAPE — on medium-length topic classification.

In [None]:
import os, sys, math, time
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, f1_score

# Resolve the directory two levels above the current working directory and
# put it at the front of the import path (presumably where the project-local
# `PE` package lives — confirm against the repository layout).
# `abspath` resolves a relative path against the current working directory.
PROJECT_ROOT = os.path.abspath(os.path.join(os.pardir, os.pardir))
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

from PE.sinusoidal_pe import SinusoidalPositionalEncoding
from PE.binary_pe    import BinaryPositionalEncoding
from PE.rope         import RoPEPositionalEncoding
from PE.learned_pe   import LearnedPositionalEncoding
from PE.dape         import DAPEPositionalEncoding

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Device: {device}')

In [None]:
class Config:
    """Dataset, model and optimisation settings for the AG News experiment."""

    # --- data ---
    dataset_name: str = 'ag_news'
    text_col: str = 'text'
    label_col: str = 'label'
    n_classes: int = 4
    max_seq_len: int = 256     # AG News articles are medium length
    vocab_size: int = 20000
    max_train: int = 30000     # subset for speed
    max_val: int = 5000

    # --- transformer ---
    d_model: int = 128
    n_heads: int = 4
    n_layers: int = 3
    d_ff: int = 256
    dropout: float = 0.1

    # --- optimisation ---
    batch_size: int = 64
    lr: float = 1e-3
    epochs: int = 10
    seed: int = 42


cfg = Config()

# Seed both RNGs used by this notebook from the single configured seed.
np.random.seed(cfg.seed)
torch.manual_seed(cfg.seed)
print('Config loaded. n_classes =', cfg.n_classes)

In [None]:
from datasets import load_dataset

print('Loading AG News...')
# Read the dataset id from the config so it is defined in exactly one place.
raw = load_dataset(cfg.dataset_name)
print(raw)

# Slice the rows first and pick the column second: only the first N rows are
# materialised instead of the entire split.
# NOTE: the 'test' split is used as the validation set in this notebook.
_train_rows = raw['train'][:cfg.max_train]
_val_rows = raw['test'][:cfg.max_val]
train_texts = _train_rows[cfg.text_col]
train_labels = _train_rows[cfg.label_col]
val_texts = _val_rows[cfg.text_col]
val_labels = _val_rows[cfg.label_col]

print(f'Train: {len(train_texts)} | Val: {len(val_texts)}')
print(f'Label names: {raw["train"].features[cfg.label_col].names}')

# Word frequencies over the training subset (lower-cased, whitespace split —
# the same scheme `tokenize` applies later).
counter = Counter()
for text in train_texts:
    counter.update(text.lower().split())

# Ids 0 and 1 are reserved for padding and unknown words; the remaining
# slots go to the most frequent training words, in frequency order.
vocab = {'<PAD>': 0, '<UNK>': 1}
for word, _ in counter.most_common(cfg.vocab_size - 2):
    vocab[word] = len(vocab)

print(f'Vocabulary size: {len(vocab)}')

# Whitespace-token counts per training text, to sanity-check cfg.max_seq_len.
lengths = [len(t.split()) for t in train_texts]
print(f'Seq length — mean: {np.mean(lengths):.1f} | max: {max(lengths)}')

In [None]:
def tokenize(text, vocab, max_len):
    """Encode ``text`` as a list of vocabulary ids.

    The text is lower-cased and split on whitespace. Words beyond ``max_len``
    are dropped, words missing from ``vocab`` map to id 1, and the result is
    right-padded with id 0 up to ``max_len`` entries.

    Args:
        text: Raw input string.
        vocab: Mapping from word to integer id.
        max_len: Target length of the returned list.

    Returns:
        A list of integer ids.
    """
    unk_id, pad_id = 1, 0
    words = text.lower().split()
    del words[max_len:]
    ids = [vocab.get(word, unk_id) for word in words]
    ids.extend([pad_id] * (max_len - len(ids)))
    return ids

class TextDataset(Dataset):
    """Dataset of pre-tokenised texts paired with integer labels.

    Every text is encoded with ``tokenize`` at construction time, so items
    are served straight from two ``torch.long`` tensors.

    Args:
        texts: Iterable of raw strings.
        labels: Integer labels, one per text.
        vocab: Mapping from word to integer id, passed to ``tokenize``.
        max_len: Encoded length passed to ``tokenize``.
    """

    def __init__(self, texts, labels, vocab, max_len):
        encoded = [tokenize(text, vocab, max_len) for text in texts]
        self.X = torch.tensor(encoded, dtype=torch.long)
        self.y = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# Encode both splits once, up front, with the shared vocabulary.
train_ds = TextDataset(texts=train_texts, labels=train_labels, vocab=vocab, max_len=cfg.max_seq_len)
val_ds = TextDataset(texts=val_texts, labels=val_labels, vocab=vocab, max_len=cfg.max_seq_len)

# Only the training batches are shuffled; validation order stays fixed.
train_loader = DataLoader(dataset=train_ds, batch_size=cfg.batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_ds, batch_size=cfg.batch_size, shuffle=False)
print(f'Train batches: {len(train_loader)} | Val batches: {len(val_loader)}')

In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Args:
        d_model: Width of the input and output features.
        n_heads: Number of attention heads; must be positive and divide
            ``d_model`` evenly.
        dropout: Dropout probability applied to the attention weights.

    Raises:
        ValueError: If ``n_heads`` is not positive or does not divide
            ``d_model``.
    """

    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        # Fail at construction time rather than with an opaque `view` shape
        # error on the first forward pass.
        if n_heads <= 0 or d_model % n_heads != 0:
            raise ValueError(
                f'd_model ({d_model}) must be divisible by a positive n_heads ({n_heads})'
            )
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, token_mask=None):
        """Apply self-attention to ``x``.

        Args:
            x: Tensor of shape ``(batch, seq, d_model)``.
            token_mask: Optional ``(batch, seq)`` tensor, truthy at real
                tokens and falsy at padding. Padded positions are excluded
                as attention keys. ``None`` attends to every position.

        Returns:
            Tensor of shape ``(batch, seq, d_model)``.
        """
        B, T, D = x.shape
        Q = self.W_q(x).view(B, T, self.n_heads, self.d_k).transpose(1, 2)
        K = self.W_k(x).view(B, T, self.n_heads, self.d_k).transpose(1, 2)
        V = self.W_v(x).view(B, T, self.n_heads, self.d_k).transpose(1, 2)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if token_mask is not None:
            # Broadcast the key mask over heads and query positions. A large
            # finite fill (instead of -inf) keeps a fully padded row from
            # turning into NaN after the softmax.
            blocked = ~token_mask.bool()[:, None, None, :]
            scores = scores.masked_fill(blocked, torch.finfo(scores.dtype).min)
        weights = self.dropout(F.softmax(scores, dim=-1))
        out = torch.matmul(weights, V)
        return self.W_o(out.transpose(1, 2).contiguous().view(B, T, D))


class FeedForward(nn.Module):
    """Position-wise MLP: Linear -> GELU -> Dropout -> Linear.

    Args:
        d_model: Input and output width.
        d_ff: Hidden width.
        dropout: Dropout probability applied after the activation.
    """

    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model),
        )

    def forward(self, x):
        return self.net(x)


class TransformerBlock(nn.Module):
    """Post-norm encoder block: attention and feed-forward sub-layers, each
    wrapped in dropout, a residual connection and a LayerNorm.

    Args:
        d_model: Feature width.
        n_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward sub-layer.
        dropout: Dropout probability used throughout the block.
    """

    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, n_heads, dropout)
        self.ff = FeedForward(d_model, d_ff, dropout)
        self.ln1 = nn.LayerNorm(d_model)
        self.ln2 = nn.LayerNorm(d_model)
        self.drop = nn.Dropout(dropout)

    def forward(self, x, token_mask=None):
        """Run the block; ``token_mask`` is forwarded to the attention layer."""
        x = self.ln1(x + self.drop(self.attn(x, token_mask)))
        return self.ln2(x + self.drop(self.ff(x)))


class TransformerClassifier(nn.Module):
    """Token embedding + positional encoding + encoder stack + linear head.

    Args:
        vocab_size: Number of rows in the embedding table.
        n_classes: Number of output logits.
        d_model: Feature width.
        n_heads: Attention heads per block.
        n_layers: Number of encoder blocks.
        d_ff: Hidden width of each feed-forward sub-layer.
        max_seq_len: Passed to ``pe_class``.
        pe_class: Positional-encoding module class, constructed as
            ``pe_class(d_model, max_seq_len, dropout)`` and applied to the
            embedded tokens.
        dropout: Dropout probability.
    """

    def __init__(self, vocab_size, n_classes, d_model, n_heads, n_layers, d_ff, max_seq_len, pe_class, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pe = pe_class(d_model, max_seq_len, dropout)
        self.blocks = nn.ModuleList([TransformerBlock(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.head = nn.Linear(d_model, n_classes)

    def forward(self, x):
        """Classify a batch of token-id sequences.

        Args:
            x: Integer tensor of shape ``(batch, seq)``; positions equal to
                the embedding's ``padding_idx`` are treated as padding.

        Returns:
            Logits of shape ``(batch, n_classes)``.
        """
        # Real-token mask: padding must neither be attended to nor averaged
        # into the pooled representation, otherwise the logits depend on how
        # much padding a sequence carries.
        token_mask = x != self.embedding.padding_idx
        # assumes the PE module keeps the (batch, seq, d_model) shape — TODO confirm
        h = self.pe(self.embedding(x))
        for block in self.blocks:
            h = block(h, token_mask)
        keep = token_mask.unsqueeze(-1).to(h.dtype)
        # Mean over real tokens only; the clamp avoids 0/0 for an all-padding row.
        pooled = (h * keep).sum(dim=1) / keep.sum(dim=1).clamp(min=1.0)
        return self.head(self.norm(pooled))


print('Model classes defined.')

In [None]:
def train_epoch(model, loader, criterion, optimizer):
    """Train ``model`` for one pass over ``loader``.

    Each batch is moved to the module-level ``device``, gradients are clipped
    to a total norm of 1.0, and one optimizer step is taken.

    Args:
        model: Module mapping an input batch to class logits.
        loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable taking ``(logits, labels)``.
        optimizer: Optimizer over ``model``'s parameters.

    Returns:
        ``(mean_loss, accuracy)``: the per-sample average loss and the
        fraction of correct predictions over the epoch.

    Raises:
        ZeroDivisionError: If ``loader`` yields no samples.
    """
    model.train()
    total_loss, correct, total = 0.0, 0, 0
    for X, y in loader:
        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()
        logits = model(X)
        loss = criterion(logits, y)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item() * X.size(0)
        # Score accuracy on the logits that produced the loss. Re-running the
        # model here would double the forward cost and measure a different
        # dropout mask and post-step weights than the reported loss.
        correct += (logits.detach().argmax(-1) == y).sum().item()
        total += y.size(0)
    return total_loss / total, correct / total

def evaluate(model, loader):
    """Score ``model`` on every batch of ``loader`` without tracking gradients.

    Args:
        model: Module mapping an input batch to class logits.
        loader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        ``(accuracy, macro_f1)`` computed over all samples in ``loader``.
    """
    model.eval()
    predictions, targets = [], []
    with torch.no_grad():
        for X, y in loader:
            logits = model(X.to(device))
            predictions += logits.argmax(-1).cpu().tolist()
            targets += y.tolist()
    acc = accuracy_score(targets, predictions)
    macro_f1 = f1_score(targets, predictions, average='macro')
    return acc, macro_f1


print('Training/eval functions defined.')

In [None]:
# Display name -> positional-encoding class; runs happen in this order.
PE_METHODS = {
    'sinusoidal': SinusoidalPositionalEncoding,
    'binary':     BinaryPositionalEncoding,
    'rope':       RoPEPositionalEncoding,
    'learned':    LearnedPositionalEncoding,
    'dape':       DAPEPositionalEncoding,
}

# Per-method summary: best validation accuracy, the F1 from that same epoch,
# and wall-clock seconds for the whole epoch loop (training plus evaluation).
results = {}
criterion = nn.CrossEntropyLoss()

for pe_name, pe_class in PE_METHODS.items():
    print(f'\n=== {pe_name.upper()} PE ===')

    # Re-seed before building each model so every method starts from the
    # same random state.
    torch.manual_seed(cfg.seed)
    model = TransformerClassifier(
        vocab_size=len(vocab),
        n_classes=cfg.n_classes,
        d_model=cfg.d_model,
        n_heads=cfg.n_heads,
        n_layers=cfg.n_layers,
        d_ff=cfg.d_ff,
        max_seq_len=cfg.max_seq_len,
        pe_class=pe_class,
        dropout=cfg.dropout,
    ).to(device)
    n_params = sum(p.numel() for p in model.parameters())
    print(f'  Parameters: {n_params:,}')

    optimizer = torch.optim.AdamW(model.parameters(), lr=cfg.lr, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=cfg.epochs)

    best_acc = 0
    best_f1 = 0
    t0 = time.time()
    for epoch in range(cfg.epochs):
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer)
        val_acc, val_f1 = evaluate(model, val_loader)
        scheduler.step()
        # Keep the F1 that belongs to the best-accuracy epoch.
        if val_acc > best_acc:
            best_acc = val_acc
            best_f1 = val_f1
        print(f'  Ep {epoch+1:02d} | loss {train_loss:.4f} | train {train_acc:.4f} | val_acc {val_acc:.4f} | val_f1 {val_f1:.4f}')
    elapsed = time.time() - t0

    results[pe_name] = dict(accuracy=best_acc, f1=best_f1, time_s=elapsed)
    print(f'  Done in {elapsed:.1f}s — best val acc: {best_acc:.4f}')

In [None]:
# Plain-text summary table; every row tied for the top accuracy is flagged.
rule = '=' * 60
print('\n' + rule)
print('PHASE 1 RESULTS — AG News (MEDIUM sequences, 4-class topic)')
print(rule)
print(f'{"PE Method":<15} {"Accuracy":>10} {"F1":>10} {"Time (s)":>10}')
print('-' * 60)
best_acc_val = max(results.values(), key=lambda v: v['accuracy'])['accuracy']
for pe_name, scores in results.items():
    if scores['accuracy'] == best_acc_val:
        marker = ' <-- BEST'
    else:
        marker = ''
    print(f'{pe_name:<15} {scores["accuracy"]:>10.4f} {scores["f1"]:>10.4f} {scores["time_s"]:>10.1f}{marker}')
print(rule)

# One bar chart per metric, sharing method order and colours across panels.
names = list(results)
accs, f1s, times = (
    [results[n][key] for n in names] for key in ('accuracy', 'f1', 'time_s')
)

fig, axes = plt.subplots(1, 3, figsize=(15, 4))
colors = ['#4C72B0', '#DD8452', '#55A868', '#C44E52', '#8172B2']
panels = (
    (accs,  'Accuracy — AG News'),
    (f1s,   'F1 Score — AG News'),
    (times, 'Training Time (s)'),
)
for ax, (values, title) in zip(axes, panels):
    ax.bar(names, values, color=colors)
    ax.set_title(title)
    ax.tick_params(axis='x', rotation=15)

plt.suptitle('Phase 1: PE Comparison on AG News (MEDIUM sequences)', fontsize=13)
plt.tight_layout()
plt.savefig('results_agnews.png', dpi=150, bbox_inches='tight')
plt.show()
print('Saved: results_agnews.png')