# プロダクト開発試験

## テーマ設定

プロダクト開発試験として、課題番号**004**の**文章をカテゴリー分類するモデルの作成**に取り組みました。

データセットとして**ライブドアニュースコーパス**を使用し、単語分散表現・GRUとCNN、Attentionを組み合わせてより良いモデルを作成しました。

## Abstract

## 環境構築

### ライブラリのインストール
必要なライブラリをインストールし、実行環境のバージョンを統一します。

In [None]:
import sys

# Google colab環境であるか判定
if "google.colab" in sys.modules:
    # ライブラリのインストール
    %pip install --no-warn-conflicts torch==2.1.1 torchvision==0.16.1 nltk==3.8.1 janome==0.5.0 numpy
else:
    print("Not Google Colab")

### ドライブのマウント

In [None]:
# Google colab環境であるか判定
if "google.colab" in sys.modules:
    # マウントを行う
    from google.colab import drive

    drive.mount("/content/drive")
else:
    print("Not Google Colab")

### ライブラリのインポート

In [2]:
import copy
import io
import os
import math
import re
import tarfile
import time
import urllib.request
from typing import Optional

import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import pickle
from tqdm.auto import tqdm

from janome.tokenizer import Tokenizer
from nltk import tokenize
from sklearn.model_selection import train_test_split

import seaborn as sns
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

## データ収集

### データセットの準備
コーディング試験Chapter11-2で使用したLivedoorニュースコーパスをダウンロードして使用します。
インターネット上に公開されているデータセットを以下のコードでダウンロードします。

In [None]:
PRJ_ROOT = "/content/drive/MyDrive/product_assignment/"
TARGZ_PATH = PRJ_ROOT + "ldcc-20140209.tar.gz"

with urllib.request.urlopen("https://www.rondhuit.com/download/ldcc-20140209.tar.gz") as res:
    with open(TARGZ_PATH, "wb") as f:
        f.write(res.read())

ダウンロードしたファイルは圧縮されているので、作業フォルダに展開します。

In [None]:
tar = tarfile.open(TARGZ_PATH)
tar.extractall(PRJ_ROOT)
tar.close()

### データセットの作成
カテゴリをラベル、ファイル内の文章をデータとしてそれらが対になったデータをCSV形式にして保存します。

In [None]:
from tqdm.auto import tqdm # 進捗バーを表示

# 展開したテキスト群が置かれている親ディレクトリ
DATA_DIR = PRJ_ROOT + 'text/'

# カテゴリ名（サブディレクトリ）のリストを取得
# 不要なファイル（例：LICENSE.txt）は除外
categories = [d for d in os.listdir(DATA_DIR) if os.path.isdir(os.path.join(DATA_DIR, d))]
print("対象カテゴリ：", categories)

# 最終的にDataFrameにするための、行データ（辞書）を格納するためのリスト
all_data = []

# tqdmを使って進捗を表示しながらカテゴリごとにループ
for category in tqdm(categories, desc="カテゴリ処理中"):
    category_path = os.path.join(data_dir, category)

    files = os.listdir(category_path)
    for file_name in files:
        # category内のREADME.mdはスキップ
        if file_name.endwith(".txt"):
            file_path = os.path.join(category_path, file_name)

            try: 
                with open(file_path, 'r', encoding='utf-8') as f:
                    # 最初の2行はURLとタイムスタンプなので読み飛ばし、3行目以降を本文として取得
                    lines = f.readlines()
                    text_body = "".join(lines[2:]).strip()

                    # ラベル（カテゴリ名）とテキスト本文を辞書としてリストに追加
                    all_data.append( {'label': category, 'text': text_body} )
            except Exception as e:
                print(f"Error readin {file_path}: {e}")

# ループ完了後、リストから一気にDataFrameを作成
df = pd.DataFrame(all_data)

# 作成したDataFrameをCSVとして保存（インデックスは不要なのでindexにはFalseを設定）
df.to_csv(DATA_DIR + "livedoor_news_corpus.csv", index=False, encoding="utf-8-sig")

print("\nCSVファイルの作成が完了しました。")
print("データ件数：", len(df))
print("Head：\n", df.head())

### 言語データの前処理

日本語を形態素解析して単語表層形に分かち書きします。

そのうえで単語をIDに変換します。`"CUDA out of memory"` の回避のために文章が512文字を超えた場合には切り詰めを行いました。

#### 日本語の分かち書きメソッド

In [None]:
wakati = Tokenizer()

""" 日本語のトークン化 """
def tokenize_ja(sentences_list):
    wakati_list = []
    print("トークン処理を開始します。")
    for sentence in tqdm(sentences_list):
        # tokenizeから返される表層形を分かち書きリストに登録
        wakati_list.append([item.surface for item in wakati.tokenize(sentence)])
    return wakati_list

#### 単語からIDへの辞書を生成

In [None]:
""" 単語からIDへの辞書を作成 """
def create_word_id_dict(sentences):
    word_to_id = {}  # 単語からIDへの変換辞書
    id_to_word = {}  # IDから単語への逆引き辞書
    # 0はパディング／未知語用に予約
    word_to_id['<PAD>/<UNK>'] = 0
    id_to_word[0] = '<PAD>/<UNK>'

    # すべての文章をループ  
    for sentence in sentences:
        # 文章内の各単語をループ
        for word in sentence:
            # もし単語がまだ辞書に登録されていなければ、新しいIDを割り振る、
            if word not in word_to_id:
                # 新しいIDとして、現在の辞書のサイズ（登録済みの単語数）を使用する
                tmp_id = len(word_to_id)
                word_to_id[word] = tmp_id
                id_to_word[tmp_id] = word

    # (単語をキー、IDをバリューとする辞書, IDをキー、単語をバリューとする辞書)のタプルを返す
    return word_to_id, id_to_word

#### 文章をID列に変換

In [None]:
""" 単語で構成された文章のリストを対応するIDのリストに変換 """
def convert_sentences_to_ids(sentences, word_to_id):
    sentence_id_list = []
    for sentence in sentences:
        # dict.get(key, default)メソッドによって、未知語でもエラーにならずにデフォルトである<UNK>のIDを返す
        sentence_ids = [word_to_id.get(word, 0) for word in sentence]
        sentence_id_list.append( sentence_ids )

    # IDに変換された文章のリストを返す 
    return sentence_ids

#### 文章のパディング処理

In [None]:
""" IDに変換され文章のリスト"に対して、paddingと打ち切り処理を行う """
def padding_and_truncate_sentence(sentences, max_len=512):
    # 処理が行われた後の文章IDを格納するリスト
    processed_sentences = []

    for sentence in sentences:
        # １．打ち切り
        # 文章の長さが max_lenを超える場合、末尾から max_len分だけを取得します。
        # 文章の末尾に重重要な情報含まれる場合が多いため、前から切り捨てます
        # 改善モデルにおけるMultiheadAttention層が内部で行う計算で、
        # 巨大な行列を作成しようとしてGPUのメモリが足りなくなる問題への対応として実施しました。
        sentence = sentence[-max_len:]

        # ２．padding
        # 文章の長さがmax_lenに満たない場合、差分を計算
        padding_size = max_len - len(sentence)

        # 足りない分だけ <PAD>のIDのリストを作成し、文章の前方に連結
        padding = [0] * padding_size
        processed_sentences.append(padding + sentence)

    return processed_sentences

#### 前処理の実行

In [None]:
# 生のCSVの読み込み
df = pd.read_csv(PRJ_ROOT + "livedoor_news_corpus.csv")

# 文章カテゴリ（ラベル）をID化
label_to_id = {label: i for i, label in enumerate(df['label'].unique())}
id_to_label = {i: label for i, label in enumerate(df['label'].unique())}

# テキストの分かち書き
ja_sentences = tokenize_ja(df["text"].tolist())

# 単語辞書の作成
word_to_id, id_to_word = create_word_id_dict(ja_sentences)

# 文章をID列に変換
sentence_ids = convert_sentences_to_ids(ja_sentences, word_to_id)

# padding処理
padded_ids = padding_and_truncate_sentence(sentence_ids)

# データをまとめた辞書を作成
processed_data = {
    'padded_ids': padded_ids,
    'labels': df['label_id'].tolist(),
    'word_to_id': word_to_id,
    'id_to_word': id_to_word,
    'label_to_id': label_to_id,
    'id_to_label': id_to_label,
}

# 学習用データはpickle形式にして保存する
with open(PKL_FILE_PATH + 'processed_data_maxlen512.pkl', 'wb') as f:
    pickle.dump(processed_data, f)

print(f"保存ファイル: processed_data_maxlen512.pkl")
print(f"語彙数: {len(word_to_id)}")

## 前処理データの読み込みとデータセットの分割

今回はニュースカテゴリの分類という分類問題を扱います。分類問題に対しては、各カテゴリのデータ比率を保ったまま分割する層化サンプリング（Stratified Sampling） を行うのが最適な方法です。

もし単純なランダムサンプリングの場合、特定のカテゴリのデータがテストデータにほとんど含まれない、といった事故が生じる可能性があります。

層化サンプリングではデータが少ないカテゴリも訓練・検証・テストの各データセットに**均等**に分配されます。そのため、そのような事故を回避でき、信頼性の高いモデル評価ができるようになります。

### 前処理データの読み込み

In [None]:
# pklファイルから前処理済みデータを読み込む
PKL_FILE_PATH = '/content/drive/MyDrive/deeplearning/processed_data_maxlen512.pkl'
with open(PKL_FILE_PATH, 'rb') as f: # 'rb'：バイナリ読み込みモード
    data = pickle.load(f)

# dataは辞書型オブジェクトとして保存
padded_ids = data['padded_ids']
labels = data['labels']

### データセットの分割

In [None]:
# データを訓練・検証・テスト用に分割
# ※この時点ではまだPythonのリスト型として扱う

# 1. 全体を「訓練＋検証用」（90%）と「テスト用」（10%）に分割
# test_size=0.1: 全体の10％をテストデータとする
# random_state=42: 乱数を固定し、何度実行しても同じ分割結果とする（再現性の確保）
# stratify=labels: 元のデータのラベル比率を保ったまま分割する
train_val_ids, test_ids, train_val_labels, test_labels = train_test_split(
    padded_ids, labels, test_size=0.1, random_state=42, stratify=labels
)

# 2. 次に「訓練＋検証用」のデータを「訓訓用（80%）」と「検証用（10％）」に分割
# train_valデータは全体の90%なので、そのうちの1/9を検証用にすると、全体から見て10%分相当となる
# 0.9 * 1/9 = 0.1
train_ids, val_ids, train_labels, val_labels = train_test_split(
    train_val_ids, train_val_labels, test_size=(1/9), random_state=42, stratify=train_val_labels
)

# PyTorchのDatasetクラスの定義
# Datasetクラス：データとそのラベルと保持し、インデックスを指定して１つずつ取り出すための器
class LivedoorTensorDataset(Dataset):
    def __init__(self, ids, labels):
        # コンストではｈクタ、IDのリストとラベルのリストをインスタンス変数として保持
        self.ids = ids
        self.labels = labels
    
    def __len__(self):
        # データセット全体のサンプル数を取得
        return len(self.labels)
    
    def __getitem__(self, index):
        # 指定されたindexのデータを取得。DataLoaderから内部的に呼び出される
        # ここでPythonのリスト型からPyTorchが扱えるTensor型へと変換している
        # dtype=torch.long：整数ID（単語IDやラベルIDを扱う際の標準的なデータ型）
        input_ids = torch.tensor(self.ids[index], dtype=torch.long)
        label_id  = torch.tensor(self.labels[index], dtype=torch.long)
        return input_ids, label_id

# 各データ分割に対応するDatasetインスタンスの作成
# 上記で定義したDatasetクラスを使い、訓練・検証・テストそれぞれのデータセットオブジェクトを作成
train_dataset = LivedoorTensorDataset(train_ids, train_labels)
val_dataset   = LivedoorTensorDataset(val_ids,   val_labels)
test_dataset  = LivedoorTensorDataset(test_ids,  test_labels)

# DataLoaderの作成
# DataLoaderはDatasetをラップし、ミニバッチ処理、データのシャッフル、並列読み込みなどを効率的に行う
BATCH_SIZE = 16 # 一度にモデルを投入するデータ数（バッチサイズ）

# 訓練用DataLoader：データをシャッフルすることで、学習の偏りを防ぎ、モデルの汎用性を向上させる
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
# 検証用とテスト用DataLoader：評価時はデータの順序を維持するため、シャッフルは不要
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

# モデルのハイパーパラメータ
# データから自動的に設定
vocab_size = len(data['word_to_id']) # 語彙数。単語とIDの対応辞書のサイズから取得
num_classes = len(set(labels)) # 分類クラス数。ラベルの種類数をset()で重複を除いてカウント

# 人間が設定
embedding_dim = 256 # 単語をベクトル表現に変換した際のベクトルの次元数
hidden_dim = 256 # CNNやGRUの中間層（隠れ層）の次元数。モデルの表現力に影響

# 設定したパラメータを画面に出力して確認
print(f"語彙数 (vocab_size): {vocab_size}")
print(f"分類クラス数 (num_classes): {num_classes}")
print(f"単語ベクトルの次元数 (embedding_dim): {embedding_dim}")
print(f"隠れ層の次元数 (hidden_dim): {hidden_dim}")

## アルゴリズム選択（ベースラインモデル）

### ベースラインモデル設計

　最初にCNNとGRUを1つのディープラーニングモデルの中に層（レイヤー）として組み込み、それぞれの長所を活かす**ハイブリッド**な構造を採用しました。

今回はLSTMではなく**GRU**を採用しました。

GRUはLSTMと比べてゲートの数が少なく構造がシンプルなため、**計算コストが低く学習が速い**傾向にあります。

それでいて、多くのタスクで**LSTMと同等**の性能を発揮することが知られています。

今回の課題では、**計算効率**と**実装の容易さ**を考慮し、RNN系手法としてGRUを採用しました。

CNNは入力された単語ベクトルの並びに対して、**局所的な特徴（n-gramのような短い単語の組み合わせ）**を抽出する役割を果たします。

例えば、「とても面白い」や「つまらない」といったキーフレーズを効率的に見つけ出す役割を担います。

CNNとGRUの組み合わせを選んだ理由としてこれらの手法の以下の特徴に着目しました：

* CNNの長所: 文中の重要なキーワードやフレーズ（局所的な特徴）を効率的に捉えることができる

* GRUの長所: RNN系列の手法として、単語の系列（シーケンス）の文脈や順序関係を効果的に捉えることができる

この2つを組み合わせることで、**「文章中の重要な部分（CNNが担当）が、どのような文脈で登場したか（GRUが担当）」**を同時に学習できるモデルを作ることができると考えました。

構築したニューラルネットワークは以下のような**4つの中間層**からなる構成となっています：

1. **Embedding層**: 単語をベクトルに変換する層
2. **CNN層** (1次元CNN): 特徴を抽出する層
3. **GRU層**: 系列情報を処理する再帰的な層
4. **全結合層**: 最終的な分類を行う層

### ベースラインモデル作成

In [None]:
""" ベースラインモデル """
class CNN_GRU_Model(nn.Module):
    """
    CNNで系列データから局所的な特徴を抽出します。
    GRUでその特徴の時間的な依存関係を学習します。
    """
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_classes):
        """
        コンストラクタ

        Args:
            vocab_size (int): 語彙数。入力される単語の種類の数。
            embedding_dim (int): 単語埋め込みベクトルの次元数。
            hidden_dim (int): CNNの出力チャネル数。GRUの隠れ状態の次元数。
            num_classes (int): 出力クラス数。分類したいカテゴリの数。
        """

        # 1. Embedding層
        # 単語IDの系列を単語埋め込みに変換
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)

        # 2. 1次元CNN層（Convolutional Neural Network）
        # 埋め込みベクトルの系列から、n-gramのような局所的な特徴を抽出
        # in_channels: 入力チャネル数（埋め込み次元数）
        # out_channels: 出力チャネル数（抽出される特徴マップの数 = hidden_dim）
        # kernel_size=3: ３つの連続する単語ベクトルを一度に見る（3-gramに相当）
        # padding=1: 畳み込み後も系列長を変化させないためにpaddingを追加
        self.cnn = nn.Conv1d(in_channels=embedding_dim, out_channels=hidden_dim, kernel_size=3, padding=1)
        self.relu = nn.ReLU() # 活性化関数

        # 3. GRU（Gated Recurrent Unit）
        # CNNが抽出した特徴系列を入力とし、時間的な依存関係を学習
        # input_size: 各タイムステップにおける入力特徴の次元数（＝CNNの出力チャネル数）
        # hidden_size: GRUの隠れ状態ベクトルの次元数
        # batch_size=True: 入力テンソルの形状を（batch_size, seq_len, input_size）として扱う
        self.gru = nn.GRU(input_size=hidden_dim, hidden_size=hidden_dim, batch_size=True)

        # 4. 全結合層（Fully Connected Layer）
        # GRUから得られた最終的な文脈ベクトルを、指定されたクラス数にマッピングし、分類スコアを出力
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """
        順伝播

        Args: 
            x (torch.Tensor): 入力データ。単語IDの系列
                            　形状：（batch_size, seq_len）
        Returns:
            torch.Tensor: 各クラスに対する分類スコア
                        　形状：（batch_size, num_clases）
        """
        # 1. Embedding
        # x の形状：（batch_size, seq_len）
        x = self.embedding(x)   # -> (batch_size, seq_len, embedding_len)

        # 2. CNN
        # Conv1dは 入力として(batch, channels, seq_len) の形状を受け付けるため、埋め込みベクトルの次元を入れ替える
        # (batch_size, seq_len, embedding_dim) -> (batch_size, embedding_dim, seq_len)
        #       0         1            2                0            2           1 
        x = x.permute(0, 2, 1)

        # 畳み込み層とのの活性化関数を適用
        x = self.cnn(x)         # -> (batch_size, hidden_dim, seq_len)
        x = self.relu(x)

        # 3. GRU
        # GRUの入力（batch_first=True）は（batch, seq, feature）の形状を受け付けるため、再度次元を入れ替える
        # (batch_size, hidden_size, seq_len) -> (batch_size, seq_len, hidden_dim)
        #       0         1            2             0          2          1 
        x = x.permute(0, 2, 1)

        # GRUは全てのタイムステップの出力（output）と最期のタイムステップの隠れ状態（h_n）を返す
        # ここでは文脈全体を返す最後の隠れ状態のみ使用
        _, h_n = self.gru(x) # h_nの形状：(num_layers, batch_size, hidden_dim) 

        # 4. 全結合層
        # GRuの最後の隠れ状態の次元（num_layer）を解除し、（batch_size, hidden_dim）の形状に整形
        x = h_n.squeeze(0) # -> (batch_size, hidden_dim)

        # 全結合層に入力し、最終的なクラススコアを得ます
        out = self.fc(x) # -> (batch_size, num_classes)

        return out

### ベースラインモデルの学習

In [None]:
# 実行環境（GPU／CPU）の確認と設定
# GPUが利用可能か確認し、利用可能なら "cuda"、そうでなければ "cpu" をdeviceに設定
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# CNN_GRU_Modelのインスタンスを生成し、.to(device)で指定したデバイスにモデルを転送
# モデルのパラメータ（重み）や計算が、指定したデバイスで実施されるようにする
model = CNN_GRU_Model(vocab_size, embedding_dim, hidden_dim, num_classes)
model.to(device)

# 最適化手法と損失関数の定義
# 最適化アルゴリズムとしてAdamを使用
# model.parameters() を渡すことで、モデルが持つすべてのパラメータを最適化の対象として登録
# lrは学習率（learning rate）で、パラメータ更新のステップ幅を決定
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# 損失関数としてクロスエントロピー損失を定義
criterion = nn.CrossEntropyLoss()

# モデルの保存設定と学習ループの準備
# 才能の検証損失を保存する変数を初期化することによって、検証データの損失が改善した場合にのみモデルを保存する
# 早期終了（Early Stopping）に近い戦略
best_valid_loss = np.inf
MODEL_SAVE_PATH = PRJ_ROOT + 'baseline_model_best_model.pth' # 最良モデルの保存先ファイルパス

n_epoch = 10 # 学習エポック数
for epoch in range(n_epoch):
    # 各エポックの開始時に、訓練データと検証データの損失と正解数をリセット
    loss_train = 0
    loss_valid = 0
    accuracy_train = 0
    accuracy_valid = 0

    # 訓練モード
    # model.train()を呼び出し、モデルを「訓練モード」に切り替える
    # Dropout層やBatchNorm層などが訓練時の挙動となる
    model.train()
    for x,t in train_loader: # DataLoaderからミニバッチを1件ずつ取り出す
        # データをモデルと同じデバイスに転送
        x = x.to(device)
        t = t.to(device)

        # 前のバッチの勾配が累積しないように勾配をリセットする。
        optimizer.zero_grad()

        # 順伝播
        output = model(x)

        # 損失計算：モデルの出力と正解ラベルとを比較し、損失を計算
        loss = criterion(output, t)
        # 逆伝播：損失に基づいて、各パラメータの勾配を計算
        loss.backward()

        # パラメータ更新：計算された勾配を基に、オプティマイザがモデルのパラメータを更新
        optimizer.step()

        # 予測ラベルを計算。出力が最も大きいクラスを予測結果とするる
        pred = output.argmax(dim=1)

        # このバッチでの損失と正解数を、エポック全体の集計に加算
        # .item()はテンソルからPythonのスカラー値を取り出す
        loss_train += loss.item()
        accuracy_train += torch.sum(pred == t.data)
    
    # 検証モード
    # model.eval()を呼び出し、モデルを「検証モード」に切り替える
    model.eval()

    # 検証ではパラメータ更新はおこなわず、メモリ消費量を削減し、計算速度は向上する
    with torch.no_grad():
        for x,t in val_loader:
            # データをデバイスに転送
            x = x.to(device)
            t = t.to(device)

            # 順伝播
            output = model(x)

            # 損失計算
            loss = criterion(output, t)
            
            # 予測ラベルを計算
            pred = output.argmax(dim=1)

            # 損失と正解数を加算
            loss_valid += loss.item()
            # モデルの予測と正解ラベルとが等しいものの個数をカウントして足し合わせる
            accuracy_valid += torch.sum(pred == t.data)
    
    # エポックごとの結果を計算して表示する
    avg_loss_train = loss_train / len(train_loader)
    avg_loss_valid = loss_valid / len(val_loader)
    avg_acc_train = accuracy_train / len(train_dataset)
    avg_acc_valid = accuracy_valid / len(val_dataset)

    # 結果を整形して表示
    print(
        f"| epoch {epoch+1:2d} | train loss {avg_loss_train:.4f}, acc {avg_acc_train:.4f} "
        f"| valid loss {avg_loss_valid:.4f}, acc {avg_acc_valid:.4f}"
    )

    # 最良モデルの保存
    # 検証データの損失が、これまでの最小値を更新した場合
    if avg_loss_valid < best_valid_loss:
        print(f"Validation loss improved ({best_valid_loss:.4f} --> {avg_loss_valid:.4f}). Saving model...")
        best_valid_loss = avg_loss_valid # 最小損失を更新

        # モデルのパラメータ（重み）のみを保存
        torch.ave(model.state_dict(), MODEL_SAVE_PATH)

print(f"\nTraining finished. Best model saved to {MODEL_SAVE_PATH}")

### ベースラインモデルの評価 

In [None]:
# モデルの評価と結果の可視化
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

print(f" '{PRJ_ROOT + 'baseline_model_best_model.pth'}' を読み込んでいます...")
with open(PRJ_ROOT + 'baseline_model_best_model.pth', 'rb') as f:
    data = pickle.load(f)

# pklファイル内のデータから、IDとカテゴリ名の対応辞書を再取得
# 評価レポートやグラフのラベルとして使用
id_to_label = data['id_to_label']

# 評価レポート用に、カテゴリ名のリストもここから作成
category_names = list(id_to_label.values())

# 全ての予測結果と正解ラベルを格納するための空のリストを準備
all_preds = []
all_labels = []

# モデルを「評価モード」に切り替える
model.eval()

#勾配計算を無効にして計算リソースを節約
with torch.no_grad():
    # テストデータ用のDataLoaderからミニバッチを１件ずつ取り出す
    for x,t in test_loader:
        # データをモデルと同じデバイにに転送
        x = x.to(device)
        t = t.to(device)

        # モデルにデータを入力し、順伝播させる
        output = model(x)

        # 最もスコアの高いクラスのインデックスを予測結果とする
        pred = output.argmax(dim=1)

        # 予測結果と正解ラベルをリストに追加
        #　scikit-learnで計算するために、GPUのTensorからCPU上のリストに変換する
        # .cpu()でCPUに転送し、.tolist()でPythonのリストに変換
        # 
        all_preds.extend( pred.cpu().tolist() )
        all_labels.extend( t.cpu().tolist() )
        '''
          当初は.numpy()でNumPy配列に変換しようとしていたが、RuntimeError: Numpy is not availableが発生した。
          これは、NumPyライブラリが見つからない、または利用できない」ことを示している。
          Colab上にプリインストールされていたNumPyが他のライブラリとの兼ね合いなどで不安定になったと思われる。
          ランタイムの再起動／リセットを試したが、改善しなかった。
          対応策として、NumPyを一切経由せず、Tensorを直接Pythonの標準的なリストに変換する .tolist() メソッドを使用した。
          scikit-learnの評価関数はNumPy配列だけでなく、Pythonのリストも受け付けるため、この方法で動作。
        ''' 

# 正解率（Accuracy）の計算
accuracy = accuracy_score(all_labels, all_preds)
print(f"Accuracy: {accuracy:.4f}")

# 適合率（Precision）、再現率（Recall）、F1スコア（F1-Score）を含むレポートを表示
print("\nClassification Report:")
report = classification_report(all_labels, all_preds, target_names=category_names)

# 結果をテキストファイルに保存
with open(DATA_DIR + "CNN_GRU_test_results.txt","w") as f:
    f.write(f"Accuracy: {accuracy:.4f}\n")
    f.write("\nClassification Report:\n")
    f.write(report)

# 混合行列（Confusion Matrix）の計算と可視化
cm = confusion_matrix(all_labels, all_preds)

# numpy配列のままだと軸が数字で見づらいため、PandasのDataFrameに変換
# indexとcolumnsにカテゴリ名を設定
cm_df = pd.DataFrame(cm, index=category_names, columns=category_names)

# 可視化
plt.figure(figsize=(10,8)) # グラフのサイズを指定
# seabornのheatmap関数を使って、混合行列をヒートマップとして描画
# annot=True：各セルに数値を表示
# fmt='d': 数値を整数で表示
# cmap='Greens': 色のテーマを指定
sns.heatmap(cm_df, annot=True, cmap='Greens')
plt.title('Confusion Matrix') # グラフのタイトル
plt.ylabel('True Label') # Y軸のラベル
plt.xlabel('Predicted Label') # X軸のラベル

plt.tight_layout() # ラベルがグラフ領域からはみ出ないように自動調整

# 混合行列を画像ファイルとして保存
plt.savefig(PRJ_ROOT + "CNN_GRU_confusion_matrix.png")

plt.show()

#### ベースラインモデルの予測結果の保存

In [None]:
# 予測結果をNumPyファイルとして保存
np.save(PRJ_ROOT + 'cnn_predictions.npy', np.array(all_preds))
print("ベースラインモデルの予測結果を cnn_predictions.npy として保存しました。")

### ベースラインモデルの考察

## アルゴリズム選択（改善モデル）

### 改善モデル設計

CNN層が担う「文中の重要な部分（キーフレーズなど）を見つけ出す」という役割は、Attentionメカニズムで代替することが可能であると考えました。

##### 自然言語の特徴抽出におけるCNNとAttentionの比較

* CNN層

　CNNは、**「位置的に近い単語の組み合わせ」**に注目します。カーネル（フィルター）と呼ばれる固定サイズの窓を文頭からスライドさせ、局所的なパターン（例：「とても 面白い」「価格が 高い」など）を検出します。

**強み**: 計算が**高速**で、n-gramのような**局所的**な特徴を捉えるのが得意。

**弱み**: 窓のサイズが固定されているため、**遠く離れた**単語同士の関係性を捉えるのが苦手。

* Attentionメカニズム

　Attentionは、**「文脈上、どの単語が他の単語と関連が深いか」**に注目します。文中のある単語を処理する際に、他のすべての単語との関連度（Attentionスコア）を計算し、スコアが高い単語の情報を重点的に利用します。

**強み**: 距離に関係なく、文全体の文脈から動的に単語の重要度を判断できる。「その」という指示語が文頭のどの名詞を指しているか、といった**長期的な依存関係**を捉えるのが非常に得意。

**弱み**: 計算量が系列長の2乗に比例して増える（$O(n^2)$ ）ため、非常に長い文章では計算コストが高くなることがある。

　Attentionは、CNNのように局所的な関係に限定されません。文脈に応じて、隣り合った単語に高いスコアを与えることも、遠く離れた単語に高いスコアを与えることもできます。

　CNNの代わりにAttentionを特徴抽出に利用することで、**文脈情報**をより加味した分類を実現することができ、livedoor-hommeやpeachyのような、**単語の組み合わせ**や**文脈のニュアンス**が重要なカテゴリの分類精度を向上できると考えました。

以下のような**4つの中間層**からなる構成のニューラルネットワークを改善モデルとして考案しました：

1. **Embedding層**: 単語をベクトルに変換する層
2. **Attentionメカニズム** (Self-Attention): 特徴を抽出する層
3. **GRU層**: 系列情報を処理する再帰的な層
4. **全結合層**: 最終的な分類を行う層

### 改善モデル作成

In [None]:
class Attention_GRU_Model(nn.Module):
    """
    Multi-Head Self-Attention機構をGRU前段に組み込んだモデル
    Attention層が入力シーケンス内の単語間の関連性を捉え、
    GRUが時系列に処理することで、より高度な特徴抽出を目指す
    """
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_classes, num_heads=8):
        """
        コンストラクタ

        Args:  
            Vocab_size (int): 語彙数
            embedding_dim (int): 単語埋め込みベクトルの次元数。Attention層の入力次元数でもある
            hidden_dim (int): GRUの隠れ状態の次元数
            num_classes (int): 出力クラス数
            num_heads (int): Multi-Head Attentionのヘッド数
        """
        super(Attention_GRU_Model,self).__init__()

        # 1. Embedding層
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)

        # 2. Attention層（Multi-Head Self-Attention）
        # 入力系列内の各単語が、他のどの単語に注目すべきかを学習します。
        # embeded_dim: 入出力の次元数
        # num_heads: 並列計算のためにAttentionの「ヘッド」をいくつに分割するかを設定
        # batch_first=False: 入力の形式を（seq_len, batch_size, dim）に指定します。
        self.multi_head_attention = nn.MultiheadAttention(
            embed_dim=embedding_dim,
            num_heads=num_heads,
            batch_first=False
        )

        # 学習を安定させるためのレイヤ正規化
        # Attention層に適用
        self.norm1 = nn.LayerNorm(embedding_dim)

        # 3. GRU層
        self.gru = nn.GRU(input_size=embedding_dim, hidden_size=hidden_dim, batch_first=True)

        # 4. 全結合層
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """
        順伝播

        Args:
            x (torch.Tensro): 入力データ。形状：(batch_size, seq_len)
        
        Returns:
            torch.Tensor: 各クラスに対する分類スコア。形状：(batch_size, num_classes)
        """
        # 1. Embedding
        x = self.embedding(x) # -> (batch_size, seq_len, embedding_dim)

        # 2. Self-Attention
        # MultiheadAttentionの入力形式 (seq_len, batch_size, dim)に合わせるため、次元を入れ替える
        x_permuted = x.permute(1, 0, 2) # -> (seq_len, batch_size, embedding_dim)

        # Self-Atttentionでは、Query, Key, Valueにすべて同じ入力（x_permuted）を用いる
        # MultiheadAttentionの順伝播
        attn_output, _ = self.multi_head_attention(query=x_permuted, key=x_permuted, value=x_permuted)

        # 残差接続（residual Connection）とレイヤ正規化
        # 入力 x_permuted をAttentionの出たた足し合わせることで、勾配消失を防ぐ
        x = self.norm1(x_permuted + attn_output) # -> (seq_len, batch_size, embedding_dim)

        # 3. GRU
        # GRUの入力形式（batch_size, seq_len, dim）に戻すため、再度次元を入れ替える
        x_permuted = x.permute(1, 0, 2) # -> (seq_len, batch_size, embedding_dim)

        # GRU層に入力し、最後の隠れ状態 h_n を取得
        _, h_n = self.gru(x) # h_n の形状：（num_layers, batch_size, hidden_dim)

        # 4. 分類
        x = h_n.squeeze(0) # -> (batch_size, hidden_dim)
        # 全結合層で最終的なクラススコアを出力
        out = self.fc(x) # -> (batch_size, num_classes)

        return out

### 改善モデルの学習

In [None]:
# 実行環境（GPU／CPU）の確認と設定
# GPUが利用可能か確認し、利用可能なら "cuda"、そうでなければ "cpu" をdeviceに設定
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# CNN_Attention_Modelのインスタンスを生成し、.to(device)で指定したデバイスにモデルを転送
# モデルのパラメータ（重み）や計算が、指定したデバイスで実施されるようにする
model = Attention_GRU_Model(vocab_size, embedding_dim, hidden_dim, num_classes)
model.to(device)

# 最適化手法と損失関数の定義
# 最適化アルゴリズムとしてAdamを使用
# model.parameters() を渡すことで、モデルが持つすべてのパラメータを最適化の対象として登録
# lrは学習率（learning rate）で、パラメータ更新のステップ幅を決定
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# 損失関数としてクロスエントロピー損失を定義
criterion = nn.CrossEntropyLoss()

# モデルの保存設定と学習ループの準備
# 才能の検証損失を保存する変数を初期化することによって、検証データの損失が改善した場合にのみモデルを保存する
# 早期終了（Early Stopping）に近い戦略
best_valid_loss = np.inf
MODEL_SAVE_PATH = PRJ_ROOT + 'improved_model_best_model.pth' # 最良モデルの保存先ファイルパス

n_epoch = 10 # 学習エポック数
for epoch in range(n_epoch):
    # 各エポックの開始時に、訓練データと検証データの損失と正解数をリセット
    loss_train = 0
    loss_valid = 0
    accuracy_train = 0
    accuracy_valid = 0

    # 訓練モード
    # model.train()を呼び出し、モデルを「訓練モード」に切り替える
    # Dropout層やBatchNorm層などが訓練時の挙動となる
    model.train()
    for x,t in train_loader: # DataLoaderからミニバッチを1件ずつ取り出す
        # データをモデルと同じデバイスに転送
        x = x.to(device)
        t = t.to(device)

        # 前のバッチの勾配が累積しないように勾配をリセットする。
        optimizer.zero_grad()

        # 順伝播
        output = model(x)

        # 損失計算：モデルの出力と正解ラベルとを比較し、損失を計算
        loss = criterion(output, t)
        # 逆伝播：損失に基づいて、各パラメータの勾配を計算
        loss.backward()

        # パラメータ更新：計算された勾配を基に、オプティマイザがモデルのパラメータを更新
        optimizer.step()

        # 予測ラベルを計算。出力が最も大きいクラスを予測結果とするる
        pred = output.argmax(dim=1)

        # このバッチでの損失と正解数を、エポック全体の集計に加算
        # .item()はテンソルからPythonのスカラー値を取り出す
        loss_train += loss.item()
        accuracy_train += torch.sum(pred == t.data)
    
    # 検証モード
    # model.eval()を呼び出し、モデルを「検証モード」に切り替える
    model.eval()

    # 検証ではパラメータ更新はおこなわず、メモリ消費量を削減し、計算速度は向上する
    with torch.no_grad():
        for x,t in val_loader:
            # データをデバイスに転送
            x = x.to(device)
            t = t.to(device)

            # 順伝播
            output = model(x)

            # 損失計算
            loss = criterion(output, t)
            
            # 予測ラベルを計算
            pred = output.argmax(dim=1)

            # 損失と正解数を加算
            loss_valid += loss.item()
            # モデルの予測と正解ラベルとが等しいものの個数をカウントして足し合わせる
            accuracy_valid += torch.sum(pred == t.data)
    
    # エポックごとの結果を計算して表示する
    avg_loss_train = loss_train / len(train_loader)
    avg_loss_valid = loss_valid / len(val_loader)
    avg_acc_train = accuracy_train / len(train_dataset)
    avg_acc_valid = accuracy_valid / len(val_dataset)

    # 結果を整形して表示
    print(
        f"| epoch {epoch+1:2d} | train loss {avg_loss_train:.4f}, acc {avg_acc_train:.4f} "
        f"| valid loss {avg_loss_valid:.4f}, acc {avg_acc_valid:.4f}"
    )

    # 最良モデルの保存
    # 検証データの損失が、これまでの最小値を更新した場合
    if avg_loss_valid < best_valid_loss:
        print(f"Validation loss improved ({best_valid_loss:.4f} --> {avg_loss_valid:.4f}). Saving model...")
        best_valid_loss = avg_loss_valid # 最小損失を更新

        # モデルのパラメータ（重み）のみを保存
        torch.ave(model.state_dict(), MODEL_SAVE_PATH)

print(f"\nTraining finished. Best model saved to {MODEL_SAVE_PATH}")

### 改善モデルの評価

In [None]:
# モデルの評価と結果の可視化
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

print(f" '{PRJ_ROOT + 'improved_model_best_model.pth'}' を読み込んでいます...")
with open(PRJ_ROOT + 'improved_model_best_model.pth', 'rb') as f:
    data = pickle.load(f)

# pklファイル内のデータから、IDとカテゴリ名の対応辞書を再取得
# 評価レポートやグラフのラベルとして使用
id_to_label = data['id_to_label']

# 評価レポート用に、カテゴリ名のリストもここから作成
category_names = list(id_to_label.values())

# 全ての予測結果と正解ラベルを格納するための空のリストを準備
all_preds = []
all_labels = []

# モデルを「評価モード」に切り替える
model.eval()

#勾配計算を無効にして計算リソースを節約
with torch.no_grad():
    # テストデータ用のDataLoaderからミニバッチを１件ずつ取り出す
    for x,t in test_loader:
        # データをモデルと同じデバイにに転送
        x = x.to(device)
        t = t.to(device)

        # モデルにデータを入力し、順伝播させる
        output = model(x)

        # 最もスコアの高いクラスのインデックスを予測結果とする
        pred = output.argmax(dim=1)

        # 予測結果と正解ラベルをリストに追加
        #　scikit-learnで計算するために、GPUのTensorからCPU上のリストに変換する
        # .cpu()でCPUに転送し、.tolist()でPythonのリストに変換
        # 
        all_preds.extend( pred.cpu().tolist() )
        all_labels.extend( t.cpu().tolist() )
        '''
          当初は.numpy()でNumPy配列に変換しようとしていたが、RuntimeError: Numpy is not availableが発生した。
          これは、NumPyライブラリが見つからない、または利用できない」ことを示している。
          プリインストールされていたNumPyが他のライブラリとの兼ね合いなどで不安定になったと思われる。
          ランタイムのリセット等を試したが、改善しなかったので、NumPyを一切経由せず、
          Tensorを直接Pythonの標準的なリストに変換する .tolist() メソッドを使用した。
          scikit-learnの評価関数はNumPy配列だけでなく、Pythonのリストも受け付けるため、この方法で問題なく動作。
        ''' 

# 正解率（Accuracy）の計算
accuracy = accuracy_score(all_labels, all_preds)
print(f"Accuracy: {accuracy:.4f}")

# 適合率（Precision）、再現率（Recall）、F1スコア（F1-Score）を含むレポートを表示
print("\nClassification Report:")
report = classification_report(all_labels, all_preds, target_names=category_names)

# 結果をテキストファイルに保存
with open(DATA_DIR + "Attention_GRU_test_results.txt","w") as f:
    f.write(f"Accuracy: {accuracy:.4f}\n")
    f.write("\nClassification Report:\n")
    f.write(report)

# 混合行列（Confusion Matrix）の計算と可視化
cm = confusion_matrix(all_labels, all_preds)

# numpy配列のままだと軸が数字で見づらいため、PandasのDataFrameに変換
# indexとcolumnsにカテゴリ名を設定
cm_df = pd.DataFrame(cm, index=category_names, columns=category_names)

# 可視化
plt.figure(figsize=(10,8)) # グラフのサイズを指定
# seabornのheatmap関数を使って、混合行列をヒートマップとして描画
# annot=True：各セルに数値を表示
# fmt='d': 数値を整数で表示
# cmap='Greens': 色のテーマを指定
sns.heatmap(cm_df, annot=True, cmap='Blues')
plt.title('Confusion Matrix') # グラフのタイトル
plt.ylabel('True Label') # Y軸のラベル
plt.xlabel('Predicted Label') # X軸のラベル

plt.tight_layout() # ラベルがグラフ領域からはみ出ないように自動調整

# 混合行列を画像ファイルとして保存
plt.savefig(PRJ_ROOT + "Attention_GRU_confusion_matrix.png")

plt.show()

#### 改善モデルの予測結果の保存

In [None]:
# 予測結果をNumPyファイルとして保存
np.save(PRJ_ROOT + 'attention_predictions.npy', np.array(all_preds))
print("改善モデルの予測結果を attention_predictions.npy として保存しました。")

### 改善モデルの考察

## 考察と今後の課題

### 考察

#### ベースラインモデル／改善モデルの予測結果サンプルを抽出

In [None]:
# 保存した両モデルの予測結果をロード
cnn_preds = np.load(PRJ_ROOT + 'cnn_predictions.npy').tolist()
attention_preds = np.load(PRJ_ROOT + 'attention_predictions.npy').tolist()

# pklファイルから前処理済みデータと辞書をロード
with open(DATA_DIR + 'processed_data_maxlen512.pkl','rb') as f:
    data = pickle.load(f)
id_to_label = data['id_to_label']

# テストデータの「原文」と「正解ラベル」を再取得
from sklearn.model_selection import train_test_split
raw_df = pd.read_csv(PRJ_ROOT + 'livedoor_news_corpus.csv') # 元の生データCSV

# livedoor_news_corpus.csv テキストとラベルを取得
X_raw = raw_df['text']
y_raw = raw_df['label'].map( { label: i for i, label in enumerate(raw_df['label'].unique()) } ) # ラベルを数値化

# 分割してテストデータを取得
_, X_test_raw, _, test_labels = train_test_split(
    X_raw, y_raw, test_size=0.1, random_state=42, stratify=y_raw
)

test_texts = X_test_raw.tolist()

# テストデータの原文、正解ラベル、両モデルの予測結果をまとめたDataFrameを作成
results_df = pd.DataFrame({
    'text': test_texts,
    'true_label': test_labels,
    'cnn_pred': cnn_preds,
    'attention_pred': attention_preds
})

# IDをカテゴリ名に変換
results_df['true_label_name'] = results_df['true_label'].map(id_to_label)
results_df['cnn_pred_name'] = results_df['cnn_pred'].map(id_to_label)
results_df['attention_pred_name'] = results_df['attention_pred'].map(id_to_label)

# 抽出条件
# ベースラインモデルでは誤分類したが、改善モデルでは正しく分類したもの
condition1 = results_df['cnn_pred'] != results_df['true_label']
condition2 = results_df['attention_pred'] == results_df['true_label']

improved_examples = results_df[condition1 & condition2]

# 結果の表示
print("【改善事例の抽出結果】")

# 特に 'livedoor-homme' の事例を見てみる
homme_improved = improved_examples[improved_examples['true_label_name'] == 'livedoor-homme']

for index, row in homme_improved.head(3).iterrows():
    print("="*50)
    print(f"【改善事例】 正解カテゴリ: {row['true_label_name']}")
    print(f"  - CNNの予測 (間違い): {row['cnn_pred_name']}")
    print(f"  - Attentionの予測 (正解): {row['attention_pred_name']}")
    print("\n【記事本文 (冒頭部分)】")
    print(row['text'][:200] + "...")
    print("="*50)

エラー分析: 実際にモデルが間違えた記事をいくつか読んでみて、なぜ間違えたのかを人間が分析することで、新たな改善のヒントが見つかることがあります。

### 今後の課題

### 参考