# PARSeq 文本识别模型对抗攻击实验

In [None]:
# 导入所需的库
import torch
import torch.nn.functional as F
import torchvision.transforms as transforms
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import os
from pathlib import Path
import warnings
import sys

# 添加父目录到Python路径以便导入strhub模块
sys.path.append(str(Path(__file__).parent.parent) if '__file__' in globals() else str(Path.cwd().parent))

warnings.filterwarnings('ignore')

# 设置图片显示
plt.rcParams['figure.figsize'] = (15, 8)
plt.rcParams['font.size'] = 12

print("导入库完成！")
print(f"PyTorch版本: {torch.__version__}")
print(f"设备: {'CUDA' if torch.cuda.is_available() else 'CPU'}")

# 设置设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")


In [None]:
# 正确的文本识别解码方法
def predict_text(model, images):
    """Run ``model`` on ``images`` and return the decoded predictions.

    The model output is converted to probabilities with a softmax over the
    last dimension and passed to ``model.tokenizer.decode``. That call returns
    a (predictions, confidences) pair; only the predictions are returned.
    The whole computation runs with gradients disabled.
    """
    with torch.no_grad():
        char_probs = model(images).softmax(-1)
        decoded = model.tokenizer.decode(char_probs)
    texts, _confidences = decoded
    return texts

print("文本识别函数准备完成")

In [None]:
# 模型选择配置
"""
PARSeq模型选择器 - 支持所有可用的预训练模型
"""

# Catalogue of selectable PARSeq models, keyed by the menu number shown to the
# user. 'name' is the model identifier; 'description', 'params' and 'accuracy'
# are free-text fields that are only printed.
# NOTE(review): confirm that every 'name' (in particular 'parseq_base') is an
# entrypoint actually exported by the baudm/parseq hub repository, and that the
# 'params' figures match the published model sizes — neither can be verified
# from this file.
AVAILABLE_MODELS = {
    1: {
        'name': 'parseq_tiny', 
        'description': '微型模型 - 最快速度，适合快速测试',
        'params': '~1M',
        'accuracy': '中等'
    },
    2: {
        'name': 'parseq_patch16_224', 
        'description': '标准模型 - 平衡性能和速度',
        'params': '~23M',
        'accuracy': '高'
    },
    3: {
        'name': 'parseq', 
        'description': '完整模型 - 最佳性能',
        'params': '~23M',
        'accuracy': '最高'
    },
    4: {
        'name': 'parseq_base', 
        'description': '基础模型 - 标准配置',
        'params': '~23M',
        'accuracy': '高'
    }
}

def display_model_options():
    """Print a numbered menu with one section per entry of AVAILABLE_MODELS."""
    print("可用的PARSeq模型:")
    print("=" * 80)
    separator = "-" * 60
    for number in AVAILABLE_MODELS:
        entry = AVAILABLE_MODELS[number]
        name = entry['name']
        description = entry['description']
        params = entry['params']
        accuracy = entry['accuracy']
        print(f"{number}. {name}")
        print(f"   描述: {description}")
        print(f"   参数量: {params}")
        print(f"   准确性: {accuracy}")
        print(separator)
    
def select_model():
    """Interactively ask the user to pick a model and return its name.

    Prints the menu via display_model_options() and keeps prompting until the
    input parses as an integer that is a key of AVAILABLE_MODELS.

    Returns:
        The 'name' field of the chosen entry, or 'parseq' when the prompt is
        interrupted (Ctrl-C) or the input stream is closed.
    """
    display_model_options()

    while True:
        try:
            choice = int(input("\n请选择模型 (输入数字 1-4): ").strip())
        except ValueError:
            print("请输入有效的数字")
            continue
        except (KeyboardInterrupt, EOFError):
            # EOFError is raised by input() when stdin is closed or exhausted
            # (non-interactive runs). Treat it like a cancellation instead of
            # letting it crash the selection loop.
            print("\n\n用户取消选择，使用默认模型: parseq")
            return 'parseq'

        selected_model = AVAILABLE_MODELS.get(choice)
        if selected_model is None:
            print("无效选择，请输入 1-4 之间的数字")
            continue

        print(f"\n已选择: {selected_model['name']}")
        print(f"{selected_model['description']}")
        return selected_model['name']

# Model choice (edit here to pick a model directly).
# Option 1: interactive selection (uncomment the next line)
# selected_model_name = select_model()

# Option 2: hard-code the model name (recommended inside a notebook)
selected_model_name = 'parseq'  # alternatives: 'parseq_tiny', 'parseq_patch16_224', 'parseq_base'

print(f"\n当前选择的模型: {selected_model_name}")

# Find the catalogue entry whose name matches the selection (None if absent)
selected_info = next(
    (entry for entry in AVAILABLE_MODELS.values() if entry['name'] == selected_model_name),
    None,
)

if not selected_info:
    # The name is not in the catalogue: treat it as a custom model name
    print("使用自定义模型名称")
else:
    print(f"模型信息: {selected_info['description']}")
    print(f"参数量: {selected_info['params']}")
    print(f"准确性: {selected_info['accuracy']}")

print("\n" + "="*50)

In [None]:
# Load the selected PARSeq model
print(f"正在加载PARSeq模型: {selected_model_name}...")

try:
    # Fetch the chosen pretrained model through torch.hub
    print(f"从torch.hub加载模型: {selected_model_name}")
    model = torch.hub.load('baudm/parseq', selected_model_name, pretrained=True, trust_repo=True)
    model.eval()
    model.to(device)

    print(f"{selected_model_name} 模型加载成功!")

    # Parameter statistics
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

    print(f"模型统计信息:")
    print(f"   总参数量: {total_params:,}")
    print(f"   可训练参数: {trainable_params:,}")
    # Size estimate assumes 4 bytes per parameter
    print(f"   模型大小: ~{total_params * 4 / 1024 / 1024:.1f} MB")

    # Smoke test with a random input. The spatial size is taken from the
    # model's own hyper-parameters: a hard-coded 32x128 input only suits
    # models configured for that size, and a failure here would silently
    # trigger the fallback below and replace the model the user asked for.
    # Assumes hparams.img_size is (height, width), as it is also what the
    # preprocessing transform is built from — TODO confirm.
    print(f"测试模型...")
    dummy_input = torch.randn(1, 3, *model.hparams.img_size, device=device)
    with torch.no_grad():
        output = model(dummy_input)
    print(f"模型测试通过，输出形状: {output.shape}")

    # Short architecture summary
    print(f"\n模型架构:")
    print(f"   输入尺寸: {model.hparams.img_size}")
    print(f"   最大序列长度: {model.hparams.max_label_length}")
    if hasattr(model.hparams, 'charset_size'):
        print(f"   字符集大小: {model.hparams.charset_size}")

except Exception as e:
    # Any failure above (download, smoke test, missing hparams) falls back to
    # the default 'parseq' model; if that fails as well the error is re-raised.
    print(f"模型 {selected_model_name} 加载失败: {e}")
    print("尝试加载备用模型 parseq...")
    try:
        model = torch.hub.load('baudm/parseq', 'parseq', pretrained=True, trust_repo=True)
        model.eval()
        model.to(device)
        selected_model_name = 'parseq'
        print("备用模型加载成功!")
    except Exception as e2:
        print(f"备用模型也加载失败: {e2}")
        raise

# Preview the first three images of the CUTE80 dataset
cute80_dir = Path("../CUTE80")
print(f"\nCUTE80数据集路径: {cute80_dir.absolute()}")

# Collect the image files. On case-insensitive file systems '*.jpg' and
# '*.JPG' match the same files, so de-duplicate on the resolved, lower-cased
# path (the same scheme the later loading cells use). Sorting makes
# "the first three" deterministic; glob order depends on the file system.
unique_images = {}
for ext in ['*.jpg', '*.JPG', '*.png', '*.PNG']:
    for candidate in cute80_dir.glob(ext):
        unique_images.setdefault(str(candidate.resolve()).lower(), candidate)
image_files = sorted(unique_images.values())

if not image_files:
    print("未找到图像文件！请检查CUTE80数据集路径")
else:
    print(f"找到 {len(image_files)} 张图像")

# Pick the first three images for the demonstration
test_images = image_files[:3]
print("选择用于演示的图像:")
for i, img_path in enumerate(test_images):
    print(f"  {i+1}. {img_path.name}")

print(f"\n当前使用模型: {selected_model_name}")
print("=" * 60)

In [None]:
# 快速模型切换 (可选)
"""
如果您想要切换到其他模型，请修改下面的模型名称并运行此cell
然后重新运行后续的实验cell
"""

def switch_model(new_model_name):
    """Replace the global ``model`` with another pretrained PARSeq model.

    The new model is loaded and smoke-tested first; the globals ``model`` and
    ``selected_model_name`` are rebound only after that succeeded, so a failed
    switch leaves the previously loaded model in place. Objects built from the
    previous model in other cells are not updated; re-run those cells.

    Args:
        new_model_name: model name passed to torch.hub.load for 'baudm/parseq'.

    Returns:
        True if the switch succeeded, False otherwise (the error is printed).
    """
    global model, selected_model_name

    print(f"切换到模型: {new_model_name}")

    try:
        # Load the new model without touching the current one yet
        new_model = torch.hub.load('baudm/parseq', new_model_name, pretrained=True, trust_repo=True)
        new_model.eval()
        new_model.to(device)

        # Smoke test with an input of the model's own configured size; a
        # hard-coded 32x128 input only suits models configured for that size.
        # Assumes hparams.img_size is (height, width) — TODO confirm.
        dummy_input = torch.randn(1, 3, *new_model.hparams.img_size, device=device)
        with torch.no_grad():
            new_model(dummy_input)
    except Exception as e:
        print(f"模型切换失败: {e}")
        return False

    # Commit the switch. Rebinding drops this reference to the old model;
    # afterwards ask the CUDA allocator to release its cached blocks.
    model = new_model
    selected_model_name = new_model_name
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # Report the new model
    total_params = sum(p.numel() for p in model.parameters())
    print(f"成功切换到 {new_model_name}")
    print(f"参数量: {total_params:,}")
    # Size estimate assumes 4 bytes per parameter
    print(f"模型大小: ~{total_params * 4 / 1024 / 1024:.1f} MB")
    print(f"模型测试通过")

    return True

# Usage:
# uncomment any one of the lines below to switch models:

# switch_model('parseq_tiny')        # switch to the tiny model
# switch_model('parseq')             # switch to the full model
# switch_model('parseq_patch16_224') # switch to the patch16 model
# switch_model('parseq_base')        # switch to the base model

print("模型切换功能已准备就绪")
print("如需切换模型，请取消注释上面的任意一行")
print(f"当前使用模型: {selected_model_name}")

# Print a ready-to-paste switch command for every catalogue entry
print("\n可用的模型切换命令:")
for entry in AVAILABLE_MODELS.values():
    entry_name = entry['name']
    entry_description = entry['description']
    print(f"   switch_model('{entry_name}')  # {entry_description}")

print("\n" + "="*70)

In [None]:
# Load the first three images of the CUTE80 dataset
cute80_dir = Path("../CUTE80")
print(f"CUTE80数据集路径: {cute80_dir}")

# Collect the image files for every supported extension
image_files = []
for ext in ['*.png', '*.jpg', '*.jpeg', '*.PNG', '*.JPG', '*.JPEG']:
    image_files.extend(cute80_dir.glob(ext))

# Drop duplicates: on case-insensitive file systems '.jpg' and '.JPG' patterns
# match the same file. The resolved, lower-cased path is the identity key and
# the first match for each key is kept.
unique_files = {}
for file_path in image_files:
    unique_files.setdefault(str(file_path.resolve()).lower(), file_path)

image_files = sorted(unique_files.values())[:3]  # keep the first three images
print(f"找到 {len(image_files)} 张图像")

if not image_files:
    # Fail here with a clear message; torch.stack() on an empty list further
    # down would otherwise raise an error that does not mention the dataset.
    raise FileNotFoundError(f"未找到图像文件！请检查CUTE80数据集路径: {cute80_dir.resolve()}")

# Show the selected files
for i, img_path in enumerate(image_files):
    print(f"  {i+1}. {img_path.name}")

# Build the preprocessing transform from the model's configured input size
from strhub.data.module import SceneTextDataModule
transform = SceneTextDataModule.get_transform(model.hparams.img_size)

# Per-image containers: PIL originals, preprocessed tensors, display names
original_images = []
image_tensors = []
image_names = []

print("\n加载图像...")
for i, img_path in enumerate(image_files):
    # Image.open is lazy and keeps the file open; convert() inside the with
    # block produces an in-memory copy, after which the handle is closed.
    with Image.open(img_path) as raw_img:
        orig_img = raw_img.convert('RGB')
    original_images.append(orig_img)

    # Preprocess for the model
    img_tensor = transform(orig_img)
    image_tensors.append(img_tensor)

    # The file stem is used as the image's label/name.
    # NOTE(review): this is only a ground-truth label if the dataset files are
    # named after their text — verify against the dataset layout.
    true_label = img_path.stem
    image_names.append(true_label)

    print(f"  图像 {i+1}: {img_path.name} -> {orig_img.size}")

# Stack the per-image tensors into one batch on the target device
images_batch = torch.stack(image_tensors).to(device)
print(f"\n图像批次准备完成，形状: {images_batch.shape}")

In [None]:
# Recognise the text in the clean images
print("对原始图像进行文本识别...")

original_texts = predict_text(model, images_batch)

print("原始图像识别完成!")
print("\n正确的识别结果:")
for i, (name, text) in enumerate(zip(image_names, original_texts)):
    print(f"  图像 {i+1} ({name}): \"{text}\"")

# Show each clean image with its recognition result. The number of panels
# follows the number of loaded images instead of assuming exactly three;
# squeeze=False keeps `axes` two-dimensional even for a single image.
num_images = len(original_images)
fig, axes = plt.subplots(1, num_images, figsize=(5 * num_images, 5), squeeze=False)
fig.suptitle("Original Images and Recognition Results", fontsize=16, fontweight='bold')

for i, (ax, image, text) in enumerate(zip(axes[0], original_images, original_texts)):
    ax.imshow(image)
    ax.set_title(f'Image {i+1}\nRecognition: "{text}"',
                 fontsize=12, pad=10)
    ax.axis('off')

plt.tight_layout()
plt.show()

In [None]:
# 定义正确的对抗攻击方法
class AdversarialAttacker:
    """Untargeted L-infinity attacks (FGSM and PGD) on a model's own predictions.

    The model is called as ``model(images)`` and must return logits whose last
    dimension is the class dimension. Both attacks increase the cross-entropy
    between the logits and the model's greedy (argmax) prediction for the
    clean input, so no ground-truth labels are required.

    ``epsilon`` and ``alpha`` are expressed in the value range of the input
    tensors, i.e. after whatever normalisation the caller applied.
    """

    def __init__(self, model, device='cpu', clip_min=-1.0, clip_max=1.0):
        """Store the model (switched to eval mode) and the attack settings.

        Args:
            model: network to attack.
            device: device the input images are moved to.
            clip_min: smallest valid input value; adversarial images are
                clamped to [clip_min, clip_max].
            clip_max: largest valid input value.

        The default range [-1, 1] assumes inputs normalised with mean 0.5 and
        std 0.5 — NOTE(review): confirm this matches the preprocessing
        transform in use; pass clip_min=0, clip_max=1 for un-normalised
        images. Clamping normalised inputs to [0, 1] would zero every pixel
        below the mean regardless of epsilon and corrupt the image by itself.
        """
        self.model = model.eval()
        self.device = device
        self.clip_min = clip_min
        self.clip_max = clip_max

    @staticmethod
    def _untargeted_loss(logits, targets):
        """Cross-entropy between ``logits`` and ``targets`` over all positions.

        For sequence outputs (logits with more than two dimensions) the number
        of positions along dimension 1 may differ between two forward passes;
        both tensors are then cut to the common prefix so the shapes match.
        """
        if logits.dim() > 2:
            steps = min(logits.size(1), targets.size(1))
            logits = logits[:, :steps]
            targets = targets[:, :steps]
        return F.cross_entropy(logits.reshape(-1, logits.size(-1)),
                               targets.reshape(-1))

    def _input_gradient(self, images, targets=None):
        """Return d(loss)/d(images); ``targets`` defaults to the current argmax."""
        images = images.clone().detach().requires_grad_(True)
        logits = self.model(images)
        if targets is None:
            targets = logits.argmax(-1)
        loss = self._untargeted_loss(logits, targets)

        # Clear stale parameter gradients before back-propagating
        self.model.zero_grad()
        loss.backward()
        return images.grad.detach()

    def fgsm_attack(self, images, epsilon=0.1):
        """Fast Gradient Sign Method: one signed-gradient step of size epsilon.

        Args:
            images: input batch; it is copied, the caller's tensor is untouched.
            epsilon: step size, which is also the L-infinity perturbation bound.

        Returns:
            Detached adversarial batch clamped to [clip_min, clip_max].
        """
        images = images.clone().detach().to(self.device)
        grad = self._input_gradient(images)
        perturbed_images = images + epsilon * grad.sign()
        return torch.clamp(perturbed_images, self.clip_min, self.clip_max).detach()

    def pgd_attack(self, images, epsilon=0.1, alpha=0.01, iters=10):
        """Projected Gradient Descent with a random start.

        Args:
            images: input batch; it is copied, the caller's tensor is untouched.
            epsilon: radius of the L-infinity ball around the clean images.
            alpha: step size of each signed-gradient step.
            iters: number of steps.

        Returns:
            Detached adversarial batch inside the epsilon ball and clamped to
            [clip_min, clip_max].
        """
        ori_images = images.clone().detach().to(self.device)

        # Fix the targets to the prediction on the clean images. Re-deriving
        # them from the perturbed images on every step would make the attack
        # push away from whatever the model currently predicts, so it could
        # move back towards the original prediction once that has flipped.
        with torch.no_grad():
            targets = self.model(ori_images).argmax(-1)

        # Random start inside the epsilon ball
        perturbed_images = ori_images + torch.empty_like(ori_images).uniform_(-epsilon, epsilon)
        perturbed_images = torch.clamp(perturbed_images, self.clip_min, self.clip_max)

        for _ in range(iters):
            grad = self._input_gradient(perturbed_images, targets)
            perturbed_images = perturbed_images + alpha * grad.sign()

            # Project back into the epsilon ball, then into the valid range
            delta = torch.clamp(perturbed_images - ori_images, min=-epsilon, max=epsilon)
            perturbed_images = torch.clamp(ori_images + delta, self.clip_min, self.clip_max)

        return perturbed_images.detach()

# Create the attacker for the loaded model on the selected device
attacker = AdversarialAttacker(model, device)

In [None]:
# Run the FGSM attack
print("执行FGSM攻击...")

# Attack parameter
epsilon = 0.1  # perturbation strength

# Generate the adversarial batch
fgsm_adversarial = attacker.fgsm_attack(images_batch, epsilon=epsilon)

print(f"FGSM攻击完成! (epsilon={epsilon})")
clean_lo, clean_hi = images_batch.min(), images_batch.max()
print(f"   原始图像范围: [{clean_lo:.3f}, {clean_hi:.3f}]")
adv_lo, adv_hi = fgsm_adversarial.min(), fgsm_adversarial.max()
print(f"   对抗样本范围: [{adv_lo:.3f}, {adv_hi:.3f}]")

# Perturbation = adversarial batch minus clean batch
fgsm_perturbation = fgsm_adversarial - images_batch
pert_lo, pert_hi = fgsm_perturbation.min(), fgsm_perturbation.max()
print(f"   扰动范围: [{pert_lo:.3f}, {pert_hi:.3f}]")
mean_abs_pert = fgsm_perturbation.abs().mean()
print(f"   平均扰动幅度: {mean_abs_pert:.6f}")

In [None]:
# Recognise the text in the FGSM adversarial images
print("对FGSM攻击后的图像进行文本识别...")

fgsm_texts = predict_text(model, fgsm_adversarial)

print("FGSM对抗样本识别完成!")

# Compare the predictions before and after the attack. Iterating over the
# actual results (instead of a fixed range(3)) also works when fewer or more
# than three images were loaded.
print("\nFGSM攻击效果对比:")
print("="*60)
for i, (name, orig, adv) in enumerate(zip(image_names, original_texts, fgsm_texts)):
    # The attack counts as successful when the recognised text changed
    attack_success = orig != adv
    status = "[攻击成功]" if attack_success else "[攻击失败]"

    print(f"图像 {i+1} ({name}):")
    print(f"  原始识别: \"{orig}\"")
    print(f"  攻击后:   \"{adv}\"")
    print(f"  状态:     {status}")
    print("-" * 40)

# Attack success rate (0.0 when there are no images, avoiding a division by zero)
success_count = sum(1 for orig, adv in zip(original_texts, fgsm_texts) if orig != adv)
success_rate = success_count / len(original_texts) if len(original_texts) else 0.0
print(f"\nFGSM攻击成功率: {success_count}/{len(original_texts)} = {success_rate:.1%}")

In [None]:
# Run the PGD attack for comparison
print("执行PGD攻击...")

# PGD parameters: ball radius, per-step size, number of steps
pgd_epsilon = 0.1
pgd_alpha = 0.02
pgd_iters = 10

# Generate the adversarial batch
pgd_adversarial = attacker.pgd_attack(images_batch,
                                     epsilon=pgd_epsilon,
                                     alpha=pgd_alpha,
                                     iters=pgd_iters)

print(f"PGD攻击完成! (epsilon={pgd_epsilon}, alpha={pgd_alpha}, iters={pgd_iters})")

# Recognise the text in the PGD adversarial images
pgd_texts = predict_text(model, pgd_adversarial)

print("PGD对抗样本识别完成!")

# Compare the predictions before and after the attack. Iterating over the
# actual results (instead of a fixed range(3)) also works when fewer or more
# than three images were loaded.
print("\nPGD攻击效果对比:")
print("="*60)
for i, (name, orig, adv) in enumerate(zip(image_names, original_texts, pgd_texts)):
    # The attack counts as successful when the recognised text changed
    attack_success = orig != adv
    status = "[攻击成功]" if attack_success else "[攻击失败]"

    print(f"图像 {i+1} ({name}):")
    print(f"  原始识别: \"{orig}\"")
    print(f"  PGD攻击后: \"{adv}\"")
    print(f"  状态:     {status}")
    print("-" * 40)

# Attack success rate (0.0 when there are no images, avoiding a division by zero)
pgd_success_count = sum(1 for orig, adv in zip(original_texts, pgd_texts) if orig != adv)
pgd_success_rate = pgd_success_count / len(original_texts) if len(original_texts) else 0.0
print(f"\nPGD攻击成功率: {pgd_success_count}/{len(original_texts)} = {pgd_success_rate:.1%}")

In [None]:
# 反归一化函数用于可视化
def denormalize_tensor(tensor, mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)):
    """Undo a per-channel normalisation and clamp the result to [0, 1].

    Computes ``tensor * std + mean`` with the statistics broadcast as shape
    (1, C, 1, 1), so ``tensor`` is expected to have its channels on
    dimension 1.

    Args:
        tensor: normalised image batch.
        mean: per-channel mean that was subtracted during preprocessing.
        std: per-channel standard deviation that was divided by.

    Returns:
        A new tensor with values clamped to [0, 1], suitable for display.

    The statistics must be the ones used by the preprocessing transform,
    otherwise the displayed colours and contrast are wrong. The defaults
    assume mean 0.5 / std 0.5 — NOTE(review): confirm against the transform
    in use; pass ImageNet statistics explicitly for ImageNet-normalised data.
    Tuples are used as defaults to avoid mutable default arguments.
    """
    mean = torch.as_tensor(mean, device=tensor.device).view(1, -1, 1, 1)
    std = torch.as_tensor(std, device=tensor.device).view(1, -1, 1, 1)

    # Invert the normalisation, then restrict to the displayable range
    return torch.clamp(tensor * std + mean, 0, 1)

def tensor_to_image(tensor):
    """Turn an image tensor into a numpy array that can be displayed.

    A 4-D input is treated as a batch and only its first item is used. The
    dimensions are reordered from channels-first (CHW) to channels-last (HWC),
    the data is moved to the CPU, and the values are clipped to [0, 1].
    """
    chw = tensor[0] if tensor.dim() == 4 else tensor
    hwc = chw.permute(1, 2, 0).cpu().numpy()
    return np.clip(hwc, 0, 1)

print("Visualization helper functions ready")

In [None]:
# Visual comparison of the attack results
print("Generating visualization comparison...")

# First print the attack results as text
print("\n" + "="*80)
print("文本识别攻击结果对比")
print("="*80)

for i in range(len(original_texts)):
    print(f"\n图像 {i+1} ({image_names[i]}):")
    print(f"   原始识别结果: \"{original_texts[i]}\"")

    # FGSM result
    fgsm_status = "[攻击成功]" if original_texts[i] != fgsm_texts[i] else "[攻击失败]"
    print(f"   FGSM攻击后: \"{fgsm_texts[i]}\" {fgsm_status}")

    # PGD result
    pgd_status = "[攻击成功]" if original_texts[i] != pgd_texts[i] else "[攻击失败]"
    print(f"   PGD攻击后:  \"{pgd_texts[i]}\" {pgd_status}")

print("\n" + "="*80)
print(f"攻击统计:")
print(f"   FGSM成功率: {success_rate:.1%} ({success_count}/{len(original_texts)})")
print(f"   PGD成功率:  {pgd_success_rate:.1%} ({pgd_success_count}/{len(original_texts)})")
print("="*80 + "\n")

# Map the normalised tensors back to [0, 1] for display. The statistics are
# passed explicitly because they must match the preprocessing transform.
# NOTE(review): 0.5 / 0.5 assumes the transform normalises with mean 0.5 and
# std 0.5 — confirm against SceneTextDataModule.get_transform.
display_mean = [0.5, 0.5, 0.5]
display_std = [0.5, 0.5, 0.5]
original_denorm = denormalize_tensor(images_batch, mean=display_mean, std=display_std)
fgsm_denorm = denormalize_tensor(fgsm_adversarial, mean=display_mean, std=display_std)
pgd_denorm = denormalize_tensor(pgd_adversarial, mean=display_mean, std=display_std)

# Perturbations in display space. They get their own names so that the
# model-input-space `fgsm_perturbation` computed right after the FGSM attack
# is not overwritten with values measured on a different scale.
fgsm_perturbation_vis = fgsm_denorm - original_denorm
pgd_perturbation_vis = pgd_denorm - original_denorm

# One row per image (not a fixed three); squeeze=False keeps `axes` 2-D even
# when there is a single row.
num_rows = len(original_texts)
fig, axes = plt.subplots(num_rows, 5, figsize=(20, 4 * num_rows), squeeze=False)
fig.suptitle('PARSeq Text Recognition Model Adversarial Attack Comparison', fontsize=18, fontweight='bold', y=0.98)

# Column titles go on the first row only
col_titles = ['Original Image', 'FGSM Adversarial', 'FGSM Perturbation', 'PGD Adversarial', 'PGD Perturbation']
for j, title in enumerate(col_titles):
    axes[0, j].set_title(title, fontsize=14, fontweight='bold', pad=15)

# (first column, adversarial images, display-space perturbation, predictions);
# each attack fills its column and the one to its right.
attack_panels = [
    (1, fgsm_denorm, fgsm_perturbation_vis, fgsm_texts),
    (3, pgd_denorm, pgd_perturbation_vis, pgd_texts),
]

for i in range(num_rows):
    # Clean image
    axes[i, 0].imshow(tensor_to_image(original_denorm[i]))
    axes[i, 0].text(0.5, -0.15, f'Recognition: "{original_texts[i]}"',
                    transform=axes[i, 0].transAxes, ha='center', fontsize=10,
                    bbox=dict(boxstyle="round,pad=0.3", facecolor="lightblue"))

    for col, adv_denorm, perturbation, adv_texts in attack_panels:
        # Adversarial image with its prediction and success marker
        succeeded = original_texts[i] != adv_texts[i]
        marker = "[SUCCESS]" if succeeded else "[FAILED]"
        axes[i, col].imshow(tensor_to_image(adv_denorm[i]))
        axes[i, col].text(0.5, -0.15, f'Recognition: "{adv_texts[i]}" {marker}',
                          transform=axes[i, col].transAxes, ha='center', fontsize=10,
                          bbox=dict(boxstyle="round,pad=0.3",
                                    facecolor="lightcoral" if succeeded else "lightgray"))

        # Perturbation, min-max rescaled per image so it becomes visible.
        # No cmap is passed: imshow ignores colormaps for RGB data.
        pert_img = perturbation[i].cpu().permute(1, 2, 0).numpy()
        pert_img = (pert_img - pert_img.min()) / (pert_img.max() - pert_img.min() + 1e-8)
        axes[i, col + 1].imshow(pert_img)
        axes[i, col + 1].text(0.5, -0.15, f'Magnitude: {perturbation[i].abs().mean():.4f}',
                              transform=axes[i, col + 1].transAxes, ha='center', fontsize=10)

    # Row label
    axes[i, 0].text(-0.15, 0.5, f'Image {i+1}\n({image_names[i]})',
                    transform=axes[i, 0].transAxes, ha='center', va='center',
                    rotation=90, fontsize=12, fontweight='bold')

# Hide all axes
for ax in axes.flat:
    ax.axis('off')

plt.tight_layout()
plt.subplots_adjust(top=0.93, hspace=0.3)
plt.show()

print("Visualization completed!")

In [None]:
# 实验总结
"""
PARSeq文本识别模型对抗攻击实验总结
"""

print("=" * 80)
print("实验总结报告")
print("=" * 80)

print(f"使用模型: {selected_model_name}")
print(f"测试图像数量: {len(original_texts)}")
print(f"攻击方法: FGSM 和 PGD")

print(f"\n攻击结果:")
print(f"  FGSM攻击成功率: {success_rate:.1%} ({success_count}/{len(original_texts)})")
print(f"  PGD攻击成功率:  {pgd_success_rate:.1%} ({pgd_success_count}/{len(original_texts)})")

print(f"\n攻击参数:")
print(f"  FGSM epsilon: {epsilon}")
print(f"  PGD epsilon: {pgd_epsilon}, alpha: {pgd_alpha}, iterations: {pgd_iters}")

# Mean absolute perturbation of each attack. Both values are computed here
# from the adversarial and clean batches, so they are measured on the same
# scale; a perturbation variable left behind by an earlier cell may have been
# reassigned to a differently scaled quantity.
print(f"\n扰动分析:")
fgsm_avg_perturbation = (fgsm_adversarial - images_batch).abs().mean().item()
pgd_avg_perturbation = (pgd_adversarial - images_batch).abs().mean().item()
print(f"  FGSM平均扰动幅度: {fgsm_avg_perturbation:.6f}")
print(f"  PGD平均扰动幅度:  {pgd_avg_perturbation:.6f}")

# Verdict: which attack changed more predictions
print(f"\n结论:")
if pgd_success_rate > success_rate:
    print("  PGD攻击比FGSM攻击更有效")
elif pgd_success_rate < success_rate:
    print("  FGSM攻击比PGD攻击更有效")
else:
    print("  FGSM和PGD攻击效果相当")

# The model is reported as sensitive when the better attack succeeds on more
# than half of the images
if max(success_rate, pgd_success_rate) > 0.5:
    print("  该模型对对抗攻击较为敏感")
else:
    print("  该模型对对抗攻击具有一定的鲁棒性")

print("=" * 80)
print("实验完成!")
print("=" * 80)

In [None]:
# 对整个CUTE80数据集进行攻击和测试
import time
from tqdm import tqdm
import pandas as pd

print("=" * 80)
print("开始对整个CUTE80数据集进行对抗攻击测试")
print("=" * 80)

# Re-scan the dataset directory
cute80_dir = Path("../CUTE80")
all_image_files = []
for ext in ['*.jpg', '*.JPG', '*.png', '*.PNG']:
    all_image_files.extend(cute80_dir.glob(ext))

# Drop duplicates: on case-insensitive file systems '.jpg' and '.JPG' patterns
# match the same file. The resolved, lower-cased path is the identity key and
# the first match for each key is kept.
unique_files = {}
for file_path in all_image_files:
    unique_files.setdefault(str(file_path.resolve()).lower(), file_path)

all_image_files = sorted(unique_files.values())
print(f"总共找到 {len(all_image_files)} 张图像")

# Batching
batch_size = 8  # adjust to the available GPU memory
total_images = len(all_image_files)
num_batches = (total_images + batch_size - 1) // batch_size  # ceiling division

print(f"将分 {num_batches} 个批次处理，每批 {batch_size} 张图像")

# Column-oriented result store: one list entry per image in every column
results = {
    'image_name': [],
    'original_text': [],
    'fgsm_text': [],
    'pgd_text': [],
    'fgsm_success': [],
    'pgd_success': [],
    'fgsm_perturbation': [],
    'pgd_perturbation': []
}

# Attack parameters
epsilon = 0.1
pgd_epsilon = 0.1
pgd_alpha = 0.02
pgd_iters = 10

print(f"\n攻击参数:")
print(f"  FGSM epsilon: {epsilon}")
print(f"  PGD epsilon: {pgd_epsilon}, alpha: {pgd_alpha}, iterations: {pgd_iters}")
print(f"  批处理大小: {batch_size}")

start_time = time.time()
failed_images = 0  # images that belong to a batch that raised an error

# Process the images batch by batch
for batch_idx in tqdm(range(num_batches), desc="处理批次"):
    # Files of the current batch (slicing past the end is safe)
    start_idx = batch_idx * batch_size
    current_batch_files = all_image_files[start_idx:start_idx + batch_size]

    try:
        # Load and preprocess the images of this batch
        batch_images = []
        batch_names = []

        for img_path in current_batch_files:
            # Image.open is lazy and keeps the file open; convert() inside the
            # with block produces an in-memory copy before the handle closes.
            with Image.open(img_path) as raw_img:
                img = raw_img.convert('RGB')
            batch_images.append(transform(img))
            batch_names.append(img_path.stem)

        batch_tensor = torch.stack(batch_images).to(device)

        # Predictions on the clean images
        original_batch_texts = predict_text(model, batch_tensor)

        # FGSM attack
        fgsm_adversarial_batch = attacker.fgsm_attack(batch_tensor, epsilon=epsilon)
        fgsm_batch_texts = predict_text(model, fgsm_adversarial_batch)

        # PGD attack
        pgd_adversarial_batch = attacker.pgd_attack(batch_tensor,
                                                   epsilon=pgd_epsilon,
                                                   alpha=pgd_alpha,
                                                   iters=pgd_iters)
        pgd_batch_texts = predict_text(model, pgd_adversarial_batch)

        # Mean absolute perturbation per image (averaged over C, H, W)
        fgsm_perturbations = (fgsm_adversarial_batch - batch_tensor).abs().mean(dim=[1,2,3])
        pgd_perturbations = (pgd_adversarial_batch - batch_tensor).abs().mean(dim=[1,2,3])

        # Store the per-image results; an attack counts as successful when
        # the recognised text changed.
        for i in range(len(current_batch_files)):
            results['image_name'].append(batch_names[i])
            results['original_text'].append(original_batch_texts[i])
            results['fgsm_text'].append(fgsm_batch_texts[i])
            results['pgd_text'].append(pgd_batch_texts[i])
            results['fgsm_success'].append(original_batch_texts[i] != fgsm_batch_texts[i])
            results['pgd_success'].append(original_batch_texts[i] != pgd_batch_texts[i])
            results['fgsm_perturbation'].append(fgsm_perturbations[i].item())
            results['pgd_perturbation'].append(pgd_perturbations[i].item())

        # Progress report after the first batch and then every tenth batch
        if (batch_idx + 1) % 10 == 0 or batch_idx == 0:
            current_progress = (batch_idx + 1) / num_batches * 100
            current_fgsm_success = sum(results['fgsm_success'])
            current_pgd_success = sum(results['pgd_success'])
            current_total = len(results['image_name'])
            print(f"\n进度 {current_progress:.1f}% - 已处理 {current_total} 张图像")
            print(f"当前FGSM成功率: {current_fgsm_success/current_total:.1%}")
            print(f"当前PGD成功率: {current_pgd_success/current_total:.1%}")

    except Exception as e:
        print(f"\n批次 {batch_idx + 1} 处理失败: {e}")
        # Keep one placeholder row per image so every file appears in the
        # results. These rows are recorded as failed attacks with zero
        # perturbation, so they lower the reported success rates and means;
        # the number of such images is reported after the loop.
        failed_images += len(current_batch_files)
        for img_path in current_batch_files:
            results['image_name'].append(img_path.stem)
            results['original_text'].append("ERROR")
            results['fgsm_text'].append("ERROR")
            results['pgd_text'].append("ERROR")
            results['fgsm_success'].append(False)
            results['pgd_success'].append(False)
            results['fgsm_perturbation'].append(0.0)
            results['pgd_perturbation'].append(0.0)

processing_time = time.time() - start_time

print(f"\n处理完成! 总耗时: {processing_time:.2f} 秒")
if total_images:
    # Skipped for an empty dataset to avoid dividing by zero
    print(f"平均每张图像处理时间: {processing_time/total_images:.3f} 秒")
if failed_images:
    print(f"处理失败的图像数量: {failed_images} (在结果中标记为 ERROR)")

# Build the results DataFrame
df_results = pd.DataFrame(results)

print("\n" + "=" * 80)
print("Complete Dataset Attack Results Statistics")
print("=" * 80)

In [None]:
# Analyse and visualise the attack results for the whole dataset

# Headline numbers
total_samples = len(df_results)
fgsm_hit = df_results['fgsm_success']
pgd_hit = df_results['pgd_success']
fgsm_success_count = fgsm_hit.sum()
pgd_success_count = pgd_hit.sum()
fgsm_success_rate = fgsm_success_count / total_samples
pgd_success_rate = pgd_success_count / total_samples

print(f"数据集规模: {total_samples} 张图像")
print(f"FGSM攻击成功: {fgsm_success_count} / {total_samples} = {fgsm_success_rate:.1%}")
print(f"PGD攻击成功:  {pgd_success_count} / {total_samples} = {pgd_success_rate:.1%}")

# Perturbation statistics (mean and standard deviation per attack)
fgsm_pert = df_results['fgsm_perturbation']
pgd_pert = df_results['pgd_perturbation']
avg_fgsm_perturbation = fgsm_pert.mean()
avg_pgd_perturbation = pgd_pert.mean()
std_fgsm_perturbation = fgsm_pert.std()
std_pgd_perturbation = pgd_pert.std()

print(f"\n扰动统计:")
print(f"FGSM平均扰动: {avg_fgsm_perturbation:.6f} ± {std_fgsm_perturbation:.6f}")
print(f"PGD平均扰动:  {avg_pgd_perturbation:.6f} ± {std_pgd_perturbation:.6f}")

# Split the rows by which attack(s) succeeded
successful_fgsm = df_results[fgsm_hit == True]
successful_pgd = df_results[pgd_hit == True]
both_successful = df_results[(fgsm_hit == True) & (pgd_hit == True)]

both_count = len(both_successful)
fgsm_only_count = len(successful_fgsm) - both_count
pgd_only_count = len(successful_pgd) - both_count
neither_count = total_samples - len(successful_fgsm.index.union(successful_pgd.index))

print(f"\n攻击成功模式分析:")
print(f"仅FGSM成功: {fgsm_only_count} 个案例")
print(f"仅PGD成功:  {pgd_only_count} 个案例")
print(f"两种攻击都成功: {both_count} 个案例")
print(f"两种攻击都失败: {neither_count} 个案例")

# Write the per-image results to a CSV file
results_file = 'adversarial_attack_results_full_dataset.csv'
df_results.to_csv(results_file, index=False, encoding='utf-8')
print(f"\n详细结果已保存到: {results_file}")

# Plotting
import matplotlib.pyplot as plt
import numpy as np

# 2x3 grid of summary charts
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
fig.suptitle(f'CUTE80 Dataset Adversarial Attack Complete Analysis ({total_samples} Images)', fontsize=16, fontweight='bold')
(ax_rate, ax_hist, ax_fgsm_box), (ax_pie, ax_pgd_box, ax_scatter) = axes

# 1. Attack success rate comparison
rate_percentages = [fgsm_success_rate*100, pgd_success_rate*100]
ax_rate.bar(['FGSM', 'PGD'], rate_percentages,
            color=['lightcoral', 'lightblue'], alpha=0.7)
ax_rate.set_ylabel('Success Rate (%)')
ax_rate.set_title('Attack Success Rate Comparison')
ax_rate.set_ylim(0, 100)
for i, v in enumerate(rate_percentages):
    ax_rate.text(i, v + 1, f'{v:.1f}%', ha='center', fontweight='bold')

# 2. Perturbation magnitude distribution
ax_hist.hist(fgsm_pert, bins=30, alpha=0.6, label='FGSM', color='lightcoral')
ax_hist.hist(pgd_pert, bins=30, alpha=0.6, label='PGD', color='lightblue')
ax_hist.set_xlabel('Perturbation Magnitude')
ax_hist.set_ylabel('Number of Images')
ax_hist.set_title('Perturbation Magnitude Distribution')
ax_hist.legend()
ax_hist.grid(True, alpha=0.3)

# 3. FGSM: perturbation of successful vs failed attacks
fgsm_successful_pert = df_results[fgsm_hit]['fgsm_perturbation']
fgsm_failed_pert = df_results[~fgsm_hit]['fgsm_perturbation']
ax_fgsm_box.boxplot([fgsm_successful_pert, fgsm_failed_pert],
                    labels=['FGSM Success', 'FGSM Failed'])
ax_fgsm_box.set_ylabel('Perturbation Magnitude')
ax_fgsm_box.set_title('FGSM Attack Success vs Perturbation')
ax_fgsm_box.grid(True, alpha=0.3)

# 4. Attack pattern distribution (pie chart)
attack_patterns = ['Both Success', 'FGSM Only', 'PGD Only', 'Both Failed']
pattern_counts = [both_count, fgsm_only_count, pgd_only_count, neither_count]
colors = ['red', 'orange', 'yellow', 'lightgray']
ax_pie.pie(pattern_counts, labels=attack_patterns, autopct='%1.1f%%', colors=colors)
ax_pie.set_title('Attack Pattern Distribution')

# 5. PGD: perturbation of successful vs failed attacks
pgd_successful_pert = df_results[pgd_hit]['pgd_perturbation']
pgd_failed_pert = df_results[~pgd_hit]['pgd_perturbation']
ax_pgd_box.boxplot([pgd_successful_pert, pgd_failed_pert],
                   labels=['PGD Success', 'PGD Failed'])
ax_pgd_box.set_ylabel('Perturbation Magnitude')
ax_pgd_box.set_title('PGD Attack Success vs Perturbation')
ax_pgd_box.grid(True, alpha=0.3)

# 6. FGSM vs PGD perturbation magnitude, with a dashed y = x reference line
ax_scatter.scatter(fgsm_pert, pgd_pert,
                   alpha=0.6, s=20)
diagonal_end = max(fgsm_pert.max(), pgd_pert.max())
ax_scatter.plot([0, diagonal_end],
                [0, diagonal_end],
                'r--', alpha=0.5)
ax_scatter.set_xlabel('FGSM Perturbation Magnitude')
ax_scatter.set_ylabel('PGD Perturbation Magnitude')
ax_scatter.set_title('FGSM vs PGD Perturbation Comparison')
ax_scatter.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("\n" + "=" * 80)
print("数据集攻击分析完成!")
print("=" * 80)