In [None]:
import sys
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/poc")
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/plugins")
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/dags")

#Import libs python
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import date

#Import libs internas
from utils import spark_utils_session as utils

from hooks.hdfs.hdfs_helper import HdfsHelper
from jobs.job_base_config import BaseETLJobClass

import poc_helper
poc_helper.load_env("PROD")

In [None]:
def get_session(profile: str, dynamic_allocation_enabled: bool = True) -> utils.DBASparkAppSession:
    """Generates DBASparkAppSession."""
    
    app_name = "tsevero_bcadastro"
    
    spark_builder = (utils.DBASparkAppSession
                     .builder
                     .setAppName(app_name)
                     .usingProcessProfile(profile)
                    )
    
    if dynamic_allocation_enabled:
        spark_builder.autoResourceManagement()

    return spark_builder.build()

session = get_session(profile='efd_t2')

In [None]:
session.sparkSession.sql("SHOW DATABASES").show(truncate=False)

In [None]:
# ============================================================================
# CONFIGURA√á√ÉO INICIAL - PROJETO SIMPLES NACIONAL E GRUPOS ECON√îMICOS
# ============================================================================

import sys
import warnings
from datetime import datetime, date
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# PySpark imports com aliases para evitar conflitos
from pyspark.sql.functions import (
    col as spark_col, 
    sum as spark_sum, 
    avg as spark_avg,
    count as spark_count,
    when as spark_when,
    desc as spark_desc,
    asc as spark_asc,
    round as spark_round,
    concat as spark_concat,
    lit as spark_lit,
    max as spark_max,
    min as spark_min,
    stddev as spark_stddev,
    countDistinct as spark_countDistinct
)
from pyspark.sql.types import DoubleType, IntegerType

# Configura√ß√µes de visualiza√ß√£o
warnings.filterwarnings('ignore')
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = (16, 8)
plt.rcParams['font.size'] = 11

pd.set_option('display.max_columns', None)
pd.set_option('display.precision', 2)

# Acesso ao Spark
spark = session.sparkSession

print("=" * 80)
print("üîç SISTEMA DE AN√ÅLISE - SIMPLES NACIONAL E GRUPOS ECON√îMICOS")
print("=" * 80)
print(f"Sess√£o Spark: {spark.sparkContext.appName}")
print(f"Vers√£o Spark: {spark.version}")
print(f"Iniciado em: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)

In [None]:
# ============================================================================
# PANORAMA GERAL DO SISTEMA
# ============================================================================

print("\n" + "=" * 80)
print("üìä VERIFICA√á√ÉO DE TABELAS E ESTAT√çSTICAS GERAIS")
print("=" * 80)

# Lista de tabelas do projeto
tabelas_projeto = [
    'gessimples.feitoza_base_cnpj_completo',
    'gessimples.feitoza_base_socios_consolidado',
    'gessimples.feitoza_base_periodos_sn',
    'gessimples.feitoza_pgdas_consolidado',
    'gessimples.feitoza_rba_12_meses',
    'gessimples.feitoza_grupos_identificados',
    'gessimples.feitoza_rba_grupo',
    'gessimples.feitoza_fato_gerador',
    'gessimples.feitoza_resumo_grupos_irregulares',
    'gessimples.feitoza_lista_acao_fiscal'
]

print("\nüìã Verificando exist√™ncia e tamanho das tabelas:\n")
tabelas_info = []
for tabela in tabelas_projeto:
    try:
        count = spark.sql(f"SELECT COUNT(*) as cnt FROM {tabela}").collect()[0]['cnt']
        print(f"‚úÖ {tabela:50s} ‚Üí {count:>12,} registros")
        tabelas_info.append({'tabela': tabela, 'registros': count, 'status': 'OK'})
    except Exception as e:
        print(f"‚ùå {tabela:50s} ‚Üí N√ÉO ENCONTRADA")
        tabelas_info.append({'tabela': tabela, 'registros': 0, 'status': 'ERRO'})

# Converter para DataFrame para an√°lise
df_tabelas_info = pd.DataFrame(tabelas_info)
print(f"\n‚úÖ Total de tabelas verificadas: {len(df_tabelas_info)}")
print(f"‚úÖ Tabelas dispon√≠veis: {(df_tabelas_info['status'] == 'OK').sum()}")
print(f"‚ùå Tabelas com erro: {(df_tabelas_info['status'] == 'ERRO').sum()}")

In [None]:
# ============================================================================
# ESTAT√çSTICAS GERAIS DO SISTEMA
# ============================================================================

print("\n" + "=" * 80)
print("üìà ESTAT√çSTICAS GERAIS")
print("=" * 80)

# Criar view com estat√≠sticas consolidadas
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW vw_stats_geral AS
SELECT 
    COUNT(DISTINCT bc.cnpj_raiz) AS total_empresas,
    COUNT(DISTINCT CASE WHEN bc.uf = 'SC' THEN bc.cnpj_raiz END) AS empresas_sc,
    COUNT(DISTINCT CASE WHEN psn.cnpj_raiz IS NOT NULL THEN bc.cnpj_raiz END) AS empresas_sn,
    COUNT(DISTINCT sc.cpf_socio) AS total_socios,
    COUNT(DISTINCT CASE WHEN gi.num_grupo IS NOT NULL THEN sc.cpf_socio END) AS socios_em_grupos,
    COUNT(DISTINCT gi.num_grupo) AS total_grupos,
    COUNT(DISTINCT CASE WHEN gi.tipo_grupo = 'GRUPO_SC_PURO' THEN gi.num_grupo END) AS grupos_sc_puro,
    COUNT(DISTINCT rgi.num_grupo) AS grupos_irregulares,
    CAST(COALESCE(SUM(rba.vl_rba_12_meses), 0) AS DOUBLE) AS rba_total,
    CAST(COALESCE(AVG(rba.vl_rba_12_meses), 0) AS DOUBLE) AS rba_media,
    CAST(COALESCE(MAX(rba.vl_rba_12_meses), 0) AS DOUBLE) AS rba_maxima
FROM gessimples.feitoza_base_cnpj_completo bc
LEFT JOIN gessimples.feitoza_base_socios_consolidado sc ON bc.cnpj_raiz = sc.cnpj_raiz
LEFT JOIN gessimples.feitoza_base_periodos_sn psn ON bc.cnpj_raiz = psn.cnpj_raiz
LEFT JOIN gessimples.feitoza_grupos_identificados gi ON sc.cpf_socio = gi.cpf_socio
LEFT JOIN gessimples.feitoza_resumo_grupos_irregulares rgi ON gi.num_grupo = rgi.num_grupo
LEFT JOIN gessimples.feitoza_rba_12_meses rba ON bc.cnpj_raiz = rba.cnpj_raiz
""")

# Verificar tamanho
total_stats = spark.sql("SELECT COUNT(*) as cnt FROM vw_stats_geral").collect()[0]['cnt']
print(f"\nüìä Total de registros nas estat√≠sticas: {total_stats}")

if total_stats > 0:
    df_stats = spark.sql("SELECT * FROM vw_stats_geral").toPandas()
    
    s = df_stats.iloc[0]
    
    print("\nüéØ M√âTRICAS PRINCIPAIS:")
    print(f"  ‚Ä¢ Total de Empresas Analisadas: {int(s['total_empresas']):,}")
    print(f"  ‚Ä¢ Empresas em SC: {int(s['empresas_sc']):,}")
    print(f"  ‚Ä¢ Empresas do Simples Nacional: {int(s['empresas_sn']):,}")
    print(f"  ‚Ä¢ Total de S√≥cios √önicos: {int(s['total_socios']):,}")
    print(f"  ‚Ä¢ S√≥cios em Grupos Econ√¥micos: {int(s['socios_em_grupos']):,}")
    
    print(f"\nüè¢ GRUPOS ECON√îMICOS:")
    print(f"  ‚Ä¢ Total de Grupos Identificados: {int(s['total_grupos']):,}")
    print(f"  ‚Ä¢ Grupos SC Puro: {int(s['grupos_sc_puro']):,}")
    print(f"  ‚Ä¢ Grupos Irregulares: {int(s['grupos_irregulares']):,}")
    
    print(f"\nüí∞ RECEITA BRUTA ACUMULADA (RBA):")
    print(f"  ‚Ä¢ RBA Total: R$ {s['rba_total']:,.2f}")
    print(f"  ‚Ä¢ RBA M√©dia por Empresa: R$ {s['rba_media']:,.2f}")
    print(f"  ‚Ä¢ RBA M√°xima: R$ {s['rba_maxima']:,.2f}")
    
    # Percentuais
    if s['total_empresas'] > 0:
        perc_sn = (s['empresas_sn'] / s['total_empresas']) * 100
        perc_irregulares = (s['grupos_irregulares'] / s['total_grupos']) * 100 if s['total_grupos'] > 0 else 0
        
        print(f"\n‚ö†Ô∏è  INDICADORES DE RISCO:")
        print(f"  ‚Ä¢ {perc_sn:.1f}% das empresas s√£o optantes do SN")
        print(f"  ‚Ä¢ {perc_irregulares:.1f}% dos grupos est√£o irregulares")
        
else:
    print("‚ö†Ô∏è Nenhum dado encontrado nas estat√≠sticas gerais")

print("\n" + "=" * 80)

In [None]:
# ============================================================================
# DISTRIBUI√á√ÉO DE EMPRESAS POR SITUA√á√ÉO CADASTRAL
# ============================================================================

print("\n" + "=" * 80)
print("üìä DISTRIBUI√á√ÉO POR SITUA√á√ÉO CADASTRAL")
print("=" * 80)

# Criar view
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW vw_situacao_cadastral AS
SELECT 
    situacao_cadastral_desc,
    COUNT(DISTINCT cnpj_raiz) AS qtd_empresas,
    COUNT(DISTINCT CASE WHEN uf = 'SC' THEN cnpj_raiz END) AS qtd_sc
FROM gessimples.feitoza_base_cnpj_completo
GROUP BY situacao_cadastral_desc
ORDER BY qtd_empresas DESC
""")

# Verificar tamanho
total_sit = spark.sql("SELECT COUNT(*) as cnt FROM vw_situacao_cadastral").collect()[0]['cnt']

if total_sit > 0 and total_sit <= 20:
    df_situacao = spark.sql("SELECT * FROM vw_situacao_cadastral").toPandas()
    
    print(f"\nüìã Distribui√ß√£o por Situa√ß√£o Cadastral:\n")
    for idx, row in df_situacao.iterrows():
        perc_sc = (row['qtd_sc'] / row['qtd_empresas']) * 100 if row['qtd_empresas'] > 0 else 0
        print(f"  {row['situacao_cadastral_desc']:15s} ‚Üí {int(row['qtd_empresas']):>7,} empresas "
              f"({int(row['qtd_sc']):>6,} SC - {perc_sc:.1f}%)")
    
    # Gr√°fico de Pizza
    fig_situacao = go.Figure(data=[go.Pie(
        labels=df_situacao['situacao_cadastral_desc'],
        values=df_situacao['qtd_empresas'],
        hole=0.4,
        marker=dict(colors=['#2ca02c', '#ffdd70', '#ff7f0e', '#d62728', '#9467bd']),
        textinfo='label+percent+value',
        texttemplate='<b>%{label}</b><br>%{value:,}<br>%{percent:.1%}'
    )])
    
    fig_situacao.update_layout(
        title='<b>Distribui√ß√£o de Empresas por Situa√ß√£o Cadastral</b>',
        height=500,
        showlegend=True,
        font=dict(size=12)
    )
    
    fig_situacao.show()
    
else:
    print(f"‚ö†Ô∏è Dados de situa√ß√£o cadastral n√£o dispon√≠veis ou muito fragmentados ({total_sit} categorias)")

In [None]:
# ============================================================================
# AN√ÅLISE GEOGR√ÅFICA - DISTRIBUI√á√ÉO POR UF
# ============================================================================

print("\n" + "=" * 80)
print("üó∫Ô∏è  AN√ÅLISE GEOGR√ÅFICA")
print("=" * 80)

# ‚úÖ CORRE√á√ÉO: Usar apenas √∫ltimo per√≠odo dispon√≠vel + aliases expl√≠citos
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW vw_geografica_uf AS
SELECT 
    bc.uf,
    COUNT(DISTINCT bc.cnpj_raiz) AS total_empresas,
    COUNT(DISTINCT CASE WHEN psn.cnpj_raiz IS NOT NULL THEN bc.cnpj_raiz END) AS empresas_sn,
    COUNT(DISTINCT CASE WHEN bc.situacao_cadastral_desc = 'ATIVA' THEN bc.cnpj_raiz END) AS empresas_ativas,
    CAST(COALESCE(AVG(rba_ult.vl_rba_12_meses), 0) AS DOUBLE) AS rba_media,
    CAST(COALESCE(SUM(rba_ult.vl_rba_12_meses), 0) AS DOUBLE) AS rba_total,
    COUNT(DISTINCT rba_ult.cnpj_raiz) AS empresas_com_rba
FROM gessimples.feitoza_base_cnpj_completo bc
LEFT JOIN gessimples.feitoza_base_periodos_sn psn 
    ON bc.cnpj_raiz = psn.cnpj_raiz
LEFT JOIN (
    -- ‚úÖ Subquery: pegar APENAS √∫ltimo per√≠odo de cada empresa
    SELECT 
        cnpj_raiz,
        vl_rba_12_meses,
        periodo_apuracao
    FROM (
        SELECT 
            cnpj_raiz,
            vl_rba_12_meses,
            periodo_apuracao,
            ROW_NUMBER() OVER (PARTITION BY cnpj_raiz ORDER BY periodo_apuracao DESC) AS rn
        FROM gessimples.feitoza_rba_12_meses
        WHERE periodo_apuracao >= 202409  -- √öltimos meses com dados completos
    ) ranked
    WHERE rn = 1
) rba_ult ON bc.cnpj_raiz = rba_ult.cnpj_raiz
WHERE bc.uf IS NOT NULL 
  AND bc.uf != ''
  AND LENGTH(bc.uf) = 2  -- ‚úÖ Apenas siglas v√°lidas (exclui "EX")
GROUP BY bc.uf
ORDER BY total_empresas DESC
""")

total_uf = spark.sql("SELECT COUNT(*) as cnt FROM vw_geografica_uf").collect()[0]['cnt']
print(f"\nüìä Total de UFs encontradas: {total_uf}")

# ‚úÖ CORRE√á√ÉO: Aceitar at√© 30 UFs (mais flex√≠vel)
if total_uf > 0 and total_uf <= 30:
    df_uf = spark.sql("SELECT * FROM vw_geografica_uf").toPandas()
    
    print(f"\nüèõÔ∏è DISTRIBUI√á√ÉO POR ESTADO (Top 15):\n")
    for idx, row in df_uf.head(15).iterrows():
        perc_sn = (row['empresas_sn'] / row['total_empresas']) * 100 if row['total_empresas'] > 0 else 0
        perc_ativas = (row['empresas_ativas'] / row['total_empresas']) * 100 if row['total_empresas'] > 0 else 0
        print(f"  {row['uf']:2s} ‚Üí {int(row['total_empresas']):>7,} empresas | "
              f"SN: {int(row['empresas_sn']):>6,} ({perc_sn:>5.1f}%) | "
              f"Ativas: {perc_ativas:>5.1f}% | "
              f"RBA M√©dia: R$ {row['rba_media']:>12,.2f}")
    
    # ========================================================================
    # GR√ÅFICO 1: Barras Empilhadas - SN vs Outros Regimes
    # ========================================================================
    
    df_uf_top15 = df_uf.head(15).sort_values('total_empresas', ascending=True)
    
    fig_uf = go.Figure()
    
    fig_uf.add_trace(go.Bar(
        y=df_uf_top15['uf'],
        x=df_uf_top15['empresas_sn'],
        name='Simples Nacional',
        orientation='h',
        marker=dict(color='#2ca02c'),
        text=df_uf_top15['empresas_sn'].apply(lambda x: f'{int(x):,}' if x > 0 else ''),
        textposition='inside',
        hovertemplate='<b>SN:</b> %{x:,}<extra></extra>'
    ))
    
    fig_uf.add_trace(go.Bar(
        y=df_uf_top15['uf'],
        x=df_uf_top15['total_empresas'] - df_uf_top15['empresas_sn'],
        name='Outros Regimes',
        orientation='h',
        marker=dict(color='#1f77b4'),
        text=(df_uf_top15['total_empresas'] - df_uf_top15['empresas_sn']).apply(lambda x: f'{int(x):,}' if x > 0 else ''),
        textposition='inside',
        hovertemplate='<b>Outros:</b> %{x:,}<extra></extra>'
    ))
    
    fig_uf.update_layout(
        title='<b>Top 15 Estados - Empresas por Regime Tribut√°rio</b>',
        xaxis_title='Quantidade de Empresas',
        yaxis_title='UF',
        barmode='stack',
        height=600,
        showlegend=True,
        hovermode='y unified'
    )
    
    fig_uf.show()
    
    # ========================================================================
    # GR√ÅFICO 2: Barras - RBA Total por UF
    # ========================================================================
    
    df_uf_rba = df_uf[df_uf['rba_total'] > 0].head(15).sort_values('rba_total', ascending=True)
    
    if len(df_uf_rba) > 0:
        print(f"\nüí∞ TOP 15 ESTADOS POR RBA TOTAL:\n")
        for idx, row in df_uf_rba.sort_values('rba_total', ascending=False).iterrows():
            print(f"  {row['uf']:2s} ‚Üí RBA Total: R$ {row['rba_total']:>15,.2f} | "
                  f"Empresas com RBA: {int(row['empresas_com_rba']):>6,}")
        
        fig_rba_uf = go.Figure(go.Bar(
            y=df_uf_rba['uf'],
            x=df_uf_rba['rba_total'] / 1e9,
            orientation='h',
            marker=dict(
                color=df_uf_rba['rba_media'],
                colorscale='Viridis',
                showscale=True,
                colorbar=dict(title="RBA M√©dia<br>(R$)")
            ),
            text=df_uf_rba['rba_total'].apply(lambda x: f'R$ {x/1e9:.1f}B'),
            textposition='outside',
            hovertemplate='<b>%{y}</b><br>RBA Total: R$ %{x:.2f}B<br>Empresas com RBA: %{customdata:,}<extra></extra>',
            customdata=df_uf_rba['empresas_com_rba']
        ))
        
        fig_rba_uf.update_layout(
            title='<b>RBA Total por Estado (Bilh√µes R$)</b>',
            xaxis_title='RBA Total (Bilh√µes R$)',
            yaxis_title='UF',
            height=600
        )
        
        fig_rba_uf.show()
    
    # ========================================================================
    # GR√ÅFICO 3: Pizza - Empresas Ativas vs Inativas
    # ========================================================================
    
    total_ativas = df_uf['empresas_ativas'].sum()
    total_geral = df_uf['total_empresas'].sum()
    total_inativas = total_geral - total_ativas
    
    fig_situacao = go.Figure(data=[go.Pie(
        labels=['Empresas Ativas', 'Empresas Inativas'],
        values=[total_ativas, total_inativas],
        hole=0.4,
        marker=dict(colors=['#2ca02c', '#d62728']),
        textinfo='label+percent+value',
        texttemplate='<b>%{label}</b><br>%{value:,}<br>%{percent:.1%}'
    )])
    
    fig_situacao.update_layout(
        title='<b>Distribui√ß√£o: Empresas Ativas vs Inativas</b>',
        height=500
    )
    
    fig_situacao.show()
    
    # ========================================================================
    # GR√ÅFICO 4: Mapa de Calor - RBA M√©dia
    # ========================================================================
    
    # Filtrar apenas UFs do Brasil (excluir EX se existir)
    df_uf_mapa = df_uf[df_uf['uf'] != 'EX']
    
    if len(df_uf_mapa) > 0:
        fig_mapa = go.Figure(data=go.Choropleth(
            locations=df_uf_mapa['uf'],
            z=df_uf_mapa['rba_media'],
            locationmode='USA-states',
            colorscale='Viridis',
            text=df_uf_mapa['uf'],
            marker_line_color='white',
            colorbar_title="RBA M√©dia<br>(R$)",
            hovertemplate='<b>%{text}</b><br>RBA M√©dia: R$ %{z:,.2f}<extra></extra>'
        ))
        
        fig_mapa.update_layout(
            title='<b>RBA M√©dia por Estado</b>',
            geo=dict(
                scope='south america',
                showlakes=True,
                lakecolor='rgb(255, 255, 255)'
            ),
            height=600
        )
        
        fig_mapa.show()
    
    # ========================================================================
    # ESTAT√çSTICAS COMPLEMENTARES
    # ========================================================================
    
    print(f"\nüìä RESUMO GERAL:")
    print(f"  ‚Ä¢ Total de Estados: {len(df_uf)}")
    print(f"  ‚Ä¢ Total de Empresas: {int(df_uf['total_empresas'].sum()):,}")
    print(f"  ‚Ä¢ Empresas do SN: {int(df_uf['empresas_sn'].sum()):,} ({(df_uf['empresas_sn'].sum() / df_uf['total_empresas'].sum() * 100):.1f}%)")
    print(f"  ‚Ä¢ Empresas Ativas: {int(total_ativas):,} ({(total_ativas / total_geral * 100):.1f}%)")
    print(f"  ‚Ä¢ RBA Total: R$ {df_uf['rba_total'].sum():,.2f}")
    print(f"  ‚Ä¢ Estado Dominante: {df_uf.iloc[0]['uf']} com {int(df_uf.iloc[0]['total_empresas']):,} empresas")

else:
    print(f"‚ö†Ô∏è Dados geogr√°ficos n√£o dispon√≠veis ({total_uf} UFs encontradas)")
    print(f"üí° Esperado: at√© 30 UFs")

print("\n" + "=" * 80)
print("‚úÖ An√°lise geogr√°fica conclu√≠da!")
print("=" * 80)

In [None]:
# ============================================================================
# AN√ÅLISE DE GRUPOS ECON√îMICOS
# ============================================================================

print("\n" + "=" * 80)
print("üè¢ AN√ÅLISE DE GRUPOS ECON√îMICOS")
print("=" * 80)

# Estat√≠sticas de grupos
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW vw_stats_grupos AS
SELECT 
    tipo_grupo,
    COUNT(DISTINCT num_grupo) AS qtd_grupos,
    COUNT(DISTINCT cnpj_raiz) AS qtd_empresas_total,
    CAST(AVG(qtd_empresas_grupo) AS DOUBLE) AS media_empresas_por_grupo,
    CAST(MAX(qtd_empresas_grupo) AS INT) AS max_empresas_grupo,
    COUNT(DISTINCT cpf_socio) AS qtd_socios_unicos
FROM gessimples.feitoza_grupos_identificados
GROUP BY tipo_grupo
ORDER BY qtd_grupos DESC
""")

total_grupos_stats = spark.sql("SELECT COUNT(*) as cnt FROM vw_stats_grupos").collect()[0]['cnt']

if total_grupos_stats > 0:
    df_grupos_stats = spark.sql("SELECT * FROM vw_stats_grupos").toPandas()
    
    print(f"\nüìä ESTAT√çSTICAS DE GRUPOS ECON√îMICOS:\n")
    for idx, row in df_grupos_stats.iterrows():
        print(f"  {row['tipo_grupo']:20s} ‚Üí "
              f"{int(row['qtd_grupos']):>5,} grupos | "
              f"{int(row['qtd_empresas_total']):>6,} empresas | "
              f"M√©dia: {row['media_empresas_por_grupo']:.1f} empresas/grupo")
    
    # Gr√°fico - Distribui√ß√£o de Grupos
    fig_grupos = make_subplots(
        rows=1, cols=2,
        subplot_titles=('Quantidade de Grupos por Tipo', 'Total de Empresas por Tipo'),
        specs=[[{'type': 'bar'}, {'type': 'bar'}]]
    )
    
    fig_grupos.add_trace(
        go.Bar(
            x=df_grupos_stats['tipo_grupo'],
            y=df_grupos_stats['qtd_grupos'],
            marker=dict(color='#1f77b4'),
            text=df_grupos_stats['qtd_grupos'],
            textposition='outside',
            name='Grupos'
        ),
        row=1, col=1
    )
    
    fig_grupos.add_trace(
        go.Bar(
            x=df_grupos_stats['tipo_grupo'],
            y=df_grupos_stats['qtd_empresas_total'],
            marker=dict(color='#ff7f0e'),
            text=df_grupos_stats['qtd_empresas_total'],
            textposition='outside',
            name='Empresas'
        ),
        row=1, col=2
    )
    
    fig_grupos.update_layout(
        title='<b>Distribui√ß√£o de Grupos Econ√¥micos</b>',
        height=500,
        showlegend=False
    )
    
    fig_grupos.show()

# Top grupos por quantidade de empresas
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW vw_top_grupos AS
SELECT 
    num_grupo,
    cpf_socio,
    tipo_grupo,
    MAX(qtd_empresas_grupo) AS qtd_empresas,
    MAX(qtd_empresas_sc) AS qtd_sc,
    COUNT(DISTINCT cnpj_raiz) AS empresas_listadas
FROM gessimples.feitoza_grupos_identificados
GROUP BY num_grupo, cpf_socio, tipo_grupo
ORDER BY qtd_empresas DESC
LIMIT 20
""")

total_top_grupos = spark.sql("SELECT COUNT(*) as cnt FROM vw_top_grupos").collect()[0]['cnt']

if total_top_grupos > 0:
    df_top_grupos = spark.sql("SELECT * FROM vw_top_grupos").toPandas()
    
    print(f"\nüîù TOP 20 MAIORES GRUPOS ECON√îMICOS:\n")
    for idx, row in df_top_grupos.iterrows():
        cpf_mask = row['cpf_socio'][:3] + '.***.***-' + row['cpf_socio'][-2:] if len(str(row['cpf_socio'])) == 11 else '***'
        print(f"{idx+1:2d}. Grupo {int(row['num_grupo']):>5} | CPF: {cpf_mask} | "
              f"{int(row['qtd_empresas']):>3} empresas | Tipo: {row['tipo_grupo']}")
    
    # Gr√°fico
    fig_top_grupos = go.Figure(go.Bar(
        y=df_top_grupos['num_grupo'].astype(str),
        x=df_top_grupos['qtd_empresas'],
        orientation='h',
        marker=dict(
            color=df_top_grupos['qtd_empresas'],
            colorscale='Viridis',
            showscale=True,
            colorbar=dict(title="Qtd<br>Empresas")
        ),
        text=df_top_grupos['qtd_empresas'],
        textposition='outside',
        hovertemplate='<b>Grupo %{y}</b><br>Empresas: %{x}<br>Tipo: %{customdata}<extra></extra>',
        customdata=df_top_grupos['tipo_grupo']
    ))
    
    fig_top_grupos.update_layout(
        title='<b>Top 20 Maiores Grupos Econ√¥micos</b>',
        xaxis_title='Quantidade de Empresas',
        yaxis_title='N√∫mero do Grupo',
        height=700
    )
    
    fig_top_grupos.show()

In [None]:
# ============================================================================
# AN√ÅLISE DE RECEITA BRUTA ACUMULADA (RBA)
# ============================================================================

print("\n" + "=" * 80)
print("üí∞ AN√ÅLISE DE RECEITA BRUTA ACUMULADA (RBA)")
print("=" * 80)

# Estat√≠sticas RBA por status
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW vw_rba_stats AS
SELECT 
    status_rba,
    COUNT(DISTINCT cnpj_raiz) AS qtd_empresas,
    CAST(COALESCE(SUM(vl_rba_12_meses), 0) AS DOUBLE) AS rba_total,
    CAST(COALESCE(AVG(vl_rba_12_meses), 0) AS DOUBLE) AS rba_media,
    CAST(COALESCE(MIN(vl_rba_12_meses), 0) AS DOUBLE) AS rba_minima,
    CAST(COALESCE(MAX(vl_rba_12_meses), 0) AS DOUBLE) AS rba_maxima
FROM gessimples.feitoza_rba_12_meses
WHERE vl_rba_12_meses > 0
GROUP BY status_rba
ORDER BY 
    CASE status_rba
        WHEN 'COMPLETO' THEN 1
        WHEN 'PARCIAL' THEN 2
        ELSE 3
    END
""")

total_rba_stats = spark.sql("SELECT COUNT(*) as cnt FROM vw_rba_stats").collect()[0]['cnt']

if total_rba_stats > 0:
    df_rba_stats = spark.sql("SELECT * FROM vw_rba_stats").toPandas()
    
    print(f"\nüìä ESTAT√çSTICAS DE RBA POR STATUS:\n")
    for idx, row in df_rba_stats.iterrows():
        print(f"  {row['status_rba']:15s} ‚Üí {int(row['qtd_empresas']):>7,} empresas | "
              f"Total: R$ {row['rba_total']:>15,.2f} | "
              f"M√©dia: R$ {row['rba_media']:>12,.2f}")
    
    # Gr√°fico - Distribui√ß√£o RBA
    fig_rba_dist = make_subplots(
        rows=1, cols=2,
        subplot_titles=('Quantidade de Empresas', 'Volume Total de RBA'),
        specs=[[{'type': 'bar'}, {'type': 'bar'}]]
    )
    
    colors = ['#2ca02c', '#ffdd70', '#ff7f0e']
    
    fig_rba_dist.add_trace(
        go.Bar(
            x=df_rba_stats['status_rba'],
            y=df_rba_stats['qtd_empresas'],
            marker=dict(color=colors[:len(df_rba_stats)]),
            text=df_rba_stats['qtd_empresas'].apply(lambda x: f'{int(x):,}'),
            textposition='outside'
        ),
        row=1, col=1
    )
    
    fig_rba_dist.add_trace(
        go.Bar(
            x=df_rba_stats['status_rba'],
            y=df_rba_stats['rba_total'] / 1e9,
            marker=dict(color=colors[:len(df_rba_stats)]),
            text=df_rba_stats['rba_total'].apply(lambda x: f'R$ {x/1e9:.1f}B'),
            textposition='outside'
        ),
        row=1, col=2
    )
    
    fig_rba_dist.update_yaxes(title_text="Empresas", row=1, col=1)
    fig_rba_dist.update_yaxes(title_text="RBA (Bilh√µes R$)", row=1, col=2)
    
    fig_rba_dist.update_layout(
        title='<b>Distribui√ß√£o de RBA por Status de Completude</b>',
        height=500,
        showlegend=False
    )
    
    fig_rba_dist.show()

# Distribui√ß√£o de RBA (Histograma)
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW vw_rba_distribuicao AS
SELECT 
    CASE 
        WHEN vl_rba_12_meses < 1000000 THEN '< R$ 1 MI'
        WHEN vl_rba_12_meses < 2000000 THEN 'R$ 1-2 MI'
        WHEN vl_rba_12_meses < 3600000 THEN 'R$ 2-3,6 MI'
        WHEN vl_rba_12_meses < 4800000 THEN 'R$ 3,6-4,8 MI'
        ELSE '> R$ 4,8 MI'
    END AS faixa_rba,
    COUNT(DISTINCT cnpj_raiz) AS qtd_empresas,
    CAST(COALESCE(SUM(vl_rba_12_meses), 0) AS DOUBLE) AS rba_total
FROM gessimples.feitoza_rba_12_meses
WHERE status_rba = 'COMPLETO'
GROUP BY 
    CASE 
        WHEN vl_rba_12_meses < 1000000 THEN '< R$ 1 MI'
        WHEN vl_rba_12_meses < 2000000 THEN 'R$ 1-2 MI'
        WHEN vl_rba_12_meses < 3600000 THEN 'R$ 2-3,6 MI'
        WHEN vl_rba_12_meses < 4800000 THEN 'R$ 3,6-4,8 MI'
        ELSE '> R$ 4,8 MI'
    END
ORDER BY 
    CASE 
        WHEN faixa_rba = '< R$ 1 MI' THEN 1
        WHEN faixa_rba = 'R$ 1-2 MI' THEN 2
        WHEN faixa_rba = 'R$ 2-3,6 MI' THEN 3
        WHEN faixa_rba = 'R$ 3,6-4,8 MI' THEN 4
        ELSE 5
    END
""")

total_dist = spark.sql("SELECT COUNT(*) as cnt FROM vw_rba_distribuicao").collect()[0]['cnt']

if total_dist > 0:
    df_rba_dist = spark.sql("SELECT * FROM vw_rba_distribuicao").toPandas()
    
    print(f"\nüìä DISTRIBUI√á√ÉO POR FAIXA DE RBA:\n")
    for idx, row in df_rba_dist.iterrows():
        perc = (row['qtd_empresas'] / df_rba_dist['qtd_empresas'].sum()) * 100
        print(f"  {row['faixa_rba']:15s} ‚Üí {int(row['qtd_empresas']):>7,} empresas ({perc:>5.1f}%)")
    
    # Gr√°fico
    fig_faixas = go.Figure(data=[
        go.Bar(
            x=df_rba_dist['faixa_rba'],
            y=df_rba_dist['qtd_empresas'],
            marker=dict(
                color=df_rba_dist['qtd_empresas'],
                colorscale='Blues',
                showscale=True,
                colorbar=dict(title="Empresas")
            ),
            text=df_rba_dist['qtd_empresas'].apply(lambda x: f'{int(x):,}'),
            textposition='outside'
        )
    ])
    
    fig_faixas.update_layout(
        title='<b>Distribui√ß√£o de Empresas por Faixa de RBA (Dados Completos)</b>',
        xaxis_title='Faixa de RBA',
        yaxis_title='Quantidade de Empresas',
        height=500
    )
    
    fig_faixas.show()

In [None]:
# ============================================================================
# AN√ÅLISE DE FATO GERADOR - ULTRAPASSAGEM DO LIMITE
# ============================================================================

print("\n" + "=" * 80)
print("‚ö†Ô∏è  AN√ÅLISE DE FATO GERADOR - IRREGULARIDADES")
print("=" * 80)

# Estat√≠sticas de fato gerador
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW vw_fato_gerador_stats AS
SELECT 
    classificacao_risco_fiscal,
    gravidade_ultrapassagem,
    COUNT(DISTINCT num_grupo) AS qtd_grupos,
    COUNT(*) AS qtd_periodos_irregulares,
    CAST(COALESCE(SUM(vl_excedente), 0) AS DOUBLE) AS excedente_total,
    CAST(COALESCE(AVG(vl_excedente), 0) AS DOUBLE) AS excedente_medio,
    CAST(COALESCE(AVG(percentual_excesso), 0) AS DOUBLE) AS perc_excesso_medio
FROM gessimples.feitoza_fato_gerador
WHERE flag_ultrapassou_limite = 1
GROUP BY classificacao_risco_fiscal, gravidade_ultrapassagem
ORDER BY 
    CASE classificacao_risco_fiscal
        WHEN 'RISCO_CRITICO' THEN 1
        WHEN 'RISCO_ALTO' THEN 2
        WHEN 'RISCO_MEDIO' THEN 3
        ELSE 4
    END,
    CASE gravidade_ultrapassagem
        WHEN 'GRAVISSIMO' THEN 1
        WHEN 'GRAVE' THEN 2
        WHEN 'MODERADO' THEN 3
        ELSE 4
    END
""")

total_fg_stats = spark.sql("SELECT COUNT(*) as cnt FROM vw_fato_gerador_stats").collect()[0]['cnt']

if total_fg_stats > 0 and total_fg_stats <= 50:
    df_fg_stats = spark.sql("SELECT * FROM vw_fato_gerador_stats").toPandas()
    
    print(f"\nüìä IRREGULARIDADES POR RISCO E GRAVIDADE:\n")
    for idx, row in df_fg_stats.iterrows():
        print(f"  {row['classificacao_risco_fiscal']:15s} | {row['gravidade_ultrapassagem']:12s} ‚Üí "
              f"{int(row['qtd_grupos']):>5,} grupos | "
              f"{int(row['qtd_periodos_irregulares']):>6,} per√≠odos | "
              f"Excedente: R$ {row['excedente_total']:>15,.2f}")
    
    # Heatmap - Risco x Gravidade
    pivot_fg = df_fg_stats.pivot_table(
        index='gravidade_ultrapassagem',
        columns='classificacao_risco_fiscal',
        values='qtd_grupos',
        fill_value=0
    )
    
    fig_heatmap_fg = go.Figure(data=go.Heatmap(
        z=pivot_fg.values,
        x=pivot_fg.columns,
        y=pivot_fg.index,
        colorscale='Reds',
        text=pivot_fg.values,
        texttemplate='%{text}',
        textfont={"size": 14},
        colorbar=dict(title="Qtd<br>Grupos")
    ))
    
    fig_heatmap_fg.update_layout(
        title='<b>Matriz de Risco: Grupos Irregulares por Classifica√ß√£o e Gravidade</b>',
        xaxis_title='Classifica√ß√£o de Risco',
        yaxis_title='Gravidade da Ultrapassagem',
        height=500
    )
    
    fig_heatmap_fg.show()

# Resumo dos grupos irregulares
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW vw_resumo_irregulares AS
SELECT 
    classificacao_risco_maximo,
    COUNT(DISTINCT num_grupo) AS qtd_grupos,
    CAST(COALESCE(SUM(vl_excedente_maximo), 0) AS DOUBLE) AS excedente_total,
    CAST(COALESCE(AVG(vl_rba_maxima_grupo), 0) AS DOUBLE) AS rba_media,
    CAST(COALESCE(AVG(qtd_total_periodos_irregulares), 0) AS DOUBLE) AS media_periodos_irreg,
    SUM(flag_irregular_atualmente) AS grupos_irregulares_atualmente
FROM gessimples.feitoza_resumo_grupos_irregulares
GROUP BY classificacao_risco_maximo
ORDER BY 
    CASE classificacao_risco_maximo
        WHEN 'RISCO_CRITICO' THEN 1
        WHEN 'RISCO_ALTO' THEN 2
        WHEN 'RISCO_MEDIO' THEN 3
        ELSE 4
    END
""")

total_resumo = spark.sql("SELECT COUNT(*) as cnt FROM vw_resumo_irregulares").collect()[0]['cnt']

if total_resumo > 0:
    df_resumo_irreg = spark.sql("SELECT * FROM vw_resumo_irregulares").toPandas()
    
    print(f"\n‚ö†Ô∏è  RESUMO DOS GRUPOS IRREGULARES:\n")
    for idx, row in df_resumo_irreg.iterrows():
        print(f"  {row['classificacao_risco_maximo']:15s} ‚Üí "
              f"{int(row['qtd_grupos']):>5,} grupos | "
              f"Excedente: R$ {row['excedente_total']:>15,.2f} | "
              f"Irregulares agora: {int(row['grupos_irregulares_atualmente']):>4}")
    
    # Gr√°ficos
    fig_resumo = make_subplots(
        rows=1, cols=2,
        subplot_titles=('Grupos por Risco', 'Excedente Total'),
        specs=[[{'type': 'bar'}, {'type': 'bar'}]]
    )
    
    colors_risk = ['#8b0000', '#d62728', '#ff7f0e', '#ffdd70']
    
    fig_resumo.add_trace(
        go.Bar(
            x=df_resumo_irreg['classificacao_risco_maximo'],
            y=df_resumo_irreg['qtd_grupos'],
            marker=dict(color=colors_risk[:len(df_resumo_irreg)]),
            text=df_resumo_irreg['qtd_grupos'],
            textposition='outside'
        ),
        row=1, col=1
    )
    
    fig_resumo.add_trace(
        go.Bar(
            x=df_resumo_irreg['classificacao_risco_maximo'],
            y=df_resumo_irreg['excedente_total'] / 1e6,
            marker=dict(color=colors_risk[:len(df_resumo_irreg)]),
            text=df_resumo_irreg['excedente_total'].apply(lambda x: f'R$ {x/1e6:.1f}M'),
            textposition='outside'
        ),
        row=1, col=2
    )
    
    fig_resumo.update_yaxes(title_text="Quantidade de Grupos", row=1, col=1)
    fig_resumo.update_yaxes(title_text="Excedente (Milh√µes R$)", row=1, col=2)
    
    fig_resumo.update_layout(
        title='<b>Resumo de Grupos Irregulares por Risco</b>',
        height=500,
        showlegend=False
    )
    
    fig_resumo.show()

In [None]:
# ============================================================================
# TOP GRUPOS PARA A√á√ÉO FISCAL
# ============================================================================

print("\n" + "=" * 80)
print("üéØ TOP GRUPOS PRIORIT√ÅRIOS PARA FISCALIZA√á√ÉO")
print("=" * 80)

# Top 50 grupos para a√ß√£o fiscal
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW vw_top_fiscalizacao AS
SELECT 
    num_grupo,
    cpf_socio,
    qtd_empresas_grupo,
    lista_cnpj_raiz,
    periodo_primeira_ultrapassagem,
    periodo_ultima_ultrapassagem,
    qtd_total_periodos_irregulares,
    CAST(COALESCE(vl_excedente_maximo, 0) AS DOUBLE) AS vl_excedente_maximo,
    CAST(COALESCE(percentual_excesso_maximo, 0) AS DOUBLE) AS percentual_excesso_maximo,
    classificacao_risco_maximo,
    gravidade_maxima,
    flag_irregular_atualmente,
    CAST(COALESCE(score_priorizacao_fiscal, 0) AS DOUBLE) AS score_priorizacao,
    recomendacao_acao
FROM gessimples.feitoza_lista_acao_fiscal
ORDER BY score_priorizacao_fiscal DESC
LIMIT 50
""")

total_top_fisc = spark.sql("SELECT COUNT(*) as cnt FROM vw_top_fiscalizacao").collect()[0]['cnt']
print(f"\nüìä Total de grupos priorit√°rios: {total_top_fisc}")

if total_top_fisc > 0:
    df_top_fisc = spark.sql("SELECT * FROM vw_top_fiscalizacao").toPandas()
    
    print(f"\nüîù TOP 20 GRUPOS PARA FISCALIZA√á√ÉO IMEDIATA:\n")
    for idx, row in df_top_fisc.head(20).iterrows():
        cpf_mask = row['cpf_socio'][:3] + '.***.***-' + row['cpf_socio'][-2:] if len(str(row['cpf_socio'])) == 11 else '***'
        
        print(f"{idx+1:2d}. Grupo {int(row['num_grupo']):>5} | CPF: {cpf_mask}")
        print(f"    Score: {row['score_priorizacao']:.1f} | Risco: {row['classificacao_risco_maximo']} | "
              f"Gravidade: {row['gravidade_maxima']}")
        print(f"    Empresas: {int(row['qtd_empresas_grupo'])} | "
              f"Per√≠odos irregulares: {int(row['qtd_total_periodos_irregulares'])}")
        print(f"    Excedente: R$ {row['vl_excedente_maximo']:,.2f} ({row['percentual_excesso_maximo']:.1f}%)")
        print(f"    A√ß√£o: {row['recomendacao_acao']}")
        print()
    
    # Gr√°fico - Score de Prioriza√ß√£o
    df_top30 = df_top_fisc.head(30).sort_values('score_priorizacao', ascending=True)
    
    colors_score = ['#8b0000' if x >= 80 else '#d62728' if x >= 60 else '#ff7f0e' 
                    for x in df_top30['score_priorizacao']]
    
    fig_score = go.Figure(go.Bar(
        y=df_top30['num_grupo'].astype(str),
        x=df_top30['score_priorizacao'],
        orientation='h',
        marker=dict(color=colors_score),
        text=df_top30['score_priorizacao'].apply(lambda x: f'{x:.1f}'),
        textposition='outside',
        hovertemplate='<b>Grupo %{y}</b><br>Score: %{x:.2f}<br>Empresas: %{customdata[0]}<br>Excedente: R$ %{customdata[1]:,.2f}<extra></extra>',
        customdata=df_top30[['qtd_empresas_grupo', 'vl_excedente_maximo']].values
    ))
    
    fig_score.update_layout(
        title='<b>Score de Prioriza√ß√£o Fiscal - Top 30 Grupos</b>',
        xaxis_title='Score de Prioriza√ß√£o (0-100)',
        yaxis_title='N√∫mero do Grupo',
        height=900
    )
    
    fig_score.show()
    
    # Distribui√ß√£o por Recomenda√ß√£o de A√ß√£o
    dist_acao = df_top_fisc['recomendacao_acao'].value_counts()
    
    fig_acao = go.Figure(data=[go.Pie(
        labels=dist_acao.index,
        values=dist_acao.values,
        hole=0.4,
        marker=dict(colors=['#8b0000', '#d62728', '#ff7f0e', '#ffdd70']),
        textinfo='label+percent+value',
        texttemplate='<b>%{label}</b><br>%{value} grupos<br>%{percent:.1%}'
    )])
    
    fig_acao.update_layout(
        title='<b>Distribui√ß√£o por Recomenda√ß√£o de A√ß√£o Fiscal</b>',
        height=500
    )
    
    fig_acao.show()

In [None]:
# ============================================================================
# PREPARA√á√ÉO DE DADOS PARA MACHINE LEARNING
# ============================================================================

print("\n" + "=" * 80)
print("ü§ñ PREPARA√á√ÉO DE FEATURES PARA MACHINE LEARNING")
print("=" * 80)

# Criar dataset consolidado com features
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW vw_ml_dataset AS
SELECT 
    bc.cnpj_raiz,
    bc.uf,
    bc.situacao_cadastral_desc,
    bc.porte_empresa,
    
    -- Features de RBA (√∫ltimo per√≠odo)
    CAST(COALESCE(rba_ult.vl_rba_12_meses, 0) AS DOUBLE) AS feat_rba_12m,
    CAST(COALESCE(rba_ult.vl_receita_bruta_mensal, 0) AS DOUBLE) AS feat_receita_mensal,
    CAST(COALESCE(rba_ult.vl_icms_pago, 0) AS DOUBLE) AS feat_icms_pago,
    CAST(COALESCE(rba_ult.aliquota_efetiva_12m_perc, 0) AS DOUBLE) AS feat_aliquota_efetiva,
    CAST(COALESCE(rba_ult.qtd_meses_com_movimento, 0) AS DOUBLE) AS feat_meses_movimento,
    CAST(COALESCE(rba_ult.taxa_atividade, 0) AS DOUBLE) AS feat_taxa_atividade,
    
    -- Features de grupo
    CAST(COALESCE(gi.qtd_empresas_grupo, 1) AS DOUBLE) AS feat_qtd_empresas_grupo,
    CASE WHEN gi.tipo_grupo = 'GRUPO_SC_PURO' THEN 1 ELSE 0 END AS feat_grupo_sc_puro,
    CASE WHEN gi.flag_socio_ou_titular = 1 THEN 1 ELSE 0 END AS feat_socio_titular,
    CASE WHEN gi.flag_socio_responsavel = 1 THEN 1 ELSE 0 END AS feat_socio_responsavel,
    
    -- Features de irregularidade (TARGET)
    CASE WHEN rgi.num_grupo IS NOT NULL THEN 1 ELSE 0 END AS target_irregular,
    CASE 
        WHEN rgi.classificacao_risco_maximo = 'RISCO_CRITICO' THEN 3
        WHEN rgi.classificacao_risco_maximo = 'RISCO_ALTO' THEN 2
        WHEN rgi.classificacao_risco_maximo = 'RISCO_MEDIO' THEN 1
        ELSE 0
    END AS target_nivel_risco,
    
    CAST(COALESCE(rgi.vl_excedente_maximo, 0) AS DOUBLE) AS feat_excedente_maximo,
    CAST(COALESCE(rgi.qtd_total_periodos_irregulares, 0) AS DOUBLE) AS feat_periodos_irregulares,
    CAST(COALESCE(rgi.percentual_excesso_maximo, 0) AS DOUBLE) AS feat_perc_excesso,
    
    -- Flags
    CASE WHEN bc.situacao_cadastral_desc = 'ATIVA' THEN 1 ELSE 0 END AS flag_ativa,
    CASE WHEN psn.cnpj_raiz IS NOT NULL THEN 1 ELSE 0 END AS flag_simples_nacional,
    CASE WHEN rba_ult.cnpj_raiz IS NOT NULL THEN 1 ELSE 0 END AS flag_tem_rba

FROM gessimples.feitoza_base_cnpj_completo bc

LEFT JOIN gessimples.feitoza_base_periodos_sn psn 
    ON bc.cnpj_raiz = psn.cnpj_raiz

LEFT JOIN (
    SELECT 
        cnpj_raiz,
        vl_rba_12_meses,
        vl_receita_bruta_mensal,
        vl_icms_pago,
        aliquota_efetiva_12m_perc,
        qtd_meses_com_movimento,
        taxa_atividade
    FROM (
        SELECT 
            *,
            ROW_NUMBER() OVER (PARTITION BY cnpj_raiz ORDER BY periodo_apuracao DESC) AS rn
        FROM gessimples.feitoza_rba_12_meses
        WHERE periodo_apuracao >= 202409
          AND status_rba IN ('COMPLETO', 'PARCIAL')
    ) ranked
    WHERE rn = 1
) rba_ult ON bc.cnpj_raiz = rba_ult.cnpj_raiz

LEFT JOIN gessimples.feitoza_grupos_identificados gi 
    ON bc.cnpj_raiz = gi.cnpj_raiz

LEFT JOIN gessimples.feitoza_resumo_grupos_irregulares rgi 
    ON gi.num_grupo = rgi.num_grupo

WHERE bc.uf = 'SC'  -- Focar apenas em SC
  AND bc.situacao_cadastral_desc = 'ATIVA'  -- Apenas empresas ativas
  AND rba_ult.vl_rba_12_meses > 0  -- Com receita
""")

# Verificar tamanho
total_ml = spark.sql("SELECT COUNT(*) as cnt FROM vw_ml_dataset").collect()[0]['cnt']
print(f"\nüìä Total de registros para ML: {total_ml:,}")

if total_ml > 0:
    # Verificar se √© muito grande
    if total_ml > 200000:
        print(f"‚ö†Ô∏è  Dataset muito grande ({total_ml:,} registros)")
        print(f"üîÑ Limitando para 200.000 registros aleat√≥rios...")
        
        # Sample aleat√≥rio
        df_ml_spark = (spark.sql("SELECT * FROM vw_ml_dataset")
                       .sample(fraction=200000/total_ml, seed=42))
    else:
        df_ml_spark = spark.sql("SELECT * FROM vw_ml_dataset")
    
    # Cachear
    df_ml_spark.cache()
    
    # Converter para Pandas
    print(f"üîÑ Convertendo para Pandas...")
    df_ml = df_ml_spark.toPandas()
    
    print(f"‚úÖ Dataset carregado: {len(df_ml):,} registros √ó {len(df_ml.columns)} colunas")
    
    # An√°lise da distribui√ß√£o do target
    print(f"\nüéØ DISTRIBUI√á√ÉO DO TARGET:")
    dist_target = df_ml['target_irregular'].value_counts()
    total_records = len(df_ml)
    print(f"  ‚Ä¢ N√£o Irregulares (0): {dist_target.get(0, 0):,} ({dist_target.get(0, 0)/total_records*100:.1f}%)")
    print(f"  ‚Ä¢ Irregulares (1): {dist_target.get(1, 0):,} ({dist_target.get(1, 0)/total_records*100:.1f}%)")
    
    # Estat√≠sticas descritivas
    print(f"\nüìä ESTAT√çSTICAS DAS FEATURES PRINCIPAIS:\n")
    features_principais = ['feat_rba_12m', 'feat_receita_mensal', 'feat_qtd_empresas_grupo', 
                          'feat_excedente_maximo', 'feat_periodos_irregulares']
    
    stats_df = df_ml[features_principais].describe()
    print(stats_df.to_string())
    
    # Verificar valores faltantes
    missing_summary = df_ml.isnull().sum()
    if missing_summary.sum() > 0:
        print(f"\n‚ö†Ô∏è  VALORES FALTANTES ENCONTRADOS:")
        for col, missing_count in missing_summary[missing_summary > 0].items():
            print(f"   ‚Ä¢ {col}: {missing_count:,} ({missing_count/len(df_ml)*100:.2f}%)")
        
        # Preencher NaN com 0
        df_ml = df_ml.fillna(0)
        print(f"\n‚úÖ Valores faltantes preenchidos com 0")
    else:
        print(f"\n‚úÖ Nenhum valor faltante encontrado!")
    
    # Gr√°fico - Distribui√ß√£o do Target
    fig_target = go.Figure(data=[go.Pie(
        labels=['Regular', 'Irregular'],
        values=[dist_target.get(0, 0), dist_target.get(1, 0)],
        hole=0.4,
        marker=dict(colors=['#2ca02c', '#d62728']),
        textinfo='label+percent+value',
        texttemplate='<b>%{label}</b><br>%{value:,}<br>%{percent:.1%}'
    )])
    
    fig_target.update_layout(
        title='<b>Distribui√ß√£o do Target - Grupos Irregulares</b>',
        height=400
    )
    
    fig_target.show()
    
    print(f"\nüíæ DATASET PREPARADO:")
    print(f"   ‚Ä¢ Vari√°vel: df_ml")
    print(f"   ‚Ä¢ Shape: {df_ml.shape}")
    print(f"   ‚Ä¢ Mem√≥ria: {df_ml.memory_usage(deep=True).sum() / 1024**2:.1f} MB")
    
else:
    print("‚ùå Nenhum dado dispon√≠vel para ML")
    df_ml = None

print("\n" + "=" * 80)
print("‚úÖ PREPARA√á√ÉO PARA ML CONCLU√çDA!")
print("=" * 80)

In [None]:
# ============================================================================
# AN√ÅLISE EXPLORAT√ìRIA DAS FEATURES
# ============================================================================

print("\n" + "=" * 80)
print("üìä AN√ÅLISE EXPLORAT√ìRIA DE FEATURES")
print("=" * 80)

if df_ml is not None and len(df_ml) > 0:
    # Features num√©ricas
    features_numericas = [col for col in df_ml.columns if col.startswith('feat_')]
    
    print(f"\nüìã Total de features num√©ricas: {len(features_numericas)}")
    
    # ========================================================================
    # MATRIZ DE CORRELA√á√ÉO
    # ========================================================================
    
    print(f"\nüîó Calculando matriz de correla√ß√£o...")
    corr_matrix = df_ml[features_numericas + ['target_irregular']].corr()
    
    # Heatmap de correla√ß√£o
    fig_corr = go.Figure(data=go.Heatmap(
        z=corr_matrix.values,
        x=corr_matrix.columns,
        y=corr_matrix.columns,
        colorscale='RdBu',
        zmid=0,
        text=np.round(corr_matrix.values, 2),
        texttemplate='%{text}',
        textfont={"size": 7},
        hovertemplate='%{y} vs %{x}<br>Correla√ß√£o: %{z:.3f}<extra></extra>'
    ))
    
    fig_corr.update_layout(
        title='<b>Matriz de Correla√ß√£o entre Features</b>',
        height=800,
        width=900,
        xaxis=dict(tickangle=-45, tickfont=dict(size=8)),
        yaxis=dict(tickfont=dict(size=8))
    )
    
    fig_corr.show()
    
    # ========================================================================
    # TOP CORRELA√á√ïES COM TARGET
    # ========================================================================
    
    print(f"\nüéØ TOP 10 FEATURES MAIS CORRELACIONADAS COM TARGET:\n")
    target_corr = corr_matrix['target_irregular'].drop('target_irregular').abs().sort_values(ascending=False)
    
    for idx, (feat, corr_val) in enumerate(target_corr.head(10).items(), 1):
        corr_original = corr_matrix['target_irregular'][feat]
        print(f"  {idx:2d}. {feat:35s} ‚Üí {corr_original:+.4f}")
    
    # ‚úÖ CORRE√á√ÉO: Usar list comprehension ao inv√©s de .apply()
    top_corr = target_corr.head(10)
    
    fig_top_corr = go.Figure(go.Bar(
        y=top_corr.index,
        x=top_corr.values,
        orientation='h',
        marker=dict(
            color=top_corr.values,
            colorscale='Viridis',
            showscale=True,
            colorbar=dict(title="Correla√ß√£o<br>Absoluta")
        ),
        text=[f'{x:.3f}' for x in top_corr.values],  # ‚úÖ List comprehension
        textposition='outside'
    ))
    
    fig_top_corr.update_layout(
        title='<b>Top 10 Features com Maior Correla√ß√£o com Target (Absoluta)</b>',
        xaxis_title='Correla√ß√£o Absoluta',
        yaxis_title='Feature',
        height=500
    )
    
    fig_top_corr.show()
    
    # ========================================================================
    # DISTRIBUI√á√ÉO DAS FEATURES POR CLASSE
    # ========================================================================
    
    print(f"\nüìä Gerando distribui√ß√µes das features por classe...")
    
    features_plot = ['feat_rba_12m', 'feat_receita_mensal', 'feat_qtd_empresas_grupo',
                     'feat_excedente_maximo', 'feat_periodos_irregulares', 'feat_taxa_atividade']
    
    fig_dist = make_subplots(
        rows=3, cols=2,
        subplot_titles=[f.replace('feat_', '').replace('_', ' ').title() for f in features_plot],
        vertical_spacing=0.12,
        horizontal_spacing=0.15
    )
    
    positions = [(1,1), (1,2), (2,1), (2,2), (3,1), (3,2)]
    colors_dist = ['#2ca02c', '#d62728']
    
    for idx, feat in enumerate(features_plot):
        row, col = positions[idx]
        
        # Histograma para cada classe
        for class_val, color in zip([0, 1], colors_dist):
            data_class = df_ml[df_ml['target_irregular'] == class_val][feat]
            
            fig_dist.add_trace(
                go.Histogram(
                    x=data_class,
                    name=f"Classe {class_val}",
                    opacity=0.6,
                    marker=dict(color=color),
                    showlegend=(idx == 0),
                    nbinsx=30
                ),
                row=row, col=col
            )
    
    fig_dist.update_layout(
        title='<b>Distribui√ß√£o das Features por Classe (0=Regular, 1=Irregular)</b>',
        height=900,
        barmode='overlay',
        showlegend=True
    )
    
    fig_dist.show()
    
    # ========================================================================
    # BOXPLOTS - Features vs Target
    # ========================================================================
    
    print(f"\nüì¶ Gerando boxplots...")
    
    fig_box = make_subplots(
        rows=2, cols=3,
        subplot_titles=[f.replace('feat_', '').replace('_', ' ').title() for f in features_plot],
        vertical_spacing=0.15,
        horizontal_spacing=0.1
    )
    
    positions_box = [(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)]
    
    for idx, feat in enumerate(features_plot):
        row, col = positions_box[idx]
        
        for class_val, color in zip([0, 1], colors_dist):
            data_class = df_ml[df_ml['target_irregular'] == class_val][feat]
            
            fig_box.add_trace(
                go.Box(
                    y=data_class,
                    name=f"Classe {class_val}",
                    marker=dict(color=color),
                    showlegend=(idx == 0)
                ),
                row=row, col=col
            )
    
    fig_box.update_layout(
        title='<b>Boxplots: Features por Classe</b>',
        height=700,
        showlegend=True
    )
    
    fig_box.show()
    
    # ========================================================================
    # SCATTER PLOT - RBA vs Excedente
    # ========================================================================
    
    if df_ml['feat_excedente_maximo'].sum() > 0:  # Se houver excedente
        print(f"\nüìà Gerando scatter plot: RBA vs Excedente...")
        
        # Filtrar apenas quem tem excedente
        df_scatter = df_ml[df_ml['feat_excedente_maximo'] > 0].copy()
        
        if len(df_scatter) > 0:
            fig_scatter = go.Figure()
            
            for class_val, color, name in zip([0, 1], ['#2ca02c', '#d62728'], ['Regular', 'Irregular']):
                data_class = df_scatter[df_scatter['target_irregular'] == class_val]
                
                fig_scatter.add_trace(go.Scatter(
                    x=data_class['feat_rba_12m'] / 1e6,
                    y=data_class['feat_excedente_maximo'] / 1e6,
                    mode='markers',
                    name=name,
                    marker=dict(
                        size=8,
                        color=color,
                        opacity=0.6
                    ),
                    hovertemplate='<b>%{fullData.name}</b><br>RBA: R$ %{x:.2f}M<br>Excedente: R$ %{y:.2f}M<extra></extra>'
                ))
            
            fig_scatter.update_layout(
                title='<b>Rela√ß√£o: RBA 12 Meses vs Excedente M√°ximo</b>',
                xaxis_title='RBA 12 Meses (Milh√µes R$)',
                yaxis_title='Excedente M√°ximo (Milh√µes R$)',
                height=500,
                showlegend=True
            )
            
            fig_scatter.show()
    
    print(f"\n‚úÖ An√°lise explorat√≥ria conclu√≠da!")

else:
    print("‚ùå Dataset n√£o dispon√≠vel")

print("\n" + "=" * 80)

In [None]:
# ============================================================================
# MACHINE LEARNING: RANDOM FOREST CLASSIFIER
# ============================================================================

print("\n" + "=" * 80)
print("üå≤ RANDOM FOREST - CLASSIFICA√á√ÉO DE GRUPOS IRREGULARES")
print("=" * 80)

if df_ml is not None and len(df_ml) > 0:
    from sklearn.model_selection import train_test_split, cross_val_score
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import (classification_report, confusion_matrix, 
                                  roc_auc_score, roc_curve, accuracy_score, 
                                  precision_score, recall_score, f1_score)
    from sklearn.preprocessing import StandardScaler
    
    # Selecionar features independentes (sem leakage)
    features_rf = [
        'feat_rba_12m',
        'feat_receita_mensal',
        'feat_icms_pago',
        'feat_aliquota_efetiva',
        'feat_meses_movimento',
        'feat_taxa_atividade',
        'feat_qtd_empresas_grupo',
        'feat_grupo_sc_puro',
        'feat_socio_titular',
        'feat_socio_responsavel',
        'flag_simples_nacional'
    ]
    
    X = df_ml[features_rf].fillna(0)
    y = df_ml['target_irregular']
    
    print(f"\nüìä Dataset para treinamento:")
    print(f"  ‚Ä¢ Features: {len(features_rf)}")
    print(f"  ‚Ä¢ Amostras: {len(X):,}")
    print(f"  ‚Ä¢ Distribui√ß√£o: Regular={y.value_counts().get(0, 0):,}, Irregular={y.value_counts().get(1, 0):,}")
    
    # Split treino/teste
    print(f"\n‚úÇÔ∏è  Dividindo em treino (70%) e teste (30%)...")
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=42, stratify=y
    )
    
    print(f"  ‚Ä¢ Treino: {len(X_train):,} amostras")
    print(f"  ‚Ä¢ Teste: {len(X_test):,} amostras")
    
    # Normaliza√ß√£o
    print(f"\n‚öôÔ∏è  Normalizando features...")
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    
    # Treinar modelo
    print(f"\nüå≤ Treinando Random Forest...")
    
    # Calcular peso de classes
    class_weights = len(y_train) / (2 * np.bincount(y_train))
    weight_dict = {0: class_weights[0], 1: class_weights[1]}
    
    rf_model = RandomForestClassifier(
        n_estimators=100,
        max_depth=15,
        min_samples_split=20,
        min_samples_leaf=10,
        class_weight=weight_dict,
        random_state=42,
        n_jobs=-1
    )
    
    rf_model.fit(X_train_scaled, y_train)
    print(f"‚úÖ Modelo treinado!")
    
    # Predi√ß√µes
    y_pred = rf_model.predict(X_test_scaled)
    y_pred_proba = rf_model.predict_proba(X_test_scaled)[:, 1]
    
    # M√©tricas
    print(f"\nüìä M√âTRICAS DE DESEMPENHO:\n")
    print(classification_report(y_test, y_pred, target_names=['Regular', 'Irregular']))
    
    auc_score = roc_auc_score(y_test, y_pred_proba)
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    
    print(f"\nüéØ RESUMO DAS M√âTRICAS:")
    print(f"  ‚Ä¢ Acur√°cia: {accuracy:.4f}")
    print(f"  ‚Ä¢ Precis√£o: {precision:.4f}")
    print(f"  ‚Ä¢ Recall: {recall:.4f}")
    print(f"  ‚Ä¢ F1-Score: {f1:.4f}")
    print(f"  ‚Ä¢ AUC-ROC: {auc_score:.4f}")
    
    # Valida√ß√£o cruzada
    print(f"\nüîÑ Valida√ß√£o cruzada (5-fold)...")
    cv_scores = cross_val_score(rf_model, X_train_scaled, y_train, cv=5, scoring='roc_auc', n_jobs=-1)
    print(f"   AUC m√©dio CV: {cv_scores.mean():.4f} (¬±{cv_scores.std():.4f})")
    
    # ========================================================================
    # GR√ÅFICOS
    # ========================================================================
    
    # Matriz de confus√£o
    cm = confusion_matrix(y_test, y_pred)
    
    fig_cm = go.Figure(data=go.Heatmap(
        z=cm,
        x=['Regular', 'Irregular'],
        y=['Regular', 'Irregular'],
        text=cm,
        texttemplate='%{text}',
        textfont={"size": 20},
        colorscale='Blues'
    ))
    
    fig_cm.update_layout(
        title='<b>Matriz de Confus√£o - Random Forest</b>',
        xaxis_title='Predi√ß√£o',
        yaxis_title='Real',
        height=400
    )
    
    fig_cm.show()
    
    # Curva ROC
    fpr, tpr, thresholds = roc_curve(y_test, y_pred_proba)
    
    fig_roc = go.Figure()
    
    fig_roc.add_trace(go.Scatter(
        x=fpr, y=tpr,
        mode='lines',
        name=f'Random Forest (AUC = {auc_score:.3f})',
        line=dict(color='#1f77b4', width=3)
    ))
    
    fig_roc.add_trace(go.Scatter(
        x=[0, 1], y=[0, 1],
        mode='lines',
        name='Baseline (Random)',
        line=dict(color='red', width=2, dash='dash')
    ))
    
    fig_roc.update_layout(
        title='<b>Curva ROC - Random Forest</b>',
        xaxis_title='Taxa de Falsos Positivos (FPR)',
        yaxis_title='Taxa de Verdadeiros Positivos (TPR)',
        height=500,
        showlegend=True
    )
    
    fig_roc.show()
    
    # Import√¢ncia das features
    feature_importance = pd.DataFrame({
        'feature': features_rf,
        'importance': rf_model.feature_importances_
    }).sort_values('importance', ascending=False)
    
    print(f"\nüìä IMPORT√ÇNCIA DAS FEATURES:\n")
    for idx, row in feature_importance.iterrows():
        print(f"  {row['feature']:35s} ‚Üí {row['importance']:.6f}")
    
    fig_importance = go.Figure(go.Bar(
        y=feature_importance['feature'],
        x=feature_importance['importance'],
        orientation='h',
        marker=dict(color=feature_importance['importance'], colorscale='Viridis'),
        text=feature_importance['importance'].apply(lambda x: f'{x:.4f}'),
        textposition='outside'
    ))
    
    fig_importance.update_layout(
        title='<b>Import√¢ncia das Features - Random Forest</b>',
        xaxis_title='Import√¢ncia',
        yaxis_title='Feature',
        height=500
    )
    
    fig_importance.show()
    
    # Salvar modelo e probabilidades
    df_ml['rf_probability'] = np.nan
    df_ml.loc[X_test.index, 'rf_probability'] = y_pred_proba
    
    print(f"\nüíæ Probabilidades salvas em df_ml['rf_probability']")
    
else:
    print("‚ùå Dataset n√£o dispon√≠vel")

print("\n" + "=" * 80)
print("‚úÖ RANDOM FOREST CONCLU√çDO!")
print("=" * 80)

In [None]:
# ============================================================================
# MACHINE LEARNING: XGBOOST CLASSIFIER
# ============================================================================

print("\n" + "=" * 80)
print("‚ö° XGBOOST - CLASSIFICA√á√ÉO DE GRUPOS IRREGULARES")
print("=" * 80)

if df_ml is not None and len(df_ml) > 0:
    import xgboost as xgb
    from sklearn.metrics import classification_report, roc_auc_score, roc_curve
    
    # Usar as mesmas features do Random Forest
    features_xgb = [
        'feat_rba_12m',
        'feat_receita_mensal',
        'feat_icms_pago',
        'feat_aliquota_efetiva',
        'feat_meses_movimento',
        'feat_taxa_atividade',
        'feat_qtd_empresas_grupo',
        'feat_grupo_sc_puro',
        'feat_socio_titular',
        'feat_socio_responsavel',
        'flag_simples_nacional'
    ]
    
    X = df_ml[features_xgb].fillna(0)
    y = df_ml['target_irregular']
    
    # Usar o mesmo split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=42, stratify=y
    )
    
    # Normaliza√ß√£o
    scaler_xgb = StandardScaler()
    X_train_scaled = scaler_xgb.fit_transform(X_train)
    X_test_scaled = scaler_xgb.transform(X_test)
    
    print(f"\n‚öôÔ∏è  Configurando XGBoost...")
    
    # Calcular scale_pos_weight para desbalanceamento
    scale_weight = (y_train == 0).sum() / (y_train == 1).sum()
    print(f"  ‚Ä¢ Scale Pos Weight: {scale_weight:.2f}")
    
    # Treinar modelo
    print(f"\n‚ö° Treinando XGBoost...")
    xgb_model = xgb.XGBClassifier(
        n_estimators=100,
        max_depth=6,
        learning_rate=0.1,
        subsample=0.8,
        colsample_bytree=0.8,
        scale_pos_weight=scale_weight,
        random_state=42,
        n_jobs=-1,
        eval_metric='logloss'
    )
    
    xgb_model.fit(X_train_scaled, y_train)
    print(f"‚úÖ Modelo treinado!")
    
    # Predi√ß√µes
    y_pred_xgb = xgb_model.predict(X_test_scaled)
    y_pred_proba_xgb = xgb_model.predict_proba(X_test_scaled)[:, 1]
    
    # M√©tricas
    print(f"\nüìä M√âTRICAS DE DESEMPENHO - XGBOOST:\n")
    print(classification_report(y_test, y_pred_xgb, target_names=['Regular', 'Irregular']))
    
    auc_xgb = roc_auc_score(y_test, y_pred_proba_xgb)
    accuracy_xgb = accuracy_score(y_test, y_pred_xgb)
    precision_xgb = precision_score(y_test, y_pred_xgb)
    recall_xgb = recall_score(y_test, y_pred_xgb)
    f1_xgb = f1_score(y_test, y_pred_xgb)
    
    print(f"\nüéØ RESUMO DAS M√âTRICAS:")
    print(f"  ‚Ä¢ Acur√°cia: {accuracy_xgb:.4f}")
    print(f"  ‚Ä¢ Precis√£o: {precision_xgb:.4f}")
    print(f"  ‚Ä¢ Recall: {recall_xgb:.4f}")
    print(f"  ‚Ä¢ F1-Score: {f1_xgb:.4f}")
    print(f"  ‚Ä¢ AUC-ROC: {auc_xgb:.4f}")
    
    # Valida√ß√£o cruzada
    print(f"\nüîÑ Valida√ß√£o cruzada (5-fold)...")
    cv_scores_xgb = cross_val_score(xgb_model, X_train_scaled, y_train, cv=5, scoring='roc_auc', n_jobs=-1)
    print(f"   AUC m√©dio CV: {cv_scores_xgb.mean():.4f} (¬±{cv_scores_xgb.std():.4f})")
    
    # ========================================================================
    # COMPARA√á√ÉO: RF vs XGBoost
    # ========================================================================
    
    print(f"\nüìä COMPARA√á√ÉO DE MODELOS:")
    print(f"  ‚Ä¢ Random Forest AUC: {auc_score:.4f}")
    print(f"  ‚Ä¢ XGBoost AUC: {auc_xgb:.4f}")
    print(f"  ‚Ä¢ Diferen√ßa: {abs(auc_xgb - auc_score):.4f}")
    
    if auc_xgb > auc_score:
        print(f"  ‚úÖ XGBoost √© {((auc_xgb/auc_score - 1)*100):.2f}% melhor")
    else:
        print(f"  ‚úÖ Random Forest √© {((auc_score/auc_xgb - 1)*100):.2f}% melhor")
    
    # ========================================================================
    # GR√ÅFICOS
    # ========================================================================
    
    # Curvas ROC comparadas
    fpr_xgb, tpr_xgb, _ = roc_curve(y_test, y_pred_proba_xgb)
    
    fig_compare = go.Figure()
    
    fig_compare.add_trace(go.Scatter(
        x=fpr, y=tpr,
        mode='lines',
        name=f'Random Forest (AUC = {auc_score:.3f})',
        line=dict(color='#1f77b4', width=3)
    ))
    
    fig_compare.add_trace(go.Scatter(
        x=fpr_xgb, y=tpr_xgb,
        mode='lines',
        name=f'XGBoost (AUC = {auc_xgb:.3f})',
        line=dict(color='#ff7f0e', width=3)
    ))
    
    fig_compare.add_trace(go.Scatter(
        x=[0, 1], y=[0, 1],
        mode='lines',
        name='Baseline (Random)',
        line=dict(color='red', width=2, dash='dash')
    ))
    
    fig_compare.update_layout(
        title='<b>Compara√ß√£o: Random Forest vs XGBoost - Curvas ROC</b>',
        xaxis_title='Taxa de Falsos Positivos (FPR)',
        yaxis_title='Taxa de Verdadeiros Positivos (TPR)',
        height=500,
        showlegend=True
    )
    
    fig_compare.show()
    
    # Matriz de confus√£o XGBoost
    cm_xgb = confusion_matrix(y_test, y_pred_xgb)
    
    fig_cm_xgb = go.Figure(data=go.Heatmap(
        z=cm_xgb,
        x=['Regular', 'Irregular'],
        y=['Regular', 'Irregular'],
        text=cm_xgb,
        texttemplate='%{text}',
        textfont={"size": 20},
        colorscale='Oranges'
    ))
    
    fig_cm_xgb.update_layout(
        title='<b>Matriz de Confus√£o - XGBoost</b>',
        xaxis_title='Predi√ß√£o',
        yaxis_title='Real',
        height=400
    )
    
    fig_cm_xgb.show()
    
    # Import√¢ncia das features - XGBoost
    feature_importance_xgb = pd.DataFrame({
        'feature': features_xgb,
        'importance': xgb_model.feature_importances_
    }).sort_values('importance', ascending=False)
    
    print(f"\nüìä IMPORT√ÇNCIA DAS FEATURES - XGBOOST:\n")
    for idx, row in feature_importance_xgb.iterrows():
        print(f"  {row['feature']:35s} ‚Üí {row['importance']:.6f}")
    
    fig_importance_xgb = go.Figure(go.Bar(
        y=feature_importance_xgb['feature'],
        x=feature_importance_xgb['importance'],
        orientation='h',
        marker=dict(color=feature_importance_xgb['importance'], colorscale='Plasma'),
        text=[f'{x:.4f}' for x in feature_importance_xgb['importance'].values],
        textposition='outside'
    ))
    
    fig_importance_xgb.update_layout(
        title='<b>Import√¢ncia das Features - XGBoost</b>',
        xaxis_title='Import√¢ncia',
        yaxis_title='Feature',
        height=500
    )
    
    fig_importance_xgb.show()
    
    # Compara√ß√£o de import√¢ncia
    fig_comp_importance = go.Figure()
    
    fig_comp_importance.add_trace(go.Bar(
        name='Random Forest',
        y=feature_importance['feature'],
        x=feature_importance['importance'],
        orientation='h',
        marker=dict(color='#1f77b4')
    ))
    
    fig_comp_importance.add_trace(go.Bar(
        name='XGBoost',
        y=feature_importance_xgb['feature'],
        x=feature_importance_xgb['importance'],
        orientation='h',
        marker=dict(color='#ff7f0e')
    ))
    
    fig_comp_importance.update_layout(
        title='<b>Compara√ß√£o: Import√¢ncia das Features (RF vs XGBoost)</b>',
        xaxis_title='Import√¢ncia',
        yaxis_title='Feature',
        height=600,
        barmode='group'
    )
    
    fig_comp_importance.show()
    
    # Salvar probabilidades
    df_ml['xgb_probability'] = np.nan
    df_ml.loc[X_test.index, 'xgb_probability'] = y_pred_proba_xgb
    
    # Ensemble (m√©dia das probabilidades)
    df_ml['ensemble_probability'] = (df_ml['rf_probability'] + df_ml['xgb_probability']) / 2
    
    print(f"\nüíæ Probabilidades XGBoost e Ensemble salvas em df_ml")
    
    # An√°lise do Ensemble
    mask_ensemble = df_ml['ensemble_probability'].notna()
    y_pred_ensemble = (df_ml.loc[mask_ensemble, 'ensemble_probability'] >= 0.5).astype(int)
    y_test_ensemble = df_ml.loc[mask_ensemble, 'target_irregular']
    
    if len(y_test_ensemble) > 0:
        auc_ensemble = roc_auc_score(y_test_ensemble, df_ml.loc[mask_ensemble, 'ensemble_probability'])
        
        print(f"\nüéØ COMPARA√á√ÉO FINAL DOS MODELOS:")
        print(f"  ‚Ä¢ Random Forest AUC: {auc_score:.4f}")
        print(f"  ‚Ä¢ XGBoost AUC: {auc_xgb:.4f}")
        print(f"  ‚Ä¢ Ensemble (M√©dia) AUC: {auc_ensemble:.4f}")
        
        # Melhor modelo
        import builtins
        modelos_comparacao = [
            ('Random Forest', auc_score), 
            ('XGBoost', auc_xgb), 
            ('Ensemble', auc_ensemble)
        ]
        best_model = builtins.max(modelos_comparacao, key=lambda x: x[1])
        print(f"\nüèÜ MELHOR MODELO: {best_model[0]} (AUC = {best_model[1]:.4f})")

else:
    print("‚ùå Dataset n√£o dispon√≠vel para XGBoost")

print("\n" + "=" * 80)
print("‚úÖ XGBOOST CONCLU√çDO!")
print("=" * 80)

In [None]:
# ============================================================================
# APRENDIZADO N√ÉO SUPERVISIONADO: K-MEANS CLUSTERING
# ============================================================================

print("\n" + "=" * 80)
print("üé≤ K-MEANS CLUSTERING - SEGMENTA√á√ÉO DE EMPRESAS")
print("=" * 80)

if df_ml is not None and len(df_ml) > 0:
    from sklearn.cluster import KMeans
    from sklearn.decomposition import PCA
    from sklearn.preprocessing import StandardScaler
    
    # Selecionar features para clustering
    features_cluster = [
        'feat_rba_12m',
        'feat_receita_mensal',
        'feat_icms_pago',
        'feat_aliquota_efetiva',
        'feat_meses_movimento',
        'feat_taxa_atividade',
        'feat_qtd_empresas_grupo'
    ]
    
    df_cluster = df_ml[features_cluster].copy()
    df_cluster = df_cluster.fillna(0)
    
    print(f"\nüìä Features selecionadas para clustering:")
    for feat in features_cluster:
        print(f"  ‚Ä¢ {feat}")
    
    # Normaliza√ß√£o
    print(f"\n‚öôÔ∏è  Normalizando features...")
    scaler_cluster = StandardScaler()
    X_scaled = scaler_cluster.fit_transform(df_cluster)
    
    # ========================================================================
    # M√âTODO DO COTOVELO
    # ========================================================================
    
    print(f"\nüìà Calculando m√©todo do cotovelo...")
    inertias = []
    silhouette_scores = []
    K_range = range(2, 11)
    
    from sklearn.metrics import silhouette_score
    
    for k in K_range:
        kmeans_temp = KMeans(n_clusters=k, random_state=42, n_init=10)
        kmeans_temp.fit(X_scaled)
        inertias.append(kmeans_temp.inertia_)
        silhouette_scores.append(silhouette_score(X_scaled, kmeans_temp.labels_))
        print(f"  K={k}: Inertia={kmeans_temp.inertia_:.0f}, Silhouette={silhouette_scores[-1]:.3f}")
    
    # Gr√°fico do cotovelo
    fig_elbow = make_subplots(
        rows=1, cols=2,
        subplot_titles=('M√©todo do Cotovelo', 'Silhouette Score'),
        specs=[[{'type': 'scatter'}, {'type': 'scatter'}]]
    )
    
    fig_elbow.add_trace(
        go.Scatter(
            x=list(K_range),
            y=inertias,
            mode='lines+markers',
            marker=dict(size=10, color='#1f77b4'),
            line=dict(width=2),
            name='In√©rcia'
        ),
        row=1, col=1
    )
    
    fig_elbow.add_trace(
        go.Scatter(
            x=list(K_range),
            y=silhouette_scores,
            mode='lines+markers',
            marker=dict(size=10, color='#ff7f0e'),
            line=dict(width=2),
            name='Silhouette'
        ),
        row=1, col=2
    )
    
    fig_elbow.update_xaxes(title_text="N√∫mero de Clusters (K)", row=1, col=1)
    fig_elbow.update_xaxes(title_text="N√∫mero de Clusters (K)", row=1, col=2)
    fig_elbow.update_yaxes(title_text="In√©rcia", row=1, col=1)
    fig_elbow.update_yaxes(title_text="Silhouette Score", row=1, col=2)
    
    fig_elbow.update_layout(
        title='<b>Determina√ß√£o do K Ideal</b>',
        height=400,
        showlegend=False
    )
    
    fig_elbow.show()
    
    # ========================================================================
    # APLICAR K-MEANS
    # ========================================================================
    
    k_optimal = 4
    print(f"\nüéØ Aplicando K-Means com K={k_optimal}...")
    
    kmeans = KMeans(n_clusters=k_optimal, random_state=42, n_init=10)
    df_ml['cluster'] = kmeans.fit_predict(X_scaled)
    
    # ========================================================================
    # AN√ÅLISE DOS CLUSTERS
    # ========================================================================
    
    print(f"\nüìä AN√ÅLISE DOS CLUSTERS:\n")
    for cluster_id in range(k_optimal):
        cluster_data = df_ml[df_ml['cluster'] == cluster_id]
        print(f"Cluster {cluster_id}:")
        print(f"  ‚Ä¢ Tamanho: {len(cluster_data):,} empresas ({len(cluster_data)/len(df_ml)*100:.1f}%)")
        print(f"  ‚Ä¢ RBA M√©dia: R$ {cluster_data['feat_rba_12m'].mean():,.2f}")
        print(f"  ‚Ä¢ Receita Mensal M√©dia: R$ {cluster_data['feat_receita_mensal'].mean():,.2f}")
        print(f"  ‚Ä¢ Empresas/Grupo M√©dio: {cluster_data['feat_qtd_empresas_grupo'].mean():.1f}")
        print(f"  ‚Ä¢ % Irregulares: {(cluster_data['target_irregular'].sum()/len(cluster_data)*100):.1f}%")
        print()
    
    # ========================================================================
    # PCA PARA VISUALIZA√á√ÉO 2D
    # ========================================================================
    
    print(f"üîÑ Reduzindo dimensionalidade com PCA...")
    pca = PCA(n_components=2)
    X_pca = pca.fit_transform(X_scaled)
    
    df_ml['pca1'] = X_pca[:, 0]
    df_ml['pca2'] = X_pca[:, 1]
    
    print(f"‚úÖ Vari√¢ncia explicada: {pca.explained_variance_ratio_.sum()*100:.1f}%")
    
    # Visualiza√ß√£o dos clusters
    fig_clusters = go.Figure()
    
    colors_cluster = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728']
    
    for cluster_id in range(k_optimal):
        cluster_data = df_ml[df_ml['cluster'] == cluster_id]
        
        fig_clusters.add_trace(go.Scatter(
            x=cluster_data['pca1'],
            y=cluster_data['pca2'],
            mode='markers',
            name=f'Cluster {cluster_id}',
            marker=dict(
                size=6,
                color=colors_cluster[cluster_id],
                opacity=0.6
            ),
            text=cluster_data['cnpj_raiz'],
            hovertemplate='<b>Cluster %{fullData.name}</b><br>CNPJ: %{text}<br>PCA1: %{x:.2f}<br>PCA2: %{y:.2f}<extra></extra>'
        ))
    
    # Centroides
    centroids_pca = pca.transform(kmeans.cluster_centers_)
    fig_clusters.add_trace(go.Scatter(
        x=centroids_pca[:, 0],
        y=centroids_pca[:, 1],
        mode='markers',
        name='Centroides',
        marker=dict(
            size=20,
            color='black',
            symbol='x',
            line=dict(width=3, color='white')
        )
    ))
    
    fig_clusters.update_layout(
        title='<b>Visualiza√ß√£o dos Clusters (PCA 2D)</b>',
        xaxis_title=f'Componente Principal 1 ({pca.explained_variance_ratio_[0]*100:.1f}%)',
        yaxis_title=f'Componente Principal 2 ({pca.explained_variance_ratio_[1]*100:.1f}%)',
        height=600,
        showlegend=True
    )
    
    fig_clusters.show()
    
    # ========================================================================
    # PERFIL DOS CLUSTERS (Radar Chart)
    # ========================================================================
    
    print(f"\nüìä Gerando perfil dos clusters...")
    
    fig_perfil = go.Figure()
    
    features_perfil = ['feat_rba_12m', 'feat_receita_mensal', 'feat_qtd_empresas_grupo', 
                       'feat_taxa_atividade']
    df_perfil = df_ml.groupby('cluster')[features_perfil].mean()
    
    # Normalizar para visualiza√ß√£o (0-1)
    df_perfil_norm = (df_perfil - df_perfil.min()) / (df_perfil.max() - df_perfil.min())
    
    for cluster_id in range(k_optimal):
        fig_perfil.add_trace(go.Scatterpolar(
            r=df_perfil_norm.loc[cluster_id].values,
            theta=[f.replace('feat_', '').replace('_', ' ').title() for f in features_perfil],
            fill='toself',
            name=f'Cluster {cluster_id}',
            line=dict(color=colors_cluster[cluster_id])
        ))
    
    fig_perfil.update_layout(
        polar=dict(radialaxis=dict(visible=True, range=[0, 1])),
        title='<b>Perfil dos Clusters (Normalizado)</b>',
        height=500,
        showlegend=True
    )
    
    fig_perfil.show()
    
    # ========================================================================
    # DISTRIBUI√á√ÉO DE IRREGULARES POR CLUSTER
    # ========================================================================
    
    cluster_irreg = df_ml.groupby('cluster')['target_irregular'].agg(['sum', 'count'])
    cluster_irreg['perc'] = (cluster_irreg['sum'] / cluster_irreg['count']) * 100
    
    fig_cluster_irreg = go.Figure(data=[
        go.Bar(
            x=[f'Cluster {i}' for i in range(k_optimal)],
            y=cluster_irreg['perc'].values,
            marker=dict(
                color=cluster_irreg['perc'].values,
                colorscale='Reds',
                showscale=True,
                colorbar=dict(title="% Irreg.")
            ),
            text=[f'{x:.1f}%' for x in cluster_irreg['perc'].values],
            textposition='outside'
        )
    ])
    
    fig_cluster_irreg.update_layout(
        title='<b>Percentual de Empresas Irregulares por Cluster</b>',
        xaxis_title='Cluster',
        yaxis_title='% de Empresas Irregulares',
        height=400
    )
    
    fig_cluster_irreg.show()
    
    print(f"\nüíæ Clusters salvos em df_ml['cluster']")

else:
    print("‚ùå Dataset n√£o dispon√≠vel para clustering")

print("\n" + "=" * 80)
print("‚úÖ CLUSTERING K-MEANS CONCLU√çDO!")
print("=" * 80)

In [None]:
# ============================================================================
# DASHBOARD FINAL - RESUMO EXECUTIVO
# ============================================================================

print("\n" + "=" * 80)
print("üìä DASHBOARD EXECUTIVO - RESUMO GERAL")
print("=" * 80)

if df_ml is not None and len(df_ml) > 0:
    
    # ========================================================================
    # M√âTRICAS CONSOLIDADAS
    # ========================================================================
    
    print(f"\nüéØ M√âTRICAS CONSOLIDADAS DO MODELO:\n")
    
    metrics_summary = pd.DataFrame({
        'Modelo': ['Random Forest', 'XGBoost', 'Ensemble'],
        'AUC-ROC': [auc_score, auc_xgb, auc_ensemble if 'auc_ensemble' in locals() else 0],
        'Acur√°cia': [accuracy, accuracy_xgb, 0],
        'Precis√£o': [precision, precision_xgb, 0],
        'Recall': [recall, recall_xgb, 0],
        'F1-Score': [f1, f1_xgb, 0]
    })
    
    print(metrics_summary.to_string(index=False))
    
    # Gr√°fico de barras comparativo
    fig_metrics = go.Figure()
    
    metrics_plot = ['AUC-ROC', 'Acur√°cia', 'Precis√£o', 'Recall', 'F1-Score']
    
    for modelo in ['Random Forest', 'XGBoost']:
        values = metrics_summary[metrics_summary['Modelo'] == modelo][metrics_plot].values[0]
        
        fig_metrics.add_trace(go.Bar(
            name=modelo,
            x=metrics_plot,
            y=values,
            text=[f'{v:.3f}' for v in values],
            textposition='outside'
        ))
    
    fig_metrics.update_layout(
        title='<b>Compara√ß√£o de M√©tricas: Random Forest vs XGBoost</b>',
        xaxis_title='M√©trica',
        yaxis_title='Valor',
        barmode='group',
        height=500
    )
    
    fig_metrics.show()
    
    # ========================================================================
    # RESUMO DOS CLUSTERS
    # ========================================================================
    
    print(f"\nüé≤ RESUMO DOS CLUSTERS:\n")
    
    cluster_summary = df_ml.groupby('cluster').agg({
        'cnpj_raiz': 'count',
        'feat_rba_12m': 'mean',
        'feat_qtd_empresas_grupo': 'mean',
        'target_irregular': lambda x: (x.sum() / len(x)) * 100
    }).round(2)
    
    cluster_summary.columns = ['Qtd Empresas', 'RBA M√©dia', 'Emp/Grupo M√©dio', '% Irregular']
    print(cluster_summary.to_string())
    
    # ========================================================================
    # TOP 20 EMPRESAS MAIS SUSPEITAS
    # ========================================================================
    
    print(f"\nüö® TOP 20 EMPRESAS MAIS SUSPEITAS (por Ensemble):\n")
    
    df_suspeitas = df_ml[df_ml['ensemble_probability'].notna()].copy()
    df_suspeitas_top = df_suspeitas.nlargest(20, 'ensemble_probability')
    
    for idx, row in df_suspeitas_top.iterrows():
        print(f"{list(df_suspeitas_top.index).index(idx)+1:2d}. CNPJ: {row['cnpj_raiz']} | "
              f"Prob: {row['ensemble_probability']:.3f} | "
              f"Cluster: {int(row['cluster'])} | "
              f"RBA: R$ {row['feat_rba_12m']:,.2f}")
    
    print(f"\n‚úÖ An√°lise completa conclu√≠da!")
    print(f"üíæ Resultados dispon√≠veis em: df_ml")

else:
    print("‚ùå Dataset n√£o dispon√≠vel")

print("\n" + "=" * 80)
print("üéâ AN√ÅLISE COMPLETA FINALIZADA!")
print("=" * 80)