## Project 1
Select five CIFAR-10 (ℓ∞) models from RobustBench and re-evaluate them with AutoAttack at several values of the perturbation radius ε (e.g., a regularly spaced grid from 1/255 to 16/255 that includes the baseline 8/255), using a subset of 100–200 test samples. Compare the resulting robust accuracies and model rankings across these settings, and evaluate how stable the rankings are as ε varies. Identify cases where changing ε causes significant rank shifts, and discuss what this reveals about the reliability of the RobustBench leaderboards.


In [1]:
import torch
import torchvision.datasets
from robustbench.utils import load_model
from torch.utils.data import DataLoader, Subset
from autoattack import AutoAttack
from fractions import Fraction
import json
from pathlib import Path

In [4]:
class EvaluateOnAutoAttack:
    """Evaluate a set of RobustBench models with AutoAttack at several radii.

    For every (epsilon, model) pair the standard AutoAttack evaluation is run
    on a fixed subset of the test set. Clean accuracy, robust accuracy and the
    per-epsilon rank of each model are stored in a JSON checkpoint so that an
    interrupted run can be resumed without recomputing finished pairs.

    Args:
        models_names: RobustBench model identifiers to evaluate.
        epsilons: Perturbation radii as fraction strings (e.g. ``"8/255"``).
            The strings are also used as keys in the checkpoint.
        dataset: Dataset name; only ``"cifar10"`` is supported.
        threat_model: Threat model name, passed both to RobustBench's
            ``load_model`` and to AutoAttack as ``norm`` (e.g. ``"Linf"``).
        device: Torch device (string or ``torch.device``) to run on.
        checkpoint_path: Location of the JSON checkpoint (``str`` or ``Path``).
        batch_size: Batch size for data loading, attacks and evaluation.
        n_samples: Number of leading test samples to evaluate on.

    Raises:
        ValueError: If ``n_samples`` is not positive or ``dataset`` is not
            supported.
    """

    def __init__(self, models_names, epsilons, dataset, threat_model, device, checkpoint_path, batch_size, n_samples=200):
        if n_samples <= 0:
            raise ValueError(f"n_samples must be positive, got {n_samples}")

        self._models_names = models_names
        self._epsilons = epsilons
        self._dataset = dataset
        self._threat_model = threat_model
        self._device = device
        # Accept plain strings as well as Path objects.
        self._checkpoint_path = Path(checkpoint_path)
        self._batch_size = batch_size
        self._n_samples = n_samples

        self._test_data_loader = self._loadTestDataLoader()
        self._x_test, self._y_test = self._prepareTestForAutoAttack()

        # model_name -> {"model": loaded model, "clean_acc": float}
        self._models_aa_dict = dict()
        # epsilon_str -> model_name -> {"clean_acc", "robust_acc", "rank"}
        self._results_checkpoint = self._loadResultCheckpoint()

    def _loadTestDataLoader(self):
        """Build a non-shuffled loader over the first ``n_samples`` test images.

        Raises:
            ValueError: If the configured dataset is not ``"cifar10"``.
        """
        if self._dataset != "cifar10":
            # Fail here rather than returning None and crashing later with an
            # unrelated error when the loader is iterated.
            raise ValueError(
                f"Unsupported dataset {self._dataset!r}: only 'cifar10' is available"
            )

        test_dataset = torchvision.datasets.CIFAR10(
            root="./data/datasets",
            train=False,
            download=True,
            transform=torchvision.transforms.ToTensor(),
        )

        subset_size = min(self._n_samples, len(test_dataset))
        test_dataset = Subset(test_dataset, list(range(subset_size)))
        # shuffle=False keeps the sample order identical across models/epsilons.
        return DataLoader(test_dataset, batch_size=self._batch_size, shuffle=False)

    def _prepareTestForAutoAttack(self):
        """Concatenate all loader batches into one (x, y) pair on the device."""
        all_x = []
        all_y = []

        for x, y in self._test_data_loader:
            all_x.append(x)
            all_y.append(y)

        x_test = torch.cat(all_x).to(self._device)
        y_test = torch.cat(all_y).to(self._device)

        return x_test, y_test

    def _loadResultCheckpoint(self):
        """Return the results stored in the checkpoint file, or ``{}`` if absent."""
        if not self._checkpoint_path.exists():
            return {}
        with self._checkpoint_path.open("r") as f:
            return json.load(f)

    def _saveResultCheckpoint(self):
        """Write the results to the checkpoint via a temporary sibling file.

        The data is first written to ``<checkpoint name>.tmp`` and then moved
        over the checkpoint, so an interrupted write never leaves a truncated
        checkpoint behind.
        """
        # Create the target directory if needed so the first save cannot fail
        # after an expensive attack has already completed.
        self._checkpoint_path.parent.mkdir(parents=True, exist_ok=True)

        tmp = self._checkpoint_path.with_name(self._checkpoint_path.name + ".tmp")
        with tmp.open("w") as f:
            json.dump(self._results_checkpoint, f, indent=2)
        tmp.replace(self._checkpoint_path)

    def _loadModel(self, model_name):
        """Return the model for ``model_name``, loading and caching it on first use.

        On first load the model is moved to the device and its clean accuracy
        on the test subset is computed and cached next to it.
        """
        cached = self._models_aa_dict.get(model_name)
        if cached is not None:
            return cached["model"]

        current_model = load_model(
            model_name=model_name,
            dataset=self._dataset,
            threat_model=self._threat_model,
        )
        current_model.to(self._device)

        self._models_aa_dict[model_name] = {
            "model": current_model,
            "clean_acc": self._getCleanAccuracy(current_model, self._test_data_loader),
        }
        return current_model

    def _getCleanAccuracy(self, current_model, test_data):
        """Return the fraction of samples in ``test_data`` classified correctly."""
        current_model.eval()
        correct = 0
        total = 0

        with torch.no_grad():
            for x, y in test_data:
                x = x.to(self._device)
                y = y.to(self._device)

                preds = current_model(x).argmax(dim=1)

                correct += (preds == y).sum().item()
                total += y.size(0)

        return correct / total

    def _loadAutoAttack(self, current_model, current_epsilon):
        """Create a standard-version AutoAttack adversary for the given radius."""
        return AutoAttack(
            current_model,
            norm=self._threat_model,
            eps=current_epsilon,
            # The attack log reports: apgd-ce, apgd-t, fab-t, square.
            version='standard',
            device=self._device
        )

    def _startAutoAttack(self, adversary):
        """Run the adversary on the test subset and return the adversarial inputs."""
        return adversary.run_standard_evaluation(
            self._x_test, self._y_test, bs=self._batch_size
        )

    def _getRobustAccuracy(self, current_model, x_adv):
        """Return the model's accuracy on ``x_adv`` against the test labels.

        The samples are evaluated in chunks of ``batch_size`` so that memory
        use does not grow with the size of the evaluation subset, and the
        result is computed as ``correct / total`` like the clean accuracy.
        """
        current_model.eval()
        total = x_adv.size(0)
        correct = 0

        with torch.no_grad():
            for start in range(0, total, self._batch_size):
                stop = start + self._batch_size
                preds = current_model(x_adv[start:stop]).argmax(dim=1)
                correct += (preds == self._y_test[start:stop]).sum().item()

        return correct / total

    def _emptyCache(self, x_adv=None):
        """Run garbage collection and, on CUDA devices, empty the CUDA cache.

        ``x_adv`` is accepted for backward compatibility only: deleting it here
        removes just this method's local reference, so callers must drop their
        own references before calling for the memory to become reclaimable.
        """
        import gc

        del x_adv
        gc.collect()
        if "cuda" in str(self._device):
            torch.cuda.empty_cache()

    def _computeRanking(self, epsilon_str):
        """Store a ``"rank"`` (1 = best) on every result for ``epsilon_str``.

        Models are ordered by descending robust accuracy. Models with exactly
        the same robust accuracy share a rank and the following rank is
        skipped (1, 2, 2, 4), so ties on the small evaluation subset do not
        appear as rank differences.
        """
        records = self._results_checkpoint[epsilon_str]
        ordered = sorted(
            records.values(),
            key=lambda record: record["robust_acc"],
            reverse=True,
        )

        previous_acc = None
        current_rank = 0
        for position, record in enumerate(ordered, start=1):
            if record["robust_acc"] != previous_acc:
                current_rank = position
                previous_acc = record["robust_acc"]
            record["rank"] = current_rank

    def attackModel(self):
        """Attack every model at every epsilon, checkpointing after each pair.

        Pairs already present in the checkpoint are skipped. After all models
        have been processed for an epsilon, the ranking for that epsilon is
        recomputed and the checkpoint is saved again.
        """
        for epsilon_index, epsilon_str in enumerate(self._epsilons, start=1):
            current_epsilon = float(Fraction(epsilon_str))
            epsilon_results = self._results_checkpoint.setdefault(epsilon_str, {})

            for model_index, model_name in enumerate(self._models_names, start=1):
                print(f"----------- CASE ({epsilon_index}.{model_index}) epsilon = {epsilon_str} & model = {model_name} -----------")
                if model_name in epsilon_results:
                    print("!! SKIPPED because it's already been computed !!")
                    print("\n\n")
                    continue

                current_model = self._loadModel(model_name)

                adversary = self._loadAutoAttack(current_model, current_epsilon)
                adversary.verbose = True
                x_adv = self._startAutoAttack(adversary)
                robust_acc = self._getRobustAccuracy(current_model, x_adv)

                # Drop our own references first; otherwise the cache cleanup
                # below cannot release the adversarial batch.
                del x_adv, adversary
                self._emptyCache()

                clean_acc = self._models_aa_dict[model_name]['clean_acc']
                print(f"Clean Accuracy: {clean_acc}\nRobust Accuracy: {robust_acc}")
                print("\n\n")

                epsilon_results[model_name] = {
                    "clean_acc": clean_acc,
                    "robust_acc": robust_acc,
                }

                # Save after every pair so a crash loses at most one attack.
                self._saveResultCheckpoint()

            self._computeRanking(epsilon_str)
            self._saveResultCheckpoint()

In [8]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# RobustBench identifiers of the five models under evaluation.
# An earlier selection, kept here for reference, was:
#   Carmon2019Unlabeled, Bartoldson2024Adversarial_WRN-82-8,
#   Rebuffi2021Fixing_28_10_cutmix_ddpm, Wang2023Better_WRN-28-10,
#   Cui2023Decoupled_WRN-28-10
MODELS = [
    "Carmon2019Unlabeled",
    "Sehwag2021Proxy_R18",
    "Rebuffi2021Fixing_R18_ddpm",
    "Wang2023Better_WRN-28-10",
    "Cui2023Decoupled_WRN-28-10",
]

# Perturbation radii as "k/255" fraction strings; 8/255 is the baseline radius.
EPSILONS = [f"{numerator}/255" for numerator in (1, 4, 8, 12, 16)]

# Batch size handed to the evaluator.
BATCH_SIZE = 16

# JSON file in which per-epsilon, per-model results are checkpointed.
CHECKPOINT_PATH = Path("./results_checkpoint.json")

In [9]:
# Build the evaluator for CIFAR-10 under the Linf threat model, using the
# models, radii, device, batch size and checkpoint file configured above.
auto_attack = EvaluateOnAutoAttack(
    models_names=MODELS,
    epsilons=EPSILONS,
    dataset="cifar10",
    threat_model="Linf",
    device=DEVICE,
    checkpoint_path=CHECKPOINT_PATH,
    batch_size=BATCH_SIZE,
)

Files already downloaded and verified


In [10]:
# Run the full epsilon x model sweep. Combinations already present in the
# checkpoint are reported as skipped, so this cell can be re-run to resume.
auto_attack.attackModel()

----------- CASE (1.1) epsilon = 1/255 & model = Carmon2019Unlabeled -----------
!! SKIPPED because it's already been computed !!



----------- CASE (1.2) epsilon = 1/255 & model = Sehwag2021Proxy_R18 -----------
!! SKIPPED because it's already been computed !!



----------- CASE (1.3) epsilon = 1/255 & model = Rebuffi2021Fixing_R18_ddpm -----------
Downloading models\cifar10\Linf\Rebuffi2021Fixing_R18_ddpm.pt (gdrive_id=1--dxE66AsgBSUsuK2sXCTrsYUV9B5f95).


Downloading...
From (original): https://drive.google.com/uc?id=1--dxE66AsgBSUsuK2sXCTrsYUV9B5f95
From (redirected): https://drive.google.com/uc?id=1--dxE66AsgBSUsuK2sXCTrsYUV9B5f95&confirm=t&uuid=a3743637-c6cd-4bb9-8700-088cb9fd5372
To: C:\Users\stefa\Documents\workspace Python\AutoAttack\models\cifar10\Linf\Rebuffi2021Fixing_R18_ddpm.pt
100%|██████████| 50.3M/50.3M [00:01<00:00, 27.4MB/s]


setting parameters for standard version
using standard version including apgd-ce, apgd-t, fab-t, square.
initial accuracy: 85.50%
apgd-ce - 1/11 - 0 out of 16 successfully perturbed
apgd-ce - 2/11 - 0 out of 16 successfully perturbed
apgd-ce - 3/11 - 1 out of 16 successfully perturbed
apgd-ce - 4/11 - 1 out of 16 successfully perturbed
apgd-ce - 5/11 - 0 out of 16 successfully perturbed
apgd-ce - 6/11 - 0 out of 16 successfully perturbed
apgd-ce - 7/11 - 0 out of 16 successfully perturbed
apgd-ce - 8/11 - 0 out of 16 successfully perturbed
apgd-ce - 9/11 - 0 out of 16 successfully perturbed
apgd-ce - 10/11 - 0 out of 16 successfully perturbed
apgd-ce - 11/11 - 0 out of 11 successfully perturbed
robust accuracy after APGD-CE: 84.50% (total time 7.7 s)
apgd-t - 1/11 - 1 out of 16 successfully perturbed
apgd-t - 2/11 - 0 out of 16 successfully perturbed
apgd-t - 3/11 - 0 out of 16 successfully perturbed
apgd-t - 4/11 - 0 out of 16 successfully perturbed
apgd-t - 5/11 - 0 out of 16 success