Skip to content

Commit

Permalink
Restore int64 based atomic rate limiter (#94)
Browse files Browse the repository at this point in the history
This limiter was introduced and merged in the following PR #85
Later @twelsh-aw found an issue with this implementation #90
So @rabbbit reverted this change in #91

Our tests did not detect this issue, so we have a separate PR #93 that enhances our tests approach to detect potential errors better.
With this PR, we want to restore the int64-based atomic rate limiter implementation as a non-default rate limiter and then check that #93 will detect the bug.
Right after it, we'll open a subsequent PR to fix this bug.
  • Loading branch information
storozhukBM committed Jul 2, 2022
1 parent 029273d commit 783ade2
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 4 deletions.
90 changes: 90 additions & 0 deletions limiter_atomic_int64.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) 2022 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ratelimit // import "go.uber.org/ratelimit"

import (
"time"

"sync/atomic"
)

type atomicInt64Limiter struct {
//lint:ignore U1000 Padding is unused but it is crucial to maintain performance
// of this rate limiter in case of collocation with other frequently accessed memory.
prepadding [64]byte // cache line size = 64; created to avoid false sharing.
state int64 // unix nanoseconds of the next permissions issue.
//lint:ignore U1000 like prepadding.
postpadding [56]byte // cache line size - state size = 64 - 8; created to avoid false sharing.

perRequest time.Duration
maxSlack time.Duration
clock Clock
}

// newAtomicBased returns a new atomic based limiter.
func newAtomicInt64Based(rate int, opts ...Option) *atomicInt64Limiter {
// TODO consider moving config building to the implementation
// independent code.
config := buildConfig(opts)
perRequest := config.per / time.Duration(rate)
l := &atomicInt64Limiter{
perRequest: perRequest,
maxSlack: time.Duration(config.slack) * perRequest,
clock: config.clock,
}
atomic.StoreInt64(&l.state, 0)
return l
}

// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *atomicInt64Limiter) Take() time.Time {
var (
newTimeOfNextPermissionIssue int64
now int64
)
for {
now = t.clock.Now().UnixNano()
timeOfNextPermissionIssue := atomic.LoadInt64(&t.state)

switch {
case timeOfNextPermissionIssue == 0:
// If this is our first request, then we allow it.
newTimeOfNextPermissionIssue = now
case now-timeOfNextPermissionIssue > int64(t.maxSlack):
// a lot of nanoseconds passed since the last Take call
// we will limit max accumulated time to maxSlack
newTimeOfNextPermissionIssue = now - int64(t.maxSlack)
default:
// calculate the time at which our permission was issued
newTimeOfNextPermissionIssue = timeOfNextPermissionIssue + int64(t.perRequest)
}

if atomic.CompareAndSwapInt64(&t.state, timeOfNextPermissionIssue, newTimeOfNextPermissionIssue) {
break
}
}
nanosToSleepUntilOurPermissionIsIssued := newTimeOfNextPermissionIssue - now
if nanosToSleepUntilOurPermissionIsIssued > 0 {
t.clock.Sleep(time.Duration(nanosToSleepUntilOurPermissionIsIssued))
}
return time.Unix(0, newTimeOfNextPermissionIssue)
}
9 changes: 6 additions & 3 deletions ratelimit_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ func BenchmarkRateLimiter(b *testing.B) {
for _, procs := range []int{1, 4, 8, 16} {
runtime.GOMAXPROCS(procs)
for name, limiter := range map[string]Limiter{
"atomic": New(b.N * 10000000),
"mutex": newMutexBased(b.N * 10000000),
"atomic": newAtomicBased(b.N * 1000000000000),
"atomic_int64": newAtomicInt64Based(b.N * 1000000000000),
"mutex": newMutexBased(b.N * 1000000000000),
} {
for ng := 1; ng < 16; ng++ {
runner(b, name, procs, ng, limiter, count)
Expand Down Expand Up @@ -47,7 +48,9 @@ func BenchmarkRateLimiter(b *testing.B) {
}

func runner(b *testing.B, name string, procs int, ng int, limiter Limiter, count *atomic.Int64) bool {
return b.Run(fmt.Sprintf("type:%s-procs:%d-goroutines:%d", name, procs, ng), func(b *testing.B) {
return b.Run(fmt.Sprintf("type:%s;max_procs:%d;goroutines:%d", name, procs, ng), func(b *testing.B) {
b.ReportAllocs()

var wg sync.WaitGroup
trigger := atomic.NewBool(true)
n := b.N
Expand Down
12 changes: 11 additions & 1 deletion ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,23 @@ func runTest(t *testing.T, fn func(testRunner)) {
return newAtomicBased(rate, opts...)
},
},
{
name: "atomic_int64",
constructor: func(rate int, opts ...Option) Limiter {
return newAtomicInt64Based(rate, opts...)
},
},
}

for _, tt := range impls {
t.Run(tt.name, func(t *testing.T) {
// Set a non-default time.Time since some limiters (int64 in particular) use
// the default value as "non-initialized" state.
clockMock := clock.NewMock()
clockMock.Set(time.Now())
r := runnerImpl{
t: t,
clock: clock.NewMock(),
clock: clockMock,
constructor: tt.constructor,
doneCh: make(chan struct{}),
}
Expand Down
1 change: 1 addition & 0 deletions tools/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (

require (
github.com/BurntSushi/toml v1.0.0 // indirect
github.com/storozhukBM/benchart v1.0.0
golang.org/x/exp/typeparams v0.0.0-20220328175248-053ad81199eb // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f // indirect
Expand Down

0 comments on commit 783ade2

Please sign in to comment.