In [1]:
# from future import annotations

import os
from typing import Annotated, List, Optional, TypedDict, Dict, Any

from pydantic import BaseModel, Field

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import MemorySaver

from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain_core.tools import tool

import re

### Web search tool using DuckDuckGo (no API key needed)

In [None]:
from ddgs import DDGS

@tool
def web_search(query: str, max_results: int = 5) -> List[Dict[str, Any]]:
    """
    Search the web for the query and return a list of results with title, href, and body.
    """
    results = []
    with DDGS() as ddgs:
        for r in ddgs.text(query, max_results=max_results):
            results.append({
            "title": r.get("title"),
            "href": r.get("href"),
            "snippet": r.get("body")
            })
    return results

@tool
def calculator(expression: str) -> str:
    """
    Safely evaluate a basic arithmetic expression.
    Allowed characters: digits, +, -, , /, %, **, parentheses, space, decimal point.
    """

    if not re.fullmatch(r"[0-9.+-*/%()\s*]", expression):
        raise ValueError("Disallowed characters in expression")
    # Evaluate in a restricted namespace
    try:
        result = eval(expression, {"builtins": {}}, {})

    except Exception as e:
        return f"Error evaluating expression: {e}"
    return str(result)

TOOLS = [web_search, calculator]

### ---------- Planning (structured output) ----------

In [3]:
class Plan(BaseModel):
    objectives: List[str] = Field(default_factory=list, description="High-level goals to accomplish")
    steps: List[str] = Field(default_factory=list, description="Ordered, concrete steps to complete the task")
    assumptions: List[str] = Field(default_factory=list, description="Assumptions or constraints discovered")
    clarifying_questions: List[str] = Field(default_factory=list, description="Questions to ask the user if needed")
    success_criteria: List[str] = Field(default_factory=list, description="How to know the task is done")

### ---------- State ----------

In [4]:
class AgentState(TypedDict):
    # Conversation and intermediate context
    messages: Annotated[List, add_messages]
    # Structured plan from the planner
    plan: Optional[Plan]
    # Which step index we are on (0-based)
    step_index: int
    # Collected outputs from each step
    results: List[Dict[str, Any]]
    # Final answer to return to the user
    final_answer: Optional[str]

### ---------- Models ----------
### You can swap models as preferred. Needs a model that supports tool-calling.

In [5]:
# Fixed analyze_and_plan function
def analyze_and_plan(state: AgentState) -> AgentState:
    """
    Analyze the user's ask and produce a Plan (objectives, steps, assumptions, etc.).
    """
    # Get the latest user message
    user_messages = [m for m in state["messages"] if isinstance(m, HumanMessage)]
    if not user_messages:
        raise ValueError("No user messages found in state")
    user_msg = user_messages[-1]

    plan: Plan = planner_llm.invoke([
        PLANNER_SYSTEM,
        HumanMessage(content=f"User task:\n{user_msg.content}\n\nReturn a plan.")
    ])

    # Optional: Present a sanitized plan summary to the conversation context
    plan_summary = "Proposed plan:\n"
    if plan.objectives:
        plan_summary += "- Objectives: " + "; ".join(plan.objectives) + "\n"
    if plan.steps:
        plan_summary += "- Steps:\n" + "\n".join([f"  {i+1}. {s}" for i, s in enumerate(plan.steps)])
    if plan.clarifying_questions:
        plan_summary += "\n- Clarifying questions: " + "; ".join(plan.clarifying_questions)

    return {
        "plan": plan,
        "messages": [AIMessage(content=plan_summary)],
        "step_index": 0
    }


In [6]:
DEFAULT_MODEL = os.environ.get("AGENT_MODEL", "gpt-4o-mini")

llm = ChatOpenAI(model=DEFAULT_MODEL, temperature=0)
planner_llm = llm.with_structured_output(Plan)  # For structured planning

### ---------- System messages ----------

In [7]:
PLANNER_SYSTEM = SystemMessage(content=(
                                        "You are a careful planner. Your job is to analyze the user's request and propose a clear, "
                                        "feasible plan before any execution. Think through the task internally, but only return a concise plan, "
                                        "not your private chain-of-thought. If you need clarifications, include them in clarifying_questions. "
                                        "Focus on relevant steps that can be executed with available tools or reasoning."
                                        ))

EXECUTOR_SYSTEM = SystemMessage(content=(
                                        "You are a precise executor. Follow the provided plan step-by-step. For each step:\n"
                                        "- Either call a tool if needed (e.g., web_search, calculator), or produce a short step result.\n"
                                        "- Keep responses concise. Do not reveal private reasoning or chain-of-thought.\n"
                                        "- Do not produce the final answer here; only complete the current step.\n"
                                        "If you need more context, state what you need succinctly."
                                        ))

REFLECTOR_SYSTEM = SystemMessage(content=(
                                        "You are a careful summarizer. Given the plan, steps taken, and results, produce the final answer.\n"
                                        "Do not reveal private chain-of-thought. Provide a clear, complete result suitable for the user."
                                        ))

### ---------- Nodes ----------

In [8]:
def analyze_and_plan(state: AgentState) -> AgentState:
    """
    Analyze the user's ask and produce a Plan (objectives, steps, assumptions, etc.).
    """
    # Get the latest user message
    user_messages = [m for m in state["messages"] if isinstance(m, HumanMessage)]
    if not user_messages:
        user_msg = user_messages[-1]

    plan: Plan = planner_llm.invoke([
        PLANNER_SYSTEM,
        HumanMessage(content=f"User task:\n{user_msg.content}\n\nReturn a plan.")
    ])

    # Optional: Present a sanitized plan summary to the conversation context
    plan_summary = "Proposed plan:\n"
    if plan.objectives:
        plan_summary += "- Objectives: " + "; ".join(plan.objectives) + "\n"
    if plan.steps:
        plan_summary += "- Steps:\n" + "\n".join([f"  {i+1}. {s}" for i, s in enumerate(plan.steps)])
    if plan.clarifying_questions:
        plan_summary += "\n- Clarifying questions: " + "; ".join(plan.clarifying_questions)

    return {
        "plan": plan,
        "messages": [AIMessage(content=plan_summary)],
        "step_index": 0
    }

### Bind tools to the executor LLM

In [9]:
executor_llm = llm.bind_tools(TOOLS)

def act_on_current_step(state: AgentState) -> AgentState:
    """
    Execute the current step: the model may either call a tool or provide a short step result.
    """
    plan = state.get("plan")
    if not plan or not plan.steps:
        # If no steps, fall back to simple answer
        return {"messages": [AIMessage(content="No steps available. I will answer directly.")]}
    i = state.get("step_index", 0)
    steps = plan.steps
    n = len(steps)
    if i >= n:
        # Already done
        return {}

    current_step = steps[i]
    # Give the model just enough context
    msgs = [
        EXECUTOR_SYSTEM,
        AIMessage(content=f"Plan steps ({n} total):\n" + "\n".join([f"{idx+1}. {s}" for idx, s in enumerate(steps)])),
        AIMessage(content=f"Current step {i+1}/{n}: {current_step}"),
    ]
    # Include last few messages from the state, if helpful
    # This keeps the context tighter to avoid accidental long history
    msgs += state["messages"][-6:]
    response = executor_llm.invoke(msgs)
    # The response (AIMessage) could include tool calls; ToolNode will handle them if present.
    return {"messages": [response]}


def advance_or_loop(state: AgentState) -> AgentState:
    """
    If the last assistant message didn't call a tool, consider the step complete and advance.
    Append the step result to state['results'].
    """
    plan = state.get("plan")
    if not plan or not plan.steps:
        return {}
    i = state.get("step_index", 0)
    steps = plan.steps
    if i >= len(steps):
        return {}

    # Find the last AI message content (not a tool message)
    last_ai_msgs = [m for m in state["messages"][::-1] if isinstance(m, AIMessage)]
    if not last_ai_msgs:
        return {}
    last_ai = last_ai_msgs[0]

    # Record the step output
    step_record = {
        "step_number": i + 1,
        "step_description": steps[i],
        "output": last_ai.content
    }

    new_results = list(state.get("results", []))
    new_results.append(step_record)

    return {
        "results": new_results,
        "step_index": i + 1
    }


def reflect_and_finalize(state: AgentState) -> AgentState:
    """
    Summarize the entire work: plan, steps taken, and their outputs, then produce the final answer.
    """
    plan = state.get("plan")
    results = state.get("results", [])

    plan_text = ""
    if plan:
        plan_text += "Objectives: " + "; ".join(plan.objectives or []) + "\n"
        plan_text += "Steps:\n" + "\n".join([f"{i+1}. {s}" for i, s in enumerate(plan.steps or [])])

    results_text = ""
    if results:
        results_text = "\nStep results:\n" + "\n".join(
            [f"{r['step_number']}. {r['step_description']} -> {r['output']}" for r in results]
        )

    msgs = [
        REFLECTOR_SYSTEM,
        AIMessage(content="Plan:\n" + plan_text),
        AIMessage(content=results_text),
        # Include the original user prompt for context
    ]

    # Add the last user message
    user_messages = [m for m in state["messages"] if isinstance(m, HumanMessage)]
    if user_messages:
        msgs.append(HumanMessage(content="Original user task:\n" + user_messages[-1].content))

    final_resp = llm.invoke(msgs)
    return {
        "final_answer": final_resp.content,
        "messages": [AIMessage(content=final_resp.content)]
    }

### ---------- Graph wiring ----------

In [10]:
def create_graph() -> StateGraph:
    workflow = StateGraph(AgentState)

    # Nodes
    workflow.add_node("plan", analyze_and_plan)
    workflow.add_node("act", act_on_current_step)
    workflow.add_node("advance", advance_or_loop)
    tool_node = ToolNode(TOOLS)
    workflow.add_node("tools", tool_node)
    workflow.add_node("finalize", reflect_and_finalize)

    # Edges
    workflow.add_edge(START, "plan")
    workflow.add_edge("plan", "act")

    # After "act", either go to tools (if there are tool calls) or advance
    workflow.add_conditional_edges(
        "act",
        tools_condition,  # inspects last AI message in state["messages"]
        {
            "tools": "tools",
            "end": "advance"
        }
    )
    # After tools execute, loop back to "act" so the model can see tool results
    workflow.add_edge("tools", "act")

    # After "advance", either continue to next step or finalize
    def should_continue(state: AgentState) -> str:
        plan = state.get("plan")
        i = state.get("step_index", 0)
        if not plan or not plan.steps:
            return "finalize"
        if i < len(plan.steps):
            return "act"
        return "finalize"

    workflow.add_conditional_edges("advance", should_continue, {"act": "act", "finalize": "finalize"})
    workflow.add_edge("finalize", END)

    return workflow

### ---------- Run ----------

In [11]:
# if name == "main":
# Build and compile graph with in-memory checkpointing (optional)
memory = MemorySaver()
graph = create_graph().compile(checkpointer=memory)

# Example task:
# - The agent will plan first, then execute steps with tools as needed.
user_task = input("Enter your task:\n> ").strip()
initial_state: AgentState = {
    "messages": [
        SystemMessage(content=(
            "You are a helpful assistant. Think step-by-step privately, "
            "but only share a short plan and the final results. Do not reveal your chain-of-thought."
        )),
        HumanMessage(content=user_task),
    ],
    "plan": None,
    "step_index": 0,
    "results": [],
    "final_answer": None
}

# Invoke the graph with thread configuration for the checkpointer
config = {"configurable": {"thread_id": "demo-thread-1"}}
result = graph.invoke(initial_state, config)

# Print outputs
print("\n--- Plan ---")
if result.get("plan"):
    plan = result["plan"]
    print("Objectives:", "; ".join(plan.objectives or []))
    print("Steps:")
    for idx, s in enumerate(plan.steps or []):
        print(f"  {idx+1}. {s}")
    if plan.clarifying_questions:
        print("Clarifying questions:", "; ".join(plan.clarifying_questions))
    if plan.assumptions:
        print("Assumptions:", "; ".join(plan.assumptions))
    if plan.success_criteria:
        print("Success criteria:", "; ".join(plan.success_criteria))

print("\n--- Step Results ---")
for r in result.get("results", []):
    print(f"{r['step_number']}. {r['step_description']}")
    print(f"   Output: {r['output']}")

print("\n--- Final Answer ---")
print(result.get("final_answer") or "(No final answer produced)")

UnboundLocalError: cannot access local variable 'user_msg' where it is not associated with a value

How it works

* plan node: Produces a structured Plan (objectives, steps, assumptions, clarifying questions, success criteria) using structured output. Shares a short plan summary with the conversation context.
* act node: Executes one step at a time. The model can call tools (web_search, calculator) or produce a short step result.
* ToolNode: Runs any tool calls and appends their outputs to messages; then act runs again, allowing the model to use tool output.
* advance node: If no tool call was made in act, we treat the step as completed, record its output, and advance to the next step.
* finalize node: Reflects over plan + step results and produces a final answer.
* Safety: The prompts instruct the model to reason privately and only share concise plan/outputs, never raw chain-of-thought.

Customization ideas

* Pause for plan approval: After plan, insert an interrupt or a human-in-the-loop gate to confirm or edit the plan before proceeding.
* Add more tools: e.g., structured web APIs, database lookups, code execution sandboxes.
* Memory and persistence: Use a real checkpointer (e.g., Postgres) if you need resumable runs.
* Guardrails: Put limits on max cycles per step to avoid infinite loops; add routing based on step type to choose specific tool subsets.
If you want, I can add an optional “plan approval” pause or convert this to a multi-agent pattern (planner/executor/critic) while keeping private reasoning hidden.

In [12]:
# Re-create and compile the graph with the corrected function
def create_graph_fixed() -> StateGraph:
    workflow = StateGraph(AgentState)

    # Nodes - using the corrected analyze_and_plan function from cell 8
    workflow.add_node("plan", analyze_and_plan)
    workflow.add_node("act", act_on_current_step)
    workflow.add_node("advance", advance_or_loop)
    tool_node = ToolNode(TOOLS)
    workflow.add_node("tools", tool_node)
    workflow.add_node("finalize", reflect_and_finalize)

    # Edges
    workflow.add_edge(START, "plan")
    workflow.add_edge("plan", "act")

    # After "act", either go to tools (if there are tool calls) or advance
    workflow.add_conditional_edges(
        "act",
        tools_condition,  # inspects last AI message in state["messages"]
        {
            "tools": "tools",
            "end": "advance"
        }
    )
    # After tools execute, loop back to "act" so the model can see tool results
    workflow.add_edge("tools", "act")

    # After "advance", either continue to next step or finalize
    def should_continue(state: AgentState) -> str:
        plan = state.get("plan")
        i = state.get("step_index", 0)
        if not plan or not plan.steps:
            return "finalize"
        if i < len(plan.steps):
            return "act"
        return "finalize"

    workflow.add_conditional_edges("advance", should_continue, {"act": "act", "finalize": "finalize"})
    workflow.add_edge("finalize", END)

    return workflow

# Build and compile graph with in-memory checkpointing using the fixed function
memory = MemorySaver()
graph_fixed = create_graph_fixed().compile(checkpointer=memory)

print("Graph compiled successfully with the fixed analyze_and_plan function!")


Graph compiled successfully with the fixed analyze_and_plan function!


In [13]:
# Test the fixed agent
user_task = input("Enter your task:\n> ").strip()
initial_state: AgentState = {
    "messages": [
        SystemMessage(content=(
            "You are a helpful assistant. Think step-by-step privately, "
            "but only share a short plan and the final results. Do not reveal your chain-of-thought."
        )),
        HumanMessage(content=user_task),
    ],
    "plan": None,
    "step_index": 0,
    "results": [],
    "final_answer": None
}

# Invoke the FIXED graph with thread configuration for the checkpointer
config = {"configurable": {"thread_id": "demo-thread-1"}}
result = graph_fixed.invoke(initial_state, config)

# Print outputs
print("\n--- Plan ---")
if result.get("plan"):
    plan = result["plan"]
    print("Objectives:", "; ".join(plan.objectives or []))
    print("Steps:")
    for idx, s in enumerate(plan.steps or []):
        print(f"  {idx+1}. {s}")
    if plan.clarifying_questions:
        print("Clarifying questions:", "; ".join(plan.clarifying_questions))
    if plan.assumptions:
        print("Assumptions:", "; ".join(plan.assumptions))
    if plan.success_criteria:
        print("Success criteria:", "; ".join(plan.success_criteria))

print("\n--- Step Results ---")
for r in result.get("results", []):
    print(f"{r['step_number']}. {r['step_description']}")
    print(f"   Output: {r['output']}")

print("\n--- Final Answer ---")
print(result.get("final_answer") or "(No final answer produced)")


UnboundLocalError: cannot access local variable 'user_msg' where it is not associated with a value

In [14]:
# Create a completely new function with a different name to avoid confusion
def analyze_and_plan_FIXED(state: AgentState) -> AgentState:
    """
    Analyze the user's ask and produce a Plan (objectives, steps, assumptions, etc.).
    FIXED VERSION: Properly handles case when no user messages exist.
    """
    # Get the latest user message
    user_messages = [m for m in state["messages"] if isinstance(m, HumanMessage)]
    if not user_messages:
        raise ValueError("No user messages found in state")
    user_msg = user_messages[-1]

    plan: Plan = planner_llm.invoke([
        PLANNER_SYSTEM,
        HumanMessage(content=f"User task:\n{user_msg.content}\n\nReturn a plan.")
    ])

    # Optional: Present a sanitized plan summary to the conversation context
    plan_summary = "Proposed plan:\n"
    if plan.objectives:
        plan_summary += "- Objectives: " + "; ".join(plan.objectives) + "\n"
    if plan.steps:
        plan_summary += "- Steps:\n" + "\n".join([f"  {i+1}. {s}" for i, s in enumerate(plan.steps)])
    if plan.clarifying_questions:
        plan_summary += "\n- Clarifying questions: " + "; ".join(plan.clarifying_questions)

    return {
        "plan": plan,
        "messages": [AIMessage(content=plan_summary)],
        "step_index": 0
    }

# Create the completely fixed graph using the new function name
def create_graph_ACTUALLY_FIXED() -> StateGraph:
    workflow = StateGraph(AgentState)

    # Nodes - using the ACTUALLY FIXED function
    workflow.add_node("plan", analyze_and_plan_FIXED)  # <- Note: different function name
    workflow.add_node("act", act_on_current_step)
    workflow.add_node("advance", advance_or_loop)
    tool_node = ToolNode(TOOLS)
    workflow.add_node("tools", tool_node)
    workflow.add_node("finalize", reflect_and_finalize)

    # Edges
    workflow.add_edge(START, "plan")
    workflow.add_edge("plan", "act")

    # After "act", either go to tools (if there are tool calls) or advance
    workflow.add_conditional_edges(
        "act",
        tools_condition,  # inspects last AI message in state["messages"]
        {
            "tools": "tools",
            "end": "advance"
        }
    )
    # After tools execute, loop back to "act" so the model can see tool results
    workflow.add_edge("tools", "act")

    # After "advance", either continue to next step or finalize
    def should_continue(state: AgentState) -> str:
        plan = state.get("plan")
        i = state.get("step_index", 0)
        if not plan or not plan.steps:
            return "finalize"
        if i < len(plan.steps):
            return "act"
        return "finalize"

    workflow.add_conditional_edges("advance", should_continue, {"act": "act", "finalize": "finalize"})
    workflow.add_edge("finalize", END)

    return workflow

# Build the ACTUALLY FIXED graph
memory_fixed = MemorySaver()
graph_ACTUALLY_FIXED = create_graph_ACTUALLY_FIXED().compile(checkpointer=memory_fixed)

print("Graph with ACTUALLY FIXED analyze_and_plan function compiled successfully!")


Graph with ACTUALLY FIXED analyze_and_plan function compiled successfully!


In [15]:
# Test the ACTUALLY FIXED agent
user_task = input("Enter your task:\n> ").strip()
initial_state: AgentState = {
    "messages": [
        SystemMessage(content=(
            "You are a helpful assistant. Think step-by-step privately, "
            "but only share a short plan and the final results. Do not reveal your chain-of-thought."
        )),
        HumanMessage(content=user_task),
    ],
    "plan": None,
    "step_index": 0,
    "results": [],
    "final_answer": None
}

# Invoke the ACTUALLY FIXED graph 
config = {"configurable": {"thread_id": "demo-thread-2"}}
result = graph_ACTUALLY_FIXED.invoke(initial_state, config)

# Print outputs
print("\n--- Plan ---")
if result.get("plan"):
    plan = result["plan"]
    print("Objectives:", "; ".join(plan.objectives or []))
    print("Steps:")
    for idx, s in enumerate(plan.steps or []):
        print(f"  {idx+1}. {s}")
    if plan.clarifying_questions:
        print("Clarifying questions:", "; ".join(plan.clarifying_questions))
    if plan.assumptions:
        print("Assumptions:", "; ".join(plan.assumptions))
    if plan.success_criteria:
        print("Success criteria:", "; ".join(plan.success_criteria))

print("\n--- Step Results ---")
for r in result.get("results", []):
    print(f"{r['step_number']}. {r['step_description']}")
    print(f"   Output: {r['output']}")

print("\n--- Final Answer ---")
print(result.get("final_answer") or "(No final answer produced)")


  with DDGS() as ddgs:
  with DDGS() as ddgs:
  with DDGS() as ddgs:
  with DDGS() as ddgs:
  with DDGS() as ddgs:
  with DDGS() as ddgs:
  with DDGS() as ddgs:
  with DDGS() as ddgs:
  with DDGS() as ddgs:
  with DDGS() as ddgs:
  with DDGS() as ddgs:
  with DDGS() as ddgs:


GraphRecursionError: Recursion limit of 25 reached without hitting a stop condition. You can increase the limit by setting the `recursion_limit` config key.
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/GRAPH_RECURSION_LIMIT

In [16]:
# Debug version with recursion limits and better logging
def create_graph_DEBUG() -> StateGraph:
    workflow = StateGraph(AgentState)

    # Enhanced functions with debugging
    def debug_should_continue(state: AgentState) -> str:
        plan = state.get("plan")
        i = state.get("step_index", 0)
        print(f"DEBUG should_continue: step_index={i}, plan exists={plan is not None}")
        if plan and plan.steps:
            print(f"DEBUG: Plan has {len(plan.steps)} steps")
            print(f"DEBUG: Current step index: {i}")
        
        if not plan or not plan.steps:
            print("DEBUG: No plan or no steps, going to finalize")
            return "finalize"
        if i < len(plan.steps):
            print(f"DEBUG: Step {i+1}/{len(plan.steps)}, continuing to act")
            return "act"
        print("DEBUG: All steps completed, going to finalize")
        return "finalize"

    def debug_advance_or_loop(state: AgentState) -> AgentState:
        """Enhanced advance function with debugging"""
        plan = state.get("plan")
        if not plan or not plan.steps:
            print("DEBUG advance: No plan or steps, returning empty")
            return {}
        
        i = state.get("step_index", 0)
        steps = plan.steps
        print(f"DEBUG advance: Current step {i}, total steps {len(steps)}")
        
        if i >= len(steps):
            print("DEBUG advance: Already at or past last step")
            return {}

        # Find the last AI message content (not a tool message)
        last_ai_msgs = [m for m in state["messages"][::-1] if isinstance(m, AIMessage)]
        if not last_ai_msgs:
            print("DEBUG advance: No AI messages found")
            return {}
        last_ai = last_ai_msgs[0]

        # Record the step output
        step_record = {
            "step_number": i + 1,
            "step_description": steps[i],
            "output": last_ai.content[:100] + "..." if len(last_ai.content) > 100 else last_ai.content
        }
        print(f"DEBUG advance: Recording step {i+1}: {step_record['step_description']}")

        new_results = list(state.get("results", []))
        new_results.append(step_record)

        new_step_index = i + 1
        print(f"DEBUG advance: Advancing from step {i} to step {new_step_index}")
        
        return {
            "results": new_results,
            "step_index": new_step_index
        }

    # Nodes - using the ACTUALLY FIXED function
    workflow.add_node("plan", analyze_and_plan_FIXED)
    workflow.add_node("act", act_on_current_step)
    workflow.add_node("advance", debug_advance_or_loop)  # Use debug version
    tool_node = ToolNode(TOOLS)
    workflow.add_node("tools", tool_node)
    workflow.add_node("finalize", reflect_and_finalize)

    # Edges
    workflow.add_edge(START, "plan")
    workflow.add_edge("plan", "act")

    # After "act", either go to tools (if there are tool calls) or advance
    workflow.add_conditional_edges(
        "act",
        tools_condition,  # inspects last AI message in state["messages"]
        {
            "tools": "tools",
            "end": "advance"
        }
    )
    # After tools execute, loop back to "act" so the model can see tool results
    workflow.add_edge("tools", "act")

    # After "advance", either continue to next step or finalize
    workflow.add_conditional_edges("advance", debug_should_continue, {"act": "act", "finalize": "finalize"})
    workflow.add_edge("finalize", END)

    return workflow

# Build the DEBUG graph with increased recursion limit
memory_debug = MemorySaver()
graph_DEBUG = create_graph_DEBUG().compile(checkpointer=memory_debug)

print("DEBUG graph compiled successfully!")


DEBUG graph compiled successfully!


In [17]:
# Test with DEBUG version and increased recursion limit
user_task = input("Enter your task:\n> ").strip()
initial_state: AgentState = {
    "messages": [
        SystemMessage(content=(
            "You are a helpful assistant. Think step-by-step privately, "
            "but only share a short plan and the final results. Do not reveal your chain-of-thought."
        )),
        HumanMessage(content=user_task),
    ],
    "plan": None,
    "step_index": 0,
    "results": [],
    "final_answer": None
}

# Invoke with increased recursion limit and debugging
config = {
    "configurable": {"thread_id": "debug-thread-1"},
    "recursion_limit": 50  # Increased from default 25
}

print("Starting DEBUG agent execution...")
print("=" * 50)

try:
    result = graph_DEBUG.invoke(initial_state, config)
    
    # Print outputs
    print("\n" + "=" * 50)
    print("DEBUG EXECUTION COMPLETED SUCCESSFULLY!")
    print("=" * 50)
    
    print("\n--- Plan ---")
    if result.get("plan"):
        plan = result["plan"]
        print("Objectives:", "; ".join(plan.objectives or []))
        print("Steps:")
        for idx, s in enumerate(plan.steps or []):
            print(f"  {idx+1}. {s}")
        if plan.clarifying_questions:
            print("Clarifying questions:", "; ".join(plan.clarifying_questions))
        if plan.assumptions:
            print("Assumptions:", "; ".join(plan.assumptions))
        if plan.success_criteria:
            print("Success criteria:", "; ".join(plan.success_criteria))

    print("\n--- Step Results ---")
    for r in result.get("results", []):
        print(f"{r['step_number']}. {r['step_description']}")
        print(f"   Output: {r['output']}")

    print("\n--- Final Answer ---")
    print(result.get("final_answer") or "(No final answer produced)")
    
except Exception as e:
    print(f"\nERROR OCCURRED: {e}")
    print("Check the debug output above to identify the issue.")


Starting DEBUG agent execution...


  with DDGS() as ddgs:
  with DDGS() as ddgs:
  with DDGS() as ddgs:
  with DDGS() as ddgs:
  with DDGS() as ddgs:



ERROR OCCURRED: Error code: 400 - {'error': {'message': "Invalid parameter: messages with role 'tool' must be a response to a preceeding message with 'tool_calls'.", 'type': 'invalid_request_error', 'param': 'messages.[3].role', 'code': None}}
Check the debug output above to identify the issue.


In [18]:
# Simple version without tools to test basic flow
def create_graph_SIMPLE() -> StateGraph:
    workflow = StateGraph(AgentState)

    def simple_act_on_current_step(state: AgentState) -> AgentState:
        """Execute the current step WITHOUT tools to avoid message formatting issues"""
        plan = state.get("plan")
        if not plan or not plan.steps:
            return {"messages": [AIMessage(content="No steps available. I will answer directly.")]}
        
        i = state.get("step_index", 0)
        steps = plan.steps
        n = len(steps)
        
        if i >= n:
            return {}

        current_step = steps[i]
        
        # Simple execution without tools - just use the LLM to process the step
        msgs = [
            SystemMessage(content="You are a step executor. Execute the given step and provide a clear result. Do NOT use tools."),
            HumanMessage(content=f"Execute this step: {current_step}")
        ]
        
        response = llm.invoke(msgs)  # Use regular LLM without tools
        return {"messages": [response]}

    def simple_advance_or_loop(state: AgentState) -> AgentState:
        """Simplified advance function"""
        plan = state.get("plan")
        if not plan or not plan.steps:
            return {}
        
        i = state.get("step_index", 0)
        steps = plan.steps
        
        if i >= len(steps):
            return {}

        # Find the last AI message
        last_ai_msgs = [m for m in state["messages"][::-1] if isinstance(m, AIMessage)]
        if not last_ai_msgs:
            return {}
        last_ai = last_ai_msgs[0]

        # Record the step
        step_record = {
            "step_number": i + 1,
            "step_description": steps[i],
            "output": last_ai.content
        }

        new_results = list(state.get("results", []))
        new_results.append(step_record)

        return {
            "results": new_results,
            "step_index": i + 1
        }

    def simple_should_continue(state: AgentState) -> str:
        plan = state.get("plan")
        i = state.get("step_index", 0)
        
        if not plan or not plan.steps:
            return "finalize"
        if i < len(plan.steps):
            return "act"
        return "finalize"

    # Nodes - NO TOOLS
    workflow.add_node("plan", analyze_and_plan_FIXED)
    workflow.add_node("act", simple_act_on_current_step)
    workflow.add_node("advance", simple_advance_or_loop)
    workflow.add_node("finalize", reflect_and_finalize)

    # Simple edges - NO TOOLS
    workflow.add_edge(START, "plan")
    workflow.add_edge("plan", "act")
    workflow.add_edge("act", "advance")  # Always advance after act (no tools)
    workflow.add_conditional_edges("advance", simple_should_continue, {"act": "act", "finalize": "finalize"})
    workflow.add_edge("finalize", END)

    return workflow

# Build the SIMPLE graph
memory_simple = MemorySaver()
graph_SIMPLE = create_graph_SIMPLE().compile(checkpointer=memory_simple)

print("SIMPLE graph (no tools) compiled successfully!")


SIMPLE graph (no tools) compiled successfully!


In [19]:
# Test the SIMPLE version (no tools)
user_task = input("Enter your task:\n> ").strip()
initial_state: AgentState = {
    "messages": [
        SystemMessage(content=(
            "You are a helpful assistant. Think step-by-step privately, "
            "but only share a short plan and the final results. Do not reveal your chain-of-thought."
        )),
        HumanMessage(content=user_task),
    ],
    "plan": None,
    "step_index": 0,
    "results": [],
    "final_answer": None
}

# Test the simple version
config = {
    "configurable": {"thread_id": "simple-thread-1"},
    "recursion_limit": 50
}

print("Starting SIMPLE agent execution (no tools)...")
print("=" * 50)

try:
    result = graph_SIMPLE.invoke(initial_state, config)
    
    print("\n" + "=" * 50)
    print("SIMPLE EXECUTION COMPLETED SUCCESSFULLY!")
    print("=" * 50)
    
    print("\n--- Plan ---")
    if result.get("plan"):
        plan = result["plan"]
        print("Objectives:", "; ".join(plan.objectives or []))
        print("Steps:")
        for idx, s in enumerate(plan.steps or []):
            print(f"  {idx+1}. {s}")
        if plan.clarifying_questions:
            print("Clarifying questions:", "; ".join(plan.clarifying_questions))
        if plan.assumptions:
            print("Assumptions:", "; ".join(plan.assumptions))
        if plan.success_criteria:
            print("Success criteria:", "; ".join(plan.success_criteria))

    print("\n--- Step Results ---")
    for r in result.get("results", []):
        print(f"{r['step_number']}. {r['step_description']}")
        print(f"   Output: {r['output']}")

    print("\n--- Final Answer ---")
    print(result.get("final_answer") or "(No final answer produced)")
    
except Exception as e:
    print(f"\nERROR OCCURRED: {e}")
    import traceback
    traceback.print_exc()


Starting SIMPLE agent execution (no tools)...

SIMPLE EXECUTION COMPLETED SUCCESSFULLY!

--- Plan ---
Objectives: Determine the current population of Wheaton, IL.
Steps:
  1. Research the latest population statistics for Wheaton, IL.
  2. Check reliable sources such as the U.S. Census Bureau or local government websites.
  3. Verify the data for accuracy and recency.
Clarifying questions: Are you looking for the most recent population estimate or a specific year?; Do you need additional demographic information along with the population count?
Assumptions: Population data is available and up-to-date from reliable sources.; The user is interested in the total population, not demographic breakdowns.
Success criteria: The current population number of Wheaton, IL is provided to the user.

--- Step Results ---
1. Research the latest population statistics for Wheaton, IL.
   Output: As of the latest available data, the population of Wheaton, IL, is approximately 53,000 residents. This figure 