In [1]:

%%writefile server.py
# server.py
import sys, logging
from mcp.server.fastmcp import FastMCP

# Logging for the server: every record goes both to stderr and to server.log,
# formatted as "<timestamp> [<LEVEL>] <message>".
# NOTE(review): stdout is presumably left untouched because the server talks to
# its client over stdio — confirm.
logger = logging.getLogger("mcp-server")
logger.setLevel(logging.DEBUG)
fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")

stderr_h = logging.StreamHandler(sys.stderr)
file_h = logging.FileHandler("server.log", encoding="utf-8")

# Remove whatever handlers this named logger already carries before installing
# the two created above, so they are the only ones attached.
logger.handlers.clear()
for handler in (stderr_h, file_h):
    handler.setFormatter(fmt)
    logger.addHandler(handler)

# FastMCP instance named "demo"; the functions decorated with @mcp.tool() in this
# file are attached to it, and it is started by mcp.run(...) in the __main__ guard.
mcp = FastMCP("demo")

# NOTE: the docstring below is what the client receives as this tool's
# description, so its wording is part of the tool's behaviour.
@mcp.tool()
def add(a: int, b: int) -> int:
    """
    Add two integers and return the sum.

    When to use:
      - Any arithmetic addition request, including multi-step math where a sum is needed.

    Constraints:
      - Only integers. For floats, round first or ask the user to confirm.

    Examples:
      add(a=2, b=40) -> 42
      add(a=321, b=123) -> 444
    """
    # Record every invocation in the server log before computing the result.
    logger.debug("add() called with a=%s, b=%s", a, b)
    total = a + b
    return total

# NOTE: the docstring below is what the client receives as this tool's
# description, so its wording is part of the tool's behaviour.
@mcp.tool()
def echo(text: str) -> str:
    """
    Return the same text that was passed in.

    When to use:
      - The user asks to reflect, quote, or transform exactly without changes.

    Examples:
      echo(text="MCP works") -> "MCP works"
    """
    # Log the repr of the argument so quotes/whitespace stay visible in the log.
    logger.debug("echo() called with text=%r", text)
    return text

# Script entry point: runs only when server.py is executed directly, not on import.
if __name__ == "__main__":
    # Leave a marker in the log so a fresh start is easy to spot.
    logger.debug("MCP server starting...")
    # Start the server using the "stdio" transport.
    mcp.run("stdio")



Overwriting server.py


In [2]:
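# One-time setup: uncomment the next line to install the packages this notebook imports.
# %pip install mcp openai nest_asyncio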

In [3]:
#pip install --upgrade pip

In [4]:

import os, json, asyncio, sys
from typing import Dict, Any, List
from openai import OpenAI

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

def json_read(filename: str):
    """Load a JSON document from disk if the file exists.

    Args:
        filename: Path of the JSON file to read (decoded as UTF-8).

    Returns:
        A ``(data, found)`` tuple: the parsed JSON and ``True`` when ``filename``
        is an existing regular file, otherwise ``({}, False)``.
    """
    # Guard clause: anything that is not a regular file is reported as "not found".
    if not os.path.isfile(filename):
        return {}, False

    with open(filename, "r", encoding="utf-8") as handle:
        return json.load(handle), True

async def tail_file(path: str, poll_interval: float = 0.2):
    """Follow a text file and print every new line prefixed with "[SERVER]".

    Runs until the surrounding task is cancelled.

    * If the file already exists when tailing starts, only lines appended from
      now on are printed (the existing content is skipped).
    * If the file does not exist yet, this waits for it to appear and then reads
      it from the beginning, so the very first lines of a freshly created log
      are not lost.

    Args:
        path: File to follow (decoded as UTF-8).
        poll_interval: Seconds to sleep between polls, both while waiting for
            the file to appear and while waiting for new lines.
    """
    # Wait for the file with a plain loop: no recursion, so a file that never
    # shows up cannot grow the call stack.
    had_to_wait = False
    while True:
        try:
            f = open(path, "r", encoding="utf-8")
            break
        except FileNotFoundError:
            had_to_wait = True
            await asyncio.sleep(poll_interval)

    with f:
        if not had_to_wait:
            # Pre-existing file: skip old content and only report new lines.
            f.seek(0, os.SEEK_END)
        while True:
            line = f.readline()
            if not line:
                # Nothing new yet; yield to the event loop before polling again.
                await asyncio.sleep(poll_interval)
                continue
            print("[SERVER]", line.rstrip())

async def build_system_prompt(session) -> str:
    """Compose the system prompt from the tools the session advertises.

    Args:
        session: Object whose awaitable ``list_tools()`` returns something with
            a ``tools`` sequence; each tool exposes ``name`` and ``description``.

    Returns:
        A newline-joined prompt: a fixed preamble, one ``- name: summary`` bullet
        per tool, and a closing instruction about tool parameters.
    """
    listing = await session.list_tools()

    bullets = []
    for tool in listing.tools:
        # Keep only the text before the first blank line of the stripped
        # description, so the prompt stays short.
        summary = (tool.description or "").strip().partition("\n\n")[0]
        bullets.append(f"- {tool.name}: {summary}")

    preamble = [
        "You can call tools when useful. Prefer precise tool usage. If no tool is needed, answer directly.",
        "",
        "Available tools:",
    ]
    closing = ["", "Use tool parameters exactly as defined in the provided tool schemas."]
    return "\n".join(preamble + bullets + closing)

# Load settings from key.json (resolved against the current working directory)
# and stop right away if the file is missing.
api_data, ok = json_read("key.json")
if not ok:
    raise FileNotFoundError("key.json not found. Please create it.")

# `value or ""` covers a missing key as well as an explicit JSON null, so
# .strip() is never called on None. For MODEL and SERVER_PATH an empty or
# whitespace-only value falls back to the default instead of overriding it.
PROVIDER_BASE = str(api_data.get("PROVIDER_BASE") or "").strip()               # optional custom base URL
API_KEY       = str(api_data.get("OPENAI_API_KEY") or "").strip()              # key handed to the OpenAI client
MODEL         = str(api_data.get("LLM_MODEL") or "").strip() or "gpt-4o-mini"  # chat model name
SERVER_PATH   = str(api_data.get("MCP_SERVER") or "").strip() or "server.py"   # script launched as the MCP server

#SYSTEM_PROMPT = (
#    "You can call tools when useful. Use add for arithmetic, echo for reflecting text. "
#    "If no tool is needed, answer directly."
#)

def make_client() -> OpenAI:
    """Create the OpenAI client from the settings loaded out of key.json.

    The API key is always passed explicitly; ``base_url`` is only supplied when
    PROVIDER_BASE is non-empty.
    """
    client_kwargs = {"api_key": API_KEY}
    if PROVIDER_BASE:
        client_kwargs["base_url"] = PROVIDER_BASE
    return OpenAI(**client_kwargs)

def schema_from_mcp_tool(t) -> Dict[str, Any]:
    """Translate an MCP tool descriptor into an OpenAI "function" tool entry.

    Args:
        t: Tool object exposing ``name`` and, optionally, ``description`` and an
            input schema under ``inputSchema`` or ``input_schema``.

    Returns:
        ``{"type": "function", "function": {"name", "description", "parameters"}}``.
        A missing or empty schema is replaced by an object schema with no
        properties; a missing or empty description becomes ``""``.
    """
    # Try the camelCase attribute first, then the snake_case spelling.
    schema = getattr(t, "inputSchema", None)
    if not schema:
        schema = getattr(t, "input_schema", None)
    if not schema:
        schema = {"type": "object", "properties": {}, "additionalProperties": False}

    function_spec = {
        "name": t.name,
        "description": getattr(t, "description", "") or "",
        "parameters": schema,
    }
    return {"type": "function", "function": function_spec}

def extract_text_content(mcp_result) -> str:
    """Collect the textual parts of an MCP tool result into one string.

    Every item in ``mcp_result.content`` that has a ``text`` attribute other than
    ``None`` contributes its text; items without one are ignored.

    Returns:
        The texts joined with newlines, or ``""`` when there are none (including
        when ``content`` is missing, ``None`` or empty).
    """
    items = getattr(mcp_result, "content", []) or []
    texts = (getattr(item, "text", None) for item in items)
    return "\n".join(piece for piece in texts if piece is not None)

async def build_openai_tools(session: ClientSession) -> List[Dict[str, Any]]:
    """Return the session's tools converted to OpenAI tool schemas.

    Asks the session for its tool list and converts each entry with
    ``schema_from_mcp_tool``, preserving the order in which they were listed.
    """
    listing = await session.list_tools()
    schemas: List[Dict[str, Any]] = []
    for tool in listing.tools:
        schemas.append(schema_from_mcp_tool(tool))
    return schemas

async def run_chat_once(user_msg: str, max_turns: int = 5) -> str:
    """Run one user request through the model, letting it call MCP tools.

    Launches SERVER_PATH as a child process speaking MCP over stdio, offers the
    server's tools to the chat model and then loops: whenever the model replies
    with tool calls, each call is forwarded to the MCP session and its text
    result is appended to the conversation for the next round.

    While this runs, a background task tails "server.log" and prints the
    server's log lines. That task is always cancelled before this coroutine
    finishes, whether it returns normally or raises.

    Args:
        user_msg: The user's request.
        max_turns: Maximum number of model round-trips before giving up.

    Returns:
        The content of the model's first reply that is not a tool-call request
        ("" if that content is empty), or "" when ``max_turns`` rounds were
        used up while the model was still requesting tools.
    """
    client = make_client()

    # Spawn the MCP server over stdio and open a session
    params = StdioServerParameters(command=sys.executable, args=[SERVER_PATH])

    # Start the tailer before connecting so it is already watching the log when
    # the server starts writing to it.
    log_task = asyncio.create_task(tail_file("server.log"))

    try:
        async with stdio_client(params) as (read_stream, write_stream):
            async with ClientSession(read_stream, write_stream) as session:
                await session.initialize()

                openai_tools = await build_openai_tools(session)
                print(openai_tools)

                system_prompt = await build_system_prompt(session)
                print(system_prompt)

                messages: List[Dict[str, Any]] = [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_msg},
                ]

                for _ in range(max_turns):
                    # NOTE: this is a synchronous call, so the event loop (and
                    # with it the log tailer) is paused until the model answers.
                    chat = client.chat.completions.create(
                        model=MODEL,
                        messages=messages,
                        tools=openai_tools,
                        tool_choice="auto",
                    )
                    choice = chat.choices[0]
                    wants_tools = (
                        choice.finish_reason == "tool_calls"
                        and choice.message.tool_calls
                    )
                    if not wants_tools:
                        # Final answer: the `finally` below still runs on this path.
                        return choice.message.content or ""

                    # Execute every requested tool call, keeping one "tool"
                    # message per call id.
                    tool_messages = []
                    for call in choice.message.tool_calls:
                        name = call.function.name
                        args = json.loads(call.function.arguments or "{}")
                        result = await session.call_tool(name, args)
                        tool_messages.append({
                            "role": "tool",
                            "tool_call_id": call.id,
                            "name": name,
                            "content": extract_text_content(result),
                        })
                    # The assistant message that requested the calls must
                    # precede their results in the conversation.
                    messages = messages + [choice.message] + tool_messages

        # max_turns exhausted without a final answer.
        return ""
    finally:
        # Stop the tailer on every exit path (return or exception) so it cannot
        # outlive this call and keep printing into later runs.
        log_task.cancel()
        try:
            await log_task
        except asyncio.CancelledError:
            pass
        except Exception as exc:
            # The tailer is best-effort; its failure must not hide the chat
            # result or the original exception.
            print(f"[SERVER] log tailer stopped early: {exc!r}")

# Notebook-friendly entry point
def run_demo(prompt: str = "Please add 123 and 456 using the tool."):
    """Synchronous, notebook-friendly wrapper around ``run_chat_once``.

    Applies ``nest_asyncio`` so the coroutine can be driven even when an event
    loop is already active, as is common in Jupyter. The chat is executed
    exactly once: only obtaining a usable loop is guarded, so a RuntimeError
    raised inside the chat itself propagates instead of triggering a second run.

    Args:
        prompt: The user message to send.

    Returns:
        Whatever ``run_chat_once(prompt)`` returns.
    """
    # Jupyter often has an existing event loop. Use nest_asyncio to allow nested runs.
    import nest_asyncio, asyncio
    nest_asyncio.apply()

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No event loop is available in this thread.
        loop = None

    if loop is None or loop.is_closed():
        # Fallback: let asyncio create (and close) a fresh loop.
        return asyncio.run(run_chat_once(prompt))
    return loop.run_until_complete(run_chat_once(prompt))


In [5]:
import os
# Set OPENAI_LOG to an empty string before the demo creates its client.
# NOTE(review): presumably this keeps the OpenAI SDK's own log output quiet — confirm.
os.environ["OPENAI_LOG"] = "" 

# Two-part request (an addition, then an echo) that touches both tools defined in server.py.
print(run_demo("Add 321 and 123 using the tool, and then echo 'MCP works'."))


[{'type': 'function', 'function': {'name': 'add', 'description': '\n    Add two integers and return the sum.\n\n    When to use:\n      - Any arithmetic addition request, including multi-step math where a sum is needed.\n\n    Constraints:\n      - Only integers. For floats, round first or ask the user to confirm.\n\n    Examples:\n      add(a=2, b=40) -> 42\n      add(a=321, b=123) -> 444\n    ', 'parameters': {'properties': {'a': {'title': 'A', 'type': 'integer'}, 'b': {'title': 'B', 'type': 'integer'}}, 'required': ['a', 'b'], 'title': 'addArguments', 'type': 'object'}}}, {'type': 'function', 'function': {'name': 'echo', 'description': '\n    Return the same text that was passed in.\n\n    When to use:\n      - The user asks to reflect, quote, or transform exactly without changes.\n\n    Examples:\n      echo(text="MCP works") -> "MCP works"\n    ', 'parameters': {'properties': {'text': {'title': 'Text', 'type': 'string'}}, 'required': ['text'], 'title': 'echoArguments', 'type': 'ob

In [6]:
# server.py defines only `add` and `echo`, so this prompt asks for an operation
# (division) that no tool provides.
# NOTE(review): presumably meant to show how the model reacts in that case — confirm.
print(run_demo("devide 321 by 123 using the tool, and then echo 'MCP works'."))


[{'type': 'function', 'function': {'name': 'add', 'description': '\n    Add two integers and return the sum.\n\n    When to use:\n      - Any arithmetic addition request, including multi-step math where a sum is needed.\n\n    Constraints:\n      - Only integers. For floats, round first or ask the user to confirm.\n\n    Examples:\n      add(a=2, b=40) -> 42\n      add(a=321, b=123) -> 444\n    ', 'parameters': {'properties': {'a': {'title': 'A', 'type': 'integer'}, 'b': {'title': 'B', 'type': 'integer'}}, 'required': ['a', 'b'], 'title': 'addArguments', 'type': 'object'}}}, {'type': 'function', 'function': {'name': 'echo', 'description': '\n    Return the same text that was passed in.\n\n    When to use:\n      - The user asks to reflect, quote, or transform exactly without changes.\n\n    Examples:\n      echo(text="MCP works") -> "MCP works"\n    ', 'parameters': {'properties': {'text': {'title': 'Text', 'type': 'string'}}, 'required': ['text'], 'title': 'echoArguments', 'type': 'ob