In [None]:
from pydantic_ai import Agent
from utils import AgentConfig, NamedCallback

In [3]:
from pydantic import BaseModel, Field 
from typing import Optional


class Reference(BaseModel):
    title: Optional[str] = None
    episode_name: Optional[str] = None
    authors: Optional[str] = None
    published_year: Optional[int] = None
    url: Optional[str] = None
    start_time: Optional[str]=None
    end_time: Optional[str]=None
    def format_citations(self) -> str:
        if self.episode_name:
            citation = f"{self.episode_name} ({self.start_time}-{self.end_time})"
        else:
            citation = f"{self.authors} ({self.published_year}). *{self.title}*. {self.url}"
        return citation

class Section(BaseModel):
    heading:str
    content:str
    references:list[Reference] = Field(default_factory=list)

class ResultResponse(BaseModel):
    context: str
    sections: list[Section]
    reference: list[Reference] = Field(default_factory=list)

    def format_response(self) -> str:
        output = "### Context"
        output += f"{self.context}\n\n"

        for section in self.sections:
            output += f"### {section.heading}\n\n"
            output += f"{section.content}\n\n"
        if self.references:
            output += "### References\n"
            for ref in self.references:
                output += f"- {ref.format_citations()}\n"
        return output


In [4]:

orchestrator_instructions = """
You are the Orchestrator in charge of delegating tasks to the right specialized agent.

YOUR CHOICE OF AGENTS:
- clarifier_agent rewrites the original query in exactly 3 distinct ways which will be passed downstream to the search agent

- search_agent works with the elastic search vector store and has two tools:
   - embed: embed query strings
   - vector_search: search the vector store for matches between queries and the huberman knowledge base 

- websearch_agent searches the web for further information on research topics and has two tools:
   - search_web: find relevant links pertaining to research topics across a selected set of web domains.
   - web_page_content: retrieve the Markdown content of selected webpages.

YOUR WORKFLOW: 
- ALWAYS invoke the clarifier_agent first to rephrase the user query; this increases the chances of better search results
- Next, invoke the search_agent. The search_agent makes one search to the vector store using all three queries to increase chances of matches. Make sure search_agent comprehensively searches the vector database.
- Finally, the user may ask for more information. You ask the user if they want to proceed with further research, and ONLY THEN SHOULD YOU INVOKE THE websearch_agent.

RULES:
- Each one of your agents have their own workflow. They understand what they should do. Do not interfere with their work. Your job is to delegate.
- Each one of your agents have a specific structured output format they need to follow. Allow them to provide this.
- Verify that all responses that you provide to the user is valid json so that it can be properly parsed.
- Never invent tool names or fields names.
- Never answer using free-form prose. Ensure valid JSON.
- Never hallucinate search results, webpage content, or metadata.
- Never use external knowledge except what is returned by search_agent or websearch_agent. Explicitly state when you are unsure of something.

FOLLOW THIS OUTPUT STRUCTURE:
- Context: concise summary of what was done, rephrase the user question, and what will be in the output.
- Content section(s)
- References
"""

def create_orchestrator(config: AgentConfig = None):
    """Build and return the orchestrator Agent with configured instructions."""
    if config is None:
        config = AgentConfig()

    orchestrator = Agent(
        name = "orchestrator",
        instructions = orchestrator_instructions,
        model = config.model,
        output_type = ResultResponse
    )
    return orchestrator

orchestrator = create_orchestrator()
orchestrator_callback = NamedCallback(orchestrator)


In [5]:
from typing import List
from pydantic import BaseModel
from pydantic_ai import RunContext


class RewriteResponse(BaseModel):
    rewrites: str

clarifier_instructions = """ 

You assist the search_agent and websearch_agent.
You take a user's query and rewrite it 3 distinct ways using different phrasing, key terms, related subquestions.

"""

def create_clarifier_agent(config: AgentConfig = None) -> Agent:
    """Instantiate the clarifier agent that rewrites user queries."""

    if config is None: 
        config = AgentConfig()

    clarifier_agent = Agent(
        name="clarifier_agent",
        instructions=clarifier_instructions,
        model=config.model,
        output_type=RewriteResponse
    )
    return clarifier_agent

clarifier_agent = create_clarifier_agent()

@orchestrator.tool
async def rewrite_user_query(ctx: RunContext, query:str) -> str:
    """Use the clarifier agent to produce three rewritten queries."""
    callback = NamedCallback(clarifier_agent)
    results = await clarifier_agent.run(user_prompt=query, event_stream_handler=callback)
    return results.output


In [6]:

from utils import AgentConfig
from search_agent import SearchResultResponse
from sentence_transformers import SentenceTransformer


search_instructions = """
You are an assistant that specializes in finding relevant passages from the Huberman Lab podcast archive (topics include but not limited to sleep, motivation, neuroscience, fitness, general health). 

SEARCH STRATEGY
- Do ONE vector search with embeddings from all queries.
- Merge the retrieved chunks, synthesize an answer based on the retrieved chunks in natural language, and cite every statement with its reference metadata. Make sure you include the rephrased question in your response.
- Paraphrase the user's query clearly and include this in your final response.
- If no relevant chunks are found after all rewrites, state that explicitly and offer general guidance.

TOOLS YOU CAN USE
- embed - embed all queries
- vector_search() - fetch relevant chunks

RULES
- Call vector search ONE time.
- Search comprehesively to provide a comprehensive response. Aim for one section; the response reads like an eassy with concise paragraphs and references, like (podcast episode name, timestamps). 
- Aim for more than one reference. DO NOT INVENT REFERENCES.
- Always reference a claim. For example, "A study highlighted...", CITE THIS with the episode name and timestamps.
- Provide references with the name of the episode with time stamps and in parentheses the Huberman Lab Podcast, like so (Huberman Lab Podcast).
- Use only information returned from the vector search tool; never invent facts. EXPLICITLY state that you are giving general guidance if information you provided was not derived from the search tool.
- For each response, rewrite the user's question clearly and ensure that you are answering the question that you rewrote.
- Write your answer clearly and accurately.


CONTEXT:
---
{chunk}
---

""".strip()


def create_search_agent(config: AgentConfig = None):
    if config is None:
        config = AgentConfig()
    search_agent = Agent(
        name="search_agent",
        instructions=search_instructions, 
        model=config.model
    )
    return search_agent

search_agent = create_search_agent()


@orchestrator.tool
async def embed(ctx: RunContext, query:str, config:AgentConfig=None):
    embedding_model = SentenceTransformer(config.embedding_model_name)
    embeddings = embedding_model.encoding(query)
    return embeddings.tolist()



@orchestrator.tool
async def vector_search(ctx: RunContext, query:str, config:AgentConfig=None):
    """Run the domain search agent before falling back to web search."""
    if config is None:
        config = AgentConfig()

    prior_outputs = []
    for m in ctx.messages:
        for p in m.parts:
            if p.part_kind == "tool-return" and p.tool_name == "embed":
                prior_outputs.append(p.content)

    prior_text = "".join(str(x) for x in prior_outputs)

    prompt = f"""
    User query:
    {query}

    Prior clarification:
    {prior_text}
    """.strip()

    callback = NamedCallback(search_agent)

    results = await search_agent.run(
        user_prompt=prompt, 
        event_stream_handler=callback, 
        output_type=SearchResultResponse

    )
    return results.output.format_response()


  from .autonotebook import tqdm as notebook_tqdm


In [7]:
from websearch_agent import websearch_instructions, ResearchReport
import requests
import random
import os


def create_websearch_agent(config:AgentConfig = None):
    """Create the websearch agent that uses Brave API tools."""
    if config is None:
        config = AgentConfig()
    
    websearch_agent = Agent(
        name="websearch_agent",
        instructions=websearch_instructions,
        model=config.model
    )
    return websearch_agent

websearch_agent = create_websearch_agent()


@orchestrator.tool
async def search_web(ctx: RunContext, query:str):
    """Call the Brave API and delegate summarization to the web agent."""

    preferred_sites = [
    "brainfacts",
    "nimh",
    "nih"
    "alleninstitute",
    "mit",
    "stanford",
    "acsm",
    "nsca",
    "acefitness",
    "exerciseismedicine",
    "bjsm",
    "apa",
    "stanford",
    "motivationscience",
    "berkeley",
    "mayoclinic",
    "clevelandclinic",
    "harvard",
    "hopkinsmedicine",
    "cdc",
    "mit",
    "mpg",
    "yale",
    "scientificamerican",
    "psychologytoday",
    "nature",
    "science"
]
    
    urls = "\n".join(preferred_sites)

    url = f"https://api.search.brave.com/res/v1/web/search?q={query}"
    headers = {
        "Accept": "application/json",
        "X-Subscription-Token": os.getenv("BRAVE_API_KEY")
    }

    try:
        response = requests.get(url, headers=headers)
        results = response.json().get("web", {}).get("results", [])
        urls_all = [item.get("url") for item in results if item.get("url")]  
        urls_filtered = [u for u in urls_all if any(i in u for i in urls)]

        urls_filtered_5 = random.sample(urls_filtered, min(5, len(urls_filtered)))
    except (requests.exceptions.RequestException, UnicodeDecodeError) as e:
        print(f" Error fetching content for {query}: {e}")

    prior_outputs = []
    for m in ctx.messages:
        for p in m.parts:
            if p.part_kind == "tool-return" and p.tool_name == "rewrite_user_query":
                prior_outputs.append(p.content)

    prior_text = "\n".join(str(x) for x in prior_outputs)

    prompt = f"""
    User query:
    {query}
    
    Prior clarification:
    {prior_text}
    """.strip()

    callback = NamedCallback(websearch_agent)

    results = await websearch_agent.run(
        user_prompt=prompt, 
        event_stream_handler=callback, 

    )
    return results



In [8]:

@orchestrator.tool
async def web_page_content(ctx: RunContext, url:str, query:str, config: AgentConfig = None):
    """Fetch page content through the reader proxy and summarize it."""
    reader_url_prefix = "https://r.jina.ai/"
    reader_url = reader_url_prefix + url

    try:
        response = requests.get(reader_url, timeout=45)
        response.raise_for_status()  # raises for 4xx/5xx HTTP errors
        content = response.content.decode("utf-8")
    except (requests.exceptions.RequestException, UnicodeDecodeError) as e:
        # Optional: log or print the error for debugging
        print(f"Error fetching content from {url}: {e}")
        # return None

    prior_outputs = []
    for m in ctx.messages:
        for p in m.parts:
            if p.part_kind == "tool-return" and p.tool_name == "search_web":
                prior_outputs.append(p.content)

    prior_text = "\n".join(str(x) for x in prior_outputs)

    prompt = f"""
    User query:
    {query}
    
    Prior clarification:
    {prior_text}
    """.strip()

    callback = NamedCallback(websearch_agent)

    results = await websearch_agent.run(
        user_prompt=prompt, 
        event_stream_handler=callback, 
        output_type=ResearchReport
    )
    return results.output.format_response()

In [10]:
message_history = []
question = "alzheimer's and coffee"
orchestrator_results = await orchestrator.run(
    user_prompt=question,
    message_history=message_history,
    event_stream_handler=orchestrator_callback,
    output_type = ResultResponse
)

TOOL CALL (orchestrator): rewrite_user_query({"query":"alzheimer's and coffee"})
TOOL CALL (orchestrator): vector_search({"query":"1. How does coffee consumption impact Alzheimer's disease?  \n2. What is the relationship between caffeine intake and Alzheimer's risk?  \n3. Can drinking coffee help prevent or alleviate symptoms of Alzheimer's?","config":{"model":"openai:gpt-4o-mini","index_name":"huberman","num_result":15}})
TOOL CALL (orchestrator): final_result({"context":"I have explored the relationship between coffee consumption, caffeine intake, and Alzheimer's disease, including its potential impacts on symptoms and risk. The findings reveal significant associations between coffee and Alzheimer's.","sections":[{"heading":"Coffee Consumption and Alzheimer's Disease","content":"Coffee consumption has been linked to a lower risk of developing Alzheimer's disease in numerous studies. Specifically, caffeine, a major component of coffee, has been investigated for its neuroprotective eff

In [11]:
from utils import print_messages
messages = orchestrator_results.new_messages()
message_history.extend(messages)
print_messages(messages)

request
user-prompt
alzheimer's and coffee


response
tool-call
rewrite_user_query {"query":"alzheimer's and coffee"}


request
tool-return
<class '__main__.RewriteResponse'> rewrites="1. How does coffee consumption impact Alzheimer's disease?  \n2. What is the relationship between caffeine intake and Alzheimer's risk?  \n3. Can drinking coffee help prevent or alleviate symptoms of Alzheimer's?"


response
tool-call
vector_search {"query":"1. How does coffee consumption impact Alzheimer's disease?  \n2. What is the relationship between caffeine intake and Alzheimer's risk?  \n3. Can drinking coffee help prevent or alleviate symptoms of Alzheimer's?","config":{"model":"openai:gpt-4o-mini","index_name":"huberman","num_result":15}}


request
tool-return
<class 'str'> ### Your Question

What is the relationship between coffee consumption, caffeine intake, and Alzheimer's disease, including potential impacts on symptoms and risk?

### Coffee Consumption and Alzheimer's Disease

Coffee consu

In [None]:
# message_history = []
# question = "alzheimer's and coffee"
# orchestrator_results = await orchestrator.run(
#     user_prompt=question,
#     message_history=message_history,
#     event_stream_handler=orchestrator_callback,
# )

In [None]:
# from utils import print_messages
# messages = orchestrator_results.new_messages()
# message_history.extend(messages)
# print_messages(messages)