feat: improve process killing #600
Caution: Review failed. The pull request is closed.
Walkthrough
Adds configurable graceful shutdown for the Stdio transport, normalizes loosely-typed Content to concrete MCP types, implements bidirectional sampling support across HTTP and stdio transports, changes resource _meta to raw maps, adds extensive tests and docs, and introduces a CI coverage job and gitignore entries.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (5 passed)
📜 Recent review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (33)
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
client/transport/stdio.go (8 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-08T15:38:52.931Z
Learnt from: ezynda3
PR: mark3labs/mcp-go#0
File: :0-0
Timestamp: 2025-08-08T15:38:52.931Z
Learning: In mark3labs/mcp-go, for stdio line reading (e.g., client/transport/stdio.go), maintainers prefer bufio.Reader over bufio.Scanner because Scanner’s default 64 KiB token limit is problematic for long lines. If Scanner is used, its buffer limit should be raised via Scanner.Buffer.
Applied to files:
client/transport/stdio.go
📚 Learning: 2025-08-08T15:37:18.458Z
Learnt from: ezynda3
PR: mark3labs/mcp-go#0
File: :0-0
Timestamp: 2025-08-08T15:37:18.458Z
Learning: In mark3labs/mcp-go stdio transport, using bufio.Scanner without calling Scanner.Buffer imposes a 64 KiB per-line limit; to support very long lines safely, set a larger max via Scanner.Buffer or use a streaming Reader/json.Decoder approach.
Applied to files:
client/transport/stdio.go
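The 64 KiB limit described in the learnings above can be reproduced in isolation. The following is a minimal sketch, not code from mark3labs/mcp-go: `longLine` and `scanFirstLine` are hypothetical helpers, and the 100 KiB line and 1 MiB cap are arbitrary values chosen for illustration.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strings"
)

// longLine returns one newline-terminated line with n payload bytes.
// Hypothetical helper for this sketch only.
func longLine(n int) string {
	return strings.Repeat("x", n) + "\n"
}

// scanFirstLine reads the first line of input with a bufio.Scanner.
// maxToken <= 0 keeps the default limit (bufio.MaxScanTokenSize, 64 KiB);
// a positive value raises the limit through Scanner.Buffer.
func scanFirstLine(input string, maxToken int) (string, error) {
	s := bufio.NewScanner(strings.NewReader(input))
	if maxToken > 0 {
		// The initial buffer is small; the scanner grows it up to maxToken.
		s.Buffer(make([]byte, 0, 4096), maxToken)
	}
	if !s.Scan() {
		// Err is nil on a clean EOF and bufio.ErrTooLong on an oversized line.
		return "", s.Err()
	}
	return s.Text(), nil
}

func main() {
	input := longLine(100 * 1024) // 100 KiB, above the default limit

	_, err := scanFirstLine(input, 0)
	fmt.Println("default limit, too long:", errors.Is(err, bufio.ErrTooLong))

	line, err := scanFirstLine(input, 1<<20) // allow lines up to 1 MiB
	fmt.Println("raised limit:", len(line), err)
}
```

Raising the cap only moves the failure point; a line longer than the new maximum still fails, which is presumably why the maintainers prefer a Reader-based approach.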
🧬 Code graph analysis (1)
client/transport/stdio.go (2)
client/transport/interface.go (2)
JSONRPCResponse (67-72)
RequestHandler (38-38)
util/logger.go (2)
Logger (8-11)
DefaultLogger (16-20)
// Waiting for the server to exit, or sending SIGTERM if the server does not exit within a reasonable time
wait := func() (error, bool) {
	select {
	case err := <-resChan:
		return err, true
	case <-time.After(c.terminateDuration):
	}
	return nil, false
}

if err, ok := wait(); ok {
	return err
}

// Note the condition here: if sending SIGTERM fails, don't wait and just
// move on to SIGKILL.
if err := c.cmd.Process.Signal(syscall.SIGTERM); err == nil {
	if err, ok := wait(); ok {
		return err
	}
}
// Sending SIGKILL if the server does not exit within a reasonable time after SIGTERM
if err := c.cmd.Process.Kill(); err != nil {
	return err
}
if err, ok := wait(); ok {
	return err
}
return fmt.Errorf("unresponsive subprocess")
Avoid killing an already-exited subprocess
After the first timeout expires, the child might exit just before we send SIGTERM. In that race, Process.Signal returns "os: process already finished"; because we always fall through to SIGKILL, Process.Kill returns the same error and Close fails even though shutdown already completed. We need a non-blocking re-check of the Wait result between escalation steps so we don’t signal a reaped process.
Please fold in a zero-timeout drain before each escalation, e.g.:
-wait := func() (error, bool) {
-	select {
-	case err := <-resChan:
-		return err, true
-	case <-time.After(c.terminateDuration):
-	}
-	return nil, false
-}
-
-if err, ok := wait(); ok {
+wait := func(timeout time.Duration) (error, bool) {
+	if timeout <= 0 {
+		select {
+		case err := <-resChan:
+			return err, true
+		default:
+			return nil, false
+		}
+	}
+
+	select {
+	case err := <-resChan:
+		return err, true
+	case <-time.After(timeout):
+	}
+	return nil, false
+}
+
+if err, ok := wait(c.terminateDuration); ok {
 	return err
 }
+if err, ok := wait(0); ok {
+	return err
+}
+
 // Note the condition here: if sending SIGTERM fails, don't wait and just
 // move on to SIGKILL.
 if err := c.cmd.Process.Signal(syscall.SIGTERM); err == nil {
-	if err, ok := wait(); ok {
+	if err, ok := wait(c.terminateDuration); ok {
 		return err
 	}
+} else {
+	if err, ok := wait(0); ok {
+		return err
+	}
 }
 // Sending SIGKILL if the server does not exit within a reasonable time after SIGTERM
+if err, ok := wait(0); ok {
+	return err
+}
 if err := c.cmd.Process.Kill(); err != nil {
-	return err
+	if err2, ok := wait(0); ok {
+		return err2
+	}
+	return err
 }
-if err, ok := wait(); ok {
+if err, ok := wait(c.terminateDuration); ok {
 	return err
 }
+if err, ok := wait(0); ok {
+	return err
+}
 return fmt.Errorf("unresponsive subprocess")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// Waiting for the server to exit, or sending SIGTERM if the server does not exit within a reasonable time
wait := func(timeout time.Duration) (error, bool) {
	if timeout <= 0 {
		select {
		case err := <-resChan:
			return err, true
		default:
			return nil, false
		}
	}
	select {
	case err := <-resChan:
		return err, true
	case <-time.After(timeout):
	}
	return nil, false
}
if err, ok := wait(c.terminateDuration); ok {
	return err
}
if err, ok := wait(0); ok {
	return err
}
// Note the condition here: if sending SIGTERM fails, don't wait and just
// move on to SIGKILL.
if err := c.cmd.Process.Signal(syscall.SIGTERM); err == nil {
	if err, ok := wait(c.terminateDuration); ok {
		return err
	}
} else {
	if err, ok := wait(0); ok {
		return err
	}
}
// Sending SIGKILL if the server does not exit within a reasonable time after SIGTERM
if err, ok := wait(0); ok {
	return err
}
if err := c.cmd.Process.Kill(); err != nil {
	if err2, ok := wait(0); ok {
		return err2
	}
	return err
}
if err, ok := wait(c.terminateDuration); ok {
	return err
}
if err, ok := wait(0); ok {
	return err
}
return fmt.Errorf("unresponsive subprocess")
🤖 Prompt for AI Agents
client/transport/stdio.go lines 236-264: after the initial timeout you must
avoid signaling a process that already exited; add a non-blocking zero-timeout
check (select on resChan with a default) immediately before sending SIGTERM and
again immediately before sending SIGKILL so that if the child finished in the
race we drain resChan and return its result instead of proceeding; additionally
treat "process already finished" errors from Signal/Kill as a success path by
performing the same non-blocking check and returning the drained result rather
than treating those errors as failures.
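The non-blocking check described in this prompt can be exercised without a real subprocess. The sketch below is an illustration under assumptions, not the code merged into client/transport/stdio.go: `waitResult` and `alreadyFinished` are hypothetical names, the result channel stands in for the goroutine that calls `cmd.Wait`, and the return order is flipped to Go's usual error-last convention. `os.ErrProcessDone` is the standard-library sentinel whose message is "os: process already finished".

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"time"
)

// waitResult waits up to timeout for the Wait result delivered on resChan.
// A timeout <= 0 performs a non-blocking check (select with default).
func waitResult(resChan <-chan error, timeout time.Duration) (exited bool, err error) {
	if timeout <= 0 {
		select {
		case err := <-resChan:
			return true, err
		default:
			return false, nil
		}
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop() // release the timer if the result arrives first
	select {
	case err := <-resChan:
		return true, err
	case <-timer.C:
		return false, nil
	}
}

// alreadyFinished reports whether an error from Process.Signal or
// Process.Kill only means the child was gone before the signal was sent.
func alreadyFinished(err error) bool {
	return errors.Is(err, os.ErrProcessDone)
}

func main() {
	resChan := make(chan error, 1)

	// Nothing delivered yet: the zero-timeout check does not block.
	exited, _ := waitResult(resChan, 0)
	fmt.Println("exited before result:", exited)

	// Simulate cmd.Wait finishing just before an escalation step.
	resChan <- nil
	exited, err := waitResult(resChan, 0)
	fmt.Println("exited after result:", exited, err)

	fmt.Println("benign signal error:", alreadyFinished(os.ErrProcessDone))
}
```

Using `time.NewTimer` with a deferred `Stop` instead of `time.After` is a small design choice here: it releases the timer as soon as the result arrives rather than leaving it pending for the full timeout.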
…s in stdio transport (mark3labs#603)

The bufio.Scanner has a default 64KB token limit which causes 'token too long' errors when MCP servers send large messages (e.g., large tool responses, resource contents, or prompts). This change replaces Scanner with Reader.ReadString('\n') which can handle arbitrarily large lines.

Changes:
- client/transport/stdio.go: Changed stdout from *bufio.Scanner to *bufio.Reader
- testdata/mockstdio_server.go: Applied same fix to mock server
- client/transport/stdio_test.go: Added TestStdio_LargeMessages with tests for messages ranging from 1KB to 5MB to ensure the fix works correctly

The original code (pre-commit 4e353ac) used bufio.Reader, but was incorrectly changed to Scanner claiming it would avoid panics with long lines. This fix reverts to the Reader approach which actually handles large messages correctly.

Fixes issue where stdio clients fail with 'bufio.Scanner: token too long' error when communicating with servers that send large responses.
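As a rough illustration of the Reader-based approach this commit describes, the sketch below reads newline-delimited messages with `bufio.Reader.ReadString('\n')`. It is an assumption-laden example rather than the transport's actual read loop: `readLines`, `readLinesFromString`, and `payload` are hypothetical helpers, and the 5 MB size simply mirrors the upper bound mentioned for TestStdio_LargeMessages.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// readLines reads newline-terminated messages from r. ReadString grows its
// result as needed, so there is no fixed per-line cap as with bufio.Scanner.
func readLines(r io.Reader) ([]string, error) {
	br := bufio.NewReader(r)
	var lines []string
	for {
		line, err := br.ReadString('\n')
		if len(line) > 0 {
			// Keep a final unterminated line too; strip the line ending.
			lines = append(lines, strings.TrimRight(line, "\r\n"))
		}
		if err == io.EOF {
			return lines, nil
		}
		if err != nil {
			return lines, err
		}
	}
}

// readLinesFromString is a convenience wrapper for in-memory input.
func readLinesFromString(s string) ([]string, error) {
	return readLines(strings.NewReader(s))
}

// payload returns n bytes of filler standing in for a large JSON message.
func payload(n int) string {
	return strings.Repeat("a", n)
}

func main() {
	// One 5 MiB message followed by a short one.
	lines, err := readLinesFromString(payload(5<<20) + "\nping\n")
	fmt.Println(len(lines), len(lines[0]), lines[1], err)
}
```

The trade-off is that nothing bounds memory use per line, so a caller that does not trust its peer may still want an explicit size check.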
* feat: implement sampling support for Streamable HTTP transport

Implements sampling capability for HTTP transport, resolving issue mark3labs#419. Enables servers to send sampling requests to HTTP clients via SSE and receive LLM-generated responses.

## Key Changes

### Core Implementation
- Add `BidirectionalInterface` support to `StreamableHTTP`
- Implement `SetRequestHandler` for server-to-client requests
- Enhance SSE parsing to handle requests alongside responses/notifications
- Add `handleIncomingRequest` and `sendResponseToServer` methods

### HTTP-Specific Features
- Leverage existing MCP headers (`Mcp-Session-Id`, `Mcp-Protocol-Version`)
- Bidirectional communication via HTTP POST for responses
- Proper JSON-RPC request/response handling over HTTP

### Error Handling
- Add specific JSON-RPC error codes for different failure scenarios:
  - `-32601` (Method not found) when no handler configured
  - `-32603` (Internal error) for sampling failures
  - `-32800` (Request cancelled/timeout) for context errors
- Enhanced error messages with sampling-specific context

### Testing & Examples
- Comprehensive test suite in `streamable_http_sampling_test.go`
- Complete working example in `examples/sampling_http_client/`
- Tests cover success flows, error scenarios, and interface compliance

## Technical Details

The implementation maintains full backward compatibility while adding bidirectional communication support. Server requests are processed asynchronously to avoid blocking the SSE stream reader. HTTP transport now supports the complete sampling flow that was previously only available in stdio and inprocess transports.

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>

* feat: implement server-side sampling support for HTTP transport

This completes the server-side implementation of sampling support for HTTP transport, addressing the remaining requirements from issue mark3labs#419.

Changes:
- Enhanced streamableHttpSession to implement SessionWithSampling interface
- Added bidirectional SSE communication for server-to-client requests
- Implemented session registry for proper response correlation
- Added comprehensive error handling with JSON-RPC error codes
- Created extensive test suite covering all scenarios
- Added working example server with sampling tools

Key Features:
- Server can send sampling requests to HTTP clients via SSE
- Clients respond via HTTP POST with proper session correlation
- Queue overflow protection and timeout handling
- Compatible with existing HTTP transport architecture

* fix: replace time.Sleep with synchronization primitives in tests

Replace flaky time.Sleep calls with proper synchronization using channels and sync.WaitGroup to make tests deterministic and avoid race conditions. Also improves error handling robustness in test servers with proper JSON decoding error checks.

* fix: improve request detection logic and add nil pointer checks

- Make request vs response detection more robust by checking for presence of "method" field instead of relying on nil Result/Error fields
- Add nil pointer check in sendResponseToServer function to prevent panics

These changes improve reliability against malformed messages and edge cases.

* fix: correct misleading comment about response delivery

The comment incorrectly stated that responses are broadcast to all sessions, but the implementation actually delivers responses to the specific session identified by sessionID using the activeSessions registry.

* fix: implement EnableSampling() to properly declare sampling capability

Previously, EnableSampling() was a no-op that didn't actually enable the sampling capability in the server's declared capabilities.

Changes:
- Add Sampling field to mcp.ServerCapabilities struct
- Add sampling field to internal serverCapabilities struct
- Update EnableSampling() to set the sampling capability flag
- Update handleInitialize() to include sampling in capability response
- Add test to verify sampling capability is properly declared

Now when EnableSampling() is called, the server will properly declare sampling capability during initialization, allowing clients to know that the server supports sending sampling requests.

* fix: prevent panic from unsafe type assertion in example server

Replace unsafe type assertion result.Content.(mcp.TextContent).Text with safe type checking to handle cases where Content might not be a TextContent struct. Now gracefully handles different content types without panicking.

* fix: add missing EnableSampling() call in interface test

The SamplingInterface test was missing the EnableSampling() call, which is necessary to activate sampling features for proper testing.

* fix: expand error test coverage and avoid t.Fatalf

- Replace single error test with comprehensive table-driven tests
- Add test cases for invalid request IDs and malformed results
- Replace t.Fatalf with t.Errorf to follow project conventions
- Use proper session ID format for valid test scenarios

* fix: eliminate recursive response handling and improve routing

- Remove recursive call in RequestSampling that could cause stack overflow
- Remove problematic response re-queuing to global channel
- Update deliverSamplingResponse to route responses directly to dedicated request channels via samplingRequests map lookup
- This prevents ordering issues and ensures responses reach the correct waiting request

* fix: improve sampling response delivery robustness

- Modified deliverSamplingResponse to return error instead of just logging
- Added proper error handling for disconnected sessions
- Improved error messages for debugging
- Updated test expectations to match new error behavior

* fix: add graceful shutdown handling to sampling client

- Add signal handling for SIGINT and SIGTERM
- Move defer statement after error checking
- Improve shutdown error handling

* fix: improve context handling in streamable HTTP transport

- Add timeout context for SSE response processing (30s default)
- Add timeout for individual connection attempts in listenForever (10s)
- Use context-aware sleep in retry logic
- Ensure async goroutines properly respect context cancellation

* fix: improve error message for notification channel queue full condition

- Make error message more descriptive and actionable
- Provide clearer debugging information about why the channel is blocked

* refactor: rename struct variable for clarity in message parsing

- Rename 'baseMessage' to 'jsonMessage' for more neutral naming
- Improves code readability and follows consistent naming conventions

* test: add concurrent sampling requests test with response association

Add test verifying that concurrent sampling requests are handled correctly when the second request completes faster than the first. The test ensures:
- Responses are correctly associated with their request IDs
- Server processes requests concurrently without blocking
- Completion order follows actual processing time, not submission order

* fix: improve context handling in async goroutine

Create new context with 30-second timeout for request handling to prevent long-running handlers from blocking indefinitely.

* refactor: replace interface{} with any throughout codebase

Replace all occurrences of interface{} with the modern Go any type alias for improved readability and consistency with current Go best practices.

* fix: improve context handling in async goroutine for StreamableHTTP

Create timeout context from parent context instead of context.Background() to ensure request handlers respect parent context cancellation.

Addresses review comment about context handling in async goroutine.

* refactor: remove unused samplingResponseChan field from session struct

The samplingResponseChan field was declared but never used in the streamableHttpSession struct. Remove it and update tests accordingly.

Addresses review comment about unused fields in session struct.

* feat: add graceful shutdown handling to sampling HTTP client example

Add signal handling for SIGINT and SIGTERM to allow graceful shutdown of the sampling HTTP client example. This prevents indefinite blocking and provides better production-ready behavior.

Addresses review comment about adding graceful shutdown handling.

* refactor: remove unused mu field from streamableHttpSession

Removes unused sync.RWMutex field that was flagged by golangci-lint.

* Add e2e test
* wip
* wip
* fixes
* fixes
* fix race condition

---------

Co-authored-by: andig <cpuidle@gmx.de>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: andig <cpuidle@gmail.com>
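One of the fixes above distinguishes requests from responses by the presence of a "method" field. A minimal sketch of that idea follows, assuming plain JSON-RPC 2.0 messages; `messageKind`, the `kind*` constants, and `classify` are hypothetical names for illustration and are not taken from the mcp-go code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// messageKind is a hypothetical label used only in this sketch.
type messageKind string

const (
	kindRequest      messageKind = "request"
	kindNotification messageKind = "notification"
	kindResponse     messageKind = "response"
)

// classify decides what a JSON-RPC message is from the fields it carries:
// "method" plus a non-null "id" is a request, "method" alone is a
// notification, and anything without "method" is treated as a response.
func classify(raw []byte) (messageKind, error) {
	var probe struct {
		ID     json.RawMessage `json:"id"`
		Method *string         `json:"method"` // nil when the field is absent
	}
	if err := json.Unmarshal(raw, &probe); err != nil {
		return "", err
	}
	hasID := len(probe.ID) > 0 && string(probe.ID) != "null"
	switch {
	case probe.Method != nil && hasID:
		return kindRequest, nil
	case probe.Method != nil:
		return kindNotification, nil
	default:
		return kindResponse, nil
	}
}

func main() {
	messages := []string{
		`{"jsonrpc":"2.0","id":1,"method":"sampling/createMessage"}`,
		`{"jsonrpc":"2.0","method":"notifications/progress"}`,
		`{"jsonrpc":"2.0","id":1,"result":{}}`,
	}
	for _, m := range messages {
		kind, err := classify([]byte(m))
		fmt.Println(kind, err)
	}
}
```

Probing for the field itself, rather than inferring the kind from empty result or error values, keeps the decision stable when a peer sends a response with an empty result.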
…ark3labs#591)
* feat: update type definition of _meta field to text/blob resources.
  - Parses optional _meta field on the text/blob resource
  - Add TestResourceContentsMetaField to verify _meta field handling
  - Ensure backward compatibility with resources without _meta
* chore: apply coderabbit nits.
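To make the optional `_meta` handling concrete, here is a small sketch assuming the field is decoded into a raw map. The `textResource` struct, `parseResource`, and `encodeResource` are hypothetical stand-ins and not the types defined in mcp-go; the field names and sample values are chosen only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// textResource is a hypothetical stand-in for a text resource payload.
// Meta is a raw map, so arbitrary keys survive a decode/encode round trip,
// and omitempty keeps payloads without _meta unchanged.
type textResource struct {
	URI  string         `json:"uri"`
	Text string         `json:"text"`
	Meta map[string]any `json:"_meta,omitempty"`
}

// parseResource decodes a JSON payload into a textResource.
func parseResource(s string) (textResource, error) {
	var r textResource
	err := json.Unmarshal([]byte(s), &r)
	return r, err
}

// encodeResource encodes a textResource back to JSON.
func encodeResource(r textResource) (string, error) {
	b, err := json.Marshal(r)
	return string(b), err
}

func main() {
	withMeta, err := parseResource(`{"uri":"file:///a.txt","text":"hi","_meta":{"source":"cache"}}`)
	fmt.Println(withMeta.Meta["source"], err)

	// A payload without _meta decodes to a nil map and encodes without the key.
	plain, err := parseResource(`{"uri":"file:///a.txt","text":"hi"}`)
	fmt.Println(plain.Meta == nil, err)
	out, err := encodeResource(plain)
	fmt.Println(out, err)
}
```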
Description
This adds a fallback to help clean up a process that may not be closing. Currently, processes can be held open, causing the close function to never finish.
I believe this fixes #131 and #498.
Type of Change
Checklist
MCP Spec Compliance
Additional Information
This is largely pulled from the official MCP-Go SDK https://github.com/modelcontextprotocol/go-sdk/blob/ab092510d20a23bed672a27164957e0ff1104bc7/mcp/cmd.go#L100
Summary by CodeRabbit
New Features
Bug Fixes