<a href="https://colab.research.google.com/github/micah-shull/AI_Agents/blob/main/379_GCO_PolicyEval_Utils.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""Policy evaluation utilities for Governance & Compliance Orchestrator

Evaluates agent action log events against policy rules to detect violations.
"""

from typing import Dict, Any, List, Optional


def check_condition(
    event: Dict[str, Any],
    condition_key: str,
    condition_value: Any
) -> bool:
    """
    Check if an event matches a policy condition.

    Args:
        event: Agent action log event
        condition_key: Condition key (e.g., "region", "confidence_score_lt")
        condition_value: Condition value to match against

    Returns:
        True if condition matches, False otherwise
    """
    # Handle special condition keys
    if condition_key == "confidence_score_lt":
        confidence = event.get("confidence_score", 1.0)
        return confidence < condition_value

    if condition_key == "deal_size_gt":
        input_data = event.get("input_data", {})
        deal_size = input_data.get("deal_size", 0)
        return deal_size > condition_value

    if condition_key == "input_contains_protected_attribute":
        # Check if input_data contains common protected attributes
        input_data = event.get("input_data", {})
        protected_attrs = ["gender", "age", "race", "religion", "nationality", "disability"]
        return any(attr in input_data for attr in protected_attrs)

    if condition_key == "explanation_missing":
        # Check if output lacks explanation
        output = event.get("output", {})
        return "explanation" not in output and "reasoning" not in output

    # Handle array conditions (e.g., action_type: ["recommendation_generated", "decision_made"])
    if isinstance(condition_value, list):
        event_value = event.get(condition_key)
        return event_value in condition_value

    # Handle nested conditions (e.g., region in input_data)
    if condition_key in ["region", "customer_segment"]:
        input_data = event.get("input_data", {})
        return input_data.get(condition_key) == condition_value

    # Direct match
    return event.get(condition_key) == condition_value


def evaluate_event_against_policy(
    event: Dict[str, Any],
    policy: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Evaluate a single event against a single policy rule.

    Args:
        event: Agent action log event
        policy: Policy rule to evaluate against

    Returns:
        Evaluation result with matched and violation status
    """
    conditions = policy.get("conditions", {})
    required_action = policy.get("required_action")
    severity = policy.get("severity", "medium")

    # Check all conditions
    all_conditions_match = True
    matched_conditions = []

    for condition_key, condition_value in conditions.items():
        matches = check_condition(event, condition_key, condition_value)
        matched_conditions.append({
            "condition": condition_key,
            "value": condition_value,
            "matches": matches
        })
        if not matches:
            all_conditions_match = False

    # Determine if this is a violation
    # A violation occurs when:
    # 1. All conditions match (policy applies)
    # 2. The required action was NOT taken
    violation = False
    reason = ""

    if all_conditions_match:
        if required_action == "human_approval" and not event.get("human_in_the_loop", False):
            violation = True
            reason = f"Policy requires human approval but event had human_in_the_loop=false"
        elif required_action == "human_review" and not event.get("human_in_the_loop", False):
            violation = True
            reason = f"Policy requires human review but event had human_in_the_loop=false"
        elif required_action == "block_and_escalate":
            violation = True
            reason = f"Policy requires blocking and escalation (protected attribute detected)"
        elif required_action == "request_explanation":
            output = event.get("output", {})
            if "explanation" not in output and "reasoning" not in output:
                violation = True
                reason = f"Policy requires explanation but event output lacks explanation"

    return {
        "event_id": event.get("event_id"),
        "policy_id": policy.get("policy_id"),
        "matched": all_conditions_match,
        "violation": violation,
        "severity": severity,
        "required_action": required_action,
        "reason": reason if violation else "Policy conditions matched but no violation detected",
        "matched_conditions": matched_conditions
    }


def evaluate_all_events(
    events: List[Dict[str, Any]],
    policies: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """
    Evaluate all events against all policies.

    Args:
        events: List of agent action log events
        policies: List of policy rules

    Returns:
        List of policy evaluations (one per event-policy combination)
    """
    evaluations = []

    for event in events:
        for policy in policies:
            evaluation = evaluate_event_against_policy(event, policy)
            evaluations.append(evaluation)

    return evaluations

