planner, executor: support index merge's order prop push down at the normal way #43881

Merged (16 commits) on Aug 1, 2023
63 changes: 32 additions & 31 deletions cmd/explaintest/r/index_merge.result
@@ -390,15 +390,15 @@ Delete_11 N/A root N/A
└─Sort_15 4056.68 root test.t1.c1
└─SelectLock_17 4056.68 root for update 0
└─HashJoin_33 4056.68 root inner join, equal:[eq(test.t1.c1, test.t1.c1)]
├─HashAgg_36(Build) 3245.34 root group by:test.t1.c1, funcs:firstrow(test.t1.c1)->test.t1.c1
│ └─IndexMerge_45 2248.30 root type: union
│ ├─IndexRangeScan_41(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
│ ├─IndexRangeScan_42(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
│ └─Selection_44(Probe) 2248.30 cop[tikv] not(isnull(test.t1.c1)), or(lt(test.t1.c1, 10), and(lt(test.t1.c2, 10), lt(test.t1.c3, 10)))
│ └─TableRowIDScan_43 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
└─TableReader_48(Probe) 9990.00 root data:Selection_47
└─Selection_47 9990.00 cop[tikv] not(isnull(test.t1.c1))
└─TableFullScan_46 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
├─HashAgg_35(Build) 3245.34 root group by:test.t1.c1, funcs:firstrow(test.t1.c1)->test.t1.c1
│ └─IndexMerge_41 2248.30 root type: union
│ ├─IndexRangeScan_37(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
│ ├─IndexRangeScan_38(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
│ └─Selection_40(Probe) 2248.30 cop[tikv] not(isnull(test.t1.c1)), or(lt(test.t1.c1, 10), and(lt(test.t1.c2, 10), lt(test.t1.c3, 10)))
│ └─TableRowIDScan_39 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
└─TableReader_49(Probe) 9990.00 root data:Selection_48
└─Selection_48 9990.00 cop[tikv] not(isnull(test.t1.c1))
└─TableFullScan_47 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
delete from t1 where c1 in (select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 10 or c2 < 10 and c3 < 10) order by 1;
select * from t1;
c1 c2 c3
@@ -408,15 +408,15 @@ id estRows task access object operator info
Update_10 N/A root N/A
└─SelectLock_14 4056.68 root for update 0
└─HashJoin_30 4056.68 root inner join, equal:[eq(test.t1.c1, test.t1.c1)]
├─HashAgg_33(Build) 3245.34 root group by:test.t1.c1, funcs:firstrow(test.t1.c1)->test.t1.c1
│ └─IndexMerge_42 2248.30 root type: union
│ ├─IndexRangeScan_38(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
│ ├─IndexRangeScan_39(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
│ └─Selection_41(Probe) 2248.30 cop[tikv] not(isnull(test.t1.c1)), or(lt(test.t1.c1, 10), and(lt(test.t1.c2, 10), lt(test.t1.c3, 10)))
│ └─TableRowIDScan_40 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
└─TableReader_45(Probe) 9990.00 root data:Selection_44
└─Selection_44 9990.00 cop[tikv] not(isnull(test.t1.c1))
└─TableFullScan_43 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
├─HashAgg_32(Build) 3245.34 root group by:test.t1.c1, funcs:firstrow(test.t1.c1)->test.t1.c1
│ └─IndexMerge_38 2248.30 root type: union
│ ├─IndexRangeScan_34(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
│ ├─IndexRangeScan_35(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
│ └─Selection_37(Probe) 2248.30 cop[tikv] not(isnull(test.t1.c1)), or(lt(test.t1.c1, 10), and(lt(test.t1.c2, 10), lt(test.t1.c3, 10)))
│ └─TableRowIDScan_36 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
└─TableReader_46(Probe) 9990.00 root data:Selection_45
└─Selection_45 9990.00 cop[tikv] not(isnull(test.t1.c1))
└─TableFullScan_44 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
update t1 set c1 = 100, c2 = 100, c3 = 100 where c1 in (select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 10 or c2 < 10 and c3 < 10);
select * from t1;
c1 c2 c3
@@ -469,26 +469,27 @@ create table t1(c1 int, c2 int, c3 int, key(c1), key(c2));
insert into t1 values(1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5);
explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 order by 1 limit 1 offset 2;
id estRows task access object operator info
TopN_10 1.00 root test.t1.c1, offset:2, count:1
└─IndexMerge_23 1841.86 root type: union
├─IndexRangeScan_19(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
├─IndexRangeScan_20(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
└─Selection_22(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)
└─TableRowIDScan_21 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
TopN_9 1.00 root test.t1.c1, offset:2, count:1
└─IndexMerge_18 3.00 root type: union
├─IndexRangeScan_13(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
├─IndexRangeScan_14(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
└─TopN_17(Probe) 3.00 cop[tikv] test.t1.c1, offset:0, count:3
└─Selection_16 1841.86 cop[tikv] lt(test.t1.c3, 10)
└─TableRowIDScan_15 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 order by 1 limit 1 offset 2;
c1 c2 c3
3 3 3
///// GROUP BY
explain select /*+ use_index_merge(t1) */ sum(c1) from t1 where (c1 < 10 or c2 < 10) and c3 < 10 group by c1 order by 1;
id estRows task access object operator info
Sort_6 1473.49 root Column#5
└─HashAgg_11 1473.49 root group by:Column#13, funcs:sum(Column#12)->Column#5
└─Projection_22 1841.86 root cast(test.t1.c1, decimal(10,0) BINARY)->Column#12, test.t1.c1->Column#13
└─IndexMerge_20 1841.86 root type: union
├─IndexRangeScan_16(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
├─IndexRangeScan_17(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
└─Selection_19(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)
└─TableRowIDScan_18 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
└─HashAgg_10 1473.49 root group by:Column#13, funcs:sum(Column#12)->Column#5
└─Projection_23 1841.86 root cast(test.t1.c1, decimal(10,0) BINARY)->Column#12, test.t1.c1->Column#13
└─IndexMerge_16 1841.86 root type: union
├─IndexRangeScan_12(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
├─IndexRangeScan_13(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
└─Selection_15(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)
└─TableRowIDScan_14 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
select /*+ use_index_merge(t1) */ sum(c1) from t1 where (c1 < 10 or c2 < 10) and c3 < 10 group by c1 order by 1;
sum(c1)
1
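The new plan above shows the effect of this PR: ORDER BY c1 LIMIT 1 OFFSET 2 is now pushed into the IndexMerge probe side as a cop-level TopN with offset:0 and count:3, while the root TopN still applies the original offset. A minimal sketch of that limit rewrite, assuming illustrative types rather than TiDB's planner structures:

package main

import "fmt"

// pushedDownLimit is an illustrative offset/count pair, not TiDB's
// plannercore.PushedDownLimit.
type pushedDownLimit struct {
    Offset uint64
    Count  uint64
}

// pushBelow rewrites a root-side TopN (offset, count) into the limit a
// cop-side TopN must satisfy: the lower operator cannot skip rows on the
// root's behalf, so it returns the first offset+count rows with offset 0.
func pushBelow(rootOffset, rootCount uint64) pushedDownLimit {
    return pushedDownLimit{Offset: 0, Count: rootOffset + rootCount}
}

func main() {
    // ORDER BY c1 LIMIT 1 OFFSET 2 pushes a cop TopN with offset:0, count:3,
    // matching TopN_17(Probe) in the plan above.
    fmt.Println(pushBelow(2, 1)) // {0 3}
}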
20 changes: 18 additions & 2 deletions executor/builder.go
@@ -1445,7 +1445,23 @@
us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns)
us.handleCachedTable(b, x, sessionVars, startTS)
case *IndexMergeReaderExecutor:
// IndexMergeReader doesn't care order for now. So we will not set desc, useIndex and keepOrder.
if len(x.byItems) != 0 {
us.keepOrder = x.keepOrder
us.desc = x.byItems[0].Desc
for _, item := range x.byItems {
c, ok := item.Expr.(*expression.Column)
if !ok {
b.err = errors.Errorf("Not support non-column in orderBy pushed down")
return nil
}

(Codecov: added lines executor/builder.go#L1454-L1456 were not covered by tests)
for i, col := range x.columns {
if col.ID == c.ID {
us.usedIndex = append(us.usedIndex, i)
break
}
}
}
}
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
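The branch added above maps each pushed ORDER BY column to its offset in the reader's output schema so the UnionScan can compare rows during its merge. A self-contained sketch of the same ID-matching loop; columnInfo and orderByCol are simplified stand-ins for model.ColumnInfo and expression.Column:

package main

import (
    "errors"
    "fmt"
)

// columnInfo and orderByCol are simplified stand-ins for model.ColumnInfo
// and expression.Column; the builder matches them by column ID.
type columnInfo struct{ ID int64 }
type orderByCol struct{ ID int64 }

// usedIndexes returns, for each pushed ORDER BY column, its offset in the
// reader's output schema, as the new UnionScan branch computes usedIndex.
func usedIndexes(byItems []orderByCol, columns []columnInfo) ([]int, error) {
    used := make([]int, 0, len(byItems))
    for _, c := range byItems {
        idx := -1
        for i, col := range columns {
            if col.ID == c.ID {
                idx = i
                break
            }
        }
        if idx < 0 {
            return nil, errors.New("order-by column missing from output columns")
        }
        used = append(used, idx)
    }
    return used, nil
}

func main() {
    cols := []columnInfo{{ID: 1}, {ID: 2}, {ID: 3}}
    fmt.Println(usedIndexes([]orderByCol{{ID: 2}, {ID: 1}}, cols)) // [1 0] <nil>
}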
@@ -4201,7 +4217,7 @@
dataReaderBuilder: readerBuilder,
feedbacks: feedbacks,
paging: paging,
handleCols: ts.HandleCols,
handleCols: v.HandleCols,
isCorColInPartialFilters: isCorColInPartialFilters,
isCorColInTableFilter: isCorColInTableFilter,
isCorColInPartialAccess: isCorColInPartialAccess,
74 changes: 58 additions & 16 deletions executor/index_merge_reader.go
@@ -151,7 +151,7 @@
// parTblIdx are only used in indexMergeProcessWorker.fetchLoopIntersection.
parTblIdx int

// partialPlanID are only used for indexMergeProcessWorker.fetchLoopUnionWithOrderByAndPushedLimit.
// partialPlanID are only used for indexMergeProcessWorker.fetchLoopUnionWithOrderBy.
partialPlanID int
}

@@ -297,9 +297,12 @@
util.WithRecovery(
func() {
if e.isIntersection {
if e.pushedLimit != nil || e.keepOrder {
panic("Not support intersection with pushedLimit or keepOrder = true")

(Codecov: added line executor/index_merge_reader.go#L301 was not covered by tests)
}
idxMergeProcessWorker.fetchLoopIntersection(ctx, fetch, workCh, e.resultCh, e.finished)
} else if e.pushedLimit != nil && len(e.byItems) != 0 {
idxMergeProcessWorker.fetchLoopUnionWithOrderByAndPushedLimit(ctx, fetch, workCh, e.resultCh, e.finished)
} else if len(e.byItems) != 0 {
idxMergeProcessWorker.fetchLoopUnionWithOrderBy(ctx, fetch, workCh, e.resultCh, e.finished)
} else {
idxMergeProcessWorker.fetchLoopUnion(ctx, fetch, workCh, e.resultCh, e.finished)
}
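The dispatch above collapses the old limit-specific branch: any pushed ORDER BY now selects the order-aware union loop, with or without a pushed limit, and intersection still refuses both. A small sketch of that selection logic, with hypothetical names for the flags:

package main

import (
    "errors"
    "fmt"
)

// chooseFetchLoop mirrors the dispatch above; the flag names are
// hypothetical. Intersection merges support neither a pushed limit nor
// keep-order, and a union merge switches to the order-aware loop whenever
// ORDER BY items were pushed down.
func chooseFetchLoop(isIntersection, hasByItems, hasPushedLimit bool) (string, error) {
    switch {
    case isIntersection && (hasByItems || hasPushedLimit):
        return "", errors.New("intersection with pushed limit or keep order is unsupported")
    case isIntersection:
        return "fetchLoopIntersection", nil
    case hasByItems:
        return "fetchLoopUnionWithOrderBy", nil
    default:
        return "fetchLoopUnion", nil
    }
}

func main() {
    loop, err := chooseFetchLoop(false, true, true)
    fmt.Println(loop, err) // fetchLoopUnionWithOrderBy <nil>
}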
@@ -341,6 +344,7 @@
util.WithRecovery(
func() {
failpoint.Inject("testIndexMergePanicPartialIndexWorker", nil)
is := e.partialPlans[workID][0].(*plannercore.PhysicalIndexScan)
worker := &partialIndexWorker{
stats: e.stats,
idxID: e.getPartitalPlanID(workID),
@@ -353,10 +357,9 @@
memTracker: e.memTracker,
partitionTableMode: e.partitionTableMode,
prunedPartitions: e.prunedPartitions,
byItems: e.byItems,
Review comment (Member): Looks like we don't need to change this line. Because we set PhysicalIndexMergeReader.ByItems in Init() and then IndexMergeReaderExecutor.byItems in buildNoRangeIndexMergeReader, they end up the same.

Review comment (Member, Author): Yeah, I want to do the remaining part in a later pull request.
byItems: is.ByItems,
pushedLimit: e.pushedLimit,
}

if e.isCorColInPartialFilters[workID] {
// We got correlated column, so need to refresh Selection operator.
var err error
@@ -471,7 +474,7 @@
ranges: e.ranges[workID],
netDataSize: e.partialNetDataSizes[workID],
keepOrder: ts.KeepOrder,
byItems: e.byItems,
byItems: ts.ByItems,
}

worker := &partialTableWorker{
@@ -484,7 +487,7 @@
memTracker: e.memTracker,
partitionTableMode: e.partitionTableMode,
prunedPartitions: e.prunedPartitions,
byItems: e.byItems,
byItems: ts.ByItems,
pushedLimit: e.pushedLimit,
}

@@ -924,7 +927,9 @@
}

type handleHeap struct {
// requiredCnt == 0 means all handles are needed
requiredCnt uint64
tracker *memory.Tracker
taskMap map[int][]*indexMergeTableTask

idx []rowIdx
@@ -961,23 +966,34 @@
func (h *handleHeap) Push(x interface{}) {
idx := x.(rowIdx)
h.idx = append(h.idx, idx)
if h.tracker != nil {
h.tracker.Consume(int64(unsafe.Sizeof(h.idx)))
}
}

func (h *handleHeap) Pop() interface{} {
idxRet := h.idx[len(h.idx)-1]
h.idx = h.idx[:len(h.idx)-1]
if h.tracker != nil {
h.tracker.Consume(-int64(unsafe.Sizeof(h.idx)))
}
return idxRet
}

func (w *indexMergeProcessWorker) NewHandleHeap(taskMap map[int][]*indexMergeTableTask) *handleHeap {
func (w *indexMergeProcessWorker) NewHandleHeap(taskMap map[int][]*indexMergeTableTask, memTracker *memory.Tracker) *handleHeap {
compareFuncs := make([]chunk.CompareFunc, 0, len(w.indexMerge.byItems))
for _, item := range w.indexMerge.byItems {
keyType := item.Expr.GetType()
compareFuncs = append(compareFuncs, chunk.GetCompareFunc(keyType))
}
requiredCnt := w.indexMerge.pushedLimit.Count + w.indexMerge.pushedLimit.Offset

requiredCnt := uint64(0)
if w.indexMerge.pushedLimit != nil {
requiredCnt = mathutil.Max(requiredCnt, w.indexMerge.pushedLimit.Count+w.indexMerge.pushedLimit.Offset)
}
return &handleHeap{
requiredCnt: requiredCnt,
tracker: memTracker,
taskMap: taskMap,
idx: make([]rowIdx, 0, requiredCnt),
compareFunc: compareFuncs,
@@ -1004,7 +1020,7 @@
}
}

func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderByAndPushedLimit(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderBy(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) {
memTracker := memory.NewTracker(w.indexMerge.ID(), -1)
memTracker.AttachTo(w.indexMerge.memTracker)
@@ -1021,8 +1037,7 @@
distinctHandles := kv.NewHandleMap()
taskMap := make(map[int][]*indexMergeTableTask)
uselessMap := make(map[int]struct{})
taskHeap := w.NewHandleHeap(taskMap)
memTracker.Consume(int64(taskHeap.requiredCnt) * int64(unsafe.Sizeof(rowIdx{0, 0, 0})))
taskHeap := w.NewHandleHeap(taskMap, memTracker)

for task := range fetchCh {
select {
@@ -1038,15 +1053,15 @@
continue
}
if _, ok := taskMap[task.partialPlanID]; !ok {
taskMap[task.partialPlanID] = make([]*indexMergeTableTask, 0)
taskMap[task.partialPlanID] = make([]*indexMergeTableTask, 0, 1)
}
w.pruneTableWorkerTaskIdxRows(task)
taskMap[task.partialPlanID] = append(taskMap[task.partialPlanID], task)
for i, h := range task.handles {
if _, ok := distinctHandles.Get(h); !ok {
distinctHandles.Set(h, true)
heap.Push(taskHeap, rowIdx{task.partialPlanID, len(taskMap[task.partialPlanID]) - 1, i})
if taskHeap.Len() > int(taskHeap.requiredCnt) {
if int(taskHeap.requiredCnt) != 0 && taskHeap.Len() > int(taskHeap.requiredCnt) {
top := heap.Pop(taskHeap).(rowIdx)
if top.partialID == task.partialPlanID && top.taskID == len(taskMap[task.partialPlanID])-1 && top.rowID == i {
uselessMap[task.partialPlanID] = struct{}{}
@@ -1067,7 +1082,10 @@
}
}

needCount := mathutil.Max(0, taskHeap.Len()-int(w.indexMerge.pushedLimit.Offset))
needCount := taskHeap.Len()
if w.indexMerge.pushedLimit != nil {
needCount = mathutil.Max(0, taskHeap.Len()-int(w.indexMerge.pushedLimit.Offset))
}
if needCount == 0 {
return
}
@@ -1125,10 +1143,17 @@
defer close(workCh)
failpoint.Inject("testIndexMergePanicProcessWorkerUnion", nil)

var pushedLimit *plannercore.PushedDownLimit
if w.indexMerge.pushedLimit != nil {
pushedLimit = w.indexMerge.pushedLimit.Clone()
}
distinctHandles := make(map[int64]*kv.HandleMap)
for {
var ok bool
var task *indexMergeTableTask
if pushedLimit != nil && pushedLimit.Count == 0 {
return
}
select {
case <-ctx.Done():
return
@@ -1175,6 +1200,23 @@
if len(fhs) == 0 {
continue
}
if pushedLimit != nil {
fhsLen := uint64(len(fhs))
// The number of handles is no more than the offset; discard them all.
if fhsLen <= pushedLimit.Offset {
pushedLimit.Offset -= fhsLen
continue

(Codecov: added lines executor/index_merge_reader.go#L1207-L1208 were not covered by tests)
}
fhs = fhs[pushedLimit.Offset:]
pushedLimit.Offset = 0

fhsLen = uint64(len(fhs))
// The number of handles is greater than the remaining count; keep only that many.
if fhsLen > pushedLimit.Count {
fhs = fhs[:pushedLimit.Count]
}

(Codecov: added lines executor/index_merge_reader.go#L1216-L1217 were not covered by tests)
pushedLimit.Count -= mathutil.Min(pushedLimit.Count, fhsLen)
}
task = &indexMergeTableTask{
lookupTableTask: lookupTableTask{
handles: fhs,
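When no ORDER BY is pushed, fetchLoopUnion now consumes a cloned pushed-down limit batch by batch: whole batches inside the offset are discarded, the first surviving batch is sliced at the offset, and the count is decremented until it reaches zero. A self-contained sketch of that bookkeeping, using []int in place of kv.Handle:

package main

import "fmt"

// trimBatch applies a cloned pushed-down limit to one batch of handles,
// mirroring the offset/count bookkeeping added to fetchLoopUnion. The
// []int handles and uint64 pointers are simplified stand-ins for
// kv.Handle slices and plannercore.PushedDownLimit fields.
func trimBatch(handles []int, offset, count *uint64) []int {
    n := uint64(len(handles))
    // The whole batch falls inside the offset: discard it.
    if n <= *offset {
        *offset -= n
        return nil
    }
    handles = handles[*offset:]
    *offset = 0
    n = uint64(len(handles))
    // Keep at most the remaining count.
    if n > *count {
        handles = handles[:*count]
        n = *count
    }
    *count -= n
    return handles
}

func main() {
    offset, count := uint64(3), uint64(4)
    for _, batch := range [][]int{{1, 2}, {3, 4, 5}, {6, 7, 8}} {
        fmt.Println(trimBatch(batch, &offset, &count)) // [], [4 5], [6 7]
    }
}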
@@ -1748,7 +1790,7 @@
if err != nil {
return err
}
if physicalTableIDIdx != -1 {
if w.indexMergeExec.partitionTableMode && physicalTableIDIdx != -1 {
handle = kv.NewPartitionHandle(row.GetInt64(physicalTableIDIdx), handle)
}
rowIdx, _ := task.indexOrder.Get(handle)
25 changes: 24 additions & 1 deletion executor/mem_reader.go
@@ -780,6 +780,9 @@
partitionMode bool // if it is accessing a partition table
partitionTables []table.PhysicalTable // partition tables to access
partitionKVRanges [][][]kv.KeyRange // kv ranges for these partition tables

keepOrder bool
compareExec
}

func buildMemIndexMergeReader(ctx context.Context, us *UnionScanExec, indexMergeReader *IndexMergeReaderExecutor) *memIndexMergeReader {
@@ -831,6 +834,9 @@
partitionMode: indexMergeReader.partitionTableMode,
partitionTables: indexMergeReader.prunedPartitions,
partitionKVRanges: indexMergeReader.partitionKeyRanges,

keepOrder: us.keepOrder,
compareExec: us.compareExec,
}
}

@@ -921,7 +927,24 @@
},
}

return memTblReader.getMemRows(ctx)
rows, err := memTblReader.getMemRows(ctx)
if err != nil {
return nil, err
}

(Codecov: added lines executor/mem_reader.go#L932-L933 were not covered by tests)

// We didn't set keepOrder = true for memTblReader;
// in indexMerge, rows from non-partitioned tables also need to be reordered.
if m.keepOrder {
slices.SortFunc(rows, func(a, b []types.Datum) bool {
ret, err1 := m.compare(m.ctx.GetSessionVars().StmtCtx, a, b)
if err1 != nil {
err = err1
}
return ret

(Codecov: added lines executor/mem_reader.go#L939-L943 were not covered by tests)
})
}

return rows, err
}
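Because memTblReader is built without keepOrder, getMemRows now re-sorts the in-memory rows itself whenever the union scan must keep order, using the compareExec copied from the UnionScan. A minimal sketch of that post-sort, assuming rows of ints and a plain comparator instead of datum comparison:

package main

import (
    "fmt"
    "sort"
)

// reorderRows re-sorts in-memory rows with a comparator, the same post-step
// getMemRows performs when keepOrder is set. A row here is a plain []int and
// the comparator orders by the first column; the real code compares datums
// over the pushed ORDER BY columns via compareExec.
func reorderRows(rows [][]int, less func(a, b []int) bool) {
    sort.Slice(rows, func(i, j int) bool { return less(rows[i], rows[j]) })
}

func main() {
    rows := [][]int{{3, 30}, {1, 10}, {2, 20}}
    reorderRows(rows, func(a, b []int) bool { return a[0] < b[0] })
    fmt.Println(rows) // [[1 10] [2 20] [3 30]]
}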

// Union all handles of all partial paths.
2 changes: 1 addition & 1 deletion executor/test/indexmergereadtest/BUILD.bazel
@@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 30,
shard_count = 33,
deps = [
"//config",
"//meta/autoid",