In [57]:
import tensorflow as tf
from tensorflow.keras.datasets import imdb
from tensorflow.keras.preprocessing.sequence import pad_sequences

# 定义参数
VOCAB_SIZE = 5000   # 单词表大小
MAX_LEN = 128       # 每条评论的最大长度（填充长度）
EMBEDDING_DIM = 64  # 嵌入维度

# 加载 IMDb 数据集
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=VOCAB_SIZE)

# 填充序列长度
x_train = pad_sequences(x_train, maxlen=MAX_LEN, padding='post')
x_test = pad_sequences(x_test, maxlen=MAX_LEN, padding='post')

# 划分训练集和验证集
from sklearn.model_selection import train_test_split
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.1, random_state=42)

print(f"Train set: {x_train.shape}, Validation set: {x_val.shape}, Test set: {x_test.shape}")


Train set: (22500, 128), Validation set: (2500, 128), Test set: (25000, 128)


In [58]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, Conv1D, GlobalMaxPooling1D, Dense, Dropout

# 构建模型
def build_simple_model(vocab_size, embedding_dim, max_len):
    model = Sequential([
        Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=max_len),  # 嵌入层
        Conv1D(filters=128, kernel_size=5, activation='relu'),  # 卷积层
        GlobalMaxPooling1D(),  # 全局最大池化
        Dropout(0.5),
        Dense(64, activation='relu'),  # 全连接层
        Dropout(0.5),
        Dense(1, activation='sigmoid')  # 输出层，二分类
    ])
    return model

# 初始化模型
model = build_simple_model(VOCAB_SIZE, EMBEDDING_DIM, MAX_LEN)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 打印模型结构
model.summary()

# 模型训练
EPOCHS = 3
BATCH_SIZE = 64

history = model.fit(
    x_train, y_train,
    validation_data=(x_val, y_val),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE
)




Epoch 1/3
[1m352/352[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m28s[0m 73ms/step - accuracy: 0.5840 - loss: 0.6522 - val_accuracy: 0.8028 - val_loss: 0.4197
Epoch 2/3
[1m352/352[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 69ms/step - accuracy: 0.8464 - loss: 0.3596 - val_accuracy: 0.8476 - val_loss: 0.3485
Epoch 3/3
[1m352/352[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m42s[0m 70ms/step - accuracy: 0.9095 - loss: 0.2442 - val_accuracy: 0.8572 - val_loss: 0.3392


In [61]:
def generate_pgd_samples(
    model,
    x_input,
    y_input,
    epsilon=0.1,
    alpha=0.01,
    num_iter=5
):
    """
    生成 PGD 对抗样本。
    """
    # 确保输入为张量
    x_input = tf.convert_to_tensor(x_input, dtype=tf.float32)
    y_input = tf.convert_to_tensor(y_input, dtype=tf.float32)

    # 调整 y_input 的维度以匹配模型输出
    if len(y_input.shape) == 1:  # y_input 为 (batch_size,)
        y_input = tf.expand_dims(y_input, axis=-1)  # 扩展维度为 (batch_size, 1)

    # 初始化对抗样本
    adv_x = tf.Variable(x_input)

    # PGD 迭代
    for i in range(num_iter):
        with tf.GradientTape() as tape:
            tape.watch(adv_x)  # 追踪对抗样本
            predictions = model(adv_x, training=False)  # 获取预测
            loss = tf.keras.losses.binary_crossentropy(y_input, predictions)  # 计算损失

        # 计算梯度
        gradients = tape.gradient(loss, adv_x)

        # 如果梯度为 None，跳过
        if gradients is None:
            print(f"Warning: Gradients are None at iteration {i}, skipping update.")
            continue

        # 更新对抗样本的扰动
        adv_x.assign_add(alpha * tf.sign(gradients))

        # 将扰动裁剪到 [x_input - epsilon, x_input + epsilon]
        adv_x.assign(tf.clip_by_value(adv_x, x_input - epsilon, x_input + epsilon))

        # 保证扰动输入的合法范围（IMDb 输入是整数索引）
        adv_x.assign(tf.clip_by_value(adv_x, 0, VOCAB_SIZE - 1))

    return tf.cast(adv_x, tf.int32)  # 返回整数形式的对抗样本


In [62]:
# 选择部分测试数据
x_test_subset = x_test[:100]  # 测试样本
y_test_subset = y_test[:100]  # 测试标签

# 生成对抗样本
x_test_adv = generate_pgd_samples(model, x_test_subset, y_test_subset)

# 使用对抗样本评估模型性能
print("Evaluating model on adversarial samples...")
adv_loss, adv_acc = model.evaluate(x_test_adv, y_test_subset, verbose=1)
print(f"Adversarial sample accuracy: {adv_acc:.4f}")


Evaluating model on adversarial samples...
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step - accuracy: 0.9291 - loss: 0.2290 
Adversarial sample accuracy: 0.9400


In [67]:
# 合并原始数据和对抗样本
x_train_combined = tf.concat([x_train, x_test_adv], axis=0)  # 拼接原始和对抗样本，保留 int32 类型
y_train_combined = tf.concat([y_train, y_test_subset], axis=0)  # 拼接对应的标签

# 如果标签形状不匹配，调整其形状
if len(y_train_combined.shape) == 1:
    y_train_combined = tf.expand_dims(y_train_combined, axis=-1)  # 确保标签为 (batch_size, 1)

# 如果标签数据类型为 int64，先转换为 int32，再转换为 float32
if y_train_combined.dtype != tf.float32:
    y_train_combined = tf.cast(y_train_combined, dtype=tf.float32)  # 转换为 float32

# 检查合并后的数据形状
print(f"x_train_combined shape: {x_train_combined.shape}, dtype: {x_train_combined.dtype}")
print(f"y_train_combined shape: {y_train_combined.shape}, dtype: {y_train_combined.dtype}")

# 对抗训练过程
print("Starting adversarial training...")
history_adv = model.fit(
    x_train_combined, y_train_combined,
    validation_data=(x_val, y_val),
    epochs=3,
    batch_size=64
)


x_train_combined shape: (22600, 128), dtype: <dtype: 'int32'>
y_train_combined shape: (22600, 1), dtype: <dtype: 'float32'>
Starting adversarial training...
Epoch 1/3
[1m354/354[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m25s[0m 66ms/step - accuracy: 0.9364 - loss: 0.1868 - val_accuracy: 0.8540 - val_loss: 0.3731
Epoch 2/3
[1m354/354[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 66ms/step - accuracy: 0.9579 - loss: 0.1245 - val_accuracy: 0.8476 - val_loss: 0.4204
Epoch 3/3
[1m354/354[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m44s[0m 74ms/step - accuracy: 0.9679 - loss: 0.0943 - val_accuracy: 0.8448 - val_loss: 0.5212


In [68]:
# 原始测试集评估
print("Evaluating model on original test set...")
orig_loss, orig_acc = model.evaluate(x_test, y_test, verbose=1)
print(f"Original Test Accuracy: {orig_acc:.4f}")

# 对抗样本评估（训练后）
print("Evaluating model on adversarial samples...")
adv_loss, adv_acc = model.evaluate(x_test_adv, y_test_subset, verbose=1)
print(f"Adversarial Sample Accuracy after training: {adv_acc:.4f}")


Evaluating model on original test set...
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 12ms/step - accuracy: 0.8487 - loss: 0.4847
Original Test Accuracy: 0.8465
Evaluating model on adversarial samples...
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step - accuracy: 0.9847 - loss: 0.0590 
Adversarial Sample Accuracy after training: 0.9800
