<div align="center">

#  **Multi-Agent Financial Analysis System**
### 👥 **Team 4**

</div>

---

#### **Team Members:**  
**• Marwah Faraj**  
**• Patrick Woo-Sam**  
**• Atul Prasad**

---

### 🔗 **Project Resources**

- 🧩 **GitHub Repository:**  
  [https://github.com/marwahfaraj/Multi_Agent_Financial_Analysis_System](https://github.com/marwahfaraj/Multi_Agent_Financial_Analysis_System)

- 📓 **Project Notebook:**  
  [https://github.com/marwahfaraj/Multi_Agent_Financial_Analysis_System/blob/main/notebook.ipynb](https://github.com/marwahfaraj/Multi_Agent_Financial_Analysis_System/blob/main/notebook.ipynb)

---



### Project Overview

This notebook presents the implementation of a multi-agent, large language model (LLM)-driven financial analysis system that autonomously performs market research, earnings analysis, news sentiment extraction, and self-evaluation of analytical quality.

The system represents a practical application of AI agent collaboration, integrating reasoning, retrieval, and evaluation modules within a single orchestrated workflow.

### Technological Stack

| Category                       | Technology / Framework                           | Description                                                                          |
| ------------------------------ | ------------------------------------------------ | ------------------------------------------------------------------------------------ |
| **LLM Backbone**               | Google **Gemini 2.5-Flash** (via Agno Framework) | Core reasoning and generation engine for all agents                                  |
| **Agent & Workflow Framework** | **Agno Agent / Workflow API**                    | Defines modular agents and orchestrates their interaction                            |
| **Market Data Source**         | **yfinance API**                                 | Retrieves historical and real-time stock data (OHLCV, price, metadata)               |
| **News Retrieval Tool**        | **DuckDuckGoTools**                              | Gathers recent financial news and metadata for sentiment classification              |
| **Knowledge Persistence**      | **MemoryTools + SQLite**                         | Enables long-term contextual storage and retrieval of financial insights             |
| **Evaluation Subsystem**       | **Evaluator Agent (Gemini)**                     | Provides quantitative assessment of report quality (accuracy, clarity, completeness) |
| **Execution Environment**      | **Python (≥3.11)**, **Jupyter Notebook**         | Development and presentation environment                                             |
| **Configuration Management**   | **dotenv**                                       | Loads environment variables and API keys from a local `.env` file                    |


### Program Workflow

```
┌──────────────────────────────────────────────────────────────────────────────┐
│                          ANALYSIS WORKFLOW                                   │
└──────────────────────────────────────────────────────────────────────────────┘
[User Prompt]
      │
      ▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ Step 1: Preprocess Input                                                     │
│  Agent: preprocessing_agent (JSON-only)                                      │
│  Output: {ticker, action_item, data_types}                                   │
└──────────────────────────────────────────────────────────────────────────────┘
      │
      ▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ Step 2: Gather Data                                                          │
│  Exec: multi_agent_router_executor → multi_agent_routing(...)                │
│  Routes (by data_types):                                                     │
│   • Market Data Agent  → tools: fetch_quote, fetch_ohlcv (yfinance)          │
│   • News Agent         → DuckDuckGoTools                                     │
│   • Earnings Agent     → filings/ratios/guidance                             │
│  Output: aggregated agent responses                                          │
└──────────────────────────────────────────────────────────────────────────────┘
      │
      ▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ Step 3: Retrieve Stored Data                                                 │
│  Exec: stored_data_retriever_executor → Memory Agent (MemoryTools/SQLite)    │
│  Output: prior memories (if any) appended                                    │
└──────────────────────────────────────────────────────────────────────────────┘
      │
      ▼
┌──────────────────────────────────────────┬───────────────────────────────────┐
│ Parallel A: Synthesize Analysis           │ Parallel B: Store Data in Memory │
│  Agent: investment_research_agent        │  Agent: memory_agent             │
│  Output: research draft                  │  Output: “Noted/Stored”        │
└──────────────────────────────────────────┴───────────────────────────────────┘
                      │
                      ▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ Step 4: Select Synthesis Output                                              │
│  Exec: select_synthesis_executor (pick research draft from Parallel outputs) │
└──────────────────────────────────────────────────────────────────────────────┘
      │
      ▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ Step 5: Pre-Optimization Evaluation (Display)                                │
│  Exec: show_pre_eval_metrics_executor                                        │
│  Agent: evaluator_agent (UUID session)                                       │
│  Output: score table + ===PRE_EVAL_OVERALL=…=== marker                       │
└──────────────────────────────────────────────────────────────────────────────┘
      │
      ▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ Step 6: Gate Optimization (≤ 50% only)                                       │
│  Exec: gate_optimization_executor                                            │
│  Decision: overall ≤ 0.50 → OPTIMIZE  |  overall > 0.50 → SKIP marker        │
└──────────────────────────────────────────────────────────────────────────────┘
      │
      ├────────────── if SKIP ────────────────────────────────────────────┐
      │                                                                  │
      ▼                                                                  │
┌────────────────────────────────────────────────────────────────────────┘
│ Step 7: Evaluator–Optimizer Loop (max_iterations = 1)                        │
│  • optimizer_executor                                                        │
│     - If SKIP: evaluate once and annotate                                    │
│     - Else: rewrite via investment_research_agent using prior eval JSON      │
│     - Re-evaluate with evaluator_agent (UUID session)                        │
│  • evaluator_end_condition: stop if overall ≥ 0.85 AND no gaps               │
└──────────────────────────────────────────────────────────────────────────────┘
      │
      ▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ Step 8: Finalize & Print Optimized Report                                    │
│  Exec: finalize_print_executor (strip eval JSON; print clean report)         │
└──────────────────────────────────────────────────────────────────────────────┘

```
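
The two thresholds in Steps 6 and 7 can be sketched as plain functions. This is a minimal illustration under the assumption that the evaluator's `overall` score and its list of gaps have already been parsed; the names `gate_decision` and `loop_should_stop` are hypothetical and are not the notebook's executors.

```python
from typing import Sequence

GATE_THRESHOLD = 0.50   # Step 6: optimize only when overall <= 0.50
READY_THRESHOLD = 0.85  # Step 7: stop when overall >= 0.85 and no gaps remain


def gate_decision(overall: float) -> str:
    """Return "OPTIMIZE" for low-scoring drafts, otherwise "SKIP"."""
    return "OPTIMIZE" if overall <= GATE_THRESHOLD else "SKIP"


def loop_should_stop(overall: float, gaps: Sequence[str]) -> bool:
    """Mirror the loop end condition: a high score and an empty gap list."""
    return overall >= READY_THRESHOLD and len(gaps) == 0


print(gate_decision(0.42))                             # low score: rewrite the draft
print(gate_decision(0.73))                             # acceptable score: skip the rewrite
print(loop_should_stop(0.90, []))                      # high score, nothing missing
print(loop_should_stop(0.90, ["no peer comparison"]))  # high score, but a gap remains
```

Note that a draft scoring between 0.50 and 0.85 is neither rewritten nor marked ready under these two rules; it passes through with a single evaluation.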

In [1]:
from __future__ import annotations
import json
import yfinance as yf
import types
from agno.agent import Agent
from agno.db.sqlite import SqliteDb
from agno.models.google import Gemini
from agno.tools import tool
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.memory import MemoryTools
from agno.workflow import Step, Workflow, StepInput, StepOutput, Parallel, Loop
from dotenv import load_dotenv
from json import loads
from typing import Dict, Any, Optional, Tuple
import re 
import uuid

Load environment variables from a .env file

The following variables must be defined:
- GOOGLE_API_KEY
- NEWSAPI_KEY
- FRED_API_KEY
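
A quick way to confirm the variables are present before any agent runs is sketched below. It uses only the standard library; the helper name `missing_env_vars` is an illustrative assumption and not part of the notebook.

```python
import os
from typing import Iterable, List, Mapping

REQUIRED_VARS = ("GOOGLE_API_KEY", "NEWSAPI_KEY", "FRED_API_KEY")


def missing_env_vars(required: Iterable[str] = REQUIRED_VARS,
                     env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variable names that are unset or empty."""
    return [name for name in required if not env.get(name)]


missing = missing_env_vars()
if missing:
    print("Missing environment variables:", ", ".join(missing))
else:
    print("All required environment variables are set.")
```

Run the check after `load_dotenv()` so that values from the `.env` file are already in the process environment.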

In [2]:
#Load environment variables from a local .env file into the process environment.
load_dotenv()

#Set the default LLM model identifier for all agents in this notebook.
DEFAULT_MODEL_ID = "gemini-2.5-flash"

#Instantiate the Agno Gemini model wrapper with the chosen model ID.
DEFAULT_MODEL = Gemini(id=DEFAULT_MODEL_ID)

#Common configuration passed to all agents unless explicitly overridden.
DEFAULT_AGENT_KWARGS = {
    #Use the shared Gemini model for coherent behavior and easy swapping.
    "model": DEFAULT_MODEL,

    #Enable exponential backoff between retries to be considerate to provider rate limits
    #and transient failures (network hiccups, 429s).
    "exponential_backoff": True,

    #Number of times an agent will automatically retry a failed call.
    "retries": 5,

    #Initial delay (in seconds) before the first retry; subsequent retries back off exponentially.
    "delay_between_retries": 5,
}
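
To get a feel for how long a fully failing call could wait with these settings, the sketch below lists the delays under the assumption that each retry doubles the previous delay. The doubling factor and the helper `backoff_schedule` are assumptions for illustration; Agno's internal schedule may differ.

```python
from typing import List


def backoff_schedule(initial_delay: float, retries: int, factor: float = 2.0) -> List[float]:
    """Delay (in seconds) before each retry, growing geometrically."""
    return [initial_delay * factor ** attempt for attempt in range(retries)]


schedule = backoff_schedule(initial_delay=5, retries=5)
print(schedule)       # [5.0, 10.0, 20.0, 40.0, 80.0]
print(sum(schedule))  # 155.0 seconds of waiting in the worst case
```

Under that assumption, a request that fails on every attempt adds roughly two and a half minutes of waiting, which is worth keeping in mind when several agents run in sequence.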


## Define Agents

### Earnings Agent

In [3]:
#Instantiate an agent specialized in analyzing corporate earnings and financial filings.
earnings_agent = Agent(
    name="Earnings Agent",
    #System instructions that define the agent’s scope of work and output expectations.
    instructions=[
        "You are an earnings analysis agent for financial analysis.",
        "The user will provide a company name or stock symbol. Retrieve and summarize key insights from "
        "financial filings and earnings reports (such as SEC EDGAR) for the provided company or symbol.",
        "Focus on analyzing:",
        "1. Revenue and earnings growth trends",
        "2. Key financial ratios (P/E, debt-to-equity, ROE, etc.)",
        "3. Cash flow analysis and liquidity position",
        "4. Segment performance and business unit analysis",
        "5. Management guidance and forward-looking statements",
        "6. Risk factors and regulatory compliance issues",
        "7. Recent acquisitions, divestitures, or strategic initiatives",
        "Present your findings in a structured format with clear insights and implications for investment decisions.",
        "Highlight both positive and negative trends with supporting data from the filings.",
    ],
    #Lightweight persistence for this agent’s conversations and tool outputs.
    #Storing context in a per-agent SQLite DB enables traceability, reproducibility,
    #and the ability to resume or audit analyses.
    db=SqliteDb(db_file="earnings_agent.db"),
    #Inject the current date/time into the context.
    #Filings and guidance are time-sensitive; including timestamps helps 
    #the model reason about recency (e.g., “most recent quarter”).
    add_datetime_to_context=True,
    #Ask Agno to render the agent’s output as Markdown by default.
    markdown=True,
    #Inherit common reliability settings and default model configuration
    **DEFAULT_AGENT_KWARGS
)
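
The ratios named in the instructions can be written down as simple formulas. The sketch below is only a reference for what the agent is asked to report; the function names are hypothetical, the input values are placeholders rather than real company data, and definitions vary in practice (for example, some sources use total liabilities instead of total debt).

```python
from typing import Optional


def pe_ratio(price_per_share: float, earnings_per_share: float) -> Optional[float]:
    """Price-to-earnings; undefined when EPS is zero or negative."""
    if earnings_per_share <= 0:
        return None
    return price_per_share / earnings_per_share


def debt_to_equity(total_debt: float, shareholders_equity: float) -> Optional[float]:
    """Total debt divided by shareholders' equity."""
    if shareholders_equity <= 0:
        return None
    return total_debt / shareholders_equity


def return_on_equity(net_income: float, shareholders_equity: float) -> Optional[float]:
    """Net income divided by shareholders' equity."""
    if shareholders_equity <= 0:
        return None
    return net_income / shareholders_equity


# Placeholder inputs chosen for easy arithmetic, not taken from any filing.
print(pe_ratio(50.0, 2.5))               # 20.0
print(debt_to_equity(400.0, 800.0))      # 0.5
print(return_on_equity(120.0, 800.0))    # 0.15
```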

### Market Data Agent

In [4]:
#Tool that agents can call to fetch the latest quote for a stock symbol.
@tool(name="fetch_quote", description="Get latest stock price and metadata for a symbol.")
def fetch_quote(symbol: str) -> str:
    """Fetch the latest quote for a stock symbol."""
    try:
        #Initialize a yfinance Ticker object for the requested symbol.
        t = yf.Ticker(symbol)
        #Prefer fast_info when available (lighter-weight, quick metadata access).
        info = getattr(t, "fast_info", {}) or {}
        #Read the most recent price directly.
        price = info.get("last_price")

        #Fallback: if fast_info doesn't include a last price, pull recent daily history
        # and take the last close. 'auto_adjust=True' adjusts for splits/dividends.
        if price is None:
            hist = t.history(period="5d", interval="1d", auto_adjust=True)
            if not hist.empty:
                price = float(hist["Close"].iloc[-1])

        result = {
            "symbol": symbol.upper(),
            "last_price": None if price is None else float(price),
            "currency": info.get("currency"),
            "exchange": info.get("exchange"),
        }
        return json.dumps(result)
    except Exception as e:
        return json.dumps({"symbol": symbol, "error": str(e)})

#Define a tool that returns a small, recent OHLCV(Open, High, Low, Close, Volume) sample for a symbol.
@tool(name="fetch_ohlcv", description="Get last 5 OHLCV rows for a symbol.")
def fetch_ohlcv(symbol: str, period: str = "1y", interval: str = "1d") -> str:
    """Fetch OHLCV(Open, High, Low, Close, Volume) history for a stock symbol."""
    try:
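        df = yf.download(symbol, period=period, interval=interval, 
            progress=False, auto_adjust=True, threads=False)
        if df.empty:
            return json.dumps({"symbol": symbol.upper(), "rows": 0, "tail5": []})
        #Some yfinance versions return MultiIndex columns (field, ticker) even for a
        #single symbol; keep only the field level so r["Open"] etc. are scalars.
        if df.columns.nlevels > 1:
            df.columns = df.columns.get_level_values(0)
        tail = df.tail(5).reset_index()
        #The index is named "Date" for daily bars and "Datetime" for intraday bars;
        #normalize it so the lookup below works for any interval.
        tail = tail.rename(columns={tail.columns[0]: "Date"})
        tail["Date"] = tail["Date"].astype(str)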
        result = {
            "symbol": symbol.upper(),
            "rows": int(len(df)),
            "period": period,
            "interval": interval,
            "tail5": [
                {
                    "Date": r["Date"],
                    "Open": float(r["Open"]),
                    "High": float(r["High"]),
                    "Low": float(r["Low"]),
                    "Close": float(r["Close"]),
                    "Adj Close": float(r.get("Adj Close", r["Close"])),
                    "Volume": int(r["Volume"]),
                }
                for _, r in tail.iterrows()
            ],
        }
        return json.dumps(result)
    except Exception as e:
        return json.dumps({"symbol": symbol, "error": str(e)})

#Collect the defined tools in a list for convenient injection into an agent.
TOOLS = [fetch_quote, fetch_ohlcv]

#Create a dedicated agent that knows *when* to call the above tools and
#how to summarize their outputs for the user.
market_data_agent = Agent(
    name="Market Data Agent",
    instructions=[
        "You are a financial market data agent.",
        "When the user provides a stock symbol or company name, call fetch_quote or fetch_ohlcv "
        "to fetch and summarize market data.",
        "Be clear about symbol, time range, and assumptions.",
    ],
    #Inject tools, so the agent can invoke them during reasoning.
    tools=TOOLS,
    db=SqliteDb(db_file="market_data_agent.db"),
    add_datetime_to_context=True,
    markdown=True,
    **DEFAULT_AGENT_KWARGS
)
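
Both tools return JSON strings, so any downstream consumer has to parse them and handle the `error` case. The sketch below assumes the payload shape produced by `fetch_ohlcv` above; the helper `close_change_pct` and the sample values are illustrative placeholders, not real market data.

```python
import json
from typing import Optional


def close_change_pct(payload: str) -> Optional[float]:
    """Percent change in Close across the rows in 'tail5', or None if unavailable."""
    data = json.loads(payload)
    rows = data.get("tail5") or []
    if "error" in data or len(rows) < 2:
        return None
    first, last = rows[0]["Close"], rows[-1]["Close"]
    if first == 0:
        return None
    return (last - first) * 100.0 / first


# Synthetic payloads in the same shape as the tool output.
sample = json.dumps({
    "symbol": "XYZ",
    "rows": 2,
    "tail5": [
        {"Date": "2025-01-01", "Close": 100.0},
        {"Date": "2025-01-02", "Close": 110.0},
    ],
})
failed = json.dumps({"symbol": "XYZ", "error": "network unavailable"})

print(close_change_pct(sample))  # 10.0
print(close_change_pct(failed))  # None
```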

### News Agent

In [5]:
#Instantiate an agent dedicated to discovering and summarizing company-specific news.
news_agent = Agent(
    name="News Agent",
    instructions=[
        "You are a financial analysis assistant.",
        "The user will provide the name of a company or its stock ticker symbol. "
        "Please identify the most recent and relevant news articles for financial analysis.",
        "First search for recent news articles about the company or stock ticker symbol provided by the user.",
        "Then analyze the news articles and classify them as positive, negative, or "
        "neutral in terms of their potential impact on the company's stock price.",
        "Create a list item for each news article, including these columns: 'Title', "
        "'Source', 'URL', 'Date', 'Sentiment', and 'Summary'.",
        """Example:
        - Title: Example News Article
            - Source: Example News Source
            - URL: https://example.com/news/article
            - Date: 2023-01-01
            - Sentiment: Positive
            - Summary: This is an example summary of the news article.
        - Title: Another News Article
            - Source: Another News Source
            - URL: https://example.com/news/another-article
            - Date: 2023-01-02
            - Sentiment: Negative
            - Summary: This is another example summary of a different news article.
        """,
        "Respond with only the list of news articles in markdown format.",
    ],
    db=SqliteDb(db_file="news_agent.db"),
    #Use web search tool for news. DuckDuckGoTools avoids API keys, 
    #and a small fixed_max_results keeps token and
    #cognitive load low while surfacing top, relevant items.
    tools=[DuckDuckGoTools(fixed_max_results=5)],
    #Limit the number of tool invocations the agent can make per request.
    tool_call_limit=5,
    #Include prior messages in the model context.
    add_history_to_context=True,
    #Provide the current date/time to the model.
    add_datetime_to_context=True,
    markdown=True,
    **DEFAULT_AGENT_KWARGS
)
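
Because the agent is asked to reply in a fixed list layout, its output can be turned into records and tallied by sentiment. The parser below is a rough sketch that assumes the model follows the example layout exactly (plain `- Field: value` lines); the function names are hypothetical, and real responses may need a more forgiving parser, for instance if field names come back in bold.

```python
import re
from collections import Counter
from typing import Dict, List

FIELD_RE = re.compile(r"^\s*-\s*(Title|Source|URL|Date|Sentiment|Summary):\s*(.*?)\s*$")


def parse_news_list(markdown: str) -> List[Dict[str, str]]:
    """Group '- Field: value' lines into one dict per article (a Title starts a new one)."""
    articles: List[Dict[str, str]] = []
    for line in markdown.splitlines():
        match = FIELD_RE.match(line)
        if not match:
            continue
        key, value = match.groups()
        if key == "Title":
            articles.append({})
        if articles:
            articles[-1][key] = value
    return articles


def sentiment_counts(articles: List[Dict[str, str]]) -> Counter:
    """Count articles per sentiment label, case-insensitively."""
    return Counter(a.get("Sentiment", "unknown").lower() for a in articles)


example = """
- Title: Example News Article
    - Source: Example News Source
    - URL: https://example.com/news/article
    - Date: 2023-01-01
    - Sentiment: Positive
    - Summary: This is an example summary of the news article.
- Title: Another News Article
    - Source: Another News Source
    - URL: https://example.com/news/another-article
    - Date: 2023-01-02
    - Sentiment: Negative
    - Summary: This is another example summary of a different news article.
"""

parsed = parse_news_list(example)
print(len(parsed))               # 2
print(sentiment_counts(parsed))
```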

### Memory Agent

In [6]:
memory_instructions = """Capture any memory that can be used for financial analysis. Examples include:
- news articles
- stock performance data
- market trends
- analyst opinions
Tag or categorize the information with relevant keywords for easy retrieval later.
If multiple pieces of information are provided, store them as separate memories.
Do not give your opinion or analysis. Just store or retrieve the information as requested.

You have access to Think, Get Memories, Add Memory, Update Memory, Delete Memory, and Analyze tools that 
will help you manage financial analysis data (called memories) and analyze their operations.
Use these tools as frequently as needed to successfully complete memory management tasks.

## How to use the Think, Memory Operations, and Analyze tools:             
          
1. **Think**
- Purpose: A scratchpad for planning memory operations, brainstorming memory content, 
    and refining your approach. You never reveal your "Think" content to the user.
- Usage: Call `think` whenever you need to figure out what memory operations to perform, 
    analyze requirements, or decide on strategy.       
          
2. **Get Memories**                    
- Purpose: Retrieves a list of memories from the database.                         
- Usage: Call `get_memories` when you need to retrieve memories.                   
          
3. **Add Memory**                      
- Purpose: Creates new memories in the database with specified content and metadata.                    
- Usage: Call `add_memory` with memory content and optional topics when you need to store new information.                                  
          
4. **Update Memory**                   
- Purpose: Modifies existing memories in the database by memory ID.        
- Usage: Call `update_memory` with a memory ID and the fields you want to change. 
    Only specify the fields that need updating.               
          
5. **Delete Memory**                   
- Purpose: Removes memories from the database by memory ID.                
- Usage: Call `delete_memory` with a memory ID when a memory is no longer needed or requested to be removed.                                
          
6. **Analyze**                         
- Purpose: Evaluate whether the memory operations results are correct and sufficient. 
    If not, go back to "Think" or use memory operations with refined parameters.
- Usage: Call `analyze` after performing memory operations to verify:      
    - Success: Did the operation complete successfully?                    
    - Accuracy: Is the memory content correct and well-formed?             
    - Completeness: Are all required fields populated appropriately?       
    - Errors: Were there any failures or unexpected behaviors?             
          
**Important Guidelines**:              
- Do not include your internal chain-of-thought in direct user responses.  
- Use "Think" to reason internally. These notes are never exposed to the user.                          
- When you provide a final answer to the user, be clear, concise, and based on the memory operation results.                                
- If memory operations fail or produce unexpected results, acknowledge limitations and explain what went wrong.                             
- Always verify memory IDs exist before attempting updates or deletions.   
- Use descriptive topics and clear memory content to make memories easily searchable and understandable.

You can refer to the examples below as guidance for how to use each tool.  
          
### Examples
          
#### Example 1: Adding multiple news articles as memories

User: <multiple articles and metadata about stock performance and market trends>
Think: I should store each article and its metadata. I should create a separate memory for each article 
    and use relevant topics for easy retrieval.
Add Memory: memory="<article1 information>, <article1 metadata>, <article1 sentiment>", 
    topics=["<company name>", "<stock symbol>", "news", "stock performance", "market trends", "<date>", "<sentiment>"]
Add Memory: memory="<article2 information>, <article2 metadata>, <article2 sentiment>", 
    topics=["<company name>", "<stock symbol>", "news", "stock performance", "market trends", "<date>", "<sentiment>"]
Add Memory: memory="<article3 information>, <article3 metadata>, <article3 sentiment>", 
    topics=["<company name>", "<stock symbol>", "news", "stock performance", "market trends", "<date>", "<sentiment>"]
Analyze: Successfully created a memory with article information for each article provided. 
    The topics are well-chosen for future retrieval. This should help with future news-related requests.

Final Answer: Noted. I've stored the article information. I'll remember the key details for future reference.

#### Example 2: Adding a single news article as a memory

User: <article information and metadata about stock performance and market trends>
Think:  I should store the article and its metadata. I should create a memory with this 
    information and use relevant topics for easy retrieval.
Add Memory: memory="<article information>, <article metadata>, <article sentiment>", 
    topics=["<company name>", "<stock symbol>", "news", "stock performance", "market trends", "<date>", "<sentiment>"]
Analyze: Successfully created a memory with article information. 
    The topics are well-chosen for future retrieval. This should help with future news-related requests.

Final Answer: Noted. I've stored the article information. I'll remember the key details for future reference.

#### Example 3: Updating Existing Information

User: Whoops, that was a mistake. The sentiment for the Business Insider's article written about 
    MSFT on 2025-10-05 should be "neutral" instead of "positive".
Think: I need to find the specific memory related to this article and update its sentiment.
Update Memory: memory_id="matching_memory_id", memory="Business Insider's article about MSFT on 2025-10-05, sentiment: neutral",
    topics=["MSFT", "Business Insider", "2025-10-05", "neutral"]
Analyze: Successfully updated the sentiment for the Business Insider's article about MSFT. 
    The content now accurately reflects the corrected sentiment.

Final Answer: I've updated the sentiment for the Business Insider's article about MSFT to "neutral". 
    Let me know if there's anything else you'd like to adjust.
          
#### Example 4: Removing Outdated Information                              

User: Please forget all Microsoft data older than 2023.
Think: The user wants me to delete old Microsoft data since it's no longer relevant. 
    I should find and remove those memories.
Delete Memory: memory_id="microsoft_data_memory_id1"
Delete Memory: memory_id="microsoft_data_memory_id2"
Analyze: Successfully deleted the outdated Microsoft data memories. 
    The old information won't interfere with future requests.

Final Answer: I've removed your old Microsoft data. Feel free to share any new 
    information when you're ready, and I'll store the updated information.

#### Example 5: Retrieving Memories    
          
User: Latest MSFT news?
Think: The user wants to retrieve financial analysis data about Microsoft. 
    I should use the get_memories tool to retrieve the memories.
Get Memories:
Analyze: Successfully retrieved the memories about Microsoft. The memories are relevant to the user's request criteria.

Final Answer: I've retrieved the memories about Microsoft. 
    The latest news includes information about its stock performance, market trends, and other relevant financial data.
"""

#Memory agent with persistent storage and clear operating rules.
#Provides long-lived context that can be reused across analyses (e.g., past
#news, cached insights), improving continuity and reducing redundant retrieval work
memory_agent = Agent(
    name="Memory Agent",
    #Use a fixed session identifier so that the same logical memory context persists
    #across runs in this notebook/session. This keeps memories grouped and retrievable.
    session_id="financial_memory_session",
    #A stable user identifier for the memory store, useful if multiple users or
    #personas will interact with the same backend in other deployments.
    user_id="financial_user",
    instructions=[
        "You are a memory agent that stores and retrieves information for financial analysis.",
        "User may provide new information related to financial analysis. Store this information in a structured format.",
        "Always acknowledge if any information is stored.",
        "User may request information about a specific company or stock ticker symbol. Upon such requests, "
        "retrieve and summarize all relevant stored information related to that company or stock ticker symbol.",
    ],
    #Attach Memory Tools with a dedicated SQLite database and operational instructions.
    #The separate DB isolates memory content from other agents and ensures persistence
    #beyond one cell/step; the instructions define how the tool should behave.
    tools=[
        MemoryTools(
            db=SqliteDb(db_file="memory_agent_memories.db"),
            instructions=memory_instructions,
        )
    ],
    add_datetime_to_context=True,
    markdown=True,
    **DEFAULT_AGENT_KWARGS
)

### Investment Research Agent

In [7]:
investment_research_agent = Agent(
    name="Investment Research Agent",
    instructions=[
        "You are an investment research agent for financial analysis.",
        "The user will provide a stock symbol or company name. Plan a research workflow for "
        "the provided stock symbol, including which data to gather, which agents to consult, and how to synthesize the findings.",
        "Create a comprehensive research plan that includes:",
        "1. Market data analysis (price trends, volume, technical indicators)",
        "2. News sentiment analysis (recent news articles and their impact)",
        "3. Earnings and financial filings review (SEC filings, quarterly reports)",
        "4. Risk assessment and market positioning",
        "5. Investment recommendation framework",
        "Structure your response as a detailed research workflow with specific steps and data sources.",
        "Provide actionable insights and recommendations based on your analysis framework.",
    ],
    db=SqliteDb(db_file="investment_research_agent.db"),
    add_datetime_to_context=True,
    markdown=True,
    **DEFAULT_AGENT_KWARGS
)

### Preprocessing Agent

In [8]:
#Preprocessing Agent: normalizes free-form user prompts into a strict JSON schema.
#Extracts (a) ticker symbol, (b) action item (intent), and (c) data_types
#expected downstream (subset of ['earnings', 'news', 'market']).
#Establishing a stable, machine-readable contract at the pipeline entry
#removes ambiguity and makes routing / orchestration deterministic.
preprocessing_agent = Agent(
    name="Preprocessing agent",
    #System prompt that specifies scope, output constraints, and examples.
    #Forces the LLM to act as a preprocessor, not an analyst, and
    #emit strictly structured JSON that later steps can parse safely.
    instructions=[
        "You are a financial analysis assistant.",
        "Your task is to preprocess prompt inputs for downstream analysis.",
        "You respond in strict JSON format.",
        "Given raw text input, perform the following preprocessing steps:",
        "- Identify the ticker symbol for the company mentioned in the text.",
        "- Extract the intent of the user's request without losing context. Convert it into a concise action item.",
        "- Determine the type of data needed for analysis (The only valid values are: ['earnings', 'news', 'market']).",
        "Examples:",
        """Input: "Can you provide the latest news on Microsoft?"
Output: {"ticker": "MSFT", "action_item": "Provide the latest news on Microsoft", "data_types": ["news"]}""",
        """Input: "I need the recent earnings report for Apple."
Output: {"ticker": "AAPL", "action_item": "Retrieve the recent earnings report for Apple", "data_types": ["earnings"]}""",
        """Input: "What's the current market status of Tesla?"
Output: {"ticker": "TSLA", "action_item": "Get the current market status of Tesla", "data_types": ["market"]}""",
        """Input: "Perform a comprehensive analysis of Amazon."
Output: {"ticker": "AMZN", "action_item": "Perform a comprehensive analysis of Amazon", "data_types": ["earnings", "news", "market"]}""",
    ],
    use_json_mode=True,
    **DEFAULT_AGENT_KWARGS
)
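
Since every later step depends on this JSON contract, it is worth validating the preprocessing output before routing. The checker below is a minimal sketch using only the standard library; `parse_preprocessed` is a hypothetical helper, and it assumes the agent's reply is available as a raw JSON string.

```python
import json
from typing import Any, Dict

VALID_DATA_TYPES = {"earnings", "news", "market"}
REQUIRED_KEYS = {"ticker", "action_item", "data_types"}


def parse_preprocessed(raw: str) -> Dict[str, Any]:
    """Parse and validate the preprocessing agent's JSON reply."""
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    missing = REQUIRED_KEYS - data.keys()
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    types = data["data_types"]
    if not isinstance(types, list) or not all(isinstance(t, str) for t in types):
        raise ValueError("data_types must be a list of strings")
    unknown = set(types) - VALID_DATA_TYPES
    if unknown:
        raise ValueError(f"unsupported data types: {sorted(unknown)}")
    data["ticker"] = str(data["ticker"]).upper()  # normalize for downstream lookups
    return data


reply = '{"ticker": "msft", "action_item": "Provide the latest news on Microsoft", "data_types": ["news"]}'
print(parse_preprocessed(reply))
```

Raising `ValueError` early keeps a malformed reply from silently routing to no agent at all.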

### Evaluator Agent

In [9]:
#-----------------------------------------------------------------------------
#Evaluator Agent: provides quantitative and qualitative assessment of drafts.
#Scores analysis drafts on three primary quality dimensions and returns a
#structured JSON payload with scores, feedback, and concrete follow-ups.
#
# ── Metrics (0.0–1.0) used by the evaluator ───────────────────────────────────
# • completeness: Does the draft cover the requested scope end-to-end?
#   - Includes presence of all key sections (market, earnings, news, risks),
#     data references, and explicit limitations/assumptions.
#   - Heuristics: breadth of topics covered, inclusion of requested artifacts,
#     presence of evidence and rationale for claims.
#
# • accuracy: Are the statements/data consistent and plausible?
#   - Penalizes factual conflicts, unsupported claims, or misinterpretation.
#   - Heuristics: internal consistency, alignment between numbers and prose,
#     cautious handling of uncertainty, and avoidance of over-claiming.
#
# • clarity: Is the writing structured, readable, and unambiguous?
#   - Rewards clean organization, scannable sections, and precise language.
#   - Heuristics: logical flow, concise sentences, informative headings/tables.
#
# • overall: Mean(completeness, accuracy, clarity).
#   - Single scalar used to gate optimization and readiness.
#
# ── Readiness rule ────────────────────────────────────────────────────────────
# • ready_for_delivery = True iff overall ≥ 0.85 AND no critical gaps remain.
#   - Threshold chosen to balance rigor (academic standards) with practicality.
#   - If below threshold, downstream logic may trigger an optimization pass.
#
# ── Qualitative fields ────────────────────────────────────────────────────────
# • feedback.strengths: What the draft did well (kept concise & actionable).
# • feedback.gaps: Missing/weak elements requiring attention.
# • feedback.suggestions: Targeted edits to raise scores efficiently.
# • actions.priority_fixes: High-leverage edits for the next revision cycle.
# • actions.checks: Verifications (e.g., cross-check figures) before finalizing.
# • actions.followups: Optional deeper work (e.g., add comparable peers).
#
# The agent stores its conversation state in a dedicated SQLite DB to allow
# auditability and reproducibility of evaluations across runs.

evaluator_agent = Agent(
    name="Evaluator Agent",
    instructions=[
        "You are an evaluation agent for financial research outputs.",
        "Evaluate drafts for completeness, accuracy, and clarity.",
        "Provide actionable feedback with specific, constructive suggestions.",
        "If data is missing or uncertain, recommend concrete next steps to resolve gaps.",
        "Always return a JSON object with this schema:\n"
        "{"
        '  "scores": {"completeness": float, "accuracy": float, "clarity": float, "overall": float},'
        '  "feedback": {"strengths": [str], "gaps": [str], "suggestions": [str]},'
        '  "actions": {"priority_fixes": [str], "checks": [str], "followups": [str]},'
        '  "ready_for_delivery": bool'
        "}",
        "Scores must be 0.0-1.0. The 'overall' is the mean of the three dimensions.",
        "Mark 'ready_for_delivery' true only if overall ≥ 0.85 and no critical gaps remain.",
        "Be objective, concise, and specific. Avoid generic advice.",
    ],
    db=SqliteDb(db_file="evaluator_agent.db"),
    add_datetime_to_context=True,
    markdown=True,
    **DEFAULT_AGENT_KWARGS,
)
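
The readiness rule described in the evaluator comments can be illustrated in isolation. The following is a minimal sketch, not the notebook's implementation: `check_readiness` is a hypothetical helper, and it assumes (as `_is_ready` later does) that any reported gap blocks delivery, which is stricter than "no critical gaps".

```python
from statistics import mean

READY_THRESHOLD = 0.85  # mirrors the threshold stated in the evaluator instructions


def check_readiness(evaluation: dict, threshold: float = READY_THRESHOLD) -> bool:
    """Hypothetical helper: recompute 'overall' and apply the readiness rule."""
    scores = evaluation.get("scores", {})
    dims = [float(scores.get(k, 0.0)) for k in ("completeness", "accuracy", "clarity")]
    # Reject out-of-range scores instead of silently clamping them.
    if any(not 0.0 <= d <= 1.0 for d in dims):
        return False
    # 'overall' is the arithmetic mean of the three dimensions.
    overall = mean(dims)
    # Assumption: any listed gap blocks delivery.
    gaps = evaluation.get("feedback", {}).get("gaps", [])
    return overall >= threshold and not gaps


sample = {
    "scores": {"completeness": 0.9, "accuracy": 0.85, "clarity": 0.95},
    "feedback": {"strengths": ["clear tables"], "gaps": [], "suggestions": []},
}
print(check_readiness(sample))  # True: the mean is about 0.90 and no gaps are listed
```

Recomputing the mean locally, rather than trusting the model-reported `overall`, is one way to keep the gate independent of arithmetic slips in the LLM output.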

## Workflows

### Multi-Agent Router

In [10]:
# Agent Registry and Routing Utilities
# Central directory of agents plus functions that route incoming tasks to the
# correct specialist (single or multiple), with a lightweight "intelligent"
# classifier based on keywords/regex. The registry maps short keys to
# instantiated agents for programmatic lookup and dispatch.
AGENT_REGISTRY = {
    "investment": investment_research_agent,
    "earnings": earnings_agent,
    "news": news_agent,
    "memory": memory_agent,
    "market": market_data_agent,
    "evaluator": evaluator_agent,
}


def route_content(content_type: str, data: str, **kwargs) -> Dict[str, Any]:
    """
    Implement Routing workflow pattern:
    Direct content to the right specialist (earnings, news, or market analyzers)
    
    Args:
        content_type: Type of content (earnings, news, market, memory, investment, evaluator)
        data: Content data to analyze
        **kwargs: Additional parameters to pass to the agent
        
    Returns:
        Dict containing:
        - agent_name: Name of the agent that processed the request
        - content_type: Type of content routed
        - response: Analysis from the appropriate specialist agent
        - status: Success or error status
        
    Example:
        >>> result = route_content("market", "AAPL")
        >>> print(result["response"])
    """
    # Normalize content type
    content_type_lower = content_type.lower().strip()
    
    # Map content types to agents
    routing_map = {
        "earnings": "earnings",
        "financial": "earnings",
        "filings": "earnings",
        "sec": "earnings",
        "quarterly": "earnings",
        "annual": "earnings",
        
        "news": "news",
        "sentiment": "news",
        "articles": "news",
        "headlines": "news",
        
        "market": "market",
        "price": "market",
        "stock": "market",
        "quote": "market",
        "ohlcv": "market",
        "technical": "market",
        
        "memory": "memory",
        "remember": "memory",
        "recall": "memory",
        "store": "memory",
        
        "investment": "investment",
        "research": "investment",
        "analysis": "investment",
        "recommendation": "investment",
        "general": "investment",
        
        "evaluator": "evaluator",
        "evaluate": "evaluator",
        "quality": "evaluator",
        "assess": "evaluator",
    }
    
    # Find the appropriate agent
    agent_key = routing_map.get(content_type_lower)
    
    if not agent_key:
        return {
            "agent_name": "Unknown",
            "content_type": content_type,
            "response": f"Error: Unknown content type '{content_type}'. "
                       f"Valid types: {', '.join(sorted(set(routing_map.values())))}",
            "status": "error"
        }
    
    # Get the agent from registry
    agent = AGENT_REGISTRY.get(agent_key)
    
    if not agent:
        return {
            "agent_name": agent_key,
            "content_type": content_type,
            "response": f"Error: Agent '{agent_key}' not found in registry",
            "status": "error"
        }
    
    try:
        # Route to the appropriate agent
        response = agent.run(data, **kwargs)
        
        # Extract content from response (handle different response formats)
        if hasattr(response, 'content'):
            response_content = response.content
        elif isinstance(response, dict) and 'content' in response:
            response_content = response['content']
        else:
            response_content = str(response)
        
        return {
            "agent_name": agent.name,
            "content_type": content_type,
            "response": response_content,
            "status": "success"
        }
        
    except Exception as e:
        return {
            "agent_name": agent.name if agent else "Unknown",
            "content_type": content_type,
            "response": f"Error during routing: {str(e)}",
            "status": "error"
        }


def intelligent_routing(content: str, context: Optional[str] = None) -> Dict[str, Any]:
    """
    Intelligently route content based on automatic analysis of content type.
    Uses pattern matching and keyword analysis to determine the best agent.
    
    Args:
        content: Raw content to analyze and route
        context: Optional context to help with routing decision
        
    Returns:
        Dict containing:
        - agent_name: Name of the agent that processed the request
        - detected_type: Automatically detected content type
        - confidence: Confidence score of the routing decision (0.0 to 1.0)
        - response: Analysis from the most appropriate agent
        - status: Success or error status
        
    Example:
        >>> result = intelligent_routing("What is the latest stock price for TSLA?")
        >>> print(f"Routed to: {result['agent_name']}")
        >>> print(result['response'])
    """
    # Lowercase the content and optional context for case-insensitive keyword matching
    content_lower = content.lower()
    context_lower = context.lower() if context else ""
    combined_text = f"{content_lower} {context_lower}"
    
    # Define keyword patterns with weights
    patterns = {
        "market": {
            "keywords": ["price", "stock", "quote", "ohlcv", "market data", "ticker", 
                        "volume", "technical", "chart", "trading"],
            "weight": 0.0
        },
        "earnings": {
            "keywords": ["earnings", "financial", "filings", "sec", "quarterly", 
                        "annual", "revenue", "profit", "balance sheet", "10-k", "10-q"],
            "weight": 0.0
        },
        "news": {
            "keywords": ["news", "article", "headlines", "sentiment", "recent", 
                        "announcement", "press release", "media"],
            "weight": 0.0
        },
        "memory": {
            "keywords": ["remember", "recall", "store", "forget", "save", 
                        "retrieve", "memory", "past"],
            "weight": 0.0
        },
        "evaluator": {
            "keywords": ["evaluate", "assess", "quality", "review", "feedback",
                        "score", "rating", "critique"],
            "weight": 0.0
        },
    }
    
    # Calculate weights based on keyword matches
    for category, data in patterns.items():
        for keyword in data["keywords"]:
            if keyword in combined_text:
                data["weight"] += 1.0
    
    # Regex patterns for specific formats (boost confidence)
    if re.search(r'\b[A-Z]{1,5}\b', content):  # Stock ticker pattern (any all-caps token of 1-5 letters)
        patterns["market"]["weight"] += 2.0
    
    # Match against the lowercased text, so the pattern itself must be lowercase.
    if re.search(r'\b(10-[kq]|8-k|earnings report|quarterly report)\b', content_lower):
        patterns["earnings"]["weight"] += 2.0
    
    # Determine the best match
    detected_type = max(patterns.items(), key=lambda x: x[1]["weight"])
    content_type = detected_type[0]
    confidence_score = detected_type[1]["weight"]
    
    # If no strong match found, default to investment research agent
    if confidence_score == 0.0:
        content_type = "investment"
        confidence_score = 0.5  # Medium confidence for default
    else:
        # Normalize confidence score (cap at 1.0)
        confidence_score = min(confidence_score / 5.0, 1.0)
    
    # Route to the determined agent
    result = route_content(content_type, content)
    
    # Add intelligent routing metadata
    result["detected_type"] = content_type
    result["confidence"] = round(confidence_score, 2)
    result["routing_method"] = "intelligent"
    
    return result


def multi_agent_routing(content: str, agent_types: list) -> Dict[str, Any]:
    """
    Route content to multiple agents and aggregate results.
    Useful for comprehensive analysis requiring multiple perspectives.
    
    Args:
        content: Content to analyze
        agent_types: List of agent types to route to (e.g., ["market", "news", "earnings"])
        
    Returns:
        Dict containing:
        - agents_used: List of agents that processed the request
        - responses: Dict mapping agent names to their responses
        - status: Overall status
        
    Example:
        >>> result = multi_agent_routing("AAPL", ["market", "news"])
        >>> for agent, response in result['responses'].items():
        ...     print(f"{agent}: {response}")
    """
    results = {
        "agents_used": [],
        "responses": {},
        "status": "success"
    }
    
    for agent_type in agent_types:
        try:
            response = route_content(agent_type, content)
            if response["status"] == "success":
                results["agents_used"].append(response["agent_name"])
                results["responses"][response["agent_name"]] = response["response"]
            else:
                # route_content already prefixes its error text with "Error:".
                results["responses"][agent_type] = response["response"]
                results["status"] = "partial_success"
        except Exception as e:
            results["responses"][agent_type] = f"Exception: {str(e)}"
            results["status"] = "partial_success"
    
    return results
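
The scoring used by `intelligent_routing` can be tried without instantiating any agents. The sketch below is a simplification under stated assumptions: `KEYWORDS` is a hypothetical, trimmed keyword table and `detect_type` is an illustrative name. It reproduces only the keyword count, the +2 ticker boost, and the divide-by-5 confidence cap.

```python
import re

# Hypothetical, trimmed-down keyword table; the full lists live in intelligent_routing.
KEYWORDS = {
    "market": ["price", "stock", "quote", "ticker", "volume"],
    "earnings": ["earnings", "revenue", "10-k", "10-q"],
    "news": ["news", "headlines", "sentiment"],
}


def detect_type(text: str) -> tuple:
    """Return (category, confidence) from keyword counts plus a ticker boost."""
    lowered = text.lower()
    # One point per keyword found as a substring of the lowercased text.
    scores = {cat: sum(kw in lowered for kw in kws) for cat, kws in KEYWORDS.items()}
    # An all-caps token of 1-5 letters is treated as a ticker (+2 for market).
    if re.search(r"\b[A-Z]{1,5}\b", text):
        scores["market"] += 2
    best = max(scores, key=scores.get)
    if scores[best] == 0:
        return "investment", 0.5  # default route when nothing matches
    # Normalize the raw score to a confidence capped at 1.0.
    return best, min(scores[best] / 5.0, 1.0)


print(detect_type("What is the latest stock price for TSLA?"))  # ('market', 0.8)
print(detect_type("any headlines or news sentiment today?"))    # ('news', 0.6)
print(detect_type("give me a broad overview"))                  # ('investment', 0.5)
```

Because the ticker pattern accepts any all-caps token of one to five letters, words such as "I" or "SEC" also earn the market boost; a lookup against known tickers would be a stricter alternative.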

### Evaluator-optimizer workflow

In [11]:
# Threshold score (0–1) required to consider a draft "ready" without further optimization.
READY_THRESHOLD = 0.85

# Delimiter used to concatenate the pre-eval display block with the draft text in one payload.
_PRE_EVAL_MARK = "\n\n===DRAFT_BELOW==="

# Formats the evaluator's feedback.
def _format_eval_matrix(feedback: dict) -> tuple[str, float]:
    """
    Build a Markdown score table and compute the overall score from an evaluator response.

    Logic summary:
      - Safely extract the 'scores' dict from the evaluator output and coerce each metric
        (completeness, accuracy, clarity) to float with 0.0 defaults.
      - Compute 'overall' as the mean of the three metrics when not explicitly provided.
      - Extract 'gaps' (missing or weak areas) from feedback for a bullet list.
      - Compose a compact Markdown table plus a "Gaps" section suitable for notebook display.

    Returns:
      Tuple[str, float]: (markdown_table, overall_score), where `markdown_table` is a fully
      formatted Markdown string and `overall_score` is the numeric scalar used by gating logic.
    """
    scores = feedback.get("scores", {}) or {}
    # Extract completeness score; default to 0.0 if missing; cast to float for math/formatting.
    comp = float(scores.get("completeness", 0.0))
    # Extract accuracy score
    acc  = float(scores.get("accuracy", 0.0))
    # Extract clarity score
    clr  = float(scores.get("clarity", 0.0))
    # Compute overall if not provided: mean of the three scores when any are present
    ovl  = float(scores.get("overall", (comp + acc + clr) / 3 if any([comp, acc, clr]) else 0.0))
    # Extract gap messages (what's missing) to display
    gaps = feedback.get("feedback", {}).get("gaps", []) or []

    rows = [
        # Header row for the table.
        "| Metric        | Score |",
        "|--------------|-------|",
        f"| Completeness | {comp:.2f} |",
        f"| Accuracy     | {acc:.2f} |",
        f"| Clarity      | {clr:.2f} |",
        f"| **Overall**  | **{ovl:.2f}** |",
        "",
        "#### Gaps",
        # print gaps as a bullet list; show placeholder if there are none.
        "- " + "\n- ".join(gaps) if gaps else "_None detected_",
    ]
    return "\n".join(rows), ovl

# Normalize any Agno step context into plain text for downstream steps.
def _extract_text_from_ctx(ctx, *, prefer_longest_from_lists: bool = True) -> str:
    """
    Normalize Agno step contexts to a plain string for downstream steps.

    Handles:
      - StepInput: prefer .previous_step_content (the upstream result) over .content (the current payload)
      - StepOutput: use .content
      - list of StepOutputs/strings/dicts (e.g., from Parallel): pick the longest text (default) so we prefer
        the synthesized analysis over short acknowledgements (like memory "Noted..."), or use last item if configured
      - anything else: return str(ctx) as a safe fallback

    Args:
      prefer_longest_from_lists: when True (default), select the longest string from list contexts; when False,
                                 fall back to using the last meaningful list item.

    Returns:
      A best-effort string representing the textual content we want to evaluate/optimize/print.
    """

    # StepInput-like objects expose .previous_step_content (the output of the prior step);
    # prefer it over .content, which carries the current payload.
    if hasattr(ctx, "previous_step_content"):
        # Return the previous step's content if available; otherwise use current content; else empty string.
        return (getattr(ctx, "previous_step_content", None)
                or getattr(ctx, "content", "")
                or "")

    # StepOutput-like objects (a .content attribute but no .previous_step_content): return that content.
    if hasattr(ctx, "content"):
        # Guard with "or ''" so callers never get None.
        return getattr(ctx, "content") or ""

    # If the context is a list (common after Parallel), then decide which element's text to pass along.
    if isinstance(ctx, list) and ctx:
        # When preferring the longest text, capture the synthesized analysis instead of a short ack.
        if prefer_longest_from_lists:
            # Initialize "best" as empty string to compare lengths safely.
            best = ""
            # Iterate through each item in the list to select the longest content.
            for item in ctx:
                # If the item looks like StepOutput with a string .content, consider it.
                if hasattr(item, "content") and isinstance(item.content, str):
                    # Update "best" when we find a longer string.
                    if len(item.content) > len(best):
                        best = item.content
                # If the item is a plain string, also consider and compare lengths.
                elif isinstance(item, str):
                    if len(item) > len(best):
                        best = item
                # If the item is a dict with a string 'content' key (some event shapes), consider it too.
                elif isinstance(item, dict) and isinstance(item.get("content"), str):
                    if len(item["content"]) > len(best):
                        best = item["content"]
            # Return whichever candidate ended up longest (may still be empty if no text found).
            return best
        else:
            # Alternative policy: walk from the end and pick the last meaningful content (sometimes desired).
            for item in reversed(ctx):
                # Prefer StepOutput-like items with string .content.
                if hasattr(item, "content") and isinstance(item.content, str):
                    return item.content or ""
                # Next, accept plain strings.
                if isinstance(item, str):
                    return item
                # Finally, accept dicts that carry a string 'content'.
                if isinstance(item, dict) and isinstance(item.get("content"), str):
                    return item["content"] or ""
            # If the list had no usable text, return empty string.
            return ""

    # Fallback: for any other unexpected shape (None, numbers, custom objects), return a safe string form.
    return str(ctx or "")

# Agno executor step that displays pre-optimization scores and passes draft forward.
def show_pre_eval_metrics_executor(step_input) -> StepOutput:
    """
    Compute and display pre-optimization evaluation metrics, then forward the draft downstream.

    Logic summary:
      - Extract the synthesized draft text from the incoming StepInput/StepOutput/list using
        `_extract_text_from_ctx` (robust against different Agno container types).
      - Call `_evaluate` to obtain a structured evaluation JSON for the draft.
      - Render a Markdown matrix via `_format_eval_matrix` and embed a parseable marker
        `===PRE_EVAL_OVERALL=<float>===` so later steps can read the score without re-evaluating.
      - Concatenate the display block, marker, and the raw draft separated by `_PRE_EVAL_MARK`.

    Returns:
      StepOutput: Content containing:
        - A Markdown section "Pre-Optimization Evaluation" with a score table.
        - The PRE_EVAL_OVERALL marker.
        - The delimiter `_PRE_EVAL_MARK` followed by the original draft for subsequent steps.
    """
    # Extract the synthesized draft text from prior steps (handles StepInput/list/etc.).
    draft = _extract_text_from_ctx(step_input)
    # Call the evaluator agent to get structured feedback on the draft.
    feedback = _evaluate(draft)
    # Format the feedback into a markdown table and get numeric overall.
    matrix_md, ovl = _format_eval_matrix(feedback)
    # Embed the overall score as a parsable marker so later steps can read it without re-evaluating.
    meta = f"\n\n===PRE_EVAL_OVERALL={ovl:.4f}==="
    # Compose the step output: the visible matrix, then meta marker, then our delimiter, then the draft.
    content = "### Pre-Optimization Evaluation\n" + matrix_md + meta + _PRE_EVAL_MARK + "\n" + draft
    # Return as StepOutput so Agno prints it in a boxed step and forwards the content.
    return StepOutput(content=content)


# Normalizes any Agent.run(...) response (object/string/generator/list/chunks) into plain text.
def _to_text(resp) -> str:
    """
    Normalize an agent response of any supported shape into a single plain-text string.

    Logic summary:
      - If the response has a `.content` attribute (common in Agno message objects), return it.
      - If it's already a string, return as-is.
      - If it's a dict, prefer 'content' or 'delta'.
      - If it's an iterable (e.g., streaming/generator), consume and concatenate textual parts
        (handles strings, dict chunks, objects with .content) while swallowing iteration errors.
      - Otherwise, coerce to str() as a safe fallback.

    Returns:
      str: The best-effort textual representation of the agent output, trimmed.

    """
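    # If the response has a .content attribute (Agno message object), use it directly.
    if hasattr(resp, "content"):
        # Guard against None or non-string content so callers always get a str.
        content = resp.content
        return content if isinstance(content, str) else str(content or "")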
    # If it's already a string, return as-is.
    if isinstance(resp, str):
        return resp
    # If it's a dict (e.g., event chunk), try common keys; otherwise stringify.
    if isinstance(resp, dict):
        return resp.get("content") or resp.get("delta") or str(resp)
    # If it's a generator/iterable (streaming), consume and concatenate all parts.
    if isinstance(resp, types.GeneratorType) or hasattr(resp, "__iter__"):
        # Accumulate chunks to form the full text.
        parts = []
        try:
            # Iterate through chunks safely; support multiple shapes.
            for chunk in resp:
                # Prefer chunk.content if present (None becomes an empty string so the join cannot fail).
                if hasattr(chunk, "content"):
                    parts.append(str(chunk.content or ""))
                # Otherwise, if chunk is a string, append it.
                elif isinstance(chunk, str):
                    parts.append(chunk)
                # For dict chunks, try common text-bearing keys.
                elif isinstance(chunk, dict):
                    parts.append(chunk.get("content") or chunk.get("delta") or "")
                # Fallback: stringify unknown shapes.
                else:
                    parts.append(str(chunk))
        except Exception:
            # Swallow iteration errors to avoid failing the whole step.
            pass
        # Join and trim the final text.
        return "".join(parts).strip()
    # Final fallback: stringify arbitrary objects.
    return str(resp)


# Computes an overall score as the mean of completeness/accuracy/clarity.
def _compute_overall(scores: dict) -> float:
    """
    Compute a single scalar 'overall' score as the arithmetic mean of three dimensions.

    Args:
      scores (dict): A mapping with possible keys 'completeness', 'accuracy', 'clarity'.
                     Missing keys default to 0.0.

    Returns:
      float: (completeness + accuracy + clarity) / 3, with safe defaults and divide-by-zero guard.
    """
    # Define the three dimensions we average.
    keys = ["completeness", "accuracy", "clarity"]
    # Extract each score as float with default 0.0.
    vals = [float(scores.get(k, 0.0)) for k in keys]
    # Avoid divide-by-zero by max(len(vals), 1).
    return sum(vals) / max(len(vals), 1)


# Parse JSON from potentially messy LLM output.
def _json_from_messy(text: str):
    """
    Attempt to recover a valid JSON object from imperfect LLM output.
    LLMs often wrap JSON in ```json … ``` fences or add explanations before/after the object.
    `_json_from_messy` first strips fence lines and tries a plain json.loads. When direct parsing
    fails, it extracts the outermost {...} span with a greedy regex and attempts to parse it,
    applying a trailing-comma repair if needed.

    Logic summary:
      - Strip fenced code markers (```json, ```), then try direct json.loads.
      - If that fails, regex-extract the largest '{...}' blocks and try to parse from the end.
      - If still failing, apply minor "repair" (remove trailing commas before ']' or '}').
      - As a last resort, return a sentinel dict with the raw response.

    Args:
      text (str): The raw text possibly containing JSON (with or without extra prose).

    Returns:
      dict | list: Parsed JSON structure when possible; otherwise, {"raw_response": <text>}.
    """
    # Remove code fences like ```json to increase chance of a clean parse.
    cleaned = "\n".join(line for line in (text or "").splitlines() if not line.strip().startswith("```")).strip()
    # First attempt: direct JSON parse of the cleaned string.
    try:
        return json.loads(cleaned)
    except Exception:
        # Ignore and try extracting a JSON object substring below.
        pass
    # Extract the outermost {...} span; the greedy pattern runs from the first "{" to the last "}".
    m = re.findall(r"\{[\s\S]*\}", cleaned)
    # Try each candidate, last first (the greedy pattern normally yields a single candidate).
    if m:
        for block in reversed(m):
            try:
                # Try parsing the candidate block.
                return json.loads(block)
            except Exception:
                # Attempt minor repairs (drop trailing commas, even across whitespace) and re-parse.
                repaired = re.sub(r",\s*([\]}])", r"\1", block)
                try:
                    return json.loads(repaired)
                except Exception:
                    # Continue with any remaining candidate.
                    continue
    # If nothing parsed, return a sentinel with the raw text for debugging.
    return {"raw_response": text}

EVAL_SCHEMA_JSON = """{
  "scores": {
    "completeness": 0.0,
    "accuracy": 0.0,
    "clarity": 0.0,
    "overall": 0.0
  },
  "feedback": {
    "strengths": [],
    "gaps": [],
    "suggestions": []
  },
  "actions": {
    "priority_fixes": [],
    "checks": [],
    "followups": []
  },
  "ready_for_delivery": false
}"""


# Calls the evaluator agent to score a draft and returns parsed JSON (with 'overall' ensured).
def _evaluate(draft: str) -> dict:
    """
    Query the evaluator agent for structured quality feedback on a draft.

    Logic summary:
      - Construct a strict instruction that asks for JSON matching `EVAL_SCHEMA_JSON` and
        embeds the provided draft under '== DRAFT =='.
      - Run the evaluator agent (non-streaming) with a fresh UUID session_id to reduce cross-run
        context bleed, then normalize the response to text via `_to_text`.
      - Parse the response using `_json_from_messy` to tolerate minor formatting errors.
      - If 'overall' is missing but component scores exist, compute it via `_compute_overall`.

    Args:
      draft (str): The textual analysis to be evaluated.

    Returns:
      dict: A JSON-like Python dict conforming (as closely as possible) to `EVAL_SCHEMA_JSON`.
    """
    # Build an instruction that asks strictly for the JSON schema on the provided draft.
    prompt = (
            "Evaluate the following financial analysis draft.\n"
            "Return ONLY a valid JSON object matching this exact schema (no prose, no code fences):\n"
            f"{EVAL_SCHEMA_JSON}\n\n"
            "Rules:\n"
            "- scores.* must be within [0.0, 1.0]\n"
            "- scores.overall = mean(completeness, accuracy, clarity)\n"
            "- If unsure about a score, use a conservative value (e.g., 0.0–0.3) and list the reason in feedback.gaps\n"
            "- Keep arrays concise and specific\n\n"
            "== DRAFT ==\n" + (draft or "").strip()
    )
    # Run the evaluator agent non-streaming to simplify parsing.
    resp = evaluator_agent.run(prompt, stream=False, session_id=str(uuid.uuid4()))
    # Normalize the agent response (handles strings/generators/chunks) into plain text.
    raw = _to_text(resp)
    # Parse robustly into JSON, repairing if needed.
    data = _json_from_messy(raw)
    # If scores exist but 'overall' is missing, compute it to keep downstream logic simple.
    if isinstance(data, dict) and "scores" in data and "overall" not in data["scores"]:
        data["scores"]["overall"] = _compute_overall(data["scores"])
    # Return the structured feedback.
    return data


# Returns True if feedback meets readiness (overall >= READY_THRESHOLD and no 'gaps'); else False.
def _is_ready(feedback: dict, min_overall: float = READY_THRESHOLD) -> bool:
    """
    Decide whether a draft is "ready for delivery" based on evaluator feedback.

    Logic summary:
      - Read 'overall' from feedback.scores; if absent, compute via `_compute_overall`.
      - Check for any reported gaps under feedback.gaps.
      - Return True only when overall >= `min_overall` AND there are no gaps.

    Args:
      feedback (dict): Evaluator JSON-like output containing 'scores' and 'feedback'.
      min_overall (float): Threshold for readiness, defaults to global READY_THRESHOLD.

    Returns:
      bool: Readiness decision (True = ready, False = needs improvement).

    """
    try:
        # Safely read the scores object; default empty to avoid exceptions.
        scores = feedback.get("scores", {})
        # Pull 'overall' if present; otherwise compute from components.
        overall = float(scores.get("overall", _compute_overall(scores)))
        # Identify any remaining gaps (missing or weak areas) from feedback.
        gaps = feedback.get("feedback", {}).get("gaps", [])
        # Ready when overall meets threshold and evaluator reported no gaps.
        return overall >= min_overall and not gaps
    except Exception:
        # Be conservative on any parsing/computation error.
        return False


# Threshold below which we allow optimization (≤ 0.50 means "too weak, optimize").
OPT_GATE_THRESHOLD = 0.50
# Marker used to tell the optimizer loop to skip optimization entirely.
SKIP_OPT_MARK = "===SKIP_OPTIMIZATION==="

# Decides whether to run optimization based on pre-eval overall score; passes draft forward accordingly.
def gate_optimization_executor(step_input) -> StepOutput:
    """
    Gate the optimization step using the pre-evaluation overall score.

    Logic summary:
      - Extract mixed content (display + draft) and isolate the draft using `_PRE_EVAL_MARK` if present.
      - Parse the PRE_EVAL_OVERALL marker if available; otherwise compute a fresh evaluation with `_evaluate`.
      - If `overall <= OPT_GATE_THRESHOLD` → return content that signals *OPTIMIZE* and passes the raw draft.
      - Else → prepend `SKIP_OPT_MARK` to the draft so the optimizer can fast-path and just attach feedback.

    Args:
      step_input: An Agno StepInput/StepOutput/list-like object carrying the prior step’s content.

    Returns:
      StepOutput: A banner + draft (optimize path) OR a banner + SKIP marker + draft (skip path).
    """
    mixed = _extract_text_from_ctx(step_input)

    if _PRE_EVAL_MARK in mixed:
        draft = mixed.split(_PRE_EVAL_MARK, 1)[1].lstrip()
    else:
        draft = mixed

    # Try to parse marker: ===PRE_EVAL_OVERALL=<number>===
    marker = "===PRE_EVAL_OVERALL="
    overall = None
    i = mixed.find(marker)
    if i != -1:
        j = mixed.find("===", i + len(marker))
        if j != -1:
            overall_str = mixed[i + len(marker): j].strip()
            try:
                overall = float(overall_str)
            except Exception:
                overall = None

    # Fallback: if marker not found or parse failed, evaluate now
    if overall is None:
        fb = _evaluate(draft)
        scores = fb.get("scores", {}) or {}
        overall = float(scores.get("overall", 0.0))

    if overall <= OPT_GATE_THRESHOLD:
        decision_banner = f"**Optimization Gate:** overall={overall:.2f} ≤ {OPT_GATE_THRESHOLD:.2f} → *OPTIMIZE*"
        return StepOutput(content=decision_banner + "\n\n" + draft)
    else:
        decision_banner = f"**Optimization Gate:** overall={overall:.2f} > {OPT_GATE_THRESHOLD:.2f} → *SKIP*"
        return StepOutput(content=decision_banner + "\n\n" + SKIP_OPT_MARK + "\n" + draft)


# Loop stop condition: stop immediately if SKIP marker present; else evaluate and check readiness.
def evaluator_end_condition(ctx) -> bool:
    """
    Determine whether the optimization loop should terminate.

    Logic summary:
      - Extract the current working text (which may start with `SKIP_OPT_MARK`).
      - If the content begins with the skip marker → terminate immediately.
      - Otherwise, run `_evaluate` on the current draft and log the scores,
        then call `_is_ready` (overall ≥ READY_THRESHOLD and no gaps) to decide.

    Args:
      ctx: The loop context (StepInput/StepOutput/list/string) holding the current draft state.

    Returns:
      bool: True to stop the loop; False to continue another iteration.
    """
    # Normalize ctx to get the current text under evaluation.
    current_text = _extract_text_from_ctx(ctx)
    # If previous step requested skipping optimization, stop the loop right away.
    if current_text.lstrip().startswith(SKIP_OPT_MARK):
        return True
    # Run evaluator on the current draft content.
    feedback = _evaluate(current_text)
    # show the scores block for visibility.
    print("[LOOP] end_condition: scores =", feedback.get("scores"))
    # Return True to stop if the draft meets quality threshold and has no gaps; otherwise continue.
    return _is_ready(feedback, READY_THRESHOLD)


# Optimizer step: either skip (when marked) or ask the research agent to rewrite and then re-evaluate.
def optimizer_executor(step_input: StepInput) -> StepOutput:
    """
    Improve the draft (if needed) using the Investment Research Agent, then re-evaluate.

    Logic summary:
      - Extract the current text. If it starts with `SKIP_OPT_MARK`, strip the marker, evaluate once,
        attach the evaluation JSON, and return (fast path).
      - Otherwise, strip any prior "=== Evaluation Feedback ===" block to isolate the pure draft.
      - Parse any prior evaluation JSON that follows that block and pass it as guidance.
      - Build a targeted rewrite prompt with goals and embedded prior feedback; run the
        `investment_research_agent` to produce an improved draft.
      - Re-evaluate the improved draft using `_evaluate`, attach the evaluation JSON, and return.

    Args:
      step_input (StepInput): The loop’s incoming payload containing the current draft (and possibly feedback).

    Returns:
      StepOutput: The improved (or unchanged, if skipped) draft annotated with a fresh evaluation block.
    """
    # Get the working text from the loop (could include the skip marker).
    text = _extract_text_from_ctx(step_input).lstrip()

    # Fast path: if gate said to skip, keep the draft and attach one evaluation for loop termination.
    if text.startswith(SKIP_OPT_MARK):
        # Remove the skip marker line and keep the remaining draft.
        draft = text.split("\n", 1)[1] if "\n" in text else ""
        # Evaluate once so the loop condition has scores to inspect.
        feedback_after = _evaluate(draft)
        # Annotate the draft with evaluation JSON and a note indicating we skipped optimization.
        annotated = (
            draft
            + "\n\n=== Evaluation Feedback ===\n"
            + json.dumps(feedback_after, indent=2)
            + f"\n\n(Optimization skipped because overall > {OPT_GATE_THRESHOLD:.2f})"
        )
        # Return annotated content (draft + feedback) so the next end_condition can stop cleanly.
        return StepOutput(content=annotated)

    # If previous iteration already appended a feedback block, strip it before rewriting.
    parts = text.rsplit("=== Evaluation Feedback ===", 1)
    # Keep only the pure draft for the optimizer prompt.
    draft_only = parts[0].strip()

    # If there was prior feedback, try to parse it and pass as guidance to the optimizer.
    prior_feedback = {}
    if len(parts) == 2:
        try:
            prior_feedback = json.loads(parts[1])
        except json.JSONDecodeError:
            # If it wasn't valid JSON, ignore gracefully.
            prior_feedback = {}

    # Construct an instruction for the Investment Research Agent to directly rewrite the draft.
    optimizer_prompt = "\n".join([
        # Set the role and expectation (rewrite, not meta-plan).
        "You are improving the following financial research draft.",
        # Clarify concrete goals to align the rewrite.
        "Goals:",
        "- Address all gaps and priority fixes.",
        "- Improve completeness, accuracy, and clarity.",
        "- Keep structure tight; use headings and bullets sparingly.",
        "",
        # Provide the draft text.
        "== DRAFT ==",
        draft_only,
        "",
        # Provide the prior evaluation JSON to steer the revision.
        "== EVALUATION JSON ==",
        json.dumps(prior_feedback, indent=2),
        "",
        # Force a direct revised report as the output.
        "== INSTRUCTIONS ==",
        "Rewrite the draft, directly producing the improved report with the fixes applied.",
        "Do not output a plan—output the revised analysis only.",
    ])

    # Ask the research agent for the improved draft; non-streaming avoids generator handling here.
    improved_resp = investment_research_agent.run(optimizer_prompt, stream=False)
    # Normalize the agent response to plain text.
    improved_text = _to_text(improved_resp)
    # Evaluate the improved draft so the loop has fresh metrics for the end condition.
    feedback_after = _evaluate(improved_text)
    # Attach the evaluation block to the improved draft (loop checks this next).
    annotated = improved_text + "\n\n=== Evaluation Feedback ===\n" + json.dumps(feedback_after, indent=2)
    # Return annotated text to the loop.
    return StepOutput(content=annotated)


# Final step to print a clean report (without attached evaluation JSON) in an Agno-styled box.
def finalize_print_executor(step_input) -> StepOutput:
    """
    Produce the final, clean report for end-users by stripping internal evaluation artifacts.

    Logic summary:
      - Extract the latest working text (which may contain an appended evaluation JSON block).
      - Remove everything from the first occurrence of "\n\n=== Evaluation Feedback ===".
      - Return only the cleaned analysis text for display/export.

    Args:
      step_input: The incoming StepInput/StepOutput/list holding the annotated draft.

    Returns:
      StepOutput: A clean StepOutput containing only the final analysis (no meta).
    """
    # Extract the latest content (likely improved draft + evaluation block).
    text = _extract_text_from_ctx(step_input)
    # Strip the evaluation block so end-users see only the polished report.
    cleaned = text.split("\n\n=== Evaluation Feedback ===", 1)[0].strip()
    # Return the cleaned report; Agno will render it in a boxed step.
    return StepOutput(content=cleaned)


# Agno Loop that runs the optimizer step repeatedly until the end condition says stop.
# Loop configuration summary:
#   - name: Human-readable identifier used in logs.
#   - steps: A single optimization step (`optimizer_executor`) executed per iteration.
#   - end_condition: `evaluator_end_condition` decides when to stop (skip marker or readiness).
#   - max_iterations: Safety cap (here 1) to prevent runaway optimization cycles.
eval_opt_loop = Loop(
    # Give the loop a descriptive name for logging.
    name="Evaluator–Optimizer Loop",
    steps=[Step(name="Optimize Draft", executor=optimizer_executor)],
    # This function is called after each iteration to decide whether to continue or stop.
    end_condition=evaluator_end_condition,
    # Cap the number of iterations to avoid runaway loops (1 = single pass optimization).
    max_iterations=1,
)
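
The steps above communicate through plain text: an optional skip marker on the first line, and an evaluation block appended after a fixed header. The sketch below exercises that round trip in isolation, with no Agno or LLM calls. The names `SKIP_MARK`, `annotate`, `split_annotated`, and `strip_skip_marker` are hypothetical stand-ins rather than the notebook's helpers, the marker value is invented for illustration, and the feedback keys `overall` and `gaps` are assumed from the docstrings above.

```python
import json

# Hypothetical marker value; the notebook defines its own SKIP_OPT_MARK elsewhere.
SKIP_MARK = "[[SKIP_OPTIMIZATION]]"
FEEDBACK_HEADER = "=== Evaluation Feedback ==="


def annotate(draft: str, feedback: dict) -> str:
    """Append the evaluation JSON after the header, mirroring the optimizer step."""
    return draft + "\n\n" + FEEDBACK_HEADER + "\n" + json.dumps(feedback, indent=2)


def split_annotated(text: str) -> tuple[str, dict]:
    """Split annotated text into (draft, feedback); feedback is {} if absent or invalid."""
    parts = text.rsplit(FEEDBACK_HEADER, 1)
    draft = parts[0].strip()
    if len(parts) < 2:
        return draft, {}
    try:
        return draft, json.loads(parts[1])
    except json.JSONDecodeError:
        return draft, {}


def strip_skip_marker(text: str) -> tuple[bool, str]:
    """Return (was_marked, text without the marker line)."""
    text = text.lstrip()
    if not text.startswith(SKIP_MARK):
        return False, text
    remainder = text.split("\n", 1)[1] if "\n" in text else ""
    return True, remainder


# Assumed feedback shape: a scores dict with an overall value, plus a list of gaps.
feedback = {"scores": {"overall": 0.72}, "gaps": []}
annotated = annotate("AMD draft report", feedback)
draft, parsed = split_annotated(annotated)
marked, body = strip_skip_marker(SKIP_MARK + "\nAMD draft report")
print(draft, parsed["scores"]["overall"], marked, body)
```

Keeping the header string in one constant avoids the subtle mismatch that can arise when one step splits on the bare header and another splits on the header with leading newlines.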

### Prompt Chaining and Evaluator-Optimizer Workflow

In [12]:
def parse_preprocessing_output(step_input: StepInput) -> dict:
    """
    Parse the JSON output of the "Preprocess Input" step.

    Args:
        step_input (StepInput): Workflow step input; the content of the
            "Preprocess Input" step is read from it.

    Returns:
        dict: Parsed dictionary containing ticker, action_item, and data_types,
            or an empty dict if the output is missing or not valid JSON.
    """
    try:
        output = step_input.get_step_content("Preprocess Input") or ""
        if not output.strip():
            return {}
        # Drop Markdown code-fence lines so only the JSON body remains.
        cleaned_output = "\n".join(
            line for line in output.splitlines() if not line.strip().startswith("```")
        )
        return json.loads(cleaned_output)
    except Exception as e:
        print(f"Error parsing preprocessing output: {e}")
        return {}


def multi_agent_router_executor(step_input: StepInput) -> StepOutput:
    """
    Executor function to route tasks to multiple agents based on input parameters.

    Args:
        step_input (StepInput): Workflow step input; the "Preprocess Input" output
            supplies the ticker, action item, and agent types.

    Returns:
        StepOutput: Output from the routed agents.
    """
    data = parse_preprocessing_output(step_input)
    ticker = data.get("ticker")
    action_item = data.get("action_item")
    agent_types = data.get("data_types", [])

    # Call the multi-agent routing function with the provided content and agent types
    result = multi_agent_routing(f"Stock Ticker: {ticker}\n{action_item}", agent_types)
    output = ""
    for agent, response in result["responses"].items():
        output += f"{agent}: {response}\n\n"

    return StepOutput(content=output)


def stored_data_retriever_executor(step_input: StepInput) -> StepOutput:
    """
    Executor function to retrieve stored data from memory based on input parameters.

    Args:
        step_input (StepInput): Workflow step input; its previous step content
            holds the data gathered so far.

    Returns:
        StepOutput: Output from the memory agent.
    """
    data = parse_preprocessing_output(step_input)
    ticker = data.get("ticker")
    action_item = data.get("action_item")

    # Data gathered by the previous step; fall back to "" so join() never receives None.
    gathered = step_input.previous_step_content or ""

    memory_prompt = "\n".join(
        [
            f"Retrieve any stored information related to {ticker} that might assist with the following request: {action_item}.",
            "This is what we already know, do not repeat it: ",
            gathered,
            "If no relevant information is found, respond with 'No relevant information found.'",
        ]
    )
    memory_response = memory_agent.run(memory_prompt).content or ""

    # Append the memory agent's findings only when it returned something relevant.
    if memory_response.strip() and "No relevant information found." not in memory_response:
        return StepOutput(
            content="\n".join(
                [
                    gathered,
                    f"\n\nMemory Agent: {memory_response}",
                ]
            )
        )

    return StepOutput(content=gathered)


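def select_synthesis_executor(step_input) -> StepOutput:
    """Pass the synthesized analysis from the Parallel step on as plain text."""
    # Prefer the previous step's content; fall back to the step's own input content.
    ctx = step_input.previous_step_content or step_input.content
    # Normalize the (possibly nested) Parallel result into a single string.
    chosen = _extract_text_from_ctx(ctx)
    return StepOutput(content=chosen)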


workflow = Workflow(
    name="Analysis workflow",
    steps=[
        # Parse the user input to extract ticker, action item, and data types
        Step(name="Preprocess Input", agent=preprocessing_agent),
        # Gather data using the multi-agent router
        Step(
            name="Gather Data",
            executor=multi_agent_router_executor,
        ),
        # Retrieve any additional stored data from memory
        Step(
            name="Retrieve Stored Data",
            executor=stored_data_retriever_executor,
        ),
        # Synthesize the analysis and store relevant information in memory
        Parallel(
            Step(
                name="Synthesize Analysis",
                agent=investment_research_agent,
            ),
            Step(
                name="Store Data in Memory",
                agent=memory_agent,
            ),
        ),
        # Fetch the investment_research_agent text out of the Parallel results.
        Step(name="Select Synthesis Output", executor=select_synthesis_executor),
        # Run the evaluator, print a score "matrix", and annotate the text with a meta marker.
        Step(name="Pre-Optimization Evaluation (Display)", executor=show_pre_eval_metrics_executor),
        # Skip the optimizer if the overall score is > 0.50.
        Step(name="Gate Optimization (≤ 50% only)", executor=gate_optimization_executor),
        # Improve the draft until the end condition is met or max_iterations (currently 1) is reached.
        eval_opt_loop,
        # Print the clean final report in an Agno box, with internal annotations stripped.
        Step(name="Finalize & Print Optimized Report", executor=finalize_print_executor),
    ],
)
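
The fence-stripping idea behind `parse_preprocessing_output` can be tried without running the workflow. The following is a minimal sketch under stated assumptions: `parse_fenced_json` is a hypothetical helper, not part of the notebook, and the sample payload (including the `data_types` values) is invented to match the keys the router reads.

```python
import json

FENCE = "`" * 3  # Markdown code-fence delimiter, built here to keep the sample readable


def parse_fenced_json(raw: str) -> dict:
    """Drop Markdown fence lines and parse the rest as JSON; return {} on any failure."""
    if not raw or not raw.strip():
        return {}
    body = "\n".join(
        line for line in raw.splitlines() if not line.strip().startswith(FENCE)
    )
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        return {}
    # Only a JSON object is useful to callers that use .get(...).
    return parsed if isinstance(parsed, dict) else {}


# Invented sample of what a model might return: JSON wrapped in a fenced block.
sample = "\n".join(
    [
        FENCE + "json",
        '{"ticker": "AMD", "action_item": "Analyze financial health", "data_types": ["news", "earnings"]}',
        FENCE,
    ]
)

data = parse_fenced_json(sample)
print(data.get("ticker"), data.get("data_types", []))
```

Returning an empty dict for non-object JSON keeps the downstream `data.get(...)` calls safe, at the cost of hiding a malformed response; logging the raw text in that branch is a reasonable addition when debugging.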

## Example Usage

In [13]:
workflow.print_response(
    (
        "Analyze the financial health and market position of AMD "
        "based on recent earnings reports, market data, and news articles."
    ),
    stream=True,
    stream_intermediate_steps=True,
)
