From 64761de7597cec08cb7597faab72478c35f6c173 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Wed, 22 Mar 2023 16:04:42 +0800
Subject: [PATCH] resource_manager/client: introduce RU and Request metrics
 (#6170) (#6197)

close tikv/pd#6136, ref tikv/pd#6170

Signed-off-by: Cabinfever_B
Signed-off-by: lhy1024
Co-authored-by: Yongbo Jiang
Co-authored-by: Ti Chi Robot
Co-authored-by: lhy1024
---
 .../resource_group/controller/controller.go   | 40 +++++++--
 .../controller/controller_test.go             |  2 +-
 client/resource_group/controller/limiter.go   | 12 +--
 .../resource_group/controller/limiter_test.go |  5 +-
 client/resource_group/controller/metrics.go   | 82 +++++++++++++++++++
 5 files changed, 125 insertions(+), 16 deletions(-)
 create mode 100644 client/resource_group/controller/metrics.go

diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go
index c2c64dab798..2b721a159bb 100755
--- a/client/resource_group/controller/controller.go
+++ b/client/resource_group/controller/controller.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/failpoint"
 	rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
 	"github.com/pingcap/log"
+	"github.com/prometheus/client_golang/prometheus"
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/client/errs"
 	"go.uber.org/zap"
@@ -192,6 +193,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
 		for {
 			select {
 			case <-c.loopCtx.Done():
+				resourceGroupStatusGauge.Reset()
 				return
 			case <-c.responseDeadlineCh:
 				c.run.inDegradedMode = true
@@ -257,7 +259,8 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name
 		return gc, nil
 	}
 	// Initialize the resource group controller.
-	gc, err := newGroupCostController(group, c.config, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
+	gc, err := newGroupCostController(group, c.config, c.lowTokenNotifyChan, c.tokenBucketUpdateChan,
+		successfulRequestDuration.WithLabelValues(group.Name), failedRequestCounter.WithLabelValues(group.Name), resourceGroupTokenRequestCounter.WithLabelValues(group.Name))
 	if err != nil {
 		return nil, err
 	}
@@ -266,6 +269,7 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name
 	// Check again to prevent initializing the same resource group concurrently.
 	tmp, loaded := c.groupsController.LoadOrStore(group.GetName(), gc)
 	if !loaded {
+		resourceGroupStatusGauge.WithLabelValues(name).Set(1)
 		log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName()))
 	}
 	return tmp.(*groupCostController), nil
@@ -284,6 +288,7 @@ func (c *ResourceGroupsController) cleanUpResourceGroup(ctx context.Context) err
 		resourceGroupName := key.(string)
 		if _, ok := latestGroups[resourceGroupName]; !ok {
 			c.groupsController.Delete(key)
+			resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName)
 			return true
 		}
 
@@ -295,6 +300,7 @@ func (c *ResourceGroupsController) cleanUpResourceGroup(ctx context.Context) err
 			if equalRU(latestConsumption, *gc.run.consumption) {
 				if gc.tombstone {
 					c.groupsController.Delete(resourceGroupName)
+					resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName)
 					return true
 				}
 				gc.tombstone = true
@@ -361,6 +367,7 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
 		if request != nil {
 			c.run.currentRequests = append(c.run.currentRequests, request)
 		}
+		gc.tokenRequestCounter.Inc()
 		return true
 	})
 	if len(c.run.currentRequests) > 0 {
@@ -381,14 +388,18 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
 	go func() {
 		log.Debug("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source))
 		resp, err := c.provider.AcquireTokenBuckets(ctx, req)
+		latency := time.Since(now)
 		if err != nil {
 			// Don't log any errors caused by the stopper canceling the context.
 			if !errors.ErrorEqual(err, context.Canceled) {
 				log.L().Sugar().Infof("[resource group controller] token bucket rpc error: %v", err)
 			}
 			resp = nil
+			failedTokenRequestDuration.Observe(latency.Seconds())
+		} else {
+			successfulTokenRequestDuration.Observe(latency.Seconds())
 		}
-		log.Debug("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", time.Since(now)))
+		log.Debug("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency))
 		c.tokenResponseChan <- resp
 	}()
 }
@@ -399,6 +410,7 @@ func (c *ResourceGroupsController) OnRequestWait(
 ) (*rmpb.Consumption, error) {
 	gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
 	if err != nil {
+		failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
 		return nil, err
 	}
 	return gc.onRequestWait(ctx, info)
@@ -424,6 +436,10 @@ type groupCostController struct {
 
 	handleRespFunc func(*rmpb.TokenBucketResponse)
 
+	successfulRequestDuration prometheus.Observer
+	failedRequestCounter      prometheus.Counter
+	tokenRequestCounter       prometheus.Counter
+
 	mu struct {
 		sync.Mutex
 		consumption *rmpb.Consumption
@@ -498,6 +514,8 @@ func newGroupCostController(
 	mainCfg *Config,
 	lowRUNotifyChan chan struct{},
 	tokenBucketUpdateChan chan *groupCostController,
+	successfulRequestDuration prometheus.Observer,
+	failedRequestCounter, tokenRequestCounter prometheus.Counter,
 ) (*groupCostController, error) {
 	switch group.Mode {
 	case rmpb.GroupMode_RUMode:
@@ -509,8 +527,11 @@ func newGroupCostController(
 	}
 
 	gc := &groupCostController{
-		ResourceGroup: group,
-		mainCfg:       mainCfg,
+		ResourceGroup:             group,
+		mainCfg:                   mainCfg,
+		successfulRequestDuration: successfulRequestDuration,
+		failedRequestCounter:      failedRequestCounter,
+		tokenRequestCounter:       tokenRequestCounter,
 		calculators: []ResourceCalculator{
 			newKVCalculator(mainCfg),
 			newSQLCalculator(mainCfg),
@@ -955,8 +976,10 @@ func (gc *groupCostController) onRequestWait(
 	if !gc.burstable.Load() {
 		var err error
 		now := time.Now()
+		var i int
+		var d time.Duration
 	retryLoop:
-		for i := 0; i < maxRetry; i++ {
+		for i = 0; i < maxRetry; i++ {
 			switch gc.mode {
 			case rmpb.GroupMode_RawMode:
 				res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
@@ -965,7 +988,7 @@ func (gc *groupCostController) onRequestWait(
 						res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v))
 					}
 				}
-				if err = WaitReservations(ctx, now, res); err == nil {
+				if d, err = WaitReservations(ctx, now, res); err == nil {
 					break retryLoop
 				}
 			case rmpb.GroupMode_RUMode:
@@ -975,14 +998,17 @@ func (gc *groupCostController) onRequestWait(
 						res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v))
 					}
 				}
-				if err = WaitReservations(ctx, now, res); err == nil {
+				if d, err = WaitReservations(ctx, now, res); err == nil {
 					break retryLoop
 				}
 			}
 			time.Sleep(100 * time.Millisecond)
 		}
 		if err != nil {
+			gc.failedRequestCounter.Inc()
 			return nil, err
+		} else {
+			gc.successfulRequestDuration.Observe(d.Seconds())
 		}
 	}
 	gc.mu.Lock()
diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go
index 5bde4bca2c1..177352e475f 100644
--- a/client/resource_group/controller/controller_test.go
+++ b/client/resource_group/controller/controller_test.go
@@ -42,7 +42,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController
 	}
 	ch1 := make(chan struct{})
 	ch2 := make(chan *groupCostController)
-	gc, err := newGroupCostController(group, DefaultConfig(), ch1, ch2)
+	gc, err := newGroupCostController(group, DefaultConfig(), ch1, ch2, successfulRequestDuration.WithLabelValues(group.Name), failedRequestCounter.WithLabelValues(group.Name), resourceGroupTokenRequestCounter.WithLabelValues(group.Name))
 	re.NoError(err)
 	return gc
 }
diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go
index f0f12dde10b..fe6e0a8c69c 100644
--- a/client/resource_group/controller/limiter.go
+++ b/client/resource_group/controller/limiter.go
@@ -409,9 +409,9 @@ func (limit Limit) tokensFromDuration(d time.Duration) float64 {
 
 // WaitReservations is used to process a series of reservations
 // so that all limiter tokens are returned if one reservation fails
-func WaitReservations(ctx context.Context, now time.Time, reservations []*Reservation) error {
+func WaitReservations(ctx context.Context, now time.Time, reservations []*Reservation) (time.Duration, error) {
 	if len(reservations) == 0 {
-		return nil
+		return 0, nil
 	}
 	cancel := func() {
 		for _, res := range reservations {
@@ -422,7 +422,7 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
 	for _, res := range reservations {
 		if !res.ok {
 			cancel()
-			return errs.ErrClientResourceGroupThrottled
+			return 0, errs.ErrClientResourceGroupThrottled
 		}
 		delay := res.DelayFrom(now)
 		if delay > longestDelayDuration {
@@ -430,7 +430,7 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
 		}
 	}
 	if longestDelayDuration <= 0 {
-		return nil
+		return 0, nil
 	}
 	t := time.NewTimer(longestDelayDuration)
 	defer t.Stop()
@@ -438,11 +438,11 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
 	select {
 	case <-t.C:
 		// We can proceed.
-		return nil
+		return longestDelayDuration, nil
 	case <-ctx.Done():
 		// Context was canceled before we could proceed. Cancel the
 		// reservation, which may permit other events to proceed sooner.
 		cancel()
-		return ctx.Err()
+		return 0, ctx.Err()
 	}
 }
diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go
index c3620a9addd..2e05f66ab67 100644
--- a/client/resource_group/controller/limiter_test.go
+++ b/client/resource_group/controller/limiter_test.go
@@ -152,7 +152,8 @@ func TestCancel(t *testing.T) {
 	r2 := lim2.Reserve(ctx1, InfDuration, t1, 5)
 	checkTokens(re, lim1, t2, 7)
 	checkTokens(re, lim2, t2, 2)
-	err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
+	d, err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
+	re.Equal(d, time.Duration(0))
 	re.Error(err)
 	checkTokens(re, lim1, t3, 13)
 	checkTokens(re, lim2, t3, 3)
@@ -166,7 +167,7 @@ func TestCancel(t *testing.T) {
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
-		err := WaitReservations(ctx2, t3, []*Reservation{r1, r2})
+		_, err := WaitReservations(ctx2, t3, []*Reservation{r1, r2})
 		re.Error(err)
 		wg.Done()
 	}()
diff --git a/client/resource_group/controller/metrics.go b/client/resource_group/controller/metrics.go
new file mode 100644
index 00000000000..9af0aeb15d4
--- /dev/null
+++ b/client/resource_group/controller/metrics.go
@@ -0,0 +1,82 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package controller
+
+import "github.com/prometheus/client_golang/prometheus"
+
+const (
+	namespace             = "resource_manager_client"
+	requestSubsystem      = "request"
+	tokenRequestSubsystem = "token_request"
+
+	resourceGroupNameLabel = "name"
+)
+
+var (
+	resourceGroupStatusGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: "resource_group",
+			Name:      "status",
+			Help:      "Status of the resource group.",
+		}, []string{resourceGroupNameLabel})
+
+	successfulRequestDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: requestSubsystem,
+			Name:      "success",
+			Buckets:   prometheus.ExponentialBuckets(0.001, 4, 8), // 0.001 ~ 40.96
+			Help:      "Bucketed histogram of wait duration of successful request.",
+		}, []string{resourceGroupNameLabel})
+
+	failedRequestCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: requestSubsystem,
+			Name:      "fail",
+			Help:      "Counter of failed request.",
+		}, []string{resourceGroupNameLabel})
+
+	tokenRequestDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: tokenRequestSubsystem,
+			Name:      "duration",
+			Help:      "Bucketed histogram of latency(s) of token request.",
+		}, []string{"type"})
+
+	resourceGroupTokenRequestCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: tokenRequestSubsystem,
+			Name:      "resource_group",
+			Help:      "Counter of token request by every resource group.",
+		}, []string{resourceGroupNameLabel})
+)
+
+var (
+	// WithLabelValues is a heavy operation, define variable to avoid call it every time.
+	failedTokenRequestDuration     = tokenRequestDuration.WithLabelValues("fail")
+	successfulTokenRequestDuration = tokenRequestDuration.WithLabelValues("success")
+)
+
+func init() {
+	prometheus.MustRegister(resourceGroupStatusGauge)
+	prometheus.MustRegister(successfulRequestDuration)
+	prometheus.MustRegister(failedRequestCounter)
+	prometheus.MustRegister(tokenRequestDuration)
+	prometheus.MustRegister(resourceGroupTokenRequestCounter)
+}
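
Note on the pre-bound instruments: the patch resolves each group's labels once, in tryGetResourceGroup, and hands the resulting prometheus.Observer and prometheus.Counter values to newGroupCostController, because WithLabelValues is, as the comment in metrics.go says, a heavy operation (it hashes the label values and looks them up under the vector's lock), and onRequestWait is the hot path. Below is a minimal, self-contained sketch of the same pattern; demoController and the "default" group name are illustrative stand-ins, not code from this patch.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// successDuration mirrors the successfulRequestDuration vector defined in metrics.go.
var successDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "resource_manager_client",
		Subsystem: "request",
		Name:      "success",
		Buckets:   prometheus.ExponentialBuckets(0.001, 4, 8), // 0.001 ~ 40.96
		Help:      "Bucketed histogram of wait duration of successful request.",
	}, []string{"name"})

// demoController stands in for groupCostController: it stores a pre-bound
// prometheus.Observer instead of the whole HistogramVec.
type demoController struct {
	successObserver prometheus.Observer
}

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(successDuration)

	// Resolve the label exactly once, the way the patch passes
	// successfulRequestDuration.WithLabelValues(group.Name) into
	// newGroupCostController; the hot path then only calls Observe.
	gc := &demoController{successObserver: successDuration.WithLabelValues("default")}

	start := time.Now()
	time.Sleep(5 * time.Millisecond) // stand-in for waiting on reservations
	gc.successObserver.Observe(time.Since(start).Seconds())

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.GetName()) // resource_manager_client_request_success
	}
}

A side effect of this design is that groupCostController depends only on the narrow Observer and Counter interfaces, which is why createTestGroupCostController can inject pre-bound instruments without touching the vectors.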
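
The resource_manager_client_resource_group_status gauge follows the controller lifecycle at three points in controller.go: Set(1) when a cost controller is first stored, DeleteLabelValues when a stale group is removed in cleanUpResourceGroup, and Reset when the main loop exits. The following sketch walks that lifecycle against a private registry; the group name "oltp" and the registry setup are illustrative assumptions, not part of the patch.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// statusGauge mirrors resourceGroupStatusGauge from metrics.go.
var statusGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "resource_manager_client",
		Subsystem: "resource_group",
		Name:      "status",
		Help:      "Status of the resource group.",
	}, []string{"name"})

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(statusGauge)

	// tryGetResourceGroup: a new cost controller was stored, mark the group live.
	statusGauge.WithLabelValues("oltp").Set(1)

	// cleanUpResourceGroup: the group is gone on the server side, so drop its
	// series to keep stale label sets from lingering in scrapes.
	statusGauge.DeleteLabelValues("oltp")

	// Start, <-c.loopCtx.Done() branch: clear every series on shutdown.
	statusGauge.Reset()

	mfs, _ := reg.Gather()
	fmt.Println("metric families left:", len(mfs)) // 0, nothing left to expose
}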