Skip to content

Commit

Permalink
add a smaller context for ToPB
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
  • Loading branch information
YangKeao committed Apr 7, 2024
1 parent d2eb902 commit d51e0dc
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 51 deletions.
6 changes: 3 additions & 3 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (e *IndexReaderExecutor) buildKVReq(r []kv.KeyRange) (*kv.Request, error) {
func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
var err error
if e.corColInFilter {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.plans)
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.plans)
if err != nil {
return err
}
Expand Down Expand Up @@ -582,14 +582,14 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error {

var err error
if e.corColInIdxSide {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.idxPlans)
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.idxPlans)
if err != nil {
return err
}
}

if e.corColInTblSide {
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.tblPlans)
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.tblPlans)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) {
e.keyRanges = make([][]kv.KeyRange, 0, len(e.partialPlans))
e.initRuntimeStats()
if e.isCorColInTableFilter {
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.tblPlans)
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.tblPlans)
if err != nil {
return err
}
Expand Down Expand Up @@ -370,7 +370,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
if e.isCorColInPartialFilters[workID] {
// We got correlated column, so need to refresh Selection operator.
var err error
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.partialPlans[workID]); err != nil {
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.partialPlans[workID]); err != nil {
syncErr(ctx, e.finished, fetchCh, err)
return
}
Expand Down Expand Up @@ -513,7 +513,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
}

if e.isCorColInPartialFilters[workID] {
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.partialPlans[workID]); err != nil {
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.partialPlans[workID]); err != nil {
syncErr(ctx, e.finished, fetchCh, err)
return
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/internal/builder/builder_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
)

// ConstructTreeBasedDistExec constructs tree based DAGRequest
func ConstructTreeBasedDistExec(pctx planctx.PlanContext, p plannercore.PhysicalPlan) ([]*tipb.Executor, error) {
func ConstructTreeBasedDistExec(pctx *planctx.BuildPBContext, p plannercore.PhysicalPlan) ([]*tipb.Executor, error) {
execPB, err := p.ToPB(pctx, kv.TiFlash)
return []*tipb.Executor{execPB}, err
}

// ConstructListBasedDistExec constructs list based DAGRequest
func ConstructListBasedDistExec(pctx planctx.PlanContext, plans []plannercore.PhysicalPlan) ([]*tipb.Executor, error) {
func ConstructListBasedDistExec(pctx *planctx.BuildPBContext, plans []plannercore.PhysicalPlan) ([]*tipb.Executor, error) {
executors := make([]*tipb.Executor, 0, len(plans))
for _, p := range plans {
execPB, err := p.ToPB(pctx, kv.TiKV)
Expand Down Expand Up @@ -60,10 +60,10 @@ func ConstructDAGReq(ctx sessionctx.Context, plans []plannercore.PhysicalPlan, s
}
if storeType == kv.TiFlash {
var executors []*tipb.Executor
executors, err = ConstructTreeBasedDistExec(ctx.GetPlanCtx(), plans[0])
executors, err = ConstructTreeBasedDistExec(plannercore.GetBuildPBCtx(ctx.GetPlanCtx()), plans[0])
dagReq.RootExecutor = executors[0]
} else {
dagReq.Executors, err = ConstructListBasedDistExec(ctx.GetPlanCtx(), plans)
dagReq.Executors, err = ConstructListBasedDistExec(plannercore.GetBuildPBCtx(ctx.GetPlanCtx()), plans)
}

distsql.SetEncodeType(ctx.GetDistSQLCtx(), dagReq)
Expand Down
15 changes: 9 additions & 6 deletions pkg/executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ type kvRangeBuilder interface {

// tableReaderExecutorContext is the execution context for the `TableReaderExecutor`
type tableReaderExecutorContext struct {
dctx *distsqlctx.DistSQLContext
pctx planctx.PlanContext
ectx exprctx.BuildContext
dctx *distsqlctx.DistSQLContext
buildPBCtx *planctx.BuildPBContext
pctx planctx.PlanContext
ectx exprctx.BuildContext

getDDLOwner func(context.Context) (*infosync.ServerInfo, error)
}
Expand Down Expand Up @@ -117,9 +118,11 @@ func newTableReaderExecutorContext(sctx sessionctx.Context) tableReaderExecutorC
}
}

pctx := sctx.GetPlanCtx()
return tableReaderExecutorContext{
dctx: sctx.GetDistSQLCtx(),
pctx: sctx.GetPlanCtx(),
buildPBCtx: plannercore.GetBuildPBCtx(pctx),
pctx: pctx,
ectx: sctx.GetExprCtx(),
getDDLOwner: getDDLOwner,
}
Expand Down Expand Up @@ -230,13 +233,13 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
if e.corColInFilter {
// If there's correlated column in filter, need to rewrite dagPB
if e.storeType == kv.TiFlash {
execs, err := builder.ConstructTreeBasedDistExec(e.pctx, e.tablePlan)
execs, err := builder.ConstructTreeBasedDistExec(e.buildPBCtx, e.tablePlan)
if err != nil {
return err
}
e.dagPB.RootExecutor = execs[0]
} else {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.pctx, e.plans)
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.buildPBCtx, e.plans)
if err != nil {
return err
}
Expand Down
27 changes: 27 additions & 0 deletions pkg/planner/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,30 @@ type EmptyPlanContextExtended struct{}

// AdviseTxnWarmup advises the txn to warm up.
func (EmptyPlanContextExtended) AdviseTxnWarmup() error { return nil }

// BuildPBContext is used to build the `*tipb.Executor` according to the plan.
type BuildPBContext struct {
ExprCtx exprctx.BuildContext
Client kv.Client

TiFlashFastScan bool
TiFlashFineGrainedShuffleBatchSize uint64

// the following fields are used to build `expression.PushDownContext`.
// TODO: it'd be better to embed `expression.PushDownContext` in `BuildPBContext`. But `expression` already
// depends on this package, so we need to move `expression.PushDownContext` to a standalone package first.
GroupConcatMaxLen uint64
InExplainStmt bool
AppendWarning func(err error)
AppendExtraWarning func(err error)
}

// GetExprCtx returns the expression context.
func (b *BuildPBContext) GetExprCtx() exprctx.BuildContext {
return b.ExprCtx
}

// GetExprCtx returns the kv client.
func (b *BuildPBContext) GetClient() kv.Client {
return b.Client
}
2 changes: 1 addition & 1 deletion pkg/planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func TestPhysicalTableScanExtractCorrelatedCols(t *testing.T) {
core.PushedDown(sel, ts, []expression.Expression{selected}, 0.1)
}

pb, err := ts.ToPB(tk.Session().GetPlanCtx(), kv.TiFlash)
pb, err := ts.ToPB(core.GetBuildPBCtx(tk.Session().GetPlanCtx()), kv.TiFlash)
require.NoError(t, err)
// make sure the pushed down filter condition is correct
require.Equal(t, 1, len(pb.TblScan.PushedDownFilterConditions))
Expand Down
5 changes: 4 additions & 1 deletion pkg/planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ import (
// PlanContext is the context for building plan.
type PlanContext = context.PlanContext

// BuildPBContext is the context for building `*tipb.Executor`.
type BuildPBContext = context.BuildPBContext

// AsSctx converts PlanContext to sessionctx.Context.
func AsSctx(pctx PlanContext) (sessionctx.Context, error) {
sctx, ok := pctx.(sessionctx.Context)
Expand Down Expand Up @@ -371,7 +374,7 @@ type PhysicalPlan interface {
attach2Task(...task) task

// ToPB converts physical plan to tipb executor.
ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Executor, error)
ToPB(ctx *BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error)

// GetChildReqProps gets the required property by child index.
GetChildReqProps(idx int) *property.PhysicalProperty
Expand Down
54 changes: 27 additions & 27 deletions pkg/planner/core/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import (
)

// ToPB implements PhysicalPlan ToPB interface.
func (p *basePhysicalPlan) ToPB(_ PlanContext, _ kv.StoreType) (*tipb.Executor, error) {
func (p *basePhysicalPlan) ToPB(_ *BuildPBContext, _ kv.StoreType) (*tipb.Executor, error) {
return nil, errors.Errorf("plan %s fails converts to PB", p.Plan.ExplainID())
}

// ToPB implements PhysicalPlan ToPB interface.
func (p *PhysicalExpand) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Executor, error) {
func (p *PhysicalExpand) ToPB(ctx *BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error) {
if len(p.LevelExprs) > 0 {
return p.toPBV2(ctx, storeType)
}
Expand All @@ -56,7 +56,7 @@ func (p *PhysicalExpand) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Ex
return &tipb.Executor{Tp: tipb.ExecType_TypeExpand, Expand: expand, ExecutorId: &executorID}, nil
}

func (p *PhysicalExpand) toPBV2(ctx PlanContext, storeType kv.StoreType) (*tipb.Executor, error) {
func (p *PhysicalExpand) toPBV2(ctx *BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error) {
client := ctx.GetClient()
projExprsPB := make([]*tipb.ExprSlice, 0, len(p.LevelExprs))
evalCtx := ctx.GetExprCtx().GetEvalCtx()
Expand Down Expand Up @@ -84,7 +84,7 @@ func (p *PhysicalExpand) toPBV2(ctx PlanContext, storeType kv.StoreType) (*tipb.
}

// ToPB implements PhysicalPlan ToPB interface.
func (p *PhysicalHashAgg) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Executor, error) {
func (p *PhysicalHashAgg) ToPB(ctx *BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error) {
client := ctx.GetClient()
groupByExprs, err := expression.ExpressionsToPBList(ctx.GetExprCtx().GetEvalCtx(), p.GroupByItems, client)
if err != nil {
Expand Down Expand Up @@ -115,15 +115,15 @@ func (p *PhysicalHashAgg) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.E
Aggregation: aggExec,
ExecutorId: &executorID,
FineGrainedShuffleStreamCount: p.TiFlashFineGrainedShuffleStreamCount,
FineGrainedShuffleBatchSize: ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize,
FineGrainedShuffleBatchSize: ctx.TiFlashFineGrainedShuffleBatchSize,
}, nil
}

// ToPB implements PhysicalPlan ToPB interface.
func (p *PhysicalStreamAgg) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Executor, error) {
func (p *PhysicalStreamAgg) ToPB(ctx *BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error) {
client := ctx.GetClient()
evalCtx := ctx.GetExprCtx().GetEvalCtx()
pushDownCtx := GetPushDownCtx(ctx)
pushDownCtx := GetPushDownCtxFromBuildPBContext(ctx)
groupByExprs, err := expression.ExpressionsToPBList(evalCtx, p.GroupByItems, client)
if err != nil {
return nil, err
Expand Down Expand Up @@ -151,7 +151,7 @@ func (p *PhysicalStreamAgg) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb
}

// ToPB implements PhysicalPlan ToPB interface.
func (p *PhysicalSelection) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Executor, error) {
func (p *PhysicalSelection) ToPB(ctx *BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error) {
client := ctx.GetClient()
conditions, err := expression.ExpressionsToPBList(ctx.GetExprCtx().GetEvalCtx(), p.Conditions, client)
if err != nil {
Expand All @@ -173,7 +173,7 @@ func (p *PhysicalSelection) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb
}

// ToPB implements PhysicalPlan ToPB interface.
func (p *PhysicalProjection) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Executor, error) {
func (p *PhysicalProjection) ToPB(ctx *BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error) {
client := ctx.GetClient()
exprs, err := expression.ExpressionsToPBList(ctx.GetExprCtx().GetEvalCtx(), p.Exprs, client)
if err != nil {
Expand All @@ -195,7 +195,7 @@ func (p *PhysicalProjection) ToPB(ctx PlanContext, storeType kv.StoreType) (*tip
}

// ToPB implements PhysicalPlan ToPB interface.
func (p *PhysicalTopN) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Executor, error) {
func (p *PhysicalTopN) ToPB(ctx *BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error) {
client := ctx.GetClient()
topNExec := &tipb.TopN{
Limit: p.Count,
Expand All @@ -220,7 +220,7 @@ func (p *PhysicalTopN) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Exec
}

// ToPB implements PhysicalPlan ToPB interface.
func (p *PhysicalLimit) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Executor, error) {
func (p *PhysicalLimit) ToPB(ctx *BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error) {
client := ctx.GetClient()
limitExec := &tipb.Limit{
Limit: p.Count,
Expand All @@ -241,15 +241,15 @@ func (p *PhysicalLimit) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Exe
}

// ToPB implements PhysicalPlan ToPB interface.
func (p *PhysicalTableScan) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Executor, error) {
func (p *PhysicalTableScan) ToPB(ctx *BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error) {
if storeType == kv.TiFlash && p.Table.GetPartitionInfo() != nil && p.IsMPPOrBatchCop && p.SCtx().GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
return p.partitionTableScanToPBForFlash(ctx)
}
tsExec := tables.BuildTableScanFromInfos(p.Table, p.Columns)
tsExec.Desc = p.Desc
keepOrder := p.KeepOrder
tsExec.KeepOrder = &keepOrder
tsExec.IsFastScan = &(ctx.GetSessionVars().TiFlashFastScan)
tsExec.IsFastScan = &(ctx.TiFlashFastScan)

if len(p.LateMaterializationFilterCondition) > 0 {
client := ctx.GetClient()
Expand Down Expand Up @@ -278,8 +278,8 @@ func (p *PhysicalTableScan) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb
return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tsExec, ExecutorId: &executorID}, err
}

func (p *PhysicalTableScan) partitionTableScanToPBForFlash(ctx PlanContext) (*tipb.Executor, error) {
ptsExec := tables.BuildPartitionTableScanFromInfos(p.Table, p.Columns, ctx.GetSessionVars().TiFlashFastScan)
func (p *PhysicalTableScan) partitionTableScanToPBForFlash(ctx *BuildPBContext) (*tipb.Executor, error) {
ptsExec := tables.BuildPartitionTableScanFromInfos(p.Table, p.Columns, ctx.TiFlashFastScan)

if len(p.LateMaterializationFilterCondition) > 0 {
client := ctx.GetClient()
Expand Down Expand Up @@ -342,7 +342,7 @@ func FindColumnInfoByID(colInfos []*model.ColumnInfo, id int64) *model.ColumnInf
}

// ToPB generates the pb structure.
func (e *PhysicalExchangeSender) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Executor, error) {
func (e *PhysicalExchangeSender) ToPB(ctx *BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error) {
child, err := e.Children()[0].ToPB(ctx, kv.TiFlash)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -412,12 +412,12 @@ func (e *PhysicalExchangeSender) ToPB(ctx PlanContext, storeType kv.StoreType) (
ExchangeSender: ecExec,
ExecutorId: &executorID,
FineGrainedShuffleStreamCount: e.TiFlashFineGrainedShuffleStreamCount,
FineGrainedShuffleBatchSize: ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize,
FineGrainedShuffleBatchSize: ctx.TiFlashFineGrainedShuffleBatchSize,
}, nil
}

// ToPB generates the pb structure.
func (e *PhysicalExchangeReceiver) ToPB(ctx PlanContext, _ kv.StoreType) (*tipb.Executor, error) {
func (e *PhysicalExchangeReceiver) ToPB(ctx *BuildPBContext, _ kv.StoreType) (*tipb.Executor, error) {
encodedTask := make([][]byte, 0, len(e.Tasks))

for _, task := range e.Tasks {
Expand Down Expand Up @@ -451,12 +451,12 @@ func (e *PhysicalExchangeReceiver) ToPB(ctx PlanContext, _ kv.StoreType) (*tipb.
ExchangeReceiver: ecExec,
ExecutorId: &executorID,
FineGrainedShuffleStreamCount: e.TiFlashFineGrainedShuffleStreamCount,
FineGrainedShuffleBatchSize: ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize,
FineGrainedShuffleBatchSize: ctx.TiFlashFineGrainedShuffleBatchSize,
}, nil
}

// ToPB implements PhysicalPlan ToPB interface.
func (p *PhysicalIndexScan) ToPB(_ PlanContext, _ kv.StoreType) (*tipb.Executor, error) {
func (p *PhysicalIndexScan) ToPB(_ *BuildPBContext, _ kv.StoreType) (*tipb.Executor, error) {
columns := make([]*model.ColumnInfo, 0, p.schema.Len())
tableColumns := p.Table.Cols()
for _, col := range p.schema.Columns {
Expand Down Expand Up @@ -490,7 +490,7 @@ func (p *PhysicalIndexScan) ToPB(_ PlanContext, _ kv.StoreType) (*tipb.Executor,
}

// ToPB implements PhysicalPlan ToPB interface.
func (p *PhysicalHashJoin) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Executor, error) {
func (p *PhysicalHashJoin) ToPB(ctx *BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error) {
client := ctx.GetClient()

if len(p.LeftJoinKeys) > 0 && len(p.LeftNAJoinKeys) > 0 {
Expand Down Expand Up @@ -633,12 +633,12 @@ func (p *PhysicalHashJoin) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.
Join: join,
ExecutorId: &executorID,
FineGrainedShuffleStreamCount: p.TiFlashFineGrainedShuffleStreamCount,
FineGrainedShuffleBatchSize: ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize,
FineGrainedShuffleBatchSize: ctx.TiFlashFineGrainedShuffleBatchSize,
}, nil
}

// ToPB converts FrameBound to tipb structure.
func (fb *FrameBound) ToPB(ctx PlanContext) (*tipb.WindowFrameBound, error) {
func (fb *FrameBound) ToPB(ctx *BuildPBContext) (*tipb.WindowFrameBound, error) {
pbBound := &tipb.WindowFrameBound{
Type: tipb.WindowBoundType(fb.Type),
Unbounded: fb.UnBounded,
Expand All @@ -660,7 +660,7 @@ func (fb *FrameBound) ToPB(ctx PlanContext) (*tipb.WindowFrameBound, error) {
}

// ToPB implements PhysicalPlan ToPB interface.
func (p *PhysicalWindow) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Executor, error) {
func (p *PhysicalWindow) ToPB(ctx *BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error) {
client := ctx.GetClient()

windowExec := &tipb.Window{}
Expand Down Expand Up @@ -708,12 +708,12 @@ func (p *PhysicalWindow) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Ex
Window: windowExec,
ExecutorId: &executorID,
FineGrainedShuffleStreamCount: p.TiFlashFineGrainedShuffleStreamCount,
FineGrainedShuffleBatchSize: ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize,
FineGrainedShuffleBatchSize: ctx.TiFlashFineGrainedShuffleBatchSize,
}, nil
}

// ToPB implements PhysicalPlan ToPB interface.
func (p *PhysicalSort) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Executor, error) {
func (p *PhysicalSort) ToPB(ctx *BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error) {
if !p.IsPartialSort {
return nil, errors.Errorf("sort %s can't convert to pb, because it isn't a partial sort", p.Plan.ExplainID())
}
Expand All @@ -738,6 +738,6 @@ func (p *PhysicalSort) ToPB(ctx PlanContext, storeType kv.StoreType) (*tipb.Exec
Sort: sortExec,
ExecutorId: &executorID,
FineGrainedShuffleStreamCount: p.TiFlashFineGrainedShuffleStreamCount,
FineGrainedShuffleBatchSize: ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize,
FineGrainedShuffleBatchSize: ctx.TiFlashFineGrainedShuffleBatchSize,
}, nil
}

0 comments on commit d51e0dc

Please sign in to comment.