# Adversarial Robustness Testing with PyTorch

## This notebook demonstrates:
1. Training a simple image classification model
2. Attacking the model using adversarial examples
3. Measuring robustness degradation
4. Applying a basic defense
5. Interpreting results for AI security & governance

## Why this matters:
- Adversarial attacks are a real AI security risk
- Regulators and auditors expect robustness evidence
- This notebook produces measurable, auditable results

## Why This Notebook Matters

### Security:
- Demonstrates real adversarial vulnerability
- Tests defensive effectiveness

### Governance:
- Produces measurable risk metrics
- Supports AI risk assessments
- Aligns with OWASP AI, NIST AI RMF and ISO/IEC 42001 expectations


## Install the appropriate packages

In [2]:
!pip install adversarial-robustness-toolbox torch torchvision



## Imports

### We use:
- PyTorch for model training
- NumPy for numerical operations
- ART for attacks and defenses

ART works on NumPy arrays, so we will convert tensors where needed.

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

from torchvision import datasets, transforms
from torch.utils.data import DataLoader

from art.estimators.classification import PyTorchClassifier, SklearnClassifier
from art.attacks.evasion import FastGradientMethod, ProjectedGradientDescent
from art.defences.preprocessor import FeatureSqueezing

from sklearn.linear_model import LogisticRegression

## Loading the MNIST Dataset

MNIST contains handwritten digits (0–9).

Key points:
- Images are grayscale (1 channel)
- Values are normalized to [0, 1]
- This normalization must match ART's `clip_values`
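
The attack budget and clipping are expressed in the same units as the inputs, so a quick range check before wrapping the model can catch a mismatch early. A minimal sketch, assuming a hypothetical helper `check_clip_range` and a few synthetic pixel values rather than the real dataset:

```python
import numpy as np

def check_clip_range(x, clip_values=(0.0, 1.0)):
    """Return True when every value of x lies inside clip_values."""
    lo, hi = clip_values
    return bool(x.min() >= lo and x.max() <= hi)

# Synthetic stand-in for raw 8-bit pixels and their [0, 1]-scaled version.
raw = np.array([[0, 128, 255]], dtype=np.uint8)
scaled = raw.astype(np.float32) / 255.0

in_range_scaled = check_clip_range(scaled)                # scaled pixels fit (0, 1)
in_range_raw = check_clip_range(raw.astype(np.float32))   # raw 0..255 pixels do not
print(in_range_scaled, in_range_raw)
```

If a normalization such as mean/std scaling were added to the transform, the `clip_values` passed to ART would need to change accordingly.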

In [4]:
transform = transforms.Compose([transforms.ToTensor()])

train_dataset = datasets.MNIST(root="./data", train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root="./data", train=False, download=True, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1000, shuffle=False)

# Convert test set to NumPy (ART requires NumPy)
x_test = np.concatenate([x.numpy() for x, _ in test_loader], axis=0)
y_test = np.concatenate([y.numpy() for _, y in test_loader], axis=0)



## Model Architecture

### We use a small CNN:
- Convolution layer to extract features
- Fully connected layers for classification

### This model is intentionally simple so that:
- Attacks are clearly visible
- Robustness degradation is easy to interpret

In [5]:
class SimpleCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(26 * 26 * 32, 128),
            nn.ReLU(),
            nn.Linear(128, 10)
        )

    def forward(self, x):
        return self.net(x)

model = SimpleCNN()

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
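
The `26 * 26 * 32` input size of the first linear layer follows from the standard convolution output-size formula: a 3x3 kernel with no padding and stride 1 shrinks each 28-pixel side to 26, and the layer has 32 output channels. The helper name below is illustrative only:

```python
def conv2d_output_size(size, kernel_size, stride=1, padding=0):
    """Spatial output size of a convolution along one dimension."""
    return (size + 2 * padding - kernel_size) // stride + 1

side = conv2d_output_size(28, kernel_size=3)   # MNIST images are 28x28
flat_features = side * side * 32               # 32 output channels
print(side, flat_features)  # 26 21632
```

Changing the kernel size, stride, or padding of the convolution means recomputing this value for `nn.Linear`.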

## Training the Model

### We train for only a few epochs:
- High accuracy is not the goal
- Robustness testing is the goal

### Security insight:
A highly accurate model can still be extremely fragile.

In [6]:
model.train()
for epoch in range(3):
    for x, y in train_loader:
        optimizer.zero_grad()
        loss = criterion(model(x), y)
        loss.backward()
        optimizer.step()

print("Training complete")

Training complete


## Wrapping the Model with ART

### This step allows ART to:
- Compute gradients
- Generate adversarial examples
- Apply defenses

The `clip_values` parameter defines valid input boundaries.

In [7]:
classifier = PyTorchClassifier(
    model=model,
    loss=criterion,
    optimizer=optimizer,
    input_shape=(1, 28, 28),
    nb_classes=10,
    clip_values=(0.0, 1.0)
)

## Utility: ensure_float32
This helper function enforces a **dtype contract** between NumPy and PyTorch.

### Why it exists:
- ART attacks and preprocessors often return `float64`
- PyTorch models are typically trained in `float32`
- Mixing the two can cause dtype-mismatch errors at runtime

`ensure_float32` guarantees that all data entering the model is compatible and stable, which is critical in agent-based pipelines.


In [8]:
def ensure_float32(x):
    if x.dtype != np.float32:
        return x.astype(np.float32)
    return x
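
To see why the contract is useful, the sketch below shows how NumPy silently promotes a `float32` batch to `float64` when it is combined with `float64` data. The array shapes and the noise source are made up for illustration:

```python
import numpy as np

x = np.zeros((2, 1, 28, 28), dtype=np.float32)

# NumPy random generators produce float64 by default.
noise = np.random.default_rng(0).normal(size=x.shape)

x_noisy = x + noise                    # float32 + float64 -> float64
x_fixed = x_noisy.astype(np.float32)   # cast back before calling the model

# astype(..., copy=False) skips the copy when the dtype already matches,
# which mirrors the early return in ensure_float32.
same = x.astype(np.float32, copy=False)

print(x_noisy.dtype, x_fixed.dtype, same is x)
```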

## Accuracy Helper

We use the same function to measure:
- Clean accuracy
- Adversarial accuracy
- Defended accuracy

In [9]:
def accuracy(x, y):
    x = ensure_float32(x)  # enforce contract
    preds = classifier.predict(x)
    return np.mean(np.argmax(preds, axis=1) == y)

In [10]:
print("Clean accuracy:", accuracy(x_test, y_test))

Clean accuracy: 0.9832


## Red Team Agent (Attack Planner)

### Responsibility

- Select attack type
- Escalate attack strength
- Stop when risk threshold breached

### Adversarial Attacks

- FGSM: Fast, single-step attack
- PGD: Strong, iterative attack (industry standard)

These simulate malicious input manipulation.

### Example Strategy

- Start with FGSM
- Increase ε until accuracy < threshold
- Escalate to PGD
- Record worst-case robustness
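
The escalate-until-threshold idea can be sketched on a toy model. The example below applies the FGSM update `x + eps * sign(grad)` to a hand-written logistic-regression classifier on a synthetic 2-D grid, and stops as soon as accuracy falls below a threshold. The model, data, helper names, and the 0.9 threshold are all assumptions for illustration and are unrelated to the MNIST classifier:

```python
import numpy as np

def fgsm_linear(x, y, w, b, eps, clip=(0.0, 1.0)):
    """One-step FGSM against a logistic-regression model (toy stand-in)."""
    p = 1.0 / (1.0 + np.exp(-(x @ w + b)))
    grad = (p - y)[:, None] * w          # d(cross-entropy)/dx
    return np.clip(x + eps * np.sign(grad), *clip)

def toy_accuracy(x, y, w, b):
    return float(np.mean(((x @ w + b) > 0).astype(int) == y))

def escalate(x, y, w, b, eps_schedule, threshold):
    """Raise eps step by step; stop once accuracy drops below threshold."""
    history = []
    for eps in eps_schedule:
        acc = toy_accuracy(fgsm_linear(x, y, w, b, eps), y, w, b)
        history.append((eps, acc))
        if acc < threshold:
            break
    return history

# Synthetic, deterministic two-class data on a grid inside [0, 1]^2.
grid = np.linspace(0.0, 0.4, 5)
x0 = np.array([(a, c) for a in grid for c in grid])   # class 0, low values
x1 = 1.0 - x0                                          # class 1, high values
x = np.vstack([x0, x1])
y = np.array([0] * len(x0) + [1] * len(x1))
w, b = np.array([1.0, 1.0]), -1.05

history = escalate(x, y, w, b, [0.05, 0.1, 0.2, 0.3], threshold=0.9)
for eps, acc in history:
    print(f"eps={eps:.2f} accuracy={acc:.2f}")
```

In this toy setup the loop ends before reaching the largest ε, because the threshold is already breached at an earlier step.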



In [11]:
class RedTeamAgent:
    def __init__(self, classifier, accuracy_fn):
        self.classifier = classifier
        self.accuracy_fn = accuracy_fn

    def run(self, x, y):
        x = ensure_float32(x)
        results = []

        for eps in [0.05, 0.1, 0.2, 0.3]:
            fgsm = FastGradientMethod(self.classifier, eps=eps)
            x_adv = ensure_float32(fgsm.generate(x))
            results.append({
                "attack": "FGSM",
                "epsilon": eps,
                "accuracy": self.accuracy_fn(x_adv, y)
            })

        pgd = ProjectedGradientDescent(
            self.classifier, eps=0.3, eps_step=0.05, max_iter=40
        )
        x_adv = ensure_float32(pgd.generate(x))
        results.append({
            "attack": "PGD",
            "epsilon": 0.3,
            "accuracy": self.accuracy_fn(x_adv, y)
        })

        return results

## Evaluator Agent (Risk Quantification)

### Responsibility

- Turn failures into risk metrics
- Produce audit-ready outputs

In [12]:
class EvaluatorAgent:
    def summarize(self, attack_results):
        worst = min(attack_results, key=lambda r: r["accuracy"])
        return {
            "worst_attack": worst["attack"],
            "worst_accuracy": worst["accuracy"]
        }
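
A summary along these lines could also report how far accuracy falls relative to the clean baseline, which is often easier to compare across models than the raw worst-case number. The sketch below is a hypothetical extension, and the accuracies in it are placeholders rather than measured results:

```python
def robustness_report(clean_accuracy, attack_results):
    """Summarize the worst attack and the accuracy drop it causes."""
    worst = min(attack_results, key=lambda r: r["accuracy"])
    drop = clean_accuracy - worst["accuracy"]
    return {
        "worst_attack": worst["attack"],
        "worst_epsilon": worst["epsilon"],
        "worst_accuracy": worst["accuracy"],
        "absolute_drop": drop,
        "relative_drop": drop / clean_accuracy if clean_accuracy else 0.0,
    }

# Placeholder numbers for illustration only.
example_results = [
    {"attack": "FGSM", "epsilon": 0.1, "accuracy": 0.80},
    {"attack": "PGD", "epsilon": 0.3, "accuracy": 0.05},
]
report = robustness_report(0.98, example_results)
print(report["worst_attack"], round(report["absolute_drop"], 2))
```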

## Blue Team Agent (Defense & Risk Treatment)

### Responsibility

- Apply defenses
- Re-test robustness
- Decide if residual risk is acceptable

### Defense: Feature Squeezing

Feature squeezing:

- Reduces input precision
- Can remove small adversarial perturbations
- Is fast and low-cost

This is a defensive control, not a silver bullet.
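
Reducing input precision usually means quantizing each pixel to a fixed number of levels. The sketch below shows the common bit-depth formulation, rounding to `2**bit_depth - 1` steps inside the clip range. It is an illustration of the idea, not a copy of ART's implementation, and `squeeze_bit_depth` is a hypothetical name:

```python
import numpy as np

def squeeze_bit_depth(x, bit_depth, clip_values=(0.0, 1.0)):
    """Quantize x to 2**bit_depth levels within clip_values."""
    lo, hi = clip_values
    levels = 2 ** bit_depth - 1
    x_unit = (x - lo) / (hi - lo)                  # rescale to [0, 1]
    return np.rint(x_unit * levels) / levels * (hi - lo) + lo

# With 1 bit, every pixel collapses to 0 or 1.
x = np.array([0.0, 0.2, 0.49, 0.52, 1.0], dtype=np.float32)
squeezed = squeeze_bit_depth(x, bit_depth=1)

# With 5 bits there are at most 32 distinct values left.
n_levels = len(np.unique(squeeze_bit_depth(np.linspace(0.0, 1.0, 1000), 5)))
print(squeezed.tolist(), n_levels)
```

Perturbations smaller than half a quantization step are rounded away, while larger ones survive, which is one reason the defense is only partial.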

In [13]:
class BlueTeamAgent:
    def __init__(self, classifier, accuracy_fn):
        self.classifier = classifier
        self.accuracy_fn = accuracy_fn
        self.squeezer = FeatureSqueezing(bit_depth=5, clip_values=(0.0, 1.0))
        self.fgsm = FastGradientMethod(estimator=self.classifier, eps=0.2)
        self.pgd = ProjectedGradientDescent(
            estimator=self.classifier,
            eps=0.3,
            eps_step=0.05,
            max_iter=40
        )

    def defend_and_test(self, x, y):
        x = ensure_float32(x)

        # Clean → Defense
        x_clean_def = ensure_float32(self.squeezer(x)[0])
        clean_acc = self.accuracy_fn(x_clean_def, y)

        # FGSM → Defense
        x_fgsm = ensure_float32(self.fgsm.generate(x))
        x_fgsm_def = ensure_float32(self.squeezer(x_fgsm)[0])
        fgsm_acc = self.accuracy_fn(x_fgsm_def, y)

        # PGD → Defense
        x_pgd = ensure_float32(self.pgd.generate(x))
        x_pgd_def = ensure_float32(self.squeezer(x_pgd)[0])
        pgd_acc = self.accuracy_fn(x_pgd_def, y)

        return {
            "defense": "FeatureSqueezing",
            "clean_defended_accuracy": clean_acc,
            "fgsm_defended_accuracy": fgsm_acc,
            "pgd_defended_accuracy": pgd_acc
        }

## Detector Agent (Detection-Based Control)

The Detector Agent implements a **detection-based compensating control** for adversarial and abusive inputs.

### Why this agent exists
Even with adversarial training, no model can be made fully robust against all adaptive attacks.  
Instead of relying solely on prevention, the Detector Agent enables **early identification and containment** of suspicious inputs.

This aligns with real-world security practice:
- *Prevent what you can*
- *Detect what you can't fully prevent*
- *Contain and escalate residual risk*

### What the Detector Agent does
The Detector Agent:
- Learns to distinguish **clean inputs vs adversarial inputs**
- Flags inputs that exhibit adversarial characteristics
- Produces a **detector flag rate**, which is used as a governance signal

### Detector flag rate (key metric)
The **detector flag rate** is defined as:

> The fraction of inputs flagged as adversarial or anomalous in a given batch or time window.

This metric answers:
- *"How hostile is the input environment?"*
- *"Are we under active attack or abuse?"*
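
As a rough illustration of the definition, the flag rate can be computed per batch or per window from detector scores. The scores, the 0.5 threshold, and the function names below are assumptions made for the example:

```python
def flag_rate(scores, threshold=0.5):
    """Fraction of inputs whose detector score exceeds the threshold."""
    if not scores:
        return 0.0
    return sum(s > threshold for s in scores) / len(scores)

def windowed_flag_rates(scores, window, threshold=0.5):
    """Flag rate for each consecutive window of `window` inputs."""
    return [
        flag_rate(scores[i:i + window], threshold)
        for i in range(0, len(scores), window)
    ]

# Made-up detector scores: a quiet window followed by a hostile one.
scores = [0.1, 0.2, 0.9, 0.3, 0.8, 0.95, 0.7, 0.6]
rates = windowed_flag_rates(scores, window=4)
print(rates)  # [0.25, 1.0]
```

A jump between consecutive windows, as in the second window here, is the kind of change a governance threshold would be set to catch.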

### How it is used in governance
The detector flag rate is combined with robustness metrics (PGD / FGSM accuracy) in the **Multi-Metric Risk Gate** to:
- Trigger escalation
- Enable human-in-the-loop routing
- Activate compensating controls

### Why detection is critical
Detection provides:
- Protection against unknown or future attacks
- Defense against abuse even when accuracy remains high
- Evidence of active threat conditions

### Governance alignment
- **ISO/IEC 42001:** Operational monitoring and compensating controls  
- **NIST AI RMF:** MANAGE function (risk containment)  
- **OWASP AI:** Input Manipulation, Model Abuse

In [14]:
class DetectorAgent:
    def __init__(self):
        self.detector = None

    def train(self, x_clean, x_adv):
        # Flatten each image; use the actual length so unequal inputs also work
        x = np.concatenate([x_clean, x_adv])
        x = x.reshape(len(x), -1)
        y = np.concatenate([np.zeros(len(x_clean)), np.ones(len(x_adv))])

        model = LogisticRegression(max_iter=1000)
        model.fit(x, y)

        self.detector = SklearnClassifier(model=model, clip_values=(0,1))

    def flag_rate(self, x):
        x = x.reshape(len(x), -1)
        preds = self.detector.predict(x)
        return np.argmax(preds, axis=1).mean()

## Adversarial Training Agent

This agent is only triggered on escalation.

### What it does (simply)

- Generates PGD adversarial examples
- Mixes them with clean data
- Retrains the model
- Updates the ART classifier

### Why this design?

- Explicit
- Deterministic
- No hidden callbacks
- Easy to audit

In [15]:
class AdversarialTrainingAgent:
    def __init__(self, classifier, model, optimizer, loss_fn):
        self.classifier = classifier
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn

        self.pgd = ProjectedGradientDescent(
            estimator=self.classifier,
            eps=0.3,
            eps_step=0.05,
            max_iter=40
        )

    def retrain(self, x_train, y_train, epochs=2, adv_ratio=0.5):
        x_train = ensure_float32(x_train)

        for epoch in range(epochs):
            # Generate PGD adversarial examples
            x_adv = ensure_float32(self.pgd.generate(x_train))

            split = int(len(x_train) * adv_ratio)
            x_mix = np.concatenate([x_train[:split], x_adv[:split]])
            y_mix = np.concatenate([y_train[:split], y_train[:split]])

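            # Match the device the model lives on (ART may have moved it to GPU)
            device = next(self.model.parameters()).device
            x_mix_t = torch.from_numpy(x_mix).to(device)
            y_mix_t = torch.from_numpy(y_mix).to(device)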

            self.model.train()
            self.optimizer.zero_grad()
            loss = self.loss_fn(self.model(x_mix_t), y_mix_t)
            loss.backward()
            self.optimizer.step()

            print(f"[ADV TRAIN] Epoch {epoch+1} | Loss={loss.item():.4f}")

        return {
            "status": "RETRAINED",
            "epochs": epochs,
            "adv_ratio": adv_ratio
        }
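
The `retrain` method above takes one full-batch gradient step per epoch. A mini-batch variant would typically shuffle the data and mix clean and adversarial samples inside each batch. The generator below sketches only that batching step in NumPy; `iter_mixed_batches` is a hypothetical helper and the arrays are synthetic:

```python
import numpy as np

def iter_mixed_batches(x_clean, x_adv, y, batch_size, adv_ratio=0.5, seed=0):
    """Yield shuffled batches in which about adv_ratio of samples are adversarial."""
    rng = np.random.default_rng(seed)
    order = rng.permutation(len(x_clean))
    for start in range(0, len(order), batch_size):
        idx = order[start:start + batch_size]
        n_adv = int(len(idx) * adv_ratio)
        # First n_adv samples come from x_adv, the rest from x_clean;
        # labels follow the same index order, so they stay aligned.
        x_batch = np.concatenate([x_adv[idx[:n_adv]], x_clean[idx[n_adv:]]])
        yield x_batch, y[idx]

# Synthetic data: the "adversarial" copy is offset by 100 so it is easy to spot.
x_clean = np.arange(10, dtype=np.float32).reshape(10, 1)
x_adv = x_clean + 100.0
y = np.arange(10)

batches = list(iter_mixed_batches(x_clean, x_adv, y, batch_size=4))
print([len(xb) for xb, _ in batches])  # [4, 4, 2]
```

Each yielded pair could then be converted with `torch.from_numpy` and passed through the usual `zero_grad` / `backward` / `step` sequence.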

## MultiMetricRiskGate
The MultiMetricRiskGate evaluates **multiple risk signals together** to make a single governance decision.

Signals evaluated:
- PGD defended accuracy (worst-case robustness)
- FGSM defended accuracy (casual attacker)
- Detector flag rate (active abuse signal)
- Clean accuracy (business impact)

This avoids single-metric blind spots and enables policy-driven, enterprise-grade risk decisions.


In [16]:
class MultiMetricRiskGate:
    def __init__(self, policy):
        self.metrics = policy["metrics"]
        self.actions = policy["actions"]

    def evaluate(self, observed):
        triggered = []
        score = 0.0

        for name, rule in self.metrics.items():
            val = observed[name]
            w = rule["weight"]

            if "escalate_below" in rule and val < rule["escalate_below"]:
                triggered.append(name); score += w
            elif "warn_below" in rule and val < rule["warn_below"]:
                score += 0.5 * w

            if "escalate_above" in rule and val > rule["escalate_above"]:
                triggered.append(name); score += w

        if triggered:
            return {**self.actions["escalate"], "triggered_by": triggered, "risk_score": score}

        return {**self.actions["accept"], "risk_score": score}
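
A worked example helps show how the escalate and warn rules combine into a score. The function below restates the same gate logic in compact form so the block runs on its own. The policy thresholds, weights, and observed metrics are hypothetical values chosen for illustration:

```python
def evaluate_gate(policy, observed):
    """Compact restatement of the gate logic for a worked example."""
    triggered, score = [], 0.0
    for name, rule in policy["metrics"].items():
        val, w = observed[name], rule["weight"]
        if "escalate_below" in rule and val < rule["escalate_below"]:
            triggered.append(name)
            score += w
        elif "warn_below" in rule and val < rule["warn_below"]:
            score += 0.5 * w          # warnings count for half the weight
        if "escalate_above" in rule and val > rule["escalate_above"]:
            triggered.append(name)
            score += w
    action = policy["actions"]["escalate" if triggered else "accept"]
    result = {**action, "risk_score": score}
    if triggered:
        result["triggered_by"] = triggered
    return result

policy = {  # hypothetical thresholds and weights
    "metrics": {
        "pgd_defended_accuracy": {"weight": 0.4, "escalate_below": 0.30, "warn_below": 0.50},
        "fgsm_defended_accuracy": {"weight": 0.2, "escalate_below": 0.50, "warn_below": 0.70},
        "clean_accuracy": {"weight": 0.2, "escalate_below": 0.90},
        "detector_flag_rate": {"weight": 0.2, "escalate_above": 0.20},
    },
    "actions": {
        "escalate": {"decision": "ESCALATE"},
        "accept": {"decision": "ACCEPT"},
    },
}
observed = {  # placeholder metrics, not measured results
    "pgd_defended_accuracy": 0.10,
    "fgsm_defended_accuracy": 0.65,
    "clean_accuracy": 0.98,
    "detector_flag_rate": 0.05,
}
decision = evaluate_gate(policy, observed)
print(decision)
```

Here the PGD metric breaches its escalation threshold and the FGSM metric only reaches its warning band, so the decision is to escalate with a score of one full weight plus half of another.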

## RiskPolicyLoader
This component loads the **external risk policy** from YAML.

Why this matters:
- Governance rules are **decoupled from code**
- Risk thresholds can be changed without redeployment
- Policies are version-controlled and auditable

This aligns with ISO/NIST expectations for documented, reviewable risk criteria.


In [20]:
import yaml

class RiskPolicyLoader:
    @staticmethod
    def load(path: str) -> dict:
        with open(path, "r") as f:
            return yaml.safe_load(f)
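
The policy file itself is not shown in this notebook. Judging from how the gate and orchestrator read it, the loaded dictionary needs a `metrics` section (each rule with a `weight` and at least one threshold key) and an `actions` section whose entries carry a `decision` field. The sketch below checks that shape. The YAML layout in the comment and every value in it are assumptions, and `validate_policy` is a hypothetical helper:

```python
# Assumed risk_policy.yaml layout (values are placeholders):
#
#   metrics:
#     pgd_defended_accuracy: {weight: 0.4, escalate_below: 0.30, warn_below: 0.50}
#     detector_flag_rate:    {weight: 0.2, escalate_above: 0.20}
#   actions:
#     escalate: {decision: ESCALATE}
#     accept:   {decision: ACCEPT}

THRESHOLD_KEYS = {"escalate_below", "warn_below", "escalate_above"}

def validate_policy(policy):
    """Return a list of problems; an empty list means the shape looks usable."""
    problems = []
    for section in ("metrics", "actions"):
        if not isinstance(policy.get(section), dict):
            problems.append(f"missing section: {section}")
    if problems:
        return problems
    for name, rule in policy["metrics"].items():
        if "weight" not in rule:
            problems.append(f"{name}: missing weight")
        if not THRESHOLD_KEYS & set(rule):
            problems.append(f"{name}: no threshold key")
    for action in ("escalate", "accept"):
        if "decision" not in policy["actions"].get(action, {}):
            problems.append(f"actions.{action}: missing decision")
    return problems

# The dict that yaml.safe_load would return for the layout above.
good_policy = {
    "metrics": {
        "pgd_defended_accuracy": {"weight": 0.4, "escalate_below": 0.30, "warn_below": 0.50},
        "detector_flag_rate": {"weight": 0.2, "escalate_above": 0.20},
    },
    "actions": {"escalate": {"decision": "ESCALATE"}, "accept": {"decision": "ACCEPT"}},
}
print(validate_policy(good_policy))  # []
```

Running a check like this at load time turns a malformed policy into an explicit error list instead of a `KeyError` in the middle of a governance cycle.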

## Orchestrating the Red Team Exercise

### Engineering reasons

- Owns state (model lifecycle)
- Owns control flow
- Coordinates agents
- Enforces contracts

### Governance reasons

- Single point of decision authority
- Explicit risk acceptance logic
- Auditable "who decided what and why"

### This aligns directly with:

- ISO/IEC 42001 (decision ownership)
- NIST AI RMF (GOVERN + MANAGE)

In [18]:
class AIOrchestrator:
    """
    Central governance and execution authority.

    Responsibilities:
    - Run red-team attacks
    - Evaluate worst-case robustness
    - Apply blue-team defenses
    - Measure detection signals
    - Aggregate metrics
    - Enforce policy-driven risk decisions
    - Trigger adversarial training on escalation
    """

    def __init__(
        self,
        classifier,
        model,
        optimizer,
        loss_fn,
        accuracy_fn,
        policy_path
    ):
        # --- Agents ---
        self.red_agent = RedTeamAgent(classifier, accuracy_fn)
        self.eval_agent = EvaluatorAgent()
        self.blue_agent = BlueTeamAgent(classifier, accuracy_fn)
        self.detector_agent = DetectorAgent()
        self.adv_train_agent = AdversarialTrainingAgent(
            classifier, model, optimizer, loss_fn
        )

        # --- Governance ---
        policy = RiskPolicyLoader.load(policy_path)
        self.risk_gate = MultiMetricRiskGate(policy)

        self.classifier = classifier

    def run(
        self,
        x_test,
        y_test,
        retrain_on_escalation=True,
        retrain_epochs=2,
        retrain_adv_ratio=0.5,
    ):
        """
        Executes one full governance cycle.
        """

        # ----------------------------
        # 1. Red Team (Attacks)
        # ----------------------------
        attack_results = self.red_agent.run(x_test, y_test)

        # ----------------------------
        # 2. Evaluation (Worst Case)
        # ----------------------------
        evaluation_summary = self.eval_agent.summarize(attack_results)

        # ----------------------------
        # 3. Blue Team (Defense + Attacks)
        # ----------------------------
        defense_results = self.blue_agent.defend_and_test(x_test, y_test)

        # ----------------------------
        # 4. Detection Signal
        # ----------------------------
        pgd = ProjectedGradientDescent(
            estimator=self.classifier,
            eps=0.3,
            eps_step=0.05,
            max_iter=40
        )

        x_adv = ensure_float32(pgd.generate(x_test))

        # Train detector on a subset (governance-friendly, bounded)
        self.detector_agent.train(
            x_clean=x_test[:2000],
            x_adv=x_adv[:2000]
        )

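        # Measured on the clean test set, so this is the share of clean
        # inputs that the detector flags as adversarial
        detector_flag_rate = self.detector_agent.flag_rate(x_test)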

        # ----------------------------
        # 5. Aggregate Metrics
        # ----------------------------
        observed_metrics = {
            "pgd_defended_accuracy": defense_results["pgd_defended_accuracy"],
            "fgsm_defended_accuracy": defense_results["fgsm_defended_accuracy"],
            "clean_accuracy": defense_results["clean_defended_accuracy"],
            "detector_flag_rate": detector_flag_rate,
        }

        # ----------------------------
        # 6. Policy-Driven Risk Decision
        # ----------------------------
        decision = self.risk_gate.evaluate(observed_metrics)

        result = {
            "attack_results": attack_results,
            "evaluation": evaluation_summary,
            "defense": defense_results,
            "metrics": observed_metrics,
            "decision": decision,
        }

        # ----------------------------
        # 7. Automatic Mitigation (if allowed)
        # ----------------------------
        if (
            decision["decision"] == "ESCALATE"
            and retrain_on_escalation
        ):
            print("🚨 Risk escalation triggered → adversarial training")

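            # NOTE: retraining on test samples and then re-scoring on x_test
            # leaks evaluation data; pass held-out training data in real use.
            retrain_info = self.adv_train_agent.retrain(
                x_train=x_test[:5000],
                y_train=y_test[:5000],
                epochs=retrain_epochs,
                adv_ratio=retrain_adv_ratio,
            )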

            # ----------------------------
            # 8. Post-Retraining Re-evaluation
            # ----------------------------
            print("🔁 Re-running governance cycle after retraining")

            post_attack_results = self.red_agent.run(x_test, y_test)
            post_eval_summary = self.eval_agent.summarize(post_attack_results)
            post_defense_results = self.blue_agent.defend_and_test(x_test, y_test)

            x_adv_post = ensure_float32(pgd.generate(x_test))
            self.detector_agent.train(
                x_clean=x_test[:2000],
                x_adv=x_adv_post[:2000]
            )
            post_detector_flag_rate = self.detector_agent.flag_rate(x_test)

            post_metrics = {
                "pgd_defended_accuracy": post_defense_results["pgd_defended_accuracy"],
                "fgsm_defended_accuracy": post_defense_results["fgsm_defended_accuracy"],
                "clean_accuracy": post_defense_results["clean_defended_accuracy"],
                "detector_flag_rate": post_detector_flag_rate,
            }

            post_decision = self.risk_gate.evaluate(post_metrics)

            result["post_training"] = {
                "retraining": retrain_info,
                "attack_results": post_attack_results,
                "evaluation": post_eval_summary,
                "defense": post_defense_results,
                "metrics": post_metrics,
                "decision": post_decision,
            }

        return result

## Call Orchestrator to execute the attack

In [None]:
orchestrator = AIOrchestrator(
    classifier=classifier,
    model=model,
    optimizer=optimizer,
    loss_fn=criterion,
    accuracy_fn=accuracy,
    policy_path="risk_policy.yaml"
)

results = orchestrator.run(x_test, y_test)
print(results)
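
For audit purposes, a printed dictionary is harder to archive and compare than a serialized record. One possible approach is to write the result as JSON with a run identifier and a UTC timestamp. The function name, record fields, and example values below are assumptions for illustration:

```python
import json
from datetime import datetime, timezone

def to_audit_json(results, run_id):
    """Serialize one governance cycle as a JSON audit record."""
    record = {
        "run_id": run_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "results": results,
    }
    # default=float converts numeric types that json cannot encode natively,
    # such as np.float32 scalars returned by NumPy computations.
    return json.dumps(record, indent=2, sort_keys=True, default=float)

# Placeholder result with the same top-level keys the orchestrator returns.
example = {
    "decision": {"decision": "ACCEPT", "risk_score": 0.1},
    "metrics": {"clean_accuracy": 0.98},
}
audit_json = to_audit_json(example, run_id="demo-run")
print(audit_json)
```

The resulting string can be written to a file or log store alongside the policy version that produced the decision.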
