Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Jun 3, 2024
1 parent 73fa833 commit 37d30f6
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 42 deletions.
21 changes: 8 additions & 13 deletions tools/pd-heartbeat-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,6 @@ func main() {
log.Fatal("initialize logger error", zap.Error(err))
}

metrics.InitMetric2Collect(cfg.MetricsAddr)
maxVersion = cfg.InitEpochVer
options := config.NewOptions(cfg)
// let PD have enough time to start
Expand Down Expand Up @@ -531,11 +530,11 @@ func main() {
defer heartbeatTicker.Stop()
var resolvedTSTicker = time.NewTicker(time.Second)
defer resolvedTSTicker.Stop()
withMetric := metrics.InitMetric2Collect(cfg.MetricsAddr)
for {
select {
case <-heartbeatTicker.C:
if cfg.Round != 0 && regions.updateRound > cfg.Round {
metrics.OutputConclusion()
exit(0)
}
rep := newReport(cfg)
Expand All @@ -548,21 +547,17 @@ func main() {
wg.Add(1)
go regions.handleRegionHeartbeat(wg, streams[id], id, rep)
}
go metrics.CollectMetrics(regions.updateRound, 1*time.Second)
if withMetric {
metrics.CollectMetrics(regions.updateRound, time.Second)
}
wg.Wait()

since := time.Since(startTime).Seconds()
close(rep.Results())
regions.result(cfg.RegionCount, since)
stats := <-r
log.Info("region heartbeat stats", zap.String("total", fmt.Sprintf("%.4fs", stats.Total.Seconds())),
zap.String("slowest", fmt.Sprintf("%.4fs", stats.Slowest)),
zap.String("fastest", fmt.Sprintf("%.4fs", stats.Fastest)),
zap.String("average", fmt.Sprintf("%.4fs", stats.Average)),
zap.String("stddev", fmt.Sprintf("%.4fs", stats.Stddev)),
zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)),
zap.Uint64("max-epoch-version", maxVersion),
)
log.Info("region heartbeat stats",
metrics.RegionFields(stats, zap.Uint64("max-epoch-version", maxVersion))...)
log.Info("store heartbeat stats", zap.String("max", fmt.Sprintf("%.4fs", since)))
metrics.CollectRegionAndStoreStats(&stats, &since)
regions.update(cfg, options)
Expand Down Expand Up @@ -600,6 +595,7 @@ func main() {
}

func exit(code int) {
metrics.OutputConclusion()
os.Exit(code)
}

Expand Down Expand Up @@ -695,7 +691,7 @@ func runHTTPServer(cfg *config.Config, options *config.Options) {

c.IndentedJSON(http.StatusOK, output)
})
engine.GET("metrics_collect", func(c *gin.Context) {
engine.GET("metrics-collect", func(c *gin.Context) {
second := c.Query("second")
if second == "" {
c.String(http.StatusBadRequest, "missing second")
Expand All @@ -706,7 +702,6 @@ func runHTTPServer(cfg *config.Config, options *config.Options) {
c.String(http.StatusBadRequest, "invalid second")
return
}
metrics.InitMetric2Collect(cfg.MetricsAddr)
metrics.CollectMetrics(metrics.WarmUpRound, time.Duration(secondInt)*time.Second)
c.IndentedJSON(http.StatusOK, "Successfully collect metrics")
})
Expand Down
83 changes: 54 additions & 29 deletions tools/pd-heartbeat-bench/metrics/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ import (
)

var (
prometheusCli api.Client
avgMetrics2Collect []Metric
avgRegionStats report.Stats
avgStoreTime float64
collectRound = 1.0
prometheusCli api.Client
finalMetrics2Collect []Metric
avgRegionStats report.Stats
avgStoreTime float64
collectRound = 1.0

metrics2Collect = []Metric{
{promSQL: cpuMetric, name: "max cpu usage(%)"},
{promSQL: memoryMetric, name: "max memory usage(G)"},
{promSQL: goRoutineMetric, name: "max go routines"},
{promSQL: cpuMetric, name: "max cpu usage(%)", max: true},
{promSQL: memoryMetric, name: "max memory usage(G)", max: true},
{promSQL: goRoutineMetric, name: "max go routines", max: true},
{promSQL: hbLatency99Metric, name: "99% Heartbeat Latency(ms)"},
{promSQL: hbLatencyAvgMetric, name: "Avg Heartbeat Latency(ms)"},
}
Expand All @@ -50,11 +50,10 @@ var (
memoryMetric = `max_over_time(go_memstats_heap_inuse_bytes{job=~".*pd.*"}[1h])/1024/1024/1024`
goRoutineMetric = `max_over_time(go_goroutines{job=~".*pd.*"}[1h])`
hbLatency99Metric = `histogram_quantile(0.99, sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_bucket{}[1m])) by (le))`
hbLatencyAvgMetric = `sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_sum{}[1m])) / sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_count{}[1m])) * 1000`
hbLatencyAvgMetric = `sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_sum{}[1m])) / sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_count{}[1m]))`

// Heartbeat Performance Duration BreakDown
hbBreakdownName = "Heartbeat Performance Duration BreakDown (Accumulation)(ms)"
breakdownNames = []string{
breakdownNames = []string{
"AsyncHotStatsDuration",
"CollectRegionStats",
"Other",
Expand All @@ -74,30 +73,39 @@ type Metric struct {
promSQL string
name string
value float64
// max indicates whether the metric is a max value
max bool
}

func InitMetric2Collect(endpoint string) {
func InitMetric2Collect(endpoint string) (withMetric bool) {
for _, name := range breakdownNames {
metrics2Collect = append(metrics2Collect, Metric{
promSQL: hbBreakdownMetricByName(name),
name: fmt.Sprintf("%s with %s", hbBreakdownName, name),
name: name,
})
}
avgMetrics2Collect = metrics2Collect
finalMetrics2Collect = metrics2Collect

if j := strings.Index(endpoint, "//"); j == -1 {
endpoint = "http://" + endpoint
}
cu, err := url.Parse(endpoint)
if err != nil {
log.Error("parse prometheus url error", zap.Error(err))
return
return false
}
prometheusCli, err = NewPrometheusClient(*cu)
if err != nil {
log.Error("create prometheus client error", zap.Error(err))
return
return false
}
// check whether the prometheus is available
_, err = getMetric(prometheusCli, goRoutineMetric, time.Now())
if err != nil {
log.Error("check prometheus availability error, please check the prometheus address", zap.Error(err))
return false
}
return true
}

func NewPrometheusClient(prometheusURL url.URL) (api.Client, error) {
Expand All @@ -111,7 +119,7 @@ func NewPrometheusClient(prometheusURL url.URL) (api.Client, error) {
return client, nil
}

// wait for the first round to warm up
// WarmUpRound wait for the first round to warm up
const WarmUpRound = 1

func CollectMetrics(curRound int, wait time.Duration) {
Expand Down Expand Up @@ -143,11 +151,15 @@ func CollectMetrics(curRound int, wait time.Duration) {
}
for i := 0; i < len(metrics2Collect); i++ {
metrics2Collect[i].value = getRes(i)
avgMetrics2Collect[i].value = (avgMetrics2Collect[i].value*collectRound + metrics2Collect[i].value) / (collectRound + 1)
if metrics2Collect[i].max {
finalMetrics2Collect[i].value = max(finalMetrics2Collect[i].value, metrics2Collect[i].value)
} else {
finalMetrics2Collect[i].value = (finalMetrics2Collect[i].value*collectRound + metrics2Collect[i].value) / (collectRound + 1)
}
}

collectRound += 1
log.Info("metrics collected", zap.String("metrics", formatMetrics(metrics2Collect)))
log.Info("metrics collected", zap.Float64("round", collectRound), zap.String("metrics", formatMetrics(metrics2Collect)))
}

func getMetric(cli api.Client, query string, ts time.Time) ([]float64, error) {
Expand All @@ -171,6 +183,14 @@ func getMetric(cli api.Client, query string, ts time.Time) ([]float64, error) {
return value, nil
}

func formatMetrics(ms []Metric) string {
res := ""
for _, m := range ms {
res += "[" + m.name + "]" + " " + fmt.Sprintf("%.10f", m.value) + " "
}
return res
}

func CollectRegionAndStoreStats(regionStats *report.Stats, storeTime *float64) {
if regionStats != nil && storeTime != nil {
collect(*regionStats, *storeTime)
Expand All @@ -191,16 +211,21 @@ func collect(regionStats report.Stats, storeTime float64) {
avgStoreTime = average(avgStoreTime, storeTime)
}

func formatMetrics(ms []Metric) string {
res := ""
for _, m := range ms {
res += "[" + m.name + "]" + " " + fmt.Sprintf("%.10f", m.value) + " "
}
return res
func OutputConclusion() {
logFields := RegionFields(avgRegionStats,
zap.Float64("avg store time", avgStoreTime),
zap.Float64("current round", collectRound),
zap.String("metrics", formatMetrics(finalMetrics2Collect)))
log.Info("final metrics collected", logFields...)
}

func OutputConclusion() {
log.Info("average metrics", zap.Float64("avg store time", avgStoreTime),
zap.Any("avg region stats", avgRegionStats),
zap.String("metrics", formatMetrics(avgMetrics2Collect)))
func RegionFields(stats report.Stats, fields ...zap.Field) []zap.Field {
return append([]zap.Field{
zap.String("total", fmt.Sprintf("%.4fs", stats.Total.Seconds())),
zap.String("slowest", fmt.Sprintf("%.4fs", stats.Slowest)),
zap.String("fastest", fmt.Sprintf("%.4fs", stats.Fastest)),
zap.String("average", fmt.Sprintf("%.4fs", stats.Average)),
zap.String("stddev", fmt.Sprintf("%.4fs", stats.Stddev)),
zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)),
}, fields...)
}

0 comments on commit 37d30f6

Please sign in to comment.