In [71]:
# !pip install langgraph requests pandas matplotlib yfinance python-dateutil
# !pip install feedparser beautifulsoup4 lxml html5lib     # optional RSS fallback
# !pip install langchain-mcp-adapters mcp anyio            # optional MCP browsing
# !pip install openai                                      # optional LLM intent

In [72]:
# finance_multiagent_no_events_fixed_order.py
from __future__ import annotations

import os, math, time, re, hashlib, json
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta, timezone

# ---------- Plotting (headless) ----------
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt

# ---------- Data/HTTP ----------
import requests
import pandas as pd
import yfinance as yf
from dateutil import parser as dateparser

# ---------- LangGraph ----------
from langgraph.graph import StateGraph, START, END

# ---------- Optional modules ----------
# RSS fallback (no MCP)
try:
    import feedparser
    from bs4 import BeautifulSoup
    HAVE_RSS = True
except Exception:
    HAVE_RSS = False

# MCP (browsing)
HAVE_MCP = False
try:
    import anyio
    from langchain_mcp_adapters.client import MCPClient
    HAVE_MCP = True
except Exception:
    pass

# LLM (OpenAI or Azure OpenAI)
HAVE_OPENAI = False
try:
    from openai import OpenAI
    HAVE_OPENAI = True
except Exception:
    pass

In [73]:
# Config
# HTTP headers sent with every outbound `requests.get` call in this notebook
# (Yahoo search, Yahoo chart, and article fetches).
USER_AGENT = {"User-Agent": "Mozilla/5.0 (compatible; FinanceAgent/1.0)"}

# Enable MCP servers if you have them; leave {} to auto-fallback to RSS
# Each entry maps a server name to a spec dict; the code reads spec["command"]
# (required) and spec.get("args", []) (optional) when registering the server.
MCP_SERVERS: Dict[str, Dict[str, Any]] = {
    # "search":  {"command": "npx", "args": ["valueserp-googlesearch-mcp@latest"]},
    # "browser": {"command": "npx", "args": ["@djyde/mcp-browser@latest"]},
}

# Chat model name passed to the OpenAI client; overridable via environment.
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL")  # optional for Azure/proxy

# Exchange codes that earn a bonus when ranking Yahoo symbol-search results.
US_EXCHANGES = {"NMS","NAS","NASDAQ","NYQ","NYSE","NYE","NCM","NGM","ASE","PCX","BATS"}


In [74]:
@dataclass
class State:
    """Shared pipeline state passed from agent to agent in the graph.

    Only `question` is required; every other field starts empty and is
    filled in by the agent noted next to it.
    """
    # Free-text user request; parsed by intent_agent.
    question: str
    # Symbols to analyse (explicit tickers plus symbols resolved from names).
    tickers: List[str] = field(default_factory=list)
    # Company names extracted from the question (before symbol resolution).
    companies: List[str] = field(default_factory=list)

    # Analysis window; intent_agent fills missing ends with defaults.
    start: Optional[datetime] = None
    end: Optional[datetime] = None
    # Bar interval string ("15m", "1h", "1d", "1wk") chosen by _pick_interval.
    interval: str = "1d"

    # symbol -> OHLCV frame with columns open/high/low/close/volume (data_agent).
    prices: Dict[str, pd.DataFrame] = field(default_factory=dict)
    # symbol -> {"regularMarketPrice": ..., "currency": ...} (data_agent).
    quotes: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    # symbol -> metrics dict: period_return, vol_annualized, max_drawdown, ma20, ma50.
    analysis: Dict[str, Any] = field(default_factory=dict)
    # symbol -> path of the saved PNG chart (viz_agent).
    charts: Dict[str, str] = field(default_factory=dict)
    # symbol -> list of news dicts with title/url/published/excerpt (research_agent).
    research: Dict[str, List[Dict[str, Any]]] = field(default_factory=dict)
    # Final text report assembled by report_agent.
    report: str = ""

In [75]:
# Helpers: dates/intervals
def _default_dates(start: Optional[datetime], end: Optional[datetime]):
    end = end or datetime.now(timezone.utc)
    start = start or (end - timedelta(days=182))  # ~6 months
    return start, end

def _pick_interval(start: datetime, end: datetime) -> str:
    days = (end - start).days
    if days <= 7: return "15m"
    if days <= 60: return "1h"
    if days <= 365: return "1d"
    return "1wk"

def _to_unix(dt: datetime) -> int:
    return int(dt.timestamp())

def _clean_text(x: str) -> str:
    return re.sub(r"\s+", " ", (x or "")).strip()

def _parse_date_or_none(s: Optional[str]) -> Optional[datetime]:
    if not s: return None
    try:
        return dateparser.parse(s)
    except Exception:
        return None
    
def _to_state(x) -> State:
    """Return `x` as a State, building one from a mapping if necessary.

    Keys of the mapping that are not State fields are dropped.
    """
    if not isinstance(x, State):
        field_names = set(State.__annotations__.keys())
        kwargs = {}
        for key, value in x.items():
            if key in field_names:
                kwargs[key] = value
        return State(**kwargs)
    return x

def _print_result(x):
    """Print the report text, then the chart-path mapping, of a graph result."""
    state = _to_state(x)
    for piece in (state.report, state.charts):
        print(piece)

In [76]:
# LLM intent extraction
def _heuristic_parse(question: str) -> Dict[str, Any]:
    out: Dict[str, Any] = {"companies": [], "tickers": [], "date_from": None, "date_to": None}
    out["tickers"] = re.findall(r"\b[A-Z]{1,5}\b", question)  # quick ALLCAPS capture
    m = re.search(r"from ([\w\-\s:\/]+) to ([\w\-\s:\/]+)", question, re.I)
    if m:
        out["date_from"], out["date_to"] = m.group(1), m.group(2)
    # naive company capture (capitalized phrases)
    out["companies"] = list({c for c in re.findall(r"\b([A-Z][a-zA-Z]+(?: [A-Z][a-zA-Z]+)*)\b", question) if len(c) > 2})
    return out

def _llm_extract(question: str) -> Dict[str, Any]:
    """Extract companies, tickers and a date range from `question` via an LLM.

    Uses the heuristic parser instead when the OpenAI client or API key is
    unavailable, or when the LLM call or JSON decoding fails for any reason.
    The returned dict always has the keys companies, tickers, date_from and
    date_to.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if not (HAVE_OPENAI and api_key):
        return _heuristic_parse(question)

    try:
        llm = OpenAI(api_key=api_key, base_url=OPENAI_BASE_URL)
        system_prompt = ("Extract finance intent. Return ONLY JSON: "
                         "{companies:[], tickers:[], date_from:null|string, date_to:null|string}.")
        conversation = [
            {"role":"system","content":system_prompt},
            {"role":"user","content":f"Question: {question}\nReturn JSON only."},
        ]
        completion = llm.chat.completions.create(
            model=OPENAI_MODEL,
            messages=conversation,
            temperature=0,
            response_format={"type":"json_object"}
        )
        extracted = json.loads(completion.choices[0].message.content)
        # Guarantee the four expected keys without overwriting LLM values.
        for key, fallback in (("companies", []), ("tickers", []),
                              ("date_from", None), ("date_to", None)):
            extracted.setdefault(key, fallback)
        return extracted
    except Exception:
        return _heuristic_parse(question)

In [77]:
# Yahoo Finance: Symbol Search (web lookup), Chart v8 + fallback
def yahoo_symbol_search(query: str, region: str = "US", prefer_equity: bool = True) -> Optional[str]:
    """Look up the best-matching ticker symbol for a company name.

    Queries Yahoo's search JSON endpoint and ranks the returned quotes:
    +3 for an equity (when `prefer_equity`), +2 for an exchange listed in
    US_EXCHANGES, +1 when flagged `isYahooFinance`; ties are broken by the
    quote's own `score`. Returns None on a non-200 response, an empty result
    list, or any exception.
    """
    endpoint = "https://query2.finance.yahoo.com/v1/finance/search"
    search_params = {
        "q": query, "quotesCount": 10, "newsCount": 0, "listsCount": 0,
        "enableFuzzyQuery": "true", "lang": "en-US", "region": region
    }

    def _rank(candidate):
        points = 0
        kind = (candidate.get("quoteType") or candidate.get("typeDisp") or "").upper()
        venue = (candidate.get("exchange") or candidate.get("exchDisp") or "").upper()
        if prefer_equity and ("EQUITY" in kind or kind in {"S","STOCK"}):
            points += 3
        if venue in US_EXCHANGES:
            points += 2
        if candidate.get("isYahooFinance"):
            points += 1
        return (points, candidate.get("score", 0))

    try:
        response = requests.get(endpoint, params=search_params, timeout=10, headers=USER_AGENT)
        if response.status_code != 200:
            return None
        candidates = response.json().get("quotes", []) or []
        if not candidates:
            return None
        ranked = sorted(candidates, key=_rank, reverse=True)
        return ranked[0].get("symbol")
    except Exception:
        return None

def _get_chart_json(symbol: str, start: datetime, end: datetime, interval: str, max_retries=3) -> Dict:
    """Fetch raw chart JSON for `symbol` from Yahoo's v8 chart endpoint.

    Args:
        symbol: Ticker symbol placed in the URL path.
        start, end: Window bounds, sent as epoch seconds.
        interval: Bar interval string passed through to Yahoo.
        max_retries: Total number of attempts (not additional retries).

    Returns:
        The decoded JSON payload of the first HTTP 200 response.

    Raises:
        RuntimeError: If no attempt returned HTTP 200. The message carries
            the last HTTP status and, if the last failure was a transport
            error, that error (also chained as `__cause__`).

    Transport errors (timeouts, connection resets) are retried like non-200
    responses instead of aborting on the first one, and the exponential
    backoff sleep is skipped after the final attempt so a permanent failure
    does not stall for an extra, pointless delay.
    """
    base = f"https://query2.finance.yahoo.com/v8/finance/chart/{symbol}"
    params = {"period1": _to_unix(start), "period2": _to_unix(end), "interval": interval}
    backoff = 1.0
    last_status = None
    last_error = None
    for attempt in range(max_retries):
        try:
            r = requests.get(base, params=params, timeout=15, headers=USER_AGENT)
        except requests.RequestException as exc:
            last_error = exc
        else:
            last_error = None
            last_status = r.status_code
            if last_status == 200:
                return r.json()
        if attempt < max_retries - 1:
            # Only wait when another attempt will actually follow.
            time.sleep(backoff)
            backoff *= 2
    detail = f"HTTP {last_status}"
    if last_error is not None:
        detail += f" (last error: {last_error!r})"
    raise RuntimeError(f"Yahoo chart v8 failed for {symbol}: {detail}") from last_error

def _chart_to_dataframe(payload: Dict) -> pd.DataFrame:
    res = payload["chart"]["result"][0]
    ts = res.get("timestamp") or []
    if not ts:
        raise ValueError("No timestamps returned by Yahoo chart endpoint.")
    ind = pd.to_datetime(pd.Series(ts), unit="s", utc=True)
    q = res["indicators"]["quote"][0]
    df = pd.DataFrame({
        "open": q["open"], "high": q["high"],
        "low": q["low"], "close": q["close"], "volume": q["volume"]
    }, index=ind).sort_index()
    return df

def _yfinance_fallback(symbol: str, start: datetime, end: datetime, interval: str) -> pd.DataFrame:
    """Download OHLCV bars through yfinance when the direct chart call fails.

    Args:
        symbol: Ticker symbol.
        start, end: Window bounds passed straight to `yf.download`.
        interval: Pipeline interval; "1h" is translated to yfinance's "60m",
            unknown values fall back to "1d".

    Returns:
        DataFrame with a UTC DatetimeIndex and columns
        open, high, low, close, volume.

    Raises:
        RuntimeError: If yfinance returns no data.
        KeyError: If an expected OHLCV column is missing from the download.
    """
    iv_map = {"15m": "15m", "1h": "60m", "1d": "1d", "1wk": "1wk"}
    yf_iv = iv_map.get(interval, "1d")
    df = yf.download(symbol, start=start, end=end, interval=yf_iv, progress=False)
    if df is None or df.empty:
        raise RuntimeError(f"yfinance returned empty frame for {symbol}")

    if isinstance(df.columns, pd.MultiIndex):
        # Newer yfinance releases return (field, ticker) MultiIndex columns
        # even for one symbol; keep only the field level so df["close"] is a
        # Series. NOTE(review): assumes the field names are on level 0
        # (yfinance's default column grouping) - confirm for pinned version.
        df = df.copy()
        df.columns = df.columns.get_level_values(0)

    df.index = pd.to_datetime(df.index, utc=True)
    df = df.rename(columns=lambda name: str(name).lower())
    return df[["open","high","low","close","volume"]]

In [78]:
# MCP (optional): symbol search + article extraction
async def _make_mcp_client():
    """Create an MCP client, register every configured server, and start it.

    The caller owns the returned client and is responsible for stopping it.
    """
    mcp = MCPClient()
    for server_name in MCP_SERVERS:
        server_spec = MCP_SERVERS[server_name]
        await mcp.add_server(
            server_name,
            command=server_spec["command"],
            args=server_spec.get("args", []),
        )
    await mcp.start()
    return mcp

async def _pick_tool(client: "MCPClient", candidates: List[str]) -> Optional[str]:
    tools = await client.get_tools()
    names = {t.get("name"): t for t in tools}
    for cand in candidates:
        if cand in names:
            return cand
    lower_map = {k.lower(): k for k in names}
    for cand in candidates:
        if cand.lower() in lower_map:
            return lower_map[cand.lower()]
    return None

def _extract_symbol_from_text(url: str, text: str) -> Optional[str]:
    m = re.search(r"/quote/([A-Z.\-]{1,10})(?:[/?#]|$)", url)
    if m:
        return m.group(1).upper()
    m = re.search(r"\((?:NASDAQ|NYSE|AMEX|NYSEARCA|NYSEMKT)\s*:\s*([A-Z.\-]{1,10})\)", text, re.I)
    if m:
        return m.group(1).upper()
    m = re.search(r"(?:Ticker\s*(?:symbol)?\s*[:\-]\s*)([A-Z.\-]{1,10})\b", text, re.I)
    if m:
        return m.group(1).upper()
    return None

async def _mcp_symbol_search(company: str) -> Optional[str]:
    """Resolve a company name to a ticker by searching the web through MCP.

    Runs a site-restricted search, opens each hit and scans its extracted
    text (or the search snippet when the page cannot be read) for a symbol.
    Returns the first symbol found, or None when the needed tools are
    missing or no hit yields one. The client is always stopped on exit.
    """
    mcp = await _make_mcp_client()
    try:
        searcher = await _pick_tool(mcp, ["search","web_search","google_search","brave_search"])
        opener = await _pick_tool(mcp, ["open","goto","navigate","open_url"])
        reader = await _pick_tool(mcp, ["extract","read","get_content","get_text"])
        if not (searcher and opener and reader):
            return None

        query = (f'{company} stock ticker '
                 f'site:finance.yahoo.com OR site:wikipedia.org OR site:reuters.com OR site:bloomberg.com')
        response = await mcp.call(searcher, {"query": query, "num_results": 8})
        hits = response.get("results") or response.get("data") or []

        for hit in hits:
            link = hit.get("url") or hit.get("link")
            if link:
                try:
                    await mcp.call(opener, {"url": link})
                    page = await mcp.call(reader, {"url": link, "max_chars": 3000})
                    body = page.get("text") or page.get("content") or ""
                except Exception:
                    # Page unreadable: fall back to the search snippet.
                    body = hit.get("snippet","") or ""
                symbol = _extract_symbol_from_text(link, body)
                if symbol:
                    return symbol
        return None
    finally:
        await mcp.stop()

def resolve_company_to_symbol(company: str) -> Optional[str]:
    """Resolve one company name to a ticker symbol.

    Yahoo's symbol search is tried first; the MCP web search is used only
    when Yahoo finds nothing and MCP is both importable and configured.
    Returns None when neither source produces a symbol.
    """
    direct = yahoo_symbol_search(company)
    if direct:
        return direct
    if not (HAVE_MCP and MCP_SERVERS):
        return None
    try:
        return anyio.run(_mcp_symbol_search, company)
    except Exception:
        return None

def resolve_names_to_symbols(companies: List[str]) -> List[str]:
    """Resolve company names to unique ticker symbols, keeping input order.

    Names that cannot be resolved are skipped silently (explicit tickers,
    if any, are handled by the caller).
    """
    symbols: List[str] = []
    for name in companies:
        symbol = resolve_company_to_symbol(name)
        if not symbol or symbol in symbols:
            continue
        symbols.append(symbol)
    return symbols

In [79]:
# Research: MCP (optional) + RSS fallback
def _rss_url_for_ticker(ticker: str) -> str:
    from urllib.parse import quote
    return f"https://feeds.finance.yahoo.com/rss/2.0/headline?s={quote(ticker)}&region=US&lang=en-US"

def _collect_news_rss(ticker: str, days_window: int = 14, limit: int = 6) -> List[Dict[str, Any]]:
    """Collect recent headlines for `ticker` from Yahoo's RSS feed.

    Returns a list of dicts with keys title, url, published (ISO string) and
    excerpt (at most 700 characters). Entries older than `days_window` days,
    entries without a title or link, and duplicates are skipped; collection
    stops once `limit` items have been gathered. Returns [] when the RSS
    dependencies are not installed.
    """
    if not HAVE_RSS:
        return []

    parsed_feed = feedparser.parse(_rss_url_for_ticker(ticker))
    collected: List[Dict[str, Any]] = []
    oldest_allowed = datetime.now(timezone.utc) - timedelta(days=days_window)
    fingerprints = set()

    for entry in parsed_feed.entries:
        headline = _clean_text(getattr(entry, "title", ""))
        href = getattr(entry, "link", "")
        if not (headline and href):
            continue

        # Entries without a publish time are treated as published "now".
        stamp = getattr(entry, "published_parsed", None)
        if stamp:
            published = datetime(*stamp[:6], tzinfo=timezone.utc)
        else:
            published = datetime.now(timezone.utc)
        if published < oldest_allowed:
            continue

        digest = hashlib.md5((headline + href).encode()).hexdigest()
        if digest in fingerprints:
            continue
        fingerprints.add(digest)

        # Prefer text scraped from the article; fall back to the feed summary.
        snippet = ""
        try:
            page = requests.get(href, timeout=10, headers=USER_AGENT)
            if page.status_code == 200:
                if 'text/html' in (page.headers.get("content-type") or ""):
                    soup = BeautifulSoup(page.text, "html.parser")
                    snippet = _clean_text(soup.get_text(" ", strip=True))[:700]
        except Exception:
            pass
        snippet = snippet or _clean_text(getattr(entry, "summary", ""))[:700]

        collected.append({
            "title": headline,
            "url": href,
            "published": published.isoformat(),
            "excerpt": snippet,
        })
        if len(collected) >= limit:
            break
    return collected

async def _mcp_search_and_scrape(ticker: str, days_window: int = 14, max_items: int = 6) -> List[Dict[str, Any]]:
    """Find and excerpt recent news for `ticker` using MCP search/browse tools.

    Returns up to `max_items` dicts with keys title, url, published and
    excerpt (at most 700 characters). Delegates to the RSS collector when
    any of the search/open/extract tools is missing. The client is always
    stopped on exit.
    """
    mcp = await _make_mcp_client()
    try:
        searcher = await _pick_tool(mcp, ["search","web_search","google_search","brave_search"])
        opener = await _pick_tool(mcp, ["open","goto","navigate","open_url"])
        reader = await _pick_tool(mcp, ["extract","read","get_content","get_text"])
        if not (searcher and opener and reader):
            return _collect_news_rss(ticker, days_window, max_items)

        query = (f'{ticker} stock news '
                 f'site:reuters.com OR site:bloomberg.com OR site:cnbc.com OR site:wsj.com OR site:investopedia.com')

        # Ask for twice as many hits as needed to allow for skipped ones.
        response = await mcp.call(searcher, {"query": query, "num_results": max_items * 2})
        hits = response.get("results") or response.get("data") or []

        articles: List[Dict[str, Any]] = []
        fingerprints = set()
        for hit in hits:
            link = hit.get("url") or hit.get("link")
            headline = _clean_text(hit.get("title") or "")
            if not (link and headline):
                continue
            digest = hashlib.md5((headline + link).encode()).hexdigest()
            if digest in fingerprints:
                continue
            fingerprints.add(digest)

            try:
                await mcp.call(opener, {"url": link})
                page = await mcp.call(reader, {"url": link, "max_chars": 2000})
                body = page.get("text") or page.get("content") or ""
                snippet = _clean_text(body)[:700]
            except Exception:
                # Page unreadable: fall back to the search snippet.
                snippet = _clean_text(hit.get("snippet") or "")[:700]

            articles.append({
                "title": headline,
                "url": link,
                "published": hit.get("date") or hit.get("published") or "",
                "excerpt": snippet,
            })
            if len(articles) >= max_items:
                break
        return articles
    finally:
        await mcp.stop()

def _research_fallback_or_mcp(ticker: str, days_window: int = 14, max_items: int = 6) -> List[Dict[str, Any]]:
    """Gather news for `ticker`, via MCP when configured and otherwise via RSS.

    Any exception from the MCP path is swallowed and the RSS collector is
    used instead.
    """
    mcp_enabled = HAVE_MCP and MCP_SERVERS
    if mcp_enabled:
        try:
            return anyio.run(_mcp_search_and_scrape, ticker, days_window, max_items)
        except Exception:
            pass  # best effort: fall through to RSS
    return _collect_news_rss(ticker, days_window, max_items)

In [80]:
# Agents
def intent_agent(state: State) -> State:
    """Turn the question into tickers, company names, a window and an interval.

    Explicit tickers (from the parser plus any ALLCAPS word in the question)
    come first, followed by symbols resolved from company names. When nothing
    is found, any tickers already on the state are kept, else "AAPL" is used.
    Mutates and returns `state`.
    """
    intent = _llm_extract(state.question)

    company_names = []
    for raw_name in (intent.get("companies") or []):
        if raw_name.strip():
            company_names.append(raw_name.strip())

    mentioned = (intent.get("tickers") or []) + re.findall(r"\b[A-Z]{1,5}\b", state.question)
    explicit = list(dict.fromkeys(mentioned))  # order-preserving de-dupe

    from_names = resolve_names_to_symbols(company_names)
    all_tickers = explicit + [sym for sym in from_names if sym not in explicit]

    parsed_start = _parse_date_or_none(intent.get("date_from"))
    parsed_end = _parse_date_or_none(intent.get("date_to"))

    state.companies = company_names
    state.tickers = all_tickers or state.tickers or ["AAPL"]
    state.start, state.end = _default_dates(parsed_start or state.start, parsed_end or state.end)
    state.interval = _pick_interval(state.start, state.end)
    return state

def data_agent(state: State) -> State:
    """Load price history and a last-price quote for every ticker.

    For each symbol the Yahoo chart endpoint is tried first and yfinance
    second. A symbol for which both sources fail is skipped with a warning
    and removed from `state.tickers`, so one bad symbol (for example a stray
    capitalised word mistaken for a ticker) no longer aborts the whole run.

    Returns:
        The same `state`, with `prices`, `quotes` and `tickers` updated.

    Raises:
        RuntimeError: If tickers were requested but none could be loaded.
    """
    import warnings  # local import: only needed on the failure path

    requested = list(state.tickers)
    for sym in requested:
        try:
            js = _get_chart_json(sym, state.start, state.end, state.interval)
            df = _chart_to_dataframe(js)
            meta = js["chart"]["result"][0].get("meta", {}) or {}
            quote = {
                "regularMarketPrice": meta.get("regularMarketPrice"),
                "currency": meta.get("currency")
            }
        except Exception:
            try:
                df = _yfinance_fallback(sym, state.start, state.end, state.interval)
                quote = {
                    "regularMarketPrice": float(df["close"].iloc[-1]),
                    # yfinance's download frame carries no currency; USD is
                    # an assumption inherited from the original code.
                    "currency": "USD"
                }
            except Exception as exc:
                warnings.warn(f"data_agent: no price data for {sym!r}, skipping ({exc})")
                continue
        state.prices[sym] = df
        state.quotes[sym] = quote

    # Keep only symbols that have data so later agents can rely on it.
    state.tickers = [sym for sym in requested if sym in state.prices]
    if requested and not state.tickers:
        raise RuntimeError(f"No price data could be loaded for: {', '.join(requested)}")
    return state

def research_agent(state: State) -> State:
    """Attach recent news items to the state for every ticker.

    The news look-back equals the analysis window length clamped to
    7..30 days (14 days is the basis when the window is not set).
    """
    if state.end and state.start:
        span_days = (state.end - state.start).days
    else:
        span_days = 14
    lookback = min(30, max(7, span_days))

    state.research = {
        sym: _research_fallback_or_mcp(sym, days_window=lookback, max_items=6)
        for sym in state.tickers
    }
    return state

def analysis_agent(state: State) -> State:
    """Compute return, volatility, drawdown and moving averages per symbol.

    For every frame in `state.prices` this stores, under `state.analysis`:
        period_return   cumulative return over the window (float)
        vol_annualized  std of bar returns scaled to one year (float)
        max_drawdown    most negative close / running-peak - 1 (float)
        ma20, ma50      last 20/50-bar mean close, or None when there are
                        not enough bars

    The three float metrics are NaN when they cannot be computed (no usable
    closes, or fewer than two returns for the volatility).
    """
    # Bars per trading year for each interval. The intraday counts are
    # approximations that assume a 6.5-hour regular session (7 hourly or
    # 26 fifteen-minute bars per day) - adjust for other markets.
    periods_per_year = {"15m": 252 * 26, "1h": 252 * 7, "1d": 252, "1wk": 52}
    annualizer = math.sqrt(periods_per_year.get(state.interval, 52))
    nan = float("nan")

    def _float_or_none(value):
        # NaN is truthy, so a plain truthiness test would let it through.
        return None if value is None or pd.isna(value) else float(value)

    out: Dict[str, Any] = {}
    for sym, df in state.prices.items():
        # Drop missing closes up front so pct_change never has to pad gaps.
        close = df["close"].dropna()
        if close.empty:
            out[sym] = {
                "period_return": nan, "vol_annualized": nan,
                "max_drawdown": nan, "ma20": None, "ma50": None,
            }
            continue

        ret = close.pct_change().dropna()
        cum = (1 + ret).prod() - 1
        vol = ret.std() * annualizer
        max_dd = ((close / close.cummax()) - 1).min()
        ma20 = close.rolling(20).mean().iloc[-1]
        ma50 = close.rolling(50).mean().iloc[-1] if len(close) >= 50 else None
        out[sym] = {
            "period_return": float(cum),
            "vol_annualized": float(vol),
            "max_drawdown": float(max_dd),
            "ma20": _float_or_none(ma20),
            "ma50": _float_or_none(ma50),
        }
    state.analysis = out
    return state

def viz_agent(state: State) -> State:
    """Save a close-price chart (with 20/50-bar moving averages) per symbol.

    Each chart is written to "<symbol>_chart.png" in the working directory
    and its path recorded in `state.charts`. Symbols with an empty frame are
    skipped. Characters that are unsafe in a file name (such as "/") are
    replaced by "_" in the path only; the chart title keeps the real symbol.
    """
    for sym, df in state.prices.items():
        if df.empty:
            continue
        fig, ax = plt.subplots()
        try:
            close = df["close"]
            close.plot(ax=ax, label="Close", linewidth=1.2)
            try:
                close.rolling(20).mean().plot(ax=ax, label="MA20", linestyle="--")
                if len(df) >= 50:
                    close.rolling(50).mean().plot(ax=ax, label="MA50", linestyle=":")
            except Exception:
                # Moving-average overlays are best effort; the price line
                # alone is still a useful chart.
                pass
            ax.set_title(f"{sym} Price — {state.start.date()} to {state.end.date()}")
            ax.set_xlabel("Date")
            ax.set_ylabel("Price")
            ax.legend(loc="best", fontsize="small", frameon=True)
            safe_name = re.sub(r"[^A-Za-z0-9._\-^=]", "_", sym)
            path = f"{safe_name}_chart.png"
            # Save this figure explicitly rather than pyplot's "current" one.
            fig.savefig(path, bbox_inches="tight")
            state.charts[sym] = path
        finally:
            # Always release the figure, even if plotting or saving failed.
            plt.close(fig)
    return state

def report_agent(state: State) -> State:
    """Assemble the final text report from quotes, analysis and research.

    One paragraph per ticker (price, window, return, volatility, drawdown
    and a 20-vs-50 moving-average trend check), followed by up to five news
    highlights, then a fixed disclaimer. Any value that is missing or NaN is
    rendered as "n/a" (or "≈" for the trend) instead of raising, so a
    symbol without analysis no longer breaks the whole report.
    """
    def _missing(value) -> bool:
        # `value != value` is True only for NaN.
        return value is None or value != value

    def _pct(value) -> str:
        return "n/a" if _missing(value) else f"{value:.1%}"

    def _day(moment):
        return moment.date() if moment is not None else "n/a"

    lines: List[str] = []
    if state.companies:
        lines.append(f"**Resolved companies → tickers:** {', '.join(state.companies)} → {', '.join(state.tickers)}\n")

    for sym in state.tickers:
        q = state.quotes.get(sym) or {}
        a = state.analysis.get(sym) or {}
        price = q.get("regularMarketPrice")
        pr = a.get("period_return"); vol = a.get("vol_annualized"); dd = a.get("max_drawdown")
        ma20 = a.get("ma20"); ma50 = a.get("ma50")

        # The trend needs both averages; without them it is undetermined.
        if _missing(ma20) or _missing(ma50):
            trend = "≈"
        elif ma20 >= ma50:
            trend = ">="
        else:
            trend = "<"
        price_text = "n/a" if _missing(price) else price

        lines.append(
            f"**{sym}** — Last price ~ {price_text} {q.get('currency') or ''}. "
            f"Window {_day(state.start)}→{_day(state.end)}: "
            f"total return ≈ {_pct(pr)}, annualized volatility ≈ {_pct(vol)}, "
            f"max drawdown ≈ {_pct(dd)}. "
            f"Trend check: 20-day MA {trend} 50-day MA."
        )

        news_items = state.research.get(sym) or []
        if news_items:
            lines.append("**Research highlights (recent):**")
            for it in news_items[:5]:
                excerpt = (it.get("excerpt") or "")[:160]
                lines.append(f"- {it.get('title','(no title)')} — {excerpt} ({it.get('url','')})")
        lines.append("")

    disclaimer = (
        "_Note: Educational analysis, not investment advice. "
        "Yahoo endpoints are unofficial and may change. "
        "External links are for reference—verify with primary sources._"
    )
    state.report = "\n".join(lines).strip() + "\n\n" + disclaimer
    return state

In [81]:
from langgraph.graph import StateGraph, START, END

def build_app():
    """Build and compile the linear agent pipeline.

    Order: intent -> data -> research -> analysis -> viz -> report.
    """
    graph = StateGraph(State)   # ensure we get a State object back

    pipeline = [
        ("intent", intent_agent),
        ("data", data_agent),
        ("research", research_agent),
        ("analysis", analysis_agent),
        ("viz", viz_agent),
        ("report", report_agent),
    ]
    for node_name, node_fn in pipeline:
        graph.add_node(node_name, node_fn)

    # Chain START -> each stage in order -> END.
    route = [START] + [node_name for node_name, _ in pipeline] + [END]
    for source, target in zip(route, route[1:]):
        graph.add_edge(source, target)
    return graph.compile()

In [82]:
# Compile the agent graph once; `app` is reused for every example below.
app = build_app()

# Example 1
# Runs the full pipeline (network access required) and prints the report text
# followed by the symbol -> chart-path mapping.
s1 = app.invoke(State(question="Analyze Apple from Jan 1, 2023 to Aug 31, 2025"))
_print_result(s1)

# Example 2
# Multi-company question without dates; uncomment to run.
# s2 = app.invoke(State(question="Compare Tesla and NVIDIA"))
# _print_result(s2)
