## BERT代码实现

## Bidirectional Encoder Representations from Transformers(BERT)

In [18]:
import torch
from torch import nn
from d2l import torch as d2l

## Input Represention

In [19]:
def get_tokens_and_segments(tokens_a, tokens_b = None):
    tokens = ['<cls>'] + tokens_a + ['<sep>']
    segments = [0] * (len(tokens_a) + 2)
    if tokens_b is not None:
        tokens += tokens_b + ['<sep>']
        segments += [1] * (len(tokens_b) + 1)
    return tokens, segments

In [63]:
get_tokens_and_segments(["hello", "world"])
get_tokens_and_segments(["hello", "world"], ["good"])

(['<cls>', 'hello', 'world', '<sep>', 'good', '<sep>'], [0, 0, 0, 0, 1, 1])

## ```BERTEncoder``` class

In [20]:
class BERTEncoder(nn.Module):
    def __init__(self, vocab_size, num_hiddens, norm_shape, ffn_num_input,
                 ffn_num_hiddens, num_heads, num_layers, dropout, 
                 max_len=1000, key_size=768, query_size=768, value_size=768, **kwargs):
        super(BERTEncoder, self).__init__(**kwargs)
        self.token_embedding = nn.Embedding(vocab_size, num_hiddens)
        # 给句子标号的片段,给句子对标号
        self.segment_embedding = nn.Embedding(2, num_hiddens)
        self.blks = nn.Sequential()
        for i in range(num_layers):
            self.blks.add_module(f"{i}", d2l.EncoderBlock(
                key_size, query_size, value_size, num_hiddens, norm_shape,
                ffn_num_input, ffn_num_hiddens, num_heads, dropout, True))
        # 在BERT中，位置嵌入是可学习的，因此我们创建一个足够长的位置嵌入参数
        # size(batch_size, 最大序列长度，隐藏层大小）
        self.pos_embedding = nn.Parameter(torch.randn(1, max_len,
                                                      num_hiddens))

    def forward(self, tokens, segments, valid_lens):
        # 在以下代码段中，X的形状保持不变：（批量大小，最大序列长度，num_hiddens）
        X = self.token_embedding(tokens) + self.segment_embedding(segments)
        X = X + self.pos_embedding.data[:, :X.shape[1], :]
        for blk in self.blks:
            X = blk(X, valid_lens)
        return X
        

## Inference of ```BERTEncoder```
将句子中的每一个词抽取出一个```num_hiddens```的向量特征

In [24]:
vocab_size, num_hiddens, ffn_num_hiddens, num_heads = 10000, 768, 1024, 4
norm_shape, ffn_num_input, num_layers, dropout = [768], 768, 2, 0.2
encoder = BERTEncoder(vocab_size, num_hiddens, norm_shape, ffn_num_input,
                      ffn_num_hiddens, num_heads, num_layers, dropout)

tokens = torch.randint(0, vocab_size, (2, 8))
segments = torch.tensor([[0, 0, 0, 0, 1, 1, 1, 1], [0, 0, 0, 1, 1, 1, 1, 1]])
encoded_X = encoder(tokens, segments, None)
# (batch_size, 句子长度, num_hiddens)
encoded_X.shape

torch.Size([2, 8, 768])

## Masked Language Modeling

In [27]:
class MaskLM(nn.Module):
    """
    The masked language model task of BERT
    这是BERT之上接的单隐藏层的MLP
    """
    def __init__(self, vocab_size, num_hiddens, num_inputs=768, **kwargs):
        super(MaskLM, self).__init__(**kwargs)
        self.mlp = nn.Sequential(
            # 先使用一个全连接层
            nn.Linear(num_inputs, num_hiddens),
            nn.ReLU(),
            nn.LayerNorm(num_hiddens),
            # 再用一个全连接层，输出长度为vocab_size： 这是因为需要对每个词进行预测，看它是哪个词
            nn.Linear(num_hiddens, vocab_size))
        
    def forward(self, X, pred_positions):
        # 它需要两个输入：BERTEncoder的编码结果和用于预测的词元位置。输出是这些位置的预测结果。
        # X是给定的句子里面，BERTEncoder的输出，size：（批量大小，样本大小，num_hiddens）
        # pred_positions: 哪些词是加了Mask的，即告诉我们，需要预测哪些词
        # pred_positions size (样本数量，待预测的词的位置)
        num_pred_positions = pred_positions.shape[1]
        pred_positions = pred_positions.reshape(-1)
        batch_size = X.shape[0]
        batch_idx = torch.arange(0, batch_size)
        # 复制每批的索引
        # 假设batch_size=2，num_pred_positions=3
        # 那么batch_idx是np.array（[0,0,0,1,1]）
        batch_idx = torch.repeat_interleave(batch_idx, num_pred_positions)
        masked_X = X[batch_idx, pred_positions]
        masked_X = masked_X.reshape((batch_size, num_pred_positions, -1))
        mlp_Y_hat = self.mlp(masked_X)
        return mlp_Y_hat
        
        

In [26]:
t = torch.arange(0, 2)
t, torch.repeat_interleave(t, 4)

(tensor([0, 1]), tensor([0, 0, 0, 0, 1, 1, 1, 1]))

## The forward inference of ```MaskLM```

In [32]:
mlm = MaskLM(vocab_size, num_hiddens)
# 第0个句子预测的词的位置[1, 5, 2]
mlm_positions = torch.tensor([[1, 5, 2], [6, 1, 5]])
mlm_Y_hat = mlm(encoded_X, mlp_positions)
# mlm_Y_hat 输出(2, 3, 10000), 其中2是batch_size, 3是预测的词，10000是预测的输出(vocab_size为10000）
mlm_positions.shape, mlm_Y_hat.shape

(torch.Size([2, 3]), torch.Size([2, 3, 10000]))

In [40]:
mlm_Y = torch.tensor([[7, 8, 9], [10, 20, 30]])
loss =nn.CrossEntropyLoss(reduction='none')
mlm_l = loss(mlm_Y_hat.reshape((-1, vocab_size)), mlm_Y.reshape(-1))
mlm_l.shape

torch.Size([6])

In [44]:
mlm_Y_hat.reshape((-1, vocab_size)).shape, mlm_Y.reshape(-1).shape

(torch.Size([6, 10000]), torch.Size([6]))

## Next Sentence Prediction

In [45]:
class NextSentencePred(nn.Module):
    """
    The next sentence prediction task of BERT
    本质就是一个单分类问题
    """
    def __init__(self, num_inputs, **kwargs):
        super(NextSentencePred, self).__init__(**kwargs)
        self.output = nn.Linear(num_inputs, 2)
    
    def forward(self, X):
        return self.output(X)
        

## The forward inference of an ```NextSentencePred```

In [50]:
print("before", encoded_X.shape)
ns_encoded_X = torch.flatten(encoded_X, start_dim=1)
print("After", ns_encoded_X.shape)
nsp = NextSentencePred(ns_encoded_X.shape[-1])
nsp_Y_hat = nsp(ns_encoded_X)
nsp_Y_hat.shape

before torch.Size([2, 8, 768])
After torch.Size([2, 6144])


torch.Size([2, 2])

In [61]:
nsp_y = torch.tensor([0, 1])
nsp_l = loss(nsp_Y_hat, nsp_y)
nsp_l.shape, nsp_l

(torch.Size([2]), tensor([0.7634, 1.0668], grad_fn=<NllLossBackward0>))

## BERT模型

In [62]:
class BERTModel(nn.Module):
    
    def __init__(self, vocab_size, num_hiddens, norm_shape, ffn_num_input,
                 ffn_num_hiddens, num_heads, num_layers, dropout,
                 max_len=1000, key_size=768, query_size=768, value_size=768,
                 hid_in_features=768, mlm_in_features=768,
                 nsp_in_features=768):
        super(BERTModel, self).__init__()
        self.encoder = BERTEncoder(vocab_size, num_hiddens, norm_shape,
                                   ffn_num_input, ffn_num_hiddens, num_heads, num_layers,
                                   dropout, max_len=max_len, key_size=key_size,
                                   query_size=query_size, value_size=value_size)
        self.hidden = nn.Sequential(nn.Linear(hid_in_features, num_hiddens), nn.Tanh())
        self.mlm = MaskLM(vocab_size, num_hiddens, mlm_in_features)
        self.nsp = NextSentencePred(nsp_in_features)
        
    def forward(self, tokens, segments, valid_lens=None,
                pred_positions=None):
        """
        pred_positions为None表示不做语言模型
        """
        # 使用编码器抽取特征
        # encoded_X: (批量大小，最大序列长度，num_hiddens）
        encoded_X = self.encoder(tokens, segments, valid_lens)
        if pred_positions is not None:
            # 如果是预测位置
            mlm_Y_hat = self.mlm(encoded_X, pred_positions)
        else:
            mlm_Y_hat = None
        # 用于下一句预测的多层感知机分类器的隐藏层，0是“<cls>”标记的索引
        nsp_Y_hat = self.nsp(
            # 0是句子对中第一个句子的向量
            self.hidden(encoded_X[:, 0, :]))
        return encoded_X, mlm_Y_hat, nsp_Y_hat