Skip to content

Commit

Permalink
tidb tracing prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
zhexuany committed Aug 16, 2018
1 parent f18176f commit a8821c8
Show file tree
Hide file tree
Showing 14 changed files with 371 additions and 14 deletions.
6 changes: 6 additions & 0 deletions distsql/distsql.go
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/tracing"
"golang.org/x/net/context"
)

Expand All @@ -35,9 +36,13 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
hook.(func(*kv.Request))(kvReq)
}

child := tracing.ChildSpanFromContxt(ctx, "distsql_select")
defer child.Finish()

if !sctx.GetSessionVars().EnableStreaming {
kvReq.Streaming = false
}

resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars)
if resp == nil {
err := errors.New("client returns nil response")
Expand All @@ -54,6 +59,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
}, nil
}

child.LogKV("event", "finished sending rpc calls")
return &selectResult{
label: "dag",
resp: resp,
Expand Down
26 changes: 25 additions & 1 deletion executor/aggregate.go
Expand Up @@ -16,7 +16,10 @@ package executor
import (
"sync"

"fmt"

"github.com/juju/errors"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
Expand All @@ -27,6 +30,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/mvmap"
"github.com/pingcap/tidb/util/tracing"
"github.com/spaolacci/murmur3"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -64,6 +68,8 @@ type HashAggPartialWorker struct {
// chk stores the input data from child,
// and is reused by childExec and partial worker.
chk *chunk.Chunk

trace opentracing.Span
}

// HashAggFinalWorker indicates the final workers of parallel hash agg execution,
Expand All @@ -79,6 +85,8 @@ type HashAggFinalWorker struct {
inputCh chan *HashAggIntermData
outputCh chan *AfFinalResult
finalResultHolderCh chan *chunk.Chunk

trace opentracing.Span
}

// AfFinalResult indicates aggregation functions final result.
Expand Down Expand Up @@ -351,12 +359,14 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG
if needShuffle {
w.shuffleIntermData(sc, finalConcurrency)
}
w.trace.Finish()
waitGroup.Done()
}()
for {
if !w.getChildInput() {
return
}
w.trace.LogKV("event", "update partial result")
if err := w.updatePartialResult(ctx, sc, w.chk, len(w.aggCtxsMap)); err != nil {
w.globalOutputCh <- &AfFinalResult{err: errors.Trace(err)}
return
Expand All @@ -369,6 +379,7 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG

func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *stmtctx.StatementContext, chk *chunk.Chunk, finalConcurrency int) (err error) {
inputIter := chunk.NewIterator4Chunk(chk)
w.trace.LogKV("event", "iterating chunk's data")
for row := inputIter.Begin(); row != inputIter.End(); row = inputIter.Next() {
groupKey, err := w.getGroupKey(sc, row)
if err != nil {
Expand All @@ -388,6 +399,7 @@ func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *s
// We only support parallel execution for single-machine, so process of encode and decode can be skipped.
func (w *HashAggPartialWorker) shuffleIntermData(sc *stmtctx.StatementContext, finalConcurrency int) {
groupKeysSlice := make([][][]byte, finalConcurrency)
w.trace.LogKV("event", "shuffling interm data")
for groupKey := range w.aggCtxsMap {
groupKeyBytes := []byte(groupKey)
finalWorkerIdx := int(murmur3.Sum32(groupKeyBytes)) % finalConcurrency
Expand Down Expand Up @@ -457,6 +469,7 @@ func (w *HashAggFinalWorker) consumeIntermData(sc *stmtctx.StatementContext) (er
ok bool
intermDataRowsBuffer [][]types.Datum
)
w.trace.LogKV("event", "consuming interm data")
for {
if input, ok = w.getPartialInput(); !ok {
return nil
Expand Down Expand Up @@ -527,6 +540,7 @@ func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) {

func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGroup) {
defer func() {
w.trace.Finish()
waitGroup.Done()
}()
sc := ctx.GetSessionVars().StmtCtx
Expand All @@ -539,10 +553,14 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro
// Next implements the Executor Next interface.
func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
child := tracing.ChildSpanFromContxt(ctx, "hash_agg_exec")
defer child.Finish()

if e.isUnparallelExec {
return errors.Trace(e.unparallelExec(ctx, chk))
}
return errors.Trace(e.parallelExec(ctx, chk))
err := e.parallelExec(ctx, chk)
return errors.Trace(err)
}

func (e *HashAggExec) fetchChildData(ctx context.Context) {
Expand Down Expand Up @@ -596,14 +614,17 @@ func (e *HashAggExec) prepare4ParallelExec(ctx context.Context) {

partialWorkerWaitGroup := &sync.WaitGroup{}
partialWorkerWaitGroup.Add(len(e.partialWorkers))

for i := range e.partialWorkers {
e.partialWorkers[i].trace = tracing.ChildSpanFromContxt(ctx, fmt.Sprintf("hash_agg_exec_partial_worker_%d", i))
go e.partialWorkers[i].run(e.ctx, partialWorkerWaitGroup, len(e.finalWorkers))
}
go e.waitPartialWorkerAndCloseOutputChs(partialWorkerWaitGroup)

finalWorkerWaitGroup := &sync.WaitGroup{}
finalWorkerWaitGroup.Add(len(e.finalWorkers))
for i := range e.finalWorkers {
e.finalWorkers[i].trace = tracing.ChildSpanFromContxt(ctx, fmt.Sprintf("hash_agg_exec_final_worker_%d", i))
go e.finalWorkers[i].run(e.ctx, finalWorkerWaitGroup)
}
go e.waitFinalWorkerAndCloseFinalOutput(finalWorkerWaitGroup)
Expand Down Expand Up @@ -793,13 +814,16 @@ func (e *StreamAggExec) Close() error {
func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()

child := tracing.ChildSpanFromContxt(ctx, "stream_agg_exec")
defer child.Finish()
for !e.executed && chk.NumRows() < e.maxChunkSize {
err := e.consumeOneGroup(ctx, chk)
if err != nil {
e.executed = true
return errors.Trace(err)
}
}
child.LogKV("event", "StreamAggExec is finished")
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions executor/builder.go
Expand Up @@ -89,6 +89,8 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildDelete(v)
case *plan.Execute:
return b.buildExecute(v)
case *plan.Trace:
return b.buildTrace(v)
case *plan.Explain:
return b.buildExplain(v)
case *plan.PointGetPlan:
Expand Down
6 changes: 6 additions & 0 deletions executor/index_lookup_join.go
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mvmap"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/tracing"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -189,6 +190,11 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork

// Next implements the Executor interface.
func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error {
sp := tracing.ChildSpanFromContxt(ctx, "index_lookup_join")
defer func() {
sp.LogKV("event", "index lookup join is finished.")
sp.Finish()
}()
chk.Reset()
e.joinResult.Reset()
for {
Expand Down
7 changes: 7 additions & 0 deletions executor/join.go
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mvmap"
"github.com/pingcap/tidb/util/tracing"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -184,7 +185,10 @@ func (e *HashJoinExec) getJoinKeyFromChkRow(isOuterKey bool, row chunk.Row, keyB
// fetchOuterChunks get chunks from fetches chunks from the big table in a background goroutine
// and sends the chunks to multiple channels which will be read by multiple join workers.
func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) {
sp := tracing.SpanFromContext(ctx)
defer func() {
sp.LogKV("event", "finishing fetchOuterChunks")
sp.Finish()
for i := range e.outerResultChs {
close(e.outerResultChs[i])
}
Expand Down Expand Up @@ -471,6 +475,8 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
// step 1. fetch data from inner child and build a hash table;
// step 2. fetch data from outer child in a background goroutine and probe the hash table in multiple join workers.
func (e *HashJoinExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
sp := tracing.ChildSpanFromContxt(ctx, "hash_join_exec")
defer sp.Finish()
if !e.prepared {
e.innerFinished = make(chan error, 1)
go e.fetchInnerAndBuildHashTable(ctx)
Expand All @@ -491,6 +497,7 @@ func (e *HashJoinExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
}
chk.SwapColumns(result.chk)
result.src <- result.chk
sp.LogKV("event", "hash join exec is finished")
return nil
}

Expand Down
7 changes: 7 additions & 0 deletions executor/projection.go
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/tracing"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -61,6 +62,7 @@ type ProjectionExec struct {

// Open implements the Executor Open interface.
func (e *ProjectionExec) Open(ctx context.Context) error {
_ = tracing.ChildSpanFromContxt(ctx, "projection_exec")
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -139,6 +141,11 @@ func (e *ProjectionExec) Open(ctx context.Context) error {
// +------------------------------+ +----------------------+
//
func (e *ProjectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
sp := tracing.ChildSpanFromContxt(ctx, "projection_exec")
defer func() {
sp.LogKV("event", "projection_exec is finished")
sp.Finish()
}()
chk.Reset()
if e.isUnparallelExec() {
return errors.Trace(e.unParallelExecute(ctx, chk))
Expand Down
5 changes: 3 additions & 2 deletions executor/table_reader.go
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/tracing"
tipb "github.com/pingcap/tipb/go-tipb"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -57,8 +58,8 @@ type TableReaderExecutor struct {

// Open initialzes necessary variables for using this executor.
func (e *TableReaderExecutor) Open(ctx context.Context) error {
span, ctx := startSpanFollowsContext(ctx, "executor.TableReader.Open")
defer span.Finish()
child := tracing.ChildSpanFromContxt(ctx, "table_reader_exec")
defer child.Finish()

var err error
if e.corColInFilter {
Expand Down

0 comments on commit a8821c8

Please sign in to comment.