
feat(batch): parallel table scan #3251

Merged: 39 commits merged from xxchan/partition-read into main on Jul 1, 2022
Conversation

@xxchan (Member) commented on Jun 15, 2022

What's changed and what's your intention?

Mainly added vnode_bitmap to ExecutorBuilder.

Not finished yet... See the todo!s

  • schedule: how to partition vnode bitmap for different executors?
  • executor: scan different ranges according to vnode bitmap

BTW, should we use a vnode bitmap or another representation, e.g., vnode ranges, maybe like ParallelUnitMapping? I used a bitmap here because of the previous storage implementation 😇
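For illustration only, a minimal sketch of the first todo above: partitioning the set vnodes of a bitmap across parallel scan executors. The names, the vnode count, and the round-robin policy are assumptions for this sketch, not the PR's actual code.

```rust
/// Round-robin the set vnodes of `bitmap` across `parallelism` tasks.
/// Each returned entry is the list of vnodes one executor should scan.
/// (Hypothetical helper; RisingWave's real scheduling logic differs.)
fn partition_vnodes(bitmap: &[bool], parallelism: usize) -> Vec<Vec<usize>> {
    // Collect the indices of set bits, i.e. the vnodes this scan covers.
    let vnodes: Vec<usize> = bitmap
        .iter()
        .enumerate()
        .filter_map(|(i, &set)| set.then_some(i))
        .collect();
    // Deal them out round-robin so tasks get roughly equal work.
    let mut parts: Vec<Vec<usize>> = vec![Vec::new(); parallelism];
    for (idx, vnode) in vnodes.into_iter().enumerate() {
        parts[idx % parallelism].push(vnode);
    }
    parts
}
```

With 8 set vnodes and parallelism 4, task 0 would get vnodes 0 and 4, task 1 vnodes 1 and 5, and so on.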

Checklist

  • I have written necessary docs and comments
  • I have added necessary unit tests and integration tests
  • All checks passed in ./risedev check (or alias, ./risedev c)

Refer to a related PR or issue link (optional)

#3237

@github-actions (bot) left a comment:

license-eye has checked 851 files in total.

  Valid  Invalid  Ignored  Fixed
    849        1        1      0

Invalid file:
  • src/common/src/consistent_hashing.rs

@xxchan (Member, Author) commented on Jun 15, 2022

Just tried another representation: vnode_ranges instead of vnode_bitmap, which may make it easier for RowSeqScanExecutor to create vnode prefix range iterators.
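As a rough sketch of what that conversion between the two representations might look like (illustrative names only, not RowSeqScanExecutor's actual API): collapsing a vnode bitmap into contiguous half-open ranges that could back prefix range iterators.

```rust
/// Collapse a vnode bitmap into contiguous `[start, end)` ranges.
/// A scan executor could turn each range into one prefix range iterator.
/// (Hypothetical helper sketched for illustration.)
fn bitmap_to_ranges(bitmap: &[bool]) -> Vec<(usize, usize)> {
    let mut ranges = Vec::new();
    let mut start: Option<usize> = None;
    for (i, &set) in bitmap.iter().enumerate() {
        match (set, start) {
            // A run of set bits begins.
            (true, None) => start = Some(i),
            // A run ends just before the current unset bit.
            (false, Some(s)) => {
                ranges.push((s, i));
                start = None;
            }
            _ => {}
        }
    }
    // Close a run that extends to the end of the bitmap.
    if let Some(s) = start {
        ranges.push((s, bitmap.len()));
    }
    ranges
}
```

For example, the bitmap `[true, true, false, true]` collapses to the two ranges `(0, 2)` and `(3, 4)`.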

@@ -37,6 +37,7 @@ impl GrpcExchangeSource {
let task_id = task_output_id.get_task_id()?.clone();
let client = ComputeClient::new(addr).await?;
let local_execute_plan = exchange_source.local_execute_plan;
let vnode_ranges = todo!();
A Contributor commented:

The vnode_range should be determined by the optimizer/fragmenter.

@xxchan (Member, Author) replied:

I think it should be determined by the scheduler? Each scan task scans different ranges.

The Contributor replied:

I think the vnodes of a table scan plan node should be determined by the optimizer, and the compute node a task should be sent to by the fragmenter; this way we can reuse the logic in local execution mode. The scheduler should only care about sending tasks to the right compute node.

@xxchan (Member, Author) replied:

The optimizer & fragmenter only have one plan, e.g., scan the full table.

Then the scan is divided into non-overlapping vnode ranges, and each task of the fragment is assigned a range to scan. How can we do that in the optimizer/fragmenter?

@xxchan replied on Jun 16, 2022:

Oh, we can let the fragmenter store num_parallelism different plans if you want...

@fuyufjh (Contributor) replied on Jun 16, 2022:

Here, after the filter has been pushed down to the table scan, it should be able to prune unnecessary vnodes.

The optimizer is responsible for pushing predicates (SARGs) down to the table scan, but is not responsible for deciding the specific vnode number.

And it cannot do it, because it doesn't know whether (1, 2) are in the same parallel unit.

@xxchan (Member, Author) replied:

For 1, it's not the target of this PR or of the vnode_ranges introduced here.

This PR mainly cares about, given a range (e.g., a full table scan), dividing (scheduling) it into small ranges and assigning them to tasks.

I think the ScanRange introduced earlier (in BatchSeqScan) is responsible for that part? And pruning vnodes can be done at the same place where we partition the vnode ranges, instead of in the optimizer.
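The pruning idea above can be sketched in a few lines (a hypothetical helper, not the PR's code): pruning at partition time is just intersecting the vnodes a pushed-down predicate touches with the vnodes assigned to a task.

```rust
/// Intersect two vnode bitmaps: keep a vnode only if it is both
/// assigned to this task and touched by the predicate.
/// (Illustrative sketch; real bitmaps would use a packed representation.)
fn prune_vnodes(assigned: &[bool], touched: &[bool]) -> Vec<bool> {
    assigned
        .iter()
        .zip(touched.iter())
        .map(|(&a, &t)| a && t)
        .collect()
}
```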

The Contributor replied:

Per my understanding, the parallel unit is just a partition, which should be maintained by the optimizer. It should not know about vnodes, but should understand parallel units.

@fuyufjh replied on Jun 16, 2022:

> Per my understanding, the parallel unit is just a partition, which should be maintained by the optimizer. It should not know about vnodes, but should understand parallel units.

I don't think of it as a partition 😇 To me, it's exactly the same thing as the routing metadata of KV databases like Cassandra, HBase, or TiKV.

The Contributor replied:

Anyway, we need to map filters in the table scan to parallel units, and it can't be done in the scheduler since we need to reuse it in local mode. Maybe the fragmenter is a better place.
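The fragmenter-side mapping being discussed could be sketched as follows. This is hypothetical: `vnode_mapping[i]` is assumed to give the id of the parallel unit owning vnode `i`, loosely mirroring ParallelUnitMapping, and is not the actual fragmenter code.

```rust
use std::collections::HashMap;

/// Group vnodes by the parallel unit that owns them, so each resulting
/// group can become one task sent to the right compute node.
/// (Sketch under the assumption stated above.)
fn group_by_parallel_unit(vnode_mapping: &[u32]) -> HashMap<u32, Vec<usize>> {
    let mut groups: HashMap<u32, Vec<usize>> = HashMap::new();
    for (vnode, &pu) in vnode_mapping.iter().enumerate() {
        groups.entry(pu).or_default().push(vnode);
    }
    groups
}
```

With the mapping `[0, 1, 0, 1]`, parallel unit 0 would own vnodes 0 and 2, and parallel unit 1 would own vnodes 1 and 3.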

@xxchan (Member, Author) commented on Jun 19, 2022

PTAL at the partitioning logic in the distributed scheduler (stage.rs, schedule_tasks) & local mode (local.rs, convert_plan_node) first. I'm not sure whether I understand it correctly 😇 (Whether the representation looks good or could be placed somewhere else can be discussed further if there are no other big problems.)

The remaining work is the executor part, which is blocked by the storage side's work (new encoding & vnode range scan API).

@xxchan (Member, Author) commented on Jun 30, 2022

PTAL the final updates:

I'd like to fix the FIXME in a separate PR. Can we merge this PR if the other fixes look good?

cc @liurenjie1024 @fuyufjh

update: fix already merged #3599

xxchan mentioned this pull request on Jul 1, 2022
xxchan added the mergify/can-merge label (indicates that the PR can be added to the merge queue) on Jul 1, 2022
@BugenZhao (Member) commented:

So this does not enable real parallel table scan, according to the discussions in #3583? I've removed "close" from the PR body. 🤣

mergify bot merged commit 420fcd5 into main on Jul 1, 2022
mergify bot deleted the xxchan/partition-read branch on Jul 1, 2022 at 13:01
huangjw806 pushed a commit that referenced this pull request Jul 5, 2022
* add vnode_bitmap in row seq scan

* use vnode_ranges instead of vnode_bitmap

* todo!

* use vnode_mapping in table catalog

* style: change the representation of vnode_ranges

* update local mode

* update local mode workers

* move vnode ranges from tasks into RowSeqScan

* remove build_vnode_mapping

* add some comments

* trivial fix

* use vnode_bitmap instead of vnode_ranges (let table do the conversion instead)

* fix vnodes

* fix local mode (system table)

* buf format

* revert the change in table, ignore in executor if vnodes not set

* revert cell based table

* ignore get row

* fix distinct

* Revert "fix distinct"

This reverts commit 7424af8.

* Revert "Revert "fix distinct""

This reverts commit 3cdab8f.

* let distinct agg be singleton

* single distribution for BatchTopN

* remove should_ignore

* fmt
nasnoisaac pushed a commit to nasnoisaac/risingwave that referenced this pull request Aug 9, 2022 (same commit list as above)
Labels
mergify/can-merge Indicates that the PR can be added to the merge queue type/feature
6 participants