<a href="https://colab.research.google.com/github/tomonari-masada/course2022-nlp/blob/main/12_interpreting_NLP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# BERTによるテキスト分類を解釈する

* 参考資料
 * https://captum.ai/tutorials/IMDB_TorchText_Interpret
 * https://captum.ai/tutorials/Bert_SQUAD_Interpret

## 準備

* 次のセルは、多分、不要。だから、コメントアウトしてある。
 * 万が一、何かがインストールされていない、的なエラーが出たら、次のセルを実行してください。

In [None]:
#!apt install aptitude swig
#!aptitude install mecab libmecab-dev mecab-ipadic-utf8 git make curl xz-utils file -y

* 必要なパッケージをインストール。

In [None]:
!pip install mecab-python3
!pip install fugashi ipadic
!pip install transformers

## Captumのインストール

* Captum （カプタム） は機械学習モデルの解釈のためのライブラリ。

In [None]:
!pip install captum

## インポート

In [None]:
import re
import csv
import tarfile
import numpy as np
import pandas as pd

import torch
from torch.utils.data import Dataset, DataLoader, random_split
from transformers import AutoModel, AutoTokenizer, BertForSequenceClassification

from captum.attr import LayerIntegratedGradients, TokenReferenceBase, visualization

np.random.seed(0)
torch.manual_seed(0)

MODEL_NAME = "cl-tohoku/bert-base-japanese-whole-word-masking"

MODEL_PATH = "/content/drive/MyDrive/2022Courses/nlp/bert_for_classification.pt"

## ライブドアニュースコーパスの準備

In [None]:
!wget https://www.rondhuit.com/download/ldcc-20140209.tar.gz

* 自分のGoogleドライブに移動させておく。

In [None]:
!mv ldcc-20140209.tar.gz /content/drive/MyDrive/data/

In [None]:
DATASET_PATH = "/content/drive/MyDrive/data/ldcc-20140209.tar.gz"

## データの読み込みとクレンジング

* クレンジング後のデータを、csvファイルとして保存しておく。

In [None]:
csv_fname = "all_text.csv" 

def remove_brackets(inp):
  brackets_tail = re.compile('【[^】]*】$') #【と】で囲まれた文字列で末尾にあるもの
  brackets_head = re.compile('^【[^】]*】') #【と】で囲まれた文字列で先頭にあるもの
  return re.sub(brackets_head, '', re.sub(brackets_tail, '', inp))

def read_title(f):
  next(f) # URLをスキップ
  next(f) # タイムスタンプをスキップ
  title = next(f) # タイトルを取得
  return remove_brackets(title.decode('utf-8'))[:-1]

tf = tarfile.open(DATASET_PATH)
genre_fnames = {}
for ti in tf:
  if "LICENSE.txt" in ti.name: #ライセンスファイルはスキップ
    continue
  if len(ti.name.split('/')) < 3: #ディレクトリの深さのチェック
    continue
  if not ti.name.endswith(".txt"): #テキストファイル以外はスキップ
    continue
  genre = ti.name.split('/')[1] #ジャンルの取得
  if not genre in genre_fnames:
    genre_fnames[genre] = []
  genre_fnames[genre].append(ti.name)

with open(csv_fname, "w") as wf:
  writer = csv.writer(wf)
  for i, genre in enumerate(genre_fnames):
    for fname in genre_fnames[genre]:
      f = tf.extractfile(fname)
      title = read_title(f)
      row = [genre, i, title]
      writer.writerow(row)

### 分類先のクラスの確認

In [None]:
class_names = list(genre_fnames.keys())
print(class_names)

### データフレーム化

In [None]:
df = pd.read_csv("all_text.csv", header=None, names=['genre', 'label', 'sentence'])
df = df.dropna(how='any') # nanは落とす
print(f'num of files： {df.shape[0]}')

* ここでは、3つのクラスにデータを絞り込むことにする。
 * 時間節約のため。

In [None]:
class_names = ['sports-watch', 'movie-enter', 'it-life-hack']
df_new = df.query("genre in ['sports-watch', 'movie-enter', 'it-life-hack']")
print(f'num of files： {df_new.shape[0]}')
display(df_new.sample(10))

* labelインデックスの付け直し

In [None]:
def relabel(class_name):
  return class_names.index(class_name)

df_new = df.query("genre in ['sports-watch', 'movie-enter', 'it-life-hack']")
df_new["label"] = df_new["genre"].apply(relabel)
display(df_new.sample(10))

In [None]:
sentences = df_new.sentence.values
labels = df_new.label.values

## 事前学習済みBERTのトークナイザの準備

In [None]:
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

* 試しにトークン化してみる。

In [None]:
print('text: ', sentences[0])
print('tokenized: ', tokenizer.tokenize(sentences[0]))
print('token IDs: ', tokenizer.convert_tokens_to_ids(tokenizer.tokenize(sentences[0])))

* special tokensを見てみる。

In [None]:
tokenizer.all_special_tokens

### テキストの最大長の調査

* 最大長でミニバッチの大きさを固定するため。

In [None]:
max_len = 0
longest_sentence = ""
for sentence in sentences:
  token_words = tokenizer.tokenize(sentence)
  if len(token_words) > max_len:
    max_len = len(token_words)
    longest_sentence = sentence
print(f"最大長 = {max_len}\n{longest_sentence}")

## データセット
* データセットにアクセスがあるたびにトークナイズする。

In [None]:
class MyDataset(Dataset):
  def __init__(self, texts, labels, tokenizer, max_len):
    super(MyDataset, self).__init__()
    self.texts = texts
    self.labels = labels
    self.tokenizer = tokenizer
    self.max_len = max_len

  def __len__(self):
    return len(self.labels)

  def encode(self, text):
    return self.tokenizer.encode_plus(
        text,
        add_special_tokens=True,
        max_length=self.max_len,
        padding='max_length',
        )

  def __getitem__(self, index):
    text = self.texts[index]
    encoded = self.encode(text)
    return (
        torch.tensor(encoded['input_ids']).long(),
        torch.tensor(encoded['attention_mask']).long(),
        self.labels[index],
        text,
    )

* 最大長に2を足しているのは`[CLS]`と`[SEP]`の分（下のセルを参照）

In [None]:
dataset = MyDataset(sentences, labels, tokenizer, max_len+2)
dataset[0]

### データセットの分割
* 訓練：検証：テスト = 8:1:1とした。

In [None]:
valid_size = len(dataset) // 10
test_size = valid_size
train_size = len(dataset) - valid_size - test_size

train_dataset, valid_dataset, test_dataset = random_split(dataset, [train_size, valid_size, test_size])

print(f"訓練データ数={train_size} 検証データ数={valid_size} テストデータ数={test_size}")

## データローダ

In [None]:
BATCH_SIZE = 8
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE)
valid_dataloader = DataLoader(valid_dataset, batch_size=BATCH_SIZE)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

## 事前学習済みBERTの準備
* これをfinetuneする。

In [None]:
model = BertForSequenceClassification.from_pretrained(
    MODEL_NAME,
    num_labels=len(class_names),
    output_attentions=True,
    output_hidden_states=True,
)

* 今回は、BERT本体のrequires_gradはFalseにし、分類用の全結合層だけをtrainingすることにする。
 * 単に説明のための時間を短縮したいからで、こうしないほうが分類性能は良くなる。
 * 参考情報: 全結合層だけtrainingしてから、全体のfinetuningをすると良いという話もある。 ( https://arxiv.org/abs/2202.10054 )

In [None]:
for param in model.base_model.parameters():
  param.requires_grad = False

## finetuningの実行

* GPUの設定

In [None]:
device = torch.device('cuda')
model = model.to(device)

* すでにfinetuneしたモデルがあるときは、以下のようにpathを指定して読み込む。

In [None]:
model.load_state_dict(torch.load(MODEL_PATH))

* すでにfinetuneしたモデルを使う場合は、以下のfinetuningのコードは動かさなくてよい。

* オプティマイザの準備

In [None]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
NUM_TRAIN_EPOCHS = 20

for epoch in range(1, NUM_TRAIN_EPOCHS+1):

  model.train()
  train_losses = []
  for batch in train_dataloader:
    ids = batch[0].to(device)
    mask = batch[1].to(device)
    labels = batch[2].to(device)
    loss = model(ids, mask, labels=labels).loss
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
    train_losses.append(loss.item())
  print(f"epoch {epoch} | train loss {sum(train_losses)/len(train_losses):.4f}", end=" ")

  model.eval()
  valid_losses = []
  for batch in valid_dataloader:
    ids = batch[0].to(device)
    mask = batch[1].to(device)
    labels = batch[2].to(device)
    with torch.no_grad():
      loss = model(ids, mask, labels=labels).loss
    valid_losses.append(loss.item())
  print(f"| valid loss {sum(valid_losses)/len(valid_losses):.4f}")

* モデルの保存

In [None]:
torch.save(model.state_dict(), MODEL_PATH)

## finetuningしたモデルの評価

* 正解率を求めるヘルパ関数の定義

In [None]:
def evaluation(dataloader):
  model.eval()
  n_correct_answers = 0
  n_instances = 0
  for batch in dataloader:
    ids = batch[0].to(device)
    mask = batch[1].to(device)
    labels = batch[2].to(device)
    with torch.no_grad():
      logits = model(ids, mask).logits
    predicted_class_id = logits.argmax(-1)
    n_correct_answers += (predicted_class_id == labels).sum()
    n_instances += len(labels)
  return n_correct_answers, n_instances

In [None]:
n_correct_answers, n_instances = evaluation(valid_dataloader)
print(f"classification accuracy={n_correct_answers / n_instances:.3f}")

## Captumを使う

### パディング用トークンのインデックスを取得
 * 何の情報も持たない(＝baselineとなる)トークン列を作るために、必要となる。

In [None]:
PAD_IND = tokenizer.pad_token_id
print(PAD_IND)

* パディング用トークンをリファレンストークンとして設定する。

In [None]:
token_reference = TokenReferenceBase(reference_token_idx=PAD_IND)

* 指定されたラベルの確率を返す関数を定義しておく。
 * 後で必要になるため。

In [None]:
def bert_forward_func(ids, mask, label):
  pred_probs = torch.softmax(model(ids, mask).logits, -1)
  return pred_probs[:,label]

* 定義した関数の動作を確認する。

In [None]:
ids, mask, true_class, text = test_dataset[0]
ids = ids.unsqueeze(0).to(device)
mask = mask.unsqueeze(0).to(device)
print(bert_forward_func(ids, mask, true_class))

### 解釈手法（Integrated Gradients）の準備
* それに対するパラメータの寄与を解釈したい値を返す関数を、指定する。
* また、attributionを算出したいパラメータを、指定する。

In [None]:
lig = LayerIntegratedGradients(bert_forward_func, model.bert.embeddings.word_embeddings)

### 可視化のヘルパ関数

* https://github.com/pytorch/captum/blob/master/captum/attr/_utils/visualization.py#L755

In [None]:
def add_attributions_to_visualizer(attributions, text, pred_prob, pred_class, true_class,
                                   attr_class, convergence_scores, vis_data_records):

  attributions = attributions.sum(dim=2).squeeze(0)
  attributions = attributions / torch.norm(attributions)
  attributions = attributions.cpu().detach().numpy()

  # storing couple samples in an array for visualization purposes
  vis_data_records.append(visualization.VisualizationDataRecord(
      attributions,
      pred_prob,
      pred_class,
      true_class,
      attr_class,
      attributions.sum(),
      text,
      convergence_scores))

### テキストの分類結果を解釈するためのヘルパ関数

In [None]:
def interpret_sentence(sentence, label):
  encoded = dataset.encode(sentence)

  indexed = encoded.input_ids
  seq_length = len(indexed)
  input_indices = torch.tensor(indexed).long()
  input_indices = input_indices.unsqueeze(0).to(device)

  mask = torch.tensor(encoded.attention_mask).long()
  mask = mask.unsqueeze(0).to(device)

  text = dataset.tokenizer.convert_ids_to_tokens(indexed)

  # generate reference indices for each sample
  reference_indices = token_reference.generate_reference(seq_length, device=device)
  reference_indices = reference_indices.unsqueeze(0).to(device)
  
  pred_probs = torch.softmax(model(input_indices, mask).logits, -1).squeeze()
  pred_ind = pred_probs.argmax().item()

  # compute attributions and approximation delta using layer integrated gradients
  attributions_ig, delta = lig.attribute(
      input_indices,
      reference_indices,
      additional_forward_args=(mask, pred_ind),
      return_convergence_delta=True,
      )

  add_attributions_to_visualizer(
    attributions_ig, 
    text, 
    pred_probs[pred_ind], 
    class_names[pred_ind], 
    class_names[label],
    class_names[pred_ind],
    delta, 
    vis_data_records_ig)

In [None]:
for param in model.parameters():
  param.requires_grad = False

In [None]:
vis_data_records_ig = []

In [None]:
ids, mask, true_class, text = test_dataset[0]
interpret_sentence(text, label=true_class)

In [None]:
_ = visualization.visualize_text(vis_data_records_ig)