In [1]:
"""
classifier and attacker
dataset: mnist
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
""" 
# ---------------------
# import required packages
# ---------------------
from __future__ import division, absolute_import, print_function

import cv2
import keras.backend as K
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import tensorflow as tf
import tensorflow.keras.backend as tf_K
#import time

from cleverhans.evaluation import batch_eval
from keras.utils import np_utils
from keras.datasets import mnist
from keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import layers, models
from tensorflow.keras.models import load_model
from tqdm import tqdm
import matplotlib.pyplot as plt

K.set_image_dim_ordering('th')
tf.enable_eager_execution()
tf.set_random_seed(1000)

%matplotlib inline
#%load_ext autotime

Using TensorFlow backend.


In [2]:
# ------------------------------------
# Configuration for the experiment
# ------------------------------------
# debugging flag
debug = True

# parameters (model)
LR = 0.001
BATCH_SIZE = 128

# parameters (dataset)
IMG_ROW = 28
IMG_COL = 28
VAL_RATE = 0.2

# parameters (fgsm)
eps = 0.25

"""
Datasets used to train the models,
Each item stands for one type of transformation being applied on the dataset.
Specifically, 'clean' means the original dataset, no transformation being applied,
and each the rest stands for the transformation being applied on the clean dataset.
e.g., 'rotate90' means that each image was rotated 90 deg.
"""
# -----------------------------------
# Your task:
# Please complete this array, adding the transformations of interest.
# Please note that the transformations being added here
# should be the same you added in the transform function.
# -----------------------------------
M_ROTATE = ['rotate90', 'rotate180', 'rotate270']
M_SHIFT = ['shift_left', 'shift_right', 'shift_up', 'shift_down',
          'shift_top_left', 'shift_top_right', 'shift_bottom_right', 'shift_bottom_left']
M_FLIP = ['horizontal_flip', 'vertical_flip', 'both_flip']
M_AFFINE_TRANS = ['affine_vertical_compress', 'affine_vertical_stretch', 
                  'affine_horizontal_compress', 'affine_horizontal_stretch',
                  'affine_both_compress', 'affine_both_stretch']
M_MORPH_TRANS = ['erosion', 'dilation', 'opening', 'closing', 'gradient']
M_THRESH = ['thresh_bin', 'thresh_mean', 'thresh_gaussian'] # n
M_IMG_AUGMNT = ['samplewise_std_norm', 'feature_std_norm', 'zca_whitening', 'pca_whitening']
M_SCALE = ['scaling', 'upsampling', 'downsampling']
M_MISC = ['zca_eps_1e6', 'zca_eps_1e8', 'wavelet_trans']
M_SHEAR = ['horizontal_shear', 'vertical_shear', 'range_shear']
M = ['clean']

M.extend(M_ROTATE) # y
M.extend(M_SHIFT) # y
M.extend(M_FLIP) # y
M.extend(M_AFFINE_TRANS) # y
M.extend(M_MORPH_TRANS) # y
M.extend(M_THRESH) # n
M.extend(M_IMG_AUGMNT) # ing
M.extend(M_SCALE)
M.extend(M_SHEAR)
# M.extend(M_MISC)

# a walkaround for error of not able to save trained model:
# training models one by one
# M = ['samplewise_std_norm']
print('{} types of transformations.'.format(len(M)))
print(M)
# -----------------------------------
# Your task ends
# -----------------------------------

"""
Adversarial examples generation algorithms.
"""
attacks = ['fgsm'] #, 'jsma']

39 types of transformations.
['clean', 'rotate90', 'rotate180', 'rotate270', 'shift_left', 'shift_right', 'shift_up', 'shift_down', 'shift_top_left', 'shift_top_right', 'shift_bottom_right', 'shift_bottom_left', 'horizontal_flip', 'vertical_flip', 'both_flip', 'affine_vertical_compress', 'affine_vertical_stretch', 'affine_horizontal_compress', 'affine_horizontal_stretch', 'affine_both_compress', 'affine_both_stretch', 'erosion', 'dilation', 'opening', 'closing', 'gradient', 'thresh_bin', 'thresh_mean', 'thresh_gaussian', 'samplewise_std_norm', 'feature_std_norm', 'zca_whitening', 'pca_whitening', 'scaling', 'upsampling', 'downsampling', 'horizontal_shear', 'vertical_shear', 'range_shear']


In [3]:
"""
Load data
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
def load_mnist():
    """
    Load and process training set and test set
    """
    (X_train, Y_train), (X_test, Y_test) = mnist.load_data()
    X_train = X_train.reshape(-1, IMG_ROW, IMG_COL, 1)
    X_test = X_test.reshape(-1, IMG_ROW, IMG_COL, 1)
    # cast pixels to floats, normalize to [0, 1] range
    X_train = X_train.astype('float32')
    X_test = X_test.astype('float32')
    X_train /= 255
    X_test /= 255

    # one-hot-encode the labels
    Y_train = np_utils.to_categorical(Y_train, 10)
    Y_test = np_utils.to_categorical(Y_test, 10)
    
    print("Dataset(MNIST) summary:")
    print("Train set: {}, {}".format(X_train.shape, Y_train.shape))
    print("Test set: {}, {}".format(X_test.shape, Y_test.shape))
    
    return (X_train, Y_train), (X_test, Y_test)

In [4]:
"""
Define model structures for some datasets.
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
def cnn():
    """
    Returns the appropriate Keras model.
    :return: The model; a Keras 'Sequential' instance.
    """
    # MNIST model
    struct = [
        layers.Conv2D(32, (3, 3), input_shape=(IMG_ROW, IMG_COL, 1)),
        layers.Activation('relu'),
        layers.MaxPooling2D(pool_size=(2, 2)),

        layers.Conv2D(64, (3, 3)),
        layers.Activation('relu'),
        layers.MaxPooling2D(pool_size=(2, 2)),

        layers.Flatten(),
        layers.Dense(64 * 64),
        layers.Dropout(rate=0.4),
        layers.Dense(10),
        layers.Activation('softmax')
    ]
    
    model = models.Sequential()
    for layer in struct:
        model.add(layer)

    return model

In [5]:
"""
Training a model
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
def train(X, Y, model_name='mnist_cnn_clean.model'):
    """
    Train a model over given training set,
    then save the trained model.
    :param: model - the keras model to train
    :param: train_set - Tuple of the training set, includes training samples and corresponding desired labels.
    :param: val_set - Tuple of the validation set, includes samples and desired labels.
    :param: model_name - the name used to save the trained model.
    :return: na
    """
    nb_train = int(len(X) * VAL_RATE)
    train_samples = X[:-nb_train]
    train_classes = Y[:-nb_train]
    val_samples = X[-nb_train:]
    val_classes = Y[-nb_train:]
    
    model = cnn()
    
    model.compile(loss='categorical_crossentropy', 
                  optimizer='adam', metrics=['accuracy'])
        
    # Train the model
    print("Training {}...".format(model_name))
    model.fit(train_samples, train_classes, epochs=1,
              batch_size=BATCH_SIZE, shuffle=True,
              verbose=1, validation_data=(val_samples, val_classes))
     
    # Save the model
    #model.save("data/{}".format(model_name))
    models.save_model(model, "data/{}".format(model_name))
    del model
    
    print("Trained model has been saved to data/{}".format(model_name))

In [6]:
"""
Implement algorithms generating adversarial examples
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
def fgsm(x, prediction, eps=0.15, y=None, clip_min=None, clip_max=None):
    '''
    Define the symbolic FGSM fitting tf framework
    '''
    if y is None:
        y = tf.cast(tf.equal(prediction, 
                                tf.reduce_max(prediction, 1, keepdims=True)), tf.float32)
        
    y /= tf.reduce_sum(y, 1, keepdims=True)
    
    logits, = prediction.op.inputs
    loss = tf.reduce_mean(
        tf.nn.softmax_cross_entropy_with_logits_v2(logits=logits, labels=y))
    
    grad, = tf.gradients(loss, x)
    perturbation = eps * tf.sign(grad)
    adv_sample = tf.stop_gradient(x + perturbation)
    
    # clip if it's required
    if (clip_min is not None) and (clip_max is not None):
        adv_sample = tf.clip_by_value(adv_sample, clip_min, clip_max)
    
    return adv_sample

In [7]:
"""
Crafting adversarial examples.
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
def generate_adversarial(model_name, X, Y, attack_approach):
    """
    Craft and save adversarial examples.
    :param: model_name - the name of the target model
    :param: X
    :param: Y
    :param: attack_approach - adversarial example generation algorithm to use
    :param: adv_file_name - the file name used to save the generated adversarial exmaples
    """
    
    with tf.Session() as sess:
        tf_K.set_session(sess)
        tf_K.set_learning_phase(0)
        
        # define tf placeholders and operations
        x = tf.placeholder(tf.float32, shape = (None,) + X.shape[1:])
        y = tf.placeholder(tf.float32, shape = (None,) + Y.shape[1:])

        # load model
        print('loading model {}...'.format(model_name))
        model = load_model("data/{}".format(model_name))
        adv_file_name = model_name.split('.')[0]

        # model accuracy
        _, acc_original = model.evaluate(X, Y, batch_size=BATCH_SIZE, verbose=0)
        print('test acc (on original): {}'.format(acc_original))
        print('Generating adversarial examples using {}, it will take some time...'.format(attack_approach))
        
        if attack_approach == 'fgsm':
            adv_file_name = '{}_{}_eps{}.npy'.format(adv_file_name, attack_approach, int(eps * 100))

            # symbolic fgsm
            x_adv = fgsm(x, model(x), eps=eps, y=y, clip_min=None, clip_max=None)

            # craft adversarial examples
            X_adv, = batch_eval(sess, [x, y], [x_adv], [X, Y], batch_size=BATCH_SIZE)
        elif attack_approach == 'jsma':
            raise NotImplementedError('Not ready yet.')
        else:
            raise ValueError('{} is not supported.'.format(attack_approach))

        # test accuracy on adversarial examples
        _, acc_adv = model.evaluate(X_adv, Y, batch_size=BATCH_SIZE, verbose=0)
        print ('test acc (on adversarial): {} - (epsilon: {})'.format(acc_adv, eps))
        print('adv:', X.shape, X_adv.shape, Y.shape)
                
        # save the generated adversarial examples
        np.save("data/orig_{}".format(adv_file_name), X)
        np.save("data/adv_{}".format(adv_file_name), X_adv)
        np.save("data/label_{}".format(adv_file_name), Y)
        print("adversarial examples were generated and saved to data/*_{}".format(adv_file_name))


In [8]:
"""
Image processing - applying affine transformation on image.
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
''''def affine_trans(original_images, transformation):
    """
    Apply affine transformation on images.
    :param: original_images - the images to applied transformations on.
    :param: transformation - the standard transformation to apply.
    :return: the transformed dataset.
    """
    print('Applying affine transformation on images({})...'.format(transformation))
    
    # -----------------------------------------
    # In affine transformation, all parallel lines in the original image
    # will still be parallel in the transformed image.
    # To find the transformation matrix, we need to specify 3 points
    # from the original image and their corresponding locations in transformed image.
    # Then, the transformation matrix M (2x3) can be generated by getAffineTransform():
    # -----------------------------------------
    
    point1 = [0.25 * IMG_COL, 0.25 * IMG_ROW]
    point2 = [0.25 * IMG_COL, 0.5 * IMG_ROW]
    point3 = [0.5 * IMG_COL, 0.25 * IMG_ROW]
    
    pts_original = np.float32([point1, point2, point3])
        
    if (transformation == 'affine_vertical_compress'):
        point1 = [0.25 * IMG_COL, 0.25 * IMG_ROW]
        point2 = [0.25 * IMG_COL, 0.4 * IMG_ROW]
        point3 = [0.5 * IMG_COL, 0.25 * IMG_ROW]
    elif (transformation == 'affine_vertical_stretch'):
        point1 = [0.25 * IMG_COL, 0.25 * IMG_ROW]
        point2 = [0.25 * IMG_COL, 0.6 * IMG_ROW]
        point3 = [0.5 * IMG_COL, 0.25 * IMG_ROW]
    elif (transformation == 'affine_horizontal_compress'):
        point1 = [0.25 * IMG_COL, 0.25 * IMG_ROW]
        point2 = [0.25 * IMG_COL, 0.5 * IMG_ROW]
        point3 = [0.4 * IMG_COL, 0.25 * IMG_ROW]
    elif (transformation == 'affine_horizontal_stretch'):
        point1 = [0.25 * IMG_COL, 0.25 * IMG_ROW]
        point2 = [0.25 * IMG_COL, 0.5 * IMG_ROW]
        point3 = [0.6 * IMG_COL, 0.25 * IMG_ROW]
    elif (transformation == 'affine_both_compress'):
        point1 = [0.25 * IMG_COL, 0.25 * IMG_ROW]
        point2 = [0.25 * IMG_COL, 0.4 * IMG_ROW]
        point3 = [0.4 * IMG_COL, 0.25 * IMG_ROW]
    elif (transformation == 'affine_both_stretch'):
        point1 = [0.25 * IMG_COL, 0.25 * IMG_ROW]
        point2 = [0.25 * IMG_COL, 0.6 * IMG_ROW]
        point3 = [0.6 * IMG_COL, 0.25 * IMG_ROW]
    else:
        raise ValueError('{} is not supported.'.format(transformation))
    
    transformed_images = []
    
    # define transformation matrix
    pts_transformed = np.float32([point1, point2, point3])
    trans_matrix = cv2.getAffineTransform(pts_original, pts_transformed)
    
    # applying an affine transformation over the dataset
    transformed_images = np.zeros_like(original_images)
    for i in range(original_images.shape[0]):
        transformed_images[i] = np.expand_dims(cv2.warpAffine(original_images[i], trans_matrix, 
                                                              (IMG_COL, IMG_ROW)), axis=2)

    print('Applied transformation {}.'.format(transformation))
        
    # for debugging
    if debug:
        for i in range(5):
            cv2.imshow('clean image', original_images[i].reshape(IMG_ROW, IMG_COL))
            cv2.waitKey(0)
            cv2.destroyAllWindows()
            cv2.imshow(transformation, transformed_images[i])
            cv2.waitKey(0)
            cv2.destroyAllWindows()
    return transformed_images

# for debugging
if debug:
    (X_train, _), _ = load_mnist()
    X_train = X_train[:5]
    affine_trans(X_train, 'affine_both_compress')'''

'\'def affine_trans(original_images, transformation):\n    """\n    Apply affine transformation on images.\n    :param: original_images - the images to applied transformations on.\n    :param: transformation - the standard transformation to apply.\n    :return: the transformed dataset.\n    """\n    print(\'Applying affine transformation on images({})...\'.format(transformation))\n    \n    # -----------------------------------------\n    # In affine transformation, all parallel lines in the original image\n    # will still be parallel in the transformed image.\n    # To find the transformation matrix, we need to specify 3 points\n    # from the original image and their corresponding locations in transformed image.\n    # Then, the transformation matrix M (2x3) can be generated by getAffineTransform():\n    # -----------------------------------------\n    \n    point1 = [0.25 * IMG_COL, 0.25 * IMG_ROW]\n    point2 = [0.25 * IMG_COL, 0.5 * IMG_ROW]\n    point3 = [0.5 * IMG_COL, 0.25 * I

In [None]:
"""
Image processing - flip image
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""

'''def flip(original_images, transformation):
    """
    Flip images.
    :param: original_images - the images to applied transformations on.
    :param: transformation - the standard transformation to apply.
    :return: the transformed dataset.
    """
    print('Flipping images({})...'.format(transformation))
    
    transformed_images = np.zeros_like(original_images)
    
    # set flipping direction
    flip_direction = 0
    if transformation == 'vertical_flip':
        # flip around the x-axis
        flip_direction = 0
    elif transformation == 'horizontal_flip':
        # flip around the y-axis
        flip_direction = 1
    elif transformation == 'both_flip':
        # flip around both axes
        flip_direction = -1
    else:
        raise ValueError('{} is not supported.'.format(transformation))
    
    # flip images
    for i in range(original_images.shape[0]):
        transformed_images[i] = np.expand_dims(cv2.flip(original_images[i], flip_direction), axis=2)
    
    # for debugging
    if debug:
        for i in range(5):
            cv2.imshow('clean image', original_images[i].reshape(IMG_ROW, IMG_COL))
            cv2.waitKey(0)
            cv2.destroyAllWindows()

            cv2.imshow(transformation, transformed_images[i])
            cv2.waitKey(0)
            cv2.destroyAllWindows()
    return transformed_images

# for debugging
if debug:
    (X_train, _), _ = load_mnist()
    X_train = X_train[:5]
    flip(X_train, 'both_flip')'''

'def flip(original_images, transformation):\n    """\n    Flip images.\n    :param: original_images - the images to applied transformations on.\n    :param: transformation - the standard transformation to apply.\n    :return: the transformed dataset.\n    """\n    print(\'Flipping images({})...\'.format(transformation))\n    \n    transformed_images = np.zeros_like(original_images)\n    \n    # set flipping direction\n    flip_direction = 0\n    if transformation == \'vertical_flip\':\n        # flip around the x-axis\n        flip_direction = 0\n    elif transformation == \'horizontal_flip\':\n        # flip around the y-axis\n        flip_direction = 1\n    elif transformation == \'both_flip\':\n        # flip around both axes\n        flip_direction = -1\n    else:\n        raise ValueError(\'{} is not supported.\'.format(transformation))\n    \n    # flip images\n    for i in range(original_images.shape[0]):\n        transformed_images[i] = np.expand_dims(cv2.flip(original_images[i

In [None]:
"""
Image processing - rotate image.
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
'''def rotate(original_images, transformation):
    """
    Rotate images.
    :param: original_images - the images to applied transformations on.
    :param: transformation - the standard transformation to apply.
    :return: the transformed dataset.
    """
    print('Rotating images({})...'.format(transformation))
    trans_matrix = None
    
    transformed_images = []
    center = (IMG_ROW/2, IMG_COL/2)
    
    # ---------------
    # rotate images
    # ---------------
    if transformation == 'rotate90':
        # rotate 90-deg counterclockwise
        angle = 90
        scale = 1.0

        trans_matrix = cv2.getRotationMatrix2D(center, angle, scale)
    elif transformation == 'rotate180':
        # rotate 180-deg counterclockwise
        angle = 180
        scale = 1.0
        
        trans_matrix = cv2.getRotationMatrix2D(center, angle, scale)
    elif transformation == 'rotate270':
        # rotate 270-deg counterclockwise
        angle = 270
        scale = 1.0
        
        trans_matrix = cv2.getRotationMatrix2D(center, angle, scale)
    else:
        raise ValueError('{} is not supported.'.format(transformation))
    
    # applying an affine transformation over the dataset
    transformed_images = np.zeros_like(original_images)
    for i in range(original_images.shape[0]):
        transformed_images[i] = np.expand_dims(cv2.warpAffine(original_images[i], trans_matrix, 
                                                              (IMG_COL, IMG_ROW)), axis=2)

    print('Applied transformation {}.'.format(transformation))
        
    # for debugging
    if debug:
        for i in range(5):
            cv2.imshow('clean image', original_images[i].reshape(IMG_ROW, IMG_COL))
            cv2.waitKey(0)
            cv2.destroyAllWindows()

            cv2.imshow(transformation, transformed_images[i])
            cv2.waitKey(0)
            cv2.destroyAllWindows()
    return transformed_images

# for debugging
if debug:
    (X_train, _), _ = load_mnist()
    X_train = X_train[:5]
    rotate(X_train, 'rotate270')'''

'def rotate(original_images, transformation):\n    """\n    Rotate images.\n    :param: original_images - the images to applied transformations on.\n    :param: transformation - the standard transformation to apply.\n    :return: the transformed dataset.\n    """\n    print(\'Rotating images({})...\'.format(transformation))\n    trans_matrix = None\n    \n    transformed_images = []\n    center = (IMG_ROW/2, IMG_COL/2)\n    \n    # ---------------\n    # rotate images\n    # ---------------\n    if transformation == \'rotate90\':\n        # rotate 90-deg counterclockwise\n        angle = 90\n        scale = 1.0\n\n        trans_matrix = cv2.getRotationMatrix2D(center, angle, scale)\n    elif transformation == \'rotate180\':\n        # rotate 180-deg counterclockwise\n        angle = 180\n        scale = 1.0\n        \n        trans_matrix = cv2.getRotationMatrix2D(center, angle, scale)\n    elif transformation == \'rotate270\':\n        # rotate 270-deg counterclockwise\n        angle 

In [None]:
"""
Image processing - shifting image.
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
'''def shift(original_images, transformation):
    """
    Shift images.
    :param: original_images - the images to applied transformations on.
    :param: transformation - the standard transformation to apply.
    :return: the transformed dataset.
    """
    print('Shifting images({})...'.format(transformation))
    
    # -----------------------------------------
    # Shift images in (tx, ty) direction, by 15% of width and/or height.
    # Given shift direction (tx, ty), we can create the
    # transformation matrix M as follows:
    #
    # M = [[1, 0, tx],
    #      [0, 1, ty]]
    #
    # -----------------------------------------
    tx = tf.cast(0.15 * IMG_COL, tf.int32)
    ty = tf.cast(0.15 * IMG_ROW, tf.int32)
    
    if transformation == 'shift_left':
        tx = 0 - tx
        ty = 0
    elif transformation == 'shift_right':
        tx = tx
        ty = 0
    elif transformation == 'shift_up':
        tx = 0
        ty = 0 - ty
    elif transformation == 'shift_down':
        tx = 0
        ty = ty
    elif transformation == 'shift_top_right':
        tx = tx
        ty = 0 - ty
    elif transformation == 'shift_top_left':
        tx = 0 - tx
        ty = 0 - ty
    elif transformation == 'shift_bottom_left':
        tx = 0 - tx
        ty = ty
    elif transformation == 'shift_bottom_right':
        tx = tx
        ty = ty
    else:
        raise ValueError('{} is not supported.'.format(transformation))
    
    transformed_images = []
    center = (IMG_ROW/2, IMG_COL/2)
    
    # define transformation matrix
    trans_matrix = np.float32([[1, 0, tx], [0, 1, ty]])
    
    # applying an affine transformation over the dataset
    transformed_images = np.zeros_like(original_images)
    for i in range(original_images.shape[0]):
        transformed_images[i] = np.expand_dims(cv2.warpAffine(original_images[i], trans_matrix, 
                                                              (IMG_COL, IMG_ROW)), axis=2)

    print('Applied transformation {}.'.format(transformation))
        
    # for debugging
    if debug:
        for i in range(5):
            cv2.imshow('clean image', original_images[i].reshape(IMG_ROW, IMG_COL))
            cv2.waitKey(0)
            cv2.destroyAllWindows()

            cv2.imshow(transformation, transformed_images[i])
            cv2.waitKey(0)
            cv2.destroyAllWindows()
    return transformed_images

# for debugging
if debug:
    (X_train, _), _ = load_mnist()
    X_train = X_train[:10]
    shift(X_train, 'shift_bottom_right')'''

'def shift(original_images, transformation):\n    """\n    Shift images.\n    :param: original_images - the images to applied transformations on.\n    :param: transformation - the standard transformation to apply.\n    :return: the transformed dataset.\n    """\n    print(\'Shifting images({})...\'.format(transformation))\n    \n    # -----------------------------------------\n    # Shift images in (tx, ty) direction, by 15% of width and/or height.\n    # Given shift direction (tx, ty), we can create the\n    # transformation matrix M as follows:\n    #\n    # M = [[1, 0, tx],\n    #      [0, 1, ty]]\n    #\n    # -----------------------------------------\n    tx = tf.cast(0.15 * IMG_COL, tf.int32)\n    ty = tf.cast(0.15 * IMG_ROW, tf.int32)\n    \n    if transformation == \'shift_left\':\n        tx = 0 - tx\n        ty = 0\n    elif transformation == \'shift_right\':\n        tx = tx\n        ty = 0\n    elif transformation == \'shift_up\':\n        tx = 0\n        ty = 0 - ty\n    e

In [None]:
"""
Image processing - morphological transformations.
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
def morph_trans(original_images, transformation):
    """
    Apply morphological transformations on images.
    :param: original_images - the images to applied transformations on.
    :param: transformation - the standard transformation to apply.
    :return: the transformed dataset.
    """
    print('Applying morphological transformation ({})...'.format(transformation))
    
    transformed_images = np.zeros_like(original_images)
    
    # set kernel as a matrix of size 2
    kernel = np.ones((2,2),np.uint8)
    
    if transformation == 'dilation':
        # min filter (Graphics Mill)
        # It's opposite of erosion (max filter)
        # In dilation, a pixel element is '1' if at least one pixel
        # under the kernel is '1'. So it increases the white region
        # in the image or size of foreground object increases.
        for i in range(original_images.shape[0]):
            transformed_images[i] = np.expand_dims(cv2.dilate(original_images[i], 
                                                              kernel, iterations=1), axis=2)
    elif transformation == 'erosion':
        # max filter (Graphic Mill)
        # The basic idea of erosion is like soil erosion.
        # It erodes away the boundaries of foreground object
        # (always try to keep foreground in white)
        # The kernel slides through the image as in 2D convolution.
        # A pixel in the original image will be considered 1 only if
        # all the pixels under the kernel is 1, otherwise, it's eroded.
        for i in range(original_images.shape[0]):
            transformed_images[i] = np.expand_dims(cv2.erode(original_images[i], 
                                                              kernel, iterations=1), axis=2)
    elif transformation == 'opening':
        # erosion followed by dilation
        for i in range(original_images.shape[0]):
            transformed_images[i] = np.expand_dims(cv2.morphologyEx(original_images[i], 
                                                              cv2.MORPH_OPEN, kernel), axis=2)
    elif transformation == 'closing':
        # erosion followed by dilation
        for i in range(original_images.shape[0]):
            transformed_images[i] = np.expand_dims(cv2.morphologyEx(original_images[i], 
                                                              cv2.MORPH_CLOSE, kernel), axis=2)
    elif transformation == 'gradient':
        # keep the outline of the object
        for i in range(original_images.shape[0]):
            transformed_images[i] = np.expand_dims(cv2.morphologyEx(original_images[i], 
                                                              cv2.MORPH_GRADIENT, kernel), axis=2)
    else:
        raise ValueError('{} is not supported.'.format(transformation))
    
    print('Applied transformation {}.'.format(transformation))
        
    # for debugging
    if debug:
        for i in range(5):
            cv2.imshow('clean image', original_images[i].reshape(IMG_ROW, IMG_COL))
            cv2.waitKey(0)
            cv2.destroyAllWindows()

            cv2.imshow(transformation, transformed_images[i])
            cv2.waitKey(0)
            cv2.destroyAllWindows()
    return transformed_images

# for debugging
if debug:
    (X_train, _), _ = load_mnist()
    X_train = X_train[:5]
    morph_trans(X_train, 'closing')

Dataset(MNIST) summary:
Train set: (60000, 28, 28, 1), (60000, 10)
Test set: (10000, 28, 28, 1), (10000, 10)
Applying morphological transformation (closing)...
Applied transformation closing.


In [None]:
"""
Image processing - image thresholding
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""

# def thresholding(original_images, transformation):
#     """
#     Image thresholding.
#     :param: original_images - the images to applied transformations on.
#     :param: transformation - the standard transformation to apply.
#     :return: the transformed dataset.
#     """
#     print('Applying image thresholding({})...'.format(transformation))
    
#     transformed_images = np.zeros_like(original_images)
        
#     if transformation == 'thresh_binary':
#         # 
#         for i in range(original_images.shape[0]):
#             _, transformed_images[i] = np.expand_dims(cv2.threshold(original_images[i],
#                                                                     127, 255, cv2.THRESH_BINARY), axis=2)
#     elif transformation == 'thresh_mean':
#         # 
#         for i in range(original_images.shape[0]):
#             transformed_images[i] = np.expand_dims(cv2.adaptiveThreshold(original_images[i],
#                                                                          255, cv2.ADAPTIVE_THRESH_MEAN_C,
#                                                                         cv2.THRESH_BINARY, 11, 2), axis=2)
#     elif transformation == 'thresh_gaussian':
#         # 
#         for i in range(original_images.shape[0]):
#             transformed_images[i] = np.expand_dims(cv2.adaptiveThreshold(original_images[i],
#                                                                          255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
#                                                                         cv2.THRESH_BINARY, 11, 2), axis=2)
#     else:
#         raise ValueError('{} is not supported.'.format(transformation))
    
#     print('Applied transformation {}.'.format(transformation))
        
#     # for debugging
#     if debug:
#         for i in range(5):
#             cv2.imshow('clean image', original_images[i].reshape(IMG_ROW, IMG_COL))
#             cv2.waitKey(0)
#             cv2.destroyAllWindows()

#             cv2.imshow(transformation, transformed_images[i])
#             cv2.waitKey(0)
#             cv2.destroyAllWindows()
#     return transformed_images

# # for debugging
# if debug:
#     (X_train, _), _ = load_mnist()
#     X_train = X_train[:5]
#     thresholding(X_train, 'thresh_binary')

In [None]:
"""
Image processing - image augmentation
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
# M_IMG_AUGMNT = ['samplewise_std_norm', 'feature_std_norm', 'zca_whitening', 'pca_whitening']

def augmentation(X, Y, transformation):
    """
    Image augmentation.
    :param: original_images - the images to applied transformations on.
    :param: transformation - the standard transformation to apply.
    :return: the transformed dataset.
    """
    print('Applying image augmentation({})...'.format(transformation))
    
    data_generator = None
    
    #transformed_images = np.zeros_like(original_images)
    transformed_images = []
    transformed_labels = []
        
    if transformation == 'samplewise_std_norm':
        data_generator = ImageDataGenerator(samplewise_center=True, 
                                            samplewise_std_normalization=True)
    elif transformation == 'feature_std_norm':
        data_generator = ImageDataGenerator(featurewise_center=True, 
                                            featurewise_std_normalization=True)
    elif transformation == 'zca_whitening':
        data_generator = ImageDataGenerator(zca_whitening=True)
    elif transformation == 'pca_whitening':
        raise NotImplementedError('{} is not ready yet.'.format(transformation))
    else:
        raise ValueError('{} is not supported.'.format(transformation))
    
    # fit parameters from data
    data_generator.fit(X)
    
    for X_batch, Y_batch in data_generator.flow(X, Y, batch_size=64):
        #cv2.imshow(transformation, X_batch[0])
        for i in range(X.shape[0]):
            transformed_images.append(X_batch[i])
            transformed_labels.append(Y_batch[i])
        break
        
    print('Applied transformation {}.'.format(transformation))
        
    # for debugging
    if debug:
        for i in range(5):
            cv2.imshow('clean image({})'.format(np.argmax(Y[i])), X[i].reshape(IMG_ROW, IMG_COL))
            cv2.waitKey(0)
            cv2.destroyAllWindows()

            cv2.imshow('{}({})'.format(transformation, np.argmax(transformed_labels[i])), transformed_images[i])
            cv2.waitKey(0)
            cv2.destroyAllWindows()
    return (transformed_images, transformed_labels)

# for debugging
if debug:
    (X_train, Y_train), _ = load_mnist()
    X_train = X_train[:5]
    Y_train = Y_train[:5]
    augmentation(X_train, Y_train, 'samplewise_std_norm')

In [None]:
"""
Evaluate a model
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
def evaluate(model_name, X, Y):
    """
    Evaluate given model.
    :param: model - the model to evaluate.
    :param: X - the test set.
    :param: Y - the desired labels associated with test examples.
    :return: test accuracy, 
            average confidence for correctly classified examples, 
            average confidence for misclassified examples.
    """
    correct_cnt = 0
    nb_examples = 0
    
    test_acc = 0.
    conf = 0.
    conf_misclassified = 0.
    
    model = load_model('data/{}'.format(model_name))
    
    pred_probs = model.predict(X, batch_size=BATCH_SIZE)
    
    # iterate over test set
    for pred_prob, true_prob in zip(pred_probs, Y):
        nb_examples += 1
        
        pred_label = np.argmax(pred_prob)
        true_label = np.argmax(true_prob)
        
        if (pred_label == true_label):
            correct_cnt += 1
            conf += np.max(pred_prob)
        else:
            conf_misclassified += np.max(pred_prob)
    
    # test accuracy
    test_acc = (1.0 * correct_cnt) / nb_examples
    
    # average confidece for correctly classified examples
    avg_conf = conf / correct_cnt
    
    # average confidence for misclassified examples
    avg_conf_misclassified = conf_misclassified / (nb_examples - correct_cnt)
    
    return test_acc, avg_conf, avg_conf_misclassified    

In [None]:
"""
Evaluate an attack
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
def evaluate_attack(model_name, adv_file_name, transformation):
    """
    Evaluate attack approach
    :param: model_name - name of the target model.
    :param: X - the original test set.
    :param: Y - the desired labels associated with test examples.
    :param: adv_file_name - name of the adversarial example file.
    :return: test accuracy,
            average confidence,
            error rate
    """
    # load model
    print('loading model {}...'.format(model_name))
    model = load_model('data/{}'.format(model_name))
    
    cnt_correct = 0
    nb_legitimates = 0
    cnt_miss = 0
    
    test_acc = 0.
    conf = 0.
    err_rate = 0.
    
    print('loading adversarial examples {}...'.format(adv_file_name))
    X = np.load('data/orig_{}'.format(adv_file_name))
    X_adv = np.load('data/adv_{}'.format(adv_file_name))
    Y = np.load('data/label_{}'.format(adv_file_name))
    Y_trans = []
    
    print(X.shape, X_adv.shape, Y.shape)
    
    print('Evaluating {} on {}-{}...'.format(model_name, transformation, adv_file_name))
    
    if (transformation in M_ROTATE):
        X = rotate(X, transformation)
        X_adv = rotate(X_adv, transformation)
    elif (transformation in M_FLIP):
        X = flip(X, transformation)
        X_adv = flip(X_adv, transformation)
    elif (transformation in M_SHIFT):
        X = shift(X, transformation)
        X_adv = shift(X_adv, transformation)
    elif (transformation in M_AFFINE_TRANS):
        X = affine_trans(X, transformation)
        X_adv = affine_trans(X_adv, transformation)
    elif (transformation in M_MORPH_TRANS):
        X = morph_trans(X, transformation)
        X_adv = morph_trans(X_adv, transformation)
    elif (transformation in M_IMG_AUGMNT):
        # TODO: after applied augmentations, the data were shuffled
        (X, Y) = augmentation(X, Y, transformation)
        (X_adv, Y_trans) = augmentation(X_adv, Y_trans, transformation)
    
    pred_probs = model.predict(X, batch_size=BATCH_SIZE)
    pred_probs_adv = model.predict(X_adv, batch_size=BATCH_SIZE)
    
    _, acc_original = model.evaluate(X, Y, batch_size=BATCH_SIZE, verbose=0)
    _, acc_adv = model.evaluate(X_adv, Y, batch_size=BATCH_SIZE, verbose=0)
    
    print(acc_original, acc_adv)
    
    for pred_prob, pred_prob_adv, true_prob in zip(pred_probs, pred_probs_adv, Y):
        pred_label = np.argmax(pred_prob)
        pred_label_adv = np.argmax(pred_prob_adv)
        true_label = np.argmax(true_prob)
        
        if (pred_label == true_label):
            nb_legitimates += 1
            if (pred_label_adv != pred_label):
                conf += np.max(pred_prob_adv)
                cnt_miss += 1
            else:
                cnt_correct += 1
              
    # error rate
    miss_original = 1 - acc_original
    miss_adv = 1 - acc_adv
    err_rate = miss_adv - miss_original
    
    # average confidece for examples successfully attacked
    avg_conf = conf / cnt_miss
        
    return acc_original, acc_adv, err_rate, avg_conf

In [None]:
"""
Training models: One over each transformed dataset.
Then evaluating each trained models.
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
def training():
    # training models
    for transformation in tqdm(M):
        model_name = 'mnist_cnn_{}.h5'.format(transformation)
        print("Training model {}...".format(model_name))
        
        (X_train, Y_train), _ = load_mnist()

        if (transformation in M_ROTATE):
            X_train = rotate(X_train, transformation)
        elif (transformation in M_FLIP):
            X_train = flip(X_train, transformation)
        elif (transformation in M_SHIFT):
            X_train = shift(X_train, transformation)
        elif (transformation in M_AFFINE_TRANS):
            X_train = affine_trans(X_train, transformation)
        elif (transformation in M_MORPH_TRANS):
            X_train = morph_trans(X_train, transformation)
        elif (transformation in M_IMG_AUGMNT):
            (X_train, Y_train) = augmentation(X_train, Y_train, transformation)
        
        # train the model
        train(X_train, Y_train, model_name=model_name)
        
        print("---------------------------")
        print()

In [None]:
"""
Crafting adversarial examples.
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
def crafting_adversarial_examples():
    (X_train, Y_train), (X_test, Y_test) = load_mnist()
    
    for attack in tqdm(attacks):
        #generate_adversarial('mnist_cnn_clean.h5', X_train, Y_train, attack)
        generate_adversarial('mnist_cnn_clean.h5', X_test, Y_test, attack)

In [None]:
"""
Evaluating adversarial attacks.
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
def evaluating_attacks():
    acc_models = []
    acc_advs = []
    ave_confs = []
    error_rates = []
    
    # def evaluate_attack(model_name, X, Y, adv_file_name):
    for attack in tqdm(attacks):
        adv_file_name = 'mnist_cnn_clean_{}_eps{}.npy'.format(attack, int(eps * 100))

        for trans in tqdm(M):
            model_name = 'mnist_cnn_{}.h5'.format(trans)
            acc, acc_adv, err_rate, conf = evaluate_attack(model_name, adv_file_name, trans)

            acc_models.append(acc)
            acc_advs.append(acc_adv)
            ave_confs.append(conf)
            error_rates.append(err_rate)

    return acc_models, acc_advs, error_rates, ave_confs

In [None]:
"""
Evaluate models
@author: Ying Meng (y.meng201011(at)gmail(dot)com)
"""
def evaluating():
    test_accuracies = []
    ave_confs = []
    ave_confs_misclassified = []
    
    # training models
    for transformation in tqdm(M):
        model_name = 'mnist_cnn_{}.h5'.format(transformation)
        print("Evaluating model {}...".format(model_name))
        
        _, (X_test, Y_test) = load_mnist()

        if (transformation in M_ROTATE):
            X_test = rotate(X_test, transformation)
        elif (transformation in M_FLIP):
            X_test = flip(X_test, transformation)
        elif (transformation in M_SHIFT):
            X_test = shift(X_test, transformation)
        elif (transformation in M_AFFINE_TRANS):
            X_test = affine_trans(X_test, transformation)
        elif (transformation in M_MORPH_TRANS):
            X_test = morph_trans(X_test, transformation)
        elif (transformation in M_IMG_AUGMNT):
            (X_test, Y_test) = augmentation(X_test, Y_test, transformation)
        
        # evaluate trained model
        acc, conf, conf_misclassified = evaluate(model_name, X_test, Y_test)

        test_accuracies.append(acc)
        ave_confs.append(conf)
        ave_confs_misclassified.append(conf_misclassified)

    return test_accuracies, ave_confs, ave_confs_misclassified

In [None]:
# ---------------------------
# Automating experiments.
# 1. train and save models
# 
# You only need to run this once per model structure, dataset.
# ---------------------------
training()

In [None]:
# ---------------------------
# Automating experiments.
# 2. craft and save adversarial examples
#
# You only need to run this once per targeted model, attack approach.
# ---------------------------
crafting_adversarial_examples()

In [None]:
# ---------------------------
# Automating experiments.
# 3. evaluate trained models
# 4. evaluate attacks
#
# ---------------------------
acc, confs, confs_misclassified = evaluating()
acc_models, acc_adv, error_rates, confs_adv = evaluating_attacks()

# print reports
pd.options.display.float_format = '{:.4f}'.format
df = pd.DataFrame(data = {
    'Model': M,
    'Acc(model)': acc,
    'Conf(correct)': confs,
    'Conf(miss)': confs_misclassified,
    'Acc(fooled)': acc_adv,
    'Conf(fooled)': confs_adv,
    'Error Rate(eps = 0.25)': error_rates
})

In [None]:
# ---------------------------
# Automating experiments.
# 5. export collected data to a csv file
# 6. print the report
#
# ---------------------------
df.to_csv('data/results.csv', sep=',', encoding='utf-8')
df