<a href="https://colab.research.google.com/github/lsglearning/agentaz02/blob/main/agentaz.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
azure-functions
azure-ai-documentintelligence>=1.0.0b1
azure-search-documents>=11.4.0
openai>=1.0.0
azure-identity
python-dotenv

---------------

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "YOUR_STORAGE_CONNECTION_STRING", // or UseDevelopmentStorage=true for Azurite
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsFeatureFlags": "EnableWorkerIndexing",

    "DOC_INTELLIGENCE_ENDPOINT": "YOUR_DOC_INTELLIGENCE_ENDPOINT",
    "DOC_INTELLIGENCE_KEY": "YOUR_DOC_INTELLIGENCE_KEY",

    "AZURE_OPENAI_ENDPOINT": "YOUR_AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_KEY": "YOUR_AZURE_OPENAI_KEY",
    "AZURE_OPENAI_EMBEDDING_DEPLOYMENT": "your-embedding-deployment-name",

    "AZURE_SEARCH_ENDPOINT": "YOUR_AZURE_SEARCH_ENDPOINT",
    "AZURE_SEARCH_KEY": "YOUR_AZURE_SEARCH_ADMIN_KEY",
    "AZURE_SEARCH_INDEX_NAME": "your-legal-index-name"
  }
}

----------------

import logging
import os
import uuid
from typing import List, Dict
import time # Para posible espera simple (aunque no ideal para lag de creación)

import azure.functions as func
from azure.core.exceptions import HttpResponseError # Para capturar errores de SDK
from azure.core.credentials import AzureKeyCredential
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import AnalyzeResult, AnalyzeDocumentRequest
from azure.search.documents import SearchClient
from openai import AzureOpenAI, APIError # Importar APIError para manejo específico

# --- Configuration and clients ---
# Module-level setup, executed once when the module is imported: read the
# required application settings from the environment and build the Document
# Intelligence, Azure OpenAI and Azure AI Search clients.
# If a setting is missing or a client constructor raises, the error is logged
# and `clients_initialized` stays False instead of propagating the exception.
clients_initialized = False
doc_intelligence_client = None
openai_client = None
search_client = None
openai_embedding_deployment = None
search_index_name = None

try:
    # Load configuration; os.environ[...] raises KeyError on the first missing setting.
    doc_intelligence_endpoint = os.environ["DOC_INTELLIGENCE_ENDPOINT"]
    doc_intelligence_key = os.environ["DOC_INTELLIGENCE_KEY"]
    openai_endpoint = os.environ["AZURE_OPENAI_ENDPOINT"]
    openai_key = os.environ["AZURE_OPENAI_KEY"]
    openai_embedding_deployment = os.environ["AZURE_OPENAI_EMBEDDING_DEPLOYMENT"]
    search_endpoint = os.environ["AZURE_SEARCH_ENDPOINT"]
    search_key = os.environ["AZURE_SEARCH_KEY"]
    search_index_name = os.environ["AZURE_SEARCH_INDEX_NAME"]

    # Create the clients (all three authenticate with an API key).
    doc_intelligence_client = DocumentIntelligenceClient(
        endpoint=doc_intelligence_endpoint, credential=AzureKeyCredential(doc_intelligence_key)
    )
    openai_client = AzureOpenAI(
        azure_endpoint=openai_endpoint,
        api_key=openai_key,
        api_version="2024-02-01" # Or whichever API version is appropriate
    )
    search_client = SearchClient(
        endpoint=search_endpoint,
        index_name=search_index_name,
        credential=AzureKeyCredential(search_key)
    )
    logging.info("Azure service clients initialized successfully.")
    # Only set once every setting was read and every client was constructed.
    clients_initialized = True

except KeyError as e:
    # A required application setting is absent from the environment.
    logging.error(f"CRITICAL ERROR: Missing Application Setting: {e}.")
except Exception as e:
    # Any other failure while constructing the clients.
    logging.error(f"CRITICAL ERROR: Failed to initialize Azure service clients: {e}", exc_info=True)

# --- Functions V2 app instance ---
app = func.FunctionApp()

# --- Paragraph-based chunking logic ---
def chunk_by_paragraph(di_result: "AnalyzeResult", filename: str) -> List[Dict]:
    """Split a Document Intelligence result into one chunk per non-empty paragraph.

    Args:
        di_result: Analysis result; only its ``paragraphs`` and ``content``
            attributes are read. Each paragraph is expected to expose a
            ``content`` string (``None`` is treated as empty).
        filename: Name of the source document. Stored verbatim in
            ``source_document`` and used (sanitized) as the key prefix.

    Returns:
        A list of dicts with the keys ``id``, ``content``, ``source_document``
        and ``chunk_id``. ``chunk_id`` is the paragraph's position in
        ``di_result.paragraphs``, so skipped empty paragraphs leave gaps.
        When no paragraphs were detected, the whole stripped ``content`` is
        returned as a single chunk with ``chunk_id`` 0, or an empty list if
        there is no content either.
    """
    import re  # local import so the block does not depend on a new file-level import

    def _make_key(index: int) -> str:
        # Azure AI Search document keys may only contain letters, digits,
        # '_', '-' and '=' (and must not start with '_'). File names usually
        # contain '.' and may contain spaces or accents, so every character
        # outside the safe set is replaced by '-'. Underscores are replaced
        # too, which keeps the key shape the previous implementation produced.
        raw_key = f"{filename}-para-{index}-{uuid.uuid4()}"
        return re.sub(r"[^A-Za-z0-9=-]", "-", raw_key)

    paragraphs = di_result.paragraphs
    if not paragraphs:
        logging.warning(f"No paragraphs found by Document Intelligence for {filename}.")
        # Fall back to treating the whole document text as a single chunk.
        whole_text = (di_result.content or "").strip()
        if not whole_text:
            return []
        logging.warning(f"Treating entire content as one chunk for {filename}.")
        return [{
            "id": _make_key(0),
            "content": whole_text,
            "source_document": filename,
            "chunk_id": 0,
            # Add more metadata here if needed (e.g. page, role).
        }]

    logging.info(f"Found {len(paragraphs)} paragraphs. Creating chunks...")
    chunks = []
    for i, paragraph in enumerate(paragraphs):
        text = (paragraph.content or "").strip()
        if not text:
            # Whitespace-only or missing paragraphs carry nothing to index.
            logging.info(f"Skipping empty paragraph {i} for {filename}.")
            continue
        chunks.append({
            "id": _make_key(i),
            "content": text,
            "source_document": filename,
            "chunk_id": i,
            # 'paragraph.bounding_regions[0].page_number' could be added here if needed.
        })

    logging.info(f"Generated {len(chunks)} chunks based on paragraphs for {filename}.")
    return chunks

# --- Main trigger function ---
# Per-request service limits: Azure OpenAI accepts at most 2048 inputs in one
# embeddings call, and Azure AI Search accepts at most 1000 documents in one
# indexing batch. Work is split at exactly these sizes, so any document that
# fit in a single request before still results in a single request.
_MAX_EMBEDDING_INPUTS_PER_REQUEST = 2048
_MAX_SEARCH_DOCS_PER_BATCH = 1000


def _embed_chunks(chunks: List[Dict]) -> None:
    """Add a ``content_vector`` embedding to every chunk, in place.

    Chunks are sent to the embeddings deployment in batches of at most
    ``_MAX_EMBEDDING_INPUTS_PER_REQUEST`` texts.

    Raises:
        ValueError: if a response does not contain one vector per input.
        Exception: whatever ``openai_client.embeddings.create`` raises.
    """
    for start in range(0, len(chunks), _MAX_EMBEDDING_INPUTS_PER_REQUEST):
        batch = chunks[start:start + _MAX_EMBEDDING_INPUTS_PER_REQUEST]
        # Newlines are replaced by spaces before embedding (the original code
        # notes this as an OpenAI recommendation); the stored content is untouched.
        texts = [chunk["content"].replace("\n", " ") for chunk in batch]
        response = openai_client.embeddings.create(
            model=openai_embedding_deployment,
            input=texts,
        )
        # Pair vectors with inputs via the response's own index field rather
        # than relying on the order in which items happen to be returned.
        vectors = sorted(response.data, key=lambda item: item.index)
        if len(vectors) != len(batch):
            raise ValueError(
                f"Embedding response returned {len(vectors)} vectors for {len(batch)} inputs."
            )
        for chunk, item in zip(batch, vectors):
            chunk["content_vector"] = item.embedding


def _upload_chunks(chunks: List[Dict]) -> list:
    """Upload chunks to the search index in batches; return all per-document results.

    NOTE(review): if a later batch raises, earlier batches have already been
    uploaded. Because chunk ids contain a random UUID, a retry of the whole
    blob would add new documents rather than overwrite them - confirm whether
    deterministic ids are wanted.
    """
    results = []
    for start in range(0, len(chunks), _MAX_SEARCH_DOCS_PER_BATCH):
        batch = chunks[start:start + _MAX_SEARCH_DOCS_PER_BATCH]
        results.extend(search_client.upload_documents(documents=batch))
    return results


@app.blob_trigger(arg_name="myblob", path="documentos-legales/{name}", # Adjust to your container
                  connection="AzureWebJobsStorage") # Adjust if you use another connection
def process_document_and_index(myblob: func.InputStream):
    """Blob-triggered pipeline: Document Intelligence -> paragraph chunks -> embeddings -> search index.

    Steps:
        1. Read the blob and analyze it with the ``prebuilt-layout`` model.
        2. Split the result into chunks with ``chunk_by_paragraph``.
        3. Generate an embedding per chunk (``content_vector``).
        4. Upload the chunks to the Azure AI Search index.

    The function returns without raising when the clients were not
    initialized, when the document has no content or yields no chunks, and
    when embedding generation fails (the error is logged).
    NOTE(review): returning on an embedding failure means the invocation is
    not reported as failed, so the host presumably will not retry the blob -
    confirm this is intended.

    Args:
        myblob: Input stream of the blob that fired the trigger.

    Raises:
        HttpResponseError: re-raised when the Azure AI Search upload fails.
        Exception: any other error from analysis or upload is logged and re-raised.
    """
    if not clients_initialized:
        logging.error("Aborting: Azure service clients not initialized.")
        # An exception could be raised here instead so the trigger retries, if appropriate.
        return

    start_time = time.time()
    logging.info(f"Python V2 trigger processing blob: {myblob.name}, Size: {myblob.length} Bytes")
    file_name = os.path.basename(myblob.name)

    # PDF input is assumed for simplicity; add a file-type filter here if needed.

    try:
        # --- 1. Extract text and paragraphs with Document Intelligence ---
        logging.info(f"Reading blob and calling Document Intelligence for {file_name}...")
        blob_bytes = myblob.read()
        poller = doc_intelligence_client.begin_analyze_document(
            "prebuilt-layout", # The layout model returns paragraphs
            AnalyzeDocumentRequest(bytes_source=blob_bytes)
        )
        di_result: AnalyzeResult = poller.result()
        logging.info("Document Intelligence analysis completed.")

        if not di_result.content or not di_result.content.strip():
            logging.warning(f"Document Intelligence returned no content for {file_name}.")
            return

        # --- 2. Split the text into paragraph chunks ---
        chunks_data = chunk_by_paragraph(di_result, file_name)
        if not chunks_data:
            logging.warning(f"No chunks generated for {file_name}.")
            return

        # --- 3. Generate vectors (embeddings) ---
        logging.info(f"Generating embeddings for {len(chunks_data)} chunks...")
        try:
            _embed_chunks(chunks_data)
            logging.info(f"Embeddings generated for {len(chunks_data)} chunks.")
        except APIError as api_err:
            logging.error(f"Azure OpenAI API Error during embedding: {api_err}", exc_info=True)
            # Consider a retry or a more specific failure here.
            return
        except Exception as emb_error:
            logging.error(f"Generic error during embedding: {emb_error}", exc_info=True)
            return # Nothing can be indexed without embeddings

        # --- 4. Upload documents to Azure AI Search ---
        logging.info(f"Uploading {len(chunks_data)} documents to index '{search_index_name}'...")
        try:
            upload_result = _upload_chunks(chunks_data)
            successful_uploads = sum(1 for r in upload_result if r.succeeded)
            logging.info(f"Upload finished. Success for {successful_uploads} out of {len(chunks_data)} documents.")
            # Report every document the service rejected individually.
            for item_result in upload_result:
                if not item_result.succeeded:
                    logging.error(f"  Failed to index document ID {item_result.key}: {item_result.error_message} (Status: {item_result.status_code})")
            # An exception could be raised here if any individual failure is critical.

        except HttpResponseError as search_err:
            # Errors reported by the Search API itself.
            logging.error(f"Azure AI Search API Error during upload: {search_err.status_code} - {search_err.message}", exc_info=True)
            if search_err.status_code == 404:
                logging.error(f"INDEX NOT FOUND? Verify that the index '{search_index_name}' exists and is ready.")
            # Retries with exponential backoff for transient errors (e.g. 503)
            # could be added here; the index is deliberately not created from this function.
            raise # Re-raise so Azure Functions can handle a retry if one is configured
        except Exception as search_error:
            logging.error(f"Generic error uploading to Azure AI Search: {search_error}", exc_info=True)
            raise

    except Exception as e:
        logging.error(f"Unhandled error processing {file_name}: {e}", exc_info=True)
        # Consider sending the blob to a dead-letter queue or similar.
        # Re-raising lets the trigger retry when the cause is transient.
        raise
    finally:
        # Runs on success, early return and failure alike.
        end_time = time.time()
        logging.info(f"Processing for {file_name} completed in {end_time - start_time:.2f} seconds.")