Eliminate map call to enable channel operator fusion for Kotlin Workers. #434

Closed
zach-klippenstein opened this issue Jun 29, 2019 · 2 comments · Fixed by #589
Labels: blocked, enhancement, kotlin

Comments

@zach-klippenstein
Collaborator

Flow implements operator fusion for operators that rely on channels. E.g., if you start with an RxJava2 Flowable, and then do:

val subscriptionChannel = myFlowable
  .asFlow() // Convert the Flowable to a Flow (kotlinx-coroutines-reactive).
  .flowOn(Dispatchers.IO) // Subscribe/observe on a background thread.
  .buffer(50) // Buffer up to 50 items when there's backpressure.
  .produceIn(scope) // Subscribe to the Flowable with a ReceiveChannel.

Then only a single channel is created and shared among all the operators: it owns the subscription to the Flowable, communicates between threads, and stores the buffer. That same channel is what produceIn returns. Fusion is purely an optimization, possible because each of those declarative operators describes a different aspect of a channel's behavior. If you put a non-channel operator between any of those operators, two separate channels are created instead. The behavior of the Flow shouldn't change, but it will use a bit more memory.
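To make the difference concrete, here is a minimal sketch using only kotlinx.coroutines (no RxJava). The names `runPipelines` and `drain` and the `flowOf(1, 2, 3)` source are made up for illustration. Fusion itself isn't directly observable from the outside, so the sketch only checks that both pipelines deliver the same items; the comments note where the extra channel would come from.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.runBlocking

// Hypothetical helper: receive everything from the channel until it closes.
suspend fun <T> ReceiveChannel<T>.drain(): List<T> {
    val items = mutableListOf<T>()
    for (item in this) items += item
    return items
}

// Hypothetical demo function: runs a fusable and a non-fusable pipeline.
@OptIn(FlowPreview::class) // produceIn has been marked as preview API in some versions.
suspend fun runPipelines(): Pair<List<Int>, List<Int>> = coroutineScope {
    val source = flowOf(1, 2, 3) // Stand-in for a wrapped reactive stream.

    // flowOn, buffer and produceIn are adjacent, so they can share one channel.
    val fused = source
        .flowOn(Dispatchers.IO)
        .buffer(50)
        .produceIn(this)

    // The map sits between the channel-based operators, so flowOn needs its
    // own channel and buffer/produceIn need a second one.
    val unfused = source
        .flowOn(Dispatchers.IO)
        .map { it }
        .buffer(50)
        .produceIn(this)

    fused.drain() to unfused.drain()
}

fun main() = runBlocking<Unit> {
    val (fused, unfused) = runPipelines()
    check(fused == unfused) // Same behavior either way; only the channel count differs.
    println("fused=$fused unfused=$unfused")
}
```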

The Workflow runtime uses channels to subscribe to Workers. In practice, at least for now, many workers are likely to be direct wrappers around other reactive stream types. So while we can't force Workers to use operator fusion (nor would we want to), we should at least not prevent fusion from occurring when possible.

Currently, to keep the change to using Flow for the Worker API simple, we call map just before produceIn in order to wrap values in the Output sentinel type. This prevents operator fusion from ever occurring in the common case of, for example, myObservable.asWorker(). We should remove that map to allow fusion to occur.
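A rough sketch of the idea, not the actual runtime code: `Output` here is a hypothetical stand-in for the sentinel type, and `subscribeWithMap` / `subscribeWithoutMap` are made-up names. The second variant is just one possible way to drop the map, assuming the wrapping can happen on the receiving side instead; the real fix may look different.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the runtime's Output sentinel type.
data class Output<T>(val value: T)

// Roughly the current approach: the map sits between the worker's own
// operators and produceIn, so the worker's buffer cannot fuse with it.
@OptIn(FlowPreview::class)
fun <T> Flow<T>.subscribeWithMap(scope: CoroutineScope): ReceiveChannel<Output<T>> =
    map { Output(it) }.produceIn(scope)

// Assumed alternative: call produceIn directly so trailing channel operators
// on the worker's flow can fuse, and wrap values after receiving them.
@OptIn(FlowPreview::class)
fun <T> Flow<T>.subscribeWithoutMap(scope: CoroutineScope): ReceiveChannel<T> =
    produceIn(scope)

fun main() = runBlocking<Unit> {
    // Stand-in for a worker that ends with a channel-based operator.
    val workerFlow = flowOf("a", "b").buffer(50)

    val wrappedEarly = mutableListOf<Output<String>>()
    for (output in workerFlow.subscribeWithMap(this)) wrappedEarly += output

    val wrappedLate = mutableListOf<Output<String>>()
    for (value in workerFlow.subscribeWithoutMap(this)) wrappedLate += Output(value)

    check(wrappedEarly == wrappedLate) // The consumer sees the same outputs.
    println(wrappedLate)
}
```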

zach-klippenstein added the enhancement (New feature or request) label Jun 29, 2019
zach-klippenstein added this to the kotlin v0.18.0 milestone Jun 29, 2019
zach-klippenstein self-assigned this Jun 29, 2019
@zach-klippenstein
Collaborator Author

This is just an optimization, so I'm moving it to the v1.0 milestone.

@zach-klippenstein
Collaborator Author

zach-klippenstein commented Sep 1, 2019

This will probably be made obsolete or fixed by #558, and shouldn't be addressed until that issue is resolved.

zach-klippenstein added a commit that referenced this issue Sep 3, 2019
Closes #558, removes 1/2 of #512 TODOs, and closes #434.
zach-klippenstein added the kotlin (Affects the Kotlin library.) label Mar 3, 2020