Skip to content

Commit

Permalink
Re-enable CreateCollection
Browse files Browse the repository at this point in the history
Moving logic into internal helper class based on
ReactiveDerivedCollection<T>.
  • Loading branch information
niik authored and anaisbetts committed Apr 1, 2013
1 parent 1ebba6e commit c1c3147
Showing 1 changed file with 48 additions and 30 deletions.
78 changes: 48 additions & 30 deletions ReactiveUI/ReactiveCollectionMixins.cs
Expand Up @@ -483,64 +483,82 @@ public override void Dispose(bool disposing)
} }
} }


public static class ReactiveCollectionMixins internal class ReactiveDerivedCollectionFromObservable<T>: ReactiveDerivedCollection<T>
{ {
/// <summary> SingleAssignmentDisposable inner;
/// Creates a collection based on an an Observable by adding items
/// provided until the Observable completes, optionally ensuring a public ReactiveDerivedCollectionFromObservable(
/// delay. Note that if the Observable never completes and withDelay is IObservable<T> observable,
/// set, this method will leak a Timer. This method also guarantees that
/// items are always added via the UI thread.
/// </summary>
/// <param name="fromObservable">The Observable whose items will be put
/// into the new collection.</param>
/// <param name="onError">The handler for errors from the Observable. If
/// not specified, an error will go to DefaultExceptionHandler.</param>
/// <param name="withDelay">If set, items will be populated in the
/// collection no faster than the delay provided.</param>
/// <returns>A new collection which will be populated with the
/// Observable.</returns>
public static ReactiveDerivedCollection<T> CreateCollection<T>(
this IObservable<T> fromObservable,
TimeSpan? withDelay = null, TimeSpan? withDelay = null,
Action<Exception> onError = null) Action<Exception> onError = null)
{ {
throw new NotImplementedException(); this.inner = new SingleAssignmentDisposable();
/*
var disp = new SingleAssignmentDisposable();
var ret = new ReactiveDerivedCollection<T>(disp);


onError = onError ?? (ex => RxApp.DefaultExceptionHandler.OnNext(ex)); onError = onError ?? (ex => RxApp.DefaultExceptionHandler.OnNext(ex));
if (withDelay == null) { if (withDelay == null) {
disp.Disposable = fromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add, onError); inner.Disposable = observable.ObserveOn(RxApp.DeferredScheduler).Subscribe(internalAdd, onError);
return ret; return;
} }


// On a timer, dequeue items from queue if they are available // On a timer, dequeue items from queue if they are available
var queue = new Queue<T>(); var queue = new Queue<T>();
var disconnect = Observable.Timer(withDelay.Value, withDelay.Value, RxApp.DeferredScheduler) var disconnect = Observable.Timer(withDelay.Value, withDelay.Value, RxApp.DeferredScheduler)
.Subscribe(_ => { .Subscribe(_ => {
if (queue.Count > 0) { if (queue.Count > 0) {
ret.Add(queue.Dequeue()); this.internalAdd(queue.Dequeue());
} }
}); });


disp.Disposable = disconnect; inner.Disposable = disconnect;


// When new items come in from the observable, stuff them in the queue. // When new items come in from the observable, stuff them in the queue.
// Using the DeferredScheduler guarantees we'll always access the queue // Using the DeferredScheduler guarantees we'll always access the queue
// from the same thread. // from the same thread.
fromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue, onError); observable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue, onError);


// This is a bit clever - keep a running count of the items actually // This is a bit clever - keep a running count of the items actually
// added and compare them to the final count of items provided by the // added and compare them to the final count of items provided by the
// Observable. Combine the two values, and when they're equal, // Observable. Combine the two values, and when they're equal,
// disconnect the timer // disconnect the timer
ret.ItemsAdded.Scan(0, ((acc, _) => acc+1)).Zip(fromObservable.Aggregate(0, (acc,_) => acc+1), this.ItemsAdded.Scan(0, ((acc, _) => acc + 1)).Zip(observable.Aggregate(0, (acc, _) => acc + 1),
(l,r) => (l == r)).Where(x => x).Subscribe(_ => disconnect.Dispose()); (l,r) => (l == r)).Where(x => x).Subscribe(_ => disconnect.Dispose());
}


return ret; public override void Dispose(bool disposing)
*/ {
if (disposing)
{
var disp = Interlocked.Exchange(ref inner, null);
if (disp == null) return;

disp.Dispose();
}
}
}

public static class ReactiveCollectionMixins
{
/// <summary>
/// Creates a collection based on an an Observable by adding items
/// provided until the Observable completes, optionally ensuring a
/// delay. Note that if the Observable never completes and withDelay is
/// set, this method will leak a Timer. This method also guarantees that
/// items are always added via the UI thread.
/// </summary>
/// <param name="fromObservable">The Observable whose items will be put
/// into the new collection.</param>
/// <param name="onError">The handler for errors from the Observable. If
/// not specified, an error will go to DefaultExceptionHandler.</param>
/// <param name="withDelay">If set, items will be populated in the
/// collection no faster than the delay provided.</param>
/// <returns>A new collection which will be populated with the
/// Observable.</returns>
public static ReactiveDerivedCollection<T> CreateCollection<T>(
this IObservable<T> fromObservable,
TimeSpan? withDelay = null,
Action<Exception> onError = null)
{
return new ReactiveDerivedCollectionFromObservable<T>(fromObservable, withDelay, onError);
} }


/// <summary> /// <summary>
Expand Down

0 comments on commit c1c3147

Please sign in to comment.