In [1]:
import torch
#import torchvision
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# import numpy as np
# import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import MultiStepLR
from torch.autograd import Variable

from torchmetrics.classification import Accuracy
# import wandb

from tqdm import tqdm

In [2]:
import os
os.environ["CUDA_VISIBLE_DEVICES"]="0"
device = "cuda" if torch.cuda.is_available() else "cpu"

In [91]:
## Resnet Model

class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.residual = nn.Sequential()
        if stride != 1 or in_channels != self.expansion * out_channels:
            self.residual = nn.Sequential(
                nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * out_channels)
            )
    
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)), inplace=True)
        out = F.relu(self.bn2(self.conv2(out)), inplace=True)
        out = out + self.residual(x)
        out = F.relu(out, inplace=True)
        return out


class ResNet(nn.Module):
    def __init__(self, block: BasicBlock, num_blocks: list[int], num_classes: int=10):
        super().__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer0 = self.make_layer(block, 64, num_blocks[0], stride=1)
        self.layer1 = self.make_layer(block, 128, num_blocks[1], stride=2)
        self.layer2 = self.make_layer(block, 256, num_blocks[2], stride=2)
        self.layer3 = self.make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512 * block.expansion, num_classes)


    
    def make_layer(self, block, out_channels, num_blocks, stride):
        layers = []

        layers.append(block(self.in_channels, out_channels, stride=stride))
        self.in_channels = out_channels * block.expansion

        for _ in range(1, num_blocks):
            layers.append(block(self.in_channels, out_channels))
        
        return nn.Sequential(*layers)
    
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)), inplace=True)
        out = self.layer0(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        logits = self.linear(out)
        return logits

def ResNet18():
    return ResNet(BasicBlock, [2,2,2,2])


In [92]:
clean_model = ResNet18().to(device)
base_model_path = "./notebooks/checkpoints/model_epoch_87_valloss_0.3925.pth"
checkpoint = torch.load(base_model_path, weights_only=True)
clean_model.load_state_dict(checkpoint["model_state_dict"])

<All keys matched successfully>

In [5]:
test_cifar = datasets.CIFAR10(root='./data', download=True, train=False, transform=transforms.ToTensor())
test_loader_cifar = DataLoader(dataset=test_cifar, batch_size=128, shuffle=False)

In [6]:
import torchattacks

In [50]:
# fgsm = torchattacks.FGSM(clean_model, eps=8/255)
# pgd20 = torchattacks.PGD(clean_model, eps=8/255, alpha=0.003137, steps=20, random_start=True)
# cw_inf = torchattacks.CW(clean_model, c=1e-4, kappa=0, steps=1000)

# attacks= ["natural", fgsm, pgd20] #, cw_inf]

In [None]:
# model = ResNet18().to(device)
# check_path = "/scratch/joluseti/projects/adversarial_ml/notebooks/checkpoints/"
# checkpoint = torch.load(check_path, weights_only=True)
# model.load_state_dict(checkpoint["model_state_dict"])

In [84]:
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion * planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, self.expansion * planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(self.expansion * planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion * planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class ResNet_2(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super().__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out


def ResNet18():
    return ResNet_2(BasicBlock, [2, 2, 2, 2])

In [87]:
adv_model = ResNet18().to(device)
# adv_model_path = "/scratch/joluseti/projects/adversarial_ml/notebooks/adv_checkpoints/model_epoch_81.pth"
adv_model_path ="/scratch/joluseti/projects/MART/resnet_new/model-res-epoch100.pt"
checkpoint = torch.load(adv_model_path, weights_only=True)
# adv_model.load_state_dict(checkpoint["model_state_dict"])
adv_model.load_state_dict(checkpoint)

<All keys matched successfully>

In [55]:
# def pgd_attack(model, inputs, labels, epsilon=0.031, step_size=0.007, num_steps=10, random_start=True, device="cuda"):
#     """
#     Performs a Projected Gradient Descent (PGD) attack on a model.
#     PGD is an iterative adversarial attack that perturbs the input data to maximize
#     the loss, constrained by a maximum L-infinity perturbation of epsilon.
#     Args:
#         model: The neural network model to attack.
#         inputs: Input tensor of shape (batch_size, channels, height, width).
#         labels: True labels corresponding to the inputs.
#         epsilon: Maximum L-infinity norm of the perturbation (default: 0.031).
#         step_size: Step size for each iteration (default: 0.007).
#         num_steps: Number of PGD iterations (default: 10).
#         random_start: Whether to start with a random perturbation (default: True).
#         device: Device to perform the attack on (default: "cuda").
#     Returns:
#         torch.Tensor: Adversarial examples of the same shape as inputs.
#     """
#     model.eval()
#     # Keep original clean samples
#     x_clean = inputs.clone().detach()

#     delta = torch.rand_like(inputs, device=device) * 2 * epsilon - epsilon
#     delta = torch.clamp(delta, -epsilon, epsilon)
#     delta.requires_grad = True

#     for step in range(num_steps):
#         x_adv = x_clean + delta
        
#         # for stable batch norm stats and to disable dropout
#         with torch.enable_grad():
#             outputs = model(x_adv)
#         loss = F.cross_entropy(outputs, labels)

#         grad = torch.autograd.grad(loss, [delta])[0]
#         delta = delta.detach() + step_size * torch.sign(grad)
#         delta = torch.clamp(delta, -epsilon, epsilon)

#         # clamp adversarial sample to valid pixel range        
#         x_adv_clamped = torch.clamp(x_clean + delta, 0.0, 1.0)
        
#         # get accurate delat from clamped x
#         delta = x_adv_clamped - x_clean
#         delta = delta.detach()
        
#         if step < num_steps - 1:
#             delta.requires_grad = True
    
#     # final adversarial sample
#     x_adv = (x_clean + delta).detach()
#     return x_adv


# def validate(model, dataloader, metrics, num_steps=20, epsilon=0.031, step_size=0.003, device="cuda"):
#     metrics.reset()
#     model.eval()

#     for images, labels in tqdm(dataloader, desc=f"Validation"):
#         images, labels = images.to(device, non_blocking=True), labels.to(device, non_blocking=True)
        
#         # ---------- clean accuracy ----------
#         with torch.no_grad():
#             logits_clean = model(images)

#         metrics["val/clean_acc"].update(logits_clean.argmax(dim=-1), labels)

#         # ---------- adversarial accuracy ----------
#         x_adv = pgd_attack(model, inputs=images, labels=labels, epsilon=epsilon, step_size=step_size, num_steps=num_steps)
#         with torch.no_grad():
#             logits_adv = model(x_adv)        
#         metrics["val/adv_acc"].update(logits_adv.argmax(dim=-1), labels)

#     return metrics.compute()

# from torchmetrics import Accuracy, MetricCollection

# val_metrics   = MetricCollection({
#         "clean_acc" : Accuracy(task="multiclass", num_classes=10),
#         "adv_acc"   : Accuracy(task="multiclass", num_classes=10)}).to(device).clone(prefix="val/")

# val_stats = validate(model=adv_model,
#                     dataloader=test_loader_cifar,
#                     metrics=val_metrics)

# val_stats

In [11]:




def evaluate_model(attacks, model, dataloader, device="cuda"):
    model.eval()
    results = {}

    for attack in attacks:
        correct = 0
        total   = 0
        name    = attack if attack=="natural" else attack.__class__.__name__
        print(f"\n→ Evaluating {name}")

        for images, labels in tqdm(dataloader, desc=name):
            images, labels = images.to(device), labels.to(device)

            # natural accuracy
            if attack=="natural":
                with torch.no_grad():
                    preds = model(images).argmax(1)
                correct += (preds == labels).sum().item()

            # adversarial accuracy
            else:
                # 4) check the magnitude once
                adv = attack(images, labels)
                δ = (adv-images).view(images.size(0),-1).abs().max(1)[0]
                # print(f"  avg perturb {δ.mean():.4f}")

                with torch.no_grad():
                    preds = model(adv).argmax(1)
                correct += (preds == labels).sum().item()

            total += labels.size(0)

        acc = 100*correct/total
        results[name] = acc
        print(f"  → {name} accuracy = {acc:.2f}%")

    return results




In [8]:
# -------- basic pre‑activation block ------------------------------------------
class BasicBlock(nn.Module):
    def __init__(self, in_ch, out_ch, stride, drop_rate=0.0):
        super().__init__()
        self.equal_io = (in_ch == out_ch)

        self.bn1   = nn.BatchNorm2d(in_ch)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv1 = nn.Conv2d(in_ch, out_ch, 3, stride=stride, padding=1, bias=False)

        self.bn2   = nn.BatchNorm2d(out_ch)
        self.relu2 = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_ch, out_ch, 3, padding=1, bias=False)

        self.drop_rate = drop_rate
        self.shortcut  = None if self.equal_io else nn.Conv2d(
            in_ch, out_ch, 1, stride=stride, bias=False)

    def forward(self, x):
        out = self.relu1(self.bn1(x)) if self.equal_io else x
        out = self.conv1(out)
        out = self.relu2(self.bn2(out))
        if self.drop_rate > 0:
            out = F.dropout(out, p=self.drop_rate, training=self.training)
        out = self.conv2(out)
        return (x if self.equal_io else self.shortcut(x)) + out


# -------- stack of BasicBlocks -----------------------------------------------
class NetworkBlock(nn.Module):
    def __init__(self, n_layers, in_ch, out_ch, stride, drop_rate):
        super().__init__()
        layers = []
        for i in range(n_layers):
            layers.append(
                BasicBlock(
                    in_ch if i == 0 else out_ch,
                    out_ch,
                    stride if i == 0 else 1,
                    drop_rate,
                )
            )
        self.block = nn.Sequential(*layers)

    def forward(self, x):
        return self.block(x)


# -------- Wide ResNet ---------------------------------------------------------
class WideResNet(nn.Module):
    """
    Wide ResNet (WRN‑d‑k)        – Zagoruyko & Komodakis, 2016
    depth d = 6n+4, widen factor k.

    Example: WRN‑34‑10  -> depth=34, widen_factor=10
    """
    def __init__(self, depth=34, widen_factor=10, num_classes=10, drop_rate=0.0):
        super().__init__()
        assert (depth - 4) % 6 == 0, "depth must be 6n+4"
        n = (depth - 4) // 6

        ch = [16, 16 * widen_factor, 32 * widen_factor, 64 * widen_factor]

        self.conv1  = nn.Conv2d(3, ch[0], 3, padding=1, bias=False)
        self.block1 = NetworkBlock(n, ch[0], ch[1], stride=1, drop_rate=drop_rate)
        self.block2 = NetworkBlock(n, ch[1], ch[2], stride=2, drop_rate=drop_rate)
        self.block3 = NetworkBlock(n, ch[2], ch[3], stride=2, drop_rate=drop_rate)

        self.bn     = nn.BatchNorm2d(ch[3])
        self.relu   = nn.ReLU(inplace=True)
        self.pool   = nn.AdaptiveAvgPool2d(1)
        self.fc     = nn.Linear(ch[3], num_classes)

        self._init_weights()

    # --------------------------------------------------------------------------
    def _init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2.0 / fan_out))
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1.0)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                m.bias.data.zero_()

    # --------------------------------------------------------------------------
    def forward(self, x):
        x = self.conv1(x)          # 32×32 → 32×32
        x = self.block1(x)         # 32×32
        x = self.block2(x)         # 16×16
        x = self.block3(x)         # 8×8
        x = self.relu(self.bn(x))
        x = self.pool(x).flatten(1)  # global avg‑pool
        return self.fc(x)


In [9]:
import math
wrn_model = WideResNet().to(device)
base_model_path = "/scratch/joluseti/projects/adversarial_ml/results/models/wrn/main/best_valAcc=0.445.pth"
checkpoint = torch.load(base_model_path, weights_only=True)
stripped_check = {
    k.replace("_orig_mod.",""):v for k,v in checkpoint["model_state"].items()
}
wrn_model.load_state_dict(stripped_check)

<All keys matched successfully>

In [12]:
model = wrn_model.to(device).eval()
# 1) use the same model instance
# 2) pick a sane step size
fgsm  = torchattacks.FGSM(model, eps=8/255)
pgd20 = torchattacks.PGD(model,
                         eps=8/255,
                         alpha=2/255,
                         steps=20,
                         random_start=True)

attacks= ["natural", fgsm, pgd20] #, cw_inf]


res = evaluate_model(attacks, model, test_loader_cifar)
print(res)


→ Evaluating natural


natural: 100%|██████████| 79/79 [00:14<00:00,  5.54it/s]


  → natural accuracy = 64.43%

→ Evaluating FGSM


FGSM: 100%|██████████| 79/79 [00:42<00:00,  1.87it/s]


  → FGSM accuracy = 45.73%

→ Evaluating PGD


PGD: 100%|██████████| 79/79 [09:33<00:00,  7.26s/it]

  → PGD accuracy = 43.94%
{'natural': 64.43, 'FGSM': 45.73, 'PGD': 43.94}



