In [None]:
from src.core.settings import configure_langchain_environment
import os

configure_langchain_environment()

print(os.getenv("LANGCHAIN_PROJECT"))

## Definir modelo 

In [None]:
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

OPENAI_MODEL = "gpt-4o-mini"
TEMPERATURE = 0.5

model = ChatOpenAI(
    temperature=TEMPERATURE, 
    model=OPENAI_MODEL
)

## Prueba de agentes con Tools

In [None]:
from langchain_tavily import TavilySearch
tools = [TavilySearch(max_results=5)]

prompt = "Eres un experto en repuestos de motocicletas en Colombia."
agent_executor = create_react_agent(model, tools, prompt=prompt)

response = model.invoke("Hola, para una Honda CB190R cuales son las medidas de las llantas")
response.pretty_print()

## Tools para MundiBot

### Tools para maneo de catalogos

In [None]:
from typing import Optional, List, Dict, Union
from langchain_core.tools import tool

# Tu instancia global del catálogo
from src.core.domains.products.catalogs import get_global_catalogs
catalogs = get_global_catalogs()


# ============== LISTADOS BÁSICOS ==============

@tool
def obtener_marcas() -> List[str]:
    """
    Devuelve la lista cerrada de marcas de motos disponibles en el catálogo (no llantas).
    Úsala para validar/mostrar opciones cuando el usuario menciona o necesita una marca de moto.

    Returns:
        List[str]: Marcas disponibles tal como aparecen en el catálogo.
    """
    return catalogs.get_marcas()


@tool
def obtener_marcas_llantas() -> List[str]:
    """
    Devuelve la lista cerrada de marcas de llantas disponibles.
    Úsala al tratar llantas para validar o sugerir marcas válidas.

    Returns:
        List[str]: Marcas de llantas tal como aparecen en el catálogo.
    """
    return catalogs.get_marcas_llantas()

@tool
def obtener_categorias_full() -> List[Dict]:
    """
    Devuelve el catálogo completo de categorías con descripciones y subcategorías (estructura completa).
    Úsala si necesitas contexto amplio para sugerir o validar.

    Returns:
        List[Dict]: Estructura típica:
            [
              {
                "name": "FRENOS",
                "description": "...",
                "subcategorias": ["PASTILLAS DE FRENO","DISCO DE FRENO", ...]
              },
              ...
            ]
    """
    return catalogs.get_categorias_con_descripciones_y_subcategorias()

@tool
def obtener_modelos(marca: Optional[str] = None) -> Union[List[str], Dict[str, List[str]]]:
    """
    Devuelve modelos válidos desde el catálogo:
      - Si `marca` está definida → retorna List[str] con los modelos válidos de esa marca.
      - Si `marca` es None → retorna Dict[str, List[str]] con *todas* las marcas y sus modelos.

    Útil cuando el usuario da el modelo sin marca, o para sugerir modelos de una marca detectada.

    Args:
        marca (Optional[str]): Nombre exacto de la marca (motos o llantas), o None para traer todas.

    Returns:
        Union[List[str], Dict[str, List[str]]]
    """
    if marca:
        return catalogs.get_modelos_por_marca(marca)
    return catalogs.get_modelos()

### Tool para buscar productos

In [None]:
import json
import logging
from typing import Dict, List, Optional, Literal, Union
from pydantic import BaseModel, Field
from langchain_core.tools import tool

from src.core.domains.products.services import ProductSearchService

logger = logging.getLogger(__name__)
_search_service: Optional[ProductSearchService] = None

# ------------------ Listas cerradas  ------------------

ALLOWED_CATEGORIAS = set(catalogs.get_categorias())
ALLOWED_MARCAS_MOTOS = set(catalogs.get_marcas())
ALLOWED_MARCAS_LLANTAS = set(catalogs.get_marcas_llantas())
ALLOWED_MARCAS = ALLOWED_MARCAS_MOTOS | ALLOWED_MARCAS_LLANTAS

def _marcas_description() -> str:
    motos = ", ".join(sorted(ALLOWED_MARCAS_MOTOS))
    llantas = ", ".join(sorted(ALLOWED_MARCAS_LLANTAS))
    return (
        "Marcas permitidas (lista cerrada, insensible a mayúsculas). "
        f"Motos: {motos}. "
        f"Llantas: {llantas}."
    )

def _normalize_str(s: str) -> str:
    return (s or "").strip().upper()

def _filter_allowed(values: Optional[List[str]], allowed: set) -> List[str]:
    if not values:
        return []
    normed = [_normalize_str(v) for v in values if v]
    return [v for v in normed if v in allowed]


# ------------------ Tipos de entrada ------------------

class BuscarProductosArgs(BaseModel):
    """
    Úsala cuando el usuario pida ver repuestos/llantas y tengas:
      - 'consulta' y, opcionalmente, filtros válidos (marcas | categorias | tipo_repuesto | precio_max).

    Reglas para `consulta` (texto libre estructurado):

    Repuestos motos
      "<TITULO REPUESTO>  <MARCA principal si la hay opcional> <MODELO si existe> <ORIGINAL|GENERICO opcional>"
      Ejemplos:
        "Disco Freno Delantero Xtz 150 Yamaha Original"
        "Balinera 6902U Polea Fija Dynamic 125 Akt"
        "Bujia BP8HS Japon (NGK) - Generico"
        "CALCO MANIGUETA KIT PULSAR BLACK - Marca: BAJAJ"
        "Kit de Empaques YAMAHA Dt 175K Nacional Generico"
        "Buje de Tijera (14X20X529) Cb 190R Honda"
        "Reten Bomba de Agua Downtown 300I Kymco"

    Llantas
      "<TITULO REPUESTO>  <MARCA principal si la hay opcional>  <ALTURA/ANCHO - RIN (dimensiones si las hay)> <DELANTERA | TRASERA si la hay>"
      Ejemplos:
        "Llanta Metzeler Roadtec 01 130/70-17 Trasera Sellomatic Original"
        "Llanta Kontrol Knt311 120/70-12 Delantera Sellomatic"
        "Llanta Michelin Road 6 110/70Zr-17 Delantera Sellomatic"

    Notas:
      - El texto puede incluir modelo de moto y/o tipo (Original/Genérico) si aplica.
      - Los filtros `marcas` y `categorias` deben provenir de listas cerradas (ver descripciones) para que apliquen.
    """
    consulta: str = Field(
        ...,
        min_length=3,
        max_length=200,
        description=(
            "Consulta en lenguaje natural siguiendo los formatos anteriores. "
            "Incluye título del repuesto; opcionalmente marca principal, modelo y tipo (Original|Generico)."
        ),
    )
    marcas: Optional[List[str]] = Field(
        default=None,
        description=_marcas_description(),
    )
    categorias: Optional[List[str]] = Field(
        default=None,
        description=(
            "Categorías permitidas (lista cerrada, insensible a mayúsculas): "
            "ACCESORIOS, CARROCERIA, ELECTRICO, FILTROS, FRENOS, LLANTAS, "
            "LUBRICANTES, MOTOR, SUSPENSION, TRANSMISION"
        ),
    )
    tipo_repuesto: Optional[Literal["ORIGINAL", "GENERICO"]] = Field(
        default=None, description="Tipo de repuesto (si aplica). Valores: ORIGINAL | GENERICO."
    )
    precio_max: Optional[float] = Field(
        default=None, ge=0, description="Precio máximo en COP."
    )
    limite_resultados: int = Field(
        default=10, ge=1, le=20, description="Nº máximo a mostrar."
    )
    formato: Literal["dict", "json"] = Field(
        default="dict", description="Respuesta 'dict' (recomendado) o 'json'."
    )


# ------------------ Infra ------------------

def get_search_service() -> ProductSearchService:
    """Lazy singleton del servicio de búsqueda."""
    global _search_service
    if _search_service is None:
        _search_service = ProductSearchService(collection_name="repuesto_motos_mundibot")
    return _search_service


# ------------------ Helpers de salida ------------------

def _dedup_by_id(items: List[Dict]) -> List[Dict]:
    seen, out = set(), []
    for it in items:
        pid = it.get("id") or (it.get("payload") or {}).get("id")
        if pid in seen:
            continue
        seen.add(pid)
        out.append(it)
    return out

def _format_results(results, limite: int, applied_filters: Dict, filter_warnings: List[str]) -> Dict:
    """
    Normaliza a un dict JSON-safe:
    {
      meta: {encontrados, mostrados, hay_mas, applied_filters, mensaje?, sugerencia?, warnings?},
      productos: [{...}],
      facets: {marcas: {marca:count}, categorias: {categoria:count}}
    }
    """
    if hasattr(results, "results"):
        raw_items = [{"id": r.id, "score": r.score, **(r.payload or {})} for r in (results.results or [])]
    elif isinstance(results, dict):
        raw_items = results.get("items", [])
    else:
        raw_items = []

    raw_items = _dedup_by_id(raw_items)
    total = len(raw_items)

    productos = []
    facet_brands, facet_cats = {}, {}

    for item in raw_items[:limite]:
        marca = item.get("marca") or ""
        categoria = item.get("categoria") or ""
        subcat = item.get("subcategoria") or ""  # se muestra si viene en payload

        if marca:
            facet_brands[marca] = facet_brands.get(marca, 0) + 1
        if categoria:
            facet_cats[categoria] = facet_cats.get(categoria, 0) + 1

        prod = {
            "id": item.get("id"),
            "titulo": item.get("titulo", "Sin título"),
            "marca": marca,
            "categoria": categoria,
            "subcategoria": subcat,
            "precio": item.get("precio"),
            "moneda": item.get("moneda") or "COP",
            "url": item.get("url", ""),
            "imagen": item.get("imagen", ""),
            "score": round(item.get("score", 0), 3) if item.get("score") else None,
            "descripcion": (item.get("descripcion") or "")[:240],
        }
        if item.get("modelos_lista"):
            prod["modelos_compatibles"] = item["modelos_lista"][:6]
        if item.get("dimensiones"):
            # Solo salida, ya no se reciben como filtro
            prod["dimensiones"] = item["dimensiones"]
        if item.get("tipo_repuesto"):
            prod["tipo"] = item["tipo_repuesto"]

        productos.append(prod)

    out = {
        "meta": {
            "encontrados": total,
            "mostrados": len(productos),
            "hay_mas": total > limite,
            "applied_filters": applied_filters or {},
        },
        "productos": productos,
        "facets": {
            "marcas": facet_brands,
            "categorias": facet_cats,
        },
    }

    if filter_warnings:
        out["meta"]["warnings"] = filter_warnings

    if total == 0:
        out["meta"]["mensaje"] = "No se encontraron productos con los criterios dados."
        out["meta"]["sugerencia"] = (
            "Revisa que las marcas/categorías pertenezcan a las listas cerradas o amplía la consulta."
        )
    elif total > limite:
        out["meta"]["mensaje"] = f"Se encontraron {total} productos, mostrando los primeros {limite}."

    return out


# ------------------ Tool ------------------

@tool(args_schema=BuscarProductosArgs)
async def buscar_productos(
    consulta: str,
    marcas: Optional[List[str]] = None,
    categorias: Optional[List[str]] = None,
    tipo_repuesto: Optional[str] = None,
    precio_max: Optional[float] = None,
    limite_resultados: int = 10,
    formato: Literal["dict", "json"] = "dict",
) -> Union[Dict, str]:
    """
    Búsqueda de productos (motor híbrido: vector + metadatos).

    Requisitos:
      - 'consulta' siguiendo los formatos definidos (Repuestos/Llantas).
      - Filtros opcionales: 'marcas', 'categorias', 'tipo_repuesto', 'precio_max'.
        * 'marcas' y 'categorias' deben pertenecer a sus listas cerradas (ver descripción del esquema).
          Entradas no válidas serán ignoradas.

    Devuelve:
      - meta: {encontrados, mostrados, hay_mas, applied_filters, mensaje?, sugerencia?, warnings?}
      - productos: [{id, titulo, marca, categoria, subcategoria, precio, moneda, url, imagen, score, modelos_compatibles?, dimensiones?, tipo?}]
      - facets: {marcas: {marca:count}, categorias: {categoria:count}}
    """
    try:
        # Normalización/validación de listas cerradas
        filter_warnings: List[str] = []

        marcas_validas = _filter_allowed(marcas, ALLOWED_MARCAS)
        if marcas and not marcas_validas:
            filter_warnings.append(
                "Las 'marcas' provistas no pertenecen a la lista cerrada; se ignoraron."
            )

        categorias_validas = _filter_allowed(categorias, ALLOWED_CATEGORIAS)
        if categorias and not categorias_validas:
            filter_warnings.append(
                "Las 'categorias' provistas no pertenecen a la lista cerrada; se ignoraron."
            )

        # Filtros limpios (no vacíos)
        applied_filters: Dict = {}
        if marcas_validas:
            applied_filters["marcas"] = marcas_validas
        if categorias_validas:
            applied_filters["categorias"] = categorias_validas
        if tipo_repuesto:
            applied_filters["tipo_repuesto"] = _normalize_str(tipo_repuesto)
        if precio_max is not None:
            applied_filters["precio_max"] = precio_max

        logger.info(
            f"[buscar_productos] consulta='{consulta}' "
            f"marcas={applied_filters.get('marcas')} categorias={applied_filters.get('categorias')} "
            f"tipo={applied_filters.get('tipo_repuesto')} precio_max={precio_max}"
        )

        service = get_search_service()

        results = await service.search_products(
            query=consulta,
            search_type="hybrid",
            marcas=applied_filters.get("marcas"),
            categorias=applied_filters.get("categorias"),
            tipo_repuesto=applied_filters.get("tipo_repuesto"),
            precio_max=applied_filters.get("precio_max"),
            limit=limite_resultados * 2,  # buscamos un poco más para tener opciones
        )

        payload = _format_results(results, limite_resultados, applied_filters, filter_warnings)
        return json.dumps(payload, ensure_ascii=False, indent=2) if formato == "json" else payload

    except Exception as e:
        logger.exception(f"[buscar_productos] Error: {e}")
        error_payload = {
            "meta": {
                "encontrados": 0, "mostrados": 0, "hay_mas": False,
                "applied_filters": {},
                "mensaje": "Error temporal en la búsqueda.",
                "sugerencia": "Verifica marcas/categorías (listas cerradas) o ajusta la consulta y el precio máximo.",
            },
            "productos": [],
            "facets": {"marcas": {}, "categorias": {}},
        }
        return json.dumps(error_payload, ensure_ascii=False, indent=2) if formato == "json" else error_payload

In [None]:
await buscar_productos.ainvoke({
    "consulta": "Pastilas de freno Honda CB190R",
    "precio_max": 600000,
    "limite_resultados": 8
})

### Registro de Tools

In [None]:
from langchain_tavily import TavilySearch

TOOLS = [
    obtener_marcas,
    obtener_modelos,
    obtener_marcas_llantas,
    obtener_categorias_full,
    buscar_productos,
    TavilySearch(max_results=5) # Nombre de la tool es: tavily_search
]

In [None]:
obtener_categorias_full.invoke({})

## Agente 

### Prompt

In [None]:
SYSTEM_PROMPT = """
  Aquí se pone el prompt base. Por motivos de confidencialidad, no se muestra éste en este repositorio.
"""

In [None]:
from langchain_core.prompts import ChatPromptTemplate

assistant_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", SYSTEM_PROMPT),
        ("placeholder", "{messages}"),
    ]
)


assistant_chain = assistant_prompt | model.bind_tools(TOOLS)

## Definir Grafo

### State

In [None]:
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages


class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

### Grafo

#### Utilities

In [None]:
from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda

from langgraph.prebuilt import ToolNode


def handle_tool_error(state) -> dict:
    error = state.get("error")
    tool_calls = state["messages"][-1].tool_calls
    return {
        "messages": [
            ToolMessage(
                content=f"Error: {repr(error)}\n please fix your mistakes.",
                tool_call_id=tc["id"],
            )
            for tc in tool_calls
        ]
    }


def create_tool_node_with_fallback(tools: list) -> dict:
    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)], exception_key="error"
    )


def _print_event(event: dict, _printed: set, max_length=1500):
    current_state = event.get("dialog_state")
    if current_state:
        print("Currently in: ", current_state[-1])
    message = event.get("messages")
    if message:
        if isinstance(message, list):
            message = message[-1]
        if message.id not in _printed:
            msg_repr = message.pretty_repr(html=True)
            if len(msg_repr) > max_length:
                msg_repr = msg_repr[:max_length] + " ... (truncated)"
            print(msg_repr)
            _printed.add(message.id)

In [None]:
from langchain_core.runnables import Runnable, RunnableConfig
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import tools_condition

class Assistant:
    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            result = self.runnable.invoke(state)
            # If the LLM happens to return an empty response, we will re-prompt it
            # for an actual response.
            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}


builder = StateGraph(State)
builder.add_node("assistant", Assistant(assistant_chain))
builder.add_node("tools", create_tool_node_with_fallback(TOOLS))
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
    "assistant",
    tools_condition,
)
builder.add_edge("tools", "assistant")

memory = InMemorySaver()

graph = builder.compile(
    checkpointer=memory,
)

In [None]:
from IPython.display import Image, display

try:
    display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
    pass

## Pruebas

In [None]:
import uuid

conv1_questions = [
    "Hola, busco pastillas de freno para mi moto CB 190 R",
    "Tambien busco llantas para mi moto, que me recomiendas?",
]

thread_id = str(uuid.uuid4())
config = {
    "configurable": {
        "thread_id": thread_id,
    },
    "recursion_limit": 10,  # útil para evitar loops largos
}

_printed = set()

async def run_demo():
    for question in conv1_questions:
        # LangGraph espera una LISTA de mensajes, no un solo tuple suelto
        input_state = {"messages": [("user", question)]}
        # astream -> async generator
        async for event in graph.astream(input_state, config, stream_mode="values"):
            _print_event(event, _printed)

In [None]:
await run_demo()

In [None]:
input_state = {"messages": [("user", "Si, ayudame con eso")]}
# astream -> async generator
async for event in graph.astream(input_state, config, stream_mode="values"):
    _print_event(event, _printed)

In [None]:
input_state = {"messages": [("user", "Que marcas de llantas tienes?")]}
# astream -> async generator
async for event in graph.astream(input_state, config, stream_mode="values"):
    _print_event(event, _printed)

In [None]:
input_state = {"messages": [("user", "quiero llantas delantera marca MICHELIN o PIRELLI")]}
# astream -> async generator
async for event in graph.astream(input_state, config, stream_mode="values"):
    _print_event(event, _printed) 