# **Mini Transformer for Dialogue Generation**

## **1. Import Library & Seed**

In [None]:
import math, random
import numpy as np
import torch, torch.nn as nn, torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
import evaluate

# Reproducibility: seed Python, NumPy and PyTorch with the same value.
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# Pick the GPU when one is visible (and seed every CUDA device), else the CPU.
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print("Using device:", device)


## **2. Load & Preprocess Data**

In [None]:
# Fetch the DailyDialog corpus from the Hugging Face hub.
# The splits indexed further down are 'train' and 'validation'.
raw = load_dataset(path='li2017dailydialog/daily_dialog')

def preprocess(example):
    """Flatten one dialog into a single whitespace-tokenised sequence.

    The result starts with ``<bos>``, utterances are separated by ``<sep>``
    and the sequence ends with ``<eos>``.

    Args:
        example: mapping with a ``'dialog'`` entry holding a list of
            utterance strings.

    Returns:
        ``{'tokens': [...]}`` with the flattened token list.
    """
    toks = ['<bos>']
    for utt in example['dialog']:
        toks.extend(utt.split())
        toks.append('<sep>')
    if len(toks) > 1:
        # The separator after the final utterance becomes the end marker.
        toks[-1] = '<eos>'
    else:
        # Empty dialog: keep <bos> instead of overwriting it with <eos>.
        toks.append('<eos>')
    return {'tokens': toks}

# Subsample first, then tokenise: only the rows that are kept get mapped,
# instead of preprocessing a whole split and discarding most of it.
train_raw = raw['train'].select(range(5000)).map(preprocess)
val_raw   = raw['validation'].select(range(1000)).map(preprocess)
print(f"Train: {len(train_raw)}, Val: {len(val_raw)}")

# Vocabulary. Later cells look up `toi` (token -> id) and `itos` (id -> token),
# including the entries '<pad>', '<unk>', '<bos>' and '<eos>', so build both
# here from the training tokens only.
from collections import Counter

SPECIAL_TOKENS = ['<pad>', '<unk>', '<bos>', '<sep>', '<eos>']
MIN_FREQ = 2  # rarer words map to <unk>, so its embedding is actually trained

token_counts = Counter(tok for toks in train_raw['tokens'] for tok in toks)
itos = SPECIAL_TOKENS + sorted(
    tok for tok, count in token_counts.items()
    if count >= MIN_FREQ and tok not in SPECIAL_TOKENS
)
toi = {tok: idx for idx, tok in enumerate(itos)}

## **3. PyTorch Dataset & DataLoader**

In [None]:
class DialogDataset(Dataset):
    """Serve fixed-length (input, target) id pairs for next-token prediction.

    Args:
        data: indexable collection whose items expose a ``'tokens'`` list.
        toi: token -> id mapping; must contain ``'<unk>'`` and ``'<pad>'``.
        max_len: length every encoded sequence is truncated or padded to.
    """

    def __init__(self, data, toi, max_len=64):
        self.data = data
        self.toi = toi
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def encode(self, toks):
        """Map tokens to ids, then truncate or right-pad to ``max_len``."""
        lookup = self.toi
        ids = [lookup.get(tok, lookup['<unk>']) for tok in toks]
        overflow = len(ids) - self.max_len
        if overflow > 0:
            return ids[:self.max_len]
        ids.extend([lookup['<pad>']] * -overflow)
        return ids

    def __getitem__(self, idx):
        """Return ``(x, y)`` where ``y`` is ``x`` shifted left by one position."""
        token_ids = self.encode(self.data[idx]['tokens'])
        inputs = torch.tensor(token_ids[:-1], dtype=torch.long)
        targets = torch.tensor(token_ids[1:], dtype=torch.long)
        return inputs, targets

# Wrap both splits with the shared vocabulary, then batch them.
train_ds, val_ds = (DialogDataset(split, toi) for split in (train_raw, val_raw))

# Only the training batches are shuffled; validation keeps dataset order.
train_loader = DataLoader(dataset=train_ds, batch_size=32, shuffle=True)
val_loader = DataLoader(dataset=val_ds, batch_size=32)


## **4. Positional Encoding Variants**

### 4.1 Sinusoidal

In [None]:
class SinusoidalPE(nn.Module):
    """Fixed sine/cosine positional encoding added to the input embeddings.

    Even feature indices hold ``sin(pos / 10000**(2i/d_model))`` and odd
    indices hold the matching ``cos``.

    Args:
        d_model: embedding width (odd values are supported).
        max_len: number of positions in the precomputed table.
    """

    def __init__(self, d_model, max_len=64):
        super().__init__()
        pos = torch.arange(max_len).unsqueeze(1)
        # One frequency per sin column; for odd d_model there is one more
        # sin column than cos column, hence the ceil.
        i = torch.arange((d_model + 1) // 2).unsqueeze(0)
        angles = pos / (10000 ** (2 * i / d_model))
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])
        # A buffer follows the module through .to()/.cuda() rather than being
        # pinned to a global device. persistent=False keeps it out of the
        # state_dict, so the set of saved keys is the same as before.
        self.register_buffer('pe', pe.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        ``x`` is indexed as (batch, seq, d_model); ``seq`` must not exceed
        ``max_len``.
        """
        return x + self.pe[:, :x.size(1), :]

### 4.2 Learnable

class LearnablePE(nn.Module):
    """Trainable positional embedding table with ``max_len`` rows of width ``d_model``.

    ``forward`` returns only the positional vectors; it does not add them
    to its input.
    """

    def __init__(self, max_len, d_model):
        super().__init__()
        self.embed = nn.Embedding(num_embeddings=max_len, embedding_dim=d_model)

    def forward(self, x):
        """Return embeddings for positions ``0 .. x.size(1)-1`` with a leading batch axis of 1."""
        seq_len = x.size(1)
        positions = torch.arange(seq_len, device=x.device)
        return self.embed(positions.unsqueeze(0))


## **5. Decoder-Only Transformer Architecture**

In [None]:
class DecoderBlock(nn.Module):
    """Pre-LayerNorm decoder block: causal self-attention, then a feed-forward net.

    Both sub-layers are wrapped in a residual connection.

    Args:
        d_model: feature width.
        nhead: number of attention heads.
        d_ff: hidden width of the feed-forward network.
        dropout: dropout rate for the attention weights and the feed-forward output.
    """

    def __init__(self, d_model, nhead, d_ff, dropout=0.1):
        super().__init__()
        self.ln1  = nn.LayerNorm(d_model)
        self.attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.ln2  = nn.LayerNorm(d_model)
        self.ff   = nn.Sequential(
            nn.Linear(d_model, d_ff), nn.GELU(),
            nn.Linear(d_ff, d_model), nn.Dropout(dropout)
        )

    def forward(self, x):
        """Apply the block to ``x`` of shape (T, B, D) and return the same shape."""
        seq_len = x.size(0)
        # Additive causal mask: -inf strictly above the diagonal stops a
        # position from attending to later positions.
        causal_mask = torch.triu(
            torch.full((seq_len, seq_len), float('-inf'),
                       dtype=x.dtype, device=x.device),
            diagonal=1,
        )
        normed = self.ln1(x)
        # The attention weights are never used, so do not compute them.
        attn_out, _ = self.attn(normed, normed, normed,
                                attn_mask=causal_mask, need_weights=False)
        x = x + attn_out
        return x + self.ff(self.ln2(x))

class DecoderOnlyTransformer(nn.Module):
    """Stack of ``DecoderBlock`` layers mapping token ids to next-token logits.

    Args:
        cfg: dict with the keys ``'d_model'``, ``'vocab_size'``, ``'max_len'``,
            ``'nhead'``, ``'d_ff'``, ``'nlayers'`` and ``'pos'``. With
            ``'pos' == 'sinusoidal'`` the fixed encoding is used; any other
            value selects the learnable positional embedding.
    """

    def __init__(self, cfg):
        super().__init__()
        d, V, L = cfg['d_model'], cfg['vocab_size'], cfg['max_len']
        self.tok_embed = nn.Embedding(V, d)
        if cfg['pos'] == 'sinusoidal':
            self.pos_embed = SinusoidalPE(d, L)
        else:
            self.pos_embed = LearnablePE(L, d)
        self.layers = nn.ModuleList([
            DecoderBlock(d, cfg['nhead'], cfg['d_ff'])
            for _ in range(cfg['nlayers'])
        ])
        self.ln_f = nn.LayerNorm(d)
        self.head = nn.Linear(d, V, bias=False)

    def forward(self, input_ids):
        """Return logits of shape (B, T, V) for ``input_ids`` of shape (B, T)."""
        # Scale token embeddings by sqrt(d_model) before adding positions.
        x = self.tok_embed(input_ids) * math.sqrt(self.tok_embed.embedding_dim)
        # The two encodings differ in contract: SinusoidalPE returns x with
        # the positions already added, LearnablePE returns only the positions.
        if isinstance(self.pos_embed, SinusoidalPE):
            x = self.pos_embed(x)
        else:
            x = x + self.pos_embed(input_ids)
        x = x.transpose(0, 1)  # (T, B, D), the layout DecoderBlock expects
        for layer in self.layers:
            x = layer(x)
        logits = self.head(self.ln_f(x))  # (T, B, V)
        # transpose() yields a non-contiguous view; make it contiguous so
        # callers can flatten the result with .view(-1, V).
        return logits.transpose(0, 1).contiguous()  # (B, T, V)


## **6. Training & Evaluation**

In [None]:
# Hyper-parameters of the small decoder-only model.
cfg = dict(
    d_model=64,
    nhead=1,
    d_ff=256,
    nlayers=2,
    vocab_size=len(itos),
    max_len=64,
    pos='sinusoidal',
)

model = DecoderOnlyTransformer(cfg)
model = model.to(device)
optimizer = optim.AdamW(params=model.parameters(), lr=1e-4)
# Padding positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=toi['<pad>'])

def train_epoch():
    """Run one optimisation pass over ``train_loader``.

    Uses the module-level ``model``, ``optimizer``, ``criterion``,
    ``train_loader`` and ``device``.

    Returns:
        Mean of the per-batch losses.
    """
    model.train()
    running_loss = 0.0
    for x, y in train_loader:
        x, y = x.to(device), y.to(device)
        logits = model(x)
        # reshape, not view: view raises when the logits are not contiguous
        # in memory (e.g. when they come out of a transpose).
        loss = criterion(logits.reshape(-1, logits.size(-1)), y.reshape(-1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    return running_loss / len(train_loader)

def eval_epoch(loader):
    """Compute the mean per-batch loss over ``loader`` without gradient tracking.

    Uses the module-level ``model``, ``criterion`` and ``device``.

    Args:
        loader: iterable of ``(x, y)`` batches that also supports ``len()``.

    Returns:
        Mean of the per-batch losses.
    """
    model.eval()
    running_loss = 0.0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            logits = model(x)
            # reshape, not view: view raises when the logits are not
            # contiguous in memory (e.g. when they come out of a transpose).
            batch_loss = criterion(
                logits.reshape(-1, logits.size(-1)), y.reshape(-1)
            )
            running_loss += batch_loss.item()
    return running_loss / len(loader)

# Training loop
# NOTE(review): model and optimizer are created once before this loop and are
# not re-created per label, so 'exp2' and 'exp3' keep training the weights
# left by the previous run rather than starting fresh — confirm this is intended.
for exp in ('exp1', 'exp2', 'exp3'):
    print("===", exp, "===")
    for epoch in range(3):
        tr, vl = train_epoch(), eval_epoch(val_loader)
        print(f"Epoch {epoch+1}: train_loss={tr:.4f}, val_loss={vl:.4f}")


## **7. Inference: Greedy Decoding**

In [None]:
def greedy_generate(max_len=64):
    """Generate a token sequence by repeatedly taking the arg-max next token.

    Starts from ``<bos>`` and stops after ``<eos>`` is produced or once the
    sequence holds ``max_len`` tokens. Uses the module-level ``model``,
    ``toi``, ``itos`` and ``device``.

    Args:
        max_len: maximum number of tokens in the result, ``<bos>`` included.

    Returns:
        List of token strings, starting with ``<bos>``.
    """
    model.eval()
    ids = torch.full((1, 1), toi['<bos>'], dtype=torch.long, device=device)
    # Inference only: skip building an autograd graph on every step.
    with torch.no_grad():
        for _ in range(max_len - 1):
            logits = model(ids)                  # (1, T, V)
            next_id = logits[:, -1].argmax(-1)   # (1,)
            ids = torch.cat([ids, next_id.unsqueeze(-1)], dim=1)
            if next_id.item() == toi['<eos>']:
                break
    # Index the batch row instead of squeeze(): squeeze() collapses a
    # one-token sequence to a scalar, and tolist() then returns an int.
    return [itos[i] for i in ids[0].tolist()]

print("Generated:", greedy_generate())


## **8. Evaluation with Hugging Face Evaluate**

In [None]:
# Load the metrics: ROUGE (ROUGE-1/2/L), BLEU and METEOR, in that order.
rouge, bleu, meteor = (
    evaluate.load(metric_name) for metric_name in ('rouge', 'bleu', 'meteor')
)

# Collect teacher-forced predictions and their references.
preds, refs = [], []
pad_id = toi['<pad>']
model.eval()
with torch.no_grad():
    for x, y in val_loader:
        logits = model(x.to(device))
        pred_ids = logits.argmax(-1).cpu().tolist()
        for pred_row, ref_row in zip(pred_ids, y.tolist()):
            pred_toks, ref_toks = [], []
            for pred_id, ref_id in zip(pred_row, ref_row):
                # Skip padded target positions: the loss ignores them, so the
                # model's output there is arbitrary and would only lengthen
                # the hypothesis with noise.
                if ref_id == pad_id:
                    continue
                ref_toks.append(itos[ref_id])
                if pred_id != pad_id:
                    pred_toks.append(itos[pred_id])
            # Decode up to (not including) the first <eos>.
            preds.append(" ".join(pred_toks).split("<eos>")[0].strip())
            refs.append([" ".join(ref_toks).split("<eos>")[0].strip()])

# Compute the scores; every metric receives the same predictions/references.
metric_inputs = dict(predictions=preds, references=refs)
res_rouge = rouge.compute(**metric_inputs)
res_bleu = bleu.compute(**metric_inputs)
res_meteor = meteor.compute(**metric_inputs)

for label, result in (
    ("ROUGE:", res_rouge),
    ("BLEU:", res_bleu),
    ("METEOR:", res_meteor),
):
    print(label, result)