-
Notifications
You must be signed in to change notification settings - Fork 51
Fusing subscribeOn with upstream #25
Comments
Generally,
Generally, they can't.
No and won't be.
Since there are only the two of us, we chat in hangouts quite frequently. However, the fusion-related discussions happened mostly half a year ago; today, we don't discuss the theory behind it anymore and only do maintenance-related work based on feedback from Reactor-Core users.
Unless you are already a master of Reactive programming and Reactive Streams, you'd have found it confusing and under-explained. I believe the best distilled version of what we accomplished is in my two blog post about it: part 1, part 2. |
Ah the part I was missing was the requirement to negotiate synchronously in onSubscribe. Makes sense that subscribeOn cannot as it is switching threads. I've read the blog posts you linked (as well as much of the posts in your blog - thank you for creating such a useful resource BTW). I'm certainly no master of Reactive streams although I'm working on it :P (my current work in this area is playing around with zero-heap allocation - i.e. only stack allocated - reactive streams in Rust). In any case, I'll be following closely with interest both your work on RxJava 2.0 as well as the maintenance work here :) |
Sorry for reopening again - didn't think it was worth opening a new issue for this. But I was looking through the history of subscribeOn and I noticed that at one point (90e6655) there was a non-eager variant of subscribeOn which delayed onSubscribe until it received it from upstream. What was the rationale for removing this variant? From what I could tell this would solve the problem you described above (requiring to negotiate on the same threead) and lend itself to fusion. I also notice that RxJava also has a TODO to move to calling onSubscribe to subscribe time. I guess that was determined based on your work here? Thanks again for being so helpful to date :) |
It prevented cancelling a subscription in time, which was an undesired behavior in general so we dropped it and stayed with the classical |
What would you like to fuse together?
I just posted a PR that fixes that. |
I'm not interested in fusing an particular operator together TBH. The most interesting operators for fusion (to me at least) seem to be the thread switching ones (observeOn and subscribeOn). Thanks for the info on cancellation. I did think that might be the reason. Good to get confirmation. |
|
Yes precisely because they are required to limit fusion is why I find them interesting - most of the other operators have quite a straightforward reasons for supporting/not supporting fusion. Concurrent processes have always interested me to see how far they can be pushed. :) |
@akarnokd has written in several places that the classic case of:
means that map and observeOn cannot be fused together. I understand this point well. However, I'm interested in a variant of this pipeline:
I might be misunderstanding the matrix, but it seems to suggest that subscribeOn cannot be fused with the map which as far as I can tell is possible? Moreover, the matrix suggests that subscribeOn and observeOn can be fused together which I am failing to understand.
Also a related question, is the async fusion in subscribeOn currently implemented? From the code it doesn't look like it (I'm looking at https://github.com/reactor/reactive-streams-commons/blob/master/src/main/java/rsc/publisher/PublisherSubscribeOn.java) but I'm not sure if it's been worked on locally.
Finally is there a place where @smaldini and @akarnokd discuss what is happening in this repo? I would be very interested in following along!
The text was updated successfully, but these errors were encountered: