In [1]:
from openai import OpenAI
from pydantic import BaseModel
import json
import numpy as np
from tqdm.auto import tqdm
from dotenv import load_dotenv
load_dotenv()

openai_client = OpenAI()

In [2]:
from pathlib import Path

from tqdm.auto import tqdm
from minsearch import Index

import docs

## Data load and prep

In [3]:
# Build the corpus: each transcript is cut into overlapping windows and every
# window becomes one searchable document tagged with its source video.
documents = []

data_folder = Path('../data_cache/youtube_videos/')

# Sorted so the position of each chunk in `documents` does not depend on the
# filesystem's enumeration order (the agents later cite documents by position,
# presumably via `index.docs`). A list also lets tqdm show a total.
transcript_files = sorted(data_folder.glob('*.txt'))

for f in tqdm(transcript_files):
    # `stem` strips only the final suffix, so a file name containing extra dots
    # no longer breaks a two-way unpack, and the notebook's `_` is not rebound.
    video_id = f.stem

    transcript = f.read_text(encoding='utf-8')

    chunks = docs.sliding_window(transcript, size=3000, step=1500)

    for chunk in chunks:
        chunk['video_id'] = video_id
        documents.append(chunk)
        

0it [00:00, ?it/s]

In [4]:
# Text search runs over the chunk body; `video_id` is registered as a keyword field.
index = Index(text_fields=['content'], keyword_fields=['video_id'])

# Kept as the cell's last expression, so the notebook echoes whatever fit() returns.
index.fit(documents)

<minsearch.minsearch.Index at 0x250ceab1d00>

## Search Function Tool

In [5]:
from typing import Any, Dict, List, TypedDict, Optional

class SearchResult(TypedDict):
    """Represents a single search result entry."""
    start: int      # starting position/offset of the snippet within its source
    content: str    # the snippet text
    video_id: str   # YouTube video the snippet belongs to
    _id: int        # id of the document inside the index


class SearchTools:
    """Tool functions exposed to the agents, all backed by one search index.

    The docstrings of `search` and `get_document_by_id` are kept word-for-word:
    agent frameworks typically use a tool's docstring as its description, so
    they behave like prompt text rather than ordinary documentation.
    """

    def __init__(self, index, num_results: int = 5):
        """
        Args:
            index: Object providing `search(query=..., num_results=..., output_ids=...)`
                and a `docs` sequence of dict-like records.
            num_results (int): Number of hits `search` requests from the index.
                Defaults to 5, the value that used to be hard-coded.
        """
        self.index = index
        self.num_results = num_results

    def search(self, query: str) -> List[SearchResult]:
        """
        Search the index for documents matching the given query.

        Args:
            query (str): The search query string.

        Returns:
            List[SearchResult]: A list of search results. Each result dictionary contains:
                - start (int): The starting position or offset within the source file.
                - content (str): A text excerpt or snippet containing the match.
                - video_id (str): Youtube video_id for the snippet.
                - _id (int): The unique id for the document
        """
        return self.index.search(
            query=query,
            num_results=self.num_results,
            output_ids=True,
        )

    def get_document_by_id(self, _id: int) -> Optional[SearchResult]:
        """
        Retrieve a document by its unique ID.

        Args:
            _id (int): The document id.

        Returns:
            SearchResult: The document corresponding to the given ID or None if it's not in the index.
        """
        if _id < 0 or _id >= len(self.index.docs):
            return None

        # Return a copy carrying `_id`, so the record has the same shape as the
        # entries produced by `search` and the stored document is never mutated.
        return {**self.index.docs[_id], '_id': _id}

tools = SearchTools(index)

In [None]:
from pydantic_ai.messages import FunctionToolCallEvent, FunctionToolResultEvent

class NamedCallback:
    """Event-stream handler that prints tool calls and tool results as they occur.

    Instances are passed as `event_stream_handler` to an agent run. The agent's
    name is stored at construction time but does not appear in the printed lines.
    """

    def __init__(self, agent):
        # Only the name is kept; the agent object itself is not retained.
        self.agent_name = agent.name

    async def print_function_calls(self, ctx, event):
        """Print one line for each tool call / tool result found in `event`.

        `event` is either a single event or something async-iterable; iterables
        are walked recursively so nested streams are flattened. Events of any
        other type are ignored. `ctx` is only forwarded to the recursive calls.
        """
        if not hasattr(event, "__aiter__"):
            # Leaf event: report it if it is a tool call or a tool result.
            if isinstance(event, FunctionToolCallEvent):
                call = event.part
                print("CALL →", call.tool_name, call.args_as_dict(), event.tool_call_id)
            elif isinstance(event, FunctionToolResultEvent):
                outcome = event.result
                print("RES  ←", outcome.tool_name, event.tool_call_id, outcome.content)
            return

        # Nested stream: handle every contained event in order.
        async for nested in event:
            await self.print_function_calls(ctx, nested)

    async def __call__(self, ctx, event):
        """Entry point invoked by the agent run; delegates to `print_function_calls`."""
        printed = await self.print_function_calls(ctx, event)
        return printed

## Clarifier Agent

In [13]:
from pydantic_ai import Agent
from pydantic import BaseModel, Field

from typing import List

In [14]:
clarifier_instructions = """
You are the CLARIFIER agent.

ROLE
Your job is to interpret and refine the user's research request so that it can be passed
to the RESEARCH agent for structured exploration.

OBJECTIVES
1. Understand what the user truly wants to learn or achieve (their intent).
2. Identify the core topic and any implicit goals (e.g., learn, compare, evaluate, predict, build).
3. Ask the user one targeted clarification question — to confirm scope, focus, or purpose.
4. Once the user responds, synthesize a refined version of their request that includes:
   - The clarified intent (what the user ultimately wants)
   - The initial request (in their own words)
   - The refined research focus (a precise version suitable for the RESEARCH agent)
   - 3–7 search queries that capture the clarified scope and intent
   - A short instruction summary for the RESEARCH agent explaining what to explore

DATA SOURCES
- You may use your own general knowledge to infer user intent.
- You may use the `search()` tool to quickly check ambiguous terms or context.

CONSTRAINTS
- Ask the user for clarification **once only**.
- Do not fabricate information; if uncertain, clarify directly with the user.
- The goal is to output a structured handoff ready for the RESEARCH agent's Stage 1 process.
""".strip()

In [15]:
class ResearchInstructions(BaseModel):
    """
    Output of the CLARIFIER agent.
    Provides both the user's raw input and the refined, structured guidance
    for the RESEARCH agent to begin its first stage.
    """
    # Passed as `output_type` on the clarifier's second run; the resulting object is
    # serialized with `model_dump_json()` into the researcher and synthesizer prompts.
    # NOTE(review): the docstring and Field descriptions presumably reach the model as
    # part of the output schema, so edits to their wording act like prompt changes — confirm.

    # The user's opening question, unchanged.
    initial_request: str = Field(
        ...,
        description="The user's original question or request, captured verbatim."
    )
    # The request after the clarification round.
    refined_request: str = Field(
        ...,
        description="A clarified, rephrased, and contextually grounded version of the initial request."
    )
    # What the user is ultimately after.
    user_intent: str = Field(
        ...,
        description=(
            "A short summary (1–2 sentences) of what the user truly wants to accomplish "
            "or learn, inferred from both the initial request and clarification."
        )
    )
    # Search queries proposed for the first research stage (the 3–7 range is only
    # stated in the description text; nothing in this class enforces the count).
    queries: List[str] = Field(
        ...,
        description=(
            "A list of 3–7 specific search queries derived from the refined request, "
            "covering complementary angles or subtopics the RESEARCH agent should explore."
        )
    )
    # Free-text guidance handed to the researcher together with the queries.
    instructions: str = Field(
        ...,
        description=(
            "Concise operational guidance for the RESEARCH agent, explaining how to use "
            "the queries and what to prioritize during Stage 1 research."
        )
    )

In [16]:
# Built without an output_type: the first run answers in free text, and the
# follow-up run passes `output_type=ResearchInstructions` explicitly.
clarifier = Agent(
    model='gpt-4o-mini',
    name='clarifier',
    tools=[tools.search],
    instructions=clarifier_instructions,
)

# Prints the clarifier's tool calls and results while a run is streaming.
clarifier_callback = NamedCallback(clarifier)

In [18]:
question = "how do I get started with data engineering"

In [19]:
clarifier_results1 = await clarifier.run(
    user_prompt=question,
    event_stream_handler=clarifier_callback
)

In [20]:
print(clarifier_results1.output)

To better assist you, could you clarify whether you are looking for information on specific skills and tools within data engineering, a learning pathway or resources, or perhaps insights on how to transition into a data engineering role?


In [21]:
answer = "I want a learning path"

In [22]:
clarifier_results2 = await clarifier.run(
    user_prompt=answer,
    message_history=clarifier_results1.new_messages(),
    output_type=ResearchInstructions,
    event_stream_handler=clarifier_callback,
)

In [25]:
print(clarifier_results2.output)

initial_request='how do I get started with data engineering' refined_request='I want a structured learning path to get started with data engineering.' user_intent='To obtain a clear and structured learning path for entering the field of data engineering.' queries=['data engineering learning path for beginners', 'essential skills for data engineers', 'best online courses for data engineering', 'data engineering tools and technologies', 'step-by-step guide to becoming a data engineer', 'data engineering projects for practice'] instructions='Explore various resources and structured learning paths for data engineering, focusing on beginner-friendly courses, essential skills, and practical projects.'


In [26]:
research_task = clarifier_results2.output

In [28]:
clarifier_results2.output.queries

['data engineering learning path for beginners',
 'essential skills for data engineers',
 'best online courses for data engineering',
 'data engineering tools and technologies',
 'step-by-step guide to becoming a data engineer',
 'data engineering projects for practice']

In [29]:
clarifier_results2.output.instructions

'Explore various resources and structured learning paths for data engineering, focusing on beginner-friendly courses, essential skills, and practical projects.'

## Researcher Agent

In [27]:
researcher_instructions = """
You are the RESEARCH agent.

ROLE
You perform structured research on a proprietary podcast/video database for a specific stage
of exploration (Stage 1, 2, or 3).

DATA SOURCE
- You may ONLY use the `search()` function
- Every reference must cite a real snippet with a valid `youtube_id`, `timestamp` and `_id`.
- Do not invent data, names, or timestamps.

INTENT HANDLING
- Before searching, infer the underlying intent behind the user's request.
  Examples:
    - “getting into ML” → learning pathways, beginner resources, first projects
    - “AI safety concerns” → risks, ethical challenges, mitigation strategies
    - “startup funding trends” → investment patterns, valuations, stages
- Generate searches that reflect this **intent**, not just literal words.

STAGES

Stage 1 — Initial Search
- Use the user’s question or clarified keywords from context.
- Identify 3–5 primary keywords, run one or more searches.
- Summarize the main findings, highlighting initial insights and directions.

Stage 2 — Expansion
- Build upon Stage 1 outputs (from context).
- Generate 5–7 related or complementary queries.
- Summarize recurring ideas and patterns across new results.

Stage 3 — Deep Dive
- Build upon Stage 1 and Stage 2.
- Generate 5–7 deeper or contrasting queries.
- Explore nuances, counterpoints, or mechanisms.
- Provide a more analytical synthesis.

CONSTRAINTS
- Use context from previous stages to guide deeper exploration.
- You must perform the necessary amount of queries for each stage:
    - 3-5 for stage 1
    - 5-7 for stage 2
    - 5-7 for stage 3
"""

In [30]:
class Reference(BaseModel):
    """
    A single, verifiable citation to a transcript snippet or video segment.
    Must correspond to a real snippet returned by the `search()` tool.
    """
    # NOTE(review): a second model named `Reference` is defined in the synthesizer
    # section and rebinds this name. Models declared in this cell keep pointing at
    # this class, but re-running this cell afterwards changes what `Reference` means.
    # NOTE(review): researcher_instructions asks for a `youtube_id` on every reference,
    # yet this model only carries `document_id` — confirm which one is intended.

    # Matches the `_id` values returned by the search tool.
    document_id: int = Field(..., description="Internal ID of the transcript snippet.")
    quote: str = Field(..., description="Exact snippet that supports the keyword or insight.")
    # Kept as text; the format is only described here, not validated.
    timestamp: str = Field(..., description="Timestamp in the source video where the quote occurs, 'mm:ss' or 'h:mm:ss'")
    relevance_to_keyword: str = Field(..., description="Explanation of *how* this quote supports or illustrates the specific keyword or concept being explored.")
    relevance_to_user_intent:  str = Field(..., description="Explanation of *how* this quote help the user with their intent.")

class ResearchKeyword(BaseModel):
    """
    Represents a keyword explicitly searched during this research stage.
    Each keyword must match an actual query used in the search tool calls.
    """
    # One entry per search query; collected in `ResearchStageReport.explored_keywords`.
    # The "must match an actual query" rule is stated in text only — nothing here checks it.
    keyword: str = Field(..., description="The exact keyword or phrase used in the search() tool call.")
    # Citations found for this query.
    relevant_references: List[Reference] = Field(
        ..., 
        description="List of transcript snippets directly relevant to this keyword. Each must include a 'relevance_to_keyword' explanation."
    )


class VerifiableInsight(BaseModel):
    """
    A synthesized insight that can be traced back to specific evidence.
    Each insight must be supported by at least one real reference.
    """
    # Collected in `ResearchStageReport.verifiable_insights`.
    # The "at least one reference" rule is descriptive only; an empty list would still validate.
    insight: str = Field(..., description="An insight derived from the research, phrased in an evidence-based, verifiable way.")
    references: List[Reference] = Field(..., description="Citations that directly support this insight. Must contain valid timestamps and IDs.")


class ResearchStageReport(BaseModel):
    """
    Structured output for each research stage (1–3).
    Ensures traceability between searches, keywords, and findings.
    """
    # Set as the researcher agent's `output_type` and returned by `do_research`.
    # Reports from earlier stages are serialized with `model_dump_json()` into the
    # prompts of later stages and of the synthesizer.

    # Stage number as reported by the model (not validated against 1–3).
    stage: int = Field(..., description="The research stage number (1 = Initial Search, 2 = Expansion, 3 = Deep Dive).")
    # Queries run in this stage together with their supporting citations.
    explored_keywords: List[ResearchKeyword] = Field(
        ..., 
        description="List of the *exact* keywords used in this stage's search() calls, along with references showing their relevance."
    )
    # Conclusions drawn in this stage, each with its own citations.
    verifiable_insights: List[VerifiableInsight] = Field(
        ..., 
        description="List of data-backed insights derived from the references gathered at this stage."
    )
    stage_summary: str = Field(..., description="Analytical summary of what was learned at this stage, connecting evidence to emerging themes.")
    # Hints for the following stage.
    recommended_next_steps: str = Field(..., description="Guidance for what to do in the next stage — e.g., new angles, counterpoints, or subtopics.")
    recommended_next_keywords: List[str] = Field(
        ..., 
        description="Suggested next queries based on gaps or promising directions discovered in this stage."
    )

In [31]:
# The researcher always answers with a structured ResearchStageReport and can
# only reach the transcript corpus through the `search` tool.
researcher = Agent(
    model='gpt-4o-mini',
    name='researcher',
    output_type=ResearchStageReport,
    tools=[tools.search],
    instructions=researcher_instructions,
)

# Prints the researcher's tool calls and results while a run is streaming.
researcher_callback = NamedCallback(researcher)

In [33]:
stage = 1
stage_instructions = research_task.model_dump_json()
previous_stages_json = []

In [34]:
async def do_research(
    stage: int,
    stage_instructions: str,
    previous_stages: List[ResearchStageReport]
) -> ResearchStageReport:
    """Run the researcher agent for one stage and return its structured report.

    Args:
        stage: Stage number written into the prompt.
        stage_instructions: Text placed under the stage-instructions heading.
        previous_stages: Reports from earlier stages; each one is serialized to
            JSON and the results are joined with newlines.

    Returns:
        ResearchStageReport: The `output` of the agent run.
    """
    history_json = '\n'.join(report.model_dump_json() for report in previous_stages)

    # The prompt is reproduced character-for-character (heading spelling and
    # leading indentation included), because this is the text the model receives.
    prompt = f"""
    Current stage: {stage}

    Stage instrustructions:
    {stage_instructions}

    Previous stages:
    {history_json}
    """

    # A fresh callback per call prints the tool traffic of this run.
    run = await researcher.run(
        user_prompt=prompt,
        event_stream_handler=NamedCallback(researcher),
    )
    return run.output

In [35]:
stage1 = await do_research(
    stage=1,
    stage_instructions=research_task.model_dump_json(),
    previous_stages=[]
)

CALL → search {'query': 'data engineering learning path for beginners'} call_PzX6FWvolNVEHc7ZrC5v7H0j
CALL → search {'query': 'essential skills for data engineers'} call_UfQjXQTMEFUGkY5hNrCtnsI3
CALL → search {'query': 'best online courses for data engineering'} call_JLqVF7rIrXgzmSNbuHnL9Xr4
CALL → search {'query': 'data engineering tools and technologies'} call_xvI8X4YbzzcxxgexB68km1iD
CALL → search {'query': 'step-by-step guide to becoming a data engineer'} call_4e5AisxqtTlyluXPNH2arcT9
RES  ← search call_UfQjXQTMEFUGkY5hNrCtnsI3 [{'start': 9000, 'content': "a engineers like uh buy\n9:28 the book and the way it's supposed to be\n9:30 a difference in terms of um how you\n9:32 process the data like big data\n9:34 engineering requires a bit different\n9:36 tools like heavy load\n9:37 optimizations data engineering small\n9:40 like\n9:40 software engineering on the back end but\n9:43 in reality i would say that\n9:44 a lot of companies name big data\n9:46 engineers as data engineers and 

In [45]:
stage1

ResearchStageReport(stage=1, explored_keywords=[ResearchKeyword(keyword='data engineering learning path for beginners', relevant_references=[Reference(document_id=2965, quote='so definitely i felt that before I had that sort of guidance I was just kind of fumbling around in the dark reading a lot of different articles...', timestamp='19:40', relevance_to_keyword='This quote highlights that having guidance is critical for following an effective learning path in data engineering, especially for beginners.', relevance_to_user_intent='It illustrates the importance of finding clear directions for learning data engineering.')]), ResearchKeyword(keyword='essential skills for data engineers', relevant_references=[Reference(document_id=7527, quote='I would say the most important one is coding skills...', timestamp='11:33', relevance_to_keyword='This snippet points out that coding skills are fundamental for data engineers, emphasizing the skills necessary for the role.', relevance_to_user_intent

In [44]:
print(stage1.model_dump_json(indent=2))

{
  "stage": 1,
  "explored_keywords": [
    {
      "keyword": "data engineering learning path for beginners",
      "relevant_references": [
        {
          "document_id": 2965,
          "quote": "so definitely i felt that before I had that sort of guidance I was just kind of fumbling around in the dark reading a lot of different articles...",
          "timestamp": "19:40",
          "relevance_to_keyword": "This quote highlights that having guidance is critical for following an effective learning path in data engineering, especially for beginners.",
          "relevance_to_user_intent": "It illustrates the importance of finding clear directions for learning data engineering."
        }
      ]
    },
    {
      "keyword": "essential skills for data engineers",
      "relevant_references": [
        {
          "document_id": 7527,
          "quote": "I would say the most important one is coding skills...",
          "timestamp": "11:33",
          "relevance_to_keyword": "Thi

In [36]:
stage2 = await do_research(
    stage=2,
    stage_instructions="continue research",
    previous_stages=[stage1]
)

CALL → search {'query': 'data engineering projects for beginners'} call_U3tErpkDZvjWV6rtiO3RFtho
CALL → search {'query': 'advanced data engineering skills'} call_jK1GMwxZ1fz9z3GfI0AbOnC2
CALL → search {'query': 'career transitions to data engineering'} call_KPyHfQs0oO7Bv344WhMJzqqb
CALL → search {'query': 'mentorship in data engineering'} call_44iCrRmO8DY3AuQoVMWECe5K
CALL → search {'query': 'data engineering certifications'} call_iAH9afrQLxz8BQPDMTl4YjTf
CALL → search {'query': 'real-world data engineering examples'} call_5B1uTYNGHzMUPwLZQdn6H9fO
CALL → search {'query': 'data engineering interview preparation'} call_I1zHYs2jFV46b525GqTYZBYw
RES  ← search call_U3tErpkDZvjWV6rtiO3RFtho [{'start': 1500, 'content': "during the interview and that's\n1:33 all i have\n1:37 for the introduction are you ready to\n1:38 start yeah\n1:40 okay so let me just\n1:47 pull my notes\n1:52 okay we have um today we will talk about\n1:55 uh the difference between uh\n1:57 big data engineers and data scien

In [47]:
print(stage2.model_dump_json(indent=1))

{
 "stage": 2,
 "explored_keywords": [
  {
   "keyword": "data engineering projects for beginners",
   "relevant_references": [
    {
     "document_id": 6845,
     "quote": "...one useful data set could be car parking data because we collect uh car parking figures in real time...",
     "timestamp": "47:07",
     "relevance_to_keyword": "This quote provides a concrete example of a beginner project involving real-time data.",
     "relevance_to_user_intent": "It suggests a practical project idea that can help beginners apply data engineering skills."
    },
    {
     "document_id": 2284,
     "quote": "...first if you start with the source systems where data exists right it could be relational database flat files cloud s3...",
     "timestamp": "58:12",
     "relevance_to_keyword": "This snippet discusses the foundational steps in data engineering that beginners should understand.",
     "relevance_to_user_intent": "It conveys basics of data engineering project workflow, useful for be

In [48]:
stage3 = await do_research(
    stage=3,
    stage_instructions="finish research",
    previous_stages=[stage1, stage2]
)

CALL → search {'query': 'advanced data engineering skills'} call_eTeG4HivalZD0buG00WbnrGs
CALL → search {'query': 'data engineering certifications'} call_yBPGq2JqrMa7OJOLStxrNvrT
CALL → search {'query': 'mentorship opportunities in data engineering'} call_JRcEzrYwbG0n1yXJxmnpG4Bm
CALL → search {'query': 'real-world applications of data engineering'} call_sr2t5sXOGm3J4tVZZT9wYk8a
CALL → search {'query': 'transitioning from academic to professional in data engineering'} call_rlwea5ceH7pdTrohhqqTqEiD
CALL → search {'query': 'data engineering interview preparation'} call_X43dMzO9YaXAUsvqk1yplPSv
CALL → search {'query': 'best practices for data engineering projects'} call_uifSsg3Dqdk1p34CpozmH2qV
RES  ← search call_yBPGq2JqrMa7OJOLStxrNvrT [{'start': 16500, 'content': "not forget anything it's\n18:53 uh\n18:54 uh I tried a lot of\n18:57 scripting languages I ended up using\n19:00 python because it was the most used in\n19:03 the industry just I didn't think more\n19:06 about it uh\n19:09 it

In [49]:
print(stage3.model_dump_json(indent=2))

{
  "stage": 3,
  "explored_keywords": [
    {
      "keyword": "advanced data engineering skills",
      "relevant_references": [
        {
          "document_id": 622,
          "quote": "...there's often a lot of pipelining work that you do on yourself.",
          "timestamp": "10:33",
          "relevance_to_keyword": "This snippet highlights the transferable skills and responsibilities associated with data engineering, showcasing the overlap between data science and engineering roles.",
          "relevance_to_user_intent": "It illustrates the complexity and skills needed in data engineering, informing users about career pathways."
        },
        {
          "document_id": 7552,
          "quote": "I would say that coding skills...is also important...",
          "timestamp": "47:00",
          "relevance_to_keyword": "This indicates that coding is crucial in both data science and engineering, emphasizing the foundational skills necessary for data careers.",
          "relev

## Synthesizer/Verifier

In [58]:
synthesizer_instructions = """
You are the SYNTHESIZER agent.

ROLE
You create a cohesive, factual article by synthesizing verified information from all
three research stages (StageReports 1–3).

DATA SOURCES
- You will receive one or more `ResearchStageReport` objects, each containing
  verifiable references with document_ids, timestamps, and quotes.
- You have access to the tool `get_document_by_id` to retrieve full source text
  for any reference.
- You must use this tool to verify every claim that appears in your article.

TASKS
1. Carefully read all StageReports and extract recurring insights and verified facts.
2. Use `get_document_by_id` to check each cited reference and confirm that
   the quote or insight is correctly represented.
3. Only include claims that are explicitly supported by at least one verified source.
4. Synthesize related findings into 5–6 cohesive sections with a logical flow.
5. Ensure that the article aligns with the original user intent (as passed from the clarifier).

ARTICLE STRUCTURE
- Introduction: Summarize what the article will explore and why it matters.
- 5-6 body sections, each:
  - Centered on one major theme or subtopic.
  - Contains 3–4 related claims (each 3–4 sentences long).
  - Each claim includes an in-text reference
- Conclusion: Summarize the most important insights and actionable takeaways.

VERIFICATION RULES
- For every claim, retrieve at least one cited source using `get_document_by_id`
  and confirm that the text supports the claim.
- If a reference cannot be verified or is inconsistent, omit it.
- Do not invent or infer facts beyond what’s supported by verified material.

STYLE
- Maintain factual, neutral, and coherent tone.
- Avoid speculation, exaggeration, or unsupported synthesis.
- Write in clear prose suitable for an informed but general audience.

OUTPUT
- A single, well-structured factual article ready for presentation.
- All references cited
"""

In [59]:
class Reference(BaseModel):
    """Citations that directly tie each claim to a verifiable source."""
    # NOTE(review): this rebinds the name `Reference`, which the research-stage cell
    # already uses for a different model (with `document_id` instead of `youtube_id`).
    # Classes built earlier keep their original class; anything evaluated after this
    # cell sees this one. Consider a distinct name if the cells may be re-run out of order.

    # `diplay_reference` uses this as the link text.
    quote: str = Field(..., description="A short, verbatim quote (2–4 sentences) from the database snippet.")
    # `to_link` builds the watch URL from these two fields.
    youtube_id: str = Field(..., description="Video ID")
    timestamp: str = Field(..., description="Timestamp to the exact position in the video where the quote is, 'h:mm:ss' or 'mm:ss' format.")

class Keyword(BaseModel):
    """Research results for a specific keyword"""
    # One entry per search query inside a `StageReport`.
    # NOTE(review): "explainig" and "complimentary" below are typos inside runtime
    # description strings; they are left untouched here because the text is
    # presumably part of the schema shown to the model.
    search_keyword: str = Field(..., description="Exact keyword used for search.")
    summary: str = Field(..., description="Short summary of the search result.")
    references: List[Reference] = Field(..., description="Specific references to help us track the findings of the research.")
    relevance_summary: str = Field(..., description="1 sentence for each reference explainig how it supports the keyword's summary — ensure factual consistency.")
    other_ideas: str = Field(..., description="Free-form description of related or complimentary ideas to explore in next stages.")

class StageReport(BaseModel):
    """Summarizes what was found during a single exploration stage."""
    # Synthesizer-side summary of one stage; collected in `ResearchReport.stages`.
    # This is a separate model from `ResearchStageReport`, which the researcher produces.
    stage: int = Field(..., description="Stage number (1 for initial search, 2 for expansion, 3 for deep dive).")
    keywords: List[Keyword] = Field(..., description="Search keywords ")
    summary: str = Field(..., description="A concise synthesis of insights found in this stage, summarizing themes and discoveries from all queries executed in the stage.")

class Claim(BaseModel):
    """A factual statement supported by one specific reference."""
    # Building block of an `ArticleSection`; exactly one `Reference` per claim.

    # The paraphrased statement that appears in the article body.
    description: str = Field(..., description=(
        "A short paragraph (3–4 sentences) that paraphrases the meaning of the quote in your own words. "
        "It must stay faithful to the factual content of the quote — no speculation or extrapolation."
    ))
    # Justification text linking the quote to the statement.
    relevance_check: str = Field(..., description=(
        "1–2 sentences explaining *why* this quote supports the claim — a brief justification to ensure factual grounding."
    ))
    # The supporting citation, rendered as a link when the article is printed.
    reference: Reference = Field(..., description=(
        "A direct quote that explicitly supports or demonstrates the statement made in 'description'. "
        "The claim should be a paraphrase or interpretation of this quote."
    ))

class ArticleSection(BaseModel):
    """One thematic part of the final article, containing multiple claims."""
    # Printed as a `##` heading followed by its claims in the display cell.
    title: str = Field(..., description="A concise section title summarizing the theme.")
    # The 3–4 count is guidance in the description only; the list length is not validated.
    claims: List[Claim] = Field(..., description="3–4 claims that explore different aspects of this section's theme.")

class ActionPoint(BaseModel):
    """Practical takeaways from the research."""
    # Listed under "Action Points" when the article is printed; each carries one citation.
    point: str = Field(..., description="A concrete recommendation, insight, or action derived from the research.")
    relevance_check: str = Field(..., description="Explain how the referenced quote supports this action point — must show logical connection, not assumption.")
    reference: Reference = Field(..., description="Source supporting this action point.")

class Article(BaseModel):
    """The final synthesized output — a structured article summarizing all research stages."""
    # Rendered by the display cell: title, introduction, sections, action points, conclusion.
    title: str = Field(..., description="Compelling headline summarizing the topic and main insight (7-10 words).")
    introduction: str = Field(..., description="A short overview (3-4 paragraphs) explaining what the research explored and why it matters.")
    # NOTE(review): the description asks for 5-8 sections while synthesizer_instructions
    # asks for 5–6; the two should probably agree.
    sections: List[ArticleSection] = Field(..., description="5-8 well-structured sections presenting grouped claims by topic.")
    # NOTE(review): described as "Optional", but the field is declared required (`...`).
    action_points: List[ActionPoint] = Field(..., description="Optional 3-5 key insights or recommendations derived from the findings.")
    conclusion: str = Field(..., description="Final synthesis paragraph summarizing the broader takeaways and closing thoughts.")

class ResearchReport(BaseModel):
    """The complete record of exploration across all stages, culminating in the final article."""
    # Passed as `output_type` to the synthesizer run; the display cell reads both fields.
    stages: List[StageReport] = Field(..., description="Exploration stage reports (Stage 1–3) detailing the search process.")
    article: Article = Field(..., description="The final article.")

In [60]:
# The synthesizer cannot search; it can only fetch cited documents by id to
# verify them. Its output_type (ResearchReport) is supplied when it is run.
synthesizer = Agent(
    model='gpt-4o-mini',
    name='synthesizer',
    tools=[tools.get_document_by_id],
    instructions=synthesizer_instructions,
)

# Prints the synthesizer's tool calls and results while a run is streaming.
synthesizer_callback = NamedCallback(synthesizer)

In [61]:
all_reports = [stage1, stage2, stage3]
reports = '\n'.join([r.model_dump_json() for r in all_reports])

user_prompt = f"""
initial request:
{research_task.model_dump_json()}

reports:
{reports}
"""

In [62]:
synthesizer_results = await synthesizer.run(
    user_prompt=user_prompt,
    output_type=ResearchReport,
    event_stream_handler=synthesizer_callback
)

CALL → get_document_by_id {'_id': 2965} call_eDMJUMmGpCttZsElaWqiDHzz
CALL → get_document_by_id {'_id': 7527} call_5SN4lICntMMSohGJWqAfi2d9
CALL → get_document_by_id {'_id': 643} call_sijVIK91Plt6PweED2mexCFG
CALL → get_document_by_id {'_id': 2283} call_rrSpubjmiMXexGSaQUuvABlG
CALL → get_document_by_id {'_id': 618} call_A66AHUz1BJwW8mcBDeu8Lj0K
CALL → get_document_by_id {'_id': 6845} call_i03SBAesZXGqM2CJ82ijbvvw
CALL → get_document_by_id {'_id': 2284} call_n03zLzriQakwLBFfYOr8dFql
CALL → get_document_by_id {'_id': 622} call_CT9XwKxJyuIfVbTMSV54ubuZ
CALL → get_document_by_id {'_id': 7552} call_sP1PvZhRitAkOjbHtGad0MuA
CALL → get_document_by_id {'_id': 5734} call_IJzdEgtrDXOCcRwjCAIU2QFh
CALL → get_document_by_id {'_id': 618} call_UKG3HlRB4YsqMKERjxT2dglh
CALL → get_document_by_id {'_id': 2236} call_KzPtke0I8kDR1K4VxExAkZYG
CALL → get_document_by_id {'_id': 4058} call_mhncjgdBkydyB3REX73xfYgL
CALL → get_document_by_id {'_id': 2319} call_vtzIgnE7sRQuiwfWfWyaVWWz
CALL → get_document_by_i

In [65]:
synthesizer_results

AgentRunResult(output=ResearchReport(stages=[StageReport(stage=1, keywords=[Keyword(search_keyword='data engineering learning path for beginners', summary='The importance of guidance for beginners in data engineering was highlighted.', references=[Reference(quote='so definitely i felt that before I had that sort of guidance I was just kind of fumbling around in the dark reading a lot of different articles...', youtube_id='HVQ0DZOQcts', timestamp='19:40')], relevance_summary='This quote emphasizes how essential clear guidance is for newcomers to learn data engineering effectively.', other_ideas='Exploring structured mentorship options could further enhance the learning experience.'), Keyword(search_keyword='essential skills for data engineers', summary='Experts consider coding skills fundamental for success in data engineering.', references=[Reference(quote='I would say the most important one is coding skills...', youtube_id='yg3d1lFd7Uo', timestamp='11:33')], relevance_summary='This sn

In [68]:
synthesizer_results.output.article

Article(title='A Structured Learning Path for Aspiring Data Engineers', introduction='Data engineering is a critical field that underpins the modern data-driven world. As organizations increasingly rely on data for decision-making, the demand for skilled data engineers has surged. This article seeks to provide a structured learning path for those looking to start a career in data engineering, emphasizing essential skills, tools, and resources.', sections=[ArticleSection(title='The Importance of Guidance for Beginners', claims=[Claim(description="For beginners in data engineering, having structured guidance is crucial. As one expert noted, prior to receiving direction, many individuals feel like they are 'fumbling around in the dark,' which can lead to confusion and frustration. This highlights the necessity of finding a clear learning path to navigate the complexities of data engineering (HVQ0DZOQcts).", relevance_check='This insight emphasizes how critical a structured approach is for

In [69]:
article = synthesizer_results.output.article

In [70]:
def to_link(reference) -> str:
    """
    Converts the timestamp to a YouTube URL with a proper time offset.
    Supports 'h:mm:ss', 'mm:ss' and bare-seconds formats.

    Args:
        reference: Any object with `youtube_id` and `timestamp` attributes.

    Returns:
        str: The watch URL with `&t=<seconds>s` appended. The plain watch URL
        (no offset) is returned when the timestamp is missing, blank,
        non-numeric, negative, or has more than three ':'-separated parts.
    """
    video_url = f"https://www.youtube.com/watch?v={reference.youtube_id}"

    ts = (reference.timestamp or "").strip()
    if not ts:
        return video_url

    try:
        parts = [int(p) for p in ts.split(":")]
    except ValueError:
        return video_url

    # Timestamps come from model output, so anything that is not a sane
    # [[h:]mm:]ss value degrades to a link without an offset instead of raising.
    if len(parts) > 3 or any(p < 0 for p in parts):
        return video_url

    # Left-pad with zeros so 'ss' and 'mm:ss' share the 'h:mm:ss' arithmetic.
    hours, minutes, seconds = [0] * (3 - len(parts)) + parts

    total_seconds = hours * 3600 + minutes * 60 + seconds
    return f"{video_url}&t={total_seconds}s"

def diplay_reference(reference: Reference):
    """Render a reference as a Markdown link.

    The reference's quote becomes the link text and ``to_link(reference)``
    supplies the target URL. The function name is spelled 'diplay' (sic);
    it is left as-is because other cells call it by this name.
    """
    link_text = reference.quote
    target_url = to_link(reference)
    return f"[{link_text}]({target_url})"


In [71]:
# Structured output of the synthesizer run: per-stage findings plus the article.
report = synthesizer_results.output

# Display stage-by-stage findings
for stage in report.stages:
    print('Stage:', stage.stage)
    for kw in stage.keywords:
        print('  keyword:', kw.search_keyword)
        print('  summary:', kw.summary)
        # Each reference is shown as a Markdown link.
        print('  references:', list(map(diplay_reference, kw.references)))
    print(stage.summary)

# Display the final article
article = report.article
print('#', article.title)
print('## Introduction', article.introduction, sep='\n')

for section in article.sections:
    print('##', section.title)
    for claim in section.claims:
        # Claim text followed by its supporting reference in parentheses.
        print(claim.description, f'( {diplay_reference(claim.reference)} )')

print('## Action Points')
for action_point in article.action_points:
    print('*', action_point.point, diplay_reference(action_point.reference))

print('## Conclusion', article.conclusion, sep='\n')

Stage: 1
  keyword: data engineering learning path for beginners
  summary: The importance of guidance for beginners in data engineering was highlighted.
  references: ['[so definitely i felt that before I had that sort of guidance I was just kind of fumbling around in the dark reading a lot of different articles...](https://www.youtube.com/watch?v=HVQ0DZOQcts&t=1180s)']
  keyword: essential skills for data engineers
  summary: Experts consider coding skills fundamental for success in data engineering.
  references: ['[I would say the most important one is coding skills...](https://www.youtube.com/watch?v=yg3d1lFd7Uo&t=693s)']
  keyword: best online courses for data engineering
  summary: Numerous online courses are available that focus on various skills fundamental to data engineering.
  references: ['[There are good courses about this...](https://www.youtube.com/watch?v=HVQ0DZOQcts&t=1504s)']
  keyword: data engineering tools and technologies
  summary: Understanding various tools us

# Orchestrator

In [72]:
from pydantic_ai import RunContext

In [73]:
def print_messages(messages):
    """Print a plain-text trace of a sequence of agent messages.

    For every message its ``kind`` is printed, then for each part the
    ``part_kind`` followed by the payload relevant to that kind:

    - ``user-prompt`` / ``text``: the part's ``content``
    - ``tool-call``: the tool name and its arguments
    - ``tool-return``: the Python type of the returned content and the content

    Parts of any other kind only get their ``part_kind`` line. A blank line
    is printed after each part and after each message.

    Args:
        messages: Iterable of message objects exposing ``kind`` and ``parts``.

    Returns:
        None. Everything is written to stdout.
    """
    for message in messages:
        print(message.kind)

        for part in message.parts:
            kind = part.part_kind
            print(kind)
            # The kinds are mutually exclusive, so at most one branch runs.
            if kind in ('user-prompt', 'text'):
                print(part.content)
            elif kind == 'tool-call':
                print(part.tool_name, part.args)
            elif kind == 'tool-return':
                print(type(part.content), part.content)
            print()

        print()

In [74]:
# System prompt for the orchestrator agent. The names in parentheses are the
# tools registered on the agent with @orchestrator.tool, so they must match the
# tool function names exactly (clarify_tool_initial, clarify_tool_research_task,
# research_tool); a misspelled name points the model at a tool that does not exist.
orchestrator_instructions = """
first, ask user an initial question via clarifier (clarify_tool_initial)
then formulate requirements for the researcher (clarify_tool_research_task)
then execute research via researcher in three stages: 1, 2, 3 (research_tool)
each research step should be done after the previous one is completed

make it timeless: don't add years to queries. for example:
"learning machine learning" is better than "learning machine learning in 2023"

when the research is ready, output a short summary of the research
"""

# Top-level agent that sequences the clarifier and researcher through its tools.
orchestrator = Agent(
    name='orchestrator',
    instructions=orchestrator_instructions,
    model='gpt-4o-mini',
)

# Event-stream handler for orchestrator runs.
orchestrator_callback = NamedCallback(orchestrator)

In [75]:
@orchestrator.tool
async def clarify_tool_initial(ctx: RunContext, query: str) -> str:
    """Runs the clarifier once to interpret the user's request.

    Args:
        query: Raw user question.

    Returns:
        A short text summary describing the user's intent.
    """
    # NOTE(review): docstring left verbatim; pydantic_ai presumably forwards it
    # to the model as the tool description, so confirm before rewording it.
    print("\n=== Clarifier (Initial) ===")

    # A fresh handler per call; the clarifier's events stream through it.
    clarifier_run = await clarifier.run(
        user_prompt=query,
        event_stream_handler=NamedCallback(clarifier),
    )
    return clarifier_run.output

In [76]:
@orchestrator.tool
async def clarify_tool_research_task(ctx: RunContext, query: str) -> ResearchInstructions:
    """Runs the clarifier again using both the user query and prior clarifier output
    to create a structured ResearchInstructions object.

    Args:
        query: User's original question.

    Returns:
        ResearchInstructions with refined request, intent, and search queries.
    """
    # NOTE(review): docstring left verbatim; pydantic_ai presumably forwards it
    # to the model as the tool description, so confirm before rewording it.
    print("\n=== Clarifier (Research Task) ===")

    # Everything clarify_tool_initial has returned so far in this run's messages.
    initial_clarifications = [
        part.content
        for message in ctx.messages
        for part in message.parts
        if part.part_kind == "tool-return" and part.tool_name == "clarify_tool_initial"
    ]
    prior_text = "\n".join(map(str, initial_clarifications))

    # Same text as the indented template, spelled out so the whitespace is explicit.
    prompt = (
        "User query:\n"
        f"    {query}\n"
        "    \n"
        "    Prior clarification:\n"
        f"    {prior_text}"
    ).strip()

    # output_type makes the clarifier return a ResearchInstructions object
    # for this call instead of free text.
    structured_run = await clarifier.run(
        user_prompt=prompt,
        event_stream_handler=NamedCallback(clarifier),
        output_type=ResearchInstructions,
    )
    return structured_run.output

In [77]:
@orchestrator.tool
async def research_tool(ctx: RunContext, stage: int, stage_instructions: str) -> ResearchStageReport:
    """Runs one stage of research using prior reports as context.

    Args:
        stage: Research stage number (1–3).
        stage_instructions: Description of what this stage should focus on.

    Returns:
        ResearchStageReport with insights, references, and next steps.
    """
    # NOTE(review): docstring left verbatim; pydantic_ai presumably forwards it
    # to the model as the tool description, so confirm before rewording it.
    print(f"\n=== RESEARCH stage {stage} ===")

    # Reports returned by earlier research_tool calls in this run's messages;
    # contents that are not ResearchStageReport instances are ignored.
    earlier_reports: List[ResearchStageReport] = [
        part.content
        for message in ctx.messages
        for part in message.parts
        if part.part_kind == "tool-return"
        and part.tool_name == "research_tool"
        and isinstance(part.content, ResearchStageReport)
    ]

    return await do_research(
        stage=stage,
        stage_instructions=stage_instructions,
        previous_stages=earlier_reports,
    )

In [78]:
# Start with an empty conversation log; it is passed to orchestrator.run below.
message_history = []

In [79]:
# Example user question used for the single orchestrator run below.
question = "how do I get started with data engineering?"

In [80]:
# One orchestrator turn: the question plus the conversation so far, with
# events streamed through orchestrator_callback.
orchestrator_results = await orchestrator.run(
    user_prompt=question,
    message_history=message_history,
    event_stream_handler=orchestrator_callback,
)

CALL → clarify_tool_initial {'query': 'how do I get started with data engineering?'} call_04XrqlBrj5fDygEFmZe4RgEB

=== Clarifier (Initial) ===
RES  ← clarify_tool_initial call_04XrqlBrj5fDygEFmZe4RgEB To help clarify your request, could you please specify what aspect of getting started with data engineering you are most interested in? For example, are you looking for educational resources, specific skills to learn, job opportunities, or practical projects to undertake?


In [81]:
# Append the messages produced by this run to the conversation log
# and print a trace of them.
messages = orchestrator_results.new_messages()
message_history.extend(messages)
print_messages(messages)

request
user-prompt
how do I get started with data engineering?


response
tool-call
clarify_tool_initial {"query":"how do I get started with data engineering?"}


request
tool-return
<class 'str'> To help clarify your request, could you please specify what aspect of getting started with data engineering you are most interested in? For example, are you looking for educational resources, specific skills to learn, job opportunities, or practical projects to undertake?


response
text
What aspect of getting started with data engineering are you most interested in? For example, are you looking for educational resources, specific skills to learn, job opportunities, or practical projects to undertake?




In [82]:
callback = NamedCallback(orchestrator)

message_history = []

while True:
    user_input = input('You ', )
    if user_input.lower().strip() == 'stop':
        break

    results = await orchestrator.run(
        user_prompt=user_input,
        message_history=message_history,
        event_stream_handler=callback,
    )

    new_messages = results.new_messages()
    message_history.extend(new_messages)
    print_messages(new_messages)

CALL → clarify_tool_initial {'query': 'How do I learn about data science'} call_OFnvlmWvHMpgzI86BPLhGiAB

=== Clarifier (Initial) ===
RES  ← clarify_tool_initial call_OFnvlmWvHMpgzI86BPLhGiAB To refine your request, could you please clarify what specific aspects of data science you're interested in? For example, are you looking for resources (courses, books), learning paths, practical projects, or something else?
request
user-prompt
How do I learn about data science


response
tool-call
clarify_tool_initial {"query":"How do I learn about data science"}


request
tool-return
<class 'str'> To refine your request, could you please clarify what specific aspects of data science you're interested in? For example, are you looking for resources (courses, books), learning paths, practical projects, or something else?


response
text
What specific aspects of data science are you interested in? For example, are you looking for resources (courses, books), learning paths, practical projects, or som

In [83]:
# Collect every ResearchStageReport that research_tool returned during the
# interactive conversation recorded in message_history.
prior_reports: List[ResearchStageReport] = []

for m in message_history:
    for p in m.parts:
        # Skip anything that is not a return value of research_tool.
        if p.part_kind != "tool-return" or p.tool_name != "research_tool":
            continue
        if isinstance(p.content, ResearchStageReport):
            prior_reports.append(p.content)

In [85]:
# Show the collected research stage reports.
prior_reports

[ResearchStageReport(stage=1, explored_keywords=[ResearchKeyword(keyword='practical applications of data science', relevant_references=[Reference(document_id=6132, quote='at the code 80 Driven Academy we teach a lot of courses on everything data science', timestamp='3:09', relevance_to_keyword='This quote provides a context for learning practical data science through structured courses.', relevance_to_user_intent='The mention of the academy indicates opportunities for applying data science practically via courses offered.'), Reference(document_id=6131, quote='I created a deep learning with NLP course or an unsupervised learning course and those are more detailed or more specific topics', timestamp='3:25', relevance_to_keyword='Courses that are specifically designed for certain data science skills show direct pathways to practical applications.', relevance_to_user_intent="This aligns with the user's interest in detailed, practical applications in data science."), Reference(document_id=4

In [84]:
# Number of research stage reports collected from the message history.
len(prior_reports)

3