In [None]:
import sys
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/poc")
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/plugins")
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/dags")

#Import libs python
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import date

#Import libs internas
from utils import spark_utils_session as utils

from hooks.hdfs.hdfs_helper import HdfsHelper
from jobs.job_base_config import BaseETLJobClass

import poc_helper
poc_helper.load_env("PROD")

In [None]:
def get_session(profile: str, dynamic_allocation_enabled: bool = True) -> utils.DBASparkAppSession:
    """Generates DBASparkAppSession."""
    
    app_name = "tsevero_setores"
    
    
    spark_builder = (utils.DBASparkAppSession
                     .builder
                     .setAppName(app_name)
                     .usingProcessProfile(profile)
                    )
    
    if dynamic_allocation_enabled:
        spark_builder.autoResourceManagement()

    return spark_builder.build()

session = get_session(profile='efd_t2')

In [None]:
session.sparkSession.sql("SHOW DATABASES").show(truncate=False)

In [None]:
# =============================================================================
# SISTEMA DE AN√ÅLISE TRIBUT√ÅRIA SETORIAL v4.0 - AN√ÅLISE COMPLETA COM ML
# Receita Estadual de Santa Catarina
# =============================================================================
# Este script realiza an√°lises avan√ßadas usando Spark SQL para processamento
# e bibliotecas Python (Pandas, Matplotlib, Seaborn, Plotly, Scikit-learn) 
# para visualiza√ß√µes e machine learning.
# =============================================================================

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import warnings

# Sklearn para ML
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans, DBSCAN
from sklearn.decomposition import PCA
from sklearn.ensemble import IsolationForest, RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import silhouette_score, davies_bouldin_score

# =============================================================================
# PREVEN√á√ÉO DE CONFLITOS: Python Built-ins vs PySpark
# =============================================================================
import builtins
import numpy as np

# Salvar refer√™ncias √†s fun√ß√µes built-in que conflitam com PySpark
max_builtin = builtins.max
min_builtin = builtins.min
abs_builtin = builtins.abs
sum_builtin = builtins.sum
round_builtin = builtins.round

print("‚úÖ Fun√ß√µes built-in protegidas contra conflitos com PySpark")
print("   Use: max_builtin(), min_builtin(), abs_builtin(), sum_builtin(), round_builtin()")
print("   Ou use as vers√µes do numpy: np.abs(), np.max(), np.min(), etc.")

# Configura√ß√µes visuais
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
warnings.filterwarnings('ignore')
plt.rcParams['figure.figsize'] = (16, 8)
plt.rcParams['font.size'] = 11

print("=" * 80)
print("SISTEMA DE AN√ÅLISE TRIBUT√ÅRIA SETORIAL v4.0 - AN√ÅLISE COMPLETA COM ML")
print("=" * 80)
print(f"Iniciado em: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

# Verificar sess√£o Spark
try:
    if 'session' in locals():
        spark = session.sparkSession
        print(f"‚úÖ Sess√£o Spark dispon√≠vel: {spark.sparkContext.appName}")
    elif 'spark' in locals():
        print(f"‚úÖ Sess√£o Spark dispon√≠vel")
    else:
        raise NameError("Sess√£o Spark n√£o encontrada.")
except Exception as e:
    print(f"‚ùå ERRO: {e}")
    print("Execute primeiro o c√≥digo de configura√ß√£o da sess√£o Spark.")
    raise

# =============================================================================
# PARTE 1: CARREGAMENTO E PREPARA√á√ÉO DOS DADOS
# =============================================================================
print("\n" + "=" * 80)
print("PARTE 1: CARREGAMENTO E PREPARA√á√ÉO DOS DADOS")
print("=" * 80)

# Carregar dados das views principais
print("\nüìä Carregando dados das views do sistema...")

try:
    # Benchmark Setorial
    df_benchmark = spark.table("niat.argos_benchmark_setorial")
    total_benchmark = df_benchmark.count()
    print(f"‚úÖ Benchmark Setorial: {total_benchmark:,} registros")
    
    # Empresas vs Benchmark
    df_empresas = spark.table("niat.argos_empresa_vs_benchmark")
    total_empresas = df_empresas.count()
    print(f"‚úÖ Empresas vs Benchmark: {total_empresas:,} registros")
    
    # Evolu√ß√£o Temporal Setorial
    df_evolucao_setor = spark.table("niat.argos_evolucao_temporal_setor")
    total_evo_setor = df_evolucao_setor.count()
    print(f"‚úÖ Evolu√ß√£o Setorial: {total_evo_setor:,} setores")
    
    # Alertas
    df_alertas = spark.table("niat.argos_alertas_empresas")
    total_alertas = df_alertas.count()
    print(f"‚úÖ Alertas: {total_alertas:,} registros")
    
except Exception as e:
    print(f"‚ùå ERRO ao carregar dados: {e}")
    print("Certifique-se de que o script SQL v4.0 foi executado completamente.")
    raise



In [None]:
# =============================================================================
# PARTE 2: AN√ÅLISE EXPLORAT√ìRIA E ESTAT√çSTICAS DESCRITIVAS
# =============================================================================
print("\n" + "=" * 80)
print("PARTE 2: AN√ÅLISE EXPLORAT√ìRIA E ESTAT√çSTICAS DESCRITIVAS")
print("=" * 80)

# 2.1. Estat√≠sticas Gerais do Sistema
print("\nüìà 2.1. ESTAT√çSTICAS GERAIS DO SISTEMA")
print("-" * 80)

stats_gerais = spark.sql("""
SELECT 
    COUNT(DISTINCT b.cnae_classe) AS total_setores,
    COUNT(DISTINCT e.nu_cnpj) AS total_empresas,
    COUNT(DISTINCT e.nu_per_ref) AS total_periodos,
    ROUND(SUM(e.vl_faturamento) / 1e9, 2) AS faturamento_total_bilhoes,
    ROUND(SUM(e.icms_devido) / 1e9, 2) AS icms_devido_bilhoes,
    ROUND(AVG(e.aliq_efetiva) * 100, 2) AS aliq_media_sistema_pct,
    COUNT(DISTINCT CASE WHEN a.severidade = 'CRITICO' THEN e.nu_cnpj END) AS empresas_risco_critico
FROM niat.argos_empresas e
LEFT JOIN niat.argos_benchmark_setorial b ON e.cnae_classe = b.cnae_classe AND e.nu_per_ref = b.nu_per_ref
LEFT JOIN niat.argos_alertas_empresas a ON e.nu_cnpj = a.nu_cnpj AND e.nu_per_ref = a.nu_per_ref
WHERE e.nu_per_ref = (SELECT MAX(nu_per_ref) FROM niat.argos_empresas)
""").toPandas()

print("\nM√âTRICAS PRINCIPAIS:")
for col in stats_gerais.columns:
    valor = stats_gerais[col].iloc[0]
    print(f"  ‚Ä¢ {col}: {valor:,.2f}" if isinstance(valor, float) else f"  ‚Ä¢ {col}: {valor:,}")

# 2.2. Distribui√ß√£o por Porte
print("\nüìä 2.2. DISTRIBUI√á√ÉO POR PORTE EMPRESARIAL")
print("-" * 80)

dist_porte = spark.sql("""
SELECT 
    porte_empresa,
    COUNT(DISTINCT nu_cnpj) AS qtd_empresas,
    ROUND(AVG(vl_faturamento), 2) AS faturamento_medio,
    ROUND(AVG(aliq_efetiva) * 100, 2) AS aliq_media_pct
FROM niat.argos_empresas
WHERE nu_per_ref = (SELECT MAX(nu_per_ref) FROM niat.argos_empresas)
  AND porte_empresa != 'SEM_FATURAMENTO'
GROUP BY porte_empresa
ORDER BY 
    CASE porte_empresa
        WHEN 'MICRO' THEN 1
        WHEN 'PEQUENO' THEN 2
        WHEN 'MEDIO' THEN 3
        WHEN 'GRANDE' THEN 4
    END
""").toPandas()

print(dist_porte.to_string(index=False))

# Visualiza√ß√£o: Distribui√ß√£o por Porte
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 7))
fig.suptitle('Distribui√ß√£o por Porte Empresarial', fontsize=16, fontweight='bold')

sns.barplot(data=dist_porte, x='porte_empresa', y='qtd_empresas', ax=ax1, palette='viridis')
ax1.set_title('Quantidade de Empresas por Porte')
ax1.set_xlabel('Porte')
ax1.set_ylabel('N√∫mero de Empresas')
ax1.bar_label(ax1.containers[0], fmt='%d')

sns.barplot(data=dist_porte, x='porte_empresa', y='aliq_media_pct', ax=ax2, palette='plasma')
ax2.set_title('Al√≠quota M√©dia por Porte (%)')
ax2.set_xlabel('Porte')
ax2.set_ylabel('Al√≠quota M√©dia (%)')
ax2.bar_label(ax2.containers[0], fmt='%.2f%%')

plt.tight_layout()
plt.show()


In [None]:
# =============================================================================
# PARTE 3: VISUALIZA√á√ïES AVAN√áADAS
# =============================================================================
print("\n" + "=" * 80)
print("PARTE 3: VISUALIZA√á√ïES AVAN√áADAS")
print("=" * 80)

# 3.1. Evolu√ß√£o Temporal por Setor (Top 10)
print("\nüìà 3.1. EVOLU√á√ÉO TEMPORAL DOS TOP 10 SETORES")
print("-" * 80)

evolucao_top_setores = spark.sql("""
WITH top_setores AS (
    SELECT cnae_classe
    FROM niat.argos_evolucao_temporal_setor
    ORDER BY faturamento_acumulado_8m DESC
    LIMIT 10
)
SELECT 
    b.nu_per_ref,
    b.cnae_classe,
    b.desc_cnae_classe,
    ROUND(b.aliq_efetiva_mediana * 100, 2) AS aliq_mediana_pct,
    ROUND(b.faturamento_total / 1e6, 2) AS faturamento_milhoes
FROM niat.argos_benchmark_setorial b
INNER JOIN top_setores ts ON b.cnae_classe = ts.cnae_classe
ORDER BY b.cnae_classe, b.nu_per_ref
""").toPandas()

# Converter per√≠odo para datetime
evolucao_top_setores['periodo_dt'] = pd.to_datetime(evolucao_top_setores['nu_per_ref'], format='%Y%m')

# Converter Decimal para float
evolucao_top_setores['aliq_mediana_pct'] = evolucao_top_setores['aliq_mediana_pct'].astype(float)
evolucao_top_setores['faturamento_milhoes'] = evolucao_top_setores['faturamento_milhoes'].astype(float)

# Gr√°fico interativo com Plotly
fig = px.line(evolucao_top_setores, 
              x='periodo_dt', 
              y='aliq_mediana_pct',
              color='desc_cnae_classe',
              title='Evolu√ß√£o Temporal das Al√≠quotas Medianas - Top 10 Setores',
              labels={'periodo_dt': 'Per√≠odo', 
                     'aliq_mediana_pct': 'Al√≠quota Mediana (%)',
                     'desc_cnae_classe': 'Setor'},
              hover_data=['faturamento_milhoes'])
fig.update_layout(height=600, hovermode='x unified')
fig.show()

# 3.2. Mapa de Calor: Al√≠quotas por CNAE e Per√≠odo
print("\nüî• 3.2. MAPA DE CALOR: AL√çQUOTAS POR CNAE")
print("-" * 80)

heatmap_data = spark.sql("""
SELECT 
    cnae_classe,
    desc_cnae_classe,
    nu_per_ref,
    ROUND(aliq_efetiva_mediana * 100, 2) AS aliq_mediana_pct
FROM niat.argos_benchmark_setorial
WHERE cnae_classe IN (
    SELECT cnae_classe 
    FROM niat.argos_evolucao_temporal_setor 
    ORDER BY faturamento_acumulado_8m DESC 
    LIMIT 15
)
ORDER BY cnae_classe, nu_per_ref
""").toPandas()

# Pivotar dados para matriz
heatmap_pivot = heatmap_data.pivot(index='desc_cnae_classe', columns='nu_per_ref', values='aliq_mediana_pct')

# Converter Decimal para float
heatmap_pivot = heatmap_pivot.astype(float)

plt.figure(figsize=(16, 10))
sns.heatmap(heatmap_pivot, annot=True, fmt='.1f', cmap='YlOrRd', linewidths=0.5, cbar_kws={'label': 'Al√≠quota Mediana (%)'})
plt.title('Mapa de Calor: Al√≠quotas Medianas por Setor e Per√≠odo', fontsize=16, fontweight='bold')
plt.xlabel('Per√≠odo')
plt.ylabel('Setor (CNAE)')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# 3.3. Distribui√ß√£o de Empresas por Quartil de Al√≠quota
print("\nüìä 3.3. DISTRIBUI√á√ÉO POR QUARTIL DE AL√çQUOTA")
print("-" * 80)

quartis_data = spark.sql("""
SELECT 
    CASE 
        WHEN aliq_efetiva_empresa < aliq_setor_p25 THEN 'Q1 (Abaixo P25)'
        WHEN aliq_efetiva_empresa < aliq_setor_mediana THEN 'Q2 (P25-P50)'
        WHEN aliq_efetiva_empresa < aliq_setor_p75 THEN 'Q3 (P50-P75)'
        ELSE 'Q4 (Acima P75)'
    END AS quartil,
    COUNT(*) AS qtd_empresas,
    ROUND(AVG(vl_faturamento) / 1e6, 2) AS faturamento_medio_milhoes
FROM niat.argos_empresa_vs_benchmark
WHERE nu_per_ref = (SELECT MAX(nu_per_ref) FROM niat.argos_empresa_vs_benchmark)
  AND aliq_efetiva_empresa IS NOT NULL
GROUP BY 1
ORDER BY 1
""").toPandas()

print(quartis_data.to_string(index=False))

# Converter Decimal para float
quartis_data['qtd_empresas'] = quartis_data['qtd_empresas'].astype(int)
quartis_data['faturamento_medio_milhoes'] = quartis_data['faturamento_medio_milhoes'].astype(float)

# Visualiza√ß√£o
fig, ax = plt.subplots(figsize=(14, 7))
x = np.arange(len(quartis_data))
width = 0.35

bars1 = ax.bar(x - width/2, quartis_data['qtd_empresas'], width, label='Quantidade', color='steelblue')
ax2 = ax.twinx()
bars2 = ax2.bar(x + width/2, quartis_data['faturamento_medio_milhoes'], width, label='Fat. M√©dio', color='coral')

ax.set_xlabel('Quartil de Al√≠quota')
ax.set_ylabel('Quantidade de Empresas', color='steelblue')
ax2.set_ylabel('Faturamento M√©dio (Milh√µes R$)', color='coral')
ax.set_title('Distribui√ß√£o de Empresas por Quartil de Al√≠quota', fontsize=16, fontweight='bold')
ax.set_xticks(x)
ax.set_xticklabels(quartis_data['quartil'])
ax.legend(loc='upper left')
ax2.legend(loc='upper right')
ax.bar_label(bars1, fmt='%d')
ax2.bar_label(bars2, fmt='%.1f')

plt.tight_layout()
plt.show()

# 3.4. An√°lise de Diverg√™ncias ICMS vs Pagamentos
print("\nüí∞ 3.4. AN√ÅLISE DE DIVERG√äNCIAS ICMS vs PAGAMENTOS")
print("-" * 80)

divergencias = spark.sql("""
SELECT 
    CASE
        WHEN icms_recolher > 100 AND valor_total_pago = 0 THEN 'Sem Pagamento'
        WHEN ABS(icms_recolher - valor_total_pago) / NULLIF(icms_recolher, 0) > 0.5 THEN 'Diverg√™ncia Extrema (>50%)'
        WHEN ABS(icms_recolher - valor_total_pago) / NULLIF(icms_recolher, 0) > 0.3 THEN 'Diverg√™ncia Alta (30-50%)'
        WHEN ABS(icms_recolher - valor_total_pago) / NULLIF(icms_recolher, 0) > 0.1 THEN 'Diverg√™ncia M√©dia (10-30%)'
        ELSE 'Normal (<10%)'
    END AS tipo_divergencia,
    COUNT(*) AS qtd_empresas,
    ROUND(SUM(icms_recolher) / 1e6, 2) AS icms_total_milhoes,
    ROUND(SUM(valor_total_pago) / 1e6, 2) AS pagto_total_milhoes
FROM niat.argos_empresa_vs_benchmark
WHERE nu_per_ref = (SELECT MAX(nu_per_ref) FROM niat.argos_empresa_vs_benchmark)
  AND icms_recolher IS NOT NULL
GROUP BY 1
ORDER BY 
    CASE tipo_divergencia
        WHEN 'Sem Pagamento' THEN 1
        WHEN 'Diverg√™ncia Extrema (>50%)' THEN 2
        WHEN 'Diverg√™ncia Alta (30-50%)' THEN 3
        WHEN 'Diverg√™ncia M√©dia (10-30%)' THEN 4
        ELSE 5
    END
""").toPandas()

print(divergencias.to_string(index=False))

# Converter Decimal para float
divergencias['icms_total_milhoes'] = divergencias['icms_total_milhoes'].astype(float)
divergencias['pagto_total_milhoes'] = divergencias['pagto_total_milhoes'].astype(float)

# Visualiza√ß√£o com Plotly
fig = go.Figure()
fig.add_trace(go.Bar(name='ICMS a Recolher', x=divergencias['tipo_divergencia'], y=divergencias['icms_total_milhoes'], marker_color='indianred'))
fig.add_trace(go.Bar(name='Pagamentos Realizados', x=divergencias['tipo_divergencia'], y=divergencias['pagto_total_milhoes'], marker_color='lightseagreen'))
fig.update_layout(
    title='An√°lise de Diverg√™ncias: ICMS a Recolher vs Pagamentos Realizados',
    xaxis_title='Tipo de Diverg√™ncia',
    yaxis_title='Valor Total (Milh√µes R$)',
    barmode='group',
    height=600
)
fig.show()

# 3.5. Ranking de Setores com Anomalias
print("\n‚ö†Ô∏è 3.5. RANKING DE SETORES COM ANOMALIAS")
print("-" * 80)

anomalias_ranking = spark.sql("""
SELECT 
    ans.cnae_classe,
    ans.desc_cnae_classe,
    ans.tipo_anomalia,
    ans.severidade,
    ROUND(ans.score_relevancia, 1) AS score,
    ans.qtd_empresas_total,
    ROUND(ans.faturamento_total / 1e9, 2) AS faturamento_bilhoes,
    ROUND(ans.aliq_setor * 100, 2) AS aliq_setor_pct,
    ROUND(ans.aliq_mediana_economia * 100, 2) AS aliq_economia_pct
FROM niat.argos_anomalias_setoriais ans
WHERE nu_per_ref = (SELECT MAX(nu_per_ref) FROM niat.argos_anomalias_setoriais)
ORDER BY ans.score_relevancia DESC
LIMIT 20
""").toPandas()

print(anomalias_ranking.to_string(index=False))

# Converter Decimal para float
anomalias_ranking['faturamento_bilhoes'] = anomalias_ranking['faturamento_bilhoes'].astype(float)
anomalias_ranking['score'] = anomalias_ranking['score'].astype(float)
anomalias_ranking['aliq_setor_pct'] = anomalias_ranking['aliq_setor_pct'].astype(float)

# Visualiza√ß√£o
fig = px.scatter(anomalias_ranking,
                x='faturamento_bilhoes',
                y='score',
                size='qtd_empresas_total',
                color='severidade',
                hover_name='desc_cnae_classe',
                hover_data=['tipo_anomalia', 'aliq_setor_pct'],
                title='Setores com Anomalias: Score vs Faturamento',
                labels={'faturamento_bilhoes': 'Faturamento (Bilh√µes R$)',
                       'score': 'Score de Relev√¢ncia',
                       'severidade': 'Severidade'},
                color_discrete_map={'ALTA': '#d62728', 'MEDIA': '#ff7f0e', 'BAIXA': '#2ca02c'},
                height=600)
fig.update_xaxes(type='log')
fig.show()


In [None]:
# =============================================================================
# PARTE 4: AN√ÅLISES ESTAT√çSTICAS AVAN√áADAS COM MACHINE LEARNING
# =============================================================================
print("\n" + "=" * 80)
print("PARTE 4: AN√ÅLISES ESTAT√çSTICAS AVAN√áADAS COM MACHINE LEARNING")
print("=" * 80)

# 4.1. Clustering de Setores Similares
print("\nüî¨ 4.1. CLUSTERING DE SETORES SIMILARES (K-MEANS)")
print("-" * 80)

# Preparar dados para clustering
df_clustering = spark.sql("""
SELECT 
    cnae_classe,
    desc_cnae_classe,
    aliq_mediana_media_8m,
    coef_variacao_temporal,
    faturamento_acumulado_8m,
    icms_devido_acumulado_8m,
    media_empresas_mes
FROM niat.argos_evolucao_temporal_setor
WHERE aliq_mediana_media_8m IS NOT NULL
  AND coef_variacao_temporal IS NOT NULL
""").toPandas()

# Converter Decimal para float
for col in df_clustering.select_dtypes(include=['object']).columns:
    try:
        df_clustering[col] = df_clustering[col].astype(float)
    except:
        pass  # Manter como string se n√£o for num√©rico

# Features para clustering
features_cluster = ['aliq_mediana_media_8m', 'coef_variacao_temporal', 
                   'faturamento_acumulado_8m', 'icms_devido_acumulado_8m', 'media_empresas_mes']

# Normaliza√ß√£o
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df_clustering[features_cluster].fillna(0))

# Determinar n√∫mero √≥timo de clusters (m√©todo Elbow)
inertias = []
silhouette_scores = []
K_range = range(2, 11)

for k in K_range:
    kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
    kmeans.fit(X_scaled)
    inertias.append(kmeans.inertia_)
    silhouette_scores.append(silhouette_score(X_scaled, kmeans.labels_))

# Visualizar m√©todo Elbow
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))
fig.suptitle('Determina√ß√£o do N√∫mero √ìtimo de Clusters', fontsize=16, fontweight='bold')

ax1.plot(K_range, inertias, 'bo-')
ax1.set_xlabel('N√∫mero de Clusters (k)')
ax1.set_ylabel('In√©rcia')
ax1.set_title('M√©todo Elbow')
ax1.grid(True)

ax2.plot(K_range, silhouette_scores, 'ro-')
ax2.set_xlabel('N√∫mero de Clusters (k)')
ax2.set_ylabel('Silhouette Score')
ax2.set_title('Silhouette Score por k')
ax2.grid(True)

plt.tight_layout()
plt.show()

# Escolher k √≥timo (maior silhouette score) - CORRE√á√ÉO AQUI
k_otimo = K_range[np.argmax(silhouette_scores)]
max_silhouette = silhouette_scores[np.argmax(silhouette_scores)]  # Armazenar o valor antes
print(f"\n‚úÖ N√∫mero √≥timo de clusters: {k_otimo} (Silhouette Score: {max_silhouette:.3f})")

# Aplicar K-Means com k √≥timo
kmeans_final = KMeans(n_clusters=k_otimo, random_state=42, n_init=10)
df_clustering['cluster'] = kmeans_final.fit_predict(X_scaled)

# An√°lise dos clusters
print(f"\nCARACTER√çSTICAS DOS {k_otimo} CLUSTERS IDENTIFICADOS:")
print("-" * 80)

for cluster_id in range(k_otimo):
    cluster_data = df_clustering[df_clustering['cluster'] == cluster_id]
    print(f"\nCluster {cluster_id} ({len(cluster_data)} setores):")
    print(f"  Al√≠quota M√©dia: {cluster_data['aliq_mediana_media_8m'].mean():.4f}")
    print(f"  Volatilidade M√©dia: {cluster_data['coef_variacao_temporal'].mean():.3f}")
    print(f"  Faturamento M√©dio: R$ {cluster_data['faturamento_acumulado_8m'].mean()/1e9:.2f} bilh√µes")
    print(f"  Setores representativos:")
    for setor in cluster_data.nlargest(3, 'faturamento_acumulado_8m')['desc_cnae_classe'].values:
        print(f"    ‚Ä¢ {setor}")

# Visualiza√ß√£o com PCA
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)

plt.figure(figsize=(14, 8))
scatter = plt.scatter(X_pca[:, 0], X_pca[:, 1], c=df_clustering['cluster'], 
                     cmap='viridis', s=100, alpha=0.6, edgecolors='w', linewidth=0.5)
plt.colorbar(scatter, label='Cluster')
plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%} vari√¢ncia)')
plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%} vari√¢ncia)')
plt.title('Clustering de Setores Similares (K-Means + PCA)', fontsize=16, fontweight='bold')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# 4.2. Detec√ß√£o de Outliers por Setor (Isolation Forest)
print("\nüîç 4.2. DETEC√á√ÉO DE OUTLIERS POR SETOR (ISOLATION FOREST)")
print("-" * 80)

# Preparar dados
df_outliers = spark.sql("""
SELECT 
    cnae_classe,
    desc_cnae_classe,
    aliq_mediana_media_8m,
    coef_variacao_temporal,
    LOG(faturamento_acumulado_8m + 1) AS log_faturamento,
    aliq_mediana_max_8m - aliq_mediana_min_8m AS amplitude_aliq
FROM niat.argos_evolucao_temporal_setor
WHERE aliq_mediana_media_8m IS NOT NULL
""").toPandas()

# Converter Decimal para float
for col in ['aliq_mediana_media_8m', 'coef_variacao_temporal', 'log_faturamento', 'amplitude_aliq']:
    df_outliers[col] = df_outliers[col].astype(float)

features_outliers = ['aliq_mediana_media_8m', 'coef_variacao_temporal', 'log_faturamento', 'amplitude_aliq']
X_outliers = df_outliers[features_outliers].fillna(0)

# Isolation Forest
iso_forest = IsolationForest(contamination=0.1, random_state=42)
df_outliers['outlier'] = iso_forest.fit_predict(X_outliers)
df_outliers['outlier_score'] = iso_forest.score_samples(X_outliers)

outliers_detectados = df_outliers[df_outliers['outlier'] == -1].sort_values('outlier_score')

print(f"\n‚ö†Ô∏è {len(outliers_detectados)} setores outliers detectados:")
print("-" * 80)
print(outliers_detectados[['desc_cnae_classe', 'aliq_mediana_media_8m', 'coef_variacao_temporal', 'outlier_score']].head(15).to_string(index=False))

# Visualiza√ß√£o
fig, ax = plt.subplots(figsize=(14, 8))
normal = df_outliers[df_outliers['outlier'] == 1]
outliers = df_outliers[df_outliers['outlier'] == -1]

ax.scatter(normal['coef_variacao_temporal'], normal['aliq_mediana_media_8m'], 
          c='blue', label='Normal', alpha=0.6, s=80)
ax.scatter(outliers['coef_variacao_temporal'], outliers['aliq_mediana_media_8m'], 
          c='red', label='Outlier', alpha=0.8, s=120, marker='^')

ax.set_xlabel('Coeficiente de Varia√ß√£o Temporal')
ax.set_ylabel('Al√≠quota Mediana M√©dia (8 meses)')
ax.set_title('Detec√ß√£o de Setores Outliers (Isolation Forest)', fontsize=16, fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# 4.3. Previs√£o de Tend√™ncias de Al√≠quotas (Regress√£o Linear)
print("\nüìà 4.3. PREVIS√ÉO DE TEND√äNCIAS DE AL√çQUOTAS")
print("-" * 80)

# Selecionar top 5 setores para previs√£o
top_5_setores = spark.sql("""
SELECT cnae_classe
FROM niat.argos_evolucao_temporal_setor
ORDER BY faturamento_acumulado_8m DESC
LIMIT 5
""").toPandas()['cnae_classe'].tolist()

# Preparar dados de s√©ries temporais
df_series = spark.sql(f"""
SELECT 
    cnae_classe,
    desc_cnae_classe,
    nu_per_ref,
    ROUND(aliq_efetiva_mediana * 100, 2) AS aliq_mediana_pct
FROM niat.argos_benchmark_setorial
WHERE cnae_classe IN ('{"','".join(top_5_setores)}')
ORDER BY cnae_classe, nu_per_ref
""").toPandas()

# Converter para tipos corretos
df_series['aliq_mediana_pct'] = df_series['aliq_mediana_pct'].astype(float)
df_series['periodo_dt'] = pd.to_datetime(df_series['nu_per_ref'], format='%Y%m')
df_series['periodo_num'] = (df_series['periodo_dt'] - df_series['periodo_dt'].min()).dt.days

# Previs√£o para cada setor
fig, axes = plt.subplots(3, 2, figsize=(18, 14))
fig.suptitle('Previs√£o de Tend√™ncias de Al√≠quotas - Top 5 Setores', fontsize=16, fontweight='bold')
axes = axes.flatten()

for idx, setor in enumerate(top_5_setores):
    df_setor = df_series[df_series['cnae_classe'] == setor].copy()
    
    # Treinar modelo
    X = df_setor[['periodo_num']].values
    y = df_setor['aliq_mediana_pct'].values
    
    model = LinearRegression()
    model.fit(X, y)
    
    # Prever pr√≥ximos 6 meses
    max_periodo = df_setor['periodo_num'].max()
    periodos_futuros = np.arange(max_periodo, max_periodo + 180, 30).reshape(-1, 1)
    previsoes = model.predict(periodos_futuros)
    
    # Plotar
    ax = axes[idx]
    ax.plot(df_setor['periodo_dt'], df_setor['aliq_mediana_pct'], 'o-', label='Hist√≥rico', linewidth=2)
    
    datas_futuras = pd.date_range(start=df_setor['periodo_dt'].max() + pd.DateOffset(months=1), periods=6, freq='MS')
    ax.plot(datas_futuras, previsoes, 's--', label='Previs√£o', linewidth=2, color='red', alpha=0.7)
    
    ax.set_title(df_setor['desc_cnae_classe'].iloc[0][:40], fontsize=10)
    ax.set_xlabel('Per√≠odo')
    ax.set_ylabel('Al√≠quota Mediana (%)')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    # R¬≤ score
    r2 = model.score(X, y)
    ax.text(0.02, 0.98, f'R¬≤ = {r2:.3f}', transform=ax.transAxes, 
           verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

# Remover subplot extra
if len(top_5_setores) < 6:
    fig.delaxes(axes[5])

plt.tight_layout()
plt.show()

# 4.4. An√°lise de Correla√ß√£o entre Vari√°veis
print("\nüîó 4.4. AN√ÅLISE DE CORRELA√á√ÉO ENTRE VARI√ÅVEIS")
print("-" * 80)

# Preparar dados
df_correlacao = spark.sql("""
SELECT 
    aliq_mediana_media_8m AS aliq_media,
    coef_variacao_temporal AS volatilidade,
    LOG(faturamento_acumulado_8m + 1) AS log_faturamento,
    LOG(icms_devido_acumulado_8m + 1) AS log_icms,
    media_empresas_mes AS empresas
FROM niat.argos_evolucao_temporal_setor
WHERE aliq_mediana_media_8m IS NOT NULL
""").toPandas()

# Matriz de correla√ß√£o
correlation_matrix = df_correlacao.corr()

print("\nMATRIZ DE CORRELA√á√ÉO:")
print(correlation_matrix.round(3))

# Visualiza√ß√£o
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, fmt='.3f', cmap='coolwarm', 
           center=0, square=True, linewidths=1, cbar_kws={"shrink": 0.8})
plt.title('Matriz de Correla√ß√£o entre Vari√°veis Setoriais', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.show()

# Correla√ß√µes mais fortes - CORRE√á√ÉO AQUI
print("\nCORRELA√á√ïES MAIS FORTES (|r| > 0.5):")
print("-" * 80)

# Usar numpy.abs em vez da fun√ß√£o abs do Python (que est√° sendo sobrescrita pelo PySpark)
import builtins
abs_builtin = builtins.abs  # Salvar refer√™ncia √† fun√ß√£o built-in

correlacoes_fortes = []
for i in range(len(correlation_matrix.columns)):
    for j in range(i+1, len(correlation_matrix.columns)):
        corr_value = correlation_matrix.iloc[i, j]
        # Usar abs_builtin ou np.abs
        if abs_builtin(corr_value) > 0.5:
            correlacoes_fortes.append({
                'var1': correlation_matrix.columns[i],
                'var2': correlation_matrix.columns[j],
                'correlacao': corr_value
            })

if len(correlacoes_fortes) > 0:
    for item in correlacoes_fortes:
        print(f"  ‚Ä¢ {item['var1']} ‚Üî {item['var2']}: {item['correlacao']:.3f}")
else:
    print("  Nenhuma correla√ß√£o forte (|r| > 0.5) detectada.")

In [None]:
# =============================================================================
# PARTE 5: AN√ÅLISES ESPEC√çFICAS ADICIONAIS
# =============================================================================
print("\n" + "=" * 80)
print("PARTE 5: AN√ÅLISES ESPEC√çFICAS ADICIONAIS")
print("=" * 80)

# 5.1. An√°lise de Vari√¢ncia entre Portes dentro do Setor
print("\nüìä 5.1. AN√ÅLISE DE VARI√ÇNCIA ENTRE PORTES")
print("-" * 80)

variancia_porte = spark.sql("""
SELECT 
    bp.cnae_classe,
    b.desc_cnae_classe,
    STDDEV(bp.aliq_efetiva_mediana) AS variancia_aliq_entre_portes,
    MAX(bp.aliq_efetiva_mediana) - MIN(bp.aliq_efetiva_mediana) AS amplitude_aliq
FROM niat.argos_benchmark_setorial_porte bp
INNER JOIN niat.argos_benchmark_setorial b 
    ON bp.cnae_classe = b.cnae_classe AND bp.nu_per_ref = b.nu_per_ref
WHERE bp.nu_per_ref = (SELECT MAX(nu_per_ref) FROM niat.argos_benchmark_setorial_porte)
GROUP BY bp.cnae_classe, b.desc_cnae_classe
HAVING COUNT(DISTINCT bp.porte_empresa) >= 3
ORDER BY variancia_aliq_entre_portes DESC
LIMIT 15
""").toPandas()

print(variancia_porte.to_string(index=False))

# Visualiza√ß√£o
plt.figure(figsize=(14, 8))
sns.barplot(data=variancia_porte, y='desc_cnae_classe', x='variancia_aliq_entre_portes', palette='Reds_r')
plt.title('Setores com Maior Vari√¢ncia de Al√≠quota entre Portes', fontsize=16, fontweight='bold')
plt.xlabel('Desvio Padr√£o da Al√≠quota entre Portes')
plt.ylabel('Setor')
plt.tight_layout()
plt.show()

# 5.2. Score de Risco Composto (ML-Enhanced)
print("\nüéØ 5.2. SCORE DE RISCO COMPOSTO (ML-ENHANCED)")
print("-" * 80)

# Calcular score usando m√∫ltiplos fatores
df_score = spark.sql("""
SELECT 
    e.nu_cnpj,
    e.nm_razao_social,
    e.cnae_classe,
    e.porte_empresa,
    e.aliq_efetiva_empresa,
    e.aliq_setor_mediana,
    e.status_vs_setor,
    ete.categoria_volatilidade,
    ete.aliq_coef_variacao_8m,
    e.flag_divergencia_pagamento,
    COALESCE(a.score_risco, 0) AS score_alerta
FROM niat.argos_empresa_vs_benchmark e
LEFT JOIN niat.argos_evolucao_temporal_empresa ete ON e.nu_cnpj = ete.nu_cnpj
LEFT JOIN niat.argos_alertas_empresas a ON e.nu_cnpj = a.nu_cnpj AND e.nu_per_ref = a.nu_per_ref
WHERE e.nu_per_ref = (SELECT MAX(nu_per_ref) FROM niat.argos_empresa_vs_benchmark)
  AND e.aliq_efetiva_empresa IS NOT NULL
""").toPandas()

# Criar features para o modelo - CORRE√á√ÉO AQUI
df_score['diferenca_aliq'] = abs_builtin(df_score['aliq_efetiva_empresa'] - df_score['aliq_setor_mediana'])
# OU usando numpy (tamb√©m funciona):
# df_score['diferenca_aliq'] = np.abs(df_score['aliq_efetiva_empresa'] - df_score['aliq_setor_mediana'])

df_score['status_encoded'] = df_score['status_vs_setor'].map({
    'MUITO_ABAIXO': 5, 'ABAIXO': 3, 'NORMAL': 1, 'ACIMA': 2, 'MUITO_ACIMA': 4, 'SEM_DADOS': 0
})
df_score['volatilidade_encoded'] = df_score['categoria_volatilidade'].map({
    'ALTA': 3, 'MEDIA': 2, 'BAIXA': 1, 'SEM_DADOS': 0
})

# Random Forest para score composto
features_rf = ['diferenca_aliq', 'status_encoded', 'volatilidade_encoded', 
              'aliq_coef_variacao_8m', 'flag_divergencia_pagamento', 'score_alerta']

X_score = df_score[features_rf].fillna(0)

# Criar target sint√©tico baseado em m√∫ltiplos crit√©rios
df_score['risco_alto'] = (
    (df_score['status_vs_setor'] == 'MUITO_ABAIXO') | 
    (df_score['volatilidade_encoded'] >= 2) |
    (df_score['flag_divergencia_pagamento'] == 1)
).astype(int)

y_score = df_score['risco_alto']

# Treinar Random Forest
rf_score = RandomForestRegressor(n_estimators=100, random_state=42, max_depth=10)
rf_score.fit(X_score, y_score)

# Prever score
df_score['score_ml'] = rf_score.predict(X_score) * 100

# Top empresas por score ML
top_score_ml = df_score.nlargest(20, 'score_ml')[['nm_razao_social', 'cnae_classe', 'porte_empresa', 
                                                    'status_vs_setor', 'categoria_volatilidade', 'score_ml']]

print("\nTOP 20 EMPRESAS POR SCORE ML:")
print("-" * 80)
print(top_score_ml.to_string(index=False))

# Feature importance
feature_importance = pd.DataFrame({
    'feature': features_rf,
    'importance': rf_score.feature_importances_
}).sort_values('importance', ascending=False)

plt.figure(figsize=(12, 6))
sns.barplot(data=feature_importance, x='importance', y='feature', palette='viridis')
plt.title('Import√¢ncia das Features no Score de Risco ML', fontsize=16, fontweight='bold')
plt.xlabel('Import√¢ncia')
plt.ylabel('Feature')
plt.tight_layout()
plt.show()

In [None]:
# =============================================================================
# PARTE 6: RESUMO EXECUTIVO E EXPORTA√á√ÉO
# =============================================================================
print("\n" + "=" * 80)
print("PARTE 6: RESUMO EXECUTIVO")
print("=" * 80)

resumo_executivo = f"""
‚ïî‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïó
‚ïë           SISTEMA DE AN√ÅLISE TRIBUT√ÅRIA SETORIAL v4.0                     ‚ïë
‚ïë                    RESUMO EXECUTIVO DA AN√ÅLISE                             ‚ïë
‚ïö‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïù

üìä M√âTRICAS GERAIS:
  ‚Ä¢ Total de Setores Analisados: {stats_gerais['total_setores'].iloc[0]:,}
  ‚Ä¢ Total de Empresas: {stats_gerais['total_empresas'].iloc[0]:,}
  ‚Ä¢ Faturamento Total: R$ {stats_gerais['faturamento_total_bilhoes'].iloc[0]:.2f} bilh√µes
  ‚Ä¢ ICMS Devido Total: R$ {stats_gerais['icms_devido_bilhoes'].iloc[0]:.2f} bilh√µes
  ‚Ä¢ Al√≠quota M√©dia do Sistema: {stats_gerais['aliq_media_sistema_pct'].iloc[0]:.2f}%

üî¨ AN√ÅLISES DE MACHINE LEARNING:
  ‚Ä¢ Clusters de Setores Identificados: {k_otimo}
  ‚Ä¢ Setores Outliers Detectados: {len(outliers_detectados)}
  ‚Ä¢ Empresas com Alto Risco (ML): {(df_score['score_ml'] > 70).sum():,}

‚ö†Ô∏è ALERTAS CR√çTICOS:
  ‚Ä¢ Empresas em Risco Cr√≠tico: {stats_gerais['empresas_risco_critico'].iloc[0]:,}
  ‚Ä¢ Setores com Anomalias: {len(anomalias_ranking)}
  ‚Ä¢ Diverg√™ncias ICMS vs Pagamentos: {divergencias['qtd_empresas'].sum():,} empresas

üìà TEND√äNCIAS IDENTIFICADAS:
  ‚Ä¢ Setores com Alta Volatilidade: {(df_clustering['coef_variacao_temporal'] > 0.3).sum()}
  ‚Ä¢ Correla√ß√£o Faturamento-ICMS: {correlation_matrix.loc['log_faturamento', 'log_icms']:.3f}

üí° RECOMENDA√á√ïES:
  1. Priorizar fiscaliza√ß√£o dos {len(outliers_detectados)} setores outliers
  2. Monitorar empresas com score ML > 70 ({(df_score['score_ml'] > 70).sum():,} casos)
  3. Investigar diverg√™ncias ICMS em {divergencias.iloc[0]['qtd_empresas']} empresas
  4. Analisar clusters de setores para a√ß√µes setoriais coordenadas

‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê
An√°lise conclu√≠da em: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê
"""

print(resumo_executivo)

# Salvar resumo em arquivo (opcional)
# with open('/tmp/argos_resumo_executivo.txt', 'w', encoding='utf-8') as f:
#     f.write(resumo_executivo)
# print("\n‚úÖ Resumo executivo salvo em: /tmp/argos_resumo_executivo.txt")

print("\n" + "=" * 80)
print("‚úÖ AN√ÅLISE COMPLETA FINALIZADA COM SUCESSO!")
print("=" * 80)
print("\nPR√ìXIMOS PASSOS:")
print("  1. Utilizar os DataFrames gerados para an√°lises espec√≠ficas")
print("  2. Exportar resultados para dashboards (Streamlit/PowerBI)")
print("  3. Implementar sistema de monitoramento cont√≠nuo")
print("  4. Integrar com sistema de alertas autom√°ticos")
print("\n" + "=" * 80)