diff --git a/executor/analyze_test.go b/executor/analyze_test.go
index 2139816195b5..246016a4082f 100644
--- a/executor/analyze_test.go
+++ b/executor/analyze_test.go
@@ -17,6 +17,7 @@ package executor_test
 import (
 	"fmt"
 	"io/ioutil"
+	"strconv"
 	"strings"
 	"sync/atomic"
 	"testing"
@@ -338,3 +339,78 @@ func TestAnalyzePartitionTableForFloat(t *testing.T) {
 	}
 	tk.MustExec("analyze table t1")
 }
+
+func TestAnalyzePartitionTableByConcurrencyInDynamic(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
+	tk.MustExec("use test")
+	tk.MustExec("create table t(id int) partition by hash(id) partitions 4")
+	testcases := []struct {
+		concurrency string
+	}{
+		{
+			concurrency: "1",
+		},
+		{
+			concurrency: "2",
+		},
+		{
+			concurrency: "3",
+		},
+		{
+			concurrency: "4",
+		},
+		{
+			concurrency: "5",
+		},
+	}
+	// assert empty table
+	for _, tc := range testcases {
+		concurrency := tc.concurrency
+		fmt.Println("testcase ", concurrency)
+		tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
+		tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
+		tk.MustExec("analyze table t")
+		tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'")
+	}
+
+	for i := 1; i <= 500; i++ {
+		for j := 1; j <= 20; j++ {
+			tk.MustExec(fmt.Sprintf("insert into t (id) values (%v)", j))
+		}
+	}
+	var expected [][]interface{}
+	for i := 1; i <= 20; i++ {
+		expected = append(expected, []interface{}{
+			strconv.FormatInt(int64(i), 10), "500",
+		})
+	}
+	testcases = []struct {
+		concurrency string
+	}{
+		{
+			concurrency: "1",
+		},
+		{
+			concurrency: "2",
+		},
+		{
+			concurrency: "3",
+		},
+		{
+			concurrency: "4",
+		},
+		{
+			concurrency: "5",
+		},
+	}
+	for _, tc := range testcases {
+		concurrency := tc.concurrency
+		fmt.Println("testcase ", concurrency)
+		tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
+		tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
+		tk.MustExec("analyze table t")
+		tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'").CheckAt([]int{5, 6}, expected)
+	}
+}
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index c0b71b4e10af..12841c40a67f 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -1271,6 +1271,9 @@ type SessionVars struct {
 	// LastPlanReplayerToken indicates the last plan replayer token
 	LastPlanReplayerToken string
 
+	// AnalyzePartitionMergeConcurrency indicates concurrency for merging partition stats
+	AnalyzePartitionMergeConcurrency int
+
 	HookContext
 }
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index c350d71db559..a95d497280f0 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -1917,6 +1917,13 @@ var defaultSysVars = []*SysVar{
 		s.RangeMaxSize = TidbOptInt64(val, DefTiDBOptRangeMaxSize)
 		return nil
 	}},
+	{
+		Scope: ScopeGlobal | ScopeSession, Name: TiDBMergePartitionStatsConcurrency, Value: strconv.FormatInt(DefTiDBMergePartitionStatsConcurrency, 10), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency,
+		SetSession: func(s *SessionVars, val string) error {
+			s.AnalyzePartitionMergeConcurrency = TidbOptInt(val, DefTiDBMergePartitionStatsConcurrency)
+			return nil
+		},
+	},
 }
 
 // FeedbackProbability points to the FeedbackProbability in statistics package.
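For intuition about what the test asserts: ids 1 through 20 are inserted 500 times each into a table hash-partitioned four ways, so each partition sees only a subset of the values, yet the merged global topN must report Count=500 for every id no matter what concurrency is used. A self-contained back-of-envelope sketch follows; the `id % 4` placement is an assumption made purely for illustration (the test's assertion holds regardless of how the hash function distributes the rows), and the column indices 5 and 6 passed to CheckAt are, to the best of my reading, the Value and Count columns of SHOW STATS_TOPN.

package main

import "fmt"

func main() {
	rows := make(map[int]map[int]int) // partition -> id -> row count
	for i := 1; i <= 500; i++ {
		for id := 1; id <= 20; id++ {
			p := id % 4 // assumed hash placement, illustration only
			if rows[p] == nil {
				rows[p] = make(map[int]int)
			}
			rows[p][id]++
		}
	}
	for p := 0; p < 4; p++ {
		// Each partition ends up with 5 distinct ids, 500 rows apiece; the
		// merged global topN must therefore carry 20 entries with Count=500.
		fmt.Printf("partition %d: %d distinct ids\n", p, len(rows[p]))
	}
}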
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index 9ebdd9ecc61b..38fecce6f95f 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -749,6 +749,9 @@ const (
 	// ranges would exceed the limit, it chooses less accurate ranges such as full range. 0 indicates that there is no memory
 	// limit for ranges.
 	TiDBOptRangeMaxSize = "tidb_opt_range_max_size"
+
+	// TiDBMergePartitionStatsConcurrency indicates the concurrency when merging partition stats into global stats
+	TiDBMergePartitionStatsConcurrency = "tidb_merge_partition_stats_concurrency"
 )
 
 // TiDB vars that have only global scope
@@ -1057,6 +1060,7 @@
 	DefTiDBOptRangeMaxSize = 0
 	DefTiDBCostModelVer = 1
 	DefTiDBServerMemoryLimitSessMinSize = 128 << 20
+	DefTiDBMergePartitionStatsConcurrency = 1
 	DefTiDBServerMemoryLimitGCTrigger = 0.7
 	DefTiDBEnableGOGCTuner = true
 )
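A side note on the SetSession hook registered in sysvar.go above: it parses the value with TidbOptInt, which falls back to the default on a parse error, and the registry has already validated the value against [MinValue, MaxValue], so the hook itself can never fail. A minimal sketch of that fallback behavior; tidbOptInt here is a hypothetical local stand-in mirroring what variable.TidbOptInt does, not the real function:

package main

import (
	"fmt"
	"strconv"
)

// tidbOptInt is a stand-in for the fallback the SetSession hook relies on:
// unparseable input yields the supplied default instead of an error.
func tidbOptInt(val string, def int) int {
	n, err := strconv.Atoi(val)
	if err != nil {
		return def
	}
	return n
}

func main() {
	fmt.Println(tidbOptInt("4", 1))    // 4
	fmt.Println(tidbOptInt("oops", 1)) // 1, i.e. DefTiDBMergePartitionStatsConcurrency
}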
diff --git a/statistics/cmsketch.go b/statistics/cmsketch.go
index 848a10a65332..5f412371d2a0 100644
--- a/statistics/cmsketch.go
+++ b/statistics/cmsketch.go
@@ -21,6 +21,7 @@ import (
 	"reflect"
 	"sort"
 	"strings"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
@@ -729,7 +730,7 @@ func NewTopN(n int) *TopN {
 // 1. `*TopN` is the final global-level topN.
 // 2. `[]TopNMeta` is the left topN value from the partition-level TopNs, but is not placed to global-level TopN. We should put them back to the histogram later.
 // 3. `[]*Histogram` are the partition-level histograms which just delete some values when we merge the global-level topN.
-func MergePartTopN2GlobalTopN(sc *stmtctx.StatementContext, version int, topNs []*TopN, n uint32, hists []*Histogram, isIndex bool) (*TopN, []TopNMeta, []*Histogram, error) {
+func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*TopN, n uint32, hists []*Histogram, isIndex bool) (*TopN, []TopNMeta, []*Histogram, error) {
 	if checkEmptyTopNs(topNs) {
 		return nil, nil, hists, nil
 	}
@@ -781,7 +782,7 @@
 		var err error
 		if types.IsTypeTime(hists[0].Tp.GetType()) {
 			// handle datetime values specially since they are encoded to int and we'll get int values if using DecodeOne.
-			_, d, err = codec.DecodeAsDateTime(val.Encoded, hists[0].Tp.GetType(), sc.TimeZone)
+			_, d, err = codec.DecodeAsDateTime(val.Encoded, hists[0].Tp.GetType(), loc)
 		} else if types.IsTypeFloat(hists[0].Tp.GetType()) {
 			_, d, err = codec.DecodeAsFloat32(val.Encoded, hists[0].Tp.GetType())
 		} else {
@@ -866,6 +867,22 @@ func checkEmptyTopNs(topNs []*TopN) bool {
 	return count == 0
 }
 
+// SortTopnMeta sorts TopNMetas by Count in descending order, breaking ties by the encoded value in ascending order.
+func SortTopnMeta(topnMetas []TopNMeta) []TopNMeta {
+	slices.SortFunc(topnMetas, func(i, j TopNMeta) bool {
+		if i.Count != j.Count {
+			return i.Count > j.Count
+		}
+		return bytes.Compare(i.Encoded, j.Encoded) < 0
+	})
+	return topnMetas
+}
+
+// GetMergedTopNFromSortedSlice exports getMergedTopNFromSortedSlice and returns the merged topn.
+func GetMergedTopNFromSortedSlice(sorted []TopNMeta, n uint32) (*TopN, []TopNMeta) {
+	return getMergedTopNFromSortedSlice(sorted, n)
+}
+
 func getMergedTopNFromSortedSlice(sorted []TopNMeta, n uint32) (*TopN, []TopNMeta) {
 	slices.SortFunc(sorted, func(i, j TopNMeta) bool {
 		if i.Count != j.Count {
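The newly exported SortTopnMeta makes the merge order part of the package contract: Count descending, ties broken by the encoded bytes ascending. This is also why the test can expect the 20 global topN rows (all Count=500) back in value order. A small illustrative sketch using only the types and function from this diff (runnable inside a TiDB checkout):

package main

import (
	"fmt"

	"github.com/pingcap/tidb/statistics"
)

func main() {
	metas := []statistics.TopNMeta{
		{Encoded: []byte("b"), Count: 10},
		{Encoded: []byte("a"), Count: 10},
		{Encoded: []byte("c"), Count: 42},
	}
	// Count descending first, then Encoded ascending for equal counts.
	statistics.SortTopnMeta(metas)
	for _, m := range metas {
		fmt.Printf("%s=%d\n", m.Encoded, m.Count) // c=42, a=10, b=10
	}
}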
diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go
index 00660c9756a6..6c08b5245fca 100644
--- a/statistics/handle/handle.go
+++ b/statistics/handle/handle.go
@@ -15,6 +15,7 @@
 package handle
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
@@ -28,7 +29,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/config"
-	"github.com/pingcap/tidb/ddl/util"
+	ddlUtil "github.com/pingcap/tidb/ddl/util"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/metrics"
@@ -41,6 +42,7 @@ import (
 	"github.com/pingcap/tidb/statistics"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/mathutil"
@@ -56,6 +58,9 @@ import (
 const (
 	// TiDBGlobalStats represents the global-stats for a partitioned table.
 	TiDBGlobalStats = "global"
+
+	// maxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats
+	maxPartitionMergeBatchSize = 256
 )
 
 // Handle can update stats info periodically.
@@ -83,7 +88,7 @@ type Handle struct {
 	// ddlEventCh is a channel to notify a ddl operation has happened.
 	// It is sent only by owner or the drop stats executor, and read by stats handle.
-	ddlEventCh chan *util.Event
+	ddlEventCh chan *ddlUtil.Event
 	// listHead contains all the stats collector required by session.
 	listHead *SessionStatsCollector
 	// globalMap contains all the delta map from collectors when we dump them to KV.
@@ -197,7 +202,7 @@ type sessionPool interface {
 func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool, tracker sessionctx.SysProcTracker, serverIDGetter func() uint64) (*Handle, error) {
 	cfg := config.GetGlobalConfig()
 	handle := &Handle{
-		ddlEventCh:       make(chan *util.Event, 100),
+		ddlEventCh:       make(chan *ddlUtil.Event, 100),
 		listHead:         &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)},
 		idxUsageListHead: &SessionIndexUsageCollector{mapper: make(indexUsageMap)},
 		pool:             pool,
@@ -547,7 +552,8 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
 			// Because after merging TopN, some numbers will be left.
 			// These remaining topN numbers will be used as a separate bucket for later histogram merging.
 			var popedTopN []statistics.TopNMeta
-			globalStats.TopN[i], popedTopN, allHg[i], err = statistics.MergePartTopN2GlobalTopN(sc.GetSessionVars().StmtCtx, sc.GetSessionVars().AnalyzeVersion, allTopN[i], uint32(opts[ast.AnalyzeOptNumTopN]), allHg[i], isIndex == 1)
+			wrapper := statistics.NewStatsWrapper(allHg[i], allTopN[i])
+			globalStats.TopN[i], popedTopN, allHg[i], err = h.mergeGlobalStatsTopN(sc, wrapper, sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex == 1)
 			if err != nil {
 				return
 			}
@@ -579,6 +585,104 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
 	return
 }
 
+func (h *Handle) mergeGlobalStatsTopN(sc sessionctx.Context, wrapper *statistics.StatsWrapper,
+	timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN,
+	[]statistics.TopNMeta, []*statistics.Histogram, error) {
+	mergeConcurrency := sc.GetSessionVars().AnalyzePartitionMergeConcurrency
+	// Use the original single-threaded merging when the concurrency is 1.
+	if mergeConcurrency < 2 {
+		return statistics.MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex)
+	}
+	batchSize := len(wrapper.AllTopN) / mergeConcurrency
+	if batchSize < 1 {
+		batchSize = 1
+	} else if batchSize > maxPartitionMergeBatchSize {
+		batchSize = maxPartitionMergeBatchSize
+	}
+	return h.mergeGlobalStatsTopNByConcurrency(mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex)
+}
+
+// mergeGlobalStatsTopNByConcurrency merges the partition-level topN into the global topN concurrently.
+// It splits the partition-level topNs into batches and lets a pool of workers process them.
+// mergeConcurrency controls the total number of running workers, and mergeBatchSize controls
+// the number of partitions each worker handles in one task.
+func (h *Handle) mergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wrapper *statistics.StatsWrapper,
+	timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN,
+	[]statistics.TopNMeta, []*statistics.Histogram, error) {
+	if len(wrapper.AllTopN) < mergeConcurrency {
+		mergeConcurrency = len(wrapper.AllTopN)
+	}
+	tasks := make([]*statistics.TopnStatsMergeTask, 0)
+	for start := 0; start < len(wrapper.AllTopN); {
+		end := start + mergeBatchSize
+		if end > len(wrapper.AllTopN) {
+			end = len(wrapper.AllTopN)
+		}
+		task := statistics.NewTopnStatsMergeTask(start, end)
+		tasks = append(tasks, task)
+		start = end
+	}
+	var wg util.WaitGroupWrapper
+	taskNum := len(tasks)
+	taskCh := make(chan *statistics.TopnStatsMergeTask, taskNum)
+	respCh := make(chan *statistics.TopnStatsMergeResponse, taskNum)
+	for i := 0; i < mergeConcurrency; i++ {
+		worker := statistics.NewTopnStatsMergeWorker(taskCh, respCh, wrapper)
+		wg.Run(func() {
+			worker.Run(timeZone, isIndex, n, version)
+		})
+	}
+	for _, task := range tasks {
+		taskCh <- task
+	}
+	close(taskCh)
+	wg.Wait()
+	close(respCh)
+	resps := make([]*statistics.TopnStatsMergeResponse, 0)
+
+	// collect the responses and check for worker errors
+	hasErr := false
+	for resp := range respCh {
+		if resp.Err != nil {
+			hasErr = true
+		}
+		resps = append(resps, resp)
+	}
+	if hasErr {
+		errMsg := make([]string, 0)
+		for _, resp := range resps {
+			if resp.Err != nil {
+				errMsg = append(errMsg, resp.Err.Error())
+			}
+		}
+		return nil, nil, nil, errors.New(strings.Join(errMsg, ","))
+	}
+
+	// fetch the response from each worker and merge them into the global topn stats
+	sorted := make([]statistics.TopNMeta, 0, mergeConcurrency)
+	leftTopn := make([]statistics.TopNMeta, 0)
+	for _, resp := range resps {
+		if resp.TopN != nil {
+			sorted = append(sorted, resp.TopN.TopN...)
+		}
+		leftTopn = append(leftTopn, resp.PopedTopn...)
+		for i, removeTopn := range resp.RemoveVals {
+			// Remove the values from the histograms.
+			if len(removeTopn) > 0 {
+				tmp := removeTopn
+				slices.SortFunc(tmp, func(i, j statistics.TopNMeta) bool {
+					cmpResult := bytes.Compare(i.Encoded, j.Encoded)
+					return cmpResult < 0
+				})
+				wrapper.AllHg[i].RemoveVals(tmp)
+			}
+		}
+	}
+
+	globalTopN, popedTopn := statistics.GetMergedTopNFromSortedSlice(sorted, n)
+	return globalTopN, statistics.SortTopnMeta(append(leftTopn, popedTopn...)), wrapper.AllHg, nil
+}
+
 func (h *Handle) getTableByPhysicalID(is infoschema.InfoSchema, physicalID int64) (table.Table, bool) {
 	if is.SchemaMetaVersion() != h.mu.schemaVersion {
 		h.mu.schemaVersion = is.SchemaMetaVersion()
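For intuition on the batching arithmetic above: batchSize is the partition count divided by the concurrency, clamped to [1, maxPartitionMergeBatchSize], so there can be more tasks than workers and the extra tasks simply queue on taskCh. A dependency-free sketch of the same splitting logic with a worked example (splitTasks is an illustrative helper, not a function from the diff):

package main

import "fmt"

// splitTasks mirrors the task construction in mergeGlobalStatsTopNByConcurrency:
// fixed-size [start, end) ranges over the partition-level topNs.
func splitTasks(numTopN, concurrency, maxBatch int) [][2]int {
	batch := numTopN / concurrency
	if batch < 1 {
		batch = 1
	} else if batch > maxBatch {
		batch = maxBatch
	}
	var tasks [][2]int
	for start := 0; start < numTopN; {
		end := start + batch
		if end > numTopN {
			end = numTopN
		}
		tasks = append(tasks, [2]int{start, end})
		start = end
	}
	return tasks
}

func main() {
	// 10 partitions at concurrency 4 => batch size 2 => 5 tasks for 4 workers.
	fmt.Println(splitTasks(10, 4, 256)) // [[0 2] [2 4] [4 6] [6 8] [8 10]]
}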
diff --git a/statistics/merge_worker.go b/statistics/merge_worker.go
new file mode 100644
index 000000000000..ac3460583555
--- /dev/null
+++ b/statistics/merge_worker.go
@@ -0,0 +1,188 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statistics
+
+import (
+	"time"
+
+	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/codec"
+	"github.com/pingcap/tidb/util/hack"
+)
+
+// StatsWrapper wraps the partition-level histograms and topNs to be merged.
+type StatsWrapper struct {
+	AllHg   []*Histogram
+	AllTopN []*TopN
+}
+
+// NewStatsWrapper returns a new StatsWrapper.
+func NewStatsWrapper(hg []*Histogram, topN []*TopN) *StatsWrapper {
+	return &StatsWrapper{
+		AllHg:   hg,
+		AllTopN: topN,
+	}
+}
+
+type topnStatsMergeWorker struct {
+	taskCh <-chan *TopnStatsMergeTask
+	respCh chan<- *TopnStatsMergeResponse
+	// statsWrapper is shared by all workers and must only be read while the workers run
+	statsWrapper *StatsWrapper
+}
+
+// NewTopnStatsMergeWorker returns a new topn merge worker.
+func NewTopnStatsMergeWorker(
+	taskCh <-chan *TopnStatsMergeTask,
+	respCh chan<- *TopnStatsMergeResponse,
+	wrapper *StatsWrapper) *topnStatsMergeWorker {
+	worker := &topnStatsMergeWorker{
+		taskCh: taskCh,
+		respCh: respCh,
+	}
+	worker.statsWrapper = wrapper
+	return worker
+}
+
+// TopnStatsMergeTask indicates a task for merging topn stats.
+type TopnStatsMergeTask struct {
+	start int
+	end   int
+}
+
+// NewTopnStatsMergeTask returns a new merge task covering AllTopN[start:end).
+func NewTopnStatsMergeTask(start, end int) *TopnStatsMergeTask {
+	return &TopnStatsMergeTask{
+		start: start,
+		end:   end,
+	}
+}
+
+// TopnStatsMergeResponse is the response of a topn merge task.
+type TopnStatsMergeResponse struct {
+	TopN       *TopN
+	PopedTopn  []TopNMeta
+	RemoveVals [][]TopNMeta
+	Err        error
+}
+
+// Run runs the topn merge like statistics.MergePartTopN2GlobalTopN does.
+func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool,
+	n uint32,
+	version int) {
+	for task := range worker.taskCh {
+		start := task.start
+		end := task.end
+		checkTopNs := worker.statsWrapper.AllTopN[start:end]
+		allTopNs := worker.statsWrapper.AllTopN
+		allHists := worker.statsWrapper.AllHg
+		resp := &TopnStatsMergeResponse{}
+		if checkEmptyTopNs(checkTopNs) {
+			worker.respCh <- resp
+			// continue rather than return: returning would stop this worker
+			// and leave its remaining tasks unprocessed.
+			continue
+		}
+		partNum := len(allTopNs)
+		checkNum := len(checkTopNs)
+		topNsNum := make([]int, checkNum)
+		removeVals := make([][]TopNMeta, partNum)
+		for i, topN := range checkTopNs {
+			if topN == nil {
+				topNsNum[i] = 0
+				continue
+			}
+			topNsNum[i] = len(topN.TopN)
+		}
+		// Different TopN structures may hold the same value, we have to merge them.
+		counter := make(map[hack.MutableString]float64)
+		// datumMap is used to store the mapping from the string type to datum type.
+		// The datum is used to find the value in the histogram.
+		datumMap := make(map[hack.MutableString]types.Datum)
+
+		for i, topN := range checkTopNs {
+			if topN.TotalCount() == 0 {
+				continue
+			}
+			for _, val := range topN.TopN {
+				encodedVal := hack.String(val.Encoded)
+				_, exists := counter[encodedVal]
+				counter[encodedVal] += float64(val.Count)
+				if exists {
+					// We have already calculated the encodedVal from the histogram, so just continue to next topN value.
+					continue
+				}
+				// We need to check whether the value corresponding to encodedVal is contained in other partition-level stats.
+				// 1. Check the topN first.
+				// 2. If the topN doesn't contain the value corresponding to encodedVal, we should check the histogram.
+				for j := 0; j < partNum; j++ {
+					// Skip the partition this value came from. i indexes the local
+					// batch, so the partition's global index is start+i.
+					if (j == start+i && version >= 2) || allTopNs[j].findTopN(val.Encoded) != -1 {
+						continue
+					}
+					// Get the encodedVal from the hists[j]
+					datum, exists := datumMap[encodedVal]
+					if !exists {
+						// If the datumMap does not have the encodedVal datum,
+						// we should generate the datum based on the encoded value.
+						// This part is copied from the function MergePartitionHist2GlobalHist.
+						var d types.Datum
+						if isIndex {
+							d.SetBytes(val.Encoded)
+						} else {
+							var err error
+							if types.IsTypeTime(allHists[0].Tp.GetType()) {
+								// handle datetime values specially since they are encoded to int and we'll get int values if using DecodeOne.
+								_, d, err = codec.DecodeAsDateTime(val.Encoded, allHists[0].Tp.GetType(), timeZone)
+							} else if types.IsTypeFloat(allHists[0].Tp.GetType()) {
+								_, d, err = codec.DecodeAsFloat32(val.Encoded, allHists[0].Tp.GetType())
+							} else {
+								_, d, err = codec.DecodeOne(val.Encoded)
+							}
+							if err != nil {
+								resp.Err = err
+								worker.respCh <- resp
+								return
+							}
+						}
+						datumMap[encodedVal] = d
+						datum = d
+					}
+					// Get the row count which the value is equal to the encodedVal from histogram.
+					count, _ := allHists[j].equalRowCount(datum, isIndex)
+					if count != 0 {
+						counter[encodedVal] += count
+						// Remove the value corresponding to encodedVal from the histogram.
+						removeVals[j] = append(removeVals[j], TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)})
+					}
+				}
+			}
+		}
+		// record the values to be removed from the histograms
+		resp.RemoveVals = removeVals
+
+		numTop := len(counter)
+		if numTop == 0 {
+			worker.respCh <- resp
+			continue
+		}
+		sorted := make([]TopNMeta, 0, numTop)
+		for value, cnt := range counter {
+			data := hack.Slice(string(value))
+			sorted = append(sorted, TopNMeta{Encoded: data, Count: uint64(cnt)})
+		}
+		globalTopN, leftTopN := getMergedTopNFromSortedSlice(sorted, n)
+		resp.TopN = globalTopN
+		resp.PopedTopn = leftTopN
+		worker.respCh <- resp
+	}
+}
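Finally, the channel choreography the worker participates in, reduced to a self-contained sketch (the generic task/resp types stand in for TopnStatsMergeTask/TopnStatsMergeResponse and are illustrative only): both channels are buffered to the task count so no send ever blocks, taskCh is closed once every task is queued, and respCh is closed only after wg.Wait so the drain loop is guaranteed to terminate.

package main

import (
	"fmt"
	"sync"
)

type task struct{ start, end int }
type resp struct{ size int }

func main() {
	tasks := []task{{0, 2}, {2, 4}, {4, 6}, {6, 8}, {8, 10}}
	taskCh := make(chan task, len(tasks)) // buffered: queuing tasks never blocks
	respCh := make(chan resp, len(tasks)) // buffered: workers never block on send
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // mergeConcurrency workers
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range taskCh { // exits once taskCh is closed and drained
				respCh <- resp{size: t.end - t.start}
			}
		}()
	}
	for _, t := range tasks {
		taskCh <- t
	}
	close(taskCh)
	wg.Wait()
	close(respCh) // safe: all workers have finished sending
	for r := range respCh {
		fmt.Println(r.size) // batch sizes, in arbitrary order
	}
}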