Harden gRPC worker and client against silent disconnects #708
Merged
berndverst merged 30 commits into main on Apr 23, 2026
Conversation
Adds Hello deadline, channel recreation after N consecutive failures, jittered exponential backoff, distinct Unauthenticated logging, and DTS health-ping observability to the worker. Mirrors the channel-recreate pattern in the client via a CallInvoker wrapper. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Pull request overview
This PR hardens the Durable Task gRPC worker and client against silent disconnects and wedged HTTP/2 connections by adding bounded handshakes, jittered reconnect backoff, and channel recreation paths (including AzureManaged channel-cache CAS swaps with deferred disposal).
Changes:
- Worker: add Hello deadline, silent-disconnect window changes + HealthPing trace logging, jittered exponential reconnect backoff, and channel recreation signaling.
- Client: introduce a CallInvoker wrapper that triggers channel recreation after consecutive transport failures, with cooldown/single-flight safeguards and long-poll exclusions.
- AzureManaged extensions: wire recreator callbacks through the shared ConcurrentDictionary<string, Lazy<GrpcChannel>> cache with CAS swap + deferred disposal.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| test/Worker/Grpc.Tests/ReconnectBackoffTests.cs | Adds unit tests for full-jitter exponential reconnect backoff behavior. |
| test/Worker/Grpc.Tests/GrpcDurableTaskWorkerOptionsInternalTests.cs | Adds tests for internal worker option defaults and SetChannelRecreator validation. |
| src/Worker/Grpc/ReconnectBackoff.cs | New helper for capped exponential backoff with jitter. |
| src/Worker/Grpc/ProcessorExitReason.cs | New enum to signal processor exit reason to the outer worker loop. |
| src/Worker/Grpc/Logs.cs | Adds new structured log events for timeout/auth/channel recreation/backoff/HealthPing observability. |
| src/Worker/Grpc/Internal/InternalOptionsExtensions.cs | Adds internal extension method to set a channel recreator callback. |
| src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs | Adds internal options for deadlines, silent-disconnect timeout, backoff, and recreation thresholds. |
| src/Worker/Grpc/GrpcDurableTaskWorker.cs | Wraps processor execution in a loop and attempts channel recreation when requested. |
| src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs | Implements Hello deadline, failure tracking, backoff, silent disconnect signaling, and HealthPing trace logging. |
| src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs | Adds cached-channel recreation with CAS swap and deferred channel disposal. |
| src/Client/Grpc/Logs.cs | Adds client-side channel recreation logs. |
| src/Client/Grpc/Internal/InternalOptionsExtensions.cs | Adds internal extension method to set a client channel recreator callback. |
| src/Client/Grpc/GrpcDurableTaskClientOptions.cs | Adds internal client options for recreation threshold and cooldown interval. |
| src/Client/Grpc/GrpcDurableTaskClient.cs | Wraps the call invoker with a recreating invoker when enabled; owns lifecycle for Address-created channels. |
| src/Client/Grpc/ChannelRecreatingCallInvoker.cs | New CallInvoker wrapper that recreates channels after consecutive transport failures. |
| src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs | Adds cached-channel recreation with CAS swap and deferred channel disposal for AzureManaged clients. |
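The full-jitter exponential backoff exercised by ReconnectBackoffTests.cs can be sketched in a language-agnostic way. The Python below is an illustrative sketch only, not the C# implementation; the function name and the base/cap defaults are assumptions:

```python
import random

def reconnect_delay(attempt: int, base: float = 0.25, cap: float = 30.0) -> float:
    """Capped exponential backoff with full jitter.

    attempt is 0-based. The uncapped ceiling doubles per attempt, and the
    actual delay is drawn uniformly from [0, min(cap, base * 2**attempt)],
    which decorrelates reconnect storms across workers. A non-positive cap
    clamps the result to zero, matching the behavior the PR documents for
    ReconnectBackoff.
    """
    if cap <= 0:
        return 0.0
    exponent = min(attempt, 60)  # avoid float overflow; 2**60 dwarfs any sane cap
    ceiling = min(cap, base * (2.0 ** exponent))
    return random.uniform(0.0, ceiling)
```

Full jitter (rather than equal or decorrelated jitter) keeps the expected delay at half the ceiling while spreading reconnects across the whole window.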
Comments suppressed due to low confidence (1)
src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs:427
- ProcessWorkItemsAsync can enter a tight loop if the server completes the work-item stream normally (ReadAllAsync finishes without throwing and without any cancellation). In that case, tokenSource is not cancelled, the cancellation check is false, and the outer while-loop restarts another ReadAllAsync on the same completed stream. Consider treating normal stream completion as a disconnect (e.g., return after the await-foreach completes, or break/throw so the caller reconnects) to avoid spinning and to re-establish the stream.
while (!cancellation.IsCancellationRequested)
{
await foreach (P.WorkItem workItem in stream.ResponseStream.ReadAllAsync(tokenSource.Token))
{
if (silentDisconnectEnabled)
{
timeoutSource.CancelAfter(silentDisconnectTimeout);
}
if (!firstMessageObserved)
{
firstMessageObserved = true;
onFirstMessage?.Invoke();
}
if (workItem.RequestCase == P.WorkItem.RequestOneofCase.OrchestratorRequest)
{
this.RunBackgroundTask(
workItem,
() => this.OnRunOrchestratorAsync(
workItem.OrchestratorRequest,
workItem.CompletionToken,
cancellation),
cancellation);
}
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.ActivityRequest)
{
this.RunBackgroundTask(
workItem,
() => this.OnRunActivityAsync(
workItem.ActivityRequest,
workItem.CompletionToken,
cancellation),
cancellation);
}
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.EntityRequest)
{
this.RunBackgroundTask(
workItem,
() => this.OnRunEntityBatchAsync(workItem.EntityRequest.ToEntityBatchRequest(), cancellation),
cancellation);
}
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.EntityRequestV2)
{
workItem.EntityRequestV2.ToEntityBatchRequest(
out EntityBatchRequest batchRequest,
out List<P.OperationInfo> operationInfos);
this.RunBackgroundTask(
workItem,
() => this.OnRunEntityBatchAsync(
batchRequest,
cancellation,
workItem.CompletionToken,
operationInfos),
cancellation);
}
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.HealthPing)
{
// Health pings are heartbeat-only signals from the backend; the silent-disconnect
// timer reset above is the actionable behavior. Logging at Trace allows operators
// to confirm liveness without flooding info-level telemetry.
this.Logger.ReceivedHealthPing();
}
else
{
this.Logger.UnexpectedWorkItemType(workItem.RequestCase.ToString());
}
}
if (tokenSource.IsCancellationRequested)
{
// The token has been cancelled, which means either:
// 1. The broader 'cancellation' was triggered; return here to start a graceful shutdown.
// 2. The timeoutSource was triggered; return here to trigger a reconnect to the backend.
if (!cancellation.IsCancellationRequested)
{
// Since the cancellation came from the timeout, log a warning.
this.Logger.ConnectionTimeout();
onSilentDisconnect?.Invoke();
}
return;
}
}
…ic catches
…out default
Worker: ProcessWorkItemsAsync no longer wraps the await foreach in an outer loop. IAsyncStreamReader is single-use, so on a graceful server close (HTTP/2 GOAWAY + OK trailers during a DTS rolling upgrade) the foreach exited cleanly and the outer while re-entered an already-exhausted reader, tight-spinning until the 120s silent-disconnect timer eventually fired and the resulting OCE surfaced as a generic UnexpectedError. New flow: after the foreach exits, explicitly distinguish shutdown, timeout, and peer-initiated close. Peer-initiated close is logged as StreamEndedByPeer (EventId 76, Information) and drives the channel-poisoned counter so repeated drains escalate to recreate.

Client: ChannelRecreatingCallInvoker now reads/writes the state field with Volatile.Read/Volatile.Write so call-site threads observe the post-recreate (channel, invoker) pair without torn state. When the invoker owns the channel (self-Address path), RecreateAsync now schedules deferred disposal of the old channel so repeated recreates do not leak HTTP handlers / sockets on NET6+.
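The shutdown / timeout / peer-close distinction described in this commit can be sketched as a small classifier. This Python is illustrative only (the enum and function names are hypothetical; the outcome names follow the commit message):

```python
from enum import Enum, auto

class StreamOutcome(Enum):
    """Why the work-item read loop exited."""
    SHUTDOWN = auto()           # outer cancellation: graceful worker stop
    SILENT_DISCONNECT = auto()  # heartbeat window elapsed: channel likely poisoned
    ENDED_BY_PEER = auto()      # server drained the stream, e.g. rolling upgrade

def classify_stream_end(shutdown_requested: bool, timeout_fired: bool) -> StreamOutcome:
    """A gRPC server-streaming reader is single-use, so once the read loop
    exits, every path must map to an explicit outcome instead of re-entering
    the exhausted reader (the tight-spin bug this commit fixes)."""
    if shutdown_requested:
        # Outer cancellation wins even if the timer also fired.
        return StreamOutcome.SHUTDOWN
    if timeout_fired:
        return StreamOutcome.SILENT_DISCONNECT
    # The read loop completed normally: the peer closed the stream (GOAWAY + OK).
    return StreamOutcome.ENDED_BY_PEER
```

Mapping normal completion to an explicit ENDED_BY_PEER outcome is what lets the caller reconnect instead of spinning on the dead reader.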
- C1: Wrap the worker stream-reader await foreach in a try/catch so that grpc-dotnet's RpcException(Cancelled), raised when the silent-disconnect timer fires, is routed to the silent-disconnect handler. Without this catch the exception unwound to the outer Cancelled handler that explicitly does not poison the channel, leaving the silent-disconnect -> recreate path effectively dead.
- C2: Switch all Lazy<GrpcChannel> instances in the AzureManaged worker and client extensions to LazyThreadSafetyMode.PublicationOnly so a transient CreateChannel failure does not permanently poison the cache slot for the lifetime of the process.
- I1: Track the most recently observed channel inside the worker's ExecuteAsync and pass it to TryRecreateChannelAsync instead of the never-updated this.grpcOptions.Channel field. The stale field caused the recreator's "peer already swapped" branch to be skipped on every cycle.
- I3+I4: Add a class-level disposalCts to ChannelRecreatingCallInvoker that is cancelled in DisposeAsync and linked into the recreator's CancellationToken. The recreate task now re-checks the disposed flag before publishing a freshly created channel, and shuts the new channel down if it loses the race; this prevents leaking a freshly created channel after disposal and lets DisposeAsync abort an in-flight recreate.
- I5: Only treat a graceful stream-end as a poison signal when no work-item messages were observed on the stream. A drained stream that successfully delivered work is healthy backend rolling-upgrade behavior; counting it would let long-lived processes accumulate spurious poison credits.
- I6: Reset the consecutive-failure and reconnect-attempt counters on Unauthenticated. A status reply is proof the underlying transport is healthy, so prior transport failures should not combine with a later transient blip to trip the channel recreate.
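The C2 fix relies on LazyThreadSafetyMode.PublicationOnly semantics: a factory exception is never cached, so the next reader retries. A minimal Python approximation of that retry-on-failure memoization (the class name and structure are hypothetical; .NET's Lazy&lt;T&gt; is the real mechanism):

```python
import threading

class RetryableLazy:
    """Approximates LazyThreadSafetyMode.PublicationOnly: the factory runs
    outside the lock, and an exception publishes nothing, so the next caller
    retries instead of the slot staying poisoned for the process lifetime."""

    def __init__(self, factory):
        self._factory = factory
        self._lock = threading.Lock()
        self._created = False
        self._value = None

    @property
    def value(self):
        if self._created:
            return self._value
        candidate = self._factory()  # may raise; a failure is never cached
        with self._lock:
            if not self._created:  # first successful publication wins
                self._value = candidate
                self._created = True
        return self._value
```

Contrast with ExecutionAndPublication mode, where the first exception is cached and every later .Value access rethrows it, which is exactly the poisoned-slot behavior C2 removes.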
- M2: Add an explanatory comment in ChannelRecreatingCallInvoker.RecordFailure about why only Unavailable + non-long-poll DeadlineExceeded count toward the recreate threshold.
- M3: Drop the unnecessary 'await Task.Yield()' at the end of both AzureManaged RecreateChannelAsync paths. Convert the methods to non-async methods returning Task.FromResult to avoid CS1998.
- I2: Expose 'SetHelloDeadline' and 'SetSilentDisconnectTimeout' in 'Microsoft.DurableTask.Worker.Grpc.Internal.InternalOptionsExtensions' so the PR description's configurability claims are actually reachable from outside the assembly. The pattern matches the existing 'SetChannelRecreator'.
- M1 (deferred): A regression test for the silent-disconnect path requires faking 'AsyncServerStreamingCall<T>' and 'IAsyncStreamReader<T>'. Skipped to keep the change surgical; the fix is small and verified by inspection.
Extracts the work-item stream-consume + termination-classification logic from GrpcDurableTaskWorker.Processor.ProcessWorkItemsAsync into a new internal helper WorkItemStreamConsumer.ConsumeAsync. The helper owns the linked-token wiring, the await foreach, and the three-arm catch chain that distinguishes:
- outer cancellation (Shutdown)
- silent-disconnect timeout surfaced as OCE (SilentDisconnect)
- silent-disconnect timeout surfaced as RpcException(Cancelled) from a graceful drain (SilentDisconnect)

ProcessWorkItemsAsync becomes a thin coordinator that dispatches per-item work via a new DispatchWorkItem method and switches on the helper's outcome to decide post-loop logging and channel-poisoned signaling.

Adds 9 unit tests, including the C1 regression test HangingStream_SurfacingRpcCancelled_ReturnsSilentDisconnect, which fails on the pre-fix code: prior to the fix, the helper would have propagated the RpcException(Cancelled) past the silent-disconnect classification, leaving the caller without the channelLikelyPoisoned signal, and the channel-recreate counter would not advance.

Behavior is preserved exactly. Outer cancellation surfaced as RpcException(Cancelled) still propagates to ExecuteAsync's outer catch chain (documented in OuterCancellation_WithRpcCancelledFromStream_PropagatesException).
Previously the AzureManaged worker and client recreators wrote the new Lazy<GrpcChannel> into the cache via TryUpdate, scheduled deferred disposal of the old channel, and only then materialized newLazy.Value. If CreateChannel throws inside .Value, the cache is left pointing at a permanently failing Lazy and the still-healthy old channel has already been queued for shutdown: an unrecoverable state for that cache key.

Now we call source.CreateChannel() first and only TryUpdate after the new channel is proven created. If the swap loses the race, we dispose the freshly created channel so it does not leak.
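The create-first, swap-second ordering this commit introduces can be sketched as follows. This Python is a sketch only: the real code uses ConcurrentDictionary<string, Lazy<GrpcChannel>>.TryUpdate with deferred disposal, a lock stands in for the CAS here, and 'dispose' runs immediately where the C# defers it:

```python
import threading

class ChannelCache:
    """Illustrative channel-slot cache with safe recreate ordering."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._slots: dict = {}

    def recreate(self, key, expected, create, dispose):
        # Create BEFORE swapping: if this raises, the cache still points at
        # the old, healthy channel (the unrecoverable-state bug fixed above).
        new_channel = create()
        with self._lock:  # stands in for ConcurrentDictionary.TryUpdate
            if self._slots.get(key) is expected:
                self._slots[key] = new_channel
                dispose(expected)  # old channel retired only after the swap
                return new_channel
        # Lost the race with a concurrent recreate: don't leak the fresh channel.
        dispose(new_channel)
        return self._slots.get(key)
```

The losing-race branch mirrors the commit's "dispose the freshly-created channel" rule, so neither the winner's nor the loser's channel can leak.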
- Client/Grpc/GrpcDurableTaskClient.GetCallInvoker: when an externally supplied Channel is used with channel recreation enabled, return an AsyncDisposable that disposes the ChannelRecreatingCallInvoker wrapper. Without this, the wrapper's CancellationTokenSource and any in-flight recreate task outlive the client. The wrapper's DisposeAsync is a no-op for the channel itself when ownsChannel is false, so the externally owned channel is not affected.
- Client/Grpc/ChannelRecreatingCallInvoker.RecordFailure: a non-counted RpcException status (NotFound, InvalidArgument, FailedPrecondition, etc.) now resets consecutiveFailures. Any gRPC status reply is proof the transport is healthy enough to deliver round-trips, so an app-level error sequence should not allow a prior transport-failure count to accumulate across it and combine with a later blip to trip a false-positive recreate.
- Worker/Grpc/ReconnectBackoff: the doc comment now mentions that a non-positive cap also returns TimeSpan.Zero (matching the Math.Max clamp behavior).
- Worker/Grpc/WorkItemStreamConsumer: removed an unused 'using System.Runtime.CompilerServices;'.
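The RecordFailure policy described above (count only transport-shaped failures, stay neutral on expected long-poll deadlines, reset on any other status reply) can be sketched like this. The status constants and the threshold default are hypothetical stand-ins, not the real Grpc.Core.StatusCode values:

```python
# Hypothetical status codes standing in for gRPC status values.
UNAVAILABLE, DEADLINE_EXCEEDED, UNAUTHENTICATED, NOT_FOUND = range(4)

class FailureCounter:
    """Illustrative sketch of the failure-counting policy."""

    def __init__(self, threshold: int = 3) -> None:
        self.threshold = threshold
        self.consecutive = 0

    def record(self, status: int, is_long_poll: bool) -> bool:
        if status == DEADLINE_EXCEEDED and is_long_poll:
            return False  # expected long-poll expiry: neither counts nor resets
        if status in (UNAVAILABLE, DEADLINE_EXCEEDED):
            self.consecutive += 1  # transport-shaped failure
        else:
            # Any other status reply proves the transport delivered a
            # round-trip, so app-level errors must not accumulate credit.
            self.consecutive = 0
        return self.consecutive >= self.threshold  # True => trigger recreate
```

The reset-on-other-status branch is what prevents the false-positive recreate the bullet above describes: an app-level error run between two transport blips zeroes the streak.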
- AzureManaged worker/client: don't return a freshly created channel that has been scheduled for deferred disposal when the cache slot disappears (concurrent DisposeAsync); throw ObjectDisposedException instead.
- WorkItemStreamConsumer: clamp silentDisconnectTimeout to int.MaxValue milliseconds before passing it to CancellationTokenSource.CancelAfter (avoids an ArgumentOutOfRangeException for values over 24 days).
- Processor.ConnectAsync: clamp DateTime.UtcNow.Add(helloDeadline) at DateTime.MaxValue so a misconfigured HelloDeadline cannot crash the connect loop.
- Processor: clamp delay.TotalMilliseconds to int.MaxValue before logging ReconnectBackoff to avoid integer overflow.
- Processor: rename the 'onSilentDisconnect' callback to 'onChannelLikelyPoisoned' to reflect that it also fires on graceful empty drains, not only on silent-disconnect timeouts.
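The clamps above share one saturating pattern: map an arbitrarily large (or negative) duration into the range a downstream API accepts instead of letting conversion throw or overflow. A minimal Python sketch (the function name is hypothetical; the int.MaxValue bound matches CancellationTokenSource.CancelAfter's millisecond limit):

```python
INT32_MAX_MS = 2**31 - 1  # CancelAfter's upper bound, ~24.8 days in milliseconds

def clamp_timeout_ms(timeout_ms: float) -> int:
    """Saturate rather than overflow: timeouts beyond the representable
    range clamp to int.MaxValue instead of raising, and negative inputs
    clamp to zero in this sketch."""
    return int(min(max(timeout_ms, 0.0), float(INT32_MAX_MS)))
```

The same idea applies to the DateTime.MaxValue hello-deadline clamp: saturate at the type's ceiling so a misconfigured value degrades to "effectively infinite" instead of crashing the loop.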
sophiatev previously requested changes (Apr 22, 2026)
sophiatev approved these changes (Apr 23, 2026)
torosent approved these changes (Apr 23, 2026)
Summary
Hardens the gRPC worker and client against silent disconnects and stale transport state when the Durable Task Scheduler (DTS) backend is replaced, drained, or otherwise leaves callers latched onto a bad HTTP/2 path.
Before this change, a worker or long-lived client could stay pinned to a half-open connection, stale sub-channel state, stale DNS result, or stale routing affinity for the lifetime of the process. In practice that meant a backend replacement or graceful drain could leave callers idle until restart even though the service had recovered.
Motivation / root causes
The main failure modes were:
- HelloAsync on the worker connect path had no deadline, so a half-open handshake could hang forever and prevent the reconnect loop from making progress.
- When a channel latched onto a stale DNS result, stale SocketsHttpHandler state, or backend routing affinity, nothing forced a fresh channel.
- Some failures were surfaced under a misleading status (Unauthenticated) or not surfaced clearly at all (graceful server stream end).
- Timeout arithmetic relied on a double -> long conversion of TimeSpan, which was not well-defined for extreme values.

How this change addresses them

Worker (src/Worker/Grpc)
- A HelloAsync deadline (configurable via GrpcDurableTaskWorkerOptions.SetHelloDeadline()) so a half-open handshake cannot stall reconnect forever.
- Unavailable, hello DeadlineExceeded, silent stream disconnects, and empty graceful drains are treated as transport failures that increment a recreate counter.
- Unauthenticated is logged distinctly.
- HealthPing is logged at Trace.
- Graceful peer stream-end is logged as Logs.StreamEndedByPeer.
- Channel recreation is keyed by WorkerId: replace that worker's cached channel, guard against stale callbacks, and defer disposal of the previous channel. This keeps the worker logic focused on the realistic case instead of modeling peer-worker contention as a default scenario.

Client (src/Client/Grpc)
The client does not have a central reconnect loop, so recovery is implemented around the call path instead:
- ChannelRecreatingCallInvoker observes unary RPC outcomes and recreates the channel after repeated transport failures.
- Long-poll DeadlineExceeded calls are excluded from failure counting.
- The MinRecreateInterval conversion to stopwatch ticks now saturates safely for extreme values.

Azure Managed cache wiring (src/{Worker,Client}/AzureManaged)
- Recreator callbacks are wired through the shared channel cache, keyed by WorkerId.

Tests and validation
- DateTime.MaxValue hello-deadline clamp.
- WorkItemStreamConsumer heartbeat-reset timing test that was failing CI.

What this prevents

PR comment follow-ups
- Each cached channel is scoped to a single WorkerId, so peer-worker coordination is not a realistic default scenario.

Breaking change
No breaking change.