In [None]:
# ====================
# ライブラリのインストール
# ====================

! pip install pytorch-lightning==1.8.0
! pip install transformers==4.24.0
! pip install sentencepiece fugashi ipadic
! pip install datasets
! pip install pytorch-lightning
! pip install pyknp
! sudo apt-get update
! sudo apt-get install jumanpp

In [None]:
# ====================
# ライブラリの読み込み
# ====================

import glob
import random
from tqdm import tqdm

import torch
import pytorch_lightning as pl
from torch.utils.data import DataLoader
from transformers import BertJapaneseTokenizer
from transformers import RobertaForSequenceClassification
from datasets import load_dataset

#rinna,早稲田
from transformers import AutoTokenizer, AutoModelForMaskedLM, AutoModelForSequenceClassification


In [None]:
dataset = load_dataset(
    "shunk031/livedoor-news-corpus",
    train_ratio=0.8,
    val_ratio=0.1,
    test_ratio=0.1,
    random_state=42,
    shuffle=True,
)

print(dataset)

In [None]:
# ====================
# 前処理
# ====================

# RoBERTa
#rinna
#model_name = "rinna/japanese-roberta-base"
#tokenizer.do_lower_case = True  # due to some bug of tokenizer config loading
#早稲田
#model_name = "nlp-waseda/roberta-base-japanese-with-auto-jumanpp"
#京大 base
#model_name = 'ku-nlp/roberta-base-japanese-char-wwm'
#京大 large
#model_name = 'ku-nlp/roberta-large-japanese-char-wwm'
#Megagon Labs RoBERTa
#model_name = "megagonlabs/roberta-long-japanese"
#ACCMS RoBERTa
model_name = 'ku-accms/roberta-base-japanese-ssuw'
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)
model = AutoModelForMaskedLM.from_pretrained(model_name)

# DeBERTa
#京大v2base
#model_name = 'ku-nlp/deberta-v2-base-japanese'
#京大v2large (使えない)
#model_name = 'ku-nlp/deberta-v2-large-japanese'
#京大v2base バッチサイズ16で行ける
#model_name = 'ku-nlp/deberta-v3-base-japanese'
#東大v2 base
model_name = "izumi-lab/deberta-v2-base-japanese"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)

# LUKE
model_name = "studio-ousia/luke-japanese-base-lite"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)

# BigBird
model_name = "nlp-waseda/bigbird-base-japanese"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)



# 最大文長の設定
max_length = 128

def make_dataset(tokenizer, max_length, texts, labels):
    dataset_for_loader = list()

    for text, label in zip(texts, labels):
        # テキストをトークンに分割する。ただし、最大文長は "max_length" で指定したトークン数である。
        # 最大文長より短い文については、 "[PAD]" などの特殊トークンで残りの長さを埋める。
        # 最大文長を超える文については、はみ出す部分を無視する。
        encoding = tokenizer(text, max_length=max_length, padding="max_length", truncation=True)

        # tokenizerメソッドは辞書を返す。その辞書にラベルのIDも持たせる。
        encoding["labels"] = label

        # テンソルに変換
        encoding = {key: torch.tensor(value) for key, value in encoding.items()}

        # 前処理済みのデータを保存して次の文へ
        dataset_for_loader.append(encoding)
    return dataset_for_loader

dataset_train = make_dataset(tokenizer, max_length, [dataset["train"][i]["title"] for i in range(len(dataset["train"]))], [dataset["train"][i]["category"] for i in range(len(dataset["train"]))])
dataset_val = make_dataset(tokenizer, max_length, [dataset["validation"][i]["title"] for i in range(len(dataset["validation"]))], [dataset["validation"][i]["category"] for i in range(len(dataset["validation"]))])
dataset_test = make_dataset(tokenizer, max_length, [dataset["test"][i]["title"] for i in range(len(dataset["test"]))], [dataset["test"][i]["category"] for i in range(len(dataset["test"]))])

# データローダを作成。訓練用データはシャッフルしながら使う。
# 検証用と評価用は損失の勾配を計算する必要がないため、バッチサイズを大きめにとれる。
dataloader_train = DataLoader(dataset_train, batch_size=32, shuffle=True)
dataloader_val = DataLoader(dataset_val, batch_size=256, shuffle=False)
dataloader_test = DataLoader(dataset_test, batch_size=256, shuffle=False)

In [None]:
# ====================
# RoBERTaによるテキスト分類
# ====================

class RoBertaClassification(pl.LightningModule):

    # モデルの読み込みなど。損失関数は自動的に設定される。
    # num_labels == 1 -> 回帰タスクなので MSELoss()
    # num_labels > 1 -> 分類タスクなので CrossEntropyLoss()
    def __init__(self, model_name, num_labels, lr):
        super().__init__()
        self.save_hyperparameters()    # num_labelsとlrを保存する。例えば、self.hparams.lrでlrにアクセスできる。
        self.bert_sc = RobertaForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)

    # 訓練用データのバッチを受け取って損失を計算
    def training_step(self, batch, batch_idx):
        output = self.bert_sc(**batch)
        loss = output.loss
        self.log("train_loss", loss)
        return loss

    # 検証用データのバッチを受け取って損失を計算
    def validation_step(self, batch, batch_idx):
        output = self.bert_sc(**batch)
        val_loss = output.loss
        self.log("val_loss", val_loss)

    # 評価用データのバッチを受け取って分類の正解率を計算
    def test_step(self, batch, batch_idx):
        # ラベルの推定
        output = self.bert_sc(**batch)
        labels_predicted = output.logits.argmax(-1)
        # 正解率の計算
        labels = batch.pop("labels")
        num_correct = (labels_predicted == labels).sum().item()
        accuracy = num_correct / labels.size(0)
        self.log("accuracy", accuracy)

    # 最適化手法を設定
    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.hparams.lr)

In [None]:
# ====================
# DeBERTaによるテキスト分類
# ====================

class DebertaClassifier(pl.LightningModule):

    # モデルの読み込みなど。損失関数は自動的に設定される。
    # num_labels == 1 -> 回帰タスクなので MSELoss()
    # num_labels > 1 -> 分類タスクなので CrossEntropyLoss()
    def __init__(self, model):
        super(DebertaClassifier, self).__init__()
        self.model = model
        self.bert_sc = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=9)

    # 訓練用データのバッチを受け取って損失を計算
    def training_step(self, batch, batch_idx):
        output = self.bert_sc(**batch)
        loss = output.loss
        self.log("train_loss", loss)
        return loss

    # 検証用データのバッチを受け取って損失を計算
    def validation_step(self, batch, batch_idx):
        output = self.bert_sc(**batch)
        val_loss = output.loss
        self.log("val_loss", val_loss)

    # 評価用データのバッチを受け取って分類の正解率を計算
    def test_step(self, batch, batch_idx):
        # ラベルの推定
        output = self.bert_sc(**batch)
        labels_predicted = output.logits.argmax(-1)
        # 正解率の計算
        labels = batch.pop("labels")
        num_correct = (labels_predicted == labels).sum().item()
        accuracy = num_correct / labels.size(0)
        self.log("accuracy", accuracy)

    # 最適化手法を設定
    def configure_optimizers(self):
        return torch.optim.Adam(self.model.parameters(), lr=1e-5)

In [None]:
# ====================
# LUKEによるテキスト分類
# ====================

class LUKEClassifier(pl.LightningModule):

    # モデルの読み込みなど。損失関数は自動的に設定される。
    # num_labels == 1 -> 回帰タスクなので MSELoss()
    # num_labels > 1 -> 分類タスクなので CrossEntropyLoss()
    def __init__(self, model):
        super(LUKEClassifier, self).__init__()
        self.model = model
        self.bert_sc = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=9)

    # 訓練用データのバッチを受け取って損失を計算
    def training_step(self, batch, batch_idx):
        output = self.bert_sc(**batch)
        loss = output.loss
        self.log("train_loss", loss)
        return loss

    # 検証用データのバッチを受け取って損失を計算
    def validation_step(self, batch, batch_idx):
        output = self.bert_sc(**batch)
        val_loss = output.loss
        self.log("val_loss", val_loss)

    # 評価用データのバッチを受け取って分類の正解率を計算
    def test_step(self, batch, batch_idx):
        # ラベルの推定
        output = self.bert_sc(**batch)
        labels_predicted = output.logits.argmax(-1)
        # 正解率の計算
        labels = batch.pop("labels")
        num_correct = (labels_predicted == labels).sum().item()
        accuracy = num_correct / labels.size(0)
        self.log("accuracy", accuracy)

    # 最適化手法を設定
    def configure_optimizers(self):
        return torch.optim.Adam(self.model.parameters(), lr=1e-5)

In [None]:
# ====================
# BigBirdによるテキスト分類
# ====================

class LUKEClassifier(pl.LightningModule):

    # モデルの読み込みなど。損失関数は自動的に設定される。
    # num_labels == 1 -> 回帰タスクなので MSELoss()
    # num_labels > 1 -> 分類タスクなので CrossEntropyLoss()
    def __init__(self, model):
        super(LUKEClassifier, self).__init__()
        self.model = model
        self.bert_sc = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=9)

    # 訓練用データのバッチを受け取って損失を計算
    def training_step(self, batch, batch_idx):
        output = self.bert_sc(**batch)
        loss = output.loss
        self.log("train_loss", loss)
        return loss

    # 検証用データのバッチを受け取って損失を計算
    def validation_step(self, batch, batch_idx):
        output = self.bert_sc(**batch)
        val_loss = output.loss
        self.log("val_loss", val_loss)

    # 評価用データのバッチを受け取って分類の正解率を計算
    def test_step(self, batch, batch_idx):
        # ラベルの推定
        output = self.bert_sc(**batch)
        labels_predicted = output.logits.argmax(-1)
        # 正解率の計算
        labels = batch.pop("labels")
        num_correct = (labels_predicted == labels).sum().item()
        accuracy = num_correct / labels.size(0)
        self.log("accuracy", accuracy)

    # 最適化手法を設定
    def configure_optimizers(self):
        return torch.optim.Adam(self.model.parameters(), lr=1e-5)

In [None]:
# ====================
# 訓練 RoBERTa
# ====================

model = RoBertaClassification(model_name, num_labels=9, lr=1e-5)

# 訓練中にモデルを保存するための設定
checkpoint = pl.callbacks.ModelCheckpoint(
    # 検証用データにおける損失が最も小さいモデルを保存する
    monitor="val_loss", mode="min", save_top_k=1,
    # モデルファイル（重みのみ）を "model" というディレクトリに保存する
    save_weights_only=True, dirpath="model/"
)

# 訓練
trainer = pl.Trainer(devices=1, max_epochs=3, callbacks=[checkpoint])
trainer.fit(model, dataloader_train, dataloader_val)

# ベストモデルの確認
print("ベストモデル: ", checkpoint.best_model_path)
print("ベストモデルの検証用データにおける損失: ", checkpoint.best_model_score)

In [None]:
# ====================
# 訓練 DeBERTa
# ====================

model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=9)
deberta_classifier = DebertaClassifier(model)

# 訓練中にモデルを保存するための設定
checkpoint = pl.callbacks.ModelCheckpoint(
    # 検証用データにおける損失が最も小さいモデルを保存する
    monitor="val_loss", mode="min", save_top_k=1,
    # モデルファイル（重みのみ）を "model" というディレクトリに保存する
    save_weights_only=True, dirpath="model/"
)

# 訓練
trainer = pl.Trainer(devices=1, max_epochs=3)
trainer.fit(deberta_classifier, dataloader_train, dataloader_val)
# ベストモデルの確認
print("ベストモデル: ", checkpoint.best_model_path)
print("ベストモデルの検証用データにおける損失: ", checkpoint.best_model_score)

In [None]:
# ====================
# 訓練 LUKE
# ====================

model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=9)
deberta_classifier = LUKEClassifier(model)

# 訓練中にモデルを保存するための設定
checkpoint = pl.callbacks.ModelCheckpoint(
    # 検証用データにおける損失が最も小さいモデルを保存する
    monitor="val_loss", mode="min", save_top_k=1,
    # モデルファイル（重みのみ）を "model" というディレクトリに保存する
    save_weights_only=True, dirpath="model/"
)

# 訓練
trainer = pl.Trainer(devices=1, max_epochs=3)
trainer.fit(deberta_classifier, dataloader_train, dataloader_val)
# ベストモデルの確認
print("ベストモデル: ", checkpoint.best_model_path)
print("ベストモデルの検証用データにおける損失: ", checkpoint.best_model_score)

In [None]:
# ====================
# 訓練 BigBird
# ====================

model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=9)
deberta_classifier = LUKEClassifier(model)

# 訓練中にモデルを保存するための設定
checkpoint = pl.callbacks.ModelCheckpoint(
    # 検証用データにおける損失が最も小さいモデルを保存する
    monitor="val_loss", mode="min", save_top_k=1,
    # モデルファイル（重みのみ）を "model" というディレクトリに保存する
    save_weights_only=True, dirpath="model/"
)

# 訓練
trainer = pl.Trainer(devices=1, max_epochs=3)
trainer.fit(deberta_classifier, dataloader_train, dataloader_val)
# ベストモデルの確認
print("ベストモデル: ", checkpoint.best_model_path)
print("ベストモデルの検証用データにおける損失: ", checkpoint.best_model_score)

In [None]:
# ====================
# 評価
# ====================

trainer.test(dataloaders=dataloader_test)

In [None]:
# ====================
# 確認
# ====================

text = [dataset["test"][i]["title"] for i in range(len(dataset["test"]))]
gold = [dataset["test"][i]["category"] for i in range(len(dataset["test"]))]
label = ['movie-enter', 'it-life-hack', 'kaden-channel', 'topic-news', 'livedoor-homme', 'peachy', 'sports-watch', 'dokujo-tsushin', 'smax']

with torch.no_grad():
    preds = list()
    for batch in dataloader_test:
        output = model.bert_sc(**batch)
        labels_predicted = output.logits.argmax(-1)
        preds.append(labels_predicted)
    preds = torch.cat(preds)
    pred_label = [label.item() for label in preds]

for i in range(10):
    print(text[i])
    print("pred: %s\tgold: %s\n" % (label[pred_label[i]], label[gold[i]]))