Skip to content

Commit

Permalink
statistics: better benchmark tests for merge topN (pingcap#48006)
Browse files Browse the repository at this point in the history
  • Loading branch information
hi-rustin authored and yibin87 committed Oct 31, 2023
1 parent 18a0e00 commit 77ae077
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 97 deletions.
6 changes: 2 additions & 4 deletions pkg/statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,14 +828,12 @@ func MergeTopN(topNs []*TopN, n uint32) (*TopN, []TopNMeta) {

// CheckEmptyTopNs checks whether all TopNs are empty.
func CheckEmptyTopNs(topNs []*TopN) bool {
count := uint64(0)
for _, topN := range topNs {
count += topN.TotalCount()
if count != 0 {
if topN.TotalCount() != 0 {
return false
}
}
return count == 0
return true
}

// SortTopnMeta sort topnMeta
Expand Down
51 changes: 39 additions & 12 deletions pkg/statistics/handle/globalstats/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,21 @@ func mergeGlobalStatsTopN(gp *gp.Pool, sc sessionctx.Context, wrapper *StatsWrap
return MergeGlobalStatsTopNByConcurrency(gp, mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex, killer)
}

// MergeGlobalStatsTopNByConcurrency merge partition topN by concurrency
// To merge global stats topn by concurrency, we will separate the partition topn in concurrency part and deal it with different worker.
// mergeConcurrency is used to control the total concurrency of the running worker, and mergeBatchSize is sued to control
// the partition size for each worker to solve it
func MergeGlobalStatsTopNByConcurrency(gp *gp.Pool, mergeConcurrency, mergeBatchSize int, wrapper *StatsWrapper,
timeZone *time.Location, version int, n uint32, isIndex bool, killer *sqlkiller.SQLKiller) (*statistics.TopN,
// MergeGlobalStatsTopNByConcurrency merge partition topN by concurrency.
// To merge global stats topN by concurrency,
// we will separate the partition topN in concurrency part and deal it with different worker.
// mergeConcurrency is used to control the total concurrency of the running worker,
// and mergeBatchSize is sued to control the partition size for each worker to solve it
func MergeGlobalStatsTopNByConcurrency(
gp *gp.Pool,
mergeConcurrency, mergeBatchSize int,
wrapper *StatsWrapper,
timeZone *time.Location,
version int,
n uint32,
isIndex bool,
killer *sqlkiller.SQLKiller,
) (*statistics.TopN,
[]statistics.TopNMeta, []*statistics.Histogram, error) {
if len(wrapper.AllTopN) < mergeConcurrency {
mergeConcurrency = len(wrapper.AllTopN)
Expand Down Expand Up @@ -119,18 +128,31 @@ func MergeGlobalStatsTopNByConcurrency(gp *gp.Pool, mergeConcurrency, mergeBatch
// MergePartTopN2GlobalTopN is used to merge the partition-level topN to global-level topN.
// The input parameters:
// 1. `topNs` are the partition-level topNs to be merged.
// 2. `n` is the size of the global-level topN. Notice: This value can be 0 and has no default value, we must explicitly specify this value.
// 3. `hists` are the partition-level histograms. Some values not in topN may be placed in the histogram. We need it here to make the value in the global-level TopN more accurate.
// 2. `n` is the size of the global-level topN.
// Notice: This value can be 0 and has no default value, we must explicitly specify this value.
// 3. `hists` are the partition-level histograms.
// Some values not in topN may be placed in the histogram.
// We need it here to make the value in the global-level TopN more accurate.
//
// The output parameters:
// 1. `*TopN` is the final global-level topN.
// 2. `[]TopNMeta` is the left topN value from the partition-level TopNs, but is not placed to global-level TopN. We should put them back to histogram latter.
// 3. `[]*Histogram` are the partition-level histograms which just delete some values when we merge the global-level topN.
func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statistics.TopN, n uint32, hists []*statistics.Histogram,
isIndex bool, killer *sqlkiller.SQLKiller) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) {
// 2. `[]TopNMeta` is the left topN value from the partition-level TopNs,
// but is not placed to global-level TopN. We should put them back to histogram latter.
// 3. `[]*Histogram` are the partition-level histograms which
// just delete some values when we merge the global-level topN.
func MergePartTopN2GlobalTopN(
loc *time.Location,
version int,
topNs []*statistics.TopN,
n uint32,
hists []*statistics.Histogram,
isIndex bool,
killer *sqlkiller.SQLKiller,
) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) {
if statistics.CheckEmptyTopNs(topNs) {
return nil, nil, hists, nil
}

partNum := len(topNs)
// Different TopN structures may hold the same value, we have to merge them.
counter := make(map[hack.MutableString]float64)
Expand All @@ -141,9 +163,11 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statisti
if err := killer.HandleSignal(); err != nil {
return nil, nil, nil, err
}
// Ignore the empty topN.
if topN.TotalCount() == 0 {
continue
}

for _, val := range topN.TopN {
encodedVal := hack.String(val.Encoded)
_, exists := counter[encodedVal]
Expand All @@ -152,13 +176,15 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statisti
// We have already calculated the encodedVal from the histogram, so just continue to next topN value.
continue
}

// We need to check whether the value corresponding to encodedVal is contained in other partition-level stats.
// 1. Check the topN first.
// 2. If the topN doesn't contain the value corresponding to encodedVal. We should check the histogram.
for j := 0; j < partNum; j++ {
if err := killer.HandleSignal(); err != nil {
return nil, nil, nil, err
}

if (j == i && version >= 2) || topNs[j].FindTopN(val.Encoded) != -1 {
continue
}
Expand All @@ -181,6 +207,7 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statisti
}
}
}

numTop := len(counter)
if numTop == 0 {
return nil, nil, hists, nil
Expand Down
143 changes: 62 additions & 81 deletions pkg/statistics/handle/globalstats/topn_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package globalstats

import (
"fmt"
"math/rand"
"testing"
"time"

Expand All @@ -30,29 +31,22 @@ import (
"github.com/tiancaiamao/gp"
)

// cmd: go test -run=^$ -bench=BenchmarkMergePartTopN2GlobalTopNWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats
func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) {
loc := time.UTC
sc := stmtctx.NewStmtCtxWithTimeZone(loc)
version := 1
killer := sqlkiller.SQLKiller{}

func prepareTopNsAndHists(b *testing.B, partitions int, tz *time.Location) ([]*statistics.TopN, []*statistics.Histogram) {
sc := stmtctx.NewStmtCtxWithTimeZone(tz)
// Prepare TopNs.
topNs := make([]*statistics.TopN, 0, partitions)
for i := 0; i < partitions; i++ {
// Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3.
topN := statistics.NewTopN(3)
// Construct TopN, should be key1 -> rand(0, 1000), key2 -> rand(0, 1000), key3 -> rand(0, 1000)...
topN := statistics.NewTopN(500)
{
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1))
require.NoError(b, err)
topN.AppendTopN(key1, 2)
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2))
require.NoError(b, err)
topN.AppendTopN(key2, 2)
if i%2 == 0 {
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3))
for j := 1; j <= 500; j++ {
// Randomly skip some keys for some partitions.
if i%2 == 0 && j%2 == 0 {
continue
}
key, err := codec.EncodeKey(sc, nil, types.NewIntDatum(int64(j)))
require.NoError(b, err)
topN.AppendTopN(key3, 3)
topN.AppendTopN(key, uint64(rand.Intn(1000)))
}
}
topNs = append(topNs, topN)
Expand All @@ -62,68 +56,55 @@ func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) {
hists := make([]*statistics.Histogram, 0, partitions)
for i := 0; i < partitions; i++ {
// Construct Hist
h := statistics.NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
h.Bounds.AppendInt64(0, 1)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 20})
h.Bounds.AppendInt64(0, 2)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 3)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 4)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40})
h := statistics.NewHistogram(1, 500, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
for j := 1; j <= 500; j++ {
datum := types.NewIntDatum(int64(j))
h.AppendBucket(&datum, &datum, int64(10+j*10), 10)
}
hists = append(hists, h)
}

return topNs, hists
}

func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) {
loc := time.UTC
version := 1
killer := sqlkiller.SQLKiller{}
topNs, hists := prepareTopNsAndHists(b, partitions, loc)

b.ResetTimer()
for i := 0; i < b.N; i++ {
// Benchmark merge 10 topN.
_, _, _, _ = MergePartTopN2GlobalTopN(loc, version, topNs, 10, hists, false, &killer)
// Benchmark merge 100 topN.
_, _, _, _ = MergePartTopN2GlobalTopN(
loc,
version,
topNs,
100,
hists,
false,
&killer,
)
}
}

var benchmarkSizes = []int{100, 1000, 2000, 5000, 10000}

// cmd: go test -run=^$ -bench=BenchmarkMergePartTopN2GlobalTopNWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats
func BenchmarkMergePartTopN2GlobalTopNWithHists(b *testing.B) {
for _, size := range benchmarkSizes {
b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) {
benchmarkMergePartTopN2GlobalTopNWithHists(size, b)
})
}
}

// cmd: go test -run=^$ -bench=BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats
func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *testing.B) {
loc := time.UTC
sc := stmtctx.NewStmtCtxWithTimeZone(loc)
version := 1
killer := sqlkiller.SQLKiller{}

// Prepare TopNs.
topNs := make([]*statistics.TopN, 0, partitions)
for i := 0; i < partitions; i++ {
// Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3.
topN := statistics.NewTopN(3)
{
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1))
require.NoError(b, err)
topN.AppendTopN(key1, 2)
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2))
require.NoError(b, err)
topN.AppendTopN(key2, 2)
if i%2 == 0 {
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3))
require.NoError(b, err)
topN.AppendTopN(key3, 3)
}
}
topNs = append(topNs, topN)
}

// Prepare Hists.
hists := make([]*statistics.Histogram, 0, partitions)
for i := 0; i < partitions; i++ {
// Construct Hist
h := statistics.NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
h.Bounds.AppendInt64(0, 1)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 20})
h.Bounds.AppendInt64(0, 2)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 3)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 4)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40})
hists = append(hists, h)
}
topNs, hists := prepareTopNsAndHists(b, partitions, loc)
wrapper := NewStatsWrapper(hists, topNs)
const mergeConcurrency = 4
batchSize := len(wrapper.AllTopN) / mergeConcurrency
Expand All @@ -136,24 +117,24 @@ func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *test
defer gpool.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Benchmark merge 10 topN.
_, _, _, _ = MergeGlobalStatsTopNByConcurrency(gpool, mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &killer)
}
}

var benchmarkSizes = []int{100, 1000, 10000, 100000, 1000000, 10000000}
var benchmarkConcurrencySizes = []int{100, 1000, 10000, 100000}

func BenchmarkMergePartTopN2GlobalTopNWithHists(b *testing.B) {
for _, size := range benchmarkSizes {
b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) {
benchmarkMergePartTopN2GlobalTopNWithHists(size, b)
})
// Benchmark merge 100 topN.
_, _, _, _ = MergeGlobalStatsTopNByConcurrency(
gpool,
mergeConcurrency,
batchSize,
wrapper,
loc,
version,
100,
false,
&killer,
)
}
}

// cmd: go test -run=^$ -bench=BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats
func BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists(b *testing.B) {
for _, size := range benchmarkConcurrencySizes {
for _, size := range benchmarkSizes {
b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) {
benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(size, b)
})
Expand Down

0 comments on commit 77ae077

Please sign in to comment.