# Building a Multi-Agent AI System Using LangGraph and LangSmith

Welcome to this complete guide to building **multi-agent workflows** with LangGraph! In this notebook, we'll go from a basic ReAct agent to a sophisticated multi-agent customer support system. We'll explore LangGraph's key concepts, take advantage of its pre-built libraries, and integrate advanced features such as human-in-the-loop interaction and long-term memory.

## What is LangGraph?

LangGraph is a library designed for building stateful, multi-actor applications with LLMs, chains, and tools. It extends LangChain by letting you define sequences of calls (chains) more robustly, with cycles, conditional logic, and the ability to manage complex state transitions. This makes it a good fit for agentic workflows that require multiple "think" and "act" steps, or where different specialized agents need to collaborate.

## Why Multi-Agent?

Multi-agent architectures are powerful for several reasons:

*   **Specialization and Modularity**: Instead of a single monolithic agent trying to handle everything, a multi-agent system is composed of smaller, specialized agents. Each agent is optimized for a specific task or domain (for example, one for music queries and another for billing details), which improves accuracy and performance within its area of expertise.
*   **Flexibility and Scalability**: New capabilities can be added by integrating a new specialized agent, without retraining or significantly modifying the existing ones. Agents can be added, removed, or modified quickly, which makes the system highly adaptable.
*   **Robustness**: If one agent fails or underperforms on its specific task, the whole system is not necessarily affected, because the other agents can keep working. This contributes to a more resilient application.
*   **Complex Problem Solving**: Many real-world problems require different kinds of expertise. A multi-agent system can mimic the collaboration of a human team, breaking complex queries into sub-problems that are handled by the most suitable expert.

## Our Customer Support Scenario

We'll simulate a realistic customer support example for a digital music store. The agent will interact with the [Chinook database](https://www.sqlitetutorial.net/sqlite-sample-database/), which contains comprehensive information about customers, invoices, and a music catalog.

Our final architecture will look something like this:

![Architecture Diagram](images/architecture.png)

As you can see, the system starts with a **customer verification** step (with a human in the loop), then **loads the user's preferences** from long-term memory. A **supervisor agent** routes the query to the appropriate specialized sub-agent: either the **music catalog sub-agent** or the **invoice information sub-agent**. Finally, the system **saves any new user preferences** to long-term memory before returning a response.

To dive deeper into LangGraph fundamentals and learn more about our framework, visit our [LangChain Academy](https://academy.langchain.com/courses/intro-to-langgraph)!


## Pre-work: Setup

Before we dive into building our multi-agent system, let's set up our environment. This involves loading the necessary environment variables, connecting to our sample database, and initializing our memory stores.

#### Loading Environment Variables

To get started, we'll load our API keys and other configuration from a `.env` file, which keeps sensitive information out of the code. The cell below connects to an Azure OpenAI deployment through `AzureChatOpenAI`, but LangGraph is model-agnostic, so you can swap in other `ChatModel` providers such as OpenAI (`ChatOpenAI`), Anthropic, or Google Gemini.

Make sure your `.env` file includes every key listed in the `.env.example` file. The cell below reads `UOC_API_KEY`, `UOC_ENDPOINT`, `UOC_MODEL_NAME`, and `UOC_API_VERSION`.


In [None]:
import os
from dotenv import load_dotenv # Import function to load environment variables
from langchain_openai import AzureChatOpenAI # Import the Azure OpenAI chat model

# Load environment variables from the .env file. The `override=True` argument
# ensures that variables from the .env file will overwrite existing environment variables.
load_dotenv(dotenv_path=".env", override=True)

# Read the Azure OpenAI connection settings from the environment.
api_key = os.getenv("UOC_API_KEY")
api_base = os.getenv("UOC_ENDPOINT")
deployment = os.getenv("UOC_MODEL_NAME")
api_version = os.getenv("UOC_API_VERSION")

llm = AzureChatOpenAI(
    openai_api_key=api_key,
    azure_endpoint=api_base,
    deployment_name=deployment,
    api_version=api_version,
    # temperature=0
)


#### Loading Sample Customer Data (Chinook Database)

Our customer support agent will interact with a database to retrieve information. We'll use the [Chinook database](https://www.sqlitetutorial.net/sqlite-sample-database/), a widely used sample database with tables for customer information, purchase history, and a music catalog (artists, albums, tracks, genres). This gives our agent a complete dataset to query.

We'll load this SQLite database into memory for fast access while the notebook runs. This avoids persistent file storage and simplifies setup.


In [None]:
import sqlite3 # Standard Python library for SQLite database interaction
import requests # Library for making HTTP requests (to download the SQL script)
from langchain_community.utilities.sql_database import SQLDatabase # LangChain utility to interact with SQL databases
from sqlalchemy import create_engine # SQLAlchemy function to create a database engine
from sqlalchemy.pool import StaticPool # SQLAlchemy connection pool class for in-memory databases

def get_engine_for_chinook_db():
    """Pull sql file, populate in-memory database, and create engine."""
    # URL to the raw SQL script for the Chinook database
    url = "https://raw.githubusercontent.com/lerocha/chinook-database/master/ChinookDatabase/DataSources/Chinook_Sqlite.sql"
    
    # Fetch the SQL script content from the URL and fail early on HTTP errors
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    sql_script = response.text

    # Create an in-memory SQLite database connection.
    # `check_same_thread=False` is important for SQLAlchemy's StaticPool.
    connection = sqlite3.connect(":memory:", check_same_thread=False)
    
    # Execute the SQL script to populate the in-memory database with Chinook data
    connection.executescript(sql_script)
    
    # Create a SQLAlchemy engine for the in-memory SQLite database.
    # `creator=lambda: connection` tells SQLAlchemy how to get a new connection.
    # `poolclass=StaticPool` is used for in-memory databases, ensuring the same connection is reused.
    # `connect_args` are passed directly to the `sqlite3.connect` function.
    return create_engine(
        "sqlite://",
        creator=lambda: connection,
        poolclass=StaticPool,
        connect_args={"check_same_thread": False},
    )

# Get the SQLAlchemy engine for our Chinook database
engine = get_engine_for_chinook_db()

# Create a LangChain SQLDatabase utility instance from the engine.
# This utility will help our agents interact with the database via SQL queries.
db = SQLDatabase(engine)

#### Setting Up Short-Term and Long-Term Memory

Memory is a crucial component of intelligent agents. LangGraph provides mechanisms for managing both short-term and long-term memory in our workflows.

* **Short-Term Memory (Checkpointer)**: Preserves the state of a specific conversation thread. It lets the agent keep context and pick up where it left off within *a single conversation*. If a user's query takes several turns or requires human intervention, the checkpointer ensures that the graph state is saved and can be resumed.

  * We use `MemorySaver` for in-memory short-term memory, which is suitable for demonstration purposes. In production, you would typically use a persistent checkpointer (for example, one backed by a SQL database or Redis).

* **Long-Term Memory (InMemoryStore)**: Stores and retrieves information *across conversations* or across different sessions of the same user. In our scenario, we'll use it to save user preferences and enable personalization. For example, if a user mentions their favorite music genre, that information can be stored and used in future interactions.

  * We use `InMemoryStore` for in-memory long-term storage. As with checkpointers, a production environment would use persistent storage (for example, a vector database or a key-value store).
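
To make the difference between the two kinds of memory concrete, here is a minimal, library-free sketch. It does not use LangGraph at all: two plain dictionaries stand in for the checkpointer and the store, and the names `thread_state`, `user_store`, and `record_turn` are hypothetical helpers invented for this illustration.

```python
from collections import defaultdict

# Hypothetical stand-ins: plain dicts instead of MemorySaver / InMemoryStore.
thread_state = defaultdict(list)  # short-term: keyed by thread_id (one conversation)
user_store = defaultdict(dict)    # long-term: keyed by user_id (spans conversations)

def record_turn(thread_id, user_id, text):
    """Append a turn to its thread and remember a stated favorite genre."""
    thread_state[thread_id].append(text)
    prefix = "my favorite genre is "
    if text.lower().startswith(prefix):
        user_store[user_id]["favorite_genre"] = text[len(prefix):].strip()

record_turn("thread-1", "user-42", "My favorite genre is jazz")
record_turn("thread-1", "user-42", "Any recommendations?")
record_turn("thread-2", "user-42", "Hello again")

print(len(thread_state["thread-1"]))  # turns kept for the first conversation
print(thread_state["thread-2"])       # the second thread starts fresh
print(user_store["user-42"])          # the preference survives across threads
```

The point of the sketch is the choice of key: conversation history is looked up by thread, while preferences are looked up by user, so a new thread starts empty but can still be personalized.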


In [None]:
from langgraph.checkpoint.memory import MemorySaver # For short-term memory (thread-level state persistence)
from langgraph.store.memory import InMemoryStore # For long-term memory (storing user preferences)

# Initializing `InMemoryStore` for long-term memory. 
# This store will hold user-specific data like music preferences across sessions.
in_memory_store = InMemoryStore()

# Initializing `MemorySaver` for short-term (thread-level) memory. 
# This checkpointer saves the graph's state after each step, allowing for restarts or interruptions within a thread.
checkpointer = MemorySaver()

## Part 1: Building ReAct Sub-Agents

Our multi-agent system will be composed of specialized sub-agents. We'll start by building two foundational ReAct agents: one from scratch, to understand its core components, and another using LangGraph's pre-built utilities for faster development.

### 1.1 Building a ReAct Agent From Scratch: The Music Catalog Sub-Agent

Our first sub-agent handles customer queries about the music store's catalog. It uses a set of tools to retrieve information about artists, albums, songs, and genres from the Chinook database.

The ReAct (Reasoning and Acting) framework is a popular pattern for building agents that alternate reasoning steps with action steps (tool use). The agent **reasons** about which tool to use, **acts** by calling that tool, and then **observes** the result in order to **reason** again.

![Music Sub-Agent Architecture](images/music_subagent.png)
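
Before wiring this up in LangGraph, it can help to see the reason, act, observe cycle as an ordinary loop. The sketch below is a toy under stated assumptions: `fake_model` is a scripted stand-in for the LLM, `lookup_albums` is a hypothetical tool with hard-coded placeholder data, and messages are plain dictionaries rather than LangChain message objects.

```python
def fake_model(messages):
    """Scripted stand-in for an LLM: request a tool first, then answer."""
    last = messages[-1]
    if last["role"] == "user":
        return {"role": "assistant", "tool": "lookup_albums", "arg": "Example Band"}
    return {"role": "assistant", "content": f"I found: {last['content']}"}

def lookup_albums(artist):
    """Hypothetical tool backed by placeholder data instead of a database."""
    catalog = {"Example Band": ["First Album", "Second Album"]}
    return ", ".join(catalog.get(artist, []))

TOOLS = {"lookup_albums": lookup_albums}

def react(question, max_steps=5):
    """Alternate reasoning and acting until the model gives a final answer."""
    messages = [{"role": "user", "content": question}]
    for _ in range(max_steps):
        reply = fake_model(messages)                          # reason
        messages.append(reply)
        if "tool" not in reply:                               # final answer: stop
            return messages
        result = TOOLS[reply["tool"]](reply["arg"])           # act
        messages.append({"role": "tool", "content": result})  # observe
    return messages

transcript = react("Which Example Band albums do you have?")
for message in transcript:
    print(message)
```

The `max_steps` bound plays a role loosely similar to a recursion limit: it keeps a model that never stops requesting tools from looping forever.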

#### State

In LangGraph, **State** is a fundamental concept. It acts as the agent's shared memory: a data structure that is passed between the nodes of the graph. Each node receives the current state, runs its logic, and returns updates to the state, which then become the input to the next node. This continuous flow of information through the state lets the graph maintain context and accumulate information as it runs.

For our customer support agent, the state tracks the following key elements:

1. `customer_id`: A string holding the ID of the customer interacting with the agent. This is crucial for personalized queries (for example, checking invoice history).
2. `messages`: An annotated list of `AnyMessage` objects. This is the conversation history, including user inputs, agent responses, and tool results. The `add_messages` reducer appends new messages to the list instead of overwriting it, preserving the conversational flow.
3. `loaded_memory`: A string holding any user preferences or relevant information loaded from long-term memory. This lets the agent personalize its responses based on past interactions.
4. `remaining_steps`: A `RemainingSteps` object. This value is managed by LangGraph and tracks the number of steps left before the recursion limit is reached, which prevents infinite loops in cyclic graphs.
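
The difference between an annotated key such as `messages` and a plain key such as `customer_id` is easier to see in a small, library-free sketch. The code below is not LangGraph's implementation: `ToyState` and `apply_update` are hypothetical names, and `operator.add` stands in for `add_messages` (the real reducer does more, for example matching messages by ID).

```python
import operator
from typing import Annotated, TypedDict, get_type_hints

class ToyState(TypedDict):
    customer_id: str                         # no reducer: updates overwrite
    messages: Annotated[list, operator.add]  # reducer: updates are concatenated

def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update into the state, key by key."""
    hints = get_type_hints(ToyState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata and key in merged:
            merged[key] = metadata[0](merged[key], value)  # combine via reducer
        else:
            merged[key] = value                            # plain overwrite
    return merged

state = {"customer_id": "1", "messages": ["hi"]}
state = apply_update(state, {"messages": ["hello, how can I help?"]})
state = apply_update(state, {"customer_id": "2"})
print(state)
```

This is why a node can return only `{"messages": [response]}`: the update is merged into the existing state rather than replacing it.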


In [None]:
from typing_extensions import TypedDict # For defining dictionaries with type hints
from typing import Annotated # For adding annotations to type hints
from langgraph.graph.message import AnyMessage, add_messages # For managing messages in the graph state
from langgraph.managed.is_last_step import RemainingSteps # For tracking recursion limits

class State(TypedDict):
    """Represents the state of our LangGraph agent."""
    # customer_id: Stores the unique identifier for the current customer.
    customer_id: str
    
    # messages: A list of messages that form the conversation history.
    # Annotated with `add_messages` to ensure new messages are appended rather than overwritten.
    messages: Annotated[list[AnyMessage], add_messages]
    
    # loaded_memory: Stores information loaded from the long-term memory store, 
    # typically user preferences or historical context.
    loaded_memory: str
    
    # remaining_steps: Used by LangGraph to track the number of allowed steps 
    # to prevent infinite loops in cyclic graphs.
    remaining_steps: RemainingSteps 

#### Tools

Tools are external capabilities that a large language model (LLM) can invoke to extend what it can do beyond plain text generation. They can be APIs, database queries, or any arbitrary Python function. For our music catalog sub-agent, we'll define a set of tools that query the Chinook database for music-related information.

We use LangChain's `@tool` decorator to expose Python functions as tools that our LLM can call. The decorator automatically generates a schema the model can understand, which lets it decide when and how to call the tool.
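
One caveat about the tools in this notebook: they build SQL by interpolating the model-supplied argument into an f-string, which is fine for a demo but leaves the query open to SQL injection. As a minimal sketch of the alternative, the example below uses only the standard-library `sqlite3` module and a tiny hypothetical `Artist` table (not the full Chinook database) to show a bound parameter in a `LIKE` query; `find_artists` is an illustrative name, not part of the notebook.

```python
import sqlite3

# Tiny stand-in table for illustration; not the real Chinook data.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE Artist (ArtistId INTEGER PRIMARY KEY, Name TEXT);
    INSERT INTO Artist (Name) VALUES ('AC/DC'), ('Guns N'' Roses');
""")

def find_artists(name_fragment: str) -> list[str]:
    """Look up artists with a bound parameter instead of string interpolation."""
    rows = conn.execute(
        "SELECT Name FROM Artist WHERE Name LIKE ? ORDER BY Name",
        (f"%{name_fragment}%",),
    ).fetchall()
    return [row[0] for row in rows]

print(find_artists("AC"))           # normal lookup
print(find_artists("N' Roses"))     # a quote in the input is treated as data
print(find_artists("' OR '1'='1"))  # an injection attempt matches nothing
```

Because the search text travels separately from the SQL string, quotes in the input cannot change the structure of the query.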


In [None]:
from langchain_core.tools import tool # Decorator to define a function as a LangChain tool
import ast # Module to safely evaluate strings containing Python literal structures

@tool
def get_albums_by_artist(artist: str):
    """Get albums by an artist."""
    # Execute a SQL query to retrieve album titles and artist names
    # from the Album and Artist tables, joining them and filtering by artist name.
    # `db.run` is a utility from LangChain's SQLDatabase to execute queries.
    # `include_columns=True` ensures column names are included in the result for better readability.
    return db.run(
        f"""
        SELECT Album.Title, Artist.Name 
        FROM Album 
        JOIN Artist ON Album.ArtistId = Artist.ArtistId 
        WHERE Artist.Name LIKE '%{artist}%';
        """,
        include_columns=True
    )

@tool
def get_tracks_by_artist(artist: str):
    """Get songs by an artist (or similar artists)."""
    # Execute a SQL query to find tracks (songs) by a given artist, or similar artists.
    # It joins Album, Artist, and Track tables to get song names and artist names.
    return db.run(
        f"""
        SELECT Track.Name as SongName, Artist.Name as ArtistName 
        FROM Album 
        LEFT JOIN Artist ON Album.ArtistId = Artist.ArtistId 
        LEFT JOIN Track ON Track.AlbumId = Album.AlbumId 
        WHERE Artist.Name LIKE '%{artist}%';
        """,
        include_columns=True
    )

@tool
def get_songs_by_genre(genre: str):
    """
    Fetch songs from the database that match a specific genre.
    
    Args:
        genre (str): The genre of the songs to fetch.
    
    Returns:
        list[dict]: A list of songs that match the specified genre.
    """
    # First, find the GenreId for the given genre name.
    genre_id_query = f"SELECT GenreId FROM Genre WHERE Name LIKE '%{genre}%'"
    genre_ids = db.run(genre_id_query)
    
    # If no genre IDs are found, return an informative message.
    if not genre_ids:
        return f"No songs found for the genre: {genre}"
    
    # Safely evaluate the string result from db.run to get a list of tuples.
    genre_ids = ast.literal_eval(genre_ids)
    # Extract just the GenreId values and join them into a comma-separated string for the IN clause.
    genre_id_list = ", ".join(str(gid[0]) for gid in genre_ids)

    # Construct the query to get songs for the found genre IDs.
    # It joins Track, Album, and Artist tables and limits the results to 8.
    songs_query = f"""
        SELECT Track.Name as SongName, Artist.Name as ArtistName
        FROM Track
        LEFT JOIN Album ON Track.AlbumId = Album.AlbumId
        LEFT JOIN Artist ON Album.ArtistId = Artist.ArtistId
        WHERE Track.GenreId IN ({genre_id_list})
        GROUP BY Artist.Name
        LIMIT 8;
    """
    songs = db.run(songs_query, include_columns=True)
    
    # If no songs are found for the genre, return an informative message.
    if not songs:
        return f"No songs found for the genre: {genre}"
        
    # Safely evaluate the string result and format it into a list of dictionaries.
    formatted_songs = ast.literal_eval(songs)
    return [
        {"Song": song["SongName"], "Artist": song["ArtistName"]}
        for song in formatted_songs
    ]

@tool
def check_for_songs(song_title: str):
    """Check if a song exists by its name."""
    # Execute a SQL query to check for the existence of a song by its title.
    return db.run(
        f"""
        SELECT * FROM Track WHERE Name LIKE '%{song_title}%';
        """,
        include_columns=True
    )

# Aggregate all music-related tools into a list.
music_tools = [get_albums_by_artist, get_tracks_by_artist, get_songs_by_genre, check_for_songs]

# Bind the tools to our chat model (`llm`).
# This step configures the LLM so it knows about the available tools and their schemas,
# allowing it to generate tool calls when appropriate based on the user's query.
llm_with_music_tools = llm.bind_tools(music_tools)

#### Nodes

In LangGraph, **Nodes** are the fundamental building blocks of your graph. They are essentially Python (or JS/TS) functions that take the graph's `State` as input, run some logic (for example, invoking an LLM, calling a tool, or updating data), and return updates to the `State`.

For our ReAct agent, we'll define two main types of nodes:

1. **`music_assistant` (Reasoning Node)**: This node calls the LLM and is responsible for **reasoning**. It takes the current conversation history and the user's query, considers the available tools, and decides on the next best action. That action can be a tool call or, if the query is already resolved, a final response.

2. **`music_tool_node` (Action Node)**: This node handles the **action**. When `music_assistant` decides to use a tool, `music_tool_node` receives the tool call, runs the corresponding function, and returns the tool's output to the graph state. LangGraph provides a utility called `ToolNode` that executes tool calls automatically.


In [None]:
from langgraph.prebuilt import ToolNode # Pre-built node for executing tools

# Create a ToolNode instance. This node will automatically execute any tool calls 
# generated by an LLM that is bound to these tools.
music_tool_node = ToolNode(music_tools)

In [None]:
from langchain_core.messages import ToolMessage, SystemMessage, HumanMessage # Message types for conversation history
from langchain_core.runnables import RunnableConfig # For configuration parameters passed to runnables

# Define the system prompt for the music assistant.
# This prompt provides instructions and persona for the LLM.
# It emphasizes the agent's role, core responsibilities, and search guidelines.
# The `memory` placeholder allows us to inject user preferences from long-term memory.
def generate_music_assistant_prompt(memory: str = "None") -> str:
    return f"""
    You are a member of the assistant team, and your role is specifically to focus on helping customers discover and learn about music in our digital catalog.
    If you are unable to find playlists, songs, or albums associated with an artist, it is okay. 
    Just inform the customer that the catalog does not have any playlists, songs, or albums associated with that artist.
    You also have context on any saved user preferences, helping you to tailor your response. 
    
    CORE RESPONSIBILITIES:
    - Search and provide accurate information about songs, albums, artists, and playlists
    - Offer relevant recommendations based on customer interests
    - Handle music-related queries with attention to detail
    - Help customers discover new music they might enjoy
    - You are routed only when there are questions related to music catalog; ignore other questions. 
    
    SEARCH GUIDELINES:
    1. Always perform thorough searches before concluding something is unavailable
    2. If exact matches aren't found, try:
       - Checking for alternative spellings
       - Looking for similar artist names
       - Searching by partial matches
       - Checking different versions/remixes
    3. When providing song lists:
       - Include the artist name with each song
       - Mention the album when relevant
       - Note if it's part of any playlists
       - Indicate if there are multiple versions
    
    Additional context is provided below: 

    Prior saved user preferences: {memory}
    
    Message history is also attached.  
    """

# Define the music_assistant node function.
# This function receives the current `State` and `RunnableConfig`.
def music_assistant(state: State, config: RunnableConfig): 

    # Fetch long-term memory (user preferences) from the state.
    # If `loaded_memory` is not present in the state, default to "None".
    memory = "None" 
    if "loaded_memory" in state: 
        memory = state["loaded_memory"]

    # Generate the system prompt for the music assistant, injecting the loaded memory.
    music_assistant_prompt = generate_music_assistant_prompt(memory)

    # Invoke the LLM (`llm_with_music_tools`) with the system prompt and the current message history.
    # The LLM will decide whether to call a tool or generate a final response.
    response = llm_with_music_tools.invoke([SystemMessage(music_assistant_prompt)] + state["messages"])
    
    # Update the state by appending the LLM's response to the `messages` list.
    # The `add_messages` annotation in `State` ensures this is appended correctly.
    return {"messages": [response]}

#### Edges

**Edges** are the connections between nodes in a LangGraph graph. They define the flow and order of execution within your application.

* **Normal Edges**: These are deterministic: they always lead from one specific node to another. For example, `graph.add_edge("node_A", "node_B")` means that `node_B` always runs once `node_A` finishes.

* **Conditional Edges**: These enable dynamic routing. Instead of a fixed destination, a conditional edge uses a function (called a "router" or "conditional function") that inspects the current `State` and returns a string identifying the next node to visit, either the node's name or a key in a mapping you supply. This lets the workflow choose its path at runtime.

For our ReAct agent, we need a **conditional edge** after the `music_assistant` node. This edge determines the following:

* If `music_assistant` decided to invoke a tool, we route to `music_tool_node` to execute it.
* If `music_assistant` produced a final response for the user (that is, one with no tool calls), we route to `END`, which finishes the sub-agent's run because the query has been resolved.

The `should_continue` function implements this conditional logic.
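
As a rough mental model of how a router's return value selects the next node, consider the library-free sketch below. It is an illustration only: messages are plain dictionaries, `route` and `next_node` are hypothetical helpers, and the `END` string is a placeholder sentinel defined for this sketch.

```python
END = "__end__"  # placeholder sentinel for this sketch

def route(state: dict) -> str:
    """Return "continue" when the last message requests a tool, else "end"."""
    last_message = state["messages"][-1]
    return "continue" if last_message.get("tool_calls") else "end"

# Maps the router's labels to node names, as in a conditional-edge mapping.
path_map = {"continue": "music_tool_node", "end": END}

def next_node(state: dict) -> str:
    """Resolve the router's label to the name of the node to run next."""
    return path_map[route(state)]

with_tool = {"messages": [{"content": "", "tool_calls": [{"name": "check_for_songs"}]}]}
final = {"messages": [{"content": "Here are some songs.", "tool_calls": []}]}

print(next_node(with_tool))  # a pending tool call leads to the tool node
print(next_node(final))      # a plain answer leads to the end sentinel
```

Keeping the labels ("continue", "end") separate from node names means nodes can be renamed without touching the router logic.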


In [None]:
# Define a conditional edge function named `should_continue`.
# This function determines the next step in the graph based on the LLM's response.
def should_continue(state: State, config: RunnableConfig):
    # Get the list of messages from the current state.
    messages = state["messages"]
    # Get the last message, which is the response from the `music_assistant` LLM.
    last_message = messages[-1]
    
    # Check if the last message contains any tool calls.
    # LLMs generate `tool_calls` when they decide to use a function.
    if not last_message.tool_calls:
        # If there are no tool calls, it means the LLM has generated a final answer.
        # In this case, the sub-agent's work is done, so we return "end" to signal completion.
        return "end"
    # Otherwise, if there are tool calls,
    else:
        # We need to execute the tool(s). So, we return "continue" to route to the tool execution node.
        return "continue"

#### Compile the Graph!

Now that we've defined our `State` (what data flows), our `Nodes` (what actions run), and our `Edges` (how control flows), we can assemble everything into a complete LangGraph workflow. This process is called **compilation**.

The `StateGraph` class defines the structure of our agent. We add nodes and edges and then *compile* the graph. Compilation turns the graph definition into a runnable object that is ready to be invoked.

Key methods used:

* `StateGraph(State)`: Initializes a graph with our state schema.
* `add_node(name, node_function)`: Adds a node to the graph, associating a name with a Python callable.
* `add_edge(source, destination)`: Creates a direct, unconditional edge from the `source` node to the `destination` node.
* `add_conditional_edges(source, conditional_function, mapping)`: Creates a dynamic edge. The `conditional_function` is called to determine the next node; its return value must match a key in the `mapping` dictionary.

  * `START`: Special entry point that marks where graph execution begins.
  * `END`: Special exit point that marks where graph execution ends.
* `compile(name, checkpointer, store)`: Finalizes the graph.

  * `name`: Unique identifier for the compiled graph.
  * `checkpointer`: Short-term memory mechanism (`MemorySaver`) for saving and resuming graph state.
  * `store`: Long-term memory mechanism (`InMemoryStore`) for keeping data across sessions.


In [None]:
from langgraph.graph import StateGraph, START, END # Core LangGraph classes and special node names
from utils import show_graph # Utility function to visualize the graph (assumed to be in a utils.py file)

# Initialize a StateGraph with our defined `State` schema.
# This tells LangGraph how the data will flow and be managed within the graph.
music_workflow = StateGraph(State)

# Add the 'music_assistant' node to the graph.
# This node is responsible for the LLM's reasoning and generating tool calls or final responses.
music_workflow.add_node("music_assistant", music_assistant)

# Add the 'music_tool_node' to the graph.
# This node is responsible for executing the tools when requested by the LLM.
music_workflow.add_node("music_tool_node", music_tool_node)


# Define the starting point of the graph.
# All queries will initially enter the 'music_assistant' node.
music_workflow.add_edge(START, "music_assistant")

# Add a conditional edge from 'music_assistant'.
# The `should_continue` function will be called to determine the next node.
music_workflow.add_conditional_edges(
    "music_assistant", # Source node
    should_continue,   # Conditional function to call
    {
        # If `should_continue` returns "continue", route to `music_tool_node`.
        "continue": "music_tool_node",
        # If `should_continue` returns "end", terminate the graph execution.
        "end": END,
    },
)

# Add a normal edge from 'music_tool_node' back to 'music_assistant'.
# After a tool is executed, the result is fed back to the LLM for further reasoning 
# or to formulate a final response (ReAct loop).
music_workflow.add_edge("music_tool_node", "music_assistant")

# Compile the graph into a runnable object.
# `name`: A unique identifier for this compiled graph (useful for debugging and logging).
# `checkpointer`: The short-term memory mechanism (MemorySaver) for thread-specific state.
# `store`: The long-term memory mechanism (InMemoryStore) for persistent user data.
music_catalog_subagent = music_workflow.compile(name="music_catalog_subagent", checkpointer=checkpointer, store=in_memory_store)

# Display a visualization of the compiled graph.
show_graph(music_catalog_subagent)

#### Testing the Music Catalog Sub-Agent

Now that we've compiled our first ReAct sub-agent, let's test it. We'll simulate a customer query and watch how the agent processes it, possibly calling tools before giving a response.

Key elements of the test:

* `uuid.uuid4()`: Generates a unique conversation ID for each session. This is crucial for the *checkpointer* to keep separate state for separate conversations.

  * In a real application, this `thread_id` would normally correspond to a user session or a unique conversation identifier.

* `config`: A dictionary passed to the `invoke` method that holds configurable parameters such as the `thread_id`. The *checkpointer* uses this ID to load and save the correct graph state.

* `invoke()`: Starts execution of the LangGraph graph. It takes the initial input (the `messages` key defined in `State`) and the configuration (`config`).

* `pretty_print()`: A method on LangChain message objects that prints each message in a readable format, clearly showing its role (user, agent, tool) and content.

This test lets us verify that the sub-agent can handle real queries about the music catalog, use tools when needed, and produce useful, coherent responses.


In [None]:
import uuid # Module for generating unique identifiers

# Generate a unique thread ID for this conversation.
# This ensures that the conversation state is isolated and can be resumed later.
thread_id = uuid.uuid4()

# Define the customer's question.
question = "I like the Rolling Stones. What songs do you recommend by them or by other artists that I might like?"

# Create the configuration dictionary for invoking the graph.
# The `thread_id` is essential for the checkpointer to manage state.
config = {"configurable": {"thread_id": thread_id}}

# Invoke the `music_catalog_subagent` with the initial human message and configuration.
# The `invoke` method runs the graph to completion and returns the final state.
result = music_catalog_subagent.invoke({"messages": [HumanMessage(content=question)]}, config=config)

# Iterate through the messages in the final state and print them for observation.
# `pretty_print()` provides a formatted output of the message content and role.
for message in result["messages"]:
    message.pretty_print()

### 1.2. Building a ReAct Agent with LangGraph's Pre-built Functionality: The Invoice Information Subagent

Building agents from scratch gives a deep understanding of how they work internally, but LangGraph also offers **pre-built libraries** for common architectures. These ready-made components let you quickly build agents that follow standard patterns such as ReAct, removing much of the boilerplate.

You can find the full list of pre-built libraries here:
👉 [LangGraph Pre-built Libraries](https://langchain-ai.github.io/langgraph/prebuilt/#available-libraries)

In this section, we create our second subagent, the **Invoice Information Subagent**, using the pre-built `create_react_agent` utility. This agent handles all customer queries about invoices and past purchases.

![Invoice Subagent Architecture](images/invoice_subagent.png)


#### Defining the Tools and Prompt for the Invoice Subagent

Like our music subagent, the invoice subagent needs its own set of specialized tools and a custom prompt. These tools query the Chinook database to retrieve specific invoice information.

Each tool is designed to answer one kind of question a customer might have about their invoices. The prompt guides the LLM to understand its role and use these tools effectively.


In [None]:
from langchain_core.tools import tool # Import the tool decorator again

@tool 
def get_invoices_by_customer_sorted_by_date(customer_id: str) -> list[dict]:
    """
    Look up all invoices for a customer using their ID.
    The invoices are sorted in descending order by invoice date, which helps when the customer wants to view their most recent/oldest invoice, or if 
    they want to view invoices within a specific date range.
    
    Args:
        customer_id (str): customer_id, which serves as the identifier.
    
    Returns:
        list[dict]: A list of invoices for the customer.
    """
    # Executes a SQL query to retrieve all invoice details for a given customer ID,
    # ordered by invoice date in descending order (most recent first).
    return db.run(f"SELECT * FROM Invoice WHERE CustomerId = {customer_id} ORDER BY InvoiceDate DESC;")


@tool 
def get_invoices_sorted_by_unit_price(customer_id: str) -> list[dict]:
    """
    Use this tool when the customer wants to know the details of one of their invoices based on the unit price/cost of the invoice.
    This tool looks up all invoices for a customer, and sorts the unit price from highest to lowest. In order to find the invoice associated with the customer, 
    we need to know the customer ID.
    
    Args:
        customer_id (str): customer_id, which serves as the identifier.
    
    Returns:
        list[dict]: A list of invoices sorted by unit price.
    """
    # Executes a SQL query to retrieve invoice details along with the unit price of items in those invoices,
    # for a given customer ID, ordered by unit price in descending order (highest unit price first).
    query = f"""
        SELECT Invoice.*, InvoiceLine.UnitPrice
        FROM Invoice
        JOIN InvoiceLine ON Invoice.InvoiceId = InvoiceLine.InvoiceId
        WHERE Invoice.CustomerId = {customer_id}
        ORDER BY InvoiceLine.UnitPrice DESC;
    """
    return db.run(query)


@tool
def get_employee_by_invoice_and_customer(invoice_id: str, customer_id: str) -> dict:
    """
    This tool will take in an invoice ID and a customer ID and return the employee information associated with the invoice.

    Args:
        invoice_id (str): The ID of the specific invoice.
        customer_id (str): customer_id, which serves as the identifier.

    Returns:
        dict: Information about the employee associated with the invoice.
    """

    # Executes a SQL query to find the employee associated with a specific invoice and customer.
    # It joins Employee, Customer, and Invoice tables to retrieve employee first name, title, and email.
    query = f"""
        SELECT Employee.FirstName, Employee.Title, Employee.Email
        FROM Employee
        JOIN Customer ON Customer.SupportRepId = Employee.EmployeeId
        JOIN Invoice ON Invoice.CustomerId = Customer.CustomerId
        WHERE Invoice.InvoiceId = ({invoice_id}) AND Invoice.CustomerId = ({customer_id});
    """
    
    employee_info = db.run(query, include_columns=True)
    
    # Checks if any employee information was found.
    if not employee_info:
        return f"No employee found for invoice ID {invoice_id} and customer identifier {customer_id}."
    return employee_info

# Aggregate all invoice-related tools into a list.
invoice_tools = [get_invoices_by_customer_sorted_by_date, get_invoices_sorted_by_unit_price, get_employee_by_invoice_and_customer]
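
The tools above build their SQL by interpolating `customer_id` into an f-string, so whatever text the LLM passes becomes part of the query. The sketch below shows one possible alternative, a bound parameter, using only the standard-library `sqlite3` module. The in-memory table, its trimmed column set, the sample rows, and the `invoices_by_customer` helper are all assumptions made for illustration; they are not the notebook's `db` object or the full Chinook schema.

```python
import sqlite3

# Hypothetical, trimmed-down Invoice table with made-up rows (not the real Chinook data).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Invoice (InvoiceId INTEGER PRIMARY KEY, CustomerId INTEGER, "
    "InvoiceDate TEXT, Total REAL)"
)
conn.executemany(
    "INSERT INTO Invoice (InvoiceId, CustomerId, InvoiceDate, Total) VALUES (?, ?, ?, ?)",
    [
        (1, 1, "2024-01-15", 3.98),
        (2, 1, "2025-03-02", 8.91),
        (3, 2, "2025-02-10", 1.98),
    ],
)


def invoices_by_customer(connection: sqlite3.Connection, customer_id: str) -> list[dict]:
    """Return a customer's invoices, most recent first, using a bound parameter."""
    cursor = connection.execute(
        "SELECT InvoiceId, InvoiceDate, Total FROM Invoice "
        "WHERE CustomerId = ? ORDER BY InvoiceDate DESC",
        (customer_id,),  # the driver binds the value; it is never spliced into the SQL text
    )
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


recent_first = invoices_by_customer(conn, "1")
# A value that would widen the WHERE clause under string interpolation matches nothing here.
injection_attempt = invoices_by_customer(conn, "1 OR 1=1")
```

Because the value travels separately from the SQL text, a string such as `"1 OR 1=1"` is compared as data instead of being executed as part of the query.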


In [None]:
# Define the system prompt for the invoice information sub-agent.
# This prompt sets the persona and core responsibilities for the LLM within this sub-agent's domain.
# It explicitly lists the tools available to this agent and provides guidelines for their use.
invoice_subagent_prompt = """
    You are a subagent among a team of assistants. You are specialized for retrieving and processing invoice information. You are routed only the invoice-related portions of questions, so respond only to those.

    You have access to three tools. These tools enable you to retrieve and process invoice information from the database. Here are the tools:
    - get_invoices_by_customer_sorted_by_date: This tool retrieves all invoices for a customer, sorted by invoice date.
    - get_invoices_sorted_by_unit_price: This tool retrieves all invoices for a customer, sorted by unit price.
    - get_employee_by_invoice_and_customer: This tool retrieves the employee information associated with an invoice and a customer.
    
    If you are unable to retrieve the invoice information, inform the customer you are unable to retrieve the information, and ask if they would like to search for something else.
    
    CORE RESPONSIBILITIES:
    - Retrieve and process invoice information from the database
    - Provide detailed information about invoices, including customer details, invoice dates, total amounts, employees associated with the invoice, etc. when the customer asks for it.
    - Always maintain a professional, friendly, and patient demeanor
    
    You may have additional context that you should use to help answer the customer's query. It will be provided to you below:
    """

#### Using the Pre-built Library: `create_react_agent`

Now let's assemble our invoice subagent with LangGraph's `create_react_agent` function. This utility abstracts away the node and edge definitions of a standard ReAct loop, so we can create a runnable agent quickly.

The `create_react_agent` function takes care of:

* Binding the provided `tools` to the `model`.
* Setting up the LLM (`model`) as the reasoning node.
* Setting up a `ToolNode` to execute the tools.
* Defining the conditional logic (edges) that alternates between the LLM and the tools until a final answer is produced or a recursion limit is reached.

We pass it:

* `model`: The LLM to use (our `ChatOpenAI` instance).
* `tools`: The list of functions available to the agent.
* `name`: A unique name for this agent (useful for identifying it in a multi-agent system).
* `prompt`: The system prompt that guides the LLM's behavior.
* `state_schema`: The `State` class we defined, which keeps the agents consistent with one another.
* `checkpointer` and `store`: Our memory mechanisms for conversation-level state and long-term user data.
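
To make the loop described above concrete, here is a minimal plain-Python sketch of the alternation between a reasoning step and a tool step. It is not LangGraph's implementation: `fake_model`, `lookup_total`, `run_react_loop`, the message dictionaries, and the hard-coded invoice total are all hypothetical stand-ins chosen so the example runs without an LLM.

```python
from typing import Callable


def fake_model(messages: list[dict]) -> dict:
    """Stand-in for the LLM: request a tool once, then answer from its result."""
    tool_results = [m for m in messages if m["role"] == "tool"]
    if not tool_results:
        return {
            "role": "assistant",
            "content": "",
            "tool_call": {"name": "lookup_total", "args": {"invoice_id": 7}},
        }
    answer = f"Your invoice total is {tool_results[-1]['content']}."
    return {"role": "assistant", "content": answer, "tool_call": None}


def lookup_total(invoice_id: int) -> str:
    """Hypothetical tool; returns a hard-coded value instead of querying a database."""
    return "8.91"


TOOLS: dict[str, Callable[..., str]] = {"lookup_total": lookup_total}


def run_react_loop(model, tools, messages, recursion_limit: int = 10) -> list[dict]:
    """Alternate between the model and the tools until the model stops calling tools."""
    messages = list(messages)
    for _ in range(recursion_limit):
        reply = model(messages)              # reasoning step
        messages.append(reply)
        call = reply.get("tool_call")
        if call is None:                     # no tool requested: this is the final answer
            return messages
        result = tools[call["name"]](**call["args"])   # acting step
        messages.append({"role": "tool", "content": result})
    raise RuntimeError("Recursion limit reached without a final answer")


history = run_react_loop(fake_model, TOOLS, [{"role": "user", "content": "How much was invoice 7?"}])
```

The conditional edge in the real graph plays the role of the `if call is None` check: it decides whether to hand control to the tool node or to finish.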


In [None]:
from langgraph.prebuilt import create_react_agent # Import the pre-built ReAct agent creator

# Define the invoice information subagent using the pre-built `create_react_agent`.
# This function internally sets up the nodes (LLM and ToolNode) and edges for a ReAct loop.
invoice_information_subagent = create_react_agent(
    llm,                          # The language model to use for reasoning
    tools=invoice_tools,            # The list of tools available to this agent
    name="invoice_information_subagent", # A unique name for this agent within the graph
    prompt=invoice_subagent_prompt, # The system prompt for this agent's persona and instructions
    state_schema=State,             # The shared state schema for the graph
    checkpointer=checkpointer,      # The checkpointer for short-term (thread-level) memory
    store=in_memory_store           # The in-memory store for long-term user data
)

#### Testing the Invoice Information Sub-Agent!

Let's test our newly created invoice sub-agent to ensure it correctly processes queries related to invoices and utilizes its specific tools.

In [None]:
thread_id = uuid.uuid4() # Generate a new unique thread ID for this test conversation.

# Define a sample question for the invoice sub-agent.
question = "My customer id is 1. What was my most recent invoice, and who was the employee that helped me with it?"

# Set up the configuration with the thread ID.
config = {"configurable": {"thread_id": thread_id}}

# Invoke the invoice sub-agent with the question and configuration.
result = invoice_information_subagent.invoke({"messages": [HumanMessage(content=question)]}, config=config)

# Print the conversation history from the result for verification.
for message in result["messages"]:
    message.pretty_print()

## Part 2: Building a Multi-Agent Architecture (Supervisor)

Now that we have two specialized subagents, one for music catalog queries and one for invoice information, the next logical step is to make sure each customer query reaches the right agent. This is where a **supervisor agent** comes in.

The supervisor acts as a central coordinator: it oversees the workflow and invokes the most relevant subagent for the customer's query. No subagent then needs to understand every possible type of query, so each can focus on its specialty.

### The Supervisor's Role

![Supervisor Architecture](images/supervisor.png)

The supervisor's main responsibilities are:

* **Intent Recognition**: It analyzes the customer's query to determine its primary intent (for example, music, invoices, or another category) and which subagent covers it:

  * `music_catalog_information_subagent`: Handles queries about songs, albums, artists, genres, and music preferences.
  * `invoice_information_subagent`: Handles queries about past purchases, invoices, and billing details.
* **Routing**: It sends the query to the appropriate specialized subagent based on the recognized intent.
* **Orchestration (Implicit)**: Although the supervisor performs *routing* explicitly, it orchestrates implicitly by making sure the right agent is active at the right time. In multi-turn conversations, it can re-route or let the current subagent continue.

LangGraph provides a pre-built utility called `create_supervisor` (imported from the `langgraph_supervisor` package) that simplifies building this routing mechanism. It sets up an LLM-based router that uses the subagents' descriptions and capabilities to make routing decisions.
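
The routing idea can be sketched without an LLM. In the example below, a crude keyword check stands in for the supervisor's intent recognition, and two placeholder functions stand in for the subagents. Every name here (`classify_intents`, `SUBAGENTS`, `supervise`, the keyword lists) is hypothetical; the real supervisor decides with an LLM, not with keywords.

```python
def classify_intents(query: str) -> list[str]:
    """Keyword stand-in for the LLM's routing decision (assumption, for illustration only)."""
    text = query.lower()
    intents = []
    if any(word in text for word in ("invoice", "purchase", "bill")):
        intents.append("invoice")
    if any(word in text for word in ("album", "song", "artist", "track")):
        intents.append("music")
    return intents


def invoice_agent(query: str) -> str:
    """Placeholder for the invoice subagent."""
    return "(invoice subagent answer)"


def music_agent(query: str) -> str:
    """Placeholder for the music catalog subagent."""
    return "(music subagent answer)"


SUBAGENTS = {"invoice": invoice_agent, "music": music_agent}


def supervise(query: str) -> list[tuple[str, str]]:
    """Call one subagent per recognized intent, in order, and collect the answers."""
    return [(intent, SUBAGENTS[intent](query)) for intent in classify_intents(query)]


handled = supervise(
    "My customer ID is 1. How much was my most recent purchase? What albums do you have by U2?"
)
music_only = supervise("What songs do you recommend by the Rolling Stones?")
```

A single query can therefore trigger more than one subagent call, which matches the supervisor prompt's note that an inquiry may need multiple steps.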


First, we write a set of instructions for our supervisor. This prompt defines the supervisor's persona, its role as planner and router, and the capabilities of the subagents it oversees.


In [None]:
supervisor_prompt = """You are an expert customer support assistant for a digital music store. 
You are dedicated to providing exceptional service and ensuring customer queries are answered thoroughly. 
You have a team of subagents that you can use to help answer queries from customers. 
Your primary role is to serve as a supervisor/planner for this multi-agent team that helps answer queries from customers. 

Your team is composed of two subagents that you can use to help answer the customer's request:
1. music_catalog_information_subagent: this subagent has access to user's saved music preferences. It can also retrieve information about the digital music store's music 
catalog (albums, tracks, songs, etc.) from the database. 
2. invoice_information_subagent: this subagent is able to retrieve information about a customer's past purchases or invoices 
from the database. 

Based on the existing steps that have been taken in the messages, your role is to generate the next subagent that needs to be called. 
This could be one step in an inquiry that needs multiple sub-agent calls. """

In [None]:
from langgraph_supervisor import create_supervisor # Import the pre-built supervisor creator

# Create the supervisor workflow using the `create_supervisor` utility.
# This function dynamically sets up the graph to route between the provided agents.
supervisor_prebuilt_workflow = create_supervisor(
    agents=[invoice_information_subagent, music_catalog_subagent], # List of sub-agents the supervisor can route to
    output_mode="last_message", # Specifies that the supervisor should output only the last message from the routed agent.
                                # Alternative is "full_history" to get all messages from the sub-agent.
    model=llm,                # The LLM to act as the supervisor (for routing decisions).
    prompt=(supervisor_prompt), # The system prompt guiding the supervisor's behavior.
    state_schema=State          # The shared state schema for the entire multi-agent graph.
)

# Compile the supervisor workflow into a runnable object.
# This makes it ready for invocation and integrates it with our memory systems.
supervisor_prebuilt = supervisor_prebuilt_workflow.compile(name="supervisor_prebuilt", checkpointer=checkpointer, store=in_memory_store)

# Display a visualization of the compiled supervisor graph.
# Notice how the supervisor acts as the central hub, directing traffic to its sub-agents.
show_graph(supervisor_prebuilt)

#### Testing the Multi-Agent Supervisor

Let's test our newly compiled multi-agent system with the supervisor in place. We provide a query that may involve both music and invoice information, or mainly one domain, to see how the supervisor routes the request.


In [None]:
thread_id = uuid.uuid4() # Generate a fresh thread ID for this conversation.

# Define a question that involves both invoice and music information.
question = "My customer ID is 1. How much was my most recent purchase? What albums do you have by U2?"

# Configure the invocation with the thread ID.
config = {"configurable": {"thread_id": thread_id}}

# Invoke the `supervisor_prebuilt` graph with the human message.
# The supervisor will analyze the question, route it to the appropriate sub-agent(s), 
# and return the final response from the last active agent.
result = supervisor_prebuilt.invoke({"messages": [HumanMessage(content=question)]}, config=config)

# Print the messages from the resulting state to see the conversation flow and final answer.
for message in result["messages"]:
    message.pretty_print()

## Part 3: Adding Customer Verification with Human-in-the-Loop

So far, our agent assumes the `customer_id` is available or supplied directly in the initial query. In a realistic customer support scenario, we often need to **verify the customer's identity first**, before handling sensitive queries such as invoice details. Verification can involve asking the user for an identifier (for example, email, phone number, or customer ID) and then looking it up in the database.

To implement this, we introduce a **human-in-the-loop** component: the graph can *pause* execution and wait for additional input from the user (or from a human agent) before continuing. LangGraph's `interrupt` mechanism is well suited to this.

![Human Input Integration](images/human_input.png)

In this step, we add two new nodes to our workflow:

* **`verify_info` node**: Tries to extract a customer identifier from the user's input and verify it against our database. If it finds a match, it updates `customer_id` in the graph state. If the identifier is missing or not found, it asks the user for the information.
* **`human_input` node**: A simple node that explicitly triggers an `interrupt`, pausing graph execution until the user provides the information needed to continue.

We also use LangChain's structured output capabilities, with Pydantic schemas, to reliably parse identifiers out of the user's input.


In [None]:
from pydantic import BaseModel, Field # Pydantic for defining data schemas and field validation

# Define a Pydantic BaseModel to structure the expected user input for account information.
# This helps the LLM to parse specific entities (identifier) from free-form text.
class UserInput(BaseModel):
    """Schema for parsing user-provided account information."""
    # `identifier` field: Expects a string, with a description for the LLM to understand its purpose.
    identifier: str = Field(description = "Identifier, which can be a customer ID, email, or phone number.")

# Bind the Pydantic schema to our LLM using `with_structured_output`.
# This forces the LLM to generate output that conforms to the `UserInput` schema, making parsing reliable.
structured_llm = llm.with_structured_output(schema=UserInput)

# Define a system prompt specifically for the structured LLM.
# This prompt instructs the LLM on how to extract the customer identifier from messages.
structured_system_prompt = """You are a customer service representative responsible for extracting the customer identifier.
Only extract the customer's account information from the message history. 
If they haven't provided the information yet, return an empty string for the field."""

In [None]:
import ast # For safely parsing the string returned by db.run
from typing import Optional # For type hinting optional values

# Helper function to retrieve a customer ID from various identifiers (ID, phone, email).
def get_customer_id_from_identifier(identifier: str) -> Optional[int]:
    """
    Retrieve Customer ID using an identifier, which can be a customer ID, email, or phone number.
    
    Args:
        identifier (str): The identifier can be customer ID, email, or phone.
    
    Returns:
        Optional[int]: The CustomerId if found, otherwise None.
    """
    # Check if the identifier is purely numeric, indicating a direct customer ID.
    # Note: a numeric ID is returned as-is, without checking that it exists in the database.
    if identifier.isdigit():
        return int(identifier)
    
    # Check if the identifier starts with '+', suggesting a phone number.
    elif identifier[0] == "+":
        query = f"SELECT CustomerId FROM Customer WHERE Phone = '{identifier}';"
        result = db.run(query)
        formatted_result = ast.literal_eval(result) # Safely evaluate string to list/tuple
        if formatted_result:
            return formatted_result[0][0] # Return the first CustomerId found
    
    # Check if the identifier contains '@', suggesting an email address.
    elif "@" in identifier:
        query = f"SELECT CustomerId FROM Customer WHERE Email = '{identifier}';"
        result = db.run(query)
        formatted_result = ast.literal_eval(result)
        if formatted_result:
            return formatted_result[0][0] # Return the first CustomerId found
    
    # If no matching identifier type is found or no ID is retrieved, return None.
    return None 
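
The helper above trusts any numeric string and builds its phone and email queries with f-strings. The following sketch shows one way the same lookup could be written so that all three identifier types are checked against the table through bound parameters. It uses the standard-library `sqlite3` module; the in-memory `Customer` table, its single made-up row, the `example.com` address, and the name `find_customer_id` are assumptions for illustration, not the notebook's `db` object.

```python
import sqlite3
from typing import Optional

# Hypothetical Customer table with one made-up row (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Customer (CustomerId INTEGER PRIMARY KEY, Email TEXT, Phone TEXT)")
conn.execute(
    "INSERT INTO Customer (CustomerId, Email, Phone) VALUES (?, ?, ?)",
    (1, "customer@example.com", "+55 (12) 3923-5555"),
)


def find_customer_id(connection: sqlite3.Connection, identifier: str) -> Optional[int]:
    """Resolve a customer ID, phone number, or email to a CustomerId that exists in the table."""
    identifier = identifier.strip()
    if not identifier:
        return None

    # Pick the column from a fixed set of names; only the value is user-controlled.
    if identifier.isdigit():
        column, value = "CustomerId", int(identifier)
    elif identifier.startswith("+"):
        column, value = "Phone", identifier
    elif "@" in identifier:
        column, value = "Email", identifier
    else:
        return None

    row = connection.execute(
        f"SELECT CustomerId FROM Customer WHERE {column} = ?", (value,)
    ).fetchone()
    return row[0] if row else None


by_id = find_customer_id(conn, "1")
unknown_id = find_customer_id(conn, "999")
by_phone = find_customer_id(conn, "+55 (12) 3923-5555")
by_email = find_customer_id(conn, "customer@example.com")
```

With this shape, an ID that does not exist yields `None` instead of being accepted, so the caller can ask the customer to revise it.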

In [None]:
# Define the `verify_info` node function.
# This node is responsible for verifying the customer's identity based on their input.
def verify_info(state: State, config: RunnableConfig):
    """Verify the customer's account by parsing their input and matching it with the database."""

    # Only run verification when no customer_id is stored in the state yet.
    # If one is already present, the `else` branch at the bottom leaves the state untouched.
    if state.get("customer_id") is None: 
        # System instructions for the verification LLM.
        system_instructions = """You are a music store agent, where you are trying to verify the customer identity 
        as the first step of the customer support process. 
        Only after their account is verified, you would be able to support them on resolving the issue. 
        In order to verify their identity, one of their customer ID, email, or phone number needs to be provided.
        If the customer has not provided the information yet, please ask them for it.
        If they have provided the identifier but cannot be found, please ask them to revise it."""

        # Get the most recent user message from the state.
        user_input = state["messages"][-1] 
    
        # Use the structured LLM to parse the user's input for an identifier.
        # It combines the structured system prompt with the user's message.
        parsed_info = structured_llm.invoke([SystemMessage(content=structured_system_prompt)] + [user_input])
    
        # Extract the identified identifier string.
        identifier = parsed_info.identifier
    
        customer_id = None # No verified customer yet.
        # Attempt to find the customer ID in the database using the helper function.
        if identifier:
            customer_id = get_customer_id_from_identifier(identifier)
    
        # If a valid customer_id was found (the helper returns None when there is no match),
        if customer_id is not None:
            # Create a system message confirming verification.
            intent_message = SystemMessage(
                content= f"Thank you for providing your information! I was able to verify your account with customer id {customer_id}."
            )
            # Update the state with the found customer_id and the confirmation message.
            return {
                  "customer_id": customer_id,
                  "messages" : [intent_message]
                  }
        # If no customer_id was found or provided,
        else:
            # Invoke the base LLM with instructions to prompt the user for their identifier or revise it.
            response = llm.invoke([SystemMessage(content=system_instructions)] + state["messages"])
            # Update the state with the LLM's response (the prompt for user input).
            return {"messages": [response]}

    else: 
        # If `customer_id` is already in state, this node does nothing.
        # This `pass` implies that the graph will simply proceed to the next edge, 
        # as defined in the graph compilation.
        pass

Now let's create our `human_input` node. This node is designed to trigger an `interrupt` in the graph. When an interrupt occurs, the graph pauses and control returns to the caller (for example, the notebook or an application). The caller can then choose to resume the graph, optionally supplying new information.

This is how we implement the human-in-the-loop interaction for customer verification: the graph asks for an identifier, pauses, and waits for the user to provide it.
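
As a loose analogy for this pause-and-resume behavior, a Python generator also hands control back to its caller and continues once a value is sent in. The sketch below is only that analogy: LangGraph's actual mechanism relies on the checkpointer rather than on generators, and `human_input_step` is a hypothetical name.

```python
def human_input_step():
    """Pause by yielding a prompt; finish with whatever the caller sends back."""
    user_reply = yield "Please provide input."
    return {"messages": [user_reply]}


step = human_input_step()
prompt = next(step)  # runs up to the yield, then control returns to the caller

try:
    # The caller "resumes" the step by sending the user's answer back in.
    step.send("My phone number is +55 (12) 3923-5555.")
except StopIteration as finished:
    update = finished.value  # the state update produced after resuming
```

The value yielded corresponds to the message passed to `interrupt`, and the value sent back corresponds to what the caller supplies when resuming.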


In [None]:
from langgraph.types import interrupt # Import the `interrupt` function for pausing graph execution

# Define the `human_input` node function.
# This node serves as a placeholder to signal that human intervention is required.
def human_input(state: State, config: RunnableConfig):
    """ No-op node that should be interrupted on """
    # `interrupt("Please provide input.")` pauses the graph execution.
    # The string message is passed as a reason for the interrupt.
    # When the graph is resumed, the new input will be stored in `user_input`.
    user_input = interrupt("Please provide input.")
    
    # The new user input (after resume) is then added to the messages in the state.
    return {"messages": [user_input]}

Let's put it all together! We integrate the `verify_info` and `human_input` nodes into a new graph structure. The flow is as follows:

1. **`START`** -> **`verify_info`**: Every incoming query first attempts to verify the customer.
2. **`verify_info` (conditional branch)**:

   * If the `customer_id` is *not found* (verification failed or input is missing), the graph routes to **`human_input`**.
   * If the `customer_id` is *found* (verification succeeded), the graph routes to the **`supervisor`**.
3. **`human_input`** -> **`verify_info`**: After the user supplies the information that resumes the graph, verification is retried by passing through `verify_info` again.
4. **`supervisor`** -> **`END`**: Once the supervisor and its subagents have handled the main query, the graph ends.

This setup ensures the customer's identity is verified before any other action, and it handles gracefully the cases where the identifier still has to be provided or retried.
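
The four steps above can be traced with a small plain-Python state machine. This is a sketch of the control flow only, not the LangGraph graph itself: `KNOWN_CUSTOMERS`, `run_flow`, the scripted `replies`, and the simplified `verify_info` (an exact-match dictionary lookup instead of an LLM plus database query) are all assumptions made so the example is self-contained.

```python
KNOWN_CUSTOMERS = {"+55 (12) 3923-5555": 1}  # hypothetical lookup table


def verify_info(state: dict) -> dict:
    """Simplified verification: look up the latest message as an identifier."""
    if state.get("customer_id") is not None:
        return {}  # already verified, nothing to update
    customer_id = KNOWN_CUSTOMERS.get(state["messages"][-1])
    if customer_id is None:
        return {"messages": ["Please share your customer ID, email, or phone number."]}
    return {"customer_id": customer_id}


def should_interrupt(state: dict) -> str:
    """Same decision as the conditional edge: continue once a customer_id is set."""
    return "continue" if state.get("customer_id") is not None else "interrupt"


def run_flow(first_message: str, replies: list[str]) -> tuple[dict, list[str]]:
    """Walk START -> verify_info -> (human_input -> verify_info)* -> supervisor -> END."""
    state = {"messages": [first_message], "customer_id": None}
    visited: list[str] = []
    pending = iter(replies)  # scripted user answers that stand in for resuming the graph
    node = "verify_info"
    while node != "END":
        visited.append(node)
        if node == "verify_info":
            update = verify_info(state)
            state["messages"] += update.get("messages", [])
            state["customer_id"] = update.get("customer_id", state["customer_id"])
            node = "supervisor" if should_interrupt(state) == "continue" else "human_input"
        elif node == "human_input":
            state["messages"].append(next(pending))
            node = "verify_info"
        else:  # supervisor
            state["messages"].append("(supervisor answers here)")
            node = "END"
    return state, visited


final_state, visited_nodes = run_flow(
    "How much was my most recent purchase?", ["+55 (12) 3923-5555"]
)
```

Here the first pass through `verify_info` fails, the scripted reply plays the part of the resumed input, and the second pass succeeds before the supervisor runs.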


In [None]:
# Define the conditional edge function for `verify_info`.
# This function checks if a `customer_id` has been successfully set in the state.
def should_interrupt(state: State, config: RunnableConfig):
    # If `customer_id` is present, it means verification was successful or already done, so continue.
    if state.get("customer_id") is not None:
        return "continue"
    # Otherwise, it means customer ID is missing or couldn't be verified, so interrupt for human input.
    else:
        return "interrupt"

In [None]:
# Initialize a new StateGraph for our multi-agent system with human-in-the-loop verification.
multi_agent_verify = StateGraph(State)

# Add the `verify_info` node for customer identity verification.
multi_agent_verify.add_node("verify_info", verify_info)

# Add the `human_input` node, which triggers an interrupt to get user input.
multi_agent_verify.add_node("human_input", human_input)

# Add the `supervisor` node, which orchestrates the sub-agents for query handling.
multi_agent_verify.add_node("supervisor", supervisor_prebuilt)

# Define the entry point: all interactions start with customer verification.
multi_agent_verify.add_edge(START, "verify_info")

# Define the conditional routing after `verify_info`.
# `should_interrupt` decides whether to continue to the supervisor or prompt for human input.
multi_agent_verify.add_conditional_edges(
    "verify_info",     # Source node
    should_interrupt,  # Conditional function
    {
        # If verification is successful, continue to the main supervisor agent.
        "continue": "supervisor",
        # If verification is needed (or failed), route to `human_input` to prompt the user.
        "interrupt": "human_input",
    },
)

# After `human_input` (once resumed), loop back to `verify_info` to try verification again.
multi_agent_verify.add_edge("human_input", "verify_info")

# The supervisor is the final processing stage before the graph ends.
multi_agent_verify.add_edge("supervisor", END)

# Compile the complete graph, integrating all nodes and edges with our memory systems.
multi_agent_verify_graph = multi_agent_verify.compile(name="multi_agent_verify", checkpointer=checkpointer, store=in_memory_store)

# Display the visualization of the new graph.
show_graph(multi_agent_verify_graph)

#### Testing Human-in-the-Loop Verification

Let's test our updated graph. We start with a question that requires a customer ID but do not provide one at first. This should trigger the `human_input` node and pause execution.


In [None]:
thread_id = uuid.uuid4() # Generate a new unique thread ID.

# Initial question without providing customer ID.
question = "How much was my most recent purchase?"

# Configuration for the graph invocation.
config = {"configurable": {"thread_id": thread_id}}

# Invoke the graph. This first invocation should hit the `human_input` node and interrupt.
result = multi_agent_verify_graph.invoke({"messages": [HumanMessage(content=question)]}, config=config)

# Print messages to observe the agent asking for the customer ID.
for message in result["messages"]:
    message.pretty_print()

In [None]:
from langgraph.types import Command # Import Command for resuming graph execution

# Now, we simulate the user providing their phone number to resume the conversation.
question = "My phone number is +55 (12) 3923-5555."

# Resume from the interrupt using `Command(resume=...)`. 
# The `resume` argument carries the new user input, which gets processed by `human_input` node 
# and then passed back to `verify_info`.
# The `config` must be the same as the initial invocation to resume the correct thread.
result = multi_agent_verify_graph.invoke(Command(resume=question), config=config)

# Print the conversation messages to see the verification and subsequent processing.
for message in result["messages"]:
    message.pretty_print()

Now, if we ask a follow-up question in the *same thread*, our agent's state (managed by the checkpointer) will already *hold our `customer_id`*. The `verify_info` node therefore simply hits its `pass` branch without asking for the information again, and the query goes straight to the supervisor, which shows the benefit of short-term memory.


In [None]:
# Follow-up question in the same thread (using the same `thread_id`).
question = "What albums do you have by the Rolling Stones?"

# Invoke the graph again. Since the `customer_id` is already in the state,
# the verification step will be skipped, and the query will directly go to the supervisor.
result = multi_agent_verify_graph.invoke({"messages": [HumanMessage(content=question)]}, config=config)

# Print the results. You should see the music catalog sub-agent's response directly.
for message in result["messages"]:
    message.pretty_print()

## Part 4: Adding Long-Term Memory (User Preferences)

We have built an agent workflow that handles verification and multi-agent execution. Let's improve it further by adding **long-term memory**. Short-term memory (via `checkpointers`) keeps context within a single conversation, whereas long-term memory lets our agent store and retrieve information *across conversations* or sessions for the same user. This is crucial for personalization.

![Memory Integration](images/memory.png)

In this step, we add two new nodes to manage the user's music preferences:

* **`load_memory` node**: Loads any existing music preferences associated with the verified `customer_id` from our `InMemoryStore` (our long-term memory) into the current graph `State` (`loaded_memory`). This gives the agent the relevant context for personalized answers.

* **`create_memory` node**: After the main query has been processed, this node analyzes the conversation that just took place. If the customer shared any new music interests, it updates or creates a `UserProfile` in the `InMemoryStore`, saving these preferences for future interactions.
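
The load-then-save cycle described above can be illustrated with a dictionary keyed by namespace and key. This is a simplified stand-in, not `InMemoryStore`: `TinyStore` returns the stored value directly rather than an item wrapper, and `ProfileSketch`, `load_preferences`, and the sample preferences are hypothetical names and data.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ProfileSketch:
    """Stand-in for the user profile: a customer ID plus a list of music interests."""
    customer_id: str
    music_preferences: list[str] = field(default_factory=list)


class TinyStore:
    """Dictionary-backed store keyed by (namespace, key); illustration only."""

    def __init__(self) -> None:
        self._items: dict[tuple, dict] = {}

    def put(self, namespace: tuple, key: str, value: dict) -> None:
        self._items[(namespace, key)] = value

    def get(self, namespace: tuple, key: str) -> Optional[dict]:
        return self._items.get((namespace, key))


def load_preferences(store: TinyStore, customer_id: int) -> str:
    """Format the saved preferences for one customer, or return an empty string."""
    saved = store.get(("memory_profile", customer_id), "user_memory")
    if saved is None or not saved["memory"].music_preferences:
        return ""
    return "Music Preferences: " + ", ".join(saved["memory"].music_preferences)


store = TinyStore()
before = load_preferences(store, 1)  # nothing saved yet
store.put(("memory_profile", 1), "user_memory", {"memory": ProfileSketch("1", ["rock", "blues"])})
after = load_preferences(store, 1)   # preferences saved in an earlier "conversation"
other_user = load_preferences(store, 2)
```

Including the customer ID in the namespace keeps each user's profile separate, so one customer's preferences never leak into another customer's context.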


In [None]:
from langgraph.store.base import BaseStore # Base class for defining custom stores for LangGraph

# Helper function to format user memory (music preferences) into a readable string.
def format_user_memory(user_data):
    """Formats music preferences from users, if available."""
    profile = user_data['memory'] # Access the 'memory' key from the stored dictionary
    result = "" # Initialize an empty string for the formatted result
    
    # Check if the profile object has a 'music_preferences' attribute and if it's not empty.
    if hasattr(profile, 'music_preferences') and profile.music_preferences:
        # If preferences exist, join them into a comma-separated string.
        result += f"Music Preferences: {', '.join(profile.music_preferences)}"
    
    return result.strip() # Return the formatted string, removing any leading/trailing whitespace.

# Define the `load_memory` node function.
# This node loads a user's long-term memory (music preferences) into the current state.
def load_memory(state: State, config: RunnableConfig, store: BaseStore):
    """Loads music preferences from users, if available."""
    
    user_id = str(state["customer_id"]) # Get the customer ID from the state (as a string, matching the namespace used in `create_memory`).
    namespace = ("memory_profile", user_id) # Define a namespace for storing user-specific memory.
                                          # This creates a unique key for each user's profile.
    
    # Attempt to retrieve existing memory for this user from the `InMemoryStore`.
    existing_memory = store.get(namespace, "user_memory")
    
    formatted_memory = "" # Initialize formatted memory as empty.
    
    # If memory exists and has a value, format it using our helper function.
    if existing_memory and existing_memory.value:
        formatted_memory = format_user_memory(existing_memory.value)

    # Update the `loaded_memory` field in the state with the retrieved and formatted memory.
    return {"loaded_memory" : formatted_memory}

In [None]:
# Define a Pydantic BaseModel to structure the `UserProfile` for long-term memory.
# This ensures that user preferences are stored in a consistent and verifiable format.
class UserProfile(BaseModel):
    # `customer_id`: Required field for the customer's unique identifier.
    customer_id: str = Field(
        description="The customer ID of the customer"
    )
    # `music_preferences`: A list of strings to store the customer's music interests.
    music_preferences: List[str] = Field(
        description="The music preferences of the customer"
    )

In [None]:
# Define the system prompt for the `create_memory` LLM.
# This prompt instructs an LLM to act as an analyst, analyzing conversation history
# to extract and update user music preferences in a structured `UserProfile` format.
create_memory_prompt = """You are an expert analyst that is observing a conversation that has taken place between a customer and a customer support assistant. The customer support assistant works for a digital music store, and has utilized a multi-agent team to answer the customer's request. 
You are tasked with analyzing the conversation that has taken place between the customer and the customer support assistant, and updating the memory profile associated with the customer. The memory profile may be empty. If it's empty, you should create a new memory profile for the customer.

You specifically care about saving any music interest the customer has shared about themselves, particularly their music preferences to their memory profile.

To help you with this task, I have attached the conversation that has taken place between the customer and the customer support assistant below, as well as the existing memory profile associated with the customer that you should either update or create. 

The customer's memory profile should have the following fields:
- customer_id: the customer ID of the customer
- music_preferences: the music preferences of the customer

These are the fields you should keep track of and update in the memory profile. If there has been no new information shared by the customer, you should not update the memory profile. It is completely okay if you do not have new information to update the memory profile with. In that case, just leave the values as they are.

*IMPORTANT INFORMATION BELOW*

The conversation between the customer and the customer support assistant that you should analyze is as follows:
{conversation}

The existing memory profile associated with the customer that you should either update or create based on the conversation is as follows:
{memory_profile}

Ensure your response is an object that has the following fields:
- customer_id: the customer ID of the customer
- music_preferences: the music preferences of the customer

For each key in the object, if there is no new information, do not update the value, just keep the value that is already there. If there is new information, update the value. 

Take a deep breath and think carefully before responding.
"""

# Define the `create_memory` node function.
# This node is responsible for analyzing the conversation and saving/updating user music preferences.
def create_memory(state: State, config: RunnableConfig, store: BaseStore):
    user_id = str(state["customer_id"]) # Get the customer ID from the current state (convert to string).
    namespace = ("memory_profile", user_id) # Define the namespace for this user's memory profile.
    
    # Retrieve the existing memory profile for this user from the long-term store.
    existing_memory = store.get(namespace, "user_memory")
    
    formatted_memory = "" # Initialize formatted memory for the prompt.
    if existing_memory and existing_memory.value:
        existing_memory_dict = existing_memory.value # Get the dictionary containing the UserProfile instance.
        # Format existing music preferences into a string for the prompt.
        formatted_memory = (
            f"Music Preferences: {', '.join(existing_memory_dict.get('memory').music_preferences or [])}" # Access the UserProfile object via 'memory' key
        )

    # Create a SystemMessage with the formatted prompt, injecting the full conversation history
    # and the existing memory profile.
    formatted_system_message = SystemMessage(content=create_memory_prompt.format(conversation=state["messages"], memory_profile=formatted_memory))
    
    # Invoke the LLM with structured output (`UserProfile`) to analyze the conversation
    # and update the memory profile based on new information.
    updated_memory = llm.with_structured_output(UserProfile).invoke([formatted_system_message])
    
    key = "user_memory" # Define the key for storing this specific memory object.
    
    # Store the updated memory profile back into the `InMemoryStore`.
    # We wrap `updated_memory` in a dictionary under the key 'memory' for consistency in access.
    store.put(namespace, key, {"memory": updated_memory})

In [None]:
# Initialize the final StateGraph for our complete multi-agent system, including memory management.
multi_agent_final = StateGraph(State)

# Add all necessary nodes to the graph.
multi_agent_final.add_node("verify_info", verify_info)         # Node for customer verification
multi_agent_final.add_node("human_input", human_input)         # Node for human-in-the-loop interruption
multi_agent_final.add_node("load_memory", load_memory)         # Node for loading user long-term memory
multi_agent_final.add_node("supervisor", supervisor_prebuilt) # Supervisor for routing to sub-agents
multi_agent_final.add_node("create_memory", create_memory)     # Node for saving/updating user long-term memory

# Define the initial entry point: all interactions start with verification.
multi_agent_final.add_edge(START, "verify_info")

# Define the conditional routing after `verify_info`.
# If verification is successful, proceed to load memory. Otherwise, prompt for human input.
multi_agent_final.add_conditional_edges(
    "verify_info",
    should_interrupt,
    {
        "continue": "load_memory", # If verified, load user memory
        "interrupt": "human_input", # If not verified, request human input
    },
)

# After `human_input` (resume), loop back to `verify_info` to re-attempt verification.
multi_agent_final.add_edge("human_input", "verify_info")

# After loading memory, proceed to the supervisor for main query processing.
multi_agent_final.add_edge("load_memory", "supervisor")

# After the supervisor completes, save/update the user's memory.
multi_agent_final.add_edge("supervisor", "create_memory")

# The graph ends after memory has been updated.
multi_agent_final.add_edge("create_memory", END)

# Compile the entire, sophisticated graph.
multi_agent_final_graph = multi_agent_final.compile(name="multi_agent_verify", checkpointer=checkpointer, store=in_memory_store)

# Display the visualization of the complete graph.
show_graph(multi_agent_final_graph)
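
The wiring above can be traced with a small plain-Python sketch. The `EDGES` table and `trace` helper are hypothetical names for this illustration only. They mirror the edges added to `multi_agent_final` but do not use LangGraph, and the verification outcomes are supplied by hand rather than computed by `verify_info`.

```python
from typing import Iterator, List

# Unconditional edges, mirroring the add_edge calls above.
EDGES = {
    "human_input": "verify_info",
    "load_memory": "supervisor",
    "supervisor": "create_memory",
    "create_memory": "END",
}

# Conditional edge after verify_info, mirroring add_conditional_edges.
VERIFY_ROUTES = {"continue": "load_memory", "interrupt": "human_input"}


def trace(verification_outcomes: List[str]) -> List[str]:
    """Return the node order given the outcome of each verification attempt."""
    outcomes: Iterator[str] = iter(verification_outcomes)
    path: List[str] = []
    node = "verify_info"  # START always leads to verify_info
    while node != "END":
        path.append(node)
        if node == "verify_info":
            node = VERIFY_ROUTES[next(outcomes)]
        else:
            node = EDGES[node]
    return path


verified_first_try = trace(["continue"])
needs_one_retry = trace(["interrupt", "continue"])
print(verified_first_try)
print(needs_one_retry)
```

The second trace shows the verification loop: the graph visits `human_input` and returns to `verify_info` before memory is loaded.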

Let's try it out! We'll use a complex query that requires verification, then covers both music and billing information, and also includes a music preference that should be saved to long-term memory.


In [None]:
thread_id = uuid.uuid4() # Generate a fresh unique thread ID for this demonstration.

# A comprehensive question that includes a phone number (used for verification), an invoice query, and a music query.
question = "My phone number is +55 (12) 3923-5555. How much was my most recent purchase? What albums do you have by the Rolling Stones?"

# Configuration for the graph invocation.
# Note: The user_id is passed as a configurable parameter, although in this specific example,
# the customer_id is extracted dynamically by the verify_info node. 
# For real-world use, ensure consistent handling of user identifiers.
config = {"configurable": {"thread_id": thread_id, "user_id" : "1"}}

# Invoke the final multi-agent graph.
# This will run through verification, memory loading, supervisor routing (to invoice then music),
# and finally memory saving.
result = multi_agent_final_graph.invoke({"messages": [HumanMessage(content=question)]}, config=config)

# Print all messages in the final state to observe the complete interaction flow.
for message in result["messages"]:
    message.pretty_print()

Let's take a look at the memory. We expect the music preferences (for example, "Rolling Stones") to have been saved in our `in_memory_store` under the customer ID associated with the phone number `+55 (12) 3923-5555` (which corresponds to customer ID 1 in our Chinook database).


In [None]:
user_id = "1" # The customer ID we expect to be associated with the phone number used.
namespace = ("memory_profile", user_id) # The namespace used to store this user's memory.

# Retrieve the user's memory profile from the `in_memory_store`.
# `.value` retrieves the actual data stored, which should be a dictionary containing the UserProfile instance.
memory_data = in_memory_store.get(namespace, "user_memory")

# Check if memory_data exists and has a 'memory' key (which holds the UserProfile object).
if memory_data and "memory" in memory_data.value:
    saved_music_preferences = memory_data.value.get("memory").music_preferences
else:
    saved_music_preferences = [] # Default to empty list if no preferences found.

print(f"Saved Music Preferences for Customer ID {user_id}: {saved_music_preferences}")

## (Optional) Building a Swarm Multi-Agent Graph

Beyond the centralized supervisor pattern, another powerful approach to multi-agent systems is the **swarm architecture**. While the supervisor pattern relies on a central agent that orchestrates everything, the swarm model emphasizes decentralized collaboration.

### Swarm Architecture

![Swarm Architecture](images/swarm.png)

In a swarm, multiple specialized agents work together without a central coordinator. Each agent knows the capabilities of the others and can *hand off* a task directly to the most suitable peer when its own expertise is insufficient or when a query crosses domains. This creates a dynamic, flexible flow in which agents pass control to one another as needed.

At LangChain, we have created a [lightweight library](https://github.com/langchain-ai/langgraph-swarm-py) that makes it easy to build swarm agents with LangGraph.
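
As a rough illustration of decentralized handoffs, the sketch below models each agent as a plain function that either answers or names the peer to hand off to. Everything here (`invoice_agent`, `music_agent`, `run_swarm`, and the keyword checks) is hypothetical and stands in for LLM decisions. It is not how `langgraph-swarm` is implemented.

```python
from typing import Callable, Dict, List, Tuple

# An agent returns ("answer", text) or ("handoff", name_of_peer).
AgentResult = Tuple[str, str]
Agent = Callable[[str], AgentResult]


def invoice_agent(query: str) -> AgentResult:
    # Keyword check standing in for an LLM deciding whether the query is in scope.
    if "invoice" in query.lower() or "purchase" in query.lower():
        return ("answer", "invoice agent: looked up your invoice")
    return ("handoff", "music_agent")


def music_agent(query: str) -> AgentResult:
    if "album" in query.lower() or "song" in query.lower():
        return ("answer", "music agent: searched the catalog")
    return ("handoff", "invoice_agent")


AGENTS: Dict[str, Agent] = {"invoice_agent": invoice_agent, "music_agent": music_agent}


def run_swarm(query: str, active: str, max_handoffs: int = 3) -> Tuple[str, List[str]]:
    """Pass control between agents until one answers; return (answer, visited agents)."""
    visited: List[str] = []
    for _ in range(max_handoffs + 1):
        visited.append(active)
        kind, payload = AGENTS[active](query)
        if kind == "answer":
            return payload, visited
        active = payload  # the current agent chose its successor; no central router
    return "no agent could handle the query", visited


answer, visited = run_swarm("Do you have any albums by the Rolling Stones?", active="invoice_agent")
print(visited, "->", answer)
```

The `max_handoffs` cap is a design choice of this sketch: without a central coordinator, two agents could otherwise keep passing an out-of-scope query back and forth.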

### Swarm vs Supervisor

![Swarm vs Supervisor Comparison](images/supervisor_vs_swarm.png)

Let's recap the key differences:

| Feature             | Supervisor Architecture                        | Swarm Architecture                              |
| :------------------ | :--------------------------------------------- | :---------------------------------------------- |
| **Control flow**    | Centralized; the supervisor routes queries.    | Decentralized; agents hand tasks to each other. |
| **Decision making** | The supervisor decides which agent to call.    | Each agent decides whom to hand control to.     |
| **Hierarchy**       | Clear hierarchy: Supervisor -> Sub-agents.     | Flat, collaborative structure.                  |
| **Modularity**      | Add/remove agents by modifying the supervisor. | Agents must know about possible handoffs.       |
| **Predictability**  | More predictable routing.                      | Dynamic, emergent behavior.                     |
| **Resilience**      | The supervisor is a single point of failure.   | More resilient to individual failures.          |

The choice between supervisor and swarm depends on the use case. If you need strict control and a clear flow, the supervisor is usually the better option. If your problem benefits from dynamic, collaborative problem solving, a swarm may be a better fit.

For more information, there is an excellent video by Lance, from the LangChain team, that explains this in detail: [Multi-agent swarms with LangGraph](https://www.youtube.com/watch?v=JeyDrn1dSUQ)

Let's create swarm agents!


In [None]:
from langgraph_swarm import create_handoff_tool, create_swarm # Import utilities for creating swarm agents and handoff tools

# Create our handoff tools between agents.
# These are special tools that, when called by an agent, signal a transfer of control
# to another named agent within the swarm.

transfer_to_invoice_agent_handoff_tool = create_handoff_tool(
    agent_name = "invoice_information_agent_with_handoff", # The name of the target agent for this handoff
    description = "Transfer user to the invoice information agent that can help with invoice information" # Description for LLM
)

transfer_to_music_catalog_agent_handoff_tool = create_handoff_tool(
    agent_name = "music_catalog_agent_with_handoff", 
    description = "Transfer user to the music catalog agent that can help with music searches and music catalog information"
)

# Recreate our agents, but this time, add the handoff tools to their available tools.
# This allows each agent to `request` a handoff to the other when appropriate.

# First, combine the handoff tools with the existing specific tools for each agent.
invoice_tools_with_handoff = [transfer_to_music_catalog_agent_handoff_tool] + invoice_tools
music_tools_with_handoff = [transfer_to_invoice_agent_handoff_tool] + music_tools

# Recreate the invoice information agent with its original prompt and its new set of tools (including handoff).
invoice_information_agent_with_handoff = create_react_agent(
    llm,
    invoice_tools_with_handoff,
    prompt = invoice_subagent_prompt,
    name = "invoice_information_agent_with_handoff" # Give it a specific name for the swarm
)

# Recreate the music catalog agent with its original prompt and its new set of tools (including handoff).
# Note: The music catalog agent prompt is generated dynamically, as defined earlier.
music_catalog_agent_with_handoff = create_react_agent(
    llm,
    music_tools_with_handoff,
    prompt = generate_music_assistant_prompt(),
    name = "music_catalog_agent_with_handoff" # Give it a specific name for the swarm
)


# Create the swarm workflow. `create_swarm` handles the orchestration logic
# for agents to hand off to each other without a central supervisor.
swarm_workflow = create_swarm(
    agents = [invoice_information_agent_with_handoff, music_catalog_agent_with_handoff], # The agents participating in the swarm
    default_active_agent = "invoice_information_agent_with_handoff", # The agent that will be active first by default
)

# Compile the swarm graph. This makes it runnable and integrates memory.
swarm_agents = swarm_workflow.compile(
    checkpointer = checkpointer,
    store = in_memory_store
)

# Display the graph of the swarm. Notice it's different from the supervisor graph,
# showing connections for potential handoffs rather than a central hub.
show_graph(swarm_agents)

Now let's try it out! We'll ask a music-related question. Since the `default_active_agent` is set to the invoice agent, it should *hand off* the task to the music catalog agent.


In [None]:
# Create a new thread for this swarm test.
thread_id = uuid.uuid4()

# Ask a music-related question.
question = "Do you have any albums by the Rolling Stones?"

# Configure the invocation with the thread ID.
config = {"configurable": {"thread_id": thread_id}}

# Invoke the swarm agents. Even though the default active agent is `invoice_information_agent_with_handoff`,
# it should recognize that the query is for music and hand off to `music_catalog_agent_with_handoff`.
result = swarm_agents.invoke({"messages": [HumanMessage(content=question)]}, config=config)

# Print the messages to observe the handoff and the final response.
for message in result["messages"]:
    message.pretty_print()

## Evaluations

**Evaluations** are a quantitative, systematic way to measure the performance of your LLM applications, especially agents. They are essential because LLMs do not always behave predictably: even small changes to prompts, models, or inputs can significantly affect results. Evaluations offer a structured method to:

1. **Identify Failures**: Detect where and why your agent is not working as expected.
2. **Compare Versions**: Quantitatively compare different versions of your application (for example, after prompt changes, model updates, or architectural changes).
3. **Build Reliability**: Iteratively improve the quality of your agent and ensure that it meets the desired performance standards.

An evaluation typically includes three main components:

1. **A Dataset**: A collection of test inputs and their expected outputs (ground truth). This serves as the reference for measuring your application's performance.
2. **A Target Application or Function**: The specific part of your LLM application (for example, an agent, chain, or node) that you want to evaluate. This function takes an input and returns an output.
3. **Evaluators**: Metrics or models (often LLMs acting as "judges") that score the outputs of your target function by comparing them against the ground truth or against specific criteria.
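
The three components fit together in a simple loop. The sketch below is a minimal, library-free illustration. `toy_target`, `exact_match`, and `evaluate_locally` are hypothetical names, the dataset is made up for the example, and LangSmith's own evaluation runner does considerably more (tracing, concurrency, result storage).

```python
from typing import Callable, Dict, List

Example = Dict[str, Dict[str, str]]

# 1. Dataset: inputs paired with reference (ground-truth) outputs.
dataset: List[Example] = [
    {"inputs": {"question": "2 + 2"}, "outputs": {"response": "4"}},
    {"inputs": {"question": "3 * 3"}, "outputs": {"response": "9"}},
    {"inputs": {"question": "10 / 4"}, "outputs": {"response": "2.5"}},
]


# 2. Target function: the piece of the application under test.
def toy_target(inputs: Dict[str, str]) -> Dict[str, str]:
    # Deliberately limited: it only knows two answers, so one example fails.
    known = {"2 + 2": "4", "3 * 3": "9"}
    return {"response": known.get(inputs["question"], "I don't know")}


# 3. Evaluator: scores an output against the reference output.
def exact_match(outputs: Dict[str, str], reference_outputs: Dict[str, str]) -> bool:
    return outputs["response"] == reference_outputs["response"]


def evaluate_locally(
    target: Callable[[Dict[str, str]], Dict[str, str]],
    data: List[Example],
    evaluator: Callable[[Dict[str, str], Dict[str, str]], bool],
) -> List[bool]:
    """Run the target on every example and score each output."""
    return [evaluator(target(ex["inputs"]), ex["outputs"]) for ex in data]


scores = evaluate_locally(toy_target, dataset, exact_match)
accuracy = sum(scores) / len(scores)
print(scores, accuracy)
```

Exact match is used here only because the toy answers are short strings. For free-form agent responses, an LLM-as-judge evaluator is usually a better fit.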

![Evaluation Conceptual Diagram](images/evals-conceptual.png)

There are many ways to evaluate an agent, depending on which aspect of its performance you want to measure. Today we will look at three common types of agent evaluations:

1. **Final Response Evaluation**: Assesses the quality of the agent's final answer to a query, treating the agent as a black box.
2. **Single-Step Evaluation**: Focuses on the performance of one specific, critical step during the agent's execution (for example, whether it selected the correct tool).
3. **Trajectory Evaluation**: Analyzes the entire sequence of steps (the "trajectory") that an agent takes, checking whether it follows the expected path of tool calls and internal reasoning.
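
For trajectory evaluation, one simple scoring rule is to compare the sequence of steps the agent took against an expected sequence. The sketch below shows two hypothetical scorers written for this illustration: a strict exact match and a looser in-order subsequence match. The step names are borrowed from this notebook's graph purely as example data.

```python
from typing import List


def exact_trajectory_match(actual: List[str], expected: List[str]) -> bool:
    """True only if the agent took exactly the expected steps, in order."""
    return actual == expected


def subsequence_match(actual: List[str], expected: List[str]) -> bool:
    """True if the expected steps appear in order, allowing extra steps in between."""
    remaining = iter(actual)
    # `step in remaining` advances the iterator, so order is enforced.
    return all(step in remaining for step in expected)


expected = ["verify_info", "load_memory", "supervisor", "create_memory"]
with_retry = ["verify_info", "human_input", "verify_info", "load_memory", "supervisor", "create_memory"]
wrong_order = ["verify_info", "supervisor", "load_memory", "create_memory"]

strict_ok = exact_trajectory_match(with_retry, expected)
loose_ok = subsequence_match(with_retry, expected)
order_ok = subsequence_match(wrong_order, expected)
print(strict_ok, loose_ok, order_ok)
```

Which rule is appropriate depends on how much deviation you want to tolerate: the strict scorer penalizes the extra verification round, while the subsequence scorer accepts it but still rejects steps taken out of order.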


### Evaluating The Final Response

Evaluating the final response is the most common and often the simplest way to measure an agent's overall effectiveness. It treats the agent as a "black box" and focuses solely on whether the final output successfully addresses the user's intent, regardless of the intermediate steps taken.

* **Input**: The user's original query.
* **Output**: The agent's final response to the user.

![Final Response Evaluation](images/final-response.png)


#### 1. Create a Dataset for Final Response Evaluation

We will use LangSmith to create and manage our evaluation datasets. A dataset consists of `inputs` (the users' questions) and `outputs` (the expected correct answers). This ground truth lets evaluators compare the agent's generated response with what is considered correct.


In [None]:
from langsmith import Client # Import the LangSmith Client for dataset and experiment management

client = Client() # Initialize the LangSmith client. This will connect to your LangSmith account.

# Define a list of example inputs and expected outputs for our dataset.
# Each dictionary represents a test case with a 'question' (input) and a 'response' (ground truth output).
examples = [
    {
        "question": "My name is Aaron Mitchell. My number associated with my account is +1 (204) 452-6452. I am trying to find the invoice number for my most recent song purchase. Could you help me with it?",
        "response": "The Invoice ID of your most recent purchase was 342.",
    },
    {
        "question": "I'd like a refund.",
        "response": "I need additional information to help you with the refund. Could you please provide your customer identifier so that we can fetch your purchase history?",
    },
    {
        "question": "Who recorded Wish You Were Here again?",
        "response": "Wish You Were Here is an album by Pink Floyd",
    },
    { 
        "question": "What albums do you have by Coldplay?",
        "response": "There are no Coldplay albums available in our catalog at the moment.",
    },
]

dataset_name = "LangGraph 101 Multi-Agent: Final Response" # Define a name for our dataset.

# Check if the dataset already exists in LangSmith to avoid recreation.
if not client.has_dataset(dataset_name=dataset_name):
    # If not, create the dataset.
    dataset = client.create_dataset(dataset_name=dataset_name)
    # Populate the dataset with our examples.
    # `inputs` are extracted from the 'question' key, `outputs` from the 'response' key.
    client.create_examples(
        inputs=[{"question": ex["question"]} for ex in examples],
        outputs=[{"response": ex["response"]} for ex in examples],
        dataset_id=dataset.id # Associate examples with the created dataset.
    )

#### 2. Define the Application Logic to Evaluate

We need a function that wraps the execution of our LangGraph graph and returns the final response in a format suitable for evaluation. Because our graph includes a `human_input` interrupt for customer verification, our evaluation function has to handle it. We will simulate the user providing their customer ID (for example, "My customer ID is 10") so that the graph can proceed past the verification step.


In [None]:
import uuid # For generating unique thread IDs
from langgraph.types import Command # For resuming graph execution after an interrupt

graph = multi_agent_final_graph # Reference our complete, final multi-agent graph

async def run_graph(inputs: dict):
    """Run graph and track the final response for evaluation."""
    # Creating a unique thread ID for each evaluation run to ensure isolation.
    thread_id = uuid.uuid4()
    # Configuration for the graph invocation. User ID '10' is used here for a specific test scenario.
    configuration = {"configurable": {"thread_id": thread_id, "user_id" : "10"}}

    # Invoke the graph with the initial user question.
    # This invocation will likely hit the `human_input` node and interrupt if `customer_id` is not present.
    result = await graph.ainvoke({"messages": [
        { "role": "user", "content": inputs['question']}]}, config = configuration)
    
    # Resume the graph to get past the `human_input` interrupt raised when verification fails.
    # We provide a (simulated) customer ID to pass the verification step.
    # Note that this resume call is issued unconditionally, and it reuses the same `configuration`
    # (same `thread_id`) so that the correct thread state is resumed.
    result = await graph.ainvoke(Command(resume="My customer ID is 10"), config=configuration)
    
    # Return the content of the last message in the conversation as the final response.
    # This is the output that will be evaluated against the dataset's `response`.
    return {"response": result['messages'][-1].content}

#### 3. Define the Evaluator for Final Response

Evaluators are functions that take the application's output, the original input, and sometimes the reference output, and return a score or feedback. We can use pre-built evaluators or define our own.

##### Using a Pre-built Evaluator (OpenEvals)
LangSmith integrates with `openevals`, a library providing ready-to-use LLM-as-a-judge evaluators. The `create_llm_as_judge` function sets up an evaluator that uses an LLM to score responses based on a given prompt (e.g., `CORRECTNESS_PROMPT`).

##### Defining a Custom Evaluator (LLM-as-a-Judge)
For more specific or nuanced evaluation criteria, you can define your own LLM-as-a-judge evaluator. This involves:
1.  **Custom Instructions**: A detailed prompt for the LLM that explains its role as a grader and the criteria for scoring.
2.  **Structured Output Schema**: A Pydantic `BaseModel` or `TypedDict` to enforce the format of the LLM's grading output (e.g., `is_correct: bool`, `reasoning: str`).
3.  **Evaluator Function**: A Python function that calls the structured LLM with the prompt, inputs, and reference outputs, then extracts the relevant score.

This approach gives you maximum flexibility over how your agent's responses are judged.

In [None]:
from openevals.llm import create_llm_as_judge # Import the utility to create LLM-as-judge evaluators
from openevals.prompts import CORRECTNESS_PROMPT # Import a pre-defined prompt for correctness evaluation

# Create an LLM-as-judge evaluator for correctness using the pre-built `CORRECTNESS_PROMPT`.
# `feedback_key="correctness"` sets the name of the score reported in LangSmith.
# `judge=llm` specifies which LLM to use for judging.
correctness_evaluator = create_llm_as_judge(
    prompt=CORRECTNESS_PROMPT,
    feedback_key="correctness",
    judge=llm,
)

# Print the content of the pre-defined correctness prompt to understand its instructions.
print(CORRECTNESS_PROMPT)

In [None]:
# Custom definition of LLM-as-judge instructions.
# This prompt provides specific guidelines for the LLM acting as a grader, focusing on factual accuracy.
grader_instructions = """You are a teacher grading a quiz.

You will be given a QUESTION, the GROUND TRUTH (correct) RESPONSE, and the STUDENT RESPONSE.

Here is the grade criteria to follow:
(1) Grade the student responses based ONLY on their factual accuracy relative to the ground truth answer.
(2) Ensure that the student response does not contain any conflicting statements.
(3) It is OK if the student response contains more information than the ground truth response, as long as it is factually accurate relative to the ground truth response.

Correctness:
True means that the student's response meets all of the criteria.
False means that the student's response does not meet all of the criteria.

Explain your reasoning in a step-by-step manner to ensure your reasoning and conclusion are correct."""

# Define the schema for the LLM-as-judge's output using TypedDict.
# This ensures the grading output is structured with a reasoning and a boolean correctness score.
class Grade(TypedDict):
    """Compare the expected and actual answers and grade the actual answer."""
    reasoning: Annotated[str, ..., "Explain your reasoning for whether the actual response is correct or not."]
    is_correct: Annotated[bool, ..., "True if the student response is mostly or exactly correct, otherwise False."]

# Configure the judge LLM to output structured data according to the `Grade` schema.
# `method="json_schema"` ensures JSON-based structured output, `strict=True` enforces strict adherence.
grader_llm = llm.with_structured_output(Grade, method="json_schema", strict=True)

# Define the custom evaluator function `final_answer_correct`.
# This function takes inputs, outputs (from our `run_graph`), and reference outputs (from the dataset).
async def final_answer_correct(inputs: dict, outputs: dict, reference_outputs: dict) -> bool:
    """Evaluate if the final response is equivalent to reference response."""
    # Construct the user prompt for the grader LLM, combining the question, ground truth, and student response.
    user = f"""QUESTION: {inputs['question']}
    GROUND TRUTH RESPONSE: {reference_outputs['response']}
    STUDENT RESPONSE: {outputs['response']}"""

    # Invoke the structured grader LLM with the system instructions and the user prompt.
    # `ainvoke` is the async counterpart of `invoke`, so the call must be awaited.
    grade = await grader_llm.ainvoke([{"role": "system", "content": grader_instructions}, {"role": "user", "content": user}])
    
    # Return the `is_correct` boolean from the grader's output as the evaluation score.
    return grade["is_correct"]

#### 4. Run the Final Response Evaluation

Now we're ready to run our evaluation job using the LangSmith client. The `aevaluate` method orchestrates the entire process:

1.  It fetches inputs from the specified `data` (our dataset).
2.  For each input, it calls our `run_graph` function.
3.  It then passes the `run_graph`'s output, along with the original input and dataset's reference output, to each defined `evaluator`.
4.  All results are logged and visible in your LangSmith project, providing a comprehensive report of your agent's performance.

Key parameters:
*   `run_graph`: Our target function to be evaluated.
*   `data`: The name of the dataset created in LangSmith.
*   `evaluators`: A list of evaluator functions to apply.
*   `experiment_prefix`: A prefix for the experiment name in LangSmith, useful for organizing runs.
*   `num_repetitions`: How many times to run each example. Values greater than 1 give more robust results, because LLM outputs can vary between runs.
*   `max_concurrency`: The maximum number of parallel runs (useful for speeding up evaluation).

Upon completion, you can navigate to your LangSmith project to view detailed traces and aggregated scores.
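
To give a feel for what `num_repetitions` and `max_concurrency` control, here is a small `asyncio` sketch. It is not LangSmith's implementation. `run_all` and `fake_target` are hypothetical helpers that repeat each example and cap the number of in-flight runs with a semaphore.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def run_all(
    target: Callable[[Dict[str, str]], Awaitable[Dict[str, str]]],
    examples: List[Dict[str, str]],
    num_repetitions: int = 1,
    max_concurrency: int = 5,
) -> List[Dict[str, str]]:
    """Run `target` on every example `num_repetitions` times, at most `max_concurrency` at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(example: Dict[str, str]) -> Dict[str, str]:
        async with semaphore:  # waits while `max_concurrency` runs are already in flight
            return await target(example)

    jobs = [bounded(ex) for ex in examples for _ in range(num_repetitions)]
    return await asyncio.gather(*jobs)


in_flight = 0
peak_in_flight = 0


async def fake_target(inputs: Dict[str, str]) -> Dict[str, str]:
    # Stand-in for an agent call; tracks how many runs overlap.
    global in_flight, peak_in_flight
    in_flight += 1
    peak_in_flight = max(peak_in_flight, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return {"response": inputs["question"].upper()}


examples = [{"question": f"q{i}"} for i in range(4)]
results = asyncio.run(run_all(fake_target, examples, num_repetitions=2, max_concurrency=3))
print(len(results), peak_in_flight)
```

Four examples repeated twice produce eight runs, and the semaphore keeps the number of overlapping runs at or below the configured limit. Inside a notebook, where an event loop is already running, you would `await run_all(...)` directly instead of calling `asyncio.run`.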

In [None]:
# Run the evaluation job asynchronously using the LangSmith client.
# This will execute `run_graph` for each example in the dataset and apply the specified evaluators.
experiment_results = await client.aevaluate(
    run_graph,                        # The asynchronous function that runs our graph and returns its output
    data=dataset_name,                # The name of the LangSmith dataset to use for inputs and references
    evaluators=[final_answer_correct, correctness_evaluator], # List of evaluator functions to apply
    experiment_prefix="agent-Llama-e2e", # A prefix for the experiment name in LangSmith for better organization
    num_repetitions=1,                # Number of times to run each example (1 for quick testing)
    max_concurrency=5,                # Maximum concurrent runs to optimize evaluation speed
)

### Evaluating a Single Step of the Agent

While end-to-end evaluation is important, it can be challenging to debug. Sometimes, an agent might fail overall, but you don't know *which* specific decision or action led to the failure. **Single-step evaluation** allows you to test individual components or critical decisions within your agent's workflow in isolation, similar to unit testing in software development.

For our multi-agent system, a critical single step is the **supervisor's routing decision**: does it correctly send the query to the music agent or the invoice agent?

*   **Input**: The specific input to that single step (e.g., the user message that the supervisor receives).
*   **Output**: The direct output of that step (e.g., the name of the agent the supervisor chose to route to).

![Single Step Evaluation](images/single-step.png) 

#### 1. Create a Dataset for Single-Step Evaluation

For single-step evaluation, our dataset's `inputs` will be the user message, and the `outputs` will be the *expected routing decision* (i.e., the name of the sub-agent that should be activated).

In [None]:
# Define examples for single-step evaluation, focusing on the supervisor's routing.
# `messages`: The input to the supervisor (the user's query).
# `route`: The expected output of the supervisor (the name of the sub-agent it should route to).
examples = [
    {
        "messages": "My customer ID is 1. What's my most recent purchase? and What albums does the catalog have by U2?", 
        "route": 'transfer_to_invoice_information_subagent' # Expects initial routing to invoice agent
    },
    {
        "messages": "What songs do you have by U2?", 
        "route": 'transfer_to_music_catalog_subagent' # Expects routing to music agent
    },
    {
        "messages": "My name is Aaron Mitchell. My number associated with my account is +1 (204) 452-6452. I am trying to find the invoice number for my most recent song purchase. Could you help me with it?", 
        "route": 'transfer_to_invoice_information_subagent' # Expects routing to invoice agent
    },
    {
        "messages": "Who recorded Wish You Were Here again? What other albums by them do you have?", 
        "route": 'transfer_to_music_catalog_subagent' # Expects routing to music agent
    }
]


dataset_name = "LangGraph 101 Multi-Agent: Single-Step" # Name for this specific dataset.
# Check and create the dataset in LangSmith if it doesn't already exist.
if not client.has_dataset(dataset_name=dataset_name):
    dataset = client.create_dataset(dataset_name=dataset_name)
    client.create_examples(
        inputs = [{"messages": ex["messages"]} for ex in examples],
        outputs = [{"route": ex["route"]} for ex in examples],
        dataset_id=dataset.id
    )

#### 2. Define the Application Logic to Evaluate (Single Step)

To evaluate only the supervisor's routing, we need to run our `supervisor_prebuilt` graph but *interrupt* its execution immediately after the supervisor makes its routing decision, before any sub-agents are actually invoked. LangGraph's `interrupt_before` argument is perfect for this.

The `interrupt_before` parameter tells the graph to pause execution right before entering the specified nodes. In this case, we want to pause before `music_catalog_subagent` or `invoice_information_subagent` are invoked. This allows us to inspect the state and determine what the supervisor decided to do.

In [None]:
async def run_supervisor_routing(inputs: dict):
    """Runs the supervisor graph up to the point of routing and returns the chosen route."""
    # Invoke the `supervisor_prebuilt` graph.
    # `interrupt_before` specifies that the graph should pause execution just before entering 
    # either the music or invoice sub-agent nodes. This captures the routing decision.
    # A dummy `user_id` and `thread_id` are provided for configuration, as the supervisor itself doesn't need real verification here.
    result = await supervisor_prebuilt.ainvoke(
        {"messages": [HumanMessage(content=inputs['messages'])]},
        interrupt_before=["music_catalog_subagent", "invoice_information_subagent"],
        config={"configurable": {"thread_id": uuid.uuid4(), "user_id" : "10"}}
    )
    
    # The last message in the returned state records the routing decision: its `name`
    # (e.g., `transfer_to_music_catalog_subagent`) identifies the sub-agent the supervisor chose.
    return {"route": result["messages"][-1].name}
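
Reading `messages[-1].name` assumes the supervisor always routes. As a hedged sketch (not part of the original notebook), the lookup could instead search backwards for the most recent handoff name and return `None` when the supervisor answered directly. `extract_route` and `HANDOFF_PREFIX` are hypothetical names, and the objects below are stand-ins that only mimic the `.name` attribute of real messages.

```python
from types import SimpleNamespace

# Assumption: every route in the dataset starts with this prefix.
HANDOFF_PREFIX = "transfer_to_"

def extract_route(messages):
    """Return the most recent handoff name, or None if no handoff happened."""
    for message in reversed(messages):
        name = getattr(message, "name", None)
        if name and name.startswith(HANDOFF_PREFIX):
            return name
    return None

# Stand-in messages: a user turn, the supervisor's turn, and a handoff record.
routed = [
    SimpleNamespace(name=None),
    SimpleNamespace(name="supervisor"),
    SimpleNamespace(name="transfer_to_music_catalog_subagent"),
]
# Stand-in for a run where the supervisor replied without routing.
answered_directly = [
    SimpleNamespace(name=None),
    SimpleNamespace(name="supervisor"),
]

print(extract_route(routed))
print(extract_route(answered_directly))
```

A `None` route simply fails the exact-match comparison against the reference, so a run that never routed is scored as incorrect rather than raising an error.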

#### 3. Define the Evaluator for Single Step

For this single-step evaluation, a simple exact match evaluator is sufficient. It will check if the `route` output by our `run_supervisor_routing` function exactly matches the `route` defined in our dataset's `reference_outputs`.

In [None]:
def correct(outputs: dict, reference_outputs: dict) -> bool:
    """Evaluator function to check if the agent chose the correct route."""
    # Compares the 'route' returned by our `run_supervisor_routing` function
    # with the 'route' specified in the ground truth dataset.
    return outputs['route'] == reference_outputs["route"]
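
Before launching an experiment, it can help to exercise the evaluator locally on hand-written dictionaries. The sketch below repeats the evaluator so that it runs on its own; the predicted routes are made-up illustrations, not real model outputs.

```python
def correct(outputs: dict, reference_outputs: dict) -> bool:
    """Exact-match check on the chosen route."""
    return outputs["route"] == reference_outputs["route"]

# Hand-written (predicted, expected) pairs, for illustration only.
cases = [
    (
        {"route": "transfer_to_music_catalog_subagent"},
        {"route": "transfer_to_music_catalog_subagent"},
    ),
    (
        {"route": "transfer_to_music_catalog_subagent"},
        {"route": "transfer_to_invoice_information_subagent"},
    ),
]

scores = [correct(predicted, expected) for predicted, expected in cases]
accuracy = sum(scores) / len(scores)  # booleans sum as 0/1
print(scores, accuracy)
```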

#### 4. Run the Single Step Evaluation

Now we execute the single-step evaluation using `client.aevaluate`, similar to the final response evaluation, but with our specialized function and dataset.

In [None]:
experiment_results = await client.aevaluate(
    run_supervisor_routing,           # Our function that runs only the supervisor routing step
    data=dataset_name,                # The dataset specifically for single-step routing evaluation
    evaluators=[correct],
    experiment_prefix="agent-Llama-singlestep",
    max_concurrency=5,
)

### Evaluating the Trajectory of the Agent

**Trajectory evaluation** looks deeper into the agent's internal workings. Instead of assessing only the final output or a single step, it evaluates the entire sequence of steps (the "trajectory") an agent takes to arrive at its answer. This is particularly useful for complex agents where the *process* of reaching a solution is as important as the solution itself (e.g., ensuring that a specific set of tools is used in a particular order).

*   **Input**: The initial user query to the overall agent.
*   **Output**: A detailed list of all nodes/steps visited during the agent's execution.

![Trajectory Evaluation](images/trajectory.png) 


#### 1. Create a Dataset for Trajectory Evaluation

For trajectory evaluation, each example in our dataset contains the user `question` as input and a `trajectory` (the expected, ordered list of node names) as the ground-truth output.

In [None]:
# Define examples for trajectory evaluation.
# `question`: The user's input.
# `trajectory`: The expected ordered list of node names visited by the graph.
examples = [
    {
        "question": "My customer ID is 1. What's my most recent purchase? and What albums does the catalog have by U2?",
        "trajectory": ["verify_info", "load_memory", "supervisor", "create_memory"], # Expected path when customer ID is provided
    },
    {
        "question": "What songs do you have by U2?",
        "trajectory": ["verify_info", "human_input", "verify_info", "load_memory", "supervisor", "create_memory"], # Expected path with initial verification and resume
    },
    {
        "question": "My name is Aaron Mitchell. My number associated with my account is +1 (204) 452-6452. I am trying to find the invoice number for my most recent song purchase. Could you help me with it?",
        "trajectory": ["verify_info", "load_memory", "supervisor", "create_memory"], # Expected path when customer ID is provided implicitly
    },
    {
        "question": "Who recorded Wish You Were Here again? What other albums by them do you have?",
        "trajectory": ["verify_info", "human_input", "verify_info", "load_memory", "supervisor", "create_memory"], # Another example with initial verification and resume
    },
]

dataset_name = "LangGraph 101 Multi-Agent: Trajectory Eval" # Name for this dataset.

# Check and create the dataset in LangSmith if it doesn't already exist.
if not client.has_dataset(dataset_name=dataset_name):
    dataset = client.create_dataset(dataset_name=dataset_name)
    client.create_examples(
        inputs=[{"question": ex["question"]} for ex in examples],
        outputs=[{"trajectory": ex["trajectory"]} for ex in examples],
        dataset_id=dataset.id
    )
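
Because the dataset is only created when it does not already exist, a typo in a reference trajectory would persist across runs. A minimal sketch of a pre-upload check follows. `KNOWN_NODES` is an assumption assembled from the node names used in the examples above, `find_unknown_nodes` is a hypothetical helper, and the sample data contains a deliberate typo.

```python
# Assumption: the set of node names that appear in the reference trajectories.
KNOWN_NODES = {"verify_info", "human_input", "load_memory", "supervisor", "create_memory"}

def find_unknown_nodes(examples: list) -> dict:
    """Map example index -> node names that are not in KNOWN_NODES."""
    problems = {}
    for index, example in enumerate(examples):
        unknown = [name for name in example["trajectory"] if name not in KNOWN_NODES]
        if unknown:
            problems[index] = unknown
    return problems

# Illustrative examples; the second one misspells `load_memory` on purpose.
sample_examples = [
    {"question": "q1", "trajectory": ["verify_info", "load_memory", "supervisor", "create_memory"]},
    {"question": "q2", "trajectory": ["verify_info", "load_memmory", "supervisor", "create_memory"]},
]

print(find_unknown_nodes(sample_examples))
```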

#### 2. Define the Application Logic to Evaluate (Trajectory)

To capture the full trajectory, we will use `graph.astream(stream_mode="debug")`. The `debug` stream mode yields detailed `chunk` objects for each step in the graph, including the `task` chunks which contain the name of the node being executed. We'll collect these node names into a list to form the actual trajectory.

Similar to the final response evaluation, we need to handle the `human_input` interrupt by resuming the graph with dummy input.

In [None]:
graph = multi_agent_final_graph # Reference our complete multi-agent graph

async def run_graph(inputs: dict) -> dict:
    """Run graph and track the trajectory it takes along with the final response."""
    trajectory = [] # List to store the names of nodes visited
    thread_id = uuid.uuid4() # Unique ID for the current thread
    # Configuration for the graph invocation, including a dummy user_id for verification step.
    configuration = {"configurable": {"thread_id": thread_id, "user_id" : "10"}}

    # First, run the graph for the initial question. `astream` allows us to iterate through chunks.
    # `stream_mode="debug"` provides detailed information about each step, including node names.
    async for chunk in graph.astream({"messages": [
            {
                "role": "user",
                "content": inputs['question'],
            }
        ]}, config = configuration, stream_mode="debug"):
        # Check if the chunk type is 'task' (indicating a node execution).
        if chunk['type'] == 'task':
            # Append the name of the executed node to our trajectory list.
            trajectory.append(chunk['payload']['name'])

    # Resume with a dummy customer ID to get past the `human_input` interrupt.
    # Note that this call runs for every example, including those that did not pause.
    async for chunk in graph.astream(Command(resume="My customer ID is 10"), config = configuration, stream_mode="debug"):
        if chunk['type'] == 'task':
            trajectory.append(chunk['payload']['name'])
            
    # Return the collected trajectory list.
    return {"trajectory": trajectory}
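
The filtering step inside `run_graph` can be illustrated in isolation. In the sketch below, the chunk dictionaries are hand-written and simplified: they contain only the `type` and `payload["name"]` fields that `run_graph` reads, and the non-`task` types shown are assumptions about what else the debug stream may emit. `extract_trajectory` is a hypothetical helper name.

```python
def extract_trajectory(chunks: list) -> list:
    """Keep the node name of every 'task' chunk, in the order it was emitted."""
    return [chunk["payload"]["name"] for chunk in chunks if chunk["type"] == "task"]

# Simplified stand-ins for debug-mode chunks (illustrative, not captured output).
sample_chunks = [
    {"type": "checkpoint", "payload": {}},
    {"type": "task", "payload": {"name": "verify_info"}},
    {"type": "task_result", "payload": {"name": "verify_info"}},
    {"type": "task", "payload": {"name": "human_input"}},
]

print(extract_trajectory(sample_chunks))
```

Only `task` chunks contribute to the trajectory, so each node is recorded once per execution even if other chunk types also mention it.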

#### 3. Define the Evaluator for Trajectory

For trajectory evaluation, we'll define two custom evaluators:

1.  **`evaluate_exact_match`**: This simple evaluator checks whether the actual trajectory exactly matches the expected trajectory from the dataset. It returns a binary score (True/False).
2.  **`evaluate_extra_steps`**: This more sophisticated evaluator walks both trajectories in order and counts the steps in the agent's trajectory that do not line up with the reference trajectory (the "unmatched" or "extra" steps). A high count can indicate inefficiency or unexpected behavior.

In [None]:
def evaluate_exact_match(outputs: dict, reference_outputs: dict):
    """Evaluate whether the trajectory exactly matches the expected output"""
    return {
        "key": "exact_match", # The key for this evaluation metric in LangSmith
        "score": outputs["trajectory"] == reference_outputs["trajectory"] # True if trajectories are identical
    }

def evaluate_extra_steps(outputs: dict, reference_outputs: dict) -> dict:
    """Evaluate the number of unmatched steps in the agent's output trajectory compared to the reference."""
    i = j = 0 # Pointers for reference trajectory (i) and actual output trajectory (j)
    unmatched_steps = 0 # Counter for steps in output not found in reference sequence

    # Iterate through both trajectories to find matches and count mismatches.
    while i < len(reference_outputs['trajectory']) and j < len(outputs['trajectory']):
        if reference_outputs['trajectory'][i] == outputs['trajectory'][j]:
            i += 1  # Match found, move to the next step in reference trajectory
        else:
            unmatched_steps += 1  # Step in output is not the expected one, count as unmatched
        j += 1  # Always move to the next step in outputs trajectory

    # After the loop, if there are remaining steps in the output trajectory,
    # they are all considered unmatched (extra steps taken by the agent).
    unmatched_steps += len(outputs['trajectory']) - j

    return {
        "key": "unmatched_steps", # The key for this evaluation metric
        "score": unmatched_steps, # The count of unmatched steps
    }
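
To see how the two metrics behave, here is a small worked example on hand-written trajectories. The evaluators are repeated in compact form so the snippet runs on its own, and the trajectories are illustrative rather than real runs.

```python
def evaluate_exact_match(outputs: dict, reference_outputs: dict) -> dict:
    return {"key": "exact_match", "score": outputs["trajectory"] == reference_outputs["trajectory"]}

def evaluate_extra_steps(outputs: dict, reference_outputs: dict) -> dict:
    reference, actual = reference_outputs["trajectory"], outputs["trajectory"]
    i = j = 0
    unmatched_steps = 0
    while i < len(reference) and j < len(actual):
        if reference[i] == actual[j]:
            i += 1                # aligned step: advance in the reference
        else:
            unmatched_steps += 1  # output step that does not line up
        j += 1
    unmatched_steps += len(actual) - j  # leftover output steps are all extra
    return {"key": "unmatched_steps", "score": unmatched_steps}

reference = {"trajectory": ["verify_info", "load_memory", "supervisor", "create_memory"]}

# The agent detoured through human verification before continuing.
detour = {"trajectory": ["verify_info", "human_input", "verify_info",
                         "load_memory", "supervisor", "create_memory"]}
# The agent stopped early, after only two of the four expected steps.
truncated = {"trajectory": ["verify_info", "load_memory"]}

detour_extra = evaluate_extra_steps(detour, reference)["score"]
truncated_extra = evaluate_extra_steps(truncated, reference)["score"]
truncated_exact = evaluate_exact_match(truncated, reference)["score"]
print(detour_extra, truncated_extra, truncated_exact)
```

The detour yields two unmatched steps (`human_input` and the repeated `verify_info`). The truncated run yields zero unmatched steps even though it never reached `supervisor`, because this metric only counts steps the agent took. Pairing it with the exact-match score is what catches such incomplete runs.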

#### 4. Run the Trajectory Evaluation

Finally, we run the trajectory evaluation using our specialized `run_graph` function and the two custom trajectory evaluators. The results will be uploaded to LangSmith, where you can analyze the sequence of node executions and compare them against your expected paths.

In [None]:
experiment_results = await client.aevaluate(
    run_graph,                        # Our function that collects the full trajectory
    data=dataset_name,                # The dataset specifically for trajectory evaluation
    evaluators=[evaluate_extra_steps, evaluate_exact_match], # Our custom trajectory evaluators
    experiment_prefix="agent-Llama-trajectory", # Prefix for the experiment name in LangSmith
    num_repetitions=1,
    max_concurrency=4,
)