# CO2 Emissions Prediction — ETL WDI (World Bank)

## Objetivo
Extrair indicadores do World Development Indicators (WDI) via API do Banco Mundial e gerar:

- `data/raw/wdi_long.csv` (formato longo: país–indicador–ano–valor)
- `data/processed/wdi_wide.csv` (formato wide estilo Kaggle: 1 linha por país+indicador, colunas por ano)

## Decisões de engenharia
- **Sem falha silenciosa**: respostas inesperadas da API geram erro explicativo.
- **Cache por indicador**: salva em `data/raw/indicators/{INDICATOR}.csv`.
- **Robustez**: retries/backoff + timeouts adequados + pausa curta entre páginas.

In [1]:
# Imports + logging
import json
import logging
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd
import requests

logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
logger = logging.getLogger("wdi_etl")

pd.set_option("display.max_columns", 50)
pd.set_option("display.width", 120)

In [2]:
# Configurações (indicadores, anos, paths)
# Intervalo de anos
YEARS = list(range(2000, 2021))
DATE_RANGE = f"{min(YEARS)}:{max(YEARS)}"

# Base da API
BASE = "https://api.worldbank.org/v2"

# Indicadores (WDI codes)
INDICATORS = [
    "EG.ELC.ACCS.ZS",       # Access to electricity (% of population)
    "AG.LND.AGRI.ZS",       # Agricultural land (% of land area)
    "ER.H2O.FWVT.ZS",       # Annual freshwater withdrawals, total (% of internal resources)
    "AG.LND.ARBL.ZS",       # Arable land (% of land area)
    "AG.LND.FRST.ZS",       # Forest area (% of land area)
    "EG.USE.ELEC.KH.PC",    # Electric power consumption (kWh per capita)
    "EG.USE.PCAP.KG.OE",    # Energy use (kg of oil equivalent per capita)
    "EG.ELC.RNEW.ZS",       # Renewable electricity output (% of total electricity output)
    "EG.FEC.RNEW.ZS",       # Renewable energy consumption (% of total final energy consumption)
    "SP.POP.GROW",          # Population growth (annual %)
    "NY.GDP.PCAP.CD",       # GDP per capita (current US$)
    "EN.ATM.CO2E.PC",       # CO2 emissions (metric tons per capita)
]

# Pastas/arquivos
DIR_RAW = Path("data/raw")
DIR_PROC = Path("data/processed")
DIR_INDICATORS = DIR_RAW / "indicators"

PATH_LONG = DIR_RAW / "wdi_long.csv"
PATH_WIDE = DIR_PROC / "wdi_wide.csv"

In [3]:
# Criar pastas (idempotente)
DIR_RAW.mkdir(parents=True, exist_ok=True)
DIR_PROC.mkdir(parents=True, exist_ok=True)
DIR_INDICATORS.mkdir(parents=True, exist_ok=True)

logger.info("Estrutura de pastas pronta: %s | %s | %s", DIR_RAW, DIR_PROC, DIR_INDICATORS)

2026-02-25 12:55:20,338 | INFO | Estrutura de pastas pronta: data/raw | data/processed | data/raw/indicators


In [4]:
# Session com retry/backoff (robusto)
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def build_session() -> requests.Session:
    """
    Cria uma requests.Session com retry/backoff para lidar com:
    - timeouts intermitentes
    - 429 (rate limit) e 5xx
    """
    session = requests.Session()

    retry = Retry(
        total=8,
        connect=8,
        read=8,
        backoff_factor=1.0,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],
        raise_on_status=False,
        respect_retry_after_header=True,
    )

    adapter = HTTPAdapter(max_retries=retry, pool_connections=20, pool_maxsize=20)
    session.mount("https://", adapter)
    session.mount("http://", adapter)

    # User-Agent ajuda alguns gateways/proxies e melhora rastreabilidade
    session.headers.update({"User-Agent": "CO2_Emissions_Prediction/1.0 (requests)"})
    return session

SESSION = build_session()

In [5]:
# Request JSON “à prova de falhas silenciosas”
def request_json(url: str, params: Dict[str, Any], session: requests.Session = SESSION) -> Any:
    """
    Faz um GET e retorna JSON, com validação forte.
    Se a API retornar HTML, texto, ou um JSON fora do padrão, levanta erro com contexto.
    """
    # timeouts: (conexão, leitura)
    resp = session.get(url, params=params, timeout=(10, 180))
    resp.raise_for_status()

    # Tenta parsear JSON
    try:
        data = resp.json()
    except Exception:
        snippet = resp.text[:300] if resp.text else ""
        raise ValueError(
            f"Resposta não-JSON da API. url={url} params={params} status={resp.status_code} snippet={snippet!r}"
        )

    # Alguns erros vêm como dict (ex.: {"message": ...})
    if isinstance(data, dict):
        raise ValueError(f"API retornou dict (erro/limite). url={url} params={params} payload={data}")

    return data

In [6]:
# Paginação da API (sem engolir erro)
def wb_get_all_pages(url: str, params: Dict[str, Any], session: requests.Session = SESSION) -> List[dict]:
    """
    Busca todas as páginas do endpoint World Bank v2.

    Esperado: [meta, items]
      - meta: contém 'page' e 'pages'
      - items: lista de registros (ou None quando não há dados)
    """
    params = dict(params)
    params.setdefault("format", "json")
    params.setdefault("per_page", 1000)  # menor = resposta mais leve

    page = 1
    out: List[dict] = []

    while True:
        params["page"] = page
        data = request_json(url, params=params, session=session)

        # Validação do formato esperado
        if not isinstance(data, list) or len(data) < 2:
            raise ValueError(f"Formato inesperado. url={url} params={params} payload_head={str(data)[:200]}")

        meta, items = data[0], data[1]

        if items is None:
            # Não há dados (fim)
            break

        if not isinstance(items, list):
            raise ValueError(f"Items inesperado (não é lista). url={url} params={params} items_head={str(items)[:200]}")

        out.extend(items)

        pages = int(meta.get("pages", 1))
        if page >= pages:
            break

        page += 1
        time.sleep(0.15)  # gentil com a API

    return out

In [7]:
# Smoke test (rodar antes do download completo)
# Smoke test: se isso falhar, não adianta rodar o ETL completo.
test_url = f"{BASE}/country/BRA/indicator/EN.ATM.CO2E.PC"
test_items = wb_get_all_pages(test_url, params={"date": "2010:2012"})
print("Smoke test OK. Itens retornados:", len(test_items))
print("Exemplo (1º item):", test_items[0] if test_items else None)

# Também testmos o endpoint "all" (que é o que usaremos para os indicadores)
test_url_all = f"{BASE}/country/all/indicator/EN.ATM.CO2E.PC"
test_items_all = wb_get_all_pages(test_url_all, params={"date": "2010:2012"})
print("Smoke test (all) OK. Itens retornados:", len(test_items_all))

ValueError: Formato inesperado. url=https://api.worldbank.org/v2/country/BRA/indicator/EN.ATM.CO2E.PC params={'date': '2010:2012', 'format': 'json', 'per_page': 1000, 'page': 1} payload_head=[{'message': [{'id': '175', 'key': 'Invalid format', 'value': 'The indicator was not found. It may have been deleted or archived.'}]}]

In [None]:
# Baixar países e criar valid_country_codes
countries_url = f"{BASE}/country"
countries_items = wb_get_all_pages(countries_url, params={"per_page": 1000})

df_countries = pd.json_normalize(countries_items)

# Marcar agregados (normalmente region.value == "Aggregates")
if "region.value" in df_countries.columns:
    df_countries["is_aggregate"] = df_countries["region.value"].eq("Aggregates")
else:
    df_countries["is_aggregate"] = False

valid_country_codes = set(df_countries.loc[~df_countries["is_aggregate"], "id"].astype(str).tolist())

logger.info("Total entidades (inclui agregados): %s", df_countries.shape[0])
logger.info("Total países/territórios (sem agregados): %s", len(valid_country_codes))

df_countries.head()

In [None]:
# Função: baixar 1 indicador em formato longo
def download_indicator_long(indicator_code: str) -> pd.DataFrame:
    """
    Baixa um indicador para todos os países no intervalo DATE_RANGE,
    devolvendo DataFrame longo padronizado.
    """
    url = f"{BASE}/country/all/indicator/{indicator_code}"
    items = wb_get_all_pages(url, params={"date": DATE_RANGE})

    rows = []
    for it in items:
        ccode = (it.get("country") or {}).get("id")
        cname = (it.get("country") or {}).get("value")
        icode = (it.get("indicator") or {}).get("id")
        iname = (it.get("indicator") or {}).get("value")
        year = it.get("date")
        val = it.get("value")

        if ccode is None or year is None:
            continue

        rows.append(
            {
                "Country Code": str(ccode),
                "Country Name": cname,
                "Indicator Code": icode,
                "Indicator Name": iname,
                "Year": int(year),
                "Value": val,
            }
        )

    df = pd.DataFrame(rows)
    if not df.empty:
        df["Year"] = pd.to_numeric(df["Year"], errors="coerce").astype("Int64")
        df["Value"] = pd.to_numeric(df["Value"], errors="coerce")
        df = df.dropna(subset=["Year"]).copy()
        df["Year"] = df["Year"].astype(int)

    return df

In [None]:
# Cache por indicador (não perder progresso)
def indicator_cache_path(indicator_code: str) -> Path:
    return DIR_INDICATORS / f"{indicator_code}.csv"

def load_or_download_indicator(indicator_code: str, force: bool = False) -> pd.DataFrame:
    """
    Carrega do cache se existir; caso contrário baixa e salva.
    Se o download vier vazio, levanta erro (não salva cache vazio).
    """
    path = indicator_cache_path(indicator_code)

    if path.exists() and not force:
        df = pd.read_csv(path)
        df["Year"] = pd.to_numeric(df["Year"], errors="coerce")
        df = df.dropna(subset=["Year"]).copy()
        df["Year"] = df["Year"].astype(int)
        df["Value"] = pd.to_numeric(df["Value"], errors="coerce")
        return df

    df = download_indicator_long(indicator_code)

    if df.empty:
        raise ValueError(
            f"Download vazio para indicador {indicator_code}. "
            "Abortando para evitar cache vazio. Rode o smoke test e verifique a resposta da API."
        )

    df.to_csv(path, index=False)
    logger.info("Cache salvo: %s | shape=%s", path, df.shape)
    return df

In [None]:
# Construir df_long (a partir do cache por indicador)
dfs = []
for i, ind in enumerate(INDICATORS, start=1):
    logger.info("[%s/%s] Processando indicador: %s", i, len(INDICATORS), ind)
    dfi = load_or_download_indicator(ind, force=False)
    logger.info("  -> linhas=%s | anos=%s..%s", len(dfi), dfi["Year"].min(), dfi["Year"].max())
    dfs.append(dfi)

df_long = pd.concat(dfs, ignore_index=True)

# Garantia de faixa de anos
df_long = df_long[df_long["Year"].between(min(YEARS), max(YEARS))].copy()

if df_long.empty:
    raise ValueError("df_long ficou vazio após concatenar. Isso indica falha sistemática no download/cache.")

df_long.to_csv(PATH_LONG, index=False)
logger.info("Salvo: %s | shape=%s", PATH_LONG, df_long.shape)

df_long.head()

In [None]:
# Limpeza final + filtro de países + validação
# Normalização defensiva
df_long["Year"] = pd.to_numeric(df_long["Year"], errors="coerce")
df_long = df_long.dropna(subset=["Year"]).copy()
df_long["Year"] = df_long["Year"].astype(int)

# Filtra anos (blindagem)
df_long = df_long[df_long["Year"].between(min(YEARS), max(YEARS))].copy()

# Remove agregados (opcional)
df_long = df_long[df_long["Country Code"].isin(valid_country_codes)].copy()

if df_long.empty:
    raise ValueError(
        "df_long ficou vazio após filtro de países. "
        "Provável divergência de códigos; inspecione df_long['Country Code'].unique() e valid_country_codes."
    )

assert df_long["Year"].min() >= min(YEARS)
assert df_long["Year"].max() <= max(YEARS)

logger.info("Após filtros: shape=%s", df_long.shape)
logger.info("Anos presentes: %s..%s", df_long["Year"].min(), df_long["Year"].max())
logger.info("Países: %s | Indicadores: %s",
            df_long["Country Code"].nunique(),
            df_long["Indicator Code"].nunique())

df_long.head()



In [None]:
# Gerar df_wide estilo Kaggle + salvar
df_wide = (
    df_long.pivot_table(
        index=["Country Code", "Country Name", "Indicator Code", "Indicator Name"],
        columns="Year",
        values="Value",
        aggfunc="first",
    )
    .reset_index()
)

base_cols = ["Country Code", "Country Name", "Indicator Code", "Indicator Name"]
year_cols = [c for c in df_wide.columns if isinstance(c, int)]
year_cols = sorted(year_cols)

df_wide = df_wide[base_cols + year_cols]

df_wide.to_csv(PATH_WIDE, index=False)
logger.info("Salvo: %s | shape=%s", PATH_WIDE, df_wide.shape)

df_wide.head()

## Configurações do ETL
- INDICATORS: lista dos códigos WDI (fonte oficial)
- YEARS: intervalo 2000–2020
- PATH_LONG/PATH_WIDE: saídas em disco

In [None]:
INDICATORS = [
    "EG.ELC.ACCS.ZS",       # Access to electricity (% of population)
    "AG.LND.AGRI.ZS",       # Agricultural land (% of land area)
    "ER.H2O.FWVT.ZS",       # Annual freshwater withdrawals, total (% of internal resources)
    "AG.LND.ARBL.ZS",       # Arable land (% of land area)
    "AG.LND.FRST.ZS",       # Forest area (% of land area)
    "EG.USE.ELEC.KH.PC",    # Electric power consumption (kWh per capita)
    "EG.USE.PCAP.KG.OE",    # Energy use (kg of oil equivalent per capita)
    "EG.ELC.RNEW.ZS",       # Renewable electricity output (% of total electricity output)
    "EG.FEC.RNEW.ZS",       # Renewable energy consumption (% of total final energy consumption)
    "SP.POP.GROW",          # Population growth (annual %)
    "NY.GDP.PCAP.CD",       # GDP per capita (current US$)
    "EN.ATM.CO2E.PC",       # CO2 emissions (metric tons per capita)
]

YEARS = list(range(2000, 2021))
DATE_RANGE = f"{min(YEARS)}:{max(YEARS)}"

BASE = "https://api.worldbank.org/v2"

PATH_LONG = Path("data/raw/wdi_long.csv")
PATH_WIDE = Path("data/processed/wdi_wide.csv")

In [None]:
# Criar pastas
Path("data/raw").mkdir(parents=True, exist_ok=True)
Path("data/processed").mkdir(parents=True, exist_ok=True)
logger.info("Pastas data/raw e data/processed prontas.")

## Função utilitária: paginação da APIA 
API do World Bank é paginada. A função abaixo:
- percorre todas as páginas
- acumula os itens
- retorna uma lista única

In [None]:
# Função para capturar todas as paginas
import time

def wb_get_all_pages(url: str, params: dict, session: requests.Session = SESSION) -> list:
    """
    Busca todas as páginas de um endpoint do World Bank (API v2),
    com resiliência a timeouts e respostas lentas.
    """
    params = dict(params)
    params.setdefault("format", "json")
    params.setdefault("per_page", 2000)  # menor = resposta mais leve e menos timeout

    page = 1
    out = []

    while True:
        params["page"] = page

        # timeout pode ser tupla: (connect_timeout, read_timeout)
        resp = session.get(url, params=params, timeout=(10, 180))
        resp.raise_for_status()

        data = resp.json()

        if not isinstance(data, list) or len(data) < 2 or data[1] is None:
            break

        meta, items = data[0], data[1]
        out.extend(items)

        pages = int(meta.get("pages", 1))
        if page >= pages:
            break

        page += 1

        # “educado” com a API (ajuda estabilidade)
        time.sleep(0.2)

    return out

## Países e filtro de agregados
Baixamos o catálogo de países para:
- identificar e remover “Aggregates” (World, regiões, etc.)
- manter apenas países/territórios como unidades de análise

In [None]:
# Baixar países + valid_country_codes
countries_url = f"{BASE}/country"
countries_items = wb_get_all_pages(countries_url, params={"per_page": 20000})
df_countries = pd.json_normalize(countries_items)

# Agregados normalmente aparecem com region.value == "Aggregates"
if "region.value" in df_countries.columns:
    df_countries["is_aggregate"] = df_countries["region.value"].eq("Aggregates")
else:
    df_countries["is_aggregate"] = False  # fallback defensivo

valid_country_codes = set(df_countries.loc[~df_countries["is_aggregate"], "id"].tolist())

logger.info("Total entidades (inclui agregados): %s", df_countries.shape[0])
logger.info("Total países/territórios (sem agregados): %s", len(valid_country_codes))

## Função: download de 1 indicador (formato longo)
Baixa um indicador WDI para todos os países, no intervalo configurado, e devolve:
- Country Code,
- Country Name,
- Indicator Code,
- Indicator Name,
- Year,
- Value.

In [None]:
# Download de indicador (formato longo)

def download_indicator_long(indicator_code: str) -> pd.DataFrame:
    """
    Baixa um indicador WDI (todos países) para o intervalo DATE_RANGE.

    Retorna um DataFrame no formato longo:
      (Country, Indicator, Year) -> Value
    """
    url = f"{BASE}/country/all/indicator/{indicator_code}"
    items = wb_get_all_pages(url, params={"date": DATE_RANGE})

    rows = []
    for it in items:
        # A API é semi-estruturada; usamos .get com fallback seguro
        ccode = (it.get("country") or {}).get("id")
        cname = (it.get("country") or {}).get("value")
        icode = (it.get("indicator") or {}).get("id")
        iname = (it.get("indicator") or {}).get("value")
        year = it.get("date")
        val = it.get("value")

        if ccode is None or year is None:
            continue

        rows.append({
            "Country Code": ccode,
            "Country Name": cname,
            "Indicator Code": icode,
            "Indicator Name": iname,
            "Year": int(year),
            "Value": val,
        })

    df = pd.DataFrame(rows)

    # Normaliza tipos (Value pode vir None)
    if not df.empty:
        df["Value"] = pd.to_numeric(df["Value"], errors="coerce")
        df["Year"] = df["Year"].astype(int)

    return df

# Construção do dataset LONG (com cache)
- Se data/raw/wdi_long.csv existir → carrega
- Caso contrário → baixa todos os indicadores e salva


In [None]:
# Construir df_long com cache
if PATH_LONG.exists():
    df_long = pd.read_csv(PATH_LONG)
    df_long["Year"] = df_long["Year"].astype(int)
    df_long["Value"] = pd.to_numeric(df_long["Value"], errors="coerce")
    logger.info("Carregado do cache: %s | shape=%s", PATH_LONG, df_long.shape)
else:
    dfs = []
    for i, ind in enumerate(INDICATORS, start=1):
        logger.info("[%s/%s] Baixando indicador: %s", i, len(INDICATORS), ind)
        dfs.append(download_indicator_long(ind))

    df_long = pd.concat(dfs, ignore_index=True)
    df_long = df_long[df_long["Year"].between(min(YEARS), max(YEARS))]

    df_long.to_csv(PATH_LONG, index=False)
    logger.info("Salvo: %s | shape=%s", PATH_LONG, df_long.shape)

In [None]:
df_long.head()

## Filtro final + checagens
### Limpeza final + checagens
- Remove agregados (opcional)
- Valida intervalo de anos
- Reporta cardinalidades úteis

In [None]:
print("shape:", df_long.shape)
print("Year dtype:", df_long["Year"].dtype)
print("Year min/max:", df_long["Year"].min(), df_long["Year"].max())
print("Year nulos:", df_long["Year"].isna().sum())

print("shape após filtro de anos:", df_long[df_long["Year"].between(min(YEARS), max(YEARS))].shape)
print("shape após filtro de países:", df_long[df_long["Country Code"].isin(valid_country_codes)].shape)

In [None]:
# Normalizar Year
df_long["Year"] = pd.to_numeric(df_long["Year"], errors="coerce")
df_long = df_long.dropna(subset=["Year"]).copy()
df_long["Year"] = df_long["Year"].astype(int)

# Filtrar anos primeiro
df_long = df_long[df_long["Year"].between(min(YEARS), max(YEARS))].copy()

# Só então filtrar países (e validar se não zerou)
df_long = df_long[df_long["Country Code"].isin(valid_country_codes)].copy()

if df_long.empty:
    raise ValueError("df_long ficou vazio após filtros (anos/país). Comente o filtro de países ou revise valid_country_codes.")

# Agora sim faz sentido validar min/max
assert df_long["Year"].min() >= min(YEARS)
assert df_long["Year"].max() <= max(YEARS)

In [None]:
# Aplicar filtro e validar códigos de país
# Garantir tipo correto e remover anos inválidos
df_long["Year"] = pd.to_numeric(df_long["Year"], errors="coerce")
df_long = df_long.dropna(subset=["Year"]).copy()
df_long["Year"] = df_long["Year"].astype(int)

# Reaplicar filtro de anos (blindagem)
df_long = df_long[df_long["Year"].between(min(YEARS), max(YEARS))].copy()

# Agora sim: filtro de países (opcional)
df_long = df_long[df_long["Country Code"].isin(valid_country_codes)].copy()

assert df_long["Year"].min() >= min(YEARS)
assert df_long["Year"].max() <= max(YEARS)