Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: show operators' disk consumption in results of EXPLAIN ANALYZE #13764

Merged
merged 8 commits into from
Dec 2, 2019
4 changes: 3 additions & 1 deletion distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -55,7 +56,8 @@ type testSuite struct {
func (s *testSuite) SetUpSuite(c *C) {
ctx := mock.NewContext()
ctx.GetSessionVars().StmtCtx = &stmtctx.StatementContext{
MemTracker: memory.NewTracker(stringutil.StringerStr("testSuite"), variable.DefTiDBMemQuotaDistSQL),
MemTracker: memory.NewTracker(stringutil.StringerStr("testSuite"), variable.DefTiDBMemQuotaDistSQL),
DiskTracker: disk.NewTracker(stringutil.StringerStr("testSuite"), -1),
}
ctx.Store = &mock.Store{
Client: &mock.Client{
Expand Down
5 changes: 5 additions & 0 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/stringutil"
Expand Down Expand Up @@ -558,6 +559,7 @@ func defaultHashJoinTestCase(cols []*types.FieldType, joinType core.JoinType, us
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1)
ctx.GetSessionVars().IndexLookupJoinConcurrency = 4
tc := &hashJoinTestCase{rows: 100000, concurrency: 4, ctx: ctx, keyIdx: []int{0, 1}}
tc.cols = cols
Expand Down Expand Up @@ -603,7 +605,9 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
}
t := memory.NewTracker(stringutil.StringerStr("root of prepare4HashJoin"), memLimit)
t.SetActionOnExceed(nil)
t2 := disk.NewTracker(stringutil.StringerStr("root of prepare4HashJoin"), -1)
e.ctx.GetSessionVars().StmtCtx.MemTracker = t
e.ctx.GetSessionVars().StmtCtx.DiskTracker = t2
return e
}

Expand Down Expand Up @@ -865,6 +869,7 @@ func defaultIndexJoinTestCase() *indexJoinTestCase {
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().SnapshotTS = 1
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1)
tc := &indexJoinTestCase{
outerRows: 100000,
innerRows: variable.DefMaxChunkSize * 100,
Expand Down
8 changes: 5 additions & 3 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
Expand Down Expand Up @@ -1493,9 +1494,10 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
memQuota = stmtHints.MemQuotaQuery
}
sc := &stmtctx.StatementContext{
StmtHints: stmtHints,
TimeZone: vars.Location(),
MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), memQuota),
StmtHints: stmtHints,
TimeZone: vars.Location(),
MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), memQuota),
DiskTracker: disk.NewTracker(stringutil.MemoizeStr(s.Text), -1),
}
switch config.GetGlobalConfig().OOMAction {
case config.OOMActionCancel:
Expand Down
2 changes: 2 additions & 0 deletions executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
)
Expand Down Expand Up @@ -206,6 +207,7 @@ func defaultCtx() sessionctx.Context {
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().MemQuotaSort = variable.DefTiDBMemQuotaSort
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, ctx.GetSessionVars().MemQuotaQuery)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1)
ctx.GetSessionVars().SnapshotTS = uint64(1)
return ctx
}
Expand Down
14 changes: 9 additions & 5 deletions executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -88,7 +89,8 @@ type hashRowContainer struct {
// memTracker is the reference of records.GetMemTracker().
// records would be set to nil for garbage collection when spilling is activated
// so we need this reference.
memTracker *memory.Tracker
memTracker *memory.Tracker
diskTracker *disk.Tracker

// records stores the chunks in memory.
records *chunk.List
Expand Down Expand Up @@ -122,9 +124,10 @@ func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContex
sc: sCtx.GetSessionVars().StmtCtx,
hCtx: hCtx,

hashTable: newRowHashMap(estCount),
memTracker: initList.GetMemTracker(),
records: initList,
hashTable: newRowHashMap(estCount),
memTracker: initList.GetMemTracker(),
diskTracker: disk.NewTracker(stringutil.StringerStr("hashRowContainer"), -1),
records: initList,
}

return c
Expand Down Expand Up @@ -174,6 +177,7 @@ func (c *hashRowContainer) matchJoinKey(buildRow, probeRow chunk.Row, probeHCtx
func (c *hashRowContainer) spillToDisk() (err error) {
N := c.records.NumChunks()
c.recordsInDisk = chunk.NewListInDisk(c.hCtx.allTypes)
c.recordsInDisk.GetDiskTracker().AttachTo(c.diskTracker)
for i := 0; i < N; i++ {
chk := c.records.GetChunk(i)
err = c.recordsInDisk.Add(chk)
Expand Down Expand Up @@ -271,7 +275,7 @@ func (c *hashRowContainer) Close() error {
func (c *hashRowContainer) GetMemTracker() *memory.Tracker { return c.memTracker }

// GetDiskTracker returns the underlying disk usage tracker in hashRowContainer.
func (c *hashRowContainer) GetDiskTracker() *disk.Tracker { return c.recordsInDisk.GetDiskTracker() }
func (c *hashRowContainer) GetDiskTracker() *disk.Tracker { return c.diskTracker }

// ActionSpill returns a memory.ActionOnExceed for spilling over to disk.
func (c *hashRowContainer) ActionSpill() memory.ActionOnExceed {
Expand Down
7 changes: 7 additions & 0 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/util/bitmap"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
)
Expand Down Expand Up @@ -70,6 +71,7 @@ type HashJoinExec struct {
joinResultCh chan *hashjoinWorkerResult

memTracker *memory.Tracker // track memory usage.
diskTracker *disk.Tracker // track disk usage.
prepared bool
isOuterJoin bool

Expand Down Expand Up @@ -145,6 +147,9 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

e.diskTracker = disk.NewTracker(e.id, -1)
e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker)

e.closeCh = make(chan struct{})
e.finished.Store(false)
e.joinWorkerWaitGroup = sync.WaitGroup{}
Expand Down Expand Up @@ -677,6 +682,8 @@ func (e *HashJoinExec) buildHashTableForList(buildSideResultCh <-chan *chunk.Chu
e.rowContainer = newHashRowContainer(e.ctx, int(e.buildSideEstCount), hCtx)
e.rowContainer.GetMemTracker().AttachTo(e.memTracker)
e.rowContainer.GetMemTracker().SetLabel(buildSideResultLabel)
e.rowContainer.GetDiskTracker().AttachTo(e.diskTracker)
e.rowContainer.GetDiskTracker().SetLabel(buildSideResultLabel)
if config.GetGlobalConfig().OOMUseTmpStorage {
actionSpill := e.rowContainer.ActionSpill()
e.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(actionSpill)
Expand Down
2 changes: 1 addition & 1 deletion planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (s *testAnalyzeSuite) TestExplainAnalyze(c *C) {
rs := tk.MustQuery("explain analyze select t1.a, t1.b, sum(t1.c) from t1 join t2 on t1.a = t2.b where t1.a > 1")
c.Assert(len(rs.Rows()), Equals, 10)
for _, row := range rs.Rows() {
c.Assert(len(row), Equals, 6)
c.Assert(len(row), Equals, 7)
execInfo := row[4].(string)
c.Assert(strings.Contains(execInfo, "time"), Equals, true)
c.Assert(strings.Contains(execInfo, "loops"), Equals, true)
Expand Down
15 changes: 11 additions & 4 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ func (e *Explain) prepareSchema() error {
case format == ast.ExplainFormatROW && !e.Analyze:
fieldNames = []string{"id", "count", "task", "operator info"}
case format == ast.ExplainFormatROW && e.Analyze:
fieldNames = []string{"id", "count", "task", "operator info", "execution info", "memory"}
fieldNames = []string{"id", "count", "task", "operator info", "execution info", "memory", "disk"}
case format == ast.ExplainFormatDOT:
fieldNames = []string{"dot contents"}
default:
Expand Down Expand Up @@ -815,9 +815,16 @@ func (e *Explain) prepareOperatorInfo(p Plan, taskType string, indent string, is
}
row = append(row, analyzeInfo)

tracker := e.ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String())
if tracker != nil {
row = append(row, tracker.BytesToString(tracker.MaxConsumed()))
memTracker := e.ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String())
if memTracker != nil {
row = append(row, memTracker.BytesToString(memTracker.MaxConsumed()))
} else {
row = append(row, "N/A")
}

diskTracker := e.ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTracker(p.ExplainID().String())
if diskTracker != nil {
row = append(row, diskTracker.BytesToString(diskTracker.MaxConsumed()))
} else {
row = append(row, "N/A")
}
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
Expand Down Expand Up @@ -126,6 +127,7 @@ type StatementContext struct {
Priority mysql.PriorityEnum
NotFillCache bool
MemTracker *memory.Tracker
DiskTracker *disk.Tracker
RuntimeStatsColl *execdetails.RuntimeStatsColl
TableIDs []int64
IndexNames []string
Expand Down
2 changes: 2 additions & 0 deletions util/mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -267,6 +268,7 @@ func NewContext() *Context {
sctx.sessionVars.MaxChunkSize = 32
sctx.sessionVars.StmtCtx.TimeZone = time.UTC
sctx.sessionVars.StmtCtx.MemTracker = memory.NewTracker(stringutil.StringerStr("mock.NewContext"), -1)
sctx.sessionVars.StmtCtx.DiskTracker = disk.NewTracker(stringutil.StringerStr("mock.NewContext"), -1)
sctx.sessionVars.GlobalVarsAccessor = variable.NewMockGlobalAccessor()
if err := sctx.GetSessionVars().SetSystemVar(variable.MaxAllowedPacket, "67108864"); err != nil {
panic(err)
Expand Down