# 🤖 Multi-Agent Deep Research System

This notebook shows how to combine **AutoGen** agents with **LangChain** tools to automate deep research workflows. The system coordinates four specialized agents, each with a distinct role in research and report generation. Together, these agents collect data, analyze findings, and produce structured reports with citations.

<a href="https://colab.research.google.com/github/miztiik/taars/blob/master/notebooks/deepresearch_w_autogen_langchain_tools.ipynb" target="_blank"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 🧠 Agent Roles

- **🧭 Planner**: Defines research scope, objectives, and success criteria
- **🔍 Researcher**: Gathers evidence with `wiki_search` and `web_search`, and extracts page content with `web_fetch`
- **🧪 Critic**: Reviews plans and outputs for quality and completeness
- **✍️ Editor**: Formats final reports with proper citations and structure

## 🔄 System Architecture

```mermaid
flowchart TD
    A[User Query] --> B[🧭 Planner]
    B --> C[🔍 Researcher]
    C --> D[🧪 Critic]
    D --> E[✍️ Editor]
    E --> F[📄 Final Report]

    B -.-> D
    C -.-> B
    D -.-> C

    C --> G[Wiki Search]
    C --> H[Web Search]
    C --> I[Content Fetch]

```
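
The flowchart can also be read as a small transition table. The sketch below is an illustration only: the names `FORWARD`, `FEEDBACK`, and `allowed_next` are hypothetical and do not appear in the notebook, and the real hand-offs are chosen by a model-driven selector rather than a fixed table. The tool nodes (Wiki Search, Web Search, Content Fetch) are left out.

```python
# Hypothetical transition table mirroring the flowchart above.
# Solid arrows are the forward path; dotted arrows are review/feedback paths.
FORWARD = {
    "Planner": ["Researcher"],
    "Researcher": ["Critic"],
    "Critic": ["Editor"],
    "Editor": [],
}
FEEDBACK = {
    "Planner": ["Critic"],
    "Researcher": ["Planner"],
    "Critic": ["Researcher"],
    "Editor": [],
}


def allowed_next(agent: str) -> list[str]:
    """Return every agent the diagram lets `agent` hand off to."""
    return FORWARD[agent] + FEEDBACK[agent]


for name in FORWARD:
    print(f"{name} -> {allowed_next(name)}")
```

Keeping forward and feedback edges apart makes it easy to see that the Editor is the only role with no outgoing hand-off.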

## 🚀 Quick Start

### Prerequisites

Set environment variables:

```bash
# Required: Gemini API (or configure other models in cells below)
export GEMINI_API_KEY="your_api_key"
export GEMINI_MODEL_NAME="gemini-1.5-flash"
export GEMINI_BASE_URL="https://generativelanguage.googleapis.com/v1beta/"
```
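
Before running the cells, it can help to confirm that all three variables are visible to Python. This is a minimal sketch; `REQUIRED_VARS` and `missing_env_vars` are hypothetical helper names, not part of the notebook.

```python
import os

# The three variables exported in the Prerequisites block.
REQUIRED_VARS = ("GEMINI_API_KEY", "GEMINI_MODEL_NAME", "GEMINI_BASE_URL")


def missing_env_vars(names=REQUIRED_VARS, env=None):
    """Return the names that are unset or empty in the given mapping."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


missing = missing_env_vars()
if missing:
    print("Missing environment variables:", ", ".join(missing))
else:
    print("All Gemini settings are present.")
```

Passing a plain dictionary as `env` lets the check be exercised without touching the real environment.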

### Run Research Task

1. **Execute all cells** in sequence
2. **Modify the query** in the final cell:
   ```python
   output = asyncio.run(run_task("Your research question here"))
   ```
3. **Monitor progress** in real-time through cell outputs
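
The call in step 2 returns text when the run finishes and `None` otherwise, so the caller should handle both cases. The sketch below shows that pattern with a stand-in coroutine: `fake_run_task` and `summarize` are hypothetical names, and the stand-in does no research at all.

```python
import asyncio


async def fake_run_task(question: str):
    """Hypothetical stand-in for run_task: returns text, or None for an empty question."""
    await asyncio.sleep(0)  # yield control once, as a real coroutine would
    return f"Findings for: {question}" if question.strip() else None


def summarize(output):
    """Turn the return value into a short status line."""
    return "completed" if output else "did not complete"


output = asyncio.run(fake_run_task("Your research question here"))
print(summarize(output))
```

Inside a notebook an event loop is already running, so plain `asyncio.run` raises a `RuntimeError`. The notebook avoids this by calling `nest_asyncio.apply()` first; awaiting the coroutine directly in a cell is another option.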

## 📋 Example Queries

- **Financial Analysis**: `"Indian steel sector growth prospects in an era of US tariffs"`
- **Economic Research**: `"Government factors that improved Indian economy during Modi era"`
- **Tech Industry**: `"Are we witnessing an AI infrastructure bubble? GDP investment vs productivity gains"`
- **AI/ML Trends**: `"Current trends in tool usage during LLM training"`

## 📊 Outputs & Monitoring

| Output Type | Location                         | Description                               |
| ----------- | -------------------------------- | ----------------------------------------- |
| **Reports** | `./reports/`                     | Timestamped Markdown reports (auto-saved) |
| **Logs**    | `./logs/deep_research_agent.log` | Detailed execution logs with token usage  |
| **State**   | `./team_state.json`              | Conversation state for resume capability  |
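
Because report filenames begin with a `YYYYMMDD_HHMM` timestamp, sorting them by name also sorts them by time. The sketch below uses that to find the newest report; `latest_report` is a hypothetical helper, and reports saved within the same minute are ordered by the rest of the filename rather than by exact time.

```python
from pathlib import Path
from typing import Optional


def latest_report(reports_dir: str = "reports") -> Optional[Path]:
    """Return the newest report by filename, or None if there are none."""
    root = Path(reports_dir)
    if not root.is_dir():
        return None
    # Timestamp-prefixed names sort chronologically as plain strings.
    candidates = sorted(root.glob("*.md"))
    return candidates[-1] if candidates else None


newest = latest_report()
print(newest if newest else "No reports saved yet.")
```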

## 📝 TODO & Roadmap

- [ ] **Specialized Models**: Different LLMs for different agent roles (planning vs research vs writing)
- [ ] **Semantic Depth Search**: Advanced content extraction with semantic similarity scoring
- [ ] **Source Verification**: Cross-reference validation and fact-checking workflows
- [ ] **Domain-Specific Tools**: Specialized research tools for finance, science, law, etc.
- [ ] **Agent Control Flow Logging**: Meaningful event logging for agent-to-agent handoffs
- [ ] **Organic Flow Orchestration**: Improved prompts for natural, adaptive conversation flow
- [ ] **Streaming UI**: Real-time progress visualization and intervention capability
- [ ] **Performance Metrics**: Research quality scoring and optimization analytics


In [2]:
%%capture --no-stderr

%pip install -qU ipykernel
%pip install -qU autogen-agentchat
%pip install -qU autogen-ext

%pip install -qU loguru

%pip install -qU langchain
%pip install -qU langchain-community
%pip install -qU wikipedia
%pip install -qU selenium unstructured
%pip install -qU lxml
%pip install -qU ddgs


ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
htmldate 1.9.3 requires lxml<6,>=5.3.0; platform_system != "Darwin" or python_version > "3.8", but you have lxml 6.0.1 which is incompatible.


In [None]:
## GOOGLE COLAB LINE WRAPPING
# https://stackoverflow.com/questions/58890109/line-wrapping-in-collaboratory-google-results

from IPython.display import HTML, display


def set_css():
    display(
        HTML(
            """
  <style>
    pre {
        white-space: pre-wrap;
    }
  </style>
  """
        )
    )


get_ipython().events.register("pre_run_cell", set_css)


In [None]:
# %load_ext autoreload
# %autoreload 2
# %aimport -langchain_community
# Automatically reload modules before executing code

# https://ipython.org/ipython-doc/3/config/extensions/autoreload.html


In [None]:
## GENERIC IMPORTS
import asyncio
import json
import os
from pathlib import Path
import re
import sys
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Annotated

import nest_asyncio
import tenacity
from loguru import logger
from IPython.display import Markdown, display


In [None]:
## CONSTANTS

## LOG CONFIG
LOG_ROTATION_SIZE = "10 MB"
LOG_RETENTION_DAYS = "7 days"

## AUTOGEN CONFIG
CONVERSATION_BUFFER_SIZE = 10
TASK_TERMINATION_MAX_MESSAGES = 30
API_CALL_DELAY_SECONDS = int(
    os.environ.get("API_CALL_DELAY_SECONDS", "45")
)  # 45 seconds = 1.33 RPM
NON_API_EVENT_DELAY = 0.5  # Small delay for non-API events

## TOOL CONFIG
WIKI_MAX_RESULTS = 5
WIKI_MAX_CHARS = 5000
WIKIPEDIA_MAX_DOCS = 2
DDGS_MAX_RESULTS = 10
WEB_CONTENT_MAX_LENGTH = 15000
WEB_CONTENT_MIN_LENGTH = 50
WEB_BATCH_MAX_URLS = 10
SELENIUM_WINDOW_SIZE = "1920,1080"
BROWSER_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36"
TITLE_SLICE_LENGTH = 50
TOP_RESULTS_COUNT = 3
TASK_MAX_MEANINGFUL_WORDS = 3
CRITIC_MAX_WORDS = 500
URL_FETCH_DELAY = 2
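
The comment on `API_CALL_DELAY_SECONDS` converts a fixed delay into requests per minute. The arithmetic can be sketched in both directions; the function names below are hypothetical, and the result is an upper bound because it ignores the time each model call itself takes.

```python
def requests_per_minute(delay_seconds: float) -> float:
    """Upper bound on model calls per minute for a fixed delay between calls."""
    return 60.0 / delay_seconds


def delay_for_rpm(target_rpm: float) -> float:
    """Delay in seconds that keeps the call rate at or below target_rpm."""
    return 60.0 / target_rpm


print(round(requests_per_minute(45), 2))  # 1.33
print(delay_for_rpm(15))  # 4.0
```

Since the constant is parsed with `int(...)`, an override supplied through the `API_CALL_DELAY_SECONDS` environment variable must be a whole number of seconds.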


In [None]:
## LOGGING CONFIG
notebook_dir = Path.cwd()
log_dir = notebook_dir / "logs"
log_file = log_dir / "deep_research_agent.log"
log_dir.mkdir(exist_ok=True)

if not getattr(sys, "_loguru_configured", False):
    logger.remove()
    logger.add(
        str(log_file),
        level="DEBUG",
        rotation=LOG_ROTATION_SIZE,
        retention=LOG_RETENTION_DAYS,
        compression="zip",
        enqueue=True,
    )
    logger.add(
        sys.stderr,
        colorize=True,
    )
    sys._loguru_configured = True

logger.info("✅ Logging configured successfully")


In [None]:
## IMPORTS
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.messages import StopMessage, TextMessage
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from autogen_core.model_context import BufferedChatCompletionContext
from autogen_core.tools import Tool
from autogen_ext.models.openai import OpenAIChatCompletionClient


from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.document_loaders import SeleniumURLLoader
from ddgs import DDGS


In [None]:
## GEMINI MODEL CLIENT
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_core.models import ModelInfo


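# Confirm the required settings are present.
# Using .get() means a missing variable triggers the assertion message instead of a bare KeyError.
assert os.environ.get("GEMINI_API_KEY"), "GEMINI_API_KEY is not set"
assert os.environ.get("GEMINI_MODEL_NAME"), "GEMINI_MODEL_NAME is not set"
assert os.environ.get("GEMINI_BASE_URL"), "GEMINI_BASE_URL is not set"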


gemini_model_info = ModelInfo(
    vision=False,
    function_calling=True,
    json_output=True,
    family=os.environ["GEMINI_MODEL_NAME"],
    structured_output=True,
)

gemini_model_client = OpenAIChatCompletionClient(
    model=os.environ["GEMINI_MODEL_NAME"],
    api_key=os.environ["GEMINI_API_KEY"],
    base_url=os.environ["GEMINI_BASE_URL"],
    model_info=gemini_model_info,
    max_retries=2,
    parallel_tool_calls=False,
)


In [None]:
## AZURE OPENAI MODEL CLIENT
# from autogen_ext.models.openai import AzureOpenAIChatCompletionClient

## Confirm the API key is set
# assert os.environ["AZURE_OAI_DEPLOYMENT"], "AZURE_OAI_DEPLOYMENT is not set"
# assert os.environ["AZURE_OAI_MODEL_NAME"], "AZURE_OAI_MODEL_NAME is not set"
# assert os.environ["AZURE_OAI_MODEL_VERSION"], "AZURE_OAI_MODEL_VERSION is not set"
# assert os.environ["AZURE_OAI_BASE_URL"], "AZURE_OAI_BASE_URL is not set"

# az_oai_model_client = AzureOpenAIChatCompletionClient(
#     azure_deployment=os.environ["AZURE_OAI_DEPLOYMENT"],
#     model=os.environ["AZURE_OAI_MODEL_NAME"],
#     api_version=os.environ["AZURE_OAI_MODEL_VERSION"],
#     azure_endpoint=os.environ["AZURE_OAI_BASE_URL"],

# )


In [None]:
## AGENT TOOLS

# Wiki Search Tool
wiki_api = WikipediaAPIWrapper(
    top_k_results=WIKI_MAX_RESULTS, doc_content_chars_max=WIKI_MAX_CHARS
)
wiki_tool = WikipediaQueryRun(api_wrapper=wiki_api)


def wiki_full_search(input: str) -> str:
    """Search Wikipedia for a query and return maximum 2 results."""
    logger.info(f"🔍 wiki_full_search: Starting search for '{input}'")

    try:
        search_docs = WikipediaLoader(
            query=input, load_max_docs=WIKIPEDIA_MAX_DOCS
        ).load()

        if not search_docs:
            logger.warning(f"⚠️ wiki_full_search: No documents found for '{input}'")
            return f"No Wikipedia articles found for query: {input}"

        logger.info(
            f"✅ wiki_full_search: Found {len(search_docs)} documents for '{input}'"
        )

        formatted_search_docs = "\n\n---\n\n".join(
            [
                f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
                for doc in search_docs
            ]
        )

        content_length = len(formatted_search_docs)
        logger.info(
            f"📊 wiki_full_search: Returning {content_length} characters for '{input}'"
        )

        return formatted_search_docs

    except Exception as e:
        logger.error(f"❌ wiki_full_search failed for '{input}': {str(e)}")
        return f"Error searching Wikipedia for '{input}': {str(e)}"


def wiki_search(q: str) -> dict:
    """Return structured output including text and source."""
    logger.info(f"🔍 wiki_search: Starting search for '{q}'")

    try:
        result = wiki_tool.run(q)

        if not result or result.strip() == "":
            logger.warning(f"⚠️ wiki_search: Empty result for '{q}'")
            return {"text": "No results found", "source": "Wikipedia", "query": q}

        result_length = len(result)
        logger.info(f"✅ wiki_search: Retrieved {result_length} characters for '{q}'")

        return {"text": result, "source": "Wikipedia", "query": q}

    except Exception as e:
        logger.error(f"❌ wiki_search failed for '{q}': {str(e)}")
        return {"text": f"Error: {str(e)}", "source": None, "query": q}


# Web Search Tool
def web_search(q: str) -> dict:
    """Search the web using DuckDuckGo for information."""
    logger.info(f"🌐 web_search: Starting web search for '{q}'")

    try:
        results = DDGS().text(
            q, region="us-en", safesearch="off", max_results=DDGS_MAX_RESULTS
        )

        if not results:
            logger.warning(f"⚠️ web_search: No results found for '{q}'")
            return {
                "text": [],
                "source": "DuckDuckGo",
                "query": q,
                "results_found": 0,
            }

        logger.info(f"✅ web_search: Found {len(results)} results for '{q}'")

        # Log sample of top results for debugging
        if results:
            top_titles = [
                r.get("title", "No title")[:TITLE_SLICE_LENGTH]
                for r in results[:TOP_RESULTS_COUNT]
            ]
            logger.debug(f"📋 web_search: Top results for '{q}': {top_titles}")

        return {
            "text": results,
            "source": "DuckDuckGo",
            "query": q,
            "results_found": len(results),
        }

    except Exception as e:
        logger.error(f"❌ web_search failed for '{q}': {str(e)}")
        return {
            "text": f"Error: {str(e)}",
            "source": None,
            "query": q,
            "results_found": 0,
        }


def web_fetch(
    url: str, max_content_length: int = WEB_CONTENT_MAX_LENGTH
) -> Dict[str, Any]:
    """
    Fetch web page content using Selenium for JavaScript-heavy sites.

    Args:
        url: The URL to fetch content from
        max_content_length: Maximum content length to return. Defaults to WEB_CONTENT_MAX_LENGTH

    Returns:
        Dict with keys: content, url, status, error (if any)
    """
    logger.info(
        f"🌐 web_fetch: Starting fetch for {url} (max_length: {max_content_length})"
    )

    loader = None

    try:
        # Validate URL
        if not url or not url.startswith(("http://", "https://")):
            return {
                "content": "",
                "url": url,
                "status": "error",
                "error": "Invalid URL format",
            }

        # Configure Selenium loader
        loader = SeleniumURLLoader(
            urls=[url],
            continue_on_failure=True,
            arguments=_get_selenium_arguments(),
            browser="chrome",
        )

        # Load content
        logger.info(f"📥 web_fetch: Loading content from {url}")
        documents = loader.load()

        if not documents:
            logger.warning(f"⚠️ web_fetch: No documents loaded from {url}")
            return {
                "content": "",
                "url": url,
                "status": "error",
                "error": "No content could be loaded from URL",
            }

        # Process content
        content = documents[0].page_content.strip()
        original_length = len(content)

        logger.debug(f"📊 web_fetch: Loaded {original_length} characters from {url}")

        if len(content) < WEB_CONTENT_MIN_LENGTH:
            logger.warning(
                f"⚠️ web_fetch: Content too short ({len(content)} chars) from {url}"
            )
            return {
                "content": content,
                "url": url,
                "status": "warning",
                "error": "Content appears too short, may indicate loading issues",
            }

        # Truncate if too long
        if len(content) > max_content_length:
            content = content[:max_content_length] + "\n\n[Content truncated...]"
            logger.info(
                f"✂️ web_fetch: Content truncated from {original_length} to {max_content_length} chars for {url}"
            )

        logger.info(
            f"✅ web_fetch: Successfully fetched {len(content)} characters from {url}"
        )

        return {"content": content, "url": url, "status": "success", "error": None}

    except Exception as e:
        logger.error(f"❌ web_fetch failed for {url}: {str(e)}")
        return {
            "content": "",
            "url": url,
            "status": "error",
            "error": f"Failed to fetch content: {str(e)}",
        }

    finally:
        # CRITICAL: Clean up browser resources
        if loader and hasattr(loader, "web_driver") and loader.web_driver:
            try:
                loader.web_driver.quit()
                logger.debug(f"🧹 web_fetch: Browser cleaned up for {url}")
            except Exception as cleanup_error:
                logger.warning(f"⚠️ web_fetch: Browser cleanup failed: {cleanup_error}")


def _get_selenium_arguments() -> List[str]:
    """Get Selenium browser arguments tuned for reliability and stealth.

    Returns:
        List of browser arguments
    """
    return [
        # Core stability
        "--headless",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        f"--window-size={SELENIUM_WINDOW_SIZE}",
        # Performance (JavaScript stays enabled so JS-rendered pages can load)
        "--disable-extensions",
        "--disable-plugins",
        "--disable-images",
        # Stealth and compatibility
        "--disable-blink-features=AutomationControlled",
        f"--user-agent={BROWSER_USER_AGENT}",
        # Suppress browser notifications, info bars, and default apps
        "--disable-notifications",
        "--disable-infobars",
        "--disable-default-apps",
        # Security bypasses (use cautiously)
        "--ignore-certificate-errors",
        "--ignore-ssl-errors",
        "--allow-running-insecure-content",
    ]


def web_fetch_multiple(
    urls: List[str], max_content_length: int = WEB_CONTENT_MAX_LENGTH
) -> Dict[str, Any]:
    """
    Fetch content from multiple URLs efficiently.

    Args:
        urls: List of URLs to fetch
        max_content_length: Maximum content length per URL

    Returns:
        Dict with results for each URL and summary statistics
    """
    logger.info(f"🌐 web_fetch_multiple: Starting batch fetch for {len(urls)} URLs")

    if not urls or len(urls) > WEB_BATCH_MAX_URLS:
        logger.warning(
            f"⚠️ web_fetch_multiple: Invalid URL list - {len(urls) if urls else 0} URLs (max {WEB_BATCH_MAX_URLS})"
        )
        return {
            "results": [],
            "status": "error",
            "error": f"Invalid URL list (empty or too many URLs, max {WEB_BATCH_MAX_URLS})",
        }

    results = []
    success_count = 0

    logger.debug(
        f"📋 web_fetch_multiple: Processing URLs: {[f'{url[:TITLE_SLICE_LENGTH]}...' if len(url) > TITLE_SLICE_LENGTH else url for url in urls]}"
    )

    for i, url in enumerate(urls, 1):
        logger.debug(f"🔄 web_fetch_multiple: Processing URL {i}/{len(urls)}: {url}")

        result = web_fetch(url, max_content_length)
        results.append(result)

        if result["status"] == "success":
            success_count += 1
            logger.debug(f"✅ web_fetch_multiple: URL {i}/{len(urls)} successful")
        else:
            logger.debug(
                f"❌ web_fetch_multiple: URL {i}/{len(urls)} failed: {result.get('error', 'Unknown error')}"
            )

        # Brief delay to be respectful
        time.sleep(URL_FETCH_DELAY)

    logger.info(
        f"🏁 web_fetch_multiple: Completed batch - {success_count}/{len(urls)} successful"
    )

    return {
        "results": results,
        "total_urls": len(urls),
        "successful": success_count,
        "failed": len(urls) - success_count,
        "status": "completed",
    }


def save_report(
    content: str, task_description: str, reports_dir: str = "reports"
) -> Dict[str, Any]:
    """
    Save timestamped Markdown report to disk with auto-generated filename.

    Args:
        content: Report content (plain text or Markdown). Auto-adds title if missing.
        task_description: Brief task description for filename generation.
        reports_dir: Output directory (default: "reports"). Created if missing.

    Returns:
        Dict with key status ("success"/"error"), plus filepath, filename,
        and timestamp on success, or error and task on failure

    Examples:
        save_report("# Analysis\n\nFindings...", "market research 2024")
        # → reports/20250821_1430_market_research_2024.md

        save_report(draft_text, "AI impact assessment")
        # → reports/20250821_1431_impact_assessment.md

    Filename: YYYYMMDD_HHMM_key_words.md (auto-numbered if exists)
    """
    try:
        # Ensure reports directory exists
        reports_path = Path(reports_dir)
        reports_path.mkdir(parents=True, exist_ok=True)

        # Generate timestamped filename
        timestamp = datetime.now()
        date_time = timestamp.strftime("%Y%m%d_%H%M")
        task_name = _extract_task_name(task_description)

        filename = f"{date_time}_{task_name}.md"
        filepath = reports_path / filename

        # Handle filename conflicts with counter
        counter = 1
        while filepath.exists():
            filename = f"{date_time}_{task_name}_{counter}.md"
            filepath = reports_path / filename
            counter += 1

        # Format content with title if needed
        formatted_content = _format_content(content, task_description, timestamp)

        # Save to disk using Path object
        filepath.write_text(formatted_content, encoding="utf-8")
        logger.info(f"📄 Report saved: {filepath}")

        return {
            "status": "success",
            "filepath": str(filepath),
            "filename": filename,
            "timestamp": timestamp.isoformat(),
        }

    except Exception as e:
        error_msg = f"Failed to save report: {str(e)}"
        logger.error(f"❌ {error_msg}")
        return {"status": "error", "error": error_msg, "task": task_description}


def _extract_task_name(task: str) -> str:
    """
    Extract 2-3 meaningful words from task description for filename.

    Args:
        task: The task description string

    Returns:
        Underscore-separated words suitable for filename
    """
    # Clean special characters and normalize
    clean_task = re.sub(r"[^\w\s]", " ", task.lower())
    words = [w for w in clean_task.split() if len(w) > 2]

    # Filter common stop words
    stop_words = {
        "the",
        "and",
        "for",
        "with",
        "from",
        "about",
        "into",
        "through",
        "during",
        "before",
        "after",
        "above",
        "below",
        "over",
        "under",
    }
    meaningful = [w for w in words if w not in stop_words][:TASK_MAX_MEANINGFUL_WORDS]

    return "_".join(meaningful) if meaningful else "report"


def _format_content(content: str, task: str, timestamp: datetime) -> str:
    """
    Format content with title and timestamp if needed.

    Args:
        content: Raw content to format
        task: Task description for title generation
        timestamp: When the report was created

    Returns:
        Formatted Markdown content
    """
    # If content already has a Markdown title, use as-is
    if content.strip().startswith("#"):
        return content

    # Add title and timestamp for plain text content
    formatted_title = f"# Report: {task}"
    timestamp_line = f"*Generated: {timestamp.strftime('%Y-%m-%d %H:%M')}*"

    return f"{formatted_title}\n\n{timestamp_line}\n\n{content}"
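
The filename scheme is easier to follow with concrete inputs. The block below is a condensed re-creation written for illustration, not the notebook's own code: `task_slug` and `report_filename` are hypothetical names, and the collision counter is left out.

```python
import re
from datetime import datetime

STOP_WORDS = {
    "the", "and", "for", "with", "from", "about", "into", "through",
    "during", "before", "after", "above", "below", "over", "under",
}


def task_slug(task: str, max_words: int = 3) -> str:
    """Keep the first few words longer than two characters that are not stop words."""
    cleaned = re.sub(r"[^\w\s]", " ", task.lower())
    words = [w for w in cleaned.split() if len(w) > 2 and w not in STOP_WORDS]
    return "_".join(words[:max_words]) or "report"


def report_filename(task: str, when: datetime) -> str:
    """Build the YYYYMMDD_HHMM_slug.md name (without the collision counter)."""
    return f"{when.strftime('%Y%m%d_%H%M')}_{task_slug(task)}.md"


when = datetime(2025, 8, 21, 14, 30)
print(report_filename("market research 2024", when))
print(report_filename("AI impact assessment", when))
print(report_filename("??", when))
```

Words of two characters or fewer are dropped, so "AI" does not survive into the filename, and a description with no usable words falls back to `report`.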


In [None]:
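## REGISTER FUNCTIONS AS TOOLS
# FunctionTool is the concrete function wrapper in autogen_core.tools;
# each tool description is taken from the first line of the function's docstring.
from autogen_core.tools import FunctionTool


def _as_tool(func) -> FunctionTool:
    summary = (func.__doc__ or func.__name__).strip().splitlines()[0]
    return FunctionTool(func, name=func.__name__, description=summary)


wiki_search_tool = _as_tool(wiki_search)
web_search_tool = _as_tool(web_search)
web_fetch_tool = _as_tool(web_fetch)
web_fetch_multiple_tool = _as_tool(web_fetch_multiple)
save_report_tool = _as_tool(save_report)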
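
Registering a function as a tool relies on its type hints and docstring, because the model only sees a description of the function and never its body. The sketch below shows the idea using only the standard library. It is not AutoGen's schema generation; `describe_function` and `example_search` are hypothetical names.

```python
import inspect
from typing import get_type_hints


def describe_function(func) -> dict:
    """Summarize a function's name, docstring, and parameters."""
    hints = get_type_hints(func)
    params = {}
    for name, param in inspect.signature(func).parameters.items():
        hint = hints.get(name)
        params[name] = {
            # Fall back to str() for hints without a __name__ (or no hint at all).
            "type": getattr(hint, "__name__", str(hint)),
            "required": param.default is inspect.Parameter.empty,
        }
    return {
        "name": func.__name__,
        "description": inspect.getdoc(func) or "",
        "parameters": params,
    }


def example_search(q: str, max_results: int = 5) -> dict:
    """Hypothetical tool: search for q and return up to max_results hits."""
    return {"query": q, "results": []}


print(describe_function(example_search))
```

A function with missing annotations or an empty docstring would produce a much less informative description, which is why the tool functions above are fully typed and documented.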


In [None]:
## SYSTEM PROMPTS

### References
# https://docs.anthropic.com/en/docs/build-with-claude/prompt-engineering/claude-4-best-practices
# https://techcommunity.microsoft.com/blog/azure-ai-foundry-blog/prompt-engineering-for-openai%E2%80%99s-o1-and-o3-mini-reasoning-models/4374010

today_str = datetime.now().strftime("%Y-%m-%d")

PLANNER_SYSTEM_PROMPT = f"""You are a Strategic Research Planner. Define research scope and success criteria. Today is {today_str}.

ROLE FOCUS:
- Define WHAT needs to be researched (scope, objectives, key questions)
- Set measurable success criteria
- Identify critical knowledge gaps to fill
- DO NOT specify search strategies or tools (Researcher's domain)
- You ONLY plan and delegate tasks - you do not execute them yourself.
- Your TEAM MEMBERS are:
    - Researcher: Gathers evidence and conducts analysis
    - Critic: Evaluates plans and outputs
    - Editor: Polishes final reports

WORKFLOW:
1. Analyze user query: Extract core research questions
2. Define research objectives with measurable outcomes
3. Set success criteria that directly address user needs
4. Request Critic review: "Critic, evaluate this plan against the user query"
5. Declare "PLAN_COMPLETE" only after Critic approval

REPLANNING (when research fails):
1. Analyze what failed and why
2. Pivot strategy with new approach
3. Request Critic review of revised plan
4. Declare "REVISED_PLAN_COMPLETE" after approval

OUTPUT FORMAT:
## Research Objectives
[List 3-5 specific objectives]

## Key Research Questions
[Questions that directly address user query]

## Success Criteria
[Measurable outcomes that satisfy user requirements]

## Critical Knowledge Gaps
[What must be discovered to answer user query]

Today is {today_str}. Focus on research scope, not execution methodology."""

RESEARCHER_SYSTEM_PROMPT = f"""You are a Senior Research Analyst. Gather comprehensive evidence using strategic tool selection. Today is {today_str}; use the date for context when needed.

TOOLS:
- wiki_search: Background, definitions, historical context
- web_search: Current events, market data, recent reports
- web_fetch: Deep content extraction from specific URLs (handles JS-heavy sites)
- web_fetch_multiple: Batch fetch multiple URLs efficiently

EXECUTION WORKFLOW:
0. Start only after the Critic has approved the plan.
1. Foundation research (wiki_search)
2. Current intelligence (web_search, web_fetch, web_fetch_multiple)
3. Cross-validation and synthesis

COMPLETION SIGNALS:
- When research is sufficient, end with: "## RESEARCH COMPLETE"
- For incomplete research, end with: "## CONTINUING RESEARCH" 
- Never hand off raw tool outputs - always synthesize findings first

OUTPUT FORMAT:
# Evidence Summary: [Topic]
## Key Findings
[Synthesized insights from multiple sources]
## Current Intelligence  
[Recent developments and data]
## Source Assessment
[Quality and reliability of sources used]
## Research Gaps
[Areas needing further investigation]

## RESEARCH COMPLETE
[Only when comprehensive evidence gathered]

Focus on quality, not presentation. Hand off to Editor for final reports.
"""

CRITIC_SYSTEM_PROMPT = """You are a Domain-agnostic Critic. Provide concise, evidence-based critique. Adapt evaluation based on artifact type.

FORMAT:
1. ARTIFACT: Type and goal (1 line)
2. STRENGTHS AND CONTEXT CHECK: Does this address the goal? Up to 3 key strengths.
3. ISSUES: Highlight unclear or weak sections. Challenge assumptions. Label issues Critical/Major/Minor
4. VERACITY: Source verification requests. If sources could not be found, mark as unverified and opinion-based.
5. VERDICT: Accept/Revise/Reject + confidence level.
6. NEXT ACTIONS: [What needs to happen]

Respond "APPROVED" only when artifact fully serves user needs. Max 500 words total."""

EDITOR_SYSTEM_PROMPT = """You are an Editorial Agent. Transform evidence into polished reports. Adapt structure to content type. Focus on actionable insights with proper citations.

STANDARDS:
- Synthesis over summary
- Quantify impact with metrics
- Professional structure and tone
- Proper citations [1], [2]
- Use save_report tool for final output

CITATION FORMAT:
- Use reference IDs from researcher's URL references
- Format: "Key finding from recent study [1]"
- References section with each reference on a separate line. Format: [1] Source (URL)

TERMINATION: After save: "REPORT_SAVED. TASK_COMPLETED. TERMINATE"
"""

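The Editor prompt asks for a references section with one `[n] Source (URL)` entry per line. A minimal sketch of that layout follows; `format_references` is a hypothetical helper and the sources are placeholders, not real citations.

```python
def format_references(sources):
    """Render (title, url) pairs as numbered reference lines."""
    return "\n".join(
        f"[{i}] {title} ({url})" for i, (title, url) in enumerate(sources, start=1)
    )


# Hypothetical sources, used only to show the layout.
sources = [
    ("Example Steel Report", "https://example.com/steel"),
    ("Example Tariff Brief", "https://example.org/tariffs"),
]
print(format_references(sources))
```

Numbering starts at 1 so that each line matches the inline markers `[1]`, `[2]` used in the report body.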

In [None]:
## AGENT SETUP


planner = AssistantAgent(
    name="Planner",
    model_client=gemini_model_client,
    system_message=PLANNER_SYSTEM_PROMPT,
    description="Creates and adapts research plans, handles replanning when research hits obstacles",
)

researcher = AssistantAgent(
    name="Researcher",
    description="Expert research agent that strategically uses multiple tools to gather comprehensive and factual evidence to produce well-researched draft reports.",
    model_client=gemini_model_client,
    tools=[wiki_search_tool, web_search_tool, web_fetch_tool, web_fetch_multiple_tool],
    reflect_on_tool_use=False,
    system_message=RESEARCHER_SYSTEM_PROMPT,
)

critic = AssistantAgent(
    name="Critic",
    model_client=gemini_model_client,
    system_message=CRITIC_SYSTEM_PROMPT,
    description="Reviews and provides constructive criticism; outputs 'APPROVED: [...]' when ready.",
)


editor = AssistantAgent(
    name="Editor",
    description="Formats approved drafts with proper citations, adapts structure to content type.",
    model_client=gemini_model_client,
    tools=[save_report_tool],
    system_message=EDITOR_SYSTEM_PROMPT,
)

# ---------- Selector prompt (placeholders: roles, history, participants) ----------
selector_prompt = """Select an agent to perform task.
AGENT ROLES: 
{roles}

CONVERSATION HISTORY: 
{history}

RULES:
- Return the selected agent's name (no extra text).

Read the above conversation, then select an agent from {participants} to perform the next task.
When the task is complete, let the user approve or disapprove the task.
"""


# Team configuration with constants
max_messages = TASK_TERMINATION_MAX_MESSAGES
txt_termination = TextMentionTermination("TERMINATE")
termination_condition = (
    MaxMessageTermination(max_messages=max_messages) | txt_termination
)

# Build SelectorGroupChat

model_context = BufferedChatCompletionContext(buffer_size=CONVERSATION_BUFFER_SIZE)

team = SelectorGroupChat(
    name="Deep Research Team",
    description="A team of specialized agents working together to conduct deep research.",
    model_context=model_context,
    participants=[planner, researcher, critic, editor],
    model_client=gemini_model_client,
    selector_prompt=selector_prompt,
    termination_condition=termination_condition,
    emit_team_events=True,
)
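
The selector prompt is a template whose `{roles}`, `{history}`, and `{participants}` placeholders are filled in before each selection. The sketch below renders a shortened copy of the template with made-up values to show roughly what the selector model receives. The values and their exact formatting are assumptions; the framework supplies the real ones at run time.

```python
# Shortened copy of the selector template, for illustration.
SELECTOR_TEMPLATE = """Select an agent to perform task.
AGENT ROLES:
{roles}

CONVERSATION HISTORY:
{history}

Read the above conversation, then select an agent from {participants} to perform the next task.
"""

# Hypothetical values standing in for what the framework would supply.
roles = "Planner: Creates and adapts research plans\nCritic: Reviews plans and outputs"
history = "user: Indian steel sector growth prospects"
participants = ["Planner", "Critic"]

rendered = SELECTOR_TEMPLATE.format(
    roles=roles, history=history, participants=participants
)
print(rendered)
```

Because the history is embedded in every selection call, a smaller conversation buffer also keeps the selector prompt shorter.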


In [None]:
## SETUP TASK RUN


# @tenacity.retry(
#     wait=tenacity.wait_exponential(multiplier=1, min=60, max=120),
#     stop=tenacity.stop_after_attempt(2),
#     retry=tenacity.retry_if_exception_type(Exception),
# )
async def run_task(task_text: str):
    """
    Execute a multi-agent research task with proper termination handling.

    Args:
        task_text (str): The research task description

    Returns:
        str | None: Text preceding the TERMINATE marker in the final message,
            or None if the task did not complete
    """
    final_report = None
    task_completed = False

    # Token tracking variables
    total_prompt_tokens = 0
    total_completion_tokens = 0
    total_tokens = 0

    try:
        logger.info(f"🚀 Starting task: {task_text}\n\n")

        async for event in team.run_stream(task=task_text):
            logger.info(f"📝 Event: {event}\n\n")

            # Track tokens if available
            if hasattr(event, "models_usage") and event.models_usage:
                current_prompt = event.models_usage.prompt_tokens
                current_completion = event.models_usage.completion_tokens
                current_total = current_prompt + current_completion

                # Update running totals
                total_prompt_tokens += current_prompt
                total_completion_tokens += current_completion
                total_tokens += current_total

            logger.info(
                f"\n📊 Token Usage: \n\tPrompts: {total_prompt_tokens} \n\tCompletions: {total_completion_tokens} \n\tTotal: {total_tokens}"
            )

            # Check for TERMINATE keyword in message content
            if hasattr(event, "content") and isinstance(event.content, str):
                if "TERMINATE" in event.content:
                    final_report = event.content.split("TERMINATE", 1)[0].strip()
                    task_completed = True
                    logger.info(f"\n\n✅ Task completed with TERMINATE signal")
                    logger.info(
                        f"📊 Final Token Usage - Total: {total_tokens} (prompt: {total_prompt_tokens}, completion: {total_completion_tokens})"
                    )
                    break

            if hasattr(event, "models_usage") and event.models_usage:
                # This event involved a model API call - apply rate limiting
                logger.info(
                    f"\n\n⏳ Model API call detected, waiting {API_CALL_DELAY_SECONDS} seconds.\n\n"
                )
                await asyncio.sleep(API_CALL_DELAY_SECONDS)
            else:
                # Non-API event (planning, internal processing) - minimal delay
                logger.debug(
                    f"\n\n💨 Non-API event, brief pause ({NON_API_EVENT_DELAY}s)\n\n"
                )
                await asyncio.sleep(NON_API_EVENT_DELAY)

        # Log final totals even if task didn't complete normally
        if not task_completed:
            logger.info(
                f"📊 Final Token Usage - Total: {total_tokens} (prompt: {total_prompt_tokens}, completion: {total_completion_tokens})"
            )

        return final_report if task_completed else None

    except Exception as e:
        logger.error(f"\n\n❌ Exception during task execution: {e}")
        logger.info(
            f"📊 Token Usage at Error - Total: {total_tokens} (prompt: {total_prompt_tokens}, completion: {total_completion_tokens})"
        )
        raise

    finally:
        await _save_team_state()


async def _save_team_state():
    """Helper function to save team state with error handling using Path objects."""
    try:
        state = await team.save_state()
        state_file = Path("team_state.json")

        with state_file.open("w") as f:
            json.dump(state, f, indent=2)
        logger.info("💾 Team state saved successfully")
    except Exception as e:
        logger.error(f"❌ Failed to save team state: {e}")


async def resume_from_saved_state():
    """Resume execution from previously saved team state using Path objects."""
    try:
        state_file = Path("team_state.json")

        with state_file.open("r") as f:
            saved_state = json.load(f)

        await team.load_state(saved_state)
        logger.info("🔄 Resuming from saved state")

        async for event in team.run_stream():
            logger.info(f"📝 Resume Event: {event}")

    except FileNotFoundError:
        logger.error("❌ No saved state file found")
    except Exception as e:
        logger.error(f"❌ Failed to resume from saved state: {e}")
        raise
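
The streaming loop does two things per event: it adds up token usage, and it looks for the `TERMINATE` marker. The sketch below reproduces that logic on stand-in events so it can be followed without a model call. `Usage`, `FakeEvent`, and `summarize_stream` are hypothetical names, and the rate-limiting sleeps are left out.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Usage:
    prompt_tokens: int
    completion_tokens: int


@dataclass
class FakeEvent:
    """Hypothetical stand-in for a streamed message."""
    content: str
    models_usage: Optional[Usage] = None


def summarize_stream(events):
    """Return (text before TERMINATE or None, prompt total, completion total)."""
    prompt_total = completion_total = 0
    for event in events:
        if event.models_usage:
            prompt_total += event.models_usage.prompt_tokens
            completion_total += event.models_usage.completion_tokens
        if "TERMINATE" in event.content:
            final = event.content.split("TERMINATE", 1)[0].strip()
            return final, prompt_total, completion_total
    return None, prompt_total, completion_total


events = [
    FakeEvent("Plan drafted", Usage(120, 40)),
    FakeEvent("tool call finished"),
    FakeEvent("REPORT_SAVED. TASK_COMPLETED. TERMINATE", Usage(300, 25)),
]
print(summarize_stream(events))
```

With the termination line written as in the Editor prompt, the text before the marker is only the acknowledgement `REPORT_SAVED. TASK_COMPLETED.`; the report itself is the file written by `save_report`.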


In [None]:
## RUN TASK
nest_asyncio.apply()

if __name__ == "__main__":

    __SAMPLE_QUERY_1 = "Indian steel sector growth post modernization and growth prospects in an era of US tariffs, reduced government protection through trade barriers, and cheaper import options from China"

    __SAMPLE_QUERY_2 = "How government and governance factors improved the economy and lives of Indians during the Modi and pre-Modi eras, starting from 1991"

    __SAMPLE_QUERY_3 = "Sectoral growth based on cyclics for 2025 amid macroeconomic pressure, trade tariffs, and uncertainty: which sectors are best poised for maximum investment returns in terms of % for the next year for a moderate to average risk profile?"

    __SAMPLE_QUERY_4 = "Hyperscaler investments in data centers and cloud infrastructure for AI growth are not matching the proposed productivity gains in GDP. Are we witnessing a bubble? Does the % of global GDP being invested in AI infrastructure match the productivity gain percentages?"

    __SAMPLE_QUERY_5 = "If neural networks are the foundation of LLMs and are based on the human brain, are LLMs given tools during training? Humans learn with tool usage. What are the current trends in tool usage in LLM training?"

    __SAMPLE_QUERY_6 = f"Today is {today_str}. Analyze stock price performance of Nvidia in the past month, compare it with top 3 listed POWER Producers in India."

    try:

        output = asyncio.run(run_task(__SAMPLE_QUERY_1))
        if output:
            print("\n" + "=" * 60)
            print("🎯 FINAL REPORT")
            print("=" * 60)
            print(output)
        else:
            print("❌ Task did not complete successfully.")
    except Exception as e:
        logger.error(f"❌ Fatal error: {e}")
        print(f"❌ Execution failed: {e}")
