Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ MCP Kotlin SDK — Kotlin Multiplatform implementation of the Model Context Prot
### Multiplatform Patterns
- Use `expect`/`actual` pattern for platform-specific implementations in `utils.*` files.
- Test changes on JVM first, then verify platform-specific behavior if needed.
- Use the Kotlin 2.2 API and language level.
- Supported targets: JVM (1.8+), JS/Wasm, iOS, watchOS, tvOS.

### Serialization
Expand Down
11 changes: 7 additions & 4 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,23 @@ atomicfu = "0.29.0"
ktlint = "14.0.1"
kover = "0.9.3"
netty = "4.2.7.Final"

mavenPublish = "0.35.0"
binaryCompatibilityValidatorPlugin = "0.18.1"
openapi-generator = "7.17.0"

# libraries version
serialization = "1.9.0"
awaitility = "4.3.0"
collections-immutable = "0.4.0"
coroutines = "1.10.2"
kotest = "6.0.4"
kotlinx-io = "0.8.1"
ktor = "3.2.3"
logging = "7.0.13"
slf4j = "2.0.17"
kotest = "6.0.4"
awaitility = "4.3.0"
mockk = "1.14.6"
mokksy = "0.6.2"
serialization = "1.9.0"
slf4j = "2.0.17"

[libraries]
# Plugins
Expand Down Expand Up @@ -53,6 +55,7 @@ kotest-assertions-json = { group = "io.kotest", name = "kotest-assertions-json",
kotlinx-coroutines-test = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-test", version.ref = "coroutines" }
ktor-client-mock = { group = "io.ktor", name = "ktor-client-mock", version.ref = "ktor" }
ktor-server-test-host = { group = "io.ktor", name = "ktor-server-test-host", version.ref = "ktor" }
mockk = { module = "io.mockk:mockk", version.ref = "mockk" }
mokksy = { group = "dev.mokksy", name = "mokksy", version.ref = "mokksy" }
netty-bom = { group = "io.netty", name = "netty-bom", version.ref = "netty" }
slf4j-simple = { group = "org.slf4j", name = "slf4j-simple", version.ref = "slf4j" }
Expand Down
3 changes: 3 additions & 0 deletions kotlin-sdk-client/api/kotlin-sdk-client.api
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public final class io/modelcontextprotocol/kotlin/sdk/client/SseClientTransport

public final class io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport {
public fun <init> (Lkotlinx/io/Source;Lkotlinx/io/Sink;)V
public fun <init> (Lkotlinx/io/Source;Lkotlinx/io/Sink;Lkotlinx/io/Source;)V
public fun <init> (Lkotlinx/io/Source;Lkotlinx/io/Sink;Lkotlinx/io/Source;Lkotlin/jvm/functions/Function1;)V
public synthetic fun <init> (Lkotlinx/io/Source;Lkotlinx/io/Sink;Lkotlinx/io/Source;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun send (Lio/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage;Lio/modelcontextprotocol/kotlin/sdk/shared/TransportSendOptions;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
2 changes: 2 additions & 0 deletions kotlin-sdk-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ kotlin {
implementation(libs.ktor.server.websockets)
implementation(libs.kotlinx.coroutines.test)
implementation(libs.ktor.client.logging)
implementation(libs.kotest.assertions.core)
}
}

Expand All @@ -53,6 +54,7 @@ kotlin {
implementation(libs.mokksy)
implementation(libs.awaitility)
implementation(libs.ktor.client.apache5)
implementation(libs.mockk)
implementation(dependencies.platform(libs.netty.bom))
runtimeOnly(libs.slf4j.simple)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@ import io.modelcontextprotocol.kotlin.sdk.shared.ReadBuffer
import io.modelcontextprotocol.kotlin.sdk.shared.TransportSendOptions
import io.modelcontextprotocol.kotlin.sdk.shared.serializeMessage
import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage
import io.modelcontextprotocol.kotlin.sdk.types.McpException
import io.modelcontextprotocol.kotlin.sdk.types.RPCError
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import kotlinx.io.Buffer
import kotlinx.io.Sink
import kotlinx.io.Source
Expand All @@ -24,80 +28,100 @@ import kotlinx.io.readByteArray
import kotlinx.io.writeString
import kotlin.concurrent.atomics.AtomicBoolean
import kotlin.concurrent.atomics.ExperimentalAtomicApi
import kotlin.coroutines.CoroutineContext
import kotlin.jvm.JvmOverloads

/**
* A transport implementation for JSON-RPC communication that leverages standard input and output streams.
*
* This class reads from an input stream to process incoming JSON-RPC messages and writes JSON-RPC messages
* to an output stream.
*
* Uses structured concurrency principles:
* - Parent job controls all child coroutines
* - Proper cancellation propagation
* - Resource cleanup guaranteed via structured concurrency
*
* @param input The input stream where messages are received.
* @param output The output stream where messages are sent.
* @param error Optional error stream for stderr processing.
 * @param processStdError Callback invoked for each stderr line; return `true` to treat the line as a fatal error.
*/
@OptIn(ExperimentalAtomicApi::class)
public class StdioClientTransport(private val input: Source, private val output: Sink) : AbstractTransport() {
public class StdioClientTransport @JvmOverloads public constructor(
private val input: Source,
private val output: Sink,
private val error: Source? = null,
private val processStdError: (String) -> Boolean = { true },
) : AbstractTransport() {
private val logger = KotlinLogging.logger {}
private val ioCoroutineContext: CoroutineContext = IODispatcher
private val scope by lazy {
CoroutineScope(ioCoroutineContext + SupervisorJob())
}
private var job: Job? = null

// Structured concurrency: single parent job manages all I/O operations
private val parentJob: CompletableJob = SupervisorJob()
private val scope = CoroutineScope(IODispatcher + parentJob)

// State management through job lifecycle, not atomic flags
private val initialized: AtomicBoolean = AtomicBoolean(false)
private val sendChannel = Channel<JSONRPCMessage>(Channel.UNLIMITED)
private val readBuffer = ReadBuffer()

/**
 * Starts the transport: launches the stdin reader, the optional stderr reader,
 * and the stdout writer under a single supervisorScope so one stream's failure
 * does not cancel the others. May be called at most once; a second call fails
 * with IllegalStateException.
 *
 * NOTE(review): this span in SOURCE interleaved the old (readJob/writeJob) and
 * new (supervisorScope) implementations from a diff; this is the coherent
 * new-version method with the duplicated old lines removed.
 */
@Suppress("TooGenericExceptionCaught")
override suspend fun start() {
    if (!initialized.compareAndSet(expectedValue = false, newValue = true)) {
        error("StdioClientTransport already started!")
    }

    logger.debug { "Starting StdioClientTransport..." }

    // Launch all I/O operations in the scope - structured concurrency ensures cleanup
    scope.launch(CoroutineName("StdioClientTransport.IO#${hashCode()}")) {
        try {
            val outputStream = output.buffered()
            val errorStream = error?.buffered()

            // Use supervisorScope so individual stream failures don't cancel siblings
            supervisorScope {
                // Launch stdin reader
                val stdinJob = launch(CoroutineName("stdin-reader")) {
                    readStream(input, ::processReadBuffer)
                }

                // Launch stderr reader if present
                val stderrJob = errorStream?.let {
                    launch(CoroutineName("stderr-reader")) {
                        readStream(it, ::processStderrBuffer)
                    }
                }

                // Launch writer
                val writerJob = launch(CoroutineName("stdout-writer")) {
                    writeMessages(outputStream)
                }

                // Wait for both stdin and stderr to complete (reach EOF or get cancelled)
                // When a process exits, both streams will be closed by the OS
                logger.debug { "Waiting for stdin to complete..." }
                stdinJob.join()
                logger.debug { "stdin completed, waiting for stderr..." }
                stderrJob?.join()
                logger.debug { "stderr completed, cancelling writer..." }

                // Cancel writer (it may be blocked waiting for channel messages)
                writerJob.cancelAndJoin()
                logger.debug { "writer cancelled, supervisorScope complete" }
            }
        } catch (e: CancellationException) {
            // Never swallow cancellation - rethrow after logging.
            logger.debug { "Transport cancelled: ${e.message}" }
            throw e
        } catch (e: Exception) {
            logger.error(e) { "Transport error" }
            _onError.invoke(e)
        } finally {
            // Cleanup: close all streams and notify listeners exactly once.
            runCatching { input.close() }
            runCatching { output.close() }
            runCatching { error?.close() }
            runCatching { sendChannel.close() }
            _onClose.invoke()
        }
    }
}

Expand All @@ -113,23 +137,115 @@ public class StdioClientTransport(private val input: Source, private val output:
if (!initialized.compareAndSet(expectedValue = true, newValue = false)) {
error("Transport is already closed")
}
job?.cancelAndJoin()
input.close()
output.close()
readBuffer.clear()
sendChannel.close()
_onClose.invoke()

logger.debug { "Closing StdioClientTransport..." }

// Cancel scope - structured concurrency handles cleanup via finally blocks
parentJob.cancelAndJoin()
}

/**
 * Pumps bytes from [source] into a [ReadBuffer], invoking [block] after each
 * non-empty chunk so the caller can drain any complete messages.
 * Stops at EOF or once the surrounding scope is no longer active; the source
 * is always closed on exit (via [use]). CancellationException propagates.
 */
private suspend fun CoroutineScope.readStream(source: Source, block: suspend (ReadBuffer) -> Unit) {
    logger.debug { "Stream reader started" }

    source.use { stream ->
        val accumulated = ReadBuffer()
        while (isActive) {
            val chunk = Buffer()
            when (val count = stream.readAtMostTo(chunk, 8192)) {
                -1L -> {
                    logger.debug { "EOF reached" }
                    return@use
                }
                0L -> Unit // nothing available this round
                else -> {
                    // count > 0: hand the bytes to the buffer and let the
                    // caller process any complete messages.
                    accumulated.append(chunk.readByteArray())
                    block(accumulated)
                }
            }
        }
    }
}

/**
 * Processes JSON-RPC messages from the read buffer.
 * Each complete message is delivered to the onMessage callback; a failing
 * callback is reported via the error handler but does not stop the loop.
 *
 * NOTE(review): the SOURCE span contained duplicated old/new diff lines (two
 * `val msg` declarations and the old no-arg signature), which cannot compile;
 * this is the deduplicated new-signature version.
 */
private suspend fun processReadBuffer(buffer: ReadBuffer) {
    while (true) {
        val msg = buffer.readMessage() ?: break

        @Suppress("TooGenericExceptionCaught")
        try {
            _onMessage.invoke(msg)
        } catch (e: Throwable) {
            _onError.invoke(e)
            logger.error(e) { "Error processing message" }
        }
    }
}

/**
 * Handles a chunk of stderr output.
 * The first complete line (if any) is passed to [processStdError]; a `true`
 * result marks it fatal: the error handler is notified with a
 * CONNECTION_CLOSED [McpException] and stdin/stdout are closed so the readers
 * hit EOF and the transport winds down along its normal path. Non-fatal lines
 * are only logged as warnings.
 */
private suspend fun processStderrBuffer(buffer: ReadBuffer) {
    val line = buffer.readLine()
    buffer.clear()

    if (line == null) return

    if (!processStdError(line)) {
        logger.warn { "Non-fatal stderr warning: $line" }
        return
    }

    logger.error { "Fatal stderr error: $line" }

    // Surface the failure to the registered error handler.
    _onError.invoke(
        McpException(
            RPCError.ErrorCode.CONNECTION_CLOSED,
            "Fatal error in stderr: $line",
        ),
    )

    // Close streams to trigger EOF - this will cause natural shutdown
    // The stdin reader will complete, then we'll shut down gracefully
    runCatching { input.close() }
    runCatching { output.close() }
}

/**
 * Drains [sendChannel], serializing each JSON-RPC message and flushing it to
 * [outputStream]. Runs until the channel is closed or the coroutine is
 * cancelled. Write failures are reported via the error handler (unless the
 * coroutine is already being cancelled) and then rethrown.
 */
private suspend fun writeMessages(outputStream: Sink) {
    logger.debug { "Writer started" }

    try {
        for (outgoing in sendChannel) {
            // Bail out promptly if cancellation raced the channel receive.
            if (!currentCoroutineContext().isActive) break

            outputStream.writeString(serializeMessage(outgoing))
            outputStream.flush()
        }
    } catch (failure: Exception) {
        if (currentCoroutineContext().isActive) {
            _onError.invoke(failure)
            logger.error(failure) { "Error writing to output stream" }
        }
        throw failure
    }

    logger.debug { "Writer finished" }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ class StreamableHttpClientTransportTest {

@Test
fun testTerminateSession() = runTest {
// transport.sessionId = "test-session-id"

val transport = createTransport { request ->
assertEquals(HttpMethod.Delete, request.method)
assertEquals("test-session-id", request.headers["mcp-session-id"])
Expand All @@ -143,8 +141,6 @@ class StreamableHttpClientTransportTest {

@Test
fun testTerminateSessionHandle405() = runTest {
// transport.sessionId = "test-session-id"

val transport = createTransport { request ->
assertEquals(HttpMethod.Delete, request.method)
respond(
Expand Down
Loading
Loading