
# IT Audit Request Router — LangGraph + Gemini (Colab-Ready)

Rules-first, LLM-second router for IT audit intake. LLM = **Google Gemini** via `google-generativeai`.
Route to **IT Audit Desk** vs **Global Audit Intake**. Produces evidence checklists, ticket/email stubs, and an audit log.


## Install

In [58]:

# %%install
!pip -q install "langgraph>=0.2.36" "google-generativeai>=0.6.0" "pydantic>=2.8" "typing_extensions>=4.12" >/dev/null
print("Dependencies installed.")


Dependencies installed.


In [59]:
!pip install -U google-generativeai




## Configure API key

In [60]:

# %%config
!pip -q install -U google-generativeai langgraph >/dev/null

import os, google.generativeai as genai
from google.api_core.exceptions import Forbidden

# Set GOOGLE_API_KEY before running this cell (e.g., %env GOOGLE_API_KEY=...).
key = (os.getenv("GOOGLE_API_KEY") or "").strip()
if not key:
    raise RuntimeError("GOOGLE_API_KEY is not set.")
genai.configure(api_key=key)

# List ONLY models your key can actually call with generateContent
try:
    _supported = [m.name for m in genai.list_models()
                  if "generateContent" in m.supported_generation_methods]
except Forbidden:
    raise SystemExit(
        "Gemini API is disabled for this project/key. Enable it in GCP console "
        "(generativelanguage.googleapis.com) or use a different key."
    )

if not _supported:
    raise SystemExit("No Gemini models with generateContent available for this key/project.")

# Prefer newer if present; otherwise fall back to what you have (often 'models/gemini-pro').
PREFERRED_ORDER = [
    "models/gemini-1.5-flash-latest",
    "models/gemini-1.5-pro-latest",
    "models/gemini-1.5-flash",
    "models/gemini-1.5-pro",
    "models/gemini-pro",            # legacy; commonly available on v1beta
    "models/gemini-1.0-pro",        # legacy; sometimes available
]
MODEL = next((m for m in PREFERRED_ORDER if m in _supported), _supported[0])

print("Supported models:", _supported)
print("Using model:", MODEL)



Supported models: ['models/gemini-2.5-pro-preview-03-25', 'models/gemini-2.5-flash-preview-05-20', 'models/gemini-2.5-flash', 'models/gemini-2.5-flash-lite-preview-06-17', 'models/gemini-2.5-pro-preview-05-06', 'models/gemini-2.5-pro-preview-06-05', 'models/gemini-2.5-pro', 'models/gemini-2.0-flash-exp', 'models/gemini-2.0-flash', 'models/gemini-2.0-flash-001', 'models/gemini-2.0-flash-exp-image-generation', 'models/gemini-2.0-flash-lite-001', 'models/gemini-2.0-flash-lite', 'models/gemini-2.0-flash-preview-image-generation', 'models/gemini-2.0-flash-lite-preview-02-05', 'models/gemini-2.0-flash-lite-preview', 'models/gemini-2.0-pro-exp', 'models/gemini-2.0-pro-exp-02-05', 'models/gemini-exp-1206', 'models/gemini-2.0-flash-thinking-exp-01-21', 'models/gemini-2.0-flash-thinking-exp', 'models/gemini-2.0-flash-thinking-exp-1219', 'models/gemini-2.5-flash-preview-tts', 'models/gemini-2.5-pro-preview-tts', 'models/learnlm-2.0-flash-experimental', 'models/gemma-3-1b-it', 'models/gemma-3-4b-i

## Domain: frameworks, keywords, evidence library

In [61]:

# %%domain
from enum import Enum
from typing import Dict, List, Optional, Tuple
import re, json, datetime

class Framework(str, Enum):
    SOX = "SOX"; SOC1 = "SOC 1"; SOC2 = "SOC 2"; HIPAA = "HIPAA"
    COBIT = "COBIT"; PCI = "PCI DSS"; ISO27001 = "ISO 27001"; NIST_CSF = "NIST CSF"

KEYWORDS: Dict[Framework, List[str]] = {
    Framework.SOX: ["sox","sarbanes-oxley","icfr","404","302"],
    Framework.SOC1: ["soc 1","ssae 18","isae 3402 type i","isae 3402 type ii"],
    Framework.SOC2: ["soc 2","trust services criteria","tsc","type i","type ii"],
    Framework.HIPAA: ["hipaa","phi","privacy rule","security rule","hitech"],
    Framework.COBIT: ["cobit","control objectives","isaca cobit"],
    Framework.PCI: ["pci","pci dss","roc","saq","qsa","cardholder data","chd"],
    Framework.ISO27001: ["iso 27001","iso/iec 27001","annex a","isms"],
    Framework.NIST_CSF: ["nist csf","identify protect detect respond recover","csf","framework core"],
}

EVIDENCE_LIBRARY: Dict[Framework, List[str]] = {
    Framework.SOX: ["ICFR control list/owners","Narratives/flowcharts","ToD","ToE","Populations & samples"],
    Framework.SOC1: ["System description","Control objectives/controls","Change mgmt/access evidence","Sampling method"],
    Framework.SOC2: ["TSC mapping","Security/Availability/Confidentiality evidence","IR records","Vendor/subservice list"],
    Framework.HIPAA: ["Privacy/Security/Breach policies","ePHI inventory/data flow","Risk analysis/plan","Access logs/audit trails"],
    Framework.PCI: ["CDE diagram","Segmentation evidence","ASV scans/Pentest","Key mgmt/encryption docs"],
    Framework.ISO27001: ["ISMS scope/SoA","Annex A evidence","Internal audit results","Mgmt review & CAPA"],
    Framework.NIST_CSF: ["Current vs Target Profile","Tier assessment","Risk register","IR/BC/DR runbooks"],
}


In [62]:
pip install pydantic[email]



## Models and redaction

In [63]:

# %%models
from pydantic import BaseModel, Field, EmailStr

PII_PATTERNS = [
    re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    re.compile(r"\b\d{16}\b"),
    re.compile(r"\b\d{4} \d{4} \d{4} \d{4}\b"),
    re.compile(r"[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", re.I),
    re.compile(r"\b\d{10}\b"),
]
def redact(text:str)->str:
    for pat in PII_PATTERNS:
        text = pat.sub("[REDACTED]", text)
    return text

class AuditRequest(BaseModel):
    request_id: str
    requester_name: str
    requester_email: EmailStr
    business_unit: str
    title: str
    description: str
    due_date: Optional[str] = None
    attachments: List[str] = Field(default_factory=list)
    def sanitize(self) -> "AuditRequest":
        return AuditRequest(
            request_id=self.request_id,
            requester_name=redact(self.requester_name),
            requester_email=self.requester_email,  # keep actual for comms
            business_unit=redact(self.business_unit),
            title=redact(self.title),
            description=redact(self.description),
            due_date=self.due_date,
            attachments=[redact(a) for a in self.attachments],
        )

class RuleResult(BaseModel):
    matched_frameworks: List[Framework] = []
    confidence: float = 0.0
    reasoning: str = ""

class LLMResult(BaseModel):
    is_it_audit: bool
    matched_frameworks: List[Framework] = []
    confidence: float = 0.0
    rationale: str = ""

class Decision(BaseModel):
    route: str  # IT_AUDIT or GLOBAL_AUDIT
    confidence: float
    reason: str

class EvidenceChecklist(BaseModel):
    framework: Optional[Framework]
    items: List[str]

class TicketStub(BaseModel):
    system: str
    payload: dict

class EmailStub(BaseModel):
    to: str
    subject: str
    body: str


## Rules classifier

In [64]:

# %%rules
from typing import Tuple

def keyword_match(text: str) -> Tuple[List[Framework], float, str]:
    text_l = text.lower()
    hits = []
    rationale = []
    for fw, words in KEYWORDS.items():
        matched = [w for w in words if w in text_l]
        if matched:
            hits.append(fw)
            rationale.append(f"{fw.value}: {', '.join(matched)}")
    conf = min(0.95, 0.5 + 0.1*len(hits)) if hits else 0.1
    reason = "; ".join(rationale) if rationale else "No framework keywords found."
    return hits, conf, reason

def run_rules(req: AuditRequest) -> RuleResult:
    text = f"{req.title}\n{req.description}"
    fws, conf, reason = keyword_match(text)
    return RuleResult(matched_frameworks=fws, confidence=conf, reasoning=reason)


## Gemini verifier

In [65]:

# %%gemini verifier
import re, json
import google.generativeai as genai

def _extract_json(text: str) -> dict:
    try:
        return json.loads(text)
    except Exception:
        pass
    m = re.search(r"\{[\s\S]*\}", text)
    if m:
        try:
            return json.loads(m.group(0))
        except Exception:
            return {"is_it_audit": False, "matched_frameworks": [], "confidence": 0.0, "rationale": text[:400]}
    return {"is_it_audit": False, "matched_frameworks": [], "confidence": 0.0, "rationale": text[:400]}

def llm_verify(title: str, description: str) -> LLMResult:
    prompt = (
        "You are an enterprise audit intake assistant. Determine whether the request concerns IT audit "
        "frameworks (SOX, SOC 1, SOC 2, HIPAA, COBIT, PCI DSS, ISO 27001, NIST CSF). "
        "Return JSON with keys: is_it_audit (bool), matched_frameworks (list of strings), "
        "confidence (0-1), rationale (string).\n\n"
        f"Title: {title}\nDescription: {description}"
    )
    model_obj = genai.GenerativeModel(MODEL)
    resp = model_obj.generate_content(prompt, generation_config={"temperature": 0})
    text = (resp.text or "").strip()
    raw = _extract_json(text)

    # Normalize framework strings -> enums
    mapping = {"SOC1": "SOC 1", "SOC2": "SOC 2", "ISO27001": "ISO 27001", "PCI": "PCI DSS", "NIST": "NIST CSF"}
    fws = []
    for f in raw.get("matched_frameworks", []):
        alias = mapping.get(str(f).strip(), str(f).strip())
        try:
            fws.append(Framework(alias))
        except Exception:
            pass

    return LLMResult(
        is_it_audit=bool(raw.get("is_it_audit", False)),
        matched_frameworks=fws,
        confidence=float(raw.get("confidence", 0.0)),
        rationale=str(raw.get("rationale", ""))[:800],
    )


## LangGraph orchestration

In [66]:

# %%graph
from langgraph.graph import StateGraph, END
from typing import Any, Dict, TypedDict, Optional, List

class GraphState(TypedDict):
    req: AuditRequest
    sanitized: AuditRequest
    rule_result: RuleResult
    llm_result: Optional[LLMResult]
    decision: Optional[Decision]
    evidence: List[EvidenceChecklist]
    ticket: Optional[TicketStub]
    email: Optional[EmailStub]
    log: Dict[str, Any]

IT_AUDIT_EMAIL = "it-audit-desk@example.com"
GLOBAL_INTAKE_EMAIL = "global-audit-intake@example.com"

def n_sanitize(state: GraphState) -> GraphState:
    state["sanitized"] = state["req"].sanitize(); return state

def n_rules(state: GraphState) -> GraphState:
    state["rule_result"] = run_rules(state["sanitized"]); return state

def should_llm(state: GraphState) -> str:
    rr = state["rule_result"]
    return "llm" if (len(rr.matched_frameworks)==0 or rr.confidence < 0.7) else "decide"

def n_llm(state: GraphState) -> GraphState:
    s = state["sanitized"]
    state["llm_result"] = llm_verify(s.title, s.description)
    return state

def n_decide(state: GraphState) -> GraphState:
    rr = state["rule_result"]; lr = state.get("llm_result")
    if rr.matched_frameworks and rr.confidence >= 0.7:
        route, conf = "IT_AUDIT", rr.confidence
        reason = f"Rules matched: {[f.value for f in rr.matched_frameworks]} | {rr.reasoning}"
        fws = rr.matched_frameworks
    else:
        if lr and (lr.is_it_audit or lr.matched_frameworks):
            route, conf = "IT_AUDIT", max(rr.confidence, lr.confidence)
            reason = f"LLM verification: {lr.rationale}"
            fws = lr.matched_frameworks or rr.matched_frameworks
        else:
            route, conf = "GLOBAL_AUDIT", max(rr.confidence, (lr.confidence if lr else 0.5))
            reason = f"No decisive IT frameworks via rules/LLM. Rules: {rr.reasoning}" + (f" | LLM: {lr.rationale}" if lr else "")
            fws = []
    state["decision"] = Decision(route=route, confidence=conf, reason=reason)
    state["evidence"] = [EvidenceChecklist(framework=fw, items=EVIDENCE_LIBRARY.get(fw, [])) for fw in fws] if fws else [EvidenceChecklist(framework=None, items=["Describe ask","Provide scope/timeline/stakeholders","Attach policy/process docs"])]
    return state

def n_artifacts(state: GraphState) -> GraphState:
    d = state["decision"]; s = state["sanitized"]
    if d.route == "IT_AUDIT":
        to, subject, system = IT_AUDIT_EMAIL, f"New IT Audit Request: {s.title}", "ServiceNow-IT-Audit"
    else:
        to, subject, system = GLOBAL_INTAKE_EMAIL, f"Rerouted to Global Audit Intake: {s.title}", "Global-Audit-Intake"
    body = (f"Requester: {s.requester_name} <{s.requester_email}>\nBU: {s.business_unit}\n"
            f"Decision: {d.route} (conf={d.confidence:.2f})\nReason: {d.reason}\n")
    state["email"] = EmailStub(to=to, subject=subject, body=body)
    state["ticket"] = TicketStub(system=system, payload={
        "short_description": f"[{d.route}] {s.title}",
        "description": s.description,
        "caller": str(s.requester_email),
        "business_unit": s.business_unit,
        "matched_frameworks": ", ".join([e.framework.value for e in state["evidence"] if e.framework]) or "N/A"
    })
    state["log"] = {"request_id": state["req"].request_id, "route": d.route, "confidence": d.confidence, "reason": d.reason, "ts": datetime.datetime.utcnow().isoformat() + "Z"}
    return state

graph = StateGraph(GraphState)
graph.add_node("sanitize", n_sanitize)
graph.add_node("rules", n_rules)
graph.add_node("llm", n_llm)
graph.add_node("decide", n_decide)
graph.add_node("artifacts", n_artifacts)
graph.set_entry_point("sanitize")
graph.add_edge("sanitize","rules")
graph.add_conditional_edges("rules", should_llm, {"llm":"llm","decide":"decide"})
graph.add_edge("llm","decide")
graph.add_edge("decide","artifacts")
graph.add_edge("artifacts", END)
app = graph.compile()
print("Graph compiled.")


Graph compiled.


## Demo

In [67]:

# %%demo
import pprint, json, time

samples = [
    AuditRequest(
        request_id="REQ-3001",
        requester_name="Alice Smith",
        requester_email="alice@example.com",
        business_unit="Payments",
        title="SOC 2 Type II evidence for trust criteria",
        description="Need SOC 2 Type II support: security, availability controls, incident response, vendor management evidence.",
        due_date="2025-12-15"
    ),
    AuditRequest(
        request_id="REQ-3002",
        requester_name="Bob Johnson",
        requester_email="bob@example.com",
        business_unit="HR",
        title="Brand compliance review",
        description="Regional brand compliance for a marketing campaign. No IT systems in scope."
    ),
]

results = []
for req in samples:
    state = {"req": req, "sanitized": None, "rule_result": None, "llm_result": None,
             "decision": None, "evidence": [], "ticket": None, "email": None, "log": {}}
    out = app.invoke(state)
    results.append({
        "request": req.model_dump(),
        "decision": out["decision"].model_dump(),
        "evidence": [e.model_dump() for e in out["evidence"]],
        "ticket": out["ticket"].model_dump(),
        "email": out["email"].model_dump(),
        "log": out["log"],
    })
    print("\n=== ", req.request_id, " ===")
    pprint.pp(out["decision"].model_dump())
    pprint.pp(out["ticket"].model_dump())
    pprint.pp(out["email"].model_dump())
    print("Evidence:", [ (e.framework.value if e.framework else "General") for e in out["evidence"] ])
    time.sleep(1)
with open("it_audit_router_gemini_run.json","w") as f:
    json.dump(results, f, indent=2)
print("\nWrote it_audit_router_gemini_run.json")


TooManyRequests: 429 POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro-preview-03-25:generateContent?%24alt=json%3Benum-encoding%3Dint: You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. To monitor your current usage, head to: https://ai.dev/usage?tab=rate-limit.
* Quota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_input_token_count, limit: 0
* Quota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_input_token_count, limit: 0
* Quota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_requests, limit: 0
* Quota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_requests, limit: 0
Please retry in 11.447233913s.

