In [1]:
!pip install torchmetrics

import torch
import torch.optim as optim
import torch.nn as nn
from torch.nn import functional as F

from torchmetrics import Accuracy, BLEUScore

device = 'cuda' if torch.cuda.is_available() else 'cpu'

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torchmetrics
  Downloading torchmetrics-0.11.4-py3-none-any.whl (519 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m519.2/519.2 kB[0m [31m16.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: torchmetrics
Successfully installed torchmetrics-0.11.4


### Tokenization

In [2]:
class CharacterTokenizer:
  def __init__(self, path):    
    with open(path, 'r', encoding='utf-8') as f:
        self.text = f.read()

    self.chars = sorted(list(set(self.text)))
    self.vocab_size = len(self.chars)

    self.stoi = { ch:i for i,ch in enumerate(self.chars) }
    self.itos = { i:ch for i,ch in enumerate(self.chars) }
    
  def encode(self):
    raw_data = []
    
    for char in self.text:
      raw_data.append(self.stoi[char])
    
    return raw_data

  def decode(self, encoded_data):
    raw_data = []
  
    for char_enc in encoded_data:
      raw_data.append(self.itos[char_enc])
    
    return raw_data

In [3]:
tokenizer = CharacterTokenizer('./input.txt')

### Train Test Split

In [4]:
data = torch.tensor(tokenizer.encode(), dtype=torch.long)
n = int(0.9*len(data))
train_data = data[:n]
val_data = data[n:]

### Batch maker (Basically an adhoc DataLoader)



In [5]:
def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y


### Feed Forward Network which will come after MultiHead SA

In [6]:
class FeedForwardNetwork(nn.Module):
  def __init__(self, n_embedding, dropout):
    super().__init__()

    self.fn = nn.Sequential(
        nn.Linear(n_embedding, 4 * n_embedding),
        nn.ReLU(),
        nn.Linear(4 * n_embedding, n_embedding),
        nn.Dropout(dropout)
    )

  def forward(self, x):
    return self.fn(x)

### PyTorch inbuilt MultiHeadAttention and its TransformerBlock class

In [None]:
class TransformerBlock(nn.Module):
  def __init__(self, n_embedding, n_heads, dropout, block_size=None) -> None:
    super().__init__()
    
    self.self_attention = nn.MultiheadAttention(embed_dim=n_embedding,
                                                 num_heads=n_heads,
                                                 device=device,
                                                 dropout=dropout)  # simple multihead attention from pytorch
    
    self.feed_forward = FeedForwardNetwork(n_embedding, dropout)
    self.layer_norm_1 = nn.LayerNorm(n_embedding)
    self.layer_norm_2 = nn.LayerNorm(n_embedding)

  
  def forward(self, x):
    o = self.layer_norm_1(x)
    o, _ = self.self_attention(o,o,o)
    x = x + o
    x = x + self.feed_forward(self.layer_norm_2(x))

    return x

### Self attention built from scratch and its own TransformerBlock class

In [7]:
class SelfAttentionHead(nn.Module):
  def __init__(self, n_embedding, head_size, block_size, dropout):
    super().__init__()
    self.dropout = nn.Dropout(dropout)

    self.Query = nn.Linear(in_features=n_embedding, out_features=head_size, bias=False)
    self.Key = nn.Linear(in_features=n_embedding, out_features=head_size, bias=False)
    self.Value = nn.Linear(in_features=n_embedding, out_features=head_size, bias=False)

    self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))   # I thought of using a no gradient tensor but this is better

  def forward(self, x):
    Batch, Time, Channel = x.shape

    q = self.Query(x)
    k = self.Key(x)
    v = self.Value(x)

    dot_prod = q @ k.transpose(-2, -1) * k.shape[-1]**0.5 # Stability - basically 1 variance (Evenly distributed)
    dot_prod = dot_prod.masked_fill(self.tril[:Time, :Time]==0, float('-inf'))  # Bars the attention from looking ahead
    dot_prod = F.softmax(dot_prod, dim=-1)
    dot_prod = self.dropout(dot_prod)

    out = dot_prod @ v

    return out

In [8]:
class MultiHeadSelfAttention(nn.Module):
  def __init__(self, n_heads, n_embedding, head_size, block_size, dropout):
    super().__init__()

    self.attention_blocks = nn.ModuleList([SelfAttentionHead(n_embedding, head_size, block_size, dropout) for _ in range(n_heads)])
    self.projection_layer = nn.Linear(in_features=n_heads*head_size, out_features=n_embedding)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    
    out = torch.cat([h(x) for h in self.attention_blocks], dim=-1)  # concatenated vectors from each attention head
    out = self.dropout(self.projection_layer(out)) # Finally getting a vector of same size as n_embedding

    return out


In [9]:
class TransformerBlock(nn.Module):
  def __init__(self, n_embedding, n_heads, dropout, block_size) -> None:
    super().__init__()
    
    head_size = n_embedding//n_heads
    
    self.self_attention = MultiHeadSelfAttention(n_heads, n_embedding, head_size, block_size, dropout)

    self.feed_forward = FeedForwardNetwork(n_embedding, dropout)
    self.layer_norm_1 = nn.LayerNorm(n_embedding)
    self.layer_norm_2 = nn.LayerNorm(n_embedding)

  
  def forward(self, x):
    o = self.layer_norm_1(x)
    o = self.self_attention(o)
    x = x + o  # Residual connection
    x = x + self.feed_forward(self.layer_norm_2(x)) # Residual connection

    return x

### Nano GPT

In [10]:
class NanoGPT(nn.Module):
  def __init__(self, vocab_size, n_embedding, block_size, n_heads, n_layers, dropout):
    super().__init__()

    self.token_embeddding_lookup = nn.Embedding(vocab_size, n_embedding)
    self.block_embedding_lookup = nn.Embedding(block_size, n_embedding)  #basically postional information

    self.transformer_blocks = nn.Sequential(*[TransformerBlock(n_embedding, n_heads, dropout, block_size) for _ in range(n_layers)])
    self.layer_norm = nn.LayerNorm(n_embedding)
    self.linear_head = nn.Linear(n_embedding, vocab_size)

  def forward(self, idx, targets):
    Batch, Time = idx.shape

    token_embedding = self.token_embeddding_lookup(idx)
    positional_embedding = self.block_embedding_lookup(torch.arange(Time, device=device))
    
    x = token_embedding + positional_embedding
    x = self.transformer_blocks(x)
    x = self.layer_norm(x)
    
    logits = self.linear_head(x)
    Batch, Time, Channel = logits.shape
    logits = logits.view(Batch*Time, Channel)  #Just changing the view of tensor not copying data
    targets = targets.view(Batch*Time) #Just changing the view of tensor not copying data
    
    loss = F.cross_entropy(logits, targets)

    return logits, loss

### Training Loop

Hyperparams

In [19]:
batch_size = 128
block_size = 256
epochs = 2400
eval_interval = 500
learning_rate = 0.1 #3e-4
eval_epochs = 200
embedding_dim = 384
n_head = 6
n_layer = 6
dropout = 0.2

In [20]:
model = NanoGPT(vocab_size=tokenizer.vocab_size,
                n_embedding=embedding_dim,
                block_size=block_size,
                n_heads=n_head,
                n_layers=n_layer,
                dropout=dropout)

model.to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9) #optim.Adam(model.parameters(), lr=learning_rate)  

In [13]:
@torch.no_grad()
def estimate_loss(model, eval_epochs):
  out={}
  model.eval()
  for split in ['train', 'val']:
    losses = torch.zeros(eval_epochs)
    for k in range(eval_epochs):
        X, Y = get_batch(split)
        logits, loss = model(X, Y)
        losses[k] = loss.item()
    out[split] = losses.mean()
  model.train()
  return out

In [None]:
for epoch in range(epochs):  
  X, y = get_batch('train')
  logits, loss = model(X, y)
  loss.backward()
  optimizer.step()
  optimizer.zero_grad()

  if epoch % eval_epochs == 0 or epoch == epochs-1:
    losses = estimate_loss(model, eval_epochs)
    print(f"Epoch {epoch}: Train loss {losses['train']:.3f}, Validation loss {losses['val']:.3f}")
  

Epoch 0: Train loss 3.874, Validation loss 3.888
Epoch 200: Train loss 2.477, Validation loss 2.497
Epoch 400: Train loss 2.452, Validation loss 2.474


In [15]:
@torch.no_grad()
def evaluate_model(model):
  accuracy = Accuracy(task='multiclass', num_classes=65, top_k=10).to(device)
  model.eval()
  out={}
  for split in ['train', 'val']:
    accuracies = torch.zeros(eval_epochs)
    for k in range(eval_epochs):
        X, Y = get_batch(split)
        logits, _ = model(X, Y)
        # logits = torch.argmax(logits, dim=-1)
        Y = torch.flatten(Y)
        acc = accuracy(logits, Y)
        accuracies[k] = acc.item()
    out[split] = accuracies.mean()
  model.train()
  return out

accuracies = evaluate_model(model)

In [16]:
print(f"Train top-10 accuracy {accuracies['train']:.3f}, Validation top-10 accuracy {accuracies['val']:.3f}")

Train top-10 accuracy 0.850, Validation top-10 accuracy 0.845
