In [2]:
# news_graph_multi.py
import os, asyncio, datetime, json, math
from typing import List, Optional, TypedDict, Dict, Any
import feedparser, httpx, trafilatura
from dotenv import load_dotenv
from openai import OpenAI
import nest_asyncio
nest_asyncio.apply()
from langgraph.graph import StateGraph

In [3]:
def arun(coro):  # helper to run async in notebooks
    return asyncio.get_event_loop().run_until_complete(coro)

In [4]:
# -----------------------------------------------------------------------------
# Setup (same as your original, kept stable)
# -----------------------------------------------------------------------------
load_dotenv()
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise RuntimeError("Set OPENAI_API_KEY in your .env")

oai = OpenAI(api_key=OPENAI_API_KEY)

EMBED_MODEL = os.getenv("OPENAI_EMBED_MODEL", "text-embedding-3-small")
EMBED_CACHE_PATH = os.getenv("EMBED_CACHE_PATH", "keyword_embs_1.json")
EMBED_SIM_THRESHOLD = float(os.getenv("EMBED_SIM_THRESHOLD", "0.75"))
SCORING_MODE = os.getenv("SCORING_MODE", "embedding")  # kept for future use



In [5]:

FEEDS: Dict[str, List[str]] = {
    "AI": [
        "https://openai.com/blog/rss/",
        "https://deepmind.com/blog/feed/basic/",
        "https://paperswithcode.com/feeds/latest",
    ],
    "Tech": [
        "https://techcrunch.com/feed/",
        "https://www.theverge.com/rss/index.xml",
        "https://www.wired.com/feed/rss"
    ],
    "Finance": [
        "https://www.marketwatch.com/feeds/topstories",
        "http://feeds.reuters.com/reuters/financialsNews",
        "https://seekingalpha.com/market_currents.xml"
    ],
}

KEYWORDS = [
    "AI", "NVIDIA", "OPENAI", "APPLE", "GOOGLE", "TESLA", "FACEBOOK", "AMAZON", "SAMSUNG",
    "chip", "ECB", "inflation", "etf", "Bitcoin", "crypto", "BTC", "GPT"
    "TRUMP", "POWELL", "Warren Buffet", "MUSK", "Dimon", "Solomon",
    "Soros", "Dalio", "Ackman"
]
MAX_TOTAL = 18
TOP_N = 9

In [6]:
def keyword_score(title: str, text: str):
    base = (title + " " + text[:800]).lower()
    score = sum(2.0 if kw.isupper() else 1.0
                for kw in KEYWORDS if kw.lower() in base)

In [7]:

# -----------------------------------------------------------------------------
# Minimal embedding cache support (re-using your approach)
# -----------------------------------------------------------------------------
def _cosine_sim(a: List[float], b: List[float]) -> float:
    dot = sum(x*y for x, y in zip(a, b))
    na = math.sqrt(sum(x*x for x in a))
    nb = math.sqrt(sum(y*y for y in b))
    return dot / (na * nb) if na and nb else 0.0

def _load_keyword_cache() -> Dict[str, List[float]]:
    if os.path.exists(EMBED_CACHE_PATH):
        try:
            with open(EMBED_CACHE_PATH, "r", encoding="utf-8") as f:
                data = json.load(f)
                return {k: v for k, v in data.items() if k in KEYWORDS}
        except Exception:
            return {}
    return {}

def _save_keyword_cache(cache: Dict[str, List[float]]):
    try:
        with open(EMBED_CACHE_PATH, "w", encoding="utf-8") as f:
            json.dump(cache, f)
        print(f"✅ Keyword cache saved to {EMBED_CACHE_PATH} ({len(cache)} entries).")
    except Exception as e:
        print(f"⚠️ Warning: failed to save keyword cache to {EMBED_CACHE_PATH}: {e}")


def _embed_text(txt: str) -> List[float]:
    resp = oai.embeddings.create(model=EMBED_MODEL, input=txt)
    return resp.data[0].embedding

_KEYWORD_EMBS: Dict[str, List[float]] = _load_keyword_cache()
_missing = [kw for kw in KEYWORDS if kw not in _KEYWORD_EMBS]
if _missing:
    for kw in _missing:
        _KEYWORD_EMBS[kw] = _embed_text(kw)
    _save_keyword_cache(_KEYWORD_EMBS)


✅ Keyword cache saved to keyword_embs_1.json (25 entries).


In [8]:

# -----------------------------------------------------------------------------
# State schema
# -----------------------------------------------------------------------------
class Article(TypedDict, total=False):
    title: str
    link: str
    category: str
    text: str
    summary: Optional[str]
    score: float
    risk: Optional[str]         # added: critic’s risk note
    grounded: Optional[bool]    # added: critic’s grounding verdict
    entities: Optional[Dict[str, Any]]  # added: tagger’s JSON (companies/people/tickers/topics)

class AgentState(TypedDict):
    articles: List[Article]
    top_n: int
    digest_md: Optional[str]

In [9]:
# -------------------------------------------------------------------
# Fetcher Node (balanced per category with round-robin)
# -------------------------------------------------------------------
async def fetcher_node(state: AgentState) -> AgentState:
    items: List[Article] = []

    # Step 1. Collect raw feed entries
    for cat, feeds in FEEDS.items():
        for url in feeds:
            f = feedparser.parse(url)
            for e in f.entries[:6]:  # cap per feed to avoid overload
                link = getattr(e, "link", None)
                title = getattr(e, "title", "Untitled")
                if not link:
                    continue
                items.append({
                    "title": title,
                    "link": link,
                    "category": cat,
                    "text": "",
                    "summary": None,
                    "score": 0.0,
                })

    # Step 2. Deduplicate
    seen, uniq = set(), []
    for it in items:
        if it["link"] in seen:
            continue
        seen.add(it["link"])
        uniq.append(it)
    items = uniq

    # Step 3. Bucket by category
    from collections import defaultdict, deque
    by_cat = defaultdict(deque)
    for it in items:
        by_cat[it["category"]].append(it)

    # Step 4. Round-robin balance up to MAX_TOTAL
    balanced, cats = [], list(by_cat.keys())
    while len(balanced) < MAX_TOTAL and any(by_cat[c] for c in cats):
        for c in cats:
            if by_cat[c] and len(balanced) < MAX_TOTAL:
                balanced.append(by_cat[c].popleft())
    items = balanced  # evenly interleaved

    # Step 5. Pull HTML + extract text
    async with httpx.AsyncClient(follow_redirects=True, timeout=15) as client:
        async def extract(it: Article):
            try:
                r = await client.get(it["link"])
                html = r.text if r.status_code == 200 else ""
                text = trafilatura.extract(
                    html,
                    include_comments=False,
                    include_tables=False
                ) or ""
                it["text"] = text
            except Exception:
                it["text"] = ""
            return it

        items = await asyncio.gather(*(extract(it) for it in items))

    # Step 6. Update state
    state["articles"] = items
    return state


In [10]:
import runpy, types, asyncio
mod = types.SimpleNamespace(**runpy.run_path("news_graph_multi.py"))

In [11]:
def arun(coro):  # helper to run async in notebooks
    return asyncio.get_event_loop().run_until_complete(coro)

In [12]:
state = {"articles": [], "top_n": 9, "digest_md": None}

In [13]:
state = arun(mod.fetcher_node(state))
print("After fetcher:", len(state["articles"]), "articles")
print("Sample title:", state["articles"][0]["title"])
print(state["articles"][0]["text"][:400])

After fetcher: 18 articles
Sample title: Strengthening our Frontier Safety Framework
Responsibility & Safety
Strengthening our Frontier Safety Framework
We’re expanding our risk domains and refining our risk assessment process.
AI breakthroughs are transforming our everyday lives, from advancing mathematics, biology and astronomy to realizing the potential of personalized education. As we build increasingly powerful AI models, we’re committed to responsibly developing our technolo


In [14]:
for i, art in enumerate(state["articles"][:50], 1):
    print(f"\n--- Article {i} ---")
    print("Title:", art["title"])
    print("Category:", art["category"])
    print("Link:", art["link"])
    print("Text preview:", art["text"][:200], "...")



--- Article 1 ---
Title: Strengthening our Frontier Safety Framework
Category: AI
Link: https://deepmind.google/discover/blog/strengthening-our-frontier-safety-framework/
Text preview: Responsibility & Safety
Strengthening our Frontier Safety Framework
We’re expanding our risk domains and refining our risk assessment process.
AI breakthroughs are transforming our everyday lives, fro ...

--- Article 2 ---
Title: Discovering new solutions to century-old problems in fluid dynamics
Category: AI
Link: https://deepmind.google/discover/blog/discovering-new-solutions-to-century-old-problems-in-fluid-dynamics/
Text preview: Science
Discovering new solutions to century-old problems in fluid dynamics
Our new method could help mathematicians leverage AI techniques to tackle long-standing challenges in mathematics, physics a ...

--- Article 3 ---
Title: Gemini achieves gold-level performance at the International Collegiate Programming Contest World Finals
Category: AI
Link: https://deepmind.goog

In [15]:

async def summariser_node(state: AgentState) -> AgentState:
    async def do_sum(a: Article):
        if not a["text"]:
            a["summary"] = "(no extractable content)"
            a["score"] = keyword_score(a["title"], "")
            return a
        prompt = f"""You are a financial/tech news summariser.
Summarise the article in 3–5 crisp, extractive bullets (no speculation).
Add sentiment tag in square brackets at the end of the last bullet: [Bullish|Neutral|Cautious].
Use ONLY facts present in the text.

Title: {a['title']}
Text:
{a['text'][:8000]}"""
        rsp = oai.chat.completions.create(
            model=OPENAI_MODEL,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.2,
        )
        a["summary"] = rsp.choices[0].message.content.strip()
        a["score"] = keyword_score(a["title"], a["text"])
        return a
    state["articles"] = await asyncio.gather(*(do_sum(a) for a in state["articles"]))
    return state

In [16]:
# after defining async def summariser_node(...)
state = await summariser_node(state)   # not mod.summariser_node
print("After summariser:", sum(1 for a in state["articles"] if a.get("summary")))
print(state["articles"][0]["summary"])


After summariser: 18
- The latest update to the Frontier Safety Framework (FSF) focuses on identifying and mitigating severe risks from advanced AI models, building on previous iterations and collaborations with experts.  
- A new Critical Capability Level (CCL) has been introduced to address risks associated with harmful manipulation by AI models that could significantly alter beliefs and behaviors.  
- The Framework has been expanded to include protocols for misalignment risks, ensuring that AI models do not interfere with operators' control over their operations.  
- The risk assessment process has been sharpened to prioritize critical threats, with detailed analyses conducted before external launches and large-scale internal deployments.  
- The update reflects a commitment to an evidence-based approach in tracking AI risks, aiming to ensure that AI advancements benefit humanity while minimizing potential harms. [Bullish]


In [17]:
for i, art in enumerate(state["articles"], 1):
    print(f"\n--- Article {i} ---")
    print("Title:", art["title"])
    print("Summary:", art["summary"])



--- Article 1 ---
Title: Strengthening our Frontier Safety Framework
Summary: - The latest update to the Frontier Safety Framework (FSF) focuses on identifying and mitigating severe risks from advanced AI models, building on previous iterations and collaborations with experts.  
- A new Critical Capability Level (CCL) has been introduced to address risks associated with harmful manipulation by AI models that could significantly alter beliefs and behaviors.  
- The Framework has been expanded to include protocols for misalignment risks, ensuring that AI models do not interfere with operators' control over their operations.  
- The risk assessment process has been sharpened to prioritize critical threats, with detailed analyses conducted before external launches and large-scale internal deployments.  
- The update reflects a commitment to an evidence-based approach in tracking AI risks, aiming to ensure that AI advancements benefit humanity while minimizing potential harms. [Bullish]

-

In [18]:

async def critic_node(state: AgentState) -> AgentState:
    """Risk & Grounding Critic: flags rumor/risk and checks extractiveness."""
    async def critique(a: Article):
        if not a.get("summary"):
            a["risk"] = "no-summary"
            a["grounded"] = False
            return a
        prompt = f"""You are a Risk & Grounding Critic for financial/tech news.
Given the article text and its 3–5 bullet summary, do two things:

1) Is each bullet extractive (appears in the text) and free of speculation? Reply "YES" only if all bullets are extractive; else "NO".
2) Provide a short risk note: one of ["rumor-risk","low-evidence","balanced","well-sourced"].

Return strict JSON with keys: grounded (true/false), risk (string), issues (array of bullet indices with problems).

TITLE: {a['title']}
TEXT:
{a['text'][:6000]}
SUMMARY:
{a['summary']}"""
        rsp = oai.chat.completions.create(
            model=OPENAI_MODEL,
            messages=[{"role":"user","content":prompt}],
            temperature=0,
            response_format={"type":"json_object"}
        )
        data = json.loads(rsp.choices[0].message.content)
        a["grounded"] = bool(data.get("grounded", False))
        a["risk"] = str(data.get("risk", "balanced"))
        # Optionally: you could redact problematic bullets here
        return a
    state["articles"] = await asyncio.gather(*(critique(a) for a in state["articles"]))
    return state

In [19]:
state = arun(mod.critic_node(state))
print("After critic:", sum(1 for a in state["articles"] if a.get("grounded") is not None))
print(state["articles"][0]["grounded"], state["articles"][0].get("risk"))

After critic: 18
True well-sourced


In [20]:
for i, art in enumerate(state["articles"], 1):
    print(f"\n--- Article {i} ---")
    print("Title:", art["title"])
    print("Summary:", art["summary"])
    print("Grounded:", art.get("grounded"))
    print("Risk:", art.get("risk"))
    print("Issues:", art.get("issues", []))



--- Article 1 ---
Title: Strengthening our Frontier Safety Framework
Summary: - The latest update to the Frontier Safety Framework (FSF) focuses on identifying and mitigating severe risks from advanced AI models, building on previous iterations and collaborations with experts.  
- A new Critical Capability Level (CCL) has been introduced to address risks associated with harmful manipulation by AI models that could significantly alter beliefs and behaviors.  
- The Framework has been expanded to include protocols for misalignment risks, ensuring that AI models do not interfere with operators' control over their operations.  
- The risk assessment process has been sharpened to prioritize critical threats, with detailed analyses conducted before external launches and large-scale internal deployments.  
- The update reflects a commitment to an evidence-based approach in tracking AI risks, aiming to ensure that AI advancements benefit humanity while minimizing potential harms. [Bullish]
Gr

In [21]:

async def tagger_node(state: AgentState) -> AgentState:
    """Entity & Topic Tagger: companies, people, tickers, topics."""
    async def tag(a: Article):
        if not a.get("text"):
            return a
        prompt = f"""Extract entities and topics as JSON with keys:
companies (array), people (array), tickers (array), topics (array: e.g., 'chips','LLMs','rates').
Base only on the TEXT below. No guessing.

TEXT:
{a['text'][:6000]}"""
        rsp = oai.chat.completions.create(
            model=OPENAI_MODEL,
            messages=[{"role":"user","content":prompt}],
            temperature=0,
            response_format={"type":"json_object"}
        )
        a["entities"] = json.loads(rsp.choices[0].message.content)
        return a
    state["articles"] = await asyncio.gather(*(tag(a) for a in state["articles"]))
    return state

In [22]:
state = arun(mod.tagger_node(state))
print("After tagger: entities =", sum(1 for a in state["articles"] if a.get("entities")))
print(state["articles"][0]["entities"])

After tagger: entities = 18
{'companies': [], 'people': [], 'tickers': [], 'topics': ['AI', 'risk assessment', 'safety', 'machine learning', 'AGI']}


In [23]:
for i, art in enumerate(state["articles"], 1):
    print(f"\n--- Article {i} ---")
    print("Title:", art.get("title"))
    entities = art.get("entities", {})
    print("Companies:", entities.get("companies", []))
    print("People:", entities.get("people", []))
    print("Tickers:", entities.get("tickers", []))
    print("Topics:", entities.get("topics", []))



--- Article 1 ---
Title: Strengthening our Frontier Safety Framework
Companies: []
People: []
Tickers: []
Topics: ['AI', 'risk assessment', 'safety', 'machine learning', 'AGI']

--- Article 2 ---
Title: Discovering new solutions to century-old problems in fluid dynamics
Companies: []
People: ['Yongji Wang', 'Mehdi Bennani', 'James Martens', 'Sébastien Racanière', 'Sam Blackwell', 'Alex Matthews', 'Stanislav Nikolov', 'Gonzalo Cao-Labora', 'Daniel S. Park', 'Martin Arjovsky', 'Daniel Worrall', 'Chongli Qin', 'Ferran Alet', 'Borislav Kozlovskii', 'Nenad Tomašev', 'Alex Davies', 'Pushmeet Kohli', 'Tristan Buckmaster', 'Bogdan Georgiev', 'Javier Gómez-Serrano', 'Ray Jiang', 'Ching-Yao Lai']
Tickers: []
Topics: ['fluid dynamics', 'AI', 'mathematics', 'physics', 'engineering', 'singularities', 'machine learning', 'Physics-Informed Neural Networks', 'computer-assisted proofs']

--- Article 3 ---
Title: Gemini achieves gold-level performance at the International Collegiate Programming Contest

In [24]:

def curator_node(state: AgentState) -> AgentState:
    # Prioritise grounded items; then keyword score; keep category balance.
    state["articles"].sort(key=lambda a: (1 if a.get("grounded") else 0, a.get("score", 0.0)), reverse=True)
    per_cat_limit = max(1, state["top_n"] // 3)
    kept, counts = [], {"AI":0, "Tech":0, "Finance":0}
    for a in state["articles"]:
        if counts[a["category"]] < per_cat_limit:
            kept.append(a)
            counts[a["category"]] += 1
        if len(kept) >= state["top_n"]:
            break
    if len(kept) < state["top_n"]:
        chosen = set(id(x) for x in kept)
        for a in state["articles"]:
            if id(a) in chosen:
                continue
            kept.append(a)
            if len(kept) >= state["top_n"]:
                break
    state["articles"] = kept
    return state

In [25]:
state = mod.curator_node(state)
print("After curator:", len(state["articles"]))

After curator: 9


In [27]:
for i, a in enumerate(state["articles"], 1):
    print(f"\n--- Curated {i} ---")
    print("Title:", a.get("title"))
    print("Category:", a.get("category"))
    print("Grounded:", a.get("grounded"))
    print("Risk:", a.get("risk"))
    print("Score:", round(a.get("score", 0.0), 3))



--- Curated 1 ---
Title: Alibaba to offer Nvidia’s physical AI development tools in its AI platform
Category: Tech
Grounded: True
Risk: well-sourced


TypeError: type tuple doesn't define __round__ method

In [28]:

def formatter_node(state: AgentState) -> AgentState:
    today = datetime.date.today().isoformat()
    lines = [f"# Daily AI • Tech • Finance — {today}", ""]
    for cat in ["AI", "Tech", "Finance"]:
        section = [a for a in state["articles"] if a["category"] == cat]
        if not section:
            continue
        lines.append(f"## {cat}")
        for a in section:
            risk = a.get("risk", "balanced")
            grounded = "Grounded✅" if a.get("grounded") else "Needs-check⚠️"
            lines.append(f"### [{a['title']}]({a['link']})  \n*{grounded} • risk={risk}*")
            lines.append(a.get("summary") or "(no summary)")
            ents = a.get("entities") or {}
            if ents:
                lines.append(f"_Entities:_ {ents.get('companies', [])}  \n_Tickers:_ {ents.get('tickers', [])}  \n_Topics:_ {ents.get('topics', [])}")
            lines.append("")
    state["digest_md"] = "\n".join(lines)
    return state

In [29]:
state = mod.formatter_node(state)
print("=== Digest Output ===\n")
print(state["digest_md"])

=== Digest Output ===

# Daily AI • Tech • Finance — 2025-09-24

## AI
### [Strengthening our Frontier Safety Framework](https://deepmind.google/discover/blog/strengthening-our-frontier-safety-framework/)  
*Grounded✅ • risk=well-sourced*
- The latest update to the Frontier Safety Framework (FSF) focuses on identifying and mitigating severe risks from advanced AI models, building on previous iterations and collaborations with experts.  
- A new Critical Capability Level (CCL) has been introduced to address risks associated with harmful manipulation by AI models that could significantly alter beliefs and behaviors.  
- The Framework has been expanded to include protocols for misalignment risks, ensuring that AI models do not interfere with operators' control over their operations.  
- The risk assessment process has been sharpened to prioritize critical threats, with detailed analyses conducted before external launches and large-scale internal deployments.  
- The update reflects a comm

In [30]:
from langgraph.graph import StateGraph, START, END   # make sure START, END are imported

graph = StateGraph(AgentState)

# Add your nodes
graph.add_node("fetcher", fetcher_node)
graph.add_node("curator", curator_node)
graph.add_node("critic", critic_node)
graph.add_node("formatter", formatter_node)

# Define the flow
graph.add_edge(START, "fetcher")       # entrypoint
graph.add_edge("fetcher", "curator")
graph.add_edge("curator", "critic")
graph.add_edge("critic", "formatter")
graph.add_edge("formatter", END)       # exit point

# Compile
app = graph.compile()


In [31]:
# run (async)
result = await app.ainvoke({
    "articles": [],
    "top_n": 10,          # ← add this
    # optionally other knobs you already use:
    # "keywords": ["CBA", "markets"],
    # "score_threshold": 0.6,
})


In [32]:
result

{'articles': [{'title': 'Strengthening our Frontier Safety Framework',
   'link': 'https://deepmind.google/discover/blog/strengthening-our-frontier-safety-framework/',
   'category': 'AI',
   'summary': None,
   'score': 0.0,
   'risk': 'no-summary',
   'grounded': False},
  {'title': 'UK police arrest man linked to ransomware attack that caused airport disruptions in Europe',
   'link': 'https://techcrunch.com/2025/09/24/uk-police-arrest-man-linked-to-ransomware-attack-that-caused-airport-disruptions-in-europe/',
   'category': 'Tech',
   'text': 'The U.K.’s National Crime Agency (NCA) said on Wednesday that a man was arrested in connection to the ransomware attack that has caused delays and disruptions at several European airports since the weekend.\nThe hack targeted check-in systems provided by Collins Aerospace on Friday, causing delays at Brussels, Berlin, Dublin, and London’s Heathrow airport, which lasted until yesterday.\nWhile the NCA did not name the man, it said he is “in h

In [33]:

# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    app = build_graph()
    result: AgentState = asyncio.run(app.ainvoke({"articles": [], "top_n": TOP_N, "digest_md": None}))
    stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    out_path = f"daily_digest_{stamp}.md"

    header = f"_(Generated {stamp})_\n\n"
    content = header + (result.get("digest_md") or "(No content)")
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(content)
    print(f"✅ Wrote {out_path}")


NameError: name 'build_graph' is not defined


# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    app = build_graph()
    result: AgentState = asyncio.run(app.ainvoke({"articles": [], "top_n": TOP_N, "digest_md": None}))
    stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    out_path = f"daily_digest_{stamp}.md"

    header = f"_(Generated {stamp})_\n\n"
    content = header + (result.get("digest_md") or "(No content)")
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(content)
    print(f"✅ Wrote {out_path}")
