Skip to content

Commit

Permalink
refactor the BaseExecutor in IndexReaderExecutor
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
  • Loading branch information
YangKeao committed May 9, 2024
1 parent 788d505 commit 200f5c5
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 55 deletions.
69 changes: 35 additions & 34 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3423,7 +3423,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) e
return ret
}

func buildIndexRangeForEachPartition(ctx sessionctx.Context, usedPartitions []table.PhysicalTable, contentPos []int64,
func buildIndexRangeForEachPartition(rctx *rangerctx.RangerContext, usedPartitions []table.PhysicalTable, contentPos []int64,
lookUpContent []*join.IndexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (map[int64][]*ranger.Range, error) {
contentBucket := make(map[int64][]*join.IndexJoinLookUpContent)
for _, p := range usedPartitions {
Expand All @@ -3436,7 +3436,7 @@ func buildIndexRangeForEachPartition(ctx sessionctx.Context, usedPartitions []ta
}
nextRange := make(map[int64][]*ranger.Range)
for _, p := range usedPartitions {
ranges, err := buildRangesForIndexJoin(ctx, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc)
ranges, err := buildRangesForIndexJoin(rctx, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3564,28 +3564,29 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
paging := b.ctx.GetSessionVars().EnablePaging

e := &IndexReaderExecutor{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
indexUsageReporter: b.buildIndexUsageReporter(v),
dagPB: dagReq,
startTS: startTS,
txnScope: b.txnScope,
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
netDataSize: v.GetNetDataSize(),
physicalTableID: physicalTableID,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
desc: is.Desc,
columns: is.Columns,
byItems: is.ByItems,
paging: paging,
corColInFilter: b.corColInDistPlan(v.IndexPlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
idxCols: is.IdxCols,
colLens: is.IdxColLens,
plans: v.IndexPlans,
outputColumns: v.OutputColumns,
indexReaderExecutorContext: newIndexReaderExecutorContext(b.ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID()),
indexUsageReporter: b.buildIndexUsageReporter(v),
dagPB: dagReq,
startTS: startTS,
txnScope: b.txnScope,
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
netDataSize: v.GetNetDataSize(),
physicalTableID: physicalTableID,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
desc: is.Desc,
columns: is.Columns,
byItems: is.ByItems,
paging: paging,
corColInFilter: b.corColInDistPlan(v.IndexPlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
idxCols: is.IdxCols,
colLens: is.IdxColLens,
plans: v.IndexPlans,
outputColumns: v.OutputColumns,
}

for _, col := range v.OutputColumns {
Expand Down Expand Up @@ -4379,7 +4380,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
}
tbInfo := e.table.Meta()
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
kvRanges, err := buildKvRangesForIndexJoin(e.Ctx().GetDistSQLCtx(), e.Ctx().GetRangerCtx(), e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal)
kvRanges, err := buildKvRangesForIndexJoin(e.dctx, e.rctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal)
if err != nil {
return nil, err
}
Expand All @@ -4393,7 +4394,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
if err != nil {
return nil, err
}
if e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil {
if e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil {
return nil, err
}
if err := exec.Open(ctx, e); err != nil {
Expand All @@ -4409,7 +4410,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
}
if len(usedPartition) != 0 {
if canPrune {
rangeMap, err := buildIndexRangeForEachPartition(e.Ctx(), usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
rangeMap, err := buildIndexRangeForEachPartition(e.rctx, usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand All @@ -4418,7 +4419,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
e.partRangeMap = rangeMap
} else {
e.partitions = usedPartition
if e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil {
if e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -4455,7 +4456,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
if err != nil {
return nil, err
}
e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc)
e.ranges, err = buildRangesForIndexJoin(e.Ctx().GetRangerCtx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand All @@ -4472,7 +4473,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
}
if len(usedPartition) != 0 {
if canPrune {
rangeMap, err := buildIndexRangeForEachPartition(e.Ctx(), usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
rangeMap, err := buildIndexRangeForEachPartition(e.Ctx().GetRangerCtx(), usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand All @@ -4481,7 +4482,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
e.partitionRangeMap = rangeMap
} else {
e.prunedPartitions = usedPartition
e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc)
e.ranges, err = buildRangesForIndexJoin(e.Ctx().GetRangerCtx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -4548,7 +4549,7 @@ func (builder *dataReaderBuilder) buildProjectionForIndexJoin(
}

// buildRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan.
func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*join.IndexJoinLookUpContent,
func buildRangesForIndexJoin(rctx *rangerctx.RangerContext, lookUpContents []*join.IndexJoinLookUpContent,
ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) ([]*ranger.Range, error) {
retRanges := make([]*ranger.Range, 0, len(ranges)*len(lookUpContents))
lastPos := len(ranges[0].LowVal) - 1
Expand All @@ -4567,7 +4568,7 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*join.Inde
}
continue
}
nextColRanges, err := cwc.BuildRangesByRow(ctx.GetRangerCtx(), content.Row)
nextColRanges, err := cwc.BuildRangesByRow(rctx, content.Row)
if err != nil {
return nil, err
}
Expand All @@ -4587,7 +4588,7 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*join.Inde
return retRanges, nil
}

return ranger.UnionRanges(ctx.GetRangerCtx(), tmpDatumRanges, true)
return ranger.UnionRanges(rctx, tmpDatumRanges, true)
}

// buildKvRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan.
Expand Down
64 changes: 48 additions & 16 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ import (
"github.com/pingcap/tidb/pkg/executor/internal/builder"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
isctx "github.com/pingcap/tidb/pkg/infoschema/context"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
planctx "github.com/pingcap/tidb/pkg/planner/context"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
Expand All @@ -56,6 +59,7 @@ import (
"github.com/pingcap/tidb/pkg/util/logutil/consistency"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/ranger"
rangerctx "github.com/pingcap/tidb/pkg/util/ranger/context"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
Expand Down Expand Up @@ -149,23 +153,51 @@ func closeAll(objs ...Closeable) error {

// rebuildIndexRanges will be called if there's correlated column in access conditions. We will rebuild the range
// by substituting correlated column with the constant.
func rebuildIndexRanges(ctx sessionctx.Context, is *plannercore.PhysicalIndexScan, idxCols []*expression.Column, colLens []int) (ranges []*ranger.Range, err error) {
func rebuildIndexRanges(ectx exprctx.BuildContext, rctx *rangerctx.RangerContext, is *plannercore.PhysicalIndexScan, idxCols []*expression.Column, colLens []int) (ranges []*ranger.Range, err error) {
access := make([]expression.Expression, 0, len(is.AccessCondition))
for _, cond := range is.AccessCondition {
newCond, err1 := expression.SubstituteCorCol2Constant(ctx.GetExprCtx(), cond)
newCond, err1 := expression.SubstituteCorCol2Constant(ectx, cond)
if err1 != nil {
return nil, err1
}
access = append(access, newCond)
}
// All of access conditions must be used to build ranges, so we don't limit range memory usage.
ranges, _, err = ranger.DetachSimpleCondAndBuildRangeForIndex(ctx.GetRangerCtx(), access, idxCols, colLens, 0)
ranges, _, err = ranger.DetachSimpleCondAndBuildRangeForIndex(rctx, access, idxCols, colLens, 0)
return ranges, err
}

type indexReaderExecutorContext struct {
dctx *distsqlctx.DistSQLContext
rctx *rangerctx.RangerContext
buildPBCtx *planctx.BuildPBContext
ectx exprctx.BuildContext

stmtMemTracker *memory.Tracker

infoSchema isctx.MetaOnlyInfoSchema
}

func (ireCtx *indexReaderExecutorContext) GetInfoSchema() isctx.MetaOnlyInfoSchema {
return ireCtx.infoSchema
}

func newIndexReaderExecutorContext(sctx sessionctx.Context) indexReaderExecutorContext {
pctx := sctx.GetPlanCtx()
return indexReaderExecutorContext{
dctx: sctx.GetDistSQLCtx(),
rctx: pctx.GetRangerCtx(),
buildPBCtx: pctx.GetBuildPBCtx(),
ectx: sctx.GetExprCtx(),
stmtMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker,
infoSchema: pctx.GetInfoSchema(),
}
}

// IndexReaderExecutor sends dag request and reads index data from kv layer.
type IndexReaderExecutor struct {
exec.BaseExecutor
indexReaderExecutorContext
exec.BaseExecutorV2
indexUsageReporter *exec.IndexUsageReporter

// For a partitioned table, the IndexReaderExecutor works on a partition, so
Expand Down Expand Up @@ -270,7 +302,7 @@ func (e *IndexReaderExecutor) buildKeyRanges(dctx *distsqlctx.DistSQLContext, ra
func (e *IndexReaderExecutor) Open(ctx context.Context) error {
var err error
if e.corColInAccess {
e.ranges, err = rebuildIndexRanges(e.Ctx(), e.plans[0].(*plannercore.PhysicalIndexScan), e.idxCols, e.colLens)
e.ranges, err = rebuildIndexRanges(e.ectx, e.rctx, e.plans[0].(*plannercore.PhysicalIndexScan), e.idxCols, e.colLens)
if err != nil {
return err
}
Expand All @@ -283,14 +315,14 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error {
if pRange, ok := e.partRangeMap[p.GetPhysicalID()]; ok {
partRange = pRange
}
kvRange, err := e.buildKeyRanges(e.Ctx().GetDistSQLCtx(), partRange, p.GetPhysicalID())
kvRange, err := e.buildKeyRanges(e.dctx, partRange, p.GetPhysicalID())
if err != nil {
return err
}
kvRanges = append(kvRanges, kvRange...)
}
} else {
kvRanges, err = e.buildKeyRanges(e.Ctx().GetDistSQLCtx(), e.ranges, e.physicalTableID)
kvRanges, err = e.buildKeyRanges(e.dctx, e.ranges, e.physicalTableID)
}
if err != nil {
return err
Expand All @@ -309,19 +341,19 @@ func (e *IndexReaderExecutor) buildKVReq(r []kv.KeyRange) (*kv.Request, error) {
SetTxnScope(e.txnScope).
SetReadReplicaScope(e.readReplicaScope).
SetIsStaleness(e.isStaleness).
SetFromSessionVars(e.Ctx().GetDistSQLCtx()).
SetFromInfoSchema(e.Ctx().GetInfoSchema()).
SetFromSessionVars(e.dctx).
SetFromInfoSchema(e.GetInfoSchema()).
SetMemTracker(e.memTracker).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx().GetDistSQLCtx(), &builder.Request, e.netDataSize)).
SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias)
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.dctx, &builder.Request, e.netDataSize)).
SetConnIDAndConnAlias(e.dctx.ConnectionID, e.dctx.SessionAlias)
kvReq, err := builder.Build()
return kvReq, err
}

func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
var err error
if e.corColInFilter {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.plans)
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.buildPBCtx, e.plans)
if err != nil {
return err
}
Expand All @@ -344,7 +376,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
} else {
e.memTracker = memory.NewTracker(e.ID(), -1)
}
e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)
e.memTracker.AttachTo(e.stmtMemTracker)
slices.SortFunc(kvRanges, func(i, j kv.KeyRange) int {
return bytes.Compare(i.StartKey, j.StartKey)
})
Expand All @@ -354,7 +386,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
if err != nil {
return err
}
e.result, err = e.SelectResult(ctx, e.Ctx().GetDistSQLCtx(), kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID())
e.result, err = e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID())
if err != nil {
return err
}
Expand All @@ -369,7 +401,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
}
var results []distsql.SelectResult
for _, kvReq := range kvReqs {
result, err := e.SelectResult(ctx, e.Ctx().GetDistSQLCtx(), kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID())
result, err := e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID())
if err != nil {
return err
}
Expand Down Expand Up @@ -481,7 +513,7 @@ func (e *IndexLookUpExecutor) setDummy() {
func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
var err error
if e.corColInAccess {
e.ranges, err = rebuildIndexRanges(e.Ctx(), e.idxPlans[0].(*plannercore.PhysicalIndexScan), e.idxCols, e.colLens)
e.ranges, err = rebuildIndexRanges(e.Ctx().GetExprCtx(), e.Ctx().GetRangerCtx(), e.idxPlans[0].(*plannercore.PhysicalIndexScan), e.idxCols, e.colLens)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (e *IndexMergeReaderExecutor) rebuildRangeForCorCol() (err error) {
if e.isCorColInPartialAccess[i] {
switch x := plan[0].(type) {
case *plannercore.PhysicalIndexScan:
e.ranges[i], err = rebuildIndexRanges(e.Ctx(), x, x.IdxCols, x.IdxColLens)
e.ranges[i], err = rebuildIndexRanges(e.Ctx().GetExprCtx(), e.Ctx().GetRangerCtx(), x, x.IdxCols, x.IdxColLens)
case *plannercore.PhysicalTableScan:
e.ranges[i], err = x.ResolveCorrelatedColumns()
default:
Expand Down
20 changes: 16 additions & 4 deletions pkg/executor/table_readers_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,17 @@ func buildMockBaseExec(sctx sessionctx.Context) exec.BaseExecutor {
return baseExec
}

func buildMockBaseExecV2(sctx sessionctx.Context) exec.BaseExecutorV2 {
retTypes := []*types.FieldType{types.NewFieldType(mysql.TypeDouble), types.NewFieldType(mysql.TypeLonglong)}
cols := make([]*expression.Column, len(retTypes))
for i := range retTypes {
cols[i] = &expression.Column{Index: i, RetType: retTypes[i]}
}
schema := expression.NewSchema(cols...)
baseExec := exec.NewBaseExecutorV2(sctx.GetSessionVars(), schema, 0)
return baseExec
}

func TestTableReaderRequiredRows(t *testing.T) {
maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
testCases := []struct {
Expand Down Expand Up @@ -208,10 +219,11 @@ func TestTableReaderRequiredRows(t *testing.T) {

func buildIndexReader(sctx sessionctx.Context) exec.Executor {
e := &IndexReaderExecutor{
BaseExecutor: buildMockBaseExec(sctx),
dagPB: buildMockDAGRequest(sctx),
index: &model.IndexInfo{},
selectResultHook: selectResultHook{mockSelectResult},
indexReaderExecutorContext: newIndexReaderExecutorContext(sctx),
BaseExecutorV2: buildMockBaseExecV2(sctx),
dagPB: buildMockDAGRequest(sctx),
index: &model.IndexInfo{},
selectResultHook: selectResultHook{mockSelectResult},
}
return e
}
Expand Down

0 comments on commit 200f5c5

Please sign in to comment.