Skip to content

Commit

Permalink
domain, statistics: periodically update stats healthy distribution (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot committed Feb 5, 2024
1 parent 44f39e9 commit 2ec7e32
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 58 deletions.
4 changes: 4 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1948,8 +1948,10 @@ func (do *Domain) loadStatsWorker() {
lease = 3 * time.Second
}
loadTicker := time.NewTicker(lease)
updStatsHealthyTicker := time.NewTicker(20 * lease)
defer func() {
loadTicker.Stop()
updStatsHealthyTicker.Stop()
logutil.BgLogger().Info("loadStatsWorker exited.")
}()
do.initStats()
Expand All @@ -1970,6 +1972,8 @@ func (do *Domain) loadStatsWorker() {
if err != nil {
logutil.BgLogger().Debug("load histograms failed", zap.Error(err))
}
case <-updStatsHealthyTicker.C:
statsHandle.UpdateStatsHealthyMetrics()
case <-do.exit:
return
}
Expand Down
11 changes: 0 additions & 11 deletions statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,17 +412,6 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
}
cache.FreshMemUsage()
h.updateStatsCache(cache)
v := h.statsCache.Load()
if v == nil {
return nil
}
healthyChange := &statsHealthyChange{}
for _, tbl := range v.(statsCache).Values() {
if healthy, ok := tbl.GetStatsHealthy(); ok {
healthyChange.add(healthy)
}
}
healthyChange.apply()
return nil
}

Expand Down
73 changes: 26 additions & 47 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,42 +534,32 @@ var statsHealthyGauges = []prometheus.Gauge{
metrics.StatsHealthyGauge.WithLabelValues("[0,100]"),
}

type statsHealthyChange struct {
bucketDelta [5]int
}

func (c *statsHealthyChange) update(add bool, statsHealthy int64) {
var idx int
if statsHealthy < 50 {
idx = 0
} else if statsHealthy < 80 {
idx = 1
} else if statsHealthy < 100 {
idx = 2
} else {
idx = 3
}
lastIDX := len(c.bucketDelta) - 1
if add {
c.bucketDelta[idx]++
c.bucketDelta[lastIDX]++
} else {
c.bucketDelta[idx]--
c.bucketDelta[lastIDX]--
// UpdateStatsHealthyMetrics updates stats healthy distribution metrics according to stats cache.
func (h *Handle) UpdateStatsHealthyMetrics() {
v := h.statsCache.Load()
if v == nil {
return
}
}

func (c *statsHealthyChange) drop(statsHealthy int64) {
c.update(false, statsHealthy)
}

func (c *statsHealthyChange) add(statsHealthy int64) {
c.update(true, statsHealthy)
}

func (c *statsHealthyChange) apply() {
for i, val := range c.bucketDelta {
statsHealthyGauges[i].Add(float64(val))
distribution := make([]int64, 5)
for _, tbl := range v.(statsCache).Values() {
healthy, ok := tbl.GetStatsHealthy()
if !ok {
continue
}
if healthy < 50 {
distribution[0] += 1
} else if healthy < 80 {
distribution[1] += 1
} else if healthy < 100 {
distribution[2] += 1
} else {
distribution[3] += 1
}
distribution[4] += 1
}
for i, val := range distribution {
statsHealthyGauges[i].Set(float64(val))
}
}

Expand All @@ -593,7 +583,6 @@ func (h *Handle) Update(is infoschema.InfoSchema, opts ...TableStatsOpt) error {
if err != nil {
return errors.Trace(err)
}
healthyChange := &statsHealthyChange{}
option := &tableStatsOption{}
for _, opt := range opts {
opt(option)
Expand All @@ -615,8 +604,7 @@ func (h *Handle) Update(is infoschema.InfoSchema, opts ...TableStatsOpt) error {
continue
}
tableInfo := table.Meta()
oldTbl, ok := oldCache.Get(physicalID)
if ok && oldTbl.Version >= version && tableInfo.UpdateTS == oldTbl.TblInfoUpdateTS {
if oldTbl, ok := oldCache.Get(physicalID); ok && oldTbl.Version >= version && tableInfo.UpdateTS == oldTbl.TblInfoUpdateTS {
continue
}
tbl, err := h.TableStatsFromStorage(tableInfo, physicalID, false, 0)
Expand All @@ -625,9 +613,6 @@ func (h *Handle) Update(is infoschema.InfoSchema, opts ...TableStatsOpt) error {
logutil.BgLogger().Error("[stats] error occurred when read table stats", zap.String("table", tableInfo.Name.O), zap.Error(err))
continue
}
if oldHealthy, ok := oldTbl.GetStatsHealthy(); ok {
healthyChange.drop(oldHealthy)
}
if tbl == nil {
deletedTableIDs = append(deletedTableIDs, physicalID)
continue
Expand All @@ -637,15 +622,9 @@ func (h *Handle) Update(is infoschema.InfoSchema, opts ...TableStatsOpt) error {
tbl.ModifyCount = modifyCount
tbl.Name = getFullTableName(is, tableInfo)
tbl.TblInfoUpdateTS = tableInfo.UpdateTS
if newHealthy, ok := tbl.GetStatsHealthy(); ok {
healthyChange.add(newHealthy)
}
tables = append(tables, tbl)
}
updated := h.updateStatsCache(oldCache.update(tables, deletedTableIDs, lastVersion, opts...))
if updated {
healthyChange.apply()
}
h.updateStatsCache(oldCache.update(tables, deletedTableIDs, lastVersion, opts...))
return nil
}

Expand Down

0 comments on commit 2ec7e32

Please sign in to comment.