-
Notifications
You must be signed in to change notification settings - Fork 183
[WIP] StreamableHttpServerTransport implementation #449
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Co-authored-by: @zshea
|
||||||||||||||||||
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/serializers.kt
Outdated
Show resolved
Hide resolved
| private suspend fun validateProtocolVersion(call: ApplicationCall): Boolean { | ||
| val protocolVersions = call.request.headers.getAll(MCP_PROTOCOL_VERSION_HEADER) | ||
| val version = protocolVersions?.lastOrNull() ?: DEFAULT_NEGOTIATED_PROTOCOL_VERSION | ||
|
|
||
| return when (version) { | ||
| !in SUPPORTED_PROTOCOL_VERSIONS -> { | ||
| call.reject( | ||
| HttpStatusCode.BadRequest, | ||
| RPCError.ErrorCode.CONNECTION_CLOSED, | ||
| "Bad Request: Unsupported protocol version (supported versions: ${ | ||
| SUPPORTED_PROTOCOL_VERSIONS.joinToString( | ||
| ", ", | ||
| ) | ||
| })", | ||
| ) | ||
| false | ||
| } | ||
|
|
||
| else -> true | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about we handle version extraction and validation separately? Also, could we standardize the validation process for all transportation types? Having it in a central location would be great.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure
for stdio, for example, a version check is not required
| internal const val MCP_SESSION_ID_HEADER = "mcp-session-id" | ||
| private const val MCP_PROTOCOL_VERSION_HEADER = "mcp-protocol-version" | ||
| private const val MCP_RESUMPTION_TOKEN_HEADER = "Last-Event-ID" | ||
| private const val MAXIMUM_MESSAGE_SIZE = 4 * 1024 * 1024 // 4 MB |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These constants could really help with all the different protocols. How about we make them a standard and reuse them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For sse, the session-id header is not sent and there is no resumption token. For now, this is needed exclusively for streamable
...commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt
Outdated
Show resolved
Hide resolved
...commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt
Outdated
Show resolved
Hide resolved
| public var sessionId: String? = null | ||
| private set | ||
|
|
||
| private var sessionIdGenerator: (() -> String)? = { Uuid.random().toString() } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's create AbstractServerTransport and move it there
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt
Outdated
Show resolved
Hide resolved
| * | ||
| * Set undefined to disable session management. | ||
| */ | ||
| public fun setSessionIdGenerator(block: (() -> String)?) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be great if this were standard across all server transport methods. However, I’m not sure it should be publicly available.
| /** | ||
| * A callback for session initialization events | ||
| * This is called when the server initializes a new session. | ||
| * Useful in cases when you need to register multiple mcp sessions | ||
| * and need to keep track of them. | ||
| */ | ||
| public fun setOnSessionInitialized(block: ((String) -> Unit)?) { | ||
| onSessionInitialized = block | ||
| } | ||
|
|
||
| /** | ||
| * A callback for session close events | ||
| * This is called when the server closes a session due to a DELETE request. | ||
| * Useful in cases when you need to clean up resources associated with the session. | ||
| * Note that this is different from the transport closing, if you are handling | ||
| * HTTP requests from multiple nodes you might want to close each | ||
| * StreamableHTTPServerTransport after a request is completed while still keeping the | ||
| * session open/running. | ||
| */ | ||
| public fun setOnSessionClosed(block: ((String) -> Unit)?) { | ||
| onSessionClosed = block | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this kind of functionality is pretty standard across all transport types, or at least for server transports.
...commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements the StreamableHttpServerTransport for the MCP (Model Context Protocol) specification, adding support for both Server-Sent Events (SSE) streaming and direct HTTP JSON responses with optional resumability support.
Key Changes:
- New
StreamableHttpServerTransportclass supporting SSE and HTTP JSON response modes EventStoreinterface for resumability via event storage and replayJSONRPCEmptyMessagetype for priming SSE events and empty message handling
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt |
Core implementation of the streamable HTTP transport with session management, SSE streaming, and HTTP JSON response support |
kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/EventStore.kt |
Interface definition for event storage and replay to enable resumability |
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt |
Added JSONRPCEmptyMessage type and made JSONRPCError.id nullable |
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/serializers.kt |
Updated JSON-RPC message serializer to handle empty messages |
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/common.kt |
Added DEFAULT_NEGOTIATED_PROTOCOL_VERSION constant and removed unused imports |
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt |
Added handling for JSONRPCEmptyMessage in protocol message processing |
kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransportTest.kt |
Comprehensive test suite covering initialization, batched requests, and notifications |
kotlin-sdk-core/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/types/JsonRpcTest.kt |
Test for JSONRPCEmptyMessage deserialization |
kotlin-sdk-server/build.gradle.kts |
Added test dependencies for Ktor and Kotest |
gradle/libs.versions.toml |
Added library definitions for Ktor content negotiation and serialization |
kotlin-sdk-server/api/kotlin-sdk-server.api |
Public API surface additions for StreamableHttpServerTransport and EventStore |
kotlin-sdk-core/api/kotlin-sdk-core.api |
Public API surface additions for new types and constants |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| * Returns the stream ID associated with [eventId], or null if the event is unknown. | ||
| * Default implementation is a no-op which disables extra validation during replay. |
Copilot
AI
Dec 5, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing KDoc documentation for the getStreamIdForEventId method. The inline comment on line 30 describes it as a "no-op" default implementation, but there's no actual default implementation provided (it's an interface method). Consider either providing a default implementation or clarifying the documentation to explain that implementers may throw NotImplementedError or UnsupportedOperationException to disable this validation.
| * Returns the stream ID associated with [eventId], or null if the event is unknown. | |
| * Default implementation is a no-op which disables extra validation during replay. | |
| * Returns the stream ID associated with the given [eventId], or null if the event is unknown. | |
| * | |
| * Implementers may throw [NotImplementedError] or [UnsupportedOperationException] to disable | |
| * extra validation during replay, as there is no default implementation provided. | |
| * | |
| * @param eventId The event ID to look up. | |
| * @return The stream ID associated with the event, or null if unknown. |
| private val streamMutex = Mutex() | ||
|
|
||
| private companion object { | ||
| const val STANDALONE_SSE_STREAM_ID = "_GET_stream" |
Copilot
AI
Dec 5, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The constant STANDALONE_SSE_STREAM_ID is marked as part of the companion object but is being exported in the public API (line 161 of the API file). If this is intended as an internal implementation detail, it should not be exposed in the public API. Consider making this internal or private if it's not meant for external consumption.
| const val STANDALONE_SSE_STREAM_ID = "_GET_stream" | |
| private const val STANDALONE_SSE_STREAM_ID = "_GET_stream" |
| call.reject( | ||
| HttpStatusCode.BadRequest, | ||
| RPCError.ErrorCode.CONNECTION_CLOSED, | ||
| "Bad Request: Mcp-Session-Id header is required", |
Copilot
AI
Dec 5, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message references "Mcp-Session-Id" with inconsistent capitalization. The constant MCP_SESSION_ID_HEADER is defined as "mcp-session-id" (lowercase), but the error message uses "Mcp-Session-Id". For consistency and clarity, use the actual header name as defined in the constant.
| call.reject( | ||
| HttpStatusCode.BadRequest, | ||
| RPCError.ErrorCode.CONNECTION_CLOSED, | ||
| "Bad Request: Mcp-Session-Id header must be a single value", |
Copilot
AI
Dec 5, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message references "Mcp-Session-Id" with inconsistent capitalization. The constant MCP_SESSION_ID_HEADER is defined as "mcp-session-id" (lowercase), but the error message uses "Mcp-Session-Id". For consistency and clarity, use the actual header name as defined in the constant.
| * Stores an event for later retrieval | ||
| * @param streamId ID of the stream the event belongs to | ||
| * @param message The JSON-RPC message to store | ||
| * @returns The generated event ID for the stored event |
Copilot
AI
Dec 5, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment style uses @returns instead of the Kotlin standard @return. KDoc convention uses @return (singular) for documenting return values.
| * @returns The generated event ID for the stored event | |
| * @return The generated event ID for the stored event |
| // TODO(check order) | ||
| // assertEquals(listOf(firstRequest.id, secondRequest.id), responses.map { it.id }) | ||
| // val firstMeta = (responses[0].result as EmptyResult).meta | ||
| // val secondMeta = (responses[1].result as EmptyResult).meta | ||
| // assertEquals("first", firstMeta?.get("label")?.jsonPrimitive?.content) | ||
| // assertEquals("second", secondMeta?.get("label")?.jsonPrimitive?.content) |
Copilot
AI
Dec 5, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commented-out test code should be removed before merging. If order verification is intended for future implementation, consider creating a separate TODO issue or ticket instead of leaving commented code in the codebase.
| // TODO(check order) | |
| // assertEquals(listOf(firstRequest.id, secondRequest.id), responses.map { it.id }) | |
| // val firstMeta = (responses[0].result as EmptyResult).meta | |
| // val secondMeta = (responses[1].result as EmptyResult).meta | |
| // assertEquals("first", firstMeta?.get("label")?.jsonPrimitive?.content) | |
| // assertEquals("second", secondMeta?.get("label")?.jsonPrimitive?.content) | |
| */ | ||
| @Serializable | ||
| public data class JSONRPCError(val id: RequestId, val error: RPCError) : JSONRPCMessage { | ||
| public data class JSONRPCError(val id: RequestId?, val error: RPCError) : JSONRPCMessage { |
Copilot
AI
Dec 5, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The id field of JSONRPCError is now nullable, but this could be a breaking API change. According to the JSON-RPC 2.0 specification, the id member is required in error responses except when there was an error in detecting the id in the request object (e.g., parse error). Consider documenting when null is appropriate, or ensure this aligns with how the codebase handles error scenarios where the request ID cannot be determined.
| streamsMapping.values.forEach { | ||
| try { | ||
| it.session?.close() | ||
| } catch (_: Exception) { |
Copilot
AI
Dec 5, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Empty catch block silently swallows exceptions. While this may be intentional during cleanup, consider at least logging the exception or adding a comment explaining why it's safe to ignore.
| } catch (_: Exception) { | |
| } catch (e: Exception) { | |
| println("Exception while closing session: ${e.message}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logger should be used
| val eventId = eventStore?.storeEvent(streamId, message) | ||
| try { | ||
| session?.send(event = "message", id = eventId, data = McpJson.encodeToString(message)) | ||
| } catch (_: Exception) { |
Copilot
AI
Dec 5, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Empty catch block silently swallows exceptions. While this may be intentional when a stream is already closed, consider at least logging the exception or adding a comment explaining why it's safe to ignore, especially since this is in the message emission path.
| } catch (_: Exception) { | |
| } catch (e: Exception) { | |
| println("emitOnStream: Exception sending message on stream $streamId: ${e.message}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@devcrocod 👍🏻
Let's merge this PR and continue in the follow-ups
[WIP] StreamableHttpServerTransport implementation
Important
The work is in progress. StreamableHttpServerTransport is not production-ready yet!!!
Based on #235
How Has This Been Tested?
Breaking Changes
None
Types of changes
Checklist