Query subscriptions emit data in an incorrect order #793

MrSlateZB opened this issue Dec 5, 2019 · 5 comments

@MrSlateZB MrSlateZB commented Dec 5, 2019

When using a Query subscription, I think there is an implicit assumption that the last call to onData will contain the most recent data in the database. Or in other words, if I put 'A', 'B', 'C' into the database, I expect my onData callbacks to receive 'A', 'B', 'C' in that order. Or, at the very least, I expect 'C' to be the last emission I get. Unfortunately, that is not currently always the case.

It seems like this ordering was meant to be enforced at the BoxStore level. In the ObjectClassPublisher a concerted effort is made to ensure that entity id changes are processed in a synchronized fashion by locking processing to one thread at a time. In other words, it ensures that changes are emitted to data observers in the order that they were published.

The problem appears within the QueryPublisher because it takes those changes and publishes them to an unbounded thread pool. Once there, it executes the query to get the data, and emits them to all child observers. This is problematic, because there is nothing preventing multiple threads in the pool from querying and emitting data to the child observer. An example best illustrates the problem.

Let's say we have a Box with a list of car brands. For simplicity, imagine only one thread runs at a time.

  1. Some time in the past, data observer is subscribed to query to get all car brands
  2. InsertThread - Put Toyota into car brand box
  3. ThreadPool-1 - Receives change, queries box, and gets { Toyota } ... Context switch occurs
  4. InsertThread - Put Lexus into car brand box
  5. ThreadPool-2 - Receives changes, queries box, and gets { Toyota, Lexus }. Emits to data observer
  6. ThreadPool-1 - Emits list { Toyota } to data observer

Now, the query data observer incorrectly believes that the box contains only Toyota instead of Toyota and Lexus.
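
The interleaving above can be replayed deterministically in plain Java. This is only a sketch under stated assumptions: it does not use ObjectBox at all, the "box" is an ordinary list, and the class name `OutOfOrderDemo` and the latches that force the context switch are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OutOfOrderDemo {

    /** Replays steps 2-6 and returns the emissions in the order the observer saw them. */
    public static List<List<String>> replay() {
        // Stand-in for the car brand box (hypothetical, not an ObjectBox type).
        List<String> box = Collections.synchronizedList(new ArrayList<>());
        List<List<String>> emissions = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newCachedThreadPool(); // unbounded pool
        CountDownLatch firstQueried = new CountDownLatch(1);
        CountDownLatch secondEmitted = new CountDownLatch(1);
        try {
            box.add("Toyota");                                // step 2
            pool.execute(() -> {                              // ThreadPool-1
                List<String> stale = new ArrayList<>(box);    // step 3: query
                firstQueried.countDown();
                awaitQuietly(secondEmitted);                  // forced "context switch"
                emissions.add(stale);                         // step 6: late emission
            });
            awaitQuietly(firstQueried);
            box.add("Lexus");                                 // step 4
            pool.execute(() -> {                              // ThreadPool-2
                emissions.add(new ArrayList<>(box));          // step 5: query and emit
                secondEmitted.countDown();
            });
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(emissions);
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints [[Toyota, Lexus], [Toyota]]
    }
}
```

The last emission is the stale `[Toyota]` list, even though the box already holds both brands.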

The way we are currently working around this problem is only using the entity class observers. Then in the observer we have a synchronized block that will run find() on our query and emit the data. You might think the synchronized block wouldn't be required, but the initial publishSingle for entity class observers has the possibility of running concurrently with the entity id based change processing.
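
A rough sketch of that workaround, kept independent of ObjectBox so it runs on its own: `SynchronizedRequery` is a hypothetical helper, the `find` supplier stands in for running `find()` on the query, and `onClassChanged()` stands in for the body of the entity class observer. Because the query and the emission happen under one lock, a later emission can never carry older data than an earlier one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class SynchronizedRequery<T> {
    private final Supplier<List<T>> find;        // stand-in for query.find()
    private final Consumer<List<T>> downstream;  // receives the query results
    private final Object lock = new Object();

    public SynchronizedRequery(Supplier<List<T>> find, Consumer<List<T>> downstream) {
        this.find = find;
        this.downstream = downstream;
    }

    /** Call this from the entity class observer whenever the class changes. */
    public void onClassChanged() {
        synchronized (lock) {
            // Query and emit atomically so no other thread can interleave.
            downstream.accept(find.get());
        }
    }

    /** Demo: several writer threads each add one item and trigger a re-query. */
    public static List<Integer> demo(int writers) {
        List<String> box = new CopyOnWriteArrayList<>();  // stand-in for the box
        List<Integer> sizes = new ArrayList<>();          // only touched under the lock
        SynchronizedRequery<String> requery = new SynchronizedRequery<>(
                () -> new ArrayList<>(box), result -> sizes.add(result.size()));
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < writers; i++) {
            String brand = "brand-" + i;
            Thread t = new Thread(() -> {
                box.add(brand);
                requery.onClassChanged();
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return sizes; // sizes never decrease; the last one equals the writer count
    }
}
```

The cost of this design is that slow queries or slow observers block every other change notification while the lock is held.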

@greenrobot-team greenrobot-team commented Dec 9, 2019

Thanks for the detailed report!

I guess the order can be guaranteed by queuing find() calls after publish() or publishSingle() is called instead of immediately posting them to the thread pool.

However, a transformer also runs on the thread pool without guaranteeing order, which would break the above fix. (Providing a thread-pool scheduler might break order as well, but here it is an explicit decision by the user.)

@MrSlateZB MrSlateZB commented Dec 9, 2019

If you are talking about synchronously posting to a queue, and then processing that queue on a thread-pool thread (similar to how it's done in the ObjectClassPublisher), then I think that would work fine for all cases except the transformer case you mentioned.

Do you think for the ObjectClassPublisher we should modify the publishSingle() to also use the changesQueue for consistency's sake? This is important because the SubscriptionBuilder uses publishSingle to bootstrap the subscription and so it operates outside the changesQueue synchronization being done.

As for transformations there are a few solutions I could think of:

  1. Don't run transformations on a new thread. The current documentation is a bit confusing on this topic. For reference:

The transformation is done in an asynchronous thread. The observer will be called in the same asynchronous thread unless a Scheduler is defined using on(Scheduler). This is roughly equivalent to the map operator as known in Rx and Kotlin.

When I first read this I thought "an asynchronous thread from what?". The DB Change? Currently it means from the change emission thread. Part of the confusion I think stems from

The observer will be called in the same asynchronous thread...

My thinking is to just run the transformation on the already asynchronous thread the change notification is on. In other words, don't use a scheduler unless one is given to you. This would make the comparison to map in RxJava even more apt (since Rx does not apply a scheduler at all).

The downside to this is that the change thread is then held up by however long it takes to do the transform. I'm not sure this is much of a disadvantage, though, because the only situations where this wouldn't apply to all changes anyway (since all query results are transformed) are those where users share the same query object to create different subscriptions.

  2. Change the thread-pool for transformations to be single-threaded. While this gives you the asynchronous transformations, I'm not sure it buys the end user much.

  3. Do nothing and make it clear that these transformations can cause query results to appear out of order.

@greenrobot-team greenrobot-team added the bug label Dec 10, 2019
@greenrobot-team greenrobot-team self-assigned this Dec 10, 2019
@greenrobot-team greenrobot-team added this to the 3.0 milestone Mar 9, 2020

@greenrobot-team greenrobot-team commented Mar 9, 2020

3.0.0-alpha1 has changes to ensure results are delivered in order.

All requests to publishers are now queued and processed on a single thread per publisher. This thread is still scheduled on the internal thread pool. Transformers will run on that same publisher thread. So results will be delivered from the same publisher thread, regardless of whether transformers are used.
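
One way to picture this is a publisher that counts pending requests and lets at most one pool task drain them at a time. The sketch below is not the ObjectBox implementation: `QueuedPublisher`, its fields, and the `demo` method are hypothetical, and it only illustrates the general "queue, then drain on one pool thread at a time" pattern with the transformer applied on the draining thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class QueuedPublisher<T, R> {
    private final Executor pool;
    private final Supplier<T> query;
    private final Function<T, R> transformer;
    private final Consumer<R> observer;
    private final Object lock = new Object();
    private int pending;       // queued publish requests, guarded by lock
    private boolean draining;  // true while one pool task owns the queue, guarded by lock

    public QueuedPublisher(Executor pool, Supplier<T> query,
                           Function<T, R> transformer, Consumer<R> observer) {
        this.pool = pool;
        this.query = query;
        this.transformer = transformer;
        this.observer = observer;
    }

    /** Queues a request; starts a drain task only if none is running. */
    public void publish() {
        synchronized (lock) {
            pending++;
            if (draining) return;
            draining = true;
        }
        pool.execute(this::drain);
    }

    /** Runs on one pool thread at a time: query, transform, deliver, in order. */
    private void drain() {
        while (true) {
            synchronized (lock) {
                if (pending == 0) {
                    draining = false;
                    return;
                }
                pending--;
            }
            observer.accept(transformer.apply(query.get()));
        }
    }

    /** Demo: concurrent writers bump a counter and publish; returns what the observer saw. */
    public static List<Integer> demo(int writers) {
        AtomicInteger box = new AtomicInteger();  // stand-in for the database state
        List<Integer> seen = new ArrayList<>();   // written by one drain task at a time
        CountDownLatch done = new CountDownLatch(writers);
        ExecutorService pool = Executors.newCachedThreadPool();
        QueuedPublisher<Integer, Integer> publisher = new QueuedPublisher<>(
                pool, box::get, n -> n * 10, r -> { seen.add(r); done.countDown(); });
        for (int i = 0; i < writers; i++) {
            pool.execute(() -> { box.incrementAndGet(); publisher.publish(); });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return new ArrayList<>(seen);
    }
}
```

In the demo the delivered values never decrease and the final one reflects every write, no matter how the writers interleave.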

If the frequency of publish requests is high (e.g. an entity is updated frequently) and is higher than the transform frequency, this may lead to publish requests filling up the queue. To mitigate that QueryPublisher will notify all currently waiting observers with the latest query results, instead of repeating the query for each. This should provide a good balance between performance and memory consumption (e.g. vs. running transforms in parallel and storing results until it's time to deliver them).
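
The mitigation described here could look roughly like the following. This is a guess at the idea rather than the actual QueryPublisher code: `CoalescingPublisher`, `request`, and `drainOnce` are hypothetical names, and the demo is single-threaded to keep it deterministic. All observers waiting at the time of a drain share one query result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class CoalescingPublisher<T> {
    private final Supplier<T> query;
    private final List<Consumer<T>> waiting = new ArrayList<>(); // guarded by this

    public CoalescingPublisher(Supplier<T> query) {
        this.query = query;
    }

    /** Registers an observer that is waiting for the next result. */
    public synchronized void request(Consumer<T> observer) {
        waiting.add(observer);
    }

    /**
     * Intended to run on the publisher thread. Takes every observer waiting
     * right now, runs the query once, and hands the same result to all of them.
     * Returns how many observers were served.
     */
    public int drainOnce() {
        List<Consumer<T>> batch;
        synchronized (this) {
            if (waiting.isEmpty()) return 0;
            batch = new ArrayList<>(waiting);
            waiting.clear();
        }
        T result = query.get(); // one query for the whole batch
        for (Consumer<T> observer : batch) {
            observer.accept(result);
        }
        return batch.size();
    }

    /** Demo: three waiting observers are served by a single query. */
    public static List<String> demo() {
        AtomicInteger queries = new AtomicInteger();
        CoalescingPublisher<String> publisher =
                new CoalescingPublisher<>(() -> "result-" + queries.incrementAndGet());
        List<String> received = new ArrayList<>();
        publisher.request(received::add);
        publisher.request(received::add);
        publisher.request(received::add);
        publisher.drainOnce();
        return received; // [result-1, result-1, result-1]
    }
}
```

The queue can then only grow with the number of waiting observers, not with the number of queries that would otherwise have to run.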

I uploaded the Java sources for 3.0.0-alpha1 so these changes can be inspected.

We welcome your feedback.

@MrSlateZB MrSlateZB commented Mar 9, 2020

This looks good to me. The only point that I'd make is that this may be a good time to also address #695.

As a TL;DR the behavior seen in that ticket is due to observers being obtained at the time of notification instead of the time the data changes (AKA when publish is called).

@greenrobot greenrobot commented Jun 23, 2021

FYI: This is also part of the 2.9.2-RC2 release.

greenrobot-team added a commit that referenced this issue Jul 5, 2021
Enforce order of result delivery for publishers

See merge request objectbox/objectbox-java!49
@greenrobot-team greenrobot-team removed this from the 2.9.2 milestone Oct 19, 2021
@greenrobot-team greenrobot-team added this to the 3.0 milestone Oct 19, 2021