USD MS AAI - 510 Machine Learning - Final Project

## Financial Agentic System

Group 5: Antonio Recalde, Ajmal Jalal, Darin Verduzco

GitHub: https://github.com/victorhg/aai-520-final-project-group5

News sources: Yahoo Finance and NewsAPI

## Overall project workflow
Planner and routing via Orchestrator -> Data Retrieval and Preprocessing via Ingestion -> Summarizer -> Memory worker -> Evaluator -> Optimizer -> Final output -> Memory worker

Agent Functions 
1. Plans its research steps for a given stock symbol.
2. Uses tools dynamically (APIs, datasets, retrieval).
3. Self-reflects to assess the quality of its output.
4. Learns across runs (e.g., keeps brief memories or notes to improve future analyses).

Workflow Patterns 
1. Prompt Chaining: Ingest News → Preprocess → Classify → Extract → Summarize
2. Routing: Direct content to the right specialist (e.g., earnings, news, or market analyzers).
3. Evaluator–Optimizer: Generate analysis → evaluate quality → refine using feedback.
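As a minimal sketch of the prompt-chaining pattern listed above, each stage can be modeled as a plain function that takes and returns a dict, so stages compose in order. The stage functions `preprocess`, `classify`, `extract`, and `summarize` are hypothetical stand-ins written for illustration (no LLM calls), not the project's workers.

```python
from functools import reduce
from typing import Callable, Dict, List

# Hypothetical stages: each one reads the dict and returns an enriched copy.
def preprocess(item: Dict) -> Dict:
    # Collapse runs of whitespace
    return {**item, "text": " ".join(item["text"].split())}

def classify(item: Dict) -> Dict:
    # Toy keyword classifier standing in for an LLM classification prompt
    label = "earnings" if "revenue" in item["text"].lower() else "company"
    return {**item, "label": label}

def extract(item: Dict) -> Dict:
    # Keep tokens that look like numbers or percentages
    numbers = [
        tok for tok in item["text"].split()
        if tok.rstrip("%").replace(".", "", 1).isdigit()
    ]
    return {**item, "numbers": numbers}

def summarize(item: Dict) -> Dict:
    return {**item, "summary": f"[{item['label']}] {item['text'][:60]}"}

CHAIN: List[Callable[[Dict], Dict]] = [preprocess, classify, extract, summarize]

def run_chain(item: Dict) -> Dict:
    # Feed the output of each stage into the next one
    return reduce(lambda acc, stage: stage(acc), CHAIN, item)

result = run_chain({"text": "  Revenue   grew 12% year over year  "})
print(result["summary"])
```

Because every stage shares the same dict-in, dict-out shape, a stage can be swapped for an LLM-backed version without touching the rest of the chain.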

## Install requirements

In [1]:
!pip install yfinance feedparser requests python-dotenv typing_extensions pydantic langgraph IPython langchain-openai



## Import Libraries

In [2]:
# `from __future__` imports must be the first statement in the cell,
# otherwise Python raises a SyntaxError
from __future__ import annotations

# ---------------------------
# Data Handling & I/O
# ---------------------------
import os
import json
import re
import html
from datetime import datetime, timezone
import yfinance as yf
import feedparser
import requests
from dotenv import load_dotenv

# ---------------------------
# Typing & Preprocessing Utilities
# ---------------------------
from typing import Dict, Any, List, Optional, Union, TypedDict, Literal
from typing_extensions import Annotated
from concurrent.futures import ThreadPoolExecutor, as_completed

# ---------------------------
# Models / Evaluation
# ---------------------------
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display, Markdown
from langchain_openai import ChatOpenAI

# Load "news_openai.env" environment key file
load_dotenv("news_openai.env")
# Set NewsAPI key
NEWS_API_KEY = os.getenv("NEWS_API_KEY")
# OpenAI API key loaded via "_initialize_openai_llm" with env variable "OPENAI_API_KEY"

# Step 1 - Ingestion
Combines financial data from financial_data.py (Yahoo Finance) with news from news_data.py (NewsAPI and the Yahoo Finance RSS feed) using the "Ingestion" worker
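The fan-out that the "Ingestion" worker performs can be sketched with stand-in fetchers. This is an illustration under stated assumptions: `fetch_prices`, `fetch_news`, `fan_out`, and `overall_status` are hypothetical names, and the news fetcher fails on purpose to show how one failing source yields a partial result instead of aborting the run.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict

def fetch_prices(symbol: str) -> Dict[str, Any]:
    # Stand-in for the Yahoo Finance call (no network access here)
    return {"status": "success", "data": {"symbol": symbol, "close": 100.0}, "error": None}

def fetch_news(symbol: str) -> Dict[str, Any]:
    # Simulated failure of the news source
    raise TimeoutError("news source timed out")

def fan_out(symbol: str, tasks: Dict[str, Callable[[str], Dict[str, Any]]]) -> Dict[str, Any]:
    results: Dict[str, Any] = {}
    with ThreadPoolExecutor(max_workers=len(tasks)) as pool:
        futures = {pool.submit(fn, symbol): name for name, fn in tasks.items()}
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception as exc:
                # Record the failure per source rather than raising
                results[name] = {"status": "error", "data": None, "error": str(exc)}
    return results

def overall_status(results: Dict[str, Any]) -> str:
    ok = sum(1 for r in results.values() if r["status"] in ("success", "partial_success"))
    if ok == len(results):
        return "success"
    return "partial_success" if ok else "error"

results = fan_out("AAPL", {"financial": fetch_prices, "news": fetch_news})
status = overall_status(results)
print(status, results["news"]["error"])
```

Threads suit this job because both fetches are I/O-bound, so the slower source sets the total wait rather than the sum of the two.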

In [3]:
# ---------------------------
# worker/base_worker.py
# ---------------------------
class BaseWorker:
    def execute(self, *inputs):
        """Execute the worker's main function. To be overridden by subclasses."""
        raise NotImplementedError("This method should be overridden by subclasses.")

# ---------------------------
# (step 1)
# ingestion/financial_data.py (file 1 of 3)
# (Yahoo Finance data fetching, metrics calculation, error handling, and data structuring)
# ---------------------------

"""
NLP-5: Financial Data Ingestion
Fetch stock data from Yahoo Finance API (yfinance)
"""

class FinancialDataIngestion(BaseWorker):
    """
    Fetches financial data from Yahoo Finance.
    
    Responsibilities:
    - Fetch historical OHLCV (Open, High, Low, Close, Volume) data
    - Fetch company information and fundamentals
    - Calculate basic metrics from raw data
    - Handle API errors gracefully
    - Return structured financial data
    """
    
    def execute(self, *inputs) -> Dict[str, Any]:
        """
        Fetch financial data for a given stock symbol.
        
        Args:
            inputs[0] (str): Stock ticker symbol (e.g., "AAPL")
            inputs[1] (str, optional): Time period (default: "1mo")
                Valid periods: 1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max
        
        Returns:
            dict: Financial data bundle containing:
                - symbol: Stock ticker
                - price metrics: Current price, changes, highs/lows
                - volume metrics: Current and average volume
                - volatility: Calculated volatility
                - fundamentals: P/E ratio, market cap, beta, etc.
                - company info: Sector, industry, summary
                - historical_data: Recent OHLCV records
                - status: Success or error status
        """
        try:
            symbol = inputs[0]
            period = inputs[1] if len(inputs) > 1 else "1mo"

            stock = yf.Ticker(symbol)

            # Get historical data and info
            hist = stock.history(period=period)
            
            if hist.empty:
                return {
                    "source": "yahoo_finance",
                    "symbol": symbol,
                    "data": None,
                    "status": "error",
                    "error": f"No data found for symbol {symbol}",
                    "timestamp": datetime.now().isoformat()
                }
            
            info = stock.info
            
            # Calculate basic metrics
            current_price = hist['Close'].iloc[-1]
            # Prefer Yahoo's previousClose; fall back to the prior bar when it is missing or None
            prev_close = info.get('previousClose') or (hist['Close'].iloc[-2] if len(hist) > 1 else current_price)
            price_change = current_price - prev_close
            price_change_pct = (price_change / prev_close) * 100 if prev_close != 0 else 0
            
            # Calculate annualized volatility from up to the last 30 daily returns
            # (std needs at least two returns; with only one it would be NaN)
            returns = hist['Close'].pct_change().dropna()
            volatility = returns.tail(30).std() * (252 ** 0.5) if len(returns) > 1 else 0
            
            # Get volume metrics
            avg_volume = hist['Volume'].tail(30).mean()
            current_volume = hist['Volume'].iloc[-1]

            # Raw data result from yf
            result = {
                "source": "yahoo_finance",
                "symbol": symbol,
                "data": {
                    "current_price": float(current_price),
                    "price_change": float(price_change),
                    "price_change_pct": float(price_change_pct),
                    "volume": int(current_volume),
                    "avg_volume_30d": float(avg_volume),
                    "volatility_30d": float(volatility),
                    "market_cap": info.get("marketCap"),
                    "pe_ratio": info.get("forwardPE"),  # forward P/E, not trailing
                    "dividend_yield": info.get("dividendYield"),
                    "52_week_high": info.get("fiftyTwoWeekHigh"),
                    "52_week_low": info.get("fiftyTwoWeekLow"),
                    "beta": info.get("beta"),
                    "sector": info.get("sector"),
                    "industry": info.get("industry"),
                    # `or ""` guards against the key being present with a None value
                    "company_summary": (info.get("longBusinessSummary") or "")[:500],
                    "historical_data": hist.tail(30).to_dict('records')
                },
                "status": "success",
                "error": None,
                "timestamp": datetime.now().isoformat()
            }
            
            return result
            
        except Exception as e:
            return {
                "source": "yahoo_finance",
                "symbol": inputs[0] if len(inputs) > 0 else "UNKNOWN",
                "data": None,
                "status": "error",
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }
# ---------------------------
# (step 1)
# ingestion/news_data.py (file 2 of 3)
# ---------------------------

"""
NLP-6: News Data Ingestion
Fetch news articles from NewsAPI and Yahoo Finance RSS
"""

class NewsDataIngestion(BaseWorker):
    """
    Fetches news data from multiple sources.
    
    Responsibilities:
    - Fetch news articles from NewsAPI
    - Fetch news from Yahoo Finance RSS feeds
    - Handle API errors and rate limits gracefully
    - Return structured list of articles
    """
    
    DEFAULT_LIMIT = 10
    
    def __init__(self):
        super().__init__()

    def execute(self, *inputs) -> Dict[str, Any]:
        """
        Fetch news articles for a given stock symbol.
        
        Args:
            inputs[0] (str): Stock ticker symbol (e.g., "AAPL")
            inputs[1] (int, optional): Number of articles to fetch (default: 10)
        
        Returns:
            dict: News data bundle containing:
                - articles: List of articles from all sources
                - sources_queried: Which sources were successfully queried
                - total_count: Total number of articles fetched
                - status: Success or error status
        """
        try:
            symbol = inputs[0]
            limit = inputs[1] if len(inputs) > 1 else self.DEFAULT_LIMIT
            
            articles = []
            sources_queried = []
            errors = []
            
            # Fetch from Yahoo Finance RSS (half the limit)
            try:
                yahoo_articles = self._fetch_from_yahoo_rss(symbol, limit // 2)
                articles.extend(yahoo_articles)
                sources_queried.append("yahoo_rss")
            except Exception as e:
                errors.append({"source": "yahoo_rss", "error": str(e)})
            
            # Fetch from NewsAPI (half the limit)
            if NEWS_API_KEY:
                try:
                    newsapi_articles = self._fetch_from_newsapi(symbol, limit // 2)
                    articles.extend(newsapi_articles)
                    sources_queried.append("newsapi")
                except Exception as e:
                    errors.append({"source": "newsapi", "error": str(e)})
            else:
                errors.append({"source": "newsapi", "error": "NEWS_API_KEY not found in environment"})
            
            return {
                "source": "news_aggregated",
                "symbol": symbol,
                "data": {
                    "articles": articles,
                    "sources_queried": sources_queried,
                    "total_count": len(articles)
                },
                "status": "success" if len(articles) > 0 else "partial_success",
                "error": errors if len(errors) > 0 else None,
                "timestamp": datetime.now().isoformat()
            }
            
        except Exception as e:
            return {
                "source": "news_aggregated",
                "symbol": inputs[0] if len(inputs) > 0 else "UNKNOWN",
                "data": {
                    "articles": [],
                    "sources_queried": [],
                    "total_count": 0
                },
                "status": "error",
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }
    
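    def _preprocess_text(self, text: Optional[str]) -> str:
        """Strip markup and control characters so the result is safe plain text."""
        # NewsAPI and RSS fields can be null; treat missing text as empty
        if not text:
            return ""

        # Remove <script>...</script> blocks (case-insensitive, dot matches newline)
        text = re.sub(r'(?is)<script.*?>.*?</script>', ' ', text)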

        # Remove any remaining HTML tags
        text = re.sub(r'<[^>]+>', ' ', text)

        # Remove javascript: URIs and inline event handlers like onload=, onclick= etc.
        text = re.sub(r'(?i)javascript\s*:', '', text)
        text = re.sub(r'(?i)on\w+\s*=\s*["\'].*?["\']', ' ', text)

        # Remove control characters
        text = re.sub(r'[\x00-\x1f\x7f]', ' ', text)

        # Unescape HTML entities then escape to ensure safe plain text
        text = html.unescape(text)
        text = html.escape(text)

        # Collapse multiple whitespace to single space and trim
        text = re.sub(r'\s+', ' ', text).strip()

        return text
    
    def _fetch_from_newsapi(self, symbol: str, limit: int) -> List[Dict[str, Any]]:
        """
        Fetch articles from NewsAPI.
        
        Args:
            symbol: Stock ticker symbol
            limit: Maximum number of articles
            
        Returns:
            List of article dictionaries
        """
        url = "https://newsapi.org/v2/everything"
        params = {
            "q": f"{symbol} stock",
            "language": "en",
            "sortBy": "publishedAt",
            "pageSize": limit,
            "apiKey": NEWS_API_KEY
        }
        
        response = requests.get(url, params=params, timeout=10)
        
        if response.status_code == 200:
            data = response.json()
            articles = []
            for article in data.get("articles", []):
                processed_summary = self._preprocess_text(article.get("description", ""))   
                articles.append({
                    "title": article.get("title", ""),
                    "link": article.get("url", ""),
                    "published": article.get("publishedAt", ""),
                    "summary": processed_summary,
                    "source": article.get("source", {}).get("name", "NewsAPI")
                })
            return articles
        else:
            raise Exception(f"NewsAPI request failed with status {response.status_code}")
    
    def _fetch_from_yahoo_rss(self, symbol: str, limit: int) -> List[Dict[str, Any]]:
        """
        Fetch articles from Yahoo Finance RSS feed.
        
        Args:
            symbol: Stock ticker symbol
            limit: Maximum number of articles
            
        Returns:
            List of article dictionaries
        """
        yahoo_rss = f"https://feeds.finance.yahoo.com/rss/2.0/headline?s={symbol}&region=US&lang=en-US"
        feed = feedparser.parse(yahoo_rss)
        
        articles = []
        for entry in feed.entries[:limit]:
            processed_summary = self._preprocess_text(entry.get("summary", ""))
            articles.append({
                "title": entry.get("title", ""),
                "link": entry.get("link", ""),
                "published": entry.get("published", ""),
                "summary": processed_summary,
                "source": "Yahoo Finance"
            })
        
        return articles

# ---------------------------
# (step 1)
# ingestion/ingestion.py (file 3 of 3)
# ---------------------------
"""
Main Ingestion Coordinator
Orchestrates parallel data fetching from financial and news sources (simplified)
"""

class Ingestion(BaseWorker):
    """
    Main Ingestion Coordinator that fetches data from financial and news sources in parallel.
    
    Responsibilities:
    - Coordinate parallel data fetching from financial and news sources
    - Combine results into single bundle
    - Handle partial failures gracefully
    - Track errors from each source
    - Return complete data bundle
    """
    
    def __init__(self):
        """Initialize data ingestors."""
        self.financial_ingestor = FinancialDataIngestion()
        self.news_ingestor = NewsDataIngestion()
    
    def execute(self, *inputs) -> Dict[str, Any]:
        """
        Execute parallel data ingestion from all sources.
        
        Args:
            inputs[0] (str): Stock ticker symbol (e.g., "AAPL")
            inputs[1] (str, optional): Time period for historical data (default: "1mo")
            inputs[2] (int, optional): Number of news articles (default: 10)
        
        Returns:
            dict: Complete data bundle with financial and news data
        """
        try:
            # Extract parameters
            symbol = inputs[0] if len(inputs) > 0 else "AAPL"
            period = inputs[1] if len(inputs) > 1 else "1mo"
            news_limit = inputs[2] if len(inputs) > 2 else 10
            
            # Execute parallel fetching
            results = self._execute_parallel(symbol, period, news_limit)
            
            # Combine results
            bundle = self._combine_results(symbol, results)
            
            return bundle
            
        except Exception as e:
            return {
                "symbol": inputs[0] if len(inputs) > 0 else "UNKNOWN",
                "timestamp": datetime.now().isoformat(),
                "financial_data": None,
                "news_data": None,
                "errors": [{"source": "ingestion_coordinator", "error": str(e)}],
                "status": "error"
            }
    
    def _execute_parallel(self, symbol: str, period: str, news_limit: int) -> Dict[str, Any]:
        """
        Execute all ingestion tasks in parallel.
        
        Args:
            symbol: Stock ticker symbol
            period: Time period for historical data
            news_limit: Number of news articles
            
        Returns:
            Dictionary with results from all sources
        """
        results = {
            "financial": None,
            "news": None
        }
        
        with ThreadPoolExecutor(max_workers=2) as executor:
            # Submit tasks
            future_to_source = {
                executor.submit(self.financial_ingestor.execute, symbol, period): "financial",
                executor.submit(self.news_ingestor.execute, symbol, news_limit): "news"
            }
            
            # Collect results as they complete
            for future in as_completed(future_to_source):
                source = future_to_source[future]
                try:
                    result = future.result()
                    results[source] = result
                except Exception as e:
                    results[source] = {
                        "source": source,
                        "data": None,
                        "status": "error",
                        "error": str(e),
                        "timestamp": datetime.now().isoformat()
                    }
        
        return results
    
    def _combine_results(self, symbol: str, results: Dict[str, Any]) -> Dict[str, Any]:
        """
        Combine results from all ingestors into single bundle.
        
        Args:
            symbol: Stock ticker symbol
            results: Dictionary with results from each ingestor
            
        Returns:
            Combined data bundle
        """
        errors = []
        
        # Extract financial data (a source's entry may be None, so fall back to {})
        financial_result = results.get("financial") or {}
        financial_data = financial_result.get("data") if financial_result.get("status") == "success" else None
        if financial_result.get("error"):
            errors.append({"source": "financial", "error": financial_result.get("error")})
        
        # Extract news data
        news_result = results.get("news") or {}
        news_data = news_result.get("data") if news_result.get("status") in ["success", "partial_success"] else None
        if news_result.get("error"):
            errors.append({"source": "news", "error": news_result.get("error")})
        
        # Determine overall status
        status = self._determine_status(results)
        
        return {
            "symbol": symbol,
            "timestamp": datetime.now().isoformat(),
            "financial_data": financial_data,
            "news_data": news_data,
            "errors": errors,
            "status": status
        }
    
    def _determine_status(self, results: Dict[str, Any]) -> str:
        """
        Determine overall ingestion status based on results.
        
        Args:
            results: Dictionary with results from each ingestor
            
        Returns:
            Status string: "success", "partial_success", or "error"
        """
        success_count = 0
        total_count = len(results)
        
        for source, result in results.items():
            if result and result.get("status") in ["success", "partial_success"]:
                success_count += 1
        
        if success_count == total_count:
            return "success"
        elif success_count > 0:
            return "partial_success"
        else:
            return "error"

# Step 2 - Summarizer
- Formats ingested data into a structured insight with routed notes and a confidence score
- The summary text is stored as summary_result["summary"]
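The confidence score follows the rule used in `_confidence_from_news` in the cell below: start at 0.5, add 0.05 per average daily article over the window, and cap at 1.0. A minimal sketch of that arithmetic, assuming plain lists of daily counts in place of a DataFrame; the function name `confidence_from_counts` is hypothetical.

```python
from statistics import mean
from typing import Optional, Sequence

def confidence_from_counts(daily_counts: Optional[Sequence[int]], window: int = 7) -> float:
    # No news history: fall back to the neutral baseline
    if not daily_counts:
        return 0.50
    recent = daily_counts[-window:]
    # 0.5 baseline + 0.05 per average daily article, capped at 1.0
    return round(min(1.0, 0.5 + 0.05 * mean(recent)), 2)

sparse = confidence_from_counts([2, 4, 6])   # average of 4 articles per day
heavy = confidence_from_counts([20] * 7)     # heavy coverage hits the cap
empty = confidence_from_counts(None)
print(sparse, heavy, empty)
```

Under this rule the score reaches its cap at an average of 10 articles per day, so it measures coverage volume and not how much the sources agree.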

# Step 3 - Memory
- Stores, retrieves, and searches short text memories with tags so the agent keeps notes across runs
- Saves to data/agent_memory.json
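The add, search, and reload cycle can be sketched with a small file-backed store. This is an illustration only: `TinyMemory` is a hypothetical class, and it writes to a temporary directory so the real data/agent_memory.json is untouched.

```python
import json
import os
import tempfile
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

class TinyMemory:
    """Hypothetical file-backed memory: a list of entries mirrored to a JSON file."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.entries: List[Dict[str, Any]] = self._load()

    def _load(self) -> List[Dict[str, Any]]:
        try:
            with open(self.path, "r", encoding="utf-8") as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return []

    def add(self, text: str, tags: Optional[List[str]] = None) -> Dict[str, Any]:
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "text": text,
            "tags": list(tags or []),
        }
        self.entries.append(entry)
        os.makedirs(os.path.dirname(self.path) or ".", exist_ok=True)
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump(self.entries, f, indent=2)
        return entry

    def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        q = query.lower()
        # Newest first; match on text or on any tag, case-insensitively
        hits = [
            e for e in reversed(self.entries)
            if q in e["text"].lower() or any(q in t.lower() for t in e["tags"])
        ]
        return hits[:top_k]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "data", "agent_memory.json")
    mem = TinyMemory(path)
    mem.add("[AAPL] 7d summary (conf=0.70)", ["AAPL", "summary"])
    mem.add("[MSFT] earnings beat", ["MSFT"])
    reloaded = TinyMemory(path)      # simulates a later run reading the same file
    hits = reloaded.search("aapl")

print(len(reloaded.entries), hits[0]["text"])
```

Substring search keeps the store dependency-free; it will not find paraphrases, so consistent tags (such as the ticker) matter for recall.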

In [4]:
# ---------------------------
# (step 2)
# src/summarizer/summarizer.py
# ---------------------------

ROUTES = {
    "earnings": ["eps", "guidance", "revenue", "call", "forecast", "beat", "miss", "margin"],
    "macro":    ["fed", "rate", "cpi", "inflation", "jobs", "gdp", "unemployment", "yields", "oil"],
    "company":  ["product", "launch", "recall", "supply", "lawsuit", "merger", "partnership", "contract"],
}

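def _route(text: str) -> str:
    """Return the first route whose keyword appears as a whole word (optionally pluralized)."""
    t = (text or "").lower()
    for route, keys in ROUTES.items():
        # Word boundaries stop "call" matching "recall" and "rate" matching "corporate"
        if any(re.search(rf"\b{re.escape(k)}(?:e?s)?\b", t) for k in keys):
            return route
    return "company"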

PROMPT_TEMPLATE = """You are a pragmatic equity analyst.
Goal: {goal}
Symbol: {symbol}

Context (recent daily stats + sampled headlines):
{context}

Write 5–8 concise bullets on likely near-term price *drivers* and 2 bullets on *key risks*.
Avoid hype; be specific. Include dates or sources inline when present.
"""

class SummarizerWorker(BaseWorker):
    def __init__(self, name="summarizer", role="news_summary", model: str | None = None):
        super().__init__()
        self.name = name
        self.role = role
        self.model = model

    def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        symbol: str = inputs["symbol"]
        news_daily = inputs.get("news_daily")
        raw_news   = inputs.get("raw_news")
        window     = int(inputs.get("window", 7))
        goal       = inputs.get("analysis_goal", f"Next-week price drivers for {symbol}")

        context = self._format_context(news_daily, raw_news, window)
        routed  = self._route_headlines(raw_news)
        prompt  = PROMPT_TEMPLATE.format(goal=goal, symbol=symbol, context=context)

        summary_text = (
            "(Stubbed summary — replace with your LLM call)\n"
            + prompt
            + "\n- Headlines cluster around a few catalysts; monitor official updates.\n"
              "- Tone is slightly positive; momentum sensitive to macro prints.\n"
              "- Risks: guidance/margin pressure; policy surprises."
        )

        confidence = self._confidence_from_news(news_daily, window)
        memory_writes = [
            f"[{symbol}] {window}d summary (conf={confidence:.2f})",
            f"[{symbol}] Routes: " + ", ".join([k for k, v in routed.items() if v])
        ]

        return {
            "symbol": symbol,
            "summary": summary_text,
            "routed_notes": routed,
            "confidence": confidence,
            "artifacts": {"prompt": prompt},
            "memory_writes": memory_writes,
        }

    # -------- helpers --------
    def _format_context(self, news_daily, raw_news: Union[List[dict], "pd.DataFrame", None], window: int) -> str:
        parts: List[str] = []
        if news_daily is not None and hasattr(news_daily, "tail") and len(news_daily) > 0:
            tail = news_daily.tail(window)
            parts.append("Daily sentiment (most recent first):")
            for idx, row in tail.iloc[::-1].iterrows():
                parts.append(
                    f"- {idx.date()}: count={int(row.get('news_count', 0))}, "
                    f"sent_mean={row.get('sent_mean', 0):+.3f}, decay={row.get('sent_decay', 0):+.3f}"
                )
        if raw_news is not None:
            try:
                import pandas as pd
                df = raw_news if isinstance(raw_news, pd.DataFrame) else pd.DataFrame(raw_news)
                ts = "published" if "published" in df.columns else (df.columns[0] if len(df.columns) else None)
                if ts:
                    df = df.sort_values(by=ts).tail(12)
                parts.append("Recent headlines:")
                for _, r in df.iterrows():
                    ttl = str(r.get("title", ""))[:160]
                    src = r.get("source", "") or "news"
                    dt  = r.get("published", r.get("date", ""))
                    parts.append(f"- [{dt}] ({src}) {ttl}")
            except Exception:
                pass
        return "\n".join(parts) if parts else "No recent news."

    def _route_headlines(self, raw_news) -> dict:
        routed = {"earnings": [], "macro": [], "company": []}
        if raw_news is None:
            return routed
        try:
            import pandas as pd
            df = raw_news if isinstance(raw_news, pd.DataFrame) else pd.DataFrame(raw_news)
            for _, r in df.tail(50).iterrows():
                ttl = str(r.get("title", "")) or ""
                routed[_route(ttl)].append(ttl)
        except Exception:
            pass
        return {k: v[:5] for k, v in routed.items()}

    def _confidence_from_news(self, news_daily, window: int) -> float:
        if news_daily is None or not hasattr(news_daily, "tail") or len(news_daily) == 0:
            return 0.50
        try:
            avg_cnt = float(news_daily.tail(window)["news_count"].mean())
            return round(min(1.0, 0.5 + 0.05 * avg_cnt), 2)
        except Exception:
            return 0.50

# ---------------------------
# (step 3)
# src/memory/memory.py
# ---------------------------

# In-memory list of memories, mirrored to MEMORY_FILE (JSON) so it persists across runs.
_memory_store: List[Dict[str, Any]] = []
MEMORY_FILE = "./data/agent_memory.json"

def _load_memory():
    """Loads memories from the JSON file into the in-memory list."""
    global _memory_store
    if not os.path.exists(MEMORY_FILE):
        _memory_store = []
        return
    try:
        with open(MEMORY_FILE, "r", encoding="utf-8") as f:
            _memory_store = json.load(f)
    except (json.JSONDecodeError, FileNotFoundError):
        _memory_store = []

def _save_memory():
    """Saves the in-memory list of memories to the JSON file."""
    os.makedirs(os.path.dirname(MEMORY_FILE), exist_ok=True)
    with open(MEMORY_FILE, "w", encoding="utf-8") as f:
        json.dump(_memory_store, f, indent=2)

# Load memory when module is imported
_load_memory()

class MemoryWorker(BaseWorker):
    """
    A BaseWorker implementation for managing agent memories. It stores short text
    memories with optional tags in a JSON file so the agent can learn across runs.
    """
    def execute(self, *inputs) -> Any:
        """
        Manages agent memories. The first input is the operation ('add', 'search', 'get_recent').

        Usage:
            - execute('add', 'some memory text', ['tag1', 'tag2'])
            - execute('search', 'query text')
            - execute('get_recent', 5)
        """
        if not inputs:
            raise ValueError("MemoryWorker requires at least one input for the operation.")

        operation = inputs[0]

        if operation == 'add':
            if len(inputs) < 2:
                raise ValueError("The 'add' operation requires text for the memory.")
            text = inputs[1]
            tags = inputs[2] if len(inputs) > 2 else []
            entry = {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "text": text,
                "tags": tags,
            }
            _memory_store.append(entry)
            _save_memory()
            return entry
        
        elif operation == 'search':
            if len(inputs) < 2:
                raise ValueError("The 'search' operation requires a query string.")
            query = inputs[1].lower()
            top_k = inputs[2] if len(inputs) > 2 else 5
            
            matches = [
                m for m in reversed(_memory_store) 
                if query in m['text'].lower() or any(query in t.lower() for t in m.get('tags', []))
            ]
            return matches[:top_k]

        elif operation == 'get_recent':
            n = inputs[1] if len(inputs) > 1 else 5
            return list(reversed(_memory_store))[:n]

        else:
            raise ValueError(f"Unknown operation: {operation}. Available operations: 'add', 'search', 'get_recent'.")

# Step 4 - Evaluator-optimizer
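The control flow of the pattern (generate, evaluate, then loop until the grade is "pass" or the iteration budget runs out) can be sketched in plain Python before reading the LangGraph version. The `generate`, `evaluate`, and `refine` functions are hypothetical stubs with no LLM calls; the evaluator here only checks that two required topics are mentioned.

```python
from typing import Any, Dict, List, Tuple

REQUIRED_TOPICS = ("recommendation", "risk")  # assumed toy criteria

def generate(draft: str, feedback: str) -> str:
    # Stub: a real generator would prompt an LLM with the draft and the feedback
    return f"{draft} {feedback.capitalize()}: (addressed)." if feedback else draft

def evaluate(summary: str) -> Dict[str, Any]:
    missing = [topic for topic in REQUIRED_TOPICS if topic not in summary.lower()]
    return {
        "grade": "fail" if missing else "pass",
        "feedback": missing[0] if missing else "",  # ask for one fix per round
        "issues": missing,
    }

def refine(initial: str, max_iterations: int = 3) -> Tuple[str, List[Dict[str, Any]]]:
    summary, feedback = initial, ""
    history: List[Dict[str, Any]] = []
    for iteration in range(1, max_iterations + 1):
        summary = generate(summary, feedback)
        verdict = evaluate(summary)
        history.append({"iteration": iteration, **verdict})
        if verdict["grade"] == "pass":
            break  # quality passed
        feedback = verdict["feedback"]  # loop back with feedback
    return summary, history

summary, history = refine("AAPL rose 2%.")
for step in history:
    print(step["iteration"], step["grade"], step["issues"])
```

The iteration cap matters as much as the pass check: without it, an evaluator that never passes would loop indefinitely and keep spending API calls.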

In [5]:
# ---------------------------
# (step 4)
# evaluator_optimizer/__init__.py
# ---------------------------

"""
Evaluator-Optimizer Module

Implements the Evaluator-Optimizer workflow pattern for iterative refinement
of investment research summaries.
"""

__all__ = ["EvaluatorOptimizer", "Feedback", "State"]

# ---------------------------
# (step 4)
# evaluator_optimizer/evaluator_optimizer.py
# ---------------------------

"""
Evaluator-Optimizer Module

Implements the Evaluator-Optimizer workflow pattern where:
1. Generator creates an investment research summary
2. Evaluator assesses quality and provides feedback
3. Loop continues until quality passes or max iterations reached
4. Final summary is stored in memory

Based on LangGraph pattern: https://langchain-ai.github.io/langgraph/tutorials/workflows/#evaluator-optimizer
"""

# --- State Definition ---
class State(TypedDict):
    """Graph state for the Evaluator-Optimizer workflow"""
    symbol: str  # Stock symbol being analyzed
    instructions: str  # User's request/query about the stock
    context: Dict[str, Any]  # Financial data context (news, prices, etc.)
    summary: str  # Current investment research summary
    feedback: str  # Feedback from evaluator
    grade: str  # Quality grade: "pass" or "fail"
    quality_score: float  # Numeric quality score (0-10)
    issues: List[str]  # List of identified issues
    iteration: int  # Current iteration number
    max_iterations: int  # Maximum allowed iterations
    history: List[Dict[str, Any]]  # History of iterations for tracking

# --- Structured Output Schema for Evaluation ---
class Feedback(BaseModel):
    """Structured evaluation feedback from the Evaluator"""
    
    grade: Literal["pass", "fail"] = Field(
        description="Overall quality assessment: 'pass' if summary meets quality criteria, 'fail' otherwise"
    )
    quality_score: float = Field(
        description="Numeric quality score from 0-10, where 10 is excellent",
        ge=0.0,
        le=10.0
    )
    feedback: str = Field(
        description="Detailed, actionable feedback for improving the summary if grade is 'fail'"
    )
    issues: List[str] = Field(
        description="List of specific issues identified in the summary",
        default_factory=list
    )

# --- Evaluator-Optimizer Class ---
class EvaluatorOptimizer(BaseWorker):
    """
    Implements the Evaluator-Optimizer workflow pattern for investment research summaries.
    
    The workflow:
    1. Generator creates an initial summary from context data
    2. Evaluator assesses quality against defined criteria
    3. If quality fails, provides feedback and loops back to Generator
    4. Continues until quality passes or max iterations reached
    5. Returns final optimized summary
    """
    
    # Quality criteria for investment research summaries
    QUALITY_CRITERIA = """
    A high-quality investment research summary should:
    1. COMPLETENESS: Cover key financial metrics, sentiment analysis, and risk factors
    2. CLARITY: Be well-structured, concise, and easy to understand
    3. ACTIONABILITY: Include a clear investment recommendation (buy/sell/hold) with rationale
    4. EVIDENCE-BASED: Back claims with specific data from news and financial metrics
    5. COHERENCE: Have logical flow without contradictions
    6. RISK AWARENESS: Acknowledge both opportunities and risks
    """
    
    def __init__(self, llm: Optional[ChatOpenAI] = None, max_iterations: int = 3):
        """
        Initialize the Evaluator-Optimizer.
        
        Args:
            llm: Language model instance (if None, initializes OpenAI LLM from environment)
            max_iterations: Maximum refinement iterations before stopping
        """
        # Initialize LLM
        if llm is not None:
            self.llm = llm
        else:
            self.llm = self._initialize_openai_llm()
        
        self.max_iterations = max_iterations
        
        # Create structured output LLM for evaluation
        self.evaluator_llm = self.llm.with_structured_output(Feedback)
        
        # Build the workflow graph
        self.workflow = self._build_workflow()
    
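    def _initialize_openai_llm(self) -> ChatOpenAI:
        """Initialize the OpenAI LLM with the API key from the environment."""
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            # Fail early with a clear message instead of an opaque client error later
            raise ValueError("OPENAI_API_KEY is not set; define it in .env or the environment")
        return ChatOpenAI(
            model="gpt-4o-mini",  # Using gpt-4o-mini for cost efficiency
            temperature=0.7,
            api_key=api_key
        )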
    
    def _build_workflow(self) -> StateGraph:
        """Builds the LangGraph workflow for Evaluator-Optimizer pattern"""
        
        # Create the graph
        builder = StateGraph(State)
        
        # Add nodes
        builder.add_node("generator", self._generator_node)
        builder.add_node("evaluator", self._evaluator_node)
        
        # Add edges
        builder.add_edge(START, "generator")
        builder.add_edge("generator", "evaluator")
        
        # Conditional edge: loop back to generator or end
        builder.add_conditional_edges(
            "evaluator",
            self._should_continue,
            {
                "continue": "generator",  # Loop back with feedback
                "end": END  # Quality passed or max iterations reached
            }
        )
        
        # Compile the workflow
        return builder.compile()
    
    def _generator_node(self, state: State) -> Dict[str, Any]:
        """
        Generator Node: Creates or refines the investment research summary.
        
        On first iteration: creates initial summary from context
        On subsequent iterations: refines summary based on evaluator feedback
        """
        symbol = state["symbol"]
        context = state["context"]
        instructions = state.get("instructions", "")
        feedback = state.get("feedback", "")
        iteration = state.get("iteration", 0)
        
        # Format the context data
        formatted_context = self._format_context(context)
        
        # Build the prompt
        if iteration == 0:
            # Initial summary generation
            prompt = f"""You are an expert financial analyst. Create a comprehensive investment research summary for {symbol}.

USER REQUEST:
{instructions}

AVAILABLE DATA:
{formatted_context}

QUALITY REQUIREMENTS:
{self.QUALITY_CRITERIA}

INSTRUCTIONS:
1. Directly address the user's request/questions in your analysis
2. Use the provided financial data, news, and market context to support your analysis
3. If the user asked specific questions, answer them explicitly
4. If data is missing or unavailable, acknowledge it and work with what's available
5. Provide a clear investment recommendation (buy/sell/hold) with detailed rationale
6. Structure your response professionally with clear sections
7. Back all claims with specific data points from the context

Generate a well-structured investment research summary that meets all quality criteria and addresses the user's request."""
        else:
            # Refinement based on feedback
            current_summary = state["summary"]
            prompt = f"""You are refining an investment research summary for {symbol}. 

USER REQUEST:
{instructions}

CURRENT SUMMARY:
{current_summary}

EVALUATOR FEEDBACK:
{feedback}

ISSUES IDENTIFIED:
{', '.join(state.get('issues', []))}

AVAILABLE DATA:
{formatted_context}

QUALITY REQUIREMENTS:
{self.QUALITY_CRITERIA}

INSTRUCTIONS:
1. Address all issues identified by the evaluator
2. Ensure the user's original request is still fully addressed
3. Improve clarity, completeness, and actionability
4. Add missing data points or analysis where needed
5. Maintain professional structure and tone

Generate an improved version that addresses all feedback and meets quality standards."""
        
        # Generate summary
        response = self.llm.invoke(prompt)
        new_summary = response.content
        
        # Update iteration count
        new_iteration = iteration + 1
        
        print(f"\n{'='*60}")
        print(f"GENERATOR - Iteration {new_iteration}")
        print(f"{'='*60}")
        print(f"User Request: {instructions[:80]}...")
        print(f"Summary generated ({len(new_summary)} characters)")
        if iteration > 0:
            print(f"Addressing feedback: {feedback[:100]}...")
        
        return {
            "summary": new_summary,
            "iteration": new_iteration
        }
    
    def _evaluator_node(self, state: State) -> Dict[str, Any]:
        """
        Evaluator Node: Assesses the quality of the summary and provides feedback.
        
        Uses structured output to return:
        - grade: "pass" or "fail"
        - quality_score: 0-10
        - feedback: actionable improvement suggestions
        - issues: specific problems identified
        """
        summary = state["summary"]
        symbol = state["symbol"]
        instructions = state.get("instructions", "")
        iteration = state["iteration"]
        
        # Evaluation prompt
        prompt = f"""You are a senior financial analyst evaluating an investment research summary for {symbol}.

USER'S ORIGINAL REQUEST:
{instructions}

SUMMARY TO EVALUATE:
{summary}

EVALUATION CRITERIA:
{self.QUALITY_CRITERIA}

ASSESSMENT REQUIREMENTS:
1. Does the summary directly address the user's request/questions?
2. Is the analysis backed by specific data points?
3. Are all quality criteria met (completeness, clarity, actionability, evidence-based, coherence, risk awareness)?
4. Is the investment recommendation clear and well-justified?
5. Are there any contradictions or unsupported claims?
6. Is the structure professional and easy to follow?

Provide:
- grade: "pass" if the summary meets professional standards and addresses the user's request, "fail" if significant improvements needed
- quality_score: 0-10 (be generous with 7+ for good work, reserve 9+ for exceptional analysis)
- feedback: Specific, actionable suggestions for improvement (if grade is "fail")
- issues: List specific problems (missing data, unclear reasoning, unanswered questions, etc.)

Be fair but thorough. A passing grade means the summary is publication-ready and fully addresses the user's needs."""
        
        # Get structured evaluation
        evaluation = self.evaluator_llm.invoke(prompt)
        
        # Track iteration history (copy so the incoming state is not mutated in place)
        history = list(state.get("history", []))
        history.append({
            "iteration": iteration,
            "summary_length": len(summary),
            "grade": evaluation.grade,
            "quality_score": evaluation.quality_score,
            "issues_count": len(evaluation.issues)
        })
        
        print(f"\n{'='*60}")
        print(f"EVALUATOR - Iteration {iteration}")
        print(f"{'='*60}")
        print(f"Grade: {evaluation.grade.upper()}")
        print(f"Quality Score: {evaluation.quality_score}/10")
        print(f"Issues Found: {len(evaluation.issues)}")
        if evaluation.issues:
            for i, issue in enumerate(evaluation.issues, 1):
                print(f"  {i}. {issue}")
        print(f"Feedback: {evaluation.feedback[:150]}...")
        
        return {
            "grade": evaluation.grade,
            "quality_score": evaluation.quality_score,
            "feedback": evaluation.feedback,
            "issues": evaluation.issues,
            "history": history
        }
    
    def _should_continue(self, state: State) -> Literal["continue", "end"]:
        """
        Conditional routing: decide whether to continue refinement or end.
        
        Continue if:
        - Grade is "fail" AND
        - Haven't reached max iterations
        
        End if:
        - Grade is "pass" OR
        - Max iterations reached
        """
        grade = state["grade"]
        iteration = state["iteration"]
        max_iterations = state["max_iterations"]
        
        if grade == "pass":
            print(f"\nQuality PASSED - Ending optimization")
            return "end"
        elif iteration >= max_iterations:
            print(f"\nMax iterations ({max_iterations}) reached - Ending optimization")
            return "end"
        else:
            print(f"\nQuality FAILED - Continuing to iteration {iteration + 1}")
            return "continue"
    
    def _format_context(self, context: Dict[str, Any]) -> str:
        """Formats context data into a readable string for prompts"""
        formatted = []
        
        # Format financial data with key metrics
        if "financial_data" in context:
            fin_data = context["financial_data"]
            formatted.append("=== FINANCIAL METRICS ===")
            formatted.append(f"Symbol: {fin_data.get('symbol', 'N/A')}")
            # `or 0` also covers keys that are present but set to None
            formatted.append(f"Current Price: ${(fin_data.get('current_price') or 0):.2f}")
            formatted.append(f"Price Change: ${(fin_data.get('price_change') or 0):.2f} ({(fin_data.get('price_change_pct') or 0):.2f}%)")
            formatted.append(f"Volume: {(fin_data.get('volume') or 0):,}")
            formatted.append(f"Market Cap: ${(fin_data.get('market_cap') or 0):,}")
            formatted.append(f"P/E Ratio: {(fin_data.get('pe_ratio') or 0):.2f}")
            formatted.append(f"Dividend Yield: {(fin_data.get('dividend_yield') or 0):.2f}%")
            formatted.append(f"52-Week Range: ${(fin_data.get('52_week_low') or 0):.2f} - ${(fin_data.get('52_week_high') or 0):.2f}")
            formatted.append(f"Beta: {(fin_data.get('beta') or 0):.3f}")
            formatted.append(f"Sector: {fin_data.get('sector', 'N/A')}")
            formatted.append(f"Industry: {fin_data.get('industry', 'N/A')}")
            
            # Add volatility if available
            if 'volatility_30d' in fin_data:
                formatted.append(f"30-Day Volatility: {fin_data['volatility_30d']:.2%}")
            
            # Add company summary if available
            if 'company_summary' in fin_data and fin_data['company_summary']:
                formatted.append(f"\nCompany Overview: {fin_data['company_summary'][:300]}...")
            
            # Add recent price trends from historical data
            if 'historical_data' in fin_data and fin_data['historical_data']:
                hist = fin_data['historical_data']
                if len(hist) >= 5:
                    formatted.append("\nRecent Price Trend (Last 5 Days):")
                    for i, day in enumerate(hist[-5:], 1):
                        formatted.append(f"  Day {i}: Close=${day.get('Close', 0):.2f}, Volume={day.get('Volume', 0):,}")
        
        # Format news data
        if "news_data" in context:
            news = context["news_data"]
            if isinstance(news, dict) and "articles" in news:
                articles = news["articles"]
                total = news.get("total_count", 0)
                formatted.append(f"\n=== NEWS ANALYSIS ===")
                formatted.append(f"Total Articles Found: {total}")
                
                if articles:
                    formatted.append("\nRecent Headlines:")
                    for i, article in enumerate(articles[:5], 1):
                        title = article.get('title', 'No title')
                        date = article.get('publishedAt', article.get('date', 'N/A'))
                        formatted.append(f"  {i}. {title} ({date})")
                else:
                    formatted.append("No recent news articles available.")
            else:
                formatted.append("\n=== NEWS ANALYSIS ===")
                formatted.append("News data format not recognized or unavailable.")
        
        # Format sentiment if available
        if "sentiment" in context:
            formatted.append(f"\n=== SENTIMENT ANALYSIS ===")
            formatted.append(f"Overall Sentiment: {context['sentiment']}")
        
        # Add any errors encountered
        if "errors" in context and context["errors"]:
            formatted.append(f"\n=== DATA COLLECTION NOTES ===")
            for error in context["errors"]:
                source = error.get("source", "unknown")
                error_detail = error.get("error", "Unknown error")
                formatted.append(f"Note: {source} - {error_detail}")
        
        # Add timestamp
        if "timestamp" in context:
            formatted.append(f"\nData Retrieved: {context['timestamp']}")
        
        return "\n".join(formatted) if formatted else "No context data available"
    
    def execute(self, symbol: str, context: Dict[str, Any], instructions: str) -> Dict[str, Any]:
        """
        Execute the Evaluator-Optimizer workflow.
        
        Args:
            symbol: Stock symbol to analyze
            context: Dictionary containing financial data, news, sentiment, etc.
            instructions: User's request/query (e.g., "Should I buy AAPL?" or "Analyze the potential for AAPL stock")
        
        Returns:
            Dictionary with:
                - final_summary: The optimized summary
                - quality_score: Final quality score
                - iterations: Number of iterations performed
                - history: Detailed iteration history
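                - passed: Whether quality criteria were met
                - final_grade: Final grade from the evaluator ("pass" or "fail")
                - issues: Issues reported in the last evaluation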
        """
        print(f"\n{'#'*60}")
        print(f"EVALUATOR-OPTIMIZER WORKFLOW")
        print(f"Symbol: {symbol}")
        print(f"User Request: {instructions}")
        print(f"Max Iterations: {self.max_iterations}")
        print(f"{'#'*60}")
        
        # Initialize state
        initial_state = {
            "symbol": symbol,
            "instructions": instructions,
            "context": context,
            "summary": "",  # Generator writes the first draft on iteration 0
            "feedback": "",
            "grade": "",
            "quality_score": 0.0,
            "issues": [],
            "iteration": 0,
            "max_iterations": self.max_iterations,
            "history": []
        }
        
        # Run the workflow
        final_state = self.workflow.invoke(initial_state)
        
        # Prepare results
        results = {
            "final_summary": final_state["summary"],
            "quality_score": final_state["quality_score"],
            "iterations": final_state["iteration"],
            "history": final_state["history"],
            "passed": final_state["grade"] == "pass",
            "final_grade": final_state["grade"],
            "issues": final_state["issues"]
        }
        
        print(f"\n{'#'*60}")
        print(f"WORKFLOW COMPLETE")
        print(f"{'#'*60}")
        print(f"Total Iterations: {results['iterations']}")
        print(f"Final Grade: {results['final_grade'].upper()}")
        print(f"Final Quality Score: {results['quality_score']}/10")
        print(f"Quality Passed: {'✅ YES' if results['passed'] else '❌ NO'}")
        print(f"{'#'*60}\n")
        
        return results
    
    def visualize(self, output_path: str = "evaluator_optimizer_graph.png"):
        """
        Visualize the workflow graph.
        
        Args:
            output_path: Path to save the graph image
        """
        try:
            img_data = self.workflow.get_graph().draw_mermaid_png()
            with open(output_path, "wb") as f:
                f.write(img_data)
            print(f"Workflow graph saved to: {output_path}")
            return Image(img_data)
        except Exception as e:
            print(f"Could not generate graph visualization: {e}")
            return None
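
The stop rule that `_should_continue` encodes can be illustrated without LangGraph or an LLM. The following is a minimal sketch, assuming two hypothetical plain callables, `generate` and `evaluate`, that stand in for the generator and evaluator nodes (neither is part of the notebook). It only mirrors the control flow: end on a passing grade, or once the iteration count reaches `max_iterations`.

```python
from typing import Callable, Dict, List, Tuple


def refine_until_pass(
    generate: Callable[[str, str], str],
    evaluate: Callable[[str], Tuple[str, str]],
    max_iterations: int = 3,
) -> Dict[str, object]:
    """Run a generate -> evaluate loop with the same stop rule as _should_continue.

    `generate(previous_draft, feedback)` and `evaluate(draft)` are hypothetical
    stand-ins for the LLM-backed generator and evaluator nodes.
    """
    draft, feedback, grade = "", "", ""
    history: List[Dict[str, object]] = []
    iteration = 0
    while True:
        draft = generate(draft, feedback)
        iteration += 1  # the generator node increments the counter
        grade, feedback = evaluate(draft)
        history.append({"iteration": iteration, "grade": grade})
        # Stop on a pass, or when the iteration budget is used up
        if grade == "pass" or iteration >= max_iterations:
            break
    return {"summary": draft, "grade": grade, "iterations": iteration, "history": history}


# Toy stand-ins: the evaluator fails any draft that lacks a risk section
def toy_generate(previous: str, feedback: str) -> str:
    if "risk" in feedback:
        return previous + "\nRisks: stub."
    return previous + "Summary: stub."


def toy_evaluate(draft: str) -> Tuple[str, str]:
    if "Risks" in draft:
        return "pass", ""
    return "fail", "Add a risk section."


result = refine_until_pass(toy_generate, toy_evaluate)
print(result["grade"], result["iterations"])
```

With these toy functions the first draft fails, the second adds the risk section and passes, so the loop ends before the iteration budget is exhausted. An evaluator that never passes would stop at `max_iterations` with a final grade of `fail`, which is why `execute` reports `passed` separately from the iteration count.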

# Step 0 - Orchestrator
- initialize workers
- planning - execute workers in order
- routing - pass outputs from one worker to the next
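
As a rough illustration of the three bullets above, the sketch below wires stub workers together in a fixed order and passes each output to the next step. `StubIngestion`, `StubSummarizer`, and `run_pipeline` are hypothetical names used only for this example; the notebook's actual workers call external APIs and an LLM.

```python
from typing import Any, Dict, List


class StubIngestion:
    """Hypothetical stand-in for the ingestion worker (no network calls)."""

    def execute(self, symbol: str) -> Dict[str, Any]:
        return {"symbol": symbol, "news_data": {"articles": [{"title": f"{symbol} headline"}]}}


class StubSummarizer:
    """Hypothetical stand-in for the summarizer worker."""

    def execute(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        titles: List[str] = [article["title"] for article in payload["raw_news"]]
        return {"summary": f"{payload['symbol']}: " + "; ".join(titles)}


def run_pipeline(symbol: str) -> str:
    # Initialize workers
    workers = {"ingestion": StubIngestion(), "summarizer": StubSummarizer()}
    # Planning: workers run in a fixed order
    ingested = workers["ingestion"].execute(symbol)
    # Routing: the ingestion output becomes the summarizer input
    articles = (ingested.get("news_data") or {}).get("articles", [])
    summarized = workers["summarizer"].execute({"symbol": symbol, "raw_news": articles})
    return summarized["summary"]


print(run_pipeline("AAPL"))
```

Stubs like these make it possible to exercise the ordering and hand-off logic without API keys or network access.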

In [6]:
# --- Orchestrator Module ---
class Orchestrator:
    """
    Coordinates the workflow of all workers:
    1. Ingestion: fetch financial and news data
    2. Summarizer: generate summary from ingested news
    3. Memory: store summary and metadata
    4. EvaluatorOptimizer: refine the summary iteratively
    5. Memory: store the final optimized summary
    """

    # -----------------------------
    # Worker Initialization
    # -----------------------------
    def __init__(self):
        self.workers = {
            "ingestion": Ingestion(),
            "summarizer": SummarizerWorker(),
            "memory": MemoryWorker(),
            "evaluator_optimizer": EvaluatorOptimizer(),
        }

    # -----------------------------
    # Planning and Routing Execution
    # -----------------------------
    def execute(self, symbol: str, instructions: str) -> str:
        """Runs the full workflow and returns a formatted Markdown summary"""

        # --- Step 1: Fetch financial and news data ---
        ingestion_result = self.workers["ingestion"].execute(symbol)
        news_articles = (ingestion_result.get("news_data") or {}).get("articles", [])

        # --- Step 2: Summarize news ---
        summary_result = self.workers["summarizer"].execute({
            "symbol": symbol,
            "raw_news": news_articles,
            "news_daily": None,  # defaults confidence to 0.5 if not provided
            "window": 7,
            "analysis_goal": instructions
        })

        # --- Step 3: Store summary in memory ---
        for note in summary_result.get("memory_writes", []):
            self.workers["memory"].execute("add", note, [symbol, "summary"])

        # --- Step 4: Evaluator-Optimizer Workflow ---
        evaluator = self.workers["evaluator_optimizer"]
        initial_state = {
            "symbol": symbol,
            "instructions": instructions,
            "context": ingestion_result,
            "summary": summary_result["summary"],
            "feedback": "",
            "grade": "",
            "quality_score": 0.0,
            "issues": [],
            "iteration": 0,
            "max_iterations": evaluator.max_iterations,
            "history": []
        }

        final_result = evaluator.workflow.invoke(initial_state)

        # --- Step 5: Store final optimized summary in memory ---
        final_summary_text = final_result["summary"]
        # Keep the summary text in the note itself; tags stay short labels
        memory_note = f"[{symbol}] Final Optimized Summary\n{final_summary_text}"
        self.workers["memory"].execute("add", memory_note, [symbol, "summary"])

        return final_summary_text  # returns plain Markdown string

# -----------------------------
# Helper Function to Run Analysis
# -----------------------------
def run_investment_analysis(symbol: str, instructions: str) -> str:
    orchestrator = Orchestrator()
    return orchestrator.execute(symbol, instructions)

In [7]:
# Run analysis (run_investment_analysis builds its own Orchestrator)
result = run_investment_analysis("AAPL", "Analyze stock for the past 10 days...")
# Output result in Markdown format
display(Markdown(result))


GENERATOR - Iteration 1
User Request: Analyze stock for the past 10 days......
Summary generated (3623 characters)

EVALUATOR - Iteration 1
Grade: PASS
Quality Score: 8.0/10
Issues Found: 0
Feedback: ...

Quality PASSED - Ending optimization


# Investment Research Summary: Apple Inc. (AAPL)

## 1. Company Overview
Apple Inc. (AAPL) is a leading technology company known for designing, manufacturing, and marketing consumer electronics, software, and online services. Its product portfolio includes the iPhone, Mac computers, iPad tablets, and a variety of wearables and accessories. The company operates in a highly competitive market, continually innovating to maintain its leadership position.

## 2. Recent Financial Metrics
- **Current Price**: $247.77
- **Price Change**: $0.11 (0.04%)
- **Volume**: 35,447,900 shares traded
- **Market Cap**: $3.68 trillion
- **P/E Ratio**: 29.82
- **Dividend Yield**: 0.42%
- **52-Week Range**: $169.21 - $260.10
- **Beta**: 1.094 (indicating slightly higher volatility than the market)
- **Sector**: Technology
- **Industry**: Consumer Electronics
- **30-Day Volatility**: 25.07%

## 3. Recent Price Trend (Last 5 Days)
| Day       | Close Price | Volume        |
|-----------|-------------|---------------|
| Day 1    | $258.06     | 36,496,900    |
| Day 2    | $254.04     | 38,322,000    |
| Day 3    | $245.27     | 61,999,100    |
| Day 4    | $247.66     | 38,142,900    |
| Day 5    | $247.77     | 35,447,900    |

### Summary of Recent Price Movement
- The stock experienced volatility over the last five trading days, with a peak closing price of $258.06 on Day 1 and a low of $245.27 on Day 3.
- On average, the stock has shown resilience, recovering some losses to close at $247.77 on Day 5.

## 4. News Analysis
Recent news articles have highlighted several key developments impacting Apple:
1. **Demand Surge**: Apple has seen a surge in demand for its iPhone 17 in China, which may positively influence sales and revenue.
2. **Investment in China**: CEO Tim Cook has indicated plans to bolster investments in China, signaling confidence in one of its largest markets.
3. **Tax Law Lobbying in India**: Apple is reportedly lobbying the Indian government to modify tax laws, which could enhance its operational efficiency in the country.

These developments suggest a favorable sentiment towards Apple’s growth prospects, particularly in international markets.

## 5. Risk Factors
- **Market Volatility**: AAPL has a beta of 1.094, indicating that it is slightly more volatile than the overall market. Investors should be prepared for price fluctuations.
- **Global Economic Conditions**: Apple’s performance is closely tied to global economic conditions and consumer spending trends.
- **Regulatory Risks**: Ongoing lobbying efforts in India and potential regulatory changes in China could pose challenges.

## 6. Investment Recommendation
**Recommendation**: **Hold**

### Rationale
- **Positive Catalysts**: The strong demand for the iPhone 17 in China and potential regulatory changes in India could drive growth in revenue and market share.
- **Volatility Awareness**: While the stock has shown short-term volatility, the fundamentals remain strong with a healthy P/E ratio and consistent demand for its products.
- **Market Position**: Apple maintains a leading position in the technology sector, and any long-term investor may benefit from holding onto the stock, particularly given its history of innovation and brand loyalty.

### Conclusion
Given the current market conditions, recent price trends, and ongoing developments in Apple’s business strategy, a hold position is prudent. Investors should continue to monitor market conditions and company announcements, particularly regarding sales performance and global economic indicators, which could necessitate a reassessment of this recommendation.