In [69]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

# Toy Transformer input pipeline: random padded token sequences, word
# embeddings, sinusoidal position embeddings, and the encoder padding mask.

# number of sequences in the batch
batch_size = 2

# word vocab size
max_num_src_words = 8
max_num_tgt_words = 8
model_dim = 8  # embedding width; must be even for the sin/cos interleave below

# max sequence length
max_src_seq_len = 5
max_tgt_seq_len = 5
max_position_len = 5

# Number of real (non-padding) tokens per batch element. Fixed values keep the
# example easy to follow; torch.randint(2, 5, (batch_size,)) would randomize them.
src_len = torch.tensor([2, 4], dtype=torch.int32)
tgt_len = torch.tensor([4, 3], dtype=torch.int32)

# word index sequence: random ids in [1, vocab_size), right-padded with 0 up to
# the max sequence length -> shape [batch_size, max_seq_len]
src_seq = torch.stack(
    [F.pad(torch.randint(1, max_num_src_words, (n,)), (0, max_src_seq_len - n)) for n in src_len.tolist()])
tgt_seq = torch.stack(
    [F.pad(torch.randint(1, max_num_tgt_words, (n,)), (0, max_tgt_seq_len - n)) for n in tgt_len.tolist()])

# Construct word embedding (one extra row on top of the vocab size; id 0 is padding)
src_embedding_table = nn.Embedding(max_num_src_words + 1, model_dim)
tgt_embedding_table = nn.Embedding(max_num_tgt_words + 1, model_dim)
src_embedding = src_embedding_table(src_seq)
tgt_embedding = tgt_embedding_table(tgt_seq)

# Construct position embedding:
#   PE[pos, 2i]   = sin(pos / 10000^(2i / model_dim))
#   PE[pos, 2i+1] = cos(pos / 10000^(2i / model_dim))
pos_mat = torch.arange(max_position_len).reshape((-1, 1))
# The even indices 2i are derived from model_dim rather than a literal 8, so the
# table stays consistent when model_dim is changed.
i_mat = torch.pow(10000, torch.arange(0, model_dim, 2).reshape((1, -1)) / model_dim)
pe_embedding_table = torch.zeros(max_position_len, model_dim)
pe_embedding_table[:, 0::2] = torch.sin(pos_mat / i_mat)
pe_embedding_table[:, 1::2] = torch.cos(pos_mat / i_mat)

# Wrap the fixed table in an Embedding so positions can be looked up by index;
# requires_grad=False keeps the sinusoidal values frozen.
pe_embedding = nn.Embedding(max_position_len, model_dim)
pe_embedding.weight = nn.Parameter(pe_embedding_table, requires_grad=False)

src_positions = torch.arange(max_src_seq_len).expand(batch_size, max_src_seq_len)
src_pe = pe_embedding(src_positions)
src_pe_embedding = src_embedding + src_pe

tgt_positions = torch.arange(max_tgt_seq_len).expand(batch_size, max_tgt_seq_len)
tgt_pe = pe_embedding(tgt_positions)
tgt_pe_embedding = tgt_embedding + tgt_pe

# self-attention mask of encoder
# The shape of mask: [batch_size, max(src_len), max(src_len)], dtype bool,
# True marks a (query, key) pair that involves padding and must be masked out.
# NOTE: the mask is sized by the longest real sequence in the batch (4 here),
# not by max_src_seq_len (5) that src_seq is padded to.
max_valid_src_len = int(src_len.max())
valid_encoder_pos = torch.stack(
    [F.pad(torch.ones(n), (0, max_valid_src_len - n)) for n in src_len.tolist()]).unsqueeze(2)
# Outer product of the validity vector with itself: 1 only where both the query
# position and the key position hold real tokens.
valid_encoder_pos_matrix = torch.bmm(valid_encoder_pos, valid_encoder_pos.transpose(1, 2))
invalid_encoder_pos_matrix = 1 - valid_encoder_pos_matrix
mask_encoder_self_attention = invalid_encoder_pos_matrix.to(torch.bool)

# Demo on random scores: masked entries get a large negative value so softmax
# gives them ~0 weight. Rows that are entirely padding come out uniform.
score = torch.randn(batch_size, max_valid_src_len, max_valid_src_len)
masked_score = score.masked_fill(mask_encoder_self_attention, -1e9)
prob = F.softmax(masked_score, -1)

print(prob)

tensor([[[0.3923, 0.6077, 0.0000, 0.0000],
         [0.0638, 0.9362, 0.0000, 0.0000],
         [0.2500, 0.2500, 0.2500, 0.2500],
         [0.2500, 0.2500, 0.2500, 0.2500]],

        [[0.3104, 0.1882, 0.0395, 0.4620],
         [0.1044, 0.4549, 0.0348, 0.4058],
         [0.3452, 0.2657, 0.1628, 0.2263],
         [0.0773, 0.1452, 0.1420, 0.6355]]])


In [92]:
# --- Cross-attention mask (decoder positions as queries, encoder positions as keys) ---
# Per-sequence validity column vector for the decoder: 1.0 for real tokens,
# 0.0 for padding, padded out to the longest target length in the batch.
longest_tgt = int(tgt_len.max())
valid_decoder_pos = torch.stack(
    [F.pad(torch.ones(n), (0, longest_tgt - n)) for n in tgt_len.tolist()]
).unsqueeze(2)

# Outer product of decoder validity with encoder validity: an entry is 1 only
# when both the decoder position and the encoder position hold real tokens.
valid_cross_pos_matrix = torch.bmm(valid_decoder_pos, valid_encoder_pos.transpose(1, 2))
invalid_cross_pos_matrix = 1 - valid_cross_pos_matrix
mask_cross_attention = invalid_cross_pos_matrix.to(torch.bool)  # True = masked out

# --- Causal self-attention mask of the decoder ---
# For each sequence build a lower-triangular block of ones (a position may look
# at itself and earlier positions), then zero-pad right and bottom to the
# longest target length.
valid_decoder_tril_matrix = torch.stack(
    [
        F.pad(torch.tril(torch.ones((n, n))), (0, longest_tgt - n, 0, longest_tgt - n))
        for n in tgt_len.tolist()
    ]
)
invalid_decoder_tri_matrix = 1 - valid_decoder_tril_matrix
invalid_decoder_tri_matrix = invalid_decoder_tri_matrix.to(torch.bool)  # True = masked out

# Demo on random scores: future and padding positions receive ~0 probability.
score = torch.randn(batch_size, longest_tgt, longest_tgt)
masked_score = score.masked_fill(invalid_decoder_tri_matrix, -1e9)
prob = F.softmax(masked_score, -1)
print(prob)

tensor([[[1.0000, 0.0000, 0.0000, 0.0000],
         [0.2035, 0.7965, 0.0000, 0.0000],
         [0.5029, 0.2976, 0.1996, 0.0000],
         [0.0221, 0.1329, 0.4752, 0.3698]],

        [[1.0000, 0.0000, 0.0000, 0.0000],
         [0.5493, 0.4507, 0.0000, 0.0000],
         [0.1614, 0.3774, 0.4612, 0.0000],
         [0.2500, 0.2500, 0.2500, 0.2500]]])


In [ ]:
def scaled_dot_product_attention(Q, K, V, attn_mask=None):
    """Compute softmax(Q K^T / sqrt(d_k)) V with optional masking.

    Args:
        Q: query tensor of shape [..., num_queries, d_k].
        K: key tensor of shape [..., num_keys, d_k].
        V: value tensor of shape [..., num_keys, d_v].
        attn_mask: optional bool tensor broadcastable to
            [..., num_queries, num_keys]; True marks entries to mask out.
            When None, no masking is applied.

    Returns:
        Context tensor of shape [..., num_queries, d_v].
    """
    # torch.sqrt only accepts tensors, so calling it on a Python int raises a
    # TypeError. Scale by a plain float taken from the key feature size, which
    # also removes the dependency on a notebook-level global.
    scale = K.size(-1) ** 0.5
    # matmul matches bmm for 3-D inputs and also handles extra leading
    # dimensions such as a head axis.
    score = torch.matmul(Q, K.transpose(-2, -1)) / scale
    if attn_mask is not None:
        # A large negative value drives the softmax weight of masked entries to ~0.
        score = score.masked_fill(attn_mask, -1e9)
    prob = F.softmax(score, -1)
    context = torch.matmul(prob, V)
    return context