Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor, planner: add a smaller context for ToPB method #52369

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (e *IndexReaderExecutor) buildKVReq(r []kv.KeyRange) (*kv.Request, error) {
func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
var err error
if e.corColInFilter {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.plans)
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.plans)
if err != nil {
return err
}
Expand Down Expand Up @@ -582,14 +582,14 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error {

var err error
if e.corColInIdxSide {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.idxPlans)
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.idxPlans)
if err != nil {
return err
}
}

if e.corColInTblSide {
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.tblPlans)
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.tblPlans)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) {
e.keyRanges = make([][]kv.KeyRange, 0, len(e.partialPlans))
e.initRuntimeStats()
if e.isCorColInTableFilter {
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.tblPlans)
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.tblPlans)
if err != nil {
return err
}
Expand Down Expand Up @@ -370,7 +370,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
if e.isCorColInPartialFilters[workID] {
// We got correlated column, so need to refresh Selection operator.
var err error
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.partialPlans[workID]); err != nil {
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.partialPlans[workID]); err != nil {
syncErr(ctx, e.finished, fetchCh, err)
return
}
Expand Down Expand Up @@ -513,7 +513,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
}

if e.isCorColInPartialFilters[workID] {
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.partialPlans[workID]); err != nil {
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.partialPlans[workID]); err != nil {
syncErr(ctx, e.finished, fetchCh, err)
return
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/internal/builder/builder_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
)

// ConstructTreeBasedDistExec constructs tree based DAGRequest
func ConstructTreeBasedDistExec(pctx planctx.PlanContext, p plannercore.PhysicalPlan) ([]*tipb.Executor, error) {
func ConstructTreeBasedDistExec(pctx *planctx.BuildPBContext, p plannercore.PhysicalPlan) ([]*tipb.Executor, error) {
execPB, err := p.ToPB(pctx, kv.TiFlash)
return []*tipb.Executor{execPB}, err
}

// ConstructListBasedDistExec constructs list based DAGRequest
func ConstructListBasedDistExec(pctx planctx.PlanContext, plans []plannercore.PhysicalPlan) ([]*tipb.Executor, error) {
func ConstructListBasedDistExec(pctx *planctx.BuildPBContext, plans []plannercore.PhysicalPlan) ([]*tipb.Executor, error) {
executors := make([]*tipb.Executor, 0, len(plans))
for _, p := range plans {
execPB, err := p.ToPB(pctx, kv.TiKV)
Expand Down Expand Up @@ -60,10 +60,10 @@ func ConstructDAGReq(ctx sessionctx.Context, plans []plannercore.PhysicalPlan, s
}
if storeType == kv.TiFlash {
var executors []*tipb.Executor
executors, err = ConstructTreeBasedDistExec(ctx.GetPlanCtx(), plans[0])
executors, err = ConstructTreeBasedDistExec(ctx.GetBuildPBCtx(), plans[0])
dagReq.RootExecutor = executors[0]
} else {
dagReq.Executors, err = ConstructListBasedDistExec(ctx.GetPlanCtx(), plans)
dagReq.Executors, err = ConstructListBasedDistExec(ctx.GetBuildPBCtx(), plans)
}

distsql.SetEncodeType(ctx.GetDistSQLCtx(), dagReq)
Expand Down
14 changes: 8 additions & 6 deletions pkg/executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ type kvRangeBuilder interface {

// tableReaderExecutorContext is the execution context for the `TableReaderExecutor`
type tableReaderExecutorContext struct {
dctx *distsqlctx.DistSQLContext
rctx *rangerctx.RangerContext
pctx planctx.PlanContext
ectx exprctx.BuildContext
dctx *distsqlctx.DistSQLContext
rctx *rangerctx.RangerContext
buildPBCtx *planctx.BuildPBContext
pctx planctx.PlanContext
ectx exprctx.BuildContext

stmtMemTracker *memory.Tracker

Expand Down Expand Up @@ -120,6 +121,7 @@ func newTableReaderExecutorContext(sctx sessionctx.Context) tableReaderExecutorC
return tableReaderExecutorContext{
dctx: sctx.GetDistSQLCtx(),
rctx: pctx.GetRangerCtx(),
buildPBCtx: pctx.GetBuildPBCtx(),
pctx: pctx,
ectx: sctx.GetExprCtx(),
stmtMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker,
Expand Down Expand Up @@ -232,13 +234,13 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
if e.corColInFilter {
// If there's correlated column in filter, need to rewrite dagPB
if e.storeType == kv.TiFlash {
execs, err := builder.ConstructTreeBasedDistExec(e.pctx, e.tablePlan)
execs, err := builder.ConstructTreeBasedDistExec(e.buildPBCtx, e.tablePlan)
if err != nil {
return err
}
e.dagPB.RootExecutor = execs[0]
} else {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.pctx, e.plans)
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.buildPBCtx, e.plans)
if err != nil {
return err
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/planner/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type PlanContext interface {
AdviseTxnWarmup() error
// GetRangerCtx returns the context used in `ranger` functions
GetRangerCtx() *rangerctx.RangerContext
// GetBuildPBCtx returns the context used in `ToPB` method.
GetBuildPBCtx() *BuildPBContext
}

// EmptyPlanContextExtended is used to provide some empty implementations for PlanContext.
Expand All @@ -75,3 +77,30 @@ type EmptyPlanContextExtended struct{}

// AdviseTxnWarmup advises the txn to warm up.
func (EmptyPlanContextExtended) AdviseTxnWarmup() error { return nil }

// BuildPBContext is used to build the `*tipb.Executor` according to the plan.
type BuildPBContext struct {
ExprCtx exprctx.BuildContext
Client kv.Client

TiFlashFastScan bool
TiFlashFineGrainedShuffleBatchSize uint64

// the following fields are used to build `expression.PushDownContext`.
// TODO: it'd be better to embed `expression.PushDownContext` in `BuildPBContext`. But `expression` already
// depends on this package, so we need to move `expression.PushDownContext` to a standalone package first.
GroupConcatMaxLen uint64
InExplainStmt bool
AppendWarning func(err error)
AppendExtraWarning func(err error)
}

// GetExprCtx returns the expression context.
func (b *BuildPBContext) GetExprCtx() exprctx.BuildContext {
return b.ExprCtx
}

// GetClient returns the kv client.
func (b *BuildPBContext) GetClient() kv.Client {
return b.Client
}
2 changes: 1 addition & 1 deletion pkg/planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func TestPhysicalTableScanExtractCorrelatedCols(t *testing.T) {
core.PushedDown(sel, ts, []expression.Expression{selected}, 0.1)
}

pb, err := ts.ToPB(tk.Session().GetPlanCtx(), kv.TiFlash)
pb, err := ts.ToPB(tk.Session().GetBuildPBCtx(), kv.TiFlash)
require.NoError(t, err)
// make sure the pushed down filter condition is correct
require.Equal(t, 1, len(pb.TblScan.PushedDownFilterConditions))
Expand Down
5 changes: 4 additions & 1 deletion pkg/planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ import (
// PlanContext is the context for building plan.
type PlanContext = context.PlanContext

// BuildPBContext is the context for building `*tipb.Executor`.
type BuildPBContext = context.BuildPBContext

// AsSctx converts PlanContext to sessionctx.Context.
func AsSctx(pctx PlanContext) (sessionctx.Context, error) {
sctx, ok := pctx.(sessionctx.Context)
Expand Down Expand Up @@ -371,7 +374,7 @@ type PhysicalPlan interface {
attach2Task(...task) task

// ToPB converts physical plan to tipb executor.
ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Executor, error)
ToPB(ctx *BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error)

// GetChildReqProps gets the required property by child index.
GetChildReqProps(idx int) *property.PhysicalProperty
Expand Down