In [None]:
import json

with open('dataset/valid.json', 'r',encoding='utf-8') as f:
    valid_data = json.load(f)

texts = [item['text'] for item in valid_data]
labels = [0 if item['label'] == 'clean' else 1 for item in valid_data]


# 获取hidden states

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from tqdm import tqdm
# import torch

# 设置随机种子
torch.manual_seed(42)
torch.cuda.manual_seed_all(42)

# 现在你的代码中的随机操作应该是可重复的，无论它们是否在 GPU 上运行
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

tokenizer = AutoTokenizer.from_pretrained("detected_model")
model = AutoModelForCausalLM.from_pretrained("detected_model", torch_dtype=torch.bfloat16).to(device)

def extract_features(texts, tokenizer, model):
    model.eval()
    features = []
    with torch.no_grad():
        for text in tqdm(texts):
            inputs = tokenizer(text, return_tensors='pt', truncation=True, padding=True).to(device)
            outputs = model(**inputs, output_hidden_states=True)
            last_hidden_state = outputs.hidden_states[-1]  # last hidden state
            feature = last_hidden_state.mean(dim=1)  # mean pooling
            features.append(feature.float().cpu().numpy())
    return torch.tensor(features)

features = extract_features(texts, tokenizer, model)


In [None]:
# print(features.shape)
# for feature in features:
#     print(feature.shape)

In [None]:
import torch
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
# print(features.shape)
# print(labels.shape)
# 构造 TensorDataset
dataset = TensorDataset(features, torch.tensor(labels))
from torch.utils.data import random_split

# 假设你想要80%的数据作为训练集，20%的数据作为测试集
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

# 使用 random_split 来拆分数据集
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])
batch_size = 16
# 现在你可以创建 DataLoader
train_dataloader = DataLoader(train_dataset, sampler=RandomSampler(train_dataset), batch_size=batch_size)
test_dataloader = DataLoader(test_dataset, sampler=SequentialSampler(test_dataset), batch_size=batch_size)
# print(train_dataloader.dataset.tensors[0].shape)
# print(test_dataloader.dataset.tensors[0].shape)
print(len(train_dataset))
print(len(test_dataset))

In [None]:
# # 获取训练数据的一个批次
# train_batch = next(iter(train_dataloader))
# print("Train batch shape:")
# for item in train_batch:
#     print(item.shape)

# # 获取测试数据的一个批次
# test_batch = next(iter(test_dataloader))
# print("Test batch shape:")
# for item in test_batch:
#     print(item.shape)

# 训练

In [None]:
from transformers import BertTokenizer, BertForSequenceClassification, AdamW
import torch.nn as nn
from sklearn.metrics import classification_report
from sklearn.metrics import roc_auc_score
# 加载 BERT 模型和 tokenizer
tokenizer2 = BertTokenizer.from_pretrained('gaunernst/bert-medium-uncased')
model2 = BertForSequenceClassification.from_pretrained('gaunernst/bert-medium-uncased', num_labels=2).to(device)

# 定义优化器
optimizer = AdamW(model2.parameters(), lr=2e-4)
criterion = nn.CrossEntropyLoss()
# 训练模型
model2.train()
epochs = 8

for epoch in range(epochs):
    print(f"Epoch {epoch + 1}")
    for batch in train_dataloader:
        b_features, b_labels = batch
        b_features = b_features.to(device)
        b_labels = b_labels.to(device)

        outputs = model2(inputs_embeds=b_features, labels=b_labels)
        logits = outputs.logits
        # probabilities = torch.nn.functional.softmax(logits, dim=-1)
        # print(probabilities.shape)
        # print(probabilities[:, 1])
        # print(b_labels.shape)
        # 收集所有概率和标签
        # threshold = 0.5
        # binary_predictions = (probabilities[:, 1] >= probabilities[:,0]).float()
        loss = criterion(logits, b_labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # 开始测试
    model2.eval()  # 设置模型为评估模式
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for batch in test_dataloader:
            b_features, b_labels = batch
            b_features = b_features.to(device)
            b_labels = b_labels.to(device)

            outputs = model2(inputs_embeds=b_features)
            _, preds = torch.max(outputs.logits, 1)
            all_preds.extend(preds.tolist())
            all_labels.extend(b_labels.tolist())

    print(classification_report(all_labels, all_preds))  # 打印分类报告
    # 评估模型
# model2.eval()

    all_probabilities = []
    all_labels = []

    with torch.no_grad():
        for batch in tqdm(test_dataloader):
            b_features, b_labels = batch
            b_features = b_features.to(device)
            b_labels = b_labels.to(device)

            logits = model2(inputs_embeds=b_features, labels=b_labels).logits
            probabilities = torch.nn.functional.softmax(logits, dim=-1)
            # print(probabilities.shape)
            # print(probabilities[:, 1])
            # print(b_labels)
            # 收集所有概率和标签
            # threshold = 0.5
            binary_predictions = (probabilities[:, 1] >= probabilities[:,0]).long()
            # all_predictions.append()
            all_probabilities.append(binary_predictions)
            all_labels.append(b_labels )
    # 检查标签分布
    # print("Label distribution:", {label: all_labels.count(label) for label in set(all_labels)})

    # 将所有概率和标签拼接成单个张量
    all_probabilities = torch.cat(all_probabilities)
    all_labels = torch.cat(all_labels)
    print(all_probabilities.shape)
    print(all_labels.shape)
    # 计算 AUC
    try:
        auc_score = roc_auc_score(all_probabilities.cpu(), all_labels.cpu())
        print(f"AUC: {auc_score.item()}")
    except:
        continue
    model2.train()  # 设置模型回到训练模式
    print(f"Epoch {epoch + 1}, Loss: {loss.item()}")


# 测试

In [None]:
import torchmetrics
from torchmetrics.classification import BinaryAUROC

# 初始化 AUC 计算器
auc = BinaryAUROC()

# 评估模型
model2.eval()

all_probabilities = []
all_labels = []

with torch.no_grad():
    for batch in tqdm(test_dataloader):
        b_features, b_labels = batch
        b_features = b_features.to(device)
        b_labels = b_labels.to(device)

        logits = model2(inputs_embeds=b_features, labels=b_labels).logits
        probabilities = torch.nn.functional.softmax(logits, dim=-1)
        # print(probabilities.shape)
        # print(probabilities[:, 1])
        # print(b_labels.shape)
        # 收集所有概率和标签
        # threshold = 0.5
        binary_predictions = (probabilities[:, 1] >= probabilities[:,0]).long()
        # all_predictions.append()
        all_probabilities.append(binary_predictions)
        all_labels.append(b_labels )

# 将所有概率和标签拼接成单个张量
all_probabilities = torch.cat(all_probabilities)
all_labels = torch.cat(all_labels)
print(all_probabilities[-11])
print(all_labels[-11])
# 计算 AUC
from sklearn.metrics import roc_auc_score
auc_score = roc_auc_score(all_probabilities.cpu(), all_labels.cpu())
print(f"AUC: {auc_score.item()}")


# 绘图

In [None]:
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt

# 假设 all_probabilities 和 all_labels 是长度相同的列表或者张量
probabilities = all_probabilities.cpu().numpy()
labels = all_labels.cpu().numpy()

fpr, tpr, _ = roc_curve(labels, probabilities)
roc_auc = auc(fpr, tpr)

plt.figure()
lw = 2
plt.plot(fpr, tpr, color='darkorange', lw=lw, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")
plt.show()