Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: exit all goroutines immediately when exceeded mem-quota #37405

Merged
merged 17 commits into from Sep 14, 2022
9 changes: 7 additions & 2 deletions executor/adapter.go
Expand Up @@ -904,9 +904,14 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
return e, nil
}

func (a *ExecStmt) openExecutor(ctx context.Context, e Executor) error {
func (a *ExecStmt) openExecutor(ctx context.Context, e Executor) (err error) {
defer func() {
if r := recover(); r != nil {
err = errors.New(fmt.Sprint(r))
}
}()
start := time.Now()
err := e.Open(ctx)
err = e.Open(ctx)
a.phaseOpenDurations[0] += time.Since(start)
return err
}
Expand Down
2 changes: 0 additions & 2 deletions executor/aggregate.go
Expand Up @@ -301,8 +301,6 @@ func (e *HashAggExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
// If panic here, the children executor should be closed because they are open.
defer closeBaseExecutor(&e.baseExecutor)
e.prepared = false

e.memTracker = memory.NewTracker(e.id, -1)
Expand Down
85 changes: 49 additions & 36 deletions executor/aggregate_test.go
Expand Up @@ -1495,59 +1495,72 @@ func TestAggInDisk(t *testing.T) {
tk.MustQuery("select /*+ HASH_AGG() */ count(c) from t group by c1;").Check(testkit.Rows())
}

func TestRandomPanicAggConsume(t *testing.T) {
func TestRandomPanicConsume(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_max_chunk_size=32")
tk.MustExec("set @@tidb_init_chunk_size=1")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(pk bigint primary key auto_random,a int, index idx(a));")
tk.MustExec("SPLIT TABLE t BETWEEN (-9223372036854775808) AND (9223372036854775807) REGIONS 50;") // Split 50 regions to simulate many requests
for i := 0; i <= 1000; i++ {
tk.MustExec(fmt.Sprintf("insert into t values(%v),(%v),(%v)", i, i, i))
tk.MustExec(fmt.Sprintf("insert into t(a) values(%v),(%v),(%v)", i, i, i))
}

fpName := "github.com/pingcap/tidb/executor/ConsumeRandomPanic"
require.NoError(t, failpoint.Enable(fpName, "5%panic(\"ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]\")"))
defer func() {
require.NoError(t, failpoint.Disable(fpName))
}()
fpName2 := "github.com/pingcap/tidb/store/copr/ConsumeRandomPanic"
require.NoError(t, failpoint.Enable(fpName2, "3%panic(\"ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]\")"))
defer func() {
require.NoError(t, failpoint.Disable(fpName2))
}()

sqls := []string{
// Without index
"select /*+ HASH_AGG() */ /*+ USE_INDEX(t) */ count(a) from t group by a", // HashAgg Paralleled
"select /*+ HASH_AGG() */ /*+ USE_INDEX(t) */ count(distinct a) from t", // HashAgg Unparalleled
"select /*+ STREAM_AGG() */ /*+ USE_INDEX(t) */ count(a) from t group by a", // Shuffle+StreamAgg
"select /*+ USE_INDEX(t) */ a * a, a / a, a + a , a - a from t", // Projection
"select /*+ HASH_JOIN(t1) */ /*+ USE_INDEX(t1) */ /*+ USE_INDEX(t2) */* from t t1 join t t2 on t1.a=t2.a", // HashJoin
"select /*+ MERGE_JOIN(t1) */ /*+ USE_INDEX(t1) */ /*+ USE_INDEX(t2) */* from t t1 join t t2 on t1.a=t2.a", // Shuffle+MergeJoin
"select /*+ USE_INDEX(t) */ * from t", // TableScan

// With index
"select /*+ HASH_AGG() */ /*+ USE_INDEX(t,idx) */ count(a) from t group by a", // HashAgg Paralleled
"select /*+ HASH_AGG() */ /*+ USE_INDEX(t,idx) */ count(distinct a) from t", // HashAgg Unparalleled
"select /*+ STREAM_AGG() */ /*+ USE_INDEX(t,idx) */ count(a) from t group by a", // Shuffle+StreamAgg
"select /*+ USE_INDEX(t,idx) */ a * a, a / a, a + a , a - a from t", // Projection
"select /*+ HASH_JOIN(t1) */ /*+ USE_INDEX(t1,idx) */ /*+ USE_INDEX(t2,idx) */ * from t t1 join t t2 on t1.a=t2.a", // HashJoin
"select /*+ MERGE_JOIN(t1) */ /*+ USE_INDEX(t1,idx) */ /*+ USE_INDEX(t2,idx) */ * from t t1 join t t2 on t1.a=t2.a", // Shuffle+MergeJoin
"select /*+ INL_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Join
"select /*+ INL_HASH_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Hash Join
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we do not inject the random panic for inl_hash_join?

"select /*+ USE_INDEX(t, idx) */ * from t", // IndexScan
}

// Test 10 times panic for each AggExec.
// Test 10 times panic for each Executor.
var res sqlexec.RecordSet
for i := 1; i <= 10; i++ {
var err error
for err == nil {
// Test paralleled hash agg.
res, err = tk.Exec("select /*+ HASH_AGG() */ count(a) from t group by a")
if err == nil {
_, err = session.GetRows4Test(context.Background(), tk.Session(), res)
require.NoError(t, res.Close())
}
}
require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")

err = nil
for err == nil {
// Test unparalleled hash agg.
res, err = tk.Exec("select /*+ HASH_AGG() */ count(distinct a) from t")
if err == nil {
_, err = session.GetRows4Test(context.Background(), tk.Session(), res)
require.NoError(t, res.Close())
}
}
require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")

err = nil
for err == nil {
// Test stream agg.
res, err = tk.Exec("select /*+ STREAM_AGG() */ count(a) from t")
if err == nil {
_, err = session.GetRows4Test(context.Background(), tk.Session(), res)
require.NoError(t, res.Close())
for _, sql := range sqls {
for i := 1; i <= 10; i++ {
concurrency := rand.Int31n(4) + 1 // test 1~5 concurrency randomly
tk.MustExec(fmt.Sprintf("set @@tidb_executor_concurrency=%v", concurrency))
tk.MustExec(fmt.Sprintf("set @@tidb_merge_join_concurrency=%v", concurrency))
tk.MustExec(fmt.Sprintf("set @@tidb_streamagg_concurrency=%v", concurrency))
distConcurrency := rand.Int31n(15) + 1
tk.MustExec(fmt.Sprintf("set @@tidb_distsql_scan_concurrency=%v", distConcurrency))
var err error
for err == nil {
res, err = tk.Exec(sql)
if err == nil {
_, err = session.GetRows4Test(context.Background(), tk.Session(), res)
require.NoError(t, res.Close())
}
}
require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")
}
require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")
}
}

Expand Down
2 changes: 1 addition & 1 deletion executor/index_lookup_hash_join.go
Expand Up @@ -629,10 +629,10 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH

iw.wg = &sync.WaitGroup{}
iw.wg.Add(1)
iw.lookup.workerWg.Add(1)
// TODO(XuHuaiyu): we may always use the smaller side to build the hashtable.
go util.WithRecovery(
func() {
iw.lookup.workerWg.Add(1)
iw.buildHashTableForOuterResult(ctx, task, h)
},
func(r interface{}) {
Expand Down
3 changes: 3 additions & 0 deletions executor/index_lookup_join.go
Expand Up @@ -372,6 +372,7 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) {
}()
for {
failpoint.Inject("TestIssue30211", nil)
failpoint.Inject("ConsumeRandomPanic", nil)
task, err := ow.buildTask(ctx)
if err != nil {
task.doneCh <- err
Expand Down Expand Up @@ -555,6 +556,7 @@ func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoi
}
return nil, err
}
failpoint.Inject("ConsumeRandomPanic", nil)
if rowIdx == 0 {
iw.memTracker.Consume(types.EstimatedMemUsage(dLookUpKey, numRows))
}
Expand Down Expand Up @@ -700,6 +702,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
default:
}
err := Next(ctx, innerExec, iw.executorChk)
failpoint.Inject("ConsumeRandomPanic", nil)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions executor/join.go
Expand Up @@ -217,6 +217,7 @@ func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) {
probeSideResult.SetRequiredRows(required, e.maxChunkSize)
}
err := Next(ctx, e.probeSideExec, probeSideResult)
failpoint.Inject("ConsumeRandomPanic", nil)
if err != nil {
e.joinResultCh <- &hashjoinWorkerResult{
err: err,
Expand Down Expand Up @@ -290,6 +291,7 @@ func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chu
return
}
failpoint.Inject("errorFetchBuildSideRowsMockOOMPanic", nil)
failpoint.Inject("ConsumeRandomPanic", nil)
if chk.NumRows() == 0 {
return
}
Expand Down Expand Up @@ -467,6 +469,7 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) {
return
case probeSideResult, ok = <-e.probeResultChs[workerID]:
}
failpoint.Inject("ConsumeRandomPanic", nil)
if !ok {
break
}
Expand Down Expand Up @@ -810,6 +813,7 @@ func (e *HashJoinExec) buildHashTableForList(buildSideResultCh <-chan *chunk.Chu
err = e.rowContainer.PutChunkSelected(chk, selected, e.isNullEQ)
}
}
failpoint.Inject("ConsumeRandomPanic", nil)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions executor/merge_join.go
Expand Up @@ -322,6 +322,7 @@ func (e *MergeJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error)
innerIter := e.innerTable.groupRowsIter
outerIter := e.outerTable.groupRowsIter
for !req.IsFull() {
failpoint.Inject("ConsumeRandomPanic", nil)
if innerIter.Current() == innerIter.End() {
if err := e.innerTable.fetchNextInnerGroup(ctx, e); err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions executor/projection.go
Expand Up @@ -190,6 +190,7 @@ func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk
e.childResult.SetRequiredRows(chk.RequiredRows(), e.maxChunkSize)
mSize := e.childResult.MemoryUsage()
err := Next(ctx, e.children[0], e.childResult)
failpoint.Inject("ConsumeRandomPanic", nil)
e.memTracker.Consume(e.childResult.MemoryUsage() - mSize)
if err != nil {
return err
Expand Down Expand Up @@ -219,6 +220,7 @@ func (e *ProjectionExec) parallelExecute(ctx context.Context, chk *chunk.Chunk)
}
mSize := output.chk.MemoryUsage()
chk.SwapColumns(output.chk)
failpoint.Inject("ConsumeRandomPanic", nil)
e.memTracker.Consume(output.chk.MemoryUsage() - mSize)
e.fetcher.outputCh <- output
return nil
Expand Down Expand Up @@ -252,6 +254,7 @@ func (e *ProjectionExec) prepare(ctx context.Context) {
})

inputChk := newFirstChunk(e.children[0])
failpoint.Inject("ConsumeRandomPanic", nil)
e.memTracker.Consume(inputChk.MemoryUsage())
e.fetcher.inputCh <- &projectionInput{
chk: inputChk,
Expand Down Expand Up @@ -379,6 +382,7 @@ func (f *projectionInputFetcher) run(ctx context.Context) {
input.chk.SetRequiredRows(int(requiredRows), f.proj.maxChunkSize)
mSize := input.chk.MemoryUsage()
err := Next(ctx, f.child, input.chk)
failpoint.Inject("ConsumeRandomPanic", nil)
f.proj.memTracker.Consume(input.chk.MemoryUsage() - mSize)
if err != nil || input.chk.NumRows() == 0 {
output.done <- err
Expand Down Expand Up @@ -439,6 +443,7 @@ func (w *projectionWorker) run(ctx context.Context) {

mSize := output.chk.MemoryUsage() + input.chk.MemoryUsage()
err := w.evaluatorSuit.Run(w.sctx, input.chk, output.chk)
failpoint.Inject("ConsumeRandomPanic", nil)
w.proj.memTracker.Consume(output.chk.MemoryUsage() + input.chk.MemoryUsage() - mSize)
output.done <- err

Expand Down
1 change: 1 addition & 0 deletions store/copr/coprocessor.go
Expand Up @@ -576,6 +576,7 @@ func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *
}
}
})
failpoint.Inject("ConsumeRandomPanic", nil)
worker.memTracker.Consume(consumed)
}
select {
Expand Down
10 changes: 4 additions & 6 deletions util/memory/action.go
Expand Up @@ -136,15 +136,13 @@ func (a *PanicOnExceed) SetLogHook(hook func(uint64)) {
// Action panics when memory usage exceeds memory quota.
func (a *PanicOnExceed) Action(_ *Tracker) {
a.mutex.Lock()
if a.acted {
a.mutex.Unlock()
return
if !a.acted {
if a.logHook != nil {
a.logHook(a.ConnID)
}
}
a.acted = true
a.mutex.Unlock()
if a.logHook != nil {
a.logHook(a.ConnID)
}
panic(PanicMemoryExceed + fmt.Sprintf("[conn_id=%d]", a.ConnID))
}

Expand Down