In [51]:
from torch.utils.data import Dataset
import torch.nn as nn
import numpy as np
np.random.seed(42)
import torch
import json
from pickle import load
import torchtext
import torch
import numpy as np
from torch.optim import Adam
from torch.nn import CrossEntropyLoss
device='cuda' if torch.cuda.is_available() else 'cpu'
print(device)


cuda


## <font color='red'> Custom Tokenizer to tokenize the text data </font>

In [52]:
class CustomTokenizer:
    def __init__(self, dataset=None, vocab=None):
        # Initialize the CustomTokenizer with a dataset and a custom vocabulary (if provided)

        # Create a custom vocabulary from the dataset if no vocabulary is provided
        if vocab is None:
            self.vocab = {}
            self.pad_index = 0
            self.vocab['<pad>'] = self.pad_index
            self.sos_index = len(self.vocab)
            self.vocab['<sos>'] = self.sos_index
            self.eos_index = len(self.vocab)
            self.vocab['<eos>'] = self.eos_index

            # Iterate through the dataset to build the vocabulary
            for text in dataset:
                # Exclude the first and last 5 characters from each text
                text = text[5:-5]

                # Tokenize the text and add tokens to the vocabulary
                for token in text.split():
                    if token not in self.vocab:
                        self.vocab[token] = len(self.vocab)
        else:
            # Use the provided vocabulary
            self.vocab = vocab

    def get_vocab(self):
        # Get the custom vocabulary
        return self.vocab

    def tokenize(self, text, max_length):
        # Tokenize a text using the custom vocabulary

        # Exclude the first and last 5 characters from the text
        text = text[5:-5]

        # Initialize the encoded vector with the '<sos>' token
        encoded_vector = [self.vocab['<sos>']]

        # Remove non-alphanumeric characters from the text
        text = ''.join([i for i in text if i.isalnum() or i == ' '])

        # Tokenize the text and add tokens to the encoded vector
        for i, token in enumerate(text.split()):
            if i >= max_length - 2:
                break
            if token in self.vocab:
                encoded_vector.append(self.vocab[token])

        # Add the '<eos>' token to the end of the encoded vector
        encoded_vector.append(self.vocab['<eos>'])

        # Add padding to the encoded vector if its length is less than max_length
        if len(encoded_vector) < max_length:
            # Add padding till max_length
            for i in range(max_length - len(encoded_vector)):
                encoded_vector.append(self.vocab['<pad>'])

        # Return the encoded vector as a PyTorch tensor
        return torch.tensor(encoded_vector)

    def tokenize_batch(self, text_list, max_len):
        # Tokenize a list of texts using the custom vocabulary
        encoded_batch = torch.stack([self.tokenize(text, max_len) for text in text_list])
        return encoded_batch

    def get_vocab_size(self):
        # Get the size of the custom vocabulary
        return len(self.vocab)


## <font color='red'> Dataloader class to load the data, split into train and test </font>

In [53]:
class TranslationDataloader(Dataset):
    def __init__(self, filepath, max_sentences):
        # Initialize the TranslationDataloader with a file path and maximum number of sentences

        # Load English and German data from the file
        english_data, german_data = self.load_data_from_file(filepath)

        # Limit the data to the specified maximum number of sentences
        self.english_data = english_data[:max_sentences]
        self.german_data = german_data[:max_sentences]

        # Shuffle indices for later use in train/test split
        self.shuffle_indices = np.arange(len(self.english_data))
        np.random.shuffle(self.shuffle_indices)

        # Define train/test split percentages
        self.train_split = 0.85
        self.test_split = 0.15

        # Get train and test split data
        self.eng_train_data, self.ger_train_data, self.eng_test_data, self.ger_test_data = self.get_train_test_split()

        print("Train Data Size: ", len(self.eng_train_data))
        print("Test Data Size: ", len(self.eng_test_data))

        # Create CustomTokenizers for English and German vocabularies
        english_vocab = CustomTokenizer(dataset=self.eng_train_data)
        german_vocab = CustomTokenizer(dataset=self.ger_train_data)

        # Get vocabularies from the CustomTokenizers
        self.eng_tokenizer = english_vocab.get_vocab()
        self.ger_tokenizer = german_vocab.get_vocab()

        # Set maximum lengths for German and English sentences
        self.max_german_length = 33
        self.max_english_length = 33
        print("Max German Sentence Length: ", self.max_german_length)
        print("Max English Sentence Length: ", self.max_english_length)

        # Save English and German tokenizers to JSON files
        with open('eng_tokenizer.json', 'w') as fp:
            json.dump(self.eng_tokenizer, fp)
        with open('ger_tokenizer.json', 'w') as fp:
            json.dump(self.ger_tokenizer, fp)
        print("English Vocab Size: ", len(self.eng_tokenizer))
        print("German Vocab Size: ", len(self.ger_tokenizer))


    def __len__(self):
        # Return the length of the English data
        return len(self.english_data)

    def __getitem__(self, idx):
        # Return the English and German data at the specified index
        return self.english_data[idx], self.german_data[idx]

    def load_data_from_file(self, filepath):
        # Load data from a file using pickle
        dataset = load(open(filepath, 'rb'))
        # Prepend '<sos>' and append '<eos>' to each English and German sentence
        english_sentences = ['<sos>' + sentence[0] + '<eos>' for sentence in dataset]
        german_sentences = ['<sos>' + sentence[1] + '<eos>' for sentence in dataset]
        return english_sentences, german_sentences

    def get_train_test_split(self):
        # Get train/test split based on shuffle indices
        train_end_index = int(self.train_split * len(self.english_data))
        eng_train_data = self.english_data[self.shuffle_indices[:train_end_index]]
        ger_train_data = self.german_data[self.shuffle_indices[:train_end_index]]
        eng_test_data = self.english_data[self.shuffle_indices[train_end_index:]]
        ger_test_data = self.german_data[self.shuffle_indices[train_end_index:]]
        return eng_train_data, ger_train_data, eng_test_data, ger_test_data

    def create_encoded_data(self, data, vocab, max_len):
        # Create encoded data tensor for a given data, vocabulary, and maximum length
        encoded_data = torch.zeros((len(data), max_len), dtype=torch.long)
        for index, text in enumerate(data):
            encoded_vector = self.create_encoded_vector(text, vocab, max_len)
            encoded_data[index] = torch.tensor(encoded_vector, dtype=torch.long)
        return encoded_data

    def create_encoded_vector(self, text, vocab, max_len):
        # Create an encoded vector for a given text, vocabulary, and maximum length
        text = text[5:-5]
        encoded_vector = [vocab['<sos>']]
        for i, token in enumerate(text.split()):
            if i >= max_len - 2:
                break
            if token in vocab:
                encoded_vector.append(vocab[token])
        encoded_vector.append(vocab['<eos>'])
        for k in range(max_len - len(encoded_vector)):
            encoded_vector.append(vocab['<pad>'])
        return encoded_vector

    def get_encoded_data(self):
        # Get encoded data tensors for train and test sets
        train_encoded_eng = self.create_encoded_data(self.eng_train_data, self.eng_tokenizer, self.max_english_length)
        train_encoded_ger = self.create_encoded_data(self.ger_train_data, self.ger_tokenizer, self.max_german_length)
        test_encoded_eng = self.create_encoded_data(self.eng_test_data, self.eng_tokenizer, self.max_english_length)
        test_encoded_ger = self.create_encoded_data(self.ger_test_data, self.ger_tokenizer, self.max_german_length)
        return train_encoded_eng, train_encoded_ger, test_encoded_eng, test_encoded_ger


## <font color='red'> Word and positional embeddings </font>

In [54]:
class TokenPositionEmbeddings(nn.Module):
    def __init__(self, vocab_size, max_len, embedding_dim):
        # Initialize TokenPositionEmbeddings module with vocabulary size, maximum length, and embedding dimension

        super().__init__()

        # Set vocabulary size, maximum length, and embedding dimension
        self.vocab_size = vocab_size
        self.max_len = max_len
        self.embedding_dim = embedding_dim

        # Token embeddings layer
        self.token_embeddings = nn.Embedding(self.vocab_size, self.embedding_dim)

        # Position embeddings layer with precomputed weights and frozen
        weight_position_embeddings = self.get_position_encoding(max_len, embedding_dim)
        self.position_embeddings = nn.Embedding(self.max_len, self.embedding_dim, _weight=weight_position_embeddings, _freeze=True)

    def get_position_encoding(self, seq_length, hidden_size, n=10000):
        # Generate position encoding based on the given sequence length, hidden size, and frequency (n)

        position_enc = torch.zeros(seq_length, hidden_size)

        for pos in range(seq_length):
            for i in range(hidden_size // 2):
                position_enc[pos, 2*i] = torch.sin(torch.tensor(pos / (n**(2*i / hidden_size))))
                position_enc[pos, 2*i+1] = torch.cos(torch.tensor(pos / (n**((2*i+1) / hidden_size))))

        return position_enc

    def forward(self, inputs):
        # Forward pass of the TokenPositionEmbeddings module

        # inputs is of shape [batch_size, max_len]
        batch_size, max_len = inputs.shape

        # Create position indices
        position_indices = torch.arange(max_len).to(device)

        # Get token embeddings
        token_embeddings = self.token_embeddings(inputs)

        # Get position embeddings
        position_embeddings = self.position_embeddings(position_indices)

        # Add both token and position embeddings
        embeddings = token_embeddings + position_embeddings

        return embeddings


## <font color='red'> ScaledDotProduct and Multihead attention (the secret sauce) </font>

In [55]:
import torch.nn as nn

class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        # Initialize ScaledDotProductAttention module
        super().__init__()

    def forward(self, query, key, value, mask=None):
        # Forward pass of the ScaledDotProductAttention module

        # query, key, and value shapes: [B, nh, T, hs]
        B, nh, T, hs = query.shape

        # Transpose key for matrix multiplication
        key = key.transpose(-2, -1)

        # Calculate attention scores
        attention_score = torch.matmul(query, key)

        # Scale the attention scores
        attention_score = attention_score / torch.sqrt(torch.tensor(hs))

        # Apply mask if provided
        if mask is not None:
            attention_score += (mask * -1e9)

        # Apply softmax to get attention weights
        attention_weights = torch.softmax(attention_score, dim=-1)

        # Calculate output using attention weights and value
        output = torch.matmul(attention_weights, value)
        # Output shape: [B, nh, T, hs]

        return output


class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, embed_size, key_dim, query_dim, value_dim, mask=False):
        # Initialize MultiHeadAttention module with specified parameters
        super().__init__()

        self.num_heads = num_heads
        self.head_size = embed_size // num_heads
        self.key_dim = key_dim
        self.query_dim = query_dim
        self.value_dim = value_dim
        self.mask = mask

        # ScaledDotProductAttention module
        self.attention = ScaledDotProductAttention()

        # Linear layers for key, query, value, and final layer
        self.key_layer = nn.Linear(embed_size, key_dim).to(device)
        self.query_layer = nn.Linear(embed_size, query_dim).to(device)
        self.value_layer = nn.Linear(embed_size, value_dim).to(device)
        self.final_layer = nn.Linear(value_dim, embed_size).to(device)

    def forward(self, query, key, value, mask=None):
        # Forward pass of the MultiHeadAttention module

        # Input shape: [B, T, C]
        B, T, C = query.shape

        # Apply linear layers to key, query, and value
        query = self.query_layer(query)
        key = self.key_layer(key)
        value = self.value_layer(value)

        # Reshape key, query, and value for multiple heads
        query = query.view(B, T, self.num_heads, -1).transpose(1, 2)
        key = key.view(B, T, self.num_heads, -1).transpose(1, 2)
        value = value.view(B, T, self.num_heads, -1).transpose(1, 2)

        # Apply ScaledDotProductAttention
        scaled_attention = self.attention(query, key, value, mask)
        # Scaled_attention shape: [B, T, nh, hs]

        # Transpose and reshape scaled_attention
        scaled_attention = scaled_attention.transpose(1, 2).contiguous().view(B, T, -1)
        # Scaled_attention shape: [B, T, nh*hs]

        # Apply final linear layer
        output = self.final_layer(scaled_attention)
        # Output shape: [B, T, C]

        return output


## <font color='red'> The Encoder block </font>

In [56]:
import torch.nn as nn

class EncoderBlock(nn.Module):
    def __init__(self, num_heads, embed_size, key_dim, query_dim, value_dim):
        # Initialize EncoderBlock module with specified parameters
        super().__init__()

        # MultiHeadAttention module
        self.MultiHeadAttention = MultiHeadAttention(num_heads, embed_size, key_dim, query_dim, value_dim)

        # Layer normalization
        self.layer_norm = nn.LayerNorm(embed_size).to(device)

        # Feed-forward neural network
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, 4 * embed_size),
            nn.ReLU(),
            nn.Linear(4 * embed_size, embed_size)
        ).to(device)

    def forward(self, inputs, mask):
        # Forward pass of the EncoderBlock module

        # Apply MultiHeadAttention
        attention = self.MultiHeadAttention(inputs, inputs, inputs, mask)

        # Apply layer normalization and residual connection
        normalized_op1 = self.layer_norm(inputs + attention)

        # Apply feed-forward neural network
        feed_forward_op = self.feed_forward(normalized_op1)

        # Apply layer normalization and another residual connection
        normalized_op2 = self.layer_norm(normalized_op1 + feed_forward_op)

        return normalized_op2


class Encoder(nn.Module):
    def __init__(self, src_max_length, embedding_dim, key_dim, query_dim, value_dim, src_vocab_size, dropout_rate, num_blocks, num_heads):
        # Initialize Encoder module with specified parameters
        super().__init__()

        # Maximum length for positional embeddings
        self.max_length = src_max_length

        # TokenPositionEmbeddings module
        self.token_position_embeddings = TokenPositionEmbeddings(src_vocab_size, src_max_length, embedding_dim)

        # List of EncoderBlocks
        self.encoder_stack = [EncoderBlock(num_heads, embedding_dim, key_dim, query_dim, value_dim) for _ in range(num_blocks)]

    def forward(self, inputs, mask):
        # Forward pass of the Encoder module

        # Apply token and position embeddings
        x = self.token_position_embeddings(inputs)

        # Iterate through the encoder stack
        for encoder_block in self.encoder_stack:
            x = encoder_block(x, mask)

        return x


## <font color='red'> The Decoder Block </font>

In [57]:
import torch.nn as nn

class DecoderBlock(nn.Module):
    def __init__(self, num_heads, embed_size, key_dim, query_dim, value_dim):
        # Initialize DecoderBlock module with specified parameters
        super().__init__()

        # MultiHeadAttention module for masked attention
        self.maskedMultiHeadAttention = MultiHeadAttention(num_heads, embed_size, key_dim, query_dim, value_dim)

        # MultiHeadAttention module for attention with encoder output
        self.multihead_attention = MultiHeadAttention(num_heads, embed_size, key_dim, query_dim, value_dim)

        # Layer normalization
        self.layer_norm = nn.LayerNorm(embed_size).to(device)

        # Feed-forward neural network
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, embed_size * 4),
            nn.ReLU(),
            nn.Linear(embed_size * 4, embed_size)
        ).to(device)

    def forward(self, dec_input, enc_output, lookahead_mask, padding_mask):
        # Forward pass of the DecoderBlock module

        # Apply masked multi-head attention
        masked_attn_output = self.maskedMultiHeadAttention(dec_input, dec_input, dec_input, lookahead_mask)

        # Apply layer normalization and residual connection
        normalized_op1 = self.layer_norm(dec_input + masked_attn_output)

        # Apply multi-head attention with encoder output
        attn_output = self.multihead_attention(normalized_op1, enc_output, enc_output, padding_mask)

        # Apply layer normalization and another residual connection
        normalized_op2 = self.layer_norm(normalized_op1 + attn_output)

        # Apply feed-forward neural network
        feed_forward_output = self.feed_forward(normalized_op2)

        # Apply layer normalization and another residual connection
        normalized_op3 = self.layer_norm(normalized_op2 + feed_forward_output)

        return normalized_op3

class Decoder(nn.Module):
    def __init__(self, tar_max_length, embedding_dim, key_dim, value_dim, query_dim, tar_vocab_size, dropout_rate, num_blocks, num_heads, device='cpu'):
        # Initialize Decoder module with specified parameters
        super().__init__()

        # Maximum length for positional embeddings
        self.max_length = tar_max_length

        # TokenPositionEmbeddings module
        self.token_position_embeddings = TokenPositionEmbeddings(tar_vocab_size, tar_max_length, embedding_dim)

        # List of DecoderBlocks
        self.decoder_stack = [DecoderBlock(num_heads, embedding_dim, key_dim, value_dim, query_dim) for _ in range(num_blocks)]

        self.device = device

    def forward(self, inputs, enc_output, lookahead_mask, padding_mask):
        # Forward pass of the Decoder module

        # Apply token and position embeddings
        x = self.token_position_embeddings(inputs)

        # Iterate through the decoder stack
        for decoder_block in self.decoder_stack:
            x = decoder_block(x, enc_output, lookahead_mask, padding_mask)

        return x


## <font color='red'> The complete Transformer Model </font>

In [58]:
import torch.nn as nn

class TransformerModel(nn.Module):
    def __init__(self, src_max_length, tar_max_length, embedding_dim, key_dim, query_dim, value_dim, src_vocab_size, tar_vocab_size, dropout_rate, num_blocks, num_heads, device='cpu'):
        # Initialize TransformerModel module with specified parameters
        super().__init__()

        # Encoder module
        self.encoder = Encoder(src_max_length, embedding_dim, key_dim, query_dim, value_dim, src_vocab_size, dropout_rate, num_blocks, num_heads)

        # Decoder module
        self.decoder = Decoder(tar_max_length, embedding_dim, key_dim, query_dim, value_dim, tar_vocab_size, dropout_rate, num_blocks, num_heads)

        # Final linear layer
        self.final_layer = nn.Linear(embedding_dim, tar_vocab_size)

    def create_padding_mask(self, inputs):
        # Create padding mask for inputs
        mask = torch.zeros(inputs.shape[0], inputs.shape[1]).to(device)
        mask = mask.masked_fill(inputs == 0, 1)
        mask = mask.view(inputs.shape[0], 1, 1, inputs.shape[1])
        return mask

    def create_lookahead_mask(self, inputs):
        # Create lookahead mask for inputs
        mask = torch.triu(torch.ones((inputs.shape[1], inputs.shape[1])), diagonal=1)
        return mask

    def forward(self, enc_inputs, dec_inputs, target):
        # Forward pass of the TransformerModel module

        # Create padding mask for encoder inputs
        padding_mask_enc = self.create_padding_mask(enc_inputs)

        # Create padding mask and lookahead mask for decoder inputs
        padding_mask_dec = self.create_padding_mask(dec_inputs)
        lookahead_mask_dec = self.create_lookahead_mask(dec_inputs).to(device)

        # Combine padding mask and lookahead mask for decoder
        dec_mask = torch.max(padding_mask_dec, lookahead_mask_dec)

        # Forward pass through encoder
        enc_output = self.encoder(enc_inputs, padding_mask_enc)

        # Forward pass through decoder
        dec_output = self.decoder(dec_inputs, enc_output, dec_mask, padding_mask_dec)

        # Forward pass through final linear layer
        output = self.final_layer(dec_output)

        loss = None
        if target is not None:
            # Calculate CrossEntropyLoss if target is provided
            loss_fct = nn.CrossEntropyLoss(ignore_index=0)
            output = output.reshape(-1, output.shape[-1])
            target = target.reshape(-1)
            loss = loss_fct(output, target)

        return output, loss


## <font color='red'> Model Training </font>

In [None]:
import torch
from torch.optim import Adam
from torch.nn import CrossEntropyLoss
from torch.optim.lr_scheduler import LambdaLR

def get_batch_data(batch_size, is_train=True):
    # Get batch data for training or testing
    if is_train:
        eng = train_eng
        ger = train_ger
    else:
        eng = test_eng
        ger = test_ger

    # Get random index as batch starting index
    batch_start = np.random.randint(0, len(eng) - batch_size)
    batch_end_index = batch_start + batch_size
    batch_eng = eng[batch_start:batch_end_index]
    batch_ger = ger[batch_start:batch_end_index]

    return batch_eng, batch_ger

# Optimizer
if __name__ == '__main__':
    ger_max_len = 33
    eng_max_len = 33
    ger_vocab_size = 8663
    eng_vocab_size = 5627
    embedding_size = 256
    num_blocks = 6
    num_heads = 8
    key_dim = 64
    query_dim = 64
    value_dim = 64
    epochs = 10000
    batch_size = 32
    eval_iters = 100

    # Initialize DataLoader
    dataset = TransLationDataloader('/content/drive/MyDrive/kData/english-german_60k.pkl', 15000)
    train_eng, train_ger, test_eng, test_ger = dataset.get_encoded_data()

    # Initialize Transformer Model
    model = TransformerModel(src_max_length=eng_max_len, tar_max_length=ger_max_len, embedding_dim=embedding_size, key_dim=key_dim, query_dim=query_dim,
                             value_dim=value_dim, src_vocab_size=eng_vocab_size, tar_vocab_size=ger_vocab_size, dropout_rate=0.1, num_blocks=num_blocks,
                             num_heads=num_heads, device=device)

    # Move model to device
    model = model.to(device)

    # Learning rate scheduling function
    def rate(step, model_size, factor, warmup):
        if step == 0:
            step = 1
        return factor * (
            model_size ** (-0.5) * min(step ** (-0.5), step * warmup ** (-1.5))
        )

    # Optimizer and learning rate scheduler
    optimizer = Adam(
        model.parameters(), lr=1.0, betas=(0.9, 0.98), eps=1e-9)

    lr_scheduler = LambdaLR(
        optimizer=optimizer,
        lr_lambda=lambda step: rate(
            step, embedding_size, factor=1, warmup=1000
        ),
    )

    @torch.no_grad()
    def estimate_loss():
        # Estimate loss on training and test splits
        out = []
        model.eval()

        for split in [True, False]:
            losses = torch.zeros(eval_iters)
            for k in range(eval_iters):
                eng_batch, ger_batch = get_batch_data(batch_size, split)
                encoder_input = eng_batch[:, 1:].to(device)
                decoder_input = ger_batch[:, :-1].to(device)
                decoder_target = ger_batch[:, 1:].to(device)
                logits, loss = model(encoder_input, decoder_input, decoder_target)
                losses[k] = loss.item()
            out.append(losses.mean())
        model.train()
        return out

    for epoch in range(epochs):
        eng_batch, ger_batch = get_batch_data(batch_size, True)
        encoder_input = eng_batch[:, 1:].to(device)
        decoder_input = ger_batch[:, :-1].to(device)
        decoder_target = ger_batch[:, 1:].to(device)
        logits, loss = model(encoder_input, decoder_input, decoder_target)

        if epoch % 200 == 0:
            train_loss, test_loss = estimate_loss()
            print('Epoch: ', epoch, 'Train Loss: ', train_loss.item(), 'Test Loss: ', test_loss.item())

        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()
        lr_scheduler.step()

Train Data Size:  12750
Test Data Size:  2250
Max German Sentence Length:  33
Max English Sentence Length:  33
English Vocab Size:  5627
German Vocab Size:  8663
Epoch:  0 Train Loss:  9.159955978393555 Test Loss:  9.149428367614746
Epoch:  200 Train Loss:  6.015683650970459 Test Loss:  5.8538641929626465
Epoch:  400 Train Loss:  5.491262912750244 Test Loss:  5.334099769592285
Epoch:  600 Train Loss:  4.94631814956665 Test Loss:  4.983303070068359
