In [1]:
from __future__ import annotations

import asyncio
import csv
import json
import logging
import os
import random
import re
import sys
import time
import unicodedata
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable, Optional

import httpx
from tqdm import tqdm


In [2]:
# ====== KONFIG (zmień tu) ======
TERM = 10
API_ROOT = "https://api.sejm.gov.pl"
USER_AGENT = "sejm-textmining/0.1 (research; contact: kstawarz@student.agh.edu.pl)"  # możesz zmienić

# Wydajność (Mac mini M4 spokojnie uciągnie wysoką współbieżność, ale nie katuj API bez sensu)
MAX_CONCURRENCY = 60          # typowo 40–80 jest OK
MAX_CONNECTIONS = 200
MAX_KEEPALIVE = 50

# Paginacja (ile rekordów na stronę dla list interpelacji / zapytań)
PAGE_LIMIT = 50              # dokumentacja: domyślnie często 50; 100 zwykle działa

# Co pobierać:
DOWNLOAD_TRANSCRIPTS = True
DOWNLOAD_INTERPELLATIONS = True
DOWNLOAD_WRITTEN_QUESTIONS = True
DOWNLOAD_COMMITTEE_SITTINGS = False   # może być bardzo dużo danych

# Załączniki (PDF itp.)
DOWNLOAD_ATTACHMENTS = True

# Katalog projektu: uruchamiaj notebook z root repo (tam gdzie masz np. data/)
PROJECT_ROOT = Path.cwd()

DATA_DIR = PROJECT_ROOT / "data"
RAW_DIR = DATA_DIR / "raw" / f"term{TERM}"
TABLES_DIR = DATA_DIR / "tables" / f"term{TERM}"
LOGS_DIR = PROJECT_ROOT / "logs"

RUN_TAG = datetime.now().strftime("%Y%m%d_%H%M%S")
LOG_PATH = LOGS_DIR / f"sejm_term{TERM}_{RUN_TAG}.log"

RAW_DIR, TABLES_DIR, LOGS_DIR


(PosixPath('/Users/krzysztofstawarz/GithubRepositories/textMiningLingwistykaSejmowa/data/raw/term10'),
 PosixPath('/Users/krzysztofstawarz/GithubRepositories/textMiningLingwistykaSejmowa/data/tables/term10'),
 PosixPath('/Users/krzysztofstawarz/GithubRepositories/textMiningLingwistykaSejmowa/logs'))

In [3]:
def ensure_dirs() -> None:
    # MP
    (RAW_DIR / "mp").mkdir(parents=True, exist_ok=True)
    (RAW_DIR / "mp" / "details").mkdir(parents=True, exist_ok=True)

    # Clubs
    (RAW_DIR / "clubs").mkdir(parents=True, exist_ok=True)
    (RAW_DIR / "clubs" / "details").mkdir(parents=True, exist_ok=True)

    # Proceedings + transcripts
    (RAW_DIR / "proceedings").mkdir(parents=True, exist_ok=True)
    (RAW_DIR / "proceedings" / "details").mkdir(parents=True, exist_ok=True)
    (RAW_DIR / "transcripts").mkdir(parents=True, exist_ok=True)
    (RAW_DIR / "transcripts" / "index_by_day").mkdir(parents=True, exist_ok=True)
    (RAW_DIR / "transcripts" / "html_by_mp").mkdir(parents=True, exist_ok=True)
    (RAW_DIR / "transcripts" / "html_non_mp").mkdir(parents=True, exist_ok=True)

    # Interpellations
    (RAW_DIR / "interpellations").mkdir(parents=True, exist_ok=True)
    (RAW_DIR / "interpellations" / "pages").mkdir(parents=True, exist_ok=True)
    (RAW_DIR / "interpellations" / "by_num").mkdir(parents=True, exist_ok=True)
    (RAW_DIR / "interpellations" / "by_author_ptr").mkdir(parents=True, exist_ok=True)

    # Written questions
    (RAW_DIR / "writtenQuestions").mkdir(parents=True, exist_ok=True)
    (RAW_DIR / "writtenQuestions" / "pages").mkdir(parents=True, exist_ok=True)
    (RAW_DIR / "writtenQuestions" / "by_num").mkdir(parents=True, exist_ok=True)
    (RAW_DIR / "writtenQuestions" / "by_author_ptr").mkdir(parents=True, exist_ok=True)

    # Committees
    (RAW_DIR / "committees").mkdir(parents=True, exist_ok=True)
    (RAW_DIR / "committees" / "details").mkdir(parents=True, exist_ok=True)
    (RAW_DIR / "committees" / "sittings").mkdir(parents=True, exist_ok=True)

    # Tables
    TABLES_DIR.mkdir(parents=True, exist_ok=True)

ensure_dirs()
print("OK:", RAW_DIR)


OK: /Users/krzysztofstawarz/GithubRepositories/textMiningLingwistykaSejmowa/data/raw/term10


In [4]:
LOGS_DIR.mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
    handlers=[
        logging.FileHandler(LOG_PATH, encoding="utf-8"),
        logging.StreamHandler(sys.stdout),
    ],
)
logger = logging.getLogger("sejm")
logger.info("Log file: %s", LOG_PATH)
_slug_re = re.compile(r"[^A-Za-z0-9]+")

def slugify(s: str, max_len: int = 80) -> str:
    s = (s or "").strip()
    if not s:
        return "unknown"
    s = unicodedata.normalize("NFKD", s)
    s = "".join(c for c in s if not unicodedata.combining(c))
    s = _slug_re.sub("-", s).strip("-").lower()
    return (s[:max_len] if s else "unknown")

def safe_int(x: Any) -> Optional[int]:
    try:
        if x is None:
            return None
        if isinstance(x, int):
            return x
        s = str(x).strip()
        if not s:
            return None
        if re.fullmatch(r"\d+", s):
            return int(s)
        return None
    except Exception:
        return None

async def write_bytes(path: Path, data: bytes) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    await asyncio.to_thread(path.write_bytes, data)

async def write_text(path: Path, text: str) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    await asyncio.to_thread(path.write_text, text, encoding="utf-8")

async def write_json(path: Path, obj: Any) -> None:
    text = json.dumps(obj, ensure_ascii=False, indent=2)
    await write_text(path, text)



2026-01-11 14:50:26,585 | INFO | Log file: /Users/krzysztofstawarz/GithubRepositories/textMiningLingwistykaSejmowa/logs/sejm_term10_20260111_145026.log


In [5]:
async def fetch_json_to_file(client: httpx.AsyncClient, url: str, path: Path, *, accept_404: bool = False) -> FetchResult:
    if path.exists() and path.stat().st_size > 0:
        return FetchResult(url=url, status_code=200, ok=True, path=path)

    async with SEM:
        try:
            r = await request_with_retries(client, "GET", url, headers={"Accept": "application/json"}, accept_404=accept_404)
            if accept_404 and r.status_code == 404:
                await write_text(path, "")  # marker
                return FetchResult(url=url, status_code=404, ok=False, path=path, error="404")

            obj = r.json()
            await write_json(path, obj)
            return FetchResult(url=url, status_code=r.status_code, ok=True, path=path)
        except Exception as e:
            logger.exception("JSON fetch failed: %s", url)
            return FetchResult(url=url, status_code=0, ok=False, path=path, error=str(e))

async def fetch_text_to_file(client: httpx.AsyncClient, url: str, path: Path, *, accept: str = "text/html", accept_404: bool = False) -> FetchResult:
    if path.exists() and path.stat().st_size > 0:
        return FetchResult(url=url, status_code=200, ok=True, path=path)

    async with SEM:
        try:
            r = await request_with_retries(client, "GET", url, headers={"Accept": accept}, accept_404=accept_404)
            if accept_404 and r.status_code == 404:
                await write_text(path, "")  # marker
                return FetchResult(url=url, status_code=404, ok=False, path=path, error="404")
            await write_text(path, r.text)
            return FetchResult(url=url, status_code=r.status_code, ok=True, path=path)
        except Exception as e:
            logger.exception("TEXT fetch failed: %s", url)
            return FetchResult(url=url, status_code=0, ok=False, path=path, error=str(e))
            
async def download_mps_and_clubs() -> tuple[list[dict[str, Any]], dict[int, dict[str, Any]], list[dict[str, Any]]]:
    mp_list_url = f"{API_ROOT}/sejm/term{TERM}/MP"
    mp_list_path = RAW_DIR / "mp" / "mp_list.json"

    clubs_url = f"{API_ROOT}/sejm/term{TERM}/clubs"
    clubs_path = RAW_DIR / "clubs" / "clubs.json"

    async with make_client() as client:
        # MP list + clubs list
        await fetch_json_to_file(client, mp_list_url, mp_list_path)
        await fetch_json_to_file(client, clubs_url, clubs_path)

        mp_list = json.loads(mp_list_path.read_text(encoding="utf-8"))
        clubs_list = json.loads(clubs_path.read_text(encoding="utf-8"))

        # MP details
        mp_details: dict[int, dict[str, Any]] = {}
        jobs = []
        for mp in mp_list:
            mp_id = safe_int(mp.get("id") or mp.get("ID") or mp.get("mpId"))
            if not mp_id:
                continue
            url = f"{API_ROOT}/sejm/term{TERM}/MP/{mp_id}"
            path = RAW_DIR / "mp" / "details" / f"{mp_id}.json"
            jobs.append((mp_id, url, path))

        logger.info("MP details to fetch: %d", len(jobs))

        results = []
        with tqdm(total=len(jobs), desc="MP details") as pbar:
            tasks = [fetch_json_to_file(client, url, path) for _, url, path in jobs]
            for coro in asyncio.as_completed(tasks):
                res = await coro
                results.append(res)
                pbar.update(1)

        # read details back (fast enough)
        for mp_id, _, path in jobs:
            if path.exists() and path.stat().st_size > 0:
                try:
                    mp_details[mp_id] = json.loads(path.read_text(encoding="utf-8"))
                except Exception:
                    pass

        return mp_list, mp_details, clubs_list

mp_list, mp_details, clubs_list = await download_mps_and_clubs()
len(mp_list), len(mp_details), len(clubs_list)



NameError: name 'make_client' is not defined

In [None]:
def write_csv(path: Path, rows: Iterable[dict[str, Any]], fieldnames: list[str]) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8", newline="") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        for r in rows:
            w.writerow({k: r.get(k, "") for k in fieldnames})

# DIM: MPs
mp_dim_rows = []
for mp in mp_list:
    mp_id = safe_int(mp.get("id") or mp.get("ID") or mp.get("mpId"))
    if not mp_id:
        continue
    det = mp_details.get(mp_id, {})
    mp_dim_rows.append({
        "mp_id": mp_id,
        "name": mp.get("firstLastName") or mp.get("name") or det.get("firstLastName") or "",
        "club": mp.get("club") or det.get("club") or "",
        "districtName": det.get("districtName") or "",
        "voivodeship": det.get("voivodeship") or "",
        "birthDate": det.get("birthDate") or "",
        "birthPlace": det.get("birthPlace") or "",
        "educationLevel": det.get("educationLevel") or det.get("education") or "",
        "profession": det.get("profession") or "",
    })

mp_dim_path = TABLES_DIR / "dim_mp.csv"
write_csv(mp_dim_path, mp_dim_rows, list(mp_dim_rows[0].keys()) if mp_dim_rows else ["mp_id"])
mp_dim_path

# DIM: Clubs
club_rows = []
for c in clubs_list:
    club_rows.append({
        "club_id": c.get("id") or "",
        "name": c.get("name") or "",
        "membersCount": c.get("membersCount") or "",
    })

club_dim_path = TABLES_DIR / "dim_club.csv"
write_csv(club_dim_path, club_rows, ["club_id", "name", "membersCount"])
club_dim_path

# Mapy pomocnicze do katalogowania
mp_id_to_name = {r["mp_id"]: r["name"] for r in mp_dim_rows}
mp_id_to_slug = {mp_id: slugify(name) for mp_id, name in mp_id_to_name.items()}
len(mp_id_to_slug)


In [None]:
async def download_proceedings_and_daily_indexes() -> list[dict[str, Any]]:
    proceedings_url = f"{API_ROOT}/sejm/term{TERM}/proceedings"
    proceedings_path = RAW_DIR / "proceedings" / "proceedings.json"

    async with make_client() as client:
        await fetch_json_to_file(client, proceedings_url, proceedings_path)
        proceedings = json.loads(proceedings_path.read_text(encoding="utf-8"))

        # Zapisz też szczegóły per posiedzenie (jeśli endpoint istnieje)
        # (API praktycznie to ma; gdyby nie, kod się nie wywali – tylko pominie 404)
        unique_nums = []
        for p in proceedings:
            n = safe_int(p.get("num") or p.get("number") or p.get("proceedingNum") or p.get("sitting") or p.get("id"))
            if n is not None:
                unique_nums.append(n)
        unique_nums = sorted(set(unique_nums))

        logger.info("Proceedings discovered: %d", len(unique_nums))

        detail_tasks = []
        for n in unique_nums:
            url = f"{API_ROOT}/sejm/term{TERM}/proceedings/{n}"
            path = RAW_DIR / "proceedings" / "details" / f"{n}.json"
            detail_tasks.append((n, url, path))

        with tqdm(total=len(detail_tasks), desc="Proceeding details") as pbar:
            tasks = [fetch_json_to_file(client, url, path, accept_404=True) for _, url, path in detail_tasks]
            for coro in asyncio.as_completed(tasks):
                _ = await coro
                pbar.update(1)

        # Zbuduj listę (proceedingNum, date) do indeksów transcripts
        day_pairs: list[tuple[int, str]] = []

        def extract_dates(obj: Any) -> list[str]:
            # szukamy listy stringów YYYY-MM-DD w typowych polach, albo rekurencyjnie
            found = set()

            def walk(x: Any) -> None:
                if isinstance(x, str) and re.fullmatch(r"\d{4}-\d{2}-\d{2}", x):
                    found.add(x)
                elif isinstance(x, list):
                    for it in x:
                        walk(it)
                elif isinstance(x, dict):
                    for v in x.values():
                        walk(v)

            # szybkie ścieżki
            if isinstance(obj, dict):
                for key in ("dates", "days", "sittingDays", "sessionDays"):
                    if key in obj:
                        walk(obj[key])
            walk(obj)
            return sorted(found)

        for n in unique_nums:
            # spróbuj wziąć daty z detali; jeśli brak, spróbuj z listy proceedings
            det_path = RAW_DIR / "proceedings" / "details" / f"{n}.json"
            dates = []
            if det_path.exists() and det_path.stat().st_size > 0:
                try:
                    det = json.loads(det_path.read_text(encoding="utf-8"))
                    dates = extract_dates(det)
                except Exception:
                    dates = []

            if not dates:
                # fallback: poszukaj w obiekcie z listy proceedings
                for p in proceedings:
                    pn = safe_int(p.get("num") or p.get("number") or p.get("proceedingNum") or p.get("sitting") or p.get("id"))
                    if pn == n:
                        dates = extract_dates(p)
                        break

            for d in dates:
                day_pairs.append((n, d))

        day_pairs = sorted(set(day_pairs))
        logger.info("Proceeding-day pairs (for transcript indexes): %d", len(day_pairs))

        # Pobierz indeks stenogramów per dzień
        idx_tasks = []
        for n, d in day_pairs:
            url = f"{API_ROOT}/sejm/term{TERM}/proceedings/{n}/{d}/transcripts"
            path = RAW_DIR / "transcripts" / "index_by_day" / f"p{n}" / f"d{d}.json"
            idx_tasks.append((n, d, url, path))

        ok_pairs = []
        with tqdm(total=len(idx_tasks), desc="Transcript indexes") as pbar:
            tasks = [fetch_json_to_file(client, url, path, accept_404=True) for _, _, url, path in idx_tasks]
            for (n, d, _, path), coro in zip(idx_tasks, asyncio.as_completed(tasks)):
                res = await coro
                if res.ok:
                    ok_pairs.append((n, d))
                pbar.update(1)

        return proceedings

proceedings = await download_proceedings_and_daily_indexes()
len(proceedings)


In [None]:
def iter_transcript_indexes() -> Iterable[Path]:
    base = RAW_DIR / "transcripts" / "index_by_day"
    if not base.exists():
        return []
    return base.rglob("d*.json")

statement_rows = []
html_jobs = []  # (url, path, mp_id or None)

for idx_path in iter_transcript_indexes():
    try:
        obj = json.loads(idx_path.read_text(encoding="utf-8"))
    except Exception:
        continue

    proceeding_num = safe_int(obj.get("proceedingNum"))
    date = obj.get("date")
    statements = obj.get("statements") or []

    if not proceeding_num or not date:
        continue

    for st in statements:
        st_num = st.get("num")
        mp_id = safe_int(st.get("memberID"))
        name = st.get("name") or ""
        function = st.get("function") or ""
        unspoken = st.get("unspoken")

        statement_rows.append({
            "term": TERM,
            "proceedingNum": proceeding_num,
            "date": date,
            "statementNum": st_num,
            "memberID": mp_id if mp_id is not None else "",
            "name": name,
            "function": function,
            "startDateTime": st.get("startDateTime") or "",
            "endDateTime": st.get("endDateTime") or "",
            "rapporteur": st.get("rapporteur") if st.get("rapporteur") is not None else "",
            "secretary": st.get("secretary") if st.get("secretary") is not None else "",
            "unspoken": unspoken if unspoken is not None else "",
        })

        # docelowy plik HTML (bez preprocessingu)
        st_num_str = str(st_num)
        url = f"{API_ROOT}/sejm/term{TERM}/proceedings/{proceeding_num}/{date}/transcripts/{st_num_str}"

        if mp_id and mp_id > 0:
            mp_folder = f"{mp_id:03d}-{mp_id_to_slug.get(mp_id, slugify(name))}"
            path = RAW_DIR / "transcripts" / "html_by_mp" / mp_folder / f"p{proceeding_num}" / f"d{date}" / f"s{st_num_str}.html"
        else:
            path = RAW_DIR / "transcripts" / "html_non_mp" / f"p{proceeding_num}" / f"d{date}" / f"s{st_num_str}.html"

        html_jobs.append((url, path))

# Zapisz index CSV
fact_transcripts_path = TABLES_DIR / "fact_transcript_statement_index.csv"
write_csv(
    fact_transcripts_path,
    statement_rows,
    ["term","proceedingNum","date","statementNum","memberID","name","function","startDateTime","endDateTime","rapporteur","secretary","unspoken"]
)
fact_transcripts_path, len(statement_rows), len(html_jobs)


In [None]:
async def download_many_html(jobs: list[tuple[str, Path]], desc: str) -> tuple[int, int]:
    ok = 0
    fail = 0
    async with make_client() as client:
        with tqdm(total=len(jobs), desc=desc) as pbar:
            tasks = [fetch_text_to_file(client, url, path, accept="text/html", accept_404=True) for url, path in jobs]
            for coro in asyncio.as_completed(tasks):
                res = await coro
                if res.ok:
                    ok += 1
                else:
                    fail += 1
                pbar.update(1)
    return ok, fail

if DOWNLOAD_TRANSCRIPTS:
    ok, fail = await download_many_html(html_jobs, desc="Downloading transcripts (HTML)")
    print("OK:", ok, "FAIL:", fail)
else:
    print("DOWNLOAD_TRANSCRIPTS=False, pomijam.")


In [None]:
async def paginated_list(
    client: httpx.AsyncClient,
    url: str,
    page_dir: Path,
    *,
    limit: int = PAGE_LIMIT,
    hard_stop: int = 1_000_000,
) -> list[dict[str, Any]]:
    """
    Pobiera wszystkie strony listy typu:
    GET ...?offset=0&limit=...
    Zwraca spłaszczoną listę rekordów.
    """
    all_items: list[dict[str, Any]] = []
    offset = 0

    while True:
        if offset >= hard_stop:
            logger.warning("Hard stop reached for %s", url)
            break

        page_path = page_dir / f"offset={offset}_limit={limit}.json"
        page_url = url

        # cache
        if page_path.exists() and page_path.stat().st_size > 0:
            items = json.loads(page_path.read_text(encoding="utf-8"))
        else:
            r = await request_with_retries(
                client,
                "GET",
                page_url,
                headers={"Accept": "application/json"},
                params={"offset": offset, "limit": limit},
            )
            items = r.json()
            await write_json(page_path, items)

        if not items:
            break

        all_items.extend(items)

        # jeśli API zwraca mniej niż limit, to zwykle koniec
        if isinstance(items, list) and len(items) < limit:
            break

        offset += limit

    return all_items


In [None]:
async def download_interpellations() -> None:
    base_list_url = f"{API_ROOT}/sejm/term{TERM}/interpellations"
    page_dir = RAW_DIR / "interpellations" / "pages"

    async with make_client() as client:
        items = await paginated_list(client, base_list_url, page_dir, limit=PAGE_LIMIT)
        logger.info("Interpellations list items: %d", len(items))

        nums = []
        for it in items:
            n = safe_int(it.get("num"))
            if n is not None:
                nums.append(n)
        nums = sorted(set(nums))
        logger.info("Interpellations unique nums: %d", len(nums))

        # details
        detail_jobs = []
        for n in nums:
            url = f"{API_ROOT}/sejm/term{TERM}/interpellations/{n}"
            path = RAW_DIR / "interpellations" / "by_num" / f"{n}" / "details.json"
            detail_jobs.append((n, url, path))

        with tqdm(total=len(detail_jobs), desc="Interpellations details") as pbar:
            tasks = [fetch_json_to_file(client, url, path, accept_404=True) for _, url, path in detail_jobs]
            for coro in asyncio.as_completed(tasks):
                _ = await coro
                pbar.update(1)

        # bodies + replies + attachments
        body_jobs = []
        reply_jobs = []
        attach_jobs = []  # (url, path)

        author_ptr_rows = []  # pointer files per author for quick folder iteration
        for n in nums:
            det_path = RAW_DIR / "interpellations" / "by_num" / f"{n}" / "details.json"
            if not det_path.exists() or det_path.stat().st_size == 0:
                continue
            try:
                det = json.loads(det_path.read_text(encoding="utf-8"))
            except Exception:
                continue

            # authors (field "from" bywa listą stringów z numerami legitymacji)
            authors = det.get("from") or []
            author_ids = []
            for a in authors:
                ai = safe_int(a)
                if ai is not None:
                    author_ids.append(ai)

            for ai in author_ids:
                ptr_dir = RAW_DIR / "interpellations" / "by_author_ptr" / f"{ai:03d}-{mp_id_to_slug.get(ai,'unknown')}"
                ptr_path = ptr_dir / f"i{n}.json"
                author_ptr_rows.append({"mp_id": ai, "num": n, "ptr_path": str(ptr_path.relative_to(RAW_DIR))})
                if not ptr_path.exists():
                    await write_json(ptr_path, {"type": "interpellation", "num": n, "target": str(det_path.parent)})

            # body html
            body_url = f"{API_ROOT}/sejm/term{TERM}/interpellations/{n}/body"
            body_path = RAW_DIR / "interpellations" / "by_num" / f"{n}" / "body.html"
            body_jobs.append((body_url, body_path))

            # replies
            for rep in det.get("replies") or []:
                key = rep.get("key")
                if not key:
                    continue
                rep_url = f"{API_ROOT}/sejm/term{TERM}/interpellations/{n}/reply/{key}/body"
                rep_path = RAW_DIR / "interpellations" / "by_num" / f"{n}" / "replies" / f"{key}.html"
                reply_jobs.append((rep_url, rep_path))

                # attachments in reply
                if DOWNLOAD_ATTACHMENTS:
                    for att in rep.get("attachments") or []:
                        aurl = att.get("URL")
                        aname = att.get("name") or "attachment.bin"
                        if aurl:
                            apath = RAW_DIR / "interpellations" / "by_num" / f"{n}" / "attachments" / f"{key}" / aname
                            attach_jobs.append((aurl, apath))

        # zapisz mały indeks pointerów autorów
        ptr_index_path = TABLES_DIR / "ptr_interpellations_by_author.csv"
        if author_ptr_rows:
            write_csv(ptr_index_path, author_ptr_rows, ["mp_id", "num", "ptr_path"])

        # pobieranie HTML
        ok1, fail1 = await download_many_html(body_jobs, "Interpellations bodies (HTML)")
        ok2, fail2 = await download_many_html(reply_jobs, "Interpellations replies (HTML)")
        logger.info("Interpellations body OK=%d FAIL=%d; reply OK=%d FAIL=%d", ok1, fail1, ok2, fail2)

        # pobieranie załączników (streaming)
        async def download_attachment(url: str, path: Path) -> FetchResult:
            if path.exists() and path.stat().st_size > 0:
                return FetchResult(url=url, status_code=200, ok=True, path=path)

            async with SEM:
                try:
                    async with client.stream("GET", url, headers={"Accept": "*/*"}) as r:
                        if r.status_code == 404:
                            await write_bytes(path, b"")  # marker
                            return FetchResult(url=url, status_code=404, ok=False, path=path, error="404")
                        r.raise_for_status()
                        path.parent.mkdir(parents=True, exist_ok=True)
                        # zapis strumieniowy
                        with path.open("wb") as f:
                            async for chunk in r.aiter_bytes():
                                f.write(chunk)
                    return FetchResult(url=url, status_code=200, ok=True, path=path)
                except Exception as e:
                    logger.exception("Attachment failed: %s", url)
                    return FetchResult(url=url, status_code=0, ok=False, path=path, error=str(e))

        if DOWNLOAD_ATTACHMENTS and attach_jobs:
            ok_a = 0
            fail_a = 0
            with tqdm(total=len(attach_jobs), desc="Interpellations attachments") as pbar:
                tasks = [download_attachment(url, path) for url, path in attach_jobs]
                for coro in asyncio.as_completed(tasks):
                    res = await coro
                    if res.ok:
                        ok_a += 1
                    else:
                        fail_a += 1
                    pbar.update(1)
            logger.info("Interpellations attachments OK=%d FAIL=%d", ok_a, fail_a)

if DOWNLOAD_INTERPELLATIONS:
    await download_interpellations()
else:
    print("DOWNLOAD_INTERPELLATIONS=False, pomijam.")


In [None]:
async def download_written_questions() -> None:
    base_list_url = f"{API_ROOT}/sejm/term{TERM}/writtenQuestions"
    page_dir = RAW_DIR / "writtenQuestions" / "pages"

    async with make_client() as client:
        items = await paginated_list(client, base_list_url, page_dir, limit=PAGE_LIMIT)
        logger.info("writtenQuestions list items: %d", len(items))

        nums = []
        for it in items:
            n = safe_int(it.get("num"))
            if n is not None:
                nums.append(n)
        nums = sorted(set(nums))
        logger.info("writtenQuestions unique nums: %d", len(nums))

        # details
        detail_jobs = []
        for n in nums:
            url = f"{API_ROOT}/sejm/term{TERM}/writtenQuestions/{n}"
            path = RAW_DIR / "writtenQuestions" / "by_num" / f"{n}" / "details.json"
            detail_jobs.append((n, url, path))

        with tqdm(total=len(detail_jobs), desc="writtenQuestions details") as pbar:
            tasks = [fetch_json_to_file(client, url, path, accept_404=True) for _, url, path in detail_jobs]
            for coro in asyncio.as_completed(tasks):
                _ = await coro
                pbar.update(1)

        # bodies + replies + attachments
        body_jobs = []
        reply_jobs = []
        attach_jobs = []

        author_ptr_rows = []
        for n in nums:
            det_path = RAW_DIR / "writtenQuestions" / "by_num" / f"{n}" / "details.json"
            if not det_path.exists() or det_path.stat().st_size == 0:
                continue
            try:
                det = json.loads(det_path.read_text(encoding="utf-8"))
            except Exception:
                continue

            authors = det.get("from") or []
            author_ids = []
            for a in authors:
                ai = safe_int(a)
                if ai is not None:
                    author_ids.append(ai)

            for ai in author_ids:
                ptr_dir = RAW_DIR / "writtenQuestions" / "by_author_ptr" / f"{ai:03d}-{mp_id_to_slug.get(ai,'unknown')}"
                ptr_path = ptr_dir / f"q{n}.json"
                author_ptr_rows.append({"mp_id": ai, "num": n, "ptr_path": str(ptr_path.relative_to(RAW_DIR))})
                if not ptr_path.exists():
                    await write_json(ptr_path, {"type": "writtenQuestion", "num": n, "target": str(det_path.parent)})

            body_url = f"{API_ROOT}/sejm/term{TERM}/writtenQuestions/{n}/body"
            body_path = RAW_DIR / "writtenQuestions" / "by_num" / f"{n}" / "body.html"
            body_jobs.append((body_url, body_path))

            for rep in det.get("replies") or []:
                key = rep.get("key")
                if not key:
                    continue
                rep_url = f"{API_ROOT}/sejm/term{TERM}/writtenQuestions/{n}/reply/{key}/body"
                rep_path = RAW_DIR / "writtenQuestions" / "by_num" / f"{n}" / "replies" / f"{key}.html"
                reply_jobs.append((rep_url, rep_path))

                if DOWNLOAD_ATTACHMENTS:
                    for att in rep.get("attachments") or []:
                        aurl = att.get("URL")
                        aname = att.get("name") or "attachment.bin"
                        if aurl:
                            apath = RAW_DIR / "writtenQuestions" / "by_num" / f"{n}" / "attachments" / f"{key}" / aname
                            attach_jobs.append((aurl, apath))

        ptr_index_path = TABLES_DIR / "ptr_writtenQuestions_by_author.csv"
        if author_ptr_rows:
            write_csv(ptr_index_path, author_ptr_rows, ["mp_id", "num", "ptr_path"])

        ok1, fail1 = await download_many_html(body_jobs, "writtenQuestions bodies (HTML)")
        ok2, fail2 = await download_many_html(reply_jobs, "writtenQuestions replies (HTML)")
        logger.info("writtenQuestions body OK=%d FAIL=%d; reply OK=%d FAIL=%d", ok1, fail1, ok2, fail2)

        async def download_attachment(url: str, path: Path) -> FetchResult:
            if path.exists() and path.stat().st_size > 0:
                return FetchResult(url=url, status_code=200, ok=True, path=path)

            async with SEM:
                try:
                    async with client.stream("GET", url, headers={"Accept": "*/*"}) as r:
                        if r.status_code == 404:
                            await write_bytes(path, b"")
                            return FetchResult(url=url, status_code=404, ok=False, path=path, error="404")
                        r.raise_for_status()
                        path.parent.mkdir(parents=True, exist_ok=True)
                        with path.open("wb") as f:
                            async for chunk in r.aiter_bytes():
                                f.write(chunk)
                    return FetchResult(url=url, status_code=200, ok=True, path=path)
                except Exception as e:
                    logger.exception("Attachment failed: %s", url)
                    return FetchResult(url=url, status_code=0, ok=False, path=path, error=str(e))

        if DOWNLOAD_ATTACHMENTS and attach_jobs:
            ok_a = 0
            fail_a = 0
            with tqdm(total=len(attach_jobs), desc="writtenQuestions attachments") as pbar:
                tasks = [download_attachment(url, path) for url, path in attach_jobs]
                for coro in asyncio.as_completed(tasks):
                    res = await coro
                    if res.ok:
                        ok_a += 1
                    else:
                        fail_a += 1
                    pbar.update(1)
            logger.info("writtenQuestions attachments OK=%d FAIL=%d", ok_a, fail_a)

if DOWNLOAD_WRITTEN_QUESTIONS:
    await download_written_questions()
else:
    print("DOWNLOAD_WRITTEN_QUESTIONS=False, pomijam.")


In [None]:
async def download_committees() -> None:
    async with make_client() as client:
        # lista komisji
        committees_url = f"{API_ROOT}/sejm/term{TERM}/committees"
        committees_path = RAW_DIR / "committees" / "committees.json"
        await fetch_json_to_file(client, committees_url, committees_path)
        committees = json.loads(committees_path.read_text(encoding="utf-8"))

        # per komisja: lista posiedzeń
        sitting_jobs = []  # (code, sitting_num)
        for c in committees:
            code = c.get("code") or c.get("id") or c.get("committeeCode")
            if not code:
                continue
            url = f"{API_ROOT}/sejm/term{TERM}/committees/{code}/sittings"
            path = RAW_DIR / "committees" / "sittings" / f"{code}_sittings.json"
            await fetch_json_to_file(client, url, path, accept_404=True)
            if not path.exists() or path.stat().st_size == 0:
                continue
            try:
                sittings = json.loads(path.read_text(encoding="utf-8"))
            except Exception:
                continue
            for s in sittings:
                sn = safe_int(s.get("num") or s.get("number") or s.get("id"))
                if sn is not None:
                    sitting_jobs.append((code, sn))

        sitting_jobs = sorted(set(sitting_jobs))
        logger.info("Committee sittings to fetch: %d", len(sitting_jobs))

        # pobierz HTML/PDF zapis posiedzenia
        html_jobs = []
        pdf_jobs = []
        for code, sn in sitting_jobs:
            html_url = f"{API_ROOT}/sejm/term{TERM}/committees/{code}/sittings/{sn}/html"
            pdf_url = f"{API_ROOT}/sejm/term{TERM}/committees/{code}/sittings/{sn}/pdf"
            html_path = RAW_DIR / "committees" / "sittings" / code / f"{sn}" / "sitting.html"
            pdf_path = RAW_DIR / "committees" / "sittings" / code / f"{sn}" / "sitting.pdf"
            html_jobs.append((html_url, html_path))
            pdf_jobs.append((pdf_url, pdf_path))

        ok_h, fail_h = await download_many_html(html_jobs, "Committees sittings (HTML)")

        # pdf streaming
        async def download_pdf(url: str, path: Path) -> FetchResult:
            if path.exists() and path.stat().st_size > 0:
                return FetchResult(url=url, status_code=200, ok=True, path=path)
            async with SEM:
                try:
                    async with client.stream("GET", url, headers={"Accept": "application/pdf"}) as r:
                        if r.status_code == 404:
                            await write_bytes(path, b"")
                            return FetchResult(url=url, status_code=404, ok=False, path=path, error="404")
                        r.raise_for_status()
                        path.parent.mkdir(parents=True, exist_ok=True)
                        with path.open("wb") as f:
                            async for chunk in r.aiter_bytes():
                                f.write(chunk)
                    return FetchResult(url=url, status_code=200, ok=True, path=path)
                except Exception as e:
                    logger.exception("PDF failed: %s", url)
                    return FetchResult(url=url, status_code=0, ok=False, path=path, error=str(e))

        ok_p = 0
        fail_p = 0
        with tqdm(total=len(pdf_jobs), desc="Committees sittings (PDF)") as pbar:
            tasks = [download_pdf(url, path) for url, path in pdf_jobs]
            for coro in asyncio.as_completed(tasks):
                res = await coro
                if res.ok:
                    ok_p += 1
                else:
                    fail_p += 1
                pbar.update(1)

        logger.info("Committees: HTML OK=%d FAIL=%d; PDF OK=%d FAIL=%d", ok_h, fail_h, ok_p, fail_p)

if DOWNLOAD_COMMITTEE_SITTINGS:
    await download_committees()
else:
    print("DOWNLOAD_COMMITTEE_SITTINGS=False, pomijam.")


In [None]:
def count_files(p: Path, pattern: str) -> int:
    return sum(1 for _ in p.rglob(pattern))

print("MP details:", count_files(RAW_DIR / "mp" / "details", "*.json"))
print("Transcript indexes:", count_files(RAW_DIR / "transcripts" / "index_by_day", "*.json"))
print("Transcript HTML (MP):", count_files(RAW_DIR / "transcripts" / "html_by_mp", "*.html"))
print("Interpellations bodies:", count_files(RAW_DIR / "interpellations" / "by_num", "body.html"))
print("writtenQuestions bodies:", count_files(RAW_DIR / "writtenQuestions" / "by_num", "body.html"))

print("Tables:", list(TABLES_DIR.glob("*.csv")))
print("Log:", LOG_PATH)
