Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(batch): Use futures-async-stream to implement HashJoin executor #2119

Merged
merged 13 commits into from
Apr 28, 2022

Conversation

D2Lark
Copy link
Contributor

@D2Lark D2Lark commented Apr 25, 2022

What's changed and what's your intention?

Implement HashJoin using Executor2 trait.

Please explain IN DETAIL what the changes are in this PR and why they are needed:

  • Summarize your change (mandatory)
  • How does this PR work? Need a brief introduction for the changed logic (optional)
  • Describe clearly one logical change and avoid lazy messages (optional)
  • Describe any limitations of the current code (optional)

Checklist

  • I have written necessary docs and comments
  • I have added necessary unit tests and integration tests

Refer to a related PR or issue link (optional)

close #1947

@D2Lark
Copy link
Contributor Author

D2Lark commented Apr 25, 2022

There are still some unresolved issues, such as Calling child.execute() twice at the same time shows the problem of ownership

--> src/batch/src/executor2/join/hash_join.rs:174:17
|
170 | let mut right_child_stream = self.right_child.execute();
| --------- self.right_child partially moved due to this method call
...
174 | self.build(build_table, right_child_stream).await?
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ value borrowed here after partial move
|
note: this function takes ownership of the receiver self, which moves self.right_child
--> src/batch/src/executor2/mod.rs:67:16
|
67 | fn execute(self: Box) -> BoxedDataChunkStream;
| ^^^^
= note: partial move occurs because self.right_child has type std::boxed::Box<dyn executor2::Executor2>, which does not implement the Copy trait

error[E0382]: use of moved value: left_child_stream

@@ -0,0 +1,212 @@
// Copyright 2022 Singularity Data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also remove chunked_data in executor/join mod?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried this once but had some problems, if I delete executor/join/chunked_data, many functions and structures under executor2/join/chunked_data have to be changed to pub, so I keep executor/join/chunked_data for now and delete it after all of them have been ported?

#[try_stream(boxed, ok = DataChunk, error = RwError)]
async fn do_execute(mut self: Box<Self>) {
let mut left_child_stream = self.left_child.execute();
let mut right_child_stream = self.right_child.execute();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The execution of right child should be postponed , we should execute left child to build table first.

/// Build side
right_child: BoxedExecutor,
right_child: BoxedExecutor2,
state: HashJoinState<K>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, after using stream api, we no longer need this state machine. We just need to keep BuildTable and ProbeTable<K> as local varialbe in do_execute method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my simple understanding, the previous control flow open next close caused a lot of variables to be bound to the executor and the whole logic was very complicated. So this morning I made a version of the risinglight implementation and simplified it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for you contribution, I'll take a review.

Copy link
Contributor Author

@D2Lark D2Lark Apr 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, after using stream api, we no longer need this state machine. We just need to keep BuildTable and ProbeTable as local varialbe in do_execute method.

I see what you mean, I'll optimize it

@liurenjie1024
Copy link
Contributor

Yes, execute will take the ownership of executor.

@D2Lark
Copy link
Contributor Author

D2Lark commented Apr 25, 2022

Yes, execute will take the ownership of the executor.

yes, because of that we usually put the execute at the end of func. So far there are no problems in the non-join executor. But every join executor has two children to take their father's ownership , What to do with this?

@skyzh
Copy link
Contributor

skyzh commented Apr 25, 2022

Yes, execute will take the ownership of the executor.

yes, because of that we usually put the execute at the end of func. So far there are no problems in the non-join executor. But every join executor has two children to take their father's ownership , What to do with this?

Just set them as Option<BoxedExecutor> and use left.take().unwrap() to get the ownership.

@D2Lark
Copy link
Contributor Author

D2Lark commented Apr 25, 2022

Yes, execute will take the ownership of the executor.

yes, because of that we usually put the execute at the end of func. So far there are no problems in the non-join executor. But every join executor has two children to take their father's ownership , What to do with this?

Just set them as Option<BoxedExecutor> and use left.take().unwrap() to get the ownership.

Thanks for the tip~

@TennyZhuang
Copy link
Collaborator

@TennyZhuang
Copy link
Collaborator

TennyZhuang commented Apr 28, 2022

rustdoc check failed @D2Lark

cargo doc --document-private-items --no-deps

https://github.com/singularity-data/risingwave/runs/6205527285?check_suite_focus=true

Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@TennyZhuang
Copy link
Collaborator

@codecov
Copy link

codecov bot commented Apr 28, 2022

Codecov Report

Merging #2119 (69b5eb4) into main (61c645e) will decrease coverage by 0.01%.
The diff coverage is 85.57%.

@@            Coverage Diff             @@
##             main    #2119      +/-   ##
==========================================
- Coverage   70.92%   70.90%   -0.02%     
==========================================
  Files         650      652       +2     
  Lines       82725    82790      +65     
==========================================
+ Hits        58671    58705      +34     
- Misses      24054    24085      +31     
Flag Coverage Δ
rust 70.90% <85.57%> (-0.02%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
src/batch/src/executor/join/mod.rs 13.15% <ø> (-50.00%) ⬇️
src/batch/src/executor/join/nested_loop_join.rs 82.17% <ø> (ø)
src/batch/src/executor/mod.rs 60.46% <ø> (ø)
src/batch/src/executor2/join/hash_join_state.rs 88.78% <ø> (ø)
src/batch/src/executor2/mod.rs 100.00% <ø> (ø)
src/batch/src/executor2/join/mod.rs 60.52% <60.52%> (ø)
src/batch/src/executor2/join/hash_join.rs 85.32% <62.85%> (ø)
src/batch/src/executor2/join/chunked_data.rs 99.21% <99.21%> (ø)
src/batch/src/executor/join/chunked_data.rs 92.96% <0.00%> (-6.25%) ⬇️
... and 2 more

📣 Codecov can now indicate which changes are the most critical in Pull Requests. Learn more

@yuhao-su
Copy link
Contributor

LGTM

@TennyZhuang
Copy link
Collaborator

@liurenjie1024 liurenjie1024 merged commit cb72ab3 into risingwavelabs:main Apr 28, 2022
@liurenjie1024
Copy link
Contributor

@D2Lark D2Lark deleted the impl_hashjoin_executor2 branch April 28, 2022 10:38
while let Some(chunk) = self.right_child.next().await? {
impl<K: HashKey + Send + Sync> HashJoinExecutor2<K> {
#[try_stream(boxed, ok = DataChunk, error = RwError)]
async fn do_execute(mut self: Box<Self>) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is an elegant implementation and I'd like to refactor it

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Use futures-async-stream to refactor HashJoinExecutor.
5 participants