### MVP - Construção de um pipeline de dados

  A pipeline consistem em cinco etapas bem definidas, como buscar, coletar, modelar, carregar e análisar os dados.

  Na primeira etapa que é a busca dos dados, foi escolhido dados abertos na web no site Portal da Transparencia do governo federal [https://portaldatransparencia.gov.br/].

  Dentre as inumeras bases de dados no site, a escolhida foram os dados dos aposentados do Banco Central do Brasil (BACEN) que encontram-se ativos.  
  O período de análise será de Janeiro de 2025 até Setembro de 2025.

  A proposta é conseguir responder as seguintes perguntas com a massa de dados escolhida:
  - Qual a faixa etária dos servidores?
  - Qual é a média de idade desses servidores?   
  - Quantos desses servidores estão ocupando cargos de alta gestão ou função comissionada? 
  - Qual foi o fluxo de aposentados na ativa no período? 
  - Quantos aposentados foram reativados a cada mês? 
  - Em quais departamentos a concentração de aposentados na ativa é maior? 

  A segunda etapa consistem em coletar esses dados do site e importar ele dentro da ferramaneta databricks.


In [0]:

%sql

-- Cria o catálogo principal para todos os seus dados do BACEN
CREATE CATALOG IF NOT EXISTS catalog_bacen
COMMENT 'Catálogo para dados públicos dos Servidores Aposentados Ativos do Banco do Brasil (BACEN).';

-- Opcional: Crie os esquemas (databases) Bronze, Silver e Gold dentro do novo catálogo
CREATE SCHEMA IF NOT EXISTS catalog_bacen.bronze;
CREATE SCHEMA IF NOT EXISTS catalog_bacen.silver;
CREATE SCHEMA IF NOT EXISTS catalog_bacen.gold;

-- 1. Criação do Volume para armazenar os arquivos
-- Assumimos o nome 'dados_servidores' para o Volume que armazenará seus arquivos.
CREATE VOLUME IF NOT EXISTS catalog_bacen.bronze.dados_servidores
COMMENT 'Volume para arquivos CSV brutos dos Servidores Aposentados Ativos do Banco do Brasil (BACEN).';

-- 2. Criação do Volume para armazenar as tabelas 
-- Assumimos o nome 'dados_servidores' para o Volume que armazenará suas tabelas.
CREATE VOLUME IF NOT EXISTS catalog_bacen.silver.dados_servidores
COMMENT 'Volume para armazenar as tabelas dos Servidores Aposentados Ativos do Banco do Brasil (BACEN).';

-- 2. Criação do Volume para armazenar o modelo estrela
-- Assumimos o nome 'dados_servidores' para o Volume que armazenará o modelo estrela.
CREATE VOLUME IF NOT EXISTS catalog_bacen.gold.dados_servidores
COMMENT 'Volume para armazenar o modelo estrela dos Servidores Aposentados Ativos do Banco do Brasil (BACEN).';

-- Define o catálogo recém-criado como o padrão para as próximas operações
USE CATALOG catalog_bacen;

In [0]:
import requests
import os
from zipfile import ZipFile
import shutil # Adicionado para manipulação de arquivos locais

# --- VARIÁVEIS DE CONFIGURAÇÃO ---
ZIP_URL = "https://portaldatransparencia.gov.br/download-de-dados/servidores/202501_Aposentados_BACEN/"
ZIP_FILENAME = "202501_Aposentados_BACEN.zip"

# Caminho do Volume UC (Destino dos CSVs)
UC_VOLUME_PATH = "/Volumes/catalog_bacen/bronze/dados_servidores/" 

# Diretórios Temporários no Driver
TEMP_DOWNLOAD_PATH = f"/tmp/{ZIP_FILENAME}"
TEMP_UNZIP_DIR = "/tmp/unzip_servidores_data"


# --- FLUXO DE TRABALHO CORRIGIDO E UNIFICADO ---
print(f"1. Iniciando o fluxo de ingestão...")

try:
    # A. DOWNLOAD e DESCOMPACTAR
    print(f"   -> Iniciando o download do arquivo ZIP: {ZIP_FILENAME}")
    response = requests.get(ZIP_URL, stream=True)
    response.raise_for_status()
    
    with open(TEMP_DOWNLOAD_PATH, "wb") as file:
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            file.write(chunk)
            
    os.makedirs(TEMP_UNZIP_DIR, exist_ok=True)
    with ZipFile(TEMP_DOWNLOAD_PATH, 'r') as zip_ref:
        zip_ref.extractall(TEMP_UNZIP_DIR)
        
    print("   -> Download e Descompactação concluídos.")
    
    # B. INGESTÃO SEGURA: Iterar e Fazer o Upload de Cada CSV
    print(f"2. Movendo os arquivos CSV para o Volume UC: {UC_VOLUME_PATH}")
    
    # Cria o diretório de destino no Volume (UC)
    dbutils.fs.mkdirs(UC_VOLUME_PATH)

    moved_count = 0
    
    for filename in os.listdir(TEMP_UNZIP_DIR):
        if filename.lower().endswith('.csv'):
            local_csv_path = os.path.join(TEMP_UNZIP_DIR, filename)
            remote_delta_path = UC_VOLUME_PATH + filename.replace(".csv", "_delta")
            
            print(f"   -> Processando: {filename}")
            
            # 1. Lê o conteúdo do arquivo CSV local como texto
            with open(local_csv_path, 'r', encoding='ISO-8859-1') as f:
                csv_content = f.read()

            # 2. Usa dbutils.fs.put para escrever o conteúdo como um arquivo temporário no Volume
            temp_csv_file_on_volume = f"{UC_VOLUME_PATH}temp_{filename}"
            dbutils.fs.put(temp_csv_file_on_volume, csv_content, overwrite=True)
            
            # 3. Usa Spark para ler o CSV do Volume e salvar como Delta
            df_temp = (spark.read 
                       .format("csv") 
                       .option("header", "true") 
                       .option("delimiter", ";") 
                       .option("encoding", "ISO-8859-1") 
                       .load(temp_csv_file_on_volume)
                      )
            
            # 4. Salva o DataFrame como Tabela Delta (Formato Bronze)
            df_temp.write.format("delta").mode("overwrite").save(remote_delta_path)
            
            # 5. Remove o arquivo CSV temporário no Volume
            dbutils.fs.rm(temp_csv_file_on_volume)
            
            moved_count += 1
    
    if moved_count > 0:
        print(f"\n   -> {moved_count} arquivo(s) CSV lido(s) localmente e salvo(s) como Delta na Camada Bronze com sucesso!")
    else:
         print("\n   -> ATENÇÃO: Nenhum arquivo CSV encontrado na pasta descompactada.")


    # C. LIMPEZA
    print("3. Limpeza de arquivos temporários...")
    
    os.remove(TEMP_DOWNLOAD_PATH) 
    shutil.rmtree(TEMP_UNZIP_DIR) 
    
    print("   -> Limpeza concluída.")
    
    # D. CONFIRMAÇÃO
    print("\nConteúdo do Volume de destino (deve ser o arquivo delta):")
    dbutils.fs.ls(UC_VOLUME_PATH)

# --- TRATAMENTO DE ERROS UNIFICADO ---
except requests.exceptions.RequestException as e:
    print(f"\nERRO DE DOWNLOAD: Falha ao acessar a URL. Verifique se a URL está correta. Detalhes: {e}")
except Exception as e:
    # Este catch agora trata qualquer erro durante a descompactação ou ingestão
    print(f"\nOCORREU UM ERRO INESPERADO: {e}")