From b14d953aec11013a3af2a0e6c7a953f14187d9fa Mon Sep 17 00:00:00 2001 From: noear Date: Tue, 8 Apr 2025 14:37:50 +0800 Subject: [PATCH 1/2] feat(solon-webrx): Add solon-webrx adaptive --- mcp-solon/README.md | 14 + mcp-solon/mcp-solon-webrx/README.md | 30 ++ mcp-solon/mcp-solon-webrx/pom.xml | 147 +++++++ .../WebRxSseServerTransportProvider.java | 408 ++++++++++++++++++ pom.xml | 2 + 5 files changed, 601 insertions(+) create mode 100644 mcp-solon/README.md create mode 100644 mcp-solon/mcp-solon-webrx/README.md create mode 100644 mcp-solon/mcp-solon-webrx/pom.xml create mode 100644 mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java diff --git a/mcp-solon/README.md b/mcp-solon/README.md new file mode 100644 index 0000000..3c1df79 --- /dev/null +++ b/mcp-solon/README.md @@ -0,0 +1,14 @@ +## solon is a java enterprise application development framework similar to spring. + +There is no java-ee. Compared to the spring framework: + +* more concurrency (300%), +* less memory (50%), +* faster startup (1000%), +* smaller packaging (10%), +* support for java8 ~ java24, native runtime. + + +github: + +* https://github.com/opensolon/solon \ No newline at end of file diff --git a/mcp-solon/mcp-solon-webrx/README.md b/mcp-solon/mcp-solon-webrx/README.md new file mode 100644 index 0000000..3af5f05 --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/README.md @@ -0,0 +1,30 @@ +# Solon WebRx SSE Server Transport + +```xml + + io.modelcontextprotocol.sdk + mcp-solon-webrx + +``` + + +```java +String MESSAGE_ENDPOINT = "/mcp/message"; + +@Configuration +static class MyConfig { + + @Bean + public WebRxSseServerTransportProvider webMvcSseServerTransport() { + return WebRxSseServerTransportProvider.builder() + .objectMapper(new ObjectMapper()) + .messageEndpoint(MESSAGE_ENDPOINT) + .build(); + } + + @Bean + public void routerFunction(WebRxSseServerTransportProvider transport, AppContext context) { + transport.toHttpHandler(context.app()); + } +} +``` diff --git a/mcp-solon/mcp-solon-webrx/pom.xml b/mcp-solon/mcp-solon-webrx/pom.xml new file mode 100644 index 0000000..1485b05 --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/pom.xml @@ -0,0 +1,147 @@ + + + 4.0.0 + + io.modelcontextprotocol.sdk.j8 + mcp-parent + 0.8.1 + ../../pom.xml + + mcp-solon-webrx + jar + Solon WebRx implementation of the Java MCP SSE transport + + https://github.com/modelcontextprotocol/java-sdk + + + https://github.com/modelcontextprotocol/java-sdk + git://github.com/modelcontextprotocol/java-sdk.git + git@github.com/modelcontextprotocol/java-sdk.git + + + + + io.modelcontextprotocol.sdk.j8 + mcp + 0.8.1 + + + + io.modelcontextprotocol.sdk + mcp-test + 0.8.1 + test + + + + org.noear + solon-web-rx + ${solon.version} + + + + org.noear + solon-web-sse + ${solon.version} + + + + org.noear + solon-net-httputils + ${solon.version} + test + + + + org.noear + solon-logging-simple + ${solon.version} + test + + + + org.noear + solon-lib + ${solon.version} + test + + + + org.noear + solon-boot-jetty + ${solon.version} + test + + + + org.assertj + assertj-core + ${assert4j.version} + test + + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + + + + org.mockito + mockito-core + ${mockito.version} + test + + + + io.projectreactor + reactor-test + test + + + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + + org.awaitility + awaitility + ${awaitility.version} + test + + + + org.junit.jupiter + junit-jupiter-params + ${junit-jupiter.version} + test + + + + + + sonatype-nexus-snapshots + Sonatype Nexus Snapshots + https://oss.sonatype.org/content/repositories/snapshots + + false + + + + + + sonatype-nexus-snapshots + Sonatype Nexus Snapshots + https://oss.sonatype.org/content/repositories/snapshots + + false + + + + diff --git a/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java b/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java new file mode 100644 index 0000000..2746445 --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java @@ -0,0 +1,408 @@ +/* + * Copyright 2025 - 2025 the original author or authors. + */ +package io.modelcontextprotocol.server.transport; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.spec.*; +import io.modelcontextprotocol.util.Assert; +import org.noear.solon.SolonApp; +import org.noear.solon.Utils; +import org.noear.solon.core.handle.Context; +import org.noear.solon.core.handle.Entity; +import org.noear.solon.core.util.MimeType; +import org.noear.solon.web.sse.SseEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Exceptions; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Server-side implementation of the MCP (Model Context Protocol) HTTP transport using + * Server-Sent Events (SSE). This implementation provides a bidirectional communication + * channel between MCP clients and servers using HTTP POST for client-to-server messages + * and SSE for server-to-client messages. + * + *

+ * Key features: + *

+ * + *

+ * The transport sets up two main endpoints: + *

+ * + *

+ * This implementation is thread-safe and can handle multiple concurrent client + * connections. It uses {@link ConcurrentHashMap} for session management and Project + * Reactor's non-blocking APIs for message processing and delivery. + * + * @author Christian Tzolov + * @author Alexandros Pappas + * @author Dariusz Jędrzejczyk + * @author noear + * @see McpServerTransport + * @see SseEvent + */ +public class WebRxSseServerTransportProvider implements McpServerTransportProvider { + + private static final Logger logger = LoggerFactory.getLogger(WebRxSseServerTransportProvider.class); + + /** + * Event type for JSON-RPC messages sent through the SSE connection. + */ + public static final String MESSAGE_EVENT_TYPE = "message"; + + /** + * Event type for sending the message endpoint URI to clients. + */ + public static final String ENDPOINT_EVENT_TYPE = "endpoint"; + + /** + * Default SSE endpoint path as specified by the MCP transport specification. + */ + public static final String DEFAULT_SSE_ENDPOINT = "/sse"; + + private final ObjectMapper objectMapper; + + private final String messageEndpoint; + + private final String sseEndpoint; + + private McpServerSession.Factory sessionFactory; + + /** + * Map of active client sessions, keyed by session ID. + */ + private final ConcurrentHashMap sessions = new ConcurrentHashMap<>(); + + /** + * Flag indicating if the transport is shutting down. + */ + private volatile boolean isClosing = false; + + /** + * Constructs a new WebFlux SSE server transport provider instance. + * @param objectMapper The ObjectMapper to use for JSON serialization/deserialization + * of MCP messages. Must not be null. + * @param messageEndpoint The endpoint URI where clients should send their JSON-RPC + * messages. This endpoint will be communicated to clients during SSE connection + * setup. Must not be null. + * @throws IllegalArgumentException if either parameter is null + */ + public WebRxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint, String sseEndpoint) { + Assert.notNull(objectMapper, "ObjectMapper must not be null"); + Assert.notNull(messageEndpoint, "Message endpoint must not be null"); + Assert.notNull(sseEndpoint, "SSE endpoint must not be null"); + + this.objectMapper = objectMapper; + this.messageEndpoint = messageEndpoint; + this.sseEndpoint = sseEndpoint; + } + + public void toHttpHandler(SolonApp app) { + app.get(this.sseEndpoint, this::handleSseConnection); + app.post(this.messageEndpoint, this::handleMessage); + } + + /** + * Constructs a new WebFlux SSE server transport provider instance with the default + * SSE endpoint. + * @param objectMapper The ObjectMapper to use for JSON serialization/deserialization + * of MCP messages. Must not be null. + * @param messageEndpoint The endpoint URI where clients should send their JSON-RPC + * messages. This endpoint will be communicated to clients during SSE connection + * setup. Must not be null. + * @throws IllegalArgumentException if either parameter is null + */ + public WebRxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint) { + this(objectMapper, messageEndpoint, DEFAULT_SSE_ENDPOINT); + } + + @Override + public void setSessionFactory(McpServerSession.Factory sessionFactory) { + this.sessionFactory = sessionFactory; + } + + /** + * Broadcasts a JSON-RPC message to all connected clients through their SSE + * connections. The message is serialized to JSON and sent as a server-sent event to + * each active session. + * + *

+ * The method: + *

+ * @param method The JSON-RPC method to send to clients + * @param params The method parameters to send to clients + * @return A Mono that completes when the message has been sent to all sessions, or + * errors if any session fails to receive the message + */ + @Override + public Mono notifyClients(String method, Map params) { + if (sessions.isEmpty()) { + logger.debug("No active sessions to broadcast message to"); + return Mono.empty(); + } + + logger.debug("Attempting to broadcast message to {} active sessions", sessions.size()); + + return Flux.fromStream(sessions.values().stream()) + .flatMap(session -> session.sendNotification(method, params) + .doOnError(e -> logger.error("Failed to " + "send message to session " + "{}: {}", session.getId(), + e.getMessage())) + .onErrorComplete()) + .then(); + } + + // FIXME: This javadoc makes claims about using isClosing flag but it's not actually + // doing that. + /** + * Initiates a graceful shutdown of all the sessions. This method ensures all active + * sessions are properly closed and cleaned up. + * + *

+ * The shutdown process: + *

    + *
  • Marks the transport as closing to prevent new connections
  • + *
  • Closes each active session
  • + *
  • Removes closed sessions from the sessions map
  • + *
  • Times out after 5 seconds if shutdown takes too long
  • + *
+ * @return A Mono that completes when all sessions have been closed + */ + @Override + public Mono closeGracefully() { + return Flux.fromIterable(sessions.values()) + .doFirst(() -> logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size())) + .flatMap(McpServerSession::closeGracefully) + .then(); + } + + /** + * Handles new SSE connection requests from clients. Creates a new session for each + * connection and sets up the SSE event stream. + * @param ctx The incoming server context + * @return A Mono which emits a response with the SSE event stream + */ + private void handleSseConnection(Context ctx) throws Throwable{ + if (isClosing) { + ctx.status(503); + ctx.output("Server is shutting down"); + return; + } + + Flux publisher = Flux.create(sink -> { + WebRxMcpSessionTransport sessionTransport = new WebRxMcpSessionTransport(sink); + + McpServerSession session = sessionFactory.create(sessionTransport); + String sessionId = session.getId(); + + logger.debug("Created new SSE connection for session: {}", sessionId); + sessions.put(sessionId, session); + + // Send initial endpoint event + logger.debug("Sending initial endpoint event to session: {}", sessionId); + sink.next(new SseEvent() + .name(ENDPOINT_EVENT_TYPE) + .data(messageEndpoint + "?sessionId=" + sessionId)); + sink.onCancel(() -> { + logger.debug("Session {} cancelled", sessionId); + sessions.remove(sessionId); + }); + }); + + ctx.contentType(MimeType.TEXT_EVENT_STREAM_VALUE); + ctx.returnValue(publisher); + } + + /** + * Handles incoming JSON-RPC messages from clients. Deserializes the message and + * processes it through the configured message handler. + * + *

+ * The handler: + *

    + *
  • Deserializes the incoming JSON-RPC message
  • + *
  • Passes it through the message handler chain
  • + *
  • Returns appropriate HTTP responses based on processing results
  • + *
  • Handles various error conditions with appropriate error responses
  • + *
+ * @param request The incoming server request containing the JSON-RPC message + * @return A Mono emitting the response indicating the message processing result + */ + private void handleMessage(Context request) throws Throwable { + if (isClosing) { + request.status(503); + request.output("Server is shutting down"); + return; + } + + if (Utils.isEmpty(request.param("sessionId"))) { + request.status(404); + request.render(new McpError("Session ID missing in message endpoint")); + return; + } + + McpServerSession session = sessions.get(request.param("sessionId")); + + String body = request.body(); + try { + McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body); + + Mono mono = session.handle(message) + .flatMap(response -> { + return Mono.just(new Entity()); + }) + .onErrorResume(error -> { + logger.error("Error processing message: {}", error.getMessage()); + // TODO: instead of signalling the error, just respond with 200 OK + // - the error is signalled on the SSE connection + // return ServerResponse.ok().build(); + return Mono.just(new Entity().status(500).body(new McpError(error.getMessage()))); + }); + + request.returnValue(mono); + } catch (IllegalArgumentException | IOException e) { + logger.error("Failed to deserialize message: {}", e.getMessage()); + request.status(400); + request.render(new McpError("Invalid message format")); + } + } + + private class WebRxMcpSessionTransport implements McpServerTransport { + + private final FluxSink sink; + + public WebRxMcpSessionTransport(FluxSink sink) { + this.sink = sink; + } + + @Override + public Mono sendMessage(McpSchema.JSONRPCMessage message) { + return Mono.fromSupplier(() -> { + try { + return objectMapper.writeValueAsString(message); + } + catch (IOException e) { + throw Exceptions.propagate(e); + } + }).doOnNext(jsonText -> { + SseEvent event = new SseEvent() + .name(MESSAGE_EVENT_TYPE) + .data(jsonText); + sink.next(event); + }).doOnError(e -> { + // TODO log with sessionid + Throwable exception = Exceptions.unwrap(e); + sink.error(exception); + }).then(); + } + + @Override + public T unmarshalFrom(Object data, TypeReference typeRef) { + return objectMapper.convertValue(data, typeRef); + } + + @Override + public Mono closeGracefully() { + return Mono.fromRunnable(sink::complete); + } + + @Override + public void close() { + sink.complete(); + } + + } + + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for creating instances of {@link WebRxSseServerTransportProvider}. + *

+ * This builder provides a fluent API for configuring and creating instances of + * WebFluxSseServerTransportProvider with custom settings. + */ + public static class Builder { + + private ObjectMapper objectMapper; + + private String messageEndpoint; + + private String sseEndpoint = DEFAULT_SSE_ENDPOINT; + + /** + * Sets the ObjectMapper to use for JSON serialization/deserialization of MCP + * messages. + * @param objectMapper The ObjectMapper instance. Must not be null. + * @return this builder instance + * @throws IllegalArgumentException if objectMapper is null + */ + public Builder objectMapper(ObjectMapper objectMapper) { + Assert.notNull(objectMapper, "ObjectMapper must not be null"); + this.objectMapper = objectMapper; + return this; + } + + /** + * Sets the endpoint URI where clients should send their JSON-RPC messages. + * @param messageEndpoint The message endpoint URI. Must not be null. + * @return this builder instance + * @throws IllegalArgumentException if messageEndpoint is null + */ + public Builder messageEndpoint(String messageEndpoint) { + Assert.notNull(messageEndpoint, "Message endpoint must not be null"); + this.messageEndpoint = messageEndpoint; + return this; + } + + /** + * Sets the SSE endpoint path. + * @param sseEndpoint The SSE endpoint path. Must not be null. + * @return this builder instance + * @throws IllegalArgumentException if sseEndpoint is null + */ + public Builder sseEndpoint(String sseEndpoint) { + Assert.notNull(sseEndpoint, "SSE endpoint must not be null"); + this.sseEndpoint = sseEndpoint; + return this; + } + + /** + * Builds a new instance of {@link WebRxSseServerTransportProvider} with the + * configured settings. + * @return A new WebFluxSseServerTransportProvider instance + * @throws IllegalStateException if required parameters are not set + */ + public WebRxSseServerTransportProvider build() { + Assert.notNull(objectMapper, "ObjectMapper must be set"); + Assert.notNull(messageEndpoint, "Message endpoint must be set"); + + return new WebRxSseServerTransportProvider(objectMapper, messageEndpoint, sseEndpoint); + } + } +} diff --git a/pom.xml b/pom.xml index 560a8a7..e1c730a 100644 --- a/pom.xml +++ b/pom.xml @@ -92,12 +92,14 @@ 4.2.0 5.0.1 4.1.0 + 3.1.2 mcp-bom mcp + mcp-solon/mcp-solon-webrx mcp-spring/mcp-spring-webflux mcp-spring/mcp-spring-webmvc From 598064206ab66749e060d8aa5cd6d8c92fd682c6 Mon Sep 17 00:00:00 2001 From: noear Date: Tue, 8 Apr 2025 14:39:52 +0800 Subject: [PATCH 2/2] fixed: Fixed ImageContent and EmbeddedResource not fetching properties --- .../main/java/io/modelcontextprotocol/spec/McpSchema.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java index 3772e76..b844c6e 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java @@ -1284,6 +1284,9 @@ public TextContent(String content) { @JsonInclude(JsonInclude.Include.NON_ABSENT) @JsonIgnoreProperties(ignoreUnknown = true) + @Data + @AllArgsConstructor + @NoArgsConstructor public static class ImageContent implements Content { // @formatter:on @JsonProperty("audience") List audience; @JsonProperty("priority") Double priority; @@ -1293,6 +1296,9 @@ public static class ImageContent implements Content { // @formatter:on @JsonInclude(JsonInclude.Include.NON_ABSENT) @JsonIgnoreProperties(ignoreUnknown = true) + @Data + @AllArgsConstructor + @NoArgsConstructor public static class EmbeddedResource implements Content { // @formatter:on @JsonProperty("audience") List audience; @JsonProperty("priority") Double priority;