Skip to content

Commit

Permalink
use method to get PB context
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
  • Loading branch information
YangKeao committed Apr 10, 2024
1 parent 5db19d4 commit bd3814d
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 41 deletions.
6 changes: 3 additions & 3 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (e *IndexReaderExecutor) buildKVReq(r []kv.KeyRange) (*kv.Request, error) {
func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
var err error
if e.corColInFilter {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.plans)
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.plans)
if err != nil {
return err
}
Expand Down Expand Up @@ -582,14 +582,14 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error {

var err error
if e.corColInIdxSide {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.idxPlans)
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.idxPlans)
if err != nil {
return err
}
}

if e.corColInTblSide {
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.tblPlans)
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.tblPlans)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) {
e.keyRanges = make([][]kv.KeyRange, 0, len(e.partialPlans))
e.initRuntimeStats()
if e.isCorColInTableFilter {
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.tblPlans)
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.tblPlans)
if err != nil {
return err
}
Expand Down Expand Up @@ -370,7 +370,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
if e.isCorColInPartialFilters[workID] {
// We got correlated column, so need to refresh Selection operator.
var err error
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.partialPlans[workID]); err != nil {
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.partialPlans[workID]); err != nil {
syncErr(ctx, e.finished, fetchCh, err)
return
}
Expand Down Expand Up @@ -513,7 +513,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
}

if e.isCorColInPartialFilters[workID] {
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.partialPlans[workID]); err != nil {
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.partialPlans[workID]); err != nil {
syncErr(ctx, e.finished, fetchCh, err)
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/internal/builder/builder_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ func ConstructDAGReq(ctx sessionctx.Context, plans []plannercore.PhysicalPlan, s
}
if storeType == kv.TiFlash {
var executors []*tipb.Executor
executors, err = ConstructTreeBasedDistExec(plannercore.GetBuildPBCtx(ctx.GetPlanCtx()), plans[0])
executors, err = ConstructTreeBasedDistExec(ctx.GetBuildPBCtx(), plans[0])
dagReq.RootExecutor = executors[0]
} else {
dagReq.Executors, err = ConstructListBasedDistExec(plannercore.GetBuildPBCtx(ctx.GetPlanCtx()), plans)
dagReq.Executors, err = ConstructListBasedDistExec(ctx.GetBuildPBCtx(), plans)
}

distsql.SetEncodeType(ctx.GetDistSQLCtx(), dagReq)
Expand Down
17 changes: 5 additions & 12 deletions pkg/executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ type kvRangeBuilder interface {

// tableReaderExecutorContext is the execution context for the `TableReaderExecutor`
type tableReaderExecutorContext struct {
dctx *distsqlctx.DistSQLContext
rctx *rangerctx.RangerContext
dctx *distsqlctx.DistSQLContext
rctx *rangerctx.RangerContext
buildPBCtx *planctx.BuildPBContext
pctx planctx.PlanContext
ectx exprctx.BuildContext
pctx planctx.PlanContext
ectx exprctx.BuildContext

stmtMemTracker *memory.Tracker

Expand Down Expand Up @@ -119,20 +119,13 @@ func newTableReaderExecutorContext(sctx sessionctx.Context) tableReaderExecutorC

pctx := sctx.GetPlanCtx()
return tableReaderExecutorContext{
<<<<<<< HEAD
dctx: sctx.GetDistSQLCtx(),
rctx: pctx.GetRangerCtx(),
buildPBCtx: pctx.GetBuildPBCtx(),
pctx: pctx,
ectx: sctx.GetExprCtx(),
stmtMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker,
getDDLOwner: getDDLOwner,
=======
dctx: sctx.GetDistSQLCtx(),
buildPBCtx: plannercore.GetBuildPBCtx(pctx),
pctx: pctx,
ectx: sctx.GetExprCtx(),
getDDLOwner: getDDLOwner,
>>>>>>> 4a53fa89c0 (add a smaller context for ToPB)
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/planner/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type PlanContext interface {
AdviseTxnWarmup() error
// GetRangerCtx returns the context used in `ranger` functions
GetRangerCtx() *rangerctx.RangerContext
// GetBuildPBCtx returns the context used in `ToPB` method.
GetBuildPBCtx() *BuildPBContext
}

// EmptyPlanContextExtended is used to provide some empty implementations for PlanContext.
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func TestPhysicalTableScanExtractCorrelatedCols(t *testing.T) {
core.PushedDown(sel, ts, []expression.Expression{selected}, 0.1)
}

pb, err := ts.ToPB(core.GetBuildPBCtx(tk.Session().GetPlanCtx()), kv.TiFlash)
pb, err := ts.ToPB(tk.Session().GetBuildPBCtx(), kv.TiFlash)
require.NoError(t, err)
// make sure the pushed down filter condition is correct
require.Equal(t, 1, len(pb.TblScan.PushedDownFilterConditions))
Expand Down
21 changes: 1 addition & 20 deletions pkg/planner/core/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,26 +473,7 @@ func EncodeUniqueIndexValuesForKey(ctx sessionctx.Context, tblInfo *model.TableI

// GetPushDownCtx creates a PushDownContext from PlanContext
func GetPushDownCtx(pctx PlanContext) expression.PushDownContext {
return GetPushDownCtxFromBuildPBContext(GetBuildPBCtx(pctx))
}

// GetBuildPBCtx returns the BuildPBContext from PlanContext
func GetBuildPBCtx(pctx PlanContext) *BuildPBContext {
return &BuildPBContext{
ExprCtx: pctx.GetExprCtx(),
Client: pctx.GetClient(),

TiFlashFastScan: pctx.GetSessionVars().TiFlashFastScan,
TiFlashFineGrainedShuffleBatchSize: pctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize,

// the following fields are used to build `expression.PushDownContext`.
// TODO: it'd be better to embed `expression.PushDownContext` in `BuildPBContext`. But `expression` already
// depends on this package, so we need to move `expression.PushDownContext` to a standalone package first.
GroupConcatMaxLen: pctx.GetSessionVars().GroupConcatMaxLen,
InExplainStmt: pctx.GetSessionVars().StmtCtx.InExplainStmt,
AppendWarning: pctx.GetSessionVars().StmtCtx.AppendWarning,
AppendExtraWarning: pctx.GetSessionVars().StmtCtx.AppendExtraWarning,
}
return GetPushDownCtxFromBuildPBContext(pctx.GetBuildPBCtx())
}

// GetPushDownCtxFromBuildPBContext creates a PushDownContext from BuildPBContext
Expand Down
26 changes: 26 additions & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2606,6 +2606,32 @@ func (s *session) GetRangerCtx() *rangerctx.RangerContext {
return rctx.(*rangerctx.RangerContext)
}

// GetBuildPBCtx returns the context used in `ToPB` method
func (s *session) GetBuildPBCtx() *planctx.BuildPBContext {
vars := s.GetSessionVars()
sc := vars.StmtCtx

bctx := sc.GetOrInitBuildPBCtxFromCache(func() any {
return &planctx.BuildPBContext{
ExprCtx: s.GetExprCtx(),
Client: s.GetClient(),

TiFlashFastScan: s.GetSessionVars().TiFlashFastScan,
TiFlashFineGrainedShuffleBatchSize: s.GetSessionVars().TiFlashFineGrainedShuffleBatchSize,

// the following fields are used to build `expression.PushDownContext`.
// TODO: it'd be better to embed `expression.PushDownContext` in `BuildPBContext`. But `expression` already
// depends on this package, so we need to move `expression.PushDownContext` to a standalone package first.
GroupConcatMaxLen: s.GetSessionVars().GroupConcatMaxLen,
InExplainStmt: s.GetSessionVars().StmtCtx.InExplainStmt,
AppendWarning: s.GetSessionVars().StmtCtx.AppendWarning,
AppendExtraWarning: s.GetSessionVars().StmtCtx.AppendExtraWarning,
}
})

return bctx.(*planctx.BuildPBContext)
}

func (s *session) AuthPluginForUser(user *auth.UserIdentity) (string, error) {
pm := privilege.GetPrivilegeManager(s)
authplugin, err := pm.GetAuthPluginForConnection(user.Username, user.Hostname)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ type Context interface {
// GetRangerCtx returns the context used in `ranger` related functions
GetRangerCtx() *rangerctx.RangerContext

// GetBuildPBCtx gets the ctx used in `ToPB` of the current session
GetBuildPBCtx() *planctx.BuildPBContext

GetSessionManager() util.SessionManager

// RefreshTxnCtx commits old transaction without retry,
Expand Down
17 changes: 17 additions & 0 deletions pkg/sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@ type StatementContext struct {
rctx any
}

// buildPBCtxCache is used to persist all variables and tools needed by the `ToPB`
// this cache is set on `StatementContext` because it has to be updated after each statement.
buildPBCtxCache struct {
init sync.Once
bctx any
}

// Set the following variables before execution
hint.StmtHints

Expand Down Expand Up @@ -1272,6 +1279,16 @@ func (sc *StatementContext) GetOrInitRangerCtxFromCache(create func() any) any {
return sc.rangerCtxCache.rctx
}

// GetOrInitBuildPBCtxFromCache returns the `BuildPBContext` inside cache. If it didn't exist, return a new one created by
// the `create` function. It uses the `any` to avoid cycle dependency.
func (sc *StatementContext) GetOrInitBuildPBCtxFromCache(create func() any) any {
sc.buildPBCtxCache.init.Do(func() {
sc.buildPBCtxCache.bctx = create()
})

return sc.buildPBCtxCache.bctx
}

func newErrCtx(tc types.Context, otherLevels errctx.LevelMap, handler contextutil.WarnHandler) errctx.Context {
l := errctx.LevelError
if flags := tc.Flags(); flags.IgnoreTruncateErr() {
Expand Down
19 changes: 19 additions & 0 deletions pkg/util/mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,25 @@ func (c *Context) GetRangerCtx() *rangerctx.RangerContext {
}
}

// GetBuildPBCtx returns the `ToPB` context of the session
func (c *Context) GetBuildPBCtx() *planctx.BuildPBContext {
return &planctx.BuildPBContext{
ExprCtx: c.GetExprCtx(),
Client: c.GetClient(),

TiFlashFastScan: c.GetSessionVars().TiFlashFastScan,
TiFlashFineGrainedShuffleBatchSize: c.GetSessionVars().TiFlashFineGrainedShuffleBatchSize,

// the following fields are used to build `expression.PushDownContext`.
// TODO: it'd be better to embed `expression.PushDownContext` in `BuildPBContext`. But `expression` already
// depends on this package, so we need to move `expression.PushDownContext` to a standalone package first.
GroupConcatMaxLen: c.GetSessionVars().GroupConcatMaxLen,
InExplainStmt: c.GetSessionVars().StmtCtx.InExplainStmt,
AppendWarning: c.GetSessionVars().StmtCtx.AppendWarning,
AppendExtraWarning: c.GetSessionVars().StmtCtx.AppendExtraWarning,
}
}

// Txn implements sessionctx.Context Txn interface.
func (c *Context) Txn(bool) (kv.Transaction, error) {
return &c.txn, nil
Expand Down

0 comments on commit bd3814d

Please sign in to comment.