In [1]:
import torch
import numpy 
import torch.nn as nn 
import torch.nn.functional as F 


### 关于 word embedding ，以序列建模
 - 需要考虑source sentence  和 target sentence
 - 

In [None]:
batch_size = 2

# 词表大小
max_num_src_words = 8
max_num_tgt_words = 15

# 序列的最大长度
max_src_seq_len = 5
max_tgt_seq_len = 7

# 假设2 条样本的src_len 分别为 2, 4 tgt_len 分别为 4, 3
src_len = torch.Tensor([2, 4]).to(torch.int32)
tgt_len = torch.Tensor([4, 3]).to(torch.int32)

# 根据单词 id 构建的句子， 句子长度不相同，需要按照最长的那条(但是貌似此处pad 到所有序列的最大长度)， pad ID设置为0， [TENSOR, TENSOR] 这种无法直接torch.tensor()  ，因为数据格式为 batch_size, max_seq_len ，因此转为两维，然后cat
# F.pad(tensor, (left, right, up, down))第二个参数分别表示 左， 右， 上， 下， 填充几行或者列
src_seq = torch.cat([torch.unsqueeze(F.pad(torch.randint(1, max_num_src_words, (L, )), (0, max(src_len)-L)), 0) for L in src_len])
tgt_seq = torch.cat([torch.unsqueeze(F.pad(torch.randint(1, max_num_tgt_words, (L, )), (0, max(tgt_len)-L)), 0) for L in tgt_len])

src_seq, tgt_seq

(tensor([[4, 5, 0, 0],
         [2, 3, 6, 6]]),
 tensor([[14, 10, 12,  3],
         [ 2,  2,  2,  0]]))

### 构造 word embedding

In [40]:
###  根据上述 id  构造 word embedding， 词表中多一个0 pad ，因此多加 1 
model_dim = 18
src_embedding_table = nn.Embedding(max_num_src_words+1, model_dim)
tgt_embedding_table = nn.Embedding(max_num_tgt_words+1, model_dim)
src_seq_embeded = src_embedding_table(src_seq)
tgt_src_embeded = tgt_embedding_table(tgt_seq)

src_embedding_table, tgt_embedding_table

(Embedding(9, 18), Embedding(16, 18))

### 构造position embedding

In [None]:
### 构造position embedding 
# 根据transformer 的公式，需要获取 pos  和 i 两个矩阵， pos 是 token 所在 seq 的位置， i 是对一个token 的 embedding 中的位置
max_position_len = 21
pos_mat = torch.arange(max_position_len).reshape(-1, 1) # shape = [21, 1]
i_mat = torch.pow(10000, torch.arange(0, model_dim, 2).reshape(1, -1) / model_dim) 
pe_embedding_table = torch.zeros(max_position_len, model_dim)
pe_embedding_table[:, 0::2] = torch.sin(pos_mat / i_mat)  
pe_embedding_table[:, 1::2] = torch.cos(pos_mat / i_mat)

# 赋值位置编码的值
pe_embedding = nn.Embedding(max_position_len, model_dim)
pe_embedding.weight = nn.Parameter(pe_embedding_table, requires_grad=False)  

# 根据位置索引，拿位置编码， 需要根据batch 的最大长度，来确定位置索引
src_pos = torch.cat([torch.unsqueeze(torch.arange(max(src_len)), 0) for _ in src_len])
tgt_pos = torch.cat([torch.unsqueeze(torch.arange(max(tgt_len)), 0) for _ in tgt_len])

src_pe_embedding = pe_embedding(src_pos)
tgt_pe_embedding = pe_embedding(tgt_pos)

src_pe_embedding.shape

torch.Size([2, 4, 18])

### 构造encoder self-attn mask

In [37]:
### 构造encoder 的 self-attention mask， 本质是对pad 的位置进行mask
# mask 的 shape [batch_szie, max_src_len, max_src_len] Q * K^T 之后进行mask , max是一个batch 而言
# 思路是用邻接矩阵反应两两之间的相关性
valid_encoder_pos = torch.cat([torch.unsqueeze(F.pad(torch.ones(L), (0, max(src_len)-L)), 0) for L in src_len])  #[batch_size, max(src_len)]

# 不同sample 之间毫无关系，需要扩一个维度
valid_encoder_pos = torch.unsqueeze(valid_encoder_pos, 2)                                                        #[batch_size, max(src_len), 1]

# 利用邻接矩阵，判断同一个样本不同TOKEN 之间是否关联， 1 表示关联， 0 表示其中一个TOKEN 是 pad=0
# torch.bmm (batch matrix mul)
valid_encoder_pos_matrix = torch.bmm(valid_encoder_pos, valid_encoder_pos.transpose(-2, -1))                     # [batch_size, max(src_len), max(src_len)]
valid_encoder_pos_matrix

# 第一个样本中，只有两个token ,因此有效的就是前两个token 相互为 1， 剩下都为0，

# 无效matrix
invalid_encoder_pos_matrix = 1 - valid_encoder_pos_matrix
mask_encoder_self_attn = invalid_encoder_pos_matrix.to(torch.bool)
# True 表示相对无关系， 有一个TOKEN 为 pad = 0, False 表示两个有效token 
mask_encoder_self_attn  



## 举例子计算mask_score

score = torch.randn(batch_size, max(src_len), max(src_len))
masked_score = score.masked_fill(mask_encoder_self_attn, -1e9)  # mask_encoder_self_attn 为 True 的位置采用 -1e9 替换 负10 亿
prob = F.softmax(masked_score, dim=-1)
prob

tensor([[[0.0699, 0.9301, 0.0000, 0.0000],
         [0.7419, 0.2581, 0.0000, 0.0000],
         [0.2500, 0.2500, 0.2500, 0.2500],
         [0.2500, 0.2500, 0.2500, 0.2500]],

        [[0.2497, 0.6800, 0.0337, 0.0366],
         [0.7394, 0.0840, 0.0542, 0.1224],
         [0.3557, 0.0668, 0.0485, 0.5290],
         [0.2485, 0.0614, 0.1061, 0.5839]]])

### 构造 intra-attention mask 
 - 即encoder-decoder mask

In [50]:
### intra-attention mask  形状为 [batch_size, max_tgt_len, max_src_len] ， max 是一个batch 而言
# 邻接矩阵表示不同token 之间是否相关，此处就是如果两个都是有效token 不是pad=0， 则相关
valid_encoder_pos = torch.unsqueeze(torch.cat([torch.unsqueeze(F.pad(torch.ones(L), (0, max(src_len)-L)), 0) for L in src_len]), 2)
valid_decoder_pos = torch.unsqueeze(torch.cat([torch.unsqueeze(F.pad(torch.ones(L), (0, max(tgt_len)-L)), 0) for L in tgt_len]), 2)

valid_cross_pos_matrix = torch.bmm(valid_decoder_pos, valid_encoder_pos.transpose(-2, -1))
invalid_cross_pos_matrix = 1 - valid_cross_pos_matrix
masked_cross_attention = invalid_cross_pos_matrix.to(torch.bool)
masked_cross_attention

tensor([[[False, False,  True,  True],
         [False, False,  True,  True],
         [False, False,  True,  True],
         [False, False,  True,  True]],

        [[False, False, False, False],
         [False, False, False, False],
         [False, False, False, False],
         [ True,  True,  True,  True]]])

In [47]:
valid_encoder_pos

tensor([[[1.],
         [1.],
         [0.],
         [0.]],

        [[1.],
         [1.],
         [1.],
         [1.]]])

In [48]:
valid_decoder_pos

tensor([[[1.],
         [1.],
         [1.],
         [1.]],

        [[1.],
         [1.],
         [1.],
         [0.]]])

### 构造 decoder self-attn masked
 - 下三角的attn
 - tri 是三角 ， torch.tril 下三角 low tri , torch.triu 上三角 up tri

In [60]:
# batch 内的都一样的大小，需要pad , 第二个参数分别表示 左， 右， 上， 下， 填充几行或者列
# 形状为 [batch_size, max_tgt_len, max_tgt_len] max 是一个batch 而言
valid_decoder_tri_matrix = torch.cat([torch.unsqueeze(F.pad(torch.tril(torch.ones((L, L))), (0, max(tgt_len)-L, 0, max(tgt_len)-L)), 0) for L in tgt_len])
invalid_decoder_tri_matrix = 1 - valid_decoder_tri_matrix
invalid_decoder_tri_matrix = invalid_decoder_tri_matrix.to(torch.bool)

score = torch.randn(batch_size, max(tgt_len), max(tgt_len))
masked_score = score.masked_fill(invalid_decoder_tri_matrix, -1e9)
prob = F.softmax(masked_score, dim=-1)
print(prob)

tensor([[[1.0000, 0.0000, 0.0000, 0.0000],
         [0.1923, 0.8077, 0.0000, 0.0000],
         [0.3294, 0.4685, 0.2021, 0.0000],
         [0.2039, 0.1408, 0.1644, 0.4909]],

        [[1.0000, 0.0000, 0.0000, 0.0000],
         [0.7790, 0.2210, 0.0000, 0.0000],
         [0.5301, 0.1637, 0.3062, 0.0000],
         [0.2500, 0.2500, 0.2500, 0.2500]]])


### 构造自注意力计算


In [None]:
def scaled_dot_product_attention(Q, K, V, mask_attn):
    # Q [bs, n_heads, seq_len, embedding_dim/n_heads]
    # K [bs, n_heads, seq_len, embedding_dim/n_heads]  # 同侧时， seq_len 和 Q 相同， 不同侧时不同，一个encoder ，一个decoder，不一定相同
    # V [bs, n_heads, seq_len, embedding_dim/n_heads]
    # attn_mask [bs, seq_len, seq_len]

    score = torch.bmm(Q, K.transpose(-2, -1)) / torch.sqrt(model_dim)
    masked_score = score.masked_fill(mask_attn, -1e9)
    prob = F.softmax(masked_score)
    context = torch.bmm(prob, V)
    return context
