# AttentionViz（の一部）をBERTで再現してみる
[AttentionViz: A Global View of Transformer Attention](https://catherinesyeh.github.io/attn-docs/)の「5.1.1 Vector Nomalization」を実装し、相関が高くなる定数を求めた上でQ,Kベクトル正規化。それをPCAで描画してみた。

## BERTからQ, K, Vの重みベクトルを取得に向けて1（隠れ層、アテンション取得）

In [1]:
from transformers import BertTokenizer, BertModel
from torch import nn
import torch
import numpy as np
import math

# モデルの容易
model_name = "bert-base-uncased"
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertModel.from_pretrained(model_name, output_attentions=True, output_hidden_states=True)

# テキストを入力としてモデルを実行
text = "The brown capybara is sleeping now."
input_ids = tokenizer.encode(text, return_tensors="pt")
tokens = tokenizer.convert_ids_to_tokens(input_ids[0])
print(f'{len(input_ids[0])=}') # 11 tokens
print(f'{tokens=}')

# モデルの各層の出力を取得
outputs = model(input_ids)
hidden_states = outputs["hidden_states"]
print(f'{len(hidden_states)=}') # input layer + 12 layers = 13
print(f'{hidden_states[0].shape=}') # torch.Size([1, 11, 768]), [sequence_size, token_num, dims]

# 各層のアテンションを取得（動作確認用）
attention = outputs["attentions"]
print(f'{len(attention)=}') # 12 layers
print(f'{attention[0].shape=}') # torch.Size([1, 12, 11, 11]), [sequence_size, heads, token_num, token_num]


  from .autonotebook import tqdm as notebook_tqdm
Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.predictions.decoder.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


len(input_ids[0])=11
tokens=['[CLS]', 'the', 'brown', 'cap', '##y', '##bara', 'is', 'sleeping', 'now', '.', '[SEP]']
len(hidden_states)=13
hidden_states[0].shape=torch.Size([1, 11, 768])
len(attention)=12
attention[0].shape=torch.Size([1, 12, 11, 11])


## BERTからQ, K, Vの重みベクトルを取得に向けて2（レイヤー0でQ, K, V取得、アテンション算出）
Q, K, Vの取得は[Source code for transformers.modeling_bert](https://huggingface.co/transformers/v3.2.0/_modules/transformers/modeling_bert.html)の BertSelfAttention.forward を参考にしました。

In [2]:
layer_index = 0 # 隠れ層1層目だけを対象とする。

# Q, K, Vの重みベクトル取得
mixed_query_layer = model.encoder.layer[layer_index].attention.self.query(outputs["hidden_states"][layer_index])
mixed_key_layer = model.encoder.layer[layer_index].attention.self.key(outputs["hidden_states"][layer_index])
mixed_value_layer = model.encoder.layer[layer_index].attention.self.value(outputs["hidden_states"][layer_index])

query_layer = model.encoder.layer[layer_index].attention.self.transpose_for_scores(mixed_query_layer)
key_layer = model.encoder.layer[layer_index].attention.self.transpose_for_scores(mixed_key_layer)
value_layer = model.encoder.layer[layer_index].attention.self.transpose_for_scores(mixed_value_layer)
print(f'{query_layer.shape=}') # torch.Size([1, 12, 11, 64]), [sequence_size, heads, token_num, dims]
print(f'{key_layer.shape=}') # same
print(f'{value_layer.shape=}') # same

# アテンション求めてみる
attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
attention_head_size = model.encoder.layer[layer_index].attention.self.attention_head_size
attention_scores = attention_scores / math.sqrt(attention_head_size)
attention_probs = nn.Softmax(dim=-1)(attention_scores)

# 実際のアテンションと計算結果がおおよそ等しいこと（=Q,K,Vが正しく取れてること）を確認
print(f'{torch.allclose(attention_probs, attention[layer_index])=}') # True

query_layer.shape=torch.Size([1, 12, 11, 64])
key_layer.shape=torch.Size([1, 12, 11, 64])
value_layer.shape=torch.Size([1, 12, 11, 64])
torch.allclose(attention_probs, attention[layer_index])=True


## ベクトル正規化
定数a,cを求めるために総当り法チックにやってるのだけど、他に良い方法あるよね。

In [3]:
def normalize_vectors(query_layer, key_layer, a, c):
    """論文5.1.1 Vector Nomalization（定数a,cを元に正規化）
    query_layer, key_layerは、この前に取得した重みベクトル。
    a, cは定数（スカラ）。
    """
    # before
    num_sequence, num_heads, num_tokens, dims = query_layer.shape
    queries_reshaped = torch.reshape(query_layer, (-1,dims))
    keys_reshaped = torch.reshape(key_layer, (-1,dims))

    # after
    shifted_keys = keys_reshaped + a
    scaled_queries = queries_reshaped * c
    scaled_keys = shifted_keys * (1.0 / c)

    # reshape to orig
    normalized_query = torch.reshape(scaled_queries, (num_sequence, num_heads, num_tokens, dims))
    normalized_key = torch.reshape(scaled_keys, (num_sequence, num_heads, num_tokens, dims))

    return normalized_query, normalized_key

def calc_dot_product(query_layer, key_layer):
    """tensor同士の内積。
    dims次元ベクトルに変換し、内積を求めている。
    """
    # dims次元ベクトルに変換
    dims = query_layer.shape[-1]
    reshaped_queries = torch.reshape(query_layer, (-1,dims)).detach().numpy()
    reshaped_keys = torch.reshape(key_layer, (-1,dims)).detach().numpy()

    # 内積
    dot_product = np.sum(reshaped_queries * reshaped_keys, axis=1)
    return dot_product

def calc_cosine_distance(query_layer, key_layer):
    """tensor同士のコサイン距離。
    dims次元ベクトルに変換し、コサイン距離を求めている。
    """
    # dims次元ベクトルに変換
    dims = query_layer.shape[-1]
    reshaped_queries = torch.reshape(query_layer, (-1,dims)).detach().numpy()
    reshaped_keys = torch.reshape(key_layer, (-1,dims)).detach().numpy()

    # コサイン距離
    norm_queries = np.linalg.norm(reshaped_queries, axis=1)
    norm_keys = np.linalg.norm(reshaped_keys, axis=1)
    dot_product = calc_dot_product(query_layer, key_layer)
    cosine_distance = 1 - dot_product / (norm_queries * norm_keys)
    return cosine_distance

def calc_weighted_corr(a, c, query_layer, key_layer):
    """重み付け相関。
    これで正しいのかわからないけど、論文では単に「weighted correlation metric」と書いている。
    このことを「正規化したベクトルを使った内積とコサイン距離の相関」と解釈して実装。
    """
    normalized_query, normalized_key = normalize_vectors(query_layer, key_layer, a, c)
    normalized_dot_product = calc_dot_product(normalized_query, normalized_key)
    normalized_cosine_distance = calc_cosine_distance(normalized_query, normalized_key)

    weighted_corr = np.corrcoef(normalized_dot_product, normalized_cosine_distance)
    return weighted_corr[0, 1]


# 定数a,cを適当な範囲の組み合わせで総当り。
# aの範囲はもっと大きくしても動作し、より大きな相関が得られるが、描画結果は観察しづらくなる。
# サンプル数の問題？
optimal_a = optimal_c = None
optimal_corr = 0

for a in np.linspace(-0.5, 0.5, 10):
#for a in [0]:
    for c in np.linspace(-2,2, 10):
        if c == 0.0:
            continue
        corr = calc_weighted_corr(a, c, query_layer, key_layer)
        if np.abs(corr) > optimal_corr:
            optimal_a = a
            optimal_c = c
            optimal_corr = np.abs(corr)
            print(f'{optimal_a=}, {optimal_c=}, {optimal_corr=}')

normalized_query, normalized_key = normalize_vectors(query_layer, key_layer, optimal_a, optimal_c)

optimal_a=-0.5, optimal_c=-2.0, optimal_corr=0.8910502854269778
optimal_a=-0.5, optimal_c=-1.5555555555555556, optimal_corr=0.8910502954454634


## 正規化したベクトルにおける相関（コサイン距離 vs 内積）
一つ手前のa,c検索時の出力をみると、初期値の時点でかなり相関の絶対値が大きく、その後ほとんど更新されていない。サンプル数が小さいこと、1レイヤーしか参照していないことが影響していると思うけれども、それ以上に今回の正規化アプローチが効果的であることを意味してるようにも見える。ただしここでいう効果的はあくまでも「コサイン距離と内積の相関を高く維持したまま調整しやすい」ぐらいの意味。ここでは実際に相関がどうだったのかを描画。



In [4]:
import plotly.graph_objects as go
from scipy.optimize import curve_fit

def plot_scatter_with_regression(scaled_cosine_distance, scaled_dot_product, optimal_a, optimal_c, optimal_corr):

    # 回帰直線のフィッティング
    def linear_fit(x, a, b):
        return a * x + b
    popt, _ = curve_fit(linear_fit, scaled_cosine_distance, scaled_dot_product)

    # 散布図を作成
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=scaled_cosine_distance, y=scaled_dot_product,
                             mode='markers', marker=dict(size=8), name='Data Points'))

    # 回帰直線を追加
    x_fit = np.linspace(min(scaled_cosine_distance), max(scaled_cosine_distance), 100)
    y_fit = linear_fit(x_fit, *popt)
    fig.add_trace(go.Scatter(x=x_fit, y=y_fit, mode='lines', line=dict(color='red', width=3), name='Regression Line'))

    # レイアウト設定
    fig.update_layout(title=f"Scatter Plot with Regression Line (layer={layer_index}, a={optimal_a:.3f}, c={optimal_c:.3f}, abs(cor)={optimal_corr:.3f})",
                      xaxis_title="Scaled Cosine Distance",
                      yaxis_title="Scaled Dot Product",
                      showlegend=True)

    # グラフを表示
    #fig.show()
    fig.write_image(f"out/cor_layer{layer_index}.png")

normalized_dot_product = calc_dot_product(normalized_query, normalized_key)
normalized_cosine_distance = calc_cosine_distance(normalized_query, normalized_key)
plot_scatter_with_regression(normalized_cosine_distance, normalized_dot_product, optimal_a, optimal_c, optimal_corr)

In [5]:
from IPython.display import HTML
HTML(f'<html><body><img src="./out/cor_layer{layer_index}.png"</body></html>')

## Q,Kペアのアテンションを加味した位置関係（ヘッド毎に描画）
やりたいことは「アテンションが高いペアをより近くに描画」すること。今回の正規化でどう変わるのかを比較するため、正規化前後でヘッド毎に別グラフを用意してみることに。

- クエリは緑、キーはピンク。（論文通り）
- クエリ＆キーのペアで最大アテンションとなる組み合わせを青色直線で結ぶ。直線の太さをアテンションに応じて調整。
- このうち観察したいトークン（'brown' => 'cap', '##y', '##bara'）が最大ペアの場合には赤色とする。

In [6]:
import plotly.graph_objects as go
from sklearn.decomposition import PCA

def plot2d_queries_and_keys_with_PCA(query_layer, key_layer, attention, comment):
    """クエリ＆キーのペアをPCAで描画。
    図中の mean distance は、最大ペアのPCA空間における平均ユークリッド距離。
    これが近くなることを期待しましたが、今回の結果ではそうならず。

    Note:
      (1) query_layer, key_layerには全シーケンス x 全隠れ層分の重みがそれぞれ入っているが、
      ここでは決め打ちでシーケンス数1個目だけ参照、隠れ層は1番目だけ参照としている。

    Args:
      attention: アテンション行列（このファイル冒頭で取得）。
      comment: ファイル名やキャプションに付けるための文字列。
    """
    num_heads, num_tokens, embedding_dim = query_layer.shape[1:]
    pca = PCA(n_components=2)

    for head_index in range(num_heads):
        # query_layerとkey_layerのベクトルを抽出して2次元に圧縮
        query_vectors = query_layer[0, head_index].reshape(num_tokens, embedding_dim).detach().numpy() # 0 = sequence id
        key_vectors = key_layer[0, head_index].reshape(num_tokens, embedding_dim).detach().numpy() # 0 = sequence id
        all_vectors = np.concatenate([query_vectors, key_vectors])
        pca.fit_transform(all_vectors)

        # 描画の都合でクエリとキーに分ける
        query_pca_result = pca.transform(query_vectors)
        key_pca_result = pca.transform(key_vectors)

        # 最もアテンションが大きいペアを探索
        attention_head = attention[layer_index][0, head_index] # 0 = sequence id
        max_attention_indices = torch.argmax(attention_head, dim=-1)
        
        # グラフを描画
        fig = go.Figure()

        # query_layerの点を描画
        query_trace = go.Scatter(x=query_pca_result[:, 0], y=query_pca_result[:, 1],
                                mode='markers', name='query_layer',
                                marker=dict(color='green', size=10), text=tokens, textposition='bottom center')

        # key_layerの点を描画
        key_trace = go.Scatter(x=key_pca_result[:, 0], y=key_pca_result[:, 1],
                            mode='markers', name='key_layer',
                            marker=dict(color='pink', size=10), text=tokens, textposition='top center')

        # 各点に対応する文字列をアノテーションとして追加
        for i in range(num_tokens):
            query_text_annotation = dict(
                x=query_pca_result[i, 0], y=query_pca_result[i, 1],
                text=tokens[i], showarrow=False,
                font=dict(size=12, color='green')
            )
            key_text_annotation = dict(
                x=key_pca_result[i, 0], y=key_pca_result[i, 1],
                text=tokens[i], showarrow=False,
                font=dict(size=12, color='pink')
            )
            fig.add_annotation(**query_text_annotation)
            fig.add_annotation(**key_text_annotation)

        # 最もアテンションが大きいペアを直線で結ぶ
        for i, max_idx in enumerate(max_attention_indices):
            attettion_value = attention_head[i][max_idx]
            query_point = query_pca_result[i]
            key_point = key_pca_result[max_idx]
            if i == 2 and (3 <= max_idx <=5):
                color = "red"
            else:
                color = "blue"
            fig.add_trace(go.Scatter(x=[query_point[0], key_point[0]], y=[query_point[1], key_point[1]],
                                    mode='lines', line=dict(color=color, width=int(10*attettion_value)), showlegend=False))

        # 2次元空間におけるユークリッド距離を計算
        euclidean_distances = np.linalg.norm(query_pca_result - key_pca_result[max_attention_indices], axis=1)
        mean_distance = np.mean(euclidean_distances)
        
        fig.update_layout(title=f"{comment}: Head {head_index + 1} - Mean Distance: {mean_distance:.2f}",
                        xaxis_title="PCA Component 1",
                        yaxis_title="PCA Component 2",
                        showlegend=True)

        #fig.show()
        fig.write_image(f'./out/qk_{comment}_layer{layer_index}_head{head_index}.png')

plot2d_queries_and_keys_with_PCA(query_layer, key_layer, attention, 'default')
plot2d_queries_and_keys_with_PCA(normalized_query, normalized_key, attention, 'normalized')

In [7]:
from IPython.display import HTML
html_text = f"""
<html>
<body>
<table border="1">
<caption>Naive AttentionViz (bert-base-uncased, layer {layer_index})</caption>
<th>default</th><th>noramlized</th>
"""

num_heads, num_tokens, embedding_dim = query_layer.shape[1:]
for head_idx in range(num_heads):
    html_text += f'<tr><td><img src="./out/qk_default_layer{layer_index}_head{head_idx}.png"></td><td><img src="./out/qk_normalized_layer{layer_index}_head{head_idx}.png"></td></tr>\n'

html_text += "</table></body></html>"

with open(f"./result_layer{layer_index}.html", "w") as f:
    f.write(html_text)

HTML(html_text)

0,1
,
,
,
,
,
,
,
,
,
,
