In [1]:
import os
import json
import mimetypes
import unicodedata
from langchain.text_splitter import TokenTextSplitter
from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential
import os
import json
import logging
import re
import hashlib
from pathlib import Path
import pandas as pd
from openai import AzureOpenAI
from dotenv import load_dotenv
import logging
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    SearchIndex, SimpleField, SearchField, ComplexField,
    SearchFieldDataType, SemanticConfiguration, SemanticField,
    SemanticPrioritizedFields, SemanticSearch, VectorSearch,
    VectorSearchProfile, HnswAlgorithmConfiguration
)

# Load variables from a local .env file (if present) into os.environ; later cells read
# endpoints and API keys with os.getenv.
load_dotenv()

True

In [2]:
# Azure Form Recognizer (Document Intelligence) connection settings, read from the environment.
AZURE_FORM_RECOGNIZER_ENDPOINT = os.getenv("AZURE_FORM_RECOGNIZER_ENDPOINT")
AZURE_FORM_RECOGNIZER_API_KEY = os.getenv("AZURE_FORM_RECOGNIZER_API_KEY")
# Base data directory; its value matches the input directory passed to process_documents.
INTERMEDIATE_STEPS_PATH = "/app/index_data/data"

In [3]:

# Step 3: process documents with Azure Form Recognizer.

# Extensions sent to the Form Recognizer "prebuilt-read" model.
# NOTE(review): Office formats such as .pptx require a Form Recognizer API version
# that supports them; otherwise the service error is logged and the file is skipped.
_FORM_RECOGNIZER_EXTENSIONS = {".pdf", ".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".pptx"}
# Extensions read directly from disk as UTF-8 text.
_PLAIN_TEXT_EXTENSIONS = {".txt"}


def _extract_text(form_recognizer_client, file_path, extension):
    """Return the raw text of one file: UTF-8 contents for plain text, otherwise Form Recognizer lines joined by spaces."""
    if extension in _PLAIN_TEXT_EXTENSIONS:
        with open(file_path, "r", encoding="utf-8") as file:
            return file.read()
    with open(file_path, "rb") as file:
        poller = form_recognizer_client.begin_analyze_document("prebuilt-read", document=file)
        result = poller.result()
    return " ".join(line.content for page in result.pages for line in page.lines)


def process_documents(input_path, output_path):
    """Extract text from supported documents, chunk it and save one JSON file per document.

    Every file in ``input_path`` whose extension (case-insensitive) is supported
    is handled as follows:

    * Images, PDF and .pptx files are analysed with the Form Recognizer
      "prebuilt-read" model.
    * .txt files are read as UTF-8.
    * The text is NFC-normalized and split with ``TokenTextSplitter``
      (chunk_size=1000, chunk_overlap=100).
    * The result is written to ``<output_path>/<file stem>.json`` as a list of
      ``{"content": chunk, "metadata": {...}}`` records.

    Args:
        input_path: Directory containing the source documents.
        output_path: Directory for the chunked JSON files; created if missing.

    Any error on an individual file is logged and that file is skipped; the
    remaining files are still processed.

    NOTE(review): files sharing a stem (e.g. ``a.pdf`` and ``a.txt``) write the
    same ``a.json``, so the later one overwrites the earlier one.
    """
    os.makedirs(output_path, exist_ok=True)

    supported_extensions = _FORM_RECOGNIZER_EXTENSIONS | _PLAIN_TEXT_EXTENSIONS
    # Sorted so runs process (and log) files in a deterministic order.
    document_files = sorted(
        f for f in os.listdir(input_path)
        if os.path.splitext(f)[1].lower() in supported_extensions
    )
    form_recognizer_client = DocumentAnalysisClient(
        endpoint=AZURE_FORM_RECOGNIZER_ENDPOINT,
        credential=AzureKeyCredential(AZURE_FORM_RECOGNIZER_API_KEY),
    )

    # TokenTextSplitter configuration
    text_splitter = TokenTextSplitter(chunk_size=1000, chunk_overlap=100)

    for document_file in document_files:
        file_path = os.path.join(input_path, document_file)
        extension = os.path.splitext(document_file)[1].lower()
        # The MIME type is only recorded as metadata; dispatch is by extension, so every
        # listed file gets its own text (never a leftover value from a previous file).
        mime_type, _ = mimetypes.guess_type(file_path)
        try:
            text_content = _extract_text(form_recognizer_client, file_path, extension)

            # Normalize so visually identical characters share one encoding.
            text_content_normalized = unicodedata.normalize("NFC", text_content)
            chunks = text_splitter.split_text(text_content_normalized)

            chunked_data = [
                {
                    "content": chunk,
                    "metadata": {
                        # Whitespace-separated word count, not tokenizer tokens.
                        "count_tokens": len(chunk.split()),
                        "count_characters": len(chunk),
                        "source": document_file,
                        "type_source": mime_type,
                    },
                }
                for chunk in chunks
            ]

            # Save all chunks of this document in a single JSON file.
            output_file = os.path.join(output_path, f"{os.path.splitext(document_file)[0]}.json")
            with open(output_file, "w", encoding="utf-8") as file:
                json.dump(chunked_data, file, ensure_ascii=False, indent=4)

        except Exception as e:
            logging.error(f"Error procesando {document_file}: {e}")

# Chunk every supported document in the raw data folder into one JSON file per document.
process_documents("/app/index_data/data","/app/index_data/data_processed")

In [3]:
def configure_azure_openai():
    """Build an ``AzureOpenAI`` client from environment variables.

    Reads ``AZURE_OPENAI_API_KEY``, ``AZURE_OPENAI_API_VERSION`` and
    ``AZURE_OPENAI_ENDPOINT`` (in that order). A variable that is not set is
    passed to the client as ``None``.
    """
    client_settings = {
        "api_key": os.getenv("AZURE_OPENAI_API_KEY"),
        "api_version": os.getenv("AZURE_OPENAI_API_VERSION"),
        "azure_endpoint": os.getenv("AZURE_OPENAI_ENDPOINT"),
    }
    return AzureOpenAI(**client_settings)

def generate_hash_id(content):
    """Return the SHA-256 hex digest of ``content`` encoded as UTF-8.

    Identical content always maps to the same 64-character ID. Any failure
    (e.g. ``content`` is not a ``str``) is logged with its traceback and re-raised.
    """
    try:
        hasher = hashlib.sha256()
        hasher.update(content.encode('utf-8'))
        return hasher.hexdigest()
    except Exception as e:
        logging.error(f"Error generando hash ID: {e}", exc_info=True)
        raise

def preprocess_content(content):
    """Clean ``content`` to reduce the chance of triggering the content filter.

    Steps, applied in this order:
      1. delete every run that starts with ``http``, ``www`` or ``@`` and extends
         to the next whitespace (URLs, handles, e-mail tails);
      2. delete every character that is neither a word character nor whitespace.
         NOTE: this also removes decimal points, hyphens, apostrophes, etc.
         ("3.5" becomes "35");
      3. keep at most the first 4000 characters.
    """
    without_links = re.sub(r"http\S+|www\S+|@\S+", "", content)
    words_only = re.sub(r"[^\w\s]", "", without_links)
    return words_only[:4000]

def save_data_as_json(data, output_file):
    """Write ``data`` to ``output_file`` as pretty-printed UTF-8 JSON.

    Non-ASCII characters are written unescaped (``ensure_ascii=False``) and the
    output uses a 4-space indent. Errors (e.g. a missing directory or
    non-serializable data) are logged and NOT re-raised, so callers are not
    notified of a failed save.
    """
    try:
        with open(output_file, mode="w", encoding="utf-8") as handle:
            json.dump(data, handle, ensure_ascii=False, indent=4)
    except Exception as exc:
        logging.error(f"Error al guardar el archivo JSON: {exc}")

# List markers a chat model typically puts before each keyword: "1.", "2)", "3 -", "4:", "-", "*", "•".
# The marker must be followed by whitespace (or end the line), so keywords that themselves
# start with digits ("5G", "3D", "2024") are kept intact.
_KEYWORD_LIST_MARKER = re.compile(r"^\s*(?:\d+\s*[.):\-]|[-*•])(?:\s+|$)")


def _parse_keywords(keywords_text):
    """Split a model reply into keywords: one per line, or comma-separated when the reply is a single line."""
    lines = [line for line in keywords_text.splitlines() if line.strip()]
    if len(lines) == 1 and "," in lines[0]:
        lines = lines[0].split(",")
    keywords = []
    for line in lines:
        # Drop one leading list marker and any Markdown bold markers.
        keyword = _KEYWORD_LIST_MARKER.sub("", line, count=1).replace("**", "").strip()
        if keyword:
            keywords.append(keyword)
    return keywords


def process_entry(entry, client, combined_data, json_file):
    """Enrich one chunk with an ID, keywords and an embedding, and append it to ``combined_data``.

    Args:
        entry: dict with at least ``"content"`` (str) and ``"metadata"``. An
            existing non-empty ``"keywords"`` value is kept as-is. The dict is
            mutated in place: ``content`` is replaced by its preprocessed form,
            and ``keywords`` / ``embeddings`` are set.
        client: Azure OpenAI client; ``chat.completions.create`` and
            ``embeddings.create`` are called on it.
        combined_data: list that receives a
            ``{"id", "content", "metadata", "keywords", "embeddings"}`` record.
        json_file: name of the source file, used only in messages.

    Returns:
        list[str]: non-fatal error messages.
        - Keyword failure: ``["error_generating_keywords"]`` is stored and the
          entry is still appended.
        - Embedding failure: ``[]`` is stored and the entry is still appended.
        - ``ValueError`` while hashing the ID: the entry is NOT appended.

    Raises:
        ValueError: if ``content`` or ``metadata`` is missing, or content is blank.
    """
    errors = []
    if "content" not in entry or "metadata" not in entry:
        raise ValueError(f"El archivo {json_file} no tiene los campos necesarios.")

    if not entry["content"].strip():
        error_msg = f"Contenido vacío en {json_file}."
        raise ValueError(error_msg)

    # The cleaned text is what gets hashed, embedded and indexed.
    entry["content"] = preprocess_content(entry["content"])

    try:
        current_id = generate_hash_id(entry["content"])
    except ValueError as ve:
        # e.g. UnicodeEncodeError (a ValueError subclass) for lone surrogates.
        errors.append(f"Error generando hash ID para {json_file}: {ve}")
        return errors

    if "keywords" not in entry or not entry["keywords"]:
        try:
            prompt = f"Extrae las 5 palabras clave más importantes del siguiente texto: {entry['content']}"
            keywords_response = client.chat.completions.create(
                # For AzureOpenAI this is the deployment name.
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "Eres un modelo que genera palabras clave."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.0,
                max_tokens=50,
            )
            if keywords_response.choices and keywords_response.choices[0].message.content:
                keywords_text = keywords_response.choices[0].message.content.strip()
                entry["keywords"] = _parse_keywords(keywords_text)
            else:
                # No usable text (e.g. the content filter blocked the reply).
                logging.warning(f"Respuesta filtrada o vacía para keywords en {json_file}. Respuesta: {keywords_response}")
                entry["keywords"] = ["content_filtered"]
        except Exception as e:
            errors.append(f"Error generando keywords para {json_file}: {e}")
            entry["keywords"] = ["error_generating_keywords"]

    try:
        # NOTE(review): the vector length must equal the dimension configured on the index's
        # content_vector field (3072 in this notebook's index); confirm the deployment matches.
        response = client.embeddings.create(
            input=entry["content"],
            model=os.getenv("AZURE_OPENAI_EMBEDDING_MODEL_NAME", "text-embedding-ada-002")
        )
        entry["embeddings"] = response.data[0].embedding
    except Exception as e:
        errors.append(f"Error generando embeddings para {json_file}: {e}")
        entry["embeddings"] = []

    combined_data.append({
        "id": current_id,
        "content": entry["content"],
        "metadata": entry["metadata"],
        "keywords": entry["keywords"],
        "embeddings": entry["embeddings"]
    })

    return errors

def generate_embeddings_and_keywords(client, input_paths, output_path, output_file_name="step5_combined_2.json"):
    """Enrich every chunk in the JSON files of ``input_paths`` and save them combined.

    Each ``*.json`` file must hold either one entry (dict) or a list of entries.
    Every entry is passed to ``process_entry`` independently, so one invalid
    entry no longer discards the rest of its file. Non-fatal errors returned by
    ``process_entry`` are logged. All enriched records are written to
    ``<output_path>/<output_file_name>``.

    Args:
        client: Azure OpenAI client forwarded to ``process_entry``.
        input_paths: iterable of directories to scan for ``*.json`` files.
        output_path: directory for the combined output (created if missing).
        output_file_name: name of the combined JSON file; defaults to the name
            the later upload cell reads.
    """
    os.makedirs(output_path, exist_ok=True)
    combined_data = []

    for input_path in input_paths:
        # Sorted so the combined output has a deterministic order.
        json_files = sorted(f for f in os.listdir(input_path) if f.endswith(".json"))
        for json_file in json_files:
            file_path = os.path.join(input_path, json_file)
            try:
                with open(file_path, "r", encoding="utf-8") as file:
                    data = json.load(file)
            except Exception as e:
                logging.error(f"Error procesando {json_file}: {e}")
                continue

            if isinstance(data, dict):
                entries = [data]
            elif isinstance(data, list):
                entries = data
            else:
                logging.warning(f"Formato JSON no soportado en {json_file}: {type(data).__name__}")
                continue

            for entry in entries:
                # Per-entry handling: a malformed entry is reported and skipped on its own.
                try:
                    entry_errors = process_entry(entry, client, combined_data, json_file)
                except Exception as e:
                    logging.error(f"Error procesando {json_file}: {e}")
                    continue
                for error in entry_errors or []:
                    logging.error(error)

    output_json_file = os.path.join(output_path, output_file_name)
    save_data_as_json(combined_data, output_json_file)

In [4]:
# Azure OpenAI client used below for keyword (chat completion) and embedding requests.
client = configure_azure_openai()

In [5]:
# Folders with the per-document chunk JSON files (the output folder of process_documents).
input_paths = [
    "/app/index_data/data_processed",
]
# Destination folder for the combined JSON with keywords and embeddings.
output_path = "/app/index_data/data_processed_embeddings"

print("Iniciando generación de embeddings y palabras clave...")
generate_embeddings_and_keywords(client, input_paths, output_path)

Iniciando generación de embeddings y palabras clave...


In [3]:
# ------------------------------------------------------------------------------
# Create or update an index in Azure AI Search
# ------------------------------------------------------------------------------
def create_or_update_index(index_name, search_endpoint, search_api_key, vector_dimensions=3072):
    """Create or update ``index_name`` with vector (HNSW) and semantic search configured.

    Schema:
    - ``id``: key field.
    - ``content``: searchable, standard Lucene analyzer.
    - ``metadata``: complex field with category, count_characters,
      count_tokens, source and type_source.
    - ``keywords``: searchable string collection.
    - ``content_vector``: single-precision vector searched with the
      ``myHnswProfile`` profile.

    The semantic configuration ``my-semantic-config`` uses ``metadata/source``
    as title, ``keywords`` as keywords and ``content`` as content.

    Args:
        index_name: name of the index to create or update.
        search_endpoint: Azure AI Search service endpoint URL.
        search_api_key: API key with permission to manage indexes.
        vector_dimensions: length of the vectors stored in ``content_vector``.
            It must equal the output size of the embedding model that fills it.
            Defaults to 3072, the value previously hard-coded.

    Returns:
        bool: True on success; False if any error occurred (the error is logged).
    """
    try:
        credential = AzureKeyCredential(search_api_key)
        index_client = SearchIndexClient(endpoint=search_endpoint, credential=credential)

        fields = [
            SimpleField(name="id", type=SearchFieldDataType.String, key=True),
            SearchField(name="content", type=SearchFieldDataType.String, searchable=True, analyzer_name="standard.lucene"),
            ComplexField(name="metadata", fields=[
                SearchField(name="category", type=SearchFieldDataType.String, searchable=True, filterable=True),
                SimpleField(name="count_characters", type=SearchFieldDataType.Int32, filterable=True, sortable=True),
                SimpleField(name="count_tokens", type=SearchFieldDataType.Int32, filterable=True, sortable=True),
                SearchField(name="source", type=SearchFieldDataType.String, filterable=True),
                SearchField(name="type_source", type=SearchFieldDataType.String, filterable=True),
            ]),
            SearchField(name="keywords", type=SearchFieldDataType.Collection(SearchFieldDataType.String), searchable=True),
            SearchField(
                name="content_vector",
                type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
                vector_search_dimensions=vector_dimensions,
                searchable=True,
                vector_search_profile_name="myHnswProfile"
            )
        ]

        semantic_config = SemanticConfiguration(
            name="my-semantic-config",
            prioritized_fields=SemanticPrioritizedFields(
                title_field=SemanticField(field_name="metadata/source"),
                keywords_fields=[SemanticField(field_name="keywords")],
                content_fields=[SemanticField(field_name="content")]
            )
        )

        index = SearchIndex(
            name=index_name,
            fields=fields,
            vector_search=VectorSearch(
                algorithms=[HnswAlgorithmConfiguration(name="myHnsw")],
                profiles=[VectorSearchProfile(name="myHnswProfile", algorithm_configuration_name="myHnsw")]
            ),
            semantic_search=SemanticSearch(configurations=[semantic_config])
        )

        index_client.create_or_update_index(index)
        logging.info(f"Índice '{index_name}' creado o actualizado exitosamente.")
        return True
    except Exception as e:
        logging.error(f"Error al crear/actualizar el índice: {e}")
        return False

In [6]:
# Azure AI Search connection settings, read from the environment.
search_endpoint = os.getenv("AZURE_SEARCH_ENDPOINT")
search_api_key = os.getenv("AZURE_SEARCH_KEY")
index_name = "index-chatbot-in-a-day" # Replace with your index name

# Returns True on success, False on failure (the error is logged).
create_or_update_index(index_name, search_endpoint, search_api_key)

True

In [8]:
# ------------------------------------------------------------------------------
# Fetch the IDs of the documents already stored in Azure AI Search
# ------------------------------------------------------------------------------
def get_existing_chunk_ids(search_endpoint, index_name, search_api_key):
    """Return the ``id`` of every document currently in ``index_name``, as a list.

    Runs a match-all query (``search_text="*"``) that selects only the ``id``
    field and walks the paged results.

    NOTE(review): paging through very large indexes with a match-all query may
    hit the service's skip limit; confirm the index size stays within it.

    Raises:
        ValueError: wrapping any error raised while building the client or
            querying the service. The original error is logged with its traceback.
    """
    try:
        client = SearchClient(
            endpoint=search_endpoint,
            index_name=index_name,
            credential=AzureKeyCredential(search_api_key),
        )
        ids = []
        for document in client.search(search_text="*", select=["id"]):
            ids.append(document["id"])
        return ids
    except Exception as e:
        logging.error(f"Error obteniendo IDs existentes de Azure Search: {e}", exc_info=True)
        raise ValueError(f"Error obteniendo IDs existentes de Azure Search: {e}")

# ------------------------------------------------------------------------------
# Delete obsolete documents from Azure AI Search
# ------------------------------------------------------------------------------
def delete_obsolete_chunks(search_endpoint, index_name, search_api_key, existing_ids, new_ids, batch_size=1000):
    """Delete the documents whose IDs are in ``existing_ids`` but not in ``new_ids``.

    Deletions are sent in batches of at most ``batch_size`` documents, because
    Azure AI Search rejects indexing requests with more than 1000 actions.
    A failure for an individual document or for a whole batch request is
    logged, and the remaining batches are still attempted.

    Args:
        search_endpoint: Azure AI Search service endpoint URL.
        index_name: index to delete from.
        search_api_key: API key for the service.
        existing_ids: IDs currently in the index.
        new_ids: IDs that must be kept.
        batch_size: maximum number of deletions per request (1..1000).

    Raises:
        ValueError: if ``batch_size`` < 1, or wrapping an error raised while
            building the client or computing the IDs to delete.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size debe ser >= 1, se recibió {batch_size}")
    try:
        credential = AzureKeyCredential(search_api_key)
        search_client = SearchClient(endpoint=search_endpoint, index_name=index_name, credential=credential)

        chunks_to_delete = set(existing_ids) - set(new_ids)
        logging.info(f"Chunks a eliminar: {chunks_to_delete}")

        if not chunks_to_delete:
            logging.info("No hay documentos obsoletos para eliminar.")
            return

        # Sorted so the batches (and their log lines) are deterministic.
        ids_to_delete = sorted(chunks_to_delete)
        for start in range(0, len(ids_to_delete), batch_size):
            documents_to_delete = [{"id": doc_id} for doc_id in ids_to_delete[start:start + batch_size]]
            try:
                response = search_client.delete_documents(documents=documents_to_delete)
                for res in response:
                    if res.succeeded:
                        logging.info(f"Documento eliminado exitosamente: {res.key}")
                    else:
                        logging.error(f"Fallo al eliminar el documento {res.key}: {res.error_message}")
            except Exception as inner_e:
                # Best effort: log the failed batch and continue with the next one.
                logging.error(f"Error eliminando documentos: {inner_e}", exc_info=True)
    except Exception as e:
        logging.error(f"Error eliminando chunks obsoletos de Azure Search: {e}", exc_info=True)
        raise ValueError(f"Error eliminando chunks obsoletos de Azure Search: {e}")

# ------------------------------------------------------------------------------
# Upload documents to Azure AI Search
# ------------------------------------------------------------------------------
def _build_search_documents(datos):
    """Map combined records to the index schema (id, content, metadata, keywords, content_vector)."""
    documentos = []
    for item in datos:
        metadata = item.get("metadata", {})
        if not isinstance(metadata, dict):
            logging.warning(f"El campo 'metadata' no es un diccionario para el ID: {item.get('id')}.")
            metadata = {}

        documentos.append({
            "id": str(item.get("id", "")),
            "content": item.get("content", ""),
            "metadata": {
                "category": metadata.get("category", ""),
                "count_characters": metadata.get("count_characters", 0),
                "count_tokens": metadata.get("count_tokens", 0),
                "source": metadata.get("source", ""),
                "type_source": metadata.get("type_source", "")
            },
            "keywords": [str(p) for p in item.get("keywords", [])],
            "content_vector": item.get("embeddings", [])
        })
    return documentos


def upload_to_azure_search(datos, index_name, search_endpoint, search_api_key):
    """Upload ``datos`` to Azure AI Search, then remove documents that are no longer present.

    Steps:
    1. Ensure the index exists via ``create_or_update_index``.
    2. Upload the documents in batches of 50.
    3. Only if every batch succeeded, delete the previously indexed documents
       whose IDs are not in ``datos``. A failed or partial upload therefore
       never removes existing content.

    Empty input is reported as an error and nothing is deleted; otherwise an
    empty list would mark every indexed document as obsolete.

    Args:
        datos: list of records with ``id``, ``content``, ``metadata``,
            ``keywords`` and ``embeddings``.
        index_name: target index.
        search_endpoint: Azure AI Search service endpoint URL.
        search_api_key: API key for the service.

    Returns:
        list[str]: error messages; empty when everything succeeded.

    Raises:
        ValueError: propagated from ``get_existing_chunk_ids`` /
            ``delete_obsolete_chunks``.
    """
    errores = []

    if not create_or_update_index(index_name, search_endpoint, search_api_key):
        logging.error("No se pudo crear o actualizar el índice.")
        return ["No se pudo crear el índice."]

    credential = AzureKeyCredential(search_api_key)
    search_client = SearchClient(endpoint=search_endpoint, index_name=index_name, credential=credential)

    documentos = _build_search_documents(datos)
    logging.info(f"Documentos a cargar: {len(documentos)}")

    if not documentos:
        errores.append("No hay documentos para cargar; no se eliminan documentos existentes.")
        logging.error(errores[-1])
        return errores

    existing_ids = set(get_existing_chunk_ids(search_endpoint, index_name, search_api_key))
    new_ids = {doc["id"] for doc in documentos}

    # Upload documents in batches of 50.
    batch_size = 50
    for i in range(0, len(documentos), batch_size):
        batch = documentos[i:i + batch_size]
        end = i + len(batch)
        try:
            response = search_client.upload_documents(documents=batch)
        except Exception as e:
            logging.error(f"Error en lote {i}-{end}: {e}")
            errores.append(f"Error en lote {i}-{end}: {e}")
            break

        failed = [res for res in response if not res.succeeded]
        if failed:
            detalle = "; ".join(f"{res.key}: {res.error_message}" for res in failed)
            logging.error(f"Lote {i}-{end}: Falló la carga de algunos documentos. {detalle}")
            errores.append(f"Lote {i}-{end}: Falló la carga de {len(failed)} documentos: {detalle}")
        else:
            logging.info(f"Lote {i}-{end}: {len(batch)} documentos subidos exitosamente.")

    if errores:
        logging.error("Se omite la eliminación de documentos obsoletos porque la carga tuvo errores.")
        return errores

    delete_obsolete_chunks(search_endpoint, index_name, search_api_key, existing_ids, new_ids)
    return errores


In [9]:
# Load the combined records (content, metadata, keywords, embeddings) written as UTF-8 JSON
# by the embeddings step. The `with` block closes the file, and the explicit encoding avoids
# locale-dependent decoding of non-ASCII text.
with open("/app/index_data/data_processed_embeddings/step5_combined_2.json", encoding="utf-8") as combined_json:
    datos = json.load(combined_json)

errores = upload_to_azure_search(datos, index_name, search_endpoint, search_api_key)
if errores:
    logging.error(f"Errores en la carga a Azure Search: {errores}")
else:
    logging.info("Carga a Azure Search completada exitosamente.")