<a href="https://colab.research.google.com/github/zmy6208/zmy6208/blob/main/CRoBERTa_EmotionalAnalysis_Classification_20240708.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 実行環境の設定

In [None]:
# GPUおよびPython環境のチェック
#import locale
#locale.getpreferredencoding = lambda: "UTF-8"
!nvidia-smi
!python --version

# Google Drive のマウント
from google.colab import drive
drive.mount('/content/gdrive')

# パッケージインストール

In [None]:
!pip install sentencepiece transformers pytorch-lightning logzero
import pytorch_lightning as pl
from transformers import AutoTokenizer, AutoModel, AutoModelForSequenceClassification

#
import numpy as np
import pandas as pd
import openpyxl as op
import shutil
import json
import sys
import os, torch, datetime
import glob
from time import time
from tqdm import tqdm
from sklearn import metrics
from sklearn.model_selection import train_test_split
!pip install logzero
from logzero import logger
from torch.utils.data import DataLoader

# ファイルパス

In [None]:
# パス分離記号
PATH_SEPARATOR = "/"

###
# プロジェクトルート
PROJECT_ROOT = "/content/gdrive/MyDrive/CRoBERTa/"

# CRoBERTa pretrained_model path
PRETRAINED_MODEL = PROJECT_ROOT + "chinese-roberta-wwm-ext"

# 訓練データを格納するディレクトリ
TRAINING_DATA_DIR = PROJECT_ROOT + "corpus/"
TRAINING_DATA = TRAINING_DATA_DIR + "Corpus-Review+Corpus-Novel.xlsx"

# Fine-tuning済みモデルの保存先
MODEL_SAVE_DIR = PROJECT_ROOT + "classification/models/"

# 11感情分類CRoBERTaモデル
SELECTED_MODEL_DIR = PROJECT_ROOT + "classification/selected/"    # 11感情分類モデルの保存先

# 分類ターゲットデータ名
TARGET_DATA_NAME = "target.xlsx"                                # 分類対象ファイル名に変更する！

# 分類ターゲットデータ保存先
CLASSIFICATION_INPUT_DIR = PROJECT_ROOT + "data/input/"
CLASSIFICATION_INPUT_FILENAME = CLASSIFICATION_INPUT_DIR + TARGET_DATA_NAME

# 分類結果保存先
CLASSIFICATION_RESULT_DIR = PROJECT_ROOT + "data/result/" + TARGET_DATA_NAME[:-5]
CLASSIFICATION_RESULT_FILENAME = CLASSIFICATION_RESULT_DIR + PATH_SEPARATOR + "分類結果.xlsx"

# 学習モデルおよびパラメータ設定

In [None]:
# 11感情カテゴリのリスト
TARGET_EMOTIONS = ["喜", "怒", "哀", "怖", "恥", "好", "厭", "昂", "安", "驚", "望"]

# 事前学習モデルの名前(Huggingface)
MODEL = AutoModel.from_pretrained(PRETRAINED_MODEL)
# Tokenizer
TOKENIZER = AutoTokenizer.from_pretrained(PRETRAINED_MODEL)
TOKENIZER.do_lower_case = True

# 計算資源 (GPUが使用できる場合はCUDA, できない場合はCPUが選択される)
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# 学習パラメータ
#EPOCHS = 3
#TRAIN_BATCH_SIZE = 32
#VAL_BATCH_SIZE = 64
#TEST_BATCH_SIZE = 64
PREDICT_BATCH_SIZE = 64

# ファイル入出力ユーティリティ

In [None]:
### jsonファイル入出力
# jsonファイルの読み込み：必要な列数に書き換える
def read_json_file(JSON_DATAFILE):
    pd_data = pd.read_json(JSON_DATAFILE)
    ndaray_data = pd_data.to_numpy(copy=True).transpose()
    list_first_column = list(ndaray_data[0])
    list_second_column = list(ndaray_data[1])
    #print(list_first_column)
    #print(list_second_column)
    return list_first_column, list_second_column

# 2次元list_dataリストのjsonファイルへの書き出し
def write_json_file(JSON_DATAFILE, list_data):
    with open(JSON_DATAFILE, "w") as f:
        json.dump(list_data, f)

### excelファイル入出力
# excelデータの読み込み
def read_excel_file(EXCEL_DATAFILE):
    input_book = pd.ExcelFile(EXCEL_DATAFILE)
    input_sheet_name = input_book.sheet_names
    input_sheet_df = input_book.parse(input_sheet_name[0])
    transposed_df = input_sheet_df.transpose()
    data = transposed_df.values.tolist()
    #print("Number of columns:", len(data))
    if len(data) == 13:                              # Chinese corpus for emotional analysis
        sentences = data[1]
        emotion_data = np.array(data[2:13])
    else:
        print("Excel : not supported format")
        sys.exit()
    return len(data), sentences, emotion_data

# excelファイルへの書き出し
def write_excel_file(EXCEL_DATAFILE, pd_data):
    pd_data.to_excel(EXCEL_DATAFILE, index=False, header=True)

## textファイルの入出力
# textデータの読み込み
def read_text_file(TEXT_DATAFILE):
    sentences = []
    with open(TEXT_DATAFILE,mode='r', encoding='utf-8') as f:
        for s in f:
            try:
                sentences.append(s.rstrip())
            except Exception as e:
                print(e)
    return sentences

# textファイルへの書き出し
def write_text_file(TEXT_DATAFILE, sentences):
    with open(TEXT_DATAFILE, mode='w', encoding='utf-8') as f:
        f.write('\n'.join(sentences))

# 11感情分類実験用モデルを選択：！変更しないときは実行しない！

In [None]:
### モデルを変更しないときは実行しない
#
def select_finetuned_model():
    for e in TARGET_EMOTIONS:
        files = []
        for fname in os.listdir(MODEL_SAVE_DIR + e):
            if fname.endswith(".ckpt"):
                #print(fname)
                files.append(fname)
        if len(files) == 0:
            print(e + "モデルは", MODEL_SAVE_DIR + e + PATH_SEPARATOR, "にありません。")
            continue
        elif len(files) == 1:
            os.makedirs(SELECTED_MODEL_DIR + e, exist_ok=True)
            shutil.copy(MODEL_SAVE_DIR + e + PATH_SEPARATOR + files[0], SELECTED_MODEL_DIR + e + PATH_SEPARATOR)
            print(e + "モデルを" + SELECTED_MODEL_DIR + e + PATH_SEPARATOR + files[0] + "にコピーしました。")
        elif len(files) > 1:
            sorted_files = sorted(files)
            print('モデルをリストにします。(番号： モデル名)')
            nf = 0
            for fname in sorted_files:
                nf += 1
                print(str(nf) + ": " +  fname)
            print('q: 終了')
            print()
            while True:
                nstr = input('ファイル番号を入力してください。(コピーしない：q または Q) >> ')
                if nstr == 'q' or nstr == 'Q':
                    print(e + "モデルはコピーしません。")
                    break
                try:
                    nfile = int(nstr) - 1
                except Exception:
                    print('1以上' + str(nf) + '以下のファイル番号（半角数字）を入力してください。')
                    print()
                    continue
                if nfile < 0 and nf <= nfile:
                    print('1以上' + str(nf) + '以下のファイル番号（半角数字）を入力してください。')
                    print()
                    continue
            os.makedirs(SELECTED_MODEL_DIR + e, exist_ok=True)
            shutil.copy(MODEL_SAVE_DIR + e + PATH_SEPARATOR + files[nfile], SELECTED_MODEL_DIR + e + PATH_SEPARATOR)
            print(e + "モデルを" + SELECTED_MODEL_DIR + e + PATH_SEPARATOR + files[nfile] + "にコピーしました。")

select_finetuned_model()

# 11感情分類データの前処理

In [None]:
## targetデータの前処理
class ExcelDataset():
    def __init__(self, tokenizer, add_cls=False) -> None:
        self.tokenizer = tokenizer
        self.add_cls = add_cls

    # 文の前に"[CLS]"を追加する関数 (rinna RoBERTa用)
    # (※ rinna RoBERTaのT5Tokenizerでは[CLS]トークン自動で追加しないため、自分で分析文の前に追加する)
    # 詳細：https://huggingface.co/rinna/japanese-roberta-base
    def _add_cls(self, sentence: str) -> str:
        return "[CLS]" + sentence

    # Finetuningデータ全体から求めたmax_length（最大トークン数）を読み込む関数
    def _get_max_length(self) -> int:
        max_length = read_text_file(MODEL_SAVE_DIR + "max_length")
        max_length = int(max_length[0])
        #print("max_length:", max_length)
        return max_length

    # targetの文データを ID 化し、RoBERTaに入力できる形に変換する
    def build(self, file_dir: str) -> tuple:
        target = []

        target_df = pd.read_excel(os.path.join(file_dir, CLASSIFICATION_INPUT_FILENAME))
        max_length = self._get_max_length()

        logger.info("Loading target data")
        for _index, data in target_df.iterrows():
            #print(type(data),type(data[0]),data[0])
            sentence = self._add_cls(data["sentence"]) if self.add_cls else data["sentence"]
            encoding = self.tokenizer.encode_plus(sentence, max_length=max_length, padding="max_length")
            encoding["labels"] = 0
            encoding = {k: torch.tensor(v) for k, v in encoding.items()}
            target.append(encoding)
        logger.info("Loading completed")
        return target

#dataset = ExcelDataset(tokenizer=TOKENIZER, add_cls=True)          # rinna RoBERTa用
dataset = ExcelDataset(tokenizer=TOKENIZER, add_cls=False)          # CRoBERTa用


# targetデータの読み込み
target = dataset.build(
    file_dir=TRAINING_DATA_DIR
)

# ミニバッチ化
dataloader_target = DataLoader(target, batch_size=PREDICT_BATCH_SIZE)

# RoBERTaモデルの定義

In [None]:
# モデルのラッパークラス
class RobertaForSequenceClassification_pl(pl.LightningModule):
    def __init__(self, model_name, num_labels, lr):
        super().__init__()
        self.save_hyperparameters()

        # 事前学習済みRoBERTaの読み込み
        self.bert_sc = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)

    def training_step(self, batch, batch_idx):
        output = self.bert_sc(**batch)
        loss = output.loss
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        output = self.bert_sc(**batch)
        val_loss = output.loss
        self.log("val_loss", val_loss)

    def test_step(self, batch, batch_idx):
        global result_df
        labels = batch.pop("labels")
        output = self.bert_sc(**batch)
        labels_predicted = output.logits.argmax(dim=-1)
        num_correct = (labels_predicted==labels).sum().item()
        accuracy = num_correct / labels.size(0)
        self.log("accuracy", accuracy)

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.hparams.lr)


# 11感情分類の実行

In [None]:
def CRoBERTa_clssification(emotion):
    # 訓練済みモデルの読み込み
    #model_pl = RobertaForSequenceClassification_pl.load_from_checkpoint(best_model_path)
    # model_pl = RobertaForSequenceClassification_pl.load_from_checkpoint(f"{save_model_dir}/{***.ckpt}")
    model_pl = None
    for fname in os.listdir(SELECTED_MODEL_DIR + emotion):
        if fname.endswith(".ckpt"):
            print(fname)
            model_pl = RobertaForSequenceClassification_pl.load_from_checkpoint(SELECTED_MODEL_DIR + emotion + PATH_SEPARATOR + fname)

    # ラッパーからRoBERTaモデルを取り出す
    model = model_pl.bert_sc.to(DEVICE)

    # 教師ラベル と 予測ラベル のリスト
    pred_labels = []

    # テストデータのミニバッチを1つずつ取り出し、予測値をリストに格納する
    for batch in tqdm(dataloader_target):
        batch = {k: v.to(DEVICE) for k, v in batch.items()}
        with torch.no_grad():
            output = model(**batch)
        score = output.logits
        labels_predicted = score.argmax(dim=-1)
        pred_labels.extend(labels_predicted.cpu().numpy())

    del batch, output, score, labels_predicted
    torch.cuda.empty_cache()

    return pred_labels

###
# 文と予測値のDataFrameを作成

# target.xlsx の文のDataFrameを構成
sentences = pd.read_excel(os.path.join(CLASSIFICATION_INPUT_DIR, CLASSIFICATION_INPUT_FILENAME))["sentence"].to_list()
sentence_df = pd.DataFrame(sentences, columns=["sentence"])
#print(sentence_df)

# 11感情分類予測値のDataFrameｂのリストを構成
pred_labels_df_e_list = []
for e in TARGET_EMOTIONS:
    pred_labels = CRoBERTa_clssification(e)
    pred_labels_df_e = pd.DataFrame(pred_labels, columns=[e])
    pred_labels_df_e_list.append(pred_labels_df_e)

# 文と分類結果を連結したDataFrameを構成
pred_labels_df = pd.concat([sentence_df] + pred_labels_df_e_list, axis=1)
#print(pred_labels_df)

# 分析結果の保存先
result_dir = CLASSIFICATION_RESULT_DIR
# 保存先がない場合は作成
if not os.path.exists(result_dir):
    os.makedirs(result_dir, exist_ok=True)

# 分析結果を保存
write_excel_file(CLASSIFICATION_RESULT_FILENAME, pred_labels_df)
print("11感情分類結果を" + CLASSIFICATION_RESULT_FILENAME + "に出力しました。")