In [10]:
# Import libraries
# import nltk
# nltk.download('punkt_tab')
import torch
import torch.nn as nn
import pickle
import math
from utils import *
import importlib
from torch.cuda.amp import GradScaler

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    The inputs are projected to queries, keys and values, split into
    ``n_heads`` heads of width ``d_k = d_model // n_heads``, attended
    independently per head, concatenated again and passed through a final
    output projection.
    """

    def __init__(self, d_model, n_heads):
        """
        Args:
            d_model: dimension of the embedding vectors (model width)
            n_heads: number of attention heads; must divide d_model evenly
        """
        super().__init__()
        assert d_model % n_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model  # e.g. 512
        self.n_heads = n_heads  # e.g. 8
        self.d_k = d_model // n_heads  # e.g. 512 / 8 = 64: width of each head

        # Query, key, value and output projections, each d_model -> d_model.
        # The per-head split happens afterwards in split_heads().
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Return softmax(Q K^T / sqrt(d_k)) V.

        Args:
            Q, K, V: tensors whose last two dimensions are (seq_len, d_k).
            mask: optional tensor broadcastable to the score matrix; positions
                where ``mask == 0`` receive zero attention weight.
        """
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            # Use the most negative finite value of the score dtype instead of a
            # hard-coded -1e9, which is not representable in float16 and breaks
            # mixed-precision runs.
            fill_value = torch.finfo(attn_scores.dtype).min
            attn_scores = attn_scores.masked_fill(mask == 0, fill_value)
        attn_probs = torch.softmax(attn_scores, dim=-1)
        return torch.matmul(attn_probs, V)

    def split_heads(self, x):
        """Reshape (batch, seq_len, d_model) -> (batch, n_heads, seq_len, d_k)."""
        batch_size, seq_len, _ = x.size()
        return x.view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)

    def combine_heads(self, x):
        """Inverse of split_heads: (batch, n_heads, seq_len, d_k) -> (batch, seq_len, d_model)."""
        batch_size, _, seq_len, _ = x.size()
        # transpose() yields a non-contiguous tensor, so contiguous() is needed before view().
        return x.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)

    def forward(self, Q, K, V, mask=None):
        """Attend from Q over K/V.

        Args:
            Q, K, V: (batch_size, seq_len, d_model) tensors, e.g. 32 x 10 x 512.
            mask: optional mask forwarded to scaled_dot_product_attention.

        Returns:
            Tensor of shape (batch_size, query_seq_len, d_model).
        """
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))

        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
        return self.W_o(self.combine_heads(attn_output))

class PositionWiseFeedForward(nn.Module):
    """Two-layer MLP applied to each position independently: Linear -> ReLU -> Linear."""

    def __init__(self, d_model, d_off):
        """
        Args:
            d_model: embedding size / model width (input and output features)
            d_off: hidden width of the feed-forward layer (often written d_ff)
        """
        super().__init__()
        self.fc1 = nn.Linear(in_features=d_model, out_features=d_off)
        self.fc2 = nn.Linear(in_features=d_off, out_features=d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Expand to d_off, apply ReLU, project back to d_model."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        return self.fc2(activated)
    
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to a batch of embeddings.

    Even feature columns hold sin(pos * w_i) and odd columns hold
    cos(pos * w_i), with w_i = 10000^(-2i / d_model). The table is built once
    for ``max_seq_len`` positions and stored as the buffer ``pe`` of shape
    (1, max_seq_len, d_model).
    """

    def __init__(self, d_model, max_seq_len):
        """
        Args:
            d_model: embedding size (number of feature columns); may be odd
            max_seq_len: longest sequence length the table supports
        """
        super().__init__()

        pe = torch.zeros(max_seq_len, d_model)
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer cosine column than sine column,
        # so trim div_term to match instead of failing on a shape mismatch.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])

        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions.

        Args:
            x: tensor of shape (batch, seq_len, d_model).

        Raises:
            ValueError: if seq_len exceeds the max_seq_len the table was built for.
        """
        seq_len = x.size(1)
        max_len = self.pe.size(1)
        if seq_len > max_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds max_seq_len {max_len} of the positional encoding"
            )
        return x + self.pe[:, :seq_len]

# class EncoderLayer(nn.Module):
#     def __init__(self, d_model, n_heads, d_ff, dropout):
#         super().__init__()

#         self.self_attn = MultiHeadAttention(d_model, n_heads)
#         self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
#         self.norm1 = nn.LayerNorm(d_model)
#         self.norm2 = nn.LayerNorm(d_model)
#         self.dropout = nn.Dropout(dropout)
    
#     def forward(self, x, mask):
#         attn_output = self.self_attn(x, x, x, mask)
#         x = self.norm1(x + self.dropout(attn_output))
#         ff_output = self.feed_forward(x)
#         x = self.norm2(x + self.dropout(ff_output))
#         return x
    
class DecoderBlock(nn.Module):
   """Decoder block: masked self-attention followed by a position-wise MLP.

   Each sub-layer is wrapped as LayerNorm(x + Dropout(sublayer(x))).
   """

   def __init__(self, d_model, n_heads, d_ff, dropout):
      """
      Args:
         d_model: model width
         n_heads: number of attention heads
         d_ff: hidden width of the feed-forward sub-layer
         dropout: dropout probability applied to each sub-layer output
      """
      super().__init__()

      # `cross_attn` is constructed but never called in forward(). It is kept
      # because it is a registered sub-module, so its parameters are part of
      # the state_dict this notebook saves and loads.
      self.self_attn = MultiHeadAttention(d_model, n_heads)
      self.cross_attn = MultiHeadAttention(d_model, n_heads)
      self.feed_forward = PositionWiseFeedForward(d_model, d_ff)

      self.norm1 = nn.LayerNorm(d_model)
      self.norm2 = nn.LayerNorm(d_model)
      self.dropout = nn.Dropout(dropout)

   def forward(self, x, tgt_mask):
      """Apply the block to `x`; `tgt_mask` is forwarded to self-attention."""
      # causal self-attention + residual
      attended = self.dropout(self.self_attn(x, x, x, tgt_mask))
      x = self.norm1(x + attended)

      # feed-forward + residual
      projected = self.dropout(self.feed_forward(x))
      return self.norm2(x + projected)
   
class GPTLike(nn.Module):
    """Decoder-only transformer language model.

    Pipeline: token embedding -> sinusoidal positional encoding -> a stack of
    ``n_layers`` DecoderBlocks under a causal mask -> final LayerNorm ->
    linear projection to vocabulary logits.
    """

    def __init__(self, vocab_size, d_model=128, n_heads=8, 
                 n_layers=6, d_ff=512, max_seq_len=256, dropout=0.1):
        """
        Args:
            vocab_size: number of token ids (size of the embedding table and of the output logits)
            d_model: model width
            n_heads: attention heads per block
            n_layers: number of decoder blocks
            d_ff: hidden width of each block's feed-forward layer
            max_seq_len: number of positions in the positional-encoding table
            dropout: dropout probability inside each block
        """
        super().__init__() 

        self.token_emb = nn.Embedding(vocab_size, d_model)
        self.pos_enc = PositionalEncoding(d_model, max_seq_len)

        self.layers = nn.ModuleList([
            DecoderBlock(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)
        ])

        self.ln_final = nn.LayerNorm(d_model)
        self.out_proj = nn.Linear(d_model, vocab_size, bias=False)

    @staticmethod
    def causual_mask(sz, device):
        """Return a (1, 1, sz, sz) lower-triangular mask of ones.

        Entry [.., i, j] is 1 when position i may attend to position j (j <= i)
        and 0 otherwise. Declared as a staticmethod so it can be called through
        the class or an instance; the misspelled name is kept for existing callers.
        """
        return torch.tril(torch.ones(sz, sz, device=device)).unsqueeze(0).unsqueeze(0)
    
    def forward(self, x):
        """Compute next-token logits.

        Args:
            x: integer tensor of token ids with shape (batch, seq_len).

        Returns:
            Tensor of shape (batch, seq_len, vocab_size).
        """
        _, T = x.size()

        tok = self.token_emb(x)
        h = self.pos_enc(tok)

        mask = GPTLike.causual_mask(T, x.device)

        for blk in self.layers:
            h = blk(h, mask)

        h = self.ln_final(h)
        logits = self.out_proj(h)
        return logits

In [11]:
# Pick up edits made to utils.py without restarting the kernel.
import utils
importlib.reload(utils)
# reload() only refreshes the module object. The names bound earlier by
# `from utils import *` would still point at the old functions, so re-bind them.
from utils import *

<module 'utils' from 'c:\\Users\\Ngo Minh Khoa\\Documents\\Artificial Intelligence\\self_testing material\\shakespeare\\utils.py'>

In [12]:
# Load the raw training text from disk and preview its first 100 characters
data_path = 'data.txt'
text = load_data(data_path)
text[:100]

'THE SONNETS\n\n                    1\n\nFrom fairest creatures we desire increase,\nThat thereby beauty’s'

In [13]:
# Tokenize the corpus and build the two vocabulary lookup tables
# (word -> index and index -> word) returned by the utils helper.
tokens, word2idx, idx2word = tokenize_and_build_vocab(text)

# Indices of the special tokens and the total vocabulary size
pad_idx  = word2idx['<pad>']
unk_idx  = word2idx['<unk>']
vocab_size = len(word2idx)

In [5]:
# train_loader, val_loader = train_eval_split(tokens, word2idx, batch_size=1300)

# transformer = GPTLike(vocab_size).to(device)
# criterion = nn.CrossEntropyLoss(ignore_index=0)
# optimizer = torch.optim.AdamW(transformer.parameters(), lr=3e-4, betas=(0.9, 0.95))
# scheduler = get_transformer_scheduler(optimizer, d_model=128, warmup_steps=4000)

# transformer

In [6]:
# import utils
# importlib.reload(utils)

In [None]:
# Resume training from the saved checkpoint. Requires config.pkl and
# transformer_state.pth to exist already (they are written by the save cell).
# NOTE: pickle.load can execute arbitrary code; only load files you created yourself.
with open("config.pkl", "rb") as f:
    config = pickle.load(f)

transformer = GPTLike(**config).to(device)
# map_location lets a checkpoint saved on the GPU load on a CPU-only machine;
# weights_only restricts unpickling to tensors and plain containers.
transformer.load_state_dict(
    torch.load("transformer_state.pth", map_location=device, weights_only=True)
)
# Ignore padding positions in the loss (use the vocabulary's pad index rather than a literal 0).
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)
optimizer = torch.optim.AdamW(transformer.parameters(), lr=3e-4, betas=(0.9, 0.95))
# Take the scheduler's d_model from the loaded config so it cannot drift from the model width.
scheduler = get_transformer_scheduler(optimizer, d_model=config.get("d_model", 128), warmup_steps=4000)
# torch.amp.GradScaler takes the device type as its first argument. The
# torch.cuda.amp.GradScaler imported at the top would read 'cuda' as init_scale.
scaler = torch.amp.GradScaler("cuda", enabled=torch.cuda.is_available())

train_loader, val_loader = train_eval_split(tokens, word2idx, batch_size=1320)

In [None]:
# Number of passes over the training set for this session
NUM_EPOCHS = 20

for epoch in range(NUM_EPOCHS):
    # One full pass over train_loader; the helper returns the epoch's loss value
    loss = train_one_epoch(
        transformer, train_loader, optimizer, scheduler, criterion, scaler, device
    )
    print(f"Epoch {epoch+1}: {loss:.4f}")

Epoch 1: 3.9465


In [None]:
# Persist the weights together with the constructor arguments needed to rebuild the model.
torch.save(transformer.state_dict(), "transformer_state.pth")

# Read the hyper-parameters off the live model instead of hard-coding them, so
# config.pkl always matches the weights that were just saved.
first_block = transformer.layers[0]
config = {
    "vocab_size": transformer.token_emb.num_embeddings,
    "d_model": transformer.token_emb.embedding_dim,
    "n_heads": first_block.self_attn.n_heads,
    "n_layers": len(transformer.layers),
    "d_ff": first_block.feed_forward.fc1.out_features,
    "max_seq_len": transformer.pos_enc.pe.size(1),
    "dropout": first_block.dropout.p
}
with open("config.pkl", "wb") as f:
    pickle.dump(config, f)

In [15]:
# Rebuild the model from the saved config and weights for text generation.
# NOTE: pickle.load can execute arbitrary code; only load files you created yourself.
with open("config.pkl", "rb") as f:
    config = pickle.load(f)

transformer = GPTLike(**config).to(device)
# Switch to inference mode so the dropout layers are disabled during generation.
transformer.eval()
# map_location lets a checkpoint saved on the GPU load on a CPU-only machine;
# weights_only restricts unpickling to tensors and plain containers.
transformer.load_state_dict(
    torch.load("transformer_state.pth", map_location=device, weights_only=True)
)

<All keys matched successfully>

In [None]:
# Debugging aid: request synchronous CUDA kernel launches so an error is
# reported at the call that caused it.
# NOTE(review): this runs after the model has already been moved to `device`
# in earlier cells; the variable is presumably only read when CUDA initialises,
# so it may have no effect here. Confirm, and if needed set it before the
# first CUDA call (top of the notebook).
import os
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

In [19]:
# Encode a short prompt, echo how it was tokenized, then sample a continuation.
prompt = "thou art"
prompt_ids, prompt_tokens = encode_prompt(prompt, word2idx)

# Show the encoded ids and the words they map back to
print(prompt_ids)
prompt_words = [idx2word[i] for i in prompt_ids.squeeze(0).tolist()]
print(prompt_words)

# Sample from the model and decode the resulting ids back to text
out_ids = generate(transformer, prompt_ids, top_k=60, min_len=3, temperature=1.1)
generated_words = [idx2word[i] for i in out_ids]
print(' '.join(generated_words))

tensor([[ 29, 130]], device='cuda:0')
['thou', 'art']
thou art as a wise <eos>
