### 3章 Transformer

Jupyter用のBertVizを使ってアテンションの重みを可視化

In [1]:
from transformers import AutoTokenizer
from bertviz.transformers_neuron_view import BertModel
from bertviz.neuron_view import show

model_ckpt = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
model = BertModel.from_pretrained(model_ckpt)
text = "time flies like an arrow"
show(model, "bert", tokenizer, text, display_mode="light", layer=0, head=8)

  state_dict = torch.load(resolved_archive_file, map_location='cpu')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Pytorchでスケールドット積アテンションを実装

In [2]:
from pprint import pprint
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False)
pprint(inputs)

{'attention_mask': tensor([[1, 1, 1, 1, 1]]),
 'input_ids': tensor([[ 2051, 10029,  2066,  2019,  8612]]),
 'token_type_ids': tensor([[0, 0, 0, 0, 0]])}


In [3]:
# 密な埋込ベクトル(one-hotと反対)を作る
from torch import nn
from transformers import AutoConfig

config = AutoConfig.from_pretrained(model_ckpt)
token_emb = nn.Embedding(config.vocab_size, config.hidden_size) # ルックアップテーブル(単語数x埋め込み次元)
pprint(token_emb)

Embedding(30522, 768)


In [4]:
inputs_embeds = token_emb(inputs.input_ids)
inputs_embeds.size() # (batch_size, seq_len, hidden_dim)

torch.Size([1, 5, 768])

In [5]:
import torch
from math import sqrt

# QKVの計算(今回は重み行列W_QKVはなし)
query = key = value = inputs_embeds
dim_k = key.size(-1)
print("dim_k", dim_k)
scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k) # QK^T / √k. (B=1, N=5, K=768) x (B, K, N) = (B=1, N=5, N=5)
scores.size()


dim_k 768


torch.Size([1, 5, 5])

+ `torch.bmm()`関数は、クエリとキーベクトルが`[batch_size, seq_len, hidden_dim]`形式の場合、アテンションスコアの計算を単純化する**バッチ化された行列積**を計算します。
+  もしバッチ次元を無視すれば、単純にキーテンソルを転置して`[hidden_dim, que_len]`の形状とし、行列積を用いてすべてのドット積を`[seq_len, seq_len]`行列に集めることにより各クエリ及びキーベクトル間のドット積を計算できます。

In [6]:
# softmaxの計算
import torch.nn.functional as F

weights = F.softmax(scores, dim=-1) # アテンション(B, N, N)
pprint(weights)
pprint(weights.sum(dim=-1)) # 各行の要素の和は1.0

tensor([[[1.0000e+00, 2.9539e-13, 9.1081e-13, 7.1634e-13, 3.8932e-12],
         [1.0595e-14, 1.0000e+00, 8.5085e-15, 9.1408e-13, 2.4244e-13],
         [6.5487e-13, 1.7055e-13, 1.0000e+00, 9.6741e-13, 2.1326e-12],
         [3.7920e-13, 1.3490e-11, 7.1224e-13, 1.0000e+00, 6.2845e-13],
         [5.3448e-12, 9.2789e-12, 4.0719e-12, 1.6298e-12, 1.0000e+00]]],
       grad_fn=<SoftmaxBackward0>)
tensor([[1., 1., 1., 1., 1.]], grad_fn=<SumBackward1>)


In [7]:
print(weights.size())
print(value.size())

torch.Size([1, 5, 5])
torch.Size([1, 5, 768])


In [8]:
# アテンションをバリューに乗じる
attn_outputs = torch.bmm(weights, value)
attn_outputs.size()

torch.Size([1, 5, 768])

In [9]:
def scaled_dot_product_attention(query, key, value):
    # 今回は重み行列W_QKVはなし : 一種の平均化処理
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k)
    weights = F.softmax(scores, dim=-1)
    return torch.bmm(weights, value)

マルチヘッドにする理由は、様々な関連性を学習できるようにするため。CNNフィルターのようにあるフィルタは顔特徴量を検出する機能をもち、あるフィルタは輪郭特徴を検出する機能をもつといった具合で、ヘッドもマルチにしてあげる

In [10]:
class AttentionHead(nn.Module):
    def __init__(self, embed_dim, head_dim):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim, bias=False)
        self.k = nn.Linear(embed_dim, head_dim, bias=False)
        self.v = nn.Linear(embed_dim, head_dim, bias=False)

    def forward(self, hidden_state):
        attn_outputs = scaled_dot_product_attention(self.q(hidden_state), self.k(hidden_state), self.v(hidden_state))
        return attn_outputs


In [11]:
class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        )

        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, hidden_state):
        x = torch.cat([h(hidden_state) for h in self.heads], dim=-1)
        x = self.output_linear(x)
        return x



アテンションを連結して得られる出力は、下流の順伝伝播型ネットワークに適した形状`[batch_size, seq_len, hidden_dim]`の出力テンソルを生成するために、最後の全結合層に送られる

In [12]:
multihead_attn = MultiHeadAttention(config)
attn_outputs = multihead_attn(inputs_embeds)
attn_outputs.size()

torch.Size([1, 5, 768])

In [13]:
from bertviz import head_view
from transformers import AutoModel

model = AutoModel.from_pretrained(model_ckpt, output_attentions=True)

sentence_a = "time files like an arrow"
sentence_b = "fruit files like a banana"

viz_inputs = tokenizer(sentence_a, sentence_b, return_tensors="pt")
print("viz_inputs", viz_inputs)
attention = model(**viz_inputs).attentions
print("attention", attention)
sentence_b_start = (viz_inputs.token_type_ids == 0).sum(dim=1)
print("sentence_b_start", sentence_b_start)
tokens = tokenizer.convert_ids_to_tokens(viz_inputs.input_ids[0])
print("tokens", tokens)

head_view(attention, tokens, sentence_b_start, heads=[8])

  state_dict = torch.load(resolved_archive_file, map_location="cpu")
Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.decoder.weight', 'cls.predictions.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


viz_inputs {'input_ids': tensor([[  101,  2051,  6764,  2066,  2019,  8612,   102,  5909,  6764,  2066,
          1037, 15212,   102]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])}
attention (tensor([[[[4.9604e-02, 4.7810e-02, 4.0896e-02,  ..., 1.0513e-01,
           3.3722e-02, 1.5397e-01],
          [2.2119e-01, 5.1522e-02, 2.2728e-01,  ..., 1.1713e-03,
           6.6409e-03, 4.9973e-03],
          [8.6369e-02, 1.8868e-01, 1.4056e-01,  ..., 3.9274e-03,
           6.0494e-03, 4.0190e-03],
          ...,
          [8.5742e-02, 2.6737e-03, 3.4177e-03,  ..., 8.4724e-02,
           2.1517e-01, 1.5445e-01],
          [6.1192e-02, 1.2808e-03, 3.9958e-03,  ..., 3.4235e-02,
           1.8965e-01, 2.1331e-01],
          [5.4679e-02, 1.2832e-03, 9.4459e-04,  ..., 1.5828e-01,
           1.3540e-01, 3.4778e-01]],

         [[8.5621e-01, 1.3166e-02, 6.1412e-03,  ..., 1.9891e-02,
           5.8655e-03, 9.8

<IPython.core.display.Javascript object>

In [14]:
class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, x):
        x = self.linear_1(x)
        x = self.gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)
        return x

In [15]:
feed_forward = FeedForward(config)
ff_outputs = feed_forward(attn_outputs)
ff_outputs.size()

torch.Size([1, 5, 768])

In [16]:
class TransformerEncoderLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x):
        # レイヤー正規化を適用し、入力をクエリ、キー、バリューにコピー
        hidden_state = self.layer_norm_1(x)
        # スキップ接続付きのアテンションを適用
        x = x + self.attention(hidden_state)
        # スキップ接続付きの順伝播層を適用
        x = x + self.feed_forward(self.layer_norm_2(x))
        return x

In [17]:
encoder_layer = TransformerEncoderLayer(config)
inputs_embeds.shape, encoder_layer(inputs_embeds).size()

(torch.Size([1, 5, 768]), torch.Size([1, 5, 768]))

#### 位置埋込み
＋ セルフアテンション層と順伝播層は`permutation equivariant`と呼ばれる.
+ 入力順序が変更になった場合、出力も全く同じ順序変更が発生する

In [22]:
# position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
# print(position_embeddings.weight)
# print(position_embeddings.weight.shape)

Parameter containing:
tensor([[-1.4816, -0.4049,  0.4073,  ..., -2.1581,  0.1354, -1.4331],
        [-0.4499, -0.2962,  0.5502,  ..., -0.1001, -0.3113,  0.2003],
        [ 0.2066, -0.9477, -0.4182,  ..., -0.2273, -1.1413, -0.4476],
        ...,
        [-0.5891, -1.2758,  1.1353,  ...,  0.6629, -0.3174,  1.6183],
        [-0.2760, -1.7467, -0.7262,  ..., -0.7107, -1.3466, -0.4113],
        [-1.4135, -0.6160,  1.5391,  ...,  0.9116, -0.7790,  0.5820]],
       requires_grad=True)
torch.Size([512, 768])


In [18]:
class Embeddings(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout()

    def forward(self, input_ids):
        # 入力系列に対する位置IDを作成
        seq_length = input_ids.size(1) # [B, S] -> S
        position_ids = torch.arange(seq_length, dtype=torch.long).unsqueeze(0) # [B, S]
        # トークン埋め込みと位置埋め込みを作成
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        # トークン埋め込みと位置埋め込みを組み合わせる
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

In [19]:
embedding_layer = Embeddings(config)
print("inputs.input_ids", inputs.input_ids)
print("inputs.input_ids.size()", inputs.input_ids.size())
print("embedding_layer.position_embeddings", embedding_layer.position_embeddings)

embedding_layer(inputs.input_ids).size()

inputs.input_ids tensor([[ 2051, 10029,  2066,  2019,  8612]])
inputs.input_ids.size() torch.Size([1, 5])
embedding_layer.position_embeddings Embedding(512, 768)


torch.Size([1, 5, 768])

+ 絶対位置表現(sin波とcos波の組み合わせ): 学習データが少ない場合に効果が大きい
+ 相対位置表現: 埋め込み計算をする際、周囲のトークンが最重要医なのでトークン間の相対的な位置をエンコードする. トークンのどこに注目するかによって想定的埋め込みが変わるので、最初に新しい相対位置埋め込み層を導入するだけでは設定できない。トークン間の相対的な位置を考慮する項を追加して、アテンション機構を改造する

In [23]:
class TransformerEncoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        self.layers = nn.ModuleList([TransformerEncoderLayer(config) for _ in range(config.num_hidden_layers)])

    def forward(self, x):
        x = self.embeddings(x)
        for layer in self.layers:
            x = layer(x)
        return x

In [24]:
encoder = TransformerEncoder(config)
encoder(inputs.input_ids).size()

torch.Size([1, 5, 768])

#### 分類ヘッドの追加(特定タスク用)

In [25]:
class TransformerForSequenceClassification(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.encoder = TransformerEncoder(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, x):
        x = self.encoder(x)[:, 0, :] # select hidden state of [CLS] token
        x = self.dropout(x)
        x = self.classifier(x)
        return x

In [26]:
config.num_labels = 3
encoder_classifier = TransformerForSequenceClassification(config)
encoder_classifier(inputs.input_ids).size()

torch.Size([1, 3])

### デコーダ

#### セルフアテンションにマスキングを取り入れる方法

In [35]:
# 下三角行列
tril_matrix = torch.tril(torch.ones(5, 5))
print(tril_matrix)

tmp_scores = torch.ones((5,5)) * 100
print(tmp_scores)

print(tmp_scores.masked_fill(tril_matrix == 0, -float("inf")))
# print(F.softmax(tmp_scores, dim=-1))

tensor([[1., 0., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 1., 0., 0.],
        [1., 1., 1., 1., 0.],
        [1., 1., 1., 1., 1.]])
tensor([[100., 100., 100., 100., 100.],
        [100., 100., 100., 100., 100.],
        [100., 100., 100., 100., 100.],
        [100., 100., 100., 100., 100.],
        [100., 100., 100., 100., 100.]])
tensor([[100., -inf, -inf, -inf, -inf],
        [100., 100., -inf, -inf, -inf],
        [100., 100., 100., -inf, -inf],
        [100., 100., 100., 100., -inf],
        [100., 100., 100., 100., 100.]])
tensor([[0.2000, 0.2000, 0.2000, 0.2000, 0.2000],
        [0.2000, 0.2000, 0.2000, 0.2000, 0.2000],
        [0.2000, 0.2000, 0.2000, 0.2000, 0.2000],
        [0.2000, 0.2000, 0.2000, 0.2000, 0.2000],
        [0.2000, 0.2000, 0.2000, 0.2000, 0.2000]])


In [32]:
seq_len = inputs.input_ids.size(-1)
print("seq_len", seq_len)

mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0)
print("mask", mask)
print("mask.size()", mask.size())

scores.masked_fill(mask == 0 , -float('inf'))

seq_len 5
mask tensor([[[1., 0., 0., 0., 0.],
         [1., 1., 0., 0., 0.],
         [1., 1., 1., 0., 0.],
         [1., 1., 1., 1., 0.],
         [1., 1., 1., 1., 1.]]])
mask.size() torch.Size([1, 5, 5])


tensor([[[26.7897,    -inf,    -inf,    -inf,    -inf],
         [-2.0608, 30.1176,    -inf,    -inf,    -inf],
         [-0.9348, -2.2801, 27.1196,    -inf,    -inf],
         [-1.1749,  2.3967, -0.5446, 27.4258,    -inf],
         [ 0.5179,  1.0695,  0.2459, -0.6697, 26.4728]]],
       grad_fn=<MaskedFillBackward0>)

#### エンコーダ・デコーダアテンション層の実装は宿題
+ マスク付きセルフアテンション. 成分が1の下三角行列を使う
+ クロスアテンション (K,V)が外部がらの入力

In [40]:
# マスク付きセルフアテンションの実装
class MaskSelfAttention(nn.Module):
    def __init__(self, embed_dim, head_dim, is_mask=True):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim, bias=False)
        self.k = nn.Linear(embed_dim, head_dim, bias=False)
        self.v = nn.Linear(embed_dim, head_dim, bias=False)

        if is_mask:
            seq_len = self.q.size(1) # [B, N, D]
            self.mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0)  # [B, N, N]
        else:
            self.mask = None

    def _scaled_dot_product_attention(self, query, key, value, mask):
        dim_k = query.size(-1)
        scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float("-inf"))
        weights = F.softmax(scores, dim=-1)
        # pprint("weights", weights)
        return weights.bmm(value)

    def forward(self, hidden_state):
        attn_outputs = self._scaled_dot_product_attention(self.q(hidden_state),
                                                          self.k(hidden_state),
                                                          self.v(hidden_state),
                                                          self.mask)
        return attn_outputs


class MaskSelfMultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList(
            [MaskSelfAttention(embed_dim, head_dim, is_mask=True) for _ in range(num_heads)]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, hidden_state):
        x = torch.cat([h(hidden_state) for h in self.heads], dim=-1)
        x = self.output_linear(x)
        return x


In [37]:
# クロスアテンションの実装
class MaskCrossAttention(nn.Module):
    def __init__(self, embed_dim, head_dim, is_mask=True):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim, bias=False)

        if is_mask:
            seq_len = self.q.size(1) # [B, N, D]
            self.mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0)  # [B, N, N]
        else:
            self.mask = None

    def _scaled_to_product_attention(self, query, key, value, mask):
        dim_k = query.size(-1)
        scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float("-inf"))
        weights = F.softmax(scores, dim=-1)
        return weights.bmm(value)

    def forward(self, hidden_state, key, value):
        attn_outputs = self._scaled_to_product_attention(self.q(hidden_state),
                                                         key,
                                                         value,
                                                         self.mask)
        return attn_outputs


class MaskCrossMultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        self.head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList(
            [MaskCrossAttention(embed_dim, self.head_dim, is_mask=True) for _ in range(num_heads)]
        )

        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, hidden_state, key, value):
        x = torch.cat([h(hidden_state,
                         key[..., i * self.head_dim : (i+1) * self.head_dim],
                         value[..., i * self.head_dim : (i+1) * self.head_dim]
                         ) for i, h in enumerate(self.heads)], dim=-1)
        x = self.output_linear(x)
        return x

In [38]:
# デコーダブロックの実装
class TransformerDecoderLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.self_attn = MaskSelfMultiHeadAttention(config)
        self.cross_attn = MaskCrossAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x, key, value):
        # レイヤー正規化を適用し、入力をクエリ、キー、バリューにコピー
        hidden_state = self.layer_norm_1(x)
        # スキップ接続付きのセルフマルチヘッドアテンションを適用
        x = x + self.self_attn(hidden_state)
        # スキップ接続付きのクロスマルチヘッドアテンションを適用
        x = x + self.cross_attn(x, key, value)
        # スキップ接続付きの順伝播層を適用
        x = x + self.feed_forward(self.layer_norm_2(x))
        return x

In [39]:
# デコーダの実装
class TransformerDecoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        self.layers = nn.ModuleList([TransformerDecoderLayer(config) for _ in range(config.num_hidden_layers)])

    def forward(self, x, key, value):
        x = self.embeddings(x)
        for layer in self.layers:
            x = layer(x, key, value)
        return x

In [None]:
# 動作確認