Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: unify the Compile and CompileExecStmt functions #37065

Merged
merged 28 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
db49c25
planner: unify the Compile and CompileExecStmt functions
Reminiscent Aug 12, 2022
9f89e0b
Merge branch 'master' of https://github.com/pingcap/tidb into refacto…
Reminiscent Aug 12, 2022
4201813
fix ut
Reminiscent Aug 12, 2022
776e914
Merge branch 'master' of https://github.com/pingcap/tidb into refacto…
Reminiscent Aug 12, 2022
84b9879
Merge branch 'master' into refactor-PC-compile
Reminiscent Aug 15, 2022
8d30c20
fix ut
Reminiscent Aug 15, 2022
ab5c1c7
Merge branch 'master' of https://github.com/pingcap/tidb into refacto…
Reminiscent Aug 15, 2022
27e68fc
Merge remote-tracking branch 'origin/refactor-PC-compile' into refact…
Reminiscent Aug 15, 2022
7f1f5a2
add the short path for PointGet to compiler
Reminiscent Aug 15, 2022
8ea4f0a
git pull upstream master
Reminiscent Aug 15, 2022
2def5dc
fix ut
Reminiscent Aug 15, 2022
7afc42f
Merge branch 'master' of https://github.com/pingcap/tidb into refacto…
Reminiscent Aug 15, 2022
6ebd83a
revert
Reminiscent Aug 15, 2022
5113185
Merge branch 'master' into refactor-PC-compile
qw4990 Aug 16, 2022
369ab78
fix ut
Reminiscent Aug 16, 2022
647e903
Merge branch 'master' of https://github.com/pingcap/tidb into refacto…
Reminiscent Aug 16, 2022
09b8056
Merge remote-tracking branch 'origin/refactor-PC-compile' into refact…
Reminiscent Aug 16, 2022
762f07e
Merge the PointGet short path to general execute path
Reminiscent Aug 16, 2022
276878c
Merge branch 'master' into refactor-PC-compile
Reminiscent Aug 16, 2022
9837405
Revert "Merge the PointGet short path to general execute path"
Reminiscent Aug 16, 2022
7517765
address comment
Reminiscent Aug 16, 2022
f60b21d
Merge branch 'master' of https://github.com/pingcap/tidb into refacto…
Reminiscent Aug 16, 2022
82735e9
Merge remote-tracking branch 'origin/refactor-PC-compile' into refact…
Reminiscent Aug 16, 2022
42c8e7e
Merge branch 'master' into refactor-PC-compile
qw4990 Aug 16, 2022
70167e2
address comment
Reminiscent Aug 16, 2022
8adad8c
Merge branch 'master' of https://github.com/pingcap/tidb into refacto…
Reminiscent Aug 16, 2022
ce5c126
Merge remote-tracking branch 'origin/refactor-PC-compile' into refact…
Reminiscent Aug 16, 2022
aa06119
Merge branch 'master' into refactor-PC-compile
qw4990 Aug 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 16 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,22 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
terror.Call(pointExecutor.Close)
return nil, err
}

sctx := a.Ctx
cmd32 := atomic.LoadUint32(&sctx.GetSessionVars().CommandValue)
cmd := byte(cmd32)
var pi processinfoSetter
if raw, ok := sctx.(processinfoSetter); ok {
pi = raw
sql := a.OriginText()
maxExecutionTime := getMaxExecutionTime(sctx)
// Update processinfo, ShowProcess() will use it.
pi.SetProcessInfo(sql, time.Now(), cmd, maxExecutionTime)
if sctx.GetSessionVars().StmtCtx.StmtType == "" {
sctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode)
}
}

Comment on lines +303 to +318
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is copied from the ExecStmt.Exec. We should set the process information here or we will failed to get the result for explain for connection ....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is short path. It only do the little things.(compare to rumStmt). We will refactor those part in the later PR.

return &recordSet{
executor: pointExecutor,
stmt: a,
Expand Down
39 changes: 35 additions & 4 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,22 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
})

is := sessiontxn.GetTxnManager(c.Ctx).GetTxnInfoSchema()
sessVars := c.Ctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
// handle the execute statement
var (
pointPlanShortPathOK bool
preparedObj *plannercore.PlanCacheStmt
)

if execStmt, ok := stmtNode.(*ast.ExecuteStmt); ok {
if preparedObj, err = plannercore.GetPreparedStmt(execStmt, sessVars); err != nil {
return nil, err
}
if pointPlanShortPathOK, err = plannercore.IsPointPlanShortPathOK(c.Ctx, is, preparedObj); err != nil {
return nil, err
}
}
finalPlan, names, err := planner.Optimize(ctx, c.Ctx, stmtNode, is)
if err != nil {
return nil, err
Expand All @@ -86,13 +102,14 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
staleread.AssertStmtStaleness(c.Ctx, val.(bool))
})

CountStmtNode(stmtNode, c.Ctx.GetSessionVars().InRestrictedSQL)
// TODO: Should we use the Execute statement or the corresponding Prepare statement to record?
CountStmtNode(stmtNode, sessVars.InRestrictedSQL)
var lowerPriority bool
if c.Ctx.GetSessionVars().StmtCtx.Priority == mysql.NoPriority {
lowerPriority = needLowerPriority(finalPlan)
}
c.Ctx.GetSessionVars().StmtCtx.SetPlan(finalPlan)
return &ExecStmt{
stmtCtx.SetPlan(finalPlan)
stmt := &ExecStmt{
GoCtx: ctx,
InfoSchema: is,
Plan: finalPlan,
Expand All @@ -102,7 +119,21 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
Ctx: c.Ctx,
OutputNames: names,
Ti: &TelemetryInfo{},
}, nil
}
if pointPlanShortPathOK {
if ep, ok := stmt.Plan.(*plannercore.Execute); ok {
if pointPlan, ok := ep.Plan.(*plannercore.PointGetPlan); ok {
stmtCtx.SetPlan(stmt.Plan)
stmtCtx.SetPlanDigest(preparedObj.NormalizedPlan, preparedObj.PlanDigest)
stmt.Plan = pointPlan
stmt.PsStmt = preparedObj
} else {
// invalid the previous cached point plan
preparedObj.PreparedAst.CachedPlan = nil
}
}
}
return stmt, nil
}

// needLowerPriority checks whether it's needed to lower the execution priority
Expand Down
61 changes: 0 additions & 61 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ package executor
import (
"context"
"math"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
Expand All @@ -30,7 +28,6 @@ import (
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -325,61 +322,3 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error {
vars.RemovePreparedStmt(id)
return nil
}

// CompileExecutePreparedStmt compiles a session Execute command to a stmt.Statement.
func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
execStmt *ast.ExecuteStmt, is infoschema.InfoSchema) (*ExecStmt, error) {
startTime := time.Now()
defer func() {
sctx.GetSessionVars().DurationCompile = time.Since(startTime)
}()

preparedObj, err := plannercore.GetPreparedStmt(execStmt, sctx.GetSessionVars())
if err != nil {
return nil, err
}
pointPlanShortPathOK, err := plannercore.IsPointPlanShortPathOK(sctx, is, preparedObj)
if err != nil {
return nil, err
}

execPlan, names, err := planner.Optimize(ctx, sctx, execStmt, is)
if err != nil {
return nil, err
}

failpoint.Inject("assertTxnManagerInCompile", func() {
sessiontxn.RecordAssert(sctx, "assertTxnManagerInCompile", true)
sessiontxn.AssertTxnManagerInfoSchema(sctx, is)
})

stmt := &ExecStmt{
GoCtx: ctx,
InfoSchema: is,
Plan: execPlan,
StmtNode: execStmt,
Ctx: sctx,
OutputNames: names,
Ti: &TelemetryInfo{},
}
stmtCtx := sctx.GetSessionVars().StmtCtx
stmt.Text = preparedObj.PreparedAst.Stmt.Text()
stmtCtx.OriginalSQL = stmt.Text
stmtCtx.InitSQLDigest(preparedObj.NormalizedSQL, preparedObj.SQLDigest)

// handle point plan short path specially
if pointPlanShortPathOK {
if ep, ok := execPlan.(*plannercore.Execute); ok {
if pointPlan, ok := ep.Plan.(*plannercore.PointGetPlan); ok {
stmtCtx.SetPlan(execPlan)
stmtCtx.SetPlanDigest(preparedObj.NormalizedPlan, preparedObj.PlanDigest)
stmt.Plan = pointPlan
stmt.PsStmt = preparedObj
} else {
// invalid the previous cached point plan
preparedObj.PreparedAst.CachedPlan = nil
}
}
}
return stmt, nil
}
8 changes: 3 additions & 5 deletions executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -143,11 +142,10 @@ func TestPrepared(t *testing.T) {
prepStmt, err := tk.Session().GetSessionVars().GetPreparedStmtByID(stmtID)
require.NoError(t, err)
execStmt := &ast.ExecuteStmt{PrepStmt: prepStmt, BinaryArgs: expression.Args2Expressions4Test(1)}
// Check that ast.Statement created by executor.CompileExecutePreparedStmt has query text.
stmt, err := executor.CompileExecutePreparedStmt(context.TODO(), tk.Session(), execStmt,
tk.Session().GetInfoSchema().(infoschema.InfoSchema))
// Check that ast.Statement created by compiler.Compile has query text.
compiler := executor.Compiler{Ctx: tk.Session()}
stmt, err := compiler.Compile(context.TODO(), execStmt)
require.NoError(t, err)
require.Equal(t, query, stmt.OriginText())

// Check that rebuild plan works.
err = tk.Session().PrepareTxnCtx(ctx)
Expand Down
9 changes: 4 additions & 5 deletions session/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/store/mockstore"
Expand Down Expand Up @@ -1714,7 +1713,7 @@ func BenchmarkInsertIntoSelect(b *testing.B) {
b.StopTimer()
}

func BenchmarkCompileExecutePreparedStmt(b *testing.B) {
func BenchmarkCompileStmt(b *testing.B) {
// See issue https://github.com/pingcap/tidb/issues/27633
se, do, st := prepareBenchSession()
defer func() {
Expand Down Expand Up @@ -1815,12 +1814,12 @@ func BenchmarkCompileExecutePreparedStmt(b *testing.B) {
}

args := expression.Args2Expressions4Test(3401544)
is := se.GetInfoSchema()

b.ResetTimer()
stmtExec := &ast.ExecuteStmt{PrepStmt: prepStmt, BinaryArgs: args}
compiler := executor.Compiler{Ctx: se}
for i := 0; i < b.N; i++ {
_, err := executor.CompileExecutePreparedStmt(context.Background(), se, stmtExec, is.(infoschema.InfoSchema))
_, err := compiler.Compile(context.Background(), stmtExec)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -1859,6 +1858,6 @@ func TestBenchDaily(t *testing.T) {
BenchmarkHashPartitionPruningPointSelect,
BenchmarkHashPartitionPruningMultiSelect,
BenchmarkInsertIntoSelect,
BenchmarkCompileExecutePreparedStmt,
BenchmarkCompileStmt,
)
}
49 changes: 39 additions & 10 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2015,7 +2015,15 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex

// Execute the physical plan.
logStmt(stmt, s)
recordSet, err := runStmt(ctx, s, stmt)

var recordSet sqlexec.RecordSet
if stmt.PsStmt != nil { // point plan short path
recordSet, err = stmt.PointGet(ctx)
s.txn.changeToInvalid()
} else {
recordSet, err = runStmt(ctx, s, stmt)
}

if err != nil {
if !errIsNoisy(err) {
logutil.Logger(ctx).Warn("run statement failed",
Expand Down Expand Up @@ -2305,16 +2313,30 @@ func (s *session) preparedStmtExec(ctx context.Context, execStmt *ast.ExecuteStm
}
})

is := sessiontxn.GetTxnManager(s).GetTxnInfoSchema()
st, err := executor.CompileExecutePreparedStmt(ctx, s, execStmt, is)
s.sessionVars.StartTime = time.Now()
preparedObj, err := plannercore.GetPreparedStmt(execStmt, s.GetSessionVars())
if err != nil {
return nil, err
}

compiler := executor.Compiler{Ctx: s}
stmt, err := compiler.Compile(ctx, execStmt)
if err == nil {
err = sessiontxn.OptimizeWithPlanAndThenWarmUp(s, st.Plan)
err = sessiontxn.OptimizeWithPlanAndThenWarmUp(s, stmt.Plan)
}
if err != nil {
return nil, err
}
durCompile := time.Since(s.sessionVars.StartTime)
s.GetSessionVars().DurationCompile = durCompile

stmtCtx := s.GetSessionVars().StmtCtx
stmt.Text = preparedObj.PreparedAst.Stmt.Text()
stmtCtx.OriginalSQL = stmt.Text
stmtCtx.InitSQLDigest(preparedObj.NormalizedSQL, preparedObj.SQLDigest)
Comment on lines +2330 to +2336
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is copied from the deleted function CompileExecutePreparedStmt.


if !s.isInternal() && config.GetGlobalConfig().EnableTelemetry {
tiFlashPushDown, tiFlashExchangePushDown := plannercore.IsTiFlashContained(st.Plan)
tiFlashPushDown, tiFlashExchangePushDown := plannercore.IsTiFlashContained(stmt.Plan)
telemetry.CurrentExecuteCount.Inc()
if tiFlashPushDown {
telemetry.CurrentTiFlashPushDownCount.Inc()
Expand All @@ -2324,15 +2346,23 @@ func (s *session) preparedStmtExec(ctx context.Context, execStmt *ast.ExecuteStm
}
}
sessionExecuteCompileDurationGeneral.Observe(time.Since(s.sessionVars.StartTime).Seconds())
logGeneralQuery(st, s, true)
logGeneralQuery(stmt, s, true)

if st.PsStmt != nil { // point plan short path
resultSet, err := st.PointGet(ctx)
if stmt.PsStmt != nil { // point plan short path
resultSet, err := stmt.PointGet(ctx)
s.txn.changeToInvalid()
return resultSet, err
}

return runStmt(ctx, s, st)
var recordSet sqlexec.RecordSet

if stmt.PsStmt != nil { // point plan short path
recordSet, err = stmt.PointGet(ctx)
s.txn.changeToInvalid()
} else {
recordSet, err = runStmt(ctx, s, stmt)
}
return recordSet, err
}

// ExecutePreparedStmt executes a prepared statement.
Expand Down Expand Up @@ -2398,7 +2428,6 @@ func (s *session) executePlanCacheStmt(ctx context.Context, stmt *plannercore.Pl
}
}

executor.CountStmtNode(stmt.PreparedAst.Stmt, s.sessionVars.InRestrictedSQL)
s.txn.onStmtStart(stmt.SQLDigest.String())
defer s.txn.onStmtEnd()

Expand Down