Skip to content

Commit

Permalink
feature: Add overload of TransformMany that works with child instance…
Browse files Browse the repository at this point in the history
…s of IObservableCache (#689)

* Add overload of TransformMany that works with child instances of IObservableCache

* Update version.json

---------

Co-authored-by: Glenn <5834289+glennawatson@users.noreply.github.com>
  • Loading branch information
John-MartinMalone and glennawatson committed May 22, 2023
1 parent 39df0fa commit 9af334c
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 3 deletions.
77 changes: 75 additions & 2 deletions src/DynamicData.Tests/Cache/TransformManyObservableCacheFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;

using DynamicData.Binding;
using DynamicData.Tests.Domain;

using FluentAssertions;
Expand Down Expand Up @@ -117,6 +117,57 @@ public void FlattenReadOnlyObservableCollection()
aggregator.Data.Lookup("Replacement").HasValue.Should().BeTrue();
}

[Fact]
public void FlattenObservableCache()
{
var children = Enumerable.Range(1, 100).Select(i => new Person("Name" + i, i)).ToArray();

int childIndex = 0;
var parents = Enumerable.Range(1, 50).Select(
i =>
{
var parent = new Parent(
i,
new[]
{
children[childIndex],
children[childIndex + 1]
});
childIndex += 2;
return parent;
}).ToArray();

using var source = new SourceCache<Parent, int>(x => x.Id);
using var aggregator = source.Connect().TransformMany(p => p.ChildrenCache, c => c.Name).AsAggregator();
source.AddOrUpdate(parents);

aggregator.Data.Count.Should().Be(100);

//add a child to an observable collection and check the new item is added
parents[0].Children.Add(new Person("NewlyAddded", 100));
aggregator.Data.Count.Should().Be(101);

////remove first parent and check children have gone
source.RemoveKey(1);
aggregator.Data.Count.Should().Be(98);

//check items can be cleared and then added back in
var childrenInZero = parents[1].Children.ToArray();
parents[1].Children.Clear();
aggregator.Data.Count.Should().Be(96);
parents[1].Children.AddRange(childrenInZero);
aggregator.Data.Count.Should().Be(98);

//replace produces an update
var replacedChild = parents[1].Children[0];
parents[1].Children[0] = new Person("Replacement", 100);
aggregator.Data.Count.Should().Be(98);

aggregator.Data.Lookup(replacedChild.Key).HasValue.Should().BeFalse();
aggregator.Data.Lookup("Replacement").HasValue.Should().BeTrue();
}

[Fact]
public void ObservableCollectionWithoutInitialData()
{
Expand Down Expand Up @@ -183,25 +234,47 @@ public void ReadOnlyObservableCollectionWithoutInitialData()
collection.Count.Should().Be(2);
}

[Fact]
public void ObservableCacheWithoutInitialData()
{
using var parents = new SourceCache<Parent, int>(d => d.Id);
var collection = parents.Connect().TransformMany(d => d.ChildrenCache, p => p.Name).AsObservableCache();

var parent = new Parent(1);
parents.AddOrUpdate(parent);

collection.Count.Should().Be(0);

parent.Children.Add(new Person("child1", 1));
collection.Count.Should().Be(1);

parent.Children.Add(new Person("child2", 2));
collection.Count.Should().Be(2);
}

private class Parent
{
public Parent(int id, IEnumerable<Person> children)
{
Id = id;
Children = new ObservableCollection<Person>(children);
ChildrenReadonly = new ReadOnlyObservableCollection<Person>(Children);
ChildrenCache = Children.ToObservableChangeSet(x => x.Name).AsObservableCache();
}

public Parent(int id)
{
Id = id;
Children = new ObservableCollection<Person>();
ChildrenReadonly = new ReadOnlyObservableCollection<Person>(Children);
ChildrenCache = Children.ToObservableChangeSet(x => x.Name).AsObservableCache();
}

public ObservableCollection<Person> Children { get; }

public ReadOnlyObservableCollection<Person> ChildrenReadonly { get; }
public ReadOnlyObservableCollection<Person> ChildrenReadonly { get; }

public IObservableCache<Person, string> ChildrenCache { get; }

public int Id { get; }
}
Expand Down
21 changes: 20 additions & 1 deletion src/DynamicData/Cache/Internal/TransformMany.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,25 @@ public TransformMany(IObservable<IChangeSet<TSource, TSourceKey>> source, Func<T
{
}

public TransformMany(IObservable<IChangeSet<TSource, TSourceKey>> source, Func<TSource, IObservableCache<TDestination, TDestinationKey>> manySelector, Func<TDestination, TDestinationKey> keySelector)
: this(source,
x => manySelector(x).Items,
keySelector,
t => Observable.Defer(
() =>
{
var subsequentChanges = Observable.Create<IChangeSet<TDestination, TDestinationKey>>(o => manySelector(t).Connect().Subscribe(o));
if (manySelector(t).Count > 0)
{
return subsequentChanges;
}
return Observable.Return(ChangeSet<TDestination, TDestinationKey>.Empty).Concat(subsequentChanges);
}))
{
}

public TransformMany(IObservable<IChangeSet<TSource, TSourceKey>> source, Func<TSource, IEnumerable<TDestination>> manySelector, Func<TDestination, TDestinationKey> keySelector, Func<TSource, IObservable<IChangeSet<TDestination, TDestinationKey>>>? childChanges = null)
{
_source = source;
Expand Down Expand Up @@ -108,11 +127,11 @@ public TransformMany(IObservable<IChangeSet<TSource, TSourceKey>> source, Func<T
{
// Only skip initial for first time Adds where there is initial data records
var locker = new object();
var collection = _manySelector(t);
var changes = _childChanges(t).Synchronize(locker).Skip(1);
return new ManyContainer(
() =>
{
var collection = _manySelector(t);
lock (locker)
{
return collection.Select(m => new DestinationContainer(m, _keySelector(m))).ToArray();
Expand Down
18 changes: 18 additions & 0 deletions src/DynamicData/Cache/ObservableCacheEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4694,6 +4694,24 @@ public static IObservable<IReadOnlyCollection<T>> StartWithEmpty<T>(this IObserv
return new TransformMany<TDestination, TDestinationKey, TSource, TSourceKey>(source, manySelector, keySelector).Run();
}

/// <summary>
/// Flatten the nested observable cache, and subsequently observe observable cache changes.
/// </summary>
/// <typeparam name="TDestination">The type of the destination.</typeparam>
/// <typeparam name="TDestinationKey">The type of the destination key.</typeparam>
/// <typeparam name="TSource">The type of the source.</typeparam>
/// <typeparam name="TSourceKey">The type of the source key.</typeparam>
/// <returns>An observable with the transformed change set.</returns>
/// <param name="source">The source.</param>
/// <param name="manySelector">Will select an observable cache of values.</param>
/// <param name="keySelector">The key selector which must be unique across all.</param>
public static IObservable<IChangeSet<TDestination, TDestinationKey>> TransformMany<TDestination, TDestinationKey, TSource, TSourceKey>(this IObservable<IChangeSet<TSource, TSourceKey>> source, Func<TSource, IObservableCache<TDestination, TDestinationKey>> manySelector, Func<TDestination, TDestinationKey> keySelector)
where TSourceKey : notnull
where TDestinationKey : notnull
{
return new TransformMany<TDestination, TDestinationKey, TSource, TSourceKey>(source, manySelector, keySelector).Run();
}

/// <summary>
/// Projects each update item to a new form using the specified transform function,
/// providing an error handling action to safely handle transform errors without killing the stream.
Expand Down

0 comments on commit 9af334c

Please sign in to comment.