In [11]:
import torch
import torch.nn as nn
import numpy as np
import jieba
import math

In [12]:
# Show the installed PyTorch version; the outputs recorded in this notebook come from 1.10.2.
torch.__version__

'1.10.2'

In [13]:
class Attention(nn.Module):
    """Dot-product attention computed over the last two dimensions of q/k/v.

    Args:
        attention_dropout: dropout probability applied to the attention weights.
    """

    def __init__(self, attention_dropout=0.1):
        super(Attention, self).__init__()
        self.dropout = nn.Dropout(attention_dropout)
        # Normalise over the key axis (last dim) so each query's weights sum to 1.
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, q, k, v, scale=False, attn_mask=None):
        """Compute softmax(Q K^T [/ sqrt(d_k)]) V.

        Args:
            q, k, v: tensors whose last two dims are (seq, feature); the caller
                in this notebook passes [batch, head, seq, feature].
            scale: when True, divide the logits by sqrt(d_k), where d_k is the
                size of the last dimension of ``k``.
            attn_mask: optional boolean tensor broadcastable to the logits;
                positions that are True are filled with -inf before the softmax
                and therefore receive zero attention weight.

        Returns:
            (z, attention): the weighted sum of ``v`` and the attention weights
            (after dropout).
        """
        scores = torch.matmul(q, k.transpose(-1, -2))  # Q x K^T
        if scale:
            # Scaled dot-product attention DIVIDES by sqrt(d_k); multiplying
            # would sharpen the softmax instead of tempering it.
            scores = scores / math.sqrt(k.shape[-1])
        # `if attn_mask:` would raise for any multi-element tensor, so test
        # explicitly against None.
        if attn_mask is not None:
            scores = scores.masked_fill(attn_mask, float("-inf"))
        # The softmax must run over dim=-1 (keys); dim=-2 would normalise over
        # queries and is not equivalent.
        attention = self.softmax(scores)
        attention = self.dropout(attention)
        # softmax(Q x K^T) x V
        z = torch.matmul(attention, v)
        return z, attention

In [14]:
class MultiHeadAttention(nn.Module):
    """Multi-head self-attention block with residual connection and LayerNorm.

    The input is projected to Q/K/V, split into ``num_heads`` heads, passed
    through ``Attention``, merged back, sent through two linear layers with a
    ReLU in between, and finally added to the input and layer-normalised.

    Args:
        model_dim: feature size of the input and output.
        num_heads: number of attention heads; must divide ``model_dim``.
        dropout: dropout probability applied before the residual addition.

    Raises:
        ValueError: if ``model_dim`` is not divisible by ``num_heads``.
    """

    def __init__(self, model_dim=64, num_heads=4, dropout=0.1):
        super(MultiHeadAttention, self).__init__()

        # The merged heads are fed to a Linear(model_dim, ...) layer, so
        # head_dim * num_heads has to equal model_dim exactly.
        if model_dim % num_heads != 0:
            raise ValueError(
                "model_dim (%d) must be divisible by num_heads (%d)" % (model_dim, num_heads)
            )

        self.head_dim = model_dim // num_heads   # feature size of each head
        self.num_heads = num_heads
        self.Wk = nn.Linear(model_dim, self.head_dim * num_heads)
        self.Wv = nn.Linear(model_dim, self.head_dim * num_heads)
        self.Wq = nn.Linear(model_dim, self.head_dim * num_heads)

        self.Attention = Attention()

        self.Wo0 = nn.Linear(model_dim, model_dim)
        self.Wo1 = nn.Linear(model_dim, model_dim)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(model_dim)

    def _split_heads(self, t, batch_size):
        """Reshape [batch, seq, heads * head_dim] -> [batch, heads, seq, head_dim]."""
        # The feature axis must be split first and the head axis moved forward
        # afterwards; viewing directly as [batch, heads, seq, head_dim] would
        # mix features of different tokens into the same head.
        return t.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, X, scale=True, attn_mask=None):
        """Apply multi-head self-attention to ``X``.

        Args:
            X: input of shape [batch, seq, model_dim].
            scale: forwarded to ``Attention`` to enable sqrt(d_k) scaling.
            attn_mask: optional boolean mask. A 3-D mask is treated as
                [batch, seq, seq] and shared by every head; any other rank must
                already broadcast against [batch, heads, seq, seq].

        Returns:
            (output, attention): output has shape [batch, seq, model_dim] and
            attention has shape [batch, heads, seq, seq].
        """
        # Kept for the residual connection.
        residual = X

        head_dim = self.head_dim
        num_heads = self.num_heads
        batch_size = X.shape[0]

        # Linear projections, then split into heads.
        K = self._split_heads(self.Wk(X), batch_size)
        V = self._split_heads(self.Wv(X), batch_size)
        Q = self._split_heads(self.Wq(X), batch_size)  # [batch, heads, seq, head_dim]

        # `if attn_mask:` is ambiguous for tensors; compare against None.
        if attn_mask is not None and attn_mask.dim() == 3:
            # Insert a singleton head axis so the mask broadcasts over heads
            # for any batch size.
            attn_mask = attn_mask.unsqueeze(1)

        Z, attention = self.Attention(Q, K, V, scale, attn_mask)

        # Concatenate heads: [batch, heads, seq, head_dim] -> [batch, seq, model_dim].
        # transpose() yields a non-contiguous tensor, so make it contiguous before view().
        Z = Z.transpose(1, 2).contiguous().view(batch_size, -1, head_dim * num_heads)

        # Two linear layers with a ReLU in between on the merged heads.
        Z = self.Wo0(Z)
        Z = self.relu(Z)
        Z = self.Wo1(Z)
        Z = self.dropout(Z)

        # Residual connection followed by layer normalisation.
        output = self.layer_norm(residual + Z)

        return output, attention

In [15]:
sentence = "我爱学习，学习使我快乐"
seg = jieba.cut(sentence)            # word segmentation
# Materialise the segmenter output directly; joining on ',' and splitting again
# would corrupt the token list whenever a token itself contains a comma.
seg_list = list(seg)
print(seg_list)
# Deduplicate. Set iteration order for strings is not stable across interpreter
# runs, so the concrete ids below may differ from run to run.
token_list = list(set(seg_list))
print(token_list)

# Token -> integer id. NOTE: this name shadows the builtin `dict`; it is kept
# because later cells read the vocabulary through it.
dict = {token: idx for idx, token in enumerate(token_list)}

print(dict)

# Extra tokens that do not occur in the sentence get the next free ids instead
# of hard-coded 6 and 7, so the ids stay contiguous for any vocabulary size.
for extra_token in ('吃', '汉堡'):
    if extra_token not in dict:
        dict[extra_token] = len(dict)
seq = [dict[token] for token in seg_list]
print(seq)

['我', '爱', '学习', '，', '学习', '使', '我', '快乐']
['爱', '使', '学习', '我', '，', '快乐']
{'爱': 0, '使': 1, '学习': 2, '我': 3, '，': 4, '快乐': 5}
[3, 0, 2, 4, 2, 1, 3, 5]


In [16]:
# One 64-dimensional embedding row per vocabulary entry.
vocab_size = len(dict)
embedding = nn.Embedding(vocab_size, 64)
# Look up the sentence's token ids and add a leading batch axis.
token_ids = torch.tensor(seq)
input_embedding = embedding(token_ids).unsqueeze(0)
print(input_embedding.shape)   # batch_size = 1: a single sentence

torch.Size([1, 8, 64])


In [17]:
# Run the hand-written multi-head attention block (default hyper-parameters)
# on the embedded sentence and inspect the result shapes.
Multi_Head_Attention = MultiHeadAttention()
mha_result = Multi_Head_Attention(input_embedding)
output, attention = mha_result
print(output.shape, attention.shape)

torch.Size([1, 8, 64]) torch.Size([1, 4, 8, 8])


In [18]:
# PyTorch ships a ready-made Transformer encoder layer that can be used directly.
# batch_first=True is required here: input_embedding is [batch, seq, feature],
# while the layer's default layout is [seq, batch, feature]. With the default,
# the 8 tokens would be treated as 8 independent length-1 sequences and no
# token would attend to any other.
encoder_layer = nn.TransformerEncoderLayer(d_model=64, nhead=4, batch_first=True)
transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=4)

output = transformer_encoder(input_embedding)
print(output.shape)
# The decoder classes are used in a similar way.

torch.Size([1, 8, 64])


In [19]:
# PyTorch also provides a ready-made multi-head attention layer.
attention_layer = nn.MultiheadAttention(embed_dim=64, num_heads=4, batch_first=True)
# Self-attention: the same tensor serves as query, key and value.
output, attention = attention_layer(
    query=input_embedding,
    key=input_embedding,
    value=input_embedding,
)  # , average_attn_weights=False)
# average_attn_weights controls whether the attention weights are averaged over
# all heads (default True); with False the returned matrix would be [1, 4, 8, 8].
# That argument was introduced in torch 1.11; this tutorial runs on 1.10.2, so
# it is not available here.
print(output.shape, attention.shape)

torch.Size([1, 8, 64]) torch.Size([1, 8, 8])
