New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
resource_manager/client: introduce RU and Request metrics #6170
Changes from 3 commits
b7dae8f
41ce95b
07820e8
bba7c35
efd0f44
1f64f33
a62c7cd
0433e92
964a37c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,15 +26,17 @@ import ( | |
"github.com/pingcap/failpoint" | ||
rmpb "github.com/pingcap/kvproto/pkg/resource_manager" | ||
"github.com/pingcap/log" | ||
"github.com/prometheus/client_golang/prometheus" | ||
pd "github.com/tikv/pd/client" | ||
"github.com/tikv/pd/client/errs" | ||
"go.uber.org/zap" | ||
) | ||
|
||
const ( | ||
requestUnitConfigPath = "resource_group/ru_config" | ||
maxRetry = 3 | ||
maxNotificationChanLen = 200 | ||
requestUnitConfigPath = "resource_group/ru_config" | ||
maxRetry = 3 | ||
maxNotificationChanLen = 200 | ||
maxConsumptionMetricsChanLen = 200 | ||
) | ||
|
||
type selectType int | ||
|
@@ -104,6 +106,8 @@ type ResourceGroupsController struct { | |
// currentRequests is used to record the request and resource group. | ||
// Currently, we don't do multiple `AcquireTokenBuckets`` at the same time, so there are no concurrency problems with `currentRequests`. | ||
currentRequests []*rmpb.TokenBucketRequest | ||
|
||
consumptionDispatcher chan *rmpb.TokenBucketRequest | ||
} | ||
|
||
// NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor | ||
|
@@ -129,6 +133,7 @@ func NewResourceGroupController( | |
lowTokenNotifyChan: make(chan struct{}, 1), | ||
tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), | ||
tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), | ||
consumptionDispatcher: make(chan *rmpb.TokenBucketRequest, maxConsumptionMetricsChanLen), | ||
} | ||
for _, opt := range opts { | ||
opt(controller) | ||
|
@@ -181,6 +186,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { | |
for { | ||
select { | ||
case <-c.loopCtx.Done(): | ||
c.resetMetrics() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to reset There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I do it in |
||
return | ||
case resp := <-c.tokenResponseChan: | ||
if resp != nil { | ||
|
@@ -210,6 +216,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { | |
} | ||
} | ||
}() | ||
go c.backgroundMetricsFlush(ctx) | ||
} | ||
|
||
// Stop stops ResourceGroupController service. | ||
|
@@ -221,6 +228,38 @@ func (c *ResourceGroupsController) Stop() error { | |
return nil | ||
} | ||
|
||
// Receive the consumption and flush it to the metrics. | ||
func (c *ResourceGroupsController) backgroundMetricsFlush(ctx context.Context) { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case req := <-c.consumptionDispatcher: | ||
consumption := req.GetConsumptionSinceLastRequest() | ||
if consumption == nil { | ||
continue | ||
} | ||
var ( | ||
name = req.GetResourceGroupName() | ||
rruMetrics = readRequestUnitCost.WithLabelValues(name) | ||
wruMetrics = writeRequestUnitCost.WithLabelValues(name) | ||
) | ||
resourceGroupTokenRequestCounter.WithLabelValues(name).Inc() | ||
// RU info. | ||
if consumption.RRU != 0 { | ||
rruMetrics.Observe(consumption.RRU) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it's duplicated with the server side? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @JmPotato @BornChanger Do u think it is necessary to count RU consumption of tidb instances? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. may do not need in current. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't really think about the need to view RU consumption at the instance level, so not keeping a record is okay for me. |
||
} | ||
if consumption.WRU != 0 { | ||
wruMetrics.Observe(consumption.WRU) | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (c *ResourceGroupsController) resetMetrics() { | ||
resourceGroupStatusGauge.Reset() | ||
} | ||
JmPotato marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// tryGetResourceGroup will try to get the resource group controller from local cache first, | ||
// if the local cache misses, it will then call gRPC to fetch the resource group info from server. | ||
func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name string) (*groupCostController, error) { | ||
|
@@ -239,7 +278,7 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name | |
return gc, nil | ||
} | ||
// Initialize the resource group controller. | ||
gc, err := newGroupCostController(group, c.config, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) | ||
gc, err := newGroupCostController(group, c.config, c.lowTokenNotifyChan, c.tokenBucketUpdateChan, successfulRequestDuration.WithLabelValues(group.Name), failedRequestCounter.WithLabelValues(group.Name)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -248,6 +287,7 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name | |
// Check again to prevent initializing the same resource group concurrently. | ||
tmp, loaded := c.groupsController.LoadOrStore(group.GetName(), gc) | ||
if !loaded { | ||
resourceGroupStatusGauge.WithLabelValues(name).Set(1) | ||
log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName())) | ||
} | ||
return tmp.(*groupCostController), nil | ||
|
@@ -266,6 +306,7 @@ func (c *ResourceGroupsController) cleanUpResourceGroup(ctx context.Context) err | |
resourceGroupName := key.(string) | ||
if _, ok := latestGroups[resourceGroupName]; !ok { | ||
c.groupsController.Delete(key) | ||
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName) | ||
return true | ||
} | ||
|
||
|
@@ -277,6 +318,7 @@ func (c *ResourceGroupsController) cleanUpResourceGroup(ctx context.Context) err | |
if equalRU(latestConsumption, *gc.run.consumption) { | ||
if gc.tombstone { | ||
c.groupsController.Delete(resourceGroupName) | ||
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName) | ||
return true | ||
} | ||
gc.tombstone = true | ||
|
@@ -325,6 +367,7 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex | |
if request != nil { | ||
c.currentRequests = append(c.currentRequests, request) | ||
} | ||
c.consumptionDispatcher <- request | ||
return true | ||
}) | ||
if len(c.currentRequests) > 0 { | ||
|
@@ -341,14 +384,18 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, | |
go func() { | ||
log.Debug("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source)) | ||
resp, err := c.provider.AcquireTokenBuckets(ctx, req) | ||
latency := time.Since(now) | ||
if err != nil { | ||
// Don't log any errors caused by the stopper canceling the context. | ||
if !errors.ErrorEqual(err, context.Canceled) { | ||
log.L().Sugar().Infof("[resource group controller] token bucket rpc error: %v", err) | ||
} | ||
resp = nil | ||
failedTokenRequestCounter.Inc() | ||
} else { | ||
successfulTokenRequestDuration.Observe(latency.Seconds()) | ||
} | ||
log.Debug("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", time.Since(now))) | ||
log.Debug("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency)) | ||
c.tokenResponseChan <- resp | ||
}() | ||
} | ||
|
@@ -359,6 +406,7 @@ func (c *ResourceGroupsController) OnRequestWait( | |
) (*rmpb.Consumption, error) { | ||
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) | ||
if err != nil { | ||
failedRequestCounter.WithLabelValues(resourceGroupName).Inc() | ||
return nil, err | ||
} | ||
return gc.onRequestWait(ctx, info) | ||
|
@@ -384,6 +432,9 @@ type groupCostController struct { | |
|
||
handleRespFunc func(*rmpb.TokenBucketResponse) | ||
|
||
successfulRequestDuration prometheus.Observer | ||
failedRequestCounter prometheus.Counter | ||
|
||
mu struct { | ||
sync.Mutex | ||
consumption *rmpb.Consumption | ||
|
@@ -456,6 +507,8 @@ func newGroupCostController( | |
mainCfg *Config, | ||
lowRUNotifyChan chan struct{}, | ||
tokenBucketUpdateChan chan *groupCostController, | ||
successfulRequestDuration prometheus.Observer, | ||
failedRequestCounter prometheus.Counter, | ||
) (*groupCostController, error) { | ||
switch group.Mode { | ||
case rmpb.GroupMode_RUMode: | ||
|
@@ -467,8 +520,10 @@ func newGroupCostController( | |
} | ||
|
||
gc := &groupCostController{ | ||
ResourceGroup: group, | ||
mainCfg: mainCfg, | ||
ResourceGroup: group, | ||
mainCfg: mainCfg, | ||
successfulRequestDuration: successfulRequestDuration, | ||
failedRequestCounter: failedRequestCounter, | ||
calculators: []ResourceCalculator{ | ||
newKVCalculator(mainCfg), | ||
newSQLCalculator(mainCfg), | ||
|
@@ -862,8 +917,10 @@ func (gc *groupCostController) onRequestWait( | |
if !gc.burstable.Load() { | ||
var err error | ||
now := time.Now() | ||
var i int | ||
var d time.Duration | ||
retryLoop: | ||
for i := 0; i < maxRetry; i++ { | ||
for i = 0; i < maxRetry; i++ { | ||
switch gc.mode { | ||
case rmpb.GroupMode_RawMode: | ||
res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) | ||
|
@@ -872,7 +929,7 @@ func (gc *groupCostController) onRequestWait( | |
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v)) | ||
} | ||
} | ||
if err = WaitReservations(ctx, now, res); err == nil { | ||
if d, err = WaitReservations(ctx, now, res); err == nil { | ||
break retryLoop | ||
} | ||
case rmpb.GroupMode_RUMode: | ||
|
@@ -882,14 +939,17 @@ func (gc *groupCostController) onRequestWait( | |
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v)) | ||
} | ||
} | ||
if err = WaitReservations(ctx, now, res); err == nil { | ||
if d, err = WaitReservations(ctx, now, res); err == nil { | ||
break retryLoop | ||
} | ||
} | ||
time.Sleep(100 * time.Millisecond) | ||
} | ||
if err != nil { | ||
gc.failedRequestCounter.Inc() | ||
return nil, err | ||
} else { | ||
gc.successfulRequestDuration.Observe(d.Seconds()) | ||
} | ||
} | ||
gc.mu.Lock() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need consist with #6063
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be resolved automatically after merging #6063