### In this notebook, we will use PyTorch + Lightning to create and optimize a Decoder-Only Transformer, like the one shown in the picture below. Decoder-Only Transformers are taking over AI right now, and quite possibly their most famous use is in ChatGPT.

### Import Necessary Libraries

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F #used to access softmax() function to calculate attention
from torch.optim import Adam #optimizer that updates the network's weights using the gradients from backpropagation
from torch.utils.data import TensorDataset, DataLoader #used to wrap the training tensors and feed them to the model in batches

In [None]:
%pip install pytorch-lightning



In [None]:
import pytorch_lightning as L #handles the training loop for us and makes it easier to scale training to GPUs or the cloud

### Create the input and output data

#### In this tutorial we will build a simple Decoder-Only Transformer that can answer two super simple questions, What is computer science? and computer science is what?, and give them both the same answer, Awesome!!!

#### In order to keep track of our simple dataset, we'll create a dictionary that maps the words and tokens to ID numbers. This is because the class we will use to do word embedding for us, nn.Embedding(), only accepts ID numbers as input, rather than words or tokens.

In [None]:
token_to_id= {'what':0,
              'is':1,
              'computer science':2,
              'awesome':3,
              '<EOS>':4
              }

In [None]:
id_to_token = dict(map(reversed, token_to_id.items()))
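
As a quick check that the two dictionaries are inverses of each other, the sketch below encodes a prompt into ID numbers and decodes it back. The `prompt` list and the names `ids` and `decoded` are illustrative and not part of the notebook.

```python
# Token-to-ID dictionary copied from the notebook.
token_to_id = {'what': 0, 'is': 1, 'computer science': 2, 'awesome': 3, '<EOS>': 4}
# Inverse lookup, built the same way as in the notebook.
id_to_token = dict(map(reversed, token_to_id.items()))

# Hypothetical prompt used only for this illustration.
prompt = ['what', 'is', 'computer science', '<EOS>']
ids = [token_to_id[token] for token in prompt]   # words/tokens -> ID numbers
decoded = [id_to_token[i] for i in ids]          # ID numbers -> words/tokens

print(ids)
print(decoded)
```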

In [None]:
inputs= torch.tensor([[token_to_id["what"],
                       token_to_id["is"],
                       token_to_id["computer science"],
                       token_to_id["<EOS>"],
                       token_to_id["awesome"]],
                      [token_to_id["computer science"],
                       token_to_id["is"],
                       token_to_id["what"],
                       token_to_id["<EOS>"],
                       token_to_id["awesome"]]])

In [None]:
labels = torch.tensor([[token_to_id["is"],
                        token_to_id["computer science"],
                        token_to_id["<EOS>"],
                        token_to_id["awesome"],
                        token_to_id["<EOS>"]],

                       [token_to_id["is"],
                        token_to_id["what"],
                        token_to_id["<EOS>"],
                        token_to_id["awesome"],
                        token_to_id["<EOS>"]]])
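
Each row of labels is the matching row of inputs shifted one position to the left, with a final <EOS> appended, so at every position the model is asked to predict the next token. A minimal sketch of that relationship with plain Python lists, using a hypothetical helper named `shift_left`:

```python
EOS = 4  # ID of <EOS> in the notebook's token_to_id dictionary

def shift_left(input_ids):
    """Return next-token targets: drop the first ID and append <EOS>."""
    return input_ids[1:] + [EOS]

inputs = [[0, 1, 2, 4, 3],   # what is computer science <EOS> awesome
          [2, 1, 0, 4, 3]]   # computer science is what <EOS> awesome
labels = [shift_left(row) for row in inputs]

print(labels)
```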

#### Then we will use the dictionary to create a DataLoader that contains the questions and the desired answers encoded as ID numbers. Ultimately we'll use the DataLoader to train the transformer. NOTE: DataLoaders are designed to scale to very large datasets, so the same pattern still applies when you have a terabyte of text.

In [None]:
dataset= TensorDataset(inputs,labels)
dataloader= DataLoader(dataset)
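
To see what the training loop receives, the sketch below iterates over a DataLoader built the same way. With the default `batch_size=1`, each batch holds one question and its labels with a leading batch dimension of size 1. The tensors are rebuilt from literal ID numbers so the block runs on its own, and the `batch_shapes` list is only there for illustration.

```python
import torch
from torch.utils.data import TensorDataset, DataLoader

inputs = torch.tensor([[0, 1, 2, 4, 3],
                       [2, 1, 0, 4, 3]])
labels = torch.tensor([[1, 2, 4, 3, 4],
                       [1, 0, 4, 3, 4]])

dataloader = DataLoader(TensorDataset(inputs, labels))  # default batch_size=1

batch_shapes = []
for input_batch, label_batch in dataloader:
    # Each batch is a pair of tensors with shape (batch_size, sequence_length).
    batch_shapes.append((tuple(input_batch.shape), tuple(label_batch.shape)))
    print(input_batch.shape, label_batch.shape)
```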

## Position Encoding

Position Encoding helps the transformer keep track of the order of the words in the input and the output.

d_model is short for the dimension of the model, which is the number of word embedding values per token.

max_len is the maximum number of tokens we allow as input.

Since we are precomputing the position encoding values and storing them in a lookup table, we can use d_model and max_len to determine the number of rows and columns in that lookup table: max_len rows and d_model columns.
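
As a worked example of that lookup table, the sketch below computes the position encoding values with Python's `math` module, assuming d_model=2 and max_len=3 (small values chosen here for illustration). It uses the sine and cosine formulas from "Attention Is All You Need"; the function name `position_encoding_table` is hypothetical.

```python
import math

def position_encoding_table(max_len, d_model):
    """Build a max_len x d_model table: sine in even columns, cosine in odd columns."""
    table = [[0.0] * d_model for _ in range(max_len)]
    for pos in range(max_len):
        for col in range(0, d_model, 2):  # col plays the role of 2i in the formulas
            angle = pos / (10000 ** (col / d_model))
            table[pos][col] = math.sin(angle)
            if col + 1 < d_model:
                table[pos][col + 1] = math.cos(angle)
    return table

table = position_encoding_table(max_len=3, d_model=2)
for pos, row in enumerate(table):
    print(pos, [round(value, 4) for value in row])
```

Position 0 always gets sin(0)=0 and cos(0)=1, and each later position moves further along the same sine and cosine curves.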

In [None]:
import torch
import torch.nn as nn

class PositionEncoding(nn.Module):
  def __init__(self, d_model=2, max_len=6):
    super().__init__()
    ## We call the super's __init__() because by creating our own __init__() method, we override the one
    #we inherited from nn.Module. So we have to explicitly call nn.Module's __init__(), otherwise it
    #won't get initialized.

    #Now we create a lookup table, pe, of position encoding values and initialize all of them to 0.
    #To do this, we will make a matrix of 0s that has max_len rows and d_model columns.
    pe=torch.zeros(max_len, d_model)

    #Now we create a sequence of numbers for each position that a token can have in the input (or output).
    ## For example, if the input tokens were "I'm happy today!", then "I'm" would get the first
    ## position, 0, "happy" would get the second position, 1, and "today!" would get the third position, 2.
    ## NOTE: Since we are going to be doing math with these position indices to create the
    ## positional encoding for each one, we need them to be floats rather than ints. torch.arange()
    ## creates the integer positions and .float() converts them to floats.

    ## Lastly, .unsqueeze(1) converts the single list of numbers that torch.arange creates into a matrix with
    ## one row for each index, and all of the indices in a single column. So if "max_len" = 3, then we
    ## would create a matrix with 3 rows and 1 column like this: [[0.], [1.], [2.]]
    position= torch.arange(start=0, end=max_len,step=1).float().unsqueeze(1)

    ## The positional encoding equations used in "Attention Is All You Need" are:
    ## PE(pos, 2i)   = sin(pos / 10000^(2i/d_model))
    ## PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))
    ## Within the sin() and cos() functions, we divide "pos" by a number that depends
    ## on the index (i) and the total number of PE values we want per token (d_model).
    ## NOTE: When i=0, we are calculating the y-axis coordinates on the **first pair**
    ## of sine and cosine curves. When i=1, we are calculating the y-axis coordinates on the
    ## **second pair** of sine and cosine curves, and so on.

    ## To simplify the code, embedding_index holds the values of 2i: 0, 2, 4, ...
    embedding_index=torch.arange(start=0, end=d_model, step=2).float()

    ## div_term is 1 / 10000^(2i/d_model), computed directly from the equations above.
    ## NOTE: Many implementations compute the same quantity in log space instead:
    ## div_term = torch.exp(torch.arange(start=0, end=d_model, step=2).float() * -(math.log(10000.0) / d_model))
    ## Both forms are mathematically equivalent.
    div_term= 1/torch.tensor(10000.0)**(embedding_index/d_model)
    pe[:,0::2]=torch.sin(position*div_term)
    pe[:,1::2]=torch.cos(position*div_term)


    #Now we register 'pe' as a buffer.
    self.register_buffer('pe',pe)
    ## register_buffer() ensures that
    ## 'pe' will be moved to wherever the model gets
    ## moved to. So if the model is moved to a GPU, then,
    ## even though we don't need to optimize 'pe', it will
    ## also be moved to that GPU. This, in turn, means
    ## that accessing 'pe' will be relatively fast compared
    ## to having a GPU have to get the data from a CPU.


  ## Add the position encoding values to the word embedding values.
  def forward(self, word_embeddings):
    return word_embeddings+ self.pe[:word_embeddings.size(0),:]

#### We're going to code an Attention class to do all of the types of attention that a transformer might need: Self-Attention, Masked Self-Attention (which is used by the Decoder during training), and Encoder-Decoder Attention.

(Refer to the document in the GitHub repository for the matrix math and for interpretations and visualisations of the softmax function.)

### Attention Class

In [None]:
class Attention(nn.Module):
  def __init__(self,d_model=2):
    super().__init__()
    self.d_model= d_model

    ## Initialize the Weights (W) that we'll use to create the
        ## query (q), key (k) and value (v) numbers for each token
    self.query= nn.Linear(in_features=d_model, out_features=d_model, bias=False) #query matrix
    self.key= nn.Linear(in_features=d_model, out_features=d_model, bias=False) #key matrix
    self.value= nn.Linear(in_features=d_model, out_features=d_model, bias= False) # value matrix

    ## NOTE: In this simple example, we are not training on the data in "batches".
        ## However, by defining variables for row_dim and col_dim, we could
        ## allow for batches by setting row_dim to 1 and col_dim to 2.
    self.row_dim=0
    self.col_dim=1

  def forward (self, encodings_for_q, encodings_for_k, encodings_for_v, mask=None):
    ## Create the query, key and values using the encodings
        ## associated with each token (token encodings)
       ## NOTE: For Encoder-Decoder Attention, the encodings for q come from
        ##the decoder and the encodings for k and v come from the output
        ## from the encoder.
    q=self.query(encodings_for_q)
    k=self.key(encodings_for_k)
    v=self.value(encodings_for_v)

    ## Compute attention scores
        ## the equation is (q * k^T)/sqrt(d_model)
        ## NOTE: It seems most people use "reverse indexing" for the dimensions when transposing k
        ## k.transpose(dim0, dim1) will transpose k by swapping dim0 and dim1
        ## In standard matrix notation, we would want to swap rows (dim=0) with columns (dim=1)
        ## If we have 3 dimensions, because of batching, and the batch was the first dimension
        ## And thus dims are defined batch = 0, rows = 1, columns = 2
        ## then dim0=-2 = 3 - 2 = 1. dim1=-1 = 3 - 1 = 2.

    sims= torch.matmul(q, k.transpose(dim0=self.row_dim, dim1=self.col_dim))
    scaled_sims= sims/torch.tensor(k.size(self.col_dim)**0.5)

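    if mask is not None:
      ## Here we are masking out things we don't want to pay attention to,
        ## like tokens that come after the current token.
        ## NOTE: mask is a boolean matrix that is True wherever attention should be
        ## blocked, so we pass it to masked_fill() as-is. Comparing it with 0 again
        ## would flip it and hide the current and earlier tokens instead.
      scaled_sims= scaled_sims.masked_fill(mask=mask,value=-1e9)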

    ## Apply softmax to determine what percent of each token's value to
        ## use in the final attention values.
    attention_percents= F.softmax(scaled_sims, dim=self.col_dim)

    # Scale the values by their associated percentages and add them up
    attention_scores=torch.matmul(attention_percents, v)
    return attention_scores
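
To make the masking and softmax steps concrete, here is a minimal pure-Python sketch for a single query token, assuming made-up similarity scores. A blocked position is replaced with -1e9 before the softmax, so its attention percentage becomes effectively zero. The names `softmax`, `scaled_sims`, and `blocked` are illustrative only.

```python
import math

def softmax(values):
    """Numerically stable softmax over a list of floats."""
    largest = max(values)
    exps = [math.exp(v - largest) for v in values]
    total = sum(exps)
    return [e / total for e in exps]

# Made-up scaled similarities between one query token and three key tokens.
scaled_sims = [0.5, 1.5, 2.0]
# True marks a position the query must not attend to (here, the third token).
blocked = [False, False, True]

# Replace blocked scores with a very large negative number, then normalize.
masked_sims = [-1e9 if is_blocked else s for s, is_blocked in zip(scaled_sims, blocked)]
attention_percents = softmax(masked_sims)

print([round(p, 4) for p in attention_percents])
```

The remaining two percentages still sum to 1, so the output for this token is a weighted average of only the values it is allowed to see.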

### Decoder Class

A Decoder-Only Transformer simply brings together...

Word Embedding

Position Encoding

Masked Self-Attention

Residual Connections

A fully connected layer

SoftMax - However, the loss function we are using, nn.CrossEntropyLoss(), applies the SoftMax for us.
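
For one token, and assuming made-up logits, the sketch below shows the quantity that cross-entropy loss computes: the negative log of the softmax probability assigned to the correct token. This is why the model can return the raw output of the fully connected layer during training. The block uses only the standard library, and the logits and target are illustrative values.

```python
import math

# Hypothetical raw scores (logits) for the 5 tokens in the vocabulary.
logits = [0.2, -1.0, 0.5, 2.0, 0.1]
target = 3  # ID of the correct next token ('awesome' in the notebook's dictionary)

# Log-softmax via the log-sum-exp trick for numerical stability.
largest = max(logits)
log_sum_exp = largest + math.log(sum(math.exp(x - largest) for x in logits))
log_probs = [x - log_sum_exp for x in logits]

# Cross-entropy for a single example: negative log-probability of the target.
loss = -log_probs[target]
print(round(loss, 4))
```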

In [None]:
class DecoderOnlyTransformer(L.LightningModule):

    def __init__(self, num_tokens=4, d_model=2, max_len=6):
        super().__init__()
        L.seed_everything(seed=42)
        self.we = nn.Embedding(num_embeddings=num_tokens,
                               embedding_dim=d_model)
        self.pe = PositionEncoding(d_model=d_model,
                                   max_len=max_len)
        self.self_attention = Attention(d_model=d_model)
        ## NOTE: In this simple example, we are not doing multi-head attention
        ## If we wanted to do multi-head attention, we could
        ## initialize more Attention objects like this...
        ## self.self_attention_2 = Attention(d_model=d_model)
        ## self.self_attention_3 = Attention(d_model=d_model)
        ## If d_model=2, then using 3 self_attention objects would
        ## result in d_model*3 = 6 self-attention values per token,
        ## so we would need to initialize
        ## a fully connected layer to reduce the dimension of the
        ## self attention values back down to d_model like this:
        ## self.reduce_attention_dim = nn.Linear(in_features=(num_attention_heads*d_model), out_features=d_model)

        self.fc_layer = nn.Linear(in_features=d_model, out_features=num_tokens)
        self.loss = nn.CrossEntropyLoss()


    def forward(self, token_ids):
        ## For the decoder-only transformer, we need to use "masked self-attention" so that
        ## when we are training we can't cheat and look ahead at
        ## what words come after the current word.
        ## To create the mask, torch.tril() gives us a matrix where the lower triangle (including
        ## the diagonal) is filled with 1s and everything above the diagonal is filled with 0s.
        ## Then "mask == 0" converts it to booleans that are True above the diagonal,
        ## which marks the positions we want to hide.

        word_embeddings = self.we(token_ids)
        position_encoded = self.pe(word_embeddings)
        mask = torch.tril(torch.ones((token_ids.size(dim=0), token_ids.size(dim=0)), device=self.device))
        mask = mask == 0
        self_attention_values = self.self_attention(position_encoded,
                                                    position_encoded,
                                                    position_encoded,
                                                    mask=mask)
        residual_connection_values = position_encoded + self_attention_values
        fc_layer_output = self.fc_layer(residual_connection_values)
        return fc_layer_output


    def configure_optimizers(self):
        ## configure_optimizers() simply passes the parameters we want to
        ## optimize to the optimizer and sets the learning rate
        return Adam(self.parameters(), lr=0.1)


    def training_step(self, batch, batch_idx):
        ## training_step() is called by Lightning trainer when
        ## we want to train the model.
        input_tokens, labels = batch # collect input
        output = self.forward(input_tokens[0])
        loss = self.loss(output, labels[0])

        return loss
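
A minimal sketch of how a causal mask can be built and applied, assuming a 3-token sequence and all-zero similarity scores chosen for illustration. `torch.tril()` keeps the lower triangle of ones, comparing with 0 yields True above the diagonal, and `masked_fill()` writes a large negative number wherever the mask is True, so each token attends only to itself and earlier tokens.

```python
import torch
import torch.nn.functional as F

seq_len = 3  # illustrative sequence length
mask = torch.tril(torch.ones((seq_len, seq_len))) == 0  # True above the diagonal

scaled_sims = torch.zeros((seq_len, seq_len))  # made-up scores, all equal
masked_sims = scaled_sims.masked_fill(mask=mask, value=-1e9)
attention_percents = F.softmax(masked_sims, dim=1)  # normalize across each row

print(mask)
print(attention_percents)
```

With equal scores, the first token puts all of its attention on itself, the second splits it evenly across the first two tokens, and the third splits it evenly across all three.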

To use the transformer, we run an input phrase, either what is computer science **<EOS>** or computer science is what **<EOS>**, through the transformer to get the next predicted token. If the next predicted token is not **<EOS>**, then we add the predicted token to the input tokens, run the longer sequence through the transformer, and repeat until we get the **<EOS>** token or reach the maximum sequence length.
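
The loop described above can be sketched as a reusable function. To keep the block self-contained, it uses a hypothetical stand-in, `toy_next_token`, in place of the transformer; the function name `greedy_decode` and its parameters are also illustrative and not part of the notebook.

```python
EOS_ID = 4  # ID of <EOS> in the notebook's dictionary

def toy_next_token(token_ids):
    """Hypothetical stand-in for the model: predict 'awesome' (3) right after
    an <EOS> token, and <EOS> (4) otherwise."""
    return 3 if token_ids[-1] == EOS_ID else EOS_ID

def greedy_decode(next_token_fn, prompt_ids, max_length=6):
    """Append predicted tokens until <EOS> is predicted or max_length is reached."""
    token_ids = list(prompt_ids)
    predicted = []
    while len(token_ids) < max_length:
        next_id = next_token_fn(token_ids)
        predicted.append(next_id)
        if next_id == EOS_ID:  # the model says it is done
            break
        token_ids.append(next_id)  # feed the prediction back in as input
    return predicted

print(greedy_decode(toy_next_token, [0, 1, 2, 4]))
```

With a real model, `next_token_fn` would run the token IDs through the transformer and take the argmax of the last row of its output.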

In [None]:
## First, create a model from DecoderOnlyTransformer()
model= DecoderOnlyTransformer(num_tokens=len(token_to_id),d_model=2, max_len=6)

## Now create the input for the transformer.
model_input= torch.tensor([token_to_id["what"],
                            token_to_id["is"],
                            token_to_id["computer science"],
                            token_to_id["<EOS>"]])
input_length= model_input.size(dim=0)

## Now get predictions from the model.
## NOTE: "predictions" is the output from the fully connected layer,
## not a softmax() function. We could, if we wanted to,
## run "predictions" through a softmax() function, but
## since we're going to select the item with the largest value,
## we can just use argmax instead.
predictions=model(model_input)

## We'll store predicted_id in an array, predicted_ids, that
## we'll add to each time we predict a new output token.
predicted_id= torch.tensor([torch.argmax(predictions[-1,:])])
predicted_ids= predicted_id

max_length=6
## Now use a loop to predict output tokens until we get an
## <EOS> token.
for i in range(input_length, max_length):
  if (predicted_id==token_to_id["<EOS>"]): # if the prediction is <EOS>, then we are done
    break
  model_input= torch.cat((model_input, predicted_id))
  predictions=model(model_input)
  predicted_id= torch.tensor([torch.argmax(predictions[-1,:])])
  predicted_ids= torch.cat((predicted_ids, predicted_id))

## Now print out the predicted output phrase.
print("Predicted Tokens:\n")
for id in predicted_ids:
    print("\t", id_to_token[id.item()])

INFO:lightning_fabric.utilities.seed:Seed set to 42


Predicted Tokens:

	 <EOS>


### Without training, the transformer predicts **<EOS>**, but we wanted it to predict awesome followed by **<EOS>**. Since the transformer didn't correctly respond to the prompt, we'll have to train it.


In [None]:
trainer= L.Trainer(max_epochs=100)
trainer.fit(model, dataloader)

INFO:pytorch_lightning.utilities.rank_zero:GPU available: False, used: False
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO:pytorch_lightning.callbacks.model_summary:
  | Name           | Type             | Params | Mode 
------------------------------------------------------------
0 | we             | Embedding        | 10     | train
1 | pe             | PositionEncoding | 0      | train
2 | self_attention | Attention        | 12     | train
3 | fc_layer       | Linear           | 15     | train
4 | loss           | CrossEntropyLoss | 0      | train
------------------------------------------------------------
37        Tr


INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=100` reached.
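
The parameter counts in the model summary can be reproduced by hand from the layer shapes, assuming the 5-token vocabulary and d_model=2 used when the model was created. The variable names are illustrative.

```python
num_tokens = 5  # len(token_to_id)
d_model = 2

embedding_params = num_tokens * d_model        # nn.Embedding: one row of d_model values per token
attention_params = 3 * d_model * d_model       # query, key and value Linear layers, no bias
fc_params = d_model * num_tokens + num_tokens  # final Linear layer: weights plus bias
total_params = embedding_params + attention_params + fc_params

print(embedding_params, attention_params, fc_params, total_params)  # 10 12 15 37
```

The position encoding contributes no parameters because it is stored as a buffer rather than learned.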


Now that we've trained the transformer, let's use it!

To use the transformer that we just trained, we just repeat what we did earlier, only this time we use the trained transformer instead of an untrained transformer. First, we'll see if it correctly responds to the prompt Computer science is what?

In [None]:
## Now let's ask the other question...
model_input = torch.tensor([token_to_id["computer science"],
                            token_to_id["is"],
                            token_to_id["what"],
                            token_to_id["<EOS>"]])
input_length = model_input.size(dim=0)

predictions = model(model_input)
predicted_id = torch.tensor([torch.argmax(predictions[-1,:])])
predicted_ids = predicted_id

for i in range(input_length, max_length):
    if (predicted_id == token_to_id["<EOS>"]): # if the prediction is <EOS>, then we are done
        break

    model_input = torch.cat((model_input, predicted_id))

    predictions = model(model_input)
    predicted_id = torch.tensor([torch.argmax(predictions[-1,:])])
    predicted_ids = torch.cat((predicted_ids, predicted_id))

print("Predicted Tokens:\n")
for id in predicted_ids:
    print("\t", id_to_token[id.item()])

Predicted Tokens:

	 <EOS>


If training worked, the output should be awesome followed by **<EOS>**. Now let's see if the transformer correctly responds to the prompt what is computer science?

In [None]:
model_input = torch.tensor([token_to_id["what"],
                            token_to_id["is"],
                            token_to_id["computer science"],
                            token_to_id["<EOS>"]])
input_length = model_input.size(dim=0)

predictions = model(model_input)
predicted_id = torch.tensor([torch.argmax(predictions[-1,:])])
predicted_ids = predicted_id

for i in range(input_length, max_length):
    if (predicted_id == token_to_id["<EOS>"]): # if the prediction is <EOS>, then we are done
        break

    model_input = torch.cat((model_input, predicted_id))

    predictions = model(model_input)
    predicted_id = torch.tensor([torch.argmax(predictions[-1,:])])
    predicted_ids = torch.cat((predicted_ids, predicted_id))

print("Predicted Tokens:\n")
for id in predicted_ids:
    print("\t", id_to_token[id.item()])

Predicted Tokens:

	 <EOS>


The target output for both questions is awesome followed by **<EOS>**. The outputs recorded above show only **<EOS>**, which means this particular run fell short of that target.