In [1]:
import importlib
import subprocess
import sys

# Install Lightning only when it cannot be imported.
# pip is run as `<kernel python> -m pip` instead of through `pip.main`, which
# is an internal entry point that pip does not support calling from a program.
# Using `sys.executable` makes the package land in the same environment the
# notebook kernel is running in.
try:
    importlib.import_module("lightning")
except ImportError:
    # check_call raises CalledProcessError if the installation fails, so a
    # broken install is reported here rather than at the later import.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "lightning"])

Please see https://github.com/pypa/pip/issues/5599 for advice on fixing the underlying issue.
To avoid this problem you can invoke Python with '-m pip' instead of running pip directly.


In [3]:
import torch 
import torch.nn as nn 
import torch.nn.functional as F 
from torch.optim import Adam 

from torch.utils.data import TensorDataset, DataLoader 
import lightning as L # for scaling the code 

  from .autonotebook import tqdm as notebook_tqdm


In [7]:
# Vocabulary: maps every token the model knows to an integer id.
token_to_id = {
    'what': 0,
    'is': 1,
    'SleepingAI': 2,
    'lovely': 3,
    '<EOS>': 4, # end of sequence 
}

# Reverse lookup (id -> token), used to turn predicted ids back into text.
id_to_token = {token_id: token for token, token_id in token_to_id.items()}

# Two training sequences, one per row:
#   "what is SleepingAI <EOS> lovely"
#   "SleepingAI is what <EOS> lovely"
# Each row is the prompt, the <EOS> separator, and the expected answer.
inputs = torch.tensor([[token_to_id["what"],
                        token_to_id["is"],
                        token_to_id["SleepingAI"],
                        token_to_id["<EOS>"],
                        token_to_id["lovely"]],

                       # The second prompt must contain "what": its label row
                       # below is "is what <EOS> lovely <EOS>", i.e. this row
                       # shifted left by one position.
                       [token_to_id["SleepingAI"],
                        token_to_id["is"],
                        token_to_id["what"],
                        token_to_id["<EOS>"],
                        token_to_id["lovely"]]])

# Next-token targets: labels[i][t] is the token that follows inputs[i][t],
# so every row equals the matching input row shifted left by one, closed
# with a final <EOS>.
labels = torch.tensor([[token_to_id["is"],
                        token_to_id["SleepingAI"],
                        token_to_id["<EOS>"],
                        token_to_id["lovely"],
                        token_to_id["<EOS>"]],

                       [token_to_id["is"],
                        token_to_id["what"],
                        token_to_id["<EOS>"],
                        token_to_id["lovely"],
                        token_to_id["<EOS>"]]])

In [8]:
# Pair every input sequence with its label sequence so that one dataset item
# is an (input_tokens, label_tokens) tuple.
dataset = TensorDataset(inputs, labels)

# One sequence per batch (DataLoader's default batch size, spelled out here),
# in the original order with no shuffling.
dataloader = DataLoader(dataset=dataset, batch_size=1)

Writing the positional encoding module

In [9]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to word embeddings.

    A table of shape ``(max_len, d_model)`` is computed once at construction
    time: even columns hold ``sin(position / 10000**(i / d_model))`` and odd
    columns hold the matching ``cos`` value. The table is stored as a
    (non-trainable) buffer named ``pe``.

    Args:
        d_model: Size of each embedding vector. Both even and odd values are
            supported.
        max_len: Number of positions to precompute; longest sequence that
            ``forward`` can encode.
    """

    def __init__(self, d_model=2, max_len=6):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        # Column vector of positions 0 .. max_len-1, shape (max_len, 1).
        position = torch.arange(start=0, end=max_len, step=1).float().unsqueeze(1)
        # One frequency per even embedding index (0, 2, 4, ...).
        embedding_index = torch.arange(start=0, end=d_model, step=2).float()
        div_term = 1/torch.tensor(10000.0)**(embedding_index / d_model)
        angles = position * div_term  # (max_len, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even columns,
        # so the last frequency has no cosine slot and is dropped here.
        pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])
        # A buffer moves with the module across devices but is not a parameter.
        self.register_buffer('pe', pe)
    
    def forward(self, word_embeddings):
        """Return ``word_embeddings`` with position encodings added.

        The first dimension of ``word_embeddings`` is used as the sequence
        length, so an unbatched ``(seq_len, d_model)`` tensor is expected.
        """
        return word_embeddings + self.pe[:word_embeddings.size(0), :]

Attention mechanism

In [10]:
class Attention(nn.Module):
    """Single-head scaled dot-product attention.

    Queries, keys and values are produced by three bias-free linear
    projections of size ``d_model -> d_model``.

    Args:
        d_model: Size of each token encoding.
    """

    def __init__(self, d_model=2):
        super().__init__()
        self.W_q = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.W_k = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.W_v = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        
        # Axis indices used below: rows are tokens, columns are features,
        # i.e. the encodings are handled as 2-D (seq_len, d_model) matrices.
        self.row_dim = 0
        self.col_dim = 1 
        
    def forward(self, encoding_for_q, encoding_for_k, encoding_for_v, mask=None):
        """Compute attention values.

        Args:
            encoding_for_q: Encodings projected into queries.
            encoding_for_k: Encodings projected into keys.
            encoding_for_v: Encodings projected into values.
            mask: Optional boolean tensor broadcastable to the
                (num_queries, num_keys) similarity matrix. ``True`` marks
                query/key pairs that must NOT attend to each other.

        Returns:
            One attention-weighted combination of the values per query row.
        """
        q = self.W_q(encoding_for_q)
        k = self.W_k(encoding_for_k)
        v = self.W_v(encoding_for_v)
        
        # Similarity of every query with every key.
        sims = torch.matmul(q, k.transpose(dim0=self.row_dim, dim1=self.col_dim))
        # Scale by sqrt(key size) to keep the softmax inputs in a sane range.
        scaled_sims = sims / (k.size(self.col_dim) ** 0.5)
        if mask is not None:
            # Masked entries get a very large NEGATIVE score so the softmax
            # assigns them ~0 weight. (A tiny positive value such as 1e-9
            # would leave them with a normal, non-zero weight.) A finite
            # value is used instead of -inf so a fully masked row does not
            # produce NaNs.
            scaled_sims = scaled_sims.masked_fill(mask=mask, value=-1e9)
        attention_percents = F.softmax(scaled_sims, dim=self.col_dim)
        attention_scores = torch.matmul(attention_percents, v)
        return attention_scores

In [12]:
class DecoderOnlyTransformer(L.LightningModule):
    """Minimal decoder-only transformer trained for next-token prediction.

    Pipeline: token embedding -> positional encoding -> masked self-attention
    -> residual connection -> linear layer producing one logit per token.

    Args:
        num_tokens: Vocabulary size (embedding rows and output logits).
            NOTE: the default is 4, while the ``token_to_id`` vocabulary
            defined in this notebook has 5 entries (ids 0-4); pass
            ``num_tokens=len(token_to_id)`` when training on that data.
        d_model: Size of each token encoding.
        max_len: Longest sequence the positional encoding supports.
    """

    def __init__(self, num_tokens=4, d_model=2, max_len=6):
        super().__init__()
        # Seed before creating any layer so weight initialisation is repeatable.
        L.seed_everything(seed=42)
        self.we = nn.Embedding(num_embeddings=num_tokens,
                               embedding_dim=d_model)
        self.pe = PositionalEncoding(d_model=d_model, max_len=max_len)
        self.self_attention = Attention(d_model=d_model)
        self.fc_layer = nn.Linear(in_features=d_model, out_features=num_tokens)
        self.loss = nn.CrossEntropyLoss()
        
    def forward(self, token_ids):
        """Return next-token logits for one unbatched sequence of token ids.

        ``token_ids`` is indexed along dim 0 as the sequence, so a 1-D tensor
        of ids is expected; the result has one row of ``num_tokens`` logits
        per input position.
        """
        word_embeddings = self.we(token_ids)
        position_encoded = self.pe(word_embeddings)
        seq_len = token_ids.size(dim=0)
        # Causal mask: True above the diagonal, i.e. for tokens that come
        # AFTER the querying token. Built on the input's device so the model
        # also works when it has been moved to an accelerator.
        mask = torch.tril(torch.ones((seq_len, seq_len), device=token_ids.device))
        mask = mask == 0
        self_attention_values = self.self_attention(position_encoded,
                                                    position_encoded, 
                                                    position_encoded,
                                                    mask=mask)
        residual_connection_values = position_encoded + self_attention_values
        fc_layer_output = self.fc_layer(residual_connection_values)
        return fc_layer_output
    
    def configure_optimizers(self):
        """Lightning hook: optimise all parameters with Adam (lr=0.1)."""
        return Adam(self.parameters(), lr=0.1)

    # Backward-compatible alias for the original (misspelled) method name.
    configure_optimizer = configure_optimizers
    
    def training_step(self, batch, batch_idx):
        """Lightning hook: cross-entropy loss for one batch.

        Only the first sequence of the batch (index 0) is used, so this is
        written for a DataLoader that yields one sequence per batch.
        """
        input_tokens, labels = batch 
        output = self.forward(input_tokens[0])
        loss = self.loss(output, labels[0])
        return loss

We write a slightly different version of Transformers

In [None]:
import subprocess
import sys

# Install torchtext into the kernel's own environment.
# pip is run as `<kernel python> -m pip` because `pip.main` is an internal
# entry point that pip does not support calling from within a program.
# check_call raises CalledProcessError if the installation fails.
subprocess.check_call([sys.executable, "-m", "pip", "install", "torchtext"])

In [19]:
# importing libraries 
import torch 
import torch.nn as nn 
import torch.optim as optim 
import time

In [20]:
# Embedding layer 
class Embedding(nn.Module):
    """Token embedding plus a learned positional embedding, with dropout.

    Args:
        vocab_size: Number of distinct token ids.
        max_len: Number of positions that have a learned embedding; input
            sequences must not be longer than this.
        embed_dim: Size of each embedding vector.
        dropout: Dropout probability applied to the summed embeddings.
    """

    def __init__(self, vocab_size, max_len, embed_dim, dropout=0.1):
        super().__init__()
        self.word_embed = nn.Embedding(vocab_size, embed_dim)
        self.pos_embed = nn.Embedding(max_len, embed_dim)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        """Embed a ``(batch_size, seq_len)`` tensor of token ids.

        Returns a ``(batch_size, seq_len, embed_dim)`` tensor.
        """
        batch_size, seq_len = x.shape
        # Position ids 0 .. seq_len-1 for every row, created on the same
        # device as the input. Following the input (rather than probing for
        # CUDA) keeps the module working wherever the caller placed it.
        positions = torch.arange(0, seq_len, device=x.device).expand(
            batch_size, seq_len
        )
        embedding = self.word_embed(x) + self.pos_embed(positions)
        return self.dropout(embedding)

Writing Multi-head attention

In [21]:
class MHSelfAttention(nn.Module):
    """Multi-head self-attention (without an output projection).

    The input is projected to queries, keys and values, split into
    ``num_heads`` heads of size ``embed_dim // num_heads``, attended per head
    with scaled dot-product attention, and the heads are concatenated back to
    ``embed_dim`` features.

    Args:
        embed_dim: Size of each token embedding; must be divisible by
            ``num_heads``.
        num_heads: Number of attention heads.
    """

    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        
        assert (self.num_heads * self.head_dim == self.embed_dim), \
            "embed_dim must be divisible by num_heads"
        
        self.w_queries = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.w_keys = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.w_values = nn.Linear(self.embed_dim, self.embed_dim, bias=False)

    def _split_heads(self, projected, batch_size, seq_len):
        """Reshape (batch, seq, embed) into (batch, heads, seq, head_dim)."""
        return projected.reshape(
            batch_size, seq_len, self.num_heads, self.head_dim
        ).permute(0, 2, 1, 3)
        
    def forward(self, x):
        """Apply self-attention to ``x`` of shape (batch, seq_len, embed_dim).

        Returns a tensor of the same shape.
        """
        batch_size = x.shape[0]
        seq_len = x.shape[1]

        # All three share the layout (batch, heads, seq, head_dim). Values in
        # particular must keep the sequence axis before the feature axis so
        # the weighted sum below works for any sequence length.
        queries = self._split_heads(self.w_queries(x), batch_size, seq_len)
        keys = self._split_heads(self.w_keys(x), batch_size, seq_len)
        values = self._split_heads(self.w_values(x), batch_size, seq_len)

        # (batch, heads, query_pos, key_pos): dot product over head_dim.
        attention_score = torch.einsum('bhqd, bhkd -> bhqk', queries, keys)
        # Scale by sqrt(per-head key size) and normalise over the KEY axis so
        # every query's weights sum to 1.
        attention_dist = torch.softmax(
            attention_score / (self.head_dim ** (1/2)), dim=-1
        )
        # Weighted sum of the values: (batch, heads, query_pos, head_dim).
        attention_out = torch.einsum('bhqk, bhkd -> bhqd', attention_dist, values)
        # Put the heads next to their features again and merge them, so that
        # head h occupies features [h * head_dim, (h + 1) * head_dim).
        concat_out = attention_out.permute(0, 2, 1, 3).reshape(
            batch_size, seq_len, self.embed_dim
        )
        return concat_out

The actual transformer block

In [None]:
class TransformerEncoder(nn.Module):
    """One encoder block: self-attention and a feed-forward network.

    Each sub-layer's output goes through dropout, is added back to the
    sub-layer's input, and the sum is layer-normalised.

    Args:
        embed_dim: Size of each token embedding.
        num_heads: Number of attention heads.
        forward_expansion: The feed-forward hidden layer has
            ``forward_expansion * embed_dim`` units.
        dropout: Dropout probability applied to both sub-layer outputs.
    """

    def __init__(self, embed_dim, num_heads, forward_expansion, dropout=0.1):
        super().__init__()
        hidden_dim = forward_expansion * embed_dim

        # Attention sub-layer and the two normalisation layers.
        self.attention = MHSelfAttention(embed_dim, num_heads)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

        # Position-wise feed-forward network: expand, ReLU, project back.
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim),
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Run the block on ``x`` and return a tensor of the same shape."""
        # Sub-layer 1: attention -> dropout -> residual add -> layer norm.
        attended = self.dropout(self.attention(x))
        after_attention = self.norm1(x + attended)

        # Sub-layer 2: feed-forward -> dropout -> residual add -> layer norm.
        transformed = self.dropout(self.feed_forward(after_attention))
        return self.norm2(after_attention + transformed)