diff --git a/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java b/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java index 6a2ab8d..3fabe05 100644 --- a/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java +++ b/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java @@ -92,7 +92,6 @@ public class WebRxSseServerTransportProvider implements McpServerTransportProvid * Map of active client sessions, keyed by session ID. */ private final ConcurrentHashMap sessions = new ConcurrentHashMap<>(); - private final ConcurrentHashMap sessionTransports = new ConcurrentHashMap<>(); /** * Flag indicating if the transport is shutting down. */ @@ -117,9 +116,9 @@ public WebRxSseServerTransportProvider(ObjectMapper objectMapper, String message this.sseEndpoint = sseEndpoint; } - public void sendHeartbeat(){ - for (WebRxMcpSessionTransport transport : sessionTransports.values()) { - transport.sendHeartbeat(); + public void sendHeartbeat() { + for (McpServerSession session : sessions.values()) { + ((WebRxMcpSessionTransport) session.getTransport()).sendHeartbeat(); } } @@ -230,14 +229,13 @@ public void handleSseConnection(Context ctx) throws Throwable{ } Flux publisher = Flux.create(sink -> { - WebRxMcpSessionTransport sessionTransport = new WebRxMcpSessionTransport(sink); + WebRxMcpSessionTransport sessionTransport = new WebRxMcpSessionTransport(ctx, sink); McpServerSession session = sessionFactory.create(sessionTransport); String sessionId = session.getId(); logger.debug("Created new SSE connection for session: {}", sessionId); sessions.put(sessionId, session); - sessionTransports.put(sessionId, sessionTransport); // Send initial endpoint event logger.debug("Sending initial endpoint event to session: {}", sessionId); @@ -247,7 +245,6 @@ public void handleSseConnection(Context ctx) throws Throwable{ sink.onCancel(() -> { logger.debug("Session {} cancelled", sessionId); sessions.remove(sessionId); - sessionTransports.remove(sessionId); }); }); @@ -277,13 +274,16 @@ public void handleMessage(Context ctx) throws Throwable { return; } - if (Utils.isEmpty(ctx.param("sessionId"))) { + String sessionId = ctx.param("sessionId"); + + if (Utils.isEmpty(sessionId)) { ctx.status(404); ctx.render(new McpError("Session ID missing in message endpoint")); return; } - McpServerSession session = sessions.get(ctx.param("sessionId")); + + McpServerSession session = sessions.get(sessionId); String body = ctx.body(); try { @@ -309,14 +309,19 @@ public void handleMessage(Context ctx) throws Throwable { } } - private class WebRxMcpSessionTransport implements McpServerTransport { - + public class WebRxMcpSessionTransport implements McpServerTransport { + private final Context context; private final FluxSink sink; - public WebRxMcpSessionTransport(FluxSink sink) { + public WebRxMcpSessionTransport(Context context,FluxSink sink) { + this.context = context; this.sink = sink; } + public Context getContext() { + return context; + } + public void sendHeartbeat() { sink.next(new SseEvent().comment("heartbeat")); } diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java index 0c9c82b..d0ec9c6 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java @@ -646,9 +646,9 @@ public Mono addResource(McpServerFeatures.AsyncResourceSpecification resou } return Mono.defer(() -> { - if (this.resources.putIfAbsent(resourceSpecification.getResource().getDescription(), resourceSpecification) != null) { + if (this.resources.putIfAbsent(resourceSpecification.getResource().getUri(), resourceSpecification) != null) { return Mono.error(new McpError( - "Resource with URI '" + resourceSpecification.getResource().getDescription() + "' already exists")); + "Resource with URI '" + resourceSpecification.getResource().getUri() + "' already exists")); } logger.debug("Added resource handler: {}", resourceSpecification.getResource().getUri()); if (this.serverCapabilities.getResources().getListChanged()) { diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java index 98c12a1..7d89914 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java @@ -81,6 +81,10 @@ public McpServerSession(String id, McpServerTransport transport, InitRequestHand this.notificationHandlers = notificationHandlers; } + public McpServerTransport getTransport() { + return transport; + } + /** * Retrieve the session id. * @return session id diff --git a/pom.xml b/pom.xml index da674ce..431972d 100644 --- a/pom.xml +++ b/pom.xml @@ -92,7 +92,7 @@ 4.2.0 5.0.1 2.40.1 - 3.2.1 + 3.3.0