feat(streaming): correctly implement LookupUnionExecutor #2169

Merged
skyzh merged 8 commits into main from skyzh/correct-union
Apr 29, 2022

Conversation

skyzh
Contributor

@skyzh skyzh commented Apr 27, 2022

Signed-off-by: Alex Chi <iskyzh@gmail.com>

What's changed and what's your intention?

This PR forwards data from the inputs one at a time, in input order. To avoid blocking the upstream channels while one input is being forwarded, data from the other inputs has to be stored in a local buffer, which makes this part hard to read. I'm not sure how to write it more cleanly. cc @wangrunji0408, who may be able to take a look and shed some light.
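
As a rough illustration of the buffering described above, here is a minimal sketch using only the standard library. It is not the code in this PR: `Msg`, `ordered_union`, and the `(input_index, message)` arrival list are hypothetical names invented for the example, and the real executor works on async streams rather than a pre-collected list.

```rust
use std::collections::VecDeque;

/// Hypothetical message type: a data item or an end-of-input marker.
#[derive(Debug, Clone, PartialEq)]
enum Msg {
    Data(u32),
    End,
}

/// Forwards data input by input: everything from input 0, then input 1, and so on.
/// A message that arrives from an input other than the current one is parked in
/// that input's local buffer, so accepting it never has to wait.
fn ordered_union(num_inputs: usize, arrivals: Vec<(usize, Msg)>) -> Vec<u32> {
    let mut buffers: Vec<VecDeque<Msg>> = vec![VecDeque::new(); num_inputs];
    let mut current = 0; // index of the input currently being forwarded
    let mut output = Vec::new();

    for (idx, msg) in arrivals {
        // Always accept the message, whichever input it came from.
        buffers[idx].push_back(msg);

        // Drain as much as possible, starting from the current input.
        while current < num_inputs {
            match buffers[current].pop_front() {
                Some(Msg::Data(v)) => output.push(v),
                Some(Msg::End) => current += 1, // move on to the next input
                None => break,                  // wait for more arrivals
            }
        }
    }
    output
}
```

In this sketch, data from input 1 that arrives early stays buffered until input 0 has sent its `End` marker, so the output order depends only on the input order, not on arrival order.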

Checklist

  • I have written necessary docs and comments
  • I have added necessary unit tests and integration tests

Refer to a related PR or issue link (optional)

close #2141

@skyzh skyzh requested review from wangrunji0408, BugenZhao and yuhao-su and removed request for BugenZhao April 27, 2022 04:07
@codecov

codecov bot commented Apr 27, 2022

Codecov Report

Merging #2169 (c741644) into main (bc05efd) will decrease coverage by 0.09%.
The diff coverage is 76.47%.

@@            Coverage Diff             @@
##             main    #2169      +/-   ##
==========================================
- Coverage   71.00%   70.90%   -0.10%     
==========================================
  Files         654      657       +3     
  Lines       82894    83075     +181     
==========================================
+ Hits        58862    58908      +46     
- Misses      24032    24167     +135     
Flag Coverage Δ
rust 70.90% <76.47%> (-0.10%) ⬇️


Impacted Files Coverage Δ
src/frontend/src/optimizer/mod.rs 94.35% <ø> (ø)
...c/meta/src/stream/fragmenter/rewrite/delta_join.rs 0.00% <0.00%> (ø)
src/stream/src/executor/mod.rs 42.46% <ø> (ø)
src/stream/src/executor_v2/mod.rs 80.00% <ø> (ø)
src/stream/src/executor_v2/lookup_union.rs 75.45% <75.45%> (ø)
src/frontend/src/optimizer/delta_join_solver.rs 95.16% <100.00%> (ø)
src/meta/src/hummock/mod.rs 21.35% <0.00%> (-9.78%) ⬇️
src/meta/src/hummock/hummock_manager.rs 91.13% <0.00%> (-0.43%) ⬇️
src/meta/src/rpc/server.rs 0.00% <0.00%> (ø)
src/meta/src/hummock/compactor_manager.rs 98.21% <0.00%> (ø)
... and 5 more


@wangrunji0408
Contributor

Can we spawn new futures to buffer upstreams? Then the union executor could await these futures sequentially.

@skyzh
Contributor Author

skyzh commented Apr 27, 2022

> Can we spawn new futures to buffer upstreams? Then the union executor could await these futures sequentially.

Nope. Executors need backpressure: if the union executor cannot send its data downstream, it should stop receiving data from upstream.

Also, we assume that one actor creates only one thread. There should be no unmanaged spawned futures in actors or executors.

@wangrunji0408
Contributor

I have a solution that might look better. See 77c40a2.

The key points are:

  • Use bounded channels as buffers with backpressure.
  • To avoid blocking upstream, create a future for each input to poll the stream and forward its output to the channel. The executor can then await the channels one by one, which greatly simplifies the code.
  • Since spawning futures is not allowed, we can make sure these futures keep being polled by selecting on them at every .await point.
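
The shape of this proposal can be sketched with the standard library alone. This is an illustration under stated assumptions, not the code in 77c40a2: `union_in_order` is a hypothetical name, and OS threads stand in for the per-input forwarding futures (which the proposal polls via select rather than spawning). The bounded `sync_channel` plays the role of the buffer with backpressure.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Drains the inputs one by one, in order. Each input has its own bounded
/// channel, so a forwarder can run ahead by at most `capacity` items.
fn union_in_order(inputs: Vec<Vec<u32>>, capacity: usize) -> Vec<u32> {
    let mut receivers: Vec<Receiver<u32>> = Vec::new();
    let mut forwarders = Vec::new();

    for input in inputs {
        let (tx, rx) = sync_channel(capacity);
        receivers.push(rx);
        // Stand-in for the per-input forwarding future.
        forwarders.push(thread::spawn(move || {
            for item in input {
                // `send` blocks while the buffer is full: this is the backpressure.
                if tx.send(item).is_err() {
                    break; // receiver is gone, stop forwarding
                }
            }
            // Dropping `tx` here closes the channel and ends the receiver's loop.
        }));
    }

    let mut output = Vec::new();
    for rx in receivers {
        // Await the channels one by one, as the proposal describes.
        for item in rx {
            output.push(item);
        }
    }
    for forwarder in forwarders {
        forwarder.join().expect("forwarder panicked");
    }
    output
}
```

While the first channel is being drained, the forwarder of the second input fills its buffer up to `capacity` and then waits, so the consumer loop stays a plain sequential read.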

@skyzh
Contributor Author

skyzh commented Apr 27, 2022

That's cool! I'll cherry-pick that commit to my branch... tomorrow for sure 🤣

@wangrunji0408
Contributor

wangrunji0408 commented Apr 27, 2022

> Also we assume one actor only creates one thread. There should not be un-managed spawned future in actors and executors.

I still think spawning futures should be allowed. It wouldn't create a new thread, just a new task.
To avoid dangling futures, we can use a smol-style task handle, which cancels the task on drop instead of detaching it.
Once all task handles are properly stored somewhere, we can be sure that all tasks are under control.
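
To make the cancel-on-drop idea concrete, here is a minimal standard-library analogy. It is only a sketch: `TaskHandle` is a hypothetical type written for this example, it wraps an OS thread rather than an async task, and it is not smol's actual `Task` type. The point it shows is the ownership rule: the work stops when the handle is dropped.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical handle: the background work runs only while the handle is alive.
struct TaskHandle {
    cancelled: Arc<AtomicBool>,
    worker: Option<JoinHandle<()>>,
}

impl TaskHandle {
    /// Runs `step` repeatedly in the background until the handle is dropped.
    fn spawn<F>(mut step: F) -> Self
    where
        F: FnMut() + Send + 'static,
    {
        let cancelled = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&cancelled);
        let worker = thread::spawn(move || {
            while !flag.load(Ordering::Acquire) {
                step();
                thread::yield_now();
            }
        });
        TaskHandle {
            cancelled,
            worker: Some(worker),
        }
    }
}

impl Drop for TaskHandle {
    /// Cancel on drop: signal the worker and wait until it has stopped.
    fn drop(&mut self) {
        self.cancelled.store(true, Ordering::Release);
        if let Some(worker) = self.worker.take() {
            let _ = worker.join();
        }
    }
}
```

If an executor stored such handles in its own fields, dropping the executor would stop every task it started, which is the "under control" property argued for above.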

In this case, I don't think the executor itself should care whether its upstreams are blocked. If an upstream must not be blocked by its downstream, we should spawn a future to poll it and connect the two futures with a channel as a buffer. Then the upstream will never be blocked, no matter what the downstream executor is.

@skyzh
Contributor Author

skyzh commented Apr 27, 2022

For the case of union executor, I think it's okay to use a channel. Some small issues:

  • The tracing context will be lost between the union's inputs and the downstream executors.
  • Actors might do actual work (apart from I/O) even when we don't poll them, which I think is undesirable.
  • Backpressure might not work. If the channel between the union and its inputs is unbounded, the upstream keeps producing data even when the union cannot send data downstream. If the channel is bounded, an upstream gets blocked unexpectedly whenever the union keeps receiving from a single path.
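
The bounded-channel half of the last point can be shown with a small standard-library sketch. The function name `items_accepted_before_blocking` is hypothetical, and the held-but-idle receiver models a union that is busy draining a different path. The sketch counts how many items one upstream can push before it would have to block.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Counts how many items an upstream can push into a bounded buffer of
/// `capacity` while nobody is receiving from that buffer.
fn items_accepted_before_blocking(capacity: usize, attempts: u32) -> usize {
    // `_rx` is kept alive but never read, like a path the union is not polling.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    for item in 0..attempts {
        match tx.try_send(item) {
            Ok(()) => accepted += 1,
            // Buffer full: a blocking `send` would stall the upstream here.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With a capacity of 2, the upstream stalls after two items regardless of how much it has to send, which is the unexpected blocking described above. An unbounded channel would accept every item instead, which is the missing-backpressure case.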

From my perspective, union is in a sense a special kind of merge executor. If we model the union executor as a merge executor, rather than as something in the middle of the pipeline, I'm totally fine with spawning tasks.

skyzh and others added 5 commits April 29, 2022 10:09
Signed-off-by: Alex Chi <iskyzh@gmail.com>
Signed-off-by: Alex Chi <iskyzh@gmail.com>
Signed-off-by: Alex Chi <iskyzh@gmail.com>
Signed-off-by: Runji Wang <wangrunji0408@163.com>
Signed-off-by: Alex Chi <iskyzh@gmail.com>
@yuhao-su
Contributor

LGTM

@skyzh skyzh changed the title feat(streaming): correctly implement lookup-union executor feat(streaming): rename Union to LookupUnion Apr 29, 2022
Signed-off-by: Alex Chi <iskyzh@gmail.com>
@skyzh skyzh changed the title feat(streaming): rename Union to LookupUnion feat(streaming): correctly implement LookupUnionExecutor Apr 29, 2022
@skyzh skyzh enabled auto-merge (squash) April 29, 2022 04:45
@skyzh skyzh disabled auto-merge April 29, 2022 04:45
@skyzh skyzh enabled auto-merge (squash) April 29, 2022 04:45
Signed-off-by: Alex Chi <iskyzh@gmail.com>
@skyzh skyzh merged commit 32f4a84 into main Apr 29, 2022
@skyzh skyzh deleted the skyzh/correct-union branch April 29, 2022 05:10
Successfully merging this pull request may close these issues.

streaming: fix lookup union executor behavior
3 participants