### ¿Qué es la Generación Aumentada con Caché (CAG)?

CAG (Cache-Augmented Generation) es un enfoque sin recuperación que omite el paso habitual de consultar fuentes de conocimiento externas en tiempo de inferencia. En su lugar, precarga documentos relevantes en la ventana de contexto extendida del LLM, precalcula la caché de pares clave-valor (KV) del modelo, y reutiliza esto durante la inferencia, de modo que el modelo puede generar respuestas sin pasos adicionales de recuperación.

In [None]:
# Importamos las librerías necesarias para configuración del entorno
import os  # Para manejar variables de entorno del sistema operativo
from dotenv import load_dotenv  # Para cargar variables desde archivo .env

# Cargamos las variables de entorno desde el archivo .env
load_dotenv()

# Configuramos la API key de OpenAI desde las variables de entorno
# Esta clave es necesaria para autenticarnos con el servicio de OpenAI
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")

# Importamos la función para inicializar modelos de chat de LangChain
from langchain.chat_models import init_chat_model

# Inicializamos el modelo de lenguaje GPT-4o-mini de OpenAI
# Este es un modelo más económico y rápido que GPT-4, ideal para demostraciones
llm = init_chat_model("openai:gpt-4o-mini")

# Mostramos la configuración del modelo para verificar que se inicializó correctamente
llm

In [None]:
### Variable de Caché

# Creamos un diccionario vacío para almacenar las respuestas cacheadas
# La clave será la consulta del usuario y el valor será la respuesta del modelo
# Este es un caché simple en memoria que persiste mientras el notebook está en ejecución
Model_Cache = {}

In [None]:
# Importamos el módulo time para medir tiempos de ejecución
import time

def cache_model(query):
    """
    Función que implementa un sistema de caché simple para el modelo.
    Si la consulta ya existe en caché, retorna la respuesta guardada.
    Si no existe, ejecuta el modelo y guarda el resultado en caché.
    """
    # Registramos el tiempo de inicio
    start_time = time.time()
    
    # Verificamos si la consulta ya existe en el caché
    if Model_Cache.get(query):
        # Si hay un cache hit (acierto), imprimimos un mensaje
        print("**ACIERTO DE CACHÉ**")
        
        # Calculamos el tiempo transcurrido (será casi 0 para cache hits)
        end_time = time.time()
        elapsed_time = end_time - start_time
        
        # Mostramos el tiempo de ejecución
        print(f"TIEMPO DE EJECUCIÓN: {elapsed_time:.2f} segundos")
        
        # Retornamos la respuesta del caché sin llamar al modelo
        return Model_Cache.get(query)
    else:
        # Si no hay cache hit (fallo de caché), debemos ejecutar el modelo
        print("***FALLO DE CACHÉ – EJECUTANDO MODELO***")
        
        # Registramos el tiempo de inicio de la llamada al modelo
        start_time = time.time()
        
        # Invocamos el modelo LLM con la consulta
        # Esta es la operación costosa que queremos evitar con el caché
        response = llm.invoke(query)
        
        # Calculamos el tiempo que tomó la llamada al modelo
        end_time = time.time()
        elapsed = end_time - start_time
        
        # Mostramos el tiempo de ejecución (será varios segundos)
        print(f"TIEMPO DE EJECUCIÓN: {elapsed:.2f} segundos")
        
        # Guardamos la respuesta en el caché para futuras consultas idénticas
        Model_Cache[query] = response
        
        # Retornamos la respuesta
        return response

In [None]:
# Primera llamada con la consulta "hi"
# Esta será un fallo de caché porque es la primera vez que hacemos esta pregunta
response = cache_model("hi")

# Mostramos la respuesta completa del modelo
response

In [None]:
# Inspeccionamos el contenido del diccionario de caché
# Después de la primera llamada, ahora contiene una entrada para "hi"
Model_Cache

In [None]:
# Segunda llamada con la misma consulta "hi"
# Esta vez será un acierto de caché porque ya existe la respuesta guardada
# Notarás que el tiempo de ejecución es prácticamente 0 segundos
response = cache_model("hi")

# Mostramos la respuesta (será exactamente la misma que la primera vez)
response

In [None]:
# Probamos con una consulta más compleja que requiere una respuesta extensa
query = "¿puedes darme 500 palabras sobre langgraph?"

# Primera vez con esta consulta = fallo de caché
# El modelo tomará varios segundos para generar 500 palabras
response = cache_model(query)

# Imprimimos la respuesta generada
print(response)

In [None]:
# Repetimos la misma consulta exacta
query = "¿puedes darme 500 palabras sobre langgraph?"

# Esta vez será un acierto de caché
# La respuesta será instantánea (~0.00 segundos) comparado con los 17+ segundos anteriores
# Esto demuestra el beneficio del caché: ahorro de tiempo y costo de API
response = cache_model(query)

# Imprimimos la respuesta (idéntica a la anterior pero recuperada del caché)
print(response)

In [None]:
# Probamos con una consulta MUY SIMILAR pero no idéntica
# Nota: eliminamos "can you" del inicio de la pregunta
query = "dame 500 palabras sobre langgraph?"

# Esto será un FALLO de caché porque el string es diferente
# Este es el problema del caché simple basado en diccionarios:
# - Solo funciona con coincidencias exactas de texto
# - No entiende similitud semántica
# - "can you give me" vs "give me" se tratan como consultas completamente diferentes
response = cache_model(query)

# El modelo generará una nueva respuesta (tomará ~12+ segundos)
print(response)

### CAG Avanzado

En esta sección implementamos un sistema de caché semántico más sofisticado que utiliza embeddings vectoriales para detectar preguntas similares, no solo idénticas.

In [None]:
# Importaciones para el sistema CAG avanzado

# Importación para compatibilidad con anotaciones de tipo
from __future__ import annotations

# Tipos para definir estructuras de datos con type hints
from typing import TypedDict, List, Optional

# Módulo time para gestión de TTL (Time To Live) del caché
import time

# ---- LangGraph / LangChain ----
# StateGraph: para construir flujos de trabajo basados en grafos de estado
# END: constante que marca el final del grafo
from langgraph.graph import StateGraph, END

# MemorySaver: gestor de checkpoints para persistencia de estado
from langgraph.checkpoint.memory import MemorySaver

# Document: clase que representa un documento con contenido y metadatos
from langchain_core.documents import Document

# ChatOpenAI: cliente para modelos de chat de OpenAI
from langchain_openai import ChatOpenAI

# HuggingFaceEmbeddings: para generar embeddings usando modelos de Hugging Face
from langchain.embeddings import HuggingFaceEmbeddings

# ---- FAISS vector stores ----
# FAISS: biblioteca de Facebook para búsqueda de similitud vectorial eficiente
import faiss

# FAISS wrapper de LangChain con funcionalidad adicional
from langchain_community.vectorstores import FAISS

# InMemoryDocstore: almacén de documentos en memoria
from langchain_community.docstore.in_memory import InMemoryDocstore

In [None]:
# ================= CONFIGURACIÓN =================

# Modelo de embeddings: usamos un modelo compacto de sentence-transformers
# Este modelo genera vectores de 384 dimensiones
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"  

# Dimensionalidad de los vectores de embedding (debe coincidir con el modelo)
VECTOR_DIM = 384

# Modelo LLM a utilizar: GPT-4o-mini es rápido y económico
LLM_MODEL = "gpt-4o-mini"

# Temperatura del LLM: 0 = determinístico, respuestas consistentes
LLM_TEMPERATURE = 0

# Número de documentos a recuperar del vector store RAG
RETRIEVE_TOP_K = 4

# Número de resultados a revisar del caché semántico
CACHE_TOP_K = 3

# Umbral de distancia para considerar un acierto de caché
# Valores más bajos = mayor similitud requerida
# FAISS usa distancia L2: valores más bajos indican mayor similitud
CACHE_DISTANCE_THRESHOLD = 0.45

# TTL (Time To Live) opcional para entradas de caché en segundos
# 0 = deshabilitado (el caché nunca expira)
CACHE_TTL_SEC = 0

In [None]:
# ================= ESTADO DEL GRAFO =================

class RAGState(TypedDict):
    """
    Define el estado que se pasa entre nodos del grafo LangGraph.
    Cada campo representa una parte del flujo de trabajo RAG con caché.
    """
    # Pregunta original del usuario (sin modificar)
    question: str
    
    # Pregunta normalizada (minúsculas, limpia) para búsqueda en caché
    normalized_question: str
    
    # Lista de documentos recuperados del vector store
    context_docs: List[Document]
    
    # Respuesta generada por el LLM o recuperada del caché
    answer: Optional[str]
    
    # Lista de citas que referencian las fuentes utilizadas
    citations: List[str]
    
    # Bandera que indica si la respuesta provino del caché
    cache_hit: bool

In [None]:
# ============== VARIABLES GLOBALES ===================

# Importamos HuggingFaceEmbeddings para generar embeddings
from langchain_huggingface import HuggingFaceEmbeddings

# Inicializamos el modelo de embeddings
# Este modelo se descargará la primera vez que se ejecute (~90MB)
# Convierte texto en vectores de 384 dimensiones que capturan significado semántico
EMBED = HuggingFaceEmbeddings(model_name=EMBED_MODEL)

In [None]:
# ----- CACHÉ DE PREGUNTAS/RESPUESTAS (INICIALIZACIÓN VACÍA) -----

# Creamos un índice FAISS con distancia L2 (Euclidiana)
# L2: menor distancia = mayor similitud
# El índice está vacío inicialmente (se llenará con el uso)
qa_index = faiss.IndexFlatL2(VECTOR_DIM)

# Envolvemos el índice FAISS en el wrapper de LangChain para funcionalidad adicional
QA_CACHE = FAISS(
    # Función de embedding para convertir texto a vectores
    embedding_function=EMBED,
    
    # El índice FAISS que acabamos de crear
    index=qa_index,
    
    # Docstore vacío que almacenará los metadatos de las consultas
    docstore=InMemoryDocstore({}),
    
    # Mapeo vacío de IDs de índice a IDs de docstore
    index_to_docstore_id={}
)

In [None]:
# Verificamos que el caché QA se inicializó correctamente
# Mostrará el objeto FAISS con su configuración
QA_CACHE

In [None]:
# ----- ALMACÉN RAG (SOLO PARA DEMOSTRACIÓN) -----

# Creamos un vector store FAISS con documentos de ejemplo sobre LangGraph
# En producción, esto contendría tu base de conocimiento completa
RAG_STORE = FAISS.from_texts(
    # Lista de textos de ejemplo que servirán como nuestra base de conocimiento
    texts=[
        "LangGraph te permite componer flujos de trabajo de LLM con estado como grafos.",
        "En LangGraph, los nodos pueden ser cacheados; el caché de nodos memoriza salidas indexadas por entradas durante un TTL.",
        "Retrieval-Augmented Generation (RAG) recupera contexto externo y lo inyecta en prompts.",
        "El caché semántico reutiliza respuestas previas cuando nuevas preguntas son semánticamente similares."
    ],
    # Usamos el mismo modelo de embeddings para mantener consistencia
    embedding=EMBED,
)

In [None]:
# Inicializamos el modelo de lenguaje OpenAI con la configuración definida
LLM = ChatOpenAI(
    model=LLM_MODEL,           # Modelo a usar (gpt-4o-mini)
    temperature=LLM_TEMPERATURE # Temperatura 0 = respuestas determinísticas
)

In [None]:
# Verificamos la configuración del LLM
LLM

In [None]:
# ================ FUNCIONES DE NODOS DEL GRAFO ===================

def normalize_query(state: RAGState) -> RAGState:
    """
    Normaliza la pregunta del usuario para mejorar la búsqueda en caché.
    Convierte a minúsculas y elimina espacios en blanco innecesarios.
    """
    # Obtenemos la pregunta del estado, o string vacío si no existe
    q = (state["question"] or "").strip()
    
    # Normalizamos a minúsculas para búsqueda case-insensitive
    state["normalized_question"] = q.lower()
    
    return state


def semantic_cache_lookup(state: RAGState) -> RAGState:
    """
    Busca en el caché semántico si existe una pregunta similar previa.
    Usa embeddings vectoriales y similitud por distancia L2.
    """
    # Obtenemos la pregunta normalizada
    q = state["normalized_question"]
    
    # Por defecto, asumimos que no hay acierto de caché
    state["cache_hit"] = False

    # Si la pregunta está vacía, no hay nada que buscar
    if not q:
        return state

    # ✅ Protección: FAISS falla si el índice está vacío y pedimos k>0
    # Verificamos que el índice existe y tiene al menos un vector
    if getattr(QA_CACHE, "index", None) is None or QA_CACHE.index.ntotal == 0:
        return state

    # Buscamos las k preguntas más similares en el caché
    # similarity_search_with_score retorna (Document, distance) con menor=mejor
    hits = QA_CACHE.similarity_search_with_score(q, k=CACHE_TOP_K)
    
    # Si no hay resultados, retornamos sin cambios
    if not hits:
        return state

    # Obtenemos el mejor resultado (menor distancia = mayor similitud)
    best_doc, dist = hits[0]

    # Verificación opcional de TTL (Time To Live)
    if CACHE_TTL_SEC > 0:
        # Obtenemos el timestamp de cuando se guardó en caché
        ts = best_doc.metadata.get("ts")
        
        # Si el timestamp no existe o la entrada expiró, ignoramos el caché
        if ts is None or (time.time() - float(ts)) > CACHE_TTL_SEC:
            return state

    # Verificamos si la distancia está por debajo del umbral
    # Para L2: menor distancia = mayor similitud
    if dist <= CACHE_DISTANCE_THRESHOLD:
        # Extraemos la respuesta guardada en los metadatos
        cached_answer = best_doc.metadata.get("answer")
        
        if cached_answer:
            # ¡Acierto de caché! Usamos la respuesta guardada
            state["answer"] = cached_answer
            state["citations"] = ["(cache)"]  # Indicamos que vino del caché
            state["cache_hit"] = True

    return state


def respond_from_cache(state: RAGState) -> RAGState:
    """
    Nodo pasante que simplemente retorna el estado.
    La respuesta ya fue establecida por semantic_cache_lookup.
    """
    return state


def retrieve(state: RAGState) -> RAGState:
    """
    Recupera documentos relevantes del vector store RAG.
    Solo se ejecuta si no hubo acierto de caché.
    """
    # Obtenemos la pregunta normalizada
    q = state["normalized_question"]
    
    # Buscamos los k documentos más similares en el RAG store
    docs = RAG_STORE.similarity_search(q, k=RETRIEVE_TOP_K)
    
    # Guardamos los documentos recuperados en el estado
    state["context_docs"] = docs
    
    return state


def generate(state: RAGState) -> RAGState:
    """
    Genera una respuesta usando el LLM con el contexto recuperado.
    Implementa el patrón RAG: usa documentos como contexto para la respuesta.
    """
    # Obtenemos la pregunta original (sin normalizar)
    q = state["question"]
    
    # Obtenemos los documentos de contexto recuperados
    docs = state.get("context_docs", [])
    
    # Formateamos el contexto concatenando todos los documentos
    # Añadimos marcadores [doc-i] para referencias
    ctx = "\n\n".join([f"[doc-{i}] {d.page_content}" for i, d in enumerate(docs, start=1)])

    # Mensaje del sistema que define el rol y comportamiento del asistente
    system = (
        "Eres un asistente RAG preciso. Usa el contexto cuando sea útil. "
        "Cita con marcadores [doc-i] si usas un hecho del contexto."
    )
    
    # Mensaje del usuario con la pregunta y el contexto
    user = f"Pregunta: {q}\n\nContexto:\n{ctx}\n\nEscribe una respuesta concisa con citas."

    # Invocamos el LLM con los mensajes formateados
    resp = LLM.invoke([
        {"role": "system", "content": system},
        {"role": "user", "content": user}
    ])
    
    # Guardamos la respuesta generada
    state["answer"] = resp.content
    
    # Creamos las citas para todos los documentos usados
    state["citations"] = [f"[doc-{i}]" for i in range(1, len(docs) + 1)]
    
    return state


def cache_write(state: RAGState) -> RAGState:
    """
    Escribe la pregunta y respuesta en el caché semántico.
    Permite reutilizar esta respuesta para preguntas similares futuras.
    """
    # Obtenemos la pregunta normalizada y la respuesta
    q = state["normalized_question"]
    a = state.get("answer")
    
    # Solo guardamos si tenemos tanto pregunta como respuesta
    if not q or not a:
        return state

    # Añadimos la pregunta al caché FAISS
    QA_CACHE.add_texts(
        # El texto es la pregunta (se convertirá a embedding)
        texts=[q],
        
        # Los metadatos contienen la respuesta y timestamp
        metadatas=[{
            "answer": a,           # La respuesta a cachear
            "ts": time.time(),     # Timestamp para TTL
        }]
    )
    
    return state

In [None]:
# ============== CONSTRUCCIÓN DEL GRAFO ==============

# Creamos un grafo de estado usando RAGState como esquema
graph = StateGraph(RAGState)

# Añadimos todos los nodos al grafo
# Cada nodo es una función que transforma el estado
graph.add_node("normalize_query", normalize_query)
graph.add_node("semantic_cache_lookup", semantic_cache_lookup)
graph.add_node("respond_from_cache", respond_from_cache)
graph.add_node("retrieve", retrieve)
graph.add_node("generate", generate)
graph.add_node("cache_write", cache_write)

# Definimos el punto de entrada: siempre empezamos normalizando la pregunta
graph.set_entry_point("normalize_query")

# Después de normalizar, buscamos en el caché
graph.add_edge("normalize_query", "semantic_cache_lookup")

# Función de decisión para el edge condicional
def _branch(state: RAGState) -> str:
    """
    Decide el siguiente nodo basándose en si hubo acierto de caché.
    - Si cache_hit = True -> responder desde caché (saltar RAG)
    - Si cache_hit = False -> ejecutar flujo RAG completo
    """
    return "respond_from_cache" if state.get("cache_hit") else "retrieve"

# Añadimos un edge condicional después de la búsqueda en caché
# La función _branch decide qué camino tomar
graph.add_conditional_edges(
    "semantic_cache_lookup",  # Desde este nodo
    _branch,                   # Usando esta función de decisión
    {
        # Mapeo de valores de retorno a nodos destino
        "respond_from_cache": "respond_from_cache",
        "retrieve": "retrieve"
    }
)

# Si respondemos desde caché, terminamos inmediatamente
graph.add_edge("respond_from_cache", END)

# Si no hay caché, vamos a retrieve -> generate -> cache_write -> END
graph.add_edge("retrieve", "generate")
graph.add_edge("generate", "cache_write")
graph.add_edge("cache_write", END)

# Creamos un gestor de memoria para persistencia de estado (opcional)
memory = MemorySaver()

# Compilamos el grafo con el checkpointer
app = graph.compile(checkpointer=memory)

# Mostramos el grafo compilado
app

In [None]:
# ================= DEMOSTRACIÓN ===================

if __name__ == "__main__":
    # Configuración del thread para memoria persistente
    thread_cfg = {"configurable": {"thread_id": "demo-user-1"}}

    # Primera pregunta: "¿Qué es LangGraph?"
    q1 = "¿Qué es LangGraph?"
    
    # Invocamos el grafo con la pregunta
    # Como es la primera vez, será un fallo de caché y ejecutará RAG completo
    out1 = app.invoke(
        {
            "question": q1,           # Pregunta del usuario
            "context_docs": [],       # Inicialmente vacío
            "citations": []           # Inicialmente vacío
        }, 
        thread_cfg
    )
    
    # Imprimimos los resultados
    print("Respuesta:", out1["answer"])
    print("Citas:", out1.get("citations"))
    print("¿Acierto de caché?:", out1.get("cache_hit"))

In [None]:
# Segunda pregunta: pregunta SIMILAR pero con diferente redacción
q1 = "¿Explica sobre LangGraph?"

# Esta pregunta es semánticamente similar a "¿Qué es LangGraph?"
# El sistema de caché semántico debería detectar la similitud
# y retornar la respuesta cacheada sin ejecutar RAG
out1 = app.invoke(
    {
        "question": q1, 
        "context_docs": [], 
        "citations": []
    }, 
    thread_cfg
)

# Imprimimos los resultados
print("Respuesta:", out1["answer"])
print("Citas:", out1.get("citations"))
# Esperamos cache_hit = True porque la pregunta es similar
print("¿Acierto de caché?:", out1.get("cache_hit"))

In [None]:
# Tercera pregunta: pregunta DIFERENTE sobre un aspecto específico
q1 = "¿Explica sobre los agentes de LangGraph?"

# Esta pregunta es sobre un tema relacionado pero diferente
# "agentes de LangGraph" es más específico que "LangGraph" en general
# Esperamos un fallo de caché porque la similitud no será suficientemente alta
out1 = app.invoke(
    {
        "question": q1, 
        "context_docs": [], 
        "citations": []
    }, 
    thread_cfg
)

# Imprimimos los resultados
print("Respuesta:", out1["answer"])
print("Citas:", out1.get("citations"))
# Esperamos cache_hit = False porque la pregunta es suficientemente diferente
print("¿Acierto de caché?:", out1.get("cache_hit"))

In [None]:
# Cuarta pregunta: reformulación de la pregunta anterior
q1 = "¿Explica sobre agentes en Langgraph?"

# Esta pregunta es casi idéntica a la anterior, solo cambia ligeramente la redacción:
# - "agentes de LangGraph" vs "agentes en Langgraph"
# El caché semántico debería reconocer esta similitud
# y retornar la respuesta cacheada de la pregunta anterior
out1 = app.invoke(
    {
        "question": q1, 
        "context_docs": [], 
        "citations": []
    }, 
    thread_cfg
)

# Imprimimos los resultados
print("Respuesta:", out1["answer"])
print("Citas:", out1.get("citations"))
# Esperamos cache_hit = True porque es semánticamente idéntica a la pregunta anterior
print("¿Acierto de caché?:", out1.get("cache_hit"))