<a href="https://colab.research.google.com/github/kerimoglutolga/AdversarialLearning/blob/master/finalreport.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt

In [2]:
n_epochs = 5
batch_size_train = 64
batch_size_test = 64
learning_rate = 0.001
momentum = 0.5
log_interval = 100

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
train_loader = torch.utils.data.DataLoader(
    torchvision.datasets.MNIST('/files/', train=True, download=True,
                               transform=torchvision.transforms.Compose([
                                   torchvision.transforms.ToTensor(),
                                   #torchvision.transforms.Normalize(
                                   #    (0.1307,), (0.3081,))
                               ])),
    batch_size=batch_size_train, shuffle=True, drop_last=True)

test_loader = torch.utils.data.DataLoader(
    torchvision.datasets.MNIST('/files/', train=False, download=True,
                               transform=torchvision.transforms.Compose([
                                   torchvision.transforms.ToTensor(),
                                   #torchvision.transforms.Normalize(
                                    #   (0.1307,), (0.3081,))
                               ])),
    batch_size=batch_size_test, shuffle=True, drop_last=True)

In [5]:
class Encoder(nn.Module):
    def __init__(self):
        super().__init__()
        layers = [nn.Conv2d(1,10, kernel_size=5), nn.MaxPool2d(kernel_size=2), nn.ReLU(),
                  nn.Conv2d(10, 10, kernel_size=5), nn.Dropout2d(),
                  nn.MaxPool2d(kernel_size=2), ]
        self.net = nn.Sequential(*layers)
        
    def forward(self, x):
        return self.net(x)

In [6]:
class MLP(nn.Module):
    def __init__(self, input_dim=160):
        super().__init__()
        layers = [nn.Linear(input_dim, 50), nn.ReLU(), nn.Dropout(),
                  nn.Linear(50, 10)]
        self.net = nn.Sequential(*layers)
    
    def forward(self, x):
        return F.log_softmax(self.net(x), dim=1)

In [7]:
class Classifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = Encoder()
        self.mlp = MLP()
    
    def forward(self, x):
        x = self.encoder(x).view(x.shape[0], -1)
        return self.mlp(x)

In [9]:
def pgd_attack(model, images, labels, eps=0.3, alpha=2/255, iters=30) :
    loss = nn.CrossEntropyLoss()
    
    ori_images = images.data

    for i in range(iters) :    
        images.requires_grad = True
        outputs = model(images)
        model.zero_grad()
        cost = loss(outputs, labels).to(device)
        images.retain_grad()
        cost.backward()

        adv_images = images + alpha*images.grad.sign()
        eta = torch.clamp(adv_images - ori_images, min=-eps, max=eps)
        images = torch.clamp(ori_images + eta, min=0, max=1).detach_()
            
    return images

In [12]:
network = Classifier().to(device)
optimizer = torch.optim.Adam(network.parameters(), lr=learning_rate);

In [None]:
train_losses = []
train_counter = []
test_losses = []
test_counter = [i*len(train_loader.dataset) for i in range(n_epochs + 1)]

In [None]:
def train(epoch, adv=False, mix_rate=0.5):
    loss_fn = nn.CrossEntropyLoss()
    network.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        if adv:
          val = torch.rand(1)
          if val < mix_rate:
            data = pgd_attack(network, data, target)
        optimizer.zero_grad()
        output = network(data)
        loss = loss_fn(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                       100. * batch_idx / len(train_loader), loss.item()))
            train_losses.append(loss.item())
            train_counter.append(
                (batch_idx*64) + ((epoch-1)*len(train_loader.dataset)))

In [None]:
def test(adv=False):
  network.eval()
  test_loss = 0
  correct = 0
  for data, target in test_loader:
      data, target = data.to(device), target.to(device)
      if adv:
        data = pgd_attack(network, data, target)
      output = network(data)
      test_loss += F.nll_loss(output, target).item()
      pred = output.data.max(1, keepdim=True)[1]
      correct += pred.eq(target.data.view_as(pred)).sum()
  test_loss /= len(test_loader.dataset)
  test_losses.append(test_loss)
  if adv:
    print('Test set: Avg. adversarial loss: {:.4f}, Adversarial Accuracy: {}/{} ({:.0f}%)\n'.format(
      test_loss, correct, len(test_loader.dataset),
      100. * correct / len(test_loader.dataset)))
  else: print('Test set: Avg. natural loss: {:.4f}, Natural Accuracy: {}/{} ({:.0f}%)'.format(
      test_loss, correct, len(test_loader.dataset),
      100. * correct / len(test_loader.dataset)))

In [None]:
# Regular training
for epoch in range(1, n_epochs + 1):
    train(epoch)
    test()
    test(adv=True)

Test set: Avg. natural loss: 0.0026, Natural Accuracy: 9475/10000 (95%)
Test set: Avg. adversarial loss: 0.1118, Adversarial Accuracy: 41/10000 (0%)

Test set: Avg. natural loss: 0.0018, Natural Accuracy: 9601/10000 (96%)
Test set: Avg. adversarial loss: 0.1304, Adversarial Accuracy: 52/10000 (1%)

Test set: Avg. natural loss: 0.0015, Natural Accuracy: 9661/10000 (97%)
Test set: Avg. adversarial loss: 0.1421, Adversarial Accuracy: 68/10000 (1%)

Test set: Avg. natural loss: 0.0013, Natural Accuracy: 9701/10000 (97%)
Test set: Avg. adversarial loss: 0.1567, Adversarial Accuracy: 38/10000 (0%)

Test set: Avg. natural loss: 0.0012, Natural Accuracy: 9730/10000 (97%)
Test set: Avg. adversarial loss: 0.1740, Adversarial Accuracy: 24/10000 (0%)



In [None]:
# Adversarial training
for epoch in range(1, n_epochs + 1):
    train(epoch, adv=True, mix_rate=1)
    test()
    test(adv=True)



KeyboardInterrupt: ignored

In [None]:
torch.save(network, "85-69.pth")

In [13]:
network = torch.load("85-69.pth")

In [14]:
features = []
labels = []
loss_fn = nn.CrossEntropyLoss()
network.eval()
for batch_idx, (data, target) in enumerate(test_loader):
    data, target = data.to(device), target.to(device)
    data = pgd_attack(network, data, target)
    optimizer.zero_grad()
    with torch.no_grad():
      output = np.reshape(network.encoder(data).cpu().detach().numpy(), (64, -1))
      target = target.cpu().detach().numpy()
      output_list = np.split(output, 64)
      target_list = np.split(target, 64)
      features.extend(output_list)
      labels.extend(target_list)

In [15]:
features = np.array(features)
labels = np.array(labels)
features = np.squeeze(features, 1)
labels = np.squeeze(labels, 1)

In [17]:
from sklearn.feature_selection import RFE
from sklearn.svm import SVR

In [18]:
estimator = SVR(kernel="linear")
selector = RFE(estimator, n_features_to_select=40, step=1)
selector = selector.fit(features, labels)

In [20]:
indices = selector.get_support()
np.save("indices")

TypeError: ignored

In [None]:
selected_features = np.load("features.npy")

In [None]:
# MLP train
new_model = MLP(30).cuda()
def post_train(epoch, adv=False, mix_rate=0.5):
    optimizer = torch.optim.Adam(new_model.parameters(), lr=learning_rate)
    loss_fn = nn.CrossEntropyLoss()
    network.eval()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        output = np.reshape(network.encoder(data).cpu().detach().numpy(), (64, -1))
        output = output[:, selected_features == 1]
        output = torch.from_numpy(output).float().cuda()
        if adv:
          val = torch.rand(1)
          if val < mix_rate:
            data = pgd_attack(new_model, output, target)
        output = new_model(output)
        loss = loss_fn(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                       100. * batch_idx / len(train_loader), loss.item()))
            train_losses.append(loss.item())
            train_counter.append(
                (batch_idx*64) + ((epoch-1)*len(train_loader.dataset)))

In [None]:
def post_test(model,adv=False):
  model.eval()
  test_loss = 0
  correct = 0
  for data, target in test_loader:
      data, target = data.to(device), target.to(device)
      if adv:
        data = pgd_attack(network, data, target)
        output = np.reshape(network.encoder(data).cpu().detach().numpy(), (64, -1))
        output = output[:, selected_features == 1]
        output = torch.from_numpy(output).float().cuda()
        output = new_model(output)
      test_loss += F.nll_loss(output, target).item()
      pred = output.data.max(1, keepdim=True)[1]
      correct += pred.eq(target.data.view_as(pred)).sum()
  test_loss /= len(test_loader.dataset)
  test_losses.append(test_loss)
  if adv:
    print('Test set: Avg. adversarial loss: {:.4f}, Adversarial Accuracy: {}/{} ({:.0f}%)\n'.format(
      test_loss, correct, len(test_loader.dataset),
      100. * correct / len(test_loader.dataset)))
  else: print('Test set: Avg. natural loss: {:.4f}, Natural Accuracy: {}/{} ({:.0f}%)'.format(
      test_loss, correct, len(test_loader.dataset),
      100. * correct / len(test_loader.dataset)))

In [None]:
for epoch in range(1, 11):
    post_train(epoch)
    post_test(new_model)
    post_test(new_model, adv=True)

IndexError: ignored

In [None]:
for epoch in range(1, 11):
    post_train(epoch, adv=True, mix_rate=1)
    post_test(new_model)
    post_test(new_model, adv=True)