# 7章
- 以下で実行するコードには確率的な処理が含まれていることがあり、コードの出力結果と本書に記載されている出力例が異なることがあります。

In [None]:
# 7-1
!mkdir chap7
%cd ./chap7

In [None]:
# 7-0. 必要ライブラリのインストール
!pip install fugashi unidic-lite

In [None]:
# 7-2
!pip install transformers fugashi ipadic pytorch-lightning

In [None]:
# 7-3
import random
import glob
import json
from tqdm import tqdm

import torch
from torch.utils.data import DataLoader
from transformers import BertJapaneseTokenizer, BertModel
import pytorch_lightning as pl

# Name of the pretrained Japanese BERT checkpoint; passed to the
# from_pretrained() calls for both the tokenizer and the model.
MODEL_NAME = 'tohoku-nlp/bert-base-japanese-whole-word-masking'

In [None]:
# 7-4
class BertForSequenceClassificationMultiLabel(torch.nn.Module):
    """BERT encoder with a linear head for multi-label classification.

    The final-layer hidden states are averaged over the positions selected by
    the attention mask, and the averaged vector is mapped by a single linear
    layer to one logit per label.
    """

    def __init__(self, model_name, num_labels):
        """Load the pretrained encoder and create the classification head.

        Args:
            model_name: Name or path handed to ``BertModel.from_pretrained``.
            num_labels: Number of output logits (one per label).
        """
        super().__init__()
        # Load the pretrained BertModel.
        self.bert = BertModel.from_pretrained(model_name)
        # Linear map from the encoder's hidden size to one score per label.
        self.linear = torch.nn.Linear(
            self.bert.config.hidden_size, num_labels
        )

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        token_type_ids=None,
        labels=None
    ):
        """Compute per-label logits and, when labels are given, the loss.

        Args:
            input_ids: Token ids forwarded to the encoder.
            attention_mask: Mask with non-zero entries for real tokens. When
                omitted, every position is treated as a real token.
            token_type_ids: Segment ids forwarded to the encoder.
            labels: Optional multi-hot targets; converted to float and scored
                with ``BCEWithLogitsLoss``.

        Returns:
            An object exposing ``logits`` and, only when ``labels`` is not
            None, ``loss`` as attributes.
        """
        # Run the encoder and take the output of its final layer.
        bert_output = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids)
        last_hidden_state = bert_output.last_hidden_state

        # The signature advertises attention_mask=None, so honour it: without
        # a mask, average over every position instead of failing on None.
        if attention_mask is None:
            attention_mask = torch.ones(
                last_hidden_state.shape[:2],
                dtype=torch.long,
                device=last_hidden_state.device,
            )

        # Average the hidden states over non-[PAD] tokens only. The count is
        # clamped to 1 so an all-zero mask row yields zeros instead of NaN.
        token_mask = attention_mask.unsqueeze(-1)
        token_counts = attention_mask.sum(1, keepdim=True).clamp(min=1)
        averaged_hidden_state = \
            (last_hidden_state * token_mask).sum(1) / token_counts

        # Linear head: one score per label.
        scores = self.linear(averaged_hidden_state)

        output = {'logits': scores}

        # Only compute the loss when targets were supplied.
        if labels is not None:
            loss = torch.nn.BCEWithLogitsLoss()(scores, labels.float())
            output['loss'] = loss

        # Expose the dict entries as attributes (output.logits, output.loss).
        output = type('bert_output', (object,), output)

        return output

In [None]:
!pip install unidic-lite


In [None]:
# 7-5
# Load the tokenizer and build a two-label classifier from the pretrained
# checkpoint, then move the model to the GPU (.cuda() requires a CUDA device).
tokenizer = BertJapaneseTokenizer.from_pretrained(MODEL_NAME)
bert_scml = BertForSequenceClassificationMultiLabel(
    MODEL_NAME, num_labels=2
)
bert_scml = bert_scml.cuda()

In [None]:
# 7-6
# Two example sentences with their multi-hot labels (one row per sentence).
text_list = [
    '今日の仕事はうまくいったが、体調があまり良くない。',
    '昨日は楽しかった。'
]

labels_list = [
    [1, 1],
    [0, 1]
]

# Encode the sentences, padding to the longest one in the batch.
encoding = tokenizer(
    text_list,
    padding='longest',
    return_tensors='pt'
)
# Move every encoded tensor and the labels to the GPU.
encoding = { k: v.cuda() for k, v in encoding.items() }
labels = torch.tensor(labels_list).cuda()

# Feed the batch to BERT and read the classification scores.
with torch.no_grad():
    output = bert_scml(**encoding)
scores = output.logits

# A label is selected when its score is positive.
labels_predicted = ( scores > 0 ).int()

# Accuracy: a sentence counts as correct only if every label matches.
num_correct = ( labels_predicted == labels ).all(-1).sum().item()
accuracy = num_correct/labels.size(0)

In [None]:
# 7-7
# Encode the sentences again, this time passing labels so the model also
# returns a loss.
encoding = tokenizer(
    text_list,
    padding='longest',
    return_tensors='pt'
)
encoding['labels'] = torch.tensor(labels_list) # include labels in the model input
encoding = { k: v.cuda() for k, v in encoding.items() }

output = bert_scml(**encoding)
loss = output.loss # loss value

In [None]:
# 7-8
# データのダウンロード
!wget https://s3-ap-northeast-1.amazonaws.com/dev.tech-sketch.jp/chakki/public/chABSA-dataset.zip
# データの解凍
!unzip chABSA-dataset.zip

In [None]:
# 7-9
# Load one annotation file and show its first sentence entry. The file is
# opened in a context manager so the handle is closed right after parsing,
# and the encoding is pinned so the result does not depend on the locale.
with open('chABSA-dataset/e00030_ann.json', encoding='utf-8') as f:
    data = json.load(f)
print( data['sentences'][0] )

In [None]:
# 7-10
# Index of each polarity inside the multi-hot label vector.
category_id = {'negative':0, 'neutral':1 , 'positive':2}

# Build one {'text', 'labels'} sample per sentence across all annotation files.
dataset = []
for file in glob.glob('chABSA-dataset/*.json'):
    # Close each file as soon as it is parsed; the original left one open
    # handle per annotation file for the garbage collector to clean up.
    with open(file, encoding='utf-8') as f:
        data = json.load(f)
    # Take the sentence text and derive its multi-hot label from the opinions.
    for sentence in data['sentences']:
        text = sentence['sentence']
        labels = [0,0,0]
        for opinion in sentence['opinions']:
            # Mark the polarity as present; repeated polarities collapse to 1.
            labels[category_id[opinion['polarity']]] = 1
        sample = {'text': text, 'labels': labels}
        dataset.append(sample)

In [None]:
# 7-11
# Show the first converted sample (a dict with 'text' and 'labels').
print(dataset[0])

In [None]:
from pprint import pprint

# Inspect the raw annotation of the first sentence in one file.
file = 'chABSA-dataset/e00030_ann.json'
# Context manager closes the handle after parsing; encoding is pinned so the
# result does not depend on the platform default.
with open(file, encoding='utf-8') as f:
    data = json.load(f)
first_sentence = data['sentences'][0]

print("文：")
print(first_sentence['sentence'])

print("\nopinions：")
pprint(first_sentence['opinions'])


In [None]:
# 7-12
# Load the tokenizer.
tokenizer = BertJapaneseTokenizer.from_pretrained(MODEL_NAME)

# Convert every sample into the tensors the model consumes.
max_length = 128
dataset_for_loader = []
for sample in dataset:
    text = sample['text']
    labels = sample['labels']
    # Pad or truncate every sentence to exactly max_length tokens.
    encoding = tokenizer(
        text,
        max_length=max_length,
        padding='max_length',
        truncation=True
    )
    encoding['labels'] = labels
    encoding = { k: torch.tensor(v) for k, v in encoding.items() }
    # Added after the tensor conversion because a string cannot be turned
    # into a tensor; the raw sentence is kept so samples can be filtered by
    # their text later.
    encoding['text'] = text
    dataset_for_loader.append(encoding)

# Split the data 60% / 20% / remainder after an unseeded shuffle, so the
# split differs between runs.
random.shuffle(dataset_for_loader)
n = len(dataset_for_loader)
n_train = int(0.6*n)
n_val = int(0.2*n)
dataset_train = dataset_for_loader[:n_train] # training data
dataset_val = dataset_for_loader[n_train:n_train+n_val] # validation data
dataset_test = dataset_for_loader[n_train+n_val:] # test data

# Build the data loaders; only the training loader shuffles.
dataloader_train = DataLoader(
    dataset_train, batch_size=32, shuffle=True
)
dataloader_val = DataLoader(dataset_val, batch_size=256)
dataloader_test = DataLoader(dataset_test, batch_size=256)

# テストデータの中から否定文・非否定文を分離
def contains_negation(text):
    """Return True if ``text`` contains any negation/downturn cue.

    This is a plain substring test: no tokenisation is performed, so a cue
    matches wherever its characters occur in the sentence.
    """
    cue_phrases = (
        "ない", "なかった", "ません", "ぬ", "ず", "ん",
        "じゃない", "ではない", "わけではない", "ことはない",
        "というわけではない", "かと思いきや",
        "補えず", "届かず", "至らず", "不透明", "不確実", "不十分",
        "できず", "失う", "減少", "低下", "悪化", "回復しない", "停滞", "弱含み",
    )
    for cue in cue_phrases:
        if cue in text:
            return True
    return False


# 7-12.5: negation detection based on morphological analysis
import fugashi
# Module-level tagger shared by contains_negation_morph.
tagger = fugashi.Tagger()

# Decide whether a sentence is negated, based on negation auxiliaries.
def contains_negation_morph(text):
    """Return True if ``text`` contains a negation auxiliary.

    Sentences containing tax/disclosure boilerplate phrases are never treated
    as negated. Otherwise the sentence is tokenised with the module-level
    ``tagger`` and a token counts as negation when its surface form is one of
    the listed forms and its feature data contains "助動詞".

    Args:
        text: Raw sentence to inspect.

    Returns:
        True when a negation auxiliary is found, otherwise False.
    """
    # Boilerplate unrelated to sentiment. This test depends only on the whole
    # sentence, so it runs once up front instead of once per matching token;
    # the result is the same and the tokeniser is skipped for such sentences.
    boilerplate_markers = ("消費税", "含んでおりません", "記載", "税抜", "税込")
    if any(marker in text for marker in boilerplate_markers):
        return False

    # NOTE(review): "なかった" is presumably split into several tokens by the
    # tagger and so may never match as a single surface form -- confirm.
    negation_surfaces = ("ない", "なかった", "ぬ", "ず", "ん")
    for word in tagger(text):
        if word.surface in negation_surfaces and "助動詞" in word.feature:
            return True
    return False


# Split the test set into negated / non-negated sentences in one pass.
# NOTE(review): the comment previously here said the predicate had been
# switched to contains_negation_morph, but the code calls the substring-based
# contains_negation -- confirm which one is intended.
test_data_negative = []
test_data_non_negative = []
for s in dataset_test:
    if contains_negation(s['text']):
        test_data_negative.append(s)
    else:
        test_data_non_negative.append(s)


print(f'否定文数: {len(test_data_negative)}')
print(f'非否定文数: {len(test_data_non_negative)}')

# One loader per subset, used for the per-subset accuracy evaluation.
dataloader_neg = DataLoader(test_data_negative, batch_size=256)
dataloader_non_neg = DataLoader(test_data_non_negative, batch_size=256)


In [None]:
# Print up to 20 sentences from each subset for a quick visual check.
for heading, subset in (
    ("\n==== 否定文のサンプル ====", test_data_negative),
    ("\n==== 非否定文のサンプル ====", test_data_non_negative),
):
    print(heading)
    for i, sample in enumerate(subset[:20]):
        print(f"{i+1}. {sample['text']}")

In [None]:
# 7-13
class BertForSequenceClassificationMultiLabel_pl(pl.LightningModule):
    """Lightning wrapper around BertForSequenceClassificationMultiLabel.

    Logs ``train_loss`` / ``val_loss`` during fitting and an exact-match
    ``accuracy`` during testing, and optimises with Adam.
    """

    def __init__(self, model_name, num_labels, lr):
        """Store the hyperparameters and build the wrapped classifier."""
        super().__init__()
        # Makes the constructor arguments available as self.hparams.
        self.save_hyperparameters()
        self.bert_scml = BertForSequenceClassificationMultiLabel(
            model_name, num_labels=num_labels
        )

    def training_step(self, batch, batch_idx):
        """Return the training loss for one batch and log it."""
        # The raw sentences are not a model input; drop them if present.
        batch.pop('text', None)
        train_loss = self.bert_scml(**batch).loss
        self.log('train_loss', train_loss)
        return train_loss

    def validation_step(self, batch, batch_idx):
        """Log the validation loss for one batch."""
        batch.pop('text', None)
        self.log('val_loss', self.bert_scml(**batch).loss)

    def test_step(self, batch, batch_idx):
        """Log the fraction of samples whose labels are all predicted correctly."""
        batch.pop('text', None)
        # Remove the targets so the model returns logits only.
        gold = batch.pop('labels')
        logits = self.bert_scml(**batch).logits
        predicted = (logits > 0).int()
        exact_matches = (predicted == gold).all(-1).sum().item()
        self.log('accuracy', exact_matches / logits.size(0))

    def configure_optimizers(self):
        """Create an Adam optimiser using the stored learning rate."""
        learning_rate = self.hparams.lr
        return torch.optim.Adam(self.parameters(), lr=learning_rate)

# Keep only the single checkpoint with the lowest val_loss (weights only),
# written under model/.
checkpoint = pl.callbacks.ModelCheckpoint(
    monitor='val_loss',
    mode='min',
    save_top_k=1,
    save_weights_only=True,
    dirpath='model/',
)

# Train on one GPU for at most five epochs.
trainer = pl.Trainer(
    accelerator='gpu',  # <- modified here
    devices=1,          # <- modified here
    max_epochs=5,
    callbacks=[checkpoint]
)

# Three labels, matching the three-element label vectors built in cell 7-10.
model = BertForSequenceClassificationMultiLabel_pl(
    MODEL_NAME,
    num_labels=3,
    lr=1e-5
)
trainer.fit(model, dataloader_train, dataloader_val)
# NOTE(review): no model is passed to test(); presumably Lightning evaluates
# the best checkpoint from the fit above -- confirm for the installed version.
test = trainer.test(dataloaders=dataloader_test)
print(f'Accuracy: {test[0]["accuracy"]:.2f}')

In [None]:
# 7-14
# Sentences to classify.
text_list = [
    "今期は売り上げが順調に推移したが、株価は低迷の一途を辿っている。",
    "昨年から黒字が減少した。",
    "今日の飲み会は楽しかった。"
]

# Load the best checkpoint and take the wrapped classifier onto the GPU.
best_model_path = checkpoint.best_model_path
model = BertForSequenceClassificationMultiLabel_pl.load_from_checkpoint(best_model_path)
bert_scml = model.bert_scml.cuda()

# Encode the sentences, padding to the longest one in the batch.
encoding = tokenizer(
    text_list,
    padding = 'longest',
    return_tensors='pt'
)
encoding = { k: v.cuda() for k, v in encoding.items() }

# Feed the batch to BERT and read the classification scores.
with torch.no_grad():
    output = bert_scml(**encoding)
scores = output.logits
# A label is predicted when its score is positive; convert to plain lists.
labels_predicted = ( scores > 0 ).int().cpu().numpy().tolist()

# Show each input with its predicted multi-hot label.
for text, label in zip(text_list, labels_predicted):
    print('--')
    print(f'入力：{text}')
    print(f'出力：{label}')

In [None]:
def evaluate_accuracy(dataloader, label):
    """Print the exact-match accuracy of the global ``bert_scml`` on a loader.

    A sample counts as correct only when every label is predicted correctly.

    Args:
        dataloader: Iterable of batches (dicts) containing 'labels', the model
            inputs and optionally 'text'.
        label: Prefix used in the printed line to name the subset.

    Returns:
        None. The result is printed.
    """
    correct = 0
    total = 0
    for batch in dataloader:
        # The raw sentences are not a model input; drop them if present.
        batch.pop('text', None)
        labels = batch.pop('labels').cuda()
        batch = {k: v.cuda() for k, v in batch.items()}

        with torch.no_grad():
            scores = bert_scml(**batch).logits
        preds = (scores > 0).int()

        correct += (preds == labels).all(-1).sum().item()
        total += labels.size(0)

    # An empty subset (e.g. no sentence matched the filter) used to raise
    # ZeroDivisionError; report 0.00 instead, as evaluate() does.
    accuracy = correct / total if total > 0 else 0.0
    print(f'{label}精度: {accuracy:.2f} （{correct}/{total}）')

# Print the accuracy separately for negated and non-negated sentences.
evaluate_accuracy(dataloader_neg, "否定文")
evaluate_accuracy(dataloader_non_neg, "非否定文")


In [None]:
from collections import Counter

# Count how often each label combination occurs in the full dataset; the
# label lists are converted to tuples so they can be used as Counter keys.
label_counts = Counter(tuple(s['labels']) for s in dataset)
print(label_counts)


In [None]:
# 7-15 Negation keyword check (simplified version)
def is_negative_sentence(text):
    """Return True if ``text`` contains any negation keyword as a substring."""
    negative_keywords = (
        'ない', 'ぬ', 'ん', 'ず', 'ません', 'なかった', 'できない', 'しない',
    )
    for keyword in negative_keywords:
        if keyword in text:
            return True
    return False

# 7-16 Split the test set into negated / non-negated sentences.
# The keyword check runs on the raw sentence stored under 'text'. Decoding
# input_ids instead yields token-joined text with special tokens, so a
# keyword spanning several tokens (e.g. 'なかった') can be split and missed,
# and every sample was decoded twice.
test_data_negative = []
test_data_non_negative = []
for s in dataset_test:
    if is_negative_sentence(s['text']):
        test_data_negative.append(s)
    else:
        test_data_non_negative.append(s)

print(f'否定文数: {len(test_data_negative)}')
print(f'非否定文数: {len(test_data_non_negative)}')


In [None]:
# Evaluation helper
def evaluate(model, dataset):
    """Return the exact-match accuracy of ``model`` on ``dataset``.

    A sample counts as correct only when every label is predicted correctly.

    Args:
        model: Module called with the batch tensors as keyword arguments and
            returning an object with a ``logits`` attribute.
        dataset: Sequence of sample dicts containing 'labels' and the model
            inputs; non-tensor entries such as 'text' are ignored.

    Returns:
        Fraction of fully correct samples, or 0.0 for an empty dataset.
    """
    loader = DataLoader(dataset, batch_size=256)

    # Run on whatever device the model already lives on (the GPU in this
    # notebook) rather than hard-coding .cuda().
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    num_correct = 0
    total = 0
    model.eval()
    with torch.no_grad():
        for batch in loader:
            labels = batch.pop('labels').to(device)
            # Collated batches still carry the raw sentences as a list of
            # strings under 'text'; moving that to a device would fail, so
            # forward only the tensor entries.
            inputs = {
                k: v.to(device) for k, v in batch.items() if torch.is_tensor(v)
            }
            output = model(**inputs)
            preds = (output.logits > 0).int()
            num_correct += (preds == labels).all(-1).sum().item()
            total += labels.size(0)
    return num_correct / total if total > 0 else 0.0

# Reload the model from the best checkpoint (just in case) and move the
# wrapped classifier to the GPU.
model = BertForSequenceClassificationMultiLabel_pl.load_from_checkpoint(best_model_path)
bert_scml = model.bert_scml.cuda()

# Evaluate accuracy on each subset.
acc_neg = evaluate(bert_scml, test_data_negative)
acc_non_neg = evaluate(bert_scml, test_data_non_negative)

print(f"否定文の精度: {acc_neg:.2f}")
print(f"非否定文の精度: {acc_non_neg:.2f}")
