feat: add SSE reconnection with retry support #596
devcrocod merged 6 commits into devcrocod/conformance-tests
…ven retry delays in StreamableHttpClientTransport
…gic, address retry behavior, and ensure compatibility with protocol updates
… limitation in conformance tests
kpavlov left a comment
@devcrocod, please check my comments.
...commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt
public val initialReconnectionDelay: Duration = 1.seconds,
public val maxReconnectionDelay: Duration = 30.seconds,
public val reconnectionDelayGrowFactor: Double = 1.5,
public val maxRetries: Int = 2,

It would also be advantageous to add jitter.

Good idea, I will consider it in a follow-up.
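The jitter suggestion can be illustrated with a minimal sketch. Everything here is an assumption for illustration: `reconnectionDelay` is a hypothetical helper, not SDK API, and the `jitterRatio` parameter and its 20% default are invented. Only the initial delay, maximum delay, and grow factor defaults come from the diff fragment above.

```kotlin
import kotlin.math.pow
import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

// Hypothetical helper, not part of the SDK: delay before retry number `attempt` (0-based).
fun reconnectionDelay(
    attempt: Int,
    initialDelay: Duration = 1.seconds,
    maxDelay: Duration = 30.seconds,
    growFactor: Double = 1.5,
    jitterRatio: Double = 0.2, // assumption: spread of +/-20% around the base delay
    random: Random = Random.Default,
): Duration {
    require(attempt >= 0) { "attempt must be non-negative" }
    // Exponential growth, capped at maxDelay.
    val base = minOf(initialDelay * growFactor.pow(attempt), maxDelay)
    // Uniform factor in [1 - jitterRatio, 1 + jitterRatio).
    val factor = 1.0 + (random.nextDouble() * 2.0 - 1.0) * jitterRatio
    // Re-apply the cap so jitter never pushes the delay past maxDelay.
    return minOf(base * factor, maxDelay)
}
```

Randomizing each delay keeps many clients that lost the same connection from reconnecting in lockstep. Passing a seeded `Random` makes the helper deterministic in tests.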
...lient/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions.kt
private val scope by lazy { CoroutineScope(SupervisorJob() + Dispatchers.Default) }

@Volatile
@devcrocod, why are you adding @Volatile here? It is not recommended here.
And there is still a correctness problem: sseSession is written from a background coroutine without @Volatile.
serverRetryDelay and lastEventId both received @Volatile in this PR. sseSession did not, even though it is now also written from a background coroutine (the sseJob loop via reconnectSseSession). closeResources() reads sseSession before joining sseJob, so there is no happens-before:
sseSession?.cancel() // may read stale reference
sseJob?.cancelAndJoin() // happens-before only established after this
Consider adding @Volatile to sseSession for consistency and correctness.
Agreed, I missed @Volatile on sseSession. I didn't originally plan to refactor this transport in this PR, but I went ahead and refactored it.
kpavlov left a comment
This is a significant refactoring of StreamableHttpClientTransport.
One housekeeping note before merging: the current base branch is devcrocod/conformance-tests, which means this PR carries a lot of unrelated conformance test changes as context. That makes the diff harder to review and will create unnecessary complexity when both branches evolve.
@devcrocod — could you finalize #585 (disabling the failing conformance tests) and merge it to main first? Once that's done, rebasing this PR onto main will bring the diff back in line with its actual purpose, and conformance test work can continue in parallel without blocking this refactoring.
Happy to re-review once the base branch is updated.
public var protocolVersion: String? = null

private var sseSession: ClientSSESession? = null
private var sseJob: Job? = null

Suggested change:
- private var sseJob: Job? = null
+ private val sseJobRef = AtomicReference<Job?>(null)
sseJob is written in startSseSession() — called from the caller's coroutine via performSend() — and read in closeResources() — called from whatever coroutine invokes close(). These are independent coroutines, potentially running on different threads. Without @Volatile or an AtomicReference, the JVM memory model does not guarantee the write in startSseSession is visible to the thread executing closeResources.
The consequence: sseJob?.cancelAndJoin() in closeResources may operate on a stale reference. scope.cancel() serves as a safety net (all children are cancelled regardless), but cancelAndJoin() — which is meant to wait for the SSE coroutine to drain — may be skipped on the wrong reference. close() can return while the SSE coroutine is still running.
scope.cancel() in closeResources will cancel all child coroutines
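For reference, the AtomicReference suggestion above could look roughly like the sketch below. It is an illustration under stated assumptions, not the transport's code: `Handle` is a hypothetical stand-in for `Job` so that the example needs no coroutine dependency, `SessionHolder` is an invented name, and `java.util.concurrent.atomic.AtomicReference` is JVM-only, so code in commonMain would need a multiplatform atomic instead.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for kotlinx.coroutines.Job, to keep the sketch dependency-free.
class Handle(val name: String) {
    @Volatile
    var cancelled: Boolean = false
        private set

    fun cancel() {
        cancelled = true
    }
}

// Hypothetical owner of the "current" background handle.
class SessionHolder {
    private val current = AtomicReference<Handle?>(null)

    // Atomically publish the new handle and cancel whichever one it replaced.
    // Every handle that was ever published is therefore either current or cancelled.
    fun replace(next: Handle): Handle? {
        val previous = current.getAndSet(next)
        previous?.cancel()
        return previous
    }

    // Atomically take the handle out; at most one caller receives and cancels it.
    fun close(): Handle? {
        val last = current.getAndSet(null)
        last?.cancel()
        return last
    }
}
```

`getAndSet` gives both visibility and atomic replacement, which a plain `var` does not. The sketch does not address a `replace` that races with `close`; that would still need a closed flag or a cancelled parent scope.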
    return true
}

override fun hashCode(): Int {

The hashCode() ordering doesn't match the equals() ordering (reconnectionDelayMultiplier first vs. last), which is a style inconsistency even if not a correctness bug.
- CancellationException swallowed in collectSse: breaks cancellation contract
- Unowned CoroutineScope: no structured lifecycle guarantee
private val scope by lazy { CoroutineScope(SupervisorJob() + Dispatchers.Default) }
This scope has no parent Job and is not linked to any caller's scope. It is manually cancelled in closeResources(). The lazy initialization means the scope isn't created until startSseSession() is first invoked, which avoids a leak if the transport is never actually used. But if it is initialized and close() is never called (e.g., caller drops the reference), the scope and all its children leak silently.
The fundamental issue: callers can't inject a lifecycle via a parent scope. The transport creates its own root, which is the root cause of needing explicit scope.cancel() in teardown.
This is a structural constraint that's hard to fix without changing the public API, but it should be documented explicitly so future contributors understand why closeResources() is not optional.
CancellationException swallowed in collectSse — breaks cancellation contract
nope
CancellationException is handled deliberately rather than accidentally. collectSse is a terminal function that returns a result. After catch, the coroutine checks isActive and exits properly, so this does not violate the contract
Unowned CoroutineScope — no structured lifecycle guarantee
How is this comment related to this PR?
Not related, just an observation. But your explanation (I'm not sure it's the correct approach) reveals non-trivial logic that should be simplified for better maintainability.
testTerminateSession is a false positive
Location: StreamableHttpClientTransportTest.kt:185–202
val transport = createTransport { request ->
assertEquals(HttpMethod.Delete, request.method)
assertEquals("test-session-id", request.headers["mcp-session-id"])
respond(content = "", status = HttpStatusCode.OK)
}
transport.start()
transport.terminateSession() // sessionId == null → returns immediately
assertNull(transport.sessionId) // trivially true: was null before the call
terminateSession() returns early on sessionId == null. The MockRequestHandler is never invoked. The assertions inside the handler never execute.
assertNull(transport.sessionId) passes trivially. The test provides zero coverage of the DELETE-request path and gives false confidence.
The same applies to testTerminateSessionHandle405.
The root cause is that sessionId now has private set, so tests can't seed it directly. The fix is to drive a full handshake first (POST initialize → receive mcp-session-id header → then call terminateSession()), or to add a test-only constructor parameter. As written, these tests should be marked @Ignore with a note explaining what they actually cover.
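One general way to guard against this kind of false positive is to assert that the handler actually ran. The sketch below uses hypothetical stand-ins (`RecordingHandler`, `FakeClient`) rather than the SDK's transport or Ktor's mock engine. It only mirrors the early return described above, with a constructor parameter playing the role of the test-only seed.

```kotlin
// Hypothetical fake: counts how often the request handler was invoked.
class RecordingHandler(private val onRequest: (method: String) -> Unit) {
    var calls: Int = 0
        private set

    fun handle(method: String) {
        calls++
        onRequest(method)
    }
}

// Hypothetical client that mirrors the early return on a missing session id.
class FakeClient(private val handler: RecordingHandler, sessionId: String? = null) {
    var sessionId: String? = sessionId
        private set

    fun terminateSession() {
        if (sessionId == null) return // no session: the handler is never reached
        handler.handle("DELETE")
        sessionId = null
    }
}
```

With an unseeded client, `calls` stays at 0, so a test that checks `calls == 1` fails loudly instead of passing trivially. With a seeded client, the same check confirms that the DELETE path was exercised.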
@kpavlov please, if you use AI review, check the comments before posting them. I don't see a problem with merging this into conformance-tests.
@devcrocod, GH checks did not run for this PR. This is a good enough reason for me.
Good point about checking AI comments. I do my best to review the comments before sharing them. In several recent PRs, AI quickly identified gaps and some non-trivial logical issues that could have been addressed earlier. @devcrocod, could you please run an AI self-review before requesting human review in the future? This would likely make the process smoother for everyone.
https://github.com/modelcontextprotocol/kotlin-sdk/actions/runs/22949664879
I didn’t request a review from you on this PR. In fact, I hadn’t requested one from anyone initially because I thought I would refine it further. And yes, I use different tools.
    public val reconnectionDelayMultiplier: Double = 1.5,
    public val maxRetries: Int = 2,
) {
    override fun equals(other: Any?): Boolean {
Why not use a data class for this? It avoids manual hashCode/equals and also gives you the options.copy([only specific fields]) pattern.
Data classes are indeed more convenient. The problem is that any changes to them can introduce binary incompatibility in the future:
https://kotlinlang.org/docs/api-guidelines-backward-compatibility.html#avoid-using-data-classes-in-your-api
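A minimal sketch of the non-data-class alternative is shown below, with equals() and hashCode() visiting the properties in the same order, as requested earlier in the thread. The class name `RetryOptions` is hypothetical and the body is an illustration, not the merged `ReconnectionOptions` source. The property names and defaults follow the diff fragments quoted in this conversation.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

// Hypothetical illustration of a plain (non-data) options class.
class RetryOptions(
    val initialReconnectionDelay: Duration = 1.seconds,
    val maxReconnectionDelay: Duration = 30.seconds,
    val reconnectionDelayMultiplier: Double = 1.5,
    val maxRetries: Int = 2,
) {
    // Compare properties in declaration order.
    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other !is RetryOptions) return false
        return initialReconnectionDelay == other.initialReconnectionDelay &&
            maxReconnectionDelay == other.maxReconnectionDelay &&
            reconnectionDelayMultiplier == other.reconnectionDelayMultiplier &&
            maxRetries == other.maxRetries
    }

    // Hash the same properties in the same order as equals().
    override fun hashCode(): Int {
        var result = initialReconnectionDelay.hashCode()
        result = 31 * result + maxReconnectionDelay.hashCode()
        result = 31 * result + reconnectionDelayMultiplier.hashCode()
        result = 31 * result + maxRetries
        return result
    }
}
```

Unlike a data class, this exposes no generated `copy` or `componentN` functions, which are the members the linked guideline warns about when the constructor changes.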
 * Sends a single message with optional resumption support
 */
- @Suppress("ReturnCount", "CyclomaticComplexMethod")
+ @Suppress("ReturnCount", "CyclomaticComplexMethod", "LongMethod", "TooGenericExceptionCaught", "ThrowsCount")
Guys, seriously, why did you strap on these crappy-coder linters? :) It's as if we were writing JavaScript :)
This serves as a reminder of technical debt to address. "CyclomaticComplexMethod" and "LongMethod" are particularly valuable.
This serves as a reminder of technical debt to address.
Baseline is technical debt
In my opinion, the source code should not contain Suppress annotations for third-party tools. This tends to clutter the codebase and can make long-term maintenance harder, especially if we later migrate to different tooling
Detekt is a useful plugin, but I currently have a couple of concerns about using it in this project
First, every project or product has its own lifecycle. At the current stage of the kotlin-sdk development, this plugin feels somewhat excessive. It slows down development without providing much value at the moment, and the codebase becomes filled with Suppress annotations. It may also discourage external contributors from participating
Second, I'm not convinced that relying purely on the default rules is the right approach. It’s challenging to create large, universal rule sets that work well for every type of development: frameworks, libraries, enterprise applications, compose apps, etc. With the default setup, it’s also not very clear which rules are enabled or disabled and what their configured values are
For example, it was mentioned that LongMethod is particularly valuable. However, do we know what exactly counts as "long"? How many lines? In a previous version it was 80 lines, and now it’s 60. Changes like this raise questions
The Detekt maintainers understand that rules often need to be adapted for a specific project and team. That’s why they allow generating the default configuration and customizing it. Additionally, some rules that could be useful for an SDK are disabled by default
I tried to address this in PR #501 by introducing an explicit configuration. However, for reasons I still don't fully understand, this change was later removed in #512. My intention was to keep a clear configuration file that explicitly shows which rules are enabled and what values they use, and that allows us to track changes over time, whether they come from our adjustments or from detekt updates. Explicit configuration also helps avoid issues where the Gradle plugin and the IDEA plugin behave differently. I also enabled some rules and relaxed others to better fit our SDK use case
I'm honestly a bit disappointed that these concerns seem to be ignored or dismissed. When it comes to team tooling decisions, I would really appreciate my perspective being considered as part of the team discussion. For example, detekt itself was initially introduced bypassing the maintainers and merged without their approval in #493
// This is intentionally non-suspend to avoid blocking performSend.
val previousJob = sseJob
previousJob?.cancel()
sseJob = scope.launch(CoroutineName("StreamableHttpTransport.collect#${hashCode()}")) {
Is it possible for startSseSession to be invoked in parallel? If so, the code can race: you may end up with an abandoned job that runs in parallel with the one saved to sseJob.
Yes, I considered this scenario
Even with parallel calls, it should work correctly. The new job will wait for the previous one to finish
…th retry support (#596) (#585)
Adds a comprehensive conformance test suite for the Kotlin MCP SDK, covering core protocol operations, tool calls, elicitation, resources, prompts, and 20 OAuth/auth scenarios
- Conformance server and client implementations
- OAuth/auth test scenarios: JWT, authorization code flow, client credentials, PKCE, scope handling, cross-app access, client registration
- CI workflow
- Baseline file for tracking expected failures
- Shell script fixes:
- #592
- #593
- #596
## Remaining known failures (tracked issues, will be fixed directly in `main`)
- [x] `tools-call-with-logging`, `tools-call-with-progress`, `tools-call-sampling`, `tools-call-elicitation`, `elicitation-sep1034-defaults` - see #599
- [x] `elicitation-sep1330-enums` - #587 #600
- [x] `initialize` - #588
- [x] `tools_call`, `auth/scope-step-up`, `auth/scope-retry-limit` - #589
- [ ] `elicitation-sep1034-client-defaults` - #414
- [x] `sse-retry` - #590
- [ ] `resources-templates-read` - #591
## Breaking Changes from #596
- `StreamableHttpClientTransport` and `mcpStreamableHttp`/`mcpStreamableHttpTransport`: old constructors accepting `Duration` timeout are now `@Deprecated`; use the new overloads with `ReconnectionOptions` instead
- `StreamableHttpClientTransport.close()` no longer calls `terminateSession()` automatically
## Types of changes
- [x] Bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [x] Breaking change (fix or feature that would cause existing functionality to change)
- [ ] Documentation update
## Checklist
- [x] I have read the [MCP Documentation](https://modelcontextprotocol.io)
- [x] My code follows the repository's style guidelines
- [x] New and existing tests pass locally
- [x] I have added appropriate error handling
- [x] I have added or updated documentation as needed
---------
Co-authored-by: Konstantin Pavlov <1517853+kpavlov@users.noreply.github.com>
Add configurable SSE reconnection with exponential backoff and server-driven retry delays to StreamableHttpClientTransport
closes #590
closes #420
How Has This Been Tested?
New unit tests were added, and the conformance test passes.
Breaking Changes
old constructors are Deprecated
close no longer calls terminateSession
Types of changes
Checklist