Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ public sealed record CustomResourceSnapshot
private readonly ImmutableArray<HealthReportSnapshot> _healthReports = [];
private readonly ResourceStateSnapshot? _state;

/// <summary>
/// Monotonically increasing version number for the snapshot.
/// </summary>
internal long Version { get; init; }

/// <summary>
/// The type of the resource.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,17 +305,6 @@ public async Task<ResourceEvent> WaitForResourceAsync(string resourceName, Func<
/// <returns></returns>
public async IAsyncEnumerable<ResourceEvent> WatchAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// Return the last snapshot for each resource.
foreach (var state in _resourceNotificationStates)
{
var (resource, resourceId) = state.Key;

if (state.Value.LastSnapshot is not null)
{
yield return new ResourceEvent(resource, resourceId, state.Value.LastSnapshot);
}
}

var channel = Channel.CreateUnbounded<ResourceEvent>();

void WriteToChannel(ResourceEvent resourceEvent) =>
Expand All @@ -326,10 +315,37 @@ void WriteToChannel(ResourceEvent resourceEvent) =>
OnResourceUpdated += WriteToChannel;
}

// Return the last snapshot for each resource.
// We do this after subscribing to the event to avoid missing any updates.

// Keep track of the versions we have seen so far to avoid duplicates.
var versionsSeen = new Dictionary<(IResource, string), long>();

foreach (var state in _resourceNotificationStates)
{
var (resource, resourceId) = state.Key;

if (state.Value.LastSnapshot is { } snapshot)
{
versionsSeen[state.Key] = snapshot.Version;

yield return new ResourceEvent(resource, resourceId, snapshot);
}
}

try
{
await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
// Skip events that are older than the max version we have seen so far. This avoids duplicates.
if (versionsSeen.TryGetValue((item.Resource, item.ResourceId), out var maxVersionSeen) && item.Snapshot.Version <= maxVersionSeen)
{
// We can remove the version from the seen list since we have seen it already.
// We only care about events we have returned to the caller
versionsSeen.Remove((item.Resource, item.ResourceId));
Comment thread
davidfowl marked this conversation as resolved.
continue;
}

yield return item;
}
}
Expand Down Expand Up @@ -360,6 +376,9 @@ public Task PublishUpdateAsync(IResource resource, string resourceId, Func<Custo

var newState = stateFactory(previousState);

// Increment the snapshot version, this is a per resource version.
newState = newState with { Version = notificationState.GetNextVersion() };

newState = UpdateCommands(resource, newState);

notificationState.LastSnapshot = newState;
Expand All @@ -385,6 +404,7 @@ public Task PublishUpdateAsync(IResource resource, string resourceId, Func<Custo
{
_logger.LogTrace(
"""
Version: {Version}
Resource {Resource}/{ResourceId} update published:
ResourceType = {ResourceType},
CreationTimeStamp = {CreationTimeStamp:s},
Expand All @@ -400,6 +420,7 @@ public Task PublishUpdateAsync(IResource resource, string resourceId, Func<Custo
{Properties}
}}
""",
newState.Version,
resource.Name,
resourceId,
newState.ResourceType,
Expand Down Expand Up @@ -551,6 +572,8 @@ public void Dispose()
/// </summary>
private sealed class ResourceNotificationState
{
private long _lastVersion = 1;
public long GetNextVersion() => _lastVersion++;
public CustomResourceSnapshot? LastSnapshot { get; set; }
}
}
Expand Down