#DL基礎最終課題

第6回目の講義でRNNとLSTMの実装について学びました。その講義の中で紹介されたのはRNN, GRU, LSTM, Attentionでした。
その中で演習でLSTMにフォーカスしてsentiment analysisタスクに取り組んだので、今回は演習で扱われなかった部分について自分の手でConvLSTMを実装し、実験しました。
実験環境は以下の通りです。

###必要なライブラリのインポートと前処理

In [1]:
!pip install portalocker
!pip install einops

Collecting portalocker
  Downloading portalocker-2.7.0-py2.py3-none-any.whl (15 kB)
Installing collected packages: portalocker
Successfully installed portalocker-2.7.0
Collecting einops
  Downloading einops-0.6.1-py3-none-any.whl (42 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.2/42.2 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: einops
Successfully installed einops-0.6.1


In [2]:
import random
import math
import numpy as np
import string
import re
from collections import Counter
from typing import List
import torch
import torch.nn as nn
import torch.optim as optim
import torch.autograd as autograd
from torch.utils.data import DataLoader
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
import torchtext
from torchtext import data
from torchtext import datasets
from torchtext.vocab import vocab
from torchtext.data.utils import get_tokenizer
from sklearn.metrics import f1_score
from einops.layers.torch import Rearrange
from einops import rearrange

In [3]:
seed = 1234
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

In [4]:
# torch.log(0)によるnanを防ぐための関数
def torch_log(x):
    return torch.log(torch.clamp(x, min=1e-10))

In [5]:
!pip list | grep torch

torch                            2.0.1+cu118
torchaudio                       2.0.2+cu118
torchdata                        0.6.1
torchsummary                     1.5.1
torchtext                        0.15.2
torchvision                      0.15.2+cu118


In [6]:
train_iter = datasets.IMDB(split='train')

train_iter, valid_iter = train_iter.random_split(
    weights={"train": 0.8, "valid": 0.2},
    seed=seed,
    total_length=len(list(train_iter)),
)

In [7]:
# 単語をスペースで区切り，!"#$%&といった記号を除去する，すべて小文字化する，などの処理
# https://pytorch.org/text/stable/data_utils.html
tokenizer = get_tokenizer("basic_english")

counter = Counter()

for label, line in train_iter:
    counter.update(tokenizer(line))

vocabulary = vocab(
    counter,
    min_freq=25,
    specials=('<unk>', '<PAD>', '<BOS>', '<EOS>')
)
# <unk>をデフォルトに設定することにより，min_freq回以上出てこない単語は<unk>になる
vocabulary.set_default_index(vocabulary['<unk>'])

word_num = len(vocabulary)

print(f"単語種数: {word_num}")
print(*vocabulary.get_itos()[:100], sep=', ')

単語種数: 9937
<unk>, <PAD>, <BOS>, <EOS>, i, am, curious, yellow, is, a, and, pretentious, steaming, pile, ., it, doesn, ', t, matter, what, one, s, political, views, are, because, this, film, can, hardly, be, taken, seriously, on, any, level, as, for, the, claim, that, frontal, male, nudity, an, automatic, ,, isn, true, ve, seen, films, with, granted, they, only, offer, some, fleeting, but, where, ?, nowhere, don, exist, same, goes, those, crappy, cable, shows, swinging, in, not, sight, indie, movies, like, brown, bunny, which, we, re, treated, to, site, of, vincent, johnson, trace, pink, visible, chloe, before, crying, (, or, ), matters


In [8]:
def text_transform(_text, max_length=256):
    # <BOS>と<EOS>の分 -2
    text = [vocabulary[token] for token in tokenizer(_text)][:max_length - 2]
    text = [vocabulary['<BOS>']] + text + [vocabulary['<EOS>']]

    return text, len(text)

def collate_batch(batch):
   label_list, text_list, len_seq_list = [], [], []

   for _label, _text in batch:
      # torchtext==0.14.0まではnegativeは'neg', positiveは'pos'
      # torchtext==0.15.1からはnegativeは1，positiveは2なので，-1して{0, 1}にする
      label_list.append(_label - 1)

      processed_text, len_seq = text_transform(_text)
      text_list.append(torch.tensor(processed_text))
      len_seq_list.append(len_seq)

   return torch.tensor(label_list), pad_sequence(text_list, padding_value=1).T, torch.tensor(len_seq_list)

In [9]:
batch_size = 128

train_dataloader = DataLoader(
   list(train_iter),
   batch_size=batch_size,
   shuffle=True,
   collate_fn=collate_batch
)
valid_dataloader = DataLoader(
   list(valid_iter),
   batch_size=batch_size,
   shuffle=False,
   collate_fn=collate_batch
)

In [10]:
# 学習関数の定義
def train(
    net,
    optimizer,
    n_epochs,
):
    for epoch in range(n_epochs):
        losses_train = []
        losses_valid = []

        net.train()
        n_train = 0
        acc_train = 0
        for label, line, len_seq in train_dataloader:
            net.zero_grad()  # 勾配の初期化

            t = label.to(device) # テンソルをGPUに移動
            x = line.to(device) # ( batch, time )
            len_seq.to(device)

            h = net(x, torch.max(len_seq), len_seq)
            y = torch.sigmoid(h).squeeze()

            loss = -torch.mean(t * torch_log(y) + (1 - t) * torch_log(1 - y))

            loss.backward()  # 誤差の逆伝播

            optimizer.step()  # パラメータの更新

            losses_train.append(loss.tolist())

            n_train += t.size()[0]

        # Valid
        t_valid = []
        y_pred = []
        net.eval()
        for label, line, len_seq in valid_dataloader:

            t = label.to(device) # テンソルをGPUに移動
            x = line.to(device)
            len_seq.to(device)

            h = net(x, torch.max(len_seq), len_seq)
            y = torch.sigmoid(h).squeeze()

            loss = -torch.mean(t*torch_log(y) + (1 - t)*torch_log(1 - y))

            pred = y.round().squeeze()  # 0.5以上の値を持つ要素を正ラベルと予測する

            t_valid.extend(t.tolist())
            y_pred.extend(pred.tolist())

            losses_valid.append(loss.tolist())

        print('EPOCH: {}, Train Loss: {:.3f}, Valid Loss: {:.3f}, Validation F1: {:.3f}'.format(
            epoch,
            np.mean(losses_train),
            np.mean(losses_valid),
            f1_score(t_valid, y_pred, average='macro')
        ))

### 1. Recurrent Neural Network (RNN) によるIMDbのsentiment analysis

In [11]:
class Embedding(nn.Module):
    def __init__(self, emb_dim, vocab_size):
        super().__init__()
        self.embedding_matrix = nn.Parameter(torch.rand((vocab_size, emb_dim),
                                                        dtype=torch.float))

    def forward(self, x):
        return F.embedding(x, self.embedding_matrix)

In [12]:
class RNN(nn.Module):
    def __init__(self, in_dim, hid_dim):
        super().__init__()
        self.hid_dim = hid_dim
        glorot = 6 / (in_dim + hid_dim*2)
        self.W = nn.Parameter(torch.tensor(np.random.uniform(
                        low=-np.sqrt(glorot),
                        high=np.sqrt(glorot),
                        size=(in_dim + hid_dim, hid_dim)
                    ).astype('float32')))
        self.b = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

    def function(self, h, x):
        return torch.tanh(torch.matmul(torch.cat([h, x], dim=1), self.W) + self.b)

    def forward(self, x, len_seq_max=0, init_state=None):
        x = x.transpose(0, 1)  # 系列のバッチ処理のため、次元の順番を「系列、バッチ」の順に入れ替える
        state = init_state

        if init_state is None:  # 初期値を設定しない場合は0で初期化する
            state = torch.zeros((x[0].size()[0], self.hid_dim)).to(x.device)

        size = list(state.unsqueeze(0).size())
        size[0] = 0
        output = torch.empty(size, dtype=torch.float).to(x.device)  # 一旦空テンソルを定義して順次出力を追加する

        if len_seq_max == 0:
            len_seq_max = x.size(0)
        for i in range(len_seq_max):
            state = self.function(state, x[i])
            output = torch.cat([output, state.unsqueeze(0)])  # 出力系列の追加
        return output

In [None]:
class SequenceTaggingNet(nn.Module):
    def __init__(self, word_num, emb_dim, hid_dim):
        super().__init__()
        self.emb = Embedding(emb_dim, word_num)
        self.rnn = RNN(emb_dim, hid_dim)
        self.linear = nn.Linear(hid_dim, 1)

    def forward(self, x, len_seq_max=0, len_seq=None, init_state=None):
        h = self.emb(x)
        h = self.rnn(h, len_seq_max, init_state)
        if len_seq is not None:
            # 系列が終わった時点での出力を取る必要があるので len_seq を元に集約する
            h = h[len_seq - 1, list(range(len(x))), :]
        else:
            h = h[-1]
        y = self.linear(h)
        return y

In [None]:
emb_dim = 100
hid_dim = 50
n_epochs = 5
device = 'cuda'

net = SequenceTaggingNet(word_num, emb_dim, hid_dim)
net.to(device)

optimizer = optim.Adam(net.parameters())

train(net, optimizer, n_epochs)

EPOCH: 0, Train Loss: 0.693, Valid Loss: 0.688, Validation F1: 0.527
EPOCH: 1, Train Loss: 0.664, Valid Loss: 0.658, Validation F1: 0.595
EPOCH: 2, Train Loss: 0.596, Valid Loss: 0.651, Validation F1: 0.625
EPOCH: 3, Train Loss: 0.490, Valid Loss: 0.592, Validation F1: 0.718
EPOCH: 4, Train Loss: 0.471, Valid Loss: 0.716, Validation F1: 0.631


### 2. Long short-term memory (LSTM)によるIMDbのsentiment analysis

実装する式は次のようになります．($\odot$は要素ごとの積)

- 入力ゲート: $\hspace{20mm}\boldsymbol{i}_t = \mathrm{\sigma} \left(\boldsymbol{W}_i \left[\begin{array}{c} \boldsymbol{x}_t \\ \boldsymbol{h}_{t-1} \end{array}\right] + \boldsymbol{b}_i\right)$
- 忘却ゲート: $\hspace{20mm}\boldsymbol{f}_t = \mathrm{\sigma} \left(\boldsymbol{W}_f \left[\begin{array}{c} \boldsymbol{x}_t \\ \boldsymbol{h}_{t-1} \end{array}\right] + \boldsymbol{b}_f\right)$
- 出力ゲート: $\hspace{20mm}\boldsymbol{o}_t = \mathrm{\sigma} \left(\boldsymbol{W}_o \left[\begin{array}{c} \boldsymbol{x}_t \\ \boldsymbol{h}_{t-1} \end{array}\right] + \boldsymbol{b}_o\right)$
- セル:　　　 $\hspace{20mm}\boldsymbol{c}_t = \boldsymbol{f}_t \odot \boldsymbol{c}_{t-1} + \boldsymbol{i}_t \odot \tanh \left(\boldsymbol{W}_c \left[\begin{array}{c} \boldsymbol{x}_t \\ \boldsymbol{h}_{t-1} \end{array}\right] + \boldsymbol{b}_c\right)$
- 隠れ状態: 　$\hspace{20mm}\boldsymbol{h}_t = \boldsymbol{o}_t \odot \tanh \left(\boldsymbol{c}_t \right)$

In [13]:
class LSTM(nn.Module):
    def __init__(self, in_dim, hid_dim):
        super().__init__()
        self.hid_dim = hid_dim
        glorot = 6/(in_dim + hid_dim*2)

        self.W_i = nn.Parameter(torch.tensor(np.random.uniform(
                        low=-np.sqrt(glorot),
                        high=np.sqrt(glorot),
                        size=(in_dim + hid_dim, hid_dim)
                    ).astype('float32')))
        self.b_i = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

        self.W_f = nn.Parameter(torch.tensor(np.random.uniform(
                        low=-np.sqrt(glorot),
                        high=np.sqrt(glorot),
                        size=(in_dim + hid_dim, hid_dim)
                    ).astype('float32')))
        self.b_f = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

        self.W_o = nn.Parameter(torch.tensor(np.random.uniform(
                        low=-np.sqrt(glorot),
                        high=np.sqrt(glorot),
                        size=(in_dim + hid_dim, hid_dim)
                    ).astype('float32')))
        self.b_o = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

        self.W_c = nn.Parameter(torch.tensor(np.random.uniform(
                        low=-np.sqrt(glorot),
                        high=np.sqrt(glorot),
                        size=(in_dim + hid_dim, hid_dim)
                    ).astype('float32')))
        self.b_c = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

    def function(self, state_c, state_h, x):
        i = torch.sigmoid(torch.matmul(torch.cat([state_h, x], dim=1), self.W_i) + self.b_i)
        f = torch.sigmoid(torch.matmul(torch.cat([state_h, x], dim=1), self.W_f) + self.b_f)
        o = torch.sigmoid(torch.matmul(torch.cat([state_h, x], dim=1), self.W_o) + self.b_o)
        c = f*state_c + i*torch.tanh(torch.matmul(torch.cat([state_h, x], dim=1), self.W_c) + self.b_c)
        h = o*torch.tanh(c)
        return c, h

    def forward(self, x, len_seq_max=0, init_state_c=None, init_state_h=None):
        x = x.transpose(0, 1)  # 系列のバッチ処理のため、次元の順番を「系列、バッチ」の順に入れ替える
        state_c = init_state_c
        state_h = init_state_h
        if init_state_c is None:  # 初期値を設定しない場合は0で初期化する
            state_c = torch.zeros((x[0].size()[0], self.hid_dim)).to(x.device)
        if init_state_h is None:  # 初期値を設定しない場合は0で初期化する
            state_h = torch.zeros((x[0].size()[0], self.hid_dim)).to(x.device)

        size = list(state_h.unsqueeze(0).size())
        size[0] = 0
        output = torch.empty(size, dtype=torch.float).to(x.device)  # 一旦空テンソルを定義して順次出力を追加する

        if len_seq_max == 0:
            len_seq_max = x.size(0)
        for i in range(len_seq_max):
            state_c, state_h = self.function(state_c, state_h, x[i])
            output = torch.cat([output, state_h.unsqueeze(0)])  # 出力系列の追加
        return output

In [None]:
class SequenceTaggingNet2(nn.Module):
    def __init__(self, word_num, emb_dim, hid_dim):
        super().__init__()
        self.emb = Embedding(emb_dim, word_num)
        self.lstm = LSTM(emb_dim, hid_dim)
        self.linear = nn.Linear(hid_dim, 1)

    def forward(self, x, len_seq_max=0, len_seq=None, init_state=None):
        h = self.emb(x)
        h = self.lstm(h, len_seq_max, init_state)
        if len_seq is not None:
            # 系列が終わった時点での出力を取る必要があるので len_seq を元に集約する
            h = h[len_seq - 1, list(range(len(x))), :]
        else:
            h = h[-1]
        y = self.linear(h)
        return y

In [None]:
emb_dim = 100
hid_dim = 50
n_epochs = 5
device = 'cuda'

net = SequenceTaggingNet2(word_num, emb_dim, hid_dim)
net.to(device)
optimizer = optim.Adam(net.parameters())

train(net, optimizer, n_epochs)

EPOCH: 0, Train Loss: 0.680, Valid Loss: 0.720, Validation F1: 0.373
EPOCH: 1, Train Loss: 0.621, Valid Loss: 0.622, Validation F1: 0.682
EPOCH: 2, Train Loss: 0.549, Valid Loss: 0.567, Validation F1: 0.696
EPOCH: 3, Train Loss: 0.416, Valid Loss: 0.420, Validation F1: 0.814
EPOCH: 4, Train Loss: 0.297, Valid Loss: 0.333, Validation F1: 0.862


h_size=[256, 128, 50]\
batch_size=128\
h_dim=50

###LSTM(Peephole Connection)によるIMDbのsentiment analysis

実装する式

- 入力ゲート: $\hspace{20mm}\boldsymbol{i}_t = \mathrm{\sigma} \left(\boldsymbol{W}_i \left[\begin{array}{c} \boldsymbol{x}_t \\ \boldsymbol{h}_{t-1} \\ \boldsymbol {c}_{t-1} \end{array}\right] + \boldsymbol{b}_i\right)$
- 忘却ゲート: $\hspace{20mm}\boldsymbol{f}_t = \mathrm{\sigma} \left(\boldsymbol{W}_f \left[\begin{array}{c} \boldsymbol{x}_t \\ \boldsymbol{h}_{t-1} \\ \boldsymbol{c}_{t-1} \end{array}\right] + \boldsymbol{b}_f\right)$
- 出力ゲート: $\hspace{20mm}\boldsymbol{o}_t = \mathrm{\sigma} \left(\boldsymbol{W}_o \left[\begin{array}{c} \boldsymbol{x}_t \\ \boldsymbol{h}_{t-1} \\ \boldsymbol{c}_{t-1} \end{array}\right] + \boldsymbol{b}_o\right)$
- セル:　　　 $\hspace{20mm}\boldsymbol{c}_t = \boldsymbol{f}_t \odot \boldsymbol{c}_{t-1} + \boldsymbol{i}_t \odot \tanh \left(\boldsymbol{W}_c \left[\begin{array}{c} \boldsymbol{x}_t \\ \boldsymbol{h}_{t-1} \end{array}\right] + \boldsymbol{b}_c\right)$
- 隠れ状態: 　$\hspace{20mm}\boldsymbol{h}_t = \boldsymbol{o}_t \odot \tanh \left(\boldsymbol{c}_t \right)$

In [None]:
class LSTM2(nn.Module):
    def __init__(self, in_dim, hid_dim):
        super().__init__()
        self.hid_dim = hid_dim
        glorot = 6/(in_dim + hid_dim*2)

        self.W_i = nn.Parameter(torch.tensor(np.random.uniform(
                        low=-np.sqrt(glorot),
                        high=np.sqrt(glorot),
                        size=(in_dim + hid_dim*2, hid_dim)
                    ).astype('float32')))
        self.b_i = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

        self.W_f = nn.Parameter(torch.tensor(np.random.uniform(
                        low=-np.sqrt(glorot),
                        high=np.sqrt(glorot),
                        size=(in_dim + hid_dim*2, hid_dim)
                    ).astype('float32')))
        self.b_f = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

        self.W_o = nn.Parameter(torch.tensor(np.random.uniform(
                        low=-np.sqrt(glorot),
                        high=np.sqrt(glorot),
                        size=(in_dim + hid_dim*2, hid_dim)
                    ).astype('float32')))
        self.b_o = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

        self.W_c = nn.Parameter(torch.tensor(np.random.uniform(
                        low=-np.sqrt(glorot),
                        high=np.sqrt(glorot),
                        size=(in_dim + hid_dim, hid_dim)
                    ).astype('float32')))
        self.b_c = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

    def function(self, state_c, state_h, x):
        i = torch.sigmoid(torch.matmul(torch.cat([state_h, x, state_c], dim=1), self.W_i) + self.b_i)
        f = torch.sigmoid(torch.matmul(torch.cat([state_h, x, state_c], dim=1), self.W_f) + self.b_f)
        o = torch.sigmoid(torch.matmul(torch.cat([state_h, x, state_c], dim=1), self.W_o) + self.b_o)
        c = f*state_c + i*torch.tanh(torch.matmul(torch.cat([state_h, x], dim=1), self.W_c) + self.b_c)
        h = o*torch.tanh(c)
        return c, h

    def forward(self, x, len_seq_max=0, init_state_c=None, init_state_h=None):
        x = x.transpose(0, 1)  # 系列のバッチ処理のため、次元の順番を「系列、バッチ」の順に入れ替える
        state_c = init_state_c
        state_h = init_state_h
        if init_state_c is None:  # 初期値を設定しない場合は0で初期化する
            state_c = torch.zeros((x[0].size()[0], self.hid_dim)).to(x.device)
        if init_state_h is None:  # 初期値を設定しない場合は0で初期化する
            state_h = torch.zeros((x[0].size()[0], self.hid_dim)).to(x.device)

        size = list(state_h.unsqueeze(0).size())
        size[0] = 0
        output = torch.empty(size, dtype=torch.float).to(x.device)  # 一旦空テンソルを定義して順次出力を追加する

        if len_seq_max == 0:
            len_seq_max = x.size(0)
        for i in range(len_seq_max):
            state_c, state_h = self.function(state_c, state_h, x[i])
            output = torch.cat([output, state_h.unsqueeze(0)])  # 出力系列の追加
        return output

In [None]:
class SequenceTaggingNet3(nn.Module):
    def __init__(self, word_num, emb_dim, hid_dim):
        super().__init__()
        self.emb = Embedding(emb_dim, word_num)
        self.lstm = LSTM2(emb_dim, hid_dim)
        self.linear = nn.Linear(hid_dim, 1)

    def forward(self, x, len_seq_max=0, len_seq=None, init_state=None):
        h = self.emb(x)
        h = self.lstm(h, len_seq_max, init_state)
        if len_seq is not None:
            # 系列が終わった時点での出力を取る必要があるので len_seq を元に集約する
            h = h[len_seq - 1, list(range(len(x))), :]
        else:
            h = h[-1]
        y = self.linear(h)
        return y

In [None]:
emb_dim = 100
hid_dim = 50
n_epochs = 5
device = 'cuda'

net = SequenceTaggingNet3(word_num, emb_dim, hid_dim)
net.to(device)
optimizer = optim.Adam(net.parameters())

train(net, optimizer, n_epochs)

EPOCH: 0, Train Loss: 0.690, Valid Loss: 0.701, Validation F1: 0.336
EPOCH: 1, Train Loss: 0.677, Valid Loss: 0.647, Validation F1: 0.747
EPOCH: 2, Train Loss: 0.580, Valid Loss: 0.476, Validation F1: 0.827
EPOCH: 3, Train Loss: 0.413, Valid Loss: 0.392, Validation F1: 0.846
EPOCH: 4, Train Loss: 0.361, Valid Loss: 0.368, Validation F1: 0.856


###TransformerによるIMDbのsentiment analysis

In [14]:
class LSTM3(nn.Module):
    def __init__(self, in_dim, hid_dim):
        super().__init__()
        self.hid_dim = hid_dim
        glorot = 6/(in_dim + hid_dim*2)

        self.W_i = nn.Parameter(torch.tensor(np.random.uniform(
                        low=-np.sqrt(glorot),
                        high=np.sqrt(glorot),
                        size=(in_dim + hid_dim*2, hid_dim)
                    ).astype('float32')))
        self.b_i = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

        self.W_f = nn.Parameter(torch.tensor(np.random.uniform(
                        low=-np.sqrt(glorot),
                        high=np.sqrt(glorot),
                        size=(in_dim + hid_dim*2, hid_dim)
                    ).astype('float32')))
        self.b_f = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

        self.W_o = nn.Parameter(torch.tensor(np.random.uniform(
                        low=-np.sqrt(glorot),
                        high=np.sqrt(glorot),
                        size=(in_dim + hid_dim*2, hid_dim)
                    ).astype('float32')))
        self.b_o = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

        self.W_c = nn.Parameter(torch.tensor(np.random.uniform(
                        low=-np.sqrt(glorot),
                        high=np.sqrt(glorot),
                        size=(in_dim + hid_dim, hid_dim)
                    ).astype('float32')))
        self.b_c = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

    def function(self, state_c, state_h, x):
        i = torch.sigmoid(torch.matmul(torch.cat([state_h, x, state_c], dim=1), self.W_i) + self.b_i)
        f = torch.sigmoid(torch.matmul(torch.cat([state_h, x, state_c], dim=1), self.W_f) + self.b_f)
        o = torch.sigmoid(torch.matmul(torch.cat([state_h, x, state_c], dim=1), self.W_o) + self.b_o)
        c = f*state_c + i*torch.tanh(torch.matmul(torch.cat([state_h, x], dim=1), self.W_c) + self.b_c)
        h = o*torch.tanh(c)
        return c, h

    def forward(self, x, len_seq_max=0, init_state_c=None, init_state_h=None):
        x = x.transpose(0, 1)  # 系列のバッチ処理のため、次元の順番を「系列、バッチ」の順に入れ替える
        state_c = init_state_c
        state_h = init_state_h
        if init_state_c is None:  # 初期値を設定しない場合は0で初期化する
            state_c = torch.zeros((x[0].size()[0], self.hid_dim)).to(x.device)
        if init_state_h is None:  # 初期値を設定しない場合は0で初期化する
            state_h = torch.zeros((x[0].size()[0], self.hid_dim)).to(x.device)

        size = list(state_h.unsqueeze(0).size())
        size[0] = 0
        output = torch.empty(size, dtype=torch.float).to(x.device)  # 一旦空テンソルを定義して順次出力を追加する

        if len_seq_max == 0:
            len_seq_max = x.size(0)
        for i in range(len_seq_max):
            state_c, state_h = self.function(state_c, state_h, x[i])
            output = torch.cat([output, state_c.unsqueeze(0)])  # 出力系列の追加
        return output

In [15]:
# Multi-Head Attentionの実装
class Attention(nn.Module):
    def __init__(self, dim, heads, dim_head, dropout=0.):
        """
        Arguments
        ---------
        dim : int
            入力データの次元数．埋め込み次元数と一致する．
        heads : int
            ヘッドの数．
        dim_head : int
            各ヘッドのデータの次元数．
        dropout : float
            Dropoutの確率(default=0.)．
        """
        super().__init__()

        self.dim = dim
        self.dim_head = dim_head
        inner_dim = dim_head * heads  # ヘッドに分割する前のQ, K, Vの次元数．self.dimと異なっても良い．
        project_out = not (heads == 1 and dim_head == dim)  # headsが1，dim_headがdimと等しければ通常のSelf-Attention

        self.heads = heads
        self.scale = math.sqrt(dim_head)  # ソフトマックス関数を適用する前のスケーリング係数(dim_k)

        self.attend = nn.Softmax(dim=-1)  # アテンションスコアの算出に利用するソフトマックス関数
        self.dropout = nn.Dropout(dropout)

        # Q, K, Vに変換するための全結合層
        self.to_q = nn.Linear(in_features=dim, out_features=inner_dim)
        self.to_k = nn.Linear(in_features=dim, out_features=inner_dim)
        self.to_v = nn.Linear(in_features=dim, out_features=inner_dim)

        # dim != inner_dimなら線形層を入れる，そうでなければそのまま出力
        self.to_out = nn.Sequential(
            nn.Linear(in_features=inner_dim, out_features=dim),
            nn.Dropout(dropout),
        ) if project_out else nn.Identity()

    def forward(self, x):
        """
        B: バッチサイズ
        N: 系列長
        D: データの次元数(dim)
        """
        B, N, D = x.size()

        # 入力データをQ, K, Vに変換する
        # (B, N, dim) -> (B, N, inner_dim)
        q = self.to_q(x)
        k = self.to_k(x)
        v = self.to_v(x)

        # Q, K, Vをヘッドに分割する
        # (B, N, inner_dim) -> (B, heads, N, dim_head)
        q = rearrange(q, "b n (h d) -> b h n d", h=self.heads, d=self.dim_head)
        k = rearrange(k, "b n (h d) -> b h n d", h=self.heads, d=self.dim_head)
        v = rearrange(v, "b n (h d) -> b h n d", h=self.heads, d=self.dim_head)

        # QK^T / sqrt(d_k)を計算する
        # (B, heads, N, dim_head) x (B, heads, dim_head, N) -> (B, heads, N, N)
        dots = torch.matmul(q, k.transpose(-2, -1)) / self.scale

        # ソフトマックス関数でスコアを算出し，Dropoutをする
        attn = self.attend(dots)
        attn = self.dropout(attn)

        # softmax(QK^T / sqrt(d_k))Vを計算する
        # (B, heads, N, N) x (B, heads, N, dim_head) -> (B, heads, N, dim_head)
        out = torch.matmul(attn ,v)

        # もとの形に戻す
        # (B, heads, N, dim_head) -> (B, N, dim)
        out = rearrange(out, "b h n d -> b n (h d)", h=self.heads, d=self.dim_head)

        # 次元が違っていればもとに戻して出力
        # 表現の可視化のためにattention mapも返すようにしておく
        return self.to_out(out), attn

In [16]:
# Feed-Forward Networkの実装
class FFN(nn.Module):
    def __init__(self, dim, hidden_dim, dropout=0.):
        """
        Arguments
        ---------
        dim : int
            入力データの次元数．
        hidden_dim : int
            隠れ層の次元．
        dropout : float
            各全結合層の後のDropoutの確率(default=0.)．
        """
        super().__init__()

        self.net = nn.Sequential(
            nn.Linear(in_features=dim, out_features=hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(in_features=hidden_dim, out_features=dim),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        """
        (B, D) -> (B, D)
        B: バッチサイズ
        D: 次元数
        """
        return self.net(x)

In [17]:
class Block(nn.Module):
    def __init__(self, dim, heads, dim_head, mlp_dim, dropout):
        """
        TransformerのEncoder Blockの実装．

        Arguments
        ---------
        dim : int
            埋め込みされた次元数．PatchEmbedのembed_dimと同じ値．
        heads : int
            Multi-Head Attentionのヘッドの数．
        dim_head : int
            Multi-Head Attentionの各ヘッドの次元数．
        mlp_dim : int
            Feed-Forward Networkの隠れ層の次元数．
        dropout : float
            Droptou層の確率p．
        """
        super().__init__()

        self.attn_ln = nn.LayerNorm(dim)  # Attention前のLayerNorm
        self.attn = Attention(dim, heads, dim_head, dropout)
        self.ffn_ln = nn.LayerNorm(dim)  # FFN前のLayerNorm
        self.ffn = FFN(dim, mlp_dim, dropout)

    def forward(self, x, return_attn=False):
        """
        x: (B, N, dim)
        B: バッチサイズ
        N: 系列長
        dim: 埋め込み次元
        """
        y, attn = self.attn(self.attn_ln(x))
        if return_attn:  # attention mapを返す（attention mapの可視化に利用）
            return attn
        x = y + x
        out = self.ffn(self.ffn_ln(x)) + x

        return out

In [20]:
class SequenceTaggingNet5(nn.Module):
    def __init__(self, word_num, emb_dim, hid_dim, heads, dim_head, mlp_dim):
        super().__init__()
        self.emb = Embedding(emb_dim, word_num)
        self.block = Block(hid_dim, heads, dim_head, mlp_dim, 0.)
        self.linear = nn.Linear(hid_dim, 1)

    def forward(self, x, len_seq_max=0, len_seq=None, init_state=None):
        h = self.emb(x)
        h = self.block(h)
        if len_seq is not None:
            # 系列が終わった時点での出力を取る必要があるので len_seq を元に集約する
            h = h[len_seq - 1, list(range(len(x))), :]
        else:
            h = h[-1]

        y = self.linear(h)
        return y

In [21]:
emb_dim = 100
hid_dim = 50
n_epochs = 5
heads = 5
dim_head = 10
mlp_dim = 100
device = 'cuda'

net = SequenceTaggingNet5(word_num, emb_dim, hid_dim, heads, dim_head, mlp_dim)
net.to(device)
optimizer = optim.Adam(net.parameters())

In [22]:
train(net, optimizer, n_epochs)

EPOCH: 0, Train Loss: 0.691, Valid Loss: 0.682, Validation F1: 0.501
EPOCH: 1, Train Loss: 0.578, Valid Loss: 0.605, Validation F1: 0.725
EPOCH: 2, Train Loss: 0.335, Valid Loss: 0.758, Validation F1: 0.674
EPOCH: 3, Train Loss: 0.249, Valid Loss: 0.800, Validation F1: 0.675
EPOCH: 4, Train Loss: 0.187, Valid Loss: 0.928, Validation F1: 0.650


###LSTM+TransformerによるIMDbのsentiment analysis

改変\
TRansformerモデルのPositional Encoding部分をLSTMで置換\
参考文献：https://note.com/diatonic_codes/n/nab29c78bbf2e

In [18]:
class SequenceTaggingNet4(nn.Module):
    def __init__(self, word_num, emb_dim, hid_dim, heads, dim_head, mlp_dim):
        super().__init__()
        self.emb = Embedding(emb_dim, word_num)
        self.lstm = LSTM3(emb_dim, hid_dim)
        self.block = Block(hid_dim, heads, dim_head, mlp_dim, 0.)
        self.linear = nn.Linear(hid_dim, 1)

    def forward(self, x, len_seq_max=0, len_seq=None, init_state=None):
        h = self.emb(x)
        h = self.lstm(h, len_seq_max, init_state)
        h = self.block(h)
        if len_seq is not None:
            # 系列が終わった時点での出力を取る必要があるので len_seq を元に集約する
            h = h[len_seq - 1, list(range(len(x))), :]
        else:
            h = h[-1]

        y = self.linear(h)
        return y

In [19]:
emb_dim = 100
hid_dim = 50
n_epochs = 5
heads = 5
dim_head = 10
mlp_dim = 100
device = 'cuda'

net = SequenceTaggingNet4(word_num, emb_dim, hid_dim, heads, dim_head, mlp_dim)
net.to(device)
optimizer = optim.Adam(net.parameters())


for epoch in range(n_epochs):
    losses_train = []
    losses_valid = []

    net.train()
    n_train = 0
    acc_train = 0
    for label, line, len_seq in train_dataloader:
        net.zero_grad()  # 勾配の初期化

        t = label.to(device) # テンソルをGPUに移動
        x = line.to(device) # ( batch, time )
        len_seq.to(device)

        h = net(x, torch.max(len_seq), len_seq)
        y = torch.sigmoid(h).squeeze()

        loss = -torch.mean(t * torch_log(y) + (1 - t) * torch_log(1 - y))

        loss.backward()  # 誤差の逆伝播

        optimizer.step()  # パラメータの更新

        losses_train.append(loss.tolist())

        n_train += t.size()[0]

        # Valid
    t_valid = []
    y_pred = []
    net.eval()
    for label, line, len_seq in valid_dataloader:

        t = label.to(device) # テンソルをGPUに移動
        x = line.to(device)
        len_seq.to(device)

        h = net(x, torch.max(len_seq), len_seq)
        y = torch.sigmoid(h).squeeze()

        loss = -torch.mean(t*torch_log(y) + (1 - t)*torch_log(1 - y))

        pred = y.round().squeeze()  # 0.5以上の値を持つ要素を正ラベルと予測する

        t_valid.extend(t.tolist())
        y_pred.extend(pred.tolist())

        losses_valid.append(loss.tolist())

    print('EPOCH: {}, Train Loss: {:.3f}, Valid Loss: {:.3f}, Validation F1: {:.3f}'.format(
        epoch,
        np.mean(losses_train),
        np.mean(losses_valid),
        f1_score(t_valid, y_pred, average='macro')
    ))

EPOCH: 0, Train Loss: 0.699, Valid Loss: 0.689, Validation F1: 0.542
EPOCH: 1, Train Loss: 0.655, Valid Loss: 0.680, Validation F1: 0.614
EPOCH: 2, Train Loss: 0.416, Valid Loss: 0.842, Validation F1: 0.629
EPOCH: 3, Train Loss: 0.263, Valid Loss: 1.012, Validation F1: 0.580
EPOCH: 4, Train Loss: 0.208, Valid Loss: 0.832, Validation F1: 0.674
