Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: tidb tracing prototype #7016

Merged
merged 27 commits into from Aug 30, 2018
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions executor/builder.go
Expand Up @@ -89,6 +89,8 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildDelete(v)
case *plan.Execute:
return b.buildExecute(v)
case *plan.Trace:
return b.buildTrace(v)
case *plan.Explain:
return b.buildExplain(v)
case *plan.PointGetPlan:
Expand Down Expand Up @@ -619,6 +621,16 @@ func (b *executorBuilder) buildDDL(v *plan.DDL) Executor {
return e
}

// buildTrace builds a TraceExec for future executing. This method will be called
// at build().
func (b *executorBuilder) buildTrace(v *plan.Trace) Executor {
return &TraceExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
stmtNode: v.StmtNode,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we store the plain text of the select statement to be traced? We can trace the time spent by parser with the help of this.

builder: b,
}
}

// buildExplain builds a explain executor. `e.rows` collects final result to `ExplainExec`.
func (b *executorBuilder) buildExplain(v *plan.Explain) Executor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move code makes a reviewer waste more time to view a PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. Thanks for the kind and good advice. Already did this, please refer to #7065 and #7063.

e := &ExplainExec{
Expand Down
19 changes: 0 additions & 19 deletions executor/distsql.go
Expand Up @@ -23,7 +23,6 @@ import (
"unsafe"

"github.com/juju/errors"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -210,18 +209,6 @@ func splitRanges(ranges []*ranger.Range, keepOrder bool) ([]*ranger.Range, []*ra
return signedRanges, unsignedRanges
}

// startSpanFollowContext is similar to opentracing.StartSpanFromContext, but the span reference use FollowsFrom option.
func startSpanFollowsContext(ctx context.Context, operationName string) (opentracing.Span, context.Context) {
span := opentracing.SpanFromContext(ctx)
if span != nil {
span = opentracing.StartSpan(operationName, opentracing.FollowsFrom(span.Context()))
} else {
span = opentracing.StartSpan(operationName)
}

return span, opentracing.ContextWithSpan(ctx, span)
}

// rebuildIndexRanges will be called if there's correlated column in access conditions. We will rebuild the range
// by substitute correlated column with the constant.
func rebuildIndexRanges(ctx sessionctx.Context, is *plan.PhysicalIndexScan, idxCols []*expression.Column, colLens []int) (ranges []*ranger.Range, err error) {
Expand Down Expand Up @@ -298,9 +285,6 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error {
}

func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
span, ctx := startSpanFollowsContext(ctx, "executor.IndexReader.Open")
defer span.Finish()

var err error
if e.corColInFilter {
e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans)
Expand Down Expand Up @@ -403,9 +387,6 @@ func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaIndexLookupReader)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

span, ctx := startSpanFollowsContext(ctx, "executor.IndexLookUp.Open")
defer span.Finish()

e.finished = make(chan struct{})
e.resultCh = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))

Expand Down
11 changes: 4 additions & 7 deletions executor/table_reader.go
Expand Up @@ -57,9 +57,6 @@ type TableReaderExecutor struct {

// Open initialzes necessary variables for using this executor.
func (e *TableReaderExecutor) Open(ctx context.Context) error {
span, ctx := startSpanFollowsContext(ctx, "executor.TableReader.Open")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this?

defer span.Finish()

var err error
if e.corColInFilter {
e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans)
Expand Down Expand Up @@ -101,11 +98,11 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
// Next fills data into the chunk passed by its caller.
// The task was actually done by tableReaderHandler.
func (e *TableReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
err := e.resultHandler.nextChunk(ctx, chk)
if err != nil {
if err := e.resultHandler.nextChunk(ctx, chk); err != nil {
e.feedback.Invalidate()
return err
}
return errors.Trace(err)
return errors.Trace(nil)
}

// Close implements the Executor Close interface.
Expand All @@ -115,7 +112,7 @@ func (e *TableReaderExecutor) Close() error {
return errors.Trace(err)
}

// buildResp first build request and send it to tikv using distsql.Select. It uses SelectResut returned by the callee
// buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResut returned by the callee
// to fetch all results.
func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) {
var builder distsql.RequestBuilder
Expand Down
131 changes: 131 additions & 0 deletions executor/trace.go
@@ -0,0 +1,131 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"time"

"github.com/juju/errors"
"github.com/opentracing/basictracer-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/tracing"
"golang.org/x/net/context"
)

// TraceExec represents a root executor of trace query.
type TraceExec struct {
baseExecutor
// CollectedSpans collects all span during execution. Span is appended via
// callback method which passes into tracer implementation.
CollectedSpans []basictracer.RawSpan
// exhausted being true means there is no more result.
exhausted bool
zhexuany marked this conversation as resolved.
Show resolved Hide resolved
// stmtNode is the real query ast tree and it is used for building real query's plan.
stmtNode ast.StmtNode
// rootTrace represents root span which is father of all other span.
rootTrace opentracing.Span

builder *executorBuilder
}

// Next executes real query and collects span later.
func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.exhausted {
return nil
}

// record how much time was spent for optimizeing plan
optimizeSp := e.rootTrace.Tracer().StartSpan("plan_optimize", opentracing.FollowsFrom(e.rootTrace.Context()))
stmtPlan, err := plan.Optimize(e.builder.ctx, e.stmtNode, e.builder.is)
if err != nil {
return err
}
optimizeSp.Finish()

zhexuany marked this conversation as resolved.
Show resolved Hide resolved
pp, ok := stmtPlan.(plan.PhysicalPlan)
if !ok {
return errors.New("cannot cast logical plan to physical plan")
}

zhexuany marked this conversation as resolved.
Show resolved Hide resolved
// append select executor to trace executor
stmtExec := e.builder.build(pp)

e.rootTrace = tracing.NewRecordedTrace("trace_exec", func(sp basictracer.RawSpan) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After that, global opentracing Tracer is modified?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I will file another PR to better handle this.

e.CollectedSpans = append(e.CollectedSpans, sp)
})
err = stmtExec.Open(ctx)
if err != nil {
return errors.Trace(err)
}
stmtExecChk := stmtExec.newChunk()

// store span into context
ctx = opentracing.ContextWithSpan(ctx, e.rootTrace)

for {
if err := stmtExec.Next(ctx, stmtExecChk); err != nil {
return errors.Trace(err)
}
if stmtExecChk.NumRows() == 0 {
break
}
}

e.rootTrace.LogKV("event", "tracing completed")
e.rootTrace.Finish()
var rootSpan basictracer.RawSpan

treeSpans := make(map[uint64][]basictracer.RawSpan)
for _, sp := range e.CollectedSpans {
treeSpans[sp.ParentSpanID] = append(treeSpans[sp.ParentSpanID], sp)
// if a span's parentSpanID is 0, then it is root span
// this is by design
if sp.ParentSpanID == 0 {
rootSpan = sp
}
}

dfsTree(rootSpan, treeSpans, "", false, chk)
e.exhausted = true
return nil
}

func dfsTree(span basictracer.RawSpan, tree map[uint64][]basictracer.RawSpan, prefix string, isLast bool, chk *chunk.Chunk) {
suffix := ""
spans := tree[span.Context.SpanID]
var newPrefix string
if span.ParentSpanID == 0 {
newPrefix = prefix
} else {
if len(tree[span.ParentSpanID]) > 0 && !isLast {
suffix = "├─"
newPrefix = prefix + "│ "
} else {
suffix = "└─"
newPrefix = prefix + " "
}
}

chk.AppendString(0, prefix+suffix+span.Operation)
chk.AppendString(1, span.Start.Format(time.StampNano))
chk.AppendString(2, span.Duration.String())

for i, sp := range spans {
dfsTree(sp, tree, newPrefix, i == (len(spans))-1 /*last element of array*/, chk)
}
}
33 changes: 33 additions & 0 deletions executor/trace_test.go
@@ -0,0 +1,33 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/testkit"
)

type testTraceExec struct{}

func (s *testTraceExec) SetupSuite(c *C) {
}

func (s *testSuite) TestTraceExec(c *C) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this test file does nothing, can we remove the whole file and add it when adding ut.
And I think it's better to add these integration test in mysql-test, not in the ut.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I leave a todo and I will soon add the actual test in another PR.

tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
testSQL := `create table trace (id int PRIMARY KEY AUTO_INCREMENT, c1 int, c2 int, c3 int default 1);`
tk.MustExec(testSQL)
// TODO: check result later in another PR.
tk.MustExec("trace select * from trace where id = 0;")
}
22 changes: 22 additions & 0 deletions plan/planbuilder.go
Expand Up @@ -141,6 +141,8 @@ func (b *planBuilder) build(node ast.Node) (Plan, error) {
return b.buildExecute(x)
case *ast.ExplainStmt:
return b.buildExplain(x)
case *ast.TraceStmt:
return b.buildTrace(x)
case *ast.InsertStmt:
return b.buildInsert(x)
case *ast.LoadDataStmt:
Expand Down Expand Up @@ -1359,6 +1361,26 @@ func (b *planBuilder) buildDDL(node ast.DDLNode) Plan {
return p
}

// buildTrace builds a trace plan. Inside this method, it first optimize the
// underlying query and then constructs a schema, which will be used to constructs
// rows result.
func (b *planBuilder) buildTrace(trace *ast.TraceStmt) (Plan, error) {
if _, ok := trace.Stmt.(*ast.SelectStmt); !ok {
return nil, errors.New("trace only supports select query")
}

p := &Trace{StmtNode: trace.Stmt}

retFields := []string{"operation", "duration", "spanID"}
schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...)
schema.Append(buildColumn("", "operation", mysql.TypeString, mysql.MaxBlobWidth))

schema.Append(buildColumn("", "startTS", mysql.TypeString, mysql.MaxBlobWidth))
schema.Append(buildColumn("", "duration", mysql.TypeString, mysql.MaxBlobWidth))
p.SetSchema(schema)
return p, nil
}

func (b *planBuilder) buildExplain(explain *ast.ExplainStmt) (Plan, error) {
if show, ok := explain.Stmt.(*ast.ShowStmt); ok {
return b.buildShow(show)
Expand Down
12 changes: 12 additions & 0 deletions plan/trace.go
@@ -0,0 +1,12 @@
package plan

import (
"github.com/pingcap/tidb/ast"
)

// Trace represents a trace plan.
type Trace struct {
baseSchemaProducer

StmtNode ast.StmtNode
}
26 changes: 0 additions & 26 deletions session/session.go
Expand Up @@ -29,7 +29,6 @@ import (

"github.com/juju/errors"
"github.com/ngaut/pools"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
Expand Down Expand Up @@ -375,11 +374,6 @@ func (s *session) doCommitWithRetry(ctx context.Context) error {
}

func (s *session) CommitTxn(ctx context.Context) error {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("session.CommitTxn", opentracing.ChildOf(span.Context()))
defer span.Finish()
}

err := s.doCommitWithRetry(ctx)
label := metrics.LblOK
if err != nil {
Expand All @@ -390,11 +384,6 @@ func (s *session) CommitTxn(ctx context.Context) error {
}

func (s *session) RollbackTxn(ctx context.Context) error {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("session.RollbackTxn", opentracing.ChildOf(span.Context()))
defer span.Finish()
}

var err error
if s.txn.Valid() {
terror.Log(s.txn.Rollback())
Expand Down Expand Up @@ -451,9 +440,6 @@ func (s *session) isRetryableError(err error) bool {
}

func (s *session) retry(ctx context.Context, maxCnt uint) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "retry")
defer span.Finish()

connID := s.sessionVars.ConnectionID
if s.sessionVars.TxnCtx.ForUpdate {
return errForUpdateCantRetry.GenByArgs(connID)
Expand Down Expand Up @@ -545,10 +531,7 @@ func (s *session) sysSessionPool() *pools.ResourcePool {
// Unlike normal Exec, it doesn't reset statement status, doesn't commit or rollback the current transaction
// and doesn't write binlog.
func (s *session) ExecRestrictedSQL(sctx sessionctx.Context, sql string) ([]chunk.Row, []*ast.ResultField, error) {
var span opentracing.Span
ctx := context.TODO()
span, ctx = opentracing.StartSpanFromContext(ctx, "session.ExecRestrictedSQL")
defer span.Finish()

// Use special session to execute the sql.
tmp, err := s.sysSessionPool().Get()
Expand Down Expand Up @@ -712,10 +695,6 @@ func (s *session) SetGlobalSysVar(name, value string) error {
}

func (s *session) ParseSQL(ctx context.Context, sql, charset, collation string) ([]ast.StmtNode, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span1 := opentracing.StartSpan("session.ParseSQL", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
s.parser.SetSQLMode(s.sessionVars.SQLMode)
return s.parser.Parse(sql, charset, collation)
}
Expand Down Expand Up @@ -770,11 +749,6 @@ func (s *session) Execute(ctx context.Context, sql string) (recordSets []ast.Rec
}

func (s *session) execute(ctx context.Context, sql string) (recordSets []ast.RecordSet, err error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span, ctx = opentracing.StartSpanFromContext(ctx, "session.Execute")
defer span.Finish()
}

s.PrepareTxnCtx(ctx)
connID := s.sessionVars.ConnectionID
err = s.loadCommonGlobalVariablesIfNeeded()
Expand Down