Skip to content

Commit

Permalink
parent is Ticker, inherit maxrate
Browse files Browse the repository at this point in the history
  • Loading branch information
linkdata committed May 13, 2024
1 parent 29e8b09 commit 1a63c7e
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 26 deletions.
63 changes: 38 additions & 25 deletions ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
type Ticker struct {
C <-chan struct{} // sends a struct{}{} at most maxrate times per second
tickCh chan struct{} // source for C, closed by runner
maxrate *int32 // maxrate pointer, or nil
mu sync.Mutex // protects following
closeCh chan struct{} // channel signalling Close() is called
counter int64 // counter
Expand Down Expand Up @@ -81,48 +82,57 @@ func (ticker *Ticker) Rate() (n int32) {
return
}

// Load returns the current load in permille.
// Load returns the current load in permille, or -1 if the rate is unlimited.
//
// Load is rounded up, and is only zero if the rate is zero.
func (ticker *Ticker) Load() (n int32) {
ticker.mu.Lock()
n = ticker.load
ticker.mu.Unlock()
return
}

func calcLoad(rate int32, maxrate *int32) (load int32) {
if maxrate != nil {
if mr := atomic.LoadInt32(maxrate); mr > 0 {
func (ticker *Ticker) calcLoadLocked() {
load := int32(-1)
if ticker.maxrate != nil {
if mr := atomic.LoadInt32(ticker.maxrate); mr > 0 {
mr *= 10
rate := ticker.rate * 10
if mr > 10000 {
// always round up the load
rate += (mr / 1000) - 1
}
load = (rate * 1000) / mr
}
}
return
ticker.load = load
}

func (ticker *Ticker) run(closeCh, parent <-chan struct{}, maxrate *int32) {
func (ticker *Ticker) run(closeCh <-chan struct{}, parent *Ticker) {
timer := time.NewTimer(tickerTimerDuration)
defer func() {
close(ticker.tickCh)
timer.Stop()
}()

var rl Limiter
var tickCh chan struct{}
var parentCh <-chan struct{}

timer := time.NewTimer(tickerTimerDuration)
defer timer.Stop()

parentCh := parent
rateWhen := time.Now()
rateCount := ticker.counter

tickCh := ticker.tickCh
if parent != nil {
tickCh = nil
parentCh = parent.C
} else {
tickCh = ticker.tickCh
}

for !ticker.IsClosed() {
select {
case tickCh <- struct{}{}:
// sent a tick to a consumer
if parent != nil {
parentCh = parent
parentCh = parent.C
tickCh = nil
}
ticker.mu.Lock()
Expand All @@ -132,14 +142,14 @@ func (ticker *Ticker) run(closeCh, parent <-chan struct{}, maxrate *int32) {
if rateCount == 0 {
// emulate some load before first actual measurement
ticker.rate++
ticker.load = calcLoad(ticker.rate, maxrate)
ticker.calcLoadLocked()
}
} else {
ticker.padding--
}
ticker.mu.Unlock()
if doWait {
rl.Wait(maxrate)
rl.Wait(ticker.maxrate)
}
case _, ok := <-parentCh:
// if parentCh is not nil, we require a successful read from it
Expand All @@ -156,7 +166,7 @@ func (ticker *Ticker) run(closeCh, parent <-chan struct{}, maxrate *int32) {
elapsed := time.Since(rateWhen)
rateWhen = rateWhen.Add(elapsed)
ticker.rate = int32(time.Duration(delta) * time.Second / elapsed)
ticker.load = calcLoad(ticker.rate, maxrate)
ticker.calcLoadLocked()
}
ticker.mu.Unlock()
case <-closeCh:
Expand All @@ -165,24 +175,27 @@ func (ticker *Ticker) run(closeCh, parent <-chan struct{}, maxrate *int32) {
}
}

// NewTicker returns a Ticker that reads ticks from a parent channel
// NewTicker returns a Ticker that reads ticks from a parent Ticker
// and sends a `struct{}{}` at most `*maxrate` times per second.
//
// The effective max rate is thus the lower of the parent channels rate
// of sending and `*maxrate`.
// The effective max rate is thus the lower of the parent Tickers
// maxrate and this Tickers `*maxrate`.
//
// A nil `parent` channel means tick rate is only limited by `maxrate`.
// A non-nil parent channel that closes will cause this Ticker to
// stop sending ticks.
// A nil `parent` Ticker means tick rate is only limited by `maxrate`.
// If the parent Ticker is closed, this Ticker will stop sending ticks.
//
// A nil `maxrate` or a `*maxrate` of zero or less sends
// as quickly as possible, so only limited by the parent channel.
func NewTicker(parent <-chan struct{}, maxrate *int32) *Ticker {
func NewTicker(parent *Ticker, maxrate *int32) *Ticker {
if maxrate == nil && parent != nil {
maxrate = parent.maxrate
}
ticker := &Ticker{
tickCh: make(chan struct{}),
closeCh: make(chan struct{}),
maxrate: maxrate,
}
ticker.C = ticker.tickCh
go ticker.run(ticker.closeCh, parent, maxrate)
go ticker.run(ticker.closeCh, parent)
return ticker
}
42 changes: 41 additions & 1 deletion ticker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestNewSubTicker(t *testing.T) {
const n = 100
now := time.Now()
t1 := NewTicker(nil, nil)
t2 := NewTicker(t1.C, nil)
t2 := NewTicker(t1, nil)
for i := 0; i < n; i++ {
_, ok := <-t2.C
if !ok {
Expand Down Expand Up @@ -265,3 +265,43 @@ func TestInitialLoad(t *testing.T) {
}
ticker.Close()
}

func TestTicker_calcLoadLocked(t *testing.T) {
tests := []struct {
name string
maxrate int32
rate int32
load int32
}{
{"unlimited", 0, 0, -1},
{"1000,0", 1000, 0, 0},
{"1000,1", 1000, 1, 1},
{"1000,1000", 1000, 1000, 1000},
{"1000,1001", 1000, 1001, 1001},
{"100,1", 100, 1, 10},
{"1500,1", 1500, 1, 1},
{"1500,1499", 1500, 1499, 1000},
{"2000,1", 2000, 1, 1},
{"2000,2", 2000, 2, 1},
{"2000,3", 2000, 3, 2},
{"2000,1999", 2000, 1999, 1000},
{"10000,1", 10000, 1, 1},
{"10000,9990", 10000, 9990, 999},
{"10000,9991", 10000, 9991, 1000},
{"10000,9999", 10000, 9999, 1000},
{"10000,10000", 10000, 10000, 1000},
{"10000,10001", 10000, 10001, 1001},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ticker := &Ticker{
maxrate: &tt.maxrate,
rate: tt.rate,
}
ticker.calcLoadLocked()
if ticker.load != tt.load {
t.Error("ticker.load is", ticker.load, "wanted", tt.load)
}
})
}
}

0 comments on commit 1a63c7e

Please sign in to comment.