# Gerador de Data Schemas - Projeto DIMP

Este notebook gera automaticamente os schemas de todas as tabelas do projeto DIMP.

Para cada tabela, executa:
- `DESCRIBE FORMATTED tabela`
- `SELECT * FROM tabela LIMIT 10`

## Tabelas a processar:

### Originais (4):
1. teste.dimp_cnpj_base
2. teste.dimp_socios
3. teste.dimp_pagamentos_cnpj
4. teste.dimp_pagamentos_cpf

### Intermediárias (9):
5. teste.dimp_score_final
6. teste.dimp_operacoes_suspeitas
7. teste.dimp_socios_multiplas_empresas
8. teste.dimp_comparacao_cnpj_cpf
9. teste.dimp_func_score_final
10. teste.dimp_funcionarios_agregado
11. teste.dimp_func_rede_multiplas
12. teste.dimp_func_top_suspeitos
13. usr_sat_ods.vw_ods_contrib

## 1. Imports e Configurações

In [None]:
import os
from datetime import datetime
from pathlib import Path

# Importar utilitários Spark
from utils import spark_utils_session as utils

print("✓ Imports realizados com sucesso")

## 2. Definir Lista de Tabelas

In [None]:
# Lista completa de tabelas do projeto DIMP
TABELAS = {
    # Tabelas Originais (Fonte de Dados)
    'originais': [
        'teste.dimp_cnpj_base',
        'teste.dimp_socios',
        'teste.dimp_pagamentos_cnpj',
        'teste.dimp_pagamentos_cpf',
    ],
    
    # Tabelas Intermediárias/Processadas
    'intermediarias': [
        'teste.dimp_score_final',
        'teste.dimp_operacoes_suspeitas',
        'teste.dimp_socios_multiplas_empresas',
        'teste.dimp_comparacao_cnpj_cpf',
        'teste.dimp_func_score_final',
        'teste.dimp_funcionarios_agregado',
        'teste.dimp_func_rede_multiplas',
        'teste.dimp_func_top_suspeitos',
        'usr_sat_ods.vw_ods_contrib',
    ]
}

total_tabelas = sum(len(tabelas) for tabelas in TABELAS.values())
print(f"Total de tabelas a processar: {total_tabelas}")
print(f"  - Originais: {len(TABELAS['originais'])}")
print(f"  - Intermediárias: {len(TABELAS['intermediarias'])}")

## 3. Inicializar Sessão Spark

In [None]:
def get_session(profile: str = 'default', dynamic_allocation_enabled: bool = True):
    """Cria sessão Spark."""
    spark_builder = (utils.DBASparkAppSession
                     .builder
                     .appName('DIMP_Data_Schema_Generator')
                     .language(utils.AvailableLanguages.PYTHON)
                     .profileName(profile))
    
    if dynamic_allocation_enabled:
        spark_builder.autoResourceManagement()
    
    return spark_builder.build()

# Inicializar sessão
print("Inicializando sessão Spark...")
session = get_session()
spark = session.sparkSession
print("✓ Sessão Spark iniciada com sucesso")

## 4. Criar Estrutura de Pastas

In [None]:
# Criar estrutura de diretórios
base_path = Path('data-schemas')

dirs = [
    base_path / 'originais',
    base_path / 'intermediarias',
]

for dir_path in dirs:
    dir_path.mkdir(parents=True, exist_ok=True)
    print(f"✓ {dir_path}")

print(f"\n✓ Estrutura criada em: {base_path.absolute()}")

## 5. Funções Auxiliares

In [None]:
def executar_describe_formatted(spark, tabela: str) -> str:
    """Executa DESCRIBE FORMATTED e retorna resultado como string."""
    try:
        print(f"  → DESCRIBE FORMATTED {tabela}")
        df = spark.sql(f"DESCRIBE FORMATTED {tabela}")
        
        resultado = f"-- DESCRIBE FORMATTED {tabela}\n"
        resultado += f"-- Gerado em: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
        
        rows = df.collect()
        for row in rows:
            resultado += f"{row[0]:<35} {row[1]:<40} {row[2] if len(row) > 2 else ''}\n"
        
        return resultado
    except Exception as e:
        print(f"  ✗ Erro: {str(e)}")
        return f"-- ERRO: {str(e)}\n"


def executar_select_sample(spark, tabela: str) -> str:
    """Executa SELECT * LIMIT 10 e retorna resultado como string."""
    try:
        print(f"  → SELECT * FROM {tabela} LIMIT 10")
        df = spark.sql(f"SELECT * FROM {tabela} LIMIT 10")
        
        resultado = f"\n\n-- SELECT * FROM {tabela} LIMIT 10\n"
        resultado += f"-- Gerado em: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
        
        colunas = df.columns
        rows = df.collect()
        
        if not rows:
            resultado += "-- Tabela vazia (sem dados)\n"
            return resultado
        
        resultado += " | ".join(colunas) + "\n"
        resultado += "-" * (len(" | ".join(colunas))) + "\n"
        
        for row in rows:
            valores = [str(row[col]) if row[col] is not None else 'NULL' for col in colunas]
            resultado += " | ".join(valores) + "\n"
        
        return resultado
    except Exception as e:
        print(f"  ✗ Erro: {str(e)}")
        return f"\n\n-- ERRO: {str(e)}\n"


def gerar_schema_tabela(spark, tabela: str, tipo: str, base_path: Path):
    """Gera schema completo para uma tabela."""
    print(f"\n{'='*70}")
    print(f"Processando: {tabela}")
    print(f"{'='*70}")
    
    # Executar comandos
    describe_result = executar_describe_formatted(spark, tabela)
    select_result = executar_select_sample(spark, tabela)
    
    # Combinar resultados
    conteudo_completo = describe_result + select_result
    
    # Salvar arquivo
    nome_limpo = tabela.replace('.', '_')
    arquivo = base_path / tipo / f"{nome_limpo}.txt"
    arquivo.write_text(conteudo_completo, encoding='utf-8')
    
    print(f"✓ Salvo em: {arquivo}")
    return arquivo

print("✓ Funções definidas")

## 6. PROCESSAR TABELAS ORIGINAIS

In [None]:
print("#" * 70)
print("# PROCESSANDO: TABELAS ORIGINAIS")
print("#" * 70)

for tabela in TABELAS['originais']:
    try:
        gerar_schema_tabela(spark, tabela, 'originais', base_path)
    except Exception as e:
        print(f"✗ ERRO em {tabela}: {e}")

print("\n✓ Tabelas originais processadas!")

## 7. PROCESSAR TABELAS INTERMEDIÁRIAS

In [None]:
print("#" * 70)
print("# PROCESSANDO: TABELAS INTERMEDIÁRIAS")
print("#" * 70)

for tabela in TABELAS['intermediarias']:
    try:
        gerar_schema_tabela(spark, tabela, 'intermediarias', base_path)
    except Exception as e:
        print(f"✗ ERRO em {tabela}: {e}")

print("\n✓ Tabelas intermediárias processadas!")

## 8. Gerar Resumo Final

In [None]:
# Listar arquivos gerados
print("\n" + "=" * 70)
print("ARQUIVOS GERADOS:")
print("=" * 70)

print("\nOriginais:")
for arquivo in sorted((base_path / 'originais').glob('*.txt')):
    print(f"  ✓ {arquivo.name}")

print("\nIntermediárias:")
for arquivo in sorted((base_path / 'intermediarias').glob('*.txt')):
    print(f"  ✓ {arquivo.name}")

# Gerar arquivo de resumo
resumo_path = base_path / 'RESUMO.txt'
resumo = f"""
RESUMO DA GERAÇÃO DE DATA SCHEMAS - PROJETO DIMP
{'='*70}

Data/Hora: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Total de tabelas: {total_tabelas}

TABELAS ORIGINAIS ({len(TABELAS['originais'])}):
{chr(10).join(f'  - {t}' for t in TABELAS['originais'])}

TABELAS INTERMEDIÁRIAS ({len(TABELAS['intermediarias'])}):
{chr(10).join(f'  - {t}' for t in TABELAS['intermediarias'])}

{'='*70}
Arquivos salvos em: {base_path.absolute()}
"""

resumo_path.write_text(resumo, encoding='utf-8')
print(resumo)

print("\n✓ CONCLUÍDO!")

## 9. Finalizar Sessão Spark

In [None]:
# Finalizar sessão
session.stop()
print("✓ Sessão Spark finalizada")

---

## ✅ Processo Completo!

Os schemas foram gerados na pasta `data-schemas/` com a seguinte estrutura:

```
data-schemas/
├── RESUMO.txt
├── originais/
│   ├── teste_dimp_cnpj_base.txt
│   ├── teste_dimp_socios.txt
│   ├── teste_dimp_pagamentos_cnpj.txt
│   └── teste_dimp_pagamentos_cpf.txt
└── intermediarias/
    ├── teste_dimp_score_final.txt
    ├── teste_dimp_operacoes_suspeitas.txt
    ├── teste_dimp_socios_multiplas_empresas.txt
    ├── teste_dimp_comparacao_cnpj_cpf.txt
    ├── teste_dimp_func_score_final.txt
    ├── teste_dimp_funcionarios_agregado.txt
    ├── teste_dimp_func_rede_multiplas.txt
    ├── teste_dimp_func_top_suspeitos.txt
    └── usr_sat_ods_vw_ods_contrib.txt
```