Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5f58004
Harden gRPC worker and client against silent disconnects
Copilot Apr 21, 2026
71bca4c
Address CodeQL feedback: drop ReferenceEquals on struct, narrow gener…
Copilot Apr 21, 2026
6533658
Sort new log event additions by EventId in Worker and Client Logs.cs
Copilot Apr 21, 2026
665ba0e
Address Copilot review: clamp backoff cap, cover SilentDisconnectTime…
Copilot Apr 21, 2026
4d1fbbe
Reconnect after graceful stream-end (GOAWAY) and thread-safe state swap
Copilot Apr 21, 2026
e0fc2e8
Fix critical bugs surfaced by code review
Apr 21, 2026
5dfc5b1
Address remaining audit nits
Apr 21, 2026
f71d510
Add M1: regression test for silent-disconnect classification
Apr 21, 2026
cec5f4a
Materialize new channel before swapping cache entry
Apr 21, 2026
ab763db
Address Copilot review feedback on cec5f4a
Apr 21, 2026
955ea63
Address Copilot review feedback on ab763db
Apr 21, 2026
5186ae1
Merge branch 'main' into fix/grpc-resilience-channel-recreate
berndverst Apr 22, 2026
7df10dc
Address PR review follow-up on gRPC resilience
Apr 22, 2026
add1a39
Address PR feedback for gRPC recreation
Apr 22, 2026
4bc4a08
Stabilize WorkItemStreamConsumer timing test
Apr 22, 2026
aba04b7
Clamp worker hello deadline to UTC max
Apr 23, 2026
5594c7b
Simplify worker recreate flow
Apr 23, 2026
a754b55
Merge branch 'main' into fix/grpc-resilience-channel-recreate
berndverst Apr 23, 2026
c0dbb3d
Fix worker recreate ownership
Apr 23, 2026
c08076e
Simplify worker channel cache
Apr 23, 2026
fd9381d
Fix worker recreate disposal timing
Apr 23, 2026
dc243e1
Fix continue-as-new event carryover
Apr 23, 2026
18e23a3
Add worker disconnect coverage tests
Apr 23, 2026
dd9ad77
Fix fatal deferred-dispose filters
Apr 23, 2026
4d30b9b
Fix client recreate dispose race
Apr 23, 2026
caf307b
Ignore local git worktrees
Apr 23, 2026
c85234d
Move wrapper changes to separate PR
Apr 23, 2026
f246e90
Keep worktree ignore local
Apr 23, 2026
d291efd
Address latest PR feedback on reconnect cleanup
Apr 23, 2026
729654a
Merge branch 'main' into fix/grpc-resilience-channel-recreate
berndverst Apr 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 119 additions & 1 deletion src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using Azure.Core;
using Grpc.Net.Client;
using Microsoft.DurableTask.Client.Grpc;
using Microsoft.DurableTask.Client.Grpc.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -156,7 +158,122 @@ public void Configure(string? name, GrpcDurableTaskClientOptions options)
string cacheKey = $"{optionsName}\u001F{source.EndpointAddress}\u001F{source.TaskHubName}\u001F{source.ResourceId}\u001F{credentialType}\u001F{source.AllowInsecureCredentials}\u001F{retryOptionsKey}";
options.Channel = this.channels.GetOrAdd(
cacheKey,
_ => new Lazy<GrpcChannel>(source.CreateChannel)).Value;
_ => new Lazy<GrpcChannel>(source.CreateChannel, LazyThreadSafetyMode.PublicationOnly)).Value;
options.SetChannelRecreator((oldChannel, ct) => this.RecreateChannelAsync(cacheKey, source, oldChannel, ct));
}

/// <summary>
/// Atomically swaps the cached channel for <paramref name="cacheKey"/> with a freshly created one and
/// schedules deferred disposal of the channel it replaced, so RPCs still running on that channel
/// get a chance to drain. If the cache already holds a materialized channel other than
/// <paramref name="oldChannel"/>, that channel is returned and nothing is created.
/// </summary>
/// <param name="cacheKey">Key of the shared cache slot to refresh.</param>
/// <param name="source">Options whose <c>CreateChannel</c> builds the replacement channel.</param>
/// <param name="oldChannel">The channel the caller observed and wants replaced.</param>
/// <param name="cancellation">Observed once, on entry, before any work is done.</param>
/// <returns>The channel the caller should use from now on: either the new one or the one a peer published.</returns>
/// <exception cref="OperationCanceledException"><paramref name="cancellation"/> was already cancelled on entry.</exception>
/// <exception cref="ObjectDisposedException">
/// The owner was marked disposed, or the cache slot disappeared while this call was racing to update it.
/// </exception>
/// <exception cref="InvalidOperationException">
/// No cache entry could be obtained after a failed add even though the owner is not marked disposed.
/// </exception>
async Task<GrpcChannel> RecreateChannelAsync(
    string cacheKey,
    DurableTaskSchedulerClientOptions source,
    GrpcChannel oldChannel,
    CancellationToken cancellation)
{
    cancellation.ThrowIfCancellationRequested();

    // Recreate callbacks can outlive Configure(...) because clients keep the delegate on their
    // options. Best-effort check for disposal before publishing anything back into the shared cache.
    ThrowIfOwnerDisposed();

    // Shared-cache recreation has four relevant states:
    //   1. No entry exists anymore. Create one and use it.
    //   2. The entry already materialized a different channel. A peer client already refreshed it.
    //   3. The entry still represents what this client observed. Win TryUpdate and publish the new channel.
    //   4. The entry changes between our read and TryUpdate. Lose the race, dispose ours, and reuse the winner.
    if (!this.channels.TryGetValue(cacheKey, out Lazy<GrpcChannel>? currentLazy))
    {
        // PublicationOnly does not cache exceptions, so a transient CreateChannel failure
        // is retried on the next access instead of being stored permanently.
        Lazy<GrpcChannel> created = new(source.CreateChannel, LazyThreadSafetyMode.PublicationOnly);
        ThrowIfOwnerDisposed();

        if (this.channels.TryAdd(cacheKey, created))
        {
            return created.Value;
        }

        // A peer added an entry first; continue below with whatever is cached now.
        this.channels.TryGetValue(cacheKey, out currentLazy);
    }

    if (currentLazy is null)
    {
        // The slot vanished between the failed add and the re-read. When the owner has been
        // disposed in the meantime, report that (as the lost-TryUpdate path below does) rather
        // than a generic failure. ObjectDisposedException derives from InvalidOperationException,
        // so existing handlers for the latter still apply.
        ThrowIfOwnerDisposed();
        throw new InvalidOperationException("Failed to obtain a cached gRPC channel after recreation attempt.");
    }

    // Only a materialized Lazy can be compared against oldChannel by reference. If the cache slot
    // has not created its channel yet, let TryUpdate decide whether this recreate attempt still owns it.
    if (currentLazy.IsValueCreated && !ReferenceEquals(currentLazy.Value, oldChannel))
    {
        // A peer client already swapped in a new channel; reuse it.
        return currentLazy.Value;
    }

    // Materialize the new channel BEFORE swapping the dictionary so a CreateChannel failure
    // leaves the existing entry intact and the old channel is not queued for disposal.
    GrpcChannel newChannel = source.CreateChannel();
    if (this.disposed == 1)
    {
        // Never published, so nobody else can be using it: release it before reporting disposal.
        await DisposeChannelAsync(newChannel).ConfigureAwait(false);
        throw new ObjectDisposedException(nameof(ConfigureGrpcChannel));
    }

    // The cache always stores Lazy<GrpcChannel> so the steady-state Configure path and the
    // recreate path use the same dictionary value shape. The wrapper here is already materialized.
    Lazy<GrpcChannel> newLazy = new(newChannel);
    if (!this.channels.TryUpdate(cacheKey, newLazy, currentLazy))
    {
        // Lost the race. Always queue the freshly created channel for deferred disposal so it
        // does not leak, then hand back the winning entry. If the slot has been removed entirely
        // (presumably because disposal cleared the cache), do NOT return newChannel: it has
        // already been scheduled for shutdown.
        _ = ScheduleDeferredDisposeAsync(newChannel);
        if (this.channels.TryGetValue(cacheKey, out Lazy<GrpcChannel>? winner) && winner is not null)
        {
            return winner.Value;
        }

        throw new ObjectDisposedException(nameof(ConfigureGrpcChannel));
    }

    // Won the swap. Retire the replaced channel only if the replaced entry ever created one.
    if (currentLazy.IsValueCreated)
    {
        _ = ScheduleDeferredDisposeAsync(currentLazy.Value);
    }

    return newChannel;

    // Throws when the owning instance has been marked disposed.
    void ThrowIfOwnerDisposed()
    {
        if (this.disposed == 1)
        {
            throw new ObjectDisposedException(nameof(ConfigureGrpcChannel));
        }
    }
}

/// <summary>
/// Waits a fixed grace period and then disposes <paramref name="channel"/>.
/// Callers in this file discard the returned task, so non-fatal failures are handled here
/// rather than surfaced: cancellation and already-disposed errors are ignored, and any other
/// non-fatal exception is written to the trace output.
/// </summary>
/// <param name="channel">The channel to dispose once the grace period has elapsed.</param>
/// <returns>A task that completes after the disposal attempt.</returns>
static async Task ScheduleDeferredDisposeAsync(GrpcChannel channel)
{
    // How long to wait before disposing, so work still using the channel can finish.
    const int GracePeriodSeconds = 30;

    try
    {
        await Task.Delay(TimeSpan.FromSeconds(GracePeriodSeconds)).ConfigureAwait(false);
        await DisposeChannelAsync(channel).ConfigureAwait(false);
    }
    catch (Exception ex) when (ex is not OutOfMemoryException
        and not StackOverflowException
        and not AccessViolationException
        and not ThreadAbortException)
    {
        // Fatal exception types are excluded by the filter above and keep propagating.
        // Cancellation and already-disposed errors are swallowed without logging.
        if (ex is not OperationCanceledException and not ObjectDisposedException)
        {
            Trace.TraceError(
                "Unexpected exception while deferred-disposing gRPC channel in DurableTaskSchedulerClientExtensions.ScheduleDeferredDisposeAsync: {0}",
                ex);
        }
    }
}

/// <inheritdoc/>
Expand All @@ -175,6 +292,7 @@ public async ValueTask DisposeAsync()
}
catch (Exception ex) when (ex is not OutOfMemoryException
and not StackOverflowException
and not AccessViolationException
and not ThreadAbortException)
{
// Swallow disposal exceptions - disposal should be best-effort to ensure
Expand Down
Loading
Loading