Skip to content

Commit

Permalink
executor: fix query hang in IndexMerge Executor when it's killed (pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and windtalker committed Mar 12, 2024
1 parent 6eb3c2c commit 9797ccb
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 6 deletions.
24 changes: 18 additions & 6 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ import (

var (
_ Executor = &IndexMergeReaderExecutor{}

// IndexMergeCancelFuncForTest is used just for test
IndexMergeCancelFuncForTest func()
)

const (
Expand Down Expand Up @@ -710,7 +713,7 @@ func (e *IndexMergeReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) e

req.Reset()
for {
resultTask, err := e.getResultTask()
resultTask, err := e.getResultTask(ctx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -728,7 +731,7 @@ func (e *IndexMergeReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) e
}
}

func (e *IndexMergeReaderExecutor) getResultTask() (*lookupTableTask, error) {
func (e *IndexMergeReaderExecutor) getResultTask(ctx context.Context) (*lookupTableTask, error) {
failpoint.Inject("testIndexMergeMainReturnEarly", func(_ failpoint.Value) {
// To make sure processWorker make resultCh to be full.
// When main goroutine close finished, processWorker may be stuck when writing resultCh.
Expand All @@ -742,8 +745,14 @@ func (e *IndexMergeReaderExecutor) getResultTask() (*lookupTableTask, error) {
if !ok {
return nil, nil
}
if err := <-task.doneCh; err != nil {
return nil, errors.Trace(err)

select {
case <-ctx.Done():
return nil, errors.Trace(ctx.Err())
case err := <-task.doneCh:
if err != nil {
return nil, errors.Trace(err)
}
}

// Release the memory usage of last task before we handle a new task.
Expand Down Expand Up @@ -887,13 +896,16 @@ func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan
return
case <-finished:
return
case workCh <- task:
case resultCh <- task:
failpoint.Inject("testCancelContext", func() {
IndexMergeCancelFuncForTest()
})
select {
case <-ctx.Done():
return
case <-finished:
return
case resultCh <- task:
case workCh <- task:
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions executor/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package executor_test

import (
"context"
"fmt"
"math/rand"
"regexp"
Expand All @@ -24,7 +25,9 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -731,3 +734,28 @@ func TestIndexMergeProcessWorkerHang(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeMainReturnEarly"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeProcessWorkerUnionHang"))
}

func TestIndexMergeReaderIssue45279(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("drop table if exists reproduce;")
tk.MustExec("CREATE TABLE reproduce (c1 int primary key, c2 int, c3 int, key ci2(c2), key ci3(c3));")
tk.MustExec("insert into reproduce values (1, 1, 1), (2, 2, 2), (3, 3, 3);")
tk.MustQuery("explain select * from reproduce where c1 in (0, 1, 2, 3) or c2 in (0, 1, 2);").Check(testkit.Rows(
"IndexMerge_11 33.99 root ",
"├─TableRangeScan_8(Build) 4.00 cop[tikv] table:reproduce range:[0,0], [1,1], [2,2], [3,3], keep order:false, stats:pseudo",
"├─IndexRangeScan_9(Build) 30.00 cop[tikv] table:reproduce, index:ci2(c2) range:[0,0], [1,1], [2,2], keep order:false, stats:pseudo",
"└─TableRowIDScan_10(Probe) 33.99 cop[tikv] table:reproduce keep order:false, stats:pseudo"))

// This function should return successfully
var ctx context.Context
ctx, executor.IndexMergeCancelFuncForTest = context.WithCancel(context.Background())
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testCancelContext", "return()"))
rs, err := tk.ExecWithContext(ctx, "select * from reproduce where c1 in (0, 1, 2, 3) or c2 in (0, 1, 2);")
require.NoError(t, err)
session.ResultSetToStringSlice(ctx, tk.Session(), rs)
failpoint.Disable("github.com/pingcap/tidb/executor/testCancelContext")
}
16 changes: 16 additions & 0 deletions testkit/testkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,3 +412,19 @@ func (tk *TestKit) MustNoGlobalStats(table string) bool {
func (tk *TestKit) CheckLastMessage(msg string) {
tk.require.Equal(tk.Session().LastMessage(), msg)
}

// ExecWithContext executes a sql statement with context
func (tk *TestKit) ExecWithContext(ctx context.Context, sql string) (rs sqlexec.RecordSet, err error) {
var stmts []ast.StmtNode
stmts, err = tk.session.Parse(ctx, sql)
if err != nil {
return nil, errors.Trace(err)
}

rs, err = tk.Session().ExecuteStmt(ctx, stmts[0])
if err != nil {
tk.session.GetSessionVars().StmtCtx.AppendError(err)
return rs, errors.Trace(err)
}
return rs, nil
}

0 comments on commit 9797ccb

Please sign in to comment.