In [None]:
import torch
import math
import json
import random
from typing import Union

from tqdm import tqdm

from torch import Tensor
from torch import nn
from datasets import Dataset
from ignite.metrics import Rouge

from transformers import AutoTokenizer
from torch.utils.data import DataLoader

import matplotlib.pyplot as plt

#### Attn Mask Function
Borrowed from the Transformer code by Hugging Face.

In [None]:
def generate_square_subsequent_mask(sz):
    """
    Builds an additive causal attention mask of shape ``[sz, sz]``.

    Entry ``(i, j)`` is ``0.0`` when ``j <= i`` (position ``i`` may attend to
    position ``j``) and ``-inf`` when ``j > i`` (future positions are blocked).

    Args:
        sz (int): Side length of the square mask.

    Returns:
        Tensor: A float tensor of shape ``[sz, sz]``.
    """
    # Start from an all -inf matrix; triu(diagonal=1) keeps only the strictly
    # upper triangle and writes zeros everywhere else.
    blocked = torch.full((sz, sz), float('-inf'))
    return torch.triu(blocked, diagonal=1)

#### Positional Embedding
Borrowed from the `spotpython` library. See [`spotpython.light.transformer.positionalEncoding`](https://github.com/sequential-parameter-optimization/spotPython/blob/main/src/spotpython/light/transformer/positionalEncoding.py).

In [None]:
class PositionalEncoding(nn.Module):
    """
    Class for adding sinusoidal positional encoding to the input tensor.
    """

    # Note: this code was borrowed from the spotpython library
    def __init__(self, d_model: int, max_len: int = 5000) -> None:
        """
        Initializes the PositionalEncoding module.

        Args:
            d_model (int): The dimensionality of the model (the size of the embeddings).
                Both even and odd values are supported.
            max_len (int, optional): The maximum length of the input sequence. Defaults to 5000.
        """

        super().__init__()
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model)
        )
        # One angle per (position, frequency) pair: [max_len, ceil(d_model / 2)].
        angles = position * div_term

        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd-indexed column than frequencies,
        # so the cosine half only uses the first d_model // 2 of them.
        pe[:, 0, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Buffer: moves with the module across devices but is not a trainable parameter.
        self.register_buffer("pe", pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Adds the positional encodings to the input tensor `x`.

        Args:
            x (Tensor): The input tensor with shape
                ``[seq_len, batch_size, embedding_dim]``.

        Returns:
            Tensor: The input tensor with positional encodings added, of shape
                ``[seq_len, batch_size, embedding_dim]``.

        Raises:
            IndexError: If the sequence length of the input tensor exceeds the
                maximum length set during initialization.
        """

        seq_len = x.size(0)
        max_len = self.pe.size(0)
        # Checked explicitly: slicing past the end of the buffer would not raise
        # by itself, it would only produce a shorter slice.
        if seq_len > max_len:
            raise IndexError(
                f"Sequence length {seq_len} exceeds the maximum length {max_len}"
            )
        return x + self.pe[:seq_len]

#### Encoder Layer

In [None]:
class EncoderLayer(nn.Module):
    """
    One Transformer encoder block.

    The block applies multi-head self-attention and then a two-layer feedforward
    network. Each of the two sub-blocks is wrapped in a residual connection
    followed by layer normalization; the feedforward hidden activations are also
    normalized and optionally passed through dropout.
    """

    def __init__(self, embed_dim: int, hidden_dim: int, num_heads: int) -> None:
        """
        Builds the sub-modules of the encoder block.

        Args:
            embed_dim (int): Size of the token representations entering and leaving the block.
            hidden_dim (int): Width of the feedforward hidden layer.
            num_heads (int): Number of heads used by the self-attention module.
        """

        super().__init__()
        # Attribute names and creation order are kept stable so that checkpoints
        # and seeded initialisation stay compatible.
        self.mh_self_attn = nn.MultiheadAttention(embed_dim, num_heads)
        self.dropout = nn.Dropout(p=0.1)
        self.relu = nn.ReLU()
        self.linear1 = nn.Linear(in_features=embed_dim, out_features=hidden_dim)
        self.linear2 = nn.Linear(in_features=hidden_dim, out_features=embed_dim)
        self.norm1 = nn.LayerNorm(embed_dim)   # after the attention residual
        self.norm2 = nn.LayerNorm(hidden_dim)  # on the feedforward hidden activations
        self.norm3 = nn.LayerNorm(embed_dim)   # after the feedforward residual

    def forward(
        self, x: Tensor, src_key_padding_mask: Tensor, dropout: bool = True
    ) -> Tensor:
        """
        Runs the encoder block on ``x``.

        Args:
            x (Tensor): Input of shape ``[seq_len, batch_size, embed_dim]``.
            src_key_padding_mask (Tensor): Key padding mask handed to the attention
                module; marks positions that attention should ignore.
            dropout (bool, optional): When True, the dropout module is applied to the
                feedforward hidden activations. Defaults to True.

        Returns:
            Tensor: Output of shape ``[seq_len, batch_size, embed_dim]``.
        """

        # Self-attention sub-block: residual connection, then normalization.
        attended = self.mh_self_attn(
            x, x, x, key_padding_mask=src_key_padding_mask
        )[0]
        attended = self.norm1(attended + x)

        # Feedforward sub-block: expand, activate, normalize, (dropout), project back.
        hidden = self.norm2(self.relu(self.linear1(attended)))
        if dropout:
            hidden = self.dropout(hidden)
        projected = self.linear2(hidden)

        # Second residual connection and final normalization.
        return self.norm3(projected + attended)

#### Decoder Layer

In [None]:
class DecoderLayer(nn.Module):
    """
    One Transformer decoder block.

    The block chains three sub-blocks: masked multi-head self-attention over the
    target, multi-head attention over the encoder output, and a two-layer
    feedforward network. Each sub-block is followed by a residual connection and
    layer normalization.
    """

    def __init__(self, embed_dim: int, hidden_dim: int, num_heads: int) -> None:
        """
        Builds the sub-modules of the decoder block.

        Args:
            embed_dim (int): Size of the token representations entering and leaving the block.
            hidden_dim (int): Width of the feedforward hidden layer.
            num_heads (int): Number of heads used by both attention modules.
        """

        super().__init__()
        # Attribute names and creation order are kept stable so that checkpoints
        # and seeded initialisation stay compatible.
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.1)
        # Masked self-attention over the target sequence.
        self.mmh_self_attn = nn.MultiheadAttention(
            embed_dim=embed_dim, num_heads=num_heads
        )
        # Despite its name, this module is used for encoder-decoder attention.
        self.mh_self_attn = nn.MultiheadAttention(
            embed_dim=embed_dim, num_heads=num_heads
        )
        self.linear1 = nn.Linear(in_features=embed_dim, out_features=hidden_dim)
        self.linear2 = nn.Linear(in_features=hidden_dim, out_features=embed_dim)
        self.norm1 = nn.LayerNorm(embed_dim)   # after the self-attention residual
        self.norm2 = nn.LayerNorm(embed_dim)   # after the encoder-decoder attention residual
        self.norm3 = nn.LayerNorm(hidden_dim)  # on the feedforward hidden activations
        self.norm4 = nn.LayerNorm(embed_dim)   # after the feedforward residual

    def forward(
        self,
        x: Tensor,
        encoder_output: Tensor,
        trg_key_padding_mask: Tensor,
        src_key_padding_mask: Tensor,
        trg_attn_mask: Tensor,
        dropout: bool = True,
    ) -> Tensor:
        """
        Runs the decoder block on ``x``.

        Args:
            x (Tensor): Target-side input of shape ``[seq_len, batch_size, embed_dim]``.
            encoder_output (Tensor): Encoder output of shape ``[seq_len, batch_size, embed_dim]``,
                used as keys and values of the encoder-decoder attention.
            trg_key_padding_mask (Tensor): Key padding mask for the target sequence.
            src_key_padding_mask (Tensor): Key padding mask for the source sequence.
            trg_attn_mask (Tensor): Attention mask for the target self-attention, used to
                hide future tokens.
            dropout (bool, optional): When True, the dropout module is applied to the
                feedforward hidden activations. Defaults to True.

        Returns:
            Tensor: Output of shape ``[seq_len, batch_size, embed_dim]``.
        """

        # 1) Masked self-attention over the target, residual + norm.
        self_attended = self.mmh_self_attn(
            x,
            x,
            x,
            key_padding_mask=trg_key_padding_mask,
            attn_mask=trg_attn_mask,
            is_causal=False,
        )[0]
        self_attended = self.norm1(self_attended + x)

        # 2) Attention over the encoder output (queries come from the decoder),
        #    residual + norm.
        cross_attended = self.mh_self_attn(
            self_attended,
            encoder_output,
            encoder_output,
            key_padding_mask=src_key_padding_mask,
        )[0]
        cross_attended = self.norm2(cross_attended + self_attended)

        # 3) Feedforward: expand, activate, normalize, (dropout), project back.
        hidden = self.norm3(self.relu(self.linear1(cross_attended)))
        if dropout:
            hidden = self.dropout(hidden)
        projected = self.linear2(hidden)

        # Final residual connection and normalization.
        return self.norm4(projected + cross_attended)

#### Full Transformer Model

In [None]:
class Transformer(nn.Module):
    """
    Encoder-decoder Transformer for sequence-to-sequence tasks.

    The model owns separate embedding tables for source and target tokens, a shared
    positional encoding, a stack of encoder layers, a stack of decoder layers and a
    final linear projection onto the target vocabulary.
    """

    def __init__(
        self,
        embed_dim: int,
        encoder_hidden_dim: int,
        decoder_hidden_dim: int,
        encoder_heads: int,
        decoder_heads: int,
        encoder_layers: int,
        decoder_layers: int,
        src_num_embeddings: int,
        trg_num_embeddings: int,
    ) -> None:
        """
        Builds the embeddings, the encoder/decoder stacks and the output projection.

        Args:
            embed_dim (int): Dimensionality of the token embeddings.
            encoder_hidden_dim (int): Feedforward hidden width inside each encoder layer.
            decoder_hidden_dim (int): Feedforward hidden width inside each decoder layer.
            encoder_heads (int): Number of attention heads in each encoder layer.
            decoder_heads (int): Number of attention heads in each decoder layer.
            encoder_layers (int): Number of encoder layers.
            decoder_layers (int): Number of decoder layers.
            src_num_embeddings (int): Size of the source vocabulary.
            trg_num_embeddings (int): Size of the target vocabulary.
        """

        super().__init__()
        # Index 0 is the padding index of both embedding tables.
        self.src_embedding = nn.Embedding(
            num_embeddings=src_num_embeddings, embedding_dim=embed_dim, padding_idx=0
        )
        self.trg_embedding = nn.Embedding(
            num_embeddings=trg_num_embeddings, embedding_dim=embed_dim, padding_idx=0
        )
        # The same positional encoding module serves source and target.
        self.pos_encoder = PositionalEncoding(d_model=embed_dim)

        encoder_stack = [
            EncoderLayer(embed_dim, encoder_hidden_dim, encoder_heads)
            for _ in range(encoder_layers)
        ]
        self.encoder = nn.ModuleList(encoder_stack)

        decoder_stack = [
            DecoderLayer(embed_dim, decoder_hidden_dim, decoder_heads)
            for _ in range(decoder_layers)
        ]
        self.decoder = nn.ModuleList(decoder_stack)

        # Projects decoder states onto target-vocabulary logits.
        self.linear = nn.Linear(embed_dim, trg_num_embeddings)

    def forward(
        self,
        src_x: Tensor,
        trg_x: Tensor,
        src_key_padding_mask: Tensor,
        trg_key_padding_mask: Tensor,
        trg_attn_mask: Union[Tensor, None] = None,
        decoder_only: bool = False,
        return_encoder_output: bool = False,
    ) -> Union[Tensor, tuple[Tensor, Tensor]]:
        """
        Runs the encoder (unless skipped) and the decoder.

        Args:
            src_x (Tensor): Source token ids of shape ``[seq_len, batch_size]``. When
                ``decoder_only`` is True this argument is used as-is in place of the
                encoder output, so it must already be an encoder output tensor.
            trg_x (Tensor): Target token ids of shape ``[seq_len, batch_size]``.
            src_key_padding_mask (Tensor): Key padding mask for the source input.
            trg_key_padding_mask (Tensor): Key padding mask for the target input.
            trg_attn_mask (Union[Tensor, None], optional): Attention mask for the target
                self-attention. Defaults to None.
            decoder_only (bool, optional): If True, the encoder stack is skipped.
                Defaults to False.
            return_encoder_output (bool, optional): If True, the encoder output is
                returned alongside the logits. Defaults to False.

        Returns:
            Union[Tensor, tuple[Tensor, Tensor]]: Logits of shape
                ``[seq_len, batch_size, trg_num_embeddings]``, or the tuple
                ``(logits, encoder output)`` when `return_encoder_output` is True.
        """

        # Encoder side: either reuse a precomputed encoder output or compute it.
        if decoder_only:
            memory = src_x
        else:
            memory = self.pos_encoder(self.src_embedding(src_x))
            for encoder_layer in self.encoder:
                memory = encoder_layer(memory, src_key_padding_mask)

        # Decoder side: embed the target and run it through every decoder layer.
        decoded = self.pos_encoder(self.trg_embedding(trg_x))
        for decoder_layer in self.decoder:
            decoded = decoder_layer(
                decoded,
                memory,
                trg_key_padding_mask,
                src_key_padding_mask,
                trg_attn_mask,
            )

        logits = self.linear(decoded)

        if return_encoder_output:
            return logits, memory
        return logits

### Data

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

#### Load Data

In [None]:
# Download the validation and training corpora to the working directory as
# `valid_data.jsonl` and `train_data.jsonl`. Each public Yandex Disk link is first
# resolved by the `yadisk-direct` command, then fetched with curl (-L follows redirects).
# NOTE(review): requires the `yadisk-direct` CLI to be installed and network access.
!curl -L $(yadisk-direct https://disk.yandex.com/d/Q6Bm9NoG1VWcgA) -o valid_data.jsonl
!curl -L $(yadisk-direct https://disk.yandex.com/d/2V3YpeogygoBTA) -o train_data.jsonl

#### Preprocess Data

In [None]:
# Pretrained tokenizers: a cased German one for the source side and an uncased
# English one for the target side.
german_tokenizer = AutoTokenizer.from_pretrained("dbmdz/bert-base-german-cased", clean_up_tokenization_spaces=True) # casing matters in German
english_tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", clean_up_tokenization_spaces=True)

# Register the sentence-boundary markers as special tokens: the target side gets
# both '[BOS]' and '[EOS]', the source side only '[EOS]'.
# NOTE(review): ids of added tokens presumably follow the order in which they are
# added; any code that looks up or hard-codes these ids depends on that order.
english_tokenizer.add_tokens(['[BOS]', '[EOS]'], special_tokens=True)
german_tokenizer.add_tokens(['[EOS]'], special_tokens=True)

In [None]:
def _tokenize_fixed_length(tokenizer, sentences, max_length):
    """Tokenizes `sentences` with `tokenizer`, padding/truncating every row to `max_length`."""
    return tokenizer(
        sentences,
        add_special_tokens=False,  # boundary markers are already part of the text
        max_length=max_length,  # attn masking and padding masking is easier when fixed
        padding='max_length',
        truncation=True,
        return_tensors='pt',
    )


def german_tokenize(text, max_length=100):
    """
    Tokenizes the ``"german"`` column of a batch with the German tokenizer.

    Args:
        text: Mapping (e.g. a batch passed by ``Dataset.map``) whose ``"german"``
            entry holds the sentences to tokenize.
        max_length (int, optional): Fixed length every sequence is padded or
            truncated to. Defaults to 100.

    Returns:
        The tokenizer output for the batch.
    """
    return _tokenize_fixed_length(german_tokenizer, text["german"], max_length)


def english_tokenize(text, max_length=100):
    """
    Tokenizes the ``"english"`` column of a batch with the English tokenizer.

    Args:
        text: Mapping (e.g. a batch passed by ``Dataset.map``) whose ``"english"``
            entry holds the sentences to tokenize.
        max_length (int, optional): Fixed length every sequence is padded or
            truncated to. Defaults to 100.

    Returns:
        The tokenizer output for the batch.
    """
    return _tokenize_fixed_length(english_tokenizer, text["english"], max_length)

In [None]:
def read_corpus(filename):
    """
    Reads a JSON Lines parallel corpus and wraps every sentence in boundary markers.

    Each non-blank line must be a JSON object with a ``'de'`` (German) and an
    ``'en'`` (English) entry. German sentences get a trailing ``' [EOS]'``;
    English sentences get a leading ``'[BOS] '`` and a trailing ``' [EOS]'``.

    Args:
        filename: Path of the UTF-8 encoded ``.jsonl`` file.

    Returns:
        dict: ``{"german": [...], "english": [...]}`` with the sentences in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record lacks the ``'de'`` or ``'en'`` entry.
    """

    data = {"german": [], "english": []}
    with open(filename, 'r', encoding='utf-8') as file:
        # Iterate lazily instead of loading the whole file with readlines().
        for line in file:
            # Blank lines (e.g. a trailing empty line) are not records; skip them
            # rather than letting json.loads fail on them.
            if not line.strip():
                continue
            json_line = json.loads(line)
            data["german"].append(json_line['de'] + ' [EOS]')
            data["english"].append('[BOS] ' + json_line['en'] + ' [EOS]') # shift target right
    return data

# Load both splits into `datasets.Dataset` objects.
train_data = Dataset.from_dict(read_corpus('train_data.jsonl'))
valid_data = Dataset.from_dict(read_corpus('valid_data.jsonl'))

# Every split is tokenized twice, once per language, which yields a separate
# source (German) and target (English) dataset for each split.
src_train_data, trg_train_data = (
    train_data.map(tokenize_batch, batched=True)
    for tokenize_batch in (german_tokenize, english_tokenize)
)
src_valid_data, trg_valid_data = (
    valid_data.map(tokenize_batch, batched=True)
    for tokenize_batch in (german_tokenize, english_tokenize)
)

# The untokenized datasets are not needed any more.
del train_data, valid_data

In [None]:
# Drop the raw text columns and `token_type_ids` from every dataset; only the
# columns still needed downstream remain.
src_train_data, src_valid_data, trg_train_data, trg_valid_data = (
    tokenized.remove_columns(['german', 'english', 'token_type_ids'])
    for tokenized in (src_train_data, src_valid_data, trg_train_data, trg_valid_data)
)

In [None]:
# Have every dataset return its `input_ids` and `attention_mask` columns as
# torch tensors (set_format modifies the dataset in place).
for _dataset in (src_train_data, src_valid_data, trg_train_data, trg_valid_data):
    _dataset.set_format(type="torch", columns=["input_ids", "attention_mask"])

In [None]:
# Source and target sentences live in separate loaders that are consumed in
# lockstep with zip(), so neither loader shuffles: identical ordering keeps the
# sentence pairs aligned. Training uses batches of 128, validation single examples.
src_train_loader = DataLoader(dataset=src_train_data, batch_size=128)
trg_train_loader = DataLoader(dataset=trg_train_data, batch_size=128)
src_valid_loader = DataLoader(dataset=src_valid_data, batch_size=1)
trg_valid_loader = DataLoader(dataset=trg_valid_data, batch_size=1)

### Train Model

In [None]:
# Model hyperparameters; encoder and decoder use the same settings.
# NOTE(review): OUTPUT_DIM is presumably the feedforward hidden width — confirm
# against the Transformer constructor arguments.
EMBED_DIM = OUTPUT_DIM = 200
ENCODER_HEADS = DECODER_HEADS = 4
ENCODER_LAYERS = DECODER_LAYERS = 1

In [None]:
# Transformer.__init__ takes nine arguments (embedding size, two feedforward
# widths, two head counts, two layer counts and the two vocabulary sizes).
# Keyword arguments make sure each value reaches the intended parameter.
# NOTE(review): OUTPUT_DIM is used as the feedforward hidden width of both the
# encoder and the decoder — confirm this is the intended meaning.
model = Transformer(
    embed_dim=EMBED_DIM,
    encoder_hidden_dim=OUTPUT_DIM,
    decoder_hidden_dim=OUTPUT_DIM,
    encoder_heads=ENCODER_HEADS,
    decoder_heads=DECODER_HEADS,
    encoder_layers=ENCODER_LAYERS,
    decoder_layers=DECODER_LAYERS,
    # len(tokenizer) counts the added special tokens as well, so the embedding
    # tables cover '[BOS]' / '[EOS]'.
    src_num_embeddings=len(german_tokenizer),
    trg_num_embeddings=len(english_tokenizer),
)

model = model.to(device)

In [None]:
# Token id 0 is the padding id in this notebook (the embeddings use padding_idx=0
# and the training labels are right-padded with zeros). Ignoring it keeps the mean
# loss from being dominated by padding positions of the fixed-length sequences.
# NOTE(review): assumes the English tokenizer's [PAD] id is 0 — confirm.
criterion = nn.CrossEntropyLoss(ignore_index=0, reduction="mean")
optimizer = torch.optim.Adam(model.parameters(), lr=0.003)

In [None]:
epochs = 50
N = len(src_train_loader) # number of batches
losses = []  # mean training loss of every epoch
for epoch in range(epochs):
    model.train()
    epoch_loss = 0
    # Both loaders iterate in the same order, so zip() yields aligned sentence pairs.
    for src_batch, trg_batch in zip(src_train_loader, trg_train_loader):
        # The loaders yield [batch, seq_len]; the model expects [seq_len, batch].
        src_x = src_batch['input_ids'].T.to(device)
        trg_x = trg_batch['input_ids'].T.to(device)

        # Labels are the target ids shifted left by one ([BOS] removed) and
        # right-padded with a column of zeros so the length matches the decoder input.
        labels = trg_batch['input_ids'][:, 1:]
        zero_column = torch.zeros((labels.shape[0], 1), dtype=torch.long)
        labels = torch.cat((labels, zero_column), dim=1).type(torch.LongTensor).to(device)

        # True marks padded positions that attention must ignore.
        src_key_padding_mask = (src_batch['attention_mask'] == 0).to(device)
        trg_key_padding_mask = (trg_batch['attention_mask'] == 0).to(device)
        # Causal mask: a target position cannot attend to later positions.
        trg_attn_mask = generate_square_subsequent_mask(trg_x.shape[0]).to(device)

        optimizer.zero_grad()

        # [seq_len, batch, vocab] -> [batch, vocab, seq_len], the layout the loss expects.
        output = model(
            src_x, trg_x, src_key_padding_mask, trg_key_padding_mask, trg_attn_mask
        ).permute(1, 2, 0)

        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    losses.append(epoch_loss/N)
    print(f"Epoch: {epoch + 1}, Loss: {epoch_loss/N}")

In [None]:
# Plot the mean training loss of every epoch on an explicit figure/axes pair.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(list(range(1, epochs + 1)), losses, marker='o', color='b', linestyle='-', markersize=5, label='Loss')

ax.set_title('Loss Over Epochs', fontsize=16)
ax.set_xlabel('Epoch', fontsize=14)
ax.set_ylabel('Loss', fontsize=14)
ax.grid(True)
ax.legend()

plt.show()

In [None]:
# Persist the model's state dict (not the module object) to 'model.pth'.
torch.save(obj=model.state_dict(), f='model.pth')

### Evaluate Model

In [None]:
# Hyperparameters must match the ones the checkpoint was trained with.
EMBED_DIM = 200
OUTPUT_DIM = 200
ENCODER_HEADS = 4
DECODER_HEADS = 4
ENCODER_LAYERS = 1
DECODER_LAYERS = 1

# Transformer.__init__ takes nine arguments; keyword arguments make sure each
# value reaches the intended parameter.
# NOTE(review): OUTPUT_DIM is used as the feedforward hidden width of both the
# encoder and the decoder — confirm this is the intended meaning.
model = Transformer(
    embed_dim=EMBED_DIM,
    encoder_hidden_dim=OUTPUT_DIM,
    decoder_hidden_dim=OUTPUT_DIM,
    encoder_heads=ENCODER_HEADS,
    decoder_heads=DECODER_HEADS,
    encoder_layers=ENCODER_LAYERS,
    decoder_layers=DECODER_LAYERS,
    # len(tokenizer) counts the added special tokens as well.
    src_num_embeddings=len(german_tokenizer),
    trg_num_embeddings=len(english_tokenizer),
)
# map_location lets a checkpoint written on a GPU machine load on a CPU-only one.
model.load_state_dict(torch.load('model.pth', map_location=device))
model = model.to(device)

In [None]:
# Look the end-of-sentence id up instead of hard-coding it, so it stays correct
# if the tokenizer vocabulary changes.
EOS = english_tokenizer.convert_tokens_to_ids('[EOS]')
trans_samples = []  # random sample of (candidate, reference) pairs for inspection
rouge = Rouge(variants=["L", 3, 2], multiref="best")

model.eval()
# Inference only: no gradients are needed, which saves memory and time.
with torch.no_grad():
    for src_batch, trg_batch in tqdm(zip(src_valid_loader, trg_valid_loader), total=len(src_valid_loader)):
        src_x = src_batch['input_ids'].T.to(device)
        src_key_padding_mask = (src_batch['attention_mask'] == 0).to(device)
        # Decoding starts from the first target token only ([BOS]); shape [1, batch].
        trg_x = trg_batch['input_ids'][:, 0].unsqueeze(0).to(device)

        # First pass runs the encoder once and keeps its output for all later steps.
        output, encoder_output = model(
            src_x, trg_x, src_key_padding_mask,
            trg_key_padding_mask=None, return_encoder_output=True
        )
        next_ids = output.argmax(-1)[-1:]  # greedy choice at the last position

        # Greedy decoding, bounded by the padded target length.
        # The loop relies on the validation loaders using batch_size=1.
        max_steps = trg_batch['attention_mask'].shape[1]
        for _ in range(max_steps):
            trg_x = torch.cat((trg_x, next_ids))

            output = model(
                encoder_output, trg_x, src_key_padding_mask,
                trg_key_padding_mask=None,
                # Same causal masking as during training.
                trg_attn_mask=generate_square_subsequent_mask(trg_x.shape[0]).to(device),
                decoder_only=True,
            )
            next_ids = output.argmax(-1)[-1:]

            if next_ids.item() == EOS:
                break

        # Everything generated after [BOS]. This is set even when the model never
        # emits [EOS], so a stale or undefined candidate can never be scored.
        candidate = trg_x.T.squeeze(0)[1:].tolist()

        # Reference: target ids after [BOS], cut at the first [EOS]. A truncated
        # sentence may contain no [EOS]; it is then used in full.
        reference = trg_batch['input_ids'][:, 1:].squeeze()
        eos_positions = torch.where(reference == EOS)[0]
        reference_length = eos_positions[0].item() if eos_positions.numel() > 0 else reference.numel()
        reference = reference[:reference_length].tolist()

        # ROUGE is computed over string tokens; the token ids serve as the tokens.
        reference = [[str(token_id) for token_id in reference]]
        candidate = [str(token_id) for token_id in candidate]

        rouge.update(([candidate], [reference]))

        # Keep roughly one in 65 translations for manual inspection.
        if random.randint(0, 64) == 4:
            trans_samples.append((candidate, reference))

rouge.compute()