Skip to content

Commit

Permalink
Add CreateCollection extension method
Browse files Browse the repository at this point in the history
  • Loading branch information
anaisbetts committed Nov 2, 2010
1 parent 66fe174 commit ae774b0
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 5 deletions.
28 changes: 28 additions & 0 deletions ReactiveXaml.Tests/ReactiveCollectionTest.cs
Expand Up @@ -7,6 +7,7 @@
using System.Text;
using ReactiveXaml.Tests;
using System.Runtime.Serialization.Json;
using System.Threading;

namespace ReactiveXaml.Tests
{
Expand Down Expand Up @@ -129,13 +130,40 @@ public void ChangeTrackingShouldFireNotifications()
new[]{"IsOnlyOneWord", "IsNotNullString"}.AssertAreEqual(output.Select(x => x.Item2));
}

[TestMethod()]
public void CreateCollectionWithoutTimer()
{
var input = new[] {"Foo", "Bar", "Baz", "Bamf"};
var fixture = input.ToObservable().CreateCollection();

input.AssertAreEqual(fixture);
}

[TestMethod()]
public void CreateCollectionWithTimer()
{
var input = new[] {"Foo", "Bar", "Baz", "Bamf"};
var fixture = input.ToObservable().CreateCollection(TimeSpan.FromSeconds(0.5));
var output = fixture.ItemsAdded.Timestamp().Select(x => x.Timestamp).CreateCollection();

Assert.IsTrue(RxApp.InUnitTestRunner());
Thread.Sleep(4 * 1000);

input.AssertAreEqual(fixture);
var timings = Enumerable.Zip(output, output.Skip(1),
(prev, curr) => (curr - prev) - TimeSpan.FromSeconds(0.5));
this.Log().Debug(String.Join(",", timings));
timings.Run(x => Assert.IsTrue(x < TimeSpan.FromMilliseconds(20)));
}

[TestMethod()]
public void DerivedCollectionsShouldFollowBaseCollection()
{
var input = new[] {"Foo", "Bar", "Baz", "Bamf"};
var fixture = new ReactiveCollection<TestFixture>(
input.Select(x => new TestFixture() { IsOnlyOneWord = x }));

input.Run(Console.WriteLine);
var output = fixture.CreateDerivedCollection(new Func<TestFixture, string>(x => x.IsOnlyOneWord));

input.AssertAreEqual(output);
Expand Down
46 changes: 41 additions & 5 deletions ReactiveXaml/ReactiveCollection.cs
Expand Up @@ -210,13 +210,13 @@ public ReactiveCollection<TNew> CreateDerivedCollection<TNew>(Func<T, TNew> Sele
case NotifyCollectionChangedAction.Add:
case NotifyCollectionChangedAction.Remove:
case NotifyCollectionChangedAction.Replace:
// NB: SL4 fills in OldStartingIndex with -1 on Replace :-/
int old_index = (x.EventArgs.Action == NotifyCollectionChangedAction.Replace ?
x.EventArgs.NewStartingIndex : x.EventArgs.OldStartingIndex);
// NB: SL4 fills in OldStartingIndex with -1 on Replace :-/
int old_index = (x.EventArgs.Action == NotifyCollectionChangedAction.Replace ?
x.EventArgs.NewStartingIndex : x.EventArgs.OldStartingIndex);
if (x.EventArgs.OldItems != null) {
foreach(object _ in x.EventArgs.OldItems) {
ret.RemoveAt(old_index);
ret.RemoveAt(old_index);
}
}
if (x.EventArgs.NewItems != null) {
Expand All @@ -236,6 +236,7 @@ public ReactiveCollection<TNew> CreateDerivedCollection<TNew>(Func<T, TNew> Sele
return ret;
}


public void Dispose()
{
ChangeTrackingEnabled = false;
Expand Down Expand Up @@ -303,6 +304,41 @@ protected override void OnPropertyChanged(PropertyChangedEventArgs e)
}
#endif
}

public static class ReactiveCollectionMixins
{
public static ReactiveCollection<T> CreateCollection<T>(this IObservable<T> FromObservable, TimeSpan? WithDelay = null)
{
var ret = new ReactiveCollection<T>();
if (WithDelay == null) {
FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add);
return ret;
}

// On a timer, dequeue items from queue if they are available
var queue = new Queue<T>();
var disconnect = Observable.Timer(WithDelay.Value, WithDelay.Value)
.ObserveOn(RxApp.DeferredScheduler).Subscribe(_ => {
if (queue.Count > 0) {
ret.Add(queue.Dequeue());
}
});

// When new items come in from the observable, stuff them in the queue.
// Using the DeferredScheduler guarantees we'll always access the queue
// from the same thread.
FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue);

// This is a bit clever - keep a running count of the items actually
// added and compare them to the final count of items provided by the
// Observable. Combine the two values, and when they're equal,
// disconnect the timer
ret.ItemsAdded.Scan0(0, ((acc, _) => acc+1)).Zip(FromObservable.Aggregate(0, (acc,_) => acc+1),
(l,r) => (l == r)).Where(x => x).Subscribe(_ => disconnect.Dispose());

return ret;
}
}
}

// vim: tw=120 ts=4 sw=4 et enc=utf8 :

0 comments on commit ae774b0

Please sign in to comment.