# Datos del censo

Consulta datos para cada manzano en el geoportal del INE. 

In [15]:
import geopandas as gpd
from io import BytesIO
from IPython.display import clear_output
from pathlib import Path
import asyncio, random, json
import httpx
from openpyxl import load_workbook
from typing import Callable
import pandas as pd

Parámetros para consultas asíncronas.

In [2]:
CONCURRENCY = 10
verify_sem = asyncio.Semaphore(CONCURRENCY)
excel_sem  = asyncio.Semaphore(CONCURRENCY)
_write_lock = asyncio.Lock()
_token_lock = asyncio.Lock()
TOKEN: str | None = None
BASE_HEADERS: dict = {}

Algunas funciones de apoyo.

In [3]:
def build_headers() -> dict:
    """Return the headers to send with an API request.

    The result is always a fresh dict built from ``BASE_HEADERS`` (the global
    itself is never mutated). When a session token is set, it is added both as
    a bearer ``Authorization`` header and as ``X-Session-Token``.
    """
    if not TOKEN:
        return dict(BASE_HEADERS)
    return {
        **BASE_HEADERS,
        "Authorization": f"Bearer {TOKEN}",
        "X-Session-Token": TOKEN,
    }

def parse_excel_bytes(b: bytes, campos: dict) -> dict:
    """Read the requested cells from the active sheet of an in-memory workbook.

    Args:
        b: Raw bytes of an ``.xlsx`` workbook.
        campos: Mapping whose keys are cell coordinates (e.g. ``"D13"``). Only
            the keys are used here.

    Returns:
        A dict mapping each coordinate in ``campos`` to that cell's value.
    """
    libro = load_workbook(BytesIO(b), data_only=True, read_only=True)
    try:
        hoja = libro.active
        return {campo: hoja[campo].value for campo in campos}
    finally:
        # Read-only workbooks load lazily and keep their source open until
        # they are closed explicitly.
        libro.close()

def append_jsonl(path: Path, obj: dict):
    """Append ``obj`` to ``path`` as one JSON document on its own line.

    Missing parent directories are created first. The file is written as UTF-8
    and non-ASCII characters are stored as-is rather than escaped.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "a", encoding="utf-8") as salida:
        salida.write(f"{json.dumps(obj, ensure_ascii=False)}\n")

def load_jsonl(path: Path) -> list[dict]:
    """Load every record from a JSON Lines file.

    Returns an empty list when ``path`` does not exist. Empty and
    whitespace-only lines are skipped; every other line must be valid JSON.
    """
    if path.exists():
        with open(path, "r", encoding="utf-8") as entrada:
            return list(map(json.loads, filter(str.strip, entrada)))
    return []

In [4]:
async def refresh_headers(client: httpx.AsyncClient):
    """Request a new session token and store it in the global ``TOKEN``.

    The whole exchange runs under ``_token_lock``, so concurrent callers
    refresh one at a time. The request is sent with ``BASE_HEADERS`` only.

    Returns:
        The new token.

    Raises:
        httpx.HTTPStatusError: if the endpoint answers with an error status
            (raised by ``raise_for_status()``).
        RuntimeError: if the JSON response has no ``session_token``.
    """
    global TOKEN
    endpoint = "https://wgeoportal.ine.gob.bo/api/v1/geoportal/registroSesion"
    async with _token_lock:
        respuesta = await client.post(endpoint, timeout=20, headers=BASE_HEADERS)
        respuesta.raise_for_status()
        nuevo = respuesta.json().get("session_token")
        if nuevo:
            TOKEN = nuevo
            return TOKEN
        raise RuntimeError("registroSesion did not return session_token")

In [5]:
def _content_type(resp: httpx.Response) -> str:
    """Return the response's ``Content-Type`` header lower-cased, or ``""`` if missing."""
    valor = resp.headers.get("content-type")
    return valor.lower() if valor else ""

Consultar el número de personas y de viviendas de un manzano, y si es posible descargar una ficha con más información.

In [6]:
async def make_request_json(client: httpx.AsyncClient, url: str, json_data: dict, *, max_retries: int = 6):
    """POST ``json_data`` to ``url`` until a 200 response with a JSON body arrives.

    Handling per attempt:
      * 200 with a body that is not valid JSON: refresh the session token and
        pause 0.2 s.
      * 401 / 403: refresh the session token and retry immediately.
      * 429 / 500 / 502 / 503 / 504, timeouts and transport errors: sleep with
        exponential backoff (1 s, doubling up to 8 s, plus up to 0.25 s jitter).
      * any other status: pause 0.5 s.

    Args:
        client: Shared HTTP client.
        url: Endpoint to POST to.
        json_data: Request payload, sent as JSON.
        max_retries: Maximum number of attempts.

    Returns:
        The successful response. The body is only validated here; the caller
        parses it.

    Raises:
        httpx.TimeoutException, httpx.TransportError: the most recent such
            error, if one occurred and no attempt succeeded.
        RuntimeError: if every attempt failed without a timeout or transport
            error.
    """
    backoff = 1.0
    last_exc = None
    for _ in range(max_retries):
        try:
            r = await client.post(url, json=json_data, headers=build_headers(), timeout=30)
            if r.status_code == 200:
                try:
                    # Validation only: JSON decoding failures are ValueError
                    # subclasses, so nothing broader needs to be caught.
                    r.json()
                except ValueError:
                    await refresh_headers(client)
                    await asyncio.sleep(0.2)
                else:
                    return r
            elif r.status_code in (401, 403):
                await refresh_headers(client)
            elif r.status_code in (429, 500, 502, 503, 504):
                await asyncio.sleep(backoff + random.random() * 0.25)
                backoff = min(backoff * 2, 8)
            else:
                await asyncio.sleep(0.5)
        except (httpx.TimeoutException, httpx.TransportError) as e:
            last_exc = e
            await asyncio.sleep(backoff + random.random() * 0.25)
            backoff = min(backoff * 2, 8)

    if last_exc:
        raise last_exc
    raise RuntimeError(f"Request failed after {max_retries} attempts: {url}")

Descarga la ficha en formato excel.

In [7]:
async def make_request_bytes(client: httpx.AsyncClient, url: str, json_data: dict, *, max_retries: int = 6):
    """POST ``json_data`` to ``url`` until a 200 response with a spreadsheet-like body arrives.

    A response is accepted when its ``Content-Type`` contains one of the
    spreadsheet / binary media types listed in ``excel_types``.

    Handling per attempt:
      * 200 with ``application/json``: refresh the session token, then back off.
      * 200 with any other content type: back off.
      * 401 / 403: refresh the session token and retry immediately.
      * 429 / 500 / 502 / 503 / 504, timeouts and transport errors: back off.
      * any other status: pause 0.5 s.

    Backoff is exponential (1 s, doubling up to 8 s) plus up to 0.25 s jitter.

    Args:
        client: Shared HTTP client.
        url: Endpoint to POST to.
        json_data: Request payload, sent as JSON.
        max_retries: Maximum number of attempts.

    Returns:
        The accepted response; its ``content`` holds the file bytes.

    Raises:
        httpx.TimeoutException, httpx.TransportError: the most recent such
            error, if one occurred and no attempt succeeded.
        RuntimeError: if every attempt failed without a timeout or transport
            error.
    """
    excel_types = (
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/octet-stream",
        "application/vnd.ms-excel",
    )
    backoff = 1.0
    last_exc = None
    for _ in range(max_retries):
        try:
            r = await client.post(url, json=json_data, headers=build_headers(), timeout=60)
            if r.status_code == 200:
                ct = _content_type(r)
                if any(t in ct for t in excel_types):
                    return r
                # 200 without a spreadsheet. The JSON body itself is not
                # inspected; a JSON answer only triggers a token refresh.
                if "application/json" in ct:
                    await refresh_headers(client)
                await asyncio.sleep(backoff + random.random() * 0.25)
                backoff = min(backoff * 2, 8)
            elif r.status_code in (401, 403):
                await refresh_headers(client)
            elif r.status_code in (429, 500, 502, 503, 504):
                await asyncio.sleep(backoff + random.random() * 0.25)
                backoff = min(backoff * 2, 8)
            else:
                await asyncio.sleep(0.5)
        except (httpx.TimeoutException, httpx.TransportError) as e:
            last_exc = e
            await asyncio.sleep(backoff + random.random() * 0.25)
            backoff = min(backoff * 2, 8)

    if last_exc:
        raise last_exc
    raise RuntimeError(f"Request failed after {max_retries} attempts: {url}")


Descarga datos para un manzano.

In [8]:
def _dump_json_snapshot(path: str, records: list) -> None:
    """Overwrite ``path`` with ``records`` serialised as a single JSON array."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(records, f, ensure_ascii=False)


async def get_data(
    manzano,
    *,
    client: httpx.AsyncClient,
    campos: dict,
    censo_ndjson_path: Path,
    total: int,
    snapshot_every: int | None = 100,
    code_cast=str,
    debug_sample_false_limit: int = 3,
    debug_bucket: dict | None = None,
):
    """Fetch the census data for one block and record it.

    First asks the ``verificar-validar`` endpoint for the person and dwelling
    counts and whether the block is validated. For validated blocks it also
    downloads the Excel sheet and extracts the cells named in ``campos``.
    The record is then appended to the module-level ``censo`` list and to the
    NDJSON file, and the module-level ``progress_done`` / ``validos`` counters
    are updated. All of that happens under ``_write_lock``.

    Args:
        manzano: Block code; passed through ``code_cast`` before use.
        client: Shared HTTP client.
        campos: Mapping whose keys are the sheet cells to extract.
        censo_ndjson_path: NDJSON file that receives one line per block.
        total: Total number of blocks; used only in the progress message.
        snapshot_every: Write a full JSON snapshot to ``temporal/censo.json``
            every this many completed blocks; falsy disables snapshots.
        code_cast: Callable that converts ``manzano`` to the code that is sent
            and stored.
        debug_sample_false_limit: Maximum number of non-validated payloads to
            keep in ``debug_bucket``.
        debug_bucket: Optional dict with ``"false_count"`` and
            ``"false_examples"`` entries that collects sample payloads.
    """
    global censo, validos, progress_done

    codigo = code_cast(manzano)
    json_data = {"codigos": [codigo]}

    async with verify_sem:
        r = await make_request_json(
            client,
            "https://wgeoportal.ine.gob.bo/api/v1/ficha-tecnica/verificar-validar",
            json_data,
        )
    info = r.json()

    validado = bool(info.get("validado", False))
    personas = info.get("cantidad_personas")
    viviendas = info.get("cantidad_viviendas")

    if (not validado) and debug_bucket is not None:
        if debug_bucket["false_count"] < debug_sample_false_limit:
            debug_bucket["false_count"] += 1
            debug_bucket["false_examples"].append({"codigo": codigo, "payload": info})

    data = {
        "codigo": codigo,
        "validado": validado,
        "personas": personas,
        "viviendas": viviendas,
    }

    if validado:
        async with excel_sem:
            r2 = await make_request_bytes(
                client,
                "https://wgeoportal.ine.gob.bo/api/v1/generar-excel",
                json_data,
            )
        # Parsing is CPU-bound, so keep it off the event loop.
        parsed = await asyncio.to_thread(parse_excel_bytes, r2.content, campos)
        data.update(parsed)

    async with _write_lock:
        censo.append(data)
        await asyncio.to_thread(append_jsonl, censo_ndjson_path, data)
        progress_done += 1
        if validado:
            validos += 1

        if snapshot_every and (progress_done % snapshot_every == 0):
            # Serialising the whole list can take a while once it is large, so
            # do it in a worker thread instead of stalling every in-flight
            # request. The lock is still held, so no other task appends to
            # `censo` while it is being dumped.
            await asyncio.to_thread(_dump_json_snapshot, "temporal/censo.json", censo)

        clear_output(wait=True)
        print(f"{progress_done} / {total} ({validos} validos)")

Descarga datos para todos los manzanos restantes.

In [11]:
async def run_censo_async(
    gdf,
    headers: dict,
    campos: dict,
    *,
    censo_ndjson: str | Path = "temporal/censo.ndjson",
    snapshot_every: int | None = 100,
    concurrency: int = 10,
    verify_concurrency: int | None = None,  # set to e.g. 6 if you want stricter gating
    excel_concurrency: int | None = None,   # set to e.g. 2 if you want to be extra gentle
    code_cast: Callable = str,              # ensure codes are strings; change if you need zero-padding
) -> list[dict]:
    """Download census data for every block in ``gdf`` that is not stored yet.

    Records already present in the NDJSON file are loaded first and their
    codes are skipped, so an interrupted run can be resumed. One task is
    started per remaining block; a full JSON snapshot is written to
    ``temporal/censo.json`` when all of them finish.

    Rebinds the module-level ``CONCURRENCY``, ``verify_sem``, ``excel_sem``,
    ``BASE_HEADERS``, ``censo``, ``validos`` and ``progress_done``.

    Args:
        gdf: Table of blocks; its ``codigo`` column is iterated and its row
            count is used as the progress total.
        headers: Base request headers. ``Authorization`` and
            ``X-Session-Token`` entries are dropped from the stored copy.
        campos: Mapping whose keys are the sheet cells to extract.
        censo_ndjson: Path of the append-only NDJSON log.
        snapshot_every: Passed to ``get_data``; snapshot period in blocks.
        concurrency: Default limit for both endpoints.
        verify_concurrency: Limit for the verification endpoint; falls back to
            ``concurrency`` when falsy.
        excel_concurrency: Limit for the Excel endpoint; falls back to
            ``concurrency`` when falsy.
        code_cast: Callable that converts a raw ``gdf.codigo`` value to the
            code that is sent and stored.

    Returns:
        The list of all records, previously stored ones included.
    """
    global CONCURRENCY, verify_sem, excel_sem, censo, validos, progress_done, BASE_HEADERS, TOKEN

    CONCURRENCY = int(concurrency)
    verify_limit = verify_concurrency or CONCURRENCY
    excel_limit = excel_concurrency or CONCURRENCY
    verify_sem = asyncio.Semaphore(verify_limit)
    excel_sem = asyncio.Semaphore(excel_limit)

    # Auth headers are added per request by build_headers(); keep them out of
    # the base set.
    BASE_HEADERS = dict(headers)
    for k in ("Authorization", "X-Session-Token"):
        BASE_HEADERS.pop(k, None)

    censo_ndjson_path = Path(censo_ndjson)
    censo = load_jsonl(censo_ndjson_path)
    validos = sum(1 for i in censo if i.get("validado"))
    progress_done = len(censo)

    total = int(gdf.shape[0])
    seen = {i["codigo"] for i in censo}
    # Stored codes went through code_cast, so compare like with like; otherwise
    # a non-identity cast would make every block look unseen on resume.
    codes = [codigo for codigo in gdf.codigo if code_cast(codigo) not in seen]

    if not codes:
        clear_output(wait=True)
        print(f"{progress_done} / {total} ({validos} validos) — nothing to do.")
        return censo

    debug_bucket = {"false_count": 0, "false_examples": []}

    max_conns = max(verify_limit, excel_limit)
    async with httpx.AsyncClient(
        http2=False,
        limits=httpx.Limits(
            max_connections=max_conns,
            max_keepalive_connections=max_conns,
        ),
        timeout=30,
        headers=BASE_HEADERS,
    ) as client:
        await refresh_headers(client)

        tasks = [
            asyncio.create_task(
                get_data(
                    code,
                    client=client,
                    campos=campos,
                    censo_ndjson_path=censo_ndjson_path,
                    total=total,
                    snapshot_every=snapshot_every,
                    code_cast=code_cast,
                    debug_bucket=debug_bucket,
                )
            )
            for code in codes
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            # gather() does not cancel the remaining tasks when one of them
            # fails. Stop them here so none keeps running against a client
            # that is about to be closed. On success nothing is pending.
            pending = [t for t in tasks if not t.done()]
            for t in pending:
                t.cancel()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)

    with open("temporal/censo.json", "w", encoding="utf-8") as f:
        json.dump(censo, f, ensure_ascii=False)

    if debug_bucket["false_examples"]:
        print("\nSample of 'validado=False' payloads (first few):")
        for ex in debug_bucket["false_examples"][:3]:
            print(json.dumps(ex, ensure_ascii=False)[:1000])

    return censo

Inicializar las cabeceras HTTP básicas, el listado de manzanos y el diccionario de campos para interpretar la ficha en Excel con datos del censo.

In [9]:
# Browser-like base headers sent with every request. Session/auth headers are
# added separately by build_headers().
headers = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:144.0) Gecko/20100101 Firefox/144.0',
    'Accept': 'application/json, text/plain, */*',
    'Accept-Language': 'en-US,en;q=0.5',
    'Content-Type': 'application/json',
    'Origin': 'https://geoportal.ine.gob.bo',
    'Sec-GPC': '1',
    'Connection': 'keep-alive',
    'Referer': 'https://geoportal.ine.gob.bo/',
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-site',
}
# Blocks to query; run_censo_async() reads the `codigo` column.
gdf = gpd.read_parquet("datos/manzanos.parquet")
# Mapping of sheet cell -> output column name. Read explicitly as UTF-8, like
# the other JSON files in this notebook, instead of the platform default.
with open("recursos/campos.json", "r", encoding="utf-8") as f:
    campos = json.load(f)

¡Correr!

In [13]:
# Download every block that is not yet in the NDJSON log. The top-level
# `await` relies on the notebook's already-running event loop.
censo = await run_censo_async(
    gdf,
    headers,
    campos,
    censo_ndjson="temporal/censo.ndjson",
    snapshot_every=100,
    concurrency=10,
    verify_concurrency=6,   # limit for the verification endpoint
    excel_concurrency=2,    # limit for the Excel download endpoint
)

247346 / 247346 (131788 validos)


Luego de la descarga, armar todo y guardar dos tablas:

- poblacion: valores de población y vivienda para todos los manzanos.
- fichas: valores adicionales del censo para los manzanos donde estén disponibles.

In [2]:
# Load the final snapshot written by run_censo_async(). It is written as UTF-8
# with ensure_ascii=False, so decode it explicitly instead of relying on the
# platform default encoding.
with open("temporal/censo.json", "r", encoding="utf-8") as f:
    df = pd.DataFrame(json.load(f))

In [17]:
# Preview the combined table; the notebook shows a truncated view of the frame.
df

Unnamed: 0,codigo,validado,personas,viviendas,D13,D14,D15,D16,E13,E14,...,J71,J72,J73,J74,J75,J76,J79,J80,J81,J82
0,04141933837-A,False,0,1,,,,,,,...,,,,,,,,,,
1,04143579299-A,False,10,5,,,,,,,...,,,,,,,,,,
2,04140381143-A,False,1,1,,,,,,,...,,,,,,,,,,
3,04143356216-A,False,0,3,,,,,,,...,,,,,,,,,,
4,04139661122-A,False,0,1,,,,,,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
247341,11590934602-A,True,21,8,5.0,3.0,1.0,0.0,5.0,4.0,...,0.0,0.0,0.0,3.0,1.0,0.0,3.0,3.0,4.0,3.0
247342,11590440089-A,True,65,13,12.0,14.0,11.0,3.0,7.0,9.0,...,0.0,2.0,0.0,7.0,0.0,1.0,5.0,3.0,6.0,5.0
247343,11591804494-A,True,48,11,5.0,8.0,8.0,1.0,12.0,7.0,...,0.0,0.0,0.0,11.0,0.0,0.0,6.0,6.0,8.0,9.0
247344,11590834901-A,True,28,10,5.0,9.0,5.0,1.0,1.0,5.0,...,0.0,0.0,0.0,8.0,0.0,0.0,0.0,3.0,4.0,1.0


In [20]:
# Population and dwelling counts for every block, validated or not.
df.loc[:, ["codigo", "validado", "personas", "viviendas"]].to_parquet(
    "datos/poblacion.parquet"
)

In [29]:
# Sheet values for validated blocks only, with the cell-coordinate columns
# renamed to the names given in `campos`.
(
    df.loc[df.validado, ["codigo", *campos]]
    .rename(columns=campos)
    .to_parquet("datos/fichas.parquet")
)