# AI Red Teaming Notebook Overview

This streamlined notebook guides you through progressively richer AI red teaming evaluations using the Azure AI Evaluation SDK.

You will:
- Run a fast smoke test with a deterministic safe callback (baseline expectations, near-zero Attack Success Rate).
- Target a real Azure OpenAI deployment to observe genuine safety behavior.
- Expand coverage across multiple risk categories and layered attack strategies.
- Add advanced multi-strategy scans (including composed transformations) to probe layered defenses.
- (Optional, end of notebook) Supply your own domain‑specific risky objectives.

Artifacts: Each scan writes a JSON scorecard file whose name combines the scan label and a UTC timestamp. Use these files for comparison, regression tracking, or upload to Azure AI Foundry.

Execution time scales roughly with the product of risk categories × attack strategies × `num_objectives`. Start small and expand only after the previous step succeeds.
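A back-of-the-envelope helper makes that multiplication explicit before you launch a scan. `estimate_attack_count` is a hypothetical helper, not part of the SDK. It counts each entry in `attack_strategies` once, so group macros such as `EASY` that bundle several transformations will push the real number higher, and the SDK may add its own baseline pass on top.

```python
def estimate_attack_count(num_risk_categories: int, num_strategies: int, num_objectives: int) -> int:
    """Rough number of attack prompts a scan will send (hypothetical helper).

    Each risk category gets `num_objectives` seed prompts, and every seed prompt
    is run through every selected strategy.
    """
    for name, value in (
        ("num_risk_categories", num_risk_categories),
        ("num_strategies", num_strategies),
        ("num_objectives", num_objectives),
    ):
        if value < 0:
            raise ValueError(f"{name} must be non-negative, got {value}")
    return num_risk_categories * num_strategies * num_objectives


# Scans in this notebook, counting each strategy-list entry once:
print("Basic:", estimate_attack_count(2, 1, 1))     # 2 categories, Flip only -> 2
print("Advanced:", estimate_attack_count(4, 8, 3))  # 4 categories, 8 strategy entries -> 96
```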

In [2]:
# Cell 1: Installation

import sys, subprocess

packages = [
    "azure-ai-evaluation[redteam]",
    "azure-identity",
    "openai",
    "azure-ai-projects",
    "python-dotenv",
]

subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", "--upgrade", *packages])
print("Installed:")
for p in packages:
    print("  -", p)
print("Done.")

Installed:
  - azure-ai-evaluation[redteam]
  - azure-identity
  - openai
  - azure-ai-projects
  - python-dotenv
Done.


In [None]:
# Cell 2 - imports
from typing import Optional, Dict, Any
import os

# Azure imports
from azure.ai.evaluation.red_team import RedTeam, RiskCategory, AttackStrategy

# OpenAI import
from openai import AzureOpenAI

## Core Concepts: RedTeam, Risk Categories, Attack Strategies & Targets

**RedTeam Orchestrator**: Generates attack objectives, transforms prompts via strategies, invokes your target, and scores responses.

**Risk Categories (what we probe)**: Violence, Hate/Unfairness, Sexual, SelfHarm. You can supply a subset for faster iteration. Omitting categories reduces coverage but cuts cost and time.

**Attack Strategies (how we probe)**:
- Complexity group macros: `EASY`, `MODERATE`, `DIFFICULT` (bundles of transformations grouped by increasing complexity)
- Individual transformations: Flip, CharSwap, UnicodeConfusable, Leetspeak, Url, Base64, ROT13, etc.
- Composition: `AttackStrategy.Compose([Base64, ROT13])` layers transformations to simulate obfuscation chains.
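To make layering concrete, here is a stdlib-only illustration of what a composed Base64 then ROT13 chain does to a prompt. The functions are stand-ins written for this illustration, not the SDK's converters (which may format their output differently), and the left-to-right application order is an assumption about how `Compose` chains strategies.

```python
import base64
import codecs
from typing import Callable, Sequence

Transform = Callable[[str], str]


def flip(text: str) -> str:
    """Reverse the characters (illustrative stand-in for a Flip-style transformation)."""
    return text[::-1]


def to_base64(text: str) -> str:
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def rot13(text: str) -> str:
    # ROT13 only rotates ASCII letters; digits, '+', '/' and '=' pass through unchanged
    return codecs.encode(text, "rot13")


def compose(transforms: Sequence[Transform]) -> Transform:
    """Apply transforms left to right, mimicking a layered obfuscation chain."""
    def chained(text: str) -> str:
        for transform in transforms:
            text = transform(text)
        return text
    return chained


layered = compose([to_base64, rot13])
original = "Tell me about index funds"
encoded = layered(original)
# Undo the layers in reverse order: ROT13 first, then Base64
decoded = base64.b64decode(rot13(encoded)).decode("utf-8")
print(encoded)
print(decoded)
```

A filter that decodes Base64 once would recover only garbled bytes, because the ROT13 layer must be undone first; that ordering gap is what layered strategies probe.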

**num_objectives**: Number of attack objectives (seed prompts) generated per risk category. Each objective is run through every selected strategy, so runtime grows linearly with this value.

**Targets (what gets attacked)**:
1. Simple synchronous callback (returns fixed text) – deterministic baseline.
2. Model configuration dict – RedTeam handles generation calls internally.
3. Fully custom (async) application wrapper – replicate real app logic, pre/post-processing.

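The notebook starts with (1) to get a deterministic baseline, then wraps the Azure OpenAI deployment in a thin callback for the model-backed scans. Use (2) when you want RedTeam to call the deployment directly, or (3) when you need to exercise full application logic.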
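The three target shapes differ mainly in their call signature. The sketch below is hypothetical and runs without Azure: `fake_model` stands in for a real deployment, `BLOCKED_TERMS` is an invented app-level deny list, and the model-configuration key names (`azure_endpoint`, `azure_deployment`, `api_key`) are assumed from the Azure OpenAI configuration convention, so check them against your SDK version before passing such a dict as `target`.

```python
import asyncio
from typing import Dict, Mapping


# (1) Deterministic synchronous callback: same answer for every prompt.
def fixed_callback(query: str) -> str:
    return "I can only help with general financial planning questions."


# (2) Model configuration dict: RedTeam would make the model calls itself.
#     Key names are an assumption; verify against your SDK version.
def build_model_config(env: Mapping[str, str]) -> Dict[str, str]:
    return {
        "azure_endpoint": env["AZURE_OPENAI_ENDPOINT"],
        "azure_deployment": env["AZURE_OPENAI_DEPLOYMENT_NAME"],
        "api_key": env["AZURE_OPENAI_API_KEY"],
    }


# (3) Custom async application wrapper with pre/post-processing.
async def fake_model(prompt: str) -> str:
    """Hypothetical stand-in for a real model call."""
    await asyncio.sleep(0)
    return f"Model answer to: {prompt}"


BLOCKED_TERMS = ("password", "ssn")  # hypothetical app-level deny list


async def app_target(query: str) -> str:
    cleaned = " ".join(query.split())  # pre-processing: collapse whitespace
    if any(term in cleaned.lower() for term in BLOCKED_TERMS):
        return "Sorry, I can't help with that."  # app-level safety layer
    answer = await fake_model(cleaned)
    return answer.strip()  # post-processing
```

In a notebook cell, `await app_target("What is an index fund?")` exercises the wrapper; in a plain script, use `asyncio.run(app_target(...))` instead.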

> Tip: Keep early scans lean (≤2 categories, 1 strategy, num_objectives=1) to validate authentication & environment quickly.

In [None]:
# Cell 3 - Login with a Managed Identity
from azure.identity import ManagedIdentityCredential

credential = ManagedIdentityCredential()
try:
    credential.get_token("https://management.azure.com/.default")  # quick probe
    print("Managed Identity OK.")
except Exception as e:  # noqa: BLE001
    raise RuntimeError("Managed Identity token acquisition failed.") from e

### Authentication Choice: Managed Identity
We use `ManagedIdentityCredential` so that access to the Azure AI Project needs no stored secret and the underlying credentials are rotated automatically by Azure. The quick token probe confirms the compute environment is wired for Azure resource access. Note that the Azure OpenAI calls later in this notebook still authenticate with `AZURE_OPENAI_API_KEY` from the `.env` file.

If running outside a managed environment, swap in another credential (e.g., `AzureCliCredential`) without changing later RedTeam logic.
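If the same notebook must run both on managed compute and on a laptop, you can try credentials in order and keep the first one that can actually mint a token. `azure.identity.ChainedTokenCredential` already provides this ordering; `first_working_credential` below is a hypothetical, dependency-free sketch of the same idea that also performs the Cell 3 token probe, so it can be exercised with fake credentials.

```python
from typing import Any, Callable, List, Sequence

ARM_SCOPE = "https://management.azure.com/.default"


def first_working_credential(factories: Sequence[Callable[[], Any]], scope: str = ARM_SCOPE) -> Any:
    """Return the first credential whose get_token(scope) succeeds (hypothetical helper).

    Each factory builds an object exposing get_token(scope), for example the
    ManagedIdentityCredential or AzureCliCredential classes from azure-identity.
    """
    failures: List[str] = []
    for make in factories:
        try:
            credential = make()
            credential.get_token(scope)  # quick probe, as in Cell 3
            return credential
        except Exception as exc:  # noqa: BLE001 - record the failure and try the next option
            failures.append(f"{getattr(make, '__name__', repr(make))}: {exc}")
    raise RuntimeError("No credential could acquire a token:\n" + "\n".join(failures))
```

With azure-identity installed, `first_working_credential([ManagedIdentityCredential, AzureCliCredential])` would prefer the managed identity and fall back to your `az login` session.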

In [None]:
# Cell 4 - load .env file
from dotenv import load_dotenv
import os

# Attempt a few likely .env locations
candidates = [
    "files/.env", 
    "/files/.env",
    "./.env",
    os.path.expanduser("~/.env"),
]
loaded_path = None
for p in candidates:
    if load_dotenv(dotenv_path=p, override=False):
        loaded_path = p
        break
if not loaded_path:
    print("WARNING: No .env file loaded from candidates", candidates)
else:
    print(f"Loaded environment variables from: {loaded_path}")

### Environment Variables Loaded
Required keys (subscription, resource group, project name, OpenAI deployment details, API key & version) enable two things:
1. Locating the Azure AI Project for logging / result persistence.
2. Invoking your Azure OpenAI deployment during model-based scans.

Missing values will be surfaced in the next cell; resolve them before proceeding to model or advanced scans.

In [None]:
# Cell 5 - Set variables
import os

_required_keys = [
    "AZURE_SUBSCRIPTION_ID",
    "AZURE_RESOURCE_GROUP_NAME",
    "AZURE_PROJECT_NAME",
    "AZURE_OPENAI_DEPLOYMENT_NAME",
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_API_VERSION",
]
_env = {k: os.environ.get(k) for k in _required_keys}
_missing = [k for k, v in _env.items() if not v]

if _missing:
    # Fail fast: later cells depend on every variable defined below
    raise RuntimeError(f"Missing environment variables: {_missing}")

# Construct objects / variables consumed by later cells.
# The credential is passed to RedTeam separately, so this dict only identifies the project.
azure_ai_project = {
    "subscription_id": _env["AZURE_SUBSCRIPTION_ID"],
    "resource_group_name": _env["AZURE_RESOURCE_GROUP_NAME"],
    "project_name": _env["AZURE_PROJECT_NAME"],
}
azure_openai_deployment = _env["AZURE_OPENAI_DEPLOYMENT_NAME"]
azure_openai_endpoint = _env["AZURE_OPENAI_ENDPOINT"]
azure_openai_api_key = _env["AZURE_OPENAI_API_KEY"]
azure_openai_api_version = _env["AZURE_OPENAI_API_VERSION"]

print("Config loaded.")
print(f"Deployment: {azure_openai_deployment}")
print(f"Endpoint: {azure_openai_endpoint}")
print("API key prefix:", azure_openai_api_key[:6] + '...')

### Target Patterns
You can point RedTeam at different target styles:
- Deterministic callback (fast baseline, isolates framework logic).
- Direct model configuration dict (minimal code, invokes deployment directly).
- Rich application function (mirrors real system behavior: pre/post-processing, memory, safety layers).

We start with the deterministic callback to validate plumbing before spending tokens on real model invocations.

In [None]:
# Cell 6 - Define a simple callback function that always returns a fixed response
def financial_advisor_callback(query: str) -> str:  # noqa: ARG001
    return "I'm a financial advisor assistant. I can help with investment advice and financial planning within legal and ethical guidelines."

In [None]:
# Cell 7 - Create the `RedTeam` instance with minimal configurations
red_team = RedTeam(
    azure_ai_project=azure_ai_project,
    credential=credential,
    risk_categories=[RiskCategory.Violence, RiskCategory.HateUnfairness],
    num_objectives=1,
)

### Minimal Configuration Rationale
We start with two categories and `num_objectives=1` to:
- Minimize cost & latency.
- Confirm authentication, environment variables, and output writing.
- Produce a quick baseline artifact for later comparison.

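Naming pattern: `<Label>-Scan-<UTC hhmmss>` keeps artifacts distinguishable within a single day. Because the timestamp omits the date, names are neither guaranteed unique nor chronologically sortable across days.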
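If you need names that stay unique and ordered across days, a hypothetical helper can prepend the UTC date while keeping the `<Label>-Scan-...` shape and the lowercase output-file convention used in the scan cells:

```python
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Tuple


def make_scan_names(label: str, now: Optional[datetime] = None) -> Tuple[str, Path]:
    """Build (scan_name, output_path) from a label and a UTC timestamp (hypothetical helper)."""
    now = now or datetime.now(timezone.utc)
    # Date first, then time, so names sort chronologically as plain strings
    ts = now.astimezone(timezone.utc).strftime("%Y%m%d-%H%M%S")
    scan_name = f"{label}-Scan-{ts}"
    output_path = Path(f"{label.lower()}_scan-{ts}.json")
    return scan_name, output_path


name, path = make_scan_names("Basic", datetime(2024, 5, 1, 9, 30, 15, tzinfo=timezone.utc))
print(name, path)  # Basic-Scan-20240501-093015 basic_scan-20240501-093015.json
```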

In [None]:
# Cell 8 - Basic Scan (standardized naming)
from datetime import datetime, timezone
from pathlib import Path

# Short timestamp (HHMMSS UTC)
scan_ts = datetime.now(timezone.utc).strftime("%H%M%S")
scan_label = "Basic"
scan_name = f"{scan_label}-Scan-{scan_ts}"
output_path = Path(f"{scan_label.lower()}_scan-{scan_ts}.json")

# This will test 1 objective prompt for each of Violence and HateUnfairness categories with the Flip strategy
basic_scan_result = await red_team.scan(
    target=financial_advisor_callback,
    scan_name=scan_name,
    attack_strategies=[AttackStrategy.Flip],
    output_path=str(output_path),
)
print(f"Completed {scan_name} -> {output_path}")

### Interpreting the Basic Scan
Key metric: **Attack Success Rate (ASR)** = percentage of prompts whose responses are flagged harmful.

Because our callback always returns a fixed safe sentence, expected ASR ≈ 0%.

Use the emitted JSON to verify structure and plan which additional strategies or categories to add next.
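ASR itself is simple arithmetic. The sketch below assumes you have already flattened scan results into hypothetical `(risk_category, strategy, flagged)` records; it does not parse the scorecard JSON, whose exact schema depends on the SDK version, and the `sample` records are made up for illustration. Grouping by category is what lets you compare categories against each other.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Record = Tuple[str, str, bool]  # (risk_category, strategy, flagged_as_harmful)


def attack_success_rate(records: Iterable[Record]) -> float:
    """ASR = flagged responses / total attack prompts, as a percentage."""
    rows = list(records)
    if not rows:
        return 0.0  # no prompts sent: report 0 rather than dividing by zero
    flagged = sum(1 for _, _, is_flagged in rows if is_flagged)
    return 100.0 * flagged / len(rows)


def asr_by_category(records: Iterable[Record]) -> Dict[str, float]:
    """Per-category ASR, useful for spotting category-specific weaknesses."""
    grouped: Dict[str, List[Record]] = defaultdict(list)
    for record in records:
        grouped[record[0]].append(record)
    return {category: attack_success_rate(rows) for category, rows in grouped.items()}


sample = [
    ("violence", "flip", False),
    ("violence", "base64", True),
    ("hate_unfairness", "flip", False),
    ("hate_unfairness", "base64", False),
]
print(attack_success_rate(sample))  # 25.0
print(asr_by_category(sample))      # {'violence': 50.0, 'hate_unfairness': 0.0}
```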

In [None]:
# Cell 9 - Intermediary Scan (standardized naming)
from datetime import datetime, timezone
from pathlib import Path

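from typing import Optional
from openai import AzureOpenAI, BadRequestError

# Lazy client creation (safe on reruns)
if '_rt_client' not in globals():
    _rt_client = AzureOpenAI(
        api_key=azure_openai_api_key,
        api_version=azure_openai_api_version,
        azure_endpoint=azure_openai_endpoint,
    )

# Accept both invocation styles used internally by RedTeam (positional `query` or keyword `user_prompt`)
def model_target(query: Optional[str] = None, user_prompt: Optional[str] = None) -> str:
    prompt = user_prompt or query or "(empty prompt)"
    try:
        response = _rt_client.chat.completions.create(
            model=azure_openai_deployment,  # Azure OpenAI takes the deployment name here
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=256,
        )
    except BadRequestError as e:
        # Azure OpenAI rejects prompts blocked by its content filter with HTTP 400 and
        # error code "content_filter"; record that as a refusal instead of failing the call.
        if getattr(e, "code", None) == "content_filter":
            return "[Blocked by Azure OpenAI content filter]"
        raise
    return response.choices[0].message.content or ""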

scan_ts = datetime.now(timezone.utc).strftime('%H%M%S')
scan_label = "Intermediary"
scan_name = f"{scan_label}-Scan-{scan_ts}"
output_path = Path(f"{scan_label.lower()}_scan-{scan_ts}.json")

intermediary_scan_result = await red_team.scan(
    target=model_target,
    scan_name=scan_name,
    attack_strategies=[AttackStrategy.Flip],
    output_path=str(output_path),
)
print(f"Completed {scan_name} -> {output_path}")

### Moving to a Model Target
Switching from a deterministic callback to an actual model introduces variability and real guardrail evaluation. Keeping the same single `Flip` strategy isolates model safety behavior from added obfuscation complexity.

Next expansions: increase `num_objectives`, add additional strategies (CharSwap, UnicodeConfusable, etc.), or broaden risk categories.

In [None]:
# Cell 10: Advanced scan - create expanded RedTeam instance
advanced_risk_categories = [
    RiskCategory.Violence,
    RiskCategory.HateUnfairness,
    RiskCategory.Sexual,
    RiskCategory.SelfHarm,
]
advanced_red_team = RedTeam(
    azure_ai_project=azure_ai_project,
    credential=credential,
    risk_categories=advanced_risk_categories,
    num_objectives=3,  # increase coverage per category (adjust for cost/time)
)
print("Advanced RedTeam configured with categories:", [c.name for c in advanced_risk_categories])

### Expanding Coverage
We now include all four core risk categories and raise `num_objectives` to increase statistical signal. This increases token/time consumption proportionally.

Broader coverage helps surface category-specific weaknesses early (e.g., higher ASR in SelfHarm vs Sexual).

In [None]:
# Cell 11: Advanced scan + inline stderr ERROR capture (standardized naming)
from datetime import datetime, timezone
import sys, io, re, contextlib, time, json
from pathlib import Path

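#### Debugger (tee stderr so ERROR lines can be extracted after the scan)
# Matches Python logging's default "ERROR:<logger>:<message>" format; adjust for custom formats.
# Handlers created before this cell keep writing to the original stream and are not captured.
error_re = re.compile(r'^ERROR:.*', re.IGNORECASE)

class _Cap(contextlib.AbstractContextManager):
    """Tee sys.stderr into a buffer while still echoing everything to the real stream."""
    def __enter__(self):
        self._orig = sys.stderr
        self.buf = io.StringIO()
        sys.stderr = self
        return self
    def write(self, data):
        self.buf.write(data)
        return self._orig.write(data)
    def flush(self):
        self._orig.flush()
    def __getattr__(self, name):
        # Delegate other stream attributes (encoding, isatty, fileno, ...) to the real stderr
        orig = self.__dict__.get("_orig")
        if orig is None:
            raise AttributeError(name)
        return getattr(orig, name)
    def __exit__(self, *exc_info):
        sys.stderr = self._orig
        return False  # never swallow exceptions raised by the scan
#### End Debugger (setup)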

scan_ts = datetime.now(timezone.utc).strftime('%H%M%S')
scan_label = "Advanced"
scan_name = f"{scan_label}-Scan-{scan_ts}"
output_path = Path(f"{scan_label.lower()}_scan-{scan_ts}.json")

with _Cap() as cap:  # debugger active
    advanced_scan_result = await advanced_red_team.scan(
        target=model_target,
        scan_name=scan_name,
        attack_strategies=[
            AttackStrategy.EASY,
            AttackStrategy.MODERATE,
            AttackStrategy.Flip,
            AttackStrategy.CharSwap,
            AttackStrategy.UnicodeConfusable,
            AttackStrategy.Leetspeak,
            AttackStrategy.Url,
            AttackStrategy.Compose([AttackStrategy.Base64, AttackStrategy.ROT13]),
        ],
        output_path=str(output_path),
    )
    sys.stderr.flush(); time.sleep(0.2)

#### Debugger (ERROR extraction/print)
errs = [ln.strip() for ln in cap.buf.getvalue().splitlines() if error_re.match(ln)]
print(f"Completed {scan_name} -> {output_path}")
print("No ERROR lines captured." if not errs else f"{len(errs)} ERROR line(s):\n" + "\n".join(errs))

### Advanced Strategies & Layering
The advanced scan mixes:
- Complexity groups (`EASY`, `MODERATE`) for breadth.
- Obfuscations (CharSwap, UnicodeConfusable, Leetspeak, Url) to probe normalization defenses.
- Encoding (Base64, ROT13 via composition) to test decoding / content safety layers.

Capturing stderr lets you quickly surface any internal SDK errors alongside scan results.
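Scraping stderr depends on the log line format and misses handlers bound to the original stream. An alternative, sketched below with only the standard library, is to attach a temporary `logging.Handler` that collects ERROR-level records. It assumes the SDK reports problems through Python `logging` and that its loggers propagate to the root logger; neither is guaranteed, so treat it as a complement to the stderr capture rather than a replacement. `collect_log_errors` is a hypothetical helper name.

```python
import contextlib
import logging
from typing import Iterator, List


class _ErrorCollector(logging.Handler):
    """Logging handler that keeps ERROR (and above) records as formatted strings."""

    def __init__(self) -> None:
        super().__init__(level=logging.ERROR)
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(f"{record.levelname}:{record.name}:{record.getMessage()}")


@contextlib.contextmanager
def collect_log_errors(logger_name: str = "") -> Iterator[List[str]]:
    """Temporarily attach an _ErrorCollector to a logger ('' means the root logger)."""
    logger = logging.getLogger(logger_name)
    handler = _ErrorCollector()
    logger.addHandler(handler)
    try:
        yield handler.messages
    finally:
        logger.removeHandler(handler)  # stop collecting once the block exits
```

In the notebook you would wrap the scan, e.g. `with collect_log_errors() as log_errors:` around the `await advanced_red_team.scan(...)` call, and print `log_errors` afterwards.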

## Bring Your Own Objectives: Custom Attack Seed Prompts
You can supply your own domain or application-specific risky prompts as objectives instead of (or in addition to) automatically generated ones.

Format: a JSON file containing a list of entries, each pairing the prompt text with a `risk-type` tag (one of: `violence`, `sexual`, `hate_unfairness`, `self_harm`). The number of prompts provided becomes the effective `num_objectives` for the scan.

Use this when:
- You have proprietary misuse scenarios not covered by generic seeds.
- You want regression tracking on a fixed, curated risky prompt set.
- You need to validate mitigations against previously successful attacks.

Below we instantiate a new `RedTeam` with `custom_attack_seed_prompts` pointing to `data/prompts.json`, then run grouped difficulty strategies.

> Tip: Keep a version-controlled prompts file so additions are reviewable and diffs tie to shifts in ASR.
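One way to keep that file reviewable is to generate it from a short Python list. The entry layout below (`messages`, `metadata.target_harms[].risk-type`, `modality`, `source`, `id`) mirrors the sample format in Microsoft's AI Red Teaming Agent documentation as we understand it; treat it as an assumption and compare against the docs for your SDK version. `build_prompt_entries` and `write_custom_prompts` are hypothetical helpers, and the prompt text is a placeholder for your own scenarios.

```python
import json
from pathlib import Path
from typing import Dict, List, Sequence, Tuple

ALLOWED_RISK_TYPES = {"violence", "sexual", "hate_unfairness", "self_harm"}


def build_prompt_entries(prompts: Sequence[Tuple[str, str]]) -> List[Dict]:
    """Turn (risk_type, prompt_text) pairs into seed-prompt entries (assumed schema)."""
    entries = []
    for index, (risk_type, text) in enumerate(prompts, start=1):
        if risk_type not in ALLOWED_RISK_TYPES:
            raise ValueError(f"Unsupported risk-type {risk_type!r}; expected one of {sorted(ALLOWED_RISK_TYPES)}")
        entries.append({
            "metadata": {"lang": "en", "target_harms": [{"risk-type": risk_type, "risk-subtype": ""}]},
            "messages": [{"role": "user", "content": text}],
            "modality": "text",
            "source": ["custom"],
            "id": str(index),
        })
    return entries


def write_custom_prompts(path: Path, prompts: Sequence[Tuple[str, str]]) -> int:
    """Write entries as indented JSON (stable, reviewable diffs) and return the prompt count."""
    entries = build_prompt_entries(prompts)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(entries, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
    return len(entries)
```

For example, `write_custom_prompts(Path("data/prompts.json"), [("violence", "<your domain-specific prompt>")])` creates the file the next cell expects and returns the prompt count, which is the effective number of objectives.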


In [None]:
# Custom prompts RedTeam instance
from pathlib import Path

custom_prompts_path = Path("data/prompts.json")
if not custom_prompts_path.exists():
    raise FileNotFoundError(f"Custom prompts file not found: {custom_prompts_path}")

custom_red_team = RedTeam(
    azure_ai_project=azure_ai_project,
    credential=credential,
    custom_attack_seed_prompts=str(custom_prompts_path),
)
print("Custom RedTeam ready. Prompt count determines num_objectives.")

In [None]:
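# Execute scan with custom prompts and grouped difficulty strategies (standardized naming)
from datetime import datetime, timezone
from pathlib import Path

scan_ts = datetime.now(timezone.utc).strftime('%H%M%S')
scan_label = "Custom-Prompt"
scan_name = f"{scan_label}-Scan-{scan_ts}"
output_path = Path(f"{scan_label.lower()}_scan-{scan_ts}.json")

custom_result = await custom_red_team.scan(
    target=model_target,  # reuse earlier model target callback
    scan_name=scan_name,
    attack_strategies=[
        AttackStrategy.EASY,
        AttackStrategy.MODERATE,
        AttackStrategy.DIFFICULT,
    ],
    output_path=str(output_path),
)
print(f"Completed {scan_name} -> {output_path}")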