-
Notifications
You must be signed in to change notification settings - Fork 5.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: support mpp partition for tiflash #31043
Conversation
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
85a8760
to
2b3d6c9
Compare
10518b8
to
7679d8d
Compare
Code Coverage Details: https://codecov.io/github/pingcap/tidb/commit/72a17f8e3405cb9fb2aa69d9eac7fe200a93f91b |
/cc @windtalker |
tasks := make([]*kv.MPPTask, 0, len(metas)) | ||
for _, meta := range metas { | ||
tasks = append(tasks, &kv.MPPTask{Meta: meta, ID: e.ctx.GetSessionVars().AllocMPPTaskID(e.startTS), StartTs: e.startTS, TableID: tableID}) | ||
task := &kv.MPPTask{Meta: meta, ID: e.ctx.GetSessionVars().AllocMPPTaskID(e.startTS), StartTs: e.startTS, TableID: ts.Table.ID, PartitionTableIDs: allPartitionsIDs} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a non-partition table, PartitionTableIDs
will be an empty slice instead of nil
, is it as expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In L338, var allPartitionsIDs []int64
, it will assign allPartitionsIDs to nil. So PartitionTableIDs
will be nil
when the table is a non-partition table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
executor/table_reader.go
Outdated
@@ -325,6 +337,41 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [ | |||
return kvReqs, nil | |||
} | |||
|
|||
func (e *TableReaderExecutor) buildKVForPartitionTableScan(ctx context.Context, ranges []*ranger.Range) (*kv.Request, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
buildKVReqForPartitionTableScan ?
store/copr/batch_request_sender.go
Outdated
Meta *metapb.Region | ||
Ranges *KeyRanges | ||
AllStores []uint64 | ||
PartitionIndex int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment for this attribute
planner/core/explain.go
Outdated
@@ -289,6 +290,9 @@ func (p *PhysicalTableScan) OperatorInfo(normalized bool) string { | |||
if p.stats.StatsVersion == statistics.PseudoVersion && !normalized { | |||
buffer.WriteString(", stats:pseudo") | |||
} | |||
if p.StoreType == kv.TiFlash && p.Table.GetPartitionInfo() != nil && p.IsMPPOrBatchCop && p.ctx.GetSessionVars().UseDynamicPartitionPrune() { | |||
buffer.WriteString(", PartitionTableScan") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a test case for this.
executor/table_reader.go
Outdated
KeyRanges: kvRange, | ||
}) | ||
} | ||
if err := updateExecutorTableID(ctx, e.dagPB.RootExecutor, e.table.Meta().ID, true, pids); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e.table.Meta().ID
is meaningless here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
planner/core/fragment.go
Outdated
ret = append(ret, tasks...) | ||
} | ||
return ret, nil | ||
return e.constructMPPTasks(ctx, ts, splitedRanges, partitions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return e.constructMPPTasks(ctx, ts, splitedRanges, partitions) | |
func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *PhysicalTableScan) ([]*kv.MPPTask, error) { | |
... | |
if ts.Table.GetPartitionInfo() != nil { | |
req, allPartitionIDs = e.constructMPPBuildTaskReqForPartitionedTable() | |
} else { | |
req = e.constructMPPBuildTaskForNonPartitionTable() | |
} | |
ttl = | |
metas = | |
tasks = | |
return tasks | |
... | |
} |
store/copr/batch_coprocessor.go
Outdated
for _, copTask := range batchTasks { | ||
tableRegions := make([]*coprocessor.TableRegions, len(partitionIDs)) | ||
// init coprocessor.TableRegions | ||
for j, pid := range partitionIDs { | ||
tableRegions[j] = &coprocessor.TableRegions{ | ||
PhysicalTableId: pid, | ||
} | ||
} | ||
// fill region infos | ||
for _, ri := range copTask.regionInfos { | ||
tableRegions[ri.PartitionIndex].Regions = append(tableRegions[ri.PartitionIndex].Regions, | ||
ri.toCoprocessorRegionInfo()) | ||
} | ||
count := 0 | ||
// clear empty table region | ||
for j := 0; j < len(tableRegions); j++ { | ||
if len(tableRegions[j].Regions) != 0 { | ||
tableRegions[count] = tableRegions[j] | ||
count++ | ||
} | ||
} | ||
copTask.PartitionTableRegions = tableRegions[:count] | ||
copTask.regionInfos = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extract this code block as an individual function like convertRegionInfosToPartitionTableRegions
store/copr/batch_coprocessor.go
Outdated
// When `partitionIDs != nil`, it means that buildBatchCopTasks is constructing a batch cop tasks for PartitionTableScan. | ||
// At this time, `len(rangesForEachPhysicalTable) == len(partitionIDs)` and `rangesForEachPhysicalTable[i]` is for partition `partitionIDs[i]`. | ||
// Otherwise, `rangesForEachPhysicalTable[0]` indicates the range for the single physical table. | ||
func buildBatchCopTasks(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime map[string]time.Time, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64, partitionIDs []int64) ([]*batchCopTask, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can add a new function as buildBatchCopTasksForPartitionedTable()
and rename this old function to buildBatchCopTasksForNonPartitionedTable.
Then we can extract the common code block as buildBatchCopTasksCore()
Then we can extract the common code block as buildBatchCopTasksCore()
store/copr/batch_coprocessor.go
Outdated
@@ -628,56 +636,62 @@ func buildBatchCopTasks(bo *backoff.Backoffer, store *kvStore, rangesForEachPhys | |||
|
|||
// generate tableRegions for batchCopTasks | |||
if partitionIDs != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This block can be moved into buildBatchCopTasksForPartitionedTable
keyRanges := []*KeyRanges{ranges} | ||
var partitionIDs []int64 | ||
var tasks []*batchCopTask | ||
var err error | ||
if req.PartitionIDAndRanges != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This block can be moved into buildBatchCopTasksForPartitionedTable
} | ||
tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store.kvStore, keyRanges, req.StoreType, nil, 0, false, 0, partitionIDs) | ||
} else { | ||
ranges := NewKeyRanges(req.KeyRanges) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move this line into buildBatchCopTasksForNonPartitionedTable
store/copr/batch_coprocessor.go
Outdated
@@ -49,7 +48,8 @@ type batchCopTask struct { | |||
cmdType tikvrpc.CmdType | |||
ctx *tikv.RPCContext | |||
|
|||
regionInfos []RegionInfo | |||
regionInfos []RegionInfo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment for the difference between regionInfos and PartitionTableRegions
executor/table_reader.go
Outdated
} | ||
return distsql.NewSerialSelectResults(results), nil | ||
// Use PartitionTable Scan | ||
kvReq, err := e.buildKVRangeForPartitionTableScan(ctx, ranges) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
buildKVReq
not buildKVRange
executor/partition_table.go
Outdated
@@ -22,7 +22,7 @@ import ( | |||
"github.com/pingcap/tipb/go-tipb" | |||
) | |||
|
|||
func updateExecutorTableID(ctx context.Context, exec *tipb.Executor, partitionID int64, recursive bool) error { | |||
func updateExecutorTableID(ctx context.Context, exec *tipb.Executor, partitionID int64, recursive bool, partitionIDs []int64) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not only use partitionIDs here, and remove partitionID?
/merge |
This pull request has been accepted and is ready to merge. Commit hash: 53f9366
|
/merge |
This pull request has been accepted and is ready to merge. Commit hash: ae08a1f
|
@wshwsh12: Your PR was out of date, I have automatically updated it for you. At the same time I will also trigger all tests for you: /run-all-tests If the CI test fails, you just re-trigger the test that failed and the bot will merge the PR for you after the CI passes. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
What problem does this PR solve?
Issue Number: close #32347
Problem Summary:
What is changed and how it works?
TiPB: https://github.com/pingcap/tipb/tree/mpp_partition_table
Kvproto: https://github.com/pingcap/kvproto/tree/mpp_partition_table
Check List
Tests
Side effects
Documentation
Release note