# SDG Multilingual Media Narratives — 01: Collect News via GDELT (MAX 2024–2025)

This notebook fetches **as much data as possible** from the GDELT DOC 2.0 **ArtList** endpoint for the period **2024-01-01 → 2025-12-31**.

Key features:
- Splits the full range into **≤ 90-day windows** (ArtList does not reliably return older articles when given a very long date range).
- Works around the **250-record cap per request** by adaptively splitting each window in time until a sub-window returns fewer records than the cap.
- Robust request handling: retries, backoff, and readable error messages for non-JSON responses.
- Saves **two datasets**:
  - **RAW**: all fetched rows (the same URL may appear more than once across windows)
  - **DEDUP**: unique articles by URL (recommended for analysis)

Outputs:
- `data/raw/gdelt_articles_raw_2024_2025.csv` (+ optional parquet)
- `data/raw/gdelt_articles_dedup_2024_2025.csv` (+ optional parquet)
- `reports/01_gdelt_collection_report_2024_2025.json`


In [1]:
import os
import re
import json
import time
import math
import random
import hashlib
from datetime import datetime, timezone, timedelta

import requests
import pandas as pd


In [2]:
# ---------- Paths ----------
PROJECT_DIR = os.path.abspath(".")
RAW_DIR = os.path.join(PROJECT_DIR, "data", "raw")
REPORTS_DIR = os.path.join(PROJECT_DIR, "reports")
os.makedirs(RAW_DIR, exist_ok=True)
os.makedirs(REPORTS_DIR, exist_ok=True)

print("RAW_DIR:", RAW_DIR)
print("REPORTS_DIR:", REPORTS_DIR)


RAW_DIR: /Users/sergey/code/sdg-multilingual-media-narratives/data/raw
REPORTS_DIR: /Users/sergey/code/sdg-multilingual-media-narratives/reports


## Query
Tips:
- GDELT treats anything inside **double quotes** as an *exact phrase*. Avoid quoting very short tokens like `"SDG"`.
- If you use `OR`, wrap the whole expression in parentheses.
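
As a quick illustration of the two tips above, here is a minimal sketch that quotes only multi-word terms and wraps the `OR` expression in parentheses. The helper name `build_or_query` is hypothetical and is not used elsewhere in this notebook.

```python
def build_or_query(terms):
    """Quote multi-word terms as exact phrases and join everything with OR."""
    parts = []
    for term in terms:
        term = term.strip()
        # Only phrases (terms containing a space) get double quotes;
        # short single tokens such as SDG stay unquoted.
        parts.append(f'"{term}"' if " " in term else term)
    # Wrap the whole OR expression in parentheses
    return "(" + " OR ".join(parts) + ")"


example_query = build_or_query(["climate change", "poverty", "SDG"])
print(example_query)
```

The cell below builds the actual query by hand from pre-quoted strings, which gives the same shape of expression.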


In [14]:
SDG_TERMS = [
    '"sustainable development"',
    '"sustainable development goals"',
    '"climate change"',
    '"global warming"',
    '"renewable energy"',
    '"public health"',
    "poverty",
    "inequality",
    "biodiversity",
    "SDG",      
    "SDGs",    
]

QUERY = "(" + " OR ".join(SDG_TERMS) + ")"
print("QUERY =", QUERY)


QUERY = ("sustainable development" OR "sustainable development goals" OR "climate change" OR "global warming" OR "renewable energy" OR "public health" OR poverty OR inequality OR biodiversity OR SDG OR SDGs)


## Robust fetch helpers

In [20]:
GDELT_DOC_ENDPOINT = "https://api.gdeltproject.org/api/v2/doc/doc"

def sha1_text(s: str) -> str:
    return hashlib.sha1(s.encode("utf-8", errors="ignore")).hexdigest()

def now_utc_iso() -> str:
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat()

def parse_gdelt_date(date_str: str):
    # GDELT often returns YYYYMMDDHHMMSS
    if not isinstance(date_str, str):
        return pd.NaT
    if re.fullmatch(r"\d{14}", date_str):
        return pd.to_datetime(date_str, format="%Y%m%d%H%M%S", errors="coerce", utc=True)
    return pd.to_datetime(date_str, errors="coerce", utc=True)

def normalize_whitespace(text: str) -> str:
    if not isinstance(text, str):
        return ""
    text = text.replace("\u00a0", " ")
    text = re.sub(r"\s+", " ", text).strip()
    return text

def _balanced_parens(s: str) -> bool:
    depth = 0
    for ch in s:
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
            if depth < 0:
                return False
    return depth == 0

def normalize_query_for_gdelt(q: str) -> str:
    # OR blocks must be wrapped in parentheses.
    q = (q or "").strip()
    if " OR " in q and not (q.startswith("(") and q.endswith(")") and _balanced_parens(q)):
        q = f"({q})"
    return q

def _gdelt_get_json(params: dict, max_retries: int = 6, timeout: int = 45):
    """Robust JSON fetch for GDELT.

    Handles cases where the API returns HTML/text (invalid params, short phrases, throttling, etc).
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; sdg-research-bot/0.1; +https://gdeltproject.org)"
    }

    last_err = None
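    for attempt in range(max_retries):
        try:
            r = requests.get(GDELT_DOC_ENDPOINT, params=params, headers=headers, timeout=timeout)
        except (requests.ConnectionError, requests.Timeout) as e:
            # Network errors are usually transient: remember the error, back off, retry
            last_err = e
            time.sleep(min(30, 1.5 ** attempt) + random.random())
            continue

        # Retry on transient failures / throttling, keeping the status for the final error
        if r.status_code in (429, 500, 502, 503, 504):
            last_err = RuntimeError(f"GDELT returned HTTP {r.status_code}.")
            time.sleep(min(30, 1.5 ** attempt) + random.random())
            continue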

        r.raise_for_status()

        text = (r.text or "").strip()
        ctype = (r.headers.get("Content-Type") or "").lower()

        if not text:
            last_err = RuntimeError("Empty response body from GDELT (status 200).")
        elif text[0] not in "{[":
            last_err = RuntimeError(
                "Non-JSON response from GDELT. "
                f"Content-Type={ctype!r}. First 300 chars:\n{text[:300]}"
            )
        else:
            try:
                return r.json()
            except Exception as e:
                last_err = RuntimeError(
                    "Failed to parse JSON. "
                    f"Content-Type={ctype!r}. First 300 chars:\n{text[:300]}\nError={e}"
                )

        time.sleep(min(20, 1.5 ** attempt) + random.random())

    raise last_err or RuntimeError("GDELT request failed after retries.")
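
To see how long the retry loop above can wait in the worst case, the sketch below tabulates the deterministic part of the backoff, `min(cap, 1.5 ** attempt)`, for the default six attempts. The helper `backoff_delays` is hypothetical; the random jitter (up to one second per attempt) is left out so the numbers are reproducible.

```python
def backoff_delays(max_retries=6, base=1.5, cap=30.0):
    """Deterministic part of the sleep before each retry (jitter excluded)."""
    return [min(cap, base ** attempt) for attempt in range(max_retries)]


delays = backoff_delays()
total_wait = sum(delays)

for attempt, delay in enumerate(delays):
    print(f"attempt {attempt}: sleep about {delay:.2f}s (+ up to 1s jitter)")
print(f"total without jitter: {total_wait:.2f}s")
```

With six attempts the longest single sleep stays well below either cap (20 or 30 seconds), so the caps only matter if `max_retries` is raised.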


## Max collection for 2024–2025 (windowed)

Why windows?
- With DOC 2.0 **ArtList**, very long `startdatetime → enddatetime` ranges do not reliably return deep history.
- The common workaround is to split into **≤ 90-day windows** and iterate.

Tune:
- `WINDOW_DAYS`: 30–90 (90 is usually efficient)
- `PAGE_SIZE`: 200–250 (try 250 first)
- `SLEEP_BETWEEN_CALLS`: be polite; increase if you see 429s


In [21]:
# ---------- Collection parameters ----------
WINDOW_DAYS = 90
PAGE_SIZE = 250
SLEEP_BETWEEN_CALLS = 0.35

START = datetime(2024, 1, 1, tzinfo=timezone.utc)
END = datetime(2025, 12, 31, 23, 59, 59, tzinfo=timezone.utc)

def fmt_gdelt_dt(dt: datetime) -> str:
    return dt.astimezone(timezone.utc).strftime("%Y%m%d%H%M%S")

def iter_windows(start: datetime, end: datetime, window_days: int = WINDOW_DAYS):
    cur = start
    delta = timedelta(days=window_days)
    while cur < end:
        nxt = min(end, cur + delta)
        yield cur, nxt
        cur = nxt
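
To get a feel for how many top-level windows these settings produce, here is a standalone sketch that repeats the same splitting logic and counts the result. `split_range` is a hypothetical name used only so the example runs on its own.

```python
from datetime import datetime, timedelta, timezone


def split_range(start, end, window_days=90):
    """Yield consecutive (window_start, window_end) pairs covering start..end."""
    cur = start
    step = timedelta(days=window_days)
    while cur < end:
        nxt = min(end, cur + step)
        yield cur, nxt
        cur = nxt


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2025, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
windows = list(split_range(start, end))

print("windows:", len(windows))
print("last window:", windows[-1][0].isoformat(), "->", windows[-1][1].isoformat())
```

Each window ends exactly where the next one starts, so an article stamped on a boundary second can be returned twice; the URL-based dedup later in the notebook absorbs that.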


In [31]:
def fetch_artlist_window(query: str, start_dt: str, end_dt: str, page_size: int = PAGE_SIZE):
    """
    MAX collector for a time window using *adaptive time splitting*.

    Why: DOC 2.0 ArtList returns up to 250 results (maxrecords) per request.
    If a window returns exactly page_size, it likely hit the cap, so we split the window
    and fetch both halves to get more results.

    Inputs:
      - start_dt/end_dt: YYYYMMDDHHMMSS (UTC)
    Returns:
      - list[dict] of raw GDELT article records for that window (can be > page_size due to splitting)
    """
    # Hard cap per docs: maxrecords up to 250
    page_size = min(int(page_size), 250)

    def _str_to_dt(s: str) -> datetime:
        return datetime.strptime(s, "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)

    def _dt_to_str(dt: datetime) -> str:
        return dt.astimezone(timezone.utc).strftime("%Y%m%d%H%M%S")

    # Tune these if needed
    MIN_WINDOW = timedelta(hours=9)     # smallest window to split down to (reduce for more data, increase for fewer calls)
    MAX_DEPTH = 20                     # safety guard
    OVERLAP_GUARD = timedelta(seconds=1)

    def _fetch_once(sdt: str, edt: str) -> list:
        params = {
            "query": query,
            "mode": "ArtList",
            "format": "json",
            "maxrecords": page_size,
            "startdatetime": sdt,
            "enddatetime": edt,
            "sort": "DateDesc",
        }
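        try:
            payload = _gdelt_get_json(params)
        except Exception as e:
            # NOTE: a failed request yields an empty list, so this sub-window is dropped
            # without raising and never reaches the caller's `failures` log.
            print(f"⚠️ request failed, skipping sub-window {sdt} -> {edt}: {e!r}")
            return []
        arts = payload.get("articles", []) or []
        time.sleep(SLEEP_BETWEEN_CALLS)
        return arts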

    def _split_fetch(s: datetime, e: datetime, depth: int = 0) -> list:
        sdt, edt = _dt_to_str(s), _dt_to_str(e)
        arts = _fetch_once(sdt, edt)

        # Not capped -> done
        if len(arts) < page_size:
            return arts

        # Capped -> try split if window still large enough
        if depth >= MAX_DEPTH or (e - s) <= MIN_WINDOW:
            # We cannot split further; return capped results (best we can do at this granularity)
            print(f"⚠️ Window hit cap={page_size} and cannot split further: {sdt} -> {edt} (depth={depth})")
            return arts

        # Split at midpoint; avoid double-counting the boundary by shifting right window start by 1s
        mid = s + (e - s) / 2
        mid = mid.replace(microsecond=0)
        right_start = mid + OVERLAP_GUARD
        if right_start >= e:
            # Degenerate split; stop splitting
            print(f"⚠️ Degenerate split prevented: {sdt} -> {edt} (depth={depth})")
            return arts

        left = _split_fetch(s, mid, depth + 1)
        right = _split_fetch(right_start, e, depth + 1)

        # Deduplicate within this merged window by URL (GDELT can return repeats across close boundaries)
        seen = set()
        merged = []
        for r in left + right:
            if not isinstance(r, dict):
                continue
            u = r.get("url")
            if not u or u in seen:
                continue
            seen.add(u)
            merged.append(r)

        return merged

    s0 = _str_to_dt(start_dt)
    e0 = _str_to_dt(end_dt)
    print("progress:", start_dt, "->", end_dt)
    return _split_fetch(s0, e0, depth=0)


def normalize_gdelt_record(r: dict) -> dict:
    url = r.get("url")
    title = normalize_whitespace(r.get("title", ""))
    snippet = normalize_whitespace(r.get("snippet", ""))
    seendate = parse_gdelt_date(r.get("seendate", ""))
    source_country = r.get("sourceCountry")
    source_lang = r.get("language") or r.get("sourceLanguage")
    domain = r.get("domain")

    uid = sha1_text((url or "") + "|" + title + "|" + snippet)

    return {
        "id": uid,
        "url": url,
        "title": title,
        "snippet": snippet,
        "seendate_utc": seendate,
        "domain": domain,
        "source_country": source_country,
        "language": source_lang,
        "gdelt_raw": json.dumps(r, ensure_ascii=False),
        "collected_at_utc": now_utc_iso(),
    }
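
The adaptive time splitting in `fetch_artlist_window` is easier to follow on a toy example that runs against an in-memory list instead of the API. In the sketch below, `fake_fetch`, `split_fetch`, and the integer timestamps are all made up for illustration; only the recursion mirrors the code above.

```python
def fake_fetch(times, start, end, cap):
    """Stand-in for one ArtList call: newest first, truncated at `cap`."""
    hits = sorted((t for t in times if start <= t <= end), reverse=True)
    return hits[:cap]


def split_fetch(times, start, end, cap, min_span=1):
    """Fetch [start, end]; if the result looks capped, split in half and recurse."""
    got = fake_fetch(times, start, end, cap)
    # Fewer than `cap` results means nothing was cut off; also stop at min_span
    if len(got) < cap or (end - start) <= min_span:
        return got
    mid = start + (end - start) // 2
    left = split_fetch(times, start, mid, cap, min_span)
    # Start the right half one step later so the boundary is not fetched twice
    right = split_fetch(times, mid + 1, end, cap, min_span)
    return left + right


times = list(range(0, 1000, 3))          # 334 synthetic "articles"
single = fake_fetch(times, 0, 999, cap=50)
full = split_fetch(times, 0, 999, cap=50)

print("one request :", len(single))
print("with splits :", len(full))
```

A single request is cut off at the cap, while the recursive version recovers every synthetic record. The real collector can still lose data when a sub-window at `MIN_WINDOW` size remains capped, which is what its warning message reports.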


In [32]:
def fetch_gdelt_max_2024_2025(query: str):
    query = normalize_query_for_gdelt(query)

    raw_rows = []
    dedup_rows = []
    seen_urls = set()
    failures = []  # keep errors but do not stop

    for w_start, w_end in iter_windows(START, END, WINDOW_DAYS):
        sdt = fmt_gdelt_dt(w_start)
        edt = fmt_gdelt_dt(w_end)

        print(f"Window {sdt} -> {edt}")

        try:
            raw = fetch_artlist_window(query, sdt, edt, page_size=PAGE_SIZE)
        except Exception as e:
            print(f"❌ Window failed (skipping): {sdt} -> {edt}")
            print("   Error:", repr(e))
            failures.append({"start": sdt, "end": edt, "error": repr(e)})

            # checkpoint what we already have
            if raw_rows:
                pd.DataFrame(raw_rows).to_csv(
                    os.path.join(RAW_DIR, "gdelt_articles_raw_2024_2025_checkpoint.csv"),
                    index=False
                )
            if dedup_rows:
                pd.DataFrame(dedup_rows).to_csv(
                    os.path.join(RAW_DIR, "gdelt_articles_dedup_2024_2025_checkpoint.csv"),
                    index=False
                )
            continue

        print("  fetched:", len(raw))

        norm = [normalize_gdelt_record(r) for r in raw if isinstance(r, dict)]
        raw_rows.extend(norm)

        added = 0
        for row in norm:
            u = row.get("url")
            if not u:
                continue
            if u in seen_urls:
                continue
            seen_urls.add(u)
            dedup_rows.append(row)
            added += 1

        print("  unique urls added:", added, "| total unique:", len(seen_urls))
        print("  cumulative raw:", len(raw_rows))
        print("-" * 60)

        # checkpoint after every window
        pd.DataFrame(raw_rows).to_csv(os.path.join(RAW_DIR, "gdelt_articles_raw_2024_2025_checkpoint.csv"), index=False)
        pd.DataFrame(dedup_rows).to_csv(os.path.join(RAW_DIR, "gdelt_articles_dedup_2024_2025_checkpoint.csv"), index=False)

    # save failures log
    if failures:
        fail_path = os.path.join(REPORTS_DIR, "01_gdelt_failures_2024_2025.json")
        with open(fail_path, "w", encoding="utf-8") as f:
            json.dump(failures, f, ensure_ascii=False, indent=2)
        print("Saved failures log:", fail_path)

    df_raw = pd.DataFrame(raw_rows)
    df_dedup = pd.DataFrame(dedup_rows)
    return df_raw, df_dedup
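
The loop above rewrites both checkpoint files in full after every window, which gets slower as the collection grows. One possible alternative, sketched here with the standard `csv` module, is to append only the new rows. `append_rows`, `FIELDS`, and the temporary path are hypothetical and not part of this notebook.

```python
import csv
import os
import tempfile

FIELDS = ["url", "title"]  # hypothetical subset of the columns produced above


def append_rows(path, rows, fields=FIELDS):
    """Append rows to a CSV checkpoint, writing the header only once."""
    new_file = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
        if new_file:
            writer.writeheader()
        writer.writerows(rows)


# Simulate two windows being checkpointed one after the other
ckpt = os.path.join(tempfile.mkdtemp(), "checkpoint.csv")
append_rows(ckpt, [{"url": "https://example.org/a", "title": "A"}])
append_rows(ckpt, [{"url": "https://example.org/b", "title": "B"}])

with open(ckpt, newline="", encoding="utf-8") as f:
    saved = list(csv.DictReader(f))
print("rows in checkpoint:", len(saved))
```

The trade-off is that an append-only file must be deleted or renamed before a fresh run, otherwise rows from the previous run stay in it.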


## Run collection (this may take a while depending on volume and rate limits)

In [None]:
df_raw, df_dedup = fetch_gdelt_max_2024_2025(QUERY)

print("DONE")
print("raw shape   :", df_raw.shape)
print("dedup shape :", df_dedup.shape)

# sanity check: unique URL counts in each dataset
raw_urls = df_raw["url"].nunique(dropna=True) if len(df_raw) else 0
dedup_urls = df_dedup["url"].nunique(dropna=True) if len(df_dedup) else 0
print("raw unique urls  :", raw_urls)
print("dedup unique urls:", dedup_urls)


## Load data from checkpoints

In [37]:
import os
import pandas as pd

raw_ckpt = os.path.join(RAW_DIR, "gdelt_articles_raw_2024_2025_checkpoint.csv")
ded_ckpt = os.path.join(RAW_DIR, "gdelt_articles_dedup_2024_2025_checkpoint.csv")

assert os.path.exists(raw_ckpt), f"Checkpoint not found: {raw_ckpt}"
assert os.path.exists(ded_ckpt), f"Checkpoint not found: {ded_ckpt}"

df_raw = pd.read_csv(raw_ckpt)
df_dedup = pd.read_csv(ded_ckpt)

print("Loaded from checkpoints:")
print(" raw  :", df_raw.shape)
print(" dedup:", df_dedup.shape)


Loaded from checkpoints:
 raw  : (56000, 10)
 dedup: (56000, 10)


## Inspect duplicates (URLs that appear more than once in RAW)

In [43]:
if len(df_raw):
    vc = df_raw["url"].value_counts().head(15)
    print("Top duplicated URLs in RAW:")
    display(vc)


Top duplicated URLs in RAW:


url
https://www.newsit.gr/oikonomia/elliniki-oikonomia-2024-oi-anoixtes-prokliseis-kai-to-astathes-diethnes-perivallon-polemon-kai-eklogon-se-ee-kai-ipa/3942401/                                                                                1
https://www.lowellsun.com/2024/01/01/beacon-hill-roll-call-senate-support-of-gov-maura-healey/                                                                                                                                               1
https://www.chelmsfordweeklynews.co.uk/news/national/24020417.energy-bill-price-hike-takes-effect-record-numbers-struggle-debt/                                                                                                              1
https://www.protothema.gr/world/article/1451302/kina-etoimos-na-sunergastei-me-ton-baiden-gia-eirini-kai-anaptuxi-ston-kosmo-dilonei-o-si-tzinping/                                                                                          1
https://www.hallandsposten.se/nyheter/v%

## Save outputs (CSV + Parquet via pyarrow)

In [44]:
raw_csv = os.path.join(RAW_DIR, "gdelt_articles_raw_2024_2025.csv")
dedup_csv = os.path.join(RAW_DIR, "gdelt_articles_dedup_2024_2025.csv")
df_raw.to_csv(raw_csv, index=False)
df_dedup.to_csv(dedup_csv, index=False)

print("Saved CSV:")
print(" -", raw_csv)
print(" -", dedup_csv)


Saved CSV:
 - /Users/sergey/code/sdg-multilingual-media-narratives/data/raw/gdelt_articles_raw_2024_2025.csv
 - /Users/sergey/code/sdg-multilingual-media-narratives/data/raw/gdelt_articles_dedup_2024_2025.csv


In [45]:
# Parquet saving via pyarrow (avoids pandas<->pyarrow extension-type issues)
try:
    import pyarrow as pa
    import pyarrow.parquet as pq

    raw_parquet = os.path.join(RAW_DIR, "gdelt_articles_raw_2024_2025.parquet")
    dedup_parquet = os.path.join(RAW_DIR, "gdelt_articles_dedup_2024_2025.parquet")

    tbl_raw = pa.Table.from_pandas(df_raw, preserve_index=False)
    tbl_dedup = pa.Table.from_pandas(df_dedup, preserve_index=False)

    pq.write_table(tbl_raw, raw_parquet, compression="snappy")
    pq.write_table(tbl_dedup, dedup_parquet, compression="snappy")

    print("Saved Parquet (pyarrow):")
    print(" -", raw_parquet)
    print(" -", dedup_parquet)
except Exception as e:
    print("Parquet save skipped/failed:", repr(e))
    print("CSV files are saved and are sufficient for the next notebooks.")


Saved Parquet (pyarrow):
 - /Users/sergey/code/sdg-multilingual-media-narratives/data/raw/gdelt_articles_raw_2024_2025.parquet
 - /Users/sergey/code/sdg-multilingual-media-narratives/data/raw/gdelt_articles_dedup_2024_2025.parquet


## Write a small report

In [46]:
report = {
    "query": QUERY,
    "start_utc": START.isoformat(),
    "end_utc": END.isoformat(),
    "window_days": WINDOW_DAYS,
    "page_size": PAGE_SIZE,
    "sleep_between_calls": SLEEP_BETWEEN_CALLS,
    "raw_rows": int(df_raw.shape[0]),
    "dedup_rows": int(df_dedup.shape[0]),
    "raw_unique_urls": int(df_raw["url"].nunique(dropna=True)) if len(df_raw) else 0,
    "dedup_unique_urls": int(df_dedup["url"].nunique(dropna=True)) if len(df_dedup) else 0,
    "generated_at_utc": now_utc_iso(),
}

report_path = os.path.join(REPORTS_DIR, "01_gdelt_collection_report_2024_2025.json")
with open(report_path, "w", encoding="utf-8") as f:
    json.dump(report, f, ensure_ascii=False, indent=2)

report_path, report


('/Users/sergey/code/sdg-multilingual-media-narratives/reports/01_gdelt_collection_report_2024_2025.json',
 {'query': '("sustainable development" OR "sustainable development goals" OR "climate change" OR "global warming" OR "renewable energy" OR "public health" OR poverty OR inequality OR biodiversity OR SDG OR SDGs)',
  'start_utc': '2024-01-01T00:00:00+00:00',
  'end_utc': '2025-12-31T23:59:59+00:00',
  'window_days': 90,
  'page_size': 250,
  'sleep_between_calls': 0.35,
  'raw_rows': 495520,
  'dedup_rows': 495520,
  'raw_unique_urls': 495520,
  'dedup_unique_urls': 495520,
  'generated_at_utc': '2026-01-31T19:29:03+00:00'})