# LangGraph Invoice Processing Agent with Human-In-The-Loop (HITL)

This notebook implements **Langie – the Invoice Processing LangGraph Agent**. The agent models a complex, multi-stage invoice processing workflow with deterministic steps, dynamic routing, and a Human-In-The-Loop (HITL) checkpoint.

## Step 1: Environment Setup
This cell installs the necessary LangGraph libraries and the specialized SQLite checkpointer.

In [1]:
# `%pip` (unlike `!pip`) always installs into the environment of the running kernel.
# The SQLite checkpointer is pinned to the version this notebook was last run with.
%pip install -U langgraph "langgraph-checkpoint-sqlite==3.0.1"

Collecting langgraph-checkpoint-sqlite
  Downloading langgraph_checkpoint_sqlite-3.0.1-py3-none-any.whl.metadata (2.8 kB)
Collecting sqlite-vec>=0.1.6 (from langgraph-checkpoint-sqlite)
  Downloading sqlite_vec-0.1.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux1_x86_64.whl.metadata (198 bytes)
Downloading langgraph_checkpoint_sqlite-3.0.1-py3-none-any.whl (33 kB)
Downloading sqlite_vec-0.1.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux1_x86_64.whl (151 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m151.6/151.6 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: sqlite-vec, langgraph-checkpoint-sqlite
Successfully installed langgraph-checkpoint-sqlite-3.0.1 sqlite-vec-0.1.6


## Step 2: Database Initialization (Business DB)
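LangGraph's checkpointer persists the graph's internal state, but the workflow also needs a separate business database. It holds the human review queue (written at Stage 6, CHECKPOINT_HITL) and the audit log (written at Stage 12, COMPLETE).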

In [2]:
import sqlite3
import json
import uuid
import time
from typing import Dict, List, Any, Optional
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.sqlite import SqliteSaver

def init_business_db(db_path: str = "invoice_business.db") -> sqlite3.Connection:
    """Open (or create) the business database and ensure its tables exist.

    This database is separate from LangGraph's checkpoint store and holds:
      * ``human_review_queue`` -- rows written by the CHECKPOINT_HITL stage (Stage 6).
      * ``audit_logs`` -- rows written by the COMPLETE stage (Stage 12).

    Args:
        db_path: SQLite database path. Defaults to the notebook's business DB file;
            pass ``":memory:"`` for a throwaway database.

    Returns:
        An open connection created with ``check_same_thread=False``, so it can be
        used from threads other than the one that created it.
    """
    conn = sqlite3.connect(db_path, check_same_thread=False)
    try:
        # `with conn` commits when the block succeeds and rolls back if a statement fails.
        with conn:
            # Table for Stage 6: CHECKPOINT_HITL
            conn.execute("""CREATE TABLE IF NOT EXISTS human_review_queue (
                checkpoint_id TEXT PRIMARY KEY,
                invoice_id TEXT,
                state_blob TEXT,
                paused_reason TEXT,
                review_url TEXT,
                status TEXT)""")
            # Table for Stage 12: COMPLETE audit
            conn.execute("""CREATE TABLE IF NOT EXISTS audit_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                thread_id TEXT, stage TEXT, log_entry TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)""")
    except Exception:
        # Don't leak a half-initialised connection if schema creation fails.
        conn.close()
        raise
    return conn


biz_conn = init_business_db()

## Step 3: Orchestration Registry (Bigtool & MCP)
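This cell defines simulated stand-ins for the two orchestration layers. `Bigtool` picks a concrete tool from a capability's pool, choosing the last-listed (premium) tool for invoices over 2500. `MCPClient` routes each ability to the ATLAS or COMMON server. Neither makes real external calls.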

In [3]:
class Bigtool:
    """Simulated Bigtool: picks one concrete tool from a capability's tool pool."""

    @staticmethod
    def select(capability: str, pool: List[str], context: Dict, premium_threshold: float = 2500) -> str:
        """Choose a tool for ``capability`` based on the invoice amount.

        Invoices with ``amount <= premium_threshold`` get the first (default) tool in
        ``pool``; larger invoices get the last (premium) tool.

        Args:
            capability: Capability name (e.g. ``"ocr"``). The simulated selection does
                not use it except in error messages.
            pool: Candidate tools, ordered default-first and premium-last.
            context: Invoice payload; only its ``"amount"`` key is read. A missing or
                ``None`` amount is treated as 0.
            premium_threshold: Amount above which the premium tool is chosen.

        Returns:
            The selected tool name.

        Raises:
            ValueError: If ``pool`` is empty.
        """
        if not pool:
            raise ValueError(f"Bigtool: empty tool pool for capability {capability!r}")
        amount = context.get("amount") or 0
        return pool[-1] if amount > premium_threshold else pool[0]

class MCPClient:
    """Stand-in MCP client that pretends to dispatch an ability to a named server."""

    @staticmethod
    def execute_ability(server: str, ability: str, data: Dict):
        """Simulate running ``ability`` on ``server`` (e.g. ATLAS or COMMON).

        ``data`` is accepted for interface parity but is not inspected. The call always
        reports success, echoing back the server and ability names.
        """
        response = dict(status="success", server=server, ability=ability)
        return response

## Step 4: Config-Driven LangGraph Construction
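This cell defines the agent state and builds the 12-stage workflow graph from `WORKFLOW_JSON`, an inline stand-in for the `workflow.json` configuration. One node is generated per configured stage, and invoices that fail two-way matching are routed to the HITL checkpoint.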

In [4]:
# State Definition: the keys stage nodes read from and return updates into.
AgentState = TypedDict(
    "AgentState",
    {
        "invoice_payload": Dict[str, Any],          # invoice input, e.g. invoice_id and amount
        "match_score": float,                       # set by MATCH_TWO_WAY
        "match_result": str,                        # "MATCHED" or "FAILED", set by MATCH_TWO_WAY
        "human_decision": str,                      # supplied by the reviewer (e.g. "ACCEPT")
        "status": str,                              # coarse run status such as "PAUSED" / "COMPLETED"
        "audit_log": List[str],                     # human-readable trail of stage messages
        "final_payload": Optional[Dict[str, Any]],  # produced by COMPLETE
    },
)

# Configuration (Based on workflow.json requirement)
# Each stage lists the MCP server it talks to and the abilities it invokes; stages that
# declare a `cap` also give Bigtool a `pool` of candidate tools to choose from.
WORKFLOW_JSON = {
    # Invoice amount above which MATCH_TWO_WAY simulates a failed match (score 0.85,
    # below the 0.9 pass mark), which routes the run to the HITL checkpoint.
    "match_threshold": 1000,
    "stages": [
        {"id": "INTAKE", "server": "COMMON", "abilities": ["validate_schema", "persist"]},
        {"id": "UNDERSTAND", "server": "ATLAS", "cap": "ocr", "pool": ["google_vision", "tesseract", "aws_textract"], "abilities": ["ocr_extract"]},
        {"id": "PREPARE", "server": "ATLAS", "cap": "enrichment", "pool": ["clearbit", "vendor_db"], "abilities": ["enrich_vendor"]},
        {"id": "RETRIEVE", "server": "ATLAS", "cap": "erp_connector", "pool": ["sap_sandbox", "netsuite"], "abilities": ["fetch_po", "fetch_grn"]},
        {"id": "MATCH_TWO_WAY", "server": "COMMON", "abilities": ["compute_match_score"]},
        {"id": "CHECKPOINT_HITL", "server": "COMMON", "abilities": ["save_state"]},
        {"id": "HITL_DECISION", "server": "ATLAS", "abilities": ["get_decision"]},
        {"id": "RECONCILE", "server": "COMMON", "abilities": ["build_accounting_entries"]},
        {"id": "APPROVE", "server": "ATLAS", "abilities": ["apply_policy"]},
        {"id": "POSTING", "server": "ATLAS", "cap": "erp_connector", "pool": ["sap_sandbox", "mock_erp"], "abilities": ["post_to_erp"]},
        {"id": "NOTIFY", "server": "ATLAS", "cap": "email", "pool": ["sendgrid", "ses"], "abilities": ["notify_vendor"]},
        {"id": "COMPLETE", "server": "COMMON", "abilities": ["output_final_payload"]}
    ]
}

def create_stage_node(stage_cfg):
    """Build the LangGraph node function for one stage of ``WORKFLOW_JSON``.

    Every generated node:
      1. asks Bigtool to pick a tool when the stage declares ``cap``/``pool``,
      2. routes each configured ability through the simulated MCP client,
      3. applies any stage-specific logic (match scoring, HITL persistence,
         final payload), and
      4. returns a partial state update that always carries the extended audit log.

    Args:
        stage_cfg: One entry of ``WORKFLOW_JSON["stages"]`` with ``id``, ``server``,
            ``abilities`` and optionally ``cap`` + ``pool``.

    Returns:
        A ``logic(state, config=None)`` callable suitable for ``StateGraph.add_node``.
    """
    s_id = stage_cfg["id"]

    def logic(state: AgentState, config=None):
        """Run stage ``s_id`` and return a partial state update."""
        # `config` is deliberately unannotated: LangGraph passes the run config to a node
        # parameter named `config`, and that is where the thread_id lives.
        payload = state["invoice_payload"]
        # Build a new list instead of appending to state["audit_log"]: node inputs should be
        # treated as read-only; only the keys returned here are written back to the state.
        audit_log = list(state.get("audit_log") or [])

        if "cap" in stage_cfg:
            tool = Bigtool.select(stage_cfg["cap"], stage_cfg["pool"], payload)
            audit_log.append(f"Bigtool selected: {tool}")

        for ability in stage_cfg["abilities"]:
            MCPClient.execute_ability(stage_cfg["server"], ability, payload)

        audit_log.append(f"Stage {s_id} completed.")

        if s_id == "MATCH_TWO_WAY":
            threshold = WORKFLOW_JSON.get("match_threshold", 1000)
            # Simulated scoring: amounts above the threshold score 0.85, below the 0.9 pass mark.
            score = 0.85 if (payload.get("amount") or 0) > threshold else 0.95
            return {
                "match_score": score,
                "match_result": "FAILED" if score < 0.9 else "MATCHED",
                "audit_log": audit_log,
            }

        if s_id == "CHECKPOINT_HITL":
            check_id = str(uuid.uuid4())
            biz_conn.execute(
                "INSERT INTO human_review_queue VALUES (?, ?, ?, ?, ?, ?)",
                (check_id, payload.get("invoice_id"), json.dumps(state),
                 "MATCH_FAILED", f"http://review.local/{check_id}", "PAUSED"),
            )
            biz_conn.commit()
            return {"status": "PAUSED", "audit_log": audit_log}

        if s_id == "COMPLETE":
            # Record the actual LangGraph thread id rather than a hard-coded placeholder.
            thread_id = (config or {}).get("configurable", {}).get("thread_id")
            final = {"id": payload.get("invoice_id"), "status": "FINALIZED"}
            biz_conn.execute(
                "INSERT INTO audit_logs (thread_id, stage, log_entry) VALUES (?, ?, ?)",
                (thread_id, "COMPLETE", "Process Finished"),
            )
            biz_conn.commit()
            return {"final_payload": final, "status": "COMPLETED", "audit_log": audit_log}

        return {"audit_log": audit_log}

    return logic

# Assemble Graph: one node per configured stage, named by its lower-cased stage id.
builder = StateGraph(AgentState)
for s in WORKFLOW_JSON["stages"]:
    builder.add_node(s["id"].lower(), create_stage_node(s))


def route_after_match(state: AgentState) -> str:
    """Send failed matches to the HITL checkpoint; matched invoices go straight to reconciliation."""
    return "checkpoint_hitl" if state["match_result"] == "FAILED" else "reconcile"


def route_after_hitl(state: AgentState) -> str:
    """Continue to reconciliation only on an explicit human ACCEPT; anything else ends the run."""
    return "reconcile" if state.get("human_decision") == "ACCEPT" else END


# Stages whose outgoing edge is chosen at runtime. Explicit destination lists let
# LangGraph validate router return values and draw the branches.
CONDITIONAL_ROUTES = {
    "match_two_way": (route_after_match, ["checkpoint_hitl", "reconcile"]),
    "hitl_decision": (route_after_hitl, ["reconcile", END]),
}

# Every other stage flows to the next one in config order; the last flows to END.
node_ids = [s["id"].lower() for s in WORKFLOW_JSON["stages"]]
builder.add_edge(START, node_ids[0])
for src, dst in zip(node_ids, node_ids[1:] + [END]):
    if src in CONDITIONAL_ROUTES:
        router, destinations = CONDITIONAL_ROUTES[src]
        builder.add_conditional_edges(src, router, destinations)
    else:
        builder.add_edge(src, dst)

# Persistence: graph checkpoints go to their own SQLite file; the run pauses after the
# HITL checkpoint so a human decision can be injected before it continues.
memory = SqliteSaver(sqlite3.connect("workflow_memory.db", check_same_thread=False))
langie_app = builder.compile(checkpointer=memory, interrupt_after=["checkpoint_hitl"])

## Step 5: Demo Run & Audit
This cell simulates a full run of Langie. It processes an invoice (amount 5000) that fails two-way matching and pauses at the HITL checkpoint. It then records a simulated human `ACCEPT` decision and resumes through Stage 12 (COMPLETE) to produce the final payload.

In [5]:
# 1. Automated Execution (Up to HITL Checkpoint)
# Use a fresh thread_id per run: workflow_memory.db persists across kernel restarts, so a
# fixed id would reuse a thread still holding earlier runs' state (e.g. a stale human_decision).
config = {"configurable": {"thread_id": f"DEMO_RUN_{uuid.uuid4().hex[:8]}"}}
initial_input = {"invoice_payload": {"invoice_id": "INV-1001", "amount": 5000}, "audit_log": []}

print("--- [AUTOMATED PROCESSING START] ---")
for event in langie_app.stream(initial_input, config):
    print(f"Node Executed: {next(iter(event))}")

# An amount of 5000 fails two-way matching, so the run must now be parked before hitl_decision.
paused = langie_app.get_state(config)
assert paused.next == ("hitl_decision",), f"expected HITL pause, got next={paused.next}"

# 2. Simulate Human Review via API decision
print("\n--- [HUMAN REVIEW TRIGGERED] ---")
langie_app.update_state(config, {"human_decision": "ACCEPT", "status": "RESUMING"}, as_node="hitl_decision")

# 3. Resume Execution to Stage 12 (COMPLETE)
print("\n--- [RESUMING WORKFLOW] ---")
for event in langie_app.stream(None, config):
    if "complete" in event:
        print("\nFinal Structured Payload:")
        print(json.dumps(event["complete"]["final_payload"], indent=2))

--- [AUTOMATED PROCESSING START] ---
Node Executed: intake
Node Executed: understand
Node Executed: prepare
Node Executed: retrieve
Node Executed: match_two_way
Node Executed: checkpoint_hitl
Node Executed: __interrupt__

--- [HUMAN REVIEW TRIGGERED] ---

--- [RESUMING WORKFLOW] ---

Final Structured Payload:
{
  "id": "INV-1001",
  "status": "FINALIZED"
}
