*: Chunkreuse try reuse chunk in operator #38607

Merged: 86 commits, Nov 4, 2022

Commits
5fbc0c6
*: reuse chunk
keeplearning20221 Oct 15, 2022
4515b45
*: reuse chunk
keeplearning20221 Oct 15, 2022
c850b57
Merge branch 'chunkreuse' of https://github.com/keeplearning20221/tid…
keeplearning20221 Oct 19, 2022
97b07b2
modified: executor/executor.go
keeplearning20221 Oct 19, 2022
83b6309
modified: executor/adapter.go
keeplearning20221 Oct 21, 2022
93fdf67
modified: sessionctx/variable/session.go
keeplearning20221 Oct 22, 2022
e17784b
Merge branch 'master' into chunkreuse
keeplearning20221 Oct 22, 2022
e27aa8d
modified: sessionctx/variable/session.go
keeplearning20221 Oct 22, 2022
bf05b46
modified: util/chunk/alloc.go
keeplearning20221 Oct 22, 2022
9ca7217
modified: sessionctx/variable/sysvar.go
keeplearning20221 Oct 22, 2022
9664c1d
modified: util/chunk/alloc.go
keeplearning20221 Oct 22, 2022
26bb3f9
modified: util/chunk/alloc.go
keeplearning20221 Oct 22, 2022
b41a80f
modified: executor/distsql.go
keeplearning20221 Oct 22, 2022
7c8a4fa
modified: util/chunk/alloc.go
keeplearning20221 Oct 22, 2022
f81fbf8
Merge branch 'master' into chunkreuse
keeplearning20221 Oct 24, 2022
cd5fd7c
Merge branch 'master' into chunkreuse
keeplearning20221 Oct 24, 2022
8a1e1c1
modified: executor/set_test.go
keeplearning20221 Oct 24, 2022
ad1bded
Merge branch 'chunkreuse' of https://github.com/keeplearning20221/tid…
keeplearning20221 Oct 24, 2022
ab604fb
Merge branch 'master' into chunkreuse
keeplearning20221 Oct 24, 2022
873a2f5
modified: sessionctx/variable/tidb_vars.go
keeplearning20221 Oct 24, 2022
ce16f73
modified: util/chunk/chunk_test.go
keeplearning20221 Oct 24, 2022
47d642b
modified: sessionctx/variable/session.go
keeplearning20221 Oct 24, 2022
ebc3f8b
modified: BUILD.bazel
keeplearning20221 Oct 24, 2022
888b2a9
modified: chunk_test.go
keeplearning20221 Oct 24, 2022
2915a45
modified: util/chunk/chunk_test.go
keeplearning20221 Oct 25, 2022
4918110
modified: chunk_test.go
keeplearning20221 Oct 25, 2022
2406f50
modified: chunk_test.go
keeplearning20221 Oct 25, 2022
b06e0e7
modified: chunk_test.go
keeplearning20221 Oct 25, 2022
0253327
Merge branch 'master' into chunkreuse
keeplearning20221 Oct 25, 2022
dcb0de7
modified: executor/executor.go
keeplearning20221 Oct 25, 2022
613d59a
modified: executor/update_test.go
keeplearning20221 Oct 25, 2022
836f88b
modified: executor/update_test.go
keeplearning20221 Oct 25, 2022
f595af6
modified: executor/showtest/show_test.go
keeplearning20221 Oct 26, 2022
1097237
modified: show.go
keeplearning20221 Oct 27, 2022
dca77f1
Merge branch 'master' into chunkreuse
keeplearning20221 Oct 27, 2022
61bda04
modified: executor/sort_test.go
keeplearning20221 Oct 27, 2022
a88da50
Merge branch 'chunkreuse' of https://github.com/keeplearning20221/tid…
keeplearning20221 Oct 27, 2022
1c879c6
Merge branch 'master' into chunkreuse
keeplearning20221 Oct 27, 2022
dd4d126
modified: ../testkit/testkit.go
keeplearning20221 Oct 27, 2022
a489259
modified: ../testkit/testkit.go
keeplearning20221 Oct 27, 2022
cb55f3c
modified: sessionctx/variable/session.go
keeplearning20221 Oct 28, 2022
2f0d7d5
modified: infoschema/tables_test.go
keeplearning20221 Oct 28, 2022
47a591c
Merge branch 'master' into chunkreuse
keeplearning20221 Oct 28, 2022
b6c4c28
Merge branch 'master' into chunkreuse
keeplearning20221 Oct 28, 2022
c8f5381
Merge branch 'master' into chunkreuse
keeplearning20221 Oct 29, 2022
ccc9c5b
modified: util/chunk/alloc.go
keeplearning20221 Oct 29, 2022
27ed1bb
Merge branch 'chunkreuse' of https://github.com/keeplearning20221/tid…
keeplearning20221 Oct 29, 2022
fdd3069
modified: util/chunk/alloc.go
keeplearning20221 Oct 29, 2022
d80594b
modified: util/chunk/alloc.go
keeplearning20221 Oct 29, 2022
713da61
modified: util/chunk/alloc.go
keeplearning20221 Oct 29, 2022
6cfddd8
modified: util/chunk/alloc.go
keeplearning20221 Oct 30, 2022
6f959e0
modified: util/chunk/alloc.go
keeplearning20221 Oct 30, 2022
4ca8b69
modified: util/chunk/alloc.go
keeplearning20221 Oct 30, 2022
2c3a894
modified: util/chunk/alloc.go
keeplearning20221 Oct 30, 2022
43e5740
Merge branch 'master' into chunkreuse
keeplearning20221 Oct 30, 2022
08ad815
modified: util/chunk/alloc.go
keeplearning20221 Oct 30, 2022
b613861
Merge branch 'chunkreuse' of https://github.com/keeplearning20221/tid…
keeplearning20221 Oct 30, 2022
7d0a6e4
modified: executor/executor.go
keeplearning20221 Oct 31, 2022
46975cd
modified: executor/set_test.go
keeplearning20221 Nov 1, 2022
3279eaf
modified: sessionctx/variable/session.go
keeplearning20221 Nov 1, 2022
6574fa6
modified: executor/executor.go
keeplearning20221 Nov 1, 2022
712d2e2
Merge branch 'master' into chunkreuse
keeplearning20221 Nov 1, 2022
11746b5
modified: sessionctx/variable/sysvar.go
keeplearning20221 Nov 1, 2022
20199d9
Merge branch 'master' into chunkreuse
keeplearning20221 Nov 2, 2022
9c9db3a
modified: executor/aggregate.go
keeplearning20221 Nov 2, 2022
f6dc3b2
modified: executor/index_lookup_join_test.go
keeplearning20221 Nov 2, 2022
21c8018
modified: alloc.go
keeplearning20221 Nov 2, 2022
ea189ad
modified: config/config.go
keeplearning20221 Nov 3, 2022
e2cf4eb
modified: config/config.go
keeplearning20221 Nov 3, 2022
b9fab60
modified: ../../config/config.go
keeplearning20221 Nov 3, 2022
9bdc5d0
Merge branch 'master' into chunkreuse
keeplearning20221 Nov 3, 2022
95f4535
modified: ../../util/chunk/alloc.go
keeplearning20221 Nov 3, 2022
c94ede9
Merge branch 'master' into chunkreuse
keeplearning20221 Nov 3, 2022
9c78458
modified: ../../config/config_test.go
keeplearning20221 Nov 3, 2022
627389d
Merge branch 'master' into chunkreuse
keeplearning20221 Nov 3, 2022
4fcfcc9
modified: executor/set_test.go
keeplearning20221 Nov 3, 2022
c42f499
Merge branch 'master' into chunkreuse
keeplearning20221 Nov 3, 2022
a40971e
modified: sessionctx/variable/tidb_vars.go
keeplearning20221 Nov 3, 2022
53ebea1
modified: infoschema/tables_test.go
keeplearning20221 Nov 3, 2022
fb82c92
Merge branch 'master' into chunkreuse
keeplearning20221 Nov 3, 2022
0ceee0d
Merge branch 'master' into chunkreuse
keeplearning20221 Nov 3, 2022
4620a89
Merge branch 'master' into chunkreuse
ti-chi-bot Nov 4, 2022
deed88f
Merge branch 'master' into chunkreuse
ti-chi-bot Nov 4, 2022
75f41a9
Merge branch 'master' into chunkreuse
ti-chi-bot Nov 4, 2022
d8ef033
Merge branch 'master' into chunkreuse
ti-chi-bot Nov 4, 2022
13b57b5
Merge branch 'master' into chunkreuse
ti-chi-bot Nov 4, 2022
6 changes: 6 additions & 0 deletions config/config.go
@@ -276,6 +276,10 @@ type Config struct {
Plugin Plugin `toml:"plugin" json:"plugin"`
MaxServerConnections uint32 `toml:"max-server-connections" json:"max-server-connections"`
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
// TiDBMaxReuseChunk indicates max cached chunk num
TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
// TiDBMaxReuseColumn indicates max cached column num
TiDBMaxReuseColumn uint32 `toml:"tidb-max-reuse-column" json:"tidb-max-reuse-column"`
}

// UpdateTempStoragePath is to update the `TempStoragePath` if port/statusPort was changed
@@ -967,6 +971,8 @@ var defaultConf = Config{
NewCollationsEnabledOnFirstBootstrap: true,
EnableGlobalKill: true,
TrxSummary: DefaultTrxSummary(),
TiDBMaxReuseChunk: 64,
TiDBMaxReuseColumn: 256,
}

var (
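The two new config items bound how many chunks and columns a session may keep around for reuse, so the cache cannot grow without limit. Below is a minimal, self-contained sketch of that bounded-free-list idea; the `chunkPool` type, its fields, and the byte-slice stand-in for a chunk are illustrative assumptions, not the PR's actual `util/chunk/alloc.go` implementation.

```go
package main

import "fmt"

// chunkPool is a hypothetical bounded free list: it hands out byte buffers
// ("chunks") and keeps at most maxReuseChunk of them for later statements.
type chunkPool struct {
	free          [][]byte
	maxReuseChunk int // plays the role of tidb-max-reuse-chunk
}

// Get returns a cached buffer if one is available, otherwise allocates.
func (p *chunkPool) Get(size int) []byte {
	if n := len(p.free); n > 0 {
		buf := p.free[n-1]
		p.free = p.free[:n-1]
		return buf[:0]
	}
	return make([]byte, 0, size)
}

// Put caches the buffer for reuse, but only up to the configured cap, so a
// burst of large queries cannot pin unbounded memory inside the pool.
func (p *chunkPool) Put(buf []byte) {
	if len(p.free) < p.maxReuseChunk {
		p.free = append(p.free, buf)
	}
}

func main() {
	p := &chunkPool{maxReuseChunk: 2}
	a, b, c := p.Get(1024), p.Get(1024), p.Get(1024)
	p.Put(a)
	p.Put(b)
	p.Put(c)                                    // dropped: the cap of 2 is already reached
	fmt.Println("cached buffers:", len(p.free)) // cached buffers: 2
}
```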
4 changes: 4 additions & 0 deletions config/config_test.go
@@ -730,6 +730,8 @@ enable-enum-length-limit = false
stores-refresh-interval = 30
enable-forwarding = true
enable-global-kill = true
tidb-max-reuse-chunk = 10
tidb-max-reuse-column = 20
[performance]
txn-total-size-limit=2000
tcp-no-delay = false
@@ -798,6 +800,8 @@ max_connections = 200
require.True(t, conf.RepairMode)
require.Equal(t, uint64(16), conf.TiKVClient.ResolveLockLiteThreshold)
require.Equal(t, uint32(200), conf.Instance.MaxConnections)
require.Equal(t, uint32(10), conf.TiDBMaxReuseChunk)
require.Equal(t, uint32(20), conf.TiDBMaxReuseColumn)
require.Equal(t, []string{"tiflash"}, conf.IsolationRead.Engines)
require.Equal(t, 3080, conf.MaxIndexLength)
require.Equal(t, 70, conf.IndexLimit)
2 changes: 1 addition & 1 deletion distsql/select_result.go
@@ -311,7 +311,7 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er
func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) error {
if r.respChunkDecoder == nil {
r.respChunkDecoder = chunk.NewDecoder(
chunk.NewChunkWithCapacity(r.fieldTypes, 0),
r.ctx.GetSessionVars().GetNewChunk(r.fieldTypes, 0),
r.fieldTypes,
)
}
4 changes: 2 additions & 2 deletions executor/adapter.go
@@ -829,7 +829,7 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor
}()
var rows []chunk.Row
var err error
req := newFirstChunk(e)
req := tryNewCacheChunk(e)
for {
err = a.next(ctx, e, req)
if err != nil {
@@ -876,7 +876,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
}
}

err = a.next(ctx, e, newFirstChunk(e))
err = a.next(ctx, e, tryNewCacheChunk(e))
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion executor/admin.go
@@ -111,7 +111,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error {
FieldType: *colTypeForHandle,
})

e.srcChunk = newFirstChunk(e)
e.srcChunk = tryNewCacheChunk(e)
dagPB, err := e.buildDAGPB()
if err != nil {
return err
9 changes: 5 additions & 4 deletions executor/aggregate.go
@@ -323,13 +323,14 @@ func (e *HashAggExec) initForUnparallelExec() {
failpoint.Inject("ConsumeRandomPanic", nil)
e.memTracker.Consume(hack.DefBucketMemoryUsageForMapStrToSlice*(1<<e.bInMap) + setSize)
e.groupKeyBuffer = make([][]byte, 0, 8)
e.childResult = newFirstChunk(e.children[0])
e.childResult = tryNewCacheChunk(e.children[0])
e.memTracker.Consume(e.childResult.MemoryUsage())

e.offsetOfSpilledChks, e.numOfSpilledChks = 0, 0
e.executed, e.isChildDrained = false, false
e.listInDisk = chunk.NewListInDisk(retTypes(e.children[0]))
e.tmpChkForSpill = newFirstChunk(e.children[0])

e.tmpChkForSpill = tryNewCacheChunk(e.children[0])
if vars := e.ctx.GetSessionVars(); vars.TrackAggregateMemoryUsage && variable.EnableTmpStorageOnOOM.Load() {
e.diskTracker = disk.NewTracker(e.id, -1)
e.diskTracker.AttachTo(vars.StmtCtx.DiskTracker)
@@ -379,7 +380,7 @@ func (e *HashAggExec) initForParallelExec(_ sessionctx.Context) {
globalOutputCh: e.finalOutputCh,
partialResultsMap: make(aggPartialResultMapper),
groupByItems: e.GroupByItems,
chk: newFirstChunk(e.children[0]),
chk: tryNewCacheChunk(e.children[0]),
groupKey: make([][]byte, 0, 8),
}
// There is a bucket in the empty partialResultsMap.
@@ -1272,7 +1273,7 @@ func (e *StreamAggExec) Open(ctx context.Context) error {
// If panic in Open, the children executor should be closed because they are open.
defer closeBaseExecutor(&e.baseExecutor)

e.childResult = newFirstChunk(e.children[0])
e.childResult = tryNewCacheChunk(e.children[0])
e.executed = false
e.isChildReturnEmpty = true
e.inputIter = chunk.NewIterator4Chunk(e.childResult)
6 changes: 3 additions & 3 deletions executor/builder.go
@@ -1538,7 +1538,7 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor
e.defaultVal = nil
} else {
if v.IsFinalAgg() {
e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
e.defaultVal = e.ctx.GetSessionVars().GetNewChunk(retTypes(e), 1)
}
}
for _, aggDesc := range v.AggFuncs {
@@ -1601,7 +1601,7 @@ func (b *executorBuilder) buildStreamAgg(v *plannercore.PhysicalStreamAgg) Execu
} else {
// Only do this for final agg, see issue #35295, #30923
if v.IsFinalAgg() {
e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
e.defaultVal = e.ctx.GetSessionVars().GetNewChunk(retTypes(e), 1)
}
}
for i, aggDesc := range v.AggFuncs {
@@ -3133,7 +3133,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
e.innerCtx.hashCols = innerHashCols
e.innerCtx.hashCollators = hashCollators

e.joinResult = newFirstChunk(e)
e.joinResult = tryNewCacheChunk(e)
executorCounterIndexLookUpJoin.Inc()
return e
}
4 changes: 2 additions & 2 deletions executor/coprocessor.go
@@ -59,7 +59,7 @@ func (h *CoprocessorDAGHandler) HandleRequest(ctx context.Context, req *coproces
return h.buildErrorResponse(err)
}

chk := newFirstChunk(e)
chk := tryNewCacheChunk(e)
tps := e.base().retFieldTypes
var totalChunks, partChunks []tipb.Chunk
memTracker := h.sctx.GetSessionVars().StmtCtx.MemTracker
@@ -99,7 +99,7 @@ func (h *CoprocessorDAGHandler) HandleStreamRequest(ctx context.Context, req *co
return stream.Send(h.buildErrorResponse(err))
}

chk := newFirstChunk(e)
chk := tryNewCacheChunk(e)
tps := e.base().retFieldTypes
for {
chk.Reset()
4 changes: 2 additions & 2 deletions executor/cte.go
@@ -234,7 +234,7 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) {
if e.limitDone(e.iterInTbl) {
break
}
chk := newFirstChunk(e.seedExec)
chk := tryNewCacheChunk(e.seedExec)
if err = Next(ctx, e.seedExec, chk); err != nil {
return err
}
@@ -273,7 +273,7 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) {
}

for {
chk := newFirstChunk(e.recursiveExec)
chk := tryNewCacheChunk(e.recursiveExec)
if err = Next(ctx, e.recursiveExec, chk); err != nil {
return err
}
4 changes: 2 additions & 2 deletions executor/delete.go
@@ -95,7 +95,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn() &&
variable.EnableBatchDML.Load() && batchDMLSize > 0
fields := retTypes(e.children[0])
chk := newFirstChunk(e.children[0])
chk := tryNewCacheChunk(e.children[0])
columns := e.children[0].Schema().Columns
if len(columns) != len(fields) {
logutil.BgLogger().Error("schema columns and fields mismatch",
@@ -190,7 +190,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
colPosInfos := e.tblColPosInfos
tblRowMap := make(tableRowMapType)
fields := retTypes(e.children[0])
chk := newFirstChunk(e.children[0])
chk := tryNewCacheChunk(e.children[0])
memUsageOfChk := int64(0)
for {
e.memTracker.Consume(-memUsageOfChk)
6 changes: 3 additions & 3 deletions executor/distsql.go
@@ -866,7 +866,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
}
}()
retTps := w.idxLookup.getRetTpsByHandle()
chk := chunk.NewChunkWithCapacity(retTps, w.idxLookup.maxChunkSize)
chk := w.idxLookup.ctx.GetSessionVars().GetNewChunk(retTps, w.idxLookup.maxChunkSize)
idxID := w.idxLookup.getIndexPlanRootID()
if w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
if idxID != w.idxLookup.id && w.idxLookup.stats != nil {
@@ -1161,7 +1161,7 @@ func (e *IndexLookUpRunTimeStats) Tp() int {
}

func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, tableReader Executor) error {
chk := newFirstChunk(tableReader)
chk := tryNewCacheChunk(tableReader)
tblInfo := w.idxLookup.table.Meta()
vals := make([]types.Datum, 0, len(w.idxTblCols))

@@ -1317,7 +1317,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
handleCnt := len(task.handles)
task.rows = make([]chunk.Row, 0, handleCnt)
for {
chk := newFirstChunk(tableReader)
chk := tryNewCacheChunk(tableReader)
err = Next(ctx, tableReader, chk)
if err != nil {
logutil.Logger(ctx).Error("table reader fetch next chunk failed", zap.Error(err))
20 changes: 15 additions & 5 deletions executor/executor.go
@@ -231,6 +231,12 @@ func newFirstChunk(e Executor) *chunk.Chunk {
return chunk.New(base.retFieldTypes, base.initCap, base.maxChunkSize)
}

func tryNewCacheChunk(e Executor) *chunk.Chunk {
base := e.base()
s := base.ctx.GetSessionVars()
return s.GetNewChunkWithCapacity(base.retFieldTypes, base.initCap, base.maxChunkSize)
}

// newList creates a new List to buffer current executor's result.
func newList(e Executor) *chunk.List {
base := e.base()
@@ -1389,7 +1395,7 @@ func (e *LimitExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
e.childResult = newFirstChunk(e.children[0])
e.childResult = tryNewCacheChunk(e.children[0])
e.cursor = 0
e.meetFirstBatch = e.begin == 0
return nil
@@ -1446,8 +1452,7 @@ func init() {
if err != nil {
return nil, err
}
chk := newFirstChunk(exec)

chk := tryNewCacheChunk(exec)
err = Next(ctx, exec, chk)
if err != nil {
return nil, err
@@ -1522,7 +1527,7 @@ func (e *SelectionExec) Open(ctx context.Context) error {
func (e *SelectionExec) open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.childResult = newFirstChunk(e.children[0])
e.childResult = tryNewCacheChunk(e.children[0])
e.memTracker.Consume(e.childResult.MemoryUsage())
e.batched = expression.Vectorizable(e.filters)
if e.batched {
@@ -1702,7 +1707,7 @@ func (e *MaxOneRowExec) Next(ctx context.Context, req *chunk.Chunk) error {
return ErrSubqueryMoreThan1Row
}

childChunk := newFirstChunk(e.children[0])
childChunk := tryNewCacheChunk(e.children[0])
err = Next(ctx, e.children[0], childChunk)
if err != nil {
return err
@@ -2136,6 +2141,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
errCount, warnCount := vars.StmtCtx.NumErrorWarnings()
vars.SysErrorCount = errCount
vars.SysWarningCount = warnCount
vars.ExchangeChunkStatus()
vars.StmtCtx = sc
vars.PrevFoundInPlanCache = vars.FoundInPlanCache
vars.FoundInPlanCache = false
@@ -2174,6 +2180,10 @@ func ResetUpdateStmtCtx(sc *stmtctx.StatementContext, stmt *ast.UpdateStmt, vars
// expression using rows from a chunk, and then fill this value into the chunk
func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnIndex []int,
schema *expression.Schema, columns []*model.ColumnInfo, sctx sessionctx.Context, req *chunk.Chunk) error {
if len(virtualColumnIndex) == 0 {
Contributor: Is this line change unrelated to the chunk reuse optimization?

Contributor Author: In this way we save one chunk allocation: the early return skips building the virtual-column chunk when there are no virtual columns.

return nil
}

virCols := chunk.NewChunkWithCapacity(virtualRetTypes, req.Capacity())
iter := chunk.NewIterator4Chunk(req)
for i, idx := range virtualColumnIndex {
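Across the executors, call sites that used `newFirstChunk` or `chunk.NewChunkWithCapacity` now go through `tryNewCacheChunk` / `GetNewChunk(...)`, which ask the session for a chunk and only fall back to a fresh allocation when reuse is not available. A rough sketch of that call-site pattern follows, under assumptions: the `session`, `allocator`, and `chunk` types below are simplified stand-ins rather than TiDB's real ones, and handing the chunk back at statement end only approximates the bookkeeping done around `ExchangeChunkStatus`.

```go
package main

import "fmt"

type chunk struct{ rows []int }

// allocator caches finished chunks for later statements of the same session.
type allocator struct {
	cached []*chunk
	max    int
}

func (a *allocator) alloc() *chunk {
	if n := len(a.cached); n > 0 {
		c := a.cached[n-1]
		a.cached = a.cached[:n-1]
		c.rows = c.rows[:0]
		return c
	}
	return &chunk{}
}

func (a *allocator) release(c *chunk) {
	if len(a.cached) < a.max {
		a.cached = append(a.cached, c)
	}
}

// session mimics the role of the session variables: reuse can be switched off,
// in which case callers fall back to a plain allocation (the old behaviour).
type session struct {
	reuseEnabled bool
	alloc        *allocator
}

// tryNewCacheChunk mirrors the intent of the helper added in executor.go:
// use the session allocator when reuse is on, otherwise build a fresh chunk.
func tryNewCacheChunk(s *session) *chunk {
	if s.reuseEnabled {
		return s.alloc.alloc()
	}
	return &chunk{}
}

func main() {
	s := &session{reuseEnabled: true, alloc: &allocator{max: 64}}
	c := tryNewCacheChunk(s)
	c.rows = append(c.rows, 1, 2, 3)
	s.alloc.release(c)                     // hand the chunk back at statement end
	fmt.Println(tryNewCacheChunk(s) == c)  // true: the chunk was reused
}
```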
2 changes: 1 addition & 1 deletion executor/explain.go
@@ -120,7 +120,7 @@ func (e *ExplainExec) executeAnalyzeExec(ctx context.Context) (err error) {
}).run()
}
e.executed = true
chk := newFirstChunk(e.analyzeExec)
chk := tryNewCacheChunk(e.analyzeExec)
for {
err = Next(ctx, e.analyzeExec, chk)
if err != nil || chk.NumRows() == 0 {
2 changes: 1 addition & 1 deletion executor/index_lookup_hash_join.go
@@ -418,7 +418,7 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask,
innerCtx: e.innerCtx,
outerCtx: e.outerCtx,
ctx: e.ctx,
executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize),
executorChk: e.ctx.GetSessionVars().GetNewChunk(e.innerCtx.rowTypes, e.maxChunkSize),
indexRanges: copiedRanges,
keyOff2IdxOff: e.keyOff2IdxOff,
stats: innerStats,
8 changes: 4 additions & 4 deletions executor/index_lookup_join.go
@@ -226,7 +226,7 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork
outerCtx: e.outerCtx,
taskCh: taskCh,
ctx: e.ctx,
executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize),
executorChk: e.ctx.GetSessionVars().GetNewChunk(e.innerCtx.rowTypes, e.maxChunkSize),
indexRanges: copiedRanges,
keyOff2IdxOff: e.keyOff2IdxOff,
stats: innerStats,
@@ -431,7 +431,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
}
maxChunkSize := ow.ctx.GetSessionVars().MaxChunkSize
for requiredRows > task.outerResult.Len() {
chk := chunk.NewChunkWithCapacity(ow.outerCtx.rowTypes, maxChunkSize)
chk := ow.ctx.GetSessionVars().GetNewChunk(ow.outerCtx.rowTypes, maxChunkSize)
chk = chk.SetRequiredRows(requiredRows, maxChunkSize)
err := Next(ctx, ow.executor, chk)
if err != nil {
@@ -462,7 +462,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
}
task.encodedLookUpKeys = make([]*chunk.Chunk, task.outerResult.NumChunks())
for i := range task.encodedLookUpKeys {
task.encodedLookUpKeys[i] = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, task.outerResult.GetChunk(i).NumRows())
task.encodedLookUpKeys[i] = ow.ctx.GetSessionVars().GetNewChunk([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, task.outerResult.GetChunk(i).NumRows())
}
return task, nil
}
@@ -714,7 +714,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
break
}
innerResult.Add(iw.executorChk)
iw.executorChk = newFirstChunk(innerExec)
iw.executorChk = tryNewCacheChunk(innerExec)
}
task.innerResult = innerResult
return nil
5 changes: 5 additions & 0 deletions executor/index_lookup_join_test.go
@@ -352,13 +352,16 @@ func TestIssue23722(t *testing.T) {
tk.MustExec("insert into t values (20301,'Charlie',x'7a');")
tk.MustQuery("select * from t;").Check(testkit.Rows("20301 Charlie z"))
tk.MustQuery("select * from t where c in (select c from t where t.c >= 'a');").Check(testkit.Rows("20301 Charlie z"))
tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1"))

// Test lookup content exceeds primary key prefix.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b char(10), c varchar(255), primary key (c(5)) clustered);")
tk.MustExec("insert into t values (20301,'Charlie','aaaaaaa');")
tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1"))
tk.MustQuery("select * from t;").Check(testkit.Rows("20301 Charlie aaaaaaa"))
tk.MustQuery("select * from t where c in (select c from t where t.c >= 'a');").Check(testkit.Rows("20301 Charlie aaaaaaa"))
tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1"))

// Test the original case.
tk.MustExec("drop table if exists t;")
@@ -452,7 +455,9 @@ func TestIssue27893(t *testing.T) {
tk.MustExec("insert into t1 values('x')")
tk.MustExec("insert into t2 values(1)")
tk.MustQuery("select /*+ inl_join(t2) */ count(*) from t1 join t2 on t1.a = t2.a").Check(testkit.Rows("1"))
tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1"))
tk.MustQuery("select /*+ inl_hash_join(t2) */ count(*) from t1 join t2 on t1.a = t2.a").Check(testkit.Rows("1"))
tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1"))
}

func TestPartitionTableIndexJoinAndIndexReader(t *testing.T) {
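The added `@@last_sql_use_alloc` assertions check that the preceding statement was actually served from the session's cached chunks. A hedged usage sketch in the same testkit style is below; the `tidb_enable_reuse_chunk` variable name is an assumption (the sysvar change itself is not part of the hunks shown here), while `@@last_sql_use_alloc` is taken from the assertions above.

```go
package executor_test

import (
	"testing"

	"github.com/pingcap/tidb/testkit"
)

func TestChunkReuseIndicator(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("create table t (a int primary key, b varchar(10))")
	tk.MustExec("insert into t values (1, 'x')")

	// Assumed variable name: switch chunk reuse on for this session.
	tk.MustExec("set @@tidb_enable_reuse_chunk = 1")

	// The indicator reports whether the previous statement took its chunks
	// from the session cache rather than fresh allocations.
	tk.MustQuery("select * from t").Check(testkit.Rows("1 x"))
	tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1"))
}
```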
4 changes: 2 additions & 2 deletions executor/index_lookup_merge_join.go
@@ -357,7 +357,7 @@ func (omw *outerMergeWorker) buildTask(ctx context.Context) (*lookUpMergeJoinTas
requiredRows = omw.maxBatchSize
}
for requiredRows > 0 {
execChk := newFirstChunk(omw.executor)
execChk := tryNewCacheChunk(omw.executor)
err := Next(ctx, omw.executor, execChk)
if err != nil {
return task, err
@@ -706,7 +706,7 @@ func (imw *innerMergeWorker) dedupDatumLookUpKeys(lookUpContents []*indexJoinLoo

// fetchNextInnerResult collects a chunk of inner results from inner child executor.
func (imw *innerMergeWorker) fetchNextInnerResult(ctx context.Context, task *lookUpMergeJoinTask) (beginRow chunk.Row, err error) {
task.innerResult = chunk.NewChunkWithCapacity(retTypes(imw.innerExec), imw.ctx.GetSessionVars().MaxChunkSize)
task.innerResult = imw.ctx.GetSessionVars().GetNewChunk(retTypes(imw.innerExec), imw.ctx.GetSessionVars().MaxChunkSize)
err = Next(ctx, imw.innerExec, task.innerResult)
task.innerIter = chunk.NewIterator4Chunk(task.innerResult)
beginRow = task.innerIter.Begin()