In [None]:
# TESTE DE CONEXÃO, DESCOMENTAR EM AMBIENTE SAGEMAKER
# Este script testa se o ambiente do SageMaker tem as permissões corretas para acessar um bucket S3.
# Ele tenta listar os buckets, escrever um arquivo de teste, ler esse arquivo e depois removê-lo.
# Se todas as operações forem bem-sucedidas, o ambiente está corretamente configurado.
# Caso contrário, mensagens de erro ajudarão a diagnosticar o problema.

# import boto3
# import uuid

# # Substitua pelo nome do seu bucket
# BUCKET_NAME = 'labriscoenergeticomod2'
# # Defina o "caminho" (prefixo) da pasta que você deseja testar
# FOLDER_PATH = 'raw'

# # Cria um cliente S3
# s3_client = boto3.client('s3')

# print("--- Iniciando o teste de permissões do S3 ---")

# try:
#     # 1. Tentar listar os buckets (verifica a permissão 's3:ListAllMyBuckets')
#     print("Tentando listar os buckets...")
#     response = s3_client.list_buckets()
#     buckets = [b['Name'] for b in response['Buckets']]
#     print(f"Buckets acessíveis: {buckets}")

#     # 2. Tentar escrever um arquivo de teste no bucket e pasta especificados
#     print(f"\nTentando escrever um arquivo de teste em s3://{BUCKET_NAME}/{FOLDER_PATH}...")
    
#     # Gera um nome de arquivo único para evitar conflitos
#     test_key = f"{FOLDER_PATH}/teste_acesso_{uuid.uuid4()}.txt"
#     test_content = "Este é um arquivo de teste para verificar as permissões de escrita do SageMaker."

#     s3_client.put_object(
#         Bucket=BUCKET_NAME,
#         Key=test_key,
#         Body=test_content.encode('utf-8')
#     )

#     print(f"\n✅ SUCESSO! O arquivo de teste '{test_key}' foi criado com sucesso.")
#     print(f"Você pode verificar o arquivo no S3 em: https://s3.console.aws.amazon.com/s3/object/{BUCKET_NAME}?prefix={test_key}")

#     # 3. Tentar ler o arquivo de teste (verifica a permissão 's3:GetObject')
#     print("\nTentando ler o arquivo de teste...")
#     obj = s3_client.get_object(
#         Bucket=BUCKET_NAME,
#         Key=test_key
#     )
#     read_content = obj['Body'].read().decode('utf-8')
#     print(f"Conteúdo lido: '{read_content}'")
    
#     # 4. Tentar remover o arquivo de teste (verifica a permissão 's3:DeleteObject')
#     print("\nTentando remover o arquivo de teste...")
#     s3_client.delete_object(
#         Bucket=BUCKET_NAME,
#         Key=test_key
#     )
#     print("✅ SUCESSO! O arquivo de teste foi removido.")

#     print("\nO ambiente do SageMaker tem as permissões de leitura, escrita e exclusão corretas para o bucket.")

# except Exception as e:
#     print(f"\n❌ ERRO: Falha no teste de acesso. Detalhes do erro:")
#     print(e)
#     if "Access Denied" in str(e):
#         print("\nO erro 'Access Denied' indica que o role IAM do seu notebook não tem permissão para realizar esta ação no S3.")
#     elif "Invalid bucket name" in str(e):
#         print("\nO erro 'Invalid bucket name' indica que o nome do bucket está incorreto. Verifique a variável BUCKET_NAME.")
#     else:
#         print("\nOcorreu um erro inesperado. Verifique a configuração do seu bucket e as políticas do seu role IAM.")

--- Iniciando o teste de permissões do S3 ---
Tentando listar os buckets...
Buckets acessíveis: ['amazon-sagemaker-101859807606-us-east-2-29c5f0787bb5', 'labriscoenergeticomod2']

Tentando escrever um arquivo de teste em s3://labriscoenergeticomod2/raw...

✅ SUCESSO! O arquivo de teste 'raw/teste_acesso_d48d8e23-f594-4748-bfc0-3199bfb1311f.txt' foi criado com sucesso.
Você pode verificar o arquivo no S3 em: https://s3.console.aws.amazon.com/s3/object/labriscoenergeticomod2?prefix=raw/teste_acesso_d48d8e23-f594-4748-bfc0-3199bfb1311f.txt

Tentando ler o arquivo de teste...
Conteúdo lido: 'Este é um arquivo de teste para verificar as permissões de escrita do SageMaker.'

Tentando remover o arquivo de teste...
✅ SUCESSO! O arquivo de teste foi removido.

O ambiente do SageMaker tem as permissões de leitura, escrita e exclusão corretas para o bucket.


In [5]:
import argparse
import io
import os
import re
import zipfile
import boto3
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from datetime import datetime

import pandas as pd
import requests

CKAN_BASE = "https://dados.ons.org.br/api/3/action"
SITE_BASE = CKAN_BASE.split("/api/3", 1)[0]

# --- Configurações de S3 CORRIGIDAS ---
S3_BUCKET = "labriscoenergeticomod2"
S3_FOLDER_RAW = "raw"
S3_FOLDER_BRONZE = "bronze"
S3_CLIENT = boto3.client("s3")

# Slugs preferidos no CKAN por dataset (evita selecionar pacotes de detalhamento)
PREFERRED_SLUG_BY_QUERY: Dict[str, str] = {
    "Balanço de Energia nos Subsistemas": "balanco_energia_subsistema_ho",
    "Intercâmbios Entre Subsistemas": "intercambio_nacional_ho",
    "ENA Diário por Subsistema": "ena_subsistema_di",
    "EAR Diário por Subsistema": "ear_subsistema_di",
    "Restrição de Operação por Constrained-off de Usinas Eólicas": "restricao_coff_eolica_tm",
    "Restrição de Operação por Constrained-off de Usinas Fotovoltaicas": "restricao_coff_fotovoltaica_tm",
    "Carga Verificada": "carga_verificada_tm",
}

# --- Funções auxiliares (sem alteração) ---
def _norm(s: str) -> str:
    """Normaliza texto (minúsculas, sem acentos, sem pontuação extra)."""
    import unicodedata
    s = s.strip().lower()
    s = unicodedata.normalize("NFKD", s)
    s = "".join(c for c in s if not unicodedata.combining(c))
    s = re.sub(r"[^a-z0-9]+", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def _s3_upload_file_from_memory(buffer: io.BytesIO, bucket_name: str, s3_key: str):
    """Faz o upload de um buffer de bytes para o S3."""
    S3_CLIENT.put_object(Body=buffer.getvalue(), Bucket=bucket_name, Key=s3_key)

def _is_s3_file_exists(bucket_name: str, s3_key: str) -> bool:
    """Verifica se um objeto existe no S3."""
    try:
        S3_CLIENT.head_object(Bucket=bucket_name, Key=s3_key)
        return True
    except S3_CLIENT.exceptions.ClientError:
        return False

def _s3_download_file_to_memory(bucket_name: str, s3_key: str) -> io.BytesIO:
    """Baixa um objeto do S3 para um buffer de bytes na memória."""
    obj = S3_CLIENT.get_object(Bucket=bucket_name, Key=s3_key)
    return io.BytesIO(obj["Body"].read())

def _pick_resource(
    resources: List[dict], prefer_formats: Tuple[str, ...] = ("CSV", "XLSX", "ZIP")
) -> Optional[dict]:
    """Escolhe o melhor recurso (CSV/XLSX/ZIP mais recente) entre os disponíveis."""
    if not resources:
        return None
    def key(r):
        fmt = (r.get("format") or r.get("mimetype") or "").upper()
        try:
            fmt_idx = prefer_formats.index(fmt)
        except ValueError:
            fmt_idx = len(prefer_formats)
        last = r.get("last_modified") or r.get("created") or ""
        return (fmt_idx, last)
    return sorted(resources, key=key)[0]

class CkanClient:
    """Cliente simplificado para interagir com a API CKAN do dados.ons.org.br."""
    def __init__(self, base: str = CKAN_BASE, timeout: int = 60):
        self.base = base.rstrip("/")
        self.timeout = timeout
    def _get(self, path: str, params: Optional[dict] = None) -> dict:
        url = f"{self.base}/{path.lstrip('/')}"
        r = requests.get(url, params=params, timeout=self.timeout)
        r.raise_for_status()
        j = r.json()
        if not j.get("success"):
            raise RuntimeError(f"CKAN call failed: {url}")
        return j["result"]
    def search_package(self, query: str, rows: int = 10) -> List[dict]:
        res = self._get("package_search", {"q": query, "rows": rows})
        return res.get("results", [])
    def show_package(self, package_id: str) -> dict:
        return self._get("package_show", {"id": package_id})

@dataclass
class DatasetSpec:
    """Especificação de dataset a baixar do CKAN."""
    query: str
    out_name: str
    note: str = ""
    resource_filter: Optional[str] = None

DEFAULT_SPECS: Dict[str, DatasetSpec] = {
    "balanco": DatasetSpec(
        query="Balanço de Energia nos Subsistemas",
        out_name="ons_balanco_subsistema_horario.csv",
        note="Usado para derivar geração por fonte (hidro/term/eolica/fv).",
        resource_filter=r"(?i)hor(a|á)ria|hour|hora",
    ),
    "intercambio": DatasetSpec(
        query="Intercâmbios Entre Subsistemas",
        out_name="ons_intercambios_entre_subsistemas_horario.csv",
        note="Fluxos entre subsistemas para calcular import/export do SE/CO.",
        resource_filter=r"(?i)hor(a|á)ria|hour|hora",
    ),
    "ena": DatasetSpec(
        query="ENA Diário por Subsistema",
        out_name="ons_ena_diario_subsistema.csv",
        note="ENA diária por subsistema (mwmed).",
        resource_filter=r"(?i)di(á|a)rio|daily",
    ),
    "ear": DatasetSpec(
        query="EAR Diário por Subsistema",
        out_name="ons_ear_diario_subsistema.csv",
        note="EAR diária por subsistema (%).",
        resource_filter=r"(?i)di(á|a)rio|daily",
    ),
    "corte_eolica": DatasetSpec(
        query="Restrição de Operação por Constrained-off de Usinas Eólicas",
        out_name="ons_constrained_off_eolica_mensal.csv",
        note="Cortes eólicos (mensal).",
        resource_filter=r"(?i)mensal|monthly",
    ),
    "corte_fv": DatasetSpec(
        query="Restrição de Operação por Constrained-off de Usinas Fotovoltaicas",
        out_name="ons_constrained_off_fv_mensal.csv",
        note="Cortes FV (mensal).",
        resource_filter=r"(?i)mensal|monthly",
    ),
    "carga": DatasetSpec(
        query="Carga Verificada",
        out_name="ons_carga.csv",
        note="Carga verificada (horária ou diária).",
        resource_filter=None,
    ),
}

def _parse_since_to_date(since: Optional[str]) -> Optional[str]:
    """Converte um ano ou ano-mês em uma string de data completa."""
    if not since:
        return None
    try:
        parts = since.split("-")
        if len(parts) == 1 and len(parts[0]) == 4:
            return f"{int(parts[0]):04d}-01-01"
        elif len(parts) >= 2:
            return f"{int(parts[0]):04d}-{int(parts[1]):02d}-01"
    except Exception:
        return None
    return None

def fetch_carga_api(
    out_s3_key: str,
    since: Optional[str] = None,
    until: Optional[str] = None,
    area: str = "SECO",
    overwrite: bool = False,
    verbose: bool = True,
) -> Optional[str]:
    """Baixa Carga Verificada via API e salva em S3."""
    # CORRIGIDO: Define a chave S3 completa combinando a pasta e o nome do arquivo
    s3_key_full = f"{S3_FOLDER_RAW}/{out_s3_key}"

    if _is_s3_file_exists(S3_BUCKET, s3_key_full) and not overwrite:
        if verbose:
            print(
                f"[CKAN] Já existe s3://{S3_BUCKET}/{s3_key_full}; pulando (use --overwrite para baixar novamente)"
            )
        return s3_key_full
    
    start_str = _parse_since_to_date(since) or (datetime.utcnow().strftime("%Y-%m-01"))
    start = pd.to_datetime(start_str)
    end = (
        pd.to_datetime(until)
        if until
        else pd.to_datetime(datetime.utcnow().strftime("%Y-%m-%d"))
    )
    if end < start:
        end = start

    base_url = "https://apicarga.ons.org.br/prd/cargaverificada"
    step = pd.Timedelta(days=89)

    dfs = []
    cur = start
    while cur <= end:
        j_ini = cur.strftime("%Y-%m-%d")
        j_fim = min(cur + step, end).strftime("%Y-%m-%d")
        params = {"dat_inicio": j_ini, "dat_fim": j_fim, "cod_areacarga": area}
        if verbose:
            print(f"[ONS API] Carga {j_ini} -> {j_fim} ({area})")
        try:
            r = requests.get(base_url, params=params, timeout=(15, 60))
            r.raise_for_status()
            j = r.json()
            if isinstance(j, dict):
                arr = next((v for v in j.values() if isinstance(v, list)), [])
            elif isinstance(j, list):
                arr = j
            else:
                arr = []
            if not arr:
                cur += step + pd.Timedelta(days=1)
                continue
            df = pd.DataFrame(arr)
            dfs.append(df)
        except Exception as e:
            if verbose:
                print(f"[ONS API] Falha {j_ini}->{j_fim}: {e}")
        cur += step + pd.Timedelta(days=1)

    if not dfs:
        if verbose:
            print("[ONS API] Nenhum dado retornado para Carga.")
        return None

    raw = pd.concat(dfs, ignore_index=True)
    buffer = io.BytesIO()
    raw.to_csv(buffer, index=False)
    buffer.seek(0)
    # CORRIGIDO: Usa a variável S3_BUCKET e a chave completa
    _s3_upload_file_from_memory(buffer, S3_BUCKET, s3_key_full)
    return s3_key_full

def _download_resource_to_memory(url: str) -> io.BytesIO:
    """Baixa um recurso de uma URL para um buffer de bytes na memória."""
    with requests.get(url, stream=True, timeout=(15, 300)) as r:
        r.raise_for_status()
        total = int(r.headers.get("Content-Length", 0))
        read = 0
        last_mb_print = 0
        buffer = io.BytesIO()
        for chunk in r.iter_content(chunk_size=1024 * 1024):  # 1MB
            if not chunk:
                continue
            buffer.write(chunk)
            read += len(chunk)
            if total and (read - last_mb_print) >= 50 * 1024 * 1024:
                mb = read / (1024 * 1024)
                tot_mb = total / (1024 * 1024)
                print(f"  [download] {mb:.0f}/{tot_mb:.0f} MB")
                last_mb_print = read
        buffer.seek(0)
        return buffer

def _maybe_convert_to_csv_in_memory(in_buffer: io.BytesIO, file_url: str) -> io.BytesIO:
    """Converte um arquivo (XLS, XLSX, ZIP) para um buffer CSV em memória."""
    ext = Path(file_url).suffix.lower()
    df = None
    if ext == ".csv":
        in_buffer.seek(0)
        try:
            df = pd.read_csv(in_buffer, sep=None, engine="python")
        except Exception:
            in_buffer.seek(0)
            df = pd.read_csv(in_buffer, encoding='latin1', sep=None, engine="python")
    elif ext in (".xlsx", ".xls"):
        df = pd.read_excel(in_buffer)
    elif ext == ".zip":
        with zipfile.ZipFile(in_buffer, "r") as z:
            infos = sorted(z.infolist(), key=lambda i: i.file_size, reverse=True)
            choice = None
            for info in infos:
                name = info.filename.lower()
                if name.endswith(".csv") or name.endswith(".xlsx"):
                    choice = info
                    break
            if choice is None:
                choice = infos[0]
            with z.open(choice) as f:
                content = f.read()
            if choice.filename.lower().endswith(".csv"):
                df = pd.read_csv(io.BytesIO(content), sep=None, engine="python")
            else:
                df = pd.read_excel(io.BytesIO(content))
    else:
        try:
            in_buffer.seek(0)
            df = pd.read_csv(in_buffer, sep=None, engine="python")
        except Exception:
            in_buffer.seek(0)
            try:
                df = pd.read_excel(in_buffer)
            except Exception:
                raise ValueError("Formato de arquivo não suportado")
    if df is not None:
        out_buffer = io.BytesIO()
        df.to_csv(out_buffer, index=False)
        out_buffer.seek(0)
        return out_buffer
    raise ValueError("Não foi possível converter o arquivo para CSV.")

def _append_csv_buffers(sources: List[io.BytesIO], target: io.BytesIO):
    """Concatena múltiplos buffers CSV em um único buffer `target`."""
    if not sources:
        raise ValueError("Nenhuma fonte para concatenar")
    wrote_any = False
    for i, src in enumerate(sources):
        src.seek(0)
        if not wrote_any:
            target.write(src.read())
            wrote_any = True
        else:
            src.readline()
            target.write(src.read())

def _parse_iso_dt(s: Optional[str]) -> Optional[datetime]:
    """Converte uma string de data (formato ISO) em um objeto datetime."""
    if not s:
        return None
    for fmt in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"):
        try:
            return datetime.strptime(s.split("Z")[0], fmt)
        except Exception:
            continue
    return None

def _extract_period_ym_from_resource(r: dict) -> Optional[Tuple[int, Optional[int]]]:
    """Extrai (ano, mes?) do recurso a partir do nome/descrição/URL ou metadados."""
    key = _period_key_from_resource(r)
    m = re.match(r"^(\d{4})(?:-(\d{2}))?$", key)
    if m:
        y = int(m.group(1))
        mth = int(m.group(2)) if m.group(2) else None
        return y, mth
    return None

def _parse_since_until(
    since: Optional[str], until: Optional[str]
) -> Tuple[Optional[Tuple[int, Optional[int]]], Optional[Tuple[int, Optional[int]]]]:
    """Analisa as strings 'since' e 'until' em tuplas (ano, mês)."""
    def _p(val: Optional[str]) -> Optional[Tuple[int, Optional[int]]]:
        if not val:
            return None
        m = re.match(r"^(\d{4})(?:-(\d{2}))?$", val)
        if not m:
            return None
        y = int(m.group(1))
        mth = int(m.group(2)) if m.group(2) else None
        return y, mth
    return _p(since), _p(until)

def _resource_matches_since_until(
    r: dict, since: Optional[str], until: Optional[str]
) -> bool:
    """Decide se um recurso entra no intervalo."""
    if not since and not until:
        return True
    r_ym = _extract_period_ym_from_resource(r)
    s_ym, u_ym = _parse_since_until(since, until)
    if r_ym:
        ry, rm = r_ym
        ok_since = True
        ok_until = True
        if s_ym:
            sy, sm = s_ym
            ok_since = (ry > sy) or (
                ry == sy and (sm is None or (rm is not None and rm >= sm))
            )
        if u_ym:
            uy, um = u_ym
            ok_until = (ry < uy) or (
                ry == uy and (um is None or (rm is not None and rm <= um))
            )
        return ok_since and ok_until
    name = (r.get("name") or "") + " " + (r.get("description") or "")
    ok_since = True if not since else (since in name)
    ok_until = True if not until else (until in name)
    return ok_since and ok_until

def _list_resources(
    client: CkanClient, query: str, resource_filter: Optional[str], verbose: bool
) -> Tuple[dict, List[dict]]:
    """Busca o pacote mais relevante para uma query e lista seus recursos."""
    pkgs = client.search_package(query, rows=50)
    if not pkgs:
        if verbose:
            print(f"[CKAN] Nenhum pacote encontrado para query: {query}")
        return {}, []
    qn = _norm(query)
    pref_slug = PREFERRED_SLUG_BY_QUERY.get(query, "").lower()
    def score(pkg):
        title = _norm(pkg.get("title", ""))
        s = 0
        if qn in title or title in qn:
            s += 10
        mm = pkg.get("metadata_modified") or ""
        name = (pkg.get("name") or "").lower()
        if pref_slug and pref_slug in name:
            s += 500
        if ("detalh" in title) or ("detail" in title) or ("detail" in name):
            s -= 200
        return (s, mm)
    pkg = None
    if pref_slug:
        for p in pkgs:
            if pref_slug in (p.get("name") or "").lower():
                pkg = p
                break
    if pkg is None:
        pkg = sorted(pkgs, key=score, reverse=True)[0]
    pkg_full = client.show_package(pkg["id"]) if "id" in pkg else pkg
    resources = pkg_full.get("resources", [])
    if resource_filter:
        rx = re.compile(resource_filter)
        filtered = [
            r
            for r in resources
            if rx.search((r.get("name") or "") + " " + (r.get("description") or ""))
        ]
        resources = filtered or resources
    return pkg_full, resources

def _pref_index(
    fmt: Optional[str], prefer: Tuple[str, ...] = ("CSV", "XLSX", "ZIP")
) -> int:
    """Retorna o índice de preferência de um formato de arquivo."""
    f = (fmt or "").upper()
    try:
        return prefer.index(f)
    except ValueError:
        return len(prefer)

_RE_YM = re.compile(r"(20\d{2})[-_/.]?(0[1-9]|1[0-2])")
_RE_Y = re.compile(r"(19\d{2}|20\d{2})")

def _period_key_from_resource(r: dict) -> str:
    """Extrai uma chave de período (YYYY-MM ou YYYY) do nome/descrição/URL do recurso."""
    name = (
        (r.get("name") or "")
        + " "
        + (r.get("description") or "")
        + " "
        + (r.get("url") or "")
    )
    m = _RE_YM.search(name)
    if m:
        return f"{m.group(1)}-{m.group(2)}"
    m = _RE_Y.search(name)
    if m:
        return m.group(1)
    lm = _parse_iso_dt(r.get("last_modified") or r.get("created"))
    if lm:
        return f"{lm.year:04d}-{lm.month:02d}"
    return (r.get("name") or "").strip() or (r.get("id") or "unknown")

def _group_prefer_by_period(
    resources: List[dict], prefer: Tuple[str, ...] = ("CSV", "XLSX", "ZIP")
) -> List[dict]:
    """Agrupa recursos por período e escolhe o melhor formato para cada período."""
    groups: Dict[str, List[dict]] = {}
    for r in resources:
        k = _period_key_from_resource(r)
        groups.setdefault(k, []).append(r)
    chosen: List[dict] = []
    for k, items in groups.items():
        items_sorted = sorted(
            items,
            key=lambda r: (
                _pref_index(r.get("format"), prefer),
                r.get("last_modified") or r.get("created") or "",
            ),
        )
        chosen.append(items_sorted[0])
    chosen = sorted(chosen, key=lambda r: _period_key_from_resource(r))
    return chosen

def _is_data_resource(r: dict) -> bool:
    """Retorna True se o recurso parece ser dado e não documentação."""
    fmt = (r.get("format") or r.get("mimetype") or "").upper()
    url = (r.get("url") or r.get("download_url") or "").lower()
    name = ((r.get("name") or "") + " " + (r.get("description") or "")).lower()
    if any(
        bad in name
        for bad in ["dicionario", "dictionary", "glossario", "metadado", "metadata"]
    ):
        return False
    if fmt in {"CSV", "XLSX", "XLS", "ZIP"}:
        return True
    if (
        url.endswith(".csv")
        or url.endswith(".xlsx")
        or url.endswith(".xls")
        or url.endswith(".zip")
    ):
        return True
    return False

def fetch_one(
    client: CkanClient,
    spec: DatasetSpec,
    out_s3_key: str,
    verbose: bool = True,
    since: Optional[str] = None,
    until: Optional[str] = None,
    overwrite: bool = False,
) -> Optional[str]:
    """Baixa um dataset do ONS e salva no S3."""
    # CORRIGIDO: Define a chave S3 completa
    s3_key_full = f"{S3_FOLDER_RAW}/{out_s3_key}"
    
    # CORRIGIDO: Verifica a existência com o nome do bucket correto e a chave completa
    if _is_s3_file_exists(S3_BUCKET, s3_key_full) and not overwrite:
        if verbose:
            print(
                f"[CKAN] Já existe s3://{S3_BUCKET}/{s3_key_full}; pulando (use --overwrite para baixar novamente)"
            )
        return s3_key_full
    pkg_full, resources = _list_resources(
        client, spec.query, spec.resource_filter, verbose
    )
    if not resources:
        return None
    resources = [r for r in resources if _is_data_resource(r)]
    if since or until:
        candidates = [
            r for r in resources if _resource_matches_since_until(r, since, until)
        ] or resources
        res = _pick_resource(candidates)
    else:
        res = _pick_resource(resources)
    if not res:
        if verbose:
            print(f"[CKAN] Nenhum recurso adequado para: {spec.query}")
        return None
    url = res.get("url") or res.get("download_url")
    if not url:
        if verbose:
            print(f"[CKAN] Recurso sem URL para: {spec.query}")
        return None
    if verbose:
        page_slug = pkg_full.get("name") or pkg_full.get("id") or ""
        page_url = f"{SITE_BASE}/dataset/{page_slug}" if page_slug else SITE_BASE
        res_name = (res.get("name") or "").strip()
        res_fmt = (res.get("format") or res.get("mimetype") or "").upper()
        res_last = res.get("last_modified") or res.get("created") or ""
        print(f"[CKAN] Pacote: {pkg_full.get('title','(sem título)')} -> {page_url}")
        print(f"[CKAN] Recurso: {res_name} [{res_fmt}] {res_last}")
        print(f"[CKAN] URL: {url}")
    if verbose:
        print(f"[CKAN] Baixando {spec.query} -> {s3_key_full}")
    try:
        downloaded_buffer = _download_resource_to_memory(url)
        csv_buffer = _maybe_convert_to_csv_in_memory(downloaded_buffer, url)
        # CORRIGIDO: Usa a variável S3_BUCKET e a chave completa
        _s3_upload_file_from_memory(csv_buffer, S3_BUCKET, s3_key_full)
    except Exception as e:
        if verbose:
            print(f"[CKAN] Falha no processamento: {e}")
        return None
    return s3_key_full

def fetch_many_and_concat(
    client: CkanClient,
    spec: DatasetSpec,
    out_s3_key: str,
    since: Optional[str] = None,
    until: Optional[str] = None,
    verbose: bool = True,
    overwrite: bool = False,
) -> Optional[str]:
    """Baixa todos os recursos do pacote, concatena e salva no S3."""
    # CORRIGIDO: Define a chave S3 completa
    s3_key_full = f"{S3_FOLDER_RAW}/{out_s3_key}"

    # CORRIGIDO: Verifica a existência com o nome do bucket correto e a chave completa
    if _is_s3_file_exists(S3_BUCKET, s3_key_full) and not overwrite:
        if verbose:
            print(
                f"[CKAN] Já existe s3://{S3_BUCKET}/{s3_key_full}; pulando concatenação (use --overwrite)"
            )
        return s3_key_full
    pkg_full, resources = _list_resources(
        client, spec.query, spec.resource_filter, verbose
    )
    if not resources:
        return None
    def rkey(r):
        return r.get("last_modified") or r.get("created") or r.get("name") or ""
    resources = sorted(resources, key=rkey)
    resources = [r for r in resources if _is_data_resource(r)]
    resources = [r for r in resources if _resource_matches_since_until(r, since, until)]
    resources = _group_prefer_by_period(resources)
    csv_buffers = []
    for r in resources:
        url = r.get("url") or r.get("download_url")
        if not url:
            continue
        if verbose:
            print(f"[CKAN] Recurso mensal: {r.get('name','(sem nome)')} -> {url}")
        try:
            downloaded_buffer = _download_resource_to_memory(url)
            csv_buffer = _maybe_convert_to_csv_in_memory(downloaded_buffer, url)
            csv_buffers.append(csv_buffer)
        except Exception as e:
            if verbose:
                print(f"[CKAN] Falha no processamento do recurso: {e}")
            continue
    if not csv_buffers:
        if verbose:
            print(f"[CKAN] Nenhum recurso para concatenar para: {spec.query}")
        return None
    final_buffer = io.BytesIO()
    _append_csv_buffers(csv_buffers, final_buffer)
    final_buffer.seek(0)
    # CORRIGIDO: Usa a variável S3_BUCKET e a chave completa
    _s3_upload_file_from_memory(final_buffer, S3_BUCKET, s3_key_full)
    return s3_key_full

def fetch_all(
    datasets: Optional[List[str]] = None,
    verbose: bool = True,
    since: Optional[str] = None,
    until: Optional[str] = None,
    overwrite: bool = False,
) -> Dict[str, Optional[str]]:
    """Baixa todos os datasets especificados (ou os padrão) e salva no S3."""
    if datasets is None:
        datasets = list(DEFAULT_SPECS.keys())
    client = CkanClient()
    results: Dict[str, Optional[str]] = {}
    for key in datasets:
        spec = DEFAULT_SPECS[key]
        out_s3_key = spec.out_name
        try:
            if key in {
                "corte_eolica",
                "corte_fv",
                "balanco",
                "intercambio",
                "ena",
                "ear",
            }:
                p = fetch_many_and_concat(
                    client,
                    spec,
                    out_s3_key,
                    since=since,
                    until=until,
                    verbose=verbose,
                    overwrite=overwrite,
                )
            elif key == "carga":
                p = fetch_carga_api(
                    out_s3_key,
                    since=since,
                    until=until,
                    overwrite=overwrite,
                    verbose=verbose,
                )
            else:
                p = fetch_one(
                    client,
                    spec,
                    out_s3_key,
                    verbose=verbose,
                    since=since,
                    until=until,
                    overwrite=overwrite,
                )
        except Exception as e:
            print(f"[ERRO] {key}: {e}")
            p = None
        results[key] = p
    return results

In [6]:
# Este código deve ser executado em uma CÉLULA SEPARADA
# após a célula que contém todas as definições de funções.

print("Iniciando o processo de download e upload para o S3...")

# Para baixar todos os datasets definidos em DEFAULT_SPECS
download_results = fetch_all(
    datasets=list(DEFAULT_SPECS.keys()),
    since="2018-01",  # Exemplo: Baixar dados a partir de janeiro de 2024
    overwrite=True    # Força o download mesmo se os arquivos já existirem
)

print("\n--- Relatório Final de Processamento ---")
# Imprime o resultado do download para verificar quais arquivos foram salvos
for key, s3_key_full in download_results.items():
    if s3_key_full:
        print(f"✅ SUCESSO! Dataset '{key}' salvo em: s3://{S3_BUCKET}/{s3_key_full}")
    else:
        print(f"❌ FALHA! Dataset '{key}' não foi salvo.")

Iniciando o processo de download e upload para o S3...
[CKAN] Recurso mensal: Balanco_de_Energia_Subsistema-2019 -> https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/balanco_energia_subsistema_ho/BALANCO_ENERGIA_SUBSISTEMA_2019.csv
[CKAN] Recurso mensal: Balanco_de_Energia_Subsistema-2020 -> https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/balanco_energia_subsistema_ho/BALANCO_ENERGIA_SUBSISTEMA_2020.csv
[CKAN] Recurso mensal: Balanco_de_Energia_Subsistema-2021 -> https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/balanco_energia_subsistema_ho/BALANCO_ENERGIA_SUBSISTEMA_2021.csv
[CKAN] Recurso mensal: Balanco_de_Energia_Subsistema-2022 -> https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/balanco_energia_subsistema_ho/BALANCO_ENERGIA_SUBSISTEMA_2022.csv
[CKAN] Recurso mensal: Balanco_de_Energia_Subsistema-2023 -> https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/balanco_energia_subsistema_ho/BALANCO_ENERGIA_SUBSISTEMA_2023.csv
[CKAN] Recurso mensal: Balanco_de_E