<a href="https://colab.research.google.com/github/xinxin-yan/MetSim/blob/main/Comp_Ling_final_assessment_KYCS5_2%20with%20new%20loss.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 0. Environment preparation
安装 transformers, datasets, torch 等依赖库。

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from transformers import RobertaTokenizer, RobertaModel
from torch.optim import AdamW
from sklearn.metrics import f1_score
import numpy as np
import os
!pip install spacy benepar
!python -m spacy download en_core_web_sm

Collecting en-core-web-sm==3.7.1


# 1. Data preprocessing
加载 VUA 数据集，并转换为适用于 RoBERTa 的格式（Tokenization）。

处理标签（隐喻/非隐喻分类任务）。



In [None]:
# Dataset Class
class MetaphorDataset(Dataset):
    """Token-level metaphor detection examples read from a headerless TSV file.

    Column 2 is used as the sentence, column 4 as the ``fgpos`` tag
    (presumably a fine-grained POS tag -- confirm against the data file),
    column 5 as the index of the target word in the whitespace-split
    sentence, and column 6 as the integer label.

    Args:
        file_path: Path of the tab-separated file.
        tokenizer_name: Name passed to ``RobertaTokenizer.from_pretrained``.
        max_len: Length every encoded sequence is padded/truncated to.
    """

    def __init__(self, file_path, tokenizer_name="roberta-base", max_len=128):
        self.data = pd.read_csv(file_path, sep='\t', header=None)
        self.tokenizer = RobertaTokenizer.from_pretrained(tokenizer_name)
        self.max_len = max_len

        self.sentences = self.data[2].tolist()
        self.fgpos = self.data[4].tolist()
        self.target_indices = self.data[5].tolist()
        self.labels = self.data[6].tolist()

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        """Encode one example.

        Returns:
            A dict with the encoded sentence/tag pair
            (``sentence_input_ids``, ``sentence_attention_mask``), the encoded
            isolated target word (``word_input_ids``, ``word_attention_mask``),
            the position of the target word's first sub-token inside the
            sentence encoding (``target_token_index``) and the float
            ``label``.

        Raises:
            IndexError: If the target index is outside the sentence.
        """
        sentence = str(self.sentences[idx])
        fgpos_tag = str(self.fgpos[idx])
        target_index = int(self.target_indices[idx])
        label = int(self.labels[idx])

        words = sentence.split()
        target_word = words[target_index]
        if target_index < 0:
            target_index += len(words)

        # Let the tokenizer insert the special tokens itself by encoding the
        # sentence and the tag as a text pair. Writing literal "<s>"/"</s>"
        # into the text as well would duplicate them and shift every position.
        sent_encoding = self.tokenizer(
            " ".join(words),
            fgpos_tag,
            padding='max_length',
            truncation=True,
            max_length=self.max_len,
            return_tensors='pt',
        )
        word_encoding = self.tokenizer(
            target_word,
            padding='max_length',
            truncation=True,
            max_length=self.max_len,
            return_tensors='pt',
        )

        # The target's first sub-token sits right after the sub-tokens of all
        # preceding words. Counting them (instead of searching for the word's
        # text) stays correct when the word occurs more than once and when its
        # in-sentence tokenization differs from its isolated tokenization.
        preceding_text = " ".join(words[:target_index])
        preceding_tokens = (
            len(self.tokenizer.tokenize(preceding_text)) if preceding_text else 0
        )
        # +1 assumes the tokenizer prepends exactly one special token
        # (RoBERTa's "<s>") -- confirm if another tokenizer is used.
        token_idx = preceding_tokens + 1
        # Best effort for sentences cut off by truncation: keep the index
        # inside the encoded sequence so downstream indexing cannot fail.
        token_idx = min(token_idx, self.max_len - 1)

        return {
            "sentence_input_ids": sent_encoding["input_ids"].squeeze(0),
            "sentence_attention_mask": sent_encoding["attention_mask"].squeeze(0),
            "word_input_ids": word_encoding["input_ids"].squeeze(0),
            "word_attention_mask": word_encoding["attention_mask"].squeeze(0),
            "target_token_index": torch.tensor(token_idx),
            "label": torch.tensor(label, dtype=torch.float)
        }

# 2. Model building
加载 RoBERTa 预训练模型。

修改最后一层以适应二分类任务（隐喻检测）。

In [None]:
# Model using MIP + SPV as in MelBERT
class MelBERT(nn.Module):
    """RoBERTa encoder with two comparison branches and a binary head.

    The MIP branch compares the isolated target word with the target word in
    context; the SPV branch compares the sentence representation with the
    target word in context. ``forward`` returns probabilities (a sigmoid is
    applied), not logits.

    Args:
        hidden_size: Width of the encoder's hidden states.
        classifier_hidden: Output width of each branch layer.
    """

    def __init__(self, hidden_size=768, classifier_hidden=256):
        super().__init__()
        self.encoder = RobertaModel.from_pretrained("roberta-base", add_pooling_layer=False)

        # Each branch consumes two concatenated encoder vectors.
        pair_width = 2 * hidden_size
        self.mip_layer = nn.Linear(pair_width, classifier_hidden)
        self.spv_layer = nn.Linear(pair_width, classifier_hidden)
        self.classifier = nn.Linear(2 * classifier_hidden, 1)
        self.dropout = nn.Dropout(0.1)
        self.sigmoid = nn.Sigmoid()

        for head in (self.mip_layer, self.spv_layer, self.classifier):
            self._init_weights(head)

    def _init_weights(self, module):
        """Xavier-initialise a linear layer's weight and zero its bias."""
        if not isinstance(module, nn.Linear):
            return
        nn.init.xavier_uniform_(module.weight)
        if module.bias is not None:
            module.bias.data.zero_()

    def forward(self, sentence_input_ids, sentence_attention_mask, word_input_ids, word_attention_mask, target_token_index):
        """Return one metaphor probability per batch row.

        ``target_token_index`` holds, for every row, the position in the
        sentence encoding whose hidden state represents the target word.
        """
        sent_hidden = self.encoder(
            input_ids=sentence_input_ids,
            attention_mask=sentence_attention_mask,
        ).last_hidden_state
        word_hidden = self.encoder(
            input_ids=word_input_ids,
            attention_mask=word_attention_mask,
        ).last_hidden_state

        vS = sent_hidden[:, 0, :]  # first position of the sentence encoding
        # Pick row i's hidden state at position target_token_index[i].
        row_ids = torch.arange(len(target_token_index), device=sent_hidden.device)
        vS_t = sent_hidden[row_ids, target_token_index]
        vT = word_hidden[:, 0, :]  # first position of the isolated-word encoding

        mip_features = torch.cat([vT, vS_t], dim=1)
        spv_features = torch.cat([vS, vS_t], dim=1)
        h_mip = self.dropout(torch.relu(self.mip_layer(mip_features)))
        h_spv = self.dropout(torch.relu(self.spv_layer(spv_features)))

        combined = torch.cat([h_mip, h_spv], dim=1)
        logits = self.classifier(combined).squeeze(1)
        return self.sigmoid(logits)


# 3. Model training
设定优化器、损失函数（如交叉熵损失）。

使用 GPU 训练，并调整超参数（学习率、批量大小等）。

In [None]:
# Training Loop
def train(model, dataloader, optimizer, device, class_weight=1.0):
    """Train ``model`` for one pass over ``dataloader``.

    The model is expected to return probabilities (its forward pass ends in a
    sigmoid), so the loss is binary cross-entropy computed on probabilities.
    Feeding those probabilities to ``BCEWithLogitsLoss`` would apply a second
    sigmoid and squash every prediction into a narrow band.

    Args:
        model: Module called with the five encoded inputs of a batch.
        dataloader: Iterable of dict batches containing the model inputs and
            a ``label`` entry.
        optimizer: Optimizer stepping the model's parameters.
        device: Device every batch tensor is moved to.
        class_weight: Loss multiplier for positive (label 1) examples;
            negative examples keep weight 1.

    Returns:
        Mean loss per batch, or ``0.0`` when the dataloader yields no batch.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0

    for batch in dataloader:
        batch = {key: value.to(device) for key, value in batch.items()}

        optimizer.zero_grad()
        probs = model(batch['sentence_input_ids'],
                      batch['sentence_attention_mask'],
                      batch['word_input_ids'],
                      batch['word_attention_mask'],
                      batch['target_token_index'])
        labels = batch['label'].float()

        # Per-example weight: class_weight for label 1, 1.0 for label 0.
        # This mirrors what pos_weight does in the logits-based loss.
        sample_weight = 1.0 + (class_weight - 1.0) * labels
        loss = nn.functional.binary_cross_entropy(probs, labels, weight=sample_weight)

        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        return 0.0
    return total_loss / num_batches


# 4. Model evaluation
在测试集上进行评估，计算 F1-score, accuracy 等指标。

In [None]:
# Evaluation Loop
def evaluate(model, dataloader, device):
    """Score ``model`` on ``dataloader``.

    Model outputs above 0.5 count as positive predictions.

    Returns:
        A tuple ``(f1, preds)``: the F1 score against the batches' ``label``
        entries and the boolean prediction array, in dataloader order.
    """
    model.eval()
    scores = []
    gold = []
    with torch.no_grad():
        for batch in dataloader:
            # Move the batch to the target device in place.
            for key in batch:
                batch[key] = batch[key].to(device)
            probabilities = model(
                batch['sentence_input_ids'],
                batch['sentence_attention_mask'],
                batch['word_input_ids'],
                batch['word_attention_mask'],
                batch['target_token_index'],
            )
            scores.extend(probabilities.cpu().numpy())
            gold.extend(batch['label'].cpu().numpy())
    preds = np.array(scores) > 0.5
    score = f1_score(gold, preds)
    return score, preds

# 5. Main

In [None]:
# Mount Google Drive at /content/drive; the dataset paths used in this
# notebook live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Main Entry
train_file = "/content/drive/MyDrive/Colab Notebooks/cl dataset/train_novelty_labels.tsv"
test_file = "/content/drive/MyDrive/Colab Notebooks/cl dataset/test_novelty_labels.tsv"

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

train_dataset = MetaphorDataset(train_file)
test_dataset = MetaphorDataset(test_file)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
# Left unshuffled so predictions come back in test_dataset row order.
test_loader = DataLoader(test_dataset, batch_size=32)

model = MelBERT().to(device)
optimizer = AdamW(model.parameters(), lr=2e-5)
# NOTE: train() constructs its own loss; this object is not passed to it.
criterion = nn.BCELoss()

for epoch in range(5):
    train_loss = train(model, train_loader, optimizer, device, class_weight=5.0)
    # Keep the predictions instead of discarding them: after the loop,
    # `preds` holds the last epoch's test-set predictions.
    f1, preds = evaluate(model, test_loader, device)
    print(f"Epoch {epoch+1} - Train Loss: {train_loss:.4f}, Test F1: {f1:.4f}")

Epoch 1 - Train Loss: 0.7064, Test F1: 0.0000
Epoch 2 - Train Loss: 0.7060, Test F1: 0.0000
Epoch 3 - Train Loss: 0.7060, Test F1: 0.0000
Epoch 4 - Train Loss: 0.7060, Test F1: 0.0000
Epoch 5 - Train Loss: 0.7060, Test F1: 0.0000


# extraction

In [None]:
import spacy
import benepar

# Load the spaCy pipeline and attach the benepar constituency parser to it.
nlp = spacy.load("en_core_web_sm")
# NOTE(review): confirm that the installed benepar release provides
# `is_loaded`; if it does not, this line fails before the download is tried.
if not benepar.is_loaded("benepar_en3"):
    benepar.download("benepar_en3")
nlp.add_pipe("benepar", config={"model": "benepar_en3"})

def extract_constituent_phrase(sentence, target_index, phrase_type="NP_or_VP"):
    """Return the smallest NP/VP constituent that contains the target word.

    The target is located by its character span in ``sentence`` rather than
    by its text, so a word occurring several times resolves to the requested
    occurrence.

    Args:
        sentence: Sentence text; words are addressed by whitespace splitting.
        target_index: Index of the target word in ``sentence.split()``.
        phrase_type: ``"NP_or_VP"`` accepts either label, ``"NP"`` only noun
            phrases, ``"VP"`` only verb phrases. Any other value matches
            nothing.

    Returns:
        The tokens of the chosen constituent joined by single spaces, or the
        target word itself when no accepted constituent covers it.

    Raises:
        IndexError: If ``target_index`` is outside the sentence.
    """
    words = sentence.split()
    target_word = words[target_index]
    if target_index < 0:
        target_index += len(words)

    accepted_labels = {
        "NP_or_VP": {"NP", "VP"},
        "NP": {"NP"},
        "VP": {"VP"},
    }.get(phrase_type, set())
    if not accepted_labels:
        return target_word  # fallback

    # Character span of the target word inside the original sentence.
    cursor = 0
    target_start = 0
    for position, word in enumerate(words):
        target_start = sentence.index(word, cursor)
        cursor = target_start + len(word)
        if position == target_index:
            break
    target_end = cursor

    doc = nlp(sentence)
    best = None
    for sent in doc.sents:
        # benepar exposes constituents as spaCy spans via `Span._.constituents`
        # and their labels via `Span._.labels`.
        for constituent in sent._.constituents:
            covers_target = (
                constituent.start_char <= target_start
                and target_end <= constituent.end_char
            )
            if not covers_target:
                continue
            if accepted_labels.isdisjoint(constituent._.labels):
                continue
            # Keep the shortest covering constituent seen so far.
            if best is None or len(constituent) < len(best):
                best = constituent

    if best is None:
        return target_word  # fallback
    return " ".join(token.text for token in best)


In [None]:
def guess_phrase_type(pos_tag):
    """Map a POS tag to the phrase type to extract for it.

    Tags starting with "V" give "VP", tags starting with "N" give "NP", and
    everything else gives "NP_or_VP".
    """
    for prefix, phrase in (("V", "VP"), ("N", "NP")):
        if pos_tag.startswith(prefix):
            return phrase
    return "NP_or_VP"


# Print every word the model predicts as a metaphor, with its phrase.
# Predictions are computed in this cell so it does not rely on a `preds`
# variable left behind by another cell; test_loader is not shuffled, so
# preds[i] belongs to row i of test_dataset.
_, preds = evaluate(model, test_loader, device)

# The dataset stores its tag column as `fgpos`; it has no `pos` attribute.
for i, (sentence, raw_index, pos) in enumerate(
    zip(test_dataset.sentences, test_dataset.target_indices, test_dataset.fgpos)
):
    if not preds[i]:
        continue  # only rows predicted as metaphor are printed
    sentence = str(sentence)
    phrase = extract_constituent_phrase(sentence, int(raw_index), guess_phrase_type(str(pos)))
    print(f"📍 [{phrase}]  ← from sentence: {sentence}")


# save

In [None]:
import csv

output_path = "melbert_metaphor_predictions.tsv"
# Fixed column order, so the header can be written even when there are no rows.
fieldnames = [
    "sentence",
    "target_word",
    "target_index",
    "POS",
    "true_label",
    "predicted_label",
    "extracted_phrase",
]

# Predictions are computed in this cell so it does not rely on a `preds`
# variable left behind by another cell; test_loader is not shuffled, so
# preds[i] belongs to row i of test_dataset.
_, preds = evaluate(model, test_loader, device)

rows = []
# The dataset stores its tag column as `fgpos`; it has no `pos` attribute.
for i, (sentence, raw_index, pos, raw_label) in enumerate(
    zip(
        test_dataset.sentences,
        test_dataset.target_indices,
        test_dataset.fgpos,
        test_dataset.labels,
    )
):
    sentence = str(sentence)
    idx = int(raw_index)
    pos = str(pos)

    rows.append({
        "sentence": sentence,
        "target_word": sentence.split()[idx],
        "target_index": idx,
        "POS": pos,
        "true_label": int(raw_label),
        "predicted_label": int(preds[i]),
        "extracted_phrase": extract_constituent_phrase(sentence, idx, guess_phrase_type(pos)),
    })

# Write the rows as TSV.
with open(output_path, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames, delimiter="\t")
    writer.writeheader()
    writer.writerows(rows)

print(f"✅ 保存成功！预测结果已写入：{output_path}")