Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public async ValueTask DisposeAsync()
public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default)
{
Throw.IfNull(message);
// If the underlying writer has been disposed, just drop the message.
await _sseWriter.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
}

Expand Down
10 changes: 5 additions & 5 deletions src/ModelContextProtocol.Core/Server/SseWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,22 @@ public Task WriteAllAsync(Stream sseResponseStream, CancellationToken cancellati
return _writeTask;
}

public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default)
public async Task<bool> SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default)
{
Throw.IfNull(message);

using var _ = await _disposeLock.LockAsync(cancellationToken).ConfigureAwait(false);

if (_disposed)
{
// Don't throw an ODE, because this is disposed internally when the transport disconnects due to an abort
// or sending all the responses for the a give given Streamable HTTP POST request, so the user might not be at fault.
// There's precedence for no-oping here similar to writing to the response body of an aborted request in ASP.NET Core.
return;
// Don't throw ObjectDisposedException here; just return false to indicate the message wasn't sent.
// The calling transport can determine what to do in this case (drop the message, or fall back to another transport).
return false;
}

// Emit redundant "event: message" lines for better compatibility with other SDKs.
await _messages.Writer.WriteAsync(new SseItem<JsonRpcMessage?>(message, SseParser.EventTypeDefault), cancellationToken).ConfigureAwait(false);
return true;
}

public async ValueTask DisposeAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,13 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can
throw new InvalidOperationException("Server to client requests are not supported in stateless mode.");
}

await _sseWriter.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
bool isAccepted = await _sseWriter.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
if (!isAccepted)
{
// The underlying writer didn't accept the message because the underlying request has completed.
// Rather than drop the message, fall back to sending it via the parent transport.
await parentTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
}
}

public async ValueTask DisposeAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can
throw new InvalidOperationException("Unsolicited server to client messages are not supported in stateless mode.");
}

// If the underlying writer has been disposed, just drop the message.
await _sseWriter.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,41 @@ public async Task IdleSessionsPastMaxIdleSessionCount_ArePruned_LongestIdleFirst
Assert.StartsWith("MaxIdleSessionCount of 2 exceeded. Closing idle session", idleLimitLogMessage.Message);
}

[Fact]
public async Task McpServer_UsedOutOfScope_CanSendNotifications()
{
McpServer? capturedServer = null;
Builder.Services.AddMcpServer()
.WithHttpTransport()
.WithListResourcesHandler((_, _) => ValueTask.FromResult(new ListResourcesResult()))
.WithSubscribeToResourcesHandler((context, token) =>
{
capturedServer = context.Server;
return ValueTask.FromResult(new EmptyResult());
});

await StartAsync();

string sessionId = await CallInitializeAndValidateAsync();
SetSessionId(sessionId);

// Call the subscribe method to capture the McpServer instance.
using var response = await HttpClient.PostAsync("", JsonContent(Request("resources/subscribe")), TestContext.Current.CancellationToken);
var rpcResponse = await AssertSingleSseResponseAsync(response);
AssertType<EmptyResult>(rpcResponse.Result);
Assert.NotNull(capturedServer);

// Check the captured McpServer instance can send a notification.
await capturedServer.SendNotificationAsync(NotificationMethods.ResourceUpdatedNotification, TestContext.Current.CancellationToken);
using var getResponse = await HttpClient.GetAsync("", HttpCompletionOption.ResponseHeadersRead, TestContext.Current.CancellationToken);
JsonRpcMessage? firstSseMessage = await ReadSseAsync(getResponse.Content)
.Select(data => JsonSerializer.Deserialize<JsonRpcMessage>(data, McpJsonUtilities.DefaultOptions))
.FirstOrDefaultAsync(TestContext.Current.CancellationToken);

var notification = Assert.IsType<JsonRpcNotification>(firstSseMessage);
Assert.Equal(NotificationMethods.ResourceUpdatedNotification, notification.Method);
}

private static StringContent JsonContent(string json) => new StringContent(json, Encoding.UTF8, "application/json");
private static JsonTypeInfo<T> GetJsonTypeInfo<T>() => (JsonTypeInfo<T>)McpJsonUtilities.DefaultOptions.GetTypeInfo(typeof(T));

Expand Down
Loading