Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner/core, session, sessionctx/variable: add session variable to control the concurrency of shuffle merge join #21332

Merged
merged 6 commits into from
Nov 27, 2020
Merged
3 changes: 1 addition & 2 deletions planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ func optimizeByShuffle4StreamAgg(pp *PhysicalStreamAgg, ctx sessionctx.Context)
}

func optimizeByShuffle4MergeJoin(pp *PhysicalMergeJoin, ctx sessionctx.Context) *PhysicalShuffle {
// TODO: should be configured by a session variable
concurrency := 1 // disable by default
concurrency := ctx.GetSessionVars().MergeJoinConcurrency()
if concurrency <= 1 {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2222,6 +2222,7 @@ var builtinGlobalVariable = []string{
variable.TiDBHashAggPartialConcurrency,
variable.TiDBHashAggFinalConcurrency,
variable.TiDBWindowConcurrency,
variable.TiDBMergeJoinConcurrency,
variable.TiDBStreamAggConcurrency,
variable.TiDBExecutorConcurrency,
variable.TiDBBackoffLockFast,
Expand Down
19 changes: 19 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,7 @@ func NewSessionVars() *SessionVars {
hashAggPartialConcurrency: DefTiDBHashAggPartialConcurrency,
hashAggFinalConcurrency: DefTiDBHashAggFinalConcurrency,
windowConcurrency: DefTiDBWindowConcurrency,
mergeJoinConcurrency: DefTiDBMergeJoinConcurrency,
streamAggConcurrency: DefTiDBStreamAggConcurrency,
ExecutorConcurrency: DefExecutorConcurrency,
}
Expand Down Expand Up @@ -1362,6 +1363,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.hashAggFinalConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
case TiDBWindowConcurrency:
s.windowConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
case TiDBMergeJoinConcurrency:
s.mergeJoinConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
case TiDBStreamAggConcurrency:
s.streamAggConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
case TiDBDistSQLScanConcurrency:
Expand Down Expand Up @@ -1727,6 +1730,9 @@ type Concurrency struct {
// windowConcurrency is deprecated, use ExecutorConcurrency instead.
windowConcurrency int

// mergeJoinConcurrency is the number of concurrent merge join worker
mergeJoinConcurrency int

// streamAggConcurrency is the number of concurrent stream aggregation worker.
// streamAggConcurrency is deprecated, use ExecutorConcurrency instead.
streamAggConcurrency int
Expand Down Expand Up @@ -1781,6 +1787,11 @@ func (c *Concurrency) SetWindowConcurrency(n int) {
c.windowConcurrency = n
}

// SetMergeJoinConcurrency set the number of concurrent merge join worker.
func (c *Concurrency) SetMergeJoinConcurrency(n int) {
c.mergeJoinConcurrency = n
}

// SetStreamAggConcurrency set the number of concurrent stream aggregation worker.
func (c *Concurrency) SetStreamAggConcurrency(n int) {
c.streamAggConcurrency = n
Expand Down Expand Up @@ -1852,6 +1863,14 @@ func (c *Concurrency) WindowConcurrency() int {
return c.ExecutorConcurrency
}

// MergeJoinConcurrency return the number of concurrent merge join worker.
func (c *Concurrency) MergeJoinConcurrency() int {
if c.mergeJoinConcurrency != ConcurrencyUnset {
return c.mergeJoinConcurrency
}
return c.ExecutorConcurrency
}

// StreamAggConcurrency return the number of concurrent stream aggregation worker.
func (c *Concurrency) StreamAggConcurrency() int {
if c.streamAggConcurrency != ConcurrencyUnset {
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,7 @@ var defaultSysVars = []*SysVar{
{Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggPartialConcurrency, Value: strconv.Itoa(DefTiDBHashAggPartialConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggFinalConcurrency, Value: strconv.Itoa(DefTiDBHashAggFinalConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBWindowConcurrency, Value: strconv.Itoa(DefTiDBWindowConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBMergeJoinConcurrency, Value: strconv.Itoa(DefTiDBMergeJoinConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBStreamAggConcurrency, Value: strconv.Itoa(DefTiDBStreamAggConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableParallelApply, Value: BoolToOnOff(DefTiDBEnableParallelApply), Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBackoffLockFast, Value: strconv.Itoa(kv.DefBackoffLockFast), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxUint64},
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ const (
// tidb_window_concurrency is deprecated, use tidb_executor_concurrency instead.
TiDBWindowConcurrency = "tidb_window_concurrency"

// tidb_merge_join_concurrency is used for merge join parallel executor
TiDBMergeJoinConcurrency = "tidb_merge_join_concurrency"

// tidb_stream_agg_concurrency is used for stream aggregation parallel executor.
// tidb_stream_agg_concurrency is deprecated, use tidb_executor_concurrency instead.
TiDBStreamAggConcurrency = "tidb_streamagg_concurrency"
Expand Down Expand Up @@ -548,6 +551,7 @@ const (
DefTiDBHashAggPartialConcurrency = ConcurrencyUnset
DefTiDBHashAggFinalConcurrency = ConcurrencyUnset
DefTiDBWindowConcurrency = ConcurrencyUnset
DefTiDBMergeJoinConcurrency = 1 // disable optimization by default
DefTiDBStreamAggConcurrency = 1
DefTiDBForcePriority = mysql.NoPriority
DefTiDBUseRadixJoin = false
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func CheckDeprecationSetSystemVar(s *SessionVars, name string) {
switch name {
case TiDBIndexLookupConcurrency, TiDBIndexLookupJoinConcurrency,
TiDBHashJoinConcurrency, TiDBHashAggPartialConcurrency, TiDBHashAggFinalConcurrency,
TiDBProjectionConcurrency, TiDBWindowConcurrency, TiDBStreamAggConcurrency:
TiDBProjectionConcurrency, TiDBWindowConcurrency, TiDBMergeJoinConcurrency, TiDBStreamAggConcurrency:
s.StmtCtx.AppendWarning(errWarnDeprecatedSyntax.FastGenByArgs(name, TiDBExecutorConcurrency))
case TIDBMemQuotaHashJoin, TIDBMemQuotaMergeJoin,
TIDBMemQuotaSort, TIDBMemQuotaTopn,
Expand Down
11 changes: 11 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) {
c.Assert(vars.hashAggPartialConcurrency, Equals, ConcurrencyUnset)
c.Assert(vars.hashAggFinalConcurrency, Equals, ConcurrencyUnset)
c.Assert(vars.windowConcurrency, Equals, ConcurrencyUnset)
c.Assert(vars.mergeJoinConcurrency, Equals, DefTiDBMergeJoinConcurrency)
c.Assert(vars.streamAggConcurrency, Equals, DefTiDBStreamAggConcurrency)
c.Assert(vars.distSQLScanConcurrency, Equals, DefDistSQLScanConcurrency)
c.Assert(vars.ProjectionConcurrency(), Equals, DefExecutorConcurrency)
c.Assert(vars.HashAggPartialConcurrency(), Equals, DefExecutorConcurrency)
c.Assert(vars.HashAggFinalConcurrency(), Equals, DefExecutorConcurrency)
c.Assert(vars.WindowConcurrency(), Equals, DefExecutorConcurrency)
c.Assert(vars.MergeJoinConcurrency(), Equals, DefTiDBMergeJoinConcurrency)
c.Assert(vars.StreamAggConcurrency(), Equals, DefTiDBStreamAggConcurrency)
c.Assert(vars.DistSQLScanConcurrency(), Equals, DefDistSQLScanConcurrency)
c.Assert(vars.ExecutorConcurrency, Equals, DefExecutorConcurrency)
Expand Down Expand Up @@ -667,6 +669,14 @@ func (s *testVarsutilSuite) TestConcurrencyVariables(c *C) {
c.Assert(vars.windowConcurrency, Equals, wdConcurrency)
c.Assert(vars.WindowConcurrency(), Equals, wdConcurrency)

mjConcurrency := 2
c.Assert(vars.mergeJoinConcurrency, Equals, DefTiDBMergeJoinConcurrency)
c.Assert(vars.MergeJoinConcurrency(), Equals, DefTiDBMergeJoinConcurrency)
err = SetSessionSystemVar(vars, TiDBMergeJoinConcurrency, types.NewIntDatum(int64(mjConcurrency)))
c.Assert(err, IsNil)
c.Assert(vars.mergeJoinConcurrency, Equals, mjConcurrency)
c.Assert(vars.MergeJoinConcurrency(), Equals, mjConcurrency)

saConcurrency := 2
c.Assert(vars.streamAggConcurrency, Equals, DefTiDBStreamAggConcurrency)
c.Assert(vars.StreamAggConcurrency(), Equals, DefTiDBStreamAggConcurrency)
Expand All @@ -683,6 +693,7 @@ func (s *testVarsutilSuite) TestConcurrencyVariables(c *C) {
c.Assert(vars.indexLookupConcurrency, Equals, ConcurrencyUnset)
c.Assert(vars.IndexLookupConcurrency(), Equals, exeConcurrency)
c.Assert(vars.WindowConcurrency(), Equals, wdConcurrency)
c.Assert(vars.MergeJoinConcurrency(), Equals, mjConcurrency)
c.Assert(vars.StreamAggConcurrency(), Equals, saConcurrency)

}