In [None]:
# RAG Quickstart – iceos-client
# Configure ICE_API_URL and ICE_API_TOKEN in your environment before running.
from __future__ import annotations

import asyncio
import json
import os

from ice_client import IceClient

ICE_API_URL = os.getenv("ICE_API_URL", "http://localhost:8000")
ICE_API_TOKEN = os.getenv("ICE_API_TOKEN", "dev-token")
print("API:", ICE_API_URL)



In [None]:
# Ingest a small sample via MCP memory_write_tool
async def ingest_sample(client: IceClient) -> None:
    init = await client._client.post(
        "/api/v1/mcp/",
        json={"jsonrpc": "2.0", "id": 0, "method": "initialize", "params": {}},
    )
    init.raise_for_status()
    text = "Paris is the capital of France."
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {
            "name": "tool:memory_write_tool",
            "arguments": {"inputs": {"key": "fact", "content": text, "scope": "kb"}},
        },
    }
    r = await client._client.post("/api/v1/mcp/", json=payload)
    r.raise_for_status()


In [None]:
# Run ChatKit Bundle and display result
async def run_rag() -> None:
    async with IceClient() as client:
        await ingest_sample(client)
        exec_id = await client.run(
            blueprint_id="chatkit.rag_chat",
            inputs={
                "query": "What is the capital of France?",
                "org_id": "demo_org",
                "user_id": "demo_user",
                "session_id": "s1",
            },
        )
        final = await client.poll_until_complete(exec_id, timeout=60)
        print(json.dumps(final, indent=2))

await run_rag()
