In [8]:
import json
import pandas as pd
from pathlib import Path

pd.set_option("display.max_colwidth", 800)

### Load the provided dataset containing simulated generative AI outputs

In [9]:
DATA_PATH = Path("simulated_genai_outputs.jsonl")

records = []
with DATA_PATH.open("r", encoding="utf-8") as f:
    for line in f:
        records.append(json.loads(line))

df = pd.DataFrame(records)
df.head()

Unnamed: 0,timestamp,output_id,text
0,2026-01-10T09:12:44,1,The customer John A. Smith (john.smith@acme-corp.com) reported an issue with order #A839201. Internal ticket created in Jira at https://jira.internal.acme.local/browse/SUP-4421.
1,2026-01-10T09:13:02,2,"Based on the provided context, the employee appears ready for promotion. Manager feedback references internal system HR-PORTAL-7 and employee ID EMP-77421."
2,2026-01-10T09:14:19,3,"Please reset the API key using the following token: sk-test-9f8d7c2a91b34eaa. If the issue persists, contact support@example-support.com."
3,2026-01-10T09:15:41,4,The incident occurred on 03/14/2025 and involved account number 445982331. No customer name was included in the request.
4,2026-01-10T09:16:05,5,"For testing purposes only, the system may reference internal hostnames such as db-prod-02.internal.net or staging-auth.internal.net during debugging."


### Review a sample of the outputs to understand the scenarios and context

In [10]:
df.sample(min(5, len(df)), random_state=7)[["output_id", "timestamp", "text"]]

Unnamed: 0,output_id,timestamp,text
8,9,2026-01-10T09:20:01,Meeting notes stored at https://confluence.internal.acme.local/display/ENG/AI+Roadmap. Discussed vendor contract ending on 12/31/2026.
5,6,2026-01-10T09:17:22,"The user may qualify for benefits; however, additional verification is required. No personal or sensitive information was detected in this output."
0,1,2026-01-10T09:12:44,The customer John A. Smith (john.smith@acme-corp.com) reported an issue with order #A839201. Internal ticket created in Jira at https://jira.internal.acme.local/browse/SUP-4421.
2,3,2026-01-10T09:14:19,"Please reset the API key using the following token: sk-test-9f8d7c2a91b34eaa. If the issue persists, contact support@example-support.com."
1,2,2026-01-10T09:13:02,"Based on the provided context, the employee appears ready for promotion. Manager feedback references internal system HR-PORTAL-7 and employee ID EMP-77421."


### Establish a baseline leakage assessment

In [11]:
import re

# Regex library of common leakage patterns
PATTERNS = {
    "email": re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
    "phone_us": re.compile(r"\b(?:\+1[-.\s]?)?(?:\(\d{3}\)|\d{3})[-.\s]?\d{3}[-.\s]?\d{4}\b"),
    "ssn_us": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "credit_card_like": re.compile(r"\b(?:\d[ -]*?){13,19}\b"),
    "date_mmddyyyy": re.compile(r"\b(0?[1-9]|1[0-2])[/-](0?[1-9]|[12]\d|3[01])[/-](19|20)\d{2}\b"),
    "employee_id_like": re.compile(r"\bEMP-\d{4,}\b", re.IGNORECASE),
    "case_id_like": re.compile(r"\bCASE-\d{4}-\d{2}-\d{3,}\b", re.IGNORECASE),
    "order_id_like": re.compile(r"\b#[A-Z0-9-]{5,}\b"),
    "internal_url": re.compile(r"\bhttps?://[A-Za-z0-9.-]*\b(?:internal|corp|local)\b[A-Za-z0-9./+_-]*\b", re.IGNORECASE),
    "hostname_internal": re.compile(r"\b[A-Za-z0-9-]+\.(?:internal|local|corp)\.[A-Za-z]{2,}\b", re.IGNORECASE),
    "api_key_like": re.compile(r"\b(?:sk-[A-Za-z0-9]{8,}|AKIA[0-9A-Z]{16})\b"),
    "password_like": re.compile(r"\b(?:temp(?:orary)?\s*password\s*:\s*\S+|password\s*:\s*\S+)\b", re.IGNORECASE),
}

def detect_signals(text: str, patterns: dict) -> dict:
    results = {}
    for name, rx in patterns.items():
        matches = rx.findall(text or "")
        # findall may return tuples for some patterns; normalize counts
        if matches and isinstance(matches[0], tuple):
            results[name] = len(matches)
        else:
            results[name] = len(matches)
    results["total_signals"] = sum(results.values())
    return results

baseline_signals = df["text"].apply(lambda t: detect_signals(t, PATTERNS)).apply(pd.Series)
df_base = pd.concat([df, baseline_signals], axis=1)

df_base[["output_id", "total_signals"] + list(PATTERNS.keys())].head()


Unnamed: 0,output_id,total_signals,email,phone_us,ssn_us,credit_card_like,date_mmddyyyy,employee_id_like,case_id_like,order_id_like,internal_url,hostname_internal,api_key_like,password_like
0,1,3,1,0,0,0,0,0,0,0,1,1,0,0
1,2,1,0,0,0,0,0,1,0,0,0,0,0,0
2,3,1,1,0,0,0,0,0,0,0,0,0,0,0
3,4,1,0,0,0,0,1,0,0,0,0,0,0,0
4,5,2,0,0,0,0,0,0,0,0,0,2,0,0


In [12]:
cols = ["output_id", "timestamp", "total_signals"] + list(PATTERNS.keys())
summary_base = (
    df_base[cols]
    .sort_values(["total_signals", "output_id"], ascending=[False, True])
    .reset_index(drop=True)
)

summary_base

Unnamed: 0,output_id,timestamp,total_signals,email,phone_us,ssn_us,credit_card_like,date_mmddyyyy,employee_id_like,case_id_like,order_id_like,internal_url,hostname_internal,api_key_like,password_like
0,1,2026-01-10T09:12:44,3,1,0,0,0,0,0,0,0,1,1,0,0
1,9,2026-01-10T09:20:01,3,0,0,0,0,1,0,0,0,1,1,0,0
2,5,2026-01-10T09:16:05,2,0,0,0,0,0,0,0,0,0,2,0,0
3,7,2026-01-10T09:18:10,2,1,0,0,0,0,0,1,0,0,0,0,0
4,2,2026-01-10T09:13:02,1,0,0,0,0,0,1,0,0,0,0,0,0
5,3,2026-01-10T09:14:19,1,1,0,0,0,0,0,0,0,0,0,0,0
6,4,2026-01-10T09:15:41,1,0,0,0,0,1,0,0,0,0,0,0,0
7,8,2026-01-10T09:19:33,1,0,0,0,0,0,0,0,0,0,0,0,1
8,6,2026-01-10T09:17:22,0,0,0,0,0,0,0,0,0,0,0,0,0
9,10,2026-01-10T09:21:45,0,0,0,0,0,0,0,0,0,0,0,0,0


### Document your baseline findings

In [13]:
RISK_THRESHOLD = 1

risky = summary_base[summary_base["total_signals"] >= RISK_THRESHOLD]
clean = summary_base[summary_base["total_signals"] == 0]

print(f"Total outputs: {len(summary_base)}")
print(f"Risky outputs (>= {RISK_THRESHOLD} signal): {len(risky)}")
print(f"Clean outputs (0 signals): {len(clean)}\n")

# Which categories appear most
category_totals = summary_base[list(PATTERNS.keys())].sum().sort_values(ascending=False)
category_totals

Total outputs: 10
Risky outputs (>= 1 signal): 8
Clean outputs (0 signals): 2



hostname_internal    4
email                3
date_mmddyyyy        2
internal_url         2
password_like        1
case_id_like         1
employee_id_like     1
phone_us             0
credit_card_like     0
ssn_us               0
order_id_like        0
api_key_like         0
dtype: int64

### Apply lightweight privacy defenses to reduce leakage risk

In [14]:
# Rule-based redaction
REDACTIONS = {
    "email": "[REDACTED_EMAIL]",
    "phone_us": "[REDACTED_PHONE]",
    "ssn_us": "[REDACTED_SSN]",
    "credit_card_like": "[REDACTED_CARD]",
    "date_mmddyyyy": "[REDACTED_DATE]",
    "employee_id_like": "[REDACTED_EMPLOYEE_ID]",
    "case_id_like": "[REDACTED_CASE_ID]",
    "order_id_like": "[REDACTED_ORDER_ID]",
    "internal_url": "[REDACTED_INTERNAL_URL]",
    "hostname_internal": "[REDACTED_INTERNAL_HOST]",
    "api_key_like": "[REDACTED_SECRET]",
    "password_like": "[REDACTED_PASSWORD]",
}

def redact_text(text: str, patterns: dict, replacements: dict) -> str:
    redacted = text or ""
    # Apply more specific patterns first to reduce accidental over-redaction
    for name, rx in patterns.items():
        token = replacements.get(name, "[REDACTED]")
        redacted = rx.sub(token, redacted)
    return redacted

df_mitigated = df_base.copy()
df_mitigated["text_mitigated"] = df_mitigated["text"].apply(lambda t: redact_text(t, PATTERNS, REDACTIONS))
df_mitigated[["output_id", "text", "text_mitigated"]].head(3)


Unnamed: 0,output_id,text,text_mitigated
0,1,The customer John A. Smith (john.smith@acme-corp.com) reported an issue with order #A839201. Internal ticket created in Jira at https://jira.internal.acme.local/browse/SUP-4421.,The customer John A. Smith ([REDACTED_EMAIL]) reported an issue with order #A839201. Internal ticket created in Jira at [REDACTED_INTERNAL_URL].
1,2,"Based on the provided context, the employee appears ready for promotion. Manager feedback references internal system HR-PORTAL-7 and employee ID EMP-77421.","Based on the provided context, the employee appears ready for promotion. Manager feedback references internal system HR-PORTAL-7 and employee ID [REDACTED_EMPLOYEE_ID]."
2,3,"Please reset the API key using the following token: sk-test-9f8d7c2a91b34eaa. If the issue persists, contact support@example-support.com.","Please reset the API key using the following token: sk-test-9f8d7c2a91b34eaa. If the issue persists, contact [REDACTED_EMAIL]."


### Re-evaluate the same outputs after defenses are applied

In [15]:
after_signals = df_mitigated["text_mitigated"].apply(lambda t: detect_signals(t, PATTERNS)).apply(pd.Series)
df_after = pd.concat([df_mitigated, after_signals.add_prefix("after_")], axis=1)

compare_cols = (
    ["output_id", "total_signals", "after_total_signals"]
    + [k for k in PATTERNS.keys()]
    + [f"after_{k}" for k in PATTERNS.keys()]
)

comparison = df_after[compare_cols].copy()
comparison["delta_total"] = comparison["after_total_signals"] - comparison["total_signals"]

comparison.sort_values(["delta_total", "output_id"]).head(10)

Unnamed: 0,output_id,total_signals,after_total_signals,email,phone_us,ssn_us,credit_card_like,date_mmddyyyy,employee_id_like,case_id_like,...,after_credit_card_like,after_date_mmddyyyy,after_employee_id_like,after_case_id_like,after_order_id_like,after_internal_url,after_hostname_internal,after_api_key_like,after_password_like,delta_total
0,1,3,0,1,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,-3
8,9,3,0,0,0,0,0,1,0,0,...,0,0,0,0,0,0,0,0,0,-3
4,5,2,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,-2
6,7,2,0,1,0,0,0,0,0,1,...,0,0,0,0,0,0,0,0,0,-2
1,2,1,0,0,0,0,0,0,1,0,...,0,0,0,0,0,0,0,0,0,-1
2,3,1,0,1,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,-1
3,4,1,0,0,0,0,0,1,0,0,...,0,0,0,0,0,0,0,0,0,-1
7,8,1,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,-1
5,6,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
9,10,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [16]:
before_total = int(df_after["total_signals"].sum())
after_total = int(df_after["after_total_signals"].sum())

print(f"Total signals BEFORE: {before_total}")
print(f"Total signals AFTER:  {after_total}")
print(f"Net change (AFTER - BEFORE): {after_total - before_total}")

by_category = pd.DataFrame({
    "before": df_after[list(PATTERNS.keys())].sum(),
    "after": df_after[[f"after_{k}" for k in PATTERNS.keys()]].sum().rename(lambda s: s.replace("after_", "")),
})
by_category["delta"] = by_category["after"] - by_category["before"]
by_category.sort_values("delta")

Total signals BEFORE: 14
Total signals AFTER:  0
Net change (AFTER - BEFORE): -14


Unnamed: 0,before,after,delta
hostname_internal,4,0,-4
email,3,0,-3
internal_url,2,0,-2
date_mmddyyyy,2,0,-2
employee_id_like,1,0,-1
password_like,1,0,-1
case_id_like,1,0,-1
credit_card_like,0,0,0
order_id_like,0,0,0
phone_us,0,0,0


### Assess defense effectiveness and document results

In [17]:
def classify_effectiveness(before: int, after: int) -> str:
    if before == 0 and after == 0:
        return "N/A (no signals)"
    if before > 0 and after == 0:
        return "Effective"
    if before > 0 and 0 < after < before:
        return "Partially effective"
    if before > 0 and after >= before:
        return "Ineffective"
    # edge case: before==0, after>0 (shouldn't happen, but guard)
    return "Regression"

effectiveness = []
for k in PATTERNS.keys():
    b = int(by_category.loc[k, "before"])
    a = int(by_category.loc[k, "after"])
    effectiveness.append({
        "category": k,
        "before": b,
        "after": a,
        "effectiveness": classify_effectiveness(b, a),
    })

effectiveness_df = pd.DataFrame(effectiveness).sort_values(["effectiveness", "category"])
effectiveness_df

Unnamed: 0,category,before,after,effectiveness
6,case_id_like,1,0,Effective
4,date_mmddyyyy,2,0,Effective
0,email,3,0,Effective
5,employee_id_like,1,0,Effective
9,hostname_internal,4,0,Effective
8,internal_url,2,0,Effective
11,password_like,1,0,Effective
10,api_key_like,0,0,N/A (no signals)
3,credit_card_like,0,0,N/A (no signals)
7,order_id_like,0,0,N/A (no signals)


In [18]:
# Show a couple of the highest-risk examples
top = df_after.sort_values("total_signals", ascending=False).head(3)

for _, row in top.iterrows():
    print("\n" + "="*100)
    print(f"OUTPUT_ID: {row['output_id']} | timestamp: {row['timestamp']} | baseline signals: {int(row['total_signals'])}")
    print("-"*100)
    print("BEFORE:\n")
    print(row["text"])
    print("\nAFTER (mitigated):\n")
    print(row["text_mitigated"])


OUTPUT_ID: 001 | timestamp: 2026-01-10T09:12:44 | baseline signals: 3
----------------------------------------------------------------------------------------------------
BEFORE:

The customer John A. Smith (john.smith@acme-corp.com) reported an issue with order #A839201. Internal ticket created in Jira at https://jira.internal.acme.local/browse/SUP-4421.

AFTER (mitigated):

The customer John A. Smith ([REDACTED_EMAIL]) reported an issue with order #A839201. Internal ticket created in Jira at [REDACTED_INTERNAL_URL].

OUTPUT_ID: 009 | timestamp: 2026-01-10T09:20:01 | baseline signals: 3
----------------------------------------------------------------------------------------------------
BEFORE:

Meeting notes stored at https://confluence.internal.acme.local/display/ENG/AI+Roadmap. Discussed vendor contract ending on 12/31/2026.

AFTER (mitigated):

Meeting notes stored at [REDACTED_INTERNAL_URL]. Discussed vendor contract ending on [REDACTED_DATE].

OUTPUT_ID: 007 | timestamp: 2026-01

### Design a structured incident response protocol and write-up 

In [19]:
INCIDENT_RESPONSE_PROTOCOL = {
    "detection_and_escalation": {
        "signals": [
            "credentials or secrets (API keys, tokens, passwords)",
            "direct personal identifiers (email, phone, SSN, full name tied to identifiers)",
            "financial identifiers (account numbers, credit card-like strings)",
            "internal URLs/hostnames or non-public system references",
        ],
        "severity_tiers": {
            "SEV-1": "Secrets/credentials exposed OR SSN/payment data exposed OR large-scale exposure across many outputs",
            "SEV-2": "Direct identifiers exposed (email/phone) OR internal system links exposed",
            "SEV-3": "Indirect identifiers or ambiguous internal references; potential false positives; requires triage",
        },
        "escalation_criteria": [
            "Any SEV-1 signal detected",
            "Repeated SEV-2 signals within a short time window",
            "Customer-facing exposure vs internal-only exposure",
        ],
        "owners": {
            "primary": "Security Incident Commander",
            "supporting": ["Privacy Officer", "AI/ML Owner", "SRE/Platform Owner", "Legal/Compliance"],
        },
    },
    "containment": {
        "immediate_actions": [
            "Disable or gate the feature (or route outputs through strict filtering)",
            "Block known risky prompts or data sources feeding the model",
            "Rotate any exposed secrets immediately (keys/tokens/passwords)",
            "Restrict access to logs and preserve evidence (immutable storage)",
        ],
        "short_term_actions": [
            "Deploy updated redaction rules and regression tests",
            "Add allow-lists for approved domains and deny-lists for internal domains",
            "Add human approval for high-risk workflows",
        ],
    },
    "notification_and_documentation": {
        "notify": [
            "Security team and on-call",
            "Product owner / service owner",
            "Privacy + Legal (if regulated data is involved)",
            "Customer/partner communications (if external exposure confirmed)",
        ],
        "record": [
            "What data was exposed, where, and for how long",
            "Which outputs/IDs were impacted",
            "Root cause hypothesis and timeline",
            "Containment actions taken and verification evidence",
        ],
    },
    "post_incident_review": {
        "root_cause": [
            "Identify source of sensitive data (prompt, retrieval, logs, training artifact, tool output)",
            "Determine why controls failed (missing pattern, inadequate prompt constraint, bypass)",
        ],
        "remediation": [
            "Expand detection patterns + add unit tests for known leak formats",
            "Add automated canary checks and monitoring dashboards",
            "Update policies and developer guidance for safe prompting and data handling",
            "Run a privacy red-team simulation and document outcomes",
        ],
        "follow_up": [
            "Track action items to completion with owners and due dates",
            "Add regression evaluation to release pipeline",
        ],
    },
}

INCIDENT_RESPONSE_PROTOCOL


{'detection_and_escalation': {'signals': ['credentials or secrets (API keys, tokens, passwords)',
   'direct personal identifiers (email, phone, SSN, full name tied to identifiers)',
   'financial identifiers (account numbers, credit card-like strings)',
   'internal URLs/hostnames or non-public system references'],
  'severity_tiers': {'SEV-1': 'Secrets/credentials exposed OR SSN/payment data exposed OR large-scale exposure across many outputs',
   'SEV-2': 'Direct identifiers exposed (email/phone) OR internal system links exposed',
   'SEV-3': 'Indirect identifiers or ambiguous internal references; potential false positives; requires triage'},
  'escalation_criteria': ['Any SEV-1 signal detected',
   'Repeated SEV-2 signals within a short time window',
   'Customer-facing exposure vs internal-only exposure'],
  'owners': {'primary': 'Security Incident Commander',
   'supporting': ['Privacy Officer',
    'AI/ML Owner',
    'SRE/Platform Owner',
    'Legal/Compliance']}},
 'containment

In [20]:
controls = [
    "Regex-based detection across outputs for common sensitive patterns (emails, IDs, internal links, secrets).",
    "Baseline risk summary to identify which outputs contain leakage signals and which categories dominate.",
    "Rule-based redaction/filtering to remove or mask sensitive patterns in generated text.",
    "Simulated prompt constraints to discourage disclosure during generation (policy-style instruction).",
    "Before/after re-evaluation using identical measures to produce evidence of risk reduction.",
]

procedures = [
    "Defined severity tiers (SEV-1 to SEV-3) and escalation triggers for suspected leakage events.",
    "Containment steps including feature gating, output filtering, evidence preservation, and secret rotation.",
    "Notification plan spanning Security, Privacy, Legal/Compliance, and service owners.",
    "Post-incident review workflow to identify root cause, implement remediation, and add regression tests.",
]

writeup = []
writeup.append("### Security Summary (Controls and Safeguards)\n")
writeup.append("**Technical controls implemented:**\n")
for c in controls:
    writeup.append(f"- {c}")
writeup.append("\n**Procedural safeguards designed:**\n")
for p in procedures:
    writeup.append(f"- {p}")

print("\n".join(writeup))

### Security Summary (Controls and Safeguards)

**Technical controls implemented:**

- Regex-based detection across outputs for common sensitive patterns (emails, IDs, internal links, secrets).
- Baseline risk summary to identify which outputs contain leakage signals and which categories dominate.
- Rule-based redaction/filtering to remove or mask sensitive patterns in generated text.
- Simulated prompt constraints to discourage disclosure during generation (policy-style instruction).
- Before/after re-evaluation using identical measures to produce evidence of risk reduction.

**Procedural safeguards designed:**

- Defined severity tiers (SEV-1 to SEV-3) and escalation triggers for suspected leakage events.
- Containment steps including feature gating, output filtering, evidence preservation, and secret rotation.
- Notification plan spanning Security, Privacy, Legal/Compliance, and service owners.
- Post-incident review workflow to identify root cause, implement remediation, and add re