Skip to content

Commit

Permalink
Thread safety in ObservableCache and SourceList. Fixes reactivemarble…
Browse files Browse the repository at this point in the history
  • Loading branch information
pmg23 authored and pmg23rcm committed Jun 26, 2020
1 parent d213d2e commit 51da21c
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 36 deletions.
70 changes: 36 additions & 34 deletions src/DynamicData/Cache/ObservableCache.cs
Expand Up @@ -21,7 +21,6 @@ internal sealed class ObservableCache<TObject, TKey> : IObservableCache<TObject,
private readonly ReaderWriter<TObject, TKey> _readerWriter;
private readonly IDisposable _cleanUp;
private readonly object _locker = new object();
private readonly object _writeLock = new object();

private int _editLevel; // The level of recursion in editing.

Expand Down Expand Up @@ -80,7 +79,7 @@ internal void UpdateFromIntermediate(Action<ICacheUpdater<TObject, TKey>> update
throw new ArgumentNullException(nameof(updateAction));
}

lock (_writeLock)
lock (_locker)
{
ChangeSet<TObject, TKey> changes = null;

Expand Down Expand Up @@ -111,7 +110,7 @@ internal void UpdateFromSource(Action<ISourceUpdater<TObject, TKey>> updateActio
throw new ArgumentNullException(nameof(updateAction));
}

lock (_writeLock)
lock (_locker)
{
ChangeSet<TObject, TKey> changes = null;

Expand Down Expand Up @@ -162,50 +161,53 @@ private void InvokeNext(ChangeSet<TObject, TKey> changes)
}
}

public IObservable<int> CountChanged => _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged();
public IObservable<int> CountChanged => Observable.Create<int>(observer =>
{
lock (_locker)
{
observer.OnNext(_readerWriter.Count);
return _countChanged.Value.SubscribeSafe(observer);
}
});

public IObservable<Change<TObject, TKey>> Watch(TKey key)
public IObservable<Change<TObject, TKey>> Watch(TKey key) => Observable.Create<Change<TObject, TKey>>(observer =>
{
return Observable.Create<Change<TObject, TKey>>
(
observer =>
lock (_locker)
{
var initial = _readerWriter.Lookup(key);
if (initial.HasValue)
{
observer.OnNext(new Change<TObject, TKey>(ChangeReason.Add, key, initial.Value));
}
return _changes.Finally(observer.OnCompleted).Subscribe(changes =>
{
lock (_locker)
foreach (var change in changes)
{
var initial = _readerWriter.Lookup(key);
if (initial.HasValue)
var match = EqualityComparer<TKey>.Default.Equals(change.Key, key);
if (match)
{
observer.OnNext(new Change<TObject, TKey>(ChangeReason.Add, key, initial.Value));
observer.OnNext(change);
}
return _changes.Finally(observer.OnCompleted).Subscribe(changes =>
{
foreach (var change in changes)
{
var match = EqualityComparer<TKey>.Default.Equals(change.Key, key);
if (match)
{
observer.OnNext(change);
}
}
});
}
});
}
}
});

public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool> predicate = null)
public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool> predicate = null) => Observable.Create<IChangeSet<TObject, TKey>>(observer =>
{
return Observable.Defer(() =>
lock (_locker)
{
lock (_locker)
var initial = GetInitialUpdates(predicate);
if (initial.Count != 0)
{
var initial = GetInitialUpdates(predicate);
var changes = Observable.Return(initial).Concat(_changes);
return (predicate == null ? changes : changes.Filter(predicate)).NotEmpty();
observer.OnNext(initial);
}
});
}
var updateSource = (predicate == null ? _changes : _changes.Filter(predicate)).NotEmpty();
return updateSource.SubscribeSafe(observer);
}
});

public IObservable<IChangeSet<TObject, TKey>> Preview(Func<TObject, bool> predicate = null)
{
Expand Down
3 changes: 1 addition & 2 deletions src/DynamicData/List/SourceList.cs
Expand Up @@ -25,7 +25,6 @@ public sealed class SourceList<T> : ISourceList<T>
private readonly ReaderWriter<T> _readerWriter = new ReaderWriter<T>();
private readonly IDisposable _cleanUp;
private readonly object _locker = new object();
private readonly object _writeLock = new object();

private int _editLevel;

Expand Down Expand Up @@ -64,7 +63,7 @@ public void Edit([NotNull] Action<IExtendedList<T>> updateAction)
throw new ArgumentNullException(nameof(updateAction));
}

lock (_writeLock)
lock (_locker)
{
IChangeSet<T> changes = null;

Expand Down

0 comments on commit 51da21c

Please sign in to comment.