In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# 1. Implement Self Attention Mechanism

$$ \text{SoftMax} \left( \frac{\frac{Q}{T}K^T}{\sqrt{d_K}}\right) V $$

In [3]:
class ScaledDotProductAttention(nn.Module):
    """ Scaled Dot Product Attention """
    def __init__(self, temperature, attn_dropout = 0.1):
        super(ScaledDotProductAttention).__init__()
        self.temperature = temperature
        self.dropout = nn.Dropout(attn_dropout)

    def forward(self, q, k, v, mask = None):
        attn = torch.matmul(q/self.temperature, k.transpose(2, 3))

        if mask is not None:
            attn = attn.masked_fill(mask == 0, -1e9)

        attn = self.dropout(F.softmax(attn, dim = -1))
        output = torch.matmul(attn, v)

        return output, attn

# 2. Implement Multi-Head Attention

Run several scaled dot product attention, each mapping to a different feature map, meaning each has its own Q, K, and V matrices

In [9]:
class MultiHeadaAttention(nn.Module):
    """ Multi-Head Attention """
    def __init__(self, n_head, d_model, d_k, d_v, dropout = 0.1):
        super(MultiHeadaAttention).__init__()

        self.n_head = n_head
        self.d_k = d_k
        self.d_v = d_v
        
        # Linear Transformations: y = x @ A^T + b
        # Where the values inside A are trainable

        # d_model - dictionary size
        # n_head - number of attention heads
        # d_k - feaure map dimension

        self.w_qs = nn.Linear(d_model, n_head * d_k, bias = False) 
        self.w_ks = nn.Linear(d_model, n_head * d_k, bias = False)
        self.w_vs = nn.Linear(d_model, n_head * d_v, bias = False)
        self.fc = nn.Linear(n_head * d_v, d_model, bias = False)

        self.attention = ScaledDotProductAttention(temperature = d_k ** 0.5)

        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(d_model, eps = 1e-6)

    def forward(self, q, k, v, mask = None):

        d_k, d_v, n_head = self.d_k, self.d_v, self.n_head
        sz_b, len_q, len_k, len_v = q.size(0), q.size(1), k.size(1), v.size(1)

        residual = q

        # Multiply q by repective weights and reshape
        q = self.w_qs(q).view(sz_b, len_q, n_head, d_k) # Splits the matrix into the various attention heads
        k = self.w_ks(k).view(sz_b, len_k, n_head, d_k)
        v = self.w_vs(v).view(sz_b, len_v, n_head, d_v)

        # Transpose values to calculate self attention: b x n x lq x dv
        q, k, v = q.transpose(1, 2), k.transpose(1, 2), v.transpose(1, 2)

        if mask is not None:
            mask = mask.unsqueeze(1) # Head axis broadcasting

        q, attn = self.attention(q, k, v, mask = mask)

        # Transpose to move the head dimension back: b x lq x n x dv
        # Combine last two dimensions to concatenate all the heads together: b x lq x (n*dv)

        q = q.transpose(1, 2).contiguous().view(sz_b, len_q, -1)
        q = self.dropout(self.fc(q))
        q += residual

        q = self.layer_norm(q)

        return q, attn

## 2.1 Multilayer Perceptron after the multi-head attention

In [8]:
class PositionwiseFeedForward(nn.Module):
    """ Two-feed-forward-layer module """

    def __init__(self, d_in, d_hid, dropout = 0.1):
        super(PositionwiseFeedForward).__init__()
        self.w_1 = nn.Linear(d_in, d_hid) # position-wise
        self.w_2 = nn.Linear(d_hid, d_in) # position-wise
        self.layer_norm = nn.LayerNorm(d_in, eps = 1e-6)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):

        residual = x

        x = self.w_2(F.relu(self.w_1(x)))
        x = self.dropout(x)
        x += residual

        x = self.layer_norm(x)

        return x

# 3. Encoder/Decoder Implementations

## 3.1 Encoder

In [11]:
class EncoderLayer(nn.Module):
    """ Composed of Two Layers """
    def __init__(self, d_model, d_inner, n_head, d_k, d_v, dropout = 0.1):
        super(EncoderLayer).__init__()
        self.slf_attn = MultiHeadaAttention(n_head, d_model, d_k, d_v, dropout)
        self.pos_ffn = PositionwiseFeedForward(d_model, d_inner, dropout)

    def forward(self, enc_input, slf_attn_mask = None):
        enc_output, enc_slf_attn = self.slf_attn(enc_input, enc_input, mask = slf_attn_mask)
        enc_output = self.pos_ffn(enc_output)

        return enc_output, enc_slf_attn

## 3.2 Decoder

In [12]:
class DecoderLayer(nn.Module):
    """ Composed of Three Layers """
    def __init__(self, d_model, d_inner, n_head, d_k, d_v, dropout = 0.1):
        super(DecoderLayer).__init__()
        self.slf_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout)
        self.enc_layer = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout)
        self.pos_ffn = PositionwiseFeedForward(d_model, d_inner, dropout)