*: add region heartbeat duration breakdown metrics #7871

Merged · 9 commits · Mar 12, 2024
245 changes: 245 additions & 0 deletions pkg/core/metrics.go
@@ -0,0 +1,245 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"
)

var (
	// HeartbeatBreakdownHandleDurationSum records the total processing time (s) of each heartbeat handling stage.
	HeartbeatBreakdownHandleDurationSum = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "pd",
			Subsystem: "core",
			Name:      "heartbeat_breakdown_handle_duration_seconds_sum",
			Help:      "Sum of the processing time (s) of each heartbeat handling stage.",
		}, []string{"name"})

	// HeartbeatBreakdownHandleCount records how many times each heartbeat handling stage has run.
	HeartbeatBreakdownHandleCount = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "pd",
			Subsystem: "core",
			Name:      "heartbeat_breakdown_handle_duration_seconds_count",
			Help:      "Count of heartbeats processed at each handling stage.",
		}, []string{"name"})
	// AcquireRegionsLockWaitDurationSum records the total time (s) spent waiting to acquire the regions lock.
	AcquireRegionsLockWaitDurationSum = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "pd",
			Subsystem: "core",
			Name:      "acquire_regions_lock_wait_duration_seconds_sum",
			Help:      "Sum of the time (s) spent waiting to acquire the regions lock.",
		}, []string{"name"})
	// AcquireRegionsLockWaitCount records how many times the regions lock has been waited on.
	AcquireRegionsLockWaitCount = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "pd",
			Subsystem: "core",
			Name:      "acquire_regions_lock_wait_duration_seconds_count",
			Help:      "Count of waits for acquiring the regions lock.",
		}, []string{"name"})

	// lock statistics
	waitRegionsLockDurationSum    = AcquireRegionsLockWaitDurationSum.WithLabelValues("WaitRegionsLock")
	waitRegionsLockCount          = AcquireRegionsLockWaitCount.WithLabelValues("WaitRegionsLock")
	waitSubRegionsLockDurationSum = AcquireRegionsLockWaitDurationSum.WithLabelValues("WaitSubRegionsLock")
	waitSubRegionsLockCount       = AcquireRegionsLockWaitCount.WithLabelValues("WaitSubRegionsLock")

	// heartbeat breakdown statistics
	preCheckDurationSum       = HeartbeatBreakdownHandleDurationSum.WithLabelValues("PreCheck")
	preCheckCount             = HeartbeatBreakdownHandleCount.WithLabelValues("PreCheck")
	asyncHotStatsDurationSum  = HeartbeatBreakdownHandleDurationSum.WithLabelValues("AsyncHotStatsDuration")
	asyncHotStatsCount        = HeartbeatBreakdownHandleCount.WithLabelValues("AsyncHotStatsDuration")
	regionGuideDurationSum    = HeartbeatBreakdownHandleDurationSum.WithLabelValues("RegionGuide")
	regionGuideCount          = HeartbeatBreakdownHandleCount.WithLabelValues("RegionGuide")
	checkOverlapsDurationSum  = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_CheckOverlaps")
	checkOverlapsCount        = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_CheckOverlaps")
	validateRegionDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_InvalidRegion")
	validateRegionCount       = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_InvalidRegion")
	setRegionDurationSum      = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_SetRegion")
	setRegionCount            = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_SetRegion")
	updateSubTreeDurationSum  = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_UpdateSubTree")
	updateSubTreeCount        = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_UpdateSubTree")
	otherDurationSum          = HeartbeatBreakdownHandleDurationSum.WithLabelValues("Other")
	otherCount                = HeartbeatBreakdownHandleCount.WithLabelValues("Other")
)

func init() {
	prometheus.MustRegister(HeartbeatBreakdownHandleDurationSum)
	prometheus.MustRegister(HeartbeatBreakdownHandleCount)
	prometheus.MustRegister(AcquireRegionsLockWaitDurationSum)
	prometheus.MustRegister(AcquireRegionsLockWaitCount)
}
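A note on the metric shape: each stage exports a raw counter pair ending in `_sum` and `_count` rather than a `prometheus.Histogram`, so the hot path costs two atomic adds and no bucket bookkeeping. Average stage latency can then be derived at query time, e.g. `rate(pd_core_heartbeat_breakdown_handle_duration_seconds_sum{name="PreCheck"}[1m]) / rate(pd_core_heartbeat_breakdown_handle_duration_seconds_count{name="PreCheck"}[1m])` — an illustrative query, not one shipped with this PR.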

type saveCacheStats struct {
	startTime              time.Time
	lastCheckTime          time.Time
	checkOverlapsDuration  time.Duration
	validateRegionDuration time.Duration
	setRegionDuration      time.Duration
	updateSubTreeDuration  time.Duration
}

// RegionHeartbeatProcessTracer is used to trace the process of handling region heartbeat.
type RegionHeartbeatProcessTracer interface {
	Begin()
	OnPreCheckFinished()
	OnAsyncHotStatsFinished()
	OnRegionGuideFinished()
	OnSaveCacheBegin()
	OnSaveCacheFinished()
	OnCheckOverlapsFinished()
	OnValidateRegionFinished()
	OnSetRegionFinished()
	OnUpdateSubTreeFinished()
	OnAllStageFinished()
	LogFields() []zap.Field
}
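The interface above defines the stage checkpoints a heartbeat handler is expected to hit in order. A minimal, self-contained sketch of how a caller might drive it — `handleHeartbeat` and the sleeps are illustrative stand-ins, since the real call sites live in PD's cluster code outside this diff:

```go
package main

import (
	"fmt"
	"time"

	"github.com/tikv/pd/pkg/core"
)

// Illustrative driver only: the sleeps stand in for real stage work.
func handleHeartbeat(tracer core.RegionHeartbeatProcessTracer) {
	tracer.Begin()

	time.Sleep(time.Millisecond) // stand-in for the pre-check stage
	tracer.OnPreCheckFinished()

	time.Sleep(time.Millisecond) // stand-in for hot-region statistics
	tracer.OnAsyncHotStatsFinished()

	time.Sleep(time.Millisecond) // stand-in for the region-guide stage
	tracer.OnRegionGuideFinished()

	tracer.OnSaveCacheBegin()
	// In PD, RegionsInfo.AtomicCheckAndPutRegion reports the SaveCache_*
	// sub-stages (check overlaps, validate, set region, update sub-tree).
	tracer.OnSaveCacheFinished()

	tracer.OnAllStageFinished()
	fmt.Println(tracer.LogFields())
}

func main() {
	handleHeartbeat(core.NewHeartbeatProcessTracer())
}
```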

type noopHeartbeatProcessTracer struct{}

// NewNoopHeartbeatProcessTracer returns a noop heartbeat process tracer.
func NewNoopHeartbeatProcessTracer() RegionHeartbeatProcessTracer {
return &noopHeartbeatProcessTracer{}
}

func (n *noopHeartbeatProcessTracer) Begin()                    {}
func (n *noopHeartbeatProcessTracer) OnPreCheckFinished()       {}
func (n *noopHeartbeatProcessTracer) OnAsyncHotStatsFinished()  {}
func (n *noopHeartbeatProcessTracer) OnRegionGuideFinished()    {}
func (n *noopHeartbeatProcessTracer) OnSaveCacheBegin()         {}
func (n *noopHeartbeatProcessTracer) OnSaveCacheFinished()      {}
func (n *noopHeartbeatProcessTracer) OnCheckOverlapsFinished()  {}
func (n *noopHeartbeatProcessTracer) OnValidateRegionFinished() {}
func (n *noopHeartbeatProcessTracer) OnSetRegionFinished()      {}
func (n *noopHeartbeatProcessTracer) OnUpdateSubTreeFinished()  {}
func (n *noopHeartbeatProcessTracer) OnAllStageFinished()       {}
func (n *noopHeartbeatProcessTracer) LogFields() []zap.Field {
	return nil
}

type regionHeartbeatProcessTracer struct {
	startTime             time.Time
	lastCheckTime         time.Time
	preCheckDuration      time.Duration
	asyncHotStatsDuration time.Duration
	regionGuideDuration   time.Duration
	saveCacheStats        saveCacheStats
	otherDuration         time.Duration
}

// NewHeartbeatProcessTracer returns a heartbeat process tracer.
func NewHeartbeatProcessTracer() RegionHeartbeatProcessTracer {
return &regionHeartbeatProcessTracer{}
}

func (h *regionHeartbeatProcessTracer) Begin() {
	now := time.Now()
	h.startTime = now
	h.lastCheckTime = now
}

func (h *regionHeartbeatProcessTracer) OnPreCheckFinished() {
	now := time.Now()
	h.preCheckDuration = now.Sub(h.lastCheckTime)
	h.lastCheckTime = now
	preCheckDurationSum.Add(h.preCheckDuration.Seconds())
	preCheckCount.Inc()
}

Review comment (Member), on lines +167 to +168:
Is it possible to flush these metrics asynchronously? It appears that setting the metrics in the middle of processing may affect the performance.

Reply (@nolouch, Contributor/Author, Mar 7, 2024):
I think it's OK because all of them use atomics, and we can dynamically disable the trace via the config `enable-heartbeat-breakdown-metrics`.
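The reply above points at gating the tracer on a config switch. A minimal sketch of what that selection could look like — `enable-heartbeat-breakdown-metrics` is the option named in the thread, but the `hbConfig` type and `newTracerFor` helper are hypothetical, not the PR's actual wiring:

```go
// Hypothetical config gate: field and helper names are assumptions.
type hbConfig struct {
	EnableHeartbeatBreakdownMetrics bool `toml:"enable-heartbeat-breakdown-metrics"`
}

// newTracerFor returns a real tracer when breakdown metrics are enabled,
// and a no-op tracer (nothing but empty interface calls) otherwise.
func newTracerFor(cfg *hbConfig) RegionHeartbeatProcessTracer {
	if cfg.EnableHeartbeatBreakdownMetrics {
		return NewHeartbeatProcessTracer()
	}
	return NewNoopHeartbeatProcessTracer()
}
```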

func (h *regionHeartbeatProcessTracer) OnAsyncHotStatsFinished() {
	now := time.Now()
	h.asyncHotStatsDuration = now.Sub(h.lastCheckTime)
	h.lastCheckTime = now
	asyncHotStatsDurationSum.Add(h.asyncHotStatsDuration.Seconds())
	asyncHotStatsCount.Inc()
}

func (h *regionHeartbeatProcessTracer) OnRegionGuideFinished() {
	now := time.Now()
	h.regionGuideDuration = now.Sub(h.lastCheckTime)
	h.lastCheckTime = now
	regionGuideDurationSum.Add(h.regionGuideDuration.Seconds())
	regionGuideCount.Inc()
}

func (h *regionHeartbeatProcessTracer) OnSaveCacheBegin() {
	now := time.Now()
	h.saveCacheStats.startTime = now
	h.saveCacheStats.lastCheckTime = now
	h.lastCheckTime = now
}

func (h *regionHeartbeatProcessTracer) OnSaveCacheFinished() {
	// Update the outer checkpoint time so the trailing "Other" stage is
	// measured from the end of the save-cache phase.
	h.lastCheckTime = time.Now()
}

func (h *regionHeartbeatProcessTracer) OnCheckOverlapsFinished() {
	now := time.Now()
	h.saveCacheStats.checkOverlapsDuration = now.Sub(h.saveCacheStats.lastCheckTime)
	h.saveCacheStats.lastCheckTime = now
	checkOverlapsDurationSum.Add(h.saveCacheStats.checkOverlapsDuration.Seconds())
	checkOverlapsCount.Inc()
}

func (h *regionHeartbeatProcessTracer) OnValidateRegionFinished() {
	now := time.Now()
	h.saveCacheStats.validateRegionDuration = now.Sub(h.saveCacheStats.lastCheckTime)
	h.saveCacheStats.lastCheckTime = now
	validateRegionDurationSum.Add(h.saveCacheStats.validateRegionDuration.Seconds())
	validateRegionCount.Inc()
}

func (h *regionHeartbeatProcessTracer) OnSetRegionFinished() {
	now := time.Now()
	h.saveCacheStats.setRegionDuration = now.Sub(h.saveCacheStats.lastCheckTime)
	h.saveCacheStats.lastCheckTime = now
	setRegionDurationSum.Add(h.saveCacheStats.setRegionDuration.Seconds())
	setRegionCount.Inc()
}

func (h *regionHeartbeatProcessTracer) OnUpdateSubTreeFinished() {
	now := time.Now()
	h.saveCacheStats.updateSubTreeDuration = now.Sub(h.saveCacheStats.lastCheckTime)
	h.saveCacheStats.lastCheckTime = now
	updateSubTreeDurationSum.Add(h.saveCacheStats.updateSubTreeDuration.Seconds())
	updateSubTreeCount.Inc()
}

func (h *regionHeartbeatProcessTracer) OnAllStageFinished() {
	now := time.Now()
	h.otherDuration = now.Sub(h.lastCheckTime)
	otherDurationSum.Add(h.otherDuration.Seconds())
	otherCount.Inc()
}

func (h *regionHeartbeatProcessTracer) LogFields() []zap.Field {
	return []zap.Field{
		zap.Duration("pre-check-duration", h.preCheckDuration),
		zap.Duration("async-hot-stats-duration", h.asyncHotStatsDuration),
		zap.Duration("region-guide-duration", h.regionGuideDuration),
		zap.Duration("check-overlaps-duration", h.saveCacheStats.checkOverlapsDuration),
		zap.Duration("validate-region-duration", h.saveCacheStats.validateRegionDuration),
		zap.Duration("set-region-duration", h.saveCacheStats.setRegionDuration),
		zap.Duration("update-sub-tree-duration", h.saveCacheStats.updateSubTreeDuration),
		zap.Duration("other-duration", h.otherDuration),
	}
}
90 changes: 84 additions & 6 deletions pkg/core/region.go
@@ -824,12 +824,49 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
}

// RWLockStats is a read-write lock with statistics.
type RWLockStats struct {
	syncutil.RWMutex
	totalWaitTime     int64
	lockCount         int64
	lastLockCount     int64
	lastTotalWaitTime int64
}

// Lock locks the lock and records the waiting time.
func (l *RWLockStats) Lock() {
	startTime := time.Now()
	l.RWMutex.Lock()
	elapsed := time.Since(startTime).Nanoseconds()
	atomic.AddInt64(&l.totalWaitTime, elapsed)
	atomic.AddInt64(&l.lockCount, 1)
}

// Unlock unlocks the lock.
func (l *RWLockStats) Unlock() {
	l.RWMutex.Unlock()
}

// RLock locks the lock for reading and records the waiting time.
func (l *RWLockStats) RLock() {
	startTime := time.Now()
	l.RWMutex.RLock()
	elapsed := time.Since(startTime).Nanoseconds()
	atomic.AddInt64(&l.totalWaitTime, elapsed)
	atomic.AddInt64(&l.lockCount, 1)
}

// RUnlock unlocks the lock for reading.
func (l *RWLockStats) RUnlock() {
	l.RWMutex.RUnlock()
}
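Since `RWLockStats` embeds `syncutil.RWMutex` and only wraps the acquisition paths, it can replace a plain mutex without touching callers; note that read and write waits are folded into the same counters. A tiny illustration, with a hypothetical `guarded` type:

```go
// Hypothetical drop-in use: a struct that previously embedded a
// syncutil.RWMutex switches to RWLockStats and callers stay unchanged.
type guarded struct {
	mu    RWLockStats
	items map[uint64]string
}

func (g *guarded) get(id uint64) string {
	g.mu.RLock() // the wait, if any, is recorded before the lock is held
	defer g.mu.RUnlock()
	return g.items[id]
}
```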

// RegionsInfo for export
type RegionsInfo struct {
-	t          syncutil.RWMutex
+	t          RWLockStats
	tree       *regionTree
	regions    map[uint64]*regionItem // regionID -> regionInfo
-	st         syncutil.RWMutex
+	st         RWLockStats
	subRegions map[uint64]*regionItem // regionID -> regionInfo
	leaders    map[uint64]*regionTree // storeID -> sub regionTree
	followers  map[uint64]*regionTree // storeID -> sub regionTree
@@ -896,33 +933,38 @@ func (r *RegionsInfo) PutRegion(region *RegionInfo) []*RegionInfo {
}

// PreCheckPutRegion checks if the region is valid to put.
-func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*regionItem, error) {
-	origin, overlaps := r.GetRelevantRegions(region)
+func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) (*RegionInfo, []*regionItem, error) {
+	origin, overlaps := r.GetRelevantRegions(region, trace)
	err := check(region, origin, overlaps)
	return origin, overlaps, err
}

// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
-func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo, error) {
+func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) ([]*RegionInfo, error) {
	r.t.Lock()
	var ols []*regionItem
	origin := r.getRegionLocked(region.GetID())
	if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
		ols = r.tree.overlaps(&regionItem{RegionInfo: region})
	}
	trace.OnCheckOverlapsFinished()
	err := check(region, origin, ols)
	if err != nil {
		r.t.Unlock()
		trace.OnValidateRegionFinished()
		return nil, err
	}
	trace.OnValidateRegionFinished()
	origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...)
	r.t.Unlock()
	trace.OnSetRegionFinished()
	r.UpdateSubTree(region, origin, overlaps, rangeChanged)
	trace.OnUpdateSubTreeFinished()
	return overlaps, nil
}

// GetRelevantRegions returns the relevant regions for a given region.
-func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*regionItem) {
+func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo, trace RegionHeartbeatProcessTracer) (origin *RegionInfo, overlaps []*regionItem) {
	r.t.RLock()
	defer r.t.RUnlock()
	origin = r.getRegionLocked(region.GetID())
@@ -1653,6 +1695,42 @@ func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 {
	return size
}

// defaultPollInterval is the default metrics poll interval; a per-interval
// wait-time delta larger than this is treated as bogus and skipped.
const defaultPollInterval = 15 * time.Second

Review comment (Member), on the constant above:
How about just DefaultPollInterval?

// CollectWaitLockMetrics collects the metrics of the time spent waiting for locks.
func (r *RegionsInfo) CollectWaitLockMetrics() {
	regionsLockTotalWaitTime := atomic.LoadInt64(&r.t.totalWaitTime)
	regionsLockCount := atomic.LoadInt64(&r.t.lockCount)

	lastRegionsLockTotalWaitTime := atomic.LoadInt64(&r.t.lastTotalWaitTime)
	lastRegionsLockCount := atomic.LoadInt64(&r.t.lastLockCount)

	subRegionsLockTotalWaitTime := atomic.LoadInt64(&r.st.totalWaitTime)
	subRegionsLockCount := atomic.LoadInt64(&r.st.lockCount)

	lastSubRegionsLockTotalWaitTime := atomic.LoadInt64(&r.st.lastTotalWaitTime)
	lastSubRegionsLockCount := atomic.LoadInt64(&r.st.lastLockCount)

	// update last metrics
	atomic.StoreInt64(&r.t.lastTotalWaitTime, regionsLockTotalWaitTime)
	atomic.StoreInt64(&r.t.lastLockCount, regionsLockCount)
	atomic.StoreInt64(&r.st.lastTotalWaitTime, subRegionsLockTotalWaitTime)
	atomic.StoreInt64(&r.st.lastLockCount, subRegionsLockCount)

	// Skip invalid situations such as the initial state, or a delta that is
	// negative or larger than one poll interval.
	if lastRegionsLockTotalWaitTime == 0 || lastRegionsLockCount == 0 || lastSubRegionsLockTotalWaitTime == 0 || lastSubRegionsLockCount == 0 ||
		regionsLockTotalWaitTime-lastRegionsLockTotalWaitTime < 0 || regionsLockTotalWaitTime-lastRegionsLockTotalWaitTime > int64(defaultPollInterval) ||
		subRegionsLockTotalWaitTime-lastSubRegionsLockTotalWaitTime < 0 || subRegionsLockTotalWaitTime-lastSubRegionsLockTotalWaitTime > int64(defaultPollInterval) {
		return
	}

	waitRegionsLockDurationSum.Add(time.Duration(regionsLockTotalWaitTime - lastRegionsLockTotalWaitTime).Seconds())
	waitRegionsLockCount.Add(float64(regionsLockCount - lastRegionsLockCount))
	waitSubRegionsLockDurationSum.Add(time.Duration(subRegionsLockTotalWaitTime - lastSubRegionsLockTotalWaitTime).Seconds())
	waitSubRegionsLockCount.Add(float64(subRegionsLockCount - lastSubRegionsLockCount))
}
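This diff does not include the call site that polls the collector, so the wiring below is an assumption: a background goroutine ticking at `defaultPollInterval` and flushing the lock-wait deltas (the `context` import is likewise assumed).

```go
// Hypothetical collector loop, not part of this diff: poll the lock-wait
// stats on the default interval until the context is cancelled.
func startLockMetricsCollector(ctx context.Context, regions *RegionsInfo) {
	go func() {
		ticker := time.NewTicker(defaultPollInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				regions.CollectWaitLockMetrics()
			}
		}
	}()
}
```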

// GetAdjacentRegions returns region's info that is adjacent with specific region
func (r *RegionsInfo) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) {
	r.t.RLock()
4 changes: 2 additions & 2 deletions pkg/core/region_test.go
@@ -459,9 +459,9 @@ func TestSetRegionConcurrence(t *testing.T) {
	regions := NewRegionsInfo()
	region := NewTestRegionInfo(1, 1, []byte("a"), []byte("b"))
	go func() {
-		regions.AtomicCheckAndPutRegion(region)
+		regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer())
	}()
-	regions.AtomicCheckAndPutRegion(region)
+	regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer())
	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/core/UpdateSubTree"))
}
