Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: better benchmark tests for merge topN #48006

Merged
merged 2 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 2 additions & 4 deletions pkg/statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,14 +828,12 @@ func MergeTopN(topNs []*TopN, n uint32) (*TopN, []TopNMeta) {

// CheckEmptyTopNs checks whether all TopNs are empty.
func CheckEmptyTopNs(topNs []*TopN) bool {
count := uint64(0)
for _, topN := range topNs {
count += topN.TotalCount()
if count != 0 {
if topN.TotalCount() != 0 {
return false
}
}
return count == 0
return true
}

// SortTopnMeta sort topnMeta
Expand Down
51 changes: 39 additions & 12 deletions pkg/statistics/handle/globalstats/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,21 @@ func mergeGlobalStatsTopN(gp *gp.Pool, sc sessionctx.Context, wrapper *StatsWrap
return MergeGlobalStatsTopNByConcurrency(gp, mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex, killer)
}

// MergeGlobalStatsTopNByConcurrency merge partition topN by concurrency
// To merge global stats topn by concurrency, we will separate the partition topn in concurrency part and deal it with different worker.
// mergeConcurrency is used to control the total concurrency of the running worker, and mergeBatchSize is sued to control
// the partition size for each worker to solve it
func MergeGlobalStatsTopNByConcurrency(gp *gp.Pool, mergeConcurrency, mergeBatchSize int, wrapper *StatsWrapper,
timeZone *time.Location, version int, n uint32, isIndex bool, killer *sqlkiller.SQLKiller) (*statistics.TopN,
// MergeGlobalStatsTopNByConcurrency merge partition topN by concurrency.
// To merge global stats topN by concurrency,
// we will separate the partition topN in concurrency part and deal it with different worker.
// mergeConcurrency is used to control the total concurrency of the running worker,
// and mergeBatchSize is used to control the partition size for each worker to solve it
func MergeGlobalStatsTopNByConcurrency(
gp *gp.Pool,
mergeConcurrency, mergeBatchSize int,
wrapper *StatsWrapper,
timeZone *time.Location,
version int,
n uint32,
isIndex bool,
killer *sqlkiller.SQLKiller,
) (*statistics.TopN,
[]statistics.TopNMeta, []*statistics.Histogram, error) {
if len(wrapper.AllTopN) < mergeConcurrency {
mergeConcurrency = len(wrapper.AllTopN)
Expand Down Expand Up @@ -119,18 +128,31 @@ func MergeGlobalStatsTopNByConcurrency(gp *gp.Pool, mergeConcurrency, mergeBatch
// MergePartTopN2GlobalTopN is used to merge the partition-level topN to global-level topN.
// The input parameters:
// 1. `topNs` are the partition-level topNs to be merged.
// 2. `n` is the size of the global-level topN. Notice: This value can be 0 and has no default value, we must explicitly specify this value.
// 3. `hists` are the partition-level histograms. Some values not in topN may be placed in the histogram. We need it here to make the value in the global-level TopN more accurate.
// 2. `n` is the size of the global-level topN.
// Notice: This value can be 0 and has no default value, we must explicitly specify this value.
// 3. `hists` are the partition-level histograms.
// Some values not in topN may be placed in the histogram.
// We need it here to make the value in the global-level TopN more accurate.
//
// The output parameters:
// 1. `*TopN` is the final global-level topN.
// 2. `[]TopNMeta` is the left topN value from the partition-level TopNs, but is not placed to global-level TopN. We should put them back to histogram latter.
// 3. `[]*Histogram` are the partition-level histograms which just delete some values when we merge the global-level topN.
func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statistics.TopN, n uint32, hists []*statistics.Histogram,
isIndex bool, killer *sqlkiller.SQLKiller) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) {
// 2. `[]TopNMeta` holds the remaining topN values from the partition-level TopNs
// that were not placed into the global-level TopN. We should put them back into the histogram later.
// 3. `[]*Histogram` are the partition-level histograms which
// just delete some values when we merge the global-level topN.
func MergePartTopN2GlobalTopN(
loc *time.Location,
version int,
topNs []*statistics.TopN,
n uint32,
hists []*statistics.Histogram,
isIndex bool,
killer *sqlkiller.SQLKiller,
) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) {
if statistics.CheckEmptyTopNs(topNs) {
return nil, nil, hists, nil
}

partNum := len(topNs)
// Different TopN structures may hold the same value, we have to merge them.
counter := make(map[hack.MutableString]float64)
Expand All @@ -141,9 +163,11 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statisti
if err := killer.HandleSignal(); err != nil {
return nil, nil, nil, err
}
// Ignore the empty topN.
if topN.TotalCount() == 0 {
continue
}

for _, val := range topN.TopN {
encodedVal := hack.String(val.Encoded)
_, exists := counter[encodedVal]
Expand All @@ -152,13 +176,15 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statisti
// We have already calculated the encodedVal from the histogram, so just continue to next topN value.
continue
}

// We need to check whether the value corresponding to encodedVal is contained in other partition-level stats.
// 1. Check the topN first.
// 2. If the topN doesn't contain the value corresponding to encodedVal. We should check the histogram.
for j := 0; j < partNum; j++ {
if err := killer.HandleSignal(); err != nil {
return nil, nil, nil, err
}

if (j == i && version >= 2) || topNs[j].FindTopN(val.Encoded) != -1 {
continue
}
Expand All @@ -181,6 +207,7 @@ func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statisti
}
}
}

numTop := len(counter)
if numTop == 0 {
return nil, nil, hists, nil
Expand Down
143 changes: 62 additions & 81 deletions pkg/statistics/handle/globalstats/topn_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package globalstats

import (
"fmt"
"math/rand"
"testing"
"time"

Expand All @@ -30,29 +31,22 @@ import (
"github.com/tiancaiamao/gp"
)

// cmd: go test -run=^$ -bench=BenchmarkMergePartTopN2GlobalTopNWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats
func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) {
loc := time.UTC
sc := stmtctx.NewStmtCtxWithTimeZone(loc)
version := 1
killer := sqlkiller.SQLKiller{}

func prepareTopNsAndHists(b *testing.B, partitions int, tz *time.Location) ([]*statistics.TopN, []*statistics.Histogram) {
sc := stmtctx.NewStmtCtxWithTimeZone(tz)
// Prepare TopNs.
topNs := make([]*statistics.TopN, 0, partitions)
for i := 0; i < partitions; i++ {
// Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3.
topN := statistics.NewTopN(3)
// Construct TopN, should be key1 -> rand(0, 1000), key2 -> rand(0, 1000), key3 -> rand(0, 1000)...
topN := statistics.NewTopN(500)
{
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1))
require.NoError(b, err)
topN.AppendTopN(key1, 2)
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2))
require.NoError(b, err)
topN.AppendTopN(key2, 2)
if i%2 == 0 {
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3))
for j := 1; j <= 500; j++ {
// Skip the even keys on even-numbered partitions so that not every partition holds the same key set.
if i%2 == 0 && j%2 == 0 {
continue
}
key, err := codec.EncodeKey(sc, nil, types.NewIntDatum(int64(j)))
require.NoError(b, err)
topN.AppendTopN(key3, 3)
topN.AppendTopN(key, uint64(rand.Intn(1000)))
}
}
topNs = append(topNs, topN)
Expand All @@ -62,68 +56,55 @@ func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) {
hists := make([]*statistics.Histogram, 0, partitions)
for i := 0; i < partitions; i++ {
// Construct Hist
h := statistics.NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
h.Bounds.AppendInt64(0, 1)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 20})
h.Bounds.AppendInt64(0, 2)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 3)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 4)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40})
h := statistics.NewHistogram(1, 500, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
for j := 1; j <= 500; j++ {
datum := types.NewIntDatum(int64(j))
h.AppendBucket(&datum, &datum, int64(10+j*10), 10)
}
hists = append(hists, h)
}

return topNs, hists
}

func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) {
loc := time.UTC
version := 1
killer := sqlkiller.SQLKiller{}
topNs, hists := prepareTopNsAndHists(b, partitions, loc)

b.ResetTimer()
for i := 0; i < b.N; i++ {
// Benchmark merge 10 topN.
_, _, _, _ = MergePartTopN2GlobalTopN(loc, version, topNs, 10, hists, false, &killer)
// Benchmark merge 100 topN.
_, _, _, _ = MergePartTopN2GlobalTopN(
loc,
version,
topNs,
100,
hists,
false,
&killer,
)
}
}

var benchmarkSizes = []int{100, 1000, 2000, 5000, 10000}

// cmd: go test -run=^$ -bench=BenchmarkMergePartTopN2GlobalTopNWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats
func BenchmarkMergePartTopN2GlobalTopNWithHists(b *testing.B) {
for _, size := range benchmarkSizes {
b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) {
benchmarkMergePartTopN2GlobalTopNWithHists(size, b)
})
}
}

// cmd: go test -run=^$ -bench=BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats
func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *testing.B) {
loc := time.UTC
sc := stmtctx.NewStmtCtxWithTimeZone(loc)
version := 1
killer := sqlkiller.SQLKiller{}

// Prepare TopNs.
topNs := make([]*statistics.TopN, 0, partitions)
for i := 0; i < partitions; i++ {
// Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3.
topN := statistics.NewTopN(3)
{
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1))
require.NoError(b, err)
topN.AppendTopN(key1, 2)
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2))
require.NoError(b, err)
topN.AppendTopN(key2, 2)
if i%2 == 0 {
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3))
require.NoError(b, err)
topN.AppendTopN(key3, 3)
}
}
topNs = append(topNs, topN)
}

// Prepare Hists.
hists := make([]*statistics.Histogram, 0, partitions)
for i := 0; i < partitions; i++ {
// Construct Hist
h := statistics.NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
h.Bounds.AppendInt64(0, 1)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 20})
h.Bounds.AppendInt64(0, 2)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 3)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 4)
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40})
hists = append(hists, h)
}
topNs, hists := prepareTopNsAndHists(b, partitions, loc)
wrapper := NewStatsWrapper(hists, topNs)
const mergeConcurrency = 4
batchSize := len(wrapper.AllTopN) / mergeConcurrency
Expand All @@ -136,24 +117,24 @@ func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *test
defer gpool.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Benchmark merge 10 topN.
_, _, _, _ = MergeGlobalStatsTopNByConcurrency(gpool, mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &killer)
}
}

var benchmarkSizes = []int{100, 1000, 10000, 100000, 1000000, 10000000}
var benchmarkConcurrencySizes = []int{100, 1000, 10000, 100000}

func BenchmarkMergePartTopN2GlobalTopNWithHists(b *testing.B) {
for _, size := range benchmarkSizes {
b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) {
benchmarkMergePartTopN2GlobalTopNWithHists(size, b)
})
// Benchmark merge 100 topN.
_, _, _, _ = MergeGlobalStatsTopNByConcurrency(
gpool,
mergeConcurrency,
batchSize,
wrapper,
loc,
version,
100,
false,
&killer,
)
}
}

// cmd: go test -run=^$ -bench=BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists -benchmem github.com/pingcap/tidb/pkg/statistics/handle/globalstats
func BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists(b *testing.B) {
for _, size := range benchmarkConcurrencySizes {
for _, size := range benchmarkSizes {
b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) {
benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(size, b)
})
Expand Down