Fix GrpcChannel handle leak in AzureManaged backend #625
- Add GrpcChannelCache for thread-safe channel caching by endpoint
- Update Client/Worker extensions to use shared cache
- Ensure channels are disposed when ServiceProvider disposes
- Add comprehensive unit and integration tests
I asked Copilot CLI to review this with the following prompt, since the currently proposed solution looks far more complicated than I was expecting.
Here's what the response was:
I can try to review this more deeply tomorrow.
- Move channel factory call outside lock to prevent deadlock
- Combine nested if statements in Replace method
- Use 'using' statement for channel disposal
- Catch Exception instead of bare catch
- Remove unused variable in test

- Remove separate GrpcChannelCache class
- Inline channel caching directly in ConfigureGrpcChannel using ConcurrentDictionary<string, Lazy<GrpcChannel>>
- Make ConfigureGrpcChannel implement IDisposable for proper channel disposal
- Remove unused Replace() and TryRemove() methods
- Add disposal verification tests
- Reduces complexity from 170+ LOC to ~40 LOC per extension

- Use LINQ Where() instead of if inside foreach for filtering channels
- Narrow catch (Exception) to specific types (OperationCanceledException, ObjectDisposedException)
Pull request overview
Fixes a gRPC channel/handle leak in the AzureManaged Durable Task Scheduler integration by caching GrpcChannel instances and disposing them when the DI container is disposed.
Changes:
- Cache GrpcChannel instances inside ConfigureGrpcChannel using ConcurrentDictionary<..., Lazy<GrpcChannel>>.
- Make the channel configurators disposable so cached channels are cleaned up on ServiceProvider disposal.
- Add tests intended to validate channel reuse and disposal behavior for both client and worker extensions.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs | Adds per-configuration GrpcChannel caching and async disposal logic for client-side options configuration. |
| src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs | Adds per-configuration GrpcChannel caching and async disposal logic for worker-side options configuration. |
| test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs | Adds tests for channel reuse/isolation and disposal on DI container teardown for client extensions. |
| test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs | Adds tests for channel reuse/isolation and disposal on DI container teardown for worker extensions. |
    sealed class ConfigureGrpcChannel : IConfigureNamedOptions<GrpcDurableTaskWorkerOptions>, IAsyncDisposable
    {
        readonly IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions;
        readonly ConcurrentDictionary<string, Lazy<GrpcChannel>> channels = new();
        int disposed;

Copilot AI, Jan 27, 2026
PR description mentions IDisposable and a volatile disposed flag, but this implementation is IAsyncDisposable-only and the disposed field isn’t volatile (and is read without Volatile.Read). Either update the PR description or adjust the implementation to match (e.g., implement IDisposable and use volatile/Volatile.Read for the disposed check).
    sealed class ConfigureGrpcChannel : IConfigureNamedOptions<GrpcDurableTaskClientOptions>, IAsyncDisposable
    {
        readonly IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions;
        readonly ConcurrentDictionary<string, Lazy<GrpcChannel>> channels = new();
        int disposed;

Copilot AI, Jan 27, 2026
PR description mentions IDisposable and a volatile disposed flag, but this implementation is IAsyncDisposable-only and the disposed field isn’t volatile (and is read without Volatile.Read). Either update the PR description or adjust the implementation to match (e.g., implement IDisposable and use volatile/Volatile.Read for the disposed check).
    if (exceptions is { Count: > 0 })
    {
        throw new AggregateException(exceptions);
    }

Copilot AI, Jan 27, 2026
DisposeAsync throws an AggregateException when any channel shutdown/dispose fails. Throwing from ServiceProvider disposal can surface as app shutdown failures and is difficult for callers to handle. Consider making this best-effort (swallow/log disposal errors) instead of throwing.
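The best-effort approach suggested here could look roughly like the following. This is only a sketch under stated assumptions: `BestEffortDisposer` and `FakeResource` are hypothetical names invented for illustration (they are not part of this PR or of any library), and the `Action<string>` callback stands in for whatever logger the real code would use.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var disposer = new BestEffortDisposer(message => Console.WriteLine($"warn: {message}"));
await disposer.DisposeAllAsync(new IAsyncDisposable[]
{
    new FakeResource(fail: false),
    new FakeResource(fail: true),
    new FakeResource(fail: false),
});
Console.WriteLine($"failures logged: {disposer.FailureCount}");

// Hypothetical helper: disposes every resource and logs failures instead of throwing.
sealed class BestEffortDisposer
{
    readonly Action<string> log;

    public BestEffortDisposer(Action<string> log) => this.log = log;

    public int FailureCount { get; private set; }

    public async Task DisposeAllAsync(IEnumerable<IAsyncDisposable> resources)
    {
        foreach (IAsyncDisposable resource in resources)
        {
            try
            {
                await resource.DisposeAsync();
            }
            catch (Exception ex)
            {
                // Record the failure and keep going so one bad resource
                // does not prevent the remaining ones from being released.
                this.FailureCount++;
                this.log($"dispose failed: {ex.Message}");
            }
        }
    }
}

// Hypothetical stand-in for a channel whose shutdown may fail.
sealed class FakeResource : IAsyncDisposable
{
    readonly bool fail;

    public FakeResource(bool fail) => this.fail = fail;

    public ValueTask DisposeAsync()
    {
        if (this.fail)
        {
            throw new InvalidOperationException("simulated shutdown failure");
        }

        return default;
    }
}
```

The trade-off is that callers no longer see disposal failures at all, so the log output becomes the only signal that something went wrong during shutdown.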
    // Act - configure two different named clients with different endpoints
    mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential);
    mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential);
    ServiceProvider provider = services.BuildServiceProvider();

Copilot AI, Jan 27, 2026
This test uses different endpoints for different named options, so it will pass even if the cache key accidentally ignores the options name. To validate name isolation in the cache key, use the same endpoint/task hub for both names and assert the channels differ; also dispose the ServiceProvider to avoid leaking channels.
    string optionsName = name ?? Options.DefaultName;
    DurableTaskSchedulerWorkerOptions source = this.schedulerOptions.Get(optionsName);

    // Create a cache key based on the options name, endpoint, and task hub.
    // This ensures channels are reused for the same configuration

Copilot AI, Jan 27, 2026
CreateChannel() behavior depends on more than endpoint/task hub (e.g., ResourceId, Credential, AllowInsecureCredentials, and WorkerId via the call credentials interceptor). If any of these values change while EndpointAddress/TaskHubName stay the same (e.g., via options reload), the cached channel will be reused with stale settings. Consider including these fields in the cache key or enforcing immutability for them.
@copilot open a new pull request to apply changes based on this feedback
    if (exceptions is { Count: > 0 })
    {
        throw new AggregateException(exceptions);
    }

Copilot AI, Jan 27, 2026
DisposeAsync throws an AggregateException when any channel shutdown/dispose fails. Throwing from ServiceProvider disposal can surface as app shutdown failures and is difficult for callers to handle. Consider making this best-effort (swallow/log disposal errors) instead of throwing.
…sionsTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…sionsTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…sionsTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.
    // Create a cache key based on the options name, endpoint, and task hub.
    // This ensures channels are reused for the same configuration
    // but separate channels are created for different configurations.
    // Use a delimiter character (\u001F) that will not appear in typical endpoint URIs.
    string cacheKey = $"{optionsName}\u001F{source.EndpointAddress}\u001F{source.TaskHubName}";

Copilot AI, Jan 28, 2026
The cache key uses source.EndpointAddress, but DurableTaskSchedulerWorkerOptions.CreateChannel() normalizes endpoints by adding a scheme when missing. As a result, equivalent endpoints (with vs without https://) will not share a cache entry and will create multiple channels. Consider normalizing the endpoint string for the cache key the same way CreateChannel() does (or use the channel target/normalized endpoint).
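A minimal sketch of the normalization idea follows. It assumes that the default scheme is `https://` (the comment above only says a scheme is added when missing) and it does not reproduce the actual logic of `CreateChannel()`, which is not shown in this thread. `CacheKeys`, `NormalizeEndpoint`, and `Build` are hypothetical names used only for illustration.

```csharp
using System;

string withoutScheme = CacheKeys.Build("default", "endpoint.westus3.durabletask.io", "hub");
string withScheme = CacheKeys.Build("default", "https://endpoint.westus3.durabletask.io", "hub");
Console.WriteLine(withoutScheme == withScheme); // True: both forms map to one cache entry

// Hypothetical helper, not part of the PR.
static class CacheKeys
{
    // Same delimiter the PR uses between key segments.
    const char Delimiter = '\u001F';

    // Assumption: a missing scheme means https, mirroring what the review
    // comment describes for CreateChannel().
    public static string NormalizeEndpoint(string endpoint)
    {
        string trimmed = endpoint.Trim();
        return trimmed.Contains("://", StringComparison.Ordinal)
            ? trimmed
            : "https://" + trimmed;
    }

    public static string Build(string optionsName, string endpoint, string taskHub) =>
        string.Concat(
            optionsName, Delimiter.ToString(),
            NormalizeEndpoint(endpoint), Delimiter.ToString(),
            taskHub);
}
```

Keeping the normalization in one shared helper would avoid the cache key and the channel factory drifting apart if the scheme rules change later.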
    mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential);
    using ServiceProvider provider = services.BuildServiceProvider();

Copilot AI, Jan 28, 2026
The ServiceProvider created here is not disposed. Since these tests now create/dispose cached gRPC channels via DI, the provider should be wrapped in using/await using to avoid leaking resources across test runs.
@copilot open a new pull request to apply changes based on this feedback
    mockBuilder2.Object.UseDurableTaskScheduler("endpoint.westus3.durabletask.io", ValidTaskHub, credential);
    using ServiceProvider provider = services.BuildServiceProvider();

Copilot AI, Jan 28, 2026
The ServiceProvider created here is not disposed. Please wrap it in using/await using so the container can dispose the cached channels and other tracked disposables, preventing cross-test resource leakage.
    #if NET7_0_OR_GREATER
        ObjectDisposedException.ThrowIf(this.disposed == 1, this);
    #else
        if (this.disposed == 1)
        {

Copilot AI, Jan 28, 2026
disposed is written via Interlocked.Exchange in DisposeAsync, but read here via a plain field read. For cross-thread visibility, the read should use Volatile.Read(ref this.disposed) (or make the field volatile) to ensure Configure reliably observes disposal under concurrency.
        ObjectDisposedException.ThrowIf(this.disposed == 1, this);
    #else
        if (this.disposed == 1)
        {
            throw new ObjectDisposedException(nameof(ConfigureGrpcChannel));

Copilot AI, Jan 28, 2026
disposed is written via Interlocked.Exchange in DisposeAsync, but read here via a plain field read. Use Volatile.Read(ref this.disposed) (or make the field volatile) so concurrent calls to Configure reliably observe disposal.
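The read/write pairing described in this comment can be sketched as below. `DisposalFlag` is a hypothetical helper invented for illustration; the PR keeps the flag as a plain `int` field on `ConfigureGrpcChannel` rather than in a separate type.

```csharp
using System;
using System.Threading;

var flag = new DisposalFlag();
Console.WriteLine(flag.IsDisposed);        // False
Console.WriteLine(flag.TryMarkDisposed()); // True: this caller owns the cleanup
Console.WriteLine(flag.TryMarkDisposed()); // False: someone already disposed
Console.WriteLine(flag.IsDisposed);        // True

// Hypothetical helper wrapping the int flag used in the PR (0 = live, 1 = disposed).
sealed class DisposalFlag
{
    int disposed;

    // Volatile.Read prevents the read from being cached or reordered,
    // so a concurrent caller observes a completed disposal.
    public bool IsDisposed => Volatile.Read(ref this.disposed) == 1;

    // Interlocked.Exchange returns the previous value, so only the first
    // caller sees 0 and gets to run the cleanup.
    public bool TryMarkDisposed() => Interlocked.Exchange(ref this.disposed, 1) == 0;
}
```

Note that this only fixes visibility of the flag. It does not stop a `Configure` call that already passed the check from adding a channel while disposal is in progress.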
    // Resolve options multiple times to trigger channel configuration via new options instances
    IOptionsFactory<GrpcDurableTaskWorkerOptions> optionsFactory = provider.GetRequiredService<IOptionsFactory<GrpcDurableTaskWorkerOptions>>();
    GrpcDurableTaskWorkerOptions options1 = optionsFactory.Create(Options.DefaultName);
    GrpcDurableTaskWorkerOptions options2 = optionsFactory.Create(Options.DefaultName);

Copilot AI, Jan 28, 2026
This test uses IOptionsMonitor.Get twice for the same name, which typically returns the same cached options instance. As a result, it may pass even if ConfigureGrpcChannel.Configure creates a new channel on each invocation. To actually validate channel reuse across multiple configurations, consider using IOptionsFactory<GrpcDurableTaskWorkerOptions>.Create(...) twice (like the client tests do).
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
    string credentialType = source.Credential?.GetType().FullName ?? "null";
    string retryOptionsKey = source.RetryOptions != null
        ? $"{source.RetryOptions.MaxRetries}|{source.RetryOptions.InitialBackoffMs}|{source.RetryOptions.MaxBackoffMs}|{source.RetryOptions.BackoffMultiplier}|{(source.RetryOptions.RetryableStatusCodes != null ? string.Join(",", source.RetryOptions.RetryableStatusCodes) : string.Empty)}"
        : "null";
    string cacheKey = $"{optionsName}\u001F{source.EndpointAddress}\u001F{source.TaskHubName}\u001F{source.ResourceId}\u001F{credentialType}\u001F{source.AllowInsecureCredentials}\u001F{retryOptionsKey}";
    options.Channel = this.channels.GetOrAdd(

Copilot AI, Jan 28, 2026
The cache key only includes the credential type (FullName), but CreateChannel() captures and uses the TokenCredential instance (e.g., for AccessTokenCache and auth headers). If the credential instance changes while keeping the same type (e.g., two ManagedIdentityCredential instances with different client IDs, or config reload), this will incorrectly reuse a channel created with the old credential. Consider keying on the credential instance identity (or a stable credential-specific identifier) rather than only its type, and also normalize EndpointAddress the same way CreateChannel() does (scheme/no-scheme) to avoid duplicate channels for the same effective endpoint.
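One way to key on credential instance identity, as suggested above, is a small key type that compares the credential by reference instead of by type name. The sketch below is an assumption-laden illustration: `ChannelKey` is a hypothetical type, the credentials are plain `object` stand-ins for `TokenCredential` instances, and the string segment stands in for the rest of the PR's cache key.

```csharp
#nullable enable
using System;
using System.Runtime.CompilerServices;

// Two stand-ins for credential instances of the same type.
object credentialA = new object();
object credentialB = new object();

var keyA1 = new ChannelKey("default|endpoint|hub", credentialA);
var keyA2 = new ChannelKey("default|endpoint|hub", credentialA);
var keyB = new ChannelKey("default|endpoint|hub", credentialB);

Console.WriteLine(keyA1.Equals(keyA2)); // True: same settings, same credential instance
Console.WriteLine(keyA1.Equals(keyB));  // False: same type, different instance

// Hypothetical cache key: string settings plus the credential compared by reference.
sealed class ChannelKey : IEquatable<ChannelKey>
{
    readonly string configuration;
    readonly object? credential;

    public ChannelKey(string configuration, object? credential)
    {
        this.configuration = configuration;
        this.credential = credential;
    }

    public bool Equals(ChannelKey? other) =>
        other is not null
        && string.Equals(this.configuration, other.configuration, StringComparison.Ordinal)
        && ReferenceEquals(this.credential, other.credential);

    public override bool Equals(object? obj) => this.Equals(obj as ChannelKey);

    // RuntimeHelpers.GetHashCode ignores any Equals/GetHashCode override on
    // the credential, which keeps the hash consistent with ReferenceEquals.
    public override int GetHashCode() =>
        HashCode.Combine(
            StringComparer.Ordinal.GetHashCode(this.configuration),
            this.credential is null ? 0 : RuntimeHelpers.GetHashCode(this.credential));
}
```

A key like this could replace the `string` key in the `ConcurrentDictionary`. One cost worth weighing is that the dictionary then holds a reference to each credential for as long as the entry lives.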
    {
        readonly IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions;
        readonly ConcurrentDictionary<string, Lazy<GrpcChannel>> channels = new();
        int disposed;

Copilot AI, Jan 28, 2026
disposed is used as a cross-thread flag (read in Configure, written via Interlocked.Exchange in DisposeAsync) but is read without volatile/Volatile.Read. Also, Configure can race with DisposeAsync (an in-flight Configure can create/add a channel after DisposeAsync has enumerated/cleared the dictionary), which would leak the channel. Consider synchronizing Configure vs DisposeAsync (e.g., a lock/reader-writer gate) and using volatile/Volatile.Read for the flag.
Suggested change:
    - int disposed;
    + volatile int disposed;
    string credentialType = source.Credential?.GetType().FullName ?? "null";
    string cacheKey = $"{optionsName}\u001F{source.EndpointAddress}\u001F{source.TaskHubName}\u001F{source.ResourceId}\u001F{credentialType}\u001F{source.AllowInsecureCredentials}\u001F{source.WorkerId}";
    options.Channel = this.channels.GetOrAdd(
        cacheKey,

Copilot AI, Jan 28, 2026
The cache key only includes the credential type (FullName), but CreateChannel() uses the TokenCredential instance (token cache + auth headers). If the credential instance changes while keeping the same type, this will incorrectly reuse a channel created with the old credential. Consider keying on the credential instance identity (or a stable credential-specific identifier) rather than only its type, and normalize EndpointAddress the same way CreateChannel() does to avoid duplicate channels for equivalent endpoints.
    {
        readonly IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions;
        readonly ConcurrentDictionary<string, Lazy<GrpcChannel>> channels = new();
        int disposed;

Copilot AI, Jan 28, 2026
disposed is used as a cross-thread flag (read in Configure, written via Interlocked.Exchange in DisposeAsync) but is read without volatile/Volatile.Read. Also, Configure can race with DisposeAsync (an in-flight Configure can create/add a channel after DisposeAsync has enumerated/cleared the dictionary), which would leak the channel. Consider synchronizing Configure vs DisposeAsync (e.g., a lock/reader-writer gate) and using volatile/Volatile.Read for the flag.
Suggested change:
    - int disposed;
    + volatile int disposed;
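The Configure/DisposeAsync race described in the comment above can be closed with a single gate, roughly as sketched here. This is a simplified, synchronous illustration under stated assumptions: `ChannelRegistry` and `TrackedChannel` are hypothetical types, and the channel is created while holding the lock, which is only safe if channel construction never calls back into the registry (an earlier commit in this PR moved the factory call outside a lock for that reason).

```csharp
using System;
using System.Collections.Generic;

var registry = new ChannelRegistry();
TrackedChannel first = registry.GetOrCreate("default");
TrackedChannel second = registry.GetOrCreate("default");
Console.WriteLine(ReferenceEquals(first, second)); // True: same key, same channel

registry.Dispose();
Console.WriteLine(first.IsDisposed); // True

try
{
    registry.GetOrCreate("late");
}
catch (ObjectDisposedException)
{
    Console.WriteLine("late caller rejected; nothing leaked");
}

// Hypothetical stand-in for GrpcChannel.
sealed class TrackedChannel : IDisposable
{
    public bool IsDisposed { get; private set; }

    public void Dispose() => this.IsDisposed = true;
}

// Hypothetical registry: lookups and disposal share one lock, so a channel
// can never be added after disposal has taken its snapshot.
sealed class ChannelRegistry : IDisposable
{
    readonly object gate = new();
    readonly Dictionary<string, TrackedChannel> channels = new();
    bool disposed;

    public TrackedChannel GetOrCreate(string key)
    {
        lock (this.gate)
        {
            if (this.disposed)
            {
                throw new ObjectDisposedException(nameof(ChannelRegistry));
            }

            if (!this.channels.TryGetValue(key, out TrackedChannel? channel))
            {
                channel = new TrackedChannel();
                this.channels[key] = channel;
            }

            return channel;
        }
    }

    public void Dispose()
    {
        List<TrackedChannel> snapshot;
        lock (this.gate)
        {
            if (this.disposed)
            {
                return;
            }

            this.disposed = true;
            snapshot = new List<TrackedChannel>(this.channels.Values);
            this.channels.Clear();
        }

        // Dispose outside the lock so slow shutdowns do not block callers.
        foreach (TrackedChannel channel in snapshot)
        {
            channel.Dispose();
        }
    }
}
```

Because the disposed check and the insert happen under the same lock as the snapshot, the flag no longer needs `volatile` or `Volatile.Read` in this variant.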
@copilot open a new pull request to apply changes based on the comments in this thread

Closing this PR and will open a new fixed PR
Summary
What changed?
- […] ConfigureGrpcChannel class using ConcurrentDictionary<string, Lazy<GrpcChannel>>
- […] ConfigureGrpcChannel implement IDisposable to properly dispose channels when the ServiceProvider is disposed

Why is this change needed?
- The IConfigureNamedOptions<DurableTaskSchedulerOptions>.Configure() method was being called multiple times (on each retry)
- […] GrpcChannel, which allocates internal HTTP handlers and socket connections
- […] options.Channel is pre-set (AzureManaged case), GetCallInvoker() returns default for the disposable reference

Issues / work items
Project checklist
- […] release_notes.md

AI-assisted code disclosure (required)
Was an AI tool used? (select one)
If AI was used:
- src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs (modified)
- src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs (modified)
- test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs (modified)
- test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs (modified)
- […] GrpcChannelCache class to inline ConcurrentDictionary<string, Lazy<GrpcChannel>> per reviewer feedback

AI verification (required if AI was used):

Testing
Automated tests
- test/Client/AzureManaged.Tests - 33 tests passed
- test/Worker/AzureManaged.Tests - 31 tests passed
- test/Shared/AzureManaged.Tests - 20 tests passed

Manual validation (only if runtime/behavior changed)

Notes for reviewers
- The ConfigureGrpcChannel class now uses ConcurrentDictionary<string, Lazy<GrpcChannel>> for thread-safe channel caching
- Lazy<GrpcChannel> ensures thread-safe initialization without holding locks during channel creation (avoids potential deadlocks)
- ShutdownAsync() is called before Dispose() for graceful shutdown of in-flight RPCs
- The volatile keyword on the disposed field ensures proper memory visibility when checking disposal state from multiple threads
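The caching pattern named in these notes can be illustrated with the standard library alone. In the sketch below, `FakeChannel` is a hypothetical stand-in for `GrpcChannel` so that no NuGet packages are needed, and the key string is a placeholder rather than the PR's actual cache key.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var cache = new ConcurrentDictionary<string, Lazy<FakeChannel>>();
int created = 0;

FakeChannel GetChannel(string key) =>
    cache.GetOrAdd(
        key,
        k => new Lazy<FakeChannel>(
            () =>
            {
                // Runs at most once per stored Lazy, even under contention.
                Interlocked.Increment(ref created);
                return new FakeChannel(k);
            },
            LazyThreadSafetyMode.ExecutionAndPublication)).Value;

// Many concurrent callers asking for the same key share one channel.
Parallel.For(0, 100, _ => GetChannel("default|endpoint|hub"));
Console.WriteLine($"channels created: {created}"); // channels created: 1

// Hypothetical stand-in for GrpcChannel.
sealed record FakeChannel(string Target);
```

`GetOrAdd` may invoke its value factory more than once when callers race, but only one `Lazy` wrapper is stored and only that wrapper's `Value` is ever read. The extra wrappers are discarded without creating a channel, which is why wrapping the channel in `Lazy<T>` matters here.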