In [28]:
# pip install pygooglenews feedparser python-dateutil pandas
from pygooglenews import GoogleNews
from dateutil import tz
from datetime import datetime
import hashlib, os, re
import pandas as pd
from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode
import time

In [30]:
# ---- Run configuration -------------------------------------------------
LANG = "en"        # language passed to GoogleNews
COUNTRY = "US"     # country edition passed to GoogleNews
WHEN = "1d"        # rolling window for continuous pulls; use from_/to_ in backfill()
RUN_DT = datetime.now(tz=tz.tzlocal())  # timezone-aware timestamp of this run

# ---- Output locations --------------------------------------------------
DIR = 'data'                                   # holds the master feed and seen-id state
RAW_DIR = f"raw/{RUN_DT:%Y-%m-%d}"             # one raw-dump folder per calendar day
STATE_DIR = "state"
MASTER_DIR = "master"
MASTER_PATH = f"{DIR}/realtime_data_feed.csv"  # append-only feed of new articles
SEEN_PATH = f"{DIR}/gnews_seen_ids.csv"        # ids already emitted in earlier runs

# Create every directory that is written to. DIR was previously missing here,
# so the first write to MASTER_PATH / SEEN_PATH failed on a fresh checkout.
for _out_dir in (DIR, RAW_DIR, STATE_DIR, MASTER_DIR):
    os.makedirs(_out_dir, exist_ok=True)

# Google News search expressions (edit/add as needed).
# Entity queries target the firm and its brands by name.
# NOTE(review): the bare "BK" term in the first query also appears to match
# unrelated companies (e.g. "BK Technologies" in the sample output) - confirm
# whether the ticker should stay in the query.
ENTITY_QUERIES = [
    '("BNY Mellon" OR "Bank of New York Mellon" OR BK) -sports -gossip',
    'Pershing',
    'Dreyfus',
]

# Theme queries target industry topics rather than specific names.
THEME_QUERIES = [
    '"asset servicing" OR custody OR "securities services"',
    'FX OR "repo market" OR "liquidity management" OR "securities lending"',
    '"Basel III" OR "Basel endgame" OR "capital requirements" OR "operational resilience"',
    '"rating action" OR "credit watch" OR "outlook revised"',
    '"tokenized assets" OR "market infrastructure" OR "T+1"',
]

# Entity queries run first, then theme queries.
ALL_QUERIES = [*ENTITY_QUERIES, *THEME_QUERIES]

# -------- Helpers --------
# Query-string keys treated as tracking noise: every utm_* key plus gclid/fbclid.
# Used with fullmatch so e.g. "utmost" or "gclid2" are NOT stripped.
_TRACKING_PARAM_RE = re.compile(r'utm_.*|gclid|fbclid', re.I)


def clean_url(u: str) -> str:
    """Remove common tracking params, keep a stable canonical-ish URL.

    The URL is rebuilt from scheme, netloc and path plus the surviving query
    parameters; the ``;params`` component and the ``#fragment`` are dropped.

    Args:
        u: URL to clean. Empty/None values and strings without a scheme are
           returned unchanged.

    Returns:
        The cleaned URL, or ``u`` itself when it cannot be parsed.
    """
    if not u:
        return u
    try:
        p = urlparse(u)
        if not p.scheme:
            return u
        # The previous pattern r'^(utm_|gclid|fbclid)$' only matched a key that
        # was literally "utm_", so utm_source/utm_medium/... were never removed.
        q = [(k, v) for k, v in parse_qsl(p.query, keep_blank_values=True)
             if not _TRACKING_PARAM_RE.fullmatch(k)]
        return urlunparse((p.scheme, p.netloc, p.path, "", urlencode(q), ""))
    except Exception:
        # Deliberate best-effort: an unparseable link is passed through as-is.
        return u

def make_id(title: str, canonical_link: str) -> str:
    """Return a SHA-256 hex digest identifying an article.

    The digest is taken over ``"<title>|<link>"`` encoded as UTF-8, where both
    parts are whitespace-stripped and a falsy value counts as an empty string.
    """
    title_part = (title or '').strip()
    link_part = (canonical_link or '').strip()
    hasher = hashlib.sha256()
    hasher.update("|".join((title_part, link_part)).encode("utf-8"))
    return hasher.hexdigest()

def load_seen(path: str) -> set:
    """Load the set of already-seen article ids from the CSV at ``path``.

    Ids are read from the ``id`` column and converted to strings. A missing
    file, or any error while reading it, yields an empty set.
    """
    seen_ids = set()
    if os.path.exists(path):
        try:
            id_column = pd.read_csv(path)["id"]
            seen_ids = set(id_column.astype(str).tolist())
        except Exception:
            # Best-effort: an unreadable state file is treated as "nothing seen".
            seen_ids = set()
    return seen_ids

def append_seen(path: str, new_ids: list[str], run_ts: str) -> None:
    """Append newly seen ids to the CSV at ``path``.

    Each id becomes one row with columns ``id`` and ``first_seen_at`` (set to
    ``run_ts``). The header row is written only when the file does not exist
    yet. Nothing is written when ``new_ids`` is empty.
    """
    if new_ids:
        records = pd.DataFrame({"id": new_ids, "first_seen_at": run_ts})
        file_is_new = not os.path.exists(path)
        records.to_csv(path, mode="a", header=file_is_new, index=False)

def write_master(rows: list[dict]) -> None:
    """Prepare ``rows`` for appending to the master feed at MASTER_PATH.

    The rows are turned into a DataFrame and the header flag is computed, but
    the actual CSV append is currently disabled (commented out), so this
    function writes nothing. An empty ``rows`` returns immediately.
    """
    if rows:
        master_df = pd.DataFrame(rows)
        needs_header = not os.path.exists(MASTER_PATH)
        # Disabled write, kept for reference:
        # master_df.to_csv(MASTER_PATH, mode="a", header=needs_header, index=False)
    

# -------- Main pull --------
def run_pull():
    """Run every query in ALL_QUERIES once and return the not-yet-seen articles.

    Side effects:
        * Writes every fetched entry (including already-seen ones) to a
          timestamped CSV inside RAW_DIR.
        * Appends the ids of the returned articles to SEEN_PATH. This call used
          to sit after the ``return`` statement and was therefore never
          executed, so no id was ever recorded as seen.

    Because the returned ids are recorded as seen, they will be filtered out of
    the next pull; the caller is responsible for persisting the returned frame.

    Returns:
        pandas.DataFrame with one row per new article and the columns listed in
        ``columns`` below. An article returned by several queries appears once,
        attributed to the first query that matched it.
    """
    columns = ["id", "title", "link", "published", "source",
               "summary", "query", "fetched_at"]

    gn = GoogleNews(lang=LANG, country=COUNTRY)
    seen = load_seen(SEEN_PATH)
    run_iso = RUN_DT.isoformat()
    raw_out = os.path.join(RAW_DIR, f"google_news_raw_{RUN_DT:%Y%m%d_%H%M}.csv")

    raw_rows, master_rows, new_ids = [], [], []

    for q in ALL_QUERIES:
        res = gn.search(q, when=WHEN)  # swap to from_/to_ in backfill()
        entries = (res or {}).get("entries", []) or []
        for e in entries:
            title = e.get("title", "")
            link = e.get("link", "")
            src = (e.get("source", {}) or {}).get("title", "")
            summary = e.get("summary", "")
            published = e.get("published", "")
            canonical = clean_url(link)
            _id = make_id(title, canonical)

            row = {
                "id": _id,
                "title": title,
                "link": canonical,
                "published": published,
                "source": src,
                "summary": summary,
                "query": q,
                "fetched_at": run_iso
            }
            raw_rows.append(row)

            if _id not in seen:
                # Mark as seen immediately so the same article coming back from
                # a later query in this run is not emitted a second time.
                seen.add(_id)
                master_rows.append(row)
                new_ids.append(_id)

    # Store raw data pull (explicit columns keep the schema stable when empty).
    pd.DataFrame(raw_rows, columns=columns).to_csv(raw_out, index=False)

    # Persist the newly seen ids BEFORE returning; make sure the target
    # directory exists so the first run on a fresh checkout does not fail.
    seen_dir = os.path.dirname(SEEN_PATH)
    if seen_dir:
        os.makedirs(seen_dir, exist_ok=True)
    append_seen(SEEN_PATH, new_ids, run_iso)

    # Return filtered df
    return pd.DataFrame(master_rows, columns=columns)

In [32]:
## Main Script
# Record the start time, then pull the articles not previously seen into `df`
# for the cells below.
start = time.time()

df = run_pull()
# Header flag for the (currently disabled) append to MASTER_PATH below.
header = not os.path.exists(MASTER_PATH)
# Disabled persistence steps, kept for reference:
# df.to_csv(MASTER_PATH, mode="a", header=header, index=False)
# NOTE(review): `new_ids` and `run_iso` are locals of run_pull(), so the next
# line would raise NameError if it were simply uncommented here.
# append_seen(SEEN_PATH, new_ids, run_iso)

# fetch = time.time()


In [40]:
# Inspect the frame returned by run_pull() (the rendered output below is truncated).
df

Unnamed: 0,id,title,link,published,source,summary,query,fetched_at
0,14afc5960984be56afd562d076fab48345699e74290b5b...,Retail Sales Remain Resilient - BNY,https://news.google.com/rss/articles/CBMiwgFBV...,"Wed, 12 Nov 2025 11:16:25 GMT",BNY,"<a href=""https://news.google.com/rss/articles/...","(""BNY Mellon"" OR ""Bank of New York Mellon"" OR ...",2025-11-12T11:29:07.233806-05:00
1,9de33d87790650e46d84dd144c58350ec47e9dd259bea4...,Black Kite Releases Global Adaptive AI Assessm...,https://news.google.com/rss/articles/CBMihwJBV...,"Wed, 12 Nov 2025 11:00:00 GMT",Morningstar,"<a href=""https://news.google.com/rss/articles/...","(""BNY Mellon"" OR ""Bank of New York Mellon"" OR ...",2025-11-12T11:29:07.233806-05:00
2,6ae6e9a893b3dd72e214e2cfa3dfcbaabf6821621334e3...,ABC Arbitrage SA Takes Position in The Bank of...,https://news.google.com/rss/articles/CBMi1AFBV...,"Wed, 12 Nov 2025 11:46:28 GMT",MarketBeat,"<a href=""https://news.google.com/rss/articles/...","(""BNY Mellon"" OR ""Bank of New York Mellon"" OR ...",2025-11-12T11:29:07.233806-05:00
3,eee8dff2da573b266d4f79039c7858d9139471f4914917...,Will The Bank of New York Mellon Corporation (...,https://news.google.com/rss/articles/CBMitgFBV...,"Wed, 12 Nov 2025 14:55:13 GMT",newser.com,"<a href=""https://news.google.com/rss/articles/...","(""BNY Mellon"" OR ""Bank of New York Mellon"" OR ...",2025-11-12T11:29:07.233806-05:00
4,5713a6a6a3fe6c45f3e298e23026fa499dc41426df65b7...,BK Technologies Data Breach Exposes Sensitive ...,https://news.google.com/rss/articles/CBMiugFBV...,"Wed, 12 Nov 2025 11:31:28 GMT",The Critical Communications Review,"<a href=""https://news.google.com/rss/articles/...","(""BNY Mellon"" OR ""Bank of New York Mellon"" OR ...",2025-11-12T11:29:07.233806-05:00
...,...,...,...,...,...,...,...,...
398,83b7a326746e1bd2c9a85d144ac8204c0f3df5f88e1f53...,Sebi and IEPFA Host 'Niveshak Shivir' to Recla...,https://news.google.com/rss/articles/CBMixAFBV...,"Tue, 11 Nov 2025 16:46:33 GMT",Devdiscourse,"<a href=""https://news.google.com/rss/articles/...","""tokenized assets"" OR ""market infrastructure"" ...",2025-11-12T11:29:07.233806-05:00
399,af0fef4937e78e64b52471f4c6fce906c0ef3e648c7d28...,Ethereum (ETH) Dominates Tokenized Assets: $20...,https://news.google.com/rss/articles/CBMinwFBV...,"Wed, 12 Nov 2025 06:33:15 GMT",Blockchain News,"<a href=""https://news.google.com/rss/articles/...","""tokenized assets"" OR ""market infrastructure"" ...",2025-11-12T11:29:07.233806-05:00
400,6fc30494b436b9f5efa8bab12ddcbaa3ca35b3ab44526e...,Dogecoin Price Prediction: DOGE Eyes $0.21 as ...,https://news.google.com/rss/articles/CBMi5gFBV...,"Wed, 12 Nov 2025 10:40:54 GMT",Coindoo,"<a href=""https://news.google.com/rss/articles/...","""tokenized assets"" OR ""market infrastructure"" ...",2025-11-12T11:29:07.233806-05:00
401,07c352d4a51ef56dc2fad46b9d2d07da40a5aa7025751d...,GL1 Launches Market Infrastructure Toolkit for...,https://news.google.com/rss/articles/CBMiWEFVX...,"Wed, 12 Nov 2025 00:00:00 GMT",Global Legal Chronicle,"<a href=""https://news.google.com/rss/articles/...","""tokenized assets"" OR ""market infrastructure"" ...",2025-11-12T11:29:07.233806-05:00


In [46]:
import google.generativeai as genai

# SECURITY: the API key must never be hard-coded in a notebook. It is read from
# the GEMINI_API_KEY environment variable (a KeyError here means it is unset).
# The key that was previously committed in this cell should be revoked.
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
model = genai.GenerativeModel("models/gemini-2.5-flash")

llm_c = 0  # number of generate_content requests issued so far (retries included)


def llm_counter(link, max_retries: int = 3, base_sleep: float = 8.0):
    """Ask the model for a 2-3 sentence summary of the article at ``link``.

    Every request increments the global ``llm_c`` counter. A quota error
    (``ResourceExhausted`` / HTTP 429, as seen on the free tier) is retried
    with a linearly growing pause; any other error is raised immediately.

    NOTE(review): the model is given only the URL text. The sample summaries in
    this notebook do not match their articles, which suggests it is not actually
    reading the linked page - verify before relying on the output.

    Args:
        link: URL of the article to summarise.
        max_retries: Maximum number of requests for this link (at least 1).
        base_sleep: Seconds to wait after the first quota error; the wait grows
            by this amount with each further attempt.

    Returns:
        The response text produced by the model.

    Raises:
        Exception: whatever ``generate_content`` raised, once it is not a quota
            error or the attempts are used up.
    """
    global llm_c
    # Built without backslash continuations, which used to embed the source
    # indentation (runs of spaces) into the prompt text.
    prompt = (
        f"For the article in this link, {link}, provide me a summary of the "
        "article. 2-3 sentences."
    )
    attempts = max(1, max_retries)
    for attempt in range(1, attempts + 1):
        llm_c += 1
        try:
            return model.generate_content(prompt).text
        except Exception as exc:
            quota_hit = type(exc).__name__ == "ResourceExhausted" or "429" in str(exc)
            if not quota_hit or attempt == attempts:
                raise
            time.sleep(base_sleep * attempt)
    
# Summarise every link one request at a time, overwriting the "summary" column,
# and record wall-clock timestamps before and after.
datafetch = time.time()
df["summary"] = df["link"].apply(llm_counter)
llm_summary_time = time.time()

ResourceExhausted: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. To monitor your current usage, head to: https://ai.dev/usage?tab=rate-limit. 
* Quota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_requests, limit: 10
Please retry in 7.12409497s. [links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, violations {
  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerMinutePerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_dimensions {
    key: "model"
    value: "gemini-2.5-flash"
  }
  quota_value: 10
}
, retry_delay {
  seconds: 7
}
]

In [106]:
import time, json, math
import google.generativeai as genai
from typing import List, Dict

# SECURITY: the API key must never be hard-coded in a notebook. It is read from
# the GEMINI_API_KEY environment variable (a KeyError here means it is unset).
# The key that was previously committed in this cell should be revoked.
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
MODEL_NAME = "models/gemini-2.5-flash"

# Instruction text placed in front of every batch prompt; the sentences are
# joined with single spaces.
SYSTEM_INSTRUCTIONS = " ".join([
    "You will receive a list of items, each with an 'id' and a 'url'.",
    "For each url, write a concise 2-3 sentence summary of the article at that link.",
    "Return ONLY a JSON array where each element is an object with keys:",
    "id (copied from input) and summary (a string).",
    "Do not include any other keys.",
    "Do not include any text outside the JSON.",
    "Do not include comments, markdown, or explanations.",
])


def build_prompt(batch_items: List[Dict]) -> str:
    """
    Return the prompt for one batch: the instructions, an INPUT_JSON marker,
    then ``batch_items`` serialized as JSON (non-ASCII characters kept as-is).
    The function itself places no limit on the number of items.
    """
    payload = json.dumps(batch_items, ensure_ascii=False)
    return f"{SYSTEM_INSTRUCTIONS}\n\nINPUT_JSON:\n{payload}"

def _parse_model_json(text: str):
    """Parse model output as JSON, tolerating code fences or prose around it."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    raw = text.strip()
    candidates = []
    if raw.startswith("```"):
        unfenced = raw.strip("`")
        # Drop the rest of the opening fence line (a language hint such as "json").
        if "\n" in unfenced:
            unfenced = unfenced.split("\n", 1)[1]
        candidates.append(unfenced)
    # Last resort: the outermost JSON array/object embedded in the text.
    starts = [i for i in (raw.find("["), raw.find("{")) if i != -1]
    end = max(raw.rfind("]"), raw.rfind("}"))
    if starts and end > min(starts):
        candidates.append(raw[min(starts):end + 1])

    for candidate in candidates:
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            continue
    # Nothing could be salvaged: this raises the JSONDecodeError for the caller.
    return json.loads(raw)


def call_gemini_json(prompt: str, max_retries: int = 3, base_sleep: float = 8.0):
    """
    Call Gemini and parse its reply as JSON.

    A failed request (e.g. a 429 quota error) is retried after a pause of
    ``base_sleep * attempt`` seconds. A reply that cannot be parsed as JSON is
    retried immediately. No pause is taken after the final attempt - the error
    is raised straight away.

    Args:
        prompt: Full prompt text sent to the model.
        max_retries: Maximum number of attempts; values below 1 are treated as 1
            (previously the function silently returned None in that case).
        base_sleep: Base pause in seconds between failed requests.

    Returns:
        The Python object parsed from the model's JSON text.

    Raises:
        json.JSONDecodeError: the final reply was not valid JSON.
        Exception: whatever the request raised on the final attempt.
    """
    model = genai.GenerativeModel(MODEL_NAME)
    attempts = max(1, max_retries)
    for attempt in range(1, attempts + 1):
        is_last = attempt == attempts
        try:
            resp = model.generate_content(prompt)
            text = resp.text or ""
        except Exception:
            # Request-level failure. Quota errors surface here; back off before
            # trying again, but never sleep when we are about to give up.
            if is_last:
                raise
            time.sleep(base_sleep * attempt)
            continue

        try:
            return _parse_model_json(text)
        except json.JSONDecodeError:
            if is_last:
                raise

# def summarize_links_in_batches(
#     df,
#     link_col: str = "link",
#     out_col: str = "summary",
#     batch_size: int = 40,
#     max_calls: int = 10,
# ) -> None:
#     """
#     Summarize links in df[link_col] using Gemini in batches of <= batch_size.
#     Writes results to df[out_col]. Makes at most max_calls API calls.
#     Partially fills df[out_col] even if later calls fail.
#     """

#     if out_col not in df.columns:
#         df[out_col] = None

#     # Select rows that still need summaries
#     todo_idx = df.index[df[out_col].isna() | (df[out_col] == "")].tolist()
#     if not todo_idx:
#         return  # nothing to do

#     # Cap total items by batch_size * max_calls
#     max_items = batch_size * max_calls
#     todo_idx = todo_idx[:max_items]

#     # Create batches
#     batches = [
#         todo_idx[i : i + batch_size] for i in range(0, len(todo_idx), batch_size)
#     ]

#     calls_made = 0
#     for batch_indices in batches:
#         if calls_made >= max_calls:
#             break

#         batch_items = [{"id": int(i), "url": str(df.at[i, link_col])} for i in batch_indices]
#         prompt = build_prompt(batch_items)

#         try:
#             result = call_gemini_json(prompt)
#         except Exception as e:
#             # Keep partial progress, then stop further calls (stay within max_calls intent)
#             # You could log/print e here if desired.
#             break

#         # Validate and write back
#         # Expecting: [{"id": <df_index>, "summary": "..."} ...]
#         if isinstance(result, list):
#             for obj in result:
#                 try:
#                     row_id = obj["id"]
#                     summary = obj["summary"]
#                     if row_id in df.index:
#                         df.at[row_id, out_col] = summary
#                 except Exception:
#                     # Skip malformed objects; continue with others
#                     continue

#         calls_made += 1

#         # Optional throttle to respect 10 RPM even if something else calls us rapidly.
#         # With max_calls=10 this caps at ~1 minute total if you keep it enabled.
#         # time.sleep(6)

#     # Done. df[out_col] now contains summaries for processed rows.
#     # Unprocessed rows remain None/"" and can be handled in a later run.

# # ---------------------------
# # Example usage
# # ---------------------------
# # summarize_links_in_batches(df, link_col="link", out_col="summary",
# #                            batch_size=40, max_calls=10)
# # df.head()


In [111]:
def summarize_links_in_batches(
    df,
    link_col: str = "link",
    out_col: str = "summary",
    batch_size: int = 40,
    max_items: int = 400,
) -> list:
    """
    Request summaries for the first ``max_items`` links of ``df`` in batches.

    Each batch is sent through build_prompt + call_gemini_json. This function
    does NOT write summaries into ``df``; it returns the parsed batch results
    and the caller merges them (ids are the row labels after the index reset).

    Changes from the previous version: the return annotation matches what is
    returned, frames with ``max_items`` rows or fewer are processed instead of
    raising ValueError, the 40/400 constants are parameters with the same
    defaults, and a failed batch is reported with a warning instead of being
    dropped silently.

    Args:
        df: Frame holding the links. Its index is reset IN PLACE to 0..n-1
            (kept from the original, because callers map results back by
            ``df.index``).
        link_col: Column containing the URLs.
        out_col: Unused; accepted for backward compatibility.
        batch_size: Number of links per model call (must be positive).
        max_items: Upper bound on the number of rows processed.

    Returns:
        List with one entry per successful batch, each being whatever
        call_gemini_json returned for it (expected: a list of
        ``{"id": ..., "summary": ...}`` objects). Batches after the first
        failure are not attempted.

    Raises:
        ValueError: if ``batch_size`` is not positive.
    """
    import warnings

    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer.")

    df.reset_index(drop=True, inplace=True)
    n_items = min(len(df), max(0, max_items))

    results = []
    for start in range(0, n_items, batch_size):
        stop = min(start + batch_size, n_items)
        batch_items = [
            {"id": int(i), "url": str(df.at[i, link_col])} for i in range(start, stop)
        ]
        prompt = build_prompt(batch_items)

        try:
            result = call_gemini_json(prompt)
        except Exception as exc:
            # Keep partial progress; stop further calls, but say why.
            warnings.warn(
                f"Stopping after {len(results)} successful batch(es); "
                f"rows {start}-{stop - 1} failed: {exc!r}"
            )
            break

        results.append(result)
    return results

In [113]:
# One parsed model response per successful batch; merged into `df` in the next cell.
results = summarize_links_in_batches(df)

In [163]:
# Flatten the per-batch results; a batch that is not a list is ignored.
res_flat = [x for sublist in results if isinstance(sublist, list) for x in sublist]

# The model output is untrusted JSON: skip items without the expected keys and
# coerce ids to int so that ids returned as strings ("12") still match df.index.
summary_map = {}
for item in res_flat:
    try:
        summary_map[int(item["id"])] = item["summary"]
    except (KeyError, TypeError, ValueError):
        continue

# Rows without a returned summary end up as NaN.
df["summary"] = df.index.map(summary_map)
df

Unnamed: 0,id,title,link,published,source,summary,query,fetched_at,out_col
0,14afc5960984be56afd562d076fab48345699e74290b5b...,Retail Sales Remain Resilient - BNY,https://news.google.com/rss/articles/CBMiwgFBV...,"Wed, 12 Nov 2025 11:16:25 GMT",BNY,"Australia's new ambassador to China, Scott Dew...","(""BNY Mellon"" OR ""Bank of New York Mellon"" OR ...",2025-11-12T11:29:07.233806-05:00,
1,9de33d87790650e46d84dd144c58350ec47e9dd259bea4...,Black Kite Releases Global Adaptive AI Assessm...,https://news.google.com/rss/articles/CBMihwJBV...,"Wed, 12 Nov 2025 11:00:00 GMT",Morningstar,The Albanese government is considering buildin...,"(""BNY Mellon"" OR ""Bank of New York Mellon"" OR ...",2025-11-12T11:29:07.233806-05:00,
2,6ae6e9a893b3dd72e214e2cfa3dfcbaabf6821621334e3...,ABC Arbitrage SA Takes Position in The Bank of...,https://news.google.com/rss/articles/CBMi1AFBV...,"Wed, 12 Nov 2025 11:46:28 GMT",MarketBeat,"Australia's new ambassador to China, Scott Dew...","(""BNY Mellon"" OR ""Bank of New York Mellon"" OR ...",2025-11-12T11:29:07.233806-05:00,
3,eee8dff2da573b266d4f79039c7858d9139471f4914917...,Will The Bank of New York Mellon Corporation (...,https://news.google.com/rss/articles/CBMitgFBV...,"Wed, 12 Nov 2025 14:55:13 GMT",newser.com,The NSW Premier has announced that the state w...,"(""BNY Mellon"" OR ""Bank of New York Mellon"" OR ...",2025-11-12T11:29:07.233806-05:00,
4,5713a6a6a3fe6c45f3e298e23026fa499dc41426df65b7...,BK Technologies Data Breach Exposes Sensitive ...,https://news.google.com/rss/articles/CBMiugFBV...,"Wed, 12 Nov 2025 11:31:28 GMT",The Critical Communications Review,Australian researchers have developed a ground...,"(""BNY Mellon"" OR ""Bank of New York Mellon"" OR ...",2025-11-12T11:29:07.233806-05:00,
...,...,...,...,...,...,...,...,...,...
398,83b7a326746e1bd2c9a85d144ac8204c0f3df5f88e1f53...,Sebi and IEPFA Host 'Niveshak Shivir' to Recla...,https://news.google.com/rss/articles/CBMixAFBV...,"Tue, 11 Nov 2025 16:46:33 GMT",Devdiscourse,A devastating fire at the Platan Research Inst...,"""tokenized assets"" OR ""market infrastructure"" ...",2025-11-12T11:29:07.233806-05:00,
399,af0fef4937e78e64b52471f4c6fce906c0ef3e648c7d28...,Ethereum (ETH) Dominates Tokenized Assets: $20...,https://news.google.com/rss/articles/CBMinwFBV...,"Wed, 12 Nov 2025 06:33:15 GMT",Blockchain News,A devastating fire at the Platan Research Inst...,"""tokenized assets"" OR ""market infrastructure"" ...",2025-11-12T11:29:07.233806-05:00,
400,6fc30494b436b9f5efa8bab12ddcbaa3ca35b3ab44526e...,Dogecoin Price Prediction: DOGE Eyes $0.21 as ...,https://news.google.com/rss/articles/CBMi5gFBV...,"Wed, 12 Nov 2025 10:40:54 GMT",Coindoo,,"""tokenized assets"" OR ""market infrastructure"" ...",2025-11-12T11:29:07.233806-05:00,
401,07c352d4a51ef56dc2fad46b9d2d07da40a5aa7025751d...,GL1 Launches Market Infrastructure Toolkit for...,https://news.google.com/rss/articles/CBMiWEFVX...,"Wed, 12 Nov 2025 00:00:00 GMT",Global Legal Chronicle,,"""tokenized assets"" OR ""market infrastructure"" ...",2025-11-12T11:29:07.233806-05:00,


In [186]:
import json, os, csv
OUT_CSV = "data/gnews.csv"
os.makedirs(os.path.dirname(OUT_CSV), exist_ok=True)
# Write the header only when starting a new (or empty) file, matching the
# append pattern used by the other writers in this notebook. Previously the
# header was never written, so a fresh file had no column names.
write_header = not os.path.exists(OUT_CSV) or os.path.getsize(OUT_CSV) == 0
# Rows labelled 0..399 are exported. NOTE: re-running this cell appends the
# same rows again.
df.loc[:399].to_csv(OUT_CSV, mode="a", header=write_header, index=False, quoting=csv.QUOTE_MINIMAL)

In [184]:
# Spot-check the generated summary stored for row label 100.
df.loc[100, "summary"]

'Advances in artificial intelligence and machine learning are rapidly transforming various industries, from healthcare to finance. These technologies are enabling new capabilities, automating complex tasks, and generating unprecedented insights. Ethical considerations and regulatory frameworks are being developed to guide their responsible deployment.'