In [1]:
"""
------------------------------------------------------------------------
Obter e consolidar consumos horários (por CP4) a partir da API
https://e-redes.opendatasoft.com/api/explore/v2.1/catalog/
        datasets/consumos_horario_codigo_postal/records
 – concorrência com concurrent.futures
 – paginação (limit = 20, offset)
 – interpolação de Outubro (ausente) a partir de Setembro & Novembro
 – ano forçado a 2024
 – médias diárias normalizadas pelo valor de 10-Fev-2024
 – barra de progresso **em tempo-real** (uma página = 1 “tick”)
 – ficheiro de log “download_consumos.log”
------------------------------------------------------------------------

Requisitos:
    pip install pandas requests tqdm
------------------------------------------------------------------------
"""

from __future__ import annotations

import concurrent.futures as cf
import datetime as dt
import logging
import os
import time
from typing import List

import pandas as pd
import requests
from requests.exceptions import ConnectionError, HTTPError
from tqdm.auto import tqdm

# ---------------------------------------------------------------------#
# 1.  Configuração de logging                                          #
# ---------------------------------------------------------------------#
# Log file written next to the notebook/script. It is opened with mode="w",
# so every run starts with a fresh file.
LOGNAME = "download_consumos.log"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(LOGNAME, mode="w", encoding="utf-8"),
        logging.StreamHandler(),  # also echo every record to the console
    ],
)
# Logger shared by every function in this module.
log = logging.getLogger("consumos")

# ---------------------------------------------------------------------#
# 2.  Parâmetros gerais                                                #
# ---------------------------------------------------------------------#
# Records endpoint of the hourly-consumption-per-postal-code dataset.
BASE_URL = (
    "https://e-redes.opendatasoft.com/api/explore/v2.1/"
    "catalog/datasets/consumos_horario_codigo_postal/records"
)
# Value sent as the `limit` query parameter; also the step used for `offset`.
PAGE_LIMIT = 20
MAX_WORKERS = 6          # number of worker threads in the download pool
SLEEP_BETWEEN_PAGES = 0.5  # seconds a worker pauses between consecutive pages

# Retry / exponential back-off: the wait before retry k is BACKOFF_BASE ** k s.
MAX_RETRIES = 5
BACKOFF_BASE = 1.5
RETRY_STATUS = {429, 503}    # Too Many Requests, Service Unavailable


# ---------------------------------------------------------------------#
# 3.  Download de uma única página                                     #
# ---------------------------------------------------------------------#
def _fetch_page(cp4: str, offset: int) -> list[dict]:
    """Fetch one page of records for a postal code (CP4).

    The filter is first sent as a numeric comparison (``codigo_postal=1234``).
    If the API answers 400, the quoted form (``codigo_postal='1234'``) is used
    instead, and that form is kept for any later retry of the same page.
    Switching the filter form does not consume a retry attempt.

    Transient failures (HTTP statuses in ``RETRY_STATUS``, connection errors
    and timeouts) are retried with exponential back-off, whichever filter
    form is in use, for at most ``MAX_RETRIES`` attempts.

    Args:
        cp4: Postal code to filter on.
        offset: Index of the first record to return.

    Returns:
        Up to ``PAGE_LIMIT`` records; an empty list when there are no more
        records or when the API answers 404.

    Raises:
        requests.exceptions.HTTPError: Non-retryable HTTP status, or a
            retryable one after the last attempt.
        requests.exceptions.ConnectionError: Connection still failing after
            the last attempt.
        requests.exceptions.Timeout: Request still timing out after the last
            attempt.
    """
    base_params = {"limit": PAGE_LIMIT, "offset": offset}
    where_clauses = (f"codigo_postal={cp4}", f"codigo_postal='{cp4}'")
    clause_idx = 0
    attempt = 1

    while True:
        params = base_params | {"where": where_clauses[clause_idx]}
        try:
            r = requests.get(BASE_URL, params=params, timeout=60)
            r.raise_for_status()
            return r.json().get("results", [])

        except HTTPError as exc:
            status = exc.response.status_code if exc.response is not None else None

            if status == 400 and clause_idx + 1 < len(where_clauses):
                # The numeric filter was rejected: switch to the quoted form
                # and go through the same retry machinery with it.
                clause_idx += 1
                continue

            if status == 404:      # unknown CP4 -> no data
                log.warning("CP4 %s não encontrado (404).", cp4)
                return []

            if status not in RETRY_STATUS or attempt >= MAX_RETRIES:
                raise

            wait = BACKOFF_BASE ** attempt
            log.info("CP4 %s offset=%d -> %s; retry em %.1fs",
                     cp4, offset, status, wait)

        except (ConnectionError, requests.exceptions.Timeout):
            # A read timeout is not a ConnectionError in requests, so it has
            # to be listed explicitly to be retried as well.
            if attempt >= MAX_RETRIES:
                raise
            wait = BACKOFF_BASE ** attempt
            log.info("Erro de ligação CP4 %s offset=%d; retry em %.1fs",
                     cp4, offset, wait)

        time.sleep(wait)
        attempt += 1


# ---------------------------------------------------------------------#
# 4.  Download **de todo** um CP4  (thread worker)                     #
#     — actualiza a barra global a cada página                         #
# ---------------------------------------------------------------------#
def _fetch_cp4(cp4: str, pbar: tqdm) -> pd.DataFrame:
    """Download every page of one postal code (thread worker).

    Pages are requested sequentially, advancing ``offset`` by ``PAGE_LIMIT``.
    The loop stops at the first empty page or at the first page holding fewer
    than ``PAGE_LIMIT`` records. A short page can only be the last one, so
    the extra request and pause that would merely confirm the end are skipped.

    Args:
        cp4: Postal code to download.
        pbar: Shared progress bar, advanced by one tick per received page.

    Returns:
        A DataFrame with one row per record (empty when the API returned
        nothing for this postal code).

    Raises:
        Whatever ``_fetch_page`` raises; rows collected so far are discarded.
    """
    offset = 0
    rows: list[dict] = []
    log.info("Início CP4 %s", cp4)

    while True:
        page = _fetch_page(cp4, offset)
        if not page:
            break
        rows.extend(page)
        pbar.update(1)                     # one page completed
        if len(page) < PAGE_LIMIT:
            break                          # short page -> nothing left to fetch
        offset += PAGE_LIMIT
        time.sleep(SLEEP_BETWEEN_PAGES)    # be gentle with the API

    log.info("Fim CP4 %s — %d registos", cp4, len(rows))
    return pd.DataFrame.from_records(rows)


# ---------------------------------------------------------------------#
# 5.  Orquestração do download                                         #
# ---------------------------------------------------------------------#
def _download_all(unique_cp4s: List[str]) -> pd.DataFrame:
    """Download all postal codes concurrently and stack the results.

    One task per postal code is submitted to a pool of ``MAX_WORKERS``
    threads. The progress bar counts received **pages** and has no total,
    because the number of pages is not known in advance.

    A postal code whose download fails is logged, with its code and the
    traceback, and then skipped, so one bad CP4 does not abort the whole run.
    Empty results are left out of the concatenation.

    Args:
        unique_cp4s: Postal codes to download.

    Returns:
        All downloaded records in a single DataFrame, or an empty DataFrame
        when nothing could be downloaded.
    """
    frames: list[pd.DataFrame] = []

    with tqdm(desc="⬇️ páginas", unit="pág", dynamic_ncols=True) as pbar, \
         cf.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:

        # Remember which postal code each future belongs to, so that a
        # failure can be attributed to it.
        futures = {
            pool.submit(_fetch_cp4, cp4, pbar): cp4 for cp4 in unique_cp4s
        }

        for fut in cf.as_completed(futures):
            cp4 = futures[fut]
            try:
                frame = fut.result()
            except Exception:
                # Best effort: report the failing postal code and carry on.
                log.exception("Falha no download do CP4 %s", cp4)
                continue
            if not frame.empty:
                frames.append(frame)

    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()


# ---------------------------------------------------------------------#
# 6.  Interpolação de Outubro                                          #
# ---------------------------------------------------------------------#
def _interpolar_outubro(df_raw: pd.DataFrame) -> pd.DataFrame:
    df = df_raw.copy()
    df["Data/Hora"] = pd.to_datetime(df["Data/Hora"], errors="coerce")
    df = df.dropna(subset=["Data/Hora"])
    df["Mês"] = df["Data/Hora"].dt.month
    df["Dia"] = df["Data/Hora"].dt.day
    df["Hora"] = df["Data/Hora"].dt.hour

    novas: list[dict] = []

    for codigo in df["Código Postal"].unique():
        df_cp = df[df["Código Postal"] == codigo]
        df_sept, df_nov = df_cp[df_cp["Mês"] == 9], df_cp[df_cp["Mês"] == 11]
        if df_sept.empty or df_nov.empty:
            continue

        m_sept = df_sept.groupby(["Dia", "Hora"])["Energia ativa (kWh)"].mean()
        m_nov = df_nov.groupby(["Dia", "Hora"])["Energia ativa (kWh)"].mean()
        df_r = pd.concat([m_sept.rename("set"), m_nov.rename("nov")], axis=1).dropna()
        if df_r.empty:
            continue

        df_r["out"] = df_r["set"] * (1 + ((df_r["nov"] / df_r["set"]) - 1) * 0.5)
        df_r = df_r.reset_index()

        for _, row in df_r.iterrows():
            dia, hora = int(row["Dia"]), int(row["Hora"])
            if df_cp.query("Mês==10 and Dia==@dia and Hora==@hora").empty:
                novas.append({
                    "Data/Hora": pd.Timestamp(2023, 10, dia, hora),
                    "Energia ativa (kWh)": row["out"],
                    "Código Postal": codigo,
                })

        # duplicar 30→31
        df30 = df_cp.query("Mês==10 and Dia==30")
        for _, row in df30.iterrows():
            hora = int(row["Hora"])
            if df_cp.query("Mês==10 and Dia==31 and Hora==@hora").empty:
                novas.append({
                    "Data/Hora": pd.Timestamp(2023, 10, 31, hora),
                    "Energia ativa (kWh)": row["Energia ativa (kWh)"],
                    "Código Postal": codigo,
                })

    if novas:
        df = pd.concat([df, pd.DataFrame(novas)], ignore_index=True)

    return df.sort_values(["Código Postal", "Data/Hora"]).reset_index(drop=True)


# ---------------------------------------------------------------------#
# 7.  Pipeline principal                                               #
# ---------------------------------------------------------------------#
def obter_consumos_normalizados(df_cp4: pd.DataFrame) -> pd.DataFrame:
    """Download, clean and normalise daily consumption per postal code.

    Steps:
        1. Download every postal code listed in ``df_cp4["CP4"]``.
        2. Rename the API fields to the column names used internally.
        3. Fill in missing October readings (``_interpolar_outubro``).
        4. Force every timestamp into the year 2024.
        5. Average the readings per postal code and day, then divide by the
           same postal code's average on 10-Feb-2024.

    Args:
        df_cp4: Frame with a ``CP4`` column holding the postal codes.

    Returns:
        A frame with the columns ``Código Postal``, ``Data`` and
        ``Valor normalizado``, sorted by postal code and date. Postal codes
        with no reading on the reference day get NaN values, and those whose
        reference is zero get NaN/inf; both cases are reported with a
        warning in the log.

    Raises:
        RuntimeError: The download produced no data at all.
        KeyError: A required field is missing from the API records.
    """
    col_cp = "Código Postal"
    col_ts = "Data/Hora"
    col_kwh = "Energia ativa (kWh)"
    data_ref = dt.date(2024, 2, 10)

    cps = df_cp4["CP4"].astype(str).unique().tolist()
    log.info("CP4s a descarregar: %s", cps)

    raw = _download_all(cps)
    if raw.empty:
        raise RuntimeError("A API não devolveu dados utilizáveis.")

    # Known API field names -> internal column names. Several aliases map to
    # the same target, so only the first alias found is renamed; renaming
    # two of them would create duplicate column names.
    mapping = {
        "codigo_postal": col_cp,
        "cp4": col_cp,
        "data_hora": col_ts,
        "consumo_kwh": col_kwh,
        "energia_ativa_kwh": col_kwh,
    }
    ocupadas = set(raw.columns)
    renomear: dict[str, str] = {}
    for origem, destino in mapping.items():
        if origem in raw.columns and destino not in ocupadas:
            renomear[origem] = destino
            ocupadas.add(destino)
    raw = raw.rename(columns=renomear)

    obrig = {col_cp, col_ts, col_kwh}
    faltam = obrig - set(raw.columns)
    if faltam:
        raise KeyError(f"Campos em falta na API: {', '.join(sorted(faltam))}")

    df_full = _interpolar_outubro(raw)
    df_full[col_ts] = pd.to_datetime(df_full[col_ts])
    # 2024 is a leap year, so the replacement is valid for every calendar day.
    df_full[col_ts] = df_full[col_ts].apply(lambda d: d.replace(year=2024))

    df_full["Data"] = df_full[col_ts].dt.date
    diarios = (
        df_full.groupby([col_cp, "Data"])[col_kwh]
        .mean()
        .reset_index()
    )

    # Reference value: each postal code's daily mean on the reference day.
    ref = (
        diarios[diarios["Data"] == data_ref]
        .rename(columns={col_kwh: "Ref"})
        [[col_cp, "Ref"]]
    )
    diarios = diarios.merge(ref, on=col_cp, how="left")

    # Make unusable references visible instead of failing silently.
    sem_ref = (
        diarios.loc[diarios["Ref"].isna() | diarios["Ref"].eq(0), col_cp]
        .unique()
        .tolist()
    )
    if sem_ref:
        log.warning(
            "CP4s sem referência válida em %s (valor normalizado NaN/inf): %s",
            data_ref, sem_ref,
        )

    diarios["Valor normalizado"] = diarios[col_kwh] / diarios["Ref"]

    log.info("Processamento concluído.")
    return (
        diarios[[col_cp, "Data", "Valor normalizado"]]
        .sort_values([col_cp, "Data"])
        .reset_index(drop=True)
    )

In [None]:
# Driver cell: read the list of postal codes (the pipeline reads its "CP4"
# column) and run the full download + normalisation pipeline.
df_base = pd.read_csv(r"./data/perc_cp_PTD.csv")
df_final = obter_consumos_normalizados(df_base)

2025-06-03 15:47:46,643 [INFO] CP4s a descarregar: ['2520', '4910', '2560', '2705', '2580', '2710', '2755', '2785', '2750', '2640', '2970', '2590', '2615', '2645', '2635', '2765', '2565', '2754', '2655', '2780', '2825', '2829', '2775', '2715', '2665', '2756', '4635', '2659', '2550', '2649', '2714', '2725', '2530', '2769', '1685', '2050', '2525', '2729', '2709', '7050', '2644', '2745', '2789', '2779', '2735', '2639', '2670', '2490', '2510', '2605', '2740', '2790', '2700', '2630', '2870', '2784', '1800', '2744', '2739', '2760', '2770', '2730', '2600', '2660', '2774', '2680', '2734', '2749', '2650', '1750', '2761', '1495', '1499', '2610', '2720', '2609', '2795', '2799', '2500', '2794', '2810', '2820', '2900', '2704', '2654', '2910', '1400', '2540', '5160', '2724', '2460', '2614', '2695', '2675', '1675', '4620', '1449', '9325', '2620', '2890', '7300', '5320', '2685', '2625', '9950', '1500', '1950', '8650', '8670', '1990', '1300', '2925', '2855', '1600', '4550', '1050', '1549', '2594', '483

⬇️ páginas: 0pág [00:00, ?pág/s]

2025-06-03 15:47:46,659 [INFO] Início CP4 2520
2025-06-03 15:47:46,661 [INFO] Início CP4 4910
2025-06-03 15:47:46,661 [INFO] Início CP4 2560
2025-06-03 15:47:46,663 [INFO] Início CP4 2705
2025-06-03 15:47:46,664 [INFO] Início CP4 2580
2025-06-03 15:47:46,664 [INFO] Início CP4 2710
2025-06-03 15:56:26,264 [INFO] Fim CP4 2710 — 8016 registos
2025-06-03 15:56:26,284 [INFO] Início CP4 2755
2025-06-03 15:56:31,073 [INFO] Fim CP4 2560 — 8016 registos
2025-06-03 15:56:31,088 [INFO] Início CP4 2785
2025-06-03 15:56:32,408 [INFO] Fim CP4 2580 — 8016 registos
2025-06-03 15:56:32,420 [INFO] Início CP4 2750
2025-06-03 15:56:33,033 [INFO] Fim CP4 4910 — 8016 registos
2025-06-03 15:56:33,047 [INFO] Início CP4 2640
2025-06-03 15:56:33,616 [INFO] Fim CP4 2520 — 8016 registos
2025-06-03 15:56:33,632 [INFO] Início CP4 2970
2025-06-03 15:56:34,483 [INFO] Fim CP4 2705 — 8016 registos
2025-06-03 15:56:34,500 [INFO] Início CP4 2590
2025-06-03 16:05:20,819 [INFO] Fim CP4 2755 — 8016 registos
2025-06-03 16:05