In [None]:
# Colab-only setup: mount Google Drive so files stored there are reachable from the notebook.
# Comment the following lines if you're not in colab:
from google.colab import drive
drive.mount('/content/drive')
# If you're in colab, cd to your own working directory here
# (adjust the path below so that it matches your own Drive layout):
%cd ..//..//content//drive//MyDrive//Colab-Notebooks//HY-673-Tutorials//Tutorial-6

# <u>Building a Transformer with PyTorch</u>

Based on:
https://www.datacamp.com/tutorial/building-a-transformer-with-py-torch <br>
Here, it is implemented with an extra layer normalization after the encoder and decoder to match PyTorch's implementation.

In [None]:
import torch as tc
import torch.nn as nn
from tqdm import tqdm
from time import perf_counter
from torchsummary import summary

# Horizontal rule (64 dashes) available for pretty-printing.
bar = 64 * '-'

# Fix the PyTorch RNG seed so that runs are repeatable.
seed = 42
tc.manual_seed(seed)

# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
if tc.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"using {device}")

using cuda


### <u>Multi-Head Attention Layer</u>

The Multi-Head Attention mechanism computes the self-attention between each pair of positions in a sequence. It consists of multiple attention heads that capture different aspects of the input sequence. Revisiting our theory document for this tutorial, we have:

\begin{equation}
  \text{MultiHead} = \text{Concatenate}\left(O^{(1)}, O^{(2)}, \dots, O^{(h)} \right)\cdot W_O,
\end{equation}

\begin{equation}
  O^{(i)} = S^{(i)} \cdot V^{(i)} = \text{softmax}\left( \frac{Q^{(i)} \cdot \left(K^{(i)}\right)^\intercal}{\sqrt{d_k}} \right) \cdot V^{(i)},
\end{equation}
<br>
\begin{equation}
  Q^{(i)} = X \cdot W_Q^{(i)}, \ K^{(i)} = X \cdot W_K^{(i)}, \ V^{(i)} = X \cdot W_V^{(i)}.
\end{equation}

#### <u>Splitting and Combining for Multi-Head Attention</u>
 In the original Transformer model, the dimension $d_\text{model}$ is divided among several heads, say $h$ heads. Thus, for each head, the dimensionality of $d_k$ and $d_v$ becomes $d_\text{model}/h$. The code below simplifies the implementation by initially projecting $X$ into a space of dimension $d_\text{model}$ for $Q,K,V$, with the understanding that this space will be subsequently divided among the heads. This is why the fully-connected layers map from $d_\text{model}$ to $d_\text{model}$, and the actual splitting into multiple heads (and thus into $d_k$ and $d_v$ dimensions for each head) occurs later in the code.
Also, in the Transformer model, each attention head uses a different set of weights to project the input $X$ into $Q,K,V$. In the code below, the weights for each head are not explicitly separated in the definition of the fully-connected layers. Instead, the layers are defined to transform the entire input dimension $d_\text{model}$. It is only after we apply these linear transformations that we split the resulting matrices into multiple heads with the `split_heads()` method. Through this reshaping, the model implicitly uses a different slice of the transformed input for each head, exactly as if each head had its own set of weights.

We can implement the above operations in PyTorch like so:

In [None]:
class MultiHeadAttention(nn.Module):
    """Splits the input into multiple attention heads, applies attention to each head, and then combines the results.

    Args:
        d_model: Model (embedding) dimension; feature size of the inputs and of the output.
        num_heads: Number of attention heads; must divide ``d_model`` evenly.

    Raises:
        AssertionError: If ``d_model`` is not divisible by ``num_heads``.
    """
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        # Initialize dimensions:
        self.d_model = d_model      # Model's dimension
        self.num_heads = num_heads  # Number of attention heads
        # Dimension of each head's key, query, and value. Stored as a plain int
        # rather than a tensor created on a module-level `device`: it is only
        # used as a view() size and as a scalar scale factor, so the layer does
        # not depend on any global name and works on whatever device the
        # module's parameters and inputs live on.
        self.d_k = d_model // num_heads

        # Linear layers for transforming inputs:
        self.W_Q = nn.Linear(d_model, d_model) # Query  transformation
        self.W_K = nn.Linear(d_model, d_model) # Key    transformation
        self.W_V = nn.Linear(d_model, d_model) # Value  transformation
        self.W_O = nn.Linear(d_model, d_model) # Output transformation

    def scaled_dot_product_attention(self, q, k, v, mask=None):
        """Computes softmax(Q*K^T/sqrt(d_k))*V over the last two dimensions.

        Args:
            q, k, v: Per-head tensors as produced by ``split_heads``, i.e.
                (batch_size, num_heads, seq_length, d_k).
            mask: Optional tensor broadcastable to the attention scores;
                positions where ``mask == 0`` are excluded from attention.

        Returns:
            Tensor with the same leading dimensions as ``q`` and the last
            dimension of ``v``.
        """
        # Calculate attention scores Q*K^T/sqrt(d_k):
        attn_scores = tc.matmul(q, k.transpose(-2, -1)) / (self.d_k ** 0.5)

        # Apply mask if provided (useful for preventing attention to certain parts like padding).
        # Masked scores are set to a large negative value so that their softmax weight is ~0.
        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, value=-1e9)

        # Softmax is applied row-wise to obtain the attention matrix S:
        s = tc.softmax(attn_scores, dim=-1)

        # Multiply S by the values to obtain the output:
        return tc.matmul(s, v)

    def split_heads(self, x):
        """Reshapes the input x from
        (batch_size, seq_length, d_model) --> (batch_size, num_heads, seq_length, d_k).
        It enables the model to process multiple attention heads concurrently, allowing for parallel computation."""
        batch_size_, seq_length, _ = x.size()
        return x.view(batch_size_, seq_length, self.num_heads, self.d_k).transpose(1, 2)

    def combine_heads(self, x):
        """Reshapes the input x from
        (batch_size, num_heads, seq_length, d_k) --> (batch_size, seq_length, d_model).
        This prepares the result for further processing."""
        # Combine the multiple heads back to original shape
        # (transpose makes the tensor non-contiguous, hence contiguous() before view()):
        batch_size_, _, seq_length, _ = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size_, seq_length, self.d_model)

    def forward(self, q, k, v, mask=None):
        """Applies multi-head attention.

        Args:
            q, k, v: Tensors of shape (batch_size, seq_length, d_model); the
                sequence length of ``q`` may differ from that of ``k``/``v``.
            mask: Optional mask forwarded to ``scaled_dot_product_attention``.

        Returns:
            Tensor of shape (batch_size, seq_length of q, d_model).
        """
        # Apply linear transformations and split heads:
        q = self.split_heads(self.W_Q(q))
        k = self.split_heads(self.W_K(k))
        v = self.split_heads(self.W_V(v))
        # Perform scaled dot-product attention:
        o = self.scaled_dot_product_attention(q, k, v, mask)
        # Combine heads and apply output transformation:
        return self.W_O(self.combine_heads(o))

### <u>Position-Wise Feed-Forward Network (FFN)</u>
Next, let us define the Feed Forward Network (FFN) block. It is a position-wise feed-forward neural network that consists of two linear layers with a ReLU activation function in between:

\begin{equation}
  \text{FFN}(X) = \text{ReLU}\left(X \cdot W_1 + b_1\right)\cdot W_2 + b_2.
\end{equation}

In the context of transformer models, this feed-forward network is applied to each position separately and identically. It helps in transforming the features learned by the attention mechanisms within the transformer, acting as an additional processing step for the attention outputs:

In [None]:
class PositionWiseFeedForward(nn.Module):
    """Feed Forward Network (FFN block): Linear -> ReLU -> Linear.

    Args:
        d_model: Feature size of the input and of the output.
        d_ff: Feature size of the hidden layer.
    """
    def __init__(self, d_model, d_ff):
        super().__init__()
        # Expansion d_model -> d_ff, followed by projection d_ff -> d_model:
        self.fc1 = nn.Linear(in_features=d_model, out_features=d_ff)
        self.fc2 = nn.Linear(in_features=d_ff, out_features=d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Applies the FFN along the last dimension of ``x``; the other dimensions are untouched."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        return self.fc2(activated)

### <u>Positional Encoding Layer</u>

Same as in the previous notebook:

In [None]:
# ln(10^4) = 4*ln(10): log of the base (10000) used for the sinusoid frequencies.
loge4 = 4*tc.log(tc.tensor(10))

class PositionalEncoding(nn.Module):
    """Using sine and cosine functions of different frequencies to generate the positional encoding.

    Even feature indices hold sin(pos * w_i) and odd feature indices hold
    cos(pos * w_i), with w_i = 10000^(-2i / d_model). The table is built once
    and stored as the (non-trainable) buffer ``pe`` of shape
    (1, max_seq_length, d_model).

    Args:
        d_model: Feature size of the embeddings (even or odd).
        max_seq_length: Number of positions to precompute.
    """
    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()
        position = tc.arange(0, max_seq_length, dtype=tc.float).unsqueeze(1)
        div_term = tc.exp(tc.arange(0, d_model, 2).float() * -(loge4 / d_model))
        pe = tc.zeros(max_seq_length, d_model)
        pe[:, 0::2] = tc.sin(position * div_term)
        # For odd d_model there is one fewer odd column than even columns, so
        # truncate the frequencies to match (this is a no-op for even d_model).
        pe[:, 1::2] = tc.cos(position * div_term[: d_model // 2])
        # Registered as a buffer: saved with the module and moved by .to(...), but not trained.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Adds the encoding of the first ``x.size(1)`` positions to ``x``.

        ``x`` is indexed as (batch, seq_length, d_model); the leading
        dimension of ``pe`` (size 1) broadcasts over the batch.
        """
        return x + self.pe[:, :x.size(1)]

### <u>Encoder Layer</u>

![](fig/encoder.png)
![](https://drive.google.com/uc?export=view&id=1YzYKw4JoiaCe5ECDrNxsfjDQOIkOT48a)

The EncoderLayer class defines a single layer of the transformer's encoder. It encapsulates a multi-head self-attention mechanism followed by position-wise feed-forward neural network, with residual connections, layer normalization, and dropout applied as appropriate. These components together allow the encoder to capture complex relationships in the input data and transform them into a useful representation for downstream tasks. Typically, multiple such encoder layers are stacked to form the complete encoder part of a transformer model:

In [None]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then FFN, each wrapped in dropout + residual + LayerNorm.

    Args:
        d_model: Feature size of the layer's input and output.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward network.
        dropout: Dropout probability applied to each sub-layer's output.
    """
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        # Sub-layers:
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        # One normalization per sub-layer (applied after the residual sum):
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Runs the block on ``x``; ``mask`` is passed to the self-attention."""
        # Sub-layer 1: self-attention (queries, keys and values all come from x).
        attended = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attended))
        # Sub-layer 2: position-wise feed-forward network.
        transformed = self.feed_forward(x)
        x = self.norm2(x + self.dropout(transformed))
        return x

### <u>Decoder Layer</u>

![](fig/decoder.png)
![](https://drive.google.com/uc?export=view&id=1blrXGkNx_uZnXSNk34_inlyActhVGt2i)

The DecoderLayer class defines a single layer of the transformer's decoder. It consists of a multi-head self-attention mechanism, a multi-head cross-attention mechanism (that attends to the encoder's output), a position-wise feed-forward neural network, and the corresponding residual connections, layer normalization, and dropout layers. This combination enables the decoder to generate meaningful outputs based on the encoder's representations, taking into account both the target sequence and the source sequence. As with the encoder, multiple decoder layers are typically stacked to form the complete decoder part of a transformer model:

In [None]:
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, cross-attention over the encoder output, then FFN.

    Each of the three sub-layers is followed by dropout, a residual
    connection and its own LayerNorm.

    Args:
        d_model: Feature size of the layer's input and output.
        num_heads: Number of attention heads (used by both attention modules).
        d_ff: Hidden size of the feed-forward network.
        dropout: Dropout probability applied to each sub-layer's output.
    """
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        # Sub-layers:
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        # One normalization per sub-layer (applied after the residual sum):
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        """Runs the block on ``x``.

        ``tgt_mask`` is passed to the self-attention; ``src_mask`` is passed
        to the cross-attention, whose keys and values are ``enc_output``.
        """
        # Sub-layer 1: self-attention over the decoder input.
        self_attended = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(self_attended))
        # Sub-layer 2: cross-attention; queries from the decoder, keys/values from the encoder.
        cross_attended = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(cross_attended))
        # Sub-layer 3: position-wise feed-forward network.
        transformed = self.feed_forward(x)
        x = self.norm3(x + self.dropout(transformed))
        return x

### <u>Transformer</u>

![](fig/transformer.png)
![](https://drive.google.com/uc?export=view&id=1L-ZrIzWPkF17fnBtS0FJSCTMz5s53Tyy)

Next, the Encoder and Decoder blocks are brought together to construct the comprehensive Transformer model. The Transformer class brings together the various components of a Transformer model, including the embeddings, positional encoding, encoder layers, and decoder layers. It provides a convenient interface for training and inference, encapsulating the complexities of multi-head attention, feed-forward networks, and layer normalization.

### <u> Masking </u>

The source mask (`src_mask`) in the code below is generated just for completeness; it has no effect on the subsequent operations here, because the randomly generated data used in this notebook contains no padding tokens (index `0`), so every source position remains unmasked. An encoder mask would be necessary in several other scenarios, such as variable-length input sequences with padding, avoiding specific tokens, sequence-to-sequence tasks with source filtering, noise reduction, or implementing certain types of attention (e.g., sparse). The target mask (`tgt_mask`), as explained in the previous notebook, is a boolean mask with `True` for unmasked and `False` for masked values.

+ `tgt_mask` identifies which positions are real data (`True`) and which are padding (`False`).
+ `no_peak_mask` prevents positions from seeing ("peeking" at) future positions in the sequence (`True` means allow, `False` means block).
+ The `&` combines these two masks (bitwise AND). A position must be real data and not in the future to be allowed (`True` in both masks becomes `True`; any `False` in either mask becomes `False` in the combined mask).

In [None]:
def generate_mask(src, tgt):
    """Builds the boolean attention masks (True = may attend, False = masked).

    Token id 0 is treated as padding.

    Args:
        src: Integer tensor of source token ids, shape (batch_size, src_len).
        tgt: Integer tensor of target token ids, shape (batch_size, tgt_len).

    Returns:
        Tuple ``(src_mask, tgt_mask)``:
        ``src_mask`` has shape (batch_size, 1, 1, src_len) and is False at
        padded source positions; ``tgt_mask`` has shape
        (batch_size, 1, tgt_len, tgt_len) and is True only where the query
        position is not padding and the key position is not in the future.
    """
    # y.unsqueeze(p): add an empty dimension to y at position p
    src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
    tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
    seq_length = tgt.size(1)
    # Lower-triangular (causal) mask, created directly on the same device as
    # `tgt` so the function does not rely on a module-level `device` name.
    no_peak_mask = tc.tril(tc.ones(1, seq_length, seq_length, device=tgt.device)).bool()
    # Broadcasts (batch, 1, tgt_len, 1) & (1, tgt_len, tgt_len) -> (batch, 1, tgt_len, tgt_len):
    tgt_mask = tgt_mask & no_peak_mask
    return src_mask, tgt_mask

class Transformer(nn.Module):
    """Encoder-decoder Transformer producing per-position scores over the target vocabulary.

    Args:
        src_voc_size: Number of entries in the source embedding table.
        tgt_voc_size: Number of entries in the target embedding table; also
            the size of the output layer.
        d_model: Feature size used throughout the model.
        num_heads: Number of attention heads in every attention module.
        num_layers: Number of encoder layers and of decoder layers.
        d_ff: Hidden size of the feed-forward networks.
        max_seq_length: Number of positions precomputed by the positional encoding.
        dropout: Dropout probability.
    """
    def __init__(self, src_voc_size, tgt_voc_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super().__init__()
        layer_args = (d_model, num_heads, d_ff, dropout)

        # Token embeddings and the (shared) positional encoding:
        self.encoder_embedding = nn.Embedding(src_voc_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_voc_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        # Final layer normalization applied to the output of each stack:
        self.enc_output_norm = nn.LayerNorm(d_model)
        self.dec_output_norm = nn.LayerNorm(d_model)

        # Encoder and decoder stacks:
        self.encoder_layers = nn.ModuleList([EncoderLayer(*layer_args) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(*layer_args) for _ in range(num_layers)])

        # Projection from model features to target-vocabulary scores:
        self.W_O = nn.Linear(d_model, tgt_voc_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, tgt):
        """Maps source and target token ids to target-vocabulary scores.

        Args:
            src: Integer tensor of source token ids, indexed (batch, src_len).
            tgt: Integer tensor of target token ids, indexed (batch, tgt_len).

        Returns:
            Tensor whose last dimension has size ``tgt_voc_size``, one score
            vector per target position.
        """
        # Padding mask for the source; padding + causal mask for the target:
        src_mask, tgt_mask = generate_mask(src, tgt)

        # Embed, add positional information, apply dropout (source first, then target):
        memory = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        hidden = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        # Encoder stack followed by its final normalization:
        for layer in self.encoder_layers:
            memory = layer(memory, src_mask)
        memory = self.enc_output_norm(memory)

        # Decoder stack (attending to the encoder output) followed by its final normalization:
        for layer in self.decoder_layers:
            hidden = layer(hidden, memory, src_mask, tgt_mask)
        hidden = self.dec_output_norm(hidden)

        logits = self.W_O(hidden)
        return logits

Let's create an instance of our transformer model and print its architecture and trainable parameters:

In [None]:
# Hyperparameters:
src_vocab_size = 5000  # Size of the source embedding table
tgt_vocab_size = 5000  # Size of the target embedding table and of the output layer
dmodel = 256           # Model (embedding) dimension
max_seq_len = 512      # Number of positions precomputed by the positional encoding
nheads = 4             # Attention heads per attention module
nlayers = 2            # Number of encoder layers and of decoder layers
dff = 512              # Hidden size of the feed-forward networks
drop = 0.1             # Dropout probability

# Model instance (moved to the device selected at the top of the notebook):
transformer = Transformer(
    src_voc_size=src_vocab_size,
    tgt_voc_size=tgt_vocab_size,
    d_model=dmodel,
    num_heads=nheads,
    num_layers=nlayers,
    d_ff=dff,
    max_seq_length=max_seq_len,
    dropout=drop
).to(device)

# Count only the parameters that receive gradients:
total_params = sum(p.numel() for p in transformer.parameters() if p.requires_grad)
print(f"Total trainable parameters = {total_params:,}\n(same as before - this is a good sign!)")

# Print a per-layer parameter summary.
# Does not work in colab:
_ = summary(model=transformer)

Total trainable parameters = 6,481,800
(same as before - this is a good sign!)
Layer (type:depth-idx)                        Param #
├─Embedding: 1-1                              1,280,000
├─Embedding: 1-2                              1,280,000
├─PositionalEncoding: 1-3                     --
├─LayerNorm: 1-4                              512
├─LayerNorm: 1-5                              512
├─ModuleList: 1-6                             --
|    └─EncoderLayer: 2-1                      --
|    |    └─MultiHeadAttention: 3-1           263,168
|    |    └─PositionWiseFeedForward: 3-2      262,912
|    |    └─LayerNorm: 3-3                    512
|    |    └─LayerNorm: 3-4                    512
|    |    └─Dropout: 3-5                      --
|    └─EncoderLayer: 2-2                      --
|    |    └─MultiHeadAttention: 3-6           263,168
|    |    └─PositionWiseFeedForward: 3-7      262,912
|    |    └─LayerNorm: 3-8                    512
|    |    └─LayerNorm: 3-9                  

## <u>Training our Custom Transformer Model</u>

As in previous tutorials, we can use the cross-entropy loss to train our transformer and Adam as the optimizer:

In [None]:
# Training hyperparameters:
lr_ = 1e-4            # Adam learning rate
betas_ = (0.9, 0.98)  # Adam beta coefficients
eps_ = 1e-9           # Adam epsilon
num_epochs = 30       # Number of iterations of the training loop
batch_size = 32       # Number of sequences in the synthetic batch

# Cross-entropy loss that skips targets equal to 0 (the index treated as padding
# by generate_mask), and the Adam optimizer over all model parameters:
loss_fn = nn.CrossEntropyLoss(ignore_index=0)
optimizer = tc.optim.Adam(params=transformer.parameters(), lr=lr_, betas=betas_, eps=eps_)

In [None]:
# Synthetic training batch of random token ids with shape (batch_size, max_seq_len).
# Ids are drawn from [1, vocab_size), so index 0 (treated as padding) never occurs.
src_data = tc.randint(1, src_vocab_size, (batch_size, max_seq_len), device=device)
tgt_data = tc.randint(1, tgt_vocab_size, (batch_size, max_seq_len), device=device)

In [None]:
tic = perf_counter()
# Put the model in training mode (enables dropout):
transformer.train()
pbar = tqdm(range(num_epochs), desc='Epochs')
# Each "epoch" is one optimization step on the same single synthetic batch.
for epoch in pbar:
    optimizer.zero_grad()
    # The decoder input is the target without its last token ...
    output = transformer(src_data, tgt_data[:, :-1])
    # ... and the labels are the target shifted left by one (next-token prediction).
    # Flatten to (batch*positions, vocab) and (batch*positions,) for the loss:
    output_aligned = output.contiguous().view(-1, tgt_vocab_size)
    target_aligned = tgt_data[:, 1:].contiguous().view(-1)
    loss = loss_fn(output_aligned, target_aligned)
    loss.backward()
    optimizer.step()
    pbar.set_description(f"Epoch: {epoch+1}/{num_epochs}, Loss: {loss.item():.4f}")
toc = perf_counter()
print(f"Time elapsed: {toc-tic:.4f} seconds.")

Epoch: 30/30, Loss: 8.3157: 100%|██████████| 30/30 [00:08<00:00,  3.45it/s]

Time elapsed: 8.7087 seconds.





After or during training the model, its performance can be evaluated on a validation dataset. The following is an example of how this could be done. Likewise, `transformer.eval()` puts the transformer model in evaluation mode, which is important in order to turn off certain behaviors like dropout that are only used during training.

In [None]:
# Synthetic validation batch, drawn the same way as the training batch
# (random ids in [1, vocab_size), so no padding index 0):
val_src_data = tc.randint(1, src_vocab_size, (batch_size, max_seq_len), device=device)
val_tgt_data = tc.randint(1, tgt_vocab_size, (batch_size, max_seq_len), device=device)

# Evaluation mode (disables dropout) and no gradient tracking:
transformer.eval()
with tc.no_grad():
    # Same input/label shift as in training: feed tokens [:-1], predict tokens [1:].
    output = transformer(val_src_data, val_tgt_data[:, :-1])
    output_aligned = output.contiguous().view(-1, tgt_vocab_size)
    target_aligned = val_tgt_data[:, 1:].contiguous().view(-1)
    val_loss = loss_fn(output_aligned, target_aligned)
    print(f"Validation Loss: {val_loss.item():.4f}")

Validation Loss: 8.6754


### <u> Conclusion </u>

In this tutorial, we demonstrated how to construct a Transformer model from scratch using PyTorch, one of the most versatile tools for deep learning. For additional beginner-level sources on the Transformer, see: <br>
https://www.datacamp.com/tutorial/an-introduction-to-using-transformers-and-hugging-face

