Skip to content

Commit

Permalink
refine code
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Jun 3, 2024
1 parent 8979b36 commit e678954
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 26 deletions.
13 changes: 4 additions & 9 deletions tools/pd-heartbeat-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,6 @@ func main() {
log.Fatal("initialize logger error", zap.Error(err))
}

withMetric := metrics.InitMetric2Collect(cfg.MetricsAddr)
maxVersion = cfg.InitEpochVer
options := config.NewOptions(cfg)
// let PD have enough time to start
Expand Down Expand Up @@ -531,6 +530,7 @@ func main() {
defer heartbeatTicker.Stop()
var resolvedTSTicker = time.NewTicker(time.Second)
defer resolvedTSTicker.Stop()
withMetric := metrics.InitMetric2Collect(cfg.MetricsAddr)
for {
select {
case <-heartbeatTicker.C:
Expand All @@ -557,14 +557,8 @@ func main() {
close(rep.Results())
regions.result(cfg.RegionCount, since)
stats := <-r
log.Info("region heartbeat stats", zap.String("total", fmt.Sprintf("%.4fs", stats.Total.Seconds())),
zap.String("slowest", fmt.Sprintf("%.4fs", stats.Slowest)),
zap.String("fastest", fmt.Sprintf("%.4fs", stats.Fastest)),
zap.String("average", fmt.Sprintf("%.4fs", stats.Average)),
zap.String("stddev", fmt.Sprintf("%.4fs", stats.Stddev)),
zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)),
zap.Uint64("max-epoch-version", maxVersion),
)
log.Info("region heartbeat stats",
metrics.RegionFields(stats, zap.Uint64("max-epoch-version", maxVersion))...)
log.Info("store heartbeat stats", zap.String("max", fmt.Sprintf("%.4fs", since)))
metrics.CollectRegionAndStoreStats(&stats, &since)
regions.update(cfg, options)
Expand Down Expand Up @@ -602,6 +596,7 @@ func main() {
}

func exit(code int) {
metrics.OutputConclusion()
os.Exit(code)
}

Expand Down
52 changes: 35 additions & 17 deletions tools/pd-heartbeat-bench/metrics/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ import (
)

var (
prometheusCli api.Client
avgMetrics2Collect []Metric
avgRegionStats report.Stats
avgStoreTime float64
collectRound = 1.0
prometheusCli api.Client
finalMetrics2Collect []Metric
avgRegionStats report.Stats
avgStoreTime float64
collectRound = 1.0

metrics2Collect = []Metric{
{promSQL: cpuMetric, name: "max cpu usage(%)"},
{promSQL: memoryMetric, name: "max memory usage(G)"},
{promSQL: goRoutineMetric, name: "max go routines"},
{promSQL: cpuMetric, name: "max cpu usage(%)", max: true},
{promSQL: memoryMetric, name: "max memory usage(G)", max: true},
{promSQL: goRoutineMetric, name: "max go routines", max: true},
{promSQL: hbLatency99Metric, name: "99% Heartbeat Latency(ms)"},
{promSQL: hbLatencyAvgMetric, name: "Avg Heartbeat Latency(ms)"},
}
Expand All @@ -53,8 +53,7 @@ var (
hbLatencyAvgMetric = `sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_sum{}[1m])) / sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_count{}[1m]))`

// Heartbeat Performance Duration BreakDown
hbBreakdownName = "Heartbeat Performance Duration BreakDown (Accumulation)(ms)"
breakdownNames = []string{
breakdownNames = []string{
"AsyncHotStatsDuration",
"CollectRegionStats",
"Other",
Expand All @@ -74,16 +73,18 @@ type Metric struct {
promSQL string
name string
value float64
// max indicates whether the metric is a max value
max bool
}

func InitMetric2Collect(endpoint string) (withMetric bool) {
for _, name := range breakdownNames {
metrics2Collect = append(metrics2Collect, Metric{
promSQL: hbBreakdownMetricByName(name),
name: fmt.Sprintf("%s with %s", hbBreakdownName, name),
name: name,
})
}
avgMetrics2Collect = metrics2Collect
finalMetrics2Collect = metrics2Collect

if j := strings.Index(endpoint, "//"); j == -1 {
endpoint = "http://" + endpoint
Expand Down Expand Up @@ -150,11 +151,15 @@ func CollectMetrics(curRound int, wait time.Duration) {
}
for i := 0; i < len(metrics2Collect); i++ {
metrics2Collect[i].value = getRes(i)
avgMetrics2Collect[i].value = (avgMetrics2Collect[i].value*collectRound + metrics2Collect[i].value) / (collectRound + 1)
if metrics2Collect[i].max {
finalMetrics2Collect[i].value = max(finalMetrics2Collect[i].value, metrics2Collect[i].value)
} else {
finalMetrics2Collect[i].value = (finalMetrics2Collect[i].value*collectRound + metrics2Collect[i].value) / (collectRound + 1)
}
}

collectRound += 1
log.Info("metrics collected", zap.String("metrics", formatMetrics(metrics2Collect)))
log.Info("metrics collected", zap.Float64("round", collectRound), zap.String("metrics", formatMetrics(metrics2Collect)))
}

func getMetric(cli api.Client, query string, ts time.Time) ([]float64, error) {
Expand Down Expand Up @@ -207,7 +212,20 @@ func formatMetrics(ms []Metric) string {
}

func OutputConclusion() {
log.Info("average metrics", zap.Float64("avg store time", avgStoreTime),
zap.Any("avg region stats", avgRegionStats),
zap.String("metrics", formatMetrics(avgMetrics2Collect)))
logFields := RegionFields(avgRegionStats,
zap.Float64("avg store time", avgStoreTime),
zap.Float64("current round", collectRound),
zap.String("metrics", formatMetrics(finalMetrics2Collect)))
log.Info("final metrics collected", logFields...)
}

func RegionFields(stats report.Stats, fields ...zap.Field) []zap.Field {
return append([]zap.Field{
zap.String("total", fmt.Sprintf("%.4fs", stats.Total.Seconds())),
zap.String("slowest", fmt.Sprintf("%.4fs", stats.Slowest)),
zap.String("fastest", fmt.Sprintf("%.4fs", stats.Fastest)),
zap.String("average", fmt.Sprintf("%.4fs", stats.Average)),
zap.String("stddev", fmt.Sprintf("%.4fs", stats.Stddev)),
zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)),
}, fields...)
}

0 comments on commit e678954

Please sign in to comment.