# ゲート付きRNN

RNN層にゲートと呼ばれる機構を追加して長期的な文脈が保持できるようになったもの。

In [1]:
import random

import sentencepiece as spm
import torch
from torch import nn, optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence
from dlprog import train_progress

In [2]:
prog = train_progress(with_test=True)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

### 学習データの用意

In [3]:
n_data = 1000
textfile = f'data/jawiki_{n_data}.txt'
tokenizer_prefix = f'models/tokenizer_jawiki_{n_data}'

In [4]:
with open(textfile) as f:
    data = f.read().splitlines()

In [5]:
sp = spm.SentencePieceProcessor(f'{tokenizer_prefix}.model')
n_vocab = len(sp)

unk_id = sp.unk_id()
bos_id = sp.bos_id()
eos_id = sp.eos_id()
pad_id = sp.pad_id()

data_ids = sp.encode(data)
for ids in data_ids:
    ids.insert(0, bos_id)
    ids.append(eos_id)

print('num of vocabrary:', n_vocab)
data_ids[0][:10] # example

num of vocabrary: 8000


[1, 12, 19, 6255, 55, 1058, 59, 1686, 80, 123]

In [6]:
class TextDataset(Dataset):
    def __init__(self, data_ids):
        self._n_samples = len(data_ids)
        self.data = [torch.tensor(ids) for ids in data_ids]

    def __getitem__(self, idx):
        in_text = self.data[idx][:-1]
        out_text = self.data[idx][1:]
        return in_text, out_text

    def __len__(self):
        return self._n_samples

def collate_fn(batch):
    in_text, out_text = zip(*batch)
    in_text = pad_sequence(in_text, batch_first=True, padding_value=pad_id)
    out_text = pad_sequence(out_text, batch_first=True, padding_value=pad_id)
    return in_text, out_text

batch_size = 32
dataset = TextDataset(data_ids)
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn
)
test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    collate_fn=collate_fn
)

sample = next(iter(train_loader))
sample[0].shape

torch.Size([32, 1083])


---

## ゲート

あるデータをどれくらい通すかを示したもの。具体的には対称のデータと同じサイズのベクトル。0-1の値をとる。  
NNで実装してみる。

In [7]:
class Gate(nn.Module):
    def __init__(self, input_size):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_size, input_size),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.net(x)

入力されたデータを線形変化し、sigmoid関数に入力するだけ。このモデルにあるデータを入力したときの出力値が、そのデータのゲートとなる。  
ゲートを元のデータに掛けることで、元のデータの一部を**通した**ということになる。

In [8]:
input_size = 3
gate = Gate(input_size)

x = torch.randn(input_size)
y = x * gate(x)
print('input:', x)
print('gate:', gate(x))
print('output:', y)

input: tensor([ 0.5143,  0.1596, -0.4854])
gate: tensor([0.5667, 0.5508, 0.4446], grad_fn=<SigmoidBackward0>)
output: tensor([ 0.2915,  0.0879, -0.2158], grad_fn=<MulBackward0>)



---

## GRU

*Gate Recurrent Unit*

ゲート付きRNNの一種。

一旦RNNの復習をしよう。


RNNはある時間$t$の入力$x_t$に対して以下のような演算で出力値$h_t$を決定する。

$$
h_t = \mathrm{tanh}(W_x x_t + b_x + W_h h_{t-1} + b_h)
$$

この$x_t$と$h_{t-1}$の全結合の部分は$\mathrm{fc}(x,h)$で表すことにしよう。

$$
\begin{align}
h_t &= \mathrm{tanh}(\mathrm{fc}(x_t,h_{t-1})) \\
\mathrm{fc}(x,h) &= W_x x + b_x + W_h h + b_h
\end{align}
$$

んで、$\mathrm{fc}(x,h)$の実装もしておこう。

In [9]:
class FullyConnected(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.fc_input = nn.Linear(input_size, hidden_size)
        self.fc_hidden = nn.Linear(hidden_size, hidden_size)

    def forward(self, x, h):
        return self.fc_input(x) + self.fc_hidden(h)

では、GRUの構造を見ていこう。  
GRUは以下のような演算で出力値$h_t$を決定する。

$$
\begin{align}
h_t &= (1 - z_t) \odot \tilde{h}_t + z_t \odot h_{t-1} \\
\tilde{h}_t &= \mathrm{tanh}(\mathrm{fc}_{\tilde h}(x_t,h_{t-1})) \\
z_t &= \sigma(\mathrm{fc}_{z}(x_t,h_{t-1})) \\
\end{align}
$$

$\sigma(x)$はsigmoid関数。

RNNでは新たなデータ$\tilde h_t$がそのまま出力されていた。  
GRUでは、新たなデータ$\tilde h_t$を古いデータ$h_{t-1}$に足して出力する。そして、その際の比率をゲート$z_t$で決める。この$z_t$は$h_{t-1}$をどれだけ通すかを表す。

In [10]:
class SimpleGRU(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.fc = FullyConnected(input_size, hidden_size)
        self.gate = nn.Sequential(
            FullyConnected(input_size, hidden_size),
            nn.Sigmoid()
        )

    def forward(self, x, h):
        h_new = F.tanh(self.fc(x, h))
        z = self.gate(x)
        h = (1 - z) * h_new + z * h
        return h

このように、GRUではゲートを用いて新たなデータをどれだけ取り入れるべきか、そして古いデータをどれだけ捨てるか考えることが出来る。  
この枠組みの下で学習を行うことで、長期的に保持すべきデータをしっかりと保持できるようになることが期待される。

ちなみに、上記のモデルは一般的なGRUを私が簡略化したもの。  
一般的なGRUは、上記のモデルにゲートを一つ追加した以下のモデルである。


$$
\begin{align}
h_t &= (1 - z_t) \odot \tilde{h}_t + z_t \odot h_{t-1} \\
\tilde{h}_t &= \mathrm{tanh}(\mathrm{fc}_{\tilde h}(x_t,r_t \odot h_{t-1})) \\
z_t &= \sigma(\mathrm{fc}_{z}(x_t,h_{t-1})) \\
r_t &= \sigma(\mathrm{fc}_{r}(x_t,h_{t-1})) \\
\end{align}
$$

新なデータ$\tilde h_t$を生成する際に、古いデータ$h_{t-1}$をどれだけ考慮するかを決めるゲート$r_t$が追加されている。

In [11]:
class GRU(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.fc_input = FullyConnected(input_size, hidden_size)
        self.gate_update = nn.Sequential(
            FullyConnected(input_size, hidden_size),
            nn.Sigmoid()
        )
        self.gate_reset = nn.Sequential(
            FullyConnected(input_size, hidden_size),
            nn.Sigmoid()
        )

    def forward(self, x, h):
        r = self.gate_reset(x, h)
        h_new = F.tanh(self.fc_input(x, r * h))
        z = self.gate_update(x)
        h = (1 - z) * h_new + z * h
        return h

また、RNN同様、PyTorchにクラスとして`torch.nn.GRU`が用意されている:  
[GRU — PyTorch 2.0 documentation](https://pytorch.org/docs/stable/generated/torch.nn.GRU.html)

In [12]:
gru = nn.GRU(input_size, input_size)


---

## LSTM

*Long Short-Term Memory*

長短期記憶

GRUの進化版。考え方はGRUと同じで、RNNにゲートを取り入れてイイ感じにしたもの。  
ちなみに、GRUよりLSTMの方が先に提案されている。GRUはLSTMの簡易版として後から提案された。

LSTMには出力する隠れ状態$h_t$だけでなく、**記憶セル**と呼ばれる変数$c_t$を持つ。記憶セルはLSTMの外に出力されることはなく、LSTM内部でのみ使用される。

まず簡単に文字で説明する。  
記憶セル$c_t$がGRUでの隠れ状態$h_t$に当たり、ゲートを用いた不要な情報の削除と新たな情報の追加が行われる。なおゲートの生成には入力$x_t$と前の隠れ状態$h_{t-1}$を用いる（記憶セルは用いない）。そしてこの記憶セルを活性化関数に通したものをLSTMの出力=隠れ状態$h_t$とする。

具体的な構造を見てみよう。

$$
\begin{align}
h_t &= o_t \odot \mathrm{tanh}(c_t) \\
c_t &= f_t \odot c_{t-1} + i_t \odot \tilde c_t \\
\tilde c_t &= \mathrm{tanh}(\mathrm{fc}_{\tilde c}(x_t,h_{t-1})) \\
i_t &= \sigma(\mathrm{fc}_{i}(x_t,h_{t-1})) \\
f_t &= \sigma(\mathrm{fc}_{f}(x_t,h_{t-1})) \\
o_t &= \sigma(\mathrm{fc}_{o}(x_t,h_{t-1})) \\
\end{align}
$$


- $\tilde c_t$: 新たな情報。
- $i_t$: inputゲート。新たな情報$\tilde c_t$をどれだけ取り入れるかを決める。
- $f_t$: forgetゲート。古い情報$c_{h-1}$をどれだけ保持するかを決めるゲート。
- $o_t$: outputゲート。出力する隠れ状態の量を決めるゲート。

GRUでは1つのゲートを用いて新たな情報と古い情報の比率を決めていたが、LSTMでは別々のゲートを用いて決める。

実装は以下の通り。

In [13]:
class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.gate_input = nn.Sequential(
            FullyConnected(input_size, hidden_size),
            nn.Sigmoid()
        )
        self.gate_forget = nn.Sequential(
            FullyConnected(input_size, hidden_size),
            nn.Sigmoid()
        )
        self.gate_output = nn.Sequential(
            FullyConnected(input_size, hidden_size),
            nn.Sigmoid()
        )
        self.fc = FullyConnected(input_size, hidden_size)

    def forward(self, x, h, c):
        c_new = F.tanh(self.fc(x, h))
        i = self.gate_input(x, h)
        f = self.gate_forget(x, h)
        o = self.gate_output(x, h)
        c = f * c + i * c_new
        h = o * F.tanh(c)
        return h, c

PyTorchにも`torch.nn.LSTM`が用意されている:  
[LSTM — PyTorch 2.0 documentation](https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html)


---

## LSTMを用いた言語モデル

せっかくなので、LSTMで言語モデルを作ってみよう。RNNLMのRNN層をLSTMに変更するだけ。

モデル。RNN層の部分をLSTMに変更する。  
LSTMは隠れ状態$h$と記憶セル$c$の2つを出力するので、それらを与える・受け取ることが出来るようにする。

In [14]:
class LanguageModel(nn.Module):
    def __init__(self, n_vocab, embed_size, hidden_size):
        super().__init__()
        self.embedding = nn.Embedding(n_vocab, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, n_vocab)

    def forward(self, x, hc=None):
        x = self.embedding(x) # (seq_len, embed_size)
        y, (h, c) = self.lstm(x, hc) # (seq_len, hidden_size)
        y = self.fc(y) # (seq_len, n_vocab)
        return y, (h, c)

では学習させる。  
ミニバッチ&Truncated BPTT。

In [15]:
cross_entropy = nn.CrossEntropyLoss(ignore_index=pad_id)
def loss_fn(y, t):
    """
    y: (batch_size, seq_length, n_vocab)
    t: (batch_size, seq_length)
    """
    loss = cross_entropy(y.reshape(-1, n_vocab), t.ravel())
    return loss

def eval_model(model, trunc_len=100):
    model.eval()
    losses = []
    with torch.no_grad():
        for x, t in test_loader:
            hc = None
            for i in range(0, x.shape[1], trunc_len):
                x_batch = x[:, i:i+trunc_len].to(device)
                t_batch = t[:, i:i+trunc_len].to(device)
                y, hc = model(x_batch, hc)
                loss = loss_fn(y, t_batch)
                losses.append(loss.item())
    loss = sum(losses) / len(losses)
    return loss

def train(model, optimizer, trunc_len, n_epochs, prog_unit=1):
    prog.start(n_iter=len(train_loader), n_epochs=n_epochs, unit=prog_unit)
    for _ in range(n_epochs):
        model.train()
        for x, t in train_loader:
            hc = None
            for i in range(0, x.shape[1], trunc_len):
                x_batch = x[:, i:i+trunc_len].to(device)
                t_batch = t[:, i:i+trunc_len].to(device)
                optimizer.zero_grad()
                y, (h, c) = model(x_batch, hc)
                loss = loss_fn(y, t_batch)
                loss.backward()
                optimizer.step()
                prog.update(loss.item(), advance=0)
                hc = (h.detach(), c.detach())
            prog.update()

        if prog.now_epoch % prog_unit == 0:
            test_loss = eval_model(model)
            prog.memo(f'test: {test_loss:.5f}', no_step=True)
        prog.memo()

In [16]:
n_vocab = len(sp)
embed_size = 512
hidden_size = 512
model = LanguageModel(n_vocab, hidden_size, hidden_size).to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

In [17]:
train(model, optimizer, trunc_len=100, n_epochs=100, prog_unit=10)

   1-10/100: ############################## 100% [00:01:05.29] loss train: 6.43617, test: 6.58057 
  11-20/100: ############################## 100% [00:01:04.33] loss train: 5.05783, test: 6.37913 
  21-30/100: ############################## 100% [00:01:05.00] loss train: 3.94386, test: 6.36970 
  31-40/100: ############################## 100% [00:01:04.13] loss train: 3.04396, test: 6.43398 
  41-50/100: ############################## 100% [00:01:04.77] loss train: 2.31738, test: 6.54909 
  51-60/100: ############################## 100% [00:01:04.83] loss train: 1.77019, test: 6.75696 
  61-70/100: ############################## 100% [00:01:04.68] loss train: 1.34886, test: 6.96692 
  71-80/100: ############################## 100% [00:01:08.01] loss train: 1.01704, test: 7.24643 
  81-90/100: ############################## 100% [00:01:08.88] loss train: 0.77764, test: 7.49640 
 91-100/100: ############################## 100% [00:01:07.45] loss train: 0.60005, test: 7.69774 


lossの減り方はRNNの方が良いね。

文章を生成してみる。

In [18]:
def token_sampling(y):
    y.squeeze_(0)
    y[unk_id] = -torch.inf
    probs = F.softmax(y, dim=-1)
    token, = random.choices(range(n_vocab), weights=probs)
    return token

@torch.no_grad()
def generate_sentence(
    model: nn.Module,
    start: str = '',
    max_len: int = 50
) -> str:
    model.eval()
    token_ids = sp.encode(start)
    token_ids.insert(0, bos_id)
    x = torch.tensor(token_ids, device=device)
    y, (h, c) = model(x)
    next_token = token_sampling(y)
    token_ids.append(next_token)

    while len(token_ids) <= max_len and next_token != eos_id:
        x = torch.tensor([next_token], device=device)
        y, (h, c) = model(x, (h, c))
        next_token = token_sampling(y)
        token_ids.append(next_token)

    sentence = sp.decode(token_ids)
    return sentence

In [19]:
for _ in range(5):
    print(generate_sentence(model, max_len=50))

いすゞ・成果・券組はけようり、ピアノがこな入学により下であるほか確変更に誕し、激しい血思想がめられているなければならない。程のの中で費や象無線根田最高の誕生生と
「ばってん税の列車は手師を繰り返していたが、配ねなかけるとしているが、そこのような司陳情は内容 -で結成準備だが、この天皇が開催され外国人経済要因である事に広まっ付した。しかしにおいても
テキサス中有髄線維はと考える学部フに住み、クラリネット2にアレクサンドウキャストを曲参して成功を計画した。
決勝で実質年代は権な力圏を経験うものから小川まで異た暗の324年36で「馬G主義川」がある。
紀伊続風土記(福島から)のうち、中央部・マーケティングの美術高額から評価ブームが、翌4年には国士舘南北に対処する。(187後量は5年にNス4圏(.29年)を完


良くなったのだろうか。ぱっと見分らん。