In [1]:
import os
import zipfile
import logging
import pandas as pd
import requests
from datetime import datetime
from time import perf_counter
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Reconfigurar logging
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Parâmetros
CHUNK_SIZE = 500_000
URL = "https://api-csvr.stg.cloud.cnj.jus.br/download_csv?tribunal=TJSP&indicador=&oj=&grau=&municipio=&ambiente=csv_p"

DESTINO_ZIP = "/lakehouse/default/Files/DATAJUD/ZIP/tjsp.zip"
DESTINO_EXTRAIDA = "/lakehouse/default/Files/DATAJUD/dados-extraidos"
DESTINO_PARQUET = "/lakehouse/default/Files/DATAJUD/lista-processos/tjsp_processos.parquet"

# Etapa 1 – Download

def download_zip(url, local_path, max_retries=5):
    logging.info("Etapa 1: Iniciando download do ZIP...")
    inicio = perf_counter()

    retry_strategy = Retry(
        total=max_retries,
        backoff_factor=5,
        status_forcelist=[500, 502, 503, 504],
        allowed_methods=["GET"]
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session = requests.Session()
    session.mount("https://", adapter)
    session.mount("http://", adapter)

    headers = {
        "User-Agent": "Mozilla/5.0",
        "Referer": "https://justica-em-numeros.cnj.jus.br/"
    }

    os.makedirs(os.path.dirname(local_path), exist_ok=True)

    with session.get(url, headers=headers, stream=True, timeout=180) as response:
        response.raise_for_status()
        total_size = int(response.headers.get("content-length", 0))
        total_mb = total_size / 1024 / 1024
        logging.info(f"Tamanho total do arquivo: {total_mb:.2f} MB")

        downloaded = 0
        with open(local_path, "wb") as f:
            for i, chunk in enumerate(response.iter_content(chunk_size=1024 * 1024)):
                if chunk:
                    f.write(chunk)
                    downloaded += len(chunk)
                    progresso = (downloaded / total_size) * 100 if total_size else 0
                    mb_baixado = downloaded / 1024 / 1024
                    logging.info(f"{progresso:.2f}% - {mb_baixado:.2f} MB baixados")

    logging.info(f"✅ ZIP salvo em: {local_path}")
    logging.info(f"Tempo total: {perf_counter() - inicio:.2f}s")


# Etapa 2 – Extração do ZIP local

def extrair_zip(zip_path, extract_dir):
    logging.info("Etapa 2: Extração do ZIP")
    inicio = perf_counter()

    os.makedirs(extract_dir, exist_ok=True)

    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        arquivos = zip_ref.namelist()
        logging.info(f"Arquivos no ZIP: {len(arquivos)}")

        for i, arquivo in enumerate(arquivos, 1):
            zip_ref.extract(arquivo, extract_dir)
            caminho_extraido = os.path.join(extract_dir, arquivo)
            tamanho_mb = os.path.getsize(caminho_extraido) / 1024 / 1024
            logging.info(f"Extraído {i}/{len(arquivos)}: {arquivo} ({tamanho_mb:.2f} MB)")

    logging.info(f"✅ Extração concluída para: {extract_dir}")
    logging.info(f"Tempo de extração: {perf_counter() - inicio:.2f}s")


# Etapa 3 – Processamento dos CSVs para Parquet

def process_csvs_to_parquet(input_dir, output_path, chunksize=CHUNK_SIZE):
    logging.info("Etapa 3: Processamento dos CSVs")
    inicio = perf_counter()

    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    arquivos = [os.path.join(input_dir, f) for f in os.listdir(input_dir) if f.endswith(".csv")]

    if not arquivos:
        logging.warning("Nenhum CSV encontrado em: " + input_dir)
        return

    lista_chunks = []
    total_processos_lidos = 0
    total_validos = 0

    for i, caminho in enumerate(arquivos, 1):
        logging.info(f"📂 ({i}/{len(arquivos)}) Lendo arquivo: {os.path.basename(caminho)}")
        try:
            for j, chunk in enumerate(pd.read_csv(caminho, sep=';', usecols=["Processo"], chunksize=chunksize, low_memory=False), 1):
                total_lido = len(chunk)
                total_processos_lidos += total_lido

                chunk["Processo"] = chunk["Processo"].astype(str).str.replace(r"[-.]", "", regex=True)
                chunk = chunk[~chunk["Processo"].str.contains("sigiloso", case=False, na=False)]
                validos = len(chunk)
                total_validos += validos

                logging.info(f"   ✅ Chunk {j}: {validos} válidos de {total_lido}")
                lista_chunks.append(chunk)
        except Exception as e:
            logging.warning(f"⚠️ Erro ao processar {caminho}: {e}")

    if not lista_chunks:
        logging.error("❌ Nenhum dado válido encontrado após a leitura dos CSVs.")
        return

    df_final = pd.concat(lista_chunks, ignore_index=True)
    df_final.drop_duplicates(subset="Processo", inplace=True)
    df_final["data_download"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    logging.info(f"- Total lido: {total_processos_lidos:,}")
    logging.info(f"- Total válido antes de deduplicação: {total_validos:,}")
    logging.info(f"- Total único após deduplicação: {len(df_final):,}")

    df_final.to_parquet(output_path, index=False)
    logging.info(f"✅ Arquivo Parquet salvo: {output_path}")
    logging.info(f"- Tempo total de processamento: {perf_counter() - inicio:.2f}s")


StatementMeta(, 4de3cd6e-a81c-4659-84ec-1c1160ce06d9, 3, Finished, Available, Finished)

In [None]:
download_zip(URL, DESTINO_ZIP)

In [None]:
extrair_zip(DESTINO_ZIP, DESTINO_EXTRAIDA)

In [None]:
process_csvs_to_parquet(DESTINO_EXTRAIDA, DESTINO_PARQUET)