Skip to content

Reject outstanding requests to dropped clients #9463

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

ReubenBond
Copy link
Member

@ReubenBond ReubenBond commented Apr 25, 2025

Fixes #9462

Microsoft Reviewers: Open in CodeFlow

@ReubenBond ReubenBond requested a review from Copilot April 25, 2025 23:44
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This pull request addresses the issue of rejecting outstanding requests to dropped clients by refining client disconnection handling and updating related test cases. Key changes include:

  • Fixing a spelling mistake in a comment from "immideately" to "immediately" in the timeout tests.
  • Introducing extensive client disconnection tests in the Tester project.
  • Updating methods in production code (SiloControl, Gateway, and others) to handle client disconnection more robustly and to use the new OnResponse callback pattern.

Reviewed Changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated no comments.

Show a summary per file
File Description
test/TesterInternal/TimeoutTests.cs Corrects a spelling mistake in a comment.
test/Tester/ClientConnectionTests/ClientDisconnectionTests.cs Adds tests to validate client disconnection scenarios and message rejections.
src/Orleans.TestingHost/InProcTestCluster.cs Introduces multi-client support and related nullability enhancements.
src/Orleans.Runtime/Silo/SiloControl.cs Adds a DropDisconnectedClients method using a new parameter to optionally exclude recent disconnects.
src/Orleans.Runtime/Networking/GatewayConnectionListener.cs Removes unused logger initialization for simplicity.
src/Orleans.Runtime/Messaging/Gateway.cs Implements new client message handling via WorkItemType and updates message tracking.
src/Orleans.Runtime/Core/ManagementGrain.cs Exposes a DropDisconnectedClients method to orchestrate client disconnection among silos.
src/Orleans.Runtime/Core/InsideRuntimeClient.cs, HostedClient.cs, OutsideRuntimeClient.cs, CallbackData.cs, EventSourceEvents.cs Updates callback methods from DoCallback to OnResponse and adjusts disposal behavior.
src/Orleans.Core/SystemTargetInterfaces/* Updates ISiloControl and IManagementGrain interfaces to reflect the new DropDisconnectedClients API.
src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs Minor refactoring by removing an unused injected dependency.
src/Orleans.Core/Networking/ConnectionManager.cs Updates the shutdown exception message for clarity.
src/Orleans.Core.Abstractions/IDs/ClientGrainId.cs Adds a helper method for client equality evaluation.

var message = callback.Value.Message;
var response = messageFactory.CreateRejectionResponse(message, Message.RejectionTypes.Unrecoverable, "Client is shutting down.", null);
callback.Value.OnResponse(response);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reject all outbound requests from a client when it is disposed. Future requests are already prevented by an ObjectDisposedException

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a longer message explaining that the message might have been sent but we don't wait the response because we are shutting down? Maybe it's too much

/// </summary>
/// <param name="excludeRecent">If true, only clients that have been disconnected for longer than the configured client expiration time will be dropped.</param>
/// <returns>A task representing the work performed.</returns>
Task DropDisconnectedClients(bool excludeRecent);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added for testing

@@ -183,8 +180,9 @@ public void SetComponent<TComponent>(TComponent? instance) where TComponent : cl
/// <inheritdoc />
public bool TryDispatchToClient(Message message)
{
if (!ClientGrainId.TryParse(message.TargetGrain, out var targetClient) || !this.ClientId.Equals(targetClient))
if (!ClientId.IsClientEqual(message.TargetGrain))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cheaper comparison (no allocs). We cannot do direct GrainId comparisons because observers on a client have different (extended) ids from the client itself.

var response = messageFactory.CreateRejectionResponse(message, Message.RejectionTypes.Unrecoverable, "Host is shutting down.", null);
callback.Value.OnResponse(response);
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the above, when a host is disposed, we reject all outbound requests.

@@ -280,7 +296,8 @@ private class ClientState
{
private readonly Gateway _gateway;
private readonly Task _messageLoop;
private readonly ConcurrentQueue<Message> _pendingToSend = new();
private readonly Dictionary<(GrainId, CorrelationId), Message> _outstandingRequestsToClient = [];
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the meat of the PR: tracking the outstanding requests to a client. We modify ClientState to track these requests and perform updates to that state via the message loop with the exception that when a client is dropped, we enumerate the data structures and reject all messages


// Exchange references, so each one has a reference to the other which is bound to its client.
var observerBId = observerB.SelfReference.GetGrainId();
var aToB = (IEchoGrainObserver)clientA.ServiceProvider.GetRequiredService<GrainReferenceActivator>().CreateReference(observerBId, GrainInterfaceType.Create("IEchoGrainObserver"));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit awkward. Since GrainReference is immutable, we don't rebind it from one runtime to another when we copy it, so if we were to pass it directly to the client, it would still have the runtime of the originator. It's not likely an issue in practice, but makes these testing scenarios where you have multiple clients in a single process awkward.

/// </summary>
/// <param name="other">The <see cref="GrainId"/> to compare.</param>
/// <returns><see langword="true"/> if the provided <see cref="GrainId"/> corresponds to the same client, otherwise <see langword="false"/>.</returns>
public bool IsClientEqual(GrainId other)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So IsClientEqual and Equals can return different result?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These GrainIds take the form sys.client/ClientId[+ObserverId]. When comparing them to see if we are pointing to the correct client, we want to only compare the ClientId component, not the ObserverId component.
IsClientEqual and Equals will return the same result because ClientGrainId strips off the +[ObserverId] portion in TryParse (and we don't have an Equals overload which accepts a GrainId directly). ClientGrainId.GrainId.Equals(otherGrainId) could return a different result because otherGrainId might still have the ObserverId portion.

@@ -187,6 +187,11 @@ internal SiloAddress TryToReroute(Message msg)

if (msg.Direction != Message.Directions.Response)
{
if (ClientGrainId.TryParse(msg.SendingGrain, out var clientId) && clients.TryGetValue(clientId, out var clientState))
{
clientState.OnClientResponse(msg);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it possible that the client is still connected to other silos in the cluster? Or is it an edge case?

Copy link
Member Author

@ReubenBond ReubenBond Apr 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the sender grain is hosted locally, this will catch the response.
If the sender grain is hosted remotely, this might not catch the response if the client sends the response via a different gateway.

Clients will send responses via the gateway that sent them the request unless that gateway becomes unavailable.

If we reject messages as soon as a client disconnects, that should address this.
Also, if the request expires, we should remove it from this dictionary. We could do this when scanning for dropped clients.

EDIT: The crossed-out bit above is not always true: we only override the SendingSilo to the gateway address if the SendingSilo wasn't already stamped on the message. Given that, we might need another solution. Can you think of any, @benjaminpetit?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Reject outstanding requests to dropped clients
2 participants