In [None]:
!pip install torch
!pip install tqdm
!pip install sentencepiece

In [1]:
!git clone https://github.com/kunalkushwahatg/transformer_from_scratch.git
!cd transformer_from_scratch

Cloning into 'transformer_from_scratch'...
remote: Enumerating objects: 52, done.[K
remote: Counting objects: 100% (52/52), done.[K
remote: Compressing objects: 100% (42/42), done.[K
remote: Total 52 (delta 10), reused 41 (delta 6), pack-reused 0[K
Receiving objects: 100% (52/52), 580.68 KiB | 7.00 MiB/s, done.
Resolving deltas: 100% (10/10), done.


In [32]:
import torch
import string
import torch.nn as nn
import torch.nn.functional as F
import time
import math
import tqdm
from tokenizers import ByteLevelBPETokenizer
from torch.utils.data import Dataset, DataLoader
import sentencepiece as spm

# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Current device:", device)

Current device: cuda


In [44]:
# Notebook-wide settings and hyperparameters used by the cells below.
DEVICE = device  # torch.device selected in the import cell
BATCH_SIZE = 128  # number of samples per DataLoader batch
MAX_LEN = 10  # context length: tokens fed to the model for each prediction
DMODEL = 512  # embedding / model width
EPOCHS = 10  # number of passes over the dataset in the training loop
LEARNING_RATE = 0.001  # learning rate handed to the Adam optimizer
VOCAB_SIZE = 10000  # vocabulary size requested from the SentencePiece trainer


In [45]:


class TextDataset(Dataset):
    '''
    Next-token prediction dataset built from a single plain-text file.

    On construction a SentencePiece model is trained on the file, the file is
    tokenized, and every window of `max_len` consecutive token ids is paired
    with the token id that follows it.

    Arguments:
        file_path: Path of the UTF-8 text file used for training the tokenizer and for the samples
        max_len: Number of token ids in each input window
        vocab_size: Vocabulary size requested from the SentencePiece trainer

    Attributes:
        sentence: LongTensor with one row of `max_len` token ids per sample
        label: LongTensor with the token id that follows each row of `sentence`
        vocab_size: Piece count reported by the loaded tokenizer (overwrites the requested value)
        sp: The loaded SentencePieceProcessor
    '''
    def __init__(self, file_path, max_len, vocab_size):
        self.file_path = file_path
        self.max_len = max_len
        self.vocab_size = vocab_size
        # Not read by any method of this class; kept so existing attribute access keeps working.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Must match the `model_prefix` used in _train_tokenizer ('spm' -> 'spm.model').
        self.sp_model_path = 'spm.model'
        self._prepare_data()

    def _prepare_data(self):
        '''
        Train and load the tokenizer, tokenize the corpus and build the training samples.
        '''
        self._train_tokenizer()
        self._load_tokenizer()
        self.tokens_num = self._tokenize_text()
        self.sentence, self.label = self._create_sequences(self.tokens_num)
        self.vocab_size = self._get_vocab_size()

    def _train_tokenizer(self):
        '''
        Train a SentencePiece model on `file_path`; the trainer writes its files with the prefix 'spm'.
        '''
        spm.SentencePieceTrainer.train(
            input=self.file_path, model_prefix='spm', vocab_size=self.vocab_size,
            pad_id=0, unk_id=1, bos_id=2, eos_id=3, user_defined_symbols=['<pad>', '<bos>', '<eos>']
        )

    def _load_tokenizer(self):
        '''
        Load the trained SentencePiece model from `sp_model_path` into `self.sp`.
        '''
        self.sp = spm.SentencePieceProcessor()
        self.sp.load(self.sp_model_path)

    def _tokenize_text(self):
        '''
        Read the corpus, normalize it and return its token ids.

        Newlines become spaces, the text is lower-cased and every character in
        `string.punctuation` is removed before encoding.
        NOTE(review): the tokenizer itself is trained on the raw, un-normalized file.
        '''
        with open(self.file_path, "r", encoding="utf-8") as file:
            text = file.read().replace("\n", " ").lower()
        # A translation table removes all punctuation characters in one pass.
        text = text.translate(str.maketrans('', '', string.punctuation))
        tokens = self.sp.encode(text)
        return tokens

    def _create_sequences(self, tokens_num):
        '''
        Build (window, next token) pairs from a flat list of token ids.

        Arguments:
            tokens_num: List of token ids

        Returns:
            (sentence, label): LongTensors with one entry per sample; both are
            empty when there are not more than `max_len` tokens.
        '''
        # A window starting at i needs tokens i .. i + max_len (the last one is the label),
        # so valid starts are 0 .. len - max_len - 1, i.e. len - max_len samples in total.
        n_samples = max(len(tokens_num) - self.max_len, 0)
        x = [tokens_num[i:i + self.max_len] for i in range(n_samples)]
        y = tokens_num[self.max_len:self.max_len + n_samples]
        # Build integer tensors directly instead of converting through float32.
        sentence = torch.tensor(x, dtype=torch.long)
        label = torch.tensor(y, dtype=torch.long)
        return sentence, label

    def _get_vocab_size(self):
        '''
        Return the number of pieces in the loaded tokenizer.
        '''
        return self.sp.get_piece_size()

    def __len__(self):
        return len(self.sentence)

    def __getitem__(self, idx):
        return self.sentence[idx], self.label[idx]

    def decode(self, token):
        '''
        Decode a tensor of token ids back to text with the loaded tokenizer.
        '''
        return self.sp.decode(token.detach().cpu().tolist())



In [46]:
# Train the tokenizer on the cloned corpus and build the (context, next-token) samples.
dataset = TextDataset(
    file_path="/content/transformer_from_scratch/data/input.txt",
    max_len=MAX_LEN,
    vocab_size=VOCAB_SIZE,
)
print("Vocabulary size:", dataset.vocab_size)

Vocabulary size: 10000


In [47]:
class PositionalEncoding(nn.Module):
    '''
    Adds fixed sinusoidal positional encodings to a batch of embedded sequences.

    Arguments:
            shape : shape of embedding vector => tuple(batch_size, max_len, dmodel)
            device : device on which the encoding table is first created (e.g., 'cpu' or 'cuda')

    Returns:
            positional encoded vector (same shape as the input)

    The table is stored as a non-persistent buffer: it follows `.to(...)` / `.cuda()`
    together with the rest of the model but is not written to the state dict.
    '''
    def __init__(self, shape, device='cpu'):
        super(PositionalEncoding, self).__init__()
        self.max_len = shape[1]
        self.dmodel = shape[2]
        self.device = device

        # Column vector of positions 0 .. max_len - 1.
        position = torch.arange(0, self.max_len, device=device).float().unsqueeze(1)

        # One frequency per sine/cosine pair: 10000^(-2i / dmodel).
        div_term = torch.exp(torch.arange(0, self.dmodel, 2, device=device).float() * -(math.log(10000.0) / self.dmodel))

        angles = position * div_term  # (max_len, ceil(dmodel / 2))

        pos_enc = torch.zeros((1, self.max_len, self.dmodel), device=device)
        pos_enc[0, :, 0::2] = torch.sin(angles)
        # For an odd dmodel there is one cosine column fewer than sine columns.
        pos_enc[0, :, 1::2] = torch.cos(angles[:, :self.dmodel // 2])

        # Registered as a buffer so that moving the module also moves the table.
        self.register_buffer('pos_enc', pos_enc, persistent=False)

    def forward(self, x):
        '''
        Add the positional encoding of the first x.size(1) positions to x.

        Raises:
            ValueError: if the sequence is longer than the `max_len` the table was built for
        '''
        seq_len = x.size(1)
        if seq_len > self.max_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds the positional table length {self.max_len}"
            )
        x = x + self.pos_enc[:, :seq_len, :]
        return x


In [48]:


class MultiHeadAttention(nn.Module):
    '''
    Multi-Head self-attention mechanism for transformer models.

    Arguments:
        dmodel: Dimension of the model (must be divisible by `heads`)
        heads: Number of attention heads

    Methods:
        forward(x): Perform multi-head attention on the input tensor x

    Raises:
        ValueError: if `dmodel` is not divisible by `heads`
    '''
    def __init__(self, dmodel, heads):
        super(MultiHeadAttention, self).__init__()

        # Without this check split_heads() could silently reshape the tensor into a
        # wrong sequence length instead of failing.
        if heads <= 0 or dmodel % heads != 0:
            raise ValueError(f"dmodel ({dmodel}) must be divisible by heads ({heads})")

        self.dmodel = dmodel
        self.heads = heads
        self.head_size = dmodel // heads

        self.k_linear = nn.Linear(dmodel, dmodel)
        self.q_linear = nn.Linear(dmodel, dmodel)
        self.v_linear = nn.Linear(dmodel, dmodel)
        self.out_linear = nn.Linear(dmodel, dmodel)

    def split_heads(self, x, batch_size):
        '''
        Split the last dimension into (heads, head_size) and transpose to shape (batch_size, heads, seq_len, head_size).
        '''
        return x.view(batch_size, -1, self.heads, self.head_size).transpose(1, 2)

    def attention(self, k, q, v):
        '''
        Compute scaled dot-product attention.

        Note the argument order: keys first, then queries, then values.

        Returns:
            softmax(q @ k^T / sqrt(d_k)) @ v
        '''
        d_k = q.size(-1)
        # A plain Python scalar avoids building a new tensor on every call.
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
        attn = F.softmax(scores, dim=-1)
        return torch.matmul(attn, v)

    def forward(self, x):
        '''
        Perform the multi-head attention mechanism on the input tensor x.

        Arguments:
            x: Tensor of shape (batch_size, seq_len, dmodel)

        Returns:
            Tensor of shape (batch_size, seq_len, dmodel)
        '''
        batch_size = x.size(0)

        K = self.split_heads(self.k_linear(x), batch_size)  # Key: What can I offer
        Q = self.split_heads(self.q_linear(x), batch_size)  # Query: What am I looking for
        V = self.split_heads(self.v_linear(x), batch_size)  # Value: What I actually offer

        attn_output = self.attention(K, Q, V)
        # Merge the heads back: (batch, heads, seq, head_size) -> (batch, seq, dmodel).
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, -1, self.dmodel)

        return self.out_linear(attn_output)


In [49]:

class AddAndNorm(nn.Module):
    '''
    Residual connection followed by layer normalization.

    Arguments:
        dmodel: Size of the last dimension that is normalized

    Methods:
        forward(x, residual): Return LayerNorm(x + residual)
    '''
    def __init__(self, dmodel):
        super().__init__()
        self.layer_norm = nn.LayerNorm(dmodel)

    def forward(self, x, residual):
        '''
        Sum the sub-layer output with its residual input and normalize the result.

        Arguments:
            x: Output of the sub-layer
            residual: Tensor that was fed into the sub-layer

        Returns:
            Layer-normalized sum of both tensors
        '''
        summed = x + residual
        return self.layer_norm(summed)


In [50]:
class FeedForward(nn.Module):
    '''
    Position-wise Feed-Forward Network for transformer models with dropout.

    Arguments:
        dmodel: Dimension of the model (input and output width)
        dropout: Dropout probability applied after the activation
        d_ff: Width of the hidden layer; defaults to `dmodel` when omitted

    Methods:
        forward(x): Apply the feed-forward network with dropout on the input tensor x
    '''
    def __init__(self, dmodel, dropout=0.1, d_ff=None):
        super(FeedForward, self).__init__()
        # Keep the previous behaviour (hidden width == dmodel) unless a width is given.
        hidden = dmodel if d_ff is None else d_ff
        self.linear1 = nn.Linear(dmodel, hidden)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(hidden, dmodel)

    def forward(self, x):
        '''
        Apply linear -> ReLU -> dropout -> linear on the last dimension of x.

        Arguments:
            x: Input tensor whose last dimension is `dmodel`

        Returns:
            Tensor with the same shape as x
        '''
        return self.linear2(self.dropout(self.relu(self.linear1(x))))


In [51]:

import torch
import torch.nn as nn

class Encoder(nn.Module):
    '''
    Single-layer Transformer Encoder: embedding, positional encoding, self-attention
    and feed-forward sub-layers (each followed by add & norm) and an output projection.

    Arguments:
        vocab_size: Size of the vocabulary
        shape: Shape of the input tensor (batch_size, max_len, dmodel)
        heads: Number of attention heads
        device: Device on which the positional encoding table is created;
                when omitted, CUDA is used if available, otherwise the CPU

    Methods:
        forward(x): Forward pass through the encoder
    '''
    def __init__(self, vocab_size, shape, heads=4, device=None):
        super(Encoder, self).__init__()
        # Resolve the device here instead of depending on a notebook-level variable.
        if device is None:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.token_embedding_table = nn.Embedding(vocab_size, shape[2])
        self.positional_encoding = PositionalEncoding(shape, device=device)
        self.multi_headed_attention = MultiHeadAttention(shape[2], heads)
        self.add_and_norm1 = AddAndNorm(shape[2])
        self.feed_forward = FeedForward(dmodel=shape[2])
        self.add_and_norm2 = AddAndNorm(shape[2])
        # Project back to the model width (was hard-coded to 512, which only
        # matched when dmodel happened to be 512).
        self.linear = nn.Linear(shape[2], shape[2])
        # Not applied in forward(); kept so the module's attributes stay unchanged.
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        '''
        Encode a batch of token ids.

        Arguments:
            x: LongTensor of token ids, shape (batch_size, seq_len)

        Returns:
            Tensor of shape (batch_size, seq_len, dmodel)
        '''
        out = self.token_embedding_table(x)
        residual = self.positional_encoding(out)
        out = self.multi_headed_attention(residual)

        # The normalized attention output is also the residual of the feed-forward sub-layer.
        residual = self.add_and_norm1(out, residual)

        out = self.feed_forward(residual)
        out = self.add_and_norm2(out, residual)

        out = self.linear(out)

        return out


In [67]:
class Pretraining(nn.Module):
    '''
    Next-token prediction model: a transformer encoder followed by a linear
    classifier over the flattened encoder output.

    Arguments:
        n_encoders: Accepted for interface compatibility; not used here, a single Encoder is always built
        vocab_size: Size of the vocabulary (also the number of output logits)
        shape: Shape of the input tensor (batch_size, max_len, dmodel)
        heads: Number of attention heads

    Methods:
        forward(x): Return one row of vocabulary logits per input sequence
    '''
    def __init__(self,n_encoders, vocab_size, shape, heads=4):
        super().__init__()
        seq_len, width = shape[1], shape[2]
        self.encoder = Encoder(vocab_size, shape, heads)
        # The classifier sees all positions at once, so inputs must contain exactly seq_len tokens.
        self.linear = nn.Linear(width * seq_len, vocab_size)
        # Not applied in forward(); the model returns raw logits.
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        encoded = self.encoder(x)
        n_rows = encoded.size(0)
        flat = encoded.view(n_rows, -1)  # (batch, time * dmodel)
        logits = self.linear(flat)
        return logits


In [68]:
# Build the next-token prediction model and move its parameters to the target device.
model = Pretraining(
    n_encoders=3,
    vocab_size=dataset.vocab_size,
    shape=(BATCH_SIZE, MAX_LEN, DMODEL),
    heads=4,
)
model.to(DEVICE)

Pretraining(
  (encoder): Encoder(
    (token_embedding_table): Embedding(10000, 512)
    (positional_encoding): PositionalEncoding()
    (multi_headed_attention): MultiHeadAttention(
      (k_linear): Linear(in_features=512, out_features=512, bias=True)
      (q_linear): Linear(in_features=512, out_features=512, bias=True)
      (v_linear): Linear(in_features=512, out_features=512, bias=True)
      (out_linear): Linear(in_features=512, out_features=512, bias=True)
    )
    (add_and_norm1): AddAndNorm(
      (layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
    )
    (feed_forward): FeedForward(
      (linear1): Linear(in_features=512, out_features=512, bias=True)
      (relu): ReLU()
      (dropout): Dropout(p=0.1, inplace=False)
      (linear2): Linear(in_features=512, out_features=512, bias=True)
    )
    (add_and_norm2): AddAndNorm(
      (layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
    )
    (linear): Linear(in_features=512, out_features=

In [None]:

from torch.optim import Adam

# Create a DataLoader that reshuffles the (context, next-token) samples every epoch.
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

# The model returns raw logits, which is the input CrossEntropyLoss expects.
criterion = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=LEARNING_RATE)

# Training loop
for epoch in range(EPOCHS):
    losses = []
    running_loss = 0.0
    model.train()

    # The progress label shows "Epoch k/EPOCHS" (it used to print EPOCHS - 1 as the total).
    for b in tqdm.tqdm(dataloader, desc=f'Epoch {epoch + 1}/{EPOCHS}'):
        inputs, targets = b[0].to(DEVICE), b[1].to(DEVICE)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        losses.append(loss.item())

    # Guard against an empty loader so the epoch summary cannot divide by zero.
    average_loss = sum(losses) / len(losses) if losses else float('nan')
    print(f'Epoch {epoch + 1}, Average Loss: {average_loss}')

print("Training completed.")


In [None]:
# Inference: greedy next-token generation with the trained model.
TOKEN_GEN = 100
text = "hello this story mostly tells us about how we look "

def infrence(text):
    '''
    Greedily extend `text` by TOKEN_GEN tokens.

    The prompt is encoded once; afterwards generation works on token ids only, so
    predicted sub-word pieces are never re-tokenized. Each step feeds the last
    MAX_LEN ids to the model and appends the arg-max prediction.

    Arguments:
        text: Prompt string

    Returns:
        The decoded prompt followed by the generated continuation.

    NOTE(review): the training text is lower-cased and stripped of punctuation
    before encoding; prompts are encoded as given here.
    '''
    model.eval()

    # The classifier head needs exactly MAX_LEN tokens, so short prompts are left-padded.
    pad_id = max(dataset.sp.pad_id(), 0)
    token_ids = list(dataset.sp.encode(text))

    # No gradients are needed for generation.
    with torch.no_grad():
        for _ in range(TOKEN_GEN):
            # Access the last MAX_LEN tokens.
            window = token_ids[-MAX_LEN:]
            window = [pad_id] * (MAX_LEN - len(window)) + window

            # Convert to a (1, MAX_LEN) tensor of ids.
            batch = torch.tensor(window, dtype=torch.long, device=DEVICE).unsqueeze(0)

            # Get the logits over the vocabulary and pick the most likely token.
            logits = model(batch).squeeze(0)
            token_ids.append(int(torch.argmax(logits, dim=-1)))

    # Decode everything in one go so sub-word pieces are merged correctly.
    return dataset.sp.decode(token_ids)

print(infrence(text))
