# Sequence-to-Sequence Model Overview

## Understanding Concept and Definition

A Sequence-to-Sequence (Seq2Seq) model is a type of neural network architecture designed to transform a given sequence of elements, such as words in a sentence, into another sequence. This model is widely used in various natural language processing (NLP) tasks, including machine translation, text summarization, and conversational agents.

### Key Components

1. **Encoder**: The encoder processes the input sequence and compresses the information into a fixed-size context vector. It typically consists of recurrent neural networks (RNNs), long short-term memory networks (LSTMs), or gated recurrent units (GRUs).

2. **Decoder**: The decoder takes the context vector from the encoder and generates the output sequence. It also uses RNNs, LSTMs, or GRUs and often incorporates attention mechanisms to focus on different parts of the input sequence during the generation process.

3. **Attention Mechanism**: An optional component that allows the decoder to selectively focus on different parts of the input sequence, improving the model's performance on longer sequences and complex tasks.

### Applications

- **Machine Translation**: Translating text from one language to another.
- **Text Summarization**: Condensing long documents into shorter summaries.
- **Conversational Agents**: Generating responses in dialogue systems.
- **Speech Recognition**: Converting spoken language into text.

Seq2Seq models have revolutionized NLP by providing a flexible and powerful framework for handling various sequence transformation tasks.

## Implementation of Seq2Seq without Attention


In [18]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class Encoder(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, num_layers, dropout=0.5):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.dropout = nn.Dropout(dropout)
        self.lstm = nn.LSTM(embedding_size, hidden_size, num_layers, 
                           batch_first=True, dropout=dropout if num_layers > 1 else 0)

    def forward(self, x):
        # x shape: (batch_size, seq_length)
        embedded = self.dropout(self.embedding(x))
        # embedded shape: (batch_size, seq_length, embedding_size)
        
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        outputs, (hidden, cell) = self.lstm(embedded, (h0, c0))
        return outputs, hidden, cell

class Decoder(nn.Module):
    def __init__(self, output_size, embedding_size, hidden_size, num_layers, dropout=0.5):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        
        self.embedding = nn.Embedding(output_size, embedding_size)
        self.dropout = nn.Dropout(dropout)
        self.lstm = nn.LSTM(embedding_size, hidden_size, num_layers, 
                           batch_first=True, dropout=dropout if num_layers > 1 else 0)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden, cell):
        # x shape: (batch_size, 1)
        x = x.unsqueeze(1)
        
        embedded = self.dropout(self.embedding(x))
        # embedded shape: (batch_size, 1, embedding_size)
        
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        prediction = self.fc(output)
        
        # prediction shape: (batch_size, 1, output_size)
        return prediction, hidden, cell

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        # src shape: (batch_size, src_seq_length)
        # trg shape: (batch_size, trg_seq_length)
        
        batch_size = src.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.output_size
        
        # tensor to store decoder outputs
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)
        
        # encoder_outputs shape: (batch_size, src_seq_length, hidden_size)
        # hidden shape: (num_layers, batch_size, hidden_size)
        # cell shape: (num_layers, batch_size, hidden_size)
        encoder_outputs, hidden, cell = self.encoder(src)
        
        # first input to the decoder is the <sos> token
        _input = trg[:, 0]
        
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(_input, hidden, cell)
            
            # output shape: (batch_size, 1, output_size)
            outputs[:, t] = output.squeeze(1)
            
            # decide if we are going to use teacher forcing or not
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            
            # get the highest predicted token from our predictions
            top1 = output.argmax(2).squeeze(1)
            
            # if teacher forcing, use actual next token as next input
            # if not, use predicted token
            _input = trg[:, t] if teacher_force else top1
        
        return outputs

In [2]:
import random

# Source and target sentences for machine translation training
source_sentences = [
    "Hello, how are you?", "What is your name?", "I love programming.",
    "The weather is nice today.", "She is reading a book.", "Where is the nearest bus stop?",
    "Can you help me?", "This is a beautiful place.", "I am learning a new language.",
    "We will go to the park tomorrow.", "I need a cup of coffee.", "He is my best friend.",
    "How much does this cost?", "The sun is shining brightly.", "She enjoys listening to music.",
    "Do you speak English?", "This food tastes amazing.", "Where do you live?",
    "Let's go to the cinema.", "I'm feeling very happy today.", "He is a great teacher.",
    "My favorite color is blue.", "Please turn off the lights.", "I have a pet dog.",
    "The train arrives at 10 AM.", "We should visit the museum.", "She wants to buy a new dress.",
    "It's time to go to bed.", "Could you repeat that, please?", "I'm studying artificial intelligence."
]

target_sentences = [
    "Xin chào, bạn khỏe không?", "Tên bạn là gì?", "Tôi yêu lập trình.",
    "Thời tiết hôm nay đẹp.", "Cô ấy đang đọc sách.", "Trạm xe buýt gần nhất ở đâu?",
    "Bạn có thể giúp tôi không?", "Đây là một nơi đẹp.", "Tôi đang học một ngôn ngữ mới.",
    "Chúng ta sẽ đi công viên vào ngày mai.", "Tôi cần một tách cà phê.", "Anh ấy là bạn thân nhất của tôi.",
    "Cái này giá bao nhiêu?", "Mặt trời đang chiếu sáng rực rỡ.", "Cô ấy thích nghe nhạc.",
    "Bạn có nói tiếng Anh không?", "Món ăn này có vị tuyệt vời.", "Bạn sống ở đâu?",
    "Hãy đi đến rạp chiếu phim.", "Hôm nay tôi cảm thấy rất vui.", "Anh ấy là một giáo viên tuyệt vời.",
    "Màu yêu thích của tôi là màu xanh.", "Vui lòng tắt đèn.", "Tôi có một con chó cưng.",
    "Tàu sẽ đến lúc 10 giờ sáng.", "Chúng ta nên đi thăm bảo tàng.", "Cô ấy muốn mua một chiếc váy mới.",
    "Đã đến lúc đi ngủ rồi.", "Bạn có thể nhắc lại không?", "Tôi đang học trí tuệ nhân tạo."
]

# Create random translation data
translation_data = [{"source": src, "target": tgt} for src, tgt in zip(source_sentences, target_sentences)]

# Save into a list
train_data = list(translation_data)

# Print example data
for example in train_data[:5]:  # Show first 5 examples
    print(example)


{'source': 'Hello, how are you?', 'target': 'Xin chào, bạn khỏe không?'}
{'source': 'What is your name?', 'target': 'Tên bạn là gì?'}
{'source': 'I love programming.', 'target': 'Tôi yêu lập trình.'}
{'source': 'The weather is nice today.', 'target': 'Thời tiết hôm nay đẹp.'}
{'source': 'She is reading a book.', 'target': 'Cô ấy đang đọc sách.'}


In [6]:
# Pre-process data 
word2index = {"<pad>": 0, "<sos>": 1, "<eos>": 2, "<unk>": 3}

# add source_sentences & target_sentences to word2index
for example in train_data:
    for word in example["source"].split():
        if word not in word2index:
            word2index[word] = len(word2index)
    for word in example["target"].split():
        if word not in word2index:
            word2index[word] = len(word2index)

index2word = {index: word for word, index in word2index.items()}
vocab_size = len(word2index)


In [25]:
class MachineTranslateDataset:

    def __init__(self, data, word2index, max_length=50):
        self.data = data
        self.word2index = word2index
        self.max_length = max_length

    def __len__(self):
        return len(self.data)
    
    def pad_sequence(self, sequence, max_len):
        # Truncate if sequence is longer than max_len
        if len(sequence) > max_len:
            return sequence[:max_len]
        # Pad with <pad> token if sequence is shorter
        else:
            return sequence + [self.word2index["<pad>"]] * (max_len - len(sequence))
    
    def __getitem__(self, index):
        src = self.data[index]["source"]
        trg = self.data[index]["target"]
        
        # Convert words to indices
        src_indexes = [self.word2index.get(word, self.word2index["<unk>"]) 
                      for word in src.split()]
        trg_indexes = [self.word2index.get(word, self.word2index["<unk>"]) 
                      for word in trg.split()]
        
        # Add <sos> and <eos> tokens
        src_indexes = [self.word2index["<sos>"]] + src_indexes + [self.word2index["<eos>"]]
        trg_indexes = [self.word2index["<sos>"]] + trg_indexes + [self.word2index["<eos>"]]
        
        # Pad sequences
        src_indexes = self.pad_sequence(src_indexes, self.max_length)
        trg_indexes = self.pad_sequence(trg_indexes, self.max_length)
        
        return (torch.tensor(src_indexes, dtype=torch.long), 
                torch.tensor(trg_indexes, dtype=torch.long))

    


In [26]:
machine_translate_dataset =  MachineTranslateDataset(translation_data, word2index)
machine_translate_dataset.__getitem__(0)

(tensor([1, 4, 5, 6, 7, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0]),
 tensor([ 1,  8,  9, 10, 11, 12,  2,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0]))

In [29]:
# Hyperparameters
input_size_encoder = vocab_size
num_epochs = 100
learning_rate = 0.001
batch_size = 8
embedding_size = 256
hidden_size = 512
num_layers = 2

# Initialize dataloader
data_loader = torch.utils.data.DataLoader(dataset=machine_translate_dataset,
                                          batch_size=batch_size,
                                          shuffle=True)

# Initialize model
model = Seq2Seq(encoder=Encoder(vocab_size, embedding_size, hidden_size, num_layers),
                decoder=Decoder(vocab_size, embedding_size, hidden_size, num_layers),
                device=device).to(device)
print(model)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)



Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(233, 256)
    (dropout): Dropout(p=0.5, inplace=False)
    (lstm): LSTM(256, 512, num_layers=2, batch_first=True, dropout=0.5)
  )
  (decoder): Decoder(
    (embedding): Embedding(233, 256)
    (dropout): Dropout(p=0.5, inplace=False)
    (lstm): LSTM(256, 512, num_layers=2, batch_first=True, dropout=0.5)
    (fc): Linear(in_features=512, out_features=233, bias=True)
  )
)


In [84]:
#  Training loop
model.train()
total_steps = len(data_loader)
for epoch in range(num_epochs):
    total_loss = 0
    for i, (src, trg) in enumerate(data_loader):
        src = src.to(device)
        trg = trg.to(device)

        # Forward pass
        optimizer.zero_grad()
        output = model(src, trg)

        output_dim = output.shape[-1]

       # Calculate loss
        # output shape: (batch_size, seq_len, vocab_size)
        # target shape: (batch_size, seq_len)
        output = output[:, 1:].reshape(-1, output.shape[-1])
        target = trg[:, 1:].reshape(-1)
        
        loss = criterion(output, target)
        
        # Backward pass and optimize
        loss.backward()
        # Clip gradients to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
        optimizer.step()
        
        total_loss += loss.item()
        # Print progress
        if (i + 1) % 100 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], '
                    f'Step [{i+1}/{total_steps}], '
                    f'Loss: {loss.item():.4f}')
            
    # Print epoch statistics
    avg_loss = total_loss / len(data_loader)
    print(f'Epoch [{epoch+1}/{num_epochs}], Average Loss: {avg_loss:.4f}')


Epoch [1/100], Average Loss: 0.1345
Epoch [2/100], Average Loss: 0.1535
Epoch [3/100], Average Loss: 0.1668
Epoch [4/100], Average Loss: 0.2068
Epoch [5/100], Average Loss: 0.1242
Epoch [6/100], Average Loss: 0.1790
Epoch [7/100], Average Loss: 0.2777
Epoch [8/100], Average Loss: 0.3051
Epoch [9/100], Average Loss: 0.1795
Epoch [10/100], Average Loss: 0.1250
Epoch [11/100], Average Loss: 0.1986
Epoch [12/100], Average Loss: 0.1575
Epoch [13/100], Average Loss: 0.1480
Epoch [14/100], Average Loss: 0.1427
Epoch [15/100], Average Loss: 0.1939
Epoch [16/100], Average Loss: 0.1271
Epoch [17/100], Average Loss: 0.1676
Epoch [18/100], Average Loss: 0.2059
Epoch [19/100], Average Loss: 0.2145
Epoch [20/100], Average Loss: 0.1542
Epoch [21/100], Average Loss: 0.1271
Epoch [22/100], Average Loss: 0.2323
Epoch [23/100], Average Loss: 0.1123
Epoch [24/100], Average Loss: 0.0937
Epoch [25/100], Average Loss: 0.1188
Epoch [26/100], Average Loss: 0.2041
Epoch [27/100], Average Loss: 0.1781
Epoch [28/

In [87]:
def pad_sequence(sequence, max_len, word2index):
    if len(sequence) > max_len:
        return sequence[:max_len]
    else:
        return sequence + [word2index["<pad>"]] * (max_len - len(sequence))

def translate(model, src_sentence, word2index, device, max_length=50):
    model.eval()
    
    # Convert sentence to indices
    src_indexes = [word2index.get(token, word2index["<unk>"]) for token in src_sentence]
    src_indexes = [word2index["<sos>"]] + src_indexes + [word2index["<eos>"]]
    src_indexes = pad_sequence(src_indexes, max_length, word2index)
    
    # Create source tensor: shape should be [batch_size=1, seq_len]
    src_tensor = torch.LongTensor([src_indexes]).to(device)
    
    with torch.no_grad():
        # Get encoder outputs
        encoder_outputs, hidden, cell = model.encoder(src_tensor)
        
        # Initialize decoder input with <sos> token
        decoder_input = torch.LongTensor([word2index["<sos>"]]).to(device)
        
        # Store all decoder outputs
        decoded_words = []
        
        for _ in range(max_length):
            # Run decoder for one step
            decoder_output, hidden, cell = model.decoder(decoder_input, hidden, cell)
            
            # Get the most likely word
            topv, topi = decoder_output.squeeze().data.topk(1)
            decoded_token = topi.item()
            
            # Add the token to results
            if decoded_token == word2index["<eos>"]:
                break
            elif decoded_token == word2index["<pad>"]:
                continue
            else:
                # Convert index back to word
                decoded_words.append(
                    next(word for word, index in word2index.items() 
                        if index == decoded_token)
                )
            
            # Next input is the decoded token
            decoder_input = torch.LongTensor([decoded_token]).to(device)
    
    return decoded_words

In [91]:
# save model and vocab
import json
import os
model_dir = "seq2se_model"

if not os.path.exists(model_dir):
    os.makedirs(model_dir)
    
def save_model():
    torch.save(model.state_dict(), f"{model_dir}/seq2seq.pth")
    with open(f"{model_dir}/word2index.json", "w") as f:
        json.dump(word2index, f)

save_model()

In [89]:
# Example usage:
translated = translate(
    model=model,
    src_sentence="The weather is nice today.".split(),
    word2index=word2index,
    device=device,
    max_length=50
)
print(" ".join(translated))

Thời tiết hôm nay đẹp.
