-
Notifications
You must be signed in to change notification settings - Fork 434
feat: support streamablehttp and websocket mcp servers #696
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support streamablehttp and websocket mcp servers #696
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @suluyana, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request extends the MCPClient's functionality by integrating support for streamable_http and websocket communication protocols. This allows the client to establish connections with a wider range of MCP servers, enhancing its versatility and adaptability to different network environments and server implementations.
Highlights
- New Connection Protocols: The
MCPClientnow supportsstreamable_httpandwebsocketas new transport types for connecting to MCP servers, alongside the existing SSE. - Dynamic Transport Selection: The
connect_to_servermethod has been updated to dynamically select the appropriate client transport based on thetransportortypekeyword arguments provided. - Timeout Configuration: New default timeout constants (
DEFAULT_STREAMABLE_HTTP_TIMEOUT,DEFAULT_STREAMABLE_HTTP_SSE_READ_TIMEOUT) have been introduced specifically forstreamable_httpconnections. - Session Management Changes: The explicit call to
session.initialize()and the storage of theClientSessionobject inself.sessionswithin theconnect_to_servermethod have been removed.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments or fill out our survey to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request adds support for streamable_http and websocket transports to the MCP client, which is a great enhancement. The implementation correctly uses lazy imports for the optional dependencies. My review identified a critical bug where the client session is not stored after creation, which would cause subsequent tool calls to fail. I've also pointed out a few other issues, including the use of an incorrect timeout constant, improperly formatted error messages, and code duplication that affects maintainability. I've provided specific suggestions to address these points.
ms_agent/tools/mcp_client.py
Outdated
| # List available tools | ||
| self.print_tools(server_name, await session.list_tools()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The client session is created but is not being initialized or stored in self.sessions. The previous implementation correctly called await session.initialize() and then self.sessions[server_name] = session.
Without these steps, any subsequent call to call_tool for this server will fail, as it relies on retrieving the session from self.sessions.
| # List available tools | |
| self.print_tools(server_name, await session.list_tools()) | |
| # Store session | |
| self.sessions[server_name] = session |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
ms_agent/tools/mcp_client.py
Outdated
| streamablehttp_client( | ||
| url, | ||
| headers=kwargs.get('headers'), | ||
| timeout=kwargs.get('timeout', DEFAULT_HTTP_TIMEOUT), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The streamablehttp_client is using DEFAULT_HTTP_TIMEOUT as its default timeout. However, a new constant, DEFAULT_STREAMABLE_HTTP_TIMEOUT, was introduced and seems more appropriate here.
Using DEFAULT_HTTP_TIMEOUT is likely incorrect because it's an integer, whereas the new streamablehttp_client probably expects a timedelta object, similar to DEFAULT_STREAMABLE_HTTP_SSE_READ_TIMEOUT. This could lead to unexpected timeout behavior.
| timeout=kwargs.get('timeout', DEFAULT_HTTP_TIMEOUT), | |
| timeout=kwargs.get('timeout', DEFAULT_STREAMABLE_HTTP_TIMEOUT), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
ms_agent/tools/mcp_client.py
Outdated
| raise ImportError( | ||
| 'Could not import streamablehttp_client. ', | ||
| 'To use streamable http connections, please upgrade the required dependency with: ', | ||
| "'pip install -U mcp'", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When you raise an ImportError with multiple string arguments, Python creates a tuple of those strings as the error message (e.g., ("msg1", "msg2")) instead of a single, concatenated string. This can make the error message difficult to read for developers.
To provide a clear, single-string error message, you can rely on Python's automatic string literal concatenation.
| raise ImportError( | |
| 'Could not import streamablehttp_client. ', | |
| 'To use streamable http connections, please upgrade the required dependency with: ', | |
| "'pip install -U mcp'", | |
| raise ImportError( | |
| 'Could not import streamablehttp_client. ' | |
| 'To use streamable http connections, please upgrade the required dependency with: ' | |
| "'pip install -U mcp'" | |
| ) from None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
ms_agent/tools/mcp_client.py
Outdated
| raise ImportError( | ||
| 'Could not import websocket_client. ', | ||
| 'To use Websocket connections, please install the required dependency with: ', | ||
| "'pip install mcp[ws]' or 'pip install websockets'", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When you raise an ImportError with multiple string arguments, Python creates a tuple of those strings as the error message (e.g., ("msg1", "msg2")) instead of a single, concatenated string. This can make the error message difficult to read for developers.
To provide a clear, single-string error message, you can rely on Python's automatic string literal concatenation.
| raise ImportError( | |
| 'Could not import websocket_client. ', | |
| 'To use Websocket connections, please install the required dependency with: ', | |
| "'pip install mcp[ws]' or 'pip install websockets'", | |
| raise ImportError( | |
| 'Could not import websocket_client. ' | |
| 'To use Websocket connections, please install the required dependency with: ' | |
| "'pip install mcp[ws]' or 'pip install websockets'" | |
| ) from None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
ms_agent/tools/mcp_client.py
Outdated
| if transport == 'streamable_http': | ||
| try: | ||
| from mcp.client.streamable_http import streamablehttp_client | ||
| except ImportError: | ||
| raise ImportError( | ||
| 'Could not import streamablehttp_client. ', | ||
| 'To use streamable http connections, please upgrade the required dependency with: ', | ||
| "'pip install -U mcp'", | ||
| ) from None | ||
| httpx_client_factory = kwargs.get('httpx_client_factory') | ||
| other_kwargs = {} | ||
| if httpx_client_factory is not None: | ||
| other_kwargs['httpx_client_factory'] = httpx_client_factory | ||
| streamable_transport = await self.exit_stack.enter_async_context( | ||
| streamablehttp_client( | ||
| url, | ||
| headers=kwargs.get('headers'), | ||
| timeout=kwargs.get('timeout', DEFAULT_HTTP_TIMEOUT), | ||
| sse_read_timeout=kwargs.get( | ||
| 'sse_read_timeout', | ||
| DEFAULT_STREAMABLE_HTTP_SSE_READ_TIMEOUT), | ||
| **other_kwargs)) | ||
| read, write, _ = streamable_transport | ||
| session_kwargs = session_kwargs or {} | ||
| session = await self.exit_stack.enter_async_context( | ||
| ClientSession(read, write, **session_kwargs)) | ||
| elif transport == 'websocket': | ||
| try: | ||
| from mcp.client.websocket import websocket_client | ||
| except ImportError: | ||
| raise ImportError( | ||
| 'Could not import websocket_client. ', | ||
| 'To use Websocket connections, please install the required dependency with: ', | ||
| "'pip install mcp[ws]' or 'pip install websockets'", | ||
| ) from None | ||
| websocket_transport = await self.exit_stack.enter_async_context( | ||
| websocket_client(url)) | ||
| read, write = websocket_transport | ||
| session_kwargs = session_kwargs or {} | ||
| session = await self.exit_stack.enter_async_context( | ||
| ClientSession(read, write, **session_kwargs)) | ||
| else: | ||
| sse_transport = await self.exit_stack.enter_async_context( | ||
| sse_client( | ||
| url, kwargs.get('headers'), | ||
| kwargs.get('timeout', DEFAULT_HTTP_TIMEOUT), | ||
| kwargs.get('sse_read_timeout', | ||
| DEFAULT_SSE_READ_TIMEOUT))) | ||
| read, write = sse_transport | ||
| session_kwargs = session_kwargs or {} | ||
| session = await self.exit_stack.enter_async_context( | ||
| ClientSession(read, write, **session_kwargs)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a fair amount of code duplication in the if/elif/else block for handling different transports. The logic for initializing session_kwargs and creating the ClientSession is repeated in each branch.
To improve maintainability and reduce redundancy, I recommend refactoring this section. You could determine the read and write streams within each specific transport branch and then move the common session creation logic outside and after the conditional block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
ms_agent/tools/mcp_client.py
Outdated
| except ImportError: | ||
| raise ImportError( | ||
| 'Could not import streamablehttp_client. ', | ||
| 'To use streamable http connections, please upgrade the required dependency with: ', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is possible that the user does not have the latest version of mcp installed. Consider suggesting that they upgrade to the latest version.
| 'To use streamable http connections, please upgrade the required dependency with: ', | |
| 'To use streamable http connections, please upgrade to the latest version of mcp with: ', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
|
|
||
| async def connect_to_server(self, server_name: str, **kwargs): | ||
| logger.info(f'connect to {server_name}') | ||
| transport = kwargs.get('transport') or kwargs.get('type') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
增加注释,列出transport or type完备的枚举值
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
已修复
No description provided.