Shout out to http://peterbloem.nl/blog/transformers for simplifing the OG paper https://arxiv.org/pdf/1706.03762.pdf

In [1]:
import os
import numpy as np
import random
import math
import json

from tqdm.notebook import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
import torch.optim as optim

import pytorch_lightning as pl
from pytorch_lightning import seed_everything

seed_everything(1337)

Global seed set to 1337


1337

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
# dataset idea from https://github.com/karpathy/minGPT/blob/7569ab9d7fc476a783619d56fec10e0a4c8afdd6/play_math.ipynb
class AdditionDataModule(pl.LightningDataModule):
	"""
	Returns addition problems of up to some number of digits in the inputs. Recall
	that all GPT cares about are sequences of integers, and completing them according to
	patterns in the data. Therefore, we have to somehow encode addition problems
	as a sequence of integers.

	The sum of two n-digit numbers gives a third up to (n+1)-digit number. So our
	encoding will simply be the n-digit first number, n-digit second number, 
	and (n+1)-digit result, all simply concatenated together. Because each addition
	problem is so structured, there is no need to bother the model with encoding
	+, =, or other tokens. Each possible sequence has the same length, and simply
	contains the raw digits of the addition problem.

	As a few examples, the 2-digit problems:
	- 85 + 50 = 135 becomes the sequence [8, 5, 5, 0, 1, 3, 5]
	- 6 + 39 = 45 becomes the sequence [0, 6, 3, 9, 0, 4, 5]
	etc.

	We will also only train GPT on the final (n+1)-digits because the first
	two n-digits are always assumed to be given. So when we give GPT an exam later,
	we will e.g. feed it the sequence [0, 6, 3, 9], which encodes that we'd like
	to add 6 + 39, and hope that the model completes the integer sequence with [0, 4, 5]
	in 3 sequential steps.
	"""

	def __init__(self, batch_size=32, split=0.8):
		super().__init__()
		self.ds_X, self.ds_Y = self.get_dataset()
		shuffler = np.random.permutation(self.ds_X.shape[0])
		self.ds_X = self.ds_X[shuffler]
		self.ds_Y = self.ds_Y[shuffler]
		self.split = int(self.ds_X.shape[0]*split)
		self.batch_size = batch_size

	def get_dataset(self):
		ret = []
		for i in range(100):
			for j in range(100):
				s = i+j
			ret.append([i//10, i%10, j//10, j%10, s//100, (s//10)%10, s%10])
		ds = np.array(ret)
		return ds[:, 0:6], np.copy(ds[:, 1:])  

	def train_dataloader(self):
		ds_X_train, ds_Y_train = self.ds_X[0:self.split], self.ds_Y[0:self.split]
		return torch.utils.data.DataLoader(list(zip(ds_X_train, ds_Y_train)), \
			num_workers=16, \
			batch_size=self.batch_size)

	def val_dataloader(self):
		ds_X_test, ds_Y_test = self.ds_X[self.split:], self.ds_Y[self.split:]
		return torch.utils.data.DataLoader(list(zip(ds_X_test, ds_Y_test)), \
			num_workers=16, \
			batch_size=self.batch_size)

In [4]:
def scaled_dot_product_attn(q, k, v):
    d_k = q.size()[-1]
    attn_logits = torch.matmul(q, k.transpose(-2, -1))
    attn_logits = attn_logits / math.sqrt(d_k)
    attention = F.softmax(attn_logits, dim=-1)
    return torch.matmul(attention, v)

In [5]:
class MultiheadAttention(nn.Module):
    
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        assert embed_dim % num_heads == 0, "Embedding dimension must be 0 modulo number of heads."
        
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        
        self.W_qkv = nn.Linear(embed_dim, 3*embed_dim)
        self.W_o = nn.Linear(embed_dim, embed_dim)


    def forward(self, x):
        batch_size, seq_length, input_dim = x.size()
        assert input_dim == self.embed_dim # sanity check
        qkv = self.W_qkv(x)
        
        # Separate Q, K, V from stacked linear output
        qkv = qkv.reshape(batch_size, seq_length, self.num_heads, 3*self.head_dim)
        qkv = qkv.permute(0, 2, 1, 3) # [Batch, Head, SeqLen, Dims]
        q, k, v = qkv.chunk(3, dim=-1)
        
        # Determine output values
        x = scaled_dot_product_attn(q, k, v)
        x = x.permute(0, 2, 1, 3) # [Batch, SeqLen, Head, Dims]
        x = x.reshape(batch_size, seq_length, self.embed_dim)
        x = self.W_o(x)
        return x

In [6]:
class EncoderBlock(nn.Module):
    
    def __init__(self, embed_dim, num_heads, dim_feedforward=2048, dropout=0.1):
        """
        Inputs:
            embed_dim - Dimensionality of the input
            num_heads - Number of heads to use in the attention block
            dim_feedforward - Dimensionality of the hidden layer in the MLP
            dropout - Dropout probability to use in the dropout layers
        """
        super().__init__()
        
        # Attention layer
        self.self_attn = MultiheadAttention(embed_dim, num_heads)
        
        # Two-layer MLP
        self.linear_net = nn.Sequential(
            nn.Linear(embed_dim, dim_feedforward),
            nn.Dropout(dropout),
            nn.ReLU(inplace=True),
            nn.Linear(dim_feedforward, embed_dim)
        )
        
        # Layers to apply in between the main layers
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x,):
        # Attention part
        x = self.norm1(x + self.dropout(self.self_attn(x)))
        # MLP part
        x = self.norm2(x + self.dropout(self.linear_net(x)))
        return x

In [7]:
# https://pytorch.org/tutorials/beginner/transformer_tutorial.html#define-the-model
class PositionalEncoding(nn.Module):

    def __init__(self, d_model, max_len=5000):
        """
        Inputs
            d_model - Hidden dimensionality of the input.
            max_len - Maximum length of a sequence to expect.
        """
        super().__init__()

        # Create matrix of [SeqLen, HiddenDim] representing the positional encoding for max_len inputs
        self.pe = torch.zeros(max_len, d_model, device=device)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        self.pe[:, 0::2] = torch.sin(position * div_term)
        self.pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = self.pe.unsqueeze(0)
        
        # register_buffer => Tensor which is not a parameter, but should be part of the modules state.
        # Used for tensors that need to be on the same device as the module.
        self.register_buffer('PositionalEncoding', self.pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return x

In [8]:
class TransformerPredictor(pl.LightningModule):

    def __init__(self, embed_dim=128, num_classes=10, num_heads=4, num_layers=2, lr=3e-4, dim_feedforward=32, dropout=0.0):
        """
        Inputs:
            embed_dim - Hidden dimensionality to use inside the Transformer
            num_classes - Number of classes to predict per sequence element
            num_heads - Number of heads to use in the Multi-Head Attention blocks
            num_layers - Number of encoder blocks to use.
            lr - Learning rate in the optimizer
            dim_feedforward - Dimensionality of the hidden layer in the MLP
            dropout - Dropout to apply inside the model
        """
        super().__init__()

        self.lr = lr
        self.num_classes = num_classes

        self.model = nn.Sequential(
            nn.Embedding(num_classes, embed_dim),
            PositionalEncoding(d_model=embed_dim),
            *[EncoderBlock(embed_dim, num_heads, dim_feedforward) for x in range(num_layers)],
            nn.Linear(embed_dim, embed_dim),
            nn.LayerNorm(embed_dim),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout),
            nn.Linear(embed_dim, num_classes)
        )

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        output = self.model(x)
        loss = F.nll_loss(output.view(-1, self.num_classes), y.view(-1))
        self.log("train_loss", loss)
        return loss
    
    def validation_step(self, val_batch, batch_idx):
        x, y = val_batch
        pred = self.model(x).argmax(dim=2)
        val_accuracy = (pred == y).type(torch.float).mean()
        self.log("val_accuracy", val_accuracy, prog_bar=True)
    
    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr)

In [9]:
data = AdditionDataModule(batch_size=64)
model = TransformerPredictor()
trainer = pl.Trainer(enable_progress_bar=True, max_epochs=5, gpus=1, log_every_n_steps=1)
trainer.fit(model, data)

  rank_zero_deprecation(
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
Missing logger folder: /home/rishab/Documents/codes/model-arch-implementation/quick-model-archs/transformers/lightning_logs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type       | Params
-------------------------------------
0 | model | Sequential | 169 K 
-------------------------------------
169 K     Trainable params
0         Non-trainable params
169 K     Total params
0.677     Total estimated model params size (MB)


Epoch 4: 100%|██████████| 3/3 [00:01<00:00,  2.67it/s, loss=-0.645, v_num=0, val_accuracy=0.325]

`Trainer.fit` stopped: `max_epochs=5` reached.


Epoch 4: 100%|██████████| 3/3 [00:01<00:00,  2.63it/s, loss=-0.645, v_num=0, val_accuracy=0.325]


In [10]:
# tensorboard --logdir .

In [11]:
# !rm -rf lightning_logs