In [1]:
import torch
import math
import torch.nn as nn
from torch.nn import functional as F
from script.gpt_utils import *
import os

In [None]:
import re

# Read original file
with open('data/truyen_kieu_clean.txt', 'r', encoding='utf-8') as f:
    text = f.read()

# Remove special characters throughout the text
text = re.sub(r"[:;!.,?\\'\"]", '', text)

# Remove the last character if it's not a letter or number
text = re.sub(r'[^a-zA-Z0-9]$', '', text)

# Save to a new file
with open('data/truyen_kieu.txt', 'w', encoding='utf-8') as f:
    f.write(text)

print("Cleaned text saved to 'data/truyen_kieu.txt'")

In [2]:
# Preparation phase
chars, text, vocab_size = load_truyen_kieu_dataset("data/truyen_kieu.txt")
encoder, decoder = load_encoder_decoder(chars)
data = torch.tensor(encoder(text), dtype=torch.long)
train_data, val_data = split_data(data, 0.9)

Vocab size: 121
Number of characters: 101140


In [3]:
class MultiHeadAttention(nn.Module):
    def __init__(self, embedding_size, in_channels, n_heads, dropout=0.0):
        super().__init__()
        self.dropout = dropout
        self.flash = hasattr(torch.nn.functional, 'scaled_dot_product_attention')
        self.embedding_size = embedding_size
        self.in_channels = in_channels
        self.n_heads = n_heads
        self.head_size = embedding_size // n_heads
        
        self.c_attn = nn.Linear(embedding_size, embedding_size * 3)
        self.proj = nn.Linear(embedding_size, embedding_size)
        self.attn_dropout = nn.Dropout(dropout)
        self.ln_dropout = nn.Dropout(dropout)

    def forward(self, x):
        B, T, C = x.shape
        q, k, v = self.c_attn(x).split(self.embedding_size, 2)
        
        q = q.view(B, T, self.n_heads, self.head_size).transpose(1, 2)
        k = k.view(B, T, self.n_heads, self.head_size).transpose(1, 2)
        v = v.view(B, T, self.n_heads, self.head_size).transpose(1, 2)

        if self.flash:
            value = torch.nn.functional.scaled_dot_product_attention(q, k, v, attn_mask=None, dropout_p=self.dropout if self.training else 0, is_causal=True)
        else:
            attn = q @ k.transpose(-2, -1) * (1 / math.sqrt(self.head_size))
            attn = F.softmax(attn, dim=-1)
            attn = self.attn_dropout(attn)
            value = attn @ v
        value = value.transpose(1, 2).contiguous().view(B, T, C)

        value = self.proj(value)
        value = self.ln_dropout(value)
        return value
    
class FeedForward(nn.Module):
    def __init__(self, in_channels, factor, dropout=0.0):
        super().__init__()
        self.relu = nn.ReLU()
        self.ln1 = nn.Linear(in_channels, in_channels * factor)
        self.ln2 = nn.Linear(in_channels * factor, in_channels)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        x = self.relu(self.ln1(x))
        x = self.ln2(x)
        x = self.dropout(x)
        return x

class Block(nn.Module):
    def __init__(self, in_channels, embedding_size, n_heads, dropout=0.0):
        super().__init__()
        head_size = embedding_size // n_heads
        self.multi_head_attn = MultiHeadAttention(embedding_size, in_channels, n_heads, dropout)
        self.ffwd = FeedForward(embedding_size, 4, dropout)
        self.ln1 = nn.LayerNorm(embedding_size)
        self.ln2 = nn.LayerNorm(embedding_size)
    
    def forward(self, x):
        x = x + self.multi_head_attn(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x
    
class GPT(nn.Module):
    def __init__(self, in_channels, vocab_size, embedding_size, n_heads, n_layers, dropout=0.0):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, embedding_size)
        self.position_embedding_table = nn.Embedding(in_channels, embedding_size)
        self.blocks = nn.Sequential(*[Block(in_channels, embedding_size, n_heads) for _ in range(n_layers)])
        self.ln_f = nn.LayerNorm(embedding_size)
        self.lm_head = nn.Linear(embedding_size, vocab_size)
        self.dropout = nn.Dropout(dropout)

        # init all weights
        self.apply(self._init_weights)
        # apply special scaled init to the residual projections, per GPT-2 paper
        for pn, p in self.named_parameters():
            if pn.endswith('c_proj.weight'):
                torch.nn.init.normal_(p, mean=0.0, std=0.02/math.sqrt(2 * n_layers))

    def forward(self, x, targets=None):
        device = x.device
        B,T = x.size()
        pos = torch.arange(0, T, dtype=torch.long, device=device)
        token_embedding = self.token_embedding_table(x)
        pos_embedding = self.position_embedding_table(pos)
        
        x = self.dropout(token_embedding + pos_embedding)
        x = self.blocks(x)
        x = self.ln_f(x)

        if targets is not None:
            logits = self.lm_head(x)
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))
        else:
            logits = self.lm_head(x[:, [-1], :])
            loss = None
        return logits, loss

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, block_size: int = 32, temperature=1.0, top_k=None): 
        for _ in range(max_new_tokens):
            idx_cond = idx if idx.size(1) <= block_size else idx[:, -block_size:]
            logits, _ = self(idx_cond)
            logits = logits[:, -1, :] / temperature
            if top_k is not None:
                v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                logits[logits < v[:, [-1]]] = -float('Inf')
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)

        return idx

In [4]:
@torch.no_grad()
def eval(model, val_data, device: str = 'cpu', n_iters: int = 100):
    model.eval()
    losses = torch.zeros(n_iters)
    for k in range(n_iters):
        x_val, y_val = get_batch(val_data)
        x_val, y_val = x_val.to(device), y_val.to(device)
        logits, loss = model(x_val, y_val)
        losses[k] = loss.item()
    model.train()
    return loss.mean()

In [5]:
def train(
    model, 
    optimizer, 
    device, 
    train_data,
    val_data,
    n_steps: int = 1000, 
    train_iter: int = 100, 
    eval_iter: int = 100, 
    total_eval: int = 100, 
    save_checkpoint: int = 100, 
    save_dir: str = "checkpoints"
):
    # Preparation
    model.to(device)
    loss_history = []
    save = 1

    # Check if save dir valid
    if not os.path.exists(save_dir):
        print("Directory not exist, creating directory...")
        os.makedirs(save_dir)
        print(f"Directory {save_dir} created")
    else:
        print(f"Directory {save_dir} exists")
    print("Start training...")
    
    for step  in range(n_steps):
        model.train()
        x_train, y_train = get_batch(train_data)
        x_train, y_train = x_train.to(device), y_train.to(device)
        logits, loss = model(x_train, y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        loss_history.append(loss.item())
        
        # Evaluation
        if step != 0:
            if step % train_iter == 0:
                avg_loss = sum(loss_history[-train_iter:]) / train_iter  # Compute average over last eval_iter steps
                print(f"Step {step}: Average Loss = {avg_loss:.4f}")
        
            if step % eval_iter == 0:
                eval_loss = eval(model, val_data, device, total_eval)
                print(f"***Eval: {eval_loss:.4f}")
    
            if step % save_checkpoint == 0:
                torch.save(model.state_dict(), f"{save_dir}/checkpoint_{int(save)}.pth")
                save += 1

    # Final eval:
    eval_loss = eval(model, val_data, device, total_eval)
    print("*" * 10)
    avg_loss = sum(loss_history) / n_steps
    print(f"Final eval --- Step: {n_steps} - Training Loss: {avg_loss:.4f} - Eval Loss: {eval_loss:.4f}")
    torch.save(model.state_dict(), f"{save_dir}/checkpoint_final.pth")

    return loss_history, eval_loss

In [6]:
# hyperparamters 
device = "cuda" if torch.cuda.is_available() else "cpu"
n_steps = 1200
eval_iter = 100
train_iter = 100
total_eval = 100

model = GPT(512, vocab_size, 512, 32, 6)
model = model.to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=0.0004)


loss_history, eval_loss = train(
    model, 
    optimizer, 
    device,
    train_data,
    val_data,
    n_steps=n_steps,
    train_iter=train_iter,
    eval_iter=eval_iter,
    total_eval=total_eval,
    save_checkpoint=100,
    save_dir="checkpoints"
)

Directory checkpoints exists
Start training...
Step 100: Average Loss = 3.0137
***Eval: 2.3585
Step 200: Average Loss = 2.1331
***Eval: 2.0541
Step 300: Average Loss = 1.9053
***Eval: 1.8421
Step 400: Average Loss = 1.8084
***Eval: 1.7967
Step 500: Average Loss = 1.7473
***Eval: 1.7694
Step 600: Average Loss = 1.6986
***Eval: 1.7781
Step 700: Average Loss = 1.6518
***Eval: 1.7621
Step 800: Average Loss = 1.6107
***Eval: 1.7126
Step 900: Average Loss = 1.5548
***Eval: 1.6712
Step 1000: Average Loss = 1.5050
***Eval: 1.7941
Step 1100: Average Loss = 1.4307
***Eval: 1.7807
**********
Final eval --- Step: 1200 - Training Loss: 1.7879 - Eval Loss: 1.7809


In [7]:
beautiful_print(model, decoder, 200, device=device)

ành đường ai ráng tình quỉ can
Hồng đã quấn kiếp với cờ dài
Thoảng dương thiệt mặt ngày mời
Sầu bèa vửa đảoến chiêm m 
Trhưa Hết hẹp mừng mới là đầy sau
Lẫu sao như nước sốt bao
Hẳn sao vực nàng trưởn

'\nành đường ai ráng tình quỉ can\nHồng đã quấn kiếp với cờ dài\nThoảng dương thiệt mặt ngày mời\nSầu bèa vửa đảoến chiêm m \nTrhưa Hết hẹp mừng mới là đầy sau\nLẫu sao như nước sốt bao\nHẳn sao vực nàng trưởn'