In [2]:
import pandas as pd
pd.options.display.max_colwidth = 30

In [3]:
import random
import numpy as np
import torch


def torch_fix_seed(seed=0):
    # Python random
    random.seed(seed)
    # Numpy
    np.random.seed(seed)
    # Pytorch
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.use_deterministic_algorithms = True


torch_fix_seed()

In [4]:
import gensim
import pandas as pd

# Word2Vecモデルのロード
# gensimで学習済みモデルを読み込む
word2vec_model = gensim.models.KeyedVectors.load_word2vec_format(
    "../data/word2vec_ja/jawiki.word_vectors.100d.txt", binary=False
)

# 重みを取得
weights = torch.FloatTensor(word2vec_model.vectors)

In [5]:
import torch.nn as nn


class SelfAttentionModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, output_dim, weights):
        super(SelfAttentionModel, self).__init__()

        # Embedding層の作成
        self.embedding = nn.Embedding(vocab_size, embedding_dim, _weight=weights)

        # Embedding層の重みをfreeze
        self.embedding.weight.requires_grad = False

        # Self-Attention層の作成
        self.self_attention = nn.MultiheadAttention(
            embed_dim=embedding_dim, num_heads=1, batch_first=True
        )

        # 線形層の作成
        self.fc = nn.Linear(embedding_dim, output_dim)

    def forward(self, x):
        embedded = self.embedding(x)

        # Self-Attentionの入力は(batch_size, seq_len, embedding_dim)である必要がある
        # embeddedは(batch_size, seq_len, embedding_dim)の形状を持っていると仮定
        attention_output, _ = self.self_attention(embedded, embedded, embedded)

        attention_output_mean = (attention_output + embedded).mean(dim=1)

        # Self-Attentionの出力を最終的な出力に変換するために、
        # seq_lenの最後のベクトルを使用して線形層に渡します。
        # attention_outputは(batch_size, seq_len, embedding_dim)の形状を持っています。
        # ここでは、シーケンスの最後のベクトルを使用します。
        output = self.fc(attention_output_mean)
        return output


# パラメータ
vocab_size = len(word2vec_model.index_to_key)  # Word2Vecの語彙サイズ
embedding_dim = 100
hidden_dim = 32

In [6]:
import math
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

import MeCab

mecab = MeCab.Tagger(
    "-O wakati -d /usr/lib/x86_64-linux-gnu/mecab/dic/mecab-ipadic-neologd"
)


# テキストをIDのシーケンスに変換する関数
def text_to_sequence(text, word2vec_model):
    return [
        word2vec_model.key_to_index.get(word, 0) for word in mecab.parse(text).split()
    ]


# データセットクラス
class TextDataset(Dataset):
    def __init__(self, texts, labels, word2vec_model, max_length):
        self.texts = [text_to_sequence(text, word2vec_model) for text in texts]
        self.labels = labels
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        # パディングの追加
        text = self.texts[idx]
        if len(text) < self.max_length:
            text += [0] * (self.max_length - len(text))
        text = text[: self.max_length]
        return torch.tensor(text, dtype=torch.long), torch.tensor(
            self.labels[idx], dtype=torch.float
        )

In [7]:
train_df = pd.read_csv("../data/train.tsv", sep="\t")
valid_df = pd.read_csv("../data/valid.tsv", sep="\t")
test_df = pd.read_csv("../data/test.tsv", sep="\t")

# 文の最大長を決定
max_length = max(
    len(text_to_sequence(text, word2vec_model))
    for text in pd.concat([train_df["poem"], valid_df["poem"], test_df["poem"]])
)

# データセットの分割
train_dataset = TextDataset(
    train_df["poem"], train_df["label"], word2vec_model, max_length
)
valid_dataset = TextDataset(
    valid_df["poem"], valid_df["label"], word2vec_model, max_length
)
test_dataset = TextDataset(
    test_df["poem"], test_df["label"], word2vec_model, max_length
)

train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=2)
test_loader = DataLoader(test_dataset, batch_size=2)

# モデル、損失関数、最適化手法の設定
model = SelfAttentionModel(vocab_size, embedding_dim, 1, weights)  # 出力は1次元
loss_function = nn.BCEWithLogitsLoss()  # 2値分類のための損失関数
optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=0.0001)


# 訓練関数
def train(model, iterator, optimizer, loss_function):
    model.train()
    total_loss = 0

    for text, label in iterator:
        optimizer.zero_grad()
        predictions = model(text).squeeze(1)
        loss = loss_function(predictions, label)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    return total_loss / len(iterator)


# 評価関数
def evaluate(model, iterator, loss_function):
    model.eval()
    total_loss = 0

    with torch.no_grad():
        for text, label in iterator:
            predictions = model(text).squeeze(1)
            loss = loss_function(predictions, label)
            total_loss += loss.item()

    return total_loss / len(iterator)


class EarlyStopping:
    def __init__(self, patience=5, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False
        self.best_model = None

    def __call__(self, val_loss, model):
        if self.best_loss is None:
            self.best_loss = val_loss
            self.best_model = model.state_dict()
        elif self.best_loss - val_loss > self.min_delta:
            self.best_loss = val_loss
            self.best_model = model.state_dict()
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True


early_stopping = EarlyStopping(patience=10, min_delta=0.001)

# 訓練ループ
EPOCHS = 100

for epoch in range(EPOCHS):
    train_loss = train(model, train_loader, optimizer, loss_function)
    valid_loss = evaluate(model, valid_loader, loss_function)
    print(f"Epoch: {epoch+1:02}")
    print(f"\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):.3f}")
    print(f"\t Val. Loss: {valid_loss:.3f} |  Val. PPL: {math.exp(valid_loss):.3f}")

    early_stopping(valid_loss, model)
    if early_stopping.early_stop:
        print("Early stopping")
        model.load_state_dict(early_stopping.best_model)
        break

  from .autonotebook import tqdm as notebook_tqdm


Epoch: 01
	Train Loss: 0.635 | Train PPL: 1.886
	 Val. Loss: 0.603 |  Val. PPL: 1.827
Epoch: 02
	Train Loss: 0.571 | Train PPL: 1.769
	 Val. Loss: 0.582 |  Val. PPL: 1.790
Epoch: 03
	Train Loss: 0.557 | Train PPL: 1.745
	 Val. Loss: 0.574 |  Val. PPL: 1.776
Epoch: 04
	Train Loss: 0.546 | Train PPL: 1.726
	 Val. Loss: 0.553 |  Val. PPL: 1.739
Epoch: 05
	Train Loss: 0.525 | Train PPL: 1.690
	 Val. Loss: 0.514 |  Val. PPL: 1.673
Epoch: 06
	Train Loss: 0.481 | Train PPL: 1.617
	 Val. Loss: 0.453 |  Val. PPL: 1.572
Epoch: 07
	Train Loss: 0.412 | Train PPL: 1.510
	 Val. Loss: 0.380 |  Val. PPL: 1.462
Epoch: 08
	Train Loss: 0.341 | Train PPL: 1.406
	 Val. Loss: 0.324 |  Val. PPL: 1.382
Epoch: 09
	Train Loss: 0.290 | Train PPL: 1.336
	 Val. Loss: 0.289 |  Val. PPL: 1.334
Epoch: 10
	Train Loss: 0.262 | Train PPL: 1.299
	 Val. Loss: 0.265 |  Val. PPL: 1.304
Epoch: 11
	Train Loss: 0.241 | Train PPL: 1.273
	 Val. Loss: 0.253 |  Val. PPL: 1.288
Epoch: 12
	Train Loss: 0.226 | Train PPL: 1.253
	 Val.

In [8]:
all_predictions = []

with torch.no_grad():
    for inputs, _ in test_loader:  # "_"はラベルやターゲットを使わない場合
        outputs = model(inputs)

        # 予測結果をリストに追加
        all_predictions.extend(outputs.cpu().numpy())

In [9]:
import numpy as np
from sklearn.metrics import accuracy_score


pred_labels = (np.stack(all_predictions).flatten() > 0).astype(int)

accuracy_score(test_df["label"], pred_labels)

0.8333333333333334

In [10]:
# 単語ロジスティック回帰で誤っていた問題を解けていたか確認

test_df["pred"] = pred_labels
test_df[test_df["poem"].map(lambda x: "眼" in x)]

Unnamed: 0,poem,label,pred
3,時は常に背後から迫り唸りを上げて眼前に流れ去る踏み止...,0,0
15,ああおれたちは皆眼をあけたまま空を飛ぶ夢を見てるんだ,0,1
