## Transformer

In [2]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

<img src="../images/self-attention1.png" alt="self-attention overview" width="1000">

### T1. Implement Self-Attention for a Single Head

First, prepare the input data in shape $N\times d$

*Hint*: 
- Use `torch.randn` to generate a torch tensor in the correct shape.

In [3]:
# Problem size: N tokens, each represented by a d-dimensional vector.
N, d = 3, 512
torch.manual_seed(0)  # fixed seed so the expected values below are reproducible

### START YOUR CODE ###
X = torch.randn(size=(N, d))
### END YOUR CODE ###


# Test
assert isinstance(X, torch.Tensor)
print(f'X.size(): {X.size()}')
print(f'X[:,0]: {X[:, 0].detach().numpy()}')

# You should expect to see the following results:
# X.size(): torch.Size([3, 512])
# X[:,0]: [-1.1258398  -0.54607284 -1.0840825 ]

X.size(): torch.Size([3, 512])
X[:,0]: [-1.1258398  -0.54607284 -1.0840825 ]


<img src="../images/self-attention.png" alt="self-attention computation" width="1000">

Then, initialize weight matrices $W^Q$, $W^K$, and $W^V$. We assume they are for a single head, so $d_k=d_v=d$

Using $W^Q$ as an example
- First initialize an empty tensor `W_q` in the dimension of $d\times d_k$, using the `torch.empty()` function. Then initialize it with `nn.init.xavier_normal_()`.
- After `W_q` is initialized, obtain the query matrix `Q` with a multiplication between `X` and `W_q`, using `torch.matmul()`.

In [4]:
torch.manual_seed(0) # Do not remove this line

n_heads = 1

### START YOUR CODE ###
d_k = d // n_heads # Compute d_k

# Allocate W^Q, W^K, W^V (each d x d_k) and Xavier-initialise them.
# The q -> k -> v order matters: it fixes how the seeded RNG stream is consumed.
W_q, W_k, W_v = (nn.init.xavier_normal_(torch.empty(d, d_k)) for _ in range(3))

# Project the input into query / key / value space
Q = X @ W_q
K = X @ W_k
V = X @ W_v
### END YOUR CODE ###


# Test
assert all(projection.size() == (N, d_k) for projection in (Q, K, V))

print(f'Q.size(): {Q.size()}')
print(f'Q[:,0]: {Q[:, 0].detach().numpy()}')
print(f'K.size(): {K.size()}')
print(f'K[:,0]: {K[:, 0].detach().numpy()}')
print(f'V.size(): {V.size()}')
print(f'V[:,0]: {V[:, 0].detach().numpy()}')

# You should expect to see the following results:
# Q.size(): torch.Size([3, 512])
# Q[:,0]: [-0.45352045 -0.40904033  0.18985942]
# K.size(): torch.Size([3, 512])
# K[:,0]: [ 1.509987   -0.5503683   0.44788954]
# V.size(): torch.Size([3, 512])
# V[:,0]: [ 0.43034226  0.00162293 -0.1317436 ]

Q.size(): torch.Size([3, 512])
Q[:,0]: [-0.45352072 -0.40904027  0.18985987]
K.size(): torch.Size([3, 512])
K[:,0]: [ 1.5099866  -0.550368    0.44788927]
V.size(): torch.Size([3, 512])
V[:,0]: [ 0.4303431   0.00162256 -0.13174425]


Lastly, compute the attention scores $\alpha$ and the weighted output

Following the equation:
$$
\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^\top}{\sqrt{d_k}}\right)V
$$

*Hint*:
- $\alpha = \text{softmax}(\frac{QK^\top}{\sqrt{d_k}})$, where you can use `torch.nn.functional.softmax()` to compute the softmax. Pay attention to the `dim` parameter.
- The weighted output is the multiplication between $\alpha$ and $V$. Pay attention to their dimensions: $\alpha$ is of shape $N\times N$, and $\alpha_{ij}$ is the attention score from the $i$-th to the $j$-th word. 
- The weighted output is of shape $N\times d_v$, and here we assume $d_k=d_v$.

In [5]:
### START YOUR CODE ###
# Scaled dot-product scores: row i holds query i scored against every key
scale = torch.tensor(d_k, dtype=torch.float32).sqrt()
attention_scores = (Q @ K.t()) / scale
# Normalise along the key axis so that every row of alpha sums to 1
alpha = attention_scores.softmax(dim=-1)
# Each output row is the alpha-weighted average of the value vectors
output = alpha @ V
### END YOUR CODE ###


# Test
assert alpha.size() == (N, N)
assert output.size() == (N, d_k)

print(f'alpha.size(): {alpha.size()}')
print(f'alpha: {alpha.detach().numpy()}')
print(f'output.size(): {output.size()}')
print(f'output[:,0]: {output[:, 0].detach().numpy()}')

# You should expect to see the following output:
# alpha.size(): torch.Size([3, 3])
# alpha: [[0.78344566 0.14102352 0.07553086]
#  [0.25583813 0.18030964 0.5638523 ]
#  [0.09271843 0.2767209  0.63056064]]
# output.size(): torch.Size([3, 512])
# output[:,0]: [ 0.32742795  0.03610666 -0.04272257]

alpha.size(): torch.Size([3, 3])
alpha: [[0.7834456  0.14102356 0.07553087]
 [0.2558382  0.18030982 0.563852  ]
 [0.0927186  0.2767208  0.6305606 ]]
output.size(): torch.Size([3, 512])
output[:,0]: [ 0.32742846  0.0361065  -0.04272293]


### T2. Mask Future Tokens

First, create a binary mask tensor of size $N\times N$ that is lower triangular: the diagonal and the lower triangle are set to 1, and the strictly upper triangle (the future positions) is set to 0.

*Hint*: Use `torch.tril` and `torch.ones`.

In [6]:
### START YOUR CODE ###
# 1 on and below the diagonal (visible positions), 0 above it (future positions)
mask = torch.ones(N, N).tril()
### END YOUR CODE ###

# Test
print(f'mask: {mask.detach().numpy()}')

# You should expect to see the following output:
# mask: [[1. 0. 0.]
#  [1. 1. 0.]
#  [1. 1. 1.]]

mask: [[1. 0. 0.]
 [1. 1. 0.]
 [1. 1. 1.]]


Use the mask to fill the corresponding future cells in $QK^\top$ with $-\infty$ (`-np.inf`), and then pass it to softmax to compute the new attention scores.

*Hint*: Use `torch.Tensor.masked_fill` function to selectively fill the upper triangle area of the result.

<img src="../images/mask.png" alt="future-token attention mask" width="1000">

In [7]:
### START YOUR CODE ###
# Future positions (mask entries equal to 0) get -inf, so softmax assigns them weight 0
masked_attention_scores = attention_scores.masked_fill(mask.eq(0), -np.inf)
new_alpha = masked_attention_scores.softmax(dim=-1)
### END YOUR CODE ###


# Test
print(f'new_alpha: {new_alpha.detach().numpy()}')

# You should expect to see the following results:
# new_alpha: [[1.         0.         0.        ]
#  [0.5865858  0.41341412 0.        ]
#  [0.09271843 0.2767209  0.63056064]]

new_alpha: [[1.         0.         0.        ]
 [0.58658576 0.4134143  0.        ]
 [0.0927186  0.2767208  0.6305606 ]]


Lastly, the output should also be updated:

In [8]:
### START YOUR CODE ###
# Re-weight the value vectors with the masked attention weights
new_output = new_alpha @ V
### END YOUR CODE ###

# Test
print(f'new_output.size(): {new_output.size()}')
print(f'new_output[:,0]: {new_output[:, 0].detach().numpy()}')

# You should expect to see the following results:
# new_output.size(): torch.Size([3, 512])
# new_output[:,0]: [ 0.43034226  0.2531036  -0.04272257]

new_output.size(): torch.Size([3, 512])
new_output[:,0]: [ 0.4303431   0.2531039  -0.04272293]


### T3. Integrate Multiple Heads

Finally, integrate the above implemented functions into the `MultiHeadAttention` class.

**Note**:

- In this class, the weight matrices `W_q`, `W_k`, and `W_v` are defined as tensors of size $d\times d$. Thus, the output $Q=XW^Q$ is of size $N\times d$.

- Then we reshape $Q$ (and $K$, $V$ as well) into the tensor `Q_` of shape $N\times h\times d_k$, where $h$ is the number of heads (`n_heads`) and $d_k = d // h$. Similar operations are applied to $K$ and $V$. 

- The multiplication $QK^\top$ is now between two tensors of shape $N\times h\times d_k$, `Q_` and `K_`, and the output is of size $h\times N \times N$. Thus, you need to use `torch.matmul` and `torch.permute` properly to make the dimensions of `Q_`, `K_`, and `V_` be in the correct order.

- Also, remember to apply the future mask to each attention head's scores before the softmax. You can use `torch.Tensor.repeat` to replicate the mask `n_heads` times.

In [9]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    With a single argument the module performs *masked self-attention*:
    queries, keys and values are all projected from ``X`` and a lower-triangular
    (future) mask is applied, so position ``i`` only attends to positions ``<= i``.

    With a second argument ``memory`` the module performs *cross-attention*:
    queries come from ``X`` while keys and values come from ``memory``. No
    future mask is applied in that case, because the two sequences do not
    share positions.

    Inputs may be unbatched ``(N, d_model)`` or carry any number of leading
    batch dimensions ``(..., N, d_model)``; the output has the same shape as ``X``.

    Args:
        d_model: Width of the input/output features. Must be divisible by ``n_heads``.
        n_heads: Number of attention heads; each head has width ``d_model // n_heads``.
    """

    def __init__(self, d_model, n_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % n_heads == 0
        self.d_k = d_model // n_heads
        self.n_heads = n_heads

        self.W_q = nn.Parameter(torch.empty(d_model, d_model))
        self.W_k = nn.Parameter(torch.empty(d_model, d_model))
        self.W_v = nn.Parameter(torch.empty(d_model, d_model))

        nn.init.xavier_normal_(self.W_q)
        nn.init.xavier_normal_(self.W_k)
        nn.init.xavier_normal_(self.W_v)

    def _split_heads(self, T):
        """Reshape ``(..., L, d_model)`` into ``(..., n_heads, L, d_k)``."""
        T = T.reshape(*T.shape[:-1], self.n_heads, self.d_k)
        return T.transpose(-3, -2)

    def forward(self, X, memory=None):
        """Compute attention.

        Args:
            X: Query-side input of shape ``(..., N, d_model)``.
            memory: Optional key/value-side input of shape ``(..., M, d_model)``.
                When ``None`` (default), masked self-attention over ``X`` is computed.

        Returns:
            Tensor with the same shape as ``X``: the per-head outputs concatenated
            along the feature dimension.
        """
        # The sequence axis is second-to-last, which works with or without batch dims.
        N = X.size(-2)
        source = X if memory is None else memory

        ### START YOUR CODE ###
        # 1. Compute Q, K, V
        Q = torch.matmul(X, self.W_q)       # (..., N, d_model)
        K = torch.matmul(source, self.W_k)  # (..., M, d_model)
        V = torch.matmul(source, self.W_v)  # (..., M, d_model)

        # 2. Split the feature dimension into heads
        Q_ = self._split_heads(Q)  # (..., n_heads, N, d_k)
        K_ = self._split_heads(K)  # (..., n_heads, M, d_k)
        V_ = self._split_heads(V)  # (..., n_heads, M, d_k)

        # 3. Raw attention scores: QK^T / sqrt(d_k)
        scale = torch.sqrt(torch.tensor(self.d_k, dtype=torch.float32))
        attention_scores = torch.matmul(Q_, K_.transpose(-2, -1)) / scale  # (..., n_heads, N, M)

        # 4. Apply the future mask (self-attention only). The (N, N) mask broadcasts
        #    over the head and batch dimensions, and is created on the input's device
        #    so the module also works after being moved to an accelerator.
        if memory is None:
            mask = torch.tril(torch.ones(N, N, device=X.device))
            attention_scores = attention_scores.masked_fill(mask == 0, -np.inf)

        # 5. Softmax over the key axis
        alpha = F.softmax(attention_scores, dim=-1)  # (..., n_heads, N, M)

        # 6. Multiply by V_ to get the weighted output
        output = torch.matmul(alpha, V_)  # (..., n_heads, N, d_k)

        # 7. Concatenate heads back into the feature dimension
        output = output.transpose(-3, -2).reshape(*X.shape[:-1], -1)  # (..., N, d_model)
        ### END YOUR CODE ###

        return output

In [10]:
# Test
torch.manual_seed(0)  # same seed as the single-head cells, so the weights are initialised identically

multi_head_attn = MultiHeadAttention(d_model=d, n_heads=1)
output = multi_head_attn(X)

assert output.size() == (N, d)
print(f'output.size(): {output.size()}')
print(f'output[:,0]: {output[:, 0].detach().numpy()}')

# You should expect to see the following results:
# output.size(): torch.Size([3, 512])
# output[:,0]: [ 0.43034226  0.2531036  -0.04272257]

output.size(): torch.Size([3, 512])
output[:,0]: [ 0.4303431   0.2531039  -0.04272293]


**Note** that the above output size and values should be the same as the previous one, as we used `n_heads=1`.

<img src="../images/transformer.png" alt="Transformer architecture" width="500">

In [11]:
class FeedForward(nn.Module):
    """Two-layer feed-forward network: Linear -> ReLU -> Linear.

    Args:
        d_model: Size of the input and output features.
        d_ff: Size of the hidden layer.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        # Expand to the hidden width, then project back to the model width.
        self.fc1 = nn.Linear(in_features=d_model, out_features=d_ff)
        self.fc2 = nn.Linear(in_features=d_ff, out_features=d_model)

    def forward(self, x):
        """Apply both linear layers to the last dimension of ``x``."""
        hidden = self.fc1(x)
        activated = F.relu(hidden)
        return self.fc2(activated)

class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding.

    Pre-computes a table ``pe`` of shape ``(1, max_len, d_model)`` in which even
    feature columns hold ``sin(pos * w_i)`` and odd columns hold ``cos(pos * w_i)``,
    with ``w_i = 10000 ** (-2i / d_model)``. The table is registered as a buffer,
    so it follows the module across devices but is not a trainable parameter.

    Args:
        d_model: Feature width of the inputs. Both even and odd values are supported.
        max_len: Number of positions to pre-compute.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer cosine column than sine columns,
        # so trim the angle table to match instead of failing on a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        pe = pe.unsqueeze(0)  # Add batch dimension
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the encoding of the first ``x.size(1)`` positions to ``x``.

        The slice is taken along dimension 1 of ``x`` and the table has a leading
        dimension of 1, so ``x`` is expected to be ``(batch, seq_len, d_model)``.
        """
        return x + self.pe[:, :x.size(1), :]

class TransformerEncoderLayer(nn.Module):
    """One attention + feed-forward block with post-layer-norm residuals.

    Args:
        d_model: Feature width shared by every sub-module.
        n_heads: Number of heads passed to ``MultiHeadAttention``.
        d_ff: Hidden width passed to ``FeedForward``.
    """

    def __init__(self, d_model, n_heads, d_ff):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, n_heads)
        self.ff = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward sub-layers."""
        # Sub-layer 1: self-attention, added back onto the input, then normalised
        attended = self.norm1(x + self.attention(x))

        # Sub-layer 2: feed-forward network, same residual + layer-norm pattern
        return self.norm2(attended + self.ff(attended))

class TransformerEncoder(nn.Module):
    """Token embedding + positional encoding, a stack of encoder layers, and a
    final linear projection to vocabulary size.

    Args:
        d_model: Embedding / feature width.
        n_heads: Number of attention heads in every layer.
        d_ff: Hidden width of every layer's feed-forward network.
        num_layers: Number of stacked ``TransformerEncoderLayer`` modules.
        vocab_size: Size of the embedding table and of the output projection.
        max_len: Number of positions pre-computed by the positional encoding.
    """

    def __init__(self, d_model, n_heads, d_ff, num_layers, vocab_size, max_len):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        encoder_blocks = [
            TransformerEncoderLayer(d_model, n_heads, d_ff)
            for _ in range(num_layers)
        ]
        self.layers = nn.ModuleList(encoder_blocks)
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, src):
        """Map token ids ``src`` to per-position vocabulary scores.

        Shape annotations follow the original comments — TODO confirm against callers:
        ``src`` is (N, seq_len) and the result is (N, seq_len, vocab_size).
        """
        # Look up token embeddings, then add position information
        hidden = self.pos_encoding(self.embedding(src))  # (N, seq_len, d_model)

        # Run the stack of encoder layers in order
        for encoder_layer in self.layers:
            hidden = encoder_layer(hidden)

        # Project every position onto the vocabulary
        return self.fc_out(hidden)  # (N, seq_len, vocab_size)

In [None]:
class TransformerDecoderLayer(nn.Module):
    """Decoder block: self-attention, cross-attention over the encoder memory,
    and a feed-forward network, each followed by a residual add and layer norm.

    Args:
        d_model: Feature width shared by every sub-module.
        n_heads: Number of heads for both attention modules.
        d_ff: Hidden width of the feed-forward network.
    """

    def __init__(self, d_model, n_heads, d_ff):
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model, n_heads)
        self.cross_attention = MultiHeadAttention(d_model, n_heads)
        self.ff = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

    def forward(self, tgt, memory):
        """Return the updated target representation.

        Args:
            tgt: Target-side features.
            memory: Encoder output attended to by the cross-attention module.
        """
        # 1. Self-attention over the target sequence
        tgt = self.norm1(tgt + self.self_attention(tgt))

        # 2. Cross-attention between the target and the encoder memory.
        # NOTE(review): this passes two positional arguments, so the attention
        # module's forward must accept a second (memory) argument — confirm.
        tgt = self.norm2(tgt + self.cross_attention(tgt, memory))

        # 3. Feed-forward network
        return self.norm3(tgt + self.ff(tgt))


class TransformerDecoder(nn.Module):
    """Token embedding + positional encoding, a stack of decoder layers, and a
    final linear projection to vocabulary size.

    Args:
        d_model: Embedding / feature width.
        n_heads: Number of attention heads in every layer.
        d_ff: Hidden width of every layer's feed-forward network.
        num_layers: Number of stacked ``TransformerDecoderLayer`` modules.
        vocab_size: Size of the embedding table and of the output projection.
        max_len: Number of positions pre-computed by the positional encoding.
    """

    def __init__(self, d_model, n_heads, d_ff, num_layers, vocab_size, max_len):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        decoder_blocks = [
            TransformerDecoderLayer(d_model, n_heads, d_ff)
            for _ in range(num_layers)
        ]
        self.layers = nn.ModuleList(decoder_blocks)
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, tgt, memory):
        """Map target token ids ``tgt`` and encoder ``memory`` to vocabulary scores."""
        # Embed the target tokens and add position information
        hidden = self.pos_encoding(self.embedding(tgt))

        # Every decoder layer attends to the same encoder memory
        for decoder_layer in self.layers:
            hidden = decoder_layer(hidden, memory)

        # Project every position onto the vocabulary
        return self.fc_out(hidden)
