Fix Continue-as-new Race Condition at InProcessTestHost #703
Conversation
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds coverage and runtime fixes to address a ContinueAsNew race condition in the in-process test host (durabletask-dotnet #689), focusing on proper async continuations and improved robustness around worker streaming/disconnect behavior.
Changes:
- Add a new stress-oriented test suite validating ContinueAsNew behavior under concurrency and scheduling patterns.
- Run `TaskCompletionSource` continuations asynchronously to reduce inline-continuation races/deadlocks.
- Adjust gRPC sidecar behavior around the history streaming threshold and worker stream disconnect handling.
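The `TaskCompletionSource` change summarized above is conventionally made with `TaskCreationOptions.RunContinuationsAsynchronously`. A minimal sketch (not the PR's actual code) of the pattern and why it matters:

```csharp
using System.Threading.Tasks;

// Sketch only: illustrates the async-continuation option, not this PR's code.
// Without RunContinuationsAsynchronously, SetResult can execute awaiting
// continuations inline on the completing thread. If that thread holds a lock
// the continuation also needs, the result is a race or deadlock.
var tcs = new TaskCompletionSource<int>(
    TaskCreationOptions.RunContinuationsAsynchronously);

// Continuations attached to tcs.Task are now queued to the thread pool
// rather than running synchronously inside the SetResult call.
tcs.SetResult(42);
```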
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| test/InProcessTestHost.Tests/ContinueAsNewTests.cs | New tests exercising ContinueAsNew across single, parallel, rapid-fire, and staggered scheduling scenarios. |
| src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs | Makes waiter continuations run asynchronously to avoid continuation-related races. |
| src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs | Introduces a streaming threshold constant, adjusts “no client” exception type, handles a specific stream-closed scenario, and uses async continuations for several TCS instances. |
```csharp
catch (InvalidOperationException ex) when (ex.Message.Contains("request is complete", StringComparison.OrdinalIgnoreCase))
{
    // The client disconnected or canceled the GetWorkItems stream.
    // Reset the connection state so the dispatcher pauses naturally
    // (via the traffic signal) until a new client connects.
    lock (this.isConnectedSignal)
    {
        this.workerToClientStream = null;
        this.isConnectedSignal.Reset();
    }

    this.log.LogWarning(ex, "Work-item stream closed by client. Dispatcher will pause until reconnection.");
}
```
Catching InvalidOperationException based on a substring of the exception message is brittle (message text can change across runtime/framework versions and may vary). Prefer matching on exception types that represent stream cancellation/disconnect (commonly RpcException with Cancelled/Unavailable, or IOException/ObjectDisposedException depending on the gRPC stack), or use a more reliable signal from the server call context if available. This will make the disconnect handling more stable long-term.
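The type-based matching suggested here could look something like the following sketch. The member names mirror the snippet above; which exception types the gRPC stack actually throws on worker disconnect is an assumption and would need to be verified against the gRPC implementation in use:

```csharp
using System;
using System.IO;
using Grpc.Core;

// Hypothetical helper (not in the PR): classify stream failures by type and
// gRPC status instead of by exception message text. The specific types listed
// here are assumptions about what the gRPC stack surfaces on disconnect.
static bool IsClientDisconnect(Exception ex) => ex switch
{
    RpcException rpc when rpc.StatusCode is StatusCode.Cancelled or StatusCode.Unavailable => true,
    ObjectDisposedException or IOException => true,
    InvalidOperationException => true, // stream write after completion
    _ => false,
};

// Usage inside the dispatch loop, treating all cases uniformly:
// catch (Exception ex) when (IsClientDisconnect(ex)) { /* reset connection state */ }
```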
```csharp
catch (InvalidOperationException ex) when (ex.Message.Contains("request is complete", StringComparison.OrdinalIgnoreCase))
{
    // The client disconnected or canceled the GetWorkItems stream.
    // Reset the connection state so the dispatcher pauses naturally
    // (via the traffic signal) until a new client connects.
    lock (this.isConnectedSignal)
    {
        this.workerToClientStream = null;
        this.isConnectedSignal.Reset();
    }

    // Must throw so callers (ExecuteOrchestrator/ExecuteActivity) can clean up
    // their pending TCS. The dispatcher catches this, abandons the work item,
    // and releases it back to the queue for retry.
    throw new OperationCanceledException("Work-item stream closed by client.", ex);
}
```
Catching InvalidOperationException based on a substring match of ex.Message is brittle (message text can change across framework versions/locales and is not a stable contract). Prefer handling stream-closure in a way that doesn’t depend on message text (e.g., catch InvalidOperationException without a message filter and/or handle additional common stream write failures like ObjectDisposedException/IOException, then treat them uniformly as a disconnected client).
I agree with this comment, but I will not block on this. If the message text changes, the new tests should catch this. We should look for other ways to detect this condition: perhaps the exception object has more clues in other properties, or we can infer this from other context. However, this can be addressed later -- I recommend filing a separate follow-up issue now, but no need to block releasing the fix.
```csharp
// Must throw so callers (ExecuteOrchestrator/ExecuteActivity) can clean up
// their pending TCS. The dispatcher catches this, abandons the work item,
// and releases it back to the queue for retry.
throw new OperationCanceledException("Work-item stream closed by client.", ex);
```
Throwing OperationCanceledException from gRPC server code can surface to callers as an unexpected/opaque gRPC status (often Unknown) unless it is consistently translated upstream. Since this is explicitly a client-disconnect/stream-cancel scenario, consider throwing an RpcException with an appropriate status code (e.g., Cancelled or Unavailable) so the failure mode is explicit and consistent with the earlier "No client is connected" path.
Suggested change:
```diff
- throw new OperationCanceledException("Work-item stream closed by client.", ex);
+ throw new RpcException(new Status(StatusCode.Cancelled, "Work-item stream closed by client."));
```
AnatoliB left a comment:

LGTM, just one non-blocking comment.
As titled. Fixes issue #689.