Skip to content

Commit

Permalink
executor: INLHJ worker return err to main thread (#18573) (#18586)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot committed Jul 15, 2020
1 parent 17711d8 commit 9e25dc4
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 22 deletions.
47 changes: 28 additions & 19 deletions executor/index_lookup_hash_join.go
Expand Up @@ -21,16 +21,15 @@ import (
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/expression"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"go.uber.org/zap"
)

// numResChkHold indicates the number of resource chunks that an inner worker
Expand Down Expand Up @@ -158,7 +157,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
}
e.workerWg.Add(1)
ow := e.newOuterWorker(innerCh)
go util.WithRecovery(func() { ow.run(workerCtx, cancelFunc) }, e.finishJoinWorkers)
go util.WithRecovery(func() { ow.run(workerCtx) }, e.finishJoinWorkers)

if !e.keepOuterOrder {
e.resultCh = make(chan *indexHashJoinResult, concurrency)
Expand Down Expand Up @@ -230,7 +229,7 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er
return result.err
}
case <-ctx.Done():
return nil
return ctx.Err()
}
req.SwapColumns(result.chk)
result.src <- result.chk
Expand All @@ -256,7 +255,7 @@ func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chu
return result.err
}
case <-ctx.Done():
return nil
return ctx.Err()
}
req.SwapColumns(result.chk)
result.src <- result.chk
Expand Down Expand Up @@ -308,16 +307,22 @@ func (e *IndexNestedLoopHashJoin) Close() error {
return e.baseExecutor.Close()
}

func (ow *indexHashJoinOuterWorker) run(ctx context.Context, cancelFunc context.CancelFunc) {
func (ow *indexHashJoinOuterWorker) run(ctx context.Context) {
defer close(ow.innerCh)
for {
task, err := ow.buildTask(ctx)
if task == nil {
failpoint.Inject("testIndexHashJoinOuterWorkerErr", func() {
err = errors.New("mockIndexHashJoinOuterWorkerErr")
})
if err != nil {
task = &indexHashJoinTask{err: err}
ow.pushToChan(ctx, task, ow.innerCh)
if ow.keepOuterOrder {
ow.pushToChan(ctx, task, ow.taskCh)
}
return
}
if err != nil {
cancelFunc()
logutil.Logger(ctx).Error("indexHashJoinOuterWorker.run failed", zap.Error(err))
if task == nil {
return
}
if finished := ow.pushToChan(ctx, task, ow.innerCh); finished {
Expand Down Expand Up @@ -449,7 +454,7 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context.
if task.keepOuterOrder {
resultCh = task.resultCh
}
err := iw.handleTask(ctx, cancelFunc, task, joinResult, h, resultCh)
err := iw.handleTask(ctx, task, joinResult, h, resultCh)
if err != nil {
joinResult.err = err
break
Expand All @@ -465,9 +470,11 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context.
}
}
}
failpoint.Inject("testIndexHashJoinInnerWorkerErr", func() {
joinResult.err = errors.New("mockIndexHashJoinInnerWorkerErr")
})
if joinResult.err != nil {
cancelFunc()
logutil.Logger(ctx).Error("indexHashJoinInnerWorker.run failed", zap.Error(joinResult.err))
resultCh <- joinResult
return
}
// When task.keepOuterOrder is TRUE(resultCh != iw.resultCh), the last
Expand Down Expand Up @@ -495,7 +502,7 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde
return joinResult, ok
}

func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, h hash.Hash64) {
func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) {
buf, numChks := make([]byte, 1), task.outerResult.NumChunks()
task.lookupMap = newRowHashMap(task.outerResult.Len())
for chkIdx := 0; chkIdx < numChks; chkIdx++ {
Expand All @@ -515,10 +522,12 @@ func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Con
}
h.Reset()
err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, row, iw.outerCtx.rowTypes, keyColIdx, buf)
failpoint.Inject("testIndexHashJoinBuildErr", func() {
err = errors.New("mockIndexHashJoinBuildErr")
})
if err != nil {
cancelFunc()
logutil.Logger(ctx).Error("indexHashJoinInnerWorker.buildHashTableForOuterResult failed", zap.Error(err))
return
// This panic will be recovered by the invoker.
panic(err.Error())
}
rowPtr := chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}
task.lookupMap.Put(h.Sum64(), rowPtr)
Expand All @@ -542,11 +551,11 @@ func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{}
iw.wg.Done()
}

func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error {
func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error {
iw.wg = &sync.WaitGroup{}
iw.wg.Add(1)
// TODO(XuHuaiyu): we may always use the smaller side to build the hashtable.
go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, cancelFunc, task, h) }, iw.handleHashJoinInnerWorkerPanic)
go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, task, h) }, iw.handleHashJoinInnerWorkerPanic)
err := iw.fetchInnerResults(ctx, task.lookUpJoinTask)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion executor/index_lookup_merge_join.go
Expand Up @@ -281,7 +281,7 @@ func (e *IndexLookUpMergeJoin) Next(ctx context.Context, req *chunk.Chunk) error
result.src <- result.chk
return nil
case <-ctx.Done():
return nil
return ctx.Err()
}
}

Expand Down
4 changes: 2 additions & 2 deletions executor/index_merge_reader.go
Expand Up @@ -337,7 +337,7 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str
task := w.buildTableTask(handles, retChunk)
select {
case <-ctx.Done():
return count, nil
return count, ctx.Err()
case <-exitCh:
return count, nil
case <-finished:
Expand Down Expand Up @@ -586,7 +586,7 @@ func (w *partialIndexWorker) fetchHandles(ctx context.Context, result distsql.Se
task := w.buildTableTask(handles, retChunk)
select {
case <-ctx.Done():
return count, nil
return count, ctx.Err()
case <-exitCh:
return count, nil
case <-finished:
Expand Down
54 changes: 54 additions & 0 deletions executor/join_test.go
Expand Up @@ -2088,3 +2088,57 @@ func (s *testSuiteJoinSerial) TestIssue18070(c *C) {
err = tk.QueryToErr("select /*+ inl_merge_join(t1)*/ * from t1 join t2 on t1.a = t2.a;")
c.Assert(strings.Contains(err.Error(), "Out Of Memory Quota!"), IsTrue)
}

func (s *testSuite9) TestIssue18572_1(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int, b int, index idx(b));")
tk.MustExec("insert into t1 values(1, 1);")
tk.MustExec("insert into t1 select * from t1;")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinInnerWorkerErr", "return"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinInnerWorkerErr"), IsNil)
}()

rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;")
c.Assert(err, IsNil)
_, err = session.GetRows4Test(context.Background(), nil, rs)
c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinInnerWorkerErr"), IsTrue)
}

func (s *testSuite9) TestIssue18572_2(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int, b int, index idx(b));")
tk.MustExec("insert into t1 values(1, 1);")
tk.MustExec("insert into t1 select * from t1;")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr", "return"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr"), IsNil)
}()

rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;")
c.Assert(err, IsNil)
_, err = session.GetRows4Test(context.Background(), nil, rs)
c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinOuterWorkerErr"), IsTrue)
}

func (s *testSuite9) TestIssue18572_3(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int, b int, index idx(b));")
tk.MustExec("insert into t1 values(1, 1);")
tk.MustExec("insert into t1 select * from t1;")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinBuildErr", "return"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinBuildErr"), IsNil)
}()

rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;")
c.Assert(err, IsNil)
_, err = session.GetRows4Test(context.Background(), nil, rs)
c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinBuildErr"), IsTrue)
}

0 comments on commit 9e25dc4

Please sign in to comment.