In [None]:
# HuggingFace関連のライブラリー
!pip install transformers[torch]
!pip install datasets
!pip install huggingface_hub
# トークナイザー用に日本語対応パッケージ
!pip install ipadic
!pip install fugashi
!pip install unidic_lite
# その他
# !pip install wandb  # transformersライブラリーのTrainerクラスのtrainメソッドを行う時のエラー対策
!pip install torchinfo

In [None]:
import numpy as np
import pandas as pd
import pprint
import torch
import torchinfo
import transformers
import datasets
from sklearn import metrics
from google.colab import userdata
from huggingface_hub import login

In [None]:
print("numpy：", np.__version__)
print("pandas：", pd.__version__)
print("torch：", torch.__version__)
print("transformers：", transformers.__version__)
print("datasets：", datasets.__version__)

In [None]:
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
HuggingFace_API_KEY = userdata.get("HuggingFace_API_KEY")

**テキスト分類を学習するための学習データを収集**

In [None]:
# ネタになるデータをHuggingFaceから取得
dataset_dict = datasets.load_dataset("tyqiangz/multilingual-sentiments", "japanese")

In [None]:
# 取得したデータと、そのタイプを確認
print(dataset_dict)
print(type(dataset_dict))
# 取得したデータの項目を確認(trainを対象にして確認)
print("データの項目")
pprint.pprint(dataset_dict["train"].features)
# DatasetDictのtrainのデータの中身を2つ確認
print("データの中身")
pprint.pprint(dataset_dict["train"][0:2])
# 取得したデータ量が多いので、用いるデータは全体の1%だけにする
dataset_dict["train"] = dataset_dict["train"].select(indices=range(dataset_dict["train"].num_rows//1))
dataset_dict["validation"] = dataset_dict["validation"].select(indices=range(dataset_dict["validation"].num_rows//1))
dataset_dict["test"] = dataset_dict["test"].select(indices=range(dataset_dict["test"].num_rows//1))
# データ量を減らしたdataset_dictを確認
print(dataset_dict)

DatasetDictをDataFrameに変換してcsvファイルで保存する(後続処理でデータをシャッフルするため)

In [None]:
# DatasetDictをDataFrameに変換する準備
dataset_dict.set_format(type="pandas")
# それぞれtrainとvalidationとtestをcsvファイルに保存
dataset_dict["train"].to_csv(path_or_buf="./hugging_face_dataset_train.csv",
                             index=None)
dataset_dict["validation"].to_csv(path_or_buf="./hugging_face_dataset_validation.csv",
                                  index=None)
dataset_dict["test"].to_csv(path_or_buf="./hugging_face_dataset_test.csv",
                            index=None)
# データのタイプをDatasetDictに戻す
# hugging_face_dataset.reset_format()

csvファイルからDataFrameにしてデータをシャッフル後にDatasetDictにする

In [None]:
# csvファイルをDataFrameとして読み込み
train_df = pd.read_csv(filepath_or_buffer="./hugging_face_dataset_train.csv",
                       encoding="utf-8",
                       index_col=False)
validation_df = pd.read_csv(filepath_or_buffer="./hugging_face_dataset_validation.csv",
                            encoding="utf-8",
                            index_col=False)
test_df = pd.read_csv(filepath_or_buffer="./hugging_face_dataset_test.csv",
                      encoding="utf-8",
                      index_col=False)
# DataFrameの中身をシャッフルする
train_df = train_df.sample(frac=1).reset_index(drop=True)
validation_df = validation_df.sample(frac=1).reset_index(drop=True)
test_df = test_df.sample(frac=1).reset_index(drop=True)
# データ数を全体の5%のみにする
train_df = train_df.iloc[0:len(train_df)//20, :]
validation_df = validation_df.iloc[0:len(validation_df)//20, :]
test_df = test_df.iloc[0:len(test_df)//20, :]
# 事前学習済みBERTモデルで使えるようにDataFrameをDatasetへ変換
train_ds = datasets.Dataset.from_pandas(df=train_df)
validation_ds = datasets.Dataset.from_pandas(df=validation_df)
test_ds = datasets.Dataset.from_pandas(df=test_df)
# 変換したDatasetのlabel(正解ラベル)はint64の0,1,2になっているので、正解ラベルとしての0,1,2になるようにint64をClassLabelへ変換
class_labels = datasets.ClassLabel(num_classes=3,
                                   names=["positive", "neutral", "negative"])
train_ds = train_ds.cast_column(column="label",
                                feature=class_labels)
validation_ds = validation_ds.cast_column(column="label",
                                          feature=class_labels)
test_ds = test_ds.cast_column(column="label",
                              feature=class_labels)
# 各DatasetをDatasetDictで纏める
dataset_dict_from_csv = datasets.DatasetDict({"train": train_ds,
                                              "validation": validation_ds,
                                              "test": test_ds})
# DatasetDictを確認
print(dataset_dict_from_csv)
print(type(dataset_dict_from_csv))
# DatasetDictのデータの項目を確認(trainを対象にして確認)
pprint.pprint(dataset_dict_from_csv["train"].features)
# DatasetDictのtrainのデータの中身を2つ確認
pprint.pprint(dataset_dict_from_csv["train"][0:2])

**東北大BERTモデルで使われているトークナイザーを取得**

In [None]:
# 東北大BERTモデルのチェックポイント名
model_checkpoint = "cl-tohoku/bert-base-japanese-whole-word-masking"
# 東北大の事前学習済みBERTモデルのトークナイザーを取得
tokenizer = transformers.AutoTokenizer.from_pretrained(pretrained_model_name_or_path=model_checkpoint)
# 使っているトークナイザーの単語toIDデータ
pprint.pprint(tokenizer.get_vocab())

In [None]:
# 学習データのDatasetDictをトークン化する際に使う関数
def tokenize_datasetdict(batch):
    texts = [str(text) for text in batch["text"] if text is not None]  # text input must be of type str (single example), List[str] (batch or single pretokenized example) or List[List[str]を防ぐため
    print(type(texts), texts, len(texts), type(texts[0]), texts[0], sep="\n")
    return tokenizer(texts, padding=True, truncation=True, add_special_tokens=True, return_tensors="pt")

In [None]:
# 学習データのDatasetDictをトークン化(map関数は、pandasのapply関数みたいなイメージ)
tokenized_dataset_dict = dataset_dict_from_csv.map(function=tokenize_datasetdict,
                                                   batched=True,
                                                   batch_size=None)

In [None]:
# トークン化した学習データのDatasetDictを確認
print(tokenized_dataset_dict)
for feature in tokenized_dataset_dict["train"].features.keys():
    print("データ項目：{a}".format(a=feature))
    print(tokenized_dataset_dict["train"][feature][0])
ids_to_tokens = tokenizer.convert_ids_to_tokens(tokenized_dataset_dict["train"]["input_ids"][0])
print("データ項目：token")
print(ids_to_tokens)

**東北大BERTモデルを取得**

In [None]:
# 東北大BERTモデルのチェックポイント名
model_checkpoint = "cl-tohoku/bert-base-japanese-whole-word-masking"
# 出力層のパーセプトロン数を、データのラベル数の形にして東北大BERTモデルを取得
n_class = tokenized_dataset_dict["train"].features["label"].num_classes
nn_model = transformers.AutoModelForSequenceClassification.from_pretrained(pretrained_model_name_or_path=model_checkpoint,
                                                                           num_labels=n_class).to(DEVICE)

In [None]:
torchinfo.summary(nn_model,
                  input_size=(16, 512),
                  dtypes=[torch.int])
# dtypes(入力データの型)はデフォルトでfloatだけれど、自然言語処理では入力データのトークンはintなので、intを指定
# (参考URL：https://cocoinit23.com/huggingface-runtimeerror-failed-to-run-torchinfo/)

**FineTuningを行う前での文章分類**

In [None]:
# 推論(テストデータを対象にして予測)
test_token_tensor = torch.LongTensor(np.array(tokenized_dataset_dict["test"]["input_ids"])).to(DEVICE)
test_attention_mask_tensor = torch.LongTensor(np.array(tokenized_dataset_dict["test"]["attention_mask"])).to(DEVICE)
with torch.no_grad():
    # https://huggingface.co/docs/transformers/v4.50.0/ja/model_doc/bert#transformers.BertForSequenceClassification.forward
    test_output_before = nn_model(input_ids=test_token_tensor,
                                  attention_mask=test_attention_mask_tensor)
test_output_before_array = test_output_before.logits.to("cpu").detach().numpy()  # 出力結果の属性logitsに行列もしくはテンソルがあって、それをcpu処理にして、gradを外して、numpyにする
test_predict_before = np.argmax(a=test_output_before_array,
                                axis=1)  # 予測したラベル
print("推論結果：", test_predict_before)
test_answer = np.array(tokenized_dataset_dict["test"]["label"])  # 正解ラベル
print("正解：", test_answer)
test_accuracy_result_before = metrics.accuracy_score(y_true=test_answer,
                                                     y_pred=test_predict_before)
print("Accuracy：", test_accuracy_result_before)

**FineTuning**

In [None]:
# 評価関数を算出する関数(transformers.Trainerクラスをインスタンス化する時に使う)
def compute_metric(pred):  # 引数predにはEvalPredictionオブジェクトが入る
    answer = pred.label_ids  # EvalPredictionオブジェクト内のlabel_ids属性を取得(正解データの正解ラベルになる)
    predict = pred.predictions.argmax(-1)  # EvalPredictionオブジェクト内のprediction属性でargmax(-1)を取得(推測データの推測結果になる)
    precision_value, recall_value, f1_value, _ = metrics.precision_recall_fscore_support(y_true=answer,
                                                                                         y_pred=predict,
                                                                                         average="weighted")
    accuracy_value = metrics.accuracy_score(y_true=answer,
                                            y_pred=predict)
    return {"accuracy": accuracy_value,
            "recall": recall_value,
            "precision": precision_value,
            "f1": f1_value}

In [None]:
# 学習時のパラメータを設定(transformers.Trainerクラスをインスタンス化する時に使う)
minibatch_size = 32
training_args = transformers.TrainingArguments(output_dir=".",  # ファイルの出力先
                                               run_name="training_for_BERT_finetuning",
                                               do_train=True,
                                               do_eval=True,
                                               eval_strategy="epoch",  # 評価関数を計算するタイミング
                                               logging_steps=tokenized_dataset_dict["train"].num_rows//minibatch_size,  # ログを出すタイミング
                                               log_level="error",  # ログレベル
                                               save_strategy="no",  # チェックポイントを保存するタイミング
                                               save_total_limit=None,  # チェックポイントを保存する数
                                               label_names=["label"],  # 正解ラベルのカラム名
                                               learning_rate=2e-5,  # 学習率
                                               weight_decay=0.01,  # 減衰率
                                               per_device_train_batch_size=minibatch_size,  # 学習の時のミニバッチ数
                                               per_device_eval_batch_size=minibatch_size,  # 検証の時のミニバッチ数
                                               fp16=True,  # 16ビット計算
                                               num_train_epochs=3,  # 学習エポック数
                                               report_to="none")  # trainメソッド時のYou must call wandb.init() before wandb.log()を防ぐため

In [None]:
# 学習のクラスでインスタンスを作成
nn_model_trainer = transformers.Trainer(model=nn_model,
                                        args=training_args,
                                        compute_metrics=compute_metric,
                                        train_dataset=tokenized_dataset_dict["train"],
                                        eval_dataset=tokenized_dataset_dict["validation"],
                                        processing_class=tokenizer)

In [None]:
# trainメソッドでHuggingFaceのAPIキーを求められないように事前にログイン
login_success = login(token=HuggingFace_API_KEY)

In [None]:
# 学習
nn_model_trainer.train()

In [None]:
# 検証データ(validデータ)でのaccuracyを確認
valid_output = nn_model_trainer.predict(test_dataset=tokenized_dataset_dict["validation"])  # validデータで推論
valid_predict = np.argmax(a=valid_output.predictions[1],
                          axis=1)  # 出力結果のpredictions属性のインデックス[1]に(時系列数, ラベル数)の行列がある
print("推論結果：", valid_predict)
valid_answer = np.array(tokenized_dataset_dict["validation"]["label"])
print("正解：", valid_answer)
valid_accuracy_result = metrics.accuracy_score(y_true=valid_answer,
                                               y_pred=valid_predict)
print("Accuracy：", valid_accuracy_result)

In [None]:
# 学習(Fine Tuning)したモデルを保存
id2value = {}  # 分類対象のカラム(目的変数)のIDtoVALUEを保存
value2id = {}  # 分類対象のカラム(目的変数)のVALUEtoIDを保存
for i in range(tokenized_dataset_dict["train"].features["label"].num_classes):
    id2value[i] = tokenized_dataset_dict["train"].features["label"].int2str(i)
    value2id[tokenized_dataset_dict["train"].features["label"].int2str(i)] = i
print("id2value：", id2value, sep="\n")
nn_model_trainer.model.config.id2label = id2value
print("value2id：", value2id, sep="\n")
nn_model_trainer.model.config.label2id = value2id
nn_model_trainer.save_model(output_dir=".")
nn_model_trainer.save_state()

**FineTuningを行った後での文章分類**

In [None]:
# 学習(Fine Tuning)したモデルをロード
fine_tuning_tokenizer = transformers.AutoTokenizer.from_pretrained(pretrained_model_name_or_path=".")
fine_tuning_nn_model = transformers.AutoModelForSequenceClassification.from_pretrained(pretrained_model_name_or_path=".").to(DEVICE)

In [None]:
# 推論(テストデータを対象にして予測)
test_token_tensor = torch.LongTensor(np.array(tokenized_dataset_dict["test"]["input_ids"])).to(DEVICE)
test_attention_mask_tensor = torch.LongTensor(np.array(tokenized_dataset_dict["test"]["attention_mask"])).to(DEVICE)
with torch.no_grad():
    # https://huggingface.co/docs/transformers/v4.50.0/ja/model_doc/bert#transformers.BertForSequenceClassification.forward
    test_output = fine_tuning_nn_model(input_ids=test_token_tensor,
                                       attention_mask=test_attention_mask_tensor)
test_output_array = test_output.logits.to("cpu").detach().numpy()  # 出力結果の属性logitsに行列もしくはテンソルがあって、それをcpu処理にして、gradを外して、numpyにする
test_predict = np.argmax(a=test_output_array,
                         axis=1)  # 予測したラベル
print("推論結果：", test_predict)
test_answer = np.array(tokenized_dataset_dict["test"]["label"])  # 正解ラベル
print("正解：", test_answer)
test_accuracy_result = metrics.accuracy_score(y_true=test_answer,
                                              y_pred=test_predict)
print("Accuracy：", test_accuracy_result)