In [23]:
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import transforms as tt
import torchvision.models as models
import timm
from PIL import Image as pil
import os
import numpy as np
import torch
import pickle
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from numpy import linalg as LA

resnet50 = models.resnet50(pretrained=True)
vit = timm.create_model('vit_small_patch16_224', pretrained=True)
image_path="images"
old_label_path="images//old_labels"
image_list=os.listdir(image_path)


## 图片导入与数据集生成

In [2]:
def normalize(x, mean=[0.5,0.5,0.5], std=[0.5,0.5,0.5]):
    mean_t = torch.Tensor(mean).reshape([1,3,1,1]).to(x.device)
    std_t = torch.Tensor(std).reshape([1,3,1,1]).to(x.device)
    y = (x-mean_t)/std_t
    return y

def get_preprocess(model_name):
    if model_name[:6]=="resnet":
        mean=[0.485, 0.456, 0.406]
        std=[0.229, 0.224, 0.225]
    elif model_name[:3]=="vit":
        mean=[0.5,0.5,0.5]
        std=[0.5,0.5,0.5]
    def preprocess(images):
        y = F.interpolate(images.unsqueeze(0),224)
        return normalize(y, mean=mean,
                                 std=std)
    return preprocess

In [3]:
class mydata1(Dataset):
    def __init__(self,image_path):
        self.file=[]
        for i in open(old_label_path):
            self.file.append(i)
    
    def __getitem__(self,idx):
        img=pil.open("images//"+self.file[idx].split(' ')[0])
        label=np.array(int(self.file[idx].split(' ')[1]))
        img=preprocess1(tt.ToTensor()(img))
        return img[0], label
    
    def __len__(self):
        return len(self.file)
    
    
class mydata2(Dataset):
    def __init__(self,image_path):
        self.file=[]
        for i in open(old_label_path):
            self.file.append(i)
    
    def __getitem__(self,idx):
        img=pil.open("images//"+self.file[idx].split(' ')[0])
        label=np.array(int(self.file[idx].split(' ')[1]))
        img=preprocess2(tt.ToTensor()(img))
        return img[0], label
    
    def __len__(self):
        return len(self.file)
preprocess1=get_preprocess("resnet")
res_data_Tensor = torch.utils.data.DataLoader(dataset=mydata1(image_path))
preprocess2=get_preprocess("vit")
vit_data_Tensor=torch.utils.data.DataLoader(dataset=mydata2(image_path))

##  测试

In [4]:
def test(model,test_Tensor):
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in test_Tensor:
            test_output = model(images)
            pred_y = torch.max(test_output, 1)[1].data.squeeze()
            correct=correct+(pred_y == labels)
            total=total+1
        accuracy = correct / float(total)
            
    print('Test Accuracy of the model on the 1000 test images: %.3f' % accuracy)
    print(total)

In [5]:
test(resnet50,res_data_Tensor)

KeyboardInterrupt: 

In [None]:
test(vit,vit_data_Tensor)

## 

## 对抗样本生成

### C&W attack

In [12]:
def loss1_func(w,x,d,c):
    return torch.dist(x,(torch.tanh(w)*d+c),p=2)

def f(output,tlab,target,k=0):
    #f6(x)=max((max(Z(x')i)-Z(x')t),-k)
    real= torch.max(output*tlab)
    second=torch.max((1-tlab)*output)
    #如果指定了对象，则让这个更接近，否则选择第二个较大的
    if(target):
        return torch.max(second - real, torch.Tensor([-k]).cuda())
    else:
        return torch.max(real - second, torch.Tensor([-k]).cuda())

def cwattack_l2(img,model,right_label,iteration=1000,lr=0.001,target=False,target_label=0):
    shape=(1,3,28,28)
    binary_number=9                         #二分搜索
    maxc=1e10                               #从0.01-100去找c
    minc=0
    c=1e-3                                   #from c = 0:01 to c = 100 on the MNIST dataset.
    min_loss=1000000                         #找到最小的loss，即为方程的解
    min_loss_img=img                         #扰动后的图片
    k=0                                      #f函数使用，论文默认为0
    b_min = 0                                #盒约束，论文中使用了0-1 代码中-0.5 0.5
    b_max = 1
    b_mul=(b_max-b_min)/2.0
    b_plus=(b_min+b_max)/2.0
    if(not target):
        target_label=right_label
    tlab=Variable(torch.from_numpy(np.eye(10)[target_label]).cuda().float())
    for binary_index in range(binary_number):
        print("------------Start {} search, current c is {}------------".format(binary_index,c))
        
        #将img转换为w，w=arctanh(2x-1)，作为原始图片
        w = Variable(torch.from_numpy(np.arctanh((img.numpy()-b_plus)/b_mul*0.99999)).float()).cuda()
        
        w_pert=Variable(torch.zeros_like(w).cuda().float())
        w_pert.requires_grad = True
        #最初图像x
        x= Variable(img).cuda()
        optimizer = optim.Adam([w_pert], lr=lr) #论文p3选用Adam  
        isSuccessfulAttack=False
        
        for iteration_index in range(1,iteration+1):
            optimizer.zero_grad()
            
            # w加入扰动w_pert之后的新图像
            img_new=torch.tanh(w_pert+w)*b_mul+b_plus  #0.5*tanh(w+pert)+0.5
            loss_1=loss1_func(w,img_new,b_mul,b_plus)  #\\ deta\\  p=2
            model.eval()
            output=model(img_new)                      # Z(x)
            loss_2=c*f(output,tlab,target)             # c*f(x+deta) , x+deta=img_new, 
            loss=loss_1+loss_2                         # Minimize loss=loss1+loss2
            loss.backward(retain_graph=True)
            optimizer.step()
            if iteration_index%200==0:
                 print('Iters: [{}/{}]\tLoss: {},Loss1(L2 distance):{}, Loss2:{}'
                       .format(iteration_index,iteration,loss.item(),loss_1.item(),loss_2.item()))
                    
            pred_result=output.argmax(1, keepdim=True).item()
            #指定目标模式下,此处考虑l2距离最小,即找到最小的loss1
            if(target):
                if(min_loss>loss_1 and pred_result==target_label):
                    flag=False
                    for i in range(20):
                        output=model(img_new)
                        pred_result=output.argmax(1, keepdim=True).item()
                        if(pred_result!=target_label):
                            flag=True  #原模型中存在dropout，此处判断连续成功攻击20次，则视为有效
                            break
                    if(flag):
                        continue
                    min_loss=loss_1
                    min_loss_img=img_new
                    print('success when loss: {}, pred: {}'.format(min_loss,pred_result))
                    isSuccessfulAttack=True
             #非目标模式，找到最接近的一个,连续20次不预测成功
            else:
                if(min_loss>loss_1 and pred_result!=right_label):
                    flag=False
                    for i in range(50):
                        output=model(img_new)
                        pred_result=output.argmax(1, keepdim=True).item()
                        if(pred_result==right_label):
                            flag=True  #原模型中存在dropout，此处判断连续成功攻击50次，则视为有效
                            break
                    if(flag):
                        continue
                    min_loss=loss_1
                    min_loss_img=img_new
                    print('success when loss: {}, pred: {}'.format(min_loss,pred_result))
                    isSuccessfulAttack=True
        if(isSuccessfulAttack):
            maxc=min(maxc,c)
            if maxc<1e9: 
                c=(minc+maxc)/2
        #攻击失败，尝试放大c
        else:
            minc=max(minc,c)
            if(maxc<1e9):
                c=(maxc+minc)/2
            else:
                c=c*10
    return min_loss_img

In [25]:



def cw_attack(model,input_Tensor,epsilon,lim):
    cw_sample_list=[]
    id=0
    for (data, target) in input_Tensor:
        print("Correct Label is ",target)

        victim_img_input2 = torch.unsqueeze(data, 0)
        attack_untarget_img = cwattack_l2(data,resnet50,target,
                         iteration=1000,lr=0.01)
        cw_sample_list.append(attack_untarget_img)
        id=id+1
        if(id%100==0):
            print(("%d/"+"1000"+"\r")%(id))
        
    return fgsm_sample_list

### FGSM

#### L_inf

In [26]:


def fgm(model, image, label, epsilon, order, clip_min, clip_max, device):
    imageArray = image.cpu().detach().numpy()
    X_fgsm = torch.tensor(imageArray).to(device)

    #print(image.data)

    X_fgsm.requires_grad = True

    opt = optim.SGD([X_fgsm], lr=1e-3)
    opt.zero_grad()

    loss = nn.CrossEntropyLoss()(model(X_fgsm), label.long())

    loss.backward()
    #print(X_fgsm)
    #print(X_fgsm.grad)
    if order == np.inf:
        d = epsilon * X_fgsm.grad.data.sign()
    elif order == 2:
        gradient = X_fgsm.grad
        d = torch.zeros(gradient.shape, device = device)
        for i in range(gradient.shape[0]):
            norm_grad = gradient[i].data/LA.norm(gradient[i].data.cpu().numpy())
            d[i] = norm_grad * epsilon
    else:
        raise ValueError('Other p norms may need other algorithms')

    x_adv = X_fgsm + d

    if clip_max == None and clip_min == None:
        clip_max = np.inf
        clip_min = -np.inf

    x_adv = torch.clamp(x_adv,clip_min, clip_max)

    return x_adv
        

def fgsm_attack(model,input_Tensor,epsilon,lim):
    fgsm_sample_list=[]
    id=0
    for (data, target) in input_Tensor:
        perturbed_data = fgm(model,data,target,epsilon,2,0,1,"cpu")
        fgsm_sample_list.append((perturbed_data,target))
        id=id+1
        if(id%100==0):
            print(("%d/"+"1000"+"\r")%(id))
        
    return fgsm_sample_list

In [28]:
epsilon=0.3
fgsm_sample={}
fgsm_sample_path='./fgsm_sample.txt'
fgsm_sample["res"]=fgsm_attack(resnet50, res_data_Tensor,epsilon,lim)



100/1000
200/1000
300/1000
400/1000
500/1000
600/1000
700/1000
800/1000
900/1000
1000/1000


In [None]:
fgsm_sample["vit"]=fgsm_attack(vit, vit_data_Tensor,epsilon,lim)

In [None]:
with open(fgsm_sample_path,'wb') as f:
    content = pickle.dumps(fgsm_sample)
    f.write(content)

In [29]:
test(resnet50,fgsm_sample["res"])

Test Accuracy of the model on the 1000 test images: 0.181
1000


In [None]:
test(vit,fgsm_sample["vit"])

#### L_2

### PGD

#### L_inf

In [None]:
iters = 6     #迭代次数
alpha =epsilon*2/iters    #迭代步长

def PGD(input,epsilon,data_grad,iters,alpha):
    grad_attack = data_grad.sign()
    delta = grad_attack*alpha
    result = input+delta
    result = torch.clamp(result,0,1)
    return result
        
  

def PGD_attack(model,input_Tensor,epsilon,iters,alpha):
    pgd_sample_list=[]
    for (data,target) in input_Tensor:
        perturbed_data=data.detach()
        for i in range(iters):
            perturbed_data.requires_grad = True
            output = model(perturbed_data)
            init_pred = output.max(1)[1].squeeze()
            loss = loss_fn(output, target.long())
            model.zero_grad()
            loss.backward()
            perturbed_data_grad = perturbed_data.grad.data
            disturb = torch.clamp(PGD(perturbed_data,epsilon,perturbed_data_grad,iters,alpha)-data,-epsilon,epsilon)
            perturbed_data=(data+disturb).detach()
        pgd_sample_list.append((perturbed_data,target)) 
        
        
    return pgd_sample_list

pgd_sample={}
pgd_sample_path='./pgd_sample.txt'


pgd_sample["vit"]=PGD_attack(vit,vit_data_Tensor,epsilon,iters,alpha)
pgd_sample["res"]=PGD_attack(resnet50,res_data_Tensor,epsilon,iters,alpha)
    
with open(pgd_sample_path,'wb') as f:
    content = pickle.dumps(pgd_sample)
    f.write(content)

In [None]:
test(resnet50,pgd_sample["res"])

In [None]:
test(vit,pgd_sample["vit"])

#### L_2

In [None]:
iters = 6     #迭代次数
alpha =epsilon*2/iters    #迭代步长

def PGD(input,epsilon,data_grad,iters,alpha):
    grad_attack = data_grad.sign()
    delta = grad_attack*alpha 
    result = input+delta
    result = torch.clamp(result,0,1)
    return result
        
  

def PGD_attack(model,input_Tensor,epsilon,iters,alpha):
    pgd_sample_list=[]
    for (data,target) in input_Tensor:
        perturbed_data=data.detach()
        for i in range(iters):
            perturbed_data.requires_grad = True
            output = model(perturbed_data)
            init_pred = output.max(1)[1].squeeze()
            loss = loss_fn(output, target.long())
            model.zero_grad()
            loss.backward()
            perturbed_data_grad = perturbed_data.grad.data
            disturb = torch.clamp(PGD(perturbed_data,epsilon,perturbed_data_grad,iters,alpha)-data,-epsilon,epsilon)
            perturbed_data=(data+disturb).detach()
        pgd_sample_list.append((perturbed_data,target)) 
        
        
    return pgd_sample_list

pgd_sample={}
pgd_sample_path='./pgd_sample.txt'


pgd_sample["vit"]=PGD_attack(vit,vit_data_Tensor,epsilon,iters,alpha)
pgd_sample["res"]=PGD_attack(resnet50,res_data_Tensor,epsilon,iters,alpha)
    
with open(pgd_sample_path,'wb') as f:
    content = pickle.dumps(pgd_sample)
    f.write(content)