In [1]:
!pip install lightning
!pip install torch

Collecting lightning
  Downloading lightning-2.4.0-py3-none-any.whl.metadata (38 kB)
Downloading lightning-2.4.0-py3-none-any.whl (810 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m811.0/811.0 kB[0m [31m25.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: lightning
Successfully installed lightning-2.4.0


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as functional
from torch.optim import Adam
from torch.utils.data import TensorDataset, DataLoader
import lightning

There are only two prompts we want our transformer to respond to:

"What is PyTorch?" and "PyTorch is what?"

The answer to both of these is: "Awesome"

In [3]:
# Vocabulary: the model's whole world consists of exactly these 5 tokens.
token_to_id = {
    "what": 0,
    "is": 1,
    "pytorch": 2,
    "awesome": 3,
    "<EOS>": 4,
}

# Reverse lookup (id -> token), used to turn predicted ids back into text.
id_to_token = {index: token for token, index in token_to_id.items()}

print(f"{token_to_id=} \n{id_to_token=}")

token_to_id={'what': 0, 'is': 1, 'pytorch': 2, 'awesome': 3, '<EOS>': 4} 
id_to_token={0: 'what', 1: 'is', 2: 'pytorch', 3: 'awesome', 4: '<EOS>'}


In [4]:
# Training inputs: each of the two sentences (prompt, <EOS>, answer) is spelled
# out as tokens and then encoded to ids through token_to_id.
input_sentences = [
    ["what", "is", "pytorch", "<EOS>", "awesome"],
    ["pytorch", "is", "what", "<EOS>", "awesome"],
]
inputs = torch.tensor(
    [[token_to_id[token] for token in sentence] for sentence in input_sentences]
)

# Training labels: position i holds the token the decoder should predict after
# reading input position i, i.e. the input shifted left by one with a closing
# <EOS>. For the first sentence: "what" -> "is", "is" -> "pytorch", and so on.
label_sentences = [
    ["is", "pytorch", "<EOS>", "awesome", "<EOS>"],
    ["is", "what", "<EOS>", "awesome", "<EOS>"],
]
labels = torch.tensor(
    [[token_to_id[token] for token in sentence] for sentence in label_sentences]
)

# Pair each input row with its label row; DataLoader is left at its defaults.
dataset = TensorDataset(inputs, labels)
dataloader = DataLoader(dataset)

In [5]:
# Now Positional Encoding!

class PositionEncoding(nn.Module):
    """Fixed sinusoidal positional encoding that is added to word embeddings.

    The table is computed once in ``__init__`` and stored as the buffer ``pe``
    with shape ``(max_len, d_model)``:

        PE(pos, 2i)   = sin(pos / 10000^(2i / d_model))
        PE(pos, 2i+1) = cos(pos / 10000^(2i / d_model))

    Args:
        d_model: number of embedding values per token. Both even and odd
            values are supported.
        max_len: maximum number of tokens (positions) the table covers.
    """

    def __init__(self, d_model=2, max_len=6):
        super().__init__()
        pe = torch.zeros(max_len, d_model)

        # Column vector of positions 0..max_len-1, shape (max_len, 1).
        position = torch.arange(start=0, end=max_len, step=1).float().unsqueeze(1)
        # The even embedding indices 0, 2, 4, ... (the "2i" of the formula).
        embedding_index = torch.arange(start=0, end=d_model, step=2).float()

        div_term = 1/torch.tensor(10000.0)**(embedding_index / d_model)

        # Shape (max_len, ceil(d_model / 2)): one angle per (position, frequency).
        angles = position * div_term

        # PE(pos, 2i) = sin(pos/10000^(2i/d_model))
        pe[:, 0::2] = torch.sin(angles)

        # PE(pos, 2i+1) = cos(pos/10000^(2i/d_model))
        # For odd d_model there is one fewer odd column than even columns, so
        # trim the angles to the number of odd columns to avoid a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # A buffer (not a parameter): it follows the module across devices and
        # is saved in the state dict, but is never updated by the optimizer.
        self.register_buffer('pe', pe)

    def forward(self, word_embeddings):
        """Return ``word_embeddings`` with the positional encodings added.

        The first dimension of ``word_embeddings`` is used as the sequence
        length, i.e. the input is indexed as ``(seq_len, d_model)``.
        """
        return word_embeddings + self.pe[:word_embeddings.size(0), :]

In [6]:
# Masked Self Attention
class Attention(nn.Module):
    """Single-head scaled dot-product attention with an optional mask.

    Computes ``softmax((Q K^T) / sqrt(d_k) + M) V`` where Q, K and V are
    bias-free linear projections of the given encodings.

    Inputs may be unbatched ``(seq_len, d_model)`` or carry leading batch
    dimensions ``(..., seq_len, d_model)``; the sequence and feature axes are
    always taken to be the last two dimensions.

    Args:
        d_model: size of each token encoding (input and output feature size).
    """

    def __init__(self, d_model=2):
        super().__init__()

        # Learnable projection matrices; nn.Linear creates them untrained and
        # they are updated during training.
        self.W_q = nn.Linear(in_features=d_model, out_features=d_model, bias=False) # query
        self.W_k = nn.Linear(in_features=d_model, out_features=d_model, bias=False) # key
        self.W_v = nn.Linear(in_features=d_model, out_features=d_model, bias=False) # values

        # Address the token (row) and feature (column) axes from the end so
        # that the same code handles 2-D and batched inputs. For 2-D inputs
        # these are the same axes as 0 and 1.
        self.row_dim = -2
        self.col_dim = -1

    def forward(self, encodings_for_q, encodings_for_k, encodings_for_v, mask=None):
        """Return the attention output for the given encodings.

        Args:
            encodings_for_q: encodings projected into queries.
            encodings_for_k: encodings projected into keys.
            encodings_for_v: encodings projected into values.
            mask: optional boolean tensor broadcastable to the similarity
                matrix; positions where it is ``True`` are hidden from
                attention.

        Returns:
            Tensor with the same shape as the query encodings.
        """
        q = self.W_q(encodings_for_q)
        k = self.W_k(encodings_for_k)
        v = self.W_v(encodings_for_v)

        # Attention(Q,K,V) = softmax(((QK^T)/(dk)^(1/2)) + M) * V
        sims = torch.matmul(q, k.transpose(dim0=self.row_dim, dim1=self.col_dim))
        scaled_sims = sims / (k.size(self.col_dim) ** 0.5)

        if mask is not None:
            # A very large negative score becomes ~0 probability after softmax.
            scaled_sims = scaled_sims.masked_fill(mask=mask, value=-1e9)

        attention_percents = functional.softmax(scaled_sims, dim=self.col_dim)
        attention_scores = torch.matmul(attention_percents, v)

        return attention_scores

In [7]:
# Putting them together
class DecoderOnlyTransformer(lightning.LightningModule):
    """Minimal decoder-only transformer: embedding -> positional encoding ->
    masked self-attention -> residual connection -> linear layer.

    Args:
        num_tokens: vocabulary size; used for both the embedding table and the
            number of output logits. Pass the real vocabulary size explicitly.
        d_model: number of embedding values per token.
        max_len: maximum sequence length covered by the positional encoding.
    """

    def __init__(self, num_tokens=4, d_model=2, max_len=6):
        super().__init__()

        self.we = nn.Embedding(num_embeddings=num_tokens, embedding_dim=d_model)
        self.pe = PositionEncoding(d_model=d_model, max_len=max_len)
        self.self_attention = Attention(d_model=d_model)
        self.fc_layer = nn.Linear(in_features=d_model, out_features=num_tokens)

        self.loss = nn.CrossEntropyLoss() # does softmax too

    def forward(self, token_ids):
        """Return next-token logits for one sequence of token ids.

        Args:
            token_ids: 1-D tensor of token ids for a single sequence.

        Returns:
            Tensor of shape ``(seq_len, num_tokens)`` with raw logits (no
            softmax applied).
        """
        word_embeddings = self.we(token_ids)
        position_encoded = self.pe(word_embeddings)

        # Causal mask: True above the diagonal so a token cannot attend to
        # later tokens. Build it on the input's device; a CPU mask would fail
        # in masked_fill when the model runs on an accelerator.
        seq_len = token_ids.size(dim=0)
        mask = torch.tril(torch.ones((seq_len, seq_len), device=token_ids.device))
        mask = mask == 0

        self_attention_values = self.self_attention(position_encoded, position_encoded, position_encoded, mask=mask)

        residual_connection_values = position_encoded + self_attention_values
        fc_layer_output = self.fc_layer(residual_connection_values)
        return fc_layer_output

    def configure_optimizers(self):
        """Use Adam over all parameters with a learning rate of 0.1."""
        return Adam(self.parameters(), lr=0.1)

    def training_step(self, batch, batch_idx):
        """Return the cross-entropy loss averaged over every token in the batch.

        ``forward`` handles one sequence at a time, so each sequence of the
        batch is run separately and the logits are stacked. For a batch of one
        sequence this equals the loss on that sequence alone; larger batches
        no longer have every sequence after the first silently ignored.
        """
        input_tokens, labels = batch
        logits = torch.stack([self.forward(sequence) for sequence in input_tokens])
        # CrossEntropyLoss expects (N, num_classes) logits and (N,) targets.
        loss = self.loss(logits.flatten(0, 1), labels.flatten())

        return loss

In [8]:
# Build a fresh (untrained) model and see what it generates for the first prompt.
model = DecoderOnlyTransformer(num_tokens=len(token_to_id), d_model=2, max_len=6)

model_input = torch.tensor([
    token_to_id["what"],
    token_to_id["is"],
    token_to_id["pytorch"],
    token_to_id["<EOS>"]
])

input_length = model_input.size(dim=0)
max_length = 6

# Generation is inference only, so skip building the autograd graph.
with torch.no_grad():
    predictions = model(model_input)
    # The last row holds the logits for the token that follows <EOS>;
    # take the most likely one.
    predicted_id = torch.argmax(predictions[-1, :]).unsqueeze(0)
    predicted_ids = predicted_id

    # Feed each prediction back in until <EOS> is produced or the
    # sequence reaches max_length tokens.
    for i in range(input_length, max_length):
        if (predicted_id == token_to_id["<EOS>"]):
            break

        model_input = torch.cat((model_input, predicted_id))

        predictions = model(model_input)
        predicted_id = torch.argmax(predictions[-1, :]).unsqueeze(0)
        predicted_ids = torch.cat((predicted_ids, predicted_id))

print("Predicted Tokens:\n")
for id_ in predicted_ids:
    print("\t", id_to_token[id_.item()])

Predicted Tokens:

	 what
	 awesome
	 what


In [9]:
# Train the model for at most 30 epochs over the two-sentence dataloader.
trainer = lightning.Trainer(max_epochs=30)
# fit() drives the training loop using the model's training_step and
# configure_optimizers.
trainer.fit(model, train_dataloaders=dataloader)

INFO: GPU available: False, used: False
INFO: TPU available: False, using: 0 TPU cores
INFO: HPU available: False, using: 0 HPUs
INFO: 
  | Name           | Type             | Params | Mode 
------------------------------------------------------------
0 | we             | Embedding        | 10     | train
1 | pe             | PositionEncoding | 0      | train
2 | self_attention | Attention        | 12     | train
3 | fc_layer       | Linear           | 15     | train
4 | loss           | CrossEntropyLoss | 0      | train
------------------------------------------------------------
37        Trainable params
0         Non-trainable params
37        Total params
0.000     Total estimated model params size (MB)
8         Modules in train mode
0         Modules in eval mode
/opt/conda/lib/python3.10/site-packages/lightning/pytorch/trainer/connectors/data_connector.py:424: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_

Training: |          | 0/? [00:00<?, ?it/s]

INFO: `Trainer.fit` stopped: `max_epochs=30` reached.


In [10]:
# Reuse the model trained above (it is deliberately NOT re-instantiated here)
# and generate a response to the first prompt: "what is pytorch <EOS>".

model_input = torch.tensor([
    token_to_id["what"],
    token_to_id["is"],
    token_to_id["pytorch"],
    token_to_id["<EOS>"]
])

input_length = model_input.size(dim=0)
max_length = 6

# Generation is inference only, so skip building the autograd graph.
with torch.no_grad():
    predictions = model(model_input)
    # The last row holds the logits for the token that follows <EOS>;
    # take the most likely one.
    predicted_id = torch.argmax(predictions[-1, :]).unsqueeze(0)
    predicted_ids = predicted_id

    # Feed each prediction back in until <EOS> is produced or the
    # sequence reaches max_length tokens.
    for i in range(input_length, max_length):
        if (predicted_id == token_to_id["<EOS>"]):
            break

        model_input = torch.cat((model_input, predicted_id))

        predictions = model(model_input)
        predicted_id = torch.argmax(predictions[-1, :]).unsqueeze(0)
        predicted_ids = torch.cat((predicted_ids, predicted_id))

print("Predicted Tokens:\n")
for id_ in predicted_ids:
    print("\t", id_to_token[id_.item()])

Predicted Tokens:

	 awesome
	 <EOS>


In [11]:
# Reuse the model trained above (it is deliberately NOT re-instantiated here)
# and generate a response to the second prompt: "pytorch is what <EOS>".

model_input = torch.tensor([
    token_to_id["pytorch"],
    token_to_id["is"],
    token_to_id["what"],
    token_to_id["<EOS>"]
])

input_length = model_input.size(dim=0)
max_length = 6

# Generation is inference only, so skip building the autograd graph.
with torch.no_grad():
    predictions = model(model_input)
    # The last row holds the logits for the token that follows <EOS>;
    # take the most likely one.
    predicted_id = torch.argmax(predictions[-1, :]).unsqueeze(0)
    predicted_ids = predicted_id

    # Feed each prediction back in until <EOS> is produced or the
    # sequence reaches max_length tokens.
    for i in range(input_length, max_length):
        if (predicted_id == token_to_id["<EOS>"]):
            break

        model_input = torch.cat((model_input, predicted_id))

        predictions = model(model_input)
        predicted_id = torch.argmax(predictions[-1, :]).unsqueeze(0)
        predicted_ids = torch.cat((predicted_ids, predicted_id))

print("Predicted Tokens:\n")
for id_ in predicted_ids:
    print("\t", id_to_token[id_.item()])

Predicted Tokens:

	 awesome
	 <EOS>
