resource_manager: remove maxRequestTokens in limiter (#5880)
ref #5851

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>
CabinfeverB committed Jan 29, 2023
1 parent 5c518d0 commit c55e632
Showing 4 changed files with 22 additions and 32 deletions.
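For readers skimming the diff: the net API change is that NewLimiter loses its maxRequestTokens parameter, and reserveN drops the matching per-request cap. A minimal sketch of the new constructor call, assuming the package can be imported at the path implied by the file layout below (it may well be internal to pd rather than a supported public API):

package main

import (
	"time"

	// Import path inferred from the repository layout in this commit;
	// treat it as an assumption, not a documented entry point.
	rmclient "github.com/tikv/pd/pkg/mcs/resource_manager/client"
)

func main() {
	notifyCh := make(chan struct{}, 1)
	// After this commit: NewLimiter(now, rate, initial tokens, low-tokens channel).
	lim := rmclient.NewLimiter(time.Now(), rmclient.Limit(1), 2, notifyCh)
	_ = lim
}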
22 changes: 11 additions & 11 deletions pkg/mcs/resource_manager/client/client.go
@@ -248,7 +248,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
TargetRequestPeriodMs: uint64(c.config.targetPeriod / time.Millisecond),
}
go func() {
log.Debug("[resource group controllor] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source))
log.Debug("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source))
resp, err := c.provider.AcquireTokenBuckets(ctx, req)
if err != nil {
// Don't log any errors caused by the stopper canceling the context.
@@ -257,7 +257,7 @@ }
}
resp = nil
}
log.Debug("[resource group controllor] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", time.Since(now)))
log.Debug("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", time.Since(now)))
c.tokenResponseChan <- resp
}()
}
@@ -327,7 +327,7 @@ func (c *ResourceGroupsController) OnRequestWait(
return err
}

- // OnResponse is used to consume tokens atfer receiving response
+ // OnResponse is used to consume tokens after receiving response
func (c *ResourceGroupsController) OnResponse(_ context.Context, resourceGroupName string, req RequestInfo, resp ResponseInfo) error {
tmp, ok := c.groupsController.Load(resourceGroupName)
if !ok {
@@ -410,7 +410,7 @@ func newGroupCostController(group *rmpb.ResourceGroup, mainCfg *Config, lowRUNot
case rmpb.GroupMode_RUMode:
gc.handleRespFunc = gc.handleRUTokenResponse
case rmpb.GroupMode_RawMode:
- gc.handleRespFunc = gc.handleResourceTokenResponse
+ gc.handleRespFunc = gc.handleRawResourceTokenResponse
}

gc.mu.consumption = &rmpb.Consumption{}
@@ -431,7 +431,7 @@ func (gc *groupCostController) initRunState() {
gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter)
for typ := range requestUnitList {
counter := &tokenCounter{
- limiter: NewLimiter(now, 0, initialRequestUnits, gc.mainCfg.maxRequestTokens, gc.lowRUNotifyChan),
+ limiter: NewLimiter(now, 0, initialRequestUnits, gc.lowRUNotifyChan),
avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() * 2,
avgLastTime: now,
}
@@ -441,7 +441,7 @@ func (gc *groupCostController) initRunState() {
gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter)
for typ := range requestResourceList {
counter := &tokenCounter{
- limiter: NewLimiter(now, 0, initialRequestUnits, gc.mainCfg.maxRequestTokens, gc.lowRUNotifyChan),
+ limiter: NewLimiter(now, 0, initialRequestUnits, gc.lowRUNotifyChan),
avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() * 2,
avgLastTime: now,
}
@@ -475,7 +475,7 @@ func (gc *groupCostController) updateRunState(ctx context.Context) {
}
}
}
log.Debug("update run state", zap.Any("request unit comsumption", gc.run.consumption))
log.Debug("update run state", zap.Any("request unit consumption", gc.run.consumption))
gc.run.now = newTime
}

@@ -520,7 +520,7 @@ func (gc *groupCostController) updateAvgRaWResourcePerSec() {
if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) {
continue
}
log.Debug("[resource group controllor] update avg raw resource per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
}
}

@@ -529,7 +529,7 @@ func (gc *groupCostController) updateAvgRUPerSec() {
if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) {
continue
}
log.Debug("[resource group controllor] update avg ru per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
log.Debug("[resource group controller] update avg ru per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
}
}

@@ -578,7 +578,7 @@ func (gc *groupCostController) handleTokenBucketResponse(resp *rmpb.TokenBucketR
}
}

- func (gc *groupCostController) handleResourceTokenResponse(resp *rmpb.TokenBucketResponse) {
+ func (gc *groupCostController) handleRawResourceTokenResponse(resp *rmpb.TokenBucketResponse) {
for _, grantedTB := range resp.GetGrantedResourceTokens() {
typ := grantedTB.GetType()
counter, ok := gc.run.resourceTokens[typ]
@@ -631,7 +631,7 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
cfg.NotifyThreshold = notifyThreshold
counter.lastDeadline = time.Time{}
} else {
- // Otherwise the granted token is delivered to the client by fillrate.
+ // Otherwise the granted token is delivered to the client by fill rate.
cfg.NewTokens = 0
trickleDuration := time.Duration(trickleTimeMs) * time.Millisecond
deadline := gc.run.now.Add(trickleDuration)
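Aside from the NewLimiter call sites, the notable change in this file is a rename: the RawMode handler becomes handleRawResourceTokenResponse, aligning it with rmpb.GroupMode_RawMode. A self-contained sketch of the dispatch pattern the diff shows, with GroupMode and TokenBucketResponse as simplified stand-ins for the real rmpb protobuf types:

package main

import "fmt"

// Stand-ins for the rmpb types referenced in the diff.
type GroupMode int

const (
	GroupMode_RUMode GroupMode = iota
	GroupMode_RawMode
)

type TokenBucketResponse struct{}

type groupCostController struct {
	handleRespFunc func(*TokenBucketResponse)
}

func newGroupCostController(mode GroupMode) *groupCostController {
	gc := &groupCostController{}
	switch mode {
	case GroupMode_RUMode:
		gc.handleRespFunc = gc.handleRUTokenResponse
	case GroupMode_RawMode:
		// Renamed in this commit from handleResourceTokenResponse so the
		// handler name matches the RawMode it serves.
		gc.handleRespFunc = gc.handleRawResourceTokenResponse
	}
	return gc
}

func (gc *groupCostController) handleRUTokenResponse(*TokenBucketResponse) {
	fmt.Println("handled RU-mode token bucket response")
}

func (gc *groupCostController) handleRawResourceTokenResponse(*TokenBucketResponse) {
	fmt.Println("handled raw-resource-mode token bucket response")
}

func main() {
	gc := newGroupCostController(GroupMode_RawMode)
	gc.handleRespFunc(&TokenBucketResponse{})
}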
3 changes: 0 additions & 3 deletions pkg/mcs/resource_manager/client/config.go
@@ -47,7 +47,6 @@ const (
extendedReportingPeriodFactor = 4
defaultGroupLoopUpdateInterval = 1 * time.Second
defaultTargetPeriod = 10 * time.Second
- defaultMaxRequestTokens = 1e8
)

const (
@@ -93,7 +92,6 @@ func DefaultRequestUnitConfig() *RequestUnitConfig {
type Config struct {
groupLoopUpdateInterval time.Duration
targetPeriod time.Duration
- maxRequestTokens float64

ReadBaseCost RequestUnit
ReadBytesCost RequestUnit
@@ -121,6 +119,5 @@ func generateConfig(ruConfig *RequestUnitConfig) *Config {
}
cfg.groupLoopUpdateInterval = defaultGroupLoopUpdateInterval
cfg.targetPeriod = defaultTargetPeriod
- cfg.maxRequestTokens = defaultMaxRequestTokens
return cfg
}
17 changes: 5 additions & 12 deletions pkg/mcs/resource_manager/client/limiter.go
@@ -61,10 +61,9 @@ func Every(interval time.Duration) Limit {
// and the amount of time the caller must wait before using it,
// or its associated context.Context is canceled.
type Limiter struct {
- mu               sync.Mutex
- limit            Limit
- tokens           float64
- maxRequestTokens float64
+ mu     sync.Mutex
+ limit  Limit
+ tokens float64
// last is the last time the limiter's tokens field was updated
last time.Time
notifyThreshold float64
@@ -84,12 +83,11 @@ func (lim *Limiter) Limit() Limit {

// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
- func NewLimiter(now time.Time, r Limit, tokens, maxRequestTokens float64, lowTokensNotifyChan chan struct{}) *Limiter {
+ func NewLimiter(now time.Time, r Limit, tokens float64, lowTokensNotifyChan chan struct{}) *Limiter {
lim := &Limiter{
limit: r,
last: now,
tokens: tokens,
- maxRequestTokens: maxRequestTokens,
lowTokensNotifyChan: lowTokensNotifyChan,
}
log.Info("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim)))
@@ -291,11 +289,6 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
tokens: n,
timeToAct: now,
}
- } else if n > lim.maxRequestTokens {
- return Reservation{
- ok: false,
- lim: lim,
- }
}
now, last, tokens := lim.advance(now)

@@ -308,7 +301,7 @@ }
}

// Decide result
- ok := n <= lim.maxRequestTokens && waitDuration <= maxFutureReserve
+ ok := waitDuration <= maxFutureReserve

// Prepare reservation
r := Reservation{
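To make the behavioral change in reserveN concrete: before this commit, a request for more than maxRequestTokens tokens failed unconditionally; now the only gate is whether the computed wait fits within maxFutureReserve. A standalone sketch of the decision, using the textbook token-bucket conversion rather than pd's exact internals:

package main

import (
	"fmt"
	"time"
)

// durationFromTokens is the usual token-bucket conversion: how long it
// takes to accumulate n tokens at `limit` tokens per second.
func durationFromTokens(limit, n float64) time.Duration {
	return time.Duration(float64(time.Second) * (n / limit))
}

func main() {
	const (
		limit            = 1.0    // tokens per second
		available        = 2.0    // tokens currently in the bucket
		n                = 2000.0 // tokens requested
		maxFutureReserve = 100 * time.Second
	)

	// A negative remainder means we must wait for the shortfall to refill.
	remaining := available - n
	var waitDuration time.Duration
	if remaining < 0 {
		waitDuration = durationFromTokens(limit, -remaining)
	}

	// After this commit the decision is only the wait bound; the old
	// `n <= lim.maxRequestTokens` clause is gone.
	ok := waitDuration <= maxFutureReserve
	fmt.Printf("wait=%v ok=%v\n", waitDuration, ok) // wait=33m18s ok=false
}

This mirrors the updated expectation in TestSimpleReserve below: the 2000-token request at rate 1 is still rejected within a 100-second reservation window, but now because the wait exceeds the window, not because of a hard request-size cap.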
12 changes: 6 additions & 6 deletions pkg/mcs/resource_manager/client/limiter_test.go
@@ -83,21 +83,21 @@ func checkTokens(re *require.Assertions, lim *Limiter, t time.Time, expected flo
}

func TestSimpleReserve(t *testing.T) {
- lim := NewLimiter(t0, 1, 2, 1000, make(chan struct{}, 1))
+ lim := NewLimiter(t0, 1, 2, make(chan struct{}, 1))

runReserveMax(t, lim, request{t0, 3, t1, true})
runReserveMax(t, lim, request{t0, 3, t4, true})
runReserveMax(t, lim, request{t3, 2, t6, true})

runReserve(t, lim, request{t3, 2, t7, false}, time.Second*4)
- runReserveMax(t, lim, request{t5, 2000, t6, false})
+ runReserve(t, lim, request{t5, 2000, t6, false}, time.Second*100)

runReserve(t, lim, request{t3, 2, t8, true}, time.Second*8)
}

func TestReconfig(t *testing.T) {
re := require.New(t)
- lim := NewLimiter(t0, 1, 2, 1000, make(chan struct{}, 1))
+ lim := NewLimiter(t0, 1, 2, make(chan struct{}, 1))

runReserveMax(t, lim, request{t0, 4, t2, true})
args := tokenBucketReconfigureArgs{
@@ -111,7 +111,7 @@ func TestReconfig(t *testing.T) {

func TestNotify(t *testing.T) {
nc := make(chan struct{}, 1)
- lim := NewLimiter(t0, 1, 0, 1000, nc)
+ lim := NewLimiter(t0, 1, 0, nc)

args := tokenBucketReconfigureArgs{
NewTokens: 1000.,
@@ -132,8 +132,8 @@ func TestCancel(t *testing.T) {
ctx1, cancel1 := context.WithDeadline(ctx, t2)
re := require.New(t)
nc := make(chan struct{}, 1)
- lim1 := NewLimiter(t0, 1, 10, 100, nc)
- lim2 := NewLimiter(t0, 1, 0, 100, nc)
+ lim1 := NewLimiter(t0, 1, 10, nc)
+ lim2 := NewLimiter(t0, 1, 0, nc)

r1 := runReserveMax(t, lim1, request{t0, 5, t0, true})
checkTokens(re, lim1, t0, 5)
