In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Pick the compute device once for the whole notebook: CUDA when PyTorch
# reports it as available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
from pathlib import Path

# Read the whole corpus into memory. The encoding is explicit so the result
# does not depend on the platform's default text encoding.
text = Path("../../../data/tiny-shakespeare.txt").read_text(encoding="utf-8")

In [2]:
# Preview the first 1000 characters of the corpus.
print(text[0:1000])

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

All:
We know't, we know't.

First Citizen:
Let us kill him, and we'll have corn at our own price.
Is't a verdict?

All:
No more talking on't; let it be done: away, away!

Second Citizen:
One word, good citizens.

First Citizen:
We are accounted poor citizens, the patricians good.
What authority surfeits on would relieve us: if they
would yield us but the superfluity, while it were
wholesome, we might guess they relieved us humanely;
but they think we are too dear: the leanness that
afflicts us, the object of our misery, is as an
inventory to particularise their abundance; our
sufferance is a gain to them Let us revenge this with
our pikes, ere we become rakes: for the gods know I
speak this in hunger for bread, not in thirst for revenge.



In [None]:
class CharTokenizer:
    """Character-level tokenizer: every distinct character gets one integer id."""

    def __init__(self, vocabulary):
        """Build both lookup tables from an ordered collection of characters.

        The position of a character in ``vocabulary`` becomes its token id.
        """
        self.token_id_for_char = {
            symbol: index for index, symbol in enumerate(vocabulary)
        }
        self.char_for_token_id = dict(enumerate(vocabulary))

    @staticmethod
    def train_from_text(text):
        """Create a tokenizer whose vocabulary is the sorted set of characters in ``text``."""
        return CharTokenizer(sorted(set(text)))

    def encode(self, text):
        """Return a 1-D ``torch.long`` tensor holding one token id per character.

        Raises ``KeyError`` for a character that is not in the vocabulary.
        """
        lookup = self.token_id_for_char
        return torch.tensor([lookup[symbol] for symbol in text], dtype=torch.long)

    def decode(self, token_ids):
        """Turn a tensor of token ids back into the string they spell."""
        lookup = self.char_for_token_id
        return "".join(lookup[index] for index in token_ids.tolist())

    def vocabulary_size(self):
        """Number of distinct characters known to the tokenizer."""
        return len(self.token_id_for_char)

In [None]:
# Build the character vocabulary from the full corpus.
tokenizer = CharTokenizer.train_from_text(text)

In [None]:
# Round-trip check: encoding then decoding should reproduce the input string.
print(tokenizer.encode("Hello world"))
print(tokenizer.decode(tokenizer.encode("Hello world")))

In [None]:
# Number of distinct characters found in the corpus.
print(f"Vocabulary size: {tokenizer.vocabulary_size()}")

In [None]:
from torch.utils.data import Dataset


class TokenIdsDataset(Dataset):
    """Sliding-window dataset over a sequence of token ids.

    Item ``pos`` is the pair ``(x, y)``: ``x`` is the window of ``block_size``
    tokens starting at ``pos`` and ``y`` is the same window shifted one token
    to the right, i.e. the next-token target for every position of ``x``.
    """

    def __init__(self, data, block_size):
        """Store the token sequence and the window length.

        Args:
            data: Sequence of token ids supporting ``len`` and slicing.
            block_size: Number of tokens in every input/target window.
        """
        self.data = data
        self.block_size = block_size

    def __len__(self):
        """Number of complete windows; 0 when the data is too short for one."""
        # Clamp at zero: len() must never be asked to report a negative size.
        return max(0, len(self.data) - self.block_size)

    def __getitem__(self, pos):
        """Return the ``(input, target)`` windows starting at ``pos``.

        Negative positions count from the end, as with built-in sequences.

        Raises:
            IndexError: If ``pos`` is outside ``[-len(self), len(self))``.
        """
        size = len(self)
        if pos < 0:
            pos += size
        # An explicit IndexError (instead of an assert) is the exception the
        # sequence protocol expects for a bad index, and it is not stripped
        # when Python runs with -O.
        if not 0 <= pos < size:
            raise IndexError(f"position {pos} is out of range for {size} windows")

        x = self.data[pos : pos + self.block_size]
        y = self.data[pos + 1 : pos + 1 + self.block_size]
        return x, y

In [None]:
# Model hyperparameters shared by every module defined below.
config = {
    # Number of distinct tokens; sizes the token embedding and the output layer.
    "vocabulary_size": tokenizer.vocabulary_size(),
    # Longest sequence the model handles; sizes the causal mask and the
    # positional embedding table, and is the dataset window length.
    "context_size": 256,
    # Width of every token/position embedding vector.
    "d_embed": 768,
    # Number of attention heads per block.
    "heads_num": 12,
    # Number of transformer blocks stacked in the model.
    "layers_num": 10,
    # Dropout probability used in the attention and feed-forward modules.
    "dropout_rate": 0.1,
    # Whether the Q/K/V projections of each attention head carry a bias term.
    "use_bias": False,
}

# Each head works on an equal slice of the embedding width (integer division).
config["head_size"] = config["d_embed"] // config["heads_num"]

In [None]:
class AttentionHead(nn.Module):
    """One causal self-attention head.

    Projects the input to queries, keys and values of width ``head_size``,
    lets every position attend only to itself and earlier positions, and
    returns the attention-weighted sum of the values.
    """

    def __init__(self, config):
        super().__init__()
        d_embed = config["d_embed"]
        head_size = config["head_size"]
        use_bias = config["use_bias"]

        self.Q_weights = nn.Linear(d_embed, head_size, use_bias)
        self.K_weights = nn.Linear(d_embed, head_size, use_bias)
        self.V_weights = nn.Linear(d_embed, head_size, use_bias)

        self.dropout = nn.Dropout(config["dropout_rate"])

        # Lower-triangular matrix of ones: row i marks the positions (<= i)
        # that token i may look at. The buffer name (including its spelling)
        # is part of the module's state, so it is kept as is.
        context_size = config["context_size"]
        self.register_buffer(
            "casual_attention_mask",
            torch.tril(torch.ones(context_size, context_size)),
        )

    def forward(self, input):
        """Apply masked attention to ``input`` of shape (batch, tokens, d_embed)."""
        _, tokens_num, _ = input.shape

        queries = self.Q_weights(input)
        keys = self.K_weights(input)
        values = self.V_weights(input)

        # Positions where the mask is 0 lie in the future and get -inf, so
        # softmax assigns them zero weight.
        blocked = self.casual_attention_mask[:tokens_num, :tokens_num] == 0
        raw_scores = (queries @ keys.transpose(1, 2)).masked_fill(blocked, -torch.inf)

        # Scale by sqrt(head width) before normalising each row.
        weights = torch.softmax(raw_scores / (keys.shape[-1] ** 0.5), dim=-1)

        return self.dropout(weights) @ values

In [None]:
# Random batch shaped (8, context_size, d_embed) to smoke-test the attention head.
input = torch.rand(8, config["context_size"], config["d_embed"])

In [None]:
# Instantiate a single attention head with the shared configuration.
ah = AttentionHead(config)

In [None]:
# Run the random batch through the head.
output = ah(input)

In [None]:
# Expected: (8, context_size, head_size) - one head_size-wide vector per token.
output.shape

In [None]:
class MultiHeadAttention(nn.Module):
    """Runs ``heads_num`` attention heads on the same input and mixes their outputs.

    The per-head results are concatenated along the feature dimension, passed
    through one linear layer, and regularised with dropout.
    """

    def __init__(self, config):
        super().__init__()

        self.heads = nn.ModuleList(
            [AttentionHead(config) for _ in range(config["heads_num"])]
        )

        d_embed = config["d_embed"]
        self.linear = nn.Linear(d_embed, d_embed)
        self.dropout = nn.Dropout(config["dropout_rate"])

    def forward(self, input):
        """Return the mixed attention output for ``input``."""
        # Every head sees the full input; results are joined feature-wise.
        per_head = tuple(head(input) for head in self.heads)
        mixed = self.linear(torch.cat(per_head, dim=-1))
        return self.dropout(mixed)

In [None]:
# Instantiate the multi-head attention module with the shared configuration.
mha = MultiHeadAttention(config)

In [None]:
# Fresh random batch shaped (8, context_size, d_embed) for the multi-head test.
input = torch.rand(8, config["context_size"], config["d_embed"])

In [None]:
# Run the random batch through all heads and the mixing layer.
output = mha(input)

In [None]:
# Expected: (8, context_size, d_embed) - the mixing layer maps back to d_embed.
output.shape

In [None]:
class FeedForward(nn.Module):
    """Position-wise MLP: widen to 4x ``d_embed``, apply GELU, project back, dropout."""

    def __init__(self, config):
        super().__init__()

        d_embed = config["d_embed"]
        hidden_size = d_embed * 4

        # Built in the order they are applied.
        expand = nn.Linear(d_embed, hidden_size)
        activation = nn.GELU()
        project = nn.Linear(hidden_size, d_embed)
        dropout = nn.Dropout(config["dropout_rate"])

        self.linear_layers = nn.Sequential(expand, activation, project, dropout)

    def forward(self, input):
        """Apply the MLP to ``input``; the last dimension must be ``d_embed``."""
        return self.linear_layers(input)

In [None]:
# Instantiate the feed-forward module with the shared configuration.
ff = FeedForward(config)

In [None]:
# Fresh random batch shaped (8, context_size, d_embed) for the feed-forward test.
input = torch.rand(8, config["context_size"], config["d_embed"])

In [None]:
# Store the result under `output` so the shape check in the next cell inspects
# this module's result rather than a tensor left over from an earlier cell.
output = ff(input)

In [None]:
# Expected: (8, context_size, d_embed).
output.shape

In [None]:
class Block(nn.Module):
    """Transformer block: attention then feed-forward, each normalised first
    and wrapped in a residual connection."""

    def __init__(self, config):
        super().__init__()
        d_embed = config["d_embed"]

        self.multi_head = MultiHeadAttention(config)
        self.layer_norm_1 = nn.LayerNorm(d_embed)

        self.feed_forward = FeedForward(config)
        self.layer_norm_2 = nn.LayerNorm(d_embed)

    def forward(self, input):
        """Return the block output; same shape as ``input``."""
        # Attention sub-layer: normalise, attend, add the untouched input back.
        attended = self.multi_head(self.layer_norm_1(input)) + input
        # Feed-forward sub-layer, same normalise-then-add pattern.
        return self.feed_forward(self.layer_norm_2(attended)) + attended

In [None]:
# Instantiate one transformer block with the shared configuration.
b = Block(config)

In [None]:
# Store the result under `output` so the shape check in the next cell inspects
# the block's result rather than a tensor left over from an earlier cell.
output = b(input)

In [None]:
# Expected: (8, context_size, d_embed).
output.shape

In [None]:
class DemoGPT(nn.Module):
    """Small GPT-style language model.

    Token and position embeddings are summed, passed through ``layers_num``
    transformer blocks and a final layer norm, then projected to one logit per
    vocabulary entry.
    """

    def __init__(self, config):
        super().__init__()

        self.token_embedding_layer = nn.Embedding(
            config["vocabulary_size"], config["d_embed"]
        )
        self.positional_embedding_layer = nn.Embedding(
            config["context_size"], config["d_embed"]
        )

        blocks = [Block(config) for _ in range(config["layers_num"])]
        self.layers = nn.Sequential(*blocks)

        self.layer_norm = nn.LayerNorm(config["d_embed"])
        self.unembedding = nn.Linear(
            config["d_embed"], config["vocabulary_size"], bias=False
        )

    def forward(self, token_ids):
        """Compute next-token logits.

        Args:
            token_ids: Integer tensor of shape (batch, tokens). ``tokens`` must
                not exceed ``context_size``, the number of rows in the
                positional embedding table.

        Returns:
            Tensor of shape (batch, tokens, vocabulary_size).
        """
        _, tokens_num = token_ids.shape

        x = self.token_embedding_layer(token_ids)
        # Build the position ids on the same device as the input so the model
        # works wherever it has been moved, independent of any global setting.
        sequence = torch.arange(tokens_num, device=token_ids.device)
        x = x + self.positional_embedding_layer(sequence)

        x = self.layers(x)
        x = self.layer_norm(x)
        x = self.unembedding(x)

        return x

In [None]:
# Build the model and move its parameters to the selected device.
model = DemoGPT(config).to(device)

In [None]:
# Smoke test: encode "Hi", add a batch dimension, and run one forward pass.
output = model(tokenizer.encode("Hi").unsqueeze(dim=0).to(device))

In [None]:
# Expected: (1, 2, vocabulary_size) - one logit vector for each of the 2 characters.
output.shape

In [None]:
def generate(model, prompt_ids, max_tokens, context_size=None):
    """Autoregressively sample up to ``max_tokens`` new token ids.

    Args:
        model: Callable mapping ids of shape (batch, tokens) to logits of
            shape (batch, tokens, vocabulary).
        prompt_ids: Integer tensor of shape (batch, tokens) to continue.
        max_tokens: Upper bound on the number of tokens to append.
        context_size: Longest sequence that may be fed to the model. When
            omitted, the notebook-level ``config["context_size"]`` is used.

    Returns:
        ``prompt_ids`` with the sampled tokens appended along the last
        dimension. Generation stops early once the sequence length reaches
        ``context_size``.
    """
    if context_size is None:
        context_size = config["context_size"]

    output_ids = prompt_ids
    for _ in range(max_tokens):
        if output_ids.shape[1] >= context_size:
            break
        with torch.no_grad():
            logits = model(output_ids)

        # Only the logits at the last position predict the next token.
        probs = F.softmax(logits[:, -1, :], dim=-1)
        # Sample a random token given the softmax distribution
        next_token_id = torch.multinomial(probs, num_samples=1)
        # Add new token to the output, and repeat the process
        output_ids = torch.cat([output_ids, next_token_id], dim=-1)
    return output_ids

In [None]:
def generate_with_prompt(model, tokenizer, prompt, max_tokens=100):
    """Continue a text prompt with the model and return the result as a string.

    The model is switched to evaluation mode and stays in it afterwards. The
    returned text contains the prompt followed by the sampled continuation.
    """
    model.eval()

    # Encode, add a batch dimension of 1, and move to the notebook's device.
    prompt_ids = tokenizer.encode(prompt).unsqueeze(dim=0).to(device)
    generated_ids = generate(model, prompt_ids, max_tokens=max_tokens)

    # Decode the single sequence in the batch.
    return tokenizer.decode(generated_ids[0])

In [None]:
# Sample from the model as it currently stands (no training has run above this cell).
generate_with_prompt(model, tokenizer, "First Citizen:\n")

In [None]:
# Number of sequences per batch in the data loaders.
batch_size = 64

# Number of training batches drawn by the training sampler.
train_iterations = 500
# Validation and sample generation run every this many training steps.
evaluation_interval = 10
# Learning rate passed to the optimizer.
learning_rate = 4e-4
# Fraction of the tokens used for training; the rest is for validation.
train_split = 0.9

In [None]:
# Step 1 - Split Data into Training and Validation Dataset

tokenized_text = tokenizer.encode(text).to(device)
# The first `train_split` share of the tokens is used for training; the
# remainder is held out for validation. The split is by position, so the two
# parts do not overlap.
train_tokens_num = int(train_split * len(tokenized_text))
train_data = tokenized_text[:train_tokens_num]
validation_data = tokenized_text[train_tokens_num:]

In [None]:
# Step 2 - Create Validation Dataset

# Both datasets serve windows of context_size tokens with next-token targets.
train_dataset = TokenIdsDataset(train_data, config["context_size"])
validation_dataset = TokenIdsDataset(validation_data, config["context_size"])

In [None]:
# Step 3 - Create Validation DataLoader

from torch.utils.data import Dataset, DataLoader, RandomSampler

# Draw batch_size * train_iterations random windows (with replacement), so the
# training loader yields exactly train_iterations full batches.
train_sampler = RandomSampler(
    train_dataset, num_samples=batch_size * train_iterations, replacement=True
)
train_dataloader = DataLoader(
    train_dataset, batch_size=batch_size, sampler=train_sampler
)

# Validation batches are sampled at random as well, using the same batch size.
validation_sampler = RandomSampler(validation_dataset, replacement=True)
validation_dataloader = DataLoader(
    validation_dataset, batch_size=batch_size, sampler=validation_sampler
)

In [None]:
# Step 4 - Calculate Validation Loss


# Compute validation loss for the model using "batches_num" batches
# from the validation data loader
@torch.no_grad()
def calculate_validation_loss(model, batches_num, dataloader=None):
    """Average cross-entropy loss of ``model`` over ``batches_num`` batches.

    Args:
        model: Module mapping a batch of token ids to logits whose last
            dimension has one entry per vocabulary token.
        batches_num: Number of batches to average over; must be positive.
        dataloader: Iterable of ``(inputs, targets)`` batches. When omitted,
            the notebook-level ``validation_dataloader`` is used.

    Returns:
        The mean loss over the evaluated batches as a Python float.

    Raises:
        ValueError: If ``batches_num`` is not positive.

    Note:
        The model is switched to evaluation mode and left in it.
    """
    if batches_num <= 0:
        raise ValueError("batches_num must be positive")
    if dataloader is None:
        dataloader = validation_dataloader

    model.eval()
    total_loss = 0.0

    validation_iter = iter(dataloader)

    for _ in range(batches_num):
        try:
            idx, targets = next(validation_iter)
        except StopIteration:
            # The loader ran out before enough batches were seen: start over.
            validation_iter = iter(dataloader)
            idx, targets = next(validation_iter)

        logits = model(idx)

        # cross_entropy wants (N, classes) logits and (N,) targets, so the
        # batch and token dimensions are flattened together.
        logits_view = logits.view(-1, logits.size(-1))
        targets_view = targets.view(-1)

        loss = F.cross_entropy(logits_view, targets_view)

        # item() converts the 0-d loss tensor to a plain Python number.
        total_loss += loss.item()

    average_loss = total_loss / batches_num

    return average_loss

In [None]:
# AdamW over all model parameters, using the learning rate set above.
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

In [None]:
# Step 5 - Update the Training Loop

import os
from IPython.display import display, clear_output
from matplotlib import pyplot as plt
from IPython.display import display
import ipywidgets as widgets
%matplotlib inline

# Dedicated output widget for the loss plot, so the plot can be cleared and
# redrawn without touching the text printed by the training loop.
plot_output = widgets.Output()

display(plot_output)

def update_plot(train_losses, train_steps, validation_losses, validation_steps):
  """Redraw the training and validation loss curves inside ``plot_output``.

  Args:
    train_losses: Training loss values, one per entry of ``train_steps``.
    train_steps: Step numbers at which the training losses were recorded.
    validation_losses: Validation loss values, one per entry of ``validation_steps``.
    validation_steps: Step numbers at which the validation losses were recorded.
  """
  with plot_output:
    clear_output(wait=True)  # Clear only the plot output, not the text
    plt.figure(figsize=(7, 5))
    plt.plot(train_steps, train_losses, label='Training Loss')
    plt.plot(validation_steps, validation_losses, label='Validation Loss')
    plt.title('Training and Validation Loss')
    # The x values are training step numbers, not epochs.
    plt.xlabel('Step')
    plt.ylabel('Loss')
    plt.legend(loc='center left')
    plt.grid(True)
    plt.show()


# Set up lists to store losses for plotting
train_losses = []
train_steps = []
eval_losses = []
eval_steps = []


for step_num, sample in enumerate(train_dataloader):

  # Generation and validation below switch the model to eval mode, so training
  # mode is re-enabled at the start of every step.
  model.train()
  inputs, targets = sample
  logits = model(inputs)

  # Flatten batch and token dimensions for cross_entropy; -1 lets PyTorch
  # infer the row count, so a batch of any size works.
  logits_view = logits.view(-1, config["vocabulary_size"])
  targets_view = targets.view(-1)

  loss = F.cross_entropy(logits_view, targets_view)
  # Backward propagation
  loss.backward()
  # Update model parameters
  optimizer.step()
  # Set to None to reduce memory usage
  optimizer.zero_grad(set_to_none=True)

  train_loss = loss.item()
  train_losses.append(train_loss)
  train_steps.append(step_num)

  print(f"Step {step_num}. Loss {train_loss:.3f}")

  if step_num % evaluation_interval == 0:
    print("Demo GPT:\n" + generate_with_prompt(model, tokenizer, "\n"))

    validation_loss = calculate_validation_loss(model, batches_num=10)
    # Record the point so the validation curve actually shows up in the plot.
    eval_losses.append(validation_loss)
    eval_steps.append(step_num)

    print(f"Step {step_num}. Validation loss: {validation_loss:.3f}")


  update_plot(train_losses, train_steps, eval_losses, eval_steps)