feat(streaming): Support hash based parallelized chain node #1846
Conversation
Signed-off-by: Bowen Zhou <bowenzhou@singularity-data.com>
Codecov Report
```diff
@@            Coverage Diff             @@
##             main    #1846      +/-   ##
==========================================
- Coverage   70.86%   70.84%   -0.03%
==========================================
  Files         611      611
  Lines       79591    79667      +76
==========================================
+ Hits        56403    56440      +37
- Misses      23188    23227      +39
```
Seems we have too many logs now 😢
Not sure if this implementation is correct. Would you please elaborate:
- What distribution is `chain` following?
- What are the distributions of `BatchPlanNode` and `MergeNode`, separately? (Is `BatchPlanNode` really using `Distribution::HashShard(logical.base.pk_indices.clone())` as its distribution?)
- What's the distribution of chain's dispatcher? How is it determined?
```rust
    .get_hash_values(self.info.pk_indices.as_ref(), CRC32FastBuilder)
    .unwrap();
let n = data_chunk.cardinality();
let (columns, _visibility) = data_chunk.into_parts();
```
Can we ensure that `data_chunk`'s visibility is `None`?
Concerned about this as well. By the way, there are also some other executors ignoring the visibility. :(
AFAIK, `collect_data_chunk` in `CellBasedTableRowIter` will always return a chunk with `None` visibility.
> By the way, there are also some other executors ignoring the visibility. :(

Added `compact` in `execute_inner`.
Please add an assert that visibility is `None`.
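For concreteness, a minimal sketch of that combination (hypothetical code, assuming `DataChunk::compact` consumes the chunk and returns a compacted one, and that `visibility()` exposes the optional bitmap):

```rust
// Hypothetical sketch: compact first so every row is visible, then assert
// the chunk carries no visibility bitmap before computing hash values.
let data_chunk = data_chunk.compact()?;
assert!(
    data_chunk.visibility().is_none(),
    "chain expects a compacted chunk without a visibility bitmap"
);
let hashes = data_chunk
    .get_hash_values(self.info.pk_indices.as_ref(), CRC32FastBuilder)
    .unwrap();
```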
```diff
@@ -42,7 +42,7 @@ impl StreamTableScan {
     ctx,
     logical.schema().clone(),
     logical.base.pk_indices.clone(),
-    Distribution::Single,
+    Distribution::HashShard(logical.base.pk_indices.clone()),
```
Could you please also change the distribution in the Java frontend? I'm afraid the current e2e tests cannot cover some cases.
I've added a workaround for the Java frontend in the fragmenter (because I'm not familiar with the Java part 😅). Will remove this after we deprecate the Java frontend.
I probably haven't caught up on the multi-dispatcher part 😢; will reopen it later.
According to the dashboard graph, there's still a hash dispatcher after each chain, so it seems the distribution of … Anyway, as long as there's an exchange after chain, the result will be correct.
Multi-dispatcher can definitely help the implementation of this PR, but it's not well-tested yet; at least the compute node doesn't support multi-dispatcher. A possible approach is to always follow the distribution of the upstream materialize executor; then the dispatcher for them can be "broadcast", and the downstream needs a shuffle after the table scan.
Signed-off-by: Bowen Zhou <bowenzhou@singularity-data.com>
You can force chain to be a singleton in the StreamManagerService on the meta node (these requests come from the Java frontend) to avoid modifying the Java frontend. This way the workload is minimal and the Java e2e tests pass.
Please just use consistent hashing to partition the batch query. See more in Proposal: Use Consistent Hash Across the System. Also, in this way, no exchange would be needed for chain.
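As a standalone illustration of that proposal's idea (not code from this PR; the vnode count and the CRC32 choice are assumptions), mapping a distribution key to a virtual node could look like:

```rust
use crc32fast::Hasher;

/// Hypothetical vnode count; the real value would come from the proposal/config.
const VNODE_COUNT: u32 = 2048;

/// Map a row's serialized distribution key to a virtual node. Rows with the
/// same key always land on the same vnode, so a batch scan partitioned by
/// vnode lines up with the streaming shards and needs no extra exchange.
fn vnode_of(dist_key: &[u8]) -> u32 {
    let mut hasher = Hasher::new();
    hasher.update(dist_key);
    hasher.finalize() % VNODE_COUNT
}

fn main() {
    println!("vnode = {}", vnode_of(b"pk-42"));
}
```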
Signed-off-by: Bowen Zhou <bowenzhou@singularity-data.com>
No, we still need it.
Also, the materialize stream node is created after enforcing distribution. We need to refactor the create-MV optimization process to make everything work.
After offline discussion, we will merge this PR first to make it runnable. After that, @zbzbw will try to refine the batch query scan logic to follow the consistent hashing distribution of the MV it depends on.
Agree.
Hmmmm... Should be guaranteed, I think.
For others not in the discussion: the stream merged from the batch query and the upstream looks weird to me because it's under a weird distribution, neither distributed by streaming nor by batch. We may refine this later by letting the batch query scan data with the same distribution as the upstream materialize executor.
That's what I was concerned about before too. 🤔 So we chose to add an exchange right after chain in the current implementation.
Merge?
What's changed and what's your intention?
This PR implements the parallelized chain node in a very straightforward and naive way:
We should change the batch query node to scan the table by range once we figure out a good way to split the table into partitions.
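Roughly, the routing step amounts to something like this (a hypothetical standalone sketch mirroring the `get_hash_values(pk_indices, CRC32FastBuilder)` call quoted above, not the PR's actual code):

```rust
// Hypothetical sketch: hash each row on its key bytes and route it to
// one of `parallelism` partitions, as a parallelized chain would.
fn route_rows(rows: &[Vec<u8>], parallelism: usize) -> Vec<Vec<&Vec<u8>>> {
    let mut partitions = vec![Vec::new(); parallelism];
    for row in rows {
        let hash = crc32fast::hash(row) as usize;
        partitions[hash % parallelism].push(row);
    }
    partitions
}

fn main() {
    let rows = vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()];
    let parts = route_rows(&rows, 2);
    println!("{} rows in partition 0", parts[0].len());
}
```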
P.S. The dashboard currently has a small issue when resolving MV on MV.
Checklist
Refer to a related PR or issue link (optional)
One step toward #619.