# Tutorial: Orchestration Flow Rebuild in Notebook

Audience:
- Researchers who want to re-implement AgentFlow-style orchestration in a transparent way.

Learning goals:
- Rebuild the full loop (Planner -> Worker -> Verifier -> Memory -> Generator).
- Log every step as structured trace data.


## Outline

1. Define minimal sub-agent interfaces
2. Implement a state-machine orchestration loop
3. Run and inspect step-level traces
4. Add early-stop and budget constraints


In [None]:
from __future__ import annotations

from dataclasses import dataclass, asdict
from typing import Dict, Any, List
import time

@dataclass
class PlanStep:
    context: str
    sub_goal: str
    tool_name: str


In [None]:
class MockPlanner:
    def analyze_query(self, question: str) -> str:
        return f"Need factual lookup for: {question}"

    def generate_next_step(self, question: str, analysis: str, memory: List[Dict[str, Any]], step_idx: int) -> PlanStep:
        if step_idx == 1:
            return PlanStep(context=analysis, sub_goal="Search factual answer", tool_name="Google_Search_Tool")
        return PlanStep(context=str(memory[-1]) if memory else analysis, sub_goal="Summarize evidence", tool_name="Base_Generator_Tool")

    def generate_direct_output(self, question: str, memory: List[Dict[str, Any]]) -> str:
        if not memory:
            return "No answer"
        return f"Final answer based on latest memory: {memory[-1]['result']}"

class MockWorker:
    KNOWLEDGE = {
        "what is the capital of france?": "Paris",
        "what is the capital of japan?": "Tokyo",
    }

    def execute(self, tool_name: str, question: str, step: PlanStep) -> Dict[str, Any]:
        if tool_name == "Google_Search_Tool":
            return {"tool": tool_name, "result": self.KNOWLEDGE.get(question.lower(), "UNKNOWN")}
        if tool_name == "Base_Generator_Tool":
            return {"tool": tool_name, "result": f"Summary for {question}"}
        return {"tool": tool_name, "result": "UNSUPPORTED_TOOL"}

class MockVerifier:
    def decision(self, question: str, memory: List[Dict[str, Any]]) -> str:
        if not memory:
            return "CONTINUE"
        latest = str(memory[-1].get("result", "")).lower()
        if "france" in question.lower() and "paris" in latest:
            return "STOP"
        if "japan" in question.lower() and "tokyo" in latest:
            return "STOP"
        return "CONTINUE"


In [None]:
def run_orchestration(question: str, max_steps: int = 4, max_time_s: int = 10) -> Dict[str, Any]:
    planner = MockPlanner()
    worker = MockWorker()
    verifier = MockVerifier()

    start = time.time()
    trace: List[Dict[str, Any]] = []
    memory: List[Dict[str, Any]] = []

    analysis = planner.analyze_query(question)

    for step_idx in range(1, max_steps + 1):
        if time.time() - start > max_time_s:
            break

        pstep = planner.generate_next_step(question, analysis, memory, step_idx)
        wout = worker.execute(pstep.tool_name, question, pstep)
        memory.append(wout)
        dec = verifier.decision(question, memory)

        trace.append({
            "step": step_idx,
            "planner": asdict(pstep),
            "worker": wout,
            "verifier": dec,
        })

        if dec == "STOP":
            break

    direct = planner.generate_direct_output(question, memory)
    return {"question": question, "analysis": analysis, "trace": trace, "direct_output": direct}


## Step - Run the rebuilt loop


In [None]:
result = run_orchestration("What is the capital of France?")
print("analysis:", result["analysis"])
print("direct_output:", result["direct_output"])
print("trace_len:", len(result["trace"]))
result["trace"]


## Exercises

1. Replace mock planner with calls to the real AgentFlow planner endpoint.
2. Add tool timeout and retry policy in worker stage.
3. Add per-step cost accounting and stop on cost budget.
