# Encoder-Decoder Transformer (T5-like Model)

In this notebook, we implement an encoder-decoder transformer model similar to T5. The model is designed for sequence-to-sequence tasks like translation or summarization.

In [3]:
%pip install torch numpy

Note: you may need to restart the kernel to use updated packages.


In [4]:
# Import necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import copy

In [5]:
# Set the device ("mps" if you're using an M series mac):
if torch.backends.mps.is_available():
    device = torch.device('mps')
elif torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [6]:
# Sample vocabulary and data
vocab = ['[PAD]', '[BOS]', '[EOS]', 'i', 'like', 'eating', 'apples', 'bananas', 'fruits']
word_to_idx = {word: idx for idx, word in enumerate(vocab)}
idx_to_word = {idx: word for word, idx in word_to_idx.items()}
vocab_size = len(vocab)

### Special Tokens
There are some different tokens you might stumble across when dealing with inputs in NLP. 
- The [PAD] Token acts as a padding if a sentence does not have the desired fixed length. 
- The [BOS] Token indicates the Beginning of the Sentence. 
- The [EOS] Token indicates the End of the Sentence. 
- The [CLS] Token represents Sentence Level Classification. 
- The [SEP] Token represents Separation of Sentences (used by BERT). 
- The [UNK] Token represents OOB-Tokens, meaning unknown Tokens that are not included in the vocabulary. 

In [7]:
# Sample input-output pairs (e.g., paraphrasing)
input_sentences = [
    ['[BOS]', 'i', 'like', 'eating', 'apples', '[EOS]'],
    ['[BOS]', 'i', 'like', 'eating', 'bananas', '[EOS]']
]
output_sentences = [
    ['[BOS]', 'i', 'like', 'fruits', '[EOS]'],
    ['[BOS]', 'i', 'like', 'fruits', '[EOS]']
]

In [8]:
# Prepare data
def prepare_data(sentences):
    inputs = []
    for sent in sentences:
        input_ids = [word_to_idx[word] for word in sent]
        inputs.append(input_ids)
    return torch.tensor(inputs)

In [9]:
# Convert data to tensors
encoder_inputs = prepare_data(input_sentences).to(device)
decoder_inputs = prepare_data([['[BOS]'] + sent[1:] for sent in output_sentences]).to(device)
decoder_targets = prepare_data([sent[1:] + ['[PAD]'] for sent in output_sentences]).to(device)

In [10]:
# Positional Encoding (same as before)
class PositionalEncoding(nn.Module):
    def __init__(self, model_dimension, max_len=512):
        super(PositionalEncoding, self).__init__()
        # TODO: Implement positional encoding
        positional_encoding = torch.zeros(max_len, model_dimension)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        frequency_divisor = torch.exp(torch.arange(0, model_dimension, 2).float() * (-np.log(10000.0) / model_dimension))
        positional_encoding[:, 0::2] = torch.sin(position * frequency_divisor)
        positional_encoding[:, 1::2] = torch.cos(position * frequency_divisor)
        positional_encoding = positional_encoding.unsqueeze(0)  # Shape: (1, max_len, d_model)
        self.register_buffer('pe', positional_encoding)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :].to(x.device)
        return x

In [11]:
# Encoder Layer (same as before)
class EncoderLayer(nn.Module):
    def __init__(self, model_dimension, num_attention_heads, dim_feedforward, dropout=0.1):
        super(EncoderLayer, self).__init__()
        # TODO: Implement encoder layer components
        self.self_attention = nn.MultiheadAttention(model_dimension, num_attention_heads, dropout=dropout)
        # Feedforward network
        self.linear1 = nn.Linear(model_dimension, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, model_dimension)
        # Layer normalization
        self.norm1 = nn.LayerNorm(model_dimension)
        self.norm2 = nn.LayerNorm(model_dimension)
        # Dropout layers
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, input_tensor, src_mask=None):
        # TODO: Implement forward pass
        attention_output = self.self_attention(input_tensor, input_tensor, input_tensor, attn_mask=src_mask)[0]
        input_tensor = input_tensor + self.dropout1(attention_output)
        input_tensor = self.norm1(input_tensor)
        # Feedforward network
        attention_output = self.linear2(self.dropout(F.relu(self.linear1(input_tensor))))
        input_tensor = input_tensor + self.dropout2(attention_output)
        input_tensor = self.norm2(input_tensor)
        return input_tensor

In [12]:
# Decoder Layer
class DecoderLayer(nn.Module):
    def __init__(self, model_dimension, num_attention_heads, dim_feedforward, dropout=0.1):
        super(DecoderLayer, self).__init__()
        # TODO: Implement decoder layer components
        self.self_attention = nn.MultiheadAttention(model_dimension, num_attention_heads, dropout=dropout)
        self.multihead_attention = nn.MultiheadAttention(model_dimension, num_attention_heads, dropout=dropout)
        # Feedforward network
        self.linear1 = nn.Linear(model_dimension, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, model_dimension)
        # Layer normalization
        self.norm1 = nn.LayerNorm(model_dimension)
        self.norm2 = nn.LayerNorm(model_dimension)
        self.norm3 = nn.LayerNorm(model_dimension)
        # Dropout layers
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, target, memory, target_mask=None, memory_mask=None):
        # TODO: Implement forward pass
        # Self-attention
        target2 = self.self_attention(target, target, target, attn_mask=target_mask)[0]
        target = target + self.dropout1(target2)
        target = self.norm1(target)
        # Multi-head attention with encoder output
        target2 = self.multihead_attention(target, memory, memory, attn_mask=memory_mask)[0]
        target = target + self.dropout2(target2)
        target = self.norm2(target)
        # Feedforward network
        target2 = self.linear2(self.dropout(F.relu(self.linear1(target))))
        target = target + self.dropout3(target2)
        target = self.norm3(target)
        return target

In [13]:
# Encoder (same as before)
class Encoder(nn.Module):
    def __init__(self, num_layers, d_model, num_attention_heads, vocab_size, dim_feedforward, dropout=0.1):
        super(Encoder, self).__init__()
        # TODO: Implement encoder components
        self.model_dimension = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding_layer = PositionalEncoding(d_model)
        # Create multiple encoder layers
        encoder_layer = EncoderLayer(d_model, num_attention_heads, dim_feedforward, dropout)
        self.layers = nn.ModuleList([copy.deepcopy(encoder_layer) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)

    def forward(self, input_sequence, src_mask=None):
        # TODO: Implement forward pass
        input_sequence = self.embedding(input_sequence) * np.sqrt(self.model_dimension)
        input_sequence = self.positional_encoding_layer(input_sequence)
        input_sequence = input_sequence.permute(1, 0, 2)  # (sequence length, batch size, embedding size)
        for layer in self.layers:
            input_sequence = layer(input_sequence, src_mask)
        input_sequence = self.norm(input_sequence)
        return input_sequence

In [14]:
# Decoder
class Decoder(nn.Module):
    def __init__(self, num_layers, model_dimension, num_attention_heads, vocab_size, dim_feedforward, dropout=0.1):
        super(Decoder, self).__init__()
        # TODO: Implement decoder components
        self.d_model = model_dimension
        self.embedding = nn.Embedding(vocab_size, model_dimension)
        self.positional_encoder_layer = PositionalEncoding(model_dimension)
        # Create multiple decoder layers
        decoder_layer = DecoderLayer(model_dimension, num_attention_heads, dim_feedforward, dropout)
        self.layers = nn.ModuleList([copy.deepcopy(decoder_layer) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(model_dimension)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        # TODO: Implement forward pass
        tgt = self.embedding(tgt) * np.sqrt(self.d_model)
        tgt = self.positional_encoder_layer(tgt)
        tgt = tgt.permute(1, 0, 2)  # (sequence length, batch size, embedding size)
        for layer in self.layers:
            tgt = layer(tgt, memory, tgt_mask, memory_mask)
        tgt = self.norm(tgt)
        return tgt

In [15]:
# Seq2Seq Model
class Seq2SeqModel(nn.Module):
    def __init__(self, num_layers, d_model, nhead, vocab_size, dim_feedforward, dropout=0.1):
        super(Seq2SeqModel, self).__init__()
        # TODO: Implement Seq2Seq model components
        self.encoder = Encoder(num_layers, d_model, nhead, vocab_size, dim_feedforward, dropout)
        self.decoder = Decoder(num_layers, d_model, nhead, vocab_size, dim_feedforward, dropout)
        self.output_layer = nn.Linear(d_model, vocab_size)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        # TODO: Implement forward pass
        memory = self.encoder(src, src_mask)
        decoder_output = self.decoder(tgt, memory, tgt_mask)
        output = self.output_layer(decoder_output.permute(1, 0, 2))  # Back to (batch size, sequence length, vocab size)
        return output

In [16]:
# Hyperparameters
num_layers = 2
model_dimension = 64
num_attention_heads = 4
dim_feedforward = 128
dropout = 0.1

In [17]:
# Initialize the model, loss function, and optimizer
model = Seq2SeqModel(num_layers, model_dimension, num_attention_heads, vocab_size, dim_feedforward, dropout).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=word_to_idx['[PAD]'])
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [18]:
# Create masks (not used in this simple example)
source_attention_mask = None
target_mask = None

In [19]:
# Training loop
epochs = 100
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    # TODO: Forward pass
    outputs = model(encoder_inputs, decoder_inputs, source_attention_mask, target_mask)
    # Reshape outputs and targets
    outputs = outputs.view(-1, vocab_size)
    targets = decoder_targets.view(-1)
    loss = criterion(outputs, targets)
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}")

Epoch 10/100, Loss: 0.3137
Epoch 20/100, Loss: 0.0894
Epoch 30/100, Loss: 0.0484
Epoch 40/100, Loss: 0.0372
Epoch 50/100, Loss: 0.0279
Epoch 60/100, Loss: 0.0227
Epoch 70/100, Loss: 0.0197
Epoch 80/100, Loss: 0.0160
Epoch 90/100, Loss: 0.0147
Epoch 100/100, Loss: 0.0135


In [22]:
# Inference: Generate output sequence
def generate_sequence(model, input_sentence, max_length=10):
    model.eval()
    input_ids = [word_to_idx.get(word, word_to_idx['[PAD]']) for word in input_sentence]
    source_tensor = torch.tensor([input_ids]).to(device)
    memory = model.encoder(source_tensor)
    target_tokens = [word_to_idx['[BOS]']]
    for _ in range(max_length):
        target = torch.tensor([target_tokens]).to(device)
        output = model.decoder(target, memory)
        output = model.output_layer(output.permute(1, 0, 2))
        next_token = output.argmax(-1)[:, -1].item()
        target_tokens.append(next_token)
        if next_token == word_to_idx['[EOS]']:
            break
    output_sentence = [idx_to_word[idx] for idx in target_tokens]
    return output_sentence

In [23]:
# Test the model
test_sentence = ['[BOS]', 'i', 'like', 'eating', 'apples', '[EOS]']
generated_sentence = generate_sequence(model, test_sentence)
print("Input Sentence:", test_sentence)
print("Generated Sentence:", generated_sentence)

Input Sentence: ['[BOS]', 'i', 'like', 'eating', 'apples', '[EOS]']
Generated Sentence: ['[BOS]', 'i', 'like', 'fruits', '[EOS]']
