# 第9章: 事前学習済み言語モデル（BERT型）

本章では、BERT型の事前学習済みモデルを利用して、マスク単語の予測や文ベクトルの計算、評判分析器（ポジネガ分類器）の構築に取り組む。

## 80. トークン化

"The movie was full of incomprehensibilities."という文をトークンに分解し、トークン列を表示せよ。

In [1]:
# Hugging Face Transformers ライブラリのインストール (まだの場合、Jupyter Notebookのセルで先頭に!をつけて実行)
# !pip install transformers

from transformers import AutoTokenizer

# 1. トークナイザのロード
# 一般的な英語BERTモデルである 'bert-base-uncased' を例として使用します。
# このモデルは、テキストを小文字化してからトークン化します。
# 他のモデル名 (例: 'bert-base-cased', 'bert-large-uncased') も指定可能です。
tokenizer_name = "bert-base-uncased" 

try:
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
    print(f"トークナイザ '{tokenizer_name}' をロードしました。")
except Exception as e:
    print(f"トークナイザ '{tokenizer_name}' のロード中にエラーが発生しました: {e}")
    print("指定したモデル名が正しいか、インターネット接続があるか確認してください。")
    print("Hugging Face Hub (https://huggingface.co/models) でモデル名を確認できます。")
    tokenizer = None # エラーの場合はtokenizerをNoneに設定

if tokenizer:
    # 2. トークン化するテキスト
    text_to_tokenize = "The movie was full of incomprehensibilities."
    print(f"\n元のテキスト: \"{text_to_tokenize}\"")

    # 3. テキストのトークン化
    # tokenize() メソッドは、テキストをトークンの文字列リストに分割します。
    tokens = tokenizer.tokenize(text_to_tokenize)
    
    # 4. トークン列の表示
    print("\nトークン化された列:")
    print(tokens)

    print("-" * 30)
    print("（参考情報）")
    # 参考: トークナイザの encode() メソッドによる出力 (トークンIDのリスト)
    # add_special_tokens=True にすると、[CLS] や [SEP] といった特殊トークンも自動的に付与されます。
    # (BERTが文の入力を受け付ける際の標準的な形式)
    token_ids_with_special_tokens = tokenizer.encode(text_to_tokenize, add_special_tokens=True)
    print(f"\nencode() によるトークンID (特殊トークン含む):")
    print(token_ids_with_special_tokens)
    
    # 参考: 上記のトークンIDを再度トークン文字列に変換
    decoded_tokens_with_special = tokenizer.convert_ids_to_tokens(token_ids_with_special_tokens)
    print(f"\n上記IDをトークン文字列に戻したもの (特殊トークン含む):")
    print(decoded_tokens_with_special)

    # 参考: トークナイザの __call__() メソッド (推奨される使い方)
    # PyTorchテンソル形式で、モデル入力に必要な情報をまとめて取得できます。
    inputs = tokenizer(text_to_tokenize, return_tensors="pt", add_special_tokens=True)
    print("\n__call__() によるモデル入力形式 (PyTorchテンソル):")
    print(f"  Input IDs: {inputs['input_ids']}")
    if 'token_type_ids' in inputs: # モデルによっては token_type_ids がない場合もある
        print(f"  Token Type IDs: {inputs['token_type_ids']}")
    print(f"  Attention Mask: {inputs['attention_mask']}")
    print("-" * 30)

else:
    print("\nトークナイザがロードされなかったため、処理を実行できませんでした。")

トークナイザ 'bert-base-uncased' をロードしました。

元のテキスト: "The movie was full of incomprehensibilities."

トークン化された列:
['the', 'movie', 'was', 'full', 'of', 'inc', '##omp', '##re', '##hen', '##si', '##bilities', '.']
------------------------------
（参考情報）

encode() によるトークンID (特殊トークン含む):
[101, 1996, 3185, 2001, 2440, 1997, 4297, 25377, 2890, 10222, 5332, 14680, 1012, 102]

上記IDをトークン文字列に戻したもの (特殊トークン含む):
['[CLS]', 'the', 'movie', 'was', 'full', 'of', 'inc', '##omp', '##re', '##hen', '##si', '##bilities', '.', '[SEP]']

__call__() によるモデル入力形式 (PyTorchテンソル):
  Input IDs: tensor([[  101,  1996,  3185,  2001,  2440,  1997,  4297, 25377,  2890, 10222,
          5332, 14680,  1012,   102]])
  Token Type IDs: tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
  Attention Mask: tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])
------------------------------


## 81. マスクの予測

"The movie was full of [MASK]."の"[MASK]"を埋めるのに最も適切なトークンを求めよ。

In [2]:
# Hugging Face Transformers ライブラリのインストール (まだの場合)
# !pip install transformers
# !pip install torch # PyTorchも必要

import torch
from transformers import AutoTokenizer, AutoModelForMaskedLM

# 1. トークナイザとモデルのロード
# 問題80で使用したのと同じモデル名を指定するのが一般的
tokenizer_name_mlm = "bert-base-uncased"
model_name_mlm = "bert-base-uncased" 

try:
    tokenizer_mlm = AutoTokenizer.from_pretrained(tokenizer_name_mlm)
    model_mlm = AutoModelForMaskedLM.from_pretrained(model_name_mlm)
    print(f"トークナイザ '{tokenizer_name_mlm}' とモデル '{model_name_mlm}' をロードしました。")
    
    # モデルを評価モードに設定 (推論のみ行うため)
    model_mlm.eval()
    
except Exception as e:
    print(f"トークナイザまたはモデルのロード中にエラーが発生しました: {e}")
    tokenizer_mlm = None
    model_mlm = None

if tokenizer_mlm and model_mlm:
    # 2. 入力テキストの準備
    text_with_mask = "The movie was full of [MASK]."
    print(f"\n入力テキスト: \"{text_with_mask}\"")

    # 3. テキストのトークン化とマスク位置の特定
    # モデル入力形式で取得 (PyTorchテンソル)
    inputs = tokenizer_mlm(text_with_mask, return_tensors="pt", add_special_tokens=True)
    input_ids = inputs["input_ids"]
    
    # [MASK] トークンのIDを取得
    mask_token_id = tokenizer_mlm.mask_token_id
    
    # input_ids の中で [MASK] トークンID がある位置(インデックス)を見つける
    # バッチサイズ1を想定
    mask_token_indices = torch.where(input_ids == mask_token_id)[1] # [1]で列インデックスを取得

    if len(mask_token_indices) == 0:
        print("エラー: 入力テキストに [MASK] トークンが見つかりません。")
    else:
        mask_token_index = mask_token_indices[0] # 最初のマスク位置を使用
        print(f"[MASK] トークンの位置 (インデックス): {mask_token_index.item()}")

        # 4. モデルによる予測 (勾配計算は不要なので torch.no_grad() を使用)
        with torch.no_grad():
            outputs = model_mlm(**inputs) # キーワード引数として渡す
            predictions = outputs.logits
            # predictions の形状: (バッチサイズ, シーケンス長, 語彙サイズ)
        
        # 5. 最も適切なトークンの特定
        # [MASK] トークンの位置における予測 logits を取得
        mask_token_logits = predictions[0, mask_token_index, :] # バッチサイズ1を仮定
        
        # 最もスコアが高いトークンのIDを予測
        predicted_token_id = torch.argmax(mask_token_logits).item()
        
        # 予測されたトークンIDを実際のトークン文字列に変換
        predicted_token = tokenizer_mlm.convert_ids_to_tokens([predicted_token_id])[0]
        
        print(f"\n[MASK] を埋めるのに最も適切なトークン: \"{predicted_token}\"")

else:
    print("\nトークナイザまたはモデルがロードされなかったため、処理を実行できませんでした。")

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight', 'cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


トークナイザ 'bert-base-uncased' とモデル 'bert-base-uncased' をロードしました。

入力テキスト: "The movie was full of [MASK]."
[MASK] トークンの位置 (インデックス): 6

[MASK] を埋めるのに最も適切なトークン: "fun"


## 82. マスクのtop-k予測

"The movie was full of [MASK]."の"[MASK]"に埋めるのに適切なトークン上位10個と、その確率（尤度）を求めよ。

In [3]:
import torch
import torch.nn.functional as F # ソフトマックス関数を使用するため
from transformers import AutoTokenizer, AutoModelForMaskedLM

# 1. トークナイザとモデルのロード (問題81でロード済みであれば再利用)
# tokenizer_name_mlm = "bert-base-uncased" # 問題81と同じ
# model_name_mlm = "bert-base-uncased"   # 問題81と同じ

# tokenizer_mlm と model_mlm が問題81から引き継がれているか確認
if 'tokenizer_mlm' not in locals() or 'model_mlm' not in locals() or \
   tokenizer_mlm is None or model_mlm is None:
    print("エラー: 'tokenizer_mlm' または 'model_mlm' が定義されていません。")
    print("問題81を先に実行して、トークナイザとモデルをロードしてください。")
    # 処理を中断するための措置
    raise NameError("Tokenizer or Model not loaded from Problem 81.")
else:
    print(f"問題81でロード済みのトークナイザ '{tokenizer_mlm.name_or_path}' とモデルを使用します。")
    model_mlm.eval() # 念のため評価モードに

# 2. 入力テキストの準備
text_with_mask_top_k = "The movie was full of [MASK]."
print(f"\n入力テキスト: \"{text_with_mask_top_k}\"")

# 3. テキストのトークン化とマスク位置の特定
inputs_top_k = tokenizer_mlm(text_with_mask_top_k, return_tensors="pt", add_special_tokens=True)
input_ids_top_k = inputs_top_k["input_ids"]
mask_token_id_top_k = tokenizer_mlm.mask_token_id
mask_token_indices_top_k = torch.where(input_ids_top_k == mask_token_id_top_k)[1]

if len(mask_token_indices_top_k) == 0:
    print("エラー: 入力テキストに [MASK] トークンが見つかりません。")
else:
    mask_token_index_top_k = mask_token_indices_top_k[0]
    print(f"[MASK] トークンの位置 (インデックス): {mask_token_index_top_k.item()}")

    # 4. モデルによる予測とスコア取得 (勾配計算は不要)
    with torch.no_grad():
        outputs_top_k = model_mlm(**inputs_top_k)
        predictions_top_k = outputs_top_k.logits
    
    # [MASK] トークンの位置における予測 logits を取得
    mask_token_logits_top_k = predictions_top_k[0, mask_token_index_top_k, :]
    
    # 5. 上位k個のトークンとその確率の特定
    top_k = 10
    
    # ロジットを確率に変換 (ソフトマックス)
    mask_token_probs = F.softmax(mask_token_logits_top_k, dim=-1)
    
    # 確率が高い上位k個のトークンの確率値とインデックス(ID)を取得
    top_k_probs, top_k_indices = torch.topk(mask_token_probs, top_k)
    
    # 予測された上位k個のトークンIDを実際のトークン文字列に変換
    top_k_tokens = tokenizer_mlm.convert_ids_to_tokens(top_k_indices)
    
    print(f"\n[MASK] を埋めるのに適切なトークン上位{top_k}個と、その確率:")
    for i in range(top_k):
        print(f"  {i+1}. トークン: \"{top_k_tokens[i]}\", 確率: {top_k_probs[i].item():.4f}")

問題81でロード済みのトークナイザ 'bert-base-uncased' とモデルを使用します。

入力テキスト: "The movie was full of [MASK]."
[MASK] トークンの位置 (インデックス): 6

[MASK] を埋めるのに適切なトークン上位10個と、その確率:
  1. トークン: "fun", 確率: 0.1071
  2. トークン: "surprises", 確率: 0.0663
  3. トークン: "drama", 確率: 0.0447
  4. トークン: "stars", 確率: 0.0272
  5. トークン: "laughs", 確率: 0.0254
  6. トークン: "action", 確率: 0.0195
  7. トークン: "excitement", 確率: 0.0190
  8. トークン: "people", 確率: 0.0183
  9. トークン: "tension", 確率: 0.0150
  10. トークン: "music", 確率: 0.0146


## 83. CLSトークンによる文ベクトル

以下の文の全ての組み合わせに対して、最終層の[CLS]トークンの埋め込みベクトルを用いてコサイン類似度を求めよ。

- "The movie was full of fun."
- "The movie was full of excitement."
- "The movie was full of crap."
- "The movie was full of rubbish."


In [4]:
import torch
import torch.nn.functional as F # コサイン類似度の計算用
from transformers import AutoTokenizer, AutoModel
from itertools import combinations # 文のペア作成用

# 1. トークナイザとモデルのロード
# 問題80-82と同じトークナイザ、ただしモデルは AutoModel を使用
tokenizer_name_cls = "bert-base-uncased" 
model_name_cls = "bert-base-uncased"     # 事前学習済みBERTモデル

try:
    tokenizer_cls = AutoTokenizer.from_pretrained(tokenizer_name_cls)
    # CLSトークンベクトルを取得するため、AutoModelForMaskedLMではなくAutoModelを使用
    model_cls = AutoModel.from_pretrained(model_name_cls)
    print(f"トークナイザ '{tokenizer_name_cls}' とモデル '{model_name_cls}' をロードしました。")
    
    # モデルを評価モードに設定
    model_cls.eval()
    
except Exception as e:
    print(f"トークナイザまたはモデルのロード中にエラーが発生しました: {e}")
    tokenizer_cls = None
    model_cls = None

def get_cls_vector(text, tokenizer, model):
    """テキストからBERTの最終層の[CLS]トークンベクトルを取得する関数"""
    if not tokenizer or not model:
        return None
        
    inputs = tokenizer(text, return_tensors="pt", add_special_tokens=True, truncation=True, max_length=512)
    
    with torch.no_grad():
        outputs = model(**inputs)
        # last_hidden_state の形状: (バッチサイズ, シーケンス長, 隠れ層次元数)
        # [CLS] トークンはシーケンスの最初のトークン (インデックス0)
        cls_vector = outputs.last_hidden_state[:, 0, :] # バッチ内の全事例のCLSトークンを取得
        # 今回は1文ずつ処理するので、バッチサイズは1
        return cls_vector[0] # (隠れ層次元数) の形状のテンソルを返す

if tokenizer_cls and model_cls:
    # 3. 対象の文リスト
    sentences = [
        "The movie was full of fun.",
        "The movie was full of excitement.",
        "The movie was full of crap.",
        "The movie was full of rubbish."
    ]

    print("\n各文の[CLS]トークンベクトルを抽出します...")
    sentence_vectors = {}
    for sentence in sentences:
        cls_vec = get_cls_vector(sentence, tokenizer_cls, model_cls)
        if cls_vec is not None:
            sentence_vectors[sentence] = cls_vec
            print(f"  文: \"{sentence}\" -> CLSベクトル抽出完了 (形状: {cls_vec.shape})")
        else:
            print(f"  文: \"{sentence}\" -> CLSベクトル抽出失敗")

    # 4. 各文ペアに対するコサイン類似度の計算
    print("\n文ペア間のコサイン類似度:")
    # sentences リストのインデックスではなく、sentence_vectors のキー (文自体) を使う
    # これにより、ベクトル抽出に失敗した文は自動的に除外される
    valid_sentences = list(sentence_vectors.keys())

    for sent1, sent2 in combinations(valid_sentences, 2): # 全てのユニークなペアを作成
        if sent1 in sentence_vectors and sent2 in sentence_vectors:
            vec1 = sentence_vectors[sent1].unsqueeze(0) # (1, hidden_dim) に形状変更
            vec2 = sentence_vectors[sent2].unsqueeze(0) # (1, hidden_dim) に形状変更
            
            # コサイン類似度を計算 (torch.nn.functional.cosine_similarity)
            # F.cosine_similarity は (N, D) と (N, D) を比較し、(N) を返すので、
            # ここでは (1, D) と (1, D) から (1) のテンソルが得られる
            similarity = F.cosine_similarity(vec1, vec2, dim=1) # dim=1でベクトル間で計算
            
            print(f"  文1: \"{sent1}\"")
            print(f"  文2: \"{sent2}\"")
            print(f"  コサイン類似度: {similarity.item():.4f}\n")
else:
    print("\nトークナイザまたはモデルがロードされなかったため、処理を実行できませんでした。")

トークナイザ 'bert-base-uncased' とモデル 'bert-base-uncased' をロードしました。

各文の[CLS]トークンベクトルを抽出します...
  文: "The movie was full of fun." -> CLSベクトル抽出完了 (形状: torch.Size([768]))
  文: "The movie was full of excitement." -> CLSベクトル抽出完了 (形状: torch.Size([768]))
  文: "The movie was full of crap." -> CLSベクトル抽出完了 (形状: torch.Size([768]))
  文: "The movie was full of rubbish." -> CLSベクトル抽出完了 (形状: torch.Size([768]))

文ペア間のコサイン類似度:
  文1: "The movie was full of fun."
  文2: "The movie was full of excitement."
  コサイン類似度: 0.9881

  文1: "The movie was full of fun."
  文2: "The movie was full of crap."
  コサイン類似度: 0.9558

  文1: "The movie was full of fun."
  文2: "The movie was full of rubbish."
  コサイン類似度: 0.9475

  文1: "The movie was full of excitement."
  文2: "The movie was full of crap."
  コサイン類似度: 0.9541

  文1: "The movie was full of excitement."
  文2: "The movie was full of rubbish."
  コサイン類似度: 0.9487

  文1: "The movie was full of crap."
  文2: "The movie was full of rubbish."
  コサイン類似度: 0.9807



## 84. 平均による文ベクトル

以下の文の全ての組み合わせに対して、最終層の埋め込みベクトルの平均を用いてコサイン類似度を求めよ。

- "The movie was full of fun."
- "The movie was full of excitement."
- "The movie was full of crap."
- "The movie was full of rubbish."

In [5]:
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
from itertools import combinations

# 1. トークナイザとモデルのロード (問題83でロード済みであれば再利用)
# tokenizer_name_avg = "bert-base-uncased" 
# model_name_avg = "bert-base-uncased"

# tokenizer_cls と model_cls が問題83から引き継がれているか確認
if 'tokenizer_cls' not in locals() or 'model_cls' not in locals() or \
   tokenizer_cls is None or model_cls is None:
    print("エラー: 'tokenizer_cls' または 'model_cls' が定義されていません。")
    print("問題83を先に実行して、トークナイザとモデルをロードしてください。")
    # 処理を中断するための措置
    raise NameError("Tokenizer or Model not loaded from Problem 83.")
else:
    print(f"問題83でロード済みのトークナイザ '{tokenizer_cls.name_or_path}' とモデルを使用します。")
    # 変数名をこの問題用に変えるか、そのまま tokenizer_cls, model_cls を使用
    tokenizer_avg = tokenizer_cls
    model_avg = model_cls
    model_avg.eval() # 念のため評価モードに

def get_mean_pooling_vector(text, tokenizer, model):
    """テキストからBERTの最終層の全トークンベクトルの平均 (attention_mask利用) を取得する関数"""
    if not tokenizer or not model:
        return None
        
    # padding=True, truncation=True を指定して、バッチ処理に対応できるようにし、
    # かつ長すぎるシーケンスを切り詰める
    inputs = tokenizer(text, return_tensors="pt", add_special_tokens=True, 
                       padding=True, truncation=True, max_length=512)
    
    attention_mask = inputs['attention_mask'] # (バッチサイズ, シーケンス長)
    
    with torch.no_grad():
        outputs = model(**inputs)
        last_hidden_states = outputs.last_hidden_state # (バッチサイズ, シーケンス長, 隠れ層次元数)
    
    # attention_mask を使って、パディングされていないトークンのベクトルのみを対象に平均を取る
    # [CLS] や [SEP] も含めて平均を取るか、除外するかは設計による。
    # ここでは、attention_maskが1の部分（非パディング、特殊トークン含む）の平均を取る。
    # より厳密に [CLS], [SEP] を除きたい場合は、input_idsを調べてマスクを調整する。
    
    # attention_maskを (バッチサイズ, シーケンス長, 1) に拡張してブロードキャスト可能にする
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_states.size()).float()
    
    # マスクされた部分のベクトルを0にする (実際にはsumを取るので影響は限定的だが、明示的)
    sum_embeddings = torch.sum(last_hidden_states * input_mask_expanded, 1) # シーケンス長次元で合計
    
    # マスクされた部分を除いた実際のトークン数をカウント (各事例ごと)
    sum_mask = input_mask_expanded.sum(1) # (バッチサイズ, 1)
    sum_mask = torch.clamp(sum_mask, min=1e-9) # ゼロ除算を避ける
    
    mean_pooled_vector = sum_embeddings / sum_mask
    
    # 今回は1文ずつ処理するので、バッチサイズは1
    return mean_pooled_vector[0] # (隠れ層次元数) の形状のテンソルを返す

if tokenizer_avg and model_avg:
    # 3. 対象の文リスト (問題83と同じ)
    sentences_avg = [
        "The movie was full of fun.",
        "The movie was full of excitement.",
        "The movie was full of crap.",
        "The movie was full of rubbish."
    ]

    print("\n各文の最終層平均ベクトルを抽出します...")
    sentence_vectors_avg = {}
    for sentence in sentences_avg:
        mean_vec = get_mean_pooling_vector(sentence, tokenizer_avg, model_avg)
        if mean_vec is not None:
            sentence_vectors_avg[sentence] = mean_vec
            print(f"  文: \"{sentence}\" -> 平均ベクトル抽出完了 (形状: {mean_vec.shape})")
        else:
            print(f"  文: \"{sentence}\" -> 平均ベクトル抽出失敗")

    # 4. 各文ペアに対するコサイン類似度の計算
    print("\n文ペア間のコサイン類似度 (平均ベクトル使用):")
    valid_sentences_avg = list(sentence_vectors_avg.keys())

    for sent1, sent2 in combinations(valid_sentences_avg, 2):
        if sent1 in sentence_vectors_avg and sent2 in sentence_vectors_avg:
            vec1 = sentence_vectors_avg[sent1].unsqueeze(0)
            vec2 = sentence_vectors_avg[sent2].unsqueeze(0)
            
            similarity = F.cosine_similarity(vec1, vec2, dim=1)
            
            print(f"  文1: \"{sent1}\"")
            print(f"  文2: \"{sent2}\"")
            print(f"  コサイン類似度: {similarity.item():.4f}\n")
else:
    print("\nトークナイザまたはモデルがロードされなかったため、処理を実行できませんでした。")

問題83でロード済みのトークナイザ 'bert-base-uncased' とモデルを使用します。

各文の最終層平均ベクトルを抽出します...
  文: "The movie was full of fun." -> 平均ベクトル抽出完了 (形状: torch.Size([768]))
  文: "The movie was full of excitement." -> 平均ベクトル抽出完了 (形状: torch.Size([768]))
  文: "The movie was full of crap." -> 平均ベクトル抽出完了 (形状: torch.Size([768]))
  文: "The movie was full of rubbish." -> 平均ベクトル抽出完了 (形状: torch.Size([768]))

文ペア間のコサイン類似度 (平均ベクトル使用):
  文1: "The movie was full of fun."
  文2: "The movie was full of excitement."
  コサイン類似度: 0.9568

  文1: "The movie was full of fun."
  文2: "The movie was full of crap."
  コサイン類似度: 0.8490

  文1: "The movie was full of fun."
  文2: "The movie was full of rubbish."
  コサイン類似度: 0.8169

  文1: "The movie was full of excitement."
  文2: "The movie was full of crap."
  コサイン類似度: 0.8352

  文1: "The movie was full of excitement."
  文2: "The movie was full of rubbish."
  コサイン類似度: 0.7938

  文1: "The movie was full of crap."
  文2: "The movie was full of rubbish."
  コサイン類似度: 0.9226



## 85. データセットの準備

[General Language Understanding Evaluation (GLUE)](https://gluebenchmark.com/) ベンチマークで配布されている[Stanford Sentiment Treebank (SST)](https://dl.fbaipublicfiles.com/glue/data/SST-2.zip) から訓練セット（train.tsv）と開発セット（dev.tsv）のテキストと極性ラベルと読み込み、さらに全てのテキストはトークン列に変換せよ。

In [6]:
import pandas as pd
import torch
from transformers import AutoTokenizer
import os
# requests と zipfile, io は問題60でダウンロード・展開が済んでいれば不要な場合もある

# --- 前提となる変数 (問題80から) ---
# tokenizer_name = "bert-base-uncased" # 問題80で使用したトークナイザ名
# tokenizer: 問題80でロードした AutoTokenizer のインスタンス

# トークナイザがロードされているか確認
if 'tokenizer_cls' in locals() and tokenizer_cls is not None: # 問題83/84のものを流用
    tokenizer = tokenizer_cls
    print(f"問題83/84でロード済みのトークナイザ '{tokenizer.name_or_path}' を使用します。")
elif 'tokenizer_mlm' in locals() and tokenizer_mlm is not None: # 問題81/82のものを流用
    tokenizer = tokenizer_mlm
    print(f"問題81/82でロード済みのトークナイザ '{tokenizer.name_or_path}' を使用します。")
elif 'tokenizer' in locals() and tokenizer is not None: # 問題80のものを流用
    print(f"問題80でロード済みのトークナイザ '{tokenizer.name_or_path}' を使用します。")
else:
    print("エラー: BERTトークナイザがロードされていません。問題80を先に実行してください。")
    tokenizer_name = "bert-base-uncased" # フォールバックとして定義
    try:
        tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
        print(f"フォールバックとしてトークナイザ '{tokenizer_name}' を新規にロードしました。")
    except Exception as e:
        print(f"フォールバックのトークナイザロード中にエラー: {e}")
        raise NameError("BERT Tokenizer is not available.")


# --- SST-2データセットのパス設定 (問題60, 71と同様) ---
# 問題60でSST-2データを展開した親ディレクトリへのパスを想定
base_data_dir_ch9 = '../data/SST-2_data' 
train_file_path_ch9 = os.path.join(base_data_dir_ch9, "SST-2/train.tsv")
dev_file_path_ch9 = os.path.join(base_data_dir_ch9, "SST-2/dev.tsv")

# データセットが存在するか確認 (問題60でダウンロード・展開済みのはず)
if not (os.path.exists(train_file_path_ch9) and os.path.exists(dev_file_path_ch9)):
    print(f"エラー: {train_file_path_ch9} または {dev_file_path_ch9} が見つかりません。")
    print("問題60を先に実行して、SST-2データセットをダウンロード・展開してください。")
    # 処理を中断
    raise FileNotFoundError("SST-2 dataset files not found.")

# --- ここから問題85の処理 ---
MAX_LENGTH = 128 # BERTに入力する最大トークン長 (適宜調整)

def process_data_for_bert(file_path, tokenizer_bert, max_len, dataset_name="データセット"):
    """SST-2データを読み込み、BERT用にトークン化・整形する関数"""
    try:
        df = pd.read_csv(file_path, sep='\t')
        print(f"\n--- {dataset_name} ({os.path.basename(file_path)}) ---")
        print(f"元の事例数: {len(df)}")
    except FileNotFoundError:
        print(f"エラー: ファイル '{file_path}' が見つかりません。")
        return None
    except Exception as e:
        print(f"ファイル '{file_path}' の読み込み中にエラー: {e}")
        return None

    # BERTトークナイザはテキストのリストを直接扱える
    texts = df['sentence'].tolist()
    labels = df['label'].tolist()

    # トークン化 (paddingとtruncationを同時に行う)
    # tokenizer() は辞書を返す。キー: 'input_ids', 'token_type_ids', 'attention_mask'
    print(f"{dataset_name}のテキストをトークン化しています (max_length={max_len})...")
    encodings = tokenizer(texts, padding=True, truncation=True, max_length=max_len, return_tensors="pt")
    
    # PyTorchのDatasetで扱いやすいように、各要素を辞書とするリストに変換することもできるし、
    # この encodings と labels を使って直接Datasetオブジェクトを作ることもできる。
    # ここでは、問題文の指示「全てのテキストはトークン列に変換せよ」に従い、
    # encodings['input_ids'] を主に見る。
    # labels もテンソルに変換しておく。
    
    dataset_for_bert = {
        'input_ids': encodings['input_ids'],
        'attention_mask': encodings['attention_mask'],
        'labels': torch.tensor(labels) # ラベルをテンソルに
    }
    # Token Type IDs はSST-2のような単一文入力タスクでは通常不要なので、含めなくても良い場合が多い
    if 'token_type_ids' in encodings:
         dataset_for_bert['token_type_ids'] = encodings['token_type_ids']

    print(f"{dataset_name}の処理完了。")
    print(f"  input_ids の形状: {dataset_for_bert['input_ids'].shape}") # (事例数, max_length)
    print(f"  attention_mask の形状: {dataset_for_bert['attention_mask'].shape}")
    print(f"  labels の形状: {dataset_for_bert['labels'].shape}")
    
    return dataset_for_bert


# 訓練データの処理
train_bert_dataset = process_data_for_bert(train_file_path_ch9, tokenizer, MAX_LENGTH, "訓練データ")

# 開発（検証）データの処理
dev_bert_dataset = process_data_for_bert(dev_file_path_ch9, tokenizer, MAX_LENGTH, "開発データ")

# 最初の数件のinput_idsと対応するテキスト、ラベルを表示して確認 (任意)
if train_bert_dataset:
    print("\n訓練データの最初の事例（トークンIDとデコード結果）:")
    idx_to_show = 0
    sample_input_ids = train_bert_dataset['input_ids'][idx_to_show]
    sample_label = train_bert_dataset['labels'][idx_to_show]
    
    # 元のテキストを取得する部分をf-stringの外に出す
    # df_train_temp はこのブロック内でのみ使用する一時的なDataFrame
    try:
        df_train_temp = pd.read_csv(train_file_path_ch9, sep='\t')
        original_text_from_df = df_train_temp.iloc[idx_to_show]['sentence']
    except Exception as e:
        original_text_from_df = f"(元のテキストの読み込みに失敗: {e})"

    print(f"  Input IDs: {sample_input_ids.tolist()}")
    print(f"  Decoded Text: {tokenizer.decode(sample_input_ids, skip_special_tokens=True)}")
    # ★★★ 修正箇所 ★★★
    print(f"  Original Text (from df): {original_text_from_df}") 
    print(f"  Label: {sample_label.item()}")

# これで train_bert_dataset と dev_bert_dataset に
# BERTモデルでのファインチューニングに適した形式のデータが格納されました。
# これらは後の問題で使用します。

問題83/84でロード済みのトークナイザ 'bert-base-uncased' を使用します。

--- 訓練データ (train.tsv) ---
元の事例数: 67349
訓練データのテキストをトークン化しています (max_length=128)...
訓練データの処理完了。
  input_ids の形状: torch.Size([67349, 66])
  attention_mask の形状: torch.Size([67349, 66])
  labels の形状: torch.Size([67349])

--- 開発データ (dev.tsv) ---
元の事例数: 872
開発データのテキストをトークン化しています (max_length=128)...
開発データの処理完了。
  input_ids の形状: torch.Size([872, 55])
  attention_mask の形状: torch.Size([872, 55])
  labels の形状: torch.Size([872])

訓練データの最初の事例（トークンIDとデコード結果）:
  Input IDs: [101, 5342, 2047, 3595, 8496, 2013, 1996, 18643, 3197, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
  Decoded Text: hide new secretions from the parental units
  Original Text (from df): hide new secretions from the parental units 
  Label: 0


## 86. ミニバッチの作成

85で読み込んだ訓練データの一部（例えば冒頭の4事例）に対して、パディングなどの処理を行い、トークン列の長さを揃えてミニバッチを構成せよ。

In [7]:
import torch
from transformers import AutoTokenizer # tokenizerがロード済みであることを期待

# --- 前提となる変数 (問題85から) ---
# train_bert_dataset: 問題85で作成した、トークン化・パディング済みの訓練データ全体を格納した辞書
#                     {'input_ids': tensor, 'attention_mask': tensor, 'labels': tensor}
# tokenizer: 問題80 (または83, 85) でロードした AutoTokenizer のインスタンス
# MAX_LENGTH: 問題85で使用した最大シーケンス長

# 前提変数が存在するか確認
if 'train_bert_dataset' not in locals() or not train_bert_dataset or \
   'tokenizer' not in locals() or tokenizer is None or \
   'MAX_LENGTH' not in locals():
    print("エラー: 必要な変数 ('train_bert_dataset', 'tokenizer', 'MAX_LENGTH') が定義されていません。")
    print("問題85を先に実行して、これらの変数を準備してください。")
    raise NameError("Required variables from Problem 85 are not defined.")

# --- ここから問題86の処理 ---

num_samples_to_show = 4 # 問題文の指示「冒頭の4事例」

if len(train_bert_dataset['input_ids']) < num_samples_to_show:
    print(f"警告: 訓練データには{num_samples_to_show}件未満の事例しかありません。表示件数を調整します。")
    num_samples_to_show = len(train_bert_dataset['input_ids'])

if num_samples_to_show > 0:
    print(f"訓練データの最初の{num_samples_to_show}事例から構成されるミニバッチ（の一部）を表示します。")
    print(f"(最大シーケンス長 MAX_LENGTH = {MAX_LENGTH} にパディング/切り詰めされています)")

    # 冒頭4事例を抽出
    sample_input_ids = train_bert_dataset['input_ids'][:num_samples_to_show]
    sample_attention_mask = train_bert_dataset['attention_mask'][:num_samples_to_show]
    sample_labels = train_bert_dataset['labels'][:num_samples_to_show]

    print("\nInput IDs (最初の4事例):")
    print(sample_input_ids)
    print(f"形状: {sample_input_ids.shape}") # 期待: (4, MAX_LENGTH)

    print("\nAttention Mask (最初の4事例):")
    print(sample_attention_mask)
    print(f"形状: {sample_attention_mask.shape}") # 期待: (4, MAX_LENGTH)
    
    print("\nLabels (最初の4事例):")
    print(sample_labels)
    print(f"形状: {sample_labels.shape}") # 期待: (4) または (4,1)など

    print("\n各事例のデコード結果 (パディングと特殊トークン含む):")
    for i in range(num_samples_to_show):
        ids = sample_input_ids[i]
        # パディング部分を除いた実際のトークンを見るためにattention_maskを利用
        actual_tokens_ids = ids[sample_attention_mask[i] == 1] 
        
        print(f"事例 {i+1}:")
        print(f"  Input IDs (raw): {ids.tolist()}")
        print(f"  Decoded (with special, incl. padding if not skipped by decode): {tokenizer.decode(ids)}")
        print(f"  Decoded (actual tokens, skip special): {tokenizer.decode(actual_tokens_ids, skip_special_tokens=True)}")
        print(f"  Label: {sample_labels[i].item()}")
else:
    print("表示する訓練データがありません。")

訓練データの最初の4事例から構成されるミニバッチ（の一部）を表示します。
(最大シーケンス長 MAX_LENGTH = 128 にパディング/切り詰めされています)

Input IDs (最初の4事例):
tensor([[  101,  5342,  2047,  3595,  8496,  2013,  1996, 18643,  3197,   102,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0],
        [  101,  3397,  2053, 15966,  1010,  2069,  4450,  2098, 18201,  2015,
           102,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,    

## 87. ファインチューニング

訓練セットを用い、事前学習済みモデルを極性分析タスク向けにファインチューニングせよ。検証セット上でファインチューニングされたモデルの正解率を計測せよ。

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader # DataLoaderも一緒にインポートしておくと良い
from torch.utils.data import DataLoader, TensorDataset # TensorDataset を使う場合
from transformers import AutoTokenizer, AutoModelForSequenceClassification, get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score
import time
import os
import pandas as pd # データ構造の確認や、万が一の再読み込み用

# --- 前提となる変数・データ (問題80, 85から) ---
# tokenizer: 問題80または85でロードした AutoTokenizer のインスタンス
# train_bert_dataset: 問題85で作成した訓練データ {'input_ids': tensor, 'attention_mask': tensor, 'labels': tensor}
# dev_bert_dataset: 問題85で作成した検証データ {'input_ids': tensor, 'attention_mask': tensor, 'labels': tensor}
# MAX_LENGTH: 問題85で使用した最大シーケンス長

# --- 変数・クラスが現在のセッションに存在するか確認 ---
required_vars_p87 = ['tokenizer', 'train_bert_dataset', 'dev_bert_dataset', 'MAX_LENGTH']
for var_name in required_vars_p87:
    if var_name not in locals() or locals()[var_name] is None:
        print(f"エラー: 前提となる変数 '{var_name}' が定義されていないかNoneです。")
        print("問題80および85を先に実行してください。")
        raise NameError(f"Variable '{var_name}' is not defined or None.")

# --- ここから問題87の処理 ---

# 1. モデルの準備
model_name_finetune = "bert-base-uncased" # または他の適切なBERTモデル名
try:
    # num_labels=2 で2値分類用のヘッドを持つモデルをロード
    model_ft = AutoModelForSequenceClassification.from_pretrained(model_name_finetune, num_labels=2)
    print(f"事前学習済みモデル '{model_name_finetune}' (分類用) をロードしました。")
except Exception as e:
    print(f"モデル '{model_name_finetune}' のロード中にエラーが発生しました: {e}")
    raise

# 2. データセットとデータローダーの準備
# 問題85で train_bert_dataset と dev_bert_dataset は既に必要なキーを持つ辞書として作成済み
# これらをPyTorchのDatasetオブジェクトに変換する
class BertSST2Dataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings # トークナイザからの出力辞書 {'input_ids', 'attention_mask', ...}
        self.labels = labels     # ラベルのテンソル

    def __getitem__(self, idx):
        item = {key: val[idx].clone().detach() for key, val in self.encodings.items()}
        item['labels'] = self.labels[idx].clone().detach()
        return item

    def __len__(self):
        return len(self.labels)

train_dataset_ft = BertSST2Dataset(
    {'input_ids': train_bert_dataset['input_ids'], 'attention_mask': train_bert_dataset['attention_mask']}, 
    train_bert_dataset['labels']
)
dev_dataset_ft = BertSST2Dataset(
    {'input_ids': dev_bert_dataset['input_ids'], 'attention_mask': dev_bert_dataset['attention_mask']},
    dev_bert_dataset['labels']
)

batch_size_ft = 16 # ファインチューニング時のバッチサイズ (GPUメモリに応じて調整)
train_dataloader_ft = DataLoader(train_dataset_ft, batch_size=batch_size_ft, shuffle=True)
dev_dataloader_ft = DataLoader(dev_dataset_ft, batch_size=batch_size_ft, shuffle=False)


# 3. 学習の準備
# GPUの利用設定
if torch.cuda.is_available():
    device = torch.device("cuda")
    print(f"\nGPU ({torch.cuda.get_device_name(0)}) を使用します。")
else:
    device = torch.device("cpu")
    print("\nGPUが利用できません。CPUを使用します。")

model_ft.to(device) # モデルをGPUへ

# 最適化アルゴリズムと学習率スケジューラ
optimizer_ft = optim.AdamW(model_ft.parameters(), lr=2e-5) # AdamWと小さめの学習率が一般的
num_epochs_ft = 3 # ファインチューニングのエポック数 (通常は少なめ)
total_steps = len(train_dataloader_ft) * num_epochs_ft
scheduler = get_linear_schedule_with_warmup(optimizer_ft, 
                                            num_warmup_steps=0, # ウォームアップステップ数 (0も可)
                                            num_training_steps=total_steps)
# 損失関数はモデル内部で計算されることが多い (CrossEntropyLoss相当)
# もしモデルがロジットのみを返し、ラベルが [0, 1] の整数なら以下のように自分で定義も可能
# criterion_ft = nn.CrossEntropyLoss()


print(f"\nファインチューニング (バッチサイズ={batch_size_ft}, エポック数={num_epochs_ft}) を開始します...")
start_time_ft = time.time()

# 4. ファインチューニング（学習ループ）
for epoch in range(num_epochs_ft):
    print(f"\n--- Epoch {epoch+1}/{num_epochs_ft} ---")
    model_ft.train() # モデルを学習モードに
    total_train_loss = 0
    
    for batch_idx, batch in enumerate(train_dataloader_ft):
        optimizer_ft.zero_grad()
        
        # データをGPUへ
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        
        # モデルのフォワードパス
        # AutoModelForSequenceClassification は通常、損失も一緒に出力する
        outputs = model_ft(input_ids, attention_mask=attention_mask, labels=labels)
        
        loss = outputs.loss # モデル出力から損失を取得
        total_train_loss += loss.item()
        
        loss.backward()
        optimizer_ft.step()
        scheduler.step() # 学習率を更新
        
        if (batch_idx + 1) % (len(train_dataloader_ft) // 10) == 0 or (batch_idx + 1) == len(train_dataloader_ft): # 約10%ごとと最後に表示
            print(f"  Batch [{batch_idx+1}/{len(train_dataloader_ft)}], Avg Train Loss so far: {total_train_loss / (batch_idx+1):.4f}")

    avg_train_loss = total_train_loss / len(train_dataloader_ft)
    print(f"Epoch [{epoch+1}/{num_epochs_ft}] 完了, 平均訓練損失: {avg_train_loss:.4f}")

    # エポックごとに検証データで評価
    model_ft.eval() # モデルを評価モードに
    total_eval_accuracy = 0
    total_eval_loss = 0
    
    with torch.no_grad():
        for batch in dev_dataloader_ft:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            
            outputs = model_ft(input_ids, attention_mask=attention_mask, labels=labels)
            
            loss = outputs.loss
            total_eval_loss += loss.item()
            
            logits = outputs.logits
            predictions = torch.argmax(logits, dim=-1)
            total_eval_accuracy += accuracy_score(labels.cpu().numpy(), predictions.cpu().numpy()) * labels.size(0) # バッチ内の正解数を加算

    avg_val_accuracy = total_eval_accuracy / len(dev_dataset_ft)
    avg_val_loss = total_eval_loss / len(dev_dataloader_ft)
    print(f"  Epoch [{epoch+1}/{num_epochs_ft}], 検証データ: 平均損失={avg_val_loss:.4f}, 正解率={avg_val_accuracy:.4f}")

end_time_ft = time.time()
print(f"\nファインチューニングが完了しました。所要時間: {end_time_ft - start_time_ft:.2f} 秒")

# 5. 最終評価 (学習完了後のモデルで再度検証セットの正解率を計測)
print("\n--- 最終評価 (開発セット、ファインチューニング後) ---")
model_ft.eval()
final_predictions = []
final_true_labels = []
with torch.no_grad():
    for batch in dev_dataloader_ft:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        
        outputs = model_ft(input_ids, attention_mask=attention_mask) # ラベルは渡さない
        logits = outputs.logits
        predictions = torch.argmax(logits, dim=-1)
        
        final_predictions.extend(predictions.cpu().numpy())
        final_true_labels.extend(labels.cpu().numpy())

final_accuracy = accuracy_score(final_true_labels, final_predictions)
print(f"開発セットにおける最終正解率 (ファインチューニング後): {final_accuracy:.4f}")
print(f"  正解した事例数: {int(final_accuracy * len(final_true_labels))}")
print(f"  総事例数: {len(final_true_labels)}")

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


事前学習済みモデル 'bert-base-uncased' (分類用) をロードしました。

GPUが利用できません。CPUを使用します。

ファインチューニング (バッチサイズ=16, エポック数=3) を開始します...

--- Epoch 1/3 ---


## 88. 極性分析

問題87でファインチューニングされたモデルを用いて、以下の文の極性を予測せよ。

- "The movie was full of incomprehensibilities."
- "The movie was full of fun."
- "The movie was full of excitement."
- "The movie was full of crap."
- "The movie was full of rubbish."


In [None]:
import torch
import torch.nn.functional as F # ソフトマックス用 (任意)
from transformers import AutoTokenizer, AutoModelForSequenceClassification # 再ロードする場合

# --- 前提となる変数 (問題87から) ---
# model_ft: 問題87でファインチューニング済みのモデル
# tokenizer: 問題80または85でロードしたトークナイザ
# device: 問題87で使用したデバイス ('cuda' または 'cpu')
# MAX_LENGTH: 問題85で使用した最大シーケンス長 (トークナイズ時に使用)


# --- 変数・クラスが現在のセッションに存在するか確認 ---
if 'model_ft' not in locals() or model_ft is None:
    print("エラー: ファインチューニング済みモデル 'model_ft' が定義されていません。")
    print("問題87を先に実行して、モデルを準備（学習またはロード）してください。")
    raise NameError("Fine-tuned model 'model_ft' is not defined.")

if 'tokenizer' not in locals() or tokenizer is None:
    print("エラー: トークナイザ 'tokenizer' が定義されていません。")
    print("問題80または85を先に実行して、トークナイザを準備してください。")
    raise NameError("Tokenizer is not defined.")

if 'device' not in locals() or device is None:
    print("エラー: デバイス 'device' が定義されていません。問題87のデバイス設定を確認してください。")
    # フォールバックとしてCPUを設定する例 (GPUが推奨)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"フォールバックデバイスを '{device}' に設定しました。")

if 'MAX_LENGTH' not in locals() or MAX_LENGTH is None:
    print("エラー: 'MAX_LENGTH' が定義されていません。問題85のMAX_LENGTH設定を確認してください。")
    MAX_LENGTH = 128 # フォールバック値
    print(f"フォールバックとして MAX_LENGTH を {MAX_LENGTH} に設定しました。")

# --- ここから問題88の処理 ---

# 2. 予測対象の文リスト
sentences_to_predict = [
    "The movie was full of incomprehensibilities.",
    "The movie was full of fun.",
    "The movie was full of excitement.",
    "The movie was full of crap.",
    "The movie was full of rubbish."
]

# ラベルのマッピング (0: ネガティブ, 1: ポジティブ)
label_map = {0: "ネガティブ", 1: "ポジティブ"}

print("\n指定された文の極性分析を行います...")

# 3. 各文に対する極性予測
model_ft.eval() # モデルを評価モードに
with torch.no_grad(): # 勾配計算を無効に
    for i, sentence in enumerate(sentences_to_predict):
        # テキストをトークン化
        inputs = tokenizer(sentence, return_tensors="pt", 
                           padding='max_length', # バッチ処理ではないが、一貫性のためmax_lengthにパディング
                           truncation=True, 
                           max_length=MAX_LENGTH)
        
        # データをモデルと同じデバイスに送る
        input_ids = inputs['input_ids'].to(device)
        attention_mask = inputs['attention_mask'].to(device)
        
        # モデルで予測
        outputs = model_ft(input_ids, attention_mask=attention_mask)
        logits = outputs.logits # (バッチサイズ, num_labels) -> (1, 2) の形状
        
        # 予測ラベルを取得 (最もスコアが高いクラスのインデックス)
        predicted_class_id = torch.argmax(logits, dim=-1).item()
        predicted_label_name = label_map.get(predicted_class_id, "不明なラベル")
        
        # (任意) 各クラスの確率を取得
        probabilities = F.softmax(logits, dim=-1)[0] # バッチサイズ1なので[0]で最初の要素を取得
        prob_negative = probabilities[0].item() # ラベル0がネガティブと仮定
        prob_positive = probabilities[1].item() # ラベル1がポジティブと仮定
        # 注意: model_ft.config.id2label や label2id で正確なマッピングを確認した方が良い場合もある
        
        print(f"\n文 {i+1}: \"{sentence}\"")
        print(f"  予測極性: {predicted_label_name} (ID: {predicted_class_id})")
        print(f"  確率 (ネガティブ): {prob_negative:.4f}")
        print(f"  確率 (ポジティブ): {prob_positive:.4f}")

## 89. アーキテクチャの変更

問題87とは異なるアーキテクチャ（例えば[CLS]トークンを用いるか、各トークンの最大値プーリングを用いるなど）の分類モデルを設計し、事前学習済みモデルを極性分析タスク向けにファインチューニングせよ。検証セット上でファインチューニングされたモデルの正解率を計測せよ。

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoModel, get_linear_schedule_with_warmup # AutoModelを使用
from sklearn.metrics import accuracy_score
import time
import os
import pandas as pd

# --- 前提となる変数・クラス・関数の定義 (問題87と同様) ---
# tokenizer, train_bert_dataset, dev_bert_dataset, MAX_LENGTH
# BertSST2Dataset クラス

# --- 変数・クラスが現在のセッションに存在するか確認 ---
required_vars_p89 = ['tokenizer', 'train_bert_dataset', 'dev_bert_dataset', 'MAX_LENGTH']
for var_name in required_vars_p89:
    if var_name not in locals() or locals()[var_name] is None:
        print(f"エラー: 前提となる変数 '{var_name}' が定義されていません。")
        raise NameError(f"Variable '{var_name}' is not defined or None.")

if 'BertSST2Dataset' not in locals():
        print(f"エラー: 前提となるクラス 'BertSST2Dataset' が定義されていません。")
        raise NameError(f"Class 'BertSST2Dataset' is not defined.")

# --- ここから問題89の処理 ---

# 1. 新しいアーキテクチャのモデルクラス定義 (平均プーリング)
class BertMeanPoolingClassifier(nn.Module):
    def __init__(self, model_name, num_labels, dropout_rate=0.1):
        super(BertMeanPoolingClassifier, self).__init__()
        # 事前学習済みBERTモデルをロード (分類ヘッドなし)
        self.bert = AutoModel.from_pretrained(model_name)
        # BERTの隠れ層サイズを取得 (例: bert-base-uncased なら 768)
        self.bert_hidden_size = self.bert.config.hidden_size 
        
        self.dropout = nn.Dropout(dropout_rate)
        # 分類用の線形層
        self.classifier = nn.Linear(self.bert_hidden_size, num_labels)

    def forward(self, input_ids, attention_mask):
        # BERTモデルから最終層の隠れ状態を取得
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_state = outputs.last_hidden_state # (batch_size, seq_len, hidden_size)
        
        # 平均プーリング (attention_mask を利用してパディングを無視)
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
        sum_embeddings = torch.sum(last_hidden_state * input_mask_expanded, 1)
        sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9) # ゼロ除算を避ける
        mean_pooled_output = sum_embeddings / sum_mask
        # mean_pooled_output: (batch_size, hidden_size)
        
        # ドロップアウトと分類
        pooled_output_dropout = self.dropout(mean_pooled_output)
        logits = self.classifier(pooled_output_dropout) # (batch_size, num_labels)
        
        return logits

# --- モデルと学習の準備 ---
model_name_p89 = "bert-base-uncased" # 問題87と同じ事前学習モデルを使用
num_labels_p89 = 2 # ポジネガ2値分類

# デバイス設定
if torch.cuda.is_available():
    device = torch.device("cuda")
    print(f"GPU ({torch.cuda.get_device_name(0)}) を使用します。")
else:
    device = torch.device("cpu")
    print("GPUが利用できません。CPUを使用します。")

# モデルのインスタンス化とデバイスへの転送
model_meanpool = BertMeanPoolingClassifier(model_name_p89, num_labels_p89)
model_meanpool.to(device)
print(f"\n平均プーリングを用いたBERT分類モデル '{model_name_p89}' をロードし、デバイスに転送しました。")
print(model_meanpool)


# データローダーの準備 (問題87と同様)
# BertSST2Dataset は {key: val[idx]} を返すので、モデルの入力に合わせて調整が必要な場合がある
# BertMeanPoolingClassifier は input_ids と attention_mask を想定
class BertInputDataset(Dataset): # BertSST2Dataset を少し変更
    def __init__(self, encodings_dict, labels_tensor):
        self.input_ids = encodings_dict['input_ids']
        self.attention_mask = encodings_dict['attention_mask']
        # token_type_ids があればそれも
        self.token_type_ids = encodings_dict.get('token_type_ids', None) # なければNone
        self.labels = labels_tensor

    def __getitem__(self, idx):
        item = {
            'input_ids': self.input_ids[idx].clone().detach(),
            'attention_mask': self.attention_mask[idx].clone().detach()
        }
        if self.token_type_ids is not None:
             item['token_type_ids'] = self.token_type_ids[idx].clone().detach()
        item['labels'] = self.labels[idx].clone().detach()
        return item

    def __len__(self):
        return len(self.labels)

train_dataset_mp = BertInputDataset(
    {'input_ids': train_bert_dataset['input_ids'], 
     'attention_mask': train_bert_dataset['attention_mask'],
     'token_type_ids': train_bert_dataset.get('token_type_ids', None)}, # token_type_idsも渡す
    train_bert_dataset['labels']
)
dev_dataset_mp = BertInputDataset(
    {'input_ids': dev_bert_dataset['input_ids'], 
     'attention_mask': dev_bert_dataset['attention_mask'],
     'token_type_ids': dev_bert_dataset.get('token_type_ids', None)},
    dev_bert_dataset['labels']
)

batch_size_mp = 16
train_dataloader_mp = DataLoader(train_dataset_mp, batch_size=batch_size_mp, shuffle=True)
dev_dataloader_mp = DataLoader(dev_dataset_mp, batch_size=batch_size_mp, shuffle=False)


# 学習パラメータ
learning_rate_mp = 2e-5
num_epochs_mp = 3 # 問題87と同程度のエポック数で比較

# 損失関数 (クラス数が2なのでCrossEntropyLossが一般的)
# モデルの出力ロジットは (batch_size, num_labels) の形状になる
criterion_mp = nn.CrossEntropyLoss() 
optimizer_mp = optim.AdamW(model_meanpool.parameters(), lr=learning_rate_mp)
total_steps_mp = len(train_dataloader_mp) * num_epochs_mp
scheduler_mp = get_linear_schedule_with_warmup(optimizer_mp, num_warmup_steps=0, num_training_steps=total_steps_mp)

print(f"\n平均プーリングモデルのファインチューニング (バッチサイズ={batch_size_mp}, エポック数={num_epochs_mp}) を開始します...")
start_time_mp = time.time()

# --- 学習ループ (問題87とほぼ同じだが、損失計算の仕方が少し異なる可能性) ---
for epoch in range(num_epochs_mp):
    print(f"\n--- Epoch {epoch+1}/{num_epochs_mp} ---")
    model_meanpool.train()
    total_train_loss = 0
    
    for batch_idx, batch in enumerate(train_dataloader_mp):
        optimizer_mp.zero_grad()
        
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device) # ラベルは (batch_size) の形状で、クラスインデックス (0 or 1)
        # token_type_ids は今回のモデルでは使わないが、渡せるようにしておく
        # token_type_ids = batch.get('token_type_ids', None)
        # if token_type_ids is not None:
        #     token_type_ids = token_type_ids.to(device)
        
        # logits = model_meanpool(input_ids=input_ids, attention_mask=attention_mask)
        # AutoModelは **kwargs を受け付けるので、token_type_ids も渡せる
        model_inputs = {'input_ids': input_ids, 'attention_mask': attention_mask}
        # if token_type_ids is not None: # BertMeanPoolingClassifier側で **kwargs を受け取らないので、直接渡せない
        #     model_inputs['token_type_ids'] = token_type_ids
        logits = model_meanpool(**model_inputs) # **model_inputs で展開して渡す

        loss = criterion_mp(logits, labels) # CrossEntropyLossはロジットと整数のラベルを期待
        total_train_loss += loss.item()
        
        loss.backward()
        optimizer_mp.step()
        scheduler_mp.step()
        
        if (batch_idx + 1) % (len(train_dataloader_mp) // 10) == 0 or (batch_idx + 1) == len(train_dataloader_mp):
            print(f"  Batch [{batch_idx+1}/{len(train_dataloader_mp)}], Avg Train Loss so far: {total_train_loss / (batch_idx+1):.4f}")

    avg_train_loss = total_train_loss / len(train_dataloader_mp)
    print(f"Epoch [{epoch+1}/{num_epochs_mp}] 完了, 平均訓練損失: {avg_train_loss:.4f}")

    # エポックごとに検証データで評価
    model_meanpool.eval()
    total_eval_accuracy = 0
    total_eval_loss = 0
    
    with torch.no_grad():
        for batch in dev_dataloader_mp:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            # token_type_ids = batch.get('token_type_ids', None)
            # if token_type_ids is not None:
            #     token_type_ids = token_type_ids.to(device)
            
            model_inputs_dev = {'input_ids': input_ids, 'attention_mask': attention_mask}
            # if token_type_ids is not None:
            #     model_inputs_dev['token_type_ids'] = token_type_ids
            logits = model_meanpool(**model_inputs_dev)
            
            loss = criterion_mp(logits, labels)
            total_eval_loss += loss.item()
            
            predictions = torch.argmax(logits, dim=-1) # (batch_size) の形状
            total_eval_accuracy += accuracy_score(labels.cpu().numpy(), predictions.cpu().numpy()) * labels.size(0)

    avg_val_accuracy = total_eval_accuracy / len(dev_dataset_mp)
    avg_val_loss = total_eval_loss / len(dev_dataloader_mp)
    print(f"  Epoch [{epoch+1}/{num_epochs_mp}], 検証データ: 平均損失={avg_val_loss:.4f}, 正解率={avg_val_accuracy:.4f}")

end_time_mp = time.time()
print(f"\n平均プーリングモデルのファインチューニングが完了しました。所要時間: {end_time_mp - start_time_mp:.2f} 秒")

# 5. 最終評価
print("\n--- 最終評価 (開発セット、平均プーリングモデル) ---")
model_meanpool.eval()
final_predictions_mp = []
final_true_labels_mp = []
with torch.no_grad():
    for batch in dev_dataloader_mp:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        # token_type_ids = batch.get('token_type_ids', None)
        # if token_type_ids is not None:
        #     token_type_ids = token_type_ids.to(device)

        model_inputs_final_eval = {'input_ids': input_ids, 'attention_mask': attention_mask}
        # if token_type_ids is not None:
        #     model_inputs_final_eval['token_type_ids'] = token_type_ids
        logits = model_meanpool(**model_inputs_final_eval)
        
        predictions = torch.argmax(logits, dim=-1)
        
        final_predictions_mp.extend(predictions.cpu().numpy())
        final_true_labels_mp.extend(labels.cpu().numpy())

final_accuracy_mp = accuracy_score(final_true_labels_mp, final_predictions_mp)
print(f"開発セットにおける最終正解率 (平均プーリングモデル): {final_accuracy_mp:.4f}")
print(f"  正解した事例数: {int(final_accuracy_mp * len(final_true_labels_mp))}")
print(f"  総事例数: {len(final_true_labels_mp)}")