##  Sobre a Fonte de Dados "Cartões de Pagamento do Governo Federal"
##  
Informações sobre gastos realizados pelo governo por meio do Cartão de Pagamento do Governo Federal, segundo limites e regras específicas. Consulte dados detalhados e gerenciais sobre valores, quantidade de portadores, órgãos, nome, estabelecimento e muito mais.



## Origem dos Dados

O Portal da Transparência integra e apresenta dados de diversos sistemas utilizados pelo Governo Federal para a sua gestão financeira e administrativa, objetivando prover transparência da gestão pública, além de instrumentalizar a sociedade para a realização do controle social.

Os dados são recebidos com periodicidade diária, semanal e mensal, a depender do tema, e são de responsabilidade dos ministérios e outros órgãos do Poder Executivo Federal, por serem eles os executores dos programas de governo e os responsáveis pela gestão das ações governamentais.

Os dados que iremos filtrar serão a partir da data da transação, do primeiro dia do ano atual até o último dia do ano atual, podendo ser atualizado constantemente.

## Objetivo do Trabalho
## 
Este projeto tem como objetivo principal construir um modelo de dados confiável e otimizado para análise das transações realizadas com cartões de pagamento do governo federal, utilizando dados disponibilizados pelo Portal da Transparência.

A partir dos dados oficiais em formato CSV, armazenados em um repositório, coletamos, transformamos e organizamos os dados em um esquema estrela (star schema) no Databricks, composto por:

Uma tabela fato (fato_transacoes): Contendo todas as transações com campos como valor, data, e chaves de relacionamento.

Tabelas de dimensão (dim_tipo_cartao, dim_estabelecimento, dim_unidade_gestora, dim_portador): Fornecendo contexto sobre os tipos de cartão, estabelecimentos, unidades gestoras e portadores.

### Principais Metas
### 
✅ Extração e Carga Automatizada

Desenvolver um processo ETL que busca dados do CSV no repositório disponível, tratando paginação e garantindo a integridade dos registros.

✅ Modelagem Dimensional (Data Warehouse)

Implementar um modelo em esquema estrela, ideal para consultas analíticas e relatórios.

Garantir a qualidade dos dados com verificações de:

Chaves primárias únicas (sem duplicatas).

Valores não nulos em campos obrigatórios.

Consistência (valores monetários positivos, datas válidas).

✅ Otimização para Performance

Aplicare técnicas como Z-Ordering e particionamento por data para acelerar consultas.

Atualizar estatísticas para melhorar o planejamento de execução do Spark.

✅ Documentação e Rastreabilidade

Criar um catálogo de dados descrevendo tabelas, campos e domínios.

Registrar a linhagem dos dados (origem, transformações, destino).

## 1. Configuração Inicial

In [0]:
def remover_acentos(texto):
    """Remove todos os acentos e caracteres especiais de uma string"""
    if not texto:
        return texto
    # Substitui caracteres acentuados
    substituicoes = {
        'Á': 'A', 'À': 'A', 'Â': 'A', 'Ã': 'A', 'Ä': 'A',
        'É': 'E', 'È': 'E', 'Ê': 'E', 'Ë': 'E',
        'Í': 'I', 'Ì': 'I', 'Î': 'I', 'Ï': 'I',
        'Ó': 'O', 'Ò': 'O', 'Ô': 'O', 'Õ': 'O', 'Ö': 'O',
        'Ú': 'U', 'Ù': 'U', 'Û': 'U', 'Ü': 'U',
        'Ç': 'C',
        'Ñ': 'N',
        'Ý': 'Y',
        'á': 'a', 'à': 'a', 'â': 'a', 'ã': 'a', 'ä': 'a',
        'é': 'e', 'è': 'e', 'ê': 'e', 'ë': 'e',
        'í': 'i', 'ì': 'i', 'î': 'i', 'ï': 'i',
        'ó': 'o', 'ò': 'o', 'ô': 'o', 'õ': 'o', 'ö': 'o',
        'ú': 'u', 'ù': 'u', 'û': 'u', 'ü': 'u',
        'ç': 'c',
        'ñ': 'n',
        'ý': 'y',
        'ÿ': 'y'
    }
    for original, substituto in substituicoes.items():
        texto = texto.replace(original, substituto)
    return texto.upper()


In [0]:
# Databricks notebook source
# Configurações
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
import re



github_repo = "juancssantos/MVP_PUC_ENG_DADOS"
branch = "main"
data_dir = "dados_governo" 
database_name="dw_cpgf"
    

# Variável única para armazenar todos os dados
df_final = None

def listar_arquivos_cpgf():
    """Lista dinamicamente todos os arquivos _CPGF.csv do diretório"""
    import requests
    try:
        url = f"https://api.github.com/repos/{github_repo}/contents/{data_dir}"
        response = requests.get(url)
        response.raise_for_status()
        
        arquivos = [f['name'] for f in response.json() 
                  if f['name'].endswith('_CPGF.csv')]
        
        if not arquivos:
            print("⚠️ Nenhum arquivo _CPGF.csv encontrado no diretório")
            return []
            
        print(f"📂 Encontrados {len(arquivos)} arquivos no repositório")
        return arquivos
        
    except Exception as e:
        print(f"❌ Erro ao listar arquivos: {str(e)}")
        return []

def processar_csv_github(file_name):
    """Função otimizada para processamento em lote"""
    try:
        # URL correta do arquivo raw
        url = f"https://raw.githubusercontent.com/{github_repo}/{branch}/{data_dir}/{file_name}"
        
        # Tentamos primeiro com UTF-8, depois fallback para ISO-8859-1
        try:
            df_pandas = pd.read_csv(url, sep=';', encoding='utf-8')
        except UnicodeDecodeError:
            df_pandas = pd.read_csv(url, sep=';', encoding='ISO-8859-1')
        
        # Conversão para Spark DataFrame
        df_spark = spark.createDataFrame(df_pandas)
        
        # Padronização de nomes de colunas
        for col in df_spark.columns:
            df_spark = df_spark.withColumnRenamed(col, remover_acentos(col.replace(" ", "_").upper()))
        
        return df_spark
        
    except Exception as e:
        print(f"Erro ao processar {file_name}: {str(e)}")
        return None

# Processamento principal
print("⏳ Iniciando processamento dos arquivos CPGF")
arquivos = listar_arquivos_cpgf()

for arquivo in arquivos:
    print(f"Processando {arquivo}...", end=' ')
    
    df_temp = processar_csv_github(arquivo)
    
    if df_temp is not None:
        if df_final is None:
            df_final = df_temp
        else:
            df_final = df_final.union(df_temp)
        print(f"✅ ({df_temp.count()} linhas)")
    else:
        print("❌ falha")

# Resumo final
if df_final is not None:
    print(f"\n✅ Processamento concluído! Total: {df_final.count()} linhas")
    print(f"📊 Estrutura final:")
    df_final.printSchema()
else:
    print("\n❌ Nenhum arquivo foi processado com sucesso")

⏳ Iniciando processamento dos arquivos CPGF
📂 Encontrados 4 arquivos no repositório
Processando 202501_CPGF.csv... ✅ (10717 linhas)
Processando 202502_CPGF.csv... ✅ (2419 linhas)
Processando 202503_CPGF.csv... ✅ (9398 linhas)
Processando 202504_CPGF.csv... ✅ (12235 linhas)

✅ Processamento concluído! Total: 34769 linhas
📊 Estrutura final:
root
 |-- CODIGO_ORGAO_SUPERIOR: long (nullable = true)
 |-- NOME_ORGAO_SUPERIOR: string (nullable = true)
 |-- CODIGO_ORGAO: long (nullable = true)
 |-- NOME_ORGAO: string (nullable = true)
 |-- CODIGO_UNIDADE_GESTORA: long (nullable = true)
 |-- NOME_UNIDADE_GESTORA: string (nullable = true)
 |-- ANO_EXTRATO: long (nullable = true)
 |-- MES_EXTRATO: long (nullable = true)
 |-- CPF_PORTADOR: string (nullable = true)
 |-- NOME_PORTADOR: string (nullable = true)
 |-- CNPJ_OU_CPF_FAVORECIDO: long (nullable = true)
 |-- NOME_FAVORECIDO: string (nullable = true)
 |-- TRANSACAO: string (nullable = true)
 |-- DATA_TRANSACAO: string (nullable = true)
 |-- VA

In [0]:
display(df_final)

CODIGO_ORGAO_SUPERIOR,NOME_ORGAO_SUPERIOR,CODIGO_ORGAO,NOME_ORGAO,CODIGO_UNIDADE_GESTORA,NOME_UNIDADE_GESTORA,ANO_EXTRATO,MES_EXTRATO,CPF_PORTADOR,NOME_PORTADOR,CNPJ_OU_CPF_FAVORECIDO,NOME_FAVORECIDO,TRANSACAO,DATA_TRANSACAO,VALOR_TRANSACAO
63000,Advocacia-Geral da União,63000,Advocacia-Geral da União - Unidades com vínculo direto,110161,SUPERINTENDENCIA REG. DE ADMIN. DA 1ª REGIAO,2025,1,***.562.861-**,ANTONIO CARLOS MELO DOS SANTOS,77385797000117,CIBREL COMERCIAL BRASILEIRA DE REFRIGERACAO LIMITADA,COMPRA A/V - R$ - APRES,02/12/2024,18390
63000,Advocacia-Geral da União,63000,Advocacia-Geral da União - Unidades com vínculo direto,110161,SUPERINTENDENCIA REG. DE ADMIN. DA 1ª REGIAO,2025,1,***.725.752-**,VIVIANE CORREA LIMA,33311039000189,ELETRUS COMERCIO DE MATERIAIS DE CONSTRUCAO LTDA,COMPRA A/V - R$ - APRES,12/12/2024,5034
63000,Advocacia-Geral da União,63000,Advocacia-Geral da União - Unidades com vínculo direto,110161,SUPERINTENDENCIA REG. DE ADMIN. DA 1ª REGIAO,2025,1,***.384.652-**,CARLOS EPAMINONDAS GOMES DA SILVA,8632253000270,CROI COMPUTADORES LTDA,COMPRA A/V - R$ - APRES,27/11/2024,57500
63000,Advocacia-Geral da União,63000,Advocacia-Geral da União - Unidades com vínculo direto,110161,SUPERINTENDENCIA REG. DE ADMIN. DA 1ª REGIAO,2025,1,***.725.752-**,VIVIANE CORREA LIMA,84641331000362,DISMONZA DISTRIBUIDORA DE TINTAS E ABRASIVOS LTDA,COMPRA A/V - R$ - APRES,05/12/2024,2700
63000,Advocacia-Geral da União,63000,Advocacia-Geral da União - Unidades com vínculo direto,110161,SUPERINTENDENCIA REG. DE ADMIN. DA 1ª REGIAO,2025,1,***.945.361-**,CLAUDELI CONCEICAO DOS SANTOS,18727053000174,PAGAR.ME INSTITUICAO DE PAGAMENTO S.A,COMPRA A/V - R$ - APRES,26/11/2024,20500
63000,Advocacia-Geral da União,63000,Advocacia-Geral da União - Unidades com vínculo direto,110161,SUPERINTENDENCIA REG. DE ADMIN. DA 1ª REGIAO,2025,1,***.725.752-**,VIVIANE CORREA LIMA,2990016000114,LIBERDADE & SILVA LTDA,COMPRA A/V - R$ - APRES,04/12/2024,1260
63000,Advocacia-Geral da União,63000,Advocacia-Geral da União - Unidades com vínculo direto,110161,SUPERINTENDENCIA REG. DE ADMIN. DA 1ª REGIAO,2025,1,***.725.752-**,VIVIANE CORREA LIMA,84578855000194,B B FRANCA SILVA,COMPRA A/V - R$ - APRES,03/12/2024,2662
63000,Advocacia-Geral da União,63000,Advocacia-Geral da União - Unidades com vínculo direto,110161,SUPERINTENDENCIA REG. DE ADMIN. DA 1ª REGIAO,2025,1,***.562.861-**,ANTONIO CARLOS MELO DOS SANTOS,39875872000139,AUDIO EXPRESS COMERCIO DE ELETRONICOS E SERVICOS LTDA,COMPRA A/V - R$ - APRES,18/12/2024,600
63000,Advocacia-Geral da União,63000,Advocacia-Geral da União - Unidades com vínculo direto,110161,SUPERINTENDENCIA REG. DE ADMIN. DA 1ª REGIAO,2025,1,***.725.752-**,VIVIANE CORREA LIMA,79379491007510,HAVAN S.A,COMPRA A/V - R$ - APRES,26/11/2024,14997
63000,Advocacia-Geral da União,63000,Advocacia-Geral da União - Unidades com vínculo direto,110161,SUPERINTENDENCIA REG. DE ADMIN. DA 1ª REGIAO,2025,1,***.945.361-**,CLAUDELI CONCEICAO DOS SANTOS,55930862000105,SOLUCAO ELETRICA LTDA,COMPRA A/V - R$ - APRES,05/12/2024,175000


## 2. Definir Schemas para as Tabelas

In [0]:
from pyspark.sql.types import *
from datetime import date 

# Schema para a camada Bronze (dados brutos)
bronze_schema = StructType([
    StructField("CODIGO_ORGAO_SUPERIOR", StringType(), True),
    StructField("NOME_ORGAO_SUPERIOR", StringType(), True),
    StructField("CODIGO_ORGAO", StringType(), True),
    StructField("NOME_ORGAO", StringType(), True),
    StructField("CODIGO_UNIDADE_GESTORA", StringType(), True),
    StructField("NOME_UNIDADE_GESTORA", StringType(), True),
    StructField("ANO_EXTRATO", StringType(), True),
    StructField("MES_EXTRATO", StringType(), True),
    StructField("CPF_PORTADOR", StringType(), True),
    StructField("NOME_PORTADOR", StringType(), True),
    StructField("CNPJ_OU_CPF_FAVORECIDO", StringType(), True),
    StructField("NOME_FAVORECIDO", StringType(), True),
    StructField("TRANSACAO", StringType(), True),
    StructField("DATA_TRANSACAO", StringType(), True),
    StructField("VALOR_TRANSACAO", StringType(), True),
    StructField("FILE_NAME", StringType(), True),
    StructField("INGESTION_TIME", TimestampType(), True)
])

# Schema para a camada Silver (dados limpos e padronizados)
silver_schema = StructType([
    StructField("ID_TRANSACAO", LongType(), False),
    StructField("CODIGO_ORGAO_SUPERIOR", StringType(), True),
    StructField("NOME_ORGAO_SUPERIOR", StringType(), True),
    StructField("CODIGO_ORGAO", StringType(), True),
    StructField("NOME_ORGAO", StringType(), True),
    StructField("CODIGO_UNIDADE_GESTORA", StringType(), True),
    StructField("NOME_UNIDADE_GESTORA", StringType(), True),
    StructField("ANO_EXTRATO", IntegerType(), True),
    StructField("MES_EXTRATO", IntegerType(), True),
    StructField("CPF_PORTADOR", StringType(), True),
    StructField("NOME_PORTADOR", StringType(), True),
    StructField("CNPJ_OU_CPF_FAVORECIDO", StringType(), True),
    StructField("NOME_FAVORECIDO", StringType(), True),
    StructField("TRANSACAO", StringType(), True),
    StructField("DATA_TRANSACAO", DateType(), True),
    StructField("VALOR_TRANSACAO", DecimalType(18, 2), True),
    StructField("TIPO_TRANSACAO", StringType(), True),
    StructField("FILE_NAME", StringType(), True),
    StructField("INGESTION_TIME", TimestampType(), True),
    StructField("UPDATE_TIME", TimestampType(), True)
])


## 3. Função para Processar os Dados

In [0]:

# Schema com nomes de colunas padronizados
CPGF_SCHEMA = StructType([
    StructField("CODIGO_ORGAO_SUPERIOR", StringType(), True),
    StructField("NOME_ORGAO_SUPERIOR", StringType(), True),
    StructField("CODIGO_ORGAO", StringType(), True),
    StructField("NOME_ORGAO", StringType(), True),
    StructField("CODIGO_UNIDADE_GESTORA", StringType(), True),
    StructField("NOME_UNIDADE_GESTORA", StringType(), True),
    StructField("ANO_EXTRATO", StringType(), True),
    StructField("MES_EXTRATO", StringType(), True),
    StructField("CPF_PORTADOR", StringType(), True),
    StructField("NOME_PORTADOR", StringType(), True),
    StructField("CNPJ_OU_CPF_FAVORECIDO", StringType(), True),
    StructField("NOME_FAVORECIDO", StringType(), True),
    StructField("TRANSACAO", StringType(), True),
    StructField("DATA_TRANSACAO", StringType(), True),
    StructField("VALOR_TRANSACAO", StringType(), True)
])

## 2. Função de Limpeza de Decimais Aprimorada
def clean_decimal(value):
    if value is None:
        return None
    try:
        # Remove caracteres não numéricos exceto vírgula/ponto
        cleaned = re.sub(r"[^\d,.-]", "", str(value))
        # Caso formato 1.000,00 -> 1000.00
        if "," in cleaned:
            parts = cleaned.split(",")
            if len(parts) == 2:
                return f"{parts[0].replace('.', '')}.{parts[1]}"
        # Caso formato 1,000.00 -> 1000.00
        return cleaned.replace(",", "")
    except:
        return None

clean_decimal_udf = udf(clean_decimal, StringType())



## 4. Funções para Carregar os Dados nas Tabelas

In [0]:
## 3. Processamento Bronze (Metadados)
def process_bronze(df):
    return (df
            .withColumn("INGESTION_TIME", current_timestamp())
            .withColumn("_SOURCE", F.lit("GitHub"))
            .withColumn("_VALOR_CLEANED", clean_decimal_udf(F.col("VALOR_TRANSACAO")))
           )

## 4. Processamento Silver (Transformações)
def process_silver(bronze_df):
    # Primeiro padroniza nomes de colunas
    for col in bronze_df.columns:
        bronze_df = bronze_df.withColumnRenamed(col, col.replace(" ", "_").upper())
    
    return (bronze_df
            # Conversão de tipos
            .withColumn("ANO_EXTRATO", F.col("ANO_EXTRATO").cast(IntegerType()))
            .withColumn("MES_EXTRATO", F.col("MES_EXTRATO").cast(IntegerType()))
            .withColumn("DATA_TRANSACAO", F.to_date(F.col("DATA_TRANSACAO"), "dd/MM/yyyy"))
            .withColumn("VALOR_TRANSACAO", F.col("_VALOR_CLEANED").cast(DecimalType(18,2)))
            
            # Classificação de transações
            .withColumn("TIPO_TRANSACAO",
                       F.when(F.upper(F.col("TRANSACAO")).contains("COMPRA"), "COMPRA")
                       .when(F.upper(F.col("TRANSACAO")).contains("SAQUE"), "SAQUE")
                       .otherwise("OUTROS"))
            
            # Padronização textual
            .withColumn("NOME_ORGAO_SUPERIOR", F.initcap(F.trim(F.col("NOME_ORGAO_SUPERIOR"))))
            .withColumn("NOME_ORGAO", F.initcap(F.trim(F.col("NOME_ORGAO"))))
            .withColumn("NOME_UNIDADE_GESTORA", F.initcap(F.trim(F.col("NOME_UNIDADE_GESTORA"))))
            .withColumn("NOME_PORTADOR", F.initcap(F.trim(F.col("NOME_PORTADOR"))))
            .withColumn("NOME_FAVORECIDO", F.initcap(F.trim(F.col("NOME_FAVORECIDO"))))
            
            # Metadados
            .withColumn("UPDATE_TIME", current_timestamp())
            .withColumn("ID_TRANSACAO", F.monotonically_increasing_id())
            
            # Remove coluna temporária
            .drop("_VALOR_CLEANED")
           )

## 5. Pipeline Principal
def run_pipeline(df_input, db_name):
    try:
        # Garante que o DataFrame tenha o schema correto
        df_final = spark.createDataFrame(df_input.rdd, CPGF_SCHEMA)
        
        # Processa camadas
        bronze = process_bronze(df_final)
        bronze.write.mode("overwrite").format("delta").option("mergeSchema", "true").saveAsTable(f"{db_name}.bronze_transacoes")
        
        silver = process_silver(bronze)
        silver.write.mode("overwrite").format("delta").option("mergeSchema", "true").saveAsTable(f"{db_name}.silver_transacoes")
        
        print(f"✅ Pipeline concluído! {silver.count()} registros processados.")
        return silver
        
    except Exception as e:
        print(f"❌ Erro no pipeline: {str(e)}")
        raise

## 5. Execução Principal

In [0]:

# Configuração inicial
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
spark.catalog.setCurrentDatabase(database_name)

resultado = run_pipeline(df_final,database_name)
display(resultado.limit(5))

✅ Pipeline concluído! 34769 registros processados.


CODIGO_ORGAO_SUPERIOR,NOME_ORGAO_SUPERIOR,CODIGO_ORGAO,NOME_ORGAO,CODIGO_UNIDADE_GESTORA,NOME_UNIDADE_GESTORA,ANO_EXTRATO,MES_EXTRATO,CPF_PORTADOR,NOME_PORTADOR,CNPJ_OU_CPF_FAVORECIDO,NOME_FAVORECIDO,TRANSACAO,DATA_TRANSACAO,VALOR_TRANSACAO,INGESTION_TIME,_SOURCE,TIPO_TRANSACAO,UPDATE_TIME,ID_TRANSACAO
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.562.861-**,Antonio Carlos Melo Dos Santos,77385797000117,Cibrel Comercial Brasileira De Refrigeracao Limitada,COMPRA A/V - R$ - APRES,2024-12-02,183.9,2025-04-10T15:19:11.626+0000,GitHub,COMPRA,2025-04-10T15:19:11.626+0000,0
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.725.752-**,Viviane Correa Lima,33311039000189,Eletrus Comercio De Materiais De Construcao Ltda,COMPRA A/V - R$ - APRES,2024-12-12,50.34,2025-04-10T15:19:11.626+0000,GitHub,COMPRA,2025-04-10T15:19:11.626+0000,1
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.384.652-**,Carlos Epaminondas Gomes Da Silva,8632253000270,Croi Computadores Ltda,COMPRA A/V - R$ - APRES,2024-11-27,575.0,2025-04-10T15:19:11.626+0000,GitHub,COMPRA,2025-04-10T15:19:11.626+0000,2
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.725.752-**,Viviane Correa Lima,84641331000362,Dismonza Distribuidora De Tintas E Abrasivos Ltda,COMPRA A/V - R$ - APRES,2024-12-05,27.0,2025-04-10T15:19:11.626+0000,GitHub,COMPRA,2025-04-10T15:19:11.626+0000,3
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.945.361-**,Claudeli Conceicao Dos Santos,18727053000174,Pagar.me Instituicao De Pagamento S.a,COMPRA A/V - R$ - APRES,2024-11-26,205.0,2025-04-10T15:19:11.626+0000,GitHub,COMPRA,2025-04-10T15:19:11.626+0000,4


## 6. Verificação dos Dados e Relacionamentos

In [0]:
try:
    print("\nTabelas disponíveis:")
    display(spark.sql(f"SHOW TABLES IN {database_name}"))
    
    print("\nExemplo de dados Bronze:")
    display(spark.sql(f"SELECT * FROM {database_name}.bronze_transacoes LIMIT 5"))
    
    print("\nExemplo de dados Silver:")
    display(spark.sql(f"SELECT * FROM {database_name}.silver_transacoes LIMIT 5"))
    
except Exception as e:
    print(f"Erro na verificação: {str(e)}")
    print("Verifique se as tabelas foram criadas corretamente:")
    print(f"Bronze exists: {spark.catalog.tableExists('bronze_transacoes')}")
    print(f"Silver exists: {spark.catalog.tableExists('silver_transacoes')}")
 
# Verifica as tabelas criadas
try:
    tables = spark.sql(f"SHOW TABLES IN {database_name}")
    if tables.count() > 0:
        display(tables)
    else:
        print(f"Nenhuma tabela encontrada no banco de dados {database_name}.")
except Exception as e:
    print(f"Erro ao verificar tabelas: {str(e)}")
    print("Tentando criar o banco de dados novamente...")
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
    display(spark.sql(f"SHOW TABLES IN {database_name}"))

# Verifica os dados na silver
display(spark.sql(f"SELECT * FROM {database_name}.silver_transacoes LIMIT 10"))


Tabelas disponíveis:


database,tableName,isTemporary
dw_cpgf,bronze_transacoes,False
dw_cpgf,silver_transacoes,False



Exemplo de dados Bronze:


CODIGO_ORGAO_SUPERIOR,NOME_ORGAO_SUPERIOR,CODIGO_ORGAO,NOME_ORGAO,CODIGO_UNIDADE_GESTORA,NOME_UNIDADE_GESTORA,ANO_EXTRATO,MES_EXTRATO,CPF_PORTADOR,NOME_PORTADOR,CNPJ_OU_CPF_FAVORECIDO,NOME_FAVORECIDO,TRANSACAO,DATA_TRANSACAO,VALOR_TRANSACAO,INGESTION_TIME,_SOURCE,_VALOR_CLEANED
63000,Advocacia-Geral da União,63000,Advocacia-Geral da União - Unidades com vínculo direto,110161,SUPERINTENDENCIA REG. DE ADMIN. DA 1ª REGIAO,2025,1,***.562.861-**,ANTONIO CARLOS MELO DOS SANTOS,77385797000117,CIBREL COMERCIAL BRASILEIRA DE REFRIGERACAO LIMITADA,COMPRA A/V - R$ - APRES,02/12/2024,18390,2025-04-10T15:18:34.730+0000,GitHub,183.9
63000,Advocacia-Geral da União,63000,Advocacia-Geral da União - Unidades com vínculo direto,110161,SUPERINTENDENCIA REG. DE ADMIN. DA 1ª REGIAO,2025,1,***.725.752-**,VIVIANE CORREA LIMA,33311039000189,ELETRUS COMERCIO DE MATERIAIS DE CONSTRUCAO LTDA,COMPRA A/V - R$ - APRES,12/12/2024,5034,2025-04-10T15:18:34.730+0000,GitHub,50.34
63000,Advocacia-Geral da União,63000,Advocacia-Geral da União - Unidades com vínculo direto,110161,SUPERINTENDENCIA REG. DE ADMIN. DA 1ª REGIAO,2025,1,***.384.652-**,CARLOS EPAMINONDAS GOMES DA SILVA,8632253000270,CROI COMPUTADORES LTDA,COMPRA A/V - R$ - APRES,27/11/2024,57500,2025-04-10T15:18:34.730+0000,GitHub,575.0
63000,Advocacia-Geral da União,63000,Advocacia-Geral da União - Unidades com vínculo direto,110161,SUPERINTENDENCIA REG. DE ADMIN. DA 1ª REGIAO,2025,1,***.725.752-**,VIVIANE CORREA LIMA,84641331000362,DISMONZA DISTRIBUIDORA DE TINTAS E ABRASIVOS LTDA,COMPRA A/V - R$ - APRES,05/12/2024,2700,2025-04-10T15:18:34.730+0000,GitHub,27.0
63000,Advocacia-Geral da União,63000,Advocacia-Geral da União - Unidades com vínculo direto,110161,SUPERINTENDENCIA REG. DE ADMIN. DA 1ª REGIAO,2025,1,***.945.361-**,CLAUDELI CONCEICAO DOS SANTOS,18727053000174,PAGAR.ME INSTITUICAO DE PAGAMENTO S.A,COMPRA A/V - R$ - APRES,26/11/2024,20500,2025-04-10T15:18:34.730+0000,GitHub,205.0



Exemplo de dados Silver:


CODIGO_ORGAO_SUPERIOR,NOME_ORGAO_SUPERIOR,CODIGO_ORGAO,NOME_ORGAO,CODIGO_UNIDADE_GESTORA,NOME_UNIDADE_GESTORA,ANO_EXTRATO,MES_EXTRATO,CPF_PORTADOR,NOME_PORTADOR,CNPJ_OU_CPF_FAVORECIDO,NOME_FAVORECIDO,TRANSACAO,DATA_TRANSACAO,VALOR_TRANSACAO,INGESTION_TIME,_SOURCE,TIPO_TRANSACAO,UPDATE_TIME,ID_TRANSACAO
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.562.861-**,Antonio Carlos Melo Dos Santos,77385797000117,Cibrel Comercial Brasileira De Refrigeracao Limitada,COMPRA A/V - R$ - APRES,2024-12-02,183.9,2025-04-10T15:18:59.767+0000,GitHub,COMPRA,2025-04-10T15:18:59.767+0000,0
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.725.752-**,Viviane Correa Lima,33311039000189,Eletrus Comercio De Materiais De Construcao Ltda,COMPRA A/V - R$ - APRES,2024-12-12,50.34,2025-04-10T15:18:59.767+0000,GitHub,COMPRA,2025-04-10T15:18:59.767+0000,1
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.384.652-**,Carlos Epaminondas Gomes Da Silva,8632253000270,Croi Computadores Ltda,COMPRA A/V - R$ - APRES,2024-11-27,575.0,2025-04-10T15:18:59.767+0000,GitHub,COMPRA,2025-04-10T15:18:59.767+0000,2
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.725.752-**,Viviane Correa Lima,84641331000362,Dismonza Distribuidora De Tintas E Abrasivos Ltda,COMPRA A/V - R$ - APRES,2024-12-05,27.0,2025-04-10T15:18:59.767+0000,GitHub,COMPRA,2025-04-10T15:18:59.767+0000,3
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.945.361-**,Claudeli Conceicao Dos Santos,18727053000174,Pagar.me Instituicao De Pagamento S.a,COMPRA A/V - R$ - APRES,2024-11-26,205.0,2025-04-10T15:18:59.767+0000,GitHub,COMPRA,2025-04-10T15:18:59.767+0000,4


database,tableName,isTemporary
dw_cpgf,bronze_transacoes,False
dw_cpgf,silver_transacoes,False


CODIGO_ORGAO_SUPERIOR,NOME_ORGAO_SUPERIOR,CODIGO_ORGAO,NOME_ORGAO,CODIGO_UNIDADE_GESTORA,NOME_UNIDADE_GESTORA,ANO_EXTRATO,MES_EXTRATO,CPF_PORTADOR,NOME_PORTADOR,CNPJ_OU_CPF_FAVORECIDO,NOME_FAVORECIDO,TRANSACAO,DATA_TRANSACAO,VALOR_TRANSACAO,INGESTION_TIME,_SOURCE,TIPO_TRANSACAO,UPDATE_TIME,ID_TRANSACAO
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.562.861-**,Antonio Carlos Melo Dos Santos,77385797000117,Cibrel Comercial Brasileira De Refrigeracao Limitada,COMPRA A/V - R$ - APRES,2024-12-02,183.9,2025-04-10T15:18:59.767+0000,GitHub,COMPRA,2025-04-10T15:18:59.767+0000,0
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.725.752-**,Viviane Correa Lima,33311039000189,Eletrus Comercio De Materiais De Construcao Ltda,COMPRA A/V - R$ - APRES,2024-12-12,50.34,2025-04-10T15:18:59.767+0000,GitHub,COMPRA,2025-04-10T15:18:59.767+0000,1
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.384.652-**,Carlos Epaminondas Gomes Da Silva,8632253000270,Croi Computadores Ltda,COMPRA A/V - R$ - APRES,2024-11-27,575.0,2025-04-10T15:18:59.767+0000,GitHub,COMPRA,2025-04-10T15:18:59.767+0000,2
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.725.752-**,Viviane Correa Lima,84641331000362,Dismonza Distribuidora De Tintas E Abrasivos Ltda,COMPRA A/V - R$ - APRES,2024-12-05,27.0,2025-04-10T15:18:59.767+0000,GitHub,COMPRA,2025-04-10T15:18:59.767+0000,3
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.945.361-**,Claudeli Conceicao Dos Santos,18727053000174,Pagar.me Instituicao De Pagamento S.a,COMPRA A/V - R$ - APRES,2024-11-26,205.0,2025-04-10T15:18:59.767+0000,GitHub,COMPRA,2025-04-10T15:18:59.767+0000,4
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.725.752-**,Viviane Correa Lima,2990016000114,Liberdade & Silva Ltda,COMPRA A/V - R$ - APRES,2024-12-04,12.6,2025-04-10T15:18:59.767+0000,GitHub,COMPRA,2025-04-10T15:18:59.767+0000,5
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.725.752-**,Viviane Correa Lima,84578855000194,B B Franca Silva,COMPRA A/V - R$ - APRES,2024-12-03,26.62,2025-04-10T15:18:59.767+0000,GitHub,COMPRA,2025-04-10T15:18:59.767+0000,6
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.562.861-**,Antonio Carlos Melo Dos Santos,39875872000139,Audio Express Comercio De Eletronicos E Servicos Ltda,COMPRA A/V - R$ - APRES,2024-12-18,6.0,2025-04-10T15:18:59.767+0000,GitHub,COMPRA,2025-04-10T15:18:59.767+0000,7
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.725.752-**,Viviane Correa Lima,79379491007510,Havan S.a,COMPRA A/V - R$ - APRES,2024-11-26,149.97,2025-04-10T15:18:59.767+0000,GitHub,COMPRA,2025-04-10T15:18:59.767+0000,8
63000,Advocacia-geral Da União,63000,Advocacia-geral Da União - Unidades Com Vínculo Direto,110161,Superintendencia Reg. De Admin. Da 1ª Regiao,2025,1,***.945.361-**,Claudeli Conceicao Dos Santos,55930862000105,Solucao Eletrica Ltda,COMPRA A/V - R$ - APRES,2024-12-05,1750.0,2025-04-10T15:18:59.767+0000,GitHub,COMPRA,2025-04-10T15:18:59.767+0000,9


## 7. Transformação de Dados para o Modelo em Esquema Estrela

In [0]:
# Criação da database DW
 
# Cria o banco de dados DW se não existir
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}_dw")

 # Funções Auxiliares de Transformação
 


def create_dim_tables():
 # 1. First clean up metastore entries if they exist
    spark.sql(f"USE {database_name}_dw")
    
    tables = ["dim_tipo_cartao", "dim_estabelecimento", 
             "dim_unidade_gestora", "dim_portador", 
             "dim_tempo", "fato_transacoes"]
    
    for table in tables:
        # Drop table from metastore first
        spark.sql(f"DROP TABLE IF EXISTS {database_name}_dw.{table}")
        
        # Then delete files
        dbutils.fs.rm(f"dbfs:/user/hive/warehouse/{database_name}_dw.db/{table}", recurse=True)
    
  
    """Cria as tabelas de dimensão a partir dos dados silver"""
    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}_dw")
    
    # DIM_TIPO_CARTAO
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {database_name}_dw.DIM_TIPO_CARTAO (
        ID_TIPO_CARTAO INT,
        TIPO_TRANSACAO STRING,
        DESCRICAO_TRANSACAO STRING,
        DATA_INICIO_VIGENCIA DATE,
        DATA_FIM_VIGENCIA DATE,
        CURRENT_FLAG BOOLEAN
    ) USING DELTA
    """)
    
    
    tipos_transacao = [
        (1, "COMPRA", "Transação de compra", date(1900, 1, 1), date(9999, 12, 31), True),
        (2, "SAQUE", "Transação de saque", date(1900, 1, 1), date(9999, 12, 31), True),
        (3, "OUTROS", "Outros tipos de transação", date(1900, 1, 1), date(9999, 12, 31), True)
    ]
    
    
    schema = StructType([
        StructField("ID_TIPO_CARTAO", IntegerType()),
        StructField("TIPO_TRANSACAO", StringType()),
        StructField("DESCRICAO_TRANSACAO", StringType()),
        StructField("DATA_INICIO_VIGENCIA", DateType()),
        StructField("DATA_FIM_VIGENCIA", DateType()),
        StructField("CURRENT_FLAG", BooleanType())
    ])
    
     
    df_tipos = spark.createDataFrame(tipos_transacao, schema=schema)
    
     
    df_tipos.write.mode("overwrite") \
        .format("delta") \
        .option("mergeSchema", "true") \
        .saveAsTable(f"{database_name}_dw.DIM_TIPO_CARTAO")
    
    
    # DIM_ESTABELECIMENTO
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {database_name}_dw.DIM_ESTABELECIMENTO (
        ID_ESTABELECIMENTO BIGINT,
        CNPJ_OU_CPF_FAVORECIDO STRING,
        NOME_FAVORECIDO STRING,
        DATA_INICIO_VIGENCIA DATE,
        DATA_FIM_VIGENCIA DATE,
        CURRENT_FLAG BOOLEAN
    ) USING DELTA
    """)
    
    # DIM_UNIDADE_GESTORA
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {database_name}_dw.DIM_UNIDADE_GESTORA (
        ID_UNIDADE_GESTORA BIGINT,
        CODIGO_UNIDADE_GESTORA STRING,
        NOME_UNIDADE_GESTORA STRING,
        CODIGO_ORGAO STRING,
        NOME_ORGAO STRING,
        CODIGO_ORGAO_SUPERIOR STRING,
        NOME_ORGAO_SUPERIOR STRING,
        DATA_INICIO_VIGENCIA DATE,
        DATA_FIM_VIGENCIA DATE,
        CURRENT_FLAG BOOLEAN
    ) USING DELTA
    """)
    
    # DIM_PORTADOR
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {database_name}_dw.DIM_PORTADOR (
        ID_PORTADOR BIGINT,
        CPF_PORTADOR STRING,
        NOME_PORTADOR STRING,
        DATA_INICIO_VIGENCIA DATE,
        DATA_FIM_VIGENCIA DATE,
        CURRENT_FLAG BOOLEAN
    ) USING DELTA
    """)
    
    # DIM_TEMPO
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {database_name}_dw.DIM_TEMPO (
        ID_TEMPO INT,
        DATA DATE,
        DIA INT,
        MES INT,
        ANO INT,
        TRIMESTRE INT,
        SEMESTRE INT,
        DIA_SEMANA INT,
        NOME_DIA_SEMANA STRING,
        NOME_MES STRING,
        FERIADO BOOLEAN,
        FIM_DE_SEMANA BOOLEAN
    ) USING DELTA
    LOCATION 'dbfs:/user/hive/warehouse/{database_name}_dw.db/dim_tempo'
    """)


## 8. Transformação das Dimensões

In [0]:

def transform_dimensions():
    """Transforma e carrega as dimensões"""
    
    # DIM_ESTABELECIMENTO
    df_estabelecimento = spark.sql(f"""
    SELECT 
        monotonically_increasing_id() AS ID_ESTABELECIMENTO,
        CNPJ_OU_CPF_FAVORECIDO,
        NOME_FAVORECIDO,
        current_date() AS DATA_INICIO_VIGENCIA,
        date('9999-12-31') AS DATA_FIM_VIGENCIA,
        TRUE AS CURRENT_FLAG
    FROM {database_name}.silver_transacoes
    WHERE CNPJ_OU_CPF_FAVORECIDO IS NOT NULL
    GROUP BY CNPJ_OU_CPF_FAVORECIDO, NOME_FAVORECIDO
    """)
    
    df_estabelecimento.write.mode("overwrite").format("delta").saveAsTable(f"{database_name}_dw.DIM_ESTABELECIMENTO")
    
    # DIM_UNIDADE_GESTORA
    df_unidade_gestora = spark.sql(f"""
    SELECT 
        monotonically_increasing_id() AS ID_UNIDADE_GESTORA,
        CODIGO_UNIDADE_GESTORA,
        NOME_UNIDADE_GESTORA,
        CODIGO_ORGAO,
        NOME_ORGAO,
        CODIGO_ORGAO_SUPERIOR,
        NOME_ORGAO_SUPERIOR,
        current_date() AS DATA_INICIO_VIGENCIA,
        date('9999-12-31') AS DATA_FIM_VIGENCIA,
        TRUE AS CURRENT_FLAG
    FROM {database_name}.silver_transacoes
    GROUP BY CODIGO_UNIDADE_GESTORA, NOME_UNIDADE_GESTORA, CODIGO_ORGAO, NOME_ORGAO, 
             CODIGO_ORGAO_SUPERIOR, NOME_ORGAO_SUPERIOR
    """)
    
    df_unidade_gestora.write.mode("overwrite").format("delta").saveAsTable(f"{database_name}_dw.DIM_UNIDADE_GESTORA")
    
    # DIM_PORTADOR
    df_portador = spark.sql(f"""
    SELECT 
        monotonically_increasing_id() AS ID_PORTADOR,
        CPF_PORTADOR,
        NOME_PORTADOR,
        current_date() AS DATA_INICIO_VIGENCIA,
        date('9999-12-31') AS DATA_FIM_VIGENCIA,
        TRUE AS CURRENT_FLAG
    FROM {database_name}.silver_transacoes
    WHERE CPF_PORTADOR IS NOT NULL
    GROUP BY CPF_PORTADOR, NOME_PORTADOR
    """)
    
    df_portador.write.mode("overwrite").format("delta").saveAsTable(f"{database_name}_dw.DIM_PORTADOR")
    
    # DIM_TEMPO (preenchemos com um range de datas)
    # Primeiro encontramos o min e max de datas nas transações
    date_range = spark.sql(f"""
    SELECT 
        min(DATA_TRANSACAO) as min_date,
        max(DATA_TRANSACAO) as max_date
    FROM {database_name}.silver_transacoes
    """).collect()[0]
    
    min_date = date_range["min_date"]
    max_date = date_range["max_date"]
    
    # Cria um dataframe com todas as datas no intervalo
    spark.sql(f"""
    CREATE OR REPLACE TEMPORARY VIEW date_series AS
    SELECT explode(sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)) as data
    """)
    
    # Preenche a dimensão tempo
    spark.sql(f"""
    INSERT OVERWRITE TABLE {database_name}_dw.DIM_TEMPO
    SELECT 
        cast(date_format(data, 'yyyyMMdd') as int) as ID_TEMPO,
        data as DATA,
        day(data) as DIA,
        month(data) as MES,
        year(data) as ANO,
        quarter(data) as TRIMESTRE,
        case when month(data) <= 6 then 1 else 2 end as SEMESTRE,
        dayofweek(data) as DIA_SEMANA,
        date_format(data, 'EEEE') as NOME_DIA_SEMANA,
        date_format(data, 'MMMM') as NOME_MES,
        FALSE as FERIADO,
        case when dayofweek(data) in (1, 7) then TRUE else FALSE end as FIM_DE_SEMANA
    FROM date_series
    """)

## 9. Transformação e Carga da Tabela Fato

In [0]:
def transform_fact_table():
    """Transforma e carrega a tabela fato"""
    
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {database_name}_dw.FATO_TRANSACOES (
        ID_TRANSACAO BIGINT,
        ID_TIPO_CARTAO INT,
        ID_ESTABELECIMENTO BIGINT,
        ID_UNIDADE_GESTORA BIGINT,
        ID_PORTADOR BIGINT,
        ID_TEMPO INT,
        ANO_EXTRATO INT,
        MES_EXTRATO INT,
        VALOR_TRANSACAO DECIMAL(18,2),
        DATA_TRANSACAO DATE,
        DATA_CARGA TIMESTAMP
    ) USING DELTA
    """)
    
    # Carrega a tabela fato com joins para as dimensões
    spark.sql(f"""
    INSERT OVERWRITE TABLE {database_name}_dw.FATO_TRANSACOES
    SELECT 
        t.ID_TRANSACAO,
        tc.ID_TIPO_CARTAO,
        e.ID_ESTABELECIMENTO,
        ug.ID_UNIDADE_GESTORA,
        p.ID_PORTADOR,
        dt.ID_TEMPO,
        t.ANO_EXTRATO,
        t.MES_EXTRATO,
        t.VALOR_TRANSACAO,
        t.DATA_TRANSACAO,
        current_timestamp() as DATA_CARGA
    FROM {database_name}.silver_transacoes t
    LEFT JOIN {database_name}_dw.DIM_TIPO_CARTAO tc ON t.TIPO_TRANSACAO = tc.TIPO_TRANSACAO
    LEFT JOIN {database_name}_dw.DIM_ESTABELECIMENTO e ON t.CNPJ_OU_CPF_FAVORECIDO = e.CNPJ_OU_CPF_FAVORECIDO
    LEFT JOIN {database_name}_dw.DIM_UNIDADE_GESTORA ug ON t.CODIGO_UNIDADE_GESTORA = ug.CODIGO_UNIDADE_GESTORA
    LEFT JOIN {database_name}_dw.DIM_PORTADOR p ON t.CPF_PORTADOR = p.CPF_PORTADOR
    LEFT JOIN {database_name}_dw.DIM_TEMPO dt ON t.DATA_TRANSACAO = dt.DATA
    """)

# 10. Controles de Qualidade

In [0]:
def run_data_quality_checks():
    """Executa verificações de qualidade dos dados"""
    
    # Verifica se há valores nulos em campos importantes
    checks = [
        ("FATO_TRANSACOES", "ID_TRANSACAO IS NULL", "Transações sem ID"),
        ("FATO_TRANSACOES", "VALOR_TRANSACAO IS NULL", "Transações sem valor"),
        ("FATO_TRANSACOES", "DATA_TRANSACAO IS NULL", "Transações sem data"),
        ("DIM_ESTABELECIMENTO", "CNPJ_OU_CPF_FAVORECIDO IS NULL", "Estabelecimentos sem CNPJ/CPF"),
        ("DIM_PORTADOR", "CPF_PORTADOR IS NULL", "Portadores sem CPF")
    ]
    
    for table, condition, description in checks:
        count = spark.sql(f"SELECT COUNT(*) FROM {database_name}_dw.{table} WHERE {condition}").collect()[0][0]
        if count > 0:
            print(f"ALERTA: {count} registros com {description} na tabela {table}")
        else:
            print(f"OK: Nenhum registro com {description} na tabela {table}")
    
    # Verifica valores negativos em transações
    negative_count = spark.sql(f"""
    SELECT COUNT(*) 
    FROM {database_name}_dw.FATO_TRANSACOES 
    WHERE VALOR_TRANSACAO < 0
    """).collect()[0][0]
    
    if negative_count > 0:
        print(f"ALERTA: {negative_count} transações com valores negativos")
    else:
        print("OK: Nenhuma transação com valor negativo")
    
    # Verifica integridade referencial
    ref_checks = [
        ("FATO_TRANSACOES", "ID_TIPO_CARTAO", "DIM_TIPO_CARTAO", "ID_TIPO_CARTAO"),
        ("FATO_TRANSACOES", "ID_ESTABELECIMENTO", "DIM_ESTABELECIMENTO", "ID_ESTABELECIMENTO"),
        ("FATO_TRANSACOES", "ID_UNIDADE_GESTORA", "DIM_UNIDADE_GESTORA", "ID_UNIDADE_GESTORA"),
        ("FATO_TRANSACOES", "ID_PORTADOR", "DIM_PORTADOR", "ID_PORTADOR"),
        ("FATO_TRANSACOES", "ID_TEMPO", "DIM_TEMPO", "ID_TEMPO")
    ]
    
    for fact_table, fk, dim_table, pk in ref_checks:
        count = spark.sql(f"""
        SELECT COUNT(*) 
        FROM {database_name}_dw.{fact_table} f
        LEFT JOIN {database_name}_dw.{dim_table} d ON f.{fk} = d.{pk}
        WHERE d.{pk} IS NULL AND f.{fk} IS NOT NULL
        """).collect()[0][0]
        
        if count > 0:
            print(f"ALERTA: {count} registros com problemas de integridade referencial em {fact_table}.{fk}")
        else:
            print(f"OK: Integridade referencial válida para {fact_table}.{fk}")

# 11. Documentação Automatizada

In [0]:
def check_and_create_date_series():
    """Verifies if date_series exists, creates it if missing"""
    try:
        
        if not spark.catalog.tableExists(f"{database_name}_dw.date_series"):
            print("Creating date_series table...")
            
           
            date_df = spark.sql("""
                SELECT explode(sequence(
                    date_sub(current_date(), 365*5),
                    date_add(current_date(), 365*5),
                    interval 1 day
                )) as date
            """)
            
            
            date_df = date_df.withColumn("year", year("date")) \
                             .withColumn("month", month("date")) \
                             .withColumn("day", dayofmonth("date")) \
                             .withColumn("day_of_week", dayofweek("date")) \
                             .withColumn("day_of_year", dayofyear("date")) \
                             .withColumn("week_of_year", weekofyear("date")) \
                             .withColumn("quarter", quarter("date")) \
                             .withColumn("is_weekend", (dayofweek("date").isin(1, 7)))
            
            
            date_df.write \
                .mode("overwrite") \
                .format("delta") \
                .saveAsTable(f"{database_name}_dw.date_series")
            
            print("✅ date_series table created successfully!")
        else:
            print("date_series table already exists")
            
    except Exception as e:
        print(f"Error creating date_series table: {str(e)}")
        raise

def generate_documentation():
    """Generates data model documentation with robust error handling"""
    try:
        
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}_dw")
        
        
        check_and_create_date_series()
        
        
        doc = "# Data Model Documentation\n\n"
        
        
        tables = spark.sql(f"SHOW TABLES IN {database_name}_dw").collect()
        
        if not tables:
            doc += "No tables found in the database.\n"
            return doc
        
        for table in tables:
            table_name = table.tableName
            doc += f"## Table: {table_name}\n\n"
            
            try:
                df = spark.table(f"{database_name}_dw.{table_name}")
                
               
                doc += f"**Row count:** {df.count():,}\n\n"
                
                
                doc += "| Column Name | Data Type | Description |\n"
                doc += "|-------------|-----------|-------------|\n"
                
                for field in df.schema:
                    desc = "No description available"
                    if "date" in field.name.lower():
                        desc = "Date field"
                    elif "id" in field.name.lower():
                        desc = "Identifier field"
                    elif "name" in field.name.lower():
                        desc = "Name field"
                    elif "value" in field.name.lower():
                        desc = "Numeric value"
                        
                    doc += f"| {field.name} | {field.dataType} | {desc} |\n"
                
                doc += "\n"
                
            except Exception as e:
                doc += f"⚠️ Error documenting table {table_name}: {str(e)}\n\n"
                continue
        
        
        doc_path = f"/FileStore/tables/{database_name}_documentation.md"
        dbutils.fs.put(doc_path, doc, overwrite=True)
        print(f"Documentation saved to {doc_path}")
        
        return doc
        
    except Exception as e:
        return f"Error generating documentation: {str(e)}"

def generate_field_description(field_name):
    """Gera descrições automáticas baseadas em padrões de nomes de campos"""
    field_lower = field_name.lower()
    
    description_rules = {
        "id_": "Chave primária/estrangeira",
        "cod_": "Código identificador",
        "data_": f"Data de {field_lower.replace('data_', '').replace('_', ' ')}",
        "nome_": f"Nome do {field_lower.replace('nome_', '').replace('_', ' ')}",
        "desc_": f"Descrição do {field_lower.replace('desc_', '').replace('_', ' ')}",
        "valor_": f"Valor da {field_lower.replace('valor_', '').replace('_', ' ')}",
        "flag_": f"Indicador booleano para {field_lower.replace('flag_', '').replace('_', ' ')}",
        "qtd_": f"Quantidade de {field_lower.replace('qtd_', '').replace('_', ' ')}",
        "total_": f"Total de {field_lower.replace('total_', '').replace('_', ' ')}",
    }
    
    for prefix, desc in description_rules.items():
        if field_name.lower().startswith(prefix.lower()):
            return desc
    
    # Padrões especiais para campos conhecidos
    if field_name == "CURRENT_FLAG": return "Indica se é o registro atual (SCD Type 2)"
    if field_name == "VALID_FROM": return "Data de início da validade do registro"
    if field_name == "VALID_TO": return "Data de fim da validade do registro"
    
    return "Campo de dados"  # Descrição padrão


# 12. Otimizações Finais

In [0]:
def optimize_tables():
    """Otimiza as tabelas Delta para melhor performance"""
    
    tables = spark.sql(f"SHOW TABLES IN {database_name}_dw").collect()
    
    for table in tables:
        table_name = table["tableName"]
        print(f"Otimizando tabela {table_name}...")
        
        # Otimiza a tabela
        spark.sql(f"OPTIMIZE {database_name}_dw.{table_name}")
        
        # Executa ZORDER nas colunas mais usadas em filtros
        if table_name == "FATO_TRANSACOES":
            spark.sql(f"OPTIMIZE {database_name}_dw.{table_name} ZORDER BY (ID_TEMPO, ID_TIPO_CARTAO)")
        elif table_name == "DIM_TEMPO":
            spark.sql(f"OPTIMIZE {database_name}_dw.{table_name} ZORDER BY (ID_TEMPO)")
        
        # Coleta estatísticas para o otimizador de consultas
        spark.sql(f"ANALYZE TABLE {database_name}_dw.{table_name} COMPUTE STATISTICS FOR ALL COLUMNS")

# 13. Execução Completa do Pipeline

In [0]:

# Cria as tabelas de dimensão
create_dim_tables()

# Transforma as dimensões
transform_dimensions()

# Transforma a tabela fato
transform_fact_table()

# Executa controles de qualidade
run_data_quality_checks()

# Gera documentação
docs = generate_documentation()

# Otimiza as tabelas
optimize_tables()

print("Pipeline executado com sucesso!")


OK: Nenhum registro com Transações sem ID na tabela FATO_TRANSACOES
OK: Nenhum registro com Transações sem valor na tabela FATO_TRANSACOES
ALERTA: 7939 registros com Transações sem data na tabela FATO_TRANSACOES
OK: Nenhum registro com Estabelecimentos sem CNPJ/CPF na tabela DIM_ESTABELECIMENTO
OK: Nenhum registro com Portadores sem CPF na tabela DIM_PORTADOR
OK: Nenhuma transação com valor negativo
OK: Integridade referencial válida para FATO_TRANSACOES.ID_TIPO_CARTAO
OK: Integridade referencial válida para FATO_TRANSACOES.ID_ESTABELECIMENTO
OK: Integridade referencial válida para FATO_TRANSACOES.ID_UNIDADE_GESTORA
OK: Integridade referencial válida para FATO_TRANSACOES.ID_PORTADOR
OK: Integridade referencial válida para FATO_TRANSACOES.ID_TEMPO
Creating date_series table...
✅ date_series table created successfully!
Wrote 5007 bytes.
Documentation saved to /FileStore/tables/dw_cpgf_documentation.md
Otimizando tabela date_series...
Otimizando tabela dim_estabelecimento...
Otimizando ta

# 14. Análise Exploratória

In [0]:
# Estatísticas básicas
stats = spark.sql(f"""
SELECT 
    COUNT(*) as total_transacoes,
    SUM(VALOR_TRANSACAO) as valor_total,
    AVG(VALOR_TRANSACAO) as valor_medio,
    MIN(VALOR_TRANSACAO) as valor_minimo,
    MAX(VALOR_TRANSACAO) as valor_maximo,
    STDDEV(VALOR_TRANSACAO) as desvio_padrao
FROM {database_name}_dw.FATO_TRANSACOES
""").collect()[0]

print(f"Total de transações: {stats['total_transacoes']}")
print(f"Valor total gasto: R$ {stats['valor_total']:,.2f}")
print(f"Valor médio por transação: R$ {stats['valor_medio']:,.2f}")
print(f"Menor valor: R$ {stats['valor_minimo']:,.2f}")
print(f"Maior valor: R$ {stats['valor_maximo']:,.2f}")
print(f"Desvio padrão: R$ {stats['desvio_padrao']:,.2f}")


# Top 10 estabelecimentos que mais receberam pagamentos
display(spark.sql(f"""
SELECT 
    e.NOME_FAVORECIDO,
    COUNT(*) as quantidade_transacoes,
    SUM(f.VALOR_TRANSACAO) as valor_total,
    AVG(f.VALOR_TRANSACAO) as valor_medio
FROM {database_name}_dw.FATO_TRANSACOES f
JOIN {database_name}_dw.DIM_ESTABELECIMENTO e ON f.ID_ESTABELECIMENTO = e.ID_ESTABELECIMENTO
GROUP BY e.NOME_FAVORECIDO
ORDER BY valor_total DESC
LIMIT 10
"""))

# Transações por tipo
display(spark.sql(f"""
SELECT 
    t.TIPO_TRANSACAO,
    t.DESCRICAO_TRANSACAO,
    COUNT(*) as quantidade_transacoes,
    SUM(f.VALOR_TRANSACAO) as valor_total
FROM {database_name}_dw.FATO_TRANSACOES f
JOIN {database_name}_dw.DIM_TIPO_CARTAO t ON f.ID_TIPO_CARTAO = t.ID_TIPO_CARTAO
GROUP BY t.TIPO_TRANSACAO, t.DESCRICAO_TRANSACAO
ORDER BY valor_total DESC
"""))

# Transações por mês/ano
display(spark.sql(f"""
SELECT 
    f.ANO_EXTRATO,
    f.MES_EXTRATO,
    COUNT(*) as quantidade_transacoes,
    SUM(f.VALOR_TRANSACAO) as valor_total
FROM {database_name}_dw.FATO_TRANSACOES f
GROUP BY f.ANO_EXTRATO, f.MES_EXTRATO
ORDER BY f.ANO_EXTRATO, f.MES_EXTRATO
"""))

# Top 10 portadores com mais transações
display(spark.sql(f"""
SELECT 
    p.NOME_PORTADOR,
    COUNT(*) as quantidade_transacoes,
    SUM(f.VALOR_TRANSACAO) as valor_total
FROM {database_name}_dw.FATO_TRANSACOES f
JOIN {database_name}_dw.DIM_PORTADOR p ON f.ID_PORTADOR = p.ID_PORTADOR
GROUP BY p.NOME_PORTADOR
ORDER BY valor_total DESC
LIMIT 10
"""))


Total de transações: 34999
Valor total gasto: R$ 26,673,304.81
Valor médio por transação: R$ 762.12
Menor valor: R$ 0.30
Maior valor: R$ 166,750.00
Desvio padrão: R$ 2,290.45


NOME_FAVORECIDO,quantidade_transacoes,valor_total,valor_medio
Sigiloso,7939,11032170.62,1389.617158
Nao Se Aplica,3925,2637936.73,672.085791
Sem Informacao,1488,1490654.4,1001.783871
Pagar.me Instituicao De Pagamento S.a,394,396629.24,1006.673198
Sendas Distribuidora S/a,170,107073.82,629.846
Kalunga Sa,233,87066.8,373.677253
Leroy Merlin Companhia Brasileira De Bricolagem,140,75795.9,541.399286
Paygo Administradora De Meios De Pagamentos Ltda,64,63800.79,996.887344
Am Pneus Distribuidora De Barra Mansa Ltda,20,48947.18,2447.359
Djejgm Comercio De Combustiveis E Derivados Ltda,17,39982.0,2351.882353


TIPO_TRANSACAO,DESCRICAO_TRANSACAO,quantidade_transacoes,valor_total
COMPRA,Transação de compra,23129,13002651.04
OUTROS,Outros tipos de transação,7945,11032717.04
SAQUE,Transação de saque,3925,2637936.73


ANO_EXTRATO,MES_EXTRATO,quantidade_transacoes,valor_total
2025,1,10791,8292704.68
2025,2,2431,1795181.85
2025,3,9477,7300744.78
2025,4,12300,9284673.5


NOME_PORTADOR,quantidade_transacoes,valor_total
Henrique Araujo Hohne,87,235482.63
Tassio Cristiano Rios De Souza,106,115030.4
Andre Moura Pessoa,103,104362.51
Lamin Goncalves Santos Da Silva,83,98885.16
Marcio Alves Da Silva,36,86472.03
Cristiano Leao Gomes,70,81942.0
Cleriston Barbosa Bello,75,73751.58
Walney Jose De Oliveira Albuquerque,98,73625.3
Gabriel Mantoano Felix De Souza,87,69494.69
Itamar Batista Vanzeler,65,63500.0


# 15. Linhagem de Dados Completa

In [0]:
def generate_data_lineage():
    """Gera um gráfico de linhagem de dados"""
    
    lineage = {
        "Fontes": ["Arquivos CSV do CPGF"],
        "Camada Bronze": ["bronze_transacoes (dados brutos)"],
        "Camada Silver": ["silver_transacoes (dados limpos e padronizados)"],
        "Camada Gold (DW)": [
            "DIM_TIPO_CARTAO",
            "DIM_ESTABELECIMENTO",
            "DIM_UNIDADE_GESTORA",
            "DIM_PORTADOR",
            "DIM_TEMPO",
            "FATO_TRANSACOES"
        ],
        "Consumidores": ["Relatórios de BI", "Dashboards", "Análises"]
    }
    
    print("Linhagem de Dados:")
    for layer, items in lineage.items():
        print(f"\n{layer}:")
        for item in items:
            print(f" - {item}")

# COMMAND ----------

generate_data_lineage()

# Exibe a documentação gerada
displayHTML("<h2>Documentação do Modelo de Dados</h2><pre>" + docs + "</pre>")

Linhagem de Dados:

Fontes:
 - Arquivos CSV do CPGF

Camada Bronze:
 - bronze_transacoes (dados brutos)

Camada Silver:
 - silver_transacoes (dados limpos e padronizados)

Camada Gold (DW):
 - DIM_TIPO_CARTAO
 - DIM_ESTABELECIMENTO
 - DIM_UNIDADE_GESTORA
 - DIM_PORTADOR
 - DIM_TEMPO
 - FATO_TRANSACOES

Consumidores:
 - Relatórios de BI
 - Dashboards
 - Análises


%md
# Impacto e Próximos Passos
 
Esse trabalho serve como base para análises estratégicas, permitindo:

 Identificar padrões de gastos por unidade gestora ou estabelecimento.
 Gerar dashboards de acompanhamento de despesas.
 Detectar anomalias (transações incomuns ou valores atípicos).

Próximas etapas podem incluir:

Agendamento automático da carga (ex: diária/semanal).

Integração com ferramentas de BI (Power BI, Tableau).

Análise mais aprofundada com machine learning (clustering de gastos).

# Conclusão
  
Este projeto não apenas resolveu desafios técnicos de ingestão e modelagem de dados, mas também criou uma base confiável para tomada de decisão no setor público, aumentando a transparência e possibilitando auditorias mais eficientes.