## list tools

In [8]:
import asyncio
import json
from fastmcp import Client
import os
import json
from fastmcp.client.transports import StreamableHttpTransport

# Port used to build the http://127.0.0.1 server URL below; read from the
# PORT environment variable, defaulting to "9000".
PORT = os.getenv("PORT", "9000")
# URL path appended after the port; read from the MCP_PATH environment
# variable, defaulting to "/mcp/knowledge".
MCP_PATH = os.getenv("MCP_PATH", "/mcp/knowledge")

async def example_list_tools():
    """Print the name of every tool reported by the MCP server.

    The server URL is built from ``PORT`` and ``MCP_PATH`` on 127.0.0.1, and
    the request carries a fixed ``x-forwarded-user`` header of ``"123"``.
    """
    endpoint = f"http://127.0.0.1:{PORT}{MCP_PATH}"
    transport = StreamableHttpTransport(
        endpoint,
        headers={"x-forwarded-user": "123"},
    )
    async with Client(transport=transport) as session:
        # Ping the server before asking for its tool list.
        await session.ping()

        for available_tool in await session.list_tools():
            print(available_tool.name)

await example_list_tools()

create_knowledge_set
list_knowledge_sets
delete_knowledge_set
ingest_file
list_files
remove_file
query


In [None]:

import asyncio
import json
from fastmcp import Client
import os
import json
from fastmcp.client.transports import StreamableHttpTransport

# Port used to build the http://127.0.0.1 server URL below; read from the
# PORT environment variable, defaulting to "9000".
PORT = os.getenv("PORT", "9000")
# URL path appended after the port; read from the MCP_PATH environment
# variable, defaulting to "/mcp/knowledge".
MCP_PATH = os.getenv("MCP_PATH", "/mcp/knowledge")

async def example_list_tools():
    """List the tools offered by the MCP server, printing one name per line.

    Uses the 127.0.0.1 URL built from ``PORT`` and ``MCP_PATH`` and sends
    ``"123"`` as the ``x-forwarded-user`` header.
    """
    request_headers = {"x-forwarded-user": "123"}
    http_transport = StreamableHttpTransport(
        f"http://127.0.0.1:{PORT}{MCP_PATH}",
        headers=request_headers,
    )
    async with Client(transport=http_transport) as mcp_client:
        await mcp_client.ping()

        # Fetch the available operations, then print each one's name.
        listed_tools = await mcp_client.list_tools()
        for entry in listed_tools:
            print(entry.name)

await example_list_tools()


In [5]:
# set the test user id
test_user_id = "test_user"

# Management

## list knowledge sets

In [6]:
async def example_list_knowledge_sets():
    """Call the ``list_knowledge_sets`` tool as ``test_user_id``.

    Returns:
        The JSON-decoded text of the first item in the tool result, or an
        empty list when the result is empty.
    """
    transport = StreamableHttpTransport(
        f"http://127.0.0.1:{PORT}{MCP_PATH}",
        headers={"x-forwarded-user": test_user_id},
    )
    async with Client(transport=transport) as session:
        await session.ping()
        response = await session.call_tool(
            name="list_knowledge_sets",
            arguments={},
        )
        # Nothing came back, so there is no payload to decode.
        if not response:
            return []
        return json.loads(response[0].text)


list_knowledge_sets = await example_list_knowledge_sets()
print(json.dumps(list_knowledge_sets, indent=4))

{
    "knowledge_set_id": "3a79892f-773b-44ac-97ca-6bc7cfbc4f73",
    "created_at": "2025-07-09T02:20:18.325431Z"
}


## create knowledge set

In [None]:
async def example_create_knowledge_set():
    """Call the ``create_knowledge_set`` tool as ``test_user_id``.

    Prints a confirmation line followed by the raw text of the first result
    item.

    Returns:
        The ``"knowledge_set_id"`` value from the JSON-decoded result text.
    """
    transport = StreamableHttpTransport(
        f"http://127.0.0.1:{PORT}{MCP_PATH}",
        headers={"x-forwarded-user": test_user_id},
    )
    async with Client(transport=transport) as session:
        await session.ping()
        response = await session.call_tool(
            name="create_knowledge_set",
            arguments={},
        )
        print("created knowledge set")
        raw_payload = response[0].text
        print(raw_payload)
        created = json.loads(raw_payload)
        return created["knowledge_set_id"]

knowledge_set_id = await example_create_knowledge_set()

## delete knowledge set

In [None]:
async def example_delete_knowledge_set(knowledge_set_id):
    """Call the ``delete_knowledge_set`` tool as ``test_user_id``.

    Prints a confirmation line and then the raw text of the first result
    item.

    Args:
        knowledge_set_id: Identifier passed to the tool as
            ``"knowledge_set_id"``.
    """
    transport = StreamableHttpTransport(
        f"http://127.0.0.1:{PORT}{MCP_PATH}",
        headers={"x-forwarded-user": test_user_id},
    )
    tool_arguments = {"knowledge_set_id": knowledge_set_id}
    async with Client(transport=transport) as session:
        await session.ping()
        response = await session.call_tool(
            name="delete_knowledge_set",
            arguments=tool_arguments,
        )
        print(f"deleted knowledge set {knowledge_set_id}")
        print(response[0].text)

await example_delete_knowledge_set(knowledge_set_id)

# Knowledge ingest, query, remove


In [None]:
res = await example_list_knowledge_sets()
if isinstance(res, list):
    test_knowledge_set_id = res[-1]["knowledge_set_id"]
else:
    test_knowledge_set_id = res["knowledge_set_id"]
test_knowledge_set_id

## list files

In [None]:
async def example_list_files():
    """Call the ``list_files`` tool for ``test_knowledge_set_id``.

    Returns:
        The JSON-decoded text of the first item in the tool result, or an
        empty list when the result is empty.
    """
    transport = StreamableHttpTransport(
        f"http://127.0.0.1:{PORT}{MCP_PATH}",
        headers={"x-forwarded-user": test_user_id},
    )
    async with Client(transport=transport) as session:
        await session.ping()
        response = await session.call_tool(
            name="list_files",
            arguments={"knowledge_set_id": test_knowledge_set_id},
        )
        # Nothing came back, so there is no payload to decode.
        if not response:
            return []
        return json.loads(response[0].text)

await example_list_files()

## Ingest file

In [15]:
import base64

async def example_ingest_file(file_path: str):
    """Upload a local file to ``test_knowledge_set_id`` via the ``ingest_file`` tool.

    The file is read as bytes and sent base64-encoded, together with its
    base name as ``"filename"``.

    Args:
        file_path: Path of the local file to read and upload.

    Returns:
        The JSON-decoded text of the first item in the tool result, or
        ``None`` when the result is empty.
    """
    async with Client(transport=StreamableHttpTransport(
        f"http://127.0.0.1:{PORT}{MCP_PATH}",
        headers={"x-forwarded-user": test_user_id},
    )) as client:
        await client.ping()
        with open(file_path, "rb") as f:
            content = f.read()
        # Encode binary content as base64 string for MCP transport
        content_b64 = base64.b64encode(content).decode('utf-8')
        files = await client.call_tool(
            name="ingest_file",
            arguments={
                "knowledge_set_id": test_knowledge_set_id,
                # basename understands the platform's path separators,
                # whereas splitting on "/" only handles POSIX-style paths.
                "filename": os.path.basename(file_path),
                "content": content_b64,
            },
        )
        if files:
            return json.loads(files[0].text)
        return None

In [None]:
file_id_acorn = await example_ingest_file("test_acorn.txt")
file_id_acorn


In [None]:
file_id_fastmcp = await example_ingest_file("test_fastmcp.txt")
file_id_fastmcp

In [None]:
file_id_pdf = await example_ingest_file("test_invest.pdf")
file_id_pdf

## Remove File

In [13]:
async def example_remove_file(file_id: str):
    """Call the ``remove_file`` tool for a file in ``test_knowledge_set_id``.

    Prints ``file removed`` once the tool call has returned.

    Args:
        file_id: Identifier passed to the tool as ``"file_id"``.
    """
    transport = StreamableHttpTransport(
        f"http://127.0.0.1:{PORT}{MCP_PATH}",
        headers={"x-forwarded-user": test_user_id},
    )
    async with Client(transport=transport) as session:
        await session.ping()
        tool_arguments = {
            "knowledge_set_id": test_knowledge_set_id,
            "file_id": file_id,
        }
        # The tool result is not used; only completion matters here.
        await session.call_tool(name="remove_file", arguments=tool_arguments)
        print("file removed")

In [None]:
await example_remove_file(file_id_fastmcp["file_id"])
await example_remove_file(file_id_acorn["file_id"])

In [None]:
file_lists = await example_list_files()
file_lists

## Query

In [20]:
async def example_query(query_text: str):
    """Call the ``query`` tool against ``test_knowledge_set_id``.

    Args:
        query_text: Text passed to the tool as ``"query_text"``.

    Returns:
        The JSON-decoded text of the first item in the tool result.
    """
    transport = StreamableHttpTransport(
        f"http://127.0.0.1:{PORT}{MCP_PATH}",
        headers={"x-forwarded-user": test_user_id},
    )
    async with Client(transport=transport) as session:
        await session.ping()
        tool_arguments = {
            "query_text": query_text,
            "knowledge_set_id": test_knowledge_set_id,
        }
        response = await session.call_tool(name="query", arguments=tool_arguments)
        first_item = response[0]
        return json.loads(first_item.text)


In [None]:
query_text = "tell me something about acorn labs?"
res = await example_query(query_text)
res

In [None]:
query_text = "how does fastmcp work?"
res = await example_query(query_text)
res

In [None]:
query_text = "tell me something about the invest strategy?"
res = await example_query(query_text)
res