In [1]:
import torch # to create tensors to store the raw data, weights and biases 
import torch.nn as nn # to make the weights and biases learnable (part of the network)
import torch.nn.functional as F # to apply activation functions 
from torch.optim import Adam  

import lightning as L # to train the model 
from torch.utils.data import DataLoader, TensorDataset # to load the data 
from lightning.pytorch.tuner.tuning import Tuner      
from lightning.pytorch.callbacks import ModelCheckpoint #Lightning is trying to delete the previous checkpoint (because the default ModelCheckpoint is set to save_top_k=1).
                                                        # On Windows a file gets locked as soon as any program (Explorer preview, antivirus, TensorBoard, VS Code, …) opens it, and Windows then blocks the delete call ⇒ PermissionError WinError 32

import pandas as pd # to load the data 
import matplotlib.pyplot as plt # graphs 
import seaborn as sns # graphs

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
token_to_id = {
    "what": 0,
    "is": 1,
    "DL": 2,
    "<EOS>": 3,
    "awesome": 4
}
id_to_token = dict(map(reversed, token_to_id.items()))


inputs = torch.tensor([[token_to_id["what"],
                        token_to_id["is"],
                        token_to_id["DL"],
                        token_to_id["<EOS>"],
                        token_to_id["awesome"]],
                       
                       [token_to_id["DL"],
                        token_to_id["is"],
                        token_to_id["what"],
                        token_to_id["<EOS>"],
                        token_to_id["DL"]]])

labels = torch.tensor([[token_to_id["is"],
                       token_to_id["DL"],
                       token_to_id["<EOS>"],
                       token_to_id["awesome"],
                       token_to_id["<EOS>"]],
                      
                      [token_to_id["is"],
                       token_to_id["what"],
                       token_to_id["<EOS>"],
                       token_to_id["awesome"],
                       token_to_id["<EOS>"]]])

dataset = TensorDataset(inputs, labels)
dataloader = DataLoader(dataset)


In [None]:
class PositionEncoding(nn.Module):
    
    def __init__(self, d_model=2, max_len=6): # d_model is the dimension of the word embeddings and max_len is the maximum length of the sequence that the transformer can process for both inputs and outputs
        super().__init__()
        
        pe = torch.zeros(max_len, d_model) # position encoding matrix
        
        position = torch.arange(start=0, end=max_len, step=1).float().unsqueeze(1) # create a column matrix of positions , arrange() is to createa sequence of numbers from 0 to max_len-1, and unsqueeze(1) is to turn the sequence into a column matrix
        embedding_index = torch.arange(start=0, end=d_model, step=2).float() # create a row matrix of embedding indices, arrange() is to create a sequence of numbers from 0 to d_model-1, and step=2 to get 2i in the math formula
        
        div_term = 1/torch.tensor(10000.0)**(embedding_index / d_model) # create a matrix of division terms
        
        pe[:, 0::2] = torch.sin(position * div_term) # fill the even indices of the position encoding matrix with the sine of the position and division term
        pe[:, 1::2] = torch.cos(position * div_term) # fill the odd indices of the position encoding matrix with the cosine of the position and division term
        
        self.register_buffer('pe', pe) # to ensure pe gets moved to the GPU when the model is moved to the GPU
    
    def forward(self, word_embeddings):
        return word_embeddings + self.pe[:word_embeddings.size(1), :].unsqueeze(0)


In [None]:
class Attention(nn.Module):
    def __init__(self, d_model=2):
        super().__init__()
        self.W_q = nn.Linear(in_features=d_model, out_features=d_model,bias=False) #matrix of weight for the Query 
        self.W_k = nn.Linear(in_features=d_model, out_features=d_model,bias=False) #matrix of weight for the Key
        self.W_v = nn.Linear(in_features=d_model, out_features=d_model,bias=False) #matrix of weight for the Value
        
        self.row_dim=0
        self.col_dim=1
    def forward(self, encoding_for_query, encoding_for_key, encoding_for_value,mask=None):
        q = self.W_q(encoding_for_query) # query matrix
        k = self.W_k(encoding_for_key) # key matrix
        v = self.W_v(encoding_for_value) # value matrix

        #similarity scores are the dot product of the query and key matrices
        similarity_scores = torch.matmul(q, k.transpose(-2, -1)) # matmul is to multiply the query and key matrices, transpose is to swap the rows and columns of the key matrix

        scaled_similarity_scores = similarity_scores / torch.sqrt(torch.tensor(k.size(-1), dtype=torch.float))

        if mask is not None: # if mask is not None, set the similarity scores of the masked positions to -1e9, this is to prevent the model form cheating by looking at the future tokens
            scaled_similarity_scores = scaled_similarity_scores.masked_fill(mask == mask, -1e9)

        attention_percents = F.softmax(scaled_similarity_scores, dim=self.col_dim) # to determine the percentages of influence that each token should have on the others
        
        attention_scores = torch.matmul(attention_percents, v) # to get the attention scores

        return attention_scores

In [None]:
class DecoderOnlyTransformer(L.LightningModule):
    def __init__(self, d_model=2, num_tokens=5, max_len=6):
        super().__init__()

        self.embedding = nn.Embedding(num_embeddings=num_tokens, embedding_dim=d_model)

        self.positional_encoding = PositionEncoding(d_model=d_model, max_len=max_len)

        self.attention_values = Attention(d_model=d_model)

        self.fully_connected_layer = nn.Linear(in_features=d_model, out_features=num_tokens)

        self.loss_fn = nn.CrossEntropyLoss()
    
    def forward(self, token_ids):
        word_embeddings = self.embedding(token_ids)
        positional_encodings = self.positional_encoding(word_embeddings)

        mask = torch.tril(torch.ones(token_ids.size(dim=0),token_ids.size(dim=0))) # to create a lower triangular matrix of ones, this is to prevent the model from cheating by looking at the future tokens
        mask = mask ==0 # to convert 1 to True and 0 to False

        attention_output = self.attention_values(positional_encodings,positional_encodings,positional_encodings,mask=mask)

        residual_connection_values = attention_output + positional_encodings

        fully_connected_layer_output = self.fully_connected_layer(residual_connection_values)

        return fully_connected_layer_output

    def configure_optimizers(self):
        return Adam(self.parameters(), lr=0.1)
    
    def training_step(self, batch, batch_idx):
        input_tokens, labels = batch
        output = self.forward(input_tokens)
        loss = self.loss(output, labels[0])
        
        return loss

In [7]:
model = DecoderOnlyTransformer(num_tokens=len(token_to_id),d_model=2, max_len=6)

model_input = torch.tensor([[token_to_id["what"],
                        token_to_id["is"],
                        token_to_id["DL"],
                        token_to_id["<EOS>"]]])

input_length = model_input.size(dim=0)

predictions = model(model_input)
predicted_id = torch.tensor([torch.argmax(predictions[-1,:])]) 
predicted_ids = predicted_id

max_length = 6
for i in range(input_length, max_length):
    if (predicted_id == token_to_id["<EOS>"]):
        break
    
    model_input = torch.cat([model_input, predicted_id]) # each time we predict, we add it to the input sequence to give th emodel full context 
    
    predictions = model(model_input)
    predicted_id = torch.tensor([torch.argmax(predictions[-1,:])])
    predicted_ids = torch.cat([predicted_ids, predicted_id])

print("Predicted sentence:\n ")
for id in predicted_ids:
    print("\t",id_to_token[id.item()])



RuntimeError: mat1 and mat2 shapes cannot be multiplied (8x1 and 2x4)

In [None]:
ckpt_cb = ModelCheckpoint(save_top_k=-1)   # -1 = keep everything

trainer=L.Trainer(max_epochs=100,callbacks=[ckpt_cb])
trainer.fit(model, train_dataloaders=dataloader)

model_input = torch.tensor([[token_to_id["what"],
                        token_to_id["is"],
                        token_to_id["DL"],
                        token_to_id["<EOS>"]]])

input_length = model_input.size(dim=0)

predictions = model(model_input)
predicted_id = torch.tensor([torch.argmax(predictions[-1,:])]) 
predicted_ids = predicted_id

max_length = 6
for i in range(input_length, max_length):
    if (predicted_id == token_to_id["<EOS>"]):
        break
    
    model_input = torch.cat([model_input, predicted_id]) # each time we predict, we add it to the input sequence to give th emodel full context 
    
    predictions = model(model_input)
    predicted_id = torch.tensor([torch.argmax(predictions[-1,:])])
    predicted_ids = torch.cat([predicted_ids, predicted_id])

print("Predicted sentence:\n ")
for id in predicted_ids:
    print("\t",id_to_token[id.item()])

