diff --git a/ReactiveUI/ReactiveCollectionMixins.cs b/ReactiveUI/ReactiveCollectionMixins.cs index e650c803db..b661f5670a 100644 --- a/ReactiveUI/ReactiveCollectionMixins.cs +++ b/ReactiveUI/ReactiveCollectionMixins.cs @@ -7,9 +7,34 @@ using System.Collections.Specialized; using System.Reactive.Subjects; using System.Globalization; +using System.Threading; +using System.Reactive.Disposables; namespace ReactiveUI { + public sealed class ReactiveDerivedCollection : ReactiveCollection, IDisposable + { + IDisposable inner = null; + + public ReactiveDerivedCollection(IDisposable disposable) : base() + { + inner = disposable; + } + + public ReactiveDerivedCollection(IEnumerable items, IDisposable disposable) : base(items) + { + inner = disposable; + } + + public void Dispose() + { + var disp = Interlocked.Exchange(ref inner, null); + if (disp == null) return; + + disp.Dispose(); + } + } + public static class ReactiveCollectionMixins { /// @@ -27,15 +52,17 @@ public static class ReactiveCollectionMixins /// collection no faster than the delay provided. /// A new collection which will be populated with the /// Observable. - public static ReactiveCollection CreateCollection( + public static ReactiveDerivedCollection CreateCollection( this IObservable fromObservable, TimeSpan? withDelay = null, Action onError = null) { - var ret = new ReactiveCollection(); + var disp = new SingleAssignmentDisposable(); + var ret = new ReactiveDerivedCollection(disp); + onError = onError ?? (ex => RxApp.DefaultExceptionHandler.OnNext(ex)); if (withDelay == null) { - fromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add, onError); + disp.Disposable = fromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add, onError); return ret; } @@ -48,6 +75,8 @@ public static class ReactiveCollectionMixins } }); + disp.Disposable = disconnect; + // When new items come in from the observable, stuff them in the queue. // Using the DeferredScheduler guarantees we'll always access the queue // from the same thread. @@ -78,7 +107,7 @@ public static class ReactiveCollectionMixins /// collection no faster than the delay provided. /// A new collection which will be populated with the /// Observable. - public static ReactiveCollection CreateCollection( + public static ReactiveDerivedCollection CreateCollection( this IObservable fromObservable, Func selector, TimeSpan? withDelay = null) @@ -114,7 +143,7 @@ public static class ObservableCollectionMixin /// A new collection whose items are equivalent to /// Collection.Select().Where().OrderBy() and will mirror changes /// in the initial collection. - public static ReactiveCollection CreateDerivedCollection( + public static ReactiveDerivedCollection CreateDerivedCollection( this IEnumerable This, Func selector, Func filter = null, @@ -123,6 +152,7 @@ public static class ObservableCollectionMixin { Contract.Requires(selector != null); + var disp = new CompositeDisposable(); var collChanged = new Subject(); if (selector == null) { @@ -134,20 +164,24 @@ public static class ObservableCollectionMixin var enumerable = origEnum.Select(selector); enumerable = (orderer != null ? enumerable.OrderBy(x => x, new FuncComparator(orderer)) : enumerable); - var ret = new ReactiveCollection(enumerable); + var ret = new ReactiveDerivedCollection(enumerable, disp); var incc = This as INotifyCollectionChanged; if (incc != null) { - ((INotifyCollectionChanged)This).CollectionChanged += (o, e) => collChanged.OnNext(e); + var connObs = Observable.FromEventPattern(x => incc.CollectionChanged += x, x => incc.CollectionChanged -= x) + .Select(x => x.EventArgs) + .Multicast(collChanged); + + disp.Add(connObs.Connect()); } if (filter != null && orderer == null) { throw new Exception("If you specify a filter, you must also specify an ordering function"); } - signalReset.Subscribe(_ => collChanged.OnNext(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset))); + disp.Add(signalReset.Subscribe(_ => collChanged.OnNext(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset)))); - collChanged.Subscribe(args => { + disp.Add(collChanged.Subscribe(args => { if (args.Action == NotifyCollectionChangedAction.Reset) { using(ret.SuppressChangeNotifications()) { ret.Clear(); @@ -195,7 +229,7 @@ public static class ObservableCollectionMixin ret.Insert(positionForNewItem(ret, toAdd, orderer), toAdd); } } - }); + })); return ret; } @@ -220,7 +254,7 @@ public static class ObservableCollectionMixin /// A new collection whose items are equivalent to /// Collection.Select().Where().OrderBy() and will mirror changes /// in the initial collection. - public static ReactiveCollection CreateDerivedCollection( + public static ReactiveDerivedCollection CreateDerivedCollection( this IEnumerable This, Func selector, Func filter = null,