Skip to content

Commit

Permalink
Atomic based limiter (uber-go#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
storozhukBM authored and noah8713 committed Aug 4, 2020
1 parent cd1f0ab commit 5fb77d4
Show file tree
Hide file tree
Showing 12 changed files with 460 additions and 39 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
test:
go test .
go test -race .
cover:
go test . -coverprofile=cover.out
go tool cover -html=cover.out
bench:
go test -bench=.
File renamed without changes.
133 changes: 133 additions & 0 deletions inner/clock/clock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package clock

// Forked from github.com/andres-erbsen/clock to isolate a missing nap.

import (
"container/heap"
"sync"
"time"
)

// Mock represents a mock clock that only moves forward programmically.
// It can be preferable to a real-time clock when testing time-based functionality.
type Mock struct {
sync.Mutex
now time.Time // current time
timers Timers // timers
}

// NewMock returns an instance of a mock clock.
// The current time of the mock clock on initialization is the Unix epoch.
func NewMock() *Mock {
return &Mock{now: time.Unix(0, 0)}
}

// Add moves the current time of the mock clock forward by the duration.
// This should only be called from a single goroutine at a time.
func (m *Mock) Add(d time.Duration) {
m.Lock()
// Calculate the final time.
end := m.now.Add(d)

for len(m.timers) > 0 && m.now.Before(end) {
t := heap.Pop(&m.timers).(*Timer)
m.now = t.next
m.Unlock()
t.Tick()
m.Lock()
}

m.Unlock()
// Give a small buffer to make sure the other goroutines get handled.
nap()
}

// Timer produces a timer that will emit a time some duration after now.
func (m *Mock) Timer(d time.Duration) *Timer {
ch := make(chan time.Time)
t := &Timer{
C: ch,
c: ch,
mock: m,
next: m.now.Add(d),
}
m.addTimer(t)
return t
}

func (m *Mock) addTimer(t *Timer) {
m.Lock()
defer m.Unlock()
heap.Push(&m.timers, t)
}

// After produces a channel that will emit the time after a duration passes.
func (m *Mock) After(d time.Duration) <-chan time.Time {
return m.Timer(d).C
}

// AfterFunc waits for the duration to elapse and then executes a function.
// A Timer is returned that can be stopped.
func (m *Mock) AfterFunc(d time.Duration, f func()) *Timer {
t := m.Timer(d)
go func() {
<-t.c
f()
}()
nap()
return t
}

// Now returns the current wall time on the mock clock.
func (m *Mock) Now() time.Time {
m.Lock()
defer m.Unlock()
return m.now
}

// Sleep pauses the goroutine for the given duration on the mock clock.
// The clock must be moved forward in a separate goroutine.
func (m *Mock) Sleep(d time.Duration) {
<-m.After(d)
}

// Timer represents a single event.
type Timer struct {
C <-chan time.Time
c chan time.Time
next time.Time // next tick time
mock *Mock // mock clock
}

func (t *Timer) Next() time.Time { return t.next }

func (t *Timer) Tick() {
select {
case t.c <- t.next:
default:
}
nap()
}

// Sleep momentarily so that other goroutines can process.
func nap() { time.Sleep(1 * time.Millisecond) }
File renamed without changes.
File renamed without changes.
File renamed without changes.
34 changes: 34 additions & 0 deletions interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package clock

import "time"

// Clock represents an interface to the functions in the standard library time
// package. Two implementations are available in the clock package. The first
// is a real-time clock which simply wraps the time package's functions. The
// second is a mock clock which will only make forward progress when
// programmatically adjusted.
type Clock interface {
AfterFunc(d time.Duration, f func())
Now() time.Time
Sleep(d time.Duration)
}
67 changes: 67 additions & 0 deletions mutexbased.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package ratelimit

import (
"github.com/noah8713/ratelimit"
"sync"
"time"
)

type mutexLimiter struct {
sync.Mutex
last time.Time
sleepFor time.Duration
perRequest time.Duration
maxSlack time.Duration
clock Clock
}

// New returns a Limiter that will limit to the given RPS.
func newMutexBased(rate int, opts ...Option) Limiter {
l := &mutexLimiter{
perRequest: time.Second / time.Duration(rate),
maxSlack: -10 * time.Second / time.Duration(rate),
}
if l.clock == nil {
l.clock = clock.New()
}
return l
}

// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *mutexLimiter) Take() time.Time {
t.Lock()
defer t.Unlock()

now := t.clock.Now()

// If this is our first request, then we allow it.
if t.last.IsZero() {
t.last = now
return t.last
}

// sleepFor calculates how much time we should sleep based on
// the perRequest budget and how long the last request took.
// Since the request may take longer than the budget, this number
// can get negative, and is summed across requests.
t.sleepFor += t.perRequest - now.Sub(t.last)

// We shouldn't allow sleepFor to get too negative, since it would mean that
// a service that slowed down a lot for a short period of time would get
// a much higher RPS following that.
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}

// If sleepFor is positive, then we should sleep now.
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}

return t.last
}
88 changes: 50 additions & 38 deletions ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
package ratelimit // import "go.uber.org/ratelimit"

import (
"sync"
"time"

"go.uber.org/ratelimit/internal/clock"
"github.com/noah8713/ratelimit/"
"sync/atomic"
"unsafe"
)

// Note: This file is inspired by:
Expand All @@ -46,10 +47,15 @@ type Clock interface {
Sleep(time.Duration)
}

type state struct {
last time.Time
sleepFor time.Duration
}

type limiter struct {
sync.Mutex
last time.Time
sleepFor time.Duration
state unsafe.Pointer
padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing

perRequest time.Duration
maxSlack time.Duration
clock Clock
Expand All @@ -70,6 +76,11 @@ func New(rate int, opts ...Option) Limiter {
if l.clock == nil {
l.clock = clock.New()
}
initialState := state{
last: time.Time{},
sleepFor: 0,
}
atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
return l
}

Expand All @@ -92,40 +103,41 @@ func withoutSlackOption(l *limiter) {
// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *limiter) Take() time.Time {
t.Lock()
defer t.Unlock()

now := t.clock.Now()

// If this is our first request, then we allow it.
if t.last.IsZero() {
t.last = now
return t.last
newState := state{}
taken := false
for !taken {
now := t.clock.Now()

previousStatePointer := atomic.LoadPointer(&t.state)
oldState := (*state)(previousStatePointer)

newState = state{}
newState.last = now

// If this is our first request, then we allow it.
if oldState.last.IsZero() {
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
continue
}

// sleepFor calculates how much time we should sleep based on
// the perRequest budget and how long the last request took.
// Since the request may take longer than the budget, this number
// can get negative, and is summed across requests.
newState.sleepFor += t.perRequest - now.Sub(oldState.last)
// We shouldn't allow sleepFor to get too negative, since it would mean that
// a service that slowed down a lot for a short period of time would get
// a much higher RPS following that.
if newState.sleepFor < t.maxSlack {
newState.sleepFor = t.maxSlack
}
if newState.sleepFor > 0 {
newState.last = newState.last.Add(newState.sleepFor)
}
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
}

// sleepFor calculates how much time we should sleep based on
// the perRequest budget and how long the last request took.
// Since the request may take longer than the budget, this number
// can get negative, and is summed across requests.
t.sleepFor += t.perRequest - now.Sub(t.last)

// We shouldn't allow sleepFor to get too negative, since it would mean that
// a service that slowed down a lot for a short period of time would get
// a much higher RPS following that.
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}

// If sleepFor is positive, then we should sleep now.
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}

return t.last
t.clock.Sleep(newState.sleepFor)
return newState.last
}

type unlimited struct{}
Expand Down
Loading

0 comments on commit 5fb77d4

Please sign in to comment.