Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert RxJava1 to RxJava2 #4992

Merged
merged 13 commits into from Aug 13, 2017
Merged

Conversation

cmelchior
Copy link
Contributor

@cmelchior cmelchior commented Jul 19, 2017

Fixes #4095
Part of #4991

Convert the existing API's to RxJava 2 types:

  • asObservable() -> asFlowable() ... Since Realm is a hot stream, dropping notifications and only using the latest is fine. The naming in the public API reflects this.

  • RealmQuery is exposed as Single(). Note this is still not used in our public API, but the primary use case would move the query to a background scheduler, run the query, and convert all types using copyFromRealm(). Single()` fits that nicely. Also, I don't see any use cases where you would be interested in listening to changes to the query construction.

Conflicts:
	CHANGELOG.md
	realm/realm-library/build.gradle
	realm/realm-library/src/main/java/io/realm/DynamicRealm.java
@cmelchior cmelchior self-assigned this Jul 19, 2017
@Zhuinden Zhuinden mentioned this pull request Jul 19, 2017
6 tasks
subscriber.add(Subscriptions.create(new Action0() {

// Cleanup when stream is disposed
emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
Copy link
Contributor

@Zhuinden Zhuinden Jul 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well at least if I know right then RxJava2 solves the issue that addChangeListener actually had to be called after subscriber.add()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely sure what you are referring to?

Copy link
Contributor

@Zhuinden Zhuinden Jul 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#4095 figured it may as well be fixed if this stuff is being modified 😄

@Override
protected StrongReferenceCounter<RealmModel> initialValue() {
return new StrongReferenceCounter<RealmModel>();
}
};

private static final BackpressureStrategy BACK_PRESSURE_STRATEGY = BackpressureStrategy.LATEST;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the only sensible back pressure strategy that Realm can support?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the one that makes the most sense. The UI generally doesn't care what changed, just that something changed, so yes.

Do you have any concerns about this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@curiousily it is the only backpressure strategy that makes sense for RealmResults. It will always show its latest value, why enqueue events of the same thing?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification! Eagerly waiting to try out Realm with RxJava2. Will write back if some use case comes to mind 👍

subscriber.add(Subscriptions.create(new Action0() {

// Cleanup when stream is disposed
emitter.setDisposable(Disposables.fromRunnable(new Runnable() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will automatically closing the local realm instance when disposing cause problems for longer running subscriptions on the same thread? Say you have a longer running Flowable and a shorter running Flowable using the same Realm instance and both are subscribed to. The shorter running Flowable is disposed which closes the realm instance. Would the longer running flowable no longer get updates at that point?

Copy link
Contributor

@Zhuinden Zhuinden Jul 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They won't use the same instance because each Realm.getInstance() call is reference counted and returns a "new local instance".

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed that before in RealmCache. Thanks!

@Zhuinden
Copy link
Contributor

Zhuinden commented Jul 20, 2017

The only thing that nags me a bit inside is that there is a rxjava3-preview which will eventually supercede RxJava2, and Flowable adheres to the reactive-streams specification that would mean you could just use org.reactivestreams.Publisher instead of RxJava2 specifically, which would be more future-proof.

But then RealmObservableFactory would need to have its own provided rxjava2 which would mean it'd move out to its own library just like realm-android-adapters did.

@cmelchior
Copy link
Contributor Author

Publisher only has the subscribe method which is really limited :( Also we thought the same thing when we used rx.Observable as the interface for the original RxObsevableFactory, and then it changed.

Personally, I would rather have really great integration with the "most common" Rx library, and then extensions for any other (RxJava1, RxJava3, ). The RxRealm extension approach should work for anything and with Kotlin you even add them as extension methods for the fluent API.

@Zhuinden
Copy link
Contributor

Zhuinden commented Jul 20, 2017

I never realized the default Publisher and Subscriber specification is so stupid that you get the Subscription not as a return value (like with subscribeWith()) but when you override onSubscribe() in the Subscriber? Yeah, that won't do. Let us be thankful that Flowable hides this. Realm adhering reactive-streams spec would be more trouble than worth. Nevermind me then 😄

Conflicts:
	realm/realm-library/src/main/java/io/realm/Realm.java
@cmelchior cmelchior force-pushed the cm/convert-rxjava1-to-rxjava2 branch from 5b367d4 to f119d6d Compare August 2, 2017 09:44
@cmelchior
Copy link
Contributor Author

Ready for review @realm/java

@bmunkholm bmunkholm removed the S:Review label Aug 8, 2017
Copy link
Contributor

@bmeike bmeike left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. So, we are supporting backpressure? I think it would be great if you would give a lightning talk, to the team about how that is going to work...

@Zhuinden
Copy link
Contributor

@bmeike Flowable.LATEST to the best of my knowledge means that if there was an event emission, then only the latest is kept and sent to the subscribers.

Which makes sense because RealmResults when a notification is sent, only the latest is needed (every other event would also be the latest).

Copy link
Contributor

@zaki50 zaki50 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 with minor comment

Jenkinsfile Outdated
@@ -51,7 +51,7 @@ try {
}
} finally {
storeJunitResults 'realm/realm-annotations-processor/build/test-results/test/TEST-*.xml'
storeJunitResults 'examples/unitTestExample/build/test-results/**/TEST-*.xml'
// storeJunitResults 'examples/unitTestExample/build/test-results/**/TEST-*.xml'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs FIXME comment?

@@ -41,7 +41,7 @@
* @param realm {@link Realm} to listen to changes for.
* @return Rx observable that emit all updates to the Realm.
*/
Observable<Realm> from(Realm realm);
Flowable<Realm> from(Realm realm);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we change the Java doc for all these methods to Flowable instead of Observable ?

Creates a {@code Flowable} for a ...
@see http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html

subscriber.add(Subscriptions.create(new Action0() {

// Cleanup when stream is disposed
emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use setCancellation ?

emitter.setCancellation(()-> {
  observableRealm.removeChangeListener(listener);
  observableRealm.close();
});

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they kinda do the same thing and setDisposable works just fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reading ReactiveX/RxJava#4812 setDisposable seemed like the right choice, but if anyone has any reason for why setCancellable is better, I'll be happy to change it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure it's dispose() for Realm's Observables.

}
});

subscription.unsubscribe();
subscription.dispose();
assertTrue(dynamicRealm.isClosed());
}

@Test
@UiThreadTest
public void realmResults_closeInDoOnUnsubscribe() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update test name to realmResults_closeInDoOnCancel same for all methods using *DoOnUnsubscribe

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that doOnUnsubscribe is more accurate, perhaps doOnDispose? From what I can tell unsubscribe is the general word used for telling a stream that you no longer want it around, you can do that by either calling dispose, not cancel. Also, subscribing returns a Disposable, not a Cancellable

@Override
public void call(RealmResults<AllTypes> allTypes) {
public void accept(RealmResults<AllTypes> ignored) throws Exception {
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of doing nothing we can assert that the realm is still open or checking the refcount == 1 ?

@cmelchior cmelchior merged commit f2f6e62 into feature/rxjava2 Aug 13, 2017
dalinaum added a commit that referenced this pull request Sep 12, 2017
* Add changelog

* Move Pair into public API (#5081)

* Convert RxJava1 to RxJava2 (#4992)

* Apply rxjava2 to unitTestExample

* Update Jenkinsfile to store junit result of unitTestExample.

* Convert newsreaderExample to RxJava2.

* RxJava2: Add support for changeset observables (#5089)

* Update rxJavaExample and improve some formatting.

* Fix unit testing.

* Fix build.gradle.

* Update build.gradle to fix lint errors.

* Improve some formatting.

* Improve formatting.

* Update style of RxJavaExample.

* Add missing colors.xml file.

* Update styles and colors.

* Rename some disposables.

* PR feedback.

* flatMap -> switchMap and improve formatting.

* Improve formatting.

* Converting newsreaserExample to lambda.

* Improve formatting.

* Improve formatting.

* Apply RxJava2 to unit testing.

* Improve formatting.

* Apply Lambda to rxJavaExample.

* Improve formatting.

* PR feedback

* Re-enable RxJava2 examples

* CompositeDisposable.dispose() -> CompositeDisposable.clear()

* PR feedback.
@cmelchior cmelchior deleted the cm/convert-rxjava1-to-rxjava2 branch November 20, 2017 12:10
@github-actions github-actions bot locked as resolved and limited conversation to collaborators Mar 15, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

8 participants