From 45e780b8d7c4561c3f6a1201ef74a5286127050c Mon Sep 17 00:00:00 2001 From: Konstantin Pavlov <1517853+kpavlov@users.noreply.github.com> Date: Tue, 3 Mar 2026 23:52:44 +0200 Subject: [PATCH 1/3] fix(server): #573, #574: add customizable builder to StdioServerTransport Scope lifecycle is fixed (#574 resolved), processing dispatcher defaults to `Dispatchers.Default` (#573 resolved), scope context is clean (no spurious dispatcher stacking), and @Volatile on the three job vars addresses the visibility race. - Introduce a `Configuration` class for `StdioServerTransport` to improve API flexibility and readability. - Updated transport initialization to use a builder block for configuring parameters such as I/O streams, buffer sizes, dispatchers, and parent coroutine scope. Test: Added integration test validating the builder functionality. --- kotlin-sdk-server/api/kotlin-sdk-server.api | 23 ++ .../kotlin/sdk/server/StdioServerTransport.kt | 213 ++++++++++++------ .../sdk/server/StdioServerTransportTest.kt | 39 ++++ 3 files changed, 201 insertions(+), 74 deletions(-) diff --git a/kotlin-sdk-server/api/kotlin-sdk-server.api b/kotlin-sdk-server/api/kotlin-sdk-server.api index c0b0199d..669e950c 100644 --- a/kotlin-sdk-server/api/kotlin-sdk-server.api +++ b/kotlin-sdk-server/api/kotlin-sdk-server.api @@ -186,12 +186,35 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/SseServerTransport } public final class io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport { + public fun (Lkotlin/jvm/functions/Function1;)V public fun (Lkotlinx/io/Source;Lkotlinx/io/Sink;)V public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun send (Lio/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage;Lio/modelcontextprotocol/kotlin/sdk/shared/TransportSendOptions;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } +public final class io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport$Configuration { + public fun ()V + public final fun getCoroutineScope ()Lkotlinx/coroutines/CoroutineScope; + public final fun getProcessingJobDispatcher ()Lkotlinx/coroutines/CoroutineDispatcher; + public final fun getReadBufferSize ()J + public final fun getReadChannelBufferSize ()I + public final fun getReadingJobDispatcher ()Lkotlinx/coroutines/CoroutineDispatcher; + public final fun getSink ()Lkotlinx/io/Sink; + public final fun getSource ()Lkotlinx/io/Source; + public final fun getWriteChannelBufferSize ()I + public final fun getWritingJobDispatcher ()Lkotlinx/coroutines/CoroutineDispatcher; + public final fun setCoroutineScope (Lkotlinx/coroutines/CoroutineScope;)V + public final fun setProcessingJobDispatcher (Lkotlinx/coroutines/CoroutineDispatcher;)V + public final fun setReadBufferSize (J)V + public final fun setReadChannelBufferSize (I)V + public final fun setReadingJobDispatcher (Lkotlinx/coroutines/CoroutineDispatcher;)V + public final fun setSink (Lkotlinx/io/Sink;)V + public final fun setSource (Lkotlinx/io/Source;)V + public final fun setWriteChannelBufferSize (I)V + public final fun setWritingJobDispatcher (Lkotlinx/coroutines/CoroutineDispatcher;)V +} + public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport { public static final field STANDALONE_SSE_STREAM_ID Ljava/lang/String; public fun ()V diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt index 8fd46dd2..64eab8e1 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt @@ -8,7 +8,9 @@ import io.modelcontextprotocol.kotlin.sdk.shared.TransportSendOptions import io.modelcontextprotocol.kotlin.sdk.shared.serializeMessage import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.SupervisorJob @@ -23,9 +25,9 @@ import kotlinx.io.Source import kotlinx.io.buffered import kotlinx.io.readByteArray import kotlinx.io.writeString +import kotlin.concurrent.Volatile import kotlin.concurrent.atomics.AtomicBoolean import kotlin.concurrent.atomics.ExperimentalAtomicApi -import kotlin.coroutines.CoroutineContext private const val READ_BUFFER_SIZE = 8192L @@ -34,25 +36,99 @@ private const val READ_BUFFER_SIZE = 8192L * * Reads from input [Source] and writes to output [Sink]. * - * @constructor Creates a new instance of [StdioServerTransport]. - * @param inputStream The input [Source] used to receive data. - * @param outputStream The output [Sink] used to send data. + * Example: + * ```kotlin + * val transport = StdioServerTransport { + * source = System.`in`.asInput(), + * sink = System.out.asSink(), + * } + * ``` + * + * @constructor Initializes the transport using the provided block for [Configuration]. + * The configuration includes specifying the input and output streams, buffer sizes, + * and dispatchers for I/O and processing tasks and coroutine scope. */ @OptIn(ExperimentalAtomicApi::class) -public class StdioServerTransport(private val inputStream: Source, outputStream: Sink) : AbstractTransport() { +public class StdioServerTransport(block: Configuration.() -> Unit) : AbstractTransport() { + + /** + * Configuration for [StdioServerTransport]. + * + * @property source The input [Source] used to receive data. + * @property sink The output [Sink] used to send data. + * @property readBufferSize The buffer size for the read channel. + * @property readingJobDispatcher The [CoroutineDispatcher] used for reading jobs. + * Defaults to [IODispatcher]. + * @property writingJobDispatcher The [CoroutineDispatcher] used for writing jobs. + * Defaults to [IODispatcher]. + * @property processingJobDispatcher The [CoroutineDispatcher] used for processing jobs. + * Defaults to [Dispatchers.Default]. + * @property readChannelBufferSize The buffer size for the read channel. + * @property writeChannelBufferSize The buffer size for the write channel. + * @property coroutineScope The [CoroutineScope] used for managing coroutines. + */ + @Suppress("LongParameterList") + public class Configuration internal constructor( + public var source: Source? = null, + public var sink: Sink? = null, + public var readBufferSize: Long = READ_BUFFER_SIZE, + public var readingJobDispatcher: CoroutineDispatcher = IODispatcher, + public var writingJobDispatcher: CoroutineDispatcher = IODispatcher, + public var processingJobDispatcher: CoroutineDispatcher = Dispatchers.Default, + public var readChannelBufferSize: Int = Channel.UNLIMITED, + public var writeChannelBufferSize: Int = Channel.UNLIMITED, + public var coroutineScope: CoroutineScope? = null, + ) + + private val source: Source + private val sink: Sink + private val processingJobDispatcher: CoroutineDispatcher + private val readingJobDispatcher: CoroutineDispatcher + private val writingJobDispatcher: CoroutineDispatcher + private val scope: CoroutineScope + private val readBufferSize: Long + private val readChannel: Channel + private val writeChannel: Channel + + init { + val config = Configuration().apply(block) + val input = requireNotNull(config.source) { "source is required" } + val output = requireNotNull(config.sink) { "sink is required" } + require(config.readBufferSize > 0) { "readBufferSize must be > 0" } + + source = input + processingJobDispatcher = config.processingJobDispatcher + readingJobDispatcher = config.readingJobDispatcher + writingJobDispatcher = config.writingJobDispatcher + val parentJob = config.coroutineScope?.coroutineContext?.get(Job) + scope = CoroutineScope(SupervisorJob(parentJob)) + readBufferSize = config.readBufferSize + readChannel = Channel(config.readChannelBufferSize) + writeChannel = Channel(config.writeChannelBufferSize) + sink = output.buffered() + } + + /** + * Creates a new instance of [StdioServerTransport] + * with the given [inputStream] [Source] and [outputStream] [Sink]. + */ + public constructor(inputStream: Source, outputStream: Sink) : this({ + source = inputStream + sink = outputStream + }) private val logger = KotlinLogging.logger {} private val readBuffer = ReadBuffer() private val initialized: AtomicBoolean = AtomicBoolean(false) + + @Volatile private var readingJob: Job? = null + + @Volatile private var sendingJob: Job? = null - private var processingJob: Job? = null - private val coroutineContext: CoroutineContext = IODispatcher + SupervisorJob() - private val scope = CoroutineScope(coroutineContext) - private val readChannel = Channel(Channel.UNLIMITED) - private val writeChannel = Channel(Channel.UNLIMITED) - private val outputSink = outputStream.buffered() + @Volatile + private var processingJob: Job? = null override suspend fun start() { if (!initialized.compareAndSet(expectedValue = false, newValue = true)) { @@ -69,81 +145,71 @@ public class StdioServerTransport(private val inputStream: Source, outputStream: sendingJob = launchSendingJob() } - private fun launchReadingJob(): Job { - val job = scope.launch { - val buf = Buffer() - @Suppress("TooGenericExceptionCaught") - try { - while (isActive) { - val bytesRead = inputStream.readAtMostTo(buf, READ_BUFFER_SIZE) - if (bytesRead == -1L) { - // EOF reached - break - } - if (bytesRead > 0) { - val chunk = buf.readByteArray() - readChannel.send(chunk) - } + private fun launchReadingJob(): Job = scope.launch(readingJobDispatcher) { + val buf = Buffer() + @Suppress("TooGenericExceptionCaught") + try { + while (isActive) { + val bytesRead = source.readAtMostTo(buf, readBufferSize) + if (bytesRead == -1L) { + // EOF reached + break + } + if (bytesRead > 0) { + val chunk = buf.readByteArray() + readChannel.send(chunk) } - } catch (e: CancellationException) { - throw e - } catch (e: Throwable) { - logger.error(e) { "Error reading from stdin" } - _onError.invoke(e) - } finally { - // Reached EOF or error, close connection - close() } + } catch (e: CancellationException) { + throw e + } catch (e: Throwable) { + logger.error(e) { "Error reading from stdin" } + _onError.invoke(e) + } finally { + // Reached EOF or error, close connection + close() } - job.invokeOnCompletion { cause -> - logJobCompletion("Message reading", cause) - } - return job + }.apply { + invokeOnCompletion { logJobCompletion("Message reading", it) } } - private fun launchProcessingJob(): Job { - val job = scope.launch { - @Suppress("TooGenericExceptionCaught") - try { - for (chunk in readChannel) { - readBuffer.append(chunk) - processReadBuffer() - } - } catch (e: CancellationException) { - throw e - } catch (e: Throwable) { - _onError.invoke(e) + private fun launchProcessingJob(): Job = scope.launch(processingJobDispatcher) { + @Suppress("TooGenericExceptionCaught") + try { + for (chunk in readChannel) { + readBuffer.append(chunk) + processReadBuffer() } + } catch (e: CancellationException) { + throw e + } catch (e: Throwable) { + _onError.invoke(e) } - job.invokeOnCompletion { cause -> - logJobCompletion("Processing", cause) - } - return job + }.apply { + invokeOnCompletion { logJobCompletion("Processing", it) } } - private fun launchSendingJob(): Job { - val job = scope.launch { - @Suppress("TooGenericExceptionCaught") - try { - for (message in writeChannel) { - val json = serializeMessage(message) - outputSink.writeString(json) - outputSink.flush() - } - } catch (e: CancellationException) { - throw e - } catch (e: Throwable) { - logger.error(e) { "Error writing to stdout" } - _onError.invoke(e) + private fun launchSendingJob(): Job = scope.launch(writingJobDispatcher) { + @Suppress("TooGenericExceptionCaught") + try { + for (message in writeChannel) { + val json = serializeMessage(message) + sink.writeString(json) + sink.flush() } + } catch (e: CancellationException) { + throw e + } catch (e: Throwable) { + logger.error(e) { "Error writing to stdout" } + _onError.invoke(e) } - job.invokeOnCompletion { cause -> + }.apply { + invokeOnCompletion { cause -> logJobCompletion("Message sending", cause) if (cause is CancellationException) { readingJob?.cancel(cause) } } - return job } private suspend fun processReadBuffer() { @@ -192,7 +258,7 @@ public class StdioServerTransport(private val inputStream: Source, outputStream: sendingJob?.cancelAndJoin() runCatching { - inputStream.close() + source.close() }.onFailure { logger.warn(it) { "Failed to close stdin" } } readingJob?.cancel() @@ -201,10 +267,9 @@ public class StdioServerTransport(private val inputStream: Source, outputStream: processingJob?.cancelAndJoin() readBuffer.clear() - runCatching { - outputSink.flush() - outputSink.close() + sink.flush() + sink.close() }.onFailure { logger.warn(it) { "Failed to close stdout" } } invokeOnCloseCallback() diff --git a/kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt b/kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt index dcbb7a4b..88ca3122 100644 --- a/kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt +++ b/kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt @@ -5,6 +5,8 @@ import io.kotest.assertions.throwables.shouldThrow import io.kotest.assertions.withClue import io.kotest.matchers.collections.shouldContain import io.kotest.matchers.shouldBe +import io.ktor.client.utils.clientDispatcher +import io.ktor.utils.io.InternalAPI import io.modelcontextprotocol.kotlin.sdk.shared.ReadBuffer import io.modelcontextprotocol.kotlin.sdk.shared.serializeMessage import io.modelcontextprotocol.kotlin.sdk.types.InitializedNotification @@ -14,7 +16,10 @@ import io.modelcontextprotocol.kotlin.sdk.types.toJSON import io.modelcontextprotocol.kotlin.test.utils.runIntegrationTest import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.withTimeout import kotlinx.io.Buffer import kotlinx.io.RawSink @@ -69,6 +74,40 @@ class StdioServerTransportTest { printOutput = output.asSink().buffered() } + @OptIn(InternalAPI::class) + @Test + fun `should construct with builder`() = runIntegrationTest { + val received = CompletableDeferred() + + val ioDispatcher = Dispatchers.IO.limitedParallelism(4) + + // Set every configuration parameter explicitly with non-default values, + // then verify a message round-trips correctly. + val server = StdioServerTransport { + source = bufferedInput + sink = printOutput + readBufferSize = 16L // non-default: smaller read chunk + readingJobDispatcher = ioDispatcher // non-default: limited parallelism + writingJobDispatcher = ioDispatcher // non-default: limited parallelism + processingJobDispatcher = Dispatchers.clientDispatcher(2, "Worker") // non-default + readChannelBufferSize = Channel.BUFFERED // non-default: bounded + writeChannelBufferSize = Channel.BUFFERED // non-default: bounded + coroutineScope = CoroutineScope(Dispatchers.Default) // non-default: parent scope provided + } + server.onError { throw it } + server.onMessage { received.complete(it) } + + server.start() + + val message = PingRequest().toJSON() + inputWriter.write(serializeMessage(message)) + inputWriter.flush() + + received.await() shouldBe message + + server.close() + } + @Test fun `should be safe to close before start`() = runIntegrationTest { val server = StdioServerTransport(bufferedInput, printOutput) From 570184def6a4841635623510f32d6477a461290b Mon Sep 17 00:00:00 2001 From: Konstantin Pavlov <1517853+kpavlov@users.noreply.github.com> Date: Tue, 3 Mar 2026 23:53:13 +0200 Subject: [PATCH 2/3] chore(detekt): exclude `InjectDispatcher` for test folders and update baselines - Added `InjectDispatcher` rule to detekt exclusions for test folders in `detekt.yml`. - Removed resolved `InjectDispatcher` issues from baseline files. --- config/detekt/detekt.yml | 4 ++++ integration-test/detekt-baseline-test.xml | 11 ----------- .../sdk/integration/AbstractAuthenticationTest.kt | 1 - kotlin-sdk-client/detekt-baseline-test.xml | 2 -- 4 files changed, 4 insertions(+), 14 deletions(-) diff --git a/config/detekt/detekt.yml b/config/detekt/detekt.yml index 8c19749c..2cb135ab 100644 --- a/config/detekt/detekt.yml +++ b/config/detekt/detekt.yml @@ -14,6 +14,10 @@ complexity: LongMethod: excludes: *testFolders +coroutines: + InjectDispatcher: + excludes: *testFolders + empty-blocks: EmptyFunctionBlock: excludes: *testFolders diff --git a/integration-test/detekt-baseline-test.xml b/integration-test/detekt-baseline-test.xml index 710b7293..cd94eac6 100644 --- a/integration-test/detekt-baseline-test.xml +++ b/integration-test/detekt-baseline-test.xml @@ -8,17 +8,6 @@ AbstractClassCanBeConcreteClass:BaseTransportTest.kt:BaseTransportTest$BaseTransportTest CyclomaticComplexMethod:AbstractToolIntegrationTest.kt:AbstractToolIntegrationTest$private fun setupCalculatorTool ForbiddenComment:StdioClientTransportTest.kt:StdioClientTransportTest$// TODO: fix running on windows - InjectDispatcher:AbstractKotlinClientTsServerTest.kt:AbstractKotlinClientTsServerTest$IO - InjectDispatcher:AbstractPromptIntegrationTest.kt:AbstractPromptIntegrationTest$IO - InjectDispatcher:AbstractResourceIntegrationTest.kt:AbstractResourceIntegrationTest$IO - InjectDispatcher:AbstractToolIntegrationTest.kt:AbstractToolIntegrationTest$IO - InjectDispatcher:KotlinClientTsServerEdgeCasesTestSse.kt:KotlinClientTsServerEdgeCasesTestSse$IO - InjectDispatcher:KotlinClientTsServerEdgeCasesTestStdio.kt:KotlinClientTsServerEdgeCasesTestStdio$IO - InjectDispatcher:SseIntegrationTest.kt:SseIntegrationTest$IO - InjectDispatcher:StdioClientTransportTest.kt:StdioClientTransportTest$IO - InjectDispatcher:StreamableHttpIntegrationTest.kt:StreamableHttpIntegrationTest$IO - InjectDispatcher:TsEdgeCasesTestSse.kt:TsEdgeCasesTestSse$IO - InjectDispatcher:WebSocketIntegrationTest.kt:WebSocketIntegrationTest$IO MatchingDeclarationName:PromptIntegrationTestSse.kt:SchemaPromptIntegrationTestSse : AbstractPromptIntegrationTest SleepInsteadOfDelay:KotlinServerForTsClientSse.kt:KotlinServerForTsClient$sleep(500) ThrowsCount:AbstractPromptIntegrationTest.kt:AbstractPromptIntegrationTest$override fun configureServer diff --git a/integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/AbstractAuthenticationTest.kt b/integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/AbstractAuthenticationTest.kt index 1e88558f..ec420414 100644 --- a/integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/AbstractAuthenticationTest.kt +++ b/integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/AbstractAuthenticationTest.kt @@ -42,7 +42,6 @@ import io.ktor.server.sse.SSE as ServerSSE /** * Base class for MCP authentication integration tests. */ -@Suppress("InjectDispatcher") abstract class AbstractAuthenticationTest { protected companion object { diff --git a/kotlin-sdk-client/detekt-baseline-test.xml b/kotlin-sdk-client/detekt-baseline-test.xml index b16e8eb1..dd6adfab 100644 --- a/kotlin-sdk-client/detekt-baseline-test.xml +++ b/kotlin-sdk-client/detekt-baseline-test.xml @@ -4,8 +4,6 @@ AbstractClassCanBeConcreteClass:AbstractStreamableHttpClientTest.kt:AbstractStreamableHttpClientTest$AbstractStreamableHttpClientTest ForbiddenComment:StreamableHttpClientTest.kt:StreamableHttpClientTest$// TODO: how to get notifications via Client API? - InjectDispatcher:StdioClientTransportErrorHandlingTest.kt:StdioClientTransportErrorHandlingTest$IO - InjectDispatcher:StreamableHttpClientTransportTest.kt:StreamableHttpClientTransportTest$Default LongParameterList:MockMcp.kt:MockMcp$fun handleJSONRPCRequest LongParameterList:MockMcp.kt:MockMcp$fun handleWithResult LongParameterList:MockMcp.kt:MockMcp$fun onInitialize From 469352910e7363f0c4741f06ad350ccd62b63db2 Mon Sep 17 00:00:00 2001 From: Konstantin Pavlov <1517853+kpavlov@users.noreply.github.com> Date: Wed, 4 Mar 2026 23:25:17 +0200 Subject: [PATCH 3/3] refactor(server): simplify `StdioServerTransport` constructor and remove `Configuration` class - Replaced the builder-based setup with a streamlined primary constructor for `StdioServerTransport`. - Removed the `Configuration` class for reduced complexity and enhanced readability. - Updated `StdioServerTransportTest` to reflect the refactored initialization. - Refactor cleanup. --- kotlin-sdk-server/api/kotlin-sdk-server.api | 25 +--- .../kotlin/sdk/server/StdioServerTransport.kt | 133 +++++++----------- .../sdk/server/StdioServerTransportTest.kt | 28 ++-- 3 files changed, 63 insertions(+), 123 deletions(-) diff --git a/kotlin-sdk-server/api/kotlin-sdk-server.api b/kotlin-sdk-server/api/kotlin-sdk-server.api index 669e950c..ad5208c6 100644 --- a/kotlin-sdk-server/api/kotlin-sdk-server.api +++ b/kotlin-sdk-server/api/kotlin-sdk-server.api @@ -186,35 +186,14 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/SseServerTransport } public final class io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport { - public fun (Lkotlin/jvm/functions/Function1;)V public fun (Lkotlinx/io/Source;Lkotlinx/io/Sink;)V + public fun (Lkotlinx/io/Source;Lkotlinx/io/Sink;JLkotlinx/coroutines/channels/Channel;Lkotlinx/coroutines/channels/Channel;Lkotlinx/coroutines/CoroutineDispatcher;Lkotlinx/coroutines/CoroutineDispatcher;Lkotlinx/coroutines/CoroutineDispatcher;Lkotlinx/coroutines/CoroutineScope;)V + public synthetic fun (Lkotlinx/io/Source;Lkotlinx/io/Sink;JLkotlinx/coroutines/channels/Channel;Lkotlinx/coroutines/channels/Channel;Lkotlinx/coroutines/CoroutineDispatcher;Lkotlinx/coroutines/CoroutineDispatcher;Lkotlinx/coroutines/CoroutineDispatcher;Lkotlinx/coroutines/CoroutineScope;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun send (Lio/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage;Lio/modelcontextprotocol/kotlin/sdk/shared/TransportSendOptions;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } -public final class io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport$Configuration { - public fun ()V - public final fun getCoroutineScope ()Lkotlinx/coroutines/CoroutineScope; - public final fun getProcessingJobDispatcher ()Lkotlinx/coroutines/CoroutineDispatcher; - public final fun getReadBufferSize ()J - public final fun getReadChannelBufferSize ()I - public final fun getReadingJobDispatcher ()Lkotlinx/coroutines/CoroutineDispatcher; - public final fun getSink ()Lkotlinx/io/Sink; - public final fun getSource ()Lkotlinx/io/Source; - public final fun getWriteChannelBufferSize ()I - public final fun getWritingJobDispatcher ()Lkotlinx/coroutines/CoroutineDispatcher; - public final fun setCoroutineScope (Lkotlinx/coroutines/CoroutineScope;)V - public final fun setProcessingJobDispatcher (Lkotlinx/coroutines/CoroutineDispatcher;)V - public final fun setReadBufferSize (J)V - public final fun setReadChannelBufferSize (I)V - public final fun setReadingJobDispatcher (Lkotlinx/coroutines/CoroutineDispatcher;)V - public final fun setSink (Lkotlinx/io/Sink;)V - public final fun setSource (Lkotlinx/io/Source;)V - public final fun setWriteChannelBufferSize (I)V - public final fun setWritingJobDispatcher (Lkotlinx/coroutines/CoroutineDispatcher;)V -} - public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport { public static final field STANDALONE_SSE_STREAM_ID Ljava/lang/String; public fun ()V diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt index 64eab8e1..e9df85ec 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt @@ -14,7 +14,7 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.isActive import kotlinx.coroutines.launch @@ -25,7 +25,6 @@ import kotlinx.io.Source import kotlinx.io.buffered import kotlinx.io.readByteArray import kotlinx.io.writeString -import kotlin.concurrent.Volatile import kotlin.concurrent.atomics.AtomicBoolean import kotlin.concurrent.atomics.ExperimentalAtomicApi @@ -34,115 +33,80 @@ private const val READ_BUFFER_SIZE = 8192L /** * A server transport that communicates with a client via standard I/O. * - * Reads from input [Source] and writes to output [Sink]. + * [StdioServerTransport] manages the communication between a JSON-RPC server and its clients + * by reading incoming messages from the specified [Source] (input stream) and writing outgoing + * messages to the [Sink] (output stream). * * Example: * ```kotlin - * val transport = StdioServerTransport { - * source = System.`in`.asInput(), - * sink = System.out.asSink(), - * } + * val transport = StdioServerTransport( + * source = System.`in`.asInput() + * sink = System.out.asSink() + * ) * ``` * - * @constructor Initializes the transport using the provided block for [Configuration]. - * The configuration includes specifying the input and output streams, buffer sizes, - * and dispatchers for I/O and processing tasks and coroutine scope. + * @constructor Creates an instance of [StdioServerTransport] with the specified parameters. + * @property source The source for reading incoming messages (e.g., stdin or other readable stream). + * @param sink The sink for writing outgoing messages (e.g., stdout or other writable stream). + * @property readBufferSize The maximum size of the read buffer, defaults to a pre-configured constant. + * @property readChannel The channel for receiving raw byte arrays from the input stream. + * @property writeChannel The channel for sending serialized JSON-RPC messages to the output stream. + * @property readingJobDispatcher The dispatcher to use for the message-reading coroutine. + * @property writingJobDispatcher The dispatcher to use for the message-writing coroutine. + * @property processingJobDispatcher The dispatcher to handle processing of read messages. + * @param coroutineScope Optional coroutine scope to use for managing internal jobs. A new scope + * will be created if not provided. */ @OptIn(ExperimentalAtomicApi::class) -public class StdioServerTransport(block: Configuration.() -> Unit) : AbstractTransport() { +@Suppress("LongParameterList") +public class StdioServerTransport( + private val source: Source, + sink: Sink, + private val readBufferSize: Long = READ_BUFFER_SIZE, + private val readChannel: Channel = Channel(Channel.UNLIMITED), + private val writeChannel: Channel = Channel(Channel.UNLIMITED), + private var readingJobDispatcher: CoroutineDispatcher = IODispatcher, + private var writingJobDispatcher: CoroutineDispatcher = IODispatcher, + private var processingJobDispatcher: CoroutineDispatcher = Dispatchers.Default, + coroutineScope: CoroutineScope? = null, +) : AbstractTransport() { - /** - * Configuration for [StdioServerTransport]. - * - * @property source The input [Source] used to receive data. - * @property sink The output [Sink] used to send data. - * @property readBufferSize The buffer size for the read channel. - * @property readingJobDispatcher The [CoroutineDispatcher] used for reading jobs. - * Defaults to [IODispatcher]. - * @property writingJobDispatcher The [CoroutineDispatcher] used for writing jobs. - * Defaults to [IODispatcher]. - * @property processingJobDispatcher The [CoroutineDispatcher] used for processing jobs. - * Defaults to [Dispatchers.Default]. - * @property readChannelBufferSize The buffer size for the read channel. - * @property writeChannelBufferSize The buffer size for the write channel. - * @property coroutineScope The [CoroutineScope] used for managing coroutines. - */ - @Suppress("LongParameterList") - public class Configuration internal constructor( - public var source: Source? = null, - public var sink: Sink? = null, - public var readBufferSize: Long = READ_BUFFER_SIZE, - public var readingJobDispatcher: CoroutineDispatcher = IODispatcher, - public var writingJobDispatcher: CoroutineDispatcher = IODispatcher, - public var processingJobDispatcher: CoroutineDispatcher = Dispatchers.Default, - public var readChannelBufferSize: Int = Channel.UNLIMITED, - public var writeChannelBufferSize: Int = Channel.UNLIMITED, - public var coroutineScope: CoroutineScope? = null, - ) - - private val source: Source - private val sink: Sink - private val processingJobDispatcher: CoroutineDispatcher - private val readingJobDispatcher: CoroutineDispatcher - private val writingJobDispatcher: CoroutineDispatcher private val scope: CoroutineScope - private val readBufferSize: Long - private val readChannel: Channel - private val writeChannel: Channel + private val sink: Sink init { - val config = Configuration().apply(block) - val input = requireNotNull(config.source) { "source is required" } - val output = requireNotNull(config.sink) { "sink is required" } - require(config.readBufferSize > 0) { "readBufferSize must be > 0" } - - source = input - processingJobDispatcher = config.processingJobDispatcher - readingJobDispatcher = config.readingJobDispatcher - writingJobDispatcher = config.writingJobDispatcher - val parentJob = config.coroutineScope?.coroutineContext?.get(Job) + require(readBufferSize > 0) { "readBufferSize must be > 0" } + val parentJob = coroutineScope?.coroutineContext?.get(Job) scope = CoroutineScope(SupervisorJob(parentJob)) - readBufferSize = config.readBufferSize - readChannel = Channel(config.readChannelBufferSize) - writeChannel = Channel(config.writeChannelBufferSize) - sink = output.buffered() + this.sink = sink.buffered() } /** * Creates a new instance of [StdioServerTransport] * with the given [inputStream] [Source] and [outputStream] [Sink]. */ - public constructor(inputStream: Source, outputStream: Sink) : this({ - source = inputStream - sink = outputStream - }) + public constructor(inputStream: Source, outputStream: Sink) : this( + source = inputStream, + sink = outputStream, + ) private val logger = KotlinLogging.logger {} private val readBuffer = ReadBuffer() private val initialized: AtomicBoolean = AtomicBoolean(false) - @Volatile - private var readingJob: Job? = null - - @Volatile - private var sendingJob: Job? = null - - @Volatile - private var processingJob: Job? = null - override suspend fun start() { if (!initialized.compareAndSet(expectedValue = false, newValue = true)) { error("StdioServerTransport already started!") } // Launch a coroutine to read from stdin - readingJob = launchReadingJob() + launchReadingJob() // Launch a coroutine to process messages from readChannel - processingJob = launchProcessingJob() + launchProcessingJob() // Launch a coroutine to handle message sending - sendingJob = launchSendingJob() + launchSendingJob() } private fun launchReadingJob(): Job = scope.launch(readingJobDispatcher) { @@ -186,7 +150,9 @@ public class StdioServerTransport(block: Configuration.() -> Unit) : AbstractTra _onError.invoke(e) } }.apply { - invokeOnCompletion { logJobCompletion("Processing", it) } + invokeOnCompletion { cause -> + logJobCompletion("Processing", cause) + } } private fun launchSendingJob(): Job = scope.launch(writingJobDispatcher) { @@ -207,7 +173,7 @@ public class StdioServerTransport(block: Configuration.() -> Unit) : AbstractTra invokeOnCompletion { cause -> logJobCompletion("Message sending", cause) if (cause is CancellationException) { - readingJob?.cancel(cause) + readChannel.cancel(cause) } } } @@ -255,23 +221,22 @@ public class StdioServerTransport(block: Configuration.() -> Unit) : AbstractTra withContext(NonCancellable) { writeChannel.close() - sendingJob?.cancelAndJoin() runCatching { source.close() }.onFailure { logger.warn(it) { "Failed to close stdin" } } - readingJob?.cancel() readChannel.close() - processingJob?.cancelAndJoin() - readBuffer.clear() runCatching { sink.flush() sink.close() }.onFailure { logger.warn(it) { "Failed to close stdout" } } + scope.cancel() + scope.coroutineContext[Job]?.join() + invokeOnCloseCallback() } } diff --git a/kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt b/kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt index 88ca3122..5a84c33f 100644 --- a/kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt +++ b/kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt @@ -5,8 +5,6 @@ import io.kotest.assertions.throwables.shouldThrow import io.kotest.assertions.withClue import io.kotest.matchers.collections.shouldContain import io.kotest.matchers.shouldBe -import io.ktor.client.utils.clientDispatcher -import io.ktor.utils.io.InternalAPI import io.modelcontextprotocol.kotlin.sdk.shared.ReadBuffer import io.modelcontextprotocol.kotlin.sdk.shared.serializeMessage import io.modelcontextprotocol.kotlin.sdk.types.InitializedNotification @@ -19,6 +17,7 @@ import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.withTimeout import kotlinx.io.Buffer @@ -74,26 +73,23 @@ class StdioServerTransportTest { printOutput = output.asSink().buffered() } - @OptIn(InternalAPI::class) @Test fun `should construct with builder`() = runIntegrationTest { val received = CompletableDeferred() - val ioDispatcher = Dispatchers.IO.limitedParallelism(4) - // Set every configuration parameter explicitly with non-default values, // then verify a message round-trips correctly. - val server = StdioServerTransport { - source = bufferedInput - sink = printOutput - readBufferSize = 16L // non-default: smaller read chunk - readingJobDispatcher = ioDispatcher // non-default: limited parallelism - writingJobDispatcher = ioDispatcher // non-default: limited parallelism - processingJobDispatcher = Dispatchers.clientDispatcher(2, "Worker") // non-default - readChannelBufferSize = Channel.BUFFERED // non-default: bounded - writeChannelBufferSize = Channel.BUFFERED // non-default: bounded - coroutineScope = CoroutineScope(Dispatchers.Default) // non-default: parent scope provided - } + val server = StdioServerTransport( + source = bufferedInput, + sink = printOutput, + readBufferSize = 16L, // non-default: smaller read chunk + readingJobDispatcher = Dispatchers.IO.limitedParallelism(4, "Read"), // non-default: limited parallelism + writingJobDispatcher = Dispatchers.IO.limitedParallelism(4, "Write"), // non-default: limited parallelism + processingJobDispatcher = Dispatchers.IO.limitedParallelism(2, name = "Worker"), // non-default + readChannel = Channel(capacity = 8, onBufferOverflow = BufferOverflow.SUSPEND), // non-default: bounded + writeChannel = Channel(capacity = 16, onBufferOverflow = BufferOverflow.SUSPEND), // non-default: bounded + coroutineScope = CoroutineScope(Dispatchers.Default), // non-default: parent scope provided + ) server.onError { throw it } server.onMessage { received.complete(it) }