[BUG] ObservableCache Connect subscription omits concurrent updates #376

Closed
pmg23 opened this issue Jun 24, 2020 · 5 comments

@pmg23
Contributor

pmg23 commented Jun 24, 2020

Describe the bug

There are two related bugs in ObservableCache<TObject, TKey> that sometimes cause the changeset sequence emitted by Connect(Func<TObject, bool>) to miss updates occurring between the initial changeset and the update changesets that follow.

Bug 1: _changes escapes lock scope in Connect(Func<TObject, bool>)

https://github.com/reactiveui/DynamicData/blob/1e61faa7e1db28e4ce1004f99c7d26cc59050a34/src/DynamicData/Cache/ObservableCache.cs#L196-L208

First, note that this method uses Observable.Defer, not Observable.Create. When subscribed to, the observable returned by Observable.Defer:

  1. executes the delegate provided (the "callback") to obtain a new observable; then
  2. subscribes to it.

In ObservableCache, access to the Subject field _changes is (apparently) guarded by _locker. But the code above violates this constraint: within the callback's lock scope (executed in step 1), we simply read the _changes field and return this reference inside a Concat observable. The calling code does not subscribe to it until some time later (in or after step 2), by which point the lock is no longer held. There is therefore a window in which any update made to the cache on another thread appears neither in the initial changeset nor in any subsequent one, i.e. it is lost.
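To make the window concrete, here is a deliberately simplified, single-threaded Python model. It is not DynamicData code: `Subject`, `DeferStyleCache` and `connect` are hypothetical stand-ins. `connect` mimics the Defer callback by taking the snapshot under the lock and returning a function that performs the Concat subscription later, and a writer is replayed by hand inside the gap between the two.

```python
import threading


class Subject:
    """Minimal stand-in for an Rx Subject: forwards values to current observers."""

    def __init__(self):
        self._observers = []

    def subscribe(self, observer):
        self._observers.append(observer)

    def on_next(self, value):
        for observer in list(self._observers):
            observer(value)


class DeferStyleCache:
    """Hypothetical model of the Defer-based Connect."""

    def __init__(self):
        self._locker = threading.Lock()
        self._items = {}
        self._changes = Subject()

    def add(self, key, value):
        # Writers update and publish while holding the lock.
        with self._locker:
            self._items[key] = value
            self._changes.on_next(("add", key))

    def connect(self, observer):
        # Step 1: the "Defer callback" reads state under the lock...
        with self._locker:
            initial = [("add", key) for key in self._items]
            changes = self._changes

        # Step 2: ...but the subscription to `changes` happens after release.
        def subscribe():
            for change in initial:
                observer(change)
            changes.subscribe(observer)

        return subscribe


cache = DeferStyleCache()
cache.add("a", 1)

received = []
finish_subscribe = cache.connect(received.append)  # step 1 done, lock released
cache.add("b", 2)  # a concurrent writer lands in the gap
finish_subscribe()  # step 2: subscribes too late to see "b"

print(received)  # [('add', 'a')]
```

The cache now holds both keys, but the subscriber only ever hears about "a".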

An alternative implementation which does not exhibit this problem is as follows:

    public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool> predicate = null)
    {
        return Observable.Create<IChangeSet<TObject, TKey>>(o =>
        {
            lock (_locker)
            {
                var initial = GetInitialUpdates(predicate);
                var updates = predicate == null ? _changes : _changes.Filter(predicate);

                return Observable.Return(initial, ImmediateScheduler.Instance) // immediate ensures that `updates` is subscribed before we exit lock scope
                    .Concat(updates)
                    .NotEmpty()
                    .Subscribe(o);
            }
        });
    }

Here, the initial changeset construction and the subscription to _changes occur together atomically within the scope of the lock.
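The same kind of hypothetical Python model, reworked along the lines of the proposed Create-based Connect, shows why the atomic version closes the gap. It assumes, as described for ObservableCache, that writers publish to `_changes` while holding `_locker`; a concurrent add can then only run entirely before or entirely after `connect`, and both orders are replayed below.

```python
import threading


class Subject:
    """Minimal stand-in for an Rx Subject: forwards values to current observers."""

    def __init__(self):
        self._observers = []

    def subscribe(self, observer):
        self._observers.append(observer)

    def on_next(self, value):
        for observer in list(self._observers):
            observer(value)


class AtomicConnectCache:
    """Hypothetical model of the Create-based Connect proposed above."""

    def __init__(self):
        self._locker = threading.Lock()
        self._items = {}
        self._changes = Subject()

    def add(self, key, value):
        # Writers update and publish while holding the same lock.
        with self._locker:
            self._items[key] = value
            self._changes.on_next(("add", key))

    def connect(self, observer):
        # Snapshot and subscription happen in one lock scope, which is what
        # Return(initial, ImmediateScheduler.Instance).Concat(updates) achieves.
        with self._locker:
            for key in self._items:
                observer(("add", key))
            self._changes.subscribe(observer)


def replay(write_first):
    cache = AtomicConnectCache()
    cache.add("a", 1)
    received = []
    # The lock makes add and connect mutually exclusive, so a concurrent add
    # runs entirely before or entirely after connect: replay both orders.
    if write_first:
        cache.add("b", 2)
        cache.connect(received.append)
    else:
        cache.connect(received.append)
        cache.add("b", 2)
    return received


print(replay(write_first=True))   # [('add', 'a'), ('add', 'b')]
print(replay(write_first=False))  # [('add', 'a'), ('add', 'b')]
```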

Note that this also fixes a relatively minor bug (call it 1b) in the original: if a predicate is given, the already-filtered initial changeset is redundantly filtered a second time.

Bug 2: UpdateFromSource/UpdateFromIntermediate access to _changes without lock

https://github.com/reactiveui/DynamicData/blob/1e61faa7e1db28e4ce1004f99c7d26cc59050a34/src/DynamicData/Cache/ObservableCache.cs#L91
https://github.com/reactiveui/DynamicData/blob/1e61faa7e1db28e4ce1004f99c7d26cc59050a34/src/DynamicData/Cache/ObservableCache.cs#L122

In these excerpts from UpdateFromIntermediate and UpdateFromSource respectively, the bool parameter collectChanges of _readerWriter.Write is determined by the value of _changes.HasObservers. If it is true, Write returns a non-empty changeset, which is propagated to change observers (after correctly acquiring the _locker lock). HasObservers internally uses a volatile read to retrieve the very latest state.

This mechanism appears to exist to avoid materialising changesets when no one is interested in them.

However, a concurrent caller to Connect is interested in any changes that occur after the point of generating the initial changeset, including any that occur before the subscription to _changes has completed (which will cause HasObservers to become true). There is therefore a window during which collectChanges is erroneously determined to be false, resulting in lost changesets.
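A sketch of this check-then-act race, again as a hypothetical single-threaded Python model: `add` decides `collect_changes` from `has_observers` before applying the write, and the optional `between_check_and_write` hook (an invented name) replays a concurrent Connect at exactly that point. `_readerWriter`'s internal lock is left out because the interleaving is replayed by hand rather than raced.

```python
import threading


class Subject:
    """Minimal stand-in for an Rx Subject, including HasObservers."""

    def __init__(self):
        self._observers = []

    @property
    def has_observers(self):
        return bool(self._observers)

    def subscribe(self, observer):
        self._observers.append(observer)

    def on_next(self, value):
        for observer in list(self._observers):
            observer(value)


class CheckThenActCache:
    """Hypothetical model of UpdateFromSource deciding collectChanges up front."""

    def __init__(self):
        self._locker = threading.Lock()
        self._items = {}
        self._changes = Subject()

    def add(self, key, value, between_check_and_write=None):
        collect_changes = self._changes.has_observers  # the "check"
        if between_check_and_write is not None:
            between_check_and_write()  # replay a concurrent Connect here
        self._items[key] = value  # the "act", standing in for _readerWriter.Write
        if collect_changes:
            with self._locker:
                self._changes.on_next(("add", key))

    def connect(self, observer):
        # Atomic snapshot + subscribe, as in the Bug 1 fix.
        with self._locker:
            for key in self._items:
                observer(("add", key))
            self._changes.subscribe(observer)


cache = CheckThenActCache()
received = []
cache.add("a", 1, between_check_and_write=lambda: cache.connect(received.append))

print(received)      # []: "a" is in neither the snapshot nor any update
print(cache._items)  # {'a': 1}
```

Even with Bug 1 fixed, the decision not to collect was taken before the subscriber existed, so the update is still lost.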

A simple solution with minimal downside (i.e. no new locking) would be to introduce a volatile flag field:

    private volatile bool _connectInProgress;

and to take it into account when determining collectChanges:

        var collectChanges = _changes.HasObservers || _connectInProgress;
        changes = _readerWriter.Write(updateAction, previewHandler, collectChanges);

Then it just needs to be set and unset within the Connect callback. Thus:

    public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool> predicate = null)
    {
        return Observable.Create<IChangeSet<TObject, TKey>>(o =>
        {
            lock (_locker)
            {
                _connectInProgress = true;
                try
                {
                    var initial = GetInitialUpdates(predicate);
                    var updates = predicate == null ? _changes : _changes.Filter(predicate);

                    return Observable.Return(initial, ImmediateScheduler.Instance) // immediate ensures that `updates` is subscribed before we exit lock scope
                        .Concat(updates)
                        .NotEmpty()
                        .Subscribe(o);
                }
                finally
                {
                    _connectInProgress = false;
                }
            }
        });
    }

Steps To Reproduce

Missing updates can be observed by subscribing to a cache that is undergoing frequent concurrent updates from another thread.

I think it likely that #319 and #333 are manifestations of this problem.

Expected behavior

The changeset sequence emitted by the observable returned by Connect should form a contiguous sequence, with no "gaps".
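One way to make "no gaps" testable, sketched in Python with a simplified `(reason, key, value)` change format that is an assumption rather than DynamicData's own change type: replay every received changeset (the initial one first) into an empty dictionary, reject any change that is inconsistent with what has been seen so far, and compare the result with the cache's final contents.

```python
def replay_changesets(changesets):
    """Apply simplified (reason, key, value) changes to an empty dict, raising
    ValueError on any change inconsistent with what has been seen so far."""
    state = {}
    for changeset in changesets:
        for reason, key, value in changeset:
            if reason == "add":
                if key in state:
                    raise ValueError(f"duplicate add for {key!r}")
                state[key] = value
            elif reason in ("update", "remove"):
                if key not in state:
                    raise ValueError(f"{reason} for unseen key {key!r}")
                if reason == "update":
                    state[key] = value
                else:
                    del state[key]
            else:
                raise ValueError(f"unknown reason {reason!r}")
    return state


def is_contiguous(changesets, final_items):
    """True if replaying the changesets reproduces the cache's final contents."""
    try:
        return replay_changesets(changesets) == final_items
    except ValueError:
        return False


final = {"a": 1, "b": 2}
print(is_contiguous([[("add", "a", 1)], [("add", "b", 2)]], final))  # True
print(is_contiguous([[("add", "a", 1)]], final))  # False: a gap
print(is_contiguous([[("add", "a", 1), ("add", "b", 2)], [("add", "b", 2)]], final))  # False: duplicate
```

A missed update shows up as a mismatch with the final contents; a change delivered twice shows up as a duplicate add.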

Environment

  • Version: 6.14.14

pmg23 added the bug label Jun 24, 2020
@glennawatson
Member

I like the amount of research that went into submitting this bug; it definitely helps.

We will get back to you in a couple of days.

@pmg23
Contributor Author

pmg23 commented Jun 25, 2020

Thanks Glenn.

On reflection, my proposed solution to "Bug 2" is still racy. (A writer checks _connectInProgress and determines that change collection is not required... then a reader starts a subscription, collects the initial changeset and subscribes for updates... then the writer completes the update but does not publish a changeset.) I believe I was confusing _writeLock with _readerWriter's internal lock.

A revised solution is to pass a delegate () => _changes.HasObservers || _connectInProgress into _readerWriter.Write instead and have it evaluate this while holding its lock.
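The difference between the two proposals can be replayed in the same hypothetical Python style. Here Connect is split into `connect_begin` (set the flag, take the snapshot) and `connect_end` (subscribe, clear the flag) so that a writer's update can be replayed between them; in the real code a publish cannot land there because it needs `_locker`, but `_readerWriter.Write` can. The model assumes the snapshot reads through `_readerWriter`'s lock. It only covers the lost-update case; the duplicate delivery raised further down the thread is a separate problem.

```python
import threading


class Subject:
    """Minimal stand-in for an Rx Subject, including HasObservers."""

    def __init__(self):
        self._observers = []

    @property
    def has_observers(self):
        return bool(self._observers)

    def subscribe(self, observer):
        self._observers.append(observer)

    def on_next(self, value):
        for observer in list(self._observers):
            observer(value)


class Cache:
    """Hypothetical model contrasting the two Bug 2 proposals."""

    def __init__(self, evaluate_under_lock):
        self._evaluate_under_lock = evaluate_under_lock
        self._rw_lock = threading.Lock()  # stands in for _readerWriter's lock
        self._items = {}
        self._changes = Subject()
        self._connect_in_progress = False

    def _should_collect(self):
        return self._changes.has_observers or self._connect_in_progress

    def decide(self):
        # First proposal: the writer reads the flag before calling Write.
        # Revised proposal: defer the decision (None) until inside Write.
        return None if self._evaluate_under_lock else self._should_collect()

    def write(self, key, value, collect):
        with self._rw_lock:
            if collect is None:
                collect = self._should_collect()  # "delegate" evaluated under the lock
            self._items[key] = value
        return [("add", key)] if collect else []

    def publish(self, changes):
        # In the real code this happens under _locker, omitted in this replay.
        for change in changes:
            self._changes.on_next(change)

    def connect_begin(self, observer):
        self._connect_in_progress = True
        with self._rw_lock:
            for key in self._items:
                observer(("add", key))

    def connect_end(self, observer):
        self._changes.subscribe(observer)
        self._connect_in_progress = False


def replay(evaluate_under_lock):
    cache = Cache(evaluate_under_lock)
    received = []
    collect = cache.decide()                # writer: decide (first proposal only)
    cache.connect_begin(received.append)    # reader: flag on, snapshot taken
    changes = cache.write("a", 1, collect)  # writer: apply the update
    cache.connect_end(received.append)      # reader: subscribe, flag off
    cache.publish(changes)                  # writer: publish
    return received


print(replay(evaluate_under_lock=False))  # []: the update is lost
print(replay(evaluate_under_lock=True))   # [('add', 'a')]
```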

@RolandPheasant
Collaborator

RolandPheasant commented Jun 25, 2020

What you say makes perfect sense and would resolve all manner of ills that have crept into the lib. Several years ago I seriously stress-tested the lib and found it to be perfectly thread-safe, but over the years there have been tweaks which on the face of it are sound, yet somewhere along the line things changed. There have been quite a few reports of threading issues which I am sure your suggestion will fix.

Additionally, while we are on the subject of the Connect method, there is issue #359, where Observable.Return holds on to the initial changeset. Could that be fixed when the above is addressed?

Would you be willing to do a PR, covering SourceList as well?

@pmg23
Contributor Author

pmg23 commented Jun 26, 2020

Hi Roland. Thanks for creating a very useful library.

I hope to have a PR ready shortly. (Disclaimer - sorry - this is not a commitment.)

A few observations:

  1. There are a couple of bugs in CountChanged:
    https://github.com/reactiveui/DynamicData/blob/1e61faa7e1db28e4ce1004f99c7d26cc59050a34/src/DynamicData/Cache/ObservableCache.cs#L165

First, the observable returned yields the initial count current at the time CountChanged is called, followed by a stream of updates starting from when the observable is subscribed. This is non-contiguous (note in particular that the caller is not obliged to subscribe immediately) and can result in the observer seeing an incorrect value indefinitely, i.e. until the next update following subscription.

Second, write methods in ObservableCache check whether the lazy count subject has been created ("check") and, if so, call it ("act"); they do this while holding _locker, but CountChanged does not acquire _locker at any point, opening the possibility of a subscription to the subject occurring between the "check" and the "act".
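The first CountChanged problem can be sketched with another hypothetical Python model (not DynamicData code). `count_changed_eager` reads the count when it is called, as described above; `count_changed_lazy` is an invented alternative that reads the count and subscribes together at subscription time, under the lock the writers hold while publishing.

```python
import threading


class Subject:
    """Minimal stand-in for an Rx Subject: forwards values to current observers."""

    def __init__(self):
        self._observers = []

    def subscribe(self, observer):
        self._observers.append(observer)

    def on_next(self, value):
        for observer in list(self._observers):
            observer(value)


class CountCache:
    """Hypothetical model of CountChanged."""

    def __init__(self):
        self._locker = threading.Lock()
        self._items = {}
        self._count_changes = Subject()

    def add(self, key, value):
        with self._locker:
            self._items[key] = value
            self._count_changes.on_next(len(self._items))

    def count_changed_eager(self):
        # As reported: the initial count is read when CountChanged is called,
        # but the subscription only happens later.
        initial = len(self._items)

        def subscribe(observer):
            observer(initial)
            self._count_changes.subscribe(observer)

        return subscribe

    def count_changed_lazy(self):
        # Alternative: read the count and subscribe together, at subscription
        # time, under the same lock the writers use.
        def subscribe(observer):
            with self._locker:
                observer(len(self._items))
                self._count_changes.subscribe(observer)

        return subscribe


cache = CountCache()
eager = cache.count_changed_eager()
lazy = cache.count_changed_lazy()
cache.add("a", 1)  # lands after CountChanged is called, before subscription

eager_seen, lazy_seen = [], []
eager(eager_seen.append)
lazy(lazy_seen.append)
print(eager_seen)  # [0]: stale until the next update
print(lazy_seen)   # [1]
```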

  2. The current implementation suffers from a further problem (not fixed by my prior suggestions): a given change can be delivered to a Connect subscriber twice, both in the initial changeset and in a subsequent update. The observer will see this as the same key being added twice, violating contiguity.

This occurs as follows: assume that change collection is in force; writer thread calls _readerWriter.Write and gets a change... observer thread acquires _locker, subscribes and fetches initial changeset, releases lock... writer acquires _locker and publishes the change.
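The double-delivery interleaving described above, replayed in a hypothetical Python model: `write` stands in for `_readerWriter.Write` (collecting the change because an earlier subscriber keeps collection in force) and `publish` for the later publication under `_locker`. Connect is already the atomic snapshot-plus-subscribe version.

```python
import threading


class Subject:
    """Minimal stand-in for an Rx Subject, including HasObservers."""

    def __init__(self):
        self._observers = []

    @property
    def has_observers(self):
        return bool(self._observers)

    def subscribe(self, observer):
        self._observers.append(observer)

    def on_next(self, value):
        for observer in list(self._observers):
            observer(value)


class TwoLockCache:
    """Hypothetical model: the write runs under one lock, publication under another."""

    def __init__(self):
        self._locker = threading.Lock()   # guards publishing and Connect
        self._rw_lock = threading.Lock()  # stands in for _readerWriter's lock
        self._items = {}
        self._changes = Subject()

    def write(self, key, value):
        with self._rw_lock:
            self._items[key] = value
            return [("add", key)] if self._changes.has_observers else []

    def publish(self, changes):
        with self._locker:
            for change in changes:
                self._changes.on_next(change)

    def connect(self, observer):
        # Atomic snapshot + subscribe with respect to _locker.
        with self._locker:
            with self._rw_lock:
                snapshot = [("add", key) for key in self._items]
            for change in snapshot:
                observer(change)
            self._changes.subscribe(observer)


cache = TwoLockCache()
early = []
cache.connect(early.append)  # an existing subscriber keeps collection in force

pending = cache.write("b", 2)  # writer: update applied, change collected
late = []
cache.connect(late.append)     # reader: snapshot already contains "b"
cache.publish(pending)         # writer: now publishes "b" to everyone

print(early)  # [('add', 'b')]
print(late)   # [('add', 'b'), ('add', 'b')]: the same key added twice
```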

  3. As you suggested, SourceList has problems too. For example, its Connect callback performs a check-then-act on _readerWriter.Items while holding _locker, but a concurrent writer updates _readerWriter directly without that lock.

I think the solution to these problems in both classes (which I will present in my PR) is actually simple: eliminate the _writeLocks and use _locker consistently.
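A minimal sketch of the single-lock discipline, as a hypothetical Python model rather than the eventual PR: each write and its publication happen under one lock, and so do the snapshot and subscription in `connect`. Unlike the replays above, this one uses a real writer thread; whatever the timing, every key should end up either in the snapshot or in a later update, never both and never neither.

```python
import threading


class SingleLockCache:
    """Hypothetical model of the single-lock proposal."""

    def __init__(self):
        self._locker = threading.Lock()
        self._items = {}
        self._observers = []

    def add(self, key, value):
        # Write and publish under one lock.
        with self._locker:
            self._items[key] = value
            for observer in list(self._observers):
                observer(("add", key))

    def connect(self, observer):
        # Snapshot and subscribe under the same lock.
        with self._locker:
            for key in self._items:
                observer(("add", key))
            self._observers.append(observer)


N = 10_000
cache = SingleLockCache()


def writer_loop():
    for i in range(N):
        cache.add(i, i)


received = []
writer = threading.Thread(target=writer_loop)
writer.start()
cache.connect(received.append)  # connect while the writer is (probably) running
writer.join()

keys = [key for _, key in received]
print(sorted(keys) == list(range(N)))  # True: no gaps and no duplicates
```

The cost of this design is that observers are invoked while the lock is held, so a slow subscriber delays writers; the model accepts that in exchange for a contiguous sequence.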

Noted also your point about #359. I intend that my change will fix that too.

@github-actions

This issue has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

github-actions bot locked as resolved and limited conversation to collaborators Nov 23, 2021