ChatGPTが大バズリしている昨今です。僕はプロンプトを考えるのが面倒なので（ええ...)あまり使わないのですが、友人が論文を書くのに使っていたり、僕の母親が話し相手に使っていたりするようです。親不孝な息子でごめんなさいという感じもしますが、僕はいまだにTransformerが何なのかすらよくわかっていないで、これを機に何をやっているのかくらいは理解してみます。

# 参考にしたもの
- [Formal Algorithms for Transformers](https://arxiv.org/abs/2207.09238)
  - 疑似コードをまとめた論文です。これが一番わかりやすいと思うので、とりあえずこれを見ればいいと思います。他にも[The Illustrated GPT-2](https://jalammar.github.io/illustrated-gpt2/)などを見たのですが、よくわかりませんでした。
- [Shumi-Note Transformer](https://github.com/syuntoku14/Shumi-Note/blob/main/notebooks/NN_transformer.ipynb)
  - これを見て真似しようと思ったのがこの記事のきっかけです。

# トークン列のエンコーディング
Transformerでは、入力されたトークン列に対し、multihead attentionと呼ばれるレイヤーを繰り返し適用します。そこで、まずどのようにトークン列をベクトルに射影するのかを説明します。

## 最初に列があった
トークン列というのは文字通りトークンからなる列のことです。トークンは有限集合の要素です。実用上はbyte pair encodingにより得られたsubwordなどがこれに該当しますが、とりあえず気にしなくていいです。トークンの集合を$V$とし、$[Nv] := {1, ..., Nv}$と番号付けしておきます。トークン列を$x = x[1: l]$と書きます。また、トークン列の最大の長さを$L$とします。トークンとして連続値や無限集合は扱えないと思いますが、素人なので何か抜け道があるかどうかは知りません。

## トークンからベクトルに
適当な$d_e \times Nv$次元の行列$W_e$を使って、$v$番目のトークンから埋め込み（Token embedding）を $e = W_e[:, v]$により得ます。これは$d_e$次元のベクトルになります。なお、numpy風に$i$番目の行ベクトルを$W[i, :]$、$j$番目の列ベクトルを$W[:, j]$と書いています。この行列$W_e$は勾配降下により学習されるようです。

## ついでに位置もベクトルに
適当な$d_p \times L$次元の行列$W_p$を使って、トークン列中の$l$番目にトークンがあるという情報から、位置埋め込み（Positional embedding）を $p = W_p[:, l]$により得ます。これも$d_e$次元のベクトルになります。正直なんの意味があるのかよくわからないのですが、これを先程のトークン埋め込みに足してトークン列$x$の$t$番目のトークン$x[t]$に対する埋め込みを$e = W_e[:, x[t]] + W_p[:, t]$によって得ます。これ足して大丈夫なのかな？って思うんですが。
位置埋め込みは、学習されることもあるようですが、Transformerが最初に提案された[Attention Is All You Need](https://arxiv.org/abs/1706.03762)の論文では、以下のように構成されています。
$$
\begin{align*}
W_p[2i - 1, t] &= \sin (\frac{t}{L^{2i / d_e}}) \\
W_p[2i, t] &= \cos (\frac{t}{L^{2i / d_e}}) \\
&~~~~~(0 < 2i \leq d_e)
\end{align*}
$$
これを$L=50, d_e = 5$として可視化してみましょう。

In [None]:
import numpy as np
from matplotlib import pyplot as plt

L = 50
d_e = 5
x = np.arange(L)
for i in range(1, 1 + d_e):
    if i % 2 == 0:
        w_p = np.sin(x / L ** (i / d_e))
    else:
        w_p = np.cos(x / L ** ((i - 1) / d_e)) 
    _ = plt.plot(x, w_p, label=f"i={i}")
plt.legend()

というわけで、この埋めこみは各成分ごとに異なる周波数での単語を埋め込むようです。これにより、短いコンテキストの中での位置も同時に考慮できるのかな。

# Attention
Transformerの主要な構成要素になるのがAttentionです。Attentionでは、入力されたトークン列中のすべてのトークンの組み合わせについて、そいつらの組み合わせがどれくらい重要なのかというモデル化を行います。具体的に、単一クエリに対するAttentionでは、現在のトークンから得た埋め込み$e_t$と$x$中のすべてのトークンの埋め込み$e_0, e_1, ..., e_{Nv} \in E$に対し、以下のような操作を行います。
$$
\begin{align*}
q_t &\leftarrow W_q e_t + b_q \\
k_{t'} &\leftarrow W_k e_{t'} + b_k,~\forall e_{t'} \in E \\
v_{t'} &\leftarrow W_v e_{t'} + b_v,~\forall e_{t'} \in E \\
\alpha_{t'} &\leftarrow \frac{\exp(q_t^\top k_{t'} / \sqrt{d_{\textrm{attn}}})}{\sum_u \exp(q_t^\top k_{t'} / \sqrt{d_{\textrm{attn}}})},~\forall e_{t'} \in E \\
v_\textrm{attr} &\leftarrow \sum_{t = 1}^T \alpha_{t'} v_{t'}
\end{align*}
$$
埋め込みの次元を$d_\textrm{in}$、出力の次元と$d_\textrm{out}$とすると、$W_q, Q_k$は$d_\textrm{attn} \times e$の行列、$W_q, Q_k$は$d_\textrm{out} \times d_\textrm{in}$の行列、$b_*$はベクトル（バイアス項）です。ここで、$q^\top k_{t'}$の値でソフトマックスをとって$v$にマスクをかけるので、これは現在のトークンと$t'$番目のトークンが「どれくらい対応しているか」を表していてほしいです。$v_{t'}$が何を表しているかはタスクによって異なると思いますが、$t'$番目のトークンの埋め込みに線形に関係する値が入っているはずです。このトークン列に後ろ向きの因果関係がない場合（あるトークン$x[t]$が、任意の未来のトークン$x[t']~\textrm{where}~t < t'$に依存しない場合）は、$\alpha_{t'}$にマスクをかける($\alpha_{t'}[i] = 0 ~\textrm{if}~t < i$)こともあります。なので、未来を予測する際にはこのマスクをかけるのが一般的なようです。

実際に、時系列から何か（次の単語、ラベルなど）を予測する際には、この単一クエリに対するAttentionを長さ$T$の系列内のすべてのトークンに対して計算し、$d_\textrm{out} \times T$の行列$\tilde{V}$を得ます。

とりあえずこれを学習させてみましょう。今回は[jax](https://jax.readthedocs.io/en/latest/)と[equinox](https://docs.kidger.site/equinox/)を使ってモデルを書いてみます。

In [None]:
import equinox as eqx
import jax
import jax.numpy as jnp


class Attention(eqx.Module):
    w_q: jax.Array
    b_q: jax.Array
    w_k: jax.Array
    b_k: jax.Array
    w_v: jax.Array
    b_v: jax.Array
    sqrt_d_attn: float

    def __init__(self, d_in: int, d_attn: int, d_out: int, key: jax.Array) -> None:
        wq_key, bq_key, wk_key, bk_key, wv_key, bv_key = jax.random.split(key, 6)
        self.w_q = jax.random.normal(wq_key, (d_attn, d_in))
        self.b_q = jax.random.normal(bq_key, (d_attn, 1))
        self.w_k = jax.random.normal(wk_key, (d_attn, d_in))
        self.b_k = jax.random.normal(bk_key, (d_attn, 1))
        self.w_v = jax.random.normal(wv_key, (d_out, d_in))
        self.b_v = jax.random.normal(bv_key, (d_out, 1))
        self.sqrt_d_attn = float(np.sqrt(d_attn))

    def __call__(self, e: jax.Array) -> jax.Array:
        """Take a matrix e with shape [d_in x seq_len], compute attention for all tokens in e.
        Outputs a matrix with shape [d_out x seq_len]
        """
        q = self.w_q @ e + self.b_q
        k = self.w_k @ e + self.b_k
        v = self.w_v @ e + self.b_v
        alpha = jax.nn.softmax(q.T @ k / self.sqrt_d_attn, axis=-1)
        return v @ alpha.T


def causal_mask(x: jax.Array) -> jax.Array:
    ltri = jnp.tri(x.shape[0], dtype=bool, k=-1)
    return jax.lax.select(ltri, jnp.ones_like(x) * -jnp.inf, x)


class MaskedAttention(Attention):
    def __call__(self, e: jax.Array) -> jax.Array:
        q = self.w_q @ e + self.b_q
        k = self.w_k @ e + self.b_k
        v = self.w_v @ e + self.b_v
        score = causal_mask(q.T @ k) / self.sqrt_d_attn
        alpha = jax.nn.softmax(score, axis=-1)
        return v @ alpha.T

これを学習させてみましょう。トークンとして、天気🌧️・☁️・☀️を考えます。この3つの記号に対し適当な埋め込みを与えて、次の日の天気を学習させてみます。よくわからないので、ダブらないようにトークン埋め込みを$[-1, 0, 1]$、位置埋め込みを$1 / t$としてみましょう。最大文字列長は適当に10にします。

In [None]:
TOKEN_EMBEDDING = {
    "🌧️": -1.0,
    "☁️": 0.0,
    "☀️": 1.0,
}

def get_embedding(seq: str, max_seq_len: int | None = None) -> np.ndarray:
    if max_seq_len is None:
        max_seq_len = len(seq)
    length = len(seq) // 2
    e = np.zeros(length)
    for i in range(length):
        x = seq[i * 2: i * 2 + 2]
        e[i] = TOKEN_EMBEDDING[x] + (i + 1) / max_seq_len
    return e.reshape(1, length)

## Markovモデルの学習

まず簡単なモデルで天気を生成してみます。**次の日の天気は、前の日の天気にもとづいて確率的に決まる**ことにしましょう。🌧️・☁️・☀️がマルチバイト文字であることに注意して、以下のように実装します。

In [None]:
import dataclasses

_GEN = np.random.Generator(np.random.PCG64(11111))
_MARKOV = {
    "": [0.3, 0.4, 0.3],
    "🌧️": [0.6, 0.3, 0.1],
    "☁️": [0.3, 0.4, 0.3],
    "☀️": [0.2, 0.3, 0.5],
}

WEATHERS = ["🌧️", "☁️", "☀️"]


def markov(prev: str) -> str:
    prob = _MARKOV[prev[-2:]]
    return prev + _GEN.choice(WEATHERS, p=prob)


def generate(f, n: int):
    value = ""
    for _ in range(n):
        value = f(value)
    return value


@dataclasses.dataclass
class Dataset:
    weathers: list[str]
    embeddings: jax.Array
    next_weather_indices: jax.Array
    
    def __len__(self) -> int:
        return len(self.weathers)


def make_dataset(f, seq_len, size) -> Dataset:
    w_list, e_list, nw_list = [], [], []
    for _ in range(size):
        weathers = generate(f, seq_len + 1)
        e = jnp.array(get_embedding(weathers[:-2]))
        w_list.append(weathers)
        e_list.append(e)
        nw_list.append(WEATHERS.index(weathers[-2:]))
    return Dataset(w_list, jnp.stack(e_list), jnp.array(nw_list))


generate(markov, 20)

こんな感じです。いま、次の日の天気だけ予測したいので、モデルの出力は集合{🌧️・☁️・☀️}上での確率分布が適切でしょう。Attentionは長さ$T$の埋め込み列に対して長さ$d_\textrm{out} \times T$の行列をかえします。なので、$d_\textrm{out} = 3$とし、Attentionの出力$\tilde{V}$に対してソフトマックス関数を適用し、$P_t = \textrm{softmax}(\tilde{V}[:, t])$とします。このとき、$P_t$の各要素が次の日🌧️・☁️・☀️になる確率を表すとして、モデル化します。これを、対数尤度の和$\sum_t \log P_t(\textrm{next weather})$を最大化するように学習しましょう。学習のコードを定義します。

In [None]:
import optax


def neg_logp(model: eqx.Module, seq: jax.Array, next_w: jax.Array) -> jax.Array:
    batch_size = seq.shape[0]
    tilde_v = jax.vmap(model)(seq)  # B x OUT x SEQ_LEN
    logp = jax.nn.log_softmax(tilde_v, axis=1)  # B x OUT x SEQ_LEN
    logp_masked = logp * jax.nn.one_hot(next_w, num_classes=3).reshape(-1, 3, 1)
    return -jnp.mean(jnp.sum(logp_masked.reshape(batch_size, -1), axis=-1))


compute_loss = eqx.filter_value_and_grad(neg_logp)
evaluate_model = jax.jit(neg_logp)


def train(
    n_epochs: int,
    minibatch_size: int,
    model: eqx.Module,
    ds: Dataset,
    test_ds: Dataset,
    key: jax.Array,
    learning_rate: float = 1e-2,
) -> tuple[eqx.Module, jax.Array, list[float], list[float]]:
    n_data = len(ds)
    indices = jnp.arange(n_data)
    optim = optax.adam(learning_rate)

    @eqx.filter_jit
    def train_1step(
        model: eqx.Module,
        seq: jax.Array,
        next_w: jax.Array,
        opt_state: optax.OptState,
    ) -> tuple[jax.Array, eqx.Module, optax.OptState]:
        loss, grads = compute_loss(model, seq, next_w)
        updates, opt_state = optim.update(grads, opt_state)
        model = eqx.apply_updates(model, updates)
        return loss, model, opt_state

    opt_state = optim.init(model)
    n_optim_epochs = n_data // minibatch_size
    loss_list, eval_list = [], []
    for epoch in range(n_epochs):
        key, shuffle_key = jax.random.split(key)
        shuffled_indices = jax.random.shuffle(shuffle_key, indices)
        for _ in range(n_optim_epochs):
            e = ds.embeddings[shuffled_indices]
            next_w = ds.next_weather_indices[shuffled_indices]
            loss, model, opt_state = train_1step(model, e, next_w, opt_state)
            loss_list.append(loss.item())
            test_loss = evaluate_model(
                model, test_ds.embeddings, test_ds.next_weather_indices
            )
            eval_list.append(test_loss.item())
    return model, key, loss_list, eval_list

これを実際に走らせてみます。適当に、Attentionの次元を8、天気列の長さを10にしましょう。

In [None]:
D_ATTN = 8
SEQ_LEN = 10
key = jax.random.PRNGKey(123)
model = MaskedAttention(1, D_ATTN, 3, key)
ds = make_dataset(markov, SEQ_LEN, 1000)
test_ds = make_dataset(markov, SEQ_LEN, 100)
model, key, loss_list, eval_list = train(50, 100, model, ds, test_ds, key, 1e-3)
plt.plot(loss_list, label="Training Loss")
plt.plot(eval_list, label="Test Loss")
plt.xlabel("Training Epochs")
plt.ylabel("Negative Log Likelihood")
plt.legend()