In [None]:
import sys
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/poc")
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/plugins")
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/dags")

#Import libs python
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import date

#Import libs internas
from utils import spark_utils_session as utils

from hooks.hdfs.hdfs_helper import HdfsHelper
from jobs.job_base_config import BaseETLJobClass

import poc_helper
poc_helper.load_env("PROD")

In [None]:
def get_session(profile: str, dynamic_allocation_enabled: bool = True) -> utils.DBASparkAppSession:
    """Generates DBASparkAppSession."""
    
    app_name = "tsevero_argos"
    
    spark_builder = (utils.DBASparkAppSession
                     .builder
                     .setAppName(app_name)
                     .usingProcessProfile(profile)
                    )
    
    if dynamic_allocation_enabled:
        spark_builder.autoResourceManagement()

    return spark_builder.build()

session = get_session(profile='efd_t2')

In [None]:
session.sparkSession.sql("SHOW DATABASES").show(truncate=False)

In [None]:
from datetime import date, datetime
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings

# Configura√ß√µes visuais
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
warnings.filterwarnings('ignore')
plt.rcParams['figure.figsize'] = (14, 7)
plt.rcParams['font.size'] = 10

# Acesso ao SparkSession
spark = session.sparkSession

print("=" * 80)
print("SISTEMA ARGOS - AN√ÅLISE DE MUDAN√áA DE COMPORTAMENTO TRIBUT√ÅRIO")
print("=" * 80)
print(f"Iniciado em: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

print(f"‚úÖ Sess√£o Spark criada: {spark.sparkContext.appName}")
print(f"   Vers√£o Spark: {spark.version}")
print(f"   Spark UI: {spark.sparkContext.uiWebUrl}")

In [None]:
print("=" * 80)
print("CARREGAMENTO E VERIFICA√á√ÉO DOS DADOS")
print("=" * 80)

# Carregar dados principais
query_principal = """
SELECT 
    periodo,
    cnpj_emitente,
    nm_razao_social,
    gtin,
    ncm,
    descricao,
    aliq_emitente_periodo,
    aliq_emitente_media_historica,
    aliquota_ia,
    bc_total_periodo,
    diferenca_vs_ia_periodo,
    diferenca_vs_ia_historica,
    classificacao_mudanca,
    movimento_vs_ia,
    YEAR(to_date(periodo, 'yyyyMM')) as ano,
    MONTH(to_date(periodo, 'yyyyMM')) as mes
FROM niat.argos_mudanca_comportamento
WHERE periodo >= '202301'
"""

df_argos = spark.sql(query_principal)
df_argos.cache()

total_registros = df_argos.count()
print(f"\n‚úÖ Dados carregados: {total_registros:,} registros")

# Estat√≠sticas b√°sicas
print("\nüìä ESTAT√çSTICAS B√ÅSICAS:")
stats = df_argos.agg(
    countDistinct("cnpj_emitente").alias("empresas"),
    countDistinct("periodo").alias("periodos"),
    countDistinct(concat_ws("-", col("gtin"), col("ncm"))).alias("produtos"),
    sum("bc_total_periodo").alias("bc_total"),
    avg("diferenca_vs_ia_periodo").alias("diff_ia_media")
).collect()[0]

print(f"  ‚Ä¢ Empresas monitoradas: {stats['empresas']:,}")
print(f"  ‚Ä¢ Per√≠odos analisados: {stats['periodos']}")
print(f"  ‚Ä¢ Produtos distintos: {stats['produtos']:,}")
print(f"  ‚Ä¢ Base de C√°lculo Total: R$ {float(stats['bc_total']):,.2f}")
print(f"  ‚Ä¢ Diferen√ßa m√©dia vs IA: {float(stats['diff_ia_media'])*100:.2f}%")

# Criar view para uso nas pr√≥ximas an√°lises
df_argos.createOrReplaceTempView("dados_argos")
print("\n‚úÖ View 'dados_argos' criada para an√°lises subsequentes")

In [None]:
print("=" * 80)
print("1. AN√ÅLISE DE DISTRIBUI√á√ÉO POR CLASSIFICA√á√ÉO DE MUDAN√áA")
print("=" * 80)

# Query de distribui√ß√£o
dist_classificacao = spark.sql("""
SELECT 
    classificacao_mudanca,
    COUNT(*) as quantidade,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentual,
    COUNT(DISTINCT cnpj_emitente) as empresas,
    SUM(bc_total_periodo) as bc_total,
    AVG(diferenca_vs_ia_periodo) as diff_ia_media
FROM dados_argos
GROUP BY classificacao_mudanca
ORDER BY quantidade DESC
""")

dist_class_pd = dist_classificacao.toPandas()
dist_class_pd['bc_total'] = dist_class_pd['bc_total'].astype(float)

# Exibir tabela
print("\nDISTRIBUI√á√ÉO POR CLASSIFICA√á√ÉO:")
for _, row in dist_class_pd.iterrows():
    print(f"  ‚Ä¢ {row['classificacao_mudanca']:<25}: {row['quantidade']:>10,} ({row['percentual']:>5.1f}%)")
    print(f"    Empresas: {row['empresas']:,} | BC: R$ {row['bc_total']:,.2f}")

# Visualiza√ß√£o
fig, axes = plt.subplots(1, 3, figsize=(20, 6))
fig.suptitle('Distribui√ß√£o por Classifica√ß√£o de Mudan√ßa', fontsize=16, fontweight='bold')

# Gr√°fico 1: Pizza - Percentual
colors = ['#d62728', '#ff7f0e', '#2ca02c', '#1f77b4']
axes[0].pie(dist_class_pd['quantidade'], labels=dist_class_pd['classificacao_mudanca'], 
            autopct='%1.1f%%', startangle=90, colors=colors)
axes[0].set_title('Percentual de Registros')

# Gr√°fico 2: Barras - Quantidade
sns.barplot(data=dist_class_pd, y='classificacao_mudanca', x='quantidade', 
            ax=axes[1], palette=colors, orient='h')
axes[1].set_title('Quantidade de Casos')
axes[1].set_xlabel('N√∫mero de Registros')

# Gr√°fico 3: Barras - Impacto Financeiro
sns.barplot(data=dist_class_pd, y='classificacao_mudanca', x=dist_class_pd['bc_total']/1e6, 
            ax=axes[2], palette=colors, orient='h')
axes[2].set_title('Impacto Financeiro')
axes[2].set_xlabel('Base de C√°lculo (Milh√µes R$)')

plt.tight_layout()
plt.show()

In [None]:
print("=" * 80)
print("2. AN√ÅLISE DE MOVIMENTO vs AL√çQUOTA CORRETA (IA)")
print("=" * 80)

# Query de movimento
dist_movimento = spark.sql("""
SELECT 
    movimento_vs_ia,
    COUNT(*) as quantidade,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentual,
    COUNT(DISTINCT cnpj_emitente) as empresas,
    SUM(bc_total_periodo) as bc_total
FROM dados_argos
WHERE movimento_vs_ia != 'SEM_REFERENCIA_IA'
GROUP BY movimento_vs_ia
ORDER BY 
    CASE 
        WHEN movimento_vs_ia = 'APROXIMOU_DA_CORRETA' THEN 1
        WHEN movimento_vs_ia = 'MANTEVE_DISTANCIA' THEN 2
        ELSE 3
    END
""")

dist_mov_pd = dist_movimento.toPandas()
dist_mov_pd['bc_total'] = dist_mov_pd['bc_total'].astype(float)

# Exibir dados
print("\nDISTRIBUI√á√ÉO POR MOVIMENTO:")
for _, row in dist_mov_pd.iterrows():
    print(f"  ‚Ä¢ {row['movimento_vs_ia']:<25}: {row['quantidade']:>10,} ({row['percentual']:>5.1f}%)")
    print(f"    Empresas: {row['empresas']:,} | BC: R$ {row['bc_total']:,.2f}")

# Visualiza√ß√£o
fig, axes = plt.subplots(1, 2, figsize=(18, 6))
fig.suptitle('An√°lise de Movimento em Rela√ß√£o √† Al√≠quota Correta (IA)', fontsize=16, fontweight='bold')

# Gr√°fico 1: Barras horizontais
colors_mov = {'APROXIMOU_DA_CORRETA': '#2ca02c', 
              'MANTEVE_DISTANCIA': '#ff7f0e', 
              'AFASTOU_DA_CORRETA': '#d62728'}
palette_mov = [colors_mov[x] for x in dist_mov_pd['movimento_vs_ia']]

sns.barplot(data=dist_mov_pd, y='movimento_vs_ia', x='quantidade', 
            ax=axes[0], palette=palette_mov, orient='h')
axes[0].set_title('Quantidade de Casos')
axes[0].set_xlabel('N√∫mero de Registros')

# Gr√°fico 2: Impacto financeiro
sns.barplot(data=dist_mov_pd, y='movimento_vs_ia', x=dist_mov_pd['bc_total']/1e6, 
            ax=axes[1], palette=palette_mov, orient='h')
axes[1].set_title('Impacto Financeiro')
axes[1].set_xlabel('Base de C√°lculo (Milh√µes R$)')

plt.tight_layout()
plt.show()

# Taxa de Corre√ß√£o
taxa_correcao = dist_mov_pd[dist_mov_pd['movimento_vs_ia'] == 'APROXIMOU_DA_CORRETA']['percentual'].values[0]
print(f"\nüéØ TAXA DE CORRE√á√ÉO GERAL: {taxa_correcao:.1f}%")

In [None]:
print("=" * 80)
print("3. EVOLU√á√ÉO TEMPORAL DAS MUDAN√áAS")
print("=" * 80)

# Query temporal
evolucao = spark.sql("""
SELECT 
    periodo,
    to_date(periodo, 'yyyyMM') as periodo_dt,
    COUNT(*) as total_casos,
    COUNT(DISTINCT cnpj_emitente) as empresas_ativas,
    SUM(bc_total_periodo) as bc_total,
    SUM(CASE WHEN classificacao_mudanca = 'MUDANCA_EXTREMA' THEN 1 ELSE 0 END) as mudancas_extremas,
    SUM(CASE WHEN classificacao_mudanca = 'MUDANCA_SIGNIFICATIVA' THEN 1 ELSE 0 END) as mudancas_significativas,
    SUM(CASE WHEN movimento_vs_ia = 'APROXIMOU_DA_CORRETA' THEN 1 ELSE 0 END) as aproximou_ia,
    SUM(CASE WHEN movimento_vs_ia = 'AFASTOU_DA_CORRETA' THEN 1 ELSE 0 END) as afastou_ia,
    AVG(diferenca_vs_ia_periodo) as diff_ia_media
FROM dados_argos
GROUP BY periodo
ORDER BY periodo
""")

evolucao_pd = evolucao.toPandas()
evolucao_pd['bc_total'] = evolucao_pd['bc_total'].astype(float)
evolucao_pd['periodo_dt'] = pd.to_datetime(evolucao_pd['periodo_dt'])
evolucao_pd['taxa_correcao'] = (evolucao_pd['aproximou_ia'] / evolucao_pd['total_casos'] * 100)

# Exibir √∫ltimos 12 per√≠odos
print("\nEVOLU√á√ÉO DOS √öLTIMOS 12 PER√çODOS:")
for _, row in evolucao_pd.tail(12).iterrows():
    print(f"  {row['periodo']} ({row['periodo_dt'].strftime('%Y-%m')})")
    print(f"    Casos: {row['total_casos']:,} | Empresas: {row['empresas_ativas']:,}")
    print(f"    Extremas: {row['mudancas_extremas']} | Taxa Corre√ß√£o: {row['taxa_correcao']:.1f}%")

# Visualiza√ß√£o
fig, axes = plt.subplots(3, 1, figsize=(16, 12))
fig.suptitle('Evolu√ß√£o Temporal do Sistema ARGOS', fontsize=18, fontweight='bold')

# Gr√°fico 1: Total de casos e empresas
ax1 = axes[0]
ax1.plot(evolucao_pd['periodo_dt'], evolucao_pd['total_casos'], 
         marker='o', color='royalblue', linewidth=2, label='Total de Casos')
ax1_twin = ax1.twinx()
ax1_twin.bar(evolucao_pd['periodo_dt'], evolucao_pd['empresas_ativas'], 
             color='lightblue', alpha=0.5, width=20, label='Empresas Ativas')
ax1.set_ylabel('Total de Casos', color='royalblue')
ax1_twin.set_ylabel('Empresas Ativas', color='lightblue')
ax1.set_title('Volume de An√°lises por Per√≠odo')
ax1.grid(True, alpha=0.3)
ax1.legend(loc='upper left')
ax1_twin.legend(loc='upper right')

# Gr√°fico 2: Mudan√ßas extremas e significativas
ax2 = axes[1]
ax2.bar(evolucao_pd['periodo_dt'], evolucao_pd['mudancas_extremas'], 
        color='#d62728', alpha=0.7, label='Mudan√ßas Extremas', width=20)
ax2.bar(evolucao_pd['periodo_dt'], evolucao_pd['mudancas_significativas'], 
        bottom=evolucao_pd['mudancas_extremas'], color='#ff7f0e', 
        alpha=0.7, label='Mudan√ßas Significativas', width=20)
ax2.set_ylabel('Quantidade')
ax2.set_title('Distribui√ß√£o de Mudan√ßas Relevantes')
ax2.legend()
ax2.grid(True, alpha=0.3)

# Gr√°fico 3: Taxa de corre√ß√£o
ax3 = axes[2]
ax3.plot(evolucao_pd['periodo_dt'], evolucao_pd['taxa_correcao'], 
         marker='s', color='#2ca02c', linewidth=2, markersize=8)
ax3.axhline(y=evolucao_pd['taxa_correcao'].mean(), color='red', 
            linestyle='--', alpha=0.7, label=f'M√©dia: {evolucao_pd["taxa_correcao"].mean():.1f}%')
ax3.set_ylabel('Taxa de Corre√ß√£o (%)')
ax3.set_xlabel('Per√≠odo')
ax3.set_title('Evolu√ß√£o da Taxa de Corre√ß√£o (Aproxima√ß√£o da Al√≠quota IA)')
ax3.grid(True, alpha=0.3)
ax3.legend()

plt.tight_layout()
plt.show()

In [None]:
print("=" * 80)
print("4. RANKING DE EMPRESAS POR MUDAN√áAS EXTREMAS")
print("=" * 80)

# Query de ranking
ranking_empresas = spark.sql("""
SELECT 
    cnpj_emitente,
    nm_razao_social,
    COUNT(*) as total_casos,
    COUNT(DISTINCT periodo) as periodos_ativos,
    COUNT(DISTINCT CONCAT(gtin, '-', ncm)) as produtos_distintos,
    SUM(CASE WHEN classificacao_mudanca = 'MUDANCA_EXTREMA' THEN 1 ELSE 0 END) as mudancas_extremas,
    SUM(CASE WHEN movimento_vs_ia = 'APROXIMOU_DA_CORRETA' THEN 1 ELSE 0 END) as aproximou_ia,
    SUM(CASE WHEN movimento_vs_ia = 'AFASTOU_DA_CORRETA' THEN 1 ELSE 0 END) as afastou_ia,
    SUM(bc_total_periodo) as bc_total,
    AVG(diferenca_vs_ia_periodo) as diff_ia_media
FROM dados_argos
GROUP BY cnpj_emitente, nm_razao_social
HAVING COUNT(*) >= 10
ORDER BY mudancas_extremas DESC, bc_total DESC
LIMIT 20
""")

ranking_pd = ranking_empresas.toPandas()
ranking_pd['bc_total'] = ranking_pd['bc_total'].astype(float)
ranking_pd['taxa_extremas'] = (ranking_pd['mudancas_extremas'] / ranking_pd['total_casos'] * 100)
ranking_pd['taxa_correcao'] = (ranking_pd['aproximou_ia'] / ranking_pd['total_casos'] * 100)

# Exibir ranking
print("\nTOP 20 EMPRESAS POR MUDAN√áAS EXTREMAS:")
for i, row in ranking_pd.iterrows():
    print(f"\n{i+1:2d}. {row['nm_razao_social'][:50]}")
    print(f"    CNPJ: {row['cnpj_emitente']}")
    print(f"    Total de Casos: {row['total_casos']:,} | Produtos: {row['produtos_distintos']:,}")
    print(f"    Mudan√ßas Extremas: {row['mudancas_extremas']} ({row['taxa_extremas']:.1f}%)")
    print(f"    Taxa de Corre√ß√£o: {row['taxa_correcao']:.1f}%")
    print(f"    Base de C√°lculo: R$ {row['bc_total']:,.2f}")

# Visualiza√ß√£o
fig, axes = plt.subplots(2, 2, figsize=(20, 14))
fig.suptitle('An√°lise das Top 20 Empresas Cr√≠ticas', fontsize=18, fontweight='bold')

# Gr√°fico 1: Mudan√ßas extremas
sns.barplot(data=ranking_pd.head(15), y='nm_razao_social', x='mudancas_extremas', 
            ax=axes[0,0], palette='Reds_r', orient='h')
axes[0,0].set_title('Top 15: N√∫mero de Mudan√ßas Extremas')
axes[0,0].set_xlabel('Quantidade')
axes[0,0].set_ylabel('')

# Gr√°fico 2: Taxa de corre√ß√£o
ranking_pd_sorted = ranking_pd.head(15).sort_values('taxa_correcao', ascending=False)
colors_correcao = ['#2ca02c' if x >= 50 else '#ff7f0e' if x >= 30 else '#d62728' 
                   for x in ranking_pd_sorted['taxa_correcao']]
sns.barplot(data=ranking_pd_sorted, y='nm_razao_social', x='taxa_correcao', 
            ax=axes[0,1], palette=colors_correcao, orient='h')
axes[0,1].set_title('Top 15: Taxa de Corre√ß√£o (%)')
axes[0,1].set_xlabel('Percentual (%)')
axes[0,1].set_ylabel('')
axes[0,1].axvline(x=50, color='red', linestyle='--', alpha=0.5)

# Gr√°fico 3: Base de c√°lculo
sns.barplot(data=ranking_pd.head(15), y='nm_razao_social', x=ranking_pd.head(15)['bc_total']/1e6, 
            ax=axes[1,0], palette='viridis', orient='h')
axes[1,0].set_title('Top 15: Impacto Financeiro')
axes[1,0].set_xlabel('Base de C√°lculo (Milh√µes R$)')
axes[1,0].set_ylabel('')

# Gr√°fico 4: Scatter - Taxa extremas vs Taxa corre√ß√£o
scatter_data = ranking_pd.head(20)
axes[1,1].scatter(scatter_data['taxa_extremas'], scatter_data['taxa_correcao'], 
                  s=scatter_data['bc_total']/1e5, alpha=0.6, c=scatter_data['mudancas_extremas'], 
                  cmap='RdYlGn_r')
axes[1,1].set_xlabel('Taxa de Mudan√ßas Extremas (%)')
axes[1,1].set_ylabel('Taxa de Corre√ß√£o (%)')
axes[1,1].set_title('Rela√ß√£o: Taxa Extremas vs Taxa Corre√ß√£o\n(tamanho = BC total)')
axes[1,1].grid(True, alpha=0.3)
axes[1,1].axhline(y=50, color='green', linestyle='--', alpha=0.5, label='Meta 50%')
axes[1,1].axvline(x=30, color='red', linestyle='--', alpha=0.5, label='Cr√≠tico 30%')
axes[1,1].legend()

plt.tight_layout()
plt.show()

In [None]:
print("=" * 80)
print("5. AN√ÅLISE SETORIAL POR NCM (2 D√çGITOS)")
print("=" * 80)

# Query setorial
analise_setorial = spark.sql("""
SELECT 
    SUBSTR(CAST(ncm AS STRING), 1, 2) as setor_ncm,
    COUNT(*) as total_casos,
    COUNT(DISTINCT cnpj_emitente) as empresas,
    COUNT(DISTINCT CONCAT(gtin, '-', ncm)) as produtos,
    SUM(bc_total_periodo) as bc_total,
    SUM(CASE WHEN classificacao_mudanca = 'MUDANCA_EXTREMA' THEN 1 ELSE 0 END) as mudancas_extremas,
    SUM(CASE WHEN movimento_vs_ia = 'APROXIMOU_DA_CORRETA' THEN 1 ELSE 0 END) as aproximou_ia,
    AVG(diferenca_vs_ia_periodo) as diff_ia_media
FROM dados_argos
WHERE LENGTH(CAST(ncm AS STRING)) >= 2
GROUP BY SUBSTR(CAST(ncm AS STRING), 1, 2)
HAVING COUNT(*) >= 100
ORDER BY mudancas_extremas DESC
LIMIT 20
""")

setorial_pd = analise_setorial.toPandas()
setorial_pd['bc_total'] = setorial_pd['bc_total'].astype(float)
setorial_pd['taxa_extremas'] = (setorial_pd['mudancas_extremas'] / setorial_pd['total_casos'] * 100)
setorial_pd['taxa_correcao'] = (setorial_pd['aproximou_ia'] / setorial_pd['total_casos'] * 100)

# Dicion√°rio de descri√ß√µes NCM
ncm_desc = {
    '01': 'Animais Vivos', '02': 'Carnes', '04': 'Latic√≠nios',
    '17': 'A√ß√∫cares', '19': 'Cereais', '22': 'Bebidas',
    '33': 'Cosm√©ticos', '84': 'M√°quinas', '85': 'El√©tricos'
}
setorial_pd['descricao'] = setorial_pd['setor_ncm'].map(lambda x: ncm_desc.get(x, f'Se√ß√£o {x}'))

# Exibir an√°lise
print("\nTOP 20 SETORES POR MUDAN√áAS EXTREMAS:")
for _, row in setorial_pd.iterrows():
    print(f"\nSetor {row['setor_ncm']}xx - {row['descricao']}")
    print(f"  Total: {row['total_casos']:,} casos | Empresas: {row['empresas']:,} | Produtos: {row['produtos']:,}")
    print(f"  Extremas: {row['mudancas_extremas']} ({row['taxa_extremas']:.1f}%)")
    print(f"  Taxa Corre√ß√£o: {row['taxa_correcao']:.1f}%")
    print(f"  BC Total: R$ {row['bc_total']:,.2f}")

# Visualiza√ß√£o
fig, axes = plt.subplots(2, 2, figsize=(20, 12))
fig.suptitle('An√°lise Setorial por NCM (2 d√≠gitos)', fontsize=18, fontweight='bold')

# Gr√°fico 1: Mudan√ßas extremas por setor
setorial_top15 = setorial_pd.head(15)
sns.barplot(data=setorial_top15, x='mudancas_extremas', y='descricao', 
            ax=axes[0,0], palette='rocket', orient='h')
axes[0,0].set_title('Top 15: Mudan√ßas Extremas por Setor')
axes[0,0].set_xlabel('Quantidade de Mudan√ßas Extremas')

# Gr√°fico 2: Taxa de corre√ß√£o
setorial_corr = setorial_pd.sort_values('taxa_correcao', ascending=False).head(15)
sns.barplot(data=setorial_corr, x='taxa_correcao', y='descricao', 
            ax=axes[0,1], palette='viridis', orient='h')
axes[0,1].set_title('Top 15: Taxa de Corre√ß√£o por Setor')
axes[0,1].set_xlabel('Taxa de Corre√ß√£o (%)')

# Gr√°fico 3: Impacto financeiro
setorial_bc = setorial_pd.sort_values('bc_total', ascending=False).head(15)
sns.barplot(data=setorial_bc, x=setorial_bc['bc_total']/1e6, y='descricao', 
            ax=axes[1,0], palette='magma', orient='h')
axes[1,0].set_title('Top 15: Impacto Financeiro por Setor')
axes[1,0].set_xlabel('Base de C√°lculo (Milh√µes R$)')

# Gr√°fico 4: Scatter comparativo
axes[1,1].scatter(setorial_pd['taxa_extremas'], setorial_pd['taxa_correcao'], 
                  s=setorial_pd['empresas']*2, alpha=0.6, 
                  c=setorial_pd['mudancas_extremas'], cmap='coolwarm')
axes[1,1].set_xlabel('Taxa de Mudan√ßas Extremas (%)')
axes[1,1].set_ylabel('Taxa de Corre√ß√£o (%)')
axes[1,1].set_title('Rela√ß√£o: Taxa Extremas vs Corre√ß√£o por Setor\n(tamanho = n¬∫ empresas)')
axes[1,1].grid(True, alpha=0.3)

# Adicionar labels aos pontos principais
for _, row in setorial_pd.head(10).iterrows():
    axes[1,1].annotate(row['setor_ncm'], 
                       (row['taxa_extremas'], row['taxa_correcao']),
                       fontsize=8, alpha=0.7)

plt.tight_layout()
plt.show()

In [None]:
print("=" * 80)
print("6. DASHBOARD EXECUTIVO - KPIs PRINCIPAIS")
print("=" * 80)

# Query de KPIs
kpis = spark.sql("""
SELECT 
    COUNT(*) as total_registros,
    COUNT(DISTINCT cnpj_emitente) as total_empresas,
    COUNT(DISTINCT periodo) as total_periodos,
    COUNT(DISTINCT CONCAT(gtin, '-', ncm)) as total_produtos,
    SUM(bc_total_periodo) as bc_total_sistema,
    
    SUM(CASE WHEN classificacao_mudanca = 'MUDANCA_EXTREMA' THEN 1 ELSE 0 END) as mudancas_extremas,
    SUM(CASE WHEN classificacao_mudanca = 'MUDANCA_SIGNIFICATIVA' THEN 1 ELSE 0 END) as mudancas_significativas,
    
    SUM(CASE WHEN movimento_vs_ia = 'APROXIMOU_DA_CORRETA' THEN 1 ELSE 0 END) as aproximou_ia,
    SUM(CASE WHEN movimento_vs_ia = 'AFASTOU_DA_CORRETA' THEN 1 ELSE 0 END) as afastou_ia,
    SUM(CASE WHEN movimento_vs_ia = 'MANTEVE_DISTANCIA' THEN 1 ELSE 0 END) as manteve_distancia,
    
    SUM(CASE WHEN movimento_vs_ia = 'APROXIMOU_DA_CORRETA' 
        THEN bc_total_periodo * ABS(diferenca_vs_ia_historica) ELSE 0 END) as impacto_correcoes,
    
    AVG(diferenca_vs_ia_periodo) as diff_ia_media,
    STDDEV(diferenca_vs_ia_periodo) as diff_ia_desvio
    
FROM niat.argos_mudanca_comportamento
WHERE movimento_vs_ia != 'SEM_REFERENCIA_IA'
  AND periodo >= '202301'
""")

kpi_data = kpis.collect()[0]

# Converter Decimals para float
bc_total_sistema = float(kpi_data['bc_total_sistema'])
impacto_correcoes = float(kpi_data['impacto_correcoes'])
diff_ia_media = float(kpi_data['diff_ia_media'])
diff_ia_desvio = float(kpi_data['diff_ia_desvio'])

# Calcular m√©tricas derivadas
taxa_correcao_global = (kpi_data['aproximou_ia'] / kpi_data['total_registros'] * 100)
taxa_extremas = (kpi_data['mudancas_extremas'] / kpi_data['total_registros'] * 100)
empresas_corrigindo = spark.sql("""
    SELECT COUNT(DISTINCT cnpj_emitente) as qtd
    FROM niat.argos_mudanca_comportamento 
    WHERE movimento_vs_ia = 'APROXIMOU_DA_CORRETA'
      AND periodo >= '202301'
""").collect()[0]['qtd']

# Exibir KPIs
print(f"\nüìä M√âTRICAS PRINCIPAIS DO SISTEMA ARGOS:")
print(f"  ‚Ä¢ Total de Registros Analisados: {kpi_data['total_registros']:,}")
print(f"  ‚Ä¢ Empresas Monitoradas: {kpi_data['total_empresas']:,}")
print(f"  ‚Ä¢ Per√≠odos Analisados: {kpi_data['total_periodos']}")
print(f"  ‚Ä¢ Produtos Distintos: {kpi_data['total_produtos']:,}")
print(f"  ‚Ä¢ Base de C√°lculo Total: R$ {bc_total_sistema:,.2f}")

print(f"\nüéØ INDICADORES DE EFETIVIDADE:")
print(f"  ‚Ä¢ Taxa de Corre√ß√£o Global: {taxa_correcao_global:.1f}%")
print(f"  ‚Ä¢ Empresas em Corre√ß√£o: {empresas_corrigindo:,} ({empresas_corrigindo/kpi_data['total_empresas']*100:.1f}%)")
print(f"  ‚Ä¢ Impacto Arrecadat√°rio (Corre√ß√µes): R$ {impacto_correcoes:,.2f}")

print(f"\n‚ö†Ô∏è INDICADORES DE RISCO:")
print(f"  ‚Ä¢ Mudan√ßas Extremas: {kpi_data['mudancas_extremas']:,} ({taxa_extremas:.1f}%)")
print(f"  ‚Ä¢ Mudan√ßas Significativas: {kpi_data['mudancas_significativas']:,}")
print(f"  ‚Ä¢ Afastou da Al√≠quota Correta: {kpi_data['afastou_ia']:,}")

print(f"\nüìà ESTAT√çSTICAS:")
print(f"  ‚Ä¢ Diferen√ßa M√©dia vs IA: {diff_ia_media*100:+.2f}%")
print(f"  ‚Ä¢ Desvio Padr√£o: ¬±{diff_ia_desvio*100:.2f}%")

# Visualiza√ß√£o Dashboard (com convers√µes corretas)
fig = plt.figure(figsize=(20, 12))
gs = fig.add_gridspec(3, 3, hspace=0.3, wspace=0.3)

# KPI 1: Distribui√ß√£o de Movimentos
ax1 = fig.add_subplot(gs[0, 0])
movimentos = ['Aproximou\nIA', 'Manteve\nDist√¢ncia', 'Afastou\nIA']
valores_mov = [kpi_data['aproximou_ia'], kpi_data['manteve_distancia'], kpi_data['afastou_ia']]
cores_mov = ['#2ca02c', '#ff7f0e', '#d62728']
ax1.pie(valores_mov, labels=movimentos, autopct='%1.1f%%', colors=cores_mov, startangle=90)
ax1.set_title('Distribui√ß√£o de Movimentos vs IA', fontweight='bold')

# KPI 2: Taxa de Corre√ß√£o
ax2 = fig.add_subplot(gs[0, 1])
ax2.barh(['Taxa de Corre√ß√£o'], [taxa_correcao_global], color='#2ca02c', height=0.5)
ax2.barh(['Taxa de Corre√ß√£o'], [100-taxa_correcao_global], left=[taxa_correcao_global], 
         color='lightgray', height=0.5, alpha=0.3)
ax2.set_xlim(0, 100)
ax2.set_xlabel('Percentual (%)')
ax2.set_title('Taxa de Corre√ß√£o Global', fontweight='bold')
ax2.text(taxa_correcao_global/2, 0, f'{taxa_correcao_global:.1f}%', 
         ha='center', va='center', fontsize=14, fontweight='bold', color='white')

# KPI 7: Impacto Financeiro (CORRIGIDO)
ax7 = fig.add_subplot(gs[2, 2])
impactos = ['BC Total\nSistema', 'Impacto\nCorre√ß√µes']
valores_impacto = [bc_total_sistema/1e6, impacto_correcoes/1e6]  # Agora s√£o floats
cores_impacto = ['#1f77b4', '#2ca02c']
bars = ax7.bar(impactos, valores_impacto, color=cores_impacto)
ax7.set_ylabel('Valores (Milh√µes R$)')
ax7.set_title('Impacto Financeiro', fontweight='bold')
for bar in bars:
    height = bar.get_height()
    ax7.text(bar.get_x() + bar.get_width()/2., height,
             f'R$ {height:.1f}M', ha='center', va='bottom', fontweight='bold')

plt.suptitle('DASHBOARD EXECUTIVO - SISTEMA ARGOS', fontsize=20, fontweight='bold', y=0.98)
plt.show()

In [None]:
print("=" * 80)
print("7. AN√ÅLISE DE PRODUTOS COM MAIOR VOLATILIDADE")
print("=" * 80)

# Query de produtos
produtos_volateis = spark.sql("""
SELECT 
    gtin,
    ncm,
    descricao,
    aliquota_ia,
    COUNT(DISTINCT cnpj_emitente) as qtd_empresas,
    COUNT(DISTINCT periodo) as qtd_periodos,
    COUNT(*) as total_registros,
    AVG(aliq_emitente_periodo) as aliq_media,
    STDDEV(aliq_emitente_periodo) as aliq_desvio,
    MIN(aliq_emitente_periodo) as aliq_minima,
    MAX(aliq_emitente_periodo) as aliq_maxima,
    AVG(diferenca_vs_ia_periodo) as diff_ia_media,
    SUM(bc_total_periodo) as bc_total,
    SUM(CASE WHEN classificacao_mudanca IN ('MUDANCA_EXTREMA', 'MUDANCA_SIGNIFICATIVA') 
        THEN 1 ELSE 0 END) as mudancas_relevantes
FROM niat.argos_mudanca_comportamento
WHERE aliquota_ia IS NOT NULL
  AND periodo >= '202301'
GROUP BY gtin, ncm, descricao, aliquota_ia
HAVING COUNT(DISTINCT cnpj_emitente) >= 3
ORDER BY aliq_desvio DESC, bc_total DESC
LIMIT 30
""")

produtos_pd = produtos_volateis.toPandas()

# Converter TODOS os campos num√©ricos de Decimal para float
numeric_cols = ['aliquota_ia', 'aliq_media', 'aliq_desvio', 'aliq_minima', 
                'aliq_maxima', 'diff_ia_media', 'bc_total']
for col in numeric_cols:
    if col in produtos_pd.columns:
        produtos_pd[col] = pd.to_numeric(produtos_pd[col], errors='coerce')

# Calcular coeficiente de varia√ß√£o (agora com valores float)
produtos_pd['coef_variacao'] = produtos_pd.apply(
    lambda row: (row['aliq_desvio'] / row['aliq_media'] * 100) 
    if row['aliq_media'] != 0 else 0, axis=1
)

# Exibir produtos
print("\nTOP 30 PRODUTOS COM MAIOR VOLATILIDADE:")
for i, row in produtos_pd.head(15).iterrows():
    print(f"\n{i+1}. {row['descricao'][:50]}")
    print(f"   GTIN: {row['gtin']} | NCM: {row['ncm']}")
    print(f"   Empresas: {int(row['qtd_empresas'])} | Per√≠odos: {int(row['qtd_periodos'])}")
    print(f"   Al√≠quota IA: {row['aliquota_ia']*100:.2f}% | Praticada M√©dia: {row['aliq_media']*100:.2f}%")
    print(f"   Desvio: ¬±{row['aliq_desvio']*100:.2f}% | Range: {row['aliq_minima']*100:.2f}% a {row['aliq_maxima']*100:.2f}%")
    print(f"   BC Total: R$ {row['bc_total']:,.2f} | Mudan√ßas Relevantes: {int(row['mudancas_relevantes'])}")

# Visualiza√ß√£o
fig, axes = plt.subplots(2, 2, figsize=(20, 12))
fig.suptitle('An√°lise de Produtos com Maior Volatilidade', fontsize=16, fontweight='bold')

# Gr√°fico 1: Top 15 por desvio padr√£o
top15_vol = produtos_pd.head(15)
sns.barplot(data=top15_vol, y='descricao', x=top15_vol['aliq_desvio']*100, 
            ax=axes[0,0], palette='Reds_r', orient='h')
axes[0,0].set_title('Top 15: Maior Desvio Padr√£o de Al√≠quota')
axes[0,0].set_xlabel('Desvio Padr√£o (%)')
axes[0,0].set_ylabel('')

# Gr√°fico 2: Impacto financeiro
top15_bc = produtos_pd.nlargest(15, 'bc_total')
sns.barplot(data=top15_bc, y='descricao', x=top15_bc['bc_total']/1e6, 
            ax=axes[0,1], palette='viridis', orient='h')
axes[0,1].set_title('Top 15: Maior Impacto Financeiro')
axes[0,1].set_xlabel('Base de C√°lculo (Milh√µes R$)')
axes[0,1].set_ylabel('')

# Gr√°fico 3: Scatter - Volatilidade vs Empresas
axes[1,0].scatter(produtos_pd['qtd_empresas'], produtos_pd['aliq_desvio']*100, 
                  s=np.log1p(produtos_pd['bc_total'])/10, alpha=0.6, 
                  c=produtos_pd['mudancas_relevantes'], cmap='YlOrRd',
                  edgecolors='black', linewidth=0.5)
axes[1,0].set_xlabel('Quantidade de Empresas')
axes[1,0].set_ylabel('Desvio Padr√£o da Al√≠quota (%)')
axes[1,0].set_title('Volatilidade vs Abrang√™ncia\n(tamanho = log(BC))')
axes[1,0].grid(True, alpha=0.3)

# Gr√°fico 4: Histograma de desvio padr√£o
axes[1,1].hist(produtos_pd['aliq_desvio']*100, bins=20, color='coral', 
               edgecolor='black', alpha=0.7)
axes[1,1].set_xlabel('Desvio Padr√£o da Al√≠quota (%)')
axes[1,1].set_ylabel('Frequ√™ncia')
axes[1,1].set_title('Distribui√ß√£o do Desvio Padr√£o')
axes[1,1].axvline(produtos_pd['aliq_desvio'].median()*100, color='red', 
                  linestyle='--', label=f'Mediana: {produtos_pd["aliq_desvio"].median()*100:.2f}%')
axes[1,1].legend()

plt.tight_layout()
plt.show()

In [None]:
print("=" * 80)
print("8. IDENTIFICA√á√ÉO DE EMPRESAS COM COMPORTAMENTO SISTEM√ÅTICO")
print("=" * 80)

# Query de empresas sistem√°ticas
empresas_sistematicas = spark.sql("""
WITH empresa_analise AS (
    SELECT 
        cnpj_emitente,
        nm_razao_social,
        COUNT(*) as total_casos,
        COUNT(DISTINCT periodo) as periodos_ativos,
        SUM(bc_total_periodo) as bc_total,
        AVG(diferenca_vs_ia_periodo) as diff_ia_media,
        STDDEV(diferenca_vs_ia_periodo) as diff_ia_desvio,
        
        -- Padr√µes de comportamento
        SUM(CASE WHEN classificacao_mudanca = 'MUDANCA_EXTREMA' THEN 1 ELSE 0 END) as mudancas_extremas,
        SUM(CASE WHEN movimento_vs_ia = 'AFASTOU_DA_CORRETA' THEN 1 ELSE 0 END) as afastou_ia,
        SUM(CASE WHEN movimento_vs_ia = 'APROXIMOU_DA_CORRETA' THEN 1 ELSE 0 END) as aproximou_ia,
        
        -- Taxas percentuais
        AVG(CASE WHEN classificacao_mudanca = 'MUDANCA_EXTREMA' THEN 100.0 ELSE 0.0 END) as taxa_extremas,
        AVG(CASE WHEN movimento_vs_ia = 'AFASTOU_DA_CORRETA' THEN 100.0 ELSE 0.0 END) as taxa_afastou
        
    FROM dados_argos
    WHERE movimento_vs_ia != 'SEM_REFERENCIA_IA'
    GROUP BY cnpj_emitente, nm_razao_social
    HAVING COUNT(*) >= 10
)
SELECT 
    *,
    CASE 
        WHEN taxa_extremas >= 60 AND taxa_afastou >= 50 THEN 'CR√çTICO - Afastamento Sistem√°tico'
        WHEN taxa_extremas >= 40 THEN 'ALTO RISCO - Alta Volatilidade'
        WHEN taxa_afastou >= 60 THEN 'ALTO RISCO - Piora Consistente'
        WHEN aproximou_ia >= (total_casos * 0.6) THEN 'POSITIVO - Corre√ß√£o Sistem√°tica'
        ELSE 'MODERADO - Comportamento Irregular'
    END as perfil_risco
FROM empresa_analise
ORDER BY taxa_extremas DESC, taxa_afastou DESC, bc_total DESC
LIMIT 30
""")

sist_pd = empresas_sistematicas.toPandas()
sist_pd['bc_total'] = sist_pd['bc_total'].astype(float)

# Exibir empresas sistem√°ticas
print("\nEMPRESAS COM COMPORTAMENTO SISTEM√ÅTICO:")
for i, row in sist_pd.iterrows():
    print(f"\n{i+1}. {row['nm_razao_social'][:50]}")
    print(f"   Perfil: {row['perfil_risco']}")
    print(f"   Casos: {row['total_casos']:,} | Per√≠odos: {row['periodos_ativos']}")
    print(f"   Taxa Extremas: {row['taxa_extremas']:.1f}% | Taxa Afastou IA: {row['taxa_afastou']:.1f}%")
    print(f"   Diferen√ßa IA M√©dia: {row['diff_ia_media']*100:+.2f}% (¬±{row['diff_ia_desvio']*100:.2f}%)")
    print(f"   BC Total: R$ {row['bc_total']:,.2f}")

# Visualiza√ß√£o
fig, axes = plt.subplots(2, 2, figsize=(20, 12))
fig.suptitle('An√°lise de Empresas com Comportamento Sistem√°tico', fontsize=18, fontweight='bold')

# Gr√°fico 1: Distribui√ß√£o por perfil
perfil_dist = sist_pd['perfil_risco'].value_counts()
colors_perfil = {'CR√çTICO - Afastamento Sistem√°tico': '#d62728',
                 'ALTO RISCO - Alta Volatilidade': '#ff7f0e',
                 'ALTO RISCO - Piora Consistente': '#ff9896',
                 'POSITIVO - Corre√ß√£o Sistem√°tica': '#2ca02c',
                 'MODERADO - Comportamento Irregular': '#ffdd70'}
axes[0,0].pie(perfil_dist.values, labels=perfil_dist.index, autopct='%1.1f%%',
              colors=[colors_perfil.get(x, '#gray') for x in perfil_dist.index])
axes[0,0].set_title('Distribui√ß√£o por Perfil de Risco')

# Gr√°fico 2: Taxa extremas vs Taxa afastou
scatter_colors = [colors_perfil.get(x, 'gray') for x in sist_pd['perfil_risco']]
axes[0,1].scatter(sist_pd['taxa_extremas'], sist_pd['taxa_afastou'], 
                  s=sist_pd['bc_total']/1e5, alpha=0.6, c=scatter_colors)
axes[0,1].set_xlabel('Taxa de Mudan√ßas Extremas (%)')
axes[0,1].set_ylabel('Taxa de Afastamento da IA (%)')
axes[0,1].set_title('Rela√ß√£o: Volatilidade vs Afastamento\n(tamanho = BC total)')
axes[0,1].axhline(y=50, color='red', linestyle='--', alpha=0.5)
axes[0,1].axvline(x=40, color='red', linestyle='--', alpha=0.5)
axes[0,1].grid(True, alpha=0.3)

# Gr√°fico 3: Top 15 por taxa de extremas
top15_sist = sist_pd.head(15)
sns.barplot(data=top15_sist, y='nm_razao_social', x='taxa_extremas', 
            ax=axes[1,0], palette='Reds_r', orient='h')
axes[1,0].set_title('Top 15: Maior Taxa de Mudan√ßas Extremas')
axes[1,0].set_xlabel('Taxa de Mudan√ßas Extremas (%)')
axes[1,0].set_ylabel('')

# Gr√°fico 4: Empresas em corre√ß√£o
empresas_positivas = sist_pd[sist_pd['perfil_risco'] == 'POSITIVO - Corre√ß√£o Sistem√°tica'].head(10)
if not empresas_positivas.empty:
    sns.barplot(data=empresas_positivas, y='nm_razao_social', x='aproximou_ia', 
                ax=axes[1,1], palette='Greens_r', orient='h')
    axes[1,1].set_title('Top 10: Empresas em Corre√ß√£o Sistem√°tica')
    axes[1,1].set_xlabel('Quantidade de Casos Corrigidos')
    axes[1,1].set_ylabel('')
else:
    axes[1,1].text(0.5, 0.5, 'Sem empresas em\ncorre√ß√£o sistem√°tica', 
                   ha='center', va='center', fontsize=14)
    axes[1,1].set_title('Empresas em Corre√ß√£o Sistem√°tica')

plt.tight_layout()
plt.show()

In [None]:
print("=" * 80)
print("9. AN√ÅLISE DE IMPACTO ARRECADAT√ìRIO POTENCIAL")
print("=" * 80)

# Query de impacto
impacto_arrecadatorio = spark.sql("""
SELECT 
    gtin,
    ncm,
    descricao,
    aliquota_ia,
    COUNT(DISTINCT cnpj_emitente) as qtd_empresas,
    SUM(bc_total_periodo) as bc_total,
    AVG(aliq_emitente_periodo - aliquota_ia) as diff_media,
    
    -- Impacto estimado se todos corrigissem
    SUM(bc_total_periodo * ABS(aliq_emitente_periodo - aliquota_ia)) as impacto_total_estimado,
    
    -- Empresas j√° corrigindo
    COUNT(DISTINCT CASE WHEN movimento_vs_ia = 'APROXIMOU_DA_CORRETA' 
                   THEN cnpj_emitente END) as empresas_corrigindo,
    
    -- Impacto j√° realizado
    SUM(CASE WHEN movimento_vs_ia = 'APROXIMOU_DA_CORRETA' 
        THEN bc_total_periodo * ABS(diferenca_vs_ia_historica) ELSE 0 END) as impacto_realizado
    
FROM dados_argos
WHERE aliquota_ia IS NOT NULL
  AND ABS(aliq_emitente_periodo - aliquota_ia) > 0.01
GROUP BY gtin, ncm, descricao, aliquota_ia
HAVING COUNT(DISTINCT cnpj_emitente) >= 3
ORDER BY impacto_total_estimado DESC
LIMIT 50
""")

impacto_pd = impacto_arrecadatorio.toPandas()
impacto_pd['bc_total'] = impacto_pd['bc_total'].astype(float)
impacto_pd['impacto_total_estimado'] = impacto_pd['impacto_total_estimado'].astype(float)
impacto_pd['impacto_realizado'] = impacto_pd['impacto_realizado'].astype(float)
impacto_pd['impacto_potencial'] = impacto_pd['impacto_total_estimado'] - impacto_pd['impacto_realizado']
impacto_pd['perc_empresas_corrigindo'] = (impacto_pd['empresas_corrigindo'] / impacto_pd['qtd_empresas'] * 100)

# Totais
impacto_total_sistema = impacto_pd['impacto_total_estimado'].sum()
impacto_realizado_sistema = impacto_pd['impacto_realizado'].sum()
impacto_potencial_sistema = impacto_pd['impacto_potencial'].sum()

print(f"\nüí∞ IMPACTO ARRECADAT√ìRIO CONSOLIDADO:")
print(f"  ‚Ä¢ Impacto Total Estimado: R$ {impacto_total_sistema:,.2f}")
print(f"  ‚Ä¢ Impacto J√° Realizado: R$ {impacto_realizado_sistema:,.2f}")
print(f"  ‚Ä¢ Potencial Remanescente: R$ {impacto_potencial_sistema:,.2f}")
print(f"  ‚Ä¢ Taxa de Realiza√ß√£o: {impacto_realizado_sistema/impacto_total_sistema*100:.1f}%")

print(f"\nTOP 20 PRODUTOS POR IMPACTO ARRECADAT√ìRIO:")
for i, row in impacto_pd.head(20).iterrows():
    print(f"\n{i+1}. {row['descricao'][:50]}")
    print(f"   NCM: {row['ncm']} | Empresas: {row['qtd_empresas']}")
    print(f"   Impacto Total: R$ {row['impacto_total_estimado']:,.2f}")
    print(f"   Impacto Realizado: R$ {row['impacto_realizado']:,.2f} ({row['impacto_realizado']/row['impacto_total_estimado']*100:.1f}%)")
    print(f"   Potencial: R$ {row['impacto_potencial']:,.2f}")
    print(f"   Empresas Corrigindo: {row['empresas_corrigindo']}/{row['qtd_empresas']} ({row['perc_empresas_corrigindo']:.1f}%)")

# Visualiza√ß√£o
fig, axes = plt.subplots(2, 2, figsize=(20, 12))
fig.suptitle('An√°lise de Impacto Arrecadat√≥rio', fontsize=18, fontweight='bold')

# Gr√°fico 1: Top 15 por impacto potencial
top15_impacto = impacto_pd.head(15).sort_values('impacto_potencial', ascending=True)
sns.barplot(data=top15_impacto, y='descricao', x=top15_impacto['impacto_potencial']/1e6, 
            ax=axes[0,0], palette='Reds_r', orient='h')
axes[0,0].set_title('Top 15: Maior Potencial Arrecadat√≥rio')
axes[0,0].set_xlabel('Impacto Potencial (Milh√µes R$)')
axes[0,0].set_ylabel('')

# Gr√°fico 2: Impacto realizado vs potencial (Top 15)
top15_comp = impacto_pd.head(15)
x = np.arange(len(top15_comp))
width = 0.35
axes[0,1].barh(x, top15_comp['impacto_realizado']/1e6, width, 
               label='Realizado', color='#2ca02c')
axes[0,1].barh(x + width, top15_comp['impacto_potencial']/1e6, width, 
               label='Potencial', color='#ff7f0e')
axes[0,1].set_yticks(x + width / 2)
axes[0,1].set_yticklabels([x[:30] for x in top15_comp['descricao']])
axes[0,1].set_xlabel('Impacto (Milh√µes R$)')
axes[0,1].set_title('Top 15: Impacto Realizado vs Potencial')
axes[0,1].legend()

# Gr√°fico 3: Scatter - Empresas vs Impacto
axes[1,0].scatter(impacto_pd['qtd_empresas'], impacto_pd['impacto_total_estimado']/1e6, 
                  s=100, alpha=0.6, c=impacto_pd['perc_empresas_corrigindo'], 
                  cmap='RdYlGn', vmin=0, vmax=100)
axes[1,0].set_xlabel('Quantidade de Empresas')
axes[1,0].set_ylabel('Impacto Total Estimado (Milh√µes R$)')
axes[1,0].set_title('Rela√ß√£o: Abrang√™ncia vs Impacto\n(cor = % empresas corrigindo)')
axes[1,0].set_yscale('log')
axes[1,0].grid(True, alpha=0.3)
cbar = plt.colorbar(axes[1,0].collections[0], ax=axes[1,0])
cbar.set_label('% Empresas Corrigindo')

# Gr√°fico 4: Funil de realiza√ß√£o
funil_data = pd.DataFrame({
    'Etapa': ['Impacto\nTotal', 'J√°\nRealizado', 'Potencial\nRemanescente'],
    'Valor': [impacto_total_sistema/1e6, impacto_realizado_sistema/1e6, impacto_potencial_sistema/1e6]
})
cores_funil = ['#1f77b4', '#2ca02c', '#ff7f0e']
bars = axes[1,1].bar(funil_data['Etapa'], funil_data['Valor'], color=cores_funil)
axes[1,1].set_ylabel('Valores (Milh√µes R$)')
axes[1,1].set_title('Funil de Realiza√ß√£o Arrecadat√≥ria')
for bar in bars:
    height = bar.get_height()
    axes[1,1].text(bar.get_x() + bar.get_width()/2., height,
                   f'R$ {height:.1f}M\n({height/funil_data["Valor"][0]*100:.1f}%)',
                   ha='center', va='bottom', fontweight='bold')

plt.tight_layout()
plt.show()

In [None]:
print("=" * 80)
print("10. SISTEMA DE ALERTAS E PRIORIZA√á√ÉO PARA FISCALIZA√á√ÉO")
print("=" * 80)

# Query de alertas
alertas_fiscalizacao = spark.sql("""
WITH empresa_scoring AS (
    SELECT 
        cnpj_emitente,
        nm_razao_social,
        COUNT(*) as total_casos,
        SUM(bc_total_periodo) as bc_total,
        
        -- Componentes do score
        AVG(CASE WHEN classificacao_mudanca = 'MUDANCA_EXTREMA' THEN 40 
                 WHEN classificacao_mudanca = 'MUDANCA_SIGNIFICATIVA' THEN 25 
                 ELSE 10 END) as score_classificacao,
        
        AVG(CASE WHEN movimento_vs_ia = 'AFASTOU_DA_CORRETA' THEN 30
                 WHEN movimento_vs_ia = 'MANTEVE_DISTANCIA' THEN 15
                 ELSE 5 END) as score_movimento,
        
        AVG(CASE WHEN ABS(diferenca_vs_ia_periodo) >= 0.20 THEN 20
                 WHEN ABS(diferenca_vs_ia_periodo) >= 0.10 THEN 15
                 WHEN ABS(diferenca_vs_ia_periodo) >= 0.05 THEN 10
                 ELSE 5 END) as score_magnitude,
        
        -- M√©tricas de comportamento
        SUM(CASE WHEN classificacao_mudanca = 'MUDANCA_EXTREMA' THEN 1 ELSE 0 END) as mudancas_extremas,
        SUM(CASE WHEN movimento_vs_ia = 'AFASTOU_DA_CORRETA' THEN 1 ELSE 0 END) as afastou_ia,
        AVG(diferenca_vs_ia_periodo) as diff_ia_media
        
    FROM dados_argos
    WHERE movimento_vs_ia != 'SEM_REFERENCIA_IA'
    GROUP BY cnpj_emitente, nm_razao_social
    HAVING COUNT(*) >= 5
)
SELECT 
    *,
    (score_classificacao + score_movimento + score_magnitude) as score_total,
    CASE 
        WHEN (score_classificacao + score_movimento + score_magnitude) >= 80 THEN 'EMERGENCIAL'
        WHEN (score_classificacao + score_movimento + score_magnitude) >= 65 THEN 'CR√çTICO'
        WHEN (score_classificacao + score_movimento + score_magnitude) >= 50 THEN 'ALTO'
        WHEN (score_classificacao + score_movimento + score_magnitude) >= 35 THEN 'M√âDIO'
        ELSE 'BAIXO'
    END as nivel_alerta
FROM empresa_scoring
ORDER BY score_total DESC, bc_total DESC
LIMIT 50
""")

alertas_pd = alertas_fiscalizacao.toPandas()
alertas_pd['bc_total'] = alertas_pd['bc_total'].astype(float)

# Distribui√ß√£o de alertas
dist_alertas = alertas_pd['nivel_alerta'].value_counts()

print("\nDISTRIBUI√á√ÉO DE ALERTAS:")
for nivel in ['EMERGENCIAL', 'CR√çTICO', 'ALTO', 'M√âDIO', 'BAIXO']:
    if nivel in dist_alertas.index:
        qtd = dist_alertas[nivel]
        bc_nivel = alertas_pd[alertas_pd['nivel_alerta'] == nivel]['bc_total'].sum()
        print(f"  {nivel:15}: {qtd:3d} empresas | BC: R$ {bc_nivel:,.2f}")

print("\nTOP 20 EMPRESAS PRIORIT√ÅRIAS PARA FISCALIZA√á√ÉO:")
for i, row in alertas_pd.head(20).iterrows():
    print(f"\n{i+1:2d}. [{row['nivel_alerta']}] {row['nm_razao_social'][:45]}")
    print(f"    Score: {row['score_total']:.1f} | BC: R$ {row['bc_total']:,.2f}")
    print(f"    Extremas: {row['mudancas_extremas']} | Afastou IA: {row['afastou_ia']}")
    print(f"    Diff IA M√©dia: {row['diff_ia_media']*100:+.2f}%")

# Visualiza√ß√£o
fig, axes = plt.subplots(2, 2, figsize=(20, 12))
fig.suptitle('Sistema de Alertas e Prioriza√ß√£o para Fiscaliza√ß√£o', fontsize=18, fontweight='bold')

# Gr√°fico 1: Distribui√ß√£o de alertas
cores_alerta = {'EMERGENCIAL': '#8b0000', 'CR√çTICO': '#d62728', 
                'ALTO': '#ff7f0e', 'M√âDIO': '#ffdd70', 'BAIXO': '#2ca02c'}
ordem_alerta = ['EMERGENCIAL', 'CR√çTICO', 'ALTO', 'M√âDIO', 'BAIXO']
dist_alertas_ordenado = dist_alertas.reindex(ordem_alerta, fill_value=0)
palette_alerta = [cores_alerta[x] for x in dist_alertas_ordenado.index]

axes[0,0].pie(dist_alertas_ordenado.values, labels=dist_alertas_ordenado.index, 
              autopct='%1.1f%%', colors=palette_alerta, startangle=90)
axes[0,0].set_title('Distribui√ß√£o de Empresas por N√≠vel de Alerta')

# Gr√°fico 2: Score total (Top 20)
top20_score = alertas_pd.head(20)
cores_empresas = [cores_alerta[x] for x in top20_score['nivel_alerta']]
sns.barplot(data=top20_score, y='nm_razao_social', x='score_total', 
            ax=axes[0,1], palette=cores_empresas, orient='h')
axes[0,1].set_title('Top 20: Score de Risco')
axes[0,1].set_xlabel('Score Total')
axes[0,1].set_ylabel('')

# Gr√°fico 3: Impacto financeiro por n√≠vel
bc_por_nivel = alertas_pd.groupby('nivel_alerta')['bc_total'].sum().reindex(ordem_alerta, fill_value=0)
sns.barplot(x=bc_por_nivel.index, y=bc_por_nivel.values/1e6, 
            ax=axes[1,0], palette=[cores_alerta[x] for x in bc_por_nivel.index])
axes[1,0].set_ylabel('Base de C√°lculo (Milh√µes R$)')
axes[1,0].set_xlabel('N√≠vel de Alerta')
axes[1,0].set_title('Impacto Financeiro por N√≠vel de Alerta')
axes[1,0].tick_params(axis='x', rotation=45)

# Gr√°fico 4: Scatter - Score vs BC
cores_scatter = [cores_alerta[x] for x in alertas_pd['nivel_alerta']]
axes[1,1].scatter(alertas_pd['score_total'], alertas_pd['bc_total']/1e6, 
                  s=alertas_pd['mudancas_extremas']*10, alpha=0.6, c=cores_scatter)
axes[1,1].set_xlabel('Score Total de Risco')
axes[1,1].set_ylabel('Base de C√°lculo (Milh√µes R$)')
axes[1,1].set_title('Rela√ß√£o: Score vs Impacto Financeiro\n(tamanho = mudan√ßas extremas)')
axes[1,1].set_yscale('log')
axes[1,1].grid(True, alpha=0.3)

# Linhas de corte
axes[1,1].axvline(x=80, color='darkred', linestyle='--', alpha=0.5, label='Emergencial')
axes[1,1].axvline(x=65, color='red', linestyle='--', alpha=0.5, label='Cr√≠tico')
axes[1,1].axvline(x=50, color='orange', linestyle='--', alpha=0.5, label='Alto')
axes[1,1].legend()

plt.tight_layout()
plt.show()

In [None]:
print("=" * 80)
print("11. GERA√á√ÉO DE RELAT√ìRIO CONSOLIDADO")
print("=" * 80)

# Criar DataFrame consolidado para exporta√ß√£o
relatorio_completo = spark.sql("""
SELECT 
    a.periodo,
    a.cnpj_emitente,
    a.nm_razao_social,
    a.gtin,
    a.ncm,
    a.descricao,
    ROUND(a.aliq_emitente_periodo * 100, 2) as aliquota_praticada_pct,
    ROUND(a.aliq_emitente_media_historica * 100, 2) as aliquota_historica_pct,
    ROUND(a.aliquota_ia * 100, 2) as aliquota_ia_pct,
    ROUND(a.diferenca_vs_ia_periodo * 100, 2) as diferenca_vs_ia_pct,
    a.classificacao_mudanca,
    a.movimento_vs_ia,
    a.bc_total_periodo,
    
    -- Adicionar informa√ß√µes de alerta se dispon√≠vel
    COALESCE(al.nivel_alerta, 'N√ÉO AVALIADO') as nivel_alerta_empresa,
    COALESCE(al.score_total, 0) as score_risco_empresa
    
FROM dados_argos a
LEFT JOIN (
    SELECT cnpj_emitente, 
           CASE 
               WHEN AVG(CASE WHEN classificacao_mudanca = 'MUDANCA_EXTREMA' THEN 40 ELSE 10 END) >= 30 THEN 'CR√çTICO'
               WHEN AVG(CASE WHEN classificacao_mudanca = 'MUDANCA_EXTREMA' THEN 40 ELSE 10 END) >= 20 THEN 'ALTO'
               ELSE 'M√âDIO'
           END as nivel_alerta,
           AVG(CASE WHEN classificacao_mudanca = 'MUDANCA_EXTREMA' THEN 40 ELSE 10 END) as score_total
    FROM dados_argos
    GROUP BY cnpj_emitente
) al ON a.cnpj_emitente = al.cnpj_emitente
ORDER BY a.cnpj_emitente, a.periodo, a.descricao
""")

# Salvar relat√≥rio (comentado - descomente para salvar)
# relatorio_completo.write.mode('overwrite').parquet('/caminho/para/salvar/relatorio_argos')

print("\n‚úÖ Relat√≥rio consolidado gerado")
print(f"   Total de registros: {relatorio_completo.count():,}")

# Criar resumo executivo em Pandas
resumo_executivo = pd.DataFrame({
    'M√©trica': [
        'Total de Registros Analisados',
        'Empresas Monitoradas',
        'Produtos Distintos',
        'Per√≠odos Analisados',
        'Base de C√°lculo Total (R$)',
        'Taxa de Corre√ß√£o Global (%)',
        'Mudan√ßas Extremas',
        'Empresas Cr√≠ticas',
        'Impacto Arrecadat√≥rio Realizado (R$)',
        'Potencial Arrecadat√≥rio (R$)'
    ],
    'Valor': [
        f"{kpi_data['total_registros']:,}",
        f"{kpi_data['total_empresas']:,}",
        f"{kpi_data['total_produtos']:,}",
        f"{kpi_data['total_periodos']}",
        f"{kpi_data['bc_total_sistema']:,.2f}",
        f"{taxa_correcao_global:.1f}",
        f"{kpi_data['mudancas_extremas']:,}",
        f"{dist_alertas.get('CR√çTICO', 0) + dist_alertas.get('EMERGENCIAL', 0)}",
        f"{impacto_realizado_sistema:,.2f}",
        f"{impacto_potencial_sistema:,.2f}"
    ]
})

print("\n" + "="*80)
print("RESUMO EXECUTIVO - SISTEMA ARGOS")
print("="*80)
print(resumo_executivo.to_string(index=False))

# Visualiza√ß√£o do resumo
fig, ax = plt.subplots(figsize=(14, 8))
ax.axis('tight')
ax.axis('off')

table_data = [resumo_executivo.columns.tolist()] + resumo_executivo.values.tolist()
table = ax.table(cellText=table_data, cellLoc='left', loc='center',
                 colWidths=[0.6, 0.4])

table.auto_set_font_size(False)
table.set_fontsize(11)
table.scale(1, 2.5)

# Estilizar cabe√ßalho
for i in range(len(resumo_executivo.columns)):
    table[(0, i)].set_facecolor('#1f77b4')
    table[(0, i)].set_text_props(weight='bold', color='white')

# Estilizar linhas
for i in range(1, len(table_data)):
    for j in range(len(resumo_executivo.columns)):
        if i % 2 == 0:
            table[(i, j)].set_facecolor('#f0f0f0')

plt.title('RESUMO EXECUTIVO - SISTEMA ARGOS\nAn√°lise de Mudan√ßa de Comportamento Tribut√°rio', 
          fontsize=16, fontweight='bold', pad=20)
plt.show()

print("\n" + "="*80)
print("RECOMENDA√á√ïES ESTRAT√âGICAS:")
print("="*80)
print(f"1. A√á√ÉO IMEDIATA: Fiscalizar {dist_alertas.get('EMERGENCIAL', 0)} empresas emergenciais")
print(f"2. CURTO PRAZO: Auditar {dist_alertas.get('CR√çTICO', 0)} empresas cr√≠ticas")
print(f"3. MONITORAMENTO: Acompanhar {top5_setores['mudancas_extremas'].sum()} casos nos 5 setores mais vol√°teis")
print(f"4. POTENCIAL: Trabalhar para realizar R$ {impacto_potencial_sistema/1e6:.1f}M em corre√ß√µes")
print(f"5. DASHBOARD: Implementar monitoramento cont√≠nuo dos KPIs apresentados")

In [None]:
print("=" * 80)
print("12. AN√ÅLISE COMPARATIVA TEMPORAL - EVOLU√á√ÉO DO COMPORTAMENTO")
print("=" * 80)

# Vers√£o simplificada sem CTEs complexas
comparativo_temporal = spark.sql("""
WITH periodos_ordenados AS (
    SELECT DISTINCT periodo
    FROM niat.argos_mudanca_comportamento
    WHERE periodo >= '202301'
    ORDER BY periodo
),
primeiros_6 AS (
    SELECT periodo FROM periodos_ordenados LIMIT 6
),
ultimos_6 AS (
    SELECT periodo FROM periodos_ordenados ORDER BY periodo DESC LIMIT 6
)
SELECT 
    CASE 
        WHEN periodo IN (SELECT periodo FROM primeiros_6) THEN 'PER√çODO INICIAL'
        WHEN periodo IN (SELECT periodo FROM ultimos_6) THEN 'PER√çODO FINAL'
        ELSE 'PER√çODO INTERMEDI√ÅRIO'
    END as fase,
    
    COUNT(*) as total_casos,
    COUNT(DISTINCT cnpj_emitente) as empresas,
    SUM(bc_total_periodo) as bc_total,
    AVG(diferenca_vs_ia_periodo) as diff_ia_media,
    
    SUM(CASE WHEN classificacao_mudanca = 'MUDANCA_EXTREMA' THEN 1 ELSE 0 END) as mudancas_extremas,
    AVG(CASE WHEN classificacao_mudanca = 'MUDANCA_EXTREMA' THEN 100.0 ELSE 0.0 END) as taxa_extremas,
    
    SUM(CASE WHEN movimento_vs_ia = 'APROXIMOU_DA_CORRETA' THEN 1 ELSE 0 END) as aproximou_ia,
    AVG(CASE WHEN movimento_vs_ia = 'APROXIMOU_DA_CORRETA' THEN 100.0 ELSE 0.0 END) as taxa_correcao

FROM niat.argos_mudanca_comportamento
WHERE movimento_vs_ia != 'SEM_REFERENCIA_IA'
  AND periodo >= '202301'
GROUP BY fase
ORDER BY 
    CASE fase
        WHEN 'PER√çODO INICIAL' THEN 1
        WHEN 'PER√çODO INTERMEDI√ÅRIO' THEN 2
        ELSE 3
    END
""")

comp_pd = comparativo_temporal.toPandas()
comp_pd['bc_total'] = pd.to_numeric(comp_pd['bc_total'], errors='coerce')
comp_pd['diff_ia_media'] = pd.to_numeric(comp_pd['diff_ia_media'], errors='coerce')

print("\nCOMPARATIVO TEMPORAL:")
for _, row in comp_pd.iterrows():
    print(f"\n{row['fase']}:")
    print(f"  Casos: {row['total_casos']:,} | Empresas: {int(row['empresas']):,}")
    print(f"  Taxa de Corre√ß√£o: {row['taxa_correcao']:.1f}%")
    print(f"  Taxa de Extremas: {row['taxa_extremas']:.1f}%")
    print(f"  Diff IA M√©dia: {row['diff_ia_media']*100:+.2f}%")


# Calcular evolu√ß√£o
if len(comp_pd) >= 2:
    inicial = comp_pd[comp_pd['fase'] == 'PER√çODO INICIAL'].iloc[0]
    final = comp_pd[comp_pd['fase'] == 'PER√çODO FINAL'].iloc[0]
    
    evolucao_correcao = final['taxa_correcao'] - inicial['taxa_correcao']
    evolucao_extremas = final['taxa_extremas'] - inicial['taxa_extremas']
    
    print(f"\nüìä EVOLU√á√ÉO DO SISTEMA:")
    print(f"  Varia√ß√£o Taxa Corre√ß√£o: {evolucao_correcao:+.1f} pontos percentuais")
    print(f"  Varia√ß√£o Taxa Extremas: {evolucao_extremas:+.1f} pontos percentuais")
    
    if evolucao_correcao > 0:
        print(f"  ‚úÖ RESULTADO POSITIVO: Aumento na taxa de corre√ß√£o")
    else:
        print(f"  ‚ö†Ô∏è ATEN√á√ÉO: Redu√ß√£o na taxa de corre√ß√£o")

# Visualiza√ß√£o
fig, axes = plt.subplots(2, 2, figsize=(18, 12))
fig.suptitle('An√°lise Comparativa Temporal - Evolu√ß√£o do Sistema', fontsize=18, fontweight='bold')

# Gr√°fico 1: Taxa de corre√ß√£o
axes[0,0].bar(comp_pd['fase'], comp_pd['taxa_correcao'], color=['#ff7f0e', '#1f77b4', '#2ca02c'])
axes[0,0].set_ylabel('Taxa de Corre√ß√£o (%)')
axes[0,0].set_title('Evolu√ß√£o da Taxa de Corre√ß√£o')
axes[0,0].tick_params(axis='x', rotation=15)
axes[0,0].grid(True, alpha=0.3, axis='y')

# Gr√°fico 2: Taxa de extremas
axes[0,1].bar(comp_pd['fase'], comp_pd['taxa_extremas'], color=['#ff7f0e', '#1f77b4', '#d62728'])
axes[0,1].set_ylabel('Taxa de Mudan√ßas Extremas (%)')
axes[0,1].set_title('Evolu√ß√£o das Mudan√ßas Extremas')
axes[0,1].tick_params(axis='x', rotation=15)
axes[0,1].grid(True, alpha=0.3, axis='y')

# Gr√°fico 3: Base de c√°lculo
axes[1,0].bar(comp_pd['fase'], comp_pd['bc_total']/1e6, color=['#ff7f0e', '#1f77b4', '#2ca02c'])
axes[1,0].set_ylabel('Base de C√°lculo (Milh√µes R$)')
axes[1,0].set_title('Evolu√ß√£o do Volume Financeiro')
axes[1,0].tick_params(axis='x', rotation=15)
axes[1,0].grid(True, alpha=0.3, axis='y')

# Gr√°fico 4: Comparativo dual
x = np.arange(len(comp_pd))
width = 0.35
axes[1,1].bar(x - width/2, comp_pd['taxa_correcao'], width, label='Taxa Corre√ß√£o', color='#2ca02c')
axes[1,1].bar(x + width/2, comp_pd['taxa_extremas'], width, label='Taxa Extremas', color='#d62728')
axes[1,1].set_ylabel('Percentual (%)')
axes[1,1].set_title('Comparativo: Corre√ß√£o vs Extremas')
axes[1,1].set_xticks(x)
axes[1,1].set_xticklabels(comp_pd['fase'], rotation=15)
axes[1,1].legend()
axes[1,1].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

In [None]:
print("=" * 80)
print("FINALIZA√á√ÉO DA AN√ÅLISE")
print("=" * 80)

# Estat√≠sticas de processamento
tempo_total = datetime.now()
print(f"\n‚úÖ An√°lise conclu√≠da com sucesso!")
print(f"   Data/Hora: {tempo_total.strftime('%Y-%m-%d %H:%M:%S')}")

# Liberar cache
df_argos.unpersist()

print("\nüìä VIEWS SPARK DISPON√çVEIS:")
print("   ‚Ä¢ dados_argos - Dados principais")
print("   ‚Ä¢ relatorio_completo - Relat√≥rio consolidado")

print("\nüíæ DATAFRAMES PANDAS DISPON√çVEIS:")
print("   ‚Ä¢ dist_class_pd - Distribui√ß√£o por classifica√ß√£o")
print("   ‚Ä¢ dist_mov_pd - Distribui√ß√£o por movimento")
print("   ‚Ä¢ evolucao_pd - Evolu√ß√£o temporal")
print("   ‚Ä¢ ranking_pd - Ranking de empresas")
print("   ‚Ä¢ setorial_pd - An√°lise setorial")
print("   ‚Ä¢ produtos_pd - Produtos vol√°teis")
print("   ‚Ä¢ impacto_pd - Impacto arrecadat√≥rio")
print("   ‚Ä¢ alertas_pd - Sistema de alertas")
print("   ‚Ä¢ resumo_executivo - Resumo executivo")

print("\nüéØ PR√ìXIMOS PASSOS RECOMENDADOS:")
print("   1. Exportar dados para dashboard de monitoramento")
print("   2. Iniciar fiscaliza√ß√£o das empresas emergenciais")
print("   3. Agendar auditorias das empresas cr√≠ticas")
print("   4. Implementar alertas autom√°ticos por per√≠odo")
print("   5. Criar relat√≥rios mensais de acompanhamento")

print("\n" + "="*80)
print("SISTEMA ARGOS - AN√ÅLISE COMPLETA FINALIZADA")
print("="*80)