# Multi-Agent Orchestration with Pydantic AI

## Data Setup and Indexing

In [4]:
from pathlib import Path

# Folder holding the cached transcripts (relative path, one .txt per video).
data_folder = Path('..') / 'data_cache' / 'youtube_videos'
# sorted() materialises the glob matches into a list with a deterministic order.
data_files = sorted(data_folder.glob("*.txt"))

In [5]:
import docs
from tqdm.auto import tqdm

# Flat list of transcript chunks across all files; each chunk is tagged with
# the id of the video it came from.
documents = []

for f in tqdm(data_files):
    # The file name without its suffix is used as the video id. `stem` replaces
    # `name.split('.')`, whose two-value unpacking raised ValueError for any
    # file name containing more than one dot.
    video_id = f.stem
    content = f.read_text(encoding='utf-8')
    # Overlapping windows: size 3000, advancing by 1500
    # (units are whatever docs.sliding_window uses — presumably characters; TODO confirm).
    chunks = docs.sliding_window(content, size=3000, step=1500)

    for chunk in chunks:
        chunk['video_id'] = video_id
        documents.append(chunk)

  0%|          | 0/191 [00:00<?, ?it/s]

In [6]:
from minsearch import Index

# Search index over the transcript chunks: `content` is registered as a text
# field and `video_id` as a keyword field.
index = Index(
    text_fields=["content"],
    keyword_fields=["video_id"]
)

# Index every chunk collected in `documents`.
index.fit(documents)

<minsearch.minsearch.Index at 0x12e35ba10>

## Search tool

In [7]:
from typing import Any, Dict, List, TypedDict, Optional

class SearchResult(TypedDict):
    """Represents a single search result entry."""

    start: int
    content: str
    video_id: str
    # Document id; searches below explicitly request ids in their output.
    _id: int


class SearchTools:
    # Wraps an index and exposes lookups as plain methods. The method
    # docstrings are intentionally left verbatim: agent frameworks may surface
    # them to the model as tool descriptions — TODO confirm.

    def __init__(self, index):
        self.index = index

    def search(self, query: str) -> List[SearchResult]:
        """
        Search the index for documents matching the given query.

        Args:
            query (str): The search query string.

        Returns:
            List[SearchResult]: A list of search results. Each result dictionary contains:
                - start (int): The starting position or offset within the source file.
                - content (str): A text excerpt or snippet containing the match.
                - video_id (str): Youtube video_id for the snippet.
                - _id (int): The unique id for the document
        """
        hits = self.index.search(query=query, num_results=5, output_ids=True)
        return hits

    def get_document_by_id(self, _id: int) -> Optional[SearchResult]:
        """
        Retrieve a document by its unique ID.

        Args:
            _id (int): The document id.

        Returns:
            SearchResult: The document corresponding to the given ID or None if it's not in the index.
        """
        all_docs = self.index.docs
        # Ids are treated as positions in the index's document list.
        if 0 <= _id < len(all_docs):
            return all_docs[_id]
        return None

# Shared tool provider; its bound methods are registered as tools on the agents.
tools = SearchTools(index)

## Callback

In [8]:
from pydantic_ai.messages import FunctionToolCallEvent

class NamedCallback:
    """Event handler that prints tool calls, tagged with the owning agent's name."""

    def __init__(self, agent):
        # Only the name is kept; the agent object itself is not stored.
        self.agent_name = agent.name

    async def __call__(self, ctx, event):
        # Makes the instance usable wherever a handler callable is expected.
        return await self.print_function_calls(ctx, event)

    async def print_function_calls(self, ctx, event):
        """Print a line for a tool-call event, descending into nested streams."""
        is_stream = hasattr(event, "__aiter__")

        if not is_stream:
            # Leaf event: only tool calls are reported, everything else is ignored.
            if isinstance(event, FunctionToolCallEvent):
                part = event.part
                print(f"TOOL CALL ({self.agent_name}): {part.tool_name}({part.args})")
            return

        # Async-iterable: handle each contained item recursively.
        async for nested in event:
            await self.print_function_calls(ctx, nested)

## Clarifier Agent

In [9]:
from pydantic import BaseModel, Field
from pydantic_ai import Agent

In [10]:
# System prompt for the clarifier agent. It asks for a single clarification
# question followed by a structured handoff (intent, refined request, queries,
# instructions) for the research agent.
# NOTE(review): the prompt expects a free-text question to the user; confirm
# that the agent's configured output type allows that step.
clarifier_instructions = """
You are the CLARIFIER agent.

ROLE
Your job is to interpret and refine the user's research request so that it can be passed
to the RESEARCH agent for structured exploration.

OBJECTIVES
1. Understand what the user truly wants to learn or achieve (their intent).
2. Identify the core topic and any implicit goals (e.g., learn, compare, evaluate, predict, build).
3. Ask the user one targeted clarification question — to confirm scope, focus, or purpose.
4. Once the user responds, synthesize a refined version of their request that includes:
   - The clarified intent (what the user ultimately wants)
   - The initial request (in their own words)
   - The refined research focus (a precise version suitable for the RESEARCH agent)
   - 3–7 search queries that capture the clarified scope and intent
   - A short instruction summary for the RESEARCH agent explaining what to explore

DATA SOURCES
- You may use your own general knowledge to infer user intent.
- You may use the `search()` tool to quickly check ambiguous terms or context.

INTENT HANDLING
- Before searching, infer the underlying intent behind the user's request.
  Examples:
    - “getting into ML” → learning pathways, beginner resources, first projects
    - “AI safety concerns” → risks, ethical challenges, mitigation strategies
    - “startup funding trends” → investment patterns, valuations, stages
- Generate searches that reflect this **intent**, not just literal words.

CONSTRAINTS
- Ask the user for clarification **once only**.
- Do not fabricate information; if uncertain, clarify directly with the user.
- The goal is to output a structured handoff ready for the RESEARCH agent's Stage 1 process.
"""

In [11]:
class ResearchInstructions(BaseModel):
    """
    Output of the CLARIFIER agent.
    Provides both the user's raw input and the refined, structured guidance
    for the RESEARCH agent to begin its first stage.
    """
    # Every field is required: no default value is supplied to Field().
    initial_request: str = Field(
        description="The user's original question or request, captured verbatim."
    )
    refined_request: str = Field(
        description="A clarified, rephrased, and contextually grounded version of the initial request."
    )
    user_intent: str = Field(
        description=(
            "A short summary (1–2 sentences) of what the user truly wants to accomplish "
            "or learn, inferred from both the initial request and clarification."
        )
    )
    queries: List[str] = Field(
        description=(
            "A list of 3–7 specific search queries derived from the refined request, "
            "covering complementary angles or subtopics the RESEARCH agent should explore."
        )
    )
    instructions: str = Field(
        description=(
            "Concise operational guidance for the RESEARCH agent, explaining how to use "
            "the queries and what to prioritize during Stage 1 research."
        )
    )

In [12]:
# Clarifier agent: may call the index search tool and, by default, produces a
# structured ResearchInstructions object.
clarifier = Agent(
    name='clarifier_v2',
    instructions=clarifier_instructions,
    tools=[tools.search],
    model='gpt-4o-mini',
    output_type=ResearchInstructions
)

## Research Agent

In [13]:
# System prompt for the research agent; describes the three research stages
# and how many searches each stage must perform.
# NOTE(review): the prompt asks for `youtube_id` and `timestamp`, while the
# SearchResult type declares `video_id` and `start` — confirm which names the
# model is expected to cite.
researcher_instructions = """
You are the RESEARCH agent.

ROLE
You perform structured research on a proprietary podcast/video database for a specific stage
of exploration (Stage 1, 2, or 3).

DATA SOURCE
- You may ONLY use the `search()` function
- Every reference must cite a real snippet with a valid `youtube_id`, `timestamp` and `_id`.
- Do not invent data, names, or timestamps.

STAGES

Stage 1 — Initial Search
- Use the user’s question or clarified keywords from context.
- Identify 3–5 primary keywords, run one or more searches.
- Summarize the main findings, highlighting initial insights and directions.

Stage 2 — Expansion
- Build upon Stage 1 outputs (from context).
- Generate 5–7 related or complementary queries.
- Summarize recurring ideas and patterns across new results.

Stage 3 — Deep Dive
- Build upon Stage 1 and Stage 2.
- Generate 5–7 deeper or contrasting queries.
- Explore nuances, counterpoints, or mechanisms.
- Provide a more analytical synthesis.

CONSTRAINTS
- Use context from previous stages to guide deeper exploration.
- You must perform the necessary amount of queries for each stage:
    - 3-5 for stage 1
    - 5-7 for stage 2
    - 5-7 for stage 3
"""

In [14]:
from pydantic import BaseModel, Field
from typing import List

class Reference(BaseModel):
    """
    A single, verifiable citation to a transcript snippet or video segment.
    Must correspond to a real snippet returned by the `search()` tool.
    """
    # All fields are required: Field() is given a description but no default.
    document_id: int = Field(
        description="Internal ID of the transcript snippet."
    )
    quote: str = Field(
        description="Exact snippet that supports the keyword or insight."
    )
    timestamp: str = Field(
        description="Timestamp in the source video where the quote occurs, 'mm:ss' or 'h:mm:ss'"
    )
    relevance_to_keyword: str = Field(
        description="Explanation of *how* this quote supports or illustrates the specific keyword or concept being explored."
    )
    relevance_to_user_intent: str = Field(
        description="Explanation of *how* this quote help the user with their intent."
    )

class ResearchKeyword(BaseModel):
    """
    Represents a keyword explicitly searched during this research stage.
    Each keyword must match an actual query used in the search tool calls.
    """
    # Both fields are required (no defaults).
    keyword: str = Field(
        description="The exact keyword or phrase used in the search() tool call."
    )
    relevant_references: List[Reference] = Field(
        description="List of transcript snippets directly relevant to this keyword. Each must include a 'relevance_to_keyword' explanation."
    )


class VerifiableInsight(BaseModel):
    """
    A synthesized insight that can be traced back to specific evidence.
    Each insight must be supported by at least one real reference.
    """
    # Both fields are required (no defaults).
    insight: str = Field(
        description="An insight derived from the research, phrased in an evidence-based, verifiable way."
    )
    references: List[Reference] = Field(
        description="Citations that directly support this insight. Must contain valid timestamps and IDs."
    )


class ResearchStageReport(BaseModel):
    """
    Structured output for each research stage (1–3).
    Ensures traceability between searches, keywords, and findings.
    """
    # Every field is required: Field() carries a description only, no default.
    stage: int = Field(
        description="The research stage number (1 = Initial Search, 2 = Expansion, 3 = Deep Dive)."
    )
    explored_keywords: List[ResearchKeyword] = Field(
        description="List of the *exact* keywords used in this stage's search() calls, along with references showing their relevance."
    )
    verifiable_insights: List[VerifiableInsight] = Field(
        description="List of data-backed insights derived from the references gathered at this stage."
    )
    stage_summary: str = Field(
        description="Analytical summary of what was learned at this stage, connecting evidence to emerging themes."
    )
    recommended_next_steps: str = Field(
        description="Guidance for what to do in the next stage — e.g., new angles, counterpoints, or subtopics."
    )
    recommended_next_keywords: List[str] = Field(
        description="Suggested next queries based on gaps or promising directions discovered in this stage."
    )

In [15]:
# Research agent: restricted to the index search tool and configured to
# return a structured ResearchStageReport.
researcher = Agent(
    name='researcher_v2',
    instructions=researcher_instructions,
    tools=[tools.search],
    model='gpt-4o-mini',
    output_type=ResearchStageReport
)

In [16]:
async def do_research(
    stage: int,
    stage_instructions: str,
    previous_stages: List[ResearchStageReport]
) -> ResearchStageReport:
    """Run the researcher agent for one stage and return its report.

    Args:
        stage: Number of the stage being executed.
        stage_instructions: Guidance text inserted into the prompt for this stage.
        previous_stages: Reports from earlier stages; each is serialised to
            JSON and included in the prompt, one per line.

    Returns:
        The ``output`` of the researcher run.
    """
    serialized_reports = [report.model_dump_json() for report in previous_stages]
    previous_stages_json = '\n'.join(serialized_reports)

    # Prompt text is kept exactly as-is (including indentation).
    user_prompt = f"""
    Current stage: {stage}

    Stage instrustructions:
    {stage_instructions}

    Previous stages:
    {previous_stages_json}
    """

    # Tool calls made during the run are printed under the researcher's name.
    run_result = await researcher.run(
        user_prompt=user_prompt,
        event_stream_handler=NamedCallback(researcher),
    )
    return run_result.output

## Synthesizer and Verifier Agent

In [17]:
# System prompt for the synthesizer agent: turn the stage reports into an
# article, verifying each cited reference via `get_document_by_id`.
# Leading/trailing whitespace is stripped from the prompt.
synthesizer_instructions = """
You are the SYNTHESIZER agent.

ROLE
You create a cohesive, factual article by synthesizing verified information from all
three research stages (StageReports 1–3).

DATA SOURCES
- You will receive one or more `ResearchStageReport` objects, each containing
  verifiable references with document_ids, timestamps, and quotes.
- You have access to the tool `get_document_by_id` to retrieve full source text
  for any reference.
- You must use this tool to verify every claim that appears in your article.

TASKS
1. Carefully read all StageReports and extract recurring insights and verified facts.
2. Use `get_document_by_id` to check each cited reference and confirm that
   the quote or insight is correctly represented.
3. Only include claims that are explicitly supported by at least one verified source.
4. Synthesize related findings into 5–6 cohesive sections with a logical flow.
5. Ensure that the article aligns with the original user intent (as passed from the clarifier).

ARTICLE STRUCTURE
- Introduction: Summarize what the article will explore and why it matters.
- 5-6 body sections, each:
  - Centered on one major theme or subtopic.
  - Contains 3–4 related claims (each 3–4 sentences long).
  - Each claim includes an in-text reference
- Conclusion: Summarize the most important insights and actionable takeaways.

VERIFICATION RULES
- For every claim, retrieve at least one cited source using `get_document_by_id`
  and confirm that the text supports the claim.
- If a reference cannot be verified or is inconsistent, omit it.
- Do not invent or infer facts beyond what’s supported by verified material.

STYLE
- Maintain factual, neutral, and coherent tone.
- Avoid speculation, exaggeration, or unsupported synthesis.
- Write in clear prose suitable for an informed but general audience.

OUTPUT
- A single, well-structured factual article ready for presentation.
- All references cited
""".strip()

# Synthesizer agent: its only tool is the by-id document lookup; no structured
# output type is configured.
# NOTE(review): no code visible in this notebook calls this agent — confirm
# whether it should be wired into the orchestrator.
synthesizer = Agent(
    name='synthesizer_v2',
    instructions=synthesizer_instructions,
    tools=[tools.get_document_by_id],
    model='gpt-4o-mini',
)

## Orchestrator

In [18]:
# System prompt for the orchestrator. The names in parentheses must match the
# functions registered with @orchestrator.tool exactly, because the model
# selects tools by name; the third one previously read "reserch_tool", which
# does not match the registered `research_tool`.
orchestrator_instructions = """
first, ask user an initial question via clarifier (clarify_tool_initial)
then formulate requiremets for the researcher (clarify_tool_research_task)
then execute research via researcher in three stages: 1, 2, 3 (research_tool)
each research step should be done after the previous one is completed

make it timeless: don't add years to queries. for example:
"learning machine learning" is better than "learning machine learning in 2023"

when the resarch it ready, output a short summary of the research
"""

# Top-level agent; its tools are attached afterwards via @orchestrator.tool.
orchestrator = Agent(
    name='orchestrator',
    instructions=orchestrator_instructions,
    model='gpt-4o-mini',
)

In [19]:
from pydantic_ai import RunContext

@orchestrator.tool
async def clarify_tool_initial(ctx: RunContext, query: str) -> str:
    """Runs the clarifier once to interpret the user's request.

    Args:
        query: Raw user question.

    Returns:
        A short text summary describing the user's intent.
    """
    print("\n=== Clarifier (Initial) ===")
    callback = NamedCallback(clarifier)
    # The clarifier is constructed with a structured output type. Override it
    # for this run so the result is plain text, as the `-> str` annotation and
    # the docstring promise; this also lets the clarifier phrase its single
    # clarification question instead of being forced into the final schema.
    results = await clarifier.run(
        user_prompt=query,
        event_stream_handler=callback,
        output_type=str,
    )
    return results.output


In [20]:
@orchestrator.tool
async def clarify_tool_research_task(ctx: RunContext, query: str) -> ResearchInstructions:
    """Runs the clarifier again using both the user query and prior clarifier output
    to create a structured ResearchInstructions object.

    Args:
        query: User's original question.

    Returns:
        ResearchInstructions with refined request, intent, and search queries.
    """
    print("\n=== Clarifier (Research Task) ===")

    # Collect what clarify_tool_initial returned earlier in this run; the
    # tool name is only inspected on tool-return parts.
    earlier_clarifications = [
        str(part.content)
        for message in ctx.messages
        for part in message.parts
        if part.part_kind == "tool-return" and part.tool_name == "clarify_tool_initial"
    ]
    prior_text = "\n".join(earlier_clarifications)
    prompt = f"User query:\n{query}\n\nPrior clarification:\n{prior_text}".strip()

    results = await clarifier.run(
        user_prompt=prompt,
        event_stream_handler=NamedCallback(clarifier),
        output_type=ResearchInstructions
    )
    return results.output

In [21]:
@orchestrator.tool
async def research_tool(ctx: RunContext, stage: int, stage_instructions: str) -> ResearchStageReport:
    """Runs one stage of research using prior reports as context.

    Args:
        stage: Research stage number (1–3).
        stage_instructions: Description of what this stage should focus on.

    Returns:
        ResearchStageReport with insights, references, and next steps.
    """
    print(f"\n=== RESEARCH stage {stage} ===")

    # Reports returned by earlier calls of this tool in the same run; anything
    # that is not a ResearchStageReport instance is skipped.
    prior_reports: List[ResearchStageReport] = [
        part.content
        for message in ctx.messages
        for part in message.parts
        if part.part_kind == "tool-return"
        and part.tool_name == "research_tool"
        and isinstance(part.content, ResearchStageReport)
    ]

    return await do_research(
        stage=stage,
        stage_instructions=stage_instructions,
        previous_stages=prior_reports,
    )


In [22]:
# Handler that prints the orchestrator's own tool calls under its name.
orchestrator_callback = NamedCallback(orchestrator)

In [23]:
# Sample user request passed to the orchestrator as the user prompt.
question = "How do I get started with data engineering?"

In [26]:
# Single end-to-end orchestrator run (top-level await, as used in notebooks).
# If the run raises, `orchestrator_results` is never assigned.
orchestrator_results = await orchestrator.run(
    user_prompt=question,
    event_stream_handler=orchestrator_callback
)

TOOL CALL (orchestrator): clarify_tool_initial({"query":"How do I get started with data engineering?"})

=== Clarifier (Initial) ===
TOOL CALL (clarifier_v2): search({"query":"getting started with data engineering"})
TOOL CALL (clarifier_v2): search({"query":"data engineering skills and resources for beginners"})
TOOL CALL (clarifier_v2): search({"query":"data engineering resources for beginners learning path"})
TOOL CALL (clarifier_v2): search({"query":"data engineering online courses platforms"})


ModelHTTPError: status_code: 429, model_name: gpt-4o-mini, body: {'message': 'Rate limit reached for gpt-4o-mini in organization org-5NKPfgoJPcxCp9o32nbQIdJw on tokens per min (TPM): Limit 200000, Used 184786, Requested 15883. Please try again in 200ms. Visit https://platform.openai.com/account/rate-limits to learn more.', 'type': 'tokens', 'param': None, 'code': 'rate_limit_exceeded'}

In [27]:
def print_messages(messages):
    """Print a readable dump of a sequence of agent messages.

    For every message its ``kind`` is printed, then for each part the part
    kind followed by a kind-specific payload:

    - ``user-prompt`` / ``text``: the part's content
    - ``tool-call``: tool name and arguments
    - ``tool-return``: the content's type and the content itself

    Parts of any other kind only have their kind printed. A blank line
    follows every part and every message.

    Args:
        messages: Iterable of message objects exposing ``kind`` and ``parts``;
            each part exposes ``part_kind`` plus the attributes listed above.

    Returns:
        None. All output goes to stdout.
    """
    for message in messages:
        print(message.kind)

        for part in message.parts:
            kind = part.part_kind
            print(kind)
            # The kinds are mutually exclusive, so an elif chain is sufficient.
            if kind in ('user-prompt', 'text'):
                print(part.content)
            elif kind == 'tool-call':
                print(part.tool_name, part.args)
            elif kind == 'tool-return':
                print(type(part.content), part.content)
            print()

        print()

In [3]:
# print_messages iterates over message objects, so pass the run's message
# list rather than the run result object itself.
print_messages(orchestrator_results.all_messages())

NameError: name 'orchestrator_results' is not defined

In [None]:
# Handler that prints tool calls made by the orchestrator during the chat.
callback = NamedCallback(orchestrator)

# Conversation accumulated across turns; passed back to the agent on every run.
message_history = []

while True:
    user_input = input('You ', )
    # Typing "stop" (any casing, surrounding whitespace ignored) ends the session.
    if user_input.lower().strip() == 'stop':
        break

    results = await orchestrator.run(
        user_prompt=user_input,
        message_history=message_history,
        event_stream_handler=callback,
    )

    # Record what new_messages() returns (presumably only this turn's
    # messages — confirm against the library docs), then echo it.
    new_messages = results.new_messages()
    message_history.extend(new_messages)
    print_messages(new_messages)

In [None]:
# Pull every ResearchStageReport returned by research_tool out of the chat
# history; tool returns with any other content type are skipped.
prior_reports: List[ResearchStageReport] = [
    part.content
    for message in message_history
    for part in message.parts
    if part.part_kind == "tool-return"
    and part.tool_name == "research_tool"
    and isinstance(part.content, ResearchStageReport)
]

In [None]:
# Number of stage reports recovered from the chat history.
len(prior_reports)
