In [6]:
# Adversarial Training and Model Robustness Study
# Import necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision.models import resnet50
from art.attacks.evasion import FastGradientMethod, ProjectedGradientDescent
from art.estimators.classification import PyTorchClassifier
from art.utils import load_cifar10
import numpy as np
from torch.utils.tensorboard import SummaryWriter
import matplotlib.pyplot as plt
from PIL import Image
import random

def rand_bbox(size, lam):
    """Generate random bounding box coordinates for CutMix.

    Args:
        size: Tensor size (B, C, W, H)
        lam: Lambda value for computing cut ratio

    Returns:
        Bounding box coordinates (x1, y1, x2, y2)
    """
    W = size[2]
    H = size[3]
    cut_rat = np.sqrt(1. - lam)
    cut_w = int(W * cut_rat)
    cut_h = int(H * cut_rat)

    # uniform
    cx = np.random.randint(W)
    cy = np.random.randint(H)

    bbx1 = np.clip(cx - cut_w // 2, 0, W)
    bby1 = np.clip(cy - cut_h // 2, 0, H)
    bbx2 = np.clip(cx + cut_w // 2, 0, W)
    bby2 = np.clip(cy + cut_h // 2, 0, H)

    return bbx1, bby1, bbx2, bby2

# 1. Data Loading and Preprocessing
class DatasetPreparation:
    def __init__(self, batch_size=128):
        self.batch_size = batch_size
        self.transform_train = transforms.Compose([
            transforms.RandomCrop(32, padding=4),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
        ])
        self.transform_test = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
        ])

    def load_data(self):
        trainset = torchvision.datasets.CIFAR10(
            root='./data', train=True, download=True, transform=self.transform_train)
        trainloader = DataLoader(
            trainset, batch_size=self.batch_size, shuffle=True, num_workers=2)

        testset = torchvision.datasets.CIFAR10(
            root='./data', train=False, download=True, transform=self.transform_test)
        testloader = DataLoader(
            testset, batch_size=self.batch_size, shuffle=False, num_workers=2)

        return trainloader, testloader

# 2. Model Definition
class ModelSetup:
    def __init__(self, num_classes=10):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = resnet50(pretrained=True)
        # Modify the final layer for CIFAR-10
        self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)
        self.model = self.model.to(self.device)

    def get_art_model(self, model, loss_fn, optimizer):
        return PyTorchClassifier(
            model=model,
            loss=loss_fn,
            optimizer=optimizer,
            input_shape=(3, 32, 32),
            nb_classes=10
        )

# 3. Training Functions
class ModelTrainer:
    def __init__(self, model, trainloader, testloader, writer):
        self.model = model
        self.trainloader = trainloader
        self.testloader = testloader
        self.writer = writer
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
        self.scheduler = optim.lr_scheduler.CosineAnnealingLR(self.optimizer, T_max=200)

    def train_epoch(self, epoch):
        self.model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for batch_idx, (inputs, targets) in enumerate(self.trainloader):
            inputs, targets = inputs.to(self.device), targets.to(self.device)
            self.optimizer.zero_grad()
            outputs = self.model(inputs)
            loss = self.criterion(outputs, targets)
            loss.backward()
            self.optimizer.step()

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

        accuracy = 100. * correct / total
        self.writer.add_scalar('Train/Loss', running_loss/(batch_idx+1), epoch)
        self.writer.add_scalar('Train/Accuracy', accuracy, epoch)
        return running_loss/(batch_idx+1), accuracy

# 4. Adversarial Attack Implementation
class AdversarialAttacks:
    def __init__(self, classifier):
        self.classifier = classifier

    def create_fgsm_attack(self, eps=0.3):
        return FastGradientMethod(estimator=self.classifier, eps=eps)

    def create_pgd_attack(self, eps=0.3, eps_step=0.1, max_iter=40):
        return ProjectedGradientDescent(
            estimator=self.classifier,
            eps=eps,
            eps_step=eps_step,
            max_iter=max_iter,
            targeted=False
        )

# 5. Robustness Evaluation
class RobustnessEvaluator:
    def __init__(self, model, testloader, writer):
        self.model = model
        self.testloader = testloader
        self.writer = writer
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def evaluate_clean(self, epoch):
        self.model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, targets in self.testloader:
                inputs, targets = inputs.to(self.device), targets.to(self.device)
                outputs = self.model(inputs)
                _, predicted = outputs.max(1)
                total += targets.size(0)
                correct += predicted.eq(targets).sum().item()

        accuracy = 100. * correct / total
        self.writer.add_scalar('Test/Clean_Accuracy', accuracy, epoch)
        return accuracy

    def evaluate_adversarial(self, attack, epoch):
        self.model.eval()
        correct = 0
        total = 0

        for inputs, targets in self.testloader:
            inputs, targets = inputs.to(self.device), targets.to(self.device)
            adv_inputs = attack.generate(x=inputs.cpu().numpy())
            adv_inputs = torch.FloatTensor(adv_inputs).to(self.device)

            outputs = self.model(adv_inputs)
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

        accuracy = 100. * correct / total
        self.writer.add_scalar('Test/Adversarial_Accuracy', accuracy, epoch)
        return accuracy

# 6. Augmentation Methods
class AugmentationMethods:
    @staticmethod
    def cutmix(x, y, alpha=1.0):
        if alpha > 0:
            lam = np.random.beta(alpha, alpha)
        else:
            lam = 1

        batch_size = x.size()[0]
        index = torch.randperm(batch_size).to(x.device)

        bbx1, bby1, bbx2, bby2 = rand_bbox(x.size(), lam)
        x[:, :, bbx1:bbx2, bby1:bby2] = x[index, :, bbx1:bbx2, bby1:bby2]
        y_a, y_b = y, y[index]
        return x, y_a, y_b, lam

    @staticmethod
    def mixup(x, y, alpha=1.0):
        if alpha > 0:
            lam = np.random.beta(alpha, alpha)
        else:
            lam = 1

        batch_size = x.size()[0]
        index = torch.randperm(batch_size).to(x.device)

        mixed_x = lam * x + (1 - lam) * x[index, :]
        y_a, y_b = y, y[index]
        return mixed_x, y_a, y_b, lam

# 7. Main Training Loop
def main():
    # Initialize TensorBoard writer
    writer = SummaryWriter('runs/adversarial_training_experiment')

    # Setup data and model
    data_prep = DatasetPreparation()
    trainloader, testloader = data_prep.load_data()

    model_setup = ModelSetup()
    model = model_setup.model

    # Initialize trainer and evaluator
    trainer = ModelTrainer(model, trainloader, testloader, writer)
    evaluator = RobustnessEvaluator(model, testloader, writer)

    # Create ART classifier
    art_classifier = model_setup.get_art_model(
        model, trainer.criterion, trainer.optimizer)

    # Initialize attacks
    attacks = AdversarialAttacks(art_classifier)
    fgsm_attack = attacks.create_fgsm_attack()
    pgd_attack = attacks.create_pgd_attack()

    # Training loop
    num_epochs = 200
    for epoch in range(num_epochs):
        # Train one epoch
        train_loss, train_acc = trainer.train_epoch(epoch)

        # Evaluate on clean and adversarial examples
        clean_acc = evaluator.evaluate_clean(epoch)
        fgsm_acc = evaluator.evaluate_adversarial(fgsm_attack, epoch)
        pgd_acc = evaluator.evaluate_adversarial(pgd_attack, epoch)

        print(f'Epoch: {epoch}')
        print(f'Train Loss: {train_loss:.3f} | Train Acc: {train_acc:.3f}%')
        print(f'Clean Acc: {clean_acc:.3f}% | FGSM Acc: {fgsm_acc:.3f}% | PGD Acc: {pgd_acc:.3f}%')

        trainer.scheduler.step()

    writer.close()

if __name__ == "__main__":
    main()





2024-12-28 17:20:41.375500: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:37<00:00, 4542762.96it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified


Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /Users/lastjan/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:19<00:00, 5.18MB/s]


KeyboardInterrupt: 