In [1]:
import torch
import torch.nn as nn
import math
import copy

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
import json
import pandas as pd
from sklearn.model_selection import train_test_split
from gensim.models.doc2vec import Doc2Vec, TaggedDocument
import nltk
from nltk.tokenize import word_tokenize
import re
import torch.optim as optim
import numpy as np
nltk.download('punkt')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\rishe\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [4]:
with open('data/train-claims.json', 'r') as file:
    data = json.load(file)
with open('data/evidence.json', 'r') as file:
    evidence_data = json.load(file)

# New Section

In [5]:
output_data = []
for claim_id, claim_info in data.items():
    claim_text = claim_info['claim_text']
    claim_label = claim_info['claim_label']
    if claim_label == "DISPUTED":
        continue  # 忽略 disputed 的声明
    label_mapping = {
        "SUPPORTS": "support",
        "REFUTES": "refute",
        "NOT_ENOUGH_INFO": "irrelevant"
    }
    if claim_label in label_mapping:
        for evidence_id in claim_info['evidences']:
            output_data.append({
                "claim_id": claim_id,
                "claim_text": claim_text,
                "evidence_id": evidence_id,
                "label": label_mapping[claim_label],
                "evidence_text": evidence_data.get(evidence_id, "")
            })

In [6]:
df_output = pd.DataFrame(output_data)
train_df, valid_df = train_test_split(df_output, test_size=0.2, random_state=42)

# 函数来预处理和向量化文本
def preprocess_text(text):
    pattern = re.compile(r'\b[a-zA-Z0-9]+\b')
    tokens = word_tokenize(text.lower())
    filtered_tokens = [token for token in tokens if pattern.match(token)]
    return filtered_tokens

# 准备文档
train_documents = [TaggedDocument(words=preprocess_text(row['claim_text']) + preprocess_text(row['evidence_text']), tags=[str(i)]) for i, row in train_df.iterrows()]
valid_documents = [TaggedDocument(words=preprocess_text(row['claim_text']) + preprocess_text(row['evidence_text']), tags=[str(i)]) for i, row in valid_df.iterrows()]

In [7]:
model_d2v = Doc2Vec(vector_size=128, window=5, min_count=1, workers=4, epochs=50)

# 构建词汇表并训练模型
model_d2v.build_vocab(train_documents)
model_d2v.train(train_documents, total_examples=model_d2v.corpus_count, epochs=model_d2v.epochs)

In [8]:
def get_vectors(documents):
    vectors = np.array([model_d2v.infer_vector(doc.words) for doc in documents])
    return torch.tensor(vectors, dtype=torch.float).view(vectors.shape[0], 1, -1)

In [9]:
train_vectors = get_vectors(train_documents).to(device)
valid_vectors = get_vectors(valid_documents).to(device)

# 获取标签并转换为 tensor
train_labels = torch.tensor(train_df['label'].astype('category').cat.codes.to_numpy(), dtype=torch.long).to(device)
valid_labels = torch.tensor(valid_df['label'].astype('category').cat.codes.to_numpy(), dtype=torch.long).to(device)

In [10]:
class PositionalEncoding(nn.Module):
    def __init__ (self, d_model, dropout = 0.1, max_len = 5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p = dropout)
        self.encoding = torch.zeros(max_len, d_model, device=device)
        position = torch.arange(0, max_len, device=device).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2, device=device).float() * -(math.log(10000.0) / d_model))
        self.encoding[:,0::2] = torch.sin(position * div_term)
        self.encoding[:,1::2] = torch.cos(position * div_term)
        self.encoding = self.encoding.unsqueeze(0)

    def forward(self, x):
        return x + self.encoding[:, :x.size(1)].dropout(self.dropout)


class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1):
        super(TransformerEncoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.activation = nn.ReLU()

    def forward(self, src, src_mask=None, src_key_padding_mask=None):
        src2 = self.norm1(src)
        src2 = self.self_attn(src, src, src, attn_mask=src_mask, key_padding_mask=src_key_padding_mask)[0]
        src = src + self.dropout1(src2)
        src2 = self.norm2(src)
        src2 = self.linear1(src2)
        src2 = self.activation(src2)
        src2 = self.dropout(src2)
        src2 = self.linear2(src2)
        src = src + self.dropout2(src2)
        return src



class TransformerEncoder(nn.Module):
    def __init__(self, encoder_layer, num_layers):
        super(TransformerEncoder, self).__init__()
        self.layers = nn.ModuleList([copy.deepcopy(encoder_layer) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(encoder_layer.self_attn.embed_dim)

    def forward(self, src, mask = None, src_key_padding_mask = None):
        output = src
        for layer in self.layers:
            output = layer(output, src_mask = mask, src_key_padding_mask = src_key_padding_mask)
        output = self.norm(output)
        return output

class TransformerForClassification(nn.Module):
    def __init__(self, input_dim, d_model, nhead, num_encoder_layers, dim_feedforward, num_classes):
        super(TransformerForClassification, self).__init__()
        self.encoder_layer = TransformerEncoderLayer(d_model, nhead, dim_feedforward)
        self.transformer_encoder = TransformerEncoder(self.encoder_layer, num_encoder_layers)
        self.pos_encoder = PositionalEncoding(d_model)
        self.input_embedding = nn.Linear(input_dim, d_model)
        self.output_layer = nn.Linear(d_model, num_classes)

    def forward(self, src, src_mask=None, src_key_padding_mask=None):
        src = self.input_embedding(src)
        src = self.pos_encoder(src)
        src = self.transformer_encoder(src, src_mask= src_mask, src_key_padding_mask=src_key_padding_mask)
        src = src.mean(dim=1)
        output = self.output_layer(src)
        return output


In [11]:
model = TransformerForClassification(input_dim = 128, d_model = 2984, nhead = 8, num_encoder_layers = 6, dim_feedforward = 2048, num_classes = 10).to(device)

In [None]:
optimizer = optim.Adam(model.parameters(), lr = 0.001)
criterion = nn.CrossEntropyLoss()

In [None]:
def train(model, train_vectors, train_labels, optimizer, criterion, n_epochs):
    model.train()
    batch_size = 32
    for epoch in range(n_epochs):
        optimizer.zero_grad()
        output = model(train_vectors)
        loss = criterion(output, train_labels)
        loss.backward()
        optimizer.step()
        if epoch % 10 == 0:  # 每10个epoch输出一次信息
            print(f'Epoch {epoch + 1}, Loss: {loss.item()}')

def evaluate(model, valid_vectors, valid_labels):
    model.eval()
    with torch.no_grad():
        outputs = model(valid_vectors)
        _, predicted = torch.max(outputs, 1)
        correct = (predicted == valid_labels).sum().item()
        accuracy = correct / valid_labels.size(0)
    return accuracy

In [None]:
n_epochs = 500
train_losses = []
valid_losses = []

n_epochs = 500
train(model, train_vectors, train_labels, optimizer, criterion, n_epochs=n_epochs)

Epoch 1, Loss: 2.221970796585083
Epoch 11, Loss: 6.217411994934082
Epoch 21, Loss: 1.4169812202453613
Epoch 31, Loss: 2.474271059036255


KeyboardInterrupt: 

In [None]:
validation_accuracy = evaluate(model, valid_vectors, valid_labels)
print(f'Validation Accuracy: {validation_accuracy:.2f}')

Validation Accuracy: 0.53
