Skip to content

Commit

Permalink
planner: unify the Compile and CompileExecStmt functions (#37065)
Browse files Browse the repository at this point in the history
ref #36598
  • Loading branch information
Reminiscent committed Aug 16, 2022
1 parent d226909 commit b705a13
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 85 deletions.
16 changes: 16 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,22 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
terror.Call(pointExecutor.Close)
return nil, err
}

sctx := a.Ctx
cmd32 := atomic.LoadUint32(&sctx.GetSessionVars().CommandValue)
cmd := byte(cmd32)
var pi processinfoSetter
if raw, ok := sctx.(processinfoSetter); ok {
pi = raw
sql := a.OriginText()
maxExecutionTime := getMaxExecutionTime(sctx)
// Update processinfo, ShowProcess() will use it.
pi.SetProcessInfo(sql, time.Now(), cmd, maxExecutionTime)
if sctx.GetSessionVars().StmtCtx.StmtType == "" {
sctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode)
}
}

return &recordSet{
executor: pointExecutor,
stmt: a,
Expand Down
39 changes: 35 additions & 4 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,22 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
})

is := sessiontxn.GetTxnManager(c.Ctx).GetTxnInfoSchema()
sessVars := c.Ctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
// handle the execute statement
var (
pointPlanShortPathOK bool
preparedObj *plannercore.PlanCacheStmt
)

if execStmt, ok := stmtNode.(*ast.ExecuteStmt); ok {
if preparedObj, err = plannercore.GetPreparedStmt(execStmt, sessVars); err != nil {
return nil, err
}
if pointPlanShortPathOK, err = plannercore.IsPointPlanShortPathOK(c.Ctx, is, preparedObj); err != nil {
return nil, err
}
}
finalPlan, names, err := planner.Optimize(ctx, c.Ctx, stmtNode, is)
if err != nil {
return nil, err
Expand All @@ -86,13 +102,14 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
staleread.AssertStmtStaleness(c.Ctx, val.(bool))
})

CountStmtNode(stmtNode, c.Ctx.GetSessionVars().InRestrictedSQL)
// TODO: Should we use the Execute statement or the corresponding Prepare statement to record?
CountStmtNode(stmtNode, sessVars.InRestrictedSQL)
var lowerPriority bool
if c.Ctx.GetSessionVars().StmtCtx.Priority == mysql.NoPriority {
lowerPriority = needLowerPriority(finalPlan)
}
c.Ctx.GetSessionVars().StmtCtx.SetPlan(finalPlan)
return &ExecStmt{
stmtCtx.SetPlan(finalPlan)
stmt := &ExecStmt{
GoCtx: ctx,
InfoSchema: is,
Plan: finalPlan,
Expand All @@ -102,7 +119,21 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
Ctx: c.Ctx,
OutputNames: names,
Ti: &TelemetryInfo{},
}, nil
}
if pointPlanShortPathOK {
if ep, ok := stmt.Plan.(*plannercore.Execute); ok {
if pointPlan, ok := ep.Plan.(*plannercore.PointGetPlan); ok {
stmtCtx.SetPlan(stmt.Plan)
stmtCtx.SetPlanDigest(preparedObj.NormalizedPlan, preparedObj.PlanDigest)
stmt.Plan = pointPlan
stmt.PsStmt = preparedObj
} else {
// invalid the previous cached point plan
preparedObj.PreparedAst.CachedPlan = nil
}
}
}
return stmt, nil
}

// needLowerPriority checks whether it's needed to lower the execution priority
Expand Down
61 changes: 0 additions & 61 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ package executor
import (
"context"
"math"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
Expand All @@ -30,7 +28,6 @@ import (
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -325,61 +322,3 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error {
vars.RemovePreparedStmt(id)
return nil
}

// CompileExecutePreparedStmt compiles a session Execute command to a stmt.Statement.
func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
execStmt *ast.ExecuteStmt, is infoschema.InfoSchema) (*ExecStmt, error) {
startTime := time.Now()
defer func() {
sctx.GetSessionVars().DurationCompile = time.Since(startTime)
}()

preparedObj, err := plannercore.GetPreparedStmt(execStmt, sctx.GetSessionVars())
if err != nil {
return nil, err
}
pointPlanShortPathOK, err := plannercore.IsPointPlanShortPathOK(sctx, is, preparedObj)
if err != nil {
return nil, err
}

execPlan, names, err := planner.Optimize(ctx, sctx, execStmt, is)
if err != nil {
return nil, err
}

failpoint.Inject("assertTxnManagerInCompile", func() {
sessiontxn.RecordAssert(sctx, "assertTxnManagerInCompile", true)
sessiontxn.AssertTxnManagerInfoSchema(sctx, is)
})

stmt := &ExecStmt{
GoCtx: ctx,
InfoSchema: is,
Plan: execPlan,
StmtNode: execStmt,
Ctx: sctx,
OutputNames: names,
Ti: &TelemetryInfo{},
}
stmtCtx := sctx.GetSessionVars().StmtCtx
stmt.Text = preparedObj.PreparedAst.Stmt.Text()
stmtCtx.OriginalSQL = stmt.Text
stmtCtx.InitSQLDigest(preparedObj.NormalizedSQL, preparedObj.SQLDigest)

// handle point plan short path specially
if pointPlanShortPathOK {
if ep, ok := execPlan.(*plannercore.Execute); ok {
if pointPlan, ok := ep.Plan.(*plannercore.PointGetPlan); ok {
stmtCtx.SetPlan(execPlan)
stmtCtx.SetPlanDigest(preparedObj.NormalizedPlan, preparedObj.PlanDigest)
stmt.Plan = pointPlan
stmt.PsStmt = preparedObj
} else {
// invalid the previous cached point plan
preparedObj.PreparedAst.CachedPlan = nil
}
}
}
return stmt, nil
}
8 changes: 3 additions & 5 deletions executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -143,11 +142,10 @@ func TestPrepared(t *testing.T) {
prepStmt, err := tk.Session().GetSessionVars().GetPreparedStmtByID(stmtID)
require.NoError(t, err)
execStmt := &ast.ExecuteStmt{PrepStmt: prepStmt, BinaryArgs: expression.Args2Expressions4Test(1)}
// Check that ast.Statement created by executor.CompileExecutePreparedStmt has query text.
stmt, err := executor.CompileExecutePreparedStmt(context.TODO(), tk.Session(), execStmt,
tk.Session().GetInfoSchema().(infoschema.InfoSchema))
// Check that ast.Statement created by compiler.Compile has query text.
compiler := executor.Compiler{Ctx: tk.Session()}
stmt, err := compiler.Compile(context.TODO(), execStmt)
require.NoError(t, err)
require.Equal(t, query, stmt.OriginText())

// Check that rebuild plan works.
err = tk.Session().PrepareTxnCtx(ctx)
Expand Down
9 changes: 4 additions & 5 deletions session/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/store/mockstore"
Expand Down Expand Up @@ -1714,7 +1713,7 @@ func BenchmarkInsertIntoSelect(b *testing.B) {
b.StopTimer()
}

func BenchmarkCompileExecutePreparedStmt(b *testing.B) {
func BenchmarkCompileStmt(b *testing.B) {
// See issue https://github.com/pingcap/tidb/issues/27633
se, do, st := prepareBenchSession()
defer func() {
Expand Down Expand Up @@ -1815,12 +1814,12 @@ func BenchmarkCompileExecutePreparedStmt(b *testing.B) {
}

args := expression.Args2Expressions4Test(3401544)
is := se.GetInfoSchema()

b.ResetTimer()
stmtExec := &ast.ExecuteStmt{PrepStmt: prepStmt, BinaryArgs: args}
compiler := executor.Compiler{Ctx: se}
for i := 0; i < b.N; i++ {
_, err := executor.CompileExecutePreparedStmt(context.Background(), se, stmtExec, is.(infoschema.InfoSchema))
_, err := compiler.Compile(context.Background(), stmtExec)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -1859,6 +1858,6 @@ func TestBenchDaily(t *testing.T) {
BenchmarkHashPartitionPruningPointSelect,
BenchmarkHashPartitionPruningMultiSelect,
BenchmarkInsertIntoSelect,
BenchmarkCompileExecutePreparedStmt,
BenchmarkCompileStmt,
)
}
49 changes: 39 additions & 10 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2015,7 +2015,15 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex

// Execute the physical plan.
logStmt(stmt, s)
recordSet, err := runStmt(ctx, s, stmt)

var recordSet sqlexec.RecordSet
if stmt.PsStmt != nil { // point plan short path
recordSet, err = stmt.PointGet(ctx)
s.txn.changeToInvalid()
} else {
recordSet, err = runStmt(ctx, s, stmt)
}

if err != nil {
if !errIsNoisy(err) {
logutil.Logger(ctx).Warn("run statement failed",
Expand Down Expand Up @@ -2305,16 +2313,30 @@ func (s *session) preparedStmtExec(ctx context.Context, execStmt *ast.ExecuteStm
}
})

is := sessiontxn.GetTxnManager(s).GetTxnInfoSchema()
st, err := executor.CompileExecutePreparedStmt(ctx, s, execStmt, is)
s.sessionVars.StartTime = time.Now()
preparedObj, err := plannercore.GetPreparedStmt(execStmt, s.GetSessionVars())
if err != nil {
return nil, err
}

compiler := executor.Compiler{Ctx: s}
stmt, err := compiler.Compile(ctx, execStmt)
if err == nil {
err = sessiontxn.OptimizeWithPlanAndThenWarmUp(s, st.Plan)
err = sessiontxn.OptimizeWithPlanAndThenWarmUp(s, stmt.Plan)
}
if err != nil {
return nil, err
}
durCompile := time.Since(s.sessionVars.StartTime)
s.GetSessionVars().DurationCompile = durCompile

stmtCtx := s.GetSessionVars().StmtCtx
stmt.Text = preparedObj.PreparedAst.Stmt.Text()
stmtCtx.OriginalSQL = stmt.Text
stmtCtx.InitSQLDigest(preparedObj.NormalizedSQL, preparedObj.SQLDigest)

if !s.isInternal() && config.GetGlobalConfig().EnableTelemetry {
tiFlashPushDown, tiFlashExchangePushDown := plannercore.IsTiFlashContained(st.Plan)
tiFlashPushDown, tiFlashExchangePushDown := plannercore.IsTiFlashContained(stmt.Plan)
telemetry.CurrentExecuteCount.Inc()
if tiFlashPushDown {
telemetry.CurrentTiFlashPushDownCount.Inc()
Expand All @@ -2324,15 +2346,23 @@ func (s *session) preparedStmtExec(ctx context.Context, execStmt *ast.ExecuteStm
}
}
sessionExecuteCompileDurationGeneral.Observe(time.Since(s.sessionVars.StartTime).Seconds())
logGeneralQuery(st, s, true)
logGeneralQuery(stmt, s, true)

if st.PsStmt != nil { // point plan short path
resultSet, err := st.PointGet(ctx)
if stmt.PsStmt != nil { // point plan short path
resultSet, err := stmt.PointGet(ctx)
s.txn.changeToInvalid()
return resultSet, err
}

return runStmt(ctx, s, st)
var recordSet sqlexec.RecordSet

if stmt.PsStmt != nil { // point plan short path
recordSet, err = stmt.PointGet(ctx)
s.txn.changeToInvalid()
} else {
recordSet, err = runStmt(ctx, s, stmt)
}
return recordSet, err
}

// ExecutePreparedStmt executes a prepared statement.
Expand Down Expand Up @@ -2398,7 +2428,6 @@ func (s *session) executePlanCacheStmt(ctx context.Context, stmt *plannercore.Pl
}
}

executor.CountStmtNode(stmt.PreparedAst.Stmt, s.sessionVars.InRestrictedSQL)
s.txn.onStmtStart(stmt.SQLDigest.String())
defer s.txn.onStmtEnd()

Expand Down

0 comments on commit b705a13

Please sign in to comment.