In [None]:
import os
import torchvision as tv
from torch.utils.data import DataLoader
from torch.optim import Adam
import torch.nn as nn
from tqdm import tqdm
import torch as t
import numpy as np
from time import strftime

from config import opt
import models
from attack.PixelAttack import attack_all
from attack.PGDAttack import LinfPGDAttack

DOWNLOAD_CIFAR10=False   #是否需要下载数据集


In [None]:
class Config():
    #一般只修改以下两项
    model_path = './ckps/MobileNet_04_19_11_37_ep200.pth'  # 预训练模型，None表示重新训练，训练好的模型会存放在'.ckps/'文件夹下
    model = 'MobileNet'#加载的模型，模型名必须与models/__init__.py中的名字一致，注释中已经给出要用到的格式
    
    '''
    MobileNet,ShuffleNetV2,ShuffleNet,MobileNetV2,SqueezeNet
    VGG11,VGG13,VGG16,VGG19,
    ResNet18,ResNet34,ResNet50,ResNet101,ResNet152
    '''
    
    use_gpu=True    #是否使用gpu
   
    
    attack_num = 1000  #选择攻击的样本数量

    train_epoch = 200 #将数据集训练多少次
    save_every = 50#每多少轮迭代保存一次模型
    batch_size = 256 #每次训练喂入多少数据

    print_freq = 100    #每训练多少批次就打印一次

    def _parese(self):
        self.device = t.device('cuda') if self.use_gpu else t.device('cpu')
        print('Caculate on {}'.format(self.device))
        print('user config:')
        for k, v in self.__class__.__dict__.items():
            if not k.startswith('_'):
                print(k, getattr(self, k))
            

In [None]:

def train():
    '''
    训练神经网络
    :return:
    '''
    #1.加载配置
    opt = Config()
    opt._parese()
    global DOWNLOAD_CIFAR10

    #1a.加载模型
    model = getattr(models,opt.model)()
    if opt.model_path:
        model.load(opt.model_path)
    model.to(opt.device)

    #2.定义损失函数和优化器
    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=0.0001)


    #3.加载数据
    if not (os.path.exists('./data/cifar/')) or not os.listdir('./data/cifar/'):
        DOWNLOAD_CIFAR10=True

    transform = tv.transforms.Compose([
        tv.transforms.ToTensor(),
        tv.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    train_data = tv.datasets.CIFAR10(
        root='./data/cifar/',
        train=True,
        transform=transform,
        download=DOWNLOAD_CIFAR10
    )



    train_loader = DataLoader(
        dataset=train_data,
        batch_size=opt.batch_size,
        shuffle=True,
        num_workers=8
                              )

    #训练模型
    for epoch in range(opt.train_epoch):
        for ii,(data,label) in tqdm(enumerate(train_loader)):
            input = data.to(opt.device)
            target = label.to(opt.device)

            optimizer.zero_grad()
            score = model(input)
            loss = criterion(score,target)
            loss.backward()
            optimizer.step()

            if (ii+1)%opt.print_freq ==0:
                print('loss:%.2f'%loss.cpu().data.numpy())
        if (epoch+1)%opt.save_every==0:
            model.save(epoch=epoch)
        

@t.no_grad()
def test_acc():
    '''
    测试模型准确率
    :return:
    '''
    # 1.加载配置
    opt = Config()
    opt._parese()
    global DOWNLOAD_CIFAR10

    if not (os.path.exists('./data/cifar/')) or not os.listdir('./data/cifar/'):
        DOWNLOAD_CIFAR10=True

    # 1a.加载模型
    model = getattr(models, opt.model)()
    if opt.model_path:
        model.load(opt.model_path)
    model.to(opt.device)

    #2.加载数据
    transform = tv.transforms.Compose(        [
            tv.transforms.ToTensor(),
            tv.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])

    test_data = tv.datasets.CIFAR10(
        root='./data/cifar/',
        train=False,
        transform=transform,
        download=DOWNLOAD_CIFAR10
    )

    test_loader = DataLoader(test_data,batch_size=1000,shuffle=True,num_workers=8)

    dataiter = iter(test_loader)
    test_x, test_y = next(dataiter)
    test_x = test_x.to(opt.device)

    test_score = model(test_x)
    accuracy = np.mean((t.argmax(test_score.to('cpu'),1)==test_y).numpy())
    print('test accuracy:%.2f' % accuracy)
    return accuracy


def attack_model_pixel():
    '''
    pixel攻击模型
    :return:
    '''
    accuracy = test_acc()
    # 1.加载配置
    opt = Config()
    opt._parese()
    global DOWNLOAD_CIFAR10
    if not (os.path.exists('./data/cifar/')) or not os.listdir('./data/cifar/'):
        DOWNLOAD_CIFAR10=True

    # 1a.加载模型
    model = getattr(models, opt.model)()
    if opt.model_path:
        model.load(opt.model_path)
    model.to(opt.device).eval()


    # 2.加载数据
    transform = tv.transforms.Compose([
           tv.transforms.ToTensor(),
           tv.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
       ])

    test_data = tv.datasets.CIFAR10(
        root='./data/cifar/',
        train=False,
        transform=transform,
        download=DOWNLOAD_CIFAR10
    )

    test_loader = DataLoader(test_data, batch_size=1, shuffle=True)



    success_rate = attack_all(model,test_loader,pixels=1,targeted=False,maxiter=400,popsize=400,verbose=False,device=opt.device,sample=opt.attack_num)
    string = 'attack mode:pixel attack | model name:{} | accuracy:{} | success rate:{}| time: {}\n'.format(opt.model,accuracy,success_rate, strftime('%m_%d_%H_%M_%S'))
    open('log.txt','a').write(string)

def attack_model_PGD():
    '''
    PGD攻击模型
    :return:
    '''
    accuracy = test_acc()
    # 1.加载配置
    opt = Config()
    opt._parese()
    global DOWNLOAD_CIFAR10
    if not (os.path.exists('./data/cifar/')) or not os.listdir('./data/cifar/'):
        DOWNLOAD_CIFAR10=True

    # 1a.加载模型
    model = getattr(models, opt.model)()
    if opt.model_path:
        model.load(opt.model_path)
    model.to(opt.device).eval()


    # 2.加载数据
    transform = tv.transforms.Compose([
           tv.transforms.ToTensor(),
           tv.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
       ])

    test_data = tv.datasets.CIFAR10(
        root='./data/cifar/',
        train=False,
        transform=transform,
        download=DOWNLOAD_CIFAR10
    )

    test_loader = DataLoader(test_data, batch_size=1, shuffle=True)

    success_num = 0
    attack = LinfPGDAttack(model)
    for ii, (data, label) in enumerate(test_loader):
        if ii>=opt.attack_num:
            break
        data,label = data.to(opt.device),label.to(opt.device)
        test_score = model(data)
        if t.argmax(test_score.to('cpu'), 1) == label:
            continue
        perturb_x =attack.perturb(data,label)
        test_score = model(t.FloatTensor(perturb_x).to(opt.device))
        if t.argmax(test_score.to('cpu'), 1) != label:
            success_num+=1


    success_rate = success_num/ii

    string = 'attack mode:PGD | model name:{} | accuracy:{} | success rate:{}| time: {}\n'.format(opt.model,accuracy,success_rate, strftime('%m_%d_%H_%M_%S'))
    open('log.txt','a').write(string)

In [None]:
#在这里运行即可
#train()
#attack_model_PGD()
#attack_model_pixel()
