In [None]:
# %% [markdown]
# ### **PASO 1: Verificar versión de Python**
# Se valida que la versión de Python sea la requerida.
#
# - **Líneas Clave:**
#   - Importación de librerías para verificar la versión (`sys`, `os`).
#   - Configuración de logging para advertencias y mensajes informativos.
#   - Comparación de la versión actual con la requerida.
# - **Oportunidad de Mejora:** Agregar soporte para versiones cercanas si no es crítica la compatibilidad exacta.

# %%
import sys
import os
import logging

# Desactivar advertencias de paralelización en tokenizadores
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Configurar logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Versión requerida
REQUIRED_VERSION = (3, 10, 12)
current_version = sys.version_info

# Validar compatibilidad de versión
if (current_version.major, current_version.minor, current_version.micro) != REQUIRED_VERSION:
    logging.warning(f"""
    **********************************************
    ** Advertencia: Versión de Python no compatible **
    **********************************************
    Este chatbot está optimizado para Python {REQUIRED_VERSION[0]}.{REQUIRED_VERSION[1]}.{REQUIRED_VERSION[2]}.
    La versión actual es Python {current_version.major}.{current_version.minor}.{current_version.micro}.
    Algunas funcionalidades pueden no funcionar correctamente.
    **********************************************
    """)
else:
    logging.info("""
    **********************************************
    ** Versión de Python compatible **
    **********************************************
    Python 3.10.12 detectado correctamente.
    Todas las funcionalidades deberían operar sin problemas.
    **********************************************
    """)


In [None]:
# %% [markdown]
# ### **PASO 2: Instalación de Paquetes Necesarios**
# Se listan las bibliotecas requeridas para el funcionamiento del chatbot.
#
# - **Líneas Clave:**
#   - Uso de `requirements.txt` para instalar dependencias.
#   - Listado de las principales bibliotecas usadas.
# - **Oportunidad de Mejora:** Implementar una verificación automática de instalación.

# %%
%pip install -r requirements.txt

In [None]:
# %% [markdown]
# ### **PASO 3: Importar Librerías y Configurar Logging**
# Importa todas las librerías necesarias y configura un sistema de logs.
#
# - **Líneas Clave:**
#   - Importación de librerías estándar (`os`, `json`, `logging`).
#   - Importación de librerías específicas (`hnswlib`, `sentence_transformers`).
#   - Configuración del sistema de logs.
#   - Carga de variables de entorno desde un archivo `.env`.
# - **Oportunidad de Mejora:** Separar configuraciones sensibles como la clave de API en una función de inicialización.

# %%
import os
import json
import logging
import hnswlib
from sentence_transformers import SentenceTransformer, util
import numpy as np
from dotenv import load_dotenv
from PyPDF2 import PdfReader
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
from llama_index.llms.gemini import Gemini
from llama_index.core.llms import ChatMessage
import time
import hashlib

# Configuración de logs
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)

# Mensaje de confirmación de importación
logging.info("Librerías importadas correctamente.")

# Cargar variables de entorno
load_dotenv()
logging.info("Variables de entorno cargadas desde el archivo .env.")


In [None]:
# %% [markdown]
# ### **PASO 4: Cargar Documentos**
# Carga documentos desde archivos o directorios.
#
# - **Líneas Clave:**
#   - `load_documents`: Carga archivos de tipo `.txt`, `.json`, y `.pdf`.
#   - `extract_content`: Procesa el contenido de los documentos dependiendo de su tipo.
# - **Oportunidad de Mejora:** Mejorar la validación y manejo de excepciones para formatos no soportados.

# %%
def load_documents(source, is_directory=False):
    """
    Carga documentos desde un archivo o directorio.
    """
    if not os.path.exists(source):
        logging.error(f"La fuente '{source}' no existe.")
        raise FileNotFoundError(f"La fuente '{source}' no se encontró.")

    loaded_files = []
    if is_directory:
        logging.info(f"Iniciando carga desde el directorio: {source}.")
        for filename in os.listdir(source):
            filepath = os.path.join(source, filename)
            if os.path.isfile(filepath) and filepath.endswith(('.txt', '.json', '.pdf')):
                content = extract_content(filepath)
                if content:
                    loaded_files.append({"filename": filename, "content": content})
                    logging.info(f"Archivo '{filename}' cargado correctamente.")
    else:
        logging.info(f"Iniciando carga del archivo: {source}.")
        content = extract_content(source)
        if content:
            loaded_files.append({"filename": os.path.basename(source), "content": content})
            logging.info(f"Archivo '{os.path.basename(source)}' cargado correctamente.")

    logging.info(f"{len(loaded_files)} documentos cargados.")
    return loaded_files

def extract_content(filepath):
    """
    Extrae el contenido del archivo según su tipo.
    """
    try:
        if filepath.endswith('.txt'):
            with open(filepath, 'r', encoding='utf-8') as file:
                content = file.read()
            units = content.split("\n-----\n")
            return units
        elif filepath.endswith('.json'):
            with open(filepath, 'r', encoding='utf-8') as file:
                data = json.load(file)
            return data
        elif filepath.endswith('.pdf'):
            reader = PdfReader(filepath)
            return ''.join(page.extract_text() or '' for page in reader.pages)
    except Exception as e:
        logging.error(f"Error al extraer contenido de '{filepath}': {e}")
        return None

# Configuración de ruta y carga de documentos
ruta_fuente = 'data'
documentos = load_documents(ruta_fuente, is_directory=True)
logging.info(f"Se cargaron {len(documentos)} documentos exitosamente.")


In [None]:
# %% [markdown]
# ### **PASO 5: Configurar la Clave API de Gemini**
# - Obtiene la clave API desde las variables de entorno.
# - Crea una instancia del modelo Gemini para uso posterior.
# OPORTUNIDAD DE MEJORA: Manejo más robusto ante una clave inválida o expiración.

# %%
gemini_llm = None

def configure_gemini():
    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        logging.error("La clave API de Gemini no está configurada.")
        raise EnvironmentError("Configura GEMINI_API_KEY en tu archivo .env.")
    gemini = Gemini(api_key=api_key)
    logging.info("Gemini configurado correctamente.")
    return gemini

gemini_llm = configure_gemini()


In [None]:
# %% [markdown]
# ### **PASO 6: Configurar el Modelo de Embeddings**
# - Se utiliza SentenceTransformer para generar embeddings de texto.
# - doc_enfermedad(pregunta): Determina el índice del documento más relevante 
#   según la similitud con el nombre del archivo.
#
# OPORTUNIDAD DE MEJORA:
# - En lugar de basarse en el nombre del archivo, usar embeddings del contenido 
#   para mayor precisión.

# %%
model_name = "all-MiniLM-L6-v2"
model = SentenceTransformer(model_name)

def doc_enfermedad(pregunta):
    if not documentos:
        logging.warning("No se encontraron documentos. Índice por defecto: 0.")
        return 0
    preg_embedding = model.encode(pregunta)
    archivos = [doc['filename'] for doc in documentos]
    doc_filenames_embeddings = [model.encode(name) for name in archivos]
    similarities = [util.cos_sim(preg_embedding, emb).item() for emb in doc_filenames_embeddings]
    max_index = similarities.index(max(similarities))
    return max_index


In [None]:
# %% [markdown]
# ### **PASO 7: Crear Clases para Documentos e Índices**
# - Clase Document: Representa un documento con su texto y metadatos.
# - Clase HNSWIndex: Crea un índice para recuperación rápida de información mediante embeddings.
#
# OPORTUNIDAD DE MEJORA:
# - Incluir más metadatos o estructuras de datos más complejas.

# %%
class Document:
    def __init__(self, text, metadata=None):
        self.page_content = text
        self.metadata = metadata or {}
    
    def __str__(self):
        return (
            f"Título: {self.metadata.get('Title', 'N/A')}\n"
            f"Resumen: {self.metadata.get('Summary', 'N/A')}\n"
            f"Tipo de Estudio: {self.metadata.get('StudyType', 'N/A')}\n"
            f"Paises donde se desarrolla el estudio: {self.metadata.get('Countries', 'N/A')}\n"
            f"Fase en que se encuentra el estudio: {self.metadata.get('Phases', 'N/A')}\n"
            f"Identificación en ClinicaTrial: {self.metadata.get('IDestudio', 'N/A')}.\n\n"
        )

class HNSWIndex:
    def __init__(self, embeddings, metadata=None, space='cosine', ef_construction=200, M=16):
        self.dimension = embeddings.shape[1]
        self.index = hnswlib.Index(space=space, dim=self.dimension)
        self.index.init_index(max_elements=embeddings.shape[0], ef_construction=ef_construction, M=M)
        self.index.add_items(embeddings, np.arange(embeddings.shape[0]))
        self.index.set_ef(50) 
        self.metadata = metadata or []
    
    def similarity_search(self, query_vector, k=5):
        labels, distances = self.index.knn_query(query_vector, k=k)
        return [(self.metadata[i], distances[0][j]) for j, i in enumerate(labels[0])]


In [None]:
# %% [markdown]
# ### **PASO 8: Procesar Documentos y Crear Índices**
# - desdobla_doc(data2): Crea objetos Document a partir del contenido.
# - Para JSON con ensayos clínicos, crea un Document por ensayo.
# - Para TXT/PDF, un Document genérico.
# - Genera embeddings y construye el índice HNSWlib.
#
# OPORTUNIDAD DE MEJORA:
# - Validar mejor la estructura JSON.
# - Realizar preprocesamiento de texto (limpieza) antes de embeddings.

# %%
def desdobla_doc(data2):
    documents = []
    summaries = []
    contenido = data2['content']
    
    if isinstance(contenido, list):
        for entry in contenido:
            if isinstance(entry, dict):
                nctId = entry.get("IDestudio", "")
                briefTitle = entry.get("Title", "")
                summary = entry.get("Summary", "")
                studyType = entry.get("StudyType", "")
                country = entry.get("Countries", "")
                overallStatus = entry.get("OverallStatus", "")
                conditions = entry.get("Conditions", "")
                phases = entry.get("Phases", "")

                Summary = (
                    f"The study titled '{briefTitle}', of type '{studyType}', "
                    f"investigates the condition(s): {conditions}. "
                    f"Brief summary: {summary}. "
                    f"Current status: {overallStatus}, taking place in {country}. "
                    f"The study is classified under: {phases} phase. "
                    f"For more info, search {nctId} on ClinicalTrials."
                )
                metadata = {
                    "Title": briefTitle,
                    "Summary": Summary,
                    "StudyType": studyType,
                    "Countries": country,
                    "Phases": phases,
                    "IDestudio": nctId
                }
                doc = Document(Summary, metadata)
                documents.append(doc)
                summaries.append(Summary)
            else:
                texto = str(entry)
                metadata = {"Summary": texto}
                doc = Document(texto, metadata)
                documents.append(doc)
                summaries.append(texto)
    else:
        texto = str(contenido)
        metadata = {"Summary": texto}
        doc = Document(texto, metadata)
        documents.append(doc)
        summaries.append(texto)

    if documents:
        embeddings = model.encode([doc.page_content for doc in documents], show_progress_bar=False)
        embeddings = np.array(embeddings).astype(np.float32)
        vector_store = HNSWIndex(embeddings, metadata=[doc.metadata for doc in documents])
    else:
        vector_store = None

    return documents, vector_store

trozos_archivos = []
index_archivos = []
for i in range(len(documentos)):
    trozos, index = desdobla_doc(documentos[i])
    trozos_archivos.append(trozos)
    index_archivos.append(index)

logging.info("Índices HNSWlib creados para todos los documentos.")


In [None]:
# %% [markdown]
# ### **PASO 9: Traducir Preguntas y Respuestas**
# - traducir(texto, idioma_destino): Usa Gemini para traducir el texto solicitado.
# - generate_embedding(texto): Genera embeddings para la pregunta en inglés.
#
# OPORTUNIDAD DE MEJORA:
# - Implementar caché de traducciones.
# - Detección de idioma para traducir sólo si es necesario.

# %%
def traducir(texto, idioma_destino):
    start_time = time.time()
    mensajes = [
        ChatMessage(role="system", content="Actúa como un traductor."),
        ChatMessage(role="user", content=f"Por favor, traduce este texto al {idioma_destino}: {texto}")
    ]
    try:
        respuesta = gemini_llm.chat(mensajes)
        elapsed_time = time.time() - start_time
        logging.info(f"Traducción completada en {elapsed_time:.2f} segundos.")
        return respuesta.message.content.strip()
    except Exception as e:
        logging.error(f"Error al traducir: {e}")
        return texto

def generate_embedding(texto):
    try:
        embedding = model.encode([texto])
        logging.info(f"Embedding generado para el texto: {texto}")
        return embedding
    except Exception as e:
        logging.error(f"Error al generar el embedding: {e}")
        # Devuelve embedding vacío como fallback
        return np.zeros((1, 384))

def obtener_contexto(pregunta, index, trozos, top_k=50):
    """
    Recupera los trozos de texto más relevantes para responder la pregunta.
    Traduce la pregunta al inglés antes de buscar en el índice.
    """
    try:
        pregunta_en_ingles = traducir(pregunta, "inglés")
        logging.info(f"Pregunta traducida al inglés: {pregunta_en_ingles}")

        pregunta_emb = generate_embedding(pregunta_en_ingles)
        logging.info("Embedding generado para la pregunta.")

        results = index.similarity_search(pregunta_emb, k=top_k)
        texto = ""
        for entry in results:
            resum = entry[0]["Summary"]
            texto += resum + "\n"

        logging.info("Contexto relevante recuperado para la pregunta.")
        return texto
    except Exception as e:
        logging.error(f"Error al obtener el contexto: {e}")
        return ""


In [None]:
# %% [markdown]
# ### **PASO 10: Generar Respuestas**
# - categorizar_pregunta: Usa palabras clave para clasificar la pregunta (ej: 'ensayo', 'tratamiento').
# - generar_prompt: Crea una instrucción específica según la categoría.
# - generar_respuesta: 
#   - Envía el prompt y el contexto a Gemini.
#   - Traduce la respuesta al español.
#
# OPORTUNIDAD DE MEJORA:
# - Usar un modelo de clasificación semántica en lugar de palabras clave.
# - Detectar el idioma de la pregunta y responder en el mismo idioma.

# %%
def categorizar_pregunta(pregunta):
    categorias = {
        "tratamiento": ["tratamiento", "medicación", "cura", "terapia", "fármaco"],
        "ensayo": ["ensayo", "estudio", "prueba", "investigación", "trial"],
        "resultado": ["resultado", "efectividad", "resultados", "éxito", "fracaso"],
        "prevención": ["prevención", "previene", "evitar", "reducción de riesgo"]
    }
    for categoria, palabras in categorias.items():
        if any(palabra in pregunta.lower() for palabra in palabras):
            return categoria
    return "general"

def generar_prompt(categoria, pregunta):
    prompts = {
        "tratamiento": f"Proporciona información sobre tratamientos en ensayos clínicos relacionados con: {pregunta}.",
        "ensayo": f"Describe los ensayos clínicos actuales relacionados con: {pregunta}.",
        "resultado": f"Explica los resultados más recientes de ensayos clínicos sobre: {pregunta}.",
        "prevención": f"Ofrece información sobre prevención y ensayos clínicos para: {pregunta}."
    }
    return prompts.get(categoria, "Por favor, responde la pregunta sobre ensayos clínicos.")

def es_saludo(pregunta):
    saludos = ["hola", "buen día", "buenas", "cómo estás", "cómo te llamas", "qué tal", "estás bien", "buenas tardes", "buenas noches"]
    return any(saludo in pregunta.lower() for saludo in saludos)

def responder_saludo():
    saludos_respuestas = [
        "¡Hola! Estoy para ayudarte con información sobre ensayos clínicos. ¿En qué puedo asistirte hoy?",
        "¡Buenas! Tenés alguna pregunta sobre ensayos clínicos en enfermedades neuromusculares?",
        "¡Hola! ¿Cómo puedo ayudarte con tus consultas sobre ensayos clínicos?"
    ]
    import random
    return random.choice(saludos_respuestas)

def generar_respuesta(pregunta, contexto, prompt_especifico):
    mensajes = [
        ChatMessage(role="system", content="Eres un experto médico."),
        ChatMessage(role="user", content=f"{prompt_especifico}\nContexto: {contexto}\nPregunta: {pregunta}")
    ]
    start_time = time.time()
    try:
        respuesta = gemini_llm.chat(mensajes)
        elapsed_time = time.time() - start_time
        logging.info(f"Respuesta generada en inglés en {elapsed_time:.2f} segundos.")
        respuesta_en_espanol = traducir(respuesta.message.content, "español")
        logging.info("Respuesta traducida al español.")
        return respuesta_en_espanol
    except Exception as e:
        logging.error(f"Error al generar la respuesta: {e}")
        return "Lo siento, ocurrió un error al generar la respuesta."


In [None]:
# %% [markdown]
# ### **PASO 11: Función Principal para Responder Preguntas**
# - Utiliza caché para no recalcular respuestas idénticas.
# - Integra categorización, obtención de contexto y generación de respuesta.
#
# OPORTUNIDAD DE MEJORA:
# - Expirar caché después de cierto tiempo.
# - Manejar mejor errores de contexto.

# %%
def generar_hash(pregunta):
    return hashlib.sha256(pregunta.encode('utf-8')).hexdigest()

def obtener_respuesta_cacheada(pregunta):
    hash_pregunta = generar_hash(pregunta)
    archivo_cache = f"cache/{hash_pregunta}.json"
    if os.path.exists(archivo_cache):
        try:
            with open(archivo_cache, "r", encoding='utf-8') as f:
                datos = json.load(f)
                return datos.get("respuesta", None)
        except Exception as e:
            logging.error(f"Error al leer el caché: {e}")
            return None
    return None

def guardar_respuesta_cacheada(pregunta, respuesta):
    hash_pregunta = generar_hash(pregunta)
    archivo_cache = f"cache/{hash_pregunta}.json"
    try:
        os.makedirs(os.path.dirname(archivo_cache), exist_ok=True)
        with open(archivo_cache, "w", encoding='utf-8') as f:
            json.dump({"pregunta": pregunta, "respuesta": respuesta}, f, ensure_ascii=False, indent=4)
        logging.info(f"Respuesta cacheada para la pregunta: '{pregunta}'")
    except Exception as e:
        logging.error(f"Error al guardar la respuesta en caché: {e}")

def responder_pregunta(pregunta, index, trozos):
    try:
        if index is None or not trozos:
            logging.warning("No se encontraron índices o trozos para esta pregunta.")
            return "No se encontró información para responder tu pregunta."

        respuesta_cacheada = obtener_respuesta_cacheada(pregunta)
        if respuesta_cacheada:
            logging.info(f"Respuesta obtenida del caché para: '{pregunta}'")
            return respuesta_cacheada

        categoria = categorizar_pregunta(pregunta)
        logging.info(f"Categoría de la pregunta: {categoria}")
        prompt_especifico = generar_prompt(categoria, pregunta)
        logging.info(f"Prompt específico: {prompt_especifico}")

        contexto = obtener_contexto(pregunta, index, trozos)
        if not contexto.strip():
            logging.warning("No se encontró contexto relevante.")
            respuesta = "No pude encontrar información relevante para responder tu pregunta."
            guardar_respuesta_cacheada(pregunta, respuesta)
            return respuesta

        respuesta = generar_respuesta(pregunta, contexto, prompt_especifico)
        guardar_respuesta_cacheada(pregunta, respuesta)
        return respuesta
    except Exception as e:
        logging.error(f"Error en el proceso de responder pregunta: {e}")
        return "Ocurrió un error al procesar tu pregunta."


In [None]:
# %% [markdown]
# ### **PASO 12: Interfaz CLI**
# Ofrece una interfaz de línea de comando para interactuar con el chatbot.
#
# - Espera una pregunta del usuario.
# - Responde saludos.
# - Detecta si el usuario quiere salir.
# - Llama a responder_pregunta() con la información necesaria.
#
# OPORTUNIDAD DE MEJORA:
# - Crear una interfaz web amigable.
# - Almacenar historial de preguntas y respuestas.

# %%
if __name__ == "__main__":
    os.makedirs("cache", exist_ok=True)
    
    if len(documentos) == 0:
        print("No se cargaron documentos. Por favor, verifica el directorio 'data'.")
        logging.error("No se encontraron documentos. Finalizando.")
    else:
        print("Bienvenido al Chatbot de Ensayos Clínicos")
        print("Conversemos sobre Ensayos Clínicos en enfermedades neuromusculares (Distrofia Muscular de Duchenne o Becker, Enfermedad de Pompe, Distrofia Miotónica, etc.).")
        print("Escribí tu pregunta, indicando la enfermedad sobre la que quieres información. Escribí 'salir' para terminar.")
        while True:
            pregunta = input("Tu pregunta: ").strip()
            if pregunta.lower() in ['salir', 'chau', 'exit', 'quit']:
                print("¡Chau!")
                logging.info("El usuario ha finalizado la sesión.")
                break
            if es_saludo(pregunta):
                respuesta_saludo = responder_saludo()
                print(respuesta_saludo)
                logging.info("Se detectó un saludo.")
                continue
            
            idn = doc_enfermedad(pregunta)
            index = index_archivos[idn] if idn < len(index_archivos) else None
            trozos = trozos_archivos[idn] if idn < len(trozos_archivos) else []

            respuesta = responder_pregunta(pregunta, index, trozos)
            print(f"Respuesta: {respuesta}")


In [None]:
# %% [markdown]
# ### **PASO 1: Verificar versión de Python**
# Se valida que la versión de Python sea la requerida.

# %%
import sys
import os
import logging

os.environ["TOKENIZERS_PARALLELISM"] = "false"

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

REQUIRED_VERSION = (3, 10, 12)
current_version = sys.version_info

if (current_version.major, current_version.minor, current_version.micro) != REQUIRED_VERSION:
    logging.warning(f"""
    **********************************************
    ** Advertencia: Versión de Python no compatible **
    **********************************************
    Este chatbot está optimizado para Python {REQUIRED_VERSION[0]}.{REQUIRED_VERSION[1]}.{REQUIRED_VERSION[2]}.
    La versión actual es Python {current_version.major}.{current_version.minor}.{current_version.micro}.
    Algunas funcionalidades pueden no funcionar correctamente.
    **********************************************
    """)
else:
    logging.info("""
    **********************************************
    ** Versión de Python compatible **
    **********************************************
    Python 3.10.12 detectado correctamente.
    Todas las funcionalidades deberían operar sin problemas.
    **********************************************
    """)


# %% [markdown]
# ### **PASO 2: Instalación de Paquetes Necesarios**
# Se utilizan las librerías indicadas en `requirements.txt`.

# %%
%pip install -r requirements.txt


# %% [markdown]
# ### **PASO 3: Importar Librerías y Configurar Logging**
# Importa las librerías necesarias y configura el sistema de logs.

# %%
import os
import json
import logging
import hnswlib
from sentence_transformers import SentenceTransformer, util
import numpy as np
from dotenv import load_dotenv
from PyPDF2 import PdfReader
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
from llama_index.llms.gemini import Gemini
from llama_index.core.llms import ChatMessage
import time
import hashlib

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)

logging.info("Librerías importadas correctamente.")
load_dotenv()
logging.info("Variables de entorno cargadas desde el archivo .env.")


# %% [markdown]
# ### **PASO 4: Cargar Documentos**
# Carga documentos desde archivos o directorios en formatos `.txt`, `.json`, `.pdf`.

# %%
def load_documents(source, is_directory=False):
    if not os.path.exists(source):
        logging.error(f"La fuente '{source}' no existe.")
        raise FileNotFoundError(f"La fuente '{source}' no se encontró.")

    loaded_files = []
    if is_directory:
        logging.info(f"Iniciando carga desde el directorio: {source}.")
        for filename in os.listdir(source):
            filepath = os.path.join(source, filename)
            if os.path.isfile(filepath) and filepath.endswith(('.txt', '.json', '.pdf')):
                content = extract_content(filepath)
                if content:
                    loaded_files.append({"filename": filename, "content": content})
                    logging.info(f"Archivo '{filename}' cargado correctamente.")
    else:
        logging.info(f"Iniciando carga del archivo: {source}.")
        content = extract_content(source)
        if content:
            loaded_files.append({"filename": os.path.basename(source), "content": content})
            logging.info(f"Archivo '{os.path.basename(source)}' cargado correctamente.")

    logging.info(f"{len(loaded_files)} documentos cargados.")
    return loaded_files

def extract_content(filepath):
    try:
        if filepath.endswith('.txt'):
            with open(filepath, 'r', encoding='utf-8') as file:
                content = file.read()
            units = content.split("\n-----\n")
            return units
        elif filepath.endswith('.json'):
            with open(filepath, 'r', encoding='utf-8') as file:
                data = json.load(file)
            return data
        elif filepath.endswith('.pdf'):
            reader = PdfReader(filepath)
            return ''.join(page.extract_text() or '' for page in reader.pages)
    except Exception as e:
        logging.error(f"Error al extraer contenido de '{filepath}': {e}")
        return None

ruta_fuente = 'data'
documentos = load_documents(ruta_fuente, is_directory=True)
logging.info(f"Se cargaron {len(documentos)} documentos exitosamente.")


# %% [markdown]
# ### **PASO 5: Configurar la Clave API de Gemini**
# Obtiene la clave desde las variables de entorno y crea la instancia del modelo Gemini.

# %%
gemini_llm = None

def configure_gemini():
    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        logging.error("La clave API de Gemini no está configurada.")
        raise EnvironmentError("Configura GEMINI_API_KEY en tu archivo .env.")
    gemini = Gemini(api_key=api_key)
    logging.info("Gemini configurado correctamente.")
    return gemini

gemini_llm = configure_gemini()


# %% [markdown]
# ### **PASO 6: Configurar el Modelo de Embeddings**
# Utiliza SentenceTransformer para generar embeddings.

# %%
model_name = "all-MiniLM-L6-v2"
model = SentenceTransformer(model_name)

# Precompute embeddings de filenames una sola vez
archivos = [doc['filename'] for doc in documentos]
archivos_embeddings = model.encode(archivos)
logging.info("Embeddings de nombres de archivos precalculados.")


def doc_enfermedad(pregunta):
    """
    Identifica el índice del documento más relevante para la pregunta,
    usando embeddings precalculados de los nombres de archivo.
    """
    if not documentos:
        logging.warning("No se encontraron documentos. Índice por defecto: 0.")
        return 0
    preg_embedding = model.encode(pregunta)
    similarities = [util.cos_sim(preg_embedding, emb.reshape(1, -1)).item() for emb in archivos_embeddings]
    max_index = similarities.index(max(similarities))
    return max_index


# %% [markdown]
# ### **PASO 7: Crear Clases para Documentos e Índices**
# Document y HNSWIndex para manejar la indexación y búsqueda de textos.

# %%
class Document:
    def __init__(self, text, metadata=None):
        self.page_content = text
        self.metadata = metadata or {}
    
    def __str__(self):
        return (
            f"Título: {self.metadata.get('Title', 'N/A')}\n"
            f"Resumen: {self.metadata.get('Summary', 'N/A')}\n"
            f"Tipo de Estudio: {self.metadata.get('StudyType', 'N/A')}\n"
            f"Paises donde se desarrolla el estudio: {self.metadata.get('Countries', 'N/A')}\n"
            f"Fase en que se encuentra el estudio: {self.metadata.get('Phases', 'N/A')}\n"
            f"Identificación en ClinicaTrial: {self.metadata.get('IDestudio', 'N/A')}.\n\n"
        )

class HNSWIndex:
    def __init__(self, embeddings, metadata=None, space='cosine', ef_construction=200, M=16):
        self.dimension = embeddings.shape[1]
        self.index = hnswlib.Index(space=space, dim=self.dimension)
        self.index.init_index(max_elements=embeddings.shape[0], ef_construction=ef_construction, M=M)
        self.index.add_items(embeddings, np.arange(embeddings.shape[0]))
        self.index.set_ef(50) 
        self.metadata = metadata or []
    
    def similarity_search(self, query_vector, k=5):
        labels, distances = self.index.knn_query(query_vector, k=k)
        return [(self.metadata[i], distances[0][j]) for j, i in enumerate(labels[0])]


# %% [markdown]
# ### **PASO 8: Procesar Documentos y Crear Índices**
# Crea objetos Document e índices HNSWlib.

# %%
def desdobla_doc(data2):
    documents = []
    summaries = []
    contenido = data2['content']
    
    if isinstance(contenido, list):
        for entry in contenido:
            if isinstance(entry, dict):
                nctId = entry.get("IDestudio", "")
                briefTitle = entry.get("Title", "")
                summary = entry.get("Summary", "")
                studyType = entry.get("StudyType", "")
                country = entry.get("Countries", "")
                overallStatus = entry.get("OverallStatus", "")
                conditions = entry.get("Conditions", "")
                phases = entry.get("Phases", "")

                Summary = (
                    f"The study titled '{briefTitle}', of type '{studyType}', "
                    f"investigates the condition(s): {conditions}. "
                    f"Brief summary: {summary}. "
                    f"Current status: {overallStatus}, taking place in {country}. "
                    f"The study is classified under: {phases} phase. "
                    f"For more info, search {nctId} on ClinicalTrials."
                )
                metadata = {
                    "Title": briefTitle,
                    "Summary": Summary,
                    "StudyType": studyType,
                    "Countries": country,
                    "Phases": phases,
                    "IDestudio": nctId
                }
                doc = Document(Summary, metadata)
                documents.append(doc)
                summaries.append(Summary)
            else:
                # Entrada genérica
                texto = str(entry)
                metadata = {"Summary": texto}
                doc = Document(texto, metadata)
                documents.append(doc)
                summaries.append(texto)
    else:
        texto = str(contenido)
        metadata = {"Summary": texto}
        doc = Document(texto, metadata)
        documents.append(doc)
        summaries.append(texto)

    if documents:
        embeddings = model.encode([doc.page_content for doc in documents], show_progress_bar=False)
        embeddings = np.array(embeddings).astype(np.float32)
        vector_store = HNSWIndex(embeddings, metadata=[doc.metadata for doc in documents])
    else:
        vector_store = None

    return documents, vector_store

trozos_archivos = []
index_archivos = []
for i in range(len(documentos)):
    trozos, index = desdobla_doc(documentos[i])
    trozos_archivos.append(trozos)
    index_archivos.append(index)

logging.info("Índices HNSWlib creados para todos los documentos.")


# %% [markdown]
# ### **PASO 9: Traducir Preguntas y Respuestas**
# Usa Gemini para traducir texto al idioma deseado.
#
# OPORTUNIDAD DE MEJORA: Implementar detección de idioma si se requiere.

# %%
translation_cache = {}
embedding_cache = {}

def traducir(texto, idioma_destino):
    # TODO: Implementar detección de idioma si se desea, para evitar traducciones innecesarias.
    # Por ahora, traducimos siempre.
    cache_key = (texto, idioma_destino)
    if cache_key in translation_cache:
        logging.info("Traducción obtenida de caché.")
        return translation_cache[cache_key]

    start_time = time.time()
    mensajes = [
        ChatMessage(role="system", content="Actúa como un traductor."),
        ChatMessage(role="user", content=f"Por favor, traduce este texto al {idioma_destino}: {texto}")
    ]
    try:
        respuesta = gemini_llm.chat(mensajes)
        elapsed_time = time.time() - start_time
        logging.info(f"Traducción completada en {elapsed_time:.2f} segundos.")
        translation_cache[cache_key] = respuesta.message.content.strip()
        return respuesta.message.content.strip()
    except Exception as e:
        logging.error(f"Error al traducir: {e}")
        return texto

def generate_embedding(texto):
    if texto in embedding_cache:
        logging.info("Embedding obtenido de caché.")
        return embedding_cache[texto]

    try:
        embedding = model.encode([texto])
        logging.info(f"Embedding generado para el texto: {texto}")
        embedding_cache[texto] = embedding
        return embedding
    except Exception as e:
        logging.error(f"Error al generar el embedding: {e}")
        return np.zeros((1, 384))

def obtener_contexto(pregunta, index, trozos, top_k=50):
    try:
        pregunta_en_ingles = traducir(pregunta, "inglés")
        logging.info(f"Pregunta traducida al inglés: {pregunta_en_ingles}")

        pregunta_emb = generate_embedding(pregunta_en_ingles)
        logging.info("Embedding generado para la pregunta.")

        results = index.similarity_search(pregunta_emb, k=top_k)
        texto = ""
        for entry in results:
            resum = entry[0]["Summary"]
            texto += resum + "\n"

        logging.info("Contexto relevante recuperado para la pregunta.")
        return texto
    except Exception as e:
        logging.error(f"Error al obtener el contexto: {e}")
        return ""


# %% [markdown]
# ### **PASO 10: Generar Respuestas**
# Clasifica la pregunta, genera un prompt y obtiene una respuesta traducida al español.

# %%
def categorizar_pregunta(pregunta):
    categorias = {
        "tratamiento": ["tratamiento", "medicación", "cura", "terapia", "fármaco"],
        "ensayo": ["ensayo", "estudio", "prueba", "investigación", "trial"],
        "resultado": ["resultado", "efectividad", "resultados", "éxito", "fracaso"],
        "prevención": ["prevención", "previene", "evitar", "reducción de riesgo"]
    }
    for categoria, palabras in categorias.items():
        if any(palabra in pregunta.lower() for palabra in palabras):
            return categoria
    return "general"

def generar_prompt(categoria, pregunta):
    prompts = {
        "tratamiento": f"Proporciona información sobre tratamientos en ensayos clínicos relacionados con: {pregunta}.",
        "ensayo": f"Describe los ensayos clínicos actuales relacionados con: {pregunta}.",
        "resultado": f"Explica los resultados más recientes de ensayos clínicos sobre: {pregunta}.",
        "prevención": f"Ofrece información sobre prevención y ensayos clínicos para: {pregunta}."
    }
    return prompts.get(categoria, "Por favor, responde la pregunta sobre ensayos clínicos.")

def es_saludo(pregunta):
    saludos = ["hola", "buen día", "buenas", "cómo estás", "cómo te llamas", "qué tal", "estás bien", "buenas tardes", "buenas noches"]
    return any(saludo in pregunta.lower() for saludo in saludos)

def responder_saludo():
    saludos_respuestas = [
        "¡Hola! Estoy para ayudarte con información sobre ensayos clínicos. ¿En qué puedo asistirte hoy?",
        "¡Buenas! Tenés alguna pregunta sobre ensayos clínicos en enfermedades neuromusculares?",
        "¡Hola! ¿Cómo puedo ayudarte con tus consultas sobre ensayos clínicos?"
    ]
    import random
    return random.choice(saludos_respuestas)

def generar_respuesta(pregunta, contexto, prompt_especifico):
    mensajes = [
        ChatMessage(role="system", content="Eres un experto médico."),
        ChatMessage(role="user", content=f"{prompt_especifico}\nContexto: {contexto}\nPregunta: {pregunta}")
    ]
    start_time = time.time()
    try:
        respuesta = gemini_llm.chat(mensajes)
        elapsed_time = time.time() - start_time
        logging.info(f"Respuesta generada en inglés en {elapsed_time:.2f} segundos.")
        respuesta_en_espanol = traducir(respuesta.message.content, "español")
        logging.info("Respuesta traducida al español.")
        return respuesta_en_espanol
    except Exception as e:
        logging.error(f"Error al generar la respuesta: {e}")
        return "Lo siento, ocurrió un error al generar la respuesta."


# %% [markdown]
# ### **PASO 11: Función Principal para Responder Preguntas**
# Integra caché, categorización, contexto y respuesta.

# %%
def generar_hash(pregunta):
    return hashlib.sha256(pregunta.encode('utf-8')).hexdigest()

def obtener_respuesta_cacheada(pregunta):
    hash_pregunta = generar_hash(pregunta)
    archivo_cache = f"cache/{hash_pregunta}.json"
    if os.path.exists(archivo_cache):
        try:
            with open(archivo_cache, "r", encoding='utf-8') as f:
                datos = json.load(f)
                return datos.get("respuesta", None)
        except Exception as e:
            logging.error(f"Error al leer el caché: {e}")
            return None
    return None

def guardar_respuesta_cacheada(pregunta, respuesta):
    hash_pregunta = generar_hash(pregunta)
    archivo_cache = f"cache/{hash_pregunta}.json"
    try:
        os.makedirs(os.path.dirname(archivo_cache), exist_ok=True)
        with open(archivo_cache, "w", encoding='utf-8') as f:
            json.dump({"pregunta": pregunta, "respuesta": respuesta}, f, ensure_ascii=False, indent=4)
        logging.info(f"Respuesta cacheada para la pregunta: '{pregunta}'")
    except Exception as e:
        logging.error(f"Error al guardar la respuesta en caché: {e}")

def responder_pregunta(pregunta, index, trozos):
    try:
        if index is None or not trozos:
            logging.warning("No se encontraron índices o trozos para esta pregunta.")
            return "No se encontró información para responder tu pregunta."

        respuesta_cacheada = obtener_respuesta_cacheada(pregunta)
        if respuesta_cacheada:
            logging.info(f"Respuesta obtenida del caché para: '{pregunta}'")
            return respuesta_cacheada

        categoria = categorizar_pregunta(pregunta)
        logging.info(f"Categoría de la pregunta: {categoria}")
        prompt_especifico = generar_prompt(categoria, pregunta)
        logging.info(f"Prompt específico: {prompt_especifico}")

        contexto = obtener_contexto(pregunta, index, trozos)
        if not contexto.strip():
            logging.warning("No se encontró contexto relevante.")
            respuesta = "No pude encontrar información relevante para responder tu pregunta."
            guardar_respuesta_cacheada(pregunta, respuesta)
            return respuesta

        respuesta = generar_respuesta(pregunta, contexto, prompt_especifico)
        guardar_respuesta_cacheada(pregunta, respuesta)
        return respuesta
    except Exception as e:
        logging.error(f"Error en el proceso de responder pregunta: {e}")
        return "Ocurrió un error al procesar tu pregunta."


# %% [markdown]
# ### **PASO 12: Interfaz CLI**
# Proporciona interfaz en línea de comando.

# %%
if __name__ == "__main__":
    os.makedirs("cache", exist_ok=True)
    
    if len(documentos) == 0:
        print("No se cargaron documentos. Por favor, verifica el directorio 'data'.")
        logging.error("No se encontraron documentos. Finalizando.")
    else:
        print("Bienvenido al Chatbot de Ensayos Clínicos")
        print("Conversemos sobre Ensayos Clínicos en enfermedades neuromusculares (Distrofia Muscular de Duchenne o Becker, Enfermedad de Pompe, Distrofia Miotónica, etc.).")
        print("Escribí tu pregunta, indicando la enfermedad sobre la que quieres información. Escribí 'salir' para terminar.")
        while True:
            pregunta = input("Tu pregunta: ").strip()
            if pregunta.lower() in ['salir', 'chau', 'exit', 'quit']:
                print("¡Chau!")
                logging.info("El usuario ha finalizado la sesión.")
                break

            if es_saludo(pregunta):
                respuesta_saludo = responder_saludo()
                print(respuesta_saludo)
                logging.info("Se detectó un saludo.")
                continue
            
            idn = doc_enfermedad(pregunta)
            index = index_archivos[idn] if idn < len(index_archivos) else None
            trozos = trozos_archivos[idn] if idn < len(trozos_archivos) else []

            respuesta = responder_pregunta(pregunta, index, trozos)
            print(f"Respuesta: {respuesta}")


In [15]:
# Ajustes solicitados aplicados sobre la misma base de código

import sys
import os
import logging
import json
import time
import hashlib
import numpy as np
import hnswlib
from sentence_transformers import SentenceTransformer, util
from dotenv import load_dotenv
from PyPDF2 import PdfReader
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
from llama_index.llms.gemini import Gemini
from llama_index.core.llms import ChatMessage

os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Configuración de logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

logging.info("Cargando entorno y verificando variables...")

load_dotenv()
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
    logging.error("La clave API de Gemini no está configurada.")
    raise EnvironmentError("Configura GEMINI_API_KEY en tu archivo .env.")
gemini_llm = Gemini(api_key=api_key)
logging.info("Gemini configurado correctamente.")


# Carga de documentos
def extract_content(filepath):
    try:
        if filepath.endswith('.txt'):
            with open(filepath, 'r', encoding='utf-8') as file:
                content = file.read()
            units = content.split("\n-----\n")
            return units
        elif filepath.endswith('.json'):
            with open(filepath, 'r', encoding='utf-8') as file:
                data = json.load(file)
            return data
        elif filepath.endswith('.pdf'):
            reader = PdfReader(filepath)
            return ''.join(page.extract_text() or '' for page in reader.pages)
    except Exception as e:
        logging.error(f"Error al extraer contenido de '{filepath}': {e}")
        return None

def load_documents(source, is_directory=False):
    if not os.path.exists(source):
        logging.error(f"La fuente '{source}' no existe.")
        raise FileNotFoundError(f"La fuente '{source}' no se encontró.")

    loaded_files = []
    if is_directory:
        logging.info(f"Iniciando carga desde el directorio: {source}.")
        for filename in os.listdir(source):
            filepath = os.path.join(source, filename)
            if os.path.isfile(filepath) and filepath.endswith(('.txt', '.json', '.pdf')):
                content = extract_content(filepath)
                if content:
                    loaded_files.append({"filename": filename, "content": content})
                    logging.info(f"Archivo '{filename}' cargado correctamente.")
    else:
        logging.info(f"Iniciando carga del archivo: {source}.")
        content = extract_content(source)
        if content:
            loaded_files.append({"filename": os.path.basename(source), "content": content})
            logging.info(f"Archivo '{os.path.basename(source)}' cargado correctamente.")
    logging.info(f"{len(loaded_files)} documentos cargados.")
    return loaded_files

ruta_fuente = 'data'
documentos = load_documents(ruta_fuente, is_directory=True)
logging.info(f"Se cargaron {len(documentos)} documentos exitosamente.")


# Modelo de embeddings
model_name = "all-MiniLM-L6-v2"
model = SentenceTransformer(model_name)
logging.info("Modelo de embeddings cargado.")


# Clases Document e HNSWIndex
class Document:
    def __init__(self, text, metadata=None):
        self.page_content = text
        self.metadata = metadata or {}
    
    def __str__(self):
        return (
            f"Título: {self.metadata.get('Title', 'N/A')}\n"
            f"Resumen: {self.metadata.get('Summary', 'N/A')}\n"
            f"Tipo de Estudio: {self.metadata.get('StudyType', 'N/A')}\n"
            f"Paises donde se desarrolla el estudio: {self.metadata.get('Countries', 'N/A')}\n"
            f"Fase en que se encuentra el estudio: {self.metadata.get('Phases', 'N/A')}\n"
            f"Identificación en ClinicaTrial: {self.metadata.get('IDestudio', 'N/A')}.\n\n"
        )

class HNSWIndex:
    def __init__(self, embeddings, metadata=None, space='cosine', ef_construction=200, M=16):
        self.dimension = embeddings.shape[1]
        self.index = hnswlib.Index(space=space, dim=self.dimension)
        self.index.init_index(max_elements=embeddings.shape[0], ef_construction=ef_construction, M=M)
        self.index.add_items(embeddings, np.arange(embeddings.shape[0]))
        self.index.set_ef(50) 
        self.metadata = metadata or []
    
    def similarity_search(self, query_vector, k=5):
        labels, distances = self.index.knn_query(query_vector, k=k)
        return [(self.metadata[i], distances[0][j]) for j, i in enumerate(labels[0])]

# (2) Validación de Datos en Documentos JSON en desdobla_doc
def desdobla_doc(data2):
    documents = []
    summaries = []
    contenido = data2['content']
    
    if isinstance(contenido, list):
        for entry in contenido:
            if isinstance(entry, dict):
                # Validar campos esperados
                nctId = entry.get("IDestudio", "")
                briefTitle = entry.get("Title", "")
                summary = entry.get("Summary", "")
                studyType = entry.get("StudyType", "")
                country = entry.get("Countries", "")
                overallStatus = entry.get("OverallStatus", "")
                conditions = entry.get("Conditions", "")
                phases = entry.get("Phases", "")

                # Si algún campo crítico está vacío, loggear advertencia
                if not briefTitle or not summary:
                    logging.warning(f"Entrada JSON con campos faltantes. Se utilizan valores por defecto. ID: {nctId}")
                
                Summary = (
                    f"The study titled '{briefTitle}', of type '{studyType}', "
                    f"investigates the condition(s): {conditions}. "
                    f"Brief summary: {summary}. "
                    f"Current status: {overallStatus}, taking place in {country}. "
                    f"The study is classified under: {phases} phase. "
                    f"For more info, search {nctId} on ClinicalTrials."
                )
                metadata = {
                    "Title": briefTitle if briefTitle else "N/A",
                    "Summary": Summary if summary else "No summary available",
                    "StudyType": studyType if studyType else "N/A",
                    "Countries": country if country else "N/A",
                    "Phases": phases if phases else "N/A",
                    "IDestudio": nctId if nctId else "N/A"
                }
                doc = Document(Summary, metadata)
                documents.append(doc)
                summaries.append(Summary)
            else:
                # Si no es dict, manejar como texto genérico
                texto = str(entry)
                metadata = {"Summary": texto}
                documents.append(Document(texto, metadata))
                summaries.append(texto)
    else:
        texto = str(contenido)
        metadata = {"Summary": texto}
        documents.append(Document(texto, metadata))
        summaries.append(texto)

    if documents:
        embeddings = model.encode([doc.page_content for doc in documents], show_progress_bar=False)
        embeddings = np.array(embeddings).astype(np.float32)
        vector_store = HNSWIndex(embeddings, metadata=[doc.metadata for doc in documents])
    else:
        vector_store = None

    return documents, vector_store

trozos_archivos = []
index_archivos = []
for i in range(len(documentos)):
    trozos, index = desdobla_doc(documentos[i])
    trozos_archivos.append(trozos)
    index_archivos.append(index)

logging.info("Índices HNSWlib creados para todos los documentos.")


# Precomputation de embeddings de nombres
archivos = [doc['filename'] for doc in documentos]
archivos_embeddings = model.encode(archivos)
logging.info("Embeddings de nombres de archivos precalculados.")


def doc_enfermedad(pregunta):
    if not documentos:
        logging.warning("No se encontraron documentos. Índice por defecto: 0.")
        return 0
    preg_embedding = model.encode(pregunta)
    similarities = [util.cos_sim(preg_embedding, emb.reshape(1, -1)).item() for emb in archivos_embeddings]
    max_index = similarities.index(max(similarities))
    return max_index


# (4) Manejo de Idioma Dinámico: función para detectar idioma básico
def detectar_idioma(texto):
    # Heurística simple: si contiene palabras muy comunes del español, asumimos español.
    # De lo contrario, asumimos inglés. Esto es muy rudimentario.
    palabras_es = [" el ", " la ", " de ", " en ", " y ", " un ", " una ", " los ", " las "]
    texto_lower = " " + texto.lower() + " "
    score_es = sum(word in texto_lower for word in palabras_es)
    # Si score_es > cierto umbral, asumimos español
    if score_es > 2:
        return "es"
    else:
        return "en"

translation_cache = {}
embedding_cache = {}

def traducir(texto, idioma_destino):
    idioma_origen = detectar_idioma(texto)
    # Si el origen y el destino son iguales, no traducir
    if (idioma_origen == "es" and idioma_destino == "español") or (idioma_origen == "en" and idioma_destino == "inglés"):
        logging.info("Texto ya en el idioma destino, no se traduce.")
        return texto

    cache_key = (texto, idioma_destino)
    if cache_key in translation_cache:
        logging.info("Traducción obtenida de caché.")
        return translation_cache[cache_key]

    start_time = time.time()
    mensajes = [
        ChatMessage(role="system", content="Actúa como un traductor."),
        ChatMessage(role="user", content=f"Por favor, traduce este texto al {idioma_destino}: {texto}")
    ]
    try:
        respuesta = gemini_llm.chat(mensajes)
        elapsed_time = time.time() - start_time
        logging.info(f"Traducción completada en {elapsed_time:.2f} segundos.")
        translation_cache[cache_key] = respuesta.message.content.strip()
        return respuesta.message.content.strip()
    except Exception as e:
        logging.error(f"Error al traducir: {e}")
        return texto

def generate_embedding(texto):
    if texto in embedding_cache:
        logging.info("Embedding obtenido de caché.")
        return embedding_cache[texto]
    try:
        embedding = model.encode([texto])
        logging.info(f"Embedding generado para el texto: {texto}")
        embedding_cache[texto] = embedding
        return embedding
    except Exception as e:
        logging.error(f"Error al generar el embedding: {e}")
        return np.zeros((1, 384))

def obtener_contexto(pregunta, index, trozos, top_k=50):
    try:
        pregunta_en_ingles = traducir(pregunta, "inglés")
        logging.info(f"Pregunta traducida al inglés: {pregunta_en_ingles}")
        pregunta_emb = generate_embedding(pregunta_en_ingles)
        logging.info("Embedding generado para la pregunta.")

        results = index.similarity_search(pregunta_emb, k=top_k)
        texto = ""
        for entry in results:
            resum = entry[0]["Summary"]
            texto += resum + "\n"

        logging.info("Contexto relevante recuperado para la pregunta.")
        return texto
    except Exception as e:
        logging.error(f"Error al obtener el contexto: {e}")
        return ""


# (3) Mejoras en la Categorización de Preguntas
def categorizar_pregunta(pregunta):
    # Se amplían las palabras clave y se agregan sinónimos en español e inglés
    categorias = {
        "tratamiento": ["tratamiento", "medicación", "cura", "terapia", "fármaco", "treatment", "drug", "therapy"],
        "ensayo": ["ensayo", "estudio", "prueba", "investigación", "trial", "study", "clinical trial", "research"],
        "resultado": ["resultado", "efectividad", "resultados", "éxito", "fracaso", "outcome", "result", "efficacy"],
        "prevención": ["prevención", "previene", "evitar", "reducción de riesgo", "prevent", "avoid", "risk reduction"]
    }

    pregunta_lower = pregunta.lower()
    # Si encuentra una coincidencia en cualquier categoría, la asigna
    for categoria, palabras in categorias.items():
        for palabra in palabras:
            if palabra in pregunta_lower:
                return categoria

    return "general"


def generar_prompt(categoria, pregunta):
    prompts = {
        "tratamiento": f"Proporciona información sobre tratamientos en ensayos clínicos relacionados con: {pregunta}.",
        "ensayo": f"Describe los ensayos clínicos actuales relacionados con: {pregunta}.",
        "resultado": f"Explica los resultados más recientes de ensayos clínicos sobre: {pregunta}.",
        "prevención": f"Ofrece información sobre prevención y ensayos clínicos para: {pregunta}."
    }
    return prompts.get(categoria, "Por favor, responde la pregunta sobre ensayos clínicos.")

def es_saludo(pregunta):
    saludos = ["hola", "buen día", "buenas", "cómo estás", "cómo te llamas", "qué tal", "estás bien", "buenas tardes", "buenas noches"]
    return any(saludo in pregunta.lower() for saludo in saludos)

def responder_saludo():
    saludos_respuestas = [
        "¡Hola! Estoy para ayudarte con información sobre ensayos clínicos. ¿En qué puedo asistirte hoy?",
        "¡Buenas! Tenés alguna pregunta sobre ensayos clínicos en enfermedades neuromusculares?",
        "¡Hola! ¿Cómo puedo ayudarte con tus consultas sobre ensayos clínicos?"
    ]
    import random
    return random.choice(saludos_respuestas)

def generar_respuesta(pregunta, contexto, prompt_especifico):
    mensajes = [
        ChatMessage(role="system", content="Eres un experto médico."),
        ChatMessage(role="user", content=f"{prompt_especifico}\nContexto: {contexto}\nPregunta: {pregunta}")
    ]
    start_time = time.time()
    try:
        respuesta = gemini_llm.chat(mensajes)
        elapsed_time = time.time() - start_time
        logging.info(f"Respuesta generada en inglés en {elapsed_time:.2f} segundos.")
        respuesta_en_espanol = traducir(respuesta.message.content, "español")
        logging.info("Respuesta traducida al español.")
        return respuesta_en_espanol
    except Exception as e:
        logging.error(f"Error al generar la respuesta: {e}")
        return "Lo siento, ocurrió un error al generar la respuesta."

def generar_hash(pregunta):
    return hashlib.sha256(pregunta.encode('utf-8')).hexdigest()

def obtener_respuesta_cacheada(pregunta):
    hash_pregunta = generar_hash(pregunta)
    archivo_cache = f"cache/{hash_pregunta}.json"
    if os.path.exists(archivo_cache):
        try:
            with open(archivo_cache, "r", encoding='utf-8') as f:
                datos = json.load(f)
                return datos.get("respuesta", None)
        except Exception as e:
            logging.error(f"Error al leer el caché: {e}")
            return None
    return None

def guardar_respuesta_cacheada(pregunta, respuesta):
    hash_pregunta = generar_hash(pregunta)
    archivo_cache = f"cache/{hash_pregunta}.json"
    try:
        os.makedirs(os.path.dirname(archivo_cache), exist_ok=True)
        with open(archivo_cache, "w", encoding='utf-8') as f:
            json.dump({"pregunta": pregunta, "respuesta": respuesta}, f, ensure_ascii=False, indent=4)
        logging.info(f"Respuesta cacheada para la pregunta: '{pregunta}'")
    except Exception as e:
        logging.error(f"Error al guardar la respuesta en caché: {e}")

def responder_pregunta(pregunta, index, trozos):
    try:
        if index is None or not trozos:
            logging.warning("No se encontraron índices o trozos para esta pregunta.")
            return "No se encontró información para responder tu pregunta."

        respuesta_cacheada = obtener_respuesta_cacheada(pregunta)
        if respuesta_cacheada:
            logging.info(f"Respuesta obtenida del caché para: '{pregunta}'")
            return respuesta_cacheada

        categoria = categorizar_pregunta(pregunta)
        logging.info(f"Categoría de la pregunta: {categoria}")
        prompt_especifico = generar_prompt(categoria, pregunta)
        logging.info(f"Prompt específico: {prompt_especifico}")

        contexto = obtener_contexto(pregunta, index, trozos)
        if not contexto.strip():
            logging.warning("No se encontró contexto relevante.")
            respuesta = "No pude encontrar información relevante para responder tu pregunta."
            guardar_respuesta_cacheada(pregunta, respuesta)
            return respuesta

        respuesta = generar_respuesta(pregunta, contexto, prompt_especifico)
        guardar_respuesta_cacheada(pregunta, respuesta)
        return respuesta
    except Exception as e:
        logging.error(f"Error en el proceso de responder pregunta: {e}")
        return "Ocurrió un error al procesar tu pregunta."


if __name__ == "__main__":
    os.makedirs("cache", exist_ok=True)
    
    if len(documentos) == 0:
        print("No se cargaron documentos. Por favor, verifica el directorio 'data'.")
        logging.error("No se encontraron documentos. Finalizando.")
    else:
        print("Bienvenido al Chatbot de Ensayos Clínicos")
        print("Conversemos sobre Ensayos Clínicos en enfermedades neuromusculares (Distrofia Muscular de Duchenne o Becker, Enfermedad de Pompe, Distrofia Miotónica, etc.).")
        print("Escribí tu pregunta, indicando la enfermedad sobre la que quieres información. Escribí 'salir' para terminar.")
        while True:
            pregunta = input("Tu pregunta: ").strip()
            if pregunta.lower() in ['salir', 'chau', 'exit', 'quit']:
                print("¡Chau!")
                logging.info("El usuario ha finalizado la sesión.")
                break

            if es_saludo(pregunta):
                respuesta_saludo = responder_saludo()
                print(respuesta_saludo)
                logging.info("Se detectó un saludo.")
                continue
            
            idn = doc_enfermedad(pregunta)
            index = index_archivos[idn] if idn < len(index_archivos) else None
            trozos = trozos_archivos[idn] if idn < len(trozos_archivos) else []

            respuesta = responder_pregunta(pregunta, index, trozos)
            print(f"Respuesta: {respuesta}")


2024-12-06 23:49:44,322 - INFO - Cargando entorno y verificando variables...
2024-12-06 23:49:45,307 - INFO - Gemini configurado correctamente.
2024-12-06 23:49:45,309 - INFO - Iniciando carga desde el directorio: data.
2024-12-06 23:49:45,318 - INFO - Archivo 'resumenes_lupus.json' cargado correctamente.
2024-12-06 23:49:45,321 - INFO - Archivo 'resumenes_duchenne.json' cargado correctamente.
2024-12-06 23:49:45,328 - INFO - Archivo 'resumenes_pompe.json' cargado correctamente.
2024-12-06 23:49:45,330 - INFO - Archivo 'resumenes_becker.json' cargado correctamente.
2024-12-06 23:49:45,332 - INFO - Archivo 'resumenes_glycogen storage disease.json' cargado correctamente.
2024-12-06 23:49:45,334 - INFO - Archivo 'resumenes_myotonic dystrophy.json' cargado correctamente.
2024-12-06 23:49:45,335 - INFO - 6 documentos cargados.
2024-12-06 23:49:45,336 - INFO - Se cargaron 6 documentos exitosamente.
2024-12-06 23:49:45,346 - INFO - Use pytorch device_name: mps
2024-12-06 23:49:45,346 - INFO -

Bienvenido al Chatbot de Ensayos Clínicos
Conversemos sobre Ensayos Clínicos en enfermedades neuromusculares (Distrofia Muscular de Duchenne o Becker, Enfermedad de Pompe, Distrofia Miotónica, etc.).
Escribí tu pregunta, indicando la enfermedad sobre la que quieres información. Escribí 'salir' para terminar.


Batches: 100%|██████████| 1/1 [00:00<00:00,  1.08it/s]
2024-12-07 00:06:51,908 - INFO - Categoría de la pregunta: ensayo
2024-12-07 00:06:51,910 - INFO - Prompt específico: Describe los ensayos clínicos actuales relacionados con: ¿Cuantos ensayos clínicos están activos actualmente para la Distrofia Muscular de Duchenne?.
2024-12-07 00:06:51,911 - INFO - Texto ya en el idioma destino, no se traduce.
2024-12-07 00:06:51,912 - INFO - Pregunta traducida al inglés: ¿Cuantos ensayos clínicos están activos actualmente para la Distrofia Muscular de Duchenne?
Batches: 100%|██████████| 1/1 [00:00<00:00,  6.07it/s]
2024-12-07 00:06:52,086 - INFO - Embedding generado para el texto: ¿Cuantos ensayos clínicos están activos actualmente para la Distrofia Muscular de Duchenne?
2024-12-07 00:06:52,087 - INFO - Embedding generado para la pregunta.
2024-12-07 00:06:52,095 - INFO - Contexto relevante recuperado para la pregunta.
2024-12-07 00:06:55,464 - INFO - Respuesta generada en inglés en 3.37 segundos

Respuesta: Basándome en la información proporcionada, hay **dos** ensayos clínicos activos actualmente para la Distrofia Muscular de Duchenne:

1. **NCT05712447:**  "Duchenne Muscular Dystrophy Video Assessment Registry"  (Estado: ACTIVE_NOT_RECRUITING)
2. **NCT04587908:** "A Phase 3 Study of TAS-205 in Patients With Duchenne Muscular Dystrophy（REACH-DMD）" (Estado: ACTIVE_NOT_RECRUITING)


Es importante destacar que "activo" puede interpretarse de diferentes maneras.  Estos dos estudios están activos en el sentido de que aún están recopilando y analizando datos, aunque no estén reclutando nuevos participantes.  Muchos otros estudios listados están completados, reclutamiento, o con estado desconocido, por lo que no se consideran activos en este momento.  Para una lista completa y actualizada de ensayos clínicos activos, se debe consultar directamente la base de datos de ClinicalTrials.gov.  Mi análisis se limita a la información proporcionada.



2024-12-07 00:07:01,125 - INFO - El usuario ha finalizado la sesión.


¡Chau!


In [16]:
if __name__ == "__main__":
    os.makedirs("cache", exist_ok=True)
    
    if len(documentos) == 0:
        print("No se cargaron documentos. Por favor, verifica el directorio 'data'.")
        logging.error("No se encontraron documentos. Finalizando.")
    else:
        print("Bienvenido al Chatbot de Ensayos Clínicos")
        print("Conversemos sobre Ensayos Clínicos en enfermedades neuromusculares (Distrofia Muscular de Duchenne o Becker, Enfermedad de Pompe, Distrofia Miotónica, etc.).")
        print("Escribí tu pregunta, indicando la enfermedad sobre la que quieres información. Escribí 'salir' para terminar.")
        while True:
            pregunta = input("Tu pregunta: ").strip()
            if pregunta.lower() in ['salir', 'chau', 'exit', 'quit']:
                print("¡Chau!")
                logging.info("El usuario ha finalizado la sesión.")
                break

            if es_saludo(pregunta):
                respuesta_saludo = responder_saludo()
                print(respuesta_saludo)
                logging.info("Se detectó un saludo.")
                continue
            
            idn = doc_enfermedad(pregunta)
            index = index_archivos[idn] if idn < len(index_archivos) else None
            trozos = trozos_archivos[idn] if idn < len(trozos_archivos) else []

            respuesta = responder_pregunta(pregunta, index, trozos)
            print(f"Respuesta: {respuesta}")


Bienvenido al Chatbot de Ensayos Clínicos
Conversemos sobre Ensayos Clínicos en enfermedades neuromusculares (Distrofia Muscular de Duchenne o Becker, Enfermedad de Pompe, Distrofia Miotónica, etc.).
Escribí tu pregunta, indicando la enfermedad sobre la que quieres información. Escribí 'salir' para terminar.


Batches: 100%|██████████| 1/1 [00:00<00:00,  8.49it/s]
2024-12-07 00:07:35,932 - INFO - Categoría de la pregunta: ensayo
2024-12-07 00:07:35,933 - INFO - Prompt específico: Describe los ensayos clínicos actuales relacionados con: ¿Cuantos ensayos clínicos están activos actualmente para la Distrofia Muscular de Duchenne?.
2024-12-07 00:07:35,934 - INFO - Texto ya en el idioma destino, no se traduce.
2024-12-07 00:07:35,934 - INFO - Pregunta traducida al inglés: ¿Cuantos ensayos clínicos están activos actualmente para la Distrofia Muscular de Duchenne?
2024-12-07 00:07:35,935 - INFO - Embedding obtenido de caché.
2024-12-07 00:07:35,936 - INFO - Embedding generado para la pregunta.
2024-12-07 00:07:35,938 - INFO - Contexto relevante recuperado para la pregunta.
2024-12-07 00:07:37,980 - INFO - Respuesta generada en inglés en 2.04 segundos.
2024-12-07 00:07:37,981 - INFO - Texto ya en el idioma destino, no se traduce.
2024-12-07 00:07:37,982 - INFO - Respuesta traducida al español.
2024-1

Respuesta: Basándome en la información proporcionada, hay **dos** ensayos clínicos activos actualmente para la Distrofia Muscular de Duchenne:

1. **NCT05712447:**  "Duchenne Muscular Dystrophy Video Assessment Registry"  (ACTIVO_NO_RECLUTANDO)
2. **NCT04587908:** "A Phase 3 Study of TAS-205 in Patients With Duchenne Muscular Dystrophy（REACH-DMD）" (ACTIVO_NO_RECLUTANDO)


Es importante destacar que esta respuesta se basa únicamente en los datos proporcionados.  El número real de ensayos clínicos activos para la Distrofia Muscular de Duchenne en todo el mundo es significativamente mayor y se puede encontrar información más completa en bases de datos como ClinicalTrials.gov realizando una búsqueda específica.  La información aquí presentada representa solo una pequeña muestra.



2024-12-07 00:08:00,093 - INFO - El usuario ha finalizado la sesión.


¡Chau!


In [None]:
# %% [markdown]
# ### **PASO 1: Verificar versión de Python**
# Se valida que la versión de Python sea la requerida.
#
# - **Líneas Clave:**
#   - Importación de librerías para verificar la versión (`sys`, `os`).
#   - Configuración de logging para advertencias y mensajes informativos.
#   - Comparación de la versión actual con la requerida.
# - **Oportunidad de Mejora:** Agregar soporte para versiones cercanas si no es crítica la compatibilidad exacta.


In [17]:
# %%
import sys
import os
import logging

# Desactivar advertencias de paralelización en tokenizadores
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Configurar logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Versión requerida
REQUIRED_VERSION = (3, 10, 12)
current_version = sys.version_info

# Validar compatibilidad de versión
if (current_version.major, current_version.minor, current_version.micro) != REQUIRED_VERSION:
    logging.warning(f"""
    **********************************************
    ** Advertencia: Versión de Python no compatible **
    **********************************************
    Este chatbot está optimizado para Python {REQUIRED_VERSION[0]}.{REQUIRED_VERSION[1]}.{REQUIRED_VERSION[2]}.
    La versión actual es Python {current_version.major}.{current_version.minor}.{current_version.micro}.
    Algunas funcionalidades pueden no funcionar correctamente.
    **********************************************
    """)
else:
    logging.info("""
    **********************************************
    ** Versión de Python compatible **
    **********************************************
    Python 3.10.12 detectado correctamente.
    Todas las funcionalidades deberían operar sin problemas.
    **********************************************
    """)


2024-12-07 00:10:21,731 - INFO - 
    **********************************************
    ** Versión de Python compatible **
    **********************************************
    Python 3.10.12 detectado correctamente.
    Todas las funcionalidades deberían operar sin problemas.
    **********************************************
    


In [None]:
# %%
# Instalar las bibliotecas necesarias desde el archivo requirements.txt
# Asegúrate de tener un archivo 'requirements.txt' en el directorio raíz con todas las dependencias listadas
%pip install -r requirements.txt


In [18]:
# %%
import os
import json
import logging
import hnswlib
from sentence_transformers import SentenceTransformer, util
import numpy as np
from dotenv import load_dotenv
from PyPDF2 import PdfReader
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
from llama_index.llms.gemini import Gemini
from llama_index.core.llms import ChatMessage
import time
import hashlib

# Configuración de logs para imprimir todo en consola
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)

# Mensaje de confirmación de importación
logging.info("Librerías importadas correctamente.")

# Cargar variables de entorno desde un archivo .env
load_dotenv()
logging.info("Variables de entorno cargadas desde el archivo .env.")


2024-12-07 00:10:44,568 - INFO - Librerías importadas correctamente.
2024-12-07 00:10:44,577 - INFO - Variables de entorno cargadas desde el archivo .env.


In [19]:
# %%
def load_documents(source, is_directory=False):
    """
    Carga documentos desde un archivo o directorio.
    
    Args:
        source (str): Ruta al archivo o directorio.
        is_directory (bool): Indica si la fuente es un directorio.
    
    Returns:
        list: Lista de diccionarios con 'filename' y 'content'.
    """
    if not os.path.exists(source):
        logging.error(f"La fuente '{source}' no existe.")
        raise FileNotFoundError(f"La fuente '{source}' no se encontró.")

    loaded_files = []
    if is_directory:
        logging.info(f"Iniciando carga desde el directorio: {source}.")
        for filename in os.listdir(source):
            filepath = os.path.join(source, filename)
            if os.path.isfile(filepath) and filepath.endswith(('.txt', '.json', '.pdf')):
                content = extract_content(filepath)
                if content:
                    loaded_files.append({"filename": filename, "content": content})
                    logging.info(f"Archivo '{filename}' cargado correctamente.")
    else:
        logging.info(f"Iniciando carga del archivo: {source}.")
        content = extract_content(source)
        if content:
            loaded_files.append({"filename": os.path.basename(source), "content": content})
            logging.info(f"Archivo '{os.path.basename(source)}' cargado correctamente.")

    logging.info(f"{len(loaded_files)} documentos cargados.")
    return loaded_files

def extract_content(filepath):
    """
    Extrae el contenido del archivo según su tipo.
    
    Args:
        filepath (str): Ruta al archivo.
    
    Returns:
        list o dict o str: Contenido procesado del archivo.
    """
    try:
        if filepath.endswith('.txt'):
            with open(filepath, 'r', encoding='utf-8') as file:
                content = file.read()
            units = content.split("\n-----\n")
            return units
        elif filepath.endswith('.json'):
            with open(filepath, 'r', encoding='utf-8') as file:
                data = json.load(file)
            return data
        elif filepath.endswith('.pdf'):
            reader = PdfReader(filepath)
            return ''.join(page.extract_text() or '' for page in reader.pages)
    except Exception as e:
        logging.error(f"Error al extraer contenido de '{filepath}': {e}")
        return None

# Configuración de ruta y carga de documentos
ruta_fuente = 'data'  # Asegúrate de tener una carpeta 'data' con los documentos
documentos = load_documents(ruta_fuente, is_directory=True)
logging.info(f"Se cargaron {len(documentos)} documentos exitosamente.")


2024-12-07 00:10:52,380 - INFO - Iniciando carga desde el directorio: data.
2024-12-07 00:10:52,401 - INFO - Archivo 'resumenes_lupus.json' cargado correctamente.
2024-12-07 00:10:52,405 - INFO - Archivo 'resumenes_duchenne.json' cargado correctamente.
2024-12-07 00:10:52,407 - INFO - Archivo 'resumenes_pompe.json' cargado correctamente.
2024-12-07 00:10:52,410 - INFO - Archivo 'resumenes_becker.json' cargado correctamente.
2024-12-07 00:10:52,414 - INFO - Archivo 'resumenes_glycogen storage disease.json' cargado correctamente.
2024-12-07 00:10:52,417 - INFO - Archivo 'resumenes_myotonic dystrophy.json' cargado correctamente.
2024-12-07 00:10:52,418 - INFO - 6 documentos cargados.
2024-12-07 00:10:52,424 - INFO - Se cargaron 6 documentos exitosamente.


In [20]:
# %%
gemini_llm = None

def configure_gemini():
    """
    Configura la instancia de Gemini usando la clave API.
    
    Returns:
        Gemini: Instancia configurada del modelo Gemini.
    """
    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        logging.error("La clave API de Gemini no está configurada.")
        raise EnvironmentError("Configura GEMINI_API_KEY en tu archivo .env.")
    gemini = Gemini(api_key=api_key)
    logging.info("Gemini configurado correctamente.")
    return gemini

gemini_llm = configure_gemini()


2024-12-07 00:11:02,677 - INFO - Gemini configurado correctamente.


In [21]:
# %%
model_name = "all-MiniLM-L6-v2"
model = SentenceTransformer(model_name)

# Precomputar los embeddings de los nombres de archivo para eficiencia
archivos = [doc['filename'] for doc in documentos]
archivos_embeddings = model.encode(archivos)

def doc_enfermedad(pregunta):
    """
    Identifica el índice del documento más relevante para la enfermedad en la pregunta.
    Utiliza embeddings precomputados de los nombres de archivo.
    
    Args:
        pregunta (str): Pregunta del usuario.
    
    Returns:
        int: Índice del documento más relevante.
    """
    if not documentos:
        logging.warning("No se encontraron documentos. Índice por defecto: 0.")
        return 0

    # Generar embedding de la pregunta
    preg_embedding = model.encode(pregunta)

    # Calcular similitudes con los embeddings de los nombres de archivo
    similarities = [util.cos_sim(preg_embedding, emb).item() for emb in archivos_embeddings]

    # Obtener el índice con mayor similitud
    max_index = similarities.index(max(similarities))
    return max_index


2024-12-07 00:11:07,928 - INFO - Use pytorch device_name: mps
2024-12-07 00:11:07,929 - INFO - Load pretrained SentenceTransformer: all-MiniLM-L6-v2
Batches: 100%|██████████| 1/1 [00:00<00:00,  6.04it/s]


In [22]:
# %%
class Document:
    def __init__(self, text, metadata=None):
        """
        Inicializa un documento con su contenido y metadatos.
        
        Args:
            text (str): Texto del documento.
            metadata (dict, optional): Metadatos asociados al documento.
        """
        self.page_content = text
        self.metadata = metadata or {}
    
    def __str__(self):
        """
        Representación en string del documento.
        
        Returns:
            str: Información formateada del documento.
        """
        return (
            f"Título: {self.metadata.get('Title', 'N/A')}\n"
            f"Resumen: {self.metadata.get('Summary', 'N/A')}\n"
            f"Tipo de Estudio: {self.metadata.get('StudyType', 'N/A')}\n"
            f"Paises donde se desarrolla el estudio: {self.metadata.get('Countries', 'N/A')}\n"
            f"Fase en que se encuentra el estudio: {self.metadata.get('Phases', 'N/A')}\n"
            f"Identificación en ClinicaTrial: {self.metadata.get('IDestudio', 'N/A')}.\n\n"
        )

class HNSWIndex:
    def __init__(self, embeddings, metadata=None, space='cosine', ef_construction=200, M=16):
        """
        Inicializa el índice HNSWlib con los embeddings proporcionados.
        
        Args:
            embeddings (np.ndarray): Matriz de embeddings.
            metadata (list, optional): Lista de metadatos asociados a cada embedding.
            space (str, optional): Espacio métrico para HNSWlib.
            ef_construction (int, optional): Parámetro ef para la construcción del índice.
            M (int, optional): Parámetro M para HNSWlib.
        """
        self.dimension = embeddings.shape[1]
        self.index = hnswlib.Index(space=space, dim=self.dimension)
        self.index.init_index(max_elements=embeddings.shape[0], ef_construction=ef_construction, M=M)
        self.index.add_items(embeddings, np.arange(embeddings.shape[0]))
        self.index.set_ef(50)  # Parámetro ef para consultas
        self.metadata = metadata or []
    
    def similarity_search(self, query_vector, k=5):
        """
        Realiza una búsqueda de los k vecinos más similares.
        
        Args:
            query_vector (np.ndarray): Vector de consulta.
            k (int, optional): Número de vecinos a buscar.
        
        Returns:
            list: Lista de tuplas con metadatos y distancias.
        """
        labels, distances = self.index.knn_query(query_vector, k=k)
        return [(self.metadata[i], distances[0][j]) for j, i in enumerate(labels[0])]


In [23]:
# %%
def desdobla_doc(data2):
    """
    Desdobla el contenido del documento en varios `Document` con metadatos.
    Maneja JSON (asumiendo estructura de ensayos clínicos) o texto/PDF genérico.
    
    Args:
        data2 (dict): Diccionario con 'filename' y 'content'.
    
    Returns:
        tuple: Lista de `Document` y instancia de `HNSWIndex`.
    """
    documents = []
    summaries = []
    contenido = data2['content']
    
    if isinstance(contenido, list):
        for entry in contenido:
            if isinstance(entry, dict):
                nctId = entry.get("IDestudio", "")
                briefTitle = entry.get("Title", "")
                summary = entry.get("Summary", "")
                studyType = entry.get("StudyType", "")
                country = entry.get("Countries", "")
                overallStatus = entry.get("OverallStatus", "")
                conditions = entry.get("Conditions", "")
                phases = entry.get("Phases", "")

                # Crear resumen en inglés para consistencia interna
                Summary = (
                    f"The study titled '{briefTitle}', of type '{studyType}', "
                    f"investigates the condition(s): {conditions}. "
                    f"Brief summary: {summary}. "
                    f"Current status: {overallStatus}, taking place in {country}. "
                    f"The study is classified under: {phases} phase. "
                    f"For more info, search {nctId} on ClinicalTrials."
                )
                metadata = {
                    "Title": briefTitle,
                    "Summary": Summary,
                    "StudyType": studyType,
                    "Countries": country,
                    "Phases": phases,
                    "IDestudio": nctId
                }
                doc = Document(Summary, metadata)
                documents.append(doc)
                summaries.append(Summary)
            else:
                # Si no es dict, tratar la entrada como texto genérico
                texto = str(entry)
                metadata = {"Summary": texto}
                doc = Document(texto, metadata)
                documents.append(doc)
                summaries.append(texto)
    else:
        # Texto genérico (PDF o TXT)
        texto = str(contenido)
        metadata = {"Summary": texto}
        doc = Document(texto, metadata)
        documents.append(doc)
        summaries.append(texto)

    if documents:
        embeddings = model.encode([doc.page_content for doc in documents], show_progress_bar=False)
        embeddings = np.array(embeddings).astype(np.float32)
        vector_store = HNSWIndex(embeddings, metadata=[doc.metadata for doc in documents])
    else:
        vector_store = None

    return documents, vector_store

# Procesar todos los documentos y crear sus respectivos índices
trozos_archivos = []
index_archivos = []
for i in range(len(documentos)):
    trozos, index = desdobla_doc(documentos[i])
    trozos_archivos.append(trozos)
    index_archivos.append(index)

logging.info("Índices HNSWlib creados para todos los documentos.")


2024-12-07 00:13:45,751 - INFO - Índices HNSWlib creados para todos los documentos.


In [24]:
# %%
def traducir(texto, idioma_destino):
    """
    Traduce texto al idioma especificado usando el modelo Gemini.
    En caso de error, se devuelve el texto original.
    
    Args:
        texto (str): Texto a traducir.
        idioma_destino (str): Idioma de destino.
    
    Returns:
        str: Texto traducido o original en caso de fallo.
    """
    start_time = time.time()
    mensajes = [
        ChatMessage(role="system", content="Actúa como un traductor."),
        ChatMessage(role="user", content=f"Por favor, traduce este texto al {idioma_destino}: {texto}")
    ]
    try:
        respuesta = gemini_llm.chat(mensajes)
        elapsed_time = time.time() - start_time
        logging.info(f"Traducción completada en {elapsed_time:.2f} segundos.")
        return respuesta.message.content.strip()
    except Exception as e:
        logging.error(f"Error al traducir: {e}")
        return texto  # fallback

def generate_embedding(texto):
    """
    Genera un embedding para el texto utilizando el modelo de embeddings.
    
    Args:
        texto (str): Texto para generar el embedding.
    
    Returns:
        np.ndarray: Embedding generado o vector de ceros en caso de fallo.
    """
    try:
        embedding = model.encode([texto])
        logging.info(f"Embedding generado para el texto: {texto}")
        return embedding
    except Exception as e:
        logging.error(f"Error al generar el embedding: {e}")
        # Devuelve embedding vacío como fallback
        return np.zeros((1, 384))
    
def obtener_contexto(pregunta, index, trozos, top_k=50):
    """
    Recupera los trozos de texto más relevantes para responder la pregunta.
    Traduce la pregunta al inglés antes de buscar en el índice.
    
    Args:
        pregunta (str): Pregunta del usuario.
        index (HNSWIndex): Índice de HNSWlib para buscar similitudes.
        trozos (list): Lista de `Document` relacionados.
        top_k (int, optional): Número de resultados a recuperar.
    
    Returns:
        str: Contexto relevante concatenado.
    """
    try:
        # Traducir la pregunta al inglés
        pregunta_en_ingles = traducir(pregunta, "inglés")
        logging.info(f"Pregunta traducida al inglés: {pregunta_en_ingles}")

        # Generar embedding de la pregunta traducida
        pregunta_emb = generate_embedding(pregunta_en_ingles)
        logging.info("Embedding generado para la pregunta.")

        # Buscar en el índice
        results = index.similarity_search(pregunta_emb, k=top_k)
        texto = ""
        for entry in results:
            resum = entry[0]["Summary"]
            texto += resum + "\n"

        logging.info("Contexto relevante recuperado para la pregunta.")
        return texto
    except Exception as e:
        logging.error(f"Error al obtener el contexto: {e}")
        return ""


In [25]:
# %%
def categorizar_pregunta(pregunta):
    """
    Clasifica la pregunta en categorías basadas en palabras clave.
    
    Args:
        pregunta (str): Pregunta del usuario.
    
    Returns:
        str: Categoría identificada.
    """
    categorias = {
        "tratamiento": ["tratamiento", "medicación", "cura", "terapia", "fármaco"],
        "ensayo": ["ensayo", "estudio", "prueba", "investigación", "trial"],
        "resultado": ["resultado", "efectividad", "resultados", "éxito", "fracaso"],
        "prevención": ["prevención", "previene", "evitar", "reducción de riesgo"]
    }
    for categoria, palabras in categorias.items():
        if any(palabra in pregunta.lower() for palabra in palabras):
            return categoria
    return "general"

def generar_prompt(categoria, pregunta):
    """
    Genera un prompt específico basado en la categoría de la pregunta.
    
    Args:
        categoria (str): Categoría de la pregunta.
        pregunta (str): Pregunta del usuario.
    
    Returns:
        str: Prompt generado.
    """
    prompts = {
        "tratamiento": f"Proporciona información sobre tratamientos en ensayos clínicos relacionados con: {pregunta}.",
        "ensayo": f"Describe los ensayos clínicos actuales relacionados con: {pregunta}.",
        "resultado": f"Explica los resultados más recientes de ensayos clínicos sobre: {pregunta}.",
        "prevención": f"Ofrece información sobre prevención y ensayos clínicos para: {pregunta}."
    }
    return prompts.get(categoria, "Por favor, responde la pregunta sobre ensayos clínicos.")

def es_saludo(pregunta):
    """
    Verifica si la pregunta del usuario es un saludo.
    
    Args:
        pregunta (str): Pregunta del usuario.
    
    Returns:
        bool: True si es un saludo, False de lo contrario.
    """
    saludos = ["hola", "buen día", "buenas", "cómo estás", "cómo te llamas", "qué tal", "estás bien", "buenas tardes", "buenas noches"]
    return any(saludo in pregunta.lower() for saludo in saludos)

def responder_saludo():
    """
    Genera una respuesta aleatoria a un saludo.
    
    Returns:
        str: Respuesta de saludo.
    """
    saludos_respuestas = [
        "¡Hola! Estoy para ayudarte con información sobre ensayos clínicos. ¿En qué puedo asistirte hoy?",
        "¡Buenas! Tenés alguna pregunta sobre ensayos clínicos en enfermedades neuromusculares?",
        "¡Hola! ¿Cómo puedo ayudarte con tus consultas sobre ensayos clínicos?"
    ]
    import random
    return random.choice(saludos_respuestas)

def generar_respuesta(pregunta, contexto, prompt_especifico):
    """
    Genera una respuesta usando el contexto proporcionado y un prompt específico.
    Primero genera la respuesta en inglés, luego la traduce al español.
    
    Args:
        pregunta (str): Pregunta del usuario.
        contexto (str): Contexto relevante recuperado.
        prompt_especifico (str): Prompt adaptado a la categoría de la pregunta.
    
    Returns:
        str: Respuesta generada en español.
    """
    mensajes = [
        ChatMessage(role="system", content="Eres un experto médico."),
        ChatMessage(role="user", content=f"{prompt_especifico}\nContexto: {contexto}\nPregunta: {pregunta}")
    ]
    start_time = time.time()
    try:
        respuesta = gemini_llm.chat(mensajes)
        elapsed_time = time.time() - start_time
        logging.info(f"Respuesta generada en inglés en {elapsed_time:.2f} segundos.")
        # Traducir la respuesta al español
        respuesta_en_espanol = traducir(respuesta.message.content, "español")
        logging.info("Respuesta traducida al español.")
        return respuesta_en_espanol
    except Exception as e:
        logging.error(f"Error al generar la respuesta: {e}")
        return "Lo siento, ocurrió un error al generar la respuesta."


In [26]:
# %%
def generar_hash(pregunta):
    """
    Genera un hash SHA-256 para una pregunta dada.
    
    Args:
        pregunta (str): Pregunta del usuario.
    
    Returns:
        str: Hash generado.
    """
    return hashlib.sha256(pregunta.encode('utf-8')).hexdigest()

def obtener_respuesta_cacheada(pregunta):
    """
    Obtiene una respuesta cacheada para una pregunta si existe.
    
    Args:
        pregunta (str): Pregunta del usuario.
    
    Returns:
        str o None: Respuesta cacheada o None si no existe.
    """
    hash_pregunta = generar_hash(pregunta)
    archivo_cache = f"cache/{hash_pregunta}.json"
    if os.path.exists(archivo_cache):
        try:
            with open(archivo_cache, "r", encoding='utf-8') as f:
                datos = json.load(f)
                return datos.get("respuesta", None)
        except Exception as e:
            logging.error(f"Error al leer el caché: {e}")
            return None
    return None

def guardar_respuesta_cacheada(pregunta, respuesta):
    """
    Guarda una respuesta en caché para una pregunta dada.
    
    Args:
        pregunta (str): Pregunta del usuario.
        respuesta (str): Respuesta generada.
    """
    hash_pregunta = generar_hash(pregunta)
    archivo_cache = f"cache/{hash_pregunta}.json"
    try:
        os.makedirs(os.path.dirname(archivo_cache), exist_ok=True)
        with open(archivo_cache, "w", encoding='utf-8') as f:
            json.dump({"pregunta": pregunta, "respuesta": respuesta}, f, ensure_ascii=False, indent=4)
        logging.info(f"Respuesta cacheada para la pregunta: '{pregunta}'")
    except Exception as e:
        logging.error(f"Error al guardar la respuesta en caché: {e}")

def responder_pregunta(pregunta, index, trozos):
    """
    Integra categorización, obtención de contexto y generación de respuesta.
    Incluye manejo de caché para respuestas repetidas.
    
    Args:
        pregunta (str): Pregunta del usuario.
        index (HNSWIndex): Índice de HNSWlib para búsqueda de contexto.
        trozos (list): Lista de `Document` relacionados.
    
    Returns:
        str: Respuesta generada.
    """
    try:
        if index is None or not trozos:
            logging.warning("No se encontraron índices o trozos para esta pregunta.")
            return "No se encontró información para responder tu pregunta."

        # Verificar caché
        respuesta_cacheada = obtener_respuesta_cacheada(pregunta)
        if respuesta_cacheada:
            logging.info(f"Respuesta obtenida del caché para: '{pregunta}'")
            return respuesta_cacheada

        # Categorizar la pregunta
        categoria = categorizar_pregunta(pregunta)
        logging.info(f"Categoría de la pregunta: {categoria}")

        # Generar prompt específico
        prompt_especifico = generar_prompt(categoria, pregunta)
        logging.info(f"Prompt específico: {prompt_especifico}")

        # Obtener contexto relevante
        contexto = obtener_contexto(pregunta, index, trozos)
        if not contexto.strip():
            logging.warning("No se encontró contexto relevante.")
            respuesta = "No pude encontrar información relevante para responder tu pregunta."
            guardar_respuesta_cacheada(pregunta, respuesta)
            return respuesta

        # Generar la respuesta
        respuesta = generar_respuesta(pregunta, contexto, prompt_especifico)

        # Guardar la respuesta en caché
        guardar_respuesta_cacheada(pregunta, respuesta)
        return respuesta
    except Exception as e:
        logging.error(f"Error en el proceso de responder pregunta: {e}")
        return "Ocurrió un error al procesar tu pregunta."


In [27]:
# %%
def generar_hash(pregunta):
    """
    Genera un hash SHA-256 para una pregunta dada.
    
    Args:
        pregunta (str): Pregunta del usuario.
    
    Returns:
        str: Hash generado.
    """
    return hashlib.sha256(pregunta.encode('utf-8')).hexdigest()

def obtener_respuesta_cacheada(pregunta):
    """
    Obtiene una respuesta cacheada para una pregunta si existe.
    
    Args:
        pregunta (str): Pregunta del usuario.
    
    Returns:
        str o None: Respuesta cacheada o None si no existe.
    """
    hash_pregunta = generar_hash(pregunta)
    archivo_cache = f"cache/{hash_pregunta}.json"
    if os.path.exists(archivo_cache):
        try:
            with open(archivo_cache, "r", encoding='utf-8') as f:
                datos = json.load(f)
                return datos.get("respuesta", None)
        except Exception as e:
            logging.error(f"Error al leer el caché: {e}")
            return None
    return None

def guardar_respuesta_cacheada(pregunta, respuesta):
    """
    Guarda una respuesta en caché para una pregunta dada.
    
    Args:
        pregunta (str): Pregunta del usuario.
        respuesta (str): Respuesta generada.
    """
    hash_pregunta = generar_hash(pregunta)
    archivo_cache = f"cache/{hash_pregunta}.json"
    try:
        os.makedirs(os.path.dirname(archivo_cache), exist_ok=True)
        with open(archivo_cache, "w", encoding='utf-8') as f:
            json.dump({"pregunta": pregunta, "respuesta": respuesta}, f, ensure_ascii=False, indent=4)
        logging.info(f"Respuesta cacheada para la pregunta: '{pregunta}'")
    except Exception as e:
        logging.error(f"Error al guardar la respuesta en caché: {e}")

def responder_pregunta(pregunta, index, trozos):
    """
    Integra categorización, obtención de contexto y generación de respuesta.
    Incluye manejo de caché para respuestas repetidas.
    
    Args:
        pregunta (str): Pregunta del usuario.
        index (HNSWIndex): Índice de HNSWlib para búsqueda de contexto.
        trozos (list): Lista de `Document` relacionados.
    
    Returns:
        str: Respuesta generada.
    """
    try:
        if index is None or not trozos:
            logging.warning("No se encontraron índices o trozos para esta pregunta.")
            return "No se encontró información para responder tu pregunta."

        # Verificar caché
        respuesta_cacheada = obtener_respuesta_cacheada(pregunta)
        if respuesta_cacheada:
            logging.info(f"Respuesta obtenida del caché para: '{pregunta}'")
            return respuesta_cacheada

        # Categorizar la pregunta
        categoria = categorizar_pregunta(pregunta)
        logging.info(f"Categoría de la pregunta: {categoria}")

        # Generar prompt específico
        prompt_especifico = generar_prompt(categoria, pregunta)
        logging.info(f"Prompt específico: {prompt_especifico}")

        # Obtener contexto relevante
        contexto = obtener_contexto(pregunta, index, trozos)
        if not contexto.strip():
            logging.warning("No se encontró contexto relevante.")
            respuesta = "No pude encontrar información relevante para responder tu pregunta."
            guardar_respuesta_cacheada(pregunta, respuesta)
            return respuesta

        # Generar la respuesta
        respuesta = generar_respuesta(pregunta, contexto, prompt_especifico)

        # Guardar la respuesta en caché
        guardar_respuesta_cacheada(pregunta, respuesta)
        return respuesta
    except Exception as e:
        logging.error(f"Error en el proceso de responder pregunta: {e}")
        return "Ocurrió un error al procesar tu pregunta."


In [29]:
# %%
if __name__ == "__main__":
    # Crear directorio de caché si no existe
    os.makedirs("cache", exist_ok=True)
    
    if len(documentos) == 0:
        print("No se cargaron documentos. Por favor, verifica el directorio 'data'.")
        logging.error("No se encontraron documentos. Finalizando.")
    else:
        print("Bienvenido al Chatbot de Ensayos Clínicos")
        print("Conversemos sobre Ensayos Clínicos en enfermedades neuromusculares (Distrofia Muscular de Duchenne o Becker, Enfermedad de Pompe, Distrofia Miotónica, etc.).")
        print("Escribí tu pregunta, indicando la enfermedad sobre la que quieres información. Escribí 'salir' para terminar.")
        while True:
            pregunta = input("Tu pregunta: ").strip()
            if pregunta.lower() in ['salir', 'chau', 'exit', 'quit']:
                print("¡Chau!")
                logging.info("El usuario ha finalizado la sesión.")
                break
            if es_saludo(pregunta):
                respuesta_saludo = responder_saludo()
                print(respuesta_saludo)
                logging.info("Se detectó un saludo.")
                continue
            
            # Identificar la enfermedad (documento más relevante)
            idn = doc_enfermedad(pregunta)
            index = index_archivos[idn] if idn < len(index_archivos) else None
            trozos = trozos_archivos[idn] if idn < len(trozos_archivos) else []

            # Responder la pregunta
            respuesta = responder_pregunta(pregunta, index, trozos)
            print(f"Respuesta: {respuesta}")


Bienvenido al Chatbot de Ensayos Clínicos
Conversemos sobre Ensayos Clínicos en enfermedades neuromusculares (Distrofia Muscular de Duchenne o Becker, Enfermedad de Pompe, Distrofia Miotónica, etc.).
Escribí tu pregunta, indicando la enfermedad sobre la que quieres información. Escribí 'salir' para terminar.


Batches: 100%|██████████| 1/1 [00:00<00:00,  6.40it/s]
2024-12-07 00:14:41,584 - INFO - Categoría de la pregunta: ensayo
2024-12-07 00:14:41,585 - INFO - Prompt específico: Describe los ensayos clínicos actuales relacionados con: ¿Cuantos ensayos clínicos están activos actualmente para la Distrofia Muscular de Duchenne?.
2024-12-07 00:14:42,097 - INFO - Traducción completada en 0.51 segundos.
2024-12-07 00:14:42,098 - INFO - Pregunta traducida al inglés: How many clinical trials are currently active for Duchenne Muscular Dystrophy?
Batches: 100%|██████████| 1/1 [00:00<00:00, 10.54it/s]
2024-12-07 00:14:42,198 - INFO - Embedding generado para el texto: How many clinical trials are currently active for Duchenne Muscular Dystrophy?
2024-12-07 00:14:42,199 - INFO - Embedding generado para la pregunta.
2024-12-07 00:14:42,200 - INFO - Contexto relevante recuperado para la pregunta.
2024-12-07 00:14:47,660 - INFO - Respuesta generada en inglés en 5.46 segundos.
2024-12-07 00:14:53,385 - INFO

Respuesta: Basándome en la información proporcionada, hay **11 ensayos clínicos activos actualmente** para la Distrofia Muscular de Duchenne. Estos son:

* **Registro de Duchenne (NCT02069756):** RECLUTANDO
* **Estudio a largo plazo y de extensión de DS-5141b en pacientes con Distrofia Muscular de Duchenne (NCT04433234):** ACTIVO, NO RECLUTANDO
* **Registro de evaluación por video de la Distrofia Muscular de Duchenne (NCT05712447):** ACTIVO, NO RECLUTANDO
* **Estudio de SRP-4045 (Casimersen) y SRP-4053 (Golodirsen) en participantes con Distrofia Muscular de Duchenne (DMD) (NCT02500381):** ACTIVO, NO RECLUTANDO
* **AFFINITY DUCHENNE: Terapia génica RGX-202 en participantes con Distrofia Muscular de Duchenne (DMD) (NCT05693142):** RECLUTANDO
* **Un estudio de Fase 3 de TAS-205 en pacientes con Distrofia Muscular de Duchenne (REACH-DMD) (NCT04587908):** ACTIVO, NO RECLUTANDO
* **Un estudio de la terapia génica SGT-003 en la Distrofia Muscular de Duchenne (INSPIRE DUCHENNE) (NCT06138639):*

2024-12-07 00:14:56,189 - INFO - El usuario ha finalizado la sesión.


¡Chau!
