Skip to content

Commit

Permalink
Add a clocked rate limiter (#4591)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
Our RateLimiterImpl now uses a ClockedRateLimiter instead of a raw go
rate limiter which does not accept a clock.

<!-- Tell your future self why have you made these changes -->
**Why?**
To make unit testing / time skipping easier in the future, and to add
some test coverage.

<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
I added a bunch of unit tests for the ClockedRateLimiter.

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**
1. If I missed something, we could be using system time instead of the
clock, and it would make downstream issues harder to debug
2. If there's a bug here, we could end up with a faulty rate limiter
which prevents any requests from going through
3. The tests aren't exactly trivial, so it could introduce a maintenance
burden

<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
No
  • Loading branch information
MichaelSnowden committed Sep 6, 2023
1 parent 1282ee5 commit dc0a55d
Show file tree
Hide file tree
Showing 8 changed files with 354 additions and 68 deletions.
4 changes: 2 additions & 2 deletions common/persistence/client/health_request_rate_limiter.go
Expand Up @@ -150,13 +150,13 @@ func (rl *HealthRequestRateLimiterImpl) refreshRate() {
if rl.latencyThresholdExceeded() || rl.errorThresholdExceeded() {
// limit exceeded, do backoff
rl.curRateMultiplier = math.Max(rl.minRateMultiplier, rl.curRateMultiplier-rl.curOptions.RateBackoffStepSize)
rl.rateLimiter.SetRate(rl.curRateMultiplier * rl.rateFn())
rl.rateLimiter.SetRPS(rl.curRateMultiplier * rl.rateFn())
rl.rateLimiter.SetBurst(int(rl.rateToBurstRatio * rl.rateFn()))
rl.logger.Info("Health threshold exceeded, reducing rate limit.", tag.NewFloat64("newMulti", rl.curRateMultiplier), tag.NewFloat64("newRate", rl.rateLimiter.Rate()), tag.NewFloat64("latencyAvg", rl.healthSignals.AverageLatency()), tag.NewFloat64("errorRatio", rl.healthSignals.ErrorRatio()))
} else if rl.curRateMultiplier < rl.maxRateMultiplier {
// already doing backoff and under thresholds, increase limit
rl.curRateMultiplier = math.Min(rl.maxRateMultiplier, rl.curRateMultiplier+rl.curOptions.RateIncreaseStepSize)
rl.rateLimiter.SetRate(rl.curRateMultiplier * rl.rateFn())
rl.rateLimiter.SetRPS(rl.curRateMultiplier * rl.rateFn())
rl.rateLimiter.SetBurst(int(rl.rateToBurstRatio * rl.rateFn()))
rl.logger.Info("System healthy, increasing rate limit.", tag.NewFloat64("newMulti", rl.curRateMultiplier), tag.NewFloat64("newRate", rl.rateLimiter.Rate()), tag.NewFloat64("latencyAvg", rl.healthSignals.AverageLatency()), tag.NewFloat64("errorRatio", rl.healthSignals.ErrorRatio()))
}
Expand Down
149 changes: 149 additions & 0 deletions common/quotas/clocked_rate_limiter.go
@@ -0,0 +1,149 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package quotas

import (
"context"
"errors"
"fmt"
"time"

"go.temporal.io/server/common/clock"
"golang.org/x/time/rate"
)

// ClockedRateLimiter wraps a rate.Limiter with a clock.TimeSource. It is used to ensure that the rate limiter respects
// the time determined by the timeSource.
//
// Methods without an explicit time argument (Allow, Reserve, Wait, WaitN) read the current time from timeSource;
// methods that take a time.Time forward it to the wrapped limiter as-is.
type ClockedRateLimiter struct {
// rateLimiter is the wrapped limiter that every method delegates to.
rateLimiter *rate.Limiter
// timeSource supplies "now" and, in WaitN, the timer used to wait.
timeSource clock.TimeSource
}

// Sentinel errors wrapped by the errors that ClockedRateLimiter.WaitN returns. They are listed in the order in
// which WaitN can return them; match them with errors.Is.
var (
	// ErrRateLimiterReservationCannotBeMade is returned when the reservation made for the wait is not OK.
	ErrRateLimiterReservationCannotBeMade = errors.New("rate limiter reservation cannot be made due to insufficient quota")

	// ErrRateLimiterReservationWouldExceedContextDeadline is returned when the required wait would end after
	// the context deadline.
	ErrRateLimiterReservationWouldExceedContextDeadline = errors.New("rate limiter reservation would exceed context deadline")

	// ErrRateLimiterWaitInterrupted is returned when the context is done before the wait has elapsed.
	ErrRateLimiterWaitInterrupted = errors.New("rate limiter wait interrupted")
)

// NewClockedRateLimiter returns a ClockedRateLimiter that delegates to rateLimiter and takes its notion of the
// current time from timeSource. Both arguments are stored as given.
func NewClockedRateLimiter(rateLimiter *rate.Limiter, timeSource clock.TimeSource) ClockedRateLimiter {
	var limiter ClockedRateLimiter
	limiter.rateLimiter = rateLimiter
	limiter.timeSource = timeSource
	return limiter
}

// Allow is shorthand for AllowN with a single token at the time source's current time.
func (l ClockedRateLimiter) Allow() bool {
	now := l.timeSource.Now()
	return l.AllowN(now, 1)
}

// AllowN forwards to the wrapped limiter's AllowN with the caller-supplied time and token count and returns
// its result unchanged.
func (l ClockedRateLimiter) AllowN(now time.Time, token int) bool {
	allowed := l.rateLimiter.AllowN(now, token)
	return allowed
}

// ClockedReservation wraps a rate.Reservation with a clock.TimeSource. It is used to ensure that the reservation
// respects the time determined by the timeSource.
//
// Delay and Cancel read the current time from timeSource; DelayFrom and CancelAt take the time from the caller.
type ClockedReservation struct {
// reservation is the wrapped reservation that every method delegates to.
reservation *rate.Reservation
// timeSource supplies "now" for Delay and Cancel.
timeSource clock.TimeSource
}

// OK returns the result of the wrapped reservation's OK method.
func (r ClockedReservation) OK() bool {
	ok := r.reservation.OK()
	return ok
}

// Delay returns the wrapped reservation's delay measured from the time source's current time.
func (r ClockedReservation) Delay() time.Duration {
	now := r.timeSource.Now()
	return r.reservation.DelayFrom(now)
}

// DelayFrom forwards to the wrapped reservation's DelayFrom with the caller-supplied time and returns its
// result unchanged.
func (r ClockedReservation) DelayFrom(t time.Time) time.Duration {
	delay := r.reservation.DelayFrom(t)
	return delay
}

// Cancel cancels the wrapped reservation as of the time source's current time.
func (r ClockedReservation) Cancel() {
	now := r.timeSource.Now()
	r.reservation.CancelAt(now)
}

// CancelAt forwards to the wrapped reservation's CancelAt with the caller-supplied time.
func (r ClockedReservation) CancelAt(t time.Time) {
	wrapped := r.reservation
	wrapped.CancelAt(t)
}

// Reserve is shorthand for ReserveN with a single token at the time source's current time.
func (l ClockedRateLimiter) Reserve() ClockedReservation {
	now := l.timeSource.Now()
	return l.ReserveN(now, 1)
}

// ReserveN asks the wrapped limiter for a reservation of token tokens at the caller-supplied time and returns it
// paired with this limiter's time source.
func (l ClockedRateLimiter) ReserveN(now time.Time, token int) ClockedReservation {
	return ClockedReservation{
		reservation: l.rateLimiter.ReserveN(now, token),
		timeSource:  l.timeSource,
	}
}

// Wait is shorthand for WaitN with a single token; see WaitN for the errors it can return.
func (l ClockedRateLimiter) Wait(ctx context.Context) error {
	const singleToken = 1
	return l.WaitN(ctx, singleToken)
}

// WaitN reserves token tokens at the time source's current time and then blocks until the reservation's delay
// has elapsed according to the time source, or until ctx is done.
//
// This is the one method that cannot simply delegate to rate.Limiter: the original WaitN uses time.Now() and
// does not accept a time.Time, so the wait is rebuilt here on top of ReserveN, with two shortcuts that avoid
// starting a timer.
//
// It returns nil when no wait is needed or once the wait has elapsed. Otherwise it returns an error wrapping:
//   - ErrRateLimiterReservationCannotBeMade if the reservation is not OK;
//   - ErrRateLimiterReservationWouldExceedContextDeadline if ctx has a deadline and the wait, measured from the
//     time source's current time, would end after it;
//   - ErrRateLimiterWaitInterrupted if ctx is done before the wait elapses (ctx.Err() is included in the message
//     but is not wrapped).
//
// In the last two cases the reservation is cancelled before returning.
func (l ClockedRateLimiter) WaitN(ctx context.Context, token int) error {
	r := ClockedReservation{
		reservation: l.rateLimiter.ReserveN(l.timeSource.Now(), token),
		timeSource:  l.timeSource,
	}
	if !r.OK() {
		return fmt.Errorf("%w: WaitN(n=%d)", ErrRateLimiterReservationCannotBeMade, token)
	}

	delay := r.Delay()
	if delay <= 0 {
		// Shortcut: nothing to wait for, so no timer is needed.
		return nil
	}

	// Shortcut: if the wait would outlast the context deadline, cancel the reservation and fail right away
	// instead of waiting for the context to expire.
	if deadline, hasDeadline := ctx.Deadline(); hasDeadline && l.timeSource.Now().Add(delay).After(deadline) {
		r.Cancel()
		return fmt.Errorf("%w: WaitN(n=%d)", ErrRateLimiterReservationWouldExceedContextDeadline, token)
	}

	// The timer comes from the time source, so the wait ends when the time source says the delay has passed.
	elapsed := make(chan struct{})
	timer := l.timeSource.AfterFunc(delay, func() { close(elapsed) })
	defer timer.Stop()

	select {
	case <-elapsed:
		return nil
	case <-ctx.Done():
		r.Cancel()
		return fmt.Errorf("%w: %v", ErrRateLimiterWaitInterrupted, ctx.Err())
	}
}

// SetLimitAt forwards to the wrapped limiter's SetLimitAt with the caller-supplied time and new limit.
func (l ClockedRateLimiter) SetLimitAt(t time.Time, newLimit rate.Limit) {
	wrapped := l.rateLimiter
	wrapped.SetLimitAt(t, newLimit)
}

// SetBurstAt forwards to the wrapped limiter's SetBurstAt with the caller-supplied time and new burst size.
func (l ClockedRateLimiter) SetBurstAt(t time.Time, newBurst int) {
	wrapped := l.rateLimiter
	wrapped.SetBurstAt(t, newBurst)
}
153 changes: 153 additions & 0 deletions common/quotas/clocked_rate_limiter_test.go
@@ -0,0 +1,153 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package quotas_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/quotas"
"golang.org/x/time/rate"
)

func TestClockedRateLimiter_Allow_NoQuota(t *testing.T) {
t.Parallel()

ts := clock.NewRealTimeSource()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 0), ts)
assert.False(t, rl.Allow())
}

func TestClockedRateLimiter_Allow_OneBurst(t *testing.T) {
t.Parallel()

ts := clock.NewEventTimeSource()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 1), ts)
assert.True(t, rl.Allow())
assert.False(t, rl.Allow())
}

// TestClockedRateLimiter_Allow_RPS_TooHigh checks that with rate 1 and burst 1 a second Allow is rejected when
// the time source has advanced by only 999ms since the first.
func TestClockedRateLimiter_Allow_RPS_TooHigh(t *testing.T) {
	t.Parallel()

	timeSource := clock.NewEventTimeSource()
	limiter := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), timeSource)

	assert.True(t, limiter.Allow())

	const justUnderOneSecond = 999 * time.Millisecond
	timeSource.Advance(justUnderOneSecond)

	assert.False(t, limiter.Allow())
}

// TestClockedRateLimiter_Allow_RPS_Ok checks that with rate 1 and burst 1 a second Allow succeeds once the time
// source has advanced by a full second.
func TestClockedRateLimiter_Allow_RPS_Ok(t *testing.T) {
	t.Parallel()

	timeSource := clock.NewEventTimeSource()
	limiter := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), timeSource)

	assert.True(t, limiter.Allow())

	timeSource.Advance(time.Second)

	assert.True(t, limiter.Allow())
}

// TestClockedRateLimiter_AllowN_Ok checks that a request for exactly the burst size (10) is allowed.
func TestClockedRateLimiter_AllowN_Ok(t *testing.T) {
	t.Parallel()

	timeSource := clock.NewEventTimeSource()
	limiter := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 10), timeSource)

	allowed := limiter.AllowN(timeSource.Now(), 10)
	assert.True(t, allowed)
}

// TestClockedRateLimiter_AllowN_NotOk checks that a request for one more than the burst size (11 of 10) is
// rejected.
func TestClockedRateLimiter_AllowN_NotOk(t *testing.T) {
	t.Parallel()

	timeSource := clock.NewEventTimeSource()
	limiter := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 10), timeSource)

	allowed := limiter.AllowN(timeSource.Now(), 11)
	assert.False(t, allowed)
}

func TestClockedRateLimiter_Wait_NoBurst(t *testing.T) {
t.Parallel()

ts := clock.NewEventTimeSource()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 0), ts)
ctx := context.Background()
assert.ErrorIs(t, rl.Wait(ctx), quotas.ErrRateLimiterReservationCannotBeMade)
}

// TestClockedRateLimiter_Wait_Ok checks that, with rate 1 and burst 1, a first Wait succeeds and a second Wait
// also succeeds while another goroutine advances the time source by one second.
func TestClockedRateLimiter_Wait_Ok(t *testing.T) {
	t.Parallel()

	timeSource := clock.NewEventTimeSource()
	limiter := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), timeSource)
	background := context.Background()

	assert.NoError(t, limiter.Wait(background))

	// The advance runs concurrently with the second Wait.
	// NOTE(review): nothing orders the advance relative to the timer being registered inside WaitN — confirm
	// this cannot leave the Wait blocked.
	go timeSource.Advance(time.Second)

	assert.NoError(t, limiter.Wait(background))
}

// TestClockedRateLimiter_Wait_Canceled checks that a Wait which still has time left to wait fails with
// ErrRateLimiterWaitInterrupted when its context is cancelled.
func TestClockedRateLimiter_Wait_Canceled(t *testing.T) {
	t.Parallel()

	timeSource := clock.NewEventTimeSource()
	limiter := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), timeSource)

	ctx, cancel := context.WithCancel(context.Background())
	assert.NoError(t, limiter.Wait(ctx))

	// Advance by just under a second, then cancel.
	go func() {
		timeSource.Advance(999 * time.Millisecond)
		cancel()
	}()

	err := limiter.Wait(ctx)
	assert.ErrorIs(t, err, quotas.ErrRateLimiterWaitInterrupted)
}

// TestClockedRateLimiter_Reserve checks that, with rate 1 and burst 1, a reservation made right after an Allow
// reports a delay of exactly one second from the time source's current time.
func TestClockedRateLimiter_Reserve(t *testing.T) {
	t.Parallel()

	timeSource := clock.NewEventTimeSource()
	limiter := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), timeSource)

	// Result intentionally unused; the call is only there to take a token before reserving.
	limiter.Allow()

	delay := limiter.Reserve().DelayFrom(timeSource.Now())
	assert.Equal(t, time.Second, delay)
}

// TestClockedRateLimiter_Wait_DeadlineWouldExceed checks that, with rate 1 and burst 1, a Wait issued right after
// an Allow fails with ErrRateLimiterReservationWouldExceedContextDeadline when the context deadline is only
// 500ms past the time source's current time.
func TestClockedRateLimiter_Wait_DeadlineWouldExceed(t *testing.T) {
	t.Parallel()

	timeSource := clock.NewEventTimeSource()
	limiter := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), timeSource)

	// Result intentionally unused; the call is only there to take a token before waiting.
	limiter.Allow()

	// The deadline is expressed relative to the time source's clock.
	deadline := timeSource.Now().Add(500 * time.Millisecond)
	ctx, cancel := context.WithDeadline(context.Background(), deadline)
	t.Cleanup(cancel)

	err := limiter.Wait(ctx)
	assert.ErrorIs(t, err, quotas.ErrRateLimiterReservationWouldExceedContextDeadline)
}
4 changes: 2 additions & 2 deletions common/quotas/dynamic.go
Expand Up @@ -53,7 +53,7 @@ type (
}

MutableRateBurst interface {
SetRate(rate float64)
SetRPS(rps float64)
SetBurst(burst int)
RateBurst
}
Expand Down Expand Up @@ -120,7 +120,7 @@ func NewMutableRateBurst(
}
}

func (d *MutableRateBurstImpl) SetRate(rate float64) {
func (d *MutableRateBurstImpl) SetRPS(rate float64) {
d.rate.Store(rate)
}

Expand Down

0 comments on commit dc0a55d

Please sign in to comment.