In [None]:
# PyTorchが使うCPUの数を制限します。(VMを使う場合)
%env OMP_NUM_THREADS=1
%env MKL_NUM_THREADS=1

from torch import set_num_threads, set_num_interop_threads
num_threads = 1
set_num_threads(num_threads)
set_num_interop_threads(num_threads)

# Transformerを用いた文字列予測

Language Modelはこれまでの単語列から次の単語を予測します。例えば、
```
P("pen"|"This is a") = 0.9
P("pineapple"|"This is a") = 0.05
```
のようなイメージです。"This is a"ときたら次の単語は"pen"である確率が90%ということを意味しています。(90%はテキトーな値です。実際はもっと低いと思います。)

このノートブックではTransformerを使って、このような言語モデル(Language Model)を実装してみましょう。
通常はトークン("単語"のような単位)を予測しますが、ここでは簡単のため、"文字"を予測させるようにしましょう。
例
```
P("e"|"This is a p") = 0.8
P("i"|"This is a p") = 0.03
```

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

まずは文字を数字に直します。ここではアスキーコードを使うことにしましょう。
| 文字 | ASCIIコード |
| --- | --- |
| A | 65 |
| B | 66 |
| ... | ... |
| Z | 90 |
| a | 91 |
| ... | ... |
| z | 122 |

このトークン化をすると、"Hello World!"は

`['H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', ',', 'd', '!']`

->

`[72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33]`

と変換されます。('H' = 77, 'e' = 101, ...)

この操作は以下のようにして関数化出来ます。

In [None]:
vocab_size = 128  # アスキー文字の数
def encode(s): return [ord(c) for c in s]
def decode(l): return ''.join([chr(i) for i in l])

In [None]:
encode("Hello World!")  # ['H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', ',', 'd', '!']

In [None]:
decode([72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33])

## データセット

データセットは"Tiny Shakespeare"を使ってみます。
これはシェイクスピアの台詞をデータセットとしてまとめたものです。

[tinyshakespeare]("https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt")から取得できます。
今回の実習環境では`/data/staff/deeplearning/tinyshakespeare.txt`にもあります。

In [None]:
with open("/data/staff/deeplearning/tinyshakespeare.txt", "r") as f:
    text = f.read()

# 空行や役名、改行を除外します。
text = " ".join([
    line
    for line in text.split("\n")
    if not (line == '' or line.endswith(":"))
])

print(text[:500])  # 最初の500文字を表示

data = torch.tensor(encode(text), dtype=torch.long)  # Torchのテンソルに変換

上記の処理で、役名や改行などを除去したので、文同士のつながりが分かりづらくなってしまっていますが、単語や文法を学ぶには問題ないでしょう。

このデータセットを使って、次文字予測をします。(例: "Befor" -> "e")

データを効率的に読み出すためにデータセットクラスを定義します。
ここでは`context_size`ごとにテキストを切り取っています。　
元のテキストは"Before we proceed any further ..."でしたが、ここから
| 入力 (x) | 出力ラベル (t) |
| --- | --- |
| Before we  | efore we p |
| efore we p | fore we pr |
| fore we pr | ore we pro |
| ore we pro | re we proc |
| re we proc | e we procc |

のような入力・出力の組み合わせを作成しています。
このトレーニングデータを使って
"Before we "が入力されたときに"p"を出力するようなモデルを学習させます。

In [None]:
from torch.utils.data import Dataset

class CharDataset(Dataset):
    def __init__(self, data, context_size):
        self.data = data
        self.context_size = context_size

    def __len__(self):
        return len(self.data) - self.context_size

    def __getitem__(self, idx):
        x = self.data[idx : idx + self.context_size]
        y = self.data[idx + 1 : idx + self.context_size + 1]
        return x, y
    
context_size = 10 # 入力長（コンテキスト長）
dataset = CharDataset(data, context_size)

# データローダーの作成 (ミニバッチで学習を行います。)
from torch.utils.data import DataLoader
batch_size = 128 # バッチサイズ
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [None]:
# データセットのサンプルを表示
for i in range(10):
    x, y = dataset[i]
    print(f"Input: {decode(x.tolist())}, Target: {decode(y.tolist())}")

TransformerモデルはPyTorchに標準で実装されています。これを用いて`MyTransformer`という名前のモジュールを実装します。

In [None]:
# ---------------------
# Transformer モデル定義
# ---------------------
class MyTransformer(nn.Module):
    def __init__(self, vocab_size, emb_size=16, nhead=1, nhid=32, nlayers=1, context_size=10):
        super().__init__()

        self.context_size = context_size

        # 文字(ASCIIコード)を潜在空間上のベクトルに変換するための埋め込み層
        self.token_emb = nn.Embedding(vocab_size, emb_size)

        # 位置エンコーディングのための学習可能パラメータ
        self.pos_emb = nn.Parameter(torch.zeros(1, context_size, emb_size))

        # Transformerエンコーダー層の定義
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=emb_size,
            nhead=nhead,
            dim_feedforward=nhid,
            batch_first=True
        )
        # Transformerエンコーダーの定義 (エンコーダー層をスタックして構成)
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer=encoder_layer,
            num_layers=nlayers
        )
        # 出力層: Transformerの出力を文字(ASCIIコード)に変換する線形層
        self.linear = nn.Linear(emb_size, vocab_size)

    def forward(self, x):
        B, T = x.size()

        if T > self.context_size:
            raise ValueError(f"Input sequence length ({T}) exceeds context_size ({self.context_size}).")

        x = self.token_emb(x) + self.pos_emb[:, :T, :]

        # マスクを生成: Transformerの入力に対して、未来の情報を見えないようにするためのマスクを生成
        mask = torch.nn.Transformer.generate_square_subsequent_mask(T, device=x.device)

        # Transformerエンコーダーに入力を通す
        out = self.transformer_encoder(src=x, mask=mask)

        # 出力層を通して、各トークンの次の文字(ASCIIコード)を予測
        return self.linear(out)


定義した`MyTransformer`をインスタンス化し、中身を見てみましょう。

In [None]:
# モデルのインスタンス化。語彙サイズは128, context_sizeは10、エンコーダ層のレイヤー数は3。
model = MyTransformer(vocab_size, context_size=context_size, nlayers=3)

import torchinfo
torchinfo.summary(model, input_size=(1, context_size), depth=5, col_names=["output_size", "num_params"], dtypes=[torch.long])

`MyTransformer`は埋め込み層(Embedding layer)、Transformerエンコーダ層(3層)、出力線形変換、から構成されています。
- 埋め込み層は ASCIIコード(0~127)を(ここでは)16次元のベクトルに変換します。すなわち文字の一つ一つが16次元のベクトルに変換されます。Transformer内部ではこの16次元のベクトルを変換してきます。
  - 例:
    - 'a' -> (0.01,  0.02, ..., 0.01)
    - 'b' -> (0.00,  0.05, ..., 0.02)
    - 'c' -> (0.00, -0.02, ..., 0.03)
    - (ここではこれらのベクトルは学習可能なパラメータとしているので、学習で最適化されます。)
- 最後のLinear層は16次元のベクトルをASCIIコード(0~127)に変換します。正確には、各文字に対応する確率が出力されます('a'の確率が32%、'b'の確率が2%、といったイメージです。)。最も確率の高い文字を予測値として用います。
  - 例:
    - ('a', 'b', 'c', ...) = (0.02, 0.10, 0.85, ...) のような出力のとき、最大の値を持つ'c'が予測値となります。
- Transformerエンコーダ層は複数のモジュールから構成されていますが、大雑把には`Self-Attention` -> `MLP` の2つで構成されています。

予測(生成)は1文字ずつ実行します。例えば"This"から始めて文字列生成をすると、
1. Model('This ') -> 'i'
2. Model('This i') -> 's'
3. Model('This is') -> ' ' (スペース)
4. Model('This is ') -> 'a'

のように、1文字ずつ、かつそれまでの予測結果を使いながら次の文字を予測してきます。

以下の関数は上記の操作を実行する関数です。

In [None]:
# ---------------------
# 推論
# ---------------------
def generate(model, start_text="", context_size=5, length=5):
    # モデルを評価モードに設定
    model.eval()

    # 入力文字をASCIIコードにエンコードした後、PyTorch Tensor型に変換
    context = torch.tensor(encode(start_text), dtype=torch.long).unsqueeze(0)  # (1, T)
    
    with torch.no_grad():
        # length個の文字を生成する
        for _ in range(length):
            # 順伝搬。文字ごとの出現確率を予測
            logits = model(context[:, -context_size:])

            # 予測した出現確率から、最も高い確率の文字を選ぶ
            next_token = torch.argmax(logits[:, -1, :], dim=-1, keepdim=True)

            # 予測した文字をテキストに追加する
            context = torch.cat([context, next_token], dim=1)

    return decode(context[0].tolist())  # contextをデコード (ASCIIコード -> 文字列)

学習前のモデルに対して生成を実行してみます。

In [None]:
# 30文字文のテキストを生成。
generate(model, "This", context_size=context_size, length=30)

学習前なのでランダムな文字列が出力されています。

それではモデルの学習をさせてみましょう。

In [None]:
# ---------------------
# 学習
# ---------------------
from tqdm import tqdm

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# for i_epoch in range(1):
model.train()

for x_batch, y_batch in tqdm(dataloader):

    # 順伝搬
    # - 入力のshape: (バッチサイズ, トークン長)
    # - 出力のshape: (バッチサイズ, トークン長, 語彙数(=128))
    logits = model(x_batch)
    
    # ロスの計算 (クロスエントロピー損失を使用)
    loss = F.cross_entropy(logits.view(-1, vocab_size), y_batch.view(-1))

    # 誤差逆伝播の前に各パラメータの勾配の値を0にセットする。
    # これをしないと、勾配の値はそれまでの値との和がとられる。
    optimizer.zero_grad()

    # 誤差逆伝播。各パラメータの勾配が計算される。
    loss.backward()

    # 各パラメータの勾配の値を基に、optimizerにより値が更新される。
    optimizer.step()

print(f"loss = {loss}")
# print(f"epoch = {i_epoch}, loss = {loss}")
    

In [None]:
# 30文字文のテキストを生成。
generate(model, "This", context_size=context_size, length=30)

意味のある"文"は予測できていませんが、学習前と比較すると"単語"っぽいものは生成できるようになったと思います。

LLM (Large Language Model)もここで実装したものと同じような仕組みで単語を出力させています。自然な文章を出力させるためには、より大きなモデル、より大きなデータセット、膨大な計算資源が必要となります。