From 4edce52ec49499012f88e7fc9eb083ae0c4e0d6b Mon Sep 17 00:00:00 2001 From: pdoerner <122412190+pdoerner@users.noreply.github.com> Date: Tue, 27 Jun 2023 15:18:15 -0700 Subject: [PATCH] Per shard per namespace RPS warning log (#4525) * add warning log for high per shard per ns rps --- common/dynamicconfig/constants.go | 3 + common/log/tag/tags.go | 5 + common/persistence/client/fx.go | 3 +- .../persistence/health_signal_aggregator.go | 62 +++++--- .../noop_health_signal_aggregator.go | 2 +- .../persistence/persistenceMetricClients.go | 148 +++++++++--------- 6 files changed, 124 insertions(+), 99 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index a2d5ed041ea..02d6fbce903 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -116,6 +116,9 @@ const ( PersistenceHealthSignalBufferSize = "system.persistenceHealthSignalBufferSize" // ShardRPSWarnLimit is the per-shard RPS limit for warning ShardRPSWarnLimit = "system.shardRPSWarnLimit" + // ShardPerNsRPSWarnPercent is the per-shard per-namespace RPS limit for warning as a percentage of ShardRPSWarnLimit + // these warning are not emitted if the value is set to 0 or less + ShardPerNsRPSWarnPercent = "system.shardPerNsRPSWarnPercent" // Whether the deadlock detector should dump goroutines DeadlockDumpGoroutines = "system.deadlock.DumpGoroutines" diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go index 9dcdf44360e..1b66efc15e3 100644 --- a/common/log/tag/tags.go +++ b/common/log/tag/tags.go @@ -477,6 +477,11 @@ func RequestCount(c int) ZapTag { return NewInt("request-count", c) } +// RPS returns tag for requests per second +func RPS(c int64) ZapTag { + return NewInt64("rps", c) +} + // Number returns tag for Number func Number(n int64) ZapTag { return NewInt64("number", n) diff --git a/common/persistence/client/fx.go b/common/persistence/client/fx.go index 69ebc06c1b7..244a29ac0c5 100644 --- a/common/persistence/client/fx.go +++ b/common/persistence/client/fx.go @@ -116,7 +116,7 @@ func FactoryProvider( func HealthSignalAggregatorProvider( dynamicCollection *dynamicconfig.Collection, metricsHandler metrics.Handler, - logger log.Logger, + logger log.ThrottledLogger, ) persistence.HealthSignalAggregator { if dynamicCollection.GetBoolProperty(dynamicconfig.PersistenceHealthSignalMetricsEnabled, true)() { return persistence.NewHealthSignalAggregatorImpl( @@ -125,6 +125,7 @@ func HealthSignalAggregatorProvider( dynamicCollection.GetIntProperty(dynamicconfig.PersistenceHealthSignalBufferSize, 5000)(), metricsHandler, dynamicCollection.GetIntProperty(dynamicconfig.ShardRPSWarnLimit, 50), + dynamicCollection.GetFloat64Property(dynamicconfig.ShardPerNsRPSWarnPercent, 0.8), logger, ) } diff --git a/common/persistence/health_signal_aggregator.go b/common/persistence/health_signal_aggregator.go index 2a63d0f7edc..ff83174d6d5 100644 --- a/common/persistence/health_signal_aggregator.go +++ b/common/persistence/health_signal_aggregator.go @@ -44,7 +44,7 @@ const ( type ( HealthSignalAggregator interface { common.Daemon - Record(callerSegment int32, latency time.Duration, err error) + Record(callerSegment int32, namespace string, latency time.Duration, err error) AverageLatency() float64 ErrorRatio() float64 } @@ -53,16 +53,18 @@ type ( status int32 shutdownCh chan struct{} - requestsPerShard map[int32]int64 - requestsLock sync.Mutex + // map of shardID -> map of namespace -> request count + requestCounts map[int32]map[string]int64 + requestsLock sync.Mutex aggregationEnabled bool latencyAverage aggregate.MovingWindowAverage errorRatio aggregate.MovingWindowAverage - metricsHandler metrics.Handler - emitMetricsTimer *time.Ticker - perShardRPSWarnLimit dynamicconfig.IntPropertyFn + metricsHandler metrics.Handler + emitMetricsTimer *time.Ticker + perShardRPSWarnLimit dynamicconfig.IntPropertyFn + perShardPerNsRPSWarnLimit dynamicconfig.FloatPropertyFn logger log.Logger } @@ -74,17 +76,19 @@ func NewHealthSignalAggregatorImpl( maxBufferSize int, metricsHandler metrics.Handler, perShardRPSWarnLimit dynamicconfig.IntPropertyFn, + perShardPerNsRPSWarnLimit dynamicconfig.FloatPropertyFn, logger log.Logger, ) *HealthSignalAggregatorImpl { ret := &HealthSignalAggregatorImpl{ - status: common.DaemonStatusInitialized, - shutdownCh: make(chan struct{}), - requestsPerShard: make(map[int32]int64), - metricsHandler: metricsHandler, - emitMetricsTimer: time.NewTicker(emitMetricsInterval), - perShardRPSWarnLimit: perShardRPSWarnLimit, - logger: logger, - aggregationEnabled: aggregationEnabled, + status: common.DaemonStatusInitialized, + shutdownCh: make(chan struct{}), + requestCounts: make(map[int32]map[string]int64), + metricsHandler: metricsHandler, + emitMetricsTimer: time.NewTicker(emitMetricsInterval), + perShardRPSWarnLimit: perShardRPSWarnLimit, + perShardPerNsRPSWarnLimit: perShardPerNsRPSWarnLimit, + logger: logger, + aggregationEnabled: aggregationEnabled, } if aggregationEnabled { @@ -113,7 +117,7 @@ func (s *HealthSignalAggregatorImpl) Stop() { s.emitMetricsTimer.Stop() } -func (s *HealthSignalAggregatorImpl) Record(callerSegment int32, latency time.Duration, err error) { +func (s *HealthSignalAggregatorImpl) Record(callerSegment int32, namespace string, latency time.Duration, err error) { if s.aggregationEnabled { s.latencyAverage.Record(latency.Milliseconds()) @@ -125,7 +129,7 @@ func (s *HealthSignalAggregatorImpl) Record(callerSegment int32, latency time.Du } if callerSegment != CallerSegmentMissing { - s.incrementShardRequestCount(callerSegment) + s.incrementShardRequestCount(callerSegment, namespace) } } @@ -137,10 +141,13 @@ func (s *HealthSignalAggregatorImpl) ErrorRatio() float64 { return s.errorRatio.Average() } -func (s *HealthSignalAggregatorImpl) incrementShardRequestCount(shardID int32) { +func (s *HealthSignalAggregatorImpl) incrementShardRequestCount(shardID int32, namespace string) { s.requestsLock.Lock() defer s.requestsLock.Unlock() - s.requestsPerShard[shardID]++ + if s.requestCounts[shardID] == nil { + s.requestCounts[shardID] = make(map[string]int64) + } + s.requestCounts[shardID][namespace]++ } func (s *HealthSignalAggregatorImpl) emitMetricsLoop() { @@ -150,15 +157,24 @@ func (s *HealthSignalAggregatorImpl) emitMetricsLoop() { return case <-s.emitMetricsTimer.C: s.requestsLock.Lock() - requestCounts := s.requestsPerShard - s.requestsPerShard = make(map[int32]int64, len(requestCounts)) + requestCounts := s.requestCounts + s.requestCounts = make(map[int32]map[string]int64, len(requestCounts)) s.requestsLock.Unlock() - for shardID, count := range requestCounts { - shardRPS := int64(float64(count) / emitMetricsInterval.Seconds()) + for shardID, requestCountPerNS := range requestCounts { + shardRequestCount := int64(0) + for namespace, count := range requestCountPerNS { + shardRequestCount += count + shardRPSPerNS := int64(float64(count) / emitMetricsInterval.Seconds()) + if s.perShardPerNsRPSWarnLimit() > 0.0 && shardRPSPerNS > int64(s.perShardPerNsRPSWarnLimit()*float64(s.perShardRPSWarnLimit())) { + s.logger.Warn("Per shard per namespace RPS warn limit exceeded", tag.ShardID(shardID), tag.WorkflowNamespace(namespace), tag.RPS(shardRPSPerNS)) + } + } + + shardRPS := int64(float64(shardRequestCount) / emitMetricsInterval.Seconds()) s.metricsHandler.Histogram(metrics.PersistenceShardRPS.GetMetricName(), metrics.PersistenceShardRPS.GetMetricUnit()).Record(shardRPS) if shardRPS > int64(s.perShardRPSWarnLimit()) { - s.logger.Warn("Per shard RPS warn limit exceeded", tag.ShardID(shardID)) + s.logger.Warn("Per shard RPS warn limit exceeded", tag.ShardID(shardID), tag.RPS(shardRPS)) } } } diff --git a/common/persistence/noop_health_signal_aggregator.go b/common/persistence/noop_health_signal_aggregator.go index dff0c151d28..f1200886b65 100644 --- a/common/persistence/noop_health_signal_aggregator.go +++ b/common/persistence/noop_health_signal_aggregator.go @@ -40,7 +40,7 @@ func (a *noopSignalAggregator) Start() {} func (a *noopSignalAggregator) Stop() {} -func (a *noopSignalAggregator) Record(_ int32, _ time.Duration, _ error) {} +func (a *noopSignalAggregator) Record(_ int32, _ string, _ time.Duration, _ error) {} func (a *noopSignalAggregator) AverageLatency() float64 { return 0 diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go index 625c8f57ba3..b8444a627b3 100644 --- a/common/persistence/persistenceMetricClients.go +++ b/common/persistence/persistenceMetricClients.go @@ -172,7 +172,7 @@ func (p *shardPersistenceClient) GetOrCreateShard( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetOrCreateShardScope, caller, latency, retErr) }() return p.persistence.GetOrCreateShard(ctx, request) @@ -186,7 +186,7 @@ func (p *shardPersistenceClient) UpdateShard( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardInfo.GetShardId(), latency, retErr) + p.healthSignals.Record(request.ShardInfo.GetShardId(), caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceUpdateShardScope, caller, latency, retErr) }() return p.persistence.UpdateShard(ctx, request) @@ -200,7 +200,7 @@ func (p *shardPersistenceClient) AssertShardOwnership( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceAssertShardOwnershipScope, caller, latency, retErr) }() return p.persistence.AssertShardOwnership(ctx, request) @@ -226,7 +226,7 @@ func (p *executionPersistenceClient) CreateWorkflowExecution( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceCreateWorkflowExecutionScope, caller, latency, retErr) }() return p.persistence.CreateWorkflowExecution(ctx, request) @@ -240,7 +240,7 @@ func (p *executionPersistenceClient) GetWorkflowExecution( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetWorkflowExecutionScope, caller, latency, retErr) }() return p.persistence.GetWorkflowExecution(ctx, request) @@ -254,7 +254,7 @@ func (p *executionPersistenceClient) SetWorkflowExecution( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceSetWorkflowExecutionScope, caller, latency, retErr) }() return p.persistence.SetWorkflowExecution(ctx, request) @@ -268,7 +268,7 @@ func (p *executionPersistenceClient) UpdateWorkflowExecution( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceUpdateWorkflowExecutionScope, caller, latency, retErr) }() return p.persistence.UpdateWorkflowExecution(ctx, request) @@ -282,7 +282,7 @@ func (p *executionPersistenceClient) ConflictResolveWorkflowExecution( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceConflictResolveWorkflowExecutionScope, caller, latency, retErr) }() return p.persistence.ConflictResolveWorkflowExecution(ctx, request) @@ -296,7 +296,7 @@ func (p *executionPersistenceClient) DeleteWorkflowExecution( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceDeleteWorkflowExecutionScope, caller, latency, retErr) }() return p.persistence.DeleteWorkflowExecution(ctx, request) @@ -310,7 +310,7 @@ func (p *executionPersistenceClient) DeleteCurrentWorkflowExecution( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceDeleteCurrentWorkflowExecutionScope, caller, latency, retErr) }() return p.persistence.DeleteCurrentWorkflowExecution(ctx, request) @@ -324,7 +324,7 @@ func (p *executionPersistenceClient) GetCurrentExecution( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetCurrentExecutionScope, caller, latency, retErr) }() return p.persistence.GetCurrentExecution(ctx, request) @@ -338,7 +338,7 @@ func (p *executionPersistenceClient) ListConcreteExecutions( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceListConcreteExecutionsScope, caller, latency, retErr) }() return p.persistence.ListConcreteExecutions(ctx, request) @@ -379,7 +379,7 @@ func (p *executionPersistenceClient) AddHistoryTasks( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceAddTasksScope, caller, latency, retErr) }() return p.persistence.AddHistoryTasks(ctx, request) @@ -409,7 +409,7 @@ func (p *executionPersistenceClient) GetHistoryTasks( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(operation, caller, latency, retErr) }() return p.persistence.GetHistoryTasks(ctx, request) @@ -439,7 +439,7 @@ func (p *executionPersistenceClient) CompleteHistoryTask( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(operation, caller, latency, retErr) }() return p.persistence.CompleteHistoryTask(ctx, request) @@ -469,7 +469,7 @@ func (p *executionPersistenceClient) RangeCompleteHistoryTasks( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(operation, caller, latency, retErr) }() return p.persistence.RangeCompleteHistoryTasks(ctx, request) @@ -483,7 +483,7 @@ func (p *executionPersistenceClient) PutReplicationTaskToDLQ( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistencePutReplicationTaskToDLQScope, caller, latency, retErr) }() return p.persistence.PutReplicationTaskToDLQ(ctx, request) @@ -497,7 +497,7 @@ func (p *executionPersistenceClient) GetReplicationTasksFromDLQ( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetReplicationTasksFromDLQScope, caller, latency, retErr) }() return p.persistence.GetReplicationTasksFromDLQ(ctx, request) @@ -511,7 +511,7 @@ func (p *executionPersistenceClient) DeleteReplicationTaskFromDLQ( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceDeleteReplicationTaskFromDLQScope, caller, latency, retErr) }() return p.persistence.DeleteReplicationTaskFromDLQ(ctx, request) @@ -525,7 +525,7 @@ func (p *executionPersistenceClient) RangeDeleteReplicationTaskFromDLQ( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceRangeDeleteReplicationTaskFromDLQScope, caller, latency, retErr) }() return p.persistence.RangeDeleteReplicationTaskFromDLQ(ctx, request) @@ -539,7 +539,7 @@ func (p *executionPersistenceClient) IsReplicationDLQEmpty( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(request.ShardID, latency, retErr) + p.healthSignals.Record(request.ShardID, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetReplicationTasksFromDLQScope, caller, latency, retErr) }() return p.persistence.IsReplicationDLQEmpty(ctx, request) @@ -561,7 +561,7 @@ func (p *taskPersistenceClient) CreateTasks( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceCreateTasksScope, caller, latency, retErr) }() return p.persistence.CreateTasks(ctx, request) @@ -575,7 +575,7 @@ func (p *taskPersistenceClient) GetTasks( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetTasksScope, caller, latency, retErr) }() return p.persistence.GetTasks(ctx, request) @@ -589,7 +589,7 @@ func (p *taskPersistenceClient) CompleteTask( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceCompleteTaskScope, caller, latency, retErr) }() return p.persistence.CompleteTask(ctx, request) @@ -603,7 +603,7 @@ func (p *taskPersistenceClient) CompleteTasksLessThan( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceCompleteTasksLessThanScope, caller, latency, retErr) }() return p.persistence.CompleteTasksLessThan(ctx, request) @@ -617,7 +617,7 @@ func (p *taskPersistenceClient) CreateTaskQueue( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceCreateTaskQueueScope, caller, latency, retErr) }() return p.persistence.CreateTaskQueue(ctx, request) @@ -631,7 +631,7 @@ func (p *taskPersistenceClient) UpdateTaskQueue( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceUpdateTaskQueueScope, caller, latency, retErr) }() return p.persistence.UpdateTaskQueue(ctx, request) @@ -645,7 +645,7 @@ func (p *taskPersistenceClient) GetTaskQueue( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetTaskQueueScope, caller, latency, retErr) }() return p.persistence.GetTaskQueue(ctx, request) @@ -659,7 +659,7 @@ func (p *taskPersistenceClient) ListTaskQueue( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceListTaskQueueScope, caller, latency, retErr) }() return p.persistence.ListTaskQueue(ctx, request) @@ -673,7 +673,7 @@ func (p *taskPersistenceClient) DeleteTaskQueue( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceDeleteTaskQueueScope, caller, latency, retErr) }() return p.persistence.DeleteTaskQueue(ctx, request) @@ -687,7 +687,7 @@ func (p *taskPersistenceClient) GetTaskQueueUserData( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetTaskQueueUserDataScope, caller, latency, retErr) }() return p.persistence.GetTaskQueueUserData(ctx, request) @@ -701,7 +701,7 @@ func (p *taskPersistenceClient) UpdateTaskQueueUserData( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceUpdateTaskQueueUserDataScope, caller, latency, retErr) }() return p.persistence.UpdateTaskQueueUserData(ctx, request) @@ -715,7 +715,7 @@ func (p *taskPersistenceClient) ListTaskQueueUserDataEntries( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceListTaskQueueUserDataEntriesScope, caller, latency, retErr) }() return p.persistence.ListTaskQueueUserDataEntries(ctx, request) @@ -726,7 +726,7 @@ func (p *taskPersistenceClient) GetTaskQueuesByBuildId(ctx context.Context, requ startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetTaskQueuesByBuildIdScope, caller, latency, retErr) }() return p.persistence.GetTaskQueuesByBuildId(ctx, request) @@ -737,7 +737,7 @@ func (p *taskPersistenceClient) CountTaskQueuesByBuildId(ctx context.Context, re startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceCountTaskQueuesByBuildIdScope, caller, latency, retErr) }() return p.persistence.CountTaskQueuesByBuildId(ctx, request) @@ -759,7 +759,7 @@ func (p *metadataPersistenceClient) CreateNamespace( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceCreateNamespaceScope, caller, latency, retErr) }() return p.persistence.CreateNamespace(ctx, request) @@ -773,7 +773,7 @@ func (p *metadataPersistenceClient) GetNamespace( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetNamespaceScope, caller, latency, retErr) }() return p.persistence.GetNamespace(ctx, request) @@ -787,7 +787,7 @@ func (p *metadataPersistenceClient) UpdateNamespace( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceUpdateNamespaceScope, caller, latency, retErr) }() return p.persistence.UpdateNamespace(ctx, request) @@ -801,7 +801,7 @@ func (p *metadataPersistenceClient) RenameNamespace( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceRenameNamespaceScope, caller, latency, retErr) }() return p.persistence.RenameNamespace(ctx, request) @@ -815,7 +815,7 @@ func (p *metadataPersistenceClient) DeleteNamespace( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceDeleteNamespaceScope, caller, latency, retErr) }() return p.persistence.DeleteNamespace(ctx, request) @@ -829,7 +829,7 @@ func (p *metadataPersistenceClient) DeleteNamespaceByName( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceDeleteNamespaceByNameScope, caller, latency, retErr) }() return p.persistence.DeleteNamespaceByName(ctx, request) @@ -843,7 +843,7 @@ func (p *metadataPersistenceClient) ListNamespaces( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceListNamespacesScope, caller, latency, retErr) }() return p.persistence.ListNamespaces(ctx, request) @@ -856,7 +856,7 @@ func (p *metadataPersistenceClient) GetMetadata( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetMetadataScope, caller, latency, retErr) }() return p.persistence.GetMetadata(ctx) @@ -875,7 +875,7 @@ func (p *executionPersistenceClient) AppendHistoryNodes( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceAppendHistoryNodesScope, caller, latency, retErr) }() return p.persistence.AppendHistoryNodes(ctx, request) @@ -890,7 +890,7 @@ func (p *executionPersistenceClient) AppendRawHistoryNodes( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceAppendRawHistoryNodesScope, caller, latency, retErr) }() return p.persistence.AppendRawHistoryNodes(ctx, request) @@ -905,7 +905,7 @@ func (p *executionPersistenceClient) ReadHistoryBranch( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceReadHistoryBranchScope, caller, latency, retErr) }() return p.persistence.ReadHistoryBranch(ctx, request) @@ -919,7 +919,7 @@ func (p *executionPersistenceClient) ReadHistoryBranchReverse( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceReadHistoryBranchReverseScope, caller, latency, retErr) }() return p.persistence.ReadHistoryBranchReverse(ctx, request) @@ -934,7 +934,7 @@ func (p *executionPersistenceClient) ReadHistoryBranchByBatch( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceReadHistoryBranchScope, caller, latency, retErr) }() return p.persistence.ReadHistoryBranchByBatch(ctx, request) @@ -949,7 +949,7 @@ func (p *executionPersistenceClient) ReadRawHistoryBranch( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceReadRawHistoryBranchScope, caller, latency, retErr) }() return p.persistence.ReadRawHistoryBranch(ctx, request) @@ -964,7 +964,7 @@ func (p *executionPersistenceClient) ForkHistoryBranch( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceForkHistoryBranchScope, caller, latency, retErr) }() return p.persistence.ForkHistoryBranch(ctx, request) @@ -979,7 +979,7 @@ func (p *executionPersistenceClient) DeleteHistoryBranch( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceDeleteHistoryBranchScope, caller, latency, retErr) }() return p.persistence.DeleteHistoryBranch(ctx, request) @@ -994,7 +994,7 @@ func (p *executionPersistenceClient) TrimHistoryBranch( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceTrimHistoryBranchScope, caller, latency, retErr) }() return p.persistence.TrimHistoryBranch(ctx, request) @@ -1008,7 +1008,7 @@ func (p *executionPersistenceClient) GetAllHistoryTreeBranches( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetAllHistoryTreeBranchesScope, caller, latency, retErr) }() return p.persistence.GetAllHistoryTreeBranches(ctx, request) @@ -1023,7 +1023,7 @@ func (p *executionPersistenceClient) GetHistoryTree( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetHistoryTreeScope, caller, latency, retErr) }() return p.persistence.GetHistoryTree(ctx, request) @@ -1044,7 +1044,7 @@ func (p *queuePersistenceClient) EnqueueMessage( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceEnqueueMessageScope, caller, latency, retErr) }() return p.persistence.EnqueueMessage(ctx, blob) @@ -1059,7 +1059,7 @@ func (p *queuePersistenceClient) ReadMessages( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceReadQueueMessagesScope, caller, latency, retErr) }() return p.persistence.ReadMessages(ctx, lastMessageID, maxCount) @@ -1073,7 +1073,7 @@ func (p *queuePersistenceClient) UpdateAckLevel( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceUpdateAckLevelScope, caller, latency, retErr) }() return p.persistence.UpdateAckLevel(ctx, metadata) @@ -1086,7 +1086,7 @@ func (p *queuePersistenceClient) GetAckLevels( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetAckLevelScope, caller, latency, retErr) }() return p.persistence.GetAckLevels(ctx) @@ -1100,7 +1100,7 @@ func (p *queuePersistenceClient) DeleteMessagesBefore( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceDeleteMessagesBeforeScope, caller, latency, retErr) }() return p.persistence.DeleteMessagesBefore(ctx, messageID) @@ -1114,7 +1114,7 @@ func (p *queuePersistenceClient) EnqueueMessageToDLQ( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceEnqueueMessageToDLQScope, caller, latency, retErr) }() return p.persistence.EnqueueMessageToDLQ(ctx, blob) @@ -1131,7 +1131,7 @@ func (p *queuePersistenceClient) ReadMessagesFromDLQ( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceReadMessagesFromDLQScope, caller, latency, retErr) }() return p.persistence.ReadMessagesFromDLQ(ctx, firstMessageID, lastMessageID, pageSize, pageToken) @@ -1145,7 +1145,7 @@ func (p *queuePersistenceClient) DeleteMessageFromDLQ( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceDeleteMessageFromDLQScope, caller, latency, retErr) }() return p.persistence.DeleteMessageFromDLQ(ctx, messageID) @@ -1160,7 +1160,7 @@ func (p *queuePersistenceClient) RangeDeleteMessagesFromDLQ( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceRangeDeleteMessagesFromDLQScope, caller, latency, retErr) }() return p.persistence.RangeDeleteMessagesFromDLQ(ctx, firstMessageID, lastMessageID) @@ -1174,7 +1174,7 @@ func (p *queuePersistenceClient) UpdateDLQAckLevel( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceUpdateDLQAckLevelScope, caller, latency, retErr) }() return p.persistence.UpdateDLQAckLevel(ctx, metadata) @@ -1187,7 +1187,7 @@ func (p *queuePersistenceClient) GetDLQAckLevels( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetDLQAckLevelScope, caller, latency, retErr) }() return p.persistence.GetDLQAckLevels(ctx) @@ -1209,7 +1209,7 @@ func (p *clusterMetadataPersistenceClient) ListClusterMetadata( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceListClusterMetadataScope, caller, latency, retErr) }() return p.persistence.ListClusterMetadata(ctx, request) @@ -1222,7 +1222,7 @@ func (p *clusterMetadataPersistenceClient) GetCurrentClusterMetadata( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetCurrentClusterMetadataScope, caller, latency, retErr) }() return p.persistence.GetCurrentClusterMetadata(ctx) @@ -1236,7 +1236,7 @@ func (p *clusterMetadataPersistenceClient) GetClusterMetadata( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetClusterMetadataScope, caller, latency, retErr) }() return p.persistence.GetClusterMetadata(ctx, request) @@ -1250,7 +1250,7 @@ func (p *clusterMetadataPersistenceClient) SaveClusterMetadata( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceSaveClusterMetadataScope, caller, latency, retErr) }() return p.persistence.SaveClusterMetadata(ctx, request) @@ -1264,7 +1264,7 @@ func (p *clusterMetadataPersistenceClient) DeleteClusterMetadata( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceDeleteClusterMetadataScope, caller, latency, retErr) }() return p.persistence.DeleteClusterMetadata(ctx, request) @@ -1282,7 +1282,7 @@ func (p *clusterMetadataPersistenceClient) GetClusterMembers( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceGetClusterMembersScope, caller, latency, retErr) }() return p.persistence.GetClusterMembers(ctx, request) @@ -1296,7 +1296,7 @@ func (p *clusterMetadataPersistenceClient) UpsertClusterMembership( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceUpsertClusterMembershipScope, caller, latency, retErr) }() return p.persistence.UpsertClusterMembership(ctx, request) @@ -1310,7 +1310,7 @@ func (p *clusterMetadataPersistenceClient) PruneClusterMembership( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistencePruneClusterMembershipScope, caller, latency, retErr) }() return p.persistence.PruneClusterMembership(ctx, request) @@ -1324,7 +1324,7 @@ func (p *metadataPersistenceClient) InitializeSystemNamespaces( startTime := time.Now().UTC() defer func() { latency := time.Since(startTime) - p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.healthSignals.Record(CallerSegmentMissing, caller, latency, retErr) p.recordRequestMetrics(metrics.PersistenceInitializeSystemNamespaceScope, caller, latency, retErr) }() return p.persistence.InitializeSystemNamespaces(ctx, currentClusterName)