In [1]:
# 0.1 切换运行时为 GPU/TPU (Runtime -> Change runtime type)

# 0.2 安装必要的库（使用可用版本：tensorflow 2.18.1， transformers 4.38.1）
!pip install tensorflow==2.18.1 transformers==4.38.1 scikit-learn nltk
!pip install jieba # 专门用于中文分词
!pip install textattack

# 0.3 引入库并设置环境
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, Bidirectional, LSTM, Dense, Dropout
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.metrics import accuracy_score
# 关键变更：使用 TFAutoModelForSequenceClassification 提高兼容性
from transformers import TFAutoModelForSequenceClassification, AutoTokenizer
import jieba
import os
import re

# 设置随机种子
SEED = 42
tf.random.set_seed(SEED)
np.random.seed(SEED)
print(f"TensorFlow Version: {tf.__version__}")

TensorFlow Version: 2.18.1


In [2]:
# --- 1.1 Colab环境配置: 文件上传与路径设置 ---
# 指定数据路径为 Colab 工作区
DATA_PATH = '/content/sample_data/'

# 定义实际的输入和标签列名
CONTENT_COL = 'specific_dialogue_content'
LABEL_COL = 'is_fraud'

# --- 1.2 数据集加载与分配 ---
try:
    train_df = pd.read_csv(DATA_PATH + 'train_results.csv', encoding='utf-8')
    test_df = pd.read_csv(DATA_PATH + 'test_results.csv', encoding='utf-8')
except FileNotFoundError:
    print("错误：无法找到数据集文件。请检查文件是否已上传到 '/content/sample_data/' 目录。")
    raise

# 清理列名（防止隐藏空格或BOM字符）
def clean_columns(df):
    df.columns = df.columns.str.strip()
    df.columns = df.columns.str.replace('\ufeff', '')
    return df

train_df = clean_columns(train_df)
test_df = clean_columns(test_df)

# 最终检查
if CONTENT_COL not in train_df.columns or LABEL_COL not in train_df.columns:
    print("\n致命错误：无法在数据集中找到 'specific_dialogue_content' 或 'is_fraud' 列。请检查您的CSV文件头！")
    print(f"当前列名列表: {train_df.columns.tolist()}")
    raise KeyError(f"Required columns {CONTENT_COL} or {LABEL_COL} not found.")

# ⬇️⬇️⬇️ 关键修复：标签映射与缺失值处理 ⬇️⬇️⬇️
try:
    # 1. 转换为字符串并统一转为大写
    label_mapping = {'TRUE': 1, 'FALSE': 0}

    # 应用映射。注意：NaN值映射后仍为NaN（属于 float 类型）
    train_df[LABEL_COL] = train_df[LABEL_COL].astype(str).str.upper().map(label_mapping)
    test_df[LABEL_COL] = test_df[LABEL_COL].astype(str).str.upper().map(label_mapping)

    # 2. 删除标签列中带有 NaN 的行
    initial_train_size = len(train_df)
    initial_test_size = len(test_df)

    train_df.dropna(subset=[LABEL_COL], inplace=True)
    test_df.dropna(subset=[LABEL_COL], inplace=True)

    # 3. 转换为整数类型（此时已无 NaN，可安全转换）
    y_train = train_df[LABEL_COL].astype(int)
    y_test = test_df[LABEL_COL].astype(int)

    # 打印处理结果
    print(f"训练集：初始行数 {initial_train_size}，缺失标签删除后剩余 {len(train_df)} 行。")
    print(f"测试集：初始行数 {initial_test_size}，缺失标签删除后剩余 {len(test_df)} 行。")

except Exception as e:
    print(f"\n致命错误：标签转换失败。错误信息: {e}")
    raise

# 提取数据
X_train_raw = train_df[CONTENT_COL].astype(str)
X_test_raw = test_df[CONTENT_COL].astype(str)
y_train = np.array(y_train)
y_test = np.array(y_test)

print(f"训练集大小: {len(X_train_raw)}, 测试集大小: {len(X_test_raw)}")
print(f"输入内容列: {CONTENT_COL}, 标签列: {LABEL_COL}")

# --- 1.3 传统模型(BiLSTM)的中文分词与向量化 ---
def chinese_segment(texts):
    return [' '.join(jieba.cut(text)) for text in texts]

X_train_seg = chinese_segment(X_train_raw)
X_test_seg = chinese_segment(X_test_raw)

MAX_WORDS = 20000
MAX_LEN = 80
EMBEDDING_DIM = 100

tokenizer = Tokenizer(num_words=MAX_WORDS, oov_token="<unk>")
tokenizer.fit_on_texts(X_train_seg)

X_train_seq = tokenizer.texts_to_sequences(X_train_seg)
X_test_seq = tokenizer.texts_to_sequences(X_test_seg)

X_train_pad = pad_sequences(X_train_seq, maxlen=MAX_LEN, padding='post', truncating='post')
X_test_pad = pad_sequences(X_test_seq, maxlen=MAX_LEN, padding='post', truncating='post')

# --- 1.4 大模型(TFBert)的中文Tokenizer ---
BERT_MODEL_NAME = 'bert-base-chinese'
bert_tokenizer = AutoTokenizer.from_pretrained(BERT_MODEL_NAME)

def encode_data(texts, tokenizer, max_len=MAX_LEN):
    return tokenizer(
        list(texts),
        max_length=max_len,
        truncation=True,
        padding='max_length',
        return_tensors='tf'
    )

Building prefix dict from the default dictionary ...
DEBUG:jieba:Building prefix dict from the default dictionary ...
Loading model from cache /tmp/jieba.cache
DEBUG:jieba:Loading model from cache /tmp/jieba.cache


训练集：初始行数 14363，缺失标签删除后剩余 13635 行。
测试集：初始行数 2677，缺失标签删除后剩余 2548 行。
训练集大小: 13635, 测试集大小: 2548
输入内容列: specific_dialogue_content, 标签列: is_fraud


Loading model cost 2.205 seconds.
DEBUG:jieba:Loading model cost 2.205 seconds.
Prefix dict has been built successfully.
DEBUG:jieba:Prefix dict has been built successfully.
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [3]:
# --- 2.1 BiLSTM 模型 ---
def build_bilstm_model(max_words, max_len, embedding_dim):
    input_layer = Input(shape=(max_len,))
    x = Embedding(max_words, embedding_dim, input_length=max_len)(input_layer)
    x = Bidirectional(LSTM(128, return_sequences=False))(x)
    x = Dropout(0.5)(x)
    output_layer = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=input_layer, outputs=output_layer)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

bilstm_model = build_bilstm_model(MAX_WORDS, MAX_LEN, EMBEDDING_DIM)

print("\n--- 训练 BiLSTM ---")
bilstm_model.fit(
    X_train_pad, y_train,
    epochs=10,
    batch_size=64,
    validation_split=0.1,
    verbose=1
)

_, bilstm_acc = bilstm_model.evaluate(X_test_pad, y_test, verbose=0)
print(f"BiLSTM 原始测试集准确率 (基线): {bilstm_acc:.4f}")




--- 训练 BiLSTM ---
Epoch 1/10
[1m192/192[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 17ms/step - accuracy: 0.8735 - loss: 0.2257 - val_accuracy: 0.9912 - val_loss: 0.0669
Epoch 2/10
[1m192/192[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 12ms/step - accuracy: 0.9738 - loss: 0.0963 - val_accuracy: 0.9751 - val_loss: 0.0857
Epoch 3/10
[1m192/192[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 13ms/step - accuracy: 0.9608 - loss: 0.1235 - val_accuracy: 0.7221 - val_loss: 0.4007
Epoch 4/10
[1m192/192[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 12ms/step - accuracy: 0.9797 - loss: 0.0594 - val_accuracy: 0.9978 - val_loss: 0.0077
Epoch 5/10
[1m192/192[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 15ms/step - accuracy: 0.9990 - loss: 0.0050 - val_accuracy: 0.9985 - val_loss: 0.0025
Epoch 6/10
[1m192/192[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 13ms/step - accuracy: 0.9959 - loss: 0.0124 - val_accuracy: 0.9993 - val_loss: 0.0035
Epo

In [5]:
# --- 2.2 TFBert 模型 ---
train_encodings = encode_data(X_train_raw, bert_tokenizer)
test_encodings = encode_data(X_test_raw, bert_tokenizer)

def build_bert_model(model_name):
    model = TFAutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=1,
        from_pt=True
    )

    # 关键修复 1: 使用字符串 'adam' 绕过对象识别错误
    optimizer_name = 'adam'
    loss = tf.keras.losses.BinaryCrossentropy(from_logits=True)

    model.compile(optimizer=optimizer_name, loss=loss, metrics=['accuracy'])

    # 关键修复 2: 编译后，手动设置正确的学习率 (3e-5)
    model.optimizer.learning_rate.assign(3e-5)

    return model

bert_model = build_bert_model(BERT_MODEL_NAME)

print("\n--- 训练 TFBert ---")
train_dataset = tf.data.Dataset.from_tensor_slices((dict(train_encodings), y_train)).shuffle(100).batch(16)

bert_model.fit(
    train_dataset,
    epochs=3,
    verbose=1
)

test_dataset = tf.data.Dataset.from_tensor_slices((dict(test_encodings), y_test)).batch(16)
_, bert_acc = bert_model.evaluate(test_dataset, verbose=0)
print(f"TFBert 原始测试集准确率 (基线): {bert_acc:.4f}")

All PyTorch model weights were used when initializing TFBertForSequenceClassification.

Some weights or buffers of the TF 2.0 model TFBertForSequenceClassification were not initialized from the PyTorch model and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.



--- 训练 TFBert ---
Epoch 1/3
Epoch 2/3
Epoch 3/3
TFBert 原始测试集准确率 (基线): 0.9984


In [11]:
# --- 3.1 核心替换函数（中文同义词代理，作为 PSO 的搜索空间） ---
def get_chinese_synonym_candidates_proxy(word):
    # 保持与之前的 Sememe 替换的词汇粒度一致
    substitution_map = {
        '银行': ['金融机构', '分行', '机构'],
        '验证码': ['校验码', '代码', '数字'],
        '转账': ['汇款', '划款', '打钱'],
        '密码': ['口令', '密钥'],
        '账户': ['卡号', '名下'],
        '贷款': ['借款', '借钱'],
        '您好': ['你好', '喂']
    }

    if len(word) <= 1 or word in ['的', '是', '了', '吗', '请问', '和', '也']:
        return []

    # 返回原始词和所有同义词，形成一个搜索空间
    candidates = substitution_map.get(word, [])
    if candidates:
        return [word] + candidates
    return []

# --- 3.2 对抗攻击：简化的离散 PSO (Particle Swarm Optimization) ---

# PSO 参数
POPULATION_SIZE = 10  # 粒子数量
MAX_ITER = 5          # 最大迭代次数
C1 = 0.5              # 认知项权重 (pBest)
C2 = 0.5              # 社会项权重 (gBest)
W = 0.8               # 惯性权重

def get_word_to_index_map(words, candidates_fn):
    """创建词语在候选空间中的索引映射"""
    word_to_index = {}
    index_to_word = {}

    for i, word in enumerate(words):
        candidates = candidates_fn(word)
        if candidates:
            word_to_index[i] = {cand: idx for idx, cand in enumerate(candidates)}
            index_to_word[i] = candidates
    return word_to_index, index_to_word

# 修复后的 get_text_from_position 函数需要依赖 perform_attack_pso 内部的局部变量
def get_prediction_score(text, target_model, tokenizer_fn):
    """计算模型对文本的预测分数和标签 (兼容 Keras 和 TFBert)"""
    model_input = tokenizer_fn([text])
    raw_output = target_model.predict(model_input, verbose=0)

    # 1. 处理模型输出：统一为 logits/probability tensor
    if isinstance(raw_output, tuple) and hasattr(raw_output, 'logits'):
        # Hugging Face Model (TFBert): Output is a tuple with logits
        output_tensor = raw_output.logits
        is_hf_bert = True
    else:
        # Keras Functional Model (BiLSTM): Output is the probability tensor
        # TFBert also falls here if predict returns raw tensor
        output_tensor = raw_output
        # 我们只能通过检查模型类型来确定它是否是 BiLSTM
        is_hf_bert = hasattr(target_model, 'config') # 简单检查是否存在 config 属性来判断是否为 HF 模型

    # 2. 确定概率值 (P(Fraud=1))
    # 关键：获取模型名称来判断是否需要 sigmoid
    if is_hf_bert:
        # TFBert with num_labels=1 始终输出原始 logits，需要应用 sigmoid
        # 注意：这里假设 TFBert model.predict() 成功返回了 logits
        try:
            prob_original_class = tf.nn.sigmoid(output_tensor[0][0]).numpy()
        except:
            # 如果 output_tensor 已经是 numpy array，直接取值
            prob_original_class = tf.nn.sigmoid(output_tensor[0][0]).numpy() if tf.is_tensor(output_tensor) else output_tensor[0][0]
    else:
        # BiLSTM Keras model: 输出层是 sigmoid，所以 output_tensor 已经是概率
        prob_original_class = output_tensor[0][0]

    # 3. 计算攻击分数和标签
    # 攻击分数 (score): 目标是欺骗模型，即最大化错误的类别 0 (非欺诈) 的概率。
    # score = 1.0 - P(Fraud=1)
    score = 1.0 - prob_original_class
    original_label = 1 if prob_original_class > 0.5 else 0

    return original_label, score

def perform_attack_pso(original_text, target_model, tokenizer_fn, get_candidates_fn, max_modifications=3):
    """
    使用简化的离散 PSO 算法执行攻击
    """
    # 内部定义 get_text_from_position 需要访问局部变量
    global words, modifiable_indices, index_to_word

    words = list(jieba.cut(original_text))
    original_label, _ = get_prediction_score(original_text, target_model, tokenizer_fn)

    # 1. 初始化粒子和搜索空间
    word_to_index, index_to_word = get_word_to_index_map(words, get_candidates_fn)
    modifiable_indices = list(word_to_index.keys())

    if not modifiable_indices:
        return False, original_text, original_label, original_label, 0

    num_dims = len(modifiable_indices)

    # 在内部定义 get_text_from_position 以访问局部变量 words, modifiable_indices, index_to_word
    def get_text_from_position(position_indices):
        temp_words = words[:]
        for dim, word_idx in enumerate(modifiable_indices):
            cand_list = index_to_word[word_idx]
            cand_index = position_indices[dim]
            # 关键修复：确保索引是整数
            temp_words[word_idx] = cand_list[int(cand_index)]
        return "".join(temp_words)

    # 初始化粒子的 "位置" (X) 和 "速度" (V)
    particle_positions = []  # 粒子位置 (词语索引)
    particle_velocities = [] # 粒子速度 (连续值)

    for _ in range(POPULATION_SIZE):
        pos = []
        vel = []
        for word_idx in modifiable_indices:
            # 随机选择一个初始位置 (词语索引)
            cand_len = len(index_to_word[word_idx])
            pos.append(np.random.randint(0, cand_len))
            # 随机初始化速度
            vel.append(np.random.rand() * 2 - 1)
        particle_positions.append(np.array(pos, dtype=float)) # 保持 float 类型以便后续速度相加
        particle_velocities.append(np.array(vel, dtype=float))

    # 初始化 pBest (个体最优) 和 gBest (全局最优)
    pbest_positions = [pos.copy() for pos in particle_positions]
    pbest_scores = np.full(POPULATION_SIZE, -1e9)

    gbest_position_idx = 0
    gbest_score = -1e9

    # 2. PSO 迭代
    for iter_i in range(MAX_ITER):
        # 评估所有粒子
        for i in range(POPULATION_SIZE):
            current_pos = particle_positions[i]
            # 关键修复：初始化 current_text 为原始文本，确保它始终有值
            current_text = original_text

            # 检查修改次数是否超过限制
            modified_count = 0
            for dim, word_idx in enumerate(modifiable_indices):
                # 假设原始词总是候选列表的第 0 个 (或最相似的那个)
                if int(current_pos[dim]) != 0:
                    modified_count += 1

            if modified_count > max_modifications:
                current_score = -1e9 # 惩罚超过修改限制的粒子
            else:
                current_text = get_text_from_position(current_pos)
                _, current_score = get_prediction_score(current_text, target_model, tokenizer_fn)

            # 更新 pBest
            if current_score > pbest_scores[i]:
                # 修复 DeprecationWarning：确保 current_score 是标量
                pbest_scores[i] = current_score.item() if isinstance(current_score, np.ndarray) else current_score
                pbest_positions[i] = current_pos.copy()

            # 更新 gBest
            if current_score > gbest_score:
                gbest_score = current_score.item() if isinstance(current_score, np.ndarray) else current_score
                gbest_position_idx = i

            # 检查是否攻击成功
            # 只有在未被惩罚时才检查攻击成功
            if modified_count <= max_modifications:
                adv_label, _ = get_prediction_score(current_text, target_model, tokenizer_fn)
                if adv_label != original_label:
                    # 攻击成功，返回当前对抗样本
                    original_words = list(jieba.cut(original_text))
                    modified_words = list(jieba.cut(current_text))
                    num_modified = sum(1 for w1, w2 in zip(original_words, modified_words) if w1 != w2)
                    return True, current_text, original_label, adv_label, num_modified

        # 更新粒子速度和位置
        gbest_pos = pbest_positions[gbest_position_idx]

        for i in range(POPULATION_SIZE):
            r1 = np.random.rand(num_dims)
            r2 = np.random.rand(num_dims)

            # 速度更新公式: V_new = W * V + C1 * R1 * (Pbest - X) + C2 * R2 * (Gbest - X)
            cognitive_component = C1 * r1 * (pbest_positions[i] - particle_positions[i])
            social_component = C2 * r2 * (gbest_pos - particle_positions[i])

            # 更新速度
            new_velocity = W * particle_velocities[i] + cognitive_component + social_component
            particle_velocities[i] = new_velocity

            # 位置更新: X_new = X + V_new (离散化)
            new_position = particle_positions[i] + particle_velocities[i]

            # 确保位置 (词语索引) 有效
            for dim, word_idx in enumerate(modifiable_indices):
                cand_len = len(index_to_word[word_idx])
                # 裁剪位置到 [0, cand_len-1] 范围内并四舍五入
                new_position[dim] = np.clip(np.round(new_position[dim]), 0, cand_len - 1)

            particle_positions[i] = new_position


    # 3. 迭代结束，返回全局最优解 (可能未成功攻击)
    final_pos = pbest_positions[gbest_position_idx]
    final_adv_text = get_text_from_position(final_pos)
    final_adv_label, _ = get_prediction_score(final_adv_text, target_model, tokenizer_fn)

    is_successful = (original_label != final_adv_label)

    original_words = list(jieba.cut(original_text))
    modified_words = list(jieba.cut(final_adv_text))
    num_modified = sum(1 for w1, w2 in zip(original_words, modified_words) if w1 != w2)

    return is_successful, final_adv_text, original_label, final_adv_label, num_modified

# --- 3.3 封装模型输入函数 (不变) ---
def bilstm_tokenizer_fn(texts):
    seg_texts = chinese_segment(texts)
    seq = tokenizer.texts_to_sequences(seg_texts)
    return pad_sequences(seq, maxlen=MAX_LEN, padding='post', truncating='post')

def bert_tokenizer_fn(texts):
    return encode_data(texts, bert_tokenizer)

# --- 4.1 实验一: Synonym+PSO 攻击 BiLSTM/TFBert ---

def run_attack_experiment_pso(model, tokenizer_fn, model_name, X_test, y_test, get_candidates_fn):

    X_test_enc = tokenizer_fn(X_test)
    raw_output = model.predict(X_test_enc, verbose=0)

    # 修复：明确根据模型名称处理输出
    if model_name == "TFBert":
        # 提取 logits
        if hasattr(raw_output, 'logits'):
            logits = raw_output.logits
        else:
            logits = raw_output # 假设是原始张量

        if logits.shape[-1] == 1:
             predictions = (tf.nn.sigmoid(logits).numpy() > 0.5).astype(int).flatten()
        else:
             predictions = np.argmax(logits, axis=1)
    else: # BiLSTM
        predictions = (raw_output > 0.5).astype(int).flatten()

    correctly_classified_indices = np.where(predictions == y_test)[0]

    X_attack = X_test.iloc[correctly_classified_indices]
    y_attack = y_test[correctly_classified_indices]

    print(f"\n--- {model_name} 攻击 (PSO) 开始 ---")
    print(f"原始正确分类样本数 (攻击目标): {len(X_attack)}")

    success_count = 0
    modified_texts = []
    modification_rates = []

    # 由于 PSO 攻击耗时较长，我们只对前 50 个样本进行攻击以加快运行速度
    X_sample = X_attack.head(50)
    y_sample = y_attack[:50]

    for idx, (original_text, original_label) in enumerate(zip(X_sample, y_sample)):
        is_successful, adv_text, _, adv_label, num_modified = perform_attack_pso(
            original_text,
            model,
            tokenizer_fn,
            get_candidates_fn,
            max_modifications=3
        )

        modified_texts.append(adv_text)

        original_len = len(list(jieba.cut(original_text)))
        modification_rates.append(num_modified / original_len if original_len > 0 else 0)

        if is_successful:
            success_count += 1
            if success_count <= 2:
                print(f"\n[成功案例] 原文: {original_text}")
                print(f"[成功案例] 对抗样本: {adv_text} (修改词数: {num_modified})")

    asr = success_count / len(X_sample) if len(X_sample) > 0 else 0
    avg_mod_rate = np.mean(modification_rates) if modification_rates else 0
    print(f"\n模型: {model_name}")
    print(f"攻击成功率 (ASR): {asr:.4f} ({success_count}/{len(X_sample)})")
    print(f"平均改动率: {avg_mod_rate:.4f}")

    # 为了评估，我们将攻击成功的样本和未成功的样本合并回原始的攻击目标集合中
    adv_test_set = pd.DataFrame({'text': modified_texts, 'label': y_sample})
    return asr, avg_mod_rate, adv_test_set

# 运行 BiLSTM 攻击
bilstm_asr, bilstm_mod_rate, adv_bilstm_df = run_attack_experiment_pso(
    bilstm_model, bilstm_tokenizer_fn, "BiLSTM", X_test_raw, y_test, get_chinese_synonym_candidates_proxy
)

# 运行 TFBert 攻击
bert_asr, bert_mod_rate, adv_bert_df = run_attack_experiment_pso(
    bert_model, bert_tokenizer_fn, "TFBert", X_test_raw, y_test, get_chinese_synonym_candidates_proxy
)

# --- 4.2 结果分析 (准确率下降情况) ---
def evaluate_on_adv_data(model, tokenizer_fn, adv_df, model_name, original_acc):
    if adv_df.empty:
        print(f"模型 {model_name} 在改写数据集 (D_adv) 上的准确率：0 (无成功攻击样本)")
        return

    X_adv_enc = tokenizer_fn(adv_df['text'])
    y_true = np.array(adv_df['label'])

    # 兼容 Keras 和 TFBert 的预测结果
    raw_output = model.predict(X_adv_enc, verbose=0)

    # 修复：明确根据模型名称处理输出
    if model_name == "TFBert":
        # 提取 logits
        if hasattr(raw_output, 'logits'):
            logits = raw_output.logits
        else:
            logits = raw_output # 假设是原始张量

        if logits.shape[-1] == 1:
            # TFBert 是 logits，需要 sigmoid 才能判断
            preds = (tf.nn.sigmoid(logits).numpy() > 0.5).astype(int).flatten()
        else:
            preds = np.argmax(logits, axis=1)
    else:
        # Keras BiLSTM (输出已经是概率)
        preds = (raw_output > 0.5).astype(int).flatten()

    adv_acc = accuracy_score(y_true, preds)
    print(f"\n模型 {model_name} 在改写数据集 (D_adv) 上的准确率: {adv_acc:.4f}")

    # 注意：这里的原始准确率 (original_acc) 是针对完整的测试集 X_test_raw 计算的，
    # 而 adv_df 只包含 X_sample 的结果。因此，这个下降比例仅供参考。
    print(f"（原始准确率 {original_acc:.4f} 下降到 {adv_acc:.4f}，基于前 {len(y_true)} 个攻击目标）")

evaluate_on_adv_data(bilstm_model, bilstm_tokenizer_fn, adv_bilstm_df, "BiLSTM", bilstm_acc)
evaluate_on_adv_data(bert_model, bert_tokenizer_fn, adv_bert_df, "TFBert", bert_acc)


--- BiLSTM 攻击 (PSO) 开始 ---
原始正确分类样本数 (攻击目标): 2539

模型: BiLSTM
攻击成功率 (ASR): 0.0000 (0/50)
平均改动率: 0.0504

--- TFBert 攻击 (PSO) 开始 ---
原始正确分类样本数 (攻击目标): 2543

模型: TFBert
攻击成功率 (ASR): 0.0000 (0/50)
平均改动率: 0.0040

模型 BiLSTM 在改写数据集 (D_adv) 上的准确率: 1.0000
（原始准确率 0.9965 下降到 1.0000，基于前 50 个攻击目标）

模型 TFBert 在改写数据集 (D_adv) 上的准确率: 1.0000
（原始准确率 0.9984 下降到 1.0000，基于前 50 个攻击目标）
