diff --git a/common/persistence/client/health_request_rate_limiter.go b/common/persistence/client/health_request_rate_limiter.go index 2912fda5932..ceef0f80a62 100644 --- a/common/persistence/client/health_request_rate_limiter.go +++ b/common/persistence/client/health_request_rate_limiter.go @@ -150,13 +150,13 @@ func (rl *HealthRequestRateLimiterImpl) refreshRate() { if rl.latencyThresholdExceeded() || rl.errorThresholdExceeded() { // limit exceeded, do backoff rl.curRateMultiplier = math.Max(rl.minRateMultiplier, rl.curRateMultiplier-rl.curOptions.RateBackoffStepSize) - rl.rateLimiter.SetRate(rl.curRateMultiplier * rl.rateFn()) + rl.rateLimiter.SetRPS(rl.curRateMultiplier * rl.rateFn()) rl.rateLimiter.SetBurst(int(rl.rateToBurstRatio * rl.rateFn())) rl.logger.Info("Health threshold exceeded, reducing rate limit.", tag.NewFloat64("newMulti", rl.curRateMultiplier), tag.NewFloat64("newRate", rl.rateLimiter.Rate()), tag.NewFloat64("latencyAvg", rl.healthSignals.AverageLatency()), tag.NewFloat64("errorRatio", rl.healthSignals.ErrorRatio())) } else if rl.curRateMultiplier < rl.maxRateMultiplier { // already doing backoff and under thresholds, increase limit rl.curRateMultiplier = math.Min(rl.maxRateMultiplier, rl.curRateMultiplier+rl.curOptions.RateIncreaseStepSize) - rl.rateLimiter.SetRate(rl.curRateMultiplier * rl.rateFn()) + rl.rateLimiter.SetRPS(rl.curRateMultiplier * rl.rateFn()) rl.rateLimiter.SetBurst(int(rl.rateToBurstRatio * rl.rateFn())) rl.logger.Info("System healthy, increasing rate limit.", tag.NewFloat64("newMulti", rl.curRateMultiplier), tag.NewFloat64("newRate", rl.rateLimiter.Rate()), tag.NewFloat64("latencyAvg", rl.healthSignals.AverageLatency()), tag.NewFloat64("errorRatio", rl.healthSignals.ErrorRatio())) } diff --git a/common/quotas/clocked_rate_limiter.go b/common/quotas/clocked_rate_limiter.go index a6cd01a6611..8fd495d1c7e 100644 --- a/common/quotas/clocked_rate_limiter.go +++ b/common/quotas/clocked_rate_limiter.go @@ -30,15 +30,15 @@ import ( "fmt" "time" - "github.com/jonboulle/clockwork" + "go.temporal.io/server/common/clock" "golang.org/x/time/rate" ) -// ClockedRateLimiter wraps a rate.Limiter with a clockwork.Clock. It is used to ensure that the rate limiter respects -// the time determined by the clock. +// ClockedRateLimiter wraps a rate.Limiter with a clock.TimeSource. It is used to ensure that the rate limiter respects +// the time determined by the timeSource. type ClockedRateLimiter struct { rateLimiter *rate.Limiter - clock clockwork.Clock + timeSource clock.TimeSource } var ( @@ -47,15 +47,15 @@ var ( ErrRateLimiterReservationWouldExceedContextDeadline = errors.New("rate limiter reservation would exceed context deadline") ) -func NewClockedRateLimiter(rateLimiter *rate.Limiter, clock clockwork.Clock) ClockedRateLimiter { +func NewClockedRateLimiter(rateLimiter *rate.Limiter, timeSource clock.TimeSource) ClockedRateLimiter { return ClockedRateLimiter{ rateLimiter: rateLimiter, - clock: clock, + timeSource: timeSource, } } func (l ClockedRateLimiter) Allow() bool { - return l.AllowN(l.clock.Now(), 1) + return l.AllowN(l.timeSource.Now(), 1) } func (l ClockedRateLimiter) AllowN(now time.Time, token int) bool { @@ -63,10 +63,10 @@ func (l ClockedRateLimiter) AllowN(now time.Time, token int) bool { } // ClockedReservation wraps a rate.Reservation with a clockwork.Clock. It is used to ensure that the reservation -// respects the time determined by the clock. +// respects the time determined by the timeSource. type ClockedReservation struct { reservation *rate.Reservation - clock clockwork.Clock + timeSource clock.TimeSource } func (r ClockedReservation) OK() bool { @@ -74,7 +74,7 @@ func (r ClockedReservation) OK() bool { } func (r ClockedReservation) Delay() time.Duration { - return r.DelayFrom(r.clock.Now()) + return r.DelayFrom(r.timeSource.Now()) } func (r ClockedReservation) DelayFrom(t time.Time) time.Duration { @@ -82,7 +82,7 @@ func (r ClockedReservation) DelayFrom(t time.Time) time.Duration { } func (r ClockedReservation) Cancel() { - r.CancelAt(r.clock.Now()) + r.CancelAt(r.timeSource.Now()) } func (r ClockedReservation) CancelAt(t time.Time) { @@ -90,12 +90,12 @@ func (r ClockedReservation) CancelAt(t time.Time) { } func (l ClockedRateLimiter) Reserve() ClockedReservation { - return l.ReserveN(l.clock.Now(), 1) + return l.ReserveN(l.timeSource.Now(), 1) } func (l ClockedRateLimiter) ReserveN(now time.Time, token int) ClockedReservation { reservation := l.rateLimiter.ReserveN(now, token) - return ClockedReservation{reservation, l.clock} + return ClockedReservation{reservation, l.timeSource} } func (l ClockedRateLimiter) Wait(ctx context.Context) error { @@ -106,13 +106,9 @@ func (l ClockedRateLimiter) Wait(ctx context.Context) error { // the original method uses time.Now(), and does not allow us to pass in a time.Time. Fortunately, it can be built on // top of ReserveN. However, there are some optimizations that we can make. func (l ClockedRateLimiter) WaitN(ctx context.Context, token int) error { - reservation := ClockedReservation{l.rateLimiter.ReserveN(l.clock.Now(), token), l.clock} + reservation := ClockedReservation{l.rateLimiter.ReserveN(l.timeSource.Now(), token), l.timeSource} if !reservation.OK() { - return fmt.Errorf( - "%w: reservation would delay for %v", - ErrRateLimiterReservationCannotBeMade, - reservation.Delay(), - ) + return fmt.Errorf("%w: WaitN(n=%d)", ErrRateLimiterReservationCannotBeMade, token) } waitDuration := reservation.Delay() @@ -124,23 +120,22 @@ func (l ClockedRateLimiter) WaitN(ctx context.Context, token int) error { // Optimization: if the waitDuration is longer than the context deadline, we can immediately return an error. if deadline, ok := ctx.Deadline(); ok { - if l.clock.Now().Add(waitDuration).After(deadline) { + if l.timeSource.Now().Add(waitDuration).After(deadline) { reservation.Cancel() - return fmt.Errorf( - "%w: reservation would delay for %v", - ErrRateLimiterReservationWouldExceedContextDeadline, - reservation.Delay(), - ) + return fmt.Errorf("%w: WaitN(n=%d)", ErrRateLimiterReservationWouldExceedContextDeadline, token) } } - timer := l.clock.NewTimer(waitDuration) + waitExpired := make(chan struct{}) + timer := l.timeSource.AfterFunc(waitDuration, func() { + close(waitExpired) + }) defer timer.Stop() select { case <-ctx.Done(): reservation.Cancel() return fmt.Errorf("%w: %v", ErrRateLimiterWaitInterrupted, ctx.Err()) - case <-timer.Chan(): + case <-waitExpired: return nil } } diff --git a/common/quotas/clocked_rate_limiter_test.go b/common/quotas/clocked_rate_limiter_test.go index 249d0632de9..09b6202fbfe 100644 --- a/common/quotas/clocked_rate_limiter_test.go +++ b/common/quotas/clocked_rate_limiter_test.go @@ -29,8 +29,8 @@ import ( "testing" "time" - "github.com/jonboulle/clockwork" "github.com/stretchr/testify/assert" + "go.temporal.io/server/common/clock" "go.temporal.io/server/common/quotas" "golang.org/x/time/rate" ) @@ -38,16 +38,16 @@ import ( func TestClockedRateLimiter_Allow_NoQuota(t *testing.T) { t.Parallel() - clock := clockwork.NewRealClock() - rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 0), clock) + ts := clock.NewRealTimeSource() + rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 0), ts) assert.False(t, rl.Allow()) } func TestClockedRateLimiter_Allow_OneBurst(t *testing.T) { t.Parallel() - clock := clockwork.NewFakeClock() - rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 1), clock) + ts := clock.NewEventTimeSource() + rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 1), ts) assert.True(t, rl.Allow()) assert.False(t, rl.Allow()) } @@ -55,44 +55,44 @@ func TestClockedRateLimiter_Allow_OneBurst(t *testing.T) { func TestClockedRateLimiter_Allow_RPS_TooHigh(t *testing.T) { t.Parallel() - clock := clockwork.NewFakeClock() - rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), clock) + ts := clock.NewEventTimeSource() + rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), ts) assert.True(t, rl.Allow()) - clock.Advance(999 * time.Millisecond) + ts.Advance(999 * time.Millisecond) assert.False(t, rl.Allow()) } func TestClockedRateLimiter_Allow_RPS_Ok(t *testing.T) { t.Parallel() - clock := clockwork.NewFakeClock() - rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), clock) + ts := clock.NewEventTimeSource() + rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), ts) assert.True(t, rl.Allow()) - clock.Advance(time.Second) + ts.Advance(time.Second) assert.True(t, rl.Allow()) } func TestClockedRateLimiter_AllowN_Ok(t *testing.T) { t.Parallel() - clock := clockwork.NewFakeClock() - rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 10), clock) - assert.True(t, rl.AllowN(clock.Now(), 10)) + ts := clock.NewEventTimeSource() + rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 10), ts) + assert.True(t, rl.AllowN(ts.Now(), 10)) } func TestClockedRateLimiter_AllowN_NotOk(t *testing.T) { t.Parallel() - clock := clockwork.NewFakeClock() - rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 10), clock) - assert.False(t, rl.AllowN(clock.Now(), 11)) + ts := clock.NewEventTimeSource() + rl := quotas.NewClockedRateLimiter(rate.NewLimiter(0, 10), ts) + assert.False(t, rl.AllowN(ts.Now(), 11)) } func TestClockedRateLimiter_Wait_NoBurst(t *testing.T) { t.Parallel() - clock := clockwork.NewFakeClock() - rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 0), clock) + ts := clock.NewEventTimeSource() + rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 0), ts) ctx := context.Background() assert.ErrorIs(t, rl.Wait(ctx), quotas.ErrRateLimiterReservationCannotBeMade) } @@ -100,14 +100,13 @@ func TestClockedRateLimiter_Wait_NoBurst(t *testing.T) { func TestClockedRateLimiter_Wait_Ok(t *testing.T) { t.Parallel() - clock := clockwork.NewFakeClock() - rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), clock) + ts := clock.NewEventTimeSource() + rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), ts) ctx := context.Background() assert.NoError(t, rl.Wait(ctx)) go func() { - clock.BlockUntil(1) - clock.Advance(time.Second) + ts.Advance(time.Second) }() assert.NoError(t, rl.Wait(ctx)) } @@ -115,17 +114,15 @@ func TestClockedRateLimiter_Wait_Ok(t *testing.T) { func TestClockedRateLimiter_Wait_Canceled(t *testing.T) { t.Parallel() - clock := clockwork.NewFakeClock() - rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), clock) + ts := clock.NewEventTimeSource() + rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), ts) ctx := context.Background() ctx, cancel := context.WithCancel(ctx) assert.NoError(t, rl.Wait(ctx)) go func() { - clock.BlockUntil(1) - clock.Advance(time.Millisecond * 999) - clock.BlockUntil(1) + ts.Advance(time.Millisecond * 999) cancel() }() assert.ErrorIs(t, rl.Wait(ctx), quotas.ErrRateLimiterWaitInterrupted) @@ -134,23 +131,23 @@ func TestClockedRateLimiter_Wait_Canceled(t *testing.T) { func TestClockedRateLimiter_Reserve(t *testing.T) { t.Parallel() - clock := clockwork.NewFakeClock() - rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), clock) + ts := clock.NewEventTimeSource() + rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), ts) rl.Allow() reservation := rl.Reserve() - assert.Equal(t, time.Second, reservation.DelayFrom(clock.Now())) + assert.Equal(t, time.Second, reservation.DelayFrom(ts.Now())) } func TestClockedRateLimiter_Wait_DeadlineWouldExceed(t *testing.T) { t.Parallel() - clock := clockwork.NewFakeClock() - rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), clock) + ts := clock.NewEventTimeSource() + rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), ts) rl.Allow() ctx := context.Background() - ctx, cancel := context.WithDeadline(ctx, clock.Now().Add(500*time.Millisecond)) + ctx, cancel := context.WithDeadline(ctx, ts.Now().Add(500*time.Millisecond)) t.Cleanup(cancel) assert.ErrorIs(t, rl.Wait(ctx), quotas.ErrRateLimiterReservationWouldExceedContextDeadline) } diff --git a/common/quotas/dynamic.go b/common/quotas/dynamic.go index dba75d69f99..6e1946dadd0 100644 --- a/common/quotas/dynamic.go +++ b/common/quotas/dynamic.go @@ -53,7 +53,7 @@ type ( } MutableRateBurst interface { - SetRate(rate float64) + SetRPS(rps float64) SetBurst(burst int) RateBurst } @@ -120,7 +120,7 @@ func NewMutableRateBurst( } } -func (d *MutableRateBurstImpl) SetRate(rate float64) { +func (d *MutableRateBurstImpl) SetRPS(rate float64) { d.rate.Store(rate) } diff --git a/common/quotas/rate_limiter_impl.go b/common/quotas/rate_limiter_impl.go index ff0fa19d7eb..1a5f17133b9 100644 --- a/common/quotas/rate_limiter_impl.go +++ b/common/quotas/rate_limiter_impl.go @@ -28,7 +28,7 @@ import ( "sync" "time" - "github.com/jonboulle/clockwork" + "go.temporal.io/server/common/clock" "golang.org/x/time/rate" ) @@ -36,9 +36,9 @@ type ( // RateLimiterImpl is a wrapper around the golang rate limiter RateLimiterImpl struct { sync.RWMutex - rps float64 - burst int - clock clockwork.Clock + rps float64 + burst int + timeSource clock.TimeSource ClockedRateLimiter } ) @@ -47,21 +47,21 @@ var _ RateLimiter = (*RateLimiterImpl)(nil) // NewRateLimiter returns a new rate limiter that can handle dynamic // configuration updates -func NewRateLimiter(newRate float64, newBurst int) *RateLimiterImpl { - limiter := rate.NewLimiter(rate.Limit(newRate), newBurst) - clock := clockwork.NewRealClock() +func NewRateLimiter(newRPS float64, newBurst int) *RateLimiterImpl { + limiter := rate.NewLimiter(rate.Limit(newRPS), newBurst) + ts := clock.NewRealTimeSource() rl := &RateLimiterImpl{ - rps: newRate, + rps: newRPS, burst: newBurst, - clock: clock, - ClockedRateLimiter: NewClockedRateLimiter(limiter, clock), + timeSource: ts, + ClockedRateLimiter: NewClockedRateLimiter(limiter, ts), } return rl } // SetRate set the rate of the rate limiter -func (rl *RateLimiterImpl) SetRate(rps float64) { +func (rl *RateLimiterImpl) SetRPS(rps float64) { rl.refreshInternalRateLimiterImpl(&rps, nil) } @@ -78,12 +78,12 @@ func (rl *RateLimiterImpl) ReserveN(now time.Time, n int) Reservation { return rl.ClockedRateLimiter.ReserveN(now, n) } -// SetRateBurst set the rate & burst of the rate limiter +// SetRateBurst set the rps & burst of the rate limiter func (rl *RateLimiterImpl) SetRateBurst(rps float64, burst int) { rl.refreshInternalRateLimiterImpl(&rps, &burst) } -// Rate returns the rate per second for this rate limiter +// Rate returns the rps for this rate limiter func (rl *RateLimiterImpl) Rate() float64 { rl.Lock() defer rl.Unlock() @@ -119,7 +119,7 @@ func (rl *RateLimiterImpl) refreshInternalRateLimiterImpl( } if refresh { - now := rl.clock.Now() + now := rl.timeSource.Now() rl.SetLimitAt(now, rate.Limit(rl.rps)) rl.SetBurstAt(now, rl.burst) } diff --git a/common/quotas/rate_limiter_impl_test.go b/common/quotas/rate_limiter_impl_test.go index 39bbaed6cfa..abf66fce92f 100644 --- a/common/quotas/rate_limiter_impl_test.go +++ b/common/quotas/rate_limiter_impl_test.go @@ -63,7 +63,7 @@ func (s *rateLimiterSuite) TestSetRate_Same() { rateLimiter := NewRateLimiter(testRate, testBurst) rateLimiterBefore := rateLimiter.ClockedRateLimiter - rateLimiter.SetRate(testRate) + rateLimiter.SetRPS(testRate) rateLimiterAfter := rateLimiter.ClockedRateLimiter s.Equal(testRate, rateLimiter.Rate()) s.Equal(testBurst, rateLimiter.Burst()) @@ -74,7 +74,7 @@ func (s *rateLimiterSuite) TestSetRate_Diff() { rateLimiter := NewRateLimiter(testRate, testBurst) newRate := testRate * 2 - rateLimiter.SetRate(newRate) + rateLimiter.SetRPS(newRate) s.Equal(newRate, rateLimiter.Rate()) s.Equal(testBurst, rateLimiter.Burst()) } diff --git a/service/matching/matcher.go b/service/matching/matcher.go index d1144c79a03..4e1ac76b5bb 100644 --- a/service/matching/matcher.go +++ b/service/matching/matcher.go @@ -309,25 +309,25 @@ func (tm *TaskMatcher) PollForQuery(ctx context.Context, pollMetadata *pollMetad } // UpdateRatelimit updates the task dispatch rate -func (tm *TaskMatcher) UpdateRatelimit(rps *float64) { - if rps == nil { +func (tm *TaskMatcher) UpdateRatelimit(rpsPtr *float64) { + if rpsPtr == nil { return } - rate := *rps + rps := *rpsPtr nPartitions := float64(tm.numPartitions()) if nPartitions > 0 { // divide the rate equally across all partitions - rate = rate / nPartitions + rps = rps / nPartitions } - burst := int(math.Ceil(rate)) + burst := int(math.Ceil(rps)) minTaskThrottlingBurstSize := tm.config.MinTaskThrottlingBurstSize() if burst < minTaskThrottlingBurstSize { burst = minTaskThrottlingBurstSize } - tm.dynamicRateBurst.SetRate(rate) + tm.dynamicRateBurst.SetRPS(rps) tm.dynamicRateBurst.SetBurst(burst) tm.forceRefreshRateOnce.Do(func() { // Dynamic rate limiter only refresh its rate every 1m. Before that initial 1m interval, it uses default rate diff --git a/service/matching/matching_engine_test.go b/service/matching/matching_engine_test.go index 6ca32c17cdd..44182cadd90 100644 --- a/service/matching/matching_engine_test.go +++ b/service/matching/matching_engine_test.go @@ -3061,9 +3061,9 @@ type ( } ) -func (d *dynamicRateBurstWrapper) SetRate(rate float64) { - d.MutableRateBurst.SetRate(rate) - d.RateLimiterImpl.SetRate(rate) +func (d *dynamicRateBurstWrapper) SetRPS(rps float64) { + d.MutableRateBurst.SetRPS(rps) + d.RateLimiterImpl.SetRPS(rps) } func (d *dynamicRateBurstWrapper) SetBurst(burst int) {