Reject outstanding requests to dropped clients #9463
base: main
Pull Request Overview
This pull request addresses the issue of rejecting outstanding requests to dropped clients by refining client disconnection handling and updating related test cases. Key changes include:
- Fixing a spelling mistake in a comment from "immideately" to "immediately" in the timeout tests.
- Introducing extensive client disconnection tests in the Tester project.
- Updating methods in production code (SiloControl, Gateway, and others) to handle client disconnection more robustly and to use the new OnResponse callback pattern.
Reviewed Changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated no comments.
File | Description |
---|---|
test/TesterInternal/TimeoutTests.cs | Corrects a spelling mistake in a comment. |
test/Tester/ClientConnectionTests/ClientDisconnectionTests.cs | Adds tests to validate client disconnection scenarios and message rejections. |
src/Orleans.TestingHost/InProcTestCluster.cs | Introduces multi-client support and related nullability enhancements. |
src/Orleans.Runtime/Silo/SiloControl.cs | Adds a DropDisconnectedClients method using a new parameter to optionally exclude recent disconnects. |
src/Orleans.Runtime/Networking/GatewayConnectionListener.cs | Removes unused logger initialization for simplicity. |
src/Orleans.Runtime/Messaging/Gateway.cs | Implements new client message handling via WorkItemType and updates message tracking. |
src/Orleans.Runtime/Core/ManagementGrain.cs | Exposes a DropDisconnectedClients method to orchestrate client disconnection among silos. |
src/Orleans.Runtime/Core/InsideRuntimeClient.cs, HostedClient.cs, OutsideRuntimeClient.cs, CallbackData.cs, EventSourceEvents.cs | Updates callback methods from DoCallback to OnResponse and adjusts disposal behavior. |
src/Orleans.Core/SystemTargetInterfaces/* | Updates ISiloControl and IManagementGrain interfaces to reflect the new DropDisconnectedClients API. |
src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs | Minor refactoring by removing an unused injected dependency. |
src/Orleans.Core/Networking/ConnectionManager.cs | Updates the shutdown exception message for clarity. |
src/Orleans.Core.Abstractions/IDs/ClientGrainId.cs | Adds a helper method for client equality evaluation. |
var message = callback.Value.Message;
var response = messageFactory.CreateRejectionResponse(message, Message.RejectionTypes.Unrecoverable, "Client is shutting down.", null);
callback.Value.OnResponse(response);
}
Reject all outstanding outbound requests from a client when it is disposed. Future requests are already prevented by an ObjectDisposedException.
Should we have a longer message explaining that the request might have been sent, but we aren't waiting for the response because we are shutting down? Maybe that's too much.
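For readers following the thread, here is a minimal sketch of the dispose-time behavior discussed above: fail every request that is still awaiting a response, and refuse new ones. It is not the Orleans implementation. `SketchClient`, `SendRequest`, and the use of `TaskCompletionSource` are hypothetical stand-ins for the real callback machinery (`CallbackData.OnResponse` with a rejection response).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical client; stands in for the runtime client in the PR.
public sealed class SketchClient : IDisposable
{
    private readonly ConcurrentDictionary<long, TaskCompletionSource<string>> _pending = new();
    private long _nextId;
    private volatile bool _disposed;

    // Registers a pending request and returns a task that completes when a response arrives.
    public Task<string> SendRequest()
    {
        if (_disposed) throw new ObjectDisposedException(nameof(SketchClient));
        var id = Interlocked.Increment(ref _nextId);
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[id] = tcs;
        return tcs.Task;
    }

    // On dispose, fail every request that is still waiting so callers are not left hanging.
    public void Dispose()
    {
        _disposed = true;
        foreach (var entry in _pending)
        {
            if (_pending.TryRemove(entry.Key, out var tcs))
            {
                tcs.TrySetException(new InvalidOperationException("Client is shutting down."));
            }
        }
    }
}
```

The sketch ignores the race between a request being registered and `Dispose` starting its scan; a real implementation would need to close that window.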
/// </summary>
/// <param name="excludeRecent">If true, only clients that have been disconnected for longer than the configured client expiration time will be dropped.</param>
/// <returns>A task representing the work performed.</returns>
Task DropDisconnectedClients(bool excludeRecent);
Added for testing
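To make the `excludeRecent` parameter concrete, the following is a small sketch of the selection rule described in the doc comment. It assumes a hypothetical map from client key to disconnection time; `DropPolicySketch` and `SelectClientsToDrop` are illustrative names, not part of the Orleans API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DropPolicySketch
{
    // disconnectedSince maps a hypothetical client key to the time that client disconnected.
    public static List<string> SelectClientsToDrop(
        IReadOnlyDictionary<string, DateTime> disconnectedSince,
        DateTime now,
        TimeSpan clientExpiration,
        bool excludeRecent)
    {
        return disconnectedSince
            // With excludeRecent, keep only clients disconnected for longer than the expiration time.
            .Where(pair => !excludeRecent || now - pair.Value > clientExpiration)
            .Select(pair => pair.Key)
            .OrderBy(key => key, StringComparer.Ordinal)
            .ToList();
    }
}
```

Passing `excludeRecent: false` selects every disconnected client regardless of age, which is presumably what makes the method convenient in tests.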
@@ -183,8 +180,9 @@ public void SetComponent<TComponent>(TComponent? instance) where TComponent : cl
/// <inheritdoc />
public bool TryDispatchToClient(Message message)
{
if (!ClientGrainId.TryParse(message.TargetGrain, out var targetClient) || !this.ClientId.Equals(targetClient))
if (!ClientId.IsClientEqual(message.TargetGrain))
Cheaper comparison (no allocs). We cannot do direct GrainId comparisons because observers on a client have different (extended) ids from the client itself.
var response = messageFactory.CreateRejectionResponse(message, Message.RejectionTypes.Unrecoverable, "Host is shutting down.", null);
callback.Value.OnResponse(response);
}
}
Similar to the above, when a host is disposed, we reject all outbound requests.
@@ -280,7 +296,8 @@ private class ClientState
{
private readonly Gateway _gateway;
private readonly Task _messageLoop;
private readonly ConcurrentQueue<Message> _pendingToSend = new();
private readonly Dictionary<(GrainId, CorrelationId), Message> _outstandingRequestsToClient = [];
This is the meat of the PR: tracking the outstanding requests to a client. We modify ClientState to track these requests and apply updates to that state via the message loop. The one exception is when a client is dropped: in that case we enumerate the data structures directly and reject all messages.
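The tracking scheme described in this comment can be sketched roughly as follows. This is a simplified illustration, not the code in Gateway.cs: `PendingRequest` and `OutstandingRequestTracker` are hypothetical names, and plain `string` and `long` values stand in for `GrainId` and `CorrelationId`. Like the original, it is not thread-safe on its own and assumes a single loop owns the state.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a message; only the fields the sketch needs.
public sealed record PendingRequest(string SendingGrain, long CorrelationId);

public sealed class OutstandingRequestTracker
{
    // Keyed by (sender, correlation id), mirroring the shape of _outstandingRequestsToClient.
    private readonly Dictionary<(string, long), PendingRequest> _outstanding = new();

    public int Count => _outstanding.Count;

    // Called when a request is forwarded to the client.
    public void OnRequestSent(PendingRequest request) =>
        _outstanding[(request.SendingGrain, request.CorrelationId)] = request;

    // Called when the client's response passes back through; true if a matching request was tracked.
    public bool OnResponse(string originalSender, long correlationId) =>
        _outstanding.Remove((originalSender, correlationId));

    // Called when the client is dropped: hand every remaining request to the callback, then clear.
    // The callback must not modify the tracker while enumeration is in progress.
    public int RejectAll(Action<PendingRequest> reject)
    {
        var rejected = 0;
        foreach (var request in _outstanding.Values)
        {
            reject(request);
            rejected++;
        }

        _outstanding.Clear();
        return rejected;
    }
}
```

In the PR the reject step would create a rejection response for each message; here that is left to the caller-supplied callback.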
// Exchange references, so each one has a reference to the other which is bound to its client.
var observerBId = observerB.SelfReference.GetGrainId();
var aToB = (IEchoGrainObserver)clientA.ServiceProvider.GetRequiredService<GrainReferenceActivator>().CreateReference(observerBId, GrainInterfaceType.Create("IEchoGrainObserver"));
This is a bit awkward. Since GrainReference is immutable, we don't rebind it from one runtime to another when we copy it, so if we were to pass it directly to the client, it would still be bound to the originator's runtime. That is unlikely to be an issue in practice, but it makes testing scenarios with multiple clients in a single process awkward.
/// </summary>
/// <param name="other">The <see cref="GrainId"/> to compare.</param>
/// <returns><see langword="true"/> if the provided <see cref="GrainId"/> corresponds to the same client, otherwise <see langword="false"/>.</returns>
public bool IsClientEqual(GrainId other)
So IsClientEqual and Equals can return different results?
These GrainIds take the form sys.client/ClientId[+ObserverId]. When comparing them to see if we are pointing to the correct client, we want to compare only the ClientId component, not the ObserverId component.
IsClientEqual and Equals will return the same result because ClientGrainId strips off the [+ObserverId] portion in TryParse (and we don't have an Equals overload which accepts a GrainId directly). ClientGrainId.GrainId.Equals(otherGrainId) could return a different result because otherGrainId might still have the ObserverId portion.
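As an illustration of the comparison being discussed, here is a sketch that compares only the client portion of a key shaped like `ClientId[+ObserverId]` without allocating. It works on plain character spans and skips the `sys.client` type check. The real `ClientGrainId.IsClientEqual` operates on Orleans id types, so `ClientKeyComparer` and `IsSameClient` are hypothetical names used only for this example.

```csharp
using System;

public static class ClientKeyComparer
{
    // Compares only the client portion of keys shaped like "ClientId[+ObserverId]".
    // Slicing a span does not allocate, which is the point of the cheaper comparison.
    public static bool IsSameClient(ReadOnlySpan<char> clientKey, ReadOnlySpan<char> otherKey)
    {
        var separator = otherKey.IndexOf('+');
        var otherClient = separator >= 0 ? otherKey[..separator] : otherKey;
        return clientKey.SequenceEqual(otherClient);
    }
}
```

A direct equality check on the full keys would treat `abc` and `abc+observer1` as different, which is why a plain GrainId comparison is not enough for observers hosted on a client.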
@@ -187,6 +187,11 @@ internal SiloAddress TryToReroute(Message msg)
if (msg.Direction != Message.Directions.Response)
{
if (ClientGrainId.TryParse(msg.SendingGrain, out var clientId) && clients.TryGetValue(clientId, out var clientState))
{
clientState.OnClientResponse(msg);
Isn't it possible that the client is still connected to other silos in the cluster? Or is it an edge case?
If the sender grain is hosted locally, this will catch the response.
If the sender grain is hosted remotely, this might not catch the response if the client sends the response via a different gateway.
Clients will send responses via the gateway that sent them the request unless that gateway becomes unavailable.
If we reject messages as soon as a client disconnects, that should address this.
Also, if the request expires, we should remove it from this dictionary. We could do this when scanning for dropped clients.
EDIT: The crossed-out bit above is not always true: we only override the SendingSilo to the gateway address if the SendingSilo wasn't already stamped on the message. Given that, we might need another solution. Can you think of any, @benjaminpetit?
Fixes #9462