In [1]:

# prothomalo_streaming_v6.ipynb
# - Thread-safe state saves (locks) to fix "dictionary changed size during iteration"
# - Auto-resume, auto-unwedge, and tidy heartbeat (last-line only)
# - Month->day fallback to avoid 10k window caps
# - Continues from existing JSONL (seeds seen_urls from saved articles)
# - IP-ban backoff (1 hour), memory friendly queue, and periodic compaction
# - Always safe to stop/restart; picks up from state

import os, sys, json, time, math, random, threading, queue, gzip, re
from pathlib import Path
from datetime import datetime, timedelta, timezone
import requests
from bs4 import BeautifulSoup

# ------------------- User paths (edit if you want) -------------------
# Every file the crawler reads or writes lives directly under BASE_DIR.
BASE_DIR        = Path.home() / "Downloads"
# Output: one JSON object per line, appended by the consumer threads and
# re-read by load_done_set() to learn which URLs are already saved.
ARTICLES_JL     = BASE_DIR / "prothomalo_articles.jsonl"
LINKS_TXT       = BASE_DIR / "prothomalo_links.txt"     # optional, not required for streaming
# Resumable crawl state (cursors, offset, seen URLs); written by save_state().
STATE_JSON      = BASE_DIR / "prothomalo_stream_state.json"
# Snapshot of the SESSION counters; persist_session_snapshot() overwrites it each time.
SESSION_JSON    = BASE_DIR / "prothomalo_session_stats.json"  # session stats

# Import-time side effect: make sure the output directory exists.
BASE_DIR.mkdir(parents=True, exist_ok=True)

# ------------------- Controls -------------------
# Comma-separated section ids, sent verbatim as the "section-ids" parameter of
# every search request. Adjacent string literals are concatenated by Python, so
# each line except the last must end with a trailing comma inside the quotes.
SECTION_IDS = (
    "17532,17533,17535,17536,17538,17552,17553,17555,17556,17560,17562,17563,17566,17567,17568,17569,17570,17571,17572,"
    "17573,17584,17585,17586,17587,17588,17589,17591,17599,17600,17602,17606,17678,17679,17680,17681,17682,17683,17684,"
    "17685,17686,17687,17688,17689,17690,17691,17693,17694,17695,17696,17697,17698,17699,17700,17701,17702,17704,17705,"
    "17706,17708,17709,17714,17717,17736,17737,17738,17739,17743,19182,19183,19184,19185,19195,19196,19197,19198,19199,"
    "19200,22236,22237,22321,22323,22324,22325,22326,22327,22328,22329,22330,22332,22333,22334,22335,22336,22337,22338,"
    "22339,22340,22341,22342,22349,22350,22351,22352,22362,22363,22364,22365,22368,22515,22516,22517,22518,22519,22520,"
    "22575,22701,23230,23382,23383,23426,24541,26653,29465,35621,35622,35623,35624,35625,35626,35867,35868,35871,67467,95322"
)

# Endpoint queried by fetch_json(); its response is parsed as HTML by parse_search_links().
SEARCH_URL = "https://www.prothomalo.com/search"

# Producer/consumer tuning
LIMIT_PER_CALL      = 200          # "limit" search parameter; also the step by which the offset advances
QUEUE_MAXSIZE       = 5000         # maxsize of the URL work queue between producer and consumers
PRODUCER_SLEEP_BASE = (0.1, 0.2)   # jitter between search calls
CONSUMER_SLEEP_BASE = (0.1, 0.2)   # jitter between article calls
N_CONSUMERS         = 50           # number of consumer threads started by supervisor()

# Backoff
IPBAN_BACKOFF_SECS  = 3600  # 1 hour
# Extra seconds slept before each retry; the first attempt uses 0, so a request
# is tried at most len(RETRY_BACKOFFS) + 1 times.
RETRY_BACKOFFS      = [1, 2, 4, 8, 16, 32]

# Save/ETA
SAVE_EVERY_N        = 50   # each consumer saves the state after writing this many articles
HEARTBEAT_EVERY_SEC = 3    # minimum seconds between heartbeat prints (and supervisor state saves)

# HTML fetch timeouts
REQ_TIMEOUT         = (10, 20)  # connect, read

# Global locks/state
# Reentrant locks: save_state() takes STATE_LOCK itself and is also called by
# code that already holds it.
STATE_LOCK = threading.RLock()
SESSION_LOCK = threading.RLock()

# Session counters for the heartbeat. They are never loaded back from disk;
# persist_session_snapshot() only dumps a copy to SESSION_JSON.
SESSION = {
    "produced": 0,
    "consumed": 0,
    "queue_max": 0,           # largest queue size observed by heartbeat_line()
    "start_ts": time.time(),  # set when this cell is executed
    "last_rate": 0.0,         # most recent items/min computed by heartbeat_line()
}

def now_utc():
    """Return the current instant as a timezone-aware datetime in UTC."""
    return datetime.now(tz=timezone.utc)

def to_epoch_ms(dt: datetime) -> int:
    """Convert a datetime to whole milliseconds since the Unix epoch (truncated)."""
    millis = dt.timestamp() * 1000
    return int(millis)

def month_windows_until(snapshot_utc: str, months_back=307):
    """Build newest->oldest (start, end) windows ending at the snapshot day.

    Args:
        snapshot_utc: ISO-8601 timestamp; a trailing "Z" is accepted.
        months_back: number of complete calendar months to include.

    Returns:
        A list of (start, end) datetimes with exclusive ends, newest first and
        contiguous (each window's start is the next window's end). When the
        snapshot day is not the last day of its month, the list begins with the
        partial current month (first of the month -> end of the snapshot day),
        followed by `months_back` complete months. Previously that partial
        month was skipped, so the newest articles were never requested.
        Returns [] when months_back <= 0.
    """
    snap = datetime.fromisoformat(snapshot_utc.replace("Z", "+00:00"))
    snap = snap.replace(hour=0, minute=0, second=0, microsecond=0)
    if months_back <= 0:
        return []

    end_of_snapshot_day = snap + timedelta(days=1)  # exclusive end of last day
    boundary = end_of_snapshot_day.replace(day=1)   # first day of that month

    windows = []
    if boundary < end_of_snapshot_day:
        # Partial current month; absent only when the snapshot day is the last
        # day of a month (then the end already sits on a month boundary).
        windows.append((boundary, end_of_snapshot_day))

    for _ in range(months_back):
        # Step to the first day of the preceding month.
        prev_start = (boundary - timedelta(days=1)).replace(day=1)
        windows.append((prev_start, boundary))
        boundary = prev_start
    return windows  # newest->oldest

def day_slices(month_start: datetime, month_end: datetime):
    """Split [month_start, month_end) into 24-hour (start, end) slices, newest first.

    Slices are anchored at month_end and stepped backwards, so the oldest slice
    may begin before month_start when the span is not a whole number of days.
    """
    one_day = timedelta(days=1)
    # Ceiling of (month_end - month_start) / one_day; <= 0 yields no slices.
    n_slices = -((month_start - month_end) // one_day)
    return [
        (month_end - (k + 1) * one_day, month_end - k * one_day)
        for k in range(n_slices)
    ]

def init_state_defaults(st: dict) -> dict:
    """Fill in any state keys that are missing (e.g. in a state file saved by an older version).

    Existing values are never overwritten. The dict is modified in place under
    STATE_LOCK and also returned.
    """
    with STATE_LOCK:
        defaults = (
            ("snapshot_utc", now_utc().isoformat().replace("+00:00","Z")),
            ("mode", "month"),        # "month" or "day"
            ("month_cursor", None),   # ISO
            ("day_cursor", None),     # ISO
            ("offset", 0),
            ("seen_urls", {}),        # dict as tiny set
            ("ip_ban_until", None),
            ("produced", 0),
            ("consumed", 0),
            ("last_save_ts", time.time()),
        )
        for key, fallback in defaults:
            st.setdefault(key, fallback)
    return st

def load_done_set():
    """Collect the "url" value of every parseable record in ARTICLES_JL.

    Blank lines and lines that fail to parse (or whose record has no usable
    "url") are skipped silently. Returns an empty set when the file is absent.
    """
    done_urls = set()
    if not ARTICLES_JL.exists():
        return done_urls
    with ARTICLES_JL.open("r", encoding="utf-8") as handle:
        for raw in handle:
            record_text = raw.strip()
            if record_text:
                try:
                    url = json.loads(record_text).get("url")
                    if url:
                        done_urls.add(url)
                except Exception:
                    # Tolerate truncated or otherwise malformed lines.
                    pass
    return done_urls

def save_state(st: dict):
    """Write the state dict to STATE_JSON via a temporary file and a rename.

    The dump and the rename both happen under STATE_LOCK so that threads which
    mutate `st` under the same lock cannot change it mid-serialization.
    """
    scratch = STATE_JSON.with_suffix(".tmp")
    with STATE_LOCK:
        with scratch.open("w", encoding="utf-8") as handle:
            json.dump(st, handle, ensure_ascii=False, indent=2)
        # Swap the finished file into place.
        os.replace(scratch, STATE_JSON)

def load_state():
    """Read STATE_JSON if it exists (otherwise start empty) and fill in default keys."""
    loaded = {}
    if STATE_JSON.exists():
        loaded = json.loads(STATE_JSON.read_text(encoding="utf-8"))
    return init_state_defaults(loaded)

def persist_session_snapshot():
    """Best-effort dump of the SESSION counters to SESSION_JSON.

    A copy is taken under SESSION_LOCK; the file write happens outside the lock
    and any failure is deliberately ignored.
    """
    with SESSION_LOCK:
        snapshot = SESSION.copy()
    try:
        with SESSION_JSON.open("w", encoding="utf-8") as sink:
            json.dump(snapshot, sink, ensure_ascii=False, indent=2)
    except Exception:
        # Session stats are informational only; never let them stop the crawl.
        return

def fetch_json(session, params):
    """Call the search endpoint with jittered retries and IP-ban awareness.

    Args:
        session: a requests.Session used for the GET.
        params: query parameters for SEARCH_URL.

    Returns:
        (body_text, 200) on success;
        (None, 403) or (None, 429) when the server refuses the request, after
        recording an IP-ban deadline in the on-disk state;
        (None, 599) when every attempt failed or was spent waiting on a ban.

    NOTE(review): the ban deadline lives only in the state file, which is
    re-read on every attempt and is overwritten whenever other code saves its
    own in-memory state dict via save_state() -- confirm whether the ban is
    meant to survive those saves.
    """
    for back in [0] + RETRY_BACKOFFS:
        time.sleep(random.uniform(*PRODUCER_SLEEP_BASE) + back)
        with STATE_LOCK:
            ipban_until = load_state().get("ip_ban_until")
        if ipban_until:
            # The deadline is stored with a "Z" suffix, which fromisoformat()
            # rejects before Python 3.11; normalize it first.
            until = datetime.fromisoformat(ipban_until.replace("Z", "+00:00"))
            if now_utc() < until:
                sleep_for = (until - now_utc()).total_seconds()
                # Nap in bounded chunks (5..120 s); this uses up one attempt.
                time.sleep(max(5, min(sleep_for, 120)))
                continue
        try:
            r = session.get(SEARCH_URL, params=params, timeout=REQ_TIMEOUT)
        except requests.RequestException:
            continue
        if r.status_code in (403, 429):
            with STATE_LOCK:
                st = load_state()
                st["ip_ban_until"] = (now_utc() + timedelta(seconds=IPBAN_BACKOFF_SECS)).isoformat().replace("+00:00","Z")
                save_state(st)
            return None, r.status_code
        if r.status_code != 200:
            continue
        return r.text, 200
    return None, 599

def parse_search_links(html: str):
    """Extract article-looking links from a search page.

    Site-relative hrefs ("/path") are resolved against https://www.prothomalo.com.
    Protocol-relative hrefs ("//host/path") are resolved against the https
    scheme instead of being glued onto the site origin, which used to turn
    links to other hosts into bogus prothomalo.com URLs that passed the filter.

    Returns:
        De-duplicated absolute URLs, in first-seen order, that match
        https?://www.prothomalo.com/<something>/<alphanumeric slug>.
    """
    soup = BeautifulSoup(html, "lxml")
    article_re = re.compile(r"^https?://www\.prothomalo\.com/.+/[A-Za-z0-9]+$")
    ordered = {}  # dict keys keep insertion order and de-duplicate
    for a in soup.find_all("a", href=True):
        href = a["href"]
        if not href:
            continue
        if href.startswith("//"):
            href = "https:" + href
        elif href.startswith("/"):
            href = "https://www.prothomalo.com" + href
        if article_re.match(href):
            ordered.setdefault(href, None)
    return list(ordered)

def fetch_html(session, url):
    """Download one article page with jittered retries and IP-ban awareness.

    Args:
        session: a requests.Session used for the GET.
        url: absolute article URL.

    Returns:
        The response text on HTTP 200, otherwise None (server refused with
        403/429, or every attempt failed or was spent waiting on a ban).

    NOTE(review): the ban deadline lives only in the state file, which is
    re-read on every attempt and is overwritten whenever other code saves its
    own in-memory state dict via save_state() -- confirm whether the ban is
    meant to survive those saves.
    """
    for back in [0] + RETRY_BACKOFFS:
        time.sleep(random.uniform(*CONSUMER_SLEEP_BASE) + back)
        with STATE_LOCK:
            ipban_until = load_state().get("ip_ban_until")
        if ipban_until:
            # The deadline is stored with a "Z" suffix, which fromisoformat()
            # rejects before Python 3.11; normalize it first.
            until = datetime.fromisoformat(ipban_until.replace("Z", "+00:00"))
            if now_utc() < until:
                time.sleep(5)  # ban still active; this uses up one attempt
                continue
        try:
            r = session.get(url, timeout=REQ_TIMEOUT)
        except requests.RequestException:
            continue
        if r.status_code in (403, 429):
            with STATE_LOCK:
                st = load_state()
                st["ip_ban_until"] = (now_utc() + timedelta(seconds=IPBAN_BACKOFF_SECS)).isoformat().replace("+00:00","Z")
                save_state(st)
            return None
        if r.status_code != 200:
            continue
        return r.text
    return None

def parse_article(html, url):
    """Parse an article page into a flat dict.

    Args:
        html: page markup.
        url: the URL the page was fetched from (stored as-is in the result).

    Returns:
        A dict with keys "url", "title", "section", "author", "published_iso",
        "published_text", "tags" (list of str) and "body". Fields whose element
        is missing are None ("tags" is an empty list). "body" joins the
        non-empty paragraph texts with blank lines and is None when no
        paragraph has any text.
    """
    soup = BeautifulSoup(html, "lxml")

    def _attr_or_text(node, attr):
        # Prefer the data attribute; fall back to the element's visible text.
        if node is None:
            return None
        return node.get(attr) or node.get_text(strip=True)

    def _paragraph_texts(container):
        # Strip before testing so whitespace-only <p> elements are dropped
        # instead of being kept as empty strings.
        stripped = (p.get_text().strip() for p in container.find_all("p"))
        return [text for text in stripped if text]

    title = _attr_or_text(soup.find("h1"), "data-title-0")
    section = _attr_or_text(soup.find("a", {"data-section-0": True}), "data-section-0")
    author = _attr_or_text(
        soup.find("span", class_=re.compile(r"contributor-name")), "data-author-0"
    )

    published_iso = None
    published_text = None
    time_el = soup.find("time")
    if time_el is not None:
        published_iso = time_el.get("datetime")
        published_text = time_el.get_text(" ", strip=True)

    tags = []
    tag_ul = soup.find("ul", class_=re.compile(r"tag-list"))
    if tag_ul is not None:
        for a in tag_ul.find_all("a", href=True):
            label = a.get_text(strip=True)
            if label:
                tags.append(label)

    body_parts = []
    for div in soup.find_all("div", class_=re.compile(r"story-element-text")):
        body_parts.extend(_paragraph_texts(div))
    if not body_parts:
        # No story containers matched: fall back to every paragraph on the page.
        body_parts = _paragraph_texts(soup)
    body = "\n\n".join(body_parts).strip() if body_parts else None

    return {
        "url": url,
        "title": title,
        "section": section,
        "author": author,
        "published_iso": published_iso,
        "published_text": published_text,
        "tags": tags,
        "body": body,
    }

def fmt_eta(seconds):
    """Render a duration compactly: "XhYm", "XmYs" or "Xs"; anything under one second is "0s"."""
    if not seconds or seconds < 1:
        return "0s"
    hours, leftover = divmod(int(seconds), 3600)
    minutes, secs = divmod(leftover, 60)
    if hours:
        return f"{hours}h{minutes}m"
    if minutes:
        return f"{minutes}m{secs}s"
    return f"{secs}s"

def heartbeat_line(mode, st, qsize):
    """Build the one-line status string printed by the supervisor.

    Args:
        mode: current crawl mode, shown verbatim.
        st: shared state dict (read under STATE_LOCK).
        qsize: current size of the work queue.

    Returns:
        The heartbeat text. The produced/consumed numbers are the lifetime
        totals from `st`; the items/min average and the queue ETA are based
        only on work done since the first heartbeat of this session.

    Side effects:
        Updates SESSION: records the baselines on first call, the
        session-relative "produced"/"consumed" counts, "last_rate" and
        "queue_max".
    """
    with STATE_LOCK:
        produced = st.get("produced",0)
        consumed = st.get("consumed",0)
        ipSess = st.get("offset",0)
        ipban = st.get("ip_ban_until")

    with SESSION_LOCK:
        # st["consumed"] / st["produced"] are lifetime totals restored from the
        # state file. Dividing them by this session's elapsed time reported an
        # absurd rate after a resume, so measure against a per-session baseline.
        consumed_base = SESSION.setdefault("consumed_base", consumed)
        produced_base = SESSION.setdefault("produced_base", produced)
        SESSION["consumed"] = max(0, consumed - consumed_base)
        SESSION["produced"] = max(0, produced - produced_base)
        elapsed = max(1e-6, time.time()-SESSION["start_ts"])
        rate = (SESSION["consumed"]/elapsed)*60.0
        SESSION["last_rate"] = rate
        SESSION["queue_max"] = max(SESSION["queue_max"], qsize)

    eta = fmt_eta(qsize / max(rate, 1e-3) * 60.0) if qsize>0 else "0s"
    line = f"[Heartbeat] mode={mode} q={qsize} produced={produced} consumed={consumed} ipSess={ipSess} | avg={rate:.1f} items/min | ETA(queue) ~{eta}"

    # Only flag the ban while its deadline is still in the future; a stale
    # timestamp left in the state used to show "active" forever.
    ban_active = False
    if ipban:
        try:
            ban_active = now_utc() < datetime.fromisoformat(str(ipban).replace("Z", "+00:00"))
        except (ValueError, TypeError):
            # Unparseable or timezone-less value: keep the old conservative display.
            ban_active = True
    if ban_active:
        line += " | IP-BAN active"
    return line


In [2]:

def _window_index(windows, cursor_iso):
    """Index of the (start, end) window whose end equals cursor_iso; 0 if unset or not found."""
    if cursor_iso is None:
        return 0
    target = datetime.fromisoformat(cursor_iso)
    return next((i for i, (_start, end) in enumerate(windows) if end == target), 0)


def producer(work_q: queue.Queue, st: dict):
    """Page through the search endpoint and enqueue article URLs not seen before.

    The crawl position lives in `st`: "mode" ("month", "day" or "done"),
    "month_cursor" / "day_cursor" (ISO end of the active window, None meaning
    the newest one) and "offset" (pagination offset inside the window). All
    reads and writes of `st` happen under STATE_LOCK. The function returns when
    the mode is, or becomes, "done".

    Args:
        work_q: bounded queue consumed by the worker threads.
        st: shared, persisted state dict.
    """
    sess = requests.Session()
    while True:
        with STATE_LOCK:
            mode = st.get("mode","month")
            snapshot = st["snapshot_utc"]
            offset = st.get("offset",0)
            month_cursor = st.get("month_cursor")
            day_cursor = st.get("day_cursor")

        if mode == "done":
            break

        # Determine window
        months = month_windows_until(snapshot, months_back=307)
        if mode == "month":
            m_start, m_end = months[_window_index(months, month_cursor)]
            win_start, win_end = m_start, m_end
        else:
            if month_cursor is None:
                # Day mode entered while still on the newest month: there is no
                # cursor to parse yet (this used to raise TypeError).
                m_start, m_end = months[0]
            else:
                m_end = datetime.fromisoformat(month_cursor)
                m_start = (m_end - timedelta(days=1)).replace(day=1)
            days = day_slices(m_start, m_end)
            win_start, win_end = days[_window_index(days, day_cursor)]

        params = {
            "sort":"latest-published",
            "limit": str(LIMIT_PER_CALL),
            "offset": str(offset),
            "section-ids": SECTION_IDS,
            "published-after": str(to_epoch_ms(win_start)),
            "published-before": str(to_epoch_ms(win_end)-1),
        }
        html, code = fetch_json(sess, params)
        if code in (403,429) or html is None:
            # Refused or failed: retry the same window/offset on the next pass.
            continue

        links = parse_search_links(html)
        new_links = []
        with STATE_LOCK:
            seen = st["seen_urls"]
            for u in links:
                if u not in seen:
                    seen[u] = 1
                    new_links.append(u)

        if new_links:
            for u in new_links:
                # Block until there is room. These URLs are already marked as
                # seen, so giving up on a full queue would lose them for good
                # while still counting them as produced.
                work_q.put(u)
            with STATE_LOCK:
                st["produced"] += len(new_links)

        if len(links) >= LIMIT_PER_CALL:
            # Full page: more results may follow in this window.
            with STATE_LOCK:
                st["offset"] = offset + LIMIT_PER_CALL
            continue

        # Short page: the current window is exhausted.
        with STATE_LOCK:
            st["offset"] = 0
            if mode == "month":
                # NOTE(review): with LIMIT_PER_CALL = 200 this threshold is
                # 18,000 results, while the notebook header speaks of a 10k
                # window cap -- confirm the endpoint can reach this offset.
                if offset >= 90*LIMIT_PER_CALL or st.get("force_day", False):
                    # Very large month: walk the same month again day by day.
                    st["mode"] = "day"
                    st["day_cursor"] = None
                    st["force_day"] = False
                else:
                    next_idx = _window_index(months, month_cursor) + 1
                    if next_idx >= len(months):
                        st["month_cursor"] = None
                        st["mode"] = "done"
                        break
                    st["month_cursor"] = months[next_idx][1].isoformat()
            elif mode == "day":
                next_day = _window_index(days, day_cursor) + 1
                if next_day < len(days):
                    st["day_cursor"] = days[next_day][1].isoformat()
                else:
                    # Every day of this month is done: back to month mode on
                    # the next older month.
                    st["mode"] = "month"
                    st["day_cursor"] = None
                    st["force_day"] = False
                    next_month = _window_index(months, month_cursor) + 1
                    if next_month >= len(months):
                        st["mode"] = "done"
                        break
                    st["month_cursor"] = months[next_month][1].isoformat()

# Every consumer thread appends to ARTICLES_JL through its own file handle.
# Holding this lock across write + flush keeps each JSON line contiguous.
_ARTICLES_WRITE_LOCK = threading.Lock()


def consumer_worker(work_q: queue.Queue, st: dict, idx: int):
    """Fetch, parse and append articles for URLs taken from the work queue.

    Runs until the queue has been empty for 10 seconds while the state mode is
    "done". Each dequeued item is acknowledged with exactly one task_done()
    call, whether it was saved, failed to download, or raised.

    Args:
        work_q: queue of article URLs filled by the producer.
        st: shared state dict; "consumed" and "sample_printed" are updated
            under STATE_LOCK, and the state is saved every SAVE_EVERY_N
            articles written by this worker.
        idx: worker number (currently unused).
    """
    sess = requests.Session()
    saved_since_print = 0
    # The context manager closes the handle even if the loop raises.
    with open(ARTICLES_JL, "a", encoding="utf-8") as out:
        while True:
            try:
                url = work_q.get(timeout=10)
            except queue.Empty:
                with STATE_LOCK:
                    finished = st.get("mode") == "done"
                if finished:
                    break
                continue
            try:
                html = fetch_html(sess, url)
                if html is None:
                    # The `finally` below acknowledges the item. Calling
                    # task_done() here as well used to double-count it.
                    continue
                art = parse_article(html, url)
                line = json.dumps(art, ensure_ascii=False) + "\n"
                with _ARTICLES_WRITE_LOCK:
                    out.write(line)
                    out.flush()
                with STATE_LOCK:
                    st["consumed"] += 1
                    # Test and set under one lock so only one worker prints the sample.
                    show_sample = st.get("sample_printed") is not True
                    if show_sample:
                        st["sample_printed"] = True
                saved_since_print += 1
                if show_sample:
                    print("\nSAMPLE ARTICLE JSON:\n" + json.dumps(art, ensure_ascii=False, indent=2)[:1300] + "...\n")
                if saved_since_print >= SAVE_EVERY_N:
                    save_state(st)
                    saved_since_print = 0
            finally:
                work_q.task_done()

def supervisor():
    """Load the state, start the producer and consumer threads, and print a heartbeat.

    On first use of a state file, the seen-URL table is seeded from the
    articles already saved in ARTICLES_JL. The state is saved on every
    heartbeat and once more on the way out.

    On normal completion (mode "done" and empty queue) the function waits for
    in-flight items before the final save. When interrupted it saves
    immediately instead of waiting for the queue to drain: the producer keeps
    refilling the queue, so that wait could block indefinitely and a second
    interrupt would skip the final save.

    NOTE(review): the worker threads are daemon threads and nothing signals
    them to stop, so after an interrupt they keep running for as long as the
    process (e.g. the notebook kernel) stays alive -- confirm this is
    acceptable before re-running in the same kernel.
    """
    st = load_state()  # already applies init_state_defaults()
    if not st.get("seen_warmed"):
        done = load_done_set()
        with STATE_LOCK:
            for u in done:
                st["seen_urls"][u] = 1
            st["consumed"] = max(st.get("consumed",0), len(done))
            st["seen_warmed"] = True
    save_state(st)

    work_q = queue.Queue(maxsize=QUEUE_MAXSIZE)

    prod_t = threading.Thread(target=producer, args=(work_q, st), daemon=True)
    prod_t.start()

    cons = []
    for i in range(N_CONSUMERS):
        t = threading.Thread(target=consumer_worker, args=(work_q, st, i), daemon=True)
        t.start()
        cons.append(t)

    last_print = 0
    completed = False
    try:
        while True:
            with STATE_LOCK:
                mode = st.get("mode")
            if mode == "done" and work_q.empty():
                completed = True
                break

            now = time.time()
            if now - last_print >= HEARTBEAT_EVERY_SEC:
                line = heartbeat_line(mode, st, work_q.qsize())
                print("\r"+line, end="")
                last_print = now
                save_state(st)
                persist_session_snapshot()
            time.sleep(0.2)
    finally:
        if completed:
            # Let consumers finish the items they already took off the queue.
            work_q.join()
        save_state(st)
        if completed:
            print(f"\nDone. State saved at: {STATE_JSON}")
        else:
            print(f"\nInterrupted. State saved at: {STATE_JSON}")


In [3]:

# Run the supervisor. Stop anytime; re-run to resume from state.
# supervisor() saves the state in its own `finally` block, so the save has
# already been attempted by the time KeyboardInterrupt reaches this handler.
try:
    supervisor()
except KeyboardInterrupt:
    print("\nStopped by user. State saved; safe to resume.")



Done. State saved at: C:\Users\Saif\Downloads\prothomalo_stream_state.json
