In [0]:
# Declare every notebook input as a free-text widget with an empty default.
# NOTE(review): "token" is a plain text widget, so its value may be visible in the
# run/job parameters UI; consider reading it from a Databricks secret scope instead.
for _nome_widget in ("url", "token", "qtd_meses", "tipo_data", "volume", "catalog"):
    dbutils.widgets.text(_nome_widget, "")
del _nome_widget

In [0]:
# Read the notebook parameters from the widgets.
# Fail fast with a clear message when a required widget is empty:
# - an empty url makes every HTTP call fail, and the HTTP helper in this notebook
#   turns request failures into an empty result, so the run would "succeed"
#   while writing nothing;
# - an empty catalog/volume would break the SQL in the next cell with an
#   obscure parse error.
# NOTE(review): "token" is not validated here; confirm whether the API accepts
# requests without it.
url = dbutils.widgets.get("url")
token = dbutils.widgets.get("token")
qtd_meses_texto = dbutils.widgets.get("qtd_meses")
tipo_data = dbutils.widgets.get("tipo_data")
volume = dbutils.widgets.get("volume")
catalog = dbutils.widgets.get("catalog")
schema = "raw"

_widgets_vazios = [
    nome
    for nome, valor in (
        ("url", url),
        ("qtd_meses", qtd_meses_texto),
        ("volume", volume),
        ("catalog", catalog),
    )
    if not valor.strip()
]
if _widgets_vazios:
    raise ValueError(f"Required widget(s) not set: {', '.join(_widgets_vazios)}")

try:
    qtd_meses = int(qtd_meses_texto)
except ValueError as exc:
    raise ValueError(
        f"Widget 'qtd_meses' must be an integer, got {qtd_meses_texto!r}"
    ) from exc

# A negative month count would put the start of the query window after its end.
if qtd_meses < 0:
    raise ValueError(f"Widget 'qtd_meses' must be >= 0, got {qtd_meses}")

In [0]:
def _quote_identifier(nome: str) -> str:
    """Backtick-quote a SQL identifier supplied by a widget.

    Embedded backticks are doubled, so names containing hyphens, spaces or
    backticks can neither break nor inject into the generated statement.
    """
    return "`" + nome.replace("`", "``") + "`"


# Select the catalog, then make sure the target volume exists.
spark.sql(f"USE CATALOG {_quote_identifier(catalog)}")

# NOTE(review): the schema is assumed to already exist; only the volume is created here.
spark.sql(
    "CREATE VOLUME IF NOT EXISTS "
    f"{_quote_identifier(catalog)}.{_quote_identifier(schema)}.{_quote_identifier(volume)}"
)

In [0]:
from datetime import date, datetime
from dateutil.relativedelta import relativedelta
from typing import List, Dict


def obterPeriodo(qtd_meses: int) -> Dict:
    """Return the query window as ISO date strings.

    The window ends today and starts on the first day of the month that lies
    ``qtd_meses`` months before the current month.

    Args:
        qtd_meses: Number of whole months to go back.

    Returns:
        ``{"dataInicial": "YYYY-MM-01", "dataFinal": "YYYY-MM-DD"}``
    """
    formato = "%Y-%m-%d"
    hoje = date.today()

    # Express the current month as a month count, step back qtd_meses months,
    # then split it back into (year, zero-based month). Pinning day=1 means
    # month-length differences never matter.
    ano_inicio, mes_inicio_zero = divmod(hoje.year * 12 + (hoje.month - 1) - qtd_meses, 12)
    inicio = date(ano_inicio, mes_inicio_zero + 1, 1)

    return {"dataInicial": inicio.strftime(formato), "dataFinal": hoje.strftime(formato)}


def geraParametros(tipoData: str, meses=None, token_api=None, meses_vencimento: int = 12) -> dict:
    """Build the query parameters for the API request.

    Args:
        tipoData: Date type sent to the API as ``tipoData``. When it equals
            ``'data_vencimento'`` the end date is pushed ``meses_vencimento``
            months past today.
        meses: Months of history passed to ``obterPeriodo``. ``None`` (default)
            uses the notebook-level ``qtd_meses`` widget value.
        token_api: API token. ``None`` (default) uses the notebook-level
            ``token`` widget value.
        meses_vencimento: Forward extension, in months, of the end date for
            ``'data_vencimento'`` queries (default 12, the previous fixed value).

    Returns:
        dict with keys ``dataInicial``, ``dataFinal``, ``tipoData`` and ``token``.
    """
    # Fall back to the notebook globals so existing single-argument calls keep working.
    if meses is None:
        meses = qtd_meses
    if token_api is None:
        token_api = token

    periodo = obterPeriodo(meses)

    dados = {
        'dataInicial': periodo['dataInicial'],
        'dataFinal': periodo['dataFinal'],
        # Send the argument, not the global widget value, so the value sent to
        # the API always matches the branch taken below.
        'tipoData': tipoData,
        'token': token_api,
    }

    if tipoData == 'data_vencimento':
        data_final = date.fromisoformat(periodo['dataFinal'])
        nova_data = data_final + relativedelta(months=meses_vencimento)
        dados['dataFinal'] = nova_data.strftime('%Y-%m-%d')

    return dados

In [0]:
import requests
from typing import Dict, Optional


def chamar_api(url: str, parametros: Dict, pagina: int = 1, timeout: float = 60) -> Optional[requests.Response]:
    """GET one page from the API.

    Args:
        url: Endpoint URL.
        parametros: Base query parameters; copied, never mutated.
        pagina: Page number, sent as the ``pagina`` query parameter.
        timeout: Seconds passed to ``requests.get`` (default 60, as before).

    Returns:
        The response when the request completes and ``raise_for_status()`` does
        not raise. Returns ``None`` on any ``requests.RequestException``
        (connection error, timeout, 4xx/5xx status). Failures are logged instead
        of raised so callers can continue on a best-effort basis.
    """
    import logging

    params = parametros.copy()
    params["pagina"] = pagina

    try:
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()
        return response
    except requests.RequestException as e:
        # Log only the exception type and HTTP status. The exception text from
        # requests may include the full request URL, which carries the token
        # as a query parameter.
        status = getattr(getattr(e, "response", None), "status_code", None)
        logging.getLogger(__name__).warning(
            "API request failed (pagina=%s): %s, HTTP status=%s",
            pagina,
            type(e).__name__,
            status,
        )
        return None


def obter_qtd_paginas(url: str, parametros: dict) -> int:
    """Return the ``totalPages`` value reported on page 1, or 0 when unknown.

    Returns 0 when the request fails, the body is not valid JSON, the body is
    not a JSON object, or ``totalPages`` is missing or not convertible to int.
    Negative values are clamped to 0.
    """
    response = chamar_api(url, parametros, pagina=1)
    if not response:
        return 0

    try:
        corpo = response.json()
    except ValueError:
        return 0

    # A JSON array or scalar body has no .get(); treat it as "page count unknown"
    # instead of letting AttributeError escape.
    if not isinstance(corpo, dict):
        return 0

    try:
        return max(int(corpo.get('totalPages', 0)), 0)
    except (ValueError, TypeError):
        return 0

def _ler_corpo_json(response):
    """Return the parsed JSON object of a successful response, or None if unusable."""
    if not response:
        return None
    try:
        corpo = response.json()
    except ValueError:
        return None
    return corpo if isinstance(corpo, dict) else None


def _extrair_registros(corpo: dict) -> list:
    """Return the page's 'registros' list; a missing or non-list value counts as empty."""
    dados = corpo.get('registros', [])
    return dados if isinstance(dados, list) else []


def obter_todas_paginas(url: str, parametros: dict) -> list:
    """Fetch every page of the API and return the concatenated ``registros``.

    Page 1 is requested once and supplies both ``totalPages`` and its own
    records. Pages 2..totalPages are then requested in order. When
    ``totalPages`` is missing, invalid or <= 1, only page 1 is used.

    Best effort: pages whose request fails or whose body cannot be parsed are
    skipped and logged, so the returned list may be incomplete.
    """
    import logging

    logger = logging.getLogger(__name__)
    registros = []

    primeira = _ler_corpo_json(chamar_api(url, parametros, pagina=1))
    if primeira is None:
        logger.warning("Page 1 could not be fetched or parsed; no records returned")
        return registros
    registros.extend(_extrair_registros(primeira))

    try:
        total_paginas = max(int(primeira.get('totalPages', 0)), 0)
    except (ValueError, TypeError):
        total_paginas = 0

    paginas_com_falha = []
    for pagina in range(2, total_paginas + 1):
        corpo = _ler_corpo_json(chamar_api(url, parametros, pagina))
        if corpo is None:
            paginas_com_falha.append(pagina)
            continue
        registros.extend(_extrair_registros(corpo))

    if paginas_com_falha:
        logger.warning(
            "%d of %d page(s) failed and were skipped: %s",
            len(paginas_com_falha),
            total_paginas,
            paginas_com_falha,
        )

    return registros

In [0]:
import json

# Volume configuration
CATALOG = catalog
SCHEMA = schema
VOLUME = volume
VOLUME_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}"

# Build the request parameters and fetch every page from the API.
parametros = geraParametros(tipo_data)
registros = obter_todas_paginas(url, parametros)

# Take a single timestamp for the whole run, so every record in this batch
# gets the same _ingestion_timestamp and the file name matches it.
momento_execucao = datetime.now()
ingestion_timestamp = momento_execucao.isoformat()

# Add ingestion metadata to each record.
for item in registros:
    item["_data_referencia"] = parametros["dataInicial"]
    item["_ingestion_timestamp"] = ingestion_timestamp

# Persist the raw JSON to the volume.
if registros:
    data_execucao = momento_execucao.strftime("%Y%m%d_%H%M%S")
    file_path = f"{VOLUME_PATH}/{VOLUME}_{data_execucao}.json"

    json_content = json.dumps(registros, ensure_ascii=False, indent=2)

    dbutils.fs.put(
        file_path,
        json_content,
        overwrite=True
    )
else:
    print("No records returned by the API; nothing written to the volume.")