### **Workflows y Pipelines de LLM + Agentes**

Este cuaderno cubre, con ejemplos ejecutables, los siguientes temas:

- Workflows y pipelines de LLM (preprocesamiento -> LLM -> postprocesamiento)
- Multi-step prompting y chain-of-actions
- Patrones de workflow: secuencial, branching, paralelo
- Integración con APIs y microservicios
- Observabilidad de workflows (logs de prompts, métricas, trazas)
- Agentes (agent loop: observar -> razonar -> actuar -> observar)
- Agentes reactivos, planificadores, con memoria
- Agentes + RAG + herramientas, tool routing
- Seguridad y guardrails en agentes (validación, límites, auditoría)
- Workflows multi-agente (Manager + Workers, critic/reviewer)
- Casos de uso: QA empresarial con RAG + agentes, generación de código + agente tester
- RAG y reducción de alucinaciones 
- Razonamiento sobre contexto recuperado
- Impacto de RAG en costo de inferencia y técnicas para abaratarlo

**Restricción clave**

Este cuaderno evita el uso de APIs propietarias con credenciales (por ejemplo, OpenAI).  
Las opciones de LLM son locales:

1) Ollama (servidor local + modelos descargados)  
2) Transformers en modo offline (`local_files_only=True`)  
3) MockLLM determinista (para que el cuaderno corra incluso sin modelos)

Nota: si deseas "offline estricto", pre-descarga modelos y ejecuta este cuaderno con `local_files_only=True` (ya está así por defecto).

In [None]:
# 0) Instalación (opcional)
# En entornos limpios (Docker/VM), descomenta e instala.

# !pip install -U "requests>=2.31" "pydantic>=2.6" "fastapi>=0.110" "uvicorn>=0.27" \
#     "prometheus-client>=0.20" "opentelemetry-api>=1.24" "opentelemetry-sdk>=1.24" \
#     "numpy>=1.26" "scikit-learn>=1.4"

# Opcional (embeddings densos):
# !pip install -U "sentence-transformers>=3.0" "torch>=2.2"

# Opcional (LLM local con Transformers, offline si el modelo ya existe):
# !pip install -U "transformers>=4.41" "sentencepiece>=0.2"

#### **Backends LLM locales (Ollama/Transformers offline/Mock)**

La interfaz `LLM.generate(prompt, system=None)` unifica backends.

- Ollama: requiere `ollama serve` y un modelo local (`ollama pull llama3.1:8b` u otro).
- Transformers: se ejecuta sin internet usando `local_files_only=True` (si el modelo ya existe en cache o volumen).
- MockLLM: simula tool-calling por reglas (útil para reproducibilidad).

In [None]:
import json
import os
import re
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple

import requests

class LLMBase:
    def generate(self, prompt: str, system: Optional[str] = None, max_new_tokens: int = 256) -> str:
        raise NotImplementedError()

class OllamaLLM(LLMBase):
    def __init__(self, model: str = "llama3.1:8b", base_url: str = "http://localhost:11434"):
        self.model = model
        self.base_url = base_url.rstrip("/")

    def _is_up(self) -> bool:
        try:
            r = requests.get(f"{self.base_url}/api/tags", timeout=1.0)
            return r.status_code == 200
        except Exception:
            return False

    def generate(self, prompt: str, system: Optional[str] = None, max_new_tokens: int = 256) -> str:
        payload = {
            "model": self.model,
            "prompt": prompt if system is None else f"{system}\n\n{prompt}",
            "stream": False,
            "options": {"num_predict": max_new_tokens},
        }
        r = requests.post(f"{self.base_url}/api/generate", json=payload, timeout=120.0)
        r.raise_for_status()
        return r.json().get("response", "")

import os
from typing import Optional
try:
    from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline  # type: ignore
    _HAS_TRANSFORMERS = True
except Exception:
    AutoTokenizer = None  # type: ignore
    AutoModelForSeq2SeqLM = None  # type: ignore
    pipeline = None  # type: ignore
    _HAS_TRANSFORMERS = False
if _HAS_TRANSFORMERS:
    class TransformersLLM(LLMBase):
        """Backend local para Transformers.

        - Corre en CPU (device=-1)
        - Soporta modo offline estricto: no descarga nada si offline=True
        - Requiere que el modelo ya exista en cache/volumen local
        """

        def __init__(self, model_name_or_path: str = "google/flan-t5-small", offline: bool = True):
            if offline:
                os.environ.setdefault("TRANSFORMERS_OFFLINE", "1")
                os.environ.setdefault("HF_HUB_OFFLINE", "1")

            self.tokenizer = AutoTokenizer.from_pretrained(
                model_name_or_path,
                local_files_only=offline,
            )
            self.model = AutoModelForSeq2SeqLM.from_pretrained(
                model_name_or_path,
                local_files_only=offline,
            )

            # Importante: NO pasar local_files_only al pipeline.
            self.pipe = pipeline(
                task="text2text-generation",
                model=self.model,
                tokenizer=self.tokenizer,
                device=-1,
                framework="pt",
            )

        def generate(self, prompt: str, system: Optional[str] = None, max_new_tokens: int = 256) -> str:
            text = prompt if system is None else f"{system}\n\n{prompt}"
            out = self.pipe(text, max_new_tokens=max_new_tokens, do_sample=False)
            return out[0]["generated_text"]
else:
    TransformersLLM = None  # type: ignore


class MockLLM(LLMBase):
    """LLM de respaldo determinista.

    Objetivo:
    - Permitir que el cuaderno corra "Restart + Run All" aun sin Ollama/Transformers.
    - Soportar demos de tool-calling (una llamada -> observación -> respuesta final).
    - Soportar demos RAG (respuesta basada en CONTEXTO con citas mínimas).
    """

    def _extract_obs(self, text: str) -> Dict[str, Any]:
        # Intento 1: bloque "Observación actual (JSON): { ... }"
        m = re.search(r"Observación actual \(JSON\):\s*(\{.*?\})\s*\n\nHerramientas disponibles:", text, flags=re.S)
        if m:
            try:
                return json.loads(m.group(1))
            except Exception:
                pass
        # Intento 2: cualquier JSON que contenga last_tool/last_result
        m = re.search(r"(\{.*\"last_tool\".*\})", text, flags=re.S)
        if m:
            try:
                return json.loads(m.group(1))
            except Exception:
                pass
        return {}

    def generate(self, prompt: str, system: Optional[str] = None, max_new_tokens: int = 256) -> str:
        text = (system or "") + "\n" + prompt

        # 1) Si ya existe una observación con resultado de herramienta, finalizar.
        obs = self._extract_obs(text)
        if isinstance(obs, dict) and "last_tool" in obs and "last_result" in obs:
            tool = obs.get("last_tool")
            res = obs.get("last_result", {})
            # Normaliza errores de herramienta
            if isinstance(res, dict) and ("tool_error" in res or "error" in res):
                return json.dumps({"final": f"Hubo un problema al ejecutar {tool}: {res}"}, ensure_ascii=False)

            if tool == "flight_lookup" and isinstance(res, dict):
                options = res.get("options") or res.get("flights") or []
                if options:
                    # Resumen compacto
                    top = options[0]
                    return json.dumps({"final": f"Opciones de vuelo encontradas (ejemplo): {top}. Si quieres, pido más opciones."}, ensure_ascii=False)
                return json.dumps({"final": "No encontré opciones de vuelo con los parámetros dados."}, ensure_ascii=False)

            if tool == "hotel_lookup" and isinstance(res, dict):
                hotels = res.get("hotels") or res.get("options") or []
                if hotels:
                    return json.dumps({"final": f"Hoteles sugeridos (ejemplo): {hotels[:2]}."}, ensure_ascii=False)
                return json.dumps({"final": "No encontré hoteles con los parámetros dados."}, ensure_ascii=False)

            if tool == "calculator" and isinstance(res, dict):
                if "value" in res:
                    return json.dumps({"final": f"Resultado: {res['value']}"}, ensure_ascii=False)

            if tool == "rag_search" and isinstance(res, dict):
                passages = res.get("passages") or []
                if passages:
                    pid = passages[0].get("id", "doc")
                    return json.dumps({"final": f"Según el contexto recuperado [{pid}], la respuesta depende de la evidencia disponible. Si falta evidencia, debo decirlo explícitamente."}, ensure_ascii=False)

        # 2) Demos RAG: si el prompt incluye CONTEXTO con citas, generar respuesta corta con citas.
        if "CONTEXTO:" in text and re.search(r"\[[^\]]+\]", text):
            # toma hasta 2 ids de cita
            ids = re.findall(r"\[([^\]]+)\]", text)
            ids = [i for i in ids if i and not i.lower().startswith("http")]
            cite = ""
            if ids:
                cite = f" [{ids[0]}]" + (f" [{ids[1]}]" if len(ids) > 1 else "")
            return f"Respuesta basada en el contexto recuperado.{cite}"

        # 3) Tool selection (primera iteración)
        if re.search(r"\b(vuelo|flight)\b", text, re.IGNORECASE):
            return json.dumps({"tool": "flight_lookup", "args": {"departure_city": "LAX", "destination_city": "JFK", "num_options": 3}}, ensure_ascii=False)
        if re.search(r"\b(hotel|alojamiento)\b", text, re.IGNORECASE):
            return json.dumps({"tool": "hotel_lookup", "args": {"city": "San Francisco", "num_options": 3}}, ensure_ascii=False)
        if re.search(r"\b(calcula|calculator|sum|add|multiplica|divide)\b", text, re.IGNORECASE):
            return json.dumps({"tool": "calculator", "args": {"expression": "2*(7+5)"}}, ensure_ascii=False)

        return json.dumps({"final": "No tengo evidencia suficiente. Usa RAG o proporciona contexto."}, ensure_ascii=False)

def pick_llm() -> LLMBase:
    # 1) Ollama si está disponible
    try:
        ollama = OllamaLLM(model=os.environ.get("OLLAMA_MODEL", "llama3.1:8b"))
        if ollama._is_up():
            print("Usando backend: Ollama")
            return ollama
    except Exception:
        pass

    # 2) Transformers offline estricto (opcional)
    if TransformersLLM is not None:
        try:
            model = os.environ.get("LOCAL_TFM_MODEL", "google/flan-t5-small")
            llm = TransformersLLM(model_name_or_path=model, offline=True)
            print("Usando backend: Transformers (offline=True)")
            return llm
        except Exception as e:
            print("Transformers no disponible en modo offline. Error:", type(e).__name__, str(e)[:160])

    # 3) Mock
    print("Usando backend: MockLLM (determinista)")
    return MockLLM()

LLM = pick_llm()

#### **Primitivas de Workflow: preprocess -> LLM -> postprocess**

In [None]:
from typing import Protocol

class Step(Protocol):
    name: str
    def run(self, ctx: Dict[str, Any]) -> Dict[str, Any]: ...

@dataclass
class FunctionStep:
    name: str
    fn: Callable[[Dict[str, Any]], Dict[str, Any]]
    def run(self, ctx: Dict[str, Any]) -> Dict[str, Any]:
        return self.fn(ctx)

@dataclass
class Workflow:
    name: str
    steps: List[Step]
    def run(self, ctx: Dict[str, Any]) -> Dict[str, Any]:
        ctx = dict(ctx)
        ctx.setdefault("trace", [])
        for s in self.steps:
            t0 = time.time()
            ctx["trace"].append({"step": s.name, "event": "start", "ts": t0})
            ctx = s.run(ctx)
            ctx["trace"].append({"step": s.name, "event": "end", "ts": time.time(), "dt_ms": (time.time()-t0)*1000})
        return ctx

def basic_preprocess(text: str) -> str:
    text = text.strip()
    text = re.sub(r"\s+", " ", text)
    return text

def basic_postprocess(text: str) -> str:
    return text.strip()

def llm_step(ctx: Dict[str, Any]) -> Dict[str, Any]:
    ctx["raw_llm"] = LLM.generate(ctx["prompt"], system=ctx.get("system"), max_new_tokens=256)
    return ctx

wf = Workflow(
    name="pre_llm_post",
    steps=[
        FunctionStep("preprocess", lambda c: {**c, "prompt": basic_preprocess(c["prompt"])}),
        FunctionStep("llm", llm_step),
        FunctionStep("postprocess", lambda c: {**c, "output": basic_postprocess(c["raw_llm"])}),
    ],
)

ctx = wf.run({"prompt": "  Resume en 1 línea: Workflows de LLM para QA empresarial.   "})
ctx["output"], ctx["trace"][:2]

#### **Multi-step prompting y chain-of-actions: tool calls en JSON**

In [None]:
# Tool registry + routing

from dataclasses import dataclass
from typing import Any, Callable, Dict, List
import json
import os
import re

@dataclass
class Tool:
    name: str
    description: str
    fn: Callable[..., Any]
    schema: Dict[str, Any]

class ToolError(Exception):
    pass

TOOLS: Dict[str, Tool] = {}


# Punto único de ejecución: permite activar guardrails sin reescribir el loop.
def default_tool_execute(tool_name: str, args: Dict[str, Any]) -> Any:
    return TOOLS[tool_name].fn(**args)

TOOL_EXECUTOR: Callable[[str, Dict[str, Any]], Any] = default_tool_execute

def register_tool(tool: Tool) -> None:
    TOOLS[tool.name] = tool

def safe_json_loads(text: str) -> Dict[str, Any]:
    m = re.search(r"\{.*\}", text, flags=re.S)
    if not m:
        raise ValueError("No JSON object found in text.")
    return json.loads(m.group(0))

def tool_call_loop(user_request: str, system: str, max_steps: int = 6) -> Dict[str, Any]:
    transcript: List[Dict[str, Any]] = []
    obs = {"user_request": user_request}

    for step in range(max_steps):
        tool_names = ", ".join(sorted(TOOLS.keys()))
        prompt = (
            "Solicitud del usuario:\n"
            f"{user_request}\n\n"
            "Observación actual (JSON):\n"
            f"{json.dumps(obs, ensure_ascii=False)}\n\n"
            f"Herramientas disponibles: {tool_names}\n\n"
            "Responde SOLO con JSON en UNA de estas formas:\n"
            '1) { "tool": "<nombre>", "args": { ... } }\n'
            '2) { "final": "<respuesta final>" }\n'
        )
        raw = LLM.generate(prompt, system=system, max_new_tokens=256)
        transcript.append({"step": step, "llm_raw": raw})

        try:
            decision = safe_json_loads(raw)
        except Exception as e:
            transcript.append({"parse_error": str(e)})
            return {"status": "error", "error": "invalid_json", "transcript": transcript}

        if "final" in decision:
            return {"status": "ok", "final": decision["final"], "transcript": transcript, "obs": obs}

        tool_name = decision.get("tool")
        args = decision.get("args", {})
        if tool_name not in TOOLS:
            return {"status": "error", "error": f"Unknown tool: {tool_name}", "transcript": transcript}
        if not isinstance(args, dict):
            return {"status": "error", "error": "Tool args must be a dict.", "transcript": transcript}

        try:
            result = TOOL_EXECUTOR(tool_name, args)
        except Exception as e:
            result = {"tool_error": type(e).__name__, "message": str(e)[:200]}
        transcript.append({"tool": tool_name, "args": args, "result": result})
        obs = {"last_tool": tool_name, "last_result": result}

    return {"status": "error", "error": "max_steps_exceeded", "transcript": transcript}

# system prompt para tool calling
system_toolcaller = (
    "Eres un agente que decide qué herramienta usar.\n"
    "Debes seguir el formato JSON exactamente como se solicita.\n"
    "No incluyas texto extra.\n"
    "Usa herramientas cuando falten datos; sintetiza cuando ya estén disponibles."
)

# Tools locales (demostración)
def calculator(expression: str) -> Dict[str, Any]:
    if not re.fullmatch(r"[0-9\.\+\-\*\/\(\)\s]+", expression):
        raise ToolError("Expression contains unsupported characters.")
    return {"expression": expression, "value": eval(expression, {"__builtins__": {}}, {})}

register_tool(Tool(
    name="calculator",
    description="Evalúa una expresión aritmética segura (solo + - * / paréntesis).",
    fn=calculator,
    schema={"type":"object","properties":{"expression":{"type":"string"}},"required":["expression"]},
))

# Travel provider (demo local, sin dependencias externas)
TRAVEL_PROVIDER_PATH = "travel_provider.py"
SUPPORTED_LOCATIONS_PATH = "supported_locations.json"

DEFAULT_LOCATIONS = {
    "airports": ["LAX","JFK","ORD","ATL","DFW","DEN","SEA","SAN"],
    "hotel_cities": ["New York","Los Angeles","Chicago","Miami","San Francisco","Seattle","San Diego"],
}

def ensure_travel_provider_files() -> None:
    # supported_locations.json
    if not os.path.exists(SUPPORTED_LOCATIONS_PATH):
        with open(SUPPORTED_LOCATIONS_PATH, "w", encoding="utf-8") as f:
            json.dump(DEFAULT_LOCATIONS, f, indent=2)

    # travel_provider.py (sin faker; solo stdlib)
    if not os.path.exists(TRAVEL_PROVIDER_PATH):
        TP_TEMPLATE = '# travel_provider.py (demo local, sin dependencias externas)\nimport json\nimport os\nimport random\nfrom datetime import datetime, timedelta\n\ndef load_supported_locations(default=None):\n    default = default or {\n        "airports": ["LAX","JFK","ORD","ATL","DFW","DEN","SEA","SAN"],\n        "hotel_cities": ["New York","Los Angeles","Chicago","Miami","San Francisco","Seattle","San Diego"],\n    }\n    json_file = os.path.join(os.path.dirname(__file__), "supported_locations.json")\n    if not os.path.exists(json_file):\n        return default\n    with open(json_file, "r", encoding="utf-8") as f:\n        try:\n            return json.load(f)\n        except Exception:\n            return default\n\nsupported_locations = load_supported_locations()\n\ndef _rand_dt(start: datetime, end: datetime) -> datetime:\n    if end <= start:\n        return start\n    delta = int((end - start).total_seconds())\n    return start + timedelta(seconds=random.randint(0, delta))\n\nclass TravelProvider:\n    def flight_lookup(self, departure_city, destination_city, num_options=3):\n        if departure_city not in supported_locations["airports"]:\n            return {"error": f"Unsupported departure city: {departure_city}. Supported airports are {supported_locations[\'airports\']}"}\n        if destination_city not in supported_locations["airports"]:\n            return {"error": f"Unsupported destination city: {destination_city}. Supported airports are {supported_locations[\'airports\']}"}\n        if departure_city == destination_city:\n            return {"error": "Departure and destination cities cannot be the same."}\n\n        flights = []\n        now = datetime.utcnow()\n        for _ in range(int(num_options)):\n            airline = random.choice(["Delta","United","Southwest","JetBlue","American Airlines"])\n            flight_number = f"{random.choice([\'DL\',\'UA\',\'SW\',\'JB\',\'AA\'])}{random.randint(100,9999)}"\n            dep = _rand_dt(now, now + timedelta(days=30))\n            arr = _rand_dt(dep + timedelta(hours=2), dep + timedelta(hours=12))\n            price = round(random.uniform(100, 300), 2)\n            flights.append({\n                "airline": airline,\n                "departure_airport": departure_city,\n                "destination_airport": destination_city,\n                "flight_number": flight_number,\n                "departure_time": dep.isoformat(),\n                "arrival_time": arr.isoformat(),\n                "price": price,\n            })\n        return {"status_code": 200, "flight_options": flights}\n\n    def hotel_lookup(self, city, num_options=3):\n        if city not in supported_locations["hotel_cities"]:\n            return {"error": f"Unsupported city: {city}. Supported cities are {supported_locations[\'hotel_cities\']}"}\n\n        hotels = []\n        now = datetime.utcnow()\n        for _ in range(int(num_options)):\n            hotel_name = random.choice(["Hilton","Marriott","Hyatt","Holiday Inn","Sheraton"])\n            check_in = _rand_dt(now, now + timedelta(days=30))\n            nights = random.randint(1, 7)\n            check_out = check_in + timedelta(days=nights)\n            price_per_night = round(random.uniform(100, 500), 2)\n            total_price = round(price_per_night * nights, 2)\n            hotels.append({\n                "hotel_name": hotel_name,\n                "city": city,\n                "check_in": check_in.isoformat(),\n                "check_out": check_out.isoformat(),\n                "price_per_night": price_per_night,\n                "total_price": total_price,\n            })\n        return hotels\n\ntravel_provider = TravelProvider()\n'
        with open(TRAVEL_PROVIDER_PATH, "w", encoding="utf-8") as f:
            f.write(TP_TEMPLATE)

ensure_travel_provider_files()

try:
    from travel_provider import travel_provider  # type: ignore
except Exception as e:
    # Fallback 100% local para que el cuaderno quede 'verde' aun sin archivo externo.
    print("No se pudo importar travel_provider. Se activará fallback local. Error:", type(e).__name__, str(e)[:120])
    class _DummyTravelProvider:
        def flight_lookup(self, departure_city: str, destination_city: str, num_options: int = 3) -> Dict[str, Any]:
            return {
                "query": {"from": departure_city, "to": destination_city},
                "options": [
                    {"airline": "LocalAir", "from": departure_city, "to": destination_city, "price_usd": 320 + 10*i, "stops": i%2}
                    for i in range(max(1, int(num_options)))
                ],
            }
        def hotel_lookup(self, city: str, num_options: int = 3) -> Dict[str, Any]:
            return {
                "query": {"city": city},
                "hotels": [
                    {"name": f"Hotel {city} {i+1}", "price_usd": 120 + 15*i, "rating": 4.0 + 0.1*(i%5)}
                    for i in range(max(1, int(num_options)))
                ],
            }
    travel_provider = _DummyTravelProvider()

def flight_lookup(departure_city: str, destination_city: str, num_options: int = 3) -> Dict[str, Any]:
    if travel_provider is None:
        raise ToolError("travel_provider no está disponible.")
    return travel_provider.flight_lookup(departure_city, destination_city, num_options=num_options)

def hotel_lookup(city: str, num_options: int = 3) -> Any:
    if travel_provider is None:
        raise ToolError("travel_provider no está disponible.")
    return travel_provider.hotel_lookup(city, num_options=num_options)


# Registrar herramientas de viaje
register_tool(Tool(
    name="flight_lookup",
    description="Busca opciones de vuelo (demo local; devuelve opciones simuladas o del provider local).",
    fn=flight_lookup,
    schema={"type":"object","properties":{"departure_city":{"type":"string"},"destination_city":{"type":"string"},"num_options":{"type":"integer"}},"required":["departure_city","destination_city"]},
))

register_tool(Tool(
    name="hotel_lookup",
    description="Busca opciones de hotel (demo local; devuelve hoteles simulados o del provider local).",
    fn=hotel_lookup,
    schema={"type":"object","properties":{"city":{"type":"string"},"num_options":{"type":"integer"}},"required":["city"]},
))

print("Tools registrados:", sorted(TOOLS.keys()))


In [None]:
res = tool_call_loop(
    user_request="Necesito un vuelo de LAX a JFK.",
    system=system_toolcaller,
    max_steps=3,
)
res["status"], res.get("final"), res["transcript"][:2]

#### **RAG local (TF-IDF + opcional dense) y respuesta grounded con citas**

In [None]:
import numpy as np

def chunk_text(text: str, chunk_size: int = 220, overlap: int = 40) -> List[str]:
    text = basic_preprocess(text)
    chunks = []
    i = 0
    while i < len(text):
        chunks.append(text[i:i+chunk_size])
        i += chunk_size - overlap
    return chunks

DOCS = [
    ("doc1",
     "Fundamentos de RAG (Retrieval-Augmented Generation): un sistema RAG separa el problema en tres etapas: "
     "retrieve → read → generate. Primero recupera pasajes relevantes (sparse, dense o híbrido), luego arma un "
     "contexto y finalmente genera una respuesta. RAG reduce alucinaciones al exigir que las afirmaciones se anclen "
     "en evidencia recuperada; si el contexto no contiene soporte suficiente, el asistente debe declararlo explícitamente."),
    ("doc2",
     "Chunking en RAG: la ingesta transforma documentos en chunks para embeddings/índices. El tamaño y solapamiento "
     "controlan el trade-off entre precisión y cobertura. Chunks muy pequeños pierden coherencia; muy grandes "
     "diluyen señal y encarecen el contexto. El chunking semántico intenta cortar por unidades discursivas (títulos, "
     "párrafos, secciones) para mejorar recuperación y citas."),
    ("doc3",
     "Retrievers: sparse (BM25) favorece coincidencia léxica; dense (embeddings) captura similitud semántica; híbrido "
     "combina ambos con re-ranking. Un patrón común es: (i) recuperar top-k amplio, (ii) re-rank con cross-encoder, "
     "(iii) seleccionar pocos chunks para el prompt. Métricas típicas: recall@k, MRR, nDCG; y evaluación end-to-end "
     "con EM/F1 o LLM-as-a-judge (con controles de sesgo)."),
    ("doc4",
     "Workflows/pipelines de LLM: estructurar la ejecución como pasos (preprocesar → LLM → postprocesar) reduce "
     "errores y mejora auditabilidad. El preprocesamiento normaliza entrada; el postprocesamiento valida formato "
     "(JSON/tabla), aplica políticas (por ejemplo, límites de longitud) y puede ejecutar verificadores. "
     "El tracing por step (start/end/error, latencia) es clave para depuración y observabilidad."),
    ("doc5",
     "Agentes: un agente implementa un loop observar → razonar → actuar → observar. Actuar suele significar usar "
     "herramientas (APIs, DB, vector DB, calculadora). Guardrails en agentes incluyen: validación de argumentos, "
     "allowlists de herramientas, límites de presupuesto (pasos, k, contexto), y auditoría del loop (transcripts). "
     "Un patrón avanzado es Manager+Workers con un Critic que revisa y pide correcciones."),
    ("doc6",
     "Observabilidad aplicada a RAG y agentes: además de métricas (p95 latencia, errores por herramienta), se "
     "registran logs estructurados de prompts, tamaño de contexto, ids citados, y trazas distribuidas por request "
     "(trace_id). Esto permite relacionar calidad con costos (tokens/latencia) y detectar regresiones por cambios "
     "en el corpus, embeddings o reglas de routing."),
]

CHUNKS: List[Tuple[str,str]] = []
for doc_id, text in DOCS:
    for j, ch in enumerate(chunk_text(text, chunk_size=170, overlap=30)):
        CHUNKS.append((f"{doc_id}#c{j}", ch))

from sklearn.feature_extraction.text import TfidfVectorizer  # type: ignore
vectorizer = TfidfVectorizer()
X = vectorizer.fit_transform([c[1] for c in CHUNKS])

def sparse_retrieve(query: str, k: int = 3) -> List[Tuple[str, float, str]]:
    qv = vectorizer.transform([query])
    scores = (X @ qv.T).toarray().ravel()
    idx = np.argsort(-scores)[:k]
    return [(CHUNKS[i][0], float(scores[i]), CHUNKS[i][1]) for i in idx]

def build_context(passages: List[Tuple[str,float,str]]) -> str:
    return "\n".join([f"[{cid}] {txt}" for cid,_,txt in passages])

def rag_answer(query: str, k: int = 3) -> Dict[str, Any]:
    passages = sparse_retrieve(query, k=k)
    context = build_context(passages)
    system = (
        "Eres un asistente técnico.\n"
        "Reglas: responde usando SOLO el CONTEXTO provisto, incluye citas [doc#chunk].\n"
        "Si el contexto no contiene evidencia, di: 'No tengo evidencia suficiente en el contexto recuperado.'"
    )
    prompt = f"PREGUNTA:\n{query}\n\nCONTEXTO:\n{context}\n\nRESPUESTA (con citas):\n"
    out = LLM.generate(prompt, system=system, max_new_tokens=220)
    return {"query": query, "context": context, "answer": out}

rag_answer("¿Qué costo extra introduce RAG y cómo se abarata?", k=3)

#### **Agente RAG-first (observar->razonar->actuar->observar)**

Este agente primero llama `rag_search` (herramienta) y luego sintetiza con citas.

In [None]:
def rag_search(query: str, k: int = 4) -> Dict[str, Any]:
    passages = sparse_retrieve(query, k=k)
    return {"passages": [{"id":cid, "score":score, "text":txt} for cid,score,txt in passages]}

register_tool(Tool(
    name="rag_search",
    description="Recupera pasajes relevantes del corpus local (RAG).",
    fn=rag_search,
    schema={"type":"object","properties":{"query":{"type":"string"},"k":{"type":"integer"}},"required":["query"]},
))

@dataclass
class AgentState:
    goal: str
    short_memory: List[Dict[str, Any]]
    long_memory: List[Dict[str, Any]]

class BaseAgent:
    def __init__(self, name: str):
        self.name = name

    def run(self, user_request: str, max_turns: int = 4) -> Dict[str, Any]:
        state = AgentState(goal=user_request, short_memory=[], long_memory=[])
        obs: Dict[str, Any] = {"user_request": user_request}

        for t in range(max_turns):
            action = self.reason(state, obs)
            state.short_memory.append({"turn": t, "obs": obs, "action": action})

            if action.get("type") == "final":
                return {"status": "ok", "final": action["content"], "memory": state.short_memory}

            if action.get("type") == "tool":
                tool = action["tool"]
                args = action["args"]
                try:
                    result = TOOLS[tool].fn(**args)
                except Exception as e:
                    result = {"tool_error": type(e).__name__, "message": str(e)[:200]}
                obs = {"last_tool": tool, "last_result": result}
                continue

            return {"status": "error", "error": "unknown_action_type", "memory": state.short_memory}

        return {"status": "error", "error": "max_turns_exceeded", "memory": state.short_memory}

    def reason(self, state: AgentState, obs: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError()

class RagAgent(BaseAgent):
    def reason(self, state: AgentState, obs: Dict[str, Any]) -> Dict[str, Any]:
        if obs.get("last_tool") == "rag_search":
            passages = obs["last_result"]["passages"]
            context = "\n".join([f"[{p['id']}] {p['text']}" for p in passages])
            prompt = (
                f"PREGUNTA:\n{state.goal}\n\n"
                f"CONTEXTO:\n{context}\n\n"
                "Responde SOLO con información del contexto y añade citas [id].\n"
                "Si falta evidencia, di: 'No tengo evidencia suficiente en el contexto recuperado.'\n"
            )
            ans = LLM.generate(prompt, system="Eres un asistente RAG.", max_new_tokens=220)
            return {"type": "final", "content": ans}
        return {"type": "tool", "tool": "rag_search", "args": {"query": state.goal, "k": 3}}

rag_agent = RagAgent("rag_agent")
rag_agent.run("Explica cómo RAG reduce alucinaciones y qué costo extra añade.", max_turns=3)

#### **Guardrails (validación + límites + auditoría)**

Se muestran guardrails mínimos:
- Validación de argumentos (Pydantic)
- Policy gate (reglas)

In [None]:
from typing import Any, Dict, Tuple

from pydantic import BaseModel, Field, ValidationError  # type: ignore

# -------------------------
# Modelos de validación
# -------------------------

class FlightArgs(BaseModel):
    departure_city: str = Field(min_length=3, max_length=3, description="Código IATA (ej: LAX)")
    destination_city: str = Field(min_length=3, max_length=3, description="Código IATA (ej: JFK)")
    num_options: int = Field(default=3, ge=1, le=10)

class HotelArgs(BaseModel):
    city: str = Field(min_length=2, max_length=80)
    num_options: int = Field(default=3, ge=1, le=10)

class RagArgs(BaseModel):
    query: str = Field(min_length=5, max_length=800)
    k: int = Field(default=3, ge=1, le=8)

class CalcArgs(BaseModel):
    expression: str = Field(min_length=1, max_length=80)

# -------------------------
# Políticas (guardrails)
# -------------------------

def policy_allow(tool_name: str, args: Dict[str, Any]) -> Tuple[bool, str]:
    # Ejemplo 1: regla obvia (evitar consulta sin sentido)
    if tool_name == "flight_lookup" and args.get("departure_city") == args.get("destination_city"):
        return False, "Policy: departure_city must differ from destination_city."

    # Ejemplo 2: evitar abuso de contexto (RAG)
    if tool_name == "rag_search" and int(args.get("k", 3)) > 8:
        return False, "Policy: k too large for rag_search."

    # Ejemplo 3: (defensa adicional) negar expresiones sospechosas en calculator
    if tool_name == "calculator":
        exp = str(args.get("expression", ""))
        if any(tok in exp for tok in ["__", "import", "open", "eval", "exec"]):
            return False, "Policy: expression contains forbidden tokens."

    return True, "ok"

# -------------------------
# Ejecución guardada
# -------------------------

def guarded_tool_execute(tool_name: str, args: Dict[str, Any]) -> Any:
    # 1) Policy gating
    ok, msg = policy_allow(tool_name, args)
    if not ok:
        raise ToolError(msg)

    # 2) Validación por herramienta
    try:
        if tool_name == "flight_lookup":
            args = FlightArgs(**args).model_dump()
        elif tool_name == "hotel_lookup":
            args = HotelArgs(**args).model_dump()
        elif tool_name == "rag_search":
            args = RagArgs(**args).model_dump()
        elif tool_name == "calculator":
            # valida longitud (la validación fuerte de caracteres está en la tool)
            args = CalcArgs(**args).model_dump()
    except ValidationError as e:
        raise ToolError(f"Args validation failed: {e.errors()[:1]}")

    # 3) Ejecución real (tool registry)
    return TOOLS[tool_name].fn(**args)

# Activación: el loop de tool-calling usará TOOL_EXECUTOR (definido en la sección de tools).
# Esto mantiene el loop estable y permite encender/apagar guardrails sin reescribirlo.
TOOL_EXECUTOR = guarded_tool_execute  # type: ignore[name-defined]

# Demo rápido: debe bloquearse por política
try:
    guarded_tool_execute("flight_lookup", {"departure_city":"LAX","destination_city":"LAX","num_options":3})
except Exception as e:
    print("Bloqueado:", e)


#### **Docker/offline (sin credenciales)**

Recomendación:
- Monta un volumen para caches (`HF_HOME` / `TRANSFORMERS_CACHE`) y mantén offline.
- Pre-descarga modelos una vez (si aplica) y luego ejecuta con `local_files_only=True` (ya está así en este cuaderno).

#### **Patrones de workflow: secuencial, branching y paralelo**

- **Secuencial**: pasos lineales (fácil de auditar, ideal en entornos regulados).
- **Branching**: bifurcar según condición (por ejemplo,  intención del usuario, confianza, presencia de contexto).
- **Paralelo**: ejecutar tareas independientes concurrentemente (p.ej. detección de idioma + extracción de entidades).

Ejemplo didáctico: detectar `intent` y ejecutar ruta correspondiente.

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed

def detect_language(text: str) -> str:
    if re.search(r"\b(el|la|los|las|pero|porque|entonces)\b", text.lower()):
        return "es"
    return "en"

def classify_intent(text: str) -> str:
    if re.search(r"\b(vuelo|flight)\b", text, re.IGNORECASE):
        return "travel_flight"
    if re.search(r"\b(hotel|alojamiento)\b", text, re.IGNORECASE):
        return "travel_hotel"
    if re.search(r"\b(rag|retriev)\b", text, re.IGNORECASE):
        return "rag_qa"
    return "general_qa"

def route_request(user_text: str) -> dict:
    with ThreadPoolExecutor(max_workers=2) as ex:
        futures = {ex.submit(detect_language, user_text): "lang",
                   ex.submit(classify_intent, user_text): "intent"}
        meta = {}
        for f in as_completed(futures):
            meta[futures[f]] = f.result()

    intent = meta["intent"]
    if intent in ("travel_flight", "travel_hotel"):
        return {"route": intent, "meta": meta, "result": tool_call_loop(user_text, system_toolcaller, max_steps=3)}
    if intent == "rag_qa":
        return {"route": intent, "meta": meta, "result": rag_answer(user_text, k=3)}
    # fallback: LLM directo
    ans = LLM.generate(f"Responde con máximo 3 viñetas:\n{user_text}", system="Eres un asistente técnico.", max_new_tokens=180)
    return {"route": intent, "meta": meta, "result": {"answer": ans}}

route_request("¿Cómo reduce RAG las alucinaciones y qué costo extra introduce?")

#### **Integración con APIs y microservicios (local)**

En escenarios reales, "tools" suelen estar detrás de HTTP/gRPC.  
A continuación se levanta (opcional) un microservicio local con FastAPI que expone:

- `/flight` (usa `flight_lookup`)
- `/hotel` (usa `hotel_lookup`)
- `/rag_search` (retrieval local)

El objetivo es mostrar **tool use** vía red, sin credenciales externas.

#### **Observabilidad del workflow (logs, métricas, trazas) - local**

**Logs**
- Incluir `trace_id`, `step`, `tool`, `latency_ms`, tamaños (`prompt_chars`, `context_chars`).

**Métricas (Prometheus client)**
- Contadores: tool calls por tool y status
- Histograma: latencia de LLM y retrieval

**Trazas (OpenTelemetry)**
- Spans por etapa: preprocess, retrieve, generate, validate

A continuación, una instrumentación mínima local (sin `collectors` externos).

In [None]:
# Nota: si no tienes fastapi/uvicorn, instala:
# !pip install -U fastapi uvicorn

import threading

def start_tool_server(host="127.0.0.1", port=8088):
    from fastapi import FastAPI
    from pydantic import BaseModel
    import uvicorn

    app = FastAPI(title="Local Tools API")

    class FlightReq(BaseModel):
        departure_city: str
        destination_city: str
        num_options: int = 3

    class HotelReq(BaseModel):
        city: str
        num_options: int = 3

    class RagReq(BaseModel):
        query: str
        k: int = 4

    @app.post("/flight")
    def _flight(req: FlightReq):
        return flight_lookup(req.departure_city, req.destination_city, req.num_options)

    @app.post("/hotel")
    def _hotel(req: HotelReq):
        return hotel_lookup(req.city, req.num_options)

    @app.post("/rag_search")
    def _rag(req: RagReq):
        return rag_search(req.query, req.k)

    config = uvicorn.Config(app, host=host, port=port, log_level="warning")
    server = uvicorn.Server(config)

    th = threading.Thread(target=server.run, daemon=True)
    th.start()
    return {"host": host, "port": port, "thread": th}

# Descomenta para iniciar:
# srv = start_tool_server()
# srv

In [None]:
# Cliente HTTP (si levantaste el server arriba)
def http_post(url: str, payload: dict) -> dict:
    r = requests.post(url, json=payload, timeout=5.0)
    r.raise_for_status()
    return r.json()

# Ejemplo (requiere server):
# http_post("http://127.0.0.1:8088/rag_search", {"query":"costo extra de RAG", "k":3})

In [None]:
# Nota: si no tienes prometheus_client / opentelemetry, instala:
# !pip install -U prometheus-client opentelemetry-api opentelemetry-sdk

import logging

logging.basicConfig(level=logging.INFO, format="%(message)s")
log = logging.getLogger("obs")

# Prometheus (opcional). Este bloque está escrito para poder re-ejecutarse en Jupyter
# sin lanzar "Duplicated timeseries".
try:
    from prometheus_client import Counter, Histogram, REGISTRY  # type: ignore

    def _get_or_create_collector(name: str, factory):
        existing = getattr(REGISTRY, "_names_to_collectors", {}).get(name)  # type: ignore[attr-defined]
        return existing if existing is not None else factory()

    WF_RUNS = _get_or_create_collector(
        "wf_runs_total",
        lambda: Counter("wf_runs_total", "Ejecuciones de workflows", ["name"]),
    )
    TOOL_CALLS = _get_or_create_collector(
        "tool_calls_total",
        lambda: Counter("tool_calls_total", "Llamadas a tools", ["tool", "status"]),
    )
    LLM_LAT = _get_or_create_collector(
        "llm_latency_seconds",
        lambda: Histogram("llm_latency_seconds", "Latencia de LLM (s)", buckets=(0.1,0.3,0.7,1.5,3,6,12)),
    )
    RETR_LAT = _get_or_create_collector(
        "retrieval_latency_seconds",
        lambda: Histogram("retrieval_latency_seconds", "Latencia de retrieval (s)", buckets=(0.01,0.03,0.1,0.3,0.7,1.5)),
    )

except Exception:
    # Fallback: métricas no-op (para que el notebook no falle si falta prometheus_client)
    class _Noop:
        def labels(self, *args, **kwargs): return self
        def inc(self, *args, **kwargs): return None
        def observe(self, *args, **kwargs): return None

    WF_RUNS = _Noop()
    TOOL_CALLS = _Noop()
    LLM_LAT = _Noop()
    RETR_LAT = _Noop()

def obs_llm(prompt: str, system: str) -> str:
    t0 = time.time()
    try:
        return LLM.generate(prompt, system=system, max_new_tokens=220)
    finally:
        try:
            LLM_LAT.observe(time.time() - t0)
        except Exception:
            pass

def obs_retrieve(query: str, k: int = 3):
    t0 = time.time()
    try:
        return sparse_retrieve(query, k=k)
    finally:
        try:
            RETR_LAT.observe(time.time() - t0)
        except Exception:
            pass

def traced_rag_answer(query: str, k: int = 3) -> dict:
    trace_id = f"tr_{int(time.time()*1000)}"
    try:
        WF_RUNS.labels("rag").inc()
    except Exception:
        pass

    passages = obs_retrieve(query, k=k)
    context = build_context(passages)

    log.info(json.dumps({"event":"retrieve", "trace_id":trace_id, "k":k, "hits":len(passages), "context_chars":len(context)}))
    prompt = f"PREGUNTA:\n{query}\n\nCONTEXTO:\n{context}\n\nRESPUESTA (con citas):\n"
    ans = obs_llm(prompt, system="Eres un asistente RAG.")
    log.info(json.dumps({"event":"generate", "trace_id":trace_id, "answer_chars":len(ans)}))
    return {"trace_id": trace_id, "answer": ans, "context": context}

traced_rag_answer("¿Qué componentes tiene un workflow de LLM?", k=3)

#### **Workflows multi-agente (Manager + Workers + Critic)**

Patrón didáctico:

- **Worker RAG**: obtiene contexto + redacta respuesta con citas.
- **Critic/Reviewer**: valida grounding ("¿hay citas?", "¿afirma sin evidencia?"), y pide correcciones.
- **Manager**: orquesta.

El objetivo es mostrar *separación de responsabilidades* y auditoría.

In [None]:
class Critic:
    def review(self, draft: str) -> str:
        prompt = (
            "Revisa el borrador para:\n"
            "- evitar alucinaciones\n"
            "- exigir citas si afirma hechos\n"
            "- señalar incertidumbre si falta evidencia\n\n"
            f"BORRADOR:\n{draft}\n\n"
            "Devuelve OK o una lista de problemas y cómo corregirlos."
        )
        return LLM.generate(prompt, system="Eres un revisor estricto.", max_new_tokens=220)

class Manager:
    def __init__(self, max_corrections: int = 2):
        self.max_corrections = max_corrections
        self.critic = Critic()

    def run(self, question: str) -> dict:
        # Worker: RAG con contexto explícito para evitar inventar durante correcciones.
        base = rag_answer(question, k=3)
        draft = base["answer"]
        context = base["context"]

        last_review = ""
        for i in range(self.max_corrections + 1):
            last_review = self.critic.review(draft)
            if last_review.strip().upper().startswith("OK"):
                return {"draft": draft, "review": last_review, "iterations": i + 1}

            revise_prompt = f"""Corrige el borrador siguiendo el feedback.

Reglas:
- Usa SOLO el CONTEXTO (no agregues hechos externos).
- Mantén o mejora las citas [doc#chunk] cuando hagas afirmaciones.
- Si falta evidencia, dilo explícitamente.

FEEDBACK:
{last_review}

CONTEXTO:
{context}

BORRADOR:
{draft}

BORRADOR CORREGIDO:
"""
            draft = LLM.generate(revise_prompt, system="Eres un editor técnico estricto.", max_new_tokens=260)

        return {"draft": draft, "review": last_review, "iterations": self.max_corrections + 1}

mgr = Manager()
mgr.run("Explica cómo RAG reduce alucinaciones y cómo afecta el costo, con citas.")

#### **Razonamiento sobre contexto recuperado (deductivo, inductivo, abductivo)**

Una práctica útil en clases es pedir al agente que separe explícitamente:

- **Evidencia**: citas [doc#chunk] (lo que está en el contexto)
- **Inferencias deductivas**: consecuencias lógicas
- **Inferencias inductivas**: generalizaciones (marcar incertidumbre)
- **Inferencias abductivas**: mejor explicación plausible (marcar especulación)

Ejemplo de prompt estructurado:

In [None]:
def rag_reasoning_template(question: str) -> dict:
    passages = sparse_retrieve(question, k=3)
    context = build_context(passages)
    system = "Eres un asistente técnico. No inventes."
    prompt = (
        f"PREGUNTA:\n{question}\n\n"
        f"CONTEXTO:\n{context}\n\n"
        "Produce una respuesta con estas secciones:\n"
        "1) Evidencia (citas)\n"
        "2) Razonamiento deductivo\n"
        "3) Razonamiento inductivo (marca incertidumbre)\n"
        "4) Razonamiento abductivo (marca especulación)\n"
    )
    out = LLM.generate(prompt, system=system, max_new_tokens=260)
    return {"context": context, "answer": out}

rag_reasoning_template("¿Qué relación hay entre RAG y mitigación de alucinaciones?")

#### **RAG e impacto en optimización de inferencia (costo extra) y técnicas para abaratar**

RAG añade costos de:
- embeddings
- búsqueda
- contexto mayor (más tokens)

Mitigaciones típicas:
- caching (query embeddings, retrieval results)
- bajar k + híbrido
- dos etapas (retrieve -> rerank solo si es necesario)
- chunking adecuado
- embeddings pequeños/cuantiizados

Ejemplo mínimo: cache del retrieval sparse.

In [None]:
from functools import lru_cache

@lru_cache(maxsize=256)
def cached_sparse(query: str, k: int = 3):
    return tuple(sparse_retrieve(query, k=k))

t0 = time.time(); _ = cached_sparse("costo extra de rag", 3); t1 = time.time()
t2 = time.time(); _ = cached_sparse("costo extra de rag", 3); t3 = time.time()

{"first_ms": (t1-t0)*1000, "cached_ms": (t3-t2)*1000}

#### **Ejercicios propuestos para el cuaderno**

**E1. Instrumentación mínima del workflow (traza útil)**

**Tarea:** extiende `Workflow.run()` para registrar también:

* `input_keys`, `output_keys`
* `prompt_chars` (antes de LLM) y `output_chars` (después)
* `error` si un step falla (sin romper toda la ejecución)

**Criterio de éxito:** `ctx["trace"]` contiene eventos `start/end/error` por step y el workflow retorna un `ctx` válido aun con error.

**E2. Branching: ruta "RAG obligatorio" vs "LLM directo"**

**Tarea:** implementa un step `route()` que:

* si la pregunta contiene patrones de "hecho" (por ejemplo, "¿qué costo...?", "¿qué afirma...?") obligue RAG
* si es "creativo" o "resumen de estilo" permita LLM directo

**Criterio de éxito:** imprime `route=rag` o `route=direct` y la salida se genera por la ruta correcta.

**E3. Paralelo: extracción de metadatos + retrieval**

**Tarea:** ejecuta en paralelo:

1. `classify_intent(text)`
2. `sparse_retrieve(query,k)`
   y luego combina resultados en un solo `ctx`.

**Criterio de éxito:** el tiempo total medido es menor (o igual) que hacerlo secuencialmente y `ctx` contiene `intent` y `passages`.

**E4. Prompting multi-step: plan -> respuesta -> verificación**

**Tarea:** crea 3 steps:

1. `plan_step`: produce una lista corta de pasos (máx. 5)
2. `answer_step`: responde usando el plan
3. `verify_step`: revisa que haya citas si se usó RAG y que no existan afirmaciones sin evidencia

**Criterio de éxito:** el output final incluye (a) plan, (b) respuesta, (c) verificación "OK" o lista de problemas.

**E5. Tool routing: relacional vs vectorial vs API**

**Tarea:** implementa una función `choose_tools(question)` que devuelva una lista ordenada de tools candidatas:

* DB relacional (simulada): para consultas con estructura ("estado=", "id=", "fecha=")
* vector DB (tu `rag_search`): para preguntas semánticas
* API interna (FastAPI local): para acciones (flight/hotel)

**Criterio de éxito:** para 6 preguntas de prueba, el router elige razonablemente la(s) herramienta(s).

**E6. Guardrails: validación fuerte + allowlist + budgets**

**Tarea:** añade:

* allowlist por tool (`{"rag_search","calculator","flight_lookup","hotel_lookup"}`)
* límite de pasos del agente + **límite de herramientas por tipo** (por ejemplo,  máximo 2 llamadas a APIs por turno)
* validación Pydantic para al menos 2 tools (por ejemplo, `rag_search`, `calculator`)

**Criterio de éxito:** si el LLM pide una tool no permitida o excede límites, el agente retorna una respuesta segura (rechazo con motivo) y queda auditado.

**E7. Observabilidad: métricas Prometheus que no rompan al re-ejecutar**

**Tarea:** implementa un helper `get_metric(name, factory)` que:

* si la métrica ya existe en el registry, reutilice la existente
* si no existe, la cree

**Criterio de éxito:** puedes re-ejecutar la celda de métricas sin error y los contadores siguen incrementándose.

**E8. RAG con citas obligatorias y "no sé" controlado**

**Tarea:** modifica `rag_answer()` para:

* exigir que cada párrafo tenga al menos una cita `[doc#chunk]`, o bien devolver "No tengo evidencia suficiente..."
* agregar una sección "Evidencia usada:" listando los `chunk_id` utilizados

**Criterio de éxito:** en preguntas fuera del corpus, devuelve "No tengo evidencia...". En preguntas dentro, devuelve citas consistentes.

**E9. Razonamiento sobre contexto: separar evidencia vs inferencia**

**Tarea:** crea un prompt/plantilla que produzca 4 secciones:

1. Evidencia (solo citas)
2. Deductivo (derivable)
3. Inductivo (marcar incertidumbre)
4. Abductivo (marcar hipótesis)

**Criterio de éxito:** la salida marca explícitamente incertidumbre ("posiblemente", "hipótesis") y nunca presenta abductivo como hecho.

**E10. Memoria de largo plazo (simple) con TTL y metadatos**

**Tarea:** implementa un `MemoryStore` local con:

* `put(item, ttl_seconds, tags={...})`
* `search(query, k)` (puede ser TF-IDF sobre textos guardados)
* limpieza de expirados

**Criterio de éxito:** lo que expira deja de recuperarse; lo vigente sí aparece y se cita como "memoria".

**E11. Multi-agente: Manager + Worker(RAG) + Critic con corrección iterativa**

**Tarea:** el `Critic` debe:

* rechazar si no hay citas cuando corresponde
* pedir "otra pasada de retrieval" si scores son bajos
  El `Manager` re-ejecuta el Worker hasta 2 veces.

**Criterio de éxito:** se ve un bucle de corrección: `draft1 -> review -> draft2 (mejorado)`.

**E12. Costos de RAG: presupuesto de contexto + caching**

**Tarea:** agrega un "presupuesto" `max_context_chars`:

* si el contexto supera el presupuesto, reduce k o trunca por chunks con mayor score
* cachea retrieval por query, con invalidación por "versión de corpus"

**Criterio de éxito:** el contexto nunca excede el límite y el caching reduce latencia en la segunda ejecución.



In [None]:
## Tus respuestas