Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: concurrency init stats #51403

Merged
merged 7 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,9 @@ type Performance struct {
// If ForceInitStats is false, tidb can provide service before init stats is finished. Note that during the period
// of init stats the optimizer may make bad decisions due to pseudo stats.
ForceInitStats bool `toml:"force-init-stats" json:"force-init-stats"`

// ConcurrentlyInitStats indicates whether to use concurrency to init stats.
ConcurrentlyInitStats bool `toml:"concurrently-init-stats" json:"concurrently-init-stats"`
}

// PlanCache is the PlanCache section of the config.
Expand Down Expand Up @@ -1016,6 +1019,7 @@ var defaultConf = Config{
EnableLoadFMSketch: false,
LiteInitStats: true,
ForceInitStats: true,
ConcurrentlyInitStats: true,
},
ProxyProtocol: ProxyProtocol{
Networks: "",
Expand Down
1 change: 1 addition & 0 deletions pkg/statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/statistics/handle/ddl",
"//pkg/statistics/handle/globalstats",
"//pkg/statistics/handle/history",
"//pkg/statistics/handle/initstats",
"//pkg/statistics/handle/lockstats",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/storage",
Expand Down
63 changes: 49 additions & 14 deletions pkg/statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/cache"
"github.com/pingcap/tidb/pkg/statistics/handle/initstats"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
Expand Down Expand Up @@ -77,6 +78,12 @@ func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statstypes.StatsCache,
if err != nil {
return nil, err
}
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, h.initStatsMeta4Chunk)
ls.LoadStats(is, tables, rc)
ls.Wait()
return tables, nil
}
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
Expand Down Expand Up @@ -229,13 +236,19 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache stats
}

func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache statstypes.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, h.initStatsHistograms4ChunkLite)
ls.LoadStats(is, cache, rc)
ls.Wait()
return nil
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
Expand All @@ -252,13 +265,19 @@ func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache statsty
}

func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache statstypes.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, h.initStatsHistograms4Chunk)
ls.LoadStats(is, cache, rc)
ls.Wait()
return nil
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
Expand Down Expand Up @@ -301,13 +320,21 @@ func (*Handle) initStatsTopN4Chunk(cache statstypes.StatsCache, iter *chunk.Iter
}

func (h *Handle) initStatsTopN(cache statstypes.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, func(_ infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
h.initStatsTopN4Chunk(cache, iter)
})
ls.LoadStats(nil, cache, rc)
ls.Wait()
return nil
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
Expand Down Expand Up @@ -425,24 +452,32 @@ func (*Handle) initStatsBuckets4Chunk(cache statstypes.StatsCache, iter *chunk.I
}

func (h *Handle) initStatsBuckets(cache statstypes.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
err := rc.Next(ctx, req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
break
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, func(_ infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
h.initStatsBuckets4Chunk(cache, iter)
})
ls.LoadStats(nil, cache, rc)
ls.Wait()
} else {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
err := rc.Next(ctx, req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
h.initStatsBuckets4Chunk(cache, iter)
}
h.initStatsBuckets4Chunk(cache, iter)
}
tables := cache.Values()
for _, table := range tables {
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/handletest/statstest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 9,
shard_count = 12,
deps = [
"//pkg/config",
"//pkg/parser/model",
Expand Down
53 changes: 48 additions & 5 deletions pkg/statistics/handle/handletest/statstest/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,38 @@ func testInitStatsMemTrace(t *testing.T) {
}

func TestInitStatsMemTraceWithLite(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.ConcurrentlyInitStats = false
})
testInitStatsMemTraceFunc(t, true)
}

func TestInitStatsMemTraceWithoutLite(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.ConcurrentlyInitStats = false
})
testInitStatsMemTraceFunc(t, false)
}

func TestInitStatsMemTraceWithConcurrrencyLite(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.ConcurrentlyInitStats = true
})
testInitStatsMemTraceFunc(t, true)
}

func TestInitStatsMemTraceWithoutConcurrrencyLite(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.ConcurrentlyInitStats = true
})
testInitStatsMemTraceFunc(t, false)
}

Expand Down Expand Up @@ -283,11 +311,26 @@ func TestInitStats51358(t *testing.T) {
}

func TestInitStatsVer2(t *testing.T) {
originValue := config.GetGlobalConfig().Performance.LiteInitStats
defer func() {
config.GetGlobalConfig().Performance.LiteInitStats = originValue
}()
config.GetGlobalConfig().Performance.LiteInitStats = false
restore := config.RestoreFunc()
defer restore()
config.UpdateGlobal(func(conf *config.Config) {
config.GetGlobalConfig().Performance.LiteInitStats = false
config.GetGlobalConfig().Performance.ConcurrentlyInitStats = false
})
initStatsVer2(t)
}

func TestInitStatsVer2Concurrency(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
config.UpdateGlobal(func(conf *config.Config) {
config.GetGlobalConfig().Performance.LiteInitStats = false
config.GetGlobalConfig().Performance.ConcurrentlyInitStats = true
})
initStatsVer2(t)
}

func initStatsVer2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down
18 changes: 18 additions & 0 deletions pkg/statistics/handle/initstats/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "initstats",
srcs = ["load_stats.go"],
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/initstats",
visibility = ["//visibility:public"],
deps = [
"//pkg/infoschema",
"//pkg/kv",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/types",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/sqlexec",
"@org_uber_go_zap//:zap",
],
)
86 changes: 86 additions & 0 deletions pkg/statistics/handle/initstats/load_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package initstats

import (
"context"
"runtime"
"sync"

"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/statistics/handle/logutil"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"go.uber.org/zap"
)

// Worker is used to load stats concurrently.
type Worker struct {
taskFunc func(ctx context.Context, req *chunk.Chunk) error
dealFunc func(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about creating a type alias to avoid copying this signature repeatedly?

mu sync.Mutex
wg util.WaitGroupWrapper
}

// NewWorker creates a new Worker.
func NewWorker(
taskFunc func(ctx context.Context, req *chunk.Chunk) error,
dealFunc func(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk)) *Worker {
return &Worker{
taskFunc: taskFunc,
dealFunc: dealFunc,
}
}

// LoadStats loads stats concurrently when to init stats
func (ls *Worker) LoadStats(is infoschema.InfoSchema, cache statstypes.StatsCache, rc sqlexec.RecordSet) {
concurrency := runtime.GOMAXPROCS(0)
for n := 0; n < concurrency; n++ {
ls.wg.Run(func() {
req := rc.NewChunk(nil)
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
ls.loadStats(is, cache, req)
})
}
}

func (ls *Worker) loadStats(is infoschema.InfoSchema, cache statstypes.StatsCache, req *chunk.Chunk) {
iter := chunk.NewIterator4Chunk(req)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
for {
err := ls.getTask(ctx, req)
if err != nil {
logutil.StatsLogger().Error("load stats failed", zap.Error(err))
return
}
if req.NumRows() == 0 {
return
}
ls.dealFunc(is, cache, iter)
}
}

func (ls *Worker) getTask(ctx context.Context, req *chunk.Chunk) error {
ls.mu.Lock()
defer ls.mu.Unlock()
return ls.taskFunc(ctx, req)
}

// Wait closes the load stats worker.
func (ls *Worker) Wait() {
ls.wg.Wait()
}