# Section 4: Human-in-the-Loop with LangGraph Primitives

<div align="center">
    <img src="../../images/supervisor_with_verification.png" width="800">
</div>

In Section 3, we built a supervisor using `create_agent` that routes queries to specialized sub-agents.

However, to handle some underspecified queries like _"Whats the status of my recent order?"_, we need to first verify the customer's identity **before** the agent can provide support.

In this section, we'll add a **verification layer** to our agent using LangGraph primitives that:
- Classifies whether a user question needs identity verification
- Pauses execution to collect customer email (HITL)
- Validates the email against our customer database to get a `customer_id`
- Skips verification on follow-up questions in the same thread
- Routes to our Supervisor Agent for the actual query handling

We keep the sub-agents simple (`create_agent`), but add more complex orchestration with LangGraph.

### Setup

Load environment variables and necessary imports.

In [None]:
from dotenv import load_dotenv

load_dotenv()

## Build Verification Layer with LangGraph Primitives

Let's add the verification layer that sits **in front of** the supervisor. To do so, we'll need to:
1. Define a custom state schema that stores a `customer_id` once the user is verified
2. Build a set of nodes and edges that implement our verification and routing logic
3. Compile our state + nodes + supervisor agent into a graph that we can invoke

### State Schema

Since we'll now be collecting and storing information about the customer, we need to make sure this information is persisted in our graph's state. 

We'll extend LangGraph's `MessagesState` to include a `customer_id`, so this value will be shared across the verification layer, supervisor, and sub-agents.

In [None]:
from langgraph.graph import MessagesState


class IntermediateState(MessagesState):
    """Extended MessagesState with customer_id

    MessagesState includes a `messages` key with proper reducers by default.
    Shared keys automatically flow between parent and subgraphs.
    """

    customer_id: str

### Define Graph Nodes

<div align="center">
    <img src="../../images/verification_layer.png" width="800">
</div>

We'll create 4 nodes:
1. **query_router**: Classifies whether a query needs identity verification before answering
2. **verify_customer**: Ensures the agent has a valid `customer_id` based on provided email address
3. **collect_email**: Dedicated HITL node that pauses with `interrupt()` to collect user input
4. **supervisor_agent (subgraph)**: The supervisor agent that handles the actual query

#### Node 1: Query Router

The "Query Router" node makes a decision about whether identity verification is required to answer the user's question.

It inspects the agent's state and the user's latest message to determine:
1. If the customer should be prompted for verification (such as when asking for information about a particular order), or 
2. If the request can proceed directly to the supervisor agent (for general questions or public information)


##### Helper Function

We'll use an LLM with structured output to classify each user message to determine if it requires verification, so lets set that up first.

In [None]:
import pprint
from typing_extensions import TypedDict, Annotated
from langchain.chat_models import init_chat_model
from config import DEFAULT_MODEL


# TypedDict for structured LLM output
class QueryClassification(TypedDict):
    """Classification of whether customer identity verification is required."""

    reasoning: Annotated[
        str, ..., "Brief explanation of why verification is or isn't needed"
    ]
    requires_verification: Annotated[
        bool,
        ...,
        "True if the query requires knowing customer identity (e.g., 'my orders', 'my account', 'my purchases'). False for general questions (product info, policies, how-to questions).",
    ]


def classify_query_intent(query: str) -> QueryClassification:
    """Classify whether a query requires customer identity verification.

    Args:
        query: The user's query string

    Returns:
        QueryClassification dict with reasoning and requires_verification fields
    """
    llm = init_chat_model(DEFAULT_MODEL)

    # Create structured LLM
    structured_llm = llm.with_structured_output(QueryClassification)
    classification_prompt = """Analyze the user's query to determine if it requires knowing their customer identity in order to answer the question."""

    # Get structured classification
    classification = structured_llm.invoke(
        [
            {"role": "system", "content": classification_prompt},
            {"role": "user", "content": "Query: " + query},
        ]
    )

    return classification

In [None]:
test = classify_query_intent("Whats the status of my recent order?")
pprint.pprint(test)

##### Node Implementation

Now, we can leverage this `classify_query_intent` function in a node to intelligently decide when to route users for identity verification vs. letting them skip directly to the supervisor agent for general questions.

In [None]:
from langgraph.types import Command
from typing import Literal


# Node 1: Query Router
def query_router(
    state: IntermediateState,
) -> Command[Literal["verify_customer", "supervisor_agent"]]:
    """Route query based on verification needs.

    Logic:
    1. If customer already verified from earlier in the thread → supervisor_agent
    2. If query needs verification → verify_customer
    3. Otherwise → supervisor_agent
    """

    # Already verified? Skip to supervisor agent
    if state.get("customer_id"):
        return Command(goto="supervisor_agent")

    # Not already verified - classify query to see if verification is needed
    last_message = state["messages"][-1]
    query_classification = classify_query_intent(last_message.content)

    # Route based on classification
    if query_classification.get("requires_verification"):
        return Command(goto="verify_customer")
    return Command(goto="supervisor_agent")

#### Node 2: Verify Customer

The "Verify Customer" node is responsible for ensuring our agent has a valid `customer_id` for the user. 

It does this by asking the customer to provide their email address, extracting it from their response, and then validating it against our TechHub database.

##### Helper Functions

**1. Email Extraction**

The first helper extracts email addresses from natural language user input. 

This uses structured output to parse messages like "Ok cool, my email is bob@gmail.com" into a clean format (i.e. "bob@gmail.com") that we can easily validate - we can't assume the user will respond with just a nicely formatted email address.


In [None]:
from typing_extensions import TypedDict, Annotated


class EmailExtraction(TypedDict):
    """Schema for extracting email from user message."""

    email: Annotated[
        str,
        "The email address extracted from the message, or empty string if none found",
    ]


def create_email_extractor():
    """Create an LLM configured to extract emails from natural language."""
    llm = init_chat_model(DEFAULT_MODEL)
    return llm.with_structured_output(EmailExtraction)

In [None]:
# Test email extraction
extractor = create_email_extractor()
result = extractor.invoke("Ok cool, my email is bob@gmail.com")
print(result)

**2. Customer Validation**

The second helper validates an email address by referencing information in our TechHub database. It returns a `CustomerInfo` object with the `customer_id` and `customer_name` if the email exists, or `None` if not found.


In [None]:
from typing import NamedTuple
from langchain_community.utilities import SQLDatabase


class CustomerInfo(NamedTuple):
    """Customer information returned from validation."""

    customer_id: str
    customer_name: str


def validate_customer_email(email: str, db: SQLDatabase) -> CustomerInfo | None:
    """Validate email format and lookup customer in database.

    Args:
        email: Email address to validate
        db: SQLDatabase connection

    Returns:
        CustomerInfo with customer_id and customer_name if valid, None otherwise
    """
    # Check email format
    if not email or "@" not in email:
        return None

    # Lookup in database
    result = db._execute(
        f"SELECT customer_id, name FROM customers WHERE email = '{email}'"
    )

    # Convert SQLDatabase query results to list of tuples (values only)
    result = [tuple(row.values()) for row in result]

    if not result:
        return None

    customer_id, customer_name = result[0]
    return CustomerInfo(customer_id=customer_id, customer_name=customer_name)

Lets test it out

In [None]:
from tools.database import get_database

# Get database connection (lazy loaded)
db = get_database()

# Test customer validation - Valid email
valid_customer = validate_customer_email(email="sarah.chen@gmail.com", db=db)
print("Valid email:", valid_customer)

# Test customer validation - Invalid email
invalid_customer = validate_customer_email(email="nonexistent@example.com", db=db)
print("Invalid email:", invalid_customer)

#### Node Implementation

Now we can utilize these functions to build our `verify_customer` node, which has one job: **ensure we have a valid email and corresponding customer_id**.

**Three-path logic:**

1. **Try to extract email** from user's last message using structured output
2. **If email found** → Validate against database:
   - ✅ **Valid**: Set `customer_id` + `goto="supervisor_agent"` (done!)
   - ❌ **Invalid**: Return error + `goto="collect_email"` (retry)
3. **If no email found in message** → Request email + `goto="collect_email"` (first time)

**Why this design?**
- **First visit** (from `query_router`): No email yet → Path 3 → Asks for it
- **Subsequent visits** (from `collect_email`): Email present → Path 2 → Validates it

One node handles both cases cleanly! Each path uses `Command(goto=...)` to explicitly control routing.


In [None]:
from langchain_core.messages import AIMessage


# Node 2: Verify Customer
def verify_customer(
    state: IntermediateState,
) -> Command[Literal["supervisor_agent", "collect_email"]]:
    """Ensure we have a valid customer email and set the `customer_id` in state.

    Uses Command to explicitly route based on result.
    """

    # Get last message from user
    last_message = state["messages"][-1]

    # Try to extract email using structured output
    email_extractor = create_email_extractor()
    extraction = email_extractor.invoke([last_message])

    # If we have an email, attempt to validate it
    if extraction["email"]:
        db = get_database()
        customer = validate_customer_email(email=extraction["email"], db=db)

        if customer:
            # Success! Email verified → Go to supervisor
            return Command(
                update={
                    "customer_id": customer.customer_id,
                    "messages": [
                        AIMessage(
                            content=f"✓ Verified! Welcome back, {customer.customer_name}."
                        )
                    ],
                },
                goto="supervisor_agent",
            )
        else:
            # Email not valid → Try again
            return Command(
                update={
                    "messages": [
                        AIMessage(
                            content=f"I couldn't find '{extraction['email']}' in our system. Please check and try again."
                        )
                    ]
                },
                goto="collect_email",
            )

    # No email detected → Ask for it
    return Command(
        update={
            "messages": [
                AIMessage(
                    content="To access information about your account or orders, please provide your email address."
                )
            ]
        },
        goto="collect_email",
    )

#### Node 3: Collect Email (Human Input)

The "Collect Email" node is our dedicated node for collecting the users input.

By separating HITL into its own node, we get:
- **Clear graph visualization** - The interrupt point is explicit in the graph
- **Clean separation of concerns** - Business logic (verify_customer) separate from input collection


##### Node Implementation

The node simply:
1. Calls `interrupt()` to pause execution
2. Waits for user to provide input via `Command(resume=...)`
3. Updates state with the `HumanMessage` and routes back for verification


In [None]:
from langgraph.types import interrupt
from langchain_core.messages import HumanMessage


# Node 3: Collect Email (HITL)
def collect_email(state: IntermediateState) -> Command[Literal["verify_customer"]]:
    """Dedicated node for collecting human input via interrupt."""
    user_input = interrupt(value="Please provide your email:")
    return Command(
        update={"messages": [HumanMessage(content=user_input)]}, goto="verify_customer"
    )

#### Node 4: Supervisor Agent

The "Supervisor Agent" is the final node in our verification graph. It's a complete compiled graph from Section 3 that we're [reusing as a node](https://docs.langchain.com/oss/python/langgraph/use-subgraphs#add-a-graph-as-a-node)!

> Note: We've refactored ALL agents from Section 3 (Database, Documents, and Supervisor) into the `agents/` directory as factory functions.


##### Node Implementation

We instantiate the supervisor agent using the factory function, which internally creates and wires together the sub-agents.

**Key Implementation Detail:** We also pass `IntermediateState` to our supervisor agent. This creates a shared state schema between the verification graph and supervisor graph - all shared keys (`messages`, `customer_id`) automatically flow between.


**Introducing State-Dependent Agent Behavior with Dynamic Prompts**

Up to this point (Sections 1-3), all agents operated without contextual state - they required explicit parameters in every query. But now that we have customer verification with `customer_id` stored in state, we can introduce **dynamic prompt injection** to make the supervisor agent context-aware!

**The Pattern: Dynamic System Prompts**

When a customer is verified, their `customer_id` is stored in the graph state. The supervisor agent uses the [`@dynamic_prompt` middleware](https://docs.langchain.com/oss/python/langchain/agents#dynamic-system-prompt) to automatically inject this information into its system prompt:

```python
@dynamic_prompt
def supervisor_prompt(request: ModelRequest) -> str:
    customer_id = request.state.get("customer_id", None)
    if customer_id:
        return f"""{prompt}
        \n\nThe customer's ID in this conversation is: {customer_id}
        """
    return prompt
```

The supervisor can now coordinate queries with verified `customer_id` to the database specialist. See the full implementation in [`agents/supervisor_agent.py`](../../agents/supervisor_agent.py#L95-L104).

In [None]:
from agents import create_db_agent, create_docs_agent, create_supervisor_agent
from tools import get_customer_orders

# Instantiate sub-agents - the db_agent now has access to get_customer_orders() which takes a customer_id
db_agent = create_db_agent(additional_tools=[get_customer_orders])
docs_agent = create_docs_agent()

# Instantiate supervisor agent (which wraps the sub-agents as tools)
supervisor_agent = create_supervisor_agent(
    db_agent, docs_agent, state_schema=IntermediateState
)

### Build and Compile the Verification Graph

Now we wire everything together:
- Add all nodes (including supervisor agent as a subgraph node)
- Set entry point
- Compile with checkpointer for persistence

In [None]:
from langgraph.graph import MessagesState, StateGraph, START
from langgraph.checkpoint.memory import MemorySaver

# Build the verification graph using MessagesState for input and output (i.e. just the messages key)
# and IntermediateState (i.e. with customer_id key) for communication between nodes
workflow = StateGraph(
    input_schema=MessagesState,
    state_schema=IntermediateState,
    output_schema=MessagesState,
)

# Add nodes
workflow.add_node("query_router", query_router)
workflow.add_node("verify_customer", verify_customer)
workflow.add_node("collect_email", collect_email)
workflow.add_node("supervisor_agent", supervisor_agent)

# Set entry point
workflow.add_edge(START, "query_router")

# Compile with checkpointer (REQUIRED for interrupt)
verification_graph = workflow.compile(checkpointer=MemorySaver())

### Visualize the Graph

Let's see what we built:

In [None]:
from IPython.display import Image

display(Image(verification_graph.get_graph(xray=True).draw_mermaid_png()))

## Test the Verification Graph

Let's test all four scenarios:
1. **General query** - No verification needed
2. **Personal query** - Verification with successful email
3. **Personal query with retry** - Invalid email, then valid email
4. **Follow-up query** - Verification skipped (already verified in thread)

### Scenario 1: General Query (No HITL)

Query about products or policies don't need customer identity verification.

In [None]:
import uuid

thread_id_1 = uuid.uuid4()
config_1 = {"configurable": {"thread_id": thread_id_1}}

result = verification_graph.invoke(
    {"messages": [HumanMessage(content="how much does the Apple Magic Mouse cost?")]},
    config=config_1,
)

for msg in result["messages"]:
    msg.pretty_print()

### Scenario 2: Personal Query with Successful Verification

Query about "my orders" require identity verification.

**Test email:** `sarah.chen@gmail.com` (exists in database)

In [None]:
# New thread
thread_id_2 = uuid.uuid4()
config_2 = {"configurable": {"thread_id": thread_id_2}}

# First invocation - will pause at interrupt
result = verification_graph.invoke(
    {"messages": [HumanMessage(content="Whats the status of my recent order?")]},
    config=config_2,
)

result

In [None]:
# Resume with valid email
result = verification_graph.invoke(
    Command(resume="Ok, its: sarah.chen@gmail.com"),
    config=config_2,
)
result

In [None]:
for msg in result["messages"]:
    msg.pretty_print()

### Scenario 3: Personal Query with Retry (Invalid Email)

What happens when the user provides an invalid email? The graph retries!

In [None]:
# New thread
thread_id_3 = uuid.uuid4()
config_3 = {"configurable": {"thread_id": thread_id_3}}


# First invocation - pauses
result = verification_graph.invoke(
    {"messages": [HumanMessage(content="Show me my recent purchases")]},
    config=config_3,
)
result

In [None]:
# Resume with INVALID email (not in database)
result = verification_graph.invoke(Command(resume="wrong@email.com"), config=config_3)
result

In [None]:
# Resume with VALID email
result = verification_graph.invoke(
    Command(resume="Ah sorry its actually sarah.chen@gmail.com"),
    config=config_3,
)
result

In [None]:
for msg in result["messages"]:
    msg.pretty_print()

### Scenario 4: Follow-up Query (Skip Verification)

Once verified in a thread, follow-up queries skip verification entirely!

In [None]:
# Use the same thread from Scenario 3 (already verified)

result = verification_graph.invoke(
    {
        "messages": [
            HumanMessage(
                content="Can you remind me what was in ORD-2024-0081 and how much it cost?"
            )
        ]
    },
    config=config_3,  # Reuse config from Scenario 3
)

result["messages"][-1].pretty_print()