In [None]:
import sys
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/poc")
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/plugins")
sys.path.append("/home/tsevero/notebooks/SAT_BIG_DATA/data-pipeline/batch/dags")

#Import libs python
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import date

#Import libs internas
from utils import spark_utils_session as utils

from hooks.hdfs.hdfs_helper import HdfsHelper
from jobs.job_base_config import BaseETLJobClass

import poc_helper
poc_helper.load_env("PROD")

In [None]:
def get_session(profile: str, dynamic_allocation_enabled: bool = True) -> utils.DBASparkAppSession:
    """Generates DBASparkAppSession."""
    
    app_name = "tsevero_gei_calculo"
    
    spark_builder = (utils.DBASparkAppSession
                     .builder
                     .setAppName(app_name)
                     .usingProcessProfile(profile)
                    )
    
    if dynamic_allocation_enabled:
        spark_builder.autoResourceManagement()

    return spark_builder.build()

session = get_session(profile='efd_t2')

In [None]:
session.sparkSession.sql("SHOW DATABASES").show(truncate=False)

In [None]:
# Bibliotecas para an√°lise e visualiza√ß√£o
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
from datetime import datetime, date
from decimal import Decimal

# Configura√ß√µes de visualiza√ß√£o
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
warnings.filterwarnings('ignore')
plt.rcParams['figure.figsize'] = (16, 8)
plt.rcParams['font.size'] = 11

# Acesso ao SparkSession
spark = session.sparkSession

print("=" * 80)
print("SISTEMA DE AN√ÅLISE FISCAL - PROCESSAMENTO NFe/CTe")
print("=" * 80)
print(f"Iniciado em: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)

In [None]:
# ===================================================================
# C√âLULA 3: PAR√ÇMETROS DE ENTRADA
# ===================================================================
# DEFINA AQUI OS PAR√ÇMETROS DA AN√ÅLISE

# Per√≠odo de an√°lise (formato YYYYMM)
PERIODO_INICIO = 202001  # Janeiro/2020
PERIODO_FIM = 202509     # Setembro/2025

# Lista de CNPJs a serem analisados (cadastro)
# Se vazio, busca da tabela de cadastro
CNPJS_ANALISE = ['32372396000194', '46909678000192']  # Exemplo: ['12345678000190', '98765432000110']

# Se quiser buscar de uma tabela espec√≠fica:
TABELA_CADASTRO = "usr_sat_ods.vw_ods_contrib"  # Ajuste conforme sua tabela
CAMPO_CNPJ_CADASTRO = "nu_cnpj"  # Campo que cont√©m o CNPJ

print("=" * 80)
print("PAR√ÇMETROS DA AN√ÅLISE")
print("=" * 80)
print(f"Per√≠odo: {PERIODO_INICIO} at√© {PERIODO_FIM}")
print(f"Tabela Cadastro: {TABELA_CADASTRO}")
print(f"CNPJs espec√≠ficos: {len(CNPJS_ANALISE) if CNPJS_ANALISE else 'Todos da tabela de cadastro'}")
print("=" * 80)

In [None]:
# ===================================================================
# C√âLULA 4: CARREGAR CNPJS DO CADASTRO
# ===================================================================
print("\n" + "=" * 80)
print("CARREGANDO CNPJS DO CADASTRO")
print("=" * 80)

if CNPJS_ANALISE:
    # Usar lista manual
    df_cadastro = spark.createDataFrame(
        [(cnpj,) for cnpj in CNPJS_ANALISE], 
        ["cnpj"]
    )
    print(f"‚úì Usando {len(CNPJS_ANALISE)} CNPJs fornecidos manualmente")
else:
    # Buscar da tabela
    query_cadastro = f"""
    SELECT DISTINCT 
        REGEXP_REPLACE(TRIM(CAST({CAMPO_CNPJ_CADASTRO} AS STRING)), '[^0-9]', '') AS cnpj
    FROM {TABELA_CADASTRO}
    WHERE {CAMPO_CNPJ_CADASTRO} IS NOT NULL
    """
    df_cadastro = spark.sql(query_cadastro)
    total_cnpjs = df_cadastro.count()
    print(f"‚úì Carregados {total_cnpjs} CNPJs da tabela {TABELA_CADASTRO}")

# Criar view tempor√°ria
df_cadastro.createOrReplaceTempView("cadastro_cnpj")

# Exibir amostra
print("\nAmostra de CNPJs:")
df_cadastro.limit(10).show(truncate=False)

In [None]:
# ===================================================================
# C√âLULA 5: CRIAR VIEW DE NFe (OTIMIZADA + SKIP PER√çODO CORROMPIDO)
# ===================================================================
print("\n" + "=" * 80)
print("CRIANDO VIEW DE NFe PROCESSADA (OTIMIZADA)")
print("=" * 80)

# REFRESH da tabela NFe primeiro
print("Fazendo REFRESH da tabela nfe.nfe...")
try:
    spark.sql("REFRESH TABLE nfe.nfe")
    print("‚úì REFRESH executado")
except Exception as e:
    print(f"‚ö† REFRESH falhou: {e}")

# Materializar os CNPJs em cache
spark.sql("CACHE TABLE cadastro_cnpj")
cnpj_list = [row.cnpj for row in spark.sql("SELECT cnpj FROM cadastro_cnpj").collect()]
print(f"CNPJs no cadastro: {len(cnpj_list)}")

# Criar string para usar no IN clause
cnpj_in_clause = "', '".join(cnpj_list)

# ‚ö†Ô∏è AJUSTE CR√çTICO: Pular per√≠odos corrompidos
# Se 202001 est√° corrompido, come√ßar de 202002
PERIODO_INICIO_AJUSTADO = 202101  # Come√ßar de Janeiro/2021 ao inv√©s de 2020

print(f"\n‚ö†Ô∏è ATEN√á√ÉO: Pulando per√≠odo 2020 devido a arquivos corrompidos")
print(f"Per√≠odo ajustado: {PERIODO_INICIO_AJUSTADO} at√© {PERIODO_FIM}")

query_nfe_view = f"""
CREATE OR REPLACE TEMPORARY VIEW vw_nfe_processada AS
SELECT 
    -- Identifica√ß√£o
    a.chave AS chave_nfe,
    (a.ano_emissao * 100 + a.mes_emissao) AS periodo_ref,
    CONCAT(LPAD(a.mes_emissao, 2, '0'), '/', a.ano_emissao) AS periodo_ref_formatado,
    a.dhemi_orig AS data_hora_emissao,
    
    -- Emitente (limpeza feita uma vez)
    cnpj_emit AS cnpj_emitente,
    a.procnfe.nfe.infnfe.emit.xnome AS razao_social_emitente,
    a.procnfe.nfe.infnfe.emit.xfant AS nome_fantasia_emitente,
    a.procnfe.nfe.infnfe.emit.ie AS ie_emitente,
    a.procnfe.nfe.infnfe.emit.crt AS crt_emitente,
    a.procnfe.nfe.infnfe.emit.enderemit.uf AS uf_emitente,
    a.procnfe.nfe.infnfe.emit.enderemit.xmun AS municipio_emitente,
    
    -- Destinat√°rio (limpeza feita uma vez)
    cnpj_dest AS cnpj_destinatario,
    a.procnfe.nfe.infnfe.dest.xnome AS razao_social_destinatario,
    a.procnfe.nfe.infnfe.dest.ie AS ie_destinatario,
    a.procnfe.nfe.infnfe.dest.indiedest AS indicador_ie_destinatario,
    a.procnfe.nfe.infnfe.dest.enderdest.uf AS uf_destinatario,
    a.procnfe.nfe.infnfe.dest.enderdest.xmun AS municipio_destinatario,
    
    -- Opera√ß√£o
    a.procnfe.nfe.infnfe.ide.natop AS natureza_operacao,
    a.procnfe.nfe.infnfe.ide.tpnf AS tipo_nf,
    CASE 
        WHEN a.procnfe.nfe.infnfe.ide.tpnf = 0 THEN 'Entrada'
        WHEN a.procnfe.nfe.infnfe.ide.tpnf = 1 THEN 'Saida'
        ELSE 'Indefinido'
    END AS entrada_saida,
    CASE 
        WHEN a.procnfe.nfe.infnfe.ide.indfinal = 1 THEN 'Consumidor final'
        ELSE 'Normal'
    END AS tipo_consumidor,
    
    -- Itens
    b._nitem AS numero_item,
    b.prod.cprod AS codigo_produto,
    b.prod.xprod AS descricao_produto,
    b.prod.ncm AS ncm,
    b.prod.cfop AS cfop,
    
    -- Valores
    CAST(COALESCE(b.prod.vprod, 0) AS DECIMAL(15,2)) AS valor_produto,
    CAST(COALESCE(b.prod.vfrete, 0) AS DECIMAL(15,2)) AS valor_frete,
    CAST(COALESCE(b.prod.vseg, 0) AS DECIMAL(15,2)) AS valor_seguro,
    CAST(COALESCE(b.prod.vdesc, 0) AS DECIMAL(15,2)) AS valor_desconto,
    CAST(COALESCE(b.prod.voutro, 0) AS DECIMAL(15,2)) AS valor_outras_despesas,
    
    -- ICMS
    b.imposto.icms.resumo.cst AS cst_icms,
    CAST(COALESCE(b.imposto.icms.resumo.vbc, 0) AS DECIMAL(15,2)) AS bc_icms,
    CAST(COALESCE(b.imposto.icms.resumo.picms, 0) AS DECIMAL(7,4)) AS aliquota_icms,
    CAST(COALESCE(b.imposto.icms.resumo.vicms, 0) AS DECIMAL(15,2)) AS valor_icms,
    CAST(COALESCE(b.imposto.icms.resumo.vcredicmssn, 0) AS DECIMAL(15,2)) AS valor_credito_sn,
    
    -- Totais NFe
    CAST(COALESCE(a.procnfe.nfe.infnfe.total.icmstot.vnf, 0) AS DECIMAL(15,2)) AS total_nfe,
    
    -- Flags cadastro
    IF(cnpj_emit IN ('{cnpj_in_clause}'), 1, 0) AS emitente_no_cadastro,
    IF(cnpj_dest IN ('{cnpj_in_clause}'), 1, 0) AS destinatario_no_cadastro

FROM (
    SELECT 
        *,
        REGEXP_REPLACE(TRIM(CAST(procnfe.nfe.infnfe.emit.cnpj AS STRING)), '[^0-9]', '') AS cnpj_emit,
        REGEXP_REPLACE(TRIM(CAST(procnfe.nfe.infnfe.dest.cnpj AS STRING)), '[^0-9]', '') AS cnpj_dest
    FROM nfe.nfe
    WHERE situacao = 1
      AND (ano_emissao * 100 + mes_emissao) >= {PERIODO_INICIO_AJUSTADO}
      AND (ano_emissao * 100 + mes_emissao) <= {PERIODO_FIM}
) a
LATERAL VIEW EXPLODE(a.procnfe.nfe.infnfe.det) exploded_table AS b
WHERE cnpj_emit IN ('{cnpj_in_clause}')
   OR cnpj_dest IN ('{cnpj_in_clause}')
"""

try:
    spark.sql(query_nfe_view)
    print("‚úì View criada")
    
    # Verificar SEM executar count completo (muito pesado)
    print("\n‚úì View NFe criada com sucesso")
    print(f"Per√≠odo: {PERIODO_INICIO_AJUSTADO} at√© {PERIODO_FIM}")
    
    # Amostra para verificar
    print("\nAmostra dos dados (5 registros):")
    spark.sql("SELECT periodo_ref, cnpj_emitente, cnpj_destinatario, entrada_saida, valor_produto FROM vw_nfe_processada LIMIT 5").show(truncate=False)
    
except Exception as e:
    print(f"‚úó Erro: {e}")
    raise

print("\n‚úì Pronto para pr√≥ximas etapas!")

In [None]:
# ===================================================================
# C√âLULA 6: INTEGRAR CFOP (Execute ap√≥s a c√©lula 5 terminar)
# ===================================================================
print("\n" + "=" * 80)
print("INTEGRANDO INFORMA√á√ïES DE CFOP")
print("=" * 80)

query_cfop_view = """
CREATE OR REPLACE TEMPORARY VIEW vw_nfe_com_cfop AS
SELECT 
    nfe.*,
    cfop.conta,
    cfop.descricaocfop,
    cfop.indcom,
    cfop.movimento
FROM vw_nfe_processada nfe
LEFT JOIN niat.tabela_cfop cfop 
    ON nfe.cfop = cfop.cfop
"""

spark.sql(query_cfop_view)
print("‚úì View vw_nfe_com_cfop criada")

# Verificar
total = spark.sql("SELECT COUNT(*) as total FROM vw_nfe_com_cfop").collect()[0]['total']
print(f"‚úì Total: {total:,} registros")

# Top CFOPs
print("\nTop 10 CFOPs:")
spark.sql("""
    SELECT 
        cfop,
        descricaocfop,
        conta,
        entrada_saida,
        COUNT(*) as qtde,
        ROUND(SUM(valor_produto), 2) as vl_total
    FROM vw_nfe_com_cfop
    GROUP BY cfop, descricaocfop, conta, entrada_saida
    ORDER BY qtde DESC
    LIMIT 10
""").show(truncate=False)

In [None]:
# ===================================================================
# C√âLULA 6: INTEGRAR CFOP (CORRIGIDA)
# ===================================================================
print("\n" + "=" * 80)
print("INTEGRANDO INFORMA√á√ïES DE CFOP")
print("=" * 80)

# Primeiro, carregar a tabela CFOP para mem√≥ria
try:
    print("Carregando tabela CFOP...")
    df_cfop = spark.sql("""
        SELECT 
            cfop,
            conta,
            descricaocfop,
            indcom,
            movimento,
            especial,
            local,
            mercenergtel
        FROM niat.tabela_cfop
    """)
    
    # Criar view tempor√°ria
    df_cfop.createOrReplaceTempView("temp_cfop")
    total_cfops = df_cfop.count()
    print(f"‚úì Tabela CFOP carregada: {total_cfops} registros")
    
except Exception as e:
    print(f"‚úó Erro ao carregar tabela CFOP: {e}")
    print("\nCriando tabela CFOP b√°sica manualmente...")
    
    # Se n√£o conseguir acessar, criar uma b√°sica
    cfop_data = [
        (1101, 0, "Compra para industrializa√ß√£o", "Industrializa√ß√£o", "Opera√ß√£o"),
        (1102, 0, "Compra para comercializa√ß√£o", "Comercializa√ß√£o", "Opera√ß√£o"),
        (5101, 1, "Venda de produ√ß√£o do estabelecimento", "Industrializa√ß√£o", "Opera√ß√£o"),
        (5102, 1, "Venda de mercadoria adquirida", "Comercializa√ß√£o", "Opera√ß√£o"),
        (5403, 1, "Venda de mercadoria em opera√ß√£o com n√£o contribuinte", "Comercializa√ß√£o", "Opera√ß√£o"),
        (6101, 1, "Venda de produ√ß√£o interestadual", "Industrializa√ß√£o", "Opera√ß√£o"),
        (6102, 1, "Venda de mercadoria interestadual", "Comercializa√ß√£o", "Opera√ß√£o"),
    ]
    
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    schema = StructType([
        StructField("cfop", IntegerType(), True),
        StructField("conta", IntegerType(), True),
        StructField("descricaocfop", StringType(), True),
        StructField("indcom", StringType(), True),
        StructField("movimento", StringType(), True),
    ])
    
    df_cfop = spark.createDataFrame(cfop_data, schema)
    df_cfop.createOrReplaceTempView("temp_cfop")
    print("‚úì Tabela CFOP b√°sica criada")

# Agora fazer o join com a view tempor√°ria
query_cfop_view = """
CREATE OR REPLACE TEMPORARY VIEW vw_nfe_com_cfop AS
SELECT 
    nfe.*,
    cfop.conta,
    cfop.descricaocfop,
    cfop.indcom,
    cfop.movimento
FROM vw_nfe_processada nfe
LEFT JOIN temp_cfop cfop 
    ON nfe.cfop = cfop.cfop
"""

spark.sql(query_cfop_view)
print("‚úì View vw_nfe_com_cfop criada")

# Verificar
total = spark.sql("SELECT COUNT(*) as total FROM vw_nfe_com_cfop").collect()[0]['total']
print(f"‚úì Total: {total:,} registros")

# Verificar quantos CFOPs n√£o foram encontrados
cfops_sem_desc = spark.sql("""
    SELECT COUNT(*) as total 
    FROM vw_nfe_com_cfop 
    WHERE descricaocfop IS NULL
""").collect()[0]['total']

if cfops_sem_desc > 0:
    print(f"‚ö† {cfops_sem_desc:,} registros sem descri√ß√£o de CFOP")

# Top CFOPs
print("\nTop 10 CFOPs:")
spark.sql("""
    SELECT 
        cfop,
        descricaocfop,
        conta,
        entrada_saida,
        COUNT(*) as qtde,
        ROUND(SUM(valor_produto), 2) as vl_total
    FROM vw_nfe_com_cfop
    GROUP BY cfop, descricaocfop, conta, entrada_saida
    ORDER BY qtde DESC
    LIMIT 10
""").show(truncate=False)

In [None]:
# ===================================================================
# C√âLULA 7: CALCULAR ICMS
# ===================================================================
print("\n" + "=" * 80)
print("CALCULANDO ICMS - D√âBITOS E CR√âDITOS")
print("=" * 80)

query_icms = """
CREATE OR REPLACE TEMPORARY VIEW vw_nfe_com_icms AS
SELECT 
    *,
    -- BC total do item
    (valor_produto + valor_frete + valor_seguro + valor_outras_despesas - valor_desconto) AS bc_total_item,
    
    -- D√âBITO (Sa√≠da)
    CASE 
        WHEN entrada_saida = 'Saida' AND conta = 1 THEN
            CASE
                WHEN valor_icms > 0 THEN valor_icms
                WHEN bc_icms > 0 AND aliquota_icms > 0 THEN bc_icms * (aliquota_icms / 100)
                WHEN tipo_consumidor = 'Consumidor final' THEN 
                    (valor_produto + valor_frete + valor_seguro + valor_outras_despesas - valor_desconto) * 0.17
                WHEN uf_destinatario IN ('MG', 'PR', 'RJ', 'RS', 'SP') AND uf_destinatario != uf_emitente THEN
                    (valor_produto + valor_frete + valor_seguro + valor_outras_despesas - valor_desconto) * 0.12
                WHEN uf_destinatario != uf_emitente THEN
                    (valor_produto + valor_frete + valor_seguro + valor_outras_despesas - valor_desconto) * 0.07
                ELSE 
                    (valor_produto + valor_frete + valor_seguro + valor_outras_despesas - valor_desconto) * 0.12
            END
        ELSE 0
    END AS vl_icms_debito,
    
    CASE 
        WHEN entrada_saida = 'Saida' AND conta = 1 THEN
            COALESCE(bc_icms, valor_produto + valor_frete + valor_seguro + valor_outras_despesas - valor_desconto)
        ELSE 0
    END AS bc_icms_debito,
    
    -- CR√âDITO (Entrada)
    CASE 
        WHEN entrada_saida = 'Entrada' AND conta = -1 
             AND cst_icms NOT IN ('10', '15', '30', '40', '41', '53', '60', '61', '70', '102', '202', '203', '300', '500') THEN
            CASE
                WHEN valor_icms > 0 THEN valor_icms
                WHEN valor_credito_sn > 0 THEN valor_credito_sn
                ELSE 0
            END
        ELSE 0
    END AS vl_icms_credito,
    
    CASE 
        WHEN entrada_saida = 'Entrada' AND conta = -1 THEN
            COALESCE(bc_icms, valor_produto + valor_frete + valor_seguro + valor_outras_despesas - valor_desconto)
        ELSE 0
    END AS bc_icms_credito

FROM vw_nfe_com_cfop
"""

spark.sql(query_icms)
print("‚úì View vw_nfe_com_icms criada")

# Resumo
print("\nüìä RESUMO ICMS:")
spark.sql("""
    SELECT 
        entrada_saida,
        COUNT(*) as qtde_itens,
        ROUND(SUM(bc_icms_debito), 2) as bc_debito,
        ROUND(SUM(vl_icms_debito), 2) as vl_debito,
        ROUND(SUM(bc_icms_credito), 2) as bc_credito,
        ROUND(SUM(vl_icms_credito), 2) as vl_credito
    FROM vw_nfe_com_icms
    GROUP BY entrada_saida
""").show(truncate=False)

In [None]:
# ===================================================================
# C√âLULA 8: AN√ÅLISE POR ENTRADA/SA√çDA E EMITENTE/DESTINAT√ÅRIO
# ===================================================================
print("\n" + "=" * 80)
print("SEPARANDO: ENTRADA vs SA√çDA | EMITENTE vs DESTINAT√ÅRIO")
print("=" * 80)

# ENTRADA - DESTINAT√ÅRIO (empresa do cadastro recebeu mercadoria)
df_entrada_dest = spark.sql("""
    SELECT *
    FROM vw_nfe_com_icms
    WHERE entrada_saida = 'Entrada'
      AND destinatario_no_cadastro = 1
""")
df_entrada_dest.createOrReplaceTempView("vw_entrada_destinatario")
total_ed = df_entrada_dest.count()
print(f"‚úì Entrada Destinat√°rio: {total_ed:,} registros")

# ENTRADA - EMITENTE (empresa do cadastro emitiu nota de devolu√ß√£o/retorno)
df_entrada_emit = spark.sql("""
    SELECT *
    FROM vw_nfe_com_icms
    WHERE entrada_saida = 'Entrada'
      AND emitente_no_cadastro = 1
""")
df_entrada_emit.createOrReplaceTempView("vw_entrada_emitente")
total_ee = df_entrada_emit.count()
print(f"‚úì Entrada Emitente: {total_ee:,} registros")

# SA√çDA - EMITENTE (empresa do cadastro vendeu)
df_saida_emit = spark.sql("""
    SELECT *
    FROM vw_nfe_com_icms
    WHERE entrada_saida = 'Saida'
      AND emitente_no_cadastro = 1
""")
df_saida_emit.createOrReplaceTempView("vw_saida_emitente")
total_se = df_saida_emit.count()
print(f"‚úì Sa√≠da Emitente: {total_se:,} registros")

# SA√çDA - DESTINAT√ÅRIO (empresa do cadastro recebeu venda de terceiro)
df_saida_dest = spark.sql("""
    SELECT *
    FROM vw_nfe_com_icms
    WHERE entrada_saida = 'Saida'
      AND destinatario_no_cadastro = 1
""")
df_saida_dest.createOrReplaceTempView("vw_saida_destinatario")
total_sd = df_saida_dest.count()
print(f"‚úì Sa√≠da Destinat√°rio: {total_sd:,} registros")

print(f"\n‚úì Total geral: {total_ed + total_ee + total_se + total_sd:,} registros")

In [None]:
# ===================================================================
# C√âLULA 9: PROCESSAR CTe (CORRIGIDA - ESTRUTURA ICMS)
# ===================================================================
print("\n" + "=" * 80)
print("PROCESSANDO CTe - CR√âDITO DE FRETE")
print("=" * 80)

query_cte = f"""
CREATE OR REPLACE TEMPORARY VIEW vw_cte_processada AS
SELECT 
    -- Identifica√ß√£o
    a.chave AS chave_cte,
    a.ano_emissao AS ano_emissao,
    
    -- Per√≠odo
    CONCAT(
        LPAD(MONTH(a.proccte.cte.infcte.ide.dhemi), 2, '0'), 
        '/', 
        YEAR(a.proccte.cte.infcte.ide.dhemi)
    ) AS periodo_ref,
    
    (YEAR(a.proccte.cte.infcte.ide.dhemi) * 100 + MONTH(a.proccte.cte.infcte.ide.dhemi)) AS periodo_ref_num,
    
    a.proccte.cte.infcte.ide.dhemi AS data_hora_emissao,
    
    -- Tomador (quem paga o frete)
    REGEXP_REPLACE(
        TRIM(CAST(a.proccte.cte.infcte.ide.toma4.cnpj AS STRING)), 
        '[^0-9]', ''
    ) AS cnpj_tomador,
    
    a.proccte.cte.infcte.ide.toma4.xnome AS nome_tomador,
    a.proccte.cte.infcte.ide.toma4.toma AS indicador_tomador,
    
    -- Remetente
    REGEXP_REPLACE(
        TRIM(CAST(a.proccte.cte.infcte.rem.cnpj AS STRING)), 
        '[^0-9]', ''
    ) AS cnpj_remetente,
    
    a.proccte.cte.infcte.rem.xnome AS nome_remetente,
    
    -- Destinat√°rio
    REGEXP_REPLACE(
        TRIM(CAST(a.proccte.cte.infcte.dest.cnpj AS STRING)), 
        '[^0-9]', ''
    ) AS cnpj_destinatario,
    
    a.proccte.cte.infcte.dest.xnome AS nome_destinatario,
    
    -- Emitente (transportadora)
    REGEXP_REPLACE(
        TRIM(CAST(a.proccte.cte.infcte.emit.cnpj AS STRING)), 
        '[^0-9]', ''
    ) AS cnpj_emitente,
    
    a.proccte.cte.infcte.emit.xnome AS nome_emitente,
    a.proccte.cte.infcte.emit.ie AS ie_emitente,
    
    -- CFOP e Natureza
    a.proccte.cte.infcte.ide.cfop AS cfop,
    a.proccte.cte.infcte.ide.natop AS natureza_operacao,
    a.proccte.cte.infcte.ide.modal AS modal,
    a.proccte.cte.infcte.ide.tpserv AS tipo_servico,
    
    -- Valores
    CAST(COALESCE(a.proccte.cte.infcte.vprest.vtprest, 0) AS DECIMAL(15,2)) AS valor_total_servico,
    
    -- ICMS - Estrutura CORRETA baseada no describe
    CAST(COALESCE(
        a.proccte.cte.infcte.imp.icms.icms00.vicms,
        a.proccte.cte.infcte.imp.icms.icms20.vicms,
        a.proccte.cte.infcte.imp.icms.icms60.vcred,
        a.proccte.cte.infcte.imp.icms.icms90.vcred,
        a.proccte.cte.infcte.imp.icms.cst00.vicms,
        a.proccte.cte.infcte.imp.icms.cst20.vicms,
        a.proccte.cte.infcte.imp.icms.cst80.vcred,
        a.proccte.cte.infcte.imp.icms.cst90.vcred,
        0
    ) AS DECIMAL(15,2)) AS valor_icms,
    
    CAST(COALESCE(
        a.proccte.cte.infcte.imp.icms.icms00.vbc,
        a.proccte.cte.infcte.imp.icms.icms20.vbc,
        a.proccte.cte.infcte.imp.icms.icms90.vbc,
        a.proccte.cte.infcte.imp.icms.cst00.vbc,
        a.proccte.cte.infcte.imp.icms.cst20.vbc,
        a.proccte.cte.infcte.imp.icms.cst80.vbc,
        a.proccte.cte.infcte.imp.icms.cst90.vbc,
        0
    ) AS DECIMAL(15,2)) AS bc_icms,
    
    -- CST
    COALESCE(
        a.proccte.cte.infcte.imp.icms.icms00.cst,
        a.proccte.cte.infcte.imp.icms.icms20.cst,
        a.proccte.cte.infcte.imp.icms.icms45.cst,
        a.proccte.cte.infcte.imp.icms.icms60.cst,
        a.proccte.cte.infcte.imp.icms.icms90.cst,
        a.proccte.cte.infcte.imp.icms.icmssn.cst,
        a.proccte.cte.infcte.imp.icms.cst00.cst,
        a.proccte.cte.infcte.imp.icms.cst20.cst,
        a.proccte.cte.infcte.imp.icms.cst45.cst,
        a.proccte.cte.infcte.imp.icms.cst80.cst,
        a.proccte.cte.infcte.imp.icms.cst90.cst
    ) AS cst_icms

FROM cte.cte a
WHERE a.situacao = 1
  AND a.ano_emissao >= {PERIODO_INICIO // 100}
  AND a.ano_emissao <= {PERIODO_FIM // 100}
  AND a.proccte.cte.infcte.ide.toma4.cnpj IS NOT NULL
"""

try:
    spark.sql(query_cte)
    print("‚úì View vw_cte_processada criada")
    
    # Filtrar CTe relevantes
    query_cte_filtrado = f"""
    CREATE OR REPLACE TEMPORARY VIEW vw_cte_filtrado AS
    SELECT 
        *,
        IF(cnpj_tomador IN ('{cnpj_in_clause}'), 1, 0) AS tomador_no_cadastro,
        IF(cnpj_destinatario IN ('{cnpj_in_clause}'), 1, 0) AS destinatario_no_cadastro,
        IF(cnpj_remetente IN ('{cnpj_in_clause}'), 1, 0) AS remetente_no_cadastro
    FROM vw_cte_processada
    WHERE periodo_ref_num >= {PERIODO_INICIO}
      AND periodo_ref_num <= {PERIODO_FIM}
      AND (
          cnpj_tomador IN ('{cnpj_in_clause}')
          OR cnpj_destinatario IN ('{cnpj_in_clause}')
          OR cnpj_remetente IN ('{cnpj_in_clause}')
      )
    """
    
    spark.sql(query_cte_filtrado)
    total_cte = spark.sql("SELECT COUNT(*) as total FROM vw_cte_filtrado").collect()[0]['total']
    print(f"‚úì CTe filtrados: {total_cte:,} registros")
    
    if total_cte > 0:
        # Resumo por per√≠odo
        print("\nResumo CTe por per√≠odo:")
        spark.sql("""
            SELECT 
                periodo_ref,
                COUNT(*) as qtde_cte,
                ROUND(SUM(valor_total_servico), 2) as vl_total_servico,
                ROUND(SUM(valor_icms), 2) as vl_icms_credito
            FROM vw_cte_filtrado
            GROUP BY periodo_ref
            ORDER BY periodo_ref
        """).show(20, truncate=False)
        
        # Resumo por modal
        print("\nResumo por Modal de Transporte:")
        spark.sql("""
            SELECT 
                CASE 
                    WHEN modal = 1 THEN 'Rodovi√°rio'
                    WHEN modal = 2 THEN 'A√©reo'
                    WHEN modal = 3 THEN 'Aquavi√°rio'
                    WHEN modal = 4 THEN 'Ferrovi√°rio'
                    WHEN modal = 5 THEN 'Dutovi√°rio'
                    WHEN modal = 6 THEN 'Multimodal'
                    ELSE 'Outros'
                END as modal_descricao,
                COUNT(*) as qtde,
                ROUND(SUM(valor_icms), 2) as vl_icms
            FROM vw_cte_filtrado
            GROUP BY modal
            ORDER BY qtde DESC
        """).show(truncate=False)
        
        # Principais transportadoras
        print("\nTop 10 Transportadoras:")
        spark.sql("""
            SELECT 
                nome_emitente,
                cnpj_emitente,
                COUNT(*) as qtde_cte,
                ROUND(SUM(valor_total_servico), 2) as vl_total,
                ROUND(SUM(valor_icms), 2) as vl_icms
            FROM vw_cte_filtrado
            GROUP BY nome_emitente, cnpj_emitente
            ORDER BY vl_total DESC
            LIMIT 10
        """).show(truncate=False)
    else:
        print("\n‚ö† Nenhum CTe encontrado para os CNPJs do cadastro no per√≠odo")

except Exception as e:
    print(f"‚úó Erro ao processar CTe: {e}")
    print("\n‚ö† Criando view CTe vazia para continuar processamento...")
    
    # Criar view vazia
    spark.sql("""
        CREATE OR REPLACE TEMPORARY VIEW vw_cte_filtrado AS
        SELECT 
            CAST(NULL AS STRING) AS periodo_ref,
            CAST(NULL AS INT) AS periodo_ref_num,
            CAST(NULL AS STRING) AS cnpj_tomador,
            CAST(0 AS DECIMAL(15,2)) AS valor_icms,
            CAST(0 AS INT) AS tomador_no_cadastro
        WHERE 1=0
    """)
    
    print("‚úì View CTe vazia criada - processamento continuar√° sem dados de CTe")

In [None]:
# ===================================================================
# C√âLULA 10: CARREGAR PGDAS-D (Simples Nacional)
# ===================================================================
print("\n" + "=" * 80)
print("CARREGANDO DADOS PGDAS-D (SIMPLES NACIONAL)")
print("=" * 80)

query_pgdas = f"""
CREATE OR REPLACE TEMPORARY VIEW vw_pgdas_filtrado AS
SELECT 
    REGEXP_REPLACE(TRIM(nu_cnpj), '[^0-9]', '') AS cnpj,
    nu_per_ref AS periodo_ref,
    CONCAT(LPAD(nu_per_ref % 100, 2, '0'), '/', CAST(nu_per_ref / 100 AS INT)) AS periodo_formatado,
    CAST(COALESCE(vl_rec_bruta_estab, 0) AS DECIMAL(15,2)) AS receita_bruta,
    CAST(COALESCE(vl_icms_sc, 0) AS DECIMAL(15,2)) AS icms_declarado_sc
FROM usr_sat_ods.sna_pgdasd_estabelecimento_raw
WHERE nu_per_ref >= {PERIODO_INICIO}
  AND nu_per_ref <= {PERIODO_FIM}
  AND REGEXP_REPLACE(TRIM(nu_cnpj), '[^0-9]', '') IN ('{cnpj_in_clause}')
"""

spark.sql(query_pgdas)
total_pgdas = spark.sql("SELECT COUNT(*) as total FROM vw_pgdas_filtrado").collect()[0]['total']
print(f"‚úì PGDAS-D carregado: {total_pgdas:,} registros")

# Mostrar amostra
print("\nAmostra PGDAS-D:")
spark.sql("""
    SELECT *
    FROM vw_pgdas_filtrado
    ORDER BY periodo_ref DESC
    LIMIT 10
""").show(10, truncate=False)

In [None]:
# ===================================================================
# C√âLULA 11: CONSOLIDAR NOTIFICA√á√ÉO (CORRIGIDA COM TRATAMENTO)
# ===================================================================
print("\n" + "=" * 80)
print("GERANDO NOTIFICA√á√ÉO - APURA√á√ÉO MENSAL DE ICMS")
print("=" * 80)

query_notificacao = """
CREATE OR REPLACE TEMPORARY VIEW vw_notificacao AS
WITH apuracao_saida AS (
    SELECT 
        periodo_ref,
        cnpj_emitente AS cnpj,
        SUM(bc_icms_debito) AS bc_icms_saida,
        SUM(vl_icms_debito) AS vl_icms_saida
    FROM vw_saida_emitente
    GROUP BY periodo_ref, cnpj_emitente
),
apuracao_entrada AS (
    SELECT 
        periodo_ref,
        cnpj_destinatario AS cnpj,
        SUM(bc_icms_credito) AS bc_icms_entrada,
        SUM(vl_icms_credito) AS vl_icms_entrada
    FROM vw_entrada_destinatario
    GROUP BY periodo_ref, cnpj_destinatario
),
apuracao_cte AS (
    SELECT 
        periodo_ref_num AS periodo_ref,
        cnpj_tomador AS cnpj,
        SUM(valor_icms) AS vl_icms_cte
    FROM vw_cte_filtrado
    WHERE tomador_no_cadastro = 1
    GROUP BY periodo_ref_num, cnpj_tomador
)
SELECT 
    COALESCE(s.periodo_ref, e.periodo_ref, c.periodo_ref, p.periodo_ref) AS periodo,
    COALESCE(s.cnpj, e.cnpj, c.cnpj, p.cnpj) AS cnpj,
    CONCAT(
        LPAD(CAST(COALESCE(s.periodo_ref, e.periodo_ref, c.periodo_ref, p.periodo_ref) % 100 AS STRING), 2, '0'),
        '/',
        CAST(COALESCE(s.periodo_ref, e.periodo_ref, c.periodo_ref, p.periodo_ref) / 100 AS STRING)
    ) AS periodo_formatado,
    
    -- D√©bitos
    ROUND(COALESCE(s.bc_icms_saida, 0), 2) AS bc_icms_saida,
    ROUND(COALESCE(s.vl_icms_saida, 0), 2) AS vl_icms_saida,
    
    -- Cr√©ditos
    ROUND(COALESCE(e.bc_icms_entrada, 0), 2) AS bc_icms_entrada,
    ROUND(COALESCE(e.vl_icms_entrada, 0), 2) AS vl_icms_entrada,
    ROUND(COALESCE(c.vl_icms_cte, 0), 2) AS vl_icms_cte,
    
    -- PGDAS-D
    ROUND(COALESCE(p.receita_bruta, 0), 2) AS receita_bruta_declarada,
    ROUND(COALESCE(p.icms_declarado_sc, 0), 2) AS icms_declarado,
    
    -- C√°lculos
    ROUND(COALESCE(s.bc_icms_saida, 0) - COALESCE(p.receita_bruta, 0), 2) AS receita_nao_declarada,
    ROUND((COALESCE(s.bc_icms_saida, 0) - COALESCE(p.receita_bruta, 0)) * 0.17, 2) AS vl_icms_receita_omitida,
    
    -- ICMS DEVIDO
    ROUND(
        COALESCE(s.vl_icms_saida, 0) +
        ((COALESCE(s.bc_icms_saida, 0) - COALESCE(p.receita_bruta, 0)) * 0.17) -
        COALESCE(e.vl_icms_entrada, 0) -
        COALESCE(c.vl_icms_cte, 0) -
        COALESCE(p.icms_declarado_sc, 0),
        2
    ) AS vl_icms_devido,
    
    -- Vencimento (dia 10 do m√™s seguinte)
    DATE_FORMAT(
        ADD_MONTHS(
            CONCAT(
                CAST(COALESCE(s.periodo_ref, e.periodo_ref, c.periodo_ref, p.periodo_ref) / 100 AS INT), '-',
                LPAD(CAST(COALESCE(s.periodo_ref, e.periodo_ref, c.periodo_ref, p.periodo_ref) % 100 AS INT), 2, '0'), '-01'
            ),
            1
        ),
        'yyyy-MM-10'
    ) AS data_vencimento

FROM apuracao_saida s
FULL OUTER JOIN apuracao_entrada e 
    ON s.periodo_ref = e.periodo_ref AND s.cnpj = e.cnpj
FULL OUTER JOIN apuracao_cte c 
    ON COALESCE(s.periodo_ref, e.periodo_ref) = c.periodo_ref 
    AND COALESCE(s.cnpj, e.cnpj) = c.cnpj
FULL OUTER JOIN vw_pgdas_filtrado p 
    ON COALESCE(s.periodo_ref, e.periodo_ref, c.periodo_ref) = p.periodo_ref
    AND COALESCE(s.cnpj, e.cnpj, c.cnpj) = p.cnpj
ORDER BY cnpj, periodo
"""

try:
    spark.sql(query_notificacao)
    print("‚úì View vw_notificacao criada")
    
    # Materializar resultado para evitar reprocessamento
    print("\nMaterializando dados da notifica√ß√£o...")
    df_notificacao = spark.sql("SELECT * FROM vw_notificacao ORDER BY cnpj, periodo")
    df_notificacao.cache()  # Cachear para evitar recalcular
    
    total_periodos = df_notificacao.count()
    print(f"‚úì Total de per√≠odos apurados: {total_periodos}")
    
    if total_periodos > 0:
        print("\nAPURA√á√ÉO MENSAL DE ICMS (primeiros 20 per√≠odos):")
        df_notificacao.show(20, truncate=False)
        
        # Converter para Pandas (s√≥ depois de cachear)
        print("\nConvertendo para Pandas...")
        df_notif_pandas = df_notificacao.toPandas()
        
        # Estat√≠sticas gerais
        print("\n" + "=" * 80)
        print("ESTAT√çSTICAS CONSOLIDADAS")
        print("=" * 80)
        print(f"Total ICMS Devido: R$ {df_notif_pandas['vl_icms_devido'].sum():,.2f}")
        print(f"Total D√©bitos (Sa√≠da): R$ {df_notif_pandas['vl_icms_saida'].sum():,.2f}")
        print(f"Total Cr√©ditos (Entrada): R$ {df_notif_pandas['vl_icms_entrada'].sum():,.2f}")
        print(f"Total Cr√©ditos (CTe): R$ {df_notif_pandas['vl_icms_cte'].sum():,.2f}")
        print(f"Total ICMS Declarado: R$ {df_notif_pandas['icms_declarado'].sum():,.2f}")
        print(f"Receita N√£o Declarada: R$ {df_notif_pandas['receita_nao_declarada'].sum():,.2f}")
        print(f"ICMS sobre Receita Omitida: R$ {df_notif_pandas['vl_icms_receita_omitida'].sum():,.2f}")
        
        # Resumo por CNPJ
        print("\n" + "=" * 80)
        print("RESUMO POR CNPJ")
        print("=" * 80)
        resumo_cnpj = df_notif_pandas.groupby('cnpj').agg({
            'periodo': 'count',
            'vl_icms_saida': 'sum',
            'vl_icms_entrada': 'sum',
            'vl_icms_cte': 'sum',
            'icms_declarado': 'sum',
            'vl_icms_devido': 'sum',
            'receita_nao_declarada': 'sum'
        }).round(2)
        resumo_cnpj.columns = ['Qtde_Per√≠odos', 'Total_D√©bitos', 'Total_Cr√©d_Entrada', 
                                'Total_Cr√©d_CTe', 'Total_Declarado', 'Total_Devido', 'Rec_Omitida']
        print(resumo_cnpj)
        
    else:
        print("\n‚ö† Nenhum per√≠odo com dados para apura√ß√£o")
        df_notif_pandas = pd.DataFrame()
        
except Exception as e:
    print(f"\n‚úó Erro ao gerar notifica√ß√£o: {e}")
    print("\nTentando com per√≠odo mais restrito (√∫ltimos 12 meses)...")
    
    # Recalcular per√≠odo mais recente
    periodo_fim_reduzido = PERIODO_FIM
    periodo_inicio_reduzido = (PERIODO_FIM // 100 - 1) * 100 + (PERIODO_FIM % 100)
    
    print(f"Per√≠odo ajustado: {periodo_inicio_reduzido} a {periodo_fim_reduzido}")
    
    # Voc√™ pode tentar reprocessar com per√≠odo menor
    df_notif_pandas = pd.DataFrame()

In [None]:
# ===================================================================
# C√âLULA 11: CONSOLIDAR NOTIFICA√á√ÉO (CORRIGIDA)
# ===================================================================
print("\n" + "=" * 80)
print("GERANDO NOTIFICA√á√ÉO - APURA√á√ÉO MENSAL DE ICMS")
print("=" * 80)

query_notificacao = """
CREATE OR REPLACE TEMPORARY VIEW vw_notificacao AS
WITH apuracao_saida AS (
    SELECT 
        periodo_ref,
        cnpj_emitente AS cnpj,
        SUM(bc_icms_debito) AS bc_icms_saida,
        SUM(vl_icms_debito) AS vl_icms_saida
    FROM vw_saida_emitente
    GROUP BY periodo_ref, cnpj_emitente
),
apuracao_entrada AS (
    SELECT 
        periodo_ref,
        cnpj_destinatario AS cnpj,
        SUM(bc_icms_credito) AS bc_icms_entrada,
        SUM(vl_icms_credito) AS vl_icms_entrada
    FROM vw_entrada_destinatario
    GROUP BY periodo_ref, cnpj_destinatario
),
apuracao_cte AS (
    SELECT 
        periodo_ref_num AS periodo_ref,
        cnpj_tomador AS cnpj,
        SUM(valor_icms) AS vl_icms_cte
    FROM vw_cte_filtrado
    WHERE tomador_no_cadastro = 1
    GROUP BY periodo_ref_num, cnpj_tomador
)
SELECT 
    COALESCE(s.periodo_ref, e.periodo_ref, c.periodo_ref, p.periodo_ref) AS periodo,
    COALESCE(s.cnpj, e.cnpj, c.cnpj, p.cnpj) AS cnpj,
    CONCAT(
        LPAD(CAST(COALESCE(s.periodo_ref, e.periodo_ref, c.periodo_ref, p.periodo_ref) % 100 AS STRING), 2, '0'),
        '/',
        CAST(COALESCE(s.periodo_ref, e.periodo_ref, c.periodo_ref, p.periodo_ref) / 100 AS STRING)
    ) AS periodo_formatado,
    
    -- D√©bitos
    ROUND(COALESCE(s.bc_icms_saida, 0), 2) AS bc_icms_saida,
    ROUND(COALESCE(s.vl_icms_saida, 0), 2) AS vl_icms_saida,
    
    -- Cr√©ditos
    ROUND(COALESCE(e.bc_icms_entrada, 0), 2) AS bc_icms_entrada,
    ROUND(COALESCE(e.vl_icms_entrada, 0), 2) AS vl_icms_entrada,
    ROUND(COALESCE(c.vl_icms_cte, 0), 2) AS vl_icms_cte,
    
    -- PGDAS-D
    ROUND(COALESCE(p.receita_bruta, 0), 2) AS receita_bruta_declarada,
    ROUND(COALESCE(p.icms_declarado_sc, 0), 2) AS icms_declarado,
    
    -- C√°lculos
    ROUND(COALESCE(s.bc_icms_saida, 0) - COALESCE(p.receita_bruta, 0), 2) AS receita_nao_declarada,
    ROUND((COALESCE(s.bc_icms_saida, 0) - COALESCE(p.receita_bruta, 0)) * 0.17, 2) AS vl_icms_receita_omitida,
    
    -- ICMS DEVIDO
    ROUND(
        COALESCE(s.vl_icms_saida, 0) +
        ((COALESCE(s.bc_icms_saida, 0) - COALESCE(p.receita_bruta, 0)) * 0.17) -
        COALESCE(e.vl_icms_entrada, 0) -
        COALESCE(c.vl_icms_cte, 0) -
        COALESCE(p.icms_declarado_sc, 0),
        2
    ) AS vl_icms_devido,
    
    -- Vencimento (dia 10 do m√™s seguinte)
    DATE_FORMAT(
        ADD_MONTHS(
            CONCAT(
                CAST(COALESCE(s.periodo_ref, e.periodo_ref, c.periodo_ref, p.periodo_ref) / 100 AS INT), '-',
                LPAD(CAST(COALESCE(s.periodo_ref, e.periodo_ref, c.periodo_ref, p.periodo_ref) % 100 AS INT), 2, '0'), '-01'
            ),
            1
        ),
        'yyyy-MM-10'
    ) AS data_vencimento

FROM apuracao_saida s
FULL OUTER JOIN apuracao_entrada e 
    ON s.periodo_ref = e.periodo_ref AND s.cnpj = e.cnpj
FULL OUTER JOIN apuracao_cte c 
    ON COALESCE(s.periodo_ref, e.periodo_ref) = c.periodo_ref 
    AND COALESCE(s.cnpj, e.cnpj) = c.cnpj
FULL OUTER JOIN vw_pgdas_filtrado p 
    ON COALESCE(s.periodo_ref, e.periodo_ref, c.periodo_ref) = p.periodo_ref
    AND COALESCE(s.cnpj, e.cnpj, c.cnpj) = p.cnpj
ORDER BY cnpj, periodo
"""

spark.sql(query_notificacao)
print("‚úì View vw_notificacao criada")

# Exibir resultado
df_notificacao = spark.sql("SELECT * FROM vw_notificacao ORDER BY cnpj, periodo")
total_periodos = df_notificacao.count()
print(f"\nTotal de per√≠odos apurados: {total_periodos}")

if total_periodos > 0:
    print("\nAPURA√á√ÉO MENSAL DE ICMS:")
    df_notificacao.show(50, truncate=False)
    
    # Converter para Pandas para an√°lises
    df_notif_pandas = df_notificacao.toPandas()
    
    # Estat√≠sticas gerais
    print("\n" + "=" * 80)
    print("ESTAT√çSTICAS CONSOLIDADAS")
    print("=" * 80)
    print(f"Total ICMS Devido: R$ {df_notif_pandas['vl_icms_devido'].sum():,.2f}")
    print(f"Total D√©bitos (Sa√≠da): R$ {df_notif_pandas['vl_icms_saida'].sum():,.2f}")
    print(f"Total Cr√©ditos (Entrada): R$ {df_notif_pandas['vl_icms_entrada'].sum():,.2f}")
    print(f"Total Cr√©ditos (CTe): R$ {df_notif_pandas['vl_icms_cte'].sum():,.2f}")
    print(f"Total ICMS Declarado: R$ {df_notif_pandas['icms_declarado'].sum():,.2f}")
    print(f"Receita N√£o Declarada: R$ {df_notif_pandas['receita_nao_declarada'].sum():,.2f}")
    print(f"ICMS sobre Receita Omitida: R$ {df_notif_pandas['vl_icms_receita_omitida'].sum():,.2f}")
    
    # Resumo por CNPJ
    print("\n" + "=" * 80)
    print("RESUMO POR CNPJ")
    print("=" * 80)
    resumo_cnpj = df_notif_pandas.groupby('cnpj').agg({
        'periodo': 'count',
        'vl_icms_saida': 'sum',
        'vl_icms_entrada': 'sum',
        'vl_icms_cte': 'sum',
        'icms_declarado': 'sum',
        'vl_icms_devido': 'sum',
        'receita_nao_declarada': 'sum'
    }).round(2)
    resumo_cnpj.columns = ['Qtde_Per√≠odos', 'Total_D√©bitos', 'Total_Cr√©d_Entrada', 
                            'Total_Cr√©d_CTe', 'Total_Declarado', 'Total_Devido', 'Rec_Omitida']
    print(resumo_cnpj)
    
else:
    print("\n‚ö† Nenhum per√≠odo com dados para apura√ß√£o")
    df_notif_pandas = pd.DataFrame()

In [None]:
# ===================================================================
# C√âLULA 12: VISUALIZA√á√ïES E AN√ÅLISES
# ===================================================================
print("\n" + "=" * 80)
print("VISUALIZA√á√ïES E AN√ÅLISES")
print("=" * 80)

import matplotlib.pyplot as plt
import seaborn as sns

# Preparar dados
df_plot = df_notif_pandas.copy()
df_plot['periodo'] = df_plot['periodo'].astype(str)

# Criar figura com m√∫ltiplos gr√°ficos
fig, axes = plt.subplots(2, 2, figsize=(18, 12))
fig.suptitle('AN√ÅLISE FISCAL - ICMS', fontsize=16, fontweight='bold')

# Gr√°fico 1: Evolu√ß√£o do ICMS Devido
ax1 = axes[0, 0]
for cnpj in df_plot['cnpj'].unique():
    data = df_plot[df_plot['cnpj'] == cnpj]
    ax1.plot(data['periodo'], data['vl_icms_devido'], marker='o', label=f'CNPJ {cnpj}')
ax1.set_title('Evolu√ß√£o do ICMS Devido', fontweight='bold')
ax1.set_xlabel('Per√≠odo')
ax1.set_ylabel('Valor (R$)')
ax1.legend()
ax1.grid(True, alpha=0.3)
ax1.tick_params(axis='x', rotation=45)

# Gr√°fico 2: D√©bitos vs Cr√©ditos
ax2 = axes[0, 1]
periodos = df_plot['periodo'].unique()[:12]  # √öltimos 12 meses
data_resumo = df_plot[df_plot['periodo'].isin(periodos)].groupby('periodo').agg({
    'vl_icms_saida': 'sum',
    'vl_icms_entrada': 'sum',
    'vl_icms_cte': 'sum'
}).reset_index()

x = range(len(data_resumo))
width = 0.25
ax2.bar([i - width for i in x], data_resumo['vl_icms_saida'], width, label='D√©bito (Sa√≠da)', color='red', alpha=0.7)
ax2.bar(x, data_resumo['vl_icms_entrada'], width, label='Cr√©dito (Entrada)', color='green', alpha=0.7)
ax2.bar([i + width for i in x], data_resumo['vl_icms_cte'], width, label='Cr√©dito (CTe)', color='blue', alpha=0.7)
ax2.set_title('D√©bitos vs Cr√©ditos - √öltimos 12 Meses', fontweight='bold')
ax2.set_xlabel('Per√≠odo')
ax2.set_ylabel('Valor (R$)')
ax2.set_xticks(x)
ax2.set_xticklabels(data_resumo['periodo'], rotation=45)
ax2.legend()
ax2.grid(True, alpha=0.3)

# Gr√°fico 3: Receita Declarada vs Receita Apurada
ax3 = axes[1, 0]
data_receita = df_plot.groupby('periodo').agg({
    'bc_icms_saida': 'sum',
    'receita_bruta_declarada': 'sum'
}).reset_index()
ax3.plot(data_receita['periodo'], data_receita['bc_icms_saida'], marker='o', label='BC ICMS (Apurada)', linewidth=2)
ax3.plot(data_receita['periodo'], data_receita['receita_bruta_declarada'], marker='s', label='Receita Declarada', linewidth=2)
ax3.fill_between(range(len(data_receita)), data_receita['bc_icms_saida'], data_receita['receita_bruta_declarada'], 
                  where=(data_receita['bc_icms_saida'] > data_receita['receita_bruta_declarada']), 
                  alpha=0.3, color='red', label='Diferen√ßa')
ax3.set_title('Receita Declarada vs Apurada', fontweight='bold')
ax3.set_xlabel('Per√≠odo')
ax3.set_ylabel('Valor (R$)')
ax3.legend()
ax3.grid(True, alpha=0.3)
ax3.tick_params(axis='x', rotation=45)

# Gr√°fico 4: Composi√ß√£o do ICMS Devido
ax4 = axes[1, 1]
totais = df_plot.sum()
componentes = ['D√©bito\n(Sa√≠da)', 'Receita\nOmitida', 'Declarado', 'Cr√©dito\n(Entrada)', 'Cr√©dito\n(CTe)']
valores = [
    totais['vl_icms_saida'],
    totais['vl_icms_receita_omitida'],
    -totais['icms_declarado'],
    -totais['vl_icms_entrada'],
    -totais['vl_icms_cte']
]
cores = ['red', 'orange', 'blue', 'green', 'cyan']
ax4.bar(componentes, valores, color=cores, alpha=0.7)
ax4.axhline(y=0, color='black', linestyle='-', linewidth=0.5)
ax4.set_title('Composi√ß√£o do ICMS Devido (Total)', fontweight='bold')
ax4.set_ylabel('Valor (R$)')
ax4.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

# Estat√≠sticas resumidas
print("\n" + "=" * 80)
print("ESTAT√çSTICAS CONSOLIDADAS")
print("=" * 80)
print(f"Total ICMS Devido: R$ {df_plot['vl_icms_devido'].sum():,.2f}")
print(f"Total D√©bitos: R$ {df_plot['vl_icms_saida'].sum():,.2f}")
print(f"Total Cr√©ditos (Entrada): R$ {df_plot['vl_icms_entrada'].sum():,.2f}")
print(f"Total Cr√©ditos (CTe): R$ {df_plot['vl_icms_cte'].sum():,.2f}")
print(f"Total ICMS Declarado: R$ {df_plot['icms_declarado'].sum():,.2f}")
print(f"Receita N√£o Declarada: R$ {df_plot['receita_nao_declarada'].sum():,.2f}")

In [None]:
# ===================================================================
# C√âLULA 13: EXPORTAR RESULTADOS PARA REDE LOCAL EM EXCEL
# ===================================================================
print("\n" + "=" * 80)
print("EXPORTANDO RESULTADOS PARA REDE LOCAL")
print("=" * 80)

import getpass
import smbclient
import os
import logging
from datetime import datetime

# Configurar logging
logging.getLogger('smbprotocol').setLevel(logging.WARNING)

# ==============================================================================
# CONFIGURA√á√ÉO DA CONEX√ÉO SMB
# ==============================================================================
server = "sef.sc.gov.br"
user = "tsevero"

try:
    pwd = getpass.getpass(f"Digite a senha de rede para {user}@{server}: ")
    smbclient.register_session(server, username=user, password=pwd)
    print(f"‚úì Sess√£o SMB registrada com sucesso para {user}!")
except Exception as e:
    print(f"‚úó Falha ao registrar sess√£o SMB: {e}")
    raise

# ==============================================================================
# DEFINIR DIRET√ìRIO DE DESTINO
# ==============================================================================
# Criar pasta com timestamp para organizar
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
dest_dir = r"\\sef.sc.gov.br\DFS\GERFE08\Backup Severo\JUPYTER\Analise_Fiscal_NFe_CTe"
dest_dir_timestamped = os.path.join(dest_dir, timestamp).replace(os.sep, '\\')

print(f"\nCriando pasta de destino: {dest_dir_timestamped}")
try:
    if not smbclient.path.isdir(dest_dir):
        smbclient.makedirs(dest_dir)
    smbclient.makedirs(dest_dir_timestamped)
    print("‚úì Pasta criada com sucesso")
except Exception as e:
    print(f"‚úó Erro ao criar pasta: {e}")
    raise

# ==============================================================================
# FUN√á√ÉO PARA EXPORTAR DATAFRAME PARA EXCEL NA REDE
# ==============================================================================
def exportar_para_excel_rede(df_spark, nome_arquivo, caminho_destino):
    """
    Exporta um DataFrame Spark para Excel na rede usando SMB
    """
    try:
        print(f"  Processando: {nome_arquivo}...")
        
        # Converter para Pandas
        df_pandas = df_spark.toPandas()
        
        if len(df_pandas) == 0:
            print(f"  ‚ö† DataFrame vazio, pulando: {nome_arquivo}")
            return False
        
        # Caminho completo do arquivo
        caminho_completo = os.path.join(caminho_destino, nome_arquivo).replace(os.sep, '\\')
        
        # Criar arquivo tempor√°rio local
        temp_file = f"/tmp/{nome_arquivo}"
        
        # Salvar como Excel localmente
        with pd.ExcelWriter(temp_file, engine='openpyxl') as writer:
            df_pandas.to_excel(writer, index=False, sheet_name='Dados')
        
        # Copiar para a rede
        with open(temp_file, 'rb') as f_local:
            with smbclient.open_file(caminho_completo, mode='wb') as f_remoto:
                f_remoto.write(f_local.read())
        
        # Limpar arquivo tempor√°rio
        os.remove(temp_file)
        
        print(f"  ‚úì {nome_arquivo}: {len(df_pandas):,} registros exportados")
        return True
        
    except Exception as e:
        print(f"  ‚úó Erro ao exportar {nome_arquivo}: {e}")
        return False

# ==============================================================================
# EXPORTAR ARQUIVOS PRINCIPAIS
# ==============================================================================
print("\n" + "=" * 80)
print("INICIANDO EXPORTA√á√ÉO DOS ARQUIVOS")
print("=" * 80)

arquivos_sucesso = []
arquivos_erro = []

# 1. NOTIFICA√á√ÉO (Apura√ß√£o Mensal)
print("\n1. NOTIFICA√á√ÉO - Apura√ß√£o Mensal de ICMS")
if exportar_para_excel_rede(
    spark.sql("SELECT * FROM vw_notificacao ORDER BY cnpj, periodo"),
    "01_Notificacao.xlsx",
    dest_dir_timestamped
):
    arquivos_sucesso.append("01_Notificacao.xlsx")
else:
    arquivos_erro.append("01_Notificacao.xlsx")

# 2. ENTRADA DESTINAT√ÅRIO (Cr√©ditos)
print("\n2. ENTRADA DESTINAT√ÅRIO - Notas de Entrada (Cr√©ditos)")
if exportar_para_excel_rede(
    spark.sql("SELECT * FROM vw_entrada_destinatario ORDER BY periodo_ref, chave_nfe"),
    "02_Entrada_Destinatario.xlsx",
    dest_dir_timestamped
):
    arquivos_sucesso.append("02_Entrada_Destinatario.xlsx")
else:
    arquivos_erro.append("02_Entrada_Destinatario.xlsx")

# 3. SA√çDA EMITENTE (D√©bitos)
print("\n3. SA√çDA EMITENTE - Notas de Sa√≠da (D√©bitos)")
if exportar_para_excel_rede(
    spark.sql("SELECT * FROM vw_saida_emitente ORDER BY periodo_ref, chave_nfe"),
    "03_Saida_Emitente.xlsx",
    dest_dir_timestamped
):
    arquivos_sucesso.append("03_Saida_Emitente.xlsx")
else:
    arquivos_erro.append("03_Saida_Emitente.xlsx")

# 4. CTe (Cr√©dito de Frete)
print("\n4. CTe - Conhecimento de Transporte")
if exportar_para_excel_rede(
    spark.sql("SELECT * FROM vw_cte_filtrado ORDER BY periodo_ref, chave_cte"),
    "04_CTe.xlsx",
    dest_dir_timestamped
):
    arquivos_sucesso.append("04_CTe.xlsx")
else:
    arquivos_erro.append("04_CTe.xlsx")

# 5. PGDAS-D (Simples Nacional)
print("\n5. PGDAS-D - Declara√ß√µes Simples Nacional")
if exportar_para_excel_rede(
    spark.sql("SELECT * FROM vw_pgdas_filtrado ORDER BY cnpj, periodo_ref"),
    "05_PGDAS_D.xlsx",
    dest_dir_timestamped
):
    arquivos_sucesso.append("05_PGDAS_D.xlsx")
else:
    arquivos_erro.append("05_PGDAS_D.xlsx")

# 6. RESUMO POR CNPJ
print("\n6. RESUMO POR CNPJ")
df_resumo_cnpj = spark.sql("""
    SELECT 
        cnpj,
        COUNT(DISTINCT periodo) as qtde_periodos,
        ROUND(SUM(vl_icms_saida), 2) as total_debitos,
        ROUND(SUM(vl_icms_entrada), 2) as total_creditos_entrada,
        ROUND(SUM(vl_icms_cte), 2) as total_creditos_cte,
        ROUND(SUM(icms_declarado), 2) as total_declarado,
        ROUND(SUM(vl_icms_devido), 2) as total_devido,
        ROUND(SUM(receita_nao_declarada), 2) as total_receita_omitida
    FROM vw_notificacao
    GROUP BY cnpj
""")

if exportar_para_excel_rede(df_resumo_cnpj, "06_Resumo_por_CNPJ.xlsx", dest_dir_timestamped):
    arquivos_sucesso.append("06_Resumo_por_CNPJ.xlsx")
else:
    arquivos_erro.append("06_Resumo_por_CNPJ.xlsx")

# 7. TOP CFOPs
print("\n7. TOP CFOPs")
df_top_cfops = spark.sql("""
    SELECT 
        cfop,
        descricaocfop,
        entrada_saida,
        conta,
        COUNT(*) as qtde_itens,
        ROUND(SUM(valor_produto), 2) as valor_total
    FROM vw_nfe_com_icms
    GROUP BY cfop, descricaocfop, entrada_saida, conta
    ORDER BY qtde_itens DESC
    LIMIT 100
""")

if exportar_para_excel_rede(df_top_cfops, "07_Top_CFOPs.xlsx", dest_dir_timestamped):
    arquivos_sucesso.append("07_Top_CFOPs.xlsx")
else:
    arquivos_erro.append("07_Top_CFOPs.xlsx")

# 8. TOP PRODUTOS
print("\n8. TOP PRODUTOS")
df_top_produtos = spark.sql("""
    SELECT 
        descricao_produto,
        ncm,
        COUNT(*) as qtde_vendas,
        ROUND(SUM(valor_produto), 2) as valor_total
    FROM vw_nfe_com_icms
    WHERE entrada_saida = 'Saida' AND emitente_no_cadastro = 1
    GROUP BY descricao_produto, ncm
    ORDER BY valor_total DESC
    LIMIT 100
""")

if exportar_para_excel_rede(df_top_produtos, "08_Top_Produtos.xlsx", dest_dir_timestamped):
    arquivos_sucesso.append("08_Top_Produtos.xlsx")
else:
    arquivos_erro.append("08_Top_Produtos.xlsx")

# ==============================================================================
# RELAT√ìRIO FINAL
# ==============================================================================
print("\n" + "=" * 80)
print("RELAT√ìRIO DE EXPORTA√á√ÉO")
print("=" * 80)
print(f"\nPasta de destino: {dest_dir_timestamped}")
print(f"\nArquivos exportados com sucesso: {len(arquivos_sucesso)}")
for arq in arquivos_sucesso:
    print(f"  ‚úì {arq}")

if arquivos_erro:
    print(f"\nArquivos com erro: {len(arquivos_erro)}")
    for arq in arquivos_erro:
        print(f"  ‚úó {arq}")

print("\n" + "=" * 80)
print("EXPORTA√á√ÉO CONCLU√çDA!")
print("=" * 80)