gokartというパイプライン処理を実現するライブラリがある。

gokartの解説はこちら


gokartを用いて機械学習の処理をタスク単位で切り分けつつ、再利用性を高めたり、再実行の際にキャッシュを聞かせて素早く試行錯誤ができる状態を作りたい。

今回は、既に実現しているtransformersのモデルとTraierクラスを用いたfinetuneの処理を、可能な範囲でgokartのタスク化していく。

結果としては、学習結果のモデルの保存の都合を考えると、タスク化は部分的にとどめたほうがよいように感じた。

タスクの中身に関しては説明しないので、コードを読んでください。

そもそもここでの作業は、./finetune-with-trtansformers.ipynbの内容を可能な範囲でタスク化したものなので、そちらのnotebookを見ていれば多分分かるはず

In [1]:
import luigi
import gokart

まずは、livedoorのコーパスのダウンロードとpandas dataframe化のタスクを用意したので以下のように使う。

`data_task`はtaskのインスタンスであり、中身はまだ実行されてない。gokart.buildに渡すことで、タスクの中身が実行される。

In [2]:
from tasks.data.livedoor import LivedoorCourpusData
data_task = LivedoorCourpusData()

df = gokart.build(data_task)

BERTのconfigとモデルのクラスを読み込むタスク

In [3]:
from bert import LoadBertConfig, LoadBertForSequenceClassification

In [4]:
num_labels = len(df.category.unique())
model_dir = '/mnt/d/yoheikikuta-bert-japanese/'
base_ckpt = 'model.ckpt-1400000'    # 拡張子は含めない

num_labelsをdata_taskからbuildして取得しているが、これは後にConvertCategoryToLabelでnum_labelsを得るタスク定義したのでそっち使っても良かったかも

In [5]:
config_task = LoadBertConfig(num_labels=num_labels)
model_task = LoadBertForSequenceClassification(
    config_task=config_task,
    model_dir=model_dir,
    base_ckpt=base_ckpt,
)
model = gokart.build(model_task)

In [6]:
! ls {model_dir}

graph.pbtxt					    model.ckpt-1400000.meta
jawiki-20181220-pages-articles-multistream.xml.bz2  wiki-ja.model
model.ckpt-1400000.data-00000-of-00001		    wiki-ja.vocab
model.ckpt-1400000.index


In [7]:
DIR_BERT_KIKUTA = '/mnt/d/yoheikikuta-bert-japanese/'
BASE_SPM = 'wiki-ja.model'
BASE_VOCAB = 'wiki-ja.vocab'

In [None]:
ConvertCategoryToLabelは、data_taskを渡して、カテゴリ数を得るタスク

CreateDatasetは、data_taskを渡して、transformersのDatasetに変換するタスク

In [8]:
from tasks.data.convert import ConvertCategoryToLabel
from tasks.data.dataset import CreateDataset

In [9]:
task = ConvertCategoryToLabel(data_task=data_task, target_column='category')
dataset_task = CreateDataset(data_task=task)
gokart.build(dataset_task)



DatasetDict({
    train: Dataset({
        features: ['labels', 'text'],
        num_rows: 5525
    })
    test: Dataset({
        features: ['labels', 'text'],
        num_rows: 1842
    })
})

In [10]:
DIR_BERT_KIKUTA = '/mnt/d/yoheikikuta-bert-japanese/'
BASE_SPM = 'wiki-ja.model'
BASE_VOCAB = 'wiki-ja.vocab'

tokenizerを返すタスクが以下。まだ別のファイルに切り出してないだけ

（transformersに乗っかるなら、AutoTokenizer使って、タスクのパラメータとして文字列を受け取るといいかも？

In [12]:
from transformers import AlbertTokenizer

class LoadSentencepieceTokenizer(gokart.TaskOnKart):
    path = luigi.Parameter()
    def run(self):
        tokenizer = AlbertTokenizer(self.path)
        self.dump(tokenizer)

In [13]:
tokenizer_task=LoadSentencepieceTokenizer(path=DIR_BERT_KIKUTA + BASE_SPM)

Datasetをtokenizeするタスク

In [14]:
class TokenizeDataset(gokart.TaskOnKart):
    dataset_task = gokart.TaskInstanceParameter()
    config_task = gokart.TaskInstanceParameter()
    tokenizer_task = gokart.TaskInstanceParameter()

    def run(self):
        dataset = self.load("dataset_task")
        config = self.load("config_task")
        max_length = config.max_position_embeddings
        tokenizer = self.load("tokenizer_task")
        
        def tokenize(examples):
            input_ids = tokenizer(examples["text"], max_length=max_length, padding="max_length",truncation=True,).input_ids
            return {
                "input_ids": input_ids,
                "labels": examples['labels']
            }

        
        tokenized = dataset.map(tokenize, batched=True, remove_columns=['text'])
        tokenized.set_format(type='torch', columns=['input_ids', 'labels'])
        self.dump(tokenized)

In [15]:
task = TokenizeDataset(dataset_task=dataset_task, config_task=config_task, tokenizer_task=tokenizer_task)
tokenized_dataset = gokart.build(task)

- データの取得
- モデルの設定
- モデル本体
- Dataset
- トークナイザ
- Datasetのトークナイズ



といった作業が、タスク単位で切り出された。異なる事前学習モデルを使うときは、タスク単位でモデルやトークナイザーを切り替えれば良い。

今回は処理のながれをnotebook上でそのまま書いたが、タスクの読み込みとインスタンス化、パラメータとしての受け渡しを一つの関数としてまとめつつ、適切な抽象度でパラメータやタスクの切り替えをできるような形にすると良いと思われる

これは、gokartを使わずにnotebook直書きの頃よりも、全体の見通しがしやすく、コードの再利用がしやすいように思える。

## 学習

できれば学習もタスク化したかったが、transformersのモデルの出力はフォルダ単位であり、これがgokartとは相性が悪いので諦めた。
modelをそのままpklとしてdumpしちゃえばいいのはそれはそうであるが、なんか気持ち悪いので一旦やめておく。
pklで気にせず保存して、gokart.buildで読み込みつつsave_pretrainedすればいつでもtransformersの形で保存しなおせはするのだが、ディスクに同じ内容のモデルがpklとpytorchのcheckpointの２種類で保存されることになり、容量を食いすぎる

gokartで独自のフォルダをいい感じにdumpする方法は、ソースを見た感じ自分でも作れそうだったが、ちょっとだるいので一旦飛ばす。




学習に必要なdatasetを得るところまでがタスク化されてるだけでも、中間ファイルのキャッシュが効いて再実行が楽にできるので、部分的な導入も試行錯誤の速さには貢献している

特定のパラメータで、どういうモデルが出力されたのか、というところはnotebook上で手書きで我慢する。

In [16]:
from transformers import Trainer, TrainingArguments

# モデルの準備

# Trainerのパラメータの準備
training_args = TrainingArguments(
    output_dir='./results2',          # 出力フォルダ
    num_train_epochs=10,              # エポック数
    per_device_train_batch_size=4,  # 訓練のバッチサイズ
    per_device_eval_batch_size=4,   # 評価のバッチサイズ
    warmup_ratio=0.1,
    weight_decay=0.01,               # 重み減衰の強さ
    logging_dir='./logs',            # ログ保存フォルダ
    eval_accumulation_steps=1,
    save_strategy="no",

)


In [17]:
import numpy as np
from datasets import load_metric

f1 = load_metric("f1")
acc = load_metric("accuracy")
precision = load_metric("precision")
recall = load_metric("recall")

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    params = dict(predictions=predictions, references=labels,)
    return { 
        **acc.compute(**params),
        **f1.compute(**params, average="weighted"),
        **precision.compute(**params, average="weighted"),
        **recall.compute(**params, average="weighted"),
       }

In [18]:
from transformers import Trainer
# Trainerの準備
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset['train'],     # 訓練データセット
    eval_dataset=tokenized_dataset['test'],        # 評価データセット
    compute_metrics=compute_metrics,
)

In [19]:
trainer.train()

***** Running training *****
  Num examples = 5525
  Num Epochs = 10
  Instantaneous batch size per device = 4
  Total train batch size (w. parallel, distributed & accumulation) = 4
  Gradient Accumulation steps = 1
  Total optimization steps = 13820


Step,Training Loss
500,1.4553
1000,0.4558
1500,0.4663
2000,0.3786
2500,0.2889
3000,0.2342
3500,0.186
4000,0.1702
4500,0.1435
5000,0.129




Training completed. Do not forget to share your model on huggingface.co/models =)




TrainOutput(global_step=13820, training_loss=0.16138851144883443, metrics={'train_runtime': 5732.7766, 'train_samples_per_second': 9.638, 'train_steps_per_second': 2.411, 'total_flos': 1.4537799454464e+16, 'train_loss': 0.16138851144883443, 'epoch': 10.0})

In [20]:
trainer.evaluate()

***** Running Evaluation *****
  Num examples = 1842
  Batch size = 4


{'eval_loss': 0.5032410025596619,
 'eval_accuracy': 0.9462540716612378,
 'eval_f1': 0.9461223435888861,
 'eval_precision': 0.9470629203965109,
 'eval_recall': 0.9462540716612378,
 'eval_runtime': 42.1204,
 'eval_samples_per_second': 43.732,
 'eval_steps_per_second': 10.945,
 'epoch': 10.0}

In [21]:
trainer.save_model()

Saving model checkpoint to ./results2
Configuration saved in ./results2/config.json
Model weights saved in ./results2/pytorch_model.bin


In [23]:
trainer.save_state()