## Self-Reflective Research (Research Crew)

Research Scout (Agent): Generates highly specific search queries based on the Mapper Agent's key entities and flagged claims.
* Output structure: Initial raw search results (Snippets, URLs).

Critique Agent: Reads the raw results, identifies contradictions, missing sources, or lack of factual support for the claims identified in Stage 1 (Mapper Agent).
* Output structure: 
    * **Critique Report**: A structured critique that generates **new, refined queries** to fill the gaps.

Orchestrator: **(Self-Correction Loop)** -> If the Critique Agent finds gaps, the Orchestrator sends the refined queries back to the Research Scout for another search-and-critique iteration.
* Output structure: Verified, reliable set of retrieved documents.
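
The loop described above can be sketched without any agents at all. The following is a minimal illustration using stub functions; `fake_search`, `fake_critique`, `Critique`, and `research_loop` are hypothetical names for this sketch only, and the stub critique rule (a gap exists until a result mentions "retail") is an assumption chosen to force one refinement round.

```python
from dataclasses import dataclass, field


@dataclass
class Critique:
    """Hypothetical stand-in for the Critique Agent's report."""
    gaps_found: bool
    refined_queries: list[str] = field(default_factory=list)


def fake_search(query: str) -> str:
    """Stub search: pretends to return a snippet for the query."""
    return f"snippet for '{query}'"


def fake_critique(results: list[str]) -> Critique:
    """Stub critique: reports a gap until some result mentions 'retail'."""
    if any("retail" in r for r in results):
        return Critique(gaps_found=False)
    return Critique(gaps_found=True, refined_queries=["four-day week retail sector data"])


def research_loop(initial_queries: list[str], max_iterations: int = 3) -> tuple[list[str], bool]:
    """Search, critique, and re-search until no gaps remain or the budget runs out."""
    queries = initial_queries
    collected: list[str] = []
    for _ in range(max_iterations):
        results = [fake_search(q) for q in queries]
        collected.extend(results)
        critique = fake_critique(collected)
        if not critique.gaps_found:
            return collected, True   # evidence judged sufficient
        queries = critique.refined_queries
    return collected, False          # budget exhausted, results unverified


documents, verified = research_loop(["four-day week tech sector productivity"])
print(verified, len(documents))
```

Returning a `verified` flag next to the documents is one possible design choice: it lets a downstream stage distinguish results that passed the critique from results returned only because the iteration budget ran out.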

In [1]:
from openai import AsyncOpenAI
from agents import Agent, trace, Runner, function_tool, OpenAIChatCompletionsModel
from agents.model_settings import ModelSettings
from pydantic import BaseModel, Field, conlist
import asyncio
from typing import Dict
import requests
import os
from datetime import datetime
from IPython.display import Markdown, display
from collections import namedtuple

### Get the Model

In [4]:
# Models

# get the API key
google_api_key = os.getenv("GOOGLE_API_KEY")

# connect to endpoints (necessary for Google but not OpenAI)
GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta/openai/"

gemini_client = AsyncOpenAI(base_url=GEMINI_BASE_URL, api_key=google_api_key)

gemini_model = OpenAIChatCompletionsModel(model="gemini-2.0-flash", openai_client=gemini_client)

### Mapper Agent: Structured Outputs

The **Research crew** built in this notebook expects a *MapperAgentOutput* object as its initial input, so those classes are included below.

In [12]:
class KnowledgeTriplet(BaseModel):
    """Represents a single relationship (edge) between two concepts (nodes) 
    extracted directly from the user's input."""
    
    subject: str = Field(description="The primary concept or entity (Node) in the relationship.")
    
    # Standardized predicates based on input analysis
    # Use one of the following: 'IMPLIES', 'ASSUMES', 'INQUIRES', 'FOCUSES_ON'.
    predicate: str = Field(description="The nature of the relationship (Edge) between the subject and object (e.g., 'IMPLIES', 'ASSUMES', 'INQUIRES').")
    
    object: str = Field(description="The secondary concept, claim, or assumption (Node) related to the subject.")
    
class MapperAgentOutput(BaseModel):
    """The complete structured output for the Mapper Agent."""
    
    # The list of relationships forms the Draft Knowledge Graph
    draft_knowledge_graph: list[KnowledgeTriplet] = Field(
        description="A list of KnowledgeTriplet objects representing the conceptual map of the user's input only."
    )
    
    # The key decision flag for the next stage
    research_needed: bool = Field(
        description=(
            "True if the input is a knowledge-based query ('What are the implications of X?') requiring external evidence. "
            "False if it is a simple planning query ('How to build Y?')."
        )
    )

### Research crew: Structured Outputs

In [2]:
# Search Query Item
class SearchQueryItem(BaseModel):
    """A single, highly specific search query."""
    
    focus_entity: str = Field(
        description="The key entity or claim this search query is designed to investigate (e.g., 'four-day work week economic viability')."
    )
    
    query: str = Field(
        description="The specific keyword or question for the search engine."
    )
    
# Research Scout Output
class ResearchScoutOutput(BaseModel):
    """A collection of generated search queries."""
    
    search_queries: list[SearchQueryItem] = Field(
        description="A list of specific search queries formulated to investigate the concepts and claims from the Draft Knowledge Graph."
    )
    
# Critique Agent Outputs
class RefinedQuery(BaseModel):
    """A new or refined query generated to address a gap in the current evidence."""
    
    reason_for_refinement: str = Field(
        description="Explains why the current evidence is insufficient (e.g., 'Sources were too old', 'Contradictory data found', 'Missing financial data')."
    )
    
    new_query: str = Field(
        description="A highly focused, specific query to fill the identified gap."
    )
    
class CritiqueAgentOutput(BaseModel):
    """The structured report on the quality of the current research results."""
    
    gaps_found: bool = Field(
        description="True if the current raw results are incomplete, contradictory, or lack sufficient factual support. This triggers the self-correction loop."
    )
    
    critique_summary: str = Field(
        description="A brief summary of the overall evidence quality and the primary missing pieces."
    )
    
    refined_queries: list[RefinedQuery] = Field(
        description="A list of new queries to be executed if gaps_found is True."
    )

### Research Scout Agent

In [5]:
# Research Scout Agent
SCOUT_AGENT_INSTRUCTIONS = """You are the Research Scout for the Conceptual Pathfinder Agent.
Your input is a Draft Knowledge Graph (a list of triplets like 'Concept [IMPLIES] Claim') from the Mapper Agent.

**Your Goal:** Generate a set of highly specific, factual search queries to gather evidence for the claims and questions flagged in the graph.
Do NOT introduce general planning queries.
1. **Prioritize:** Focus searches on any Claims or Concepts connected by the 'INQUIRES' predicate.
2. **Output:** Provide a list of SearchQueryItem objects ready for execution."""

research_scout_agent = Agent(
    name="Research Scout",
    instructions=SCOUT_AGENT_INSTRUCTIONS,
    model=gemini_model,
    output_type=ResearchScoutOutput,
)
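
The Scout prompt describes its input as triplets like 'Concept [IMPLIES] Claim'. Below is a minimal sketch of rendering a graph into that text form before handing it to an agent. It uses a plain dataclass as a stand-in for `KnowledgeTriplet`, assuming the same three fields; the `Triplet` class and `render_graph` helper are illustrative names, not part of the notebook.

```python
from dataclasses import dataclass


@dataclass
class Triplet:
    """Dataclass stand-in for KnowledgeTriplet (assumption: same three fields)."""
    subject: str
    predicate: str
    object: str


def render_graph(triplets: list[Triplet]) -> str:
    """Render each triplet as 'Subject [PREDICATE] Object', one per line."""
    return "\n".join(f"{t.subject} [{t.predicate}] {t.object}" for t in triplets)


# Hand-written example graph for the four-day work week question.
graph = [
    Triplet("Four-day work week", "INQUIRES", "Economic viability for all businesses"),
    Triplet("Four-day work week", "ASSUMES", "Success in the tech sector"),
]
scout_input = render_graph(graph)
print(scout_input)
```

A plain-text rendering like this keeps the prompt readable and avoids depending on how a model object happens to print.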



### Critique Agent

In [6]:
# Critique Agent
CRITIQUE_AGENT_INSTRUCTIONS = """You are the Critique Agent.
Your input is the set of initial claims AND the raw, summarized search results.

**Your Goal:** Evaluate the raw search summaries against the original claims for completeness, factual support, and contradiction.
1. **Identify Gaps:** Look for contradictions, summaries that are too generic, or claims with zero factual support.
2. **Flag Self-Correction:** Set 'gaps_found' to True if the current evidence is unreliable or insufficient to definitively answer the query.
3. **Refine Queries:** If gaps are found, generate highly specific 'new_query' items to target the missing information."""

critique_agent = Agent(
    name="Critique Agent",
    instructions=CRITIQUE_AGENT_INSTRUCTIONS,
    model=gemini_model,
    output_type=CritiqueAgentOutput,
)

### Orchestrator and Search Execution Logic with Serper

#### Serper web search `@function_tool`

In [7]:
@function_tool
def serper_search(query: str) -> str:
    """Performs a web search using the Serper API and returns a structured string of results."""
    
    SERPER_API_KEY = os.getenv("SERPER_API_KEY")
    if not SERPER_API_KEY:
        return "Error: Serper API key not found."
    url = "https://google.serper.dev/search"
    payload = {"q": query}
    headers = {
        'X-API-KEY': SERPER_API_KEY,
        'Content-Type': 'application/json'
    }
    
    try:
        # A timeout keeps a stalled connection from blocking the agent run indefinitely
        response = requests.post(url, headers=headers, json=payload, timeout=10)
        response.raise_for_status() # Raise an exception for bad status codes
        
        data = response.json()
        
        # --- Format the results for the Agent ---
        # The agent performs better with a concise, text-based summary.
        formatted_results = []
        
        # Prioritize 'organic' (general web) results
        if 'organic' in data:
            for item in data['organic'][:3]: # Take the top 3 results
                formatted_results.append(f"Title: {item.get('title')}\nSnippet: {item.get('snippet')}\nURL: {item.get('link')}")
                
        # Also include 'answerBox' or 'knowledgeGraph' for direct answers.
        # This is a separate `if` so a direct answer is added even when organic results exist.
        if 'answerBox' in data and data['answerBox'].get('snippet'):
            formatted_results.insert(0, f"Direct Answer: {data['answerBox']['snippet']}")
        elif 'knowledgeGraph' in data and data['knowledgeGraph'].get('snippet'):
            formatted_results.insert(0, f"Direct Answer: {data['knowledgeGraph']['snippet']}")
            
        return "\n---\n".join(formatted_results) if formatted_results else "No relevant search results found."

    except requests.exceptions.RequestException as e:
        return f"Search Error: Failed to connect to Serper API. Details: {e}"
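
The formatting step inside the tool can be exercised on its own, without calling the API. The sketch below is one possible pure-function version; `format_results` is a hypothetical helper, and the `sample` payload is made-up data shaped only like the keys the tool reads (`organic`, `answerBox`, `knowledgeGraph`), not a real Serper response.

```python
def format_results(data: dict, max_organic: int = 3) -> str:
    """Build the text block from a parsed response: direct answer first, then top organic hits."""
    parts = []
    for key in ("answerBox", "knowledgeGraph"):
        snippet = data.get(key, {}).get("snippet")
        if snippet:
            parts.append(f"Direct Answer: {snippet}")
            break  # one direct answer is enough
    for item in data.get("organic", [])[:max_organic]:
        parts.append(
            f"Title: {item.get('title')}\nSnippet: {item.get('snippet')}\nURL: {item.get('link')}"
        )
    return "\n---\n".join(parts) if parts else "No relevant search results found."


# Hypothetical payload for illustration; not real search data.
sample = {
    "answerBox": {"snippet": "Example direct answer."},
    "organic": [
        {"title": f"Result {i}", "snippet": f"Snippet {i}", "link": f"https://example.com/{i}"}
        for i in range(1, 6)
    ],
}
formatted = format_results(sample)
print(formatted)
```

Separating formatting from the HTTP call makes the truncation and ordering rules easy to check with hand-written payloads.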
    

#### Search Agent

In [8]:
# Search Agent instructions
SEARCH_AGENT_INSTRUCTIONS = """You are a research assistant. Given a term you search the web for that term and
produce a concise summary of the results. The summary should be no more than 200 words.
Capture the main points. Write succinctly, no need to have complete sentences or good grammar.
This will be consumed by another Agent synthesizing a report, so it's vital that you capture the
essence and ignore all the fluff. Do not include any additional commentary other than the summary itself.
"""

search_agent = Agent(
    name="Search Agent",
    instructions=SEARCH_AGENT_INSTRUCTIONS,
    tools=[serper_search],
    model=gemini_model,
    model_settings=ModelSettings(tool_choice="required"),
)


#### Execute single search function

In [10]:
async def _execute_single_search(item: SearchQueryItem):
    """Internal helper to execute a single search using the Search Agent."""
    
    # Avoid shadowing the built-in `input`
    search_input = f"Search item: {item.query}\nFocus: {item.focus_entity}"
    result = await Runner.run(search_agent, search_input)
    
    # Return the query and the summarized result together for the Critique Agent
    return {
        "query": item.query,
        "focus": item.focus_entity,
        "summary": result.final_output # This is the 200 word summary from Search Agent
    }
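
Each call to this helper is independent, which is what allows the searches to run concurrently. A minimal sketch of that pattern with `asyncio.gather`, using a stub coroutine in place of the Search Agent (`fake_search`, `run_all`, and the delays are assumptions for illustration):

```python
import asyncio


async def fake_search(query: str, delay: float) -> dict:
    """Stub for a single search call; sleeps to simulate network latency."""
    await asyncio.sleep(delay)
    return {"query": query, "summary": f"summary of {query}"}


async def run_all(queries: list[tuple[str, float]]) -> list[dict]:
    """Run all stub searches concurrently and collect their results."""
    # gather returns results in the order the awaitables were passed in,
    # regardless of which one finishes first.
    return await asyncio.gather(*(fake_search(q, d) for q, d in queries))


results = asyncio.run(run_all([("slow query", 0.02), ("fast query", 0.0)]))
print([r["query"] for r in results])
```

Inside a notebook, where an event loop is already running, `await run_all(...)` would replace the `asyncio.run(...)` call.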

#### Run the research crew

This is where the input from the **Mapper Agent** will be given to the agents.

In [13]:
async def run_research_crew(draft_graph_output: MapperAgentOutput) -> list[dict]:
    """
    The Orchestrator function. Manages the self-correction loop.
    Input: The structured output from the Mapper Agent.
    Output: A verified list of search result summaries.
    """
    
    # 1. Initial Planning (Scout)
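    # Runner.run takes a string (or a list of input items), so render the triplets as text first
    scout_input = "\n".join(
        f"{t.subject} [{t.predicate}] {t.object}" for t in draft_graph_output.draft_knowledge_graph
    )
    scout_plan_result = await Runner.run(research_scout_agent, scout_input)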
    current_queries: list[SearchQueryItem] = scout_plan_result.final_output.search_queries
    
    MAX_ITERATIONS = 3
    final_verified_results = []
    
    for iteration in range(MAX_ITERATIONS):
        print(f"\n--- Research Iteration {iteration + 1} ---")
        
        # 2. Execute searches concurrently
        tasks = [asyncio.create_task(_execute_single_search(q)) for q in current_queries]
        raw_results = await asyncio.gather(*tasks)
        
        # Consolidate results for the Critique Agent's input
        results_text = "\n\n".join([f"QUERY: {r['query']}\nSUMMARY: {r['summary']}" for r in raw_results])
        
        # The Critique Agent needs the claims and the new results
        critique_input = f"CLAIMS TO VALIDATE: {draft_graph_output.draft_knowledge_graph}\n\nCURRENT RAW RESULTS:\n{results_text}"
        
        # 3. Run the Critique Agent
        critique_result = await Runner.run(critique_agent, critique_input)
        critique_report: CritiqueAgentOutput = critique_result.final_output
        
        final_verified_results.extend(raw_results) # Add current results to the final list
        
        # 4. Check for Gaps (Self-Correction Logic)
        if not critique_report.gaps_found:
            print("Critique successful: Evidence verified and sufficient.")
            return final_verified_results
        
        # If gaps found, prepare for next iteration
        print(f"Critique found gaps. Refining queries for next iteration. Summary: {critique_report.critique_summary}")
        current_queries = [
            SearchQueryItem(focus_entity=q.reason_for_refinement, query=q.new_query)
            for q in critique_report.refined_queries
        ]
        
    # Fallback if max iterations reached (placed after the loop so every iteration can run)
    print("\nWARNING: Max research iterations reached. Returning unverified results.")
    return final_verified_results
        

### Testing

In [14]:
# Test prompt (expects external research to be required)
external_research_query = "Is a four-day work week economically viable for all businesses, or does it only succeed in the tech sector?"