<a href="https://colab.research.google.com/github/m-mehabadi/grad-maker/blob/main/_notebooks/UniversalAttackGenerator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


Areas:
* Domain Generalization in Classification
* Domain Generalization in Mitosis Detection
* Universal Adversarial Example Generator
* An Aproach to Optimize Robustness and Performance Simultaneously
* Multi-Tasking using GradMaker
* Applications in Federated Learning


In [None]:
import torch
import torch.nn as nn
import torchvision
import torchvision.datasets
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np

import gc
import torch.nn.functional as F
from torch.autograd import Variable

In [None]:
# config
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Param
num_epochs = 1
num_classes = 10
batch_size = 100
learning_rate = 0.001

In [None]:
# MNIST datasets
train_dataset = torchvision.datasets.MNIST(root='./', train=True, transform=transforms.ToTensor(), download=True)
test_dataset = torchvision.datasets.MNIST(root='./', train=False, transform=transforms.ToTensor(), download=True)

# MNIST dataloaders
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# CNN(two layer)
class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7 * 7 * 32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out

In [None]:
model = ConvNet(num_classes).to(device)

# Loss and optimize
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Train
model.train()
total_step = len(train_loader)
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backprop and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i + 1) % 100 == 0:
            print (f'Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{total_step}], Training Loss: {loss.item():.4f}')

# Test the model
print(f'Model Accuracy on the 10000 test images: {validate(model, test_loader):.4f} %')

Epoch [1/1], Step [100/600], Training Loss: 0.1575
Epoch [1/1], Step [200/600], Training Loss: 0.1005
Epoch [1/1], Step [300/600], Training Loss: 0.0808
Epoch [1/1], Step [400/600], Training Loss: 0.0707
Epoch [1/1], Step [500/600], Training Loss: 0.0668
Epoch [1/1], Step [600/600], Training Loss: 0.0631
Model Accuracy on the 10000 test images: 98.5800 %


### Now let's generate the attack


In [None]:
def gc_collect(*vars):
    for var in vars:
        del var
    gc.collect()

def empty_cache():
    torch.cuda.empty_cache()

In [None]:
def adv_clip(adv, real, alpha=0):
    return torch.clamp(adv, min=torch.clamp(real-alpha, min=0.),
                       max=torch.clamp(real+alpha, max=1.))

In [None]:
def accuracy(model, X, y):
    outputs = model(X)
    _, predicted = torch.max(outputs.data, 1)
    total = y.size(0)
    correct = (predicted == y).sum().item()
    return correct / total

In [None]:
def gradient_maker(domain_grads, epsilon=0.5, alpha=0.05, alpha_schedule='exp'):
    iters = 50000
    dgr = domain_grads.view(domain_grads.shape[0], -1).to(device)
    number_of_domains, dim = dgr.shape

    #
    g = torch.randn(dim).to(device)
    u_ = torch.zeros(number_of_domains).to(device)
    
    iter = 0
    while (not torch.abs(torch.min(dgr@g)-epsilon)<=0.001) and (iter < iters):
        u_ = u_ + alpha*(epsilon - (dgr@g))
        g = (1./number_of_domains)*torch.sum(((1+(u_>=0)*u_).reshape(number_of_domains, 1))*dgr, axis=0)
        iter += 1
    return g.view(domain_grads.shape[1:])

In [None]:
def validate(model, dloader, pert=None, is_batched=True):
    model.eval()
    # with torch.no_grad()
    correct = 0
    total = 0
    if pert is not None:
        pert_ = pert(model, dloader) if not is_batched else None
    for images, labels in dloader:
        images = images.to(device)
        labels = labels.to(device)
        if pert is not None:
            print("213123")
            print(f"X.size()={images.size()}, y.size()={labels.size()}")
            images = Variable(images.clone(), requires_grad=True)
            # labels = Variable(labels.clone(), requires_grad=True)
            print(images.requires_grad)
            pert_ = pert(model, images, labels) if is_batched else pert_
            print("12432544")
            images = images.data.detach() + pert_.data.detach()
            gc_collect(pert_, images, labels)
            empty_cache()
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    return (100 * correct / total)

In [None]:
validate(model, train_loader, fgsm, True)

213123
X.size()=torch.Size([100, 1, 28, 28]), y.size()=torch.Size([100])
True
84723ryskdfhaszjkdaslkdj
12432544


RuntimeError: ignored

In [None]:
X, y = next(iter(train_loader))
X, y = X.to(device), y.to(device)
# dX(model,X, y)
print(f"X.size()={X.size()}, y.size()={y.size()}")
fgsm(model, X, y).size()

In [None]:
def dX(model, X, y):
    # X = Variable(X.data, requires_grad=True)
    outputs = model(X)
    loss = F.cross_entropy(outputs, y)
    # X.retain_grad()
    print("84723ryskdfhaszjkdaslkdj")
    loss.backward()
    # print("84723ryskdfhaszjkdaslkdj")
    return X.grad.data.clone()

def fgsm(model, X, y, eps=0.1):
    advs = X + eps * torch.sign(dX(model, X, y))
    return adv_clip(advs, X, alpha=eps) - X

def universal_v1(model, X, y, alpha=0.01, steps=20, eps=0.15):
    X_first = X.clone()
    for step in range(steps):
        X_grad = dX(model, X, y)
        X_advs = adv_clip(X + alpha * torch.sign(gradient_maker(X_grad)), X, eps)
        X = Variable(X_advs.data, requires_grad=False)
        
        #
        gc_collect(X, X_advs)
        empty_cache()
    return X - X_first

def universal_v2(model, dataloader, alpha=0.01, steps=20, eps=0.15):
    pert = None
    for step in range(steps):
        grads = []
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)
            if pert is None:
                pert = torch.zeros_like(images)
            images_grads = dX(model, adv_clip(images + pert), labels)
            grads.append(gradient_maker(images_grads))
        pert += alpha * torch.sign(gradient_maker(torch.stack((grads))))
    return pert

In [None]:
validate(model, train_loader, fgsm, is_batched=True)

In [None]:
# validate(model, train_loader, None)

In [None]:
# validate(model, test_loader, None)