Skip to content

Commit

Permalink
Load and Wait acts on toplevel Ticker
Browse files Browse the repository at this point in the history
  • Loading branch information
linkdata committed May 13, 2024
1 parent 1a63c7e commit 2bc0af7
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 44 deletions.
55 changes: 38 additions & 17 deletions ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
type Ticker struct {
C <-chan struct{} // sends a struct{}{} at most maxrate times per second
tickCh chan struct{} // source for C, closed by runner
maxrate *int32 // maxrate pointer, or nil
parent *Ticker // parent Ticker, or nil
maxrate *int32 // (atomic) maxrate pointer, or nil
mu sync.Mutex // protects following
closeCh chan struct{} // channel signalling Close() is called
counter int64 // counter
padding int32 // padding added by Wait
rate int32 // current rate
load int32 // current load in permille
padding int32 // padding added by Wait
}

var tickerTimerDuration = time.Second
Expand Down Expand Up @@ -56,9 +57,16 @@ func (ticker *Ticker) IsClosed() (yes bool) {
// Returns true if we waited successfully, or false if the Ticker is closed.
//
// Typical use case is to launch goroutines that in turn uses the Ticker to rate limit some resource or action,
// thus limiting the rate of goroutines spawning without impacting the resource use rate.
// thus limiting the rate of goroutines spawning without impacting the resource use rate:
//
// if ticker.Load() < 1000 || ticker.Wait() {
// go startMoreWork()
// }
func (ticker *Ticker) Wait() (ok bool) {
if _, ok = <-ticker.tickCh; ok {
for ticker.parent != nil {
ticker = ticker.parent
}
ticker.mu.Lock()
ticker.padding++
ticker.mu.Unlock()
Expand All @@ -82,18 +90,24 @@ func (ticker *Ticker) Rate() (n int32) {
return
}

// Load returns the current load in permille, or -1 if the rate is unlimited.
// Load returns the current load in permille.
//
// Load is rounded up, and is only zero if the rate is zero.
func (ticker *Ticker) Load() (n int32) {
ticker.mu.Lock()
n = ticker.load
ticker.mu.Unlock()
// If the Ticker has parent Ticker(s), the highest load is returned.
func (ticker *Ticker) Load() (load int32) {
for ticker != nil {
ticker.mu.Lock()
if ticker.load > load {
load = ticker.load
}
ticker.mu.Unlock()
ticker = ticker.parent
}
return
}

func (ticker *Ticker) calcLoadLocked() {
load := int32(-1)
var load int32
if ticker.maxrate != nil {
if mr := atomic.LoadInt32(ticker.maxrate); mr > 0 {
mr *= 10
Expand All @@ -102,7 +116,9 @@ func (ticker *Ticker) calcLoadLocked() {
// always round up the load
rate += (mr / 1000) - 1
}
load = (rate * 1000) / mr
if load = (rate * 1000) / mr; load > 1000 {
load = 1000
}
}
}
ticker.load = load
Expand All @@ -121,6 +137,7 @@ func (ticker *Ticker) run(closeCh <-chan struct{}, parent *Ticker) {

rateWhen := time.Now()
rateCount := ticker.counter
needElapsed := (tickerTimerDuration * 10) / 11
if parent != nil {
parentCh = parent.C
} else {
Expand Down Expand Up @@ -162,11 +179,12 @@ func (ticker *Ticker) run(closeCh <-chan struct{}, parent *Ticker) {
// update current rate and load
ticker.mu.Lock()
if delta := ticker.counter - rateCount; delta > 0 {
rateCount = ticker.counter
elapsed := time.Since(rateWhen)
rateWhen = rateWhen.Add(elapsed)
ticker.rate = int32(time.Duration(delta) * time.Second / elapsed)
ticker.calcLoadLocked()
if elapsed := time.Since(rateWhen); elapsed >= needElapsed {
rateWhen = rateWhen.Add(elapsed)
rateCount = ticker.counter
ticker.rate = int32(time.Duration(delta) * time.Second / elapsed)
ticker.calcLoadLocked()
}
}
ticker.mu.Unlock()
case <-closeCh:
Expand All @@ -187,12 +205,15 @@ func (ticker *Ticker) run(closeCh <-chan struct{}, parent *Ticker) {
// A nil `maxrate` or a `*maxrate` of zero or less sends
// as quickly as possible, so only limited by the parent channel.
func NewTicker(parent *Ticker, maxrate *int32) *Ticker {
if maxrate == nil && parent != nil {
maxrate = parent.maxrate
if parent != nil {
if maxrate == nil {
maxrate = parent.maxrate
}
}
ticker := &Ticker{
tickCh: make(chan struct{}),
closeCh: make(chan struct{}),
parent: parent,
maxrate: maxrate,
}
ticker.C = ticker.tickCh
Expand Down
52 changes: 25 additions & 27 deletions ticker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestNewTicker(t *testing.T) {
const n = 100
now := time.Now()
ticker := NewTicker(nil, nil)
defer ticker.Close()
for i := 0; i < n; i++ {
_, ok := <-ticker.C
if !ok {
Expand All @@ -109,7 +110,9 @@ func TestNewSubTicker(t *testing.T) {
const n = 100
now := time.Now()
t1 := NewTicker(nil, nil)
defer t1.Close()
t2 := NewTicker(t1, nil)
defer t2.Close()
for i := 0; i < n; i++ {
_, ok := <-t2.C
if !ok {
Expand Down Expand Up @@ -146,6 +149,7 @@ func TestWait(t *testing.T) {

maxrate := int32(100)
ticker := NewTicker(nil, &maxrate)
defer ticker.Close()
period := time.Second / time.Duration(maxrate)

now := time.Now()
Expand Down Expand Up @@ -177,6 +181,7 @@ func TestWaitTwice(t *testing.T) {
now := time.Now()
maxrate := int32(time.Second / variance * 2)
ticker := NewTicker(nil, &maxrate)
defer ticker.Close()

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -198,50 +203,44 @@ func TestWaitTwice(t *testing.T) {
}
}

/*func TestWaitFullRate_100(t *testing.T) {
for i := 0; i < 100; i++ {
TestWaitFullRate(t)
}
}*/

func TestWaitFullRate(t *testing.T) {
tickerTimerDuration = time.Millisecond
tickerTimerDuration = time.Second / 10
defer func() {
tickerTimerDuration = time.Second
}()
maxrate := int32(1000)
n := int(variance / 2 / (time.Second / time.Duration(maxrate)))
if n < 3 {
panic(n)
}
ticker := NewTicker(nil, &maxrate)
parent := NewTicker(nil, &maxrate)
defer parent.Close()
ticker := NewTicker(parent, nil)
defer ticker.Close()

if load := ticker.Load(); load != 0 {
t.Error("load out of spec", load)
}

// allow 1% over maxrate to account for sub-second tickerTimerDuration
maxratelimit := (maxrate * 101) / 100
now := time.Now()

for i := 0; i < n; i++ {
for time.Since(now) < tickerTimerDuration*2 {
ticker.Wait()
if rate := ticker.Rate(); rate < 1 || rate > maxrate {
t.Error("rate out of spec", rate)
if rate := ticker.Rate(); rate < 1 || rate > maxratelimit {
t.Fatal("rate out of spec", rate, ticker.Count())
}
if load := ticker.Load(); load < 1 || load > 1000 {
t.Error("load out of spec", load)
t.Fatal("load out of spec", load)
}
_, ok := <-ticker.C
if !ok {
t.Error("ticker channel closed early")
t.Fatal("ticker channel closed early")
}
}
expectsince := time.Second / time.Duration(maxrate) * time.Duration(n-1) // -1 since first tick is "free"
if d := time.Since(now); d < expectsince {
t.Errorf("%v < %v", d, expectsince)
}
if d := time.Since(now); d > variance {
t.Errorf("%v > %v", d, variance)
}
if rate := ticker.Rate(); rate < maxrate/2 {
t.Error("rate out of spec", rate)
}
if load := ticker.Load(); load < 500 || load > 1000 {
t.Error("load out of spec", load)
}
}

func TestInitialLoad(t *testing.T) {
Expand Down Expand Up @@ -273,11 +272,11 @@ func TestTicker_calcLoadLocked(t *testing.T) {
rate int32
load int32
}{
{"unlimited", 0, 0, -1},
{"unlimited", 0, 0, 0},
{"1000,0", 1000, 0, 0},
{"1000,1", 1000, 1, 1},
{"1000,1000", 1000, 1000, 1000},
{"1000,1001", 1000, 1001, 1001},
{"1000,1001", 1000, 1001, 1000},
{"100,1", 100, 1, 10},
{"1500,1", 1500, 1, 1},
{"1500,1499", 1500, 1499, 1000},
Expand All @@ -290,7 +289,6 @@ func TestTicker_calcLoadLocked(t *testing.T) {
{"10000,9991", 10000, 9991, 1000},
{"10000,9999", 10000, 9999, 1000},
{"10000,10000", 10000, 10000, 1000},
{"10000,10001", 10000, 10001, 1001},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit 2bc0af7

Please sign in to comment.