In [1]:
%%writefile server.py
import mcp.types as types

from mcp import Tool
from mcp.server.sse import SseServerTransport
from mcp.server import Server
from mcp.server import Server

from starlette.applications import Starlette
from starlette.routing import Route

import uvicorn
import httpx

app = Server("mcp-server")
sse = SseServerTransport("/messages")

port = 8000

async def handle_sse(request):
    async with sse.connect_sse(
            request.scope, request.receive, request._send
    ) as streams:
        await app.run(
            streams[0], streams[1], app.create_initialization_options()
        )

async def handle_messages(request):
    await sse.handle_post_message(request.scope, request.receive, request._send)

Writing server.py


In [2]:
%%writefile -a server.py

async def fetch_website(
        url: str,
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    headers = {
        "User-Agent": "MCP Test Server (github.com/modelcontextprotocol/python-sdk)"
    }
    async with httpx.AsyncClient(follow_redirects=True, headers=headers) as client:
        response = await client.get(url)
        response.raise_for_status()
        return [types.TextContent(type="text", text=response.text)]

Appending to server.py


In [3]:
%%writefile -a server.py

@app.call_tool()
async def call_tool(
  name: str, arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    if name == "fetch":
        if "url" not in arguments:
            raise ValueError("Missing required argument 'url'")
        return await fetch_website(arguments["url"])
    else:
        raise ValueError(f"Unknown tool '{name}'")

@app.list_tools()
async def list_tools() -> list[types.Tool]:
    return [
        types.Tool(
            name="fetch",
            description="Fetches a website and returns its content",
            inputSchema={
                "type": "object",
                "required": ["url"],
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "URL to fetch",
                    }
                },
            },
        )
    ]

Appending to server.py


In [5]:
%%writefile -a server.py

starlette_app = Starlette(
    debug=True,
    routes=[
        Route("/sse", endpoint=handle_sse, methods=["GET"]),
        Route("/messages", endpoint=handle_messages, methods=["POST"]),
    ],
)

# 使用uvicorn运行服务器
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(starlette_app, host="0.0.0.0", port=port)

Appending to server.py


In [6]:
!nohup python server.py &

OSError: Background processes not supported.

In [4]:
import threading
import subprocess
import time

def run_mcp_server():
  subprocess.Popen(["python", "server.py"])

thread = threading.Thread(target=run_mcp_server)
thread.start()
time.sleep(5)

In [5]:
!curl 127.0.0.1:8000

Not Found


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100     9  100     9    0     0   1218      0 --:--:-- --:--:-- --:--:--  1285


# Client

In [1]:
import asyncio
import json
import boto3
import logging
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [2]:
def convert_tool_format(tools):
    """
    将工具转换为Ollama所需的格式。

    https://ollama.com/blog/tool-support

    参数：
        tools (list): 工具对象列表

    返回：
        dict: Ollama所需格式的工具
    """
    converted_tools = []

    for tool in tools:
        converted_tool = {
             'type': 'function',
             'function': {
                 'name': tool.name,
                 'description': tool.description,
                 'parameters': tool.inputSchema
                 }
             }
        converted_tools.append(converted_tool)

    return converted_tools

In [3]:
import uuid

async def call_tool(session, response, messages):
    # 请求工具使用。调用工具并将结果发送给模型。
    print("call_tool")
    tool_requests = response.message.tool_calls
    if not tool_requests:
        raise ValueError("No tool requests found in response")

    for tool_request in tool_requests:
        tool = tool_request.function
        if isinstance(tool.arguments, str):
            print("load as str to json")
            arguments = json.loads(tool.arguments)
        else:
            arguments = tool.arguments
        print("arguments instance:", type(arguments).__name__)
        print(f"Requesting tool: {tool.name}, arguments: {arguments}")

        try:
            # 通过MCP会话调用工具
            tool_response = await session.call_tool(tool.name, arguments)

            #print("Tool Response: ", tool_response)

            # 将工具响应转换为预期格式
            tool_result = {
                "toolUseId": tool.name + str(uuid.uuid4()),
                "content": [{"text": str(tool_response)}]
            }

        except Exception as err:
            logger.error("Tool call failed: %s", str(err))
            tool_result = {
                "toolUseId": tool['toolUseId'],
                "content": [{"text": f"Error: {str(err)}"}],
                "status": "error"
            }

        # 将工具结果添加到消息中
        messages.append({
            "role": "user",
            "content": str({"toolResult": tool_result})
        })
    return messages

In [None]:
from ollama import ChatResponse, Client

#import os
#os.environ['OLLAMA_HOST'] = 'http://192.168.16.237:11434'
client = Client(host='http://192.168.16.237:11434')

async def converse_using_ollama(session, messages, tools):
    while True:
        converted_tools = convert_tool_format(tools)
        print("messages: ", str(messages))
        response: ChatResponse = client.chat(
            model="qwen2.5:7b",
            messages=messages,
            tools=converted_tools
        )
        print("Response from Ollama:")
        print(response.message.content)
        output_message = response['message']
        messages.append(output_message)

        if response.message.tool_calls:
            await call_tool(session, response, messages)
        else:
            print("No more tool use requests, we're done")
            break

from openai import OpenAI

openai_client = OpenAI(
    base_url="https://api.siliconflow.cn",
    api_key="sk-your-openai-api-key")
model = "Qwen/Qwen2.5-72B-Instruct-128K"
#model = "Pro/deepseek-ai/DeepSeek-V3"

async def converse_using_openai(session, messages, tools):
    while True:
        converted_tools = convert_tool_format(tools)
        print(f"messages[{len(str(messages))}]: {str(messages)}")
        response = openai_client.chat.completions.create(
            model=model,
            messages=messages,
            tools=converted_tools
        )
        print("Response from OpenAI:")
        print(response.choices[0].message.content)
        output_message = response.choices[0].message
        messages.append(output_message)

        if response.choices[0].message.tool_calls:
            await call_tool(session, response.choices[0], messages)
        else:
            print("No more tool use requests, we're done")
            break

In [8]:
async def complete(message):
    logger.info("Starting session")
    messages = []

    async with sse_client("http://localhost:8000/sse") as streams:
        async with ClientSession(streams[0], streams[1]) as session:
            try:
                await session.initialize()
                # 稍等片刻，让会话初始化
                await asyncio.sleep(1)
                logger.info("Session initialized")

                # 列出可用工具并转换为可序列化格式
                tools_result = await session.list_tools()
                tools_list = [{
                    "name": tool.name, 
                    "description": tool.description,
                    "inputSchema": tool.inputSchema} for tool in tools_result.tools]
                logger.info("Available tools: %s", tools_list)

                # Use Chinese website first, 
                system_message = { 
                    "role" : "system", 
                    "content" : "You are a helpful AI assistant. 使用中文回答, Use Wiki website first, You have access to the following tools: " + json.dumps(tools_list) + " Use these tools if called to answer any questions posed by the prompt (user)."}
                messages.append(system_message)
                messages.append(message)
            except TypeError as err:
                logger.error("Tool call failed for tools: %s - with error", tool_request, str(err))
                tool_result = {
                    "content": [{"text": f"Error: {str(err)}"}],
                    "status": "error"
                }
            #await converse_using_ollama(session, messages, tools_result.tools)
            await converse_using_openai(session, messages, tools_result.tools)

In [9]:
message = { 
  "role" : "user", 
  "content" : "Describe the planet pluto using any information available from the supplied tools?" 
}

await complete(message)

INFO:__main__:Starting session
INFO:mcp.client.sse:Connecting to SSE endpoint: http://localhost:8000/sse
INFO:httpx:HTTP Request: GET http://localhost:8000/sse "HTTP/1.1 200 OK"
INFO:mcp.client.sse:Received endpoint URL: http://localhost:8000/messages?session_id=4ee8fdb568e247ab90f8a7d262311a1e
INFO:mcp.client.sse:Starting post writer with endpoint URL: http://localhost:8000/messages?session_id=4ee8fdb568e247ab90f8a7d262311a1e
INFO:httpx:HTTP Request: POST http://localhost:8000/messages?session_id=4ee8fdb568e247ab90f8a7d262311a1e "HTTP/1.1 202 Accepted"
INFO:httpx:HTTP Request: POST http://localhost:8000/messages?session_id=4ee8fdb568e247ab90f8a7d262311a1e "HTTP/1.1 202 Accepted"
INFO:__main__:Session initialized
INFO:httpx:HTTP Request: POST http://localhost:8000/messages?session_id=4ee8fdb568e247ab90f8a7d262311a1e "HTTP/1.1 202 Accepted"
INFO:__main__:Available tools: [{'name': 'fetch', 'description': 'Fetches a website and returns its content', 'inputSchema': {'type': 'object', 'req

messages[541]: [{'role': 'system', 'content': 'You are a helpful AI assistant. 使用中文回答, Use Wiki website first, You have access to the following tools: [{"name": "fetch", "description": "Fetches a website and returns its content", "inputSchema": {"type": "object", "required": ["url"], "properties": {"url": {"type": "string", "description": "URL to fetch"}}}}] Use these tools if called to answer any questions posed by the prompt (user).'}, {'role': 'user', 'content': 'Describe the planet pluto using any information available from the supplied tools?'}]


INFO:httpx:HTTP Request: POST https://api.siliconflow.cn/chat/completions "HTTP/1.1 200 OK"


Response from OpenAI:

call_tool
load as str to json
arguments instance: dict
Requesting tool: fetch, arguments: {'url': 'https://zh.wikipedia.org/wiki/冥王星'}


INFO:httpx:HTTP Request: POST http://localhost:8000/messages?session_id=4ee8fdb568e247ab90f8a7d262311a1e "HTTP/1.1 202 Accepted"


messages[90849]: [{'role': 'system', 'content': 'You are a helpful AI assistant. 使用中文回答, Use Wiki website first, You have access to the following tools: [{"name": "fetch", "description": "Fetches a website and returns its content", "inputSchema": {"type": "object", "required": ["url"], "properties": {"url": {"type": "string", "description": "URL to fetch"}}}}] Use these tools if called to answer any questions posed by the prompt (user).'}, {'role': 'user', 'content': 'Describe the planet pluto using any information available from the supplied tools?'}, ChatCompletionMessage(content='', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=[ChatCompletionMessageToolCall(id='0195f5a728fd4bb4ffd9ac11016162b4', function=Function(arguments='{"url": "https://zh.wikipedia.org/wiki/冥王星"}', name='fetch'), type='function')]), {'role': 'user', 'content': '{\'toolResult\': {\'toolUseId\': \'fetch3a78fdd2-7490-488d-b978-727943c20dd3\', \'content\': [{\'text\':

INFO:httpx:HTTP Request: POST https://api.siliconflow.cn/chat/completions "HTTP/1.1 200 OK"


Response from OpenAI:
冥王星（冥神星）是一颗位于太阳系外围柯伊伯带中的矮行星。1930年克莱德·汤博发现冥王星，并将其视为太阳系中的第九行星。然而，自1992年以后，柯伊伯带中发现的一些质量与冥王星相似的冰製天体对冥王星的行星地位提出挑战，最终2006年国际天文联合会（IAU）对行星的定义进行了修正，因冥王星未能清理其轨道上的其它小天体，因此将其降级为矮行星，以区别于其它八大古典行星。

冥王星的轨道离心率及倾角都相对较高，其近日点距离太阳30天文单位（44亿公里），远日点则为49天文单位（74亿公里）。阳光需要大约5.5小时才能从太阳传播到冥王星。

科学家已经确认冥王星有五颗卫星，分别是冥卫一（卡戎）、冥卫二（尼克斯）、冥卫三（许德拉）、冥卫四（科伯罗斯）和冥卫五（斯提克斯）。冥卫一的质量和体积超过其余四颗卫星，且冥王星与冥卫一的共同质心位于二者外部，使其有时被视为双矮行星系统，虽然IAU并未正式定义矮行星联星，冥卫一仍被定义为冥王星的卫星。

新视野号探测器在2015年的冥王星飞掠任务中，收集到大量有关冥王星及其卫星的宝贵数据。例如，冥王星表面有显著的地形特征，存在冰火山和数千米高的山脉，被固态甲烷层和冰凍氮气覆盖，冰川温度大约在零下200摄氏度以下。新视野号还显示，冥王星的地下可能有半融化的冰形成的海洋，表明地下可能存在热源。

冥王星主要由岩石和冰构成，拥有稀薄的大气层，里面含有固态氮（超过98%）、微量甲烷、一氧化碳和极少的一氧化碳。冥王星的表面颜色和亮度变化较大，含有炭黑色、深橙色和白色，其中有一个特别明亮、大的区域，被称为「冥王星之心」。冥王星的地表有强烈的季节变化，自转轴与公转平面的夹角为120度，使其在至点时会持续接受阳光照射，而另外四分之一的表面则完全没有阳光照射。冥王星的地质特征有活跃冰火山的迹象，且存在明显的撞击坑和山梁等地貌。
No more tool use requests, we're done
