# Kaggle notebook for training 

In [1]:
import tensorflow as tf
import numpy as np 
from tensorflow.keras.callbacks import  EarlyStopping
from cleverhans.tf2.attacks.projected_gradient_descent import projected_gradient_descent
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, AveragePooling2D, BatchNormalization, Dropout, Input, Activation, Add, Dense, Flatten
from tensorflow.keras.regularizers import l2
from tensorflow.keras import backend as K


2024-09-23 19:13:39.345436: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-09-23 19:13:39.345566: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-23 19:13:39.482270: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [2]:
strategy = tf.distribute.MirroredStrategy()

In [3]:
(X_train, y_train), (X_test, y_test) = cifar10.load_data()
X_train = X_train.astype('float32')
X_test = X_test.astype('float32')

mean_cifar = np.mean(X_train/255.,axis = (0,1,2))
std_cifar = np.std(X_train/255.,axis = (0,1,2))

res = tf.keras.layers.Rescaling(1/255)
norm = tf.keras.layers.Normalization(mean=mean_cifar, variance=std_cifar**2)


Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
[1m170498071/170498071[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 0us/step


# IG adversarial training model

In [5]:
class CustomModel(tf.keras.Model):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.loss_tracker = tf.keras.metrics.SparseCategoricalCrossentropy(name="loss")
#         self.mae_metric = tf.keras.metrics.MeanAbsoluteError(name="mae")
        self.sparscat_metric=tf.keras.metrics.SparseCategoricalAccuracy(name='sparse_categorical_accuracy')
 #       self.delta = tf.Variable(tf.zeros([32,224,224,3]))
    

    
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to fit().
        x, y = data
        y = tf.cast(y,tf.int32)
        x = tf.cast(x,tf.float32)
        eps=8.0
        m = 4
        alpha = eps/m
        g_adv = tf.zeros_like(x)
        for i in range(m):
            with tf.GradientTape() as tape:
                tape.watch(x)
                predictions = self(x, training = True) 
                loss = self.compute_loss(y=y,y_pred=predictions)
            delta_mig = self.IG(x,y)
            g_adv = g_adv + (delta_mig/tf.norm(delta_mig,1))
            grad = tape.gradient(loss, self.trainable_variables)
            x = x - alpha*tf.sign(g_adv) # (+) overfit (-) regularization
            x = tf.clip_by_value(x, x-eps, x+eps)
        

            self.optimizer.apply_gradients(zip(grad, self.trainable_variables))

            for metric in self.metrics:
                metric.update_state(y, predictions)
            # Return a dict mapping metric names to current value
            result= {m.name: m.result() for m in self.metrics}
         
        return result
    
    
    @tf.function
    def IG(self, images, labels):
        # interpolated intensities
        alphas_x = tf.linspace(0.0,1.0,16)

        # standardize input
        #images = tf.cast(images,'float32')
        labels = tf.squeeze(labels)
        #labels = tf.cast(labels,tf.int32)

        # calculate interpolated images
        delta = tf.expand_dims(images,1)
        interpolated = delta * alphas_x

        # calculate gradient for each image
        grads = tf.TensorArray(tf.float32,400)
        #grads = []
        i = 0
        #print(labels.dtype)
        for inter in interpolated:
            with tf.GradientTape() as tape:
                tape.watch(inter)
                probs = self(inter)[:,labels[i]]
            grads = grads.write(i,tape.gradient(probs,inter))
            #grads.append(tape.gradient(probs,inter))
            i+= 1

        grads = grads.stack()
        #grads = tf.convert_to_tensor(grads, dtype=tf.float32)
        # aproximate integration using remain trapoziod
        grads = (grads[:,:-1] + grads[:,1:]) / tf.constant(2.0)
        avg_gradients = tf.math.reduce_mean(grads, axis=1)

        integrated_gradient = images * avg_gradients

        return integrated_gradient
    
    @property
    def metrics(self):
        # We list our Metric objects here so that reset_states() can be
        # called automatically at the start of each epoch
        # or at the start of evaluate().
        return [ self.loss_tracker,self.sparscat_metric]
    def test_step(self, data):
        # Unpack the data
        x, y = data
        # Compute predictions
        y_pred = self(x, training=False)
        # Updates the metrics tracking the loss
        self.compute_loss(y=y, y_pred=y_pred)
        # Update the metrics.
        for metric in self.metrics:
                metric.update_state(y, y_pred)
        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}

# WideResNet Model 

In [6]:
image_size = 32
if K.image_data_format() == "th":
    channel_axis = 1
    input_shape = (3, image_size, image_size)
else:
    channel_axis = -1
    input_shape = (image_size, image_size, 3)

def _wide_basic(n_input_plane, n_output_plane, stride, weight_decay = 0.0005,
                dropout_probability = 0, weight_init = "random_uniform", use_bias = False):
    def f(net):
        # format of conv_params:
        #               [ [nb_col="kernel width", nb_row="kernel height",
        #               subsample="(stride_vertical,stride_horizontal)",
        #               border_mode="same" or "valid"] ]
        # B(3,3): orignal <<basic>> block
        conv_params = [ [3,3,stride,"same"],
                        [3,3,(1,1),"same"] ]
        
        n_bottleneck_plane = n_output_plane

        # Residual block
        for i, v in enumerate(conv_params):
            if i == 0:
                if n_input_plane != n_output_plane:
                    net = BatchNormalization(axis=channel_axis)(net)
                    net = Activation("relu")(net)
                    convs = net
                else:
                    convs = BatchNormalization(axis=channel_axis)(net)
                    convs = Activation("relu")(convs)
                convs = Conv2D(n_bottleneck_plane, 
                               (v[0],v[1]),
                                strides=v[2],
                                padding=v[3],
                                kernel_initializer=weight_init,
                                kernel_regularizer=l2(weight_decay),
                                use_bias=use_bias)(convs)
            else:
                convs = BatchNormalization(axis=channel_axis)(convs)
                convs = Activation("relu")(convs)
                if dropout_probability > 0:
                    convs = Dropout(dropout_probability)(convs)
                convs = Conv2D(n_bottleneck_plane, 
                               (v[0],v[1]),
                                strides=v[2],
                                padding=v[3],
                                kernel_initializer=weight_init,
                                kernel_regularizer=l2(weight_decay),
                                use_bias=use_bias)(convs)

        # Shortcut Conntection: identity function or 1x1 convolutional
        #  (depends on difference between input & output shape - this
        #   corresponds to whether we are using the first block in each
        #   group; see _layer() ).
        if n_input_plane != n_output_plane:
            shortcut = Conv2D(n_output_plane, 
                              (1,1),
                              strides=stride,
                              padding="same",
                              kernel_initializer=weight_init,
                              kernel_regularizer=l2(weight_decay),
                              use_bias=use_bias)(net)
        else:
            shortcut = net

        return Add()([convs, shortcut])
    
    return f

# "Stacking Residual Units on the same stage"
def _layer(block, n_input_plane, n_output_plane, count, stride):
    def f(net):
        net = block(n_input_plane, n_output_plane, stride)(net)
        for i in range(2,int(count+1)):
            net = block(n_output_plane, n_output_plane, stride=(1,1))(net)
        return net
    
    return f


def WideResNet(depth, width, input_shape, nb_classes, weight_decay = 0.0005,
                  dropout_probability = 0, weight_init = "random_uniform", use_bias = False):
    assert((depth - 4) % 6 == 0)
    n = (depth - 4) / 6
    
    inputs = Input(shape=input_shape)

    n_stages=[16, 16*width, 32*width, 64*width]


    conv1 = Conv2D(n_stages[0], 
                    (3, 3), 
                    strides=1,
                    padding="same",
                    kernel_initializer=weight_init,
                    kernel_regularizer=l2(weight_decay),
                    use_bias=use_bias)(inputs) # "One conv at the beginning (spatial size: 32x32)"

    # Add wide residual blocks
    block_fn = _wide_basic
    conv2 = _layer(block_fn, n_input_plane=n_stages[0], n_output_plane=n_stages[1], count=n, stride=(1,1))(conv1)# "Stage 1 (spatial size: 32x32)"
    conv3 = _layer(block_fn, n_input_plane=n_stages[1], n_output_plane=n_stages[2], count=n, stride=(2,2))(conv2)# "Stage 2 (spatial size: 16x16)"
    conv4 = _layer(block_fn, n_input_plane=n_stages[2], n_output_plane=n_stages[3], count=n, stride=(2,2))(conv3)# "Stage 3 (spatial size: 8x8)"

    batch_norm = BatchNormalization(axis=channel_axis)(conv4)
    relu = Activation("relu")(batch_norm)
                                            
    # Classifier block
    pool = AveragePooling2D(pool_size=(8, 8), strides=(1, 1), padding="same")(relu)
    flatten = Flatten()(pool)
    predictions = Dense(units=nb_classes, kernel_initializer=weight_init, use_bias=use_bias,
                        kernel_regularizer=l2(weight_decay), activation="softmax")(flatten)

    model = Model(inputs=inputs, outputs=predictions)
    return model

In [7]:
# lr_schedule = [30, 50, 70] # epoch_step
lr_schedule = [5, 15, 22] # epoch_step
def schedule(epoch_idx):
    return 0.001 - epoch_idx * 0.0000396 # linear schedule

with strategy.scope():
    inputs = tf.keras.layers.Input((32, 32, 3))
    r = res(inputs)
    pre = norm(r)
    model = WideResNet(40,2, input_shape, 10)
    outputs = model(pre)
    WRN = CustomModel(inputs = inputs, outputs = outputs)
    WRN.compile(optimizer=tf.keras.optimizers.AdamW(learning_rate=0.01,weight_decay=1e-3),loss = "sparse_categorical_crossentropy",metrics=['sparse_categorical_accuracy'])

In [8]:
cb = [EarlyStopping(patience=15, monitor='val_loss', mode='auto' ,restore_best_weights=True,min_delta = 0.01,verbose = True)
     ,LearningRateScheduler(schedule=schedule)]

#WRN.fit(X_train, y_train, validation_data = (X_test, y_test), callbacks = cb ,verbose = True, epochs = 200 , batch_size= 64 )

WRN.fit(X_train, y_train, validation_data = (X_test, y_test), callbacks = cb ,verbose = True, epochs = 25 , batch_size= 400 )

WRN.save('saving/cifar10.h5')

Epoch 1/25
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1629s[0m 12s/step - loss: 1.8303 - sparse_categorical_accuracy: 0.3420 - val_loss: 1.6187 - val_sparse_categorical_accuracy: 0.4207 - learning_rate: 0.0010
Epoch 2/25
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1537s[0m 12s/step - loss: 1.5285 - sparse_categorical_accuracy: 0.4319 - val_loss: 1.4637 - val_sparse_categorical_accuracy: 0.4793 - learning_rate: 9.6040e-04
Epoch 3/25
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1537s[0m 12s/step - loss: 1.4186 - sparse_categorical_accuracy: 0.4754 - val_loss: 1.1852 - val_sparse_categorical_accuracy: 0.5919 - learning_rate: 9.2080e-04
Epoch 4/25
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1537s[0m 12s/step - loss: 1.3060 - sparse_categorical_accuracy: 0.5201 - val_loss: 1.0483 - val_sparse_categorical_accuracy: 0.6307 - learning_rate: 8.8120e-04
Epoch 5/25
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1537s[0m 

# Sanity Check: evaluate the accuracy for one batch

In [10]:
test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))
test_dataset = test_dataset.batch(400).prefetch(tf.data.AUTOTUNE)

In [11]:
images, labels = next(iter(test_dataset))
images = tf.cast(images, tf.float32)
labels = tf.squeeze(labels)
labels = tf.cast(labels,tf.int32)
adv = projected_gradient_descent(WRN,images,4,4/20,20,np.inf,y=labels)
WRN.evaluate(adv,labels)

[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 10ms/step - loss: 1.8569 - sparse_categorical_accuracy: 0.4799


[1.8256205320358276, 0.4975000023841858]

In [12]:
adv = projected_gradient_descent(WRN,images,16,16/20,20,np.inf,y=labels)
WRN.evaluate(adv,labels)

[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step - loss: 11.1184 - sparse_categorical_accuracy: 0.0191


[10.969334602355957, 0.014999999664723873]

In [13]:
@tf.function
def IG(model, images, labels, steps = 15):
    # interpolated intensities
    alphas = tf.linspace(0.0,1.0,steps+1)
    alphas_x = alphas[:,tf.newaxis, tf.newaxis, tf.newaxis,tf.newaxis]
    alphas_x = tf.reshape(alphas_x,(1,alphas.shape[0],1,1,1))

    # standardize input
    images = tf.cast(images,'float32')

    # calculate interpolated images
    delta = tf.expand_dims(images,1)
    interpolated = delta * alphas_x
    # calculate gradient for each image
    grads = tf.TensorArray(tf.float32,400)
    i = 0
    for inter in interpolated:
        with tf.GradientTape() as tape:
            tape.watch(inter)
            probs = model(inter)[:,labels[0]]
        grads = grads.write(i,tape.gradient(probs,inter))
        
        i+= 1

    grads = grads.stack()

    # aproximate integration using remain trapoziod
    grads = (grads[:,:-1] + grads[:,1:]) / tf.constant(2.0)
    avg_gradients = tf.math.reduce_mean(grads, axis=1)

    integrated_gradient = images * avg_gradients

    return integrated_gradient

@tf.function
def MIG(model, images, labels, epsilon = 16, iterations = 20):
    g_t = tf.zeros_like(images)
    x_t = images
    eps_step = tf.constant(epsilon/iterations)
    for i in tf.range(iterations):
        # steps = 20 as research
        #labels = tf.argmax(model(x_t) , -1)
        delta_t = IG(model,x_t,labels) 
        g_t = g_t + (delta_t/tf.norm(delta_t,1))
        x_h = x_t - eps_step * tf.sign(g_t)
        x_t = tf.clip_by_value(x_h,x_h-epsilon,x_h+epsilon)
    return tf.clip_by_value(x_t,0,255)

In [14]:
adv_mig = MIG(WRN, images, labels, 16)
WRN.evaluate(adv_mig,labels)

[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step - loss: 1.1070 - sparse_categorical_accuracy: 0.7879


[1.0134222507476807, 0.8050000071525574]

In [15]:
adv_mig = MIG(WRN, images, labels, 4)
WRN.evaluate(adv_mig,labels)

[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step - loss: 0.5169 - sparse_categorical_accuracy: 0.8290


[0.46495750546455383, 0.8424999713897705]