From 7bed3fbeef8d9fa8b09ce52ad04c45eb6660d23c Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Sat, 28 Jan 2023 18:05:55 +0800 Subject: [PATCH 1/2] remove maxRequestTokens Signed-off-by: Cabinfever_B --- pkg/mcs/resource_manager/client/client.go | 22 +++++++++---------- pkg/mcs/resource_manager/client/config.go | 3 --- pkg/mcs/resource_manager/client/limiter.go | 17 +++++--------- .../resource_manager/client/limiter_test.go | 10 ++++----- 4 files changed, 21 insertions(+), 31 deletions(-) diff --git a/pkg/mcs/resource_manager/client/client.go b/pkg/mcs/resource_manager/client/client.go index f5c60b7d653..58db0444296 100644 --- a/pkg/mcs/resource_manager/client/client.go +++ b/pkg/mcs/resource_manager/client/client.go @@ -248,7 +248,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, TargetRequestPeriodMs: uint64(c.config.targetPeriod / time.Millisecond), } go func() { - log.Debug("[resource group controllor] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source)) + log.Debug("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source)) resp, err := c.provider.AcquireTokenBuckets(ctx, req) if err != nil { // Don't log any errors caused by the stopper canceling the context. @@ -257,7 +257,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, } resp = nil } - log.Debug("[resource group controllor] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", time.Since(now))) + log.Debug("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", time.Since(now))) c.tokenResponseChan <- resp }() } @@ -327,7 +327,7 @@ func (c *ResourceGroupsController) OnRequestWait( return err } -// OnResponse is used to consume tokens atfer receiving response +// OnResponse is used to consume tokens after receiving response func (c *ResourceGroupsController) OnResponse(_ context.Context, resourceGroupName string, req RequestInfo, resp ResponseInfo) error { tmp, ok := c.groupsController.Load(resourceGroupName) if !ok { @@ -410,7 +410,7 @@ func newGroupCostController(group *rmpb.ResourceGroup, mainCfg *Config, lowRUNot case rmpb.GroupMode_RUMode: gc.handleRespFunc = gc.handleRUTokenResponse case rmpb.GroupMode_RawMode: - gc.handleRespFunc = gc.handleResourceTokenResponse + gc.handleRespFunc = gc.handleRawResourceTokenResponse } gc.mu.consumption = &rmpb.Consumption{} @@ -431,7 +431,7 @@ func (gc *groupCostController) initRunState() { gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter) for typ := range requestUnitList { counter := &tokenCounter{ - limiter: NewLimiter(now, 0, initialRequestUnits, gc.mainCfg.maxRequestTokens, gc.lowRUNotifyChan), + limiter: NewLimiter(now, 0, initialRequestUnits, gc.lowRUNotifyChan), avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() * 2, avgLastTime: now, } @@ -441,7 +441,7 @@ func (gc *groupCostController) initRunState() { gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter) for typ := range requestResourceList { counter := &tokenCounter{ - limiter: NewLimiter(now, 0, initialRequestUnits, gc.mainCfg.maxRequestTokens, gc.lowRUNotifyChan), + limiter: NewLimiter(now, 0, initialRequestUnits, gc.lowRUNotifyChan), avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() 
* 2, avgLastTime: now, } @@ -475,7 +475,7 @@ func (gc *groupCostController) updateRunState(ctx context.Context) { } } } - log.Debug("update run state", zap.Any("request unit comsumption", gc.run.consumption)) + log.Debug("update run state", zap.Any("request unit consumption", gc.run.consumption)) gc.run.now = newTime } @@ -520,7 +520,7 @@ func (gc *groupCostController) updateAvgRaWResourcePerSec() { if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) { continue } - log.Debug("[resource group controllor] update avg raw resource per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec)) + log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec)) } } @@ -529,7 +529,7 @@ func (gc *groupCostController) updateAvgRUPerSec() { if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) { continue } - log.Debug("[resource group controllor] update avg ru per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec)) + log.Debug("[resource group controller] update avg ru per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec)) } } @@ -578,7 +578,7 @@ func (gc *groupCostController) handleTokenBucketResponse(resp *rmpb.TokenBucketR } } -func (gc *groupCostController) handleResourceTokenResponse(resp *rmpb.TokenBucketResponse) { +func (gc *groupCostController) handleRawResourceTokenResponse(resp *rmpb.TokenBucketResponse) { for _, grantedTB := range resp.GetGrantedResourceTokens() { typ := grantedTB.GetType() counter, ok := gc.run.resourceTokens[typ] @@ -631,7 +631,7 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket cfg.NotifyThreshold = notifyThreshold counter.lastDeadline = time.Time{} } else { - // Otherwise the granted token is delivered to the client by fillrate. + // Otherwise the granted token is delivered to the client by fill rate. 
cfg.NewTokens = 0 trickleDuration := time.Duration(trickleTimeMs) * time.Millisecond deadline := gc.run.now.Add(trickleDuration) diff --git a/pkg/mcs/resource_manager/client/config.go b/pkg/mcs/resource_manager/client/config.go index 5267eddce09..f6934f9a7ab 100644 --- a/pkg/mcs/resource_manager/client/config.go +++ b/pkg/mcs/resource_manager/client/config.go @@ -47,7 +47,6 @@ const ( extendedReportingPeriodFactor = 4 defaultGroupLoopUpdateInterval = 1 * time.Second defaultTargetPeriod = 10 * time.Second - defaultMaxRequestTokens = 1e8 ) const ( @@ -93,7 +92,6 @@ func DefaultRequestUnitConfig() *RequestUnitConfig { type Config struct { groupLoopUpdateInterval time.Duration targetPeriod time.Duration - maxRequestTokens float64 ReadBaseCost RequestUnit ReadBytesCost RequestUnit @@ -121,6 +119,5 @@ func generateConfig(ruConfig *RequestUnitConfig) *Config { } cfg.groupLoopUpdateInterval = defaultGroupLoopUpdateInterval cfg.targetPeriod = defaultTargetPeriod - cfg.maxRequestTokens = defaultMaxRequestTokens return cfg } diff --git a/pkg/mcs/resource_manager/client/limiter.go b/pkg/mcs/resource_manager/client/limiter.go index c87bf8c7424..33eb46bbf0a 100644 --- a/pkg/mcs/resource_manager/client/limiter.go +++ b/pkg/mcs/resource_manager/client/limiter.go @@ -61,10 +61,9 @@ func Every(interval time.Duration) Limit { // and the amount of time the caller must wait before using it, // or its associated context.Context is canceled. type Limiter struct { - mu sync.Mutex - limit Limit - tokens float64 - maxRequestTokens float64 + mu sync.Mutex + limit Limit + tokens float64 // last is the last time the limiter's tokens field was updated last time.Time notifyThreshold float64 @@ -84,12 +83,11 @@ func (lim *Limiter) Limit() Limit { // NewLimiter returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. 
-func NewLimiter(now time.Time, r Limit, tokens, maxRequestTokens float64, lowTokensNotifyChan chan struct{}) *Limiter { +func NewLimiter(now time.Time, r Limit, tokens float64, lowTokensNotifyChan chan struct{}) *Limiter { lim := &Limiter{ limit: r, last: now, tokens: tokens, - maxRequestTokens: maxRequestTokens, lowTokensNotifyChan: lowTokensNotifyChan, } log.Info("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim))) @@ -291,11 +289,6 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur tokens: n, timeToAct: now, } - } else if n > lim.maxRequestTokens { - return Reservation{ - ok: false, - lim: lim, - } } now, last, tokens := lim.advance(now) @@ -308,7 +301,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur } // Decide result - ok := n <= lim.maxRequestTokens && waitDuration <= maxFutureReserve + ok := waitDuration <= maxFutureReserve // Prepare reservation r := Reservation{ diff --git a/pkg/mcs/resource_manager/client/limiter_test.go b/pkg/mcs/resource_manager/client/limiter_test.go index 9a50b0db918..2961af3fac3 100644 --- a/pkg/mcs/resource_manager/client/limiter_test.go +++ b/pkg/mcs/resource_manager/client/limiter_test.go @@ -83,7 +83,7 @@ func checkTokens(re *require.Assertions, lim *Limiter, t time.Time, expected flo } func TestSimpleReserve(t *testing.T) { - lim := NewLimiter(t0, 1, 2, 1000, make(chan struct{}, 1)) + lim := NewLimiter(t0, 1, 2, make(chan struct{}, 1)) runReserveMax(t, lim, request{t0, 3, t1, true}) runReserveMax(t, lim, request{t0, 3, t4, true}) @@ -97,7 +97,7 @@ func TestSimpleReserve(t *testing.T) { func TestReconfig(t *testing.T) { re := require.New(t) - lim := NewLimiter(t0, 1, 2, 1000, make(chan struct{}, 1)) + lim := NewLimiter(t0, 1, 2, make(chan struct{}, 1)) runReserveMax(t, lim, request{t0, 4, t2, true}) args := tokenBucketReconfigureArgs{ @@ -111,7 +111,7 @@ func TestReconfig(t *testing.T) { func TestNotify(t *testing.T) { nc := make(chan struct{}, 1) - lim := NewLimiter(t0, 1, 0, 1000, nc) + lim := NewLimiter(t0, 1, 0, nc) args := tokenBucketReconfigureArgs{ NewTokens: 1000., @@ -132,8 +132,8 @@ func TestCancel(t *testing.T) { ctx1, cancel1 := context.WithDeadline(ctx, t2) re := require.New(t) nc := make(chan struct{}, 1) - lim1 := NewLimiter(t0, 1, 10, 100, nc) - lim2 := NewLimiter(t0, 1, 0, 100, nc) + lim1 := NewLimiter(t0, 1, 10, nc) + lim2 := NewLimiter(t0, 1, 0, nc) r1 := runReserveMax(t, lim1, request{t0, 5, t0, true}) checkTokens(re, lim1, t0, 5) From d7a62ad9c4511c83c99d794072a176146c53035b Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Sat, 28 Jan 2023 18:27:54 +0800 Subject: [PATCH 2/2] fix test Signed-off-by: Cabinfever_B --- pkg/mcs/resource_manager/client/limiter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/mcs/resource_manager/client/limiter_test.go b/pkg/mcs/resource_manager/client/limiter_test.go index 2961af3fac3..b308514fa6c 100644 --- a/pkg/mcs/resource_manager/client/limiter_test.go +++ b/pkg/mcs/resource_manager/client/limiter_test.go @@ -90,7 +90,7 @@ func TestSimpleReserve(t *testing.T) { runReserveMax(t, lim, request{t3, 2, t6, true}) runReserve(t, lim, request{t3, 2, t7, false}, time.Second*4) - runReserveMax(t, lim, request{t5, 2000, t6, false}) + runReserve(t, lim, request{t5, 2000, t6, false}, time.Second*100) runReserve(t, lim, request{t3, 2, t8, true}, time.Second*8) }
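
For reviewers, the behavioral core of this series is the change to reserveN in limiter.go: a reservation is no longer rejected outright for exceeding a per-request ceiling (maxRequestTokens); after the patch the only check is whether the projected wait fits within the caller-supplied horizon, i.e. `ok := waitDuration <= maxFutureReserve`. The Go sketch below is a simplified, self-contained model of that decision, not the pd client code itself: the limit/tokens/waitDuration/maxFutureReserve names mirror the diff, while miniLimiter and its fields are illustrative only.

package main

import (
	"fmt"
	"time"
)

// miniLimiter is an illustrative stand-in for the client Limiter: a token
// bucket refilled at `limit` tokens per second, currently holding `tokens`.
type miniLimiter struct {
	limit  float64 // refill rate, tokens per second
	tokens float64 // tokens currently available
}

// canReserve models the post-patch decision in reserveN: compute how long the
// caller would have to wait for n tokens and compare only against
// maxFutureReserve. Assumes a positive refill rate, which is all the sketch needs.
func (l *miniLimiter) canReserve(n float64, maxFutureReserve time.Duration) (bool, time.Duration) {
	remaining := l.tokens - n
	var waitDuration time.Duration
	if remaining < 0 {
		waitDuration = time.Duration(-remaining / l.limit * float64(time.Second))
	}
	// Decide result. Before this patch the check also required n to stay
	// under maxRequestTokens; now only the wait horizon matters.
	return waitDuration <= maxFutureReserve, waitDuration
}

func main() {
	lim := &miniLimiter{limit: 1, tokens: 2}

	// A very large request is no longer rejected up front; it fails here only
	// because the required wait exceeds the 100-second horizon.
	ok, wait := lim.canReserve(2000, 100*time.Second)
	fmt.Printf("n=2000, maxFutureReserve=100s  -> ok=%v wait=%v\n", ok, wait) // ok=false

	// The same request can be served if the caller tolerates the full wait.
	ok, wait = lim.canReserve(2000, 2000*time.Second)
	fmt.Printf("n=2000, maxFutureReserve=2000s -> ok=%v wait=%v\n", ok, wait) // ok=true
}

In the real Limiter the wait also goes through advance(), which folds in the time elapsed since the last token update; the sketch drops that to keep the decision itself visible. The adjusted TestSimpleReserve case makes the same point: a 2000-token request at rate 1 is still expected to fail with a 100-second maxFutureReserve, but now because the wait is too long rather than because of a hard per-request cap.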