In [None]:
import sys
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/poc")
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/plugins")
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/dags")

#Import libs python
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import date

#Import libs internas
from utils import spark_utils_session as utils

from hooks.hdfs.hdfs_helper import HdfsHelper
from jobs.job_base_config import BaseETLJobClass

import poc_helper
poc_helper.load_env("PROD")

In [None]:
def get_session(profile: str, dynamic_allocation_enabled: bool = True) -> utils.DBASparkAppSession:
    """Generates DBASparkAppSession."""
    
    app_name = "tsevero_luciano_analises"
    
    spark_builder = (utils.DBASparkAppSession
                     .builder
                     .setAppName(app_name)
                     .usingProcessProfile(profile)
                    )
    
    if dynamic_allocation_enabled:
        spark_builder.autoResourceManagement()

    return spark_builder.build()

session = get_session(profile='efd_t2')

In [None]:
session.sparkSession.sql("SHOW DATABASES").show(truncate=False)

In [None]:
# ============================================================================
# CONFIGURA√á√ÉO INICIAL - 
# ============================================================================

import sys
import warnings
from datetime import datetime, date
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# PySpark imports com aliases para evitar conflitos
from pyspark.sql.functions import (
    col as spark_col, 
    sum as spark_sum, 
    avg as spark_avg,
    count as spark_count,
    when as spark_when,
    desc as spark_desc,
    asc as spark_asc,
    round as spark_round,
    concat as spark_concat,
    lit as spark_lit,
    max as spark_max,
    min as spark_min,
    stddev as spark_stddev,
    countDistinct as spark_countDistinct
)
from pyspark.sql.types import DoubleType, IntegerType

# Configura√ß√µes de visualiza√ß√£o
warnings.filterwarnings('ignore')
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = (16, 8)
plt.rcParams['font.size'] = 11

# ‚úÖ CORRE√á√ÉO: N√£o usar abs() que conflita com PySpark
# pd.set_option('display.float_format', lambda x: f'{x:,.2f}' if abs(x) > 0.01 else f'{x:.6f}')
pd.set_option('display.max_columns', None)
pd.set_option('display.precision', 2)

# Acesso ao Spark
spark = session.sparkSession

print("=" * 80)
print("üîç SISTEMA")
print("=" * 80)
print(f"Sess√£o Spark: {spark.sparkContext.appName}")
print(f"Vers√£o Spark: {spark.version}")
print(f"Iniciado em: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)

In [None]:
# ===================================================================
# C√âLULA 1: SETUP E FUN√á√ÉO HELPER PARA AN√ÅLISES
# ===================================================================

import pandas as pd
import numpy as np
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Configura√ß√µes de display
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
pd.set_option('display.width', None)
pd.set_option('display.float_format', lambda x: f'{x:,.2f}')

print("="*80)
print("üìä PROJETO LUCIANO - AN√ÅLISES COMPLETAS")
print("="*80)
print(f"Data/Hora de Execu√ß√£o: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("="*80)

# Fun√ß√£o para executar queries e converter resultados
def executar_analise(nome_analise, query_sql, exibir=True, salvar=False):
    """
    Executa uma query SQL e retorna/exibe resultados
    
    Args:
        nome_analise: Nome da an√°lise
        query_sql: Query SQL a executar
        exibir: Se True, exibe o resultado
        salvar: Se True, salva em CSV
    
    Returns:
        DataFrame pandas com resultados
    """
    try:
        print(f"\n{'='*80}")
        print(f"üîç {nome_analise}")
        print(f"{'='*80}")
        
        # Executar query
        df_resultado = spark.sql(query_sql).toPandas()
        
        if exibir and len(df_resultado) > 0:
            print(f"\nüìä Resultados ({len(df_resultado):,} registros):")
            print(df_resultado.to_string(index=False))
        elif len(df_resultado) == 0:
            print("‚ö†Ô∏è  Nenhum registro encontrado")
        
        if salvar:
            nome_arquivo = f"{nome_analise.replace(' ', '_').lower()}.csv"
            df_resultado.to_csv(nome_arquivo, index=False)
            print(f"\nüíæ Salvo em: {nome_arquivo}")
        
        return df_resultado
        
    except Exception as e:
        print(f"‚ùå Erro na an√°lise '{nome_analise}': {str(e)}")
        return pd.DataFrame()

print("\n‚úÖ Setup conclu√≠do!")

In [None]:
# ===================================================================
# C√âLULA 2: DASHBOARD EXECUTIVO E ESTAT√çSTICAS GERAIS
# ===================================================================

print("\n" + "="*80)
print("üìä SE√á√ÉO 1: DASHBOARD EXECUTIVO")
print("="*80)

# ========================================
# AN√ÅLISE 1: DASHBOARD EXECUTIVO
# ========================================

query_dashboard = """
SELECT 
    'VOLUMETRIA GERAL' AS secao,
    'Total de Empresas Analisadas' AS metrica,
    CAST(total_empresas AS STRING) AS valor,
    'Empresas √∫nicas no per√≠odo de 60 meses' AS observacao
FROM teste.luciano_resumo

UNION ALL

SELECT 
    'VOLUMETRIA GERAL',
    'Total de Protocolos',
    CAST(total_protocolos AS STRING),
    'Cancelamentos processados'
FROM teste.luciano_resumo

UNION ALL

SELECT 
    'RISCO',
    'Empresas Risco CR√çTICO',
    CONCAT(CAST(empresas_risco_critico AS STRING), ' (', CAST(perc_risco_critico AS STRING), '%)'),
    'Prioridade m√°xima de fiscaliza√ß√£o'
FROM teste.luciano_resumo

UNION ALL

SELECT 
    'RISCO',
    'Empresas Risco ALTO',
    CONCAT(CAST(empresas_risco_alto AS STRING), ' (', CAST(perc_risco_alto AS STRING), '%)'),
    'Segunda prioridade'
FROM teste.luciano_resumo

UNION ALL

SELECT 
    'EFETIVIDADE',
    'Taxa M√©dia de Perman√™ncia',
    CONCAT(CAST(taxa_media_permanencia AS STRING), '%'),
    'Empresas que continuam canceladas'
FROM teste.luciano_resumo

UNION ALL

SELECT 
    'CR√âDITO',
    'Saldo Credor Total em Risco',
    CONCAT('R$ ', CAST(ROUND(saldo_credor_total, 2) AS STRING)),
    'Valor acumulado em empresas canceladas'
FROM teste.luciano_resumo

UNION ALL

SELECT 
    'IND√çCIOS',
    'Empresas com Ind√≠cios',
    CONCAT(CAST(empresas_com_indicios AS STRING), ' (', CAST(perc_com_indicios AS STRING), '%)'),
    'Empresas sinalizadas por irregularidades'
FROM teste.luciano_resumo
"""

df_dashboard = executar_analise("1. Dashboard Executivo", query_dashboard)

# ========================================
# AN√ÅLISE 2: ESTAT√çSTICAS GERAIS
# ========================================

query_estatisticas = """
SELECT 
    COUNT(DISTINCT cnpj) AS total_empresas,
    SUM(total_protocolos) AS total_protocolos,
    ROUND(AVG(total_protocolos), 2) AS media_protocolos_por_empresa,
    MAX(total_protocolos) AS max_protocolos_empresa,
    
    SUM(CASE WHEN classificacao_risco_final = 'CR√çTICO' THEN 1 ELSE 0 END) AS qtde_critico,
    SUM(CASE WHEN classificacao_risco_final = 'ALTO' THEN 1 ELSE 0 END) AS qtde_alto,
    SUM(CASE WHEN classificacao_risco_final = 'M√âDIO' THEN 1 ELSE 0 END) AS qtde_medio,
    SUM(CASE WHEN classificacao_risco_final = 'BAIXO' THEN 1 ELSE 0 END) AS qtde_baixo,
    
    ROUND(AVG(score_total), 2) AS score_medio,
    ROUND(STDDEV(score_total), 2) AS desvio_padrao_score,
    ROUND(MIN(score_total), 2) AS score_minimo,
    ROUND(MAX(score_total), 2) AS score_maximo,
    
    SUM(flag_atualmente_cancelada) AS ainda_canceladas,
    COUNT(*) - SUM(flag_atualmente_cancelada) AS reativadas,
    ROUND(SUM(flag_atualmente_cancelada) * 100.0 / COUNT(*), 2) AS perc_canceladas,
    
    SUM(flag_empresa_reincidente) AS empresas_reincidentes,
    ROUND(SUM(flag_empresa_reincidente) * 100.0 / COUNT(*), 2) AS perc_reincidentes,
    
    SUM(saldo_credor_atual) AS saldo_total,
    ROUND(AVG(saldo_credor_atual), 2) AS saldo_medio,
    SUM(CASE WHEN saldo_credor_atual > 1000000 THEN 1 ELSE 0 END) AS empresas_saldo_acima_1M,
    SUM(CASE WHEN saldo_credor_atual > 10000000 THEN 1 ELSE 0 END) AS empresas_saldo_acima_10M,
    
    SUM(flag_tem_indicios) AS com_indicios,
    SUM(flag_tem_indicios_graves) AS com_indicios_graves,
    SUM(qtde_indicios) AS total_indicios,
    ROUND(AVG(CASE WHEN flag_tem_indicios = 1 THEN qtde_indicios END), 2) AS media_indicios_quando_tem

FROM teste.luciano_scores
"""

df_estatisticas = executar_analise("2. Estat√≠sticas Gerais", query_estatisticas)

print("\n‚úÖ Se√ß√£o 1 conclu√≠da!")

In [None]:
# ===================================================================
# C√âLULA 3: AN√ÅLISES DE RISCO E TEMPORAL
# ===================================================================

print("\n" + "="*80)
print("üìä SE√á√ÉO 2: AN√ÅLISES DE RISCO E TEMPORAL")
print("="*80)

# ========================================
# AN√ÅLISE 3: POR CLASSIFICA√á√ÉO DE RISCO
# ========================================

query_risco = """
SELECT 
    classificacao_risco_final,
    COUNT(*) AS qtde_empresas,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) AS perc_total,
    
    ROUND(AVG(score_total), 2) AS score_total_medio,
    ROUND(AVG(score_comportamento), 2) AS score_comportamento_medio,
    ROUND(AVG(score_credito), 2) AS score_credito_medio,
    ROUND(AVG(score_indicios), 2) AS score_indicios_medio,
    
    SUM(total_protocolos) AS total_protocolos,
    ROUND(AVG(total_protocolos), 2) AS media_protocolos,
    
    SUM(flag_atualmente_cancelada) AS ainda_canceladas,
    ROUND(AVG(taxa_permanencia_cancelamento_perc), 2) AS taxa_permanencia_media,
    
    SUM(saldo_credor_atual) AS saldo_total,
    ROUND(AVG(saldo_credor_atual), 2) AS saldo_medio,
    MAX(saldo_credor_atual) AS saldo_maximo,
    
    SUM(qtde_indicios) AS total_indicios,
    ROUND(AVG(qtde_indicios), 2) AS media_indicios,
    SUM(qtde_indicios_graves) AS indicios_graves,
    
    SUM(flag_saldo_alto_cancelada) AS flag_saldo_alto,
    SUM(flag_suspeita_valores_repetidos) AS flag_valores_repetidos

FROM teste.luciano_scores
GROUP BY classificacao_risco_final
ORDER BY 
    CASE classificacao_risco_final
        WHEN 'CR√çTICO' THEN 1
        WHEN 'ALTO' THEN 2
        WHEN 'M√âDIO' THEN 3
        WHEN 'BAIXO' THEN 4
    END
"""

df_risco = executar_analise("3. An√°lise por Classifica√ß√£o de Risco", query_risco, salvar=True)

# ========================================
# AN√ÅLISE 4: EVOLU√á√ÉO TEMPORAL MENSAL
# ========================================

query_temporal_mensal = """
SELECT 
    periodo_cancelamento,
    ano_cancelamento,
    mes_cancelamento,
    
    qtde_protocolos,
    qtde_empresas_distintas,
    qtde_usuarios_distintos,
    
    qtde_automaticos,
    qtde_manuais,
    ROUND(qtde_automaticos * 100.0 / qtde_protocolos, 2) AS perc_automaticos,
    
    qtde_ainda_canceladas,
    qtde_reativadas,
    taxa_permanencia_perc,
    taxa_reativacao_perc,
    
    ROUND(media_dias_ate_reativacao, 0) AS dias_medios_reativacao

FROM teste.luciano_temporal
ORDER BY ano_cancelamento DESC, mes_cancelamento DESC
LIMIT 24
"""

df_temporal_mensal = executar_analise("4. Evolu√ß√£o Temporal Mensal (√öltimos 24 meses)", 
                                       query_temporal_mensal, salvar=True)

# ========================================
# AN√ÅLISE 5: EVOLU√á√ÉO TEMPORAL ANUAL
# ========================================

query_temporal_anual = """
SELECT 
    ano_cancelamento,
    
    SUM(qtde_protocolos) AS total_protocolos,
    SUM(qtde_empresas_distintas) AS empresas_distintas,
    
    SUM(qtde_automaticos) AS total_automaticos,
    SUM(qtde_manuais) AS total_manuais,
    ROUND(SUM(qtde_automaticos) * 100.0 / SUM(qtde_protocolos), 2) AS perc_automaticos,
    
    SUM(qtde_ainda_canceladas) AS total_ainda_canceladas,
    SUM(qtde_reativadas) AS total_reativadas,
    ROUND(SUM(qtde_ainda_canceladas) * 100.0 / SUM(qtde_protocolos), 2) AS taxa_permanencia_anual,
    
    COUNT(DISTINCT periodo_cancelamento) AS meses_com_atividade,
    ROUND(AVG(qtde_protocolos), 2) AS media_mensal_protocolos,
    MAX(qtde_protocolos) AS pico_mensal

FROM teste.luciano_temporal
GROUP BY ano_cancelamento
ORDER BY ano_cancelamento DESC
"""

df_temporal_anual = executar_analise("5. Evolu√ß√£o Temporal Anual", query_temporal_anual, salvar=True)

print("\n‚úÖ Se√ß√£o 2 conclu√≠da!")

In [None]:
# ===================================================================
# C√âLULA 4: AN√ÅLISES GEOGR√ÅFICAS E POR CNAE
# ===================================================================

print("\n" + "="*80)
print("üìä SE√á√ÉO 3: AN√ÅLISES GEOGR√ÅFICAS E POR SETOR")
print("="*80)

# ========================================
# AN√ÅLISE 6: POR MUNIC√çPIO (TOP 20)
# ========================================

query_municipio = """
SELECT 
    m.municipio,
    m.uf,
    m.gerencia_regional,
    
    COUNT(DISTINCT m.cnpj) AS qtde_empresas,
    SUM(m.total_protocolos) AS total_protocolos,
    ROUND(AVG(m.total_protocolos), 2) AS media_protocolos,
    
    SUM(CASE WHEN s.classificacao_risco_final IN ('CR√çTICO', 'ALTO') THEN 1 ELSE 0 END) AS empresas_alto_risco,
    ROUND(SUM(CASE WHEN s.classificacao_risco_final IN ('CR√çTICO', 'ALTO') THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS perc_alto_risco,
    
    SUM(m.qtde_ainda_cancelada) AS ainda_canceladas,
    ROUND(AVG(m.taxa_permanencia_cancelamento_perc), 2) AS taxa_permanencia_media,
    
    SUM(CASE WHEN m.flag_empresa_reincidente = 1 THEN 1 ELSE 0 END) AS reincidentes,
    
    COUNT(DISTINCT m.cd_cnae) AS diversidade_cnaes

FROM teste.luciano_metricas m
LEFT JOIN teste.luciano_scores s ON m.cnpj = s.cnpj
WHERE m.municipio IS NOT NULL
GROUP BY m.municipio, m.uf, m.gerencia_regional
HAVING COUNT(DISTINCT m.cnpj) >= 5
ORDER BY qtde_empresas DESC
LIMIT 20
"""

df_municipio = executar_analise("6. An√°lise por Munic√≠pio (Top 20)", query_municipio, salvar=True)

# ========================================
# AN√ÅLISE 7: POR GER√äNCIA REGIONAL
# ========================================

query_gerfe = """
SELECT 
    m.gerencia_regional,
    
    COUNT(DISTINCT m.cnpj) AS qtde_empresas,
    SUM(m.total_protocolos) AS total_protocolos,
    ROUND(AVG(m.total_protocolos), 2) AS media_protocolos,
    
    COUNT(DISTINCT m.municipio) AS qtde_municipios,
    COUNT(DISTINCT m.cd_cnae) AS qtde_cnaes,
    
    SUM(CASE WHEN s.classificacao_risco_final = 'CR√çTICO' THEN 1 ELSE 0 END) AS risco_critico,
    SUM(CASE WHEN s.classificacao_risco_final = 'ALTO' THEN 1 ELSE 0 END) AS risco_alto,
    ROUND(SUM(CASE WHEN s.classificacao_risco_final IN ('CR√çTICO', 'ALTO') THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS perc_alto_risco,
    
    ROUND(AVG(s.score_total), 2) AS score_medio,
    
    SUM(m.qtde_ainda_cancelada) AS ainda_canceladas,
    ROUND(AVG(m.taxa_permanencia_cancelamento_perc), 2) AS taxa_permanencia,
    
    SUM(c.saldo_credor_atual) AS saldo_credor_total,
    ROUND(AVG(c.saldo_credor_atual), 2) AS saldo_credor_medio,
    
    SUM(i.qtde_indicios) AS total_indicios,
    ROUND(AVG(i.qtde_indicios), 2) AS media_indicios

FROM teste.luciano_metricas m
LEFT JOIN teste.luciano_scores s ON m.cnpj = s.cnpj
LEFT JOIN teste.luciano_credito c ON m.cnpj = c.cnpj
LEFT JOIN teste.luciano_indicios i ON m.cnpj = i.cnpj
WHERE m.gerencia_regional IS NOT NULL
GROUP BY m.gerencia_regional
ORDER BY qtde_empresas DESC
"""

df_gerfe = executar_analise("7. An√°lise por Ger√™ncia Regional", query_gerfe, salvar=True)

# ========================================
# AN√ÅLISE 8: POR CNAE (TOP 15)
# ========================================

query_cnae = """
SELECT 
    m.cd_cnae,
    MAX(m.descricao_cnae) AS descricao_cnae,
    
    COUNT(DISTINCT m.cnpj) AS qtde_empresas,
    SUM(m.total_protocolos) AS total_protocolos,
    ROUND(AVG(m.total_protocolos), 2) AS media_protocolos,
    
    SUM(CASE WHEN s.classificacao_risco_final IN ('CR√çTICO', 'ALTO') THEN 1 ELSE 0 END) AS alto_risco,
    ROUND(SUM(CASE WHEN s.classificacao_risco_final IN ('CR√çTICO', 'ALTO') THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS perc_alto_risco,
    
    ROUND(AVG(s.score_total), 2) AS score_medio,
    ROUND(AVG(s.score_comportamento), 2) AS score_comportamento,
    ROUND(AVG(s.score_credito), 2) AS score_credito,
    ROUND(AVG(s.score_indicios), 2) AS score_indicios,
    
    SUM(c.saldo_credor_atual) AS saldo_total,
    ROUND(AVG(c.saldo_credor_atual), 2) AS saldo_medio,
    
    SUM(i.qtde_indicios) AS total_indicios,
    SUM(i.qtde_indicios_graves) AS indicios_graves,
    
    SUM(m.flag_empresa_reincidente) AS reincidentes,
    ROUND(SUM(m.flag_empresa_reincidente) * 100.0 / COUNT(*), 2) AS perc_reincidentes

FROM teste.luciano_metricas m
LEFT JOIN teste.luciano_scores s ON m.cnpj = s.cnpj
LEFT JOIN teste.luciano_credito c ON m.cnpj = c.cnpj
LEFT JOIN teste.luciano_indicios i ON m.cnpj = i.cnpj
WHERE m.cd_cnae IS NOT NULL
GROUP BY m.cd_cnae
HAVING COUNT(DISTINCT m.cnpj) >= 10
ORDER BY qtde_empresas DESC
LIMIT 15
"""

df_cnae = executar_analise("8. An√°lise por CNAE (Top 15)", query_cnae, salvar=True)

print("\n‚úÖ Se√ß√£o 3 conclu√≠da!")

In [None]:
# ===================================================================
# C√âLULA 5: AN√ÅLISES DE CR√âDITO E IND√çCIOS
# ===================================================================

print("\n" + "="*80)
print("üìä SE√á√ÉO 4: AN√ÅLISES DE CR√âDITO E IND√çCIOS")
print("="*80)

# ========================================
# AN√ÅLISE 9: AN√ÅLISE DE CR√âDITO DETALHADA
# ========================================

query_credito = """
SELECT 
    CASE 
        WHEN saldo_credor_atual <= 0 THEN '0 - Sem saldo'
        WHEN saldo_credor_atual <= 10000 THEN '1 - At√© 10k'
        WHEN saldo_credor_atual <= 100000 THEN '2 - 10k a 100k'
        WHEN saldo_credor_atual <= 500000 THEN '3 - 100k a 500k'
        WHEN saldo_credor_atual <= 1000000 THEN '4 - 500k a 1M'
        WHEN saldo_credor_atual <= 10000000 THEN '5 - 1M a 10M'
        ELSE '6 - Acima de 10M'
    END AS faixa_saldo,
    
    COUNT(*) AS qtde_empresas,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) AS perc_total,
    
    SUM(saldo_credor_atual) AS saldo_total_faixa,
    ROUND(AVG(saldo_credor_atual), 2) AS saldo_medio,
    
    SUM(vl_credito_60m) AS credito_60m_total,
    SUM(vl_credito_presumido_60m) AS credito_presumido_total,
    
    SUM(flag_saldo_alto_cancelada) AS flag_saldo_alto,
    SUM(flag_suspeita_valores_repetidos) AS flag_valores_repetidos,
    SUM(flag_saldo_sem_declaracao) AS flag_sem_declaracao,
    SUM(flag_credito_muito_estavel) AS flag_muito_estavel,
    SUM(flag_crescimento_anormal_saldo) AS flag_crescimento_anormal,
    
    SUM(flag_saldo_alto_cancelada + flag_suspeita_valores_repetidos + 
        flag_saldo_sem_declaracao + flag_credito_muito_estavel + 
        flag_crescimento_anormal_saldo) AS total_alertas

FROM teste.luciano_credito
GROUP BY 
    CASE 
        WHEN saldo_credor_atual <= 0 THEN '0 - Sem saldo'
        WHEN saldo_credor_atual <= 10000 THEN '1 - At√© 10k'
        WHEN saldo_credor_atual <= 100000 THEN '2 - 10k a 100k'
        WHEN saldo_credor_atual <= 500000 THEN '3 - 100k a 500k'
        WHEN saldo_credor_atual <= 1000000 THEN '4 - 500k a 1M'
        WHEN saldo_credor_atual <= 10000000 THEN '5 - 1M a 10M'
        ELSE '6 - Acima de 10M'
    END
ORDER BY faixa_saldo
"""

df_credito = executar_analise("9. An√°lise de Cr√©dito por Faixa de Saldo", query_credito, salvar=True)

# ========================================
# AN√ÅLISE 10: TOP 20 EMPRESAS - MAIOR SALDO CREDOR
# ========================================

query_top_saldo = """
SELECT 
    c.cnpj,
    c.nome_contribuinte,
    s.classificacao_risco_final,
    s.score_total,
    
    c.saldo_credor_atual,
    c.vl_credito_60m,
    c.vl_credito_presumido_60m,
    
    c.perc_valores_iguais_12m,
    c.variacao_saldo_perc_60m,
    
    m.total_protocolos,
    m.taxa_permanencia_cancelamento_perc,
    
    i.qtde_indicios,
    
    c.flag_saldo_alto_cancelada,
    c.flag_suspeita_valores_repetidos,
    c.flag_saldo_sem_declaracao,
    c.flag_crescimento_anormal_saldo

FROM teste.luciano_credito c
JOIN teste.luciano_scores s ON c.cnpj = s.cnpj
JOIN teste.luciano_metricas m ON c.cnpj = m.cnpj
LEFT JOIN teste.luciano_indicios i ON c.cnpj = i.cnpj
WHERE s.flag_atualmente_cancelada = 1
    AND c.saldo_credor_atual > 0
ORDER BY c.saldo_credor_atual DESC
LIMIT 20
"""

df_top_saldo = executar_analise("10. Top 20 Empresas - Maior Saldo Credor em Risco", 
                                 query_top_saldo, salvar=True)

# ========================================
# AN√ÅLISE 11: AN√ÅLISE DE IND√çCIOS DETALHADA
# ========================================

query_indicios = """
SELECT 
    classificacao_risco_indicios,
    
    COUNT(*) AS qtde_empresas,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) AS perc_total,
    
    SUM(qtde_indicios) AS total_indicios,
    ROUND(AVG(qtde_indicios), 2) AS media_indicios,
    MAX(qtde_indicios) AS max_indicios,
    
    SUM(qtde_indicios_graves) AS total_graves,
    ROUND(AVG(qtde_indicios_graves), 2) AS media_graves,
    
    SUM(soma_scores_indicios) AS soma_total_scores,
    ROUND(AVG(soma_scores_indicios), 2) AS media_scores,
    
    SUM(qtde_indicios_docs_fiscais) AS indicios_docs_fiscais,
    SUM(qtde_indicios_relacionamentos) AS indicios_relacionamentos,
    SUM(qtde_indicios_declaracoes) AS indicios_declaracoes,
    SUM(qtde_indicios_cadastro) AS indicios_cadastro,
    
    SUM(flag_tem_indicios) AS com_indicios,
    SUM(flag_tem_indicios_graves) AS com_indicios_graves

FROM teste.luciano_indicios
GROUP BY classificacao_risco_indicios
ORDER BY 
    CASE classificacao_risco_indicios
        WHEN 'CR√çTICO' THEN 1
        WHEN 'ALTO' THEN 2
        WHEN 'M√âDIO' THEN 3
        WHEN 'BAIXO' THEN 4
        WHEN 'SEM_INDICIOS' THEN 5
    END
"""

df_indicios = executar_analise("11. An√°lise de Ind√≠cios Detalhada", query_indicios, salvar=True)

# ========================================
# AN√ÅLISE 12: TOP 20 EMPRESAS - MAIS IND√çCIOS
# ========================================

query_top_indicios = """
SELECT 
    i.cnpj,
    i.nome_contribuinte,
    i.classificacao_risco_indicios,
    
    i.qtde_indicios,
    i.qtde_tipos_indicios_distintos,
    i.soma_scores_indicios,
    i.qtde_indicios_graves,
    
    i.qtde_indicios_docs_fiscais,
    i.qtde_indicios_relacionamentos,
    i.qtde_indicios_declaracoes,
    i.qtde_indicios_cadastro,
    
    s.classificacao_risco_final,
    s.score_total,
    s.flag_atualmente_cancelada,
    
    c.saldo_credor_atual,
    
    m.total_protocolos,
    m.taxa_permanencia_cancelamento_perc

FROM teste.luciano_indicios i
JOIN teste.luciano_scores s ON i.cnpj = s.cnpj
JOIN teste.luciano_metricas m ON i.cnpj = m.cnpj
LEFT JOIN teste.luciano_credito c ON i.cnpj = c.cnpj
WHERE i.qtde_indicios > 0
ORDER BY i.soma_scores_indicios DESC, i.qtde_indicios DESC
LIMIT 20
"""

df_top_indicios = executar_analise("12. Top 20 Empresas - Mais Ind√≠cios", 
                                    query_top_indicios, salvar=True)

print("\n‚úÖ Se√ß√£o 4 conclu√≠da!")

In [None]:
# ===================================================================
# C√âLULA 6: AN√ÅLISES DE DESEMPENHO DOS FISCAIS
# ===================================================================

print("\n" + "="*80)
print("üìä SE√á√ÉO 5: AN√ÅLISES DE DESEMPENHO DOS FISCAIS")
print("="*80)

# ========================================
# AN√ÅLISE 13: DESEMPENHO GERAL DOS FISCAIS
# ========================================

query_fiscais = """
SELECT 
    matricula_fiscal,
    nome_fiscal,
    
    qtde_protocolos_fiscal,
    qtde_empresas_distintas,
    anos_atuacao,
    
    qtde_cancelamentos_efetivos,
    taxa_efetividade_perc,
    taxa_reativacao_perc,
    
    ROUND(media_dias_processamento, 1) AS dias_processamento_medio,
    
    ROUND(media_protocolos_por_empresa, 2) AS media_protocolos_empresa,
    qtde_casos_reincidentes,
    
    qtde_individuais,
    qtde_massivos,
    
    total_chats,
    total_observacoes,
    perc_dedicacao_malhas,
    
    data_primeiro_protocolo,
    data_ultimo_protocolo,
    DATEDIFF(data_ultimo_protocolo, data_primeiro_protocolo) AS dias_atuacao

FROM teste.luciano_fiscal
ORDER BY qtde_protocolos_fiscal DESC
"""

df_fiscais = executar_analise("13. An√°lise de Desempenho dos Fiscais", query_fiscais, salvar=True)

# ========================================
# AN√ÅLISE 14: RANKING FISCAIS - EFETIVIDADE
# ========================================

query_ranking_fiscais = """
SELECT 
    ROW_NUMBER() OVER (ORDER BY taxa_efetividade_perc DESC, qtde_protocolos_fiscal DESC) AS ranking,
    matricula_fiscal,
    nome_fiscal,
    qtde_protocolos_fiscal,
    qtde_cancelamentos_efetivos,
    taxa_efetividade_perc,
    taxa_reativacao_perc,
    ROUND(media_dias_processamento, 1) AS dias_processamento,
    qtde_casos_reincidentes

FROM teste.luciano_fiscal
WHERE qtde_protocolos_fiscal >= 10
ORDER BY taxa_efetividade_perc DESC, qtde_protocolos_fiscal DESC
LIMIT 15
"""

df_ranking_fiscais = executar_analise("14. Ranking Fiscais - Maior Efetividade (Top 15)", 
                                       query_ranking_fiscais, salvar=True)

print("\n‚úÖ Se√ß√£o 5 conclu√≠da!")

In [None]:
# ===================================================================
# C√âLULA 7: AN√ÅLISES CRUZADAS E REINCID√äNCIA
# ===================================================================

print("\n" + "="*80)
print("üìä SE√á√ÉO 6: AN√ÅLISES CRUZADAS E REINCID√äNCIA")
print("="*80)

# ========================================
# AN√ÅLISE 15: MATRIZ RISCO x CR√âDITO
# ========================================

query_matriz_risco_credito = """
SELECT 
    s.classificacao_risco_final,
    
    SUM(CASE WHEN c.saldo_credor_atual <= 0 THEN 1 ELSE 0 END) AS sem_saldo,
    SUM(CASE WHEN c.saldo_credor_atual > 0 AND c.saldo_credor_atual <= 100000 THEN 1 ELSE 0 END) AS ate_100k,
    SUM(CASE WHEN c.saldo_credor_atual > 100000 AND c.saldo_credor_atual <= 1000000 THEN 1 ELSE 0 END) AS de_100k_a_1M,
    SUM(CASE WHEN c.saldo_credor_atual > 1000000 THEN 1 ELSE 0 END) AS acima_1M,
    
    COUNT(*) AS total,
    SUM(c.saldo_credor_atual) AS saldo_total,
    ROUND(AVG(c.saldo_credor_atual), 2) AS saldo_medio

FROM teste.luciano_scores s
LEFT JOIN teste.luciano_credito c ON s.cnpj = c.cnpj
GROUP BY s.classificacao_risco_final
ORDER BY 
    CASE s.classificacao_risco_final
        WHEN 'CR√çTICO' THEN 1
        WHEN 'ALTO' THEN 2
        WHEN 'M√âDIO' THEN 3
        WHEN 'BAIXO' THEN 4
    END
"""

df_matriz_risco_credito = executar_analise("15. Matriz Risco x Cr√©dito", 
                                            query_matriz_risco_credito, salvar=True)

# ========================================
# AN√ÅLISE 16: AN√ÅLISE DE REINCID√äNCIA
# ========================================

query_reincidencia = """
SELECT 
    CASE 
        WHEN m.total_protocolos = 1 THEN '1 - √önico protocolo'
        WHEN m.total_protocolos = 2 THEN '2 - Dois protocolos'
        WHEN m.total_protocolos BETWEEN 3 AND 5 THEN '3 - De 3 a 5 protocolos'
        WHEN m.total_protocolos BETWEEN 6 AND 10 THEN '4 - De 6 a 10 protocolos'
        ELSE '5 - Mais de 10 protocolos'
    END AS faixa_protocolos,
    
    COUNT(*) AS qtde_empresas,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) AS perc_total,
    
    SUM(m.total_protocolos) AS total_protocolos_grupo,
    
    ROUND(AVG(m.taxa_permanencia_cancelamento_perc), 2) AS taxa_permanencia_media,
    ROUND(AVG(m.taxa_reativacao_perc), 2) AS taxa_reativacao_media,
    
    SUM(CASE WHEN s.classificacao_risco_final IN ('CR√çTICO', 'ALTO') THEN 1 ELSE 0 END) AS alto_risco,
    ROUND(SUM(CASE WHEN s.classificacao_risco_final IN ('CR√çTICO', 'ALTO') THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS perc_alto_risco,
    
    SUM(s.flag_atualmente_cancelada) AS ainda_canceladas,
    
    SUM(s.saldo_credor_atual) AS saldo_total,
    ROUND(AVG(s.saldo_credor_atual), 2) AS saldo_medio

FROM teste.luciano_scores s
JOIN teste.luciano_metricas m ON s.cnpj = m.cnpj
GROUP BY 
    CASE 
        WHEN m.total_protocolos = 1 THEN '1 - √önico protocolo'
        WHEN m.total_protocolos = 2 THEN '2 - Dois protocolos'
        WHEN m.total_protocolos BETWEEN 3 AND 5 THEN '3 - De 3 a 5 protocolos'
        WHEN m.total_protocolos BETWEEN 6 AND 10 THEN '4 - De 6 a 10 protocolos'
        ELSE '5 - Mais de 10 protocolos'
    END
ORDER BY faixa_protocolos
"""

df_reincidencia = executar_analise("16. An√°lise de Reincid√™ncia", query_reincidencia, salvar=True)

print("\n‚úÖ Se√ß√£o 6 conclu√≠da!")

In [None]:
# ===================================================================
# C√âLULA 8: ALERTAS CR√çTICOS E PADR√ïES DE CANCELAMENTO
# ===================================================================

print("\n" + "="*80)
print("üìä SE√á√ÉO 7: ALERTAS CR√çTICOS E PADR√ïES")
print("="*80)

# ========================================
# AN√ÅLISE 17: ALERTAS CR√çTICOS - CASOS PRIORIT√ÅRIOS
# ========================================

query_alertas_criticos = """
SELECT 
    s.cnpj,
    s.nome_contribuinte,
    s.classificacao_risco_final,
    s.score_total,
    s.ranking_fiscalizacao,
    
    CONCAT(
        CASE WHEN s.classificacao_risco_final = 'CR√çTICO' THEN '[RISCO_CR√çTICO] ' ELSE '' END,
        CASE WHEN c.saldo_credor_atual > 1000000 AND s.flag_atualmente_cancelada = 1 THEN '[SALDO>1M_CANCELADA] ' ELSE '' END,
        CASE WHEN i.qtde_indicios_graves >= 3 THEN '[IND√çCIOS_GRAVES>=3] ' ELSE '' END,
        CASE WHEN m.total_protocolos >= 5 THEN '[MUITO_REINCIDENTE] ' ELSE '' END,
        CASE WHEN c.flag_crescimento_anormal_saldo = 1 THEN '[CRESCIMENTO_ANORMAL] ' ELSE '' END,
        CASE WHEN c.perc_valores_iguais_12m >= 80 THEN '[VALORES_REPETIDOS] ' ELSE '' END
    ) AS alertas,
    
    m.total_protocolos,
    s.flag_atualmente_cancelada,
    c.saldo_credor_atual,
    i.qtde_indicios,
    i.qtde_indicios_graves,
    
    m.data_primeiro_cancelamento,
    m.data_ultimo_cancelamento

FROM teste.luciano_scores s
JOIN teste.luciano_metricas m ON s.cnpj = m.cnpj
LEFT JOIN teste.luciano_credito c ON s.cnpj = c.cnpj
LEFT JOIN teste.luciano_indicios i ON s.cnpj = i.cnpj
WHERE 
    s.classificacao_risco_final = 'CR√çTICO'
    OR (c.saldo_credor_atual > 1000000 AND s.flag_atualmente_cancelada = 1)
    OR i.qtde_indicios_graves >= 3
    OR (m.total_protocolos >= 5 AND s.flag_atualmente_cancelada = 1)
ORDER BY s.score_total DESC, c.saldo_credor_atual DESC
LIMIT 50
"""

df_alertas_criticos = executar_analise("17. Alertas Cr√≠ticos - Casos Priorit√°rios (Top 50)", 
                                        query_alertas_criticos, salvar=True)

# ========================================
# AN√ÅLISE 18: PADR√ïES DE CANCELAMENTO
# ========================================

query_padroes_cancelamento = """
SELECT 
    m.padrao_cancelamento_predominante,
    
    COUNT(*) AS qtde_empresas,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) AS perc_total,
    
    SUM(m.total_protocolos) AS total_protocolos,
    ROUND(AVG(m.total_protocolos), 2) AS media_protocolos,
    
    SUM(m.qtde_automaticos) AS total_automaticos,
    SUM(m.qtde_manuais) AS total_manuais,
    
    ROUND(AVG(m.taxa_permanencia_cancelamento_perc), 2) AS taxa_permanencia,
    ROUND(AVG(m.taxa_reativacao_perc), 2) AS taxa_reativacao,
    
    SUM(CASE WHEN s.classificacao_risco_final IN ('CR√çTICO', 'ALTO') THEN 1 ELSE 0 END) AS alto_risco,
    
    SUM(m.qtde_ainda_cancelada) AS ainda_canceladas,
    SUM(m.flag_empresa_reincidente) AS reincidentes

FROM teste.luciano_metricas m
JOIN teste.luciano_scores s ON m.cnpj = s.cnpj
GROUP BY m.padrao_cancelamento_predominante
"""

df_padroes = executar_analise("18. An√°lise de Padr√µes de Cancelamento", 
                               query_padroes_cancelamento, salvar=True)

# ========================================
# AN√ÅLISE 19: EFETIVIDADE DO CANCELAMENTO
# ========================================

query_efetividade = """
SELECT 
    m.efetividade_cancelamento,
    
    COUNT(*) AS qtde_empresas,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) AS perc_total,
    
    SUM(m.total_protocolos) AS total_protocolos,
    
    ROUND(AVG(m.taxa_permanencia_cancelamento_perc), 2) AS taxa_permanencia_media,
    ROUND(AVG(m.dias_medios_reativacao), 0) AS dias_medios_reativacao,
    
    SUM(m.flag_empresa_reincidente) AS reincidentes,
    ROUND(SUM(m.flag_empresa_reincidente) * 100.0 / COUNT(*), 2) AS perc_reincidentes,
    
    ROUND(AVG(s.score_total), 2) AS score_medio,
    
    SUM(c.saldo_credor_atual) AS saldo_total,
    ROUND(AVG(c.saldo_credor_atual), 2) AS saldo_medio

FROM teste.luciano_metricas m
JOIN teste.luciano_scores s ON m.cnpj = s.cnpj
LEFT JOIN teste.luciano_credito c ON m.cnpj = c.cnpj
GROUP BY m.efetividade_cancelamento
ORDER BY 
    CASE m.efetividade_cancelamento
        WHEN 'EFETIVO' THEN 1
        WHEN 'PARCIALMENTE_EFETIVO' THEN 2
        WHEN 'POUCO_EFETIVO' THEN 3
        WHEN 'INEFETIVO' THEN 4
    END
"""

df_efetividade = executar_analise("19. An√°lise de Efetividade do Cancelamento", 
                                   query_efetividade, salvar=True)

print("\n‚úÖ Se√ß√£o 7 conclu√≠da!")

In [None]:
# ===================================================================
# C√âLULA 9: COMPARATIVOS E AN√ÅLISES POR TIPO
# ===================================================================

print("\n" + "="*80)
print("üìä SE√á√ÉO 8: COMPARATIVOS E AN√ÅLISES POR TIPO")
print("="*80)

# ========================================
# AN√ÅLISE 20: COMPARATIVO AUTOM√ÅTICO vs MANUAL
# ========================================

query_comparativo = """
SELECT 
    'AUTOM√ÅTICO' AS tipo_cancelamento,
    COUNT(DISTINCT b.cnpj) AS empresas_afetadas,
    COUNT(*) AS total_eventos,
    SUM(b.flag_ainda_cancelada) AS ainda_canceladas,
    ROUND(SUM(b.flag_ainda_cancelada) * 100.0 / COUNT(*), 2) AS taxa_permanencia,
    SUM(b.flag_reativada) AS reativadas,
    ROUND(AVG(b.dias_desde_cancelamento), 0) AS dias_medios
FROM teste.luciano_base b
WHERE b.flag_cancelamento_automatico = 1

UNION ALL

SELECT 
    'MANUAL' AS tipo_cancelamento,
    COUNT(DISTINCT b.cnpj) AS empresas_afetadas,
    COUNT(*) AS total_eventos,
    SUM(b.flag_ainda_cancelada) AS ainda_canceladas,
    ROUND(SUM(b.flag_ainda_cancelada) * 100.0 / COUNT(*), 2) AS taxa_permanencia,
    SUM(b.flag_reativada) AS reativadas,
    ROUND(AVG(b.dias_desde_cancelamento), 0) AS dias_medios
FROM teste.luciano_base b
WHERE b.flag_cancelamento_automatico = 0
"""

df_comparativo = executar_analise("20. Comparativo Autom√°tico vs Manual", 
                                   query_comparativo, salvar=True)

# ========================================
# AN√ÅLISE 21: AN√ÅLISE POR TIPO DE CONTRIBUINTE
# ========================================

query_tipo_contribuinte = """
SELECT 
    m.tipo_contribuinte,
    
    COUNT(DISTINCT m.cnpj) AS qtde_empresas,
    SUM(m.total_protocolos) AS total_protocolos,
    ROUND(AVG(m.total_protocolos), 2) AS media_protocolos,
    
    SUM(CASE WHEN s.classificacao_risco_final IN ('CR√çTICO', 'ALTO') THEN 1 ELSE 0 END) AS alto_risco,
    ROUND(SUM(CASE WHEN s.classificacao_risco_final IN ('CR√çTICO', 'ALTO') THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS perc_alto_risco,
    
    ROUND(AVG(s.score_total), 2) AS score_medio,
    
    SUM(m.qtde_ainda_cancelada) AS ainda_canceladas,
    ROUND(AVG(m.taxa_permanencia_cancelamento_perc), 2) AS taxa_permanencia,
    
    SUM(c.saldo_credor_atual) AS saldo_total,
    ROUND(AVG(c.saldo_credor_atual), 2) AS saldo_medio

FROM teste.luciano_metricas m
JOIN teste.luciano_scores s ON m.cnpj = s.cnpj
LEFT JOIN teste.luciano_credito c ON m.cnpj = c.cnpj
WHERE m.tipo_contribuinte IS NOT NULL
GROUP BY m.tipo_contribuinte
ORDER BY qtde_empresas DESC
"""

df_tipo_contrib = executar_analise("21. An√°lise por Tipo de Contribuinte", 
                                    query_tipo_contribuinte, salvar=True)

# ========================================
# AN√ÅLISE 22: AN√ÅLISE POR REGIME DE APURA√á√ÉO
# ========================================

query_regime = """
SELECT 
    m.regime_apuracao,
    
    COUNT(DISTINCT m.cnpj) AS qtde_empresas,
    SUM(m.total_protocolos) AS total_protocolos,
    
    SUM(CASE WHEN s.classificacao_risco_final = 'CR√çTICO' THEN 1 ELSE 0 END) AS risco_critico,
    SUM(CASE WHEN s.classificacao_risco_final = 'ALTO' THEN 1 ELSE 0 END) AS risco_alto,
    
    ROUND(AVG(s.score_total), 2) AS score_medio,
    
    SUM(c.saldo_credor_atual) AS saldo_total,
    SUM(c.vl_credito_presumido_60m) AS credito_presumido_total,
    
    SUM(i.qtde_indicios) AS total_indicios

FROM teste.luciano_metricas m
JOIN teste.luciano_scores s ON m.cnpj = s.cnpj
LEFT JOIN teste.luciano_credito c ON m.cnpj = c.cnpj
LEFT JOIN teste.luciano_indicios i ON m.cnpj = i.cnpj
WHERE m.regime_apuracao IS NOT NULL
GROUP BY m.regime_apuracao
ORDER BY qtde_empresas DESC
"""

df_regime = executar_analise("22. An√°lise por Regime de Apura√ß√£o", query_regime, salvar=True)

print("\n‚úÖ Se√ß√£o 8 conclu√≠da!")

In [None]:
# ===================================================================
# C√âLULA 10: TOP 100 E SUM√ÅRIOS FINAIS
# ===================================================================

print("\n" + "="*80)
print("üìä SE√á√ÉO 9: TOP 100 E SUM√ÅRIOS FINAIS")
print("="*80)

# ========================================
# AN√ÅLISE 23: TOP 100 COMPLETO
# ========================================

query_top100 = """
SELECT 
    ranking_fiscalizacao,
    cnpj,
    nome_contribuinte,
    razao_social,
    cd_cnae,
    municipio,
    uf,
    
    classificacao_risco_final,
    score_total,
    score_comportamento,
    score_credito,
    score_indicios,
    
    total_protocolos,
    taxa_permanencia_cancelamento_perc,
    classificacao_frequencia,
    efetividade_cancelamento,
    
    flag_atualmente_cancelada,
    flag_empresa_reincidente,
    
    saldo_credor_atual,
    vl_credito_60m,
    
    qtde_indicios,
    qtde_indicios_graves,
    classificacao_risco_indicios,
    
    flag_saldo_alto_cancelada,
    flag_suspeita_valores_repetidos,
    flag_tem_indicios_graves,
    
    padrao_cancelamento_predominante,
    data_primeiro_cancelamento,
    data_ultimo_cancelamento

FROM teste.luciano_top100
ORDER BY ranking_fiscalizacao
"""

df_top100 = executar_analise("23. Top 100 - Prioridade de Fiscaliza√ß√£o", 
                              query_top100, salvar=True)

# ========================================
# AN√ÅLISE 24: SUM√ÅRIO ESTAT√çSTICO FINAL
# ========================================

query_sumario = """
SELECT 
    'VOLUME' AS categoria,
    'Total de Empresas' AS metrica,
    CAST(COUNT(DISTINCT cnpj) AS STRING) AS valor
FROM teste.luciano_scores

UNION ALL

SELECT 'VOLUME', 'Total de Protocolos', CAST(SUM(total_protocolos) AS STRING)
FROM teste.luciano_scores

UNION ALL

SELECT 'VOLUME', 'M√©dia Protocolos/Empresa', CAST(ROUND(AVG(total_protocolos), 2) AS STRING)
FROM teste.luciano_scores

UNION ALL

SELECT 'RISCO', 'Empresas Cr√≠tico', CONCAT(CAST(SUM(CASE WHEN classificacao_risco_final = 'CR√çTICO' THEN 1 ELSE 0 END) AS STRING), ' (', CAST(ROUND(SUM(CASE WHEN classificacao_risco_final = 'CR√çTICO' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 1) AS STRING), '%)')
FROM teste.luciano_scores

UNION ALL

SELECT 'RISCO', 'Empresas Alto', CONCAT(CAST(SUM(CASE WHEN classificacao_risco_final = 'ALTO' THEN 1 ELSE 0 END) AS STRING), ' (', CAST(ROUND(SUM(CASE WHEN classificacao_risco_final = 'ALTO' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 1) AS STRING), '%)')
FROM teste.luciano_scores

UNION ALL

SELECT 'SCORE', 'Score M√©dio', CAST(ROUND(AVG(score_total), 2) AS STRING)
FROM teste.luciano_scores

UNION ALL

SELECT 'SCORE', 'Score M√°ximo', CAST(ROUND(MAX(score_total), 2) AS STRING)
FROM teste.luciano_scores

UNION ALL

SELECT 'STATUS', 'Ainda Canceladas', CONCAT(CAST(SUM(flag_atualmente_cancelada) AS STRING), ' (', CAST(ROUND(SUM(flag_atualmente_cancelada) * 100.0 / COUNT(*), 1) AS STRING), '%)')
FROM teste.luciano_scores

UNION ALL

SELECT 'STATUS', 'Reincidentes', CONCAT(CAST(SUM(flag_empresa_reincidente) AS STRING), ' (', CAST(ROUND(SUM(flag_empresa_reincidente) * 100.0 / COUNT(*), 1) AS STRING), '%)')
FROM teste.luciano_scores

UNION ALL

SELECT 'CR√âDITO', 'Saldo Credor Total', CONCAT('R$ ', CAST(ROUND(SUM(saldo_credor_atual), 2) AS STRING))
FROM teste.luciano_scores

UNION ALL

SELECT 'CR√âDITO', 'Saldo Credor M√©dio', CONCAT('R$ ', CAST(ROUND(AVG(saldo_credor_atual), 2) AS STRING))
FROM teste.luciano_scores

UNION ALL

SELECT 'IND√çCIOS', 'Empresas com Ind√≠cios', CONCAT(CAST(SUM(flag_tem_indicios) AS STRING), ' (', CAST(ROUND(SUM(flag_tem_indicios) * 100.0 / COUNT(*), 1) AS STRING), '%)')
FROM teste.luciano_scores

UNION ALL

SELECT 'IND√çCIOS', 'Total de Ind√≠cios', CAST(SUM(qtde_indicios) AS STRING)
FROM teste.luciano_scores

UNION ALL

SELECT 'ALERTAS', 'Saldo Alto + Cancelada', CAST(SUM(flag_saldo_alto_cancelada) AS STRING)
FROM teste.luciano_scores

UNION ALL

SELECT 'ALERTAS', 'Valores Suspeitos', CAST(SUM(flag_suspeita_valores_repetidos) AS STRING)
FROM teste.luciano_scores
"""

df_sumario = executar_analise("24. Sum√°rio Estat√≠stico Final", query_sumario, salvar=True)

# ========================================
# AN√ÅLISE 25: EXPORTA√á√ÉO COMPLETA
# ========================================

query_exportacao = """
SELECT 
    s.ranking_fiscalizacao,
    s.cnpj,
    s.nome_contribuinte,
    m.razao_social,
    m.cd_cnae,
    m.descricao_cnae,
    m.municipio,
    m.uf,
    m.gerencia_regional,
    m.tipo_contribuinte,
    m.regime_apuracao,
    m.grupo_economico,
    
    s.classificacao_risco_final,
    s.score_total,
    s.score_comportamento,
    s.score_credito,
    s.score_indicios,
    
    s.total_protocolos,
    m.anos_distintos_cancelamento,
    m.qtde_usuarios_distintos,
    m.qtde_automaticos,
    m.qtde_manuais,
    m.padrao_cancelamento_predominante,
    
    s.taxa_permanencia_cancelamento_perc,
    s.taxa_reativacao_perc,
    s.classificacao_frequencia,
    s.classificacao_persistencia,
    s.efetividade_cancelamento,
    
    s.flag_empresa_reincidente,
    s.flag_atualmente_cancelada,
    
    c.saldo_credor_atual,
    c.vl_credito_12m,
    c.vl_credito_60m,
    c.vl_credito_presumido_12m,
    c.vl_credito_presumido_60m,
    c.perc_valores_iguais_12m,
    c.variacao_saldo_perc_60m,
    c.flag_saldo_alto_cancelada,
    c.flag_suspeita_valores_repetidos,
    c.flag_saldo_sem_declaracao,
    c.flag_crescimento_anormal_saldo,
    
    i.qtde_indicios,
    i.qtde_tipos_indicios_distintos,
    i.soma_scores_indicios,
    i.qtde_indicios_graves,
    i.classificacao_risco_indicios,
    i.qtde_indicios_docs_fiscais,
    i.qtde_indicios_relacionamentos,
    i.qtde_indicios_declaracoes,
    i.qtde_indicios_cadastro,
    
    m.data_primeiro_cancelamento,
    m.data_ultimo_cancelamento,
    m.dias_entre_primeiro_ultimo,
    m.media_dias_desde_cancelamento,
    m.lista_fiscais_envolvidos

FROM teste.luciano_scores s
JOIN teste.luciano_metricas m ON s.cnpj = m.cnpj
LEFT JOIN teste.luciano_credito c ON s.cnpj = c.cnpj
LEFT JOIN teste.luciano_indicios i ON s.cnpj = i.cnpj
ORDER BY s.ranking_fiscalizacao
LIMIT 1000
"""

df_exportacao = executar_analise("25. Exporta√ß√£o Completa (Top 1000)", 
                                  query_exportacao, salvar=True)

print("\n‚úÖ Se√ß√£o 9 conclu√≠da!")

In [None]:
# ===================================================================
# C√âLULA 11: RELAT√ìRIO CONSOLIDADO FINAL
# ===================================================================

from datetime import datetime
import builtins  # Importar builtins para usar sum() nativo

print("="*80)
print("üìã RELAT√ìRIO CONSOLIDADO FINAL")
print("="*80)

print(f"\nüìä RESUMO DO RELAT√ìRIO:")
print(f"   Total de an√°lises executadas: {len(resultados)}")
print(f"   Data/Hora de conclus√£o: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

# CORRE√á√ÉO: Usar builtins.sum() ao inv√©s de sum() para evitar conflito com PySpark
total_registros = builtins.sum(len(df) for df in resultados.values())
print(f"   Total de registros processados: {total_registros:,}")

print("\nüìÇ ARQUIVOS SALVOS:")
for nome_analise, df_resultado in resultados.items():
    arquivo = f"/home/claude/{nome_analise}.csv"
    print(f"   ‚úÖ {arquivo}")

print("\n" + "="*80)
print("üìä RESUMO DAS PRINCIPAIS AN√ÅLISES")
print("="*80)

# An√°lise 1: Distribui√ß√£o de Risco
if 'ranking_completo' in resultados:
    df_ranking = resultados['ranking_completo']
    print("\nüéØ DISTRIBUI√á√ÉO DE RISCO:")
    distribuicao = df_ranking['classificacao_risco'].value_counts().sort_index()
    for risco, qtd in distribuicao.items():
        pct = (qtd / len(df_ranking)) * 100
        print(f"   {risco:12s}: {qtd:>7,} empresas ({pct:>5.2f}%)")

# An√°lise 2: Top 10 Empresas de Maior Risco
if 'ranking_top1k_detalhado' in resultados:
    df_top = resultados['ranking_top1k_detalhado']
    print("\nüîù TOP 10 EMPRESAS DE MAIOR RISCO:")
    top_10 = df_top.head(10)
    for idx, row in top_10.iterrows():
        print(f"   {idx+1:2d}. {row['cnpj']} - Prob: {row['prob_alto_risco']:.4f} - Ind√≠cios: {row['qtd_indicios']}")

# An√°lise 3: Empresas com Ind√≠cios
if 'ranking_completo' in resultados:
    df_ranking = resultados['ranking_completo']
    empresas_com_indicios = (df_ranking['qtd_indicios'] > 0).sum()
    pct_indicios = (empresas_com_indicios / len(df_ranking)) * 100
    print(f"\n‚ö†Ô∏è  EMPRESAS COM IND√çCIOS NEAF:")
    print(f"   Total: {empresas_com_indicios:,} ({pct_indicios:.2f}%)")
    print(f"   M√©dia de ind√≠cios: {df_ranking[df_ranking['qtd_indicios'] > 0]['qtd_indicios'].mean():.2f}")

# An√°lise 4: Empresas com Receita
if 'ranking_completo' in resultados:
    df_ranking = resultados['ranking_completo']
    empresas_com_receita = (df_ranking['receita_12m'] > 0).sum()
    pct_receita = (empresas_com_receita / len(df_ranking)) * 100
    print(f"\nüí∞ EMPRESAS COM RECEITA DECLARADA:")
    print(f"   Total: {empresas_com_receita:,} ({pct_receita:.2f}%)")
    receita_total = df_ranking['receita_12m'].sum()
    print(f"   Receita total (12m): R$ {receita_total:,.2f}")

# An√°lise 5: Distribui√ß√£o por UF (Top 5)
if 'ranking_completo' in resultados:
    df_ranking = resultados['ranking_completo']
    print(f"\nüó∫Ô∏è  TOP 5 UFs COM MAIS EMPRESAS:")
    top_ufs = df_ranking['cd_uf'].value_counts().head(5)
    for uf, qtd in top_ufs.items():
        pct = (qtd / len(df_ranking)) * 100
        print(f"   {uf}: {qtd:>7,} empresas ({pct:>5.2f}%)")

# An√°lise 6: Distribui√ß√£o por Regime Tribut√°rio
if 'ranking_completo' in resultados:
    df_ranking = resultados['ranking_completo']
    print(f"\nüìã DISTRIBUI√á√ÉO POR REGIME TRIBUT√ÅRIO:")
    regimes = df_ranking['nm_reg_apuracao'].value_counts()
    for regime, qtd in regimes.items():
        pct = (qtd / len(df_ranking)) * 100
        print(f"   {regime:20s}: {qtd:>7,} ({pct:>5.2f}%)")

print("\n" + "="*80)
print("üìä ESTAT√çSTICAS DE FEATURES")
print("="*80)

if 'ranking_completo' in resultados:
    df_ranking = resultados['ranking_completo']
    
    print("\nüìà FEATURES NUM√âRICAS (Estat√≠sticas Descritivas):")
    features_numericas = [
        'qtd_indicios', 'receita_12m', 'receita_maxima', 
        'qtd_meses_declarados', 'qt_socios_ativos', 'qt_vinculos_ativos'
    ]
    
    for feat in features_numericas:
        if feat in df_ranking.columns:
            stats = df_ranking[feat].describe()
            print(f"\n   {feat}:")
            print(f"      M√©dia: {stats['mean']:,.2f}")
            print(f"      Mediana: {stats['50%']:,.2f}")
            print(f"      M√°ximo: {stats['max']:,.2f}")
            print(f"      N√£o-zeros: {(df_ranking[feat] > 0).sum():,}")

print("\n" + "="*80)
print("üéØ RECOMENDA√á√ïES DE PRIORIZA√á√ÉO")
print("="*80)

if 'ranking_completo' in resultados:
    df_ranking = resultados['ranking_completo']
    
    muito_alto = (df_ranking['classificacao_risco'] == 'MUITO ALTO').sum()
    alto = (df_ranking['classificacao_risco'] == 'ALTO').sum()
    medio = (df_ranking['classificacao_risco'] == 'M√âDIO').sum()
    
    print(f"""
   üî¥ PRIORIDADE 1 - MUITO ALTO RISCO ({muito_alto:,} empresas):
      ‚Ä¢ Probabilidade >90% de perfil irregular
      ‚Ä¢ A√ß√£o: Fiscaliza√ß√£o imediata
      ‚Ä¢ Meta: 100% analisadas em 30 dias
   
   üü† PRIORIDADE 2 - ALTO RISCO ({alto:,} empresas):
      ‚Ä¢ Probabilidade 70-90%
      ‚Ä¢ A√ß√£o: Monitoramento intensivo + fiscaliza√ß√£o programada
      ‚Ä¢ Meta: 50% analisadas em 60 dias
   
   üü° PRIORIDADE 3 - M√âDIO RISCO ({medio:,} empresas):
      ‚Ä¢ Probabilidade 50-70%
      ‚Ä¢ A√ß√£o: Incluir em plano de fiscaliza√ß√£o trimestral
      ‚Ä¢ Meta: 20% analisadas em 90 dias
    """)

print("\n" + "="*80)
print("üìÅ LOCALIZA√á√ÉO DOS ARQUIVOS")
print("="*80)

print("""
   RANKINGS:
   ‚Ä¢ /home/claude/ranking_completo.csv (400k empresas)
   ‚Ä¢ /home/claude/ranking_top10k.csv
   ‚Ä¢ /home/claude/ranking_top1k_detalhado.csv
   
   AN√ÅLISES:
   ‚Ä¢ /home/claude/analise_por_faixa_risco.csv
   ‚Ä¢ /home/claude/perfis_alto_risco_top100.csv
   ‚Ä¢ /home/claude/feature_importance.csv
   
   MODELOS:
   ‚Ä¢ /home/claude/rf_model.pkl
   ‚Ä¢ /home/claude/le_regime.pkl
   ‚Ä¢ /home/claude/le_uf.pkl
   ‚Ä¢ /home/claude/all_features.pkl
""")

print("\n" + "="*80)
print("‚úÖ RELAT√ìRIO FINAL CONCLU√çDO COM SUCESSO!")
print("="*80)

print(f"\nüí° PR√ìXIMOS PASSOS SUGERIDOS:")
print("""
   1. Validar amostra do TOP 100 manualmente
   2. Ajustar limiares se necess√°rio
   3. Integrar rankings com sistema de fiscaliza√ß√£o
   4. Estabelecer rotina de atualiza√ß√£o mensal
   5. Criar alertas autom√°ticos para novos casos de alto risco
""")

print(f"\nüìÖ Relat√≥rio gerado em: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("="*80)

In [None]:
# ===================================================================
# C√âLULA 12: RESUMO EXECUTIVO VISUAL (B√îNUS)
# ===================================================================

import plotly.graph_objects as go
from plotly.subplots import make_subplots

print("\n" + "="*80)
print("üìä GERANDO VISUALIZA√á√ïES EXECUTIVAS")
print("="*80)

# Criar dashboard visual com principais m√©tricas
fig = make_subplots(
    rows=3, cols=2,
    subplot_titles=(
        'Distribui√ß√£o de Risco',
        'Evolu√ß√£o Temporal (Anual)',
        'Top 10 Munic√≠pios',
        'Distribui√ß√£o de Cr√©dito',
        'Padr√µes de Cancelamento',
        'Efetividade'
    ),
    specs=[[{'type': 'pie'}, {'type': 'bar'}],
           [{'type': 'bar'}, {'type': 'bar'}],
           [{'type': 'bar'}, {'type': 'bar'}]]
)

# 1. Distribui√ß√£o de Risco (Pizza)
if len(df_risco) > 0:
    fig.add_trace(
        go.Pie(labels=df_risco['classificacao_risco_final'],
               values=df_risco['qtde_empresas'],
               hole=0.3),
        row=1, col=1
    )

# 2. Evolu√ß√£o Temporal (Barras)
if len(df_temporal_anual) > 0:
    fig.add_trace(
        go.Bar(x=df_temporal_anual['ano_cancelamento'],
               y=df_temporal_anual['total_protocolos'],
               marker_color='#1f77b4'),
        row=1, col=2
    )

# 3. Top 10 Munic√≠pios
if len(df_municipio) > 0:
    top10_mun = df_municipio.head(10)
    fig.add_trace(
        go.Bar(y=top10_mun['municipio'],
               x=top10_mun['qtde_empresas'],
               orientation='h',
               marker_color='#2ca02c'),
        row=2, col=1
    )

# 4. Distribui√ß√£o de Cr√©dito
if len(df_credito) > 0:
    fig.add_trace(
        go.Bar(x=df_credito['faixa_saldo'],
               y=df_credito['qtde_empresas'],
               marker_color='#ff7f0e'),
        row=2, col=2
    )

# 5. Padr√µes de Cancelamento
if len(df_padroes) > 0:
    fig.add_trace(
        go.Bar(x=df_padroes['padrao_cancelamento_predominante'],
               y=df_padroes['qtde_empresas'],
               marker_color='#d62728'),
        row=3, col=1
    )

# 6. Efetividade
if len(df_efetividade) > 0:
    fig.add_trace(
        go.Bar(x=df_efetividade['efetividade_cancelamento'],
               y=df_efetividade['qtde_empresas'],
               marker_color='#9467bd'),
        row=3, col=2
    )

fig.update_layout(
    height=1200,
    showlegend=False,
    title_text="<b>PROJETO LUCIANO - DASHBOARD EXECUTIVO</b>",
    title_font_size=20
)

fig.show()

# Salvar dashboard como HTML
html_path = "DASHBOARD_EXECUTIVO.html"
fig.write_html(html_path)
print(f"\n‚úÖ Dashboard salvo: {html_path}")

print("\nüéâ VISUALIZA√á√ïES CONCLU√çDAS!")