# 📓 The GenAI Revolution Cookbook

**Title:** How to Build a Deterministic LangGraph Agent with Plan-Execute

**Description:** Ship a production-grade LangGraph agent: deterministic plan-execute, strict JSON schemas, thread memory, SQLite checkpoints, and a single FastAPI /agent endpoint.

---

*This Jupyter notebook contains executable code examples. Run the cells below to try the code yourself.*



## Why This Approach Works

Building reliable AI agents for production requires more than chaining LLM calls—you need **deterministic planning**, **strict validation**, and **resumable state**. This guide shows you how to combine LangGraph's state management with structured schemas and guardrails to create an agent that:

- **Plans before acting** using a fixed schema, so you can audit and control every step
- **Validates tool inputs** with Pydantic, catching errors before execution
- **Persists checkpoints** in SQLite, enabling pause/resume and debugging
- **Exposes a clean API** via FastAPI for integration into real systems

You'll build a single `/agent` endpoint that accepts a user query, plans a sequence of tool calls, executes them with retries and guardrails, and returns a final answer—all with predictable costs and debuggable runs.

## How It Works (High-Level Overview)

The agent follows a three-node graph:

1. **Planner** – The LLM receives the user query and outputs a structured `Plan` (list of steps with tool names and arguments). Temperature is set to 0 for consistency.
2. **Executor** – Each step is validated against the tool's Pydantic schema, then executed with retries and timeout. Results (or errors) are appended to state.
3. **Finalizer** – The LLM synthesizes all evidence into a final answer, citing step results.

State flows through a SQLite checkpointer, so you can interrupt, inspect, and resume at any step. Conditional edges route errors to END immediately, short-circuiting the graph when guardrails fail.

## Setup & Installation

Install dependencies (pinned for reproducibility):

In [None]:
!pip install langgraph~=0.2.0 langgraph-checkpoint-sqlite~=0.1.0 langchain-core~=0.3.0 langchain-openai~=0.2.0 openai~=1.0 pydantic~=2.0 fastapi~=0.115.0 uvicorn~=0.30.0 python-dotenv~=1.0 httpx~=0.27.0 nest-asyncio~=1.6.0 tenacity

Set your OpenAI API key (for Colab, set programmatically; for local, use `.env`):

In [None]:
import os
os.environ["OPENAI_API_KEY"] = "sk-..."  # Replace with your key
# Optional: os.environ["LANGCHAIN_API_KEY"] = "..." for LangSmith tracing

Print versions to confirm setup:

In [None]:
import langgraph, langchain_core, langchain_openai, pydantic, fastapi
print(f"LangGraph: {langgraph.__version__}")
print(f"LangChain Core: {langchain_core.__version__}")
print(f"LangChain OpenAI: {langchain_openai.__version__}")
print(f"Pydantic: {pydantic.__version__}")
print(f"FastAPI: {fastapi.__version__}")

## Step-by-Step Implementation

### 1. Define Pydantic Schemas

Strict schemas enforce structure at both the planning and tool layers. The planner outputs a `Plan` with a list of `Step` objects; each tool defines its own input schema.

In [None]:
from pydantic import BaseModel, Field
from typing import Literal

class Step(BaseModel):
    """A single planned action."""
    step_id: int = Field(..., description="Unique step number")
    tool: Literal["calculator", "search_docs"] = Field(..., description="Tool name")
    args: dict = Field(..., description="Tool arguments as key-value pairs")
    reason: str = Field(..., description="Why this step is needed")

class Plan(BaseModel):
    """Structured plan output by the planner LLM."""
    steps: list[Step] = Field(..., description="Ordered list of steps")

class CalculatorInput(BaseModel):
    """Schema for calculator tool."""
    expression: str = Field(..., description="Math expression using +, -, *, /, (, )")

class SearchDocsInput(BaseModel):
    """Schema for document search tool."""
    query: str = Field(..., description="Search query string")

**Why strict schemas?** The `Literal` type on `tool` rejects any tool name outside the allowed set, and each tool's input model rejects malformed arguments before the tool runs. Together they make every plan auditable before execution.
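To see the `Literal` constraint at work, the sketch below declares a trimmed copy of the step schema under a different name (`StepSketch`, so it does not clobber the real `Step`) and validates one well-formed and one malformed step. The tool name `shell` is a made-up example of something a model might invent, and `validate_step` is a helper introduced here for illustration.

```python
from typing import Literal
from pydantic import BaseModel, ValidationError

class StepSketch(BaseModel):
    """Trimmed copy of the Step schema, repeated so this cell runs on its own."""
    step_id: int
    tool: Literal["calculator", "search_docs"]
    args: dict
    reason: str

def validate_step(raw: dict) -> tuple[bool, str]:
    """Return (ok, message) instead of raising, so callers can log rejections."""
    try:
        step = StepSketch.model_validate(raw)
        return True, f"accepted step {step.step_id} for tool {step.tool}"
    except ValidationError as exc:
        # error_count() reports how many fields failed validation.
        return False, f"rejected: {exc.error_count()} validation error(s)"

good = validate_step(
    {"step_id": 1, "tool": "calculator", "args": {"expression": "1 + 1"}, "reason": "add"}
)
bad = validate_step(
    {"step_id": 2, "tool": "shell", "args": {}, "reason": "run a command"}
)
print(good)
print(bad)
```

Returning a tuple rather than raising is only one option; it is convenient when rejected plans should be logged and reported rather than crash the request.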

### 2. Implement Tools with Validation

Each tool validates its input schema and includes guardrails (e.g., disallowing unsafe operations). We use `ast.parse` to whitelist safe math nodes and reject anything else.

In [None]:
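import ast

# Calculator tool with AST-based guardrails
def calculator(expression: str) -> str:
    """
    Evaluate a math expression safely.
    Only allows +, -, *, /, parentheses, and numbers.
    """
    # ast.walk also yields operator nodes (ast.Add, ...), so both tuples are whitelisted.
    # ast.Num is omitted: it is deprecated and ast.Constant covers numeric literals.
    allowed_nodes = (ast.Expression, ast.BinOp, ast.UnaryOp, ast.Constant)
    allowed_ops = (ast.Add, ast.Sub, ast.Mult, ast.Div, ast.USub, ast.UAdd)

    try:
        tree = ast.parse(expression, mode="eval")
        for node in ast.walk(tree):
            if isinstance(node, (ast.BinOp, ast.UnaryOp)):
                if not isinstance(node.op, allowed_ops):
                    return f"Error: Disallowed operator in expression: {expression}"
            elif isinstance(node, ast.Constant):
                # Only plain numbers pass; strings, bytes, booleans, and None are rejected.
                if type(node.value) not in (int, float):
                    return f"Error: Disallowed constant in expression: {expression}"
            elif not isinstance(node, allowed_nodes + allowed_ops):
                return f"Error: Disallowed operation in expression: {expression}"

        # The tree is fully whitelisted; evaluate it with no builtins available.
        code = compile(tree, filename="<calculator>", mode="eval")
        return str(eval(code, {"__builtins__": {}}, {}))
    except Exception as e:
        return f"Error: {str(e)}"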

# Mock document search tool
def search_docs(query: str) -> str:
    """
    Mock document search. Replace with real retrieval logic.
    """
    mock_db = {
        "policy": "Our return policy allows 30-day returns.",
        "hours": "We are open Mon-Fri 9am-5pm."
    }
    for key, doc in mock_db.items():
        if key in query.lower():
            return doc
    return "No relevant documents found."

**Why AST parsing?** Simple `eval` is unsafe. AST whitelisting ensures only arithmetic operations run, blocking code injection or disallowed functions.
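To make the whitelist concrete, this small sketch lists the node types that `ast.walk` yields for an expression. It only inspects the tree and evaluates nothing; `node_types` is a helper name introduced here for illustration.

```python
import ast

def node_types(expression: str) -> list[str]:
    """Return the sorted, de-duplicated AST node type names in an expression."""
    tree = ast.parse(expression, mode="eval")
    return sorted({type(node).__name__ for node in ast.walk(tree)})

print(node_types("2 + 2 * 3"))
# ['Add', 'BinOp', 'Constant', 'Expression', 'Mult']
print(node_types("2 + sqrt(4)"))
# ['Add', 'BinOp', 'Call', 'Constant', 'Expression', 'Load', 'Name']
```

Operator nodes such as `Add` and `Mult` appear in the walk alongside `BinOp`, so a whitelist has to account for them. `Call` and `Name` are what give away the function call in the second expression.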

Test the calculator guardrail:

In [None]:
print(calculator("2 + 2 * 3"))       # Should return "8"
print(calculator("2 + sqrt(4)"))     # Should return error (sqrt not allowed)
print(calculator("2 + 2 ^ 3"))       # Should return error (^ not allowed)

### 3. Register Tools in a Guarded Registry

The registry maps tool names to (function, schema) pairs and wraps execution with retries, timeout, and validation.

In [None]:
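from tenacity import retry, stop_after_attempt, wait_exponential
import asyncio

TOOL_REGISTRY = {
    "calculator": (calculator, CalculatorInput),
    "search_docs": (search_docs, SearchDocsInput),
}

# Only this inner call is retried. It raises on timeout or failure so that
# tenacity can see the exception; catching it here would disable the retry.
@retry(stop=stop_after_attempt(2), wait=wait_exponential(min=1, max=4), reraise=True)
async def _run_with_timeout(func, kwargs: dict) -> str:
    """Run a blocking tool in a worker thread with a 5-second timeout."""
    return await asyncio.wait_for(asyncio.to_thread(func, **kwargs), timeout=5.0)

async def call_tool_safe(tool_name: str, args: dict) -> str:
    """
    Validate args against schema, then call tool with timeout and retry.
    Every failure is returned as a string starting with "Error:".
    """
    if tool_name not in TOOL_REGISTRY:
        return f"Error: Unknown tool '{tool_name}'"

    func, schema = TOOL_REGISTRY[tool_name]

    # Validate args once; invalid input is deterministic, so it is not retried
    try:
        validated = schema(**args)
    except Exception as e:
        return f"Error: Validation failed: {str(e)}"

    # Execute with timeout and retry (model_dump replaces the deprecated .dict())
    try:
        return await _run_with_timeout(func, validated.model_dump())
    except asyncio.TimeoutError:
        return "Error: Tool execution timeout"
    except Exception as e:
        return f"Error: {str(e)}"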

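**Why retries and timeout?** External tools (APIs, databases) can fail transiently. Retrying with exponential backoff absorbs those failures, and the timeout stops one slow call from stalling the whole run. Note that `asyncio.wait_for` stops waiting on the worker thread but cannot kill it, so long-running tools should also enforce their own timeouts.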
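For readers who want to see the mechanics without `tenacity`, here is a hand-rolled sketch of the same idea: a per-attempt timeout with exponential backoff between attempts. `retry_with_timeout` and `flaky` are hypothetical names, and the delays are shortened so the example runs quickly.

```python
import asyncio
from typing import Awaitable, Callable

async def retry_with_timeout(
    make_call: Callable[[], Awaitable[str]],
    attempts: int = 3,
    timeout: float = 0.5,
    base_delay: float = 0.01,
) -> str:
    """Await make_call() up to `attempts` times, doubling the pause after each failure."""
    last_error: Exception | None = None
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(make_call(), timeout=timeout)
        except Exception as exc:  # includes asyncio.TimeoutError
            last_error = exc
            if attempt < attempts - 1:
                # Backoff: base_delay, then 2x, then 4x, and so on.
                await asyncio.sleep(base_delay * (2 ** attempt))
    return f"Error: {type(last_error).__name__} after {attempts} attempts"

calls = {"count": 0}

async def flaky() -> str:
    """Fail on the first two calls, then succeed, to mimic a transient outage."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"

# In a notebook cell: print(await retry_with_timeout(flaky))
# In a script:        print(asyncio.run(retry_with_timeout(flaky)))
```

The final failure is returned as an `Error:` string rather than raised, matching the convention the tools in this guide use to signal failure.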

Test the guarded caller:

In [None]:
# Notebook cells already run inside an event loop, so use top-level await
# rather than asyncio.run(), which raises in a running loop.
result = await call_tool_safe("calculator", {"expression": "10 / 2"})
print(result)  # Should print "5.0"

result = await call_tool_safe("calculator", {"expression": "10 / 0"})
print(result)  # Should print "Error: division by zero"

### 4. Configure LLM Clients

We use two clients: one for structured planning (with `with_structured_output`), one for natural language finalization. Temperature is 0 for determinism.

In [None]:
from langchain_openai import ChatOpenAI

# Planner: outputs structured Plan
planner_llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0  # Deterministic planning
).with_structured_output(Plan)

# Finalizer: outputs natural language
finalizer_llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0
)

**Why temperature=0?** It minimizes variance in plan structure and final answers, making costs and behavior predictable across runs.

Test the planner with a simple prompt:

In [None]:
test_plan = planner_llm.invoke("Calculate 5 + 3 and search for return policy")
print(test_plan)  # Should print a Plan object with two steps

### 5. Define State Schema

State carries the query, plan, execution results, memory summary, and step index through the graph.

In [None]:
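import operator
from typing import TypedDict, Annotated

class AgentState(TypedDict):
    query: str
    plan: Plan | None
    results: Annotated[list[dict], operator.add]  # Append-only list of step results
    memory_summary: str  # Thread-scoped context from prior runs
    step_index: int
    final_answer: str | None
    error: str | None

**Why `operator.add` for results?** Annotating the field with a reducer tells LangGraph to concatenate each node's returned list onto the existing one instead of overwriting it, so evidence accumulates step by step. The `add_messages` reducer is designed for chat messages and would try to coerce these plain result dicts into message objects. With a checkpointer, results also carry over between invocations that share a `thread_id`.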
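How a reducer annotation changes state updates can be illustrated with a simplified model. This is not LangGraph's implementation: `REDUCERS` and `apply_update` are hypothetical names, `operator.add` serves as the example reducer, and the sketch assumes that keys with a reducer are merged while all other keys are overwritten.

```python
import operator
from typing import Any, Callable

# Keys listed here are merged with their reducer; all others are overwritten.
REDUCERS: dict[str, Callable[[Any, Any], Any]] = {"results": operator.add}

def apply_update(state: dict, update: dict) -> dict:
    """Return a new state with `update` applied, honoring per-key reducers."""
    new_state = dict(state)
    for key, value in update.items():
        if key in REDUCERS and key in new_state:
            new_state[key] = REDUCERS[key](new_state[key], value)
        else:
            new_state[key] = value
    return new_state

state = {"results": [], "step_index": 0}
state = apply_update(state, {"results": [{"step_id": 1}], "step_index": 1})
state = apply_update(state, {"results": [{"step_id": 2}], "step_index": 2})
print(state)
# {'results': [{'step_id': 1}, {'step_id': 2}], 'step_index': 2}
```

This is why a node returns a one-element list for `results`: the reducer appends it, whereas `step_index` is simply replaced.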

### 6. Implement Graph Nodes

Each node is an async function that reads and writes state.

#### Planner Node

In [None]:
async def planner_node(state: AgentState) -> dict:
    """
    Generate a structured plan from the user query and memory.
    """
    prompt = f"User query: {state['query']}\n"
    if state.get("memory_summary"):
        prompt += f"Context from prior conversation: {state['memory_summary']}\n"
    prompt += "Create a step-by-step plan using available tools: calculator, search_docs."
    
    plan = await planner_llm.ainvoke(prompt)
    return {"plan": plan, "step_index": 0}

**Why inject memory_summary?** It allows the planner to adapt based on prior interactions in the same thread, enabling multi-turn workflows.

#### Executor Node

In [None]:
async def executor_node(state: AgentState) -> dict:
    """
    Execute the current step, validate, and append result to state.
    """
    plan = state["plan"]
    idx = state["step_index"]
    
    if idx >= len(plan.steps):
        return {"step_index": idx}  # All steps done
    
    step = plan.steps[idx]
    result_text = await call_tool_safe(step.tool, step.args)
    
    result_entry = {
        "step_id": step.step_id,
        "tool": step.tool,
        "args": step.args,
        "result": result_text
    }
    
    # Check for errors
    if result_text.startswith("Error:"):
        return {
            "results": [result_entry],
            "error": result_text,
            "step_index": idx + 1
        }
    
    return {
        "results": [result_entry],
        "step_index": idx + 1
    }

**Why check for "Error:" prefix?** It's a simple convention to route failures to END via conditional edges, short-circuiting the graph.

#### Finalizer Node

In [None]:
async def finalizer_node(state: AgentState) -> dict:
    """
    Synthesize all step results into a final answer.
    """
    evidence = "\n".join([
        f"Step {r['step_id']}: {r['tool']}({r['args']}) -> {r['result']}"
        for r in state["results"]
    ])
    
    prompt = f"User query: {state['query']}\n\nEvidence:\n{evidence}\n\nProvide a concise final answer."
    
    response = await finalizer_llm.ainvoke(prompt)
    return {"final_answer": response.content}

**Why cite evidence?** It makes the final answer auditable and helps debug incorrect results by tracing back to specific tool outputs.

### 7. Build the Graph with Conditional Edges

Wire the nodes with conditional routing: continue executing steps until done or error, then finalize or end.

In [None]:
from langgraph.graph import StateGraph, END

def should_continue(state: AgentState) -> str:
    """Route to executor, finalizer, or end based on state."""
    if state.get("error"):
        return END
    if state["step_index"] >= len(state["plan"].steps):
        return "finalizer"
    return "executor"

# Build graph
graph_builder = StateGraph(AgentState)
graph_builder.add_node("planner", planner_node)
graph_builder.add_node("executor", executor_node)
graph_builder.add_node("finalizer", finalizer_node)

graph_builder.set_entry_point("planner")
graph_builder.add_edge("planner", "executor")
graph_builder.add_conditional_edges("executor", should_continue, {
    "executor": "executor",
    "finalizer": "finalizer",
    END: END
})
graph_builder.add_edge("finalizer", END)

**Why conditional edges?** They enable dynamic routing (loop over steps, short-circuit on error) without hardcoding the number of steps.
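Because the router is a plain function of state, its rules can be checked without an LLM or a compiled graph. The sketch below mirrors those rules with stand-ins: `route` is a hypothetical copy that returns the string `"end"` where the real function returns LangGraph's `END`, and `FakePlan` replaces the Pydantic `Plan`.

```python
from dataclasses import dataclass, field

@dataclass
class FakePlan:
    """Minimal stand-in for Plan; only the `steps` attribute is needed."""
    steps: list = field(default_factory=list)

def route(state: dict) -> str:
    """Mirror of the routing rules: error first, then completion, else loop."""
    if state.get("error"):
        return "end"
    if state["step_index"] >= len(state["plan"].steps):
        return "finalizer"
    return "executor"

plan = FakePlan(steps=["step-a", "step-b"])
print(route({"plan": plan, "step_index": 1, "error": None}))            # executor
print(route({"plan": plan, "step_index": 2, "error": None}))            # finalizer
print(route({"plan": plan, "step_index": 1, "error": "Error: boom"}))   # end
```

The error check comes first on purpose: a failed step still advances `step_index`, so checking completion first could send a failed run to the finalizer.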

### 8. Add SQLite Checkpointer and Compile

The checkpointer persists state after each node, enabling pause/resume and debugging.

In [None]:
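import aiosqlite
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

# The nodes are async and the graph runs with ainvoke, so use the async saver;
# the synchronous SqliteSaver does not implement the async checkpoint methods.
conn = await aiosqlite.connect("checkpoints.db")
checkpointer = AsyncSqliteSaver(conn)
graph = graph_builder.compile(checkpointer=checkpointer)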

**Why SQLite?** It's simple for development and supports thread-scoped state. For production, consider Postgres or Redis for concurrency and scale.

Test the graph without the API:

In [None]:
config = {"configurable": {"thread_id": "test-thread-1"}}
result = await graph.ainvoke(
    {"query": "Calculate 10 + 5 and search for return policy", "results": [], "step_index": 0, "memory_summary": ""},
    config
)
print(result["final_answer"])

### 9. Expose via FastAPI

Wrap the graph in a POST endpoint that accepts a query and thread_id, invokes the graph, and returns the final answer or error.

In [None]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel as PydanticBase

app = FastAPI()

class AgentRequest(PydanticBase):
    query: str
    thread_id: str = "default"
    memory_summary: str = ""

class AgentResponse(PydanticBase):
    final_answer: str | None = None
    error: str | None = None
    thread_id: str

@app.post("/agent", response_model=AgentResponse)
async def agent_endpoint(req: AgentRequest):
    """
    Run the agent graph for a given query and thread.
    """
    config = {"configurable": {"thread_id": req.thread_id}}
    initial_state = {
        "query": req.query,
        "results": [],
        "step_index": 0,
        "memory_summary": req.memory_summary,
        "plan": None,
        "final_answer": None,
        "error": None
    }
    
    try:
        final_state = await graph.ainvoke(initial_state, config)
        return AgentResponse(
            final_answer=final_state.get("final_answer"),
            error=final_state.get("error"),
            thread_id=req.thread_id
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

**Why thread_id?** It scopes checkpoints and memory to a conversation, enabling multi-turn interactions and resumability.

**Concurrency note:** If multiple requests for the same `thread_id` arrive concurrently, checkpoint writes may race. For production, add a simple lock per thread or use a queue.
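One way to serialize requests per thread within a single process is an `asyncio.Lock` keyed by `thread_id`. The sketch below is a minimal illustration: `thread_locks`, `run_exclusive`, `fake_graph_call`, and `demo` are hypothetical names, the sleep stands in for `graph.ainvoke`, and the approach does not coordinate across multiple worker processes.

```python
import asyncio
from collections import defaultdict

# One lock per thread_id, created lazily on first use.
thread_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

# Bookkeeping used only to show that calls on one thread never overlap.
active: defaultdict[str, int] = defaultdict(int)
max_active: defaultdict[str, int] = defaultdict(int)

async def fake_graph_call(thread_id: str) -> None:
    """Stand-in for graph.ainvoke; records how many calls overlap per thread."""
    active[thread_id] += 1
    max_active[thread_id] = max(max_active[thread_id], active[thread_id])
    await asyncio.sleep(0.01)
    active[thread_id] -= 1

async def run_exclusive(thread_id: str) -> None:
    """Run one call while holding that thread's lock."""
    async with thread_locks[thread_id]:
        await fake_graph_call(thread_id)

async def demo() -> dict[str, int]:
    """Fire three calls at thread-1 and one at thread-2 concurrently."""
    await asyncio.gather(
        *(run_exclusive("thread-1") for _ in range(3)),
        run_exclusive("thread-2"),
    )
    return dict(max_active)

# In a notebook cell: print(await demo())
# In a script:        print(asyncio.run(demo()))
```

Different threads still run in parallel; only calls that share a `thread_id` wait for each other. With several server processes, a shared lock (for example in the database) would be needed instead.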

Run the server (for local execution):

In [None]:
import uvicorn
# For notebooks, use nest_asyncio to allow uvicorn in the same event loop
import nest_asyncio
nest_asyncio.apply()

# Run in background or separate cell
uvicorn.run(app, host="0.0.0.0", port=8000, reload=False)

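`uvicorn.run` blocks the cell until the server stops, so send the test requests below from a second notebook or a terminal. For Colab, run the server in a background thread and test with `httpx` in the same notebook.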

## Run and Validate

### Happy Path

Test a successful query:

In [None]:
import httpx
import asyncio

async def test_happy_path():
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://localhost:8000/agent",
            json={"query": "Calculate 15 * 3 and search for business hours", "thread_id": "thread-1"}
        )
        print(response.json())

await test_happy_path()  # Notebook cells support top-level await

Expected output:

```json
{
  "final_answer": "15 * 3 equals 45. Our business hours are Mon-Fri 9am-5pm.",
  "error": null,
  "thread_id": "thread-1"
}
```

### Guardrail Failure

Test an invalid expression:

In [None]:
async def test_guardrail():
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://localhost:8000/agent",
            json={"query": "Calculate 2 + sqrt(4)", "thread_id": "thread-2"}
        )
        print(response.json())

await test_guardrail()  # Notebook cells support top-level await

Expected output:

```json
{
  "final_answer": null,
  "error": "Error: Disallowed operation in expression: 2 + sqrt(4)",
  "thread_id": "thread-2"
}
```

### Resumability

To demonstrate resumability, you would interrupt the graph mid-execution (e.g., by adding a human-in-the-loop node) and re-invoke with the same `thread_id`. The checkpointer restores state and continues from the last completed node.

For a simple test, run two sequential requests with the same `thread_id` and pass a `memory_summary` in the second request to verify the planner adapts:

In [None]:
async def test_memory():
    async with httpx.AsyncClient() as client:
        # First request
        r1 = await client.post(
            "http://localhost:8000/agent",
            json={"query": "Calculate 10 + 5", "thread_id": "thread-3"}
        )
        print("First response:", r1.json())
        
        # Second request with memory
        r2 = await client.post(
            "http://localhost:8000/agent",
            json={
                "query": "Now multiply that result by 2",
                "thread_id": "thread-3",
                "memory_summary": "User previously calculated 10 + 5 = 15"
            }
        )
        print("Second response:", r2.json())

await test_memory()  # Notebook cells support top-level await

The second response should reference the prior result (15) and calculate 15 * 2 = 30.

## Conclusion

You've built a deterministic, resumable AI agent with:

- **Structured planning** via Pydantic schemas and `with_structured_output`
- **Guardrails** using AST parsing and strict validation
- **Retries and timeout** for reliable tool execution
- **SQLite checkpoints** for pause/resume and debugging
- **FastAPI endpoint** for integration into real systems

**Key design decisions:**

- Temperature=0 reduces run-to-run variation in plans and answers, though it does not guarantee identical outputs
- Strict schemas prevent hallucinated tool calls and malformed arguments
- Conditional edges enable dynamic routing (loop, short-circuit on error)
- Thread-scoped state supports multi-turn conversations and resumability

**Next steps for production:**

- Add authentication (FastAPI dependency for API key validation)
- Implement rate limiting (e.g., `slowapi` or Redis-based limiter)
- Add observability (Prometheus metrics, LangSmith tracing, structured logging)
- Replace SQLite with Postgres or Redis for concurrent access
- Deploy with Docker + Kubernetes or serverless (AWS Lambda, GCP Cloud Run)
- Add a simple lock or queue per `thread_id` to prevent checkpoint races

You now have a ship-ready foundation for building reliable, auditable AI agents that scale.