En esta lección veremos cómo crear un chatbot a partir de un conjunto de datos de documentos PDF con Cortex Search. Mostraremos un ejemplo de extracción de texto de los PDF mediante una UDF básica de Python y, posteriormente, la ingesta de los datos extraídos en un servicio de Cortex Search.

### Cargar los datos en Snowflake

Primero, vamos a crear un stage para almacenar los archivos que contienen los datos. Este stage contendrá los archivos PDF que se encuentran adjuntos como recurso a esta lección.

In [None]:
CREATE OR REPLACE STAGE curso_ia.seccion_4.datos_aws
    DIRECTORY = (ENABLE = TRUE)
    ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');

### Cargar los PDFs al stage

A continuación deberán cargar los PDFs adjuntos como recurso a esta lección al stage que acabamos de crear.

### Parsear los archivos PDF

En este paso, extraeremos el texto sin procesar de los PDF y lo dividiremos en fragmentos para su incorporación al servicio de búsqueda.

Primero, usaremos la función `PARSE_DOCUMENT` de Cortex para extraer el texto de los PDF en una nueva tabla: `AWS_RAW_TEXT`.

In [None]:
CREATE OR REPLACE TABLE curso_ia.seccion_4.aws_raw_text AS
SELECT
    RELATIVE_PATH,
    TO_VARCHAR (
        SNOWFLAKE.CORTEX.PARSE_DOCUMENT (
            '@curso_ia.seccion_4.datos_aws',
            RELATIVE_PATH,
            {'mode': 'LAYOUT'} ):content
        ) AS EXTRACTED_LAYOUT
FROM
    DIRECTORY('@curso_ia.seccion_4.datos_aws')
WHERE
    RELATIVE_PATH LIKE '%.pdf';

In [None]:
SELECT * FROM curso_ia.seccion_4.aws_raw_text;

A continuación, utilizaremos `SPLIT_TEXT_RECURSIVE_CHARACTER` para dividir los documentos en fragmentos de un tamaño máximo de 2000 caracteres cada uno, insertando los fragmentos en una nueva tabla `AWS_CHUNKS`.

In [None]:
CREATE OR REPLACE TABLE curso_ia.seccion_4.aws_chunks AS
SELECT
    relative_path,
    BUILD_SCOPED_FILE_URL(@datos_aws, relative_path, TRUE) AS file_url,
    CONCAT(relative_path, ': ', c.value::TEXT) AS chunk,
    'Español' AS language
FROM
    curso_ia.seccion_4.aws_raw_text,
    LATERAL FLATTEN(SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER(
        EXTRACTED_LAYOUT,
        'markdown',
        2000, -- chunks de 2000 caracteres
        300 -- 300 superposición de caracteres
    )) c;

In [None]:
SELECT * FROM curso_ia.seccion_4.aws_chunks;

### Crear el servicio de Cortex Search

Ahora crearemos un servicio de búsqueda de Cortex en nuestra nueva tabla ejecutando el siguiente comando SQL.

In [None]:
CREATE OR REPLACE CORTEX SEARCH SERVICE curso_ia.seccion_4.aws_search_service
    ON chunk
    ATTRIBUTES language
    WAREHOUSE = search_service_wh
    TARGET_LAG = '6 hour',
    EMBEDDING_MODEL = 'voyage-multilingual-2'
    AS (
    SELECT
        chunk,
        relative_path,
        file_url,
        language
    FROM curso_ia.seccion_4.aws_chunks
    );

In [None]:
-- Mostramos los servicios de Cortex Search que tengamos creados
SHOW CORTEX SEARCH SERVICES;

### Crear una aplicación de Stremlit

Podemos consultar el servicio creado con el SDK de Python (usando el paquete de Python Snowflake). En esta lección mostraremos el uso del SDK de Python en una aplicación Streamlit en Snowflake.

A continuación deberá ir a la sección de Streamlit y seguir los siguientes pasos para crear la aplicación:

1. Click en + Streamlit app
2. Proporcionarle el nombre `aws_app`
3. Seleccionar la base de datos, esquema y warehouse:
    
    Base de datos: `curso_ia`
    
    Esquema: `seccion_4`
    
4. Click en Create
5. Eliminar el código de ejemplo que trae la plantilla
6. Copiar el código Python de la siguiente celda en la aplicación de Streamlit.
7. Agregar las librerías necesarias. Para esta aplicación debemos asegurarnos de que tenemos agregada las siguientes librerías:
    - `snowflake`
    - `snowflake-snowpark-python`
8. Ejecutar la aplicación

In [None]:
import streamlit as st
from snowflake.core import Root # requiere snowflake>=0.8.0
from snowflake.snowpark.context import get_active_session

MODELS = [
    "claude-4-sonnet",
    "openai-gpt-4.1",
    "mistral-large",
    "snowflake-arctic",
    "llama3-70b",
    "llama3-8b",
]

def init_messages():
    """
    Inicializa el estado de sesión para los mensajes del chat. Si el estado de sesión indica que la
    conversación debe ser borrada o si la clave "messages" no está en el estado de sesión,
    la inicializa como una lista vacía.
    """
    if st.session_state.clear_conversation or "messages" not in st.session_state:
        st.session_state.messages = []


def init_service_metadata():
    """
    Inicializa el estado de sesión para los metadatos del servicio de búsqueda cortex. Consulta los
    servicios de búsqueda cortex disponibles desde la sesión de Snowflake y almacena sus nombres y
    columnas de búsqueda en el estado de sesión.
    """
    if "service_metadata" not in st.session_state:
        services = session.sql("SHOW CORTEX SEARCH SERVICES;").collect()
        service_metadata = []
        if services:
            for s in services:
                svc_name = s["name"]
                svc_search_col = session.sql(
                    f"DESC CORTEX SEARCH SERVICE {svc_name};"
                ).collect()[0]["search_column"]
                service_metadata.append(
                    {"name": svc_name, "search_column": svc_search_col}
                )

        st.session_state.service_metadata = service_metadata


def init_config_options():
    """
    Inicializa las opciones de configuración en la barra lateral de Streamlit. Permite al usuario seleccionar
    un servicio de búsqueda cortex, borrar la conversación, alternar el modo debug y alternar el uso del
    historial de chat. También proporciona opciones avanzadas para seleccionar un modelo, el número de
    fragmentos de contexto y el número de mensajes de chat a usar en el historial de chat.
    """
    st.sidebar.selectbox(
        "Seleccionar servicio de búsqueda cortex:",
        [s["name"] for s in st.session_state.service_metadata],
        key="selected_cortex_search_service",
    )

    st.sidebar.button("Borrar conversación", key="clear_conversation")
    st.sidebar.toggle("Debug", key="debug", value=False)
    st.sidebar.toggle("Usar historial de chat", key="use_chat_history", value=True)

    with st.sidebar.expander("Opciones avanzadas"):
        st.selectbox("Seleccionar modelo:", MODELS, key="model_name")
        st.number_input(
            "Seleccionar número de fragmentos de contexto",
            value=5,
            key="num_retrieved_chunks",
            min_value=1,
            max_value=10,
        )
        st.number_input(
            "Seleccionar número de mensajes a usar en el historial de chat",
            value=5,
            key="num_chat_messages",
            min_value=1,
            max_value=10,
        )

    st.sidebar.expander("Estado de Sesión").write(st.session_state)


def query_cortex_search_service(query, columns = [], filter={}):
    """
    Consulta el servicio de búsqueda cortex seleccionado con la consulta dada y recupera documentos de contexto.
    Muestra los documentos de contexto recuperados en la barra lateral si el modo debug está habilitado. Devuelve
    los documentos de contexto como una cadena.

    Args:
        query (str): La consulta para buscar en el servicio de búsqueda cortex.

    Returns:
        str: La cadena concatenada de documentos de contexto.
    """
    db, schema = session.get_current_database(), session.get_current_schema()

    cortex_search_service = (
        root.databases[db]
        .schemas[schema]
        .cortex_search_services[st.session_state.selected_cortex_search_service]
    )

    context_documents = cortex_search_service.search(
        query, columns=columns, filter=filter, limit=st.session_state.num_retrieved_chunks
    )
    results = context_documents.results

    service_metadata = st.session_state.service_metadata
    search_col = [s["search_column"] for s in service_metadata
                    if s["name"] == st.session_state.selected_cortex_search_service][0].lower()

    context_str = ""
    for i, r in enumerate(results):
        context_str += f"Documento de contexto {i+1}: {r[search_col]} \n" + "\n"

    if st.session_state.debug:
        st.sidebar.text_area("Documentos de contexto", context_str, height=500)

    return context_str, results


def get_chat_history():
    """
    Recupera el historial de chat del estado de sesión limitado al número de mensajes especificado
    por el usuario en las opciones de la barra lateral.

    Returns:
        list: La lista de mensajes de chat del estado de sesión.
    """
    start_index = max(
        0, len(st.session_state.messages) - st.session_state.num_chat_messages
    )
    return st.session_state.messages[start_index : len(st.session_state.messages) - 1]


def complete(model, prompt):
    """
    Genera una respuesta para el prompt dado usando el modelo especificado.

    Args:
        model (str): El nombre del modelo a usar para la respuesta.
        prompt (str): El prompt para generar una respuesta.

    Returns:
        str: La respuesta generada.
    """
    return session.sql("SELECT ai_complete(?,?)", (model, prompt)).collect()[0][0]

def make_chat_history_summary(chat_history, question):
    """
    Genera un resumen del historial de chat combinado con la pregunta actual para extender el contexto
    de la consulta. Usa el modelo de lenguaje para generar este resumen.

    Args:
        chat_history (str): El historial de chat a incluir en el resumen.
        question (str): La pregunta actual del usuario para extender con el historial de chat.

    Returns:
        str: El resumen generado del historial de chat y la pregunta.
    """
    prompt = f"""
        [INST]
        Basándote en el historial de chat a continuación y la pregunta, genera una consulta que extienda la pregunta
        con el historial de chat proporcionado. La consulta debe estar en lenguaje natural.
        Responde solo con la consulta. No agregues ninguna explicación.

        <chat_history>
        {chat_history}
        </chat_history>
        <question>
        {question}
        </question>
        [/INST]
    """

    summary = complete(st.session_state.model_name, prompt)

    if st.session_state.debug:
        st.sidebar.text_area(
            "Resumen del historial de chat", summary.replace("$", "\$"), height=150
        )

    return summary


def create_prompt(user_question):
    """
    Crea un prompt para el modelo de lenguaje combinando la pregunta del usuario con el contexto recuperado
    del servicio de búsqueda cortex y el historial de chat (si está habilitado). Formatea el prompt de acuerdo
    al formato de entrada esperado del modelo.

    Args:
        user_question (str): La pregunta del usuario para generar un prompt.

    Returns:
        str: El prompt generado para el modelo de lenguaje.
    """
    if st.session_state.use_chat_history:
        chat_history = get_chat_history()
        if chat_history != []:
            question_summary = make_chat_history_summary(chat_history, user_question)
            prompt_context, results = query_cortex_search_service(
                question_summary,
                columns=["chunk", "file_url", "relative_path"],
                filter={"@and": [{"@eq": {"language": "Español"}}]},
            )
        else:
            prompt_context, results = query_cortex_search_service(
                user_question,
                columns=["chunk", "file_url", "relative_path"],
                filter={"@and": [{"@eq": {"language": "Español"}}]},
            )
    else:
        prompt_context, results = query_cortex_search_service(
            user_question,
            columns=["chunk", "file_url", "relative_path"],
            filter={"@and": [{"@eq": {"language": "Español"}}]},
        )
        chat_history = ""

    prompt = f"""
        [INST]
        Eres un asistente de chat de IA útil con capacidades RAG. Cuando un usuario te haga una pregunta,
        también se te dará contexto proporcionado entre las etiquetas <context> y </context>. Usa ese contexto
        con el historial de chat del usuario proporcionado entre las etiquetas <chat_history> y </chat_history>
        para proporcionar un resumen que aborde la pregunta del usuario. 

        IMPORTANTE: Formatea tu respuesta usando Markdown apropiado:
        - Usa líneas vacías (doble salto de línea) entre párrafos
        - Usa **texto** para negritas
        - Usa listas con - para elementos
        - Usa ### para subtítulos cuando sea apropiado
        
        Asegúrate de que la respuesta sea coherente, concisa y directamente relevante a la pregunta del usuario.

        Si el usuario hace una pregunta genérica que no puede ser respondida con el contexto dado o el chat_history,
        simplemente di "No conozco la respuesta a esa pregunta."

        No digas cosas como "según el contexto proporcionado".

        <chat_history>
        {chat_history}
        </chat_history>
        <context>
        {prompt_context}
        </context>
        <question>
        {user_question}
        </question>
        [/INST]
        Respuesta:
        """
    return prompt, results


def main():
    st.title(f":speech_balloon: Chatbot con Snowflake Cortex")

    init_service_metadata()
    init_config_options()
    init_messages()

    icons = {"assistant": "❄️", "user": "👤"}

    # Mostrar mensajes de chat del historial al reejecutar la aplicación
    for message in st.session_state.messages:
        with st.chat_message(message["role"], avatar=icons[message["role"]]):
            st.markdown(message["content"])

    disable_chat = (
        "service_metadata" not in st.session_state
        or len(st.session_state.service_metadata) == 0
    )
    if question := st.chat_input("Haz una pregunta...", disabled=disable_chat):
        # Agregar mensaje del usuario al historial de chat
        st.session_state.messages.append({"role": "user", "content": question})
        # Mostrar mensaje del usuario en el contenedor de mensajes de chat
        with st.chat_message("user", avatar=icons["user"]):
            st.markdown(question.replace("$", "\$"))

        # Mostrar respuesta del asistente en el contenedor de mensajes de chat
        with st.chat_message("assistant", avatar=icons["assistant"]):
            message_placeholder = st.empty()
            question = question.replace("'", "")
            prompt, results = create_prompt(question)
            with st.spinner("Pensando..."):
                generated_response = complete(
                    st.session_state.model_name, prompt
                )
                # construir tabla de referencias para citación
                markdown_table = "###### Referencias \n\n| Título PDF |\n|-------|\n"
                for ref in results:
                    markdown_table += f"| {ref['relative_path']} |\n"
                generated_response_clean = generated_response.replace("\\n", "\n").replace("\\", "")
                message_placeholder.markdown(generated_response_clean + "\n\n" + markdown_table)
                

        st.session_state.messages.append(
            {"role": "assistant", "content": generated_response}
        )


if __name__ == "__main__":
    session = get_active_session()
    root = Root(session)
    main()