-
Notifications
You must be signed in to change notification settings - Fork 818
Description
Bug description
On an MCP server using StdioServerTransportProvider, if multiple asynchronous tool calls are made on the same session, the following RuntimeException may be thrown on the server side, and the server may stop responding thereafter.
[boundedElastic-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Failed to enqueue message
Caused by: java.lang.RuntimeException: Failed to enqueue message
at io.modelcontextprotocol.server.transport.StdioServerTransportProvider$StdioMcpSessionTransport.lambda$sendMessage$0(StdioServerTransportProvider.java:154)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:45)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
at reactor.core.publisher.MonoZip$ZipInner.onComplete(MonoZip.java:536)
at reactor.core.publisher.Operators$MonoInnerProducerBase.complete(Operators.java:2869)
at reactor.core.publisher.SinkOneMulticast.subscribe(SinkOneMulticast.java:98)
at reactor.core.publisher.MonoZip$ZipCoordinator.request(MonoZip.java:220)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onSubscribe(MonoIgnoreThen.java:135)
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:129)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)
at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:252)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
When an LLM requests multiple tool calls, AI agent libraries such as LangChain may execute multiple tool calls in parallel.
In this case, a runtime exception should not occur.
Environment
MCP SDK Version: 0.16.0
JDK: 21
Steps to reproduce
Implement an MCP server with a simple tool that echoes back input.
The tool is intentionally implemented to delay responses.
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper;
import io.modelcontextprotocol.server.McpServer;
import io.modelcontextprotocol.server.McpServerFeatures;
import io.modelcontextprotocol.server.transport.StdioServerTransportProvider;
import io.modelcontextprotocol.spec.McpSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class Server {
private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);
public static void main(String[] args) {
var jsonMapper = new JacksonMcpJsonMapper(new ObjectMapper());
var transportProvider = new StdioServerTransportProvider(jsonMapper);
var schema = """
{
"type": "object",
"id": "urn:jsonschema:Operation",
"properties": {
"input": {
"type": "string"
}
}
}
""";
var tool = McpSchema.Tool.builder().name("echo").description("echo back input value")
.inputSchema(jsonMapper, schema).build();
var toolSpec = McpServerFeatures.SyncToolSpecification.builder().tool(tool)
.callHandler((mcpSyncServerExchange, callToolRequest) -> {
var arguments = callToolRequest.arguments();
var input = (String) arguments.get("input");
LOGGER.info("Echoing: {}", input);
try {
TimeUnit.SECONDS.sleep(3); // delay response
return new McpSchema.CallToolResult(input, false);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return new McpSchema.CallToolResult("error", true);
}
}).build();
var server = McpServer.sync(transportProvider).serverInfo("example", "1.0.0")
.capabilities(McpSchema.ServerCapabilities.builder().tools(true).build()).tools(toolSpec).build();
LOGGER.info("Starting started: {}", server.getServerInfo());
}
}Create and execute code that makes multiple asynchronous tool calls to this MCP server.
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.client.McpClient;
import io.modelcontextprotocol.client.transport.ServerParameters;
import io.modelcontextprotocol.client.transport.StdioClientTransport;
import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper;
import io.modelcontextprotocol.spec.McpSchema;
import java.util.Map;
public class Client {
public static void main(String[] args) {
var params = ServerParameters.builder("java")
.args("-jar", "/path/to/example.jar")
.build();
var transport = new StdioClientTransport(params, new JacksonMcpJsonMapper(new ObjectMapper()));
transport.setStdErrorHandler(System.err::println);
var client = McpClient.async(transport).build();
client.initialize().subscribe();
client.listTools()
.doOnNext(result -> result.tools().forEach(tool -> System.out.println(tool.name())))
.subscribe();
client.callTool(McpSchema.CallToolRequest.builder().name("echo").arguments(Map.of("input", "hello1")).build())
.doOnNext(callToolResult -> callToolResult.content().forEach(System.out::println))
.subscribe();
client.callTool(McpSchema.CallToolRequest.builder().name("echo").arguments(Map.of("input", "hello2")).build())
.doOnNext(callToolResult -> callToolResult.content().forEach(System.out::println))
.subscribe();
}
}The exception shown in the report at the beginning often occurs on the server side (although it may not occur).
Expected behavior
Return the correct response for each asynchronous call.
Minimal Complete Reproducible example
As described in "Steps to reproduce".