<a href="https://colab.research.google.com/github/m-zayed5722/Miscellaneous-Projects/blob/main/GuardrailX_Lite.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# GuardrailX Lite: Safety + PII Redaction Pipeline (Colab)
Pipeline:
Input → PII detection → Redaction → Policy checks → Safe Task Execution → Report

Features:
- PII detection (email/phone/credit card/IP)
- Lightweight "policy" checks (restricted requests, risky patterns)
- Safe execution wrapper for downstream GenAI tasks
- Audit report (what was redacted, why blocked, confidence score)

In [1]:
!pip -q install pandas rapidfuzz

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/3.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.8/3.2 MB[0m [31m23.4 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m3.2/3.2 MB[0m [31m60.4 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m41.6 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
import re
import pandas as pd
from dataclasses import dataclass
from typing import List, Dict, Optional
from rapidfuzz import fuzz

# PII regex patterns (lightweight demo; not perfect)
PII_PATTERNS = {
    "email": re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
    "phone": re.compile(r"\b(?:\+?1[-.\s]?)?(?:\(?\d{3}\)?[-.\s]?){1}\d{3}[-.\s]?\d{4}\b"),
    "credit_card": re.compile(r"\b(?:\d[ -]*?){13,19}\b"),  # heuristic
    "ip_address": re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"),
}

# Very basic risky intent patterns (demo)
RISK_PATTERNS = {
    "credential_theft": [
        r"\bpassword\b", r"\bsteal\b", r"\bhack\b", r"\bphish\b", r"\bkeylogger\b"
    ],
    "weapons_or_harm": [
        r"\bbuild\b.*\bbomb\b", r"\bhow to\b.*\bweapon\b", r"\bpoison\b"
    ],
    "bypass_security": [
        r"\bbypass\b.*\bsecurity\b", r"\bdisable\b.*\bauth\b", r"\bevasion\b"
    ]
}
RISK_PATTERNS = {k: [re.compile(p, re.IGNORECASE) for p in v] for k, v in RISK_PATTERNS.items()}

In [3]:
@dataclass
class PIIMatch:
    pii_type: str
    value: str
    start: int
    end: int

def detect_pii(text: str) -> List[PIIMatch]:
    text = text or ""
    matches: List[PIIMatch] = []
    for pii_type, pattern in PII_PATTERNS.items():
        for m in pattern.finditer(text):
            matches.append(PIIMatch(pii_type, m.group(0), m.start(), m.end()))
    # sort by span
    matches.sort(key=lambda x: (x.start, x.end))
    return matches

def redact_pii(text: str, matches: List[PIIMatch]) -> str:
    if not matches:
        return text
    out = []
    last = 0
    for m in matches:
        out.append(text[last:m.start])
        out.append(f"[REDACTED_{m.pii_type.upper()}]")
        last = m.end
    out.append(text[last:])
    return "".join(out)

def pii_summary(matches: List[PIIMatch]) -> Dict[str, int]:
    counts = {}
    for m in matches:
        counts[m.pii_type] = counts.get(m.pii_type, 0) + 1
    return counts

In [4]:
@dataclass
class PolicyDecision:
    allowed: bool
    reasons: List[str]
    risk_level: str  # "low" | "medium" | "high"
    confidence: float  # 0..1

def policy_check(text: str) -> PolicyDecision:
    t = (text or "").strip()
    if not t:
        return PolicyDecision(False, ["Empty input"], "low", 1.0)

    hits = []
    for category, patterns in RISK_PATTERNS.items():
        for p in patterns:
            if p.search(t):
                hits.append(category)
                break

    if hits:
        # escalate based on category
        risk = "high" if ("credential_theft" in hits or "weapons_or_harm" in hits) else "medium"
        return PolicyDecision(
            allowed=False,
            reasons=[f"Risky intent detected: {', '.join(sorted(set(hits)))}"],
            risk_level=risk,
            confidence=0.85 if risk == "high" else 0.7
        )

    return PolicyDecision(True, ["No risky intent detected"], "low", 0.8)

In [5]:
@dataclass
class GuardrailReport:
    allowed: bool
    redacted_text: str
    pii_counts: Dict[str, int]
    policy: PolicyDecision
    action_taken: str  # "blocked" | "redacted_then_processed" | "processed"
    output: Optional[str]

def safe_run(task_fn, user_text: str) -> GuardrailReport:
    # 1) PII detect + redact
    pii = detect_pii(user_text)
    redacted = redact_pii(user_text, pii)
    counts = pii_summary(pii)

    # 2) policy check (run on original + redacted for safety)
    decision_orig = policy_check(user_text)
    decision_red = policy_check(redacted)
    # choose stricter
    decision = decision_orig if (not decision_orig.allowed) else decision_red

    # 3) block if disallowed
    if not decision.allowed:
        return GuardrailReport(
            allowed=False,
            redacted_text=redacted,
            pii_counts=counts,
            policy=decision,
            action_taken="blocked",
            output=None
        )

    # 4) run task
    output = task_fn(redacted)
    action = "redacted_then_processed" if counts else "processed"

    return GuardrailReport(
        allowed=True,
        redacted_text=redacted,
        pii_counts=counts,
        policy=decision,
        action_taken=action,
        output=output
    )

In [6]:
def task_summarize(text: str) -> str:
    # simple offline "summary" baseline
    text = re.sub(r"\s+", " ", text).strip()
    if len(text) <= 180:
        return f"Summary: {text}"
    return f"Summary: {text[:180]}..."

def task_extract_fields(text: str) -> str:
    # toy extractor: detect team/app/metric keywords
    t = text.lower()
    fields = []
    for kw in ["team", "app", "cost", "incidents", "launch", "director"]:
        if kw in t:
            fields.append(kw)
    return "Detected fields: " + (", ".join(fields) if fields else "(none)")

In [7]:
tests = [
    "My email is mohamed@example.com and my phone is (416) 555-0199. Summarize my message please.",
    "Can you help me hack a password and bypass security?",
    "Extract fields from this: Director Patel wants app launch date and cost per month."
]

for t in tests:
    rep = safe_run(task_summarize, t)
    print("="*80)
    print("INPUT:", t)
    print("ALLOWED:", rep.allowed)
    print("ACTION:", rep.action_taken)
    print("PII:", rep.pii_counts)
    print("POLICY:", rep.policy.risk_level, rep.policy.reasons, f"(conf={rep.policy.confidence})")
    print("REDACTED:", rep.redacted_text)
    print("OUTPUT:", rep.output)

INPUT: My email is mohamed@example.com and my phone is (416) 555-0199. Summarize my message please.
ALLOWED: True
ACTION: redacted_then_processed
PII: {'email': 1, 'phone': 1}
POLICY: low ['No risky intent detected'] (conf=0.8)
REDACTED: My email is [REDACTED_EMAIL] and my phone is ([REDACTED_PHONE]. Summarize my message please.
OUTPUT: Summary: My email is [REDACTED_EMAIL] and my phone is ([REDACTED_PHONE]. Summarize my message please.
INPUT: Can you help me hack a password and bypass security?
ALLOWED: False
ACTION: blocked
PII: {}
POLICY: high ['Risky intent detected: bypass_security, credential_theft'] (conf=0.85)
REDACTED: Can you help me hack a password and bypass security?
OUTPUT: None
INPUT: Extract fields from this: Director Patel wants app launch date and cost per month.
ALLOWED: True
ACTION: processed
PII: {}
POLICY: low ['No risky intent detected'] (conf=0.8)
REDACTED: Extract fields from this: Director Patel wants app launch date and cost per month.
OUTPUT: Summary: Extrac

In [8]:
rows = []
for t in tests:
    rep = safe_run(task_extract_fields, t)
    rows.append({
        "input": t,
        "allowed": rep.allowed,
        "action": rep.action_taken,
        "pii_counts": rep.pii_counts,
        "risk_level": rep.policy.risk_level,
        "reasons": "; ".join(rep.policy.reasons),
        "redacted_preview": rep.redacted_text[:120] + ("..." if len(rep.redacted_text) > 120 else ""),
        "output_preview": (rep.output or "")[:120]
    })

pd.DataFrame(rows)

Unnamed: 0,input,allowed,action,pii_counts,risk_level,reasons,redacted_preview,output_preview
0,My email is mohamed@example.com and my phone i...,True,redacted_then_processed,"{'email': 1, 'phone': 1}",low,No risky intent detected,My email is [REDACTED_EMAIL] and my phone is (...,Detected fields: (none)
1,Can you help me hack a password and bypass sec...,False,blocked,{},high,"Risky intent detected: bypass_security, creden...",Can you help me hack a password and bypass sec...,
2,Extract fields from this: Director Patel wants...,True,processed,{},low,No risky intent detected,Extract fields from this: Director Patel wants...,"Detected fields: app, cost, launch, director"
