Skip to content

Commit

Permalink
Create a disposable version of ReactiveCollection and make sure the s…
Browse files Browse the repository at this point in the history
…ubs that CreateDerivedCollection take get disposed. Fixes #199
  • Loading branch information
anaisbetts committed Mar 1, 2013
1 parent 0c1411b commit 9420f69
Showing 1 changed file with 45 additions and 11 deletions.
56 changes: 45 additions & 11 deletions ReactiveUI/ReactiveCollectionMixins.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,34 @@
using System.Collections.Specialized;
using System.Reactive.Subjects;
using System.Globalization;
using System.Threading;
using System.Reactive.Disposables;

namespace ReactiveUI
{
public sealed class ReactiveDerivedCollection<T> : ReactiveCollection<T>, IDisposable
{
IDisposable inner = null;

public ReactiveDerivedCollection(IDisposable disposable) : base()
{
inner = disposable;

This comment has been minimized.

Copy link
@haacked

haacked Mar 1, 2013

Contributor

Is disposable allowed to be null? If so, provide a default ctor? If not, guard against it so people know how to do it right.

}

public ReactiveDerivedCollection(IEnumerable<T> items, IDisposable disposable) : base(items)
{
inner = disposable;
}

public void Dispose()
{
var disp = Interlocked.Exchange(ref inner, null);
if (disp == null) return;

disp.Dispose();
}
}

public static class ReactiveCollectionMixins
{
/// <summary>
Expand All @@ -27,15 +52,17 @@ public static class ReactiveCollectionMixins
/// collection no faster than the delay provided.</param>
/// <returns>A new collection which will be populated with the
/// Observable.</returns>
public static ReactiveCollection<T> CreateCollection<T>(
public static ReactiveDerivedCollection<T> CreateCollection<T>(
this IObservable<T> fromObservable,
TimeSpan? withDelay = null,
Action<Exception> onError = null)
{
var ret = new ReactiveCollection<T>();
var disp = new SingleAssignmentDisposable();
var ret = new ReactiveDerivedCollection<T>(disp);

onError = onError ?? (ex => RxApp.DefaultExceptionHandler.OnNext(ex));
if (withDelay == null) {
fromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add, onError);
disp.Disposable = fromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add, onError);

This comment has been minimized.

Copy link
@haacked

haacked Mar 1, 2013

Contributor

Potential NullReferenceException on fromObservable. We should probably guard against it. It really is more useful to have an ArgumentNullException with the parameter name as opposed to a NullReferenceException in the ctor where you have no idea what's causing it.

return ret;
}

Expand All @@ -48,6 +75,8 @@ public static class ReactiveCollectionMixins
}
});

disp.Disposable = disconnect;

// When new items come in from the observable, stuff them in the queue.
// Using the DeferredScheduler guarantees we'll always access the queue
// from the same thread.
Expand Down Expand Up @@ -78,7 +107,7 @@ public static class ReactiveCollectionMixins
/// collection no faster than the delay provided.</param>
/// <returns>A new collection which will be populated with the
/// Observable.</returns>
public static ReactiveCollection<TRet> CreateCollection<T, TRet>(
public static ReactiveDerivedCollection<TRet> CreateCollection<T, TRet>(
this IObservable<T> fromObservable,
Func<T, TRet> selector,
TimeSpan? withDelay = null)
Expand Down Expand Up @@ -114,7 +143,7 @@ public static class ObservableCollectionMixin
/// <returns>A new collection whose items are equivalent to
/// Collection.Select().Where().OrderBy() and will mirror changes
/// in the initial collection.</returns>
public static ReactiveCollection<TNew> CreateDerivedCollection<T, TNew, TDontCare>(
public static ReactiveDerivedCollection<TNew> CreateDerivedCollection<T, TNew, TDontCare>(
this IEnumerable<T> This,
Func<T, TNew> selector,
Func<T, bool> filter = null,
Expand All @@ -123,6 +152,7 @@ public static class ObservableCollectionMixin
{
Contract.Requires(selector != null);

var disp = new CompositeDisposable();
var collChanged = new Subject<NotifyCollectionChangedEventArgs>();

if (selector == null) {
Expand All @@ -134,20 +164,24 @@ public static class ObservableCollectionMixin
var enumerable = origEnum.Select(selector);
enumerable = (orderer != null ? enumerable.OrderBy(x => x, new FuncComparator<TNew>(orderer)) : enumerable);

var ret = new ReactiveCollection<TNew>(enumerable);
var ret = new ReactiveDerivedCollection<TNew>(enumerable, disp);

var incc = This as INotifyCollectionChanged;
if (incc != null) {
((INotifyCollectionChanged)This).CollectionChanged += (o, e) => collChanged.OnNext(e);
var connObs = Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(x => incc.CollectionChanged += x, x => incc.CollectionChanged -= x)
.Select(x => x.EventArgs)
.Multicast(collChanged);

disp.Add(connObs.Connect());
}

if (filter != null && orderer == null) {
throw new Exception("If you specify a filter, you must also specify an ordering function");
}

signalReset.Subscribe(_ => collChanged.OnNext(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset)));
disp.Add(signalReset.Subscribe(_ => collChanged.OnNext(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset))));

collChanged.Subscribe(args => {
disp.Add(collChanged.Subscribe(args => {
if (args.Action == NotifyCollectionChangedAction.Reset) {
using(ret.SuppressChangeNotifications()) {
ret.Clear();
Expand Down Expand Up @@ -195,7 +229,7 @@ public static class ObservableCollectionMixin
ret.Insert(positionForNewItem(ret, toAdd, orderer), toAdd);
}
}
});
}));

return ret;
}
Expand All @@ -220,7 +254,7 @@ public static class ObservableCollectionMixin
/// <returns>A new collection whose items are equivalent to
/// Collection.Select().Where().OrderBy() and will mirror changes
/// in the initial collection.</returns>
public static ReactiveCollection<TNew> CreateDerivedCollection<T, TNew>(
public static ReactiveDerivedCollection<TNew> CreateDerivedCollection<T, TNew>(
this IEnumerable<T> This,
Func<T, TNew> selector,
Func<T, bool> filter = null,
Expand Down

2 comments on commit 9420f69

@haacked
Copy link
Contributor

@haacked haacked commented on 9420f69 Mar 1, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. So basically every call to Subscribe or Connect returns an IDisposable and you're adding each one to the CompositeDisposable so it eventually gets disposed property, right?

@anaisbetts
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@haacked Yeah

Please sign in to comment.