<a href="https://colab.research.google.com/github/monica-sabag/Projeto-Final-Artefatos/blob/main/Agente_Extra%C3%A7%C3%A3o_Dados_Fiscais.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ==============================================================================
# 1. INSTALAÇÃO E CONFIGURAÇÃO
# ==============================================================================

# Instalação das bibliotecas
!pip install langchain pydantic google-genai langchain-google-genai --quiet
print("--- 1. INSTALAÇÃO CONCLUÍDA ---")

# Importações necessárias
import os
import zipfile
import io
from google.colab import userdata, files
from pydantic import BaseModel, Field
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

# Configuração da Chave de API
gemini_api_key_value = None
if 'GEMINI_API_KEY' not in os.environ:
    try:
        # Tenta carregar a chave dos Secrets do Colab
        gemini_api_key_value = userdata.get('GEMINI_API_KEY')
        if gemini_api_key_value:
            os.environ['GEMINI_API_KEY'] = gemini_api_key_value
    except Exception:
        pass

if os.environ.get('GEMINI_API_KEY'):
    print("--- 2. CHAVE DE API CARREGADA ---")
    gemini_api_key_value = os.environ.get('GEMINI_API_KEY')
else:
    print("ERRO: A chave 'GEMINI_API_KEY' não foi encontrada. Configure-a nos Secrets do Colab.")

# ------------------------------------------------------------------------------

# ==============================================================================
# 2. DEFINIÇÃO DO MODELO PYDANTIC E PROMPT
# ==============================================================================

class ItemFiscal(BaseModel):
    """Estrutura para um item individual da nota fiscal."""
    descricao: str = Field(description="Descrição completa do produto ou serviço.")
    quantidade: float = Field(description="Quantidade numérica vendida ou comprada.")
    valor_unitario: float = Field(description="Valor unitário do item (sem impostos).")
    cfop: str = Field(description="Código CFOP do item. Deve ser um código de 4 dígitos (ex: 5102).")

class NotaFiscalEstruturada(BaseModel):
    """Modelo de dados fiscais completo extraído do documento."""
    numero_nota: str = Field(description="Número de identificação da Nota Fiscal.")
    data_emissao: str = Field(description="Data de emissão no formato DD/MM/AAAA.")
    cnpj_emitente: str = Field(description="CNPJ da empresa emitente (fornecedor).")
    valor_total_nota: float = Field(description="Valor total da nota, incluindo impostos e frete.")
    itens: list[ItemFiscal] = Field(description="Lista de todos os itens detalhados da nota fiscal.")

extração_parser = PydanticOutputParser(pydantic_object=NotaFiscalEstruturada)

extração_prompt_template = PromptTemplate(
    template="""Você é um agente especializado em extração de dados fiscais.
    Sua tarefa é analisar o TEXTO DO DOCUMENTO (XML ou Texto simples) abaixo e extrair **somente** as informações fiscais solicitadas no formato JSON.
    Não adicione texto explicativo antes ou depois, apenas o JSON puro.

    INSTRUÇÕES DE FORMATO:
    {format_instructions}

    TEXTO DO DOCUMENTO:
    ---
    {texto_nota_fiscal}
    ---
    """,
    input_variables=["texto_nota_fiscal"],
    partial_variables={"format_instructions": extração_parser.get_format_instructions()},
)
print("--- 3. MODELOS PYDANTIC E PROMPT DEFINIDOS ---")

# ------------------------------------------------------------------------------

# ==============================================================================
# 4. FUNÇÕES DE PRÉ-PROCESSAMENTO E EXECUÇÃO
# ==============================================================================

# 1. Inicializar o Modelo Gemini e a Cadeia (CORREÇÃO DE AUTENTICAÇÃO)
llm = None
extraction_chain = None

if gemini_api_key_value:
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.5-flash",
        temperature=0.0,
        api_key=gemini_api_key_value  # Passando a chave explicitamente para evitar RefreshError
    )
    extraction_chain = LLMChain(prompt=extração_prompt_template, llm=llm)
    print("--- 4.1. LLM INICIALIZADO COM SUCESSO ---")
else:
    print("ERRO: O modelo LLM NÃO PODE SER INICIALIZADO. Chave de API ausente ou inválida.")


def extrair_dados_documento(conteudo_documento: str, nome_documento: str) -> dict:
    """Função que executa o agente de extração em um único documento."""
    if not extraction_chain:
        print("   ❌ ERRO: Chain de extração não inicializada devido a falha na API Key.")
        return {"documento": nome_documento, "status": "FALHA_CONEXÃO"}

    print(f"\n-> Processando: {nome_documento}...")
    try:
        # 3.1 Chama o modelo e recebe o JSON em formato string
        json_string_extraido = extraction_chain.run(texto_nota_fiscal=conteudo_documento)

        # 3.2 O Pydantic Parser converte a string JSON validada em um objeto Python
        nota_fiscal_objeto = extração_parser.parse(json_string_extraido)

        print(f"   ✅ Extração de NF Nº {nota_fiscal_objeto.numero_nota} OK.")

        # CORREÇÃO: INCLUINDO O NOME DO ARQUIVO NO RETORNO DE SUCESSO
        resultado = nota_fiscal_objeto.model_dump()
        resultado['documento'] = nome_documento # Garante que o nome do arquivo aparece no relatório
        return resultado

    except Exception as e:
        # Captura falhas na extração ou erros de conexão
        print(f"   ❌ ERRO na extração de {nome_documento}. Motivo: {str(e)[:100]}...")
        return {"documento": nome_documento, "status": "FALHA_EXTRAÇÃO", "erro": str(e)}

def processar_arquivo_uploaded(uploaded_files: dict) -> list[dict]:
    """Lida com os arquivos carregados (XML, TXT ou ZIP)."""

    documentos_processados = []

    for nome_arquivo, conteudo_binario in uploaded_files.items():

        if nome_arquivo.lower().endswith('.zip'):
            print(f"--- Descompactando {nome_arquivo} ---")

            try:
                # Usa io.BytesIO para ler o conteúdo binário do ZIP da memória
                with zipfile.ZipFile(io.BytesIO(conteudo_binario), 'r') as zf:
                    for nome_membro in zf.namelist():
                        if nome_membro.lower().endswith(('.xml', '.txt')):
                            conteudo_membro = zf.read(nome_membro).decode('utf-8', errors='ignore')
                            resultado = extrair_dados_documento(conteudo_membro, nome_membro)
                            documentos_processados.append(resultado)
                        else:
                            print(f"   Aviso: Ignorando arquivo não XML/TXT: {nome_membro}")
            except Exception as e:
                print(f"   ❌ ERRO ao abrir ou ler o arquivo ZIP {nome_arquivo}: {e}")

        elif nome_arquivo.lower().endswith(('.xml', '.txt')):
            # Processa XML ou TXT simples
            try:
                conteudo_arquivo = conteudo_binario.decode('utf-8', errors='ignore')
                resultado = extrair_dados_documento(conteudo_arquivo, nome_arquivo)
                documentos_processados.append(resultado)
            except Exception as e:
                print(f"   ❌ ERRO ao decodificar o arquivo {nome_arquivo}: {e}")

        else:
            print(f"Aviso: Formato de arquivo não suportado (apenas .zip, .xml, .txt): {nome_arquivo}")

    return documentos_processados

# ------------------------------------------------------------------------------

# ==============================================================================
# 5. EXECUÇÃO PRINCIPAL
# ==============================================================================

print("\n--- 5. INICIANDO EXTRAÇÃO DE DADOS ---")

if llm:
    print("⚠️ Por favor, clique no botão para carregar seu(s) arquivo(s) fiscal(is) (.zip, .xml ou .txt).")

    # Executa o widget de upload do Colab
    uploaded = files.upload()

    if uploaded:
        resultados = processar_arquivo_uploaded(uploaded)

        print("\n=======================================================")
        print("✅ RELATÓRIO FINAL DO PROCESSAMENTO ✅")
        print("=======================================================")

        if resultados:
            for i, resultado in enumerate(resultados):
                print(f"--- Documento {i+1} ---")
                if resultado.get("status") not in ["FALHA_EXTRAÇÃO", "FALHA_CONEXÃO"]:
                    # Agora 'documento' virá do dicionário de sucesso
                    print(f"  Arquivo Fonte: {resultado.get('documento', 'N/A')}")
                    print(f"  NF/Nº: {resultado.get('numero_nota', 'N/A')}")
                    print(f"  Data de Emissão: {resultado.get('data_emissao', 'N/A')}")
                    print(f"  CNPJ Emitente: {resultado.get('cnpj_emitente', 'N/A')}")
                    print(f"  Valor Total: R$ {resultado.get('valor_total_nota', 0.0):.2f}")
                    print(f"  Itens Extraídos: {len(resultado.get('itens', []))}")
                else:
                    print(f"  Arquivo Fonte: {resultado.get('documento')}")
                    print(f"  STATUS: {resultado.get('status')} - Erro: {resultado.get('erro', 'Desconhecido')[:80]}...")
        else:
            print("Nenhum documento processado com sucesso.")

        print("=======================================================")

else:
    print("❌ Processamento interrompido devido a erro na Chave de API.")