In [9]:

import asyncio
import json
import os
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.types import * 

from anthropic import Anthropic
from dotenv import load_dotenv
from openai import OpenAI

load_dotenv()  # load environment variables from .env

True

In [2]:

def get_tools_format( tools,type='qwen'):
    if type == 'qwen':
        available_tools = [{
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema
            }
        } for tool in tools]

    elif type == 'anthropic':
        available_tools = [{
            "name": tool.name,
            "description": tool.description,
            "input_schema": tool.inputSchema
        } for tool in tools]
    return available_tools

exit_stack = AsyncExitStack()
anthropic = Anthropic()
qwenClient = OpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

In [39]:
mcpServersConfig = {
  "fileSystem": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-filesystem",
        "/home/mouka/mcp-test",
      ]
  },
  "weather": {
      "command": "python",
      "args": [
        "./server/weather.py",
      ]
  },
  "code_executor": {
      "command": "python",
      "args": [
        "./server/code_executor.py",
      ]
  },
}

mcpSessions = {}
available_tools = []
mcpToolsSessionMap = { }

async def connect_to_server(configs: dict):
    # fileSystemMCP = {
    #   "command": "npx",
    #   "args": [
    #     "-y",
    #     "@modelcontextprotocol/server-filesystem",
    #     "/home/mouka/mcp-test",
    #   ]
    # }

    for server_name, config in configs.items():
      server_params = StdioServerParameters(**config)
      stdio_transport = await exit_stack.enter_async_context(stdio_client(server_params))
      stdio, write = stdio_transport
      session = await exit_stack.enter_async_context(ClientSession(stdio, write))

      await session.initialize()

      # List available tools
      response = await session.list_tools()
      tools = response.tools
      print("\nConnected to server with tools:", [tool.name for tool in tools])

      available_tools.extend(get_tools_format(tools,type='qwen'))
      mcpSessions[server_name] = session
      for tool in tools:
        mcpToolsSessionMap[tool.name] = session

await connect_to_server(mcpServersConfig)


Connected to server with tools: ['read_file', 'read_multiple_files', 'write_file', 'edit_file', 'create_directory', 'list_directory', 'directory_tree', 'move_file', 'search_files', 'get_file_info', 'list_allowed_directories']

Connected to server with tools: ['get_alerts', 'get_forecast']

Connected to server with tools: ['execute_python_code', 'execute_bash_script']


In [63]:
from pprint import pprint
import time


def get_chat_completion(messages):
    response = qwenClient.chat.completions.create(
        model="qwen-turbo-2024-11-01",
        max_tokens=1000,
        messages=messages,
        tools=available_tools,
        tool_choice="auto",
        # parallel_tool_calls=True
    )
    return response

def get_tool_result_message(result:CallToolResult, tool_call_id:str, type='tool'):
    if type == 'tool':
        return {
                "content": result.content,
                "role": "tool",
                "tool_call_id": tool_call_id,
            }
    if type == 'user':
        return {
            "content": result.content,
            "role":"user",
            "name":"tool caller"
        }

async def get_response_2(messages):
    final_text = []
    isStop = False 

    round = 1
    while not isStop:
        print(f"{'='*20}Processing round {round} {'='*20}", )
        round+=1

        response = get_chat_completion(messages)
        assistant_output = response.choices[0].message
        if assistant_output.content:
            final_text.append(assistant_output.content)
            print(f"assistant message: {assistant_output.content}")

            if assistant_output.tool_calls is None:
                # this is the stop point of tool calling loop
                break
        else:
            assistant_output.content = ""

        messages.append(assistant_output)

        for idx, tool_call in enumerate(assistant_output.tool_calls):
            called_function = tool_call.function
            tool_name = called_function.name
            tool_args = json.loads(called_function.arguments)

            session:ClientSession = mcpToolsSessionMap[tool_name]
            if session is None:
                print(f'Cannot find servers for tool ${tool_name}')
                continue

            result = await session.call_tool(tool_name, tool_args)
            print(f"{idx}: [Calling tool {tool_name} with args {tool_args}], \n  result: {result.content[0].text}")
            final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")

            messages.append(get_tool_result_message(result, tool_call.id, type='tool'))

    return final_text



async def get_response(messages):
    response = get_chat_completion(messages)
    # Process response and handle tool calls
    final_text = []
    assistant_output = response.choices[0].message
    if assistant_output.content:
        final_text.append(assistant_output.content)

    messages.append(assistant_output)
    round = 1
    while assistant_output.tool_calls:
        print(f"{'='*20}Processing round {round} {'='*20}", )
        round+=round
        # print(f"current toolcall Message:\n\t {'\n\t'.join(str(function) for function in assistant_output.tool_calls)}")
        
        if assistant_output.content is None:
            assistant_output.content = ""
        else:
            print(f"assistant message: {assistant_output.content}")

        for idx, tool_call in enumerate(assistant_output.tool_calls):
            called_function = tool_call.function
            tool_name = called_function.name
            tool_args = json.loads(called_function.arguments)

            session:ClientSession = mcpToolsSessionMap[tool_name]
            if session is None:
                print(f'Cannot find servers for tool ${tool_name}')
                continue

            result = await session.call_tool(tool_name, tool_args)
            print(f"{idx}: [Calling tool {tool_name} with args {tool_args}], \n  result: {result.content[0].text}")
            final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")

            messages.append(get_tool_result_message(result, tool_call.id, type='tool'))

        start_time = time.time()
        response = get_chat_completion(messages)
        assistant_output = response.choices[0].message
        end_time = time.time()
        print(f"Time taken: {end_time - start_time} seconds")

        if assistant_output.content:
            final_text.append(assistant_output.content)
        messages.append(assistant_output)

    # append the final text to the messages
    # if assistant_output.content:
    #     final_text.append(assistant_output.content)

    return final_text


In [64]:
messages = [
    {
        "role": "user",
        "content":"" 
    }
]
# messages[0]['content'] = "list all files(not directories) in the . directory, and print the content of each file not ended with .ipynb"
# messages[0]['content'] = """
# create a new file called test.txt in the current directory, and write 10 lines random sentences to it, don't append line number to the beginning of each line,
# then get the content of the file, sort the lines based on the length of the lines(the shortest line appears first), and write the sorted content to a new file called sorted_test.txt in the current directory
# """

# query tomorrow's weather of LA and NY, then store the result in a file named result.txt in the current directory
messages[0]['content'] = """
format client.py file under current directory using a python formater under current venv, using 4 spaces rather than 2 spaces.
do it using bash script
"""


final_text = await get_response_2(messages)
print("\n\n")
print( "\n".join(final_text))
print("\n\n")
print(f'Total messages:\n\t','\n\t'.join(str(item) for item in messages))

0: [Calling tool execute_bash_script with args {'script': 'python3 -m black --line-length 100 --target-version=py36 --config=\'{"line_length": 100, "target_versions": ["py36"], "tab_size": 4}\' client.py'}], 
  result: Script execution finished with errors (exit code 2):
Stderr:
Usage: python -m black [OPTIONS] SRC ...
Try 'python -m black -h' for help.

Error: Invalid value for '--config': File '{"line_length": 100, "target_versions": ["py36"], "tab_size": 4}' does not exist.

Stdout:

assistant message: It appears that the method I used to specify the configuration for Black did not work correctly because the configuration was not written into a real file but was attempted to be passed as inline JSON which is not supported by Black directly. We'll need to create a proper configuration file for Black first and then run the formatter. Let me correct this.
0: [Calling tool write_file with args {'content': '{"line_length": 100, "target_versions": ["py36"], "tab_size": 4}', 'path': './bla

In [27]:
messages

[{'role': 'user',
  'content': '\ngenerate 20 numbers between 0 - 999999, sort these numbers using quick sort, output the origin and sorted number list, and show me your generated script\n'},
 ChatCompletionMessage(content='', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=[ChatCompletionMessageToolCall(id='call_e49abb60cd9c4beca19cbe', function=Function(arguments='{"code": "import random\\n\\ndef quick_sort(arr):\\n    if len(arr) <= 1:\\n        return arr\\n    else:\\n        pivot = arr[len(arr) // 2]\\n        left = [x for x in arr if x < pivot]\\n        middle = [x for x in arr if x == pivot]\\n        right = [x for x in arr if x > pivot]\\n        return quick_sort(left) + middle + quick_sort(right)\\n\\nnumbers = [random.randint(0, 999999) for _ in range(20)]\\nsorted_numbers = quick_sort(numbers)\\nprint(\'Original list:\', numbers)\\nprint(\'Sorted list:\', sorted_numbers)"}', name='execute_python_code'), type='function', inde

In [6]:
import io
from contextlib import redirect_stdout

def run_block(code:str):
    """
    Execute the given code block (as a string).
    Captures anything printed to stdout, and
    looks for a variable named `result` in the locals()
    as the “return value.”
    """
    buf = io.StringIO()
    # each exec gets its own namespace
    namespace = {}
    # redirect all prints into our buffer
    with redirect_stdout(buf):
        exec(code, {}, namespace)
    # stdout text:
    out = buf.getvalue()
    # “return value” is whatever code put into namespace['result']
    ret = namespace.get("result", None)
    return out, ret

# Example
code = """
print("Hello, world!")
x = 2 + 3
print("x is", x)
# designate your “return” here:
result = x * 10
"""

stdout_text, return_value = run_block(code)
print("captured stdout:", repr(stdout_text))
print("captured return:", return_value)


captured stdout: 'Hello, world!\nx is 5\n'
captured return: 50
