Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: Weizhen Wang <wangweizhen@pingcap.com>
  • Loading branch information
hawkingrei committed Mar 27, 2024
1 parent 9f41d2b commit d2a8c22
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
16 changes: 11 additions & 5 deletions pkg/statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/cache"
"github.com/pingcap/tidb/pkg/statistics/handle/loadstats"
"github.com/pingcap/tidb/pkg/statistics/handle/initstats"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
Expand Down Expand Up @@ -78,6 +78,12 @@ func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statstypes.StatsCache,
if err != nil {
return nil, err
}
if config.GetGlobalConfig().Performance.ConcurrencyInitStats {
ls := initstats.NewLoadStats(rc.Next, h.initStatsMeta4Chunk)
ls.LoadStats(is, tables, rc)
ls.Wait()
return tables, nil
}
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
Expand Down Expand Up @@ -237,7 +243,7 @@ func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache statsty
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrencyInitStats {
ls := loadstats.NewLoadStats(rc.Next, h.initStatsHistograms4ChunkLite)
ls := initstats.NewLoadStats(rc.Next, h.initStatsHistograms4ChunkLite)
ls.LoadStats(is, cache, rc)
ls.Wait()
return nil
Expand Down Expand Up @@ -266,7 +272,7 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache statstypes.
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrencyInitStats {
ls := loadstats.NewLoadStats(rc.Next, h.initStatsHistograms4Chunk)
ls := initstats.NewLoadStats(rc.Next, h.initStatsHistograms4Chunk)
ls.LoadStats(is, cache, rc)
ls.Wait()
return nil
Expand Down Expand Up @@ -321,7 +327,7 @@ func (h *Handle) initStatsTopN(cache statstypes.StatsCache) error {
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrencyInitStats {
ls := loadstats.NewLoadStats(rc.Next, func(_ infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
ls := initstats.NewLoadStats(rc.Next, func(_ infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
h.initStatsTopN4Chunk(cache, iter)
})
ls.LoadStats(nil, cache, rc)
Expand Down Expand Up @@ -453,7 +459,7 @@ func (h *Handle) initStatsBuckets(cache statstypes.StatsCache) error {
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrencyInitStats {
ls := loadstats.NewLoadStats(rc.Next, func(_ infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
ls := initstats.NewLoadStats(rc.Next, func(_ infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
h.initStatsBuckets4Chunk(cache, iter)
})
ls.LoadStats(nil, cache, rc)
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package loadstats
package initstats

import (
"context"
Expand All @@ -29,26 +29,26 @@ import (
"go.uber.org/zap"
)

// LoadStats is used to load stats concurrently.
type LoadStats struct {
// Worker is used to load stats concurrently.
type Worker struct {
taskFunc func(ctx context.Context, req *chunk.Chunk) error
dealFunc func(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk)
mu sync.Mutex
wg util.WaitGroupWrapper
}

// NewLoadStats creates a new LoadStats.
// NewLoadStats creates a new Worker.
func NewLoadStats(
taskFunc func(ctx context.Context, req *chunk.Chunk) error,
dealFunc func(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk)) *LoadStats {
return &LoadStats{
dealFunc func(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk)) *Worker {
return &Worker{
taskFunc: taskFunc,
dealFunc: dealFunc,
}
}

// LoadStats loads stats concurrently.
func (ls *LoadStats) LoadStats(is infoschema.InfoSchema, cache statstypes.StatsCache, rc sqlexec.RecordSet) {
// Worker loads stats concurrently.
func (ls *Worker) LoadStats(is infoschema.InfoSchema, cache statstypes.StatsCache, rc sqlexec.RecordSet) {
concurrency := runtime.GOMAXPROCS(0)
for n := 0; n < concurrency; n++ {
ls.wg.Run(func() {
Expand All @@ -58,7 +58,7 @@ func (ls *LoadStats) LoadStats(is infoschema.InfoSchema, cache statstypes.StatsC
}
}

func (ls *LoadStats) loadStats(is infoschema.InfoSchema, cache statstypes.StatsCache, req *chunk.Chunk) {
func (ls *Worker) loadStats(is infoschema.InfoSchema, cache statstypes.StatsCache, req *chunk.Chunk) {
iter := chunk.NewIterator4Chunk(req)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
for {
Expand All @@ -74,13 +74,13 @@ func (ls *LoadStats) loadStats(is infoschema.InfoSchema, cache statstypes.StatsC
}
}

func (ls *LoadStats) getTask(ctx context.Context, req *chunk.Chunk) error {
func (ls *Worker) getTask(ctx context.Context, req *chunk.Chunk) error {
ls.mu.Lock()
defer ls.mu.Unlock()
return ls.taskFunc(ctx, req)
}

// Wait closes the load stats worker.
func (ls *LoadStats) Wait() {
func (ls *Worker) Wait() {
ls.wg.Wait()
}

0 comments on commit d2a8c22

Please sign in to comment.