In [None]:
# Mount Google Drive at /content/drive so the corpus files referenced below are readable.
# NOTE(review): `google.colab` is presumably only importable inside a Colab runtime — confirm
# before running this notebook elsewhere.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torch.nn.functional as F
import os
import math
import copy

In [None]:
# Folder on the mounted Drive that holds the English/Hindi training files.
directory_path = '/content/drive/MyDrive/NLP/v2/en-hi'

# File names of the English and Hindi sides of the corpus.
en_file_name, hi_file_name = 'train.en', 'train.hi'

# Full paths to both files, built from the shared directory.
en_file_path, hi_file_path = (
    os.path.join(directory_path, file_name)
    for file_name in (en_file_name, hi_file_name)
)

In [None]:
# Read the English file: one sentence per line, surrounding whitespace removed.
with open(en_file_path, 'r', encoding='utf-8') as en_file:
    en_sentences = list(map(str.strip, en_file))

# Read the Hindi file the same way.
with open(hi_file_path, 'r', encoding='utf-8') as hi_file:
    hi_sentences = list(map(str.strip, hi_file))

# Show the first three sentences of each language as a quick sanity check.
print("Sample English Sentences:")
print(en_sentences[:3])

print("\nSample Hindi Sentences:")
print(hi_sentences[:3])

Sample English Sentences:
["However, Paes, who was partnering Australia's Paul Hanley, could only go as far as the quarterfinals where they lost to Bhupathi and Knowles", 'Whosoever desires the reward of the world, with Allah is the reward of the world and of the Everlasting Life. Allah is the Hearer, the Seer.', 'The value of insects in the biosphere is enormous because they outnumber all other living groups in measure of species richness.']

Sample Hindi Sentences:
['आस्ट्रेलिया के पाल हेनली के साथ जोड़ी बनाने वाले पेस मियामी में क्वार्टरफाइनल तक ही पहुंच सके क्योंकि इस दौर में उन्हें भूपति और नोल्स ने हराया था।', 'और जो शख्स (अपने आमाल का) बदला दुनिया ही में चाहता है तो ख़ुदा के पास दुनिया व आख़िरत दोनों का अज्र मौजूद है और ख़ुदा तो हर शख्स की सुनता और सबको देखता है', 'जैव-मंडल में कीड़ों का मूल्य बहुत है, क्योंकि प्रजातियों की समृद्धि के मामले में उनकी संख्या अन्य जीव समूहों से ज़्यादा है।']


In [None]:
# Keep only the first 6000 sentences of each language; both lists are cut at the
# same index so the i-th English sentence still pairs with the i-th Hindi one.
en_sentences, hi_sentences = en_sentences[:6000], hi_sentences[:6000]

## Attention and Encoding

In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are linearly projected, split into `num_heads`
    heads of width `d_model // num_heads`, attended independently, then merged
    and passed through an output projection.

    Args:
        d_model: Width of the input/output feature dimension.
        num_heads: Number of attention heads; must divide `d_model`.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()

        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # per-head feature width

        # Query / key / value projections followed by the output projection.
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Attend over `K`/`V` with queries `Q`.

        Scores are `Q @ K^T / sqrt(d_k)`. Where `mask == 0` the score is
        replaced with -1e9 before the softmax over the last dimension.
        """
        scale = math.sqrt(self.d_k)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / scale

        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        weights = torch.softmax(scores, dim=-1)
        return torch.matmul(weights, V)

    def split_heads(self, x):
        """Reshape (batch, seq, d_model) into (batch, num_heads, seq, d_k)."""
        batch_size, seq_length, _ = x.size()
        per_head = x.view(batch_size, seq_length, self.num_heads, self.d_k)
        return per_head.transpose(1, 2)

    def combine_heads(self, x):
        """Reshape (batch, num_heads, seq, d_k) back into (batch, seq, d_model)."""
        batch_size, _, seq_length, _ = x.size()
        # transpose() returns a non-contiguous view, so copy before flattening heads.
        merged = x.transpose(1, 2).contiguous()
        return merged.view(batch_size, seq_length, self.d_model)

    def forward(self, Q, K, V, mask=None):
        """Project the inputs, attend per head, and project the merged result."""
        q_heads = self.split_heads(self.W_q(Q))
        k_heads = self.split_heads(self.W_k(K))
        v_heads = self.split_heads(self.W_v(V))

        context = self.scaled_dot_product_attention(q_heads, k_heads, v_heads, mask)
        return self.W_o(self.combine_heads(context))

In [None]:
class PositionWiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied to the last dimension of the input.

    Args:
        d_model: Input and output feature width.
        d_ff: Width of the hidden layer.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)   # expand to the hidden width
        self.fc2 = nn.Linear(d_ff, d_model)   # project back to the model width
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return `fc2(relu(fc1(x)))`."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        return self.fc2(activated)

In [None]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a batch of embeddings.

    Even feature columns hold `sin(pos * w_i)` and odd columns hold
    `cos(pos * w_i)` with `w_i = 10000 ** (-2i / d_model)`.

    Args:
        d_model: Feature width of the embeddings. Both even and odd widths
            are supported.
        max_seq_length: Number of positions to precompute. `forward` only
            works for inputs whose second dimension is at most this value.
    """

    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))

        angles = position * div_term  # (max_seq_length, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even column,
        # so drop the last frequency for the cosine half to keep shapes equal.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Buffer (not a parameter): moves with the module but is never trained.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return `x` plus the encodings for its first `x.size(1)` positions."""
        return x + self.pe[:, :x.size(1)]

## Encoder and Decoder blocks

In [None]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then feed-forward.

    Each sub-layer's output goes through dropout, is added back to its input
    (residual connection) and is then layer-normalised.

    Args:
        d_model: Feature width.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward sub-layer.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Run `x` through the block; `mask` is forwarded to self-attention."""
        # Sub-layer 1: self-attention (queries, keys and values are all `x`).
        attended = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attended))

        # Sub-layer 2: position-wise feed-forward.
        transformed = self.feed_forward(x)
        return self.norm2(x + self.dropout(transformed))

In [None]:
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, cross-attention, then feed-forward.

    Each sub-layer's output goes through dropout, is added back to its input
    (residual connection) and is then layer-normalised.

    Args:
        d_model: Feature width.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward sub-layer.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        """Run decoder states `x` through the block.

        `tgt_mask` is used for self-attention over `x`; `src_mask` is used for
        cross-attention, where `x` supplies the queries and `enc_output`
        supplies keys and values.
        """
        # Sub-layer 1: masked self-attention over the decoder states.
        self_attended = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(self_attended))

        # Sub-layer 2: attention over the encoder output.
        cross_attended = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(cross_attended))

        # Sub-layer 3: position-wise feed-forward.
        transformed = self.feed_forward(x)
        return self.norm3(x + self.dropout(transformed))

## Transformer Model

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer mapping source token ids to target-vocabulary logits.

    Token id 0 is treated as padding when building attention masks.

    Args:
        src_vocab_size: Number of rows in the source embedding table.
        tgt_vocab_size: Number of rows in the target embedding table and
            width of the output logits.
        d_model: Feature width used throughout the model.
        num_heads: Attention heads per attention module.
        num_layers: Number of encoder layers and of decoder layers.
        d_ff: Hidden width of the feed-forward sub-layers.
        max_seq_length: Number of positions precomputed by the positional encoding.
        dropout: Dropout probability.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super(Transformer, self).__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        """Build boolean attention masks from (batch, seq) token-id tensors.

        Returns:
            src_mask: (batch, 1, 1, src_len); True where the source token is not 0.
            tgt_mask: (batch, 1, tgt_len, tgt_len); True where the target row
                token is not 0 AND the column index is <= the row index.
        """
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
        seq_length = tgt.size(1)
        # Create the causal mask on the same device as `tgt`; a CPU-only mask
        # cannot be combined with a padding mask that lives on an accelerator.
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length, device=tgt.device), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        """Return logits of shape (batch, tgt_len, tgt_vocab_size) for `tgt` given `src`."""
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        # Embed -> add positional encoding -> dropout, for both sides.
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        # Every decoder layer attends to the output of the final encoder layer.
        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        output = self.fc(dec_output)
        return output

## Load and Preprocess data

In [None]:
import torch
from sklearn.model_selection import train_test_split

def tokenize_sentences(sentences, vocab, max_length):
    """Convert whitespace-split sentences into fixed-length lists of token ids.

    Args:
        sentences: Iterable of strings.
        vocab: Mapping from word to integer id. Must contain "<PAD>", and
            "<UNK>" if any word may be missing from the mapping.
        max_length: Exact length of every returned row.

    Returns:
        A list with one row per sentence. Words not in `vocab` map to
        `vocab["<UNK>"]`. Rows are truncated to `max_length` tokens and
        right-padded with `vocab["<PAD>"]`, so all rows have equal length and
        can be stacked into a tensor.
    """
    tokenized = []
    for sentence in sentences:
        # Truncate first: a sentence longer than max_length would otherwise
        # produce a longer row than the others (negative pad count adds nothing).
        words = sentence.split()[:max_length]
        tokens = [vocab[word] if word in vocab else vocab["<UNK>"] for word in words]
        tokens += [vocab["<PAD>"]] * (max_length - len(tokens))  # Pad to the maximum length
        tokenized.append(tokens)
    return tokenized

def build_vocab(sentences):
    """Build a word-to-id mapping from whitespace-split sentences.

    Ids 0-3 are reserved for the special tokens "<PAD>", "<SOS>", "<EOS>" and
    "<UNK>"; every other distinct word receives the next free id in order of
    first appearance.
    """
    # Reserved entries: padding, start of sequence, end of sequence, unknown word.
    vocab = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3}

    for sentence in sentences:
        for word in sentence.split():
            # len(vocab) is always the next unused id because ids are dense from 0.
            vocab.setdefault(word, len(vocab))

    return vocab

def preprocess_and_split_data(en_sentences, hi_sentences, test_size=0.2, random_state=42, max_length=200):
    """Build vocabularies, tokenize both languages, and split into train/test tensors.

    Args:
        en_sentences: Source-language sentences.
        hi_sentences: Target-language sentences, paired by index with `en_sentences`.
        test_size: Passed to `train_test_split`.
        random_state: Passed to `train_test_split`.
        max_length: Number of token ids per row after padding/truncation.
            Defaults to 200, the value that was previously hard-coded.

    Returns:
        `(src_train, src_test, tgt_train, tgt_test, en_vocab, hi_vocab)`.
    """
    # NOTE(review): the vocabularies are built from all sentences, including
    # those that end up in the test split.
    en_vocab = build_vocab(en_sentences)
    hi_vocab = build_vocab(hi_sentences)

    # Clip every row to max_length so torch.tensor always receives a
    # rectangular list, even when a sentence has more than max_length words.
    en_tokenized = [row[:max_length] for row in tokenize_sentences(en_sentences, en_vocab, max_length)]
    hi_tokenized = [row[:max_length] for row in tokenize_sentences(hi_sentences, hi_vocab, max_length)]

    src_data = torch.tensor(en_tokenized)
    tgt_data = torch.tensor(hi_tokenized)

    src_train, src_test, tgt_train, tgt_test = train_test_split(src_data, tgt_data, test_size=test_size, random_state=random_state)

    return src_train, src_test, tgt_train, tgt_test, en_vocab, hi_vocab


# Run the preprocessing pipeline on the loaded corpus with its default split settings.
src_train, src_test, tgt_train, tgt_test, en_vocab, hi_vocab = preprocess_and_split_data(en_sentences, hi_sentences)

## Train

In [None]:
# Model hyper-parameters.
src_vocab_size = len(en_vocab)
tgt_vocab_size = len(hi_vocab)
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
# The positional encoding only covers this many positions, so it must be at
# least as long as the token sequences fed to the model.
max_seq_length = 200
dropout = 0.1

# Seed the RNG before the model is built so that weight initialisation and
# dropout are repeatable across Restart & Run All.
torch.manual_seed(42)

transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)

# Padding positions (id 0) do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)


transformer.train()

# NOTE(review): every iteration feeds the entire training set in one batch, so
# each "epoch" below is a single optimizer step.
for epoch in range(100):
    optimizer.zero_grad()
    # Teacher forcing: the decoder sees target tokens [0, n-1) and is scored
    # against tokens [1, n).
    output = transformer(src_train, tgt_train[:, :-1])
    loss = criterion(output.contiguous().view(-1, tgt_vocab_size), tgt_train[:, 1:].contiguous().view(-1))
    loss.backward()
    optimizer.step()
    print(f"Epoch: {epoch+1}, Loss: {loss.item()}")

Epoch: 1, Loss: 6.634443283081055
Epoch: 2, Loss: 6.068175315856934
Epoch: 3, Loss: 5.88072395324707
Epoch: 4, Loss: 5.763003349304199
Epoch: 5, Loss: 5.668879985809326
Epoch: 6, Loss: 5.54315185546875
Epoch: 7, Loss: 5.3716630935668945
Epoch: 8, Loss: 5.170507431030273
Epoch: 9, Loss: 4.960590362548828
Epoch: 10, Loss: 4.772012710571289
Epoch: 11, Loss: 4.598611354827881
Epoch: 12, Loss: 4.409081935882568
Epoch: 13, Loss: 4.2277703285217285
Epoch: 14, Loss: 4.056327819824219
Epoch: 15, Loss: 3.8969850540161133
Epoch: 16, Loss: 3.7167510986328125
Epoch: 17, Loss: 3.5563735961914062
Epoch: 18, Loss: 3.4014415740966797
Epoch: 19, Loss: 3.243086338043213
Epoch: 20, Loss: 3.0986011028289795
Epoch: 21, Loss: 2.9604973793029785
Epoch: 22, Loss: 2.810966730117798
Epoch: 23, Loss: 2.693506956100464
Epoch: 24, Loss: 2.5489792823791504
Epoch: 25, Loss: 2.4287033081054688
Epoch: 26, Loss: 2.2963860034942627
Epoch: 27, Loss: 2.174686908721924
Epoch: 28, Loss: 2.0574753284454346
Epoch: 29, Loss: 1.

## Evaluate

In [None]:
def tensor_to_sentence(tensor, vocab, remove_pad=True):
    """Convert a sequence of token ids back into a space-joined sentence.

    Args:
        tensor: Iterable of token ids; elements may be single-element tensors
            or plain Python numbers.
        vocab: Mapping from word to id (the inverse lookup is built here).
        remove_pad: When True, drop every '<PAD>' word from the result.

    Returns:
        The decoded words joined by single spaces.

    Raises:
        ValueError: If an id does not occur among the values of `vocab`.
    """
    # Build the id -> word map once instead of scanning the vocabulary for
    # every token. setdefault keeps the first word if two words share an id.
    id_to_word = {}
    for word, idx in vocab.items():
        id_to_word.setdefault(idx, word)

    sentence = []
    for num in tensor:
        token_id = num.item() if hasattr(num, "item") else num
        if token_id not in id_to_word:
            raise ValueError(f"{token_id} is not in vocab")
        sentence.append(id_to_word[token_id])

    if remove_pad:
        sentence = [word for word in sentence if word != '<PAD>']

    return " ".join(sentence)

In [None]:
# Put the model in evaluation mode before scoring the held-out split.
transformer.eval()

# No gradients are needed for evaluation.
with torch.no_grad():
    # Same shifted-target setup as in training: feed tokens [0, n-1), score against [1, n).
    val_output = transformer(src_test, tgt_test[:, :-1])

    val_logits_flat = val_output.contiguous().view(-1, tgt_vocab_size)
    val_targets_flat = tgt_test[:, 1:].contiguous().view(-1)
    val_loss = criterion(val_logits_flat, val_targets_flat)
    print(f"Validation Loss: {val_loss.item()}")

Validation Loss: 6.552799701690674


## Example output

In [None]:
# Decode the first held-out source sentence back to text.
input_tensor = src_test[0]
input_sentence = tensor_to_sentence(input_tensor, en_vocab)
print("Input: ", input_sentence)


# Reference translation for the same example.
target_tensor = tgt_test[0]
target_sentence = tensor_to_sentence(target_tensor, hi_vocab)
print("Expected: ", target_sentence)


# Most likely token at each position of the validation logits. These logits
# were produced with the reference tokens tgt_test[:, :-1] as decoder input,
# so position i is the model's guess for reference token i + 1.
predicted_tensor = val_output.argmax(dim=-1)
predicted_sentence = tensor_to_sentence(predicted_tensor[0], hi_vocab)
# Print the model's prediction (previously the reference sentence was printed here by mistake).
print("Predicted: ", predicted_sentence)


Input:  However, Paes, who was partnering Australia's Paul Hanley, could only go as far as the quarterfinals where they lost to Bhupathi and Knowles
Expected:  आस्ट्रेलिया के पाल हेनली के साथ जोड़ी बनाने वाले पेस मियामी में क्वार्टरफाइनल तक ही पहुंच सके क्योंकि इस दौर में उन्हें भूपति और नोल्स ने हराया था।
Predicted:  आस्ट्रेलिया के पाल हेनली के साथ जोड़ी बनाने वाले पेस मियामी में क्वार्टरफाइनल तक ही पहुंच सके क्योंकि इस दौर में उन्हें भूपति और नोल्स ने हराया था।
