In [None]:
# ライブラリの読み込み
import numpy as np
import pandas as pd
import scipy
import matplotlib.pyplot  as plt
import torch
import torch.nn as nn
import torch.optim as optimizers
import torch.nn.functional as F 
import torchtext
import glob
import os
import io
import string
import re
import random
from torchtext.vocab import Vectors
from utils.dataloader import get_IMDb_DataLoaders_and_TEXT

np.random.seed(9837)
torch.manual_seed(9837)
pd.set_option("display.max_rows", 250)
pd.set_option("display.max_columns", 100)

# データの前処理

## データファイルの作成

In [None]:
# 前処理の関数
def preprocessing_text(text):
    # 改行コードを消去
    text = re.sub('<br />', '', text)

    # カンマ、ピリオド以外の記号をスペースに置換
    for p in string.punctuation:
        if (p == ".") or (p == ","):
            continue
        else:
            text = text.replace(p, " ")

    # ピリオドなどの前後にはスペースを入れておく
    text = text.replace(".", " . ")
    text = text.replace(",", " , ")
    return text

# 分かち書き（今回はデータが英語で、簡易的にスペースで区切る）
def tokenizer_punctuation(text):
    return text.strip().split()


# 前処理と分かち書きをまとめた関数を定義
def tokenizer_with_preprocessing(text):
    text = preprocessing_text(text)
    ret = tokenizer_punctuation(text)
    return ret

In [None]:
# 訓練データのtsvファイルを作成
path = "D:/Statistics/data/deep_leraning/nlp/transformer/"

f = open(path + "IMDb_train.tsv", "w", encoding="utf-8")

positive_path = path + "aclImdb/train/pos/"
for fname in glob.glob(os.path.join(positive_path, "*.txt")):
    with io.open(fname, "r", encoding="utf-8") as ff:
        text = ff.readline()

        # タブがあれば消去
        text = text.replace("\t", " ")

        text = text+"\t"+"1"+"\t"+"\n"
        f.write(text)

negative_path = path + "aclImdb/train/neg/"
for fname in glob.glob(os.path.join(negative_path, "*.txt")):
    with io.open(fname, "r", encoding="utf-8") as ff:
        text = ff.readline()

        # タブがあれば消去
        text = text.replace("\t", " ")

        text = text+"\t"+"0"+"\t"+"\n"
        f.write(text)

f.close()

# テストデータのtsvファイルを作成
f = open(path + "IMDb_test.tsv", "w", encoding="utf-8")

positive_path = path + "aclImdb/test/pos/"
for fname in glob.glob(os.path.join(positive_path, "*.txt")):
    with io.open(fname, "r", encoding="utf-8") as ff:
        text = ff.readline()

        # タブがあれば消去
        text = text.replace("\t", " ")

        text = text+"\t"+"1"+"\t"+"\n"
        f.write(text)

negative_path = path + "aclImdb/test/neg/"
for fname in glob.glob(os.path.join(negative_path, "*.txt")):
    with io.open(fname, "r", encoding="utf-8") as ff:
        text = ff.readline()

        # タブがあれば消去
        text = text.replace("\t", " ")

        text = text+"\t"+"0"+"\t"+"\n"
        f.write(text)

f.close()

## DataLoaderの作成

In [None]:
# textとラベルを定義
# 文章とラベルの両方を用意
max_length = 256
TEXT = torchtext.data.Field(sequential=True, tokenize=tokenizer_with_preprocessing, use_vocab=True,
                            lower=True, include_lengths=True, batch_first=True, fix_length=max_length, 
                            init_token="<cls>", eos_token="<eos>")
LABEL = torchtext.data.Field(sequential=False, use_vocab=False)

# フォルダ「data」からtsvファイルを読み込み
train_val_ds, test_ds = torchtext.data.TabularDataset.splits(path=path, train="IMDb_train.tsv", test="IMDb_test.tsv", format="tsv",
                                                             fields=[("Text", TEXT), ("Label", LABEL)])

# torchtext.data.Datasetのsplit関数で訓練データと検証データに分割
train_ds, val_ds = train_val_ds.split(split_ratio=0.8, random_state=random.seed(1234))


# ボキャブラリーを作成
# torchtextで単語ベクトルとして英語学習済みモデルを読み込み
load_path = path + "wiki-news-300d-1M.vec" 
english_fasttext_vectors = Vectors(name=load_path)

# ベクトル化したバージョンのボキャブラリーを作成
TEXT.build_vocab(train_ds, vectors=english_fasttext_vectors, min_freq=10)

# DataLoaderを作成
batch_size = 64
train_dl = torchtext.data.Iterator(train_ds, batch_size=batch_size, train=True)
val_dl = torchtext.data.Iterator(test_ds, batch_size=batch_size, train=False, sort=False)
test_dl = torchtext.data.Iterator(test_ds, batch_size=batch_size, train=False, sort=False)
dataloaders_dict = {"train": train_dl, "val": val_dl}   # 辞書オブジェクトにまとめる

# ミニバッチの用意
batch = next(iter(train_dl))

# Transformerのblockを定義

## Embedding層

In [None]:
# Embedding層モジュールを定義
class Embedder(nn.Module):
    # idで示される単語をベクトルに変換
    
    def __init__(self, text_embedding_vectors):
        super(Embedder, self).__init__()
        
        # 学習済み単語ベクトルを読み込み(freeze=Trueで学習しない)
        self.embeddings = nn.Embedding.from_pretrained(embeddings=text_embedding_vectors, freeze=True)
        
    def forward(self, x):
        x_vec = self.embeddings(x)
        return x_vec        
    
# 動作を確認
word_id = batch.Text[0]
net1 = Embedder(TEXT.vocab.vectors)
x = net1(word_id)   # 単語をベクトルに

print("入力のテンソルサイズ：", word_id.shape)
print("出力のテンソルサイズ：", x.shape)

## Positional Encoder層

In [None]:
# Positional Encoder層モジュールを定義
class PositionalEncoder(nn.Module):
    
    # 入力された単語の位置を示すベクトル情報を付加する
    def __init__(self, d_model=300, max_seq_len=256):
        super().__init__()
        self.d_model = d_model   # 単語ベクトルの次元数
        
        # 単語の順序と埋め込みベクトルが一意に定まる値を定義
        pe = torch.zeros((max_seq_len, d_model))
        
        # GPUが使える場合はGPUへ送る、ここでは省略。実際に学習時には使用する
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        pe = pe.to(device)
        
        for pos in range(max_seq_len):
            for i in range(0, d_model, 2):
                pe[pos, i] = scipy.sin(pos / (10000 ** ((2 * i)/d_model)))
                pe[pos, i+1] = scipy.cos(pos / (10000 ** ((2 * (i + 1))/d_model)))
                
        self.pe = pe.unsqueeze(0)
        self.pe.requires_grad = False   # 勾配は計算しない
       
    # 入力する単語ベクトルとPositional Embeddingの和を取る
    def forward(self, x):
        ret = np.sqrt(self.d_model)*x + self.pe
        return ret
    
# 動作確認
# モデル構築
net1 = Embedder(TEXT.vocab.vectors)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)

# 入出力
word_id = batch.Text[0]
x1 = net1(word_id)
x2 = net2(x1)
print("入力のテンソルサイズ：", x1.shape)
print("出力のテンソルサイズ：", x2.shape)

## Self Attention層

In [None]:
# Attention層モジュールを定義
class Attention(nn.Module):
    
    def __init__(self, d_model=300):
        super().__init__()
        
        # 全結合層で特徴量を変換
        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        
        # 出力時の全結合層
        self.out = nn.Linear(d_model, d_model)
        
        # attentionの大きさを調整
        self.d_k = d_model
        
    def forward(self, q, k, v, mask):
        # 全結合層で特徴量を変換
        k = self.k_linear(k)
        q = self.q_linear(q)
        v = self.v_linear(v)
        
        # attentionの値を計算
        weights = torch.matmul(q, k.transpose(1, 2)) / np.sqrt(self.d_k)
        
        # mask計算
        mask = mask.unsqueeze(1)
        weights = weights.masked_fill(mask==0, -1e9)
        
        # softmaxで規格化
        normalized_weights = F.softmax(weights, dim=-1)
        
        # AttentionとValueの積
        output = torch.matmul(normalized_weights, v)
        
        # 全結合層で特徴量を変換
        output = self.out(output)
        
        return output, normalized_weights, k, q, v
    
# 動作確認
# モデル構築
net1 = Embedder(TEXT.vocab.vectors)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)
net3 = Attention(d_model=300)

# 入力を定義
word_id = batch.Text[0]
input_pad = 1
input_mask = (word_id!=input_pad)

# モデル検証
x1 = net1(word_id)
x2 = net2(x1)
output, normalized_weights, k, q, v = net3(x2, x2, x2, input_mask)

## Transfomer Block層

In [None]:
# Transformerブロックを定義
# Feedforward層を定義
class FeedForward(nn.Module):
    
    def __init__(self, d_model, d_ff=1024, dropout=0.1):
        super().__init__()

        self.linear_1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model)
    
    def forward(self, x):
        x = self.linear_1(x)
        x = self.dropout(F.relu(x))
        x = self.linear_2(x)
        return x
    
# Transformer層を定義
class TransformerBlock(nn.Module):
    def __init__(self, d_model, dropout=0.1):
        super().__init__()
        
        # LayerNormalization層
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        
        # Attention層
        self.attn = Attention(d_model)
        
        # 全結合層
        self.ff = FeedForward(d_model)
        
        # Dropout
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)
        
    def forward(self, x, mask):
        # normalizationとattention
        x_normalized1 = self.norm1(x)
        output, normalized_weights, k, q, v = self.attn(x_normalized1, x_normalized1, x_normalized1, mask)
        
        x2 = x + self.dropout_1(output)
        
        # 正規化と全結合層
        x_normalized2 = self.norm2(x2)
        output = x2 + self.dropout_2(self.ff(x_normalized2))
        return output, normalized_weights
    
# 動作確認
# モデル構築
net1 = Embedder(TEXT.vocab.vectors)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)
net3 = TransformerBlock(d_model=300)

# 入力を定義
word_id = batch.Text[0]
input_pad = 1
input_mask = (word_id!=input_pad)

# モデル検証
x1 = net1(word_id)
x2 = net2(x1)
x3, normalized_weights = net3(x2, input_mask)  # Self-Attentionで特徴量を変換

print("入力のテンソルサイズ：", x2.shape)
print("出力のテンソルサイズ：", x3.shape)
print("Attentionのサイズ：", normalized_weights.shape)

## Classification Head層

In [None]:
# Classification Head層を定義
class ClassificationHead(nn.Module):
    # Transformer Blockの出力を使用し、最後にクラス分類させる
    def __init__(self, d_model=300, output_dim=2):
        super().__init__()
        
        # 全結合層
        self.linear = nn.Linear(d_model, output_dim)   # output dimはポジティブ、ネガティブの2つ
        
        # 重み初期化処理
        nn.init.normal_(self.linear.weight, std=0.02)
        nn.init.normal_(self.linear.bias, 0)
        
    def forward(self, x):
        Z = x[:, 0, :]   # 各ミニバッチの各文の先頭の単語の特徴量(300次元)を取り出す
        out = self.linear(Z)
        return out
    
    
# 動作確認
# モデル構築
net1 = Embedder(TEXT.vocab.vectors)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)
net3 = TransformerBlock(d_model=300)
net4 = ClassificationHead(d_model=300, output_dim=2)

# 入力を定義
word_id = batch.Text[0]
input_pad = 1
input_mask = (word_id!=input_pad)

# モデル検証
x1 = net1(word_id)
x2 = net2(x1)
x3, normalized_weights = net3(x2, input_mask)  # Self-Attentionで特徴量を変換
x4 = net4(x3)  # 最終出力の0単語目を使用して、分類0-1のスカラーを出力

print("入力のテンソルサイズ：", x3.shape)
print("出力のテンソルサイズ：", x4.shape)


In [None]:
word_id.shape

In [None]:
test = []
for batch in (dataloaders_dict["train"]):
    # batchはTextとLableの辞書オブジェクト

    # GPUが使えるならGPUにデータを送る
    test.append(batch.Text[0].numpy().astype("int"))

In [None]:
np.prod(np.vstack((test)).shape)

## Transformerモデルを定義

In [None]:
# Transformerを実装
class TransformerClassification(nn.Module):
    # Transformerでクラス分類させる
    def __init__(self, text_embedding_vectors, d_model=300, max_seq_len=256, output_dim=2):
        super().__init__()
        
        # モデル構築
        self.net1 = Embedder(text_embedding_vectors)
        self.net2 = PositionalEncoder(d_model=d_model, max_seq_len=max_seq_len)
        self.net3_1 = TransformerBlock(d_model=d_model)
        self.net3_2 = TransformerBlock(d_model=d_model)
        self.net4 = ClassificationHead(d_model=d_model, output_dim=output_dim)
        
    def forward(self, word_id, mask):
        x1 = self.net1(word_id)   # 単語をベクトルに
        x2 = self.net2(x1)   # position情報の和を取る
        x3_1, normalized_weights_1 = self.net3_1(x2, input_mask)  # 1つ目のSelf Attentionで特徴量を変換
        x3_2, normalized_weights_2 = self.net3_2(x3_1, input_mask)   # 2つ目のSelf Attentionで特徴量を変換
        x4 = self.net4(x3_2)  # 最終出力の0単語目を使用して、分類0-1のスカラーを出力
        return x4, normalized_weights_1, normalized_weights_2
    
# 動作確認
# モデル構築
net = TransformerClassification(text_embedding_vectors=TEXT.vocab.vectors, d_model=300, max_seq_len=256, output_dim=2)

# 入力を定義
word_id = batch.Text[0]
input_pad = 1
input_mask = (word_id!=input_pad)

# モデル検証
out, normalized_weights_1, normalized_weights_2 = net(word_id, input_mask)

print("出力のテンソルサイズ：", out.shape)
print("出力テンソルのsigmoid：", F.softmax(out, dim=1))

# Transformerの学習と推論

## モデルを定義

In [None]:
# ネットワークの初期化関数を定義
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find("Linear")!=-1:
        # Linear層の初期化
        nn.init.kaiming_normal_(m.weight)
        if m.bias is not None:
            nn.init.constant_(m.bias, 0.0)

# ネットワークモデルを定義
net = TransformerClassification(text_embedding_vectors=TEXT.vocab.vectors, d_model=300, max_seq_len=256, output_dim=2)

# 訓練モードに設定
net.train()

net.net3_1.apply(weights_init)
net.net3_2.apply(weights_init)

## 負の対数尤度関数と最適化手法を定義

In [None]:
# 負の対数尤度関数の設定
criterion = nn.CrossEntropyLoss()

# 最適化手法の設定
learning_rate = 2e-5
optimizer = optimizers.Adam(net.parameters(), lr=learning_rate)

## 学習・検証を実施

In [None]:
# アルゴリズムの設定
num_epochs = 15
input_pad = 1  # 単語のIDにおいて、'<pad>': 1 なので

# GPUが使えるかを確認
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("使用デバイス：", device)
print('-----start-------')
# ネットワークをGPUへ
net.to(device)

# ネットワークがある程度固定であれば、高速化させる
torch.backends.cudnn.benchmark = True

# epochのループ
for epoch in range(num_epochs):
    # epochごとの訓練と検証のループ
    for phase in ['train', 'val']:
        if phase=='train':
            net.train()  # モデルを訓練モードに
        else:
            net.eval()   # モデルを検証モードに

        epoch_loss = 0.0  # epochの損失和
        epoch_corrects = 0  # epochの正解数

        # データローダーからミニバッチを取り出すループ
        for batch in (dataloaders_dict[phase]):
            # batchはTextとLableの辞書オブジェクト

            # GPUが使えるならGPUにデータを送る
            inputs = batch.Text[0].to(device)  # 文章
            labels = batch.Label.to(device)  # ラベル

            # optimizerを初期化
            optimizer.zero_grad()

            # 順伝搬（forward）計算
            with torch.set_grad_enabled(phase=='train'):

                # mask作成
                input_mask = (inputs!=input_pad)

                # Transformerに入力
                outputs, _, _ = net(inputs, input_mask)
                loss = criterion(outputs, labels)  # 損失を計算

                _, preds = torch.max(outputs, 1)  # ラベルを予測

                # 訓練時はバックプロパゲーション
                if phase == 'train':
                    loss.backward()
                    optimizer.step()

                # 結果の計算
                epoch_loss += loss.item() * inputs.size(0)  # lossの合計を更新
                # 正解数の合計を更新
                epoch_corrects += torch.sum(preds==labels.data)

        # epochごとのlossと正解率
        epoch_loss = epoch_loss / len(dataloaders_dict[phase].dataset)
        epoch_acc = epoch_corrects.double(
        ) / len(dataloaders_dict[phase].dataset)

        print('Epoch {}/{} | {:^5} |  Loss: {:.4f} Acc: {:.4f}'.format(epoch+1, num_epochs,
                                                                       phase, epoch_loss, epoch_acc))