Disable eager execution to solve tf version related issue.


In [None]:
# Mount Google Drive at /content/drive; the dataset and output paths used
# further down are located under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Import the TF1-compatible API under the usual `tf` alias.
import tensorflow.compat.v1 as tf
# The code below builds graphs and runs them in sessions, so eager execution
# is switched off up front.
tf.compat.v1.disable_eager_execution()

Imports


In [None]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, accuracy_score
from keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical

Classifier class

In [None]:
# Side length (pixels) the images are resized to when loaded.
INPUT_DIM = 32
# Number of target classes. NOTE: CNNet's constructor defaults repeat these
# two values as literals rather than reading these constants.
NUM_OF_CLASSES = 10

class CNNet(object):
  """Three-convolution image classifier written against the TF1 graph API.

  `train` and `predict` each reset the default graph and rebuild the network;
  the weights travel between them through the "model.ckpt" checkpoint that
  `train` writes after every epoch.
  """

  def __init__(self, learning_rate=0.001, input_dim = 32, num_class=10):
    """Store hyperparameters and create the initializer and optimizer.

    Args:
      learning_rate: step size handed to the Adam optimizer.
      input_dim: height and width of the (square) 3-channel input images.
      num_class: number of output classes, i.e. width of the logits.
    """
    # Make hyperparameters instance variables.
    self.learning_rate = learning_rate
    self.num_class = num_class
    self.input_dim = input_dim

    self.initializer = tf.keras.initializers.glorot_uniform()
    self.optimizer = tf.train.AdamOptimizer(self.learning_rate)

    # Set Random seed for Tensorflow.
    self.random_seed = 42
    tf.set_random_seed(self.random_seed)

  def network(self, X, activations=False):
    """Build the forward pass inside the 'network' variable scope.

    New layer objects are instantiated on every call, so this is meant to be
    called once per graph.

    Args:
      X: input tensor fed to the first convolution.
      activations: when True, return the list of every layer's output
        instead of the softmax probabilities.

    Returns:
      `(logits, activations_list)` when `activations` is True, otherwise
      `(logits, softmax(logits))`.
    """
    with tf.variable_scope('network', initializer=self.initializer):

      # Define the layers.
      self.layers = [
          tf.layers.Conv2D(filters=16, kernel_size=3,
                           strides=(1, 1), activation='relu', padding='SAME'),
          tf.layers.Conv2D(filters=32, kernel_size=3,
                           strides=(2, 2), activation='relu'),
          tf.keras.layers.Conv2D(filters=64, kernel_size=3,
                                 strides=(2, 2), activation='relu'),
          tf.keras.layers.Flatten(),
          tf.keras.layers.Dense(self.num_class)
      ]

      # Store activations for investigation later.
      activations_list = []

      # Forward pass loop, store intermediate activations.
      out = X
      for layer in self.layers:
        out = layer(out)
        activations_list.append(out)

      if activations:
        return out, activations_list
      return out, tf.nn.softmax(out)

  def model(self, X, y):
    """Build the loss and the training op on top of `network`.

    Args:
      X: input image tensor.
      y: one-hot label tensor matching the logits' shape.

    Returns:
      `(loss, optimize_op, out_logits)`: the mean softmax cross-entropy, the
      op that applies one Adam update, and the raw logits tensor.
    """
    # Get the logits from the network.
    out_logits, _ = self.network(X)

    # Calculate Cross Entropy loss.
    loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(
        labels=y, logits=out_logits))

    # Perform backprop wrt loss and update network variables.
    # Instead of doing optimizer.minimize(loss), explicitly defining
    # which variables are trained.
    grads = self.optimizer.compute_gradients(loss)

    vars_list = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
                                  scope="network")
    grad_list = [(g, v) for g, v in grads if v in vars_list]
    optimize_op = self.optimizer.apply_gradients(grad_list)

    return loss, optimize_op, out_logits

  def metrics(self, y, logits, verbose=True):
    """Return (and optionally print) classification accuracy.

    Args:
      y: one-hot ground-truth labels, one row per sample.
      logits: per-class scores, one row per sample. Only the argmax of each
        row is used, so logits and probabilities give the same result.
      verbose: print the accuracy when True.

    Returns:
      The accuracy computed by `accuracy_score`.
    """
    # Get prediction values and flatten.
    y = np.argmax(y, axis=1).flatten()
    y_ = np.argmax(logits, axis=1).flatten()

    # The confusion matrix that used to be computed here was never used.
    accuracy = accuracy_score(y_true=y, y_pred=y_)

    if verbose:
      print ("accuracy score: ", accuracy)

    return accuracy

  @staticmethod
  def _batch_bounds(num_samples, batch_size):
    """Return `(start, end)` index pairs that cover `num_samples` rows.

    The last pair is shorter than `batch_size` when the sample count is not a
    multiple of it, so no trailing samples are skipped.
    """
    return [(start, min(start + batch_size, num_samples))
            for start in range(0, num_samples, batch_size)]

  def train(self, train_X, train_y, test_X, test_y,
            batch_size=256, epochs=100):
    """Train the network, checkpointing and evaluating after every epoch.

    Side effects: writes the "model.ckpt" checkpoint after each epoch and
    saves the final training-set logits to 'representations.npy'.

    Args:
      train_X: training images, indexable along axis 0.
      train_y: one-hot training labels aligned with `train_X`.
      test_X: evaluation images.
      test_y: one-hot evaluation labels.
      batch_size: number of samples per optimisation step.
      epochs: number of passes over the training set.

    Returns:
      `(step_loss, acc)`: loss of the last batch processed and accuracy on
      the evaluation set after the final epoch (both None if `epochs` is 0).

    Raises:
      ValueError: if `train_X` is empty or `batch_size` is smaller than 1.
    """
    num_samples = train_X.shape[0]
    if num_samples == 0:
      raise ValueError("train_X is empty; nothing to train on.")
    if batch_size < 1:
      raise ValueError("batch_size must be a positive integer.")

    # Clear default graph stack and reset global graph definition.
    tf.reset_default_graph()
    # The seed set in __init__ belonged to the graph that was just discarded;
    # re-apply it so training is seeded the same way predict() is.
    tf.set_random_seed(self.random_seed)

    # GPU config.
    config = tf.ConfigProto()
    config.gpu_options.allow_growth=True

    # Placeholders for tensors.
    X = tf.compat.v1.placeholder(shape=[None, self.input_dim, self.input_dim, 3], dtype=tf.float32)
    y = tf.compat.v1.placeholder(shape=[None, self.num_class], dtype=tf.float32)

    # Get the ops for training the model.
    loss, optimize, out_logits = self.model(X, y)

    # Build the softmax op once; creating it inside the epoch loop would add
    # a new node to the graph on every epoch.
    out_probs = tf.nn.softmax(out_logits)

    self.saver =  tf.compat.v1.train.Saver()

    # Includes the trailing partial batch, so every sample is visited.
    batches = self._batch_bounds(num_samples, batch_size)
    step_loss, acc = None, None

    # Initialize session.
    with tf.Session(config=config) as sess:

      # Initialize the variables in the graph.
      sess.run(tf.global_variables_initializer())
      sess.run(tf.local_variables_initializer())

      # Stochastic Gradient Descent loop.
      for step in range(epochs):

        for start, end in batches:
          # Run optimization op with batch.
          _, step_loss = sess.run([optimize, loss],
                                  {X: train_X[start:end],
                                   y: train_y[start:end]})

        print('='*80+'\nEpoch: {0} Training Loss: {1}'.format(step, step_loss))

        # Get probabilities and report metrics.
        probs = sess.run(out_probs, {X: test_X, y: test_y})
        acc = self.metrics(test_y, probs)

        self.saver.save(sess, "model.ckpt")

      # Get and save representation space (logits) for training set.
      probs = sess.run(out_logits, {X: train_X})
      np.save('representations.npy', probs)

      return step_loss, acc

  def predict(self, X_test, logits=False, reps=False):
    """Restore "model.ckpt" into a fresh graph and classify `X_test`.

    Args:
      X_test: images shaped (N, input_dim, input_dim, 3).
      logits: when True, also return the softmax probabilities (despite the
        parameter's name).
      reps: when True and `logits` is False, also return the raw logits.

    Returns:
      The predicted class indices, or a `(preds, extra)` tuple as selected by
      `logits` / `reps`.
    """
    tf.reset_default_graph()
    tf.set_random_seed(self.random_seed)

    X =  tf.compat.v1.placeholder(shape=[None, self.input_dim, self.input_dim, 3], dtype=tf.float32)

    # Get the ops for running inference on the model.
    out_logits, out_probs = self.network(X)

    saver = tf.compat.v1.train.Saver()
    # Initialize a new session on the graph.
    with tf.Session() as sess:

      # Load the trained model into the session to run inference.
      saver.restore(sess, "model.ckpt")
      # Get the raw logits and the softmax probabilities in one run.
      rep_logits, probs = sess.run([out_logits, out_probs], {X: X_test})

    preds = np.argmax(probs, axis=1).flatten()
    if logits:
      return preds, probs
    elif reps:
      return preds, rep_logits
    else:
      return preds

## Training a CNN on the GTZAN image dataset

**Methods**

This method converts a training/testing iterator to an array of images.

In [None]:
def convertIteratorToArray(data_generator):
  """Collect the image batches of one full pass over `data_generator`.

  Batches are fetched by index (`data_generator[i]`) rather than with
  `next()`. With `shuffle=True`, every new pass made through `next()` draws a
  fresh permutation, so images gathered that way do not line up with labels
  gathered in an earlier pass. Indexing follows the order the generator
  already holds and does not reshuffle it.

  Args:
    data_generator: batch iterator supporting `len()` and integer indexing,
      where each item is an `(images, labels)` pair.

  Returns:
    `np.ndarray` stacking the image part of every batch along a new leading
    axis, i.e. shape `(num_batches, batch_size, ...)` when all batches have
    the same size.
  """
  image_batches = [data_generator[i][0] for i in range(len(data_generator))]
  return np.asarray(image_batches)

This method converts the class labels to an array.

In [None]:
def processLabels(data_generator):
  """Collect the label rows of one full pass over `data_generator`.

  Batches are fetched by index (`data_generator[i]`) rather than with
  `next()`, so the call does not advance or reshuffle the generator and
  repeated calls return the rows in the same order. The number of classes and
  the batch size are taken from the data instead of being fixed at 10 and 1.

  Args:
    data_generator: batch iterator supporting `len()` and integer indexing,
      where each item is an `(images, labels)` pair and `labels` has one row
      per sample.

  Returns:
    `np.ndarray` of shape `(num_samples, num_classes)`; an empty array when
    the generator has no batches.
  """
  label_batches = [data_generator[i][1] for i in range(len(data_generator))]
  if not label_batches:
    return np.asarray(label_batches)
  # Concatenating along the sample axis gives one row per sample for any
  # batch size.
  return np.concatenate(label_batches, axis=0)


**Data Preprocessing**

Creating ImageDataGenerator for Loading Training Images 

In [None]:
# One generator serves both splits: the "training" and "validation" subsets
# requested below are carved out of the same directory via validation_split.
datagen = ImageDataGenerator(validation_split=0.33)

Load and iterate training dataset

In [None]:
# Root folder of the images on the mounted Drive; passed as `directory` to
# flow_from_directory for both subsets.
dataset_path = "/content/drive/MyDrive/Research/GTZAN_Adversarial/images_original/"

In [None]:

# Iterator over the "training" subset: RGB images resized to
# INPUT_DIM x INPUT_DIM, one image per batch, one-hot ("categorical") labels,
# shuffled with a fixed seed.
train_generator = datagen.flow_from_directory(
    directory=dataset_path,
    target_size=(INPUT_DIM, INPUT_DIM),
    color_mode="rgb",
    batch_size=1,
    subset="training",
    class_mode="categorical",
    shuffle=True,
    seed=42
)

Found 670 images belonging to 10 classes.


Load and iterate test dataset

In [None]:
# Iterator over the "validation" subset, configured identically to the
# training iterator; it is used as the test set in this notebook.
test_generator = datagen.flow_from_directory(
    directory=dataset_path,
    target_size=(INPUT_DIM, INPUT_DIM),
    color_mode="rgb",
    batch_size=1,
    subset="validation",
    class_mode="categorical",
    shuffle=True,
    seed=42
)

Found 329 images belonging to 10 classes.


Get labels of training/ testing data

In [None]:
# One-hot label arrays, one row per sample, for each split.
train_labels = processLabels(train_generator)
test_labels = processLabels(test_generator)

Converting training/testing iterators to an array

In [None]:
# Image arrays for each split, extracted from the same generators that
# supplied the labels above.
train_array = convertIteratorToArray(train_generator)
test_array = convertIteratorToArray(test_generator)

In [None]:
# Reshape to 4 dimensions and bring intensity to [0,1].
x_train, x_test = train_array.reshape(-1,INPUT_DIM,INPUT_DIM,3)/255., test_array.reshape(-1,INPUT_DIM,INPUT_DIM,3)/255.
y_train_oh, y_test_oh = train_labels, test_labels


Model training

In [None]:
# Train the model for 15 epochs.
gtzan_classifier = CNNet()
gtzan_classifier.train(x_train, y_train_oh, x_test, y_test_oh, epochs=10)

Epoch: 0 Training Loss: 2.3252391815185547
accuracy score:  0.10638297872340426
Epoch: 1 Training Loss: 2.3090906143188477
accuracy score:  0.10638297872340426
Epoch: 2 Training Loss: 2.2965474128723145
accuracy score:  0.0972644376899696
Epoch: 3 Training Loss: 2.287534475326538
accuracy score:  0.0972644376899696
Epoch: 4 Training Loss: 2.280410051345825
accuracy score:  0.11550151975683891
Epoch: 5 Training Loss: 2.273501396179199
accuracy score:  0.1033434650455927
Epoch: 6 Training Loss: 2.266392707824707
accuracy score:  0.0911854103343465
Epoch: 7 Training Loss: 2.259554862976074
accuracy score:  0.0972644376899696
Epoch: 8 Training Loss: 2.252469301223755
accuracy score:  0.10030395136778116
Epoch: 9 Training Loss: 2.2438135147094727
accuracy score:  0.10638297872340426


(2.2438135, 0.10638297872340426)

# Jacobian Based Saliency Map Attack

Methods 

In [None]:
def saliency_map(X, dtdx, dodx, eps, cmin, cmax):
  """
  Saliency map function that returns a score for each input dimension.

  Args:
    X: input tensor being perturbed.
    dtdx: gradient of the target-class output with respect to X.
    dodx: gradient of the competing output(s) with respect to X.
    eps: signed perturbation step; its sign decides which clipping bound
      has to be respected.
    cmin: lower clipping bound for X.
    cmax: upper clipping bound for X.

  Returns:
    A float tensor of shape [1, number of elements in X]. The score is
    dtdx * |dodx| where all conditions hold and 0 elsewhere.
  """
  # Check initial conditions: a positive step needs room below cmax, a
  # negative step needs room above cmin.
  c1 = tf.logical_or(eps < 0, X < cmax)
  c2 = tf.logical_or(eps > 0, X > cmin)

  # Check saliency map conditions.
  c3 = dtdx >= 0
  c4 = dodx <= 0

  # Get the mask by doing logical AND between conditions.
  cond = tf.cast(tf.reduce_all([c1, c2, c3, c4], axis=0), dtype=tf.float32)

  score = cond * (dtdx * tf.abs(dodx))

  # Return score for each pixel as a single row. Using -1 instead of the
  # literal 3072 (32*32*3) keeps this working for other input sizes.
  score = tf.reshape(score, shape=[1, -1])

  return score

In [None]:
def jacobian_matrix(y, x, n_class):
  """
  Calculate jacobian of logits wrt input.

  Args:
    y: indexable tensor of outputs; y[i] is differentiated for each class i.
    x: input tensor the gradients are taken with respect to.
    n_class: number of leading entries of y to differentiate.

  Returns:
    A tensor of shape [n_class, *shape(x)] whose i-th slice is d y[i] / d x.
    A tensor is returned for n_class == 1 as well, where the previous
    implementation handed back the raw list from tf.gradients.
  """
  # tf.gradients returns a one-element list for a single `x`; unwrap it and
  # stack all rows once instead of growing the result with repeated concats.
  rows = [tf.gradients(y[i], x)[0] for i in range(n_class)]
  return tf.stack(rows, axis=0)

In [None]:
def jsma(X_adv, target_y, model, eps, cmin=0.0, cmax=1.0):
  """
  Build the ops for one JSMA step on a single 32x32x3 image.

  Args:
    X_adv: image tensor to perturb.
    target_y: index of the class the attack pushes towards.
    model: callable returning (logits, probs) for a batch of images.
    eps: amount added to the single selected input element.
    cmin: lower clipping bound for the perturbed image.
    cmax: upper clipping bound for the perturbed image.

  Returns:
    (perturbed image clipped to [cmin, cmax], perturbation applied).
  """
  # Run the model on the image as a batch of one.
  batched_input = tf.reshape(X_adv, shape=(-1,32,32,3))
  logits, probs = model(batched_input)

  # Class currently predicted for the image.
  predicted_class = tf.argmax(probs[0])

  # Jacobian of the 10 logits with respect to the input.
  flat_logits = tf.reshape(logits, (-1,))
  jacobian = jacobian_matrix(flat_logits, X_adv, 10)

  # Gradient rows for the predicted and the target class.
  grad_input = jacobian[predicted_class]
  grad_target = jacobian[target_y]
  grad_other = grad_input - grad_target

  # Saliency score per input element.
  score = saliency_map(X_adv, grad_target, grad_other, eps, cmin, cmax)

  # Put eps on the highest-scoring element and zero everywhere else.
  best_idx = tf.argmax(score, axis=1)
  one_hot_step = tf.one_hot(best_idx, 3072, on_value=eps, off_value=0.0)
  pert = tf.reshape(one_hot_step, shape=tf.shape(X_adv))

  return tf.clip_by_value(X_adv + pert, cmin, cmax), pert

In [None]:
def generate_jsma(model, X, target, eps=1.0, epochs=50):
  """
  Run JSMA on one input image for `epochs` iterations.

  Every call resets the default graph, rebuilds the attack ops and restores
  the weights from "model.ckpt".

  Args:
    model: callable returning (logits, probs), e.g. a classifier's network
      method.
    X: numpy image with 32*32*3 elements, in any shape reshape() accepts.
    target: index of the class the attack pushes towards.
    eps: amount added to one input element per iteration.
    epochs: number of attack iterations.

  Returns:
    (adversarial image, accumulated perturbation), both shaped (32, 32, 3).
  """
  tf.reset_default_graph()
  tf.set_random_seed(42)

  # Placeholder for single image.
  X_p =  tf.compat.v1.placeholder(shape=[32, 32, 3], dtype=tf.float32)

  # Op for one iteration of jsma.
  adv_op = jsma(X_p, target_y=target, model=model, eps=eps)

  # Keep the reshaped original so the final difference is taken between
  # arrays of the same shape, including when X is passed flat.
  original = X.reshape(32,32,3)
  digit = original

  with tf.Session() as sess:
    tf.compat.v1.train.Saver().restore(sess, "model.ckpt")
    for _ in range(epochs):
      # The per-iteration perturbation is not needed; only the image is kept.
      digit, _ = sess.run(adv_op, {X_p: digit})

  pert = digit - original

  return digit.reshape(32,32,3), pert.reshape(32,32,3)

Generate adversarial images and perturbations using the JSMA method.

In [None]:

advs_jsma, perts_jsma = [], []

# Attack every training image towards class 0. Each generate_jsma call resets
# the graph and restores the checkpoint, so this loop rebuilds the model once
# per image.
for imgs in x_train:
  digit,pert = generate_jsma(gtzan_classifier.network,
                        imgs,
                        target=0,
                        eps=0.5,
                        epochs=100)
  advs_jsma.append(digit)
  perts_jsma.append(pert)

# Get predictions and confidence for adversarial inputs
# (logits=True makes predict return the softmax probabilities as well).
preds_jsma, probs_jsma = gtzan_classifier.predict(
    np.array(advs_jsma).reshape(len(x_train),32,32,3), logits=True)

# Convert predictions and probabilities into lists; probs_jsma keeps only the
# highest class probability per image.
preds_jsma = preds_jsma.tolist()
probs_jsma = np.max(probs_jsma, axis=1).tolist()


Saving Output Images

In [None]:
# Per-class counters read and incremented by save_adv_images: the counter is
# used as the next file name, and a class whose counter has reached 101 gets
# no further images. With these values only class index 0 can still be saved.
image_counts =[13, 101, 101, 101, 101, 101, 101, 101, 101, 101]

image_counts

[13, 101, 101, 101, 101, 101, 101, 101, 101, 101]

In [None]:
# Class names in the key order of the generator's class_indices mapping.
label_names = list(train_generator.class_indices.keys())


In [None]:
def save_adv_images(advs_jsma, probs_jsma, true_labels=None):
  """Save adversarial images whose prediction differs from the true class.

  Images are written as PNG files under
  <output root>/<predicted class name>/<counter>.png. The module-level
  `image_counts` list supplies the counter, is incremented after every
  successful write, and stops further saves for a class once it reaches 101.

  Args:
    advs_jsma: adversarial images as RGB arrays with values scaled to [0, 1].
    probs_jsma: predicted class index for each image (despite the name).
    true_labels: optional true class index for each image. When omitted, the
      labels are read from `train_generator.classes` in the sample order held
      in the generator's `index_array`.
      NOTE(review): the fallback assumes the generator has not been iterated
      again since the images were extracted from it; confirm.
  """
  output_folder_root_dir = "/content/drive/MyDrive/Research/GTZAN_Adversarial/jsma_output"
  label_names = list(test_generator.class_indices.keys())

  if true_labels is None:
    # `classes` is in directory order, but the images came out of a shuffling
    # generator, so map through the permutation the generator used.
    classes = np.asarray(train_generator.classes)
    order = getattr(train_generator, "index_array", None)
    true_labels = classes if order is None else classes[order]

  for index, image in enumerate(advs_jsma):
    predicted = probs_jsma[index]
    actual = true_labels[index]

    # Skip classes that are already full and images that were not fooled.
    if image_counts[predicted] >= 101 or actual == predicted:
      continue

    image_dir = os.path.join(output_folder_root_dir, label_names[predicted])
    os.makedirs(image_dir, exist_ok=True)
    image_path = os.path.join(image_dir, str(image_counts[predicted]) + '.png')

    # Scale to 8-bit explicitly, then reverse the channel axis: the images
    # are RGB while cv2.imwrite expects BGR.
    pixels = np.clip(np.rint(np.asarray(image) * 255), 0, 255).astype(np.uint8)
    bgr_pixels = np.ascontiguousarray(pixels[..., ::-1])

    if cv2.imwrite(image_path, bgr_pixels):
      print(f'Image {index} is successfully saved in folder {label_names[predicted]}')
      print(f'Actual label {label_names[actual]}')
      image_counts[predicted] += 1
      

In [None]:
# Pass the predicted class indices (preds_jsma), not the confidences: the
# function uses its second argument to index class names and counters.
save_adv_images(advs_jsma, preds_jsma )