<a href="https://colab.research.google.com/github/nares10/gpt2_shakespeare/blob/main/gpt2_implementation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np

In [68]:
# Load Tiny Shakespeare dataset
url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
import requests
text = requests.get(url).text
print("length of dataset in characters: ", len(text))
print(text[:1000])

length of dataset in characters:  1115394
First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

All:
We know't, we know't.

First Citizen:
Let us kill him, and we'll have corn at our own price.
Is't a verdict?

All:
No more talking on't; let it be done: away, away!

Second Citizen:
One word, good citizens.

First Citizen:
We are accounted poor citizens, the patricians good.
What authority surfeits on would relieve us: if they
would yield us but the superfluity, while it were
wholesome, we might guess they relieved us humanely;
but they think we are too dear: the leanness that
afflicts us, the object of our misery, is as an
inventory to particularise their abundance; our
sufferance is a gain to them Let us revenge this with
our pikes, ere we become rakes: for the gods know I
speak this in hung

In [69]:
# Tokenization
chars = sorted(list(set(text)))
vocab_size = len(chars)
char_to_idx = {ch: i for i, ch in enumerate(chars)}
idx_to_char = {i: ch for i, ch in enumerate(chars)}
print("vocab size: ",vocab_size)
print("unique character: ", chars)

vocab size:  65
unique character:  ['\n', ' ', '!', '$', '&', "'", ',', '-', '.', '3', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']


In [70]:
def encode(s):
    return [char_to_idx[c] for c in s]

def decode(l):
    return ''.join([idx_to_char[i] for i in l])

In [71]:
# Hyperparameters
block_size = 128  # context length
batch_size = 32
embed_dim = 256
n_heads = 8
n_layers = 12
hidden_dim = 1024
learning_rate = 3e-4
num_epochs = 5
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [72]:
# Prepare Dataset
class ShakespeareDataset(Dataset):
    def __init__(self, text, block_size):
        data = torch.tensor(encode(text), dtype=torch.long)
        self.inputs = [data[i:i + block_size] for i in range(len(data) - block_size)]
        self.targets = [data[i + 1:i + block_size + 1] for i in range(len(data) - block_size)]

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        return self.inputs[idx], self.targets[idx]

In [73]:
dataset = ShakespeareDataset(text, block_size)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [74]:
class MultiHeadSelfAttention(nn.Module):
    def __init__(self, embed_dim, n_heads):
        super().__init__()
        assert embed_dim % n_heads == 0, "Embedding dimension must be divisible by number of heads"
        self.embed_dim = embed_dim
        self.n_heads = n_heads
        self.head_dim = embed_dim // n_heads

        self.q_linear = nn.Linear(embed_dim, embed_dim)
        self.k_linear = nn.Linear(embed_dim, embed_dim)
        self.v_linear = nn.Linear(embed_dim, embed_dim)
        self.out_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, x, mask=None):
        batch_size, seq_len, _ = x.size()

        # Linear projections for query, key, value
        q = self.q_linear(x)  # (batch, seq_len, embed_dim)
        k = self.k_linear(x)
        v = self.v_linear(x)

        # Reshape for multi-head attention: (batch, n_heads, seq_len, head_dim)
        q = q.view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)
        k = k.view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)
        v = v.view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)

        # Scaled dot-product attention
        attn_scores = torch.matmul(q, k.transpose(-2, -1)) / (self.head_dim ** 0.5)
        if mask is not None:
            # mask shape: (seq_len, seq_len); expand to (batch, n_heads, seq_len, seq_len)
            attn_scores = attn_scores.masked_fill(mask == 0, float('-inf'))
        attn_weights = torch.softmax(attn_scores, dim=-1)
        attn_output = torch.matmul(attn_weights, v)  # (batch, n_heads, seq_len, head_dim)

        # Concatenate attention heads
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.embed_dim)
        output = self.out_linear(attn_output)
        return output

In [75]:
class TransformerDecoderBlock(nn.Module):
    def __init__(self, embed_dim, n_heads, hidden_dim, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadSelfAttention(embed_dim, n_heads)
        self.ln1 = nn.LayerNorm(embed_dim)
        self.ff = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim),
        )
        self.ln2 = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # Self-attention with residual connection and layer normalization
        attn_out = self.self_attn(x, mask=mask)
        x = self.ln1(x + self.dropout(attn_out))
        # Feed-forward network with residual connection and layer normalization
        ff_out = self.ff(x)
        x = self.ln2(x + self.dropout(ff_out))
        return x

In [76]:
class GPT2(nn.Module):
    def __init__(self, vocab_size, embed_dim, n_heads, n_layers, hidden_dim, block_size, dropout=0.1):
        super().__init__()
        self.block_size = block_size
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.pos_embedding = nn.Parameter(torch.zeros(1, block_size, embed_dim))
        self.layers = nn.ModuleList([
            TransformerDecoderBlock(embed_dim, n_heads, hidden_dim, dropout)
            for _ in range(n_layers)
        ])
        self.ln_f = nn.LayerNorm(embed_dim)
        self.lm_head = nn.Linear(embed_dim, vocab_size)

    def forward(self, x):
        batch_size, seq_len = x.size()
        # Embed tokens and add positional embeddings
        #x = self.embedding(x) + self.pos_embedding[:, :seq_len, :]
        x = self.embedding(x) + self.pos_embedding[:, :min(seq_len, self.block_size), :]

        # Create causal mask: (seq_len, seq_len) with ones in the lower triangle
        mask = torch.tril(torch.ones(seq_len, seq_len, device=x.device)).bool()
        # Pass through all transformer decoder layers
        for layer in self.layers:
            x = layer(x, mask=mask)
        x = self.ln_f(x)
        logits = self.lm_head(x)
        return logits


In [77]:
model = GPT2(vocab_size, embed_dim, n_heads, n_layers, hidden_dim, block_size).to(device)
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

In [None]:
model.train()
for epoch in range(num_epochs):
    total_loss = 0
    for inputs, targets in dataloader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        logits = model(inputs)
        # reshape logits and targets for computing loss
        loss = criterion(logits.view(-1, vocab_size), targets.view(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss / len(dataloader):.4f}")

In [81]:
def generate_text(model, start_text, max_length=200):
    model.eval()
    input_seq = torch.tensor(encode(start_text), dtype=torch.long).unsqueeze(0).to(device)
    for _ in range(max_length):
        with torch.no_grad():
            logits = model(input_seq[:, -model.block_size:])

            #logits = model(input_seq)
            # Select the token with the highest probability
            next_token = torch.argmax(logits[:, -1, :], dim=-1).item()
            input_seq = torch.cat([input_seq, torch.tensor([[next_token]], device=device)], dim=1)
    return decode(input_seq.squeeze().tolist())

print(generate_text(model, "JULIET:", 200))

JULIET:
And then the shall be so much since as the sea,
As the shadow of the streets and the streets of his head,
And then the sea is so fair and shall be so.

KING RICHARD III:
Then shall we shall be so far


In [58]:
# Save model parameters
torch.save(model.state_dict(), "gpt2_shakespeare.pth")

In [59]:
# Later, to load the model parameters back:
model = GPT2(vocab_size, embed_dim, n_heads, n_layers, hidden_dim, block_size).to(device)
model.load_state_dict(torch.load("gpt2_shakespeare.pth"))
model.eval()

  model.load_state_dict(torch.load("gpt2_shakespeare.pth"))


GPT2(
  (embedding): Embedding(65, 256)
  (layers): ModuleList(
    (0-11): 12 x TransformerDecoderBlock(
      (self_attn): MultiHeadSelfAttention(
        (q_linear): Linear(in_features=256, out_features=256, bias=True)
        (k_linear): Linear(in_features=256, out_features=256, bias=True)
        (v_linear): Linear(in_features=256, out_features=256, bias=True)
        (out_linear): Linear(in_features=256, out_features=256, bias=True)
      )
      (ln1): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
      (ff): Sequential(
        (0): Linear(in_features=256, out_features=1024, bias=True)
        (1): ReLU()
        (2): Linear(in_features=1024, out_features=256, bias=True)
      )
      (ln2): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
  )
  (ln_f): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
  (lm_head): Linear(in_features=256, out_features=65, bias=True)
)