In [None]:
from time import strftime

import torch as t
import os
import torchvision as tv
from torch.utils.data import DataLoader
import numpy as np
from torch import nn
from torch.optim import Adam
import tqdm

import models
from attack.PGDAttack import LinfPGDAttack
from attack.PixelAttack import attack_all
DOWNLOAD_ImageNet = False # whether to download the dataset; functions in this file set it to True when './data/ImageNet/' is missing or empty

In [None]:

def train():
    '''
    Train the configured network and checkpoint it periodically.

    All settings come from a freshly parsed ``Config``: the model class is looked
    up by name in ``models``, optionally initialised from ``opt.model_path``, and
    optimised with Adam against a cross-entropy loss. The running loss is printed
    every ``opt.print_freq`` batches and ``model.save`` is called every
    ``opt.save_every`` epochs.

    Side effect: sets the module-level ``DOWNLOAD_ImageNet`` flag to True when
    './data/ImageNet/' is missing or empty.

    :return: None
    '''
    # 1. Load configuration
    opt = Config()
    opt._parese()
    global DOWNLOAD_ImageNet

    # 1a. Build the model (optionally from a saved checkpoint) and move it to the device
    model = getattr(models, opt.model)()
    if opt.model_path:
        model.load(opt.model_path)
    model.to(opt.device)

    # 2. Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=opt.lr)

    # 3. Load data; request a download only when the data directory is absent or empty
    if not (os.path.exists('./data/ImageNet/')) or not os.listdir('./data/ImageNet/'):
        DOWNLOAD_ImageNet = True

    transform = tv.transforms.Compose([
        # Image-level operations must come first, then ToTensor, then Normalize
        tv.transforms.Resize(256),
        tv.transforms.CenterCrop(224),
        tv.transforms.ToTensor(),
        tv.transforms.Normalize(mean=opt.MEAN, std=opt.STD)
    ])
    # NOTE(review): training reads the 'val' split, the same split the evaluation
    # and attack functions use -- confirm this is intentional.
    train_data = tv.datasets.ImageNet(
        root='./data/ImageNet/',
        split='val',
        transform=transform,
        download=DOWNLOAD_ImageNet
    )

    train_loader = DataLoader(
        dataset=train_data,
        batch_size=opt.batch_size,
        shuffle=True,
        num_workers=opt.num_workers
    )

    # 4. Training loop
    for epoch in range(opt.train_epoch):
        # Wrap the loader itself (not enumerate) so tqdm can read its length
        # and display a total / ETA.
        for ii, (data, label) in enumerate(tqdm.tqdm(train_loader)):
            images = data.to(opt.device)
            target = label.to(opt.device)

            optimizer.zero_grad()
            score = model(images)
            loss = criterion(score, target)
            loss.backward()
            optimizer.step()

            if (ii + 1) % opt.print_freq == 0:
                # .item() extracts the Python scalar without going through .data
                print('loss:%.2f' % loss.item())
        if (epoch + 1) % opt.save_every == 0:
            model.save(epoch=epoch + 1)

@t.no_grad()
def test_acc():
    '''
    Measure top-1 accuracy on a single shuffled batch of the ImageNet 'val' split.

    The batch holds ``Config.test_num`` images. The model named by ``Config.model``
    is built, optionally loaded from ``Config.model_path``, and put in eval mode.
    The accuracy is printed and returned.

    Side effect: sets the module-level ``DOWNLOAD_ImageNet`` flag to True when
    './data/ImageNet/' is missing or empty.

    :return: fraction of the batch classified correctly (result of ``np.mean``)
    '''
    global DOWNLOAD_ImageNet

    # 1. Configuration
    cfg = Config()
    cfg._parese()

    data_root = './data/ImageNet/'
    # Ask for a download unless the directory both exists and has content
    if not (os.path.exists(data_root) and os.listdir(data_root)):
        DOWNLOAD_ImageNet = True

    # 1a. Model
    model_cls = getattr(models, cfg.model)
    model = model_cls()
    if cfg.model_path:
        model.load(cfg.model_path)
    model.to(cfg.device)
    model.eval()

    # 2. Data: resize -> centre crop -> tensor -> normalise
    preprocessing_steps = [
        tv.transforms.Resize(256),
        tv.transforms.CenterCrop(224),
        tv.transforms.ToTensor(),
        tv.transforms.Normalize(mean=cfg.MEAN, std=cfg.STD),
    ]
    preprocess = tv.transforms.Compose(preprocessing_steps)

    val_set = tv.datasets.ImageNet(
        root=data_root,
        split='val',
        transform=preprocess,
        download=DOWNLOAD_ImageNet,
    )
    val_loader = DataLoader(
        val_set,
        batch_size=cfg.test_num,
        shuffle=True,
        num_workers=cfg.num_workers,
    )

    # 3. Evaluate on the first batch only
    batch_x, batch_y = next(iter(val_loader))
    batch_x = batch_x.to(cfg.device)

    logits = model(batch_x)
    predicted = t.argmax(logits.to('cpu'), 1)
    hits = (predicted == batch_y).numpy()
    accuracy = np.mean(hits)

    print('test accuracy:%.2f' % accuracy)
    return accuracy

@t.no_grad()
def attack_model_pixel():
    '''
    Run the pixel attack against the configured model and log the outcome.

    Delegates the attack to ``attack_all`` over single-image batches of the
    ImageNet 'val' split, measures clean accuracy with ``test_acc``, and appends
    one line (attack name, model path, clean accuracy, estimated accuracy after
    the attack, success rate, timestamp) to ``log.csv``.

    Side effect: sets the module-level ``DOWNLOAD_ImageNet`` flag to True when
    './data/ImageNet/' is missing or empty.

    :return: None
    '''

    # 1. Load configuration
    opt = Config()
    opt._parese()
    global DOWNLOAD_ImageNet
    if not (os.path.exists('./data/ImageNet/')) or not os.listdir('./data/ImageNet/'):
        DOWNLOAD_ImageNet = True

    # 1a. Load the model and switch it to eval mode
    model = getattr(models, opt.model)()
    if opt.model_path:
        model.load(opt.model_path)
    model.to(opt.device).eval()

    # 2. Load data
    transform = tv.transforms.Compose([
        tv.transforms.Resize(256),
        tv.transforms.CenterCrop(224),
        tv.transforms.ToTensor(),
        tv.transforms.Normalize(mean=opt.MEAN, std=opt.STD)
    ])

    test_data = tv.datasets.ImageNet(
        root='./data/ImageNet/',
        split='val',
        transform=transform,
        download=DOWNLOAD_ImageNet
    )

    # One image per batch
    test_loader = DataLoader(test_data, batch_size=1, shuffle=True)

    # 3. Attack. NOTE(review): the return value is used as a success fraction in
    # [0, 1] -- confirm against attack.PixelAttack.attack_all.
    success_rate = attack_all(model, test_loader, pixels=3, targeted=False, maxiter=100, popsize=400, verbose=False, device=opt.device, sample=opt.test_num)
    accuracy = test_acc()
    # Estimated accuracy once the successfully attacked share is removed
    accuracy_after = accuracy * (1 - success_rate)

    # 4. Append the result; the context manager guarantees the handle is
    # flushed and closed even if the write raises.
    string = 'pixel , {} , {} , {} , {} ,  {}\n'.format(opt.model_path, accuracy, accuracy_after, success_rate, strftime('%m_%d_%H_%M_%S'))
    with open('log.csv', 'a') as log_file:
        log_file.write(string)


def attack_model_PGD():
    '''
    Run the L-inf PGD attack against the configured model and log the outcome.

    Up to ``opt.test_num`` shuffled ImageNet 'val' images are examined one at a
    time. Images the model already misclassifies are skipped. Every correctly
    classified image is perturbed with ``LinfPGDAttack.perturb``; the attack
    counts as a success when the prediction on the perturbed image differs from
    the label. The success rate is successes divided by the number of images
    actually attacked (0.0 when none were attacked).

    One line (attack name, model path, clean accuracy, estimated accuracy after
    the attack, success rate, timestamp) is appended to ``log.csv``.

    Side effect: sets the module-level ``DOWNLOAD_ImageNet`` flag to True when
    './data/ImageNet/' is missing or empty.

    :return: None
    '''

    # 1. Load configuration
    opt = Config()
    opt._parese()
    global DOWNLOAD_ImageNet
    if not (os.path.exists('./data/ImageNet/')) or not os.listdir('./data/ImageNet/'):
        DOWNLOAD_ImageNet = True

    # 1a. Load the model and switch it to eval mode
    model = getattr(models, opt.model)()
    if opt.model_path:
        model.load(opt.model_path)
    model.to(opt.device).eval()

    # 2. Load data
    transform = tv.transforms.Compose([
        tv.transforms.Resize(256),
        tv.transforms.CenterCrop(224),
        tv.transforms.ToTensor(),
        tv.transforms.Normalize(mean=opt.MEAN, std=opt.STD)
    ])

    test_data = tv.datasets.ImageNet(
        root='./data/ImageNet/',
        split='val',
        transform=transform,
        download=DOWNLOAD_ImageNet
    )

    # One image per batch so each sample is judged individually
    test_loader = DataLoader(test_data, batch_size=1, shuffle=True)

    # 3. Attack
    success_num = 0   # attacked images whose prediction was flipped
    attacked_num = 0  # images that were correctly classified and therefore attacked
    attack = LinfPGDAttack(model)
    for ii, (data, label) in enumerate(test_loader):
        if ii >= opt.test_num:
            break

        # Plain evaluation needs no autograd graph
        with t.no_grad():
            test_score = model(data.to(opt.device))
        # Skip inputs that are already misclassified: an attack can only
        # "succeed" on a sample the model got right, and accuracy*(1-rate)
        # below relies on the rate being measured over those samples.
        if t.argmax(test_score.to('cpu'), 1) != label.to('cpu'):
            continue
        attacked_num += 1

        # perturb() runs outside no_grad because it is handed the model and
        # may need gradients.
        perturb_x = attack.perturb(data.numpy(), label)
        with t.no_grad():
            test_score = model(t.FloatTensor(perturb_x).to(opt.device))
        if t.argmax(test_score.to('cpu'), 1) != label:
            success_num += 1

    # Divide by the number of attacked samples, not the loop index: the index
    # counts skipped samples, is off by one when the loader is exhausted early,
    # and is zero/undefined for test_num == 0 or an empty loader.
    success_rate = success_num / attacked_num if attacked_num else 0.0
    accuracy = test_acc()
    accuracy_after = accuracy * (1 - success_rate)

    # 4. Append the result; the context manager guarantees the handle is closed
    string = 'PGD , {} , {} , {} , {} ,  {}\n'.format(opt.model_path, accuracy, accuracy_after, success_rate, strftime('%m_%d_%H_%M_%S'))
    with open('log.csv', 'a') as log_file:
        log_file.write(string)


In [None]:
class Config:
    '''
    Experiment settings shared by the training, evaluation and attack functions.

    Values are plain class attributes; ``_parese`` adds the derived ``device``
    attribute to the instance and prints the effective configuration.
    '''
    model_path = None  # path of a pretrained model; None means train from scratch
    model = 'ShuffleNetV2'  # model to load; the name must match one in models/__init__.py
    # Model names listed by the original author:
    #   ShuffleNetV2, MobileNetV2, SqueezeNet1_0, SqueezeNet1_1
    #   VGG11, VGG13, VGG16, VGG19
    #   ResNet18, ResNet34, ResNet50
    lr = 0.0005  # learning rate
    use_gpu = True  # whether to use the GPU
    # Normalisation mean / std; values derived from ImageNet, see
    # https://cloud.tencent.com/developer/ask/153881
    MEAN = (0.485, 0.456, 0.406)
    STD = (0.229, 0.224, 0.225)
    train_epoch = 1  # number of passes over the dataset
    save_every = 1  # save the model every this many epochs

    test_num = 16  # number of samples used for attacking and testing

    batch_size = 128  # number of samples per training batch

    print_freq = 500  # print the loss every this many training batches
    num_workers = 8  # number of workers used to load the dataset

    def _parese(self):
        '''
        Resolve the compute device and print the configuration.

        Sets ``self.device`` to CUDA only when ``use_gpu`` is True and CUDA is
        actually available; otherwise falls back to the CPU, so the functions in
        this file do not fail later at ``.to(device)`` on a machine without a GPU.
        Then prints every public class-level option, using the instance's value
        when it has been overridden.

        :return: None
        '''
        wants_cuda = self.use_gpu and t.cuda.is_available()
        self.device = t.device('cuda') if wants_cuda else t.device('cpu')
        print('Caculate on {}'.format(self.device))
        print('user config:')
        for k in self.__class__.__dict__:
            if not k.startswith('_'):
                print(k, getattr(self, k))



In [None]:
#  train()
#  test_acc()
#  attack_model_PGD()
#  attack_model_pixel()