Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CompositeObservable to merge Observable Events #38

Closed
thomasnield opened this issue Jul 22, 2016 · 11 comments
Closed

CompositeObservable to merge Observable Events #38

thomasnield opened this issue Jul 22, 2016 · 11 comments

Comments

@thomasnield
Copy link
Collaborator

thomasnield commented Jul 22, 2016

I am encountering a need heavily where I want to merge() multiple Observable event sources, but I don't have the sources available upon initialization. I need to add them later at separate places in the application.

With David Karnok's help, I came up with a utility. Here is the Kotlin implementation.

class ObservableBus<T>(cacheCount: Int = 0) {
​
    private val subject: SerializedSubject<Observable<T>, Observable<T>> = PublishSubject<Observable<T>>().toSerialized()
    private val live: MutableSet<Observable<T>> = ConcurrentHashMap.newKeySet<Observable<T>>()
​
    private val observable = subject.flatMap { obs: Observable<T> -> obs.takeWhile { live.contains(obs) } }
            .observeOnFx().let { if (cacheCount > 0) it.cacheWithInitialCapacity(cacheCount) else it }
​
    fun toObservable(): Observable<T> = observable
​
    operator fun plusAssign(observable: Observable<T>) = add(observable)
    operator fun minusAssign(observable: Observable<T>) = remove(observable)
​
    fun add(observable: Observable<T>) {
        live.add(observable)
        subject.onNext(observable)
    }
    fun remove(observable: Observable<T>) {
        live.remove(observable)
    }
}

This is also helpful for creating event-driven models. So if you have a MenuItem, Button, and key combo that all trigger a refresh request, you can merge all three Observables later.

object MyEventModel { 
    val refreshRequests = ObservableBus<ActionEvent>
}
val button = Button("Refresh")
MyEventModel.refresh += button.actionEvents()

//later
val menuItem = MenuItem("Refresh")
MyEventModel.refresh += button.actionEvents()

//and later again
val ctrlRPress = someNode.events(KeyEvent.KEY_PRESSED,)
     .filter { it.isControlDown && it.keyCode = KeyCode.R }
     .map { ActionEvent() }

MyEventModel.refresh += ctrlRPress

Subscription

MyEventModel.refresh.subscribe { performRefresh }

Of course, you can push anything and not just ActionEvent items. Does anybody else see value in this? I'll write this in Java if there is interest.

@thomasnield
Copy link
Collaborator Author

Tagging you since you're pretty objective @raniejade. What do you think?

@raniejade
Copy link

Hmmm, kinda looks like Observable#merge to me. I do find it useful, but a bit redundant.

@thomasnield
Copy link
Collaborator Author

Yes it is basically Observable#merge but it allows Observables to be added/removed at any time to a centralized instance. Additions and removals also affect all subscriptions. I'll create an example later showing why this is needed. Without it, you can get in a bit of trouble using Observable#merge with complex UI's.

@raniejade
Copy link

Follow up question, what happens when one of the source observable terminate with an error?

@thomasnield
Copy link
Collaborator Author

Good question. I think the pooled Observable will continue on without it unless you use a retry. I'll need to do some tests today regarding error behavior, bit I think that may be on the developer to handle it.

@thomasnield
Copy link
Collaborator Author

thomasnield commented Jul 22, 2016

Alright discovered a few issues. The error communication is not working like I expect. It just stops pushing items. when an error occurs downstream. Subjects are something I am not an expert in, especially when it comes to error handling.

Here is my test app.

class MyApp : App() {
    override val primaryView = TestView::class
}


object Decrementer {

    val numbers = Observable.just(5.0,4.0,3.0,2.0,1.0,0.0,-1.0,-2.0, -3.0)
    val decrementRequests = ObservableBus<ActionEvent>()

    val decrements = decrementRequests.toObservable()
            .zipWith(numbers) { ae,dbl -> dbl }
}


class TestView: View() {
    override val root = VBox()


    init {
        with(root) {
            label("start") {
                Decrementer.decrements
                        .doOnNext { if (it == 0.0) throw Exception("Zero not allowed!") }
                        .retry()
                        .subscribe { text = it.toString() }
            }
            Decrementer.decrementRequests += button("Decrementer 1").actionEvents()
            Decrementer.decrementRequests += button("Decrementer 2").actionEvents()
        }
    }
}


class ObservableBus<T>(cacheCount: Int = 0) {

    private val subject: SerializedSubject<Observable<T>, Observable<T>> = PublishSubject<Observable<T>>().toSerialized()
    private val live: MutableSet<Observable<T>> = ConcurrentHashMap.newKeySet<Observable<T>>()

    private val observable = subject.flatMap { obs: Observable<T> -> obs.takeWhile { live.contains(obs) } }
           .observeOnFx().let { if (cacheCount > 0) it.cacheWithInitialCapacity(cacheCount) else it }

    fun toObservable(): Observable<T> = observable

    operator fun plusAssign(observable: Observable<T>) = add(observable)
    operator fun minusAssign(observable: Observable<T>) = remove(observable)

    fun add(observable: Observable<T>) {
        live.add(observable)
        subject.onNext(observable)
    }
    fun remove(observable: Observable<T>) {
        live.remove(observable)
    }
}

Another issue is the subscriptions seem to have to happen before the source Observables are added. Otherwise nothing is emitted.

This works fine:

label("start") {
    Decrementer.decrements
            .doOnNext { if (it == 0.0) throw Exception("Zero not allowed!") }
            .retry()
            .subscribe { text = it.toString() }
}
Decrementer.decrementRequests += button("Decrementer 1").actionEvents()
Decrementer.decrementRequests += button("Decrementer 2").actionEvents()

This does not:

Decrementer.decrementRequests += button("Decrementer 1").actionEvents()
Decrementer.decrementRequests += button("Decrementer 2").actionEvents()
label("start") {
    Decrementer.decrements
            .doOnNext { if (it == 0.0) throw Exception("Zero not allowed!") }
            .retry()
            .subscribe { text = it.toString() }
}

@thomasnield
Copy link
Collaborator Author

thomasnield commented Jul 23, 2016

Alright, I got a JavaFX-driven solution using an ObservableList and a switchMap(). This seems to work perfectly. Error handling is predictable too, and I completely avoided using Subjects.

class ObservableBus<T>(val cacheCount: Int = 0) {

    private val sources = FXCollections.synchronizedObservableList(FXCollections.observableArrayList<Observable<T>>())

    private val observable = sources.onChangedObservable()
            .switchMap { Observable.from(it).flatMap { it } }
            .observeOnFx().let { if (cacheCount > 0) it.cacheWithInitialCapacity(cacheCount) else it }

    fun toObservable(): Observable<T> = observable

    operator fun plusAssign(observable: Observable<T>) = add(observable)
    operator fun minusAssign(observable: Observable<T>) = remove(observable)

    fun add(observable: Observable<T>) {
        sources.add(observable)
    }
    fun remove(observable: Observable<T>) {
        sources.remove(observable)
    }
}

@thomasnield
Copy link
Collaborator Author

thomasnield commented Jul 23, 2016

Here's a simpler example I was using to test retry() against an error. The moment it hits 0 and throws the error, it resubscribes and starts all over like it should.

class MyApp : App() {
    override val primaryView = TestView::class
}

object EventModel {
    val decrementRequests = ObservableBus<ActionEvent>()
}

class TestView: View() {
        override val root = VBox()

        init {
            with(root) {
                label("start") {
                    EventModel.decrementRequests.toObservable()
                            .map { 1 }.scan(10) { x,y -> x - y }
                            .doOnNext { if (it == 0) throw Exception("Zero not allowed!") }
                            .retry(2)
                            .subscribe { text = it.toString() }
                }
                Decrementer.decrementRequests += button("Decrementer 1").actionEvents()
                Decrementer.decrementRequests += button("Decrementer 2").actionEvents()
            }
        }
}

Unless anybody questions this or thinks this is too niche, I'll probably implement this in RxJavaFX and RxKotlinFX. Is there a better name than ObservableBus?

@thomasnield
Copy link
Collaborator Author

@JakeWharton Has created a better name for this utility. We shall call it CompositeObservable.

@thomasnield
Copy link
Collaborator Author

I proposed this in RxJava just to see if it gets shot down or not. ReactiveX/RxJava#4235

@thomasnield thomasnield changed the title ObservableBus to merge Observable Events CompositeObservable to merge Observable Events Jul 23, 2016
@thomasnield
Copy link
Collaborator Author

I went ahead and implemented this. If by any chance it goes into another library like RxJava or RxJava-Extras, I may deprecate later. For now I'd like it to be included.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants