# Deep Agents

## Planning, Delegation, Reasoning, and Reflection

This notebook explores the architecture of **deep agents** — agents that go beyond single-turn responses to plan, delegate, reason through multi-step problems, reflect on their own progress, and backtrack when stuck.

We cover the four key elements that transform a basic agent into a deep agent:

```
    ┌─────────────────────────────────────────────────────────────┐
    │                    Deep Agent Coordinator                   │
    │                                                             │
    │   ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
    │   │ Planning │  │ Context  │  │Delegation│  │Reflection│    │
    │   │  & Task  │  │ Manage-  │  │& Sub-    │  │& Back-   │    │
    │   │  Decomp  │  │  ment    │  │ agents   │  │ tracking │    │
    │   └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘    │
    │        │             │             │             │          │
    │        v             v             v             v          │
    │   Break goals   Prioritize    Spawn focused  Evaluate       │
    │   into tasks    & compress    workers for    progress &     │
    │   with deps     context       subtasks       adjust plans   │
    └─────────────────────────────────────────────────────────────┘
```

Each section builds up reusable classes and functions that come together in the final **"Bringing It All Together"** section, where we build a complete Deep Research Agent.

## Setup and Imports

Make sure you have the required packages installed:

```bash
uv sync
```

In [None]:
import os
import json
import time
import asyncio
from dataclasses import dataclass, field
from typing import Any, TypedDict
from enum import Enum

from dotenv import load_dotenv
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, END

# Load environment variables
load_dotenv()

# Verify API key
if not os.getenv("OPENAI_API_KEY"):
    raise ValueError("OPENAI_API_KEY not found in environment variables")

# Initialize the LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

print("Setup complete!")

## Planning and Task Decomposition

Every complex project starts with a plan. Without explicit planning, agents tend to tackle whatever seems most immediate, which often isn't what's most important. Planning gives agents direction.

The core idea: take a high-level goal and break it down into a sequence of smaller, achievable tasks with explicit dependencies.

In [None]:
class Task(BaseModel):
    id: str
    description: str
    depends_on: list[str] = []
    estimated_hours: float
    status: str = "pending"

class Plan(BaseModel):
    goal: str
    tasks: list[Task]

offsite_plan = Plan(
    goal="Plan team offsite for 30 people",
    tasks=[
        Task(id="t1", description="Determine budget and dates", depends_on=[], estimated_hours=1),
        Task(id="t2", description="Survey team for location preferences", depends_on=["t1"], estimated_hours=0.5),
        Task(id="t3", description="Research venue options matching budget", depends_on=["t1", "t2"], estimated_hours=3),
        Task(id="t4", description="Get quotes from top 3 venues", depends_on=["t3"], estimated_hours=2),
        Task(id="t5", description="Book selected venue", depends_on=["t4"], estimated_hours=0.5),
        Task(id="t6", description="Plan agenda and activities", depends_on=["t5"], estimated_hours=4),
        Task(id="t7", description="Arrange transportation and logistics", depends_on=["t5"], estimated_hours=2),
        Task(id="t8", description="Send invitations with details", depends_on=["t6", "t7"], estimated_hours=1),
    ]
)

# Display the plan
print(f"Goal: {offsite_plan.goal}")
print(f"Tasks: {len(offsite_plan.tasks)}")
for task in offsite_plan.tasks:
    deps = f" (depends on: {', '.join(task.depends_on)})" if task.depends_on else ""
    print(f"  {task.id}: {task.description}{deps} [{task.estimated_hours}h]")
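
The explicit `depends_on` lists are what let an agent decide what to work on next. As a minimal sketch (using plain dicts rather than the `Task` model so it runs on its own; `ready_tasks` is a hypothetical helper, not one of the notebook's classes), the function below returns the tasks whose dependencies are all complete:

```python
def ready_tasks(tasks: dict[str, list[str]], done: set[str]) -> list[str]:
    """Return IDs of tasks that are not done and whose dependencies are all done.

    `tasks` maps a task ID to the IDs it depends on.
    """
    # Fail early on dependencies that point at tasks missing from the plan
    unknown = {dep for deps in tasks.values() for dep in deps} - set(tasks)
    if unknown:
        raise ValueError(f"Unknown dependency IDs: {sorted(unknown)}")

    return [
        task_id
        for task_id, deps in tasks.items()
        if task_id not in done and all(dep in done for dep in deps)
    ]


# Dependency structure of the offsite plan above
offsite_deps = {
    "t1": [], "t2": ["t1"], "t3": ["t1", "t2"], "t4": ["t3"],
    "t5": ["t4"], "t6": ["t5"], "t7": ["t5"], "t8": ["t6", "t7"],
}

print(ready_tasks(offsite_deps, done=set()))                             # ['t1']
print(ready_tasks(offsite_deps, done={"t1", "t2", "t3", "t4", "t5"}))    # ['t6', 't7']
```

Once the venue is booked (`t5`), both `t6` and `t7` become available at the same time, which is the kind of opening the delegation sections later exploit.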

### Forward Planning vs. Backward Chaining

There are two main strategies for generating plans:

- **Forward planning** starts from the current state and asks, "What's the first thing I need to do? Then what?"
- **Backward chaining** starts from the goal and works backwards: "To achieve X, what needs to be true? To make that true, what needs to happen first?"

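Backward chaining often suits goals with a well-defined end state, such as learning goals, because each prerequisite can be derived from the step that follows it. The implementation below takes a `max_depth` parameter that caps the recursion, since the model can otherwise keep producing prerequisites indefinitely.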

In [None]:
def backward_chain(goal: str, llm, max_depth: int = 3, depth: int = 0) -> list[str]:
    """Generate prerequisites by working backward from goal.

    Args:
        goal: The goal to decompose.
        llm: The language model to use for generating prerequisites.
        max_depth: Maximum recursion depth to prevent infinite loops.
        depth: Current recursion depth (internal use).

    Returns:
        Ordered list of tasks from earliest prerequisite to final goal.
    """
    if depth >= max_depth:
        return [goal]

    prompt = f"""Given this goal: {goal}
    
What must be true or completed IMMEDIATELY BEFORE this goal can be achieved?
List only the direct prerequisites, not earlier steps.
Be specific and actionable.

Format: One prerequisite per line, no numbering."""
    
    prerequisites = llm.invoke(prompt).content.strip().split("\n")
    
    all_tasks = []
    for prereq in prerequisites:
        prereq = prereq.strip()
        if prereq:
            # Recursively find prerequisites of prerequisites
            # Note: backward_chain already includes prereq as its final element
            sub_prereqs = backward_chain(prereq, llm, max_depth=max_depth, depth=depth + 1)
            all_tasks.extend(sub_prereqs)
    
    all_tasks.append(goal)
    return all_tasks

# Test it with a concrete goal
tasks = backward_chain("Pass the AWS Solutions Architect certification exam", llm, max_depth=2)
print("Backward-chained tasks (from earliest to final goal):")
for i, task in enumerate(tasks, 1):
    print(f"  {i}. {task}")
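
To see the ordering that `backward_chain` produces without calling a model, the sketch below replaces the LLM with a fixed prerequisite table. `PREREQS` is made-up illustration data and `backward_chain_from_table` is a hypothetical stand-in, not the notebook's function. It also drops repeated tasks, which can occur when two goals share a prerequisite:

```python
# Hypothetical prerequisite table standing in for the LLM's answers
PREREQS: dict[str, list[str]] = {
    "Pass the exam": ["Score well on practice exams", "Book an exam slot"],
    "Score well on practice exams": ["Study the exam guide"],
    "Book an exam slot": ["Study the exam guide"],
}


def backward_chain_from_table(goal: str, max_depth: int = 3, depth: int = 0) -> list[str]:
    """Return tasks ordered from earliest prerequisite to the goal itself."""
    if depth >= max_depth:
        return [goal]

    ordered: list[str] = []
    for prereq in PREREQS.get(goal, []):
        # Each recursive call ends with `prereq` itself, preceded by its own prerequisites
        ordered.extend(backward_chain_from_table(prereq, max_depth, depth + 1))
    ordered.append(goal)
    return ordered


def dedupe_keep_first(tasks: list[str]) -> list[str]:
    """Remove repeats while keeping each task's first (earliest) position."""
    return list(dict.fromkeys(tasks))


chain = backward_chain_from_table("Pass the exam")
print(chain)
print(dedupe_keep_first(chain))
```

Keeping the first occurrence preserves a valid order, because a task's earliest position already precedes everything that depends on it.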

## Context Management in Deep Agents

When an agent operates over extended periods, information accumulates. Eventually you hit the context window limit, and even before that, performance degrades as important information gets lost in noise.

Deep agents need explicit strategies for managing context: **prioritization** (what's important right now?), **compression** (summarize what you can), and **progressive summarization** (maintain multiple levels of detail).

In [None]:
def is_relevant(item: dict, current_task: str) -> bool:
    """Check if a context item is relevant to the current task.

    Uses simple keyword overlap between the item's content/description
    and the current task description.
    """
    item_text = json.dumps(item).lower()
    task_words = set(current_task.lower().split())
    # Remove common stop words
    stop_words = {"the", "a", "an", "is", "to", "and", "or", "for", "in", "on", "of", "with"}
    task_words -= stop_words
    overlap = sum(1 for word in task_words if word in item_text)
    return overlap >= 2  # At least 2 keyword matches


def prioritize_context(context_items: list[dict], current_task: str) -> list[dict]:
    """Rank context items by relevance to current task."""
    
    categories = {
        "goal": 10,          # Original objective - always keep
        "plan": 9,           # Current plan - critical
        "progress": 8,       # What's done/pending - important
        "findings": 7,       # Key results - valuable
        "recent": 6,         # Last few messages - continuity
        "reference": 3,      # Supporting details - nice to have
        "historical": 1,     # Old interactions - low priority
    }
    
    scored = []
    for item in context_items:
        base_score = categories.get(item.get("type"), 2)
        
        # Boost if relevant to current task
        if is_relevant(item, current_task):
            base_score *= 1.5
            
        scored.append((base_score, item))
    
    # Sort by score descending, return items
    scored.sort(key=lambda x: x[0], reverse=True)
    return [item for _, item in scored]


# Demonstrate context prioritization
context_items = [
    {"type": "goal", "content": "Research best practices for fine-tuning LLMs"},
    {"type": "historical", "content": "Last week we discussed prompt engineering basics"},
    {"type": "findings", "content": "LoRA reduces fine-tuning compute by 10x"},
    {"type": "recent", "content": "User asked about dataset preparation for fine-tuning"},
    {"type": "reference", "content": "Python documentation for asyncio module"},
    {"type": "progress", "content": "Completed literature review on fine-tuning methods"},
    {"type": "plan", "content": "Next: evaluate LoRA vs full fine-tuning on sample dataset"},
]

prioritized = prioritize_context(context_items, "evaluate fine-tuning approaches for LLMs")
print("Prioritized context (highest priority first):")
for item in prioritized:
    print(f"  [{item['type']:>12}] {item['content'][:70]}")
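
Ranking only helps if something is then cut. A minimal sketch of that second step follows, assuming a rough budget measured in characters (a real system would more likely count tokens with the model's tokenizer). `fit_to_budget` is a hypothetical helper:

```python
def fit_to_budget(ranked_items: list[dict], max_chars: int) -> list[dict]:
    """Keep the highest-ranked items whose combined content fits in max_chars.

    Assumes `ranked_items` is already sorted with the most important item first.
    Items that do not fit are skipped, so a smaller lower-priority item can
    still use the remaining space.
    """
    kept, used = [], 0
    for item in ranked_items:
        size = len(item["content"])
        if used + size <= max_chars:
            kept.append(item)
            used += size
    return kept


# Placeholder content with known lengths: 40, 50, 30, and 10 characters
ranked = [
    {"type": "goal", "content": "G" * 40},
    {"type": "plan", "content": "P" * 50},
    {"type": "reference", "content": "R" * 30},
    {"type": "historical", "content": "H" * 10},
]

kept = fit_to_budget(ranked, max_chars=100)
print([item["type"] for item in kept])  # ['goal', 'plan', 'historical']
```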

### Context Compression and Summarization

When you can't keep everything, summarize. The trick is summarizing at the right level of detail: enough to be useful, concise enough to save space.

In [None]:
async def compress_context(full_context: str, max_tokens: int, llm) -> str:
    """Compress context while preserving essential information."""
    
    prompt = f"""Summarize the following context, preserving:
1. The main goal or objective
2. Key decisions made and their rationale
3. Important findings or results
4. Current status and next steps
5. Any critical constraints or requirements

Remove:
- Redundant information
- Detailed intermediate reasoning
- Verbose explanations where a summary suffices

Context to compress:
{full_context}

Provide a compressed summary in under {max_tokens} tokens:"""
    
    summary = await llm.ainvoke(prompt)
    return summary.content


# Test compression
long_context = """
We are researching the best approaches to fine-tune large language models for a 
customer service chatbot. Our goal is to create a model that handles support tickets 
accurately and empathetically.

In our first phase, we reviewed 15 papers on fine-tuning techniques. We found that 
LoRA (Low-Rank Adaptation) is the most cost-effective approach for our scale, requiring 
only 10% of the compute of full fine-tuning while achieving 95% of the quality. We also 
considered QLoRA but decided against it due to quantization artifacts in our specific 
domain. Full fine-tuning was ruled out due to GPU cost constraints.

We collected 50,000 support ticket examples, cleaned and formatted them. The dataset 
covers billing issues (40%), technical support (35%), and general inquiries (25%).

Current status: We have trained a LoRA adapter on the dataset and initial results show 
82% accuracy on our test set. Next steps include hyperparameter tuning, testing with 
real support agents, and deploying to a staging environment.
"""

compressed = await compress_context(long_context, 100, llm)
print(f"Original length: {len(long_context)} chars")
print(f"Compressed length: {len(compressed)} chars")
print(f"\nCompressed:\n{compressed}")
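
Progressive summarization, the third strategy mentioned earlier, is not shown in the cells above. One way to sketch it is to keep the newest entries verbatim and fold older ones into a running summary. The sketch assumes a `summarize` callable supplied by the caller (here a trivial truncating stand-in rather than an LLM call); `ProgressiveMemory` is a hypothetical class:

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ProgressiveMemory:
    """Keeps recent entries in full and older entries as a running summary."""
    summarize: Callable[[str], str]   # e.g. a wrapper around an LLM call
    keep_recent: int = 3              # number of entries kept verbatim
    summary: str = ""                 # coarse level: everything older
    recent: list[str] = field(default_factory=list)  # fine level: newest entries

    def add(self, entry: str) -> None:
        self.recent.append(entry)
        # When the verbatim window overflows, fold the oldest entry into the summary
        while len(self.recent) > self.keep_recent:
            oldest = self.recent.pop(0)
            self.summary = self.summarize(f"{self.summary}\n{oldest}".strip())

    def render(self) -> str:
        """Build the context string: summary first, then recent entries."""
        parts = [f"Summary of earlier work: {self.summary}"] if self.summary else []
        parts.extend(self.recent)
        return "\n".join(parts)


def truncate(text: str, limit: int = 60) -> str:
    """Stand-in summarizer for the demo: keeps only the last `limit` characters."""
    return text[-limit:]


memory = ProgressiveMemory(summarize=truncate, keep_recent=2)
for note in ["Reviewed 15 papers", "Chose LoRA", "Collected 50,000 tickets", "Trained adapter"]:
    memory.add(note)

print(memory.render())
```

Swapping `truncate` for a function that calls `compress_context` would give the same structure with real summaries.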

## Subagent Spawning and Delegation

A single agent trying to do everything is like a one-person company trying to scale. Subagent spawning is the pattern where a coordinating agent creates specialized helper agents to handle specific subtasks. Each subagent has a focused role, relevant tools, and just the context it needs for its job.

In [None]:
class SubagentStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Subagent:
    id: str
    role: str
    task: str
    status: SubagentStatus
    result: Any = None
    error: str | None = None

class SubagentManager:
    """Manages the lifecycle of subagents: spawning, monitoring, and collecting results."""

    def __init__(self, llm=None):
        self.subagents: dict[str, Subagent] = {}
        self.shared_context: dict[str, Any] = {}
        self.llm = llm

    def spawn(self, role: str, task: str, context: dict) -> str:
        """Create and execute a new subagent."""
        agent_id = f"{role}_{len(self.subagents)}"

        subagent = Subagent(
            id=agent_id,
            role=role,
            task=task,
            status=SubagentStatus.PENDING
        )
        self.subagents[agent_id] = subagent

        # Runs synchronously: spawn() returns only after the subagent has finished
        self._execute_subagent(subagent, context)

        return agent_id

    def _execute_subagent(self, subagent: Subagent, context: dict):
        """Execute a subagent's task using the LLM.

        In a production system, this would create an independent agent
        with its own tools and context. Here we simulate it with a
        focused LLM call.
        """
        subagent.status = SubagentStatus.RUNNING
        try:
            context_str = json.dumps(context, indent=2) if context else "No additional context."
            prompt = f"""You are a specialized {subagent.role} agent.

Your task: {subagent.task}

Context: {context_str}

Complete this task thoroughly and provide your results."""

            response = self.llm.invoke(prompt)
            subagent.result = response.content
            subagent.status = SubagentStatus.COMPLETED
        except Exception as e:
            subagent.error = str(e)
            subagent.status = SubagentStatus.FAILED

    def get_status(self, agent_id: str) -> SubagentStatus:
        return self.subagents[agent_id].status

    def get_result(self, agent_id: str) -> Any:
        subagent = self.subagents[agent_id]
        if subagent.status != SubagentStatus.COMPLETED:
            raise ValueError(f"Subagent {agent_id} not yet complete")
        return subagent.result

    def get_error(self, agent_id: str) -> str:
        """Get the error message from a failed subagent."""
        return self.subagents[agent_id].error

    def terminate(self, agent_id: str):
        """Terminate a running subagent."""
        subagent = self.subagents[agent_id]
        subagent.status = SubagentStatus.FAILED
        subagent.error = "Terminated by manager"

    def update_shared_context(self, role: str, result: Any):
        """Update shared context with a subagent's results."""
        self.shared_context[role] = result

    def collect_all(self) -> dict[str, Any]:
        """Collect results from all completed subagents."""
        return {
            agent_id: sa.result 
            for agent_id, sa in self.subagents.items()
            if sa.status == SubagentStatus.COMPLETED
        }


# Demonstrate SubagentManager
manager = SubagentManager(llm=llm)
agent_id = manager.spawn(
    role="researcher",
    task="List 3 key benefits of LoRA for fine-tuning large language models",
    context={"domain": "AI/ML", "audience": "engineers"}
)

print(f"Agent ID: {agent_id}")
print(f"Status: {manager.get_status(agent_id)}")
print(f"Result:\n{manager.get_result(agent_id)[:500]}")

### Parallel vs. Sequential Delegation

Some subtasks can run in parallel (independent of each other), while others must run sequentially (each builds on the previous). Deep agents need to handle both patterns.

In [None]:
async def parallel_delegation(tasks: list[dict], manager: SubagentManager) -> list[Any]:
    """Execute independent tasks in parallel."""
    
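    async def run_task(task):
        # spawn() blocks on a synchronous LLM call and returns only once the
        # subagent has finished, so run it in a worker thread. Calling it
        # directly here would execute the tasks one after another.
        # Caveat: agent IDs come from len(manager.subagents), so concurrent
        # spawns that share a role can collide; use distinct roles or add a lock.
        agent_id = await asyncio.to_thread(
            manager.spawn,
            role=task["role"],
            task=task["description"],
            context=task.get("context", {}),
        )
        # Raises ValueError if the subagent failed
        return manager.get_result(agent_id)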
    
    results = await asyncio.gather(*[run_task(t) for t in tasks])
    return results


def sequential_delegation(tasks: list[dict], manager: SubagentManager) -> list[Any]:
    """Execute dependent tasks sequentially, passing results forward."""
    
    results = []
    accumulated_context = {}
    
    for task in tasks:
        # Include results from previous tasks in context
        task_context = {**task.get("context", {}), **accumulated_context}
        
        agent_id = manager.spawn(
            role=task["role"],
            task=task["description"],
            context=task_context
        )
        
        # spawn() has already run the subagent; get_result raises if it failed
        result = manager.get_result(agent_id)
        results.append(result)
        
        # Add to accumulated context for next task
        accumulated_context[f"{task['role']}_result"] = result
    
    return results


# Test parallel delegation
parallel_tasks = [
    {"role": "researcher", "description": "Find 3 benefits of RAG systems", "context": {}},
    {"role": "analyst", "description": "List 3 challenges of RAG systems", "context": {}},
]

manager_parallel = SubagentManager(llm=llm)
parallel_results = await parallel_delegation(parallel_tasks, manager_parallel)
for task, result in zip(parallel_tasks, parallel_results):
    print(f"\n[{task['role']}] {task['description']}")
    print(f"  {result[:300]}...")

### Hybrid Delegation

Real workflows mix parallel and sequential execution. The research phase might run three searches in parallel, but the writing phase must wait for all research to complete.

In [None]:
async def hybrid_delegation(workflow: dict, manager: SubagentManager) -> dict:
    """Execute a workflow with both parallel and sequential phases."""
    
    results = {}
    
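    for phase in workflow["phases"]:
        # Pass results from earlier phases into each task's context; otherwise
        # shared_context is written below but never read by later phases.
        phase_tasks = [
            {**task, "context": {**manager.shared_context, **task.get("context", {})}}
            for task in phase["tasks"]
        ]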
        
        if phase.get("parallel", False):
            # Run all tasks in this phase concurrently
            phase_results = await parallel_delegation(phase_tasks, manager)
        else:
            # Run tasks sequentially
            phase_results = sequential_delegation(phase_tasks, manager)
        
        results[phase["name"]] = phase_results
        
        # Add phase results to context for next phase
        for task, result in zip(phase_tasks, phase_results):
            manager.update_shared_context(task["role"], result)
    
    return results


# Test hybrid delegation
workflow = {
    "phases": [
        {
            "name": "research",
            "parallel": True,
            "tasks": [
                {"role": "topic_researcher", "description": "Summarize key concepts of LoRA fine-tuning in 2 sentences"},
                {"role": "comparison_researcher", "description": "Compare LoRA vs full fine-tuning in 2 sentences"},
            ]
        },
        {
            "name": "synthesis",
            "parallel": False,
            "tasks": [
                {"role": "synthesizer", "description": "Combine the research findings into a brief recommendation"},
            ]
        }
    ]
}

manager_hybrid = SubagentManager(llm=llm)
hybrid_results = await hybrid_delegation(workflow, manager_hybrid)
for phase_name, phase_results in hybrid_results.items():
    print(f"\n--- Phase: {phase_name} ---")
    for result in phase_results:
        print(f"  {result[:200]}...")
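
The `workflow` dict above lists its phases by hand. When tasks carry `depends_on` lists like the `Task` model from the planning section, the phases can instead be derived. Here is a sketch using the standard library's `graphlib.TopologicalSorter`; the `derive_phases` helper and the sample task IDs are illustrative assumptions:

```python
from graphlib import TopologicalSorter


def derive_phases(dependencies: dict[str, list[str]]) -> list[list[str]]:
    """Group task IDs into phases; tasks within a phase have no unmet dependencies.

    `dependencies` maps each task ID to the IDs it depends on.
    Raises graphlib.CycleError if the dependencies contain a cycle.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()

    phases = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything runnable right now
        phases.append(ready)
        sorter.done(*ready)                 # unlocks tasks that depended on these
    return phases


deps = {
    "search_papers": [],
    "search_blogs": [],
    "search_docs": [],
    "synthesize": ["search_papers", "search_blogs", "search_docs"],
    "write_report": ["synthesize"],
}

for number, phase in enumerate(derive_phases(deps), 1):
    mode = "parallel" if len(phase) > 1 else "sequential"
    print(f"Phase {number} ({mode}): {phase}")
```

Each derived phase could then be turned into one entry of `workflow["phases"]`, with `parallel` set according to how many tasks it contains.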

### Collecting and Integrating Results

The coordinator's job is to both delegate tasks *and* make sense of what comes back. Subagent results often need to be synthesized, conflicts resolved, and gaps identified.

In [None]:
def format_results(results: list[dict]) -> str:
    """Format research results for LLM synthesis.

    Each result dict should have 'source' and 'content' keys.
    """
    parts = []
    for i, r in enumerate(results, 1):
        source = r.get("source", f"Source {i}")
        content = r.get("content", str(r))
        parts.append(f"[{source}]:\n{content}")
    return "\n\n".join(parts)


def integrate_research_results(results: list[dict], llm) -> dict:
    """Synthesize results from multiple research subagents."""
    
    prompt = f"""You have research results from multiple specialists:

{format_results(results)}

Synthesize these into a coherent summary:
1. Identify key themes that appear across multiple sources
2. Note any contradictions or disagreements between sources
3. Highlight gaps where more research may be needed
4. Provide an overall conclusion

Be specific and cite which source each finding came from."""
    
    synthesis = llm.invoke(prompt)
    
    return {
        "synthesis": synthesis.content,
        "source_count": len(results),
        "sources": [r.get("source") for r in results]
    }


# Test integration
research_results = [
    {"source": "Topic Researcher", "content": "LoRA adds low-rank matrices to attention layers, achieving near full fine-tuning quality at 10% of the compute cost."},
    {"source": "Comparison Analyst", "content": "Full fine-tuning modifies all parameters and achieves marginally better results, but costs 10x more in compute. LoRA is preferred for most use cases."},
    {"source": "Practitioner", "content": "In production, LoRA adapters can be hot-swapped, allowing one base model to serve multiple specialized tasks."},
]

integrated = integrate_research_results(research_results, llm)
print(f"Sources integrated: {integrated['source_count']}")
print(f"\nSynthesis:\n{integrated['synthesis'][:600]}")

## Multi-Step Reasoning

Deep agents reason through complex problems step by step, exploring multiple angles when needed. This section covers sequential chains, parallel exploration, and adaptive stopping.

In [None]:
def sequential_reasoning(problem: str, max_steps: int, llm) -> dict:
    """Reason through a problem step by step."""
    
    steps = []
    current_state = f"Problem: {problem}\n\nLet me think through this step by step."
    
    for i in range(max_steps):
        prompt = f"""{current_state}

Step {i + 1}: What's the next logical step in solving this problem?
Consider what we know, what we need to find out, and what follows logically.

If we have enough information to give a final answer, say \"CONCLUSION: [answer]\"
Otherwise, describe your reasoning for this step."""
        
        response = llm.invoke(prompt).content
        steps.append({"step": i + 1, "reasoning": response})
        
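        if "CONCLUSION:" in response:
            # The model may add text before the marker, so search for it
            # rather than requiring it at the start of the response.
            return {
                "conclusion": response.split("CONCLUSION:", 1)[1].strip(),
                "steps": steps,
                "total_steps": i + 1
            }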
        
        current_state += f"\n\nStep {i + 1}: {response}"
    
    return {
        "conclusion": None,
        "steps": steps,
        "total_steps": max_steps,
        "status": "max_steps_reached"
    }


# Test sequential reasoning
result = sequential_reasoning(
    "Should a startup with a 5-person team use microservices or a monolith architecture?",
    max_steps=4,
    llm=llm
)
print(f"Completed in {result['total_steps']} steps")
if result.get("conclusion"):
    print(f"Conclusion: {result['conclusion'][:300]}")
else:
    print("Final step reasoning:")
    print(f"  {result['steps'][-1]['reasoning'][:300]}")

### Parallel Reasoning Paths

For some problems, multiple reasoning approaches might lead to an answer. Rather than committing to one path, deep agents can explore several in parallel and compare results.

In [None]:
def format_reasoning_results(results: list[dict]) -> str:
    """Format parallel reasoning results for comparison."""
    parts = []
    for r in results:
        parts.append(f"Approach: {r['approach']}\nReasoning: {r['reasoning']}")
    return "\n\n---\n\n".join(parts)


async def parallel_reasoning(problem: str, approaches: list[str], llm) -> dict:
    """Try multiple reasoning approaches in parallel."""
    
    async def try_approach(approach: str):
        prompt = f"""Problem: {problem}

Approach this problem using: {approach}

Think through the problem using this approach and provide your conclusion.
Explain your reasoning clearly."""
        
        response = await llm.ainvoke(prompt)
        return {"approach": approach, "reasoning": response.content}
    
    results = await asyncio.gather(*[try_approach(a) for a in approaches])
    
    # Compare results
    comparison_prompt = f"""Problem: {problem}

Multiple reasoning approaches were tried:

{format_reasoning_results(results)}

Compare these approaches:
1. Do they reach the same conclusion?
2. Which approach seems most rigorous?
3. Are there any errors in reasoning?
4. What is the best final answer?"""
    
    # Use the async API so this call does not block the event loop
    comparison = await llm.ainvoke(comparison_prompt)
    
    return {
        "approaches": results,
        "comparison": comparison.content
    }


# Test parallel reasoning
result = await parallel_reasoning(
    "What is the most effective way to reduce hallucinations in LLM applications?",
    approaches=[
        "Engineering perspective (technical solutions)",
        "Process perspective (workflow and validation)",
        "User experience perspective (design patterns)",
    ],
    llm=llm
)

print("Approaches tested:")
for approach in result["approaches"]:
    print(f"\n  [{approach['approach']}]")
    print(f"  {approach['reasoning'][:150]}...")

print(f"\nComparison:\n{result['comparison'][:400]}")

### Adaptive Stopping

Deep reasoning is powerful but expensive. Every additional reasoning step costs time and tokens. Adaptive reasoning uses confidence tracking and plateau detection to know when to stop:

- **Confidence threshold**: The agent believes it has a reliable answer
- **Diminishing returns**: Recent steps aren't adding new insights
- **Step limit**: Hard cap to prevent runaway reasoning
- **Convergence**: Multiple approaches reach the same conclusion

In [None]:
def format_reasoning_history(steps: list[dict]) -> str:
    """Format the reasoning history for inclusion in prompts."""
    if not steps:
        return ""
    parts = []
    for s in steps:
        parts.append(f"Step {s['step']}: {s['reasoning']}")
    return "\n\n".join(parts)


def parse_reasoning_response(response: str) -> tuple[str, float, str | None]:
    """Parse a reasoning response into (reasoning, confidence, optional_answer).

    Expected format in response:
        REASONING: ...
        CONFIDENCE: 0.X
        FINAL_ANSWER: ... (optional)
    """
    reasoning = response
    confidence = 0.5
    answer = None

    if "REASONING:" in response:
        reasoning = response.split("REASONING:")[-1].split("CONFIDENCE:")[0].strip()
    if "CONFIDENCE:" in response:
        try:
            conf_str = response.split("CONFIDENCE:")[-1].split("\n")[0].strip()
            confidence = float(conf_str)
        except (ValueError, IndexError):
            confidence = 0.5
    if "FINAL_ANSWER:" in response:
        answer = response.split("FINAL_ANSWER:")[-1].strip()

    return reasoning, confidence, answer


def get_final_answer(problem: str, steps: list[dict], llm) -> str:
    """Ask the LLM for a final answer given the reasoning history."""
    history = format_reasoning_history(steps)
    prompt = f"""Problem: {problem}

Based on this reasoning:
{history}

Provide a clear, definitive final answer in 2-3 sentences."""
    return llm.invoke(prompt).content


def adaptive_reasoning(problem: str, llm, max_steps: int = 10) -> dict:
    """Reason with adaptive stopping based on confidence and progress."""
    
    steps = []
    confidence_history = []
    
    for i in range(max_steps):
        # Generate next reasoning step
        context = format_reasoning_history(steps)
        
        prompt = f"""Problem: {problem}

Reasoning so far:
{context if context else "None yet - this is step 1"}

Continue reasoning. After your analysis, rate your confidence (0.0-1.0)
that you can now give a correct final answer.

Format:
REASONING: Your next step of analysis
CONFIDENCE: 0.X
FINAL_ANSWER: [only if confidence >= 0.8] Your answer"""
        
        response = llm.invoke(prompt).content
        reasoning, confidence, answer = parse_reasoning_response(response)
        
        steps.append({"step": i + 1, "reasoning": reasoning})
        confidence_history.append(confidence)
        
        # Check stopping conditions
        if answer:  # Model provided final answer
            return {"answer": answer, "steps": steps, "confidence": confidence}
        
        if confidence >= 0.85:  # High confidence - prompt for answer
            final = get_final_answer(problem, steps, llm)
            return {"answer": final, "steps": steps, "confidence": confidence}
        
        # Check for plateau (no confidence improvement in 3 steps)
        if len(confidence_history) >= 3:
            recent = confidence_history[-3:]
            if max(recent) - min(recent) < 0.05:
                final = get_final_answer(problem, steps, llm)
                return {"answer": final, "steps": steps, "confidence": confidence, 
                        "note": "Stopped due to confidence plateau"}
    
    # Max steps reached
    final = get_final_answer(problem, steps, llm)
    return {"answer": final, "steps": steps, "confidence": confidence_history[-1],
            "note": "Max steps reached"}


# Test adaptive reasoning
result = adaptive_reasoning(
    "Is it better to use RAG or fine-tuning for a customer support chatbot with 10,000 FAQ entries?",
    llm=llm,
    max_steps=5
)

print(f"Steps taken: {len(result['steps'])}")
print(f"Final confidence: {result.get('confidence', 'N/A')}")
if result.get("note"):
    print(f"Note: {result['note']}")
print(f"\nAnswer: {result['answer'][:400]}")
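
`parse_reasoning_response` falls back to 0.5 whenever the text after `CONFIDENCE:` is not a bare float. A model may instead write something like `0.8 (fairly sure)` or `80%`. A more tolerant parser can be sketched with a regular expression; the `extract_confidence` name and the set of accepted formats are assumptions for illustration:

```python
import re

# A number (with optional decimal part) after the marker, optionally followed by "%"
CONFIDENCE_PATTERN = re.compile(r"CONFIDENCE:\s*([0-9]*\.?[0-9]+)\s*(%?)")


def extract_confidence(response: str, default: float = 0.5) -> float:
    """Extract a confidence in [0, 1] from a response, tolerating extra text."""
    matches = CONFIDENCE_PATTERN.findall(response)
    if not matches:
        return default

    number, percent = matches[-1]       # use the last occurrence, as the original parser does
    value = float(number)
    if percent:
        value /= 100                    # "80%" becomes 0.8
    return min(max(value, 0.0), 1.0)    # clamp out-of-range values


print(extract_confidence("REASONING: ...\nCONFIDENCE: 0.8 (fairly sure)"))  # 0.8
print(extract_confidence("CONFIDENCE: 80%"))                                # 0.8
print(extract_confidence("No rating given"))                                # 0.5
```

Clamping matters for the stopping logic: an unclamped value such as `8` would pass the high-confidence check immediately.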

## Self-Reflection and Metacognition

Deep agents can think about their own thinking. They evaluate whether they're making progress, critique their own outputs, and adjust strategies when something isn't working.

In [None]:
def format_plan(plan: Plan) -> str:
    """Format a Plan object for display in prompts."""
    lines = [f"Goal: {plan.goal}", "Tasks:"]
    for t in plan.tasks:
        deps = f" (depends on: {', '.join(t.depends_on)})" if t.depends_on else ""
        lines.append(f"  [{t.status}] {t.id}: {t.description}{deps}")
    return "\n".join(lines)


def format_completed(completed_tasks: list[str]) -> str:
    """Format a list of completed task descriptions for prompts."""
    if not completed_tasks:
        return "(none yet)"
    return "\n".join(f"  - {task}" for task in completed_tasks)


def parse_plan(plan_text: str) -> Plan:
    """Parse LLM-generated plan text into a Plan object.

    Creates tasks from numbered or bulleted lines in the LLM's response.
    """
    lines = plan_text.strip().split("\n")
    tasks = []
    for i, line in enumerate(lines):
        line = line.strip().lstrip("0123456789.-) ")
        if line and len(line) > 5:
            tasks.append(Task(
                id=f"t{len(tasks) + 1}",  # number consecutively, ignoring skipped lines
                description=line,
                estimated_hours=1.0
            ))
    if not tasks:
        tasks = [Task(id="t1", description="Re-evaluate approach", estimated_hours=1.0)]
    return Plan(goal="Revised plan", tasks=tasks)


def evaluate_progress(goal: str, plan: Plan, completed_tasks: list[str], llm) -> dict:
    """Assess progress toward the overall goal."""
    
    total_tasks = len(plan.tasks)
    completed_count = len(completed_tasks)
    
    prompt = f"""Goal: {goal}

Original plan:
{format_plan(plan)}

Completed tasks:
{format_completed(completed_tasks)}

Progress: {completed_count}/{total_tasks} tasks complete

Evaluate:
1. Are we on track to achieve the goal?
2. Have completed tasks actually moved us closer to the goal?
3. Are there any warning signs or blockers?
4. Should we adjust the remaining plan?

Be honest and specific."""
    
    evaluation = llm.invoke(prompt)
    
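    # Compute the fraction once so the empty-plan guard also covers `status`
    progress_fraction = completed_count / total_tasks if total_tasks > 0 else 0

    return {
        "progress_fraction": progress_fraction,
        "evaluation": evaluation.content,
        # Heuristic based on task counts only; the LLM's evaluation text is not parsed
        "status": "on_track" if progress_fraction > 0.3 else "needs_review"
    }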


def adjust_strategy(current_plan: Plan, evaluation: dict, llm) -> Plan:
    """Modify plan based on progress evaluation."""
    
    if "on_track" in evaluation.get("status", ""):
        return current_plan  # No adjustment needed
    
    prompt = f"""Current plan:
{format_plan(current_plan)}

Progress evaluation:
{evaluation['evaluation']}

The current approach isn't working well. Propose adjustments:
1. Which remaining tasks should be modified or replaced?
2. Are there new tasks that should be added?
3. Should any tasks be removed or deprioritized?
4. What's the revised strategy?

Provide a revised task list (one task per line)."""
    
    revised = llm.invoke(prompt)
    
    # Parse the revised task list, keeping the original goal on the new plan
    revised_plan = parse_plan(revised.content)
    return Plan(goal=current_plan.goal, tasks=revised_plan.tasks)


# Test evaluation
progress = evaluate_progress(
    goal=offsite_plan.goal,
    plan=offsite_plan,
    completed_tasks=["Determine budget and dates", "Survey team for location preferences"],
    llm=llm
)
print(f"Progress: {progress['progress_fraction']:.0%}")
print(f"Status: {progress['status']}")
print(f"\nEvaluation:\n{progress['evaluation'][:400]}")

### The Reflection Loop with LangGraph

Reflection is most powerful when built into the agent's core loop. After completing significant work, the agent evaluates and potentially revises before moving on.
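
Before wiring this into a graph, the cycle can be sketched in plain Python. This is a minimal illustration rather than the notebook's implementation: `run_reflective_loop`, the three stub callables, and the `max_replans` cap are all hypothetical names introduced here, and the stubs stand in for LLM calls. Unlike the graph version, this sketch revises only the remaining tasks after a replan.

```python
from typing import Callable


def run_reflective_loop(
    tasks: list[str],
    execute: Callable[[str], str],
    reflect: Callable[[str, str], bool],
    replan: Callable[[list[str], dict[str, str]], list[str]],
    max_replans: int = 2,
) -> dict[str, str]:
    """Execute tasks in order, reflecting after each one.

    `reflect` returns True when the plan should be revised. `max_replans`
    caps the number of revisions so the loop always terminates.
    """
    results: dict[str, str] = {}
    replans = 0
    idx = 0
    while idx < len(tasks):
        task = tasks[idx]
        results[task] = execute(task)
        idx += 1
        if reflect(task, results[task]) and replans < max_replans:
            # Revise only the work that has not been done yet
            tasks = replan(tasks[idx:], results)
            idx = 0
            replans += 1
    return results


# Stub callables standing in for LLM calls (hypothetical, for illustration only)
def fake_execute(task: str) -> str:
    return f"done: {task}"


def fake_reflect(task: str, result: str) -> bool:
    return task == "draft outline"  # Pretend this task exposes a gap


def fake_replan(remaining: list[str], results: dict[str, str]) -> list[str]:
    return ["gather more sources"] + remaining


results = run_reflective_loop(
    ["research topic", "draft outline", "write slides"],
    fake_execute, fake_reflect, fake_replan,
)
print(list(results))
```

Capping replans is one way to keep a reflective loop from revising its plan indefinitely; a budget on total executed tasks would work as well.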

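The following cell uses LangGraph's `StateGraph` to build a structured execute-reflect-replan cycle. A replan sends the agent back to the first task of the revised plan, so repeated replans can cycle; LangGraph's recursion limit stops a runaway cycle with an error instead of letting it run forever.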

In [None]:
def execute_task(task: dict) -> str:
    """Execute a single task using the LLM.

    In a real system, this might call tools, APIs, or spawn subagents.
    Here we simulate execution with an LLM call.
    """
    prompt = f"""You are executing this task: {task.get('description', task.get('id', 'unknown'))}

Complete this task and provide the result. Be concise (2-3 sentences)."""
    return llm.invoke(prompt).content


class ReflectiveState(TypedDict):
    goal: str
    plan: dict
    current_task_idx: int
    task_results: dict
    reflection: str
    needs_replanning: bool


def execute_task_node(state: ReflectiveState) -> ReflectiveState:
    """Execute the current task."""
    task = state["plan"]["tasks"][state["current_task_idx"]]
    result = execute_task(task)
    
    new_results = {**state["task_results"], task["id"]: result}
    return {"task_results": new_results, "current_task_idx": state["current_task_idx"] + 1}


def reflect_node(state: ReflectiveState) -> ReflectiveState:
    """Reflect on progress after task completion."""
    task = state["plan"]["tasks"][state["current_task_idx"] - 1]  # Just completed
    # Look up the result by task id: after a replan, ids can be reused, so the
    # last value in the dict is not guaranteed to be the newest result.
    result = state["task_results"][task["id"]]
    
    reflection_prompt = f"""Goal: {state['goal']}

Completed task: {task['description']}
Result: {result}

Reflect:
1. Did this task achieve what we expected?
2. Are we still on track for the overall goal?
3. Should we adjust the plan?

If the plan needs significant changes, say "REPLAN NEEDED".
Otherwise, say "CONTINUE"."""
    
    reflection = llm.invoke(reflection_prompt).content
    
    needs_replan = "REPLAN NEEDED" in reflection
    return {"reflection": reflection, "needs_replanning": needs_replan}


def replan_node(state: ReflectiveState) -> ReflectiveState:
    """Create a new plan based on reflection."""
    plan_obj = Plan(
        goal=state["goal"],
        tasks=[Task(**t) for t in state["plan"]["tasks"]]
    )
    new_plan = adjust_strategy(
        plan_obj,
        {"evaluation": state["reflection"], "status": "needs_review"},
        llm
    )
    new_plan_dict = {
        # Keep the original goal; parse_plan labels its output with a placeholder goal
        "goal": state["goal"],
        "tasks": [t.model_dump() for t in new_plan.tasks]
    }
    return {"plan": new_plan_dict, "needs_replanning": False, "current_task_idx": 0}


def route_after_reflection(state: ReflectiveState) -> str:
    """Decide whether to continue, replan, or finish."""
    if state["needs_replanning"]:
        return "replan"
    if state["current_task_idx"] >= len(state["plan"]["tasks"]):
        return "finish"
    return "continue"


# Build the graph
graph = StateGraph(ReflectiveState)
graph.add_node("execute", execute_task_node)
graph.add_node("reflect", reflect_node)
graph.add_node("replan", replan_node)

graph.set_entry_point("execute")
graph.add_edge("execute", "reflect")
graph.add_conditional_edges("reflect", route_after_reflection, {
    "continue": "execute",
    "replan": "replan",
    "finish": END
})
graph.add_edge("replan", "execute")

reflective_agent = graph.compile()


# Test with a small plan (2 tasks for speed)
small_plan = Plan(
    goal="Prepare a presentation on LoRA fine-tuning",
    tasks=[
        Task(id="t1", description="Research key concepts of LoRA", depends_on=[], estimated_hours=1),
        Task(id="t2", description="Create outline with 3 main sections", depends_on=["t1"], estimated_hours=0.5),
    ]
)

result = reflective_agent.invoke({
    "goal": small_plan.goal,
    "plan": {"goal": small_plan.goal, "tasks": [t.model_dump() for t in small_plan.tasks]},
    "current_task_idx": 0,
    "task_results": {},
    "reflection": "",
    "needs_replanning": False,
})

print(f"Goal: {result['goal']}")
print(f"Tasks completed: {len(result['task_results'])}")
for task_id, task_result in result["task_results"].items():
    print(f"\n  [{task_id}]: {task_result[:200]}")
print(f"\nFinal reflection: {result['reflection'][:300]}")

## Backtracking and Recovery

Even with good planning and reflection, agents sometimes go down wrong paths. Deep agents need the ability to recognize dead ends and backtrack to try alternative approaches.

In [None]:
def detect_dead_end(state: dict, history: list[dict]) -> dict:
    """Check if the agent is stuck.

    Detects three patterns: repeated failures, task cycling,
    and lack of progress.
    """
    # Check for repeated failures
    recent_failures = [h for h in history[-5:] if h.get("status") == "failed"]
    if len(recent_failures) >= 3:
        return {"is_stuck": True, "reason": "repeated_failures"}
    
    # Check for cycling (same task attempted multiple times)
    recent_tasks = [h.get("task_id") for h in history[-10:]]
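    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # reported task is deterministic when several tasks are cycling
    for task_id in dict.fromkeys(recent_tasks):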
        if task_id and recent_tasks.count(task_id) >= 3:
            return {"is_stuck": True, "reason": "task_cycling", "task": task_id}
    
    # Check for lack of progress
    if len(history) > 10:
        recent_progress = sum(1 for h in history[-10:] if h.get("progress_made"))
        if recent_progress < 2:
            return {"is_stuck": True, "reason": "no_progress"}
    
    return {"is_stuck": False}


# Test dead end detection
history_stuck = [
    {"task_id": "t3", "status": "failed", "progress_made": False},
    {"task_id": "t3", "status": "failed", "progress_made": False},
    {"task_id": "t3", "status": "failed", "progress_made": False},
]
print(f"Repeated failures: {detect_dead_end({}, history_stuck)}")

history_cycling = [{"task_id": "t1"}, {"task_id": "t2"}, {"task_id": "t1"}, 
                   {"task_id": "t2"}, {"task_id": "t1"}, {"task_id": "t2"}]
print(f"Task cycling: {detect_dead_end({}, history_cycling)}")

history_ok = [{"task_id": "t1", "status": "completed", "progress_made": True}]
print(f"Normal progress: {detect_dead_end({}, history_ok)}")

In [None]:
@dataclass
class ExplorationState:
    id: str
    task_state: dict
    decision_made: str
    alternatives: list[str]
    timestamp: float


class ExplorationHistory:
    """Maintains checkpoints at decision points for backtracking."""

    def __init__(self):
        self.states: list[ExplorationState] = []
        self.current_idx: int = -1

    def checkpoint(self, state: dict, decision: str, alternatives: list[str]):
        """Save a decision point we might want to return to."""
        exploration_state = ExplorationState(
            id=f"checkpoint_{len(self.states)}",
            task_state=state.copy(),
            decision_made=decision,
            alternatives=list(alternatives),  # Copy so backtracking never mutates the caller's list
            timestamp=time.time()
        )
        self.states.append(exploration_state)
        self.current_idx = len(self.states) - 1

    def backtrack(self) -> tuple[dict, str] | None:
        """Return to most recent checkpoint with untried alternatives."""
        for i in range(self.current_idx, -1, -1):
            state = self.states[i]
            if state.alternatives:
                # Try next alternative
                next_alternative = state.alternatives.pop(0)
                # Drop checkpoints made after this one; they belong to the abandoned path
                del self.states[i + 1:]
                self.current_idx = i
                # Return a copy so the caller cannot alter the saved checkpoint
                return state.task_state.copy(), next_alternative

        return None  # No alternatives left

In [None]:
def handle_dead_end(state: dict, history: ExplorationHistory, llm) -> dict:
    """Recover from a dead end by backtracking and trying alternatives."""
    
    backtrack_result = history.backtrack()
    
    if backtrack_result is None:
        # No alternatives left - report failure
        return {
            "status": "failed",
            "reason": "All alternatives exhausted"
        }
    
    restored_state, alternative = backtrack_result
    
    # Log the backtrack
    print(f"Backtracking to try alternative: {alternative}")
    
    # Continue from restored state with new approach
    return {
        "status": "backtracked",
        "restored_state": restored_state,
        "new_approach": alternative
    }


# Demonstrate backtracking
history = ExplorationHistory()

# Checkpoint 1: choosing a venue approach
history.checkpoint(
    state={"task": "find_venue", "progress": 0.2},
    decision="Search online databases",
    alternatives=["Ask local contacts", "Hire an event planner"]
)

# Checkpoint 2: choosing a specific database
history.checkpoint(
    state={"task": "find_venue", "progress": 0.3, "approach": "online"},
    decision="Use Eventbrite",
    alternatives=["Use Google Maps", "Use Yelp"]
)

# Simulate hitting dead ends
print("Dead end detected! Online search yielded no results.")
result = handle_dead_end({}, history, llm)
print(f"Result: {result}")

print("\nAnother dead end!")
result = handle_dead_end({}, history, llm)
print(f"Result: {result}")

print("\nYet another dead end!")
result = handle_dead_end({}, history, llm)
print(f"Result: {result}")

## Bringing It All Together

Now let's combine planning, delegation, reasoning, reflection, and backtracking into a single cohesive pipeline. We'll build a **Deep Research Agent** that:

1. **Plans**: decomposes a research question into sub-questions
2. **Delegates**: spawns subagents to research each sub-question in parallel
3. **Reasons**: synthesizes findings using parallel reasoning approaches
4. **Reflects**: evaluates whether the research is complete and identifies gaps
5. **Checks for dead ends**: runs dead-end detection over the execution history (this single-pass version reports the result but does not trigger a backtrack)
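
The agent in the next cell makes a single pass through these phases. As a rough sketch of how reflection could feed back into planning, the loop below repeats the plan-research-reflect cycle until no gaps remain or an iteration budget runs out. Everything here is an assumption for illustration: `iterative_research` and the `plan`, `research`, and `find_gaps` callables are hypothetical stand-ins for LLM-backed steps, not functions defined in this notebook.

```python
from typing import Callable


def iterative_research(
    question: str,
    plan: Callable[[str, list[str]], list[str]],
    research: Callable[[str], str],
    find_gaps: Callable[[str, dict[str, str]], list[str]],
    max_iterations: int = 3,
) -> dict:
    """Repeat plan -> research -> reflect until no gaps remain or the budget is spent."""
    findings: dict[str, str] = {}
    gaps: list[str] = []
    iterations = 0
    while iterations < max_iterations:
        iterations += 1
        # Plan: the first pass decomposes the question, later passes target the gaps
        for sub_question in plan(question, gaps):
            if sub_question not in findings:
                findings[sub_question] = research(sub_question)
        # Reflect: ask what is still missing
        gaps = find_gaps(question, findings)
        if not gaps:
            break
    return {"findings": findings, "open_gaps": gaps, "iterations": iterations}


# Stub callables standing in for LLM calls (hypothetical, for illustration only)
def stub_plan(question: str, gaps: list[str]) -> list[str]:
    return gaps or ["What causes hallucinations?", "Which mitigations exist?"]


def stub_research(sub_question: str) -> str:
    return f"notes on: {sub_question}"


def stub_find_gaps(question: str, findings: dict[str, str]) -> list[str]:
    follow_up = "How are mitigations evaluated?"
    return [] if follow_up in findings else [follow_up]


report = iterative_research(
    "How can hallucinations be reduced?", stub_plan, stub_research, stub_find_gaps
)
print(report["iterations"], len(report["findings"]))
```

With these stubs the first pass leaves one gap, the second pass fills it, and the loop stops before reaching the budget.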

In [None]:
async def deep_research_agent(question: str, llm, max_iterations: int = 3) -> dict:
    """A deep agent that researches a question using the techniques from this notebook.

    Combines planning, delegation, reasoning, reflection, and a dead-end check
    to produce a comprehensive research report.

    Note: this version makes a single pass through the phases; `max_iterations`
    is accepted but not used yet.
    """
    print(f"{'='*60}")
    print("DEEP RESEARCH AGENT")
    print(f"Question: {question}")
    print(f"{'='*60}")

    # -------------------------------------------------------
    # PHASE 1: PLANNING -- Decompose the question
    # -------------------------------------------------------
    print("\n--- Phase 1: Planning ---")
    planning_prompt = f"""Break this research question into 3 specific sub-questions
that, when answered, would provide a comprehensive answer:

Question: {question}

Format each sub-question on its own line, no numbering."""

    sub_questions_raw = llm.invoke(planning_prompt).content.strip().split("\n")
    sub_questions = [q.strip() for q in sub_questions_raw if q.strip()][:3]

    plan = Plan(
        goal=question,
        tasks=[
            Task(id=f"research_{i+1}", description=sq, estimated_hours=1)
            for i, sq in enumerate(sub_questions)
        ]
    )

    print(f"Plan created with {len(plan.tasks)} sub-questions:")
    for task in plan.tasks:
        print(f"  {task.id}: {task.description}")

    # -------------------------------------------------------
    # PHASE 2: DELEGATION -- Research each sub-question
    # -------------------------------------------------------
    print("\n--- Phase 2: Delegation (parallel) ---")
    manager = SubagentManager(llm=llm)

    research_tasks = [
        {
            "role": f"researcher_{i+1}",
            "description": f"Research this question thoroughly and provide key findings: {sq}",
            "context": {"main_question": question}
        }
        for i, sq in enumerate(sub_questions)
    ]

    research_results_raw = await parallel_delegation(research_tasks, manager)

    research_results = [
        {"source": f"Researcher {i+1}: {sub_questions[i][:50]}", "content": result}
        for i, result in enumerate(research_results_raw)
    ]

    print(f"Received {len(research_results)} research reports")
    for r in research_results:
        print(f"  [{r['source'][:40]}...] {r['content'][:80]}...")

    # -------------------------------------------------------
    # PHASE 3: REASONING -- Synthesize findings
    # -------------------------------------------------------
    print("\n--- Phase 3: Reasoning (parallel approaches) ---")

    findings_text = format_results(research_results)
    synthesis_problem = f"""Given these research findings about '{question}':

{findings_text}

What is the comprehensive answer to the original question?"""

    reasoning_result = await parallel_reasoning(
        synthesis_problem,
        approaches=[
            "Analytical synthesis (identify patterns and themes)",
            "Critical evaluation (assess strength of evidence)",
        ],
        llm=llm
    )

    print(f"Reasoning complete with {len(reasoning_result['approaches'])} approaches")

    # -------------------------------------------------------
    # PHASE 4: REFLECTION -- Evaluate completeness
    # -------------------------------------------------------
    print("\n--- Phase 4: Reflection ---")

    completed_tasks = [t.description for t in plan.tasks]
    progress = evaluate_progress(
        goal=question,
        plan=plan,
        completed_tasks=completed_tasks,
        llm=llm
    )

    print(f"Progress: {progress['progress_fraction']:.0%}")
    print(f"Status: {progress['status']}")

    # -------------------------------------------------------
    # PHASE 5: BACKTRACKING CHECK -- Are we stuck?
    # -------------------------------------------------------
    print("\n--- Phase 5: Backtracking check ---")

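    # In this single-pass demo every task is recorded as completed, so the
    # check reports "not stuck"; a real agent would log actual task outcomes.
    exec_history = [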
        {"task_id": t.id, "status": "completed", "progress_made": True}
        for t in plan.tasks
    ]
    dead_end_check = detect_dead_end({}, exec_history)
    print(f"Stuck? {dead_end_check['is_stuck']}")

    # -------------------------------------------------------
    # FINAL OUTPUT
    # -------------------------------------------------------
    print(f"\n{'='*60}")
    print("RESEARCH COMPLETE")
    print(f"{'='*60}")

    final_report = {
        "question": question,
        "sub_questions": sub_questions,
        "research_findings": [r["content"][:300] for r in research_results],
        "synthesis": reasoning_result["comparison"],
        "progress_evaluation": progress["evaluation"],
        "is_stuck": dead_end_check["is_stuck"],
    }

    return final_report


# Run the deep research agent
report = await deep_research_agent(
    "What are the most effective strategies for reducing hallucinations in production LLM applications?",
    llm=llm
)

print(f"\n{'='*60}")
print("FINAL SYNTHESIS")
print(f"{'='*60}")
print(report["synthesis"][:800])