In [None]:
import logging
import os
import random

import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch
from simpletransformers.classification import ClassificationModel, ClassificationArgs
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import BertTokenizer, BertForSequenceClassification
from transformers import BertJapaneseTokenizer
# Restrict CUDA to GPU 0.
os.environ["CUDA_VISIBLE_DEVICES"] = '0'

# Seed Python, NumPy and PyTorch so that weight initialisation and data
# shuffling are repeatable from one run to the next.
SEED = 42
pl.seed_everything(SEED)

# Name of the pretrained checkpoint that gets fine-tuned.
MODEL_NAME = 'cl-tohoku/bert-base-japanese-whole-word-masking'

# This checkpoint's vocabulary is built on top of MeCab word segmentation, so it
# must be loaded with BertJapaneseTokenizer. Plain BertTokenizer skips that
# step and produces token sequences the model was not pretrained on.
# Runtime requirement: the `fugashi` and `ipadic` packages must be installed.
tokenizer = BertJapaneseTokenizer.from_pretrained(MODEL_NAME)

# Mini-batch size shared by the train/val/test DataLoaders.
batch = 1

In [None]:
# Load the dataset from tmp.csv (path relative to the working directory) into `df`.
df=pd.read_csv("tmp.csv")

In [None]:
# Split into train/val/test at roughly 8:1:1, by row position (no shuffling).
# NOTE: ideally this split would be fixed/persisted rather than recomputed.
index_train = int(len(df) * 0.8)
index_val = int(len(df) * 0.1)
# The test split takes every remaining row. Truncating int(len * 0.1) a second
# time would silently drop the last few rows whenever len(df) is not a
# multiple of 10.
index_test = len(df) - index_train - index_val

# `1:` drops the first column (presumably a saved index column; confirm).
# .copy() makes each split an independent frame, so the column assignments done
# in later cells do not act on slices of `df` (no SettingWithCopyWarning).
df_train = df.iloc[:index_train, 1:].copy()
df_val = df.iloc[index_train:index_train + index_val, 1:].copy()
df_test = df.iloc[index_train + index_val:, 1:].copy()

In [None]:
from sklearn.preprocessing import LabelEncoder

# Fit the encoder on the complete label column, then map the labels of every
# split to the same integer ids.
L = LabelEncoder()
df["target_label"] = L.fit_transform(df["target_label"])
for _split in (df_train, df_test, df_val):
    _split["target_label"] = L.transform(_split["target_label"])

In [None]:
# Renumber the rows of each split from 0, discarding the index inherited from df.
df_train, df_val, df_test = (
    part.reset_index(drop=True) for part in (df_train, df_val, df_test)
)

In [None]:
# Cast the text column of every split to str.
for _split in (df_train, df_val, df_test):
    _split["text_data"] = _split["text_data"].astype(str)

In [None]:
# Number of classes to classify.
# NOTE: this actually holds the largest label id in the training split, not a
# count; the model is constructed later with `num_class+1` output units.
num_class=df_train["target_label"].max()

In [None]:
# Tokenize the training split into fixed-length tensors, one dict per example.
# The columns read here are the ones the earlier cells prepare: "text_data"
# (cast to str) and "target_label" (integer-encoded by the LabelEncoder).
sentences_text_train = df_train["text_data"].values
distance_list_train = df_train["target_label"].values
dataset_for_loader_train = []
for sentence, label in zip(sentences_text_train, distance_list_train):
    encoding = tokenizer(
        sentence,
        max_length=100,        # pad / truncate every example to this many tokens
        padding="max_length",  # replaces the deprecated pad_to_max_length=True
        truncation=True,
    )
    encoding['labels'] = label
    # Turn every field (token ids, masks, label) into a torch tensor.
    dataset_for_loader_train.append(
        {k: torch.tensor(v) for k, v in encoding.items()}
    )

In [None]:
# Tokenize the validation split into fixed-length tensors, one dict per example.
# The columns read here are the ones the earlier cells prepare: "text_data"
# (cast to str) and "target_label" (integer-encoded by the LabelEncoder).
sentences_text_val = df_val["text_data"].values
distance_list_val = df_val["target_label"].values
dataset_for_loader_val = []
for sentence, label in zip(sentences_text_val, distance_list_val):
    encoding = tokenizer(
        sentence,
        max_length=100,        # pad / truncate every example to this many tokens
        padding="max_length",  # replaces the deprecated pad_to_max_length=True
        truncation=True,
    )
    encoding['labels'] = label
    # Turn every field (token ids, masks, label) into a torch tensor.
    dataset_for_loader_val.append(
        {k: torch.tensor(v) for k, v in encoding.items()}
    )

In [None]:
# Tokenize the test split into fixed-length tensors, one dict per example.
# The columns read here are the ones the earlier cells prepare: "text_data"
# (cast to str) and "target_label" (integer-encoded by the LabelEncoder).
sentences_text_test = df_test["text_data"].values
distance_list_test = df_test["target_label"].values
dataset_for_loader_test = []
for sentence, label in zip(sentences_text_test, distance_list_test):
    encoding = tokenizer(
        sentence,
        # Same length as the train/val cells (this cell previously used 107),
        # so the model is evaluated on inputs shaped like the ones it was
        # trained on.
        max_length=100,
        padding="max_length",  # replaces the deprecated pad_to_max_length=True
        truncation=True,
    )
    encoding['labels'] = label
    # Turn every field (token ids, masks, label) into a torch tensor.
    dataset_for_loader_test.append(
        {k: torch.tensor(v) for k, v in encoding.items()}
    )

In [None]:
dataset_train = dataset_for_loader_train
dataset_val = dataset_for_loader_val
dataset_test = dataset_for_loader_test

# Only the training loader shuffles: without it every epoch visits the rows in
# file order. Validation and test stay in their original order so their
# metrics are computed over a stable sequence.
dataloader_train = DataLoader(dataset_train, batch_size=batch, shuffle=True)
dataloader_val = DataLoader(dataset_val, batch_size=batch)
dataloader_test = DataLoader(dataset_test, batch_size=batch)

In [None]:
from torch import nn
import torch.nn.functional as F
from transformers import AutoModel
from transformers import BertModel
class BertForSequenceClassifier_pl(pl.LightningModule):
    """BERT encoder followed by one linear layer, trained with cross-entropy.

    Args:
        model_name: Name of the pretrained Transformers checkpoint to load.
        lr: Learning rate handed to the Adam optimizer.
        num_class: Number of output units of the classification layer.
    """

    def __init__(self, model_name, lr, num_class):
        super().__init__()
        # Stores the constructor arguments under self.hparams (e.g.
        # self.hparams.lr); they are also saved automatically with checkpoints.
        self.save_hyperparameters()
        # Pretrained encoder plus a linear head on top of its hidden size.
        self.bert = BertModel.from_pretrained(model_name)
        self.classifier = nn.Linear(self.bert.config.hidden_size, num_class)
        # Loss function; for regression this would be e.g. nn.MSELoss instead.
        self.criterion = nn.CrossEntropyLoss()
        # Freeze the whole encoder, then re-enable gradients for its last
        # BertLayer only.
        self.bert.requires_grad_(False)
        self.bert.encoder.layer[-1].requires_grad_(True)

    def forward(self, input_ids, attention_mask, labels=None):
        """Return ``(loss, preds)``; ``loss`` is 0 when ``labels`` is None."""
        encoded = self.bert(input_ids, attention_mask=attention_mask)
        preds = self.classifier(encoded.pooler_output)
        loss = self.criterion(preds, labels) if labels is not None else 0
        return loss, preds

    def _run_batch(self, batch):
        """Run one mini-batch through forward and bundle loss, preds and labels."""
        labels = batch["labels"]
        loss, preds = self.forward(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
            labels=labels,
        )
        return {'loss': loss, 'batch_preds': preds, 'batch_labels': labels}

    def training_step(self, batch, batch_idx):
        """Process one training mini-batch and log its loss as ``train_loss``."""
        step_out = self._run_batch(batch)
        self.log('train_loss', step_out['loss'])
        return step_out

    def validation_step(self, batch, batch_idx):
        """Process one validation mini-batch (same computation as training)."""
        return self._run_batch(batch)

    def test_step(self, batch, batch_idx):
        """Process one test mini-batch (same computation as training)."""
        return self._run_batch(batch)

    def validation_epoch_end(self, outputs, mode="val"):
        """Log ``{mode}_loss`` and ``{mode}_accuracy`` over all batches of the epoch."""
        all_preds = torch.cat([step['batch_preds'] for step in outputs])
        all_labels = torch.cat([step['batch_labels'] for step in outputs])
        self.log(f"{mode}_loss", self.criterion(all_preds, all_labels), logger=True)

        hits = (all_preds.argmax(dim=1) == all_labels).sum().item()
        self.log(f"{mode}_accuracy", hits / len(all_labels), logger=True)

    def test_epoch_end(self, outputs):
        """Log test loss and accuracy by reusing the validation routine."""
        return self.validation_epoch_end(outputs, "test")

    def configure_optimizers(self):
        """Return the optimizer used for training (Adam with the stored lr)."""
        return torch.optim.Adam(self.parameters(), lr=self.hparams.lr)

In [None]:
# Keep only the single checkpoint (weights only) with the lowest val_loss.
checkpoint = pl.callbacks.ModelCheckpoint(
    dirpath='model/',  # directory the checkpoint is written to
    save_weights_only=True,
    save_top_k=1,
    mode='min',
    monitor='val_loss',
)

# Training setup: one GPU, a fixed number of epochs, and the checkpoint callback.
trainer_options = dict(
    gpus=1,
    max_epochs=5,  # number of epochs
    callbacks=[checkpoint],
)
trainer = pl.Trainer(**trainer_options)

# num_class holds the largest label id, so the head needs one more unit.
model = BertForSequenceClassifier_pl(
    model_name=MODEL_NAME,
    lr=1e-5,
    num_class=num_class + 1,
)

# Run the fine-tuning.
trainer.fit(model, dataloader_train, dataloader_val)

In [None]:
# Evaluate on the test DataLoader and keep whatever trainer.test returns.
# NOTE(review): no model is passed here, so the Trainer decides which weights
# are evaluated, presumably the best checkpoint from fit. Confirm this for the
# installed pytorch_lightning version.
test = trainer.test(dataloaders=dataloader_test)

In [None]:
# Show where the checkpoint callback stored the best model.
best_model_path = checkpoint.best_model_path # file of the best model
print(best_model_path)