From 492976a7093d08da0530ad0b92b1ede3e6de70b1 Mon Sep 17 00:00:00 2001 From: Yuhao Zhang Date: Thu, 9 Apr 2026 11:14:13 +0800 Subject: [PATCH 1/5] client/resource_group: cache request source RU metrics Signed-off-by: Yuhao Zhang --- .../controller/global_controller.go | 2 + .../controller/group_controller.go | 59 ++++++++ .../controller/metrics/metrics.go | 12 ++ client/resource_group/controller/model.go | 1 + .../controller/request_source_metrics_test.go | 137 ++++++++++++++++++ client/resource_group/controller/testutil.go | 16 +- 6 files changed, 222 insertions(+), 5 deletions(-) create mode 100644 client/resource_group/controller/request_source_metrics_test.go diff --git a/client/resource_group/controller/global_controller.go b/client/resource_group/controller/global_controller.go index a5e88a090a4..738a7068e48 100644 --- a/client/resource_group/controller/global_controller.go +++ b/client/resource_group/controller/global_controller.go @@ -337,6 +337,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { /* channels */ case <-c.loopCtx.Done(): metrics.ResourceGroupStatusGauge.Reset() + metrics.RequestSourceRUCounter.Reset() return case <-c.responseDeadlineCh: c.run.inDegradedMode.Store(true) @@ -583,6 +584,7 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() { gc.mu.Unlock() if equalRU(latestConsumption, *gc.run.consumption) { if gc.inactive || gc.tombstone.Load() { + gc.metrics.cleanupRequestSourceMetrics(resourceGroupName) c.groupsController.Delete(resourceGroupName) metrics.ResourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName) return true diff --git a/client/resource_group/controller/group_controller.go b/client/resource_group/controller/group_controller.go index 86628e397ad..677fe6dbcf4 100644 --- a/client/resource_group/controller/group_controller.go +++ b/client/resource_group/controller/group_controller.go @@ -106,6 +106,13 @@ type groupMetricsCollection struct { tokenRequestCounter prometheus.Counter runningKVRequestCounter prometheus.Gauge consumeTokenHistogram prometheus.Observer + sourceMetricsMu sync.RWMutex + sourceMetrics map[string]*requestSourceMetrics +} + +type requestSourceMetrics struct { + rru prometheus.Counter + wru prometheus.Counter } func initMetrics(oldName, name string) *groupMetricsCollection { @@ -122,6 +129,52 @@ func initMetrics(oldName, name string) *groupMetricsCollection { tokenRequestCounter: metrics.ResourceGroupTokenRequestCounter.WithLabelValues(oldName, name), runningKVRequestCounter: metrics.GroupRunningKVRequestCounter.WithLabelValues(name), consumeTokenHistogram: metrics.TokenConsumedHistogram.WithLabelValues(name), + sourceMetrics: make(map[string]*requestSourceMetrics), + } +} + +func (mc *groupMetricsCollection) getOrCreateRequestSourceMetrics(resourceGroupName, requestSource string) *requestSourceMetrics { + mc.sourceMetricsMu.RLock() + sourceMetrics, ok := mc.sourceMetrics[requestSource] + mc.sourceMetricsMu.RUnlock() + if ok { + return sourceMetrics + } + + mc.sourceMetricsMu.Lock() + defer mc.sourceMetricsMu.Unlock() + sourceMetrics, ok = mc.sourceMetrics[requestSource] + if ok { + return sourceMetrics + } + sourceMetrics = &requestSourceMetrics{ + rru: metrics.RequestSourceRUCounter.WithLabelValues(resourceGroupName, requestSource, "rru"), + wru: metrics.RequestSourceRUCounter.WithLabelValues(resourceGroupName, requestSource, "wru"), + } + mc.sourceMetrics[requestSource] = sourceMetrics + return sourceMetrics +} + +func (mc *groupMetricsCollection) addRequestSourceRU(resourceGroupName, requestSource string, consumption *rmpb.Consumption) { + if consumption == nil { + return + } + sourceMetrics := mc.getOrCreateRequestSourceMetrics(resourceGroupName, requestSource) + if consumption.RRU > 0 { + sourceMetrics.rru.Add(consumption.RRU) + } + if consumption.WRU > 0 { + sourceMetrics.wru.Add(consumption.WRU) + } +} + +func (mc *groupMetricsCollection) cleanupRequestSourceMetrics(resourceGroupName string) { + mc.sourceMetricsMu.Lock() + defer mc.sourceMetricsMu.Unlock() + for requestSource := range mc.sourceMetrics { + metrics.RequestSourceRUCounter.DeleteLabelValues(resourceGroupName, requestSource, "rru") + metrics.RequestSourceRUCounter.DeleteLabelValues(resourceGroupName, requestSource, "wru") + delete(mc.sourceMetrics, requestSource) } } @@ -577,6 +630,8 @@ func (gc *groupCostController) onRequestWaitImpl( waitDuration += d } + gc.metrics.addRequestSourceRU(gc.name, info.RequestSource(), delta) + gc.mu.Lock() // Calculate the penalty of the store penalty = &rmpb.Consumption{} @@ -622,6 +677,8 @@ func (gc *groupCostController) onResponseImpl( add(gc.mu.globalCounter, count) gc.mu.Unlock() + gc.metrics.addRequestSourceRU(gc.name, req.RequestSource(), delta) + return delta, nil } @@ -663,6 +720,8 @@ func (gc *groupCostController) onResponseWaitImpl( add(gc.mu.globalCounter, count) gc.mu.Unlock() + gc.metrics.addRequestSourceRU(gc.name, req.RequestSource(), delta) + return delta, waitDuration, nil } diff --git a/client/resource_group/controller/metrics/metrics.go b/client/resource_group/controller/metrics/metrics.go index 01231aceeaf..6826518378a 100644 --- a/client/resource_group/controller/metrics/metrics.go +++ b/client/resource_group/controller/metrics/metrics.go @@ -49,6 +49,8 @@ var ( LowTokenRequestNotifyCounter *prometheus.CounterVec // TokenConsumedHistogram comments placeholder TokenConsumedHistogram *prometheus.HistogramVec + // RequestSourceRUCounter comments placeholder + RequestSourceRUCounter *prometheus.CounterVec // FailedTokenRequestDuration comments placeholder, WithLabelValues is a heavy operation, define variable to avoid call it every time. FailedTokenRequestDuration prometheus.Observer // SuccessfulTokenRequestDuration comments placeholder, WithLabelValues is a heavy operation, define variable to avoid call it every time. @@ -153,6 +155,15 @@ func initMetrics(constLabels prometheus.Labels) { ConstLabels: constLabels, }, []string{newResourceGroupNameLabel}) + RequestSourceRUCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "ru_total", + Help: "Counter of request RU consumption grouped by resource group and request source.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel, "request_source", errType}) + // WithLabelValues is a heavy operation, define variable to avoid call it every time. FailedTokenRequestDuration = TokenRequestDuration.WithLabelValues("fail") SuccessfulTokenRequestDuration = TokenRequestDuration.WithLabelValues("success") @@ -171,4 +182,5 @@ func InitAndRegisterMetrics(constLabels prometheus.Labels) { prometheus.MustRegister(ResourceGroupTokenRequestCounter) prometheus.MustRegister(LowTokenRequestNotifyCounter) prometheus.MustRegister(TokenConsumedHistogram) + prometheus.MustRegister(RequestSourceRUCounter) } diff --git a/client/resource_group/controller/model.go b/client/resource_group/controller/model.go index 3fe11286d42..2f4c30887c6 100644 --- a/client/resource_group/controller/model.go +++ b/client/resource_group/controller/model.go @@ -52,6 +52,7 @@ type RequestInfo interface { StoreID() uint64 RequestSize() uint64 AccessLocationType() AccessLocationType + RequestSource() string } // ResponseInfo is the interface of the response information provider. A response should be diff --git a/client/resource_group/controller/request_source_metrics_test.go b/client/resource_group/controller/request_source_metrics_test.go new file mode 100644 index 00000000000..79ae0857b79 --- /dev/null +++ b/client/resource_group/controller/request_source_metrics_test.go @@ -0,0 +1,137 @@ +package controller + +import ( + "context" + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + + controllerMetrics "github.com/tikv/pd/client/resource_group/controller/metrics" +) + +func counterValue(t *testing.T, metric prometheus.Counter) float64 { + t.Helper() + pb := &dto.Metric{} + require.NoError(t, metric.Write(pb)) + return pb.GetCounter().GetValue() +} + +func collectorMetricCount(collector prometheus.Collector) int { + ch := make(chan prometheus.Metric, 8) + go func() { + collector.Collect(ch) + close(ch) + }() + count := 0 + for range ch { + count++ + } + return count +} + +func TestRequestSourceMetricsCachedByResourceGroup(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 100, + numReplicas: 1, + storeID: 1, + accessType: AccessUnknown, + requestSource: "internal_gc_test", + } + resp := &TestResponseInfo{ + readBytes: 128, + succeed: true, + } + + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + + reqConsumption, _, _, _, err := gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + respConsumption, err := gc.onResponseImpl(req, resp) + re.NoError(err) + re.NotZero(reqConsumption.WRU) + re.NotZero(respConsumption.RRU) + + gc.metrics.sourceMetricsMu.RLock() + sourceMetrics := gc.metrics.sourceMetrics[req.requestSource] + cacheSize := len(gc.metrics.sourceMetrics) + gc.metrics.sourceMetricsMu.RUnlock() + + re.Equal(1, cacheSize) + re.NotNil(sourceMetrics) + re.Equal(reqConsumption.WRU, counterValue(t, sourceMetrics.wru)) + re.Equal(respConsumption.RRU, counterValue(t, sourceMetrics.rru)) + re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + gc.metrics.sourceMetricsMu.RLock() + re.Equal(1, len(gc.metrics.sourceMetrics)) + re.Same(sourceMetrics, gc.metrics.sourceMetrics[req.requestSource]) + gc.metrics.sourceMetricsMu.RUnlock() + + controllerMetrics.RequestSourceRUCounter.DeleteLabelValues(gc.name, req.requestSource, "rru") + controllerMetrics.RequestSourceRUCounter.DeleteLabelValues(gc.name, req.requestSource, "wru") +} + +func TestCleanupResourceGroupRemovesRequestSourceMetrics(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0) + re.NoError(err) + + group := &rmpb.ResourceGroup{ + Name: "request-source-cleanup", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil) + + gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 64, + numReplicas: 1, + storeID: 1, + requestSource: "internal_gc_cleanup", + } + resp := &TestResponseInfo{readBytes: 64, succeed: true} + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + + gc.mu.Lock() + *gc.run.consumption = *gc.mu.consumption + gc.mu.Unlock() + gc.inactive = true + + re.Greater(collectorMetricCount(controllerMetrics.RequestSourceRUCounter), beforeCount) + + controller.cleanUpResourceGroup() + + _, ok := controller.loadGroupController(group.Name) + re.False(ok) + gc.metrics.sourceMetricsMu.RLock() + re.Empty(gc.metrics.sourceMetrics) + gc.metrics.sourceMetricsMu.RUnlock() + re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) +} diff --git a/client/resource_group/controller/testutil.go b/client/resource_group/controller/testutil.go index a055b32fa6f..d09f856f577 100644 --- a/client/resource_group/controller/testutil.go +++ b/client/resource_group/controller/testutil.go @@ -22,11 +22,12 @@ import "time" // TestRequestInfo is used to test the request info interface. type TestRequestInfo struct { - isWrite bool - writeBytes uint64 - numReplicas int64 - storeID uint64 - accessType AccessLocationType + isWrite bool + writeBytes uint64 + numReplicas int64 + storeID uint64 + accessType AccessLocationType + requestSource string } // NewTestRequestInfo creates a new TestRequestInfo. @@ -70,6 +71,11 @@ func (tri *TestRequestInfo) AccessLocationType() AccessLocationType { return tri.accessType } +// RequestSource implements the RequestInfo interface. +func (tri *TestRequestInfo) RequestSource() string { + return tri.requestSource +} + // TestResponseInfo is used to test the response info interface. type TestResponseInfo struct { readBytes uint64 From 3fe18d6faa9ede3d2c93beb8d11e42aacf56af67 Mon Sep 17 00:00:00 2001 From: Yuhao Zhang Date: Mon, 13 Apr 2026 16:03:58 +0800 Subject: [PATCH 2/5] client/resource_group: keep request-source metrics state per group Signed-off-by: Yuhao Zhang --- .../controller/global_controller.go | 56 +++- .../controller/group_controller.go | 94 ++++-- .../controller/group_controller_test.go | 4 +- .../controller/request_source_metrics_test.go | 283 +++++++++++++++++- 4 files changed, 387 insertions(+), 50 deletions(-) diff --git a/client/resource_group/controller/global_controller.go b/client/resource_group/controller/global_controller.go index 738a7068e48..355fd4ca9c2 100644 --- a/client/resource_group/controller/global_controller.go +++ b/client/resource_group/controller/global_controller.go @@ -142,11 +142,12 @@ var _ ResourceGroupKVInterceptor = (*ResourceGroupsController)(nil) // ResourceGroupsController implements ResourceGroupKVInterceptor. type ResourceGroupsController struct { - clientUniqueID uint64 - provider ResourceGroupProvider - groupsController sync.Map - ruConfig *RUConfig - keyspaceID uint32 + clientUniqueID uint64 + provider ResourceGroupProvider + groupsController sync.Map + requestSourceStates sync.Map + ruConfig *RUConfig + keyspaceID uint32 loopCtx context.Context loopCancel func() @@ -388,7 +389,13 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { continue } // If the resource group is marked as tombstone before, re-create the resource group controller. - newGC, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) + newGC, err := newGroupCostController( + group, + c.ruConfig, + c.lowTokenNotifyChan, + c.tokenBucketUpdateChan, + c.getOrCreateRequestSourceMetricsState(name), + ) if err != nil { log.Warn("[resource group controller] re-create resource group cost controller for tombstone failed", zap.String("name", name), zap.Error(err)) @@ -473,6 +480,22 @@ func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *g return tmp.(*groupCostController), loaded } +func (c *ResourceGroupsController) getOrCreateRequestSourceMetricsState(name string) *requestSourceMetricsState { + if state, ok := c.requestSourceStates.Load(name); ok { + return state.(*requestSourceMetricsState) + } + state := newRequestSourceMetricsState(name) + actual, _ := c.requestSourceStates.LoadOrStore(name, state) + return actual.(*requestSourceMetricsState) +} + +func (c *ResourceGroupsController) cleanupRequestSourceMetricsState(name string) { + if state, ok := c.requestSourceStates.Load(name); ok { + state.(*requestSourceMetricsState).cleanup() + c.requestSourceStates.Delete(name) + } +} + // NewResourceGroupNotExistErr returns a new error that indicates the resource group does not exist. // It's exported for testing. func NewResourceGroupNotExistErr(name string) error { @@ -522,7 +545,13 @@ func (c *ResourceGroupsController) tryGetResourceGroupController( return gc, nil } // Initialize the resource group controller. - gc, err = newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) + gc, err = newGroupCostController( + group, + c.ruConfig, + c.lowTokenNotifyChan, + c.tokenBucketUpdateChan, + c.getOrCreateRequestSourceMetricsState(name), + ) if err != nil { return nil, err } @@ -554,15 +583,23 @@ func (c *ResourceGroupsController) tombstoneGroupCostController(name string) { log.Warn("[resource group controller] get default resource group meta for tombstone failed", zap.String("name", name), zap.Error(err)) // Directly delete the resource group controller if the default group is not available. + c.cleanupRequestSourceMetricsState(name) c.groupsController.Delete(name) return } // Create a default resource group controller for the tombstone resource group independently. - gc, err := newGroupCostController(defaultGC.getMeta(), c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) + gc, err := newGroupCostController( + defaultGC.getMeta(), + c.ruConfig, + c.lowTokenNotifyChan, + c.tokenBucketUpdateChan, + c.getOrCreateRequestSourceMetricsState(name), + ) if err != nil { log.Warn("[resource group controller] create default resource group cost controller for tombstone failed", zap.String("name", name), zap.Error(err)) // Directly delete the resource group controller if the default group controller cannot be created. + c.cleanupRequestSourceMetricsState(name) c.groupsController.Delete(name) return } @@ -584,8 +621,9 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() { gc.mu.Unlock() if equalRU(latestConsumption, *gc.run.consumption) { if gc.inactive || gc.tombstone.Load() { - gc.metrics.cleanupRequestSourceMetrics(resourceGroupName) + gc.metrics.cleanupRequestSourceMetrics() c.groupsController.Delete(resourceGroupName) + c.requestSourceStates.Delete(resourceGroupName) metrics.ResourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName) return true } diff --git a/client/resource_group/controller/group_controller.go b/client/resource_group/controller/group_controller.go index 677fe6dbcf4..31d002efffd 100644 --- a/client/resource_group/controller/group_controller.go +++ b/client/resource_group/controller/group_controller.go @@ -106,8 +106,7 @@ type groupMetricsCollection struct { tokenRequestCounter prometheus.Counter runningKVRequestCounter prometheus.Gauge consumeTokenHistogram prometheus.Observer - sourceMetricsMu sync.RWMutex - sourceMetrics map[string]*requestSourceMetrics + sourceState *requestSourceMetricsState } type requestSourceMetrics struct { @@ -115,7 +114,35 @@ type requestSourceMetrics struct { wru prometheus.Counter } -func initMetrics(oldName, name string) *groupMetricsCollection { +type requestSourceMetricsState struct { + resourceGroupName string + mu sync.RWMutex + closed bool + items map[string]*requestSourceMetrics +} + +func newRequestSourceMetricsState(resourceGroupName string) *requestSourceMetricsState { + return &requestSourceMetricsState{ + resourceGroupName: resourceGroupName, + items: make(map[string]*requestSourceMetrics), + } +} + +func (s *requestSourceMetricsState) cleanup() { + if s == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + s.closed = true + for requestSource := range s.items { + metrics.RequestSourceRUCounter.DeleteLabelValues(s.resourceGroupName, requestSource, "rru") + metrics.RequestSourceRUCounter.DeleteLabelValues(s.resourceGroupName, requestSource, "wru") + delete(s.items, requestSource) + } +} + +func initMetrics(oldName, name string, sourceState *requestSourceMetricsState) *groupMetricsCollection { const ( otherType = "others" throttledType = "throttled" @@ -129,37 +156,50 @@ func initMetrics(oldName, name string) *groupMetricsCollection { tokenRequestCounter: metrics.ResourceGroupTokenRequestCounter.WithLabelValues(oldName, name), runningKVRequestCounter: metrics.GroupRunningKVRequestCounter.WithLabelValues(name), consumeTokenHistogram: metrics.TokenConsumedHistogram.WithLabelValues(name), - sourceMetrics: make(map[string]*requestSourceMetrics), + sourceState: sourceState, } } -func (mc *groupMetricsCollection) getOrCreateRequestSourceMetrics(resourceGroupName, requestSource string) *requestSourceMetrics { - mc.sourceMetricsMu.RLock() - sourceMetrics, ok := mc.sourceMetrics[requestSource] - mc.sourceMetricsMu.RUnlock() +func (mc *groupMetricsCollection) getOrCreateRequestSourceMetrics(requestSource string) *requestSourceMetrics { + if mc.sourceState == nil { + return nil + } + mc.sourceState.mu.RLock() + sourceMetrics, ok := mc.sourceState.items[requestSource] + closed := mc.sourceState.closed + mc.sourceState.mu.RUnlock() if ok { return sourceMetrics } + if closed { + return nil + } - mc.sourceMetricsMu.Lock() - defer mc.sourceMetricsMu.Unlock() - sourceMetrics, ok = mc.sourceMetrics[requestSource] + mc.sourceState.mu.Lock() + defer mc.sourceState.mu.Unlock() + if mc.sourceState.closed { + return nil + } + sourceMetrics, ok = mc.sourceState.items[requestSource] if ok { return sourceMetrics } sourceMetrics = &requestSourceMetrics{ - rru: metrics.RequestSourceRUCounter.WithLabelValues(resourceGroupName, requestSource, "rru"), - wru: metrics.RequestSourceRUCounter.WithLabelValues(resourceGroupName, requestSource, "wru"), + rru: metrics.RequestSourceRUCounter.WithLabelValues(mc.sourceState.resourceGroupName, requestSource, "rru"), + wru: metrics.RequestSourceRUCounter.WithLabelValues(mc.sourceState.resourceGroupName, requestSource, "wru"), } - mc.sourceMetrics[requestSource] = sourceMetrics + mc.sourceState.items[requestSource] = sourceMetrics return sourceMetrics } -func (mc *groupMetricsCollection) addRequestSourceRU(resourceGroupName, requestSource string, consumption *rmpb.Consumption) { +func (mc *groupMetricsCollection) addRequestSourceRU(requestSource string, consumption *rmpb.Consumption) { if consumption == nil { return } - sourceMetrics := mc.getOrCreateRequestSourceMetrics(resourceGroupName, requestSource) + sourceMetrics := mc.getOrCreateRequestSourceMetrics(requestSource) + if sourceMetrics == nil { + return + } if consumption.RRU > 0 { sourceMetrics.rru.Add(consumption.RRU) } @@ -168,14 +208,8 @@ func (mc *groupMetricsCollection) addRequestSourceRU(resourceGroupName, requestS } } -func (mc *groupMetricsCollection) cleanupRequestSourceMetrics(resourceGroupName string) { - mc.sourceMetricsMu.Lock() - defer mc.sourceMetricsMu.Unlock() - for requestSource := range mc.sourceMetrics { - metrics.RequestSourceRUCounter.DeleteLabelValues(resourceGroupName, requestSource, "rru") - metrics.RequestSourceRUCounter.DeleteLabelValues(resourceGroupName, requestSource, "wru") - delete(mc.sourceMetrics, requestSource) - } +func (mc *groupMetricsCollection) cleanupRequestSourceMetrics() { + mc.sourceState.cleanup() } type tokenCounter struct { @@ -209,6 +243,7 @@ func newGroupCostController( mainCfg *RUConfig, lowRUNotifyChan chan notifyMsg, tokenBucketUpdateChan chan *groupCostController, + sourceState *requestSourceMetricsState, ) (*groupCostController, error) { switch group.Mode { case rmpb.GroupMode_RUMode: @@ -218,7 +253,10 @@ func newGroupCostController( default: return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type") } - ms := initMetrics(group.Name, group.Name) + if sourceState == nil { + sourceState = newRequestSourceMetricsState(group.Name) + } + ms := initMetrics(group.Name, group.Name, sourceState) gc := &groupCostController{ meta: group, name: group.Name, @@ -630,7 +668,7 @@ func (gc *groupCostController) onRequestWaitImpl( waitDuration += d } - gc.metrics.addRequestSourceRU(gc.name, info.RequestSource(), delta) + gc.metrics.addRequestSourceRU(info.RequestSource(), delta) gc.mu.Lock() // Calculate the penalty of the store @@ -677,7 +715,7 @@ func (gc *groupCostController) onResponseImpl( add(gc.mu.globalCounter, count) gc.mu.Unlock() - gc.metrics.addRequestSourceRU(gc.name, req.RequestSource(), delta) + gc.metrics.addRequestSourceRU(req.RequestSource(), delta) return delta, nil } @@ -720,7 +758,7 @@ func (gc *groupCostController) onResponseWaitImpl( add(gc.mu.globalCounter, count) gc.mu.Unlock() - gc.metrics.addRequestSourceRU(gc.name, req.RequestSource(), delta) + gc.metrics.addRequestSourceRU(req.RequestSource(), delta) return delta, waitDuration, nil } diff --git a/client/resource_group/controller/group_controller_test.go b/client/resource_group/controller/group_controller_test.go index f916f5e56c4..5fe80504755 100644 --- a/client/resource_group/controller/group_controller_test.go +++ b/client/resource_group/controller/group_controller_test.go @@ -45,7 +45,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController } ch1 := make(chan notifyMsg) ch2 := make(chan *groupCostController) - gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2) + gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2, nil) re.NoError(err) return gc } @@ -239,7 +239,7 @@ func TestAcquireTokensSignalAwareWait(t *testing.T) { cfg := DefaultRUConfig() cfg.WaitRetryInterval = 5 * time.Second cfg.WaitRetryTimes = 3 - gc, err := newGroupCostController(group, cfg, notifyCh, make(chan *groupCostController, 1)) + gc, err := newGroupCostController(group, cfg, notifyCh, make(chan *groupCostController, 1), nil) re.NoError(err) // Set fillRate=0 so reservation always fails with InfDuration, diff --git a/client/resource_group/controller/request_source_metrics_test.go b/client/resource_group/controller/request_source_metrics_test.go index 79ae0857b79..df60cd69427 100644 --- a/client/resource_group/controller/request_source_metrics_test.go +++ b/client/resource_group/controller/request_source_metrics_test.go @@ -34,6 +34,16 @@ func collectorMetricCount(collector prometheus.Collector) int { return count } +func requestSourceStateSnapshot(t *testing.T, gc *groupCostController, requestSource string) (*requestSourceMetrics, int) { + t.Helper() + require.NotNil(t, gc.metrics.sourceState) + + gc.metrics.sourceState.mu.RLock() + defer gc.metrics.sourceState.mu.RUnlock() + + return gc.metrics.sourceState.items[requestSource], len(gc.metrics.sourceState.items) +} + func TestRequestSourceMetricsCachedByResourceGroup(t *testing.T) { re := require.New(t) gc := createTestGroupCostController(re) @@ -59,10 +69,7 @@ func TestRequestSourceMetricsCachedByResourceGroup(t *testing.T) { re.NotZero(reqConsumption.WRU) re.NotZero(respConsumption.RRU) - gc.metrics.sourceMetricsMu.RLock() - sourceMetrics := gc.metrics.sourceMetrics[req.requestSource] - cacheSize := len(gc.metrics.sourceMetrics) - gc.metrics.sourceMetricsMu.RUnlock() + sourceMetrics, cacheSize := requestSourceStateSnapshot(t, gc, req.requestSource) re.Equal(1, cacheSize) re.NotNil(sourceMetrics) @@ -72,10 +79,9 @@ func TestRequestSourceMetricsCachedByResourceGroup(t *testing.T) { _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) re.NoError(err) - gc.metrics.sourceMetricsMu.RLock() - re.Equal(1, len(gc.metrics.sourceMetrics)) - re.Same(sourceMetrics, gc.metrics.sourceMetrics[req.requestSource]) - gc.metrics.sourceMetricsMu.RUnlock() + cachedMetrics, cacheSize := requestSourceStateSnapshot(t, gc, req.requestSource) + re.Equal(1, cacheSize) + re.Same(sourceMetrics, cachedMetrics) controllerMetrics.RequestSourceRUCounter.DeleteLabelValues(gc.name, req.requestSource, "rru") controllerMetrics.RequestSourceRUCounter.DeleteLabelValues(gc.name, req.requestSource, "wru") @@ -130,8 +136,263 @@ func TestCleanupResourceGroupRemovesRequestSourceMetrics(t *testing.T) { _, ok := controller.loadGroupController(group.Name) re.False(ok) - gc.metrics.sourceMetricsMu.RLock() - re.Empty(gc.metrics.sourceMetrics) - gc.metrics.sourceMetricsMu.RUnlock() + _, cacheSize := requestSourceStateSnapshot(t, gc, req.requestSource) + re.Zero(cacheSize) + re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) +} + +func TestCleanupDoesNotReexportExistingCachedHandle(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0) + re.NoError(err) + + group := &rmpb.ResourceGroup{ + Name: "request-source-cleanup-cached-handle", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil) + + gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 64, + numReplicas: 1, + storeID: 1, + requestSource: "internal_gc_cleanup_cached_handle", + } + resp := &TestResponseInfo{readBytes: 64, succeed: true} + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + + sourceMetrics, cacheSize := requestSourceStateSnapshot(t, gc, req.requestSource) + re.NotNil(sourceMetrics) + re.Equal(1, cacheSize) + re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + gc.mu.Lock() + *gc.run.consumption = *gc.mu.consumption + gc.mu.Unlock() + gc.inactive = true + + controller.cleanUpResourceGroup() + + _, ok := controller.loadGroupController(group.Name) + re.False(ok) + re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + sourceMetrics.rru.Add(1) + sourceMetrics.wru.Add(1) + + re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) +} + +func TestCleanupPreventsRecreateRequestSourceMetrics(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0) + re.NoError(err) + + group := &rmpb.ResourceGroup{ + Name: "request-source-cleanup-prevent-recreate", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil) + + gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 64, + numReplicas: 1, + storeID: 1, + requestSource: "internal_gc_cleanup_prevent_recreate", + } + resp := &TestResponseInfo{readBytes: 64, succeed: true} + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + gc.mu.Lock() + *gc.run.consumption = *gc.mu.consumption + gc.mu.Unlock() + gc.inactive = true + + controller.cleanUpResourceGroup() + + _, ok := controller.loadGroupController(group.Name) + re.False(ok) + _, cacheSize := requestSourceStateSnapshot(t, gc, req.requestSource) + re.Zero(cacheSize) + re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + gc.metrics.addRequestSourceRU(req.requestSource, &rmpb.Consumption{RRU: 1, WRU: 1}) + + _, cacheSize = requestSourceStateSnapshot(t, gc, req.requestSource) + re.Zero(cacheSize) + re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) +} + +func TestTombstoneCleanupRemovesExistingRequestSourceMetrics(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0) + re.NoError(err) + + group := &rmpb.ResourceGroup{ + Name: "request-source-tombstone-cleanup", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + defaultGroup := &rmpb.ResourceGroup{ + Name: defaultResourceGroupName, + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil) + mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultGroup, nil) + + gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 64, + numReplicas: 1, + storeID: 1, + requestSource: "internal_gc_tombstone_cleanup", + } + resp := &TestResponseInfo{readBytes: 64, succeed: true} + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + controller.tombstoneGroupCostController(group.Name) + tombstoneGC, err := controller.tryGetResourceGroupController(ctx, group.Name, true) + re.NoError(err) + re.True(tombstoneGC.tombstone.Load()) + + controller.cleanUpResourceGroup() + + _, ok := controller.loadGroupController(group.Name) + re.False(ok) + re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) +} + +func TestRevivedResourceGroupCleanupRemovesExistingRequestSourceMetrics(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0) + re.NoError(err) + + group := &rmpb.ResourceGroup{ + Name: "request-source-revive-cleanup", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + defaultGroup := &rmpb.ResourceGroup{ + Name: defaultResourceGroupName, + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil) + mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultGroup, nil) + + gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 64, + numReplicas: 1, + storeID: 1, + requestSource: "internal_gc_revive_cleanup", + } + resp := &TestResponseInfo{readBytes: 64, succeed: true} + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + controller.tombstoneGroupCostController(group.Name) + tombstoneGC, err := controller.tryGetResourceGroupController(ctx, group.Name, true) + re.NoError(err) + re.True(tombstoneGC.tombstone.Load()) + + revivedGC, err := newGroupCostController( + group, + controller.ruConfig, + controller.lowTokenNotifyChan, + controller.tokenBucketUpdateChan, + controller.getOrCreateRequestSourceMetricsState(group.Name), + ) + re.NoError(err) + re.True(controller.groupsController.CompareAndSwap(group.Name, tombstoneGC, revivedGC)) + + revivedGC.mu.Lock() + *revivedGC.run.consumption = *revivedGC.mu.consumption + revivedGC.mu.Unlock() + revivedGC.inactive = true + + controller.cleanUpResourceGroup() + + _, ok := controller.loadGroupController(group.Name) + re.False(ok) re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) } From 11e1a6cd550e18592f129cd21cbb3f72fcd36e21 Mon Sep 17 00:00:00 2001 From: Yuhao Zhang Date: Tue, 14 Apr 2026 14:08:20 +0800 Subject: [PATCH 3/5] client/resource_group: fix race between cleanup and getOrCreate for request-source metrics state Use LoadAndDelete in cleanupRequestSourceMetricsState so the map entry is removed atomically before the state is closed. Any hot-path goroutine still holding the old reference no-ops via the closed check, and the next getOrCreateRequestSourceMetricsState allocates a fresh state instead of returning a closed one. Addresses rleungx's review comment. Signed-off-by: Yuhao Zhang --- .../controller/global_controller.go | 8 +- .../controller/group_controller.go | 4 - .../controller/request_source_metrics_test.go | 158 ++++++++++++++++++ 3 files changed, 161 insertions(+), 9 deletions(-) diff --git a/client/resource_group/controller/global_controller.go b/client/resource_group/controller/global_controller.go index 355fd4ca9c2..e7aeb3a84c1 100644 --- a/client/resource_group/controller/global_controller.go +++ b/client/resource_group/controller/global_controller.go @@ -490,9 +490,8 @@ func (c *ResourceGroupsController) getOrCreateRequestSourceMetricsState(name str } func (c *ResourceGroupsController) cleanupRequestSourceMetricsState(name string) { - if state, ok := c.requestSourceStates.Load(name); ok { - state.(*requestSourceMetricsState).cleanup() - c.requestSourceStates.Delete(name) + if v, loaded := c.requestSourceStates.LoadAndDelete(name); loaded { + v.(*requestSourceMetricsState).cleanup() } } @@ -621,9 +620,8 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() { gc.mu.Unlock() if equalRU(latestConsumption, *gc.run.consumption) { if gc.inactive || gc.tombstone.Load() { - gc.metrics.cleanupRequestSourceMetrics() + c.cleanupRequestSourceMetricsState(resourceGroupName) c.groupsController.Delete(resourceGroupName) - c.requestSourceStates.Delete(resourceGroupName) metrics.ResourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName) return true } diff --git a/client/resource_group/controller/group_controller.go b/client/resource_group/controller/group_controller.go index 31d002efffd..d3f8afc0a26 100644 --- a/client/resource_group/controller/group_controller.go +++ b/client/resource_group/controller/group_controller.go @@ -208,10 +208,6 @@ func (mc *groupMetricsCollection) addRequestSourceRU(requestSource string, consu } } -func (mc *groupMetricsCollection) cleanupRequestSourceMetrics() { - mc.sourceState.cleanup() -} - type tokenCounter struct { fillRate uint64 diff --git a/client/resource_group/controller/request_source_metrics_test.go b/client/resource_group/controller/request_source_metrics_test.go index df60cd69427..55a52f48128 100644 --- a/client/resource_group/controller/request_source_metrics_test.go +++ b/client/resource_group/controller/request_source_metrics_test.go @@ -396,3 +396,161 @@ func TestRevivedResourceGroupCleanupRemovesExistingRequestSourceMetrics(t *testi re.False(ok) re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) } + +// TestGetOrCreateAfterCleanupReturnsFreshState verifies the fix for the race +// rleungx identified: after cleanupRequestSourceMetricsState runs, +// getOrCreateRequestSourceMetricsState must return a fresh, non-closed state +// so that a newly created gc can record metrics normally. +func TestGetOrCreateAfterCleanupReturnsFreshState(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0) + re.NoError(err) + + group := &rmpb.ResourceGroup{ + Name: "request-source-create-after-cleanup", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil) + + // Phase 1: create a gc, record some metrics, then clean up. + gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 64, + numReplicas: 1, + storeID: 1, + requestSource: "internal_create_after_cleanup", + } + resp := &TestResponseInfo{readBytes: 64, succeed: true} + + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + + gc.mu.Lock() + *gc.run.consumption = *gc.mu.consumption + gc.mu.Unlock() + gc.inactive = true + controller.cleanUpResourceGroup() + + _, loaded := controller.loadGroupController(group.Name) + re.False(loaded) + + // Phase 2: getOrCreateRequestSourceMetricsState must return a fresh, + // non-closed state after cleanup has removed the old one. + state := controller.getOrCreateRequestSourceMetricsState(group.Name) + re.NotNil(state) + + state.mu.RLock() + re.False(state.closed) + state.mu.RUnlock() + + // Phase 3: create a new gc with this state and verify it can record metrics. + newGC, err := newGroupCostController( + group, + controller.ruConfig, + controller.lowTokenNotifyChan, + controller.tokenBucketUpdateChan, + state, + ) + re.NoError(err) + + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + _, _, _, _, err = newGC.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = newGC.onResponseImpl(req, resp) + re.NoError(err) + + sourceMetrics, cacheSize := requestSourceStateSnapshot(t, newGC, req.requestSource) + re.Equal(1, cacheSize) + re.NotNil(sourceMetrics) + re.Greater(counterValue(t, sourceMetrics.wru), float64(0)) + re.Greater(counterValue(t, sourceMetrics.rru), float64(0)) + re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + // Cleanup for this test. + state.cleanup() +} + +// TestCleanupThenRecreateViaFullPath exercises the full end-to-end path: +// cleanup a group, then tryGetResourceGroupController re-creates it, and +// the new gc records metrics successfully. +func TestCleanupThenRecreateViaFullPath(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0) + re.NoError(err) + + group := &rmpb.ResourceGroup{ + Name: "request-source-full-recreate", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil) + + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 64, + numReplicas: 1, + storeID: 1, + requestSource: "internal_full_recreate", + } + resp := &TestResponseInfo{readBytes: 64, succeed: true} + + // Create, use, and clean up the group. + gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + + gc.mu.Lock() + *gc.run.consumption = *gc.mu.consumption + gc.mu.Unlock() + gc.inactive = true + controller.cleanUpResourceGroup() + + _, loaded := controller.loadGroupController(group.Name) + re.False(loaded) + + // Re-create through the normal path (simulates a new request arriving + // after the group was cleaned up). + gc2, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + _, _, _, _, err = gc2.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc2.onResponseImpl(req, resp) + re.NoError(err) + + sourceMetrics, cacheSize := requestSourceStateSnapshot(t, gc2, req.requestSource) + re.Equal(1, cacheSize) + re.NotNil(sourceMetrics) + re.Greater(counterValue(t, sourceMetrics.wru), float64(0)) + re.Greater(counterValue(t, sourceMetrics.rru), float64(0)) + re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + // Cleanup for this test. + gc2.metrics.sourceState.cleanup() +} From 915f0b5414ba8fe3d6b960f34c8cc2c0c27fafba Mon Sep 17 00:00:00 2001 From: Yuhao Zhang Date: Tue, 14 Apr 2026 21:39:34 +0800 Subject: [PATCH 4/5] client/resource_group/controller/metrics: rename misleading errType label const errType was reused for the new RequestSourceRUCounter where the "type" dimension means rru/wru, not error kind. Rename to typeLabel to avoid the misleading prefix and align with the file's xxxLabel convention. Also extract requestSourceLabel for the new counter. The Prometheus label name "type" is unchanged. Signed-off-by: Yuhao Zhang --- client/resource_group/controller/metrics/metrics.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/client/resource_group/controller/metrics/metrics.go b/client/resource_group/controller/metrics/metrics.go index 6826518378a..17aa4a0e2f9 100644 --- a/client/resource_group/controller/metrics/metrics.go +++ b/client/resource_group/controller/metrics/metrics.go @@ -24,8 +24,9 @@ const ( // TODO: remove old label in 8.x resourceGroupNameLabel = "name" newResourceGroupNameLabel = "resource_group" + requestSourceLabel = "request_source" - errType = "type" + typeLabel = "type" ) var ( @@ -98,7 +99,7 @@ func initMetrics(constLabels prometheus.Labels) { Name: "fail", Help: "Counter of failed request.", ConstLabels: constLabels, - }, []string{resourceGroupNameLabel, newResourceGroupNameLabel, errType}) + }, []string{resourceGroupNameLabel, newResourceGroupNameLabel, typeLabel}) GroupRunningKVRequestCounter = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -162,7 +163,7 @@ func initMetrics(constLabels prometheus.Labels) { Name: "ru_total", Help: "Counter of request RU consumption grouped by resource group and request source.", ConstLabels: constLabels, - }, []string{newResourceGroupNameLabel, "request_source", errType}) + }, []string{newResourceGroupNameLabel, requestSourceLabel, typeLabel}) // WithLabelValues is a heavy operation, define variable to avoid call it every time. FailedTokenRequestDuration = TokenRequestDuration.WithLabelValues("fail") From c7fbfeef1364735c66c6a890241d4b71a10de05f Mon Sep 17 00:00:00 2001 From: Yuhao Zhang Date: Tue, 14 Apr 2026 21:39:44 +0800 Subject: [PATCH 5/5] client/resource_group: drop test-only nil sourceState fallback newGroupCostController had a fallback that constructed a fresh requestSourceMetricsState when callers passed nil. The state created by the fallback was never registered in ResourceGroupsController's requestSourceStates map, so it would never be cleaned up. All three production call sites pass a registered state via getOrCreateRequestSourceMetricsState; only two pre-existing tests relied on nil. Remove the fallback and update the two tests to construct their own state explicitly. Signed-off-by: Yuhao Zhang --- client/resource_group/controller/group_controller.go | 3 --- client/resource_group/controller/group_controller_test.go | 4 ++-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/client/resource_group/controller/group_controller.go b/client/resource_group/controller/group_controller.go index d3f8afc0a26..7bab211410b 100644 --- a/client/resource_group/controller/group_controller.go +++ b/client/resource_group/controller/group_controller.go @@ -249,9 +249,6 @@ func newGroupCostController( default: return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type") } - if sourceState == nil { - sourceState = newRequestSourceMetricsState(group.Name) - } ms := initMetrics(group.Name, group.Name, sourceState) gc := &groupCostController{ meta: group, diff --git a/client/resource_group/controller/group_controller_test.go b/client/resource_group/controller/group_controller_test.go index 5fe80504755..de75e1a800e 100644 --- a/client/resource_group/controller/group_controller_test.go +++ b/client/resource_group/controller/group_controller_test.go @@ -45,7 +45,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController } ch1 := make(chan notifyMsg) ch2 := make(chan *groupCostController) - gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2, nil) + gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2, newRequestSourceMetricsState(group.Name)) re.NoError(err) return gc } @@ -239,7 +239,7 @@ func TestAcquireTokensSignalAwareWait(t *testing.T) { cfg := DefaultRUConfig() cfg.WaitRetryInterval = 5 * time.Second cfg.WaitRetryTimes = 3 - gc, err := newGroupCostController(group, cfg, notifyCh, make(chan *groupCostController, 1), nil) + gc, err := newGroupCostController(group, cfg, notifyCh, make(chan *groupCostController, 1), newRequestSourceMetricsState(group.Name)) re.NoError(err) // Set fillRate=0 so reservation always fails with InfDuration,