In [1]:
import os
import pickle
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
class AttentionHead(nn.Module):
    """One head of causal (masked) scaled dot-product self-attention.

    Each position attends only to itself and to earlier positions in the
    sequence; later positions are masked out before the softmax.

    Args:
        embedding_dim: Size of the last dimension of the input.
        head_dim: Size of the query/key/value projections (and of the output).
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(
        self,
        embedding_dim: int,
        head_dim: int,
        dropout: float
    ):
        super().__init__()

        # Separate learned projections for queries, keys and values
        self.query = nn.Linear(embedding_dim, head_dim)
        self.key = nn.Linear(embedding_dim, head_dim)
        self.value = nn.Linear(embedding_dim, head_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply masked self-attention: (B, T, E) -> (B, T, H)."""
        # Project the input three ways, each (B, T, E) -> (B, T, H)
        queries = self.query(x)
        keys = self.key(x)
        values = self.value(x)

        _, seq_len, head_dim = keys.shape

        # Similarity of every query with every key, scaled by sqrt(H)
        # (B, T, H) @ (B, H, T) -> (B, T, T)
        scores = torch.matmul(queries, keys.transpose(-1, -2))
        scores = scores / head_dim ** 0.5

        # Entries strictly above the diagonal correspond to future positions
        future = torch.triu(
            torch.ones(seq_len, seq_len, device=x.device),
            diagonal=1
        ).bool()
        scores = scores.masked_fill(future, float("-inf"))

        # Normalise over the key axis, then regularise the weights
        weights = self.dropout(scores.softmax(dim=-1))

        # Weighted sum of the values
        # (B, T, T) @ (B, T, H) -> (B, T, H)
        return torch.matmul(weights, values)

In [3]:
class MaskedMultiHeadAttention(nn.Module):
    """Several causal attention heads run in parallel, concatenated and projected.

    Each head has size ``embedding_dim // num_heads``; the head outputs are
    concatenated along the last dimension and passed through a linear layer
    followed by dropout.

    Args:
        embedding_dim: Size of the last dimension of the input and output.
        num_heads: Number of attention heads. Must be positive and divide
            ``embedding_dim`` exactly.
        dropout: Dropout probability, used inside each head and on the output.

    Raises:
        ValueError: If ``num_heads`` is not positive or does not divide
            ``embedding_dim``.
    """

    def __init__(
        self,
        embedding_dim: int,
        num_heads: int,
        dropout: float
    ):
        super(MaskedMultiHeadAttention, self).__init__()

        # Without this check the concatenated head outputs would have width
        # num_heads * (embedding_dim // num_heads) != embedding_dim, and the
        # output projection would fail with a shape error only at forward time.
        if num_heads <= 0:
            raise ValueError(f"num_heads must be a positive integer, got {num_heads}.")
        if embedding_dim % num_heads != 0:
            raise ValueError(
                f"embedding_dim ({embedding_dim}) must be divisible by num_heads ({num_heads})."
            )

        self.heads = nn.ModuleList([
            AttentionHead(
                embedding_dim,
                embedding_dim // num_heads,
                dropout
            ) for _ in range(num_heads)
        ])

        self.linear = nn.Linear(embedding_dim, embedding_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply multi-head masked self-attention to ``x``."""
        # Scaled dot-product attention per head, concatenated on the feature axis
        # (B, T, E)
        out = torch.cat([h(x) for h in self.heads], dim=-1)

        # Linear
        out = self.linear(out)

        # Dropout
        out = self.dropout(out)

        return out

In [4]:
class FeedForward(nn.Module):
    """Position-wise feed-forward network: Linear -> GELU -> Linear -> Dropout.

    The hidden layer is four times as wide as the input/output.

    Args:
        embedding_dim: Size of the last dimension of the input and output.
        dropout: Dropout probability applied to the network's output.
    """

    def __init__(
        self,
        embedding_dim: int,
        dropout: float
    ):
        super().__init__()

        hidden_dim = embedding_dim * 4

        # Expand, apply the non-linearity, then project back down
        expand = nn.Linear(embedding_dim, hidden_dim)
        activation = nn.GELU()
        contract = nn.Linear(hidden_dim, embedding_dim)
        self.feed_forward_network = nn.Sequential(expand, activation, contract)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Run ``x`` through the network and apply dropout to the result."""
        projected = self.feed_forward_network(x)
        return self.dropout(projected)

In [5]:
class DecoderBlock(nn.Module):
    """Pre-norm decoder block: attention and feed-forward, each with a residual.

    Layer normalisation is applied to the input of each sub-layer, and the
    sub-layer's output is added back onto its un-normalised input.

    Args:
        embedding_dim: Size of the last dimension of the input and output.
        num_heads: Number of attention heads.
        dropout: Dropout probability passed to both sub-layers.
    """

    def __init__(
        self,
        embedding_dim: int,
        num_heads: int,
        dropout: float
    ):
        super().__init__()

        # Attention sub-layer and the norm applied to its input
        self.multi_head_attention = MaskedMultiHeadAttention(embedding_dim, num_heads, dropout)
        self.layer_norm1 = nn.LayerNorm(embedding_dim)

        # Feed-forward sub-layer and the norm applied to its input
        self.feed_forward = FeedForward(embedding_dim, dropout)
        self.layer_norm2 = nn.LayerNorm(embedding_dim)

    def forward(self, x):
        """Apply both residual sub-layers to ``x``."""
        # Norm -> masked multi-head attention -> add
        attended = x + self.multi_head_attention(self.layer_norm1(x))

        # Norm -> feed forward -> add
        return attended + self.feed_forward(self.layer_norm2(attended))

In [6]:
class Transformer(nn.Module):
    """Decoder-only Transformer that maps token IDs to next-token logits.

    Token and learned position embeddings are summed, passed through dropout
    and a stack of ``DecoderBlock`` layers, normalised, and projected to
    vocabulary-sized logits.

    Args:
        vocab_size: Number of distinct token IDs.
        max_seq_len: Longest sequence the position embedding table supports.
        embedding_dim: Width of the embeddings and of every block.
        num_heads: Number of attention heads per block.
        num_layers: Number of decoder blocks.
        dropout: Dropout probability used throughout the model.
    """

    def __init__(
        self,
        vocab_size: int,
        max_seq_len: int,
        embedding_dim: int,
        num_heads: int,
        num_layers: int,
        dropout: float
    ):
        super(Transformer, self).__init__()

        self.token_embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim
        )
        self.position_embedding = nn.Embedding(
            num_embeddings=max_seq_len,
            embedding_dim=embedding_dim
        )
        self.dropout = nn.Dropout(dropout)

        self.decoder_blocks = nn.ModuleList([
            DecoderBlock(
                embedding_dim,
                num_heads,
                dropout
            ) for _ in range(num_layers)
        ])

        self.layer_norm = nn.LayerNorm(embedding_dim)
        self.lm_head = nn.Linear(embedding_dim, vocab_size)

    def forward(self, ids):
        """Compute logits for every position of ``ids``.

        Args:
            ids: Integer tensor of token IDs whose last dimension is the
                sequence length.

        Returns:
            Logits with the same leading dimensions as ``ids`` and a final
            dimension of size ``vocab_size``.

        Raises:
            ValueError: If the sequence is longer than the position embedding
                table (``max_seq_len``).
        """
        seq_len = ids.shape[-1]
        max_seq_len = self.position_embedding.num_embeddings
        # Fail early with a readable message; otherwise the position lookup
        # indexes past the end of the embedding table.
        if seq_len > max_seq_len:
            raise ValueError(
                f"Sequence length ({seq_len}) exceeds max_seq_len ({max_seq_len})."
            )

        token_embedding = self.token_embedding(ids)
        # Positions 0..seq_len-1, broadcast across the batch when added below
        positional_encoding = self.position_embedding(
            torch.arange(0, seq_len, device=ids.device)
        )
        x = token_embedding + positional_encoding
        x = self.dropout(x)

        for block in self.decoder_blocks:
            x = block(x)

        x = self.layer_norm(x)
        logits = self.lm_head(x)

        return logits

In [7]:
class ShakespeareGPT:
    def __init__(
        self,
        vocab_size: int,
        max_seq_len: int = 256,
        batch_size: int = 64,
        embedding_dim: int = 384,
        num_heads: int = 6,
        num_layers: int = 6,
        dropout: float = 0.2,
        optimizer_cls: type[torch.optim.Optimizer] = torch.optim.AdamW,
        learning_rate: float = 3e-4,
        optimizer_kwargs: dict = {},
        output_dir: str | None = "results",
        device: str = "auto"
    ):
        self.vocab_size = vocab_size
        self.max_seq_len = max_seq_len
        self.batch_size = batch_size
        self.optimizer_cls = optimizer_cls
        self.learning_rate = learning_rate
        self.optimizer_kwargs = optimizer_kwargs
        self.output_dir = output_dir
        self.device = torch.device("cuda" if device != "cpu" and torch.cuda.is_available() else "cpu")
        
        self.model = Transformer(
            vocab_size,
            max_seq_len,
            embedding_dim,
            num_heads,
            num_layers,
            dropout
        ).to(self.device)

    def train(
        self,
        train_data: torch.Tensor,
        eval_data: torch.Tensor,
        train_steps: int = 5000,
        eval_freq: int = 500,
        load_best_model_at_end: bool = False
    ) -> None:
        # If it is required to load the best model at the end, ensure output_dir is specified to save model checkpoints
        if load_best_model_at_end:
            assert self.output_dir is not None, \
                "output_dir must be specified to save model checkpoints in order to load the best model at the end of training."
            
        # Send data to GPU
        train_data = train_data.to(self.device)
        eval_data = eval_data.to(self.device)

        optimizer = self.optimizer_cls(
            self.model.parameters(),
            lr=self.learning_rate,
            **self.optimizer_kwargs
        )

        best_val_loss = float("inf")
        best_val_loss_step = None

        for step in tqdm(range(train_steps)):
            # Get batch of data
            x, y = self._get_batch(train_data)

            # Forward pass
            logits = self.model(x)
            loss = F.cross_entropy(
                logits.reshape(-1, self.vocab_size),
                y.reshape(-1)
            )

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if (step + 1) % eval_freq == 0:
                # Evaluate the model
                train_loss = self.evaluate(train_data)
                val_loss = self.evaluate(eval_data)
                print(f"Step: {step + 1} - Training loss: {train_loss:.4f} - Validation loss: {val_loss:.4f}")

                if self.output_dir is not None:
                    # Save model checkpoint
                    self.save(save_path=os.path.join(self.output_dir, f"checkpoint-{step + 1}", "model_weights.pth"))

                    if val_loss < best_val_loss:
                        best_val_loss = val_loss
                        best_val_loss_step = step + 1

        if load_best_model_at_end and best_val_loss_step is not None:
            self.model.load_state_dict(torch.load(
                os.path.join(self.output_dir, f"checkpoint-{best_val_loss_step}", "model_weights.pth"),
                weights_only=True
            ))

    @torch.no_grad()
    def evaluate(
        self,
        data: torch.Tensor
    ) -> float:
        # Send data to GPU
        data = data.to(self.device)
        
        # Set model to evaluation mode
        self.model.eval()

        # Tensor to store the loss from each batch
        losses = torch.zeros(100)

        for i in range(100):
            # Get batch of data
            x, y = self._get_batch(data)

            # Forward pass
            logits = self.model(x)
            loss = F.cross_entropy(
                logits.reshape(-1, self.vocab_size),
                y.reshape(-1)
            )

            # Store the loss from the batch
            losses[i] = loss.item()

        # Calculate mean loss
        loss = losses.mean().item()

        # Set model back to training mode
        self.model.train()

        return loss
    
    @torch.no_grad()
    def generate(
        self,
        input_ids: torch.Tensor,
        max_new_tokens: int = 1000,
        do_sample: bool = False
    ) -> torch.Tensor:
        # input_ids can be either 1D (seq_len,) or 2D (batch_size, seq_len)
        assert input_ids.ndim in (1, 2), \
            "input_ids must be a 1D or 2D tensor."
        # If 1D, ensure there is at least 1 element and add batch dimension
        if input_ids.ndim == 1:
            assert input_ids.shape[0] > 0, \
                "input_ids must contain at least 1 element."
            input_ids = input_ids.unsqueeze(dim=0)
        # If 2D, ensure both dimensions are greater than 0
        else:
            assert input_ids.shape[0] > 0, \
                "Batch size (dim=0) must be greater than 0."
            assert input_ids.shape[1] > 0, \
                "Sequence length (dim=1) must be greater than 0."
        # Send to GPU
        input_ids = input_ids.to(self.device) # (B, t)

        # Set model to evaluation mode
        self.model.eval()

        for _ in range(max_new_tokens):
            # The model's input will be the last max_seq_len tokens of each sequence
            x = input_ids[:, -self.max_seq_len:] # (B, T)
            # Get model predictions
            logits = self.model(x)[:, -1, :] # (B, C)
            if do_sample:
                # Get probability distributions
                probs = torch.softmax(logits, dim=-1) # (B, C)
                # Sample from the distributions
                ids = torch.multinomial(probs, num_samples=1) # (B, 1)
            else:
                # Get the indexes with the highest probabilities
                ids = torch.argmax(logits, dim=-1, keepdim=True) # (B, 1)
            # Append sampled indexes to the running sequences
            input_ids = torch.cat((input_ids, ids), dim=-1) # (B, t + 1)

        # Set model back to training mode
        self.model.train()
        
        return input_ids.cpu()
    
    def save(
        self,
        save_path: str | None = None
    ) -> None:
        if save_path is None:
            # If save_path is not provided, derive from output_dir
            assert self.output_dir is not None, \
                "As output_dir is not specified, you must provide a valid save_path to save the model."
            save_path = os.path.join(self.output_dir, "model_weights.pth")

        # Create save directory if it doesnt exist
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        
        # Save model weights
        torch.save(self.model.state_dict(), f=save_path)

    def load_model(
        self,
        save_path: str | None = None
    ) -> None:
        if save_path is None:
            # If save_path is not provided, derive from output_dir
            assert self.output_dir is not None, \
                "As output_dir is not specified, you must provide a valid save_path to load the tokenizer."            
            save_path = os.path.join(self.output_dir, "model_weights.pth")

        # Ensure save_path is valid
        assert os.path.exists(save_path), \
            f"save_path '{save_path}' does not exist. Please provide a valid path."
        
        # Load model weights
        self.model.load_state_dict(torch.load(save_path, map_location=self.device))

    def _get_batch(
        self,
        data: torch.Tensor
    ) -> tuple[torch.Tensor, torch.Tensor]:
        # Starting index of each batch
        start_ids = torch.randint(
            low=0,
            high=len(data) - self.max_seq_len,
            size=(self.batch_size,)
        )

        # Input sequences (max_seq_len tokens starting from each of the start_ids)
        x = torch.stack(
            [data[i:i + self.max_seq_len] for i in start_ids]
        ).to(self.device)
        # Target sequences (input sequences shifted right by one token)
        y = torch.stack(
            [data[i + 1:i + self.max_seq_len + 1] for i in start_ids]
        ).to(self.device)

        return x, y

In [8]:
class CharacterLevelTokenizer:
    def __init__(
        self,
        special_tokens: list[str] = [],
        output_dir: str | None = "results"
    ):
        self.special_tokens = special_tokens
        self.vocab_size = len(special_tokens)
        self.char2idx = {c: i for i, c in enumerate(special_tokens)}
        self.idx2char = {i: c for c, i in self.char2idx.items()}

        self.output_dir = output_dir
        if output_dir is not None:
            os.makedirs(output_dir, exist_ok=True)

    def train(
        self,
        text: str
    ) -> None:
        # Get all unique characters in the text
        chars = sorted(list(set(text)))

        # Add all unique characters to the vocabulary
        self.vocab_size = len(self.special_tokens) + len(chars)
        self.char2idx = {c: i for i, c in enumerate(self.special_tokens + chars)}
        self.idx2char = {i: c for c, i in self.char2idx.items()}

    def encode(
        self,
        text: str,
        return_tensor: bool = False
    ) -> list[int] | torch.Tensor:
        ids = []

        for char in text:
            if char in self.char2idx:
                # Append corresponding ID to the list if the character is in the vocabulary
                ids.append(self.char2idx[char])
            else:
                # Skip the character if it is not in the vocabulary
                print(f"Warning: Skipping the character '{char}' as it is not in the vocabulary.")
        
        # Convert to tensor if required
        if return_tensor:
            return torch.tensor(ids, dtype=torch.int64)
        
        return ids
    
    def decode(
        self,
        ids: list[int] | torch.Tensor
    ) -> str:
        # Convert ids to a list if it is a tensor
        if isinstance(ids, torch.Tensor):
            ids = ids.tolist()

        string = ""

        for idx in ids:
            # Skip special tokens
            if idx >= len(self.special_tokens):
                if idx in self.idx2char:
                    # Append corresponding character to the string if the ID is in the vocabulary
                    string += self.idx2char[idx]
                else:
                    # Skip if the ID is not in the vocabulary
                    print(f"Warning: Skipping the ID {idx} as it is not in the vocabulary.")
        
        return string
    
    def save(
        self,
        save_path: str | None = None
    ) -> None:
        if save_path is None:
            # If save_path is not provided, derive from output_dir
            assert self.output_dir is not None, \
                "As output_dir is not specified, you must provide a valid save_path to save the tokenizer."            
            save_path = os.path.join(self.output_dir, "tokenizer.pkl")

        # Create save directory if it doesnt exist
        os.makedirs(os.path.dirname(save_path), exist_ok=True)

        # Save tokenizer state to a pickle file
        with open(save_path, "wb") as f:
            pickle.dump({
                "special_tokens": self.special_tokens,
                "vocab_size": self.vocab_size,
                "char2idx": self.char2idx,
                "idx2char": self.idx2char
            }, f)

    def load(
        self,
        save_path: str | None = None
    ) -> None:
        if save_path is None:
            # If save_path is not provided, derive from output_dir
            assert self.output_dir is not None, \
                "As output_dir is not specified, you must provide a valid save_path to load the tokenizer."            
            save_path = os.path.join(self.output_dir, "tokenizer.pkl")

        # Ensure save_path is valid
        assert os.path.exists(save_path), \
            f"save_path '{save_path}' does not exist. Please provide a valid path."

        with open(save_path, "rb") as f:
            # Load tokenizer state from a pickle file
            data = pickle.load(f)

            # Set tokenizer attributes
            self.special_tokens = data["special_tokens"]
            self.vocab_size = data["vocab_size"]
            self.char2idx = data["char2idx"]
            self.idx2char = data["idx2char"]

In [9]:
# Location of the training corpus. Set the TINY_SHAKESPEARE_PATH environment
# variable to point at a local copy; the fallback is the path this notebook
# was originally run with.
DATA_PATH = os.environ.get(
    "TINY_SHAKESPEARE_PATH",
    r"D:\Datasets\Tiny-Shakespeare\All.txt"
)

# errors="replace" substitutes U+FFFD for undecodable bytes instead of raising
with open(
    DATA_PATH,
    "r",
    encoding="utf-8",
    errors="replace"
) as f:
    text = f.read()

In [10]:
# Build a character-level vocabulary from the corpus; "<UNK>" takes ID 0
tokenizer = CharacterLevelTokenizer(special_tokens=["<UNK>"], output_dir="results")
tokenizer.train(text)

In [11]:
# Seed PyTorch's random number generator so that weight initialisation, batch
# sampling and text sampling are repeatable under Restart & Run All
torch.manual_seed(1337)

# Character-level GPT sized to the tokenizer's vocabulary
model = ShakespeareGPT(
    vocab_size=tokenizer.vocab_size,
    max_seq_len=256,
    batch_size=64,
    embedding_dim=384,
    num_heads=6,
    num_layers=6,
    dropout=0.2,
    optimizer_cls=torch.optim.AdamW,
    learning_rate=3e-4,
    output_dir="results"
)

In [12]:
# Encode the whole corpus into a 1-D tensor of token IDs
data = tokenizer.encode(text, return_tensor=True)

# The first 90% of the tokens are used for training, the remainder for evaluation
train_end_index = int(len(data) * 0.9)
train_data, eval_data = data[:train_end_index], data[train_end_index:]

In [13]:
# Validation loss before any training
print(model.evaluate(data=eval_data))

4.375035762786865


In [14]:
# Train for 5000 steps, evaluating and checkpointing every 500 steps, then
# reload the checkpoint with the lowest validation loss
model.train(
    train_data=train_data,
    eval_data=eval_data,
    train_steps=5000,
    eval_freq=500,
    load_best_model_at_end=True,
)

 10%|█         | 501/5000 [02:19<4:08:49,  3.32s/it]

Step: 500 - Training loss: 1.9806 - Validation loss: 2.0663


 20%|██        | 1001/5000 [04:39<3:41:13,  3.32s/it]

Step: 1000 - Training loss: 1.6008 - Validation loss: 1.7737


 30%|███       | 1501/5000 [06:58<3:14:00,  3.33s/it]

Step: 1500 - Training loss: 1.4423 - Validation loss: 1.6476


 40%|████      | 2001/5000 [09:18<2:46:22,  3.33s/it]

Step: 2000 - Training loss: 1.3432 - Validation loss: 1.5751


 50%|█████     | 2501/5000 [11:38<2:18:43,  3.33s/it]

Step: 2500 - Training loss: 1.2798 - Validation loss: 1.5273


 60%|██████    | 3001/5000 [13:58<1:50:40,  3.32s/it]

Step: 3000 - Training loss: 1.2267 - Validation loss: 1.4892


 70%|███████   | 3501/5000 [16:17<1:23:06,  3.33s/it]

Step: 3500 - Training loss: 1.1800 - Validation loss: 1.4803


 80%|████████  | 4001/5000 [18:37<55:23,  3.33s/it]  

Step: 4000 - Training loss: 1.1440 - Validation loss: 1.4767


 90%|█████████ | 4501/5000 [20:57<27:42,  3.33s/it]

Step: 4500 - Training loss: 1.1060 - Validation loss: 1.4683


100%|██████████| 5000/5000 [23:17<00:00,  3.58it/s]

Step: 5000 - Training loss: 1.0736 - Validation loss: 1.4723





In [15]:
# Validation loss of the model left in place after training
print(model.evaluate(data=eval_data))

1.468522071838379


In [16]:
# Persist the weights and the tokenizer to their default paths under output_dir
model.save(save_path=None)
tokenizer.save(save_path=None)

In [17]:
# Sample 5000 new characters, starting from a single newline as the prompt
prompt = "\n"

input_ids = tokenizer.encode(prompt, return_tensor=True)
generated_ids = model.generate(
    input_ids=input_ids,
    max_new_tokens=5000,
    do_sample=True,
)
# generate() returns a batch; decode its first (and only) sequence
generated = tokenizer.decode(generated_ids[0])
print(generated)


Where three, if this, they were to God's tent.

First Gentleman:
'Honest man, be perhaps, and not to't.
Why, sir is my own replies; there's throw I am love,
But they have deserved with him marriage
And look your own ears for pardon but royalty.
This tender seems pretised with our foes,
Envire Angelo, in good fellows and beague.
To be call'd fearful a fellow tomorrow.

RATCLIFF:
I say, kind what a horse! Why fool!

KING RICHARD II:
Ay, I am content to the way?

RATCLIFF:

KING RICHARD III:
Do, sweet me, desperate, of you must die,
That makes no gentleman weight me, never are yet to you;
And farewell long by this hourd in my law
Not death that blest you and still you.

DUCHESS OF YORK:
Ay, so your brother's member: a gate of wonder,
To put you take an one about to proclaim.

YORK:
You may fought of the prayers may wive your hands;
And three better than ever you should not
Confirm proved warwick.

DY CAPULET:
What off?

Nurse:
You pray to be not here?
She was reverend our honour in a gen

In [18]:
# Continue a hand-written scene opening, using generate()'s default length
prompt = "Enter two rivals at dawn.\n\nFirst Rival:"

input_ids = tokenizer.encode(prompt, return_tensor=True)
generated_ids = model.generate(input_ids=input_ids, do_sample=True)
# generate() returns a batch; decode its first (and only) sequence
generated = tokenizer.decode(generated_ids[0])
print(generated)

Enter two rivals at dawn.

First Rival:
Then can I love to Braint, his grace as Paulina blood
Have they made for the royal that will begin too.

Third Citizen:
Beseech you, my lord?

TYRREL:
And heart your highness so ripe is no man's.
Patiently, and then: even you have leave the heart the law,
Even all now you would grow our boy, we pray,
The tribune, and this oient will seek up you.

First Murderer:
What is the kind he would be could as lie,
And there's heaven mean, being kingdom with him.

CLARENCE:
How soft with his amendam?

CLARENCE:
What's the news? and who callsh wither?

AMILLIUS:
Which wants the right of time?

HASTINGS:
And gives one here in Christon.

YORK:
Music is true service and Nessful and poison;
For every old man she may sprow to ourselves.

SICINIUS:
For this matter, peace!

CORIOLANUS:
If it be not apprehended: come, sir.

First Gentleman:
For the lark of Margaret and Clarence
Tybalt, Isabel; a Montague many
That whose fellows inferior tears have wround fought
By w