
# $l^p$-FGSM in TensorFlow: Mitigating Catastrophic Overfitting in Fast Adversarial Training

 TensorFlow implementation of the $l^p$-Fast Gradient Sign Method ($l^p$-FGSM), a novel approach presented in the paper "An $l^p$ Norm Solution to Catastrophic Overfitting in Fast Adversarial Training." This notebook aims to provide a comprehensive and interactive exploration of the techniques and insights introduced in our research.

## Overview

Adversarial training has emerged as a powerful tool for enhancing the robustness of deep neural networks against adversarial attacks. Traditional methods, while effective, often come with considerable computational costs. Fast adversarial training methods like the Fast Gradient Sign Method (FGSM) offer a more efficient alternative. However, they also introduce the challenge of catastrophic overfitting, where models become robust against single-step attacks but remain surprisingly vulnerable to multi-step variants.

Our work focuses on this pivotal issue, examining the prevalence of catastrophic overfitting under different norm constraints. Through empirical analysis, we discovered that catastrophic overfitting is more pronounced under the $ l^\infty $ norm than the $ l^2 $ norm. Building on this insight, we developed the $l^p$-FGSM framework, which generalizes adversarial perturbation creation across various norms. This framework allows a seamless transition from $ l^2 $ to $ l^\infty $ attacks,

In this notebook, we delve into the $l^p$-FGSM method, addressing the challenge of catastrophic overfitting in fast adversarial training and providing a hands-on experience with our proposed solution.

### Key Highlights:
- Implementation of the $l^p$-FGSM method in TensorFlow.
- Detailed exploration of the impact of different $ l^p $ norms on adversarial robustness.


In [1]:
# @title Imports
import math
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds

from tensorflow import keras
from tensorflow.keras import layers, models, callbacks, optimizers, utils
from tensorflow.keras.models import load_model
from tensorflow.keras.datasets import cifar10, cifar100
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam, AdamW, SGD
from tensorflow.keras.callbacks import Callback


In [2]:
# Global configuration parameters
dataset_name = "CIFAR10"  # Change this to "SVHN" ,"CIFAR10" "CIFAR100" as needed
batch_size = 128
epochs=30
weight_decay =5e-4  #
init_shape = (32, 32, 3)
activation = "relu"
dropout=0.1

In [3]:
# @title Dataset Loading

seed = 88888888
np.random.seed(seed)  # For NumPy random numbers
tf.random.set_seed(seed)  # For TensorFlow random numbers

# Function to load and preprocess SVHN dataset
def load_svhn():
    (svhn_train, svhn_test), ds_info = tfds.load(
        'svhn_cropped', split=['train', 'test'], as_supervised=True, with_info=True, batch_size=-1)

    svhn_train_images, svhn_train_labels = tfds.as_numpy(svhn_train)
    svhn_test_images, svhn_test_labels = tfds.as_numpy(svhn_test)

    # Normalize and preprocess images
    x_train = svhn_train_images.astype("float32") / 255.0
    x_test = svhn_test_images.astype("float32") / 255.0

    x_train = x_train / np.max(x_train, axis=(1, 2, 3), keepdims=True)
    x_test = x_test / np.max(x_test, axis=(1, 2, 3), keepdims=True)

    # Replace any division-by-zero
    x_train[np.isnan(x_train)] = 0
    x_test[np.isnan(x_test)] = 0

    # One-hot encode labels
    num_classes = ds_info.features['label'].num_classes
    y_train = utils.to_categorical(svhn_train_labels, num_classes)
    y_test = utils.to_categorical(svhn_test_labels, num_classes)

    return (x_train, y_train), (x_test, y_test),num_classes

# Function to load and preprocess CIFAR-10 dataset
def load_cifar10():
    (x_train, y_train), (x_test, y_test) = cifar10.load_data()

    # Normalize the data
    x_train = x_train.astype("float32") / 255.0
    x_test = x_test.astype("float32") / 255.0

    # One-hot encode labels
    num_classes = 10
    y_train = utils.to_categorical(y_train, num_classes)
    y_test = utils.to_categorical(y_test, num_classes)

    return (x_train, y_train), (x_test, y_test),num_classes

# Function to load and preprocess CIFAR-100 dataset
def load_cifar100():
    (x_train, y_train), (x_test, y_test) = cifar100.load_data()

    # Normalize the data
    x_train = x_train.astype("float32") / 255
    x_test = x_test.astype("float32") / 255

    # One-hot encode labels
    num_classes = 100
    y_train = utils.to_categorical(y_train, num_classes)
    y_test = utils.to_categorical(y_test, num_classes)

    return (x_train, y_train), (x_test, y_test),num_classes




# Function to apply custom preprocessing and data augmentation based on the dataset
def preprocessing_(dataset_name, x_train, y_train):
    if dataset_name == "SVHN":
        # Custom preprocessing and data augmentation for SVHN
        datagen = ImageDataGenerator(
            rotation_range=8.0,
            zoom_range=[0.95, 1.05],
            height_shift_range=0.10,
            shear_range=0.15
        )
    elif dataset_name == "CIFAR10":
        # Custom preprocessing and data augmentation for CIFAR-10
        datagen = ImageDataGenerator(
            rotation_range=10,
            width_shift_range=5./32,
            height_shift_range=5./32,
            horizontal_flip=True
        )
    elif dataset_name == "CIFAR100":
        # Custom preprocessing and data augmentation for CIFAR-100
        datagen = ImageDataGenerator(
            rotation_range=10,
            width_shift_range=5./32,
            height_shift_range=5./32,
            horizontal_flip=True
        )
    else:
        raise ValueError("Unknown dataset")

    # Fit the ImageDataGenerator to the training data
    datagen.fit(x_train,seed=seed, augment=True)

    return datagen

# Function to load and preprocess dataset based on name
def load_and_preprocess_dataset(dataset_name):
    if dataset_name == "SVHN":
        (x_train, y_train), (x_test, y_test), num_classes = load_svhn()
    elif dataset_name == "CIFAR10":
        (x_train, y_train), (x_test, y_test),num_classes = load_cifar10()
    elif dataset_name == "CIFAR100":
        (x_train, y_train), (x_test, y_test),num_classes = load_cifar100()
    else:
        raise ValueError("Unknown dataset")

    # Preprocessing and data augmentation
    datagen = preprocessing_(dataset_name, x_train, y_train)

    return datagen, (x_train, y_train), (x_test, y_test),num_classes


datagen, (x_train, y_train), (x_test, y_test), num_classes = load_and_preprocess_dataset(dataset_name)



Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz


In [4]:
# @title PreActResNet18


from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Activation, Add, GlobalAveragePooling2D, Dense, MaxPooling2D, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2

def preact_res_block(x, filters, activation='relu', kernel_size=(3, 3), stride=1, weight_decay=0.0, dropout_rate=0.0):
    """
    Creates a pre-activation residual block for PreActResNet.

    Args:
        x: Input tensor or layer.
        filters: Number of filters for the convolution layers.
        activation: Activation function to use.
        kernel_size: Size of the convolution kernel.
        stride: Stride size for the convolution.
        weight_decay: L2 regularization factor.
        dropout_rate: Dropout rate.

    Returns:
        A tensor representing the output of the residual block.
    """
    shortcut = x

    # Applying batch normalization, activation, and convolution twice
    for _ in range(2):
        x = BatchNormalization()(x)
        x = Activation(activation)(x)
        x = Conv2D(filters, kernel_size, strides=stride if _ == 0 else 1, padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(weight_decay))(x)
        if dropout_rate > 0.0:
            x = Dropout(dropout_rate)(x)

    # Adjusting shortcut path for dimensionality matching (if needed)
    if stride != 1 or shortcut.shape[-1] != filters:
        shortcut = Conv2D(filters, (1, 1), strides=stride, kernel_initializer='he_normal', kernel_regularizer=l2(weight_decay))(shortcut)

    x = Add()([shortcut, x])  # Skip connection (element-wise addition)
    return x

def PreActResNet18(input_shape, num_classes=10, activation='relu', weight_decay=0.0, dropout_rate=0.0):
    """
    Constructs a PreActResNet18 model.

    Args:
        input_shape: Shape of the input data.
        num_classes: Number of classes for the output layer.
        activation: Activation function to use in the blocks.
        weight_decay: L2 regularization factor.
        dropout_rate: Dropout rate.

    Returns:
        A PreActResNet18 model.
    """
    input = Input(input_shape)
    x = Conv2D(64, (7, 7), strides=2, padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(weight_decay))(2.0 * (input - 0.5))
    x = MaxPooling2D(pool_size=(3, 3), strides=2, padding='same')(x)

    # Constructing ResNet blocks with specified activation function
    for filters, repetitions, use_strided_conv in zip([64, 128, 256, 512], [2, 2, 2, 2], [False, True, True, True]):
        for i in range(repetitions):
            x = preact_res_block(x, filters, activation, stride=2 if i == 0 and use_strided_conv else 1, weight_decay=weight_decay, dropout_rate=dropout_rate)

    x = GlobalAveragePooling2D()(x)
    if dropout_rate > 0.0:
        x = Dropout(dropout_rate)(x)
    x = Dense(num_classes, kernel_regularizer=l2(weight_decay))(x)
    x = Activation('softmax')(x)  # Softmax activation for classification

    return Model(input, x)


In [5]:
# @title WideResNet
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Add, Activation, Dropout, Flatten, Dense, Conv2D, AveragePooling2D, BatchNormalization
from tensorflow.keras.regularizers import l2

def initial_conv(input, weight_decay, activation='relu'):
    """
    Initial convolutional layer for WideResNet.

    Args:
        input: Input tensor or layer.
        weight_decay: L2 regularization factor.
        activation: Activation function to use.

    Returns:
        Tensor after applying Conv2D, BatchNormalization, and the specified Activation.
    """
    x = Conv2D(16, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(weight_decay), use_bias=False)(input)
    x = BatchNormalization(momentum=0.1, epsilon=1e-5, gamma_initializer='uniform')(x)
    x = Activation(activation)(x)
    return x

def expand_conv(init, base, k, strides=(1, 1), weight_decay=0.0, activation='relu'):
    """
    Expanding convolution layer in WideResNet. Increases the dimensionality of the input tensor.

    Args:
        init: Initial tensor or input layer.
        base: Number of base filters.
        k: Width factor for scaling the number of filters.
        strides: Convolution strides.
        weight_decay: L2 regularization factor.
        activation: Activation function to use.

    Returns:
        Tensor after applying Conv2D, BatchNormalization, and Activation.
    """
    x = Conv2D(base * k, (3, 3), padding='same', strides=strides, kernel_initializer='he_normal', kernel_regularizer=l2(weight_decay), use_bias=False)(init)
    x = BatchNormalization(momentum=0.1, epsilon=1e-5, gamma_initializer='uniform')(x)
    x = Activation(activation)(x)

    x = Conv2D(base * k, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(weight_decay), use_bias=False)(x)

    # Creating a shortcut path to implement the skip connection
    skip = Conv2D(base * k, (1, 1), padding='same', strides=strides, kernel_initializer='he_normal', kernel_regularizer=l2(weight_decay), use_bias=False)(init)
    m = Add()([x, skip])  # Adding the skip connection
    return m

def conv_block(input, k, base, dropout, weight_decay, activation='relu'):
    """
    Standard convolutional block for WideResNet.

    Args:
        input: Input tensor or layer.
        k: Width factor for scaling the number of filters.
        base: Number of base filters.
        dropout: Dropout rate.
        weight_decay: L2 regularization factor.
        activation: Activation function to use.

    Returns:
        Tensor after applying Conv2D, BatchNormalization, Activation, and optionally Dropout.
    """
    init = input
    x = BatchNormalization(momentum=0.1, epsilon=1e-5, gamma_initializer='uniform')(input)
    x = Activation(activation)(x)
    x = Conv2D(base * k, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(weight_decay), use_bias=False)(x)

    if dropout > 0.0:
        x = Dropout(dropout)(x)

    x = BatchNormalization(momentum=0.1, epsilon=1e-5, gamma_initializer='uniform')(x)
    x = Activation(activation)(x)
    x = Conv2D(base * k, (3, 3), padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(weight_decay), use_bias=False)(x)

    m = Add()([init, x])
    return m

def create_wide_residual_network(input_dim, nb_classes=100, N=2, k=1, dropout=0.0, verbose=1, weight_decay=1e-4, activation='relu'):
    """
    Creates a Wide Residual Network (WideResNet) with specified parameters.

    Args:
        input_dim: Dimension of the input data.
        nb_classes: Number of output classes for the network.
        N: Number of blocks in each group.
        k: Width factor for scaling the number of filters.
        dropout: Dropout rate.
        verbose: Verbosity mode.
        weight_decay: L2 regularization factor.
        activation: Activation function to use.

    Returns:
        A WideResNet model.
    """
    ip = Input(shape=input_dim)
    x = initial_conv(ip, weight_decay, activation=activation)
    nb_conv = 4  # Initial convolutional layer

    # First group of blocks
    x = expand_conv(x, 16, k, weight_decay=weight_decay, activation=activation)
    nb_conv += 2

    for i in range(N - 1):
        x = conv_block(x, k, 16, dropout, weight_decay, activation=activation)
        nb_conv += 2

    # Second group of blocks
    x = expand_conv(x, 32, k, strides=(2, 2), weight_decay=weight_decay, activation=activation)
    nb_conv += 2

    for i in range(N - 1):
        x = conv_block(x, k, 32, dropout, weight_decay, activation=activation)
        nb_conv += 2

    # Third group of blocks
    x = expand_conv(x, 64, k, strides=(2, 2), weight_decay=weight_decay, activation=activation)
    nb_conv += 2

    for i in range(N - 1):
        x = conv_block(x, k, 64, dropout, weight_decay, activation=activation)
        nb_conv += 2

    x = BatchNormalization(momentum=0.1, epsilon=1e-5, gamma_initializer='uniform')(x)
    x = Activation(activation)(x)

    x = AveragePooling2D((8, 8))(x)
    x = Flatten()(x)
    x = Dense(nb_classes, kernel_regularizer=l2(weight_decay))(x)
    x = Activation('softmax')(x)
    model = Model(ip, x)

    if verbose:
        print("Wide Residual Network-%d-%d created." % (nb_conv, k))
    return model


In [6]:
# @title Pretraining
# Assuming dataset_name is a string variable that contains the name of the dataset
if dataset_name == 'SVHN':
    # Instantiate PreActResNet18 for SVHN
    model = PreActResNet18(input_shape=init_shape, num_classes=10, weight_decay=weight_decay, dropout_rate=dropout, activation=activation)

elif dataset_name == 'CIFAR10':
    # Create WRN-28-10 for CIFAR10
    model = create_wide_residual_network(init_shape, nb_classes=10, N=4, k=8, dropout=dropout, weight_decay=weight_decay, activation=activation)

elif dataset_name == 'CIFAR100':
    # Create WRN-28-10 for CIFAR100
    model = create_wide_residual_network(init_shape, nb_classes=100, N=4, k=8, dropout=dropout, weight_decay=weight_decay, activation=activation)

else:
    raise ValueError("Unknown dataset")

# Compile the model
model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

pretrain=False # Change this as needed
load_saved_model = False  # Change this as needed

if pretrain: # In case you want to force local convexity
    if load_saved_model:
        # Load saved model and evaluate
        model = load_model('init_model_pretrained.h5')
        model.evaluate(x_test, y_test)
    else:
        # Train the model
        history = model.fit(
            datagen.flow(x_train, y_train, batch_size=batch_size),
            validation_data=(x_test, y_test),
            epochs=epochs
            )
        model.save('init_model_pretrained.h5')




Wide Residual Network-28-8 created.


In [7]:
# Variants of cross entropy loss
ccs = tf.keras.losses.CategoricalCrossentropy(reduction='sum') # reduction sum
cce = tf.keras.losses.CategoricalCrossentropy(reduction=tf.keras.losses.Reduction.NONE) # reduction none to get access to each loss

In [8]:
# @title PGD attack

def clip_epsilon(tensor, epsilon):
    """
    Clips the input tensor values to be within the range [-epsilon, epsilon].

    :param tensor: Input tensor.
    :param epsilon: Maximum allowed absolute value for elements in the tensor.
    :return: Tensor with values clipped to the specified range.
    """
    return tf.clip_by_value(tensor, -epsilon, epsilon)

def pgd_attack(model, x, y, epsilon, alpha, attack_iters, clip_min=0.0, clip_max=1.0):
    """
    Performs the Projected Gradient Descent (PGD) attack on a batch of images.

    Args:
        model: The neural network model to attack.
        x: Input images (batch).
        y: True labels for x.
        epsilon: The maximum perturbation allowed (L-infinity norm).
        alpha: Step size for each iteration of the attack.
        attack_iters: Number of iterations for the attack.
        clip_min: Minimum pixel value after perturbation.
        clip_max: Maximum pixel value after perturbation.

    Returns:
        A batch of adversarial images generated from the input images.
    """

    # Normalize epsilon and alpha according to the image scale (0-255)
    epsilon = epsilon / 255.0
    alpha = alpha / 255.0

    # Initialize adversarial images with random perturbations
    adv_x = x + tf.random.uniform(tf.shape(x), minval=-epsilon, maxval=epsilon)
    adv_x = tf.clip_by_value(adv_x, clip_min, clip_max)  # Ensure they stay within valid pixel range

    # Iteratively apply the PGD attack
    for _ in tf.range(attack_iters):
        with tf.GradientTape() as tape:
            tape.watch(adv_x)  # Watch the adversarial examples for gradient computation
            logits = model(adv_x)  # Compute the model's output on the adversarial examples
            loss = model.compiled_loss(y, logits)  # Calculate loss

        # Compute gradients of the loss w.r.t. adversarial examples
        gradients = tape.gradient(loss, adv_x)

        # Update adversarial examples using the sign of the gradients
        adv_x = adv_x + alpha * tf.sign(gradients)

        # Clip the adversarial examples to stay within epsilon-ball and valid pixel range
        adv_x = x + clip_epsilon(adv_x-x, epsilon)
        adv_x = tf.clip_by_value(adv_x, clip_min, clip_max)

    return adv_x





def evaluate_pgd(model, dataset, epsilon=8, alpha=2, attack_iters=50, restarts=1, batch_size=64):
    adversarial_acc_metric = tf.keras.metrics.CategoricalAccuracy()

    total_batches = sum(1 for _ in dataset.batch(batch_size))
    print(f"\nEvaluating PGD-{attack_iters}-{restarts} on {total_batches} batches...")

    progbar = tf.keras.utils.Progbar(total_batches * restarts)
    batch_count = 0

    # Evaluate clean accuracy using model.evaluate
    clean_results = model.evaluate(dataset.batch(batch_size), verbose=0)
    standard_accuracy = clean_results[1]  # Assuming the accuracy is the second metric

    for batch_num, (x, y) in enumerate(dataset.batch(batch_size)):
        best_adv_x = x
        for restart_num in range(restarts):
            adv_x = pgd_attack(model, x, y, epsilon, alpha, attack_iters)

            # Keep adversarial examples where model predictions are incorrect
            incorrect_preds = tf.argmax(model(adv_x, training=False), axis=1) != tf.argmax(y, axis=1)
            incorrect_preds = tf.reshape(incorrect_preds, [-1, 1, 1, 1])
            best_adv_x = tf.where(incorrect_preds, adv_x, best_adv_x)

            progbar.update(batch_count + 1)
            batch_count += 1

        logits_adv = model(best_adv_x, training=False)
        adversarial_acc_metric.update_state(y, logits_adv)

    adversarial_accuracy = adversarial_acc_metric.result().numpy()

    print(f"PGD-{attack_iters}-{restarts} Evaluation complete. \nValidation Accuracy: {100.0*standard_accuracy:.2f}%, PGD-{attack_iters}-{restarts} Adversarial Accuracy: {100.0*adversarial_accuracy:.2f}%")
    return {"standard_accuracy": standard_accuracy, "adversarial_accuracy": adversarial_accuracy}





In [9]:
# @title Checkpoint


class AdversarialCheckpoint(Callback):
    def __init__(self, dataset, epsilon=8.0, alpha=2.0, attack_iters=50, restarts=1, adv_batch_size=64):
        """
        Callback to evaluate model performance on adversarial examples after training.

        :param dataset: tf.data.Dataset for evaluation.
        :param epsilon: Maximum perturbation for PGD attack.
        :param alpha: Step size for PGD attack.
        :param attack_iters: Number of iterations for PGD attack.
        :param restarts: Number of restarts for PGD attack.
        :param batch_size: Batch size for evaluation.
        """
        super(AdversarialCheckpoint, self).__init__()
        self.dataset = dataset
        self.epsilon = epsilon
        self.alpha = alpha
        self.attack_iters = attack_iters
        self.restarts = restarts
        self.adv_batch_size = adv_batch_size
        self.adv_accuracy = 0.0
        self.test_accuracy = 0.0

    def on_train_end(self, logs=None):
        """
        Called at the end of training. Evaluates the model on both clean and adversarial examples.
        """
        # Evaluate the model on adversarial examples using PGD with multiple restarts
        accuracies = evaluate_pgd(
            model=self.model, dataset=self.dataset, epsilon=self.epsilon, alpha=self.alpha,
            attack_iters=self.attack_iters, restarts=self.restarts, adv_batch_size=self.adv_batch_size
        )

        self.adv_accuracy = accuracies["adversarial_accuracy"]
        self.test_accuracy = accuracies["standard_accuracy"]



class AdversarialCheckpoint_Epochs(Callback):
    def __init__(self, dataset, epsilon=8.0, alpha=2.0, attack_iters=50, restarts=1, adv_batch_size=64):
        """
        Callback to evaluate model performance on adversarial examples after each epoch.

        :param dataset: tf.data.Dataset for evaluation.
        :param epsilon: Maximum perturbation for PGD attack.
        :param alpha: Step size for PGD attack.
        :param attack_iters: Number of iterations for PGD attack.
        :param restarts: Number of restarts for PGD attack.
        :param batch_size: Batch size for evaluation.
        """
        super(AdversarialCheckpoint_Epochs, self).__init__()
        self.dataset = dataset
        self.epsilon = epsilon
        self.alpha = alpha
        self.attack_iters = attack_iters
        self.restarts = restarts
        self.adv_batch_size = adv_batch_size

    def on_epoch_end(self, epoch, logs=None):
        """
        Called at the end of each epoch. Evaluates the model on both clean and adversarial examples.
        """
        # Evaluate the model on adversarial examples using PGD with multiple restarts
        accuracies = evaluate_pgd(
            model=self.model, dataset=self.dataset, epsilon=self.epsilon, alpha=self.alpha,
            attack_iters=self.attack_iters, restarts=self.restarts, batch_size=self.adv_batch_size
        )

        self.adv_accuracy = accuracies["adversarial_accuracy"]
        self.test_accuracy = accuracies["standard_accuracy"]

In [10]:
# Quick test to use the architectures

if False:
    model = create_wide_residual_network(input_dim=(32, 32, 3), nb_classes=10, N=4, k=8, dropout=0.0, activation=activation)
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    dataset_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))

    adv_checkpoint = AdversarialCheckpoint_Epochs(dataset=dataset_test, epsilon=8.0, alpha=2.0, attack_iters=50, restarts=1, adv_batch_size=1024)
    model.fit(x_train, y_train, epochs=10, batch_size=128, validation_data=(x_test, y_test), callbacks=[adv_checkpoint])



In [11]:
# @title Cyclic learning rate

class CyclicLR(tf.keras.optimizers.schedules.LearningRateSchedule):
    """
    Cyclic learning rate scheduler.

    This scheduler varies the learning rate between a maximum and a minimum value in a cosine
    wave pattern. The learning rate starts at `lr_max` and gradually decreases to `lr_min`,
    then goes back to `lr_max` in a cyclic fashion.

    Args:
        lr_max (float): The maximum learning rate.
        lr_min (float): The minimum learning rate.
        nb_epochs (int): The number of epochs over which the learning rate cycles.
    """
    def __init__(self, lr_max, lr_min, nb_epochs):
        self.lr_max = lr_max
        self.lr_min = lr_min
        self.nb_epochs = nb_epochs

    def __call__(self, step):
        """
        Calculate the learning rate for a given step.
        Args:
            step (int): The current training step.

        Returns:
            float: The calculated learning rate.
        """
        # Calculate the current epoch based on the step
        epoch = tf.cast(step // self.nb_epochs, dtype=tf.float32)

        # Calculate the position in the current cycle
        cycle = tf.constant(np.pi, dtype=tf.float32) * epoch / tf.cast(self.nb_epochs, dtype=tf.float32)

        # Calculate and return the cyclic learning rate
        return self.lr_min + 0.5 * (self.lr_max - self.lr_min) * (1 + tf.math.cos(cycle))

# Parameters for the cyclic learning rate
lr_max = 0.2
lr_min = 0.001
nb_epochs = epochs

# Create an instance of the CyclicLR scheduler
cyclic_lr_schedule = CyclicLR(lr_max, lr_min, nb_epochs)


In [12]:
# @title $l^p$-FGSM


class Lp_FGSM(Model):
    """
    Lp Fast Gradient Sign Method (FGSM) adversarial training model.

    This class extends the Keras Model class, enabling the creation of adversarial examples
    based on an Lp norm during training.

    Attributes:
        base_model: The underlying model for making predictions.
        p (float): The norm degree for Lp norm.
        eps (float): Maximum perturbation allowed for adversarial examples.
        cce (tf.keras.losses.Loss): Custom categorical cross-entropy loss function.
        add_noise (bool): Flag to add noise to the inputs for adversarial generation.
        vareps (float): A small constant for numerical stability in norm calculations.
    """

    def __init__(self, base_model, p=64.0, eps=8.0, cce=None, add_noise=True, vareps=1e-12, *args, **kwargs):
        super(Lp_FGSM, self).__init__(*args, **kwargs)
        self.base_model = base_model  # The underlying model for predictions
        self.p = tf.constant(p, dtype=tf.float32)
        self.q = self.p / (self.p - 1.0)
        self.eps = tf.constant(eps / 255.0, dtype=tf.float32)
        self.vareps = tf.constant(vareps, dtype=tf.float32)
        self.add_noise = add_noise
        self.cce = cce if cce is not None else tf.keras.losses.CategoricalCrossentropy(reduction=tf.keras.losses.Reduction.NONE)


    @tf.function(jit_compile=True)
    def call(self, inputs, training=True):
        return self.base_model(inputs, training=training)

    @tf.function(jit_compile=True)
    def train_step(self, data):
        """
        Custom training step for the Lp_FGSM adversarial training.

        Args:
            data: Tuple of (input data, labels).

        Returns:
            Dictionary mapping metric names to their current value.
        """
        x, y = data  # Unpack the data
        probs = self.base_model(x, training=True)
        # Generate adversarial examples
        if self.add_noise:
            # Add random noise and compute the Lp norm perturbation
            x_rnd = tf.random.uniform(tf.shape(x), minval=-1.0, maxval=1.0)
            x_adv = x + self.eps * x_rnd
            Ups_ = tf.pow(tf.reduce_sum(tf.pow(tf.abs(x_rnd), self.q), axis=[1, 2, 3], keepdims=True), 1.0 / self.q)
            Upsilon = tf.pow(self.vareps + tf.abs(x_rnd) / (Ups_), self.q - 1.0)
            x_rnd = tf.sign(x_rnd) * Upsilon
            x_aug = x + self.eps * x_rnd
        else:
            x_aug = x
            x_adv = x

        # Gradient computation for the augmented data
        with tf.GradientTape(watch_accessed_variables=False, persistent=False) as inner_tape:
            inner_tape.watch(x_aug)
            probs_aug = self.base_model(x_aug, training=True)
            loss_sum = self.cce(y, probs_aug)
        dlx = inner_tape.gradient(loss_sum, x_aug)
        dlxq = tf.pow(tf.reduce_sum(tf.pow(tf.abs(dlx), self.q), axis=[1, 2, 3], keepdims=True), 1.0 / self.q)
        Upsilon = tf.pow(self.vareps + tf.abs(dlx) / (dlxq), self.q - 1.0)
        dlxn = tf.sign(dlx) * Upsilon
        x_adv += self.eps * dlxn

        # Training step for the adversarial data
        with tf.GradientTape(watch_accessed_variables=False, persistent=False) as tape:
            tape.watch(self.base_model.trainable_variables)
            probs_adv = self.base_model(x_adv, training=True)
            losses_adv = self.cce(y, probs_adv)
            loss_total = tf.reduce_mean(losses_adv)

        grads = tape.gradient(loss_total, self.base_model.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.base_model.trainable_variables))
        self.compiled_metrics.update_state(y, probs)
        return {m.name: m.result() for m in self.metrics}


In [13]:
# @title Training module
def train_model(dataset_name, datagen, x_test, y_test, eps=8.0, vareps=1e-12, p=32.0, add_noise=True, epochs=30, learning_rate=0.001, weight_decay=5e-4, batch_size=64, dropout=0.0, pretrain_epochs=0, cyclic_lr=False):
    """
    Trains a neural network model based on the specified dataset.

    Args:
        dataset_name: String name of the dataset (e.g., 'SVHN', 'CIFAR10', 'CIFAR100').
        datagen: Data generator for training (e.g., with data augmentation).
        x_test, y_test: Test dataset.
        eps: Perturbation limit for adversarial training.
        vareps: A small constant for numerical stability in adversarial training.
        p: The norm degree for Lp norm in adversarial training.
        add_noise: Flag to add noise during adversarial training.
        epochs: Number of training epochs.
        learning_rate: Learning rate for the optimizer.
        weight_decay: Weight decay for L2 regularization.
        batch_size: Batch size for training.
        dropout: Dropout rate for the model.
        pretrain: If True, pretrains the model.
        cyclic_lr: If True, uses cyclic learning rate; otherwise uses a constant learning rate.

    Returns:
        adv_accuracy_linf: Adversarial accuracy on Linf norm perturbed data.
        test_accuracy: Accuracy on the clean test dataset.
    """

    # Select the appropriate model based on the dataset
    if dataset_name == 'SVHN':
        # PreActResNet18 for SVHN dataset
        num_classes = 10
        init_shape = (32, 32, 3)
        model = PreActResNet18(input_shape=init_shape, num_classes=num_classes, weight_decay=weight_decay, dropout_rate=dropout, activation='relu')
    elif dataset_name in ['CIFAR10', 'CIFAR100']:
        # WideResNet for CIFAR datasets
        num_classes = 10 if dataset_name == 'CIFAR10' else 100
        init_shape = (32, 32, 3)
        model = create_wide_residual_network(init_shape, nb_classes=num_classes, N=4, k=8, dropout=dropout, weight_decay=weight_decay, activation='relu')
    else:
        raise ValueError("Unknown dataset")

    # Compile the model
    model.compile(optimizer=Adam(learning_rate=learning_rate, weight_decay=weight_decay), loss="categorical_crossentropy", metrics=["accuracy"])

    # Pretrain the model if specified
    if pretrain_epochs>0:
        model.fit(datagen.flow(x_train, y_train, batch_size=batch_size), validation_data=(x_test, y_test), epochs=pretrain_epochs)

    # Initialize and compile the Lp_FGSM model for adversarial training
    model_lp = Lp_FGSM(base_model=model,p=p, eps=eps, vareps=vareps, add_noise=add_noise)
    if cyclic_lr:
        # Use a cyclic learning rate schedule if specified
        model_lp.compile(optimizer=SGD(learning_rate=cyclic_lr_schedule, weight_decay=weight_decay, momentum=0.9), loss="categorical_crossentropy", metrics=["accuracy"])
    else:
        # Use a constant learning rate otherwise
        model_lp.compile(optimizer=Adam(learning_rate=learning_rate, weight_decay=weight_decay), loss="categorical_crossentropy", metrics=["accuracy"])

    # Set up adversarial checkpoint callback
    dataset_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    adv_callback = AdversarialCheckpoint_Epochs(dataset=dataset_test, epsilon=eps, attack_iters=50, restarts=1, adv_batch_size=1024)
    callbacks_list = [adv_callback]

    # Train the model using adversarial training
    model_lp.fit(datagen.flow(x_train, y_train, batch_size=batch_size), epochs=epochs, validation_data=(x_test, y_test), callbacks=callbacks_list, verbose=1)

    # Extract and return the final adversarial and clean test accuracies
    adv_accuracy_linf = adv_callback.adv_accuracy
    test_accuracy = adv_callback.test_accuracy

    return adv_accuracy_linf, test_accuracy


In [14]:
adv_accuracy_linf, test_accuracy = train_model(dataset_name, datagen, x_test, y_test, eps=8.0, vareps=1e-12, p=64.0, add_noise=True, epochs=epochs, learning_rate=0.001, weight_decay=weight_decay, batch_size=batch_size, dropout=dropout, pretrain_epochs=00, cyclic_lr=False)

Wide Residual Network-28-8 created.
Epoch 1/30
  6/391 [..............................] - ETA: 35s - accuracy: 0.1823



Evaluating PGD-50-1 on 10 batches...
PGD-50-1 Evaluation complete. 
Validation Accuracy: 42.09%, PGD-50-1 Adversarial Accuracy: 29.13%
Epoch 2/30
Evaluating PGD-50-1 on 10 batches...
PGD-50-1 Evaluation complete. 
Validation Accuracy: 48.20%, PGD-50-1 Adversarial Accuracy: 31.67%
Epoch 3/30
Evaluating PGD-50-1 on 10 batches...
PGD-50-1 Evaluation complete. 
Validation Accuracy: 51.84%, PGD-50-1 Adversarial Accuracy: 34.24%
Epoch 4/30
Evaluating PGD-50-1 on 10 batches...
PGD-50-1 Evaluation complete. 
Validation Accuracy: 59.49%, PGD-50-1 Adversarial Accuracy: 36.83%
Epoch 5/30
Evaluating PGD-50-1 on 10 batches...
PGD-50-1 Evaluation complete. 
Validation Accuracy: 62.24%, PGD-50-1 Adversarial Accuracy: 38.16%
Epoch 6/30
Evaluating PGD-50-1 on 10 batches...
PGD-50-1 Evaluation complete. 
Validation Accuracy: 60.44%, PGD-50-1 Adversarial Accuracy: 38.99%
Epoch 7/30
Evaluating PGD-50-1 on 10 batches...
PGD-50-1 Evaluation complete. 
Validation Accuracy: 64.26%, PGD-50-1 Adversarial Accura

In [None]:
# @title Figure 6 subplot


# Initialize matrices to store results
num_seeds = 5
epochs= 30
vareps=0.0
dropout=0.0
weight_decay=0.0
eps=8.0
add_noise=False
batch_size=64

cyclic_lr=True
ps = [2, 4, 8, 16, 32, 64, 128, 256]
A_adv = np.zeros((len(ps), num_seeds))
A_test = np.zeros((len(ps), num_seeds))

for seed in range(num_seeds):
    # Set the random seed for reproducibility
    np.random.seed(seed)
    tf.random.set_seed(seed)

    # Loop over each value of p
    for i, p in enumerate(ps):
    # Loop over each seed value
        # Run the training model, assuming that other parameters are set or passed explicitly
        adv_accuracy_linf, test_accuracy = train_model(dataset_name, datagen, x_test, y_test,eps=eps, vareps=vareps, p=p, add_noise=add_noise, epochs=epochs, weight_decay=weight_decay, batch_size=batch_size, dropout=dropout, pretrain_epochs=0)

        # Store the results in the matrices
        A_adv[i, seed] = adv_accuracy_linf
        A_test[i, seed] = test_accuracy

    # Save matrices to disk
    np.save('CIFAR10_Fig6_adv.npy', A_adv)
    np.save('CIFAR10_Fig6_test.npy', A_test)



