From 9492bfee95e5b42054d2b6c269e97ffc1555c2b3 Mon Sep 17 00:00:00 2001 From: Yang Keao Date: Thu, 9 May 2024 17:04:25 +0800 Subject: [PATCH] refactor the BaseExecutor in `IndexReaderExecutor` Signed-off-by: Yang Keao --- pkg/executor/builder.go | 70 ++++++++++--------- pkg/executor/distsql.go | 64 ++++++++++++----- pkg/executor/index_merge_reader.go | 2 +- .../table_readers_required_rows_test.go | 20 ++++-- 4 files changed, 101 insertions(+), 55 deletions(-) diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index a192fa4ae9856..fa7245dc2e55a 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -54,6 +54,7 @@ import ( "github.com/pingcap/tidb/pkg/executor/unionexec" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/expression/aggregation" + exprctx "github.com/pingcap/tidb/pkg/expression/context" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/ast" @@ -3423,7 +3424,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) e return ret } -func buildIndexRangeForEachPartition(ctx sessionctx.Context, usedPartitions []table.PhysicalTable, contentPos []int64, +func buildIndexRangeForEachPartition(ectx exprctx.BuildContext, rctx *rangerctx.RangerContext, usedPartitions []table.PhysicalTable, contentPos []int64, lookUpContent []*join.IndexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (map[int64][]*ranger.Range, error) { contentBucket := make(map[int64][]*join.IndexJoinLookUpContent) for _, p := range usedPartitions { @@ -3436,7 +3437,7 @@ func buildIndexRangeForEachPartition(ctx sessionctx.Context, usedPartitions []ta } nextRange := make(map[int64][]*ranger.Range) for _, p := range usedPartitions { - ranges, err := buildRangesForIndexJoin(ctx, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc) + ranges, err := buildRangesForIndexJoin(rctx, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err } @@ -3564,28 +3565,29 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea paging := b.ctx.GetSessionVars().EnablePaging e := &IndexReaderExecutor{ - BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()), - indexUsageReporter: b.buildIndexUsageReporter(v), - dagPB: dagReq, - startTS: startTS, - txnScope: b.txnScope, - readReplicaScope: b.readReplicaScope, - isStaleness: b.isStaleness, - netDataSize: v.GetNetDataSize(), - physicalTableID: physicalTableID, - table: tbl, - index: is.Index, - keepOrder: is.KeepOrder, - desc: is.Desc, - columns: is.Columns, - byItems: is.ByItems, - paging: paging, - corColInFilter: b.corColInDistPlan(v.IndexPlans), - corColInAccess: b.corColInAccess(v.IndexPlans[0]), - idxCols: is.IdxCols, - colLens: is.IdxColLens, - plans: v.IndexPlans, - outputColumns: v.OutputColumns, + indexReaderExecutorContext: newIndexReaderExecutorContext(b.ctx), + BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID()), + indexUsageReporter: b.buildIndexUsageReporter(v), + dagPB: dagReq, + startTS: startTS, + txnScope: b.txnScope, + readReplicaScope: b.readReplicaScope, + isStaleness: b.isStaleness, + netDataSize: v.GetNetDataSize(), + physicalTableID: physicalTableID, + table: tbl, + index: is.Index, + keepOrder: is.KeepOrder, + desc: is.Desc, + columns: is.Columns, + byItems: is.ByItems, + paging: paging, + corColInFilter: b.corColInDistPlan(v.IndexPlans), + corColInAccess: b.corColInAccess(v.IndexPlans[0]), + idxCols: is.IdxCols, + colLens: is.IdxColLens, + plans: v.IndexPlans, + outputColumns: v.OutputColumns, } for _, col := range v.OutputColumns { @@ -4379,7 +4381,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte } tbInfo := e.table.Meta() if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() { - kvRanges, err := buildKvRangesForIndexJoin(e.Ctx().GetDistSQLCtx(), e.Ctx().GetRangerCtx(), e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal) + kvRanges, err := buildKvRangesForIndexJoin(e.dctx, e.rctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal) if err != nil { return nil, err } @@ -4393,7 +4395,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte if err != nil { return nil, err } - if e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil { + if e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil { return nil, err } if err := exec.Open(ctx, e); err != nil { @@ -4409,7 +4411,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte } if len(usedPartition) != 0 { if canPrune { - rangeMap, err := buildIndexRangeForEachPartition(e.Ctx(), usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + rangeMap, err := buildIndexRangeForEachPartition(e.ectx, e.rctx, usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err } @@ -4418,7 +4420,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte e.partRangeMap = rangeMap } else { e.partitions = usedPartition - if e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil { + if e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil { return nil, err } } @@ -4455,7 +4457,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context if err != nil { return nil, err } - e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc) + e.ranges, err = buildRangesForIndexJoin(e.Ctx().GetRangerCtx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err } @@ -4472,7 +4474,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context } if len(usedPartition) != 0 { if canPrune { - rangeMap, err := buildIndexRangeForEachPartition(e.Ctx(), usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + rangeMap, err := buildIndexRangeForEachPartition(e.Ctx().GetExprCtx(), e.Ctx().GetRangerCtx(), usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err } @@ -4481,7 +4483,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context e.partitionRangeMap = rangeMap } else { e.prunedPartitions = usedPartition - e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc) + e.ranges, err = buildRangesForIndexJoin(e.Ctx().GetRangerCtx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err } @@ -4548,7 +4550,7 @@ func (builder *dataReaderBuilder) buildProjectionForIndexJoin( } // buildRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan. -func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*join.IndexJoinLookUpContent, +func buildRangesForIndexJoin(rctx *rangerctx.RangerContext, lookUpContents []*join.IndexJoinLookUpContent, ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) ([]*ranger.Range, error) { retRanges := make([]*ranger.Range, 0, len(ranges)*len(lookUpContents)) lastPos := len(ranges[0].LowVal) - 1 @@ -4567,7 +4569,7 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*join.Inde } continue } - nextColRanges, err := cwc.BuildRangesByRow(ctx.GetRangerCtx(), content.Row) + nextColRanges, err := cwc.BuildRangesByRow(rctx, content.Row) if err != nil { return nil, err } @@ -4587,7 +4589,7 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*join.Inde return retRanges, nil } - return ranger.UnionRanges(ctx.GetRangerCtx(), tmpDatumRanges, true) + return ranger.UnionRanges(rctx, tmpDatumRanges, true) } // buildKvRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan. diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index 41b9857b6a3e2..d81bc30fe1cb7 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -33,11 +33,14 @@ import ( "github.com/pingcap/tidb/pkg/executor/internal/builder" "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/expression" + exprctx "github.com/pingcap/tidb/pkg/expression/context" + isctx "github.com/pingcap/tidb/pkg/infoschema/context" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/charset" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" + planctx "github.com/pingcap/tidb/pkg/planner/context" plannercore "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/planner/core/base" plannerutil "github.com/pingcap/tidb/pkg/planner/util" @@ -56,6 +59,7 @@ import ( "github.com/pingcap/tidb/pkg/util/logutil/consistency" "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/ranger" + rangerctx "github.com/pingcap/tidb/pkg/util/ranger/context" "github.com/pingcap/tidb/pkg/util/size" "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" @@ -149,23 +153,51 @@ func closeAll(objs ...Closeable) error { // rebuildIndexRanges will be called if there's correlated column in access conditions. We will rebuild the range // by substituting correlated column with the constant. -func rebuildIndexRanges(ctx sessionctx.Context, is *plannercore.PhysicalIndexScan, idxCols []*expression.Column, colLens []int) (ranges []*ranger.Range, err error) { +func rebuildIndexRanges(ectx exprctx.BuildContext, rctx *rangerctx.RangerContext, is *plannercore.PhysicalIndexScan, idxCols []*expression.Column, colLens []int) (ranges []*ranger.Range, err error) { access := make([]expression.Expression, 0, len(is.AccessCondition)) for _, cond := range is.AccessCondition { - newCond, err1 := expression.SubstituteCorCol2Constant(ctx.GetExprCtx(), cond) + newCond, err1 := expression.SubstituteCorCol2Constant(ectx, cond) if err1 != nil { return nil, err1 } access = append(access, newCond) } // All of access conditions must be used to build ranges, so we don't limit range memory usage. - ranges, _, err = ranger.DetachSimpleCondAndBuildRangeForIndex(ctx.GetRangerCtx(), access, idxCols, colLens, 0) + ranges, _, err = ranger.DetachSimpleCondAndBuildRangeForIndex(rctx, access, idxCols, colLens, 0) return ranges, err } +type indexReaderExecutorContext struct { + dctx *distsqlctx.DistSQLContext + rctx *rangerctx.RangerContext + buildPBCtx *planctx.BuildPBContext + ectx exprctx.BuildContext + + stmtMemTracker *memory.Tracker + + infoSchema isctx.MetaOnlyInfoSchema +} + +func (ireCtx *indexReaderExecutorContext) GetInfoSchema() isctx.MetaOnlyInfoSchema { + return ireCtx.infoSchema +} + +func newIndexReaderExecutorContext(sctx sessionctx.Context) indexReaderExecutorContext { + pctx := sctx.GetPlanCtx() + return indexReaderExecutorContext{ + dctx: sctx.GetDistSQLCtx(), + rctx: pctx.GetRangerCtx(), + buildPBCtx: pctx.GetBuildPBCtx(), + ectx: sctx.GetExprCtx(), + stmtMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker, + infoSchema: pctx.GetInfoSchema(), + } +} + // IndexReaderExecutor sends dag request and reads index data from kv layer. type IndexReaderExecutor struct { - exec.BaseExecutor + indexReaderExecutorContext + exec.BaseExecutorV2 indexUsageReporter *exec.IndexUsageReporter // For a partitioned table, the IndexReaderExecutor works on a partition, so @@ -270,7 +302,7 @@ func (e *IndexReaderExecutor) buildKeyRanges(dctx *distsqlctx.DistSQLContext, ra func (e *IndexReaderExecutor) Open(ctx context.Context) error { var err error if e.corColInAccess { - e.ranges, err = rebuildIndexRanges(e.Ctx(), e.plans[0].(*plannercore.PhysicalIndexScan), e.idxCols, e.colLens) + e.ranges, err = rebuildIndexRanges(e.ectx, e.rctx, e.plans[0].(*plannercore.PhysicalIndexScan), e.idxCols, e.colLens) if err != nil { return err } @@ -283,14 +315,14 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error { if pRange, ok := e.partRangeMap[p.GetPhysicalID()]; ok { partRange = pRange } - kvRange, err := e.buildKeyRanges(e.Ctx().GetDistSQLCtx(), partRange, p.GetPhysicalID()) + kvRange, err := e.buildKeyRanges(e.dctx, partRange, p.GetPhysicalID()) if err != nil { return err } kvRanges = append(kvRanges, kvRange...) } } else { - kvRanges, err = e.buildKeyRanges(e.Ctx().GetDistSQLCtx(), e.ranges, e.physicalTableID) + kvRanges, err = e.buildKeyRanges(e.dctx, e.ranges, e.physicalTableID) } if err != nil { return err @@ -309,11 +341,11 @@ func (e *IndexReaderExecutor) buildKVReq(r []kv.KeyRange) (*kv.Request, error) { SetTxnScope(e.txnScope). SetReadReplicaScope(e.readReplicaScope). SetIsStaleness(e.isStaleness). - SetFromSessionVars(e.Ctx().GetDistSQLCtx()). - SetFromInfoSchema(e.Ctx().GetInfoSchema()). + SetFromSessionVars(e.dctx). + SetFromInfoSchema(e.GetInfoSchema()). SetMemTracker(e.memTracker). - SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx().GetDistSQLCtx(), &builder.Request, e.netDataSize)). - SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias) + SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.dctx, &builder.Request, e.netDataSize)). + SetConnIDAndConnAlias(e.dctx.ConnectionID, e.dctx.SessionAlias) kvReq, err := builder.Build() return kvReq, err } @@ -321,7 +353,7 @@ func (e *IndexReaderExecutor) buildKVReq(r []kv.KeyRange) (*kv.Request, error) { func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error { var err error if e.corColInFilter { - e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.plans) + e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.buildPBCtx, e.plans) if err != nil { return err } @@ -344,7 +376,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) } else { e.memTracker = memory.NewTracker(e.ID(), -1) } - e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker) + e.memTracker.AttachTo(e.stmtMemTracker) slices.SortFunc(kvRanges, func(i, j kv.KeyRange) int { return bytes.Compare(i.StartKey, j.StartKey) }) @@ -354,7 +386,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) if err != nil { return err } - e.result, err = e.SelectResult(ctx, e.Ctx().GetDistSQLCtx(), kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) + e.result, err = e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) if err != nil { return err } @@ -369,7 +401,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) } var results []distsql.SelectResult for _, kvReq := range kvReqs { - result, err := e.SelectResult(ctx, e.Ctx().GetDistSQLCtx(), kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) + result, err := e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) if err != nil { return err } @@ -481,7 +513,7 @@ func (e *IndexLookUpExecutor) setDummy() { func (e *IndexLookUpExecutor) Open(ctx context.Context) error { var err error if e.corColInAccess { - e.ranges, err = rebuildIndexRanges(e.Ctx(), e.idxPlans[0].(*plannercore.PhysicalIndexScan), e.idxCols, e.colLens) + e.ranges, err = rebuildIndexRanges(e.Ctx().GetExprCtx(), e.Ctx().GetRangerCtx(), e.idxPlans[0].(*plannercore.PhysicalIndexScan), e.idxCols, e.colLens) if err != nil { return err } diff --git a/pkg/executor/index_merge_reader.go b/pkg/executor/index_merge_reader.go index 3c391c3e2d28a..c5745b9c64ec3 100644 --- a/pkg/executor/index_merge_reader.go +++ b/pkg/executor/index_merge_reader.go @@ -222,7 +222,7 @@ func (e *IndexMergeReaderExecutor) rebuildRangeForCorCol() (err error) { if e.isCorColInPartialAccess[i] { switch x := plan[0].(type) { case *plannercore.PhysicalIndexScan: - e.ranges[i], err = rebuildIndexRanges(e.Ctx(), x, x.IdxCols, x.IdxColLens) + e.ranges[i], err = rebuildIndexRanges(e.Ctx().GetExprCtx(), e.Ctx().GetRangerCtx(), x, x.IdxCols, x.IdxColLens) case *plannercore.PhysicalTableScan: e.ranges[i], err = x.ResolveCorrelatedColumns() default: diff --git a/pkg/executor/table_readers_required_rows_test.go b/pkg/executor/table_readers_required_rows_test.go index 6be0105c8a097..78b33ce38ee42 100644 --- a/pkg/executor/table_readers_required_rows_test.go +++ b/pkg/executor/table_readers_required_rows_test.go @@ -164,6 +164,17 @@ func buildMockBaseExec(sctx sessionctx.Context) exec.BaseExecutor { return baseExec } +func buildMockBaseExecV2(sctx sessionctx.Context) exec.BaseExecutorV2 { + retTypes := []*types.FieldType{types.NewFieldType(mysql.TypeDouble), types.NewFieldType(mysql.TypeLonglong)} + cols := make([]*expression.Column, len(retTypes)) + for i := range retTypes { + cols[i] = &expression.Column{Index: i, RetType: retTypes[i]} + } + schema := expression.NewSchema(cols...) + baseExec := exec.NewBaseExecutorV2(sctx.GetSessionVars(), schema, 0) + return baseExec +} + func TestTableReaderRequiredRows(t *testing.T) { maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize testCases := []struct { @@ -208,10 +219,11 @@ func TestTableReaderRequiredRows(t *testing.T) { func buildIndexReader(sctx sessionctx.Context) exec.Executor { e := &IndexReaderExecutor{ - BaseExecutor: buildMockBaseExec(sctx), - dagPB: buildMockDAGRequest(sctx), - index: &model.IndexInfo{}, - selectResultHook: selectResultHook{mockSelectResult}, + indexReaderExecutorContext: newIndexReaderExecutorContext(sctx), + BaseExecutorV2: buildMockBaseExecV2(sctx), + dagPB: buildMockDAGRequest(sctx), + index: &model.IndexInfo{}, + selectResultHook: selectResultHook{mockSelectResult}, } return e }