planner: add limit control for intersection case of index merge reader (#46862)

close #46863
AilinKid committed Sep 13, 2023
1 parent a565852 commit 3ad4f74
Showing 8 changed files with 269 additions and 62 deletions.
224 changes: 190 additions & 34 deletions executor/index_merge_reader.go
@@ -296,8 +296,9 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont
util.WithRecovery(
func() {
if e.isIntersection {
if e.pushedLimit != nil || e.keepOrder {
panic("Not support intersection with pushedLimit or keepOrder = true")
if e.keepOrder {
// TODO: implement fetchLoopIntersectionWithOrderBy if necessary.
panic("Not support intersection with keepOrder = true")
}
idxMergeProcessWorker.fetchLoopIntersection(ctx, fetch, workCh, e.resultCh, e.finished)
} else if len(e.byItems) != 0 {
@@ -306,7 +307,7 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont
idxMergeProcessWorker.fetchLoopUnion(ctx, fetch, workCh, e.resultCh, e.finished)
}
},
handleWorkerPanic(ctx, e.finished, e.resultCh, nil, processWorkerType),
handleWorkerPanic(ctx, e.finished, nil, e.resultCh, nil, processWorkerType),
)
e.processWorkerWg.Done()
}()
@@ -337,6 +338,13 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
return errors.New("inject an error before start partialIndexWorker")
})

// For the union case, the pushed-down limit can be used to restrict the handles fetched by each index.
// For the intersection case, the limit can only be applied after all index/table paths have finished.
pushedIndexLimit := e.pushedLimit
if e.isIntersection {
pushedIndexLimit = nil
}
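Setting pushedIndexLimit to nil here is the crux of the fix: for an intersection, truncating each partial path early can drop handles that the other paths would still match, so the limit may only be applied after the intersection. A minimal standalone Go sketch (not part of this diff; values are illustrative) of that effect:

package main

import "fmt"

// intersect returns the handles that appear in both inputs.
func intersect(a, b []int) []int {
	seen := make(map[int]struct{}, len(a))
	for _, h := range a {
		seen[h] = struct{}{}
	}
	var out []int
	for _, h := range b {
		if _, ok := seen[h]; ok {
			out = append(out, h)
		}
	}
	return out
}

func main() {
	idxA := []int{1, 2, 3, 4, 5} // handles from partial index path A
	idxB := []int{4, 5, 6, 7, 8} // handles from partial index path B
	limit := 2

	// Correct: intersect first, then apply the limit.
	full := intersect(idxA, idxB)
	if len(full) > limit {
		full = full[:limit]
	}
	fmt.Println(full) // [4 5]

	// Wrong: limit each partial path first, then intersect.
	fmt.Println(intersect(idxA[:limit], idxB[:limit])) // [] -- rows are lost
}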

go func() {
defer trace.StartRegion(ctx, "IndexMergePartialIndexWorker").End()
defer e.idxWorkerWg.Done()
@@ -357,7 +365,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
partitionTableMode: e.partitionTableMode,
prunedPartitions: e.prunedPartitions,
byItems: is.ByItems,
pushedLimit: e.pushedLimit,
pushedLimit: pushedIndexLimit,
}
if e.isCorColInPartialFilters[workID] {
// We got correlated column, so need to refresh Selection operator.
@@ -432,7 +440,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
_, _ = worker.fetchHandles(ctx1, results, exitCh, fetchCh, e.finished, e.handleCols, workID)
cancel()
},
handleWorkerPanic(ctx, e.finished, fetchCh, nil, partialIndexWorkerType),
handleWorkerPanic(ctx, e.finished, nil, fetchCh, nil, partialIndexWorkerType),
)
}()

@@ -451,6 +459,13 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
tbls = append(tbls, e.table)
}

// For the union case, the pushed-down limit can be used to restrict the handles fetched by each index.
// For the intersection case, the limit can only be applied after all index/table paths have finished.
pushedTableLimit := e.pushedLimit
if e.isIntersection {
pushedTableLimit = nil
}

go func() {
defer trace.StartRegion(ctx, "IndexMergePartialTableWorker").End()
defer e.idxWorkerWg.Done()
@@ -483,7 +498,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
partitionTableMode: e.partitionTableMode,
prunedPartitions: e.prunedPartitions,
byItems: ts.ByItems,
pushedLimit: e.pushedLimit,
pushedLimit: pushedTableLimit,
}

if len(e.prunedPartitions) != 0 && len(e.byItems) != 0 {
@@ -550,7 +565,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
}
}
},
handleWorkerPanic(ctx, e.finished, fetchCh, nil, partialTableWorkerType),
handleWorkerPanic(ctx, e.finished, nil, fetchCh, nil, partialTableWorkerType),
)
}()
return nil
@@ -852,7 +867,7 @@ func (e *IndexMergeReaderExecutor) getResultTask(ctx context.Context) (*indexMer
return e.resultCurr, nil
}

func handleWorkerPanic(ctx context.Context, finished <-chan struct{}, ch chan<- *indexMergeTableTask, extraNotifyCh chan bool, worker string) func(r interface{}) {
func handleWorkerPanic(ctx context.Context, finished, limitDone <-chan struct{}, ch chan<- *indexMergeTableTask, extraNotifyCh chan bool, worker string) func(r interface{}) {
return func(r interface{}) {
if worker == processWorkerType {
// There is only one processWorker, so it's safe to close here.
@@ -882,6 +897,9 @@ func handleWorkerPanic(ctx context.Context, finished <-chan struct{}, ch chan<-
return
case <-finished:
return
case <-limitDone:
// Once the process worker has recovered from a panic, seeing the limitDone signal means we can simply return.
return
case ch <- task:
return
}
@@ -1125,6 +1143,25 @@ func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderBy(ctx context.Context,
}
}

func pushedLimitCountingDown(pushedLimit *plannercore.PushedDownLimit, handles []kv.Handle) (next bool, res []kv.Handle) {
fhsLen := uint64(len(handles))
// The number of handles is no more than the offset; discard all of them.
if fhsLen <= pushedLimit.Offset {
pushedLimit.Offset -= fhsLen
return true, nil
}
handles = handles[pushedLimit.Offset:]
pushedLimit.Offset = 0

fhsLen = uint64(len(handles))
// The number of handles exceeds the remaining count; keep only Count of them.
if fhsLen > pushedLimit.Count {
handles = handles[:pushedLimit.Count]
}
pushedLimit.Count -= mathutil.Min(pushedLimit.Count, fhsLen)
return false, handles
}
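pushedLimitCountingDown consumes the offset across incoming batches first and then the count. A standalone sketch (not part of this diff; a simplified struct stands in for plannercore.PushedDownLimit) of how LIMIT 3,4 plays out over three handle batches:

package main

import "fmt"

type pushedDownLimit struct{ Offset, Count uint64 }

// countDown mirrors the counting-down logic of pushedLimitCountingDown above.
func countDown(l *pushedDownLimit, handles []int) (skip bool, kept []int) {
	n := uint64(len(handles))
	if n <= l.Offset {
		l.Offset -= n
		return true, nil // the whole batch falls inside the offset
	}
	handles = handles[l.Offset:]
	l.Offset = 0
	n = uint64(len(handles))
	if n > l.Count {
		handles = handles[:l.Count]
	}
	if n < l.Count {
		l.Count -= n
	} else {
		l.Count = 0
	}
	return false, handles
}

func main() {
	limit := &pushedDownLimit{Offset: 3, Count: 4} // LIMIT 3, 4
	for _, batch := range [][]int{{1, 2}, {3, 4, 5}, {6, 7, 8, 9}} {
		skip, kept := countDown(limit, batch)
		fmt.Printf("skip=%v kept=%v remaining=%+v\n", skip, kept, *limit)
	}
	// skip=true kept=[] remaining={Offset:1 Count:4}
	// skip=false kept=[4 5] remaining={Offset:0 Count:2}
	// skip=false kept=[6 7] remaining={Offset:0 Count:0}
}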

func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) {
failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) {
@@ -1194,21 +1231,11 @@ func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-
continue
}
if pushedLimit != nil {
fhsLen := uint64(len(fhs))
// The number of handles is less than the offset, discard all handles.
if fhsLen <= pushedLimit.Offset {
pushedLimit.Offset -= fhsLen
next, res := pushedLimitCountingDown(pushedLimit, fhs)
if next {
continue
}
fhs = fhs[pushedLimit.Offset:]
pushedLimit.Offset = 0

fhsLen = uint64(len(fhs))
// The number of handles is greater than the limit, only keep limit count.
if fhsLen > pushedLimit.Count {
fhs = fhs[:pushedLimit.Count]
}
pushedLimit.Count -= mathutil.Min(pushedLimit.Count, fhsLen)
fhs = res
}
task = &indexMergeTableTask{
lookupTableTask: lookupTableTask{
@@ -1249,6 +1276,62 @@ func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-
}
}

// intersectionCollectWorker is used to dispatch index merge table tasks to the original workCh and resultCh.
// It acts as an interceptor that enforces the pushed-down limit restriction (with no expected performance impact).
type intersectionCollectWorker struct {
pushedLimit *plannercore.PushedDownLimit
collectCh chan *indexMergeTableTask
limitDone chan struct{}
}

func (w *intersectionCollectWorker) doIntersectionLimitAndDispatch(ctx context.Context, workCh chan<- *indexMergeTableTask,
resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) {
var (
ok bool
task *indexMergeTableTask
)
for {
select {
case <-ctx.Done():
return
case <-finished:
return
case task, ok = <-w.collectCh:
if !ok {
return
}
// Received a new intersection task; apply the limit restriction logic here.
if w.pushedLimit != nil {
if w.pushedLimit.Count == 0 {
// Close the limitDone channel to notify all N intersectionProcessWorkers to exit.
close(w.limitDone)
return
}
next, handles := pushedLimitCountingDown(w.pushedLimit, task.handles)
if next {
continue
}
task.handles = handles
}
// dispatch the new task to workCh and resultCh.
select {
case <-ctx.Done():
return
case <-finished:
return
case workCh <- task:
select {
case <-ctx.Done():
return
case <-finished:
return
case resultCh <- task:
}
}
}
}
}
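close(w.limitDone) above works because closing a channel is observed by every goroutine selecting on it, so a single close lets the collect worker stop all N intersection process workers at once. A minimal standalone sketch (not from this commit) of that broadcast pattern:

package main

import (
	"fmt"
	"sync"
)

func main() {
	limitDone := make(chan struct{})
	taskCh := make(chan int)

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case <-limitDone: // broadcast: every worker sees the close
					fmt.Printf("worker %d: limit reached, exiting\n", id)
					return
				case t := <-taskCh:
					fmt.Printf("worker %d: got task %d\n", id, t)
				}
			}
		}(i)
	}

	taskCh <- 1
	taskCh <- 2
	close(limitDone) // the "collector" decides enough rows were produced
	wg.Wait()
}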

type intersectionProcessWorker struct {
// key: parTblIdx, val: HandleMap
// Value of MemAwareHandleMap is *int to avoid extra Get().
@@ -1270,7 +1353,9 @@ func (w *intersectionProcessWorker) consumeMemDelta() {
w.rowDelta = 0
}

func (w *intersectionProcessWorker) doIntersectionPerPartition(ctx context.Context, workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) {
// doIntersectionPerPartition fetches all the tasks from its worker channel and only then does the intersection pruning,
// which means a lot of time is spent waiting for all the fetch tasks to finish.
func (w *intersectionProcessWorker) doIntersectionPerPartition(ctx context.Context, workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished, limitDone <-chan struct{}) {
failpoint.Inject("testIndexMergePanicPartitionTableIntersectionWorker", nil)
defer w.memTracker.Detach()

@@ -1344,10 +1429,12 @@ func (w *intersectionProcessWorker) doIntersectionPerPartition(ctx context.Conte
}
}
failpoint.Inject("testIndexMergeProcessWorkerIntersectionHang", func(_ failpoint.Value) {
for i := 0; i < cap(resultCh); i++ {
select {
case resultCh <- &indexMergeTableTask{}:
default:
if resultCh != nil {
for i := 0; i < cap(resultCh); i++ {
select {
case resultCh <- &indexMergeTableTask{}:
default:
}
}
}
})
@@ -1357,18 +1444,50 @@ func (w *intersectionProcessWorker) doIntersectionPerPartition(ctx context.Conte
return
case <-finished:
return
case <-limitDone:
// A signal on limitDone means the collectWorker has already collected enough results; shut the process workers down quickly here.
return
case workCh <- task:
select {
case <-ctx.Done():
return
case <-finished:
return
case resultCh <- task:
// resultCh != nil means there is no collectWorker, so we should also send the task to resultCh ourselves here.
if resultCh != nil {
select {
case <-ctx.Done():
return
case <-finished:
return
case resultCh <- task:
}
}
}
}
}

// For this, every index merge process worker should be fed a sortedSelectResult for every partial index plan (constructed over all
// table partition range results on that index path). Since every partial index path then yields a sorted select result, we can use a
// K-way merge to accelerate the intersection process.
//
// partialIndexPlan-1 ---> SSR ---> +
// partialIndexPlan-2 ---> SSR ---> + ---> SSR K-way Merge ---> output IndexMergeTableTask
// partialIndexPlan-3 ---> SSR ---> +
// ... +
// partialIndexPlan-N ---> SSR ---> +
//
// K-way merge detail: every partial index plan outputs one row as its current representative row. Then the N representative
// rows are compared together:
//
// Loop start:
//
// case 1: they are all the same, so the intersection succeeds. --- Record the current handle (already in index order).
// case 2: they differ; take the index plan(s) whose representative holds the minimum value. --- Discard that representative row and fetch the next one.
//
// goto Loop start:
//
// Finally, encapsulate all the recorded handles (already in index order) as index merge table tasks and send them out.
func (*indexMergeProcessWorker) fetchLoopIntersectionWithOrderBy(_ context.Context, _ <-chan *indexMergeTableTask,
_ chan<- *indexMergeTableTask, _ chan<- *indexMergeTableTask, _ <-chan struct{}) {
// TODO: push the sort property down together with the partial index plans and the limit.
}
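The TODO above is left unimplemented by this commit; as a hedged illustration only, the standalone sketch below runs the K-way merge described in the comment over sorted int slices standing in for the per-path sorted select results:

package main

import "fmt"

// intersectSorted advances a cursor per path; when all heads are equal the
// handle is in every path and is emitted in order, otherwise the minimum
// heads are discarded and the next rows are fetched.
func intersectSorted(paths [][]int) []int {
	if len(paths) == 0 {
		return nil
	}
	cursors := make([]int, len(paths))
	var out []int
	for {
		allEqual := true
		minVal := 0
		for i, p := range paths {
			if cursors[i] >= len(p) {
				return out // any exhausted path ends the intersection
			}
			v := p[cursors[i]]
			if i == 0 || v < minVal {
				minVal = v
			}
			if v != paths[0][cursors[0]] {
				allEqual = false
			}
		}
		if allEqual {
			out = append(out, minVal)
			for i := range cursors {
				cursors[i]++
			}
			continue
		}
		for i, p := range paths {
			if p[cursors[i]] == minVal {
				cursors[i]++
			}
		}
	}
}

func main() {
	fmt.Println(intersectSorted([][]int{
		{1, 3, 5, 7, 9},
		{3, 4, 5, 9, 10},
		{2, 3, 5, 8, 9},
	})) // [3 5 9]
}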

// For each partition(dynamic mode), a map is used to do intersection. Key of the map is handle, and value is the number of times it occurs.
// If the value of handle equals the number of partial paths, it should be sent to final_table_scan_worker.
// To avoid too many goroutines, each intersectionProcessWorker can handle multiple partitions.
@@ -1404,8 +1523,25 @@ func (w *indexMergeProcessWorker) fetchLoopIntersection(ctx context.Context, fet
})

workers := make([]*intersectionProcessWorker, 0, workerCnt)
var collectWorker *intersectionCollectWorker
wg := util.WaitGroupWrapper{}
wg2 := util.WaitGroupWrapper{}
errCh := make(chan bool, workerCnt)
var limitDone chan struct{}
if w.indexMerge.pushedLimit != nil {
// There is no extra memory cost for this logic.
collectWorker = &intersectionCollectWorker{
// same size as workCh/resultCh
collectCh: make(chan *indexMergeTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize)),
pushedLimit: w.indexMerge.pushedLimit.Clone(),
limitDone: make(chan struct{}),
}
limitDone = collectWorker.limitDone
wg2.RunWithRecover(func() {
defer trace.StartRegion(ctx, "IndexMergeIntersectionProcessWorker").End()
collectWorker.doIntersectionLimitAndDispatch(ctx, workCh, resultCh, finished)
}, handleWorkerPanic(ctx, finished, nil, resultCh, errCh, partTblIntersectionWorkerType))
}
for i := 0; i < workerCnt; i++ {
tracker := memory.NewTracker(w.indexMerge.ID(), -1)
tracker.AttachTo(w.indexMerge.memTracker)
@@ -1419,15 +1555,35 @@ func (w *indexMergeProcessWorker) fetchLoopIntersection(ctx context.Context, fet
}
wg.RunWithRecover(func() {
defer trace.StartRegion(ctx, "IndexMergeIntersectionProcessWorker").End()
worker.doIntersectionPerPartition(ctx, workCh, resultCh, finished)
}, handleWorkerPanic(ctx, finished, resultCh, errCh, partTblIntersectionWorkerType))
if collectWorker != nil {
// workflow:
// intersectionProcessWorker-1 --+ (limit restriction logic)
// intersectionProcessWorker-2 --+--------- collectCh--> intersectionCollectWorker +--> workCh --> table worker
// ... --+ <--- limitDone to shut inputs ------+ +-> resultCh --> upper parent
// intersectionProcessWorker-N --+
worker.doIntersectionPerPartition(ctx, collectWorker.collectCh, nil, finished, collectWorker.limitDone)
} else {
// workflow:
// intersectionProcessWorker-1 --------------------------+--> workCh --> table worker
// intersectionProcessWorker-2 ---(same as above) +--> resultCh --> upper parent
// ... ---(same as above)
// intersectionProcessWorker-N ---(same as above)
worker.doIntersectionPerPartition(ctx, workCh, resultCh, finished, nil)
}
}, handleWorkerPanic(ctx, finished, limitDone, resultCh, errCh, partTblIntersectionWorkerType))
workers = append(workers, worker)
}
defer func() {
for _, processWorker := range workers {
close(processWorker.workerCh)
}
wg.Wait()
// Only after all possible writers have exited can we close collectCh.
if collectWorker != nil {
// The channel does not need to be drained before closing it, so any remaining tasks are simply discarded.
close(collectWorker.collectCh)
}
wg2.Wait()
}()
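The defer above encodes a strict shutdown order: a channel may only be closed after every goroutine that writes to it has returned, so the process workers (wg) are waited on before collectCh is closed, and the collect worker (wg2) is waited on last. A standalone sketch (not from this commit) of that ordering:

package main

import (
	"fmt"
	"sync"
)

func main() {
	collectCh := make(chan int, 4)

	var producers sync.WaitGroup // stands in for wg (the process workers)
	for i := 0; i < 3; i++ {
		producers.Add(1)
		go func(id int) {
			defer producers.Done()
			collectCh <- id // writer
		}(i)
	}

	var collector sync.WaitGroup // stands in for wg2 (the collect worker)
	collector.Add(1)
	go func() {
		defer collector.Done()
		for v := range collectCh { // exits once collectCh is closed and drained
			fmt.Println("collected", v)
		}
	}()

	producers.Wait() // all writers are done: only now is it safe to close
	close(collectCh)
	collector.Wait()
}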
for {
var ok bool
2 changes: 1 addition & 1 deletion executor/test/indexmergereadtest/BUILD.bazel
@@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 35,
shard_count = 36,
deps = [
"//config",
"//executor",
