Skip to content

Commit

Permalink
*: make explain support explain anaylze (#7827)(#7888) (#7925)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and ngaut committed Oct 18, 2018
1 parent c91290f commit 52d5ee2
Show file tree
Hide file tree
Showing 20 changed files with 320 additions and 48 deletions.
5 changes: 3 additions & 2 deletions ast/misc.go
Expand Up @@ -118,8 +118,9 @@ func (n *TraceStmt) Accept(v Visitor) (Node, bool) {
type ExplainStmt struct {
stmtNode

Stmt StmtNode
Format string
Stmt StmtNode
Format string
Analyze bool
}

// Accept implements Node Accept interface.
Expand Down
4 changes: 2 additions & 2 deletions executor/adapter.go
Expand Up @@ -365,12 +365,12 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) {
if sessVars.InRestrictedSQL {
internal = "[INTERNAL] "
}
execDetail := sessVars.StmtCtx.GetExecDetails()
if costTime < threshold {
logutil.SlowQueryLogger.Debugf(
"[QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
internal, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
internal, costTime, execDetail, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
} else {
execDetail := sessVars.StmtCtx.GetExecDetails()
logutil.SlowQueryLogger.Warnf(
"[SLOW_QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
internal, costTime, execDetail, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
Expand Down
10 changes: 9 additions & 1 deletion executor/aggregate.go
Expand Up @@ -15,6 +15,7 @@ package executor

import (
"sync"
"time"

"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -501,6 +502,10 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro

// Next implements the Executor Next interface.
func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.isUnparallelExec {
return errors.Trace(e.unparallelExec(ctx, chk))
Expand Down Expand Up @@ -756,8 +761,11 @@ func (e *StreamAggExec) Close() error {

// Next implements the Executor Next interface.
func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()

for !e.executed && chk.NumRows() < e.maxChunkSize {
err := e.consumeOneGroup(ctx, chk)
if err != nil {
Expand Down
15 changes: 8 additions & 7 deletions executor/builder.go
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
Expand Down Expand Up @@ -448,13 +449,12 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor {
func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
base.initCap = chunk.ZeroCapacity
e := &PrepareExec{
return &PrepareExec{
baseExecutor: base,
is: b.is,
name: v.Name,
sqlText: v.SQLText,
}
return e
}

func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
Expand Down Expand Up @@ -659,14 +659,15 @@ func (b *executorBuilder) buildTrace(v *plannercore.Trace) Executor {

// buildExplain builds a explain executor. `e.rows` collects final result to `ExplainExec`.
func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor {
e := &ExplainExec{
explainExec := &ExplainExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
explain: v,
}
e.rows = make([][]string, 0, len(v.Rows))
for _, row := range v.Rows {
e.rows = append(e.rows, row)
if v.Analyze {
b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
explainExec.analyzeExec = b.build(v.ExecPlan)
}
return e
return explainExec
}

func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) Executor {
Expand Down
17 changes: 15 additions & 2 deletions executor/distsql.go
Expand Up @@ -19,6 +19,7 @@ import (
"sort"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/tidb/distsql"
Expand Down Expand Up @@ -243,6 +244,10 @@ func (e *IndexReaderExecutor) Close() error {

// Next implements the Executor Next interface.
func (e *IndexReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
err := e.result.Next(ctx, chk)
if err != nil {
e.feedback.Invalidate()
Expand Down Expand Up @@ -474,7 +479,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha
}

func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []int64) (Executor, error) {
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, &TableReaderExecutor{
tableReaderExec := &TableReaderExecutor{
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.id+"_tableReader"),
table: e.table,
physicalTableID: e.physicalTableID,
Expand All @@ -483,7 +488,11 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []in
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
}, handles)
}
// We assign `nil` to `runtimeStats` to forbidden `TableWorker` driven `IndexLookupExecutor`'s runtime stats collecting,
// because TableWorker information isn't showing in explain result now.
tableReaderExec.runtimeStats = nil
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles)
if err != nil {
log.Error(err)
return nil, errors.Trace(err)
Expand Down Expand Up @@ -512,6 +521,10 @@ func (e *IndexLookUpExecutor) Close() error {

// Next implements Exec Next interface.
func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
for {
resultTask, err := e.getResultTask()
Expand Down
35 changes: 35 additions & 0 deletions executor/executor.go
Expand Up @@ -18,6 +18,7 @@ import (
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/tidb/ast"
Expand All @@ -35,12 +36,14 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

var (
_ Executor = &baseExecutor{}
_ Executor = &CheckTableExec{}
_ Executor = &HashAggExec{}
_ Executor = &LimitExec{}
Expand Down Expand Up @@ -71,6 +74,7 @@ type baseExecutor struct {
maxChunkSize int
children []Executor
retFieldTypes []*types.FieldType
runtimeStats *execdetails.RuntimeStats
}

// Open initializes children recursively and "childrenResults" according to children's schemas.
Expand Down Expand Up @@ -127,6 +131,9 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin
initCap: ctx.GetSessionVars().MaxChunkSize,
maxChunkSize: ctx.GetSessionVars().MaxChunkSize,
}
if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.Get(e.id)
}
if schema != nil {
cols := schema.Columns
e.retFieldTypes = make([]*types.FieldType, len(cols))
Expand Down Expand Up @@ -168,6 +175,10 @@ type CancelDDLJobsExec struct {

// Next implements the Executor Next interface.
func (e *CancelDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.cursor >= len(e.jobIDs) {
return nil
Expand Down Expand Up @@ -610,6 +621,10 @@ type LimitExec struct {

// Next implements the Executor Next interface.
func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.cursor >= e.end {
return nil
Expand Down Expand Up @@ -729,6 +744,10 @@ func (e *TableDualExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *TableDualExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.numReturned >= e.numDualRows {
return nil
Expand Down Expand Up @@ -780,6 +799,10 @@ func (e *SelectionExec) Close() error {

// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)

if !e.batched {
Expand Down Expand Up @@ -855,6 +878,10 @@ type TableScanExec struct {

// Next implements the Executor Next interface.
func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.isVirtualTable {
return errors.Trace(e.nextChunk4InfoSchema(ctx, chk))
Expand Down Expand Up @@ -955,6 +982,10 @@ func (e *MaxOneRowExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *MaxOneRowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.evaluated {
return nil
Expand Down Expand Up @@ -1097,6 +1128,10 @@ func (e *UnionExec) resultPuller(ctx context.Context, childID int) {

// Next implements the Executor Next interface.
func (e *UnionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if !e.initialized {
e.initialize(ctx)
Expand Down
46 changes: 44 additions & 2 deletions executor/explain.go
Expand Up @@ -15,6 +15,7 @@ package executor

import (
"github.com/cznic/mathutil"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/util/chunk"
"golang.org/x/net/context"
)
Expand All @@ -23,18 +24,39 @@ import (
type ExplainExec struct {
baseExecutor

rows [][]string
cursor int
explain *core.Explain
analyzeExec Executor
rows [][]string
cursor int
}

// Open implements the Executor Open interface.
func (e *ExplainExec) Open(ctx context.Context) error {
if e.analyzeExec != nil {
return e.analyzeExec.Open(ctx)
}
return nil
}

// Close implements the Executor Close interface.
func (e *ExplainExec) Close() error {
if e.analyzeExec != nil {
e.analyzeExec.Close()
}
e.rows = nil
return nil
}

// Next implements the Executor Next interface.
func (e *ExplainExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.rows == nil {
var err error
e.rows, err = e.generateExplainInfo(ctx)
if err != nil {
return err
}
}

chk.GrowAndReset(e.maxChunkSize)
if e.cursor >= len(e.rows) {
return nil
Expand All @@ -49,3 +71,23 @@ func (e *ExplainExec) Next(ctx context.Context, chk *chunk.Chunk) error {
e.cursor += numCurRows
return nil
}

func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, error) {
if e.analyzeExec != nil {
chk := e.analyzeExec.newFirstChunk()
for {
err := e.analyzeExec.Next(ctx, chk)
if err != nil {
return nil, err
}
if chk.NumRows() == 0 {
break
}
}
}
e.explain.RenderResult()
if e.analyzeExec != nil {
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = nil
}
return e.explain.Rows, nil
}
5 changes: 5 additions & 0 deletions executor/index_lookup_join.go
Expand Up @@ -18,6 +18,7 @@ import (
"runtime"
"sort"
"sync"
"time"
"unsafe"

"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -189,6 +190,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork

// Next implements the Executor interface.
func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
e.joinResult.Reset()
for {
Expand Down
9 changes: 9 additions & 0 deletions executor/join.go
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -483,6 +484,10 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
// step 1. fetch data from inner child and build a hash table;
// step 2. fetch data from outer child in a background goroutine and probe the hash table in multiple join workers.
func (e *HashJoinExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
if !e.prepared {
e.innerFinished = make(chan error, 1)
go util.WithRecovery(func() { e.fetchInnerAndBuildHashTable(ctx) }, e.finishFetchInnerAndBuildHashTable)
Expand Down Expand Up @@ -696,6 +701,10 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {

// Next implements the Executor interface.
func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
for {
if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() {
Expand Down

0 comments on commit 52d5ee2

Please sign in to comment.