# Gerador de Data-Schemas FISCA

Este notebook gera automaticamente os arquivos de schema para todas as tabelas do projeto FISCA.

## O que este script faz:
1. Executa `DESCRIBE FORMATTED` para cada tabela
2. Executa `SELECT * LIMIT 10` para cada tabela
3. Salva os resultados em arquivos texto e CSV
4. Gera documenta√ß√£o para tabelas intermedi√°rias

## Tabelas processadas:
- **5 tabelas originais** do banco `teste`
- **10 tabelas intermedi√°rias** (DataFrames) com documenta√ß√£o

## 1. Configura√ß√£o e Imports

In [None]:
# Imports
import os
import sys
from datetime import datetime
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Acesso ao SparkSession
spark = session.sparkSession

print("‚úì Imports conclu√≠dos")
print(f"‚úì SparkSession ativa: {spark.version}")

## 2. Configura√ß√µes

In [None]:
# Configura√ß√µes
DATABASE = "teste"
OUTPUT_DIR = "data-schemas"

# Lista de tabelas originais do banco
ORIGINAL_TABLES = [
    "fisca_fiscalizacoes_consolidadas",
    "fisca_dashboard_executivo",
    "fisca_scores_efetividade",
    "fisca_metricas_por_afre",
    "fisca_acompanhamentos",
]

print(f"Database: {DATABASE}")
print(f"Diret√≥rio de sa√≠da: {OUTPUT_DIR}")
print(f"Tabelas a processar: {len(ORIGINAL_TABLES)}")

## 3. Criar Diret√≥rio de Sa√≠da

In [None]:
# Cria diret√≥rio de sa√≠da
output_path = Path(OUTPUT_DIR)
output_path.mkdir(exist_ok=True)

print(f"üìÅ Diret√≥rio criado: {output_path.absolute()}")

## 4. Fun√ß√µes Auxiliares

In [None]:
def save_to_file(output_path, table_name, query_type, content):
    """Salva conte√∫do em arquivo"""
    filename = f"{table_name}_{query_type}.txt"
    filepath = output_path / filename
    
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(f"# {table_name} - {query_type}\n")
        f.write(f"# Gerado em: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"# {'='*80}\n\n")
        f.write(content)
    
    print(f"   ‚úì Salvo: {filename}")
    return filepath

print("‚úì Fun√ß√µes auxiliares definidas")

## 5. Processar Tabelas

In [None]:
# Rastreamento
processed = []
failed = []

print("="*80)
print("üöÄ INICIANDO PROCESSAMENTO")
print("="*80 + "\n")

for table in ORIGINAL_TABLES:
    print(f"\n{'='*80}")
    print(f"üìä Processando: {DATABASE}.{table}")
    print(f"{'='*80}")
    
    try:
        # 1. DESCRIBE FORMATTED
        print("\n1Ô∏è‚É£ DESCRIBE FORMATTED...")
        describe_query = f"DESCRIBE FORMATTED {DATABASE}.{table}"
        print(f"   Query: {describe_query}")
        
        df_describe = spark.sql(describe_query)
        describe_result = df_describe.toPandas().to_string(index=False)
        save_to_file(output_path, table, "describe_formatted", describe_result)
        
        # 2. SELECT * LIMIT 10
        print("\n2Ô∏è‚É£ SELECT * LIMIT 10...")
        select_query = f"SELECT * FROM {DATABASE}.{table} LIMIT 10"
        print(f"   Query: {select_query}")
        
        df_select = spark.sql(select_query)
        df_pandas = df_select.toPandas()
        
        # Salva TXT
        select_result = df_pandas.to_string(index=False)
        save_to_file(output_path, table, "select_limit_10", select_result)
        
        # Salva CSV
        csv_filename = f"{table}_sample_data.csv"
        csv_filepath = output_path / csv_filename
        df_pandas.to_csv(csv_filepath, index=False, encoding='utf-8')
        print(f"   ‚úì Salvo CSV: {csv_filename}")
        
        # 3. Estat√≠sticas
        print("\n3Ô∏è‚É£ Estat√≠sticas:")
        print(f"   - Colunas: {len(df_pandas.columns)}")
        print(f"   - Linhas (sample): {len(df_pandas)}")
        print(f"   - Colunas: {', '.join(df_pandas.columns[:5])}...")
        
        processed.append(table)
        print(f"\n‚úÖ {table} processado com sucesso!")
        
    except Exception as e:
        print(f"\n‚ùå ERRO: {str(e)}")
        failed.append(table)
        
        # Salva erro
        error_file = output_path / f"{table}_ERROR.txt"
        with open(error_file, 'w') as f:
            f.write(f"Erro ao processar {table}\n")
            f.write(f"Timestamp: {datetime.now()}\n")
            f.write(f"Erro: {str(e)}\n")

print("\n" + "="*80)
print("‚úÖ PROCESSAMENTO DE TABELAS CONCLU√çDO")
print("="*80)

## 6. Gerar Documenta√ß√£o de Tabelas Intermedi√°rias

In [None]:
# Tabelas intermedi√°rias (DataFrames)
INTERMEDIATE_TABLES = {
    "metrics_df": "Dashboard KPIs - m√©tricas consolidadas",
    "temporal_df": "An√°lise temporal - s√©ries temporais",
    "gerencia_df": "Performance por ger√™ncia",
    "afre_df": "Performance por auditor fiscal",
    "geo_df": "Distribui√ß√£o geogr√°fica",
    "cnae_df": "An√°lise setorial por CNAE",
    "ml_df": "Dataset para machine learning",
    "network_df": "Relacionamentos empresa-auditor-infra√ß√£o",
    "infraction_types_df": "Classifica√ß√£o de tipos de infra√ß√µes",
    "company_details_df": "Detalhes de empresas por CNPJ",
}

print("\nüìù Gerando documenta√ß√£o de tabelas intermedi√°rias...\n")

doc_content = "# TABELAS INTERMEDI√ÅRIAS (DataFrames in-memory)\n\n"
doc_content += f"Gerado em: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
doc_content += "Estas s√£o tabelas criadas dinamicamente como DataFrames Pandas.\n"
doc_content += "N√£o s√£o tabelas persistentes no banco de dados.\n\n"
doc_content += f"{'='*80}\n\n"

for table_name, description in INTERMEDIATE_TABLES.items():
    doc_content += f"## {table_name}\n"
    doc_content += f"**Descri√ß√£o:** {description}\n"
    doc_content += f"**Tipo:** DataFrame Pandas (in-memory)\n"
    doc_content += f"**Origem:** Derivado de fisca_fiscalizacoes_consolidadas\n\n"
    doc_content += "**Estrutura:** Consulte src/modules/database.py\n\n"
    doc_content += f"{'-'*80}\n\n"

doc_filepath = output_path / "intermediate_tables_README.txt"
with open(doc_filepath, 'w', encoding='utf-8') as f:
    f.write(doc_content)

print(f"‚úì Documenta√ß√£o salva: intermediate_tables_README.txt")

## 7. Relat√≥rio Final

In [None]:
# Gera relat√≥rio
print("\nüìã Gerando relat√≥rio final...\n")

summary = "# RELAT√ìRIO DE GERA√á√ÉO DE DATA-SCHEMAS\n\n"
summary += f"Data/Hora: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
summary += f"Database: {DATABASE}\n\n"
summary += f"## RESUMO\n\n"
summary += f"- Total de tabelas: {len(ORIGINAL_TABLES)}\n"
summary += f"- Processadas com sucesso: {len(processed) - len(failed)}\n"
summary += f"- Com erro: {len(failed)}\n\n"

summary += f"## TABELAS PROCESSADAS\n\n"
for table in processed:
    status = "‚úó" if table in failed else "‚úì"
    summary += f"{status} {table}\n"

summary += f"\n## ARQUIVOS GERADOS\n\n"
summary += "Para cada tabela:\n"
summary += "- {table}_describe_formatted.txt\n"
summary += "- {table}_select_limit_10.txt\n"
summary += "- {table}_sample_data.csv\n\n"

summary += "Documenta√ß√£o adicional:\n"
summary += "- intermediate_tables_README.txt\n"

report_filepath = output_path / "SUMMARY_REPORT.txt"
with open(report_filepath, 'w', encoding='utf-8') as f:
    f.write(summary)

print(summary)
print(f"\n‚úì Relat√≥rio salvo: {report_filepath}")

## 8. Resumo Final

In [None]:
print("\n" + "="*80)
print("‚úÖ GERA√á√ÉO DE DATA-SCHEMAS CONCLU√çDA!")
print("="*80 + "\n")
print(f"üìÅ Arquivos salvos em: {output_path.absolute()}")
print(f"\nüìä Estat√≠sticas:")
print(f"   - Tabelas processadas: {len(processed)}")
print(f"   - Sucesso: {len(processed) - len(failed)}")
print(f"   - Erros: {len(failed)}")
print(f"   - Tabelas intermedi√°rias documentadas: {len(INTERMEDIATE_TABLES)}")

if failed:
    print(f"\n‚ö†Ô∏è Tabelas com erro:")
    for table in failed:
        print(f"   - {table}")

print("\n" + "="*80)

## 9. (Opcional) Listar Arquivos Gerados

In [None]:
# Lista todos os arquivos gerados
print("\nüìÇ Arquivos gerados:\n")

all_files = sorted(output_path.glob("*"))
for i, file in enumerate(all_files, 1):
    size_kb = file.stat().st_size / 1024
    print(f"{i:2d}. {file.name:50s} ({size_kb:,.1f} KB)")

print(f"\nTotal: {len(all_files)} arquivos")