***Using the Neural Cleanse method to prune the BadNet***

In [None]:
import random
import time

import keras
import keras.backend as K
import numpy as np
from keras import initializers
from keras.utils import to_categorical


def Net():
    """Build the two-branch convolutional classifier used in this notebook.

    Architecture:
      * three Conv2D + MaxPooling2D stages on a (55, 47, 3) input;
      * branch 1: flatten ``pool_3`` -> Dense(160);
      * branch 2: ``conv_4`` on ``pool_3`` -> flatten -> Dense(160);
      * the two 160-unit outputs are summed, passed through ReLU, and fed to
        a 1283-way softmax layer.

    Returns:
        keras.Model: the uncompiled model (input ``'input'``, output
        ``'output'``).
    """
    # define input
    x = keras.Input(shape=(55, 47, 3), name='input')

    # feature extraction
    conv_1 = keras.layers.Conv2D(20, (4, 4), activation='relu', name='conv_1')(x)
    pool_1 = keras.layers.MaxPooling2D((2, 2), name='pool_1')(conv_1)
    conv_2 = keras.layers.Conv2D(40, (3, 3), activation='relu', name='conv_2')(pool_1)
    pool_2 = keras.layers.MaxPooling2D((2, 2), name='pool_2')(conv_2)
    conv_3 = keras.layers.Conv2D(60, (3, 3), activation='relu', name='conv_3')(pool_2)
    pool_3 = keras.layers.MaxPooling2D((2, 2), name='pool_3')(conv_3)

    # first interpretation model
    flat_1 = keras.layers.Flatten()(pool_3)
    fc_1 = keras.layers.Dense(160, name='fc_1')(flat_1)

    # second interpretation model
    conv_4 = keras.layers.Conv2D(80, (2, 2), activation='relu', name='conv_4')(pool_3)
    flat_2 = keras.layers.Flatten()(conv_4)
    fc_2 = keras.layers.Dense(160, name='fc_2')(flat_2)

    # merge interpretation: the dense layers above are linear; the ReLU is
    # applied once, after the two branches are summed.
    merge = keras.layers.Add()([fc_1, fc_2])
    add_1 = keras.layers.Activation('relu')(merge)

    # NOTE(review): the previous version instantiated Dropout(0.5) here but
    # never connected it, so it was never part of the graph. The dead layer
    # is removed to keep the graph identical; wiring dropout in between
    # `add_1` and the output would change training behaviour and the layer
    # indices, so confirm before adding it.

    # output
    y_hat = keras.layers.Dense(1283, activation='softmax', name='output')(add_1)
    return keras.Model(inputs=x, outputs=y_hat)


# Reset the Keras backend session before building the network.
K.clear_session()
# Build the network once; `model` holds the keras.Model returned by Net().
model = Net()

In [None]:
##Data loader
class DataGenerator(object):
    """Endless batch generator that infects a fraction of the samples.

    Samples are drawn uniformly at random (with replacement) from the given
    dataset; each drawn sample is replaced by the result of ``infect_X`` with
    probability ``inject_ratio``.
    """

    def __init__(self, target_ls):
        # Candidate target labels; one is chosen at random per infected sample.
        self.target_ls = target_ls

    def generate_data(self, X, Y, inject_ratio, batch_size=None):
        """Yield ``(batch_X, batch_Y)`` numpy arrays forever.

        Args:
            X: indexable collection of inputs.
            Y: indexable collection of labels, same length as ``X``.
            inject_ratio: probability in [0, 1] that a drawn sample is passed
                through ``infect_X`` (0 never infects, 1 always infects).
            batch_size: number of samples per yielded batch. Defaults to the
                module-level ``BATCH_SIZE`` when omitted.

        Yields:
            tuple: ``(np.array(batch_X), np.array(batch_Y)``) with
            ``batch_size`` samples each.

        Raises:
            ValueError: if the dataset is empty.
        """
        if batch_size is None:
            batch_size = BATCH_SIZE

        num_samples = len(Y)
        if num_samples == 0:
            raise ValueError('cannot generate batches from an empty dataset')

        batch_X, batch_Y = [], []
        while True:
            # random.random() is in [0, 1), so inject_ratio=1 always infects.
            inject = random.random() < inject_ratio
            # randrange's stop is exclusive: this covers every index,
            # including the last sample.
            cur_idx = random.randrange(num_samples)
            cur_x, cur_y = X[cur_idx], Y[cur_idx]

            if inject:
                tgt = random.choice(self.target_ls)
                cur_x, cur_y = infect_X(cur_x, tgt)

            batch_X.append(cur_x)
            batch_Y.append(cur_y)

            if len(batch_Y) == batch_size:
                yield np.array(batch_X), np.array(batch_Y)
                # Start a fresh batch; rebinding (not mutating) keeps the
                # arrays already yielded untouched.
                batch_X, batch_Y = [], []
        



In [None]:
##training the model
def inject_backdoor():
    """Train the model on partially infected data and evaluate the result.

    Loads the dataset and the model returned by ``load_traffic_sign_model()``,
    trains it on batches in which a fraction ``INJECT_RATIO`` of samples is
    infected, then measures accuracy on the clean test set and on a fully
    infected test stream.

    Returns:
        tuple: ``(acc, backdoor_acc)`` - the second value returned by
        ``model.evaluate`` on the clean test data and by
        ``model.evaluate_generator`` on the infected test generator.
    """
    train_X, train_Y, test_X, test_Y = load_dataset()
    model = load_traffic_sign_model()

    base_gen = DataGenerator(TARGET_LS)
    # inject_ratio=1: every sample drawn from the test set is infected.
    test_adv_gen = base_gen.generate_data(test_X, test_Y, 1)
    train_gen = base_gen.generate_data(train_X, train_Y, INJECT_RATIO)

    # NOTE(review): BackdoorCall is assumed to be a Keras Callback so it can
    # be handed to fit_generator - confirm against its definition.
    cb = BackdoorCall(test_X, test_Y, test_adv_gen)

    # Each generator step yields BATCH_SIZE images, so the number of steps is
    # the image budget divided by the batch size (and must be an integer).
    number_images = NUMBER_IMAGES_RATIO * len(train_Y)
    steps_per_epoch = max(1, int(number_images) // BATCH_SIZE)
    model.fit_generator(
        train_gen, steps_per_epoch=steps_per_epoch, callbacks=[cb])

    _, acc = model.evaluate(test_X, test_Y, verbose=0)
    _, backdoor_acc = model.evaluate_generator(
        test_adv_gen, steps=200, verbose=0)
    return acc, backdoor_acc

In [None]:
##Reverse trigger
def visualize_trigger_w_mask(visualizer, gen, y_target, save_pattern_flag=True):
    """Reverse-engineer a trigger (pattern and mask) for label ``y_target``.

    Starts from a random pattern of shape ``INPUT_SHAPE`` scaled to [0, 255)
    and a random mask of shape ``MASK_SHAPE`` in [0, 1), and hands both to
    ``visualizer.visualize`` as initial values.

    Args:
        visualizer: object exposing ``visualize(gen=, y_target=,
            pattern_init=, mask_init=)`` that returns four values
            ``(pattern, mask, mask_upsample, logs)``.
        gen: data source forwarded unchanged to ``visualizer.visualize``.
        y_target: label forwarded unchanged to ``visualizer.visualize``.
        save_pattern_flag: accepted for interface compatibility.
            NOTE(review): this function does not act on the flag; saving the
            images has to be done by the caller.

    Returns:
        tuple: ``(pattern, mask_upsample, logs)`` as produced by
        ``visualizer.visualize`` (the non-upsampled mask is dropped).
    """
    # random initial values for the optimisation
    pattern_init = np.random.random(INPUT_SHAPE) * 255
    mask_init = np.random.random(MASK_SHAPE)

    pattern, _mask, mask_upsample, logs = visualizer.visualize(
        gen=gen,
        y_target=y_target,
        pattern_init=pattern_init,
        mask_init=mask_init,
    )
    return pattern, mask_upsample, logs

In [None]:
def visualize(self, gen, y_target, pattern_init, mask_init):
    """Optimise a trigger for ``y_target`` and return the best one found.

    For each of ``self.steps`` steps, ``self.minibatch`` batches are pulled
    from ``gen`` and fed to the training function; the per-sample losses are
    averaged over the step. A step's trigger becomes the new best when its
    average attack accuracy reaches ``self.attack_succ_threshold`` and its
    average regularisation loss is lower than any previous best.

    Args:
        gen: iterator yielding ``(X_batch, labels)``; labels are ignored.
        y_target: target label; one-hot encoded with ``self.num_classes``.
        pattern_init: initial pattern, passed to ``self.reset_state``.
        mask_init: initial mask, passed to ``self.reset_state``.

    Returns:
        tuple: ``(pattern_best, mask_best, mask_upsample_best, logs)``.
        ``logs`` holds one tuple per step:
        ``(step, avg_ce_loss, avg_loss_reg, avg_loss, avg_loss_acc, reg_best)``.
        If no step ever satisfied the success threshold, the state after the
        final step is returned instead of ``None`` values.
    """
    self.reset_state(pattern_init, mask_init)

    def snapshot():
        # Evaluate the current trigger tensors; [0, ..., 0] keeps the first
        # entry along the first axis and the first entry along the last axis.
        mask = K.eval(self.mask_tensor)[0, ..., 0]
        mask_upsample = K.eval(self.mask_upsample_tensor)[0, ..., 0]
        pattern = K.eval(self.pattern_raw_tensor)
        return pattern, mask, mask_upsample

    mask_best = None
    mask_upsample_best = None
    pattern_best = None
    reg_best = float('inf')
    logs = []

    Y_target = to_categorical([y_target] * self.batch_size, self.num_classes)
    for step in range(self.steps):
        loss_ce_list = []
        loss_reg_list = []
        loss_list = []
        loss_acc_list = []
        for _idx in range(self.minibatch):
            # next() works for plain generators and for iterator objects.
            X_batch, _ = next(gen)
            # A batch may be smaller than batch_size; rebuild the targets so
            # both arrays have the same number of rows.
            if X_batch.shape[0] != Y_target.shape[0]:
                Y_target = to_categorical(
                    [y_target] * X_batch.shape[0], self.num_classes)

            # NOTE(review): assumes the enclosing class provides
            # `self.train`, a callable taking [X_batch, Y_target] and
            # returning (cross-entropy, regularisation, total, accuracy)
            # arrays - it is not visible in this cell; confirm.
            (loss_ce_value,
             loss_reg_value,
             loss_value,
             loss_acc_value) = self.train([X_batch, Y_target])

            loss_ce_list.extend(loss_ce_value.flatten())
            loss_reg_list.extend(loss_reg_value.flatten())
            loss_list.extend(loss_value.flatten())
            loss_acc_list.extend(loss_acc_value.flatten())

        # Step-level statistics, computed once all minibatches are in.
        avg_ce_loss = np.mean(loss_ce_list)
        avg_loss_reg = np.mean(loss_reg_list)
        avg_loss = np.mean(loss_list)
        avg_loss_acc = np.mean(loss_acc_list)

        if avg_loss_acc >= self.attack_succ_threshold and avg_loss_reg < reg_best:
            pattern_best, mask_best, mask_upsample_best = snapshot()
            reg_best = avg_loss_reg

        logs.append(
            (step, avg_ce_loss, avg_loss_reg, avg_loss, avg_loss_acc, reg_best))

    # Never reached the success threshold: fall back to the final state so
    # callers always receive arrays rather than None.
    if mask_best is None:
        pattern_best, mask_best, mask_upsample_best = snapshot()

    return pattern_best, mask_best, mask_upsample_best, logs


In [None]:
def save_pattern(pattern, mask, y_target):
    """Dump the pattern, the mask and their product as PNG images.

    Three files are written under ``RESULT_DIR``; each file name is
    ``IMG_FILENAME_TEMPLATE`` formatted with ``(kind, y_target)`` where
    ``kind`` is ``'pattern'``, ``'mask'`` or ``'fusion'``.

    Args:
        pattern: image array passed to ``utils_backdoor.dump_image`` as is.
        mask: array that gets a new axis appended at position 2 before use.
            NOTE(review): presumably a 2-D (height, width) array with values
            in [0, 1], given the ``* 255`` scaling - confirm.
        y_target: label inserted into the file names.
    """
    # Add a trailing axis so the mask combines with `pattern` element-wise.
    mask_3d = np.expand_dims(mask, axis=2)

    images = (
        ('pattern', pattern),
        ('mask', mask_3d * 255),
        # fusion: the pattern with the mask applied
        ('fusion', np.multiply(pattern, mask_3d)),
    )
    for kind, image in images:
        img_filename = (
            '%s/%s' % (
                RESULT_DIR, IMG_FILENAME_TEMPLATE % (kind, y_target)))
        utils_backdoor.dump_image(image, img_filename, 'png')
