Per shard per namespace RPS warning log (#4525)
* add warning log for high per shard per ns rps
pdoerner committed Jul 21, 2023
1 parent ec16a06 commit 4edce52
Showing 6 changed files with 124 additions and 99 deletions.
3 changes: 3 additions & 0 deletions common/dynamicconfig/constants.go
@@ -116,6 +116,9 @@ const (
 	PersistenceHealthSignalBufferSize = "system.persistenceHealthSignalBufferSize"
 	// ShardRPSWarnLimit is the per-shard RPS limit for warning
 	ShardRPSWarnLimit = "system.shardRPSWarnLimit"
+	// ShardPerNsRPSWarnPercent is the per-shard per-namespace RPS limit for warning as a percentage of ShardRPSWarnLimit
+	// these warnings are not emitted if the value is set to 0 or less
+	ShardPerNsRPSWarnPercent = "system.shardPerNsRPSWarnPercent"
 
 	// Whether the deadlock detector should dump goroutines
 	DeadlockDumpGoroutines = "system.deadlock.DumpGoroutines"
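The new knob is a fraction of ShardRPSWarnLimit rather than an absolute rate, and a value of 0 or less disables the per-namespace warning. A minimal sketch of how the two properties compose, using the defaults wired up in fx.go below (`dc` is an assumed *dynamicconfig.Collection; this snippet is illustrative, not part of the commit):

	// Resolve both knobs; 0.8 and 50 are the provider defaults below.
	warnPercent := dc.GetFloat64Property(dynamicconfig.ShardPerNsRPSWarnPercent, 0.8)
	warnLimit := dc.GetIntProperty(dynamicconfig.ShardRPSWarnLimit, 50)

	// Effective per-shard per-namespace threshold: int64(0.8 * 50) = 40 RPS.
	// A warnPercent of 0 or less disables the per-namespace warning.
	perNsThreshold := int64(warnPercent() * float64(warnLimit()))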
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
@@ -477,6 +477,11 @@ func RequestCount(c int) ZapTag {
 	return NewInt("request-count", c)
 }
 
+// RPS returns tag for requests per second
+func RPS(c int64) ZapTag {
+	return NewInt64("rps", c)
+}
+
 // Number returns tag for Number
 func Number(n int64) ZapTag {
 	return NewInt64("number", n)
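tag.RPS follows the existing NewInt64 helpers and surfaces as an "rps" field on the structured log line. A brief usage sketch, matching how the aggregator below attaches it (identifiers taken from this diff):

	logger.Warn("Per shard per namespace RPS warn limit exceeded",
		tag.ShardID(shardID),
		tag.WorkflowNamespace(namespace),
		tag.RPS(shardRPSPerNS),
	)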
3 changes: 2 additions & 1 deletion common/persistence/client/fx.go
@@ -116,7 +116,7 @@ func FactoryProvider(
 func HealthSignalAggregatorProvider(
 	dynamicCollection *dynamicconfig.Collection,
 	metricsHandler metrics.Handler,
-	logger log.Logger,
+	logger log.ThrottledLogger,
 ) persistence.HealthSignalAggregator {
 	if dynamicCollection.GetBoolProperty(dynamicconfig.PersistenceHealthSignalMetricsEnabled, true)() {
 		return persistence.NewHealthSignalAggregatorImpl(
@@ -125,6 +125,7 @@ func HealthSignalAggregatorProvider(
 			dynamicCollection.GetIntProperty(dynamicconfig.PersistenceHealthSignalBufferSize, 5000)(),
 			metricsHandler,
 			dynamicCollection.GetIntProperty(dynamicconfig.ShardRPSWarnLimit, 50),
+			dynamicCollection.GetFloat64Property(dynamicconfig.ShardPerNsRPSWarnPercent, 0.8),
 			logger,
 		)
 	}
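Note the provider now asks for a log.ThrottledLogger: with many shards and namespaces, these warnings can fire on every emit interval, so a rate-limited logger keeps them from flooding the logs. A hedged sketch of wrapping a base logger, assuming the NewThrottledLogger constructor from common/log (the 1.0 logs-per-second rate is illustrative, not from this commit):

	// Rate-limit repeated warn lines from the aggregator.
	throttled := log.NewThrottledLogger(baseLogger, func() float64 { return 1.0 })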
62 changes: 39 additions & 23 deletions common/persistence/health_signal_aggregator.go
@@ -44,7 +44,7 @@ const (
 type (
 	HealthSignalAggregator interface {
 		common.Daemon
-		Record(callerSegment int32, latency time.Duration, err error)
+		Record(callerSegment int32, namespace string, latency time.Duration, err error)
 		AverageLatency() float64
 		ErrorRatio() float64
 	}
@@ -53,16 +53,18 @@ type (
 		status     int32
 		shutdownCh chan struct{}
 
-		requestsPerShard map[int32]int64
-		requestsLock     sync.Mutex
+		// map of shardID -> map of namespace -> request count
+		requestCounts map[int32]map[string]int64
+		requestsLock  sync.Mutex
 
 		aggregationEnabled bool
 		latencyAverage     aggregate.MovingWindowAverage
 		errorRatio         aggregate.MovingWindowAverage
 
-		metricsHandler       metrics.Handler
-		emitMetricsTimer     *time.Ticker
-		perShardRPSWarnLimit dynamicconfig.IntPropertyFn
+		metricsHandler            metrics.Handler
+		emitMetricsTimer          *time.Ticker
+		perShardRPSWarnLimit      dynamicconfig.IntPropertyFn
+		perShardPerNsRPSWarnLimit dynamicconfig.FloatPropertyFn
 
 		logger log.Logger
 	}
@@ -74,17 +76,19 @@ func NewHealthSignalAggregatorImpl(
 	maxBufferSize int,
 	metricsHandler metrics.Handler,
 	perShardRPSWarnLimit dynamicconfig.IntPropertyFn,
+	perShardPerNsRPSWarnLimit dynamicconfig.FloatPropertyFn,
 	logger log.Logger,
 ) *HealthSignalAggregatorImpl {
 	ret := &HealthSignalAggregatorImpl{
-		status:               common.DaemonStatusInitialized,
-		shutdownCh:           make(chan struct{}),
-		requestsPerShard:     make(map[int32]int64),
-		metricsHandler:       metricsHandler,
-		emitMetricsTimer:     time.NewTicker(emitMetricsInterval),
-		perShardRPSWarnLimit: perShardRPSWarnLimit,
-		logger:               logger,
-		aggregationEnabled:   aggregationEnabled,
+		status:                    common.DaemonStatusInitialized,
+		shutdownCh:                make(chan struct{}),
+		requestCounts:             make(map[int32]map[string]int64),
+		metricsHandler:            metricsHandler,
+		emitMetricsTimer:          time.NewTicker(emitMetricsInterval),
+		perShardRPSWarnLimit:      perShardRPSWarnLimit,
+		perShardPerNsRPSWarnLimit: perShardPerNsRPSWarnLimit,
+		logger:                    logger,
+		aggregationEnabled:        aggregationEnabled,
 	}
 
 	if aggregationEnabled {
@@ -113,7 +117,7 @@ func (s *HealthSignalAggregatorImpl) Stop() {
 	s.emitMetricsTimer.Stop()
 }
 
-func (s *HealthSignalAggregatorImpl) Record(callerSegment int32, latency time.Duration, err error) {
+func (s *HealthSignalAggregatorImpl) Record(callerSegment int32, namespace string, latency time.Duration, err error) {
 	if s.aggregationEnabled {
 		s.latencyAverage.Record(latency.Milliseconds())
 
@@ -125,7 +129,7 @@ func (s *HealthSignalAggregatorImpl) Record(callerSegment int32, latency time.Du
 	}
 
 	if callerSegment != CallerSegmentMissing {
-		s.incrementShardRequestCount(callerSegment)
+		s.incrementShardRequestCount(callerSegment, namespace)
 	}
 }

@@ -137,10 +141,13 @@ func (s *HealthSignalAggregatorImpl) ErrorRatio() float64 {
 	return s.errorRatio.Average()
 }
 
-func (s *HealthSignalAggregatorImpl) incrementShardRequestCount(shardID int32) {
+func (s *HealthSignalAggregatorImpl) incrementShardRequestCount(shardID int32, namespace string) {
 	s.requestsLock.Lock()
 	defer s.requestsLock.Unlock()
-	s.requestsPerShard[shardID]++
+	if s.requestCounts[shardID] == nil {
+		s.requestCounts[shardID] = make(map[string]int64)
+	}
+	s.requestCounts[shardID][namespace]++
 }
 
 func (s *HealthSignalAggregatorImpl) emitMetricsLoop() {
@@ -150,15 +157,24 @@ func (s *HealthSignalAggregatorImpl) emitMetricsLoop() {
 			return
 		case <-s.emitMetricsTimer.C:
 			s.requestsLock.Lock()
-			requestCounts := s.requestsPerShard
-			s.requestsPerShard = make(map[int32]int64, len(requestCounts))
+			requestCounts := s.requestCounts
+			s.requestCounts = make(map[int32]map[string]int64, len(requestCounts))
 			s.requestsLock.Unlock()
 
-			for shardID, count := range requestCounts {
-				shardRPS := int64(float64(count) / emitMetricsInterval.Seconds())
+			for shardID, requestCountPerNS := range requestCounts {
+				shardRequestCount := int64(0)
+				for namespace, count := range requestCountPerNS {
+					shardRequestCount += count
+					shardRPSPerNS := int64(float64(count) / emitMetricsInterval.Seconds())
+					if s.perShardPerNsRPSWarnLimit() > 0.0 && shardRPSPerNS > int64(s.perShardPerNsRPSWarnLimit()*float64(s.perShardRPSWarnLimit())) {
+						s.logger.Warn("Per shard per namespace RPS warn limit exceeded", tag.ShardID(shardID), tag.WorkflowNamespace(namespace), tag.RPS(shardRPSPerNS))
+					}
+				}
+
+				shardRPS := int64(float64(shardRequestCount) / emitMetricsInterval.Seconds())
 				s.metricsHandler.Histogram(metrics.PersistenceShardRPS.GetMetricName(), metrics.PersistenceShardRPS.GetMetricUnit()).Record(shardRPS)
 				if shardRPS > int64(s.perShardRPSWarnLimit()) {
-					s.logger.Warn("Per shard RPS warn limit exceeded", tag.ShardID(shardID))
+					s.logger.Warn("Per shard RPS warn limit exceeded", tag.ShardID(shardID), tag.RPS(shardRPS))
 				}
 			}
 		}
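The loop's structure is worth noting: Record only bumps a counter under the mutex, and on each tick the whole map is swapped out under the lock so the RPS math and logging run without blocking writers. A self-contained sketch of that swap-and-aggregate pattern (simplified names; the interval, limits, and counts are illustrative, not this commit's values):

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	type counter struct {
		mu     sync.Mutex
		counts map[int32]map[string]int64 // shardID -> namespace -> requests
	}

	func (c *counter) inc(shard int32, ns string) {
		c.mu.Lock()
		defer c.mu.Unlock()
		if c.counts[shard] == nil {
			c.counts[shard] = make(map[string]int64)
		}
		c.counts[shard][ns]++
	}

	// flush swaps the map under the lock so the rate math happens without
	// blocking concurrent inc calls, mirroring emitMetricsLoop above.
	func (c *counter) flush(interval time.Duration, perShardLimit int64, perNsPercent float64) {
		c.mu.Lock()
		snapshot := c.counts
		c.counts = make(map[int32]map[string]int64, len(snapshot))
		c.mu.Unlock()

		for shard, perNs := range snapshot {
			var total int64
			for ns, n := range perNs {
				total += n
				nsRPS := int64(float64(n) / interval.Seconds())
				if perNsPercent > 0 && nsRPS > int64(perNsPercent*float64(perShardLimit)) {
					fmt.Printf("WARN shard=%d namespace=%s rps=%d (per-ns limit)\n", shard, ns, nsRPS)
				}
			}
			if rps := int64(float64(total) / interval.Seconds()); rps > perShardLimit {
				fmt.Printf("WARN shard=%d rps=%d (per-shard limit)\n", shard, rps)
			}
		}
	}

	func main() {
		c := &counter{counts: make(map[int32]map[string]int64)}
		for i := 0; i < 500; i++ {
			c.inc(1, "busy-ns")
		}
		c.inc(1, "quiet-ns")
		// busy-ns sustains 50 RPS > the per-ns threshold of 0.8*50 = 40, so it warns;
		// the shard total rounds to 50 RPS, which does not exceed the per-shard limit.
		c.flush(10*time.Second, 50, 0.8)
	}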
2 changes: 1 addition & 1 deletion common/persistence/noop_health_signal_aggregator.go
@@ -40,7 +40,7 @@ func (a *noopSignalAggregator) Start() {}
 
 func (a *noopSignalAggregator) Stop() {}
 
-func (a *noopSignalAggregator) Record(_ int32, _ time.Duration, _ error) {}
+func (a *noopSignalAggregator) Record(_ int32, _ string, _ time.Duration, _ error) {}
 
 func (a *noopSignalAggregator) AverageLatency() float64 {
 	return 0
