diff --git a/src/DynamicData.Tests/Cache/MergeChangeSetsFixture.cs b/src/DynamicData.Tests/Cache/MergeChangeSetsFixture.cs new file mode 100644 index 000000000..1ea1a331a --- /dev/null +++ b/src/DynamicData.Tests/Cache/MergeChangeSetsFixture.cs @@ -0,0 +1,1056 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Reactive.Linq; +using DynamicData.Kernel; +using DynamicData.Tests.Utilities; +using FluentAssertions; +using Microsoft.Reactive.Testing; +using Xunit; + +namespace DynamicData.Tests.Cache; + +public sealed class MergeChangeSetsFixture : IDisposable +{ + const int MarketCount = 101; + const int PricesPerMarket = 103; + const int RemoveCount = 53; + const int ItemIdStride = 1000; + const decimal BasePrice = 10m; + const decimal PriceOffset = 10m; + const decimal HighestPrice = BasePrice + PriceOffset + 1.0m; + const decimal LowestPrice = BasePrice - 1.0m; + + public static readonly TimeSpan Interval = TimeSpan.FromSeconds(1); + public static readonly Random Random = new(0x12291977); + + private readonly List _marketList = new(); + + public MergeChangeSetsFixture() + { + } + + [Fact] + public void NullChecks() + { + // having + var emptyChangeSetObs = Observable.Empty>(); + var nullChangeSetObs = (IObservable>)null!; + var emptyChangeSetObsObs = Observable.Empty>>(); + var nullChangeSetObsObs = (IObservable>>)null!; + var nullComparer = (IComparer)null!; + var nullEqualityComparer = (IEqualityComparer)null!; + var nullChangeSetObsEnum = (IEnumerable>>)null!; + var emptyChangeSetObsEnum = Enumerable.Empty>>(); + var comparer = new NoOpComparer(); + var equalityComparer = new NoOpEqualityComparer(); + + // when + var obsobs = () => nullChangeSetObsObs.MergeChangeSets(); + var obsobsComp = () => nullChangeSetObsObs.MergeChangeSets(comparer); + var obsobsComp1 = () => emptyChangeSetObsObs.MergeChangeSets(nullComparer); + var obsobsEq = () => nullChangeSetObsObs.MergeChangeSets(equalityComparer); + var obsobsEq1 = () => emptyChangeSetObsObs.MergeChangeSets(nullEqualityComparer); + var obsobsEqComp = () => nullChangeSetObsObs.MergeChangeSets(equalityComparer, comparer); + var obsobsEqComp1 = () => emptyChangeSetObsObs.MergeChangeSets(nullEqualityComparer, comparer); + var obsobsEqComp2 = () => emptyChangeSetObsObs.MergeChangeSets(equalityComparer, nullComparer); + + var obspair = () => nullChangeSetObs.MergeChangeSets(emptyChangeSetObs); + var obspairB = () => emptyChangeSetObs.MergeChangeSets(nullChangeSetObs); + var obspairComp = () => nullChangeSetObs.MergeChangeSets(emptyChangeSetObs, comparer); + var obspairCompB = () => emptyChangeSetObs.MergeChangeSets(nullChangeSetObs, comparer); + var obspairComp1 = () => emptyChangeSetObs.MergeChangeSets(emptyChangeSetObs, nullComparer); + var obspairEq = () => nullChangeSetObs.MergeChangeSets(emptyChangeSetObs, equalityComparer); + var obspairEqB = () => emptyChangeSetObs.MergeChangeSets(nullChangeSetObs, equalityComparer); + var obspairEq1 = () => emptyChangeSetObs.MergeChangeSets(emptyChangeSetObs, nullEqualityComparer); + var obspairEqComp = () => nullChangeSetObs.MergeChangeSets(emptyChangeSetObs, equalityComparer, comparer); + var obspairEqCompB = () => emptyChangeSetObs.MergeChangeSets(nullChangeSetObs, equalityComparer, comparer); + var obspairEqComp1 = () => emptyChangeSetObs.MergeChangeSets(emptyChangeSetObs, nullEqualityComparer, comparer); + var obspairEqComp2 = () => emptyChangeSetObs.MergeChangeSets(emptyChangeSetObs, equalityComparer, nullComparer); + + var obsEnum = () => nullChangeSetObs.MergeChangeSets(emptyChangeSetObsEnum); + var obsEnumB = () => emptyChangeSetObs.MergeChangeSets(nullChangeSetObsEnum); + var obsEnumComp = () => nullChangeSetObs.MergeChangeSets(emptyChangeSetObsEnum, comparer); + var obsEnumCompB = () => emptyChangeSetObs.MergeChangeSets(nullChangeSetObsEnum, comparer); + var obsEnumComp1 = () => emptyChangeSetObs.MergeChangeSets(emptyChangeSetObsEnum, nullComparer); + var obsEnumEq = () => nullChangeSetObs.MergeChangeSets(emptyChangeSetObsEnum, equalityComparer); + var obsEnumEqB = () => emptyChangeSetObs.MergeChangeSets(nullChangeSetObsEnum, equalityComparer); + var obsEnumEq1 = () => emptyChangeSetObs.MergeChangeSets(emptyChangeSetObsEnum, nullEqualityComparer); + var obsEnumEqComp = () => nullChangeSetObs.MergeChangeSets(emptyChangeSetObsEnum, equalityComparer, comparer); + var obsEnumEqCompB = () => emptyChangeSetObs.MergeChangeSets(nullChangeSetObsEnum, equalityComparer, comparer); + var obsEnumEqComp1 = () => emptyChangeSetObs.MergeChangeSets(emptyChangeSetObsEnum, nullEqualityComparer, comparer); + var obsEnumEqComp2 = () => emptyChangeSetObs.MergeChangeSets(emptyChangeSetObsEnum, equalityComparer, nullComparer); + + var enumObs = () => nullChangeSetObsEnum.MergeChangeSets(); + var enumObsComp = () => nullChangeSetObsEnum.MergeChangeSets(comparer); + var enumObsComp1 = () => emptyChangeSetObsEnum.MergeChangeSets(nullComparer); + var enumObsEq = () => nullChangeSetObsEnum.MergeChangeSets(equalityComparer); + var enumObsEq1 = () => emptyChangeSetObsEnum.MergeChangeSets(nullEqualityComparer); + var enumObsEqComp = () => nullChangeSetObsEnum.MergeChangeSets(equalityComparer, comparer); + var enumObsEqComp1 = () => emptyChangeSetObsEnum.MergeChangeSets(nullEqualityComparer, comparer); + var enumObsEqComp2 = () => emptyChangeSetObsEnum.MergeChangeSets(equalityComparer, nullComparer); + + // then + emptyChangeSetObs.Should().NotBeNull(); + emptyChangeSetObsObs.Should().NotBeNull(); + emptyChangeSetObsEnum.Should().NotBeNull(); + comparer.Should().NotBeNull(); + equalityComparer.Should().NotBeNull(); + nullChangeSetObs.Should().BeNull(); + nullChangeSetObsObs.Should().BeNull(); + nullComparer.Should().BeNull(); + nullEqualityComparer.Should().BeNull(); + nullChangeSetObsEnum.Should().BeNull(); + + obsobs.Should().Throw(); + obsobsComp.Should().Throw(); + obsobsComp1.Should().Throw(); + obsobsEq.Should().Throw(); + obsobsEq1.Should().Throw(); + obsobsEqComp.Should().Throw(); + obsobsEqComp1.Should().Throw(); + obsobsEqComp2.Should().Throw(); + obspair.Should().Throw(); + obspairB.Should().Throw(); + obspairComp.Should().Throw(); + obspairCompB.Should().Throw(); + obspairComp1.Should().Throw(); + obspairEq.Should().Throw(); + obspairEqB.Should().Throw(); + obspairEq1.Should().Throw(); + obspairEqComp.Should().Throw(); + obspairEqCompB.Should().Throw(); + obspairEqComp1.Should().Throw(); + obspairEqComp2.Should().Throw(); + obsEnum.Should().Throw(); + obsEnumB.Should().Throw(); + obsEnumComp.Should().Throw(); + obsEnumCompB.Should().Throw(); + obsEnumComp1.Should().Throw(); + obsEnumEq.Should().Throw(); + obsEnumEqB.Should().Throw(); + obsEnumEq1.Should().Throw(); + obsEnumEqComp.Should().Throw(); + obsEnumEqCompB.Should().Throw(); + obsEnumEqComp1.Should().Throw(); + obsEnumEqComp2.Should().Throw(); + enumObs.Should().Throw(); + enumObsComp.Should().Throw(); + enumObsComp1.Should().Throw(); + enumObsEq.Should().Throw(); + enumObsEq1.Should().Throw(); + enumObsEqComp.Should().Throw(); + enumObsEqComp1.Should().Throw(); + enumObsEqComp2.Should().Throw(); + } + + [Fact] + public void AllExistingItemsPresentInResult() + { + // having + _marketList.AddRange(Enumerable.Range(0, MarketCount).Select(n => new Market(n))); + _marketList.ForEach((m, index) => m.AddUniquePrices(Random, index, PricesPerMarket)); + + // when + using var pricesCache = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer).AsObservableCache(); + using var results = pricesCache.Connect().AsAggregator(); + + // then + _marketList.Count.Should().Be(MarketCount); + _marketList.Sum(m => m.PricesCache.Count).Should().Be(MarketCount * PricesPerMarket); + results.Data.Count.Should().Be(MarketCount * PricesPerMarket); + results.Messages.Count.Should().Be(1); + results.Summary.Overall.Adds.Should().Be(MarketCount * PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + results.Summary.Overall.Refreshes.Should().Be(0); + } + + [Fact] + public void AllNewSubItemsPresentInResult() + { + // having + _marketList.AddRange(Enumerable.Range(0, MarketCount).Select(n => new Market(n))); + using var pricesCache = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer).AsObservableCache(); + using var results = pricesCache.Connect().AsAggregator(); + + // when + _marketList.ForEach((m, index) => m.AddUniquePrices(Random, index, PricesPerMarket)); + + // then + _marketList.Count.Should().Be(MarketCount); + _marketList.Sum(m => m.PricesCache.Count).Should().Be(MarketCount * PricesPerMarket); + results.Data.Count.Should().Be(MarketCount * PricesPerMarket); + results.Messages.Count.Should().Be(MarketCount); + results.Summary.Overall.Adds.Should().Be(MarketCount * PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + results.Summary.Overall.Refreshes.Should().Be(0); + } + + [Fact] + public void AllRefreshedSubItemsAreRefreshed() + { + // having + _marketList.AddRange(Enumerable.Range(0, MarketCount).Select(n => new Market(n))); + using var results = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer).AsAggregator(); + _marketList.ForEach((m, index) => m.AddUniquePrices(Random, index, PricesPerMarket)); + + // when + _marketList.ForEach(m => m.RefreshAllPrices(Random)); + + // then + _marketList.Count.Should().Be(MarketCount); + results.Data.Count.Should().Be(MarketCount * PricesPerMarket); + results.Messages.Count.Should().Be(MarketCount * 2); + results.Summary.Overall.Adds.Should().Be(MarketCount * PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + results.Summary.Overall.Refreshes.Should().Be(MarketCount * PricesPerMarket); + } + + [Fact] + public void AnyDuplicateKeyValuesShouldBeHidden() + { + // having + _marketList.AddRange(Enumerable.Range(0, 2).Select(n => new Market(n))); + using var results = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer).AsAggregator(); + + // when + _marketList[0].AddRandomPrices(Random, 0, PricesPerMarket); + _marketList[1].AddRandomPrices(Random, 0, PricesPerMarket); + + // then + _marketList.Count.Should().Be(2); + results.Data.Count.Should().Be(PricesPerMarket); + results.Data.Items.Zip(_marketList[0].PricesCache.Items).ForEach(pair => pair.First.Should().Be(pair.Second)); + results.Summary.Overall.Adds.Should().Be(PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + } + + [Fact] + public void AnyDuplicateValuesShouldBeNoOpWhenRemoved() + { + // having + _marketList.AddRange(Enumerable.Range(0, 2).Select(n => new Market(n))); + using var results = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer).AsAggregator(); + _marketList[0].AddRandomPrices(Random, 0, PricesPerMarket); + _marketList[1].AddRandomPrices(Random, 0, PricesPerMarket); + + // when + _marketList[1].RemoveAllPrices(); + + // then + _marketList.Count.Should().Be(2); + results.Data.Count.Should().Be(PricesPerMarket); + results.Data.Items.Zip(_marketList[0].PricesCache.Items).ForEach(pair => pair.First.Should().Be(pair.Second)); + results.Summary.Overall.Adds.Should().Be(PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + } + + [Fact] + public void AnyDuplicateValuesShouldBeUnhiddenWhenOtherIsRemoved() + { + // having + _marketList.AddRange(Enumerable.Range(0, 2).Select(n => new Market(n))); + using var results = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer).AsAggregator(); + _marketList[0].AddRandomPrices(Random, 0, PricesPerMarket); + _marketList[1].AddRandomPrices(Random, 0, PricesPerMarket); + + // when + _marketList[0].RemoveAllPrices(); + + // then + results.Data.Count.Should().Be(PricesPerMarket); + results.Data.Items.Zip(_marketList[1].PricesCache.Items).ForEach(pair => pair.First.Should().Be(pair.Second)); + results.Messages.Count.Should().Be(2); + results.Messages[1].Updates.Should().Be(PricesPerMarket); + } + + [Fact] + public void AnyDuplicateValuesShouldNotRefreshWhenHidden() + { + // having + _marketList.AddRange(Enumerable.Range(0, 2).Select(n => new Market(n))); + using var results = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer).AsAggregator(); + _marketList[0].AddRandomPrices(Random, 0, PricesPerMarket); + _marketList[1].AddRandomPrices(Random, 0, PricesPerMarket); + + // when + _marketList[1].RefreshAllPrices(Random); + + // then + _marketList.Count.Should().Be(2); + results.Data.Count.Should().Be(PricesPerMarket); + results.Summary.Overall.Refreshes.Should().Be(0); + results.Data.Items.Zip(_marketList[0].PricesCache.Items).ForEach(pair => pair.First.Should().Be(pair.Second)); + } + + [Fact] + public void AnyRemovedSubItemIsRemoved() + { + // having + _marketList.AddRange(Enumerable.Range(0, MarketCount).Select(n => new Market(n))); + using var results = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer).AsAggregator(); + _marketList.ForEach((m, index) => m.AddUniquePrices(Random, index, PricesPerMarket)); + + // when + _marketList.ForEach(m => m.PricesCache.Edit(updater => updater.RemoveKeys(updater.Keys.Take(RemoveCount).ToList()))); + + // then + _marketList.Count.Should().Be(MarketCount); + results.Data.Count.Should().Be(MarketCount * (PricesPerMarket - RemoveCount)); + results.Messages.Count.Should().Be(MarketCount * 2); + results.Messages[0].Adds.Should().Be(PricesPerMarket); + results.Summary.Overall.Adds.Should().Be(MarketCount * PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(MarketCount * RemoveCount); + } + + [Fact] + public void ComparerOnlyAddsBetterAddedValues() + { + // having + var marketOriginal = Add(new Market(0)); + var marketLow = Add(new Market(1)); + var marketHigh = Add(new Market(2)); + var others = new[] { marketLow.LatestPrices, marketHigh.LatestPrices }; + using var highPriceResults = marketOriginal.LatestPrices.MergeChangeSets(others, MarketPrice.HighPriceCompare).AsAggregator(); + using var lowPriceResults = marketOriginal.LatestPrices.MergeChangeSets(others, MarketPrice.LowPriceCompare).AsAggregator(); + marketOriginal.AddRandomPrices(Random, 0, PricesPerMarket); + + // when + marketLow.UpdatePrices(0, PricesPerMarket, LowestPrice); + marketHigh.UpdatePrices(0, PricesPerMarket, HighestPrice); + + // then + _marketList.Count.Should().Be(3); + lowPriceResults.Data.Count.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketLow.Id)); + highPriceResults.Data.Count.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + highPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketHigh.Id)); + } + + [Fact] + public void ComparerOnlyAddsBetterExistingValues() + { + // having + var marketOriginal = Add(new Market(0)); + var marketLow = Add(new Market(1)); + var marketHigh = Add(new Market(2)); + var others = new[] { marketLow.LatestPrices, marketHigh.LatestPrices }; + marketOriginal.AddRandomPrices(Random, 0, PricesPerMarket); + marketLow.UpdatePrices(0, PricesPerMarket, LowestPrice); + marketHigh.UpdatePrices(0, PricesPerMarket, HighestPrice); + + // when + using var highPriceResults = marketOriginal.LatestPrices.MergeChangeSets(others, MarketPrice.HighPriceCompare).AsAggregator(); + using var lowPriceResults = marketOriginal.LatestPrices.MergeChangeSets(others, MarketPrice.LowPriceCompare).AsAggregator(); + + // then + _marketList.Count.Should().Be(3); + lowPriceResults.Data.Count.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketLow.Id)); + highPriceResults.Data.Count.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + highPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketHigh.Id)); + } + + [Fact] + public void ComparerUpdatesToCorrectValueOnRefresh() + { + // having + var marketOriginal = Add(new Market(0)); + var marketFlipFlop = Add(new Market(1)); + using var highPriceResults = marketOriginal.LatestPrices.MergeChangeSets(marketFlipFlop.LatestPrices, MarketPrice.HighPriceCompare).AsAggregator(); + using var lowPriceResults = marketOriginal.LatestPrices.MergeChangeSets(marketFlipFlop.LatestPrices, MarketPrice.LowPriceCompare).AsAggregator(); + marketOriginal.AddRandomPrices(Random, 0, PricesPerMarket); + marketFlipFlop.UpdatePrices(0, PricesPerMarket, HighestPrice); + + // when + marketFlipFlop.RefreshAllPrices(LowestPrice); + + // then + _marketList.Count.Should().Be(2); + lowPriceResults.Data.Count.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Removes.Should().Be(0); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Refreshes.Should().Be(0); + lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketFlipFlop.Id)); + highPriceResults.Data.Count.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Removes.Should().Be(0); + highPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2); + highPriceResults.Summary.Overall.Refreshes.Should().Be(0); + highPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketOriginal.Id)); + } + + [Fact] + public void ComparerUpdatesToCorrectValueOnRemove() + { + // having + var marketOriginal = Add(new Market(0)); + var marketLow = Add(new Market(1)); + var marketHigh = Add(new Market(2)); + using var results = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer).AsAggregator(); + using var lowPriceResults = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.LowPriceCompare).AsAggregator(); + using var highPriceResults = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.HighPriceCompare).AsAggregator(); + marketOriginal.AddRandomPrices(Random, 0, PricesPerMarket); + marketLow.UpdatePrices(0, PricesPerMarket, LowestPrice); + marketHigh.UpdatePrices(0, PricesPerMarket, HighestPrice); + + // when + marketLow.RemoveAllPrices(); + + // then + _marketList.Count.Should().Be(3); + results.Data.Count.Should().Be(PricesPerMarket); + results.Summary.Overall.Adds.Should().Be(PricesPerMarket); + results.Summary.Overall.Updates.Should().Be(0); + results.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketOriginal.Id)); + lowPriceResults.Data.Count.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Removes.Should().Be(0); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2); + lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketOriginal.Id)); + highPriceResults.Data.Count.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Removes.Should().Be(0); + highPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + highPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketHigh.Id)); + } + + [Fact] + public void ComparerUpdatesToCorrectValueOnUpdate() + { + // having + var marketOriginal = Add(new Market(0)); + var marketFlipFlop = Add(new Market(1)); + using var highPriceResults = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.HighPriceCompare).AsAggregator(); + using var lowPriceResults = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.LowPriceCompare).AsAggregator(); + marketOriginal.AddRandomPrices(Random, 0, PricesPerMarket); + marketFlipFlop.UpdatePrices(0, PricesPerMarket, HighestPrice); + + // when + marketFlipFlop.UpdateAllPrices(LowestPrice); + + // then + _marketList.Count.Should().Be(2); + lowPriceResults.Data.Count.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Removes.Should().Be(0); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Refreshes.Should().Be(0); + lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketFlipFlop.Id)); + highPriceResults.Data.Count.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Removes.Should().Be(0); + highPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2); + highPriceResults.Summary.Overall.Refreshes.Should().Be(0); + highPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketOriginal.Id)); + } + + [Fact] + public void ComparerOnlyUpdatesVisibleValuesOnUpdate() + { + // having + var marketOriginal = Add(new Market(0)); + var marketLow = Add(new Market(1)); + using var highPriceResults = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.HighPriceCompare).AsAggregator(); + using var lowPriceResults = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.LowPriceCompare).AsAggregator(); + marketOriginal.AddRandomPrices(Random, 0, PricesPerMarket); + marketLow.UpdatePrices(0, PricesPerMarket, LowestPrice); + + // when + marketLow.UpdateAllPrices(LowestPrice - 1); + + // then + _marketList.Count.Should().Be(2); + lowPriceResults.Data.Count.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Removes.Should().Be(0); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2); + lowPriceResults.Summary.Overall.Refreshes.Should().Be(0); + lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketLow.Id)); + highPriceResults.Data.Count.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Removes.Should().Be(0); + highPriceResults.Summary.Overall.Updates.Should().Be(0); + highPriceResults.Summary.Overall.Refreshes.Should().Be(0); + highPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketOriginal.Id)); + } + + [Fact] + public void ComparerOnlyRefreshesVisibleValues() + { + // having + var marketOriginal = Add(new Market(0)); + var marketLow = Add(new Market(1)); + using var highPriceResults = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer, MarketPrice.HighPriceCompare).AsAggregator(); + using var lowPriceResults = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer, MarketPrice.LowPriceCompare).AsAggregator(); + marketOriginal.AddRandomPrices(Random, 0, PricesPerMarket); + marketLow.UpdatePrices(0, PricesPerMarket, LowestPrice); + + // when + marketLow.RefreshAllPrices(LowestPrice - 1); + + // then + _marketList.Count.Should().Be(2); + lowPriceResults.Data.Count.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Removes.Should().Be(0); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Refreshes.Should().Be(PricesPerMarket); + lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketLow.Id)); + highPriceResults.Data.Count.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Removes.Should().Be(0); + highPriceResults.Summary.Overall.Updates.Should().Be(0); + highPriceResults.Summary.Overall.Refreshes.Should().Be(0); + highPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketOriginal.Id)); + } + + [Fact] + public void EnumObservableUsesTheScheduler() + { + // having + var scheduler = new TestScheduler(); + _marketList.AddRange(Enumerable.Range(0, MarketCount).Select(n => new Market(n))); + _marketList.ForEach((m, index) => m.AddUniquePrices(Random, index, PricesPerMarket)); + using var pricesCache = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer, scheduler).AsObservableCache(); + using var results = pricesCache.Connect().AsAggregator(); + + // when + // Do not advance the scheduler so that nothing happens + + // then + _marketList.Count.Should().Be(MarketCount); + _marketList.Sum(m => m.PricesCache.Count).Should().Be(MarketCount * PricesPerMarket); + results.Data.Count.Should().Be(0); + results.Messages.Count.Should().Be(0); + results.Summary.Overall.Adds.Should().Be(0); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + results.Summary.Overall.Refreshes.Should().Be(0); + } + + [Fact] + public void EnumObservableUsesTheSchedulerAndEmitsAll() + { + // having + var scheduler = new TestScheduler(); + _marketList.AddRange(Enumerable.Range(0, MarketCount).Select(n => new Market(n))); + _marketList.ForEach((m, index) => m.AddUniquePrices(Random, index, PricesPerMarket)); + using var pricesCache = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer, scheduler).AsObservableCache(); + using var results = pricesCache.Connect().AsAggregator(); + + // when + scheduler.AdvanceBy(1); + + // then + _marketList.Count.Should().Be(MarketCount); + _marketList.Sum(m => m.PricesCache.Count).Should().Be(MarketCount * PricesPerMarket); + results.Data.Count.Should().Be(MarketCount * PricesPerMarket); + results.Messages.Count.Should().Be(MarketCount); + results.Summary.Overall.Adds.Should().Be(MarketCount * PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + results.Summary.Overall.Refreshes.Should().Be(0); + } + + [Fact] + public void EqualityComparerHidesUpdatesWithoutChanges() + { + // having + var market = Add(new Market(0)); + using var results = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer).AsAggregator(); + market.UpdatePrices(0, PricesPerMarket, LowestPrice); + + // when + market.UpdatePrices(0, PricesPerMarket, LowestPrice); + + // then + _marketList.Count.Should().Be(1); + results.Data.Count.Should().Be(PricesPerMarket); + results.Messages.Count.Should().Be(1); + results.Summary.Overall.Adds.Should().Be(PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + results.Summary.Overall.Refreshes.Should().Be(0); + } + + [Fact] + public void EqualityComparerAndComparerWorkTogetherForUpdates() + { + // having + var market1 = Add(new Market(0)); + var market2 = Add(new Market(1)); + + var results = market1.LatestPrices.MergeChangeSets(market2.LatestPrices, MarketPrice.EqualityComparer, MarketPrice.LatestPriceCompare).AsAggregator(); + var resultsTimeStamp = market1.LatestPrices.MergeChangeSets(market2.LatestPrices, MarketPrice.EqualityComparerWithTimeStamp, MarketPrice.LatestPriceCompare).AsAggregator(); + market1.AddRandomPrices(Random, 0, PricesPerMarket); + market2.UpdatePrices(0, PricesPerMarket, LowestPrice); + + // when + market2.UpdatePrices(0, PricesPerMarket, LowestPrice); + + // then + _marketList.Count.Should().Be(2); + results.Data.Count.Should().Be(PricesPerMarket); + results.Messages.Count.Should().Be(2); + results.Summary.Overall.Adds.Should().Be(PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(PricesPerMarket); + results.Summary.Overall.Refreshes.Should().Be(0); + resultsTimeStamp.Messages.Count.Should().Be(3); + resultsTimeStamp.Summary.Overall.Adds.Should().Be(PricesPerMarket); + resultsTimeStamp.Summary.Overall.Removes.Should().Be(0); + resultsTimeStamp.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2); + resultsTimeStamp.Summary.Overall.Refreshes.Should().Be(0); + } + + [Fact] + public void EqualityComparerAndComparerWorkTogetherForRefreshes() + { + // having + var market1 = Add(new Market(0)); + var market2 = Add(new Market(1)); + + var results1 = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer, MarketPrice.LatestPriceCompare).AsAggregator(); + var results2 = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparerWithTimeStamp, MarketPrice.LatestPriceCompare).AsAggregator(); + market1.AddRandomPrices(Random, 0, PricesPerMarket); + market2.UpdatePrices(0, PricesPerMarket, LowestPrice); + // Update again, but only the timestamp will change, so results1 will ignore + market2.UpdatePrices(0, PricesPerMarket, LowestPrice); + + // when + // results1 won't see the refresh because it ignored the update + // results2 will see the refreshes because it didn't + market2.RefreshAllPrices(LowestPrice); + + // then + _marketList.Count.Should().Be(2); + results1.Data.Count.Should().Be(PricesPerMarket); + results1.Messages.Count.Should().Be(2); + results1.Summary.Overall.Adds.Should().Be(PricesPerMarket); + results1.Summary.Overall.Removes.Should().Be(0); + results1.Summary.Overall.Updates.Should().Be(PricesPerMarket); + results1.Summary.Overall.Refreshes.Should().Be(0); + results2.Messages.Count.Should().Be(4); + results2.Summary.Overall.Adds.Should().Be(PricesPerMarket); + results2.Summary.Overall.Removes.Should().Be(0); + results2.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2); + results2.Summary.Overall.Refreshes.Should().Be(PricesPerMarket); + } + + [Fact] + public void EqualityComparerAndComparerRefreshesBecomeUpdates() + { + // having + var market1 = Add(new Market(0)); + var market2 = Add(new Market(1)); + + var results1 = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer, MarketPrice.LatestPriceCompare).AsAggregator(); + var results2 = _marketList.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparerWithTimeStamp, MarketPrice.LatestPriceCompare).AsAggregator(); + market1.AddRandomPrices(Random, 0, PricesPerMarket); + market2.UpdatePrices(0, PricesPerMarket, LowestPrice - 1); + // Update again, but only the timestamp will change, so results1 will ignore + market2.UpdatePrices(0, PricesPerMarket, LowestPrice - 1); + + // when + // results1 will see this as an update because it ignored the last update + // results2 will see the refreshes + market2.RefreshAllPrices(Random); + + // then + _marketList.Count.Should().Be(2); + results1.Data.Count.Should().Be(PricesPerMarket); + results1.Messages.Count.Should().Be(3); + results1.Summary.Overall.Adds.Should().Be(PricesPerMarket); + results1.Summary.Overall.Removes.Should().Be(0); + results1.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2); + results1.Summary.Overall.Refreshes.Should().Be(0); + results2.Messages.Count.Should().Be(4); + results2.Summary.Overall.Adds.Should().Be(PricesPerMarket); + results2.Summary.Overall.Removes.Should().Be(0); + results2.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2); + results2.Summary.Overall.Refreshes.Should().Be(PricesPerMarket); + } + + [Fact] + public void EveryItemVisibleWhenSequenceCompletes() + { + // having + var fixedMarketList = Enumerable.Range(0, MarketCount).Select(n => new FixedMarket(Random, n * ItemIdStride, (n * ItemIdStride) + PricesPerMarket)).ToList(); + + // when + using var results = fixedMarketList.Select(m => m.LatestPrices).MergeChangeSets(completable: true).AsAggregator(); + + // then + results.Completed.Should().Be(true); + results.Data.Count.Should().Be(PricesPerMarket * MarketCount); + results.Summary.Overall.Adds.Should().Be(PricesPerMarket * MarketCount); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + results.Summary.Overall.Refreshes.Should().Be(0); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public void MergedObservableCompletesWhenAllSourcesComplete(bool completeSources) + { + // having + var fixedMarketList = Enumerable.Range(0, MarketCount).Select(n => new FixedMarket(Random, n * ItemIdStride, (n * ItemIdStride) + PricesPerMarket, completable: completeSources)).ToList(); + + // when + using var results = fixedMarketList.Select(m => m.LatestPrices).MergeChangeSets(completable: true).AsAggregator(); + + // then + results.Completed.Should().Be(completeSources); + } + + [Theory] + [InlineData(false, false)] + [InlineData(false, true)] + [InlineData(true, false)] + [InlineData(true, true)] + public void MergedObservableRespectsCompletableFlag(bool completeSource, bool completeChildren) + { + // having + var fixedMarketList = Enumerable.Range(0, MarketCount).Select(n => new FixedMarket(Random, n * ItemIdStride, (n * ItemIdStride) + PricesPerMarket, completable: completeChildren)).ToList(); + + // when + using var results = fixedMarketList.Select(m => m.LatestPrices).MergeChangeSets(completable: completeSource).AsAggregator(); + + // then + results.Completed.Should().Be(completeSource && completeChildren); + } + + [Fact] + public void ObservableObservableContainsAllAddedValues() + { + // having + var scheduler = new TestScheduler(); + _marketList.AddRange(Enumerable.Range(0, MarketCount).Select(n => new Market(n))); + var marketObs = Observable.Interval(TimeSpan.FromSeconds(1), scheduler).Select(n => _marketList[(int)n]); + using var results = marketObs.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer).AsAggregator(); + Enumerable.Range(0, MarketCount).ForEach(n => scheduler.AdvanceBy(Interval.Ticks)); + + // when + _marketList.ForEach((m, index) => m.AddUniquePrices(Random, index, PricesPerMarket)); + + // then + results.Data.Count.Should().Be(MarketCount * PricesPerMarket); + results.Messages.Count.Should().Be(MarketCount); + results.Summary.Overall.Adds.Should().Be(MarketCount * PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + } + + [Fact] + public void ObservableObservableContainsAllExistingValues() + { + // having + var scheduler = new TestScheduler(); + _marketList.AddRange(Enumerable.Range(0, MarketCount).Select(n => new Market(n))); + _marketList.ForEach((m, index) => m.AddUniquePrices(Random, index, PricesPerMarket)); + var marketObs = Observable.Interval(Interval, scheduler).Select(n => _marketList[(int)n]); + using var results = marketObs.Select(m => m.LatestPrices).MergeChangeSets(MarketPrice.EqualityComparer).AsAggregator(); + + // when + Enumerable.Range(0, MarketCount).ForEach(n => scheduler.AdvanceBy(Interval.Ticks)); + + // then + results.Data.Count.Should().Be(MarketCount * PricesPerMarket); + results.Messages.Count.Should().Be(MarketCount); + results.Summary.Overall.Adds.Should().Be(MarketCount * PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + } + + [Fact] + public void MergedObservableWillFailIfAnyChangeChangeSetFails() + { + // having + _marketList.AddRange(Enumerable.Range(0, MarketCount).Select(n => new Market(n))); + _marketList.ForEach((m, index) => m.AddUniquePrices(Random, index, PricesPerMarket)); + var expectedError = new Exception("Test exception"); + var enumObservable = _marketList.Select(m => m.LatestPrices).Append(Observable.Throw>(expectedError)); + + // when + using var results = enumObservable.MergeChangeSets().AsAggregator(); + + // then + _marketList.Count.Should().Be(MarketCount); + _marketList.Sum(m => m.PricesCache.Count).Should().Be(MarketCount * PricesPerMarket); + results.Data.Count.Should().Be(MarketCount * PricesPerMarket); + results.Error.Should().Be(expectedError); + } + + [Fact] + public void ObservableObservableWillFailIfSourceFails() + { + // having + _marketList.AddRange(Enumerable.Range(0, MarketCount).Select(n => new Market(n))); + _marketList.ForEach((m, index) => m.AddUniquePrices(Random, index, PricesPerMarket)); + var expectedError = new Exception("Test exception"); + var observables = _marketList.Select(m => m.LatestPrices).ToObservable().Concat(Observable.Throw>>(expectedError)); + + // when + using var results = observables.MergeChangeSets().AsAggregator(); + + // then + _marketList.Count.Should().Be(MarketCount); + _marketList.Sum(m => m.PricesCache.Count).Should().Be(MarketCount * PricesPerMarket); + results.Data.Count.Should().Be(MarketCount * PricesPerMarket); + results.Error.Should().Be(expectedError); + } + + [Theory] + [InlineData(false, false)] + [InlineData(false, true)] + [InlineData(true, false)] + [InlineData(true, true)] + public void ObservableObservableCompletesIfAndOnlyIfSourceAndAllChildrenComplete(bool completeSource, bool completeChildren) + { + // having + var fixedMarkets = Enumerable.Range(0, MarketCount).Select(n => new FixedMarket(Random, n * ItemIdStride, (n * ItemIdStride) + PricesPerMarket, completable: completeChildren)); + var observableObservable = fixedMarkets.Select(m => m.LatestPrices).ToObservable(); + if (!completeSource) + { + observableObservable = observableObservable.Concat(Observable.Never>>()); + } + + // when + using var results = observableObservable.MergeChangeSets().AsAggregator(); + + // then + results.Completed.Should().Be(completeSource && completeChildren); + } + + public void Dispose() + { + _marketList.ForEach(m => (m as IDisposable)?.Dispose()); + } + + private Market Add(Market addThis) + { + _marketList.Add(addThis); + return addThis; + } + + private interface IMarket + { + public string Name { get; } + + public Guid Id { get; } + + public IObservable> LatestPrices { get; } + } + + private class Market : IMarket, IDisposable + { + private readonly ISourceCache _latestPrices = new SourceCache(p => p.ItemId); + + private Market(string name, Guid id) + { + Name = name; + Id = id; + } + + public Market(Market market) : this(market.Name, market.Id) + { + } + + public Market(int name) : this($"Market #{name}", Guid.NewGuid()) + { + } + + public string Name { get; } + + public Guid Id { get; } + + public IObservable> LatestPrices => _latestPrices.Connect(); + + public ISourceCache PricesCache => _latestPrices; + + public MarketPrice CreatePrice(int itemId, decimal price) => new(itemId, price, Id); + + public Market AddRandomIdPrices(Random r, int count, int minId, int maxId) + { + _latestPrices.AddOrUpdate(Enumerable.Range(0, int.MaxValue).Select(_ => r.Next(minId, maxId)).Distinct().Take(count).Select(id => CreatePrice(id, RandomPrice(r)))); + return this; + } + + public Market AddRandomPrices(Random r, int minId, int maxId) + { + _latestPrices.AddOrUpdate(Enumerable.Range(minId, (maxId - minId)).Select(id => CreatePrice(id, RandomPrice(r)))); + return this; + } + + public Market AddUniquePrices(Random r, int section, int count) => AddRandomPrices(r, section * ItemIdStride, (section * ItemIdStride) + count); + + public Market RefreshAllPrices(decimal newPrice) + { + _latestPrices.Edit(updater => updater.Items.ForEach(cp => + { + cp.Price = newPrice; + updater.Refresh(cp); + })); + + return this; + } + + public Market RefreshAllPrices(Random r) => RefreshAllPrices(RandomPrice(r)); + + public Market RefreshPrice(int id, decimal newPrice) + { + _latestPrices.Edit(updater => updater.Lookup(id).IfHasValue(cp => + { + cp.Price = newPrice; + updater.Refresh(cp); + })); + return this; + } + + public void RemoveAllPrices() => this.With(_ => _latestPrices.Clear()); + + public void RemovePrice(int itemId) => this.With(_ => _latestPrices.Remove(itemId)); + + public Market UpdateAllPrices(decimal newPrice) => this.With(_ => _latestPrices.Edit(updater => updater.AddOrUpdate(updater.Items.Select(cp => CreatePrice(cp.ItemId, newPrice))))); + + public Market UpdatePrices(int minId, int maxId, decimal newPrice) => this.With(_ => _latestPrices.AddOrUpdate(Enumerable.Range(minId, (maxId - minId)).Select(id => CreatePrice(id, newPrice)))); + + public void Dispose() => _latestPrices.Dispose(); + } + + private static decimal RandomPrice(Random r) => BasePrice + ((decimal)r.NextDouble() * PriceOffset); + + private class MarketPrice + { + public static IEqualityComparer EqualityComparer { get; } = new CurrentPriceEqualityComparer(); + public static IEqualityComparer EqualityComparerWithTimeStamp { get; } = new TimeStampPriceEqualityComparer(); + public static IComparer HighPriceCompare { get; } = new HighestPriceComparer(); + public static IComparer LowPriceCompare { get; } = new LowestPriceComparer(); + public static IComparer LatestPriceCompare { get; } = new LatestPriceComparer(); + + private decimal _price; + + public MarketPrice(int itemId, decimal price, Guid marketId) + { + ItemId = itemId; + MarketId = marketId; + Price = price; + } + + public decimal Price + { + get => _price; + set + { + _price = value; + TimeStamp = DateTimeOffset.UtcNow; + } + } + + public DateTimeOffset TimeStamp { get; private set; } + + public Guid MarketId { get; } + + public int ItemId { get; } + + public override string ToString() => $"{ItemId:D5} - {Price:c} ({MarketId}) [{TimeStamp:HH:mm:ss.fffffff}]"; + + private class CurrentPriceEqualityComparer : IEqualityComparer + { + public virtual bool Equals([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) => x.MarketId.Equals(x.MarketId) && (x.ItemId == y.ItemId) && (x.Price == y.Price); + public int GetHashCode([DisallowNull] MarketPrice obj) => throw new NotImplementedException(); + } + + private class TimeStampPriceEqualityComparer : CurrentPriceEqualityComparer, IEqualityComparer + { + public override bool Equals([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) => base.Equals(x, y) && (x.TimeStamp == y.TimeStamp); + } + + private class LowestPriceComparer : IComparer + { + public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) + { + Debug.Assert(x.ItemId == y.ItemId); + return x.Price.CompareTo(y.Price); + } + } + + private class HighestPriceComparer : IComparer + { + public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) + { + Debug.Assert(x.ItemId == y.ItemId); + return y.Price.CompareTo(x.Price); + } + } + + private class LatestPriceComparer : IComparer + { + public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) + { + Debug.Assert(x.ItemId == y.ItemId); + return y.TimeStamp.CompareTo(x.TimeStamp); + } + } + } + + private class FixedMarket : IMarket + { + public FixedMarket(Random r, int minId, int maxId, bool completable = true) + { + Id = Guid.NewGuid(); + LatestPrices = Enumerable.Range(minId, maxId - minId) + .Select(id => new MarketPrice(id, RandomPrice(r), Id)) + .AsObservableChangeSet(cp => cp.ItemId, completable: completable); + } + + public IObservable> LatestPrices { get; } + + public string Name => Id.ToString("B"); + + public Guid Id { get; } + } + + class NoOpComparer : IComparer + { + public int Compare(T x, T y) => throw new NotImplementedException(); + } + + class NoOpEqualityComparer : IEqualityComparer + { + public bool Equals(T x, T y) => throw new NotImplementedException(); + public int GetHashCode([DisallowNull] T obj) => throw new NotImplementedException(); + } +} + +internal static class Extensions +{ + public static T With(this T item, Action action) + { + action(item); + return item; + } + + public static IObservable ForceFail(this IObservable source, int count, Exception? e) => + (e is not null) + ? source.Take(count).Concat(Observable.Throw(e)) + : source; +} diff --git a/src/DynamicData.Tests/Utilities/ObservableSpy.cs b/src/DynamicData.Tests/Utilities/ObservableSpy.cs new file mode 100644 index 000000000..3854eed76 --- /dev/null +++ b/src/DynamicData.Tests/Utilities/ObservableSpy.cs @@ -0,0 +1,183 @@ +using System; +using System.Diagnostics; +using System.Linq; +using System.Reactive.Disposables; +using System.Reactive.Linq; +using System.Threading; + +namespace DynamicData.Tests.Utilities; + +internal static class ObservableSpy +{ + private static readonly string ChangeSetEntrySpacing = Environment.NewLine + "\t"; + + /// + /// Spys on the given IObservable{T} by emitting logging information that is tagged with the current ThreadId for all related + /// events including every invocation on the Observer, subscriptions, disposes, and exceptions. + /// + /// The type of the Observable. + /// The source IObservable to be Spied on. + /// The logger instance to use for logging. + /// Optional text to include with each log message. + /// Optional Value transformer to control how the observed values are logged. + /// Indicates whether or not subscription related messages should be emitted. + /// Indicates whether or not timestamps should be prepended to messages. + /// An IObservable{T} with the Spy events included. + /// Adapted from https://stackoverflow.com/q/20220755/. + public static IObservable Spy(this IObservable source, string? infoText = null, Action? logger = null, + Func? formatter = null, bool showSubs = true, + bool showTimestamps = true) + { + static string NoTimestamp() => string.Empty; + static string HighResTimestamp() => DateTimeOffset.UtcNow.ToString("HH:mm:ss.fffffff") + " "; + + formatter ??= (t => t?.ToString() ?? "{Null}"); + logger = CreateLogger(logger ?? Console.WriteLine, showTimestamps ? HighResTimestamp : NoTimestamp, infoText ?? $"IObservable<{typeof(T).Name}>"); + + logger("Creating Observable"); + + int subscriptionCounter = 0; + return Observable.Create(obs => + { + var valueCounter = 0; + bool? completedSuccessfully = null; + + if (showSubs) + { + logger("Creating Subscription"); + } + try + { + var subscription = source + .Do(x => logger($"OnNext() (#{Interlocked.Increment(ref valueCounter)}): {formatter(x)}"), + ex => { logger($"OnError() ({valueCounter} Values) [Exception: {ex}]"); completedSuccessfully = false; }, + () => { logger($"OnCompleted() ({valueCounter} Values)"); completedSuccessfully = true; }) + .Subscribe(t => + { + try + { + obs.OnNext(t); + } + catch (Exception ex) + { + logger($"Downstream exception ({ex})"); + throw; + } + }, obs.OnError, obs.OnCompleted); + + return Disposable.Create(() => + { + if (showSubs) + { + switch (completedSuccessfully) + { + case true: logger("Disposing because Observable Sequence Completed Successfully"); break; + case false: logger("Disposing due to Failed Observable Sequence"); break; + case null: logger("Disposing due to Unsubscribe"); break; + } + } + subscription?.Dispose(); + int count = Interlocked.Decrement(ref subscriptionCounter); + if (showSubs) + { + logger($"Dispose Completed! ({count} Active Subscriptions)"); + } + }); + } + finally + { + int count = Interlocked.Increment(ref subscriptionCounter); + if (showSubs) + { + logger($"Subscription Created! ({count} Active Subscriptions)"); + } + } + }); + } + + public static IObservable> Spy(this IObservable> source, + string? opName = null, Action? logger = null, + Func? formatter = null, bool showSubs = true, + bool showTimestamps = true) + where T : notnull + where TKey : notnull + { + formatter = formatter ?? (t => t?.ToString() ?? "{Null}"); + return Spy(source, opName, logger, cs => "[Cache Change Set]" + ChangeSetEntrySpacing + string.Join(ChangeSetEntrySpacing, + cs.Select((change, n) => $"#{n} [{change.Reason}] {change.Key}: {FormatChange(formatter!, change)}")), showSubs, showTimestamps); + } + + public static IObservable> Spy(this IObservable> source, + string? opName = null, Action? logger = null, + Func? formatter = null, bool showSubs = true, + bool showTimestamps = true) + where T : notnull + { + formatter = formatter ?? (t => t?.ToString() ?? "{Null}"); + return Spy(source, opName, logger, cs => "[List Change Set]" + ChangeSetEntrySpacing + string.Join(ChangeSetEntrySpacing, + cs.Select(change => $"[{change.Reason}] {FormatChange(formatter!, change)}")), showSubs, showTimestamps); + } + + public static IObservable DebugSpy(this IObservable source, string? opName = null, + Func? formatter = null, bool showSubs = true, + bool showTimestamps = true) + { +#if DEBUG + return source.Spy(opName, DebugLogger, formatter, showSubs, showTimestamps); +#else + return source; +#endif + } + + public static IObservable> DebugSpy(this IObservable> source, + string? opName = null, + Func? formatter = null, bool showSubs = true, + bool showTimestamps = true) + where T : notnull + where TKey : notnull + { +#if DEBUG + return source.Spy(opName, DebugLogger, formatter, showSubs, showTimestamps); +#else + return source; +#endif + } + + public static IObservable> DebugSpy(this IObservable> source, + string? opName = null, + Func? formatter = null, bool showSubs = true, + bool showTimestamps = true) + where T : notnull + { +#if DEBUG + return source.Spy(opName, DebugLogger, formatter, showSubs, showTimestamps); +#else + return source; +#endif + } + + private static string FormatChange(Func formatter, Change change) + where T : notnull + where TKey : notnull => + change.Reason switch + { + ChangeReason.Update => $"{formatter(change.Current)} [Previous: {formatter(change.Previous.Value)}]", + _ => formatter(change.Current), + }; + + private static string FormatChange(Func formatter, Change change) + where T : notnull => + change.Reason switch + { + ListChangeReason.AddRange => string.Join(", ", change.Range.Select(n => formatter(n))), + ListChangeReason.RemoveRange => string.Join(", ", change.Range.Select(n => formatter(n))), + _ => formatter(change.Item.Current), + }; + + private static Action CreateLogger(Action baseLogger, Func timeStamper, string opName) => + msg => baseLogger($"{timeStamper()}[{Thread.CurrentThread.ManagedThreadId:X2}] |{opName}| {msg}"); + +#if DEBUG + static void DebugLogger(string str) => Debug.WriteLine(str); +#endif +} diff --git a/src/DynamicData/Cache/Internal/ChangeSetCache.cs b/src/DynamicData/Cache/Internal/ChangeSetCache.cs new file mode 100644 index 000000000..17991f317 --- /dev/null +++ b/src/DynamicData/Cache/Internal/ChangeSetCache.cs @@ -0,0 +1,26 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive.Linq; + +namespace DynamicData.Cache.Internal; + +/// +/// Wraps an Observable ChangeSet while maintaining a copy of the aggregated changes. +/// +/// ChangeSet Object Type. +/// ChangeSet Key Type. +internal class ChangeSetCache + where TObject : notnull + where TKey : notnull +{ + public ChangeSetCache(IObservable> source) + { + Source = source.IgnoreSameReferenceUpdate().Do(Cache.Clone); + } + + public Cache Cache { get; } = new(); + + public IObservable> Source { get; } +} diff --git a/src/DynamicData/Cache/Internal/ChangeSetMergeTracker.cs b/src/DynamicData/Cache/Internal/ChangeSetMergeTracker.cs new file mode 100644 index 000000000..9adefe1a6 --- /dev/null +++ b/src/DynamicData/Cache/Internal/ChangeSetMergeTracker.cs @@ -0,0 +1,227 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive.Linq; +using DynamicData.Kernel; + +namespace DynamicData.Cache.Internal; + +internal class ChangeSetMergeTracker + where TObject : notnull + where TKey : notnull +{ + private readonly ChangeAwareCache _resultCache; + private readonly Func>> _selectCaches; + private readonly IComparer? _comparer; + private readonly IEqualityComparer? _equalityComparer; + + public ChangeSetMergeTracker(Func>> selectCaches, IComparer? comparer, IEqualityComparer? equalityComparer) + { + _resultCache = new ChangeAwareCache(); + _selectCaches = selectCaches; + _comparer = comparer; + _equalityComparer = equalityComparer; + } + + public void RemoveItems(IEnumerable> items, IObserver> observer) + { + var sourceCaches = _selectCaches().ToArray(); + + // Update the Published Value for each item being removed + if (items is IList> list) + { + // zero allocation enumerator + foreach (var item in EnumerableIList.Create(list)) + { + OnItemRemoved(sourceCaches, item.Value, item.Key); + } + } + else + { + foreach (var item in items) + { + OnItemRemoved(sourceCaches, item.Value, item.Key); + } + } + + EmitChanges(observer); + } + + public void ProcessChangeSet(IChangeSet changes, IObserver> observer) + { + var sourceCaches = _selectCaches().ToArray(); + + foreach (var change in changes.ToConcreteType()) + { + switch (change.Reason) + { + case ChangeReason.Add: + OnItemAdded(change.Current, change.Key); + break; + + case ChangeReason.Remove: + OnItemRemoved(sourceCaches, change.Current, change.Key); + break; + + case ChangeReason.Update: + OnItemUpdated(sourceCaches, change.Current, change.Key, change.Previous); + break; + + case ChangeReason.Refresh: + OnItemRefreshed(sourceCaches, change.Current, change.Key); + break; + } + } + + EmitChanges(observer); + } + + private void EmitChanges(IObserver> observer) + { + var changeSet = _resultCache.CaptureChanges(); + if (changeSet.Count != 0) + { + observer.OnNext(changeSet); + } + } + + private void OnItemAdded(TObject item, TKey key) + { + var cached = _resultCache.Lookup(key); + + // If no current value, then add it + if (!cached.HasValue) + { + _resultCache.Add(item, key); + } + else if (ShouldReplace(item, cached.Value)) + { + _resultCache.AddOrUpdate(item, key); + } + } + + private void OnItemRemoved(ChangeSetCache[] sourceCaches, TObject item, TKey key) + { + var cached = _resultCache.Lookup(key); + + // If this key has been observed and the current value is being removed + if (cached.HasValue && CheckEquality(item, cached.Value)) + { + // Perform a full update to select the new downstream value (or remove it) + UpdateToBestValue(sourceCaches, key, cached); + } + } + + private void OnItemUpdated(ChangeSetCache[] sources, TObject item, TKey key, Optional prev) + { + var cached = _resultCache.Lookup(key); + + // Received an update change for a key that hasn't been seen yet + // So use the updated value + if (!cached.HasValue) + { + _resultCache.Add(item, key); + return; + } + + if (_comparer is null) + { + // If the current value (or there is no way to tell) is being replaced by a different value + if ((!prev.HasValue || CheckEquality(prev.Value, cached.Value)) && !CheckEquality(item, cached.Value)) + { + // Update to the new value + _resultCache.AddOrUpdate(item, key); + } + } + else + { + // The current value is being replaced (or there is no way to tell), so do a full update to select the best one from all the choices + if (!prev.HasValue || CheckEquality(prev.Value, cached.Value)) + { + UpdateToBestValue(sources, key, cached); + } + else + { + // If the current value isn't being replaced, check to see if the replacement value is better than the current one + if (ShouldReplace(item, cached.Value)) + { + _resultCache.AddOrUpdate(item, key); + } + } + } + } + + private void OnItemRefreshed(ChangeSetCache[] sources, TObject item, TKey key) + { + var cached = _resultCache.Lookup(key); + + // Received a refresh change for a key that hasn't been seen yet + // Nothing can be done, so ignore it + if (!cached.HasValue) + { + return; + } + + // In the sorting case, a refresh requires doing a full update because any change could alter what the best value is + // If we don't care about sorting OR if we do care, but re-selecting the best value didn't change anything + // AND the current value is the exact one being refreshed, then emit the refresh downstream + if (((_comparer is null) || !UpdateToBestValue(sources, key, cached)) && ReferenceEquals(cached.Value, item)) + { + _resultCache.Refresh(key); + } + } + + private bool UpdateToBestValue(ChangeSetCache[] sources, TKey key, Optional current) + { + // Determine which value should be the one seen downstream + var candidate = SelectValue(sources, key); + if (candidate.HasValue) + { + // If there isn't a current value + if (!current.HasValue) + { + _resultCache.Add(candidate.Value, key); + return true; + } + + // If the candidate value isn't the same as the current value + if (!CheckEquality(current.Value, candidate.Value)) + { + _resultCache.AddOrUpdate(candidate.Value, key); + return true; + } + + // The value seen downstream is the one that should be + return false; + } + + // No best candidate available + _resultCache.Remove(key); + return true; + } + + private Optional SelectValue(ChangeSetCache[] sources, TKey key) + { + if (sources.Length == 0) + { + return Optional.None(); + } + + var values = sources.Select(s => s.Cache.Lookup(key)).Where(opt => opt.HasValue); + + if (_comparer is not null) + { + values = values.OrderBy(opt => opt.Value, _comparer); + } + + return values.FirstOrDefault(); + } + + private bool CheckEquality(TObject left, TObject right) => + ReferenceEquals(left, right) || (_equalityComparer?.Equals(left, right) ?? (_comparer?.Compare(left, right) == 0)); + + // Return true if candidate should replace current as the observed downstream value + private bool ShouldReplace(TObject candidate, TObject current) => + !ReferenceEquals(candidate, current) && (_comparer?.Compare(candidate, current) < 0); +} diff --git a/src/DynamicData/Cache/Internal/MergeChangeSets.cs b/src/DynamicData/Cache/Internal/MergeChangeSets.cs new file mode 100644 index 000000000..01c2a339c --- /dev/null +++ b/src/DynamicData/Cache/Internal/MergeChangeSets.cs @@ -0,0 +1,96 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Reactive.Linq; + +namespace DynamicData.Cache.Internal; + +/// +/// Operator that is similiar to Merge but intelligently handles Cache ChangeSets. +/// +internal sealed class MergeChangeSets + where TObject : notnull + where TKey : notnull +{ + private readonly IObservable, int>> _source; + + private readonly IComparer? _comparer; + + private readonly IEqualityComparer? _equalityComparer; + + public MergeChangeSets(IEnumerable>> source, IEqualityComparer? equalityComparer, IComparer? comparer, bool completable, IScheduler? scheduler = null) + : this(CreateContainerObservable(source, completable, scheduler), equalityComparer, comparer) + { + } + + public MergeChangeSets(IObservable>> source, IEqualityComparer? equalityComparer, IComparer? comparer) + : this(CreateContainerObservable(source), equalityComparer, comparer) + { + } + + private MergeChangeSets(IObservable, int>> source, IEqualityComparer? equalityComparer, IComparer? comparer) + { + _source = source; + _comparer = comparer; + _equalityComparer = equalityComparer; + } + + public IObservable> Run() + { + return Observable.Create>( + observer => + { + var locker = new object(); + + // Create a local cache of Merge Containers + var localCache = _source.Synchronize(locker).AsObservableCache(); + + // Set up the change tracker + var changeTracker = new ChangeSetMergeTracker(() => localCache.Items, _comparer, _equalityComparer); + + // Merge all of the changeset streams together and Process them with the change tracker which will emit the results + var subscription = localCache.Connect().MergeMany(mc => mc.Source.Do(_ => { }, observer.OnError)) + .Synchronize(locker) + .Subscribe( + changes => changeTracker.ProcessChangeSet(changes, observer), + observer.OnError, + observer.OnCompleted); + + return new CompositeDisposable(localCache, subscription); + }); + } + + // Can optimize for the Add case because that's the only one that applies + private static Change, int> CreateChange(IObservable> source, int index) => + new(ChangeReason.Add, index, new ChangeSetCache(source)); + + // Create a ChangeSet Observable that produces ChangeSets with a single Add event for each new sub-observable + private static IObservable, int>> CreateContainerObservable(IObservable>> source) => + source.Select((src, index) => new ChangeSet, int>(new[] { CreateChange(src, index) })); + + // Create a ChangeSet Observable with a single event that adds all the values in the enum (and then completes, maybe) + private static IObservable, int>> CreateContainerObservable(IEnumerable>> source, bool completable, IScheduler? scheduler = null) => + Observable.Create, int>>(observer => + { + void EmitChanges() + { + observer.OnNext(new ChangeSet, int>(source.Select(CreateChange))); + + if (completable) + { + observer.OnCompleted(); + } + } + + if (scheduler is not null) + { + return scheduler.Schedule(EmitChanges); + } + + EmitChanges(); + return Disposable.Empty; + }); +} diff --git a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs index d94213417..9634d5510 100644 --- a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs +++ b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs @@ -4,7 +4,6 @@ using System.Reactive.Disposables; using System.Reactive.Linq; -using DynamicData.Kernel; namespace DynamicData.Cache.Internal; @@ -44,14 +43,14 @@ public MergeManyCacheChangeSets(IObservable> source, F var sourceCacheOfCaches = _source .IgnoreSameReferenceUpdate() .WhereReasonsAre(ChangeReason.Add, ChangeReason.Remove, ChangeReason.Update) + .Transform((obj, key) => new ChangeSetCache(_changeSetSelector(obj, key))) .Synchronize(locker) - .Transform((obj, key) => new MergeContainer(_changeSetSelector(obj, key))) .AsObservableCache(); var shared = sourceCacheOfCaches.Connect().Publish(); // this is manages all of the changes - var changeTracker = new ChangeTracker(sourceCacheOfCaches, _comparer, _equalityComparer); + var changeTracker = new ChangeSetMergeTracker(() => sourceCacheOfCaches.Items, _comparer, _equalityComparer); // merge the items back together var allChanges = shared.MergeMany(mc => mc.Source) @@ -70,236 +69,4 @@ public MergeManyCacheChangeSets(IObservable> source, F return new CompositeDisposable(sourceCacheOfCaches, allChanges, removedItems, shared.Connect()); }); } - - private class ChangeTracker - { - private readonly ChangeAwareCache _resultCache; - private readonly IObservableCache _sourceCache; - private readonly IComparer? _comparer; - private readonly IEqualityComparer? _equalityComparer; - - public ChangeTracker(IObservableCache sourceCache, IComparer? comparer, IEqualityComparer? equalityComparer) - { - _resultCache = new ChangeAwareCache(); - _sourceCache = sourceCache; - _comparer = comparer; - _equalityComparer = equalityComparer; - } - - public void RemoveItems(IEnumerable> items, IObserver> observer) - { - var sourceCaches = _sourceCache.Items.ToArray(); - - // Update the Published Value for each item being removed - if (items is IList> list) - { - // zero allocation enumerator - foreach (var item in EnumerableIList.Create(list)) - { - OnItemRemoved(sourceCaches, item.Value, item.Key); - } - } - else - { - foreach (var item in items) - { - OnItemRemoved(sourceCaches, item.Value, item.Key); - } - } - - EmitChanges(observer); - } - - public void ProcessChangeSet(IChangeSet changes, IObserver> observer) - { - var sourceCaches = _sourceCache.Items.ToArray(); - - foreach (var change in changes.ToConcreteType()) - { - switch (change.Reason) - { - case ChangeReason.Add: - OnItemAdded(change.Current, change.Key); - break; - - case ChangeReason.Remove: - OnItemRemoved(sourceCaches, change.Current, change.Key); - break; - - case ChangeReason.Update: - OnItemUpdated(sourceCaches, change.Current, change.Key, change.Previous); - break; - - case ChangeReason.Refresh: - OnItemRefreshed(sourceCaches, change.Current, change.Key); - break; - } - } - - EmitChanges(observer); - } - - private void EmitChanges(IObserver> observer) - { - var changeSet = _resultCache.CaptureChanges(); - if (changeSet.Count != 0) - { - observer.OnNext(changeSet); - } - } - - private void OnItemAdded(TDestination item, TDestinationKey key) - { - var cached = _resultCache.Lookup(key); - - // If no current value, then add it - if (!cached.HasValue) - { - _resultCache.Add(item, key); - } - else if (ShouldReplace(item, cached.Value)) - { - _resultCache.AddOrUpdate(item, key); - } - } - - private void OnItemRemoved(MergeContainer[] sourceCaches, TDestination item, TDestinationKey key) - { - var cached = _resultCache.Lookup(key); - - // If this key has been observed and the current value is being removed - if (cached.HasValue && CheckEquality(item, cached.Value)) - { - // Perform a full update to select the new downstream value (or remove it) - UpdateToBestValue(sourceCaches, key, cached); - } - } - - private void OnItemUpdated(MergeContainer[] sources, TDestination item, TDestinationKey key, Optional prev) - { - var cached = _resultCache.Lookup(key); - - // Received an update change for a key that hasn't been seen yet - // So use the updated value - if (!cached.HasValue) - { - _resultCache.Add(item, key); - return; - } - - if (_comparer is null) - { - // If the current value (or there is no way to tell) is being replaced by a different value - if ((!prev.HasValue || CheckEquality(prev.Value, cached.Value)) && !CheckEquality(item, cached.Value)) - { - // Update to the new value - _resultCache.AddOrUpdate(item, key); - } - } - else - { - // The current value is being replaced (or there is no way to tell), so do a full update to select the best one from all the choices - if (!prev.HasValue || CheckEquality(prev.Value, cached.Value)) - { - UpdateToBestValue(sources, key, cached); - } - else - { - // If the current value isn't being replaced, check to see if the replacement value is better than the current one - if (ShouldReplace(item, cached.Value)) - { - _resultCache.AddOrUpdate(item, key); - } - } - } - } - - private void OnItemRefreshed(MergeContainer[] sources, TDestination item, TDestinationKey key) - { - var cached = _resultCache.Lookup(key); - - // Received a refresh change for a key that hasn't been seen yet - // Nothing can be done, so ignore it - if (!cached.HasValue) - { - return; - } - - // In the sorting case, a refresh requires doing a full update because any change could alter what the best value is - // If we don't care about sorting OR if we do care, but re-selecting the best value didn't change anything - // AND the current value is the one being refreshed - if (((_comparer is null) || !UpdateToBestValue(sources, key, cached)) && CheckEquality(cached.Value, item)) - { - // Emit the refresh downstream - _resultCache.Refresh(key); - } - } - - private bool UpdateToBestValue(MergeContainer[] sources, TDestinationKey key, Optional current) - { - // Determine which value should be the one seen downstream - var candidate = SelectValue(sources, key); - if (candidate.HasValue) - { - // If there isn't a current value - if (!current.HasValue) - { - _resultCache.Add(candidate.Value, key); - return true; - } - - // If the candidate value isn't the same as the current value - if (!CheckEquality(current.Value, candidate.Value)) - { - _resultCache.AddOrUpdate(candidate.Value, key); - return true; - } - - // The value seen downstream is the one that should be - return false; - } - - // No best candidate available - _resultCache.Remove(key); - return true; - } - - private Optional SelectValue(MergeContainer[] sources, TDestinationKey key) - { - if (sources.Length == 0) - { - return Optional.None(); - } - - var values = sources.Select(s => s.Cache.Lookup(key)).Where(opt => opt.HasValue); - - if (_comparer is not null) - { - values = values.OrderBy(opt => opt.Value, _comparer); - } - - return values.FirstOrDefault(); - } - - private bool CheckEquality(TDestination left, TDestination right) => - ReferenceEquals(left, right) || (_equalityComparer?.Equals(left, right) ?? (_comparer?.Compare(left, right) == 0)); - - // Return true if candidate should replace current as the observed downstream value - private bool ShouldReplace(TDestination candidate, TDestination current) => - !ReferenceEquals(candidate, current) && (_comparer?.Compare(candidate, current) < 0); - } - - private class MergeContainer - { - public MergeContainer(IObservable> source) - { - Source = source.IgnoreSameReferenceUpdate().Do(Clone); - } - - public Cache Cache { get; } = new(); - - public IObservable> Source { get; } - - private void Clone(IChangeSet changes) => Cache.Clone(changes); - } } diff --git a/src/DynamicData/Cache/ObservableCacheEx.cs b/src/DynamicData/Cache/ObservableCacheEx.cs index 27fe0713b..480e969ab 100644 --- a/src/DynamicData/Cache/ObservableCacheEx.cs +++ b/src/DynamicData/Cache/ObservableCacheEx.cs @@ -2938,6 +2938,350 @@ public static IObservable FinallySafe(this IObservable source, Action f return new MergeMany(source, observableSelector).Run(); } + /// + /// Operator similiar to Merge except it is ChangeSet aware. All of the observable changesets are merged together into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable>> source) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + + return new MergeChangeSets(source, equalityComparer: null, comparer: null).Run(); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. All of the observable changesets are merged together into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// instance to determine which element to emit if the same key is emitted from multiple changesets. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable>> source, IComparer comparer) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (comparer == null) throw new ArgumentNullException(nameof(comparer)); + + return new MergeChangeSets(source, equalityComparer: null, comparer).Run(); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. All of the observable changesets are merged together into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// instance to determine if two elements are the same. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable>> source, IEqualityComparer equalityComparer) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (equalityComparer == null) throw new ArgumentNullException(nameof(equalityComparer)); + + return new MergeChangeSets(source, equalityComparer, comparer: null).Run(); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. All of the observable changesets are merged together into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// instance to determine if two elements are the same. + /// instance to determine which element to emit if the same key is emitted from multiple changesets. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable>> source, IEqualityComparer equalityComparer, IComparer comparer) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (equalityComparer == null) throw new ArgumentNullException(nameof(equalityComparer)); + if (comparer == null) throw new ArgumentNullException(nameof(comparer)); + + return new MergeChangeSets(source, equalityComparer, comparer).Run(); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. Merges both observable changesets into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// The Other Observable ChangeSet. + /// (Optional) instance to use when enumerating the collection. + /// Whether or not the result Observable should complete if all the changesets complete. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable> source, IObservable> other, IScheduler? scheduler = null, bool completable = true) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (other == null) throw new ArgumentNullException(nameof(other)); + + return new[] { source, other }.MergeChangeSets(scheduler, completable); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. Merges both observable changesets into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// The Other Observable ChangeSet. + /// instance to determine which element to emit if the same key is emitted from multiple changesets. + /// (Optional) instance to use when enumerating the collection. + /// Whether or not the result Observable should complete if all the changesets complete. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable> source, IObservable> other, IComparer comparer, IScheduler? scheduler = null, bool completable = true) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (other == null) throw new ArgumentNullException(nameof(other)); + if (comparer == null) throw new ArgumentNullException(nameof(comparer)); + + return new[] { source, other }.MergeChangeSets(comparer, scheduler, completable); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. Merges both observable changesets into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// The Other Observable ChangeSet. + /// instance to determine if two elements are the same. + /// (Optional) instance to use when enumerating the collection. + /// Whether or not the result Observable should complete if all the changesets complete. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable> source, IObservable> other, IEqualityComparer equalityComparer, IScheduler? scheduler = null, bool completable = true) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (other == null) throw new ArgumentNullException(nameof(other)); + if (equalityComparer == null) throw new ArgumentNullException(nameof(equalityComparer)); + + return new[] { source, other }.MergeChangeSets(equalityComparer, scheduler, completable); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. Merges both observable changesets into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// The Other Observable ChangeSet. + /// instance to determine if two elements are the same. + /// instance to determine which element to emit if the same key is emitted from multiple changesets. + /// (Optional) instance to use when enumerating the collection. + /// Whether or not the result Observable should complete if all the changesets complete. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable> source, IObservable> other, IEqualityComparer equalityComparer, IComparer comparer, IScheduler? scheduler = null, bool completable = true) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (other == null) throw new ArgumentNullException(nameof(other)); + if (equalityComparer == null) throw new ArgumentNullException(nameof(equalityComparer)); + if (comparer == null) throw new ArgumentNullException(nameof(comparer)); + + return new[] { source, other }.MergeChangeSets(equalityComparer, comparer, scheduler, completable); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. Merges the source changeset and the collection of other changesets together into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// The Other Observable ChangeSets. + /// (Optional) instance to use when enumerating the collection. + /// Whether or not the result Observable should complete if all the changesets complete. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable> source, IEnumerable>> others, IScheduler? scheduler = null, bool completable = true) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (others == null) throw new ArgumentNullException(nameof(others)); + + return source.EnumerateOne().Concat(others).MergeChangeSets(scheduler, completable); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. Merges the source changeset and the collection of other changesets together into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// The Other Observable ChangeSets. + /// instance to determine which element to emit if the same key is emitted from multiple changesets. + /// (Optional) instance to use when enumerating the collection. + /// Whether or not the result Observable should complete if all the changesets complete. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable> source, IEnumerable>> others, IComparer comparer, IScheduler? scheduler = null, bool completable = true) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (others == null) throw new ArgumentNullException(nameof(others)); + if (comparer == null) throw new ArgumentNullException(nameof(comparer)); + + return source.EnumerateOne().Concat(others).MergeChangeSets(comparer, scheduler, completable); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. Merges the source changeset and the collection of other changesets together into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// The Other Observable ChangeSets. + /// instance to determine if two elements are the same. + /// (Optional) instance to use when enumerating the collection. + /// Whether or not the result Observable should complete if all the changesets complete. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable> source, IEnumerable>> others, IEqualityComparer equalityComparer, IScheduler? scheduler = null, bool completable = true) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (others == null) throw new ArgumentNullException(nameof(others)); + if (equalityComparer == null) throw new ArgumentNullException(nameof(equalityComparer)); + + return source.EnumerateOne().Concat(others).MergeChangeSets(equalityComparer, scheduler, completable); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. Merges the source changeset and the collection of other changesets together into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// The Other Observable ChangeSets. + /// instance to determine if two elements are the same. + /// instance to determine which element to emit if the same key is emitted from multiple changesets. + /// (Optional) instance to use when enumerating the collection. + /// Whether or not the result Observable should complete if all the changesets complete. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable> source, IEnumerable>> others, IEqualityComparer equalityComparer, IComparer comparer, IScheduler? scheduler = null, bool completable = true) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (others == null) throw new ArgumentNullException(nameof(others)); + if (equalityComparer == null) throw new ArgumentNullException(nameof(equalityComparer)); + if (comparer == null) throw new ArgumentNullException(nameof(comparer)); + + return source.EnumerateOne().Concat(others).MergeChangeSets(equalityComparer, comparer, scheduler, completable); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. All of the observable changesets are merged together into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// (Optional) instance to use when enumerating the collection. + /// Whether or not the result Observable should complete if all the changesets complete. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IEnumerable>> source, IScheduler? scheduler = null, bool completable = true) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + + return new MergeChangeSets(source, equalityComparer: null, comparer: null, completable, scheduler).Run(); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. All of the observable changesets are merged together into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// instance to determine which element to emit if the same key is emitted from multiple changesets. + /// (Optional) instance to use when enumerating the collection. + /// Whether or not the result Observable should complete if all the changesets complete. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IEnumerable>> source, IComparer comparer, IScheduler? scheduler = null, bool completable = true) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (comparer == null) throw new ArgumentNullException(nameof(comparer)); + + return new MergeChangeSets(source, equalityComparer: null, comparer, completable, scheduler).Run(); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. All of the observable changesets are merged together into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// instance to determine if two elements are the same. + /// (Optional) instance to use when enumerating the collection. + /// Whether or not the result Observable should complete if all the changesets complete. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IEnumerable>> source, IEqualityComparer equalityComparer, IScheduler? scheduler = null, bool completable = true) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (equalityComparer == null) throw new ArgumentNullException(nameof(equalityComparer)); + + return new MergeChangeSets(source, equalityComparer, comparer: null, completable, scheduler).Run(); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. All of the observable changesets are merged together into a single stream of ChangeSet events that correctly handles multiple Keys. + /// + /// The type of the object. + /// The type of the key. + /// The Source Observable ChangeSet. + /// instance to determine if two elements are the same. + /// instance to determine which element to emit if the same key is emitted from multiple changesets. + /// (Optional) instance to use when enumerating the collection. + /// Whether or not the result Observable should complete if all the changesets complete. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IEnumerable>> source, IEqualityComparer equalityComparer, IComparer comparer, IScheduler? scheduler = null, bool completable = true) + where TObject : notnull + where TKey : notnull + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (equalityComparer == null) throw new ArgumentNullException(nameof(equalityComparer)); + if (comparer == null) throw new ArgumentNullException(nameof(comparer)); + + return new MergeChangeSets(source, equalityComparer, comparer, completable, scheduler).Run(); + } + /// /// Operator similiar to MergeMany except it is ChangeSet aware. It uses to transform each item in the source into a child and merges the result children together into a single stream of ChangeSets that correctly handles multiple Keys and removal of the parent items. /// diff --git a/src/DynamicData/Cache/Tests/ChangeSetAggregator.cs b/src/DynamicData/Cache/Tests/ChangeSetAggregator.cs index 72d1bd2be..6e7fc5370 100644 --- a/src/DynamicData/Cache/Tests/ChangeSetAggregator.cs +++ b/src/DynamicData/Cache/Tests/ChangeSetAggregator.cs @@ -35,7 +35,7 @@ public ChangeSetAggregator(IObservable> source) Data = published.AsObservableCache(); - var results = published.Subscribe(updates => Messages.Add(updates), ex => Error = ex); + var results = published.Subscribe(updates => Messages.Add(updates), ex => Error = ex, () => Completed = true); var summariser = published.CollectUpdateStats().Subscribe(summary => Summary = summary, _ => { }); var connected = published.Connect(); @@ -65,6 +65,14 @@ public ChangeSetAggregator(IObservable> source) /// public Exception? Error { get; private set; } + /// + /// Gets a value indicating whether or not the ChangeSet fired OnCompleted. + /// + /// + /// Boolean Value. + /// + public bool Completed { get; private set; } + /// /// Gets the messages. ///