From 98a73733362eee97f56bc96b323ec231cb7a048c Mon Sep 17 00:00:00 2001 From: Maria Tigina Date: Fri, 28 Nov 2025 13:15:05 +0100 Subject: [PATCH 01/12] Support notifications --- .../kotlin/sdk/shared/Protocol.kt | 2 +- .../sdk/server/{Features.kt => Feature.kt} | 0 .../sdk/server/FeatureNotificationService.kt | 127 ++++++++++++++++++ .../kotlin/sdk/server/FeatureRegistry.kt | 73 ++++++++-- .../kotlin/sdk/server/Server.kt | 80 ++++++++++- 5 files changed, 267 insertions(+), 15 deletions(-) rename kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/{Features.kt => Feature.kt} (100%) create mode 100644 kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt diff --git a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt index 7dd04bc0..0f892760 100644 --- a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt +++ b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt @@ -208,7 +208,7 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio * A handler to invoke for any request types that do not have their own handler installed. */ public var fallbackRequestHandler: ( - suspend (request: JSONRPCRequest, extra: RequestHandlerExtra) -> RequestResult? + suspend (request: JSONRPCRequest, extra: RequestHandlerExtra) -> RequestResult? )? = null diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Features.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Feature.kt similarity index 100% rename from kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Features.kt rename to kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Feature.kt diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt new file mode 100644 index 00000000..29d96831 --- /dev/null +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt @@ -0,0 +1,127 @@ +package io.modelcontextprotocol.kotlin.sdk.server + +import io.github.oshai.kotlinlogging.KotlinLogging +import io.modelcontextprotocol.kotlin.sdk.types.Notification +import io.modelcontextprotocol.kotlin.sdk.types.PromptListChangedNotification +import io.modelcontextprotocol.kotlin.sdk.types.ResourceListChangedNotification +import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotification +import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotificationParams +import io.modelcontextprotocol.kotlin.sdk.types.ToolListChangedNotification +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.launch + +internal class FeatureNotificationService { + private val notifications = MutableSharedFlow() + private val notificationScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + + private val notificationSessionFeatureJobs: MutableMap = mutableMapOf() + private val notificationSessionResourceJobs: MutableMap, Job> = mutableMapOf() + + private val logger = KotlinLogging.logger {} + + private val toolFeatureListener: FeatureListener by lazy { + object : FeatureListener { + override fun onListChanged() { + logger.debug { "Emitting tool list changed notification" } + emit(ToolListChangedNotification()) + } + + override fun onFeatureUpdated(featureKey: FeatureKey) { + logger.debug { "Skipping update for tool feature key: $featureKey" } + } + } + } + + private val promptFeatureListener: FeatureListener by lazy { + object : FeatureListener { + override fun onListChanged() { + logger.debug { "Emitting prompt list changed notification" } + emit(PromptListChangedNotification()) + } + + override fun onFeatureUpdated(featureKey: FeatureKey) { + logger.debug { "Skipping update for prompt feature key: $featureKey" } + } + } + } + + private val resourceFeatureListener: FeatureListener by lazy { + object : FeatureListener { + override fun onListChanged() { + logger.debug { "Emitting resource list changed notification" } + emit(ResourceListChangedNotification()) + } + + override fun onFeatureUpdated(featureKey: FeatureKey) { + logger.debug { "Emitting resource updated notification for feature key: $featureKey" } + emit(ResourceUpdatedNotification(ResourceUpdatedNotificationParams(uri = featureKey))) + } + } + } + + internal fun getToolFeatureListener(): FeatureListener = toolFeatureListener + internal fun getPromptFeatureListener(): FeatureListener = promptFeatureListener + internal fun geResourceFeatureListener(): FeatureListener = resourceFeatureListener + + internal fun subscribeToListChangedNotification(session: ServerSession) { + logger.debug { "Subscribing to list changed notifications for sessionId: ${session.sessionId}" } + notificationSessionFeatureJobs[session.sessionId] = notificationScope.launch { + notifications.collect { notification -> + when (notification) { + is PromptListChangedNotification -> session.notification(notification) + is ResourceListChangedNotification -> session.notification(notification) + is ToolListChangedNotification -> session.notification(notification) + else -> logger.debug { + "Notification not handled for sessionId ${session.sessionId}: $notification" + } + } + } + } + logger.debug { "Subscribed to list changed notifications for sessionId: ${session.sessionId}" } + } + + internal fun unsubscribeFromListChangedNotification(session: ServerSession) { + logger.debug { "Unsubscribing from list changed notifications for sessionId: ${session.sessionId}" } + notificationSessionFeatureJobs[session.sessionId]?.cancel() + notificationSessionFeatureJobs.remove(session.sessionId) + logger.debug { "Unsubscribed from list changed notifications for sessionId: ${session.sessionId}" } + } + + internal fun subscribeToResourceUpdateNotifications(session: ServerSession, resourceKey: FeatureKey) { + logger.debug { "Subscribing to resource update notifications for sessionId: ${session.sessionId}" } + notificationSessionResourceJobs[session.sessionId to resourceKey] = notificationScope.launch { + notifications.collect { notification -> + when (notification) { + is ResourceUpdatedNotification -> { + if (notification.params.uri == resourceKey) { + session.notification(notification) + } + } + + else -> logger.debug { + "Notification not handled for session for sessionId ${session.sessionId}: $notification" + } + } + } + } + logger.debug { "Subscribed to resource update notifications for sessionId: ${session.sessionId}" } + } + + internal fun unsubscribeFromResourceUpdateNotifications(session: ServerSession, resourceKey: FeatureKey) { + logger.debug { "Unsubscribing from resourcec update notifications for sessionId: ${session.sessionId}" } + notificationSessionResourceJobs[session.sessionId to resourceKey]?.cancel() + notificationSessionResourceJobs.remove(session.sessionId to resourceKey) + logger.debug { "Unsubscribed from resourcec update notifications for sessionId: ${session.sessionId}" } + } + + + private fun emit(notification: Notification) { + notificationScope.launch { + notifications.emit(notification) + } + } +} diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureRegistry.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureRegistry.kt index 1749ef64..d7fe3519 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureRegistry.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureRegistry.kt @@ -5,9 +5,19 @@ import kotlinx.atomicfu.atomic import kotlinx.atomicfu.getAndUpdate import kotlinx.atomicfu.update import kotlinx.collections.immutable.minus +import kotlinx.collections.immutable.persistentListOf import kotlinx.collections.immutable.persistentMapOf import kotlinx.collections.immutable.toPersistentSet + +/** + * A listener interface for receiving notifications about feature changes in registry. + */ +internal interface FeatureListener { + fun onListChanged() + fun onFeatureUpdated(featureKey: String) +} + /** * A generic registry for managing features of a specified type. This class provides thread-safe * operations for adding, removing, and retrieving features from the registry. @@ -33,6 +43,16 @@ internal class FeatureRegistry(private val featureType: String) { internal val values: Map get() = registry.value + private val listeners = atomic(persistentListOf()) + + internal fun addListener(listener: FeatureListener) { + listeners.update { it.add(listener) } + } + + internal fun removeListener(listener: FeatureListener) { + listeners.update { it.remove(listener) } + } + /** * Adds the specified feature to the registry. * @@ -40,8 +60,12 @@ internal class FeatureRegistry(private val featureType: String) { */ internal fun add(feature: T) { logger.info { "Adding $featureType: \"${feature.key}\"" } - registry.update { current -> current.put(feature.key, feature) } + val oldMap = registry.getAndUpdate { current -> current.put(feature.key, feature) } + val oldFeature = oldMap[feature.key] logger.info { "Added $featureType: \"${feature.key}\"" } + + notifyFeatureUpdated(oldFeature, feature) + notifyListChanged() } /** @@ -52,8 +76,14 @@ internal class FeatureRegistry(private val featureType: String) { */ internal fun addAll(features: List) { logger.info { "Adding ${featureType}s: ${features.size}" } - registry.update { current -> current.putAll(features.associateBy { it.key }) } + val oldMap = registry.getAndUpdate { current -> current.putAll(features.associateBy { it.key }) } + for (feature in features) { + val oldFeature = oldMap[feature.key] + notifyFeatureUpdated(oldFeature, feature) + } logger.info { "Added ${featureType}s: ${features.size}" } + + notifyListChanged() } /** @@ -66,7 +96,8 @@ internal class FeatureRegistry(private val featureType: String) { logger.info { "Removing $featureType: \"$key\"" } val oldMap = registry.getAndUpdate { current -> current.remove(key) } - val removed = key in oldMap + val removedFeature = oldMap[key] + val removed = removedFeature != null logger.info { if (removed) { "Removed $featureType: \"$key\"" @@ -74,7 +105,11 @@ internal class FeatureRegistry(private val featureType: String) { "$featureType not found: \"$key\"" } } - return key in oldMap + + notifyFeatureUpdated(removedFeature, null) + notifyListChanged() + + return removed } /** @@ -87,7 +122,12 @@ internal class FeatureRegistry(private val featureType: String) { logger.info { "Removing ${featureType}s: ${keys.size}" } val oldMap = registry.getAndUpdate { current -> current - keys.toPersistentSet() } - val removedCount = keys.count { it in oldMap } + val removedFeatures = keys.mapNotNull { oldMap[it] } + val removedCount = removedFeatures.size + removedFeatures.forEach { + notifyFeatureUpdated(it, null) + } + logger.info { if (removedCount > 0) { "Removed ${featureType}s: $removedCount" @@ -108,13 +148,24 @@ internal class FeatureRegistry(private val featureType: String) { internal fun get(key: FeatureKey): T? { logger.info { "Getting $featureType: \"$key\"" } val feature = registry.value[key] - logger.info { - if (feature != null) { - "Got $featureType: \"$key\"" - } else { - "$featureType not found: \"$key\"" - } + if (feature != null) { + logger.info { "Got $featureType: \"$key\"" } + } else { + logger.info { "$featureType not found: \"$key\"" } } + return feature } + + private fun notifyListChanged() { + logger.info { "Notifying listeners of list change" } + listeners.value.forEach { it.onListChanged() } + } + + private fun notifyFeatureUpdated(oldFeature: T?, newFeature: T?) { + logger.info { "Notifying listeners of feature update" } + val featureKey = (oldFeature?.key ?: newFeature?.key) ?: return + + listeners.value.forEach { it.onFeatureUpdated(featureKey) } + } } diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt index 7fa7a98f..777ecfc4 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt @@ -26,6 +26,7 @@ import io.modelcontextprotocol.kotlin.sdk.types.ListToolsRequest import io.modelcontextprotocol.kotlin.sdk.types.ListToolsResult import io.modelcontextprotocol.kotlin.sdk.types.LoggingMessageNotification import io.modelcontextprotocol.kotlin.sdk.types.Method +import io.modelcontextprotocol.kotlin.sdk.types.Notification import io.modelcontextprotocol.kotlin.sdk.types.Prompt import io.modelcontextprotocol.kotlin.sdk.types.PromptArgument import io.modelcontextprotocol.kotlin.sdk.types.ReadResourceRequest @@ -33,11 +34,14 @@ import io.modelcontextprotocol.kotlin.sdk.types.ReadResourceResult import io.modelcontextprotocol.kotlin.sdk.types.Resource import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotification import io.modelcontextprotocol.kotlin.sdk.types.ServerCapabilities +import io.modelcontextprotocol.kotlin.sdk.types.SubscribeRequest import io.modelcontextprotocol.kotlin.sdk.types.TextContent import io.modelcontextprotocol.kotlin.sdk.types.Tool import io.modelcontextprotocol.kotlin.sdk.types.ToolAnnotations import io.modelcontextprotocol.kotlin.sdk.types.ToolSchema +import io.modelcontextprotocol.kotlin.sdk.types.UnsubscribeRequest import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.Deferred import kotlinx.serialization.json.JsonObject private val logger = KotlinLogging.logger {} @@ -88,6 +92,8 @@ public open class Server( private val sessionRegistry = ServerSessionRegistry() + private val notificationService = FeatureNotificationService() + /** * Provides a snapshot of all sessions currently registered in the server */ @@ -103,9 +109,21 @@ public open class Server( @Suppress("ktlint:standard:backing-property-naming") private var _onClose: () -> Unit = {} - private val toolRegistry = FeatureRegistry("Tool") - private val promptRegistry = FeatureRegistry("Prompt") - private val resourceRegistry = FeatureRegistry("Resource") + private val toolRegistry = FeatureRegistry("Tool").apply { + if (options.capabilities.tools?.listChanged ?: false) { + addListener(notificationService.getToolFeatureListener()) + } + } + private val promptRegistry = FeatureRegistry("Prompt").apply { + if (options.capabilities.prompts?.listChanged ?: false) { + addListener(notificationService.getPromptFeatureListener()) + } + } + private val resourceRegistry = FeatureRegistry("Resource").apply { + if (options.capabilities.resources?.listChanged ?: false) { + addListener(notificationService.geResourceFeatureListener()) + } + } public val tools: Map get() = toolRegistry.values @@ -182,17 +200,29 @@ public open class Server( session.setRequestHandler(Method.Defined.ResourcesTemplatesList) { _, _ -> handleListResourceTemplates() } + if (options.capabilities.resources?.subscribe ?: false) { + session.setRequestHandler(Method.Defined.ResourcesSubscribe) { request, _ -> + handleSubscribeResources(session, request) + null + } + session.setRequestHandler(Method.Defined.ResourcesUnsubscribe) { request, _ -> + handleUnsubscribeResources(session, request) + null + } + } } // Register cleanup handler to remove session from list when it closes session.onClose { logger.debug { "Removing closed session from active sessions list" } + notificationService.unsubscribeFromListChangedNotification(session) sessionRegistry.removeSession(session.sessionId) } logger.debug { "Server session connecting to transport" } session.connect(transport) logger.debug { "Server session successfully connected to transport" } sessionRegistry.addSession(session) + notificationService.subscribeToListChangedNotification(session) _onConnect() return session @@ -484,6 +514,24 @@ public open class Server( } // --- Internal Handlers --- + private suspend fun handleSubscribeResources(session: ServerSession, request: SubscribeRequest) { + if (options.capabilities.resources?.subscribe ?: false) { + logger.debug { "Subscribing to resources" } + notificationService.subscribeToResourceUpdateNotifications(session, request.params.uri) + } else { + logger.debug { "Failed to subscribe to resources: Server does not support resources capability" } + } + } + + private suspend fun handleUnsubscribeResources(session: ServerSession, request: UnsubscribeRequest) { + if (options.capabilities.resources?.subscribe ?: false) { + logger.debug { "Unsubscribing from resources" } + notificationService.unsubscribeFromResourceUpdateNotifications(session, request.params.uri) + } else { + logger.debug { "Failed to unsubscribe from resources: Server does not support resources capability" } + } + } + private suspend fun handleListTools(): ListToolsResult { val toolList = tools.values.map { it.tool } return ListToolsResult(tools = toolList, nextCursor = null) @@ -675,4 +723,30 @@ public open class Server( } } // End the ServerSession redirection section + + // Start the notification handling section + public fun setNotificationHandler(method: Method, handler: (notification: T) -> Deferred) { + sessions.forEach { (_, session) -> + session.setNotificationHandler(method, handler) + } + } + + public fun removeNotificationHandler(method: Method) { + sessions.forEach { (_, session) -> + session.removeNotificationHandler(method) + } + } + + public fun setNotificationHandler( + sessionId: String, + method: Method, + handler: (notification: T) -> Deferred + ) { + sessionRegistry.getSessionOrNull(sessionId)?.setNotificationHandler(method, handler) + } + + public fun removeNotificationHandler(sessionId: String, method: Method) { + sessionRegistry.getSessionOrNull(sessionId)?.removeNotificationHandler(method) + } + // End the notification handling section } From 2b1c21fa478a5307ccf84a3ae081b2dcc618a15a Mon Sep 17 00:00:00 2001 From: Maria Tigina Date: Fri, 28 Nov 2025 13:47:44 +0100 Subject: [PATCH 02/12] Fix linter --- .../io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt | 2 +- .../kotlin/sdk/server/FeatureNotificationService.kt | 4 +++- .../kotlin/sdk/server/FeatureRegistry.kt | 1 - .../io/modelcontextprotocol/kotlin/sdk/server/Server.kt | 6 +++--- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt index 0f892760..7dd04bc0 100644 --- a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt +++ b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt @@ -208,7 +208,7 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio * A handler to invoke for any request types that do not have their own handler installed. */ public var fallbackRequestHandler: ( - suspend (request: JSONRPCRequest, extra: RequestHandlerExtra) -> RequestResult? + suspend (request: JSONRPCRequest, extra: RequestHandlerExtra) -> RequestResult? )? = null diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt index 29d96831..a4edaf2b 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt @@ -73,8 +73,11 @@ internal class FeatureNotificationService { notifications.collect { notification -> when (notification) { is PromptListChangedNotification -> session.notification(notification) + is ResourceListChangedNotification -> session.notification(notification) + is ToolListChangedNotification -> session.notification(notification) + else -> logger.debug { "Notification not handled for sessionId ${session.sessionId}: $notification" } @@ -118,7 +121,6 @@ internal class FeatureNotificationService { logger.debug { "Unsubscribed from resourcec update notifications for sessionId: ${session.sessionId}" } } - private fun emit(notification: Notification) { notificationScope.launch { notifications.emit(notification) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureRegistry.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureRegistry.kt index d7fe3519..74003ad4 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureRegistry.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureRegistry.kt @@ -9,7 +9,6 @@ import kotlinx.collections.immutable.persistentListOf import kotlinx.collections.immutable.persistentMapOf import kotlinx.collections.immutable.toPersistentSet - /** * A listener interface for receiving notifications about feature changes in registry. */ diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt index 777ecfc4..9d7391a3 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt @@ -215,14 +215,14 @@ public open class Server( // Register cleanup handler to remove session from list when it closes session.onClose { logger.debug { "Removing closed session from active sessions list" } - notificationService.unsubscribeFromListChangedNotification(session) +// notificationService.unsubscribeFromListChangedNotification(session) sessionRegistry.removeSession(session.sessionId) } logger.debug { "Server session connecting to transport" } session.connect(transport) logger.debug { "Server session successfully connected to transport" } sessionRegistry.addSession(session) - notificationService.subscribeToListChangedNotification(session) +// notificationService.subscribeToListChangedNotification(session) _onConnect() return session @@ -740,7 +740,7 @@ public open class Server( public fun setNotificationHandler( sessionId: String, method: Method, - handler: (notification: T) -> Deferred + handler: (notification: T) -> Deferred, ) { sessionRegistry.getSessionOrNull(sessionId)?.setNotificationHandler(method, handler) } From 592a3aa6c1df9add0facba063253b2cdab1ea4de Mon Sep 17 00:00:00 2001 From: Maria Tigina Date: Mon, 1 Dec 2025 13:52:07 +0100 Subject: [PATCH 03/12] Support notifications for server --- kotlin-sdk-server/api/kotlin-sdk-server.api | 4 + .../sdk/server/FeatureNotificationService.kt | 396 +++++++++++++++--- .../kotlin/sdk/server/FeatureRegistry.kt | 50 +-- .../kotlin/sdk/server/Server.kt | 21 +- .../sdk/integration/InMemoryTransportTest.kt | 96 +++++ .../server/ServerPromptsNotificationTest.kt | 111 +++++ .../kotlin/sdk/server/ServerPromptsTest.kt | 48 ++- ...erverResourcesNotificationSubscribeTest.kt | 184 ++++++++ .../server/ServerResourcesNotificationTest.kt | 150 +++++++ .../kotlin/sdk/server/ServerResourcesTest.kt | 4 +- .../sdk/server/ServerToolsNotificationTest.kt | 99 +++++ .../kotlin/sdk/server/ServerToolsTest.kt | 75 +++- 12 files changed, 1112 insertions(+), 126 deletions(-) create mode 100644 kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerPromptsNotificationTest.kt create mode 100644 kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationSubscribeTest.kt create mode 100644 kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationTest.kt create mode 100644 kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsNotificationTest.kt diff --git a/kotlin-sdk-server/api/kotlin-sdk-server.api b/kotlin-sdk-server/api/kotlin-sdk-server.api index 8016d3bc..e5183f07 100644 --- a/kotlin-sdk-server/api/kotlin-sdk-server.api +++ b/kotlin-sdk-server/api/kotlin-sdk-server.api @@ -82,6 +82,8 @@ public class io/modelcontextprotocol/kotlin/sdk/server/Server { public final fun onConnect (Lkotlin/jvm/functions/Function0;)V public final fun onInitialized (Lkotlin/jvm/functions/Function0;)V public final fun ping (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun removeNotificationHandler (Lio/modelcontextprotocol/kotlin/sdk/types/Method;)V + public final fun removeNotificationHandler (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/types/Method;)V public final fun removePrompt (Ljava/lang/String;)Z public final fun removePrompts (Ljava/util/List;)I public final fun removeResource (Ljava/lang/String;)Z @@ -93,6 +95,8 @@ public class io/modelcontextprotocol/kotlin/sdk/server/Server { public final fun sendResourceListChanged (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun sendResourceUpdated (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/types/ResourceUpdatedNotification;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun sendToolListChanged (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun setNotificationHandler (Lio/modelcontextprotocol/kotlin/sdk/types/Method;Lkotlin/jvm/functions/Function1;)V + public final fun setNotificationHandler (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/types/Method;Lkotlin/jvm/functions/Function1;)V } public final class io/modelcontextprotocol/kotlin/sdk/server/ServerOptions : io/modelcontextprotocol/kotlin/sdk/shared/ProtocolOptions { diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt index a4edaf2b..252898d4 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt @@ -7,123 +7,385 @@ import io.modelcontextprotocol.kotlin.sdk.types.ResourceListChangedNotification import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotification import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotificationParams import io.modelcontextprotocol.kotlin.sdk.types.ToolListChangedNotification +import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.getAndUpdate +import kotlinx.collections.immutable.persistentMapOf +import kotlinx.collections.immutable.persistentSetOf import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.takeWhile +import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch +import kotlin.time.Clock +import kotlin.time.ExperimentalTime -internal class FeatureNotificationService { - private val notifications = MutableSharedFlow() - private val notificationScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) +/** Represents an event for notification service. */ +private sealed class Event + +/** + * Represents an event for a notification. + * + * @property timestamp A timestamp for the event. + * @property notification The notification associated with the event. + */ +private class NotificationEvent(val timestamp: Long, val notification: Notification) : Event() - private val notificationSessionFeatureJobs: MutableMap = mutableMapOf() - private val notificationSessionResourceJobs: MutableMap, Job> = mutableMapOf() +/** Represents an event marking the end of notification processing. */ +private class EndEvent : Event() +/** + * Represents a job that handles session-specific notifications, processing events + * and delivering relevant notifications to the associated session. + * + * This class listens to a stream of notification events and processes them + * based on the event type and the resource subscriptions associated with the session. + * It allows subscribing to or unsubscribing from specific resource keys for granular + * notification handling. The job can also be canceled to stop processing further events. + * IDs less than or equal to this value will be skipped. + */ +private class SessionNotificationJob { + private val job: Job + private val resourceSubscriptions = atomic(persistentMapOf()) private val logger = KotlinLogging.logger {} - private val toolFeatureListener: FeatureListener by lazy { - object : FeatureListener { - override fun onListChanged() { - logger.debug { "Emitting tool list changed notification" } - emit(ToolListChangedNotification()) - } + constructor( + session: ServerSession, + scope: CoroutineScope, + events: SharedFlow, + fromTimestamp: Long, + ) { + logger.info { "Starting notification job from timestamp $fromTimestamp for sessionId: ${session.sessionId} " } + job = scope.launch { + events.takeWhile { it !is EndEvent }.collect { event -> + when (event) { + is NotificationEvent -> { + if (event.timestamp > fromTimestamp) { + when (val notification = event.notification) { + is PromptListChangedNotification -> { + logger.info { + "Sending prompt list changed notification for sessionId: ${session.sessionId}" + } + session.notification(notification) + } - override fun onFeatureUpdated(featureKey: FeatureKey) { - logger.debug { "Skipping update for tool feature key: $featureKey" } + is ResourceListChangedNotification -> { + logger.info { + "Sending resourse list changed notification for sessionId: ${session.sessionId}" + } + session.notification(notification) + } + + is ToolListChangedNotification -> { + logger.info { + "Sending tool list changed notification for sessionId: ${session.sessionId}" + } + session.notification(notification) + } + + is ResourceUpdatedNotification -> { + resourceSubscriptions.value[notification.params.uri]?.let { resourceFromTimestamp -> + if (event.timestamp > resourceFromTimestamp) { + logger.info { + "Sending notification for resource ${notification.params.uri} " + + "to sessionId: ${session.sessionId}" + } + session.notification(notification) + } else { + logger.info { + "Skipping notification for resource ${notification.params.uri} " + + "as it is older than subscription timestamp $resourceFromTimestamp" + } + } + } ?: run { + logger.info { + "No subscription for resource ${notification.params.uri}. " + + "Skipping notification." + } + } + } + + else -> { + logger.warn { "Skipping notification: $notification" } + } + } + } else { + logger.info { + "Skipping event with id: ${event.timestamp} " + + "as it is older than startingEventId $fromTimestamp" + } + } + } + + else -> { + logger.warn { "Skipping event: $event" } + } + } } } } - private val promptFeatureListener: FeatureListener by lazy { + /** + * Subscribes to a resource identified by the given feature key. + * + * @param resourceKey The key representing the resource to subscribe to. + * @param timestamp The timestamp of the subscription. + */ + fun subscribe(resourceKey: FeatureKey, timestamp: Long) { + resourceSubscriptions.getAndUpdate { it.put(resourceKey, timestamp) } + } + + /** + * Unsubscribes from a resource identified by the given feature key. + * + * @param resourceKey The key representing the resource to unsubscribe from. + */ + fun unsubscribe(resourceKey: FeatureKey) { + resourceSubscriptions.getAndUpdate { it.remove(resourceKey) } + } + + suspend fun join() { + job.join() + } + + fun cancel() { + job.cancel() + } +} + +/** + * Service responsible for managing and emitting notifications related to feature changes. + * + * This service facilitates notification subscriptions for different sessions and supports managing + * listeners for feature-related events. Notifications include changes in tool lists, prompt lists, + * resource lists, and updates to specific resources. + * + * This class operates on a background coroutine scope to handle notifications asynchronously. + * It maintains jobs associated with sessions and features for controlling active subscriptions. + * + * Key Responsibilities: + * - Emit notifications for various feature-related events. + * - Provide listeners for handling feature change events. + * - Allow clients to subscribe or unsubscribe from specific notifications. + * + * Notifications managed: + * - Tool list change notifications. + * - Prompt list change notifications. + * - Resource list change notifications. + * - Resource updates pertaining to specific resources. + */ +internal class FeatureNotificationService( + @OptIn(ExperimentalTime::class) + private val clock: Clock = Clock.System, +) { + private val closingService = atomic(false) + + /** Shared flow used to emit events within the feature notification service. */ + private val notificationEvents = MutableSharedFlow( + extraBufferCapacity = 100, + replay = 0, + onBufferOverflow = BufferOverflow.SUSPEND, + ) + + /** Coroutine scope used to handle asynchronous notifications. */ + private val notificationScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + + /** Active emit jobs. */ + private val activeEmitJobs = atomic(persistentSetOf()) + + /** Notification jobs associated with sessions. */ + private val sessionNotificationJobs = atomic(persistentMapOf()) + + private val logger = KotlinLogging.logger {} + + /** Listener for tool feature events. */ + private val toolListChangedListener: FeatureListener by lazy { object : FeatureListener { - override fun onListChanged() { - logger.debug { "Emitting prompt list changed notification" } - emit(PromptListChangedNotification()) + override fun onFeatureUpdated(featureKey: FeatureKey) { + logger.info { "Emitting tool list changed notification" } + emit(ToolListChangedNotification()) } + } + } + /** Listener for prompt feature events. */ + private val promptListChangeListener: FeatureListener by lazy { + object : FeatureListener { override fun onFeatureUpdated(featureKey: FeatureKey) { - logger.debug { "Skipping update for prompt feature key: $featureKey" } + logger.info { "Emitting prompt list changed notification" } + emit(PromptListChangedNotification()) } } } - private val resourceFeatureListener: FeatureListener by lazy { + /** Listener for resource feature events. */ + private val resourceListChangedListener: FeatureListener by lazy { object : FeatureListener { - override fun onListChanged() { - logger.debug { "Emitting resource list changed notification" } + override fun onFeatureUpdated(featureKey: FeatureKey) { + logger.info { "Emitting resource list changed notification" } emit(ResourceListChangedNotification()) } + } + } + /** Listener for resource update events. */ + private val resourceUpdatedListener: FeatureListener by lazy { + object : FeatureListener { override fun onFeatureUpdated(featureKey: FeatureKey) { - logger.debug { "Emitting resource updated notification for feature key: $featureKey" } + logger.info { "Emitting resource updated notification for feature key: $featureKey" } emit(ResourceUpdatedNotification(ResourceUpdatedNotificationParams(uri = featureKey))) } } } - internal fun getToolFeatureListener(): FeatureListener = toolFeatureListener - internal fun getPromptFeatureListener(): FeatureListener = promptFeatureListener - internal fun geResourceFeatureListener(): FeatureListener = resourceFeatureListener + /** Listener for the tool list changed events. */ + internal fun getToolListChangedListener(): FeatureListener = toolListChangedListener - internal fun subscribeToListChangedNotification(session: ServerSession) { - logger.debug { "Subscribing to list changed notifications for sessionId: ${session.sessionId}" } - notificationSessionFeatureJobs[session.sessionId] = notificationScope.launch { - notifications.collect { notification -> - when (notification) { - is PromptListChangedNotification -> session.notification(notification) + /** Listener for the prompt list changed events. */ + internal fun getPromptListChangedListener(): FeatureListener = promptListChangeListener - is ResourceListChangedNotification -> session.notification(notification) + /** Listener for the resource list changed events. */ + internal fun getResourceListChangedListener(): FeatureListener = resourceListChangedListener - is ToolListChangedNotification -> session.notification(notification) + /** Listener for resource update events. */ + internal fun getResourceUpdateListener(): FeatureListener = resourceUpdatedListener - else -> logger.debug { - "Notification not handled for sessionId ${session.sessionId}: $notification" - } - } + /** + * Subscribes session to list changed notifications for all features and resource update notifications. + * For each session the job is created and stored until the [unsubscribeSession] method is called. + * In case of session already subscribed to list changed notifications, the method will skip the subscription and + * continue to send notification using the existing job. + * + * @param session The session to subscribe. + */ + internal fun subscribeSession(session: ServerSession) { + logger.info { "Subscribing session for notifications sessionId: ${session.sessionId}" } + + val timestamp = getCurrentTimestamp() + + sessionNotificationJobs.getAndUpdate { + if (it.containsKey(session.sessionId)) { + logger.info { "Session already subscribed: ${session.sessionId}" } + return@getAndUpdate it + } else { + it.put( + session.sessionId, + SessionNotificationJob( + session = session, + scope = notificationScope, + events = notificationEvents, + // Save the first event id to process, as notification can be emitted after the subscription + fromTimestamp = timestamp, + ), + ) } } - logger.debug { "Subscribed to list changed notifications for sessionId: ${session.sessionId}" } + + logger.info { "Subscribed session for notifications sessionId: ${session.sessionId}" } } - internal fun unsubscribeFromListChangedNotification(session: ServerSession) { - logger.debug { "Unsubscribing from list changed notifications for sessionId: ${session.sessionId}" } - notificationSessionFeatureJobs[session.sessionId]?.cancel() - notificationSessionFeatureJobs.remove(session.sessionId) - logger.debug { "Unsubscribed from list changed notifications for sessionId: ${session.sessionId}" } + /** + * Unsubscribes a session from list changed notifications for all features. + * Cancels and removes the job associated with the given session's notifications. + * + * @param session The session to unsubscribe from list changed notifications. + */ + internal fun unsubscribeSession(session: ServerSession) { + logger.info { "Unsubscribing from list changed notifications for sessionId: ${session.sessionId}" } + sessionNotificationJobs.getAndUpdate { + it[session.sessionId]?.cancel() + it.remove(session.sessionId) + } + logger.info { "Unsubscribed from list changed notifications for sessionId: ${session.sessionId}" } } - internal fun subscribeToResourceUpdateNotifications(session: ServerSession, resourceKey: FeatureKey) { - logger.debug { "Subscribing to resource update notifications for sessionId: ${session.sessionId}" } - notificationSessionResourceJobs[session.sessionId to resourceKey] = notificationScope.launch { - notifications.collect { notification -> - when (notification) { - is ResourceUpdatedNotification -> { - if (notification.params.uri == resourceKey) { - session.notification(notification) - } - } + /** + * Subscribes a session to notifications for resource updates pertaining to the given resource key. + * + * @param session The session to subscribe. + * @param resourceKey The resource key to subscribe to. + */ + internal fun subscribeToResourceUpdate(session: ServerSession, resourceKey: FeatureKey) { + logger.info { "Subscribing to resource $resourceKey update notifications for sessionId: ${session.sessionId}" } + // Set starting event id for resources notifications to skip events emitted before the subscription + sessionNotificationJobs.value[session.sessionId]?.subscribe(resourceKey, getCurrentTimestamp()) + logger.info { "Subscribed to resource $resourceKey update notifications for sessionId: ${session.sessionId}" } + } - else -> logger.debug { - "Notification not handled for session for sessionId ${session.sessionId}: $notification" - } - } - } + /** + * Unsubscribes a session from notifications for resource updates pertaining to the given resource key. + * + * @param session The session to unsubscribe from. + * @param resourceKey The resource key to unsubscribe from. + */ + internal fun unsubscribeFromResourceUpdate(session: ServerSession, resourceKey: FeatureKey) { + logger.info { + "Unsubscribing from resource $resourceKey update notifications for sessionId: ${session.sessionId}" + } + sessionNotificationJobs.value[session.sessionId]?.unsubscribe(resourceKey) + logger.info { + "Unsubscribed from resource $resourceKey update notifications for sessionId: ${session.sessionId}" } - logger.debug { "Subscribed to resource update notifications for sessionId: ${session.sessionId}" } } - internal fun unsubscribeFromResourceUpdateNotifications(session: ServerSession, resourceKey: FeatureKey) { - logger.debug { "Unsubscribing from resourcec update notifications for sessionId: ${session.sessionId}" } - notificationSessionResourceJobs[session.sessionId to resourceKey]?.cancel() - notificationSessionResourceJobs.remove(session.sessionId to resourceKey) - logger.debug { "Unsubscribed from resourcec update notifications for sessionId: ${session.sessionId}" } + /** Emits a notification to all active sessions. */ + private fun emit(notification: Notification) { + // Create a timestamp before emit to ensure notifications are processed in order + val timestamp = getCurrentTimestamp() + if (closingService.value) { + logger.warn { "Skipping emitting notification as service is closing: $notification" } + return + } + + logger.info { "Emitting notification $timestamp: $notification" } + + // Launching emit lazily to put it to the jobs queue before the completion + val job = notificationScope.launch(start = CoroutineStart.LAZY) { + logger.info { "Actually emitting notification $timestamp: $notification" } + notificationEvents.emit(NotificationEvent(timestamp, notification)) + logger.info { "Notification emitted $timestamp: $notification" } + } + + // Add job to set before starting + activeEmitJobs.getAndUpdate { it.add(job) } + + // Register completion + job.invokeOnCompletion { + activeEmitJobs.getAndUpdate { it.remove(job) } + } + + // Start the job after it's safely added + job.start() } - private fun emit(notification: Notification) { + /** Returns the current timestamp in milliseconds. */ + @OptIn(ExperimentalTime::class) + private fun getCurrentTimestamp(): Long = clock.now().toEpochMilliseconds() + + suspend fun close() { + logger.info { "Closing feature notification service" } + closingService.compareAndSet(false, update = true) + + // Making sure all emit jobs are completed + activeEmitJobs.value.joinAll() + + // Emitting end event to complete all session notification jobs notificationScope.launch { - notifications.emit(notification) - } + logger.info { "Emitting end event" } + notificationEvents.emit(EndEvent()) + logger.info { "End event emitted" } + }.join() + + // Making sure all session notification jobs are completed (after receiving end event) + sessionNotificationJobs.value.values.forEach { it.join() } + + // Cancelling notification scope to stop processing further events + notificationScope.cancel() } } diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureRegistry.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureRegistry.kt index 74003ad4..0ed65588 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureRegistry.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureRegistry.kt @@ -13,7 +13,6 @@ import kotlinx.collections.immutable.toPersistentSet * A listener interface for receiving notifications about feature changes in registry. */ internal interface FeatureListener { - fun onListChanged() fun onFeatureUpdated(featureKey: String) } @@ -61,10 +60,9 @@ internal class FeatureRegistry(private val featureType: String) { logger.info { "Adding $featureType: \"${feature.key}\"" } val oldMap = registry.getAndUpdate { current -> current.put(feature.key, feature) } val oldFeature = oldMap[feature.key] - logger.info { "Added $featureType: \"${feature.key}\"" } + logger.info { "Added $featureType: \"${feature.key}\"" } notifyFeatureUpdated(oldFeature, feature) - notifyListChanged() } /** @@ -76,13 +74,12 @@ internal class FeatureRegistry(private val featureType: String) { internal fun addAll(features: List) { logger.info { "Adding ${featureType}s: ${features.size}" } val oldMap = registry.getAndUpdate { current -> current.putAll(features.associateBy { it.key }) } + + logger.info { "Added ${featureType}s: ${features.size}" } for (feature in features) { val oldFeature = oldMap[feature.key] notifyFeatureUpdated(oldFeature, feature) } - logger.info { "Added ${featureType}s: ${features.size}" } - - notifyListChanged() } /** @@ -97,16 +94,13 @@ internal class FeatureRegistry(private val featureType: String) { val removedFeature = oldMap[key] val removed = removedFeature != null - logger.info { - if (removed) { - "Removed $featureType: \"$key\"" - } else { - "$featureType not found: \"$key\"" - } - } - notifyFeatureUpdated(removedFeature, null) - notifyListChanged() + if (removed) { + logger.info { "Removed $featureType: \"$key\"" } + notifyFeatureUpdated(removedFeature, null) + } else { + logger.info { "$featureType not found: \"$key\"" } + } return removed } @@ -123,18 +117,16 @@ internal class FeatureRegistry(private val featureType: String) { val removedFeatures = keys.mapNotNull { oldMap[it] } val removedCount = removedFeatures.size + + if (removedCount > 0) { + logger.info { "Removed ${featureType}s: $removedCount" } + } else { + logger.info { "No $featureType were removed" } + } removedFeatures.forEach { notifyFeatureUpdated(it, null) } - logger.info { - if (removedCount > 0) { - "Removed ${featureType}s: $removedCount" - } else { - "No $featureType were removed" - } - } - return removedCount } @@ -156,15 +148,13 @@ internal class FeatureRegistry(private val featureType: String) { return feature } - private fun notifyListChanged() { - logger.info { "Notifying listeners of list change" } - listeners.value.forEach { it.onListChanged() } - } - private fun notifyFeatureUpdated(oldFeature: T?, newFeature: T?) { - logger.info { "Notifying listeners of feature update" } - val featureKey = (oldFeature?.key ?: newFeature?.key) ?: return + val featureKey = (oldFeature?.key ?: newFeature?.key) ?: run { + logger.error { "Notification should have feature key, but none found" } + return + } + logger.info { "Notifying listeners on feature update" } listeners.value.forEach { it.onFeatureUpdated(featureKey) } } } diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt index 9d7391a3..57d82ef4 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt @@ -43,6 +43,7 @@ import io.modelcontextprotocol.kotlin.sdk.types.UnsubscribeRequest import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Deferred import kotlinx.serialization.json.JsonObject +import kotlin.time.ExperimentalTime private val logger = KotlinLogging.logger {} @@ -92,6 +93,7 @@ public open class Server( private val sessionRegistry = ServerSessionRegistry() + @OptIn(ExperimentalTime::class) private val notificationService = FeatureNotificationService() /** @@ -111,17 +113,20 @@ public open class Server( private val toolRegistry = FeatureRegistry("Tool").apply { if (options.capabilities.tools?.listChanged ?: false) { - addListener(notificationService.getToolFeatureListener()) + addListener(notificationService.getToolListChangedListener()) } } private val promptRegistry = FeatureRegistry("Prompt").apply { if (options.capabilities.prompts?.listChanged ?: false) { - addListener(notificationService.getPromptFeatureListener()) + addListener(notificationService.getPromptListChangedListener()) } } private val resourceRegistry = FeatureRegistry("Resource").apply { if (options.capabilities.resources?.listChanged ?: false) { - addListener(notificationService.geResourceFeatureListener()) + addListener(notificationService.getResourceListChangedListener()) + } + if (options.capabilities.resources?.subscribe ?: false) { + addListener(notificationService.getResourceUpdateListener()) } } @@ -138,6 +143,7 @@ public open class Server( public suspend fun close() { logger.debug { "Closing MCP server" } + notificationService.close() sessions.forEach { (sessionId, session) -> logger.info { "Closing session $sessionId" } session.close() @@ -215,14 +221,15 @@ public open class Server( // Register cleanup handler to remove session from list when it closes session.onClose { logger.debug { "Removing closed session from active sessions list" } -// notificationService.unsubscribeFromListChangedNotification(session) + notificationService.unsubscribeSession(session) sessionRegistry.removeSession(session.sessionId) } + logger.debug { "Server session connecting to transport" } session.connect(transport) logger.debug { "Server session successfully connected to transport" } sessionRegistry.addSession(session) -// notificationService.subscribeToListChangedNotification(session) + notificationService.subscribeSession(session) _onConnect() return session @@ -517,7 +524,7 @@ public open class Server( private suspend fun handleSubscribeResources(session: ServerSession, request: SubscribeRequest) { if (options.capabilities.resources?.subscribe ?: false) { logger.debug { "Subscribing to resources" } - notificationService.subscribeToResourceUpdateNotifications(session, request.params.uri) + notificationService.subscribeToResourceUpdate(session, request.params.uri) } else { logger.debug { "Failed to subscribe to resources: Server does not support resources capability" } } @@ -526,7 +533,7 @@ public open class Server( private suspend fun handleUnsubscribeResources(session: ServerSession, request: UnsubscribeRequest) { if (options.capabilities.resources?.subscribe ?: false) { logger.debug { "Unsubscribing from resources" } - notificationService.unsubscribeFromResourceUpdateNotifications(session, request.params.uri) + notificationService.unsubscribeFromResourceUpdate(session, request.params.uri) } else { logger.debug { "Failed to unsubscribe from resources: Server does not support resources capability" } } diff --git a/kotlin-sdk-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/InMemoryTransportTest.kt b/kotlin-sdk-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/InMemoryTransportTest.kt index d13f5848..83e56184 100644 --- a/kotlin-sdk-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/InMemoryTransportTest.kt +++ b/kotlin-sdk-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/InMemoryTransportTest.kt @@ -1,8 +1,14 @@ package io.modelcontextprotocol.kotlin.sdk.integration import io.modelcontextprotocol.kotlin.sdk.shared.InMemoryTransport +import io.modelcontextprotocol.kotlin.sdk.types.BaseNotificationParams import io.modelcontextprotocol.kotlin.sdk.types.InitializedNotification import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage +import io.modelcontextprotocol.kotlin.sdk.types.PromptListChangedNotification +import io.modelcontextprotocol.kotlin.sdk.types.ResourceListChangedNotification +import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotification +import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotificationParams +import io.modelcontextprotocol.kotlin.sdk.types.ToolListChangedNotification import io.modelcontextprotocol.kotlin.sdk.types.toJSON import kotlinx.coroutines.test.runTest import kotlin.test.BeforeTest @@ -107,4 +113,94 @@ class InMemoryTransportTest { serverTransport.start() assertEquals(message, receivedMessage) } + + @Test + fun `should send ToolListChangedNotification from server to client`() = runTest { + val notification = ToolListChangedNotification( + BaseNotificationParams(), + ) + + var receivedMessage: JSONRPCMessage? = null + clientTransport.onMessage { msg -> + receivedMessage = msg + } + + val rpcMessage = notification.toJSON() + serverTransport.send(rpcMessage) + assertEquals(rpcMessage, receivedMessage) + } + + @Test + fun `should send PromptListChangedNotification from server to client`() = runTest { + val notification = PromptListChangedNotification( + BaseNotificationParams(), + ) + + var receivedMessage: JSONRPCMessage? = null + clientTransport.onMessage { msg -> + receivedMessage = msg + } + + val rpcMessage = notification.toJSON() + serverTransport.send(rpcMessage) + assertEquals(rpcMessage, receivedMessage) + } + + @Test + fun `should send ResourceListChangedNotification from server to client`() = runTest { + val notification = ResourceListChangedNotification( + BaseNotificationParams(), + ) + + var receivedMessage: JSONRPCMessage? = null + clientTransport.onMessage { msg -> + receivedMessage = msg + } + + val rpcMessage = notification.toJSON() + serverTransport.send(rpcMessage) + assertEquals(rpcMessage, receivedMessage) + } + + @Test + fun `should send ResourceUpdatedNotification from server to client`() = runTest { + val notification = ResourceUpdatedNotification( + ResourceUpdatedNotificationParams( + uri = "file:///workspace/data.json", + ), + ) + + var receivedMessage: JSONRPCMessage? = null + clientTransport.onMessage { msg -> + receivedMessage = msg + } + + val rpcMessage = notification.toJSON() + serverTransport.send(rpcMessage) + assertEquals(rpcMessage, receivedMessage) + } + + @Test + fun `should handle multiple notifications in sequence`() = runTest { + val notifications = listOf( + ToolListChangedNotification(), + PromptListChangedNotification(), + ResourceListChangedNotification(), + ResourceUpdatedNotification(ResourceUpdatedNotificationParams(uri = "file:///workspace/data.json")), + ) + + val receivedMessages = mutableListOf() + clientTransport.onMessage { msg -> + receivedMessages.add(msg) + } + + notifications.forEach { notification -> + serverTransport.send(notification.toJSON()) + } + + assertEquals(notifications.size, receivedMessages.size) + notifications.forEachIndexed { index, notification -> + assertEquals(notification.toJSON(), receivedMessages[index]) + } + } } diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerPromptsNotificationTest.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerPromptsNotificationTest.kt new file mode 100644 index 00000000..84036466 --- /dev/null +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerPromptsNotificationTest.kt @@ -0,0 +1,111 @@ +package io.modelcontextprotocol.kotlin.sdk.server + +import io.modelcontextprotocol.kotlin.sdk.types.GetPromptResult +import io.modelcontextprotocol.kotlin.sdk.types.Method +import io.modelcontextprotocol.kotlin.sdk.types.Prompt +import io.modelcontextprotocol.kotlin.sdk.types.PromptListChangedNotification +import io.modelcontextprotocol.kotlin.sdk.types.ServerCapabilities +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class ServerPromptsNotificationTest : AbstractServerFeaturesTest() { + + override fun getServerCapabilities(): ServerCapabilities = ServerCapabilities( + prompts = ServerCapabilities.Prompts(true), + ) + + @Test + fun `addPrompt should send notification`() = runTest { + // Configure notification handler + var promptListChangedNotificationReceived = false + client.setNotificationHandler(Method.Defined.NotificationsPromptsListChanged) { + promptListChangedNotificationReceived = true + CompletableDeferred(Unit) + } + + // Add a prompt + val testPrompt = Prompt("test-prompt", "Test Prompt", null) + server.addPrompt(testPrompt) { + GetPromptResult( + description = "Test prompt description", + messages = listOf(), + ) + } + + // Remove the prompt + val result = server.removePrompt(testPrompt.name) + // Close the server to stop processing further events and flush notifications + server.close() + + // Verify the prompt was removed + assertTrue(result, "Prompt should be removed successfully") + + // Verify that the notification was sent + assertTrue(promptListChangedNotificationReceived, "Notification should be sent when prompt is added") + } + + @Test + fun `removePrompts should remove multiple prompts and send two notifications`() = runTest { + // Configure notification handler + var promptListChangedNotificationReceivedCount = 0 + client.setNotificationHandler(Method.Defined.NotificationsPromptsListChanged) { + promptListChangedNotificationReceivedCount += 1 + CompletableDeferred(Unit) + } + + // Add prompts + val testPrompt1 = Prompt("test-prompt-1", "Test Prompt 1", null) + val testPrompt2 = Prompt("test-prompt-2", "Test Prompt 2", null) + + server.addPrompt(testPrompt1) { + GetPromptResult( + description = "Test prompt description 1", + messages = listOf(), + ) + } + server.addPrompt(testPrompt2) { + GetPromptResult( + description = "Test prompt description 2", + messages = listOf(), + ) + } + + // Remove the prompts + val result = server.removePrompts(listOf(testPrompt1.name, testPrompt2.name)) + // Close the server to stop processing further events and flush notifications + server.close() + + // Verify the prompts were removed + assertEquals(2, result, "Both prompts should be removed") + + // Verify that the notifications were sent twice + assertEquals( + 4, + promptListChangedNotificationReceivedCount, + "Two notifications should be sent when prompts are added and two when removed", + ) + } + + @Test + fun `notification should not be send when removed prompt does not exists`() = runTest { + // Track notifications + var promptListChangedNotificationReceived = false + client.setNotificationHandler(Method.Defined.NotificationsPromptsListChanged) { + promptListChangedNotificationReceived = true + CompletableDeferred(Unit) + } + + // Try to remove a non-existent prompt + val result = server.removePrompt("non-existent-prompt") + // Close the server to stop processing further events and flush notifications + server.close() + + // Verify the result + assertFalse(result, "Removing non-existent prompt should return false") + assertFalse(promptListChangedNotificationReceived, "No notification should be sent when prompt doesn't exist") + } +} diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerPromptsTest.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerPromptsTest.kt index 60de0cab..5f49d04c 100644 --- a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerPromptsTest.kt +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerPromptsTest.kt @@ -17,11 +17,18 @@ import kotlin.test.assertTrue class ServerPromptsTest : AbstractServerFeaturesTest() { override fun getServerCapabilities(): ServerCapabilities = ServerCapabilities( - prompts = ServerCapabilities.Prompts(false), + prompts = ServerCapabilities.Prompts(null), ) @Test - fun `removePrompt should remove a prompt`() = runTest { + fun `removePrompt should remove a prompt and do not send notification`() = runTest { + // Configure notification handler + var promptListChangedNotificationReceived = false + client.setNotificationHandler(Method.Defined.NotificationsPromptsListChanged) { + promptListChangedNotificationReceived = true + CompletableDeferred(Unit) + } + // Add a prompt val testPrompt = Prompt("test-prompt", "Test Prompt", null) server.addPrompt(testPrompt) { @@ -36,13 +43,30 @@ class ServerPromptsTest : AbstractServerFeaturesTest() { // Verify the prompt was removed assertTrue(result, "Prompt should be removed successfully") + + // Verify that the notification was not sent + assertFalse( + promptListChangedNotificationReceived, + "No notification should be sent when prompts capability is not supported", + ) } @Test - fun `removePrompts should remove multiple prompts and send notification`() = runTest { + fun `removePrompts should remove multiple prompts`() = runTest { + // Configure notification handler + var promptListChangedNotificationReceived = false + client.setNotificationHandler(Method.Defined.NotificationsPromptsListChanged) { + promptListChangedNotificationReceived = true + CompletableDeferred(Unit) + } + // Add prompts val testPrompt1 = Prompt("test-prompt-1", "Test Prompt 1", null) val testPrompt2 = Prompt("test-prompt-2", "Test Prompt 2", null) + client.setNotificationHandler(Method.Defined.NotificationsPromptsListChanged) { + throw IllegalStateException("Notification should not be sent") + } + server.addPrompt(testPrompt1) { GetPromptResult( description = "Test prompt description 1", @@ -61,6 +85,12 @@ class ServerPromptsTest : AbstractServerFeaturesTest() { // Verify the prompts were removed assertEquals(2, result, "Both prompts should be removed") + + // Verify that the notification was not sent + assertFalse( + promptListChangedNotificationReceived, + "No notification should be sent when prompts capability is not supported", + ) } @Test @@ -82,6 +112,12 @@ class ServerPromptsTest : AbstractServerFeaturesTest() { @Test fun `removePrompt should throw when prompts capability is not supported`() = runTest { + var promptListChangedNotificationReceived = false + client.setNotificationHandler(Method.Defined.NotificationsPromptsListChanged) { + promptListChangedNotificationReceived = true + CompletableDeferred(Unit) + } + // Create server without prompts capability val serverOptions = ServerOptions( capabilities = ServerCapabilities(), @@ -96,5 +132,11 @@ class ServerPromptsTest : AbstractServerFeaturesTest() { server.removePrompt("test-prompt") } assertEquals("Server does not support prompts capability.", exception.message) + + // Verify that the notification was not sent + assertFalse( + promptListChangedNotificationReceived, + "No notification should be sent when prompts capability is not supported", + ) } } diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationSubscribeTest.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationSubscribeTest.kt new file mode 100644 index 00000000..4948ecf7 --- /dev/null +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationSubscribeTest.kt @@ -0,0 +1,184 @@ +package io.modelcontextprotocol.kotlin.sdk.server + +import io.modelcontextprotocol.kotlin.sdk.types.Method +import io.modelcontextprotocol.kotlin.sdk.types.ReadResourceResult +import io.modelcontextprotocol.kotlin.sdk.types.ResourceListChangedNotification +import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotification +import io.modelcontextprotocol.kotlin.sdk.types.ServerCapabilities +import io.modelcontextprotocol.kotlin.sdk.types.SubscribeRequest +import io.modelcontextprotocol.kotlin.sdk.types.SubscribeRequestParams +import io.modelcontextprotocol.kotlin.sdk.types.TextResourceContents +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class ServerResourcesNotificationSubscribeTest : AbstractServerFeaturesTest() { + + override fun getServerCapabilities(): ServerCapabilities = ServerCapabilities( + resources = ServerCapabilities.Resources(null, true), + ) + + @Test + fun `removeResource should send resource update notification`() = runTest { + val notifications = mutableListOf() + client.setNotificationHandler(Method.Defined.NotificationsResourcesUpdated) { + notifications.add(it) + println(it) + CompletableDeferred(Unit) + } + + // Add resources + val testResourceUri1 = "test://resource1" + val testResourceUri2 = "test://resource2" + + server.addResource( + uri = testResourceUri1, + name = "Test Resource 1", + description = "A test resource 1", + mimeType = "text/plain", + ) { + ReadResourceResult( + contents = listOf( + TextResourceContents( + text = "Test resource content 1", + uri = testResourceUri1, + mimeType = "text/plain", + ), + ), + ) + } + + client.subscribeResource(SubscribeRequest(SubscribeRequestParams(uri = testResourceUri1))) + + server.addResource( + uri = testResourceUri2, + name = "Test Resource 2", + description = "A test resource 2", + mimeType = "text/plain", + ) { + ReadResourceResult( + contents = listOf( + TextResourceContents( + text = "Test resource content 2", + uri = testResourceUri2, + mimeType = "text/plain", + ), + ), + ) + } + + // Remove the resource + val result = server.removeResource(testResourceUri1) + // Close the server to stop processing further events and flush notifications + server.close() + + // Verify the resource was removed + assertTrue(result, "Resource should be removed successfully") + + println(notifications.map { it.params.uri }) + // Verify that the notification was sent + assertEquals(1, notifications.size, "Notification should be sent when resource 1 was deleted") + assertEquals(testResourceUri1, notifications[0].params.uri, "Notification should contain the resource 1 URI") + } + + @Test + fun `removeResource for two resources should send two separate notifications`() = runTest { + val notifications = mutableListOf() + client.setNotificationHandler(Method.Defined.NotificationsResourcesUpdated) { + notifications.add(it) + CompletableDeferred(Unit) + } + + // Add resources + val testResourceUri1 = "test://resource1" + val testResourceUri2 = "test://resource2" + + server.addResource( + uri = testResourceUri1, + name = "Test Resource 1", + description = "A test resource 1", + mimeType = "text/plain", + ) { + ReadResourceResult( + contents = listOf( + TextResourceContents( + text = "Test resource content 1", + uri = testResourceUri1, + mimeType = "text/plain", + ), + ), + ) + } + + server.addResource( + uri = testResourceUri2, + name = "Test Resource 2", + description = "A test resource 2", + mimeType = "text/plain", + ) { + ReadResourceResult( + contents = listOf( + TextResourceContents( + text = "Test resource content 2", + uri = testResourceUri2, + mimeType = "text/plain", + ), + ), + ) + } + + client.subscribeResource(SubscribeRequest(SubscribeRequestParams(uri = testResourceUri1))) + client.subscribeResource(SubscribeRequest(SubscribeRequestParams(uri = testResourceUri2))) + + // Remove the resource + val result1 = server.removeResource(testResourceUri1) + val result2 = server.removeResource(testResourceUri2) + // Close the server to stop processing further events and flush notifications + server.close() + + // Verify the resource was removed + assertTrue(result1, "Resource 1 should be removed successfully") + assertTrue(result2, "Resource 2 should be removed successfully") + + println(notifications.map { it.params.uri }) + // Verify that the notification was sent + assertEquals(2, notifications.size, "Notification should be sent when resource 1 and resource 2 was deleted") + + val deletedResources = listOf(notifications[0].params.uri, notifications[1].params.uri) + assertTrue( + deletedResources.contains(testResourceUri1), + "Notification should contain the removed resource 1 URI", + ) + assertTrue( + deletedResources.contains(testResourceUri2), + "Notification should contain the removed resource 2 URI", + ) + } + + @Test + fun `notification should not be send when removed resource does not exists`() = runTest { + // Track notifications + var resourceListChangedNotificationReceived = false + client.setNotificationHandler( + Method.Defined.NotificationsResourcesListChanged, + ) { + resourceListChangedNotificationReceived = true + CompletableDeferred(Unit) + } + + // Try to remove a non-existent resource + val result = server.removeResource("non-existent-resource") + // Close the server to stop processing further events and flush notifications + server.close() + + // Verify the result + assertFalse(result, "Removing non-existent resource should return false") + assertFalse( + resourceListChangedNotificationReceived, + "No notification should be sent when resource doesn't exist", + ) + } +} diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationTest.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationTest.kt new file mode 100644 index 00000000..ab284a66 --- /dev/null +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationTest.kt @@ -0,0 +1,150 @@ +package io.modelcontextprotocol.kotlin.sdk.server + +import io.modelcontextprotocol.kotlin.sdk.types.Method +import io.modelcontextprotocol.kotlin.sdk.types.ReadResourceResult +import io.modelcontextprotocol.kotlin.sdk.types.ResourceListChangedNotification +import io.modelcontextprotocol.kotlin.sdk.types.ServerCapabilities +import io.modelcontextprotocol.kotlin.sdk.types.TextResourceContents +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class ServerResourcesNotificationTest : AbstractServerFeaturesTest() { + + override fun getServerCapabilities(): ServerCapabilities = ServerCapabilities( + resources = ServerCapabilities.Resources(true, null), + ) + + @Test + fun `addResource should send notification`() = runTest { + // Configure notification handler + var resourceListChangedNotificationReceived = false + client.setNotificationHandler( + Method.Defined.NotificationsResourcesListChanged, + ) { + resourceListChangedNotificationReceived = true + CompletableDeferred(Unit) + } + + // Add a resource + val testResourceUri = "test://resource" + server.addResource( + uri = testResourceUri, + name = "Test Resource", + description = "A test resource", + mimeType = "text/plain", + ) { + ReadResourceResult( + contents = listOf( + TextResourceContents( + text = "Test resource content", + uri = testResourceUri, + mimeType = "text/plain", + ), + ), + ) + } + + // Remove the resource + val result = server.removeResource(testResourceUri) + // Close the server to stop processing further events and flush notifications + server.close() + + // Verify the resource was removed + assertTrue(result, "Resource should be removed successfully") + + // Verify that the notification was sent + assertTrue(resourceListChangedNotificationReceived, "Notification should be sent when resource is added") + } + + @Test + fun `removeResources should remove multiple resources and send two notifications`() = runTest { + // Configure notification handler + var resourceListChangedNotificationReceivedCount = 0 + client.setNotificationHandler( + Method.Defined.NotificationsResourcesListChanged, + ) { + resourceListChangedNotificationReceivedCount += 1 + CompletableDeferred(Unit) + } + + // Add resources + val testResourceUri1 = "test://resource1" + val testResourceUri2 = "test://resource2" + + server.addResource( + uri = testResourceUri1, + name = "Test Resource 1", + description = "A test resource 1", + mimeType = "text/plain", + ) { + ReadResourceResult( + contents = listOf( + TextResourceContents( + text = "Test resource content 1", + uri = testResourceUri1, + mimeType = "text/plain", + ), + ), + ) + } + server.addResource( + uri = testResourceUri2, + name = "Test Resource 2", + description = "A test resource 2", + mimeType = "text/plain", + ) { + ReadResourceResult( + contents = listOf( + TextResourceContents( + text = "Test resource content 2", + uri = testResourceUri2, + mimeType = "text/plain", + ), + ), + ) + } + + // Remove the resources + val result = server.removeResources(listOf(testResourceUri1, testResourceUri2)) + // Close the server to stop processing further events and flush notifications + server.close() + + // Verify the resources were removed + assertEquals(2, result, "Both resources should be removed") + + // Verify that the notifications were sent twice + assertEquals( + 4, + resourceListChangedNotificationReceivedCount, + "Two notifications should be sent when resources are added and two when removed", + ) + } + + @Test + fun `notification should not be send when removed resource does not exists`() = runTest { + // Track notifications + var resourceListChangedNotificationReceived = false + client.setNotificationHandler( + Method.Defined.NotificationsResourcesListChanged, + ) { + resourceListChangedNotificationReceived = true + CompletableDeferred(Unit) + } + + // Try to remove a non-existent resource + val result = server.removeResource("non-existent-resource") + // Close the server to stop processing further events and flush notifications + server.close() + + // Verify the result + assertFalse(result, "Removing non-existent resource should return false") + assertFalse( + resourceListChangedNotificationReceived, + "No notification should be sent when resource doesn't exist", + ) + } +} diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesTest.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesTest.kt index ad73c834..fafc3c86 100644 --- a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesTest.kt +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesTest.kt @@ -21,7 +21,7 @@ class ServerResourcesTest : AbstractServerFeaturesTest() { ) @Test - fun `removeResource should remove a resource and send notification`() = runTest { + fun `removeResource should remove a resource`() = runTest { // Add a resource val testResourceUri = "test://resource" server.addResource( @@ -49,7 +49,7 @@ class ServerResourcesTest : AbstractServerFeaturesTest() { } @Test - fun `removeResources should remove multiple resources and send notification`() = runTest { + fun `removeResources should remove multiple resources`() = runTest { // Add resources val testResourceUri1 = "test://resource1" val testResourceUri2 = "test://resource2" diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsNotificationTest.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsNotificationTest.kt new file mode 100644 index 00000000..b5a80186 --- /dev/null +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsNotificationTest.kt @@ -0,0 +1,99 @@ +package io.modelcontextprotocol.kotlin.sdk.server + +import io.modelcontextprotocol.kotlin.sdk.types.CallToolResult +import io.modelcontextprotocol.kotlin.sdk.types.Method +import io.modelcontextprotocol.kotlin.sdk.types.ServerCapabilities +import io.modelcontextprotocol.kotlin.sdk.types.TextContent +import io.modelcontextprotocol.kotlin.sdk.types.ToolListChangedNotification +import io.modelcontextprotocol.kotlin.sdk.types.ToolSchema +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class ServerToolsNotificationTest : AbstractServerFeaturesTest() { + + override fun getServerCapabilities(): ServerCapabilities = ServerCapabilities( + tools = ServerCapabilities.Tools(true), + ) + + @Test + fun `addTool should send notification`() = runTest { + // Configure notification handler + var toolListChangedNotificationReceived = false + client.setNotificationHandler(Method.Defined.NotificationsToolsListChanged) { + toolListChangedNotificationReceived = true + CompletableDeferred(Unit) + } + + // Add a tool + server.addTool("test-tool", "Test Tool", ToolSchema()) { + CallToolResult(listOf(TextContent("Test result"))) + } + + // Remove the tool + val result = server.removeTool("test-tool") + // Close the server to stop processing further events and flush notifications + server.close() + + // Verify the tool was removed + assertTrue(result, "Tool should be removed successfully") + + // Verify that the notification was sent + assertTrue(toolListChangedNotificationReceived, "Notification should be sent when tool is added") + } + + @Test + fun `removeTools should remove multiple tools and send two notifications`() = runTest { + // Configure notification handler + var toolListChangedNotificationReceivedCount = 0 + client.setNotificationHandler(Method.Defined.NotificationsToolsListChanged) { + toolListChangedNotificationReceivedCount += 1 + CompletableDeferred(Unit) + } + + // Add tools + server.addTool("test-tool-1", "Test Tool 1") { + CallToolResult(listOf(TextContent("Test result 1"))) + } + server.addTool("test-tool-2", "Test Tool 2") { + CallToolResult(listOf(TextContent("Test result 2"))) + } + + // Remove the tools + val result = server.removeTools(listOf("test-tool-1", "test-tool-2")) + // Close the server to stop processing further events and flush notifications + server.close() + + // Verify the tools were removed + assertEquals(2, result, "Both tools should be removed") + + // Verify that the notifications were sent twice + assertEquals( + 4, + toolListChangedNotificationReceivedCount, + "Two notifications should be sent when tools are added and two when removed", + ) + } + + @Test + fun `notification should not be send when removed tool does not exists`() = runTest { + // Track notifications + var toolListChangedNotificationReceived = false + client.setNotificationHandler(Method.Defined.NotificationsToolsListChanged) { + toolListChangedNotificationReceived = true + CompletableDeferred(Unit) + } + + // Try to remove a non-existent tool + val result = server.removeTool("non-existent-tool") + // Close the server to stop processing further events and flush notifications + server.close() + + // Verify the result + assertFalse(result, "Removing non-existent tool should return false") + assertFalse(toolListChangedNotificationReceived, "No notification should be sent when tool doesn't exist") + } +} diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsTest.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsTest.kt index eae2eb23..c2e82015 100644 --- a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsTest.kt +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsTest.kt @@ -22,7 +22,14 @@ class ServerToolsTest : AbstractServerFeaturesTest() { ) @Test - fun `removeTool should remove a tool`() = runTest { + fun `removeTool should remove a tool and do not send notification`() = runTest { + // Configure notification handler + var toolListChangedNotificationReceived = false + client.setNotificationHandler(Method.Defined.NotificationsToolsListChanged) { + toolListChangedNotificationReceived = true + CompletableDeferred(Unit) + } + // Add a tool server.addTool("test-tool", "Test Tool", ToolSchema()) { CallToolResult(listOf(TextContent("Test result"))) @@ -33,6 +40,45 @@ class ServerToolsTest : AbstractServerFeaturesTest() { // Verify the tool was removed assertTrue(result, "Tool should be removed successfully") + + // Verify that the notification was not sent + assertFalse( + toolListChangedNotificationReceived, + "No notification should be sent when tools capability is not supported", + ) + } + + @Test + fun `removeTools should remove multiple tools`() = runTest { + // Configure notification handler + var toolListChangedNotificationReceived = false + client.setNotificationHandler(Method.Defined.NotificationsToolsListChanged) { + toolListChangedNotificationReceived = true + CompletableDeferred(Unit) + } + + // Add tools + server.addTool("test-tool-1", "Test Tool 1") { + CallToolResult(listOf(TextContent("Test result 1"))) + } + server.addTool("test-tool-2", "Test Tool 2") { + CallToolResult(listOf(TextContent("Test result 2"))) + } + client.setNotificationHandler(Method.Defined.NotificationsToolsListChanged) { + throw IllegalStateException("Notification should not be sent") + } + + // Remove the tools + val result = server.removeTools(listOf("test-tool-1", "test-tool-2")) + + // Verify the tools were removed + assertEquals(2, result, "Both tools should be removed") + + // Verify that the notification was not sent + assertFalse( + toolListChangedNotificationReceived, + "No notification should be sent when tools capability is not supported", + ) } @Test @@ -54,6 +100,12 @@ class ServerToolsTest : AbstractServerFeaturesTest() { @Test fun `removeTool should throw when tools capability is not supported`() = runTest { + var toolListChangedNotificationReceived = false + client.setNotificationHandler(Method.Defined.NotificationsToolsListChanged) { + toolListChangedNotificationReceived = true + CompletableDeferred(Unit) + } + // Create server without tools capability val serverOptions = ServerOptions( capabilities = ServerCapabilities(), @@ -68,22 +120,11 @@ class ServerToolsTest : AbstractServerFeaturesTest() { server.removeTool("test-tool") } assertEquals("Server does not support tools capability.", exception.message) - } - @Test - fun `removeTools should remove multiple tools`() = runTest { - // Add tools - server.addTool("test-tool-1", "Test Tool 1") { - CallToolResult(listOf(TextContent("Test result 1"))) - } - server.addTool("test-tool-2", "Test Tool 2") { - CallToolResult(listOf(TextContent("Test result 2"))) - } - - // Remove the tools - val result = server.removeTools(listOf("test-tool-1", "test-tool-2")) - - // Verify the tools were removed - assertEquals(2, result, "Both tools should be removed") + // Verify that the notification was not sent + assertFalse( + toolListChangedNotificationReceived, + "No notification should be sent when tools capability is not supported", + ) } } From bb313f98e8f8f85fb517f563f38b4bd774793588 Mon Sep 17 00:00:00 2001 From: Maria Tigina Date: Mon, 1 Dec 2025 14:17:23 +0100 Subject: [PATCH 04/12] Fix notification timestamp comparison --- .../sdk/server/FeatureNotificationService.kt | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt index 252898d4..1f48299f 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt @@ -7,6 +7,8 @@ import io.modelcontextprotocol.kotlin.sdk.types.ResourceListChangedNotification import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotification import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotificationParams import io.modelcontextprotocol.kotlin.sdk.types.ToolListChangedNotification +import kotlin.time.Clock +import kotlin.time.ExperimentalTime import kotlinx.atomicfu.atomic import kotlinx.atomicfu.getAndUpdate import kotlinx.collections.immutable.persistentMapOf @@ -23,8 +25,6 @@ import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch -import kotlin.time.Clock -import kotlin.time.ExperimentalTime /** Represents an event for notification service. */ private sealed class Event @@ -66,7 +66,7 @@ private class SessionNotificationJob { events.takeWhile { it !is EndEvent }.collect { event -> when (event) { is NotificationEvent -> { - if (event.timestamp > fromTimestamp) { + if (event.timestamp >= fromTimestamp) { when (val notification = event.notification) { is PromptListChangedNotification -> { logger.info { @@ -91,22 +91,24 @@ private class SessionNotificationJob { is ResourceUpdatedNotification -> { resourceSubscriptions.value[notification.params.uri]?.let { resourceFromTimestamp -> - if (event.timestamp > resourceFromTimestamp) { + if (event.timestamp >= resourceFromTimestamp) { logger.info { - "Sending notification for resource ${notification.params.uri} " + + "Sending resource updated notification for resource " + + "${notification.params.uri} " + "to sessionId: ${session.sessionId}" } session.notification(notification) } else { logger.info { - "Skipping notification for resource ${notification.params.uri} " + + "Skipping resource updated notification for resource " + + "${notification.params.uri} " + "as it is older than subscription timestamp $resourceFromTimestamp" } } } ?: run { logger.info { "No subscription for resource ${notification.params.uri}. " + - "Skipping notification." + "Skipping notification: $notification" } } } @@ -118,7 +120,7 @@ private class SessionNotificationJob { } else { logger.info { "Skipping event with id: ${event.timestamp} " + - "as it is older than startingEventId $fromTimestamp" + "as it is older than startingEventId $fromTimestamp: $event" } } } From dcee1c546855ec6ff9d44a5c198fded94bf606c1 Mon Sep 17 00:00:00 2001 From: Maria Tigina Date: Mon, 1 Dec 2025 15:26:34 +0100 Subject: [PATCH 05/12] Fix linter --- .../kotlin/sdk/server/FeatureNotificationService.kt | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt index 1f48299f..b66ce91f 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt @@ -7,8 +7,6 @@ import io.modelcontextprotocol.kotlin.sdk.types.ResourceListChangedNotification import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotification import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotificationParams import io.modelcontextprotocol.kotlin.sdk.types.ToolListChangedNotification -import kotlin.time.Clock -import kotlin.time.ExperimentalTime import kotlinx.atomicfu.atomic import kotlinx.atomicfu.getAndUpdate import kotlinx.collections.immutable.persistentMapOf @@ -25,6 +23,8 @@ import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch +import kotlin.time.Clock +import kotlin.time.ExperimentalTime /** Represents an event for notification service. */ private sealed class Event @@ -386,7 +386,6 @@ internal class FeatureNotificationService( // Making sure all session notification jobs are completed (after receiving end event) sessionNotificationJobs.value.values.forEach { it.join() } - // Cancelling notification scope to stop processing further events notificationScope.cancel() } From 846c14a0c666cd49d456536bf1309ce5f27429af Mon Sep 17 00:00:00 2001 From: Maria Tigina Date: Mon, 1 Dec 2025 15:59:36 +0100 Subject: [PATCH 06/12] Refactor FeatureNotificationService --- .../sdk/server/FeatureNotificationService.kt | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt index b66ce91f..963845be 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt @@ -26,19 +26,22 @@ import kotlinx.coroutines.launch import kotlin.time.Clock import kotlin.time.ExperimentalTime -/** Represents an event for notification service. */ -private sealed class Event +/** + * Represents an event for the notification service. + * + * @property timestamp A timestamp for the event. + */ +private sealed class NotificationEvent(open val timestamp: Long) /** * Represents an event for a notification. * - * @property timestamp A timestamp for the event. * @property notification The notification associated with the event. */ -private class NotificationEvent(val timestamp: Long, val notification: Notification) : Event() +private class SendEvent(override val timestamp: Long, val notification: Notification) : NotificationEvent(timestamp) /** Represents an event marking the end of notification processing. */ -private class EndEvent : Event() +private class EndEvent(override val timestamp: Long) : NotificationEvent(timestamp) /** * Represents a job that handles session-specific notifications, processing events @@ -48,7 +51,7 @@ private class EndEvent : Event() * based on the event type and the resource subscriptions associated with the session. * It allows subscribing to or unsubscribing from specific resource keys for granular * notification handling. The job can also be canceled to stop processing further events. - * IDs less than or equal to this value will be skipped. + * Notification with timestamps older than the starting timestamp are skipped. */ private class SessionNotificationJob { private val job: Job @@ -58,14 +61,14 @@ private class SessionNotificationJob { constructor( session: ServerSession, scope: CoroutineScope, - events: SharedFlow, + events: SharedFlow, fromTimestamp: Long, ) { logger.info { "Starting notification job from timestamp $fromTimestamp for sessionId: ${session.sessionId} " } job = scope.launch { events.takeWhile { it !is EndEvent }.collect { event -> when (event) { - is NotificationEvent -> { + is SendEvent -> { if (event.timestamp >= fromTimestamp) { when (val notification = event.notification) { is PromptListChangedNotification -> { @@ -186,18 +189,19 @@ internal class FeatureNotificationService( @OptIn(ExperimentalTime::class) private val clock: Clock = Clock.System, ) { + /** Coroutine scope used to handle asynchronous notifications. */ + private val notificationScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + + /** Flag indicating whether the service is closing. */ private val closingService = atomic(false) /** Shared flow used to emit events within the feature notification service. */ - private val notificationEvents = MutableSharedFlow( + private val notificationEvents = MutableSharedFlow( extraBufferCapacity = 100, replay = 0, onBufferOverflow = BufferOverflow.SUSPEND, ) - /** Coroutine scope used to handle asynchronous notifications. */ - private val notificationScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) - /** Active emit jobs. */ private val activeEmitJobs = atomic(persistentSetOf()) @@ -270,6 +274,10 @@ internal class FeatureNotificationService( logger.info { "Subscribing session for notifications sessionId: ${session.sessionId}" } val timestamp = getCurrentTimestamp() + if (closingService.value) { + logger.warn { "Skipping subscription notification as service is closing: ${session.sessionId}" } + return + } sessionNotificationJobs.getAndUpdate { if (it.containsKey(session.sessionId)) { @@ -350,7 +358,7 @@ internal class FeatureNotificationService( // Launching emit lazily to put it to the jobs queue before the completion val job = notificationScope.launch(start = CoroutineStart.LAZY) { logger.info { "Actually emitting notification $timestamp: $notification" } - notificationEvents.emit(NotificationEvent(timestamp, notification)) + notificationEvents.emit(SendEvent(timestamp, notification)) logger.info { "Notification emitted $timestamp: $notification" } } @@ -380,7 +388,7 @@ internal class FeatureNotificationService( // Emitting end event to complete all session notification jobs notificationScope.launch { logger.info { "Emitting end event" } - notificationEvents.emit(EndEvent()) + notificationEvents.emit(EndEvent(getCurrentTimestamp())) logger.info { "End event emitted" } }.join() From 89450e591907a0652d5c483ac6ab2769bb435254 Mon Sep 17 00:00:00 2001 From: Maria Tigina Date: Tue, 2 Dec 2025 13:25:28 +0100 Subject: [PATCH 07/12] Use awaitility for testing notifications and remove unnecessary server.close() calls --- .../sdk/server/FeatureNotificationService.kt | 90 +++++++------------ .../kotlin/sdk/server/Server.kt | 8 +- .../server/ServerPromptsNotificationTest.kt | 31 ++++--- ...erverResourcesNotificationSubscribeTest.kt | 22 ++--- .../server/ServerResourcesNotificationTest.kt | 24 ++--- .../sdk/server/ServerToolsNotificationTest.kt | 23 ++--- 6 files changed, 89 insertions(+), 109 deletions(-) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt index 963845be..d63e4782 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt @@ -6,6 +6,7 @@ import io.modelcontextprotocol.kotlin.sdk.types.PromptListChangedNotification import io.modelcontextprotocol.kotlin.sdk.types.ResourceListChangedNotification import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotification import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotificationParams +import io.modelcontextprotocol.kotlin.sdk.types.ServerNotification import io.modelcontextprotocol.kotlin.sdk.types.ToolListChangedNotification import kotlinx.atomicfu.atomic import kotlinx.atomicfu.getAndUpdate @@ -18,6 +19,7 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.takeWhile @@ -58,6 +60,17 @@ private class SessionNotificationJob { private val resourceSubscriptions = atomic(persistentMapOf()) private val logger = KotlinLogging.logger {} + /** + * Constructor for the SessionNotificationJob, responsible for processing notification events + * and dispatching appropriate notifications to the provided server session. The job operates + * within the given coroutine scope and begins handling events starting from the specified + * timestamp. + * + * @param session The server session where notifications will be dispatched. + * @param scope The coroutine scope in which this job operates. + * @param events A shared flow of notification events that the job listens to. + * @param fromTimestamp The timestamp from which the job starts processing events. + */ constructor( session: ServerSession, scope: CoroutineScope, @@ -71,23 +84,12 @@ private class SessionNotificationJob { is SendEvent -> { if (event.timestamp >= fromTimestamp) { when (val notification = event.notification) { - is PromptListChangedNotification -> { - logger.info { - "Sending prompt list changed notification for sessionId: ${session.sessionId}" - } - session.notification(notification) - } - - is ResourceListChangedNotification -> { + is PromptListChangedNotification, + is ResourceListChangedNotification, + is ToolListChangedNotification, + -> { logger.info { - "Sending resourse list changed notification for sessionId: ${session.sessionId}" - } - session.notification(notification) - } - - is ToolListChangedNotification -> { - logger.info { - "Sending tool list changed notification for sessionId: ${session.sessionId}" + "Sending list changed notification for sessionId: ${session.sessionId}" } session.notification(notification) } @@ -186,6 +188,7 @@ private class SessionNotificationJob { * - Resource updates pertaining to specific resources. */ internal class FeatureNotificationService( + notificationBufferCapacity: Int = Channel.UNLIMITED, @OptIn(ExperimentalTime::class) private val clock: Clock = Clock.System, ) { @@ -197,8 +200,7 @@ internal class FeatureNotificationService( /** Shared flow used to emit events within the feature notification service. */ private val notificationEvents = MutableSharedFlow( - extraBufferCapacity = 100, - replay = 0, + extraBufferCapacity = notificationBufferCapacity, onBufferOverflow = BufferOverflow.SUSPEND, ) @@ -210,57 +212,27 @@ internal class FeatureNotificationService( private val logger = KotlinLogging.logger {} - /** Listener for tool feature events. */ - private val toolListChangedListener: FeatureListener by lazy { + private fun featureListener(notificationProvider: (FeatureKey) -> ServerNotification): FeatureListener = object : FeatureListener { override fun onFeatureUpdated(featureKey: FeatureKey) { - logger.info { "Emitting tool list changed notification" } - emit(ToolListChangedNotification()) + val notification = notificationProvider(featureKey) + logger.info { "Emitting notification: ${notification.method.value}" } + emit(notification) } } - } + + /** Listener for tool feature events. */ + internal val toolListChangedListener: FeatureListener = featureListener { ToolListChangedNotification() } /** Listener for prompt feature events. */ - private val promptListChangeListener: FeatureListener by lazy { - object : FeatureListener { - override fun onFeatureUpdated(featureKey: FeatureKey) { - logger.info { "Emitting prompt list changed notification" } - emit(PromptListChangedNotification()) - } - } - } + internal val promptListChangedListener: FeatureListener = featureListener { PromptListChangedNotification() } /** Listener for resource feature events. */ - private val resourceListChangedListener: FeatureListener by lazy { - object : FeatureListener { - override fun onFeatureUpdated(featureKey: FeatureKey) { - logger.info { "Emitting resource list changed notification" } - emit(ResourceListChangedNotification()) - } - } - } - - /** Listener for resource update events. */ - private val resourceUpdatedListener: FeatureListener by lazy { - object : FeatureListener { - override fun onFeatureUpdated(featureKey: FeatureKey) { - logger.info { "Emitting resource updated notification for feature key: $featureKey" } - emit(ResourceUpdatedNotification(ResourceUpdatedNotificationParams(uri = featureKey))) - } - } - } - - /** Listener for the tool list changed events. */ - internal fun getToolListChangedListener(): FeatureListener = toolListChangedListener - - /** Listener for the prompt list changed events. */ - internal fun getPromptListChangedListener(): FeatureListener = promptListChangeListener - - /** Listener for the resource list changed events. */ - internal fun getResourceListChangedListener(): FeatureListener = resourceListChangedListener + internal val resourceListChangedListener: FeatureListener = featureListener { ResourceListChangedNotification() } /** Listener for resource update events. */ - internal fun getResourceUpdateListener(): FeatureListener = resourceUpdatedListener + internal val resourceUpdatedListener: FeatureListener = + featureListener { ResourceUpdatedNotification(ResourceUpdatedNotificationParams(uri = it)) } /** * Subscribes session to list changed notifications for all features and resource update notifications. diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt index 57d82ef4..97d309dc 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt @@ -113,20 +113,20 @@ public open class Server( private val toolRegistry = FeatureRegistry("Tool").apply { if (options.capabilities.tools?.listChanged ?: false) { - addListener(notificationService.getToolListChangedListener()) + addListener(notificationService.toolListChangedListener) } } private val promptRegistry = FeatureRegistry("Prompt").apply { if (options.capabilities.prompts?.listChanged ?: false) { - addListener(notificationService.getPromptListChangedListener()) + addListener(notificationService.promptListChangedListener) } } private val resourceRegistry = FeatureRegistry("Resource").apply { if (options.capabilities.resources?.listChanged ?: false) { - addListener(notificationService.getResourceListChangedListener()) + addListener(notificationService.resourceListChangedListener) } if (options.capabilities.resources?.subscribe ?: false) { - addListener(notificationService.getResourceUpdateListener()) + addListener(notificationService.resourceUpdatedListener) } } diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerPromptsNotificationTest.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerPromptsNotificationTest.kt index 84036466..e2cfe294 100644 --- a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerPromptsNotificationTest.kt +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerPromptsNotificationTest.kt @@ -7,6 +7,8 @@ import io.modelcontextprotocol.kotlin.sdk.types.PromptListChangedNotification import io.modelcontextprotocol.kotlin.sdk.types.ServerCapabilities import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.test.runTest +import org.awaitility.kotlin.await +import org.awaitility.kotlin.untilAsserted import org.junit.jupiter.api.Test import kotlin.test.assertEquals import kotlin.test.assertFalse @@ -38,14 +40,14 @@ class ServerPromptsNotificationTest : AbstractServerFeaturesTest() { // Remove the prompt val result = server.removePrompt(testPrompt.name) - // Close the server to stop processing further events and flush notifications - server.close() // Verify the prompt was removed assertTrue(result, "Prompt should be removed successfully") // Verify that the notification was sent - assertTrue(promptListChangedNotificationReceived, "Notification should be sent when prompt is added") + await untilAsserted { + assertTrue(promptListChangedNotificationReceived, "Notification should be sent when prompt is added") + } } @Test @@ -76,18 +78,18 @@ class ServerPromptsNotificationTest : AbstractServerFeaturesTest() { // Remove the prompts val result = server.removePrompts(listOf(testPrompt1.name, testPrompt2.name)) - // Close the server to stop processing further events and flush notifications - server.close() // Verify the prompts were removed assertEquals(2, result, "Both prompts should be removed") // Verify that the notifications were sent twice - assertEquals( - 4, - promptListChangedNotificationReceivedCount, - "Two notifications should be sent when prompts are added and two when removed", - ) + await untilAsserted { + assertEquals( + 4, + promptListChangedNotificationReceivedCount, + "Two notifications should be sent when prompts are added and two when removed", + ) + } } @Test @@ -101,11 +103,14 @@ class ServerPromptsNotificationTest : AbstractServerFeaturesTest() { // Try to remove a non-existent prompt val result = server.removePrompt("non-existent-prompt") - // Close the server to stop processing further events and flush notifications - server.close() // Verify the result assertFalse(result, "Removing non-existent prompt should return false") - assertFalse(promptListChangedNotificationReceived, "No notification should be sent when prompt doesn't exist") + await untilAsserted { + assertFalse( + promptListChangedNotificationReceived, + "No notification should be sent when prompt doesn't exist", + ) + } } } diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationSubscribeTest.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationSubscribeTest.kt index 4948ecf7..62844501 100644 --- a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationSubscribeTest.kt +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationSubscribeTest.kt @@ -10,6 +10,8 @@ import io.modelcontextprotocol.kotlin.sdk.types.SubscribeRequestParams import io.modelcontextprotocol.kotlin.sdk.types.TextResourceContents import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.test.runTest +import org.awaitility.kotlin.await +import org.awaitility.kotlin.untilAsserted import org.junit.jupiter.api.Test import kotlin.test.assertEquals import kotlin.test.assertFalse @@ -26,7 +28,6 @@ class ServerResourcesNotificationSubscribeTest : AbstractServerFeaturesTest() { val notifications = mutableListOf() client.setNotificationHandler(Method.Defined.NotificationsResourcesUpdated) { notifications.add(it) - println(it) CompletableDeferred(Unit) } @@ -72,15 +73,14 @@ class ServerResourcesNotificationSubscribeTest : AbstractServerFeaturesTest() { // Remove the resource val result = server.removeResource(testResourceUri1) - // Close the server to stop processing further events and flush notifications - server.close() // Verify the resource was removed assertTrue(result, "Resource should be removed successfully") - println(notifications.map { it.params.uri }) // Verify that the notification was sent - assertEquals(1, notifications.size, "Notification should be sent when resource 1 was deleted") + await untilAsserted { + assertEquals(1, notifications.size, "Notification should be sent when resource 1 was deleted") + } assertEquals(testResourceUri1, notifications[0].params.uri, "Notification should contain the resource 1 URI") } @@ -136,8 +136,6 @@ class ServerResourcesNotificationSubscribeTest : AbstractServerFeaturesTest() { // Remove the resource val result1 = server.removeResource(testResourceUri1) val result2 = server.removeResource(testResourceUri2) - // Close the server to stop processing further events and flush notifications - server.close() // Verify the resource was removed assertTrue(result1, "Resource 1 should be removed successfully") @@ -145,7 +143,13 @@ class ServerResourcesNotificationSubscribeTest : AbstractServerFeaturesTest() { println(notifications.map { it.params.uri }) // Verify that the notification was sent - assertEquals(2, notifications.size, "Notification should be sent when resource 1 and resource 2 was deleted") + await untilAsserted { + assertEquals( + 2, + notifications.size, + "Notification should be sent when resource 1 and resource 2 was deleted", + ) + } val deletedResources = listOf(notifications[0].params.uri, notifications[1].params.uri) assertTrue( @@ -171,8 +175,6 @@ class ServerResourcesNotificationSubscribeTest : AbstractServerFeaturesTest() { // Try to remove a non-existent resource val result = server.removeResource("non-existent-resource") - // Close the server to stop processing further events and flush notifications - server.close() // Verify the result assertFalse(result, "Removing non-existent resource should return false") diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationTest.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationTest.kt index ab284a66..0267549c 100644 --- a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationTest.kt +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationTest.kt @@ -11,6 +11,8 @@ import org.junit.jupiter.api.Test import kotlin.test.assertEquals import kotlin.test.assertFalse import kotlin.test.assertTrue +import org.awaitility.kotlin.await +import org.awaitility.kotlin.untilAsserted class ServerResourcesNotificationTest : AbstractServerFeaturesTest() { @@ -50,14 +52,14 @@ class ServerResourcesNotificationTest : AbstractServerFeaturesTest() { // Remove the resource val result = server.removeResource(testResourceUri) - // Close the server to stop processing further events and flush notifications - server.close() // Verify the resource was removed assertTrue(result, "Resource should be removed successfully") // Verify that the notification was sent - assertTrue(resourceListChangedNotificationReceived, "Notification should be sent when resource is added") + await untilAsserted { + assertTrue(resourceListChangedNotificationReceived, "Notification should be sent when resource is added") + } } @Test @@ -110,18 +112,18 @@ class ServerResourcesNotificationTest : AbstractServerFeaturesTest() { // Remove the resources val result = server.removeResources(listOf(testResourceUri1, testResourceUri2)) - // Close the server to stop processing further events and flush notifications - server.close() // Verify the resources were removed assertEquals(2, result, "Both resources should be removed") // Verify that the notifications were sent twice - assertEquals( - 4, - resourceListChangedNotificationReceivedCount, - "Two notifications should be sent when resources are added and two when removed", - ) + await untilAsserted { + assertEquals( + 4, + resourceListChangedNotificationReceivedCount, + "Two notifications should be sent when resources are added and two when removed", + ) + } } @Test @@ -137,8 +139,6 @@ class ServerResourcesNotificationTest : AbstractServerFeaturesTest() { // Try to remove a non-existent resource val result = server.removeResource("non-existent-resource") - // Close the server to stop processing further events and flush notifications - server.close() // Verify the result assertFalse(result, "Removing non-existent resource should return false") diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsNotificationTest.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsNotificationTest.kt index b5a80186..8a7b8bc9 100644 --- a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsNotificationTest.kt +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsNotificationTest.kt @@ -12,6 +12,8 @@ import org.junit.jupiter.api.Test import kotlin.test.assertEquals import kotlin.test.assertFalse import kotlin.test.assertTrue +import org.awaitility.kotlin.await +import org.awaitility.kotlin.untilAsserted class ServerToolsNotificationTest : AbstractServerFeaturesTest() { @@ -35,14 +37,14 @@ class ServerToolsNotificationTest : AbstractServerFeaturesTest() { // Remove the tool val result = server.removeTool("test-tool") - // Close the server to stop processing further events and flush notifications - server.close() // Verify the tool was removed assertTrue(result, "Tool should be removed successfully") // Verify that the notification was sent - assertTrue(toolListChangedNotificationReceived, "Notification should be sent when tool is added") + await untilAsserted { + assertTrue(toolListChangedNotificationReceived, "Notification should be sent when tool is added") + } } @Test @@ -64,18 +66,17 @@ class ServerToolsNotificationTest : AbstractServerFeaturesTest() { // Remove the tools val result = server.removeTools(listOf("test-tool-1", "test-tool-2")) - // Close the server to stop processing further events and flush notifications - server.close() - // Verify the tools were removed assertEquals(2, result, "Both tools should be removed") // Verify that the notifications were sent twice - assertEquals( - 4, - toolListChangedNotificationReceivedCount, - "Two notifications should be sent when tools are added and two when removed", - ) + await untilAsserted { + assertEquals( + 4, + toolListChangedNotificationReceivedCount, + "Two notifications should be sent when tools are added and two when removed", + ) + } } @Test From fde0d7aecdc2841901e423f10d023f6827dba808 Mon Sep 17 00:00:00 2001 From: Maria Tigina Date: Tue, 2 Dec 2025 22:07:32 +0100 Subject: [PATCH 08/12] Fix linter --- .../kotlin/sdk/server/ServerResourcesNotificationTest.kt | 4 ++-- .../kotlin/sdk/server/ServerToolsNotificationTest.kt | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationTest.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationTest.kt index 0267549c..52a42613 100644 --- a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationTest.kt +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerResourcesNotificationTest.kt @@ -7,12 +7,12 @@ import io.modelcontextprotocol.kotlin.sdk.types.ServerCapabilities import io.modelcontextprotocol.kotlin.sdk.types.TextResourceContents import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.test.runTest +import org.awaitility.kotlin.await +import org.awaitility.kotlin.untilAsserted import org.junit.jupiter.api.Test import kotlin.test.assertEquals import kotlin.test.assertFalse import kotlin.test.assertTrue -import org.awaitility.kotlin.await -import org.awaitility.kotlin.untilAsserted class ServerResourcesNotificationTest : AbstractServerFeaturesTest() { diff --git a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsNotificationTest.kt b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsNotificationTest.kt index 8a7b8bc9..5892532e 100644 --- a/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsNotificationTest.kt +++ b/kotlin-sdk-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/ServerToolsNotificationTest.kt @@ -8,12 +8,12 @@ import io.modelcontextprotocol.kotlin.sdk.types.ToolListChangedNotification import io.modelcontextprotocol.kotlin.sdk.types.ToolSchema import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.test.runTest +import org.awaitility.kotlin.await +import org.awaitility.kotlin.untilAsserted import org.junit.jupiter.api.Test import kotlin.test.assertEquals import kotlin.test.assertFalse import kotlin.test.assertTrue -import org.awaitility.kotlin.await -import org.awaitility.kotlin.untilAsserted class ServerToolsNotificationTest : AbstractServerFeaturesTest() { From b1570ce87de09bdcd688665bd466c58d73f5292a Mon Sep 17 00:00:00 2001 From: Maria Tigina Date: Wed, 3 Dec 2025 14:10:18 +0100 Subject: [PATCH 09/12] Fix review comments Fix logging levels Extract send event handling --- .../sdk/server/FeatureNotificationService.kt | 119 ++++++++++-------- 1 file changed, 66 insertions(+), 53 deletions(-) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt index d63e4782..6f52e78a 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt @@ -82,52 +82,7 @@ private class SessionNotificationJob { events.takeWhile { it !is EndEvent }.collect { event -> when (event) { is SendEvent -> { - if (event.timestamp >= fromTimestamp) { - when (val notification = event.notification) { - is PromptListChangedNotification, - is ResourceListChangedNotification, - is ToolListChangedNotification, - -> { - logger.info { - "Sending list changed notification for sessionId: ${session.sessionId}" - } - session.notification(notification) - } - - is ResourceUpdatedNotification -> { - resourceSubscriptions.value[notification.params.uri]?.let { resourceFromTimestamp -> - if (event.timestamp >= resourceFromTimestamp) { - logger.info { - "Sending resource updated notification for resource " + - "${notification.params.uri} " + - "to sessionId: ${session.sessionId}" - } - session.notification(notification) - } else { - logger.info { - "Skipping resource updated notification for resource " + - "${notification.params.uri} " + - "as it is older than subscription timestamp $resourceFromTimestamp" - } - } - } ?: run { - logger.info { - "No subscription for resource ${notification.params.uri}. " + - "Skipping notification: $notification" - } - } - } - - else -> { - logger.warn { "Skipping notification: $notification" } - } - } - } else { - logger.info { - "Skipping event with id: ${event.timestamp} " + - "as it is older than startingEventId $fromTimestamp: $event" - } - } + handleSendNotificationEvent(event, session, fromTimestamp) } else -> { @@ -138,6 +93,58 @@ private class SessionNotificationJob { } } + /** + * Handles sending a notification event to a specific server session. + * + * @param event The notification event to be processed. + * @param session The server session to which the notification should be sent. + * @param fromTimestamp The timestamp to filter events. + * Notifications with timestamps older than this value are skipped. + */ + private suspend fun handleSendNotificationEvent(event: SendEvent, session: ServerSession, fromTimestamp: Long) { + if (event.timestamp < fromTimestamp) { + logger.info { + "Skipping event with id: ${event.timestamp} as it is older than startingEventId $fromTimestamp: $event" + } + return + } + when (val notification = event.notification) { + is PromptListChangedNotification, + is ResourceListChangedNotification, + is ToolListChangedNotification, + -> { + logger.info { "Sending list changed notification for sessionId: ${session.sessionId}" } + session.notification(notification) + } + + is ResourceUpdatedNotification -> { + resourceSubscriptions.value[notification.params.uri]?.let { resourceFromTimestamp -> + if (event.timestamp >= resourceFromTimestamp) { + logger.info { + "Sending resource updated notification for resource ${notification.params.uri} " + + "to sessionId: ${session.sessionId}" + } + session.notification(notification) + } else { + logger.info { + "Skipping resource updated notification for resource ${notification.params.uri} " + + "as it is older than subscription timestamp $resourceFromTimestamp" + } + } + } ?: { + logger.info { + "No subscription for resource ${notification.params.uri}. " + + "Skipping notification: $notification" + } + } + } + + else -> { + logger.warn { "Skipping notification: $notification" } + } + } + } + /** * Subscribes to a resource identified by the given feature key. * @@ -157,10 +164,16 @@ private class SessionNotificationJob { resourceSubscriptions.getAndUpdate { it.remove(resourceKey) } } + /** + * Waits for the notification service to complete its operations. + */ suspend fun join() { job.join() } + /** + * Cancels the notification service job. + */ fun cancel() { job.cancel() } @@ -216,7 +229,7 @@ internal class FeatureNotificationService( object : FeatureListener { override fun onFeatureUpdated(featureKey: FeatureKey) { val notification = notificationProvider(featureKey) - logger.info { "Emitting notification: ${notification.method.value}" } + logger.debug { "Emitting notification: ${notification.method.value}" } emit(notification) } } @@ -247,7 +260,7 @@ internal class FeatureNotificationService( val timestamp = getCurrentTimestamp() if (closingService.value) { - logger.warn { "Skipping subscription notification as service is closing: ${session.sessionId}" } + logger.debug { "Skipping subscription notification as service is closing: ${session.sessionId}" } return } @@ -321,7 +334,7 @@ internal class FeatureNotificationService( // Create a timestamp before emit to ensure notifications are processed in order val timestamp = getCurrentTimestamp() if (closingService.value) { - logger.warn { "Skipping emitting notification as service is closing: $notification" } + logger.debug { "Skipping emitting notification as service is closing: $notification" } return } @@ -329,9 +342,9 @@ internal class FeatureNotificationService( // Launching emit lazily to put it to the jobs queue before the completion val job = notificationScope.launch(start = CoroutineStart.LAZY) { - logger.info { "Actually emitting notification $timestamp: $notification" } + logger.debug { "Actually emitting notification $timestamp: $notification" } notificationEvents.emit(SendEvent(timestamp, notification)) - logger.info { "Notification emitted $timestamp: $notification" } + logger.debug { "Notification emitted $timestamp: $notification" } } // Add job to set before starting @@ -354,7 +367,7 @@ internal class FeatureNotificationService( logger.info { "Closing feature notification service" } closingService.compareAndSet(false, update = true) - // Making sure all emit jobs are completed + // Making sure all emitting jobs are completed activeEmitJobs.value.joinAll() // Emitting end event to complete all session notification jobs @@ -364,7 +377,7 @@ internal class FeatureNotificationService( logger.info { "End event emitted" } }.join() - // Making sure all session notification jobs are completed (after receiving end event) + // Making sure all session notification jobs are completed (after receiving the end event) sessionNotificationJobs.value.values.forEach { it.join() } // Cancelling notification scope to stop processing further events notificationScope.cancel() From 97df2661559b41e596f13ff032877f9b868466bd Mon Sep 17 00:00:00 2001 From: Maria Tigina Date: Wed, 3 Dec 2025 15:22:28 +0100 Subject: [PATCH 10/12] Remove unused closingService flag --- .../sdk/server/FeatureNotificationService.kt | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt index 6f52e78a..397e6e19 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt @@ -208,9 +208,6 @@ internal class FeatureNotificationService( /** Coroutine scope used to handle asynchronous notifications. */ private val notificationScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) - /** Flag indicating whether the service is closing. */ - private val closingService = atomic(false) - /** Shared flow used to emit events within the feature notification service. */ private val notificationEvents = MutableSharedFlow( extraBufferCapacity = notificationBufferCapacity, @@ -218,7 +215,7 @@ internal class FeatureNotificationService( ) /** Active emit jobs. */ - private val activeEmitJobs = atomic(persistentSetOf()) + private val emitJobs = atomic(persistentSetOf()) /** Notification jobs associated with sessions. */ private val sessionNotificationJobs = atomic(persistentMapOf()) @@ -259,10 +256,6 @@ internal class FeatureNotificationService( logger.info { "Subscribing session for notifications sessionId: ${session.sessionId}" } val timestamp = getCurrentTimestamp() - if (closingService.value) { - logger.debug { "Skipping subscription notification as service is closing: ${session.sessionId}" } - return - } sessionNotificationJobs.getAndUpdate { if (it.containsKey(session.sessionId)) { @@ -333,10 +326,6 @@ internal class FeatureNotificationService( private fun emit(notification: Notification) { // Create a timestamp before emit to ensure notifications are processed in order val timestamp = getCurrentTimestamp() - if (closingService.value) { - logger.debug { "Skipping emitting notification as service is closing: $notification" } - return - } logger.info { "Emitting notification $timestamp: $notification" } @@ -348,11 +337,11 @@ internal class FeatureNotificationService( } // Add job to set before starting - activeEmitJobs.getAndUpdate { it.add(job) } + emitJobs.getAndUpdate { it.add(job) } // Register completion job.invokeOnCompletion { - activeEmitJobs.getAndUpdate { it.remove(job) } + emitJobs.getAndUpdate { it.remove(job) } } // Start the job after it's safely added @@ -365,10 +354,9 @@ internal class FeatureNotificationService( suspend fun close() { logger.info { "Closing feature notification service" } - closingService.compareAndSet(false, update = true) // Making sure all emitting jobs are completed - activeEmitJobs.value.joinAll() + emitJobs.value.joinAll() // Emitting end event to complete all session notification jobs notificationScope.launch { From bbb28778b538aa0d44dfccc4b93b273dde342583 Mon Sep 17 00:00:00 2001 From: Maria Tigina Date: Wed, 3 Dec 2025 16:01:03 +0100 Subject: [PATCH 11/12] Update server kdoc with notification information --- .../kotlin/sdk/server/Server.kt | 52 +++++++++++++------ 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt index 97d309dc..9d53bbb3 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt @@ -63,6 +63,11 @@ public class ServerOptions(public val capabilities: ServerCapabilities, enforceS * You can register tools, prompts, and resources using [addTool], [addPrompt], and [addResource]. * The server will then automatically handle listing and retrieval requests from the client. * + * In case the server supports feature list notification or resource substitution, + * the server will automatically send notifications for all connected clients. + * Currently, after subscription to a resource, the server will NOT send the subscription configuration + * as this response shema is not defined in the protocol. + * * @param serverInfo Information about this server implementation (name, version). * @param options Configuration options for the server. * @param instructionsProvider Optional provider for instructions from the server to the client about how to use @@ -91,17 +96,6 @@ public open class Server( block: Server.() -> Unit = {}, ) : this(serverInfo, options, { instructions }, block) - private val sessionRegistry = ServerSessionRegistry() - - @OptIn(ExperimentalTime::class) - private val notificationService = FeatureNotificationService() - - /** - * Provides a snapshot of all sessions currently registered in the server - */ - public val sessions: Map - get() = sessionRegistry.sessions - @Suppress("ktlint:standard:backing-property-naming") private var _onInitialized: (() -> Unit) = {} @@ -111,6 +105,11 @@ public open class Server( @Suppress("ktlint:standard:backing-property-naming") private var _onClose: () -> Unit = {} + @OptIn(ExperimentalTime::class) + private val notificationService = FeatureNotificationService() + + private val sessionRegistry = ServerSessionRegistry() + private val toolRegistry = FeatureRegistry("Tool").apply { if (options.capabilities.tools?.listChanged ?: false) { addListener(notificationService.toolListChangedListener) @@ -130,10 +129,27 @@ public open class Server( } } + /** + * Provides a snapshot of all sessions currently registered in the server + */ + public val sessions: Map + get() = sessionRegistry.sessions + + /** + * Provides a snapshot of all tools currently registered in the server + */ public val tools: Map get() = toolRegistry.values + + /** + * Provides a snapshot of all prompts currently registered in the server + */ public val prompts: Map get() = promptRegistry.values + + /** + * Provides a snapshot of all resources currently registered in the server + */ public val resources: Map get() = resourceRegistry.values @@ -209,10 +225,12 @@ public open class Server( if (options.capabilities.resources?.subscribe ?: false) { session.setRequestHandler(Method.Defined.ResourcesSubscribe) { request, _ -> handleSubscribeResources(session, request) + // Does not return any confirmation as the structure is not stated in the protocol null } session.setRequestHandler(Method.Defined.ResourcesUnsubscribe) { request, _ -> handleUnsubscribeResources(session, request) + // Does not return any confirmation as the structure is not stated in the protocol null } } @@ -521,7 +539,7 @@ public open class Server( } // --- Internal Handlers --- - private suspend fun handleSubscribeResources(session: ServerSession, request: SubscribeRequest) { + private fun handleSubscribeResources(session: ServerSession, request: SubscribeRequest) { if (options.capabilities.resources?.subscribe ?: false) { logger.debug { "Subscribing to resources" } notificationService.subscribeToResourceUpdate(session, request.params.uri) @@ -530,7 +548,7 @@ public open class Server( } } - private suspend fun handleUnsubscribeResources(session: ServerSession, request: UnsubscribeRequest) { + private fun handleUnsubscribeResources(session: ServerSession, request: UnsubscribeRequest) { if (options.capabilities.resources?.subscribe ?: false) { logger.debug { "Unsubscribing from resources" } notificationService.unsubscribeFromResourceUpdate(session, request.params.uri) @@ -539,7 +557,7 @@ public open class Server( } } - private suspend fun handleListTools(): ListToolsResult { + private fun handleListTools(): ListToolsResult { val toolList = tools.values.map { it.tool } return ListToolsResult(tools = toolList, nextCursor = null) } @@ -573,7 +591,7 @@ public open class Server( } } - private suspend fun handleListPrompts(): ListPromptsResult { + private fun handleListPrompts(): ListPromptsResult { logger.debug { "Handling list prompts request" } return ListPromptsResult(prompts = prompts.values.map { it.prompt }) } @@ -589,7 +607,7 @@ public open class Server( return prompt.messageProvider(request) } - private suspend fun handleListResources(): ListResourcesResult { + private fun handleListResources(): ListResourcesResult { logger.debug { "Handling list resources request" } return ListResourcesResult(resources = resources.values.map { it.resource }) } @@ -605,7 +623,7 @@ public open class Server( return resource.readHandler(request) } - private suspend fun handleListResourceTemplates(): ListResourceTemplatesResult { + private fun handleListResourceTemplates(): ListResourceTemplatesResult { // If you have resource templates, return them here. For now, return empty. return ListResourceTemplatesResult(listOf()) } From 776dd959a98f6570b8f674a959e1e82bb23f4bb1 Mon Sep 17 00:00:00 2001 From: Maria Tigina Date: Wed, 3 Dec 2025 16:26:30 +0100 Subject: [PATCH 12/12] Fix typo --- .../io/modelcontextprotocol/kotlin/sdk/server/Server.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt index 9d53bbb3..ef52d728 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt @@ -65,8 +65,8 @@ public class ServerOptions(public val capabilities: ServerCapabilities, enforceS * * In case the server supports feature list notification or resource substitution, * the server will automatically send notifications for all connected clients. - * Currently, after subscription to a resource, the server will NOT send the subscription configuration - * as this response shema is not defined in the protocol. + * Currently, after subscription to a resource, the server will NOT send the subscription confirmation + * as this response schema is not defined in the protocol. * * @param serverInfo Information about this server implementation (name, version). * @param options Configuration options for the server.