<a href="https://colab.research.google.com/github/ryghrmni/Models/blob/main/seq2seq_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Task:** Translate simple English sentences into French using a Seq2Seq model.

In [57]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

In [58]:
# Dataset: Simple English-French pairs
data = [
    ("I am a student", "je suis un étudiant"),
    ("I love you", "je t'aime"),
    ("Hello", "Bonjour"),
    ("How are you", "comment ça va"),
    ("Thank you", "merci"),
]

In [59]:
# Step 1: Preprocessing
def tokenize(text):
    return text.lower().split()

In [111]:
def build_vocab(sentences):
    vocab = set()
    for sentence in sentences:
        vocab.update(tokenize(sentence))
        #print(vocab)
    vocab = {word: idx + 3 for idx, word in enumerate(vocab)}  # Start indices from 2
    print(vocab)
    vocab['<PAD>'] = 0  # Padding token
    vocab['<SOS>'] = 1  # Start of sentence token
    vocab['<EOS>'] = 2  # End of sentence token
    return vocab

In [112]:
def sentence_to_indices(sentence, vocab):
    return [vocab[word] for word in tokenize(sentence)]

In [113]:
# Build vocabularies for both English and French
source_sentences, target_sentences = zip(*data)
source_vocab = build_vocab(source_sentences)
target_vocab = build_vocab(target_sentences)

{'love': 3, 'thank': 4, 'you': 5, 'hello': 6, 'a': 7, 'i': 8, 'are': 9, 'am': 10, 'how': 11, 'student': 12}
{'étudiant': 3, 'bonjour': 4, 'suis': 5, 'un': 6, 'je': 7, "t'aime": 8, 'ça': 9, 'va': 10, 'comment': 11, 'merci': 12}


In [114]:
source_sentences

('I am a student', 'I love you', 'Hello', 'How are you', 'Thank you')

In [115]:
target_sentences

('je suis un étudiant', "je t'aime", 'Bonjour', 'comment ça va', 'merci')

In [116]:
source_vocab

{'love': 3,
 'thank': 4,
 'you': 5,
 'hello': 6,
 'a': 7,
 'i': 8,
 'are': 9,
 'am': 10,
 'how': 11,
 'student': 12,
 '<PAD>': 0,
 '<SOS>': 1,
 '<EOS>': 2}

In [117]:
target_vocab

{'étudiant': 3,
 'bonjour': 4,
 'suis': 5,
 'un': 6,
 'je': 7,
 "t'aime": 8,
 'ça': 9,
 'va': 10,
 'comment': 11,
 'merci': 12,
 '<PAD>': 0,
 '<SOS>': 1,
 '<EOS>': 2}

In [118]:
# Inverse vocab for decoding
inv_target_vocab = {idx: word for word, idx in target_vocab.items()}

In [119]:
inv_target_vocab

{3: 'étudiant',
 4: 'bonjour',
 5: 'suis',
 6: 'un',
 7: 'je',
 8: "t'aime",
 9: 'ça',
 10: 'va',
 11: 'comment',
 12: 'merci',
 0: '<PAD>',
 1: '<SOS>',
 2: '<EOS>'}

In [121]:
# Parameters
INPUT_DIM = len(source_vocab)
OUTPUT_DIM = len(target_vocab)
EMB_DIM = 32
HIDDEN_DIM = 64
NUM_LAYERS = 1

In [122]:
INPUT_DIM

13

In [123]:
OUTPUT_DIM

13

In [125]:
# Attention class
class Attention(nn.Module):
    def __init__(self, hidden_dim):
        super(Attention, self).__init__()
        self.attn = nn.Linear(hidden_dim * 2, hidden_dim)
        self.v = nn.Linear(hidden_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        src_len = encoder_outputs.shape[1]
        hidden = hidden[-1].unsqueeze(1).repeat(1, src_len, 1)
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
        attention = self.v(energy).squeeze(2)
        return torch.softmax(attention, dim=1)

In [126]:
# Encoder class
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hidden_dim, num_layers):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hidden_dim, num_layers, batch_first=True)

    def forward(self, src):
        embedded = self.embedding(src)
        outputs, (hidden, cell) = self.lstm(embedded)
        return outputs, hidden, cell

In [127]:
# Decoder class with Attention
class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hidden_dim, num_layers, attention):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.lstm = nn.LSTM(emb_dim + hidden_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        self.attention = attention

    def forward(self, trg, hidden, cell, encoder_outputs):
        trg = trg.unsqueeze(1)
        embedded = self.embedding(trg)

        attn_weights = self.attention(hidden, encoder_outputs)
        attn_weights = attn_weights.unsqueeze(1)

        context = torch.bmm(attn_weights, encoder_outputs)
        lstm_input = torch.cat((embedded, context), dim=2)

        output, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))
        prediction = self.fc(torch.cat((output.squeeze(1), context.squeeze(1)), dim=1))

        return prediction, hidden, cell

In [128]:
# Seq2Seq model
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg):
        encoder_outputs, hidden, cell = self.encoder(src)
        outputs = torch.zeros(trg.size(0), trg.size(1), OUTPUT_DIM).to(src.device)
        input_token = trg[:, 0]
        for t in range(1, trg.size(1)):
            output, hidden, cell = self.decoder(input_token, hidden, cell, encoder_outputs)
            outputs[:, t, :] = output
            input_token = trg[:, t]
        return outputs

In [129]:
# Instantiate the model with attention
attention = Attention(HIDDEN_DIM)
encoder = Encoder(INPUT_DIM, EMB_DIM, HIDDEN_DIM, NUM_LAYERS)
decoder = Decoder(OUTPUT_DIM, EMB_DIM, HIDDEN_DIM, NUM_LAYERS, attention)
model = Seq2Seq(encoder, decoder)

In [130]:
model

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(13, 32)
    (lstm): LSTM(32, 64, batch_first=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(13, 32)
    (lstm): LSTM(96, 64, batch_first=True)
    (fc): Linear(in_features=128, out_features=13, bias=True)
    (attention): Attention(
      (attn): Linear(in_features=128, out_features=64, bias=True)
      (v): Linear(in_features=64, out_features=1, bias=False)
    )
  )
)

In [131]:
# Training setup
criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore padding index
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [132]:
def prepare_data(data, source_vocab, target_vocab):
    source_indices = [sentence_to_indices(src, source_vocab) for src, _ in data]
    target_indices = [[1] + sentence_to_indices(trg, target_vocab) + [2] for _, trg in data]  # Add <EOS> token
    max_len = max(len(seq) for seq in target_indices)
    source_padded = [seq + [0] * (max_len - len(seq)) for seq in source_indices]
    target_padded = [seq + [0] * (max_len - len(seq)) for seq in target_indices]
    return torch.tensor(source_padded), torch.tensor(target_padded)

In [133]:
source_tensor, target_tensor = prepare_data(data, source_vocab, target_vocab)

In [134]:
source_tensor

tensor([[ 8, 10,  7, 12,  0,  0],
        [ 8,  3,  5,  0,  0,  0],
        [ 6,  0,  0,  0,  0,  0],
        [11,  9,  5,  0,  0,  0],
        [ 4,  5,  0,  0,  0,  0]])

In [135]:
target_tensor

tensor([[ 1,  7,  5,  6,  3,  2],
        [ 1,  7,  8,  2,  0,  0],
        [ 1,  4,  2,  0,  0,  0],
        [ 1, 11,  9, 10,  2,  0],
        [ 1, 12,  2,  0,  0,  0]])

In [136]:
# Training loop
num_epochs = 1000
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()
    output = model(source_tensor, target_tensor)
    output_dim = output.shape[-1]
    loss = criterion(output[:, 1:].reshape(-1, output_dim), target_tensor[:, 1:].reshape(-1))
    loss.backward()
    optimizer.step()

    if epoch % 100 == 0:
        print(f'Epoch {epoch}, Loss: {loss.item()}')

Epoch 0, Loss: 2.5551395416259766
Epoch 100, Loss: 0.052865270525217056
Epoch 200, Loss: 0.012262150645256042
Epoch 300, Loss: 0.005883478093892336
Epoch 400, Loss: 0.003535899566486478
Epoch 500, Loss: 0.00239496654830873
Epoch 600, Loss: 0.0017436250345781446
Epoch 700, Loss: 0.0013334167888388038
Epoch 800, Loss: 0.0010565038537606597
Epoch 900, Loss: 0.0008597331470809877


In [137]:
def translate(sentence, model, source_vocab, target_vocab, inv_target_vocab, max_len=10):
    model.eval()
    indices = sentence_to_indices(sentence, source_vocab)
    src_tensor = torch.tensor([indices + [0] * (max_len - len(indices))])

    # Correctly unpack the encoder outputs
    encoder_outputs, hidden, cell = model.encoder(src_tensor)

    trg_indices = [1]  # <SOS> token
    for _ in range(max_len):
        trg_tensor = torch.tensor([trg_indices[-1]])
        output, hidden, cell = model.decoder(trg_tensor, hidden, cell, encoder_outputs)
        pred_token = output.argmax(1).item()
        trg_indices.append(pred_token)

        # Stop if the model predicts the <EOS> token
        if pred_token == 2:  # Assuming 2 is the <EOS> token in target_vocab
            break

    return ' '.join(inv_target_vocab[idx] for idx in trg_indices[1:] if idx not in [0, 1, 2])  # Exclude <PAD>, <SOS>, <EOS>

In [138]:
# Testing the model
test_sentence = "I love you"
translation = translate(test_sentence, model, source_vocab, target_vocab, inv_target_vocab)
print(f'Translation: {translation}')

Translation: je t'aime
