*: try to reuse cached chunks in operators (#38607)
close #38606
keeplearning20221 committed Nov 4, 2022
1 parent e5d3195 commit 0b3e1e9
Showing 47 changed files with 476 additions and 92 deletions.
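
In short: call sites that previously built fresh chunks through newFirstChunk or chunk.NewChunkWithCapacity now go through the session variables (tryNewCacheChunk, GetNewChunk, GetNewChunkWithCapacity), which may hand back a cached chunk instead of allocating a new one. Two new config options, tidb-max-reuse-chunk and tidb-max-reuse-column, cap how much is cached, and the updated tests observe reuse through the @@last_sql_use_alloc session variable. The Go program below is a minimal sketch of the underlying idea, a bounded free list; all names are simplified stand-ins, not TiDB's real types.

package main

import "fmt"

// Chunk is a stand-in for TiDB's chunk.Chunk.
type Chunk struct{ cols int }

// Allocator keeps a bounded free list of chunks that executors can
// draw from instead of always allocating fresh ones.
type Allocator struct {
	free    []*Chunk
	maxKeep int // cf. the tidb-max-reuse-chunk cap added below
}

// Alloc returns a cached chunk when one is available, else a new one.
func (a *Allocator) Alloc(cols int) *Chunk {
	if n := len(a.free); n > 0 {
		c := a.free[n-1]
		a.free = a.free[:n-1]
		return c
	}
	return &Chunk{cols: cols}
}

// Release hands a chunk back to the pool, up to the configured cap.
func (a *Allocator) Release(c *Chunk) {
	if len(a.free) < a.maxKeep {
		a.free = append(a.free, c)
	}
}

func main() {
	a := &Allocator{maxKeep: 64}
	c := a.Alloc(4)
	a.Release(c)
	fmt.Println(a.Alloc(4) == c) // true: the second Alloc reused the chunk
}

The real allocator also caps reusable columns (tidb-max-reuse-column) and must only recycle a chunk once no reader can still reference it; the sketch ignores both concerns.
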
6 changes: 6 additions & 0 deletions config/config.go
@@ -278,6 +278,10 @@ type Config struct {
Plugin Plugin `toml:"plugin" json:"plugin"`
MaxServerConnections uint32 `toml:"max-server-connections" json:"max-server-connections"`
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
+// TiDBMaxReuseChunk indicates max cached chunk num
+TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
+// TiDBMaxReuseColumn indicates max cached column num
+TiDBMaxReuseColumn uint32 `toml:"tidb-max-reuse-column" json:"tidb-max-reuse-column"`
}

// UpdateTempStoragePath is to update the `TempStoragePath` if port/statusPort was changed
@@ -975,6 +979,8 @@ var defaultConf = Config{
NewCollationsEnabledOnFirstBootstrap: true,
EnableGlobalKill: true,
TrxSummary: DefaultTrxSummary(),
+TiDBMaxReuseChunk: 64,
+TiDBMaxReuseColumn: 256,
}

var (
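
Both options are read as top-level keys of the TiDB config file, which is how the test below sets them. A minimal tidb.toml excerpt, hedged: the key names come from the struct tags above, and the values are the defaults registered in defaultConf.

# Cap the per-session reuse cache: at most 64 chunks and 256 columns.
tidb-max-reuse-chunk = 64
tidb-max-reuse-column = 256
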
4 changes: 4 additions & 0 deletions config/config_test.go
@@ -730,6 +730,8 @@ enable-enum-length-limit = false
stores-refresh-interval = 30
enable-forwarding = true
enable-global-kill = true
+tidb-max-reuse-chunk = 10
+tidb-max-reuse-column = 20
[performance]
txn-total-size-limit=2000
tcp-no-delay = false
@@ -798,6 +800,8 @@ max_connections = 200
require.True(t, conf.RepairMode)
require.Equal(t, uint64(16), conf.TiKVClient.ResolveLockLiteThreshold)
require.Equal(t, uint32(200), conf.Instance.MaxConnections)
+require.Equal(t, uint32(10), conf.TiDBMaxReuseChunk)
+require.Equal(t, uint32(20), conf.TiDBMaxReuseColumn)
require.Equal(t, []string{"tiflash"}, conf.IsolationRead.Engines)
require.Equal(t, 3080, conf.MaxIndexLength)
require.Equal(t, 70, conf.IndexLimit)
2 changes: 1 addition & 1 deletion distsql/select_result.go
@@ -311,7 +311,7 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er
func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) error {
if r.respChunkDecoder == nil {
r.respChunkDecoder = chunk.NewDecoder(
-chunk.NewChunkWithCapacity(r.fieldTypes, 0),
+r.ctx.GetSessionVars().GetNewChunk(r.fieldTypes, 0),
r.fieldTypes,
)
}
4 changes: 2 additions & 2 deletions executor/adapter.go
@@ -840,7 +840,7 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor
}()
var rows []chunk.Row
var err error
-req := newFirstChunk(e)
+req := tryNewCacheChunk(e)
for {
err = a.next(ctx, e, req)
if err != nil {
@@ -887,7 +887,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
}
}

-err = a.next(ctx, e, newFirstChunk(e))
+err = a.next(ctx, e, tryNewCacheChunk(e))
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion executor/admin.go
@@ -111,7 +111,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error {
FieldType: *colTypeForHandle,
})

-e.srcChunk = newFirstChunk(e)
+e.srcChunk = tryNewCacheChunk(e)
dagPB, err := e.buildDAGPB()
if err != nil {
return err
9 changes: 5 additions & 4 deletions executor/aggregate.go
@@ -323,13 +323,14 @@ func (e *HashAggExec) initForUnparallelExec() {
failpoint.Inject("ConsumeRandomPanic", nil)
e.memTracker.Consume(hack.DefBucketMemoryUsageForMapStrToSlice*(1<<e.bInMap) + setSize)
e.groupKeyBuffer = make([][]byte, 0, 8)
-e.childResult = newFirstChunk(e.children[0])
+e.childResult = tryNewCacheChunk(e.children[0])
e.memTracker.Consume(e.childResult.MemoryUsage())

e.offsetOfSpilledChks, e.numOfSpilledChks = 0, 0
e.executed, e.isChildDrained = false, false
e.listInDisk = chunk.NewListInDisk(retTypes(e.children[0]))
-e.tmpChkForSpill = newFirstChunk(e.children[0])
+
+e.tmpChkForSpill = tryNewCacheChunk(e.children[0])
if vars := e.ctx.GetSessionVars(); vars.TrackAggregateMemoryUsage && variable.EnableTmpStorageOnOOM.Load() {
e.diskTracker = disk.NewTracker(e.id, -1)
e.diskTracker.AttachTo(vars.StmtCtx.DiskTracker)
@@ -379,7 +380,7 @@ func (e *HashAggExec) initForParallelExec(_ sessionctx.Context) {
globalOutputCh: e.finalOutputCh,
partialResultsMap: make(aggPartialResultMapper),
groupByItems: e.GroupByItems,
-chk: newFirstChunk(e.children[0]),
+chk: tryNewCacheChunk(e.children[0]),
groupKey: make([][]byte, 0, 8),
}
// There is a bucket in the empty partialResultsMap.
@@ -1272,7 +1273,7 @@ func (e *StreamAggExec) Open(ctx context.Context) error {
// If panic in Open, the children executor should be closed because they are open.
defer closeBaseExecutor(&e.baseExecutor)

-e.childResult = newFirstChunk(e.children[0])
+e.childResult = tryNewCacheChunk(e.children[0])
e.executed = false
e.isChildReturnEmpty = true
e.inputIter = chunk.NewIterator4Chunk(e.childResult)
6 changes: 3 additions & 3 deletions executor/builder.go
@@ -1538,7 +1538,7 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor
e.defaultVal = nil
} else {
if v.IsFinalAgg() {
-e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+e.defaultVal = e.ctx.GetSessionVars().GetNewChunk(retTypes(e), 1)
}
}
for _, aggDesc := range v.AggFuncs {
@@ -1601,7 +1601,7 @@ func (b *executorBuilder) buildStreamAgg(v *plannercore.PhysicalStreamAgg) Execu
} else {
// Only do this for final agg, see issue #35295, #30923
if v.IsFinalAgg() {
-e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+e.defaultVal = e.ctx.GetSessionVars().GetNewChunk(retTypes(e), 1)
}
}
for i, aggDesc := range v.AggFuncs {
@@ -3133,7 +3133,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
e.innerCtx.hashCols = innerHashCols
e.innerCtx.hashCollators = hashCollators

-e.joinResult = newFirstChunk(e)
+e.joinResult = tryNewCacheChunk(e)
executorCounterIndexLookUpJoin.Inc()
return e
}
4 changes: 2 additions & 2 deletions executor/coprocessor.go
@@ -59,7 +59,7 @@ func (h *CoprocessorDAGHandler) HandleRequest(ctx context.Context, req *coproces
return h.buildErrorResponse(err)
}

-chk := newFirstChunk(e)
+chk := tryNewCacheChunk(e)
tps := e.base().retFieldTypes
var totalChunks, partChunks []tipb.Chunk
memTracker := h.sctx.GetSessionVars().StmtCtx.MemTracker
@@ -99,7 +99,7 @@ func (h *CoprocessorDAGHandler) HandleStreamRequest(ctx context.Context, req *co
return stream.Send(h.buildErrorResponse(err))
}

-chk := newFirstChunk(e)
+chk := tryNewCacheChunk(e)
tps := e.base().retFieldTypes
for {
chk.Reset()
4 changes: 2 additions & 2 deletions executor/cte.go
@@ -234,7 +234,7 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) {
if e.limitDone(e.iterInTbl) {
break
}
-chk := newFirstChunk(e.seedExec)
+chk := tryNewCacheChunk(e.seedExec)
if err = Next(ctx, e.seedExec, chk); err != nil {
return err
}
@@ -273,7 +273,7 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) {
}

for {
-chk := newFirstChunk(e.recursiveExec)
+chk := tryNewCacheChunk(e.recursiveExec)
if err = Next(ctx, e.recursiveExec, chk); err != nil {
return err
}
4 changes: 2 additions & 2 deletions executor/delete.go
@@ -95,7 +95,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn() &&
variable.EnableBatchDML.Load() && batchDMLSize > 0
fields := retTypes(e.children[0])
-chk := newFirstChunk(e.children[0])
+chk := tryNewCacheChunk(e.children[0])
columns := e.children[0].Schema().Columns
if len(columns) != len(fields) {
logutil.BgLogger().Error("schema columns and fields mismatch",
@@ -190,7 +190,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
colPosInfos := e.tblColPosInfos
tblRowMap := make(tableRowMapType)
fields := retTypes(e.children[0])
-chk := newFirstChunk(e.children[0])
+chk := tryNewCacheChunk(e.children[0])
memUsageOfChk := int64(0)
for {
e.memTracker.Consume(-memUsageOfChk)
6 changes: 3 additions & 3 deletions executor/distsql.go
@@ -866,7 +866,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
}
}()
retTps := w.idxLookup.getRetTpsByHandle()
-chk := chunk.NewChunkWithCapacity(retTps, w.idxLookup.maxChunkSize)
+chk := w.idxLookup.ctx.GetSessionVars().GetNewChunk(retTps, w.idxLookup.maxChunkSize)
idxID := w.idxLookup.getIndexPlanRootID()
if w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
if idxID != w.idxLookup.id && w.idxLookup.stats != nil {
@@ -1161,7 +1161,7 @@ func (e *IndexLookUpRunTimeStats) Tp() int {
}

func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, tableReader Executor) error {
-chk := newFirstChunk(tableReader)
+chk := tryNewCacheChunk(tableReader)
tblInfo := w.idxLookup.table.Meta()
vals := make([]types.Datum, 0, len(w.idxTblCols))

@@ -1317,7 +1317,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
handleCnt := len(task.handles)
task.rows = make([]chunk.Row, 0, handleCnt)
for {
-chk := newFirstChunk(tableReader)
+chk := tryNewCacheChunk(tableReader)
err = Next(ctx, tableReader, chk)
if err != nil {
logutil.Logger(ctx).Error("table reader fetch next chunk failed", zap.Error(err))
20 changes: 15 additions & 5 deletions executor/executor.go
@@ -229,6 +229,12 @@ func newFirstChunk(e Executor) *chunk.Chunk {
return chunk.New(base.retFieldTypes, base.initCap, base.maxChunkSize)
}

+func tryNewCacheChunk(e Executor) *chunk.Chunk {
+base := e.base()
+s := base.ctx.GetSessionVars()
+return s.GetNewChunkWithCapacity(base.retFieldTypes, base.initCap, base.maxChunkSize)
+}
+
// newList creates a new List to buffer current executor's result.
func newList(e Executor) *chunk.List {
base := e.base()
@@ -1387,7 +1393,7 @@ func (e *LimitExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
-e.childResult = newFirstChunk(e.children[0])
+e.childResult = tryNewCacheChunk(e.children[0])
e.cursor = 0
e.meetFirstBatch = e.begin == 0
return nil
@@ -1444,8 +1450,7 @@ func init() {
if err != nil {
return nil, err
}
-chk := newFirstChunk(exec)
-
+chk := tryNewCacheChunk(exec)
err = Next(ctx, exec, chk)
if err != nil {
return nil, err
@@ -1520,7 +1525,7 @@ func (e *SelectionExec) Open(ctx context.Context) error {
func (e *SelectionExec) open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
-e.childResult = newFirstChunk(e.children[0])
+e.childResult = tryNewCacheChunk(e.children[0])
e.memTracker.Consume(e.childResult.MemoryUsage())
e.batched = expression.Vectorizable(e.filters)
if e.batched {
@@ -1700,7 +1705,7 @@ func (e *MaxOneRowExec) Next(ctx context.Context, req *chunk.Chunk) error {
return ErrSubqueryMoreThan1Row
}

-childChunk := newFirstChunk(e.children[0])
+childChunk := tryNewCacheChunk(e.children[0])
err = Next(ctx, e.children[0], childChunk)
if err != nil {
return err
@@ -2134,6 +2139,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
errCount, warnCount := vars.StmtCtx.NumErrorWarnings()
vars.SysErrorCount = errCount
vars.SysWarningCount = warnCount
+vars.ExchangeChunkStatus()
vars.StmtCtx = sc
vars.PrevFoundInPlanCache = vars.FoundInPlanCache
vars.FoundInPlanCache = false
@@ -2172,6 +2178,10 @@ func ResetUpdateStmtCtx(sc *stmtctx.StatementContext, stmt *ast.UpdateStmt, vars
// expression using rows from a chunk, and then fill this value into the chunk
func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnIndex []int,
schema *expression.Schema, columns []*model.ColumnInfo, sctx sessionctx.Context, req *chunk.Chunk) error {
+if len(virtualColumnIndex) == 0 {
+return nil
+}
+
virCols := chunk.NewChunkWithCapacity(virtualRetTypes, req.Capacity())
iter := chunk.NewIterator4Chunk(req)
for i, idx := range virtualColumnIndex {
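
Taken together, the executor.go hunks add the new entry point (tryNewCacheChunk simply asks the session's allocator via GetNewChunkWithCapacity) and tie the allocator to statement boundaries (ResetContextOfStmt now calls ExchangeChunkStatus before installing the fresh StatementContext). The early return added to FillVirtualColumnValue is an independent small win: with no virtual columns there is no reason to build the scratch chunk and iterator at all. Below is a hedged, self-contained sketch of the statement lifecycle; every name is a simplified stand-in, not TiDB's real variable.SessionVars.

package main

import "fmt"

// sessionVars is a stand-in for TiDB's variable.SessionVars.
type sessionVars struct {
	allocEnabled bool
	usedAlloc    bool // conceptually backs @@last_sql_use_alloc
}

// getNewChunk serves from the allocator when it is enabled and falls
// back to a plain allocation otherwise, so callers never need to know
// which path ran.
func (s *sessionVars) getNewChunk() string {
	if s.allocEnabled {
		s.usedAlloc = true
		return "pooled chunk"
	}
	return "fresh chunk"
}

// exchangeChunkStatus models the statement boundary: chunks handed out
// for the finished statement become reclaimable and the usage flag is
// cleared before the next statement starts.
func (s *sessionVars) exchangeChunkStatus() { s.usedAlloc = false }

func main() {
	s := &sessionVars{allocEnabled: true}
	fmt.Println(s.getNewChunk()) // pooled chunk
	fmt.Println(s.usedAlloc)     // true
	s.exchangeChunkStatus()
	fmt.Println(s.usedAlloc)     // false
}
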
2 changes: 1 addition & 1 deletion executor/explain.go
@@ -120,7 +120,7 @@ func (e *ExplainExec) executeAnalyzeExec(ctx context.Context) (err error) {
}).run()
}
e.executed = true
-chk := newFirstChunk(e.analyzeExec)
+chk := tryNewCacheChunk(e.analyzeExec)
for {
err = Next(ctx, e.analyzeExec, chk)
if err != nil || chk.NumRows() == 0 {
2 changes: 1 addition & 1 deletion executor/index_lookup_hash_join.go
@@ -418,7 +418,7 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask,
innerCtx: e.innerCtx,
outerCtx: e.outerCtx,
ctx: e.ctx,
-executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize),
+executorChk: e.ctx.GetSessionVars().GetNewChunk(e.innerCtx.rowTypes, e.maxChunkSize),
indexRanges: copiedRanges,
keyOff2IdxOff: e.keyOff2IdxOff,
stats: innerStats,
8 changes: 4 additions & 4 deletions executor/index_lookup_join.go
@@ -226,7 +226,7 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork
outerCtx: e.outerCtx,
taskCh: taskCh,
ctx: e.ctx,
-executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize),
+executorChk: e.ctx.GetSessionVars().GetNewChunk(e.innerCtx.rowTypes, e.maxChunkSize),
indexRanges: copiedRanges,
keyOff2IdxOff: e.keyOff2IdxOff,
stats: innerStats,
@@ -431,7 +431,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
}
maxChunkSize := ow.ctx.GetSessionVars().MaxChunkSize
for requiredRows > task.outerResult.Len() {
-chk := chunk.NewChunkWithCapacity(ow.outerCtx.rowTypes, maxChunkSize)
+chk := ow.ctx.GetSessionVars().GetNewChunk(ow.outerCtx.rowTypes, maxChunkSize)
chk = chk.SetRequiredRows(requiredRows, maxChunkSize)
err := Next(ctx, ow.executor, chk)
if err != nil {
@@ -462,7 +462,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
}
task.encodedLookUpKeys = make([]*chunk.Chunk, task.outerResult.NumChunks())
for i := range task.encodedLookUpKeys {
-task.encodedLookUpKeys[i] = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, task.outerResult.GetChunk(i).NumRows())
+task.encodedLookUpKeys[i] = ow.ctx.GetSessionVars().GetNewChunk([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, task.outerResult.GetChunk(i).NumRows())
}
return task, nil
}
@@ -714,7 +714,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
break
}
innerResult.Add(iw.executorChk)
-iw.executorChk = newFirstChunk(innerExec)
+iw.executorChk = tryNewCacheChunk(innerExec)
}
task.innerResult = innerResult
return nil
5 changes: 5 additions & 0 deletions executor/index_lookup_join_test.go
@@ -352,13 +352,16 @@ func TestIssue23722(t *testing.T) {
tk.MustExec("insert into t values (20301,'Charlie',x'7a');")
tk.MustQuery("select * from t;").Check(testkit.Rows("20301 Charlie z"))
tk.MustQuery("select * from t where c in (select c from t where t.c >= 'a');").Check(testkit.Rows("20301 Charlie z"))
tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1"))

// Test lookup content exceeds primary key prefix.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b char(10), c varchar(255), primary key (c(5)) clustered);")
tk.MustExec("insert into t values (20301,'Charlie','aaaaaaa');")
tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1"))
tk.MustQuery("select * from t;").Check(testkit.Rows("20301 Charlie aaaaaaa"))
tk.MustQuery("select * from t where c in (select c from t where t.c >= 'a');").Check(testkit.Rows("20301 Charlie aaaaaaa"))
tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1"))

// Test the original case.
tk.MustExec("drop table if exists t;")
@@ -452,7 +455,9 @@ func TestIssue27893(t *testing.T) {
tk.MustExec("insert into t1 values('x')")
tk.MustExec("insert into t2 values(1)")
tk.MustQuery("select /*+ inl_join(t2) */ count(*) from t1 join t2 on t1.a = t2.a").Check(testkit.Rows("1"))
tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1"))
tk.MustQuery("select /*+ inl_hash_join(t2) */ count(*) from t1 join t2 on t1.a = t2.a").Check(testkit.Rows("1"))
tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1"))
}

func TestPartitionTableIndexJoinAndIndexReader(t *testing.T) {
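
The new assertions read the session variable @@last_sql_use_alloc, which, as these tests use it, reports whether the last statement drew its chunks from the session allocator ("1") rather than from plain allocation. The same signal can be checked from any MySQL client; a small sketch, assuming a local TiDB listening on 127.0.0.1:4000 and the github.com/go-sql-driver/mysql driver.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Run any statement, then ask whether it was served from the
	// session chunk allocator.
	if _, err := db.Exec("select 1"); err != nil {
		log.Fatal(err)
	}
	var used string
	if err := db.QueryRow("select @@last_sql_use_alloc").Scan(&used); err != nil {
		log.Fatal(err)
	}
	fmt.Println("last_sql_use_alloc =", used) // expected "1" when chunks were pooled
}
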
4 changes: 2 additions & 2 deletions executor/index_lookup_merge_join.go
@@ -357,7 +357,7 @@ func (omw *outerMergeWorker) buildTask(ctx context.Context) (*lookUpMergeJoinTas
requiredRows = omw.maxBatchSize
}
for requiredRows > 0 {
-execChk := newFirstChunk(omw.executor)
+execChk := tryNewCacheChunk(omw.executor)
err := Next(ctx, omw.executor, execChk)
if err != nil {
return task, err
@@ -706,7 +706,7 @@ func (imw *innerMergeWorker) dedupDatumLookUpKeys(lookUpContents []*indexJoinLoo

// fetchNextInnerResult collects a chunk of inner results from inner child executor.
func (imw *innerMergeWorker) fetchNextInnerResult(ctx context.Context, task *lookUpMergeJoinTask) (beginRow chunk.Row, err error) {
-task.innerResult = chunk.NewChunkWithCapacity(retTypes(imw.innerExec), imw.ctx.GetSessionVars().MaxChunkSize)
+task.innerResult = imw.ctx.GetSessionVars().GetNewChunk(retTypes(imw.innerExec), imw.ctx.GetSessionVars().MaxChunkSize)
err = Next(ctx, imw.innerExec, task.innerResult)
task.innerIter = chunk.NewIterator4Chunk(task.innerResult)
beginRow = task.innerIter.Begin()
