Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: move base plan related output of core pkg and make it well-pkged #52529

Merged
merged 8 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ go_library(
"//pkg/planner/cardinality",
"//pkg/planner/context",
"//pkg/planner/core",
"//pkg/planner/core/base",
"//pkg/planner/util",
"//pkg/planner/util/fixcontrol",
"//pkg/plugin",
Expand Down Expand Up @@ -410,6 +411,7 @@ go_test(
"//pkg/parser/terror",
"//pkg/planner",
"//pkg/planner/core",
"//pkg/planner/core/base",
"//pkg/planner/property",
"//pkg/planner/util",
"//pkg/server",
Expand Down
11 changes: 6 additions & 5 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/planner"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/plugin"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/sessionstates"
Expand Down Expand Up @@ -221,7 +222,7 @@ type ExecStmt struct {
// InfoSchema stores a reference to the schema information.
InfoSchema infoschema.InfoSchema
// Plan stores a reference to the final physical plan.
Plan plannercore.Plan
Plan base.Plan
// Text represents the origin query text.
Text string

Expand Down Expand Up @@ -393,7 +394,7 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
}

// IsFastPlan exports for testing.
func IsFastPlan(p plannercore.Plan) bool {
func IsFastPlan(p base.Plan) bool {
if proj, ok := p.(*plannercore.PhysicalProjection); ok {
p = proj.Children()[0]
}
Expand Down Expand Up @@ -802,7 +803,7 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e exec.Executor, isPessimi
return false, nil, nil
}

func isNoResultPlan(p plannercore.Plan) bool {
func isNoResultPlan(p base.Plan) bool {
if p.Schema().Len() == 0 {
return true
}
Expand Down Expand Up @@ -1707,7 +1708,7 @@ func collectWarningsForSlowLog(stmtCtx *stmtctx.StatementContext) []variable.JSO
}

// GetResultRowsCount gets the count of the statement result rows.
func GetResultRowsCount(stmtCtx *stmtctx.StatementContext, p plannercore.Plan) int64 {
func GetResultRowsCount(stmtCtx *stmtctx.StatementContext, p base.Plan) int64 {
runtimeStatsColl := stmtCtx.RuntimeStatsColl
if runtimeStatsColl == nil {
return 0
Expand All @@ -1731,7 +1732,7 @@ func getFlatPlan(stmtCtx *stmtctx.StatementContext) *plannercore.FlatPhysicalPla
f := flat.(*plannercore.FlatPhysicalPlan)
return f
}
p := pp.(plannercore.Plan)
p := pp.(base.Plan)
flat := plannercore.FlattenPhysicalPlan(p, false)
if flat != nil {
stmtCtx.SetFlatPlan(flat)
Expand Down
19 changes: 10 additions & 9 deletions pkg/executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
Expand All @@ -51,7 +52,7 @@ import (

var (
_ exec.Executor = &testutil.MockDataSource{}
_ core.PhysicalPlan = &testutil.MockDataPhysicalPlan{}
_ base.PhysicalPlan = &testutil.MockDataPhysicalPlan{}
wideString = strings.Repeat("x", 5*1024)
)

Expand Down Expand Up @@ -80,7 +81,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec exec.Executor, schem
sg.SetSchema(schema)
sg.Init(ctx.GetPlanCtx(), nil, 0)

var tail core.PhysicalPlan = sg
var tail base.PhysicalPlan = sg
// if data source is not sorted, we have to attach sort, to make the input of stream-agg sorted
if !dataSourceSorted {
byItems := make([]*util.ByItems, 0, len(sg.GroupByItems))
Expand All @@ -96,7 +97,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec exec.Executor, schem
}

var (
plan core.PhysicalPlan
plan base.PhysicalPlan
splitter core.PartitionSplitterType = core.PartitionHashSplitterType
)
if concurrency > 1 {
Expand All @@ -105,8 +106,8 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec exec.Executor, schem
}
plan = core.PhysicalShuffle{
Concurrency: concurrency,
Tails: []core.PhysicalPlan{tail},
DataSources: []core.PhysicalPlan{src},
Tails: []base.PhysicalPlan{tail},
DataSources: []base.PhysicalPlan{src},
SplitterType: splitter,
ByItemArrays: [][]expression.Expression{sg.GroupByItems},
}.Init(ctx.GetPlanCtx(), nil, 0)
Expand Down Expand Up @@ -315,7 +316,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
win.SetSchema(winSchema)
win.Init(ctx.GetPlanCtx(), nil, 0)

var tail core.PhysicalPlan = win
var tail base.PhysicalPlan = win
if !dataSourceSorted {
byItems := make([]*util.ByItems, 0, len(partitionBy))
for _, col := range partitionBy {
Expand All @@ -329,7 +330,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
win.SetChildren(src)
}

var plan core.PhysicalPlan
var plan base.PhysicalPlan
if concurrency > 1 {
byItems := make([]expression.Expression, 0, len(win.PartitionBy))
for _, item := range win.PartitionBy {
Expand All @@ -338,8 +339,8 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f

plan = core.PhysicalShuffle{
Concurrency: concurrency,
Tails: []core.PhysicalPlan{tail},
DataSources: []core.PhysicalPlan{src},
Tails: []base.PhysicalPlan{tail},
DataSources: []base.PhysicalPlan{src},
SplitterType: core.PartitionHashSplitterType,
ByItemArrays: [][]expression.Expression{byItems},
}.Init(ctx.GetPlanCtx(), nil, 0)
Expand Down
27 changes: 14 additions & 13 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
Expand Down Expand Up @@ -150,11 +151,11 @@ func NewMockExecutorBuilderForTest(ctx sessionctx.Context, is infoschema.InfoSch
}

// Build builds an executor tree according to `p`.
func (b *MockExecutorBuilder) Build(p plannercore.Plan) exec.Executor {
func (b *MockExecutorBuilder) Build(p base.Plan) exec.Executor {
return b.build(p)
}

func (b *executorBuilder) build(p plannercore.Plan) exec.Executor {
func (b *executorBuilder) build(p base.Plan) exec.Executor {
switch v := p.(type) {
case nil:
return nil
Expand Down Expand Up @@ -2160,8 +2161,8 @@ func (b *executorBuilder) buildTopN(v *plannercore.PhysicalTopN) exec.Executor {

func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) exec.Executor {
var (
innerPlan plannercore.PhysicalPlan
outerPlan plannercore.PhysicalPlan
innerPlan base.PhysicalPlan
outerPlan base.PhysicalPlan
)
if v.InnerChildIdx == 0 {
innerPlan = v.Children()[0]
Expand Down Expand Up @@ -2865,7 +2866,7 @@ func markChildrenUsedCols(outputCols []*expression.Column, childSchemas ...*expr
return
}

func (*executorBuilder) corColInDistPlan(plans []plannercore.PhysicalPlan) bool {
func (*executorBuilder) corColInDistPlan(plans []base.PhysicalPlan) bool {
for _, p := range plans {
switch x := p.(type) {
case *plannercore.PhysicalSelection:
Expand All @@ -2892,7 +2893,7 @@ func (*executorBuilder) corColInDistPlan(plans []plannercore.PhysicalPlan) bool
}

// corColInAccess checks whether there's correlated column in access conditions.
func (*executorBuilder) corColInAccess(p plannercore.PhysicalPlan) bool {
func (*executorBuilder) corColInAccess(p base.PhysicalPlan) bool {
var access []expression.Expression
switch x := p.(type) {
case *plannercore.PhysicalTableScan:
Expand All @@ -2908,7 +2909,7 @@ func (*executorBuilder) corColInAccess(p plannercore.PhysicalPlan) bool {
return false
}

func (b *executorBuilder) newDataReaderBuilder(p plannercore.PhysicalPlan) (*dataReaderBuilder, error) {
func (b *executorBuilder) newDataReaderBuilder(p base.PhysicalPlan) (*dataReaderBuilder, error) {
ts, err := b.getSnapshotTS()
if err != nil {
return nil, err
Expand Down Expand Up @@ -3183,7 +3184,7 @@ func (b *executorBuilder) buildIndexNestedLoopHashJoin(v *plannercore.PhysicalIn
func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableReader) (*TableReaderExecutor, error) {
tablePlans := v.TablePlans
if v.StoreType == kv.TiFlash {
tablePlans = []plannercore.PhysicalPlan{v.GetTablePlan()}
tablePlans = []base.PhysicalPlan{v.GetTablePlan()}
}
dagReq, err := builder.ConstructDAGReq(b.ctx, tablePlans, v.StoreType)
if err != nil {
Expand Down Expand Up @@ -3644,7 +3645,7 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) e
return ret
}

func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, val table.Table, err error) {
func buildTableReq(b *executorBuilder, schemaLen int, plans []base.PhysicalPlan) (dagReq *tipb.DAGRequest, val table.Table, err error) {
tableReq, err := builder.ConstructDAGReq(b.ctx, plans, kv.TiKV)
if err != nil {
return nil, nil, err
Expand All @@ -3665,7 +3666,7 @@ func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.Physic
// buildIndexReq is designed to create a DAG for index request.
// If len(ByItems) != 0 means index request should return related columns
// to sort result rows in TiDB side for parition tables.
func buildIndexReq(ctx sessionctx.Context, columns []*model.IndexColumn, handleLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, err error) {
func buildIndexReq(ctx sessionctx.Context, columns []*model.IndexColumn, handleLen int, plans []base.PhysicalPlan) (dagReq *tipb.DAGRequest, err error) {
indexReq, err := builder.ConstructDAGReq(ctx, plans, kv.TiKV)
if err != nil {
return nil, err
Expand Down Expand Up @@ -4001,7 +4002,7 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg
// 1. dataReaderBuilder calculate data range from argument, rather than plan.
// 2. the result executor is already opened.
type dataReaderBuilder struct {
plan plannercore.Plan
plan base.Plan
*executorBuilder

selectResultHook // for testing
Expand All @@ -4013,7 +4014,7 @@ type dataReaderBuilder struct {
}

type mockPhysicalIndexReader struct {
plannercore.PhysicalPlan
base.PhysicalPlan

e exec.Executor
}
Expand All @@ -4028,7 +4029,7 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context,
return builder.buildExecutorForIndexJoinInternal(ctx, builder.plan, lookUpContents, indexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal)
}

func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.Context, plan plannercore.Plan, lookUpContents []*indexJoinLookUpContent,
func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.Context, plan base.Plan, lookUpContents []*indexJoinLookUpContent,
indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (exec.Executor, error) {
switch v := plan.(type) {
case *plannercore.PhysicalTableReader:
Expand Down
7 changes: 4 additions & 3 deletions pkg/executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
Expand Down Expand Up @@ -158,9 +159,9 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
// If the estimated output row count of any operator in the physical plan tree
// is greater than the specific threshold, we'll set it to lowPriority when
// sending it to the coprocessor.
func needLowerPriority(p plannercore.Plan) bool {
func needLowerPriority(p base.Plan) bool {
switch x := p.(type) {
case plannercore.PhysicalPlan:
case base.PhysicalPlan:
return isPhysicalPlanNeedLowerPriority(x)
case *plannercore.Execute:
return needLowerPriority(x.Plan)
Expand All @@ -180,7 +181,7 @@ func needLowerPriority(p plannercore.Plan) bool {
return false
}

func isPhysicalPlanNeedLowerPriority(p plannercore.PhysicalPlan) bool {
func isPhysicalPlanNeedLowerPriority(p base.PhysicalPlan) bool {
expensiveThreshold := int64(config.GetGlobalConfig().Log.ExpensiveThreshold)
if int64(p.StatsCount()) > expensiveThreshold {
return true
Expand Down
9 changes: 5 additions & 4 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/table"
Expand Down Expand Up @@ -204,7 +205,7 @@ type IndexReaderExecutor struct {
corColInAccess bool
idxCols []*expression.Column
colLens []int
plans []plannercore.PhysicalPlan
plans []base.PhysicalPlan

memTracker *memory.Tracker

Expand Down Expand Up @@ -470,8 +471,8 @@ type IndexLookUpExecutor struct {
corColInIdxSide bool
corColInTblSide bool
corColInAccess bool
idxPlans []plannercore.PhysicalPlan
tblPlans []plannercore.PhysicalPlan
idxPlans []base.PhysicalPlan
tblPlans []base.PhysicalPlan
idxCols []*expression.Column
colLens []int
// PushedLimit is used to skip the preceding and tailing handles when Limit is sunk into IndexLookUpReader.
Expand Down Expand Up @@ -1594,7 +1595,7 @@ func GetLackHandles(expectedHandles []kv.Handle, obtainedHandlesMap *kv.HandleMa
return diffHandles
}

func getPhysicalPlanIDs(plans []plannercore.PhysicalPlan) []int {
func getPhysicalPlanIDs(plans []base.PhysicalPlan) []int {
planIDs := make([]int, 0, len(plans))
for _, p := range plans {
planIDs = append(planIDs, p.ID())
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/terror"
planctx "github.com/pingcap/tidb/pkg/planner/context"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/util/fixcontrol"
"github.com/pingcap/tidb/pkg/privilege"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
Expand Down Expand Up @@ -1452,7 +1453,7 @@ func init() {
// While doing optimization in the plan package, we need to execute uncorrelated subquery,
// but the plan package cannot import the executor package because of the dependency cycle.
// So we assign a function implemented in the executor package to the plan package to avoid the dependency cycle.
plannercore.EvalSubqueryFirstRow = func(ctx context.Context, p plannercore.PhysicalPlan, is infoschema.InfoSchema, pctx planctx.PlanContext) ([]types.Datum, error) {
plannercore.EvalSubqueryFirstRow = func(ctx context.Context, p base.PhysicalPlan, is infoschema.InfoSchema, pctx planctx.PlanContext) ([]types.Datum, error) {
defer func(begin time.Time) {
s := pctx.GetSessionVars()
s.StmtCtx.SetSkipPlanCache(errors.NewNoStackError("query has uncorrelated sub-queries is un-cacheable"))
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/planner"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/table"
Expand Down Expand Up @@ -719,7 +720,7 @@ func (fkc *FKCascadeExec) buildExecutor(ctx context.Context) (exec.Executor, err
// this is to avoid performance issue, see: https://github.com/pingcap/tidb/issues/38631
var maxHandleFKValueInOneCascade = 1024

func (fkc *FKCascadeExec) buildFKCascadePlan(ctx context.Context) (plannercore.Plan, error) {
func (fkc *FKCascadeExec) buildFKCascadePlan(ctx context.Context) (base.Plan, error) {
if len(fkc.fkValues) == 0 && len(fkc.fkUpdatedValuesMap) == 0 {
return nil, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ go_test(
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/planner/core",
"//pkg/planner/core/base",
"//pkg/planner/util",
"//pkg/session",
"//pkg/sessionctx/variable",
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/importer/importer_testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/testkit"
Expand Down Expand Up @@ -260,7 +261,7 @@ func getTableImporter(ctx context.Context, t *testing.T, store kv.Storage, table
require.True(t, ok)
table, err := do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr(tableName))
require.NoError(t, err)
var selectPlan plannercore.PhysicalPlan
var selectPlan base.PhysicalPlan
if path == "" {
selectPlan = &plannercore.PhysicalSelection{}
}
Expand Down