# BIGRoC: Boosting Image Generation via a Robust Classifier

--------------------------------------------------

## Comparison with other methods - ImageNet $128\times128$

This colab notebook contains the needed code to experiment with our proposed algorithm for comparison with competitive metohds on ImageNet128x128 dataset (Section 5.3).

**How to use**

1.   Upload the notebook to colab
2.   Make sure that colab uses a GPU (Edit $\rightarrow$ Notebook settings $\rightarrow$ Hardware accelerator)

**Downloading pretrained generators:**

The pretrained image generators are stored in a [Google Drive](https://drive.google.com/drive/folders/1yN6WjMmc-pi3zHylF-I1jri7I2nZpsGJ?usp=sharing). This notebook mounts google drive and assumes that [this directory](https://drive.google.com/drive/folders/1yN6WjMmc-pi3zHylF-I1jri7I2nZpsGJ?usp=sharing) is located in your root folder in Google Drive. Therefore, please open the Google Drive's link and choose "add a shortcut to drive" and pick "My Drive" is the chosen location. By doing so, you are ready to go :)

If you rather not to mount your google drive, download the files and edit the relevant checkpoint paths accordingly.

**credits**

This notebook uses the following packages:

* [pfnet-researcrch](https://github.com/pfnet-research/sngan_projection) - For the official pretrained SN-ResNetGAN
* [robustness](https://github.com/MadryLab/robustness) - For adversarially robust classifier
* [guided-diffuision](https://github.com/openai/guided-diffusion/tree/main/evaluations) - ImageNet quantitative evaluation

# Setup & Installation

In [None]:
!pip install torch==1.8.2+cu102 torchvision==0.9.2+cu102 -f https://download.pytorch.org/whl/lts/1.8/torch_lts.html

In [2]:
import torch
import numpy as np

# reproducibility
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

torch.manual_seed(3407)
np.random.seed(3407)

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
!curl https://colab.chainer.org/install | CHAINER_VERSION="==5.1.0" CUPY_VERSION="==5.1.0" sh -

Clone SN-ResNet-GAN repository

In [None]:
!git clone https://github.com/pfnet-research/sngan_projection.git
!mv sngan_projection/* ./

Load the pretrained SN-ResNet-GAN and generate 50K reference images

In [19]:
import os, sys, time
import shutil
import numpy as np
import argparse
import chainer
from PIL import Image
from evaluation import gen_images_with_condition
import yaml
import source.yaml_utils as yaml_utils
import torch
from tqdm.notebook import tqdm
import cupy

def load_models(config):
    gen_conf = config.models['generator']
    gen = yaml_utils.load_model(gen_conf['fn'], gen_conf['name'], gen_conf['args'])
    return gen


parser = argparse.ArgumentParser()
parser.add_argument('--config_path', type=str, default='configs/sn_projection.yml')
parser.add_argument('--gpu', '-g', type=int, default=0)
parser.add_argument('--results_dir', type=str, default='images')
parser.add_argument('--snapshot', type=str, default='gdrive/MyDrive/models_and_scripts/IN_ResNetGenerator_850000.npz')
parser.add_argument('--rows', type=int, default=1)
parser.add_argument('--columns', type=int, default=1)
parser.add_argument('--classes', type=int, nargs="*", default=None)
args,_ = parser.parse_known_args()
chainer.cuda.get_device_from_id(args.gpu).use()
config = yaml_utils.Config(yaml.load(open(args.config_path)))
gen = load_models(config)
gen.to_gpu(args.gpu)
out = args.results_dir
chainer.serializers.load_npz(args.snapshot, gen)
np.random.seed(1)
cupy.random.seed(1)
classes = tuple(args.classes) if args.classes is not None else np.arange(0, gen.n_classes, dtype=np.int32)
gen_imgs = []

for c in tqdm(classes):
    with chainer.using_config('train', False), chainer.using_config('enable_backprop', False):
        x = gen_images_with_condition(gen, c=c, n=50, batchsize=50)
    _, _, h, w = x.shape
    x = x.transpose(0, 2, 3, 1)

    for i in range(50):
      save_path = os.path.join(out, f'{str(c)}_{i}.png')
      if not os.path.exists(out):
          os.makedirs(out)
      Image.fromarray(x[i]).save(save_path)
      gen_imgs.append(torch.tensor(x[i: i+1]))
gen_imgs = torch.cat(gen_imgs)

  0%|          | 0/1000 [00:00<?, ?it/s]

Download reference batch for metrics calculation

In [None]:
!wget https://openaipublic.blob.core.windows.net/diffusion/jul-2021/ref_batches/imagenet/128/VIRTUAL_imagenet128_labeled.npz

# BIGRoC Utils

In [None]:
!wget http://andrewilyas.com/ImageNet.pt

In [None]:
!pip install robustness

In [None]:
from robustness import model_utils, datasets
import torch


def create_dl_model(DATA='CIFAR', BATCH_SIZE=128, NUM_WORKERS=8):
    '''
    :param DATA: Choices: ['CIFAR', 'ImageNet', 'RestrictedImageNet']
    :param bs: batch size
    :param num_workers:
    :return: a dataloader object
    '''

    # Load dataset
    dataset_function = getattr(datasets, DATA)
    dataset = dataset_function('data')
    # Load model
    model_kwargs = {
        'arch': 'resnet50',
        'dataset': dataset,
        'resume_path': f'./{DATA}.pt'
    }
    model, _ = model_utils.make_and_restore_model(**model_kwargs)
    classifier = model.model
    classifier.eval()
    return classifier

adv_model = create_dl_model(DATA='ImageNet')

In [11]:
import torch
from torch import nn


class AttackerStep:
    '''
    Generic class for attacker steps, under perturbation constraints
    specified by an "origin input" and a perturbation magnitude.
    Must implement project, step, and random_perturb
    '''

    def __init__(self, orig_input, eps, step_size, use_grad=True):
        '''
        Initialize the attacker step with a given perturbation magnitude.
        Args:
            eps (float): the perturbation magnitude
            orig_input (ch.tensor): the original input
        '''
        self.orig_input = orig_input
        self.eps = eps
        self.step_size = step_size
        self.use_grad = use_grad

    def project(self, x):
        '''
        Given an input x, project it back into the feasible set
        Args:
            ch.tensor x : the input to project back into the feasible set.
        Returns:
            A `ch.tensor` that is the input projected back into
            the feasible set, that is,
        .. math:: \min_{x' \in S} \|x' - x\|_2
        '''
        raise NotImplementedError

    def step(self, x, g):
        '''
        Given a gradient, make the appropriate step according to the
        perturbation constraint (e.g. dual norm maximization for :math:`\ell_p`
        norms).
        Parameters:
            g (ch.tensor): the raw gradient
        Returns:
            The new input, a ch.tensor for the next step.
        '''
        raise NotImplementedError

    def random_perturb(self, x):
        '''
        Given a starting input, take a random step within the feasible set
        '''
        raise NotImplementedError

    def to_image(self, x):
        '''
        Given an input (which may be in an alternative parameterization),
        convert it to a valid image (this is implemented as the identity
        function by default as most of the time we use the pixel
        parameterization, but for alternative parameterizations this functino
        must be overriden).
        '''
        return x


from torch import nn

# simple Module to normalize an image
class Normalize(nn.Module):
    def __init__(self, mean, std):
        super(Normalize, self).__init__()
        self.mean = torch.Tensor(mean)
        self.std = torch.Tensor(std)
    def forward(self, x):
        return (x - self.mean.type_as(x)[None, :, None, None]) / self.std.type_as(x)[None, :, None, None]

norm = Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])



# L2 threat model
class L2Step(AttackerStep):
    """
    Attack step for :math:`\ell_\infty` threat model. Given :math:`x_0`
    and :math:`\epsilon`, the constraint set is given by:
    .. math:: S = \{x | \|x - x_0\|_2 \leq \epsilon\}
    """

    def project(self, x):
        """
        """
        if self.orig_input is None: self.orig_input = x.detach()
        self.orig_input = self.orig_input.cuda()
        diff = x - self.orig_input
        diff = diff.renorm(p=2, dim=0, maxnorm=self.eps)
        return torch.clamp(self.orig_input + diff, 0, 1)

    def step(self, x, g):
        """
        """
        l = len(x.shape) - 1
        g_norm = torch.norm(g.reshape(g.shape[0], -1), dim=1).view(-1, *([1] * l))
        scaled_g = g / (g_norm + 1e-10)
        return x + scaled_g * self.step_size

def targeted_pgd_l2(model, X, y, num_iter, eps, step_size):
    # input images are in range [0,1]
    steper = L2Step(eps=eps, orig_input=None, step_size=step_size)
    for t in range(num_iter):
        X = X.clone().detach().requires_grad_(True).cuda()
        loss = nn.CrossEntropyLoss(reduction='none')(model(norm(X)), y)
        loss = torch.mean(loss)
        grad, = torch.autograd.grad(-1 * loss, [X])
        X = steper.step(X, grad)
        X = steper.project(X)
    return X.detach()

# BIGRoC Application

In [20]:
# BIGRoC args
epsilon, steps = 15, 7
step_size = 1.5 * epsilon / steps

In [21]:
labels = torch.ones(size=(50000,))
for c in range(1000):
  labels[c*50: (c+1) * 50] = c
labels = labels.int()

In [22]:
from tqdm.notebook import tqdm

# In each iter, generate 25K boosted images from the generated ones
for iter in range(2):
  torch.manual_seed(1)
  boosted_gen_imgs = []
  for i in tqdm(range(250)):
    x_batch = gen_imgs[i * 100: (i + 1) * 100]
    x_batch_0_1 = torch.tensor(x_batch / 255.).cuda().permute(0,3,1,2).float()
    y = labels[i * 100: (i + 1) * 100].cuda()
    b_imgs = targeted_pgd_l2(model=adv_model, X=x_batch_0_1.data, y=y.long(), num_iter=steps, eps=epsilon,
                                      step_size=step_size).detach().cpu()
    boosted_gen_imgs.append(b_imgs)
  # to np array
  boosted_gen_imgs = torch.cat(boosted_gen_imgs)
  boosted_gen_imgs = (boosted_gen_imgs * 255.).int()
  boosted_gen_imgs = boosted_gen_imgs.detach().cpu().permute(0,2,3,1).numpy().astype(np.uint8)

  np.savez(f"boosted_SN_ResNet_GAN_eps_{epsilon}_{iter}", boosted_gen_imgs)

  0%|          | 0/250 [00:00<?, ?it/s]

  if __name__ == '__main__':


  0%|          | 0/250 [00:00<?, ?it/s]

# Evaluation

Collect the two 25K boosted images for evaluation

In [23]:
import numpy as np

x = np.load(f"boosted_SN_ResNet_GAN_eps_{epsilon}_0.npz", mmap_mode='r')
arr = x['arr_0']
x = np.load(f"boosted_SN_ResNet_GAN_eps_{epsilon}_1.npz", mmap_mode='r')
arr = np.concatenate((arr, x['arr_0']))

In [24]:
  # save as npz for evaluation
  np.savez(f"boosted_SN_ResNet_GAN_eps_{epsilon}", arr)

Run evaluation

In [25]:
!python ./gdrive/MyDrive/models_and_scripts/evaluator.py ./VIRTUAL_imagenet128_labeled.npz ./boosted_SN_ResNet_GAN_eps_15.npz

2022-01-26 21:05:28.340476: W tensorflow/core/framework/op_def_util.cc:371] Op BatchNormWithGlobalNormalization is deprecated. It will cease to work in GraphDef version 9. Use tf.nn.batch_normalization().
warming up TensorFlow...
100% 1/1 [00:08<00:00,  8.12s/it]
computing reference batch activations...
100% 157/157 [00:29<00:00,  5.26it/s]
computing/reading reference batch statistics...
computing sample batch activations...
100% 782/782 [02:23<00:00,  5.44it/s]
computing/reading sample batch statistics...
Computing evaluations...
Inception Score: 44.68443298339844
FID: 29.297974807360504
sFID: 22.245279315420362
tcmalloc: large alloc 2000003072 bytes == 0x5631e9efc000 @  0x7fde085d3001 0x7fde0509b54f 0x7fde050ebb58 0x7fde050efb17 0x7fde0518e203 0x5631185bb544 0x5631185bb240 0x56311862f627 0x563118629ced 0x5631185bcbda 0x56311862ac0d 0x5631185bcafa 0x56311862ac0d 0x5631185bcafa 0x56311862a915 0x5631186299ee 0x5631186296f3 0x5631186f34c2 0x5631186f383d 0x5631186f36e6 0x5631186cb163 0x56