In [None]:
import sys
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/poc")
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/plugins")
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/dags")

#Import libs python
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import date

#Import libs internas
from utils import spark_utils_session as utils

from hooks.hdfs.hdfs_helper import HdfsHelper
from jobs.job_base_config import BaseETLJobClass

import poc_helper
poc_helper.load_env("PROD")

In [None]:
def get_session(profile: str, dynamic_allocation_enabled: bool = True) -> utils.DBASparkAppSession:
    """Generates DBASparkAppSession."""
    
    app_name = "tsevero_ecd_new"
    
    spark_builder = (utils.DBASparkAppSession
                     .builder
                     .setAppName(app_name)
                     .usingProcessProfile(profile)
                    )
    
    if dynamic_allocation_enabled:
        spark_builder.autoResourceManagement()

    return spark_builder.build()

session = get_session(profile='efd_t2')

In [None]:
session.sparkSession.sql("SHOW DATABASES").show(truncate=False)

In [None]:
# ============================================================================
# CONFIGURA√á√ÉO INICIAL - 
# ============================================================================

import sys
import warnings
from datetime import datetime, date
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# PySpark imports com aliases para evitar conflitos
from pyspark.sql.functions import (
    col as spark_col, 
    sum as spark_sum, 
    avg as spark_avg,
    count as spark_count,
    when as spark_when,
    desc as spark_desc,
    asc as spark_asc,
    round as spark_round,
    concat as spark_concat,
    lit as spark_lit,
    max as spark_max,
    min as spark_min,
    stddev as spark_stddev,
    countDistinct as spark_countDistinct
)
from pyspark.sql.types import DoubleType, IntegerType

# Configura√ß√µes de visualiza√ß√£o
warnings.filterwarnings('ignore')
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = (16, 8)
plt.rcParams['font.size'] = 11

# ‚úÖ CORRE√á√ÉO: N√£o usar abs() que conflita com PySpark
# pd.set_option('display.float_format', lambda x: f'{x:,.2f}' if abs(x) > 0.01 else f'{x:.6f}')
pd.set_option('display.max_columns', None)
pd.set_option('display.precision', 2)

# Acesso ao Spark
spark = session.sparkSession

print("=" * 80)
print("üîç SISTEMA")
print("=" * 80)
print(f"Sess√£o Spark: {spark.sparkContext.appName}")
print(f"Vers√£o Spark: {spark.version}")
print(f"Iniciado em: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)

In [None]:
# Adicione estas linhas no in√≠cio da C√âLULA 1 (depois dos imports)
import logging

# Reduzir logs do Spark
logging.getLogger("org.apache.spark").setLevel(logging.ERROR)
logging.getLogger("org.spark_project").setLevel(logging.ERROR)

# Configura√ß√£o do Spark para lidar melhor com c√≥digo complexo
spark.conf.set("spark.sql.codegen.wholeStage", "false")  # Desabilita codegen complexo
spark.conf.set("spark.sql.codegen.maxFields", "500")     # Aumenta limite de campos

# ================================================================================
# CLASSIFICA√á√ÉO DE CONTAS CONT√ÅBEIS - SCRIPT COMPLETO
# C√âLULA 1: PAR√ÇMETROS E PREPARA√á√ÉO
# ================================================================================

print("üîß CONFIGURANDO PAR√ÇMETROS...")
print("=" * 80)

# ================================================================================
# PAR√ÇMETROS CONFIGUR√ÅVEIS
# ================================================================================

# Ano de refer√™ncia
ANO_REFERENCIA = 2024
print(f"üìÖ Ano de refer√™ncia: {ANO_REFERENCIA}")

# UF para filtro
UF_FILTRO = 'SC'
print(f"üìç UF filtrada: {UF_FILTRO}")

# Toler√¢ncia para equa√ß√£o cont√°bil (%)
TOLERANCIA_EQUACAO = 10.0  # 10%
print(f"‚öñÔ∏è  Toler√¢ncia equa√ß√£o cont√°bil: {TOLERANCIA_EQUACAO}%")

# Database de destino
DATABASE_DESTINO = 'neac'
print(f"üíæ Database destino: {DATABASE_DESTINO}")

# Tabelas de refer√™ncia
TABELA_REF_BP = f'{DATABASE_DESTINO}.ecd_pc_bp'
TABELA_REF_DRE = f'{DATABASE_DESTINO}.ecd_pc_dre'
print(f"üìö Refer√™ncia BP: {TABELA_REF_BP}")
print(f"üìö Refer√™ncia DRE: {TABELA_REF_DRE}")

# Tabela de sa√≠da
TABELA_SAIDA = f'{DATABASE_DESTINO}.ecd_contas_classificadas'
print(f"üìä Tabela sa√≠da: {TABELA_SAIDA}")

print("\n‚úÖ Par√¢metros configurados!")
print("=" * 80)

In [None]:
# ================================================================================
# C√âLULA 2: CRIAR TABELA UNIFICADA DE REFER√äNCIA (BP + DRE)
# ================================================================================

print("\nüìö CRIANDO TABELA UNIFICADA DE REFER√äNCIA...")
print("=" * 80)

# ================================================================================
# DROPAR TABELA ANTIGA SE EXISTIR
# ================================================================================

try:
    spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.pc_referencia_completa PURGE")
    print("‚úÖ Tabela antiga removida")
except:
    print("‚ö†Ô∏è  Tabela n√£o existia (OK)")

# ================================================================================
# CRIAR TABELA UNIFICADA: BP + DRE COM HIERARQUIA
# ================================================================================

sql_criar_referencia = f"""
CREATE TABLE {DATABASE_DESTINO}.pc_referencia_completa AS
WITH ref_bp AS (
    SELECT 
        codigo AS cd_conta_ref,
        descricao AS descr_conta_ref,
        tipo AS tp_conta,  -- 'A' ou 'S'
        conta_superior AS cd_conta_sup_ref,
        nivel AS nivel_ref,
        natureza AS cd_natureza_ref,  -- '1', '2', '3'
        'BP' AS origem_ref,
        
        -- Classifica√ß√£o baseada no c√≥digo
        CASE 
            WHEN codigo LIKE '1.01%' THEN 'ATIVO_CIRCULANTE'
            WHEN codigo LIKE '1.02%' THEN 'ATIVO_NAO_CIRCULANTE'
            WHEN codigo LIKE '1%' THEN 'ATIVO'
            WHEN codigo LIKE '2.01%' THEN 'PASSIVO_CIRCULANTE'
            WHEN codigo LIKE '2.02%' THEN 'PASSIVO_NAO_CIRCULANTE'
            WHEN codigo LIKE '2.03%' THEN 'PATRIMONIO_LIQUIDO'
            WHEN codigo LIKE '2%' THEN 'PASSIVO'
            ELSE 'NAO_CLASSIFICADO'
        END AS classificacao_nivel2,
        
        CASE 
            WHEN codigo LIKE '1.01.01%' THEN 'DISPONIBILIDADES'
            WHEN codigo LIKE '1.01.02%' THEN 'CREDITOS'
            WHEN codigo LIKE '1.01.03%' THEN 'ESTOQUES'
            WHEN codigo LIKE '1.02.01%' THEN 'ATIVO_REALIZAVEL_LP'
            WHEN codigo LIKE '1.02.02%' THEN 'INVESTIMENTOS'
            WHEN codigo LIKE '1.02.03%' THEN 'IMOBILIZADO'
            WHEN codigo LIKE '1.02.04%' THEN 'INTANGIVEL'
            WHEN codigo LIKE '2.01.01%' THEN 'FORNECEDORES'
            WHEN codigo LIKE '2.01.02%' THEN 'EMPRESTIMOS_CP'
            WHEN codigo LIKE '2.01.03%' THEN 'TRIBUTOS_A_PAGAR'
            WHEN codigo LIKE '2.02.01%' THEN 'EMPRESTIMOS_LP'
            WHEN codigo LIKE '2.03.01%' THEN 'CAPITAL_SOCIAL'
            WHEN codigo LIKE '2.03.02%' THEN 'RESERVAS'
            WHEN codigo LIKE '2.03.03%' THEN 'LUCROS_PREJUIZOS_ACUMULADOS'
            ELSE NULL
        END AS classificacao_nivel3
        
    FROM {TABELA_REF_BP}
    WHERE codigo IS NOT NULL
),

ref_dre AS (
    SELECT 
        codigo AS cd_conta_ref,
        descricao AS descr_conta_ref,
        tipo AS tp_conta,
        conta_superior AS cd_conta_sup_ref,
        nivel AS nivel_ref,
        natureza AS cd_natureza_ref,  -- '4'
        'DRE' AS origem_ref,
        
        -- Classifica√ß√£o baseada no c√≥digo
        CASE 
            WHEN codigo LIKE '3.01%' THEN 'RECEITA_BRUTA'
            WHEN codigo LIKE '3.02%' THEN 'DEDUCOES_RECEITA'
            WHEN codigo LIKE '3.03%' THEN 'CUSTOS'
            WHEN codigo LIKE '3.04%' THEN 'DESPESAS_OPERACIONAIS'
            WHEN codigo LIKE '3.05%' THEN 'OUTRAS_RECEITAS_DESPESAS'
            WHEN codigo LIKE '3.06%' THEN 'RESULTADO_FINANCEIRO'
            WHEN codigo LIKE '3.07%' THEN 'IMPOSTOS_RESULTADO'
            WHEN codigo LIKE '3%' THEN 'RESULTADO'
            ELSE 'NAO_CLASSIFICADO'
        END AS classificacao_nivel2,
        
        CASE 
            WHEN codigo LIKE '3.01.01%' THEN 'RECEITA_VENDAS_PRODUTOS'
            WHEN codigo LIKE '3.01.02%' THEN 'RECEITA_VENDAS_MERCADORIAS'
            WHEN codigo LIKE '3.01.03%' THEN 'RECEITA_PRESTACAO_SERVICOS'
            WHEN codigo LIKE '3.02.01%' THEN 'DEVOLUCOES_VENDAS'
            WHEN codigo LIKE '3.02.02%' THEN 'IMPOSTOS_SOBRE_VENDAS'
            WHEN codigo LIKE '3.03.01%' THEN 'CUSTO_PRODUTOS_VENDIDOS'
            WHEN codigo LIKE '3.03.02%' THEN 'CUSTO_MERCADORIAS_VENDIDAS'
            WHEN codigo LIKE '3.03.03%' THEN 'CUSTO_SERVICOS_PRESTADOS'
            WHEN codigo LIKE '3.04.01%' THEN 'DESPESAS_VENDAS'
            WHEN codigo LIKE '3.04.02%' THEN 'DESPESAS_ADMINISTRATIVAS'
            WHEN codigo LIKE '3.06.01%' THEN 'RECEITAS_FINANCEIRAS'
            WHEN codigo LIKE '3.06.02%' THEN 'DESPESAS_FINANCEIRAS'
            ELSE NULL
        END AS classificacao_nivel3
        
    FROM {TABELA_REF_DRE}
    WHERE codigo IS NOT NULL
)

SELECT * FROM ref_bp
UNION ALL
SELECT * FROM ref_dre
"""

print("Executando SQL...")
spark.sql(sql_criar_referencia)

# Validar cria√ß√£o
total_ref = spark.sql(f"SELECT COUNT(*) as total FROM {DATABASE_DESTINO}.pc_referencia_completa").collect()[0]['total']
print(f"‚úÖ Tabela criada com {total_ref:,} registros de refer√™ncia")

# Estat√≠sticas
print("\nüìä Estat√≠sticas da tabela de refer√™ncia:")
stats_ref = spark.sql(f"""
    SELECT 
        origem_ref,
        COUNT(*) as qtd,
        COUNT(DISTINCT cd_conta_ref) as contas_unicas
    FROM {DATABASE_DESTINO}.pc_referencia_completa
    GROUP BY origem_ref
""")
stats_ref.show()

print("=" * 80)
print("‚úÖ TABELA DE REFER√äNCIA CRIADA COM SUCESSO!")

In [None]:
# ================================================================================
# C√âLULA 3: EXTRAIR E CLASSIFICAR CONTAS - CAMADA 1 (VALIDA√á√ÉO CRUZADA)
# ================================================================================

print("\nüîÑ EXTRAINDO E CLASSIFICANDO CONTAS - CAMADA 1...")
print("=" * 80)

# ================================================================================
# DROPAR TABELA ANTIGA
# ================================================================================

try:
    spark.sql(f"DROP TABLE IF EXISTS {TABELA_SAIDA} PURGE")
    print("‚úÖ Tabela antiga removida")
except:
    print("‚ö†Ô∏è  Tabela n√£o existia (OK)")

# ================================================================================
# SQL PRINCIPAL: UNI√ÉO DE BP + DRE COM VALIDA√á√ÉO CRUZADA
# ================================================================================

dt_ref_filter = int(f"{ANO_REFERENCIA}01")  # 202401

sql_criar_classificacao = f"""
CREATE TABLE {TABELA_SAIDA} AS

WITH empresas_sc AS (
    -- Filtrar apenas empresas de SC
    SELECT DISTINCT 
        id_ecd,
        nu_cnpj AS cnpj,
        nm_empresarial,
        cd_uf
    FROM usr_sat_ecd.ecd_r0000_identificacao
    WHERE dt_referencia = {dt_ref_filter}
        AND cd_uf = '{UF_FILTRO}'
),

contas_bp AS (
    -- BALAN√áO PATRIMONIAL
    SELECT 
        bp.id_ecd,
        {dt_ref_filter} AS dt_referencia,
        emp.cnpj,
        emp.nm_empresarial,
        emp.cd_uf,
        
        -- Dados da conta
        bp.cod_agl AS cd_conta,
        bp.cod_agl_sup AS cd_conta_sup,
        bp.descr_cod_agl AS descr_conta,
        bp.nivel_agl AS nivel_conta,
        bp.ind_cod_agl AS tp_conta_agl,  -- 'T' ou 'D'
        'BP' AS origem_demonstrativo,
        
        -- Classifica√ß√£o BASE (campos originais)
        bp.ind_grp_bal,  -- 'A' ou 'P'
        NULL AS ind_grp_dre,
        bp.ind_dc_cta_ini,
        bp.ind_dc_cta_fin,
        
        -- Plano de contas (JOIN para pegar cd_natureza)
        pc.cd_natureza,
        pc.tp_conta AS tp_conta_pc,
        
        -- Valores
        bp.vl_cta_ini,
        bp.vl_cta_fin
        
    FROM usr_sat_ecd.ecd_rj100_balanco_patrimonial bp
    INNER JOIN empresas_sc emp
        ON bp.id_ecd = emp.id_ecd
    LEFT JOIN usr_sat_ecd.ecd_ri050_plano_contas pc
        ON bp.id_ecd = pc.id_ecd
        AND bp.dt_referencia = pc.dt_referencia
        AND bp.cod_agl = pc.cd_conta_anl
    
    WHERE bp.dt_referencia = {dt_ref_filter}
        AND bp.cod_agl IS NOT NULL
        AND (bp.vl_cta_ini != 0 OR bp.vl_cta_fin != 0)  -- Com movimenta√ß√£o
),

contas_dre AS (
    -- DRE
    SELECT 
        dre.id_ecd,
        {dt_ref_filter} AS dt_referencia,
        emp.cnpj,
        emp.nm_empresarial,
        emp.cd_uf,
        
        -- Dados da conta
        dre.cod_agl AS cd_conta,
        dre.cod_agl_sup AS cd_conta_sup,
        dre.descr_cod_agl AS descr_conta,
        dre.nivel_agl AS nivel_conta,
        dre.ind_cod_agl AS tp_conta_agl,
        'DRE' AS origem_demonstrativo,
        
        -- Classifica√ß√£o BASE
        NULL AS ind_grp_bal,
        dre.ind_grp_dre,  -- 'R' ou 'D'
        dre.ind_dc_cta_ini,
        dre.ind_dc_cta_fin,
        
        -- Plano de contas
        pc.cd_natureza,
        pc.tp_conta AS tp_conta_pc,
        
        -- Valores
        dre.vl_cta_ini,
        dre.vl_cta_fin
        
    FROM usr_sat_ecd.ecd_rj150_demonstracao_resultado_exercicio dre
    INNER JOIN empresas_sc emp
        ON dre.id_ecd = emp.id_ecd
    LEFT JOIN usr_sat_ecd.ecd_ri050_plano_contas pc
        ON dre.id_ecd = pc.id_ecd
        AND dre.dt_referencia = pc.dt_referencia
        AND dre.cod_agl = pc.cd_conta_anl
    
    WHERE dre.dt_referencia = {dt_ref_filter}
        AND dre.cod_agl IS NOT NULL
        AND (dre.vl_cta_ini != 0 OR dre.vl_cta_fin != 0)  -- Com movimenta√ß√£o
),

contas_unificadas AS (
    SELECT * FROM contas_bp
    UNION ALL
    SELECT * FROM contas_dre
),

classificacao_camada1 AS (
    SELECT 
        *,
        
        -- =========================================================
        -- CAMADA 1: CLASSIFICA√á√ÉO POR VALIDA√á√ÉO CRUZADA
        -- Prioridade: ind_grp + cd_natureza (99% consist√™ncia!)
        -- =========================================================
        
        -- N√çVEL 1: ATIVO, PASSIVO, PL ou RESULTADO
        CASE 
            -- BP: ATIVO
            WHEN origem_demonstrativo = 'BP' 
                AND ind_grp_bal = 'A' 
                AND cd_natureza = '01' 
                THEN 'ATIVO'
                
            -- BP: PASSIVO
            WHEN origem_demonstrativo = 'BP' 
                AND ind_grp_bal = 'P' 
                AND cd_natureza = '02' 
                THEN 'PASSIVO'
                
            -- BP: PATRIM√îNIO L√çQUIDO
            WHEN origem_demonstrativo = 'BP' 
                AND ind_grp_bal = 'P' 
                AND cd_natureza = '03' 
                THEN 'PATRIMONIO_LIQUIDO'
                
            -- DRE: RESULTADO
            WHEN origem_demonstrativo = 'DRE' 
                AND ind_grp_dre IN ('R', 'D') 
                AND cd_natureza = '04' 
                THEN 'RESULTADO'
                
            -- Fallback: s√≥ ind_grp (quando n√£o tem cd_natureza)
            WHEN origem_demonstrativo = 'BP' AND ind_grp_bal = 'A' THEN 'ATIVO'
            WHEN origem_demonstrativo = 'BP' AND ind_grp_bal = 'P' THEN 'PASSIVO'  -- pode ser PL tamb√©m
            WHEN origem_demonstrativo = 'DRE' AND ind_grp_dre IN ('R', 'D') THEN 'RESULTADO'
            
            ELSE 'NAO_CLASSIFICADO'
        END AS classificacao_nivel1,
        
        -- Confian√ßa da classifica√ß√£o N√≠vel 1
        CASE 
            WHEN (origem_demonstrativo = 'BP' AND ind_grp_bal = 'A' AND cd_natureza = '01') 
                OR (origem_demonstrativo = 'BP' AND ind_grp_bal = 'P' AND cd_natureza IN ('02', '03'))
                OR (origem_demonstrativo = 'DRE' AND ind_grp_dre IN ('R', 'D') AND cd_natureza = '04')
                THEN 'MUITO_ALTA'  -- Valida√ß√£o cruzada perfeita
                
            WHEN cd_natureza IS NOT NULL 
                THEN 'ALTA'  -- Tem cd_natureza mas n√£o bate perfeitamente
                
            WHEN ind_grp_bal IS NOT NULL OR ind_grp_dre IS NOT NULL
                THEN 'MEDIA'  -- S√≥ tem ind_grp
                
            ELSE 'BAIXA'
        END AS confianca_nivel1,
        
        -- M√©todo usado
        CASE 
            WHEN (origem_demonstrativo = 'BP' AND ind_grp_bal = 'A' AND cd_natureza = '01') 
                OR (origem_demonstrativo = 'BP' AND ind_grp_bal = 'P' AND cd_natureza IN ('02', '03'))
                OR (origem_demonstrativo = 'DRE' AND ind_grp_dre IN ('R', 'D') AND cd_natureza = '04')
                THEN 'VALIDACAO_CRUZADA'
                
            WHEN cd_natureza IS NOT NULL 
                THEN 'CD_NATUREZA'
                
            WHEN ind_grp_bal IS NOT NULL OR ind_grp_dre IS NOT NULL
                THEN 'IND_GRP'
                
            ELSE 'NAO_CLASSIFICADO'
        END AS metodo_nivel1
        
    FROM contas_unificadas
)

SELECT 
    id_ecd,
    dt_referencia,
    cnpj,
    nm_empresarial,
    cd_uf,
    cd_conta,
    cd_conta_sup,
    descr_conta,
    nivel_conta,
    tp_conta_agl,
    tp_conta_pc,
    origem_demonstrativo,
    ind_grp_bal,
    ind_grp_dre,
    cd_natureza,
    ind_dc_cta_ini,
    ind_dc_cta_fin,
    classificacao_nivel1,
    confianca_nivel1,
    metodo_nivel1,
    vl_cta_ini,
    vl_cta_fin,
    
    -- Placeholders para pr√≥ximas camadas
    CAST(NULL AS STRING) AS classificacao_nivel2,
    CAST(NULL AS STRING) AS classificacao_nivel3,
    CAST(NULL AS STRING) AS cd_conta_referencial_matched,
    CAST(NULL AS DOUBLE) AS score_similaridade,
    CAST(NULL AS STRING) AS metodo_final
    
FROM classificacao_camada1
"""

print("Executando SQL... (pode levar alguns minutos)")
spark.sql(sql_criar_classificacao)

# Validar cria√ß√£o
total = spark.sql(f"SELECT COUNT(*) as total FROM {TABELA_SAIDA}").collect()[0]['total']
print(f"\n‚úÖ Tabela criada com {total:,} registros")

# Estat√≠sticas CAMADA 1
print("\nüìä ESTAT√çSTICAS - CAMADA 1 (Valida√ß√£o Cruzada):")
print("-" * 80)

stats_camada1 = spark.sql(f"""
    SELECT 
        classificacao_nivel1,
        confianca_nivel1,
        metodo_nivel1,
        COUNT(*) as qtd,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as perc
    FROM {TABELA_SAIDA}
    GROUP BY classificacao_nivel1, confianca_nivel1, metodo_nivel1
    ORDER BY qtd DESC
""")
stats_camada1.show(30, truncate=False)

print("\nüìä RESUMO POR ORIGEM:")
resumo_origem = spark.sql(f"""
    SELECT 
        origem_demonstrativo,
        COUNT(*) as total,
        SUM(CASE WHEN classificacao_nivel1 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) as classificados,
        ROUND(SUM(CASE WHEN classificacao_nivel1 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_classificado
    FROM {TABELA_SAIDA}
    GROUP BY origem_demonstrativo
""")
resumo_origem.show()

print("=" * 80)
print("‚úÖ CAMADA 1 CONCLU√çDA!")

In [None]:
# ================================================================================
# C√âLULA 4: CLASSIFICA√á√ÉO CAMADA 2 - MATCH DE C√ìDIGO (Exato e Parcial)
# ================================================================================

print("\nüîç APLICANDO CAMADA 2 - MATCH DE C√ìDIGO...")
print("=" * 80)

from pyspark.sql.functions import col, when, length, substring, coalesce, lit

# ================================================================================
# CARREGAR DADOS
# ================================================================================

print("Carregando dados...")
df_contas = spark.table(f"{TABELA_SAIDA}")
df_referencia = spark.table(f"{DATABASE_DESTINO}.pc_referencia_completa")

print(f"‚úÖ Contas carregadas: {df_contas.count():,}")
print(f"‚úÖ Refer√™ncia carregada: {df_referencia.count():,}")

# ================================================================================
# ESTRAT√âGIA DE MATCH
# ================================================================================
print("\nüìã Estrat√©gia de Match:")
print("  1. Match EXATO: cod_agl = cd_conta_ref")
print("  2. Match PARCIAL (prefixo): primeiros 3-7 d√≠gitos")
print("  3. Heran√ßa via HIERARQUIA: usar cod_agl_sup")

# ================================================================================
# MATCH EXATO
# ================================================================================

print("\nüéØ Aplicando MATCH EXATO...")

# JOIN para match exato
df_match_exato = df_contas.alias("c") \
    .join(
        df_referencia.alias("r"),
        (col("c.cd_conta") == col("r.cd_conta_ref")) &
        (col("c.origem_demonstrativo") == col("r.origem_ref")),
        "left"
    ) \
    .select(
        col("c.*"),
        
        # Classifica√ß√µes da refer√™ncia (quando houver match)
        when(col("r.cd_conta_ref").isNotNull(), col("r.classificacao_nivel2"))
            .otherwise(col("c.classificacao_nivel2"))
            .alias("classificacao_nivel2_novo"),
            
        when(col("r.cd_conta_ref").isNotNull(), col("r.classificacao_nivel3"))
            .otherwise(col("c.classificacao_nivel3"))
            .alias("classificacao_nivel3_novo"),
            
        when(col("r.cd_conta_ref").isNotNull(), col("r.cd_conta_ref"))
            .otherwise(col("c.cd_conta_referencial_matched"))
            .alias("cd_conta_referencial_matched_novo"),
            
        # Score de similaridade (1.0 = match perfeito)
        when(col("r.cd_conta_ref").isNotNull(), lit(1.0))
            .otherwise(col("c.score_similaridade"))
            .alias("score_similaridade_novo"),
            
        # M√©todo
        when(col("r.cd_conta_ref").isNotNull(), lit("MATCH_CODIGO_EXATO"))
            .otherwise(col("c.metodo_final"))
            .alias("metodo_final_novo")
    )

# Estat√≠sticas do match exato
match_exato_count = df_match_exato.filter(col("metodo_final_novo") == "MATCH_CODIGO_EXATO").count()
print(f"‚úÖ Match exato: {match_exato_count:,} registros ({match_exato_count*100/df_contas.count():.2f}%)")

# ================================================================================
# MATCH PARCIAL (PREFIXO)
# ================================================================================

print("\nüîé Aplicando MATCH PARCIAL (prefixos)...")

# Para contas ainda n√£o classificadas em n√≠vel 2/3, tentar match parcial
df_match_parcial = df_match_exato

# Tentar prefixos de tamanho 7, 5, 3 (do mais espec√≠fico ao mais geral)
for tam_prefixo in [7, 5, 3]:
    print(f"  Tentando prefixo de {tam_prefixo} caracteres...")
    
    df_match_parcial = df_match_parcial.alias("c") \
        .join(
            df_referencia.alias("r"),
            (substring(col("c.cd_conta"), 1, tam_prefixo) == substring(col("r.cd_conta_ref"), 1, tam_prefixo)) &
            (col("c.origem_demonstrativo") == col("r.origem_ref")) &
            (col("c.classificacao_nivel2_novo").isNull() | (col("c.classificacao_nivel2_novo") == "")),
            "left"
        ) \
        .select(
            col("c.id_ecd"),
            col("c.dt_referencia"),
            col("c.cnpj"),
            col("c.nm_empresarial"),
            col("c.cd_uf"),
            col("c.cd_conta"),
            col("c.cd_conta_sup"),
            col("c.descr_conta"),
            col("c.nivel_conta"),
            col("c.tp_conta_agl"),
            col("c.tp_conta_pc"),
            col("c.origem_demonstrativo"),
            col("c.ind_grp_bal"),
            col("c.ind_grp_dre"),
            col("c.cd_natureza"),
            col("c.ind_dc_cta_ini"),
            col("c.ind_dc_cta_fin"),
            col("c.classificacao_nivel1"),
            col("c.confianca_nivel1"),
            col("c.metodo_nivel1"),
            col("c.vl_cta_ini"),
            col("c.vl_cta_fin"),
            
            # Atualizar classifica√ß√µes se encontrou match
            when(
                col("r.cd_conta_ref").isNotNull() & 
                (col("c.classificacao_nivel2_novo").isNull() | (col("c.classificacao_nivel2_novo") == "")),
                col("r.classificacao_nivel2")
            ).otherwise(col("c.classificacao_nivel2_novo")).alias("classificacao_nivel2_novo"),
            
            when(
                col("r.cd_conta_ref").isNotNull() & 
                (col("c.classificacao_nivel3_novo").isNull() | (col("c.classificacao_nivel3_novo") == "")),
                col("r.classificacao_nivel3")
            ).otherwise(col("c.classificacao_nivel3_novo")).alias("classificacao_nivel3_novo"),
            
            when(
                col("r.cd_conta_ref").isNotNull() & 
                (col("c.cd_conta_referencial_matched_novo").isNull() | (col("c.cd_conta_referencial_matched_novo") == "")),
                col("r.cd_conta_ref")
            ).otherwise(col("c.cd_conta_referencial_matched_novo")).alias("cd_conta_referencial_matched_novo"),
            
            when(
                col("r.cd_conta_ref").isNotNull() & 
                (col("c.score_similaridade_novo").isNull()),
                lit(tam_prefixo / 10.0)  # Score baseado no tamanho do prefixo
            ).otherwise(col("c.score_similaridade_novo")).alias("score_similaridade_novo"),
            
            when(
                col("r.cd_conta_ref").isNotNull() & 
                (col("c.metodo_final_novo").isNull() | (col("c.metodo_final_novo") == "")),
                lit(f"MATCH_CODIGO_PARCIAL_{tam_prefixo}")
            ).otherwise(col("c.metodo_final_novo")).alias("metodo_final_novo")
        )

match_parcial_count = df_match_parcial.filter(
    col("metodo_final_novo").like("MATCH_CODIGO_PARCIAL%")
).count()
print(f"‚úÖ Match parcial: {match_parcial_count:,} registros adicionais")

# ================================================================================
# HERAN√áA VIA HIERARQUIA (cod_agl_sup)
# ================================================================================

print("\nüå≥ Aplicando HERAN√áA VIA HIERARQUIA...")

# Para contas anal√≠ticas sem classifica√ß√£o, herdar da conta sint√©tica (pai)
df_com_hierarquia = df_match_parcial.alias("c") \
    .join(
        df_match_parcial.alias("pai").select(
            col("id_ecd").alias("pai_id_ecd"),
            col("cd_conta").alias("pai_cd_conta"),
            col("classificacao_nivel2_novo").alias("pai_nivel2"),
            col("classificacao_nivel3_novo").alias("pai_nivel3")
        ),
        (col("c.id_ecd") == col("pai_id_ecd")) &
        (col("c.cd_conta_sup") == col("pai_cd_conta")) &
        (col("c.classificacao_nivel2_novo").isNull() | (col("c.classificacao_nivel2_novo") == "")),
        "left"
    ) \
    .select(
        col("c.id_ecd"),
        col("c.dt_referencia"),
        col("c.cnpj"),
        col("c.nm_empresarial"),
        col("c.cd_uf"),
        col("c.cd_conta"),
        col("c.cd_conta_sup"),
        col("c.descr_conta"),
        col("c.nivel_conta"),
        col("c.tp_conta_agl"),
        col("c.tp_conta_pc"),
        col("c.origem_demonstrativo"),
        col("c.ind_grp_bal"),
        col("c.ind_grp_dre"),
        col("c.cd_natureza"),
        col("c.ind_dc_cta_ini"),
        col("c.ind_dc_cta_fin"),
        col("c.classificacao_nivel1"),
        col("c.confianca_nivel1"),
        col("c.metodo_nivel1"),
        col("c.vl_cta_ini"),
        col("c.vl_cta_fin"),
        
        # Herdar classifica√ß√£o do pai
        coalesce(col("c.classificacao_nivel2_novo"), col("pai_nivel2")).alias("classificacao_nivel2"),
        coalesce(col("c.classificacao_nivel3_novo"), col("pai_nivel3")).alias("classificacao_nivel3"),
        col("c.cd_conta_referencial_matched_novo").alias("cd_conta_referencial_matched"),
        col("c.score_similaridade_novo").alias("score_similaridade"),
        
        when(
            col("c.metodo_final_novo").isNull() & col("pai_nivel2").isNotNull(),
            lit("HERANCA_HIERARQUIA")
        ).otherwise(col("c.metodo_final_novo")).alias("metodo_final")
    )

heranca_count = df_com_hierarquia.filter(col("metodo_final") == "HERANCA_HIERARQUIA").count()
print(f"‚úÖ Heran√ßa via hierarquia: {heranca_count:,} registros adicionais")

# ================================================================================
# SALVAR RESULTADO
# ================================================================================

print("\nüíæ Salvando resultado...")

df_com_hierarquia.createOrReplaceTempView("temp_camada2")

spark.sql(f"""
    INSERT OVERWRITE TABLE {TABELA_SAIDA}
    SELECT * FROM temp_camada2
""")

print("‚úÖ Tabela atualizada!")

# ================================================================================
# ESTAT√çSTICAS FINAIS CAMADA 2
# ================================================================================

print("\nüìä ESTAT√çSTICAS - AP√ìS CAMADA 2:")
print("-" * 80)

stats_camada2 = spark.sql(f"""
    SELECT 
        metodo_final,
        COUNT(*) as qtd,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as perc
    FROM {TABELA_SAIDA}
    GROUP BY metodo_final
    ORDER BY qtd DESC
""")
stats_camada2.show(20, truncate=False)

print("\nüìä COBERTURA DE CLASSIFICA√á√ÉO:")
cobertura = spark.sql(f"""
    SELECT 
        COUNT(*) as total,
        SUM(CASE WHEN classificacao_nivel2 IS NOT NULL THEN 1 ELSE 0 END) as com_nivel2,
        SUM(CASE WHEN classificacao_nivel3 IS NOT NULL THEN 1 ELSE 0 END) as com_nivel3,
        ROUND(SUM(CASE WHEN classificacao_nivel2 IS NOT NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_nivel2,
        ROUND(SUM(CASE WHEN classificacao_nivel3 IS NOT NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_nivel3
    FROM {TABELA_SAIDA}
""")
cobertura.show(truncate=False)

print("=" * 80)
print("‚úÖ CAMADA 2 CONCLU√çDA!")

In [None]:
# ================================================================================
# C√âLULA 5 OTIMIZADA: MATCH POR DESCRI√á√ÉO (SEM LEVENSHTEIN - R√ÅPIDO!)
# ================================================================================

print("\nüìù APLICANDO CAMADA 3 - MATCH POR DESCRI√á√ÉO (VERS√ÉO OTIMIZADA)...")
print("=" * 80)

from pyspark.sql.functions import (
    col, when, lower, trim, regexp_replace, length, 
    coalesce, lit, split, array_contains, substring
)

# ================================================================================
# PREPARAR DADOS
# ================================================================================

print("\nüì• Preparando dados para text mining...")

# Carregar apenas contas ainda SEM classifica√ß√£o n√≠vel 2
df_sem_classificacao = spark.sql(f"""
    SELECT *
    FROM {TABELA_SAIDA}
    WHERE classificacao_nivel2 IS NULL 
        OR classificacao_nivel2 = ''
        OR classificacao_nivel2 = 'NAO_CLASSIFICADO'
""")

total_sem_class = df_sem_classificacao.count()
print(f"üìä Contas sem classifica√ß√£o n√≠vel 2: {total_sem_class:,}")

if total_sem_class == 0:
    print("‚úÖ Todas as contas j√° foram classificadas! Pulando CAMADA 3.")
else:
    # ================================================================================
    # NORMALIZAR DESCRI√á√ïES
    # ================================================================================
    
    print("\nüîß Normalizando descri√ß√µes...")
    
    df_normalizado = df_sem_classificacao.withColumn(
        "descr_normalizada",
        lower(trim(regexp_replace(col("descr_conta"), "[^a-zA-Z0-9\\s]", " ")))
    ).withColumn(
        "primeira_palavra",
        split(col("descr_normalizada"), " ").getItem(0)
    ).withColumn(
        "primeiras_3_palavras",
        substring(col("descr_normalizada"), 1, 50)  # Primeiros 50 caracteres
    )
    
    print("‚úÖ Descri√ß√µes normalizadas!")
    
    # ================================================================================
    # ESTRAT√âGIA 1: PALAVRAS-CHAVE (Keywords)
    # ================================================================================
    
    print("\nüîë ESTRAT√âGIA 1: Aplicando busca por palavras-chave...")
    
    # Mapa de palavras-chave EXPANDIDO (mais abrangente)
    keywords_map = {
        # ATIVO
        "DISPONIBILIDADES": ["caixa", "banco", "deposito", "disponivel", "numerario", "dinheiro"],
        "CREDITOS": ["cliente", "duplicata", "receber", "credito", "venda prazo", "contas receber"],
        "ESTOQUES": ["estoque", "mercadoria", "produto acabado", "materia prima", "almoxarifado", "insumo"],
        "INVESTIMENTOS": ["investimento", "participacao societaria", "acao", "quota", "aplicacao financeira"],
        "IMOBILIZADO": ["imovel", "veiculo", "maquina", "equipamento", "movel", "instalacao", "imobilizado", "edificacao"],
        "INTANGIVEL": ["software", "marca", "patente", "intangivel", "direito uso", "fundo comercio"],
        
        # PASSIVO
        "FORNECEDORES": ["fornecedor", "fornecimento", "compra prazo", "contas pagar"],
        "EMPRESTIMOS_CP": ["emprestimo", "financiamento curto", "banco emprestimo"],
        "TRIBUTOS_A_PAGAR": ["tributo", "imposto pagar", "icms pagar", "pis pagar", "cofins pagar", "irpj pagar", "csll pagar"],
        "SALARIOS_A_PAGAR": ["salario", "folha pagamento", "provisa ferias", "13 salario", "ordenado", "remuneracao"],
        "EMPRESTIMOS_LP": ["emprestimo longo prazo", "financiamento longo"],
        
        # PATRIM√îNIO L√çQUIDO
        "CAPITAL_SOCIAL": ["capital social", "capital subscrito", "capital integralizado"],
        "RESERVAS": ["reserva", "reserva legal", "reserva lucro"],
        "LUCROS_PREJUIZOS_ACUMULADOS": ["lucro acumulado", "prejuizo acumulado", "resultado exercicio"],
        
        # RESULTADO - RECEITAS
        "RECEITA_BRUTA": ["receita", "venda", "faturamento", "receita bruta", "comercializacao"],
        "DEDUCOES_RECEITA": ["devolucao", "desconto", "abatimento", "icms venda", "deducao"],
        
        # RESULTADO - CUSTOS/DESPESAS
        "CUSTOS": ["custo", "cmv", "cpv", "custo mercadoria", "custo produto"],
        "DESPESAS_OPERACIONAIS": ["despesa", "gasto", "despesa administrativa", "despesa comercial", "despesa venda"],
        "RECEITAS_FINANCEIRAS": ["juros recebido", "receita financeira", "rendimento", "aplicacao"],
        "DESPESAS_FINANCEIRAS": ["juros pago", "despesa financeira", "encargo", "iof"],
        
        # Adicionais
        "OUTRAS_RECEITAS_DESPESAS": ["outra receita", "outra despesa", "diversa"],
        "IMPOSTOS_RESULTADO": ["irpj", "csll", "imposto renda", "contribuicao social"]
    }
    
    # Criar condi√ß√µes WHEN para cada classifica√ß√£o
    classificacao_por_keyword = lit(None)
    
    for classificacao, keywords in keywords_map.items():
        condicao = lit(False)
        for keyword in keywords:
            condicao = condicao | col("descr_normalizada").contains(keyword)
        
        classificacao_por_keyword = when(condicao, lit(classificacao)).otherwise(classificacao_por_keyword)
    
    df_com_keywords = df_normalizado.withColumn(
        "classificacao_keyword",
        classificacao_por_keyword
    )
    
    keywords_match_count = df_com_keywords.filter(
        col("classificacao_keyword").isNotNull()
    ).count()
    
    print(f"‚úÖ Match por palavras-chave: {keywords_match_count:,} registros ({keywords_match_count*100/total_sem_class:.2f}%)")
    
    # ================================================================================
    # ESTRAT√âGIA 2: REGRAS HEUR√çSTICAS (cd_natureza + primeira_palavra)
    # ================================================================================
    
    print("\nüéØ ESTRAT√âGIA 2: Aplicando regras heur√≠sticas...")
    
    # Para contas ainda n√£o classificadas por keyword
    df_com_heuristicas = df_com_keywords.withColumn(
        "classificacao_heuristica",
        when(
            # Se j√° tem keyword, manter
            col("classificacao_keyword").isNotNull(),
            col("classificacao_keyword")
        ).otherwise(
            # Regras baseadas em cd_natureza + primeira palavra
            when(
                # ATIVO (natureza 01)
                (col("cd_natureza") == "01") & col("primeira_palavra").isin(
                    "banco", "caixa", "bcr", "bb", "santander", "itau", "bradesco", "sicoob"
                ), lit("DISPONIBILIDADES")
            ).when(
                (col("cd_natureza") == "01") & col("primeira_palavra").isin(
                    "estoque", "mercadoria", "produto", "materia", "insumo"
                ), lit("ESTOQUES")
            ).when(
                (col("cd_natureza") == "01") & col("primeira_palavra").isin(
                    "cliente", "duplicata", "conta", "titulo"
                ), lit("CREDITOS")
            ).when(
                (col("cd_natureza") == "01") & (col("nivel_conta") >= 3),
                # Ativo n√≠vel 3+ sem match espec√≠fico = provavelmente circulante
                lit("ATIVO_CIRCULANTE")
            ).when(
                # PASSIVO (natureza 02)
                (col("cd_natureza") == "02") & col("primeira_palavra").isin(
                    "fornecedor", "fornecimento", "duplicata", "compra"
                ), lit("FORNECEDORES")
            ).when(
                (col("cd_natureza") == "02") & col("primeira_palavra").isin(
                    "salario", "ordenado", "folha", "ferias", "13"
                ), lit("SALARIOS_A_PAGAR")
            ).when(
                (col("cd_natureza") == "02") & col("primeira_palavra").isin(
                    "tributo", "imposto", "icms", "pis", "cofins", "iss"
                ), lit("TRIBUTOS_A_PAGAR")
            ).when(
                (col("cd_natureza") == "02") & col("primeira_palavra").isin(
                    "emprestimo", "financiamento", "banco"
                ), lit("EMPRESTIMOS_CP")
            ).when(
                (col("cd_natureza") == "02") & (col("nivel_conta") >= 3),
                lit("PASSIVO_CIRCULANTE")
            ).when(
                # PATRIM√îNIO L√çQUIDO (natureza 03)
                (col("cd_natureza") == "03") & col("primeira_palavra").isin(
                    "capital", "subscrito", "integralizado"
                ), lit("CAPITAL_SOCIAL")
            ).when(
                (col("cd_natureza") == "03") & col("primeira_palavra").isin(
                    "reserva"
                ), lit("RESERVAS")
            ).when(
                (col("cd_natureza") == "03") & col("primeira_palavra").isin(
                    "lucro", "prejuizo", "resultado"
                ), lit("LUCROS_PREJUIZOS_ACUMULADOS")
            ).when(
                (col("cd_natureza") == "03"),
                lit("PATRIMONIO_LIQUIDO")  # Fallback gen√©rico
            ).when(
                # RESULTADO (natureza 04)
                (col("cd_natureza") == "04") & (col("ind_grp_dre") == "R") & col("primeira_palavra").isin(
                    "receita", "venda", "faturamento", "comercializacao"
                ), lit("RECEITA_BRUTA")
            ).when(
                (col("cd_natureza") == "04") & (col("ind_grp_dre") == "D") & col("primeira_palavra").isin(
                    "custo", "cmv", "cpv"
                ), lit("CUSTOS")
            ).when(
                (col("cd_natureza") == "04") & (col("ind_grp_dre") == "D") & col("primeira_palavra").isin(
                    "despesa", "gasto"
                ), lit("DESPESAS_OPERACIONAIS")
            ).when(
                (col("cd_natureza") == "04") & (col("ind_grp_dre") == "D") & col("primeira_palavra").isin(
                    "devolucao", "desconto", "abatimento"
                ), lit("DEDUCOES_RECEITA")
            ).when(
                (col("cd_natureza") == "04") & col("primeira_palavra").isin(
                    "juros", "juro"
                ),
                when(col("ind_grp_dre") == "R", lit("RECEITAS_FINANCEIRAS"))
                .otherwise(lit("DESPESAS_FINANCEIRAS"))
            ).when(
                (col("cd_natureza") == "04") & (col("ind_grp_dre") == "R"),
                lit("RECEITA_BRUTA")  # Fallback: Resultado + Receita
            ).when(
                (col("cd_natureza") == "04") & (col("ind_grp_dre") == "D"),
                lit("DESPESAS_OPERACIONAIS")  # Fallback: Resultado + Despesa
            ).otherwise(lit(None))
        )
    )
    
    heuristica_match_count = df_com_heuristicas.filter(
        col("classificacao_heuristica").isNotNull()
    ).count()
    
    adicional_heuristica = heuristica_match_count - keywords_match_count
    print(f"‚úÖ Match adicional por heur√≠sticas: {adicional_heuristica:,} registros")
    print(f"üìä Total classificados na CAMADA 3: {heuristica_match_count:,} ({heuristica_match_count*100/total_sem_class:.2f}%)")
    
    # ================================================================================
    # ESTRAT√âGIA 3: FALLBACK POR cd_natureza + origem
    # ================================================================================
    
    print("\nüîÑ ESTRAT√âGIA 3: Aplicando fallback gen√©rico...")
    
    df_com_fallback = df_com_heuristicas.withColumn(
        "classificacao_final_camada3",
        when(
            col("classificacao_heuristica").isNotNull(),
            col("classificacao_heuristica")
        ).otherwise(
            # Fallback: classifica√ß√£o gen√©rica baseada em cd_natureza
            when(
                (col("cd_natureza") == "01") & (col("origem_demonstrativo") == "BP"),
                when(col("nivel_conta") <= 2, lit("ATIVO")).otherwise(lit("ATIVO_CIRCULANTE"))
            ).when(
                (col("cd_natureza") == "02") & (col("origem_demonstrativo") == "BP"),
                when(col("nivel_conta") <= 2, lit("PASSIVO")).otherwise(lit("PASSIVO_CIRCULANTE"))
            ).when(
                (col("cd_natureza") == "03") & (col("origem_demonstrativo") == "BP"),
                lit("PATRIMONIO_LIQUIDO")
            ).when(
                (col("cd_natureza") == "04") & (col("origem_demonstrativo") == "DRE"),
                when(col("ind_grp_dre") == "R", lit("RECEITA_BRUTA")).otherwise(lit("DESPESAS_OPERACIONAIS"))
            ).otherwise(lit(None))
        )
    ).withColumn(
        "metodo_camada3",
        when(
            col("classificacao_keyword").isNotNull(),
            lit("MATCH_KEYWORDS")
        ).when(
            col("classificacao_heuristica").isNotNull(),
            lit("MATCH_HEURISTICAS")
        ).when(
            col("classificacao_final_camada3").isNotNull(),
            lit("FALLBACK_NATUREZA")
        ).otherwise(lit(None))
    )
    
    fallback_match_count = df_com_fallback.filter(
        col("classificacao_final_camada3").isNotNull()
    ).count()
    
    adicional_fallback = fallback_match_count - heuristica_match_count
    print(f"‚úÖ Match adicional por fallback: {adicional_fallback:,} registros")
    print(f"üìä TOTAL FINAL classificados: {fallback_match_count:,} ({fallback_match_count*100/total_sem_class:.2f}%)")
    
    # ================================================================================
    # ATUALIZAR TABELA PRINCIPAL
    # ================================================================================
    
    print("\nüíæ Atualizando tabela principal...")
    
    df_com_fallback.createOrReplaceTempView("temp_camada3")
    
    spark.sql(f"""
        CREATE OR REPLACE TEMP VIEW temp_merge AS
        SELECT 
            principal.id_ecd,
            principal.dt_referencia,
            principal.cnpj,
            principal.nm_empresarial,
            principal.cd_uf,
            principal.cd_conta,
            principal.cd_conta_sup,
            principal.descr_conta,
            principal.nivel_conta,
            principal.tp_conta_agl,
            principal.tp_conta_pc,
            principal.origem_demonstrativo,
            principal.ind_grp_bal,
            principal.ind_grp_dre,
            principal.cd_natureza,
            principal.ind_dc_cta_ini,
            principal.ind_dc_cta_fin,
            principal.classificacao_nivel1,
            principal.confianca_nivel1,
            principal.metodo_nivel1,
            principal.vl_cta_ini,
            principal.vl_cta_fin,
            
            -- Atualizar classificacao_nivel2 e nivel3
            COALESCE(cam3.classificacao_final_camada3, principal.classificacao_nivel2) AS classificacao_nivel2,
            principal.classificacao_nivel3,
            principal.cd_conta_referencial_matched,
            
            -- Score (0.8 para keywords/heur√≠sticas, 0.6 para fallback)
            COALESCE(
                CASE 
                    WHEN cam3.metodo_camada3 = 'MATCH_KEYWORDS' THEN 0.8
                    WHEN cam3.metodo_camada3 = 'MATCH_HEURISTICAS' THEN 0.75
                    WHEN cam3.metodo_camada3 = 'FALLBACK_NATUREZA' THEN 0.6
                    ELSE NULL
                END,
                principal.score_similaridade
            ) AS score_similaridade,
            
            -- Atualizar m√©todo
            COALESCE(cam3.metodo_camada3, principal.metodo_final) AS metodo_final
            
        FROM {TABELA_SAIDA} principal
        LEFT JOIN temp_camada3 cam3
            ON principal.id_ecd = cam3.id_ecd
            AND principal.cd_conta = cam3.cd_conta
    """)
    
    spark.sql(f"""
        INSERT OVERWRITE TABLE {TABELA_SAIDA}
        SELECT * FROM temp_merge
    """)
    
    print("‚úÖ Tabela atualizada!")

# ================================================================================
# ESTAT√çSTICAS FINAIS CAMADA 3
# ================================================================================

print("\nüìä ESTAT√çSTICAS - AP√ìS CAMADA 3:")
print("-" * 80)

stats_metodos = spark.sql(f"""
    SELECT 
        COALESCE(metodo_final, 'NAO_CLASSIFICADO') as metodo,
        COUNT(*) as qtd,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as perc
    FROM {TABELA_SAIDA}
    GROUP BY metodo_final
    ORDER BY qtd DESC
""")
stats_metodos.show(20, truncate=False)

print("\nüìä COBERTURA FINAL DE CLASSIFICA√á√ÉO:")
cobertura_final = spark.sql(f"""
    SELECT 
        COUNT(*) as total,
        SUM(CASE WHEN classificacao_nivel1 IS NOT NULL AND classificacao_nivel1 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) as com_nivel1,
        SUM(CASE WHEN classificacao_nivel2 IS NOT NULL AND classificacao_nivel2 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) as com_nivel2,
        SUM(CASE WHEN classificacao_nivel3 IS NOT NULL AND classificacao_nivel3 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) as com_nivel3,
        ROUND(SUM(CASE WHEN classificacao_nivel1 IS NOT NULL AND classificacao_nivel1 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_nivel1,
        ROUND(SUM(CASE WHEN classificacao_nivel2 IS NOT NULL AND classificacao_nivel2 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_nivel2,
        ROUND(SUM(CASE WHEN classificacao_nivel3 IS NOT NULL AND classificacao_nivel3 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_nivel3
    FROM {TABELA_SAIDA}
""")
cobertura_final.show(truncate=False)

print("\nüìä TOP 20 CLASSIFICA√á√ïES N√çVEL 2:")
top20 = spark.sql(f"""
    SELECT 
        classificacao_nivel2,
        COUNT(*) as qtd,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as perc
    FROM {TABELA_SAIDA}
    WHERE classificacao_nivel2 IS NOT NULL 
        AND classificacao_nivel2 != 'NAO_CLASSIFICADO'
    GROUP BY classificacao_nivel2
    ORDER BY qtd DESC
    LIMIT 20
""")
top20.show(truncate=False)

print("\n" + "=" * 80)
print("‚úÖ CAMADA 3 OTIMIZADA CONCLU√çDA!")
print("=" * 80)
print("\n‚ö° Tempo economizado: HORAS ‚Üí MINUTOS!")
print("üí™ Performance: Sem cross join custoso!")
print("=" * 80)

In [None]:
# ================================================================================
# C√âLULA 6: VALIDA√á√ÉO CONT√ÅBIL E PREPARA√á√ÉO PARA ML
# ================================================================================

print("\n‚öñÔ∏è  VALIDA√á√ÉO CONT√ÅBIL E PREPARA√á√ÉO PARA ML...")
print("=" * 80)

from pyspark.sql.functions import col, sum as spark_sum, abs as spark_abs, when, lit

# ================================================================================
# CALCULAR SALDOS CONT√ÅBEIS POR EMPRESA
# ================================================================================

print("\nüìä Calculando saldos cont√°beis por empresa...")

# Calcular saldo cont√°bil (considerando natureza D/C)
df_com_saldos = spark.sql(f"""
    SELECT 
        *,
        -- Saldo final cont√°bil (D negativo, C positivo)
        CASE 
            WHEN ind_dc_cta_fin = 'D' THEN -vl_cta_fin
            WHEN ind_dc_cta_fin = 'C' THEN vl_cta_fin
            ELSE 0
        END AS saldo_final_contabil,
        
        -- Saldo inicial cont√°bil
        CASE 
            WHEN ind_dc_cta_ini = 'D' THEN -vl_cta_ini
            WHEN ind_dc_cta_ini = 'C' THEN vl_cta_ini
            ELSE 0
        END AS saldo_inicial_contabil
        
    FROM {TABELA_SAIDA}
    WHERE origem_demonstrativo = 'BP'  -- S√≥ BP para equa√ß√£o cont√°bil
""")

df_com_saldos.createOrReplaceTempView("temp_saldos")

# ================================================================================
# EQUA√á√ÉO CONT√ÅBIL POR EMPRESA
# ================================================================================

print("\nüßÆ Calculando equa√ß√£o cont√°bil: ATIVO = PASSIVO + PL...")

equacao_por_empresa = spark.sql(f"""
    SELECT 
        id_ecd,
        cnpj,
        nm_empresarial,
        
        -- ATIVO (d√©bito = negativo, ent√£o soma com sinal negativo = valor positivo)
        SUM(CASE WHEN classificacao_nivel1 = 'ATIVO' THEN -saldo_final_contabil ELSE 0 END) AS total_ativo,
        
        -- PASSIVO (cr√©dito = positivo)
        SUM(CASE WHEN classificacao_nivel1 = 'PASSIVO' THEN saldo_final_contabil ELSE 0 END) AS total_passivo,
        
        -- PATRIM√îNIO L√çQUIDO (cr√©dito = positivo)
        SUM(CASE WHEN classificacao_nivel1 = 'PATRIMONIO_LIQUIDO' THEN saldo_final_contabil ELSE 0 END) AS total_pl,
        
        -- Equa√ß√£o: Ativo - (Passivo + PL) = deveria ser 0
        SUM(CASE WHEN classificacao_nivel1 = 'ATIVO' THEN -saldo_final_contabil ELSE 0 END) -
        (
            SUM(CASE WHEN classificacao_nivel1 = 'PASSIVO' THEN saldo_final_contabil ELSE 0 END) +
            SUM(CASE WHEN classificacao_nivel1 = 'PATRIMONIO_LIQUIDO' THEN saldo_final_contabil ELSE 0 END)
        ) AS diferenca_equacao,
        
        -- Diferen√ßa percentual
        CASE 
            WHEN SUM(CASE WHEN classificacao_nivel1 = 'ATIVO' THEN -saldo_final_contabil ELSE 0 END) != 0 THEN
                ABS(
                    (SUM(CASE WHEN classificacao_nivel1 = 'ATIVO' THEN -saldo_final_contabil ELSE 0 END) -
                    (SUM(CASE WHEN classificacao_nivel1 = 'PASSIVO' THEN saldo_final_contabil ELSE 0 END) +
                     SUM(CASE WHEN classificacao_nivel1 = 'PATRIMONIO_LIQUIDO' THEN saldo_final_contabil ELSE 0 END))) * 100.0 /
                    SUM(CASE WHEN classificacao_nivel1 = 'ATIVO' THEN -saldo_final_contabil ELSE 0 END)
                )
            ELSE 999.99
        END AS diferenca_percentual
        
    FROM temp_saldos
    GROUP BY id_ecd, cnpj, nm_empresarial
    HAVING SUM(CASE WHEN classificacao_nivel1 = 'ATIVO' THEN -saldo_final_contabil ELSE 0 END) != 0
""")

equacao_por_empresa.createOrReplaceTempView("temp_equacao")

print("‚úÖ Equa√ß√£o cont√°bil calculada!")

# ================================================================================
# VALIDAR EMPRESAS (Toler√¢ncia configur√°vel)
# ================================================================================

print(f"\n‚úÖ Validando empresas (toler√¢ncia: {TOLERANCIA_EQUACAO}%)...")

empresas_validas = spark.sql(f"""
    SELECT 
        *,
        CASE 
            WHEN ABS(diferenca_percentual) <= {TOLERANCIA_EQUACAO} THEN TRUE
            ELSE FALSE
        END AS equacao_valida
    FROM temp_equacao
""")

empresas_validas.createOrReplaceTempView("temp_empresas_validas")

# Estat√≠sticas
total_empresas = empresas_validas.count()
empresas_ok = empresas_validas.filter(col("equacao_valida") == True).count()
empresas_nok = total_empresas - empresas_ok

print(f"\nüìä RESULTADOS DA VALIDA√á√ÉO:")
print(f"  Total de empresas: {total_empresas:,}")
print(f"  ‚úÖ Equa√ß√£o OK (‚â§{TOLERANCIA_EQUACAO}%): {empresas_ok:,} ({empresas_ok*100/total_empresas:.2f}%)")
print(f"  ‚ùå Equa√ß√£o NOK (>{TOLERANCIA_EQUACAO}%): {empresas_nok:,} ({empresas_nok*100/total_empresas:.2f}%)")

# Amostra de empresas com equa√ß√£o OK
print("\nüìã AMOSTRA - Empresas com equa√ß√£o OK:")
spark.sql("""
    SELECT 
        cnpj, nm_empresarial,
        ROUND(total_ativo, 2) as ativo,
        ROUND(total_passivo, 2) as passivo,
        ROUND(total_pl, 2) as pl,
        ROUND(diferenca_equacao, 2) as diferenca,
        ROUND(diferenca_percentual, 2) as diff_perc
    FROM temp_empresas_validas
    WHERE equacao_valida = TRUE
    ORDER BY ABS(diferenca_percentual)
    LIMIT 10
""").show(truncate=False)

# Amostra de empresas com problemas
print("\n‚ö†Ô∏è  AMOSTRA - Empresas com equa√ß√£o com problemas:")
spark.sql("""
    SELECT 
        cnpj, nm_empresarial,
        ROUND(total_ativo, 2) as ativo,
        ROUND(total_passivo, 2) as passivo,
        ROUND(total_pl, 2) as pl,
        ROUND(diferenca_equacao, 2) as diferenca,
        ROUND(diferenca_percentual, 2) as diff_perc
    FROM temp_empresas_validas
    WHERE equacao_valida = FALSE
    ORDER BY ABS(diferenca_percentual)
    LIMIT 10
""").show(truncate=False)

# ================================================================================
# MARCAR CONTAS PARA TREINO DE ML
# ================================================================================

print("\nü§ñ Marcando contas para treino de Machine Learning...")

# Criar flag indicando se a conta pode ser usada para treino
spark.sql(f"""
    CREATE OR REPLACE TEMP VIEW temp_final AS
    SELECT 
        c.*,
        CASE 
            WHEN e.equacao_valida = TRUE 
                AND c.confianca_nivel1 IN ('MUITO_ALTA', 'ALTA')
                AND c.classificacao_nivel2 IS NOT NULL
                THEN TRUE
            ELSE FALSE
        END AS usar_para_treino_ml,
        
        e.equacao_valida AS empresa_equacao_ok,
        e.diferenca_percentual AS empresa_diff_percentual
        
    FROM {TABELA_SAIDA} c
    LEFT JOIN temp_empresas_validas e
        ON c.id_ecd = e.id_ecd
""")

# Estat√≠sticas de contas para ML
contas_para_ml = spark.sql("""
    SELECT 
        COUNT(*) as total_contas,
        SUM(CASE WHEN usar_para_treino_ml = TRUE THEN 1 ELSE 0 END) as contas_treino,
        ROUND(SUM(CASE WHEN usar_para_treino_ml = TRUE THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_treino
    FROM temp_final
""")

print("\nüìä CONTAS DISPON√çVEIS PARA TREINO DE ML:")
contas_para_ml.show(truncate=False)

# Distribui√ß√£o por classifica√ß√£o
print("\nüìä DISTRIBUI√á√ÉO DE CONTAS PARA TREINO (por classifica√ß√£o):")
spark.sql("""
    SELECT 
        classificacao_nivel2,
        COUNT(*) as qtd_treino
    FROM temp_final
    WHERE usar_para_treino_ml = TRUE
    GROUP BY classificacao_nivel2
    ORDER BY qtd_treino DESC
""").show(20, truncate=False)

# ================================================================================
# SALVAR TABELA FINAL
# ================================================================================

print("\nüíæ Salvando tabela final...")

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {DATABASE_DESTINO}.ecd_contas_classificadas_final (
        id_ecd BIGINT,
        dt_referencia INT,
        cnpj STRING,
        nm_empresarial STRING,
        cd_uf STRING,
        cd_conta STRING,
        cd_conta_sup STRING,
        descr_conta STRING,
        nivel_conta INT,
        tp_conta_agl STRING,
        tp_conta_pc STRING,
        origem_demonstrativo STRING,
        ind_grp_bal STRING,
        ind_grp_dre STRING,
        cd_natureza STRING,
        ind_dc_cta_ini STRING,
        ind_dc_cta_fin STRING,
        classificacao_nivel1 STRING,
        confianca_nivel1 STRING,
        metodo_nivel1 STRING,
        vl_cta_ini DOUBLE,
        vl_cta_fin DOUBLE,
        classificacao_nivel2 STRING,
        classificacao_nivel3 STRING,
        cd_conta_referencial_matched STRING,
        score_similaridade DOUBLE,
        metodo_final STRING,
        usar_para_treino_ml BOOLEAN,
        empresa_equacao_ok BOOLEAN,
        empresa_diff_percentual DOUBLE
    )
    PARTITIONED BY (ano_referencia INT)
    STORED AS PARQUET
""")

spark.sql(f"""
    INSERT OVERWRITE TABLE {DATABASE_DESTINO}.ecd_contas_classificadas_final
    PARTITION (ano_referencia = {ANO_REFERENCIA})
    SELECT 
        id_ecd, dt_referencia, cnpj, nm_empresarial, cd_uf,
        cd_conta, cd_conta_sup, descr_conta, nivel_conta, tp_conta_agl, tp_conta_pc,
        origem_demonstrativo, ind_grp_bal, ind_grp_dre, cd_natureza,
        ind_dc_cta_ini, ind_dc_cta_fin,
        classificacao_nivel1, confianca_nivel1, metodo_nivel1,
        vl_cta_ini, vl_cta_fin,
        classificacao_nivel2, classificacao_nivel3,
        cd_conta_referencial_matched, score_similaridade, metodo_final,
        usar_para_treino_ml, empresa_equacao_ok, empresa_diff_percentual
    FROM temp_final
""")

print(f"‚úÖ Tabela final salva: {DATABASE_DESTINO}.ecd_contas_classificadas_final")

# ================================================================================
# RELAT√ìRIO FINAL CONSOLIDADO
# ================================================================================

print("\n" + "=" * 80)
print("üìä RELAT√ìRIO FINAL - PIPELINE COMPLETO")
print("=" * 80)

print("\n1Ô∏è‚É£  COBERTURA DE CLASSIFICA√á√ÉO:")
spark.sql(f"""
    SELECT 
        COUNT(*) as total_contas,
        SUM(CASE WHEN classificacao_nivel1 IS NOT NULL THEN 1 ELSE 0 END) as nivel1,
        SUM(CASE WHEN classificacao_nivel2 IS NOT NULL THEN 1 ELSE 0 END) as nivel2,
        SUM(CASE WHEN classificacao_nivel3 IS NOT NULL THEN 1 ELSE 0 END) as nivel3,
        ROUND(SUM(CASE WHEN classificacao_nivel1 IS NOT NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_n1,
        ROUND(SUM(CASE WHEN classificacao_nivel2 IS NOT NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_n2,
        ROUND(SUM(CASE WHEN classificacao_nivel3 IS NOT NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_n3
    FROM temp_final
""").show(truncate=False)

print("\n2Ô∏è‚É£  M√âTODOS DE CLASSIFICA√á√ÉO:")
spark.sql("""
    SELECT 
        metodo_final,
        COUNT(*) as qtd,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as perc
    FROM temp_final
    WHERE metodo_final IS NOT NULL AND metodo_final != ''
    GROUP BY metodo_final
    ORDER BY qtd DESC
""").show(20, truncate=False)

print("\n3Ô∏è‚É£  CONFIAN√áA DA CLASSIFICA√á√ÉO:")
spark.sql("""
    SELECT 
        confianca_nivel1,
        COUNT(*) as qtd,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as perc
    FROM temp_final
    GROUP BY confianca_nivel1
    ORDER BY 
        CASE confianca_nivel1
            WHEN 'MUITO_ALTA' THEN 1
            WHEN 'ALTA' THEN 2
            WHEN 'MEDIA' THEN 3
            WHEN 'BAIXA' THEN 4
        END
""").show(truncate=False)

print("\n4Ô∏è‚É£  VALIDA√á√ÉO CONT√ÅBIL:")
print(f"  ‚úÖ Empresas com equa√ß√£o OK: {empresas_ok:,} / {total_empresas:,} ({empresas_ok*100/total_empresas:.2f}%)")
print(f"  ü§ñ Contas prontas para ML: {contas_para_ml.collect()[0]['contas_treino']:,}")

print("\n" + "=" * 80)
print("‚úÖ PIPELINE COMPLETO EXECUTADO COM SUCESSO!")
print("=" * 80)
print(f"\nüìÇ Tabela final: {DATABASE_DESTINO}.ecd_contas_classificadas_final")
print(f"üìÖ Ano processado: {ANO_REFERENCIA}")
print(f"üìç UF: {UF_FILTRO}")
print("\nüéØ Pr√≥ximos passos:")
print("  1. Treinar modelos de ML com as contas marcadas (usar_para_treino_ml = TRUE)")
print("  2. Aplicar ML nas contas n√£o classificadas")
print("  3. Iterar para melhorar cobertura")
print("=" * 80)

In [None]:
# ================================================================================
# C√âLULA 6.5: DEDUPLICA√á√ÉO DA TABELA FINAL
# ================================================================================

print("\nüßπ DEDUPLICANDO TABELA FINAL...")
print("=" * 80)

# ================================================================================
# DIAGN√ìSTICO INICIAL
# ================================================================================

print("\nüìä DIAGN√ìSTICO - Antes da deduplica√ß√£o:")

# Total de registros
total_antes = spark.sql(f"""
    SELECT COUNT(*) as total
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
""").collect()[0]['total']

print(f"  Total de registros: {total_antes:,}")

# Registros √∫nicos (por id_ecd + cd_conta)
unicos_antes = spark.sql(f"""
    SELECT COUNT(DISTINCT id_ecd, cd_conta) as unicos
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
""").collect()[0]['unicos']

print(f"  Registros √∫nicos (id_ecd + cd_conta): {unicos_antes:,}")

duplicatas = total_antes - unicos_antes
perc_duplicatas = (duplicatas * 100.0 / total_antes) if total_antes > 0 else 0

print(f"  Duplicatas: {duplicatas:,} ({perc_duplicatas:.2f}%)")

if duplicatas == 0:
    print("\n‚úÖ N√£o h√° duplicatas! Tabela j√° est√° limpa.")
    print("=" * 80)
else:
    # ================================================================================
    # ESTRAT√âGIA DE DEDUPLICA√á√ÉO
    # ================================================================================
    
    print("\nüîß Estrat√©gia de deduplica√ß√£o:")
    print("  1. Manter apenas 1 registro por (id_ecd, cd_conta)")
    print("  2. Prioridade: melhor m√©todo de classifica√ß√£o + maior confian√ßa")
    print("  3. Usar ROW_NUMBER() com ORDER BY apropriado")
    
    # ================================================================================
    # CRIAR TABELA DEDUPLICADA
    # ================================================================================
    
    print("\nüíæ Criando tabela deduplicada...")
    
    spark.sql(f"""
        CREATE OR REPLACE TEMP VIEW temp_deduplicada AS
        WITH ranked AS (
            SELECT 
                *,
                ROW_NUMBER() OVER (
                    PARTITION BY id_ecd, cd_conta
                    ORDER BY 
                        -- Prioridade 1: Melhor m√©todo
                        CASE metodo_final
                            WHEN 'MATCH_CODIGO_EXATO' THEN 1
                            WHEN 'MATCH_CODIGO_PARCIAL_7' THEN 2
                            WHEN 'MATCH_CODIGO_PARCIAL_5' THEN 3
                            WHEN 'MATCH_CODIGO_PARCIAL_3' THEN 4
                            WHEN 'HERANCA_HIERARQUIA' THEN 5
                            WHEN 'MATCH_KEYWORDS' THEN 6
                            WHEN 'MATCH_HEURISTICAS' THEN 7
                            WHEN 'FALLBACK_NATUREZA' THEN 8
                            ELSE 99
                        END,
                        -- Prioridade 2: Melhor confian√ßa
                        CASE confianca_nivel1
                            WHEN 'MUITO_ALTA' THEN 1
                            WHEN 'ALTA' THEN 2
                            WHEN 'MEDIA' THEN 3
                            WHEN 'BAIXA' THEN 4
                            ELSE 99
                        END,
                        -- Prioridade 3: Maior score de similaridade
                        score_similaridade DESC NULLS LAST,
                        -- Prioridade 4: Mais recente (desempate)
                        dt_referencia DESC
                ) as rn
            FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
            WHERE ano_referencia = {ANO_REFERENCIA}
        )
        SELECT 
            id_ecd, dt_referencia, cnpj, nm_empresarial, cd_uf,
            cd_conta, cd_conta_sup, descr_conta, nivel_conta, 
            tp_conta_agl, tp_conta_pc, origem_demonstrativo,
            ind_grp_bal, ind_grp_dre, cd_natureza,
            ind_dc_cta_ini, ind_dc_cta_fin,
            classificacao_nivel1, confianca_nivel1, metodo_nivel1,
            vl_cta_ini, vl_cta_fin,
            classificacao_nivel2, classificacao_nivel3,
            cd_conta_referencial_matched, score_similaridade, metodo_final,
            usar_para_treino_ml, empresa_equacao_ok, empresa_diff_percentual
        FROM ranked
        WHERE rn = 1
    """)
    
    print("‚úÖ View tempor√°ria criada!")
    
    # ================================================================================
    # VALIDAR DEDUPLICA√á√ÉO
    # ================================================================================
    
    print("\nüîç Validando deduplica√ß√£o...")
    
    total_depois = spark.sql("SELECT COUNT(*) as total FROM temp_deduplicada").collect()[0]['total']
    
    print(f"  Registros ap√≥s deduplica√ß√£o: {total_depois:,}")
    print(f"  Registros removidos: {total_antes - total_depois:,}")
    print(f"  Redu√ß√£o: {((total_antes - total_depois) * 100.0 / total_antes):.2f}%")
    
    # Verificar se n√£o perdemos contas √∫nicas
    if total_depois != unicos_antes:
        print(f"\n‚ö†Ô∏è  ATEN√á√ÉO: Esperava {unicos_antes:,} registros √∫nicos, mas obteve {total_depois:,}")
    else:
        print("\n‚úÖ Valida√ß√£o OK: n√∫mero de registros √∫nicos est√° correto!")
    
    # ================================================================================
    # ESTAT√çSTICAS DA DEDUPLICA√á√ÉO
    # ================================================================================
    
    print("\nüìä Estat√≠sticas ap√≥s deduplica√ß√£o:")
    
    # Cobertura
    cobertura = spark.sql("""
        SELECT 
            COUNT(*) as total,
            SUM(CASE WHEN classificacao_nivel1 IS NOT NULL THEN 1 ELSE 0 END) as nivel1,
            SUM(CASE WHEN classificacao_nivel2 IS NOT NULL THEN 1 ELSE 0 END) as nivel2,
            SUM(CASE WHEN classificacao_nivel3 IS NOT NULL THEN 1 ELSE 0 END) as nivel3,
            ROUND(SUM(CASE WHEN classificacao_nivel1 IS NOT NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_n1,
            ROUND(SUM(CASE WHEN classificacao_nivel2 IS NOT NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_n2,
            ROUND(SUM(CASE WHEN classificacao_nivel3 IS NOT NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_n3
        FROM temp_deduplicada
    """)
    cobertura.show(truncate=False)
    
    # M√©todos
    print("\nüìä Distribui√ß√£o por m√©todo (ap√≥s deduplica√ß√£o):")
    metodos = spark.sql("""
        SELECT 
            COALESCE(metodo_final, 'NAO_CLASSIFICADO') as metodo,
            COUNT(*) as qtd,
            ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as perc
        FROM temp_deduplicada
        GROUP BY metodo_final
        ORDER BY qtd DESC
        LIMIT 10
    """)
    metodos.show(truncate=False)
    
    # Contas para ML
    print("\nüìä Contas para ML (ap√≥s deduplica√ß√£o):")
    ml_stats = spark.sql("""
        SELECT 
            COUNT(*) as total,
            SUM(CASE WHEN usar_para_treino_ml = TRUE THEN 1 ELSE 0 END) as para_ml,
            ROUND(SUM(CASE WHEN usar_para_treino_ml = TRUE THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_ml
        FROM temp_deduplicada
    """)
    ml_stats.show(truncate=False)
    
    # ================================================================================
    # SALVAR TABELA DEDUPLICADA
    # ================================================================================
    
    print("\nüíæ Salvando tabela deduplicada...")
    print("‚è≥ Isso pode levar alguns minutos...")
    
    spark.sql(f"""
        INSERT OVERWRITE TABLE {DATABASE_DESTINO}.ecd_contas_classificadas_final
        PARTITION (ano_referencia = {ANO_REFERENCIA})
        SELECT * FROM temp_deduplicada
    """)
    
    print("‚úÖ Tabela deduplicada salva!")
    
    # ================================================================================
    # VALIDA√á√ÉO FINAL
    # ================================================================================
    
    print("\nüîç Valida√ß√£o final...")
    
    final_count = spark.sql(f"""
        SELECT COUNT(*) as total
        FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
        WHERE ano_referencia = {ANO_REFERENCIA}
    """).collect()[0]['total']
    
    print(f"  Registros na tabela final: {final_count:,}")
    
    # ================================================================================
    # RESUMO
    # ================================================================================
    
    print("\n" + "=" * 80)
    print("üìä RESUMO DA DEDUPLICA√á√ÉO")
    print("=" * 80)
    print(f"  Registros ANTES: {total_antes:,}")
    print(f"  Registros DEPOIS: {final_count:,}")
    print(f"  Duplicatas removidas: {total_antes - final_count:,}")
    print(f"  Redu√ß√£o: {((total_antes - final_count) * 100.0 / total_antes):.2f}%")
    print(f"  Espa√ßo economizado: ~{((total_antes - final_count) / 1000000):.1f} milh√µes de registros")
    print("=" * 80)
    print("‚úÖ DEDUPLICA√á√ÉO CONCLU√çDA!")
    print("=" * 80)

In [None]:
# ================================================================================
# C√âLULA 7: VISUALIZA√á√ïES E AN√ÅLISES DETALHADAS
# ================================================================================

print("\nüìä VISUALIZA√á√ïES E AN√ÅLISES DETALHADAS DOS RESULTADOS...")
print("=" * 80)

import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import pandas as pd

# ================================================================================
# CARREGAR DADOS
# ================================================================================

print("\nüì• Carregando dados para an√°lise...")

df_final = spark.table(f"{DATABASE_DESTINO}.ecd_contas_classificadas_final").filter(
    col("ano_referencia") == ANO_REFERENCIA
)

total_registros = df_final.count()
print(f"‚úÖ Total de registros: {total_registros:,}")

# ================================================================================
# AN√ÅLISE 1: COBERTURA DE CLASSIFICA√á√ÉO POR N√çVEL
# ================================================================================

print("\nüìä AN√ÅLISE 1: Cobertura de Classifica√ß√£o")

cobertura_df = spark.sql(f"""
    SELECT 
        'N√≠vel 1' as nivel,
        COUNT(*) as total,
        SUM(CASE WHEN classificacao_nivel1 IS NOT NULL AND classificacao_nivel1 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) as classificados,
        ROUND(SUM(CASE WHEN classificacao_nivel1 IS NOT NULL AND classificacao_nivel1 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as percentual
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
    
    UNION ALL
    
    SELECT 
        'N√≠vel 2' as nivel,
        COUNT(*) as total,
        SUM(CASE WHEN classificacao_nivel2 IS NOT NULL AND classificacao_nivel2 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) as classificados,
        ROUND(SUM(CASE WHEN classificacao_nivel2 IS NOT NULL AND classificacao_nivel2 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as percentual
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
    
    UNION ALL
    
    SELECT 
        'N√≠vel 3' as nivel,
        COUNT(*) as total,
        SUM(CASE WHEN classificacao_nivel3 IS NOT NULL AND classificacao_nivel3 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) as classificados,
        ROUND(SUM(CASE WHEN classificacao_nivel3 IS NOT NULL AND classificacao_nivel3 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as percentual
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
""").toPandas()

print(cobertura_df)

# Gr√°fico de cobertura
fig_cobertura = go.Figure()
fig_cobertura.add_trace(go.Bar(
    x=cobertura_df['nivel'],
    y=cobertura_df['percentual'],
    text=cobertura_df['percentual'].apply(lambda x: f"{x:.1f}%"),
    textposition='auto',
    marker_color=['#2ecc71', '#3498db', '#e74c3c']
))
fig_cobertura.update_layout(
    title=f"Cobertura de Classifica√ß√£o por N√≠vel - {UF_FILTRO} {ANO_REFERENCIA}",
    xaxis_title="N√≠vel de Classifica√ß√£o",
    yaxis_title="Percentual Classificado (%)",
    yaxis_range=[0, 105],
    height=400
)
fig_cobertura.show()

# ================================================================================
# AN√ÅLISE 2: DISTRIBUI√á√ÉO POR M√âTODO DE CLASSIFICA√á√ÉO
# ================================================================================

print("\nüìä AN√ÅLISE 2: Distribui√ß√£o por M√©todo de Classifica√ß√£o")

metodos_df = spark.sql(f"""
    SELECT 
        COALESCE(metodo_final, 'NAO_CLASSIFICADO') as metodo,
        COUNT(*) as qtd,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentual
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
    GROUP BY metodo_final
    ORDER BY qtd DESC
""").toPandas()

print(metodos_df)

# Gr√°fico de pizza - M√©todos
fig_metodos = go.Figure(data=[go.Pie(
    labels=metodos_df['metodo'],
    values=metodos_df['qtd'],
    hole=.3,
    textinfo='label+percent',
    textposition='auto'
)])
fig_metodos.update_layout(
    title=f"Distribui√ß√£o por M√©todo de Classifica√ß√£o - {UF_FILTRO} {ANO_REFERENCIA}",
    height=500
)
fig_metodos.show()

# ================================================================================
# AN√ÅLISE 3: CONFIAN√áA DA CLASSIFICA√á√ÉO
# ================================================================================

print("\nüìä AN√ÅLISE 3: N√≠veis de Confian√ßa")

confianca_df = spark.sql(f"""
    SELECT 
        COALESCE(confianca_nivel1, 'NAO_DEFINIDA') as confianca,
        COUNT(*) as qtd,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentual
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
    GROUP BY confianca_nivel1
    ORDER BY 
        CASE confianca_nivel1
            WHEN 'MUITO_ALTA' THEN 1
            WHEN 'ALTA' THEN 2
            WHEN 'MEDIA' THEN 3
            WHEN 'BAIXA' THEN 4
            ELSE 5
        END
""").toPandas()

print(confianca_df)

# Gr√°fico de barras - Confian√ßa
fig_confianca = go.Figure()
colors_conf = {'MUITO_ALTA': '#27ae60', 'ALTA': '#2ecc71', 'MEDIA': '#f39c12', 'BAIXA': '#e74c3c', 'NAO_DEFINIDA': '#95a5a6'}
fig_confianca.add_trace(go.Bar(
    x=confianca_df['confianca'],
    y=confianca_df['qtd'],
    text=confianca_df['percentual'].apply(lambda x: f"{x:.1f}%"),
    textposition='auto',
    marker_color=[colors_conf.get(x, '#95a5a6') for x in confianca_df['confianca']]
))
fig_confianca.update_layout(
    title=f"Distribui√ß√£o por N√≠vel de Confian√ßa - {UF_FILTRO} {ANO_REFERENCIA}",
    xaxis_title="N√≠vel de Confian√ßa",
    yaxis_title="Quantidade de Contas",
    height=400
)
fig_confianca.show()

# ================================================================================
# AN√ÅLISE 4: TOP 20 CLASSIFICA√á√ïES N√çVEL 2
# ================================================================================

print("\nüìä AN√ÅLISE 4: Top 20 Classifica√ß√µes (N√≠vel 2)")

top_class_df = spark.sql(f"""
    SELECT 
        COALESCE(classificacao_nivel2, 'NAO_CLASSIFICADO') as classificacao,
        origem_demonstrativo,
        COUNT(*) as qtd,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentual
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
    GROUP BY classificacao_nivel2, origem_demonstrativo
    ORDER BY qtd DESC
    LIMIT 20
""").toPandas()

print(top_class_df)

# Gr√°fico de barras horizontais - Top classifica√ß√µes
fig_top = go.Figure()
for origem in top_class_df['origem_demonstrativo'].unique():
    df_origem = top_class_df[top_class_df['origem_demonstrativo'] == origem]
    fig_top.add_trace(go.Bar(
        name=origem,
        y=df_origem['classificacao'],
        x=df_origem['qtd'],
        orientation='h',
        text=df_origem['percentual'].apply(lambda x: f"{x:.1f}%"),
        textposition='auto'
    ))

fig_top.update_layout(
    title=f"Top 20 Classifica√ß√µes (N√≠vel 2) - {UF_FILTRO} {ANO_REFERENCIA}",
    xaxis_title="Quantidade de Contas",
    yaxis_title="Classifica√ß√£o",
    barmode='stack',
    height=600
)
fig_top.show()

# ================================================================================
# AN√ÅLISE 5: VALIDA√á√ÉO CONT√ÅBIL - EMPRESAS
# ================================================================================

print("\nüìä AN√ÅLISE 5: Valida√ß√£o da Equa√ß√£o Cont√°bil")

validacao_df = spark.sql(f"""
    SELECT 
        CASE WHEN empresa_equacao_ok = TRUE THEN 'Equa√ß√£o OK' ELSE 'Equa√ß√£o com Erro' END as status,
        COUNT(DISTINCT id_ecd) as qtd_empresas,
        ROUND(COUNT(DISTINCT id_ecd) * 100.0 / SUM(COUNT(DISTINCT id_ecd)) OVER (), 2) as percentual
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
        AND empresa_equacao_ok IS NOT NULL
    GROUP BY empresa_equacao_ok
""").toPandas()

print(validacao_df)

# Gr√°fico de pizza - Valida√ß√£o
fig_validacao = go.Figure(data=[go.Pie(
    labels=validacao_df['status'],
    values=validacao_df['qtd_empresas'],
    marker_colors=['#27ae60', '#e74c3c'],
    hole=.4,
    textinfo='label+percent+value',
    textposition='auto'
)])
fig_validacao.update_layout(
    title=f"Valida√ß√£o da Equa√ß√£o Cont√°bil (Toler√¢ncia {TOLERANCIA_EQUACAO}%) - {UF_FILTRO} {ANO_REFERENCIA}",
    height=400
)
fig_validacao.show()

# ================================================================================
# AN√ÅLISE 6: DISTRIBUI√á√ÉO DE ERRO NA EQUA√á√ÉO CONT√ÅBIL
# ================================================================================

print("\nüìä AN√ÅLISE 6: Distribui√ß√£o do Erro na Equa√ß√£o Cont√°bil")

erro_dist_df = spark.sql(f"""
    SELECT 
        empresa_diff_percentual
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
        AND empresa_diff_percentual IS NOT NULL
        AND empresa_diff_percentual < 100  -- Filtrar outliers extremos
    GROUP BY id_ecd, empresa_diff_percentual
""").toPandas()

if len(erro_dist_df) > 0:
    fig_erro_dist = px.histogram(
        erro_dist_df, 
        x='empresa_diff_percentual',
        nbins=50,
        title=f"Distribui√ß√£o do Erro Percentual na Equa√ß√£o Cont√°bil - {UF_FILTRO} {ANO_REFERENCIA}",
        labels={'empresa_diff_percentual': 'Erro Percentual (%)', 'count': 'N√∫mero de Empresas'}
    )
    fig_erro_dist.add_vline(x=TOLERANCIA_EQUACAO, line_dash="dash", line_color="red", 
                            annotation_text=f"Toler√¢ncia {TOLERANCIA_EQUACAO}%")
    fig_erro_dist.update_layout(height=400)
    fig_erro_dist.show()

# ================================================================================
# AN√ÅLISE 7: CONTAS DISPON√çVEIS PARA ML
# ================================================================================

print("\nüìä AN√ÅLISE 7: Contas Dispon√≠veis para Machine Learning")

ml_stats_df = spark.sql(f"""
    SELECT 
        usar_para_treino_ml,
        COUNT(*) as qtd_contas,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentual
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
    GROUP BY usar_para_treino_ml
""").toPandas()

print(ml_stats_df)

# Distribui√ß√£o de contas para ML por classifica√ß√£o
ml_por_class_df = spark.sql(f"""
    SELECT 
        COALESCE(classificacao_nivel2, 'NAO_CLASSIFICADO') as classificacao,
        COUNT(*) as qtd_treino
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
        AND usar_para_treino_ml = TRUE
    GROUP BY classificacao_nivel2
    ORDER BY qtd_treino DESC
    LIMIT 15
""").toPandas()

print("\nTop 15 Classifica√ß√µes para Treino ML:")
print(ml_por_class_df)

# Gr√°fico - Contas para ML
fig_ml = go.Figure()
fig_ml.add_trace(go.Bar(
    x=ml_por_class_df['classificacao'],
    y=ml_por_class_df['qtd_treino'],
    marker_color='#3498db',
    text=ml_por_class_df['qtd_treino'],
    textposition='auto'
))
fig_ml.update_layout(
    title=f"Top 15 Classifica√ß√µes com Mais Contas para Treino ML - {UF_FILTRO} {ANO_REFERENCIA}",
    xaxis_title="Classifica√ß√£o (N√≠vel 2)",
    yaxis_title="Quantidade de Contas",
    height=500,
    xaxis_tickangle=-45
)
fig_ml.show()

# ================================================================================
# AN√ÅLISE 8: QUALIDADE DA CLASSIFICA√á√ÉO - SCORE DE SIMILARIDADE
# ================================================================================

print("\nüìä AN√ÅLISE 8: Distribui√ß√£o do Score de Similaridade")

score_dist_df = spark.sql(f"""
    SELECT 
        score_similaridade,
        metodo_final
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
        AND score_similaridade IS NOT NULL
        AND metodo_final IN ('MATCH_CODIGO_EXATO', 'MATCH_CODIGO_PARCIAL_7', 
                             'MATCH_CODIGO_PARCIAL_5', 'MATCH_CODIGO_PARCIAL_3',
                             'MATCH_KEYWORDS', 'MATCH_SIMILARIDADE')
""").toPandas()

if len(score_dist_df) > 0:
    fig_score = px.box(
        score_dist_df,
        x='metodo_final',
        y='score_similaridade',
        title=f"Distribui√ß√£o do Score de Similaridade por M√©todo - {UF_FILTRO} {ANO_REFERENCIA}",
        labels={'metodo_final': 'M√©todo de Classifica√ß√£o', 'score_similaridade': 'Score de Similaridade'}
    )
    fig_score.update_layout(height=500, xaxis_tickangle=-45)
    fig_score.show()

print("\n" + "=" * 80)
print("‚úÖ AN√ÅLISES E VISUALIZA√á√ïES CONCLU√çDAS!")
print("=" * 80)

In [None]:
# ================================================================================
# C√âLULA 8: CRIAR TABELAS ANAL√çTICAS NO HIVE
# ================================================================================

print("\nüíæ CRIANDO TABELAS ANAL√çTICAS NO HIVE...")
print("=" * 80)

from datetime import datetime

# ================================================================================
# CONFIGURA√á√ÉO
# ================================================================================

print(f"üìä Database: {DATABASE_DESTINO}")
print(f"üìÖ Ano: {ANO_REFERENCIA}")
print(f"üìç UF: {UF_FILTRO}")

# ================================================================================
# TABELA 1: RESUMO EXECUTIVO POR ANO/UF
# ================================================================================

print("\nüìä Criando Tabela 1: neac.ecd_resumo_executivo...")

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {DATABASE_DESTINO}.ecd_resumo_executivo (
        total_contas BIGINT,
        total_empresas BIGINT,
        total_cnpjs BIGINT,
        contas_nivel1 BIGINT,
        contas_nivel2 BIGINT,
        contas_nivel3 BIGINT,
        perc_nivel1 DOUBLE,
        perc_nivel2 DOUBLE,
        perc_nivel3 DOUBLE,
        contas_confianca_muito_alta BIGINT,
        contas_confianca_alta BIGINT,
        contas_confianca_media BIGINT,
        contas_confianca_baixa BIGINT,
        empresas_equacao_ok BIGINT,
        empresas_equacao_nok BIGINT,
        perc_empresas_ok DOUBLE,
        contas_para_ml BIGINT,
        perc_contas_ml DOUBLE
    )
    PARTITIONED BY (ano_referencia INT, uf STRING)
    STORED AS PARQUET
""")

spark.sql(f"""
    INSERT OVERWRITE TABLE {DATABASE_DESTINO}.ecd_resumo_executivo
    PARTITION (ano_referencia = {ANO_REFERENCIA}, uf = '{UF_FILTRO}')
    SELECT 
        COUNT(*) as total_contas,
        COUNT(DISTINCT id_ecd) as total_empresas,
        COUNT(DISTINCT cnpj) as total_cnpjs,
        
        SUM(CASE WHEN classificacao_nivel1 IS NOT NULL AND classificacao_nivel1 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) as contas_nivel1,
        SUM(CASE WHEN classificacao_nivel2 IS NOT NULL AND classificacao_nivel2 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) as contas_nivel2,
        SUM(CASE WHEN classificacao_nivel3 IS NOT NULL AND classificacao_nivel3 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) as contas_nivel3,
        
        ROUND(SUM(CASE WHEN classificacao_nivel1 IS NOT NULL AND classificacao_nivel1 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_nivel1,
        ROUND(SUM(CASE WHEN classificacao_nivel2 IS NOT NULL AND classificacao_nivel2 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_nivel2,
        ROUND(SUM(CASE WHEN classificacao_nivel3 IS NOT NULL AND classificacao_nivel3 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_nivel3,
        
        SUM(CASE WHEN confianca_nivel1 = 'MUITO_ALTA' THEN 1 ELSE 0 END) as contas_confianca_muito_alta,
        SUM(CASE WHEN confianca_nivel1 = 'ALTA' THEN 1 ELSE 0 END) as contas_confianca_alta,
        SUM(CASE WHEN confianca_nivel1 = 'MEDIA' THEN 1 ELSE 0 END) as contas_confianca_media,
        SUM(CASE WHEN confianca_nivel1 = 'BAIXA' THEN 1 ELSE 0 END) as contas_confianca_baixa,
        
        COUNT(DISTINCT CASE WHEN empresa_equacao_ok = TRUE THEN id_ecd END) as empresas_equacao_ok,
        COUNT(DISTINCT CASE WHEN empresa_equacao_ok = FALSE THEN id_ecd END) as empresas_equacao_nok,
        ROUND(COUNT(DISTINCT CASE WHEN empresa_equacao_ok = TRUE THEN id_ecd END) * 100.0 / COUNT(DISTINCT id_ecd), 2) as perc_empresas_ok,
        
        SUM(CASE WHEN usar_para_treino_ml = TRUE THEN 1 ELSE 0 END) as contas_para_ml,
        ROUND(SUM(CASE WHEN usar_para_treino_ml = TRUE THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_contas_ml
        
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
""")

print("‚úÖ Tabela criada: neac.ecd_resumo_executivo")

# Mostrar resultado
spark.sql(f"""
    SELECT * FROM {DATABASE_DESTINO}.ecd_resumo_executivo
    WHERE ano_referencia = {ANO_REFERENCIA} AND uf = '{UF_FILTRO}'
""").show(truncate=False)

# ================================================================================
# TABELA 2: DETALHAMENTO POR M√âTODO
# ================================================================================

print("\nüìä Criando Tabela 2: neac.ecd_detalhamento_metodo...")

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {DATABASE_DESTINO}.ecd_detalhamento_metodo (
        metodo STRING,
        origem_demonstrativo STRING,
        qtd_contas BIGINT,
        qtd_empresas BIGINT,
        percentual_total DOUBLE,
        avg_score DOUBLE
    )
    PARTITIONED BY (ano_referencia INT, uf STRING)
    STORED AS PARQUET
""")

spark.sql(f"""
    INSERT OVERWRITE TABLE {DATABASE_DESTINO}.ecd_detalhamento_metodo
    PARTITION (ano_referencia = {ANO_REFERENCIA}, uf = '{UF_FILTRO}')
    SELECT 
        COALESCE(metodo_final, 'NAO_CLASSIFICADO') as metodo,
        origem_demonstrativo,
        COUNT(*) as qtd_contas,
        COUNT(DISTINCT id_ecd) as qtd_empresas,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentual_total,
        ROUND(AVG(CASE WHEN score_similaridade IS NOT NULL THEN score_similaridade END), 3) as avg_score
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
    GROUP BY metodo_final, origem_demonstrativo
""")

print("‚úÖ Tabela criada: neac.ecd_detalhamento_metodo")

spark.sql(f"""
    SELECT * FROM {DATABASE_DESTINO}.ecd_detalhamento_metodo
    WHERE ano_referencia = {ANO_REFERENCIA} AND uf = '{UF_FILTRO}'
    ORDER BY qtd_contas DESC
    LIMIT 20
""").show(truncate=False)

# ================================================================================
# TABELA 3: TOP CLASSIFICA√á√ïES
# ================================================================================

print("\nüìä Criando Tabela 3: neac.ecd_top_classificacoes...")

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {DATABASE_DESTINO}.ecd_top_classificacoes (
        classificacao_nivel1 STRING,
        classificacao_nivel2 STRING,
        classificacao_nivel3 STRING,
        origem_demonstrativo STRING,
        qtd_contas BIGINT,
        qtd_empresas BIGINT,
        percentual DOUBLE,
        media_saldo_final DOUBLE
    )
    PARTITIONED BY (ano_referencia INT, uf STRING)
    STORED AS PARQUET
""")

spark.sql(f"""
    INSERT OVERWRITE TABLE {DATABASE_DESTINO}.ecd_top_classificacoes
    PARTITION (ano_referencia = {ANO_REFERENCIA}, uf = '{UF_FILTRO}')
    SELECT 
        classificacao_nivel1,
        classificacao_nivel2,
        classificacao_nivel3,
        origem_demonstrativo,
        COUNT(*) as qtd_contas,
        COUNT(DISTINCT id_ecd) as qtd_empresas,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentual,
        ROUND(AVG(vl_cta_fin), 2) as media_saldo_final
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
        AND classificacao_nivel2 IS NOT NULL
        AND classificacao_nivel2 != 'NAO_CLASSIFICADO'
    GROUP BY classificacao_nivel1, classificacao_nivel2, classificacao_nivel3, origem_demonstrativo
""")

print("‚úÖ Tabela criada: neac.ecd_top_classificacoes")

# ================================================================================
# TABELA 4: EMPRESAS COM EQUA√á√ÉO OK
# ================================================================================

print("\nüìä Criando Tabela 4: neac.ecd_empresas_equacao_ok...")

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {DATABASE_DESTINO}.ecd_empresas_equacao_ok (
        id_ecd BIGINT,
        cnpj STRING,
        nm_empresarial STRING,
        erro_percentual DOUBLE,
        total_contas BIGINT,
        contas_para_ml BIGINT
    )
    PARTITIONED BY (ano_referencia INT, uf STRING)
    STORED AS PARQUET
""")

spark.sql(f"""
    INSERT OVERWRITE TABLE {DATABASE_DESTINO}.ecd_empresas_equacao_ok
    PARTITION (ano_referencia = {ANO_REFERENCIA}, uf = '{UF_FILTRO}')
    SELECT DISTINCT
        id_ecd,
        cnpj,
        nm_empresarial,
        empresa_diff_percentual as erro_percentual,
        COUNT(*) OVER (PARTITION BY id_ecd) as total_contas,
        SUM(CASE WHEN usar_para_treino_ml = TRUE THEN 1 ELSE 0 END) OVER (PARTITION BY id_ecd) as contas_para_ml
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
        AND empresa_equacao_ok = TRUE
""")

print("‚úÖ Tabela criada: neac.ecd_empresas_equacao_ok")

# ================================================================================
# TABELA 5: CONTAS PARA ML (Amostra Balanceada)
# ================================================================================

print("\nüìä Criando Tabela 5: neac.ecd_amostra_ml...")

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {DATABASE_DESTINO}.ecd_amostra_ml (
        id_ecd BIGINT,
        cnpj STRING,
        cd_conta STRING,
        descr_conta STRING,
        nivel_conta INT,
        origem_demonstrativo STRING,
        classificacao_nivel1 STRING,
        classificacao_nivel2 STRING,
        classificacao_nivel3 STRING,
        confianca_nivel1 STRING,
        metodo_final STRING,
        score_similaridade DOUBLE,
        vl_cta_ini DOUBLE,
        vl_cta_fin DOUBLE
    )
    PARTITIONED BY (ano_referencia INT, uf STRING)
    STORED AS PARQUET
""")

spark.sql(f"""
    INSERT OVERWRITE TABLE {DATABASE_DESTINO}.ecd_amostra_ml
    PARTITION (ano_referencia = {ANO_REFERENCIA}, uf = '{UF_FILTRO}')
    WITH ranked AS (
        SELECT 
            *,
            ROW_NUMBER() OVER (
                PARTITION BY classificacao_nivel2 
                ORDER BY RAND()
            ) as rn
        FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
        WHERE ano_referencia = {ANO_REFERENCIA}
            AND usar_para_treino_ml = TRUE
            AND classificacao_nivel2 IS NOT NULL
    )
    SELECT 
        id_ecd, cnpj, cd_conta, descr_conta, nivel_conta,
        origem_demonstrativo, classificacao_nivel1, classificacao_nivel2, classificacao_nivel3,
        confianca_nivel1, metodo_final, score_similaridade,
        vl_cta_ini, vl_cta_fin
    FROM ranked
    WHERE rn <= 100
""")

print("‚úÖ Tabela criada: neac.ecd_amostra_ml")

# ================================================================================
# TABELA 6: CONTAS N√ÉO CLASSIFICADAS
# ================================================================================

print("\nüìä Criando Tabela 6: neac.ecd_contas_nao_classificadas...")

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {DATABASE_DESTINO}.ecd_contas_nao_classificadas (
        id_ecd BIGINT,
        cnpj STRING,
        nm_empresarial STRING,
        cd_conta STRING,
        descr_conta STRING,
        nivel_conta INT,
        origem_demonstrativo STRING,
        ind_grp_bal STRING,
        ind_grp_dre STRING,
        cd_natureza STRING,
        vl_cta_fin DOUBLE,
        metodo_nivel1 STRING,
        cd_conta_sup STRING
    )
    PARTITIONED BY (ano_referencia INT, uf STRING)
    STORED AS PARQUET
""")

spark.sql(f"""
    INSERT OVERWRITE TABLE {DATABASE_DESTINO}.ecd_contas_nao_classificadas
    PARTITION (ano_referencia = {ANO_REFERENCIA}, uf = '{UF_FILTRO}')
    SELECT 
        id_ecd, cnpj, nm_empresarial, cd_conta, descr_conta, nivel_conta,
        origem_demonstrativo, ind_grp_bal, ind_grp_dre, cd_natureza,
        vl_cta_fin, metodo_nivel1, cd_conta_sup
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
        AND (classificacao_nivel2 IS NULL OR classificacao_nivel2 = 'NAO_CLASSIFICADO')
""")

print("‚úÖ Tabela criada: neac.ecd_contas_nao_classificadas")

# ================================================================================
# TABELA 7: ESTAT√çSTICAS POR EMPRESA
# ================================================================================

print("\nüìä Criando Tabela 7: neac.ecd_stats_por_empresa...")

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {DATABASE_DESTINO}.ecd_stats_por_empresa (
        id_ecd BIGINT,
        cnpj STRING,
        nm_empresarial STRING,
        total_contas BIGINT,
        contas_bp BIGINT,
        contas_dre BIGINT,
        contas_classificadas BIGINT,
        perc_classificadas DOUBLE,
        contas_alta_confianca BIGINT,
        contas_treino_ml BIGINT,
        equacao_ok BOOLEAN,
        erro_percentual DOUBLE
    )
    PARTITIONED BY (ano_referencia INT, uf STRING)
    STORED AS PARQUET
""")

spark.sql(f"""
    INSERT OVERWRITE TABLE {DATABASE_DESTINO}.ecd_stats_por_empresa
    PARTITION (ano_referencia = {ANO_REFERENCIA}, uf = '{UF_FILTRO}')
    SELECT 
        id_ecd,
        cnpj,
        nm_empresarial,
        COUNT(*) as total_contas,
        SUM(CASE WHEN origem_demonstrativo = 'BP' THEN 1 ELSE 0 END) as contas_bp,
        SUM(CASE WHEN origem_demonstrativo = 'DRE' THEN 1 ELSE 0 END) as contas_dre,
        SUM(CASE WHEN classificacao_nivel2 IS NOT NULL AND classificacao_nivel2 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) as contas_classificadas,
        ROUND(SUM(CASE WHEN classificacao_nivel2 IS NOT NULL AND classificacao_nivel2 != 'NAO_CLASSIFICADO' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as perc_classificadas,
        SUM(CASE WHEN confianca_nivel1 IN ('MUITO_ALTA', 'ALTA') THEN 1 ELSE 0 END) as contas_alta_confianca,
        SUM(CASE WHEN usar_para_treino_ml = TRUE THEN 1 ELSE 0 END) as contas_treino_ml,
        MAX(empresa_equacao_ok) as equacao_ok,
        MAX(empresa_diff_percentual) as erro_percentual
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
    GROUP BY id_ecd, cnpj, nm_empresarial
""")

print("‚úÖ Tabela criada: neac.ecd_stats_por_empresa")

# ================================================================================
# RESUMO DAS TABELAS CRIADAS
# ================================================================================

print("\n" + "=" * 80)
print("üìÅ TABELAS ANAL√çTICAS CRIADAS NO HIVE:")
print("=" * 80)

tabelas_criadas = [
    ("neac.ecd_resumo_executivo", "Resumo executivo geral"),
    ("neac.ecd_detalhamento_metodo", "Detalhamento por m√©todo de classifica√ß√£o"),
    ("neac.ecd_top_classificacoes", "Top classifica√ß√µes mais frequentes"),
    ("neac.ecd_empresas_equacao_ok", "Empresas com equa√ß√£o cont√°bil OK"),
    ("neac.ecd_amostra_ml", "Amostra balanceada para ML"),
    ("neac.ecd_contas_nao_classificadas", "Contas n√£o classificadas"),
    ("neac.ecd_stats_por_empresa", "Estat√≠sticas detalhadas por empresa")
]

for i, (tabela, descricao) in enumerate(tabelas_criadas, 1):
    print(f"{i}. {tabela}")
    print(f"   ‚îî‚îÄ‚îÄ {descricao}")

print("\n" + "=" * 80)
print("‚úÖ TODAS AS TABELAS ANAL√çTICAS FORAM CRIADAS COM SUCESSO!")
print("=" * 80)
print(f"\nüìä Particionadas por: ano_referencia = {ANO_REFERENCIA}, uf = '{UF_FILTRO}'")
print(f"üíæ Database: {DATABASE_DESTINO}")
print("\nüéØ Use estas tabelas para:")
print("  - An√°lises r√°pidas e dashboards")
print("  - Monitoramento da qualidade da classifica√ß√£o")
print("  - Identifica√ß√£o de empresas priorit√°rias")
print("  - Prepara√ß√£o de datasets para ML")
print("=" * 80)

In [None]:
# ================================================================================
# C√âLULA 9: PREPARA√á√ÉO DE FEATURES PARA MACHINE LEARNING
# ================================================================================

print("\nü§ñ PREPARA√á√ÉO DE FEATURES PARA MACHINE LEARNING...")
print("=" * 80)

from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, length, regexp_extract, when, lit, split, abs as spark_abs
import pandas as pd

# ================================================================================
# CARREGAR DADOS DE TREINO
# ================================================================================

print("\nüì• Carregando dados para treino...")

df_treino = spark.sql(f"""
    SELECT 
        id_ecd, cnpj, cd_conta, descr_conta, nivel_conta, cd_conta_sup,
        origem_demonstrativo, ind_grp_bal, ind_grp_dre, cd_natureza,
        tp_conta_agl, tp_conta_pc,
        classificacao_nivel1, classificacao_nivel2, classificacao_nivel3,
        vl_cta_ini, vl_cta_fin
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
        AND usar_para_treino_ml = TRUE
        AND classificacao_nivel2 IS NOT NULL
        AND classificacao_nivel2 != 'NAO_CLASSIFICADO'
""")

total_treino = df_treino.count()
print(f"‚úÖ Registros para treino: {total_treino:,}")

# Distribui√ß√£o de classes
print("\nüìä Distribui√ß√£o de classes (top 20):")
df_treino.groupBy("classificacao_nivel2").count().orderBy(col("count").desc()).show(20)

# ================================================================================
# FEATURE ENGINEERING
# ================================================================================

print("\nüîß Criando features...")

df_features = df_treino \
    .withColumn("tamanho_descricao", length(col("descr_conta"))) \
    .withColumn("tamanho_codigo", length(col("cd_conta"))) \
    .withColumn("primeiro_digito_codigo", regexp_extract(col("cd_conta"), r"^(\d)", 1)) \
    .withColumn("tem_ponto_codigo", when(col("cd_conta").contains("."), lit(1)).otherwise(lit(0))) \
    .withColumn("tem_hifen_codigo", when(col("cd_conta").contains("-"), lit(1)).otherwise(lit(0))) \
    .withColumn("tem_underscore_codigo", when(col("cd_conta").contains("_"), lit(1)).otherwise(lit(0))) \
    .withColumn("valor_absoluto_final", when(col("vl_cta_fin").isNull(), lit(0.0)).otherwise(abs(col("vl_cta_fin")))) \
    .withColumn("valor_absoluto_inicial", when(col("vl_cta_ini").isNull(), lit(0.0)).otherwise(abs(col("vl_cta_ini")))) \
    .withColumn("variacao_valor", col("valor_absoluto_final") - col("valor_absoluto_inicial")) \
    .withColumn("tem_conta_superior", when(col("cd_conta_sup").isNotNull(), lit(1)).otherwise(lit(0)))

# Tratar nulos em campos categ√≥ricos
df_features = df_features \
    .fillna({
        'ind_grp_bal': 'DESCONHECIDO',
        'ind_grp_dre': 'DESCONHECIDO',
        'cd_natureza': '00',
        'tp_conta_agl': 'DESCONHECIDO',
        'tp_conta_pc': 'DESCONHECIDO',
        'primeiro_digito_codigo': '0'
    })

# IMPORTANTE: Substituir strings vazias por 'DESCONHECIDO'
for col_name in ['ind_grp_bal', 'ind_grp_dre', 'tp_conta_agl', 'tp_conta_pc', 
                 'cd_natureza', 'primeiro_digito_codigo', 'classificacao_nivel1']:
    df_features = df_features.withColumn(
        col_name,
        when((col(col_name).isNull()) | (col(col_name) == ''), lit('DESCONHECIDO'))
        .otherwise(col(col_name))
    )

print("‚úÖ Features criadas!")

# ================================================================================
# CRIAR FEATURES TEXTUAIS (Palavras-chave)
# ================================================================================

print("\nüìù Criando features textuais (presen√ßa de palavras-chave)...")

# Palavras-chave importantes
keywords_features = {
    'tem_palavra_caixa': ['caixa', 'cash'],
    'tem_palavra_banco': ['banco', 'bank'],
    'tem_palavra_estoque': ['estoque', 'mercadoria', 'produto'],
    'tem_palavra_cliente': ['cliente', 'duplicata', 'receber'],
    'tem_palavra_fornecedor': ['fornecedor', 'fornecimento'],
    'tem_palavra_salario': ['salario', 'folha', 'ferias'],
    'tem_palavra_tributo': ['tributo', 'imposto', 'icms', 'pis', 'cofins'],
    'tem_palavra_receita': ['receita', 'venda', 'faturamento'],
    'tem_palavra_despesa': ['despesa', 'custo', 'gasto'],
    'tem_palavra_financeiro': ['juros', 'financeiro', 'emprestimo'],
    'tem_palavra_imobilizado': ['imovel', 'veiculo', 'maquina', 'equipamento'],
    'tem_palavra_capital': ['capital', 'social'],
    'tem_palavra_lucro': ['lucro', 'prejuizo', 'resultado']
}

for feature_name, keywords in keywords_features.items():
    condicao = lit(False)
    for keyword in keywords:
        condicao = condicao | col("descr_conta").contains(keyword)
    df_features = df_features.withColumn(feature_name, when(condicao, lit(1)).otherwise(lit(0)))

print("‚úÖ Features textuais criadas!")

# ================================================================================
# ENCODING DE VARI√ÅVEIS CATEG√ìRICAS
# ================================================================================

print("\nüî¢ Aplicando encoding de vari√°veis categ√≥ricas...")

# Campos categ√≥ricos para encoding
categorical_cols = [
    'origem_demonstrativo',
    'ind_grp_bal',
    'ind_grp_dre',
    'cd_natureza',
    'tp_conta_agl',
    'tp_conta_pc',
    'primeiro_digito_codigo',
    'classificacao_nivel1'
]

# Criar indexers
indexers = [
    StringIndexer(inputCol=col_name, outputCol=f"{col_name}_index", handleInvalid="keep")
    for col_name in categorical_cols
]

# Criar encoders (opcional - para algoritmos que se beneficiam disso)
encoders = [
    OneHotEncoder(inputCol=f"{col_name}_index", outputCol=f"{col_name}_encoded")
    for col_name in categorical_cols
]

# Pipeline de transforma√ß√£o
pipeline_encoding = Pipeline(stages=indexers + encoders)

print("Aplicando pipeline de encoding...")
model_encoding = pipeline_encoding.fit(df_features)
df_encoded = model_encoding.transform(df_features)

print("‚úÖ Encoding aplicado!")

# ================================================================================
# SELECIONAR FEATURES FINAIS
# ================================================================================

print("\nüìã Selecionando features finais...")

# Features num√©ricas
numeric_features = [
    'nivel_conta',
    'tamanho_descricao',
    'tamanho_codigo',
    'tem_ponto_codigo',
    'tem_hifen_codigo',
    'tem_underscore_codigo',
    'valor_absoluto_final',
    'valor_absoluto_inicial',
    'variacao_valor',
    'tem_conta_superior'
] + list(keywords_features.keys())

# Features categ√≥ricas (encoded)
categorical_encoded_features = [f"{col_name}_encoded" for col_name in categorical_cols]

# Todas as features
all_features = numeric_features + categorical_encoded_features

print(f"Total de features: {len(all_features)}")
print(f"  - Num√©ricas: {len(numeric_features)}")
print(f"  - Categ√≥ricas (encoded): {len(categorical_encoded_features)}")

# ================================================================================
# CRIAR VETOR DE FEATURES
# ================================================================================

print("\nüîó Criando vetor de features...")

assembler = VectorAssembler(
    inputCols=all_features,
    outputCol="features",
    handleInvalid="keep"
)

df_assembled = assembler.transform(df_encoded)

print("‚úÖ Vetor de features criado!")

# ================================================================================
# CRIAR LABEL (TARGET)
# ================================================================================

print("\nüéØ Criando label (target)...")

# Indexar a classifica√ß√£o_nivel2 como label
label_indexer = StringIndexer(
    inputCol="classificacao_nivel2",
    outputCol="label",
    handleInvalid="keep"
)

df_final_ml = label_indexer.fit(df_assembled).transform(df_assembled)

print("‚úÖ Label criada!")

# Verificar distribui√ß√£o de labels
print("\nüìä Distribui√ß√£o de labels (top 20):")
df_final_ml.groupBy("label", "classificacao_nivel2").count().orderBy("label").show(20)

# ================================================================================
# SALVAR DATASET PREPARADO
# ================================================================================

print("\nüíæ Salvando dataset preparado para ML...")

# Selecionar apenas colunas necess√°rias
df_ml_final = df_final_ml.select(
    "id_ecd", "cnpj", "cd_conta", "descr_conta",
    "classificacao_nivel2", "label", "features",
    *numeric_features,
    *[f"{col_name}_index" for col_name in categorical_cols]
)

# Salvar como tabela Hive
print(f"üíæ Salvando como tabela: {DATABASE_DESTINO}.ecd_ml_dataset...")

# Dropar tabela se existir (para evitar conflito de schema)
spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.ecd_ml_dataset")

# Criar tabela diretamente com CTAS (Create Table As Select)
df_ml_final.write.mode("overwrite").saveAsTable(f"{DATABASE_DESTINO}.ecd_ml_dataset")

print(f"‚úÖ Dataset salvo: {DATABASE_DESTINO}.ecd_ml_dataset")

# ================================================================================
# ESTAT√çSTICAS FINAIS
# ================================================================================

print("\nüìä ESTAT√çSTICAS DO DATASET PARA ML:")
print("-" * 80)

# Criar view tempor√°ria ANTES de usar no SQL
df_ml_final.createOrReplaceTempView("temp_stats_ml")

# Agora executar a query
stats = spark.sql("""
    SELECT 
        COUNT(*) as total_registros,
        COUNT(DISTINCT classificacao_nivel2) as total_classes,
        ROUND(AVG(nivel_conta), 2) as avg_nivel_conta,
        ROUND(AVG(tamanho_descricao), 2) as avg_tamanho_descricao,
        ROUND(AVG(valor_absoluto_final), 2) as avg_valor_final
    FROM temp_stats_ml
""")

stats.show(truncate=False)

# Mostrar mais estat√≠sticas
print("\nüìä Total e classes:")
spark.sql("SELECT COUNT(*) as total, COUNT(DISTINCT label) as total_classes FROM temp_stats_ml").show()

print("\nüìã Amostra do dataset preparado:")
df_ml_final.select("cd_conta", "descr_conta", "classificacao_nivel2", "label").show(10, truncate=50)

# ================================================================================
# SPLIT TREINO/TESTE
# ================================================================================

print("\n‚úÇÔ∏è  Criando split treino/valida√ß√£o/teste...")

# Split: 70% treino, 15% valida√ß√£o, 15% teste
train_data, temp_data = df_ml_final.randomSplit([0.7, 0.3], seed=42)
val_data, test_data = temp_data.randomSplit([0.5, 0.5], seed=42)

print(f"üìä Treino: {train_data.count():,} registros")
print(f"üìä Valida√ß√£o: {val_data.count():,} registros")
print(f"üìä Teste: {test_data.count():,} registros")

# Salvar splits como tabelas Hive
print("\nüíæ Salvando splits em tabelas Hive...")

# Dropar tabelas se existirem
spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.ecd_ml_train")
spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.ecd_ml_val")
spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.ecd_ml_test")

# Salvar diretamente
train_data.write.mode("overwrite").saveAsTable(f"{DATABASE_DESTINO}.ecd_ml_train")
val_data.write.mode("overwrite").saveAsTable(f"{DATABASE_DESTINO}.ecd_ml_val")
test_data.write.mode("overwrite").saveAsTable(f"{DATABASE_DESTINO}.ecd_ml_test")

print(f"‚úÖ Train salvo: {DATABASE_DESTINO}.ecd_ml_train")
print(f"‚úÖ Val salvo: {DATABASE_DESTINO}.ecd_ml_val")
print(f"‚úÖ Test salvo: {DATABASE_DESTINO}.ecd_ml_test")

# ================================================================================
# EXPORTAR MAPEAMENTO DE LABELS
# ================================================================================

print("\nüìÑ Salvando mapeamento de labels...")

# Criar mapeamento label -> classificacao_nivel2
label_mapping = df_final_ml.select("label", "classificacao_nivel2") \
    .distinct() \
    .orderBy("label") \
    .toPandas()

print("\nüìã Mapeamento de labels:")
print(label_mapping)

# Salvar como tabela Hive (mais confi√°vel que arquivo local)
df_label_mapping = spark.createDataFrame(label_mapping)
df_label_mapping.createOrReplaceTempView("temp_label_mapping")

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {DATABASE_DESTINO}.ml_label_mapping (
        label DOUBLE,
        classificacao_nivel2 STRING
    )
    STORED AS PARQUET
""")

spark.sql(f"""
    INSERT OVERWRITE TABLE {DATABASE_DESTINO}.ml_label_mapping
    SELECT * FROM temp_label_mapping
""")

print(f"‚úÖ Mapeamento salvo na tabela: {DATABASE_DESTINO}.ml_label_mapping")

print("\n" + "=" * 80)
print("‚úÖ PREPARA√á√ÉO DE FEATURES PARA ML CONCLU√çDA!")
print("=" * 80)
print("\nüéØ Pr√≥ximos passos:")
print("  1. Treinar modelos (Random Forest, XGBoost, etc.)")
print("  2. Avaliar performance nos dados de valida√ß√£o")
print("  3. Fazer predi√ß√µes nas contas n√£o classificadas")
print("  4. Refinar features e retreinar se necess√°rio")
print("=" * 80)

In [None]:
# ================================================================================
# C√âLULA 10: TREINAR MODELOS DE MACHINE LEARNING
# ================================================================================

print("\nü§ñ TREINAMENTO DE MODELOS DE MACHINE LEARNING...")
print("=" * 80)

from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
import time

# ================================================================================
# CARREGAR DADOS DE TREINO E VALIDA√á√ÉO
# ================================================================================

print("\nüì• Carregando datasets...")

try:
    df_train = spark.table(f"{DATABASE_DESTINO}.ecd_ml_train")
    df_val = spark.table(f"{DATABASE_DESTINO}.ecd_ml_val")
    df_test = spark.table(f"{DATABASE_DESTINO}.ecd_ml_test")
    
    print(f"‚úÖ Treino: {df_train.count():,} registros")
    print(f"‚úÖ Valida√ß√£o: {df_val.count():,} registros")
    print(f"‚úÖ Teste: {df_test.count():,} registros")
except Exception as e:
    print(f"‚ùå ERRO ao carregar datasets: {e}")
    print("‚ö†Ô∏è  Execute a C√âLULA 9 primeiro para preparar os dados!")
    raise

# Verificar distribui√ß√£o de classes
print("\nüìä Distribui√ß√£o de classes no dataset de treino (Top 20):")
df_train.groupBy("label", "classificacao_nivel2").count().orderBy("count", ascending=False).show(20)

# ================================================================================
# MODELO 1: RANDOM FOREST
# ================================================================================

print("\nüå≤ MODELO 1: RANDOM FOREST CLASSIFIER")
print("-" * 80)

print("Configurando Random Forest...")
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    predictionCol="rf_prediction",
    probabilityCol="rf_probability",
    rawPredictionCol="rf_raw_prediction",
    # Hiperpar√¢metros otimizados
    numTrees=100,              # N√∫mero de √°rvores
    maxDepth=10,               # Profundidade m√°xima
    maxBins=32,                # Bins para features categ√≥ricas
    minInstancesPerNode=10,    # M√≠nimo de inst√¢ncias por n√≥
    subsamplingRate=0.8,       # Taxa de subamostragem
    featureSubsetStrategy="sqrt",  # Estrat√©gia de sele√ß√£o de features
    seed=42
)

print("Treinando Random Forest...")
print("‚è≥ Isso pode levar 5-15 minutos dependendo do volume de dados...")
inicio_rf = time.time()

rf_model = rf.fit(df_train)

tempo_rf = time.time() - inicio_rf
print(f"‚úÖ Random Forest treinado em {tempo_rf:.2f} segundos ({tempo_rf/60:.2f} minutos)")

# Feature Importance
print("\nüìä Feature Importance (Random Forest) - Top 20:")
feature_importance_rf = list(zip(
    range(len(rf_model.featureImportances)),
    rf_model.featureImportances.toArray()
))
feature_importance_rf.sort(key=lambda x: x[1], reverse=True)

for idx, importance in feature_importance_rf[:20]:
    print(f"  Feature {idx}: {importance:.6f}")

# Fazer predi√ß√µes no conjunto de valida√ß√£o
print("\nüîÆ Fazendo predi√ß√µes no conjunto de valida√ß√£o...")
df_val_rf = rf_model.transform(df_val)

# ================================================================================
# AVALIA√á√ÉO R√ÅPIDA - RANDOM FOREST
# ================================================================================

print("\nüìä AVALIA√á√ÉO R√ÅPIDA - RANDOM FOREST:")
print("-" * 80)

evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="rf_prediction",
    metricName="accuracy"
)

evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="rf_prediction",
    metricName="f1"
)

evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="rf_prediction",
    metricName="weightedPrecision"
)

evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="rf_prediction",
    metricName="weightedRecall"
)

accuracy_rf = evaluator_accuracy.evaluate(df_val_rf)
f1_rf = evaluator_f1.evaluate(df_val_rf)
precision_rf = evaluator_precision.evaluate(df_val_rf)
recall_rf = evaluator_recall.evaluate(df_val_rf)

print(f"  Accuracy:  {accuracy_rf:.4f} ({accuracy_rf*100:.2f}%)")
print(f"  F1-Score:  {f1_rf:.4f}")
print(f"  Precision: {precision_rf:.4f}")
print(f"  Recall:    {recall_rf:.4f}")

# ================================================================================
# MODELO 2: ALTERNATIVA - LOGISTIC REGRESSION (MULTICLASSE)
# ================================================================================

print("\nüìä MODELO 2: LOGISTIC REGRESSION (One-vs-Rest)")
print("-" * 80)
print("‚ÑπÔ∏è  GBT nativo do Spark ML s√≥ suporta classifica√ß√£o bin√°ria.")
print("‚ÑπÔ∏è  Usando Logistic Regression com estrat√©gia One-vs-Rest para multiclasse.")

from pyspark.ml.classification import LogisticRegression, OneVsRest

print("\nConfigurando Logistic Regression...")
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.01,
    elasticNetParam=0.0,
    family="multinomial"  # Suporta multiclasse diretamente
)

print("Treinando Logistic Regression...")
print("‚è≥ Isso pode levar 5-10 minutos...")
inicio_lr = time.time()

lr_model = lr.fit(df_train)

tempo_lr = time.time() - inicio_lr
print(f"‚úÖ Logistic Regression treinado em {tempo_lr:.2f} segundos ({tempo_lr/60:.2f} minutos)")

# Fazer predi√ß√µes no conjunto de valida√ß√£o
print("\nüîÆ Fazendo predi√ß√µes no conjunto de valida√ß√£o...")
df_val_lr = lr_model.transform(df_val)

# ================================================================================
# AVALIA√á√ÉO R√ÅPIDA - LOGISTIC REGRESSION
# ================================================================================

print("\nüìä AVALIA√á√ÉO R√ÅPIDA - LOGISTIC REGRESSION:")
print("-" * 80)

evaluator_accuracy_lr = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)

evaluator_f1_lr = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1"
)

accuracy_lr = evaluator_accuracy_lr.evaluate(df_val_lr)
f1_lr = evaluator_f1_lr.evaluate(df_val_lr)

print(f"  Accuracy:  {accuracy_lr:.4f} ({accuracy_lr*100:.2f}%)")
print(f"  F1-Score:  {f1_lr:.4f}")

# ================================================================================
# SALVAR MODELOS EM HDFS
# ================================================================================

print("\nüíæ Salvando modelos treinados...")

# Criar diret√≥rio de modelos se n√£o existir
try:
    # Salvar Random Forest
    rf_path = f"/user/{spark.sparkContext.sparkUser()}/models/rf_{UF_FILTRO}_{ANO_REFERENCIA}"
    rf_model.write().overwrite().save(rf_path)
    print(f"‚úÖ Random Forest salvo: {rf_path}")
except Exception as e:
    print(f"‚ö†Ô∏è  Aviso ao salvar Random Forest: {e}")
    print("   Modelo permanece em mem√≥ria para uso nesta sess√£o")

try:
    # Salvar Logistic Regression
    lr_path = f"/user/{spark.sparkContext.sparkUser()}/models/lr_{UF_FILTRO}_{ANO_REFERENCIA}"
    lr_model.write().overwrite().save(lr_path)
    print(f"‚úÖ Logistic Regression salvo: {lr_path}")
except Exception as e:
    print(f"‚ö†Ô∏è  Aviso ao salvar Logistic Regression: {e}")
    print("   Modelo permanece em mem√≥ria para uso nesta sess√£o")

# ================================================================================
# SALVAR PREDI√á√ïES EM TABELAS HIVE
# ================================================================================

print("\nüíæ Salvando predi√ß√µes do conjunto de valida√ß√£o em tabelas Hive...")

# Preparar predi√ß√µes RF
df_val_rf_final = df_val_rf.select(
    "id_ecd", "cnpj", "cd_conta", "descr_conta", 
    "classificacao_nivel2", "label", "rf_prediction"
)

# Salvar RF
spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.ecd_ml_predictions_rf_val")
df_val_rf_final.write.mode("overwrite").saveAsTable(f"{DATABASE_DESTINO}.ecd_ml_predictions_rf_val")
print(f"‚úÖ Predi√ß√µes RF salvas: {DATABASE_DESTINO}.ecd_ml_predictions_rf_val")

# Preparar predi√ß√µes LR
df_val_lr_final = df_val_lr.select(
    "id_ecd", "cnpj", "cd_conta", "descr_conta", 
    "classificacao_nivel2", "label", 
    col("prediction").alias("lr_prediction")
)

# Salvar LR
spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.ecd_ml_predictions_lr_val")
df_val_lr_final.write.mode("overwrite").saveAsTable(f"{DATABASE_DESTINO}.ecd_ml_predictions_lr_val")
print(f"‚úÖ Predi√ß√µes LR salvas: {DATABASE_DESTINO}.ecd_ml_predictions_lr_val")

# ================================================================================
# SALVAR M√âTRICAS EM TABELA HIVE
# ================================================================================

print("\nüíæ Salvando m√©tricas dos modelos...")

# Criar DataFrame com m√©tricas
from pyspark.sql import Row

metricas_data = [
    Row(
        modelo="RandomForest",
        accuracy=float(accuracy_rf),
        f1_score=float(f1_rf),
        precision=float(precision_rf),
        recall=float(recall_rf),
        tempo_treino_segundos=float(tempo_rf),
        num_trees=rf.getNumTrees(),
        max_depth=rf.getMaxDepth(),
        ano_referencia=ANO_REFERENCIA,
        uf=UF_FILTRO
    ),
    Row(
        modelo="LogisticRegression",
        accuracy=float(accuracy_lr),
        f1_score=float(f1_lr),
        precision=None,
        recall=None,
        tempo_treino_segundos=float(tempo_lr),
        num_trees=None,
        max_depth=None,
        ano_referencia=ANO_REFERENCIA,
        uf=UF_FILTRO
    )
]

df_metricas = spark.createDataFrame(metricas_data)

# Salvar m√©tricas
spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.ecd_ml_metricas")
df_metricas.write.mode("overwrite").saveAsTable(f"{DATABASE_DESTINO}.ecd_ml_metricas")
print(f"‚úÖ M√©tricas salvas: {DATABASE_DESTINO}.ecd_ml_metricas")

# ================================================================================
# ESTAT√çSTICAS FINAIS
# ================================================================================

print("\n" + "=" * 80)
print("üìä RESUMO DO TREINAMENTO")
print("=" * 80)

print(f"\nüå≤ Random Forest:")
print(f"  - √Årvores: {rf.getNumTrees()}")
print(f"  - Profundidade m√°xima: {rf.getMaxDepth()}")
print(f"  - Tempo de treino: {tempo_rf/60:.2f} minutos")
print(f"  - Accuracy: {accuracy_rf*100:.2f}%")
print(f"  - F1-Score: {f1_rf:.4f}")

print(f"\nüìä Logistic Regression:")
print(f"  - Max Iterations: {lr.getMaxIter()}")
print(f"  - Regularization: {lr.getRegParam()}")
print(f"  - Tempo de treino: {tempo_lr/60:.2f} minutos")
print(f"  - Accuracy: {accuracy_lr*100:.2f}%")
print(f"  - F1-Score: {f1_lr:.4f}")

print(f"\n‚è±Ô∏è  Tempo total: {(tempo_rf + tempo_lr)/60:.2f} minutos")

# Melhor modelo
melhor_modelo = "Random Forest" if accuracy_rf > accuracy_lr else "Logistic Regression"
melhor_accuracy = accuracy_rf if accuracy_rf > accuracy_lr else accuracy_lr
print(f"\nüèÜ Melhor modelo: {melhor_modelo} (Accuracy: {melhor_accuracy*100:.2f}%)")

print("\nüìä Tabelas criadas:")
print(f"  - {DATABASE_DESTINO}.ecd_ml_predictions_rf_val")
print(f"  - {DATABASE_DESTINO}.ecd_ml_predictions_lr_val")
print(f"  - {DATABASE_DESTINO}.ecd_ml_metricas")

print("\n" + "=" * 80)
print("‚úÖ TREINAMENTO CONCLU√çDO!")
print("=" * 80)
print("\nüéØ Pr√≥ximo passo: Execute C√âLULA 11 para an√°lise detalhada")
print("=" * 80)

In [None]:
# ================================================================================
# C√âLULA 11: AVALIAR PERFORMANCE DOS MODELOS
# ================================================================================

print("\nüìà AVALIA√á√ÉO DETALHADA DE PERFORMANCE DOS MODELOS...")
print("=" * 80)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, when, count, sum as spark_sum
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np

# ================================================================================
# CARREGAR PREDI√á√ïES DAS TABELAS HIVE
# ================================================================================

print("\nüì• Carregando predi√ß√µes das tabelas Hive...")

try:
    # Carregar predi√ß√µes salvas
    df_val_rf = spark.table(f"{DATABASE_DESTINO}.ecd_ml_predictions_rf_val")
    
    # Carregar m√©tricas b√°sicas
    df_metricas = spark.table(f"{DATABASE_DESTINO}.ecd_ml_metricas")
    
    print("‚úÖ Predi√ß√µes e m√©tricas carregadas!")
    
    # Mostrar m√©tricas b√°sicas
    print("\nüìä M√©tricas B√°sicas (j√° calculadas):")
    df_metricas.show(truncate=False)
    
except Exception as e:
    print(f"‚ùå ERRO ao carregar dados: {e}")
    print("‚ö†Ô∏è  Execute a C√âLULA 10 primeiro para treinar os modelos!")
    raise

# ================================================================================
# CARREGAR MAPEAMENTO DE LABELS
# ================================================================================

print("\nüì• Carregando mapeamento de labels...")

label_mapping = spark.table(f"{DATABASE_DESTINO}.ml_label_mapping").toPandas()
print(f"‚úÖ {len(label_mapping)} labels carregados")

# Criar dicion√°rio de mapeamento
label_to_class = dict(zip(label_mapping['label'], label_mapping['classificacao_nivel2']))

print("\nüìã Amostra do mapeamento (primeiras 10 classes):")
print(label_mapping.head(10))

# ================================================================================
# M√âTRICAS DETALHADAS
# ================================================================================

print("\nüìä EXTRAINDO M√âTRICAS...")
print("=" * 80)

# Converter m√©tricas para Pandas
metricas_pd = df_metricas.toPandas()

# Extrair Random Forest
rf_row = metricas_pd[metricas_pd['modelo'] == 'RandomForest'].iloc[0]
rf_accuracy = rf_row['accuracy']
rf_f1 = rf_row['f1_score']
rf_precision = rf_row['precision'] if 'precision' in rf_row and rf_row['precision'] is not None else None
rf_recall = rf_row['recall'] if 'recall' in rf_row and rf_row['recall'] is not None else None

print("\nüå≤ RANDOM FOREST:")
print("-" * 40)
print(f"  Accuracy:  {rf_accuracy:.4f} ({rf_accuracy*100:.2f}%)")
print(f"  F1-Score:  {rf_f1:.4f}")
if rf_precision:
    print(f"  Precision: {rf_precision:.4f}")
if rf_recall:
    print(f"  Recall:    {rf_recall:.4f}")

# Extrair Logistic Regression se existir
lr_existe = False
try:
    lr_row = metricas_pd[metricas_pd['modelo'] == 'LogisticRegression'].iloc[0]
    lr_accuracy = lr_row['accuracy']
    lr_f1 = lr_row['f1_score']
    lr_existe = True
    
    print("\nüìä LOGISTIC REGRESSION:")
    print("-" * 40)
    print(f"  Accuracy:  {lr_accuracy:.4f} ({lr_accuracy*100:.2f}%)")
    print(f"  F1-Score:  {lr_f1:.4f}")
except:
    print("\n‚ÑπÔ∏è  Logistic Regression n√£o encontrado (apenas RF dispon√≠vel)")

# ================================================================================
# GR√ÅFICO DE COMPARA√á√ÉO DE MODELOS
# ================================================================================

print("\nüìä Gerando gr√°fico de compara√ß√£o...")

# Criar DataFrame de compara√ß√£o
if lr_existe:
    metricas_comparacao = pd.DataFrame({
        'Modelo': ['Random Forest', 'Logistic Regression'],
        'Accuracy': [rf_accuracy, lr_accuracy],
        'F1-Score': [rf_f1, lr_f1]
    })
    cores = ['#2ecc71', '#3498db']
else:
    metricas_comparacao = pd.DataFrame({
        'Modelo': ['Random Forest'],
        'Accuracy': [rf_accuracy],
        'F1-Score': [rf_f1]
    })
    cores = ['#2ecc71']

print(metricas_comparacao)

# Gr√°fico de barras
fig_comparacao = go.Figure()

for idx, modelo in enumerate(metricas_comparacao['Modelo']):
    row = metricas_comparacao[metricas_comparacao['Modelo'] == modelo].iloc[0]
    valores = [row['Accuracy'], row['F1-Score']]
    
    fig_comparacao.add_trace(go.Bar(
        name=modelo,
        x=['Accuracy', 'F1-Score'],
        y=valores,
        text=[f"{v:.2%}" for v in valores],
        textposition='auto',
        marker_color=cores[idx]
    ))

fig_comparacao.update_layout(
    title=f"Performance dos Modelos - {UF_FILTRO} {ANO_REFERENCIA}",
    yaxis_title="Score",
    yaxis_range=[0, 1.05],
    barmode='group',
    height=500
)
fig_comparacao.show()

# ================================================================================
# MATRIZ DE CONFUS√ÉO (RANDOM FOREST)
# ================================================================================

print("\nüìä CALCULANDO MATRIZ DE CONFUS√ÉO (Random Forest)...")
print("-" * 80)

# Calcular matriz de confus√£o
confusion_matrix_data = df_val_rf.groupBy("label", "rf_prediction").count().toPandas()

# Pegar top 15 classes mais frequentes
top_classes = df_val_rf.groupBy("label").count().orderBy(col("count").desc()).limit(15).toPandas()
top_labels = top_classes['label'].tolist()

print(f"Analisando top {len(top_labels)} classes...")

# Filtrar matriz
confusion_filtered = confusion_matrix_data[
    confusion_matrix_data['label'].isin(top_labels) & 
    confusion_matrix_data['rf_prediction'].isin(top_labels)
]

# Criar pivot
confusion_pivot = confusion_filtered.pivot_table(
    index='label', 
    columns='rf_prediction', 
    values='count', 
    fill_value=0
)

# Normalizar
confusion_normalized = confusion_pivot.div(confusion_pivot.sum(axis=1), axis=0)

# Mapear labels
index_labels = [label_to_class.get(l, f"Label_{l}") for l in confusion_normalized.index]
column_labels = [label_to_class.get(l, f"Label_{l}") for l in confusion_normalized.columns]

# Heatmap
fig_confusion = px.imshow(
    confusion_normalized.values,
    x=column_labels,
    y=index_labels,
    labels=dict(x="Predi√ß√£o", y="Real", color="% Predito"),
    title=f"Matriz de Confus√£o (Top {len(top_labels)} Classes) - Random Forest",
    color_continuous_scale="Blues",
    height=800,
    width=900,
    text_auto='.2%'
)

fig_confusion.update_layout(xaxis_tickangle=-45, font=dict(size=10))
fig_confusion.show()

# ================================================================================
# PERFORMANCE POR CLASSE
# ================================================================================

print("\nüìä CALCULANDO PERFORMANCE POR CLASSE...")
print("-" * 80)

performance_por_classe_spark = df_val_rf.groupBy("label", "classificacao_nivel2").agg(
    count("*").alias("total"),
    spark_sum(when(col("label") == col("rf_prediction"), 1).otherwise(0)).alias("corretos")
).withColumn(
    "accuracy_classe",
    col("corretos") / col("total")
).orderBy(col("total").desc())

performance_por_classe = performance_por_classe_spark.limit(30).toPandas()

print("\nüìã Top 30 Classes por Volume:")
print(performance_por_classe)

# Salvar
print("\nüíæ Salvando performance por classe...")
spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.ecd_ml_performance_por_classe")
performance_por_classe_spark.write.mode("overwrite").saveAsTable(
    f"{DATABASE_DESTINO}.ecd_ml_performance_por_classe"
)
print(f"‚úÖ Salvo: {DATABASE_DESTINO}.ecd_ml_performance_por_classe")

# Gr√°fico
performance_plot = performance_por_classe.head(25)

fig_classe = go.Figure()
fig_classe.add_trace(go.Bar(
    x=performance_plot['classificacao_nivel2'],
    y=performance_plot['accuracy_classe'],
    text=performance_plot['accuracy_classe'].apply(lambda x: f"{x:.1%}"),
    textposition='auto',
    marker_color=performance_plot['accuracy_classe'].apply(
        lambda x: '#27ae60' if x >= 0.9 else '#f39c12' if x >= 0.7 else '#e74c3c'
    ),
    hovertemplate='<b>%{x}</b><br>Accuracy: %{y:.2%}<br>Total: %{customdata}<extra></extra>',
    customdata=performance_plot['total']
))

fig_classe.update_layout(
    title=f"Accuracy por Classe (Top 25) - Random Forest",
    xaxis_title="Classifica√ß√£o",
    yaxis_title="Accuracy",
    yaxis_range=[0, 1.05],
    height=600,
    xaxis_tickangle=-45,
    showlegend=False
)
fig_classe.show()

# ================================================================================
# AN√ÅLISE DE ERROS
# ================================================================================

print("\n‚ùå AN√ÅLISE DE ERROS...")
print("-" * 80)

erros = df_val_rf.filter(col("label") != col("rf_prediction"))
total_erros = erros.count()
total_val = df_val_rf.count()

print(f"Total de erros: {total_erros:,} / {total_val:,} ({total_erros*100/total_val:.2f}%)")
print(f"Taxa de acerto: {(1 - total_erros/total_val)*100:.2f}%")

# Top erros
print("\nüîù Top 10 Pares de Erros Mais Comuns:")
erros_comuns = erros.groupBy("label", "rf_prediction", "classificacao_nivel2").count() \
    .orderBy(col("count").desc()).limit(10)
erros_comuns_df = erros_comuns.toPandas()
print(erros_comuns_df)

# Salvar erros
print("\nüíæ Salvando erros...")
spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.ecd_ml_erros_rf")
erros.select(
    "id_ecd", "cnpj", "cd_conta", "descr_conta",
    "label", "classificacao_nivel2", "rf_prediction"
).write.mode("overwrite").saveAsTable(f"{DATABASE_DESTINO}.ecd_ml_erros_rf")
print(f"‚úÖ Salvo: {DATABASE_DESTINO}.ecd_ml_erros_rf ({total_erros:,} registros)")

# Amostra
print("\nüìã Amostra de Erros:")
erros.select("cd_conta", "descr_conta", "classificacao_nivel2", "rf_prediction").show(20, truncate=50)

# ================================================================================
# RESUMO FINAL
# ================================================================================

print("\n" + "=" * 80)
print("üìä RESUMO DA AVALIA√á√ÉO")
print("=" * 80)

print(f"\nüèÜ MODELO PRINCIPAL: Random Forest")
print(f"   Accuracy: {rf_accuracy:.4f} ({rf_accuracy*100:.2f}%)")
print(f"   F1-Score: {rf_f1:.4f}")

print(f"\nüìä ESTAT√çSTICAS:")
print(f"   Total valida√ß√£o: {total_val:,} contas")
print(f"   Acertos: {total_val - total_erros:,} ({(1-total_erros/total_val)*100:.2f}%)")
print(f"   Erros: {total_erros:,} ({total_erros*100/total_val:.2f}%)")

print(f"\nüìä CLASSES:")
print(f"   Total de classes: {len(label_mapping)}")

# Top/Bottom performers
top_performers = performance_por_classe.nlargest(5, 'accuracy_classe')
print(f"\nüåü TOP 5 CLASSES (Melhor Accuracy):")
for idx, row in top_performers.iterrows():
    print(f"   {row['classificacao_nivel2']}: {row['accuracy_classe']:.2%} ({row['total']:,} contas)")

bottom_performers = performance_por_classe.nsmallest(5, 'accuracy_classe')
print(f"\n‚ö†Ô∏è  TOP 5 CLASSES (Pior Accuracy):")
for idx, row in bottom_performers.iterrows():
    print(f"   {row['classificacao_nivel2']}: {row['accuracy_classe']:.2%} ({row['total']:,} contas)")

print(f"\nüìÇ TABELAS CRIADAS:")
print(f"   - {DATABASE_DESTINO}.ecd_ml_performance_por_classe")
print(f"   - {DATABASE_DESTINO}.ecd_ml_erros_rf")

print("\n" + "=" * 80)
print("‚úÖ AVALIA√á√ÉO CONCLU√çDA!")
print("=" * 80)
print("\nüéØ Modelo pronto para classificar contas n√£o classificadas!")
print("=" * 80)

In [None]:
# ================================================================================
# C√âLULA 12: APLICAR MODELO NAS CONTAS N√ÉO CLASSIFICADAS
# ================================================================================

print("\nüîÆ APLICANDO MODELO NAS CONTAS N√ÉO CLASSIFICADAS...")
print("=" * 80)

from pyspark.sql.functions import col, length, regexp_extract, when, lit, abs as spark_abs, split, udf, coalesce
from pyspark.sql.types import StringType, DoubleType
import pandas as pd

# ================================================================================
# CARREGAR MODELO RANDOM FOREST
# ================================================================================

print("\nüì• Carregando modelo Random Forest...")

from pyspark.ml.classification import RandomForestClassificationModel

try:
    # Tentar carregar do HDFS
    rf_path = f"/user/{spark.sparkContext.sparkUser()}/models/rf_{UF_FILTRO}_{ANO_REFERENCIA}"
    rf_model = RandomForestClassificationModel.load(rf_path)
    print(f"‚úÖ Modelo carregado de: {rf_path}")
except Exception as e:
    print(f"‚ö†Ô∏è  Erro ao carregar modelo: {e}")
    print("‚ùå Execute a C√âLULA 10 primeiro para treinar o modelo!")
    raise

# ================================================================================
# CARREGAR CONTAS N√ÉO CLASSIFICADAS
# ================================================================================

print("\nüì• Carregando contas n√£o classificadas...")

df_nao_classificadas = spark.sql(f"""
    SELECT 
        id_ecd, cnpj, cd_conta, descr_conta, nivel_conta, cd_conta_sup,
        origem_demonstrativo, ind_grp_bal, ind_grp_dre, cd_natureza,
        tp_conta_agl, tp_conta_pc,
        classificacao_nivel1,
        vl_cta_ini, vl_cta_fin
    FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
    WHERE ano_referencia = {ANO_REFERENCIA}
        AND (classificacao_nivel2 IS NULL 
             OR classificacao_nivel2 = '' 
             OR classificacao_nivel2 = 'NAO_CLASSIFICADO')
""")

total_nao_classificadas = df_nao_classificadas.count()
print(f"‚úÖ Contas n√£o classificadas: {total_nao_classificadas:,}")

if total_nao_classificadas == 0:
    print("\nüéâ Todas as contas j√° est√£o classificadas! Nada a fazer.")
    print("=" * 80)
else:
    # ================================================================================
    # APLICAR FEATURE ENGINEERING (MESMA L√ìGICA DA C√âLULA 9)
    # ================================================================================
    
    print("\nüîß Aplicando feature engineering...")
    print("‚è≥ Processando features para {:,} contas...".format(total_nao_classificadas))
    
    df_features = df_nao_classificadas \
        .withColumn("tamanho_descricao", length(col("descr_conta"))) \
        .withColumn("tamanho_codigo", length(col("cd_conta"))) \
        .withColumn("primeiro_digito_codigo", regexp_extract(col("cd_conta"), r"^(\d)", 1)) \
        .withColumn("tem_ponto_codigo", when(col("cd_conta").contains("."), lit(1)).otherwise(lit(0))) \
        .withColumn("tem_hifen_codigo", when(col("cd_conta").contains("-"), lit(1)).otherwise(lit(0))) \
        .withColumn("tem_underscore_codigo", when(col("cd_conta").contains("_"), lit(1)).otherwise(lit(0))) \
        .withColumn("valor_absoluto_final", when(col("vl_cta_fin").isNull(), lit(0.0)).otherwise(spark_abs(col("vl_cta_fin")))) \
        .withColumn("valor_absoluto_inicial", when(col("vl_cta_ini").isNull(), lit(0.0)).otherwise(spark_abs(col("vl_cta_ini")))) \
        .withColumn("variacao_valor", col("valor_absoluto_final") - col("valor_absoluto_inicial")) \
        .withColumn("tem_conta_superior", when(col("cd_conta_sup").isNotNull(), lit(1)).otherwise(lit(0)))
    
    # Tratar nulos e strings vazias
    for col_name in ['ind_grp_bal', 'ind_grp_dre', 'tp_conta_agl', 'tp_conta_pc', 
                     'cd_natureza', 'primeiro_digito_codigo', 'classificacao_nivel1']:
        df_features = df_features.withColumn(
            col_name,
            when((col(col_name).isNull()) | (col(col_name) == ''), lit('DESCONHECIDO'))
            .otherwise(col(col_name))
        )
    
    # Features textuais (keywords)
    keywords_features = {
        'tem_palavra_caixa': ['caixa', 'cash'],
        'tem_palavra_banco': ['banco', 'bank'],
        'tem_palavra_estoque': ['estoque', 'mercadoria', 'produto'],
        'tem_palavra_cliente': ['cliente', 'duplicata', 'receber'],
        'tem_palavra_fornecedor': ['fornecedor', 'fornecimento'],
        'tem_palavra_salario': ['salario', 'folha', 'ferias'],
        'tem_palavra_tributo': ['tributo', 'imposto', 'icms', 'pis', 'cofins'],
        'tem_palavra_receita': ['receita', 'venda', 'faturamento'],
        'tem_palavra_despesa': ['despesa', 'custo', 'gasto'],
        'tem_palavra_financeiro': ['juros', 'financeiro', 'emprestimo'],
        'tem_palavra_imobilizado': ['imovel', 'veiculo', 'maquina', 'equipamento'],
        'tem_palavra_capital': ['capital', 'social'],
        'tem_palavra_lucro': ['lucro', 'prejuizo', 'resultado']
    }
    
    for feature_name, keywords in keywords_features.items():
        condicao = lit(False)
        for keyword in keywords:
            condicao = condicao | col("descr_conta").contains(keyword)
        df_features = df_features.withColumn(feature_name, when(condicao, lit(1)).otherwise(lit(0)))
    
    print("‚úÖ Features criadas!")
    
    # ================================================================================
    # ENCODING (MESMO PIPELINE DA C√âLULA 9)
    # ================================================================================
    
    print("\nüî¢ Aplicando encoding de vari√°veis categ√≥ricas...")
    
    from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
    from pyspark.ml import Pipeline
    
    categorical_cols = [
        'origem_demonstrativo', 'ind_grp_bal', 'ind_grp_dre', 'cd_natureza',
        'tp_conta_agl', 'tp_conta_pc', 'primeiro_digito_codigo', 'classificacao_nivel1'
    ]
    
    indexers = [
        StringIndexer(inputCol=col_name, outputCol=f"{col_name}_index", handleInvalid="keep")
        for col_name in categorical_cols
    ]
    
    encoders = [
        OneHotEncoder(inputCol=f"{col_name}_index", outputCol=f"{col_name}_encoded")
        for col_name in categorical_cols
    ]
    
    pipeline_encoding = Pipeline(stages=indexers + encoders)
    model_encoding = pipeline_encoding.fit(df_features)
    df_encoded = model_encoding.transform(df_features)
    
    print("‚úÖ Encoding aplicado!")
    
    # ================================================================================
    # ASSEMBLER DE FEATURES
    # ================================================================================
    
    print("\nüîó Criando vetor de features...")
    
    numeric_features = [
        'nivel_conta', 'tamanho_descricao', 'tamanho_codigo',
        'tem_ponto_codigo', 'tem_hifen_codigo', 'tem_underscore_codigo',
        'valor_absoluto_final', 'valor_absoluto_inicial', 'variacao_valor', 'tem_conta_superior'
    ] + list(keywords_features.keys())
    
    categorical_encoded_features = [f"{col_name}_encoded" for col_name in categorical_cols]
    all_features = numeric_features + categorical_encoded_features
    
    assembler = VectorAssembler(
        inputCols=all_features,
        outputCol="features",
        handleInvalid="keep"
    )
    
    df_assembled = assembler.transform(df_encoded)
    
    print("‚úÖ Vetor de features criado!")
    
    # ================================================================================
    # FAZER PREDI√á√ïES
    # ================================================================================
    
    print("\nüîÆ Fazendo predi√ß√µes com Random Forest...")
    print("‚è≥ Isso pode levar alguns minutos...")
    
    df_predictions = rf_model.transform(df_assembled)
    
    print("‚úÖ Predi√ß√µes conclu√≠das!")
    
    # ================================================================================
    # CONVERTER LABEL NUM√âRICA PARA CLASSIFICA√á√ÉO
    # ================================================================================
    
    print("\nüîÑ Convertendo labels num√©ricas para classifica√ß√µes...")
    
    # Carregar mapeamento de tabela Hive
    label_mapping_df = spark.table(f"{DATABASE_DESTINO}.ml_label_mapping")
    label_mapping = label_mapping_df.toPandas()
    label_map_dict = dict(zip(label_mapping['label'], label_mapping['classificacao_nivel2']))
    
    print(f"‚úÖ {len(label_map_dict)} mapeamentos carregados")
    
    # Broadcast do dicion√°rio para melhor performance
    label_map_broadcast = spark.sparkContext.broadcast(label_map_dict)
    
    # Criar UDF para converter
    @udf(returnType=StringType())
    def label_to_class(label):
        if label is None:
            return 'DESCONHECIDO'
        try:
            return label_map_broadcast.value.get(float(label), 'DESCONHECIDO')
        except:
            return 'DESCONHECIDO'
    
    df_predictions = df_predictions.withColumn(
        "classificacao_predita",
        label_to_class(col("rf_prediction"))
    )
    
    # Extrair confian√ßa (probabilidade m√°xima)
    @udf(returnType=DoubleType())
    def get_max_probability(probability_vector):
        if probability_vector is None:
            return 0.0
        try:
            probs = probability_vector.toArray()
            return float(max(probs)) if len(probs) > 0 else 0.0
        except:
            return 0.0
    
    df_predictions = df_predictions.withColumn(
        "confianca_predicao",
        get_max_probability(col("rf_probability"))
    )
    
    print("‚úÖ Labels convertidas!")
    
    # ================================================================================
    # SALVAR PREDI√á√ïES EM TABELA HIVE
    # ================================================================================
    
    print("\nüíæ Salvando predi√ß√µes em tabela Hive...")
    
    # Selecionar colunas relevantes
    df_final_pred = df_predictions.select(
        "id_ecd", "cnpj", "cd_conta", "descr_conta", "nivel_conta",
        "classificacao_nivel1", "classificacao_predita", "confianca_predicao"
    ).withColumn("modelo_usado", lit("RandomForest"))
    
    # Salvar
    spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.ecd_ml_predicoes")
    df_final_pred.write.mode("overwrite").saveAsTable(
        f"{DATABASE_DESTINO}.ecd_ml_predicoes"
    )
    
    print(f"‚úÖ Predi√ß√µes salvas: {DATABASE_DESTINO}.ecd_ml_predicoes")
    
    # ================================================================================
    # ESTAT√çSTICAS DAS PREDI√á√ïES
    # ================================================================================
    
    print("\nüìä ESTAT√çSTICAS DAS PREDI√á√ïES")
    print("=" * 80)
    
    print("\nüìã Top 20 Classifica√ß√µes Preditas:")
    spark.sql(f"""
        SELECT 
            classificacao_predita,
            COUNT(*) as qtd,
            ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentual
        FROM {DATABASE_DESTINO}.ecd_ml_predicoes
        GROUP BY classificacao_predita
        ORDER BY qtd DESC
        LIMIT 20
    """).show(truncate=False)
    
    print("\nüìä Distribui√ß√£o de Confian√ßa:")
    spark.sql(f"""
        SELECT 
            ROUND(MIN(confianca_predicao), 4) as min,
            ROUND(PERCENTILE(confianca_predicao, 0.25), 4) as q1,
            ROUND(PERCENTILE(confianca_predicao, 0.50), 4) as mediana,
            ROUND(PERCENTILE(confianca_predicao, 0.75), 4) as q3,
            ROUND(MAX(confianca_predicao), 4) as max,
            ROUND(AVG(confianca_predicao), 4) as media
        FROM {DATABASE_DESTINO}.ecd_ml_predicoes
    """).show(truncate=False)
    
    # Distribui√ß√£o por faixa de confian√ßa
    print("\nüìä Predi√ß√µes por Faixa de Confian√ßa:")
    spark.sql(f"""
        SELECT 
            CASE 
                WHEN confianca_predicao >= 0.9 THEN 'Muito Alta (>=90%)'
                WHEN confianca_predicao >= 0.7 THEN 'Alta (70-90%)'
                WHEN confianca_predicao >= 0.5 THEN 'M√©dia (50-70%)'
                ELSE 'Baixa (<50%)'
            END as faixa_confianca,
            COUNT(*) as qtd,
            ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentual
        FROM {DATABASE_DESTINO}.ecd_ml_predicoes
        GROUP BY 
            CASE 
                WHEN confianca_predicao >= 0.9 THEN 'Muito Alta (>=90%)'
                WHEN confianca_predicao >= 0.7 THEN 'Alta (70-90%)'
                WHEN confianca_predicao >= 0.5 THEN 'M√©dia (50-70%)'
                ELSE 'Baixa (<50%)'
            END
        ORDER BY 
            CASE 
                WHEN faixa_confianca = 'Muito Alta (>=90%)' THEN 1
                WHEN faixa_confianca = 'Alta (70-90%)' THEN 2
                WHEN faixa_confianca = 'M√©dia (50-70%)' THEN 3
                ELSE 4
            END
    """).show(truncate=False)
    
    # Amostra de predi√ß√µes com alta confian√ßa
    print("\nüìã Amostra de Predi√ß√µes (confian√ßa >= 80%):")
    spark.sql(f"""
        SELECT 
            cd_conta, descr_conta, classificacao_nivel1, 
            classificacao_predita, 
            ROUND(confianca_predicao, 4) as confianca
        FROM {DATABASE_DESTINO}.ecd_ml_predicoes
        WHERE confianca_predicao >= 0.8
        ORDER BY confianca_predicao DESC
        LIMIT 20
    """).show(truncate=50)
    
    # Amostra de predi√ß√µes com baixa confian√ßa (para revis√£o)
    print("\n‚ö†Ô∏è  Amostra de Predi√ß√µes com Baixa Confian√ßa (<50%):")
    baixa_confianca_count = spark.sql(f"""
        SELECT COUNT(*) as cnt
        FROM {DATABASE_DESTINO}.ecd_ml_predicoes
        WHERE confianca_predicao < 0.5
    """).collect()[0]['cnt']
    
    print(f"Total com baixa confian√ßa: {baixa_confianca_count:,}")
    
    if baixa_confianca_count > 0:
        spark.sql(f"""
            SELECT 
                cd_conta, descr_conta, classificacao_predita,
                ROUND(confianca_predicao, 4) as confianca
            FROM {DATABASE_DESTINO}.ecd_ml_predicoes
            WHERE confianca_predicao < 0.5
            ORDER BY confianca_predicao
            LIMIT 20
        """).show(truncate=50)

    # ================================================================================
    # RESUMO FINAL
    # ================================================================================
    
    print("\n" + "=" * 80)
    print("üìä RESUMO DAS PREDI√á√ïES")
    print("=" * 80)
    
    print(f"\n‚úÖ Total de contas classificadas por ML: {total_nao_classificadas:,}")
    print(f"üíæ Tabela: {DATABASE_DESTINO}.ecd_ml_predicoes")
    print(f"ü§ñ Modelo usado: Random Forest")
    
    # Calcular cobertura total estimada
    total_contas = spark.sql(f"""
        SELECT COUNT(*) as cnt 
        FROM {DATABASE_DESTINO}.ecd_contas_classificadas_final
        WHERE ano_referencia = {ANO_REFERENCIA}
    """).collect()[0]['cnt']
    
    contas_ja_classificadas = total_contas - total_nao_classificadas
    perc_ja_classificadas = (contas_ja_classificadas * 100.0 / total_contas) if total_contas > 0 else 0
    perc_ml = (total_nao_classificadas * 100.0 / total_contas) if total_contas > 0 else 0
    
    print(f"\nüìà COBERTURA TOTAL:")
    print(f"   Total de contas: {total_contas:,}")
    print(f"   J√° classificadas (regras): {contas_ja_classificadas:,} ({perc_ja_classificadas:.2f}%)")
    print(f"   Classificadas por ML: {total_nao_classificadas:,} ({perc_ml:.2f}%)")
    print(f"   COBERTURA TOTAL: ~100%! üéâ")

print("\n" + "=" * 80)
print("‚úÖ PREDI√á√ïES CONCLU√çDAS!")
print("=" * 80)
print("\nüéØ Pr√≥ximo passo: Validar predi√ß√µes e integrar √† tabela final")
print("=" * 80)

In [None]:
# ================================================================================
# C√âLULA 13: AN√ÅLISE DE FEATURE IMPORTANCE E REFINAMENTO
# ================================================================================

print("\nüîç AN√ÅLISE DE FEATURE IMPORTANCE E REFINAMENTO...")
print("=" * 80)

from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.sql.functions import col
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import numpy as np

# ================================================================================
# CARREGAR MODELO RANDOM FOREST
# ================================================================================

print("\nüì• Carregando modelo Random Forest...")

try:
    # Caminho do modelo salvo na C√âLULA 10
    rf_path = f"/user/{spark.sparkContext.sparkUser()}/models/rf_{UF_FILTRO}_{ANO_REFERENCIA}"
    rf_model = RandomForestClassificationModel.load(rf_path)
    print("‚úÖ Modelo Random Forest carregado!")
except Exception as e:
    print(f"‚ùå ERRO ao carregar modelo: {e}")
    print("‚ö†Ô∏è  Execute a C√âLULA 10 primeiro para treinar o modelo!")
    raise

# ================================================================================
# DEFINIR NOMES DAS FEATURES
# ================================================================================

print("\nüìã Definindo nomes das features...")

# Features num√©ricas (conforme C√âLULA 9)
numeric_features = [
    'nivel_conta',
    'tamanho_descricao',
    'tamanho_codigo',
    'tem_ponto_codigo',
    'tem_hifen_codigo',
    'tem_underscore_codigo',
    'valor_absoluto_final',
    'valor_absoluto_inicial',
    'variacao_valor',
    'tem_conta_superior',
    'tem_palavra_caixa',
    'tem_palavra_banco',
    'tem_palavra_estoque',
    'tem_palavra_cliente',
    'tem_palavra_fornecedor',
    'tem_palavra_salario',
    'tem_palavra_tributo',
    'tem_palavra_receita',
    'tem_palavra_despesa',
    'tem_palavra_financeiro',
    'tem_palavra_imobilizado',
    'tem_palavra_capital',
    'tem_palavra_lucro'
]

# Features categ√≥ricas (conforme C√âLULA 9)
categorical_cols = [
    'origem_demonstrativo',
    'ind_grp_bal',
    'ind_grp_dre',
    'cd_natureza',
    'tp_conta_agl',
    'tp_conta_pc',
    'primeiro_digito_codigo',
    'classificacao_nivel1'
]

print(f"  Features num√©ricas: {len(numeric_features)}")
print(f"  Features categ√≥ricas: {len(categorical_cols)}")

# ================================================================================
# EXTRAIR FEATURE IMPORTANCE
# ================================================================================

print("\nüìä FEATURE IMPORTANCE - RANDOM FOREST")
print("-" * 80)

# Extrair import√¢ncias do modelo
rf_importances = rf_model.featureImportances.toArray()

print(f"Total de import√¢ncias extra√≠das: {len(rf_importances)}")

# Criar nomes para todas as features (num√©ricas + encoded)
# As features categ√≥ricas one-hot encoded geram m√∫ltiplas colunas
n_numeric = len(numeric_features)
n_total_features = len(rf_importances)
n_categorical_encoded = n_total_features - n_numeric

print(f"  Features num√©ricas: {n_numeric}")
print(f"  Features categ√≥ricas encoded: {n_categorical_encoded}")

# Lista de nomes das features
feature_names = numeric_features.copy()

# Adicionar nomes gen√©ricos para features categ√≥ricas encoded
for i in range(n_categorical_encoded):
    cat_idx = i % len(categorical_cols)
    feature_names.append(f"{categorical_cols[cat_idx]}_encoded_{i // len(categorical_cols)}")

# Criar DataFrame de import√¢ncia
importance_df_rf = pd.DataFrame({
    'feature': feature_names,
    'importance': rf_importances
}).sort_values('importance', ascending=False)

print("\nüìä Top 30 Features Mais Importantes:")
print(importance_df_rf.head(30).to_string())

# ================================================================================
# VISUALIZA√á√ÉO - TOP 30 FEATURES
# ================================================================================

print("\nüìä Gerando gr√°fico de feature importance...")

fig_importance = px.bar(
    importance_df_rf.head(30),
    x='importance',
    y='feature',
    orientation='h',
    title=f"Top 30 Features Mais Importantes - Random Forest - {UF_FILTRO} {ANO_REFERENCIA}",
    labels={'importance': 'Import√¢ncia', 'feature': 'Feature'},
    color='importance',
    color_continuous_scale='Viridis'
)

fig_importance.update_layout(
    height=800,
    yaxis={'categoryorder': 'total ascending'},
    showlegend=False
)

fig_importance.show()

# ================================================================================
# AN√ÅLISE DE FEATURES NUM√âRICAS vs CATEG√ìRICAS
# ================================================================================

print("\nüìä AN√ÅLISE: Features Num√©ricas vs Categ√≥ricas")
print("-" * 80)

# Separar import√¢ncias
importance_numeric = importance_df_rf.iloc[:n_numeric].copy()
importance_categorical = importance_df_rf.iloc[n_numeric:].copy()

total_importance_numeric = importance_numeric['importance'].sum()
total_importance_categorical = importance_categorical['importance'].sum()

print(f"\nImport√¢ncia total - Num√©ricas: {total_importance_numeric:.4f} ({total_importance_numeric*100:.2f}%)")
print(f"Import√¢ncia total - Categ√≥ricas: {total_importance_categorical:.4f} ({total_importance_categorical*100:.2f}%)")

# Gr√°fico de pizza
fig_pie = go.Figure(data=[go.Pie(
    labels=['Features Num√©ricas', 'Features Categ√≥ricas (Encoded)'],
    values=[total_importance_numeric, total_importance_categorical],
    hole=0.3
)])

fig_pie.update_layout(
    title=f"Distribui√ß√£o de Import√¢ncia: Num√©ricas vs Categ√≥ricas - {UF_FILTRO} {ANO_REFERENCIA}",
    height=500
)

fig_pie.show()

# ================================================================================
# IDENTIFICAR FEATURES DE BAIXA IMPORT√ÇNCIA
# ================================================================================

print("\nüìâ FEATURES DE BAIXA IMPORT√ÇNCIA")
print("-" * 80)

# Threshold de baixa import√¢ncia
threshold_baixa = 0.01
features_baixa_importancia = importance_df_rf[importance_df_rf['importance'] < threshold_baixa]

print(f"\nFeatures com import√¢ncia < {threshold_baixa}:")
print(f"  Total: {len(features_baixa_importancia)}")
print(f"  Percentual: {len(features_baixa_importancia)/len(importance_df_rf)*100:.2f}%")

if len(features_baixa_importancia) > 0:
    print("\n  Lista (primeiras 20):")
    for i, row in features_baixa_importancia.head(20).iterrows():
        print(f"   - {row['feature']}: {row['importance']:.6f}")

# ================================================================================
# FEATURES MAIS IMPORTANTES (TOP 10%)
# ================================================================================

print("\nüìà FEATURES MAIS IMPORTANTES (TOP 10%)")
print("-" * 80)

threshold_alta = importance_df_rf['importance'].quantile(0.9)
features_alta_importancia = importance_df_rf[importance_df_rf['importance'] >= threshold_alta]

print(f"\nFeatures no top 10% (import√¢ncia >= {threshold_alta:.6f}):")
print(f"  Total: {len(features_alta_importancia)}")

print("\n  Lista:")
for i, row in features_alta_importancia.iterrows():
    print(f"   - {row['feature']}: {row['importance']:.6f}")

# ================================================================================
# RECOMENDA√á√ïES DE REFINAMENTO
# ================================================================================

print("\n" + "=" * 80)
print("üí° RECOMENDA√á√ïES DE REFINAMENTO")
print("=" * 80)

print("\n1Ô∏è‚É£ FEATURES PARA REMOVER (baixa import√¢ncia < 0.01):")
if len(features_baixa_importancia) > 0:
    print(f"   ‚úÇÔ∏è  Remover {len(features_baixa_importancia)} features ({len(features_baixa_importancia)/len(importance_df_rf)*100:.1f}% do total)")
    print("   üí° Isso pode reduzir overfitting e melhorar generaliza√ß√£o")
    print("   ‚ö†Ô∏è  Validar impacto removendo e retreinando o modelo")
else:
    print("   ‚úÖ N√£o h√° features com import√¢ncia muito baixa")

print("\n2Ô∏è‚É£ FEATURES PARA ADICIONAR:")
print("   üí° Considere criar:")
print("   üìä Features de Intera√ß√£o:")
print("      - razao_valor_final_inicial (vl_cta_fin / vl_cta_ini)")
print("      - nivel_x_tamanho_descricao")
print("      - nivel_x_tamanho_codigo")
print("   üìä Features de Transforma√ß√£o:")
print("      - log_valor_absoluto_final (log1p para lidar com outliers)")
print("      - log_valor_absoluto_inicial")
print("   üìä Features Textuais Avan√ßadas:")
print("      - TF-IDF da descri√ß√£o (top N termos)")
print("      - N-gramas (bi-gramas, tri-gramas)")
print("      - Similaridade com termos do referencial")

print("\n3Ô∏è‚É£ AN√ÅLISE DE CLASSE:")
# Verificar balanceamento
df_train = spark.table(f"{DATABASE_DESTINO}.ecd_ml_train")
class_counts = df_train.groupBy("label").count().orderBy("count", ascending=False).toPandas()

ratio_maior_menor = class_counts['count'].max() / class_counts['count'].min()
print(f"   üìä Raz√£o maior/menor classe: {ratio_maior_menor:.2f}")

if ratio_maior_menor > 10:
    print("   ‚ö†Ô∏è  Classes desbalanceadas detectadas!")
    print("   üí° Recomenda√ß√µes:")
    print("      - Usar class weights no treinamento")
    print("      - Aplicar SMOTE (oversampling)")
    print("      - Undersampling da classe majorit√°ria")
else:
    print("   ‚úÖ Classes razoavelmente balanceadas")

print("\n4Ô∏è‚É£ HIPERPAR√ÇMETROS:")
print("   üîß Considere fazer grid search para otimizar:")
print("   üìå Random Forest:")
print("      - numTrees: testar [100, 150, 200]")
print("      - maxDepth: testar [10, 12, 15]")
print("      - minInstancesPerNode: testar [5, 10, 15]")
print("      - featureSubsetStrategy: testar ['sqrt', 'log2', '0.5']")

# ================================================================================
# SALVAR AN√ÅLISE
# ================================================================================

print("\nüíæ Salvando an√°lise de feature importance...")

# Converter para Spark DataFrame
df_importance = spark.createDataFrame(importance_df_rf)

# Salvar em tabela Hive
spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.ecd_ml_feature_importance")

df_importance.write.mode("overwrite").saveAsTable(
    f"{DATABASE_DESTINO}.ecd_ml_feature_importance"
)

print(f"‚úÖ An√°lise salva: {DATABASE_DESTINO}.ecd_ml_feature_importance")

# Salvar lista de features para remover
if len(features_baixa_importancia) > 0:
    df_features_remover = spark.createDataFrame(features_baixa_importancia)
    
    spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.ecd_ml_features_baixa_importancia")
    
    df_features_remover.write.mode("overwrite").saveAsTable(
        f"{DATABASE_DESTINO}.ecd_ml_features_baixa_importancia"
    )
    
    print(f"‚úÖ Features para remover salvas: {DATABASE_DESTINO}.ecd_ml_features_baixa_importancia")

# ================================================================================
# RESUMO FINAL
# ================================================================================

print("\n" + "=" * 80)
print("üìä RESUMO DA AN√ÅLISE DE FEATURE IMPORTANCE")
print("=" * 80)

print(f"\nüìà Total de features analisadas: {len(importance_df_rf)}")
print(f"   - Num√©ricas: {n_numeric}")
print(f"   - Categ√≥ricas (encoded): {n_categorical_encoded}")

print(f"\nüîù Top features:")
print(f"   - Top 10%: {len(features_alta_importancia)} features")
print(f"   - Import√¢ncia acumulada: {features_alta_importancia['importance'].sum():.4f}")

print(f"\nüìâ Features de baixa import√¢ncia:")
print(f"   - < 0.01: {len(features_baixa_importancia)} features ({len(features_baixa_importancia)/len(importance_df_rf)*100:.1f}%)")

print(f"\nüí™ Import√¢ncia por tipo:")
print(f"   - Num√©ricas: {total_importance_numeric:.4f} ({total_importance_numeric*100:.2f}%)")
print(f"   - Categ√≥ricas: {total_importance_categorical:.4f} ({total_importance_categorical*100:.2f}%)")

print(f"\nüìÇ Tabelas criadas:")
print(f"   - {DATABASE_DESTINO}.ecd_ml_feature_importance")
if len(features_baixa_importancia) > 0:
    print(f"   - {DATABASE_DESTINO}.ecd_ml_features_baixa_importancia")

print("\n" + "=" * 80)
print("‚úÖ AN√ÅLISE DE FEATURE IMPORTANCE CONCLU√çDA!")
print("=" * 80)
print("\nüéØ Pr√≥ximo passo: Execute C√âLULA 14 para retreinamento otimizado")
print("=" * 80)

In [None]:
# ================================================================================
# C√âLULA 14 CORRIGIDA: RETREINAMENTO COM ABORDAGEM CONSERVADORA
# ================================================================================

print("\nüîÑ RETREINAMENTO COM FEATURES OTIMIZADAS (VERS√ÉO CONSERVADORA)...")
print("=" * 80)

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, when, lit, log1p, udf, abs as spark_abs
from pyspark.sql.types import DoubleType
import time
import pandas as pd

# ================================================================================
# CONFIGURA√á√ÉO DE RETREINAMENTO - VERS√ÉO CONSERVADORA
# ================================================================================

print("\n‚öôÔ∏è  CONFIGURA√á√ÉO DE RETREINAMENTO (CONSERVADORA)")
print("-" * 80)

# ‚ùå N√ÉO remover features de palavras-chave (s√£o essenciais!)
# ‚úÖ Apenas adicionar features de intera√ß√£o e usar class weights
REMOVER_FEATURES_BAIXA_IMPORTANCIA = False  # DESLIGADO - manter todas as features
ADICIONAR_FEATURES_INTERACAO = True          # Adicionar features de intera√ß√£o
USAR_CLASS_WEIGHTS = True                    # Balancear classes com pesos

# Hiperpar√¢metros otimizados
RF_NUM_TREES = 150        # Aumentado de 100
RF_MAX_DEPTH = 12         # Aumentado de 10
RF_MIN_INSTANCES = 5      # Reduzido de 10
RF_SUBSAMPLING_RATE = 0.8
RF_FEATURE_SUBSET = "sqrt"

print(f"‚úÖ Configura√ß√µes:")
print(f"   - Remover features baixa import√¢ncia: {REMOVER_FEATURES_BAIXA_IMPORTANCIA}")
print(f"   - Adicionar features de intera√ß√£o: {ADICIONAR_FEATURES_INTERACAO}")
print(f"   - Usar class weights: {USAR_CLASS_WEIGHTS}")
print(f"   - RF numTrees: {RF_NUM_TREES}")
print(f"   - RF maxDepth: {RF_MAX_DEPTH}")
print(f"   - RF minInstancesPerNode: {RF_MIN_INSTANCES}")

# ================================================================================
# CARREGAR DADOS
# ================================================================================

print("\nüì• Carregando dados de treino e valida√ß√£o...")

DATABASE_DESTINO = 'neac'
UF_FILTRO = 'SC'
ANO_REFERENCIA = 2024

df_train = spark.table(f"{DATABASE_DESTINO}.ecd_ml_train")
df_val = spark.table(f"{DATABASE_DESTINO}.ecd_ml_val")
df_test = spark.table(f"{DATABASE_DESTINO}.ecd_ml_test")

print(f"‚úÖ Treino: {df_train.count():,} registros")
print(f"‚úÖ Valida√ß√£o: {df_val.count():,} registros")
print(f"‚úÖ Teste: {df_test.count():,} registros")

# ================================================================================
# FEATURE ENGINEERING - APENAS ADICIONAR INTERA√á√ïES
# ================================================================================

print("\nüîß APLICANDO FEATURE ENGINEERING...")
print("-" * 80)

# Features num√©ricas base (MANTER TODAS - incluindo keywords!)
numeric_features_base = [
    'nivel_conta',
    'tamanho_descricao',
    'tamanho_codigo',
    'tem_ponto_codigo',
    'tem_hifen_codigo',
    'tem_underscore_codigo',
    'valor_absoluto_final',
    'valor_absoluto_inicial',
    'variacao_valor',
    'tem_conta_superior',
    'tem_palavra_caixa',
    'tem_palavra_banco',
    'tem_palavra_estoque',
    'tem_palavra_cliente',
    'tem_palavra_fornecedor',
    'tem_palavra_salario',
    'tem_palavra_tributo',
    'tem_palavra_receita',
    'tem_palavra_despesa',
    'tem_palavra_financeiro',
    'tem_palavra_imobilizado',
    'tem_palavra_capital',
    'tem_palavra_lucro'
]

# Adicionar features de intera√ß√£o
if ADICIONAR_FEATURES_INTERACAO:
    print("\n  ‚ûï Criando features de intera√ß√£o...")
    
    # Aplicar transforma√ß√µes em todos os datasets
    for df_name, df in [("Treino", df_train), ("Valida√ß√£o", df_val), ("Teste", df_test)]:
        print(f"     - Processando {df_name}...")
        
        # 1. Raz√£o entre valores (prote√ß√£o divis√£o por zero)
        df = df.withColumn(
            "razao_valor_final_inicial",
            when(col("valor_absoluto_inicial") > 0,
                 col("valor_absoluto_final") / col("valor_absoluto_inicial")
            ).otherwise(lit(0.0))
        )
        
        # 2. Intera√ß√µes nivel * tamanho
        df = df.withColumn(
            "nivel_x_tamanho_descricao",
            col("nivel_conta") * col("tamanho_descricao")
        )
        
        df = df.withColumn(
            "nivel_x_tamanho_codigo",
            col("nivel_conta") * col("tamanho_codigo")
        )
        
        # 3. Log de valores (lidar com outliers)
        df = df.withColumn(
            "log_valor_absoluto_final",
            log1p(col("valor_absoluto_final"))
        )
        
        df = df.withColumn(
            "log_valor_absoluto_inicial",
            log1p(col("valor_absoluto_inicial"))
        )
        
        # 4. Diferen√ßa absoluta entre valores
        df = df.withColumn(
            "diferenca_absoluta_valores",
            spark_abs(col("valor_absoluto_final") - col("valor_absoluto_inicial"))
        )
        
        # Atualizar refer√™ncia
        if df_name == "Treino":
            df_train = df
        elif df_name == "Valida√ß√£o":
            df_val = df
        else:
            df_test = df
    
    # Adicionar novas features √† lista
    numeric_features_base.extend([
        'razao_valor_final_inicial',
        'nivel_x_tamanho_descricao',
        'nivel_x_tamanho_codigo',
        'log_valor_absoluto_final',
        'log_valor_absoluto_inicial',
        'diferenca_absoluta_valores'
    ])
    
    print(f"  ‚úÖ 6 features de intera√ß√£o criadas!")

numeric_features_final = numeric_features_base

print(f"\n  üìä Total de features num√©ricas: {len(numeric_features_final)}")

# ================================================================================
# PREPARAR FEATURES CATEG√ìRICAS
# ================================================================================

print("\n  üî¢ Preparando features categ√≥ricas...")

categorical_cols = [
    'origem_demonstrativo',
    'ind_grp_bal',
    'ind_grp_dre',
    'cd_natureza',
    'tp_conta_agl',
    'tp_conta_pc',
    'primeiro_digito_codigo',
    'classificacao_nivel1'
]

# Features categ√≥ricas indexadas
categorical_indexed_features = [f"{col_name}_index" for col_name in categorical_cols]

# Verificar disponibilidade
available_cols = df_train.columns
categorical_indexed_features = [f for f in categorical_indexed_features if f in available_cols]

print(f"  ‚úÖ Features categ√≥ricas indexadas: {len(categorical_indexed_features)}")

# Todas as features
all_features = numeric_features_final + categorical_indexed_features

print(f"\n  üìä Total de features para o modelo:")
print(f"     - Num√©ricas: {len(numeric_features_final)}")
print(f"     - Categ√≥ricas (indexadas): {len(categorical_indexed_features)}")
print(f"     - TOTAL: {len(all_features)}")

# ================================================================================
# BALANCEAMENTO COM CLASS WEIGHTS (VERS√ÉO MELHORADA)
# ================================================================================

if USAR_CLASS_WEIGHTS:
    print("\n‚öñÔ∏è  CALCULANDO PESOS DE CLASSES (VERS√ÉO BALANCEADA)...")
    print("-" * 80)
    
    # Calcular distribui√ß√£o
    class_counts = df_train.groupBy("label").count().toPandas()
    total = class_counts['count'].sum()
    n_classes = len(class_counts)
    
    # Calcular pesos usando m√©todo BALANCED
    # Peso = total / (n_classes * count)
    class_counts['weight'] = total / (n_classes * class_counts['count'])
    
    # LIMITA√á√ÉO: Limitar pesos m√°ximos para evitar overfitting em classes raras
    MAX_WEIGHT = 50.0  # Peso m√°ximo permitido
    class_counts['weight_limited'] = class_counts['weight'].clip(upper=MAX_WEIGHT)
    
    print(f"  üìä Total de classes: {n_classes}")
    print(f"  üìä Peso m√≠nimo: {class_counts['weight_limited'].min():.4f}")
    print(f"  üìä Peso m√°ximo: {class_counts['weight_limited'].max():.4f}")
    print(f"  üìä Peso m√©dio: {class_counts['weight_limited'].mean():.4f}")
    
    # Mostrar classes mais desbalanceadas
    print("\n  üìã Top 5 classes com maior peso (menos frequentes):")
    for idx, row in class_counts.nlargest(5, 'weight').iterrows():
        peso_original = row['weight']
        peso_limitado = row['weight_limited']
        limitado_str = " [LIMITADO]" if peso_original != peso_limitado else ""
        print(f"     Label {row['label']:.0f}: peso={peso_limitado:.4f}{limitado_str}, count={row['count']:,}")
    
    # Criar mapeamento com pesos limitados
    weight_map = dict(zip(class_counts['label'], class_counts['weight_limited']))
    
    # UDF para aplicar pesos
    @udf(returnType=DoubleType())
    def get_class_weight(label):
        try:
            return float(weight_map.get(float(label), 1.0))
        except:
            return 1.0
    
    # Adicionar coluna de peso
    df_train = df_train.withColumn("classWeight", get_class_weight(col("label")))
    
    print("  ‚úÖ Pesos de classe calculados e aplicados (com limita√ß√£o)!")

# ================================================================================
# CRIAR VETOR DE FEATURES
# ================================================================================

print("\nüîó CRIANDO VETOR DE FEATURES...")
print("-" * 80)

assembler = VectorAssembler(
    inputCols=all_features,
    outputCol="features_v2",
    handleInvalid="keep"
)

df_train_assembled = assembler.transform(df_train)
df_val_assembled = assembler.transform(df_val)
df_test_assembled = assembler.transform(df_test)

print("‚úÖ Vetor de features criado!")
print(f"   - Total de features: {len(all_features)}")

# ================================================================================
# TREINAR MODELO OTIMIZADO
# ================================================================================

print("\nüå≤ TREINANDO RANDOM FOREST OTIMIZADO (V2 - CONSERVADOR)")
print("=" * 80)

# Configurar modelo
if USAR_CLASS_WEIGHTS:
    rf_v2 = RandomForestClassifier(
        featuresCol="features_v2",
        labelCol="label",
        predictionCol="prediction",
        probabilityCol="probability",
        weightCol="classWeight",
        numTrees=RF_NUM_TREES,
        maxDepth=RF_MAX_DEPTH,
        minInstancesPerNode=RF_MIN_INSTANCES,
        subsamplingRate=RF_SUBSAMPLING_RATE,
        featureSubsetStrategy=RF_FEATURE_SUBSET,
        seed=42
    )
else:
    rf_v2 = RandomForestClassifier(
        featuresCol="features_v2",
        labelCol="label",
        predictionCol="prediction",
        probabilityCol="probability",
        numTrees=RF_NUM_TREES,
        maxDepth=RF_MAX_DEPTH,
        minInstancesPerNode=RF_MIN_INSTANCES,
        subsamplingRate=RF_SUBSAMPLING_RATE,
        featureSubsetStrategy=RF_FEATURE_SUBSET,
        seed=42
    )

print(f"‚öôÔ∏è  Configura√ß√£o:")
print(f"   - numTrees: {RF_NUM_TREES}")
print(f"   - maxDepth: {RF_MAX_DEPTH}")
print(f"   - minInstancesPerNode: {RF_MIN_INSTANCES}")
print(f"   - Class weights: {USAR_CLASS_WEIGHTS} (com limita√ß√£o)")

print("\n‚è≥ Treinando modelo... (5-15 minutos)")
inicio = time.time()

rf_model_v2 = rf_v2.fit(df_train_assembled)

tempo_treino = time.time() - inicio
print(f"\n‚úÖ Modelo treinado em {tempo_treino:.2f} segundos ({tempo_treino/60:.2f} minutos)")

# ================================================================================
# AVALIAR MODELO
# ================================================================================

print("\nüìä AVALIANDO MODELO OTIMIZADO (V2)")
print("=" * 80)

print("\nüîÆ Fazendo predi√ß√µes...")
df_val_pred = rf_model_v2.transform(df_val_assembled)

# M√©tricas
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)
evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedPrecision"
)
evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedRecall"
)

accuracy_v2 = evaluator_accuracy.evaluate(df_val_pred)
f1_v2 = evaluator_f1.evaluate(df_val_pred)
precision_v2 = evaluator_precision.evaluate(df_val_pred)
recall_v2 = evaluator_recall.evaluate(df_val_pred)

print(f"\nüìà M√©tricas do Modelo V2 (Conservador):")
print(f"   Accuracy:  {accuracy_v2:.4f} ({accuracy_v2*100:.2f}%)")
print(f"   F1-Score:  {f1_v2:.4f}")
print(f"   Precision: {precision_v2:.4f}")
print(f"   Recall:    {recall_v2:.4f}")

# ================================================================================
# COMPARA√á√ÉO
# ================================================================================

print("\nüìä COMPARA√á√ÉO: V1 vs V2 (Conservador)")
print("=" * 80)

try:
    metricas_original = spark.table(f"{DATABASE_DESTINO}.ecd_ml_metricas") \
        .filter(col("modelo") == "RandomForest") \
        .collect()
    
    if len(metricas_original) > 0:
        accuracy_v1 = metricas_original[0]['accuracy']
        f1_v1 = metricas_original[0]['f1_score']
        
        melhoria_accuracy = ((accuracy_v2 - accuracy_v1) / accuracy_v1) * 100
        melhoria_f1 = ((f1_v2 - f1_v1) / f1_v1) * 100
        
        print(f"\nüîµ Modelo V1 (Original):")
        print(f"   Accuracy: {accuracy_v1:.4f} ({accuracy_v1*100:.2f}%)")
        print(f"   F1-Score: {f1_v1:.4f}")
        
        print(f"\nüü¢ Modelo V2 (Conservador):")
        print(f"   Accuracy: {accuracy_v2:.4f} ({accuracy_v2*100:.2f}%) [{melhoria_accuracy:+.2f}%]")
        print(f"   F1-Score: {f1_v2:.4f} [{melhoria_f1:+.2f}%]")
        
        if accuracy_v2 > accuracy_v1:
            print(f"\nüéâ MELHORIA!")
            print(f"   ‚úÖ Accuracy: +{melhoria_accuracy:.2f}%")
            print(f"   ‚úÖ F1-Score: +{melhoria_f1:.2f}%")
        elif accuracy_v2 >= accuracy_v1 * 0.98:  # At√© 2% de queda √© aceit√°vel
            print(f"\n‚úÖ Desempenho similar (varia√ß√£o: {melhoria_accuracy:.2f}%)")
        else:
            print(f"\n‚ö†Ô∏è  Desempenho reduziu:")
            print(f"   ‚¨áÔ∏è  Accuracy: {melhoria_accuracy:.2f}%")
            print(f"   ‚¨áÔ∏è  F1-Score: {melhoria_f1:.2f}%")
except Exception as e:
    print(f"‚ö†Ô∏è  Erro na compara√ß√£o: {e}")

# ================================================================================
# SALVAR
# ================================================================================

print("\nüíæ SALVANDO MODELO...")
print("-" * 80)

try:
    rf_v2_path = f"/user/{spark.sparkContext.sparkUser()}/models/rf_v2_conservador_{UF_FILTRO}_{ANO_REFERENCIA}"
    rf_model_v2.write().overwrite().save(rf_v2_path)
    print(f"‚úÖ Modelo salvo: {rf_v2_path}")
except Exception as e:
    print(f"‚ö†Ô∏è  Aviso: {e}")

# Salvar predi√ß√µes
df_val_pred_final = df_val_pred.select(
    "id_ecd", "cnpj", "cd_conta", "descr_conta",
    "classificacao_nivel2", "label", "prediction"
)

spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.ecd_ml_predictions_rf_v2_conservador_val")
df_val_pred_final.write.mode("overwrite").saveAsTable(
    f"{DATABASE_DESTINO}.ecd_ml_predictions_rf_v2_conservador_val"
)
print(f"‚úÖ Predi√ß√µes salvas: {DATABASE_DESTINO}.ecd_ml_predictions_rf_v2_conservador_val")

# Salvar m√©tricas
from pyspark.sql import Row

nova_metrica = spark.createDataFrame([Row(
    modelo="RandomForest_v2_Conservador",
    accuracy=float(accuracy_v2),
    f1_score=float(f1_v2),
    precision=float(precision_v2),
    recall=float(recall_v2),
    tempo_treino_segundos=float(tempo_treino),
    num_trees=RF_NUM_TREES,
    max_depth=RF_MAX_DEPTH,
    ano_referencia=ANO_REFERENCIA,
    uf=UF_FILTRO
)])

nova_metrica.write.mode("append").saveAsTable(f"{DATABASE_DESTINO}.ecd_ml_metricas")
print(f"‚úÖ M√©tricas salvas: {DATABASE_DESTINO}.ecd_ml_metricas")

# ================================================================================
# RESUMO
# ================================================================================

print("\n" + "=" * 80)
print("‚úÖ RETREINAMENTO CONSERVADOR CONCLU√çDO!")
print("=" * 80)

print(f"\nüîß Otimiza√ß√µes aplicadas:")
print(f"   ‚úÖ 6 features de intera√ß√£o adicionadas")
print(f"   ‚úÖ Class weights balanceados (com limita√ß√£o MAX={50.0})")
print(f"   ‚úÖ Hiperpar√¢metros otimizados (150 √°rvores, profundidade 12)")
print(f"   ‚ÑπÔ∏è  Features de palavras-chave MANTIDAS (essenciais!)")

print(f"\nüìä Modelo V2 (Conservador):")
print(f"   - Features: {len(all_features)} (29 num√©ricas + 8 categ√≥ricas indexadas)")
print(f"   - Accuracy: {accuracy_v2*100:.2f}%")
print(f"   - F1-Score: {f1_v2:.4f}")
print(f"   - Tempo: {tempo_treino/60:.2f} min")

print("\nüí° Li√ß√µes aprendidas:")
print("   - Features de palavras-chave s√£o CR√çTICAS (n√£o remover!)")
print("   - Class weights funcionam melhor com limita√ß√£o de valor m√°ximo")
print("   - Abordagem conservadora > Abordagem agressiva")

print("=" * 80)

In [None]:
# ================================================================================
# C√âLULA 14.1: TESTE COMPLETO - 3 ABORDAGENS DE OTIMIZA√á√ÉO
# ================================================================================

print("\nüî¨ TESTE COMPLETO: 3 ABORDAGENS DE OTIMIZA√á√ÉO")
print("=" * 80)
print("\nVamos testar:")
print("  A) Sem class weights (baseline V1 melhorado)")
print("  B) Class weights com MAX_WEIGHT=10.0 (conservador)")
print("  C) Filtrar classes raras + class weights moderados")
print("=" * 80)

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, when, lit, log1p, udf, abs as spark_abs
from pyspark.sql.types import DoubleType
import time
import pandas as pd
import numpy as np

# ================================================================================
# CONFIGURA√á√ïES GLOBAIS
# ================================================================================

DATABASE_DESTINO = 'neac'
UF_FILTRO = 'SC'
ANO_REFERENCIA = 2024

# Hiperpar√¢metros otimizados (mantidos para todas as abordagens)
RF_NUM_TREES = 150
RF_MAX_DEPTH = 12
RF_MIN_INSTANCES = 5
RF_SUBSAMPLING_RATE = 0.8
RF_FEATURE_SUBSET = "sqrt"

# ================================================================================
# CARREGAR DADOS
# ================================================================================

print("\nüì• Carregando dados...")
df_train = spark.table(f"{DATABASE_DESTINO}.ecd_ml_train")
df_val = spark.table(f"{DATABASE_DESTINO}.ecd_ml_val")
df_test = spark.table(f"{DATABASE_DESTINO}.ecd_ml_test")

print(f"‚úÖ Treino: {df_train.count():,} registros")
print(f"‚úÖ Valida√ß√£o: {df_val.count():,} registros")
print(f"‚úÖ Teste: {df_test.count():,} registros")

# ================================================================================
# FEATURE ENGINEERING (COMUM A TODAS AS ABORDAGENS)
# ================================================================================

print("\nüîß APLICANDO FEATURE ENGINEERING...")
print("-" * 80)

# Features num√©ricas base (MANTER TODAS!)
numeric_features_base = [
    'nivel_conta',
    'tamanho_descricao',
    'tamanho_codigo',
    'tem_ponto_codigo',
    'tem_hifen_codigo',
    'tem_underscore_codigo',
    'valor_absoluto_final',
    'valor_absoluto_inicial',
    'variacao_valor',
    'tem_conta_superior',
    'tem_palavra_caixa',
    'tem_palavra_banco',
    'tem_palavra_estoque',
    'tem_palavra_cliente',
    'tem_palavra_fornecedor',
    'tem_palavra_salario',
    'tem_palavra_tributo',
    'tem_palavra_receita',
    'tem_palavra_despesa',
    'tem_palavra_financeiro',
    'tem_palavra_imobilizado',
    'tem_palavra_capital',
    'tem_palavra_lucro'
]

print("\n  ‚ûï Criando features de intera√ß√£o...")

# Aplicar transforma√ß√µes em todos os datasets
for df_name, df in [("Treino", df_train), ("Valida√ß√£o", df_val), ("Teste", df_test)]:
    print(f"     - Processando {df_name}...")
    
    # 1. Raz√£o entre valores (prote√ß√£o divis√£o por zero)
    df = df.withColumn(
        "razao_valor_final_inicial",
        when(col("valor_absoluto_inicial") > 0,
             col("valor_absoluto_final") / col("valor_absoluto_inicial")
        ).otherwise(lit(0.0))
    )
    
    # 2. Intera√ß√µes nivel * tamanho
    df = df.withColumn(
        "nivel_x_tamanho_descricao",
        col("nivel_conta") * col("tamanho_descricao")
    )
    
    df = df.withColumn(
        "nivel_x_tamanho_codigo",
        col("nivel_conta") * col("tamanho_codigo")
    )
    
    # 3. Log de valores (lidar com outliers)
    df = df.withColumn(
        "log_valor_absoluto_final",
        log1p(col("valor_absoluto_final"))
    )
    
    df = df.withColumn(
        "log_valor_absoluto_inicial",
        log1p(col("valor_absoluto_inicial"))
    )
    
    # 4. Diferen√ßa absoluta entre valores
    df = df.withColumn(
        "diferenca_absoluta_valores",
        spark_abs(col("valor_absoluto_final") - col("valor_absoluto_inicial"))
    )
    
    # Atualizar refer√™ncia
    if df_name == "Treino":
        df_train = df
    elif df_name == "Valida√ß√£o":
        df_val = df
    else:
        df_test = df

# Adicionar novas features √† lista
numeric_features_base.extend([
    'razao_valor_final_inicial',
    'nivel_x_tamanho_descricao',
    'nivel_x_tamanho_codigo',
    'log_valor_absoluto_final',
    'log_valor_absoluto_inicial',
    'diferenca_absoluta_valores'
])

numeric_features_final = numeric_features_base

print(f"\n  ‚úÖ 6 features de intera√ß√£o criadas!")
print(f"  üìä Total de features num√©ricas: {len(numeric_features_final)}")

# Preparar features categ√≥ricas
print("\n  üî¢ Preparando features categ√≥ricas...")

categorical_cols = [
    'origem_demonstrativo',
    'ind_grp_bal',
    'ind_grp_dre',
    'cd_natureza',
    'tp_conta_agl',
    'tp_conta_pc',
    'primeiro_digito_codigo',
    'classificacao_nivel1'
]

categorical_indexed_features = [f"{col_name}_index" for col_name in categorical_cols]
available_cols = df_train.columns
categorical_indexed_features = [f for f in categorical_indexed_features if f in available_cols]

print(f"  ‚úÖ Features categ√≥ricas indexadas: {len(categorical_indexed_features)}")

# Todas as features
all_features = numeric_features_final + categorical_indexed_features

print(f"\n  üìä Total de features: {len(all_features)}")
print(f"     - Num√©ricas: {len(numeric_features_final)}")
print(f"     - Categ√≥ricas: {len(categorical_indexed_features)}")

# ================================================================================
# CRIAR VETOR DE FEATURES (COMUM)
# ================================================================================

print("\nüîó CRIANDO VETOR DE FEATURES...")
print("-" * 80)

assembler = VectorAssembler(
    inputCols=all_features,
    outputCol="features_v2",
    handleInvalid="keep"
)

df_train_assembled = assembler.transform(df_train)
df_val_assembled = assembler.transform(df_val)
df_test_assembled = assembler.transform(df_test)

print("‚úÖ Vetor de features criado!")

# ================================================================================
# FUN√á√ÉO AUXILIAR PARA AVALIA√á√ÉO
# ================================================================================

def avaliar_modelo(df_pred, nome_modelo):
    """Calcula m√©tricas de avalia√ß√£o"""
    evaluator_accuracy = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy"
    )
    evaluator_f1 = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="f1"
    )
    evaluator_precision = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="weightedPrecision"
    )
    evaluator_recall = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="weightedRecall"
    )
    
    accuracy = evaluator_accuracy.evaluate(df_pred)
    f1 = evaluator_f1.evaluate(df_pred)
    precision = evaluator_precision.evaluate(df_pred)
    recall = evaluator_recall.evaluate(df_pred)
    
    return {
        'modelo': nome_modelo,
        'accuracy': accuracy,
        'f1_score': f1,
        'precision': precision,
        'recall': recall
    }

# ================================================================================
# ABORDAGEM A: SEM CLASS WEIGHTS (BASELINE V1 MELHORADO)
# ================================================================================

print("\n" + "=" * 80)
print("üÖ∞Ô∏è  ABORDAGEM A: SEM CLASS WEIGHTS (Baseline V1 Melhorado)")
print("=" * 80)
print("\nüìã Estrat√©gia:")
print("   - Mant√©m todas as features (29 num√©ricas + 8 categ√≥ricas)")
print("   - Adiciona 6 features de intera√ß√£o")
print("   - SEM class weights (modelo trata todas as classes igualmente)")
print("   - Hiperpar√¢metros otimizados")

print("\nüå≤ Treinando Random Forest A...")
inicio_a = time.time()

rf_a = RandomForestClassifier(
    featuresCol="features_v2",
    labelCol="label",
    predictionCol="prediction",
    probabilityCol="probability",
    numTrees=RF_NUM_TREES,
    maxDepth=RF_MAX_DEPTH,
    minInstancesPerNode=RF_MIN_INSTANCES,
    subsamplingRate=RF_SUBSAMPLING_RATE,
    featureSubsetStrategy=RF_FEATURE_SUBSET,
    seed=42
)

model_a = rf_a.fit(df_train_assembled)
tempo_a = time.time() - inicio_a

print(f"‚úÖ Treinamento conclu√≠do em {tempo_a:.2f}s ({tempo_a/60:.2f} min)")

# Avaliar
print("\nüîÆ Avaliando modelo A...")
pred_a = model_a.transform(df_val_assembled)
metricas_a = avaliar_modelo(pred_a, "A_SemClassWeights")

print(f"\nüìä Resultados Abordagem A:")
print(f"   Accuracy:  {metricas_a['accuracy']:.4f} ({metricas_a['accuracy']*100:.2f}%)")
print(f"   F1-Score:  {metricas_a['f1_score']:.4f}")
print(f"   Precision: {metricas_a['precision']:.4f}")
print(f"   Recall:    {metricas_a['recall']:.4f}")

# ================================================================================
# ABORDAGEM B: CLASS WEIGHTS COM MAX_WEIGHT=10.0
# ================================================================================

print("\n" + "=" * 80)
print("üÖ±Ô∏è  ABORDAGEM B: CLASS WEIGHTS CONSERVADORES (MAX=10.0)")
print("=" * 80)
print("\nüìã Estrat√©gia:")
print("   - Mesmas features da Abordagem A")
print("   - Usa class weights para balancear classes")
print("   - LIMITA√á√ÉO: peso m√°ximo = 10.0 (mais conservador)")

print("\n‚öñÔ∏è  Calculando class weights (MAX=10.0)...")

# Calcular distribui√ß√£o
class_counts_b = df_train.groupBy("label").count().toPandas()
total_b = class_counts_b['count'].sum()
n_classes_b = len(class_counts_b)

# Calcular pesos balanced
class_counts_b['weight'] = total_b / (n_classes_b * class_counts_b['count'])

# LIMITA√á√ÉO: MAX=10.0
MAX_WEIGHT_B = 10.0
class_counts_b['weight_limited'] = class_counts_b['weight'].clip(upper=MAX_WEIGHT_B)

print(f"  üìä Total de classes: {n_classes_b}")
print(f"  üìä Peso m√≠nimo: {class_counts_b['weight_limited'].min():.4f}")
print(f"  üìä Peso m√°ximo: {class_counts_b['weight_limited'].max():.4f}")
print(f"  üìä Peso m√©dio: {class_counts_b['weight_limited'].mean():.4f}")

# Top 5 classes com maior peso
print("\n  üìã Top 5 classes com maior peso:")
for idx, row in class_counts_b.nlargest(5, 'weight_limited').iterrows():
    peso_original = row['weight']
    peso_limitado = row['weight_limited']
    limitado_str = " [LIMITADO]" if peso_original > MAX_WEIGHT_B else ""
    print(f"     Label {row['label']:.0f}: peso={peso_limitado:.4f}{limitado_str}, count={row['count']:,}")

# Criar mapeamento
weight_map_b = dict(zip(class_counts_b['label'], class_counts_b['weight_limited']))

# UDF para aplicar pesos
@udf(returnType=DoubleType())
def get_class_weight_b(label):
    try:
        return float(weight_map_b.get(float(label), 1.0))
    except:
        return 1.0

# Adicionar coluna de peso
df_train_b = df_train_assembled.withColumn("classWeight", get_class_weight_b(col("label")))

print("\nüå≤ Treinando Random Forest B...")
inicio_b = time.time()

rf_b = RandomForestClassifier(
    featuresCol="features_v2",
    labelCol="label",
    predictionCol="prediction",
    probabilityCol="probability",
    weightCol="classWeight",
    numTrees=RF_NUM_TREES,
    maxDepth=RF_MAX_DEPTH,
    minInstancesPerNode=RF_MIN_INSTANCES,
    subsamplingRate=RF_SUBSAMPLING_RATE,
    featureSubsetStrategy=RF_FEATURE_SUBSET,
    seed=42
)

model_b = rf_b.fit(df_train_b)
tempo_b = time.time() - inicio_b

print(f"‚úÖ Treinamento conclu√≠do em {tempo_b:.2f}s ({tempo_b/60:.2f} min)")

# Avaliar
print("\nüîÆ Avaliando modelo B...")
pred_b = model_b.transform(df_val_assembled)
metricas_b = avaliar_modelo(pred_b, "B_ClassWeights_MAX10")

print(f"\nüìä Resultados Abordagem B:")
print(f"   Accuracy:  {metricas_b['accuracy']:.4f} ({metricas_b['accuracy']*100:.2f}%)")
print(f"   F1-Score:  {metricas_b['f1_score']:.4f}")
print(f"   Precision: {metricas_b['precision']:.4f}")
print(f"   Recall:    {metricas_b['recall']:.4f}")

# ================================================================================
# ABORDAGEM C: FILTRAR CLASSES RARAS + CLASS WEIGHTS MODERADOS
# ================================================================================

print("\n" + "=" * 80)
print("üÖ≤  ABORDAGEM C: FILTRAR CLASSES RARAS + CLASS WEIGHTS")
print("=" * 80)
print("\nüìã Estrat√©gia:")
print("   - Remover classes com < 200 exemplos ANTES do treino")
print("   - Usar class weights moderados (MAX=15.0) nas classes restantes")
print("   - Melhora qualidade dos dados de treino")

print("\nüîç Analisando distribui√ß√£o de classes...")

# An√°lise de distribui√ß√£o
class_dist_c = df_train.groupBy("label").count().toPandas().sort_values('count')
print(f"\n  üìä Distribui√ß√£o atual:")
print(f"     Total de classes: {len(class_dist_c)}")
print(f"     Classe menor: {class_dist_c['count'].min():,} exemplos")
print(f"     Classe maior: {class_dist_c['count'].max():,} exemplos")
print(f"     Mediana: {class_dist_c['count'].median():,.0f} exemplos")

# Filtrar classes raras
MIN_SAMPLES_PER_CLASS = 200
classes_raras = class_dist_c[class_dist_c['count'] < MIN_SAMPLES_PER_CLASS]['label'].tolist()
classes_validas = class_dist_c[class_dist_c['count'] >= MIN_SAMPLES_PER_CLASS]['label'].tolist()

print(f"\n  ‚ö†Ô∏è  Classes raras (< {MIN_SAMPLES_PER_CLASS} exemplos): {len(classes_raras)}")
print(f"  ‚úÖ Classes v√°lidas (‚â• {MIN_SAMPLES_PER_CLASS} exemplos): {len(classes_validas)}")

if len(classes_raras) > 0:
    print(f"\n  üìã Classes que ser√£o REMOVIDAS do treino:")
    for label in classes_raras:
        count = class_dist_c[class_dist_c['label'] == label]['count'].values[0]
        print(f"     Label {label:.0f}: {count:,} exemplos")

# Filtrar datasets
print(f"\n  üîß Filtrando datasets...")
df_train_c = df_train_assembled.filter(col("label").isin(classes_validas))
df_val_c = df_val_assembled.filter(col("label").isin(classes_validas))

registros_train_antes = df_train_assembled.count()
registros_train_depois = df_train_c.count()
registros_removidos = registros_train_antes - registros_train_depois

print(f"     Treino: {registros_train_antes:,} ‚Üí {registros_train_depois:,} registros")
print(f"     Removidos: {registros_removidos:,} ({registros_removidos/registros_train_antes*100:.2f}%)")

# Calcular class weights nas classes v√°lidas
print("\n‚öñÔ∏è  Calculando class weights (MAX=15.0)...")

class_counts_c = df_train_c.groupBy("label").count().toPandas()
total_c = class_counts_c['count'].sum()
n_classes_c = len(class_counts_c)

class_counts_c['weight'] = total_c / (n_classes_c * class_counts_c['count'])

MAX_WEIGHT_C = 15.0
class_counts_c['weight_limited'] = class_counts_c['weight'].clip(upper=MAX_WEIGHT_C)

print(f"  üìä Classes finais: {n_classes_c}")
print(f"  üìä Peso m√≠nimo: {class_counts_c['weight_limited'].min():.4f}")
print(f"  üìä Peso m√°ximo: {class_counts_c['weight_limited'].max():.4f}")
print(f"  üìä Peso m√©dio: {class_counts_c['weight_limited'].mean():.4f}")

# Top 5 classes com maior peso
print("\n  üìã Top 5 classes com maior peso:")
for idx, row in class_counts_c.nlargest(5, 'weight_limited').iterrows():
    peso_original = row['weight']
    peso_limitado = row['weight_limited']
    limitado_str = " [LIMITADO]" if peso_original > MAX_WEIGHT_C else ""
    print(f"     Label {row['label']:.0f}: peso={peso_limitado:.4f}{limitado_str}, count={row['count']:,}")

# Criar mapeamento
weight_map_c = dict(zip(class_counts_c['label'], class_counts_c['weight_limited']))

@udf(returnType=DoubleType())
def get_class_weight_c(label):
    try:
        return float(weight_map_c.get(float(label), 1.0))
    except:
        return 1.0

df_train_c = df_train_c.withColumn("classWeight", get_class_weight_c(col("label")))

print("\nüå≤ Treinando Random Forest C...")
inicio_c = time.time()

rf_c = RandomForestClassifier(
    featuresCol="features_v2",
    labelCol="label",
    predictionCol="prediction",
    probabilityCol="probability",
    weightCol="classWeight",
    numTrees=RF_NUM_TREES,
    maxDepth=RF_MAX_DEPTH,
    minInstancesPerNode=RF_MIN_INSTANCES,
    subsamplingRate=RF_SUBSAMPLING_RATE,
    featureSubsetStrategy=RF_FEATURE_SUBSET,
    seed=42
)

model_c = rf_c.fit(df_train_c)
tempo_c = time.time() - inicio_c

print(f"‚úÖ Treinamento conclu√≠do em {tempo_c:.2f}s ({tempo_c/60:.2f} min)")

# Avaliar
print("\nüîÆ Avaliando modelo C...")
pred_c = model_c.transform(df_val_c)
metricas_c = avaliar_modelo(pred_c, "C_FiltrarRaras_ClassWeights")

print(f"\nüìä Resultados Abordagem C:")
print(f"   Accuracy:  {metricas_c['accuracy']:.4f} ({metricas_c['accuracy']*100:.2f}%)")
print(f"   F1-Score:  {metricas_c['f1_score']:.4f}")
print(f"   Precision: {metricas_c['precision']:.4f}")
print(f"   Recall:    {metricas_c['recall']:.4f}")
print(f"\n‚ö†Ô∏è  NOTA: Valida√ß√£o feita apenas nas {len(classes_validas)} classes v√°lidas")

# ================================================================================
# COMPARA√á√ÉO FINAL
# ================================================================================

print("\n" + "=" * 80)
print("üìä COMPARA√á√ÉO FINAL: TODAS AS ABORDAGENS")
print("=" * 80)

# Buscar V1 original
try:
    metricas_v1 = spark.table(f"{DATABASE_DESTINO}.ecd_ml_metricas") \
        .filter(col("modelo") == "RandomForest") \
        .collect()
    
    if len(metricas_v1) > 0:
        acc_v1 = metricas_v1[0]['accuracy']
        f1_v1 = metricas_v1[0]['f1_score']
    else:
        acc_v1 = 0.7793
        f1_v1 = 0.7016
except:
    acc_v1 = 0.7793
    f1_v1 = 0.7016

# Criar DataFrame comparativo
comparacao = pd.DataFrame([
    {
        'Modelo': 'V1 (Original)',
        'Accuracy': acc_v1,
        'F1-Score': f1_v1,
        'Œî Accuracy': 0.0,
        'Œî F1': 0.0,
        'Tempo (min)': '-',
        'Estrat√©gia': 'Baseline original'
    },
    {
        'Modelo': 'A - Sem Weights',
        'Accuracy': metricas_a['accuracy'],
        'F1-Score': metricas_a['f1_score'],
        'Œî Accuracy': ((metricas_a['accuracy'] - acc_v1) / acc_v1) * 100,
        'Œî F1': ((metricas_a['f1_score'] - f1_v1) / f1_v1) * 100,
        'Tempo (min)': f"{tempo_a/60:.2f}",
        'Estrat√©gia': '37 features, sem weights'
    },
    {
        'Modelo': 'B - Weights MAX=10',
        'Accuracy': metricas_b['accuracy'],
        'F1-Score': metricas_b['f1_score'],
        'Œî Accuracy': ((metricas_b['accuracy'] - acc_v1) / acc_v1) * 100,
        'Œî F1': ((metricas_b['f1_score'] - f1_v1) / f1_v1) * 100,
        'Tempo (min)': f"{tempo_b/60:.2f}",
        'Estrat√©gia': '37 features, weights MAX=10'
    },
    {
        'Modelo': 'C - Filtrar + Weights',
        'Accuracy': metricas_c['accuracy'],
        'F1-Score': metricas_c['f1_score'],
        'Œî Accuracy': ((metricas_c['accuracy'] - acc_v1) / acc_v1) * 100,
        'Œî F1': ((metricas_c['f1_score'] - f1_v1) / f1_v1) * 100,
        'Tempo (min)': f"{tempo_c/60:.2f}",
        'Estrat√©gia': f'{len(classes_validas)} classes, weights MAX=15'
    }
])

print("\nüìã TABELA COMPARATIVA:")
print(comparacao.to_string(index=False))

# Identificar melhor modelo
melhor_idx = comparacao['Accuracy'].idxmax()
melhor_modelo = comparacao.loc[melhor_idx, 'Modelo']
melhor_acc = comparacao.loc[melhor_idx, 'Accuracy']

print(f"\nüèÜ MELHOR MODELO: {melhor_modelo}")
print(f"   Accuracy: {melhor_acc:.4f} ({melhor_acc*100:.2f}%)")

# Recomenda√ß√£o
print("\nüí° RECOMENDA√á√ÉO:")
if melhor_idx == 0:  # V1
    print("   ‚û°Ô∏è  Manter modelo V1 original (nenhuma melhoria encontrada)")
elif melhor_idx == 1:  # A
    print("   ‚û°Ô∏è  USAR ABORDAGEM A (Sem class weights)")
    print("   ‚úÖ Adiciona features de intera√ß√£o sem complicar com weights")
    print("   ‚úÖ Melhor custo-benef√≠cio")
elif melhor_idx == 2:  # B
    print("   ‚û°Ô∏è  USAR ABORDAGEM B (Class weights conservadores)")
    print("   ‚úÖ Balanceia classes sem overfitting")
    print("   ‚ö†Ô∏è  Avaliar se ganho justifica complexidade")
else:  # C
    print("   ‚û°Ô∏è  USAR ABORDAGEM C (Filtrar classes raras)")
    print("   ‚úÖ Melhor qualidade de predi√ß√µes")
    print("   ‚ö†Ô∏è  Cuidado: n√£o prediz classes raras!")

# ================================================================================
# SALVAR RESULTADOS
# ================================================================================

print("\nüíæ SALVANDO RESULTADOS...")
print("-" * 80)

# Salvar modelo escolhido (vamos salvar todos para compara√ß√£o futura)
try:
    # Salvar modelos
    model_a.write().overwrite().save(f"/user/{spark.sparkContext.sparkUser()}/models/rf_abordagem_a_{UF_FILTRO}_{ANO_REFERENCIA}")
    model_b.write().overwrite().save(f"/user/{spark.sparkContext.sparkUser()}/models/rf_abordagem_b_{UF_FILTRO}_{ANO_REFERENCIA}")
    model_c.write().overwrite().save(f"/user/{spark.sparkContext.sparkUser()}/models/rf_abordagem_c_{UF_FILTRO}_{ANO_REFERENCIA}")
    print("‚úÖ Modelos salvos")
except Exception as e:
    print(f"‚ö†Ô∏è  Aviso ao salvar modelos: {e}")

# Salvar predi√ß√µes do melhor modelo
try:
    if melhor_idx == 1:
        pred_final = pred_a
        nome_tabela = "ecd_ml_predictions_abordagem_a"
    elif melhor_idx == 2:
        pred_final = pred_b
        nome_tabela = "ecd_ml_predictions_abordagem_b"
    else:
        pred_final = pred_c
        nome_tabela = "ecd_ml_predictions_abordagem_c"
    
    pred_final_save = pred_final.select(
        "id_ecd", "cnpj", "cd_conta", "descr_conta",
        "classificacao_nivel2", "label", "prediction"
    )
    
    spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.{nome_tabela}")
    pred_final_save.write.mode("overwrite").saveAsTable(f"{DATABASE_DESTINO}.{nome_tabela}")
    print(f"‚úÖ Predi√ß√µes salvas: {DATABASE_DESTINO}.{nome_tabela}")
except Exception as e:
    print(f"‚ö†Ô∏è  Aviso ao salvar predi√ß√µes: {e}")

# Salvar m√©tricas comparativas
try:
    from pyspark.sql import Row
    
    metricas_comparativas = []
    
    for idx, row in comparacao.iterrows():
        if idx > 0:  # Pular V1 original
            tempo_treino = tempo_a if idx == 1 else (tempo_b if idx == 2 else tempo_c)
            
            metricas_comparativas.append(Row(
                modelo=row['Modelo'],
                accuracy=float(row['Accuracy']),
                f1_score=float(row['F1-Score']),
                precision=float(metricas_a['precision'] if idx == 1 else (metricas_b['precision'] if idx == 2 else metricas_c['precision'])),
                recall=float(metricas_a['recall'] if idx == 1 else (metricas_b['recall'] if idx == 2 else metricas_c['recall'])),
                tempo_treino_segundos=float(tempo_treino),
                num_trees=RF_NUM_TREES,
                max_depth=RF_MAX_DEPTH,
                estrategia=row['Estrat√©gia'],
                ano_referencia=ANO_REFERENCIA,
                uf=UF_FILTRO
            ))
    
    df_metricas = spark.createDataFrame(metricas_comparativas)
    df_metricas.write.mode("append").saveAsTable(f"{DATABASE_DESTINO}.ecd_ml_metricas")
    print(f"‚úÖ M√©tricas comparativas salvas: {DATABASE_DESTINO}.ecd_ml_metricas")
except Exception as e:
    print(f"‚ö†Ô∏è  Aviso ao salvar m√©tricas: {e}")

# ================================================================================
# RESUMO EXECUTIVO
# ================================================================================

print("\n" + "=" * 80)
print("‚úÖ TESTE COMPLETO CONCLU√çDO!")
print("=" * 80)

print(f"\nüéØ RESUMO EXECUTIVO:")
print(f"   - Testadas 3 abordagens de otimiza√ß√£o")
print(f"   - Melhor resultado: {melhor_modelo} com {melhor_acc*100:.2f}% accuracy")
print(f"   - Ganho sobre V1: {comparacao.loc[melhor_idx, 'Œî Accuracy']:+.2f}%")

print(f"\nüìä Compara√ß√£o r√°pida:")
print(f"   V1 Original:        {acc_v1*100:.2f}%")
print(f"   A - Sem Weights:    {metricas_a['accuracy']*100:.2f}% ({((metricas_a['accuracy']-acc_v1)/acc_v1*100):+.2f}%)")
print(f"   B - Weights MAX=10: {metricas_b['accuracy']*100:.2f}% ({((metricas_b['accuracy']-acc_v1)/acc_v1*100):+.2f}%)")
print(f"   C - Filtrar Raras:  {metricas_c['accuracy']*100:.2f}% ({((metricas_c['accuracy']-acc_v1)/acc_v1*100):+.2f}%)")

print("\nüí° Pr√≥ximos passos sugeridos:")
if melhor_acc > acc_v1:
    print(f"   ‚úÖ Adotar {melhor_modelo} em produ√ß√£o")
    print(f"   ‚úÖ Retreinar modelo completo (treino + valida√ß√£o)")
    print(f"   ‚úÖ Avaliar no conjunto de teste final")
else:
    print(f"   ‚ö†Ô∏è  Nenhuma abordagem superou V1 original")
    print(f"   üí≠ Considerar outras estrat√©gias:")
    print(f"      - Feature selection mais sofisticada")
    print(f"      - Ensemble de modelos")
    print(f"      - Transfer learning de V1 para classes espec√≠ficas")

print("=" * 80)

In [None]:
# ================================================================================
# C√âLULA 14.2: RETREINAR MODELO A COM TREINO + VALIDA√á√ÉO
# ================================================================================

print("\nüöÄ RETREINAMENTO FINAL - ABORDAGEM A (PRODU√á√ÉO)")
print("=" * 80)
print("\nüìã Estrat√©gia:")
print("   - Combinar treino + valida√ß√£o para maximizar dados")
print("   - Manter configura√ß√£o vencedora da Abordagem A")
print("   - Avaliar no conjunto de TESTE (nunca visto)")

DATABASE_DESTINO = 'neac'
UF_FILTRO = 'SC'
ANO_REFERENCIA = 2024

# ================================================================================
# CARREGAR MODELO VENCEDOR
# ================================================================================

from pyspark.ml.classification import RandomForestClassificationModel

print("\nüì• Carregando modelo vencedor (Abordagem A)...")
model_a_path = f"/user/{spark.sparkContext.sparkUser()}/models/rf_abordagem_a_{UF_FILTRO}_{ANO_REFERENCIA}"
modelo_vencedor = RandomForestClassificationModel.load(model_a_path)
print(f"‚úÖ Modelo carregado: {model_a_path}")

# ================================================================================
# OP√á√ÉO 1: AVALIAR NO CONJUNTO DE TESTE
# ================================================================================

print("\nüìä AVALIANDO NO CONJUNTO DE TESTE (NUNCA VISTO)...")
print("-" * 80)

# Carregar dados de teste
df_test = spark.table(f"{DATABASE_DESTINO}.ecd_ml_test")
print(f"‚úÖ Teste: {df_test.count():,} registros")

# Aplicar mesmas transforma√ß√µes
from pyspark.sql.functions import col, when, lit, log1p, abs as spark_abs

print("\nüîß Aplicando feature engineering no teste...")

# Features de intera√ß√£o
df_test = df_test.withColumn(
    "razao_valor_final_inicial",
    when(col("valor_absoluto_inicial") > 0,
         col("valor_absoluto_final") / col("valor_absoluto_inicial")
    ).otherwise(lit(0.0))
)

df_test = df_test.withColumn("nivel_x_tamanho_descricao", col("nivel_conta") * col("tamanho_descricao"))
df_test = df_test.withColumn("nivel_x_tamanho_codigo", col("nivel_conta") * col("tamanho_codigo"))
df_test = df_test.withColumn("log_valor_absoluto_final", log1p(col("valor_absoluto_final")))
df_test = df_test.withColumn("log_valor_absoluto_inicial", log1p(col("valor_absoluto_inicial")))
df_test = df_test.withColumn("diferenca_absoluta_valores", 
                             spark_abs(col("valor_absoluto_final") - col("valor_absoluto_inicial")))

# Vetor de features
from pyspark.ml.feature import VectorAssembler

numeric_features = [
    'nivel_conta', 'tamanho_descricao', 'tamanho_codigo', 'tem_ponto_codigo',
    'tem_hifen_codigo', 'tem_underscore_codigo', 'valor_absoluto_final',
    'valor_absoluto_inicial', 'variacao_valor', 'tem_conta_superior',
    'tem_palavra_caixa', 'tem_palavra_banco', 'tem_palavra_estoque',
    'tem_palavra_cliente', 'tem_palavra_fornecedor', 'tem_palavra_salario',
    'tem_palavra_tributo', 'tem_palavra_receita', 'tem_palavra_despesa',
    'tem_palavra_financeiro', 'tem_palavra_imobilizado', 'tem_palavra_capital',
    'tem_palavra_lucro', 'razao_valor_final_inicial', 'nivel_x_tamanho_descricao',
    'nivel_x_tamanho_codigo', 'log_valor_absoluto_final', 'log_valor_absoluto_inicial',
    'diferenca_absoluta_valores'
]

categorical_features = [
    'origem_demonstrativo_index', 'ind_grp_bal_index', 'ind_grp_dre_index',
    'cd_natureza_index', 'tp_conta_agl_index', 'tp_conta_pc_index',
    'primeiro_digito_codigo_index', 'classificacao_nivel1_index'
]

all_features = numeric_features + categorical_features

assembler = VectorAssembler(inputCols=all_features, outputCol="features_v2", handleInvalid="keep")
df_test_assembled = assembler.transform(df_test)

print("‚úÖ Features preparadas no teste")

# Fazer predi√ß√µes
print("\nüîÆ Fazendo predi√ß√µes no teste...")
pred_test = modelo_vencedor.transform(df_test_assembled)

# Avaliar
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)
evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedPrecision"
)
evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedRecall"
)

accuracy_test = evaluator_accuracy.evaluate(pred_test)
f1_test = evaluator_f1.evaluate(pred_test)
precision_test = evaluator_precision.evaluate(pred_test)
recall_test = evaluator_recall.evaluate(pred_test)

print(f"\nüìä RESULTADOS NO CONJUNTO DE TESTE:")
print(f"   Accuracy:  {accuracy_test:.4f} ({accuracy_test*100:.2f}%)")
print(f"   F1-Score:  {f1_test:.4f}")
print(f"   Precision: {precision_test:.4f}")
print(f"   Recall:    {recall_test:.4f}")

# Compara√ß√£o
print(f"\nüìà COMPARA√á√ÉO:")
print(f"   Valida√ß√£o: {0.8088*100:.2f}%")
print(f"   Teste:     {accuracy_test*100:.2f}%")
print(f"   Diferen√ßa: {((accuracy_test - 0.8088) / 0.8088 * 100):+.2f}%")

# ================================================================================
# OP√á√ÉO 2: RETREINAR COM TREINO + VALIDA√á√ÉO (OPCIONAL)
# ================================================================================

print("\n" + "=" * 80)
print("üîÑ OP√á√ÉO: RETREINAR COM TREINO + VALIDA√á√ÉO COMBINADOS")
print("=" * 80)

resposta_retreinar = input("\nDeseja retreinar com treino+valida√ß√£o? (s/n): ")

if resposta_retreinar.lower() == 's':
    print("\nüîß Combinando treino + valida√ß√£o...")
    
    df_train = spark.table(f"{DATABASE_DESTINO}.ecd_ml_train")
    df_val = spark.table(f"{DATABASE_DESTINO}.ecd_ml_val")
    
    # Aplicar feature engineering em ambos
    for df_name, df in [("Treino", df_train), ("Valida√ß√£o", df_val)]:
        df = df.withColumn(
            "razao_valor_final_inicial",
            when(col("valor_absoluto_inicial") > 0,
                 col("valor_absoluto_final") / col("valor_absoluto_inicial")
            ).otherwise(lit(0.0))
        )
        df = df.withColumn("nivel_x_tamanho_descricao", col("nivel_conta") * col("tamanho_descricao"))
        df = df.withColumn("nivel_x_tamanho_codigo", col("nivel_conta") * col("tamanho_codigo"))
        df = df.withColumn("log_valor_absoluto_final", log1p(col("valor_absoluto_final")))
        df = df.withColumn("log_valor_absoluto_inicial", log1p(col("valor_absoluto_inicial")))
        df = df.withColumn("diferenca_absoluta_valores", 
                          spark_abs(col("valor_absoluto_final") - col("valor_absoluto_inicial")))
        
        if df_name == "Treino":
            df_train = df
        else:
            df_val = df
    
    # Combinar
    df_train_full = df_train.union(df_val)
    print(f"‚úÖ Dataset combinado: {df_train_full.count():,} registros")
    
    # Criar vetor
    df_train_full_assembled = assembler.transform(df_train_full)
    
    # Treinar
    from pyspark.ml.classification import RandomForestClassifier
    import time
    
    print("\nüå≤ Treinando Random Forest FINAL...")
    inicio = time.time()
    
    rf_final = RandomForestClassifier(
        featuresCol="features_v2",
        labelCol="label",
        predictionCol="prediction",
        probabilityCol="probability",
        numTrees=150,
        maxDepth=12,
        minInstancesPerNode=5,
        subsamplingRate=0.8,
        featureSubsetStrategy="sqrt",
        seed=42
    )
    
    modelo_final = rf_final.fit(df_train_full_assembled)
    tempo_treino = time.time() - inicio
    
    print(f"‚úÖ Treinamento conclu√≠do em {tempo_treino:.2f}s ({tempo_treino/60:.2f} min)")
    
    # Avaliar no teste
    print("\nüîÆ Avaliando modelo FINAL no teste...")
    pred_test_final = modelo_final.transform(df_test_assembled)
    
    accuracy_final = evaluator_accuracy.evaluate(pred_test_final)
    f1_final = evaluator_f1.evaluate(pred_test_final)
    precision_final = evaluator_precision.evaluate(pred_test_final)
    recall_final = evaluator_recall.evaluate(pred_test_final)
    
    print(f"\nüìä RESULTADOS MODELO FINAL (TREINO+VAL):")
    print(f"   Accuracy:  {accuracy_final:.4f} ({accuracy_final*100:.2f}%)")
    print(f"   F1-Score:  {f1_final:.4f}")
    print(f"   Precision: {precision_final:.4f}")
    print(f"   Recall:    {recall_final:.4f}")
    
    # Salvar modelo final
    print("\nüíæ Salvando modelo FINAL...")
    modelo_final_path = f"/user/{spark.sparkContext.sparkUser()}/models/rf_final_producao_{UF_FILTRO}_{ANO_REFERENCIA}"
    modelo_final.write().overwrite().save(modelo_final_path)
    print(f"‚úÖ Modelo FINAL salvo: {modelo_final_path}")
    
    # Salvar predi√ß√µes
    pred_final_save = pred_test_final.select(
        "id_ecd", "cnpj", "cd_conta", "descr_conta",
        "classificacao_nivel2", "label", "prediction", "probability"
    )
    
    spark.sql(f"DROP TABLE IF EXISTS {DATABASE_DESTINO}.ecd_ml_predictions_final_test")
    pred_final_save.write.mode("overwrite").saveAsTable(
        f"{DATABASE_DESTINO}.ecd_ml_predictions_final_test"
    )
    print(f"‚úÖ Predi√ß√µes FINAIS salvas: {DATABASE_DESTINO}.ecd_ml_predictions_final_test")

# ================================================================================
# RESUMO FINAL
# ================================================================================

print("\n" + "=" * 80)
print("‚úÖ PROCESSO CONCLU√çDO!")
print("=" * 80)

print(f"\nüéØ CONQUISTAS:")
print(f"   ‚úÖ Modelo vencedor: Abordagem A (sem class weights)")
print(f"   ‚úÖ Ganho: +3.79% accuracy sobre V1 original")
print(f"   ‚úÖ Avaliado no conjunto de teste")
print(f"   ‚úÖ Modelo pronto para produ√ß√£o")

print(f"\nüìä PERFORMANCE FINAL:")
print(f"   Valida√ß√£o: 80.88%")
print(f"   Teste:     {accuracy_test*100:.2f}%")

print(f"\nüí° LI√á√ïES APRENDIDAS:")
print(f"   1. Features de intera√ß√£o > Class weights")
print(f"   2. Simplicidade funciona melhor que complexidade")
print(f"   3. Class weights prejudicam quando mal calibrados")
print(f"   4. Features de palavras-chave s√£o essenciais")

print("=" * 80)

In [None]:
# ================================================================================
# AN√ÅLISE DE ERROS - ONDE O MODELO ERRA?
# ================================================================================

print("üîç AN√ÅLISE DE ERROS DO MODELO FINAL")
print("=" * 80)

# Carregar predi√ß√µes do teste
pred_test = spark.table("neac.ecd_ml_predictions_final_test")

# 1. MATRIZ DE CONFUS√ÉO POR CLASSE
print("\nüìä MATRIZ DE CONFUS√ÉO - Top 10 Classes com Mais Erros:")

erros_por_classe = pred_test \
    .filter(col("label") != col("prediction")) \
    .groupBy("label", "classificacao_nivel2") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(10)

erros_por_classe.show(10, truncate=False)

# 2. CLASSES MAIS CONFUNDIDAS
print("\nüîÄ PARES DE CLASSES MAIS CONFUNDIDOS:")

confusoes = pred_test \
    .filter(col("label") != col("prediction")) \
    .groupBy("label", "prediction") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(10)

confusoes.show(10, truncate=False)

# 3. EXEMPLOS DE ERROS
print("\nüìã EXEMPLOS DE CONTAS MAL CLASSIFICADAS:")

exemplos_erros = pred_test \
    .filter(col("label") != col("prediction")) \
    .select("cnpj", "cd_conta", "descr_conta", 
            "classificacao_nivel2", "label", "prediction") \
    .limit(20)

exemplos_erros.show(20, truncate=False)

# 4. ACUR√ÅCIA POR CLASSE
print("\nüìà ACUR√ÅCIA POR CLASSE:")

acuracia_classe = pred_test \
    .withColumn("correto", when(col("label") == col("prediction"), 1).otherwise(0)) \
    .groupBy("label", "classificacao_nivel2") \
    .agg(
        count("*").alias("total"),
        sum("correto").alias("acertos")
    ) \
    .withColumn("accuracy", col("acertos") / col("total") * 100) \
    .orderBy(col("accuracy"))

acuracia_classe.show(30, truncate=False)

In [None]:
# ================================================================================
# IMPORT√ÇNCIA DAS FEATURES - VERS√ÉO FINAL CORRIGIDA
# ================================================================================

from pyspark.ml.classification import RandomForestClassificationModel
import numpy as np
import builtins  # Para usar sum() do Python, n√£o do PySpark

print("\nüéØ IMPORT√ÇNCIA DAS FEATURES")
print("=" * 80)

# Carregar modelo
modelo = RandomForestClassificationModel.load(
    "/user/tsevero/models/rf_final_producao_SC_2024"
)

# Extrair import√¢ncias
importances = modelo.featureImportances
importances_array = importances.toArray()
indices_ordenados = importances_array.argsort()[::-1]

# Nomes das features (37 total)
feature_names = [
    # Num√©ricas base (23)
    'nivel_conta', 'tamanho_descricao', 'tamanho_codigo', 'tem_ponto_codigo',
    'tem_hifen_codigo', 'tem_underscore_codigo', 'valor_absoluto_final',
    'valor_absoluto_inicial', 'variacao_valor', 'tem_conta_superior',
    'tem_palavra_caixa', 'tem_palavra_banco', 'tem_palavra_estoque',
    'tem_palavra_cliente', 'tem_palavra_fornecedor', 'tem_palavra_salario',
    'tem_palavra_tributo', 'tem_palavra_receita', 'tem_palavra_despesa',
    'tem_palavra_financeiro', 'tem_palavra_imobilizado', 'tem_palavra_capital',
    'tem_palavra_lucro',
    # Features de intera√ß√£o (6)
    'razao_valor_final_inicial', 'nivel_x_tamanho_descricao',
    'nivel_x_tamanho_codigo', 'log_valor_absoluto_final', 
    'log_valor_absoluto_inicial', 'diferenca_absoluta_valores',
    # Categ√≥ricas (8)
    'origem_demonstrativo_index', 'ind_grp_bal_index', 'ind_grp_dre_index',
    'cd_natureza_index', 'tp_conta_agl_index', 'tp_conta_pc_index',
    'primeiro_digito_codigo_index', 'classificacao_nivel1_index'
]

print(f"\nüìä Total de features: {len(feature_names)}")
print(f"üìä Import√¢ncias extra√≠das: {len(importances_array)}")

# Validar
if len(feature_names) != len(importances_array):
    print(f"\n‚ö†Ô∏è  AVISO: N√∫mero de features n√£o coincide!")
    print(f"   Esperado: {len(feature_names)}")
    print(f"   Recebido: {len(importances_array)}")

print("\nüìã TOP 20 FEATURES MAIS IMPORTANTES:\n")
print(f"{'Rank':<5} {'Feature':<35} {'Import√¢ncia':<12} {'%':<8} {'Tipo'}")
print("-" * 90)

importancia_total = float(importances_array.sum())

for i, idx in enumerate(indices_ordenados[:20], 1):
    # Converter numpy.int64 para int Python
    idx_int = int(idx)
    
    nome = feature_names[idx_int]
    importancia = float(importances_array[idx_int])
    percentual = (importancia / importancia_total) * 100
    
    # Classificar tipo
    if nome in ['razao_valor_final_inicial', 'nivel_x_tamanho_descricao',
                'nivel_x_tamanho_codigo', 'log_valor_absoluto_final',
                'log_valor_absoluto_inicial', 'diferenca_absoluta_valores']:
        tipo = "üîó Intera√ß√£o"
    elif "_index" in nome:
        tipo = "üìä Categ√≥rica"
    else:
        tipo = "üî¢ Num√©rica"
    
    print(f"{i:<5} {nome:<35} {importancia:<12.6f} {percentual:<7.2f}% {tipo}")

# ================================================================================
# AN√ÅLISE ADICIONAL
# ================================================================================

print("\n" + "=" * 90)
print("üìä AN√ÅLISE POR TIPO DE FEATURE")
print("=" * 90)

# Categorizar features
features_por_tipo = {
    'üî¢ Num√©ricas Base': [],
    'üîó Intera√ß√£o': [],
    'üìä Categ√≥ricas': []
}

for idx, nome in enumerate(feature_names):
    importancia = float(importances_array[idx])
    
    if nome in ['razao_valor_final_inicial', 'nivel_x_tamanho_descricao',
                'nivel_x_tamanho_codigo', 'log_valor_absoluto_final',
                'log_valor_absoluto_inicial', 'diferenca_absoluta_valores']:
        features_por_tipo['üîó Intera√ß√£o'].append((nome, importancia))
    elif "_index" in nome:
        features_por_tipo['üìä Categ√≥ricas'].append((nome, importancia))
    else:
        features_por_tipo['üî¢ Num√©ricas Base'].append((nome, importancia))

# Calcular estat√≠sticas por tipo
print("\nüìà IMPORT√ÇNCIA AGREGADA POR TIPO:\n")
print(f"{'Tipo':<25} {'Qtd':<6} {'Soma Imp.':<15} {'M√©dia Imp.':<15} {'% Total'}")
print("-" * 90)

for tipo, features in features_por_tipo.items():
    qtd = len(features)
    # CORRE√á√ÉO: usar builtins.sum() ao inv√©s de sum()
    soma = builtins.sum(imp for _, imp in features)
    media = soma / qtd if qtd > 0 else 0
    percentual = (soma / importancia_total) * 100
    
    print(f"{tipo:<25} {qtd:<6} {soma:<15.6f} {media:<15.6f} {percentual:.2f}%")

# ================================================================================
# TOP FEATURES DE INTERA√á√ÉO
# ================================================================================

print("\n" + "=" * 90)
print("üîó AN√ÅLISE DETALHADA: FEATURES DE INTERA√á√ÉO")
print("=" * 90)

interacoes = [(nome, float(importances_array[idx])) 
              for idx, nome in enumerate(feature_names) 
              if nome in ['razao_valor_final_inicial', 'nivel_x_tamanho_descricao',
                         'nivel_x_tamanho_codigo', 'log_valor_absoluto_final',
                         'log_valor_absoluto_inicial', 'diferenca_absoluta_valores']]

interacoes_ordenadas = sorted(interacoes, key=lambda x: x[1], reverse=True)

print(f"\n{'Feature de Intera√ß√£o':<35} {'Import√¢ncia':<15} {'% Total':<10} {'Rank Geral'}")
print("-" * 90)

for nome, importancia in interacoes_ordenadas:
    percentual = (importancia / importancia_total) * 100
    
    # Encontrar rank geral
    rank_geral = None
    for i, idx in enumerate(indices_ordenados, 1):
        if feature_names[int(idx)] == nome:
            rank_geral = i
            break
    
    print(f"{nome:<35} {importancia:<15.6f} {percentual:<9.2f}% #{rank_geral}")

# ================================================================================
# TOP PALAVRAS-CHAVE
# ================================================================================

print("\n" + "=" * 90)
print("üî§ AN√ÅLISE DETALHADA: FEATURES DE PALAVRAS-CHAVE")
print("=" * 90)

palavras_chave = [(nome, float(importances_array[idx])) 
                  for idx, nome in enumerate(feature_names) 
                  if nome.startswith('tem_palavra_')]

palavras_ordenadas = sorted(palavras_chave, key=lambda x: x[1], reverse=True)

print(f"\n{'Palavra-Chave':<35} {'Import√¢ncia':<15} {'% Total':<10} {'Rank Geral'}")
print("-" * 90)

for nome, importancia in palavras_ordenadas:
    percentual = (importancia / importancia_total) * 100
    palavra = nome.replace('tem_palavra_', '').upper()
    
    # Encontrar rank geral
    rank_geral = None
    for i, idx in enumerate(indices_ordenados, 1):
        if feature_names[int(idx)] == nome:
            rank_geral = i
            break
    
    print(f"{palavra:<35} {importancia:<15.6f} {percentual:<9.2f}% #{rank_geral}")

# ================================================================================
# VISUALIZA√á√ÉO: IMPORT√ÇNCIAS ACUMULADAS
# ================================================================================

print("\n" + "=" * 90)
print("üìà IMPORT√ÇNCIA ACUMULADA (TOP FEATURES)")
print("=" * 90)

print(f"\n{'Top N Features':<20} {'Import√¢ncia Acumulada':<25} {'% do Total'}")
print("-" * 90)

for n in [5, 10, 15, 20, 30]:
    if n <= len(indices_ordenados):
        # CORRE√á√ÉO: usar builtins.sum()
        importancia_acumulada = builtins.sum(float(importances_array[int(idx)]) 
                                            for idx in indices_ordenados[:n])
        percentual = (importancia_acumulada / importancia_total) * 100
        print(f"Top {n:<17} {importancia_acumulada:<25.6f} {percentual:.2f}%")

# ================================================================================
# INSIGHTS
# ================================================================================

print("\n" + "=" * 90)
print("üí° INSIGHTS E DESCOBERTAS")
print("=" * 90)

# Calcular percentuais (CORRE√á√ÉO: usar builtins.sum())
total_interacao = builtins.sum(imp for _, imp in interacoes)
total_palavras = builtins.sum(imp for _, imp in palavras_chave)
perc_interacao = (total_interacao / importancia_total) * 100
perc_palavras = (total_palavras / importancia_total) * 100

print(f"\n1Ô∏è‚É£  Features de INTERA√á√ÉO representam {perc_interacao:.2f}% da import√¢ncia total")
if perc_interacao > 20:
    print(f"   ‚úÖ EXCELENTE! Intera√ß√µes foram fundamentais para o ganho de +3.79%")
elif perc_interacao > 10:
    print(f"   ‚úÖ BOM! Intera√ß√µes contribu√≠ram significativamente")
else:
    print(f"   ‚ÑπÔ∏è  Contribui√ß√£o moderada das intera√ß√µes")

print(f"\n2Ô∏è‚É£  Features de PALAVRAS-CHAVE representam {perc_palavras:.2f}% da import√¢ncia total")
if perc_palavras > 30:
    print(f"   ‚úÖ CONFIRMADO! Palavras-chave s√£o ESSENCIAIS (por isso V2 falhou ao remov√™-las)")
elif perc_palavras > 20:
    print(f"   ‚úÖ BOM! Palavras-chave s√£o muito importantes")
else:
    print(f"   ‚ÑπÔ∏è  Palavras-chave t√™m import√¢ncia moderada")

# Top feature de intera√ß√£o
if len(interacoes_ordenadas) > 0:
    top_interacao = interacoes_ordenadas[0]
    print(f"\n3Ô∏è‚É£  Melhor feature de intera√ß√£o: '{top_interacao[0]}'")
    print(f"   Import√¢ncia: {top_interacao[1]:.6f} ({(top_interacao[1]/importancia_total)*100:.2f}%)")

# Top palavra-chave
if len(palavras_ordenadas) > 0:
    top_palavra = palavras_ordenadas[0]
    palavra_limpa = top_palavra[0].replace('tem_palavra_', '').upper()
    print(f"\n4Ô∏è‚É£  Palavra-chave mais importante: '{palavra_limpa}'")
    print(f"   Import√¢ncia: {top_palavra[1]:.6f} ({(top_palavra[1]/importancia_total)*100:.2f}%)")

# Top 5 features gerais
print(f"\n5Ô∏è‚É£  Top 5 features mais importantes do modelo:")
for i, idx in enumerate(indices_ordenados[:5], 1):
    idx_int = int(idx)
    nome = feature_names[idx_int]
    importancia = float(importances_array[idx_int])
    print(f"   {i}. {nome}: {(importancia/importancia_total)*100:.2f}%")

# ================================================================================
# COMPARA√á√ÉO: ANTES vs DEPOIS DAS INTERA√á√ïES
# ================================================================================

print("\n" + "=" * 90)
print("‚öñÔ∏è  IMPACTO DAS FEATURES DE INTERA√á√ÉO")
print("=" * 90)

# Features base originais vs features de intera√ß√£o
features_originais = features_por_tipo['üî¢ Num√©ricas Base'] + features_por_tipo['üìä Categ√≥ricas']
features_novas = features_por_tipo['üîó Intera√ß√£o']

total_originais = builtins.sum(imp for _, imp in features_originais)
total_novas = builtins.sum(imp for _, imp in features_novas)

perc_originais = (total_originais / importancia_total) * 100
perc_novas = (total_novas / importancia_total) * 100

print(f"\nüìä Distribui√ß√£o de import√¢ncia:")
print(f"   Features ORIGINAIS (31):  {perc_originais:.2f}%")
print(f"   Features INTERA√á√ÉO (6):   {perc_novas:.2f}%")
print(f"\nüí° Interpreta√ß√£o:")

if perc_novas > 15:
    print(f"   ‚úÖ As 6 features de intera√ß√£o (16.2% das features) representam {perc_novas:.1f}% da import√¢ncia!")
    print(f"   ‚úÖ ROI excelente: cada feature de intera√ß√£o vale {perc_novas/6:.2f}% em m√©dia")
    print(f"   ‚úÖ JUSTIFICA o ganho de +3.79% accuracy!")
else:
    print(f"   ‚ÑπÔ∏è  Features de intera√ß√£o contribuem moderadamente ({perc_novas:.1f}%)")

# Efici√™ncia por feature
media_original = total_originais / len(features_originais) if len(features_originais) > 0 else 0
media_interacao = total_novas / len(features_novas) if len(features_novas) > 0 else 0

print(f"\nüìà Import√¢ncia m√©dia por feature:")
print(f"   Originais:  {media_original:.6f}")
print(f"   Intera√ß√£o:  {media_interacao:.6f}")

if media_interacao > media_original:
    ratio = media_interacao / media_original
    print(f"   ‚úÖ Features de intera√ß√£o s√£o {ratio:.2f}x mais importantes em m√©dia!")
else:
    print(f"   ‚ÑπÔ∏è  Features originais t√™m maior import√¢ncia m√©dia individual")

print("\n" + "=" * 90)
print("‚úÖ AN√ÅLISE DE FEATURE IMPORTANCE CONCLU√çDA!")
print("=" * 90)