Skip to content

Commit 76e5a4a

Browse files
To deliver in order run transform on same thread as ActionObserver was notified on.
1 parent 1e200ee commit 76e5a4a

File tree

5 files changed

+26
-24
lines changed

5 files changed

+26
-24
lines changed

objectbox-java/src/main/java/io/objectbox/BoxStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,7 +1064,7 @@ long internalHandle() {
10641064
* Note that failed or aborted transaction do not trigger observers.
10651065
*/
10661066
public SubscriptionBuilder<Class> subscribe() {
1067-
return new SubscriptionBuilder<>(objectClassPublisher, null, threadPool);
1067+
return new SubscriptionBuilder<>(objectClassPublisher, null);
10681068
}
10691069

10701070
@Experimental
@@ -1159,7 +1159,7 @@ public void setDbExceptionListener(@Nullable DbExceptionListener dbExceptionList
11591159
*/
11601160
@SuppressWarnings("unchecked")
11611161
public <T> SubscriptionBuilder<Class<T>> subscribe(Class<T> forClass) {
1162-
return new SubscriptionBuilder<>((DataPublisher) objectClassPublisher, forClass, threadPool);
1162+
return new SubscriptionBuilder<>((DataPublisher) objectClassPublisher, forClass);
11631163
}
11641164

11651165
@Internal

objectbox-java/src/main/java/io/objectbox/query/Query.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ public long remove() {
611611
* it may be GCed and observers may become stale (won't receive anymore data).
612612
*/
613613
public SubscriptionBuilder<List<T>> subscribe() {
614-
return new SubscriptionBuilder<>(publisher, null, box.getStore().internalThreadPool());
614+
return new SubscriptionBuilder<>(publisher, null);
615615
}
616616

617617
/**

objectbox-java/src/main/java/io/objectbox/reactive/SubscriptionBuilder.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package io.objectbox.reactive;
1818

19-
import java.util.concurrent.ExecutorService;
20-
2119
import javax.annotation.Nullable;
2220

2321
import io.objectbox.annotation.apihint.Internal;
@@ -44,7 +42,6 @@
4442
public class SubscriptionBuilder<T> {
4543
private final DataPublisher<T> publisher;
4644
private final Object publisherParam;
47-
private final ExecutorService threadPool;
4845
private DataObserver<T> observer;
4946
// private Runnable firstRunnable;
5047
private boolean weak;
@@ -58,10 +55,9 @@ public class SubscriptionBuilder<T> {
5855

5956

6057
@Internal
61-
public SubscriptionBuilder(DataPublisher<T> publisher, @Nullable Object param, ExecutorService threadPool) {
58+
public SubscriptionBuilder(DataPublisher<T> publisher, @Nullable Object param) {
6259
this.publisher = publisher;
6360
publisherParam = param;
64-
this.threadPool = threadPool;
6561
}
6662

6763
// public Observable<T> runFirst(Runnable firstRunnable) {
@@ -214,19 +210,27 @@ public void onData(final T data) {
214210
}
215211
}
216212

213+
/**
214+
* Runs on the thread of the {@link #onData(Object)} caller to ensure data is delivered
215+
* in the same order as {@link #onData(Object)} was called, to prevent delivering stale data.
216+
* <p>
217+
* For both ObjectClassPublisher and QueryPublisher this is the asynchronous
218+
* thread publish requests are processed on.
219+
* <p>
220+
* This could be optimized in the future to allow parallel execution,
221+
* but this would require an ordering mechanism for the transformed data.
222+
*/
217223
private void transformAndContinue(final T data) {
218-
threadPool.submit(() -> {
219-
if (subscription.isCanceled()) {
220-
return;
221-
}
222-
try {
223-
// Type erasure FTW
224-
T result = (T) transformer.transform(data);
225-
callOnData(result);
226-
} catch (Throwable th) {
227-
callOnError(th, "Transformer failed without an ErrorObserver set");
228-
}
229-
});
224+
if (subscription.isCanceled()) {
225+
return;
226+
}
227+
try {
228+
// Type erasure FTW
229+
T result = (T) transformer.transform(data);
230+
callOnData(result);
231+
} catch (Throwable th) {
232+
callOnError(th, "Transformer failed without an ErrorObserver set");
233+
}
230234
}
231235

232236
private void callOnError(Throwable th, String msgNoErrorObserver) {

objectbox-rxjava/src/test/java/io/objectbox/query/MockQuery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public MockQuery(boolean hasOrder) {
3535
// when(box.getStore()).thenReturn(boxStore);
3636
query = mock(Query.class);
3737
fakeQueryPublisher = new FakeQueryPublisher();
38-
SubscriptionBuilder subscriptionBuilder = new SubscriptionBuilder(fakeQueryPublisher, null, null);
38+
SubscriptionBuilder subscriptionBuilder = new SubscriptionBuilder(fakeQueryPublisher, null);
3939
when(query.subscribe()).thenReturn(subscriptionBuilder);
4040
}
4141

objectbox-rxjava3/src/test/java/io/objectbox/query/MockQuery.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@ public MockQuery(boolean hasOrder) {
4040
//noinspection unchecked It's a unit test, casting is fine.
4141
query = (Query<T>) mock(Query.class);
4242
fakeQueryPublisher = new FakeQueryPublisher<>();
43-
//noinspection ConstantConditions ExecutorService only used for transforms.
44-
SubscriptionBuilder<List<T>> subscriptionBuilder = new SubscriptionBuilder<>(
45-
fakeQueryPublisher, null, null);
43+
SubscriptionBuilder<List<T>> subscriptionBuilder = new SubscriptionBuilder<>(fakeQueryPublisher, null);
4644
when(query.subscribe()).thenReturn(subscriptionBuilder);
4745
}
4846

0 commit comments

Comments
 (0)