diff --git a/pkg/ddl/index_cop.go b/pkg/ddl/index_cop.go index 739361abd5cbc..4a6e058f3d779 100644 --- a/pkg/ddl/index_cop.go +++ b/pkg/ddl/index_cop.go @@ -302,7 +302,7 @@ func fetchTableScanResult( } err = table.FillVirtualColumnValue( copCtx.VirtualColumnsFieldTypes, copCtx.VirtualColumnsOutputOffsets, - copCtx.ExprColumnInfos, copCtx.ColumnInfos, copCtx.SessionContext, chk) + copCtx.ExprColumnInfos, copCtx.ColumnInfos, copCtx.SessionContext.GetExprCtx(), chk) return false, err } diff --git a/pkg/distsql/BUILD.bazel b/pkg/distsql/BUILD.bazel index 262e94ffdf023..abd764a6f83b5 100644 --- a/pkg/distsql/BUILD.bazel +++ b/pkg/distsql/BUILD.bazel @@ -12,6 +12,7 @@ go_library( deps = [ "//pkg/config", "//pkg/ddl/placement", + "//pkg/distsql/context", "//pkg/errctx", "//pkg/errno", "//pkg/expression", @@ -21,7 +22,6 @@ go_library( "//pkg/parser/mysql", "//pkg/parser/terror", "//pkg/planner/util", - "//pkg/sessionctx", "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", "//pkg/store/copr", diff --git a/pkg/distsql/context/BUILD.bazel b/pkg/distsql/context/BUILD.bazel new file mode 100644 index 0000000000000..f0e4f4d8ae156 --- /dev/null +++ b/pkg/distsql/context/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "context", + srcs = ["context.go"], + importpath = "github.com/pingcap/tidb/pkg/distsql/context", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv", + "//pkg/sessionctx/variable", + ], +) diff --git a/pkg/distsql/context/context.go b/pkg/distsql/context/context.go new file mode 100644 index 0000000000000..4fec70980f809 --- /dev/null +++ b/pkg/distsql/context/context.go @@ -0,0 +1,28 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package context + +import ( + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/sessionctx/variable" +) + +// DistSQLContext gives the interface +type DistSQLContext interface { + // GetSessionVars gets the session variables. + GetSessionVars() *variable.SessionVars + // GetClient gets a kv.Client. + GetClient() kv.Client +} diff --git a/pkg/distsql/distsql.go b/pkg/distsql/distsql.go index cb433dd25e6a7..af8120f3f96ff 100644 --- a/pkg/distsql/distsql.go +++ b/pkg/distsql/distsql.go @@ -21,9 +21,9 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/config" + distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" - "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/types" @@ -37,14 +37,14 @@ import ( ) // GenSelectResultFromMPPResponse generates an iterator from response. -func GenSelectResultFromMPPResponse(sctx sessionctx.Context, fieldTypes []*types.FieldType, planIDs []int, rootID int, resp kv.Response) SelectResult { +func GenSelectResultFromMPPResponse(dctx distsqlctx.DistSQLContext, fieldTypes []*types.FieldType, planIDs []int, rootID int, resp kv.Response) SelectResult { // TODO: Add metric label and set open tracing. return &selectResult{ label: "mpp", resp: resp, rowLen: len(fieldTypes), fieldTypes: fieldTypes, - ctx: sctx, + ctx: dctx, copPlanIDs: planIDs, rootPlanID: rootID, storeType: kv.TiFlash, @@ -53,7 +53,7 @@ func GenSelectResultFromMPPResponse(sctx sessionctx.Context, fieldTypes []*types // Select sends a DAG request, returns SelectResult. // In kvReq, KeyRanges is required, Concurrency/KeepOrder/Desc/IsolationLevel/Priority are optional. -func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fieldTypes []*types.FieldType) (SelectResult, error) { +func Select(ctx context.Context, dctx distsqlctx.DistSQLContext, kvReq *kv.Request, fieldTypes []*types.FieldType) (SelectResult, error) { r, ctx := tracing.StartRegionEx(ctx, "distsql.Select") defer r.End() @@ -62,8 +62,8 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie hook.(func(*kv.Request))(kvReq) } - enabledRateLimitAction := sctx.GetSessionVars().EnabledRateLimitAction - originalSQL := sctx.GetSessionVars().StmtCtx.OriginalSQL + enabledRateLimitAction := dctx.GetSessionVars().EnabledRateLimitAction + originalSQL := dctx.GetSessionVars().StmtCtx.OriginalSQL eventCb := func(event trxevents.TransactionEvent) { // Note: Do not assume this callback will be invoked within the same goroutine. if copMeetLock := event.GetCopMeetLock(); copMeetLock != nil { @@ -74,27 +74,27 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie } } - ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx) + ctx = WithSQLKvExecCounterInterceptor(ctx, dctx.GetSessionVars().StmtCtx) option := &kv.ClientSendOption{ - SessionMemTracker: sctx.GetSessionVars().MemTracker, + SessionMemTracker: dctx.GetSessionVars().MemTracker, EnabledRateLimitAction: enabledRateLimitAction, EventCb: eventCb, EnableCollectExecutionInfo: config.GetGlobalConfig().Instance.EnableCollectExecutionInfo.Load(), } if kvReq.StoreType == kv.TiFlash { - ctx = SetTiFlashConfVarsInContext(ctx, sctx) - option.TiFlashReplicaRead = sctx.GetSessionVars().TiFlashReplicaRead - option.AppendWarning = sctx.GetSessionVars().StmtCtx.AppendWarning + ctx = SetTiFlashConfVarsInContext(ctx, dctx.GetSessionVars()) + option.TiFlashReplicaRead = dctx.GetSessionVars().TiFlashReplicaRead + option.AppendWarning = dctx.GetSessionVars().StmtCtx.AppendWarning } - resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, option) + resp := dctx.GetClient().Send(ctx, kvReq, dctx.GetSessionVars().KVVars, option) if resp == nil { return nil, errors.New("client returns nil response") } label := metrics.LblGeneral - if sctx.GetSessionVars().InRestrictedSQL { + if dctx.GetSessionVars().InRestrictedSQL { label = metrics.LblInternal } @@ -106,7 +106,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie resp: resp, rowLen: len(fieldTypes), fieldTypes: fieldTypes, - ctx: sctx, + ctx: dctx, sqlType: label, memTracker: kvReq.MemTracker, storeType: kvReq.StoreType, @@ -116,34 +116,34 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie } // SetTiFlashConfVarsInContext set some TiFlash config variables in context. -func SetTiFlashConfVarsInContext(ctx context.Context, sctx sessionctx.Context) context.Context { - if sctx.GetSessionVars().TiFlashMaxThreads != -1 { - ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxTiFlashThreads, strconv.FormatInt(sctx.GetSessionVars().TiFlashMaxThreads, 10)) +func SetTiFlashConfVarsInContext(ctx context.Context, vars *variable.SessionVars) context.Context { + if vars.TiFlashMaxThreads != -1 { + ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxTiFlashThreads, strconv.FormatInt(vars.TiFlashMaxThreads, 10)) } - if sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalJoin != -1 { - ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxBytesBeforeTiFlashExternalJoin, strconv.FormatInt(sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalJoin, 10)) + if vars.TiFlashMaxBytesBeforeExternalJoin != -1 { + ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxBytesBeforeTiFlashExternalJoin, strconv.FormatInt(vars.TiFlashMaxBytesBeforeExternalJoin, 10)) } - if sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalGroupBy != -1 { - ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxBytesBeforeTiFlashExternalGroupBy, strconv.FormatInt(sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalGroupBy, 10)) + if vars.TiFlashMaxBytesBeforeExternalGroupBy != -1 { + ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxBytesBeforeTiFlashExternalGroupBy, strconv.FormatInt(vars.TiFlashMaxBytesBeforeExternalGroupBy, 10)) } - if sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalSort != -1 { - ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxBytesBeforeTiFlashExternalSort, strconv.FormatInt(sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalSort, 10)) + if vars.TiFlashMaxBytesBeforeExternalSort != -1 { + ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxBytesBeforeTiFlashExternalSort, strconv.FormatInt(vars.TiFlashMaxBytesBeforeExternalSort, 10)) } - if sctx.GetSessionVars().TiFlashMaxQueryMemoryPerNode <= 0 { + if vars.TiFlashMaxQueryMemoryPerNode <= 0 { ctx = metadata.AppendToOutgoingContext(ctx, variable.TiFlashMemQuotaQueryPerNode, "0") } else { - ctx = metadata.AppendToOutgoingContext(ctx, variable.TiFlashMemQuotaQueryPerNode, strconv.FormatInt(sctx.GetSessionVars().TiFlashMaxQueryMemoryPerNode, 10)) + ctx = metadata.AppendToOutgoingContext(ctx, variable.TiFlashMemQuotaQueryPerNode, strconv.FormatInt(vars.TiFlashMaxQueryMemoryPerNode, 10)) } - ctx = metadata.AppendToOutgoingContext(ctx, variable.TiFlashQuerySpillRatio, strconv.FormatFloat(sctx.GetSessionVars().TiFlashQuerySpillRatio, 'f', -1, 64)) + ctx = metadata.AppendToOutgoingContext(ctx, variable.TiFlashQuerySpillRatio, strconv.FormatFloat(vars.TiFlashQuerySpillRatio, 'f', -1, 64)) return ctx } // SelectWithRuntimeStats sends a DAG request, returns SelectResult. // The difference from Select is that SelectWithRuntimeStats will set copPlanIDs into selectResult, // which can help selectResult to collect runtime stats. -func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, +func SelectWithRuntimeStats(ctx context.Context, dctx distsqlctx.DistSQLContext, kvReq *kv.Request, fieldTypes []*types.FieldType, copPlanIDs []int, rootPlanID int) (SelectResult, error) { - sr, err := Select(ctx, sctx, kvReq, fieldTypes) + sr, err := Select(ctx, dctx, kvReq, fieldTypes) if err != nil { return nil, err } @@ -198,7 +198,7 @@ func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars any // methods are: // 1. TypeChunk: the result is encoded using the Chunk format, refer util/chunk/chunk.go // 2. TypeDefault: the result is encoded row by row -func SetEncodeType(ctx sessionctx.Context, dagReq *tipb.DAGRequest) { +func SetEncodeType(ctx distsqlctx.DistSQLContext, dagReq *tipb.DAGRequest) { if canUseChunkRPC(ctx) { dagReq.EncodeType = tipb.EncodeType_TypeChunk setChunkMemoryLayout(dagReq) @@ -207,7 +207,7 @@ func SetEncodeType(ctx sessionctx.Context, dagReq *tipb.DAGRequest) { } } -func canUseChunkRPC(ctx sessionctx.Context) bool { +func canUseChunkRPC(ctx distsqlctx.DistSQLContext) bool { if !ctx.GetSessionVars().EnableChunkRPC { return false } diff --git a/pkg/distsql/select_result.go b/pkg/distsql/select_result.go index c90185bfc6c64..55d3da05d363b 100644 --- a/pkg/distsql/select_result.go +++ b/pkg/distsql/select_result.go @@ -26,13 +26,13 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" + dcontext "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/planner/util" - "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/store/copr" "github.com/pingcap/tidb/pkg/types" @@ -286,7 +286,7 @@ type selectResult struct { rowLen int fieldTypes []*types.FieldType - ctx sessionctx.Context + ctx dcontext.DistSQLContext selectResp *tipb.SelectResponse selectRespSize int64 // record the selectResp.Size() when it is initialized. diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index d4460d2d17cd5..56e4ff37efd8c 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -104,6 +104,7 @@ go_library( "//pkg/ddl/placement", "//pkg/ddl/schematracker", "//pkg/distsql", + "//pkg/distsql/context", "//pkg/disttask/framework/handle", "//pkg/disttask/framework/proto", "//pkg/disttask/framework/storage", @@ -131,6 +132,7 @@ go_library( "//pkg/executor/sortexec", "//pkg/expression", "//pkg/expression/aggregation", + "//pkg/expression/context", "//pkg/infoschema", "//pkg/keyspace", "//pkg/kv", @@ -378,6 +380,7 @@ go_test( "//pkg/ddl/placement", "//pkg/ddl/util", "//pkg/distsql", + "//pkg/distsql/context", "//pkg/domain", "//pkg/domain/infosync", "//pkg/errctx", diff --git a/pkg/executor/analyze_col_v2.go b/pkg/executor/analyze_col_v2.go index 458ee1de7c7dd..32032f8da1b27 100644 --- a/pkg/executor/analyze_col_v2.go +++ b/pkg/executor/analyze_col_v2.go @@ -210,7 +210,7 @@ func (e *AnalyzeColumnsExecV2) decodeSampleDataWithVirtualColumn( } } } - err := table.FillVirtualColumnValue(fieldTps, virtualColIdx, schema.Columns, e.colsInfo, e.ctx, chk) + err := table.FillVirtualColumnValue(fieldTps, virtualColIdx, schema.Columns, e.colsInfo, e.ctx.GetExprCtx(), chk) if err != nil { return err } diff --git a/pkg/executor/batch_point_get.go b/pkg/executor/batch_point_get.go index 9f59a7caf57d1..f9061fe04b4ca 100644 --- a/pkg/executor/batch_point_get.go +++ b/pkg/executor/batch_point_get.go @@ -201,7 +201,7 @@ func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error { e.index++ } - err := table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.Schema().Columns, e.columns, e.Ctx(), req) + err := table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.Schema().Columns, e.columns, e.Ctx().GetExprCtx(), req) if err != nil { return err } diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index 069e2f1997cf9..6edd4e9a8fa15 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -56,6 +56,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" + planctx "github.com/pingcap/tidb/pkg/planner/context" plannercore "github.com/pingcap/tidb/pkg/planner/core" plannerutil "github.com/pingcap/tidb/pkg/planner/util" "github.com/pingcap/tidb/pkg/sessionctx" @@ -1777,11 +1778,11 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) exec. b.err = errors.Errorf("buildTableDual failed, invalid row count for dual table: %v", v.RowCount) return nil } - base := exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()) + base := exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID()) base.SetInitCap(v.RowCount) e := &TableDualExec{ - BaseExecutor: base, - numDualRows: v.RowCount, + BaseExecutorV2: base, + numDualRows: v.RowCount, } return e } @@ -2917,7 +2918,7 @@ func (b *executorBuilder) newDataReaderBuilder(p plannercore.PhysicalPlan) (*dat builderForDataReader.dataReaderTS = ts return &dataReaderBuilder{ - Plan: p, + plan: p, executorBuilder: &builderForDataReader, }, nil } @@ -3206,26 +3207,28 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea return nil, err } paging := b.ctx.GetSessionVars().EnablePaging + e := &TableReaderExecutor{ - BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()), - dagPB: dagReq, - startTS: startTS, - txnScope: b.txnScope, - readReplicaScope: b.readReplicaScope, - isStaleness: b.isStaleness, - netDataSize: v.GetNetDataSize(), - table: tbl, - keepOrder: ts.KeepOrder, - desc: ts.Desc, - byItems: ts.ByItems, - columns: ts.Columns, - paging: paging, - corColInFilter: b.corColInDistPlan(v.TablePlans), - corColInAccess: b.corColInAccess(v.TablePlans[0]), - plans: v.TablePlans, - tablePlan: v.GetTablePlan(), - storeType: v.StoreType, - batchCop: v.ReadReqType == plannercore.BatchCop, + BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID()), + tableReaderExecutorContext: newTableReaderExecutorContext(b.ctx), + dagPB: dagReq, + startTS: startTS, + txnScope: b.txnScope, + readReplicaScope: b.readReplicaScope, + isStaleness: b.isStaleness, + netDataSize: v.GetNetDataSize(), + table: tbl, + keepOrder: ts.KeepOrder, + desc: ts.Desc, + byItems: ts.ByItems, + columns: ts.Columns, + paging: paging, + corColInFilter: b.corColInDistPlan(v.TablePlans), + corColInAccess: b.corColInAccess(v.TablePlans[0]), + plans: v.TablePlans, + tablePlan: v.GetTablePlan(), + storeType: v.StoreType, + batchCop: v.ReadReqType == plannercore.BatchCop, } e.buildVirtualColumnInfo() @@ -3389,7 +3392,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) e } if len(partitions) == 0 { - return &TableDualExec{BaseExecutor: ret.BaseExecutor} + return &TableDualExec{BaseExecutorV2: ret.BaseExecutorV2} } // Sort the partition is necessary to make the final multiple partition key ranges ordered. @@ -3997,7 +4000,7 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg // 1. dataReaderBuilder calculate data range from argument, rather than plan. // 2. the result executor is already opened. type dataReaderBuilder struct { - plannercore.Plan + plan plannercore.Plan *executorBuilder selectResultHook // for testing @@ -4021,7 +4024,7 @@ func (*mockPhysicalIndexReader) MemoryUsage() (sum int64) { func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (exec.Executor, error) { - return builder.buildExecutorForIndexJoinInternal(ctx, builder.Plan, lookUpContents, indexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal) + return builder.buildExecutorForIndexJoinInternal(ctx, builder.plan, lookUpContents, indexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal) } func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.Context, plan plannercore.Plan, lookUpContents []*indexJoinLookUpContent, @@ -4092,7 +4095,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte tbInfo := e.table.Meta() if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() { if v.IsCommonHandle { - kvRanges, err := buildKvRangesForIndexJoin(e.Ctx(), getPhysicalTableID(e.table), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) + kvRanges, err := buildKvRangesForIndexJoin(e.GetSessionVars().StmtCtx, e.pctx, getPhysicalTableID(e.table), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) if err != nil { return nil, err } @@ -4122,7 +4125,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte kvRanges = make([]kv.KeyRange, 0, len(lookUpContents)) // lookUpContentsByPID groups lookUpContents by pid(partition) so that kv ranges for same partition can be merged. lookUpContentsByPID := make(map[int64][]*indexJoinLookUpContent) - exprCtx := e.Ctx().GetExprCtx() + exprCtx := e.ectx for _, content := range lookUpContents { for i, data := range content.keys { locateKey[keyColOffsets[i]] = data @@ -4142,7 +4145,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte } for pid, contents := range lookUpContentsByPID { // buildKvRanges for each partition. - tmp, err := buildKvRangesForIndexJoin(e.Ctx(), pid, -1, contents, indexRanges, keyOff2IdxOff, cwc, nil, interruptSignal) + tmp, err := buildKvRangesForIndexJoin(e.GetSessionVars().StmtCtx, e.pctx, pid, -1, contents, indexRanges, keyOff2IdxOff, cwc, nil, interruptSignal) if err != nil { return nil, err } @@ -4151,7 +4154,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte } else { kvRanges = make([]kv.KeyRange, 0, len(usedPartitions)*len(lookUpContents)) for _, p := range usedPartitionList { - tmp, err := buildKvRangesForIndexJoin(e.Ctx(), p.GetPhysicalID(), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) + tmp, err := buildKvRangesForIndexJoin(e.GetSessionVars().StmtCtx, e.pctx, p.GetPhysicalID(), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) if err != nil { return nil, err } @@ -4170,7 +4173,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte if len(keyColOffsets) > 0 { locateKey := make([]types.Datum, len(pt.Cols())) kvRanges = make([]kv.KeyRange, 0, len(lookUpContents)) - exprCtx := e.Ctx().GetExprCtx() + exprCtx := e.ectx for _, content := range lookUpContents { for i, data := range content.keys { locateKey[keyColOffsets[i]] = data @@ -4267,13 +4270,13 @@ func (h kvRangeBuilderFromRangeAndPartition) buildKeyRange(ranges []*ranger.Rang // newClosestReadAdjuster let the request be sent to closest replica(within the same zone) // if response size exceeds certain threshold. -func newClosestReadAdjuster(ctx sessionctx.Context, req *kv.Request, netDataSize float64) kv.CoprRequestAdjuster { +func newClosestReadAdjuster(vars *variable.SessionVars, req *kv.Request, netDataSize float64) kv.CoprRequestAdjuster { if req.ReplicaRead != kv.ReplicaReadClosestAdaptive { return nil } return func(req *kv.Request, copTaskCount int) bool { // copTaskCount is the number of coprocessor requests - if int64(netDataSize/float64(copTaskCount)) >= ctx.GetSessionVars().ReplicaClosestReadThreshold { + if int64(netDataSize/float64(copTaskCount)) >= vars.ReplicaClosestReadThreshold { req.MatchStoreLabels = append(req.MatchStoreLabels, &metapb.StoreLabel{ Key: placement.DCLabelKey, Value: config.GetTxnScopeFromConfig(), @@ -4299,11 +4302,11 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T SetTxnScope(e.txnScope). SetReadReplicaScope(e.readReplicaScope). SetIsStaleness(e.isStaleness). - SetFromSessionVars(e.Ctx().GetSessionVars()). - SetFromInfoSchema(e.Ctx().GetInfoSchema()). - SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx(), &reqBuilderWithRange.Request, e.netDataSize)). + SetFromSessionVars(e.GetSessionVars()). + SetFromInfoSchema(e.GetInfoSchema()). + SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.GetSessionVars(), &reqBuilderWithRange.Request, e.netDataSize)). SetPaging(e.paging). - SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias). + SetConnIDAndConnAlias(e.GetSessionVars().ConnectionID, e.GetSessionVars().SessionAlias). Build() if err != nil { return nil, err @@ -4351,7 +4354,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte } tbInfo := e.table.Meta() if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() { - kvRanges, err := buildKvRangesForIndexJoin(e.Ctx(), e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal) + kvRanges, err := buildKvRangesForIndexJoin(e.Ctx().GetSessionVars().StmtCtx, e.Ctx().GetPlanCtx(), e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal) if err != nil { return nil, err } @@ -4408,7 +4411,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte } return e, nil } - ret := &TableDualExec{BaseExecutor: e.BaseExecutor} + ret := &TableDualExec{BaseExecutorV2: e.BaseExecutorV2} err = exec.Open(ctx, ret) return ret, err } @@ -4422,7 +4425,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context tbInfo := e.table.Meta() if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() { - e.kvRanges, err = buildKvRangesForIndexJoin(e.Ctx(), getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) + e.kvRanges, err = buildKvRangesForIndexJoin(e.Ctx().GetSessionVars().StmtCtx, e.Ctx().GetPlanCtx(), getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) if err != nil { return nil, err } @@ -4482,7 +4485,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context } return e, err } - ret := &TableDualExec{BaseExecutor: e.BaseExecutor} + ret := &TableDualExec{BaseExecutorV2: e.BaseExecutorV2} err = exec.Open(ctx, ret) return ret, err } @@ -4565,14 +4568,13 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*indexJoin } // buildKvRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan. -func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, lookUpContents []*indexJoinLookUpContent, +func buildKvRangesForIndexJoin(sc *stmtctx.StatementContext, pctx planctx.PlanContext, tableID, indexID int64, lookUpContents []*indexJoinLookUpContent, ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (_ []kv.KeyRange, err error) { kvRanges := make([]kv.KeyRange, 0, len(ranges)*len(lookUpContents)) if len(ranges) == 0 { return []kv.KeyRange{}, nil } lastPos := len(ranges[0].LowVal) - 1 - sc := ctx.GetSessionVars().StmtCtx tmpDatumRanges := make([]*ranger.Range, 0, len(lookUpContents)) for _, content := range lookUpContents { for _, ran := range ranges { @@ -4596,7 +4598,7 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l kvRanges = tmpKvRanges.AppendSelfTo(kvRanges) continue } - nextColRanges, err := cwc.BuildRangesByRow(ctx.GetPlanCtx(), content.row) + nextColRanges, err := cwc.BuildRangesByRow(pctx, content.row) if err != nil { return nil, err } @@ -4627,16 +4629,16 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l return kvRanges, nil } - tmpDatumRanges, err = ranger.UnionRanges(ctx.GetPlanCtx(), tmpDatumRanges, true) + tmpDatumRanges, err = ranger.UnionRanges(pctx, tmpDatumRanges, true) if err != nil { return nil, err } // Index id is -1 means it's a common handle. if indexID == -1 { - tmpKeyRanges, err := distsql.CommonHandleRangesToKVRanges(ctx.GetSessionVars().StmtCtx, []int64{tableID}, tmpDatumRanges) + tmpKeyRanges, err := distsql.CommonHandleRangesToKVRanges(sc, []int64{tableID}, tmpDatumRanges) return tmpKeyRanges.FirstPartitionRange(), err } - tmpKeyRanges, err := distsql.IndexRangesToKVRangesWithInterruptSignal(ctx.GetSessionVars().StmtCtx, tableID, indexID, tmpDatumRanges, memTracker, interruptSignal) + tmpKeyRanges, err := distsql.IndexRangesToKVRangesWithInterruptSignal(sc, tableID, indexID, tmpDatumRanges, memTracker, interruptSignal) return tmpKeyRanges.FirstPartitionRange(), err } diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index 1e01546905f8b..078712f713d43 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -312,7 +312,7 @@ func (e *IndexReaderExecutor) buildKVReq(r []kv.KeyRange) (*kv.Request, error) { SetFromSessionVars(e.Ctx().GetSessionVars()). SetFromInfoSchema(e.Ctx().GetInfoSchema()). SetMemTracker(e.memTracker). - SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx(), &builder.Request, e.netDataSize)). + SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx().GetSessionVars(), &builder.Request, e.netDataSize)). SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias) kvReq, err := builder.Build() return kvReq, err @@ -321,7 +321,7 @@ func (e *IndexReaderExecutor) buildKVReq(r []kv.KeyRange) (*kv.Request, error) { func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error { var err error if e.corColInFilter { - e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx(), e.plans) + e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.plans) if err != nil { return err } @@ -583,14 +583,14 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error { var err error if e.corColInIdxSide { - e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx(), e.idxPlans) + e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.idxPlans) if err != nil { return err } } if e.corColInTblSide { - e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx(), e.tblPlans) + e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.tblPlans) if err != nil { return err } @@ -716,7 +716,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< SetIsStaleness(e.isStaleness). SetFromSessionVars(e.Ctx().GetSessionVars()). SetFromInfoSchema(e.Ctx().GetInfoSchema()). - SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx(), &builder.Request, e.idxNetDataSize/float64(len(kvRanges)))). + SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx().GetSessionVars(), &builder.Request, e.idxNetDataSize/float64(len(kvRanges)))). SetMemTracker(tracker). SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias) @@ -804,18 +804,19 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup table = task.partitionTable } tableReaderExec := &TableReaderExecutor{ - BaseExecutor: exec.NewBaseExecutor(e.Ctx(), e.Schema(), e.getTableRootPlanID()), - table: table, - dagPB: e.tableRequest, - startTS: e.startTS, - txnScope: e.txnScope, - readReplicaScope: e.readReplicaScope, - isStaleness: e.isStaleness, - columns: e.columns, - corColInFilter: e.corColInTblSide, - plans: e.tblPlans, - netDataSize: e.avgRowSize * float64(len(task.handles)), - byItems: e.byItems, + BaseExecutorV2: exec.NewBaseExecutorV2(e.Ctx().GetSessionVars(), e.Schema(), e.getTableRootPlanID()), + tableReaderExecutorContext: newTableReaderExecutorContext(e.Ctx()), + table: table, + dagPB: e.tableRequest, + startTS: e.startTS, + txnScope: e.txnScope, + readReplicaScope: e.readReplicaScope, + isStaleness: e.isStaleness, + columns: e.columns, + corColInFilter: e.corColInTblSide, + plans: e.tblPlans, + netDataSize: e.avgRowSize * float64(len(task.handles)), + byItems: e.byItems, } tableReaderExec.buildVirtualColumnInfo() tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, task.handles, true) diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 3e9cbceb79096..6df4eac69af0f 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -1506,7 +1506,7 @@ func init() { // TableDualExec represents a dual table executor. type TableDualExec struct { - exec.BaseExecutor + exec.BaseExecutorV2 // numDualRows can only be 0 or 1. numDualRows int diff --git a/pkg/executor/executor_pkg_test.go b/pkg/executor/executor_pkg_test.go index faf77a0a44cd9..6656df4042020 100644 --- a/pkg/executor/executor_pkg_test.go +++ b/pkg/executor/executor_pkg_test.go @@ -65,7 +65,7 @@ func TestBuildKvRangesForIndexJoinWithoutCwc(t *testing.T) { keyOff2IdxOff := []int{1, 3} ctx := mock.NewContext() - kvRanges, err := buildKvRangesForIndexJoin(ctx, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil, nil, nil) + kvRanges, err := buildKvRangesForIndexJoin(ctx.GetSessionVars().StmtCtx, ctx.GetPlanCtx(), 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil, nil, nil) require.NoError(t, err) // Check the kvRanges is in order. for i, kvRange := range kvRanges { @@ -95,7 +95,7 @@ func TestBuildKvRangesForIndexJoinWithoutCwcAndWithMemoryTracker(t *testing.T) { keyOff2IdxOff := []int{1, 3} ctx := mock.NewContext() memTracker := memory.NewTracker(memory.LabelForIndexWorker, -1) - kvRanges, err := buildKvRangesForIndexJoin(ctx, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil, memTracker, nil) + kvRanges, err := buildKvRangesForIndexJoin(ctx.GetSessionVars().StmtCtx, ctx.GetPlanCtx(), 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil, memTracker, nil) require.NoError(t, err) // Check the kvRanges is in order. for i, kvRange := range kvRanges { @@ -117,7 +117,7 @@ func TestBuildKvRangesForIndexJoinWithoutCwcAndWithMemoryTracker(t *testing.T) { keyOff2IdxOff := []int{1, 3} ctx := mock.NewContext() memTracker := memory.NewTracker(memory.LabelForIndexWorker, -1) - kvRanges, err := buildKvRangesForIndexJoin(ctx, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil, memTracker, nil) + kvRanges, err := buildKvRangesForIndexJoin(ctx.GetSessionVars().StmtCtx, ctx.GetPlanCtx(), 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil, memTracker, nil) require.NoError(t, err) // Check the kvRanges is in order. for i, kvRange := range kvRanges { diff --git a/pkg/executor/index_merge_reader.go b/pkg/executor/index_merge_reader.go index 4fbfb6c0092da..e44682c5e184f 100644 --- a/pkg/executor/index_merge_reader.go +++ b/pkg/executor/index_merge_reader.go @@ -167,7 +167,7 @@ func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) { e.keyRanges = make([][]kv.KeyRange, 0, len(e.partialPlans)) e.initRuntimeStats() if e.isCorColInTableFilter { - e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx(), e.tblPlans) + e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.tblPlans) if err != nil { return err } @@ -370,7 +370,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, if e.isCorColInPartialFilters[workID] { // We got correlated column, so need to refresh Selection operator. var err error - if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx(), e.partialPlans[workID]); err != nil { + if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.partialPlans[workID]); err != nil { syncErr(ctx, e.finished, fetchCh, err) return } @@ -388,7 +388,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, SetMemTracker(e.memTracker). SetPaging(e.paging). SetFromInfoSchema(e.Ctx().GetInfoSchema()). - SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx(), &builder.Request, e.partialNetDataSizes[workID])). + SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx().GetSessionVars(), &builder.Request, e.partialNetDataSizes[workID])). SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias) tps := worker.getRetTpsForIndexScan(e.handleCols) @@ -474,17 +474,18 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, failpoint.Inject("testIndexMergePanicPartialTableWorker", nil) var err error partialTableReader := &TableReaderExecutor{ - BaseExecutor: exec.NewBaseExecutor(e.Ctx(), ts.Schema(), e.getPartitalPlanID(workID)), - dagPB: e.dagPBs[workID], - startTS: e.startTS, - txnScope: e.txnScope, - readReplicaScope: e.readReplicaScope, - isStaleness: e.isStaleness, - plans: e.partialPlans[workID], - ranges: e.ranges[workID], - netDataSize: e.partialNetDataSizes[workID], - keepOrder: ts.KeepOrder, - byItems: ts.ByItems, + BaseExecutorV2: exec.NewBaseExecutorV2(e.Ctx().GetSessionVars(), ts.Schema(), e.getPartitalPlanID(workID)), + tableReaderExecutorContext: newTableReaderExecutorContext(e.Ctx()), + dagPB: e.dagPBs[workID], + startTS: e.startTS, + txnScope: e.txnScope, + readReplicaScope: e.readReplicaScope, + isStaleness: e.isStaleness, + plans: e.partialPlans[workID], + ranges: e.ranges[workID], + netDataSize: e.partialNetDataSizes[workID], + keepOrder: ts.KeepOrder, + byItems: ts.ByItems, } worker := &partialTableWorker{ @@ -512,7 +513,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, } if e.isCorColInPartialFilters[workID] { - if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx(), e.partialPlans[workID]); err != nil { + if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.partialPlans[workID]); err != nil { syncErr(ctx, e.finished, fetchCh, err) return } @@ -785,16 +786,17 @@ func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Co func (e *IndexMergeReaderExecutor) buildFinalTableReader(ctx context.Context, tbl table.Table, handles []kv.Handle) (_ exec.Executor, err error) { tableReaderExec := &TableReaderExecutor{ - BaseExecutor: exec.NewBaseExecutor(e.Ctx(), e.Schema(), e.getTablePlanRootID()), - table: tbl, - dagPB: e.tableRequest, - startTS: e.startTS, - txnScope: e.txnScope, - readReplicaScope: e.readReplicaScope, - isStaleness: e.isStaleness, - columns: e.columns, - plans: e.tblPlans, - netDataSize: e.dataAvgRowSize * float64(len(handles)), + BaseExecutorV2: exec.NewBaseExecutorV2(e.Ctx().GetSessionVars(), e.Schema(), e.getTablePlanRootID()), + tableReaderExecutorContext: newTableReaderExecutorContext(e.Ctx()), + table: tbl, + dagPB: e.tableRequest, + startTS: e.startTS, + txnScope: e.txnScope, + readReplicaScope: e.readReplicaScope, + isStaleness: e.isStaleness, + columns: e.columns, + plans: e.tblPlans, + netDataSize: e.dataAvgRowSize * float64(len(handles)), } tableReaderExec.buildVirtualColumnInfo() // Reorder handles because SplitKeyRangesByLocations() requires startKey of kvRanges is ordered. diff --git a/pkg/executor/internal/builder/BUILD.bazel b/pkg/executor/internal/builder/BUILD.bazel index bc9afd8e240cc..b1344b35468d8 100644 --- a/pkg/executor/internal/builder/BUILD.bazel +++ b/pkg/executor/internal/builder/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//pkg/distsql", "//pkg/kv", + "//pkg/planner/context", "//pkg/planner/core", "//pkg/sessionctx", "//pkg/util/timeutil", diff --git a/pkg/executor/internal/builder/builder_utils.go b/pkg/executor/internal/builder/builder_utils.go index a29754ab352bb..ed2781bc4aeeb 100644 --- a/pkg/executor/internal/builder/builder_utils.go +++ b/pkg/executor/internal/builder/builder_utils.go @@ -17,6 +17,7 @@ package builder import ( "github.com/pingcap/tidb/pkg/distsql" "github.com/pingcap/tidb/pkg/kv" + planctx "github.com/pingcap/tidb/pkg/planner/context" plannercore "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/util/timeutil" @@ -24,16 +25,16 @@ import ( ) // ConstructTreeBasedDistExec constructs tree based DAGRequest -func ConstructTreeBasedDistExec(sctx sessionctx.Context, p plannercore.PhysicalPlan) ([]*tipb.Executor, error) { - execPB, err := p.ToPB(sctx.GetPlanCtx(), kv.TiFlash) +func ConstructTreeBasedDistExec(pctx planctx.PlanContext, p plannercore.PhysicalPlan) ([]*tipb.Executor, error) { + execPB, err := p.ToPB(pctx, kv.TiFlash) return []*tipb.Executor{execPB}, err } // ConstructListBasedDistExec constructs list based DAGRequest -func ConstructListBasedDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan) ([]*tipb.Executor, error) { +func ConstructListBasedDistExec(pctx planctx.PlanContext, plans []plannercore.PhysicalPlan) ([]*tipb.Executor, error) { executors := make([]*tipb.Executor, 0, len(plans)) for _, p := range plans { - execPB, err := p.ToPB(sctx.GetPlanCtx(), kv.TiKV) + execPB, err := p.ToPB(pctx, kv.TiKV) if err != nil { return nil, err } @@ -54,10 +55,10 @@ func ConstructDAGReq(ctx sessionctx.Context, plans []plannercore.PhysicalPlan, s dagReq.Flags = sc.PushDownFlags() if storeType == kv.TiFlash { var executors []*tipb.Executor - executors, err = ConstructTreeBasedDistExec(ctx, plans[0]) + executors, err = ConstructTreeBasedDistExec(ctx.GetPlanCtx(), plans[0]) dagReq.RootExecutor = executors[0] } else { - dagReq.Executors, err = ConstructListBasedDistExec(ctx, plans) + dagReq.Executors, err = ConstructListBasedDistExec(ctx.GetPlanCtx(), plans) } distsql.SetEncodeType(ctx, dagReq) diff --git a/pkg/executor/internal/exec/BUILD.bazel b/pkg/executor/internal/exec/BUILD.bazel index 9948eff194f4e..3e9086fdefc4f 100644 --- a/pkg/executor/internal/exec/BUILD.bazel +++ b/pkg/executor/internal/exec/BUILD.bazel @@ -11,6 +11,7 @@ go_library( deps = [ "//pkg/domain", "//pkg/expression", + "//pkg/parser", "//pkg/sessionctx", "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", @@ -26,6 +27,7 @@ go_library( "//pkg/util/topsql/state", "//pkg/util/tracing", "@com_github_ngaut_pools//:pools", + "@org_uber_go_atomic//:atomic", ], ) diff --git a/pkg/executor/internal/exec/executor.go b/pkg/executor/internal/exec/executor.go index 8ebbf80ba8ee1..dd754c326a94c 100644 --- a/pkg/executor/internal/exec/executor.go +++ b/pkg/executor/internal/exec/executor.go @@ -22,7 +22,9 @@ import ( "github.com/ngaut/pools" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/expression" + "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" @@ -32,6 +34,7 @@ import ( "github.com/pingcap/tidb/pkg/util/topsql" topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state" "github.com/pingcap/tidb/pkg/util/tracing" + "go.uber.org/atomic" ) // Executor is the physical implementation of an algebra operator. @@ -67,34 +70,68 @@ type Executor interface { var _ Executor = &BaseExecutor{} -// BaseExecutor holds common information for executors. -type BaseExecutor struct { - ctx sessionctx.Context +// executorChunkAllocator is a helper to implement `Chunk` related methods in `Executor` interface +type executorChunkAllocator struct { AllocPool chunk.Allocator - schema *expression.Schema // output schema - runtimeStats *execdetails.BasicRuntimeStats - children []Executor retFieldTypes []*types.FieldType - id int initCap int maxChunkSize int } -// NewBaseExecutor creates a new BaseExecutor instance. -func NewBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int, children ...Executor) BaseExecutor { - e := BaseExecutor{ - children: children, - ctx: ctx, - id: id, - schema: schema, - initCap: ctx.GetSessionVars().InitChunkSize, - maxChunkSize: ctx.GetSessionVars().MaxChunkSize, - AllocPool: ctx.GetSessionVars().GetChunkAllocator(), +// newExecutorChunkAllocator creates a new `executorChunkAllocator` +func newExecutorChunkAllocator(vars *variable.SessionVars, retFieldTypes []*types.FieldType) executorChunkAllocator { + return executorChunkAllocator{ + AllocPool: vars.GetChunkAllocator(), + initCap: vars.InitChunkSize, + maxChunkSize: vars.MaxChunkSize, + retFieldTypes: retFieldTypes, } - if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil { - if e.id > 0 { - e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(id) - } +} + +// InitCap returns the initial capacity for chunk +func (e *executorChunkAllocator) InitCap() int { + return e.initCap +} + +// SetInitCap sets the initial capacity for chunk +func (e *executorChunkAllocator) SetInitCap(c int) { + e.initCap = c +} + +// MaxChunkSize returns the max chunk size. +func (e *executorChunkAllocator) MaxChunkSize() int { + return e.maxChunkSize +} + +// SetMaxChunkSize sets the max chunk size. +func (e *executorChunkAllocator) SetMaxChunkSize(size int) { + e.maxChunkSize = size +} + +// NewChunk creates a new chunk according to the executor configuration +func (e *executorChunkAllocator) NewChunk() *chunk.Chunk { + return e.NewChunkWithCapacity(e.retFieldTypes, e.InitCap(), e.MaxChunkSize()) +} + +// NewChunkWithCapacity allows the caller to allocate the chunk with any types, capacity and max size in the pool +func (e *executorChunkAllocator) NewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk { + return e.AllocPool.Alloc(fields, capacity, maxCachesize) +} + +// executorMeta is a helper to store metadata for an execturo and implement the getter +type executorMeta struct { + schema *expression.Schema + children []Executor + retFieldTypes []*types.FieldType + id int +} + +// newExecutorMeta creates a new `executorMeta` +func newExecutorMeta(schema *expression.Schema, id int, children ...Executor) executorMeta { + e := executorMeta{ + id: id, + schema: schema, + children: children, } if schema != nil { cols := schema.Columns @@ -106,68 +143,141 @@ func NewBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int, return e } -// RuntimeStats returns the runtime stats of an executor. -func (e *BaseExecutor) RuntimeStats() *execdetails.BasicRuntimeStats { - return e.runtimeStats +// NewChunkWithCapacity allows the caller to allocate the chunk with any types, capacity and max size in the pool +func (e *executorMeta) RetFieldTypes() []*types.FieldType { + return e.retFieldTypes } // ID returns the id of an executor. -func (e *BaseExecutor) ID() int { +func (e *executorMeta) ID() int { return e.id } // AllChildren returns all children. -func (e *BaseExecutor) AllChildren() []Executor { +func (e *executorMeta) AllChildren() []Executor { return e.children } // ChildrenLen returns the length of children. -func (e *BaseExecutor) ChildrenLen() int { +func (e *executorMeta) ChildrenLen() int { return len(e.children) } // EmptyChildren judges whether the children is empty. -func (e *BaseExecutor) EmptyChildren() bool { +func (e *executorMeta) EmptyChildren() bool { return len(e.children) == 0 } // SetChildren sets the children for an executor. -func (e *BaseExecutor) SetChildren(idx int, ex Executor) { +func (e *executorMeta) SetChildren(idx int, ex Executor) { e.children[idx] = ex } // Children returns the children for an executor. -func (e *BaseExecutor) Children(idx int) Executor { +func (e *executorMeta) Children(idx int) Executor { return e.children[idx] } -// RetFieldTypes returns the return field types of an executor. -func (e *BaseExecutor) RetFieldTypes() []*types.FieldType { - return e.retFieldTypes +// Schema returns the current BaseExecutor's schema. If it is nil, then create and return a new one. +func (e *executorMeta) Schema() *expression.Schema { + if e.schema == nil { + return expression.NewSchema() + } + return e.schema } -// InitCap returns the initial capacity for chunk -func (e *BaseExecutor) InitCap() int { - return e.initCap +// GetSchema gets the schema. +func (e *executorMeta) GetSchema() *expression.Schema { + return e.schema } -// SetInitCap sets the initial capacity for chunk -func (e *BaseExecutor) SetInitCap(c int) { - e.initCap = c +// executorStats is a helper to implement the stats related methods for `Executor` +type executorStats struct { + runtimeStats *execdetails.BasicRuntimeStats + isSQLAndPlanRegistered *atomic.Bool + sqlDigest *parser.Digest + planDigest *parser.Digest + normalizedSQL string + normalizedPlan string + inRestrictedSQL bool } -// MaxChunkSize returns the max chunk size. -func (e *BaseExecutor) MaxChunkSize() int { - return e.maxChunkSize +// newExecutorStats creates a new `executorStats` +func newExecutorStats(stmtCtx *stmtctx.StatementContext, id int) executorStats { + normalizedSQL, sqlDigest := stmtCtx.SQLDigest() + normalizedPlan, planDigest := stmtCtx.GetPlanDigest() + e := executorStats{ + isSQLAndPlanRegistered: &stmtCtx.IsSQLAndPlanRegistered, + normalizedSQL: normalizedSQL, + sqlDigest: sqlDigest, + normalizedPlan: normalizedPlan, + planDigest: planDigest, + inRestrictedSQL: stmtCtx.InRestrictedSQL, + } + + if stmtCtx.RuntimeStatsColl != nil { + if id > 0 { + e.runtimeStats = stmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(id) + } + } + + return e } -// SetMaxChunkSize sets the max chunk size. -func (e *BaseExecutor) SetMaxChunkSize(size int) { - e.maxChunkSize = size +// RuntimeStats returns the runtime stats of an executor. +func (e *executorStats) RuntimeStats() *execdetails.BasicRuntimeStats { + return e.runtimeStats +} + +// RegisterSQLAndPlanInExecForTopSQL registers the current SQL and Plan on top sql +func (e *executorStats) RegisterSQLAndPlanInExecForTopSQL() { + if topsqlstate.TopSQLEnabled() && e.isSQLAndPlanRegistered.CompareAndSwap(false, true) { + topsql.RegisterSQL(e.normalizedSQL, e.sqlDigest, e.inRestrictedSQL) + if len(e.normalizedPlan) > 0 { + topsql.RegisterPlan(e.normalizedPlan, e.planDigest) + } + } +} + +type signalHandler interface { + HandleSignal() error +} + +// executorKillerHandler is a helper to implement the killer related methods for `Executor`. +type executorKillerHandler struct { + handler signalHandler +} + +func (e *executorKillerHandler) HandleSQLKillerSignal() error { + return e.handler.HandleSignal() +} + +func newExecutorKillerHandler(handler signalHandler) executorKillerHandler { + return executorKillerHandler{handler} +} + +// BaseExecutorV2 is a simplified version of `BaseExecutor`, which doesn't contain a full session context +type BaseExecutorV2 struct { + executorMeta + executorKillerHandler + executorStats + executorChunkAllocator +} + +// NewBaseExecutorV2 creates a new BaseExecutorV2 instance. +func NewBaseExecutorV2(vars *variable.SessionVars, schema *expression.Schema, id int, children ...Executor) BaseExecutorV2 { + executorMeta := newExecutorMeta(schema, id, children...) + e := BaseExecutorV2{ + executorMeta: executorMeta, + executorStats: newExecutorStats(vars.StmtCtx, id), + executorChunkAllocator: newExecutorChunkAllocator(vars, executorMeta.RetFieldTypes()), + executorKillerHandler: newExecutorKillerHandler(&vars.SQLKiller), + } + return e } // Open initializes children recursively and "childrenResults" according to children's schemas. -func (e *BaseExecutor) Open(ctx context.Context) error { +func (e *BaseExecutorV2) Open(ctx context.Context) error { for _, child := range e.children { err := Open(ctx, child) if err != nil { @@ -178,7 +288,7 @@ func (e *BaseExecutor) Open(ctx context.Context) error { } // Close closes all executors and release all resources. -func (e *BaseExecutor) Close() error { +func (e *BaseExecutorV2) Close() error { var firstErr error for _, src := range e.children { if err := Close(src); err != nil && firstErr == nil { @@ -188,29 +298,31 @@ func (e *BaseExecutor) Close() error { return firstErr } -// Schema returns the current BaseExecutor's schema. If it is nil, then create and return a new one. -func (e *BaseExecutor) Schema() *expression.Schema { - if e.schema == nil { - return expression.NewSchema() - } - return e.schema -} - // Next fills multiple rows into a chunk. -func (*BaseExecutor) Next(_ context.Context, _ *chunk.Chunk) error { +func (*BaseExecutorV2) Next(_ context.Context, _ *chunk.Chunk) error { return nil } +// BaseExecutor holds common information for executors. +type BaseExecutor struct { + ctx sessionctx.Context + + BaseExecutorV2 +} + +// NewBaseExecutor creates a new BaseExecutor instance. +func NewBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int, children ...Executor) BaseExecutor { + return BaseExecutor{ + ctx: ctx, + BaseExecutorV2: NewBaseExecutorV2(ctx.GetSessionVars(), schema, id, children...), + } +} + // Ctx return ```sessionctx.Context``` of Executor func (e *BaseExecutor) Ctx() sessionctx.Context { return e.ctx } -// GetSchema gets the schema. -func (e *BaseExecutor) GetSchema() *expression.Schema { - return e.schema -} - // UpdateDeltaForTableID updates the delta info for the table with tableID. func (e *BaseExecutor) UpdateDeltaForTableID(id int64) { txnCtx := e.ctx.GetSessionVars().TxnCtx @@ -244,30 +356,6 @@ func (e *BaseExecutor) ReleaseSysSession(ctx context.Context, sctx sessionctx.Co sysSessionPool.Put(sctx.(pools.Resource)) } -// NewChunk creates a new chunk according to the executor configuration -func (e *BaseExecutor) NewChunk() *chunk.Chunk { - return e.NewChunkWithCapacity(e.RetFieldTypes(), e.InitCap(), e.MaxChunkSize()) -} - -// NewChunkWithCapacity allows the caller to allocate the chunk with any types, capacity and max size in the pool -func (e *BaseExecutor) NewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk { - return e.AllocPool.Alloc(fields, capacity, maxCachesize) -} - -// HandleSQLKillerSignal handles the signal sent by SQLKiller -func (e *BaseExecutor) HandleSQLKillerSignal() error { - return e.ctx.GetSessionVars().SQLKiller.HandleSignal() -} - -// RegisterSQLAndPlanInExecForTopSQL registers the current SQL and Plan on top sql -// TODO: consider whether it's appropriate to have this on executor -func (e *BaseExecutor) RegisterSQLAndPlanInExecForTopSQL() { - sessVars := e.ctx.GetSessionVars() - if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CompareAndSwap(false, true) { - RegisterSQLAndPlanInExecForTopSQL(sessVars) - } -} - // TryNewCacheChunk tries to get a cached chunk func TryNewCacheChunk(e Executor) *chunk.Chunk { return e.NewChunk() @@ -331,15 +419,3 @@ func Close(e Executor) (err error) { }() return e.Close() } - -// RegisterSQLAndPlanInExecForTopSQL register the sql and plan information if it doesn't register before execution. -// This uses to catch the running SQL when Top SQL is enabled in execution. -func RegisterSQLAndPlanInExecForTopSQL(sessVars *variable.SessionVars) { - stmtCtx := sessVars.StmtCtx - normalizedSQL, sqlDigest := stmtCtx.SQLDigest() - topsql.RegisterSQL(normalizedSQL, sqlDigest, sessVars.InRestrictedSQL) - normalizedPlan, planDigest := stmtCtx.GetPlanDigest() - if len(normalizedPlan) > 0 { - topsql.RegisterPlan(normalizedPlan, planDigest) - } -} diff --git a/pkg/executor/internal/mpp/local_mpp_coordinator.go b/pkg/executor/internal/mpp/local_mpp_coordinator.go index 9a80f1c89bcd3..16e1415732866 100644 --- a/pkg/executor/internal/mpp/local_mpp_coordinator.go +++ b/pkg/executor/internal/mpp/local_mpp_coordinator.go @@ -754,7 +754,7 @@ func (c *localMppCoordinator) Execute(ctx context.Context) (kv.Response, []kv.Ke ctx = distsql.WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx) _, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash] - ctx = distsql.SetTiFlashConfVarsInContext(ctx, sctx) + ctx = distsql.SetTiFlashConfVarsInContext(ctx, sctx.GetSessionVars()) c.needTriggerFallback = allowTiFlashFallback c.enableCollectExecutionInfo = config.GetGlobalConfig().Instance.EnableCollectExecutionInfo.Load() diff --git a/pkg/executor/mpp_gather.go b/pkg/executor/mpp_gather.go index f4d838b86e0f4..2bbb6fdde4810 100644 --- a/pkg/executor/mpp_gather.go +++ b/pkg/executor/mpp_gather.go @@ -121,7 +121,7 @@ func (e *MPPGather) Next(ctx context.Context, chk *chunk.Chunk) error { return nil } - return table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.Schema().Columns, e.columns, e.Ctx(), chk) + return table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.Schema().Columns, e.columns, e.Ctx().GetExprCtx(), chk) } // Close and release the used resources. diff --git a/pkg/executor/point_get.go b/pkg/executor/point_get.go index 545fcea0de493..4d335d8519b63 100644 --- a/pkg/executor/point_get.go +++ b/pkg/executor/point_get.go @@ -352,7 +352,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { } err = table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, - e.Schema().Columns, e.columns, e.Ctx(), req) + e.Schema().Columns, e.columns, e.Ctx().GetExprCtx(), req) if err != nil { return err } diff --git a/pkg/executor/table_reader.go b/pkg/executor/table_reader.go index 8d7624f7c23e9..32fa2bc9c4840 100644 --- a/pkg/executor/table_reader.go +++ b/pkg/executor/table_reader.go @@ -22,21 +22,25 @@ import ( "time" "unsafe" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/distsql" + distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/executor/internal/builder" "github.com/pingcap/tidb/pkg/executor/internal/exec" internalutil "github.com/pingcap/tidb/pkg/executor/internal/util" "github.com/pingcap/tidb/pkg/expression" + exprctx "github.com/pingcap/tidb/pkg/expression/context" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" + planctx "github.com/pingcap/tidb/pkg/planner/context" plannercore "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/planner/util" "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessiontxn" + "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" @@ -54,16 +58,16 @@ var _ exec.Executor = &TableReaderExecutor{} // selectResultHook is used to hack distsql.SelectWithRuntimeStats safely for testing. type selectResultHook struct { - selectResultFunc func(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, + selectResultFunc func(ctx context.Context, dctx distsqlctx.DistSQLContext, kvReq *kv.Request, fieldTypes []*types.FieldType, copPlanIDs []int) (distsql.SelectResult, error) } -func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, +func (sr selectResultHook) SelectResult(ctx context.Context, dctx distsqlctx.DistSQLContext, kvReq *kv.Request, fieldTypes []*types.FieldType, copPlanIDs []int, rootPlanID int) (distsql.SelectResult, error) { if sr.selectResultFunc == nil { - return distsql.SelectWithRuntimeStats(ctx, sctx, kvReq, fieldTypes, copPlanIDs, rootPlanID) + return distsql.SelectWithRuntimeStats(ctx, dctx, kvReq, fieldTypes, copPlanIDs, rootPlanID) } - return sr.selectResultFunc(ctx, sctx, kvReq, fieldTypes, copPlanIDs) + return sr.selectResultFunc(ctx, dctx, kvReq, fieldTypes, copPlanIDs) } type kvRangeBuilder interface { @@ -71,9 +75,60 @@ type kvRangeBuilder interface { buildKeyRangeSeparately(ranges []*ranger.Range) ([]int64, [][]kv.KeyRange, error) } +// tableReaderExecutorContext is the execution context for the `TableReaderExecutor` +type tableReaderExecutorContext struct { + dctx distsqlctx.DistSQLContext + pctx planctx.PlanContext + ectx exprctx.BuildContext + + getDDLOwner func(context.Context) (*infosync.ServerInfo, error) +} + +func (treCtx *tableReaderExecutorContext) GetSessionVars() *variable.SessionVars { + return treCtx.dctx.GetSessionVars() +} + +func (treCtx *tableReaderExecutorContext) GetInfoSchema() infoschema.InfoSchema { + return treCtx.pctx.GetInfoSchema().(infoschema.InfoSchema) +} + +func (treCtx *tableReaderExecutorContext) GetDDLOwner(ctx context.Context) (*infosync.ServerInfo, error) { + if treCtx.getDDLOwner != nil { + return treCtx.getDDLOwner(ctx) + } + + return nil, errors.New("GetDDLOwner in a context without DDL") +} + +func newTableReaderExecutorContext(sctx sessionctx.Context) tableReaderExecutorContext { + // Explicitly get `ownerManager` out of the closure to show that the `tableReaderExecutorContext` itself doesn't + // depend on `sctx` directly. + // The context of some tests don't have `DDL`, so make it optional + var getDDLOwner func(ctx context.Context) (*infosync.ServerInfo, error) + ddl := domain.GetDomain(sctx).DDL() + if ddl != nil { + ownerManager := ddl.OwnerManager() + getDDLOwner = func(ctx context.Context) (*infosync.ServerInfo, error) { + ddlOwnerID, err := ownerManager.GetOwnerID(ctx) + if err != nil { + return nil, err + } + return infosync.GetServerInfoByID(ctx, ddlOwnerID) + } + } + + return tableReaderExecutorContext{ + dctx: sctx.GetDistSQLCtx(), + pctx: sctx.GetPlanCtx(), + ectx: sctx.GetExprCtx(), + getDDLOwner: getDDLOwner, + } +} + // TableReaderExecutor sends DAG request and reads table data from kv layer. type TableReaderExecutor struct { - exec.BaseExecutor + tableReaderExecutorContext + exec.BaseExecutorV2 table table.Table @@ -169,25 +224,25 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { } else { e.memTracker = memory.NewTracker(e.ID(), -1) } - e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker) + e.memTracker.AttachTo(e.GetSessionVars().StmtCtx.MemTracker) var err error if e.corColInFilter { // If there's correlated column in filter, need to rewrite dagPB if e.storeType == kv.TiFlash { - execs, err := builder.ConstructTreeBasedDistExec(e.Ctx(), e.tablePlan) + execs, err := builder.ConstructTreeBasedDistExec(e.pctx, e.tablePlan) if err != nil { return err } e.dagPB.RootExecutor = execs[0] } else { - e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx(), e.plans) + e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.pctx, e.plans) if err != nil { return err } } } - if e.RuntimeStats() != nil { + if e.GetSessionVars().StmtCtx.RuntimeStatsColl != nil { collExec := true e.dagPB.CollectExecutionSummaries = &collExec } @@ -270,7 +325,7 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error return err } - err := table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.Schema().Columns, e.columns, e.Ctx(), req) + err := table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.Schema().Columns, e.columns, e.ectx, req) if err != nil { return err } @@ -303,7 +358,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra } var results []distsql.SelectResult for _, kvReq := range kvReqs { - result, err := e.SelectResult(ctx, e.Ctx(), kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) + result, err := e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) if err != nil { return nil, err } @@ -316,7 +371,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra if err != nil { return nil, err } - result, err := e.SelectResult(ctx, e.Ctx(), kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) + result, err := e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) if err != nil { return nil, err } @@ -331,7 +386,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra } var results []distsql.SelectResult for _, kvReq := range kvReqs { - result, err := e.SelectResult(ctx, e.Ctx(), kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) + result, err := e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) if err != nil { return nil, err } @@ -352,7 +407,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra }) e.kvRanges = kvReq.KeyRanges.AppendSelfTo(e.kvRanges) - result, err := e.SelectResult(ctx, e.Ctx(), kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) + result, err := e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) if err != nil { return nil, err } @@ -379,14 +434,14 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [ SetKeepOrder(e.keepOrder). SetTxnScope(e.txnScope). SetReadReplicaScope(e.readReplicaScope). - SetFromSessionVars(e.Ctx().GetSessionVars()). - SetFromInfoSchema(e.Ctx().GetInfoSchema()). + SetFromSessionVars(e.GetSessionVars()). + SetFromInfoSchema(e.GetInfoSchema()). SetMemTracker(e.memTracker). SetStoreType(e.storeType). SetPaging(e.paging). SetAllowBatchCop(e.batchCop). - SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx(), &reqBuilder.Request, e.netDataSize)). - SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias). + SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.GetSessionVars(), &reqBuilder.Request, e.netDataSize)). + SetConnIDAndConnAlias(e.GetSessionVars().ConnectionID, e.GetSessionVars().SessionAlias). Build() if err != nil { return nil, err @@ -421,14 +476,14 @@ func (e *TableReaderExecutor) buildKVReqForPartitionTableScan(ctx context.Contex SetKeepOrder(e.keepOrder). SetTxnScope(e.txnScope). SetReadReplicaScope(e.readReplicaScope). - SetFromSessionVars(e.Ctx().GetSessionVars()). - SetFromInfoSchema(e.Ctx().GetInfoSchema()). + SetFromSessionVars(e.GetSessionVars()). + SetFromInfoSchema(e.GetInfoSchema()). SetMemTracker(e.memTracker). SetStoreType(e.storeType). SetPaging(e.paging). SetAllowBatchCop(e.batchCop). - SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx(), &reqBuilder.Request, e.netDataSize)). - SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias). + SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.GetSessionVars(), &reqBuilder.Request, e.netDataSize)). + SetConnIDAndConnAlias(e.GetSessionVars().ConnectionID, e.GetSessionVars().SessionAlias). Build() if err != nil { return nil, err @@ -446,17 +501,12 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R } reqBuilder = builder.SetPartitionKeyRanges(kvRange) } else { - reqBuilder = builder.SetHandleRanges(e.Ctx().GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges) + reqBuilder = builder.SetHandleRanges(e.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges) } if e.table != nil && e.table.Type().IsClusterTable() { copDestination := infoschema.GetClusterTableCopDestination(e.table.Meta().Name.L) if copDestination == infoschema.DDLOwner { - ownerManager := domain.GetDomain(e.Ctx()).DDL().OwnerManager() - ddlOwnerID, err := ownerManager.GetOwnerID(ctx) - if err != nil { - return nil, err - } - serverInfo, err := infosync.GetServerInfoByID(ctx, ddlOwnerID) + serverInfo, err := e.GetDDLOwner(ctx) if err != nil { return nil, err } @@ -471,14 +521,14 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R SetTxnScope(e.txnScope). SetReadReplicaScope(e.readReplicaScope). SetIsStaleness(e.isStaleness). - SetFromSessionVars(e.Ctx().GetSessionVars()). - SetFromInfoSchema(sessiontxn.GetTxnManager(e.Ctx()).GetTxnInfoSchema()). + SetFromSessionVars(e.GetSessionVars()). + SetFromInfoSchema(e.GetInfoSchema()). SetMemTracker(e.memTracker). SetStoreType(e.storeType). SetAllowBatchCop(e.batchCop). - SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx(), &reqBuilder.Request, e.netDataSize)). + SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.GetSessionVars(), &reqBuilder.Request, e.netDataSize)). SetPaging(e.paging). - SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias) + SetConnIDAndConnAlias(e.GetSessionVars().ConnectionID, e.GetSessionVars().SessionAlias) return reqBuilder.Build() } diff --git a/pkg/executor/table_readers_required_rows_test.go b/pkg/executor/table_readers_required_rows_test.go index 8581f8b91b16f..238b5b3d31d20 100644 --- a/pkg/executor/table_readers_required_rows_test.go +++ b/pkg/executor/table_readers_required_rows_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/pingcap/tidb/pkg/distsql" + distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/executor/internal/builder" "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/expression" @@ -111,7 +112,7 @@ func mockDistsqlSelectCtxGet(ctx context.Context) (totalRows int, expectedRowsRe return } -func mockSelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, +func mockSelectResult(ctx context.Context, dctx distsqlctx.DistSQLContext, kvReq *kv.Request, fieldTypes []*types.FieldType, copPlanIDs []int) (distsql.SelectResult, error) { totalRows, expectedRowsRet := mockDistsqlSelectCtxGet(ctx) return &requiredRowsSelectResult{ @@ -122,11 +123,19 @@ func mockSelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Re } func buildTableReader(sctx sessionctx.Context) exec.Executor { + retTypes := []*types.FieldType{types.NewFieldType(mysql.TypeDouble), types.NewFieldType(mysql.TypeLonglong)} + cols := make([]*expression.Column, len(retTypes)) + for i := range retTypes { + cols[i] = &expression.Column{Index: i, RetType: retTypes[i]} + } + schema := expression.NewSchema(cols...) + e := &TableReaderExecutor{ - BaseExecutor: buildMockBaseExec(sctx), - table: &tables.TableCommon{}, - dagPB: buildMockDAGRequest(sctx), - selectResultHook: selectResultHook{mockSelectResult}, + BaseExecutorV2: exec.NewBaseExecutorV2(sctx.GetSessionVars(), schema, 0), + tableReaderExecutorContext: newTableReaderExecutorContext(sctx), + table: &tables.TableCommon{}, + dagPB: buildMockDAGRequest(sctx), + selectResultHook: selectResultHook{mockSelectResult}, } return e } diff --git a/pkg/session/BUILD.bazel b/pkg/session/BUILD.bazel index 977f5663cc7bb..f10b6d933e639 100644 --- a/pkg/session/BUILD.bazel +++ b/pkg/session/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//pkg/ddl/placement", "//pkg/ddl/schematracker", "//pkg/ddl/syncer", + "//pkg/distsql/context", "//pkg/disttask/framework/proto", "//pkg/disttask/framework/scheduler", "//pkg/disttask/framework/storage", diff --git a/pkg/session/session.go b/pkg/session/session.go index 227bddef99c12..1dd35a6a98b30 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -43,6 +43,7 @@ import ( "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/placement" + distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor" @@ -2620,6 +2621,11 @@ func (s *session) GetTableCtx() tbctx.MutateContext { return s.tblctx } +// GetDistSQLCtx returns the context used in DistSQL +func (s *session) GetDistSQLCtx() distsqlctx.DistSQLContext { + return s +} + func (s *session) AuthPluginForUser(user *auth.UserIdentity) (string, error) { pm := privilege.GetPrivilegeManager(s) authplugin, err := pm.GetAuthPluginForConnection(user.Username, user.Hostname) diff --git a/pkg/sessionctx/BUILD.bazel b/pkg/sessionctx/BUILD.bazel index a9a6983a77c68..428390a5af69d 100644 --- a/pkg/sessionctx/BUILD.bazel +++ b/pkg/sessionctx/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/sessionctx", visibility = ["//visibility:public"], deps = [ + "//pkg/distsql/context", "//pkg/expression/context", "//pkg/extension", "//pkg/infoschema/context", diff --git a/pkg/sessionctx/context.go b/pkg/sessionctx/context.go index 62b2537e269e8..6018e6af17cee 100644 --- a/pkg/sessionctx/context.go +++ b/pkg/sessionctx/context.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" + distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" exprctx "github.com/pingcap/tidb/pkg/expression/context" "github.com/pingcap/tidb/pkg/extension" infoschema "github.com/pingcap/tidb/pkg/infoschema/context" @@ -106,6 +107,9 @@ type Context interface { // GetPlanCtx gets the plan context of the current session. GetPlanCtx() planctx.PlanContext + // GetDistSQLCtx gets the distsql ctx of the current session + GetDistSQLCtx() distsqlctx.DistSQLContext + GetSessionManager() util.SessionManager // RefreshTxnCtx commits old transaction without retry, diff --git a/pkg/table/BUILD.bazel b/pkg/table/BUILD.bazel index 23e8a9b333519..33b766c024e52 100644 --- a/pkg/table/BUILD.bazel +++ b/pkg/table/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//pkg/errctx", "//pkg/errno", "//pkg/expression", + "//pkg/expression/context", "//pkg/kv", "//pkg/meta/autoid", "//pkg/parser", diff --git a/pkg/table/column.go b/pkg/table/column.go index 4c1630d5ae0da..8bc214b9fd880 100644 --- a/pkg/table/column.go +++ b/pkg/table/column.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/expression" + exprctx "github.com/pingcap/tidb/pkg/expression/context" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/charset" @@ -743,7 +744,7 @@ func OptionalFsp(fieldType *types.FieldType) string { // FillVirtualColumnValue will calculate the virtual column value by evaluating generated // expression using rows from a chunk, and then fill this value into the chunk. func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnIndex []int, - expCols []*expression.Column, colInfos []*model.ColumnInfo, sctx sessionctx.Context, req *chunk.Chunk) error { + expCols []*expression.Column, colInfos []*model.ColumnInfo, ectx exprctx.BuildContext, req *chunk.Chunk) error { if len(virtualColumnIndex) == 0 { return nil } @@ -752,19 +753,19 @@ func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnInd iter := chunk.NewIterator4Chunk(req) for i, idx := range virtualColumnIndex { for row := iter.Begin(); row != iter.End(); row = iter.Next() { - datum, err := expCols[idx].EvalVirtualColumn(sctx.GetExprCtx(), row) + datum, err := expCols[idx].EvalVirtualColumn(ectx, row) if err != nil { return err } // Because the expression might return different type from // the generated column, we should wrap a CAST on the result. - castDatum, err := CastValue(sctx, datum, colInfos[idx], false, true) + castDatum, err := CastColumnValue(ectx.GetSessionVars(), datum, colInfos[idx], false, true) if err != nil { return err } // Clip to zero if get negative value after cast to unsigned. - if mysql.HasUnsignedFlag(colInfos[idx].FieldType.GetFlag()) && !castDatum.IsNull() && sctx.GetSessionVars().StmtCtx.TypeFlags().AllowNegativeToUnsigned() { + if mysql.HasUnsignedFlag(colInfos[idx].FieldType.GetFlag()) && !castDatum.IsNull() && ectx.GetSessionVars().StmtCtx.TypeFlags().AllowNegativeToUnsigned() { switch datum.Kind() { case types.KindInt64: if datum.GetInt64() < 0 { diff --git a/pkg/util/mock/BUILD.bazel b/pkg/util/mock/BUILD.bazel index 2ef3aa0c823d3..c096283eae1e2 100644 --- a/pkg/util/mock/BUILD.bazel +++ b/pkg/util/mock/BUILD.bazel @@ -12,6 +12,7 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/util/mock", visibility = ["//visibility:public"], deps = [ + "//pkg/distsql/context", "//pkg/expression/context", "//pkg/expression/contextimpl", "//pkg/extension", diff --git a/pkg/util/mock/context.go b/pkg/util/mock/context.go index 48ec7ead3bf4d..19ff217a80aad 100644 --- a/pkg/util/mock/context.go +++ b/pkg/util/mock/context.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" + distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" exprctx "github.com/pingcap/tidb/pkg/expression/context" exprctximpl "github.com/pingcap/tidb/pkg/expression/contextimpl" "github.com/pingcap/tidb/pkg/extension" @@ -215,6 +216,11 @@ func (c *Context) GetTableCtx() tbctx.MutateContext { return c.tblctx } +// GetDistSQLCtx returns the distsql context of the session +func (c *Context) GetDistSQLCtx() distsqlctx.DistSQLContext { + return c +} + // Txn implements sessionctx.Context Txn interface. func (c *Context) Txn(bool) (kv.Transaction, error) { return &c.txn, nil