Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: unify the Compile and CompileExecStmt functions #37065

Merged
merged 28 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
db49c25
planner: unify the Compile and CompileExecStmt functions
Reminiscent Aug 12, 2022
9f89e0b
Merge branch 'master' of https://github.com/pingcap/tidb into refacto…
Reminiscent Aug 12, 2022
4201813
fix ut
Reminiscent Aug 12, 2022
776e914
Merge branch 'master' of https://github.com/pingcap/tidb into refacto…
Reminiscent Aug 12, 2022
84b9879
Merge branch 'master' into refactor-PC-compile
Reminiscent Aug 15, 2022
8d30c20
fix ut
Reminiscent Aug 15, 2022
ab5c1c7
Merge branch 'master' of https://github.com/pingcap/tidb into refacto…
Reminiscent Aug 15, 2022
27e68fc
Merge remote-tracking branch 'origin/refactor-PC-compile' into refact…
Reminiscent Aug 15, 2022
7f1f5a2
add the short path for PointGet to compiler
Reminiscent Aug 15, 2022
8ea4f0a
git pull upstream master
Reminiscent Aug 15, 2022
2def5dc
fix ut
Reminiscent Aug 15, 2022
7afc42f
Merge branch 'master' of https://github.com/pingcap/tidb into refacto…
Reminiscent Aug 15, 2022
6ebd83a
revert
Reminiscent Aug 15, 2022
5113185
Merge branch 'master' into refactor-PC-compile
qw4990 Aug 16, 2022
369ab78
fix ut
Reminiscent Aug 16, 2022
647e903
Merge branch 'master' of https://github.com/pingcap/tidb into refacto…
Reminiscent Aug 16, 2022
09b8056
Merge remote-tracking branch 'origin/refactor-PC-compile' into refact…
Reminiscent Aug 16, 2022
762f07e
Merge the PointGet short path to general execute path
Reminiscent Aug 16, 2022
276878c
Merge branch 'master' into refactor-PC-compile
Reminiscent Aug 16, 2022
9837405
Revert "Merge the PointGet short path to general execute path"
Reminiscent Aug 16, 2022
7517765
address comment
Reminiscent Aug 16, 2022
f60b21d
Merge branch 'master' of https://github.com/pingcap/tidb into refacto…
Reminiscent Aug 16, 2022
82735e9
Merge remote-tracking branch 'origin/refactor-PC-compile' into refact…
Reminiscent Aug 16, 2022
42c8e7e
Merge branch 'master' into refactor-PC-compile
qw4990 Aug 16, 2022
70167e2
address comment
Reminiscent Aug 16, 2022
8adad8c
Merge branch 'master' of https://github.com/pingcap/tidb into refacto…
Reminiscent Aug 16, 2022
ce5c126
Merge remote-tracking branch 'origin/refactor-PC-compile' into refact…
Reminiscent Aug 16, 2022
aa06119
Merge branch 'master' into refactor-PC-compile
qw4990 Aug 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
91 changes: 32 additions & 59 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,62 +251,6 @@ func (a ExecStmt) GetStmtNode() ast.StmtNode {
return a.StmtNode
}

// PointGet short path for point exec directly from plan, keep only necessary steps
func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("ExecStmt.PointGet", opentracing.ChildOf(span.Context()))
span1.LogKV("sql", a.OriginText())
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true)
// stale read should not reach here
staleread.AssertStmtStaleness(a.Ctx, false)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, a.InfoSchema)
})

ctx = a.observeStmtBeginForTopSQL(ctx)
startTs, err := sessiontxn.GetTxnManager(a.Ctx).GetStmtReadTS()
if err != nil {
return nil, err
}
a.Ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityHigh

// try to reuse point get executor
if a.PsStmt.Executor != nil {
exec, ok := a.PsStmt.Executor.(*PointGetExecutor)
if !ok {
logutil.Logger(ctx).Error("invalid executor type, not PointGetExecutor for point get path")
a.PsStmt.Executor = nil
} else {
// CachedPlan type is already checked in last step
pointGetPlan := a.PsStmt.PreparedAst.CachedPlan.(*plannercore.PointGetPlan)
exec.Init(pointGetPlan)
a.PsStmt.Executor = exec
}
}
if a.PsStmt.Executor == nil {
b := newExecutorBuilder(a.Ctx, a.InfoSchema, a.Ti)
newExecutor := b.build(a.Plan)
if b.err != nil {
return nil, b.err
}
a.PsStmt.Executor = newExecutor
}
pointExecutor := a.PsStmt.Executor.(*PointGetExecutor)

if err = pointExecutor.Open(ctx); err != nil {
terror.Call(pointExecutor.Close)
return nil, err
}
return &recordSet{
executor: pointExecutor,
stmt: a,
txnStartTS: startTs,
}, nil
}

// OriginText returns original statement as a string.
func (a *ExecStmt) OriginText() string {
return a.Text
Expand Down Expand Up @@ -441,9 +385,38 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
sctx.GetSessionVars().StmtCtx.MemTracker.SetBytesLimit(sctx.GetSessionVars().StmtCtx.MemQuotaQuery)
}

e, err := a.buildExecutor()
if err != nil {
return nil, err
var e Executor
if a.PsStmt != nil {
if a.PsStmt.Executor != nil {
exec, ok := a.PsStmt.Executor.(*PointGetExecutor)
if !ok {
logutil.Logger(ctx).Error("invalid executor type, not PointGetExecutor for point get path")
a.PsStmt.Executor = nil
} else {
a.Ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityHigh
// CachedPlan type is already checked in last step
pointGetPlan := a.PsStmt.PreparedAst.CachedPlan.(*plannercore.PointGetPlan)
exec.Init(pointGetPlan)
a.PsStmt.Executor = exec

}
}

if a.PsStmt.Executor == nil {
b := newExecutorBuilder(a.Ctx, a.InfoSchema, a.Ti)
newExecutor := b.build(a.Plan)
if b.err != nil {
return nil, b.err
}
a.PsStmt.Executor = newExecutor
}

e = a.PsStmt.Executor.(*PointGetExecutor)
} else {
e, err = a.buildExecutor()
if err != nil {
return nil, err
}
}
// ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`.
ctx = a.observeStmtBeginForTopSQL(ctx)
Expand Down
41 changes: 37 additions & 4 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,24 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
})

is := sessiontxn.GetTxnManager(c.Ctx).GetTxnInfoSchema()
sessVars := c.Ctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
// handle the execute statement
var (
pointPlanShortPathOK bool
preparedObj *plannercore.PlanCacheStmt
)

if execStmt, ok := stmtNode.(*ast.ExecuteStmt); ok {
preparedObj, err = plannercore.GetPreparedStmt(execStmt, sessVars)
if err != nil {
return nil, err
}
pointPlanShortPathOK, err = plannercore.IsPointPlanShortPathOK(c.Ctx, is, preparedObj)
if err != nil {
return nil, err
}
Reminiscent marked this conversation as resolved.
Show resolved Hide resolved
}
finalPlan, names, err := planner.Optimize(ctx, c.Ctx, stmtNode, is)
if err != nil {
return nil, err
Expand All @@ -86,13 +104,14 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
staleread.AssertStmtStaleness(c.Ctx, val.(bool))
})

CountStmtNode(stmtNode, c.Ctx.GetSessionVars().InRestrictedSQL)
// TODO: Should we use the Execute statement or the corresponding Prepare statement to record?
CountStmtNode(stmtNode, sessVars.InRestrictedSQL)
var lowerPriority bool
if c.Ctx.GetSessionVars().StmtCtx.Priority == mysql.NoPriority {
lowerPriority = needLowerPriority(finalPlan)
}
c.Ctx.GetSessionVars().StmtCtx.SetPlan(finalPlan)
return &ExecStmt{
stmtCtx.SetPlan(finalPlan)
stmt := &ExecStmt{
GoCtx: ctx,
InfoSchema: is,
Plan: finalPlan,
Expand All @@ -102,7 +121,21 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
Ctx: c.Ctx,
OutputNames: names,
Ti: &TelemetryInfo{},
}, nil
}
if pointPlanShortPathOK {
if ep, ok := stmt.Plan.(*plannercore.Execute); ok {
if pointPlan, ok := ep.Plan.(*plannercore.PointGetPlan); ok {
stmtCtx.SetPlan(stmt.Plan)
stmtCtx.SetPlanDigest(preparedObj.NormalizedPlan, preparedObj.PlanDigest)
stmt.Plan = pointPlan
stmt.PsStmt = preparedObj
} else {
// invalid the previous cached point plan
preparedObj.PreparedAst.CachedPlan = nil
}
}
}
return stmt, nil
}

// needLowerPriority checks whether it's needed to lower the execution priority
Expand Down
61 changes: 0 additions & 61 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ package executor
import (
"context"
"math"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
Expand All @@ -30,7 +28,6 @@ import (
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -325,61 +322,3 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error {
vars.RemovePreparedStmt(id)
return nil
}

// CompileExecutePreparedStmt compiles a session Execute command to a stmt.Statement.
func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
execStmt *ast.ExecuteStmt, is infoschema.InfoSchema) (*ExecStmt, error) {
startTime := time.Now()
defer func() {
sctx.GetSessionVars().DurationCompile = time.Since(startTime)
}()

preparedObj, err := plannercore.GetPreparedStmt(execStmt, sctx.GetSessionVars())
if err != nil {
return nil, err
}
pointPlanShortPathOK, err := plannercore.IsPointPlanShortPathOK(sctx, is, preparedObj)
if err != nil {
return nil, err
}

execPlan, names, err := planner.Optimize(ctx, sctx, execStmt, is)
if err != nil {
return nil, err
}

failpoint.Inject("assertTxnManagerInCompile", func() {
sessiontxn.RecordAssert(sctx, "assertTxnManagerInCompile", true)
sessiontxn.AssertTxnManagerInfoSchema(sctx, is)
})

stmt := &ExecStmt{
GoCtx: ctx,
InfoSchema: is,
Plan: execPlan,
StmtNode: execStmt,
Ctx: sctx,
OutputNames: names,
Ti: &TelemetryInfo{},
}
stmtCtx := sctx.GetSessionVars().StmtCtx
stmt.Text = preparedObj.PreparedAst.Stmt.Text()
stmtCtx.OriginalSQL = stmt.Text
stmtCtx.InitSQLDigest(preparedObj.NormalizedSQL, preparedObj.SQLDigest)

// handle point plan short path specially
if pointPlanShortPathOK {
if ep, ok := execPlan.(*plannercore.Execute); ok {
if pointPlan, ok := ep.Plan.(*plannercore.PointGetPlan); ok {
stmtCtx.SetPlan(execPlan)
stmtCtx.SetPlanDigest(preparedObj.NormalizedPlan, preparedObj.PlanDigest)
stmt.Plan = pointPlan
stmt.PsStmt = preparedObj
} else {
// invalid the previous cached point plan
preparedObj.PreparedAst.CachedPlan = nil
}
}
}
return stmt, nil
}
8 changes: 3 additions & 5 deletions executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -143,11 +142,10 @@ func TestPrepared(t *testing.T) {
prepStmt, err := tk.Session().GetSessionVars().GetPreparedStmtByID(stmtID)
require.NoError(t, err)
execStmt := &ast.ExecuteStmt{PrepStmt: prepStmt, BinaryArgs: expression.Args2Expressions4Test(1)}
// Check that ast.Statement created by executor.CompileExecutePreparedStmt has query text.
stmt, err := executor.CompileExecutePreparedStmt(context.TODO(), tk.Session(), execStmt,
tk.Session().GetInfoSchema().(infoschema.InfoSchema))
// Check that ast.Statement created by compiler.Compile has query text.
compiler := executor.Compiler{Ctx: tk.Session()}
stmt, err := compiler.Compile(context.TODO(), execStmt)
require.NoError(t, err)
require.Equal(t, query, stmt.OriginText())

// Check that rebuild plan works.
err = tk.Session().PrepareTxnCtx(ctx)
Expand Down
9 changes: 4 additions & 5 deletions session/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/store/mockstore"
Expand Down Expand Up @@ -1714,7 +1713,7 @@ func BenchmarkInsertIntoSelect(b *testing.B) {
b.StopTimer()
}

func BenchmarkCompileExecutePreparedStmt(b *testing.B) {
func BenchmarkCompileStmt(b *testing.B) {
// See issue https://github.com/pingcap/tidb/issues/27633
se, do, st := prepareBenchSession()
defer func() {
Expand Down Expand Up @@ -1815,12 +1814,12 @@ func BenchmarkCompileExecutePreparedStmt(b *testing.B) {
}

args := expression.Args2Expressions4Test(3401544)
is := se.GetInfoSchema()

b.ResetTimer()
stmtExec := &ast.ExecuteStmt{PrepStmt: prepStmt, BinaryArgs: args}
compiler := executor.Compiler{Ctx: se}
for i := 0; i < b.N; i++ {
_, err := executor.CompileExecutePreparedStmt(context.Background(), se, stmtExec, is.(infoschema.InfoSchema))
_, err := compiler.Compile(context.Background(), stmtExec)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -1859,6 +1858,6 @@ func TestBenchDaily(t *testing.T) {
BenchmarkHashPartitionPruningPointSelect,
BenchmarkHashPartitionPruningMultiSelect,
BenchmarkInsertIntoSelect,
BenchmarkCompileExecutePreparedStmt,
BenchmarkCompileStmt,
)
}
40 changes: 29 additions & 11 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2009,7 +2009,12 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex

// Execute the physical plan.
logStmt(stmt, s)
recordSet, err := runStmt(ctx, s, stmt)

var recordSet sqlexec.RecordSet
recordSet, err = runStmt(ctx, s, stmt)
if stmt.PsStmt != nil { // point plan short path
s.txn.changeToInvalid()
}
if err != nil {
if !errIsNoisy(err) {
logutil.Logger(ctx).Warn("run statement failed",
Expand Down Expand Up @@ -2299,16 +2304,30 @@ func (s *session) preparedStmtExec(ctx context.Context, execStmt *ast.ExecuteStm
}
})

is := sessiontxn.GetTxnManager(s).GetTxnInfoSchema()
st, err := executor.CompileExecutePreparedStmt(ctx, s, execStmt, is)
s.sessionVars.StartTime = time.Now()
preparedObj, err := plannercore.GetPreparedStmt(execStmt, s.GetSessionVars())
if err != nil {
return nil, err
}

compiler := executor.Compiler{Ctx: s}
stmt, err := compiler.Compile(ctx, execStmt)
if err == nil {
err = sessiontxn.OptimizeWithPlanAndThenWarmUp(s, st.Plan)
err = sessiontxn.OptimizeWithPlanAndThenWarmUp(s, stmt.Plan)
}
if err != nil {
return nil, err
}
durCompile := time.Since(s.sessionVars.StartTime)
s.GetSessionVars().DurationCompile = durCompile

stmtCtx := s.GetSessionVars().StmtCtx
stmt.Text = preparedObj.PreparedAst.Stmt.Text()
stmtCtx.OriginalSQL = stmt.Text
stmtCtx.InitSQLDigest(preparedObj.NormalizedSQL, preparedObj.SQLDigest)
Comment on lines +2330 to +2336
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is copied from the deleted function CompileExecutePreparedStmt.


if !s.isInternal() && config.GetGlobalConfig().EnableTelemetry {
tiFlashPushDown, tiFlashExchangePushDown := plannercore.IsTiFlashContained(st.Plan)
tiFlashPushDown, tiFlashExchangePushDown := plannercore.IsTiFlashContained(stmt.Plan)
telemetry.CurrentExecuteCount.Inc()
if tiFlashPushDown {
telemetry.CurrentTiFlashPushDownCount.Inc()
Expand All @@ -2318,15 +2337,15 @@ func (s *session) preparedStmtExec(ctx context.Context, execStmt *ast.ExecuteStm
}
}
sessionExecuteCompileDurationGeneral.Observe(time.Since(s.sessionVars.StartTime).Seconds())
logGeneralQuery(st, s, true)
logGeneralQuery(stmt, s, true)

if st.PsStmt != nil { // point plan short path
resultSet, err := st.PointGet(ctx)
var recordSet sqlexec.RecordSet
recordSet, err = runStmt(ctx, s, stmt)
if stmt.PsStmt != nil { // point plan short path
s.txn.changeToInvalid()
return resultSet, err
}

return runStmt(ctx, s, st)
return recordSet, err
}

// ExecutePreparedStmt executes a prepared statement.
Expand Down Expand Up @@ -2392,7 +2411,6 @@ func (s *session) executePlanCacheStmt(ctx context.Context, stmt *plannercore.Pl
}
}

executor.CountStmtNode(stmt.PreparedAst.Stmt, s.sessionVars.InRestrictedSQL)
s.txn.onStmtStart(stmt.SQLDigest.String())
defer s.txn.onStmtEnd()

Expand Down