Skip to content

Commit

Permalink
Re-designed ExpireAfter operators, from scratch, eliminating a variet…
Browse files Browse the repository at this point in the history
…y of defects, including #716.
  • Loading branch information
JakenVeina committed Feb 26, 2024
1 parent 5b14497 commit 9425f24
Show file tree
Hide file tree
Showing 15 changed files with 1,415 additions and 290 deletions.
12 changes: 6 additions & 6 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ dotnet_diagnostic.SA1136.severity = error
dotnet_diagnostic.SA1137.severity = error
dotnet_diagnostic.SA1139.severity = error
dotnet_diagnostic.SA1200.severity = none
dotnet_diagnostic.SA1201.severity = error
dotnet_diagnostic.SA1201.severity = none
dotnet_diagnostic.SA1202.severity = error
dotnet_diagnostic.SA1203.severity = error
dotnet_diagnostic.SA1204.severity = error
Expand Down Expand Up @@ -424,10 +424,10 @@ dotnet_diagnostic.SA1508.severity = error
dotnet_diagnostic.SA1509.severity = error
dotnet_diagnostic.SA1510.severity = error
dotnet_diagnostic.SA1511.severity = error
dotnet_diagnostic.SA1512.severity = error
dotnet_diagnostic.SA1513.severity = error
dotnet_diagnostic.SA1512.severity = none
dotnet_diagnostic.SA1513.severity = none
dotnet_diagnostic.SA1514.severity = error
dotnet_diagnostic.SA1515.severity = error
dotnet_diagnostic.SA1515.severity = none
dotnet_diagnostic.SA1516.severity = error
dotnet_diagnostic.SA1517.severity = error
dotnet_diagnostic.SA1518.severity = error
Expand Down Expand Up @@ -503,7 +503,7 @@ dotnet_diagnostic.RCS1058.severity=warning
dotnet_diagnostic.RCS1068.severity=warning
dotnet_diagnostic.RCS1073.severity=warning
dotnet_diagnostic.RCS1084.severity=error
dotnet_diagnostic.RCS1085.severity=error
dotnet_diagnostic.RCS1085.severity=none
dotnet_diagnostic.RCS1105.severity=error
dotnet_diagnostic.RCS1112.severity=error
dotnet_diagnostic.RCS1128.severity=error
Expand Down Expand Up @@ -547,4 +547,4 @@ end_of_line = lf
[*.{cmd, bat}]
end_of_line = crlf

vsspell_dictionary_languages = en-US
vsspell_dictionary_languages = en-US
47 changes: 35 additions & 12 deletions src/DynamicData.Tests/Cache/ExpireAfterFixture.ForSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Xunit.Abstractions;

using DynamicData.Tests.Utilities;
using System.Diagnostics;

namespace DynamicData.Tests.Cache;

Expand Down Expand Up @@ -270,7 +271,7 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval()
results.TryGetRecordedCompletion().Should().BeFalse();
}

[Fact(Skip = "Existing defect, very minor defect, items defined to never expire actually do, at DateTimeOffset.MaxValue")]
[Fact]
public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately()
{
using var source = CreateTestSource();
Expand Down Expand Up @@ -390,7 +391,7 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately()
}

// Covers https://github.com/reactivemarbles/DynamicData/issues/716
[Fact(Skip = "Existing defect, removals are skipped when scheduler invokes early")]
[Fact]
public void SchedulerIsInaccurate_RemovalsAreNotSkipped()
{
using var source = CreateTestSource();
Expand Down Expand Up @@ -425,7 +426,7 @@ public void SchedulerIsInaccurate_RemovalsAreNotSkipped()
results.TryGetRecordedCompletion().Should().BeFalse();
}

[Fact(Skip = "Existing defect, completion is not propagated from the source")]
[Fact]
public void SourceCompletes_CompletionIsPropagated()
{
using var source = CreateTestSource();
Expand Down Expand Up @@ -454,7 +455,7 @@ public void SourceCompletes_CompletionIsPropagated()
results.EnumerateInvalidNotifications().Should().BeEmpty();
}

[Fact(Skip = "Existing defect, completion is not propagated from the source")]
[Fact]
public void SourceCompletesImmediately_CompletionIsPropagated()
{
using var source = CreateTestSource();
Expand Down Expand Up @@ -482,7 +483,7 @@ public void SourceCompletesImmediately_CompletionIsPropagated()
results.EnumerateInvalidNotifications().Should().BeEmpty();
}

[Fact(Skip = "Exsiting defect, errors are re-thrown instead of propagated, operator does not use safe subscriptions")]
[Fact]
public void SourceErrors_ErrorIsPropagated()
{
using var source = CreateTestSource();
Expand Down Expand Up @@ -512,7 +513,7 @@ public void SourceErrors_ErrorIsPropagated()
results.EnumerateInvalidNotifications().Should().BeEmpty();
}

[Fact(Skip = "Existing defect, immediately-occuring error is not propagated")]
[Fact]
public void SourceErrorsImmediately_ErrorIsPropagated()
{
using var source = CreateTestSource();
Expand Down Expand Up @@ -552,7 +553,7 @@ public void SourceIsNull_ThrowsException()
interval: null))
.Should().Throw<ArgumentNullException>();

[Fact(Skip = "Existing defect, operator does not properly handle items with a null timeout, when using a real scheduler, it passes a TimeSpan to the scheduler that is outside of the supported range")]
[Fact]
public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe()
{
using var source = new TestSourceCache<StressItem, int>(static item => item.Id);
Expand All @@ -573,8 +574,7 @@ public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe
maxItemLifetime: TimeSpan.FromMilliseconds(10),
maxChangeCount: 10);

// Give the thread pool an extra 100ms to finish all the expirations
await Observable.Timer(TimeSpan.FromMilliseconds(100), scheduler);
await WaitForCompletionAsync(source, results, timeout: TimeSpan.FromMinutes(1));

results.TryGetRecordedError().Should().BeNull();
results.EnumerateRecordedValues().SelectMany(static removals => removals).Should().AllSatisfy(static pair => pair.Value.Lifetime.Should().NotBeNull("only items with an expiration should have expired"));
Expand Down Expand Up @@ -606,8 +606,7 @@ public async Task ThreadPoolSchedulerIsUsedWithPolling_ExpirationIsThreadSafe()
maxItemLifetime: TimeSpan.FromMilliseconds(10),
maxChangeCount: 10);

// Give the thread pool an extra 100ms to finish all the expirations
await Observable.Timer(TimeSpan.FromMilliseconds(100), scheduler);
await WaitForCompletionAsync(source, results, timeout: TimeSpan.FromMinutes(1));

results.TryGetRecordedError().Should().BeNull();
results.EnumerateRecordedValues().SelectMany(static removals => removals).Should().AllSatisfy(pair => pair.Value.Lifetime.Should().NotBeNull("only items with an expiration should have expired"));
Expand All @@ -624,7 +623,7 @@ public void TimeSelectorIsNull_ThrowsException()
interval: null))
.Should().Throw<ArgumentNullException>();

[Fact(Skip = "Exsiting defect, errors are re-thrown instead of propagated, user code is not protected")]
[Fact]
public void TimeSelectorThrows_ErrorIsPropagated()
{
using var source = CreateTestSource();
Expand Down Expand Up @@ -743,5 +742,29 @@ public void TimeSelectorThrows_ErrorIsPropagated()
});
}
}

private static async Task WaitForCompletionAsync(
ISourceCache<StressItem, int> source,
TestableObserver<IEnumerable<KeyValuePair<int, StressItem>>> results,
TimeSpan timeout)
{
// Wait up to full minute for the operator to finish processing expirations
// (this is mainly a problem for GitHub PR builds, where test runs take a lot longer, due to more limited resources).
var stopwatch = new Stopwatch();
stopwatch.Start();
var pollingInterval = TimeSpan.FromMilliseconds(100);
while (stopwatch.Elapsed < timeout)
{
await Task.Delay(pollingInterval);

// Identify "completion" as either an error, a completion signal, or all expiring items being removed.
if ((results.TryGetRecordedError() is not null)
|| results.TryGetRecordedCompletion()
|| source.Items.All(static item => item.Lifetime is null))
{
break;
}
}
}
}
}
38 changes: 31 additions & 7 deletions src/DynamicData.Tests/Cache/ExpireAfterFixture.ForStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

using DynamicData.Tests.Utilities;
using Xunit.Abstractions;
using System.Diagnostics;

namespace DynamicData.Tests.Cache;

Expand Down Expand Up @@ -356,7 +357,7 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval()
results.IsCompleted.Should().BeFalse();
}

[Fact(Skip = "Existing defect, very minor defect, items defined to never expire actually do, at DateTimeOffset.MaxValue")]
[Fact]
public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately()
{
using var source = new Subject<IChangeSet<TestItem, int>>();
Expand Down Expand Up @@ -499,7 +500,7 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately()
results.IsCompleted.Should().BeFalse();
}

[Fact(Skip = "Existing defect, completion does not wait")]
[Fact]
public void RemovalsArePending_CompletionWaitsForRemovals()
{
using var source = new Subject<IChangeSet<TestItem, int>>();
Expand Down Expand Up @@ -550,7 +551,7 @@ public void RemovalsArePending_CompletionWaitsForRemovals()
}

// Covers https://github.com/reactivemarbles/DynamicData/issues/716
[Fact(Skip = "Existing defect, removals are skipped when scheduler invokes early")]
[Fact]
public void SchedulerIsInaccurate_RemovalsAreNotSkipped()
{
using var source = new Subject<IChangeSet<TestItem, int>>();
Expand Down Expand Up @@ -742,7 +743,7 @@ public void SourceIsNull_ThrowsException()
timeSelector: static _ => default))
.Should().Throw<ArgumentNullException>();

[Fact(Skip = "Existing defect, operator does not properly handle items with a null timeout, when using a real scheduler, it passes a TimeSpan to the scheduler that is outside of the supported range")]
[Fact]
public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe()
{
using var source = new Subject<IChangeSet<StressItem, int>>();
Expand All @@ -763,7 +764,7 @@ public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe
maxItemLifetime: TimeSpan.FromMilliseconds(10),
maxChangeCount: 20);

await Observable.Timer(TimeSpan.FromMilliseconds(100), scheduler);
await WaitForCompletionAsync(results, timeout: TimeSpan.FromMinutes(1));

results.Error.Should().BeNull();
results.Data.Items.Should().AllSatisfy(item => item.Lifetime.Should().BeNull("all items with an expiration should have expired"));
Expand Down Expand Up @@ -793,7 +794,7 @@ public async Task ThreadPoolSchedulerIsUsedWithPolling_ExpirationIsThreadSafe()
maxItemLifetime: TimeSpan.FromMilliseconds(10),
maxChangeCount: 20);

await Observable.Timer(TimeSpan.FromMilliseconds(100), scheduler);
await WaitForCompletionAsync(results, timeout: TimeSpan.FromMinutes(1));

var now = scheduler.Now;

Expand All @@ -809,7 +810,7 @@ public void TimeSelectorIsNull_ThrowsException()
timeSelector: null!))
.Should().Throw<ArgumentNullException>();

[Fact(Skip = "Exsiting defect, errors are re-thrown instead of propagated, user code is not protected")]
[Fact]
public void TimeSelectorThrows_ErrorIsPropagated()
{
using var source = new Subject<IChangeSet<TestItem, int>>();
Expand Down Expand Up @@ -935,5 +936,28 @@ public void TimeSelectorThrows_ErrorIsPropagated()
foreach(var changeSet in changeSets)
source.OnNext(changeSet);
}

private static async Task WaitForCompletionAsync(
ChangeSetAggregator<StressItem, int> results,
TimeSpan timeout)
{
// Wait up to full minute for the operator to finish processing expirations
// (this is mainly a problem for GitHub PR builds, where test runs take a lot longer, due to more limited resources).
var stopwatch = new Stopwatch();
stopwatch.Start();
var pollingInterval = TimeSpan.FromMilliseconds(100);
while (stopwatch.Elapsed < timeout)
{
await Task.Delay(pollingInterval);

// Identify "completion" as either an error, a completion signal, or all expiring items being removed.
if ((results.Error is not null)
|| results.IsCompleted
|| results.Data.Items.All(static item => item.Lifetime is null))
{
break;
}
}
}
}
}
Loading

0 comments on commit 9425f24

Please sign in to comment.