Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_manager: remove maxRequestTokens in limiter #5880

Merged
merged 2 commits into from Jan 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 11 additions & 11 deletions pkg/mcs/resource_manager/client/client.go
Expand Up @@ -248,7 +248,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
TargetRequestPeriodMs: uint64(c.config.targetPeriod / time.Millisecond),
}
go func() {
log.Debug("[resource group controllor] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source))
log.Debug("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source))
resp, err := c.provider.AcquireTokenBuckets(ctx, req)
if err != nil {
// Don't log any errors caused by the stopper canceling the context.
Expand All @@ -257,7 +257,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
}
resp = nil
}
log.Debug("[resource group controllor] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", time.Since(now)))
log.Debug("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", time.Since(now)))
c.tokenResponseChan <- resp
}()
}
Expand Down Expand Up @@ -327,7 +327,7 @@ func (c *ResourceGroupsController) OnRequestWait(
return err
}

// OnResponse is used to consume tokens atfer receiving response
// OnResponse is used to consume tokens after receiving response
func (c *ResourceGroupsController) OnResponse(_ context.Context, resourceGroupName string, req RequestInfo, resp ResponseInfo) error {
tmp, ok := c.groupsController.Load(resourceGroupName)
if !ok {
Expand Down Expand Up @@ -410,7 +410,7 @@ func newGroupCostController(group *rmpb.ResourceGroup, mainCfg *Config, lowRUNot
case rmpb.GroupMode_RUMode:
gc.handleRespFunc = gc.handleRUTokenResponse
case rmpb.GroupMode_RawMode:
gc.handleRespFunc = gc.handleResourceTokenResponse
gc.handleRespFunc = gc.handleRawResourceTokenResponse
}

gc.mu.consumption = &rmpb.Consumption{}
Expand All @@ -431,7 +431,7 @@ func (gc *groupCostController) initRunState() {
gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter)
for typ := range requestUnitList {
counter := &tokenCounter{
limiter: NewLimiter(now, 0, initialRequestUnits, gc.mainCfg.maxRequestTokens, gc.lowRUNotifyChan),
limiter: NewLimiter(now, 0, initialRequestUnits, gc.lowRUNotifyChan),
avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() * 2,
avgLastTime: now,
}
Expand All @@ -441,7 +441,7 @@ func (gc *groupCostController) initRunState() {
gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter)
for typ := range requestResourceList {
counter := &tokenCounter{
limiter: NewLimiter(now, 0, initialRequestUnits, gc.mainCfg.maxRequestTokens, gc.lowRUNotifyChan),
limiter: NewLimiter(now, 0, initialRequestUnits, gc.lowRUNotifyChan),
avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() * 2,
avgLastTime: now,
}
Expand Down Expand Up @@ -475,7 +475,7 @@ func (gc *groupCostController) updateRunState(ctx context.Context) {
}
}
}
log.Debug("update run state", zap.Any("request unit comsumption", gc.run.consumption))
log.Debug("update run state", zap.Any("request unit consumption", gc.run.consumption))
gc.run.now = newTime
}

Expand Down Expand Up @@ -520,7 +520,7 @@ func (gc *groupCostController) updateAvgRaWResourcePerSec() {
if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) {
continue
}
log.Debug("[resource group controllor] update avg raw resource per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
}
}

Expand All @@ -529,7 +529,7 @@ func (gc *groupCostController) updateAvgRUPerSec() {
if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) {
continue
}
log.Debug("[resource group controllor] update avg ru per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
log.Debug("[resource group controller] update avg ru per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
}
}

Expand Down Expand Up @@ -578,7 +578,7 @@ func (gc *groupCostController) handleTokenBucketResponse(resp *rmpb.TokenBucketR
}
}

func (gc *groupCostController) handleResourceTokenResponse(resp *rmpb.TokenBucketResponse) {
func (gc *groupCostController) handleRawResourceTokenResponse(resp *rmpb.TokenBucketResponse) {
for _, grantedTB := range resp.GetGrantedResourceTokens() {
typ := grantedTB.GetType()
counter, ok := gc.run.resourceTokens[typ]
Expand Down Expand Up @@ -631,7 +631,7 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
cfg.NotifyThreshold = notifyThreshold
counter.lastDeadline = time.Time{}
} else {
// Otherwise the granted token is delivered to the client by fillrate.
// Otherwise the granted token is delivered to the client by fill rate.
cfg.NewTokens = 0
trickleDuration := time.Duration(trickleTimeMs) * time.Millisecond
deadline := gc.run.now.Add(trickleDuration)
Expand Down
3 changes: 0 additions & 3 deletions pkg/mcs/resource_manager/client/config.go
Expand Up @@ -47,7 +47,6 @@ const (
extendedReportingPeriodFactor = 4
defaultGroupLoopUpdateInterval = 1 * time.Second
defaultTargetPeriod = 10 * time.Second
defaultMaxRequestTokens = 1e8
)

const (
Expand Down Expand Up @@ -93,7 +92,6 @@ func DefaultRequestUnitConfig() *RequestUnitConfig {
type Config struct {
groupLoopUpdateInterval time.Duration
targetPeriod time.Duration
maxRequestTokens float64

ReadBaseCost RequestUnit
ReadBytesCost RequestUnit
Expand Down Expand Up @@ -121,6 +119,5 @@ func generateConfig(ruConfig *RequestUnitConfig) *Config {
}
cfg.groupLoopUpdateInterval = defaultGroupLoopUpdateInterval
cfg.targetPeriod = defaultTargetPeriod
cfg.maxRequestTokens = defaultMaxRequestTokens
return cfg
}
17 changes: 5 additions & 12 deletions pkg/mcs/resource_manager/client/limiter.go
Expand Up @@ -61,10 +61,9 @@ func Every(interval time.Duration) Limit {
// and the amount of time the caller must wait before using it,
// or its associated context.Context is canceled.
type Limiter struct {
mu sync.Mutex
limit Limit
tokens float64
maxRequestTokens float64
mu sync.Mutex
limit Limit
tokens float64
// last is the last time the limiter's tokens field was updated
last time.Time
notifyThreshold float64
Expand All @@ -84,12 +83,11 @@ func (lim *Limiter) Limit() Limit {

// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiter(now time.Time, r Limit, tokens, maxRequestTokens float64, lowTokensNotifyChan chan struct{}) *Limiter {
func NewLimiter(now time.Time, r Limit, tokens float64, lowTokensNotifyChan chan struct{}) *Limiter {
lim := &Limiter{
limit: r,
last: now,
tokens: tokens,
maxRequestTokens: maxRequestTokens,
lowTokensNotifyChan: lowTokensNotifyChan,
}
log.Info("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim)))
Expand Down Expand Up @@ -291,11 +289,6 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
tokens: n,
timeToAct: now,
}
} else if n > lim.maxRequestTokens {
return Reservation{
ok: false,
lim: lim,
}
}
now, last, tokens := lim.advance(now)

Expand All @@ -308,7 +301,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
}

// Decide result
ok := n <= lim.maxRequestTokens && waitDuration <= maxFutureReserve
ok := waitDuration <= maxFutureReserve

// Prepare reservation
r := Reservation{
Expand Down
12 changes: 6 additions & 6 deletions pkg/mcs/resource_manager/client/limiter_test.go
Expand Up @@ -83,21 +83,21 @@ func checkTokens(re *require.Assertions, lim *Limiter, t time.Time, expected flo
}

func TestSimpleReserve(t *testing.T) {
lim := NewLimiter(t0, 1, 2, 1000, make(chan struct{}, 1))
lim := NewLimiter(t0, 1, 2, make(chan struct{}, 1))

runReserveMax(t, lim, request{t0, 3, t1, true})
runReserveMax(t, lim, request{t0, 3, t4, true})
runReserveMax(t, lim, request{t3, 2, t6, true})

runReserve(t, lim, request{t3, 2, t7, false}, time.Second*4)
runReserveMax(t, lim, request{t5, 2000, t6, false})
runReserve(t, lim, request{t5, 2000, t6, false}, time.Second*100)

runReserve(t, lim, request{t3, 2, t8, true}, time.Second*8)
}

func TestReconfig(t *testing.T) {
re := require.New(t)
lim := NewLimiter(t0, 1, 2, 1000, make(chan struct{}, 1))
lim := NewLimiter(t0, 1, 2, make(chan struct{}, 1))

runReserveMax(t, lim, request{t0, 4, t2, true})
args := tokenBucketReconfigureArgs{
Expand All @@ -111,7 +111,7 @@ func TestReconfig(t *testing.T) {

func TestNotify(t *testing.T) {
nc := make(chan struct{}, 1)
lim := NewLimiter(t0, 1, 0, 1000, nc)
lim := NewLimiter(t0, 1, 0, nc)

args := tokenBucketReconfigureArgs{
NewTokens: 1000.,
Expand All @@ -132,8 +132,8 @@ func TestCancel(t *testing.T) {
ctx1, cancel1 := context.WithDeadline(ctx, t2)
re := require.New(t)
nc := make(chan struct{}, 1)
lim1 := NewLimiter(t0, 1, 10, 100, nc)
lim2 := NewLimiter(t0, 1, 0, 100, nc)
lim1 := NewLimiter(t0, 1, 10, nc)
lim2 := NewLimiter(t0, 1, 0, nc)

r1 := runReserveMax(t, lim1, request{t0, 5, t0, true})
checkTokens(re, lim1, t0, 5)
Expand Down