<a href="https://colab.research.google.com/github/lorrespz/Transformers-Language-Models--Pytorch-/blob/main/Transformers_Decoder_(GPT_like)_architecture.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Decoder architecture (GPT-like model) from scratch

This code is from Lazy Programmer's Transformers course

https://www.udemy.com/course/data-science-transformers-nlp/

Recall that GPT models are a family of language models in which the main pretraining task is performed by predicting a token using **only** the tokens that come before it. In short, the prediction is for the probability $p_t$

$\langle p_t\,| \ldots, p_{t-2}, p_{t-1}\rangle$

Decoder architecture is largely similar to encoder architecture, with the only difference being the attention block used is the causal version (which implements the dependency solely on past tokens).

In [1]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import dataset
import numpy as np
import matplotlib.pyplot as plt

# Causal Attention Block

The structure of the causal attention block is almost identical to that of the normal attention block used in Encoder Language Model, with the exception being the presence of a 'causal attention mask' applied before the softmax in the attention formula to set all past tokens to 1.

In [2]:
class CausalSelfAttention(nn.Module):
  def __init__(self, d_k, d_model, n_heads, max_len):
    super().__init__()
    #Assume d_v = d_k (len(Q) = len(K) = d_k, len(V) = d_v)
    self.d_k = d_k
    self.n_heads = n_heads
    self.key = nn.Linear(d_model, d_k*n_heads)
    self.query = nn.Linear(d_model, d_k*n_heads)
    self.value = nn.Linear(d_model, d_k*n_heads)
    #final linear layer
    self.fc = nn.Linear(d_k*n_heads, d_model)
    #causal mask: a square matrix of size max_len x max_len
    #with the lower triangle half being all 1,
    #upper triangle half being all 0
    cm = torch.tril(torch.ones(max_len, max_len))
    self.register_buffer('causal_mask',
                         cm.view(1,1,max_len, max_len))

  def forward(self, q, k, v, pad_mask = None):
    q = self.query(q)   # N x T x (hd_k)
    k = self.key(k)     # N x T x (hd_k)
    v = self.value(v)    # N x T x (hd_v)
    #h = n_heads
    # N = batch size
    N = q.shape[0]
    # T = sequence length
    T = q.shape[1]

    #change the shape to:
    # (N, T, h, d_k) --> (N, h, T, d_k)
    q = q.view(N, T, self.n_heads, self.d_k).transpose(1,2)
    k = k.view(N, T, self.n_heads, self.d_k).transpose(1,2)
    v = v.view(N, T, self.n_heads, self.d_k).transpose(1,2)

    #compute attention weights
    # q * k^T
    #(N,  h, T,  d_k) x (N, h, d_k, T) --> (N, h, T, T)
    #transposing the last 2 dimensions of k
    attn_scores = q @ k.transpose(-2, -1)/math.sqrt(self.d_k)
    #apply the mask, which is a tensor of size (N,T) of values 0, 1
    #for each of the N samples, need to know which of the T tokens is important
    #Change from 2D to 4D by adding None, which introduces superfluous dim of size 1
    # (N, T) --> (N, 1, 1, T)
    if pad_mask is not None:
      #mask_fill(arg1, arg2): if arg1 = True, apply arg2
      #softmax(-inf) = 0
       attn_scores = attn_scores.masked_fill(pad_mask[:, None, None,:] == 0, float('-inf'))
       #HERE IS THE CAUSAL MASK !!!
       attn_scores = attn_scores.masked_fill(self.causal_mask[:, :, :T,:T] == 0, float('-inf'))
    attn_weights = F.softmax(attn_scores, dim = -1)

    #compute attention-weighted values
    #(N, h, T, T) x (N, h, T, d_k) --> (N, h, T, d_k)
    A = attn_weights @ v

    #reshape it back before the final linear layer
    A = A.transpose(1, 2) # (N, T, h, d_k)
    A = A.contiguous().view(N, T, self.d_k*self.n_heads) #(N, T, h*d_k)

    #final step is to project A with the Linear layer to
    #get the same shape as the input sequence
    return self.fc(A)


# Transformer Block

Almost the same as that used in the Encoder architecture, with the only difference being an additional input 'max_len'

In [3]:
class TransformerBlock(nn.Module):
  def __init__(self, d_k, d_model, n_heads, max_len, dropout_prob = 0.1):
    super().__init__()

    self.ln1 = nn.LayerNorm(d_model)
    self.ln2 = nn.LayerNorm(d_model)
    self.mha = CausalSelfAttention(d_k, d_model, n_heads, max_len)
    self.ann = nn.Sequential(
        nn.Linear(d_model, d_model*4),
        nn.GELU(),
        nn.Linear(d_model*4, d_model),
        nn.Dropout(dropout_prob)
        )
    self.dropout = nn.Dropout(p = dropout_prob)

  def forward(self, x, pad_mask = None):
    #x is an input sequence of size (NxTXD)
    # mask is of size (NxT)
    #FIRST LAYER NORM:
    #pass x in as the query, key, value into the multihead attention block
    #then add the output to the residual 'x' to be passed in the 1st layer norm
    x = self.ln1(x+ self.mha(x,x,x,pad_mask))
    # SECOND LAYER NORM: ann + x
    x = self.ln2(x + self.ann(x))
    x = self.dropout(x)
    return(x)

# Positional Encoding Block

In [4]:
class PositionalEncoding(nn.Module):
  def __init__(self, d_model, max_len = 2048, dropout_prob = 0.1):
    super().__init__()
    self.dropout = nn.Dropout(p = dropout_prob)
    #unsqueeze(1) adds a superfluous dim of size 1 at the end
    #so that we have a 2d array of size (max_len, 1)
    #position is pos variable in the formula
    position = torch.arange(max_len).unsqueeze(1)
    #exp_term is the '2i' in the exponent of the denominator in the formula
    exp_term = torch.arange(0, d_model, 2)
    #this is just the term 10000^(-2i/d_model)
    div_term = torch.exp(exp_term*(-math.log(10000.0)/d_model))
    #PE term
    pe = torch.zeros(1, max_len, d_model)
    #0::2 means 2, 4, 6, 8, ... indexing
    pe[0, :, 0::2] = torch.sin(position*div_term)
    #1::2 means 1,3,5, 7, ... indexing
    pe[0, :, 1::2] = torch.cos(position*div_term)
    #register_buffer allows for saving and loading the model correctly
    self.register_buffer('pe', pe)

  def forward(self, x):
    #x shape: NxTxD (D: d_model)
    x  = x + self.pe[:,:x.size(1), :]
    return self.dropout(x)

# Decoder Block

In [5]:
class Decoder(nn.Module):
  def __init__(self, vocab_size, max_len, d_k, d_model, n_heads, n_layers, dropout_prob):
    super().__init__()

    self.embedding = nn.Embedding(vocab_size, d_model)
    self.pos_encoding = PositionalEncoding(d_model, max_len, dropout_prob)
    transformers_blocks = [TransformerBlock(d_k, d_model, n_heads, max_len, dropout_prob) for _ in range(n_layers)]
    self.transformer_blocks = nn.Sequential(*transformers_blocks)
    self.ln = nn.LayerNorm(d_model)
    self.fc = nn.Linear(d_model, vocab_size)

  def forward(self, x, pad_mask = None):
    x = self.embedding(x)
    x = self.pos_encoding(x)
    for block in self.transformer_blocks:
      x = block(x, pad_mask)
    x = self.ln(x)
    x = self.fc(x) #many_to_many tasl

    return x

# Test the decoder with a random input

In [6]:
model = Decoder(20000, 1024, 16, 64, 4, 2, 0.1)

In [7]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)
model.to(device)

cuda:0


Decoder(
  (embedding): Embedding(20000, 64)
  (pos_encoding): PositionalEncoding(
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (transformer_blocks): Sequential(
    (0): TransformerBlock(
      (ln1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      (ln2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      (mha): CausalSelfAttention(
        (key): Linear(in_features=64, out_features=64, bias=True)
        (query): Linear(in_features=64, out_features=64, bias=True)
        (value): Linear(in_features=64, out_features=64, bias=True)
        (fc): Linear(in_features=64, out_features=64, bias=True)
      )
      (ann): Sequential(
        (0): Linear(in_features=64, out_features=256, bias=True)
        (1): GELU(approximate='none')
        (2): Linear(in_features=256, out_features=64, bias=True)
        (3): Dropout(p=0.1, inplace=False)
      )
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (1): TransformerBlock(
      (ln1): LayerNorm((64,), eps=1e-05,

In [8]:
# Create some random sequence of integers representing a sequence of tokens of size (N,T) =(8, 512)
x = np.random.randint(0, 20000, size = (8,512))
x_t = torch.tensor(x).to(device)
x_t

tensor([[ 6043, 10930,   177,  ...,   681, 10249,  8482],
        [ 4863,  1640, 16227,  ...,  2034,  7062, 13644],
        [11400,  7823,  1907,  ..., 18434,   898, 11997],
        ...,
        [ 5067, 19949,  6301,  ..., 17548, 17588,   612],
        [16764,  4361, 17452,  ..., 13244,  6946,   113],
        [14127, 15075, 19763,  ...,  6535,  6059, 17018]], device='cuda:0')

In [9]:
#Create the corresponding mask for the input above
mask = np.ones((8,512))
mask[:, 256:] = 0
mask_t = torch.tensor(mask).to(device)
mask_t

tensor([[1., 1., 1.,  ..., 0., 0., 0.],
        [1., 1., 1.,  ..., 0., 0., 0.],
        [1., 1., 1.,  ..., 0., 0., 0.],
        ...,
        [1., 1., 1.,  ..., 0., 0., 0.],
        [1., 1., 1.,  ..., 0., 0., 0.],
        [1., 1., 1.,  ..., 0., 0., 0.]], device='cuda:0', dtype=torch.float64)

In [10]:
# Pass the input and mask thru the encoder
y = model(x_t, mask_t)
#The shape should be (8, 512, vocab_size = 20000)
y.shape

torch.Size([8, 512, 20000])