# ＊PYTORCHによるRoBERTaの実装


- 環境:Ubuntu20.04
- GPU:NVIDIA RTX A6000
- ドライバー:NVIDIA-SMI 510.47.03, Driver Version: 510.47.03, CUDA Version: 11.6
- ./data:train.tsv, test.tsv,test.csv
- EarlyStoppingを利用する場合は、https://github.com/Bjarten/early-stopping-pytorch から pytorchtools.py を入手して utils/ に配置し、「学習・検証」のコード中でコメントアウトされている EarlyStopping 関連の行を有効にすること

# ＊事前準備


In [None]:
#https://pytorch.org/get-started/previous-versions/
#https://github.com/pytorch/text/issues/1342
#https://github.com/pytorch/text#installation
#https://teratail.com/questions/358588

!conda create -n pytorch
!conda activate pytorch
# https://pytorch.org/get-started/previous-versions/#linux-and-windows-11
!pip install torch==1.8.0+cu111 torchvision==0.9.0+cu111 torchaudio==0.8.0 -f https://download.pytorch.org/whl/torch_stable.html
!pip install jupyterlab

!pip install transformers==4.17.0
!pip install torchtext==0.9.0
!pip install tqdm
!pip install pyknp
!pip install attrdict
!pip install spacy
!pip install mojimoji
!pip install protobuf
!pip install sentencepiece
!pip install fugashi
!pip install pandas
!pip install sklearn

In [None]:
import torch

# Report the CUDA / PyTorch environment, one value per line, in this order:
# CUDA availability, CUDA version of the build, PyTorch version,
# number of visible GPUs, index of the current GPU.
# Each probe is evaluated lazily so earlier values are printed even if a
# later probe raises.
environment_probes = (
    torch.cuda.is_available,
    lambda: torch.version.cuda,
    lambda: torch.__version__,
    torch.cuda.device_count,
    torch.cuda.current_device,
)
for probe in environment_probes:
    print(probe())
# Output recorded by the author:
#True
#11.1
#1.8.0+cu111
#0

# ＊ライブラリのインポート

In [None]:
import random
import time
import numpy as np
from tqdm import tqdm
import torch 
from torch import nn
import torch.optim as optim
import torchtext
import glob
import os
import io
import string
import re
import sys
import random
import mojimoji
import string
import pickle

In [None]:
from transformers import BertJapaneseTokenizer, BertModel, AutoTokenizer
# Directory that receives plots and prediction CSV files.
path_result = "./result/"
# Directory that receives the fine-tuned model weights.
path_weights = "./weights/"
# Maximum number of token ids per example (truncation and padding length).
max_length = 128

# ＊Jumanで形態素解析

In [None]:
from pyknp import Juman
class JumanTokenize(object):
    """Word segmenter backed by a pyknp ``Juman`` analyzer."""

    def __init__(self):
        self.juman = Juman()

    def tokenize(self, text):
        """Return the surface form (``midasi``) of every morpheme in ``text``.

        Args:
            text: String handed to ``Juman.analysis``.

        Returns:
            list: Surface strings in the order the analyzer yields them.
        """
        analysis = self.juman.analysis(text)
        surfaces = []
        for morpheme in analysis.mrph_list():
            surfaces.append(morpheme.midasi)
        return surfaces
    
class BertTokenizer(object):
    """Thin wrapper that exposes Juman word segmentation via ``tokenize``."""

    def __init__(self):
        self.juman_tokenizer = JumanTokenize()

    def tokenize(self, text):
        """Segment ``text`` with the wrapped Juman tokenizer.

        Args:
            text: String to segment.

        Returns:
            Whatever ``self.juman_tokenizer.tokenize(text)`` returns
            (a list of surface strings for ``JumanTokenize``).
        """
        return self.juman_tokenizer.tokenize(text)

# ＊前処理

In [None]:
# Maps every ASCII punctuation mark except "." and "," to a space.
# Built once so each call is a single pass over the text.
_PUNCT_TO_SPACE = str.maketrans(
    {mark: " " for mark in string.punctuation if mark not in ".,"}
)


def preprocessing_text(text):
    """Normalize raw text before morphological analysis.

    Steps:
      1. Convert half-width characters to full-width with
         ``mojimoji.han_to_zen``.
      2. Remove carriage returns, newlines, half-width and full-width spaces.
      3. Replace every ASCII punctuation mark except "." and "," with a space.

    The previous implementation returned from inside the punctuation loop,
    so only the first mark of ``string.punctuation`` was ever processed.

    Args:
        text: Raw input string.

    Returns:
        str: The normalized string.
    """
    # NOTE(review): han_to_zen presumably also converts ASCII punctuation to
    # full-width by default, in which case step 3 rarely matches anything --
    # confirm against the mojimoji documentation.
    text = mojimoji.han_to_zen(text)
    text = re.sub(r'[\r\n 　]', '', text)

    # Optional (disabled in this pipeline): collapse digit runs to a single
    # "0" with re.sub(r'[0-9 ０-９]+', '0', text).

    return text.translate(_PUNCT_TO_SPACE)

# ＊TorchtextでDatasetの作成

In [None]:
# Seed every random number generator used in this notebook.
for _seed_fn in (torch.manual_seed, np.random.seed, random.seed):
    _seed_fn(1234)

# Sub-word tokenizer belonging to the pretrained checkpoint.
# Other checkpoints that were tried:
#   "cl-tohoku/bert-base-japanese-v2"
#   "cl-tohoku/bert-base-japanese-whole-word-masking"
tokenizer_bert = AutoTokenizer.from_pretrained("nlp-waseda/roberta-base-japanese")

# Juman-based word segmenter applied before sub-word tokenization.
tokenizer_bert_Juman = BertTokenizer()

def tokenizer_with_preprocessing(text, tokenizer=tokenizer_bert_Juman):
    """Turn raw text into a 1-D tensor of token ids.

    The text is normalized with ``preprocessing_text``, segmented into words
    with ``tokenizer.tokenize``, joined with single spaces and encoded by
    ``tokenizer_bert`` (truncated to ``max_length``).

    Args:
        text: Raw input string.
        tokenizer: Object exposing ``tokenize(text) -> list of str``.
            Defaults to the Juman-based segmenter. The argument was
            previously ignored in favour of the global.

    Returns:
        The first row of the ``return_tensors='pt'`` encoding, i.e. the
        token ids of the single input sequence.
    """
    text = preprocessing_text(text)
    words = tokenizer.tokenize(text)
    encoded = tokenizer_bert.encode(" ".join(words), max_length=max_length, truncation=True, return_tensors='pt')
    return encoded[0]

# Field definitions: how each column is processed when the data is loaded.
# The text column is tokenized straight to ids (no torchtext vocab), padded
# to max_length with 0, and returned together with sequence lengths.
TEXT = torchtext.legacy.data.Field(
    sequential=True,
    tokenize=tokenizer_with_preprocessing,
    use_vocab=False,
    lower=False,
    include_lengths=True,
    batch_first=True,
    fix_length=max_length,
    pad_token=0,
)

LABEL = torchtext.legacy.data.Field(sequential=False, use_vocab=False)

# Read the TSV files from the "data" folder.
tsv_fields = [('Text', TEXT), ('Label', LABEL)]
train_val_ds, test_ds = torchtext.legacy.data.TabularDataset.splits(
    path="data",
    train='train.tsv',
    test='test.tsv',
    format='tsv',
    fields=tsv_fields,
)

# ＊Dataloaderの作成

In [None]:
# Mini-batch size; 16 or 32 is the usual choice for BERT-style fine-tuning.
batch_size = 32

# Split the labelled data 80/20 into training and validation sets.
# Dataset.split expects a state tuple from random.getstate();
# random.seed() returns None, so seed first and pass the resulting state.
random.seed(1234)
train_ds, val_ds = train_val_ds.split(split_ratio=0.8, random_state=random.getstate())

# train_dl iterates over ALL labelled data (final training run);
# train_dl_val / val_dl are used for hyper-parameter selection.
train_dl = torchtext.legacy.data.Iterator(train_val_ds, batch_size=batch_size, train=True)
train_dl_val = torchtext.legacy.data.Iterator(train_ds, batch_size=batch_size, train=True)
val_dl = torchtext.legacy.data.Iterator(val_ds, batch_size=batch_size, train=False, sort=False)
test_dl = torchtext.legacy.data.Iterator(test_ds, batch_size=batch_size, train=False, sort=False)

# Group the iterators by phase name.
dataloaders_dict_val = {"train": train_dl_val, "val": val_dl}
dataloaders_dict = {"train": train_dl}

# ＊動作確認

In [None]:
# Sanity check: fetch one mini-batch from the test iterator and show the
# batch object, its text field and its label field.
batch = next(iter(test_dl))
print(batch, batch.Text, batch.Label, sep="\n")

In [None]:
# Inspect one example of the mini-batch: take the id sequence at index 1
# of the text tensor and convert it to a NumPy array.
text_minibatch_1 = batch.Text[0][1].numpy()

# Convert the ids back to tokens and display them.
text = tokenizer_bert.convert_ids_to_tokens(text_minibatch_1)
print(text)

In [None]:
# Sanity check of the two-stage tokenization on a sample sentence.
tokenizer_bert_Juman = BertTokenizer()
tokenizer_bert = AutoTokenizer.from_pretrained("nlp-waseda/roberta-base-japanese")

# Stage 1: Juman word segmentation. Stage 2: sub-word encoding of the
# space-joined words.
text = "早稲田大学で自然言語処理の勉強をする"
text = tokenizer_bert_Juman.tokenize(text)
text = tokenizer_bert(" ".join(text))
print(text)

# Convert the ids back to tokens.
text2 = tokenizer_bert.convert_ids_to_tokens(text["input_ids"])
print(text2)

# ＊RoBERTaモデルを構築

In [None]:
# Reference: https://github.com/huggingface/transformers/issues/5421
from transformers import BertModel, RobertaModel, AutoModelForMaskedLM

# Load the pretrained RoBERTa encoder and show its configuration.
# A BERT alternative that was tried:
#   BertModel.from_pretrained("cl-tohoku/bert-base-japanese-whole-word-masking")
pretrained_name = "nlp-waseda/roberta-base-japanese"
net_bert = RobertaModel.from_pretrained(pretrained_name)
print(net_bert.config)

In [None]:
#https://github.com/huggingface/transformers/blob/main/src/transformers/models/roberta/modeling_roberta.py
#https://github.com/huggingface/transformers/issues/8776
#https://github.com/huggingface/transformers/issues/1328

class BertClassifier(nn.Module):
    """Pretrained encoder followed by dropout and a 2-way linear classifier.

    The classifier reads the encoder's ``pooler_output``.
    """

    def __init__(self, net_bert):
        """Wrap ``net_bert`` and create the classification layers.

        Args:
            net_bert: Encoder module exposing ``config`` and returning a
                mapping with ``'pooler_output'`` (and ``'attentions'`` when
                attentions are requested).
        """
        super().__init__()

        # Pretrained encoder.
        self.bert = net_bert
        # Not used by forward(); kept registered so that state_dict keys
        # stay compatible with checkpoints saved from this class.
        self.classifier = RobertaClassificationHead(net_bert.config)

        # Linear head on the pooled feature: hidden size in, 2 classes out.
        # Change out_features to add classes.
        self.cls = nn.Linear(in_features=net_bert.config.hidden_size, out_features=2)

        # Dropout applied to the pooled feature.
        self.dropout = nn.Dropout(0.1)

        # Weight initialization of the linear head.
        nn.init.normal_(self.cls.weight, std=0.02)
        nn.init.normal_(self.cls.bias, 0)

    def forward(self, input_ids, token_type_ids=None, attention_mask=None, output_all_encoded_layers=False, attention_show_flg=False):
        """Classify a batch of id sequences.

        Args:
            input_ids: Token ids, [batch_size, sequence_length].
            token_type_ids: Optional segment ids, forwarded to the encoder.
            attention_mask: Optional padding mask, forwarded to the encoder.
                Both were previously accepted but silently dropped; passing
                None keeps the earlier behaviour.
            output_all_encoded_layers: Unused; kept for call compatibility.
            attention_show_flg: When truthy, also return the encoder's
                attention weights.

        Returns:
            The logits ``out``, or ``(out, attentions)`` when
            ``attention_show_flg`` is truthy.
        """
        output = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            # None defers to the encoder's own configuration.
            output_attentions=True if attention_show_flg else None,
        )

        # Dropout on the pooled output, then the linear head.
        pooled = self.dropout(output['pooler_output'])
        out = self.cls(pooled)

        if attention_show_flg:
            return out, output['attentions']
        return out
        
class RobertaClassificationHead(nn.Module):
    """Sentence-level head: dropout -> dense -> tanh -> dropout -> projection."""

    def __init__(self, config):
        """Build the layers from ``config``.

        Reads ``hidden_size``, ``num_labels`` and ``classifier_dropout``
        (falling back to ``hidden_dropout_prob`` when that is None).
        """
        super().__init__()
        width = config.hidden_size
        self.dense = nn.Linear(width, width)
        if config.classifier_dropout is None:
            drop_prob = config.hidden_dropout_prob
        else:
            drop_prob = config.classifier_dropout
        self.dropout = nn.Dropout(drop_prob)
        self.out_proj = nn.Linear(width, config.num_labels)

    def forward(self, features, **kwargs):
        """Return logits computed from the first position of ``features``."""
        # First position holds the <s> token (equivalent to [CLS]).
        first_token = features[:, 0, :]
        hidden = torch.tanh(self.dense(self.dropout(first_token)))
        return self.out_proj(self.dropout(hidden))

In [None]:
# Build the classifier around the pretrained encoder and switch it to
# training mode (Module.train() returns the module itself).
net = BertClassifier(net_bert).train()

print('ネットワーク設定完了')

# ＊BERTのファインチューニング

## パターン1：全てのパラメータを更新

In [None]:
# Pattern 1: update every parameter.
# The classifier needs a larger learning rate than the encoder to learn.

# Enable gradients everywhere.
for param in net.parameters():
    param.requires_grad = True

# Optimizer: the encoder is fine-tuned with a small learning rate and the
# new linear head with a larger one. Previously only net.cls was handed to
# the optimizer, so the encoder was never updated despite requires_grad=True.
optimizer = optim.Adam([
    {'params': net.bert.parameters(), 'lr': 5e-5},
    {'params': net.cls.parameters(), 'lr': 5e-3}
], betas=(0.9, 0.999))

# Loss: CrossEntropyLoss = LogSoftmax followed by NLLLoss
# (negative log likelihood loss).
criterion = nn.CrossEntropyLoss()

## パターン2：BERTの最終層と識別器のパラメータを更新

In [None]:
# Pattern 2: train only the last encoder layer and the linear head.

# 1. Freeze everything.
for param in net.parameters():
    param.requires_grad = False

# 2./3. Re-enable gradients for the last encoder layer and the head.
last_layer = net.bert.encoder.layer[-1]
for trainable in (last_layer, net.cls):
    for param in trainable.parameters():
        param.requires_grad = True

# Optimizer with one parameter group per trainable part.
optimizer = optim.Adam(
    [
        {'params': last_layer.parameters(), 'lr': 5e-5},
        {'params': net.cls.parameters(), 'lr': 1e-5},
    ],
    betas=(0.9, 0.999),
)

# Loss: CrossEntropyLoss = LogSoftmax followed by NLLLoss
# (negative log likelihood loss).
criterion = nn.CrossEntropyLoss()

## パターン3：BERTの最終4層と識別器のパラメータを更新

In [None]:
# Pattern 3: train the last four encoder layers and the linear head.

# Freeze everything first.
for param in net.parameters():
    param.requires_grad = False

# Re-enable gradients for the last four encoder layers and the head.
top_layers = [net.bert.encoder.layer[idx] for idx in (-1, -2, -3, -4)]
for trainable in top_layers + [net.cls]:
    for param in trainable.parameters():
        param.requires_grad = True

# Small learning rate for the pretrained layers, larger one for the
# final fully connected layer.
param_groups = [{'params': layer.parameters(), 'lr': 5e-5} for layer in top_layers]
param_groups.append({'params': net.cls.parameters(), 'lr': 1e-4})
optimizer = optim.Adam(param_groups)

# Loss: CrossEntropyLoss = LogSoftmax followed by NLLLoss
# (negative log likelihood loss).
criterion = nn.CrossEntropyLoss()

# ＊学習・検証

## ＊ 開発データでハイパーパラメータを決定

In [None]:
# Training / validation loop.
# To enable early stopping, uncomment the EarlyStopping lines below.
#from utils.pytorchtools import EarlyStopping
def train_model(net, dataloaders_dict, criterion, optimizer, num_epochs):
    """Train ``net`` and evaluate it on the validation set every epoch.

    Args:
        net: Model called as ``net(inputs, token_type_ids=None,
            attention_mask=None, output_all_encoded_layers=False,
            attention_show_flg=False)`` and returning class logits.
        dataloaders_dict: Mapping with ``"train"`` and ``"val"`` iterables.
            Each batch exposes ``Text`` (ids at index 0) and ``Label``;
            each loader exposes ``dataset`` (sized).
        criterion: Loss function ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        num_epochs: Number of epochs to run.

    Returns:
        tuple: ``(net, Epochs, Loss_train, Accuracy_train, Loss_val,
        Accuracy_val)`` where the lists hold one entry per epoch.
    """
    # Per-epoch history.
    Epochs = []
    Accuracy_train, Loss_train = [], []
    Accuracy_val, Loss_val = [], []

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("使用デバイス：", device)
    print('-----start-------')
    #early_stopping = EarlyStopping(patience=10)

    net.to(device)

    # Let cuDNN pick fast kernels.
    torch.backends.cudnn.benchmark = True

    start = time.time()
    for epoch in range(num_epochs):
        #if early_stopping.early_stop:
        #    print("Early Stopping")
        #    break

        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                net.train()
            else:
                net.eval()

            epoch_loss = 0.0  # sum of per-sample losses in this epoch
            epoch_corrects = 0  # number of correct predictions in this epoch
            t_iter_start = time.time()

            for iteration, batch in enumerate(dataloaders_dict[phase], start=1):
                inputs = batch.Text[0].to(device)
                labels = batch.Label.to(device)
                # The last batch may be smaller than the nominal batch size,
                # so weight by the number of samples actually present.
                n_samples = labels.size(0)

                optimizer.zero_grad()

                with torch.set_grad_enabled(is_train):
                    # NOTE(review): attention_mask is None, so padded
                    # positions are not masked here.
                    outputs = net(inputs, token_type_ids=None, attention_mask=None,
                                  output_all_encoded_layers=False, attention_show_flg=False)
                    loss = criterion(outputs, labels)
                    _, preds = torch.max(outputs, 1)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                batch_corrects = torch.sum(preds == labels).item()

                # Progress report every 10 training iterations.
                if is_train and iteration % 10 == 0:
                    duration = time.time() - t_iter_start
                    acc = batch_corrects / n_samples
                    print('イテレーション {} || Loss: {:.4f} || 10iter: {:.4f} sec. || 本イテレーションの正解率：{}'.format(iteration, loss.item(), duration, acc))
                    t_iter_start = time.time()
                    #early_stopping(loss.item(), net)

                epoch_loss += loss.item() * n_samples
                epoch_corrects += batch_corrects

            # Per-epoch loss and accuracy.
            n_total = len(dataloaders_dict[phase].dataset)
            epoch_loss = epoch_loss / n_total
            epoch_acc = epoch_corrects / n_total

            print('Epoch {}/{} | {:^5} |  Loss: {:.4f} Acc: {:.4f}'.format(epoch+1, num_epochs,
                                                                           phase, epoch_loss, epoch_acc))

            # Accuracy is stored rounded to four decimals.
            a = float("{:.4f}".format(epoch_acc))
            if is_train:
                Loss_train.append(epoch_loss)
                Accuracy_train.append(a)
            else:
                Loss_val.append(epoch_loss)
                Accuracy_val.append(a)
        Epochs.append(epoch+1)

    print("Time:{:.4f}sec".format(time.time() - start))

    return net, Epochs, Loss_train, Accuracy_train, Loss_val, Accuracy_val

In [None]:
# Run training with validation. One epoch takes roughly 20 minutes.
num_epochs = 10
history = train_model(net, dataloaders_dict_val, criterion, optimizer, num_epochs=num_epochs)
net_trained, Epochs, Loss_train, Accuracy_train, Loss_val, Accuracy_val = history


## ＊学習時のEpochsごとのAccuracyを出力

In [None]:
import matplotlib.pyplot as plt

# savefig does not create missing directories.
os.makedirs(path_result, exist_ok=True)

# Accuracy per epoch for the training and validation phases.
# Start from a fresh figure so earlier plots are not drawn over.
plt.figure()
plt.plot(Epochs, Accuracy_train, color="blue", label="train")
plt.plot(Epochs, Accuracy_val, color="red", label="val")
plt.xticks(Epochs)
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.grid()
plt.legend()
plt.savefig(path_result+"acc_val.png")

## ＊学習時のEpochsごとのLossを出力

In [None]:
# savefig does not create missing directories.
os.makedirs(path_result, exist_ok=True)

# Loss per epoch for the training and validation phases.
# Start from a fresh figure so the accuracy plot is not drawn over.
plt.figure()
plt.plot(Epochs, Loss_train, color="blue", label="train")
plt.plot(Epochs, Loss_val, color="red", label="val")
plt.xticks(Epochs)
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.grid()
plt.legend()
plt.savefig(path_result+"Loss_val.png")

## ＊ 全データで学習

In [None]:
# Training loop for the final run on all labelled data (no validation).
# To enable early stopping, uncomment the EarlyStopping lines below.
#from utils.pytorchtools import EarlyStopping
def train_model(net, dataloaders_dict, criterion, optimizer, num_epochs):
    """Train ``net`` on the ``"train"`` loader only.

    Args:
        net: Model called as ``net(inputs, token_type_ids=None,
            attention_mask=None, output_all_encoded_layers=False,
            attention_show_flg=False)`` and returning class logits.
        dataloaders_dict: Mapping with a ``"train"`` iterable. Each batch
            exposes ``Text`` (ids at index 0) and ``Label``; the loader
            exposes ``dataset`` (sized).
        criterion: Loss function ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        num_epochs: Number of epochs to run.

    Returns:
        tuple: ``(net, Epochs, Loss_train, Accuracy_train, Loss_val,
        Accuracy_val, time_val)``. ``Loss_val`` and ``Accuracy_val`` stay
        empty because no validation phase is run; ``time_val`` is the
        elapsed wall-clock time in seconds.
    """
    # Per-epoch history (validation lists are returned empty).
    Epochs = []
    Accuracy_train, Loss_train = [], []
    Accuracy_val, Loss_val = [], []

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("使用デバイス：", device)
    print('-----start-------')
    #early_stopping = EarlyStopping(patience=10)

    net.to(device)

    # Let cuDNN pick fast kernels.
    torch.backends.cudnn.benchmark = True

    phase = 'train'
    start = time.time()
    for epoch in range(num_epochs):
        #if early_stopping.early_stop:
        #    print("Early Stopping")
        #    break

        net.train()
        epoch_loss = 0.0  # sum of per-sample losses in this epoch
        epoch_corrects = 0  # number of correct predictions in this epoch
        t_iter_start = time.time()

        for iteration, batch in enumerate(dataloaders_dict[phase], start=1):
            inputs = batch.Text[0].to(device)
            labels = batch.Label.to(device)
            # The last batch may be smaller than the nominal batch size,
            # so weight by the number of samples actually present.
            n_samples = labels.size(0)

            optimizer.zero_grad()

            # NOTE(review): attention_mask is None, so padded positions are
            # not masked here.
            outputs = net(inputs, token_type_ids=None, attention_mask=None,
                          output_all_encoded_layers=False, attention_show_flg=False)
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)

            loss.backward()
            optimizer.step()

            batch_corrects = torch.sum(preds == labels).item()

            # Progress report every 10 iterations.
            if iteration % 10 == 0:
                duration = time.time() - t_iter_start
                acc = batch_corrects / n_samples
                print('イテレーション {} || Loss: {:.4f} || 10iter: {:.4f} sec. || 本イテレーションの正解率：{}'.format(iteration, loss.item(), duration, acc))
                t_iter_start = time.time()
                #early_stopping(loss.item(), net)

            epoch_loss += loss.item() * n_samples
            epoch_corrects += batch_corrects

        # Per-epoch loss and accuracy.
        n_total = len(dataloaders_dict[phase].dataset)
        epoch_loss = epoch_loss / n_total
        epoch_acc = epoch_corrects / n_total

        print('Epoch {}/{} | {:^5} |  Loss: {:.4f} Acc: {:.4f}'.format(epoch+1, num_epochs,
                                                                       phase, epoch_loss, epoch_acc))

        # Accuracy is stored rounded to four decimals.
        Loss_train.append(epoch_loss)
        Accuracy_train.append(float("{:.4f}".format(epoch_acc)))
        Epochs.append(epoch+1)

    time_val = time.time() - start
    print("Time:{:.4f}sec".format(time_val ))

    return net, Epochs, Loss_train, Accuracy_train, Loss_val, Accuracy_val, time_val


In [None]:
# Run the final training on all labelled data. One epoch takes roughly
# 20 minutes.
num_epochs = 10
history = train_model(net, dataloaders_dict, criterion, optimizer, num_epochs=num_epochs)
net_trained, Epochs, Loss_train, Accuracy_train, Loss_val, Accuracy_val, time_val = history

In [None]:
# Save the trained network parameters.
# torch.save does not create missing directories.
os.makedirs(path_weights, exist_ok=True)
save_path = path_weights+'bert_fine_tuning.pth'
torch.save(net_trained.state_dict(), save_path)

## ＊＊＊保存しているモデルを使用する場合＊＊＊
- RoBERTaモデルを構築


In [None]:
#https://github.com/huggingface/transformers/blob/main/src/transformers/models/roberta/modeling_roberta.py
class BertClassifier(nn.Module):
    """Pretrained encoder followed by dropout and a 2-way linear classifier.

    The classifier reads the encoder's ``pooler_output``.
    """

    def __init__(self, net_bert):
        """Wrap ``net_bert`` and create the classification layers.

        Args:
            net_bert: Encoder module exposing ``config`` and returning a
                mapping with ``'pooler_output'`` (and ``'attentions'`` when
                attentions are requested).
        """
        super().__init__()

        # Pretrained encoder.
        self.bert = net_bert
        # Not used by forward(); kept registered so that state_dict keys
        # stay compatible with checkpoints saved from this class.
        self.classifier = RobertaClassificationHead(net_bert.config)

        # Linear head on the pooled feature: hidden size in, 2 classes out.
        # Change out_features to add classes.
        self.cls = nn.Linear(in_features=net_bert.config.hidden_size, out_features=2)

        # Dropout applied to the pooled feature.
        self.dropout = nn.Dropout(0.1)

        # Weight initialization of the linear head.
        nn.init.normal_(self.cls.weight, std=0.02)
        nn.init.normal_(self.cls.bias, 0)

    def forward(self, input_ids, token_type_ids=None, attention_mask=None, output_all_encoded_layers=False, attention_show_flg=False):
        """Classify a batch of id sequences.

        Args:
            input_ids: Token ids, [batch_size, sequence_length].
            token_type_ids: Optional segment ids, forwarded to the encoder.
            attention_mask: Optional padding mask, forwarded to the encoder.
                Both were previously accepted but silently dropped; passing
                None keeps the earlier behaviour.
            output_all_encoded_layers: Unused; kept for call compatibility.
            attention_show_flg: When truthy, also return the encoder's
                attention weights.

        Returns:
            The logits ``out``, or ``(out, attentions)`` when
            ``attention_show_flg`` is truthy.
        """
        output = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            # None defers to the encoder's own configuration.
            output_attentions=True if attention_show_flg else None,
        )

        # Dropout on the pooled output, then the linear head.
        pooled = self.dropout(output['pooler_output'])
        out = self.cls(pooled)

        if attention_show_flg:
            return out, output['attentions']
        return out
        
class RobertaClassificationHead(nn.Module):
    """Sentence-level head: dropout -> dense -> tanh -> dropout -> projection."""

    def __init__(self, config):
        """Build the layers from ``config``.

        Reads ``hidden_size``, ``num_labels`` and ``classifier_dropout``
        (falling back to ``hidden_dropout_prob`` when that is None).
        """
        super().__init__()
        width = config.hidden_size
        self.dense = nn.Linear(width, width)
        if config.classifier_dropout is None:
            drop_prob = config.hidden_dropout_prob
        else:
            drop_prob = config.classifier_dropout
        self.dropout = nn.Dropout(drop_prob)
        self.out_proj = nn.Linear(width, config.num_labels)

    def forward(self, features, **kwargs):
        """Return logits computed from the first position of ``features``."""
        # First position holds the <s> token (equivalent to [CLS]).
        first_token = features[:, 0, :]
        hidden = torch.tanh(self.dense(self.dropout(first_token)))
        return self.out_proj(self.dropout(hidden))

In [None]:
# Rebuild the classifier around the pretrained encoder.
net_trained = BertClassifier(net_bert)

print('ネットワーク設定完了')

# Load the fine-tuned weights (mapped to CPU; moved to the GPU later if
# one is available).
model_path = './weights/bert_fine_tuning.pth'
net_trained.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))

# The restored model is used for inference, so switch to evaluation mode
# (disables dropout) instead of leaving it in training mode.
net_trained.eval()

# Loss: CrossEntropyLoss = LogSoftmax followed by NLLLoss
# (negative log likelihood loss).
criterion = nn.CrossEntropyLoss()

## ＊＊＊Juman前処理用（テストデータのみ）＊＊＊

In [None]:
from transformers import BertJapaneseTokenizer, BertModel, AutoTokenizer
# Settings for the test-data-only workflow.
path_result = "./result/"
path_weights = "./weights/"
max_length = 128
batch_size = 16

# Seed every random number generator used in this notebook.
for _seed_fn in (torch.manual_seed, np.random.seed, random.seed):
    _seed_fn(1234)

# Sub-word tokenizer belonging to the pretrained checkpoint.
# A BERT alternative that was tried:
#   "cl-tohoku/bert-base-japanese-whole-word-masking"
tokenizer_bert = AutoTokenizer.from_pretrained("nlp-waseda/roberta-base-japanese")

# Maps every ASCII punctuation mark except "." and "," to a space.
# Built once so each call is a single pass over the text.
_PUNCT_TO_SPACE = str.maketrans(
    {mark: " " for mark in string.punctuation if mark not in ".,"}
)


def preprocessing_text(text):
    """Normalize raw text before tokenization.

    Steps:
      1. Convert half-width characters to full-width with
         ``mojimoji.han_to_zen``.
      2. Remove carriage returns, newlines, half-width and full-width spaces.
      3. Collapse every run of digits (half- or full-width) to a single "0".
      4. Replace every ASCII punctuation mark except "." and "," with a space.

    The previous implementation returned from inside the punctuation loop,
    so only the first mark of ``string.punctuation`` was ever processed.

    Args:
        text: Raw input string.

    Returns:
        str: The normalized string.
    """
    text = mojimoji.han_to_zen(text)
    text = re.sub(r'[\r\n 　]', '', text)

    # NOTE(review): the training-time preprocessing_text earlier in this
    # notebook has this digit normalization commented out -- confirm that
    # training and test preprocessing are meant to differ.
    text = re.sub(r'[0-9 ０-９]+', '0', text)

    return text.translate(_PUNCT_TO_SPACE)

# Juman word segmenter, the same pre-tokenization the training pipeline uses.
tokenizer_bert_Juman = BertTokenizer()


# Preprocessing, word segmentation and id encoding in one function.
# `tokenizer` is a tokenization *function* (text -> list of words).
def tokenizer_with_preprocessing(text, tokenizer=tokenizer_bert_Juman.tokenize):
    """Turn raw text into a 1-D tensor of token ids.

    Mirrors the training-time tokenizer defined earlier in this notebook:
    normalize with ``preprocessing_text``, segment into words, join with
    single spaces and encode with ``tokenizer_bert`` (truncated to
    ``max_length``). Ids are returned, not string tokens, because the Field
    below has ``use_vocab=False``; the previous string-token version relied
    on a torchtext vocab that was never built.

    Args:
        text: Raw input string.
        tokenizer: Callable mapping a string to a list of words.

    Returns:
        The first row of the ``return_tensors='pt'`` encoding.
    """
    text = preprocessing_text(text)
    words = tokenizer(text)
    encoded = tokenizer_bert.encode(" ".join(words), max_length=max_length, truncation=True, return_tensors='pt')
    return encoded[0]


# Field definitions; same settings as the training-time Field so that test
# inputs are encoded exactly like the training inputs.
# (The legacy API lives under torchtext.legacy.data, not torchtext.data.legacy.)
TEXT = torchtext.legacy.data.Field(sequential=True, tokenize=tokenizer_with_preprocessing, use_vocab=False, lower=False, include_lengths=True, batch_first=True, fix_length=max_length, pad_token=0)

LABEL = torchtext.legacy.data.Field(sequential=False, use_vocab=False)

# Read the TSV files from the "data" folder (DATA_PATH was never defined in
# this notebook; the training cell reads from "data").
train_ds, test_ds = torchtext.legacy.data.TabularDataset.splits(
    path="data", train='train.tsv',
    test='test.tsv', format='tsv',
    fields=[('Text', TEXT), ('Label', LABEL)])

test_dl = torchtext.legacy.data.Iterator(test_ds, batch_size=batch_size, train=False, sort=False)
# Group the iterator by phase name.
dataloaders_dict = {"test": test_dl}

## ＊検証

In [None]:
import pandas as pd

# Evaluate the trained model on the test data.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

net_trained.eval()
net_trained.to(device)

# Running totals over the whole test set. The loss total was previously
# reset inside the loop, so only the last batch contributed to it.
epoch_corrects = 0
epoch_loss = 0.0

predicted_label = []  # predicted class per sample
ture_label = []  # gold class per sample (name kept: later cells read it)

score_0 = []  # class-0 logit per sample
score_1 = []  # class-1 logit per sample
count = 0  # number of samples processed

start = time.time()
for batch in tqdm(test_dl):
    inputs = batch.Text[0].to(device)
    labels = batch.Label.to(device)

    with torch.no_grad():
        # NOTE(review): attention_mask is None, so padded positions are not
        # masked here.
        outputs = net_trained(inputs, token_type_ids=None, attention_mask=None,
                              output_all_encoded_layers=False, attention_show_flg=False)
        loss = criterion(outputs, labels)
        _, preds = torch.max(outputs, 1)

    # The last batch may be smaller than batch_size, so use the real size
    # rather than indexing up to batch_size and catching the failure.
    n_samples = labels.size(0)
    epoch_corrects += torch.sum(preds == labels).item()
    epoch_loss += loss.item() * n_samples

    score_0.extend(outputs[:, 0].tolist())
    score_1.extend(outputs[:, 1].tolist())
    predicted_label.extend(preds.tolist())
    ture_label.extend(labels.tolist())
    count += n_samples

t = time.time()
time_test = t - start
print("Time:{:.4f}sec".format(time_test))

epoch_loss = epoch_loss / len(test_dl.dataset)
print('Loss:{:.4f}'.format(epoch_loss))
# Accuracy over the whole test set.
epoch_acc = epoch_corrects / len(test_dl.dataset)

print('テストデータ{}個での正解率：{:.4f}'.format(len(test_dl.dataset), epoch_acc))

# Attach predictions and scores to the CSV copy of the test data.
# Assumes ./data/test.csv lists the samples in the same order as test.tsv.
df = pd.read_csv("./data/test.csv", names=("TEXT", "LABEL"), engine="python", encoding="utf-8-sig")
df["PREDICT"] = np.nan
df["AUC+"] = np.nan
df["AUC-"] = np.nan

# NOTE(review): "AUC+" receives the class-0 logit and "AUC-" the class-1
# logit -- confirm that this column naming is intended.
for index, (pred, s0, s1) in enumerate(zip(predicted_label, score_0, score_1)):
    df.at[index, "PREDICT"] = pred
    df.at[index, "AUC+"] = s0
    df.at[index, "AUC-"] = s1

df.to_csv(path_result+"predicted_test.csv", encoding="utf-8-sig", index=False)

## ＊検証時のROC(AUC)の出力

In [None]:
from sklearn.metrics import roc_curve, roc_auc_score
import matplotlib.pyplot as plt

# For a 2-class softmax, P(class 1) = sigmoid(logit_1 - logit_0), so the
# logit margin ranks samples exactly like the predicted probability.
# The class-1 logit alone does not, which can distort the curve and the AUC.
positive_score = np.asarray(score_1) - np.asarray(score_0)

fpr, tpr, thresholds = roc_curve(ture_label, positive_score)
plt.axes().set_aspect("equal")
plt.plot(fpr, tpr)
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.grid()
plt.savefig(path_result+"roc_curve.png")
auc = roc_auc_score(ture_label, positive_score)
print("AUC:{}".format(auc))

In [None]:
from sklearn.metrics import precision_recall_curve, auc
import matplotlib.pyplot as plt

# For a 2-class softmax, P(class 1) = sigmoid(logit_1 - logit_0), so the
# logit margin ranks samples exactly like the predicted probability.
# The class-1 logit alone does not.
positive_score = np.asarray(score_1) - np.asarray(score_0)

precision, recall, thresholds = precision_recall_curve(ture_label, positive_score)
plt.axes().set_aspect("equal")
plt.xlim(0.0, 1.0)
plt.ylim(0.0, 1.0)
plt.plot(recall, precision)
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.grid()
plt.savefig(path_result+"precision_recall.png")
# Area under the precision-recall curve.
pr_auc = auc(recall, precision)
print("AUC:{}".format(pr_auc))

## ＊混同行列と精度

In [None]:
from IPython.display import HTML, display
import pandas as pd
import numpy as np
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score

In [None]:
# Build, save and display the confusion matrix of the saved test predictions.
from sklearn.metrics import confusion_matrix

label_names = {0: "負例", 1: "正例"}
df = pd.read_csv(path_result+"predicted_test.csv", engine="python", encoding="utf-8-sig")
#df = pd.read_csv("./result/predicted_test.csv", engine="python", encoding="sjis")

# Keep a row only when BOTH columns hold 0 or 1.  Testing the two columns
# independently would let y_true and y_pred drift out of step whenever a row
# has a label but no prediction (PREDICT is NaN for rows that were not scored).
usable = df["LABEL"].isin(list(label_names)) & df["PREDICT"].isin(list(label_names))
skipped = int((~usable).sum())
if skipped:
    print("confusion matrix: skipped {} row(s) without a valid LABEL/PREDICT pair".format(skipped))

y_true = df.loc[usable, "LABEL"].astype(int).map(label_names).tolist()
y_pred = df.loc[usable, "PREDICT"].astype(int).map(label_names).tolist()

print(len(y_true))
print(len(y_pred))

# Confusion matrix with a fixed label order (negative first, positive second).
labels = ["負例", "正例"]
cm = confusion_matrix(y_true, y_pred, labels=labels)

# Wrap it in a DataFrame so rows and columns carry the label names.
cm_labeled = pd.DataFrame(cm, columns=labels, index=labels)

# Save the table, then leave it as the cell's last expression for display.
cm_labeled.to_csv(path_result+"confusion_matrix.csv", encoding="utf-8-sig")
cm_labeled


In [None]:
# Compute accuracy / precision / recall / F1 on the saved test predictions,
# print them, and append them (with both AUC values and the timings) to
# auc_f.txt in the result directory.
from sklearn.metrics import roc_auc_score

df = pd.read_csv(path_result+"predicted_test.csv", engine="python", encoding="utf-8-sig")
#df = pd.read_csv("./result/predicted_test.csv", engine="python", encoding="sjis")

# Rows that were never scored keep NaN in PREDICT; the sklearn metrics reject
# NaN, so evaluate only rows that have both a label and a prediction.
scored = df.dropna(subset=["LABEL", "PREDICT"])
y_true = scored["LABEL"].astype(int).tolist()
y_pred = scored["PREDICT"].astype(int).tolist()

# Recompute the ROC AUC here instead of reading the global `auc`: that name is
# rebound to sklearn's `auc` function when the precision-recall cell imports
# it, in which case the report would contain a function repr, not a score.
roc_auc = roc_auc_score(ture_label, score_1)

# Each metric is computed once and reused for both the console and the file.
report_lines = [
    "正解率（すべてのサンプルのうち正解したサンプルの割合）={}".format(accuracy_score(y_true, y_pred)),
    "適合率（positiveと予測された中で実際にpositiveだった確率）={}".format(precision_score(y_true, y_pred)),
    "再現率（positiveなデータに対してpositiveと予測された確率）={}".format(recall_score(y_true, y_pred)),
    "F1（適合率と再現率の調和平均）={}".format(f1_score(y_true, y_pred)),
]
for line in report_lines:
    print(line)

# Append mode: results of earlier runs stay in the file.  The `with` block
# closes the file, so no explicit close() is needed.
with open("{}auc_f.txt".format(path_result),"a",encoding="utf-8") as f:
    f.write("roc_curve, AUC:{}\n".format(roc_auc))
    f.write("precision_recall, AUC:{}\n".format(pr_auc))
    for line in report_lines:
        f.write(line + "\n")
    f.write("Time_val:{:.4f}sec\n".format(time_val))
    f.write("Time_test:{:.4f}sec\n".format(time_test))

# ＊Attentionの可視化

In [None]:
# Take one mini-batch from the test loader for the attention visualisation.
batch = next(iter(test_dl))

# Move the inputs and labels to the compute device.
inputs = batch.Text[0].to(device)  # token ids
labels = batch.Label.to(device)  # labels

# Inference only: run without autograd so no graph is recorded and the
# returned logits / attention maps do not keep intermediate activations alive.
with torch.no_grad():
    outputs, attentions = net_trained(inputs, token_type_ids=None, attention_mask=None,
                                      output_all_encoded_layers=False, attention_show_flg=True)

_, preds = torch.max(outputs, 1)  # predicted label = index of the largest output

In [None]:
# HTMLを作成する関数を実装

def highlight(word, attn):
    """Return an HTML ``<span>`` that shows ``word`` on a red-tinted background.

    The larger ``attn`` is, the deeper the red: 0 gives white (``#FFFFFF``)
    and 1 gives pure red (``#FF0000``).

    Args:
        word: Token text to display. It is HTML-escaped, so tokens such as
            ``<s>`` or ``<unk>`` are shown literally instead of being parsed
            as tags.
        attn: Attention weight; anything ``float()`` accepts. Values outside
            [0, 1] are clamped and NaN is treated as 0, so the colour code is
            always valid.

    Returns:
        str: ``<span style="background-color: #RRGGBB"> word</span>``.
    """
    from html import escape
    from math import isnan

    weight = float(attn)
    if isnan(weight):
        weight = 0.0
    weight = min(max(weight, 0.0), 1.0)

    # Red stays at full intensity; green and blue fade out as the weight grows.
    fade = int(255 * (1 - weight))
    html_color = '#%02X%02X%02X' % (255, fade, fade)
    return '<span style="background-color: {}"> {}</span>'.format(html_color, escape(str(word)))


def mk_html(index, batch, preds, normlized_weights, TEXT):
    """Build the HTML snippet that visualises attention for one sample.

    The snippet starts with the true and predicted labels, followed by the
    tokens of the sentence (up to, but not including, ``[SEP]``), each
    coloured by ``highlight`` according to its combined attention weight.

    The combined weight is computed from the attention that token 0 pays to
    every token: each head's row is divided by its own maximum, the rows are
    summed over all heads, and the sum is divided by its maximum.

    Args:
        index: Position of the sample inside the mini-batch.
        batch: Mini-batch providing ``Text`` (``Text[0][index]`` holds the
            token ids of the sample) and ``Label``.
        preds: Predicted labels, indexed by sample.
        normlized_weights: Attention tensor indexed as
            ``[sample, head, query_token, key_token]``. It is only read;
            earlier versions normalised it in place, which changed the result
            of repeated calls on the same tensor.
        TEXT: Unused; kept so existing calls keep working.

    Returns:
        str: HTML fragment ending with ``<br><br>``.
    """
    # Pick out the requested sample.
    sentence = batch.Text[0][index]  # token ids
    label = batch.Label[index]  # true label
    pred = preds[index]  # predicted label

    # Turn the numeric labels into display strings.
    label_str = "非有益" if label == 0 else "有益"
    pred_str = "非有益" if pred == 0 else "有益"

    html = '正解ラベル：{}<br>推論ラベル：{}<br>'.format(label_str, pred_str)

    # One row per head: attention from token 0 to every token.  All heads in
    # the tensor are used instead of a hard-coded count, and every step
    # creates a new tensor so the caller's attention map stays untouched.
    cls_attens = normlized_weights[index, :, 0, :].detach()
    per_head = cls_attens / cls_attens.max(dim=-1, keepdim=True)[0]
    all_attens = per_head.sum(dim=0)
    all_attens = all_attens / all_attens.max()

    # Convert ids and weights to plain Python values once, up front.
    tokens = tokenizer_bert.convert_ids_to_tokens(sentence.tolist())
    pieces = []
    for token, attn in zip(tokens, all_attens.tolist()):
        # [SEP] marks the end of the sentence.
        if token == "[SEP]":
            break
        pieces.append(highlight(token, attn))

    html += "".join(pieces)
    html += "<br><br>"

    return html


In [None]:
from IPython.display import HTML

index = 5  # position, inside `batch`, of the sample to display
# Build the HTML from the last element of `attentions`.
html_output = mk_html(
    index=index,
    batch=batch,
    preds=preds,
    normlized_weights=attentions[-1],
    TEXT=TEXT,
)
HTML(html_output)  # render as HTML (must stay the last expression of the cell)


In [None]:
# Append the attention visualisation of every test sample to Attention.html.
# `count` ends up as the number of samples actually written.
count = 0
for batch in test_dl:
    # Move the inputs and labels to the compute device.
    inputs = batch.Text[0].to(device)  # token ids
    labels = batch.Label.to(device)  # labels (not used below in this cell)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        outputs, attentions = net_trained(inputs, token_type_ids=None, attention_mask=None,
                                          output_all_encoded_layers=False, attention_show_flg=True)
    _, preds = torch.max(outputs, 1)  # predicted label per sample

    # Iterate over the samples this batch really contains; the last batch can
    # be shorter than `batch_size`.
    html_chunks = []
    for index in range(preds.size(0)):
        try:
            html_chunks.append(mk_html(index, batch, preds, attentions[-1], TEXT))
        except Exception as err:
            # Best effort: keep going, but report the sample that failed
            # instead of hiding the error.
            print("Attention HTML skipped for sample {} of the current batch: {!r}".format(index, err))

    # One open/append per batch instead of one per sample.
    with open("{}Attention.html".format(path_result), "a", encoding="utf-8") as f:
        f.writelines(html_chunks)
    count += len(html_chunks)