diff --git a/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs b/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs index 7f9b0fb27e7..068c3fdd7ea 100644 --- a/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs +++ b/src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs @@ -18,6 +18,11 @@ public sealed record CustomResourceSnapshot private readonly ImmutableArray _healthReports = []; private readonly ResourceStateSnapshot? _state; + /// + /// Monotonically increasing version number for the snapshot. + /// + internal long Version { get; init; } + /// /// The type of the resource. /// diff --git a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs index 7b56fb635e2..b559e85c46a 100644 --- a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs +++ b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs @@ -305,17 +305,6 @@ public async Task WaitForResourceAsync(string resourceName, Func< /// public async IAsyncEnumerable WatchAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { - // Return the last snapshot for each resource. - foreach (var state in _resourceNotificationStates) - { - var (resource, resourceId) = state.Key; - - if (state.Value.LastSnapshot is not null) - { - yield return new ResourceEvent(resource, resourceId, state.Value.LastSnapshot); - } - } - var channel = Channel.CreateUnbounded(); void WriteToChannel(ResourceEvent resourceEvent) => @@ -326,10 +315,37 @@ void WriteToChannel(ResourceEvent resourceEvent) => OnResourceUpdated += WriteToChannel; } + // Return the last snapshot for each resource. + // We do this after subscribing to the event to avoid missing any updates. + + // Keep track of the versions we have seen so far to avoid duplicates. + var versionsSeen = new Dictionary<(IResource, string), long>(); + + foreach (var state in _resourceNotificationStates) + { + var (resource, resourceId) = state.Key; + + if (state.Value.LastSnapshot is { } snapshot) + { + versionsSeen[state.Key] = snapshot.Version; + + yield return new ResourceEvent(resource, resourceId, snapshot); + } + } + try { await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { + // Skip events that are older than the max version we have seen so far. This avoids duplicates. + if (versionsSeen.TryGetValue((item.Resource, item.ResourceId), out var maxVersionSeen) && item.Snapshot.Version <= maxVersionSeen) + { + // We can remove the version from the seen list since we have seen it already. + // We only care about events we have returned to the caller + versionsSeen.Remove((item.Resource, item.ResourceId)); + continue; + } + yield return item; } } @@ -360,6 +376,9 @@ public Task PublishUpdateAsync(IResource resource, string resourceId, Func private sealed class ResourceNotificationState { + private long _lastVersion = 1; + public long GetNextVersion() => _lastVersion++; public CustomResourceSnapshot? LastSnapshot { get; set; } } }