Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,21 @@
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.time.Duration;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.modelcontextprotocol.json.TypeRef;
import io.modelcontextprotocol.json.McpJsonMapper;

import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.json.McpJsonMapper;
import io.modelcontextprotocol.json.TypeRef;
import io.modelcontextprotocol.spec.ClosedMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
Expand All @@ -42,6 +39,9 @@
import io.modelcontextprotocol.spec.ProtocolVersions;
import io.modelcontextprotocol.util.Assert;
import io.modelcontextprotocol.util.Utils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
Expand Down Expand Up @@ -78,8 +78,6 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {

private static final Logger logger = LoggerFactory.getLogger(HttpClientStreamableHttpTransport.class);

private static final String MCP_PROTOCOL_VERSION = ProtocolVersions.MCP_2025_06_18;

private static final String DEFAULT_ENDPOINT = "/mcp";

/**
Expand Down Expand Up @@ -125,9 +123,14 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {

private final AtomicReference<Consumer<Throwable>> exceptionHandler = new AtomicReference<>();

private final List<String> supportedProtocolVersions;

private final String latestSupportedProtocolVersion;

private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient,
HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams,
boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer) {
boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
List<String> supportedProtocolVersions) {
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.requestBuilder = requestBuilder;
Expand All @@ -137,12 +140,16 @@ private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient h
this.openConnectionOnStartup = openConnectionOnStartup;
this.activeSession.set(createTransportSession());
this.httpRequestCustomizer = httpRequestCustomizer;
this.supportedProtocolVersions = Collections.unmodifiableList(supportedProtocolVersions);
this.latestSupportedProtocolVersion = this.supportedProtocolVersions.stream()
.sorted(Comparator.reverseOrder())
.findFirst()
.get();
}

@Override
public List<String> protocolVersions() {
return List.of(ProtocolVersions.MCP_2024_11_05, ProtocolVersions.MCP_2025_03_26,
ProtocolVersions.MCP_2025_06_18);
return supportedProtocolVersions;
}

public static Builder builder(String baseUri) {
Expand Down Expand Up @@ -186,7 +193,7 @@ private Publisher<Void> createDelete(String sessionId) {
.uri(uri)
.header("Cache-Control", "no-cache")
.header(HttpHeaders.MCP_SESSION_ID, sessionId)
.header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
.header(HttpHeaders.PROTOCOL_VERSION, this.latestSupportedProtocolVersion)
.DELETE();
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
return Mono.from(this.httpRequestCustomizer.customize(builder, "DELETE", uri, null, transportContext));
Expand Down Expand Up @@ -257,7 +264,7 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
var builder = requestBuilder.uri(uri)
.header(HttpHeaders.ACCEPT, TEXT_EVENT_STREAM)
.header("Cache-Control", "no-cache")
.header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
.header(HttpHeaders.PROTOCOL_VERSION, this.latestSupportedProtocolVersion)
.GET();
var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
Expand Down Expand Up @@ -432,7 +439,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
.header(HttpHeaders.ACCEPT, APPLICATION_JSON + ", " + TEXT_EVENT_STREAM)
.header(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON)
.header(HttpHeaders.CACHE_CONTROL, "no-cache")
.header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
.header(HttpHeaders.PROTOCOL_VERSION, this.latestSupportedProtocolVersion)
.POST(HttpRequest.BodyPublishers.ofString(jsonBody));
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
return Mono
Expand Down Expand Up @@ -624,6 +631,9 @@ public static class Builder {

private Duration connectTimeout = Duration.ofSeconds(10);

private List<String> supportedProtocolVersions = List.of(ProtocolVersions.MCP_2024_11_05,
ProtocolVersions.MCP_2025_03_26, ProtocolVersions.MCP_2025_06_18);

/**
* Creates a new builder with the specified base URI.
* @param baseUri the base URI of the MCP server
Expand Down Expand Up @@ -772,6 +782,30 @@ public Builder connectTimeout(Duration connectTimeout) {
return this;
}

/**
* Sets the list of supported protocol versions used in version negotiation. By
* default, the client will send the latest of those versions in the
* {@code MCP-Protocol-Version} header.
* <p>
* Setting this value only updates the values used in version negotiation, and
* does NOT impact the actual capabilities of the transport. It should only be
* used for compatibility with servers having strict requirements around the
* {@code MCP-Protocol-Version} header.
* @param supportedProtocolVersions protocol versions supported by this transport
* @return this builder
* @see <a href=
* "https://modelcontextprotocol.io/specification/2024-11-05/basic/lifecycle#version-negotiation">version
* negotiation specification</a>
* @see <a href=
* "https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#protocol-version-header">Protocol
* Version Header</a>
*/
public Builder supportedProtocolVersions(List<String> supportedProtocolVersions) {
Assert.notEmpty(supportedProtocolVersions, "supportedProtocolVersions must not be empty");
this.supportedProtocolVersions = Collections.unmodifiableList(supportedProtocolVersions);
return this;
}

/**
* Construct a fresh instance of {@link HttpClientStreamableHttpTransport} using
* the current builder configuration.
Expand All @@ -781,7 +815,7 @@ public HttpClientStreamableHttpTransport build() {
HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
return new HttpClientStreamableHttpTransport(jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper,
httpClient, requestBuilder, baseUri, endpoint, resumableStreams, openConnectionOnStartup,
httpRequestCustomizer);
httpRequestCustomizer, supportedProtocolVersions);
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright 2025-2025 the original author or authors.
*/

package io.modelcontextprotocol.common;

import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

import io.modelcontextprotocol.client.McpClient;
import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport;
import io.modelcontextprotocol.server.McpServer;
import io.modelcontextprotocol.server.McpServerFeatures;
import io.modelcontextprotocol.server.McpSyncServer;
import io.modelcontextprotocol.server.McpSyncServerExchange;
import io.modelcontextprotocol.server.transport.HttpServletStreamableServerTransportProvider;
import io.modelcontextprotocol.server.transport.McpTestRequestRecordingServletFilter;
import io.modelcontextprotocol.server.transport.TomcatTestUtil;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.ProtocolVersions;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.LifecycleState;
import org.apache.catalina.startup.Tomcat;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class HttpClientStreamableHttpVersionNegotiationIntegrationTests {

private Tomcat tomcat;

private static final int PORT = TomcatTestUtil.findAvailablePort();

private final McpTestRequestRecordingServletFilter requestRecordingFilter = new McpTestRequestRecordingServletFilter();

private final HttpServletStreamableServerTransportProvider transport = HttpServletStreamableServerTransportProvider
.builder()
.contextExtractor(
req -> McpTransportContext.create(Map.of("protocol-version", req.getHeader("MCP-protocol-version"))))
.build();

private final McpSchema.Tool toolSpec = McpSchema.Tool.builder()
.name("test-tool")
.description("return the protocol version used")
.build();

private final BiFunction<McpSyncServerExchange, McpSchema.CallToolRequest, McpSchema.CallToolResult> toolHandler = (
exchange, request) -> new McpSchema.CallToolResult(
exchange.transportContext().get("protocol-version").toString(), null);

McpSyncServer mcpServer = McpServer.sync(transport)
.capabilities(McpSchema.ServerCapabilities.builder().tools(false).build())
.tools(new McpServerFeatures.SyncToolSpecification(toolSpec, null, toolHandler))
.build();

@AfterEach
void tearDown() {
stopTomcat();
}

@Test
void usesLatestVersion() {
startTomcat();

var client = McpClient.sync(HttpClientStreamableHttpTransport.builder("http://localhost:" + PORT).build())
.build();

client.initialize();
McpSchema.CallToolResult response = client.callTool(new McpSchema.CallToolRequest("test-tool", Map.of()));

var calls = requestRecordingFilter.getCalls();

assertThat(calls).filteredOn(c -> !c.body().contains("\"method\":\"initialize\""))
// GET /mcp ; POST notification/initialized ; POST tools/call
.hasSize(3)
.map(McpTestRequestRecordingServletFilter.Call::headers)
.allSatisfy(headers -> assertThat(headers).containsEntry("mcp-protocol-version",
ProtocolVersions.MCP_2025_06_18));

assertThat(response).isNotNull();
assertThat(response.content()).hasSize(1)
.first()
.extracting(McpSchema.TextContent.class::cast)
.extracting(McpSchema.TextContent::text)
.isEqualTo(ProtocolVersions.MCP_2025_06_18);
mcpServer.close();
}

@Test
void usesCustomLatestVersion() {
startTomcat();

var transport = HttpClientStreamableHttpTransport.builder("http://localhost:" + PORT)
.supportedProtocolVersions(List.of(ProtocolVersions.MCP_2025_06_18, "2263-03-18"))
.build();
var client = McpClient.sync(transport).build();

client.initialize();
McpSchema.CallToolResult response = client.callTool(new McpSchema.CallToolRequest("test-tool", Map.of()));

var calls = requestRecordingFilter.getCalls();

assertThat(calls).filteredOn(c -> !c.body().contains("\"method\":\"initialize\""))
// GET /mcp ; POST notification/initialized ; POST tools/call
.hasSize(3)
.map(McpTestRequestRecordingServletFilter.Call::headers)
.allSatisfy(headers -> assertThat(headers).containsEntry("mcp-protocol-version", "2263-03-18"));

assertThat(response).isNotNull();
assertThat(response.content()).hasSize(1)
.first()
.extracting(McpSchema.TextContent.class::cast)
.extracting(McpSchema.TextContent::text)
.isEqualTo("2263-03-18");
mcpServer.close();
}

private void startTomcat() {
tomcat = TomcatTestUtil.createTomcatServer("", PORT, transport, requestRecordingFilter);
try {
tomcat.start();
assertThat(tomcat.getServer().getState()).isEqualTo(LifecycleState.STARTED);
}
catch (Exception e) {
throw new RuntimeException("Failed to start Tomcat", e);
}
}

private void stopTomcat() {
if (tomcat != null) {
try {
tomcat.stop();
tomcat.destroy();
}
catch (LifecycleException e) {
throw new RuntimeException("Failed to stop Tomcat", e);
}
}
}

}
Loading