In [1]:
#Importando bibliotecas

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import random
import torch.optim as optim
import pickle
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import time

In [2]:
#Rede neural

class ResidualBlock(nn.Module):
    def __init__(self, inCh, outCh, stride):
        super(ResidualBlock, self).__init__()
        self.left = nn.Sequential(
            nn.Conv2d(in_channels=inCh, out_channels=outCh,
                            kernel_size=3, padding=1, stride=stride),
            nn.BatchNorm2d(outCh),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=outCh, out_channels=outCh,
                            kernel_size=3, padding=1, stride=1),
            nn.BatchNorm2d(outCh)
        )

        self.shortcut = nn.Sequential()
        if stride != 1 or inCh != outCh:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels=inCh, out_channels=outCh,
                            kernel_size=1, stride=stride),
                nn.BatchNorm2d(outCh)
            )

    def forward(self, x):
        out = self.left(x)
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class ResNet18(nn.Module):
    def __init__(self):
        super(ResNet18, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, padding=1, stride=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True)
        )
        self.layer_1 = self.make_layer(ResidualBlock, 64, 64, stride=1)
        self.layer_2 = self.make_layer(ResidualBlock, 64, 128, stride=2)
        self.layer_3 = self.make_layer(ResidualBlock, 128, 256, stride=2)
        self.layer_4 = self.make_layer(ResidualBlock, 256, 512, stride=2)
        self.avgpool = nn.AvgPool2d((3, 3), stride=2)
        self.fc = nn.Linear(512 * 1 * 1, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = self.layer_1(x)
        x = self.layer_2(x)
        x = self.layer_3(x)
        x = self.layer_4(x)
        x = self.avgpool(x)
        x = x.view(-1, 512*1*1)
        x = self.fc(x)
        return x

    def make_layer(self, block, inCh, outCh, stride, block_num=2):
        layers = []
        layers.append(block(inCh, outCh, stride))
        for i in range(block_num - 1):
            layers.append(block(outCh, outCh, 1))
        return nn.Sequential(*layers)

In [3]:
#Parâmetros básicos

batchsize=128
worker=2
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(0)
mean = [0.4914, 0.4822, 0.4465]
std = [0.2023, 0.1994, 0.2010]
transform_norm = transforms.Normalize(mean, std)

  return torch._C._cuda_getDeviceCount() > 0


In [4]:
#Ataques

def fgsm(model, X, y, epsilon=8.0/255.0):
    """ Construct FGSM adversarial examples on the examples X"""
    delta = torch.zeros_like(X, requires_grad=True)
    loss = nn.CrossEntropyLoss()(model(transform_norm(X + delta)), y)
    loss.backward()
    return epsilon * delta.grad.detach().sign()

def pgd_linf(model, X, y, epsilon=8.0/255.0, alpha=2/255, num_iter=7, randomize=False):
    """ Construct FGSM adversarial examples on the examples X"""
    if randomize:
        delta = torch.rand_like(X, requires_grad=True)
        delta.data = delta.data * 2 * epsilon - epsilon
    else:
        delta = torch.zeros_like(X, requires_grad=True)

    for t in range(num_iter):
        loss = nn.CrossEntropyLoss()(model(transform_norm(X + delta)), y)
        loss.backward()
        delta.data = (delta + alpha * delta.grad.detach().sign()).clamp(-epsilon, epsilon)
        delta.grad.zero_()
    return delta.detach()

In [5]:
#Treinamento adversário

def epoch_adversarial_train_ds(loader, model, attack,  P_up, minibatchsize, opt=None, **kwargs):
    """Adversarial training/evaluation epoch over the dataset"""
    total_loss, total_err, total_acc = 0., 0., 0.
    set_selected=[]
    for X, y in loader:
        X, y = X.to(device), y.to(device)
        yy = torch.cat((y, y), dim=0)
        delta = attack(model, X, y, **kwargs)
        adv = transform_norm(X + delta)
        yp_adv = model(adv)
        loss_adv = nn.CrossEntropyLoss()(yp_adv, y)
        clean = transform_norm(X)
        yp_clean = model(clean)
        loss_clean = nn.CrossEntropyLoss()(yp_clean, y)

        yp = torch.cat((yp_adv,yp_clean), dim=0)
        image =  torch.cat((adv, clean), dim=0)

        myloss = compute_crossentropyloss_manual(yp,yy)
        total_acc += (yp.max(dim=1)[1] == yy).sum().item()/len(myloss)
        size_sel = int(P_up *2* minibatchsize)

        error_signal = np.argsort(myloss)
        error_signal = error_signal[::-1]
        selec = error_signal[0:size_sel]

        set_selected.append(selec)
        label_aux = yy.cpu().detach().numpy()
        yy = torch.from_numpy(label_aux[selec])
        image_aux = image.cpu().detach().numpy()
        image = torch.from_numpy(image_aux[selec])

        yp = model(image.cpu())
        loss = nn.CrossEntropyLoss()(yp.cpu(), yy.cpu())
        if opt:
            opt.zero_grad()
            loss.backward()
            opt.step()

        total_err += (yp.cpu().max(dim=1)[1] == yy.cpu()).sum().item()
        total_loss += loss.item() * X.shape[0]

    return 0., total_loss / len(loader.dataset), total_err / (2*P_up*len(loader.dataset))

def compute_crossentropyloss_manual(x,y0):
    """
    x is the vector of probabilities with shape (batch_size,C)
    y0 shape is the same (batch_size), whose entries are integers from 0 to C-1
    """
    myloss=[]
    loss = 0.
    n_batch, n_class = x.shape
    # print(n_class)
    for x1,y1 in zip(x,y0):
        x1 = x1.cpu().detach().numpy().astype(np.float128)
        class_index = int(y1.item())
        loss = -np.log(np.exp(x1[class_index])/(np.exp(x1).sum()))
        myloss.append(loss)
    loss = - loss/n_batch

    return myloss

In [6]:
#Testes

def epoch(loader, model, opt=None):
    """Standard training/evaluation epoch over the dataset"""
    total_loss, total_err = 0., 0.
    for X, y in loader:
        X, y = X.to(device), y.to(device)
        X = transform_norm(X)
        yp = model(X)
        loss = nn.CrossEntropyLoss()(yp, y)
        if opt:
            opt.zero_grad()
            loss.backward()
            opt.step()

        total_err += (yp.max(dim=1)[1] == y).sum().item()
        total_loss += loss.item() * X.shape[0]
    return total_err / len(loader.dataset), total_loss / len(loader.dataset)


def epoch_adversarial(loader, model, attack, opt=None, **kwargs):
    """Adversarial training/evaluation epoch over the dataset"""
    total_loss, total_err = 0., 0.
    for X, y in loader:
        X, y = X.to(device), y.to(device)
        # with adversarial example
        delta = attack(model, X, y, **kwargs)
        adv = transform_norm(X + delta)
        yp = model(adv)
        loss = nn.CrossEntropyLoss()(yp, y)
        if opt:
            opt.zero_grad()
            loss.backward()
            opt.step()
        total_err += (yp.max(dim=1)[1] == y).sum().item()
        total_loss += loss.item() * X.shape[0]

    return total_err / len(loader.dataset), total_loss / len(loader.dataset)

In [7]:
#Importando dataset

transform = transforms.Compose(
    [transforms.RandomCrop(32, padding=4),
     transforms.RandomHorizontalFlip(),
     transforms.ToTensor()])
transform_test = transforms.Compose(
    [transforms.ToTensor()])

trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
testset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)

train_loader = DataLoader(trainset, batch_size=batchsize, shuffle=True, num_workers=worker)
test_loader = DataLoader(testset, batch_size=batchsize, shuffle=False, num_workers=worker)

Files already downloaded and verified
Files already downloaded and verified


In [8]:
#Seleção de dados

P_up = 1.0

In [None]:
for P_up in [.2,.4,.6,.8,1]:
    
    #Looping de treinamento

    model_cnn_robust = ResNet18().to(device)
    
    LR = 0.1

    opt = optim.SGD(model_cnn_robust.parameters(), lr=LR, momentum=0.9, weight_decay=5e-4)
    train_err_all=[]
    test_err_all = []
    adv_err_all = []
    all_set_selected=[]
    time_train=[]
    P_up_all=[]

    ini = time.time()

    for t in range(200):
        
        if t == 99 or t == 149:
            opt = optim.SGD(model_cnn_robust.parameters(), lr=LR, momentum=0.9, weight_decay=0.1)
        else:
            opt = optim.SGD(model_cnn_robust.parameters(), lr=LR, momentum=0.9, weight_decay=5e-4)
        
        train_err, train_loss, acc = epoch_adversarial_train_ds(train_loader, model_cnn_robust, pgd_linf, P_up, batchsize, opt)
        P_up_all.append(P_up)
        #P_up = (1-acc)*P_up
        test_err, test_loss = epoch(test_loader, model_cnn_robust)

        adv_err, adv_loss = epoch_adversarial(test_loader, model_cnn_robust, pgd_linf)
        train_err_all.append(train_err)
        test_err_all.append(test_err)
        adv_err_all.append(adv_err)

        print('\n')
        print(f'Train Error: {train_err}; Test Acc: {test_err}; Adversarial Acc: {adv_err}')
        print(f'Train Loss: {train_loss}; Test Loss: {test_loss}; Adversarial Loss: {adv_loss}')
        with open('adv_train_pup_1.pickle', 'wb') as f:
            pickle.dump([train_err_all, test_err_all, adv_err_all], f)
    torch.save(model_cnn_robust.state_dict(), f"model_adv_train_pup_DS={P_up}_LR={LR}.pt")

    fim = time.time()
    print(f'Tempo de execução: {fim-ini}')



Train Error: 0.0; Test Acc: 0.1; Adversarial Acc: 0.1
Train Loss: 2.4485930293273928; Test Loss: 2.301503427886963; Adversarial Loss: 2.3031305778503417
