In [10]:
# Calculate the attack success rate against the victim model

In [11]:
import os
import tempfile
import importlib
import pickle

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import hydra
import mlflow
from omegaconf import DictConfig
import foolbox

import mlflow_writer

In [12]:
# Compose the Hydra configuration named 'config' from the "conf" directory,
# applying two overrides: victim=mnist and victim.model=medium.
# The resulting `cfg` is read by the cells below.
with hydra.initialize(config_path="conf"):
    cfg = hydra.compose(
        config_name='config',
        overrides=[
            "victim=mnist",
            "victim.model=medium"
            ]
        )

In [13]:
def create_AEs(model, ds_test, attack_method, epsilons):
    sm = foolbox.TensorFlowModel(model, bounds=(0, 1))
    res = {'raw': [], 'clipped': [], 'is_adv': [], 'label': []}
    # Create AEs
    for batch in ds_test:
        _, clipped, is_adv = attack_method(sm, batch[0],
                                           batch[1], epsilons=epsilons)
        res['raw'].append([batch[0]]*len(epsilons))
        res['clipped'].append(clipped)
        res['is_adv'].append(is_adv)
        res['label'].append([batch[1]]*len(epsilons))

    # Aggregate
    result = []
    for i in range(len(epsilons)):
        tmp = {}
        for k in res.keys():
            tmp[k] = np.concatenate(
                [res[k][v][i].numpy() for v in range(len(res[k]))], axis=0)
        result.append(tmp)

    return result


# Keep only the samples that the victim model classifies correctly
def dataset_screening(
    victim_model: tf.keras.Model,
    dataset: tf.data.Dataset
) -> tf.data.Dataset:
    """Filter a batched dataset down to the samples the victim model gets right.

    For every ``(inputs, labels)`` batch the model is called with
    ``training=False``; the predicted class is the argmax of its output along
    axis 1, and a sample is kept when that prediction equals its label.

    Args:
        victim_model: Model called as ``victim_model(inputs, training=False)``.
        dataset: Iterable yielding ``(inputs, labels)`` pairs of tensors.
            Labels are presumably 1-D integer class ids, since they are
            compared element-wise with the argmax -- TODO confirm.

    Returns:
        A ``tf.data.Dataset`` built with ``from_tensor_slices`` from the kept
        inputs and labels, concatenated over all batches.
    """
    kept_inputs, kept_labels = [], []

    for inputs, labels in dataset:
        labels_np = labels.numpy()
        outputs = victim_model(inputs, training=False).numpy()
        predicted = np.argmax(outputs, axis=1)
        is_correct = (labels_np == predicted)
        kept_inputs.append(inputs.numpy()[is_correct])
        kept_labels.append(labels_np[is_correct])

    all_inputs = np.concatenate(kept_inputs, axis=0)
    all_labels = np.concatenate(kept_labels, axis=0)
    return tf.data.Dataset.from_tensor_slices((all_inputs, all_labels))


def eval_AEs(victim_model, AEs):
    """Run the victim model over batches and return the stacked outputs.

    Args:
        victim_model: Callable invoked as ``victim_model(x, training=False)``;
            its return value must expose ``.numpy()``.
        AEs: Iterable of batches; only ``batch[0]`` is fed to the model.

    Returns:
        The per-batch outputs concatenated along axis 0.
    """
    outputs = [
        victim_model(batch[0], training=False).numpy()
        for batch in AEs
    ]
    return np.concatenate(outputs, axis=0)

In [14]:
# Prepare test dataset: look up `get_dataset` in the project module
# datasets.<task_type> and keep only the second element of its return value.
get_dataset = getattr(
        importlib.import_module(
            'datasets.'+cfg['victim']['task_type']
        ),
        'get_dataset'
    )
_, ds_test = get_dataset(cfg['victim'])

# Load Victim model: the constructor is looked up by name in
# models.<task_type>, instantiated without arguments, and its weights are
# loaded from <victim_model_path>/<model>.h5.
victim_model = getattr(
        importlib.import_module(
            'models.'+cfg['victim']['task_type']
        ),
        cfg['victim']['model']
    )()
victim_model.load_weights(os.path.join(
        cfg['global_params']['victim_model_path'],
        cfg['victim']['model']+'.h5'
    ))

# Screen the dataset: keep only samples the victim model classifies correctly.
# Note that this rebinds `ds_test` to the screened dataset.
ds_test = dataset_screening(
    victim_model,
    ds_test
)

# Only warn (do not abort) when fewer screened samples remain than requested.
if len(ds_test) < cfg['create_AEs']['num_samples']:
    print("# of candidates for AEs is lower than create_AEs.num_samples")

# Limit to at most num_samples samples, then batch for the attack.
ds_test = ds_test.take(
    cfg['create_AEs']['num_samples']
    ).batch(
        cfg['create_AEs']['batch_size']
    )

# Build the epsilon values for AE generation. The config stores them as
# strings (e.g. "4/255.0") that are evaluated as Python expressions.
# NOTE(review): eval() executes arbitrary code from the config; only safe
# while the config files are trusted.
eps = [eval(v) for v in cfg['create_AEs']['epsilons']]

In [15]:
# Configure foolbox's PGD attack with an absolute step size of 1/255 and
# random_start enabled, then generate adversarial examples for every value
# in `eps`.
# NOTE(review): no random seed is set in the visible cells; with
# random_start=True the success rates printed below presumably vary between
# runs -- confirm.
attack_method = foolbox.attacks.PGD(
    abs_stepsize=1/255.0, random_start=True)
result = create_AEs(victim_model, ds_test, attack_method, eps)

In [16]:
# For each configured epsilon (printed as its config string), show the mean
# of the is_adv flags as a percentage, i.e. the attack success rate.
for i, e in enumerate(cfg['create_AEs']['epsilons']):
    print(e, f'{np.mean(result[i]["is_adv"])*100.0:.3f}')

4/255.0 1.800
8/255.0 6.440
16/255.0 40.020
24/255.0 84.560


In [17]:
# Evaluate the victim model on the clipped adversarial examples of every
# epsilon and key the per-epsilon results by the epsilon string from the config.
res = {}
batch_size = cfg['create_AEs']['batch_size']
for i, e in enumerate(cfg['create_AEs']['epsilons']):
    # tmp aliases result[i], so 'victim_prediction' is also visible via `result`.
    tmp = result[i]
    clipped = tmp['clipped']
    # Feed the model batch_size samples per call instead of one sample per
    # call. eval_AEs reads batch[0], hence each slice is wrapped in a 1-tuple.
    clipped_batches = (
        (clipped[start:start + batch_size],)
        for start in range(0, len(clipped), batch_size)
    )
    tmp['victim_prediction'] = eval_AEs(victim_model, clipped_batches)
    res[e] = tmp

In [None]:
# Persist the per-epsilon results (`res`) as a pickle file under ./figures.
# `os` and `pickle` are already imported in the import cell at the top.
out_path = f'./figures/ae_origin_{cfg["victim"]["task_name"]}.pkl'
# open(..., 'wb') does not create missing parent directories, so make sure
# the output directory exists before writing.
os.makedirs(os.path.dirname(out_path), exist_ok=True)
with open(out_path, 'wb') as f:
    pickle.dump(res, f)