From 86d6527e50f7aaf2ed49da7295e068fef6163a73 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Thu, 23 Jan 2025 18:09:32 -0800 Subject: [PATCH 1/7] Add snapshot ID tracking and improve logging - Introduced a new `Id` property in `CustomResourceSnapshot` for unique identification of snapshots. - Updated `WatchAsync` to filter resource events based on the maximum snapshot ID. - Modified `PublishUpdateAsync` to increment the snapshot ID for new states. - Enhanced logging in `PublishUpdateAsync` to include the new snapshot ID for better traceability. --- .../CustomResourceSnapshot.cs | 2 ++ .../ResourceNotificationService.cs | 36 +++++++++++++------ 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs b/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs index 7f9b0fb27e7..31459c3de0b 100644 --- a/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs +++ b/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs @@ -18,6 +18,8 @@ public sealed record CustomResourceSnapshot private readonly ImmutableArray _healthReports = []; private readonly ResourceStateSnapshot? _state; + internal long Id { get; set; } + /// /// The type of the resource. /// diff --git a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs index 7b56fb635e2..86a39c27b46 100644 --- a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs +++ b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs @@ -305,17 +305,6 @@ public async Task WaitForResourceAsync(string resourceName, Func< /// public async IAsyncEnumerable WatchAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { - // Return the last snapshot for each resource. - foreach (var state in _resourceNotificationStates) - { - var (resource, resourceId) = state.Key; - - if (state.Value.LastSnapshot is not null) - { - yield return new ResourceEvent(resource, resourceId, state.Value.LastSnapshot); - } - } - var channel = Channel.CreateUnbounded(); void WriteToChannel(ResourceEvent resourceEvent) => @@ -326,10 +315,30 @@ void WriteToChannel(ResourceEvent resourceEvent) => OnResourceUpdated += WriteToChannel; } + long id = 0; + + // Return the last snapshot for each resource. + foreach (var state in _resourceNotificationStates) + { + var (resource, resourceId) = state.Key; + + if (state.Value.LastSnapshot is { } ss) + { + id = Math.Max(id, ss.Id); + + yield return new ResourceEvent(resource, resourceId, state.Value.LastSnapshot); + } + } + try { await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { + if (item.Snapshot.Id <= id) + { + continue; + } + yield return item; } } @@ -360,6 +369,9 @@ public Task PublishUpdateAsync(IResource resource, string resourceId, Func Date: Thu, 23 Jan 2025 18:28:47 -0800 Subject: [PATCH 2/7] The ID is global across resources --- .../ApplicationModel/ResourceNotificationService.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs index 86a39c27b46..e6b1ab001e3 100644 --- a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs +++ b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs @@ -23,6 +23,7 @@ public class ResourceNotificationService : IDisposable private readonly IServiceProvider _serviceProvider; private readonly CancellationTokenSource _disposing = new(); private readonly ResourceLoggerService _resourceLoggerService; + private static int s_id; private Action? OnResourceUpdated { get; set; } @@ -370,7 +371,7 @@ public Task PublishUpdateAsync(IResource resource, string resourceId, Func Date: Thu, 23 Jan 2025 20:51:05 -0800 Subject: [PATCH 3/7] Call it version --- .../ApplicationModel/CustomResourceSnapshot.cs | 5 ++++- .../ResourceNotificationService.cs | 14 +++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs b/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs index 31459c3de0b..31767cbb8f6 100644 --- a/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs +++ b/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs @@ -18,7 +18,10 @@ public sealed record CustomResourceSnapshot private readonly ImmutableArray _healthReports = []; private readonly ResourceStateSnapshot? _state; - internal long Id { get; set; } + /// + /// Monotonically increasing version number for the snapshot. + /// + internal long Version { get; set; } /// /// The type of the resource. diff --git a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs index e6b1ab001e3..03b05998dc0 100644 --- a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs +++ b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs @@ -23,7 +23,7 @@ public class ResourceNotificationService : IDisposable private readonly IServiceProvider _serviceProvider; private readonly CancellationTokenSource _disposing = new(); private readonly ResourceLoggerService _resourceLoggerService; - private static int s_id; + private static long s_maxVersion; private Action? OnResourceUpdated { get; set; } @@ -316,7 +316,7 @@ void WriteToChannel(ResourceEvent resourceEvent) => OnResourceUpdated += WriteToChannel; } - long id = 0; + var maxVersion = 0L; // Return the last snapshot for each resource. foreach (var state in _resourceNotificationStates) @@ -325,7 +325,7 @@ void WriteToChannel(ResourceEvent resourceEvent) => if (state.Value.LastSnapshot is { } ss) { - id = Math.Max(id, ss.Id); + maxVersion = Math.Max(maxVersion, ss.Version); yield return new ResourceEvent(resource, resourceId, state.Value.LastSnapshot); } @@ -335,7 +335,7 @@ void WriteToChannel(ResourceEvent resourceEvent) => { await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { - if (item.Snapshot.Id <= id) + if (item.Snapshot.Version <= maxVersion) { continue; } @@ -371,7 +371,7 @@ public Task PublishUpdateAsync(IResource resource, string resourceId, Func Date: Thu, 23 Jan 2025 20:57:14 -0800 Subject: [PATCH 4/7] Added comments --- .../ApplicationModel/ResourceNotificationService.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs index 03b05998dc0..82928fe1ea1 100644 --- a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs +++ b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs @@ -319,12 +319,15 @@ void WriteToChannel(ResourceEvent resourceEvent) => var maxVersion = 0L; // Return the last snapshot for each resource. + // We do this after subscribing to the event to avoid missing any updates. foreach (var state in _resourceNotificationStates) { var (resource, resourceId) = state.Key; if (state.Value.LastSnapshot is { } ss) { + // Keep track of the highest version we have seen so far. + // This is used to skip events that are older than the max version. maxVersion = Math.Max(maxVersion, ss.Version); yield return new ResourceEvent(resource, resourceId, state.Value.LastSnapshot); @@ -335,6 +338,7 @@ void WriteToChannel(ResourceEvent resourceEvent) => { await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { + // Skip events that are older than the max version we have seen so far. This avoids duplicates. if (item.Snapshot.Version <= maxVersion) { continue; From 9c70bcea1a073f808efad315f840c3194fe2f45f Mon Sep 17 00:00:00 2001 From: David Fowler Date: Thu, 23 Jan 2025 22:05:12 -0800 Subject: [PATCH 5/7] Per version tracking to avoid misses across resources --- .../ResourceNotificationService.cs | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs index 82928fe1ea1..3be7ac69141 100644 --- a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs +++ b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs @@ -23,7 +23,6 @@ public class ResourceNotificationService : IDisposable private readonly IServiceProvider _serviceProvider; private readonly CancellationTokenSource _disposing = new(); private readonly ResourceLoggerService _resourceLoggerService; - private static long s_maxVersion; private Action? OnResourceUpdated { get; set; } @@ -316,21 +315,21 @@ void WriteToChannel(ResourceEvent resourceEvent) => OnResourceUpdated += WriteToChannel; } - var maxVersion = 0L; - // Return the last snapshot for each resource. // We do this after subscribing to the event to avoid missing any updates. + + // Keep track of the versions we have seen so far to avoid duplicates. + var versionsSeen = new Dictionary<(IResource, string), long>(); + foreach (var state in _resourceNotificationStates) { var (resource, resourceId) = state.Key; - if (state.Value.LastSnapshot is { } ss) + if (state.Value.LastSnapshot is { } snapshot) { - // Keep track of the highest version we have seen so far. - // This is used to skip events that are older than the max version. - maxVersion = Math.Max(maxVersion, ss.Version); + versionsSeen[state.Key] = snapshot.Version; - yield return new ResourceEvent(resource, resourceId, state.Value.LastSnapshot); + yield return new ResourceEvent(resource, resourceId, snapshot); } } @@ -339,8 +338,9 @@ void WriteToChannel(ResourceEvent resourceEvent) => await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { // Skip events that are older than the max version we have seen so far. This avoids duplicates. - if (item.Snapshot.Version <= maxVersion) + if (versionsSeen.TryGetValue((item.Resource, item.ResourceId), out var maxVersionSeen) && item.Snapshot.Version <= maxVersionSeen) { + versionsSeen.Remove((item.Resource, item.ResourceId)); continue; } @@ -374,8 +374,8 @@ public Task PublishUpdateAsync(IResource resource, string resourceId, Func private sealed class ResourceNotificationState { + public long LastVersion { get; set; } = 1; public CustomResourceSnapshot? LastSnapshot { get; set; } } } From c1bfc0ea123bb2d0a171c03edd5bee7918ad9967 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Thu, 23 Jan 2025 22:37:38 -0800 Subject: [PATCH 6/7] More nits! --- .../ApplicationModel/CustomResourceSnapshot.cs | 2 +- .../ApplicationModel/ResourceNotificationService.cs | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs b/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs index 31767cbb8f6..068c3fdd7ea 100644 --- a/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs +++ b/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs @@ -21,7 +21,7 @@ public sealed record CustomResourceSnapshot /// /// Monotonically increasing version number for the snapshot. /// - internal long Version { get; set; } + internal long Version { get; init; } /// /// The type of the resource. diff --git a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs index 3be7ac69141..bc090f825f9 100644 --- a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs +++ b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs @@ -375,7 +375,7 @@ public Task PublishUpdateAsync(IResource resource, string resourceId, Func private sealed class ResourceNotificationState { - public long LastVersion { get; set; } = 1; + private long _lastVersion = 1; + public long GetNextVersion() => _lastVersion++; public CustomResourceSnapshot? LastSnapshot { get; set; } } } From 7e7ea9fa085c576f366a685e9db4c4a07f996d86 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Thu, 23 Jan 2025 22:40:24 -0800 Subject: [PATCH 7/7] Added comments --- .../ApplicationModel/ResourceNotificationService.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs index bc090f825f9..b559e85c46a 100644 --- a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs +++ b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs @@ -340,6 +340,8 @@ void WriteToChannel(ResourceEvent resourceEvent) => // Skip events that are older than the max version we have seen so far. This avoids duplicates. if (versionsSeen.TryGetValue((item.Resource, item.ResourceId), out var maxVersionSeen) && item.Snapshot.Version <= maxVersionSeen) { + // We can remove the version from the seen list since we have seen it already. + // We only care about events we have returned to the caller versionsSeen.Remove((item.Resource, item.ResourceId)); continue; }