In [None]:
!pip install bertviz

In [None]:
# Visualize attention with bertviz's neuron view
from transformers import AutoTokenizer
from bertviz.transformers_neuron_view import BertModel
from bertviz.neuron_view import show

# The tokenizer and bertviz's BertModel are both loaded from the same checkpoint name
model_ckpt = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
model = BertModel.from_pretrained(model_ckpt)
text = "time flies like an arrow"
# Show the view for `text`, passing layer=0 and head=8 and the light display mode
show(model, "bert", tokenizer, text, display_mode="light", layer=0, head=8)

In [None]:
# Tokenize the input text into a PyTorch tensor of token ids
# (add_special_tokens=False is passed so only the words of `text` are encoded)
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False)
inputs.input_ids # 5 token ids, special tokens excluded

In [None]:
# Prepare a dense embedding table
from torch import nn
from transformers import AutoConfig

config = AutoConfig.from_pretrained(model_ckpt)
# A freshly constructed nn.Embedding sized from the checkpoint's config
# (vocab_size rows, hidden_size columns)
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)
token_emb # lookup table

In [None]:
# Turn the token ids into embedding vectors via the lookup table
inputs_embeds = token_emb(inputs.input_ids)
inputs_embeds.size() # [1, 5, 768]

In [None]:
# Compute the attention scores
import torch
from math import sqrt

# Query, key and value are all bound to the same tensor (the embeddings)
query = key = value = inputs_embeds
dim_k = key.size(-1)
# Batched query x key^T, divided by sqrt(dim_k)
scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k) # transpose swaps dims 1 and 2 (0-indexed)
scores.size() # [1, 5, 5]

In [None]:
# Apply softmax over the last dimension of the scores
import torch.nn.functional as F

weights = F.softmax(scores, dim=-1)
weights.sum(dim=-1) # all 5 entries are 1

In [None]:
# Multiply the attention weights by the values (batched matrix product)
attn_outputs = torch.bmm(weights, value)
attn_outputs.shape # [1, 5, 768]

In [None]:
# Scaled dot-product attention
def scaled_dot_product_attention(query, key, value):
  """Compute softmax(query @ key^T / sqrt(dim_k)) @ value.

  The last two dimensions are treated as (sequence, feature); any leading
  dimensions are batch dimensions, so 3-D inputs behave exactly as with
  torch.bmm while inputs with extra leading batch dims are accepted as well.

  Args:
    query: tensor of shape (..., seq_q, dim_k).
    key: tensor of shape (..., seq_k, dim_k).
    value: tensor of shape (..., seq_k, dim_v).

  Returns:
    Tensor of shape (..., seq_q, dim_v): for every query position, the
    attention-weighted sum of the value vectors.
  """
  dim_k = query.size(-1)
  # transpose(-2, -1) + matmul instead of transpose(1, 2) + bmm: same result
  # for 3-D input, but not restricted to exactly three dimensions.
  scores = torch.matmul(query, key.transpose(-2, -1)) / sqrt(dim_k)
  # Normalise over the key positions so each query's weights sum to 1.
  weights = F.softmax(scores, dim=-1)
  return torch.matmul(weights, value)

In [None]:
# Single attention head
class AttentionHead(nn.Module):
  """One self-attention head.

  Projects the incoming hidden state with three separate linear layers
  (query, key, value), each mapping embed_dim -> head_dim, and combines the
  projections with scaled_dot_product_attention.
  """

  def __init__(self, embed_dim, head_dim):
    """Create the q / k / v projections (embed_dim inputs, head_dim outputs)."""
    super().__init__()
    # Registered in q, k, v order, as the attribute names callers rely on.
    self.q = nn.Linear(embed_dim, head_dim)
    self.k = nn.Linear(embed_dim, head_dim)
    self.v = nn.Linear(embed_dim, head_dim)

  def forward(self, hidden_state):
    """Return the attention output computed from `hidden_state` alone."""
    queries = self.q(hidden_state)
    keys = self.k(hidden_state)
    values = self.v(hidden_state)
    return scaled_dot_product_attention(queries, keys, values)

In [None]:
# Multi-head attention layer
class MultiHeadAttention(nn.Module):
  """Several AttentionHead modules run side by side plus an output projection.

  Args (constructor):
    config: object exposing `hidden_size` (embedding width) and
      `num_attention_heads`. Each head gets
      hidden_size // num_attention_heads output features.

  Raises:
    ValueError: if `num_attention_heads` is not positive or does not divide
      `hidden_size` evenly. Without this check the concatenated head outputs
      would be narrower than `hidden_size` and the failure would only show up
      later as a shape error inside `output_linear`.
  """

  def __init__(self, config):
    super().__init__()
    embed_dim = config.hidden_size
    num_heads = config.num_attention_heads
    if num_heads <= 0 or embed_dim % num_heads != 0:
      raise ValueError(
          f"hidden_size ({embed_dim}) must be divisible by a positive "
          f"num_attention_heads ({num_heads})"
      )
    head_dim = embed_dim // num_heads
    self.heads = nn.ModuleList(
        [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
    )
    self.output_linear = nn.Linear(embed_dim, embed_dim)

  def forward(self, hidden_state):
    """Apply every head to `hidden_state`, concatenate, then project."""
    # Concatenate the per-head outputs along the feature dimension
    x = torch.cat([h(hidden_state) for h in self.heads], dim=-1)
    # Final linear layer over the concatenated features
    x = self.output_linear(x)
    return x

In [None]:
# Build the multi-head attention layer from the BERT config and apply it to the embeddings
multihead_attn = MultiHeadAttention(config)
attn_output = multihead_attn(inputs_embeds)
attn_output.size() # [1, 5, 768]

In [None]:
# Visualize attention with bertviz's head view
from bertviz import head_view
from transformers import AutoModel

# NOTE: this rebinds the name `model`. output_attentions=True is passed so the
# forward output exposes `.attentions` (read below).
model = AutoModel.from_pretrained(model_ckpt, output_attentions=True)

sentence_a = "time flies like an arrow"
sentence_b = "fruit flies like a banana"

# Tokenize both sentences together as one pair
viz_inputs = tokenizer(sentence_a, sentence_b, return_tensors='pt')
attention = model(**viz_inputs).attentions
# Number of positions whose token_type_id is 0; handed to head_view as sentence_b_start
sentence_b_start = (viz_inputs.token_type_ids == 0).sum(dim=1)
tokens = tokenizer.convert_ids_to_tokens(viz_inputs.input_ids[0])

head_view(attention, tokens, sentence_b_start, heads=[8])

In [None]:
# Feed-forward layer
class FeedForward(nn.Module):
  """Two linear layers with a GELU in between, followed by dropout.

  The first layer maps hidden_size -> intermediate_size, the second maps
  intermediate_size -> hidden_size, so the output has the same feature width
  as the input. The dropout probability is config.hidden_dropout_prob.
  """

  def __init__(self, config):
    super().__init__()
    hidden, inner = config.hidden_size, config.intermediate_size
    self.linear_1 = nn.Linear(hidden, inner)  # up to the intermediate width
    self.linear_2 = nn.Linear(inner, hidden)  # back down to the hidden width
    self.gelu = nn.GELU()
    self.dropout = nn.Dropout(config.hidden_dropout_prob)

  def forward(self, x):
    """Return dropout(linear_2(gelu(linear_1(x))))."""
    expanded = self.gelu(self.linear_1(x))
    return self.dropout(self.linear_2(expanded))

In [None]:
# Output of the feed-forward layer
feed_forward = FeedForward(config)
# Feed the multi-head attention result (`attn_output`), not `attn_outputs`,
# which is the raw single-head result from the earlier bmm demo and only
# happens to have the same shape.
ff_outputs = feed_forward(attn_output)
ff_outputs.size() # [1, 5, 768]

In [None]:
# Encoder layer using pre-layer normalization (Pre-LN)
class TransformerEncoderLayer(nn.Module):
  """Attention and feed-forward sub-layers, each wrapped in a skip connection.

  Layer normalization is applied to the input of each sub-layer (before the
  sub-layer runs), and the sub-layer's result is added back onto the
  un-normalized input.
  """

  def __init__(self, config):
    super().__init__()
    width = config.hidden_size
    self.layer_norm1 = nn.LayerNorm(width)  # normalizes the attention input
    self.layer_norm2 = nn.LayerNorm(width)  # normalizes the feed-forward input
    self.attention = MultiHeadAttention(config)
    self.feed_forward = FeedForward(config)

  def forward(self, x):
    """Return x after the attention and feed-forward residual updates."""
    # Attention sub-layer with skip connection
    attended = self.attention(self.layer_norm1(x))
    x = x + attended
    # Feed-forward sub-layer with skip connection
    transformed = self.feed_forward(self.layer_norm2(x))
    return x + transformed

In [None]:
# Run the embeddings through one encoder layer and compare input / output shapes
encoder_layer = TransformerEncoderLayer(config)
inputs_embeds.shape, encoder_layer(inputs_embeds).size() # both [1, 5, 768]

In [None]:
# Token + position embeddings
class Embeddings(nn.Module):
  """Sum of token and learned position embeddings, then LayerNorm and dropout.

  Args (constructor):
    config: object exposing `vocab_size`, `hidden_size`,
      `max_position_embeddings` and `hidden_dropout_prob`.
  """

  def __init__(self, config):
    super().__init__()
    # Token embeddings
    self.token_embeddings = nn.Embedding(config.vocab_size,
                                         config.hidden_size)
    # Position embeddings
    self.position_embeddings = nn.Embedding(config.max_position_embeddings,
                                            config.hidden_size)
    self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
    # Use the configured dropout probability, as the other layers in this
    # notebook do, instead of nn.Dropout()'s implicit default of 0.5.
    self.dropout = nn.Dropout(config.hidden_dropout_prob)

  def forward(self, input_ids):
    """Embed `input_ids`.

    Args:
      input_ids: integer tensor whose dimension 1 is the sequence length.

    Returns:
      Tensor with a trailing `hidden_size` dimension added to `input_ids`.
    """
    # Build position ids 0..seq_length-1 on the same device as input_ids so
    # the lookup also works when the inputs live on a GPU.
    seq_length = input_ids.size(1)
    position_ids = torch.arange(
        seq_length, dtype=torch.long, device=input_ids.device
    ).unsqueeze(0)  # shape [1, seq_length]
    # Look up both embeddings
    token_embeddings = self.token_embeddings(input_ids)
    position_embeddings = self.position_embeddings(position_ids)
    # Combine (position embeddings broadcast over the batch dimension)
    embeddings = token_embeddings + position_embeddings
    embeddings = self.layer_norm(embeddings)
    embeddings = self.dropout(embeddings)
    return embeddings

In [None]:
# Run the token ids through the embedding layer
embedding_layer = Embeddings(config)
embedding_layer(inputs.input_ids).size() # [1, 5, 768]

In [None]:
# Build the encoder
class TransformerEncoder(nn.Module):
  """Embeddings followed by a stack of TransformerEncoderLayer modules.

  The stack holds config.num_hidden_layers layers, applied in order.
  """

  def __init__(self, config):
    super().__init__()
    self.embeddings = Embeddings(config)
    depth = config.num_hidden_layers
    stack = [TransformerEncoderLayer(config) for _ in range(depth)]
    self.layers = nn.ModuleList(stack)  # one layer per level of depth

  def forward(self, x):
    """Embed `x`, then pass the result through every layer in turn."""
    hidden = self.embeddings(x)
    for block in self.layers:
      hidden = block(hidden)
    return hidden

In [None]:
# Run the token ids through the full encoder
encoder = TransformerEncoder(config)
encoder(inputs.input_ids).size() # [1, 5, 768]

In [None]:
# Classification head
class TransformerForSequenceClassification(nn.Module):
  """TransformerEncoder plus dropout and a linear classifier.

  The classifier maps hidden_size -> config.num_labels and is applied to the
  encoder output at sequence position 0 only.
  """

  def __init__(self, config):
    super().__init__()
    self.encoder = TransformerEncoder(config)
    self.dropout = nn.Dropout(config.hidden_dropout_prob)
    self.classifier = nn.Linear(config.hidden_size, config.num_labels)

  def forward(self, x):
    """Return one row of `num_labels` scores per input sequence."""
    # The first token's hidden state is used for prediction (normally that is
    # [CLS]; in this notebook the text is tokenized without special tokens,
    # so it is "time").
    first_token = self.encoder(x)[:, 0, :]
    return self.classifier(self.dropout(first_token))

In [None]:
# Prepare the model
# num_labels is set in place on the shared `config` object before the classifier is built
config.num_labels = 3 # 3-class classification
encoder_classifier = TransformerForSequenceClassification(config)
encoder_classifier(inputs.input_ids).size() # [1, 3]