ここでは、torchtextを使う代わりにHugging Faceが提供しているライブラリを用いて、感情分析タスクを行います。現在、torchtextのFieldクラスは非推奨となっており、最新バージョンでは使えないそうです。また、torchtext自体もこれ以上アップグレードされないそうですので、別のライブラリを使う必要があるかと存じます。なお、359ページ以降のみを実装していきます。

## 設定
Google ColabとGoogle Driveを用いて実装していきます。

In [None]:
# ディレクトリの設定
from google.colab import drive
drive.mount('/content/drive/')

root_path = '/content/drive/My Drive/your/dir/path/7_nlp_sentiment_transformer'
%cd $root_path

In [None]:
# ライブラリのインストール
!pip install gensim          # fastTextを読み込むために使います
!pip install datasets        # torchtextの代わりに使います
# !pip install transformers  # 必要に応じてインストールしてください。Colabだとプレインストールされています

## 分散表現の準備
ここでは分散表現をダウンロードします。単語ベクトルの扱いにはgensimライブラリを用います。

In [None]:
# 分散表現のダウンロード
import gensim
import torch
import torch.nn as nn

# file_path
vec_file = root_path + '/data/wiki-news-300d-1M.vec'

# gensimを使用して.vecファイルから単語ベクトルを読み込む
model = gensim.models.KeyedVectors.load_word2vec_format(vec_file)

# 単語ベクトルをPyTorchのテンソルに変換
fasttext_model = torch.FloatTensor(model.vectors)

In [None]:
# 分散表現の確認
print('1単語を表現する次元数: ', fasttext_model.shape[1])
print("単語数: ", fasttext_model.shape[0])

print(fasttext_model[model.key_to_index['king']])

print(fasttext_model.shape)

print(model.key_to_index)

## 前処理等
IMDb_train.tsv、IMDb_test.tsvをColab上で作成するとうまくいきませんでした。ローカルで作成したうえでDriveに保存することをお勧めします。

また、書籍通りに.tsvファイルを作成するとうまくいきませんでした。具体的には、余計な列が含まれてしまうので、それらを削除する必要がありました。その場合はpandasライブラリを用いることで簡単に削除できます。

以下の順番で実装していきます。

-前処理

-トークン化

-DataLoaderの作成

-ボキャブラリの作成

In [None]:
# 前処理、トークン化、DataLoaderの作成
# AutoTokenizerを用いています
# （たぶんもっと上手い実装の仕方があると思います。。。）

from datasets import load_dataset, DatasetDict
from transformers import AutoTokenizer, DataCollatorWithPadding
from torch.utils.data import DataLoader
import re

# 前処理とトークナイズを組み合わせた関数
def preprocess_and_tokenize(examples):

    # テキストの前処理
    examples['text'] = [re.sub('', '', t) for t in examples['text']]
    examples['text'] = [re.sub(r'[^.,a-zA-Z0-9\s]', ' ', t) for t in examples['text']]
    examples['text'] = [t.replace('.', ' . ').replace(',', ' , ') for t in examples['text']]

    # トークナイズとラベルの追加
    tokenized_inputs = tokenizer(examples['text'], padding='max_length', truncation=True, max_length=256)

    # 'labels' キーを追加
    tokenized_inputs['labels'] = examples['label']

    # 'token_type_ids' が不要な場合は削除
    if 'token_type_ids' in tokenized_inputs:
        del tokenized_inputs['token_type_ids']
    return tokenized_inputs

# 事前学習済みモデルに対応する AutoTokenizer のロード
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')

# データセットの読み込み
tr_path = root_path + '/data/IMDb_train.tsv'   # Colab上で.tsvファイルを作成するとデータが欠損しました
ts_path = root_path + '/data/IMDb_test.tsv'

# データセットの読み込みと前処理
tr_val_ds = load_dataset('csv', data_files=tr_path, delimiter='\t', split='train', column_names=['text', 'label'])
ts_ds = load_dataset('csv', data_files=ts_path, delimiter='\t', split='train', column_names=['text', 'label'])

# 前処理とトークナイズの適用
tr_val_ds = tr_val_ds.map(preprocess_and_tokenize, batched=True)
ts_ds = ts_ds.map(preprocess_and_tokenize, batched=True)

# IDリストを作成
id_to_word = {v: k for k, v in tokenizer.vocab.items()}

# テキストとラベルのデータを保存
Text = tr_val_ds['text']
Label = tr_val_ds['label']
Text_ts = ts_ds['text']
Label_ts = ts_ds['label']

# 不要な列の削除
tr_val_ds = tr_val_ds.remove_columns(['text', 'label'])
ts_ds = ts_ds.remove_columns(['text', 'label'])

# DatasetDictを作成して分割
dataset_dict = DatasetDict({
    'train': tr_val_ds
})
train_test_split = dataset_dict['train'].train_test_split(test_size=0.2)

# 分割されたデータセットを取得
tr_ds = train_test_split['train']
val_ds = train_test_split['test']

# DataCollatorWithPadding のインスタンスを作成
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

tr_dl = DataLoader(tr_ds, shuffle=True, batch_size=8, collate_fn=data_collator)
val_dl = DataLoader(val_ds, batch_size=8, collate_fn=data_collator)
ts_dl = DataLoader(ts_ds, batch_size=8, collate_fn=data_collator)


In [None]:
# 動作確認
for batch in ts_dl:
    input_ids = batch['input_ids']
    attention_mask = batch['attention_mask']
    labels = batch['labels']
    print("Input IDs:", input_ids)
    print("Attention Mask:", attention_mask)
    print("Labels:", labels)
    break

In [None]:
# ボキャブラリの作成
from collections import Counter
from gensim.corpora import Dictionary

word_counts = Counter()

for example in tr_ds:
  words = [id_to_word[token_id] for token_id in example['input_ids']]
  word_counts.update(words)

gensim_dictionary = Dictionary()

for word in word_counts:
  gensim_dictionary.add_documents([[word]])

word_id = gensim_dictionary.token2id

## Neural Networkの設計
ここでは、データの取り出し方を使用しているライブラリに合わせて変更しています。また、現行のバージョンのPythonに合わせてコードを一部書き換えています。

In [None]:
import math
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [None]:
class Embedder(nn.Module):
  def __init__(self, text_embedding_vectors):
    super().__init__()
    self.embeddings = nn.Embedding.from_pretrained(embeddings=fasttext_model, freeze=True)

  def forward(self, x):
    x_vec = self.embeddings(x)
    return x_vec

In [None]:
# 動作確認
batch = next(iter(tr_dl))

net1 = Embedder(fasttext_model)
x = batch['input_ids']
x1 = net1(x)

print('size of input tensor: ', x.shape)
print('size of output tensor: ', x1.shape)

In [None]:
class PositionalEncoder(nn.Module):
  def __init__(self, d_model=300, max_seq_len=256):
    super().__init__()
    self.d_model = d_model
    pe = torch.zeros(max_seq_len, d_model)

    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    pe = pe.to(device)

    for pos in range(max_seq_len):
      for i in range(0, d_model, 2):
        pe[pos, i] = math.sin(pos / (10000 ** ((2 * i) / d_model)))
        pe[pos, i + 1] = math.cos(pos / (10000 ** ((2 * i) / d_model)))

    self.pe = pe.unsqueeze(0)
    self.pe.requires_grad = False

  def forward(self, x):
    ret = math.sqrt(self.d_model) * x + self.pe
    return ret

In [None]:
# 動作確認
net1 = Embedder(fasttext_model)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)

x = batch['input_ids']
x1 = net1(x)
x2 = net2(x1)

print("size of input tensor: ", x1.shape)
print(f"size of output tensor: {x2.shape}")

In [None]:
class Attention(nn.Module):
  def __init__(self, d_model=300):
    super().__init__()
    self.q_linear = nn.Linear(d_model, d_model)
    self.v_linear = nn.Linear(d_model, d_model)
    self.k_linear = nn.Linear(d_model, d_model)

    self.out = nn.Linear(d_model, d_model)
    self.d_k = d_model   # Attentionの大きさ調整の変数

  def forward(self, q, k, v, mask):
    k = self.k_linear(k)
    q = self.q_linear(q)
    v = self.v_linear(v)

    weights = torch.matmul(q, k.transpose(1, 2) / math.sqrt(self.d_k))

    mask = mask.unsqueeze(1)
    weights = weights.masked_fill(mask == 0, -1e9)

    normalized_weights = F.softmax(weights, dim=-1)
    output = torch.matmul(normalized_weights, v)
    output = self.out(output)

    return output, normalized_weights

In [None]:
class FeedForward(nn.Module):
  def __init__(self, d_model, d_ff=1024, dropout=0.1):
    super().__init__()
    self.linear_1 = nn.Linear(d_model, d_ff)
    self.dropout = nn.Dropout(dropout)
    self.linear_2 = nn.Linear(d_ff, d_model)

  def forward(self, x):
    x = self.linear_1(x)
    x = self.dropout(x)
    x = self.linear_2(x)

    return x

In [None]:
class TransformerBlock(nn.Module):
  def __init__(self, d_model, dropout=0.1):
    super().__init__()
    self.norm_1 = nn.LayerNorm(d_model)
    self.norm_2 = nn.LayerNorm(d_model)
    self.attn   = Attention(d_model)
    self.ff     = FeedForward(d_model)
    self.dropout_1 = nn.Dropout(dropout)
    self.dropout_2 = nn.Dropout(dropout)

  def forward(self, x, mask):
    x_normalized = self.norm_1(x)
    output, normalized_weights = self.attn(x_normalized, x_normalized, x_normalized, mask)

    x2 = x + self.dropout_1(output)
    x_normalized_2 = self.norm_2(x2)
    output = x2 + self.dropout_2(self.ff(x_normalized_2))

    return output, normalized_weights

In [None]:
# 動作確認
net1 = Embedder(fasttext_model)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)
net3 = TransformerBlock(d_model=300)

x = batch['input_ids']
input_pad = 91                 # [PAD]のIDに注意
input_mask = (x != input_pad)
print(input_mask)

x1 = net1(x)
x2 = net2(x1)
x3, normalized_weights = net3(x2, input_mask)

print(f"size of input tensor: {x1.shape}")
print(f"size of output tensor: {x3.shape}")
print(f"size of Attention; {normalized_weights.shape}")

In [None]:
class Classification(nn.Module):
  def __init__(self, d_model, output_dim=2):
    super().__init__()
    self.linear = nn.Linear(d_model, output_dim)
    nn.init.normal_(self.linear.weight, std=0.02)
    nn.init.normal_(self.linear.bias, 0)

  def forward(self, x):
    x0 = x[:, 0, :]
    out = self.linear(x0)

    return out

In [None]:
class TransformerClassification(nn.Module):
  def __init__(self, text_embedding_vectors, d_model=300, max_seq_len=256, output_dim=2):
    super().__init__()

    self.net1 = Embedder(fasttext_model)
    self.net2 = PositionalEncoder(d_model=d_model, max_seq_len=max_seq_len)
    self.net3_1 = TransformerBlock(d_model=d_model)
    self.net3_2 = TransformerBlock(d_model=d_model)
    self.net4 = Classification(output_dim=output_dim, d_model=d_model)

  def forward(self, x, mask):
    x1 = self.net1(x)
    x2 = self.net2(x1)
    x3_1, normalized_weights_1 = self.net3_1(x2, mask)
    x3_2, normalized_weights_2 = self.net3_2(x3_1, mask)
    x4 = self.net4(x3_2)

    return x4, normalized_weights_1, normalized_weights_2



In [None]:
# 動作確認

batch = next(iter(tr_dl))

net = TransformerClassification(
    text_embedding_vectors=fasttext_model,
    d_model=300,
    max_seq_len=256,
    output_dim=2
)

x = batch['input_ids']
input_pad = 91
input_mask = (x != input_pad)
out, normalized_attention_weights_1, normalized_attention_weights_2 = net(x, input_mask)

print(f"size of output tensor: {out.shape}")
print(f"output of sigmoid function: {F.softmax(out)}")



## Transformerモデルの学習と評価
ここも、上と同じくデータの取り出し方を変更しています。

In [None]:
# Networkの初期化

# 学習・検証データの準備
dl_dict = {'train': tr_dl, 'val': val_dl}

# model構築
net = TransformerClassification(
    text_embedding_vectors=fasttext_model,
    d_model=300,
    max_seq_len=256,
    output_dim=2
)

# initialization
def weights_init(m):
  classname = m.__class__.__name__
  if classname.find('Linear') != -1:
    nn.init.kaiming_normal_(m.weight)
    if m.bias is not None:
      nn.init.constant_(m.bias, 0.0)

net.train()

net.net3_1.apply(weights_init)
net.net3_2.apply(weights_init)

print('Network initialized !!')


In [None]:
# ハイパーパラメータの設定
criterion = nn.CrossEntropyLoss()
LR = 2e-5
optimizer = optim.Adam(net.parameters(), lr=LR)

In [None]:
from tqdm import tqdm

def train_model(net, dl_dict, criterion, optimizer, num_epochs):
  device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
  compute_device = "GPU" if device.type == "cuda" else "CPU"
  print(f"This is {compute_device} trainer !!")
  print("-"*20, "Start", "-"*20)

  net.to(device)

  # network acceleration
  torch.backends.cudnn.benchmark = True

  for epoch in range(num_epochs):
    for phase in ['train', 'val']:
      if phase == 'train':
        net.train()
      else:
        net.eval()

      epoch_loss = 0.0
      epoch_corrects = 0

      for batch in tqdm(dl_dict[phase]):
        inputs = batch['input_ids'].to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()

        with torch.set_grad_enabled(phase == 'train'):
          # mask
          input_pad = 91
          input_mask = (inputs != input_pad)

          # to transformzer
          outputs, _, _ = net(inputs, input_mask)
          loss = criterion(outputs, labels)
          _, preds = torch.max(outputs, 1)

          if phase == 'train':
            loss.backward()
            optimizer.step()

          epoch_loss += loss.item() * inputs.size(0)
          epoch_corrects += torch.sum(preds == labels.data)

      epoch_loss = epoch_loss / len(dl_dict[phase].dataset)
      epoch_acc  = epoch_corrects.double() / len(dl_dict[phase].dataset)

      print('Epoch {}/{} | {:^5} | Loss: {:.4f} ACC: {:.4f}'.format(
          epoch + 1,
          num_epochs,
          phase,
          epoch_loss,
          epoch_acc
      ))

  return net



In [None]:
# 学習
num_epochs = 2
net_trained = train_model(net, dl_dict, criterion, optimizer, num_epochs=num_epochs)

In [None]:
#評価
device = torch.device("cuda0" if torch.cuda.is_available() else "cpu")
net_trained.eval()
net_trained.to(device)

epoch_corrects = 0
for batch in ts_dl:
  inputs = batch['input_ids'].to(device)
  labels = batch['labels'].to(device)

  with torch.set_grad_enabled(False):
    input_pad = 91
    input_mask = (inputs != input_pad)

    outputs, _, _ = net_trained(inputs, input_mask)
    _, preds = torch.max(outputs, 1)
    epoch_corrects += torch.sum(preds == labels.data)

epoch_acc = epoch_corrects.double() / len(ts_ds)
print('ACC with {} test data: {:.4f}'.format(len(ts_dl.dataset), epoch_acc))

In [None]:
epoch_acc = epoch_corrects.double() / len(ts_ds)
print('ACC with {} test data: {:.4f}'.format(len(ts_dl.dataset), epoch_acc))

## Attentionの可視化
ここもデータの取り出し方を変更しています。

In [None]:
# HTMLを作成する関数を実装

def highlight(word, attn):
    "Attentionの値が大きいと文字の背景が濃い赤になるhtmlを出力させる関数"

    html_color = '#%02X%02X%02X' % (
        255, int(255*(1 - attn)), int(255*(1 - attn)))
    return '<span style="background-color: {}"> {}</span>'.format(html_color, word)


def mk_html(index, ts_ds, preds, normlized_weights_1, normlized_weights_2):
    "HTMLデータを作成する"

    # indexの結果を抽出
    example  = ts_ds[index]  # データセットからサンプルを取得
    sentence = tokenizer.convert_ids_to_tokens(example['input_ids'])  # 文章
    label    = example['labels']  # ラベル
    pred     = preds[index]  # 予測

    # indexのAttentionを抽出と規格化
    attens1 = normlized_weights_1[index, 0, :]  # 0番目の<cls>のAttention
    attens1 /= attens1.max()

    attens2 = normlized_weights_2[index, 0, :]  # 0番目の<cls>のAttention
    attens2 /= attens2.max()

    # ラベルと予測結果を文字に置き換え
    if label == 0:
        label_str = "Negative"
    else:
        label_str = "Positive"

    if pred == 0:
        pred_str = "Negative"
    else:
        pred_str = "Positive"

    # 表示用のHTMLを作成する
    html = '正解ラベル：{}<br>推論ラベル：{}<br><br>'.format(label_str, pred_str)

    # 1段目のAttention
    html += '[TransformerBlockの1段目のAttentionを可視化]<br>'
    for word_id, attn in zip(example['input_ids'], attens1):
        word = tokenizer.convert_ids_to_tokens([word_id])[0]
        html += highlight(word, attn)
    html += "<br><br>"

    # 2段目のAttention
    html += '[TransformerBlockの1段目のAttentionを可視化]<br>'
    for word_id, attn in zip(example['input_ids'], attens2):
        word = tokenizer.convert_ids_to_tokens([word_id])[0]
        html += highlight(word, attn)
    html += "<br><br>"

    return html

In [None]:
from IPython.display import HTML

# Transformerで処理

# ミニバッチの用意
batch = next(iter(ts_dl))

# GPUが使えるならGPUにデータを送る
inputs = batch['input_ids'].to(device)  # 文章
labels = batch['labels'].to(device)  # ラベル

# mask作成
input_pad = 91  # 単語のIDにおいて、'<pad>': 1 なので
input_mask = (inputs != input_pad)

# Transformerに入力
outputs, normlized_weights_1, normlized_weights_2 = net_trained(
    inputs, input_mask)
_, preds = torch.max(outputs, 1)  # ラベルを予測


index = 7  # 出力させたいデータ
html_output = mk_html(index, ts_ds, preds, normlized_weights_1, normlized_weights_2)  # HTML作成
HTML(html_output)  # HTML形式で出力

以上