From bc217b6c74414158558f6a77709affbc8519dccb Mon Sep 17 00:00:00 2001 From: Konstantin Pavlov <1517853+kpavlov@users.noreply.github.com> Date: Mon, 1 Dec 2025 21:52:26 +0200 Subject: [PATCH 1/5] Refactor `StdioClientTransport`: add structured concurrency, error handling, and support for optional error stream. Update dependencies in `libs.versions.toml`. --- gradle/libs.versions.toml | 10 +- kotlin-sdk-client/api/kotlin-sdk-client.api | 12 + kotlin-sdk-client/build.gradle.kts | 8 +- .../kotlin/sdk/client/StdioClientTransport.kt | 285 ++++++++++++++---- .../kotlin/sdk/shared/AbstractTransport.kt | 2 +- .../kotlin/sdk/shared/BaseTransportTest.kt | 4 + .../sdk/client/StdioClientTransportTest.kt | 53 +++- .../KotlinClientTsServerEdgeCasesTestStdio.kt | 5 +- 8 files changed, 301 insertions(+), 78 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 12d8b370..0b31051c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -11,16 +11,17 @@ binaryCompatibilityValidatorPlugin = "0.18.1" openapi-generator = "7.17.0" # libraries version -serialization = "1.9.0" +awaitility = "4.3.0" collections-immutable = "0.4.0" coroutines = "1.10.2" +kotest = "6.0.7" kotlinx-io = "0.8.2" ktor = "3.2.3" logging = "7.0.13" -slf4j = "2.0.17" -kotest = "6.0.7" -awaitility = "4.3.0" +mockk = "1.14.6" mokksy = "0.6.2" +serialization = "1.9.0" +slf4j = "2.0.17" [libraries] # Plugins @@ -53,6 +54,7 @@ kotest-assertions-json = { group = "io.kotest", name = "kotest-assertions-json", kotlinx-coroutines-test = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-test", version.ref = "coroutines" } ktor-client-mock = { group = "io.ktor", name = "ktor-client-mock", version.ref = "ktor" } ktor-server-test-host = { group = "io.ktor", name = "ktor-server-test-host", version.ref = "ktor" } +mockk = { module = "io.mockk:mockk", version.ref = "mockk" } mokksy = { group = "dev.mokksy", name = "mokksy", version.ref = "mokksy" } netty-bom = { group = "io.netty", name = "netty-bom", version.ref = "netty" } slf4j-simple = { group = "org.slf4j", name = "slf4j-simple", version.ref = "slf4j" } diff --git a/kotlin-sdk-client/api/kotlin-sdk-client.api b/kotlin-sdk-client/api/kotlin-sdk-client.api index 9186512c..f1807d55 100644 --- a/kotlin-sdk-client/api/kotlin-sdk-client.api +++ b/kotlin-sdk-client/api/kotlin-sdk-client.api @@ -62,6 +62,13 @@ public final class io/modelcontextprotocol/kotlin/sdk/client/KtorClientKt { public static synthetic fun mcpSseTransport-5_5nbZA$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/modelcontextprotocol/kotlin/sdk/client/SseClientTransport; } +public final class io/modelcontextprotocol/kotlin/sdk/client/OldStdioClientTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport { + public fun (Lkotlinx/io/Source;Lkotlinx/io/Sink;)V + public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun send (Lio/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage;Lio/modelcontextprotocol/kotlin/sdk/shared/TransportSendOptions;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + public final class io/modelcontextprotocol/kotlin/sdk/client/SseClientTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport { public synthetic fun (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public synthetic fun (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/internal/DefaultConstructorMarker;)V @@ -71,7 +78,12 @@ public final class io/modelcontextprotocol/kotlin/sdk/client/SseClientTransport } public final class io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport { + public static final field BUFFER_SIZE J public fun (Lkotlinx/io/Source;Lkotlinx/io/Sink;)V + public fun (Lkotlinx/io/Source;Lkotlinx/io/Sink;Lkotlinx/io/Source;)V + public fun (Lkotlinx/io/Source;Lkotlinx/io/Sink;Lkotlinx/io/Source;Lkotlinx/coroutines/channels/Channel;)V + public fun (Lkotlinx/io/Source;Lkotlinx/io/Sink;Lkotlinx/io/Source;Lkotlinx/coroutines/channels/Channel;Lkotlin/jvm/functions/Function1;)V + public synthetic fun (Lkotlinx/io/Source;Lkotlinx/io/Sink;Lkotlinx/io/Source;Lkotlinx/coroutines/channels/Channel;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun send (Lio/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage;Lio/modelcontextprotocol/kotlin/sdk/shared/TransportSendOptions;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/kotlin-sdk-client/build.gradle.kts b/kotlin-sdk-client/build.gradle.kts index 473c32b5..4755ac7c 100644 --- a/kotlin-sdk-client/build.gradle.kts +++ b/kotlin-sdk-client/build.gradle.kts @@ -41,18 +41,20 @@ kotlin { commonTest { dependencies { implementation(kotlin("test")) - implementation(libs.ktor.client.mock) - implementation(libs.ktor.server.websockets) + implementation(libs.kotest.assertions.core) implementation(libs.kotlinx.coroutines.test) implementation(libs.ktor.client.logging) + implementation(libs.ktor.client.mock) + implementation(libs.ktor.server.websockets) } } jvmTest { dependencies { - implementation(libs.mokksy) implementation(libs.awaitility) implementation(libs.ktor.client.apache5) + implementation(libs.mockk) + implementation(libs.mokksy) implementation(dependencies.platform(libs.netty.bom)) runtimeOnly(libs.slf4j.simple) } diff --git a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt index 547079fb..8a54cf64 100644 --- a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt +++ b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt @@ -7,24 +7,36 @@ import io.modelcontextprotocol.kotlin.sdk.shared.ReadBuffer import io.modelcontextprotocol.kotlin.sdk.shared.TransportSendOptions import io.modelcontextprotocol.kotlin.sdk.shared.serializeMessage import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage +import io.modelcontextprotocol.kotlin.sdk.types.McpException +import io.modelcontextprotocol.kotlin.sdk.types.RPCError.ErrorCode.CONNECTION_CLOSED +import io.modelcontextprotocol.kotlin.sdk.types.RPCError.ErrorCode.INTERNAL_ERROR import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedSendChannelException +import kotlinx.coroutines.channels.ProducerScope import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.coroutines.yield import kotlinx.io.Buffer +import kotlinx.io.IOException import kotlinx.io.Sink import kotlinx.io.Source import kotlinx.io.buffered import kotlinx.io.readByteArray import kotlinx.io.writeString +import kotlinx.serialization.SerializationException import kotlin.concurrent.atomics.AtomicBoolean import kotlin.concurrent.atomics.ExperimentalAtomicApi import kotlin.coroutines.CoroutineContext +import kotlin.jvm.JvmOverloads /** * A transport implementation for JSON-RPC communication that leverages standard input and output streams. @@ -32,104 +44,249 @@ import kotlin.coroutines.CoroutineContext * This class reads from an input stream to process incoming JSON-RPC messages and writes JSON-RPC messages * to an output stream. * + * Uses structured concurrency principles: + * - Parent job controls all child coroutines + * - Proper cancellation propagation + * - Resource cleanup guaranteed via structured concurrency + * * @param input The input stream where messages are received. * @param output The output stream where messages are sent. + * @param error Optional error stream for stderr processing. + * @param processStdError Callback for stderr lines. Returns true for fatal errors. */ @OptIn(ExperimentalAtomicApi::class) -public class StdioClientTransport(private val input: Source, private val output: Sink) : AbstractTransport() { +public class StdioClientTransport @JvmOverloads public constructor( + private val input: Source, + private val output: Sink, + private val error: Source? = null, + private val sendChannel: Channel = Channel(Channel.BUFFERED), + private val processStdError: (String) -> Boolean = { true }, +) : AbstractTransport() { + + private companion object { + /** + * Buffer size for I/O operations. + * 8KB is optimal for most systems (matches default page size). + */ + const val BUFFER_SIZE = 8 * 1024L + } + private val logger = KotlinLogging.logger {} + private val ioCoroutineContext: CoroutineContext = IODispatcher - private val scope by lazy { - CoroutineScope(ioCoroutineContext + SupervisorJob()) - } - private var job: Job? = null - private val initialized: AtomicBoolean = AtomicBoolean(false) - private val sendChannel = Channel(Channel.UNLIMITED) - private val readBuffer = ReadBuffer() + private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + private val initialized = AtomicBoolean(false) + private val onCloseCalled = AtomicBoolean(false) override suspend fun start() { if (!initialized.compareAndSet(expectedValue = false, newValue = true)) { error("StdioClientTransport already started!") } - logger.debug { "Starting StdioClientTransport..." } - val outputStream = output.buffered() - - job = scope.launch(CoroutineName("StdioClientTransport.IO#${hashCode()}")) { - val readJob = launch { - logger.debug { "Read coroutine started." } - try { - input.use { - while (isActive) { - val buffer = Buffer() - val bytesRead = input.readAtMostTo(buffer, 8192) - if (bytesRead == -1L) break - if (bytesRead > 0L) { - readBuffer.append(buffer.readByteArray()) - processReadBuffer() - } + // Producers run on IODispatcher for I/O + // Collector runs on Default dispatcher for message handling + scope.launch(CoroutineName("StdioClientTransport.IO#${hashCode()}")) { + var writeJob: Job? = null + val mainScope = this + try { + // Explicitly use ioCoroutineContext for I/O operations + writeJob = launch(ioCoroutineContext) { + logger.debug { "Write coroutine started." } + output.buffered().use { sink -> + sendChannel.consumeEach { message -> + sendOutboundMessage(message, sink, mainScope) + yield() // Giving other coroutines a chance to run } } - } catch (e: Exception) { - _onError.invoke(e) - logger.error(e) { "Error reading from input stream" } } - } - val writeJob = launch { - logger.debug { "Write coroutine started." } - try { - sendChannel.consumeEach { message -> - val json = serializeMessage(message) - outputStream.writeString(json) - outputStream.flush() + val eventsFlow = channelFlow { + launch(ioCoroutineContext) { + logger.debug { "Read stdin coroutine started." } + val readBuffer = ReadBuffer() // parses bytes into JSONRPCMessage + readSource(stream = ProcessStream.Stdin, source = input, channel = this@channelFlow) { bytes -> + readBuffer.append(bytes) + do { + val msg = readBuffer.readMessage() + msg?.let { send(Event.JsonRpc(msg)) } + } while (msg != null) + } + }.invokeOnCompletion { + logger.debug(it) { "Read stdin coroutine finished." } } - } catch (e: Throwable) { - if (isActive) { - _onError.invoke(e) - logger.error(e) { "Error writing to output stream" } + + error?.let { source -> + launch(ioCoroutineContext) { + logger.debug { "Read stderr coroutine started." } + readSource( + stream = ProcessStream.Stderr, + source = source, + channel = this@channelFlow, + ) { bytes -> + val str = bytes.decodeToString() + send(Event.StderrEvent(str)) + } + } } - } finally { - output.close() } - } - readJob.join() - writeJob.cancelAndJoin() - _onClose.invoke() + // Collect events on handlerCoroutineContext (Dispatchers.Default from parent scope) + // No flowOn necessary - collection runs in parent launch context + eventsFlow + .collect { event -> + when (event) { + is Event.JsonRpc -> { + handleJSONRPCMessage(event.message) + } + + is Event.StderrEvent -> { + if (processStdError(event.message)) { + runCatching { + _onError(McpException(INTERNAL_ERROR, "Message in StdErr: ${event.message}")) + } + stopProcessing("STDERR message received") + } + } + + is Event.EOFEvent -> { + if (event.stream == ProcessStream.Stdin) { + stopProcessing("EOF in ${event.stream}") + } + } + + is Event.IOErrorEvent -> { + runCatching { _onError(event.cause) } + stopProcessing("IO Error", event.cause) + } + } + } + } finally { + // Wait for write job to complete before closing, matching old implementation + writeJob?.cancelAndJoin() + logger.debug { "Transport coroutine completed, calling onClose" } + callOnCloseOnce() + } } } override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) { - if (!initialized.load()) { - error("Transport not started") + check(initialized.load()) { "Transport is not started" } + check(!onCloseCalled.load()) { "Transport is closed" } + @Suppress("TooGenericExceptionCaught", "SwallowedException") + try { + sendChannel.send(message) + } catch (e: ClosedSendChannelException) { + logger.debug(e) { "Cannot send message: transport is closed" } + throw McpException(CONNECTION_CLOSED, "Transport is closed") + } catch (e: McpException) { + logger.debug(e) { "Error while sending message: ${e.message}" } + throw e + } catch (e: Exception) { + logger.error(e) { "Error while sending message: ${e.message}" } + throw McpException(INTERNAL_ERROR, "Error while sending message: ${e.message}") } - - sendChannel.send(message) } override suspend fun close() { if (!initialized.compareAndSet(expectedValue = true, newValue = false)) { - error("Transport is already closed") + return // Already closed } - job?.cancelAndJoin() - input.close() - output.close() - readBuffer.clear() - sendChannel.close() - _onClose.invoke() + scope.stopProcessing("Closed") + scope.coroutineContext[Job]?.join() // Wait for all coroutines to complete } - private suspend fun processReadBuffer() { - while (true) { - val msg = readBuffer.readMessage() ?: break - try { - _onMessage.invoke(msg) - } catch (e: Throwable) { - _onError.invoke(e) - logger.error(e) { "Error processing message." } + private fun callOnCloseOnce() { + if (onCloseCalled.compareAndSet(expectedValue = false, newValue = true)) { + runCatching { _onClose() } + } + } + + private fun sendOutboundMessage(message: JSONRPCMessage, sink: Sink, mainScope: CoroutineScope) { + try { + val json = serializeMessage(message) + sink.writeString(json) + sink.flush() + } catch (e: SerializationException) { + logger.warn(e) { "Can't serialize message" } + runCatching { _onError(McpException(INTERNAL_ERROR, "Serialization error")) } + mainScope.stopProcessing("Can't serialize message", e) + } catch (e: IOException) { + logger.warn(e) { "Can't send message" } + runCatching { _onError(McpException(CONNECTION_CLOSED, "Can't send message. Connection closed")) } + mainScope.stopProcessing("Write I/O failed", e) + } + } + + private suspend fun handleJSONRPCMessage(msg: JSONRPCMessage) { + @Suppress("TooGenericExceptionCaught") + try { + _onMessage.invoke(msg) + } catch (e: Throwable) { + logger.error(e) { "Error processing message." } + runCatching { _onError.invoke(e) } + } + } + + private fun CoroutineScope.stopProcessing(reason: String, cause: Throwable? = null) { + sendChannel.close() // Stop accepting new messages + callOnCloseOnce() + cancel(reason, cause) // cancel current coroutine context + } + + private suspend fun CoroutineScope.readSource( + stream: ProcessStream, + source: Source, + channel: ProducerScope, + bytesConsumer: suspend (ByteArray) -> Unit, + ) { + val buffer = Buffer() + try { + source.use { source -> + while (isActive) { + val bytesRead = source.readAtMostTo(buffer, BUFFER_SIZE) + if (bytesRead == -1L) { + logger.debug { "EOF reached in $stream" } + channel.send(Event.EOFEvent(stream)) + break + } + + if (bytesRead > 0L) { + val bytes = buffer.readByteArray() + buffer.clear() + bytesConsumer.invoke(bytes) + } + + yield() // Giving other coroutines a chance to run + } } + } catch (exception: IOException) { + logger.debug(exception) { "IOException while reading stream" } + channel.send(Event.IOErrorEvent(stream, exception)) + } finally { + buffer.clear() + } + } + + private enum class ProcessStream { Stdin, Stderr, Stdout } + + /** + * Represents an event in the communication process. + * + * Events are a sealed hierarchy of different types of communication signals + * between processes. These events are used to manage and interpret information + * or errors generated during the operation of the associated transport. + */ + private sealed interface Event { + + data class JsonRpc(val message: JSONRPCMessage) : Event + + data class StderrEvent(val message: String) : Event + + data class EOFEvent(val stream: ProcessStream) : Event + + data class IOErrorEvent(val stream: ProcessStream, val cause: Throwable) : Event { + override fun toString(): String = "IOErrorEvent(stream=$stream, cause=${cause.message})" } } } diff --git a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport.kt b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport.kt index 55e09da8..cbab7aeb 100644 --- a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport.kt +++ b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport.kt @@ -1,6 +1,6 @@ package io.modelcontextprotocol.kotlin.sdk.shared -import io.modelcontextprotocol.kotlin.sdk.JSONRPCMessage +import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage import kotlinx.coroutines.CompletableDeferred /** diff --git a/kotlin-sdk-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/BaseTransportTest.kt b/kotlin-sdk-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/BaseTransportTest.kt index 1929facf..5de5fd28 100644 --- a/kotlin-sdk-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/BaseTransportTest.kt +++ b/kotlin-sdk-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/BaseTransportTest.kt @@ -5,10 +5,12 @@ import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage import io.modelcontextprotocol.kotlin.sdk.types.PingRequest import io.modelcontextprotocol.kotlin.sdk.types.toJSON import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.delay import kotlin.test.assertEquals import kotlin.test.assertFalse import kotlin.test.assertTrue import kotlin.test.fail +import kotlin.time.Duration.Companion.seconds abstract class BaseTransportTest { @@ -21,6 +23,8 @@ abstract class BaseTransportTest { transport.onClose { didClose = true } transport.start() + delay(1.seconds) + assertFalse(didClose, "Transport should not be closed immediately after start") transport.close() diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransportTest.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransportTest.kt index f0c7aed2..1bb66c7f 100644 --- a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransportTest.kt +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransportTest.kt @@ -1,13 +1,60 @@ package io.modelcontextprotocol.kotlin.sdk.client import io.modelcontextprotocol.kotlin.sdk.shared.BaseTransportTest +import io.modelcontextprotocol.kotlin.sdk.types.Implementation +import io.modelcontextprotocol.kotlin.sdk.types.McpException +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import kotlinx.io.asSink import kotlinx.io.asSource import kotlinx.io.buffered import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Timeout +import org.junit.jupiter.api.assertThrows +import java.util.concurrent.TimeUnit +@Timeout(20, unit = TimeUnit.SECONDS) class StdioClientTransportTest : BaseTransportTest() { + + @Test + @Timeout(30, unit = TimeUnit.SECONDS) + fun `handle stdio error`(): Unit = runBlocking { + val processBuilder = if (System.getProperty("os.name").lowercase().contains("win")) { + ProcessBuilder("cmd", "/c", "pause 1 && echo simulated error 1>&2 && exit 1") + } else { + ProcessBuilder("sh", "-c", "sleep 1 && echo 'simulated error' >&2 && exit 1") + } + + val process = processBuilder.start() + + val stdin = process.inputStream.asSource().buffered() + val stdout = process.outputStream.asSink().buffered() + val stderr = process.errorStream.asSource().buffered() + + val transport = StdioClientTransport( + input = stdin, + output = stdout, + error = stderr, + ) { + println("💥Ah-oh!, error: \"$it\"") + true + } + + val client = Client( + clientInfo = Implementation( + name = "test-client", + version = "1.0", + ), + ) + + // The error in stderr should cause connecting to fail + assertThrows { + client.connect(transport) + } + + process.destroyForcibly() + } + @Test fun `should start then close cleanly`() = runTest { // Run process "/usr/bin/tee" @@ -24,7 +71,7 @@ class StdioClientTransportTest : BaseTransportTest() { testTransportOpenClose(transport) - process.destroy() + process.destroyForcibly() } @Test @@ -43,7 +90,7 @@ class StdioClientTransportTest : BaseTransportTest() { testTransportRead(transport) process.waitFor() - process.destroy() + process.destroyForcibly() } @Test @@ -63,6 +110,6 @@ class StdioClientTransportTest : BaseTransportTest() { testTransportRead(transport) process.waitFor() - process.destroy() + process.destroyForcibly() } } diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/typescript/stdio/KotlinClientTsServerEdgeCasesTestStdio.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/typescript/stdio/KotlinClientTsServerEdgeCasesTestStdio.kt index fcab03c3..2bdd3618 100644 --- a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/typescript/stdio/KotlinClientTsServerEdgeCasesTestStdio.kt +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/typescript/stdio/KotlinClientTsServerEdgeCasesTestStdio.kt @@ -89,7 +89,7 @@ class KotlinClientTsServerEdgeCasesTestStdio : TsTestBase() { } @Test - @Timeout(60, unit = TimeUnit.SECONDS) + @Timeout(30, unit = TimeUnit.SECONDS) fun testConcurrentRequestsOverStdio(): Unit = runBlocking(Dispatchers.IO) { withClientStdio { client: Client, _ -> val concurrentCount = 5 @@ -102,8 +102,7 @@ class KotlinClientTsServerEdgeCasesTestStdio : TsTestBase() { val result = client.callTool("greet", arguments) assertNotNull(result, "Tool call result should not be null for client $i") - val callResult = result - val textContent = callResult.content.firstOrNull { it is TextContent } as? TextContent + val textContent = result.content.firstOrNull { it is TextContent } as? TextContent assertNotNull(textContent, "Text content should be present for client $i") textContent.text From 93c4e68995a130019876546b74b190e2bf6ef5f9 Mon Sep 17 00:00:00 2001 From: Konstantin Pavlov <1517853+kpavlov@users.noreply.github.com> Date: Wed, 3 Dec 2025 13:39:22 +0200 Subject: [PATCH 2/5] Improve stderr handling with severity levels in StdioClientTransport Replaced a boolean ` processStdError ` callback with `classifyStderr` that returns a `StderrSeverity` enum (FATAL, WARNING, INFO, DEBUG, IGNORE). This allows fine-grained control over stderr message handling: FATAL terminates the transport, while other levels log at appropriate levels or discard messages. Updated KDoc with comprehensive documentation and usage examples. --- kotlin-sdk-client/api/kotlin-sdk-client.api | 18 ++-- .../kotlin/sdk/client/StdioClientTransport.kt | 85 ++++++++++++++++--- 2 files changed, 83 insertions(+), 20 deletions(-) diff --git a/kotlin-sdk-client/api/kotlin-sdk-client.api b/kotlin-sdk-client/api/kotlin-sdk-client.api index f1807d55..06094829 100644 --- a/kotlin-sdk-client/api/kotlin-sdk-client.api +++ b/kotlin-sdk-client/api/kotlin-sdk-client.api @@ -62,13 +62,6 @@ public final class io/modelcontextprotocol/kotlin/sdk/client/KtorClientKt { public static synthetic fun mcpSseTransport-5_5nbZA$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/modelcontextprotocol/kotlin/sdk/client/SseClientTransport; } -public final class io/modelcontextprotocol/kotlin/sdk/client/OldStdioClientTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport { - public fun (Lkotlinx/io/Source;Lkotlinx/io/Sink;)V - public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public fun send (Lio/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage;Lio/modelcontextprotocol/kotlin/sdk/shared/TransportSendOptions;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; -} - public final class io/modelcontextprotocol/kotlin/sdk/client/SseClientTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport { public synthetic fun (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public synthetic fun (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/internal/DefaultConstructorMarker;)V @@ -89,6 +82,17 @@ public final class io/modelcontextprotocol/kotlin/sdk/client/StdioClientTranspor public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } +public final class io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport$StderrSeverity : java/lang/Enum { + public static final field DEBUG Lio/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport$StderrSeverity; + public static final field FATAL Lio/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport$StderrSeverity; + public static final field IGNORE Lio/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport$StderrSeverity; + public static final field INFO Lio/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport$StderrSeverity; + public static final field WARNING Lio/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport$StderrSeverity; + public static fun getEntries ()Lkotlin/enums/EnumEntries; + public static fun valueOf (Ljava/lang/String;)Lio/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport$StderrSeverity; + public static fun values ()[Lio/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport$StderrSeverity; +} + public final class io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport { public synthetic fun (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public synthetic fun (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/internal/DefaultConstructorMarker;)V diff --git a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt index 8a54cf64..76debbae 100644 --- a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt +++ b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt @@ -1,6 +1,11 @@ package io.modelcontextprotocol.kotlin.sdk.client import io.github.oshai.kotlinlogging.KotlinLogging +import io.modelcontextprotocol.kotlin.sdk.client.StdioClientTransport.StderrSeverity.DEBUG +import io.modelcontextprotocol.kotlin.sdk.client.StdioClientTransport.StderrSeverity.FATAL +import io.modelcontextprotocol.kotlin.sdk.client.StdioClientTransport.StderrSeverity.IGNORE +import io.modelcontextprotocol.kotlin.sdk.client.StdioClientTransport.StderrSeverity.INFO +import io.modelcontextprotocol.kotlin.sdk.client.StdioClientTransport.StderrSeverity.WARNING import io.modelcontextprotocol.kotlin.sdk.internal.IODispatcher import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport import io.modelcontextprotocol.kotlin.sdk.shared.ReadBuffer @@ -39,20 +44,42 @@ import kotlin.coroutines.CoroutineContext import kotlin.jvm.JvmOverloads /** - * A transport implementation for JSON-RPC communication that leverages standard input and output streams. + * A transport implementation for JSON-RPC communication over standard I/O streams. * - * This class reads from an input stream to process incoming JSON-RPC messages and writes JSON-RPC messages - * to an output stream. + * Reads JSON-RPC messages from [input] and writes messages to [output]. Optionally monitors + * [error] stream for stderr output with configurable severity handling. * - * Uses structured concurrency principles: + * ## Structured Concurrency * - Parent job controls all child coroutines * - Proper cancellation propagation - * - Resource cleanup guaranteed via structured concurrency + * - Resource cleanup guaranteed + * + * ## Usage Example + * ```kotlin + * val process = ProcessBuilder("mcp-server").start() + * + * val transport = StdioClientTransport( + * input = process.inputStream.asSource().buffered(), + * output = process.outputStream.asSink().buffered(), + * error = process.errorStream.asSource().buffered() + * ) { stderrLine -> + * when { + * stderrLine.contains("error", ignoreCase = true) -> StderrSeverity.FATAL + * stderrLine.contains("warning", ignoreCase = true) -> StderrSeverity.WARNING + * else -> StderrSeverity.INFO + * } + * } + * + * transport.start() + * ``` * * @param input The input stream where messages are received. * @param output The output stream where messages are sent. - * @param error Optional error stream for stderr processing. - * @param processStdError Callback for stderr lines. Returns true for fatal errors. + * @param error Optional error stream for stderr monitoring. + * @param sendChannel Channel for outbound messages. Default: buffered channel (capacity 64). + * @param classifyStderr Callback to classify stderr lines. Return [StderrSeverity.FATAL] to fail transport, + * or [StderrSeverity.WARNING]/[INFO]/[DEBUG] to log, or [IGNORE] to discard. + * Default: treats all stderr as [FATAL]. */ @OptIn(ExperimentalAtomicApi::class) public class StdioClientTransport @JvmOverloads public constructor( @@ -60,7 +87,7 @@ public class StdioClientTransport @JvmOverloads public constructor( private val output: Sink, private val error: Source? = null, private val sendChannel: Channel = Channel(Channel.BUFFERED), - private val processStdError: (String) -> Boolean = { true }, + private val classifyStderr: (String) -> StderrSeverity = { FATAL }, ) : AbstractTransport() { private companion object { @@ -69,9 +96,20 @@ public class StdioClientTransport @JvmOverloads public constructor( * 8KB is optimal for most systems (matches default page size). */ const val BUFFER_SIZE = 8 * 1024L + + private val logger = KotlinLogging.logger {} } - private val logger = KotlinLogging.logger {} + /** + * Severity classification for stderr messages. + * + * - [FATAL]: Calls error handler and terminates transport. + * - [WARNING]: Logs at WARN level, transport continues. + * - [INFO]: Logs at INFO level, transport continues. + * - [DEBUG]: Logs at DEBUG level, transport continues. + * - [IGNORE]: Discards message silently, transport continues. + */ + public enum class StderrSeverity { FATAL, WARNING, INFO, DEBUG, IGNORE } private val ioCoroutineContext: CoroutineContext = IODispatcher private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) @@ -141,11 +179,32 @@ public class StdioClientTransport @JvmOverloads public constructor( } is Event.StderrEvent -> { - if (processStdError(event.message)) { - runCatching { - _onError(McpException(INTERNAL_ERROR, "Message in StdErr: ${event.message}")) + val errorSeverity = classifyStderr(event.message) + when (errorSeverity) { + FATAL -> { + runCatching { + _onError( + McpException(INTERNAL_ERROR, "Message in StdErr: ${event.message}"), + ) + } + stopProcessing("Fatal STDERR message received") + } + + WARNING -> { + logger.warn { "STDERR message received: ${event.message}" } + } + + INFO -> { + logger.info { "STDERR message received: ${event.message}" } + } + + DEBUG -> { + logger.debug { "STDERR message received: ${event.message}" } + } + + IGNORE -> { + // do nothing } - stopProcessing("STDERR message received") } } From 1f6288d0866718254600a0787f3f2d061f8505f5 Mon Sep 17 00:00:00 2001 From: Konstantin Pavlov <1517853+kpavlov@users.noreply.github.com> Date: Wed, 3 Dec 2025 13:39:22 +0200 Subject: [PATCH 3/5] Add/fix StdioClientTransport tests Enhance `StdioClientTransportTest` with clean shutdown validation, error handling, and added assertions. Extend timeout for stability. --- .../client/StreamableHttpClientTransport.kt | 1 + .../AbstractClientTransportLifecycleTest.kt | 86 ++++++++++++++++ .../StdioClientTransportLifecycleTest.kt | 26 +++++ .../StreamableHttpClientTransportTest.kt | 4 +- ...reamingHttpClientTransportLifecycleTest.kt | 43 ++++++++ .../StdioClientTransportErrorHandlingTest.kt | 98 +++++++++++++++++++ .../sdk/client/StdioClientTransportTest.kt | 36 ++++++- 7 files changed, 288 insertions(+), 6 deletions(-) create mode 100644 kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/AbstractClientTransportLifecycleTest.kt create mode 100644 kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/stdio/StdioClientTransportLifecycleTest.kt rename kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/{ => streamable/http}/StreamableHttpClientTransportTest.kt (98%) create mode 100644 kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamingHttpClientTransportLifecycleTest.kt create mode 100644 kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/stdio/StdioClientTransportErrorHandlingTest.kt diff --git a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt index e635f87d..a1712171 100644 --- a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt +++ b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt @@ -105,6 +105,7 @@ public class StreamableHttpClientTransport( resumptionToken: String?, onResumptionToken: ((String) -> Unit)? = null, ) { + check(initialized.load()) { "Transport is not started" } logger.debug { "Client sending message via POST to $url: ${McpJson.encodeToString(message)}" } // If we have a resumption token, reconnect the SSE stream with it diff --git a/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/AbstractClientTransportLifecycleTest.kt b/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/AbstractClientTransportLifecycleTest.kt new file mode 100644 index 00000000..8afc81fb --- /dev/null +++ b/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/AbstractClientTransportLifecycleTest.kt @@ -0,0 +1,86 @@ +package io.modelcontextprotocol.kotlin.sdk.client + +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.matchers.shouldBe +import io.kotest.matchers.string.shouldContain +import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport +import io.modelcontextprotocol.kotlin.sdk.types.PingRequest +import io.modelcontextprotocol.kotlin.sdk.types.toJSON +import kotlinx.coroutines.delay +import kotlinx.coroutines.test.runTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.time.Duration.Companion.milliseconds + +abstract class AbstractClientTransportLifecycleTest { + + protected lateinit var transport: T + + @BeforeTest + fun beforeEach() { + transport = createTransport() + } + + @Test + fun `should throw when started twice`() = runTest { + transport.start() + + val exception = shouldThrow { + transport.start() + } + exception.message shouldContain "already started" + } + + @Test + fun `should be idempotent when closed twice`() = runTest { + val transport = createTransport() + + transport.start() + transport.close() + + // Second close should not throw + transport.close() + } + + @Test + fun `should throw when sending before start`() = runTest { + val transport = createTransport() + + val exception = shouldThrow { + transport.send(PingRequest().toJSON()) + } + exception.message shouldContain "not started" + } + + @Test + fun `should throw when sending after close`() = runTest { + val transport = createTransport() + + transport.start() + delay(50.milliseconds) + transport.close() + + shouldThrow { + transport.send(PingRequest().toJSON()) + } + } + + @Test + fun `should call onClose exactly once`() = runTest { + val transport = createTransport() + + var closeCallCount = 0 + transport.onClose { closeCallCount++ } + + transport.start() + delay(50.milliseconds) + + // Multiple close attempts + transport.close() + transport.close() + + closeCallCount shouldBe 1 + } + + protected abstract fun createTransport(): T +} diff --git a/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/stdio/StdioClientTransportLifecycleTest.kt b/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/stdio/StdioClientTransportLifecycleTest.kt new file mode 100644 index 00000000..63542424 --- /dev/null +++ b/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/stdio/StdioClientTransportLifecycleTest.kt @@ -0,0 +1,26 @@ +package io.modelcontextprotocol.kotlin.sdk.client.stdio + +import io.modelcontextprotocol.kotlin.sdk.client.AbstractClientTransportLifecycleTest +import io.modelcontextprotocol.kotlin.sdk.client.StdioClientTransport +import kotlinx.io.Buffer +import kotlin.test.Ignore +import kotlin.test.Test + +class StdioClientTransportLifecycleTest : AbstractClientTransportLifecycleTest() { + + /** + * Dummy method to make IDE treat this class as a test + */ + @Test + @Ignore + fun dummyTest() = Unit + + override fun createTransport(): StdioClientTransport { + val inputBuffer = Buffer() + val outputBuffer = Buffer() + return StdioClientTransport( + input = inputBuffer, + output = outputBuffer, + ) + } +} diff --git a/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransportTest.kt b/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt similarity index 98% rename from kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransportTest.kt rename to kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt index 9968ceb1..5cf96b67 100644 --- a/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransportTest.kt +++ b/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt @@ -1,4 +1,4 @@ -package io.modelcontextprotocol.kotlin.sdk.client +package io.modelcontextprotocol.kotlin.sdk.client.streamable.http import io.ktor.client.HttpClient import io.ktor.client.engine.mock.MockEngine @@ -12,6 +12,8 @@ import io.ktor.http.HttpStatusCode import io.ktor.http.content.TextContent import io.ktor.http.headersOf import io.ktor.utils.io.ByteReadChannel +import io.modelcontextprotocol.kotlin.sdk.client.Client +import io.modelcontextprotocol.kotlin.sdk.client.StreamableHttpClientTransport import io.modelcontextprotocol.kotlin.sdk.types.Implementation import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCNotification diff --git a/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamingHttpClientTransportLifecycleTest.kt b/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamingHttpClientTransportLifecycleTest.kt new file mode 100644 index 00000000..f8ecec5b --- /dev/null +++ b/kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamingHttpClientTransportLifecycleTest.kt @@ -0,0 +1,43 @@ +package io.modelcontextprotocol.kotlin.sdk.client.streamable.http + +import io.ktor.client.HttpClient +import io.ktor.client.engine.mock.MockEngine +import io.ktor.client.engine.mock.respond +import io.ktor.client.plugins.sse.SSE +import io.ktor.http.ContentType +import io.ktor.http.HttpHeaders +import io.ktor.http.HttpStatusCode +import io.ktor.http.headersOf +import io.modelcontextprotocol.kotlin.sdk.client.AbstractClientTransportLifecycleTest +import io.modelcontextprotocol.kotlin.sdk.client.StreamableHttpClientTransport +import kotlin.test.Ignore +import kotlin.test.Test +import kotlin.time.Duration.Companion.seconds + +class StreamingHttpClientTransportLifecycleTest : + AbstractClientTransportLifecycleTest() { + + /** + * Dummy method to make IDE treat this class as a test + */ + @Test + @Ignore + fun dummyTest() = Unit + + override fun createTransport(): StreamableHttpClientTransport { + val mockEngine = MockEngine { + respond( + "this is not valid json", + status = HttpStatusCode.OK, + headers = headersOf(HttpHeaders.ContentType, ContentType.Application.Json.toString()), + ) + } + val httpClient = HttpClient(mockEngine) { + install(SSE) { + reconnectionTime = 1.seconds + } + } + + return StreamableHttpClientTransport(httpClient, url = "http://localhost:8080/mcp") + } +} diff --git a/kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/stdio/StdioClientTransportErrorHandlingTest.kt b/kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/stdio/StdioClientTransportErrorHandlingTest.kt new file mode 100644 index 00000000..ba3bb2a4 --- /dev/null +++ b/kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/stdio/StdioClientTransportErrorHandlingTest.kt @@ -0,0 +1,98 @@ +package io.modelcontextprotocol.kotlin.sdk.client.stdio + +import io.kotest.matchers.booleans.shouldBeFalse +import io.kotest.matchers.shouldBe +import io.modelcontextprotocol.kotlin.sdk.client.StdioClientTransport +import kotlinx.coroutines.delay +import kotlinx.coroutines.test.runTest +import kotlinx.io.Buffer +import kotlinx.io.writeString +import kotlin.concurrent.atomics.AtomicBoolean +import kotlin.concurrent.atomics.ExperimentalAtomicApi +import kotlin.test.Test +import kotlin.time.Duration.Companion.milliseconds + +/** + * Tests for StdioClientTransport error handling: EOF, IO errors, and edge cases. + */ +class StdioClientTransportErrorHandlingTest { + + private lateinit var transport: StdioClientTransport + + @OptIn(ExperimentalAtomicApi::class) + @Test + fun `should continue on stderr EOF`() = runTest { + val stderrBuffer = Buffer() + // Empty stderr = immediate EOF + + val inputBuffer = Buffer() + inputBuffer.writeString("""data: {"jsonrpc":"2.0","method":"ping","id":1}\n\n""") + val outputBuffer = Buffer() + + transport = StdioClientTransport( + input = inputBuffer, + output = outputBuffer, + error = stderrBuffer, + ) + + val closeCalled = AtomicBoolean(false) + transport.onClose { closeCalled.store(true) } + + transport.start() + delay(200.milliseconds) + + // Stderr EOF should not close transport + closeCalled.load() shouldBe false + + transport.close() + closeCalled.load() shouldBe true + } + + @Test + fun `should call onClose exactly once on error scenarios`() = runTest { + val stderrBuffer = Buffer() + stderrBuffer.write("FATAL: critical error\n".encodeToByteArray()) + + val inputBuffer = Buffer() + val outputBuffer = Buffer() + + var closeCallCount = 0 + + transport = StdioClientTransport( + input = inputBuffer, + output = outputBuffer, + error = stderrBuffer, + classifyStderr = { StdioClientTransport.StderrSeverity.FATAL }, + ) + + transport.onClose { closeCallCount++ } + + transport.start() + delay(100.milliseconds) + + // Explicit close after error already closed it + transport.close() + + closeCallCount shouldBe 1 + } + + @Test + fun `should handle empty input gracefully`() = runTest { + val inputBuffer = Buffer() + val outputBuffer = Buffer() + + transport = StdioClientTransport( + input = inputBuffer, + output = outputBuffer, + ) + + var errorCalled = false + transport.onError { errorCalled = true } + + transport.start() + delay(100.milliseconds) + + // Empty input should close cleanly without error + errorCalled.shouldBeFalse() + } +} diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransportTest.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransportTest.kt index 1bb66c7f..fa2f032c 100644 --- a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransportTest.kt +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransportTest.kt @@ -3,6 +3,7 @@ package io.modelcontextprotocol.kotlin.sdk.client import io.modelcontextprotocol.kotlin.sdk.shared.BaseTransportTest import io.modelcontextprotocol.kotlin.sdk.types.Implementation import io.modelcontextprotocol.kotlin.sdk.types.McpException +import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import kotlinx.io.asSink @@ -12,12 +13,18 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.assertThrows import java.util.concurrent.TimeUnit - -@Timeout(20, unit = TimeUnit.SECONDS) +import kotlin.concurrent.atomics.AtomicBoolean +import kotlin.concurrent.atomics.ExperimentalAtomicApi +import kotlin.test.assertFalse +import kotlin.test.assertTrue +import kotlin.test.fail +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds + +@Timeout(30, unit = TimeUnit.SECONDS) class StdioClientTransportTest : BaseTransportTest() { @Test - @Timeout(30, unit = TimeUnit.SECONDS) fun `handle stdio error`(): Unit = runBlocking { val processBuilder = if (System.getProperty("os.name").lowercase().contains("win")) { ProcessBuilder("cmd", "/c", "pause 1 && echo simulated error 1>&2 && exit 1") @@ -37,7 +44,7 @@ class StdioClientTransportTest : BaseTransportTest() { error = stderr, ) { println("💥Ah-oh!, error: \"$it\"") - true + StdioClientTransport.StderrSeverity.FATAL } val client = Client( @@ -55,6 +62,7 @@ class StdioClientTransportTest : BaseTransportTest() { process.destroyForcibly() } + @OptIn(ExperimentalAtomicApi::class) @Test fun `should start then close cleanly`() = runTest { // Run process "/usr/bin/tee" @@ -63,15 +71,33 @@ class StdioClientTransportTest : BaseTransportTest() { val input = process.inputStream.asSource().buffered() val output = process.outputStream.asSink().buffered() + val error = process.errorStream.asSource().buffered() val transport = StdioClientTransport( input = input, output = output, + error = error, ) - testTransportOpenClose(transport) + transport.onError { error -> + fail("Unexpected error: $error") + } + + val didClose = AtomicBoolean(false) + transport.onClose { didClose.store(true) } + + transport.start() + delay(1.seconds) + assertFalse(didClose.load(), "Transport should not be closed immediately after start") + + // Destroy process BEFORE close() to unblock stdin reader process.destroyForcibly() + delay(100.milliseconds) // Give time for EOF to propagate + + transport.close() + + assertTrue(didClose.load(), "Transport should be closed after close() call") } @Test From 16da2273ecf0d26f4a69e5d38c670da81fe12cdd Mon Sep 17 00:00:00 2001 From: Konstantin Pavlov <1517853+kpavlov@users.noreply.github.com> Date: Wed, 3 Dec 2025 18:28:05 +0200 Subject: [PATCH 4/5] Update JUnit report step in CI workflow: enable comment updates, group test suites, and avoid stack trace truncation. --- .github/workflows/build.yml | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 719f9c7a..e05c06aa 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -12,6 +12,10 @@ concurrency: # Cancel only when the run is NOT on `main` branch cancel-in-progress: ${{ github.ref != 'refs/heads/main' }} +permissions: + checks: write + pull-requests: write # only required if `comment: true` was enabled + jobs: validate-pr: runs-on: macos-latest-xlarge @@ -70,12 +74,16 @@ jobs: uses: mikepenz/action-junit-report@v6 if: ${{ !cancelled() }} # always run even if the previous step fails with: - report_paths: '**/test-results/**/TEST-*.xml' + annotate_only: true + comment: true detailed_summary: true flaky_summary: true + group_suite: true include_empty_in_summary: false include_time_in_summary: true - annotate_only: true + report_paths: '**/test-results/**/TEST-*.xml' + truncate_stack_traces: false + updateComment: true - name: Disable Auto-Merge on Fail if: failure() && github.event_name == 'pull_request' From 075bef85fb6ce1ac1cd36d03571cd9175c2ce26a Mon Sep 17 00:00:00 2001 From: Konstantin Pavlov <1517853+kpavlov@users.noreply.github.com> Date: Wed, 3 Dec 2025 19:31:12 +0200 Subject: [PATCH 5/5] Update `StdioClientTransport`: and update default behavior for `classifyStderr` to DEBUG --- .../kotlin/sdk/client/StdioClientTransport.kt | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt index 76debbae..b6aa8275 100644 --- a/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt +++ b/kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt @@ -49,11 +49,6 @@ import kotlin.jvm.JvmOverloads * Reads JSON-RPC messages from [input] and writes messages to [output]. Optionally monitors * [error] stream for stderr output with configurable severity handling. * - * ## Structured Concurrency - * - Parent job controls all child coroutines - * - Proper cancellation propagation - * - Resource cleanup guaranteed - * * ## Usage Example * ```kotlin * val process = ProcessBuilder("mcp-server").start() @@ -78,8 +73,10 @@ import kotlin.jvm.JvmOverloads * @param error Optional error stream for stderr monitoring. * @param sendChannel Channel for outbound messages. Default: buffered channel (capacity 64). * @param classifyStderr Callback to classify stderr lines. Return [StderrSeverity.FATAL] to fail transport, - * or [StderrSeverity.WARNING]/[INFO]/[DEBUG] to log, or [IGNORE] to discard. - * Default: treats all stderr as [FATAL]. + * or [StderrSeverity.WARNING] / [StderrSeverity.INFO] / [StderrSeverity.DEBUG] + * to log, or [StderrSeverity.IGNORE] to discard. + * Default value: [StderrSeverity.DEBUG]. + * @see MCP Specification */ @OptIn(ExperimentalAtomicApi::class) public class StdioClientTransport @JvmOverloads public constructor( @@ -87,7 +84,7 @@ public class StdioClientTransport @JvmOverloads public constructor( private val output: Sink, private val error: Source? = null, private val sendChannel: Channel = Channel(Channel.BUFFERED), - private val classifyStderr: (String) -> StderrSeverity = { FATAL }, + private val classifyStderr: (String) -> StderrSeverity = { DEBUG }, ) : AbstractTransport() { private companion object {