Skip to content

Commit

Permalink
Added internal count metrics in addition to the external counts that are already there
Browse files Browse the repository at this point in the history
  • Loading branch information
jakobht committed Jun 14, 2024
1 parent ceff8ca commit c40121a
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 13 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2476,6 +2476,7 @@ const (
WorkflowIDCacheSizeGauge
WorkflowIDCacheRequestsExternalRatelimitedCounter
WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer
WorkflowIDCacheRequestsInternalMaxRequestsPerSecondsTimer
WorkflowIDCacheRequestsInternalRatelimitedCounter
NumHistoryMetrics
)
Expand Down Expand Up @@ -3115,6 +3116,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
WorkflowIDCacheSizeGauge: {metricName: "workflow_id_cache_size", metricType: Gauge},
WorkflowIDCacheRequestsExternalRatelimitedCounter: {metricName: "workflow_id_external_requests_ratelimited", metricType: Counter},
WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer: {metricName: "workflow_id_external_requests_max_requests_per_seconds", metricType: Timer},
WorkflowIDCacheRequestsInternalMaxRequestsPerSecondsTimer: {metricName: "workflow_id_internal_requests_max_requests_per_seconds", metricType: Timer},
WorkflowIDCacheRequestsInternalRatelimitedCounter: {metricName: "workflow_id_internal_requests_ratelimited", metricType: Counter},
},
Matching: {
Expand Down
6 changes: 4 additions & 2 deletions service/history/workflowcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ type cacheKey struct {
type cacheValue struct {
externalRateLimiter quotas.Limiter
internalRateLimiter quotas.Limiter
countMetric workflowIDCountMetric
externalCountMetric workflowIDCountMetric
internalCountMetric workflowIDCountMetric
}

// Params is the parameters for a new WFCache
Expand Down Expand Up @@ -144,13 +145,14 @@ func (c *wfCache) allow(domainID string, workflowID string, rateLimitType rateLi

switch rateLimitType {
case external:
value.countMetric.updatePerDomainMaxWFRequestCount(domainName, c.timeSource, c.metricsClient)
value.externalCountMetric.updatePerDomainMaxWFRequestCount(domainName, c.timeSource, c.metricsClient, metrics.WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer)
if !value.externalRateLimiter.Allow() {
c.emitRateLimitMetrics(domainID, workflowID, domainName, "external", metrics.WorkflowIDCacheRequestsExternalRatelimitedCounter)
return false
}
return true
case internal:
value.internalCountMetric.updatePerDomainMaxWFRequestCount(domainName, c.timeSource, c.metricsClient, metrics.WorkflowIDCacheRequestsInternalMaxRequestsPerSecondsTimer)
if !value.internalRateLimiter.Allow() {
c.emitRateLimitMetrics(domainID, workflowID, domainName, "internal", metrics.WorkflowIDCacheRequestsInternalRatelimitedCounter)
return false
Expand Down
3 changes: 2 additions & 1 deletion service/history/workflowcache/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (cm *workflowIDCountMetric) updatePerDomainMaxWFRequestCount(
domainName string,
timeSource clock.TimeSource,
metricsClient metrics.Client,
metric int,
) {
cm.Lock()
defer cm.Unlock()
Expand All @@ -61,5 +62,5 @@ func (cm *workflowIDCountMetric) updatePerDomainMaxWFRequestCount(

// We can just use the upper of the metric, so it is not an issue to emit all the counts
metricsClient.Scope(metrics.HistoryClientWfIDCacheScope, metrics.DomainTag(domainName)).
RecordTimer(metrics.WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer, time.Duration(cm.count))
RecordTimer(metric, time.Duration(cm.count))
}
21 changes: 11 additions & 10 deletions service/history/workflowcache/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

func TestUpdatePerDomainMaxWFRequestCount(t *testing.T) {
domainName := "some domain name"
metric := metrics.WorkflowIDCacheRequestsInternalMaxRequestsPerSecondsTimer

cases := []struct {
name string
Expand All @@ -45,23 +46,23 @@ func TestUpdatePerDomainMaxWFRequestCount(t *testing.T) {
name: "Single workflowID",
updatePerDomainMaxWFRequestCount: func(metricsClient metrics.Client, timeSource clock.MockedTimeSource) {
workflowID1 := &workflowIDCountMetric{}
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 1
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 2
},
expecetMetrics: []time.Duration{1, 2},
},
{
name: "Separate workflowIDs",
updatePerDomainMaxWFRequestCount: func(metricsClient metrics.Client, timeSource clock.MockedTimeSource) {
workflowID1 := &workflowIDCountMetric{}
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 1

workflowID2 := &workflowIDCountMetric{}
workflowID2.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1
workflowID2.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2
workflowID2.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 3
workflowID2.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 1
workflowID2.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 2
workflowID2.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 3

workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 2

},
expecetMetrics: []time.Duration{1, 1, 2, 3, 2},
Expand All @@ -70,11 +71,11 @@ func TestUpdatePerDomainMaxWFRequestCount(t *testing.T) {
name: "Reset",
updatePerDomainMaxWFRequestCount: func(metricsClient metrics.Client, timeSource clock.MockedTimeSource) {
workflowID1 := &workflowIDCountMetric{}
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 1
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 2

timeSource.Advance(1100 * time.Millisecond)
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 1
},
expecetMetrics: []time.Duration{1, 2, 1},
},
Expand Down

0 comments on commit c40121a

Please sign in to comment.