From b2d94c173ddde0aeab4411d0acad4691f84cac72 Mon Sep 17 00:00:00 2001 From: Michael Snowden Date: Wed, 26 Jul 2023 16:59:02 -0700 Subject: [PATCH] Add quotas.DelayedRateLimiter --- common/quotas/delayed_request_rate_limiter.go | 78 +++++++++++++++ .../delayed_request_rate_limiter_test.go | 96 +++++++++++++++++++ .../noop_request_rate_limiter_impl_test.go | 46 +++++++++ .../quotas/request_rate_limiter_delegator.go | 71 ++++++++++++++ .../request_rate_limiter_delegator_test.go | 86 +++++++++++++++++ 5 files changed, 377 insertions(+) create mode 100644 common/quotas/delayed_request_rate_limiter.go create mode 100644 common/quotas/delayed_request_rate_limiter_test.go create mode 100644 common/quotas/noop_request_rate_limiter_impl_test.go create mode 100644 common/quotas/request_rate_limiter_delegator.go create mode 100644 common/quotas/request_rate_limiter_delegator_test.go diff --git a/common/quotas/delayed_request_rate_limiter.go b/common/quotas/delayed_request_rate_limiter.go new file mode 100644 index 00000000000..232a5e3258b --- /dev/null +++ b/common/quotas/delayed_request_rate_limiter.go @@ -0,0 +1,78 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package quotas + +import ( + "errors" + "fmt" + "time" + + "go.temporal.io/server/common/clock" +) + +// DelayedRequestRateLimiter is a rate limiter that allows all requests without any delay for a given duration. After +// the delay expires, it delegates to another rate limiter. This rate limiter is useful for cases where you want to +// allow all requests for a given duration, e.g. during something volatile like a deployment, and then switch to another +// rate limiter after the duration expires. +type DelayedRequestRateLimiter struct { + // RequestRateLimiter is the delegate that we switch to after the delay expires. + RequestRateLimiter + // timer triggers the rate limiter to delegate to the underlying rate limiter. We hold a reference to it in order to + // cancel it prematurely if needed. + timer clock.Timer +} + +var ErrNegativeDelay = errors.New("delay cannot be negative") + +// NewDelayedRequestRateLimiter returns a DelayedRequestRateLimiter that delegates to the given rate limiter after a +// delay. The timeSource is used to create the timer that triggers the switch. It returns an error if the given delay +// is negative. +func NewDelayedRequestRateLimiter( + rl RequestRateLimiter, + delay time.Duration, + timeSource clock.TimeSource, +) (*DelayedRequestRateLimiter, error) { + if delay < 0 { + return nil, fmt.Errorf("%w: %v", ErrNegativeDelay, delay) + } + + delegator := RequestRateLimiterDelegator{} + delegator.SetRateLimiter(NoopRequestRateLimiter) + + timer := timeSource.AfterFunc(delay, func() { + delegator.SetRateLimiter(rl) + }) + + return &DelayedRequestRateLimiter{ + RequestRateLimiter: &delegator, + timer: timer, + }, nil +} + +// Cancel stops the timer that triggers the rate limiter to delegate to the underlying rate limiter. It returns true if +// the timer was stopped before it expired. +func (rl *DelayedRequestRateLimiter) Cancel() bool { + return rl.timer.Stop() +} diff --git a/common/quotas/delayed_request_rate_limiter_test.go b/common/quotas/delayed_request_rate_limiter_test.go new file mode 100644 index 00000000000..d356d515c55 --- /dev/null +++ b/common/quotas/delayed_request_rate_limiter_test.go @@ -0,0 +1,96 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package quotas_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/quotas" +) + +// disallowingRateLimiter is a rate limiter whose Allow method always returns false. +type disallowingRateLimiter struct { + // RequestRateLimiter is an embedded field so that disallowingRateLimiter implements that interface. It doesn't + // actually delegate to this rate limiter, and this field should be left nil. + quotas.RequestRateLimiter +} + +func (rl disallowingRateLimiter) Allow(time.Time, quotas.Request) bool { + return false +} + +func TestNewDelayedRequestRateLimiter_NegativeDelay(t *testing.T) { + t.Parallel() + + _, err := quotas.NewDelayedRequestRateLimiter( + quotas.NoopRequestRateLimiter, + -time.Nanosecond, + clock.NewRealTimeSource(), + ) + assert.ErrorIs(t, err, quotas.ErrNegativeDelay) +} + +func TestNewDelayedRequestRateLimiter_ZeroDelay(t *testing.T) { + t.Parallel() + + timeSource := clock.NewEventTimeSource() + drl, err := quotas.NewDelayedRequestRateLimiter(disallowingRateLimiter{}, 0, timeSource) + require.NoError(t, err) + assert.False(t, drl.Allow(time.Time{}, quotas.Request{}), "expected Allow to return false because we "+ + "immediately switched to the disallowing rate limiter due to the zero delay") +} + +func TestDelayedRequestRateLimiter_Allow(t *testing.T) { + t.Parallel() + + timeSource := clock.NewEventTimeSource() + drl, err := quotas.NewDelayedRequestRateLimiter(disallowingRateLimiter{}, time.Second, timeSource) + require.NoError(t, err) + timeSource.Advance(time.Second - time.Nanosecond) + assert.True(t, drl.Allow(time.Time{}, quotas.Request{}), "expected Allow to return true because the "+ + "timer hasn't expired yet") + timeSource.Advance(time.Nanosecond) + assert.False(t, drl.Allow(time.Time{}, quotas.Request{}), "expected Allow to return false because the "+ + "timer expired, and we switched to the disallowing rate limiter") +} + +func TestDelayedRequestRateLimiter_Cancel(t *testing.T) { + t.Parallel() + + timeSource := clock.NewEventTimeSource() + drl, err := quotas.NewDelayedRequestRateLimiter(disallowingRateLimiter{}, time.Second, timeSource) + require.NoError(t, err) + timeSource.Advance(time.Second - time.Nanosecond) + assert.True(t, drl.Cancel(), "expected Cancel to return true because the timer was stopped before it "+ + "expired") + timeSource.Advance(time.Nanosecond) + assert.True(t, drl.Allow(time.Time{}, quotas.Request{}), "expected Allow to return true because the "+ + "timer was stopped before it could expire") + assert.False(t, drl.Cancel(), "expected Cancel to return false because the timer was already stopped") +} diff --git a/common/quotas/noop_request_rate_limiter_impl_test.go b/common/quotas/noop_request_rate_limiter_impl_test.go new file mode 100644 index 00000000000..01a87e0190a --- /dev/null +++ b/common/quotas/noop_request_rate_limiter_impl_test.go @@ -0,0 +1,46 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package quotas_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.temporal.io/server/common/quotas" +) + +func TestNoopRequestRateLimiterImpl(t *testing.T) { + t.Parallel() + + testNoopRequestRateLimiterImpl(t, quotas.NoopRequestRateLimiter) +} + +func testNoopRequestRateLimiterImpl(t *testing.T, rl quotas.RequestRateLimiter) { + assert.True(t, rl.Allow(time.Now(), quotas.Request{})) + assert.Equal(t, quotas.NoopReservation, rl.Reserve(time.Now(), quotas.Request{})) + assert.NoError(t, rl.Wait(context.Background(), quotas.Request{})) +} diff --git a/common/quotas/request_rate_limiter_delegator.go b/common/quotas/request_rate_limiter_delegator.go new file mode 100644 index 00000000000..a9ad7d46900 --- /dev/null +++ b/common/quotas/request_rate_limiter_delegator.go @@ -0,0 +1,71 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package quotas + +import ( + "context" + "sync/atomic" + "time" +) + +// RequestRateLimiterDelegator is a request rate limiter that delegates to another rate limiter. The delegate can be +// changed at runtime by calling SetRateLimiter. This rate limiter is useful for cases where you want to substitute one +// rate limiter implementation for another at runtime. All methods of this type are thread-safe. +type RequestRateLimiterDelegator struct { + // delegate is an atomic.Value so that it can be safely read from and written to concurrently. It stores the rate + // limiter that this rate limiter delegates to as a monomorphicRequestRateLimiter. + delegate atomic.Value +} + +// monomorphicRequestRateLimiter is a workaround for the fact that the value stored in an atomic.Value must always be +// the same type, but we want to allow the rate limiter delegate to be any type that implements the RequestRateLimiter +// interface. +type monomorphicRequestRateLimiter struct { + RequestRateLimiter +} + +// SetRateLimiter sets the rate limiter to delegate to. +func (d *RequestRateLimiterDelegator) SetRateLimiter(rl RequestRateLimiter) { + d.delegate.Store(monomorphicRequestRateLimiter{rl}) +} + +// loadDelegate returns the rate limiter that this rate limiter delegates to. +func (d *RequestRateLimiterDelegator) loadDelegate() RequestRateLimiter { + return d.delegate.Load().(RequestRateLimiter) +} + +// The following methods just delegate to the underlying rate limiter. + +func (d *RequestRateLimiterDelegator) Allow(now time.Time, request Request) bool { + return d.loadDelegate().Allow(now, request) +} + +func (d *RequestRateLimiterDelegator) Reserve(now time.Time, request Request) Reservation { + return d.loadDelegate().Reserve(now, request) +} + +func (d *RequestRateLimiterDelegator) Wait(ctx context.Context, request Request) error { + return d.loadDelegate().Wait(ctx, request) +} diff --git a/common/quotas/request_rate_limiter_delegator_test.go b/common/quotas/request_rate_limiter_delegator_test.go new file mode 100644 index 00000000000..1683f39338a --- /dev/null +++ b/common/quotas/request_rate_limiter_delegator_test.go @@ -0,0 +1,86 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package quotas_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.temporal.io/server/common/quotas" +) + +// blockingRequestRateLimiter is a rate limiter that blocks in its Wait method until the context is canceled. +type blockingRequestRateLimiter struct { + // RateLimiter is an embedded field so that blockingRequestRateLimiter implements the RateLimiter interface. It doesn't + // actually delegate to this rate limiter, and this field should be left nil. + quotas.RequestRateLimiter + // waitStarted is a channel which is sent to as soon as Wait is called. + waitStarted chan struct{} +} + +func (b *blockingRequestRateLimiter) Wait(ctx context.Context, _ quotas.Request) error { + b.waitStarted <- struct{}{} + <-ctx.Done() + return ctx.Err() +} + +// TestRateLimiterDelegator_Wait verifies that the RequestRateLimiterDelegator.Wait method can be called concurrently even if +// the rate limiter it delegates to is switched while the method is being called. The same condition should hold for +// all methods on RequestRateLimiterDelegator, but we only test Wait here for simplicity. +func TestRateLimiterDelegator_Wait(t *testing.T) { + t.Parallel() + + blockingRateLimiter := &blockingRequestRateLimiter{ + waitStarted: make(chan struct{}), + } + delegator := "as.RequestRateLimiterDelegator{} + delegator.SetRateLimiter(blockingRateLimiter) + + ctx, cancel := context.WithCancel(context.Background()) + waitErrs := make(chan error) + + go func() { + waitErrs <- delegator.Wait(ctx, quotas.Request{}) + }() + <-blockingRateLimiter.waitStarted + delegator.SetRateLimiter(quotas.NoopRequestRateLimiter) + assert.NoError(t, delegator.Wait(ctx, quotas.Request{})) + select { + case err := <-waitErrs: + t.Fatal("Wait returned before context was canceled:", err) + default: + } + cancel() + assert.ErrorIs(t, <-waitErrs, context.Canceled) +} + +func TestRateLimiterDelegator_SetRateLimiter(t *testing.T) { + t.Parallel() + + delegator := "as.RequestRateLimiterDelegator{} + delegator.SetRateLimiter(quotas.NoopRequestRateLimiter) + testNoopRequestRateLimiterImpl(t, delegator) +}