# Chapter 8b — Plan-and-Execute Agent

**Pattern:** Separate planning from execution. A Planner creates a structured plan, an Executor runs one step at a time, and a Replanner adapts after each step.

```
Goal → [Planner] → JSON Plan → [Executor x N] → [Replanner] → done
                                    ↑_________________________|
```
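The loop in the diagram can be sketched in plain Python before any LLM is involved. This is a minimal illustration, not the notebook's orchestrator: `plan_and_execute` and the three `stub_*` functions are hypothetical stand-ins for the Planner, Executor, and Replanner agents.

```python
from typing import Callable

Step = dict  # e.g. {"id": 1, "description": "...", "tool_hint": "web_search"}
History = list[tuple[Step, str]]


def plan_and_execute(
    goal: str,
    plan: Callable[[str], list[Step]],
    execute: Callable[[Step], str],
    replan: Callable[[str, History, list[Step]], list[Step]],
    max_iterations: int = 15,
) -> History:
    """Run plan -> execute -> replan until no steps remain."""
    remaining = plan(goal)
    completed: History = []
    for _ in range(max_iterations):
        if not remaining:
            break
        step, remaining = remaining[0], remaining[1:]
        completed.append((step, execute(step)))
        if remaining:  # nothing to review once the plan is exhausted
            remaining = replan(goal, completed, remaining)
    return completed


# Hypothetical stubs standing in for the three LLM agents.
def stub_plan(goal: str) -> list[Step]:
    return [
        {"id": 1, "description": f"research {goal}", "tool_hint": "web_search"},
        {"id": 2, "description": "write script", "tool_hint": "write_file"},
        {"id": 3, "description": "run script", "tool_hint": "run_python"},
    ]


def stub_execute(step: Step) -> str:
    return f"done: {step['description']}"


def stub_replan(goal: str, completed: History, remaining: list[Step]) -> list[Step]:
    return remaining  # conservative: keep the remaining steps unchanged


history = plan_and_execute("RAII", stub_plan, stub_execute, stub_replan)
for step, result in history:
    print(step["id"], result)
```

The `max_iterations` cap matters because a replanner can keep adding steps; without it the loop has no guaranteed end.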

### How This Builds on Earlier Chapters

| Chapter | Pattern | What we learned | How Plan-and-Execute extends it |
|---------|---------|-----------------|--------------------------------|
| Ch 2 | Routing | Coordinator delegates to specialists | Planner *routes each step* to the right tool via `tool_hint` |
| Ch 3 | Parallelization | Run agents concurrently | Steps that do not depend on each other's results could be parallelized in a future optimization |
| Ch 5 | Tool Use | Single agent, single tool call | Executor uses **multiple tools** across a structured plan |
| Ch 6 | Reflection | Adversarial reviewer corrects output | Replanner reviews progress and **revises the plan** if results change the task |
| Ch 7 | Sequential | Fixed A→B→C pipeline | Plan-and-Execute is a **dynamic** sequence — the Replanner can rewrite remaining steps |
| Ch 8a | ReAct | Inline reasoning, no upfront plan | Plan-and-Execute commits to a **visible, auditable plan** before executing |

### When to Use Plan-and-Execute vs ReAct

| Dimension | Plan-and-Execute | ReAct (Ch 8a) |
|-----------|-----------------|---------------|
| Task structure | Known upfront | Emergent |
| Parallelism | Possible (future) | No |
| Plan visibility | **Explicit JSON** — inspectable, loggable | Implicit in trace |
| Recovery from surprises | Replanner loop | Each Thought step |
| Best for | Long-horizon tasks (6+ steps) | Short exploratory tasks (2-4 steps) |

### The Scenario

You're a **developer** who wants to research a technical topic, write runnable code that demonstrates the concept, execute it, and save everything to files. This is a **structured, multi-step task** — the kind that benefits from an upfront plan rather than ad-hoc exploration.

Three specialized agents collaborate:
1. **PlannerAgent** — Breaks the goal into an ordered JSON plan (never executes)
2. **ExecutorAgent** — Runs one step at a time using tools (never plans ahead)
3. **ReplannerAgent** — Reviews progress and decides: continue, replan, or done

In [36]:
import os
import json
import nest_asyncio
nest_asyncio.apply()

from dotenv import load_dotenv
load_dotenv()
assert os.environ.get("GOOGLE_API_KEY"), "Set GOOGLE_API_KEY first"
print("Google API Key set:", bool(os.environ.get("GOOGLE_API_KEY")))

Google API Key set: True


In [37]:
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import FunctionTool
from google.genai import types

---
## Tools — The Executor's Toolkit

The Executor gets four tools — one for each kind of action a step might require. The **Planner** never touches these tools; it only outputs `tool_hint` suggestions. This **separation of concerns** (Ch 2: Routing) means the Planner can focus on strategy while the Executor focuses on execution.

| Tool | Purpose | Comparable to |
|------|---------|---------------|
| `web_search` | Research technical topics | Ch 5: Tool Use (Google Search) |
| `write_file` | Save code or notes to disk | Ch 8a: write_note (but for any file) |
| `read_file` | Read back saved files | Verify what was written |
| `run_python` | Execute Python code | Ch 5: Code Execution (but local) |
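Because `tool_hint` is only a string, it can be checked against a registry of known tools before a step is handed over. The sketch below is one possible way to do that; `TOOL_REGISTRY` and `resolve_tool_hint` are hypothetical names, and the two tools are simplified stubs rather than the notebook's implementations. In the notebook itself the hint stays advisory and the Executor model picks the tool.

```python
from typing import Callable, Optional


# Simplified stand-ins for two of the Executor's tools (hypothetical stubs).
def web_search(query: str) -> str:
    return f"results for {query}"


def write_file(filename: str, content: str) -> str:
    return f"wrote {filename}"


TOOL_REGISTRY: dict[str, Callable[..., str]] = {
    "web_search": web_search,
    "write_file": write_file,
}


def resolve_tool_hint(hint: str) -> Optional[Callable[..., str]]:
    """Return the tool for a hint, None for 'none', or raise on an unknown hint."""
    if hint == "none":
        return None
    try:
        return TOOL_REGISTRY[hint]
    except KeyError:
        raise ValueError(f"unknown tool_hint: {hint!r}") from None


tool = resolve_tool_hint("web_search")
print(tool("rust ownership"))
```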

In [38]:
def web_search(query: str) -> str:
    """Search the web for technical information. Returns a summary."""
    # Deterministic mock — same principle as Ch 6 (mock internals for reproducibility)
    mock_results = {
        "rust ownership": (
            "Rust ownership ensures memory safety without GC via the borrow checker. "
            "Each value has exactly one owner. Ownership can be transferred (moved) or borrowed. "
            "The borrow checker enforces: one mutable ref OR multiple immutable refs, never both."
        ),
        "python context manager": (
            "Python context managers (with statement) guarantee resource cleanup via __enter__/__exit__. "
            "Similar to Rust's RAII pattern — resources are acquired on entry and released on exit. "
            "contextlib.contextmanager decorator simplifies creation from generator functions."
        ),
        "raii pattern": (
            "RAII (Resource Acquisition Is Initialization) ties resource lifetime to object scope. "
            "Rust implements RAII through its ownership system — when an owner goes out of scope, "
            "Drop::drop() is called automatically. Python's context managers achieve similar guarantees."
        ),
        "llm tokenizer": (
            "Tokenizers split text into tokens using BPE or WordPiece algorithms. "
            "GPT uses byte-pair encoding. BERT uses WordPiece. Both handle subword units."
        ),
    }
    for key, val in mock_results.items():
        if key in query.lower():
            return val
    return f"Search results for '{query}': Multiple relevant articles found covering this topic."


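def write_file(filename: str, content: str) -> str:
    """Write content to a file. Returns confirmation with file path and size."""
    # Keep only the base name so a model-supplied path cannot escape /tmp.
    path = os.path.join("/tmp", os.path.basename(filename))
    with open(path, "w", encoding="utf-8") as f:
        f.write(content)
    return f"File written: {path} ({len(content)} chars)"


def read_file(filename: str) -> str:
    """Read content from a previously written file."""
    # Same base-name rule as write_file, so both tools resolve the same path.
    path = os.path.join("/tmp", os.path.basename(filename))
    try:
        with open(path, encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        return f"File not found: {path}"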


def run_python(code: str) -> str:
    """Execute Python code and return the stdout output."""
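    # Caution: exec() runs model-generated code inside this process with no
    # sandbox or timeout. Acceptable for a local demo, not for untrusted input.
    import io, contextlib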
    buf = io.StringIO()
    try:
        with contextlib.redirect_stdout(buf):
            exec(code, {})
        output = buf.getvalue()
        return output if output else "(code executed successfully, no output)"
    except Exception as e:
        return f"Error: {type(e).__name__}: {e}"


print("Tools defined: web_search, write_file, read_file, run_python")

Tools defined: web_search, write_file, read_file, run_python
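
The `run_python` tool above calls `exec` in the notebook's own process. As a hedged alternative, the sketch below runs the code in a child interpreter with a timeout. `run_python_isolated` and its `timeout_s` parameter are hypothetical names, and a subprocess gives process isolation only, not a security sandbox.

```python
import subprocess
import sys


def run_python_isolated(code: str, timeout_s: float = 5.0) -> str:
    """Run code in a separate Python process and return its stdout or an error."""
    try:
        proc = subprocess.run(
            [sys.executable, "-c", code],
            capture_output=True,
            text=True,
            timeout=timeout_s,
        )
    except subprocess.TimeoutExpired:
        return f"Error: timed out after {timeout_s}s"
    if proc.returncode != 0:
        # Return the traceback so the Executor can report what went wrong.
        return f"Error: {proc.stderr.strip()}"
    return proc.stdout or "(code executed successfully, no output)"


print(run_python_isolated("print(2 + 2)"))
```

A crash or infinite loop in generated code then ends the child process instead of the notebook kernel.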


---
## Agent 1: The Planner

The Planner's **only job** is to decompose a goal into ordered steps. It outputs **strict JSON** — no markdown, no prose. This makes the plan inspectable, loggable, and parseable.

Key design decision: the Planner has **no tools**. It cannot execute anything. This is the **separation of planning from execution** — the core idea of this pattern. Compare to Ch 8a (ReAct) where a single agent both plans and executes.
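
A strict JSON plan can be validated before any step runs. The sketch below shows one way to check the schema used in this chapter; `validate_plan`, `ALLOWED_HINTS`, and `MAX_STEPS` are hypothetical names, and including `read_file` among the allowed hints is an assumption based on the Executor's tool list.

```python
import json

# Assumption: the Executor's four tools plus "none".
ALLOWED_HINTS = {"web_search", "write_file", "read_file", "run_python", "none"}
MAX_STEPS = 6  # mirrors the "Maximum 6 steps" rule in the Planner prompt
REQUIRED_KEYS = {"id", "description", "tool_hint"}


def validate_plan(raw: str) -> dict:
    """Parse a plan and raise ValueError if it does not match the schema."""
    plan = json.loads(raw)
    if not isinstance(plan, dict) or not isinstance(plan.get("goal"), str):
        raise ValueError("plan must be an object with a string 'goal'")
    steps = plan.get("steps")
    if not isinstance(steps, list) or not 1 <= len(steps) <= MAX_STEPS:
        raise ValueError(f"'steps' must be a list of 1 to {MAX_STEPS} items")
    for step in steps:
        if not isinstance(step, dict):
            raise ValueError("each step must be an object")
        missing = REQUIRED_KEYS - step.keys()
        if missing:
            raise ValueError(f"step is missing keys: {sorted(missing)}")
        if step["tool_hint"] not in ALLOWED_HINTS:
            raise ValueError(f"unknown tool_hint: {step['tool_hint']!r}")
    return plan


example = json.dumps({
    "goal": "demo",
    "steps": [{"id": 1, "description": "search", "tool_hint": "web_search"}],
})
print(validate_plan(example)["steps"][0]["description"])
```

Failing fast here is cheaper than discovering a malformed step halfway through execution.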

### Rate-Limit Retry (All Agents)

All three agents share a `RETRY_CONFIG` that tells ADK to **automatically retry 429 errors** with exponential backoff (60s initial delay, up to 3 attempts in total). This is the [recommended approach from the ADK docs](https://google.github.io/adk-docs/agents/models/#error-code-429-resource_exhausted). The orchestrator adds no throttling of its own; it relies on this built-in retry.
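
To make the retry behaviour concrete, here is a plain-Python sketch of exponential backoff using the `initial_delay=60` and `attempts=3` values from `RETRY_CONFIG`. It is not the client library's implementation: the doubling factor, the absence of jitter, and the names `backoff_delays`, `call_with_retry`, and `RateLimitError` are all assumptions made for illustration.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Hypothetical stand-in for an HTTP 429 response."""


def backoff_delays(initial_delay: float, attempts: int, base: float = 2.0) -> list[float]:
    """Seconds to wait before each retry; `attempts` includes the first call."""
    return [initial_delay * base ** i for i in range(attempts - 1)]


def call_with_retry(
    fn: Callable[[], T],
    initial_delay: float,
    attempts: int,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on RateLimitError with growing delays."""
    delays = backoff_delays(initial_delay, attempts)
    for attempt in range(attempts):
        try:
            return fn()
        except RateLimitError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error
            sleep(delays[attempt])
    raise ValueError("attempts must be at least 1")


print(backoff_delays(60, 3))  # [60.0, 120.0]
```

Passing `sleep` as a parameter keeps the function testable without real waiting.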

In [39]:
from google.genai import types as genai_types

# ── ADK-native retry config ───────────────────────────────────
# Instead of only managing rate limits in Python, we tell ADK to
# automatically retry 429 errors with exponential backoff.
# This is the recommended approach from the ADK docs:
# https://google.github.io/adk-docs/agents/models/#error-code-429-resource_exhausted

RETRY_CONFIG = genai_types.GenerateContentConfig(
    http_options=genai_types.HttpOptions(
        retry_options=genai_types.HttpRetryOptions(
            initial_delay=60,    # seconds before first retry
            attempts=3,         # total attempts (1 original + 2 retries)
        ),
    ),
)

PLANNER_INSTRUCTION = """
You are a strategic Planner. Your only job is to decompose a high-level goal
into a clear, ordered list of concrete steps.

Rules:
- Output ONLY valid JSON — no markdown fences, no prose.
- Schema:
  {
    "goal": "<original goal>",
    "steps": [
      {"id": 1, "description": "<what to do>", "tool_hint": "<web_search|write_file|read_file|run_python|none>"},
      ...
    ]
  }
- Each step must be independently executable.
- Maximum 6 steps. Prefer fewer.
- Do NOT start executing — just plan.
"""

planner_agent = LlmAgent(
    name="PlannerAgent",
    model="gemini-2.0-flash",
    instruction=PLANNER_INSTRUCTION,
    description="Decomposes a goal into an ordered JSON plan.",
    generate_content_config=RETRY_CONFIG,
)

print("PlannerAgent ready (no tools — planning only)")
print("  Built-in retry: 3 attempts, 60s initial delay")

PlannerAgent ready (no tools — planning only)
  Built-in retry: 3 attempts, 60s initial delay


---
## Agent 2: The Executor

The Executor receives **one step at a time** and completes it using tools. It never plans ahead or works on other steps.

This is the **specialist pattern** from Ch 2 (Routing) — one agent, one focused job. The Executor is like the specialist sub-agents in Ch 6's supply chain pipeline (ForexAnalyst, LogisticsManager), except it's general-purpose with multiple tools.

In [40]:
EXECUTOR_INSTRUCTION = """
You are a focused Executor. You receive a single step description and must
complete ONLY that step using the available tools.

Rules:
- Use exactly the right tool for the step (web_search, write_file, read_file, run_python).
- If no tool is needed, reason through the answer directly.
- Return a concise result summary — what you did and what you found.
- Do NOT plan ahead or work on other steps.
"""

executor_agent = LlmAgent(
    name="ExecutorAgent",
    model="gemini-2.0-flash",
    instruction=EXECUTOR_INSTRUCTION,
    description="Executes a single plan step using tools.",
    tools=[
        FunctionTool(web_search),
        FunctionTool(write_file),
        FunctionTool(read_file),
        FunctionTool(run_python),
    ],
    generate_content_config=RETRY_CONFIG,
)

print("ExecutorAgent ready with tools:", [t.name for t in executor_agent.tools])
print("  Built-in retry: 3 attempts, 60s initial delay")

ExecutorAgent ready with tools: ['web_search', 'write_file', 'read_file', 'run_python']
  Built-in retry: 3 attempts, 60s initial delay


---
## Agent 3: The Replanner

After each step executes, the Replanner reviews progress and decides:

- **`continue`** — Remaining steps are still correct. Keep going.
- **`replan`** — Results revealed something new. Revise the remaining steps.
- **`done`** — Goal is already achieved. Stop early.

This is **Reflection from Ch 4/Ch 6** but applied to the *plan itself*, not just the output. In Ch 6, the compliance reviewer checked a procurement strategy against business rules. Here, the Replanner checks the *plan* against *reality* as it unfolds.

Key design decision: the Replanner is **conservative** — it only replans when results genuinely change the task. This prevents thrashing.
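
The three actions map onto a small piece of deterministic logic. The sketch below shows one way to turn a parsed Replanner decision into the next list of steps; `apply_decision` is a hypothetical helper, not part of the notebook. On `continue` it keeps the caller's own list instead of trusting the echoed `remaining_steps`, which matches how the orchestrator in this chapter behaves.

```python
from typing import Optional


def apply_decision(
    decision: dict, remaining: list[dict]
) -> tuple[list[dict], Optional[str]]:
    """Return (next_remaining_steps, summary). Summary is set only for 'done'."""
    action = decision.get("action")
    if action == "continue":
        return remaining, None
    if action == "replan":
        revised = decision.get("revised_steps")
        if not isinstance(revised, list):
            raise ValueError("'replan' requires a 'revised_steps' list")
        return revised, None
    if action == "done":
        return [], decision.get("summary", "")
    raise ValueError(f"unknown action: {action!r}")


steps_left = [{"id": 2, "description": "write script", "tool_hint": "write_file"}]
print(apply_decision({"action": "continue"}, steps_left))
print(apply_decision({"action": "done", "summary": "all saved"}, steps_left))
```

Rejecting unknown actions explicitly avoids silently treating a malformed reply as `continue`.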

In [41]:
REPLANNER_INSTRUCTION = """
You are a Replanner. You review progress on a goal and decide what to do next.

You will receive:
  - The original goal
  - Steps already completed and their results
  - Remaining steps not yet executed

Output ONLY valid JSON (no markdown fences). Choose one of:

Option A — Keep the remaining steps as-is:
  {"action": "continue", "remaining_steps": [<original remaining steps>]}

Option B — Revise the plan given what you learned:
  {"action": "replan", "revised_steps": [
    {"id": <n>, "description": "...", "tool_hint": "..."},
    ...
  ]}

Option C — Goal is already achieved, stop:
  {"action": "done", "summary": "<brief summary of what was accomplished>"}

Be conservative: only replan if results revealed something that makes the
original steps wrong or redundant.
"""

replanner_agent = LlmAgent(
    name="ReplannerAgent",
    model="gemini-2.0-flash",
    instruction=REPLANNER_INSTRUCTION,
    description="Reviews progress and decides to continue, replan, or finish.",
    generate_content_config=RETRY_CONFIG,
)

print("ReplannerAgent ready (no tools — review only)")
print("  Built-in retry: 3 attempts, 60s initial delay")

ReplannerAgent ready (no tools — review only)
  Built-in retry: 3 attempts, 60s initial delay


---
## The Orchestrator — Driving the Plan→Execute→Replan Loop

The orchestrator is **not an LLM agent** — it's a Python class that drives the loop deterministically. This is the same principle as Ch 3/Ch 6: use `ParallelAgent` / `SequentialAgent` for deterministic orchestration, not LLM reasoning.

```
  goal
   │
   ▼
  [Planner] ──► initial plan (JSON)
   │
   ▼
  ┌─ Pick next step ◄──────────────────────┐
  │                                         │
  ▼                                         │
 [Executor] ──► step result                 │
  │                                         │
  ▼                                         │
 [Replanner] ──► continue ─────────────────►┘
              └─► replan ──► revised steps ─►┘
              └─► done ──► final summary
```

Each agent gets its own ADK session — **no state bleed between roles**. The orchestrator passes context explicitly via message text, not shared state. Compare this to Ch 6 where agents shared a blackboard via `output_key`.
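
Passing context through message text means the orchestrator decides exactly what each agent sees. As a rough sketch of that idea, the helpers below derive a per-role session ID and build a truncated context string from earlier results. `session_id_for` and `build_context` are hypothetical names; the ID format and the 150-character cut mirror the orchestrator code in this chapter.

```python
def session_id_for(user_id: str, agent_name: str) -> str:
    """One session per (user, role) pair, so roles never share history."""
    return f"{user_id}_{agent_name}"


def build_context(completed: list[dict], max_chars: int = 150) -> str:
    """Summarize earlier results as bullet lines, truncating long results."""
    lines = [
        f"- Step {c['step']['id']}: {c['result'][:max_chars]}"
        for c in completed
    ]
    return "\n".join(lines) if lines else "(none yet)"


completed = [
    {"step": {"id": 1}, "result": "Rust ownership ensures memory safety."},
    {"step": {"id": 2}, "result": "x" * 500},
]
print(session_id_for("user_001", "ExecutorAgent"))
print(build_context(completed))
```

Truncation keeps prompts short, at the cost that a later step may not see the full text of an earlier result.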

### Rate-Limit Protection

All three agents use ADK's built-in retry via `generate_content_config`. If a call hits a 429, ADK automatically retries with exponential backoff (60s initial delay, up to 3 attempts in total), so the orchestrator needs no manual throttling and stays simple.

In [42]:
class PlanAndExecuteOrchestrator:
    """Drives the Plan → Execute → Replan loop using three ADK agents."""

    def __init__(self, session_service, app_name: str = "plan_execute_app"):
        self.session_service = session_service
        self.app_name = app_name
        self._runners = {}
        for agent in [planner_agent, executor_agent, replanner_agent]:
            self._runners[agent.name] = Runner(
                agent=agent,
                app_name=app_name,
                session_service=session_service,
            )

    async def _call_agent(self, agent_name: str, user_id: str, message: str) -> str:
        """Send a message to an agent and return its text response."""
        runner = self._runners[agent_name]
        session_id = f"{user_id}_{agent_name}"

        # Create session if it doesn't exist yet
        existing = await self.session_service.get_session(
            app_name=self.app_name, user_id=user_id, session_id=session_id
        )
        if not existing:
            await self.session_service.create_session(
                app_name=self.app_name, user_id=user_id, session_id=session_id
            )

        content = types.Content(
            role="user",
            parts=[types.Part.from_text(text=message)],
        )
        response_text = ""
        async for event in runner.run_async(
            user_id=user_id, session_id=session_id, new_message=content,
        ):
            if event.is_final_response() and event.content:
                for part in event.content.parts:
                    if part.text:
                        response_text += part.text
        return response_text.strip()

    def _parse_json(self, raw: str) -> dict:
        """Robustly parse JSON — strip markdown fences if the model adds them."""
        raw = raw.strip()
        if raw.startswith("```"):
            raw = raw.split("```")[1]
            if raw.startswith("json"):
                raw = raw[4:]
        return json.loads(raw.strip())

    async def run(self, goal: str, user_id: str = "user_001") -> str:
        print(f"{'='*60}")
        print(f"GOAL: {goal}")
        print(f"{'='*60}")

        # ── Phase 1: Plan ─────────────────────────────────────────
        print("\n[PLANNER] Generating initial plan...")
        plan_raw = await self._call_agent("PlannerAgent", user_id, goal)
        plan = self._parse_json(plan_raw)
        remaining_steps = plan["steps"]
        completed_steps = []

        print(f"[PLANNER] {len(remaining_steps)} steps generated:")
        for s in remaining_steps:
            print(f"  Step {s['id']}: {s['description']}  (tool: {s['tool_hint']})")

        max_iterations = 15
        iteration = 0

        # ── Phase 2: Execute → Replan loop ────────────────────────
        while remaining_steps and iteration < max_iterations:
            iteration += 1
            current_step = remaining_steps[0]

            # Execute one step
            print(f"\n[EXECUTOR] Step {current_step['id']}: {current_step['description']}")
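            # Summarize earlier results; the first step has no prior context.
            context = "\n".join(
                f"- Step {c['step']['id']}: {c['result'][:150]}"
                for c in completed_steps
            ) or "(none yet)"
            executor_prompt = (
                f"Complete this single step:\n"
                f"Step: {current_step['description']}\n"
                # Revised steps from the Replanner may omit tool_hint.
                f"Tool hint: {current_step.get('tool_hint', 'none')}\n\n"
                f"Context from previous steps:\n{context}"
            )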
            result = await self._call_agent("ExecutorAgent", user_id, executor_prompt)
            completed_steps.append({"step": current_step, "result": result})
            print(f"[EXECUTOR] Result: {result[:250]}{'...' if len(result)>250 else ''}")

            remaining_steps = remaining_steps[1:]

            # Replan check (skip if nothing left)
            if not remaining_steps:
                break

            print(f"\n[REPLANNER] Reviewing progress ({len(remaining_steps)} steps left)...")
            replan_prompt = (
                f"Goal: {goal}\n\n"
                f"Completed steps:\n"
                + "\n".join(
                    f"  Step {c['step']['id']}: {c['step']['description']}\n"
                    f"  Result: {c['result'][:200]}"
                    for c in completed_steps
                )
                + f"\n\nRemaining steps:\n"
                + "\n".join(
                    f"  Step {s['id']}: {s['description']}"
                    for s in remaining_steps
                )
            )
            replan_raw = await self._call_agent("ReplannerAgent", user_id, replan_prompt)
            replan = self._parse_json(replan_raw)

            if replan["action"] == "done":
                print(f"\n[REPLANNER] Goal achieved early!")
                print(f"  Summary: {replan['summary']}")
                return replan["summary"]
            elif replan["action"] == "replan":
                remaining_steps = replan["revised_steps"]
                print(f"[REPLANNER] Revised plan — {len(remaining_steps)} steps remaining:")
                for s in remaining_steps:
                    print(f"  Step {s['id']}: {s['description']}")
            else:
                print("[REPLANNER] Continuing with original remaining steps.")

        # ── Final summary ────────────────────────────────────────
        summary_prompt = (
            f"Goal: {goal}\n\n"
            f"All completed steps:\n"
            + "\n".join(
                f"Step {c['step']['id']} — {c['step']['description']}:\n{c['result'][:300]}"
                for c in completed_steps
            )
            + "\n\nWrite a concise final summary of what was accomplished."
        )
        final = await self._call_agent("PlannerAgent", user_id, summary_prompt)
        print(f"\n{'='*60}")
        print(f"FINAL SUMMARY:\n{final}")
        print(f"{'='*60}")
        return final


print("PlanAndExecuteOrchestrator defined")

PlanAndExecuteOrchestrator defined


---
## Run It — Structured Multi-Step Task

Watch the three agents collaborate:
1. The Planner generates a structured JSON plan
2. The Executor works through steps one at a time, using tools
3. The Replanner checks after each step — should we continue, revise, or stop?

Notice the **separation of concerns**: the Planner never touches tools, the Executor never plans ahead, and the Replanner only reviews.

In [43]:
session_service = InMemorySessionService()
orchestrator = PlanAndExecuteOrchestrator(session_service)

result = await orchestrator.run(
    "Research what Rust ownership is and how it compares to Python's context managers. "
    "Write a short Python script that demonstrates a similar RAII concept using context managers, "
    "run it to verify it works, and save the script to a file called rust_ownership_demo.py"
    # "Research Rust's ownership model and compare it to Go for building "
    # "latency-sensitive microservices. Look into real-world adoption cases. "
    # "Save your key findings as notes, then give me a recommendation "
    # "on whether our team should adopt Rust for our new payment gateway service."
)

GOAL: Research what Rust ownership is and how it compares to Python's context managers. Write a short Python script that demonstrates a similar RAII concept using context managers, run it to verify it works, and save the script to a file called rust_ownership_demo.py

[PLANNER] Generating initial plan...
[PLANNER] 4 steps generated:
  Step 1: Research Rust's ownership concept, focusing on move semantics, borrowing, and lifetimes. Also, research Python's context managers and the RAII (Resource Acquisition Is Initialization) principle.  (tool: web_search)
  Step 2: Based on the research, write a Python script that demonstrates RAII using context managers to mimic the resource management aspects of Rust ownership. The script should simulate acquiring and releasing a resource.  (tool: write_file)
  Step 3: Save the script to a file named `rust_ownership_demo.py`.  (tool: none)
  Step 4: Execute the `rust_ownership_demo.py` script to verify that it functions correctly and demonstrates the i

---
## Verify the Output

The Executor should have written a file. Let's read it back to confirm.

In [44]:
try:
    with open("/tmp/rust_ownership_demo.py") as f:
        print(f.read())
except FileNotFoundError:
    print("File not found — the executor may have used a different filename.")
    import glob
    files = glob.glob("/tmp/*.py")
    if files:
        print(f"Found files: {files}")
        with open(files[-1]) as f:
            print(f"\nContents of {files[-1]}:\n{f.read()}")

class ManagedResource:
    def __init__(self, name):
        self.name = name
        print(f"Resource '{self.name}' created.")

    def __enter__(self):
        print(f"Resource '{self.name}' acquired.")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f"Resource '{self.name}' released.")

    def use_resource(self):
        print(f"Resource '{self.name}' is being used.")


with ManagedResource("MyResource") as resource:
    resource.use_resource()

print("Exiting the 'with' block.")



---
## Try a Different Goal

The orchestrator works on any structured multi-step task.

In [None]:
# Uncomment and run with your own goal:
# session_service2 = InMemorySessionService()
# orchestrator2 = PlanAndExecuteOrchestrator(session_service2)
# await orchestrator2.run(
#     "Research how LLM tokenizers work (BPE vs WordPiece). "
#     "Write a Python script that implements a simple character-level tokenizer, "
#     "run it on the sentence 'Hello World', and save the script."
# )

---
## Key Takeaways

### Three-Agent Architecture

| Agent | Has Tools? | Has Session State? | Role |
|-------|-----------|-------------------|------|
| PlannerAgent | No | Own session | Strategy only — outputs JSON plan |
| ExecutorAgent | Yes (4 tools) | Own session | Tactics only — completes one step |
| ReplannerAgent | No | Own session | Review only — continue, replan, or done |

### Patterns Composed

| Pattern | Chapter | How it appears here |
|---------|---------|--------------------|
| Routing | Ch 2 | Planner routes each step to the right tool via `tool_hint` |
| Parallelization | Ch 3 | Steps that do not depend on earlier results could be parallelized |
| Tool Use | Ch 5 | Executor uses web_search, write_file, read_file, run_python |
| Reflection | Ch 6 | Replanner reviews progress and revises the plan |
| Sequential | Ch 7 | Plan → Execute → Replan is a sequential chain |

### Plan-and-Execute vs ReAct (Ch 8a)

| Dimension | Plan-and-Execute | ReAct |
|-----------|-----------------|-------|
| Agents | 3 specialized | 1 general |
| Plan | Explicit JSON (loggable, auditable) | Implicit in reasoning trace |
| Adaptability | Replanner revises remaining steps | Every Thought can pivot |
| Debugging | Trace across 3 agents | Single agent trace |
| Parallelism | Steps without mutual dependencies could be parallelized (not implemented here) | Inherently serial |
| Best for | **Long, structured tasks** | **Short, exploratory tasks** |
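
The parallelism row describes something the orchestrator in this chapter does not implement. As a sketch of what it could look like, the code below runs a batch of mutually independent steps concurrently with `asyncio.gather`. `execute_step` is a hypothetical stand-in for an Executor call, and deciding which steps are truly independent is left to the caller.

```python
import asyncio


async def execute_step(step: dict) -> str:
    """Hypothetical stand-in for one Executor call."""
    await asyncio.sleep(0.01)  # simulates model and tool latency
    return f"result of step {step['id']}"


async def run_batch(steps: list[dict]) -> dict[int, str]:
    """Run independent steps concurrently and map step id to result."""
    results = await asyncio.gather(*(execute_step(s) for s in steps))
    return {s["id"]: r for s, r in zip(steps, results)}


independent = [
    {"id": 1, "description": "research Rust ownership"},
    {"id": 2, "description": "research Python context managers"},
]
batch_results = asyncio.run(run_batch(independent))
print(batch_results)
```

Inside a notebook cell, `await run_batch(independent)` can replace the `asyncio.run` call. Steps that consume earlier results, such as running a script that a previous step wrote, still have to wait their turn.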

### Design Decisions

- **Strict JSON output** from Planner/Replanner — easy to parse, inspect, and log. No ambiguity.
- **Separate sessions** per agent — prevents state bleed. Context is passed explicitly via message text.
- **Conservative Replanner** — only replans when results *change the task*. Prevents thrashing.
- **Deterministic orchestrator** (Python class, not LLM) — the loop logic is reliable. Same principle as Ch 3/Ch 6's ParallelAgent.
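
Strict JSON is what the prompts ask for, but models sometimes wrap their reply in a markdown fence anyway. The orchestrator's `_parse_json` handles this by splitting on the fence marker; the sketch below is a regex-based variant of the same idea. `parse_model_json` is a hypothetical name, and it assumes the reply is either bare JSON or a single fenced block.

```python
import json
import re

# Matches a whole reply of the form: fence, optional language tag, body, fence.
_FENCED = re.compile(r"^```[a-zA-Z]*\s*\n(.*?)\n?```\s*$", re.DOTALL)


def parse_model_json(raw: str) -> dict:
    """Parse a model reply as JSON, unwrapping one markdown fence if present."""
    text = raw.strip()
    match = _FENCED.match(text)
    if match:
        text = match.group(1)
    return json.loads(text)


print(parse_model_json('{"action": "continue"}'))
print(parse_model_json('```json\n{"action": "done", "summary": "ok"}\n```'))
```

Anything that is still not valid JSON raises `json.JSONDecodeError`, which the caller can log alongside the raw reply.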