diff --git a/client/resource_group/controller/global_controller.go b/client/resource_group/controller/global_controller.go index a5e88a090a4..e7aeb3a84c1 100644 --- a/client/resource_group/controller/global_controller.go +++ b/client/resource_group/controller/global_controller.go @@ -142,11 +142,12 @@ var _ ResourceGroupKVInterceptor = (*ResourceGroupsController)(nil) // ResourceGroupsController implements ResourceGroupKVInterceptor. type ResourceGroupsController struct { - clientUniqueID uint64 - provider ResourceGroupProvider - groupsController sync.Map - ruConfig *RUConfig - keyspaceID uint32 + clientUniqueID uint64 + provider ResourceGroupProvider + groupsController sync.Map + requestSourceStates sync.Map + ruConfig *RUConfig + keyspaceID uint32 loopCtx context.Context loopCancel func() @@ -337,6 +338,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { /* channels */ case <-c.loopCtx.Done(): metrics.ResourceGroupStatusGauge.Reset() + metrics.RequestSourceRUCounter.Reset() return case <-c.responseDeadlineCh: c.run.inDegradedMode.Store(true) @@ -387,7 +389,13 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { continue } // If the resource group is marked as tombstone before, re-create the resource group controller. - newGC, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) + newGC, err := newGroupCostController( + group, + c.ruConfig, + c.lowTokenNotifyChan, + c.tokenBucketUpdateChan, + c.getOrCreateRequestSourceMetricsState(name), + ) if err != nil { log.Warn("[resource group controller] re-create resource group cost controller for tombstone failed", zap.String("name", name), zap.Error(err)) @@ -472,6 +480,21 @@ func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *g return tmp.(*groupCostController), loaded } +func (c *ResourceGroupsController) getOrCreateRequestSourceMetricsState(name string) *requestSourceMetricsState { + if state, ok := c.requestSourceStates.Load(name); ok { + return state.(*requestSourceMetricsState) + } + state := newRequestSourceMetricsState(name) + actual, _ := c.requestSourceStates.LoadOrStore(name, state) + return actual.(*requestSourceMetricsState) +} + +func (c *ResourceGroupsController) cleanupRequestSourceMetricsState(name string) { + if v, loaded := c.requestSourceStates.LoadAndDelete(name); loaded { + v.(*requestSourceMetricsState).cleanup() + } +} + // NewResourceGroupNotExistErr returns a new error that indicates the resource group does not exist. // It's exported for testing. func NewResourceGroupNotExistErr(name string) error { @@ -521,7 +544,13 @@ func (c *ResourceGroupsController) tryGetResourceGroupController( return gc, nil } // Initialize the resource group controller. - gc, err = newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) + gc, err = newGroupCostController( + group, + c.ruConfig, + c.lowTokenNotifyChan, + c.tokenBucketUpdateChan, + c.getOrCreateRequestSourceMetricsState(name), + ) if err != nil { return nil, err } @@ -553,15 +582,23 @@ func (c *ResourceGroupsController) tombstoneGroupCostController(name string) { log.Warn("[resource group controller] get default resource group meta for tombstone failed", zap.String("name", name), zap.Error(err)) // Directly delete the resource group controller if the default group is not available. + c.cleanupRequestSourceMetricsState(name) c.groupsController.Delete(name) return } // Create a default resource group controller for the tombstone resource group independently. - gc, err := newGroupCostController(defaultGC.getMeta(), c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) + gc, err := newGroupCostController( + defaultGC.getMeta(), + c.ruConfig, + c.lowTokenNotifyChan, + c.tokenBucketUpdateChan, + c.getOrCreateRequestSourceMetricsState(name), + ) if err != nil { log.Warn("[resource group controller] create default resource group cost controller for tombstone failed", zap.String("name", name), zap.Error(err)) // Directly delete the resource group controller if the default group controller cannot be created. + c.cleanupRequestSourceMetricsState(name) c.groupsController.Delete(name) return } @@ -583,6 +620,7 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() { gc.mu.Unlock() if equalRU(latestConsumption, *gc.run.consumption) { if gc.inactive || gc.tombstone.Load() { + c.cleanupRequestSourceMetricsState(resourceGroupName) c.groupsController.Delete(resourceGroupName) metrics.ResourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName) return true diff --git a/client/resource_group/controller/group_controller.go b/client/resource_group/controller/group_controller.go index 86628e397ad..7bab211410b 100644 --- a/client/resource_group/controller/group_controller.go +++ b/client/resource_group/controller/group_controller.go @@ -106,9 +106,43 @@ type groupMetricsCollection struct { tokenRequestCounter prometheus.Counter runningKVRequestCounter prometheus.Gauge consumeTokenHistogram prometheus.Observer + sourceState *requestSourceMetricsState } -func initMetrics(oldName, name string) *groupMetricsCollection { +type requestSourceMetrics struct { + rru prometheus.Counter + wru prometheus.Counter +} + +type requestSourceMetricsState struct { + resourceGroupName string + mu sync.RWMutex + closed bool + items map[string]*requestSourceMetrics +} + +func newRequestSourceMetricsState(resourceGroupName string) *requestSourceMetricsState { + return &requestSourceMetricsState{ + resourceGroupName: resourceGroupName, + items: make(map[string]*requestSourceMetrics), + } +} + +func (s *requestSourceMetricsState) cleanup() { + if s == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + s.closed = true + for requestSource := range s.items { + metrics.RequestSourceRUCounter.DeleteLabelValues(s.resourceGroupName, requestSource, "rru") + metrics.RequestSourceRUCounter.DeleteLabelValues(s.resourceGroupName, requestSource, "wru") + delete(s.items, requestSource) + } +} + +func initMetrics(oldName, name string, sourceState *requestSourceMetricsState) *groupMetricsCollection { const ( otherType = "others" throttledType = "throttled" @@ -122,6 +156,55 @@ func initMetrics(oldName, name string) *groupMetricsCollection { tokenRequestCounter: metrics.ResourceGroupTokenRequestCounter.WithLabelValues(oldName, name), runningKVRequestCounter: metrics.GroupRunningKVRequestCounter.WithLabelValues(name), consumeTokenHistogram: metrics.TokenConsumedHistogram.WithLabelValues(name), + sourceState: sourceState, + } +} + +func (mc *groupMetricsCollection) getOrCreateRequestSourceMetrics(requestSource string) *requestSourceMetrics { + if mc.sourceState == nil { + return nil + } + mc.sourceState.mu.RLock() + sourceMetrics, ok := mc.sourceState.items[requestSource] + closed := mc.sourceState.closed + mc.sourceState.mu.RUnlock() + if ok { + return sourceMetrics + } + if closed { + return nil + } + + mc.sourceState.mu.Lock() + defer mc.sourceState.mu.Unlock() + if mc.sourceState.closed { + return nil + } + sourceMetrics, ok = mc.sourceState.items[requestSource] + if ok { + return sourceMetrics + } + sourceMetrics = &requestSourceMetrics{ + rru: metrics.RequestSourceRUCounter.WithLabelValues(mc.sourceState.resourceGroupName, requestSource, "rru"), + wru: metrics.RequestSourceRUCounter.WithLabelValues(mc.sourceState.resourceGroupName, requestSource, "wru"), + } + mc.sourceState.items[requestSource] = sourceMetrics + return sourceMetrics +} + +func (mc *groupMetricsCollection) addRequestSourceRU(requestSource string, consumption *rmpb.Consumption) { + if consumption == nil { + return + } + sourceMetrics := mc.getOrCreateRequestSourceMetrics(requestSource) + if sourceMetrics == nil { + return + } + if consumption.RRU > 0 { + sourceMetrics.rru.Add(consumption.RRU) + } + if consumption.WRU > 0 { + sourceMetrics.wru.Add(consumption.WRU) } } @@ -156,6 +239,7 @@ func newGroupCostController( mainCfg *RUConfig, lowRUNotifyChan chan notifyMsg, tokenBucketUpdateChan chan *groupCostController, + sourceState *requestSourceMetricsState, ) (*groupCostController, error) { switch group.Mode { case rmpb.GroupMode_RUMode: @@ -165,7 +249,7 @@ func newGroupCostController( default: return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type") } - ms := initMetrics(group.Name, group.Name) + ms := initMetrics(group.Name, group.Name, sourceState) gc := &groupCostController{ meta: group, name: group.Name, @@ -577,6 +661,8 @@ func (gc *groupCostController) onRequestWaitImpl( waitDuration += d } + gc.metrics.addRequestSourceRU(info.RequestSource(), delta) + gc.mu.Lock() // Calculate the penalty of the store penalty = &rmpb.Consumption{} @@ -622,6 +708,8 @@ func (gc *groupCostController) onResponseImpl( add(gc.mu.globalCounter, count) gc.mu.Unlock() + gc.metrics.addRequestSourceRU(req.RequestSource(), delta) + return delta, nil } @@ -663,6 +751,8 @@ func (gc *groupCostController) onResponseWaitImpl( add(gc.mu.globalCounter, count) gc.mu.Unlock() + gc.metrics.addRequestSourceRU(req.RequestSource(), delta) + return delta, waitDuration, nil } diff --git a/client/resource_group/controller/group_controller_test.go b/client/resource_group/controller/group_controller_test.go index f916f5e56c4..de75e1a800e 100644 --- a/client/resource_group/controller/group_controller_test.go +++ b/client/resource_group/controller/group_controller_test.go @@ -45,7 +45,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController } ch1 := make(chan notifyMsg) ch2 := make(chan *groupCostController) - gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2) + gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2, newRequestSourceMetricsState(group.Name)) re.NoError(err) return gc } @@ -239,7 +239,7 @@ func TestAcquireTokensSignalAwareWait(t *testing.T) { cfg := DefaultRUConfig() cfg.WaitRetryInterval = 5 * time.Second cfg.WaitRetryTimes = 3 - gc, err := newGroupCostController(group, cfg, notifyCh, make(chan *groupCostController, 1)) + gc, err := newGroupCostController(group, cfg, notifyCh, make(chan *groupCostController, 1), newRequestSourceMetricsState(group.Name)) re.NoError(err) // Set fillRate=0 so reservation always fails with InfDuration, diff --git a/client/resource_group/controller/metrics/metrics.go b/client/resource_group/controller/metrics/metrics.go index 01231aceeaf..17aa4a0e2f9 100644 --- a/client/resource_group/controller/metrics/metrics.go +++ b/client/resource_group/controller/metrics/metrics.go @@ -24,8 +24,9 @@ const ( // TODO: remove old label in 8.x resourceGroupNameLabel = "name" newResourceGroupNameLabel = "resource_group" + requestSourceLabel = "request_source" - errType = "type" + typeLabel = "type" ) var ( @@ -49,6 +50,8 @@ var ( LowTokenRequestNotifyCounter *prometheus.CounterVec // TokenConsumedHistogram comments placeholder TokenConsumedHistogram *prometheus.HistogramVec + // RequestSourceRUCounter comments placeholder + RequestSourceRUCounter *prometheus.CounterVec // FailedTokenRequestDuration comments placeholder, WithLabelValues is a heavy operation, define variable to avoid call it every time. FailedTokenRequestDuration prometheus.Observer // SuccessfulTokenRequestDuration comments placeholder, WithLabelValues is a heavy operation, define variable to avoid call it every time. @@ -96,7 +99,7 @@ func initMetrics(constLabels prometheus.Labels) { Name: "fail", Help: "Counter of failed request.", ConstLabels: constLabels, - }, []string{resourceGroupNameLabel, newResourceGroupNameLabel, errType}) + }, []string{resourceGroupNameLabel, newResourceGroupNameLabel, typeLabel}) GroupRunningKVRequestCounter = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -153,6 +156,15 @@ func initMetrics(constLabels prometheus.Labels) { ConstLabels: constLabels, }, []string{newResourceGroupNameLabel}) + RequestSourceRUCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "ru_total", + Help: "Counter of request RU consumption grouped by resource group and request source.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel, requestSourceLabel, typeLabel}) + // WithLabelValues is a heavy operation, define variable to avoid call it every time. FailedTokenRequestDuration = TokenRequestDuration.WithLabelValues("fail") SuccessfulTokenRequestDuration = TokenRequestDuration.WithLabelValues("success") @@ -171,4 +183,5 @@ func InitAndRegisterMetrics(constLabels prometheus.Labels) { prometheus.MustRegister(ResourceGroupTokenRequestCounter) prometheus.MustRegister(LowTokenRequestNotifyCounter) prometheus.MustRegister(TokenConsumedHistogram) + prometheus.MustRegister(RequestSourceRUCounter) } diff --git a/client/resource_group/controller/model.go b/client/resource_group/controller/model.go index 3fe11286d42..2f4c30887c6 100644 --- a/client/resource_group/controller/model.go +++ b/client/resource_group/controller/model.go @@ -52,6 +52,7 @@ type RequestInfo interface { StoreID() uint64 RequestSize() uint64 AccessLocationType() AccessLocationType + RequestSource() string } // ResponseInfo is the interface of the response information provider. A response should be diff --git a/client/resource_group/controller/request_source_metrics_test.go b/client/resource_group/controller/request_source_metrics_test.go new file mode 100644 index 00000000000..55a52f48128 --- /dev/null +++ b/client/resource_group/controller/request_source_metrics_test.go @@ -0,0 +1,556 @@ +package controller + +import ( + "context" + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + + controllerMetrics "github.com/tikv/pd/client/resource_group/controller/metrics" +) + +func counterValue(t *testing.T, metric prometheus.Counter) float64 { + t.Helper() + pb := &dto.Metric{} + require.NoError(t, metric.Write(pb)) + return pb.GetCounter().GetValue() +} + +func collectorMetricCount(collector prometheus.Collector) int { + ch := make(chan prometheus.Metric, 8) + go func() { + collector.Collect(ch) + close(ch) + }() + count := 0 + for range ch { + count++ + } + return count +} + +func requestSourceStateSnapshot(t *testing.T, gc *groupCostController, requestSource string) (*requestSourceMetrics, int) { + t.Helper() + require.NotNil(t, gc.metrics.sourceState) + + gc.metrics.sourceState.mu.RLock() + defer gc.metrics.sourceState.mu.RUnlock() + + return gc.metrics.sourceState.items[requestSource], len(gc.metrics.sourceState.items) +} + +func TestRequestSourceMetricsCachedByResourceGroup(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 100, + numReplicas: 1, + storeID: 1, + accessType: AccessUnknown, + requestSource: "internal_gc_test", + } + resp := &TestResponseInfo{ + readBytes: 128, + succeed: true, + } + + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + + reqConsumption, _, _, _, err := gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + respConsumption, err := gc.onResponseImpl(req, resp) + re.NoError(err) + re.NotZero(reqConsumption.WRU) + re.NotZero(respConsumption.RRU) + + sourceMetrics, cacheSize := requestSourceStateSnapshot(t, gc, req.requestSource) + + re.Equal(1, cacheSize) + re.NotNil(sourceMetrics) + re.Equal(reqConsumption.WRU, counterValue(t, sourceMetrics.wru)) + re.Equal(respConsumption.RRU, counterValue(t, sourceMetrics.rru)) + re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + cachedMetrics, cacheSize := requestSourceStateSnapshot(t, gc, req.requestSource) + re.Equal(1, cacheSize) + re.Same(sourceMetrics, cachedMetrics) + + controllerMetrics.RequestSourceRUCounter.DeleteLabelValues(gc.name, req.requestSource, "rru") + controllerMetrics.RequestSourceRUCounter.DeleteLabelValues(gc.name, req.requestSource, "wru") +} + +func TestCleanupResourceGroupRemovesRequestSourceMetrics(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0) + re.NoError(err) + + group := &rmpb.ResourceGroup{ + Name: "request-source-cleanup", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil) + + gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 64, + numReplicas: 1, + storeID: 1, + requestSource: "internal_gc_cleanup", + } + resp := &TestResponseInfo{readBytes: 64, succeed: true} + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + + gc.mu.Lock() + *gc.run.consumption = *gc.mu.consumption + gc.mu.Unlock() + gc.inactive = true + + re.Greater(collectorMetricCount(controllerMetrics.RequestSourceRUCounter), beforeCount) + + controller.cleanUpResourceGroup() + + _, ok := controller.loadGroupController(group.Name) + re.False(ok) + _, cacheSize := requestSourceStateSnapshot(t, gc, req.requestSource) + re.Zero(cacheSize) + re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) +} + +func TestCleanupDoesNotReexportExistingCachedHandle(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0) + re.NoError(err) + + group := &rmpb.ResourceGroup{ + Name: "request-source-cleanup-cached-handle", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil) + + gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 64, + numReplicas: 1, + storeID: 1, + requestSource: "internal_gc_cleanup_cached_handle", + } + resp := &TestResponseInfo{readBytes: 64, succeed: true} + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + + sourceMetrics, cacheSize := requestSourceStateSnapshot(t, gc, req.requestSource) + re.NotNil(sourceMetrics) + re.Equal(1, cacheSize) + re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + gc.mu.Lock() + *gc.run.consumption = *gc.mu.consumption + gc.mu.Unlock() + gc.inactive = true + + controller.cleanUpResourceGroup() + + _, ok := controller.loadGroupController(group.Name) + re.False(ok) + re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + sourceMetrics.rru.Add(1) + sourceMetrics.wru.Add(1) + + re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) +} + +func TestCleanupPreventsRecreateRequestSourceMetrics(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0) + re.NoError(err) + + group := &rmpb.ResourceGroup{ + Name: "request-source-cleanup-prevent-recreate", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil) + + gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 64, + numReplicas: 1, + storeID: 1, + requestSource: "internal_gc_cleanup_prevent_recreate", + } + resp := &TestResponseInfo{readBytes: 64, succeed: true} + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + gc.mu.Lock() + *gc.run.consumption = *gc.mu.consumption + gc.mu.Unlock() + gc.inactive = true + + controller.cleanUpResourceGroup() + + _, ok := controller.loadGroupController(group.Name) + re.False(ok) + _, cacheSize := requestSourceStateSnapshot(t, gc, req.requestSource) + re.Zero(cacheSize) + re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + gc.metrics.addRequestSourceRU(req.requestSource, &rmpb.Consumption{RRU: 1, WRU: 1}) + + _, cacheSize = requestSourceStateSnapshot(t, gc, req.requestSource) + re.Zero(cacheSize) + re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) +} + +func TestTombstoneCleanupRemovesExistingRequestSourceMetrics(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0) + re.NoError(err) + + group := &rmpb.ResourceGroup{ + Name: "request-source-tombstone-cleanup", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + defaultGroup := &rmpb.ResourceGroup{ + Name: defaultResourceGroupName, + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil) + mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultGroup, nil) + + gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 64, + numReplicas: 1, + storeID: 1, + requestSource: "internal_gc_tombstone_cleanup", + } + resp := &TestResponseInfo{readBytes: 64, succeed: true} + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + controller.tombstoneGroupCostController(group.Name) + tombstoneGC, err := controller.tryGetResourceGroupController(ctx, group.Name, true) + re.NoError(err) + re.True(tombstoneGC.tombstone.Load()) + + controller.cleanUpResourceGroup() + + _, ok := controller.loadGroupController(group.Name) + re.False(ok) + re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) +} + +func TestRevivedResourceGroupCleanupRemovesExistingRequestSourceMetrics(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0) + re.NoError(err) + + group := &rmpb.ResourceGroup{ + Name: "request-source-revive-cleanup", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + defaultGroup := &rmpb.ResourceGroup{ + Name: defaultResourceGroupName, + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil) + mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultGroup, nil) + + gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 64, + numReplicas: 1, + storeID: 1, + requestSource: "internal_gc_revive_cleanup", + } + resp := &TestResponseInfo{readBytes: 64, succeed: true} + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + controller.tombstoneGroupCostController(group.Name) + tombstoneGC, err := controller.tryGetResourceGroupController(ctx, group.Name, true) + re.NoError(err) + re.True(tombstoneGC.tombstone.Load()) + + revivedGC, err := newGroupCostController( + group, + controller.ruConfig, + controller.lowTokenNotifyChan, + controller.tokenBucketUpdateChan, + controller.getOrCreateRequestSourceMetricsState(group.Name), + ) + re.NoError(err) + re.True(controller.groupsController.CompareAndSwap(group.Name, tombstoneGC, revivedGC)) + + revivedGC.mu.Lock() + *revivedGC.run.consumption = *revivedGC.mu.consumption + revivedGC.mu.Unlock() + revivedGC.inactive = true + + controller.cleanUpResourceGroup() + + _, ok := controller.loadGroupController(group.Name) + re.False(ok) + re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) +} + +// TestGetOrCreateAfterCleanupReturnsFreshState verifies the fix for the race +// rleungx identified: after cleanupRequestSourceMetricsState runs, +// getOrCreateRequestSourceMetricsState must return a fresh, non-closed state +// so that a newly created gc can record metrics normally. +func TestGetOrCreateAfterCleanupReturnsFreshState(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0) + re.NoError(err) + + group := &rmpb.ResourceGroup{ + Name: "request-source-create-after-cleanup", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil) + + // Phase 1: create a gc, record some metrics, then clean up. + gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 64, + numReplicas: 1, + storeID: 1, + requestSource: "internal_create_after_cleanup", + } + resp := &TestResponseInfo{readBytes: 64, succeed: true} + + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + + gc.mu.Lock() + *gc.run.consumption = *gc.mu.consumption + gc.mu.Unlock() + gc.inactive = true + controller.cleanUpResourceGroup() + + _, loaded := controller.loadGroupController(group.Name) + re.False(loaded) + + // Phase 2: getOrCreateRequestSourceMetricsState must return a fresh, + // non-closed state after cleanup has removed the old one. + state := controller.getOrCreateRequestSourceMetricsState(group.Name) + re.NotNil(state) + + state.mu.RLock() + re.False(state.closed) + state.mu.RUnlock() + + // Phase 3: create a new gc with this state and verify it can record metrics. + newGC, err := newGroupCostController( + group, + controller.ruConfig, + controller.lowTokenNotifyChan, + controller.tokenBucketUpdateChan, + state, + ) + re.NoError(err) + + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + _, _, _, _, err = newGC.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = newGC.onResponseImpl(req, resp) + re.NoError(err) + + sourceMetrics, cacheSize := requestSourceStateSnapshot(t, newGC, req.requestSource) + re.Equal(1, cacheSize) + re.NotNil(sourceMetrics) + re.Greater(counterValue(t, sourceMetrics.wru), float64(0)) + re.Greater(counterValue(t, sourceMetrics.rru), float64(0)) + re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + // Cleanup for this test. + state.cleanup() +} + +// TestCleanupThenRecreateViaFullPath exercises the full end-to-end path: +// cleanup a group, then tryGetResourceGroupController re-creates it, and +// the new gc records metrics successfully. +func TestCleanupThenRecreateViaFullPath(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0) + re.NoError(err) + + group := &rmpb.ResourceGroup{ + Name: "request-source-full-recreate", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}, + }, + }, + } + mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil) + + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 64, + numReplicas: 1, + storeID: 1, + requestSource: "internal_full_recreate", + } + resp := &TestResponseInfo{readBytes: 64, succeed: true} + + // Create, use, and clean up the group. + gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + _, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + + gc.mu.Lock() + *gc.run.consumption = *gc.mu.consumption + gc.mu.Unlock() + gc.inactive = true + controller.cleanUpResourceGroup() + + _, loaded := controller.loadGroupController(group.Name) + re.False(loaded) + + // Re-create through the normal path (simulates a new request arriving + // after the group was cleaned up). + gc2, err := controller.tryGetResourceGroupController(ctx, group.Name, false) + re.NoError(err) + + beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter) + _, _, _, _, err = gc2.onRequestWaitImpl(context.Background(), req) + re.NoError(err) + _, err = gc2.onResponseImpl(req, resp) + re.NoError(err) + + sourceMetrics, cacheSize := requestSourceStateSnapshot(t, gc2, req.requestSource) + re.Equal(1, cacheSize) + re.NotNil(sourceMetrics) + re.Greater(counterValue(t, sourceMetrics.wru), float64(0)) + re.Greater(counterValue(t, sourceMetrics.rru), float64(0)) + re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter)) + + // Cleanup for this test. + gc2.metrics.sourceState.cleanup() +} diff --git a/client/resource_group/controller/testutil.go b/client/resource_group/controller/testutil.go index a055b32fa6f..d09f856f577 100644 --- a/client/resource_group/controller/testutil.go +++ b/client/resource_group/controller/testutil.go @@ -22,11 +22,12 @@ import "time" // TestRequestInfo is used to test the request info interface. type TestRequestInfo struct { - isWrite bool - writeBytes uint64 - numReplicas int64 - storeID uint64 - accessType AccessLocationType + isWrite bool + writeBytes uint64 + numReplicas int64 + storeID uint64 + accessType AccessLocationType + requestSource string } // NewTestRequestInfo creates a new TestRequestInfo. @@ -70,6 +71,11 @@ func (tri *TestRequestInfo) AccessLocationType() AccessLocationType { return tri.accessType } +// RequestSource implements the RequestInfo interface. +func (tri *TestRequestInfo) RequestSource() string { + return tri.requestSource +} + // TestResponseInfo is used to test the response info interface. type TestResponseInfo struct { readBytes uint64