Skip to content

Commit

Permalink
use method to get PB context
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
  • Loading branch information
YangKeao committed Apr 9, 2024
1 parent 4a53fa8 commit 5fb4402
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 30 deletions.
6 changes: 3 additions & 3 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (e *IndexReaderExecutor) buildKVReq(r []kv.KeyRange) (*kv.Request, error) {
func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
var err error
if e.corColInFilter {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.plans)
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx().GetBuildPBCtx(), e.plans)
if err != nil {
return err
}
Expand Down Expand Up @@ -582,14 +582,14 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error {

var err error
if e.corColInIdxSide {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.idxPlans)
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx().GetBuildPBCtx(), e.idxPlans)
if err != nil {
return err
}
}

if e.corColInTblSide {
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.tblPlans)
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx().GetBuildPBCtx(), e.tblPlans)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) {
e.keyRanges = make([][]kv.KeyRange, 0, len(e.partialPlans))
e.initRuntimeStats()
if e.isCorColInTableFilter {
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.tblPlans)
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx().GetBuildPBCtx(), e.tblPlans)
if err != nil {
return err
}
Expand Down Expand Up @@ -370,7 +370,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
if e.isCorColInPartialFilters[workID] {
// We got correlated column, so need to refresh Selection operator.
var err error
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.partialPlans[workID]); err != nil {
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx().GetBuildPBCtx(), e.partialPlans[workID]); err != nil {
syncErr(ctx, e.finished, fetchCh, err)
return
}
Expand Down Expand Up @@ -513,7 +513,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
}

if e.isCorColInPartialFilters[workID] {
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(plannercore.GetBuildPBCtx(e.Ctx().GetPlanCtx()), e.partialPlans[workID]); err != nil {
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx().GetBuildPBCtx(), e.partialPlans[workID]); err != nil {
syncErr(ctx, e.finished, fetchCh, err)
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/internal/builder/builder_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ func ConstructDAGReq(ctx sessionctx.Context, plans []plannercore.PhysicalPlan, s
}
if storeType == kv.TiFlash {
var executors []*tipb.Executor
executors, err = ConstructTreeBasedDistExec(plannercore.GetBuildPBCtx(ctx.GetPlanCtx()), plans[0])
executors, err = ConstructTreeBasedDistExec(ctx.GetPlanCtx().GetBuildPBCtx(), plans[0])
dagReq.RootExecutor = executors[0]
} else {
dagReq.Executors, err = ConstructListBasedDistExec(plannercore.GetBuildPBCtx(ctx.GetPlanCtx()), plans)
dagReq.Executors, err = ConstructListBasedDistExec(ctx.GetPlanCtx().GetBuildPBCtx(), plans)
}

distsql.SetEncodeType(ctx.GetDistSQLCtx(), dagReq)
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func newTableReaderExecutorContext(sctx sessionctx.Context) tableReaderExecutorC
pctx := sctx.GetPlanCtx()
return tableReaderExecutorContext{
dctx: sctx.GetDistSQLCtx(),
buildPBCtx: plannercore.GetBuildPBCtx(pctx),
buildPBCtx: pctx.GetBuildPBCtx(),
pctx: pctx,
ectx: sctx.GetExprCtx(),
getDDLOwner: getDDLOwner,
Expand Down
2 changes: 2 additions & 0 deletions pkg/planner/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type PlanContext interface {
HasDirtyContent(tid int64) bool
// AdviseTxnWarmup advises the txn to warm up.
AdviseTxnWarmup() error
// GetBuildPBCtx returns the context used in `ToPB` method.
GetBuildPBCtx() *BuildPBContext
}

// EmptyPlanContextExtended is used to provide some empty implementations for PlanContext.
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func TestPhysicalTableScanExtractCorrelatedCols(t *testing.T) {
core.PushedDown(sel, ts, []expression.Expression{selected}, 0.1)
}

pb, err := ts.ToPB(core.GetBuildPBCtx(tk.Session().GetPlanCtx()), kv.TiFlash)
pb, err := ts.ToPB(tk.Session().GetPlanCtx().GetBuildPBCtx(), kv.TiFlash)
require.NoError(t, err)
// make sure the pushed down filter condition is correct
require.Equal(t, 1, len(pb.TblScan.PushedDownFilterConditions))
Expand Down
21 changes: 1 addition & 20 deletions pkg/planner/core/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,26 +473,7 @@ func EncodeUniqueIndexValuesForKey(ctx sessionctx.Context, tblInfo *model.TableI

// GetPushDownCtx creates a PushDownContext from PlanContext
func GetPushDownCtx(pctx PlanContext) expression.PushDownContext {
return GetPushDownCtxFromBuildPBContext(GetBuildPBCtx(pctx))
}

// GetBuildPBCtx returns the BuildPBContext from PlanContext
func GetBuildPBCtx(pctx PlanContext) *BuildPBContext {
return &BuildPBContext{
ExprCtx: pctx.GetExprCtx(),
Client: pctx.GetClient(),

TiFlashFastScan: pctx.GetSessionVars().TiFlashFastScan,
TiFlashFineGrainedShuffleBatchSize: pctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize,

// the following fields are used to build `expression.PushDownContext`.
// TODO: it'd be better to embed `expression.PushDownContext` in `BuildPBContext`. But `expression` already
// depends on this package, so we need to move `expression.PushDownContext` to a standalone package first.
GroupConcatMaxLen: pctx.GetSessionVars().GroupConcatMaxLen,
InExplainStmt: pctx.GetSessionVars().StmtCtx.InExplainStmt,
AppendWarning: pctx.GetSessionVars().StmtCtx.AppendWarning,
AppendExtraWarning: pctx.GetSessionVars().StmtCtx.AppendExtraWarning,
}
return GetPushDownCtxFromBuildPBContext(pctx.GetBuildPBCtx())
}

// GetPushDownCtxFromBuildPBContext creates a PushDownContext from BuildPBContext
Expand Down
26 changes: 26 additions & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2697,6 +2697,32 @@ func (s *session) GetDistSQLCtx() *distsqlctx.DistSQLContext {
return dctx.(*distsqlctx.DistSQLContext)
}

// GetBuildPBCtx returns the context used in `ToPB` method
func (s *session) GetBuildPBCtx() *planctx.BuildPBContext {
vars := s.GetSessionVars()
sc := vars.StmtCtx

bctx := sc.GetOrInitBuildPBCtxFromCache(func() any {
return &planctx.BuildPBContext{
ExprCtx: s.GetExprCtx(),
Client: s.GetClient(),

TiFlashFastScan: s.GetSessionVars().TiFlashFastScan,
TiFlashFineGrainedShuffleBatchSize: s.GetSessionVars().TiFlashFineGrainedShuffleBatchSize,

// the following fields are used to build `expression.PushDownContext`.
// TODO: it'd be better to embed `expression.PushDownContext` in `BuildPBContext`. But `expression` already
// depends on this package, so we need to move `expression.PushDownContext` to a standalone package first.
GroupConcatMaxLen: s.GetSessionVars().GroupConcatMaxLen,
InExplainStmt: s.GetSessionVars().StmtCtx.InExplainStmt,
AppendWarning: s.GetSessionVars().StmtCtx.AppendWarning,
AppendExtraWarning: s.GetSessionVars().StmtCtx.AppendExtraWarning,
}
})

return bctx.(*planctx.BuildPBContext)
}

func (s *session) AuthPluginForUser(user *auth.UserIdentity) (string, error) {
pm := privilege.GetPrivilegeManager(s)
authplugin, err := pm.GetAuthPluginForConnection(user.Username, user.Hostname)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ type Context interface {
// GetDistSQLCtx gets the distsql ctx of the current session
GetDistSQLCtx() *distsqlctx.DistSQLContext

// GetBuildPBCtx gets the ctx used in `ToPB` of the current session
GetBuildPBCtx() *planctx.BuildPBContext

GetSessionManager() util.SessionManager

// RefreshTxnCtx commits old transaction without retry,
Expand Down
17 changes: 17 additions & 0 deletions pkg/sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ type StatementContext struct {
dctx *distsqlctx.DistSQLContext
}

// buildPBCtxCache is used to persist all variables and tools needed by the `ToPB`
// this cache is set on `StatementContext` because it has to be updated after each statement.
buildPBCtxCache struct {
init sync.Once
bctx any
}

// Set the following variables before execution
hint.StmtHints

Expand Down Expand Up @@ -1255,6 +1262,16 @@ func (sc *StatementContext) GetOrInitDistSQLFromCache(create func() *distsqlctx.
return sc.distSQLCtxCache.dctx
}

// GetOrInitBuildPBCtxFromCache returns the `BuildPBContext` inside cache. If it didn't exist, return a new one created by
// the `create` function. It uses the `any` to avoid cycle dependency.
func (sc *StatementContext) GetOrInitBuildPBCtxFromCache(create func() any) any {
sc.buildPBCtxCache.init.Do(func() {
sc.buildPBCtxCache.bctx = create()
})

return sc.buildPBCtxCache.bctx
}

func newErrCtx(tc types.Context, otherLevels errctx.LevelMap, handler contextutil.WarnHandler) errctx.Context {
l := errctx.LevelError
if flags := tc.Flags(); flags.IgnoreTruncateErr() {
Expand Down
19 changes: 19 additions & 0 deletions pkg/util/mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,25 @@ func (c *Context) GetDistSQLCtx() *distsqlctx.DistSQLContext {
}
}

// GetBuildPBCtx returns the `ToPB` context of the session
func (c *Context) GetBuildPBCtx() *planctx.BuildPBContext {
return &planctx.BuildPBContext{
ExprCtx: c.GetExprCtx(),
Client: c.GetClient(),

TiFlashFastScan: c.GetSessionVars().TiFlashFastScan,
TiFlashFineGrainedShuffleBatchSize: c.GetSessionVars().TiFlashFineGrainedShuffleBatchSize,

// the following fields are used to build `expression.PushDownContext`.
// TODO: it'd be better to embed `expression.PushDownContext` in `BuildPBContext`. But `expression` already
// depends on this package, so we need to move `expression.PushDownContext` to a standalone package first.
GroupConcatMaxLen: c.GetSessionVars().GroupConcatMaxLen,
InExplainStmt: c.GetSessionVars().StmtCtx.InExplainStmt,
AppendWarning: c.GetSessionVars().StmtCtx.AppendWarning,
AppendExtraWarning: c.GetSessionVars().StmtCtx.AppendExtraWarning,
}
}

// Txn implements sessionctx.Context Txn interface.
func (c *Context) Txn(bool) (kv.Transaction, error) {
return &c.txn, nil
Expand Down

0 comments on commit 5fb4402

Please sign in to comment.