From f64ebdcebff9ced4b1b438393df73404ff7be9f7 Mon Sep 17 00:00:00 2001 From: Thanabordee Date: Sat, 26 Apr 2025 16:54:52 +0700 Subject: [PATCH 01/11] Add MCPTools to Utill , Bug : Calling Asycn Function --- dspy/predict/react.py | 13 ++- dspy/primitives/tool.py | 2 +- dspy/utils/MCPTools.py | 176 ++++++++++++++++++++++++++++++++++++++++ dspy/utils/__init__.py | 2 + 4 files changed, 191 insertions(+), 2 deletions(-) create mode 100644 dspy/utils/MCPTools.py diff --git a/dspy/predict/react.py b/dspy/predict/react.py index 0580a7f7b9..f7941f6d41 100644 --- a/dspy/predict/react.py +++ b/dspy/predict/react.py @@ -7,6 +7,7 @@ from dspy.primitives.program import Module from dspy.primitives.tool import Tool from dspy.signatures.signature import ensure_signature +import asyncio logger = logging.getLogger(__name__) @@ -88,8 +89,18 @@ def forward(self, **input_args): trajectory[f"tool_args_{idx}"] = pred.next_tool_args try: - trajectory[f"observation_{idx}"] = self.tools[pred.next_tool_name](**pred.next_tool_args) + # if asyncio.iscoroutine(self.tools[pred.next_tool_name]): + print(f"Calling async tool: {pred.next_tool_name}") + async def async_func(): + return await self.tools[pred.next_tool_name].acall(**pred.next_tool_args) + + trajectory[f"observation_{idx}"] = asyncio.run(async_func()) + + # else : + # print(f"Calling sync tool: {pred.next_tool_name}") + # trajectory[f"observation_{idx}"] = self.tools[pred.next_tool_name](**pred.next_tool_args) except Exception as err: + print(err) trajectory[f"observation_{idx}"] = f"Execution error in {pred.next_tool_name}: {_fmt_exc(err)}" if pred.next_tool_name == "finish": diff --git a/dspy/primitives/tool.py b/dspy/primitives/tool.py index 0716ded7c7..87ec44c2b9 100644 --- a/dspy/primitives/tool.py +++ b/dspy/primitives/tool.py @@ -1,6 +1,6 @@ import asyncio import inspect -from typing import Any, Callable, Optional, get_origin, get_type_hints +from typing import Any, Callable, Optional, get_origin, get_type_hints from jsonschema import ValidationError, validate from pydantic import BaseModel, TypeAdapter, create_model diff --git a/dspy/utils/MCPTools.py b/dspy/utils/MCPTools.py new file mode 100644 index 0000000000..58ff508fe9 --- /dev/null +++ b/dspy/utils/MCPTools.py @@ -0,0 +1,176 @@ +from typing import Any, Dict, List, Optional, Tuple, Type, Union +import json +from dspy.primitives.tool import Tool + +def map_json_schema_to_tool_args( + schema: Optional[Dict[str, Any]] +) -> Tuple[Dict[str, Any], Dict[str, Type], Dict[str, str]]: + """Maps a JSON schema to tool arguments compatible with DSPy Tool.""" + args, arg_types, arg_desc = {}, {}, {} + + if not schema or "properties" not in schema: + return args, arg_types, arg_desc + + for name, prop in schema["properties"].items(): + args[name] = prop + + # Map JSON schema types to Python types + type_mapping = { + "string": str, "integer": int, "number": float, + "boolean": bool, "array": list, "object": dict + } + prop_type = prop.get("type", "string") + arg_types[name] = type_mapping.get(prop_type, Any) + + # Description with required indicator + arg_desc[name] = prop.get("description", "No description provided.") + if name in schema.get("required", []): + arg_desc[name] += " (Required)" + + return args, arg_types, arg_desc + +class MCPTool(Tool): + """Wrapper for an MCP tool, compatible with DSPy agents.""" + + def __init__(self, tool_info: Any, session: Any): + """Create a DSPy Tool from an MCP tool description. + + Args: + tool_info: The tool information from MCP server + session: The MCP client session + """ + self.session = session + self._raw_tool_info = tool_info + + name, desc, input_schema = self._extract_tool_info(tool_info) + self.name = name # Store name as instance attribute for use in call_tool_async + args, arg_types, arg_desc = map_json_schema_to_tool_args(input_schema) + + super().__init__( + func=self.call_tool_async, + name=name, + desc=desc, + args=args, + arg_types=arg_types, + arg_desc=arg_desc + ) + + def _extract_tool_info(self, tool_info: Any) -> Tuple[str, str, Optional[Dict[str, Any]]]: + """Extract name, description and input schema from tool info.""" + # Handle object with attributes first (most common case for MCP tools) + try: + name = getattr(tool_info, 'name', None) + desc = getattr(tool_info, 'description', None) + + # Handle inputSchema or schema attribute + input_schema = None + if hasattr(tool_info, 'inputSchema'): + input_schema = tool_info.inputSchema + + # If all attributes were found, return them + if name and desc is not None: + return name, desc, input_schema + except (AttributeError, TypeError): + pass + + # Handle dictionary format + if isinstance(tool_info, dict): + name = tool_info.get('name') + desc = tool_info.get('description') + input_schema = tool_info.get('inputSchema') + + if name and desc is not None: + return name, desc, input_schema + + # Handle serialized JSON string + if isinstance(tool_info, str): + try: + parsed = json.loads(tool_info) + if isinstance(parsed, dict): + name = parsed.get('name') + desc = parsed.get('description') + input_schema = parsed.get('inputSchema') + + if name and desc is not None: + return name, desc, input_schema + except json.JSONDecodeError: + pass + + # Last resort fallback - use string representation as name + return str(tool_info), "No description available.", None + + async def call_tool_async(self, **kwargs: Any) -> Any: + """Execute the MCP tool.""" + try: + # Pass the kwargs directly without nesting them + result = await self.session.call_tool(self.name, kwargs) + print(result) + return self._process_result(result) + except Exception as e: + raise RuntimeError(f"Error executing tool {self.name}: {str(e)}") + + def _process_result(self, result: Any) -> Any: + """Process the result from tool execution.""" + if result is None: + return "Tool executed successfully but returned no content." + + if hasattr(result, 'content') and result.content: + content = result.content + if isinstance(content, list): + try: + return "\n".join(str(getattr(item, 'text', item)) for item in content if item) + except Exception: + pass + return str(content) + + if hasattr(result, 'text') and result.text is not None: + return result.text + + if isinstance(result, dict): + for key in ("message", "output", "result", "text"): + if key in result: + return str(result[key]) + return json.dumps(result, indent=2) + + return str(result) + +class MCPTools: + """Collection of tools from an MCP server, usable with DSPy agents.""" + + def __init__(self, tools_list: List[Any], session: Any): + """Initialize the MCPTools collection. + + Args: + tools_list: List of tools from MCP server + session: MCP client session + """ + self.session = session + self.tools = {} + + # Create MCPTool instances for each tool in the list + for tool in tools_list: + mcp_tool = MCPTool(tool, session) + self.tools[mcp_tool.name] = mcp_tool + + def __getitem__(self, tool_name: str) -> MCPTool: + """Get a tool by name.""" + if tool_name not in self.tools: + raise KeyError(f"Tool '{tool_name}' not found in available MCP tools") + return self.tools[tool_name] + + def get_tools(self) -> List[Tool]: + """Get all tools as a list.""" + return list(self.tools.values()) + + def get_tool_names(self) -> List[str]: + """Get names of all available tools.""" + return list(self.tools.keys()) + + def __str__(self) -> str: + """String representation showing available tools.""" + return f"MCPTools with {len(self.tools)} tools: {', '.join(self.tools.keys())}" + + def __repr__(self) -> str: + """Detailed representation of the tools collection.""" + return f"MCPTools({len(self.tools)} tools: {list(self.tools.keys())})" + diff --git a/dspy/utils/__init__.py b/dspy/utils/__init__.py index 84178e4910..9cb3917782 100644 --- a/dspy/utils/__init__.py +++ b/dspy/utils/__init__.py @@ -1,6 +1,7 @@ from dspy.utils.callback import BaseCallback, with_callbacks from dspy.utils.dummies import DummyLM, DummyVectorizer, dummy_rm from dspy.streaming.messages import StatusMessageProvider, StatusMessage +from dspy.utils.MCPTools import MCPTools import os import requests @@ -24,6 +25,7 @@ def download(url): "with_callbacks", "DummyLM", "DummyVectorizer", + "MCPTools", "dummy_rm", "StatusMessage", "StatusMessageProvider", From 389e8a59228a368a4bcdabe46f321625d555a0b5 Mon Sep 17 00:00:00 2001 From: Thanabordee Date: Sun, 27 Apr 2025 00:14:29 +0700 Subject: [PATCH 02/11] Add syncify to wrap acall still stuckling --- dspy/predict/react.py | 26 +++++------ dspy/primitives/tool.py | 2 +- dspy/utils/MCPTools.py | 99 +++++++++++++---------------------------- 3 files changed, 45 insertions(+), 82 deletions(-) diff --git a/dspy/predict/react.py b/dspy/predict/react.py index f7941f6d41..d984422620 100644 --- a/dspy/predict/react.py +++ b/dspy/predict/react.py @@ -7,8 +7,7 @@ from dspy.primitives.program import Module from dspy.primitives.tool import Tool from dspy.signatures.signature import ensure_signature -import asyncio - +from asyncer import syncify logger = logging.getLogger(__name__) @@ -89,18 +88,19 @@ def forward(self, **input_args): trajectory[f"tool_args_{idx}"] = pred.next_tool_args try: - # if asyncio.iscoroutine(self.tools[pred.next_tool_name]): - print(f"Calling async tool: {pred.next_tool_name}") - async def async_func(): - return await self.tools[pred.next_tool_name].acall(**pred.next_tool_args) - - trajectory[f"observation_{idx}"] = asyncio.run(async_func()) - - # else : - # print(f"Calling sync tool: {pred.next_tool_name}") - # trajectory[f"observation_{idx}"] = self.tools[pred.next_tool_name](**pred.next_tool_args) + try: + print(f"Calling async tool: {pred.next_tool_name}") + + result = syncify(lambda: self.tools[pred.next_tool_name].acall(**pred.next_tool_args))() + print(result) + trajectory[f"observation_{idx}"] = result + + except: + # else : + print(f"Calling sync tool: {pred.next_tool_name}") + trajectory[f"observation_{idx}"] = self.tools[pred.next_tool_name](**pred.next_tool_args) except Exception as err: - print(err) + raise err trajectory[f"observation_{idx}"] = f"Execution error in {pred.next_tool_name}: {_fmt_exc(err)}" if pred.next_tool_name == "finish": diff --git a/dspy/primitives/tool.py b/dspy/primitives/tool.py index 87ec44c2b9..7f3a824aa2 100644 --- a/dspy/primitives/tool.py +++ b/dspy/primitives/tool.py @@ -172,7 +172,7 @@ def __call__(self, **kwargs): async def acall(self, **kwargs): parsed_kwargs = self._validate_and_parse_args(**kwargs) - result = self.func(**parsed_kwargs) + result = await self.func(**parsed_kwargs) if not asyncio.iscoroutine(result): raise ValueError("You are calling `acall` on a non-async tool, please use `__call__` instead.") return await result diff --git a/dspy/utils/MCPTools.py b/dspy/utils/MCPTools.py index 58ff508fe9..83f89a2f98 100644 --- a/dspy/utils/MCPTools.py +++ b/dspy/utils/MCPTools.py @@ -1,30 +1,22 @@ -from typing import Any, Dict, List, Optional, Tuple, Type, Union +from typing import Any, Dict, List, Optional, Tuple, Type import json +import anyio from dspy.primitives.tool import Tool -def map_json_schema_to_tool_args( - schema: Optional[Dict[str, Any]] -) -> Tuple[Dict[str, Any], Dict[str, Type], Dict[str, str]]: +def map_json_schema_to_tool_args(schema: Optional[Dict[str, Any]]) -> Tuple[Dict[str, Any], Dict[str, Type], Dict[str, str]]: """Maps a JSON schema to tool arguments compatible with DSPy Tool.""" args, arg_types, arg_desc = {}, {}, {} - if not schema or "properties" not in schema: return args, arg_types, arg_desc + type_mapping = {"string": str, "integer": int, "number": float, "boolean": bool, "array": list, "object": dict} + required = schema.get("required", []) + for name, prop in schema["properties"].items(): args[name] = prop - - # Map JSON schema types to Python types - type_mapping = { - "string": str, "integer": int, "number": float, - "boolean": bool, "array": list, "object": dict - } - prop_type = prop.get("type", "string") - arg_types[name] = type_mapping.get(prop_type, Any) - - # Description with required indicator + arg_types[name] = type_mapping.get(prop.get("type", "string"), Any) arg_desc[name] = prop.get("description", "No description provided.") - if name in schema.get("required", []): + if name in required: arg_desc[name] += " (Required)" return args, arg_types, arg_desc @@ -33,17 +25,12 @@ class MCPTool(Tool): """Wrapper for an MCP tool, compatible with DSPy agents.""" def __init__(self, tool_info: Any, session: Any): - """Create a DSPy Tool from an MCP tool description. - - Args: - tool_info: The tool information from MCP server - session: The MCP client session - """ + """Create a DSPy Tool from an MCP tool description.""" self.session = session self._raw_tool_info = tool_info name, desc, input_schema = self._extract_tool_info(tool_info) - self.name = name # Store name as instance attribute for use in call_tool_async + self.name = name args, arg_types, arg_desc = map_json_schema_to_tool_args(input_schema) super().__init__( @@ -57,55 +44,38 @@ def __init__(self, tool_info: Any, session: Any): def _extract_tool_info(self, tool_info: Any) -> Tuple[str, str, Optional[Dict[str, Any]]]: """Extract name, description and input schema from tool info.""" - # Handle object with attributes first (most common case for MCP tools) - try: - name = getattr(tool_info, 'name', None) - desc = getattr(tool_info, 'description', None) + # Try object attributes + if hasattr(tool_info, 'name') and hasattr(tool_info, 'description'): + return ( + tool_info.name, + tool_info.description, + getattr(tool_info, 'inputSchema', None) + ) - # Handle inputSchema or schema attribute - input_schema = None - if hasattr(tool_info, 'inputSchema'): - input_schema = tool_info.inputSchema - - # If all attributes were found, return them - if name and desc is not None: - return name, desc, input_schema - except (AttributeError, TypeError): - pass - - # Handle dictionary format + # Try dict format if isinstance(tool_info, dict): - name = tool_info.get('name') - desc = tool_info.get('description') - input_schema = tool_info.get('inputSchema') - - if name and desc is not None: - return name, desc, input_schema + if 'name' in tool_info and 'description' in tool_info: + return tool_info['name'], tool_info['description'], tool_info.get('inputSchema') - # Handle serialized JSON string + # Try JSON string if isinstance(tool_info, str): try: parsed = json.loads(tool_info) - if isinstance(parsed, dict): - name = parsed.get('name') - desc = parsed.get('description') - input_schema = parsed.get('inputSchema') - - if name and desc is not None: - return name, desc, input_schema + if isinstance(parsed, dict) and 'name' in parsed and 'description' in parsed: + return parsed['name'], parsed['description'], parsed.get('inputSchema') except json.JSONDecodeError: pass - # Last resort fallback - use string representation as name return str(tool_info), "No description available.", None async def call_tool_async(self, **kwargs: Any) -> Any: """Execute the MCP tool.""" try: - # Pass the kwargs directly without nesting them result = await self.session.call_tool(self.name, kwargs) - print(result) + print(f"Tool {self.name} executed with args: {kwargs}") + print(f"Tool {self.name} result: {result}") return self._process_result(result) + except Exception as e: raise RuntimeError(f"Error executing tool {self.name}: {str(e)}") @@ -114,6 +84,7 @@ def _process_result(self, result: Any) -> Any: if result is None: return "Tool executed successfully but returned no content." + # Handle content attribute if hasattr(result, 'content') and result.content: content = result.content if isinstance(content, list): @@ -123,9 +94,11 @@ def _process_result(self, result: Any) -> Any: pass return str(content) + # Handle text attribute if hasattr(result, 'text') and result.text is not None: return result.text + # Handle dictionary if isinstance(result, dict): for key in ("message", "output", "result", "text"): if key in result: @@ -138,19 +111,9 @@ class MCPTools: """Collection of tools from an MCP server, usable with DSPy agents.""" def __init__(self, tools_list: List[Any], session: Any): - """Initialize the MCPTools collection. - - Args: - tools_list: List of tools from MCP server - session: MCP client session - """ + """Initialize the MCPTools collection.""" self.session = session - self.tools = {} - - # Create MCPTool instances for each tool in the list - for tool in tools_list: - mcp_tool = MCPTool(tool, session) - self.tools[mcp_tool.name] = mcp_tool + self.tools = {MCPTool(tool, session).name: MCPTool(tool, session) for tool in tools_list} def __getitem__(self, tool_name: str) -> MCPTool: """Get a tool by name.""" From dc87f7aa16e548439dd9f46d60f0d37946fc289f Mon Sep 17 00:00:00 2001 From: Thanabordee Nammungkhun Date: Sun, 27 Apr 2025 11:02:40 +0700 Subject: [PATCH 03/11] Converted MCP tools are working correctly after the fix, but they are still buggy when called by React. --- dspy/predict/react.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/dspy/predict/react.py b/dspy/predict/react.py index d984422620..727bc3499b 100644 --- a/dspy/predict/react.py +++ b/dspy/predict/react.py @@ -7,7 +7,9 @@ from dspy.primitives.program import Module from dspy.primitives.tool import Tool from dspy.signatures.signature import ensure_signature +import anyio from asyncer import syncify + logger = logging.getLogger(__name__) @@ -73,7 +75,7 @@ def _format_trajectory(self, trajectory: dict[str, Any]): trajectory_signature = dspy.Signature(f"{', '.join(trajectory.keys())} -> x") return adapter.format_user_message_content(trajectory_signature, trajectory) - def forward(self, **input_args): + async def forward(self, **input_args): trajectory = {} max_iters = input_args.pop("max_iters", self.max_iters) for idx in range(max_iters): @@ -88,19 +90,8 @@ def forward(self, **input_args): trajectory[f"tool_args_{idx}"] = pred.next_tool_args try: - try: - print(f"Calling async tool: {pred.next_tool_name}") - - result = syncify(lambda: self.tools[pred.next_tool_name].acall(**pred.next_tool_args))() - print(result) - trajectory[f"observation_{idx}"] = result - - except: - # else : - print(f"Calling sync tool: {pred.next_tool_name}") - trajectory[f"observation_{idx}"] = self.tools[pred.next_tool_name](**pred.next_tool_args) + trajectory[f"observation_{idx}"] = self.tools[pred.next_tool_name](**pred.next_tool_args) except Exception as err: - raise err trajectory[f"observation_{idx}"] = f"Execution error in {pred.next_tool_name}: {_fmt_exc(err)}" if pred.next_tool_name == "finish": From 056869dc11b39a782bfa7df48154464d7d7d74c3 Mon Sep 17 00:00:00 2001 From: Thanabordee Nammungkhun Date: Sun, 27 Apr 2025 13:29:02 +0700 Subject: [PATCH 04/11] Refactor ReAct and Tool classes to remove async functionality from forward and acall methods --- dspy/predict/react.py | 4 +--- dspy/primitives/tool.py | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/dspy/predict/react.py b/dspy/predict/react.py index 727bc3499b..0580a7f7b9 100644 --- a/dspy/predict/react.py +++ b/dspy/predict/react.py @@ -7,8 +7,6 @@ from dspy.primitives.program import Module from dspy.primitives.tool import Tool from dspy.signatures.signature import ensure_signature -import anyio -from asyncer import syncify logger = logging.getLogger(__name__) @@ -75,7 +73,7 @@ def _format_trajectory(self, trajectory: dict[str, Any]): trajectory_signature = dspy.Signature(f"{', '.join(trajectory.keys())} -> x") return adapter.format_user_message_content(trajectory_signature, trajectory) - async def forward(self, **input_args): + def forward(self, **input_args): trajectory = {} max_iters = input_args.pop("max_iters", self.max_iters) for idx in range(max_iters): diff --git a/dspy/primitives/tool.py b/dspy/primitives/tool.py index 7f3a824aa2..0716ded7c7 100644 --- a/dspy/primitives/tool.py +++ b/dspy/primitives/tool.py @@ -1,6 +1,6 @@ import asyncio import inspect -from typing import Any, Callable, Optional, get_origin, get_type_hints +from typing import Any, Callable, Optional, get_origin, get_type_hints from jsonschema import ValidationError, validate from pydantic import BaseModel, TypeAdapter, create_model @@ -172,7 +172,7 @@ def __call__(self, **kwargs): async def acall(self, **kwargs): parsed_kwargs = self._validate_and_parse_args(**kwargs) - result = await self.func(**parsed_kwargs) + result = self.func(**parsed_kwargs) if not asyncio.iscoroutine(result): raise ValueError("You are calling `acall` on a non-async tool, please use `__call__` instead.") return await result From ec90349de9cb8f60256f1f659998c641735b55af Mon Sep 17 00:00:00 2001 From: Thanabordee Date: Sun, 27 Apr 2025 15:32:03 +0700 Subject: [PATCH 05/11] Fixed : fix forward working with MCP tools by adding Inside async _forward --- dspy/predict/react.py | 42 ++++++++++++++-- dspy/primitives/tool.py | 16 +++++- dspy/utils/MCPTools.py | 108 ++++++++++++++++++++++++++++++++++------ 3 files changed, 145 insertions(+), 21 deletions(-) diff --git a/dspy/predict/react.py b/dspy/predict/react.py index 0580a7f7b9..92d3cd545e 100644 --- a/dspy/predict/react.py +++ b/dspy/predict/react.py @@ -1,8 +1,9 @@ import logging from typing import Any, Callable, Literal - from litellm import ContextWindowExceededError +import asyncio +import inspect import dspy from dspy.primitives.program import Module from dspy.primitives.tool import Tool @@ -67,15 +68,44 @@ def __init__(self, signature, tools: list[Callable], max_iters=5): self.tools = tools self.react = dspy.Predict(react_signature) self.extract = dspy.ChainOfThought(fallback_signature) - + def _format_trajectory(self, trajectory: dict[str, Any]): adapter = dspy.settings.adapter or dspy.ChatAdapter() trajectory_signature = dspy.Signature(f"{', '.join(trajectory.keys())} -> x") return adapter.format_user_message_content(trajectory_signature, trajectory) def forward(self, **input_args): + """Execute the ReAct agent with the provided input arguments.""" trajectory = {} max_iters = input_args.pop("max_iters", self.max_iters) + + # Check if we're already in an event loop + try: + asyncio.get_running_loop() + # If we are in an event loop, we need a completely different approach + # Convert forward to an async coroutine and schedule it for execution in the event loop + return self._forward_in_event_loop(trajectory, max_iters, **input_args) + except RuntimeError: + # If we're not in an event loop, we can use asyncio.run to create one + async def run_async(): + return await self._forward_async(trajectory, max_iters, **input_args) + return asyncio.run(run_async()) + + def _forward_in_event_loop(self, trajectory, max_iters, **input_args): + """Handle the case where forward is called from within an event loop. + + In this case, we can't use asyncio.run or run_until_complete, so we need to + return a value that can be awaited by the caller. + """ + async def async_forward(): + return await self._forward_async(trajectory, max_iters, **input_args) + + # If we're called from an async function, we can return a coroutine + # If we're called from a sync function in an event loop, the caller needs to handle scheduling + return async_forward() + + async def _forward_async(self, trajectory, max_iters, **input_args): + """Async implementation of the ReAct forward method.""" for idx in range(max_iters): try: pred = self._call_with_potential_trajectory_truncation(self.react, trajectory, **input_args) @@ -88,9 +118,13 @@ def forward(self, **input_args): trajectory[f"tool_args_{idx}"] = pred.next_tool_args try: - trajectory[f"observation_{idx}"] = self.tools[pred.next_tool_name](**pred.next_tool_args) + # Execute the tool using the universal aexecute method + # Here we use await directly since we're in an async function + trajectory[f"observation_{idx}"] = await self.tools[pred.next_tool_name].aexecute(**pred.next_tool_args) except Exception as err: - trajectory[f"observation_{idx}"] = f"Execution error in {pred.next_tool_name}: {_fmt_exc(err)}" + error_msg = f"Execution error in {pred.next_tool_name}: {_fmt_exc(err)}" + trajectory[f"observation_{idx}"] = error_msg + logger.warning(f"Tool execution error: {_fmt_exc(err)}") if pred.next_tool_name == "finish": break diff --git a/dspy/primitives/tool.py b/dspy/primitives/tool.py index 0716ded7c7..515a95f2c8 100644 --- a/dspy/primitives/tool.py +++ b/dspy/primitives/tool.py @@ -174,5 +174,19 @@ async def acall(self, **kwargs): parsed_kwargs = self._validate_and_parse_args(**kwargs) result = self.func(**parsed_kwargs) if not asyncio.iscoroutine(result): - raise ValueError("You are calling `acall` on a non-async tool, please use `__call__` instead.") + # For compatibility, let's just return the result instead of raising an error + return result return await result + + async def aexecute(self, **kwargs): + """Execute the tool in any context (async or sync) handling both sync and async functions. + + This method is used internally by ReAct and other agents to safely execute tools in any context. + It will work with both sync and async functions, making it a universal tool execution method. + """ + parsed_kwargs = self._validate_and_parse_args(**kwargs) + result = self.func(**parsed_kwargs) + if asyncio.iscoroutine(result): + return await result + else: + return result diff --git a/dspy/utils/MCPTools.py b/dspy/utils/MCPTools.py index 83f89a2f98..6f1620d637 100644 --- a/dspy/utils/MCPTools.py +++ b/dspy/utils/MCPTools.py @@ -1,10 +1,20 @@ -from typing import Any, Dict, List, Optional, Tuple, Type +from typing import Any, Dict, List, Optional, Tuple, Type, Union import json +import logging import anyio from dspy.primitives.tool import Tool +logger = logging.getLogger(__name__) + def map_json_schema_to_tool_args(schema: Optional[Dict[str, Any]]) -> Tuple[Dict[str, Any], Dict[str, Type], Dict[str, str]]: - """Maps a JSON schema to tool arguments compatible with DSPy Tool.""" + """Maps a JSON schema to tool arguments compatible with DSPy Tool. + + Args: + schema: A JSON schema describing the tool's input parameters + + Returns: + A tuple of (args, arg_types, arg_desc) dictionaries for DSPy Tool initialization + """ args, arg_types, arg_desc = {}, {}, {} if not schema or "properties" not in schema: return args, arg_types, arg_desc @@ -22,10 +32,20 @@ def map_json_schema_to_tool_args(schema: Optional[Dict[str, Any]]) -> Tuple[Dict return args, arg_types, arg_desc class MCPTool(Tool): - """Wrapper for an MCP tool, compatible with DSPy agents.""" + """Wrapper for an MCP tool, compatible with DSPy agents. + + This class wraps a Model Context Protocol tool and makes it compatible with + DSPy's ReAct and other agent frameworks. It handles the translation between + DSPy's tool interface and MCP's JSON-RPC interface. + """ def __init__(self, tool_info: Any, session: Any): - """Create a DSPy Tool from an MCP tool description.""" + """Create a DSPy Tool from an MCP tool description. + + Args: + tool_info: Tool information from MCP (object, dict, or JSON string) + session: MCP client session for making tool calls + """ self.session = session self._raw_tool_info = tool_info @@ -43,7 +63,14 @@ def __init__(self, tool_info: Any, session: Any): ) def _extract_tool_info(self, tool_info: Any) -> Tuple[str, str, Optional[Dict[str, Any]]]: - """Extract name, description and input schema from tool info.""" + """Extract name, description and input schema from tool info. + + Args: + tool_info: Tool information in various formats (object, dict, JSON string) + + Returns: + A tuple of (name, description, input_schema) + """ # Try object attributes if hasattr(tool_info, 'name') and hasattr(tool_info, 'description'): return ( @@ -69,18 +96,39 @@ def _extract_tool_info(self, tool_info: Any) -> Tuple[str, str, Optional[Dict[st return str(tool_info), "No description available.", None async def call_tool_async(self, **kwargs: Any) -> Any: - """Execute the MCP tool.""" + """Execute the MCP tool asynchronously. + + Args: + **kwargs: Arguments to pass to the MCP tool + + Returns: + Processed result from the tool execution + + Raises: + RuntimeError: If there's an error executing the tool + """ try: + logger.debug(f"Executing MCP tool {self.name} with args: {kwargs}") result = await self.session.call_tool(self.name, kwargs) - print(f"Tool {self.name} executed with args: {kwargs}") - print(f"Tool {self.name} result: {result}") + logger.debug(f"MCP tool {self.name} returned: {result}") return self._process_result(result) - + except anyio.ClosedResourceError as e: + # Special handling for closed resources (common during cleanup) + logger.error(f"MCP resource closed during {self.name} execution: {str(e)}") + raise RuntimeError(f"MCP connection closed while executing {self.name}") except Exception as e: + logger.error(f"Error executing MCP tool {self.name}: {str(e)}") raise RuntimeError(f"Error executing tool {self.name}: {str(e)}") def _process_result(self, result: Any) -> Any: - """Process the result from tool execution.""" + """Process the result from tool execution into a format suitable for agents. + + Args: + result: Raw result from the MCP tool + + Returns: + Processed result (typically as a string or structured data) + """ if result is None: return "Tool executed successfully but returned no content." @@ -108,25 +156,53 @@ def _process_result(self, result: Any) -> Any: return str(result) class MCPTools: - """Collection of tools from an MCP server, usable with DSPy agents.""" + """Collection of tools from an MCP server, usable with DSPy agents. - def __init__(self, tools_list: List[Any], session: Any): - """Initialize the MCPTools collection.""" + This class provides a container for multiple MCP tools and makes them + available in a format compatible with DSPy's agent frameworks like ReAct. + """ + + def __init__(self, session: Any, tools_list: List[Any]): + """Initialize the MCPTools collection. + + Args: + session: MCP client session for making tool calls + tools_list: List of tool descriptions from MCP + """ self.session = session self.tools = {MCPTool(tool, session).name: MCPTool(tool, session) for tool in tools_list} + logger.info(f"Initialized MCPTools with {len(self.tools)} tools: {', '.join(self.tools.keys())}") def __getitem__(self, tool_name: str) -> MCPTool: - """Get a tool by name.""" + """Get a tool by name. + + Args: + tool_name: Name of the tool to retrieve + + Returns: + The requested MCPTool instance + + Raises: + KeyError: If the tool is not found + """ if tool_name not in self.tools: raise KeyError(f"Tool '{tool_name}' not found in available MCP tools") return self.tools[tool_name] def get_tools(self) -> List[Tool]: - """Get all tools as a list.""" + """Get all tools as a list. + + Returns: + List of all available MCPTool instances + """ return list(self.tools.values()) def get_tool_names(self) -> List[str]: - """Get names of all available tools.""" + """Get names of all available tools. + + Returns: + List of tool names + """ return list(self.tools.keys()) def __str__(self) -> str: From a393e0d166ad6abe84728a0b4d333f75da461694 Mon Sep 17 00:00:00 2001 From: Thanabordee Date: Sun, 27 Apr 2025 15:41:48 +0700 Subject: [PATCH 06/11] Refactor : Simplify forward method --- dspy/predict/react.py | 26 ++++++-------------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/dspy/predict/react.py b/dspy/predict/react.py index 92d3cd545e..a44b207629 100644 --- a/dspy/predict/react.py +++ b/dspy/predict/react.py @@ -3,7 +3,6 @@ from litellm import ContextWindowExceededError import asyncio -import inspect import dspy from dspy.primitives.program import Module from dspy.primitives.tool import Tool @@ -82,28 +81,16 @@ def forward(self, **input_args): # Check if we're already in an event loop try: asyncio.get_running_loop() - # If we are in an event loop, we need a completely different approach - # Convert forward to an async coroutine and schedule it for execution in the event loop - return self._forward_in_event_loop(trajectory, max_iters, **input_args) + # If we are in an event loop, we need to return a coroutine + async def async_forward(): + return await self._forward_async(trajectory, max_iters, **input_args) + return async_forward() except RuntimeError: - # If we're not in an event loop, we can use asyncio.run to create one + # If we're not in an event loop, we can use asyncio.run async def run_async(): return await self._forward_async(trajectory, max_iters, **input_args) return asyncio.run(run_async()) - def _forward_in_event_loop(self, trajectory, max_iters, **input_args): - """Handle the case where forward is called from within an event loop. - - In this case, we can't use asyncio.run or run_until_complete, so we need to - return a value that can be awaited by the caller. - """ - async def async_forward(): - return await self._forward_async(trajectory, max_iters, **input_args) - - # If we're called from an async function, we can return a coroutine - # If we're called from a sync function in an event loop, the caller needs to handle scheduling - return async_forward() - async def _forward_async(self, trajectory, max_iters, **input_args): """Async implementation of the ReAct forward method.""" for idx in range(max_iters): @@ -118,8 +105,7 @@ async def _forward_async(self, trajectory, max_iters, **input_args): trajectory[f"tool_args_{idx}"] = pred.next_tool_args try: - # Execute the tool using the universal aexecute method - # Here we use await directly since we're in an async function + # Execute the tool using the aexecute method trajectory[f"observation_{idx}"] = await self.tools[pred.next_tool_name].aexecute(**pred.next_tool_args) except Exception as err: error_msg = f"Execution error in {pred.next_tool_name}: {_fmt_exc(err)}" From 7f64537cecdde4556e643c9d2d5e93e267860f19 Mon Sep 17 00:00:00 2001 From: Thanabordee Date: Sun, 27 Apr 2025 17:07:19 +0700 Subject: [PATCH 07/11] Refactor : remove unnecessary comment --- dspy/__init__.py | 1 + dspy/predict/react.py | 4 ---- dspy/primitives/tool.py | 3 +-- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/dspy/__init__.py b/dspy/__init__.py index 775018d61b..1e4440270d 100644 --- a/dspy/__init__.py +++ b/dspy/__init__.py @@ -12,6 +12,7 @@ from dspy.utils.logging_utils import configure_dspy_loggers, disable_logging, enable_logging from dspy.utils.asyncify import asyncify from dspy.utils.saving import load +from dspy.utils.MCPTools import MCPTools from dspy.streaming.streamify import streamify from dspy.utils.usage_tracker import track_usage diff --git a/dspy/predict/react.py b/dspy/predict/react.py index a44b207629..2d70107c61 100644 --- a/dspy/predict/react.py +++ b/dspy/predict/react.py @@ -77,16 +77,12 @@ def forward(self, **input_args): """Execute the ReAct agent with the provided input arguments.""" trajectory = {} max_iters = input_args.pop("max_iters", self.max_iters) - - # Check if we're already in an event loop try: asyncio.get_running_loop() - # If we are in an event loop, we need to return a coroutine async def async_forward(): return await self._forward_async(trajectory, max_iters, **input_args) return async_forward() except RuntimeError: - # If we're not in an event loop, we can use asyncio.run async def run_async(): return await self._forward_async(trajectory, max_iters, **input_args) return asyncio.run(run_async()) diff --git a/dspy/primitives/tool.py b/dspy/primitives/tool.py index 515a95f2c8..f54479b9fa 100644 --- a/dspy/primitives/tool.py +++ b/dspy/primitives/tool.py @@ -174,8 +174,7 @@ async def acall(self, **kwargs): parsed_kwargs = self._validate_and_parse_args(**kwargs) result = self.func(**parsed_kwargs) if not asyncio.iscoroutine(result): - # For compatibility, let's just return the result instead of raising an error - return result + raise ValueError("You are calling `acall` on a sync tool, please use `__call__` instead.") return await result async def aexecute(self, **kwargs): From d7dec81c833ef01a2ae4c3acf16c851ea74f47b4 Mon Sep 17 00:00:00 2001 From: Thanabordee Date: Sun, 27 Apr 2025 17:20:15 +0700 Subject: [PATCH 08/11] Added Optional MCP dependency support for Python 3.10 and updated related markers in project files --- pyproject.toml | 1 + uv.lock | 71 +++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 27d37a706c..3097423152 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ dependencies = [ anthropic = ["anthropic>=0.18.0,<1.0.0"] weaviate = ["weaviate-client~=4.5.4"] aws = ["boto3~=1.34.78"] +mcp = ["mcp; python_version >= '3.10'"] dev = [ "pytest>=6.2.5", "pytest-mock>=3.12.0", diff --git a/uv.lock b/uv.lock index ee8da45b23..8c12b5b5b1 100644 --- a/uv.lock +++ b/uv.lock @@ -712,6 +712,9 @@ dev = [ { name = "pytest-mock" }, { name = "ruff" }, ] +mcp = [ + { name = "mcp", marker = "python_full_version >= '3.10'" }, +] weaviate = [ { name = "weaviate-client" }, ] @@ -735,9 +738,10 @@ requires-dist = [ { name = "litellm", marker = "sys_platform == 'win32' and extra == 'dev'", specifier = ">=1.60.3" }, { name = "litellm", extras = ["proxy"], marker = "sys_platform != 'win32' and extra == 'dev'", specifier = ">=1.60.3" }, { name = "magicattr", specifier = ">=0.1.6" }, + { name = "mcp", marker = "python_full_version >= '3.10' and extra == 'mcp'" }, { name = "numpy", marker = "python_full_version == '3.9.*'", specifier = ">=1.26.0,<2.2" }, { name = "numpy", marker = "python_full_version >= '3.10'", specifier = ">=1.26.0" }, - { name = "openai", specifier = ">=0.28.1,<=1.61.0" }, + { name = "openai", specifier = ">=0.28.1" }, { name = "optuna", specifier = ">=3.4.0" }, { name = "pandas", specifier = ">=2.1.1" }, { name = "pillow", marker = "extra == 'dev'", specifier = ">=10.1.0" }, @@ -755,7 +759,7 @@ requires-dist = [ { name = "ujson", specifier = ">=5.8.0" }, { name = "weaviate-client", marker = "extra == 'weaviate'", specifier = "~=4.5.4" }, ] -provides-extras = ["anthropic", "weaviate", "aws", "dev"] +provides-extras = ["anthropic", "weaviate", "aws", "mcp", "dev"] [[package]] name = "email-validator" @@ -1127,6 +1131,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/41/7b/ddacf6dcebb42466abd03f368782142baa82e08fc0c1f8eaa05b4bae87d5/httpx-0.27.0-py3-none-any.whl", hash = "sha256:71d5465162c13681bff01ad59b2cc68dd838ea1f10e51574bac27103f00c91a5", size = 75590 }, ] +[[package]] +name = "httpx-sse" +version = "0.4.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/4c/60/8f4281fa9bbf3c8034fd54c0e7412e66edbab6bc74c4996bd616f8d0406e/httpx-sse-0.4.0.tar.gz", hash = "sha256:1e81a3a3070ce322add1d3529ed42eb5f70817f45ed6ec915ab753f961139721", size = 12624 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e1/9b/a181f281f65d776426002f330c31849b86b31fc9d848db62e16f03ff739f/httpx_sse-0.4.0-py3-none-any.whl", hash = "sha256:f329af6eae57eaa2bdfd962b42524764af68075ea87370a2de920af5341e318f", size = 7819 }, +] + [[package]] name = "huggingface-hub" version = "0.30.2" @@ -1448,6 +1461,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b3/73/085399401383ce949f727afec55ec3abd76648d04b9f22e1c0e99cb4bec3/MarkupSafe-3.0.2-cp39-cp39-win_amd64.whl", hash = "sha256:6e296a513ca3d94054c2c881cc913116e90fd030ad1c656b3869762b754f5f8a", size = 15506 }, ] +[[package]] +name = "mcp" +version = "1.6.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio", marker = "python_full_version >= '3.10'" }, + { name = "httpx", marker = "python_full_version >= '3.10'" }, + { name = "httpx-sse", marker = "python_full_version >= '3.10'" }, + { name = "pydantic", marker = "python_full_version >= '3.10'" }, + { name = "pydantic-settings", marker = "python_full_version >= '3.10'" }, + { name = "sse-starlette", marker = "python_full_version >= '3.10'" }, + { name = "starlette", marker = "python_full_version >= '3.10'" }, + { name = "uvicorn", marker = "python_full_version >= '3.10'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/95/d2/f587cb965a56e992634bebc8611c5b579af912b74e04eb9164bd49527d21/mcp-1.6.0.tar.gz", hash = "sha256:d9324876de2c5637369f43161cd71eebfd803df5a95e46225cab8d280e366723", size = 200031 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/10/30/20a7f33b0b884a9d14dd3aa94ff1ac9da1479fe2ad66dd9e2736075d2506/mcp-1.6.0-py3-none-any.whl", hash = "sha256:7bd24c6ea042dbec44c754f100984d186620d8b841ec30f1b19eda9b93a634d0", size = 76077 }, +] + [[package]] name = "mdurl" version = "0.1.2" @@ -2197,6 +2229,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/25/f2/1647933efaaad61846109a27619f3704929e758a09e6431b8f932a053d40/pydantic_core-2.33.1-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:de9e06abe3cc5ec6a2d5f75bc99b0bdca4f5c719a5b34026f8c57efbdecd2ee3", size = 2081073 }, ] +[[package]] +name = "pydantic-settings" +version = "2.9.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pydantic", marker = "python_full_version >= '3.10'" }, + { name = "python-dotenv", marker = "python_full_version >= '3.10'" }, + { name = "typing-inspection", marker = "python_full_version >= '3.10'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/67/1d/42628a2c33e93f8e9acbde0d5d735fa0850f3e6a2f8cb1eb6c40b9a732ac/pydantic_settings-2.9.1.tar.gz", hash = "sha256:c509bf79d27563add44e8446233359004ed85066cd096d8b510f715e6ef5d268", size = 163234 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b6/5f/d6d641b490fd3ec2c4c13b4244d68deea3a1b970a97be64f34fb5504ff72/pydantic_settings-2.9.1-py3-none-any.whl", hash = "sha256:59b4f431b1defb26fe620c71a7d3968a710d719f5f4cdbbdb7926edeb770f6ef", size = 44356 }, +] + [[package]] name = "pygments" version = "2.19.1" @@ -2710,12 +2756,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/7c/5fc8e802e7506fe8b55a03a2e1dab156eae205c91bee46305755e086d2e2/sqlalchemy-2.0.40-py3-none-any.whl", hash = "sha256:32587e2e1e359276957e6fe5dad089758bc042a971a8a09ae8ecf7a8fe23d07a", size = 1903894 }, ] +[[package]] +name = "sse-starlette" +version = "2.3.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio", marker = "python_full_version >= '3.10'" }, + { name = "starlette", marker = "python_full_version >= '3.10'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/86/35/7d8d94eb0474352d55f60f80ebc30f7e59441a29e18886a6425f0bccd0d3/sse_starlette-2.3.3.tar.gz", hash = "sha256:fdd47c254aad42907cfd5c5b83e2282be15be6c51197bf1a9b70b8e990522072", size = 17499 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5d/20/52fdb5ebb158294b0adb5662235dd396fc7e47aa31c293978d8d8942095a/sse_starlette-2.3.3-py3-none-any.whl", hash = "sha256:8b0a0ced04a329ff7341b01007580dd8cf71331cc21c0ccea677d500618da1e0", size = 10235 }, +] + [[package]] name = "starlette" version = "0.46.2" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "anyio", marker = "sys_platform != 'win32'" }, + { name = "anyio", marker = "python_full_version >= '3.10' or sys_platform != 'win32'" }, { name = "typing-extensions", marker = "python_full_version < '3.10' and sys_platform != 'win32'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/ce/20/08dfcd9c983f6a6f4a1000d934b9e6d626cff8d2eeb77a89a68eef20a2b7/starlette-0.46.2.tar.gz", hash = "sha256:7f7361f34eed179294600af672f565727419830b54b7b084efe44bb82d2fccd5", size = 2580846 } @@ -2981,9 +3040,9 @@ name = "uvicorn" version = "0.29.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "click", marker = "sys_platform != 'win32'" }, - { name = "h11", marker = "sys_platform != 'win32'" }, - { name = "typing-extensions", marker = "python_full_version < '3.11' and sys_platform != 'win32'" }, + { name = "click", marker = "python_full_version >= '3.10' or sys_platform != 'win32'" }, + { name = "h11", marker = "python_full_version >= '3.10' or sys_platform != 'win32'" }, + { name = "typing-extensions", marker = "(python_full_version < '3.11' and sys_platform != 'win32') or (python_full_version == '3.10.*' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/49/8d/5005d39cd79c9ae87baf7d7aafdcdfe0b13aa69d9a1e3b7f1c984a2ac6d2/uvicorn-0.29.0.tar.gz", hash = "sha256:6a69214c0b6a087462412670b3ef21224fa48cae0e452b5883e8e8bdfdd11dd0", size = 40894 } wheels = [ From b41df0da1d7a8f81c73b12b8651229d9c25e88a7 Mon Sep 17 00:00:00 2001 From: Thanabordee Date: Sun, 27 Apr 2025 17:21:27 +0700 Subject: [PATCH 09/11] Add comprehensive MCP integration examples and documentation - Introduced basic MCP integration example in `basic_mcp_integration.py` demonstrating the use of DSPy with MCP tools. - Developed a combined tools agent in `combined_tools_agent.py` that integrates MCP tools with standard Python functions. - Created a direct tool call example in `direct_tool_call.py` for executing MCP tools without an agent. - Expanded documentation in `mcp_docs.md` to cover MCP integration, usage patterns, best practices, and troubleshooting. - Implemented a FastAPI server integration in `mcp_fastapi_server.py` for handling queries with MCP tools. - Added robust MCP agent example in `robust_mcp_agent.py` with improved error handling and resource management. - Included standard DSPy tools example in `standard_dspy_tools.py` showcasing the use of regular Python functions as tools. --- mcp_docs/basic_mcp_integration.py | 58 ++ mcp_docs/combined_tools_agent.py | 106 ++++ mcp_docs/direct_tool_call.py | 100 ++++ mcp_docs/mcp_docs.md | 899 ++++++++++++++++++++++++++++++ mcp_docs/mcp_fastapi_server.py | 149 +++++ mcp_docs/robust_mcp_agent.py | 106 ++++ mcp_docs/standard_dspy_tools.py | 35 ++ 7 files changed, 1453 insertions(+) create mode 100644 mcp_docs/basic_mcp_integration.py create mode 100644 mcp_docs/combined_tools_agent.py create mode 100644 mcp_docs/direct_tool_call.py create mode 100644 mcp_docs/mcp_docs.md create mode 100644 mcp_docs/mcp_fastapi_server.py create mode 100644 mcp_docs/robust_mcp_agent.py create mode 100644 mcp_docs/standard_dspy_tools.py diff --git a/mcp_docs/basic_mcp_integration.py b/mcp_docs/basic_mcp_integration.py new file mode 100644 index 0000000000..4edd27399f --- /dev/null +++ b/mcp_docs/basic_mcp_integration.py @@ -0,0 +1,58 @@ +import os +import sys +import json +import asyncio +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client + +import dspy + + +async def main(): + """Main entry point for the async application.""" + try: + print("Starting MCP client initialization...") + # Configure DSPy with LLM + LLM = dspy.LM("gemini/gemini-2.0-flash", api_key=os.getenv("GOOGLE_API_KEY")) + dspy.configure(lm=LLM) + + # Initialize MCP server and tools + server_params = StdioServerParameters( + command="npx", + args=[ + "-y", + "@openbnb/mcp-server-airbnb", + "--ignore-robots-txt" + ]) + async with stdio_client(server_params) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + tools = await session.list_tools() + tools = tools.tools + print("\nCreating MCPTools instance...") + mcp_tools = dspy.MCPTools(session=session, tools_list=tools) + + # Create ReAct agent in the same async context + react_agent = dspy.ReAct("input->output", mcp_tools.get_tools()) + + # Run the agent (will use the existing event loop) + print("\nRunning ReAct agent...") + react_result = await react_agent( + input="Find a place to stay in New York for 2 adults from 2025-05-01 to 2025-05-05." + ) + + # # If the result is a coroutine (which it will be in an async context), await it + # if asyncio.iscoroutine(react_result): + # react_result = await react_result + + print("\nReAct Result:") + print(react_result) + + except Exception as e: + print(f"Error in main: {str(e)}") + raise + +if __name__ == "__main__": + asyncio.run(main()) + + diff --git a/mcp_docs/combined_tools_agent.py b/mcp_docs/combined_tools_agent.py new file mode 100644 index 0000000000..71a00a764c --- /dev/null +++ b/mcp_docs/combined_tools_agent.py @@ -0,0 +1,106 @@ +import os +import sys +import json +import asyncio +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client +from typing import Any +from contextlib import AsyncExitStack + +import dspy + +stdio_context: Any | None = None +session: ClientSession | None = None +_cleanup_lock: asyncio.Lock = asyncio.Lock() +exit_stack: AsyncExitStack = AsyncExitStack() + +def calculate_sum(a: int, b: int) -> int: + """Calculate the sum of two integers.""" + return a + b + +async def initialize_stdio_client( + command: str, + command_args: list[str] = [], + env: dict[str, str] | None = None +): + global stdio_context, session, exit_stack + if stdio_context is not None: + return stdio_context + + print(f"Initializing MCP server with command: {command} {' '.join(command_args)}") + server_params = StdioServerParameters( + command=command, + args=command_args, + env=env if env else None + ) + try: + stdio_transport = await exit_stack.enter_async_context( + stdio_client(server_params) + ) + read, write = stdio_transport + session = await exit_stack.enter_async_context( + ClientSession(read, write) + ) + await session.initialize() + tools = await session.list_tools() + + return session, tools.tools + except Exception as e: + print(f"Error initializing MCP server: {str(e)}") + await cleanup() + raise + +async def cleanup() -> None: + """Clean up server resources.""" + global stdio_context, session, exit_stack + print("Cleaning up MCP server resources...") + async with _cleanup_lock: + await exit_stack.aclose() + session = None + stdio_context = None + print("Cleanup complete.") + +async def main(): + """Main entry point for the async application.""" + try: + print("Starting MCP client initialization...") + # Configure DSPy with LLM + LLM = dspy.LM("gemini/gemini-2.0-flash", api_key=os.getenv("GOOGLE_API_KEY")) + dspy.configure(lm=LLM) + + # Initialize MCP server and tools + session, tool_list = await initialize_stdio_client( + command="npx", + command_args=[ + "-y", + "@openbnb/mcp-server-airbnb", + "--ignore-robots-txt" + ], + env={} + ) + + print("\nCreating MCPTools instance...") + mcp_tools = dspy.MCPTools(session=session, tools_list=tool_list) + + # Create ReAct agent in the same async context + react_agent = dspy.ReAct("input->output", mcp_tools.get_tools()+[calculate_sum]) + + # Run the agent (will use the existing event loop) + print("\nRunning ReAct agent...") + react_result = await react_agent( + input="Find a place to stay in New York for 2 adults from 2025-05-01 to 2025-05-05. and calculate the sum of 5 and 10.", + ) + + print("\nReAct Result:") + print(react_result.output) + + except Exception as e: + print(f"Error in main: {str(e)}") + raise + finally: + await cleanup() + +if __name__ == "__main__": + asyncio.run(main()) + + diff --git a/mcp_docs/direct_tool_call.py b/mcp_docs/direct_tool_call.py new file mode 100644 index 0000000000..d012f735b5 --- /dev/null +++ b/mcp_docs/direct_tool_call.py @@ -0,0 +1,100 @@ +import os +import sys +import json + + +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client +from typing import Any +import asyncio +import dspy # Using absolute import now +from contextlib import AsyncExitStack + +stdio_context: Any | None = None +session: ClientSession | None = None +_cleanup_lock: asyncio.Lock = asyncio.Lock() +exit_stack: AsyncExitStack = AsyncExitStack() + +async def initialize_stdio_client( + command: str, + command_args: list[str] = [], + env: dict[str, str] | None = None +): + global stdio_context, session, exit_stack + if stdio_context is not None: + return stdio_context + + print(f"Initializing MCP server with command: {command} {' '.join(command_args)}") + server_params = StdioServerParameters( + command=command, + args=command_args, + env=env if env else None + ) + try: + stdio_transport = await exit_stack.enter_async_context( + stdio_client(server_params) + ) + read, write = stdio_transport + session = await exit_stack.enter_async_context( + ClientSession(read, write) + ) + await session.initialize() + tools = await session.list_tools() + + return session, tools.tools + except Exception as e: + print(f"Error initializing MCP server: {str(e)}") + await cleanup() + raise + +async def cleanup() -> None: + """Clean up server resources.""" + global stdio_context, session, exit_stack + print("Cleaning up MCP server resources...") + async with _cleanup_lock: + await exit_stack.aclose() + session = None + stdio_context = None + print("Cleanup complete.") + + +# Create a safer async main function to properly handle errors +async def async_main(): + global session + try: + + # Try initializing the MCP server - with proper error handling + print("Starting MCP client initialization...") + session, tool_list = await initialize_stdio_client( + command="npx", + command_args=[ + "-y", + "@openbnb/mcp-server-airbnb", + "--ignore-robots-txt" + ], + env={} + ) + + print("\nCreating MCPTools instance...") + mcp_tools = dspy.MCPTools(session=session, tools_list=tool_list) + print("MCPTools instance created successfully.") + tool = mcp_tools.get_tools()[0] + + + search_result = await tool.aexecute( + location="New York", + checkin="2025-05-01", + checkout="2025-05-05", + adults=2 + ) + print("\nSearch Result:", search_result[:500] + "..." if len(str(search_result)) > 500 else search_result) + + + finally: + # Make sure to clean up resources + if session: + await cleanup() + +# Run the main async function +if __name__ == "__main__": + asyncio.run(async_main()) diff --git a/mcp_docs/mcp_docs.md b/mcp_docs/mcp_docs.md new file mode 100644 index 0000000000..75064cd39e --- /dev/null +++ b/mcp_docs/mcp_docs.md @@ -0,0 +1,899 @@ +# DSPy with Model Context Protocol (MCP) - Integration Guide + +This documentation provides a comprehensive guide to integrating and using Model Context Protocol (MCP) tools with DSPy agents and frameworks. + +## Table of Contents + +1. [Introduction to Model Context Protocol (MCP)](#introduction-to-model-context-protocol-mcp) +2. [DSPy and MCP Integration](#dspy-and-mcp-integration) +3. [Getting Started](#getting-started) + - [Prerequisites](#prerequisites) + - [Basic Setup](#basic-setup) +4. [Working with MCP Tools](#working-with-mcp-tools) + - [Creating MCP Tools](#creating-mcp-tools) + - [Using MCP Tools with DSPy Agents](#using-mcp-tools-with-dspy-agents) +5. [Implementation Patterns](#implementation-patterns) + - [Simple MCP Integration](#simple-mcp-integration) + - [MCP with Non-MCP Tools](#mcp-with-non-mcp-tools) + - [FastAPI Integration](#fastapi-integration) +6. [Best Practices](#best-practices) +7. [Troubleshooting](#troubleshooting) +8. [API Reference](#api-reference) +9. [Examples](#examples) + - [Simple MCP Integration](#example-simple-mcp-integration) + - [Advanced MCP Agent](#example-advanced-mcp-agent) + - [Direct MCP Tool Calls](#example-direct-mcp-tool-calls) + - [Non-MCP Tools](#example-non-mcp-tools) + - [Combined MCP and Non-MCP Tools](#example-combined-mcp-and-non-mcp-tools) + - [FastAPI Server Integration](#example-fastapi-server-integration) + +## Introduction to Model Context Protocol (MCP) + +The Model Context Protocol (MCP) is a standard for defining tools and capabilities for AI models. It defines a JSON-RPC based interface that allows models to interact with external tools and services. MCP helps create a consistent interface for tools that can be used across different models and platforms. + +Key features of MCP: +- Standardized tool definitions with JSON schema +- Consistent method for tool execution +- Support for asynchronous operations +- Extensible design for various tool types + +## DSPy and MCP Integration + +DSPy, a framework for programming foundation models with programs and feedback, integrates with MCP to allow models to use tools defined in the MCP specification. This integration provides several benefits: + +- Use external tools with DSPy agents like ReAct +- Leverage existing MCP tool libraries +- Combine MCP tools with native DSPy tools +- Build complex multi-tool applications + +The integration is provided through: +- `MCPTools` class: A container for MCP tools that makes them compatible with DSPy +- `MCPTool` class: A wrapper for individual MCP tools that converts them to DSPy-compatible tools + +## Getting Started + +### Prerequisites + +To use MCP with DSPy, you'll need: + +- Python 3.9 or higher +- DSPy installed (`pip install dspy`) +- The MCP client package (`pip install mcp`) +- Node.js (for running JavaScript-based MCP servers) + +### Basic Setup + +The basic workflow for using MCP tools with DSPy involves: + +1. Initializing an MCP server +2. Establishing an MCP client session +3. Retrieving available tools +4. Creating DSPy wrappers for these tools +5. Using the tools with DSPy reAct + +Here's a minimal example: + +```python +import asyncio +import os +import dspy + +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client + +async def main(): + # Configure DSPy with an LLM + LLM = dspy.LM("gemini/gemini-2.0-flash", api_key=os.getenv("GOOGLE_API_KEY")) + dspy.configure(lm=LLM) + + # Initialize MCP server and tools + server_params = StdioServerParameters( + command="npx", + args=["-y", "@openbnb/mcp-server-airbnb", "--ignore-robots-txt"] + ) + + async with stdio_client(server_params) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + tools = await session.list_tools() + + # Create MCPTools instance + mcp_tools = dspy.MCPTools(session=session, tools_list=tools.tools) + + # Create ReAct agent with MCP tools + react_agent = dspy.ReAct("input->output", mcp_tools.get_tools()) + + # Run the agent + result = await react_agent( + input="Find a place to stay in New York for 2 adults from May 1-5, 2025." + ) + + print(result) + +if __name__ == "__main__": + asyncio.run(main()) +``` + +## Working with MCP Tools + +### Creating MCP Tools + +DSPy provides a wrapper around MCP tools through the `MCPTools` and `MCPTool` classes. These convert MCP tool definitions into DSPy-compatible tools. + +The `MCPTools` class handles: +- Converting MCP tool definitions to DSPy tools +- Managing the MCP session +- Providing access to tools by name +- Returning all tools as a list for use with agents + +### Using MCP Tools with DSPy Agents + +Once wrapped, MCP tools can be used with any DSPy agent, including ReAct. The tools are used like any other DSPy tool: + +```python +# Create ReAct agent with MCP tools +react_agent = dspy.ReAct("input->output", mcp_tools.get_tools()) + +# Run the agent with a query +result = await react_agent( + input="Find a place to stay in New York for 2 adults from May 1-5, 2025." +) +``` + +The agent will be able to: +1. Understand the available tools +2. Choose the appropriate tool for a task +3. Execute the tool with proper arguments +4. Process the results and continue reasoning + +## Implementation Patterns + +### Simple MCP Integration + +For a simple integration with clean resource management, use Python's context managers: + +```python +async def main(): + # Initialize MCP server and tools + server_params = StdioServerParameters(command="npx", args=[...]) + async with stdio_client(server_params) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + tools = await session.list_tools() + + # Create MCPTools and agent + mcp_tools = dspy.MCPTools(session=session, tools_list=tools.tools) + react_agent = dspy.ReAct("input->output", mcp_tools.get_tools()) + + # Run the agent + result = await react_agent(input="Your query here") +``` + +### MCP with Non-MCP Tools + +You can combine MCP tools with regular DSPy tools: + +```python +def calculate_sum(a: int, b: int) -> int: + """Calculate the sum of two integers.""" + return a + b + +# Create ReAct agent with both MCP and regular tools +react_agent = dspy.ReAct("input->output", mcp_tools.get_tools() + [calculate_sum]) +``` + +### FastAPI Integration + +For production applications, you can integrate MCP tools with a FastAPI server: + +```python +@asynccontextmanager +async def lifespan(app: FastAPI): + # Initialize resources + session, tool_list = await initialize_stdio_client() + mcp_tools = dspy.MCPTools(session=session, tools_list=tool_list) + global react_agent + react_agent = dspy.ReAct("input->output", mcp_tools.get_tools()) + yield + # Cleanup resources + await cleanup() + +app = FastAPI(lifespan=lifespan) + +@app.post("/api/query") +async def process_query(request: QueryRequest): + react_result = await react_agent(input=request.query) + return QueryResponse(result=react_result.output, status="success") +``` + +## Best Practices + +1. **Resource Management** + - Always clean up MCP resources properly + - Use context managers or a dedicated cleanup function + - Handle connection errors gracefully + +2. **Error Handling** + - Wrap MCP tool calls in try/except blocks + - Provide meaningful error messages to agents + - Handle connection issues and timeouts + +3. **Asynchronous Operations** + - Use `async`/`await` consistently + - Don't mix sync and async code without proper planning + - Understand event loop behavior when using `asyncio.run()` + +4. **Tool Composition** + - Combine MCP tools with regular DSPy tools when needed + - Consider tool dependencies and interactions + - Design tools with specific, focused purposes + +## Troubleshooting + +### Common Issues + +1. **MCP Server Connection Failures** + - Ensure the MCP server package is installed (`npm install -g @openbnb/mcp-server-airbnb`) + - Check for network issues or firewall restrictions + - Verify Node.js is properly installed + +2. **Tool Execution Errors** + - Validate input parameters against the tool's schema + - Check for rate limiting or API key issues with external services + - Examine logs for detailed error messages + +3. **Event Loop Issues** + - Avoid mixing sync and async code improperly + - Use `asyncio.run()` at the top level only + - Ensure cleanup happens in the same task context as initialization + +## API Reference + +### MCPTools Class + +```python +class MCPTools: + def __init__(self, session: Any, tools_list: List[Any]): + """Initialize the MCPTools collection.""" + + def __getitem__(self, tool_name: str) -> MCPTool: + """Get a tool by name.""" + + def get_tools(self) -> List[Tool]: + """Get all tools as a list.""" + + def get_tool_names(self) -> List[str]: + """Get names of all available tools.""" +``` + +### MCPTool Class + +```python +class MCPTool(Tool): + def __init__(self, tool_info: Any, session: Any): + """Create a DSPy Tool from an MCP tool description.""" + + async def call_tool_async(self, **kwargs: Any) -> Any: + """Execute the MCP tool asynchronously.""" +``` + +### Helper Functions + +```python +def map_json_schema_to_tool_args(schema: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Type], Dict[str, str]]: + """Maps a JSON schema to tool arguments compatible with DSPy Tool.""" + +async def initialize_stdio_client(command: str, command_args: list[str] = [], env: dict[str, str] | None = None): + """Initialize an MCP client session with the specified command.""" + +async def cleanup() -> None: + """Clean up MCP server resources.""" +``` + +## Examples + +The following examples demonstrate different patterns for using MCP with DSPy. Each example is designed to showcase a specific integration pattern or use case. + +### Example: Simple MCP Integration + +This example shows the simplest way to integrate MCP tools with DSPy using context managers for clean resource handling: + +```python +# simple_mcp.py +import os +import sys +import json +import asyncio +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client + +import dspy + + + +async def main(): + """Main entry point for the async application.""" + try: + print("Starting MCP client initialization...") + # Configure DSPy with LLM + LLM = dspy.LM("gemini/gemini-2.0-flash", api_key=os.getenv("GOOGLE_API_KEY")) + dspy.configure(lm=LLM) + + # Initialize MCP server and tools + server_params = StdioServerParameters( + command="npx", + args=[ + "-y", + "@openbnb/mcp-server-airbnb", + "--ignore-robots-txt" + ]) + async with stdio_client(server_params) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + tools = await session.list_tools() + tools = tools.tools + print("\nCreating MCPTools instance...") + mcp_tools = dspy.MCPTools(session=session, tools_list=tools) + + # Create ReAct agent in the same async context + react_agent = dspy.ReAct("input->output", mcp_tools.get_tools()) + + # Run the agent (will use the existing event loop) + print("\nRunning ReAct agent...") + react_result = await react_agent( + input="Find a place to stay in New York for 2 adults from 2025-05-01 to 2025-05-05." + ) + + print("\nReAct Result:") + print(react_result) + + except Exception as e: + print(f"Error in main: {str(e)}") + raise + +if __name__ == "__main__": + asyncio.run(main()) +``` + +This approach uses Python's context managers (`async with`) to handle MCP resources properly. The context managers ensure that resources are cleaned up even if an exception occurs. + +### Example: Advanced MCP Agent + +This example shows a more robust implementation with proper resource management using a dedicated resource management pattern: + +```python +# agent.py +import os +import sys +import json +import asyncio +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client +from typing import Any +from contextlib import AsyncExitStack + +import dspy + + +stdio_context: Any | None = None +session: ClientSession | None = None +_cleanup_lock: asyncio.Lock = asyncio.Lock() +exit_stack: AsyncExitStack = AsyncExitStack() + +async def initialize_stdio_client( + command: str, + command_args: list[str] = [], + env: dict[str, str] | None = None +): + global stdio_context, session, exit_stack + if stdio_context is not None: + return stdio_context + + print(f"Initializing MCP server with command: {command} {' '.join(command_args)}") + server_params = StdioServerParameters( + command=command, + args=command_args, + env=env if env else None + ) + try: + stdio_transport = await exit_stack.enter_async_context( + stdio_client(server_params) + ) + read, write = stdio_transport + session = await exit_stack.enter_async_context( + ClientSession(read, write) + ) + await session.initialize() + tools = await session.list_tools() + + return session, tools.tools + except Exception as e: + print(f"Error initializing MCP server: {str(e)}") + await cleanup() + raise + +async def cleanup() -> None: + """Clean up server resources.""" + global stdio_context, session, exit_stack + print("Cleaning up MCP server resources...") + async with _cleanup_lock: + await exit_stack.aclose() + session = None + stdio_context = None + print("Cleanup complete.") + +async def main(): + """Main entry point for the async application.""" + try: + print("Starting MCP client initialization...") + # Configure DSPy with LLM + LLM = dspy.LM("gemini/gemini-2.0-flash", api_key=os.getenv("GOOGLE_API_KEY")) + dspy.configure(lm=LLM) + + # Initialize MCP server and tools + session, tool_list = await initialize_stdio_client( + command="npx", + command_args=[ + "-y", + "@openbnb/mcp-server-airbnb", + "--ignore-robots-txt" + ], + env={} + ) + + print("\nCreating MCPTools instance...") + mcp_tools = dspy.MCPTools(session=session, tools_list=tool_list) + + # Create ReAct agent in the same async context + react_agent = dspy.ReAct("input->output", mcp_tools.get_tools()) + + # Run the agent (will use the existing event loop) + print("\nRunning ReAct agent...") + react_result = await react_agent( + input="Find a place to stay in New York for 2 adults from 2025-05-01 to 2025-05-05." + ) + + print("\nReAct Result:") + print(react_result) + + except Exception as e: + print(f"Error in main: {str(e)}") + raise + finally: + await cleanup() + +if __name__ == "__main__": + asyncio.run(main()) +``` + +This implementation uses a more structured approach with dedicated functions for initialization and cleanup. It also handles exceptions more carefully with a `finally` block to ensure resources are always cleaned up. + +### Example: Direct MCP Tool Calls + +This example demonstrates how to use MCP tools directly without a ReAct agent: + +```python +# direct.py +import os +import sys +import json + + +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client +from typing import Any +import asyncio +import dspy +from contextlib import AsyncExitStack + +stdio_context: Any | None = None +session: ClientSession | None = None +_cleanup_lock: asyncio.Lock = asyncio.Lock() +exit_stack: AsyncExitStack = AsyncExitStack() + +async def initialize_stdio_client( + command: str, + command_args: list[str] = [], + env: dict[str, str] | None = None +): + global stdio_context, session, exit_stack + if stdio_context is not None: + return stdio_context + + print(f"Initializing MCP server with command: {command} {' '.join(command_args)}") + server_params = StdioServerParameters( + command=command, + args=command_args, + env=env if env else None + ) + try: + stdio_transport = await exit_stack.enter_async_context( + stdio_client(server_params) + ) + read, write = stdio_transport + session = await exit_stack.enter_async_context( + ClientSession(read, write) + ) + await session.initialize() + tools = await session.list_tools() + + return session, tools.tools + except Exception as e: + print(f"Error initializing MCP server: {str(e)}") + await cleanup() + raise + +async def cleanup() -> None: + """Clean up server resources.""" + global stdio_context, session, exit_stack + print("Cleaning up MCP server resources...") + async with _cleanup_lock: + await exit_stack.aclose() + session = None + stdio_context = None + print("Cleanup complete.") + + +# Create a safer async main function to properly handle errors +async def async_main(): + global session + try: + + # Try initializing the MCP server - with proper error handling + print("Starting MCP client initialization...") + session, tool_list = await initialize_stdio_client( + command="npx", + command_args=[ + "-y", + "@openbnb/mcp-server-airbnb", + "--ignore-robots-txt" + ], + env={} + ) + + print("\nCreating MCPTools instance...") + mcp_tools = dspy.utils.MCPTools(session=session, tools_list=tool_list) + print("MCPTools instance created successfully.") + tool = mcp_tools.get_tools()[0] + + + search_result = await tool.aexecute( + location="New York", + checkin="2025-05-01", + checkout="2025-05-05", + adults=2 + ) + print("\nSearch Result:", search_result[:500] + "..." if len(str(search_result)) > 500 else search_result) + + + finally: + # Make sure to clean up resources + if session: + await cleanup() + +# Run the main async function +if __name__ == "__main__": + asyncio.run(async_main()) +``` + +This example shows how to use MCPTools without a ReAct agent, directly calling a tool with specific parameters. This approach is useful when you want more control over the tool execution or when you want to use MCP tools in a non-agent context. + +### Example: Non-MCP Tools + +DSPy can also work with regular Python functions as tools, independent of MCP: + +```python +# non_mpc.py +import dspy +import os +def calculate_sum(a: int, b: int) -> int: + """Calculate the sum of two integers.""" + return a + b + +def main(): + """Main entry point for the async application.""" + print("Starting MCP client initialization...") + # Configure DSPy with LLM + LLM = dspy.LM("gemini/gemini-2.0-flash", api_key=os.getenv("GOOGLE_API_KEY")) + dspy.configure(lm=LLM) + + + print("\nCreating MCPTools instance...") + tools = [calculate_sum] + + # Create ReAct agent in the same async context + react_agent = dspy.ReAct("input->output", tools) + + # Run the agent (will use the existing event loop) + print("\nRunning ReAct agent...") + react_result = react_agent( + input="what is the sum of 5 and 10?", + ) + + + print("\nReAct Result:") + print(react_result) + +if __name__ == "__main__": + main() +``` + +This example demonstrates how to use regular Python functions as DSPy tools without any MCP integration. This is useful for simple tools or when you want to build applications that don't rely on external MCP servers. + +### Example: Combined MCP and Non-MCP Tools + +You can combine MCP tools with regular Python functions: + +```python +# agent_mcp_non_mcp.py +import os +import sys +import json +import asyncio +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client +from typing import Any +from contextlib import AsyncExitStack + +import dspy + + +stdio_context: Any | None = None +session: ClientSession | None = None +_cleanup_lock: asyncio.Lock = asyncio.Lock() +exit_stack: AsyncExitStack = AsyncExitStack() + +def calculate_sum(a: int, b: int) -> int: + """Calculate the sum of two integers.""" + return a + b + +async def initialize_stdio_client( + command: str, + command_args: list[str] = [], + env: dict[str, str] | None = None +): + global stdio_context, session, exit_stack + if stdio_context is not None: + return stdio_context + + print(f"Initializing MCP server with command: {command} {' '.join(command_args)}") + server_params = StdioServerParameters( + command=command, + args=command_args, + env=env if env else None + ) + try: + stdio_transport = await exit_stack.enter_async_context( + stdio_client(server_params) + ) + read, write = stdio_transport + session = await exit_stack.enter_async_context( + ClientSession(read, write) + ) + await session.initialize() + tools = await session.list_tools() + + return session, tools.tools + except Exception as e: + print(f"Error initializing MCP server: {str(e)}") + await cleanup() + raise + +async def cleanup() -> None: + """Clean up server resources.""" + global stdio_context, session, exit_stack + print("Cleaning up MCP server resources...") + async with _cleanup_lock: + await exit_stack.aclose() + session = None + stdio_context = None + print("Cleanup complete.") + +async def main(): + """Main entry point for the async application.""" + try: + print("Starting MCP client initialization...") + # Configure DSPy with LLM + LLM = dspy.LM("gemini/gemini-2.0-flash", api_key=os.getenv("GOOGLE_API_KEY")) + dspy.configure(lm=LLM) + + # Initialize MCP server and tools + session, tool_list = await initialize_stdio_client( + command="npx", + command_args=[ + "-y", + "@openbnb/mcp-server-airbnb", + "--ignore-robots-txt" + ], + env={} + ) + + print("\nCreating MCPTools instance...") + mcp_tools = dspy.MCPTools(session=session, tools_list=tool_list) + + # Create ReAct agent in the same async context + react_agent = dspy.ReAct("input->output", mcp_tools.get_tools()+[calculate_sum]) + + # Run the agent (will use the existing event loop) + print("\nRunning ReAct agent...") + react_result = await react_agent( + input="Find a place to stay in New York for 2 adults from 2025-05-01 to 2025-05-05. and calculate the sum of 5 and 10.", + ) + + print("\nReAct Result:") + print(react_result.output) + + except Exception as e: + print(f"Error in main: {str(e)}") + raise + finally: + await cleanup() + +if __name__ == "__main__": + asyncio.run(main()) +``` + +This example showcases how to combine MCP tools with simple Python functions in a single ReAct agent. This hybrid approach allows you to leverage both external MCP tools and custom local functionality. + +### Example: FastAPI Server Integration + +For production applications, you can integrate MCP tools with a FastAPI server: + +```python +# fastapi_dspy_mcp.py +import os +import sys +import json +import asyncio +from typing import Any, Dict, List, Optional +from contextlib import AsyncExitStack, asynccontextmanager + +from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel + +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client + +import dspy + + +# Global variables for MCP client session +stdio_context: Any | None = None +session: ClientSession | None = None +_cleanup_lock: asyncio.Lock = asyncio.Lock() +exit_stack: AsyncExitStack = AsyncExitStack() + +# Default MCP configuration +DEFAULT_MODEL = "gemini/gemini-2.0-flash" +DEFAULT_MCP_COMMAND = "npx" +DEFAULT_MCP_ARGS = ["-y", "@openbnb/mcp-server-airbnb", "--ignore-robots-txt"] +DEFAULT_ENV_VARS = {} + +react_agent: Any | None = None + +# Define lifespan context manager +@asynccontextmanager +async def lifespan(app: FastAPI): + global react_agent, session, exit_stack + # Configure DSPy with LLM + api_key = os.getenv("GOOGLE_API_KEY") + if not api_key: + raise HTTPException(status_code=400, detail="GOOGLE_API_KEY environment variable not set") + + LLM = dspy.LM(DEFAULT_MODEL, api_key=api_key) + dspy.configure(lm=LLM) + + # Initialize MCP server and tools + session, tool_list = await initialize_stdio_client() + + # Create MCPTools instance + mcp_tools = dspy.MCPTools(session=session, tools_list=tool_list) + + # Create ReAct agent in the same async context + react_agent = dspy.ReAct("input->output", mcp_tools.get_tools()) + + yield + # Shutdown - clean up resources + # This will be executed in the same task context where resources were initialized + await cleanup() + +# Create FastAPI app with lifespan +app = FastAPI( + title="DSPy MCP API", + description="FastAPI server for DSPy Model Context Protocol interactions", + version="0.1.0", + lifespan=lifespan +) + +# Add CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], # Modify for production + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Pydantic models for request/response +class QueryRequest(BaseModel): + query: str + +class QueryResponse(BaseModel): + result: Any + status: str + +# MCP initialization function +async def initialize_stdio_client( + command: str = DEFAULT_MCP_COMMAND, + command_args: list[str] = DEFAULT_MCP_ARGS, + env: dict[str, str] | None = DEFAULT_ENV_VARS +): + global stdio_context, session, exit_stack, react_agent + if stdio_context is not None: + return session, stdio_context + + print(f"Initializing MCP server with command: {command} {' '.join(command_args)}") + server_params = StdioServerParameters( + command=command, + args=command_args, + env=env if env else None + ) + try: + stdio_transport = await exit_stack.enter_async_context( + stdio_client(server_params) + ) + read, write = stdio_transport + session = await exit_stack.enter_async_context( + ClientSession(read, write) + ) + await session.initialize() + tools = await session.list_tools() + stdio_context = tools.tools + + return session, tools.tools + except Exception as e: + print(f"Error initializing MCP server: {str(e)}") + await cleanup() + raise + +async def cleanup() -> None: + """Clean up server resources.""" + global stdio_context, session, exit_stack + print("Cleaning up MCP server resources...") + async with _cleanup_lock: + if session is not None: + await exit_stack.aclose() + session = None + stdio_context = None + print("Cleanup complete.") + +# FastAPI endpoints +@app.post("/api/query", response_model=QueryResponse) +async def process_query(request: QueryRequest): + try: + # Run the agent + react_result = await react_agent(input=request.query) + + return QueryResponse( + result=react_result.output, + status="success" + ) + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/health") +async def health_check(): + return {"status": "healthy"} + +if __name__ == "__main__": + import uvicorn + uvicorn.run("fastapi_dspy_mcp:app", host="0.0.0.0", port=8000, reload=True) +``` + +This implementation shows how to create a production-ready FastAPI server that integrates DSPy's ReAct agent with MCP tools. It uses FastAPI's lifespan feature to manage the MCP resources and provides a clean API endpoint for queries. + +## Additional Resources + +- [MCP](https://github.com/modelcontextprotocol/python-sdk/) diff --git a/mcp_docs/mcp_fastapi_server.py b/mcp_docs/mcp_fastapi_server.py new file mode 100644 index 0000000000..b7af92903a --- /dev/null +++ b/mcp_docs/mcp_fastapi_server.py @@ -0,0 +1,149 @@ +import os +import sys +import json +import asyncio +from typing import Any, Dict, List, Optional +from contextlib import AsyncExitStack, asynccontextmanager + +from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel + +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client + +import dspy + + +# Global variables for MCP client session +stdio_context: Any | None = None +session: ClientSession | None = None +_cleanup_lock: asyncio.Lock = asyncio.Lock() +exit_stack: AsyncExitStack = AsyncExitStack() + +# Default MCP configuration +DEFAULT_MODEL = "gemini/gemini-2.0-flash" +DEFAULT_MCP_COMMAND = "npx" +DEFAULT_MCP_ARGS = ["-y", "@openbnb/mcp-server-airbnb", "--ignore-robots-txt"] +DEFAULT_ENV_VARS = {} + +react_agent: Any | None = None + +# Define lifespan context manager +@asynccontextmanager +async def lifespan(app: FastAPI): + global react_agent, session, exit_stack + # Configure DSPy with LLM + api_key = os.getenv("GOOGLE_API_KEY") + if not api_key: + raise HTTPException(status_code=400, detail="GOOGLE_API_KEY environment variable not set") + + LLM = dspy.LM(DEFAULT_MODEL, api_key=api_key) + dspy.configure(lm=LLM) + + # Initialize MCP server and tools + session, tool_list = await initialize_stdio_client() + + # Create MCPTools instance + mcp_tools = dspy.MCPTools(session=session, tools_list=tool_list) + + # Create ReAct agent in the same async context + react_agent = dspy.ReAct("input->output", mcp_tools.get_tools()) + + yield + # Shutdown - clean up resources + # This will be executed in the same task context where resources were initialized + await cleanup() + +# Create FastAPI app with lifespan +app = FastAPI( + title="DSPy MCP API", + description="FastAPI server for DSPy Model Context Protocol interactions", + version="0.1.0", + lifespan=lifespan +) + +# Add CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], # Modify for production + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Pydantic models for request/response +class QueryRequest(BaseModel): + query: str + +class QueryResponse(BaseModel): + result: Any + status: str + +# MCP initialization function +async def initialize_stdio_client( + command: str = DEFAULT_MCP_COMMAND, + command_args: list[str] = DEFAULT_MCP_ARGS, + env: dict[str, str] | None = DEFAULT_ENV_VARS +): + global stdio_context, session, exit_stack, react_agent + if stdio_context is not None: + return session, stdio_context + + print(f"Initializing MCP server with command: {command} {' '.join(command_args)}") + server_params = StdioServerParameters( + command=command, + args=command_args, + env=env if env else None + ) + try: + stdio_transport = await exit_stack.enter_async_context( + stdio_client(server_params) + ) + read, write = stdio_transport + session = await exit_stack.enter_async_context( + ClientSession(read, write) + ) + await session.initialize() + tools = await session.list_tools() + stdio_context = tools.tools + + return session, tools.tools + except Exception as e: + print(f"Error initializing MCP server: {str(e)}") + await cleanup() + raise + +async def cleanup() -> None: + """Clean up server resources.""" + global stdio_context, session, exit_stack + print("Cleaning up MCP server resources...") + async with _cleanup_lock: + if session is not None: + await exit_stack.aclose() + session = None + stdio_context = None + print("Cleanup complete.") + +# FastAPI endpoints +@app.post("/api/query", response_model=QueryResponse) +async def process_query(request: QueryRequest): + try: + # Run the agent + react_result = await react_agent(input=request.query) + + return QueryResponse( + result=react_result.output, + status="success" + ) + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/health") +async def health_check(): + return {"status": "healthy"} + +if __name__ == "__main__": + import uvicorn + uvicorn.run("fastapi_dspy_mcp:app", host="0.0.0.0", port=8000, reload=True) \ No newline at end of file diff --git a/mcp_docs/robust_mcp_agent.py b/mcp_docs/robust_mcp_agent.py new file mode 100644 index 0000000000..c2f1843ef8 --- /dev/null +++ b/mcp_docs/robust_mcp_agent.py @@ -0,0 +1,106 @@ +import os +import sys +import json +import asyncio +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client +from typing import Any +from contextlib import AsyncExitStack + +import dspy + +stdio_context: Any | None = None +session: ClientSession | None = None +_cleanup_lock: asyncio.Lock = asyncio.Lock() +exit_stack: AsyncExitStack = AsyncExitStack() + +async def initialize_stdio_client( + command: str, + command_args: list[str] = [], + env: dict[str, str] | None = None +): + global stdio_context, session, exit_stack + if stdio_context is not None: + return stdio_context + + print(f"Initializing MCP server with command: {command} {' '.join(command_args)}") + server_params = StdioServerParameters( + command=command, + args=command_args, + env=env if env else None + ) + try: + stdio_transport = await exit_stack.enter_async_context( + stdio_client(server_params) + ) + read, write = stdio_transport + session = await exit_stack.enter_async_context( + ClientSession(read, write) + ) + await session.initialize() + tools = await session.list_tools() + + return session, tools.tools + except Exception as e: + print(f"Error initializing MCP server: {str(e)}") + await cleanup() + raise + +async def cleanup() -> None: + """Clean up server resources.""" + global stdio_context, session, exit_stack + print("Cleaning up MCP server resources...") + async with _cleanup_lock: + await exit_stack.aclose() + session = None + stdio_context = None + print("Cleanup complete.") + +async def main(): + """Main entry point for the async application.""" + try: + print("Starting MCP client initialization...") + # Configure DSPy with LLM + LLM = dspy.LM("gemini/gemini-2.0-flash", api_key=os.getenv("GOOGLE_API_KEY")) + dspy.configure(lm=LLM) + + # Initialize MCP server and tools + session, tool_list = await initialize_stdio_client( + command="npx", + command_args=[ + "-y", + "@openbnb/mcp-server-airbnb", + "--ignore-robots-txt" + ], + env={} + ) + + print("\nCreating MCPTools instance...") + mcp_tools = dspy.MCPTools(session=session, tools_list=tool_list) + + # Create ReAct agent in the same async context + react_agent = dspy.ReAct("input->output", mcp_tools.get_tools()) + + # Run the agent (will use the existing event loop) + print("\nRunning ReAct agent...") + react_result = await react_agent( + input="Find a place to stay in New York for 2 adults from 2025-05-01 to 2025-05-05." + ) + + # # If the result is a coroutine (which it will be in an async context), await it + # if asyncio.iscoroutine(react_result): + # react_result = await react_result + + print("\nReAct Result:") + print(react_result) + + except Exception as e: + print(f"Error in main: {str(e)}") + raise + finally: + await cleanup() + +if __name__ == "__main__": + asyncio.run(main()) + + diff --git a/mcp_docs/standard_dspy_tools.py b/mcp_docs/standard_dspy_tools.py new file mode 100644 index 0000000000..a2e44d0c51 --- /dev/null +++ b/mcp_docs/standard_dspy_tools.py @@ -0,0 +1,35 @@ + +import dspy +import os +def calculate_sum(a: int, b: int) -> int: + """Calculate the sum of two integers.""" + return a + b + +def main(): + """Main entry point for the async application.""" + print("Starting MCP client initialization...") + # Configure DSPy with LLM + LLM = dspy.LM("gemini/gemini-2.0-flash", api_key=os.getenv("GOOGLE_API_KEY")) + dspy.configure(lm=LLM) + + + print("\nCreating MCPTools instance...") + tools = [calculate_sum] + + # Create ReAct agent in the same async context + react_agent = dspy.ReAct("input->output", tools) + + # Run the agent (will use the existing event loop) + print("\nRunning ReAct agent...") + react_result = react_agent( + input="what is the sum of 5 and 10?", + ) + + + print("\nReAct Result:") + print(react_result) + +if __name__ == "__main__": + main() + + From bba32d6786efd231601d5d6270a1677fabfd5464 Mon Sep 17 00:00:00 2001 From: Thanabordee Date: Sun, 27 Apr 2025 17:27:45 +0700 Subject: [PATCH 10/11] Refactor: remove unused imports from multiple files --- dspy/utils/MCPTools.py | 2 +- mcp_docs/basic_mcp_integration.py | 2 -- mcp_docs/combined_tools_agent.py | 2 -- mcp_docs/direct_tool_call.py | 3 --- mcp_docs/mcp_fastapi_server.py | 6 ++---- mcp_docs/robust_mcp_agent.py | 2 -- 6 files changed, 3 insertions(+), 14 deletions(-) diff --git a/dspy/utils/MCPTools.py b/dspy/utils/MCPTools.py index 6f1620d637..b0403bf6e9 100644 --- a/dspy/utils/MCPTools.py +++ b/dspy/utils/MCPTools.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Optional, Tuple, Type, Union +from typing import Any, Dict, List, Optional, Tuple, Type import json import logging import anyio diff --git a/mcp_docs/basic_mcp_integration.py b/mcp_docs/basic_mcp_integration.py index 4edd27399f..0a20700ba5 100644 --- a/mcp_docs/basic_mcp_integration.py +++ b/mcp_docs/basic_mcp_integration.py @@ -1,6 +1,4 @@ import os -import sys -import json import asyncio from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client diff --git a/mcp_docs/combined_tools_agent.py b/mcp_docs/combined_tools_agent.py index 71a00a764c..65502e2c1b 100644 --- a/mcp_docs/combined_tools_agent.py +++ b/mcp_docs/combined_tools_agent.py @@ -1,6 +1,4 @@ import os -import sys -import json import asyncio from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client diff --git a/mcp_docs/direct_tool_call.py b/mcp_docs/direct_tool_call.py index d012f735b5..e3a70c1c84 100644 --- a/mcp_docs/direct_tool_call.py +++ b/mcp_docs/direct_tool_call.py @@ -1,6 +1,3 @@ -import os -import sys -import json from mcp import ClientSession, StdioServerParameters diff --git a/mcp_docs/mcp_fastapi_server.py b/mcp_docs/mcp_fastapi_server.py index b7af92903a..93ff9a7435 100644 --- a/mcp_docs/mcp_fastapi_server.py +++ b/mcp_docs/mcp_fastapi_server.py @@ -1,11 +1,9 @@ import os -import sys -import json import asyncio -from typing import Any, Dict, List, Optional +from typing import Any from contextlib import AsyncExitStack, asynccontextmanager -from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends +from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel diff --git a/mcp_docs/robust_mcp_agent.py b/mcp_docs/robust_mcp_agent.py index c2f1843ef8..01a52cacf3 100644 --- a/mcp_docs/robust_mcp_agent.py +++ b/mcp_docs/robust_mcp_agent.py @@ -1,6 +1,4 @@ import os -import sys -import json import asyncio from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client From de1069d40d0850da79278e616b52c84c6fbab602 Mon Sep 17 00:00:00 2001 From: Thanabordee Date: Mon, 28 Apr 2025 18:31:11 +0700 Subject: [PATCH 11/11] Refactor: simplify tool info extraction logic in MCPTool class --- dspy/utils/MCPTools.py | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/dspy/utils/MCPTools.py b/dspy/utils/MCPTools.py index b0403bf6e9..e208f07e55 100644 --- a/dspy/utils/MCPTools.py +++ b/dspy/utils/MCPTools.py @@ -78,21 +78,7 @@ def _extract_tool_info(self, tool_info: Any) -> Tuple[str, str, Optional[Dict[st tool_info.description, getattr(tool_info, 'inputSchema', None) ) - - # Try dict format - if isinstance(tool_info, dict): - if 'name' in tool_info and 'description' in tool_info: - return tool_info['name'], tool_info['description'], tool_info.get('inputSchema') - - # Try JSON string - if isinstance(tool_info, str): - try: - parsed = json.loads(tool_info) - if isinstance(parsed, dict) and 'name' in parsed and 'description' in parsed: - return parsed['name'], parsed['description'], parsed.get('inputSchema') - except json.JSONDecodeError: - pass - + return str(tool_info), "No description available.", None async def call_tool_async(self, **kwargs: Any) -> Any: