<a href="https://colab.research.google.com/github/tsuji1234/sample/blob/main/BertByTsuji.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# pip-install
---

In [None]:
## =================================================================
## Mount Google Drive
## =================================================================
from google.colab import drive
drive.mount('/content/drive')
# Working folder on the mounted Drive; later cells build their data, model
# and result paths from it.
workDirPath = '/content/drive/MyDrive/SoftmBert'


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
## =================================================================
## Package installation
## =================================================================
!pip install --upgrade pip
!pip install transformers==4.18.0 fugashi==1.1.0 ipadic==1.0.0 pytorch-lightning==1.6.1
# livelossplot is used to draw the loss graphs during training.
!pip install livelossplot --quiet # acc, lossグラフ表示用


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/

# Debug用
---

In [None]:
#import pdb
#pdb.set_trace()

# クラス定義
---

In [None]:
from transformers import BertJapaneseTokenizer
MODEL_NAME = 'cl-tohoku/bert-base-japanese-whole-word-masking'

## =================================================================
## Name: factory for the BERT-related objects (model, loaders, trainer)
## =================================================================
class BertBuilder:
    # =================================================
    # Constructor
    # Args   : batch_size   mini-batch size of the training DataLoader
    #          max_token    length every input is padded / truncated to
    #          max_epochs   number of training epochs
    # Returns: nothing
    # =================================================
    def __init__(self, batch_size=16, max_token=256, max_epochs=5):
        self.batch_size = batch_size
        self.max_token = max_token
        self.max_epochs = max_epochs

    # =================================================
    # Create the pre-trained model used for classification
    # Args   : viewer   object that receives the training / validation loss
    # Returns: model    two-label classification model (learning rate 1e-5)
    # =================================================
    def makeClassificationModel(self, viewer):
        model = BertForSequenceClassification_pl(MODEL_NAME, num_labels=2, lr=1e-5, labelName='labels', lossViewer=viewer)
        return model

    # =================================================
    # Convert the input rows into data loaders
    # Args   : inputDataList   list of input rows
    # Returns: [dlTrain, dlVal, dlTest]
    # =================================================
    def makeDataLoader(self, inputDataList):
        # Tokenizer definition
        bertTokenizer = BertJapaneseTokenizer.from_pretrained(MODEL_NAME)
        qaTokenizer = QATokenizer(bertTokenizer, None)

        inputToken = qaTokenizer.tokenizer(inputDataList, max_length=self.max_token)
        dlTrain, dlVal, dlTest = DataLoaderConverter(self.batch_size).convert(inputToken)

        self.__printInData(dlTrain)
        return [dlTrain, dlVal, dlTest]

    # =================================================
    # Create the trainer
    # Args   : modelDir   folder the model checkpoints are written to
    # Returns: [trainer, checkPoint]
    # =================================================
    def makeTrainer(self, modelDir):
        # `torch` is imported by a later notebook cell, so that cell must have
        # been executed before this method is called.
        # -1 is passed when CUDA is available, 0 otherwise.
        gpus = -1 if torch.cuda.is_available() else 0
        return TrainerBuilder(self.max_epochs, modelDir).build(gpus)

    # =================================================
    # Debug log of the amount of computation
    # Args   : dlTrain   training data loader
    # Returns: nothing (prints two lines)
    # =================================================
    def __printInData(self, dlTrain):
        # len(loader) is the real number of batches per epoch. The former
        # len(dataset) / batch_size was only rounded by the '.0f' format and
        # under-reported the steps whenever the last batch was less than half
        # full (e.g. 50 samples / 16 -> "3" although 4 batches are produced).
        stepsPerEpoch = len(dlTrain)
        print('バッチサイズ：{0}、エポック数：{1}、最大トークンサイズ：{2}'.format(self.batch_size, self.max_epochs, self.max_token))
        print('入力データ：{0}個、1エポック：{1:.0f}STEP、合計：{2:.0f}STEP'.format(len(dlTrain.dataset),
                                                                            stepsPerEpoch,
                                                                            stepsPerEpoch * self.max_epochs))


In [None]:
## =================================================================
## PyTorch Lightningの定義
## =================================================================
import torch
import pytorch_lightning as pl
from transformers import BertForSequenceClassification

class BertForSequenceClassification_pl(pl.LightningModule):
    # ===========================================================
    # Constructor
    # Args: model_name   name of the Transformers model to load
    #       num_labels   number of labels
    #       lr           learning rate given to the optimizer
    #       labelName    key of the label entry inside a batch (used by test_step)
    #       lossViewer   object with setLoss() / setValLoss(); when None a
    #                    NullViewer is used so that the steps can always call it
    # ===========================================================
    def __init__(self, model_name, num_labels, lr, labelName, lossViewer=None):
        super().__init__()
        # After this call the constructor arguments are reachable through
        # self.hparams (self.hparams.lr and self.hparams.labelName are read below).
        self.save_hyperparameters()

        # Load BERT
        self.bert_sc = BertForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)
        device = "cuda:0" if torch.cuda.is_available() else "cpu"
        self.bert_sc = self.bert_sc.to(device)

        # Viewer setup. The fallback has to be stored on the instance: it used
        # to be assigned to the local variable only, which left self.lossViewer
        # as None and made training_step fail with AttributeError.
        self.lossViewer = NullViewer() if lossViewer is None else lossViewer

    # Returns the loss for one training mini-batch (`batch`).
    # batch_idx is the mini-batch number; it is not used here.
    def training_step(self, batch, batch_idx):
        output = self.bert_sc(**batch)
        loss = output.loss
        self.log('train_loss', loss)  # log the loss under the name 'train_loss'
        self.lossViewer.setLoss(loss)
        return loss

    # Computes and logs the loss for one validation mini-batch.
    def validation_step(self, batch, batch_idx):
        output = self.bert_sc(**batch)
        val_loss = output.loss
        self.log('val_loss', val_loss)  # log the loss under the name 'val_loss'
        self.lossViewer.setValLoss(val_loss)

    # Computes and logs the accuracy for one test mini-batch.
    def test_step(self, batch, batch_idx):
        # Take the labels out of the batch so the model is called without them.
        labels = batch.pop(self.hparams.labelName)
        output = self.bert_sc(**batch)
        labels_predicted = output.logits.argmax(-1)
        num_correct = (labels_predicted == labels).sum().item()
        accuracy = num_correct / labels.size(0)  # accuracy of this batch
        self.log('accuracy', accuracy)  # log it under the name 'accuracy'

    # Returns the optimizer used for training.
    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.hparams.lr)


In [None]:
import torch
from transformers import BertForSequenceClassification
import torch.nn.functional as F
MODEL_NAME = 'cl-tohoku/bert-base-japanese-whole-word-masking'

## =================================================================
## Name: inference with the fine-tuned BERT model
## =================================================================
class BertGesser:
    # =================================================
    # Constructor
    # Args   : none
    # Notes  : selects "cuda:0" when CUDA is available, otherwise "cpu",
    #          and loads the tokenizer for MODEL_NAME.
    # =================================================
    def __init__(self):
        self.device = "cuda:0" if torch.cuda.is_available() else "cpu"
        self.tokenizer = BertJapaneseTokenizer.from_pretrained(MODEL_NAME)

    # =================================================
    # Load the best model
    # Args   : bestModelDir   folder to load the model from
    # Returns: bert_sc        loaded model, moved to self.device
    # =================================================
    def LoadModel(self, bestModelDir):
        bert_sc = BertForSequenceClassification.from_pretrained(bestModelDir)
        bert_sc = bert_sc.to(self.device)
        return bert_sc

    # =================================================
    # Build the inference result table
    # Args   : bestModel       model used for inference
    #          inputDataList   input rows; column 0 is the text that is
    #                          classified, column 1 is copied through and
    #                          column 2 is the expected label
    # Returns: outCsvData      two-dimensional list, header row first
    # =================================================
    def convResultList(self, bestModel, inputDataList):
        LABEL_IMBT = 0
        LABEL_COIN = 1
        outCsvData = [['Q', 'A', '正解ラベル', 'BERT推論', 'IMBT確率', '硬貨部確率', '結果']]

        # The progress log must use the argument; it used to read the global
        # `inputData`, which only exists when the notebook cells defined it.
        totalCnt = len(inputDataList)
        for loopCnt, line in enumerate(inputDataList, start=1):
            # Inference on the text column only
            outputs = self.guess(bestModel, [line[0]])

            # Analyse the result (one input per call, so batch index 0)
            logits = outputs.logits
            predicted_label = torch.argmax(logits, dim=1).item()  # predicted class index
            probs = F.softmax(logits, dim=1)
            prob_class0 = probs[0, LABEL_IMBT].item()  # probability of IMBT
            prob_class1 = probs[0, LABEL_COIN].item()  # probability of the coin-unit label
            japanese_label = 'IMBT' if predicted_label == LABEL_IMBT else '硬貨部'
            judge_str = 'True' if line[2] == japanese_label else 'False'

            # One result row: Q, A, expected label, predicted label,
            # both probabilities, verdict
            outCsvData.append([
                line[0],
                line[1],
                line[2],
                japanese_label,
                prob_class0,
                prob_class1,
                judge_str,
            ])

            # Progress log every 10 rows
            if (loopCnt % 10) == 0:
                print('{0} / {1}'.format(loopCnt, totalCnt))

        return outCsvData

    # =================================================
    # Run inference
    # Args   : bestModel      model used for inference
    #          inputDataSet   inference input; a list, but only its first
    #                         element is used
    #          max_length     inputs longer than this many tokens are
    #                         truncated (512 matches the default used by
    #                         QATokenizer.tokenizer)
    # Returns: outputs        model output object
    # =================================================
    def guess(self, bestModel, inputDataSet, max_length=512):
        # Truncate instead of letting an over-long text reach the model.
        encoding = self.tokenizer(
            inputDataSet[0],
            padding='longest',
            truncation=True,
            max_length=max_length,
            return_tensors='pt',
        ).to(self.device)
        # Gradients are not needed for inference.
        with torch.no_grad():
            outputs = bestModel(**encoding)

        return outputs


In [None]:
import csv
import glob
import os
import random

## =================================================================
## Name: CSV loading class
## =================================================================
class csvLoader:
    # =================================================
    # Constructor
    # Args   : dirPath    folder that contains the input CSV files
    #          hdr_skip   True : skip the first line of every CSV
    #                     False: do not skip the first line
    # =================================================
    def __init__(self, dirPath, hdr_skip=False):
        self.dirPath = dirPath
        self.hdr_skip = hdr_skip

    # =================================================
    # Read the CSV files
    # Args   : none
    # Returns: two-dimensional list with the rows of every *.csv file
    #          directly inside dirPath
    # =================================================
    def load(self):
        chkFile = os.path.join(self.dirPath, "*.csv")
        # glob returns the files in arbitrary order; sort for reproducible output.
        schFileList = sorted(glob.glob(chkFile))

        retList = []
        for fPath in schFileList:
            print('read file : {0}'.format(fPath))
            # newline='' is what the csv module expects, so that line breaks
            # inside quoted fields are parsed correctly.
            # errors='ignore' drops bytes that are not valid shift_jis.
            with open(fPath, 'r', encoding='shift_jis', errors='ignore', newline='') as f:
                reader = csv.reader(f)
                if self.hdr_skip:
                    # The default keeps an empty file from raising StopIteration.
                    next(reader, None)
                # csv.reader yields [] for blank lines; such rows have no
                # columns and would break every row[...] access later on.
                retList.extend(line for line in reader if line)
        return retList

    # =================================================
    # Balance the two labels
    # Args   : inputDataList   rows whose column 2 holds the label
    # Returns: shuffled list that holds the same number of 'IMBT' rows and
    #          non-'IMBT' rows; the larger group is randomly down-sampled
    # =================================================
    def adjustLabelBalance(self, inputDataList):
        ## Split by label
        IMBTDataList = [row for row in inputDataList if row[2] == 'IMBT']
        CoinDataList = [row for row in inputDataList if row[2] != 'IMBT']

        ## Find the smaller and the larger group
        if len(IMBTDataList) < len(CoinDataList):
            minList, bigList = IMBTDataList, CoinDataList
        else:
            minList, bigList = CoinDataList, IMBTDataList

        ## Shuffle the larger group, then cut it to the size of the smaller one
        random.shuffle(bigList)
        bigList = bigList[:len(minList)]

        ## Merge and shuffle again so the labels are interleaved
        margeList = minList + bigList
        random.shuffle(margeList)

        return margeList


In [None]:
import torch
import random
from torch.utils.data import DataLoader

## =================================================================
## Name: converts token data into DataLoaders
## =================================================================
class DataLoaderConverter:
    # ========================
    # Constructor
    # batchSize       batch size of the training DataLoader
    # evalBatchSize   batch size of the validation / test DataLoaders
    #                 (defaults to the previously hard-coded 256)
    # ========================
    def __init__(self, batchSize, evalBatchSize=256):
        self.batchSize = batchSize
        self.evalBatchSize = evalBatchSize

    # ========================
    # Convert
    # inputToken   iterable of dict-like token encodings
    # Returns      [dataloader_train, dataloader_val, dataloader_test]
    # ========================
    def convert(self, inputToken):
        tensorList = self.__convertTensor(inputToken)  # convert to tensors
        dsTrain, dsVal, dsTest = self.__dataSplit(tensorList)
        print('トレーニングデータ:{0}、検証データ:{1}、テストデータ:{2}、合計:{3}'.format(len(dsTrain), len(dsVal), len(dsTest), len(tensorList)))

        # Build the data loaders; only the training data is shuffled per epoch.
        dataloader_train = DataLoader(dsTrain, batch_size=self.batchSize, shuffle=True)
        dataloader_val = DataLoader(dsVal, batch_size=self.evalBatchSize)
        dataloader_test = DataLoader(dsTest, batch_size=self.evalBatchSize)
        return [dataloader_train, dataloader_val, dataloader_test]

    ## Split into 60% training, 20% validation and the remainder as test data.
    ## The list is shuffled in place first.
    def __dataSplit(self, tensorList):
        random.shuffle(tensorList)
        n = len(tensorList)
        n_train = int(0.6 * n)
        n_val = int(0.2 * n)
        dataset_train = tensorList[:n_train]
        dataset_val = tensorList[n_train:n_train + n_val]
        dataset_test = tensorList[n_train + n_val:]
        return [dataset_train, dataset_val, dataset_test]

    ## Turn every value of every encoding into a torch tensor.
    def __convertTensor(self, tokenList):
        return [
            {k: torch.tensor(v) for k, v in line.items()}
            for line in tokenList
        ]



In [None]:
import os
import shutil
import datetime

## =================================================================
## Name: folder operations
## =================================================================
class DirController:
    def __init__(self):
        pass

    ## Delete the folder (if it exists) and create it again, empty.
    ## dir_path   folder to reset
    def clearDir(self, dir_path):
        # Act on the argument. The previous code used the global `modelDir`,
        # so every call reset the model folder regardless of the path given.
        if os.path.exists(dir_path):
            shutil.rmtree(dir_path)
        # makedirs also creates missing parent folders.
        os.makedirs(dir_path)

    ## Rename the existing folder to a time-stamped backup and create a new,
    ## empty folder under the original name.
    ## dir_path   folder to back up and recreate
    def clearBackupDir(self, dir_path):
        if os.path.exists(dir_path):
            # Drop trailing separators: 'Model/' + suffix would otherwise name
            # a path inside the folder that is being renamed.
            seps = os.sep + (os.altsep or '')
            base = dir_path.rstrip(seps) or dir_path

            # Append the current time (minute resolution) to the folder name.
            now_str = datetime.datetime.now().strftime('_%Y%m%d%H%M')
            backup = base + now_str

            # Two calls within the same minute would collide; add a counter.
            counter = 1
            while os.path.exists(backup):
                backup = '{0}{1}_{2}'.format(base, now_str, counter)
                counter += 1
            os.rename(base, backup)

        # Create the folder
        os.makedirs(dir_path)


In [None]:
## =================================================================
## Name: tokenizer used to categorise rows by processing unit
## =================================================================
class QATokenizer:
    # ========================
    # Constructor
    # bertTokenizer    callable that encodes one text
    # inputLoader      stored on the instance; not used by this class
    # ========================
    def __init__(self, bertTokenizer, inputLoader):
        self.inputLoader = inputLoader
        self.bertTokenizer = bertTokenizer

    # ========================
    # Tokenize
    # inputDataList    two-dimensional list; column 0 is the text and
    #                  column 2 is the label
    # max_length       length every encoding is padded / truncated to
    # Returns          list with one encoding per row, each carrying a
    #                  'labels' entry (0 for 'IMBT', 1 for anything else)
    # ========================
    def tokenizer(self, inputDataList, max_length=512):
        return [self.__encodeRow(row, max_length) for row in inputDataList]

    # Encode a single row and attach its numeric label.
    def __encodeRow(self, row, max_length):
        question, category = row[0], row[2]
        encoded = self.bertTokenizer(
            question,
            max_length=max_length,
            padding='max_length',
            truncation=True,
        )
        encoded['labels'] = 1 if category != 'IMBT' else 0
        return encoded


In [None]:
import pytorch_lightning as pl

## =================================================================
## Name: creates the objects used for training
## =================================================================
class TrainerBuilder:
    # ========================
    # Constructor
    # epochs          number of epochs given to the trainer
    # modelTempPath   folder the checkpoint callback writes to
    # ========================
    def __init__(self, epochs, modelTempPath):
        self.modelTempPath = modelTempPath
        self.epochs = epochs

    # ========================
    # Build the trainer and its checkpoint callback
    # gpus      value forwarded to the trainer's `gpus` argument
    # Returns   [trainer, checkpoint]
    # ========================
    def build(self, gpus):
        # Checkpoint rule: keep only the single best model (weights only),
        # judged by the smallest 'val_loss'.
        bestCheckpoint = pl.callbacks.ModelCheckpoint(
            dirpath=self.modelTempPath,
            monitor='val_loss',
            mode='min',
            save_top_k=1,
            save_weights_only=True,
        )

        # Training settings
        trainerArgs = {
            'gpus': gpus,                 # GPUs to use
            'val_check_interval': 0.1,    # validation interval during training
            'max_epochs': self.epochs,    # number of epochs
            'callbacks': [bestCheckpoint],
        }
        lightningTrainer = pl.Trainer(**trainerArgs)

        return [lightningTrainer, bestCheckpoint]


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
from livelossplot import PlotLosses

## =================================================================
## Name: live display of the training progress
## =================================================================
class trainViewer:
    # ========================
    # Constructor
    # ========================
    def __init__(self):
        self.loss = []
        self.val_loss = []
        self.liveloss = PlotLosses()

    # Record one training loss and redraw the plot.
    def setLoss(self, loss):
        self.__record(self.loss, 'loss', loss)

    # Record one validation loss and redraw the plot.
    def setValLoss(self, val_loss):
        self.__record(self.val_loss, 'val_loss', val_loss)

    # Shared implementation of setLoss / setValLoss.
    def __record(self, history, key, value):
        # Keep a detached CPU copy. Storing the tensor itself would keep its
        # autograd graph referenced for as long as the history lives.
        value = value.detach().cpu()
        history.append(value)
        self.liveloss.update({key: value})
        self.liveloss.send()


class trainLogger:
    # ========================
    # コンストラクタ
    # ========================
    def __init__(self):
        self.loss = []
        self.val_loss = []
        self.liveloss = PlotLosses()

    def setLoss(self, loss):
        self.loss.append(loss)
        print('loss : {0}'.format(loss))

    def setValLoss(self, val_loss):
        self.val_loss.append(val_loss)
        print('val_loss : {0}'.format(val_loss))


## Null Object: accepts the same calls as the viewers above and does nothing.
class NullViewer:
    # `self` was missing from both methods, so calling them on an instance
    # raised TypeError.
    def setLoss(self, loss):
        pass

    def setValLoss(self, val_loss):
        pass


# 学習処理メイン
---

In [None]:
## =================================================================
## Data preparation
## =================================================================
import os

## Define the viewer that displays the training progress
plotViewer = trainViewer()

## Create the pre-trained BERT model for classification
builder = BertBuilder(batch_size=16, max_token=256, max_epochs=1)
preModel = builder.makeClassificationModel(plotViewer)

## Load the files
inputLoader = csvLoader(os.path.join(workDirPath, 'Data'), hdr_skip=True)
inputData = inputLoader.load()
inputData = inputLoader.adjustLabelBalance(inputData) # equalize the number of IMBT and coin-unit rows
# Only the first 100 rows are kept from here on.
inputData = inputData[0:100]

## Create the DataLoaders
dlTrain, dlVal, dlTest = builder.makeDataLoader(inputData)

## Create the trainer
modelDir = os.path.join(workDirPath, 'Model/')
trainer, checkPoint = builder.makeTrainer(modelDir)


Downloading:   0%|          | 0.00/479 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/424M [00:00<?, ?B/s]

Some weights of the model checkpoint at cl-tohoku/bert-base-japanese-whole-word-masking were not used when initializing BertForSequenceClassification: ['cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.decoder.weight', 'cls.seq_relationship.bias', 'cls.seq_relationship.weight', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.bias']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassification were not initialize

read file : /content/drive/MyDrive/SoftmBert/Data/202109.csv
read file : /content/drive/MyDrive/SoftmBert/Data/202107.csv
read file : /content/drive/MyDrive/SoftmBert/Data/202106.csv
read file : /content/drive/MyDrive/SoftmBert/Data/202105.csv
read file : /content/drive/MyDrive/SoftmBert/Data/202203.csv
read file : /content/drive/MyDrive/SoftmBert/Data/202111.csv
read file : /content/drive/MyDrive/SoftmBert/Data/202202.csv
read file : /content/drive/MyDrive/SoftmBert/Data/202110.csv
read file : /content/drive/MyDrive/SoftmBert/Data/202201.csv
read file : /content/drive/MyDrive/SoftmBert/Data/202108.csv
read file : /content/drive/MyDrive/SoftmBert/Data/202104.csv
read file : /content/drive/MyDrive/SoftmBert/Data/202112.csv


Downloading:   0%|          | 0.00/252k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/110 [00:00<?, ?B/s]

INFO:pytorch_lightning.utilities.rank_zero:GPU available: False, used: False
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs


トレーニングデータ:60、検証データ:20、テストデータ:20、合計:100
バッチサイズ：16、エポック数：1、最大トークンサイズ：256
入力データ：60個、1エポック：4STEP、合計：4STEP


In [None]:
## =================================================================
## Fine-tuning
## =================================================================
# Empty the model directory
DirController().clearDir(modelDir)

## fit: train on dlTrain, validate on dlVal
trainer.fit(preModel, dlTrain, dlVal)


In [None]:
## =================================================================
## Evaluation of the result
## =================================================================
print('ベストモデルのファイル: ', checkPoint.best_model_path)
print('ベストモデルの検証データに対する損失: ', checkPoint.best_model_score)

# Load the fine-tuned model from the best checkpoint
tuneModel = preModel.load_from_checkpoint(checkPoint.best_model_path)

# Run the evaluation on the test data and print the logged 'accuracy'
testResult = trainer.test(tuneModel, dlTest)
print(f'Accuracy: {testResult[0]["accuracy"]:.2f}')


In [None]:
## =================================================================
## Save the model
## =================================================================
# Intended to empty the result folder (clearDir is called with resultDirPath).
# NOTE(review): confirm that clearDir really acts on the path passed here.
resultDirPath = os.path.join(workDirPath, 'Result')
DirController().clearDir(resultDirPath)

# Save the wrapped Transformers model with save_pretrained
saveDirectory = os.path.join(resultDirPath, 'model_transformers')
tuneModel.bert_sc.save_pretrained( saveDirectory )
print('チューニング済みモデル保存先：', saveDirectory)


チューニング済みモデル保存先： /content/drive/MyDrive/SoftmBert/Result/model_transformers


In [None]:
## Stop and release the Colab runtime
from google.colab import runtime
runtime.unassign()

# 推論実行（テスト）
---

In [None]:
## =================================================================
## Run inference
## =================================================================
## Load the files (second argument True = skip the header line of every CSV)
inputLoader = csvLoader(os.path.join(workDirPath, 'Data'), True)
inputData = inputLoader.load()
#inputData = inputLoader.adjustLabelBalance(inputData) # equalize the number of IMBT and coin-unit rows

## Prepare the model
guesser = BertGesser()
bestModelDir = os.path.join(workDirPath, 'Result_20230327', 'model_transformers') # TODO: name of the Result folder
bestModel = guesser.LoadModel(bestModelDir)

## Run the test
#inputData = inputData[:1]
csvList = guesser.convResultList(bestModel, inputData)

## Save as CSV (csvList already contains the header row, hence header=False)
import pandas as pd
df = pd.DataFrame(csvList)
df.to_csv('guessResult.csv', index=False, header=False, encoding='sjis')

## Download to the local machine
from google.colab import files
files.download('guessResult.csv')
