diff --git a/src/DynamicData/Cache/ObservableCache.cs b/src/DynamicData/Cache/ObservableCache.cs index 8c6411bd5..4ce3becde 100644 --- a/src/DynamicData/Cache/ObservableCache.cs +++ b/src/DynamicData/Cache/ObservableCache.cs @@ -21,7 +21,6 @@ internal sealed class ObservableCache : IObservableCache _readerWriter; private readonly IDisposable _cleanUp; private readonly object _locker = new object(); - private readonly object _writeLock = new object(); private int _editLevel; // The level of recursion in editing. @@ -80,7 +79,7 @@ internal void UpdateFromIntermediate(Action> update throw new ArgumentNullException(nameof(updateAction)); } - lock (_writeLock) + lock (_locker) { ChangeSet changes = null; @@ -111,7 +110,7 @@ internal void UpdateFromSource(Action> updateActio throw new ArgumentNullException(nameof(updateAction)); } - lock (_writeLock) + lock (_locker) { ChangeSet changes = null; @@ -162,50 +161,53 @@ private void InvokeNext(ChangeSet changes) } } - public IObservable CountChanged => _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged(); + public IObservable CountChanged => Observable.Create(observer => + { + lock (_locker) + { + var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged(); + return source.SubscribeSafe(observer); + } + }); - public IObservable> Watch(TKey key) + public IObservable> Watch(TKey key) => Observable.Create>(observer => { - return Observable.Create> - ( - observer => + lock (_locker) + { + var initial = _readerWriter.Lookup(key); + if (initial.HasValue) + { + observer.OnNext(new Change(ChangeReason.Add, key, initial.Value)); + } + + return _changes.Finally(observer.OnCompleted).Subscribe(changes => { - lock (_locker) + foreach (var change in changes) { - var initial = _readerWriter.Lookup(key); - if (initial.HasValue) + var match = EqualityComparer.Default.Equals(change.Key, key); + if (match) { - observer.OnNext(new Change(ChangeReason.Add, key, initial.Value)); + observer.OnNext(change); } - - return _changes.Finally(observer.OnCompleted).Subscribe(changes => - { - foreach (var change in changes) - { - var match = EqualityComparer.Default.Equals(change.Key, key); - if (match) - { - observer.OnNext(change); - } - } - }); } }); - } + } + }); - public IObservable> Connect(Func predicate = null) + public IObservable> Connect(Func predicate = null) => Observable.Create>(observer => { - return Observable.Defer(() => + lock (_locker) { - lock (_locker) + var initial = GetInitialUpdates(predicate); + if (initial.Count != 0) { - var initial = GetInitialUpdates(predicate); - var changes = Observable.Return(initial).Concat(_changes); - - return (predicate == null ? changes : changes.Filter(predicate)).NotEmpty(); + observer.OnNext(initial); } - }); - } + + var updateSource = (predicate == null ? _changes : _changes.Filter(predicate)).NotEmpty(); + return updateSource.SubscribeSafe(observer); + } + }); public IObservable> Preview(Func predicate = null) { diff --git a/src/DynamicData/List/SourceList.cs b/src/DynamicData/List/SourceList.cs index bf4e1d126..01c9d9f21 100644 --- a/src/DynamicData/List/SourceList.cs +++ b/src/DynamicData/List/SourceList.cs @@ -25,7 +25,6 @@ public sealed class SourceList : ISourceList private readonly ReaderWriter _readerWriter = new ReaderWriter(); private readonly IDisposable _cleanUp; private readonly object _locker = new object(); - private readonly object _writeLock = new object(); private int _editLevel; @@ -50,7 +49,7 @@ public SourceList(IObservable> source = null) private IDisposable LoadFromSource(IObservable> source) { - return source + return source.Synchronize(_locker) .Finally(OnCompleted) .Select(_readerWriter.Write) .Subscribe(InvokeNext, OnError, OnCompleted); @@ -64,7 +63,7 @@ public void Edit([NotNull] Action> updateAction) throw new ArgumentNullException(nameof(updateAction)); } - lock (_writeLock) + lock (_locker) { IChangeSet changes = null; @@ -151,7 +150,14 @@ private void OnError(Exception exception) public int Count => _readerWriter.Count; /// - public IObservable CountChanged => _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged(); + public IObservable CountChanged => Observable.Create(observer => + { + lock (_locker) + { + var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged(); + return source.SubscribeSafe(observer); + } + }); /// public IObservable> Connect(Func predicate = null)