# Training ResNet56 Over CIFAR10 dataset
Reference : [https://keras.io/examples/cifar10_resnet/](https://keras.io/examples/cifar10_resnet/)

In [None]:
import keras
import os
from skimage import io
import numpy as np
from keras import backend as K
from keras.datasets import cifar10
from keras.models import Model, Sequential
from keras.layers import Dense, Dropout, Flatten, Input, AveragePooling2D, merge, Activation
from keras.layers import Conv2D, MaxPooling2D, BatchNormalization, DepthwiseConv2D
from keras.layers import Concatenate
from keras.models import load_model
from keras.optimizers import SGD, Adam, RMSprop
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import ReduceLROnPlateau, ModelCheckpoint, EarlyStopping, LearningRateScheduler, CSVLogger
from keras.callbacks import Callback
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import tensorflow as tf
from keras import backend as k
import copy
import time

In [None]:
# Training parameters
batch_size = 32  # orig paper trained all networks with batch_size=128
epochs = 200
data_augmentation = True
num_classes = 10

# Subtracting pixel mean improves accuracy
subtract_pixel_mean = True

# CCP Augmented Training
ccp_augment = False #For augmented CCP attack training dataset, change ccp_augment=False to ccp_augment=True

# Preprocessing Cifar10 Dataset

In [None]:
# CCP ATTACK FUNCTIONS
def change_brightness(image, alpha, beta):
  new_image = np.zeros(image.shape, np.int64)
  new_image = np.clip( alpha*image + beta, 0, 255)
  return new_image
	
def CCP_Attack_Brightness(image, transform):
	img = copy.copy(image)
	for channel in range(img.shape[2]):
		temp1 = image[:,:,0]
		temp2 = image[:,:,1]
		temp3 = image[:,:,2]

		temp = temp1 * transform[channel][0] + temp2 * transform[channel][1] + temp3 * transform[channel][2]

		img[:,:,channel] = temp/3

	img1 = change_brightness(img, 2, 0)
	return img1

In [None]:
# Load the CIFAR10 data.
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

# Input image dimensions.
input_shape = x_train.shape[1:]

# CCP Augmentation
if ccp_augment:
  print('CCP augmented included!')
  # Copy Dataset
  x_train_trans = copy.copy(x_train)

  # Intializing at each transformation [0,1.0]
  a = np.random.uniform(low=0.0, high=1.0, size=(3,))
  b = np.random.uniform(low=0.0, high=1.0, size=(3,))
  c = np.random.uniform(low=0.0, high=1.0, size=(3,))
  transform = np.array([a,b,c]) 


  for test in range(len(x_train)):
    
    ## Comment or Uncomment this for Fixed or Variable settings respectively.
    a = np.random.uniform(low=0.0, high=1.0, size=(3,))
    b = np.random.uniform(low=0.0, high=1.0, size=(3,))
    c = np.random.uniform(low=0.0, high=1.0, size=(3,))
    transform = np.array([a,b,c])

    ## Copy Image
    img = copy.copy(x_train[test])

    ## CCP Attack
    x_train_trans[test] = CCP_Attack_Brightness(img, transform)

  # Including augmented training dataset, i.e., 1,00,000 training images total
  x_train = np.vstack((x_train,x_train_trans))
  y_train = np.vstack((y_train,y_train))
    


# Normalize data.
x_train = x_train.astype('float32') / 255
x_test = x_test.astype('float32') / 255

# If subtract pixel mean is enabled
if subtract_pixel_mean:
    x_train_mean = np.mean(x_train, axis=0)
    x_train -= x_train_mean
    x_test -= x_train_mean

print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')
print('y_train shape:', y_train.shape)

# Convert class vectors to binary class matrices.
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

# Model Building

In [None]:
def resnet_layer(inputs,
                 num_filters=16,
                 kernel_size=3,
                 strides=1,
                 activation='relu',
                 batch_normalization=True,
                 conv_first=True):
    """2D Convolution-Batch Normalization-Activation stack builder
    # Arguments
        inputs (tensor): input tensor from input image or previous layer
        num_filters (int): Conv2D number of filters
        kernel_size (int): Conv2D square kernel dimensions
        strides (int): Conv2D square stride dimensions
        activation (string): activation name
        batch_normalization (bool): whether to include batch normalization
        conv_first (bool): conv-bn-activation (True) or
            bn-activation-conv (False)
    # Returns
        x (tensor): tensor as input to the next layer
    """
    conv = Conv2D(num_filters,
                  kernel_size=kernel_size,
                  strides=strides,
                  padding='same',
                  kernel_initializer='he_normal',
                  kernel_regularizer=l2(1e-4))

    x = inputs
    if conv_first:
        x = conv(x)
        if batch_normalization:
            x = BatchNormalization()(x)
        if activation is not None:
            x = Activation(activation)(x)
    else:
        if batch_normalization:
            x = BatchNormalization()(x)
        if activation is not None:
            x = Activation(activation)(x)
        x = conv(x)
    return x

In [None]:
def ResNet56(input_shape, num_classes=10):
    """
    Stacks of 2 x (3 x 3) Conv2D-BN-ReLU
    Last ReLU is after the shortcut connection.
    At the beginning of each stage, the feature map size is halved (downsampled)
    by a convolutional layer with strides=2, while the number of filters is
    doubled. Within each stage, the layers have the same number filters and the
    same number of filters.
    Features maps sizes:
    stage 0: 32x32, 16
    stage 1: 16x16, 32
    stage 2:  8x8,  64
    # Arguments
        input_shape (tensor): shape of input image tensor
        depth (int): number of core convolutional layers
        num_classes (int): number of classes (CIFAR10 has 10)
    # Returns
        model (Model): Keras model instance
    """

    # Start model definition.
    depth = 56
    num_filters = 16
    num_res_blocks = int((depth - 2) / 6)

    inputs = Input(shape=input_shape)
    x = resnet_layer(inputs=inputs)
    # Instantiate the stack of residual units
    for stack in range(3):
        for res_block in range(num_res_blocks):
            strides = 1
            if stack > 0 and res_block == 0:  # first layer but not first stack
                strides = 2  # downsample
            y = resnet_layer(inputs=x,
                             num_filters=num_filters,
                             strides=strides)
            y = resnet_layer(inputs=y,
                             num_filters=num_filters,
                             activation=None)
            if stack > 0 and res_block == 0:  # first layer but not first stack
                # linear projection residual shortcut connection to match
                # changed dims
                x = resnet_layer(inputs=x,
                                 num_filters=num_filters,
                                 kernel_size=1,
                                 strides=strides,
                                 activation=None,
                                 batch_normalization=False)
            x = keras.layers.add([x, y])
            x = Activation('relu')(x)
        num_filters *= 2

    # Add classifier on top.
    # v1 does not use BN after last shortcut connection-ReLU
    x = AveragePooling2D(pool_size=8)(x)
    y = Flatten()(x)
    outputs = Dense(num_classes,
                    activation='softmax',
                    kernel_initializer='he_normal')(y)

    # Instantiate model.
    model = Model(inputs=inputs, outputs=outputs)
    return model

In [None]:
model = ResNet56(input_shape=input_shape)
model.summary()

# Preparing Model for Compiling and Training

In [None]:
def lr_schedule(epoch):
    """Learning Rate Schedule
    Learning rate is scheduled to be reduced after 80, 120, 160, 180 epochs.
    Called automatically every epoch as part of callbacks during training.
    # Arguments
        epoch (int): The number of epochs
    # Returns
        lr (float32): learning rate
    """
    lr = 1e-3
    if epoch > 180:
        lr *= 0.5e-3
    elif epoch > 160:
        lr *= 1e-3
    elif epoch > 120:
        lr *= 1e-2
    elif epoch > 80:
        lr *= 1e-1
    print('Learning rate: ', lr)
    return lr

In [None]:
model.compile(loss='categorical_crossentropy',
              optimizer=Adam(learning_rate=lr_schedule(0)),
              metrics=['accuracy'])

In [None]:
# Prepare model model saving directory.
save_dir = os.path.join(os.getcwd(), 'saved_models')
model_name = 'cifar10_%s_model.{epoch:03d}.h5' % model_type
if not os.path.isdir(save_dir):
    os.makedirs(save_dir)
filepath = os.path.join(save_dir, model_name)

In [None]:
# Prepare callbacks for model saving and for learning rate adjustment.
checkpoint = ModelCheckpoint(filepath=filepath,
                             monitor='val_accuracy',
                             verbose=1,
                             save_best_only=True)

lr_scheduler = LearningRateScheduler(lr_schedule)

lr_reducer = ReduceLROnPlateau(factor=np.sqrt(0.1),
                               cooldown=0,
                               patience=5,
                               min_lr=0.5e-6)

callbacks = [checkpoint, lr_reducer, lr_scheduler]

# Training Model

In [None]:
# Run training, with or without data augmentation.
if not data_augmentation:
    print('Not using data augmentation.')
    model.fit(x_train, y_train,
              batch_size=batch_size,
              epochs=epochs,
              validation_data=(x_test, y_test),
              shuffle=True,
              callbacks=callbacks)
else:
    print('Using real-time data augmentation.')
    # This will do preprocessing and realtime data augmentation:
    datagen = ImageDataGenerator(
        # set input mean to 0 over the dataset
        featurewise_center=False,
        # set each sample mean to 0
        samplewise_center=False,
        # divide inputs by std of dataset
        featurewise_std_normalization=False,
        # divide each input by its std
        samplewise_std_normalization=False,
        # apply ZCA whitening
        zca_whitening=False,
        # epsilon for ZCA whitening
        zca_epsilon=1e-06,
        # randomly rotate images in the range (deg 0 to 180)
        rotation_range=0,
        # randomly shift images horizontally
        width_shift_range=0.1,
        # randomly shift images vertically
        height_shift_range=0.1,
        # set range for random shear
        shear_range=0.,
        # set range for random zoom
        zoom_range=0.,
        # set range for random channel shifts
        channel_shift_range=0.,
        # set mode for filling points outside the input boundaries
        fill_mode='nearest',
        # value used for fill_mode = "constant"
        cval=0.,
        # randomly flip images
        horizontal_flip=True,
        # randomly flip images
        vertical_flip=False,
        # set rescaling factor (applied before any other transformation)
        rescale=None,
        # set function that will be applied on each input
        preprocessing_function=None,
        # image data format, either "channels_first" or "channels_last"
        data_format=None,
        # fraction of images reserved for validation (strictly between 0 and 1)
        validation_split=0.0)

    # Compute quantities required for featurewise normalization
    # (std, mean, and principal components if ZCA whitening is applied).
    datagen.fit(x_train)

    # Fit the model on the batches generated by datagen.flow().
    model.fit_generator(datagen.flow(x_train, y_train, batch_size=batch_size),
                        validation_data=(x_test, y_test),
                        epochs=epochs, verbose=1, workers=4,
                        callbacks=callbacks)

Evaluate Model

In [None]:
# Score trained model.
scores = model.evaluate(x_test, y_test, verbose=1)
print('Test loss:', scores[0])
print('Test accuracy:', scores[1])

# Testing on CCP Attack
### Testing the CIFAR10 test dataset with s=2 and b=0
#### (CCP_F and CCP_V)

In [None]:
# Load Model
import keras
new_model = keras.models.load_model(filepath)

In [None]:
# CCP ATTACK FUNCTIONS
def change_brightness(image, alpha, beta):
  new_image = np.zeros(image.shape, np.int64)
  new_image = np.clip( alpha*image + beta, 0, 255)
  return new_image
	
def CCP_Attack_Brightness(image, transform):
	img = copy.copy(image)
	for channel in range(img.shape[2]):
		temp1 = image[:,:,0]
		temp2 = image[:,:,1]
		temp3 = image[:,:,2]

		temp = temp1 * transform[channel][0] + temp2 * transform[channel][1] + temp3 * transform[channel][2]

		img[:,:,channel] = temp/3

	img1 = change_brightness(img, 2, 0)
	return img1

In [None]:
# Copy Dataset
x_test_trans = copy.copy(x_test)


# Load the CIFAR10 data.
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

# Copy Dataset
x_test_trans = copy.copy(x_test)

# Intializing at each transformation [0,1.0]
a = np.random.uniform(low=0.0, high=1.0, size=(3,))
b = np.random.uniform(low=0.0, high=1.0, size=(3,))
c = np.random.uniform(low=0.0, high=1.0, size=(3,))
transform = np.array([a,b,c]) 


for test in range(len(x_test)):
  
  ## Comment or Uncomment below for Fixed or Variable settings respectively.
  a = np.random.uniform(low=0.0, high=1.0, size=(3,))
  b = np.random.uniform(low=0.0, high=1.0, size=(3,))
  c = np.random.uniform(low=0.0, high=1.0, size=(3,))
  transform = np.array([a,b,c])

  ## Copy Image
  img = copy.copy(x_test[test])

  ## CCP Attack
  x_test_trans[test] = CCP_Attack_Brightness(img, transform)
  
    


# Input image dimensions.
input_shape = x_train.shape[1:]

# Normalize data.
x_train = x_train.astype('float32') / 255
x_test = x_test.astype('float32') / 255
x_test_trans = x_test_trans.astype('float32') / 255

# If subtract pixel mean is enabled
if subtract_pixel_mean:
    x_train_mean = np.mean(x_train, axis=0)
    x_train -= x_train_mean
    x_test -= x_train_mean
    x_test_trans -= x_train_mean

# Convert class vectors to binary class matrices.
y_test = keras.utils.to_categorical(y_test, num_classes)

# Evaluate Model
scores = new_model.evaluate(x_test_trans, y_test, verbose=1)
print('Test loss:', scores[0])
print('Test accuracy:', scores[1])