In [54]:
import tiktoken
import torch
from typing import Dict, List, Any, Tuple

In [4]:
# Tokenizer used by every cell in this notebook: tiktoken's "gpt2" encoding.
tokenizer = tiktoken.get_encoding("gpt2")
# Model hyperparameters; the module constructors in this notebook read them by key.
CONFIG = dict(
    vocab_size=50257,    # rows of the token-embedding table and width of the output logits
    context_length=256,  # rows of the position-embedding table and side of the causal mask
    embed_dim=768,       # width of every hidden representation
    n_heads=12,          # attention heads per transformer block
    n_layers=12,         # number of stacked transformer blocks
    drop_rate=0.1,       # probability passed to every Dropout layer
)

In [13]:
def text_to_token_ids(tokenizer, text: str) -> torch.Tensor:
    """Encode ``text`` and return the ids as a batch holding one sequence.

    tokenizer: object whose ``encode(text)`` returns a list of integer ids
    text: string to encode

    return: (1, seq_size) tensor of dtype ``torch.long``
    """
    # Pin the dtype: for an empty encoding torch.tensor([]) would default to
    # float32, and float ids cannot be used to index an embedding table.
    token_ids = torch.tensor(tokenizer.encode(text), dtype=torch.long)
    # Add the leading batch dimension.
    return token_ids.unsqueeze(0)

def token_ids_to_text(tokenizer, token_ids: torch.Tensor) -> str:
    """Decode a batch-of-one tensor of token ids back into a string.

    tokenizer: object whose ``decode(ids)`` turns a list of ids into text
    token_ids: tensor whose leading dimension (if of size 1) is dropped before decoding

    return: the decoded text
    """
    # squeeze(0) only removes the first dimension when its size is 1.
    unbatched = token_ids.squeeze(0)
    id_list = unbatched.tolist()
    return tokenizer.decode(id_list)

def generete_tokens(
    model: torch.nn.Module,
    token_ids: torch.Tensor,
    max_generated_tokens: int,
    context_length: int
) -> torch.Tensor:
    """Greedily extend ``token_ids`` by ``max_generated_tokens`` tokens.

    model: module mapping (batch_size, seq_size) ids to
        (batch_size, seq_size, vocab_size) logits
    token_ids: (batch_size, seq_size) prompt ids
    max_generated_tokens: number of tokens appended to every sequence
    context_length: only the last ``context_length`` ids are fed to the model

    return: (batch_size, seq_size + max_generated_tokens) ids, on the device of
        the model's parameters (or the prompt's device if it has none)

    Generation runs in eval mode and without autograd; the model's previous
    train/eval mode is restored before returning.
    """
    # Feed the prompt on the same device as the model's weights.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        token_ids = token_ids.to(first_param.device)

    was_training = model.training
    model.eval()  # dropout must be off, otherwise greedy decoding is noisy
    try:
        with torch.no_grad():  # inference only: do not record an autograd graph
            for _ in range(max_generated_tokens):
                # pick current context
                context = token_ids[:, -context_length:]  # (batch_size, <=context_length)
                logits = model(context)  # (batch_size, seq_size, vocab_size)
                # softmax is monotonic, so argmax over the raw logits of the
                # last position selects the same token.
                next_ids = torch.argmax(logits[:, -1, :], dim=-1, keepdim=True)  # (batch_size, 1)
                token_ids = torch.cat((token_ids, next_ids), dim=1)
    finally:
        model.train(was_training)

    return token_ids

In [33]:
class Embeddings(torch.nn.Module):
    """Sum of learned token and learned position embeddings, followed by dropout.

    Reads ``vocab_size``, ``context_length``, ``embed_dim`` and ``drop_rate``
    from ``config``.
    """

    def __init__(self, config: Dict):
        super().__init__()
        self.token_embeddings = torch.nn.Embedding(
            num_embeddings=config["vocab_size"],
            embedding_dim=config["embed_dim"],
        )
        # One learned vector per position, so at most ``context_length``
        # positions can be embedded.
        self.position_embeddings = torch.nn.Embedding(
            num_embeddings=config["context_length"],
            embedding_dim=config["embed_dim"],
        )
        self.dropout = torch.nn.Dropout(config["drop_rate"])

    def forward(self, token_ids: torch.Tensor) -> torch.Tensor:
        """
        token_ids: (batch_size, seq_size)

        return: (batch_size, seq_size, embed_dim)
        """
        seq_size = token_ids.size(-1)
        # Build the position ids on the same device as the input; a bare
        # torch.arange lives on the CPU and breaks once the module is moved
        # to another device.
        position_ids = torch.arange(seq_size, device=token_ids.device).unsqueeze(0)

        token_emb = self.token_embeddings(token_ids)
        position_emb = self.position_embeddings(position_ids)  # (1, seq_size, embed_dim), broadcast over batch
        emb = token_emb + position_emb
        emb = self.dropout(emb)
        return emb
    
def scaled_dot_attention(
    query: torch.Tensor, 
    key: torch.Tensor, 
    value: torch.Tensor,
    mask: torch.Tensor
) -> torch.Tensor:
    """Masked scaled dot-product attention for a single head.

    query, key, value: (batch_size, seq_size, head_dim)
    mask: square matrix at least (seq_size, seq_size); non-zero entries mark
        the query/key pairs that must not attend to each other

    return: (batch_size, seq_size, head_dim)
    """
    _, seq_size, head_dim = key.size()
    scale = head_dim**0.5

    # (batch_size, seq_size, seq_size) raw similarity of every query with every key
    similarity = torch.bmm(query, key.transpose(1, 2))

    # Crop the mask to the current sequence length; blocked pairs get -inf so
    # that softmax assigns them zero weight.
    blocked = mask.bool()[:seq_size, :seq_size]
    similarity = similarity.masked_fill(blocked, -torch.inf)

    attention = torch.nn.functional.softmax(similarity / scale, dim=-1)
    return torch.bmm(attention, value)
    
class AttentionHead(torch.nn.Module):
    """One attention head: linear query/key/value projections plus a causal mask."""

    def __init__(self, embed_dim: int, head_dim: int, context_length: int):
        super().__init__()
        self.query = torch.nn.Linear(embed_dim, head_dim)
        self.key = torch.nn.Linear(embed_dim, head_dim)
        self.value = torch.nn.Linear(embed_dim, head_dim)
        # Ones strictly above the diagonal mark the later positions a token is
        # not allowed to attend to. Registered as a buffer so it follows the
        # module across devices without being a trainable parameter.
        causal_mask = torch.ones(context_length, context_length).triu(diagonal=1)
        self.register_buffer("mask", causal_mask)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        x: (batch_size, seq_size, embed_dim)

        return: (batch_size, seq_size, head_dim)
        """
        q = self.query(x)
        k = self.key(x)
        v = self.value(x)
        return scaled_dot_attention(q, k, v, self.mask)
    
class MultiHeadAttention(torch.nn.Module):
    """Runs ``n_heads`` independent attention heads and mixes their outputs.

    Reads ``embed_dim``, ``n_heads`` and ``context_length`` from ``config``.
    Each head produces ``embed_dim // n_heads`` features; the concatenation of
    all heads is passed through a final ``embed_dim -> embed_dim`` linear layer.

    Raises ValueError when ``embed_dim`` cannot be split evenly across the heads.
    """

    def __init__(self, config: Dict):
        super().__init__()
        embed_dim = config["embed_dim"]
        n_heads = config["n_heads"]
        # Fail at construction time: with a remainder the concatenated heads
        # would be narrower than embed_dim and the output layer would only
        # fail later, inside forward, with an opaque shape error.
        if n_heads <= 0 or embed_dim % n_heads != 0:
            raise ValueError(
                f"embed_dim ({embed_dim}) must be divisible by a positive n_heads ({n_heads})"
            )
        head_dim = embed_dim // n_heads
        self.heads = torch.nn.ModuleList([
            AttentionHead(embed_dim, head_dim, config["context_length"]) for _ in range(n_heads)
        ])
        self.output_layer = torch.nn.Linear(embed_dim, embed_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        x: (batch_size, seq_size, embed_dim)

        return: (batch_size, seq_size, embed_dim)
        """
        # Each head returns (batch_size, seq_size, head_dim); joining them on
        # the feature axis restores the embed_dim width.
        hidden = torch.concat([h(x) for h in self.heads], dim = -1)
        x = self.output_layer(hidden)
        return x
    
    
class FeedForward(torch.nn.Module):
    """Two-layer MLP: widen to 4 * embed_dim, GELU, project back, then dropout.

    Reads ``embed_dim`` and ``drop_rate`` from ``config``.
    """

    def __init__(self, config: Dict):
        super().__init__()
        width = config["embed_dim"]
        hidden_width = 4 * width
        stages = [
            torch.nn.Linear(width, hidden_width),
            torch.nn.GELU(),
            torch.nn.Linear(hidden_width, width),
            torch.nn.Dropout(config["drop_rate"]),
        ]
        self.layers = torch.nn.Sequential(*stages)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        x: (batch_size, seq_size, embed_dim)

        return: (batch_size, seq_size, embed_dim)
        """
        return self.layers(x)
    
    
class TransformerBlock(torch.nn.Module):
    """Pre-norm transformer block: attention and feed-forward, each with a residual.

    Both sub-layers follow the same recipe: LayerNorm -> sub-layer -> dropout,
    then the sub-layer's input is added back.
    """

    def __init__(self, config: Dict):
        super().__init__()
        width = config["embed_dim"]
        self.norm1 = torch.nn.LayerNorm(width)
        self.attn = MultiHeadAttention(config)
        self.norm2 = torch.nn.LayerNorm(width)
        self.fnn = FeedForward(config)
        self.dropout = torch.nn.Dropout(config["drop_rate"])

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        x: (batch_size, seq_size, embed_dim)

        return: (batch_size, seq_size, embed_dim)
        """
        # Attention sub-layer with residual connection.
        residual = x
        attended = self.dropout(self.attn(self.norm1(x)))
        x = attended + residual

        # Feed-forward sub-layer with residual connection. FeedForward already
        # ends in its own Dropout, so its output passes through dropout twice.
        residual = x
        transformed = self.dropout(self.fnn(self.norm2(x)))
        return transformed + residual
        

class GPT(torch.nn.Module):
    """Decoder-only language model assembled as one ``Sequential`` pipeline.

    Pipeline: Embeddings -> ``n_layers`` TransformerBlocks -> linear projection
    from ``embed_dim`` to ``vocab_size`` logits.
    """

    def __init__(self, config: Dict):
        super().__init__()
        stack = [Embeddings(config)]
        stack.extend(TransformerBlock(config) for _ in range(config["n_layers"]))
        # Output head: one logit per vocabulary entry.
        stack.append(torch.nn.Linear(config["embed_dim"], config["vocab_size"]))
        self.layers = torch.nn.Sequential(*stack)

    def forward(self, token_ids: torch.Tensor) -> torch.Tensor:
        """
        token_ids: (batch_size, seq_size)

        return: (batch_size, seq_size, vocab_size)
        """
        return self.layers(token_ids)

In [36]:
# Smoke test: build a freshly initialised model and greedily continue a short prompt.
model = GPT(CONFIG)
prompt_ids = text_to_token_ids(tokenizer, "Hello world!")
output_tokens = generete_tokens(
    model,
    prompt_ids,
    max_generated_tokens=10,
    context_length=CONFIG["context_length"],
)
# Last expression of the cell: the decoded text is shown as the cell output.
token_ids_to_text(tokenizer, output_tokens)

'Hello world!チredited observational 42 bind deline Presidential adrenowell Stability'

In [41]:
# Create dataloaders
from torch.utils.data import Dataset, DataLoader

class GPTDataset(Dataset):
    """Sliding-window next-token dataset built from one text.

    The text is tokenized once. Every window of ``max_length`` ids is an input
    and the same window shifted right by one id is its target. Windows start
    every ``stride`` ids, and only windows with a complete target are kept.
    """

    def __init__(self, tokenizer, text: str, stride: int, max_length: int):
        ids = tokenizer.encode(text)
        # Stopping before len(ids) - max_length guarantees that the target
        # slice (shifted by one) is still max_length long.
        window_starts = range(0, len(ids) - max_length, stride)
        self.input_ids = [
            torch.tensor(ids[start: start + max_length]) for start in window_starts
        ]
        self.target_ids = [
            torch.tensor(ids[start + 1: start + max_length + 1]) for start in window_starts
        ]

    def __len__(self):
        """Number of (input, target) windows."""
        return len(self.input_ids)

    def __getitem__(self, idx):
        """Return the ``idx``-th (input ids, target ids) pair."""
        return self.input_ids[idx], self.target_ids[idx]
    
def create_dataloader_v1(
    tokenizer,
    text: str,
    batch_size: int,
    max_length: int,
    stride: int,
    shuffle: bool,
    drop_last: bool,
    num_workers: int = 0
):
    """Wrap ``text`` in a ``GPTDataset`` and return a ``DataLoader`` over it.

    tokenizer, text, max_length, stride: forwarded to ``GPTDataset``
    batch_size, shuffle, drop_last, num_workers: forwarded to ``DataLoader``

    return: DataLoader yielding (input ids, target ids) batches
    """
    windows = GPTDataset(tokenizer, text, stride=stride, max_length=max_length)
    return DataLoader(
        windows,
        batch_size=batch_size,
        shuffle=shuffle,
        drop_last=drop_last,
        num_workers=num_workers,
    )

In [47]:
# NOTE(review): absolute, machine-specific path — anyone else running this
# notebook has to point it at their own copy of the text file.
file_path = "/Users/lkhamsurenl/development/transformers/notebooks/the-verdict.txt"
with open(file_path, "r", encoding="utf-8") as f:
    text_data = f.read()

# Split by characters: the first 90% is training text, the remainder validation text.
train_ratio = 0.9
idx = int(train_ratio * len(text_data))
train_text, val_text = text_data[:idx], text_data[idx:]

# Settings shared by both loaders. stride == max_length, so consecutive
# windows do not overlap.
_shared_loader_args = dict(
    batch_size=2,
    max_length=CONFIG["context_length"],
    stride=CONFIG["context_length"],
)

# Training batches are shuffled and an incomplete final batch is dropped.
train_loader = create_dataloader_v1(
    tokenizer,
    text=train_text,
    shuffle=True,
    drop_last=True,
    **_shared_loader_args,
)
# Validation keeps the original order and every batch.
val_loader = create_dataloader_v1(
    tokenizer,
    text=val_text,
    shuffle=False,
    drop_last=False,
    **_shared_loader_args,
)

In [48]:
# Walk the training loader once and print the shape of every input/target batch.
for x, y in train_loader:
    print(x.shape, y.shape)

torch.Size([2, 256]) torch.Size([2, 256])
torch.Size([2, 256]) torch.Size([2, 256])
torch.Size([2, 256]) torch.Size([2, 256])
torch.Size([2, 256]) torch.Size([2, 256])
torch.Size([2, 256]) torch.Size([2, 256])
torch.Size([2, 256]) torch.Size([2, 256])
torch.Size([2, 256]) torch.Size([2, 256])
torch.Size([2, 256]) torch.Size([2, 256])
torch.Size([2, 256]) torch.Size([2, 256])


In [49]:
# Walk the validation loader once and print the shape of every input/target batch.
for x, y in val_loader:
    print(x.shape, y.shape)

torch.Size([2, 256]) torch.Size([2, 256])


In [50]:
# TODO: Add simple training loop with evaluation step

In [51]:
# Use CUDA when it is available, otherwise stay on the CPU.
# NOTE(review): none of the cells shown in this notebook moves `model` to
# `device` — confirm where the model lives before relying on this value.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# AdamW over every parameter of the model built above.
optimizer = torch.optim.AdamW(model.parameters(), lr=4e-4, weight_decay=0.1)

  from .autonotebook import tqdm as notebook_tqdm


In [59]:
def calc_loss_batch(input_batch, target_batch, model, device):
    """Cross-entropy loss of ``model`` on one batch.

    input_batch: tensor of token ids fed to the model
    target_batch: tensor of target ids, same leading two dimensions as the logits
    model: callable returning logits whose first two dimensions are flattened
        together before the loss is computed
    device: device both batches are moved to before the forward pass; the
        model is expected to already be on that device

    return: scalar loss tensor (mean over all flattened positions)
    """
    # Tensor.to returns a new tensor instead of moving in place, so the
    # results have to be rebound — otherwise the batch never leaves its
    # original device.
    input_batch = input_batch.to(device)
    target_batch = target_batch.to(device)
    logits = model(input_batch)
    # cross_entropy wants (N, vocab) logits against (N,) targets.
    return torch.nn.functional.cross_entropy(
        logits.flatten(0, 1), target_batch.flatten(0, 1)
    )

def calc_loss_loader(dataloader: DataLoader, model: torch.nn.Module, device, num_batches=None):
    """Average ``calc_loss_batch`` over the first batches of ``dataloader``.

    dataloader: iterable of (input_batch, target_batch) pairs that supports ``len``
    model, device: forwarded to ``calc_loss_batch``
    num_batches: how many batches to average over; ``None`` means all of them,
        larger values are capped at ``len(dataloader)``

    return: mean loss as a Python float; ``nan`` when there is nothing to
        average (empty loader or ``num_batches`` <= 0)
    """
    if len(dataloader) == 0:
        return float("nan")
    if num_batches is None:
        num_batches = len(dataloader)
    else:
        num_batches = min(num_batches, len(dataloader))
    if num_batches <= 0:
        # Avoid a 0 / 0 division when the caller asks for no batches.
        return float("nan")

    total_loss = 0.0
    # zip with range stops after num_batches without fetching an extra batch.
    for _, (input_batch, target_batch) in zip(range(num_batches), dataloader):
        loss = calc_loss_batch(input_batch, target_batch, model, device)
        # .item() keeps the running sum a plain float, so no tensor (or
        # autograd graph) is held on to and every branch returns a float.
        total_loss += loss.item()
    return total_loss / num_batches

def evaluate_model(model, train_loader, val_loader, device, eval_iter):
    """Estimate train and validation loss over at most ``eval_iter`` batches each.

    The model is switched to eval mode and autograd is disabled while the
    losses are computed; the model is put into train mode before returning.

    return: (train_loss, val_loss)
    """
    model.eval()
    with torch.no_grad():
        # Train loader first, validation loader second.
        losses = [
            calc_loss_loader(loader, model, device, num_batches=eval_iter)
            for loader in (train_loader, val_loader)
        ]
    model.train()
    train_loss, val_loss = losses
    return train_loss, val_loss


def generate_and_print_sample(model, tokenizer, device, start_context) -> None:
    """Continue ``start_context`` by 10 greedily chosen tokens and print the text.

    model: model handed to ``generete_tokens``
    tokenizer: used both to encode the prompt and to decode the result
    device: accepted for signature symmetry; not used by this function
    start_context: prompt text

    The context window size is read from the notebook-level ``CONFIG``.
    """
    prompt_ids = text_to_token_ids(tokenizer, start_context)
    generated_ids = generete_tokens(
        model,
        prompt_ids,
        max_generated_tokens=10,
        context_length=CONFIG["context_length"],
    )
    print(f"Output: {token_ids_to_text(tokenizer, generated_ids)}")
    

def train_model_simple(
    model,
    tokenizer,
    train_loader: DataLoader,
    val_loader: DataLoader,
    num_epochs: int,
    optimizer,
    device,
    eval_freq: int,
    eval_iter: int,
    start_context: str
) -> Tuple[List, List]:
    """Train ``model`` for ``num_epochs`` passes over ``train_loader``.

    model: model to optimise
    tokenizer: only used for the sample printed after every epoch
    train_loader / val_loader: loaders yielding (input_batch, target_batch)
    num_epochs: number of full passes over ``train_loader``
    optimizer: optimizer already bound to the model's parameters
    device: forwarded to the loss and evaluation helpers
    eval_freq: evaluate every ``eval_freq`` optimisation steps (step 0 included)
    eval_iter: maximum number of batches per loader used for each evaluation
    start_context: prompt for the per-epoch text sample

    return: (train_losses, val_losses), one entry per evaluation
    """
    train_losses, val_losses = [], []
    # Starts at -1 so that the first optimisation step is step 0.
    global_step = -1

    for epoch in range(num_epochs):
        # Make sure dropout is active for the updates even if an earlier cell
        # (evaluation, generation) left the model in eval mode.
        model.train()
        for input_batch, target_batch in train_loader:
            # Ensure we're doing backprop based on current batch only
            optimizer.zero_grad()
            loss = calc_loss_batch(input_batch, target_batch, model, device)
            loss.backward()
            optimizer.step()
            global_step += 1

            if global_step % eval_freq == 0:
                train_loss, val_loss = evaluate_model(model, train_loader, val_loader, device, eval_iter)
                train_losses.append(train_loss)
                val_losses.append(val_loss)

                print(f"Ep {epoch + 1} (Step {global_step}): "
                     f"Train loss: {train_loss}; "
                     f"Val loss: {val_loss}")

        # Qualitative check after every epoch.
        generate_and_print_sample(model, tokenizer, device, start_context)

    return train_losses, val_losses

In [60]:
# Train for 10 epochs, evaluating every 5 optimisation steps on at most 5
# batches per loader. The returned (train_losses, val_losses) pair is the
# value of this cell.
train_model_simple(
    model,
    tokenizer,
    train_loader,
    val_loader,
    num_epochs=10,
    optimizer=optimizer,
    device=device,
    eval_freq=5,
    eval_iter=5,
    start_context="Hello, my name is"
)

Ep 1 (Step 0): Train loss: 9.799917221069336; Val loss: 10.494915962219238
Ep 1 (Step 5): Train loss: 7.4723076820373535; Val loss: 8.816893577575684
Output: Hello, my name is to be one of the axioms he had
Ep 2 (Step 10): Train loss: 4.401607990264893; Val loss: 7.378841876983643
Ep 2 (Step 15): Train loss: 3.3797054290771484; Val loss: 6.811258316040039
Output: Hello, my name is he was one of my host of my dear,
Ep 3 (Step 20): Train loss: 2.01564621925354; Val loss: 6.828524589538574
Ep 3 (Step 25): Train loss: 1.2992844581604004; Val loss: 6.848301410675049
Output: Hello, my name is to the sun a little too as a smile that
Ep 4 (Step 30): Train loss: 0.7146415710449219; Val loss: 7.095798969268799
Ep 4 (Step 35): Train loss: 0.3647560477256775; Val loss: 7.472814559936523
Output: Hello, my name is to work.
"quite insensible to the
Ep 5 (Step 40): Train loss: 0.2630985379219055; Val loss: 7.831353664398193
Output: Hello, my name is the inevitable garlanded frame. The mere outline
Ep 

NameError: name 'train_loses' is not defined