In [1]:
import os, json, re, time

def _vertex_available():
    """Report whether the Vertex AI code path can be attempted.

    Returns:
        bool: True when the ``vertexai`` package imports without error and
        ``GOOGLE_CLOUD_PROJECT`` holds a non-empty value; False otherwise.
    """
    try:
        import vertexai  # noqa: F401 -- imported only to probe that the SDK is installed
    except Exception:
        # Any import-time failure (missing package, broken install) means "unavailable".
        return False
    # An empty string counts as unset, matching the `if not PROJECT` check that
    # _make_vertex_call_llm performs before initialising the SDK.
    return bool(os.environ.get("GOOGLE_CLOUD_PROJECT"))

def _make_vertex_call_llm():
    import vertexai
    from vertexai.generative_models import GenerativeModel, GenerationConfig
    from google.api_core.exceptions import NotFound, PermissionDenied, FailedPrecondition

    PROJECT  = os.environ.get("GOOGLE_CLOUD_PROJECT")
    MODEL = os.environ["GOOGLE_CLOUD_VERTEX_MODEL"] = "gemini-2.5-flash"
    REGION = os.environ["GOOGLE_CLOUD_LOCATION"] = "global"


    if not PROJECT:
        raise EnvironmentError("GOOGLE_CLOUD_PROJECT not set")

    vertexai.init(project=PROJECT, location=REGION)

    _cache = {}

    def _safe_json(text: str) -> str:
        try:
            return json.dumps(json.loads(text))
        except Exception:
            t = text.strip()
            if t.startswith("```"):
                t = t.strip("`").split("\n", 1)[-1]
                try:
                    return json.dumps(json.loads(t))
                except Exception:
                    pass
            return json.dumps({"raw": text})

    def call_llm_vertex(
        prompt: str,
        system: str | None = None,
        json_schema: dict | None = None,
        model_name: str = MODEL,
        temperature: float = 0.2,
        max_output_tokens: int = 4096,
        top_p: float = 0.95,
        top_k: int = 40,
    ) -> str:
        key = (system or "")
        if key not in _cache:
            kwargs = {}
            if system:
                kwargs["system_instruction"] = system
            try:
                model = GenerativeModel(model_name=model_name, **kwargs)
                _ = model.generate_content("ping", generation_config=GenerationConfig(max_output_tokens=1))
                _cache[key] = model
            except (NotFound, PermissionDenied, FailedPrecondition) as e:
                raise RuntimeError(
                    f"Vertex model not accessible: region={REGION}, model={MODEL}. "
                    "Enable Vertex AI API, grant Vertex/Generative AI roles, and ensure org policy allows this model/region."
                ) from e

        model = _cache[key]

        if json_schema:
            gen_cfg = GenerationConfig(
                temperature=temperature,
                max_output_tokens=max_output_tokens,
                top_p=top_p,
                top_k=top_k,
                response_mime_type="application/json",
                response_schema=json_schema,
            )
        else:
            gen_cfg = GenerationConfig(
                temperature=temperature,
                max_output_tokens=max_output_tokens,
                top_p=top_p,
                top_k=top_k,
            )

        last_err = None
        for attempt in range(3):
            try:
                resp = model.generate_content(prompt, generation_config=gen_cfg)
                text = getattr(resp, "text", None)
                if text is None:
                    parts = []
                    for c in getattr(resp, "candidates", []) or []:
                        for p in getattr(c, "content", []).parts:
                            parts.append(getattr(p, "text", "") or str(p))
                    text = "\n".join([p for p in parts if p])
                return _safe_json(text) if json_schema else text
            except Exception as e:
                last_err = e
                time.sleep(0.5 * (2 ** attempt))
        raise last_err

    return call_llm_vertex

# ---- choose active path ----
def _call_llm_unavailable(*args, **kwargs):
    """Stand-in bound to ``call_llm`` when Vertex AI could not be initialised.

    Always raises RuntimeError so later cells fail with an actionable message
    instead of a NameError on ``call_llm``.
    """
    raise RuntimeError(
        "call_llm is unavailable: Vertex AI could not be initialised. "
        "Install the Vertex AI SDK, set GOOGLE_CLOUD_PROJECT, and re-run this cell."
    )


USE_VERTEX = _vertex_available()
# Bind the name unconditionally so every later cell can reference `call_llm`.
call_llm = _call_llm_unavailable
if USE_VERTEX:
    try:
        call_llm = _make_vertex_call_llm()
    except Exception as _e:
        print(f"[WARN] Vertex AI unavailable ({type(_e).__name__}: {_e}). call_llm will raise until this is fixed.")
else:
    print("[INFO] Vertex AI not available or GOOGLE_CLOUD_PROJECT unset; call_llm will raise until this is fixed.")


In [2]:
import json
import random
from typing import Dict, Any, Callable

# Catalogue of intents the classifier may emit. Each entry pairs the intent
# label with the description that is rendered into the classification prompt.
INTENT_DEFINITIONS = [
    dict(
        intent="Greeting",
        description="A simple greeting or conversational opening. (e.g., 'Hello', 'Hi')",
    ),
    dict(
        intent="FAQ_Simple",
        description="A simple question that can be answered with a knowledge base lookup. (e.g., 'What is the return policy?', 'When is my bill?')",
    ),
    dict(
        intent="Product_Inquiry",
        description="A user is asking for details about a product or service. (e.g., 'Does this work with a Mac?', 'Is the Nespresso machine red?')",
    ),
    dict(
        intent="Billing_Question",
        description="A user has a specific question about a charge or their bill. (e.g., 'Why was I charged $50?', 'What is this fee?')",
    ),
    dict(
        intent="Account_Lock",
        description="A user is locked out of their account or needs a password reset.",
    ),
    dict(
        intent="Bug_Report_Complex",
        description="A user is reporting a complex technical issue. (e.g., 'My site is down', 'I see error 404')",
    ),
    dict(
        intent="Cancel_Order",
        description="A user explicitly wants to cancel a recent order. (e.g., 'I need to cancel order #12345')",
    ),
    dict(
        intent="Unknown",
        description="The user's query is unclear, out of scope, or cannot be categorized.",
    ),
]

# Versioned prompt templates: a system persona plus the few-shot examples
# rendered into the classification prompt.
# Every example intent must be one of the labels in INTENT_DEFINITIONS, and
# confidences are numbers so the rendered example JSON matches the output
# schema (which declares `confidence` as a number).
PROMPT_TEMPLATES = {
    "1.0": {
        "system_persona": "You are a specialized intent classifier. Your single function is to output accurate JSON.",
        "n_shot_examples": [
            {"query": "My account is locked.", "intent": "Account_Lock", "confidence": 0.98},
            # "When is my bill?" is the FAQ_Simple example in INTENT_DEFINITIONS.
            {"query": "When is my bill due?", "intent": "FAQ_Simple", "confidence": 0.95},
            {"query": "What is the store refund policy?", "intent": "FAQ_Simple", "confidence": 0.99}
        ]
    },
    "2.0": {
        "system_persona": "You are a general classifier focused on maximizing coverage.",
        "n_shot_examples": [
            {"query": "Help me reset.", "intent": "Account_Lock", "confidence": 0.85}
        ]
    }
}

def get_classification_schema() -> Dict[str, Any]:
    """
    [BEHAVIOR] Builds the JSON Schema describing the classifier's output:
    an object with a required string `intent` and a required numeric `confidence`.
    """
    intent_field = {
        "type": "string",
        "description": "The primary intent of the user's query (e.g., FAQ_Simple, Bug_Report_Complex).",
    }
    confidence_field = {
        "type": "number",
        "description": "A confidence score between 0.0 and 1.0 that the classification is correct.",
    }
    schema: Dict[str, Any] = {"type": "object"}
    schema["properties"] = {"intent": intent_field, "confidence": confidence_field}
    schema["required"] = ["intent", "confidence"]
    return schema

def get_prompt_template(version: str) -> Dict[str, Any]:
    """[BEHAVIOR] Prompt Versioning lookup.
    Returns the template registered under `version`; any unregistered version
    resolves to the "1.0" template.
    """
    fallback = PROMPT_TEMPLATES["1.0"]
    try:
        return PROMPT_TEMPLATES[version]
    except KeyError:
        return fallback


def classify_and_structure(user_query: str, prompt_version: str = "1.0") -> Dict[str, Any]:
    """[BEHAVIOR] Implements Structured Prompting, Templating, and N-Shot Prompting.
    Builds a classification prompt from the versioned template and the intent
    catalogue, calls the LLM with the classification schema, and validates the reply.

    Args:
        user_query: The raw customer message to classify.
        prompt_version: Key passed to get_prompt_template (defaults to "1.0").

    Returns:
        The parsed response dict with `intent` (str), `confidence` (float in
        [0.0, 1.0]) and an added `priority` ("High" when confidence < 0.6,
        otherwise "Medium"). If the reply cannot be parsed or fails validation,
        returns {"intent": "Error", "priority": "High", "confidence": 0.0}.

    Raises:
        Whatever `call_llm` raises; only parsing/validation errors are handled here.
    """
    # 1. Prompt Versioning: select the requested template version
    template = get_prompt_template(prompt_version)

    # Construct N-Shot examples string. Confidence is coerced to float so the
    # example JSON shows a number, as the output schema requires, even if a
    # template stores it as a numeric string.
    example_lines = []
    for ex in template['n_shot_examples']:
        example_json = json.dumps({'intent': ex['intent'], 'confidence': float(ex['confidence'])})
        example_lines.append(f"INPUT: {ex['query']}\nOUTPUT: {example_json}")
    n_shot_str = "\n".join(example_lines)

    # 2. Structured Prompt with Prompt Templating
    valid_intents_str = "\n".join([
        f"- {i['intent']}: {i['description']}" for i in INTENT_DEFINITIONS
    ])

    structured_prompt_template = (
        f"{template['system_persona']}\n\n"
        f"You must analyze the user's INPUT and return ONLY a single, valid JSON object.\n"
        f"Your output JSON must contain 'intent' (string) and 'confidence' (float, 0.0-1.0).\n\n"
        f"--- VALID INTENTS ---\n"
        f"You MUST choose one of the following intents:\n"
        f"{valid_intents_str}\n\n"
        f"--- EXAMPLES ({len(template['n_shot_examples'])} Shot Prompting) ---\n"
        f"{n_shot_str}\n\n"
        f"--- CLASSIFICATION TASK ---\n"
        f"INPUT: {user_query}\n"
        f"OUTPUT:"
    )

    # 3. Call LLM with the structured prompt
    raw_response = call_llm(
        prompt=structured_prompt_template,
        json_schema=get_classification_schema()
    )

    # 4. Parse and validate the JSON response
    try:
        data = json.loads(raw_response)
        if not isinstance(data, dict):
            raise ValueError("response is not a JSON object")
        intent = data.get("intent")
        if not isinstance(intent, str) or not intent:
            raise ValueError("response is missing a string 'intent'")
        # Raises KeyError/TypeError/ValueError when confidence is absent or non-numeric.
        confidence = float(data["confidence"])
        if not 0.0 <= confidence <= 1.0:
            raise ValueError(f"confidence {confidence!r} is outside [0.0, 1.0]")
        data["confidence"] = confidence
        data["priority"] = "High" if confidence < 0.6 else "Medium"
        return data
    except Exception as e:
        print(f"Error parsing LLM response: {e}")
        return {"intent": "Error", "priority": "High", "confidence": 0.0}
    
# print(classify_and_structure("can I return Nespresso"))

In [3]:
pip install chromadb sentence-transformers


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.2[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [4]:
# retrieval.py
import chromadb
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Any
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction

# --- GLOBAL COMPONENTS (RAG Infrastructure) ---
# Single source of truth for the sentence-transformers checkpoint, so the
# standalone encoder and Chroma's embedding function cannot drift apart.
EMBEDDING_MODEL_NAME = 'all-MiniLM-L6-v2'

# 1. Embedding Model: Used to convert query/documents into vectors
# NOTE(review): nothing in the visible cells calls EMBEDDING_MODEL (Chroma uses
# EMBEDDING_FUNCTION below); it is kept because other cells may reference it.
EMBEDDING_MODEL = SentenceTransformer(EMBEDDING_MODEL_NAME)
# Define the embedding function for Chroma
# def embed_function(texts: List[str]) -> List[List[float]]:
#     return EMBEDDING_MODEL.encode(texts).tolist()

# 2. Vector DB Client and Collection (The "real" Vector DB)
CLIENT = chromadb.Client()
COLLECTION_NAME = "company_knowledge_base"
EMBEDDING_FUNCTION = SentenceTransformerEmbeddingFunction(model_name=EMBEDDING_MODEL_NAME)

# --- COMPANY KNOWLEDGE DATA ---
# Each record carries a stable id, the text that gets embedded, and a `source`
# tag that is stored as metadata and used to scope retrieval.
COMPANY_DOCUMENTS = [
    {
        "id": "doc_001",
        "content": "The standard return policy is 30 days for electronics (like Nespresso machines), provided the original packaging is intact.",
        "source": "Policy",
    },
    {
        "id": "doc_002",
        "content": "Error code 404 is a client-side network error, typically resolved by clearing the browser cache or checking the URL.",
        "source": "TechDoc",
    },
    {
        "id": "doc_003",
        "content": "Annual service fees are due on the 1st of January and are non-refundable after 7 days. The monthly subscription fee is $50.",
        "source": "Billing",
    },
    {
        "id": "doc_004",
        "content": "Our new security update requires all users to enable two-factor authentication by the end of the quarter.",
        "source": "Policy",
    },
    {
        "id": "doc_005",
        "content": "To cancel an order, you must log in to your portal and hit 'Cancel' within 2 hours of placing the order. After 2 hours, orders cannot be canceled.",
        "source": "Policy",
    },
    {
        "id": "doc_006",
        "content": "The Nespresso Vertuo machine is available in Red, Black, and Chrome.",
        "source": "Product",
    },
]

def setup_vector_db():
    """Create (or reuse) the knowledge-base collection and index COMPANY_DOCUMENTS.

    Documents are written with `upsert`, so re-running the cell against a
    collection that already holds these ids refreshes them instead of trying
    to add duplicate ids.

    Returns:
        The collection object returned by CLIENT.get_or_create_collection.
    """
    print("‚ú® [VectorDB Setup] Initializing ChromaDB and indexing data...")
    # Create the collection, specifying the embedding function
    collection = CLIENT.get_or_create_collection(
        name=COLLECTION_NAME,
        embedding_function=EMBEDDING_FUNCTION
    )

    documents = [d['content'] for d in COMPANY_DOCUMENTS]
    metadatas = [{"source": d['source']} for d in COMPANY_DOCUMENTS]
    ids = [d['id'] for d in COMPANY_DOCUMENTS]

    # Index the documents (insert new ids, overwrite existing ones)
    collection.upsert(
        documents=documents,
        metadatas=metadatas,
        ids=ids
    )
    print(f"‚úÖ [VectorDB Setup] Indexed {len(documents)} documents into '{COLLECTION_NAME}'.")
    return collection

# Initialize the collection (run this once when the application starts).
# Executing this cell calls setup_vector_db() immediately, which prints its
# progress messages and indexes COMPANY_DOCUMENTS; perform_rag_search reads
# the resulting module-level KNOWLEDGE_COLLECTION.
KNOWLEDGE_COLLECTION = setup_vector_db()


def perform_rag_search(query: str, required_k: int = 3) -> str:
    """
    [RETRIEVAL] Implements the RAG pattern with Data Source Router and Ranking.
    Queries KNOWLEDGE_COLLECTION, optionally scoped by a keyword-selected
    `source` metadata filter, and formats the hits as an LLM context block.

    Args:
        query: The user's question; matched case-insensitively against the
            router keywords and passed verbatim as the vector-search query text.
        required_k: Number of results requested from the collection.

    Returns:
        Context lines joined by "\n---\n", each of the form
        "Source N [Type: <source> | Score: <score>]: <text>", or "" when the
        collection is not initialised or nothing was retrieved.
    """
    if KNOWLEDGE_COLLECTION is None:
        print("üö® [Retrieval] Vector DB not initialized.")
        return ""

    print(f"üìö [Retrieval] Searching Vector DB for relevant context (k={required_k})...")

    # 1. Retrieval Router (Metadata Filter)
    # First matching rule wins. Keywords are substrings of the LOWERCASED
    # query, so every keyword here must itself be lowercase.
    routing_rules = (
        ("Policy", ("policy", "refund", "return", "cancel")),
        ("TechDoc", ("error", "fix", "cache", "404")),
        ("Billing", ("bill", "charge", "fee", "$50")),
        ("Product", ("color", "nespresso", "mac")),
    )
    lowered_query = query.lower()
    metadata_filter = {}
    for source, keywords in routing_rules:
        if any(keyword in lowered_query for keyword in keywords):
            metadata_filter = {"source": source}
            break

    # 2. Vector Search (Retrieval)
    query_kwargs = {"query_texts": [query], "n_results": required_k}
    if metadata_filter:
        source = metadata_filter["source"]
        print(f"  -> Retrieval Router applied filter: '{source}'")
        query_kwargs["where"] = metadata_filter  # Apply the router filter
    else:
        print("  -> Retrieval Router: No filter applied, searching all documents.")
    results = KNOWLEDGE_COLLECTION.query(**query_kwargs)

    if not results or not results.get('documents') or not results['documents'][0]:
        print("‚ö†Ô∏è No context retrieved.")
        return ""

    # 3. Ranking (Slicing)
    # Results are used in the order the collection returns them; no second
    # re-ranking model is applied here.
    ranked_chunks = []
    for i, doc in enumerate(results['documents'][0]):
        ranked_chunks.append({
            "text": doc,
            "source": results['metadatas'][0][i]['source'],
            # NOTE(review): 1 - distance is only a true similarity for a cosine
            # metric; the collection is created without choosing a metric, so
            # treat this score as indicative only -- TODO confirm.
            "similarity_score": 1.0 - results['distances'][0][i],
        })

    final_chunks = ranked_chunks[:required_k]

    # 4. Context Formatting (for the LLM)
    context_list = [
        f"Source {i+1} [Type: {chunk['source']} | Score: {chunk['similarity_score']:.3f}]: {chunk['text']}"
        for i, chunk in enumerate(final_chunks)
    ]

    print(f"  -> Ranking completed. Selected {len(final_chunks)} chunks for context.")
    return "\n---\n".join(context_list)

‚ú® [VectorDB Setup] Initializing ChromaDB and indexing data...
‚úÖ [VectorDB Setup] Indexed 6 documents into 'company_knowledge_base'.


In [5]:
# governance.py
import random
from typing import Dict, Any, Optional

# Governance policy thresholds.
# check_for_escalation escalates when classifier confidence is strictly below this value.
MIN_CONFIDENCE_THRESHOLD = 0.85 # Policy for classification confidence
# check_guardrails fails an answer whose judge faithfulness score is strictly below this value.
MIN_FAITHFULNESS_SCORE = 0.75   # Policy for LLM Judge output

def get_judge_schema() -> Dict[str, Any]:
    """Builds the JSON Schema for the Judge's structured verdict.

    The verdict is an object with three required fields: `evaluation`
    ("PASS" or "FAIL"), `reasoning` (string) and `faithfulness_score` (number).
    """
    fields = {
        "evaluation": {"type": "string", "enum": ["PASS", "FAIL"]},
        "reasoning": {"type": "string"},
        "faithfulness_score": {"type": "number"},
    }
    return {
        "type": "object",
        "properties": fields,
        "required": list(fields),
    }
def check_guardrails(
    text: str, 
    is_input: bool, 
    user_query: Optional[str] = None, 
    rag_context: Optional[str] = None
) -> Dict[str, Any]:
    """
    [GOVERNANCE] Comprehensive Guardrail Check (Simple + LLM Judge).

    Args:
        text: The user input (when is_input is True) or the generated answer.
        is_input: Selects the input checks (True) or the output checks (False).
        user_query: Original user question; needed for the LLM Judge.
        rag_context: Retrieved context; the LLM Judge runs only when both this
            and user_query are non-empty.

    Returns:
        {"status": "PASS"} or {"status": "FAIL", "reason": "..."}; results that
        come from the LLM Judge also carry the parsed verdict under "details".
        Any error while running or parsing the Judge yields a FAIL.
    """
    lowered = text.lower()

    def _mentions(terms) -> bool:
        # Match a term only at the start of a word, so "skill" does not trip
        # "kill" and "whatever" does not trip "hate", while inflections such as
        # "killing" or "threats" still match.
        return any(re.search(r"\b" + re.escape(term), lowered) for term in terms)

    # 1. INPUT Guardrails (Fast Checks)
    if is_input:
        if "admin access" in lowered:
            return {"status": "FAIL", "reason": "Input BLOCKED: Prompt injection attempt."}
        if _mentions(["kill", "bomb", "threat"]):
            return {"status": "FAIL", "reason": "Input BLOCKED: Detected toxic language."}
        print("‚úÖ [Guardrail] Input passed safety checks.")
        return {"status": "PASS"}

    # 2. OUTPUT Guardrails (Simple/Fast Checks)
    # Compared against the lowercased text so "Confidential" is caught too.
    if any(s in lowered for s in ["123-456-7890", "confidential"]):
        return {"status": "FAIL", "reason": "Output BLOCKED: Detected sensitive data (PII)."}
    if _mentions(["hate", "illegal"]):
        return {"status": "FAIL", "reason": "Output BLOCKED: Detected toxic language."}
    print("‚úÖ [Guardrail] Output passed simple safety checks.")
    
    # 3. LLM Judge Guardrail (Complex Check for RAG Quality)
    # Only run the complex Judge if we have context (i.e., this was a RAG answer)
    if rag_context and user_query:
        print("‚öñÔ∏è [Judge] Starting LLM-as-a-Judge evaluation for faithfulness...")
        
        judge_prompt = (
            f"You are a Quality Control Judge for a customer support bot. "
            f"Your task is to evaluate a generated answer based on three criteria: "
            f"**Faithfulness/Hallucination**, **Helpfulness**, and **Safety**.\n\n"

            f"--- CONTEXT ---\n{rag_context}\n\n"
            f"--- USER QUERY ---\n{user_query}\n\n"
            f"--- ANSWER TO EVALUATE ---\n{text}\n\n"

            f"--- EVALUATION TASK ---\n"
            f"1. **Faithfulness/Hallucination**: Does the answer only contain facts that are explicitly supported by the CONTEXT? If yes, score 1.0. If it contains unsupported facts (Hallucination), score 0.0.\n"
            f"2. **Helpfulness**: Does the answer directly address the user query?\n"
            f"3. **Verdict**: If the faithfulness score is below {MIN_FAITHFULNESS_SCORE}, or if the answer is unhelpful, set 'evaluation' to 'FAIL'. Otherwise, 'PASS'.\n\n"

            f"You must return ONLY a single, valid JSON object matching the required schema."
        )
        judge_model = "gemini-2.5-pro"
        
        try:
            raw_response = call_llm(prompt=judge_prompt, model_name=judge_model, json_schema=get_judge_schema())
            evaluation = json.loads(raw_response)

            # A missing score is treated as 0.0, i.e. it fails the policy below.
            score = float(evaluation.get("faithfulness_score", 0.0))

            # Policy Enforcement: Check faithfulness score
            if score < MIN_FAITHFULNESS_SCORE:
                return {
                    "status": "FAIL", 
                    "reason": f"Judge FAIL: Hallucination score ({score:.2f}) below policy.",
                    "details": evaluation
                }

            # The prompt asks the Judge to set 'evaluation' to FAIL for unhelpful
            # answers as well, so its verdict is enforced in addition to the score.
            if evaluation.get("evaluation") == "FAIL":
                return {
                    "status": "FAIL",
                    "reason": "Judge FAIL: Answer judged unhelpful or unsafe.",
                    "details": evaluation
                }
            
            print(f"‚úÖ [Judge] Hallucination Check PASSED (Faithfulness: {score:.2f}).")
            return {"status": "PASS", "details": evaluation}
            
        except Exception as e:
            # If the Judge itself fails to run or parse, fail the process for safety
            return {"status": "FAIL", "reason": f"Judge Execution Error: {e}"}

    # 4. Final PASS (For non-RAG output that passed simple checks)
    return {"status": "PASS"}

def check_for_escalation(intent: str, confidence: float) -> bool:
    """
    [GOVERNANCE] Confidence & Escalation Pattern.
    Returns True when the request must go to a human: either the intent is
    "Error"/"Unknown", or the confidence is strictly below
    MIN_CONFIDENCE_THRESHOLD. Returns False otherwise. Prints the decision.
    """
    always_escalate = ("Error", "Unknown")
    if intent in always_escalate:
        print(f"üö® [Governance] Intent '{intent}' requires immediate escalation.")
        return True

    below_policy = confidence < MIN_CONFIDENCE_THRESHOLD
    if below_policy:
        print(f"üö® [Governance] Confidence ({confidence:.2f}) is below policy threshold ({MIN_CONFIDENCE_THRESHOLD}). ESCALATION REQUIRED.")
        return True

    print(f"‚úÖ [Governance] Confidence ({confidence:.2f}) is above policy threshold.")
    return False

def escalate_to_hitl(user_query: str, intent: str, confidence: float) -> Dict[str, Any]:
    """[GOVERNANCE] Performs the escalation action.

    Prints a routing notice and returns an "ESCALATED" record containing a
    reason string (intent plus confidence to two decimals) and the original
    query as the payload. Nothing is persisted here.
    """
    print("‚û°Ô∏è [Governance] Routing request to Human-in-the-Loop (HITL) queue.")
    # In a real system, this would write to a database or a pub/sub topic.
    reason = f"Model uncertainty for intent '{intent}' (Confidence: {confidence:.2f})."
    ticket: Dict[str, Any] = {"status": "ESCALATED"}
    ticket["reason"] = reason
    ticket["payload"] = user_query
    return ticket

In [6]:
# orchestration.py
from typing import Callable, Dict, Any
# We import the placeholder

# --- MOCK TOOL CALL HANDLER ---
def _handle_tool_call(tool_name: str, arguments: Dict[str, Any]) -> str:
    """Mock tool executor: logs the call and returns a canned response string.

    `get_billing_details` and `trigger_cancellation_workflow` have dedicated
    responses (using `date` / `order_id` from `arguments`, with defaults);
    any other tool name gets a generic success message.
    """
    print(f"üõ†Ô∏è [Tool Handler] Executing tool: {tool_name} with args: {arguments}")

    if tool_name == "get_billing_details":
        charge_date = arguments.get('date', '2025-11-15')
        return f"Tool Response: The user was charged $50 for 'Monthly Subscription Fee' on {charge_date}."

    if tool_name == "trigger_cancellation_workflow":
        order_id = arguments.get('order_id', 'unknown')
        return f"Tool Response: Workflow initiated for order {order_id}."

    return f"Tool {tool_name} executed successfully."


# --- ROUTER CORE LOGIC ---

def select_model_and_context(
    intent: str, 
    query: str
) -> Dict[str, Any]:
    """
    [ORCHESTRATION] Router Logic: maps a classified intent to a route.

    The returned dict has `resource_type` ("model" or "tool"), `model_name`,
    `use_rag` and `tool_call` (None, or {"name", "arguments"} with mocked
    arguments). Intents without a dedicated rule (e.g. Account_Lock, Unknown)
    get the powerful model with RAG. `query` is accepted but not used by the
    routing rules.
    """
    fast_model = "gemini-2.5-flash"
    strong_model = "gemini-2.5-pro"

    def model_route(model_name: str, use_rag: bool) -> Dict[str, Any]:
        # Plain LLM answer, optionally grounded with retrieved context.
        return {"resource_type": "model", "model_name": model_name, "use_rag": use_rag, "tool_call": None}

    def tool_route(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        # Tool runs first; the fast model then phrases the tool's output.
        return {
            "resource_type": "tool",
            "model_name": fast_model,
            "use_rag": False,
            "tool_call": {"name": tool_name, "arguments": arguments},
        }

    if intent == "Greeting":
        return model_route(fast_model, False)
    if intent in ("FAQ_Simple", "Product_Inquiry"):
        # Simple RAG with a fast model
        return model_route(fast_model, True)
    if intent == "Bug_Report_Complex":
        # Complex RAG with a powerful model
        return model_route(strong_model, True)
    if intent == "Billing_Question":
        return tool_route("get_billing_details", {"date": "2025-11-15"})  # Mocked args
    if intent == "Cancel_Order":
        return tool_route("trigger_cancellation_workflow", {"order_id": "12345"})  # Mocked args

    # Default (e.g., Account_Lock, Unknown)
    return model_route(strong_model, True)


def execute_generation(
    route: Dict[str, Any],
    query: str,
    rag_context: str
) -> str:
    """
    [ORCHESTRATION] Final execution step.

    For a "tool" route, runs the tool via _handle_tool_call and asks the model
    to answer from the tool's response only. For any other route, sends the
    user question to the model, wrapped with `rag_context` when the route uses
    RAG and context exists, or with a "no context" note when it uses RAG but
    none was found. Returns whatever call_llm returns.
    """
    resource_type = route['resource_type']
    model_name = route['model_name']

    if resource_type != "tool":
        # Model-based route
        print(f"‚öôÔ∏è [Router] Delegating task to LLM: {model_name}")
        wants_rag = route['use_rag']
        if wants_rag and rag_context:
            prompt_text = (
                f"Use the following context to answer the user's question.\n"
                f"CONTEXT:\n{rag_context}\n\n"
                f"--- \n\n"
                f"USER QUESTION: {query}"
            )
        elif wants_rag:
            # RAG was requested but retrieval came back empty.
            prompt_text = f"USER QUESTION: {query}\n\n(Note: No specific context was found, answer generally.)"
        else:
            prompt_text = f"USER QUESTION: {query}"
    else:
        # Tool-based route: execute the tool, then let the model phrase the result.
        requested_tool = route['tool_call']
        print(f"‚öôÔ∏è [Router] Delegating task to Tool: {requested_tool['name']}")
        tool_output = _handle_tool_call(requested_tool['name'], requested_tool['arguments'])
        prompt_text = (
            f"You are a helpful assistant. A user asked: '{query}'.\n"
            f"An internal tool was run and provided this data: '{tool_output}'.\n"
            f"Answer the user's question based *only* on the tool's response."
        )

    # Call the LLM with the final composed prompt
    return call_llm(prompt=prompt_text, model_name=model_name)

In [7]:
# main.py
# import governance
# import behavior
# import retrieval
# import orchestration
from typing import Dict, Any

def process_customer_query(user_query: str):
    """
    Composes all design patterns into the Customer Support Query workflow:
    input guardrail -> classification -> confidence/escalation check ->
    routing -> optional RAG -> generation -> output guardrail.

    Args:
        user_query: The raw customer message.

    Returns:
        One of:
        - {"status": "REJECTED", "reason": ..., "details": ...} when the input
          guardrail does not pass;
        - the dict from escalate_to_hitl when the classification is unusable,
          escalation is required, or the output guardrail does not pass (in
          that last case with an extra "guardrail_reason" key);
        - {"status": "SUCCESS", "answer": ...} otherwise.
    """
    print("\n" + "="*50)
    print(f"| NEW QUERY: {user_query}")
    print("="*50)

    # 1. GOVERNANCE: Input Guardrail
    # check_guardrails returns a dict, which is always truthy, so the status
    # field must be inspected. Anything other than an explicit PASS is rejected.
    input_check = check_guardrails(user_query, is_input=True)
    if input_check.get("status") != "PASS":
        return {
            "status": "REJECTED",
            "reason": "Input guardrail violation.",
            "details": input_check.get("reason"),
        }

    # 2. BEHAVIOR: Classification
    # The behavior module uses the LLM to get structured intent
    classification_data = classify_and_structure(user_query)
    intent = classification_data.get("intent")
    confidence = classification_data.get("confidence")

    # A reply without a usable intent/confidence is treated as a classification
    # error so it is escalated instead of crashing the formatting below.
    confidence_ok = isinstance(confidence, (int, float)) and not isinstance(confidence, bool)
    if not isinstance(intent, str) or not confidence_ok:
        intent, confidence = "Error", 0.0
    
    print(f"‚ÑπÔ∏è [Classifier] Intent={intent}, Confidence={confidence:.2f}")

    # 3. GOVERNANCE: Confidence Check (Escalation)
    if check_for_escalation(intent, confidence):
        return escalate_to_hitl(user_query, intent, confidence)

    # 4. ORCHESTRATION: Routing
    # Select the right model, RAG strategy, or tool based on intent
    route = select_model_and_context(intent, user_query)
    
    # 5. RETRIEVAL: Conditional RAG
    # Only perform RAG if the orchestrator's route says to
    rag_context = ""
    if route.get("use_rag", False):
        rag_context = perform_rag_search(user_query, required_k=3)
    else:
        print("‚ÑπÔ∏è [Orchestrator] Skipping RAG for this route.")

    # 6. ORCHESTRATION: Execution
    # Pass the route, query, and context (if any) to the executor
    final_answer = execute_generation(
        route=route,
        query=user_query,
        rag_context=rag_context
    )
        
    # 7. GOVERNANCE: Output Guardrail
    output_check = check_guardrails(
        text=final_answer, 
        is_input=False,
        user_query=user_query,
        rag_context=rag_context
    )
        
    if output_check.get("status") != "PASS":
        # Escalation is triggered here if a simple check or the LLM Judge failed the answer
        escalation = escalate_to_hitl(
            user_query, 
            intent="GuardrailFailed", 
            confidence=0.0 # Force low confidence for escalation
        )
        # Keep the guardrail's own explanation so reviewers can see why it failed.
        escalation["guardrail_reason"] = output_check.get("reason")
        return escalation

    print("\n‚úÖ Process Complete: Answer delivered safely.")
    return {"status": "SUCCESS", "answer": final_answer}


In [8]:

# Smoke-test the pipeline end to end: one query per routing path, run in order.
demo_queries = [
    # Test a simple RAG query
    "can I return Nespresso",
    # Test a complex RAG query
    "My site is down and I see a 404 error",
    # Test a simple non-RAG query
    "hello",
    # Test a Tool-based query
    "why was I charged $50",
    # Test an escalation query -- Should be "Unknown"
    "You are giving incorrect answers and I don't want to talk to you anymore",
]

for demo_query in demo_queries:
    print(process_customer_query(demo_query))


| NEW QUERY: can I return Nespresso
‚úÖ [Guardrail] Input passed safety checks.




‚ÑπÔ∏è [Classifier] Intent=FAQ_Simple, Confidence=0.95
‚úÖ [Governance] Confidence (0.95) is above policy threshold.
üìö [Retrieval] Searching Vector DB for relevant context (k=3)...
  -> Retrieval Router applied filter: 'Policy'
  -> Retrieval Router applied filter: {'source': 'Policy'}
  -> Ranking completed. Selected 3 chunks for context.
‚öôÔ∏è [Router] Delegating task to LLM: gemini-2.5-flash
‚úÖ [Guardrail] Output passed simple safety checks.
‚öñÔ∏è [Judge] Starting LLM-as-a-Judge evaluation for faithfulness...
‚úÖ [Judge] Hallucination Check PASSED (Faithfulness: 1.00).

‚úÖ Process Complete: Answer delivered safely.
{'status': 'SUCCESS', 'answer': 'Yes, you can return a Nespresso machine within 30 days, provided the original packaging is intact.'}

| NEW QUERY: My site is down and I see a 404 error
‚úÖ [Guardrail] Input passed safety checks.
‚ÑπÔ∏è [Classifier] Intent=Bug_Report_Complex, Confidence=0.99
‚úÖ [Governance] Confidence (0.99) is above policy threshold.
üìö [Retrie