From 72049062dc14035f516ef8b19857d5bc31656372 Mon Sep 17 00:00:00 2001
From: Zhi Qi <30543181+LittleFall@users.noreply.github.com>
Date: Mon, 24 May 2021 03:48:04 +0800
Subject: [PATCH] planner: enforce push mpp down (#24849)

* planner: support setting tidb_allow_mpp to `2` or `ENFORCE` to enforce
  the use of MPP mode. (#24516)

* solve conflicts

* fix tests

* fix vet

* fix test
---
 executor/mpp_gather.go                 |   2 +-
 planner/core/exhaust_physical_plans.go |  10 +-
 planner/core/integration_test.go       | 126 +++++++++++++++++++++++++
 planner/core/task.go                   |  10 +-
 sessionctx/variable/session.go         |  21 ++++-
 sessionctx/variable/sysvar.go          |   4 +-
 sessionctx/variable/tidb_vars.go       |   4 +-
 store/copr/batch_coprocessor.go        |   2 +-
 8 files changed, 162 insertions(+), 17 deletions(-)

diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go
index 536a06eda8993..2c6955853bf90 100644
--- a/executor/mpp_gather.go
+++ b/executor/mpp_gather.go
@@ -30,7 +30,7 @@ import (
 )
 
 func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader) bool {
-    if !ctx.GetSessionVars().AllowMPPExecution {
+    if !ctx.GetSessionVars().IsMPPAllowed() {
         return false
     }
     _, ok := tr.GetTablePlan().(*plannercore.PhysicalExchangeSender)
diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go
index 56ffc85cae4f0..86041b54da121 100644
--- a/planner/core/exhaust_physical_plans.go
+++ b/planner/core/exhaust_physical_plans.go
@@ -1667,7 +1667,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P
     }
     joins := make([]PhysicalPlan, 0, 8)
     canPushToTiFlash := p.canPushToCop(kv.TiFlash)
-    if p.ctx.GetSessionVars().AllowMPPExecution && canPushToTiFlash {
+    if p.ctx.GetSessionVars().IsMPPAllowed() && canPushToTiFlash {
         if p.shouldUseMPPBCJ() {
             mppJoins := p.tryToGetMppHashJoin(prop, true)
             if (p.preferJoinType & preferBCJoin) > 0 {
@@ -1980,7 +1980,7 @@ func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPl
     if !lt.limitHints.preferLimitToCop {
         allTaskTypes = append(allTaskTypes, property.RootTaskType)
     }
-    if lt.ctx.GetSessionVars().AllowMPPExecution {
+    if lt.ctx.GetSessionVars().IsMPPAllowed() {
         allTaskTypes = append(allTaskTypes, property.MppTaskType)
     }
     ret := make([]PhysicalPlan, 0, len(allTaskTypes))
@@ -2370,7 +2370,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
         taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
     }
     canPushDownToTiFlash := la.canPushToCop(kv.TiFlash)
-    canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP() && canPushDownToTiFlash
+    canPushDownToMPP := la.ctx.GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP() && canPushDownToTiFlash
     if la.HasDistinct() {
         // TODO: remove after the cost estimation of distinct pushdown is implemented.
         if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
@@ -2480,7 +2480,7 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]
     if !p.limitHints.preferLimitToCop {
         allTaskTypes = append(allTaskTypes, property.RootTaskType)
     }
-    if p.canPushToCop(kv.TiFlash) && p.ctx.GetSessionVars().AllowMPPExecution {
+    if p.canPushToCop(kv.TiFlash) && p.ctx.GetSessionVars().IsMPPAllowed() {
         allTaskTypes = append(allTaskTypes, property.MppTaskType)
     }
     ret := make([]PhysicalPlan, 0, len(allTaskTypes))
@@ -2517,7 +2517,7 @@ func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty)
     if prop.TaskTp == property.MppTaskType && prop.PartitionTp != property.AnyType {
         return nil, true
     }
-    canUseMpp := p.ctx.GetSessionVars().AllowMPPExecution && p.canPushToCop(kv.TiFlash)
+    canUseMpp := p.ctx.GetSessionVars().IsMPPAllowed() && p.canPushToCop(kv.TiFlash)
     chReqProps := make([]*property.PhysicalProperty, 0, len(p.children))
     for range p.children {
         if canUseMpp && prop.TaskTp == property.MppTaskType {
diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go
index 3edc01b93407f..8c1403c903889 100644
--- a/planner/core/integration_test.go
+++ b/planner/core/integration_test.go
@@ -3124,3 +3124,129 @@ func (s *testIntegrationSuite) TestIssue23846(c *C) {
     tk.MustQuery("select count(*) from t where a=0x00A4EEF4FA55D6706ED5").Check(testkit.Rows("1"))
     tk.MustQuery("select * from t where a=0x00A4EEF4FA55D6706ED5").Check(testkit.Rows("\x00\xa4\xee\xf4\xfaU\xd6pn\xd5")) // not empty
 }
+
+func (s *testIntegrationSuite) TestEnforceMPP(c *C) {
+    tk := testkit.NewTestKit(c, s.store)
+
+    // test value limit of tidb_opt_tiflash_concurrency_factor
+    err := tk.ExecToErr("set @@tidb_opt_tiflash_concurrency_factor = 0")
+    c.Assert(err, NotNil)
+    c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_opt_tiflash_concurrency_factor' can't be set to the value of '0'`)
+
+    tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 1")
+    tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("1"))
+    tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 24")
+    tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("24"))
+
+    // test set tidb_allow_mpp
+    tk.MustExec("set @@session.tidb_allow_mpp = 0")
+    tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))
+    tk.MustExec("set @@session.tidb_allow_mpp = 1")
+    tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))
+    tk.MustExec("set @@session.tidb_allow_mpp = 2")
+    tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))
+
+    tk.MustExec("set @@session.tidb_allow_mpp = off")
+    tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))
+    tk.MustExec("set @@session.tidb_allow_mpp = oN")
+    tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))
+    tk.MustExec("set @@session.tidb_allow_mpp = enForcE")
+    tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))
+
+    tk.MustExec("set @@global.tidb_allow_mpp = faLsE")
+    tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("OFF"))
+    tk.MustExec("set @@global.tidb_allow_mpp = True")
+    tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("ON"))
+
+    err = tk.ExecToErr("set @@global.tidb_allow_mpp = enforceWithTypo")
+    c.Assert(err, NotNil)
+    c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_allow_mpp' can't be set to the value of 'enforceWithTypo'`)
+
+    // test query
tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + tk.MustExec("create index idx on t(a)") + + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "t" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + // ban mpp + tk.MustExec("set @@session.tidb_allow_mpp = 0") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF")) + + // read from tiflash, batch cop. + tk.MustQuery("explain select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows( + "StreamAgg_20 1.00 root funcs:count(Column#5)->Column#3", + "└─TableReader_21 1.00 root data:StreamAgg_9", + " └─StreamAgg_9 1.00 batchCop[tiflash] funcs:count(1)->Column#5", + " └─Selection_19 10.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_18 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo")) + + // open mpp + tk.MustExec("set @@session.tidb_allow_mpp = 1") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON")) + + // should use tikv to index read + tk.MustQuery("explain select count(*) from t where a=1;").Check(testkit.Rows( + "StreamAgg_30 1.00 root funcs:count(Column#6)->Column#3", + "└─IndexReader_31 1.00 root index:StreamAgg_10", + " └─StreamAgg_10 1.00 cop[tikv] funcs:count(1)->Column#6", + " └─IndexRangeScan_29 10.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo")) + + // read from tikv, indexRead + tk.MustQuery("explain select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows( + "StreamAgg_18 1.00 root funcs:count(Column#5)->Column#3", + "└─IndexReader_19 1.00 root index:StreamAgg_10", + " └─StreamAgg_10 1.00 cop[tikv] funcs:count(1)->Column#5", + " └─IndexRangeScan_17 10.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo")) + + // read from tiflash, mpp with large cost + tk.MustQuery("explain select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows( + "HashAgg_21 1.00 root funcs:count(Column#5)->Column#3", + "└─TableReader_23 1.00 root data:ExchangeSender_22", + " └─ExchangeSender_22 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg_9 1.00 batchCop[tiflash] funcs:count(1)->Column#5", + " └─Selection_20 10.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_19 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo")) + + // enforce mpp + tk.MustExec("set @@session.tidb_allow_mpp = 2") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE")) + + // should use mpp + tk.MustQuery("explain select count(*) from t where a=1;").Check(testkit.Rows( + "HashAgg_24 1.00 root funcs:count(Column#5)->Column#3", + "└─TableReader_26 1.00 root data:ExchangeSender_25", + " └─ExchangeSender_25 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg_9 1.00 batchCop[tiflash] funcs:count(1)->Column#5", + " └─Selection_23 10.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_22 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo")) + + // read from tikv, indexRead + tk.MustQuery("explain select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows( + "StreamAgg_18 1.00 root funcs:count(Column#5)->Column#3", + "└─IndexReader_19 1.00 root 
+        "  └─StreamAgg_10 1.00 cop[tikv] funcs:count(1)->Column#5",
+        "    └─IndexRangeScan_17 10.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"))
+
+    // read from tiflash
+    tk.MustQuery("explain select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows(
+        "HashAgg_21 1.00 root funcs:count(Column#5)->Column#3",
+        "└─TableReader_23 1.00 root data:ExchangeSender_22",
+        "  └─ExchangeSender_22 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+        "    └─HashAgg_9 1.00 batchCop[tiflash] funcs:count(1)->Column#5",
+        "      └─Selection_20 10.00 batchCop[tiflash] eq(test.t.a, 1)",
+        "        └─TableFullScan_19 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))
+}
diff --git a/planner/core/task.go b/planner/core/task.go
index 4f1b6efe6f50e..45a0f729e41c7 100644
--- a/planner/core/task.go
+++ b/planner/core/task.go
@@ -2007,10 +2007,16 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
         StoreType: kv.TiFlash,
     }.Init(ctx, t.p.SelectBlockOffset())
     p.stats = t.p.statsInfo()
-    return &rootTask{
+
+    cst := t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor
+    if p.ctx.GetSessionVars().IsMPPEnforced() {
+        cst = 0
+    }
+    rt := &rootTask{
         p:   p,
-        cst: t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor,
+        cst: cst,
     }
+    return rt
 }
 
 func (t *mppTask) needEnforce(prop *property.PhysicalProperty) bool {
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index c3a263770691a..f7f0ccd7b7566 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -496,11 +496,12 @@ type SessionVars struct {
     AllowWriteRowID bool
 
     // AllowBatchCop means if we should send batch coprocessor to TiFlash. Default value is 1, means to use batch cop in case of aggregation and join.
-    // If value is set to 2 , which means to force to send batch cop for any query. Value is set to 0 means never use batch cop.
+    // Value set to 2 means to force to send batch cop for any query. Value set to 0 means never use batch cop.
     AllowBatchCop int
 
-    // AllowMPPExecution will prefer using mpp way to execute a query.
-    AllowMPPExecution bool
+    // allowMPPExecution means whether we should use MPP to execute queries. Default value is "ON", which means the choice is left to the optimizer.
+    // Value set to "ENFORCE" means to use MPP whenever possible. Value set to "OFF" means never use MPP.
+    allowMPPExecution string
 
     // TiDBAllowAutoRandExplicitInsert indicates whether explicit insertion on auto_random column is allowed.
     AllowAutoRandExplicitInsert bool
@@ -842,6 +843,16 @@ func (s *SessionVars) AllocMPPTaskID(startTS uint64) int64 {
     return 1
 }
 
+// IsMPPAllowed returns whether mpp execution is allowed.
+func (s *SessionVars) IsMPPAllowed() bool {
+    return s.allowMPPExecution != "OFF"
+}
+
+// IsMPPEnforced returns whether mpp execution is enforced.
+func (s *SessionVars) IsMPPEnforced() bool {
+    return s.allowMPPExecution == "ENFORCE"
+}
+
 // CheckAndGetTxnScope will return the transaction scope we should use in the current session.
 func (s *SessionVars) CheckAndGetTxnScope() string {
     if s.InRestrictedSQL {
@@ -1068,7 +1079,7 @@ func NewSessionVars() *SessionVars {
     terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming))
 
     vars.AllowBatchCop = DefTiDBAllowBatchCop
-    vars.AllowMPPExecution = DefTiDBAllowMPPExecution
+    vars.allowMPPExecution = DefTiDBAllowMPPExecution
 
     var enableChunkRPC string
     if config.GetGlobalConfig().TiKVClient.EnableChunkRPC {
@@ -1490,7 +1501,7 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
     case TiDBAllowBatchCop:
         s.AllowBatchCop = int(tidbOptInt64(val, DefTiDBAllowBatchCop))
     case TiDBAllowMPPExecution:
-        s.AllowMPPExecution = TiDBOptOn(val)
+        s.allowMPPExecution = val
     case TiDBIndexLookupSize:
         s.IndexLookupSize = tidbOptPositiveInt32(val, DefIndexLookupSize)
     case TiDBHashJoinConcurrency:
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index 86b1c9900ef5d..541948b200dde 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -580,7 +580,7 @@ var defaultSysVars = []*SysVar{
         return oracle.LocalTxnScope
     }()},
     /* TiDB specific variables */
-    {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Type: TypeBool, Value: BoolToOnOff(DefTiDBAllowMPPExecution)},
+    {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: On, Type: TypeEnum, PossibleValues: []string{"OFF", "ON", "ENFORCE"}},
     {Scope: ScopeGlobal | ScopeSession, Name: TiDBBCJThresholdCount, Value: strconv.Itoa(DefBroadcastJoinThresholdCount), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64},
     {Scope: ScopeGlobal | ScopeSession, Name: TiDBBCJThresholdSize, Value: strconv.Itoa(DefBroadcastJoinThresholdSize), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64},
     {Scope: ScopeSession, Name: TiDBSnapshot, Value: ""},
@@ -606,7 +606,7 @@ var defaultSysVars = []*SysVar{
     {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCorrelationThreshold, Value: strconv.FormatFloat(DefOptCorrelationThreshold, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: 1},
     {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCorrelationExpFactor, Value: strconv.Itoa(DefOptCorrelationExpFactor), Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxUint64},
     {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCPUFactor, Value: strconv.FormatFloat(DefOptCPUFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
-    {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptTiFlashConcurrencyFactor, Value: strconv.FormatFloat(DefOptTiFlashConcurrencyFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
+    {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptTiFlashConcurrencyFactor, Value: strconv.FormatFloat(DefOptTiFlashConcurrencyFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 1, MaxValue: math.MaxUint64},
     {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCopCPUFactor, Value: strconv.FormatFloat(DefOptCopCPUFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
     {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptNetworkFactor, Value: strconv.FormatFloat(DefOptNetworkFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
     {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptScanFactor, Value: strconv.FormatFloat(DefOptScanFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index 43b4c4977cba0..022d7a40a8e51 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -294,6 +294,8 @@ const (
     // The default value is 0
     TiDBAllowBatchCop = "tidb_allow_batch_cop"
 
+    // TiDBAllowMPPExecution means whether we should use MPP to execute queries. Default value is 1 (or 'ON'), which means the choice is left to the optimizer.
+    // Value set to 2 (or 'ENFORCE') means to use MPP whenever possible. Value set to 0 (or 'OFF') means never use MPP.
     TiDBAllowMPPExecution = "tidb_allow_mpp"
 
     // TiDBInitChunkSize is used to control the init chunk size during query execution.
@@ -613,7 +615,7 @@ const (
     DefBroadcastJoinThresholdCount   = 10 * 1024
     DefTiDBOptimizerSelectivityLevel = 0
     DefTiDBAllowBatchCop             = 1
-    DefTiDBAllowMPPExecution         = true
+    DefTiDBAllowMPPExecution         = "ON"
     DefTiDBTxnMode                   = ""
     DefTiDBRowFormatV1               = 1
     DefTiDBRowFormatV2               = 2
diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go
index eae17862c53f7..b72b09c5f87a1 100644
--- a/store/copr/batch_coprocessor.go
+++ b/store/copr/batch_coprocessor.go
@@ -269,7 +269,7 @@ func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tik
                 storeAddr: rpcCtx.Addr,
                 cmdType:   cmdType,
                 ctx:       rpcCtx,
-                regionInfos: []tikv.RegionInfo{{task.region, rpcCtx.Meta, task.ranges, allStores}},
+                regionInfos: []tikv.RegionInfo{{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}},
             }
             storeTaskMap[rpcCtx.Addr] = batchTask
         }
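
The TestEnforceMPP cases above pin down how the new tidb_allow_mpp enum normalizes its input: the numeric forms 0/1/2 map to OFF/ON/ENFORCE, names match case-insensitively, the boolean aliases TRUE/FALSE map to OFF/ON, and anything else is rejected with error 1231. The following is a minimal, self-contained Go sketch of that observed behavior; normalizeAllowMPP, isMPPAllowed, and isMPPEnforced are illustrative stand-ins for TiDB's TypeEnum validation and the SessionVars methods added in this patch, not the real implementation.

package main

import (
	"fmt"
	"strings"
)

// Possible values of tidb_allow_mpp in enum order; a numeric input
// ("0", "1", "2") is treated as an index into this list.
var possibleValues = []string{"OFF", "ON", "ENFORCE"}

// normalizeAllowMPP reproduces the behavior the test observes:
// case-insensitive names, numeric indexes, boolean aliases, and a
// [variable:1231]-style error for everything else.
func normalizeAllowMPP(val string) (string, error) {
	upper := strings.ToUpper(val)
	switch upper {
	case "TRUE":
		upper = "ON"
	case "FALSE":
		upper = "OFF"
	}
	for i, pv := range possibleValues {
		if upper == pv || val == fmt.Sprint(i) {
			return pv, nil
		}
	}
	return "", fmt.Errorf("Variable 'tidb_allow_mpp' can't be set to the value of '%s'", val)
}

// isMPPAllowed and isMPPEnforced mirror the two SessionVars predicates
// added by the patch: everything but OFF allows MPP, only ENFORCE forces it.
func isMPPAllowed(v string) bool  { return v != "OFF" }
func isMPPEnforced(v string) bool { return v == "ENFORCE" }

func main() {
	for _, in := range []string{"0", "oN", "enForcE", "faLsE", "enforceWithTypo"} {
		v, err := normalizeAllowMPP(in)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("%q -> %s (allowed=%v, enforced=%v)\n", in, v, isMPPAllowed(v), isMPPEnforced(v))
	}
}

Running this prints the same normalized values the test asserts, including the rejection of "enforceWithTypo".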
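
The task.go hunk carries the actual enforcement mechanism: an MPP root task normally costs t.cst divided by tidb_opt_tiflash_concurrency_factor, but when IsMPPEnforced() returns true the cost is pinned to zero, so any valid MPP plan out-prices every TiKV alternative in the optimizer's final cost comparison. That is why the explain output in the test flips from IndexReader to ExchangeSender under ENFORCE. The toy comparison below illustrates the effect; the plan names and cost numbers are made up for illustration and are not TiDB's real estimates.

package main

import "fmt"

// candidate models a fully built root task: a physical plan plus its estimated cost.
type candidate struct {
	name string
	cost float64
}

// pickCheapest models the optimizer's final comparison between candidate root tasks.
func pickCheapest(cands []candidate) candidate {
	best := cands[0]
	for _, c := range cands[1:] {
		if c.cost < best.cost {
			best = c
		}
	}
	return best
}

func main() {
	const copTiFlashConcurrencyFactor = 24.0 // default tidb_opt_tiflash_concurrency_factor
	tiflashCost := 5000.0                    // made-up TiFlash-side cost of the MPP subtree

	for _, enforced := range []bool{false, true} {
		mppCost := tiflashCost / copTiFlashConcurrencyFactor
		if enforced {
			mppCost = 0 // the patch's trick: under ENFORCE the MPP root task is free
		}
		winner := pickCheapest([]candidate{
			{name: "IndexReader (TiKV)", cost: 120},
			{name: "TableReader+ExchangeSender (TiFlash MPP)", cost: mppCost},
		})
		fmt.Printf("enforced=%v -> choose %s\n", enforced, winner.name)
	}
}

This also motivates the sysvar.go change raising tidb_opt_tiflash_concurrency_factor's MinValue from 0 to 1: the factor is used as a divisor in the non-enforced cost formula, so a value of 0 would make the estimate degenerate (+Inf in Go's float arithmetic).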