diff --git a/.editorconfig b/.editorconfig index 138ad077e..7bb2dff13 100644 --- a/.editorconfig +++ b/.editorconfig @@ -370,7 +370,7 @@ dotnet_diagnostic.SA1136.severity = error dotnet_diagnostic.SA1137.severity = error dotnet_diagnostic.SA1139.severity = error dotnet_diagnostic.SA1200.severity = none -dotnet_diagnostic.SA1201.severity = error +dotnet_diagnostic.SA1201.severity = none dotnet_diagnostic.SA1202.severity = error dotnet_diagnostic.SA1203.severity = error dotnet_diagnostic.SA1204.severity = error @@ -424,10 +424,10 @@ dotnet_diagnostic.SA1508.severity = error dotnet_diagnostic.SA1509.severity = error dotnet_diagnostic.SA1510.severity = error dotnet_diagnostic.SA1511.severity = error -dotnet_diagnostic.SA1512.severity = error -dotnet_diagnostic.SA1513.severity = error +dotnet_diagnostic.SA1512.severity = none +dotnet_diagnostic.SA1513.severity = none dotnet_diagnostic.SA1514.severity = error -dotnet_diagnostic.SA1515.severity = error +dotnet_diagnostic.SA1515.severity = none dotnet_diagnostic.SA1516.severity = error dotnet_diagnostic.SA1517.severity = error dotnet_diagnostic.SA1518.severity = error @@ -503,7 +503,7 @@ dotnet_diagnostic.RCS1058.severity=warning dotnet_diagnostic.RCS1068.severity=warning dotnet_diagnostic.RCS1073.severity=warning dotnet_diagnostic.RCS1084.severity=error -dotnet_diagnostic.RCS1085.severity=error +dotnet_diagnostic.RCS1085.severity=none dotnet_diagnostic.RCS1105.severity=error dotnet_diagnostic.RCS1112.severity=error dotnet_diagnostic.RCS1128.severity=error @@ -547,4 +547,4 @@ end_of_line = lf [*.{cmd, bat}] end_of_line = crlf -vsspell_dictionary_languages = en-US \ No newline at end of file +vsspell_dictionary_languages = en-US diff --git a/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForSource.cs b/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForSource.cs index 230f7d2c8..e7e726c2b 100644 --- a/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForSource.cs +++ b/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForSource.cs @@ -270,7 +270,7 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() results.TryGetRecordedCompletion().Should().BeFalse(); } - [Fact(Skip = "Existing defect, very minor defect, items defined to never expire actually do, at DateTimeOffset.MaxValue")] + [Fact] public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() { using var source = CreateTestSource(); @@ -390,7 +390,7 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() } // Covers https://github.com/reactivemarbles/DynamicData/issues/716 - [Fact(Skip = "Existing defect, removals are skipped when scheduler invokes early")] + [Fact] public void SchedulerIsInaccurate_RemovalsAreNotSkipped() { using var source = CreateTestSource(); @@ -425,7 +425,7 @@ public void SchedulerIsInaccurate_RemovalsAreNotSkipped() results.TryGetRecordedCompletion().Should().BeFalse(); } - [Fact(Skip = "Existing defect, completion is not propagated from the source")] + [Fact] public void SourceCompletes_CompletionIsPropagated() { using var source = CreateTestSource(); @@ -454,7 +454,7 @@ public void SourceCompletes_CompletionIsPropagated() results.EnumerateInvalidNotifications().Should().BeEmpty(); } - [Fact(Skip = "Existing defect, completion is not propagated from the source")] + [Fact] public void SourceCompletesImmediately_CompletionIsPropagated() { using var source = CreateTestSource(); @@ -482,7 +482,7 @@ public void SourceCompletesImmediately_CompletionIsPropagated() results.EnumerateInvalidNotifications().Should().BeEmpty(); } - [Fact(Skip = "Exsiting defect, errors are re-thrown instead of propagated, operator does not use safe subscriptions")] + [Fact] public void SourceErrors_ErrorIsPropagated() { using var source = CreateTestSource(); @@ -512,7 +512,7 @@ public void SourceErrors_ErrorIsPropagated() results.EnumerateInvalidNotifications().Should().BeEmpty(); } - [Fact(Skip = "Existing defect, immediately-occuring error is not propagated")] + [Fact] public void SourceErrorsImmediately_ErrorIsPropagated() { using var source = CreateTestSource(); @@ -552,7 +552,7 @@ public void SourceIsNull_ThrowsException() interval: null)) .Should().Throw(); - [Fact(Skip = "Existing defect, operator does not properly handle items with a null timeout, when using a real scheduler, it passes a TimeSpan to the scheduler that is outside of the supported range")] + [Fact] public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe() { using var source = new TestSourceCache(static item => item.Id); @@ -624,7 +624,7 @@ public void TimeSelectorIsNull_ThrowsException() interval: null)) .Should().Throw(); - [Fact(Skip = "Exsiting defect, errors are re-thrown instead of propagated, user code is not protected")] + [Fact] public void TimeSelectorThrows_ErrorIsPropagated() { using var source = CreateTestSource(); diff --git a/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForStream.cs b/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForStream.cs index 37f2a312a..0af723751 100644 --- a/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForStream.cs +++ b/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForStream.cs @@ -356,7 +356,7 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() results.IsCompleted.Should().BeFalse(); } - [Fact(Skip = "Existing defect, very minor defect, items defined to never expire actually do, at DateTimeOffset.MaxValue")] + [Fact] public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() { using var source = new Subject>(); @@ -499,7 +499,7 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() results.IsCompleted.Should().BeFalse(); } - [Fact(Skip = "Existing defect, completion does not wait")] + [Fact] public void RemovalsArePending_CompletionWaitsForRemovals() { using var source = new Subject>(); @@ -550,7 +550,7 @@ public void RemovalsArePending_CompletionWaitsForRemovals() } // Covers https://github.com/reactivemarbles/DynamicData/issues/716 - [Fact(Skip = "Existing defect, removals are skipped when scheduler invokes early")] + [Fact] public void SchedulerIsInaccurate_RemovalsAreNotSkipped() { using var source = new Subject>(); @@ -742,7 +742,7 @@ public void SourceIsNull_ThrowsException() timeSelector: static _ => default)) .Should().Throw(); - [Fact(Skip = "Existing defect, operator does not properly handle items with a null timeout, when using a real scheduler, it passes a TimeSpan to the scheduler that is outside of the supported range")] + [Fact] public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe() { using var source = new Subject>(); @@ -809,7 +809,7 @@ public void TimeSelectorIsNull_ThrowsException() timeSelector: null!)) .Should().Throw(); - [Fact(Skip = "Exsiting defect, errors are re-thrown instead of propagated, user code is not protected")] + [Fact] public void TimeSelectorThrows_ErrorIsPropagated() { using var source = new Subject>(); diff --git a/src/DynamicData.Tests/List/ExpireAfterFixture.cs b/src/DynamicData.Tests/List/ExpireAfterFixture.cs index d728d734b..8db5bbf5f 100644 --- a/src/DynamicData.Tests/List/ExpireAfterFixture.cs +++ b/src/DynamicData.Tests/List/ExpireAfterFixture.cs @@ -137,7 +137,7 @@ public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() results.TryGetRecordedCompletion().Should().BeFalse(); } - [Fact(Skip = "Existing defect, operator emits empty sets of expired items, instead of skipping emission")] + [Fact] public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() { using var source = new TestSourceList(); @@ -276,7 +276,7 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() results.TryGetRecordedCompletion().Should().BeFalse(); } - [Fact(Skip = "Existing defect, very minor defect, items defined to never expire actually do, at DateTimeOffset.MaxValue")] + [Fact] public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() { using var source = new TestSourceList(); @@ -393,7 +393,7 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() } // Covers https://github.com/reactivemarbles/DynamicData/issues/716 - [Fact(Skip = "Existing defect, removals are skipped when scheduler invokes early")] + [Fact] public void SchedulerIsInaccurate_RemovalsAreNotSkipped() { using var source = new TestSourceList(); @@ -428,7 +428,7 @@ public void SchedulerIsInaccurate_RemovalsAreNotSkipped() results.TryGetRecordedCompletion().Should().BeFalse(); } - [Fact(Skip = "Existing defect, completion is not propagated from the source")] + [Fact] public void SourceCompletes_CompletionIsPropagated() { using var source = new TestSourceList(); @@ -457,7 +457,7 @@ public void SourceCompletes_CompletionIsPropagated() results.EnumerateInvalidNotifications().Should().BeEmpty(); } - [Fact(Skip = "Existing defect, completion is not propagated from the source")] + [Fact] public void SourceCompletesImmediately_CompletionIsPropagated() { using var source = new TestSourceList(); @@ -485,7 +485,7 @@ public void SourceCompletesImmediately_CompletionIsPropagated() results.EnumerateInvalidNotifications().Should().BeEmpty(); } - [Fact(Skip = "Exsiting defect, errors are re-thrown instead of propagated, operator does not use safe subscriptions")] + [Fact] public void SourceErrors_ErrorIsPropagated() { using var source = new TestSourceList(); @@ -515,7 +515,7 @@ public void SourceErrors_ErrorIsPropagated() results.EnumerateInvalidNotifications().Should().BeEmpty(); } - [Fact(Skip = "Existing defect, immediately-occuring error is not propagated")] + [Fact] public void SourceErrorsImmediately_ErrorIsPropagated() { using var source = new TestSourceList(); @@ -555,7 +555,7 @@ public void SourceIsNull_ThrowsException() pollingInterval: null)) .Should().Throw(); - [Fact(Skip = "Existing defect, operator does not properly handle items with a null timeout, when using a real scheduler, it passes a TimeSpan to the scheduler that is outside of the supported range")] + [Fact] public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe() { using var source = new TestSourceList(); @@ -587,7 +587,7 @@ public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe _output.WriteLine($"{results.EnumerateRecordedValues().Count()} Expirations occurred, for {results.EnumerateRecordedValues().SelectMany(static item => item).Count()} items"); } - [Fact(Skip = "Existing defect, deadlocks")] + [Fact] public async Task ThreadPoolSchedulerIsUsedWithPolling_ExpirationIsThreadSafe() { using var source = new TestSourceList(); @@ -627,7 +627,7 @@ public void TimeSelectorIsNull_ThrowsException() pollingInterval: null)) .Should().Throw(); - [Fact(Skip = "Exsiting defect, errors are re-thrown instead of propagated, user code is not protected")] + [Fact] public void TimeSelectorThrows_ThrowsException() { using var source = new TestSourceList(); diff --git a/src/DynamicData.Tests/Utilities/FakeScheduler.cs b/src/DynamicData.Tests/Utilities/FakeScheduler.cs index 0e21fecd6..fc2b19bbf 100644 --- a/src/DynamicData.Tests/Utilities/FakeScheduler.cs +++ b/src/DynamicData.Tests/Utilities/FakeScheduler.cs @@ -71,11 +71,9 @@ public void SimulateUntilIdle(TimeSpan inaccuracyOffset = default) ScheduledActions[0].Invoke(); ScheduledActions.RemoveAt(0); } - // If the next action is in the future, advance time by 1ms. - else - { - Now += TimeSpan.FromMilliseconds(1); - } + + // Advance time by at least one tick after every action, to eliminate infinite-looping + Now += TimeSpan.FromTicks(1); } } diff --git a/src/DynamicData.Tests/Utilities/TestSourceCache.cs b/src/DynamicData.Tests/Utilities/TestSourceCache.cs index 605ac3b74..f08be1598 100644 --- a/src/DynamicData.Tests/Utilities/TestSourceCache.cs +++ b/src/DynamicData.Tests/Utilities/TestSourceCache.cs @@ -58,9 +58,9 @@ public void Complete() public void Dispose() { + _source.Dispose(); _error.Dispose(); _hasCompleted.Dispose(); - _source.Dispose(); } public void Edit(Action> updateAction) diff --git a/src/DynamicData.Tests/Utilities/TestSourceList.cs b/src/DynamicData.Tests/Utilities/TestSourceList.cs index fbf6bb24f..e708c7239 100644 --- a/src/DynamicData.Tests/Utilities/TestSourceList.cs +++ b/src/DynamicData.Tests/Utilities/TestSourceList.cs @@ -44,9 +44,9 @@ public void Complete() public void Dispose() { + _source.Dispose(); _error.Dispose(); _hasCompleted.Dispose(); - _source.Dispose(); } public void Edit(Action> updateAction) diff --git a/src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs b/src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs new file mode 100644 index 000000000..e995c8512 --- /dev/null +++ b/src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs @@ -0,0 +1,399 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Linq; + +namespace DynamicData.Cache.Internal; + +internal static partial class ExpireAfter +{ + public static class ForSource + where TObject : notnull + where TKey : notnull + { + public static IObservable>> Create( + ISourceCache source, + Func timeSelector, + TimeSpan? pollingInterval = null, + IScheduler? scheduler = null) + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + timeSelector.ThrowArgumentNullExceptionIfNull(nameof(timeSelector)); + + return Observable.Create>>(observer => (pollingInterval is TimeSpan pollingIntervalValue) + ? new PollingSubscription( + observer: observer, + pollingInterval: pollingIntervalValue, + scheduler: scheduler, + source: source, + timeSelector: timeSelector) + : new OnDemandSubscription( + observer: observer, + scheduler: scheduler, + source: source, + timeSelector: timeSelector)); + } + + private abstract class SubscriptionBase + : IDisposable + { + private readonly Dictionary _expirationDueTimesByKey; + private readonly IObserver>> _observer; + private readonly List _proposedExpirationsQueue; + private readonly List> _removedItemsBuffer; + private readonly IScheduler _scheduler; + private readonly ISourceCache _source; + private readonly IDisposable _sourceSubscription; + private readonly Func _timeSelector; + + private bool _hasSourceCompleted; + private ScheduledManagement? _nextScheduledManagement; + + protected SubscriptionBase( + IObserver>> observer, + IScheduler? scheduler, + ISourceCache source, + Func timeSelector) + { + _observer = observer; + _source = source; + _timeSelector = timeSelector; + + _scheduler = scheduler ?? GlobalConfig.DefaultScheduler; + + _expirationDueTimesByKey = new(); + _proposedExpirationsQueue = new(); + _removedItemsBuffer = new(); + + _sourceSubscription = source + .Connect() + // It's important to set this flag outside the context of a lock, because it'll be read outside of lock as well. + .Finally(() => _hasSourceCompleted = true) + .Synchronize(SynchronizationGate) + .SubscribeSafe(Observer.Create>( + onNext: OnSourceNext, + onError: OnSourceError, + onCompleted: OnSourceCompleted)); + } + + public void Dispose() + { + lock (SynchronizationGate) + { + _sourceSubscription.Dispose(); + + TryCancelNextScheduledManagement(); + } + } + + protected IScheduler Scheduler + => _scheduler; + + // Instead of using a dedicated _synchronizationGate object, we can save an allocation by using any object that is never exposed to public consumers. + protected object SynchronizationGate + => _expirationDueTimesByKey; + + protected abstract DateTimeOffset? GetNextManagementDueTime(); + + protected DateTimeOffset? GetNextProposedExpirationDueTime() + => _proposedExpirationsQueue.Count is 0 + ? null + : _proposedExpirationsQueue[0].DueTime; + + protected abstract void OnExpirationsManaged(DateTimeOffset dueTime); + + private void ClearExpiration(TKey key) + // This is what puts the "proposed" in _proposedExpirationsQueue. + // Finding the position of the item to remove from the queue would be O(log n), at best, + // so just leave it and flush it later during normal processing of the queue. + => _expirationDueTimesByKey.Remove(key); + + private void ManageExpirations() + { + // This check is needed, to make sure we don't try and call .Edit() on a disposed _source, + // since the scheduler only promises "best effort" to cancel a scheduled action. + // It's safe to skip locking here becuase once this flag is set, it's never unset. + if (_hasSourceCompleted) + return; + + // Putting the entire management process here inside a .Edit() call for a couple of reasons: + // - It keeps the edit delegate from becoming a closure + // - It batches multiple expirations occurring at the same time into one source operation, so it only emits one changeset. + // - It eliminates the possibility of _itemStatesByKey and other internal state becoming out-of-sync with _source, by effectively locking _source. + // - It eliminates a rare deadlock that I honestly can't fully explain, but was able to reproduce reliably with few hundred iterations of the ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe test. + _source.Edit(updater => + { + lock (SynchronizationGate) + { + // The scheduler only promises "best effort" to cancel scheduled operations, so we need to make sure. + if (_nextScheduledManagement is not ScheduledManagement thisScheduledManagement) + return; + + _nextScheduledManagement = null; + + var now = Scheduler.Now; + + // Buffer removals, so we can optimize the allocation for the final changeset, or skip it entirely. + // Also, so we can optimize removal from the queue as a range removal. + var proposedExpirationIndex = 0; + for (; proposedExpirationIndex < _proposedExpirationsQueue.Count; ++proposedExpirationIndex) + { + var proposedExpiration = _proposedExpirationsQueue[proposedExpirationIndex]; + if (proposedExpiration.DueTime > now) + { + break; + } + + // The state of _expirationQueue is allowed to go out-of-sync with _expirationDueTimesByKey, + // so make sure the item still needs to be removed, before removing it. + if (_expirationDueTimesByKey.TryGetValue(proposedExpiration.Key, out var expirationDueTime) && (expirationDueTime <= now)) + { + _expirationDueTimesByKey.Remove(proposedExpiration.Key); + + _removedItemsBuffer.Add(new( + key: proposedExpiration.Key, + value: updater.Lookup(proposedExpiration.Key).Value)); + + updater.RemoveKey(proposedExpiration.Key); + } + } + _proposedExpirationsQueue.RemoveRange(0, proposedExpirationIndex); + + // We can end up with no expiring items here because the scheduler only promises "best effort" to cancel scheduled operations, + // or because of a race condition with the source. + if (_removedItemsBuffer.Count is not 0) + { + _observer.OnNext(_removedItemsBuffer.ToArray()); + + _removedItemsBuffer.Clear(); + } + + OnExpirationsManaged(thisScheduledManagement.DueTime); + + // We just changed the expirations queue, so run cleanup and management scheduling. + OnExpirationsChanged(); + } + }); + } + + private void OnExpirationsChanged() + { + // Clear out any expirations at the front of the queue that are no longer valid. + var removeToIndex = _proposedExpirationsQueue.FindIndex(expiration => _expirationDueTimesByKey.ContainsKey(expiration.Key)); + if (removeToIndex > 0) + _proposedExpirationsQueue.RemoveRange(0, removeToIndex); + + // Check if we need to re-schedule the next management operation + if (GetNextManagementDueTime() is DateTimeOffset nextManagementDueTime) + { + if (_nextScheduledManagement?.DueTime != nextManagementDueTime) + { + if (_nextScheduledManagement is ScheduledManagement nextScheduledManagement) + nextScheduledManagement.Cancellation.Dispose(); + + _nextScheduledManagement = new() + { + Cancellation = _scheduler.Schedule( + dueTime: nextManagementDueTime, + action: ManageExpirations), + DueTime = nextManagementDueTime + }; + } + } + else + { + TryCancelNextScheduledManagement(); + } + } + + private void OnSourceCompleted() + { + // If the source completes, we can no longer remove items from it, so any pending expirations are moot. + TryCancelNextScheduledManagement(); + + _observer.OnCompleted(); + } + + private void OnSourceError(Exception error) + { + TryCancelNextScheduledManagement(); + + _observer.OnError(error); + } + + private void OnSourceNext(IChangeSet changes) + { + try + { + var now = _scheduler.Now; + + var haveExpirationsChanged = false; + + foreach (var change in changes.ToConcreteType()) + { + switch (change.Reason) + { + case ChangeReason.Add: + { + if (_timeSelector.Invoke(change.Current) is TimeSpan expireAfter) + { + haveExpirationsChanged |= TrySetExpiration( + key: change.Key, + dueTime: now + expireAfter); + } + } + break; + + case ChangeReason.Remove: + ClearExpiration(change.Key); + haveExpirationsChanged = true; + break; + + case ChangeReason.Update: + { + if (_timeSelector.Invoke(change.Current) is TimeSpan expireAfter) + { + haveExpirationsChanged = TrySetExpiration( + key: change.Key, + dueTime: now + expireAfter); + } + else + { + ClearExpiration(change.Key); + haveExpirationsChanged = true; + } + } + break; + } + } + + if (haveExpirationsChanged) + OnExpirationsChanged(); + } + catch (Exception error) + { + TryCancelNextScheduledManagement(); + + _observer.OnError(error); + } + } + + private void TryCancelNextScheduledManagement() + { + _nextScheduledManagement?.Cancellation.Dispose(); + _nextScheduledManagement = null; + } + + private bool TrySetExpiration( + TKey key, + DateTimeOffset dueTime) + { + var oldDueTime = _expirationDueTimesByKey.TryGetValue(key, out var existingDueTime) + ? existingDueTime + : null as DateTimeOffset?; + + // Always update the item state, cause even if ExpireAt doesn't change, the item itself might have. + _expirationDueTimesByKey[key] = dueTime; + + if (dueTime == oldDueTime) + return false; + + var insertionIndex = _proposedExpirationsQueue.BinarySearch(dueTime, static (dueTime, expiration) => dueTime.CompareTo(expiration.DueTime)); + if (insertionIndex < 0) + insertionIndex = ~insertionIndex; + + _proposedExpirationsQueue.Insert( + index: insertionIndex, + item: new() + { + DueTime = dueTime, + Key = key + }); + + // Intentionally not removing the old expiration for this item, if applicable, see ClearExpiration() + + return true; + } + + private readonly struct ProposedExpiration + { + public required DateTimeOffset DueTime { get; init; } + + public required TKey Key { get; init; } + } + + private readonly struct ScheduledManagement + { + public required IDisposable Cancellation { get; init; } + + public required DateTimeOffset DueTime { get; init; } + } + } + + private sealed class OnDemandSubscription + : SubscriptionBase + { + public OnDemandSubscription( + IObserver>> observer, + IScheduler? scheduler, + ISourceCache source, + Func timeSelector) + : base( + observer, + scheduler, + source, + timeSelector) + { + } + + protected override DateTimeOffset? GetNextManagementDueTime() + => GetNextProposedExpirationDueTime(); + + protected override void OnExpirationsManaged(DateTimeOffset dueTime) + { + } + } + + private sealed class PollingSubscription + : SubscriptionBase + { + private readonly TimeSpan _pollingInterval; + + private DateTimeOffset _lastManagementDueTime; + + public PollingSubscription( + IObserver>> observer, + TimeSpan pollingInterval, + IScheduler? scheduler, + ISourceCache source, + Func timeSelector) + : base( + observer, + scheduler, + source, + timeSelector) + { + _pollingInterval = pollingInterval; + + _lastManagementDueTime = Scheduler.Now; + } + + protected override DateTimeOffset? GetNextManagementDueTime() + { + var now = Scheduler.Now; + var nextDueTime = _lastManagementDueTime + _pollingInterval; + + // Throttle down the polling frequency if polls are taking longer than the ideal interval. + return (nextDueTime > now) + ? nextDueTime + : now; + } + + protected override void OnExpirationsManaged(DateTimeOffset dueTime) + => _lastManagementDueTime = dueTime; + } + } +} diff --git a/src/DynamicData/Cache/Internal/ExpireAfter.ForStream.cs b/src/DynamicData/Cache/Internal/ExpireAfter.ForStream.cs new file mode 100644 index 000000000..81ac338b8 --- /dev/null +++ b/src/DynamicData/Cache/Internal/ExpireAfter.ForStream.cs @@ -0,0 +1,400 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Linq; + +namespace DynamicData.Cache.Internal; + +internal static partial class ExpireAfter +{ + public static class ForStream + where TObject : notnull + where TKey : notnull + { + public static IObservable> Create( + IObservable> source, + Func timeSelector, + TimeSpan? pollingInterval = null, + IScheduler? scheduler = null) + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + timeSelector.ThrowArgumentNullExceptionIfNull(nameof(timeSelector)); + + return Observable.Create>(observer => (pollingInterval is TimeSpan pollingIntervalValue) + ? new PollingSubscription( + observer: observer, + pollingInterval: pollingIntervalValue, + scheduler: scheduler, + source: source, + timeSelector: timeSelector) + : new OnDemandSubscription( + observer: observer, + scheduler: scheduler, + source: source, + timeSelector: timeSelector)); + } + + private abstract class SubscriptionBase + : IDisposable + { + private readonly Dictionary _expirationDueTimesByKey; + private readonly ChangeAwareCache _itemsCache; + private readonly IObserver> _observer; + private readonly List _proposedExpirationsQueue; + private readonly IScheduler _scheduler; + private readonly IDisposable _sourceSubscription; + private readonly Func _timeSelector; + + private bool _hasSourceCompleted; + private ScheduledManagement? _nextScheduledManagement; + + protected SubscriptionBase( + IObserver> observer, + IScheduler? scheduler, + IObservable> source, + Func timeSelector) + { + _observer = observer; + _timeSelector = timeSelector; + + _scheduler = scheduler ?? GlobalConfig.DefaultScheduler; + + _expirationDueTimesByKey = new(); + _itemsCache = new(); + _proposedExpirationsQueue = new(); + + _sourceSubscription = source + .Synchronize(SynchronizationGate) + .SubscribeSafe(Observer.Create>( + onNext: OnSourceNext, + onError: OnSourceError, + onCompleted: OnSourceCompleted)); + } + + public void Dispose() + { + lock (SynchronizationGate) + { + _sourceSubscription.Dispose(); + + TryCancelNextScheduledManagement(); + } + } + + protected IScheduler Scheduler + => _scheduler; + + // Instead of using a dedicated _synchronizationGate object, we can save an allocation by using any object that is never exposed to public consumers. + protected object SynchronizationGate + => _expirationDueTimesByKey; + + protected abstract DateTimeOffset? GetNextManagementDueTime(); + + protected DateTimeOffset? GetNextProposedExpirationDueTime() + => _proposedExpirationsQueue.Count is 0 + ? null + : _proposedExpirationsQueue[0].DueTime; + + protected abstract void OnExpirationsManaged(DateTimeOffset dueTime); + + private void ClearExpiration(TKey key) + // This is what puts the "proposed" in _proposedExpirationsQueue. + // Finding the position of the item to remove from the queue would be O(log n), at best, + // so just leave it and flush it later during normal processing of the queue. + => _expirationDueTimesByKey.Remove(key); + + private void ManageExpirations() + { + lock (SynchronizationGate) + { + // The scheduler only promises "best effort" to cancel scheduled operations, so we need to make sure. + if (_nextScheduledManagement is not ScheduledManagement thisScheduledManagement) + return; + + _nextScheduledManagement = null; + + var now = Scheduler.Now; + + // Buffer removals, so we can optimize the allocation for the final changeset, or skip it entirely. + // Also, so we can optimize removal from the queue as a range removal. + var proposedExpirationIndex = 0; + for (; proposedExpirationIndex < _proposedExpirationsQueue.Count; ++proposedExpirationIndex) + { + var proposedExpiration = _proposedExpirationsQueue[proposedExpirationIndex]; + if (proposedExpiration.DueTime > now) + { + break; + } + + // The state of _expirationQueue is allowed to go out-of-sync with _itemStatesByKey, + // so make sure the item still needs to be removed, before removing it. + if (_expirationDueTimesByKey.TryGetValue(proposedExpiration.Key, out var expirationDueTime) && (expirationDueTime <= now)) + { + _expirationDueTimesByKey.Remove(proposedExpiration.Key); + + _itemsCache.Remove(proposedExpiration.Key); + } + } + _proposedExpirationsQueue.RemoveRange(0, proposedExpirationIndex); + + // The scheduler only promises "best effort" to cancel scheduled operations, so we can end up with no items being expired. + var downstreamChanges = _itemsCache.CaptureChanges(); + if (downstreamChanges.Count is not 0) + _observer.OnNext(downstreamChanges); + + OnExpirationsManaged(thisScheduledManagement.DueTime); + + // We just changed the expirations queue, so run cleanup and management scheduling. + OnExpirationsChanged(); + } + } + + private void OnExpirationsChanged() + { + // Clear out any expirations at the front of the queue that are no longer valid. + var removeToIndex = _proposedExpirationsQueue.FindIndex(expiration => _expirationDueTimesByKey.ContainsKey(expiration.Key)); + if (removeToIndex > 0) + _proposedExpirationsQueue.RemoveRange(0, removeToIndex); + + // If we're out of items to expire, and the source has completed, we'll never have any further changes to publish. + if ((_expirationDueTimesByKey.Count is 0) && _hasSourceCompleted) + { + TryCancelNextScheduledManagement(); + + _observer.OnCompleted(); + + return; + } + + // Check if we need to re-schedule the next management operation + if (GetNextManagementDueTime() is DateTimeOffset nextManagementDueTime) + { + if (_nextScheduledManagement?.DueTime != nextManagementDueTime) + { + if (_nextScheduledManagement is ScheduledManagement nextScheduledManagement) + nextScheduledManagement.Cancellation.Dispose(); + + _nextScheduledManagement = new() + { + Cancellation = _scheduler.Schedule( + dueTime: nextManagementDueTime, + action: ManageExpirations), + DueTime = nextManagementDueTime + }; + } + } + else + { + TryCancelNextScheduledManagement(); + } + } + + private void OnSourceCompleted() + { + _hasSourceCompleted = true; + + // Postpone downstream completion if there are any expirations pending. + if (_expirationDueTimesByKey.Count is 0) + { + TryCancelNextScheduledManagement(); + + _observer.OnCompleted(); + } + } + + private void OnSourceError(Exception error) + { + TryCancelNextScheduledManagement(); + + _observer.OnError(error); + } + + private void OnSourceNext(IChangeSet upstreamChanges) + { + try + { + var now = _scheduler.Now; + + var haveExpirationsChanged = false; + + foreach (var change in upstreamChanges.ToConcreteType()) + { + switch (change.Reason) + { + case ChangeReason.Add: + { + if (_timeSelector.Invoke(change.Current) is TimeSpan expireAfter) + { + haveExpirationsChanged |= TrySetExpiration( + key: change.Key, + dueTime: now + expireAfter); + } + _itemsCache.AddOrUpdate(change.Current, change.Key); + } + break; + + // Ignore Move changes completely, as this is functionally really just a fancy filter operator. + + case ChangeReason.Remove: + ClearExpiration(change.Key); + _itemsCache.Remove(change.Key); + break; + + case ChangeReason.Refresh: + _itemsCache.Refresh(change.Key); + break; + + case ChangeReason.Update: + { + if (_timeSelector.Invoke(change.Current) is TimeSpan expireAfter) + { + haveExpirationsChanged = TrySetExpiration( + key: change.Key, + dueTime: now + expireAfter); + } + else + { + ClearExpiration(change.Key); + haveExpirationsChanged = true; + } + + _itemsCache.AddOrUpdate(change.Current, change.Key); + } + break; + } + } + + if (haveExpirationsChanged) + OnExpirationsChanged(); + + var downstreamChanges = _itemsCache.CaptureChanges(); + if (downstreamChanges.Count is not 0) + _observer.OnNext(downstreamChanges); + } + catch (Exception error) + { + TryCancelNextScheduledManagement(); + + _observer.OnError(error); + } + } + + private void TryCancelNextScheduledManagement() + { + _nextScheduledManagement?.Cancellation.Dispose(); + _nextScheduledManagement = null; + } + + private bool TrySetExpiration( + TKey key, + DateTimeOffset dueTime) + { + var oldDueTime = _expirationDueTimesByKey.TryGetValue(key, out var expirationDueTime) + ? expirationDueTime + : null as DateTimeOffset?; + + // Always update the item state, cause even if ExpireAt doesn't change, the item itself might have. + _expirationDueTimesByKey[key] = dueTime; + + if (dueTime == oldDueTime) + return false; + + var insertionIndex = _proposedExpirationsQueue.BinarySearch(dueTime, static (expireAt, expiration) => expireAt.CompareTo(expiration.DueTime)); + if (insertionIndex < 0) + insertionIndex = ~insertionIndex; + + _proposedExpirationsQueue.Insert( + index: insertionIndex, + item: new() + { + DueTime = dueTime, + Key = key + }); + + // Intentionally not removing the old expiration for this item, if applicable, see ClearExpiration() + + return true; + } + + private readonly struct ProposedExpiration + { + public required DateTimeOffset DueTime { get; init; } + + public required TKey Key { get; init; } + } + + private readonly struct ScheduledManagement + { + public required IDisposable Cancellation { get; init; } + + public required DateTimeOffset DueTime { get; init; } + } + } + + private sealed class OnDemandSubscription + : SubscriptionBase + { + public OnDemandSubscription( + IObserver> observer, + IScheduler? scheduler, + IObservable> source, + Func timeSelector) + : base( + observer, + scheduler, + source, + timeSelector) + { + } + + protected override DateTimeOffset? GetNextManagementDueTime() + => GetNextProposedExpirationDueTime(); + + protected override void OnExpirationsManaged(DateTimeOffset dueTime) + { + } + } + + private sealed class PollingSubscription + : SubscriptionBase + { + private readonly TimeSpan _pollingInterval; + + private DateTimeOffset _lastManagementDueTime; + + public PollingSubscription( + IObserver> observer, + TimeSpan pollingInterval, + IScheduler? scheduler, + IObservable> source, + Func timeSelector) + : base( + observer, + scheduler, + source, + timeSelector) + { + _pollingInterval = pollingInterval; + + _lastManagementDueTime = Scheduler.Now; + } + + protected override DateTimeOffset? GetNextManagementDueTime() + { + var now = Scheduler.Now; + var nextDueTime = _lastManagementDueTime + _pollingInterval; + + // Throttle down the polling frequency if polls are taking longer than the ideal interval. + return (nextDueTime > now) + ? nextDueTime + : now; + } + + protected override void OnExpirationsManaged(DateTimeOffset dueTime) + => _lastManagementDueTime = dueTime; + } + } +} diff --git a/src/DynamicData/Cache/Internal/TimeExpirer.cs b/src/DynamicData/Cache/Internal/TimeExpirer.cs deleted file mode 100644 index ad48c800b..000000000 --- a/src/DynamicData/Cache/Internal/TimeExpirer.cs +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. -// Roland Pheasant licenses this file to you under the MIT license. -// See the LICENSE file in the project root for full license information. - -using System.Reactive.Concurrency; -using System.Reactive.Disposables; -using System.Reactive.Linq; - -using DynamicData.Kernel; - -namespace DynamicData.Cache.Internal; - -internal sealed class TimeExpirer(IObservable> source, Func timeSelector, TimeSpan? interval, IScheduler scheduler) - where TObject : notnull - where TKey : notnull -{ - private readonly IObservable> _source = source ?? throw new ArgumentNullException(nameof(source)); - - private readonly Func _timeSelector = timeSelector ?? throw new ArgumentNullException(nameof(timeSelector)); - - public IObservable> ExpireAfter() => Observable.Create>( - observer => - { - var cache = new IntermediateCache(_source); - - var published = cache.Connect().Publish(); - var subscriber = published.SubscribeSafe(observer); - - var autoRemover = published.ForExpiry(_timeSelector, interval, scheduler).Finally(observer.OnCompleted).Subscribe( - keys => - { - try - { - cache.Edit(updater => updater.Remove(keys.Select(kv => kv.Key))); - } - catch (Exception ex) - { - observer.OnError(ex); - } - }); - - var connected = published.Connect(); - - return Disposable.Create( - () => - { - connected.Dispose(); - subscriber.Dispose(); - autoRemover.Dispose(); - cache.Dispose(); - }); - }); - - public IObservable>> ForExpiry() => Observable.Create>>( - observer => - { - var dateTime = DateTime.Now; - - var autoRemover = _source.Do(_ => dateTime = scheduler.Now.UtcDateTime).Transform( - (t, v) => - { - var removeAt = _timeSelector(t); - var expireAt = removeAt.HasValue ? dateTime.Add(removeAt.Value) : DateTime.MaxValue; - return new ExpirableItem(t, v, expireAt); - }).AsObservableCache(); - - void RemovalAction() - { - try - { - var toRemove = autoRemover.KeyValues.Where(kv => kv.Value.ExpireAt <= scheduler.Now.UtcDateTime).ToList(); - - observer.OnNext(toRemove.ConvertAll(kv => new KeyValuePair(kv.Key, kv.Value.Value))); - } - catch (Exception ex) - { - observer.OnError(ex); - } - } - - var removalSubscription = new SingleAssignmentDisposable(); - if (interval.HasValue) - { - // use polling - removalSubscription.Disposable = scheduler.ScheduleRecurringAction(interval.Value, RemovalAction); - } - else - { - // create a timer for each distinct time - removalSubscription.Disposable = autoRemover.Connect().DistinctValues(ei => ei.ExpireAt).SubscribeMany( - datetime => - { - var expireAt = datetime.Subtract(scheduler.Now.UtcDateTime); - return Observable.Timer(expireAt, scheduler).Take(1).Subscribe(_ => RemovalAction()); - }).Subscribe(); - } - - return Disposable.Create( - () => - { - removalSubscription.Dispose(); - autoRemover.Dispose(); - }); - }); -} diff --git a/src/DynamicData/Cache/ObservableCacheEx.cs b/src/DynamicData/Cache/ObservableCacheEx.cs index c593c2a85..07c8c9443 100644 --- a/src/DynamicData/Cache/ObservableCacheEx.cs +++ b/src/DynamicData/Cache/ObservableCacheEx.cs @@ -1357,9 +1357,14 @@ public static class ObservableCacheEx /// or /// timeSelector. /// - public static IObservable> ExpireAfter(this IObservable> source, Func timeSelector) - where TObject : notnull - where TKey : notnull => ExpireAfter(source, timeSelector, GlobalConfig.DefaultScheduler); + public static IObservable> ExpireAfter( + this IObservable> source, + Func timeSelector) + where TObject : notnull + where TKey : notnull + => Cache.Internal.ExpireAfter.ForStream.Create( + source: source, + timeSelector: timeSelector); /// /// Automatically removes items from the stream after the time specified by @@ -1376,15 +1381,16 @@ public static class ObservableCacheEx /// or /// timeSelector. /// - public static IObservable> ExpireAfter(this IObservable> source, Func timeSelector, IScheduler scheduler) - where TObject : notnull - where TKey : notnull - { - source.ThrowArgumentNullExceptionIfNull(nameof(source)); - timeSelector.ThrowArgumentNullExceptionIfNull(nameof(timeSelector)); - - return source.ExpireAfter(timeSelector, null, scheduler); - } + public static IObservable> ExpireAfter( + this IObservable> source, + Func timeSelector, + IScheduler scheduler) + where TObject : notnull + where TKey : notnull + => Cache.Internal.ExpireAfter.ForStream.Create( + source: source, + timeSelector: timeSelector, + scheduler: scheduler); /// /// Automatically removes items from the stream on the next poll after the time specified by @@ -1401,9 +1407,16 @@ public static class ObservableCacheEx /// source /// or /// timeSelector. - public static IObservable> ExpireAfter(this IObservable> source, Func timeSelector, TimeSpan? pollingInterval) - where TObject : notnull - where TKey : notnull => ExpireAfter(source, timeSelector, pollingInterval, GlobalConfig.DefaultScheduler); + public static IObservable> ExpireAfter( + this IObservable> source, + Func timeSelector, + TimeSpan? pollingInterval) + where TObject : notnull + where TKey : notnull + => Cache.Internal.ExpireAfter.ForStream.Create( + source: source, + timeSelector: timeSelector, + pollingInterval: pollingInterval); /// /// Automatically removes items from the stream on the next poll after the time specified by @@ -1421,15 +1434,18 @@ public static class ObservableCacheEx /// source /// or /// timeSelector. - public static IObservable> ExpireAfter(this IObservable> source, Func timeSelector, TimeSpan? pollingInterval, IScheduler scheduler) - where TObject : notnull - where TKey : notnull - { - source.ThrowArgumentNullExceptionIfNull(nameof(source)); - timeSelector.ThrowArgumentNullExceptionIfNull(nameof(timeSelector)); - - return new TimeExpirer(source, timeSelector, pollingInterval, scheduler).ExpireAfter(); - } + public static IObservable> ExpireAfter( + this IObservable> source, + Func timeSelector, + TimeSpan? pollingInterval, + IScheduler scheduler) + where TObject : notnull + where TKey : notnull + => Cache.Internal.ExpireAfter.ForStream.Create( + source: source, + timeSelector: timeSelector, + pollingInterval: pollingInterval, + scheduler: scheduler); /// /// Automatically removes items from the cache after the time specified by @@ -1444,9 +1460,16 @@ public static class ObservableCacheEx /// source /// or /// timeSelector. - public static IObservable>> ExpireAfter(this ISourceCache source, Func timeSelector, IScheduler? scheduler = null) - where TObject : notnull - where TKey : notnull => source.ExpireAfter(timeSelector, null, scheduler); + public static IObservable>> ExpireAfter( + this ISourceCache source, + Func timeSelector, + IScheduler? scheduler = null) + where TObject : notnull + where TKey : notnull + => Cache.Internal.ExpireAfter.ForSource.Create( + source: source, + timeSelector: timeSelector, + scheduler: scheduler); /// /// Automatically removes items from the cache after the time specified by @@ -1463,9 +1486,16 @@ public static class ObservableCacheEx /// source /// or /// timeSelector. - public static IObservable>> ExpireAfter(this ISourceCache source, Func timeSelector, TimeSpan? interval = null) - where TObject : notnull - where TKey : notnull => ExpireAfter(source, timeSelector, interval, GlobalConfig.DefaultScheduler); + public static IObservable>> ExpireAfter( + this ISourceCache source, + Func timeSelector, + TimeSpan? interval = null) + where TObject : notnull + where TKey : notnull + => Cache.Internal.ExpireAfter.ForSource.Create( + source: source, + timeSelector: timeSelector, + pollingInterval: interval); /// /// Ensures there are no duplicated keys in the observable changeset. @@ -1499,37 +1529,18 @@ public static class ObservableCacheEx /// source /// or /// timeSelector. - public static IObservable>> ExpireAfter(this ISourceCache source, Func timeSelector, TimeSpan? pollingInterval, IScheduler? scheduler) - where TObject : notnull - where TKey : notnull - { - source.ThrowArgumentNullExceptionIfNull(nameof(source)); - timeSelector.ThrowArgumentNullExceptionIfNull(nameof(timeSelector)); - - scheduler ??= GlobalConfig.DefaultScheduler; - - return Observable.Create>>( - observer => source.Connect().ForExpiry(timeSelector, pollingInterval, scheduler).Finally(observer.OnCompleted).Subscribe( - toRemove => - { - try - { - // remove from cache and notify which items have been auto removed - var keyValuePairs = toRemove as KeyValuePair[] ?? toRemove.AsArray(); - if (keyValuePairs.Length == 0) - { - return; - } - - source.Remove(keyValuePairs.Select(kv => kv.Key)); - observer.OnNext(keyValuePairs); - } - catch (Exception ex) - { - observer.OnError(ex); - } - })); - } + public static IObservable>> ExpireAfter( + this ISourceCache source, + Func timeSelector, + TimeSpan? pollingInterval, + IScheduler? scheduler) + where TObject : notnull + where TKey : notnull + => Cache.Internal.ExpireAfter.ForSource.Create( + source: source, + timeSelector: timeSelector, + pollingInterval: pollingInterval, + scheduler: scheduler); /// /// Filters the specified source. @@ -6344,26 +6355,6 @@ sortOrder switch return sources.Combine(CombineOperator.Xor); } - /// - /// Automatically removes items from the cache after the time specified by - /// the time selector elapses. - /// - /// The type of the object. - /// The type of the key. - /// The cache. - /// The time selector. Return null if the item should never be removed. - /// A polling interval. Since multiple timer subscriptions can be expensive, - /// it may be worth setting the interval. - /// - /// The scheduler. - /// An observable of enumerable of the key values which has been removed. - /// source - /// or - /// timeSelector. - internal static IObservable>> ForExpiry(this IObservable> source, Func timeSelector, TimeSpan? interval, IScheduler scheduler) - where TObject : notnull - where TKey : notnull => new TimeExpirer(source, timeSelector, interval, scheduler).ForExpiry(); - private static IObservable> Combine(this IObservableList> source, CombineOperator type) where TObject : notnull where TKey : notnull diff --git a/src/DynamicData/DynamicData.csproj b/src/DynamicData/DynamicData.csproj index 61c4f6809..ad0f30c94 100644 --- a/src/DynamicData/DynamicData.csproj +++ b/src/DynamicData/DynamicData.csproj @@ -19,4 +19,8 @@ Dynamic Data is a comprehensive caching and data manipulation solution which int + + + + diff --git a/src/DynamicData/List/Internal/ExpireAfter.cs b/src/DynamicData/List/Internal/ExpireAfter.cs index e000bb2a8..7c7ee3a59 100644 --- a/src/DynamicData/List/Internal/ExpireAfter.cs +++ b/src/DynamicData/List/Internal/ExpireAfter.cs @@ -2,79 +2,418 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Reactive; using System.Reactive.Concurrency; -using System.Reactive.Disposables; using System.Reactive.Linq; -using DynamicData.Kernel; - namespace DynamicData.List.Internal; -internal sealed class ExpireAfter(ISourceList sourceList, Func expireAfter, TimeSpan? pollingInterval, IScheduler scheduler, object locker) +internal sealed class ExpireAfter where T : notnull { - private readonly Func _expireAfter = expireAfter ?? throw new ArgumentNullException(nameof(expireAfter)); - private readonly IScheduler _scheduler = scheduler ?? throw new ArgumentNullException(nameof(scheduler)); + public static IObservable> Create( + ISourceList source, + Func timeSelector, + TimeSpan? pollingInterval = null, + IScheduler? scheduler = null) + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + timeSelector.ThrowArgumentNullExceptionIfNull(nameof(timeSelector)); + + return Observable.Create>(observer => (pollingInterval is TimeSpan pollingIntervalValue) + ? new PollingSubscription( + observer: observer, + pollingInterval: pollingIntervalValue, + scheduler: scheduler, + source: source, + timeSelector: timeSelector) + : new OnDemandSubscription( + observer: observer, + scheduler: scheduler, + source: source, + timeSelector: timeSelector)); + } + + private abstract class SubscriptionBase + : IDisposable + { + private readonly List _expirationDueTimes; + private readonly List _expiringIndexesBuffer; + private readonly IObserver> _observer; + private readonly IScheduler _scheduler; + private readonly ISourceList _source; + private readonly IDisposable _sourceSubscription; + private readonly Func _timeSelector; + + private bool _hasSourceCompleted; + private ScheduledManagement? _nextScheduledManagement; + + protected SubscriptionBase( + IObserver> observer, + IScheduler? scheduler, + ISourceList source, + Func timeSelector) + { + _observer = observer; + _source = source; + _timeSelector = timeSelector; + + _scheduler = scheduler ?? GlobalConfig.DefaultScheduler; - private readonly ISourceList _sourceList = sourceList ?? throw new ArgumentNullException(nameof(sourceList)); + _expirationDueTimes = new(); + _expiringIndexesBuffer = new(); - public IObservable> Run() => Observable.Create>( - observer => + _sourceSubscription = source + .Connect() + // It's important to set this flag outside the context of a lock, because it'll be read outside of lock as well. + .Finally(() => _hasSourceCompleted = true) + .Synchronize(SynchronizationGate) + .SubscribeSafe(Observer.Create>( + onNext: OnSourceNext, + onError: OnSourceError, + onCompleted: OnSourceCompleted)); + } + + public void Dispose() + { + lock (SynchronizationGate) { - var dateTime = _scheduler.Now.UtcDateTime; - long orderItemWasAdded = -1; + _sourceSubscription.Dispose(); - var autoRemover = _sourceList.Connect().Synchronize(locker).Do(_ => dateTime = _scheduler.Now.UtcDateTime).Cast( - t => - { - var removeAt = _expireAfter(t); - var expireAt = removeAt.HasValue ? dateTime.Add(removeAt.Value) : DateTime.MaxValue; - return new ExpirableItem(t, expireAt, Interlocked.Increment(ref orderItemWasAdded)); - }).AsObservableList(); + TryCancelNextScheduledManagement(); + } + } + + protected IScheduler Scheduler + => _scheduler; + + // Instead of using a dedicated _synchronizationGate object, we can save an allocation by using any object that is never exposed to public consumers. + protected object SynchronizationGate + => _expirationDueTimes; + + protected abstract DateTimeOffset? GetNextManagementDueTime(); - void RemovalAction() + protected DateTimeOffset? GetNextProposedExpirationDueTime() + { + var result = null as DateTimeOffset?; + + foreach (var dueTime in _expirationDueTimes) + { + if ((dueTime is DateTimeOffset value) && ((result is null) || (value < result))) + result = value; + } + + return result; + } + + protected abstract void OnExpirationsManaged(DateTimeOffset dueTime); + + private void ManageExpirations() + { + // This check is needed, to make sure we don't try and call .Edit() on a disposed _source, + // since the scheduler only promises "best effort" to cancel a scheduled action. + // It's safe to skip locking here becuase once this flag is set, it's never unset. + if (_hasSourceCompleted) + return; + + // Putting the entire management process here inside a .Edit() call for a couple of reasons: + // - It keeps the edit delegate from becoming a closure + // - It batches multiple expirations occurring at the same time into one source operation, so it only emits one changeset. + // - It eliminates the possibility of our internal state/item caches becoming out-of-sync with _source, by effectively locking _source. + // - It eliminates a rare deadlock that I honestly can't fully explain, but was able to reproduce reliably with few hundred iterations of the ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe test, on the Cache-equivalent of this operator. + _source.Edit(updater => + { + lock (SynchronizationGate) { - try + // The scheduler only promises "best effort" to cancel scheduled operations, so we need to make sure. + if (_nextScheduledManagement is not ScheduledManagement thisScheduledManagement) + return; + + _nextScheduledManagement = null; + + var now = Scheduler.Now; + + // One major note here: we are NOT updating our internal state, except to mark items as no longer needing to expire. + // Once we're done with the source.Edit() here, it will fire of a changeset for the removals, which will get handled by OnSourceNext(), + // thus bringing all of our internal state back into sync. + + // Buffer removals, so we can eliminate the need for index adjustments as we update the source + for (var i = 0; i < _expirationDueTimes.Count; ++i) { - lock (locker) + if ((_expirationDueTimes[i] is DateTimeOffset dueTime) && (dueTime <= now)) { - var toRemove = autoRemover.Items.Where(ei => ei.ExpireAt <= _scheduler.Now.DateTime).Select(ei => ei.Item).ToList(); + _expiringIndexesBuffer.Add(i); - observer.OnNext(toRemove); + // This shouldn't be necessary, but it guarantees we don't accidentally expire an item more than once, + // in the event of a race condition or something we haven't predicted. + _expirationDueTimes[i] = null; } } - catch (Exception ex) + + // I'm pretty sure it shouldn't be possible to end up with no removals here, but it costs basically nothing to check. + if (_expiringIndexesBuffer.Count is not 0) { - observer.OnError(ex); + // Processing removals in reverse-index order eliminates the need for us to adjust index of each .RemoveAt() call, as we go. + _expiringIndexesBuffer.Sort(static (x, y) => y.CompareTo(x)); + + var removedItems = new T[_expiringIndexesBuffer.Count]; + for (var i = 0; i < _expiringIndexesBuffer.Count; ++i) + { + var removedIndex = _expiringIndexesBuffer[i]; + removedItems[i] = updater[removedIndex]; + updater.RemoveAt(removedIndex); + } + + _observer.OnNext(removedItems); + + _expiringIndexesBuffer.Clear(); } + + OnExpirationsManaged(thisScheduledManagement.DueTime); + + // We just changed due times, so run cleanup and management scheduling. + OnExpirationDueTimesChanged(); } + }); + } - var removalSubscription = new SingleAssignmentDisposable(); - if (pollingInterval.HasValue) + private void OnExpirationDueTimesChanged() + { + // Check if we need to re-schedule the next management operation + if (GetNextManagementDueTime() is DateTimeOffset nextManagementDueTime) + { + if (_nextScheduledManagement?.DueTime != nextManagementDueTime) { - // use polling - // ReSharper disable once InconsistentlySynchronizedField - removalSubscription.Disposable = _scheduler.ScheduleRecurringAction(pollingInterval.Value, RemovalAction); + if (_nextScheduledManagement is ScheduledManagement nextScheduledManagement) + nextScheduledManagement.Cancellation.Dispose(); + + _nextScheduledManagement = new() + { + Cancellation = _scheduler.Schedule( + dueTime: nextManagementDueTime, + action: ManageExpirations), + DueTime = nextManagementDueTime + }; } - else + } + else + { + TryCancelNextScheduledManagement(); + } + } + + private void OnSourceCompleted() + { + // If the source completes, we can no longer remove items from it, so any pending expirations are moot. + TryCancelNextScheduledManagement(); + + _observer.OnCompleted(); + } + + private void OnSourceError(Exception error) + { + TryCancelNextScheduledManagement(); + + _observer.OnError(error); + } + + private void OnSourceNext(IChangeSet changes) + { + try + { + var now = _scheduler.Now; + + var haveExpirationDueTimesChanged = false; + + foreach (var change in changes) { - // create a timer for each distinct time - removalSubscription.Disposable = autoRemover.Connect().DistinctValues(ei => ei.ExpireAt).SubscribeMany( - datetime => - { - // ReSharper disable once InconsistentlySynchronizedField - var expireAt = datetime.Subtract(_scheduler.Now.UtcDateTime); + switch (change.Reason) + { + case ListChangeReason.Add: + { + var dueTime = now + _timeSelector.Invoke(change.Item.Current); + + _expirationDueTimes.Insert( + index: change.Item.CurrentIndex, + item: dueTime); + + haveExpirationDueTimesChanged |= dueTime is not null; + } + break; + + case ListChangeReason.AddRange: + { + _expirationDueTimes.EnsureCapacity(_expirationDueTimes.Count + change.Range.Count); + + var itemIndex = change.Range.Index; + foreach (var item in change.Range) + { + var dueTime = now + _timeSelector.Invoke(item); + + _expirationDueTimes.Insert( + index: itemIndex, + item: dueTime); + + haveExpirationDueTimesChanged |= dueTime is not null; + + ++itemIndex; + } + } + break; + + case ListChangeReason.Clear: + foreach (var dueTime in _expirationDueTimes) + { + if (dueTime is not null) + { + haveExpirationDueTimesChanged = true; + break; + } + } + + _expirationDueTimes.Clear(); + break; + + case ListChangeReason.Moved: + { + var expirationDueTime = _expirationDueTimes[change.Item.PreviousIndex]; + + _expirationDueTimes.RemoveAt(change.Item.PreviousIndex); + _expirationDueTimes.Insert( + index: change.Item.CurrentIndex, + item: expirationDueTime); + } + break; + + case ListChangeReason.Remove: + { + if (_expirationDueTimes[change.Item.CurrentIndex] is not null) + { + haveExpirationDueTimesChanged = true; + } + + _expirationDueTimes.RemoveAt(change.Item.CurrentIndex); + } + break; + + case ListChangeReason.RemoveRange: + { + var rangeEndIndex = change.Range.Index + change.Range.Count - 1; + for (var i = change.Range.Index; i <= rangeEndIndex; ++i) + { + if (_expirationDueTimes[i] is not null) + { + haveExpirationDueTimesChanged = true; + break; + } + } + + _expirationDueTimes.RemoveRange(change.Range.Index, change.Range.Count); + } + break; + + case ListChangeReason.Replace: + { + var oldDueTime = _expirationDueTimes[change.Item.CurrentIndex]; + var newDueTime = now + _timeSelector.Invoke(change.Item.Current); + + // Ignoring the possibility that the item's index has changed as well, because ISourceList does not allow for this. - // ReSharper disable once InconsistentlySynchronizedField - return Observable.Timer(expireAt, _scheduler).Take(1).Subscribe(_ => RemovalAction()); - }).Subscribe(); + _expirationDueTimes[change.Item.CurrentIndex] = newDueTime; + + haveExpirationDueTimesChanged |= newDueTime != oldDueTime; + } + break; + + // Ignoring Refresh changes, since ISourceList doesn't generate them. + } } - return Disposable.Create( - () => - { - removalSubscription.Dispose(); - autoRemover.Dispose(); - }); - }); + if (haveExpirationDueTimesChanged) + OnExpirationDueTimesChanged(); + } + catch (Exception error) + { + TryCancelNextScheduledManagement(); + + _observer.OnError(error); + } + } + + private void TryCancelNextScheduledManagement() + { + _nextScheduledManagement?.Cancellation.Dispose(); + _nextScheduledManagement = null; + } + + private readonly record struct ScheduledManagement + { + public required IDisposable Cancellation { get; init; } + + public required DateTimeOffset DueTime { get; init; } + } + } + + private sealed class OnDemandSubscription + : SubscriptionBase + { + public OnDemandSubscription( + IObserver> observer, + IScheduler? scheduler, + ISourceList source, + Func timeSelector) + : base( + observer, + scheduler, + source, + timeSelector) + { + } + + protected override DateTimeOffset? GetNextManagementDueTime() + => GetNextProposedExpirationDueTime(); + + protected override void OnExpirationsManaged(DateTimeOffset dueTime) + { + } + } + + private sealed class PollingSubscription + : SubscriptionBase + { + private readonly TimeSpan _pollingInterval; + + private DateTimeOffset _lastManagementDueTime; + + public PollingSubscription( + IObserver> observer, + TimeSpan pollingInterval, + IScheduler? scheduler, + ISourceList source, + Func timeSelector) + : base( + observer, + scheduler, + source, + timeSelector) + { + _pollingInterval = pollingInterval; + + _lastManagementDueTime = Scheduler.Now; + } + + protected override DateTimeOffset? GetNextManagementDueTime() + { + var now = Scheduler.Now; + var nextDueTime = _lastManagementDueTime + _pollingInterval; + + // Make sure we don't flood the system with polls, if the processing time of a poll ever exceeds the polling interval. + return (nextDueTime > now) + ? nextDueTime + : now; + } + + protected override void OnExpirationsManaged(DateTimeOffset dueTime) + => _lastManagementDueTime = dueTime; + } } diff --git a/src/DynamicData/List/ObservableListEx.cs b/src/DynamicData/List/ObservableListEx.cs index cca8351f1..c196b37aa 100644 --- a/src/DynamicData/List/ObservableListEx.cs +++ b/src/DynamicData/List/ObservableListEx.cs @@ -660,8 +660,16 @@ public static IObservable> Except(this IObservableListSelector returning when to expire the item. Return null for non-expiring item. /// The scheduler. /// An observable which emits the enumerable of items. - public static IObservable> ExpireAfter(this ISourceList source, Func timeSelector, IScheduler? scheduler = null) - where T : notnull => source.ExpireAfter(timeSelector, null, scheduler); + public static IObservable> ExpireAfter( + this ISourceList source, + Func timeSelector, + IScheduler? scheduler = null) + where T : notnull + => List.Internal.ExpireAfter.Create( + source: source, + timeSelector: timeSelector, + pollingInterval: null, + scheduler: scheduler); /// /// Removes items from the cache according to the value specified by the time selector function. @@ -672,18 +680,17 @@ public static IObservable> ExpireAfter(this ISourceList sou /// Enter the polling interval to optimise expiry timers, if omitted 1 timer is created for each unique expiry time. /// The scheduler. /// An observable which emits the enumerable of items. - public static IObservable> ExpireAfter(this ISourceList source, Func timeSelector, TimeSpan? pollingInterval = null, IScheduler? scheduler = null) - where T : notnull - { - source.ThrowArgumentNullExceptionIfNull(nameof(source)); - - timeSelector.ThrowArgumentNullExceptionIfNull(nameof(timeSelector)); - - var locker = new object(); - var limiter = new ExpireAfter(source, timeSelector, pollingInterval, scheduler ?? GlobalConfig.DefaultScheduler, locker); - - return limiter.Run().Synchronize(locker).Do(source.RemoveMany); - } + public static IObservable> ExpireAfter( + this ISourceList source, + Func timeSelector, + TimeSpan? pollingInterval = null, + IScheduler? scheduler = null) + where T : notnull + => List.Internal.ExpireAfter.Create( + source: source, + timeSelector: timeSelector, + pollingInterval: pollingInterval, + scheduler: scheduler); /// /// Filters the source using the specified valueSelector. diff --git a/src/DynamicData/Polyfills/ListEnsureCapacity.cs b/src/DynamicData/Polyfills/ListEnsureCapacity.cs new file mode 100644 index 000000000..d07461812 --- /dev/null +++ b/src/DynamicData/Polyfills/ListEnsureCapacity.cs @@ -0,0 +1,16 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +namespace System.Collections.Generic; + +internal static class ListEnsureCapacity +{ + public static void EnsureCapacity( + this List list, + int capacity) + { + if (list.Capacity < capacity) + list.Capacity = capacity; + } +}