Skip to content

Commit

Permalink
PR comments, mainly s/rate/rps
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Sep 5, 2023
1 parent 2f07a85 commit 7bd2fc3
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 90 deletions.
4 changes: 2 additions & 2 deletions common/persistence/client/health_request_rate_limiter.go
Expand Up @@ -150,13 +150,13 @@ func (rl *HealthRequestRateLimiterImpl) refreshRate() {
if rl.latencyThresholdExceeded() || rl.errorThresholdExceeded() {
// limit exceeded, do backoff
rl.curRateMultiplier = math.Max(rl.minRateMultiplier, rl.curRateMultiplier-rl.curOptions.RateBackoffStepSize)
rl.rateLimiter.SetRate(rl.curRateMultiplier * rl.rateFn())
rl.rateLimiter.SetRPS(rl.curRateMultiplier * rl.rateFn())
rl.rateLimiter.SetBurst(int(rl.rateToBurstRatio * rl.rateFn()))
rl.logger.Info("Health threshold exceeded, reducing rate limit.", tag.NewFloat64("newMulti", rl.curRateMultiplier), tag.NewFloat64("newRate", rl.rateLimiter.Rate()), tag.NewFloat64("latencyAvg", rl.healthSignals.AverageLatency()), tag.NewFloat64("errorRatio", rl.healthSignals.ErrorRatio()))
} else if rl.curRateMultiplier < rl.maxRateMultiplier {
// already doing backoff and under thresholds, increase limit
rl.curRateMultiplier = math.Min(rl.maxRateMultiplier, rl.curRateMultiplier+rl.curOptions.RateIncreaseStepSize)
rl.rateLimiter.SetRate(rl.curRateMultiplier * rl.rateFn())
rl.rateLimiter.SetRPS(rl.curRateMultiplier * rl.rateFn())
rl.rateLimiter.SetBurst(int(rl.rateToBurstRatio * rl.rateFn()))
rl.logger.Info("System healthy, increasing rate limit.", tag.NewFloat64("newMulti", rl.curRateMultiplier), tag.NewFloat64("newRate", rl.rateLimiter.Rate()), tag.NewFloat64("latencyAvg", rl.healthSignals.AverageLatency()), tag.NewFloat64("errorRatio", rl.healthSignals.ErrorRatio()))
}
Expand Down
49 changes: 22 additions & 27 deletions common/quotas/clocked_rate_limiter.go
Expand Up @@ -30,15 +30,15 @@ import (
"fmt"
"time"

"github.com/jonboulle/clockwork"
"go.temporal.io/server/common/clock"
"golang.org/x/time/rate"
)

// ClockedRateLimiter wraps a rate.Limiter with a clockwork.Clock. It is used to ensure that the rate limiter respects
// the time determined by the clock.
// ClockedRateLimiter wraps a rate.Limiter with a clock.TimeSource. It is used to ensure that the rate limiter respects
// the time determined by the timeSource.
type ClockedRateLimiter struct {
rateLimiter *rate.Limiter
clock clockwork.Clock
timeSource clock.TimeSource
}

var (
Expand All @@ -47,55 +47,55 @@ var (
ErrRateLimiterReservationWouldExceedContextDeadline = errors.New("rate limiter reservation would exceed context deadline")
)

func NewClockedRateLimiter(rateLimiter *rate.Limiter, clock clockwork.Clock) ClockedRateLimiter {
func NewClockedRateLimiter(rateLimiter *rate.Limiter, timeSource clock.TimeSource) ClockedRateLimiter {
return ClockedRateLimiter{
rateLimiter: rateLimiter,
clock: clock,
timeSource: timeSource,
}
}

func (l ClockedRateLimiter) Allow() bool {
return l.AllowN(l.clock.Now(), 1)
return l.AllowN(l.timeSource.Now(), 1)
}

func (l ClockedRateLimiter) AllowN(now time.Time, token int) bool {
return l.rateLimiter.AllowN(now, token)
}

// ClockedReservation wraps a rate.Reservation with a clockwork.Clock. It is used to ensure that the reservation
// respects the time determined by the clock.
// respects the time determined by the timeSource.
type ClockedReservation struct {
reservation *rate.Reservation
clock clockwork.Clock
timeSource clock.TimeSource
}

func (r ClockedReservation) OK() bool {
return r.reservation.OK()
}

func (r ClockedReservation) Delay() time.Duration {
return r.DelayFrom(r.clock.Now())
return r.DelayFrom(r.timeSource.Now())
}

func (r ClockedReservation) DelayFrom(t time.Time) time.Duration {
return r.reservation.DelayFrom(t)
}

func (r ClockedReservation) Cancel() {
r.CancelAt(r.clock.Now())
r.CancelAt(r.timeSource.Now())
}

func (r ClockedReservation) CancelAt(t time.Time) {
r.reservation.CancelAt(t)
}

func (l ClockedRateLimiter) Reserve() ClockedReservation {
return l.ReserveN(l.clock.Now(), 1)
return l.ReserveN(l.timeSource.Now(), 1)
}

func (l ClockedRateLimiter) ReserveN(now time.Time, token int) ClockedReservation {
reservation := l.rateLimiter.ReserveN(now, token)
return ClockedReservation{reservation, l.clock}
return ClockedReservation{reservation, l.timeSource}
}

func (l ClockedRateLimiter) Wait(ctx context.Context) error {
Expand All @@ -106,13 +106,9 @@ func (l ClockedRateLimiter) Wait(ctx context.Context) error {
// the original method uses time.Now(), and does not allow us to pass in a time.Time. Fortunately, it can be built on
// top of ReserveN. However, there are some optimizations that we can make.
func (l ClockedRateLimiter) WaitN(ctx context.Context, token int) error {
reservation := ClockedReservation{l.rateLimiter.ReserveN(l.clock.Now(), token), l.clock}
reservation := ClockedReservation{l.rateLimiter.ReserveN(l.timeSource.Now(), token), l.timeSource}
if !reservation.OK() {
return fmt.Errorf(
"%w: reservation would delay for %v",
ErrRateLimiterReservationCannotBeMade,
reservation.Delay(),
)
return fmt.Errorf("%w: WaitN(n=%d)", ErrRateLimiterReservationCannotBeMade, token)
}

waitDuration := reservation.Delay()
Expand All @@ -124,23 +120,22 @@ func (l ClockedRateLimiter) WaitN(ctx context.Context, token int) error {

// Optimization: if the waitDuration is longer than the context deadline, we can immediately return an error.
if deadline, ok := ctx.Deadline(); ok {
if l.clock.Now().Add(waitDuration).After(deadline) {
if l.timeSource.Now().Add(waitDuration).After(deadline) {
reservation.Cancel()
return fmt.Errorf(
"%w: reservation would delay for %v",
ErrRateLimiterReservationWouldExceedContextDeadline,
reservation.Delay(),
)
return fmt.Errorf("%w: WaitN(n=%d)", ErrRateLimiterReservationWouldExceedContextDeadline, token)
}
}

timer := l.clock.NewTimer(waitDuration)
waitExpired := make(chan struct{})
timer := l.timeSource.AfterFunc(waitDuration, func() {
close(waitExpired)
})
defer timer.Stop()
select {
case <-ctx.Done():
reservation.Cancel()
return fmt.Errorf("%w: %v", ErrRateLimiterWaitInterrupted, ctx.Err())
case <-timer.Chan():
case <-waitExpired:
return nil
}
}
Expand Down
65 changes: 31 additions & 34 deletions common/quotas/clocked_rate_limiter_test.go
Expand Up @@ -29,103 +29,100 @@ import (
"testing"
"time"

"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/quotas"
"golang.org/x/time/rate"
)

func TestClockedRateLimiter_Allow_NoQuota(t *testing.T) {
t.Parallel()

clock := clockwork.NewRealClock()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 0), clock)
ts := clock.NewRealTimeSource()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 0), ts)
assert.False(t, rl.Allow())
}

func TestClockedRateLimiter_Allow_OneBurst(t *testing.T) {
t.Parallel()

clock := clockwork.NewFakeClock()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 1), clock)
ts := clock.NewEventTimeSource()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 1), ts)
assert.True(t, rl.Allow())
assert.False(t, rl.Allow())
}

func TestClockedRateLimiter_Allow_RPS_TooHigh(t *testing.T) {
t.Parallel()

clock := clockwork.NewFakeClock()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), clock)
ts := clock.NewEventTimeSource()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), ts)
assert.True(t, rl.Allow())
clock.Advance(999 * time.Millisecond)
ts.Advance(999 * time.Millisecond)
assert.False(t, rl.Allow())
}

func TestClockedRateLimiter_Allow_RPS_Ok(t *testing.T) {
t.Parallel()

clock := clockwork.NewFakeClock()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), clock)
ts := clock.NewEventTimeSource()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), ts)
assert.True(t, rl.Allow())
clock.Advance(time.Second)
ts.Advance(time.Second)
assert.True(t, rl.Allow())
}

func TestClockedRateLimiter_AllowN_Ok(t *testing.T) {
t.Parallel()

clock := clockwork.NewFakeClock()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 10), clock)
assert.True(t, rl.AllowN(clock.Now(), 10))
ts := clock.NewEventTimeSource()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 10), ts)
assert.True(t, rl.AllowN(ts.Now(), 10))
}

func TestClockedRateLimiter_AllowN_NotOk(t *testing.T) {
t.Parallel()

clock := clockwork.NewFakeClock()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 10), clock)
assert.False(t, rl.AllowN(clock.Now(), 11))
ts := clock.NewEventTimeSource()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 10), ts)
assert.False(t, rl.AllowN(ts.Now(), 11))
}

func TestClockedRateLimiter_Wait_NoBurst(t *testing.T) {
t.Parallel()

clock := clockwork.NewFakeClock()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 0), clock)
ts := clock.NewEventTimeSource()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 0), ts)
ctx := context.Background()
assert.ErrorIs(t, rl.Wait(ctx), quotas.ErrRateLimiterReservationCannotBeMade)
}

func TestClockedRateLimiter_Wait_Ok(t *testing.T) {
t.Parallel()

clock := clockwork.NewFakeClock()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), clock)
ts := clock.NewEventTimeSource()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), ts)
ctx := context.Background()
assert.NoError(t, rl.Wait(ctx))

go func() {
clock.BlockUntil(1)
clock.Advance(time.Second)
ts.Advance(time.Second)
}()
assert.NoError(t, rl.Wait(ctx))
}

func TestClockedRateLimiter_Wait_Canceled(t *testing.T) {
t.Parallel()

clock := clockwork.NewFakeClock()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), clock)
ts := clock.NewEventTimeSource()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), ts)
ctx := context.Background()

ctx, cancel := context.WithCancel(ctx)
assert.NoError(t, rl.Wait(ctx))

go func() {
clock.BlockUntil(1)
clock.Advance(time.Millisecond * 999)
clock.BlockUntil(1)
ts.Advance(time.Millisecond * 999)
cancel()
}()
assert.ErrorIs(t, rl.Wait(ctx), quotas.ErrRateLimiterWaitInterrupted)
Expand All @@ -134,23 +131,23 @@ func TestClockedRateLimiter_Wait_Canceled(t *testing.T) {
func TestClockedRateLimiter_Reserve(t *testing.T) {
t.Parallel()

clock := clockwork.NewFakeClock()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), clock)
ts := clock.NewEventTimeSource()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), ts)
rl.Allow()
reservation := rl.Reserve()
assert.Equal(t, time.Second, reservation.DelayFrom(clock.Now()))
assert.Equal(t, time.Second, reservation.DelayFrom(ts.Now()))
}

func TestClockedRateLimiter_Wait_DeadlineWouldExceed(t *testing.T) {
t.Parallel()

clock := clockwork.NewFakeClock()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), clock)
ts := clock.NewEventTimeSource()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), ts)
rl.Allow()

ctx := context.Background()

ctx, cancel := context.WithDeadline(ctx, clock.Now().Add(500*time.Millisecond))
ctx, cancel := context.WithDeadline(ctx, ts.Now().Add(500*time.Millisecond))
t.Cleanup(cancel)
assert.ErrorIs(t, rl.Wait(ctx), quotas.ErrRateLimiterReservationWouldExceedContextDeadline)
}
4 changes: 2 additions & 2 deletions common/quotas/dynamic.go
Expand Up @@ -53,7 +53,7 @@ type (
}

MutableRateBurst interface {
SetRate(rate float64)
SetRPS(rps float64)
SetBurst(burst int)
RateBurst
}
Expand Down Expand Up @@ -120,7 +120,7 @@ func NewMutableRateBurst(
}
}

func (d *MutableRateBurstImpl) SetRate(rate float64) {
func (d *MutableRateBurstImpl) SetRPS(rate float64) {
d.rate.Store(rate)
}

Expand Down
28 changes: 14 additions & 14 deletions common/quotas/rate_limiter_impl.go
Expand Up @@ -28,17 +28,17 @@ import (
"sync"
"time"

"github.com/jonboulle/clockwork"
"go.temporal.io/server/common/clock"
"golang.org/x/time/rate"
)

type (
// RateLimiterImpl is a wrapper around the golang rate limiter
RateLimiterImpl struct {
sync.RWMutex
rps float64
burst int
clock clockwork.Clock
rps float64
burst int
timeSource clock.TimeSource
ClockedRateLimiter
}
)
Expand All @@ -47,21 +47,21 @@ var _ RateLimiter = (*RateLimiterImpl)(nil)

// NewRateLimiter returns a new rate limiter that can handle dynamic
// configuration updates
func NewRateLimiter(newRate float64, newBurst int) *RateLimiterImpl {
limiter := rate.NewLimiter(rate.Limit(newRate), newBurst)
clock := clockwork.NewRealClock()
func NewRateLimiter(newRPS float64, newBurst int) *RateLimiterImpl {
limiter := rate.NewLimiter(rate.Limit(newRPS), newBurst)
ts := clock.NewRealTimeSource()
rl := &RateLimiterImpl{
rps: newRate,
rps: newRPS,
burst: newBurst,
clock: clock,
ClockedRateLimiter: NewClockedRateLimiter(limiter, clock),
timeSource: ts,
ClockedRateLimiter: NewClockedRateLimiter(limiter, ts),
}

return rl
}

// SetRate set the rate of the rate limiter
func (rl *RateLimiterImpl) SetRate(rps float64) {
func (rl *RateLimiterImpl) SetRPS(rps float64) {
rl.refreshInternalRateLimiterImpl(&rps, nil)
}

Expand All @@ -78,12 +78,12 @@ func (rl *RateLimiterImpl) ReserveN(now time.Time, n int) Reservation {
return rl.ClockedRateLimiter.ReserveN(now, n)
}

// SetRateBurst set the rate & burst of the rate limiter
// SetRateBurst set the rps & burst of the rate limiter
func (rl *RateLimiterImpl) SetRateBurst(rps float64, burst int) {
rl.refreshInternalRateLimiterImpl(&rps, &burst)
}

// Rate returns the rate per second for this rate limiter
// Rate returns the rps for this rate limiter
func (rl *RateLimiterImpl) Rate() float64 {
rl.Lock()
defer rl.Unlock()
Expand Down Expand Up @@ -119,7 +119,7 @@ func (rl *RateLimiterImpl) refreshInternalRateLimiterImpl(
}

if refresh {
now := rl.clock.Now()
now := rl.timeSource.Now()
rl.SetLimitAt(now, rate.Limit(rl.rps))
rl.SetBurstAt(now, rl.burst)
}
Expand Down

0 comments on commit 7bd2fc3

Please sign in to comment.