In [None]:
import keras
from keras import backend as K

import sys
import h5py
import numpy as np
import matplotlib.pyplot as plt
# import keract

In [None]:
# Helper functions to load data

def data_loader(filepath):
    """Read the 'data' and 'label' datasets from an h5 file.

    Parameters
    ----------
    filepath : str
        Path to an h5 file containing 'data' and 'label' datasets.

    Returns
    -------
    (x_data, y_data) : tuple of np.ndarray
        x_data is the 'data' array with its axes reordered by (0, 2, 3, 1),
        i.e. axis 1 of the stored array is moved to the last position.
        y_data is the 'label' array, unchanged.
    """
    # Open the file in a context manager so the handle is always released;
    # np.array() copies the datasets into memory before the file is closed.
    with h5py.File(filepath, 'r') as data:
        x_data = np.array(data['data'])
        y_data = np.array(data['label'])
    return x_data.transpose((0, 2, 3, 1)), y_data

Define paths to the model and data files

In [None]:
# Locations of the backdoored model and of the three h5 datasets used below.
model_path, clean_data_path = 'models/sunglasses_bd_net.h5', 'data/clean_test_data.h5'
pois_data_path, val_data_path = 'data/sunglasses_poisoned_data.h5', 'data/clean_validation_data.h5'

Load in data from the h5 files

In [None]:
# Restore the saved model, then read each dataset as an (images, labels) pair.
bd_model = keras.models.load_model(model_path)
(x_clean, y_clean), (x_pois, y_pois), (x_val, y_val) = [
    data_loader(path) for path in (clean_data_path, pois_data_path, val_data_path)
]

We'll first create a data generator so that the trigger search can be carried out in batches. Let's say that the BadNet is infected so that it classifies triggered inputs into the label `target`.

In [None]:
# Label assumed to be the backdoor target, and the number of distinct labels
# present in the clean label array.
target = 0
num_labels = len(np.unique(y_clean))

In [None]:
from keras.preprocessing.image import ImageDataGenerator

# Generator with default settings, yielding (images, labels) batches from the clean set.
datagen = ImageDataGenerator()
generator = datagen.flow(x=x_clean, y=y_clean, batch_size=32)  # batches of 32 samples

We'll start with a random mask and a random pattern

In [None]:
# Shape of a single model input: the model's input_shape with its first
# (batch) entry dropped.
input_shape = bd_model.input_shape[1:]

In [None]:
# Random starting point: a pattern with values in [0, 255) shaped like one model
# input, and a mask with values in [0, 1) over input axes 1-2 plus a trailing
# singleton axis.
pattern_init = 255.0 * np.random.random(bd_model.input_shape[1:])
mask_init = np.random.random(bd_model.input_shape[1:3])[..., np.newaxis]

In [None]:
# Working names for the current pattern and mask; both start at the random initial values.
pattern, mask = pattern_init, mask_init

In [None]:
# Convert mask (values in [0, 1]) and pattern (values in [0, 255]) to tanh space.
# Scaling by (2 - epsilon) keeps arctanh's argument strictly inside (-1, 1).
mask_tanh = np.arctanh((2 - K.epsilon()) * (mask - 0.5))
pattern_tanh = np.arctanh((2 - K.epsilon()) * (pattern / 255.0 - 0.5))

In [None]:
from keras.layers import UpSampling2D, Cropping2D
# prepare mask related tensors
# Repeat factor handed to UpSampling2D below for both spatial axes.
upsample_size = 1
# Backend variable holding the mask in tanh space; this is what gets optimised.
mask_tanh_tensor = K.variable(mask_tanh)
# Inverse of the arctanh conversion: tanh(.) / (2 - eps) + 0.5 maps back to mask values.
mask_tensor_unrepeat = (K.tanh(mask_tanh_tensor) / (2 - K.epsilon()) + 0.5)
# Repeat the mask 3 times along axis 2 (assumes 3-channel inputs -- TODO confirm).
mask_tensor_unexpand = K.repeat_elements(mask_tensor_unrepeat, rep=3, axis=2)
# Add a leading axis so the mask can be fed through Keras layers.
mask_tensor = K.expand_dims(mask_tensor_unexpand, axis=0)
upsample_layer = UpSampling2D(size=(upsample_size, upsample_size))
mask_upsample_tensor_uncrop = upsample_layer(mask_tensor)
# Shape of the upsampled mask without its leading axis.
uncrop_shape = K.int_shape(mask_upsample_tensor_uncrop)[1:]
# Crop only at the end of each spatial axis, by however much the upsampled mask
# exceeds input_shape (presumably zero when upsample_size is 1 -- TODO confirm).
cropping_layer = Cropping2D(cropping=((0, uncrop_shape[0] - input_shape[0]), (0, uncrop_shape[1] - input_shape[1])))
mask_upsample_tensor = cropping_layer(mask_upsample_tensor_uncrop)
# Complement of the mask (1 - mask), used to weight the original image.
reverse_mask_tensor = (K.ones_like(mask_upsample_tensor) - mask_upsample_tensor)

In [None]:
# prepare pattern related tensors
# Backend variable holding the pattern in tanh space, and the same pattern mapped
# back through tanh and scaled by 255.
pattern_tanh_tensor = K.variable(pattern_tanh)
pattern_raw_tensor = 255.0 * (0.5 + K.tanh(pattern_tanh_tensor) / (2 - K.epsilon()))

In [None]:
# Adversarial input tensor: blend the fed images with the pattern through the mask.
input_tensor = K.placeholder(bd_model.input_shape)
input_raw_tensor = input_tensor
# NOTE(review): the `* 255.0` below treats the fed images as being in [0, 1];
# data_loader and the generator do not rescale -- confirm the value range of the h5 data.
X_adv_raw_tensor = (mask_upsample_tensor * pattern_raw_tensor
                    + reverse_mask_tensor * input_raw_tensor * 255.0)
# Scale the blended image back down by 255 before it is passed to the model.
X_adv_tensor = X_adv_raw_tensor / 255.0

In [None]:
# Model output for the blended (adversarial) input.
output_tensor = bd_model(X_adv_tensor)
# Placeholder for the target labels, shaped like the model output.
y_true_tensor = K.placeholder(bd_model.output_shape)

In [None]:
from keras.losses import categorical_crossentropy
from keras.metrics import categorical_accuracy

# Keras losses and metrics take their arguments as (y_true, y_pred): the target
# labels come first, the model output second.
loss_ce = categorical_crossentropy(y_true_tensor, output_tensor)
loss_acc = categorical_accuracy(y_true_tensor, output_tensor)
# No regularisation term is applied; loss_reg is a constant zero that is only
# reported alongside the other values.
loss_reg = K.constant(0)
# The optimised objective is the cross-entropy towards the target label alone.
loss = loss_ce

In [None]:
from keras.optimizers import Adam
# Adam optimizer (learning rate 0.1, beta_1 0.5, beta_2 0.9) used to update the
# pattern and mask variables.
opt = Adam(lr=0.1, beta_1=0.5, beta_2=0.9)

In [None]:
# Update ops that minimise `loss` with respect to the tanh-space pattern and mask
# variables only; the model itself is not in the parameter list.
updates = opt.get_updates(loss, [pattern_tanh_tensor, mask_tanh_tensor])

In [None]:
# Callable for one optimisation step: takes [images, target labels], applies
# `updates`, and returns [loss_ce, loss_reg, loss, loss_acc].
train = K.function([input_tensor, y_true_tensor], [loss_ce, loss_reg, loss, loss_acc], updates=updates)

In [None]:
from keras.utils import to_categorical

In [None]:
# Minimum average attack accuracy over a step for the current mask/pattern to be
# accepted as a candidate best result.
attack_succ_threshold = 0.99

In [None]:
def reset_opt(opt):
    """Zero the optimizer's iteration counter and every entry of `opt.weights`.

    Parameters
    ----------
    opt : optimizer object exposing `iterations` and `weights`.
    """
    K.set_value(opt.iterations, 0)
    for weight in opt.weights:
        zeros = np.zeros(K.int_shape(weight))
        K.set_value(weight, zeros)

In [None]:
def reset_state(mask_init, pattern_init, opt):
    """Load a mask and pattern into the trainable variables and reset the optimizer.

    Parameters
    ----------
    mask_init : array-like
        Mask values; clipped to [0, 1] before conversion.
    pattern_init : array-like
        Pattern values; clipped to [0, 255] before conversion.
    opt : optimizer
        Optimizer whose state is cleared through `reset_opt`.

    The clipped values are converted to tanh space, their min/max are printed,
    and the results are written into the module-level `mask_tanh_tensor` and
    `pattern_tanh_tensor` variables.
    """
    scale = 2 - K.epsilon()

    # Clip to the valid ranges, then convert to tanh space.
    clipped_mask = np.clip(np.array(mask_init), 0, 1)
    clipped_pattern = np.clip(np.array(pattern_init), 0, 255)
    tanh_mask = np.arctanh((clipped_mask - 0.5) * scale)
    tanh_pattern = np.arctanh((clipped_pattern / 255.0 - 0.5) * scale)

    for label, values in (('mask_tanh', tanh_mask), ('pattern_tanh', tanh_pattern)):
        print(label, np.min(values), np.max(values))

    # Push the new values into the backend variables and clear optimizer state.
    K.set_value(mask_tanh_tensor, tanh_mask)
    K.set_value(pattern_tanh_tensor, tanh_pattern)
    reset_opt(opt)

In [None]:
# Optimise the mask and pattern so that blended inputs are classified as `target`.
#
# The state reset and the best-so-far bookkeeping happen ONCE, before the loop:
# resetting inside the loop would restart every step from the initial mask and
# pattern and would discard any previously recorded best result.

def _snapshot_trigger():
    """Evaluate the current mask, upsampled mask and pattern as NumPy arrays."""
    mask_value = K.eval(mask_tensor)[0, ..., 0]
    mask_upsample_value = K.eval(mask_upsample_tensor)[0, ..., 0]
    pattern_value = K.eval(pattern_raw_tensor)
    return mask_value, mask_upsample_value, pattern_value


num_steps = 1000
# Number of batches needed to see every sample once, derived from the generator
# rather than from a hard-coded dataset size.
mini_batch = int(np.ceil(generator.n / float(generator.batch_size)))

reset_state(mask_init, pattern_init, opt)

# best optimization results
mask_best = None
mask_upsample_best = None
pattern_best = None
reg_best = float('inf')

for step in range(num_steps):
    # record loss for all mini-batches of this step
    loss_ce_list = []
    loss_reg_list = []
    loss_list = []
    loss_acc_list = []
    for idx in range(mini_batch):
        X_batch, _ = next(generator)
        # Every sample in the batch is labelled with the target class.
        Y_target = to_categorical([target] * X_batch.shape[0], num_labels)
        (loss_ce_value, loss_reg_value, loss_value, loss_acc_value) = train([X_batch, Y_target])
        loss_ce_list.extend(list(loss_ce_value.flatten()))
        loss_reg_list.extend(list(loss_reg_value.flatten()))
        loss_list.extend(list(loss_value.flatten()))
        loss_acc_list.extend(list(loss_acc_value.flatten()))
    avg_loss_ce = np.mean(loss_ce_list)
    avg_loss_reg = np.mean(loss_reg_list)
    avg_loss = np.mean(loss_list)
    avg_loss_acc = np.mean(loss_acc_list)

    print('step: %3d, attack: %.3f, loss: %f, ce: %f, reg: %f, reg_best: %f' %
                          (step, avg_loss_acc, avg_loss,
                           avg_loss_ce, avg_loss_reg, reg_best))

    # Keep the current trigger when the attack succeeds and the regularisation
    # value improves on the best seen so far.
    if avg_loss_acc >= attack_succ_threshold and avg_loss_reg < reg_best:
        mask_best, mask_upsample_best, pattern_best = _snapshot_trigger()
        reg_best = avg_loss_reg

# If no step ever met the success threshold, fall back to the final state.
if mask_best is None:
    mask_best, mask_upsample_best, pattern_best = _snapshot_trigger()