statistics: support merge global topn in concurrency #38358

Merged (15 commits) on Oct 18, 2022
Changes from 8 commits
76 changes: 76 additions & 0 deletions executor/analyze_test.go
@@ -17,6 +17,7 @@ package executor_test
import (
"fmt"
"io/ioutil"
"strconv"
"strings"
"sync/atomic"
"testing"
@@ -338,3 +339,78 @@ func TestAnalyzePartitionTableForFloat(t *testing.T) {
}
tk.MustExec("analyze table t1")
}

func TestAnalyzePartitionTableByConcurrencyInDynamic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
tk.MustExec("use test")
tk.MustExec("create table t(id int) partition by hash(id) partitions 4")
testcases := []struct {
concurrency string
}{
{
concurrency: "1",
},
{
concurrency: "2",
},
{
concurrency: "3",
},
{
concurrency: "4",
},
{
concurrency: "5",
},
}
// assert empty table
for _, tc := range testcases {
concurrency := tc.concurrency
fmt.Println("testcase ", concurrency)
tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
tk.MustExec("analyze table t")
tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'")
}

for i := 1; i <= 500; i++ {
for j := 1; j <= 20; j++ {
tk.MustExec(fmt.Sprintf("insert into t (id) values (%v)", j))
}
}
var expected [][]interface{}
for i := 1; i <= 20; i++ {
expected = append(expected, []interface{}{
strconv.FormatInt(int64(i), 10), "500",
})
}
testcases = []struct {
concurrency string
}{
{
concurrency: "1",
},
{
concurrency: "2",
},
{
concurrency: "3",
},
{
concurrency: "4",
},
{
concurrency: "5",
},
}
for _, tc := range testcases {
concurrency := tc.concurrency
fmt.Println("testcase ", concurrency)
tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
tk.MustExec("analyze table t")
tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'").CheckAt([]int{5, 6}, expected)
}
}
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
@@ -1271,6 +1271,9 @@ type SessionVars struct {
// LastPlanReplayerToken indicates the last plan replayer token
LastPlanReplayerToken string

// AnalyzePartitionMergeConcurrency indicates concurrency for merging partition stats
AnalyzePartitionMergeConcurrency int

HookContext
}

7 changes: 7 additions & 0 deletions sessionctx/variable/sysvar.go
@@ -1916,6 +1916,13 @@ var defaultSysVars = []*SysVar{
s.RangeMaxSize = TidbOptInt64(val, DefTiDBOptRangeMaxSize)
return nil
}},
{
Scope: ScopeGlobal | ScopeSession, Name: TiDBMergePartitionStatsConcurrency, Value: strconv.FormatInt(DefTiDBMergePartitionStatsConcurrency, 10), Type: TypeInt, MinValue: 1, MaxValue: 32,
SetSession: func(s *SessionVars, val string) error {
s.AnalyzePartitionMergeConcurrency = TidbOptInt(val, DefTiDBMergePartitionStatsConcurrency)
return nil
},
},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
@@ -749,6 +749,9 @@ const (
// ranges would exceed the limit, it chooses less accurate ranges such as full range. 0 indicates that there is no memory
// limit for ranges.
TiDBOptRangeMaxSize = "tidb_opt_range_max_size"

// TiDBMergePartitionStatsConcurrency indicates the concurrency when merging partition stats into global stats
TiDBMergePartitionStatsConcurrency = "tidb_merge_partition_stats_concurrency"
)

// TiDB vars that have only global scope
@@ -1057,6 +1060,7 @@ const (
DefTiDBOptRangeMaxSize = 0
DefTiDBCostModelVer = 1
DefTiDBServerMemoryLimitSessMinSize = 128 << 20
DefTiDBMergePartitionStatsConcurrency = 1
DefTiDBServerMemoryLimitGCTrigger = 0.7
DefTiDBEnableGOGCTuner = true
)
21 changes: 19 additions & 2 deletions statistics/cmsketch.go
@@ -21,6 +21,7 @@ import (
"reflect"
"sort"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -729,7 +730,7 @@ func NewTopN(n int) *TopN {
// 1. `*TopN` is the final global-level topN.
// 2. `[]TopNMeta` is the leftover topN from the partition-level TopNs that is not placed into the global-level TopN. We should put them back into the histogram later.
// 3. `[]*Histogram` are the partition-level histograms which just delete some values when we merge the global-level topN.
func MergePartTopN2GlobalTopN(sc *stmtctx.StatementContext, version int, topNs []*TopN, n uint32, hists []*Histogram, isIndex bool) (*TopN, []TopNMeta, []*Histogram, error) {
func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*TopN, n uint32, hists []*Histogram, isIndex bool) (*TopN, []TopNMeta, []*Histogram, error) {
if checkEmptyTopNs(topNs) {
return nil, nil, hists, nil
}
@@ -781,7 +782,7 @@ func MergePartTopN2GlobalTopN(sc *stmtctx.StatementContext, version int, topNs [
var err error
if types.IsTypeTime(hists[0].Tp.GetType()) {
// handle datetime values specially since they are encoded to int and we'll get int values if using DecodeOne.
_, d, err = codec.DecodeAsDateTime(val.Encoded, hists[0].Tp.GetType(), sc.TimeZone)
_, d, err = codec.DecodeAsDateTime(val.Encoded, hists[0].Tp.GetType(), loc)
} else if types.IsTypeFloat(hists[0].Tp.GetType()) {
_, d, err = codec.DecodeAsFloat32(val.Encoded, hists[0].Tp.GetType())
} else {
@@ -866,6 +867,22 @@ func checkEmptyTopNs(topNs []*TopN) bool {
return count == 0
}

// SortTopnMeta sorts TopNMeta in place by count in descending order, breaking ties by the encoded value, and returns the slice.
func SortTopnMeta(topnMetas []TopNMeta) []TopNMeta {
slices.SortFunc(topnMetas, func(i, j TopNMeta) bool {
if i.Count != j.Count {
return i.Count > j.Count
}
return bytes.Compare(i.Encoded, j.Encoded) < 0
})
return topnMetas
}

// GetMergedTopNFromSortedSlice merges the collected TopNMeta into a global TopN of at most n entries and returns the leftover metas.
func GetMergedTopNFromSortedSlice(sorted []TopNMeta, n uint32) (*TopN, []TopNMeta) {
return getMergedTopNFromSortedSlice(sorted, n)
}

func getMergedTopNFromSortedSlice(sorted []TopNMeta, n uint32) (*TopN, []TopNMeta) {
slices.SortFunc(sorted, func(i, j TopNMeta) bool {
if i.Count != j.Count {
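The two exported helpers above make the merge order explicit: TopN entries are ranked by count in descending order, with ties broken by comparing the encoded values. A minimal standalone sketch of that ordering, using a simplified local struct instead of the real statistics.TopNMeta (the real code uses slices.SortFunc, but the comparison logic is the same):

package main

import (
	"bytes"
	"fmt"
	"sort"
)

// meta mirrors only the two TopNMeta fields the sort cares about.
type meta struct {
	Encoded []byte
	Count   uint64
}

// sortTopNMeta orders entries by count descending, then by encoded bytes ascending.
func sortTopNMeta(ms []meta) {
	sort.Slice(ms, func(i, j int) bool {
		if ms[i].Count != ms[j].Count {
			return ms[i].Count > ms[j].Count // higher counts first
		}
		return bytes.Compare(ms[i].Encoded, ms[j].Encoded) < 0 // deterministic tie-break
	})
}

func main() {
	ms := []meta{
		{Encoded: []byte("b"), Count: 10},
		{Encoded: []byte("a"), Count: 10},
		{Encoded: []byte("c"), Count: 30},
	}
	sortTopNMeta(ms)
	for _, m := range ms {
		fmt.Printf("%s:%d ", m.Encoded, m.Count) // prints: c:30 a:10 b:10
	}
	fmt.Println()
}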
104 changes: 103 additions & 1 deletion statistics/handle/handle.go
@@ -15,6 +15,7 @@
package handle

import (
"bytes"
"context"
"encoding/json"
"fmt"
@@ -56,6 +57,9 @@ import (
const (
// TiDBGlobalStats represents the global-stats for a partitioned table.
TiDBGlobalStats = "global"

// maxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats
maxPartitionMergeBatchSize = 256
)

// Handle can update stats info periodically.
@@ -547,7 +551,8 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
// Because after merging TopN, some numbers will be left.
// These remaining topN numbers will be used as a separate bucket for later histogram merging.
var popedTopN []statistics.TopNMeta
globalStats.TopN[i], popedTopN, allHg[i], err = statistics.MergePartTopN2GlobalTopN(sc.GetSessionVars().StmtCtx, sc.GetSessionVars().AnalyzeVersion, allTopN[i], uint32(opts[ast.AnalyzeOptNumTopN]), allHg[i], isIndex == 1)
wrapper := statistics.NewStatsWrapper(allHg[i], allTopN[i])
globalStats.TopN[i], popedTopN, allHg[i], err = h.mergeGlobalStatsTopN(sc, wrapper, sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex == 1)
if err != nil {
return
}
@@ -579,6 +584,103 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
return
}
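statistics.StatsWrapper and NewStatsWrapper are introduced by this PR, but their definitions are not in the hunks shown here. Judging from the call above and the wrapper.AllHg / wrapper.AllTopN accesses below, the wrapper presumably just bundles the per-partition histograms and TopNs for one column or index so the merge workers can treat them as a single read-only input. A hypothetical minimal shape, with placeholder element types, for orientation only:

package statssketch

// Placeholder types; the real Histogram and TopN live in the statistics package
// and carry far more state than shown here.
type Histogram struct{ /* buckets, NDV, ... */ }
type TopN struct{ /* TopNMeta entries ... */ }

// StatsWrapper as assumed from its usage in this diff.
type StatsWrapper struct {
	AllHg   []*Histogram // one histogram per partition
	AllTopN []*TopN      // one TopN per partition
}

// NewStatsWrapper mirrors the constructor call used above.
func NewStatsWrapper(hg []*Histogram, topN []*TopN) *StatsWrapper {
	return &StatsWrapper{AllHg: hg, AllTopN: topN}
}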

Review thread on mergeGlobalStatsTopN:
Contributor: Should we handle the version at first? For example, if version == 1, just return.
Author: Will the statistics version affect this?
Contributor: Only version 2 has TopN. Maybe for version 1 we don't need to merge TopN?

func (h *Handle) mergeGlobalStatsTopN(sc sessionctx.Context, wrapper *statistics.StatsWrapper,
timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN,
[]statistics.TopNMeta, []*statistics.Histogram, error) {
mergeConcurrency := sc.GetSessionVars().AnalyzePartitionMergeConcurrency
// use the original single-threaded merge if the concurrency is 1, or for stats version 1
if mergeConcurrency < 2 {
return statistics.MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex)
}
batchSize := len(wrapper.AllTopN) / mergeConcurrency
if batchSize < 1 {
batchSize = 1
} else if batchSize > maxPartitionMergeBatchSize {
batchSize = maxPartitionMergeBatchSize
}
return h.mergeGlobalStatsTopNByConcurrency(mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex)
}
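To make the batching concrete: the batch size is len(wrapper.AllTopN) divided by the configured concurrency, clamped to at least 1 and at most maxPartitionMergeBatchSize (256). A small standalone illustration of that rule with made-up partition counts:

package main

import "fmt"

// maxPartitionMergeBatchSize mirrors the constant added in handle.go.
const maxPartitionMergeBatchSize = 256

// batchSize reproduces the clamping rule from mergeGlobalStatsTopN above.
func batchSize(partitions, concurrency int) int {
	size := partitions / concurrency
	if size < 1 {
		return 1
	}
	if size > maxPartitionMergeBatchSize {
		return maxPartitionMergeBatchSize
	}
	return size
}

func main() {
	fmt.Println(batchSize(4, 5))    // 1: fewer partitions than workers, one partition per task
	fmt.Println(batchSize(128, 4))  // 32: an even split
	fmt.Println(batchSize(4096, 2)) // 256: capped by maxPartitionMergeBatchSize
}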

// mergeGlobalStatsTopNByConcurrency merges partition TopNs into a global TopN concurrently.
// It splits the partition TopNs into batches and hands each batch to a separate worker.
// mergeConcurrency controls the total number of running workers, and mergeBatchSize controls
// how many partitions each worker handles in one task.
func (h *Handle) mergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wrapper *statistics.StatsWrapper,
timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN,
[]statistics.TopNMeta, []*statistics.Histogram, error) {
if len(wrapper.AllTopN) < mergeConcurrency {
mergeConcurrency = len(wrapper.AllTopN)
}
tasks := make([]*statistics.TopnStatsMergeTask, 0)
Review thread on the line above:
Contributor: Maybe the capacity can be Len(wrapper.AllTopN) + 1?
Author: I think it's not necessary.

for start := 0; start < len(wrapper.AllTopN); {
end := start + mergeBatchSize
if end > len(wrapper.AllTopN) {
end = len(wrapper.AllTopN)
}
task := statistics.NewTopnStatsMergeTask(start, end)
tasks = append(tasks, task)
start = end
}
taskNum := len(tasks)
wg := &sync.WaitGroup{}
Review thread on the line above:
Member: Replace with util.WaitGroupWrapper; then we will be able to add metrics for those goroutines in the future.
Author: Updated.

taskCh := make(chan *statistics.TopnStatsMergeTask, taskNum)
respCh := make(chan *statistics.TopnStatsMergeResponse, taskNum)
wg.Add(mergeConcurrency)
for i := 0; i < mergeConcurrency; i++ {
worker := statistics.NewTopnStatsMergeWorker(wg, taskCh, respCh, wrapper)
go worker.Run(timeZone, isIndex, n, version)
}
for _, task := range tasks {
taskCh <- task
}
close(taskCh)
wg.Wait()
close(respCh)
resps := make([]*statistics.TopnStatsMergeResponse, 0)

// handle errors reported by the workers
hasErr := false
for resp := range respCh {
if resp.Err != nil {
hasErr = true
}
resps = append(resps, resp)
}
if hasErr {
errMsg := make([]string, 0)
for _, resp := range resps {
if resp.Err != nil {
errMsg = append(errMsg, resp.Err.Error())
}
}
return nil, nil, nil, errors.New(strings.Join(errMsg, ","))
}

// fetch the responses from each worker and merge them into the global TopN stats
sorted := make([]statistics.TopNMeta, 0, mergeConcurrency)
leftTopn := make([]statistics.TopNMeta, 0)
for _, resp := range resps {
if resp.TopN != nil {
sorted = append(sorted, resp.TopN.TopN...)
}
leftTopn = append(leftTopn, resp.PopedTopn...)
for i, removeTopn := range resp.RemoveVals {
// Remove the value from the Hists.
if len(removeTopn) > 0 {
tmp := removeTopn
slices.SortFunc(tmp, func(i, j statistics.TopNMeta) bool {
cmpResult := bytes.Compare(i.Encoded, j.Encoded)
return cmpResult < 0
})
wrapper.AllHg[i].RemoveVals(tmp)
}
}
Review thread on lines +669 to +679:
Contributor: Should we gather all of the removeTopN for one partition from all resps and remove them from the histogram at once? Could this improve the histogram's accuracy?
Author: The reason I gather all the removed TopN from the responses is to keep the workers read-only on the stats, in order to avoid a data race.

}

globalTopN, popedTopn := statistics.GetMergedTopNFromSortedSlice(sorted, n)
return globalTopN, statistics.SortTopnMeta(append(leftTopn, popedTopn...)), wrapper.AllHg, nil
}
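The function above is a fixed-size worker pool: tasks go into a buffered channel which is then closed, each worker drains it and sends a response, and the coordinator closes the response channel only after the WaitGroup completes. Workers only read the shared wrapper; removed values and leftover TopNs travel back in the responses so that all mutation happens in the coordinator, which is the data-race argument made in the review thread above. A minimal generic sketch of the same pattern, using placeholder task and response types rather than the real TopnStatsMergeTask / TopnStatsMergeResponse:

package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

// task stands in for a merge task: a half-open range of partition indexes.
type task struct{ start, end int }

// response stands in for a merge response.
type response struct {
	merged []int // placeholder for the worker's partial merge result
	err    error
}

// runWorker drains the task channel; it only reads shared input and reports results via resps.
func runWorker(wg *sync.WaitGroup, tasks <-chan task, resps chan<- response) {
	defer wg.Done()
	for t := range tasks {
		// A real worker would merge the TopNs of partitions [t.start, t.end).
		resps <- response{merged: []int{t.start, t.end}}
	}
}

func main() {
	allTasks := []task{{0, 2}, {2, 4}, {4, 5}}
	concurrency := 2

	tasks := make(chan task, len(allTasks))
	resps := make(chan response, len(allTasks))
	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		go runWorker(&wg, tasks, resps)
	}
	for _, t := range allTasks {
		tasks <- t
	}
	close(tasks)
	wg.Wait() // all workers are done, so no further sends on resps
	close(resps)

	// Collect responses; any errors are joined into one, as in the real code.
	var errMsgs []string
	for r := range resps {
		if r.err != nil {
			errMsgs = append(errMsgs, r.err.Error())
			continue
		}
		fmt.Println("partial result:", r.merged)
	}
	if len(errMsgs) > 0 {
		fmt.Println(errors.New(strings.Join(errMsgs, ",")))
	}
}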

func (h *Handle) getTableByPhysicalID(is infoschema.InfoSchema, physicalID int64) (table.Table, bool) {
if is.SchemaMetaVersion() != h.mu.schemaVersion {
h.mu.schemaVersion = is.SchemaMetaVersion()