diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/InternalWebClientSseClientTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/InternalWebClientSseClientTransport.java new file mode 100644 index 000000000..7fe2bf397 --- /dev/null +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/InternalWebClientSseClientTransport.java @@ -0,0 +1,362 @@ +/* + * Copyright 2024 - 2024 the original author or authors. + */ + +package io.modelcontextprotocol.client.transport; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.spec.*; +import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage; +import io.modelcontextprotocol.util.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.SynchronousSink; +import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; +import reactor.util.retry.Retry.RetrySignal; + +import java.io.IOException; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Function; + +/** + * Server-Sent Events (SSE) implementation of the {@link McpTransport} that follows the + * MCP HTTP with SSE transport specification. + * + *

+ * This transport establishes a bidirectional communication channel where: + *

+ * + *

+ * The message flow follows these steps: + *

    + *
  1. The client establishes an SSE connection to the server's /sse endpoint
  2. + *
  3. The server sends an 'endpoint' event containing the URI for sending messages
  4. + *
+ * + * This implementation uses {@link WebClient} for HTTP communications and supports JSON + * serialization/deserialization of messages. + * + * NOTE: This class is temporarily used by deprecated {@link WebFluxSseClientTransport}, + * it should be merged into {@link WebClientSseClientTransport} once + * {@link WebFluxSseClientTransport} is removed. + * + * @author Christian Tzolov + * @author Yanming Zhou + * @see MCP + * HTTP with SSE Transport Specification + */ +abstract class InternalWebClientSseClientTransport implements McpClientTransport { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + private static final String MCP_PROTOCOL_VERSION = ProtocolVersions.MCP_2024_11_05; + + /** + * Event type for JSON-RPC messages received through the SSE connection. The server + * sends messages with this event type to transmit JSON-RPC protocol data. + */ + private static final String MESSAGE_EVENT_TYPE = "message"; + + /** + * Event type for receiving the message endpoint URI from the server. The server MUST + * send this event when a client connects, providing the URI where the client should + * send its messages via HTTP POST. + */ + private static final String ENDPOINT_EVENT_TYPE = "endpoint"; + + /** + * Default SSE endpoint path as specified by the MCP transport specification. This + * endpoint is used to establish the SSE connection with the server. + */ + static final String DEFAULT_SSE_ENDPOINT = "/sse"; + + /** + * Type reference for parsing SSE events containing string data. + */ + private static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() { + }; + + /** + * WebClient instance for handling both SSE connections and HTTP POST requests. Used + * for establishing the SSE connection and sending outbound messages. + */ + private final WebClient webClient; + + /** + * ObjectMapper for serializing outbound messages and deserializing inbound messages. + * Handles conversion between JSON-RPC messages and their string representation. + */ + protected ObjectMapper objectMapper; + + /** + * Subscription for the SSE connection handling inbound messages. Used for cleanup + * during transport shutdown. + */ + private Disposable inboundSubscription; + + /** + * Flag indicating if the transport is in the process of shutting down. Used to + * prevent new operations during shutdown and handle cleanup gracefully. + */ + private volatile boolean isClosing = false; + + /** + * Sink for managing the message endpoint URI provided by the server. Stores the most + * recent endpoint URI and makes it available for outbound message processing. + */ + protected final Sinks.One messageEndpointSink = Sinks.one(); + + /** + * The SSE endpoint URI provided by the server. Used for sending outbound messages via + * HTTP POST requests. + */ + private String sseEndpoint; + + /** + * Constructs a new SseClientTransport with the specified WebClient builder. Uses a + * default ObjectMapper instance for JSON processing. + * @param webClientBuilder the WebClient.Builder to use for creating the WebClient + * instance + * @throws IllegalArgumentException if webClientBuilder is null + */ + public InternalWebClientSseClientTransport(WebClient.Builder webClientBuilder) { + this(webClientBuilder, new ObjectMapper()); + } + + /** + * Constructs a new SseClientTransport with the specified WebClient builder and + * ObjectMapper. Initializes both inbound and outbound message processing pipelines. + * @param webClientBuilder the WebClient.Builder to use for creating the WebClient + * instance + * @param objectMapper the ObjectMapper to use for JSON processing + * @throws IllegalArgumentException if either parameter is null + */ + public InternalWebClientSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper) { + this(webClientBuilder, objectMapper, DEFAULT_SSE_ENDPOINT); + } + + /** + * Constructs a new SseClientTransport with the specified WebClient builder and + * ObjectMapper. Initializes both inbound and outbound message processing pipelines. + * @param webClientBuilder the WebClient.Builder to use for creating the WebClient + * instance + * @param objectMapper the ObjectMapper to use for JSON processing + * @param sseEndpoint the SSE endpoint URI to use for establishing the connection + * @throws IllegalArgumentException if either parameter is null + */ + public InternalWebClientSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper, + String sseEndpoint) { + Assert.notNull(objectMapper, "ObjectMapper must not be null"); + Assert.notNull(webClientBuilder, "WebClient.Builder must not be null"); + Assert.hasText(sseEndpoint, "SSE endpoint must not be null or empty"); + + this.objectMapper = objectMapper; + this.webClient = webClientBuilder.build(); + this.sseEndpoint = sseEndpoint; + } + + @Override + public List protocolVersions() { + return List.of(MCP_PROTOCOL_VERSION); + } + + /** + * Establishes a connection to the MCP server using Server-Sent Events (SSE). This + * method initiates the SSE connection and sets up the message processing pipeline. + * + *

+ * The connection process follows these steps: + *

    + *
  1. Establishes an SSE connection to the server's /sse endpoint
  2. + *
  3. Waits for the server to send an 'endpoint' event with the message posting + * URI
  4. + *
  5. Sets up message handling for incoming JSON-RPC messages
  6. + *
+ * + *

+ * The connection is considered established only after receiving the endpoint event + * from the server. + * @param handler a function that processes incoming JSON-RPC messages and returns + * responses + * @return a Mono that completes when the connection is fully established + * @throws McpError if there's an error processing SSE events or if an unrecognized + * event type is received + */ + @Override + public Mono connect(Function, Mono> handler) { + // TODO: Avoid eager connection opening and enable resilience + // -> upon disconnects, re-establish connection + // -> allow optimizing for eager connection start using a constructor flag + Flux> events = eventStream(); + this.inboundSubscription = events.concatMap(event -> Mono.just(event).handle((e, s) -> { + if (ENDPOINT_EVENT_TYPE.equals(event.event())) { + String messageEndpointUri = event.data(); + if (messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) { + s.complete(); + } + else { + // TODO: clarify with the spec if multiple events can be + // received + s.error(new McpError("Failed to handle SSE endpoint event")); + } + } + else if (MESSAGE_EVENT_TYPE.equals(event.event())) { + try { + JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, event.data()); + s.next(message); + } + catch (IOException ioException) { + s.error(ioException); + } + } + else { + logger.debug("Received unrecognized SSE event type: {}", event); + s.complete(); + } + }).transform(handler)).subscribe(); + + // The connection is established once the server sends the endpoint event + return messageEndpointSink.asMono().then(); + } + + /** + * Sends a JSON-RPC message to the server using the endpoint provided during + * connection. + * + *

+ * Messages are sent via HTTP POST requests to the server-provided endpoint URI. The + * message is serialized to JSON before transmission. If the transport is in the + * process of closing, the message send operation is skipped gracefully. + * @param message the JSON-RPC message to send + * @return a Mono that completes when the message has been sent successfully + * @throws RuntimeException if message serialization fails + */ + @Override + public Mono sendMessage(JSONRPCMessage message) { + // The messageEndpoint is the endpoint URI to send the messages + // It is provided by the server as part of the endpoint event + return messageEndpointSink.asMono().flatMap(messageEndpointUri -> { + if (isClosing) { + return Mono.empty(); + } + try { + String jsonText = this.objectMapper.writeValueAsString(message); + return webClient.post() + .uri(messageEndpointUri) + .contentType(MediaType.APPLICATION_JSON) + .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) + .bodyValue(jsonText) + .retrieve() + .toBodilessEntity() + .doOnSuccess(response -> { + logger.debug("Message sent successfully"); + }) + .doOnError(error -> { + if (!isClosing) { + logger.error("Error sending message: {}", error.getMessage()); + } + }); + } + catch (IOException e) { + if (!isClosing) { + return Mono.error(new RuntimeException("Failed to serialize message", e)); + } + return Mono.empty(); + } + }).then(); // TODO: Consider non-200-ok response + } + + /** + * Initializes and starts the inbound SSE event processing. Establishes the SSE + * connection and sets up event handling for both message and endpoint events. + * Includes automatic retry logic for handling transient connection failures. + */ + // visible for tests + protected Flux> eventStream() {// @formatter:off + return this.webClient + .get() + .uri(this.sseEndpoint) + .accept(MediaType.TEXT_EVENT_STREAM) + .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) + .retrieve() + .bodyToFlux(SSE_TYPE) + .retryWhen(Retry.from(retrySignal -> retrySignal.handle(inboundRetryHandler))); + } // @formatter:on + + /** + * Retry handler for the inbound SSE stream. Implements the retry logic for handling + * connection failures and other errors. + */ + private BiConsumer> inboundRetryHandler = (retrySpec, sink) -> { + if (isClosing) { + logger.debug("SSE connection closed during shutdown"); + sink.error(retrySpec.failure()); + return; + } + if (retrySpec.failure() instanceof IOException) { + logger.debug("Retrying SSE connection after IO error"); + sink.next(retrySpec); + return; + } + logger.error("Fatal SSE error, not retrying: {}", retrySpec.failure().getMessage()); + sink.error(retrySpec.failure()); + }; + + /** + * Implements graceful shutdown of the transport. Cleans up all resources including + * subscriptions and schedulers. Ensures orderly shutdown of both inbound and outbound + * message processing. + * @return a Mono that completes when shutdown is finished + */ + @Override + public Mono closeGracefully() { // @formatter:off + return Mono.fromRunnable(() -> { + isClosing = true; + + // Dispose of subscriptions + + if (inboundSubscription != null) { + inboundSubscription.dispose(); + } + + }) + .then() + .subscribeOn(Schedulers.boundedElastic()); + } // @formatter:on + + /** + * Unmarshalls data from a generic Object into the specified type using the configured + * ObjectMapper. + * + *

+ * This method is particularly useful when working with JSON-RPC parameters or result + * objects that need to be converted to specific Java types. It leverages Jackson's + * type conversion capabilities to handle complex object structures. + * @param the target type to convert the data into + * @param data the source object to convert + * @param typeRef the TypeReference describing the target type + * @return the unmarshalled object of type T + * @throws IllegalArgumentException if the conversion cannot be performed + */ + @Override + public T unmarshalFrom(Object data, TypeReference typeRef) { + return this.objectMapper.convertValue(data, typeRef); + } + +} diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientSseClientTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientSseClientTransport.java new file mode 100644 index 000000000..15bc25b52 --- /dev/null +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientSseClientTransport.java @@ -0,0 +1,142 @@ +/* + * Copyright 2024 - 2024 the original author or authors. + */ + +package io.modelcontextprotocol.client.transport; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.util.Assert; + +import org.springframework.web.reactive.function.client.WebClient; + +/** + * Server-Sent Events (SSE) implementation of the + * {@link io.modelcontextprotocol.spec.McpTransport} that follows the MCP HTTP with SSE + * transport specification. + * + *

+ * This transport establishes a bidirectional communication channel where: + *

    + *
  • Inbound messages are received through an SSE connection from the server
  • + *
  • Outbound messages are sent via HTTP POST requests to a server-provided + * endpoint
  • + *
+ * + *

+ * The message flow follows these steps: + *

    + *
  1. The client establishes an SSE connection to the server's /sse endpoint
  2. + *
  3. The server sends an 'endpoint' event containing the URI for sending messages
  4. + *
+ * + * This implementation uses {@link WebClient} for HTTP communications and supports JSON + * serialization/deserialization of messages. + * + * @author Christian Tzolov + * @author Yanming Zhou + * @see MCP + * HTTP with SSE Transport Specification + */ +public class WebClientSseClientTransport extends InternalWebClientSseClientTransport { + + /** + * Constructs a new SseClientTransport with the specified WebClient builder. Uses a + * default ObjectMapper instance for JSON processing. + * @param webClientBuilder the WebClient.Builder to use for creating the WebClient + * instance + * @throws IllegalArgumentException if webClientBuilder is null + */ + public WebClientSseClientTransport(WebClient.Builder webClientBuilder) { + super(webClientBuilder); + } + + /** + * Constructs a new SseClientTransport with the specified WebClient builder and + * ObjectMapper. Initializes both inbound and outbound message processing pipelines. + * @param webClientBuilder the WebClient.Builder to use for creating the WebClient + * instance + * @param objectMapper the ObjectMapper to use for JSON processing + * @throws IllegalArgumentException if either parameter is null + */ + public WebClientSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper) { + super(webClientBuilder, objectMapper); + } + + /** + * Constructs a new SseClientTransport with the specified WebClient builder and + * ObjectMapper. Initializes both inbound and outbound message processing pipelines. + * @param webClientBuilder the WebClient.Builder to use for creating the WebClient + * instance + * @param objectMapper the ObjectMapper to use for JSON processing + * @param sseEndpoint the SSE endpoint URI to use for establishing the connection + * @throws IllegalArgumentException if either parameter is null + */ + public WebClientSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper, + String sseEndpoint) { + super(webClientBuilder, objectMapper, sseEndpoint); + } + + /** + * Creates a new builder for {@link WebClientSseClientTransport}. + * @param webClientBuilder the WebClient.Builder to use for creating the WebClient + * instance + * @return a new builder instance + */ + public static Builder builder(WebClient.Builder webClientBuilder) { + return new Builder(webClientBuilder); + } + + /** + * Builder for {@link WebClientSseClientTransport}. + */ + public static class Builder { + + private final WebClient.Builder webClientBuilder; + + private String sseEndpoint = DEFAULT_SSE_ENDPOINT; + + private ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Creates a new builder with the specified WebClient.Builder. + * @param webClientBuilder the WebClient.Builder to use + */ + public Builder(WebClient.Builder webClientBuilder) { + Assert.notNull(webClientBuilder, "WebClient.Builder must not be null"); + this.webClientBuilder = webClientBuilder; + } + + /** + * Sets the SSE endpoint path. + * @param sseEndpoint the SSE endpoint path + * @return this builder + */ + public Builder sseEndpoint(String sseEndpoint) { + Assert.hasText(sseEndpoint, "sseEndpoint must not be empty"); + this.sseEndpoint = sseEndpoint; + return this; + } + + /** + * Sets the object mapper for JSON serialization/deserialization. + * @param objectMapper the object mapper + * @return this builder + */ + public Builder objectMapper(ObjectMapper objectMapper) { + Assert.notNull(objectMapper, "objectMapper must not be null"); + this.objectMapper = objectMapper; + return this; + } + + /** + * Builds a new {@link WebClientSseClientTransport} instance. + * @return a new transport instance + */ + public WebClientSseClientTransport build() { + return new WebClientSseClientTransport(webClientBuilder, objectMapper, sseEndpoint); + } + + } + +} diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java index 4758fd2d2..33a442413 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java @@ -60,7 +60,7 @@ * "https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse">"HTTP * with SSE" transport. In order to communicate over the phased-out * 2024-11-05 protocol, use {@link HttpClientSseClientTransport} or - * {@link WebFluxSseClientTransport}. + * {@link WebClientSseClientTransport}. *

* * @author Dariusz Jędrzejczyk diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java index 75caebef0..28e1d2eaa 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java @@ -4,35 +4,10 @@ package io.modelcontextprotocol.client.transport; -import java.io.IOException; -import java.util.List; -import java.util.function.BiConsumer; -import java.util.function.Function; - -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import io.modelcontextprotocol.spec.HttpHeaders; -import io.modelcontextprotocol.spec.McpClientTransport; -import io.modelcontextprotocol.spec.McpError; -import io.modelcontextprotocol.spec.McpSchema; -import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage; -import io.modelcontextprotocol.spec.ProtocolVersions; import io.modelcontextprotocol.util.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import reactor.core.Disposable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.SynchronousSink; -import reactor.core.scheduler.Schedulers; -import reactor.util.retry.Retry; -import reactor.util.retry.Retry.RetrySignal; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.MediaType; -import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.reactive.function.client.WebClient; /** @@ -59,76 +34,14 @@ * serialization/deserialization of messages. * * @author Christian Tzolov + * @author Yanming Zhou + * @deprecated use {@link WebClientSseClientTransport} instead * @see MCP * HTTP with SSE Transport Specification */ -public class WebFluxSseClientTransport implements McpClientTransport { - - private static final Logger logger = LoggerFactory.getLogger(WebFluxSseClientTransport.class); - - private static final String MCP_PROTOCOL_VERSION = ProtocolVersions.MCP_2024_11_05; - - /** - * Event type for JSON-RPC messages received through the SSE connection. The server - * sends messages with this event type to transmit JSON-RPC protocol data. - */ - private static final String MESSAGE_EVENT_TYPE = "message"; - - /** - * Event type for receiving the message endpoint URI from the server. The server MUST - * send this event when a client connects, providing the URI where the client should - * send its messages via HTTP POST. - */ - private static final String ENDPOINT_EVENT_TYPE = "endpoint"; - - /** - * Default SSE endpoint path as specified by the MCP transport specification. This - * endpoint is used to establish the SSE connection with the server. - */ - private static final String DEFAULT_SSE_ENDPOINT = "/sse"; - - /** - * Type reference for parsing SSE events containing string data. - */ - private static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() { - }; - - /** - * WebClient instance for handling both SSE connections and HTTP POST requests. Used - * for establishing the SSE connection and sending outbound messages. - */ - private final WebClient webClient; - - /** - * ObjectMapper for serializing outbound messages and deserializing inbound messages. - * Handles conversion between JSON-RPC messages and their string representation. - */ - protected ObjectMapper objectMapper; - - /** - * Subscription for the SSE connection handling inbound messages. Used for cleanup - * during transport shutdown. - */ - private Disposable inboundSubscription; - - /** - * Flag indicating if the transport is in the process of shutting down. Used to - * prevent new operations during shutdown and handle cleanup gracefully. - */ - private volatile boolean isClosing = false; - - /** - * Sink for managing the message endpoint URI provided by the server. Stores the most - * recent endpoint URI and makes it available for outbound message processing. - */ - protected final Sinks.One messageEndpointSink = Sinks.one(); - - /** - * The SSE endpoint URI provided by the server. Used for sending outbound messages via - * HTTP POST requests. - */ - private String sseEndpoint; +@Deprecated(forRemoval = true) +public class WebFluxSseClientTransport extends InternalWebClientSseClientTransport { /** * Constructs a new SseClientTransport with the specified WebClient builder. Uses a @@ -138,7 +51,7 @@ public class WebFluxSseClientTransport implements McpClientTransport { * @throws IllegalArgumentException if webClientBuilder is null */ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder) { - this(webClientBuilder, new ObjectMapper()); + super(webClientBuilder); } /** @@ -150,7 +63,7 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder) { * @throws IllegalArgumentException if either parameter is null */ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper) { - this(webClientBuilder, objectMapper, DEFAULT_SSE_ENDPOINT); + super(webClientBuilder, objectMapper); } /** @@ -164,201 +77,7 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMappe */ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper, String sseEndpoint) { - Assert.notNull(objectMapper, "ObjectMapper must not be null"); - Assert.notNull(webClientBuilder, "WebClient.Builder must not be null"); - Assert.hasText(sseEndpoint, "SSE endpoint must not be null or empty"); - - this.objectMapper = objectMapper; - this.webClient = webClientBuilder.build(); - this.sseEndpoint = sseEndpoint; - } - - @Override - public List protocolVersions() { - return List.of(MCP_PROTOCOL_VERSION); - } - - /** - * Establishes a connection to the MCP server using Server-Sent Events (SSE). This - * method initiates the SSE connection and sets up the message processing pipeline. - * - *

- * The connection process follows these steps: - *

    - *
  1. Establishes an SSE connection to the server's /sse endpoint
  2. - *
  3. Waits for the server to send an 'endpoint' event with the message posting - * URI
  4. - *
  5. Sets up message handling for incoming JSON-RPC messages
  6. - *
- * - *

- * The connection is considered established only after receiving the endpoint event - * from the server. - * @param handler a function that processes incoming JSON-RPC messages and returns - * responses - * @return a Mono that completes when the connection is fully established - * @throws McpError if there's an error processing SSE events or if an unrecognized - * event type is received - */ - @Override - public Mono connect(Function, Mono> handler) { - // TODO: Avoid eager connection opening and enable resilience - // -> upon disconnects, re-establish connection - // -> allow optimizing for eager connection start using a constructor flag - Flux> events = eventStream(); - this.inboundSubscription = events.concatMap(event -> Mono.just(event).handle((e, s) -> { - if (ENDPOINT_EVENT_TYPE.equals(event.event())) { - String messageEndpointUri = event.data(); - if (messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) { - s.complete(); - } - else { - // TODO: clarify with the spec if multiple events can be - // received - s.error(new McpError("Failed to handle SSE endpoint event")); - } - } - else if (MESSAGE_EVENT_TYPE.equals(event.event())) { - try { - JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, event.data()); - s.next(message); - } - catch (IOException ioException) { - s.error(ioException); - } - } - else { - logger.debug("Received unrecognized SSE event type: {}", event); - s.complete(); - } - }).transform(handler)).subscribe(); - - // The connection is established once the server sends the endpoint event - return messageEndpointSink.asMono().then(); - } - - /** - * Sends a JSON-RPC message to the server using the endpoint provided during - * connection. - * - *

- * Messages are sent via HTTP POST requests to the server-provided endpoint URI. The - * message is serialized to JSON before transmission. If the transport is in the - * process of closing, the message send operation is skipped gracefully. - * @param message the JSON-RPC message to send - * @return a Mono that completes when the message has been sent successfully - * @throws RuntimeException if message serialization fails - */ - @Override - public Mono sendMessage(JSONRPCMessage message) { - // The messageEndpoint is the endpoint URI to send the messages - // It is provided by the server as part of the endpoint event - return messageEndpointSink.asMono().flatMap(messageEndpointUri -> { - if (isClosing) { - return Mono.empty(); - } - try { - String jsonText = this.objectMapper.writeValueAsString(message); - return webClient.post() - .uri(messageEndpointUri) - .contentType(MediaType.APPLICATION_JSON) - .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) - .bodyValue(jsonText) - .retrieve() - .toBodilessEntity() - .doOnSuccess(response -> { - logger.debug("Message sent successfully"); - }) - .doOnError(error -> { - if (!isClosing) { - logger.error("Error sending message: {}", error.getMessage()); - } - }); - } - catch (IOException e) { - if (!isClosing) { - return Mono.error(new RuntimeException("Failed to serialize message", e)); - } - return Mono.empty(); - } - }).then(); // TODO: Consider non-200-ok response - } - - /** - * Initializes and starts the inbound SSE event processing. Establishes the SSE - * connection and sets up event handling for both message and endpoint events. - * Includes automatic retry logic for handling transient connection failures. - */ - // visible for tests - protected Flux> eventStream() {// @formatter:off - return this.webClient - .get() - .uri(this.sseEndpoint) - .accept(MediaType.TEXT_EVENT_STREAM) - .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) - .retrieve() - .bodyToFlux(SSE_TYPE) - .retryWhen(Retry.from(retrySignal -> retrySignal.handle(inboundRetryHandler))); - } // @formatter:on - - /** - * Retry handler for the inbound SSE stream. Implements the retry logic for handling - * connection failures and other errors. - */ - private BiConsumer> inboundRetryHandler = (retrySpec, sink) -> { - if (isClosing) { - logger.debug("SSE connection closed during shutdown"); - sink.error(retrySpec.failure()); - return; - } - if (retrySpec.failure() instanceof IOException) { - logger.debug("Retrying SSE connection after IO error"); - sink.next(retrySpec); - return; - } - logger.error("Fatal SSE error, not retrying: {}", retrySpec.failure().getMessage()); - sink.error(retrySpec.failure()); - }; - - /** - * Implements graceful shutdown of the transport. Cleans up all resources including - * subscriptions and schedulers. Ensures orderly shutdown of both inbound and outbound - * message processing. - * @return a Mono that completes when shutdown is finished - */ - @Override - public Mono closeGracefully() { // @formatter:off - return Mono.fromRunnable(() -> { - isClosing = true; - - // Dispose of subscriptions - - if (inboundSubscription != null) { - inboundSubscription.dispose(); - } - - }) - .then() - .subscribeOn(Schedulers.boundedElastic()); - } // @formatter:on - - /** - * Unmarshalls data from a generic Object into the specified type using the configured - * ObjectMapper. - * - *

- * This method is particularly useful when working with JSON-RPC parameters or result - * objects that need to be converted to specific Java types. It leverages Jackson's - * type conversion capabilities to handle complex object structures. - * @param the target type to convert the data into - * @param data the source object to convert - * @param typeRef the TypeReference describing the target type - * @return the unmarshalled object of type T - * @throws IllegalArgumentException if the conversion cannot be performed - */ - @Override - public T unmarshalFrom(Object data, TypeReference typeRef) { - return this.objectMapper.convertValue(data, typeRef); + super(webClientBuilder, objectMapper, sseEndpoint); } /** diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java index 8ce714f94..5998714a0 100644 --- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java @@ -38,7 +38,7 @@ import io.modelcontextprotocol.client.McpClient; import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport; -import io.modelcontextprotocol.client.transport.WebFluxSseClientTransport; +import io.modelcontextprotocol.client.transport.WebClientSseClientTransport; import io.modelcontextprotocol.server.McpServer; import io.modelcontextprotocol.server.McpServerFeatures; import io.modelcontextprotocol.server.McpSyncServerExchange; @@ -102,7 +102,7 @@ public void before() { .build())); clientBuilders.put("webflux", McpClient - .sync(WebFluxSseClientTransport.builder(WebClient.builder().baseUrl("http://localhost:" + PORT)) + .sync(WebClientSseClientTransport.builder(WebClient.builder().baseUrl("http://localhost:" + PORT)) .sseEndpoint(CUSTOM_SSE_ENDPOINT) .build())); diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientSseMcpAsyncClientTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientSseMcpAsyncClientTests.java new file mode 100644 index 000000000..fe4d6bd5e --- /dev/null +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientSseMcpAsyncClientTests.java @@ -0,0 +1,56 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + +package io.modelcontextprotocol.client; + +import io.modelcontextprotocol.client.transport.WebClientSseClientTransport; +import io.modelcontextprotocol.spec.McpClientTransport; +import org.junit.jupiter.api.Timeout; +import org.springframework.web.reactive.function.client.WebClient; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +import java.time.Duration; + +/** + * Tests for the {@link McpAsyncClient} with {@link WebClientSseClientTransport}. + * + * @author Christian Tzolov + * @author Yanming Zhou + */ +@Timeout(15) // Giving extra time beyond the client timeout +class WebClientSseMcpAsyncClientTests extends AbstractMcpAsyncClientTests { + + static String host = "http://localhost:3001"; + + // Uses the https://github.com/tzolov/mcp-everything-server-docker-image + @SuppressWarnings("resource") + GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2") + .withCommand("node dist/index.js sse") + .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) + .withExposedPorts(3001) + .waitingFor(Wait.forHttp("/").forStatusCode(404)); + + @Override + protected McpClientTransport createMcpTransport() { + return WebClientSseClientTransport.builder(WebClient.builder().baseUrl(host)).build(); + } + + @Override + protected void onStart() { + container.start(); + int port = container.getMappedPort(3001); + host = "http://" + container.getHost() + ":" + port; + } + + @Override + public void onClose() { + container.stop(); + } + + protected Duration getInitializationTimeout() { + return Duration.ofSeconds(1); + } + +} diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientSseMcpSyncClientTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientSseMcpSyncClientTests.java new file mode 100644 index 000000000..dc457ce1f --- /dev/null +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientSseMcpSyncClientTests.java @@ -0,0 +1,56 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + +package io.modelcontextprotocol.client; + +import io.modelcontextprotocol.client.transport.WebClientSseClientTransport; +import io.modelcontextprotocol.spec.McpClientTransport; +import org.junit.jupiter.api.Timeout; +import org.springframework.web.reactive.function.client.WebClient; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +import java.time.Duration; + +/** + * Tests for the {@link McpSyncClient} with {@link WebClientSseClientTransport}. + * + * @author Christian Tzolov + * @author Yanming Zhou + */ +@Timeout(15) // Giving extra time beyond the client timeout +class WebClientSseMcpSyncClientTests extends AbstractMcpSyncClientTests { + + static String host = "http://localhost:3001"; + + // Uses the https://github.com/tzolov/mcp-everything-server-docker-image + @SuppressWarnings("resource") + GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2") + .withCommand("node dist/index.js sse") + .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) + .withExposedPorts(3001) + .waitingFor(Wait.forHttp("/").forStatusCode(404)); + + @Override + protected McpClientTransport createMcpTransport() { + return WebClientSseClientTransport.builder(WebClient.builder().baseUrl(host)).build(); + } + + @Override + protected void onStart() { + container.start(); + int port = container.getMappedPort(3001); + host = "http://" + container.getHost() + ":" + port; + } + + @Override + protected void onClose() { + container.stop(); + } + + protected Duration getInitializationTimeout() { + return Duration.ofSeconds(1); + } + +} diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpAsyncClientTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpAsyncClientTests.java index 0edf4cd54..091a1d6cf 100644 --- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpAsyncClientTests.java +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpAsyncClientTests.java @@ -4,12 +4,8 @@ package io.modelcontextprotocol.client; -import java.time.Duration; - import org.junit.jupiter.api.Timeout; import org.springframework.web.reactive.function.client.WebClient; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.wait.strategy.Wait; import io.modelcontextprotocol.client.transport.WebFluxSseClientTransport; import io.modelcontextprotocol.spec.McpClientTransport; @@ -18,39 +14,15 @@ * Tests for the {@link McpAsyncClient} with {@link WebFluxSseClientTransport}. * * @author Christian Tzolov + * @author Yanming Zhou */ @Timeout(15) // Giving extra time beyond the client timeout -class WebFluxSseMcpAsyncClientTests extends AbstractMcpAsyncClientTests { - - static String host = "http://localhost:3001"; - - // Uses the https://github.com/tzolov/mcp-everything-server-docker-image - @SuppressWarnings("resource") - GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2") - .withCommand("node dist/index.js sse") - .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) - .withExposedPorts(3001) - .waitingFor(Wait.forHttp("/").forStatusCode(404)); +@Deprecated(forRemoval = true) +class WebFluxSseMcpAsyncClientTests extends WebClientSseMcpAsyncClientTests { @Override protected McpClientTransport createMcpTransport() { return WebFluxSseClientTransport.builder(WebClient.builder().baseUrl(host)).build(); } - @Override - protected void onStart() { - container.start(); - int port = container.getMappedPort(3001); - host = "http://" + container.getHost() + ":" + port; - } - - @Override - public void onClose() { - container.stop(); - } - - protected Duration getInitializationTimeout() { - return Duration.ofSeconds(1); - } - } diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpSyncClientTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpSyncClientTests.java index 9b0959a35..c2b8ccb92 100644 --- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpSyncClientTests.java +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpSyncClientTests.java @@ -4,13 +4,9 @@ package io.modelcontextprotocol.client; -import java.time.Duration; - import io.modelcontextprotocol.client.transport.WebFluxSseClientTransport; import io.modelcontextprotocol.spec.McpClientTransport; import org.junit.jupiter.api.Timeout; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.wait.strategy.Wait; import org.springframework.web.reactive.function.client.WebClient; @@ -18,39 +14,15 @@ * Tests for the {@link McpSyncClient} with {@link WebFluxSseClientTransport}. * * @author Christian Tzolov + * @author Yanming Zhou */ @Timeout(15) // Giving extra time beyond the client timeout -class WebFluxSseMcpSyncClientTests extends AbstractMcpSyncClientTests { - - static String host = "http://localhost:3001"; - - // Uses the https://github.com/tzolov/mcp-everything-server-docker-image - @SuppressWarnings("resource") - GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2") - .withCommand("node dist/index.js sse") - .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) - .withExposedPorts(3001) - .waitingFor(Wait.forHttp("/").forStatusCode(404)); +@Deprecated(forRemoval = true) +class WebFluxSseMcpSyncClientTests extends WebClientSseMcpSyncClientTests { @Override protected McpClientTransport createMcpTransport() { return WebFluxSseClientTransport.builder(WebClient.builder().baseUrl(host)).build(); } - @Override - protected void onStart() { - container.start(); - int port = container.getMappedPort(3001); - host = "http://" + container.getHost() + ":" + port; - } - - @Override - protected void onClose() { - container.stop(); - } - - protected Duration getInitializationTimeout() { - return Duration.ofSeconds(1); - } - } diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebClientSseClientTransportTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebClientSseClientTransportTests.java new file mode 100644 index 000000000..ccbfce238 --- /dev/null +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebClientSseClientTransportTests.java @@ -0,0 +1,367 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + +package io.modelcontextprotocol.client.transport; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpSchema.JSONRPCRequest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.web.reactive.function.client.WebClient; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import static org.assertj.core.api.Assertions.*; + +/** + * Tests for the {@link WebClientSseClientTransport} class. + * + * @author Christian Tzolov + */ +@Timeout(15) +class WebClientSseClientTransportTests { + + static String host = "http://localhost:3001"; + + @SuppressWarnings("resource") + GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2") + .withCommand("node dist/index.js sse") + .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) + .withExposedPorts(3001) + .waitingFor(Wait.forHttp("/").forStatusCode(404)); + + private TestSseClientTransport transport; + + private WebClient.Builder webClientBuilder; + + private ObjectMapper objectMapper; + + // Test class to access protected methods + static class TestSseClientTransport extends WebClientSseClientTransport { + + private final AtomicInteger inboundMessageCount = new AtomicInteger(0); + + private Sinks.Many> events = Sinks.many().unicast().onBackpressureBuffer(); + + public TestSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper) { + super(webClientBuilder, objectMapper); + } + + @Override + protected Flux> eventStream() { + return super.eventStream().mergeWith(events.asFlux()); + } + + public String getLastEndpoint() { + return messageEndpointSink.asMono().block(); + } + + public int getInboundMessageCount() { + return inboundMessageCount.get(); + } + + public void simulateSseComment(String comment) { + events.tryEmitNext(ServerSentEvent.builder().comment(comment).build()); + inboundMessageCount.incrementAndGet(); + } + + public void simulateEndpointEvent(String jsonMessage) { + events.tryEmitNext(ServerSentEvent.builder().event("endpoint").data(jsonMessage).build()); + inboundMessageCount.incrementAndGet(); + } + + public void simulateMessageEvent(String jsonMessage) { + events.tryEmitNext(ServerSentEvent.builder().event("message").data(jsonMessage).build()); + inboundMessageCount.incrementAndGet(); + } + + } + + void startContainer() { + container.start(); + int port = container.getMappedPort(3001); + host = "http://" + container.getHost() + ":" + port; + } + + @BeforeEach + void setUp() { + startContainer(); + webClientBuilder = WebClient.builder().baseUrl(host); + objectMapper = new ObjectMapper(); + transport = new TestSseClientTransport(webClientBuilder, objectMapper); + transport.connect(Function.identity()).block(); + } + + @AfterEach + void afterEach() { + if (transport != null) { + assertThatCode(() -> transport.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException(); + } + cleanup(); + } + + void cleanup() { + container.stop(); + } + + @Test + void testEndpointEventHandling() { + assertThat(transport.getLastEndpoint()).startsWith("/message?"); + } + + @Test + void constructorValidation() { + assertThatThrownBy(() -> new WebClientSseClientTransport(null)).isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("WebClient.Builder must not be null"); + + assertThatThrownBy(() -> new WebClientSseClientTransport(webClientBuilder, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("ObjectMapper must not be null"); + } + + @Test + void testBuilderPattern() { + // Test default builder + WebClientSseClientTransport transport1 = WebClientSseClientTransport.builder(webClientBuilder).build(); + assertThatCode(() -> transport1.closeGracefully().block()).doesNotThrowAnyException(); + + // Test builder with custom ObjectMapper + ObjectMapper customMapper = new ObjectMapper(); + WebClientSseClientTransport transport2 = WebClientSseClientTransport.builder(webClientBuilder) + .objectMapper(customMapper) + .build(); + assertThatCode(() -> transport2.closeGracefully().block()).doesNotThrowAnyException(); + + // Test builder with custom SSE endpoint + WebClientSseClientTransport transport3 = WebClientSseClientTransport.builder(webClientBuilder) + .sseEndpoint("/custom-sse") + .build(); + assertThatCode(() -> transport3.closeGracefully().block()).doesNotThrowAnyException(); + + // Test builder with all custom parameters + WebClientSseClientTransport transport4 = WebClientSseClientTransport.builder(webClientBuilder) + .objectMapper(customMapper) + .sseEndpoint("/custom-sse") + .build(); + assertThatCode(() -> transport4.closeGracefully().block()).doesNotThrowAnyException(); + } + + @Test + void testCommentSseMessage() { + // If the line starts with a character (:) are comment lins and should be ingored + // https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation + + CopyOnWriteArrayList droppedErrors = new CopyOnWriteArrayList<>(); + reactor.core.publisher.Hooks.onErrorDropped(droppedErrors::add); + + try { + // Simulate receiving the SSE comment line + transport.simulateSseComment("sse comment"); + + StepVerifier.create(transport.closeGracefully()).verifyComplete(); + + assertThat(droppedErrors).hasSize(0); + } + finally { + reactor.core.publisher.Hooks.resetOnErrorDropped(); + } + } + + @Test + void testMessageProcessing() { + // Create a test message + JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", + Map.of("key", "value")); + + // Simulate receiving the message + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "test-method", + "id": "test-id", + "params": {"key": "value"} + } + """); + + // Subscribe to messages and verify + StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); + + assertThat(transport.getInboundMessageCount()).isEqualTo(1); + } + + @Test + void testResponseMessageProcessing() { + // Simulate receiving a response message + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "id": "test-id", + "result": {"status": "success"} + } + """); + + // Create and send a request message + JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", + Map.of("key", "value")); + + // Verify message handling + StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); + + assertThat(transport.getInboundMessageCount()).isEqualTo(1); + } + + @Test + void testErrorMessageProcessing() { + // Simulate receiving an error message + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "id": "test-id", + "error": { + "code": -32600, + "message": "Invalid Request" + } + } + """); + + // Create and send a request message + JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", + Map.of("key", "value")); + + // Verify message handling + StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); + + assertThat(transport.getInboundMessageCount()).isEqualTo(1); + } + + @Test + void testNotificationMessageProcessing() { + // Simulate receiving a notification message (no id) + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "update", + "params": {"status": "processing"} + } + """); + + // Verify the notification was processed + assertThat(transport.getInboundMessageCount()).isEqualTo(1); + } + + @Test + void testGracefulShutdown() { + // Test graceful shutdown + StepVerifier.create(transport.closeGracefully()).verifyComplete(); + + // Create a test message + JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", + Map.of("key", "value")); + + // Verify message is not processed after shutdown + StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); + + // Message count should remain 0 after shutdown + assertThat(transport.getInboundMessageCount()).isEqualTo(0); + } + + @Test + void testRetryBehavior() { + // Create a WebClient that simulates connection failures + WebClient.Builder failingWebClientBuilder = WebClient.builder().baseUrl("http://non-existent-host"); + + WebClientSseClientTransport failingTransport = WebClientSseClientTransport.builder(failingWebClientBuilder) + .build(); + + // Verify that the transport attempts to reconnect + StepVerifier.create(Mono.delay(Duration.ofSeconds(2))).expectNextCount(1).verifyComplete(); + + // Clean up + failingTransport.closeGracefully().block(); + } + + @Test + void testMultipleMessageProcessing() { + // Simulate receiving multiple messages in sequence + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "method1", + "id": "id1", + "params": {"key": "value1"} + } + """); + + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "method2", + "id": "id2", + "params": {"key": "value2"} + } + """); + + // Create and send corresponding messages + JSONRPCRequest message1 = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "method1", "id1", + Map.of("key", "value1")); + + JSONRPCRequest message2 = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "method2", "id2", + Map.of("key", "value2")); + + // Verify both messages are processed + StepVerifier.create(transport.sendMessage(message1).then(transport.sendMessage(message2))).verifyComplete(); + + // Verify message count + assertThat(transport.getInboundMessageCount()).isEqualTo(2); + } + + @Test + void testMessageOrderPreservation() { + // Simulate receiving messages in a specific order + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "first", + "id": "1", + "params": {"sequence": 1} + } + """); + + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "second", + "id": "2", + "params": {"sequence": 2} + } + """); + + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "third", + "id": "3", + "params": {"sequence": 3} + } + """); + + // Verify message count and order + assertThat(transport.getInboundMessageCount()).isEqualTo(3); + } + +} diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java index 1cf5dffe2..5b4c73a3e 100644 --- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java @@ -37,6 +37,7 @@ * @author Christian Tzolov */ @Timeout(15) +@Deprecated(forRemoval = true) class WebFluxSseClientTransportTests { static String host = "http://localhost:3001"; diff --git a/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java b/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java index 45f6b94f0..309e99b27 100644 --- a/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java +++ b/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java @@ -23,7 +23,7 @@ import io.modelcontextprotocol.AbstractMcpClientServerIntegrationTests; import io.modelcontextprotocol.client.McpClient; import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport; -import io.modelcontextprotocol.client.transport.WebFluxSseClientTransport; +import io.modelcontextprotocol.client.transport.WebClientSseClientTransport; import io.modelcontextprotocol.server.McpServer.AsyncSpecification; import io.modelcontextprotocol.server.McpServer.SingleSessionSyncSpecification; import io.modelcontextprotocol.server.transport.WebMvcSseServerTransportProvider; @@ -45,8 +45,8 @@ protected void prepareClients(int port, String mcpEndpoint) { .initializationTimeout(Duration.ofHours(10)) .requestTimeout(Duration.ofHours(10))); - clientBuilders.put("webflux", McpClient - .sync(WebFluxSseClientTransport.builder(WebClient.builder().baseUrl("http://localhost:" + port)).build())); + clientBuilders.put("webflux", McpClient.sync( + WebClientSseClientTransport.builder(WebClient.builder().baseUrl("http://localhost:" + port)).build())); } @Configuration diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 3cfa7359b..6d5dbac64 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -62,7 +62,7 @@ * "https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse">"HTTP * with SSE" transport. In order to communicate over the phased-out * 2024-11-05 protocol, use {@link HttpClientSseClientTransport} or - * {@code WebFluxSseClientTransport}. + * {@code WebClientSseClientTransport}. *

* * @author Christian Tzolov