Skip to content

Commit

Permalink
Add quotas.DelayedRateLimiter
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Jul 28, 2023
1 parent 851e26b commit b2d94c1
Show file tree
Hide file tree
Showing 5 changed files with 377 additions and 0 deletions.
78 changes: 78 additions & 0 deletions common/quotas/delayed_request_rate_limiter.go
@@ -0,0 +1,78 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package quotas

import (
"errors"
"fmt"
"time"

"go.temporal.io/server/common/clock"
)

// DelayedRequestRateLimiter is a rate limiter that allows all requests without any delay for a given duration. After
// the delay expires, it delegates to another rate limiter. This rate limiter is useful for cases where you want to
// allow all requests for a given duration, e.g. during something volatile like a deployment, and then switch to another
// rate limiter after the duration expires.
type DelayedRequestRateLimiter struct {
// RequestRateLimiter is the delegate that we switch to after the delay expires.
RequestRateLimiter
// timer triggers the rate limiter to delegate to the underlying rate limiter. We hold a reference to it in order to
// cancel it prematurely if needed.
timer clock.Timer
}

var ErrNegativeDelay = errors.New("delay cannot be negative")

// NewDelayedRequestRateLimiter returns a DelayedRequestRateLimiter that delegates to the given rate limiter after a
// delay. The timeSource is used to create the timer that triggers the switch. It returns an error if the given delay
// is negative.
func NewDelayedRequestRateLimiter(
rl RequestRateLimiter,
delay time.Duration,
timeSource clock.TimeSource,
) (*DelayedRequestRateLimiter, error) {
if delay < 0 {
return nil, fmt.Errorf("%w: %v", ErrNegativeDelay, delay)
}

delegator := RequestRateLimiterDelegator{}
delegator.SetRateLimiter(NoopRequestRateLimiter)

timer := timeSource.AfterFunc(delay, func() {
delegator.SetRateLimiter(rl)
})

return &DelayedRequestRateLimiter{
RequestRateLimiter: &delegator,
timer: timer,
}, nil
}

// Cancel stops the timer that triggers the rate limiter to delegate to the underlying rate limiter. It returns true if
// the timer was stopped before it expired.
func (rl *DelayedRequestRateLimiter) Cancel() bool {
return rl.timer.Stop()
}
96 changes: 96 additions & 0 deletions common/quotas/delayed_request_rate_limiter_test.go
@@ -0,0 +1,96 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package quotas_test

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/quotas"
)

// disallowingRateLimiter is a rate limiter whose Allow method always returns false.
type disallowingRateLimiter struct {
// RequestRateLimiter is an embedded field so that disallowingRateLimiter implements that interface. It doesn't
// actually delegate to this rate limiter, and this field should be left nil.
quotas.RequestRateLimiter
}

func (rl disallowingRateLimiter) Allow(time.Time, quotas.Request) bool {
return false
}

func TestNewDelayedRequestRateLimiter_NegativeDelay(t *testing.T) {
t.Parallel()

_, err := quotas.NewDelayedRequestRateLimiter(
quotas.NoopRequestRateLimiter,
-time.Nanosecond,
clock.NewRealTimeSource(),
)
assert.ErrorIs(t, err, quotas.ErrNegativeDelay)
}

func TestNewDelayedRequestRateLimiter_ZeroDelay(t *testing.T) {
t.Parallel()

timeSource := clock.NewEventTimeSource()
drl, err := quotas.NewDelayedRequestRateLimiter(disallowingRateLimiter{}, 0, timeSource)
require.NoError(t, err)
assert.False(t, drl.Allow(time.Time{}, quotas.Request{}), "expected Allow to return false because we "+
"immediately switched to the disallowing rate limiter due to the zero delay")
}

func TestDelayedRequestRateLimiter_Allow(t *testing.T) {
t.Parallel()

timeSource := clock.NewEventTimeSource()
drl, err := quotas.NewDelayedRequestRateLimiter(disallowingRateLimiter{}, time.Second, timeSource)
require.NoError(t, err)
timeSource.Advance(time.Second - time.Nanosecond)
assert.True(t, drl.Allow(time.Time{}, quotas.Request{}), "expected Allow to return true because the "+
"timer hasn't expired yet")
timeSource.Advance(time.Nanosecond)
assert.False(t, drl.Allow(time.Time{}, quotas.Request{}), "expected Allow to return false because the "+
"timer expired, and we switched to the disallowing rate limiter")
}

func TestDelayedRequestRateLimiter_Cancel(t *testing.T) {
t.Parallel()

timeSource := clock.NewEventTimeSource()
drl, err := quotas.NewDelayedRequestRateLimiter(disallowingRateLimiter{}, time.Second, timeSource)
require.NoError(t, err)
timeSource.Advance(time.Second - time.Nanosecond)
assert.True(t, drl.Cancel(), "expected Cancel to return true because the timer was stopped before it "+
"expired")
timeSource.Advance(time.Nanosecond)
assert.True(t, drl.Allow(time.Time{}, quotas.Request{}), "expected Allow to return true because the "+
"timer was stopped before it could expire")
assert.False(t, drl.Cancel(), "expected Cancel to return false because the timer was already stopped")
}
46 changes: 46 additions & 0 deletions common/quotas/noop_request_rate_limiter_impl_test.go
@@ -0,0 +1,46 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package quotas_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.temporal.io/server/common/quotas"
)

func TestNoopRequestRateLimiterImpl(t *testing.T) {
t.Parallel()

testNoopRequestRateLimiterImpl(t, quotas.NoopRequestRateLimiter)
}

func testNoopRequestRateLimiterImpl(t *testing.T, rl quotas.RequestRateLimiter) {
assert.True(t, rl.Allow(time.Now(), quotas.Request{}))
assert.Equal(t, quotas.NoopReservation, rl.Reserve(time.Now(), quotas.Request{}))
assert.NoError(t, rl.Wait(context.Background(), quotas.Request{}))
}
71 changes: 71 additions & 0 deletions common/quotas/request_rate_limiter_delegator.go
@@ -0,0 +1,71 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package quotas

import (
"context"
"sync/atomic"
"time"
)

// RequestRateLimiterDelegator is a request rate limiter that delegates to another rate limiter. The delegate can be
// changed at runtime by calling SetRateLimiter. This rate limiter is useful for cases where you want to substitute one
// rate limiter implementation for another at runtime. All methods of this type are thread-safe.
type RequestRateLimiterDelegator struct {
// delegate is an atomic.Value so that it can be safely read from and written to concurrently. It stores the rate
// limiter that this rate limiter delegates to as a monomorphicRequestRateLimiter.
delegate atomic.Value
}

// monomorphicRequestRateLimiter is a workaround for the fact that the value stored in an atomic.Value must always be
// the same type, but we want to allow the rate limiter delegate to be any type that implements the RequestRateLimiter
// interface.
type monomorphicRequestRateLimiter struct {
RequestRateLimiter
}

// SetRateLimiter sets the rate limiter to delegate to.
func (d *RequestRateLimiterDelegator) SetRateLimiter(rl RequestRateLimiter) {
d.delegate.Store(monomorphicRequestRateLimiter{rl})
}

// loadDelegate returns the rate limiter that this rate limiter delegates to.
func (d *RequestRateLimiterDelegator) loadDelegate() RequestRateLimiter {
return d.delegate.Load().(RequestRateLimiter)
}

// The following methods just delegate to the underlying rate limiter.

func (d *RequestRateLimiterDelegator) Allow(now time.Time, request Request) bool {
return d.loadDelegate().Allow(now, request)
}

func (d *RequestRateLimiterDelegator) Reserve(now time.Time, request Request) Reservation {
return d.loadDelegate().Reserve(now, request)
}

func (d *RequestRateLimiterDelegator) Wait(ctx context.Context, request Request) error {
return d.loadDelegate().Wait(ctx, request)
}
86 changes: 86 additions & 0 deletions common/quotas/request_rate_limiter_delegator_test.go
@@ -0,0 +1,86 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package quotas_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.temporal.io/server/common/quotas"
)

// blockingRequestRateLimiter is a rate limiter that blocks in its Wait method until the context is canceled.
type blockingRequestRateLimiter struct {
// RateLimiter is an embedded field so that blockingRequestRateLimiter implements the RateLimiter interface. It doesn't
// actually delegate to this rate limiter, and this field should be left nil.
quotas.RequestRateLimiter
// waitStarted is a channel which is sent to as soon as Wait is called.
waitStarted chan struct{}
}

func (b *blockingRequestRateLimiter) Wait(ctx context.Context, _ quotas.Request) error {
b.waitStarted <- struct{}{}
<-ctx.Done()
return ctx.Err()
}

// TestRateLimiterDelegator_Wait verifies that the RequestRateLimiterDelegator.Wait method can be called concurrently even if
// the rate limiter it delegates to is switched while the method is being called. The same condition should hold for
// all methods on RequestRateLimiterDelegator, but we only test Wait here for simplicity.
func TestRateLimiterDelegator_Wait(t *testing.T) {
t.Parallel()

blockingRateLimiter := &blockingRequestRateLimiter{
waitStarted: make(chan struct{}),
}
delegator := &quotas.RequestRateLimiterDelegator{}
delegator.SetRateLimiter(blockingRateLimiter)

ctx, cancel := context.WithCancel(context.Background())
waitErrs := make(chan error)

go func() {
waitErrs <- delegator.Wait(ctx, quotas.Request{})
}()
<-blockingRateLimiter.waitStarted
delegator.SetRateLimiter(quotas.NoopRequestRateLimiter)
assert.NoError(t, delegator.Wait(ctx, quotas.Request{}))
select {
case err := <-waitErrs:
t.Fatal("Wait returned before context was canceled:", err)
default:
}
cancel()
assert.ErrorIs(t, <-waitErrs, context.Canceled)
}

func TestRateLimiterDelegator_SetRateLimiter(t *testing.T) {
t.Parallel()

delegator := &quotas.RequestRateLimiterDelegator{}
delegator.SetRateLimiter(quotas.NoopRequestRateLimiter)
testNoopRequestRateLimiterImpl(t, delegator)
}

0 comments on commit b2d94c1

Please sign in to comment.