Skip to content

Commit

Permalink
rework to use channels
Browse files Browse the repository at this point in the history
  • Loading branch information
linkdata committed May 7, 2024
1 parent f3b9144 commit d81f0c0
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 36 deletions.
2 changes: 1 addition & 1 deletion limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestLimiter_Wait(t *testing.T) {
{
name: "rate exceeds SleepGranularity",
rate: sleepGranularity * 100,
count: sleepGranularity + 1,
count: sleepGranularity,
},
}
for _, tt := range tests {
Expand Down
98 changes: 63 additions & 35 deletions ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
)

type Ticker struct {
C <-chan struct{}
ch chan struct{}
C <-chan struct{} // sends a struct{}{} at most maxrate times per second
tickCh chan struct{} // source for C
waitCh chan struct{} // source for Wait
counter *uint64
waiting int32
closing int32
stopped int32
}
Expand All @@ -20,28 +20,21 @@ type Ticker struct {
// It is safe to call multiple times or concurrently.
func (ticker *Ticker) Close() {
if atomic.CompareAndSwapInt32(&ticker.closing, 0, 1) {
defer close(ticker.ch)
defer close(ticker.tickCh)
defer close(ticker.waitCh)
for atomic.LoadInt32(&ticker.stopped) != 1 {
select {
case <-ticker.C:
if ticker.counter != nil {
atomic.AddUint64(ticker.counter, ^uint64(0))
}
case <-ticker.waitCh:
default:
runtime.Gosched()
}
}
}
}

// Wait delays until the next tick is available without consuming it.
// Wait delays until the next tick is available.
func (ticker *Ticker) Wait() {
var delay time.Duration
for !atomic.CompareAndSwapInt32(&ticker.waiting, 0, 1) && atomic.LoadInt32(&ticker.stopped) == 0 {
time.Sleep(delay)
delay += time.Millisecond
}
<-ticker.C
<-ticker.waitCh
}

func (ticker *Ticker) run(parent <-chan struct{}, maxrate *int32, counter *uint64) {
Expand All @@ -50,20 +43,54 @@ func (ticker *Ticker) run(parent <-chan struct{}, maxrate *int32, counter *uint6
ticker.Close()
}()
var rl Limiter
var timeCh <-chan time.Time
timer := time.NewTimer(time.Second)
defer timer.Stop()
parentCh := parent
tickCh := ticker.tickCh
waitCh := ticker.waitCh

if parent != nil {
tickCh = nil
}

for atomic.LoadInt32(&ticker.closing) == 0 {
if parent != nil {
if _, ok := <-parent; !ok {
break
select {
case tickCh <- struct{}{}:
// sent a tick to a consumer
if counter != nil {
atomic.AddUint64(counter, 1)
}
waitCh = ticker.waitCh
timeCh = nil
if parent != nil {
parentCh = parent
tickCh = nil
}
rl.Wait(maxrate)
case waitCh <- struct{}{}:
// unblocked a goroutine calling Wait()
timeCh = nil
if maxrate != nil {
if rate := atomic.LoadInt32(maxrate); rate > 0 {
timer.Reset(time.Second / time.Duration(rate))
waitCh = nil
timeCh = timer.C
}
}
case <-timeCh:
// enough time has passed since last Wait() unblock
// so that we can safely allow one more to unblock
waitCh = ticker.waitCh
timeCh = nil
case _, ok := <-parentCh:
// if parentCh is not nil, we require a successful read from it
if !ok {
return
}
parentCh = nil
tickCh = ticker.tickCh
}
ticker.ch <- struct{}{}
if atomic.CompareAndSwapInt32(&ticker.waiting, 1, 0) {
ticker.ch <- struct{}{}
}
if counter != nil {
atomic.AddUint64(counter, 1)
}
rl.Wait(maxrate)
}
}

Expand All @@ -75,11 +102,8 @@ func (ticker *Ticker) run(parent <-chan struct{}, maxrate *int32, counter *uint6
//
// A nil `maxrate` or a `*maxrate` of zero or less sends
// as quickly as possible.
func NewTicker(maxrate *int32, counter *uint64) (ticker *Ticker) {
ch := make(chan struct{})
ticker = &Ticker{C: ch, ch: ch, counter: counter}
go ticker.run(nil, maxrate, counter)
return
func NewTicker(maxrate *int32, counter *uint64) *Ticker {
return NewSubTicker(nil, maxrate, counter)
}

// NewSubTicker returns a channel that reads from another struct{}{}
Expand All @@ -92,9 +116,13 @@ func NewTicker(maxrate *int32, counter *uint64) (ticker *Ticker) {
// Use this to make "background" tickers that are less prioritized.
//
// The Ticker is closed when the parent channel closes.
func NewSubTicker(parent <-chan struct{}, maxrate *int32, counter *uint64) (ticker *Ticker) {
ch := make(chan struct{})
ticker = &Ticker{C: ch, ch: ch, counter: counter}
func NewSubTicker(parent <-chan struct{}, maxrate *int32, counter *uint64) *Ticker {
ticker := &Ticker{
tickCh: make(chan struct{}),
waitCh: make(chan struct{}),
counter: counter,
}
ticker.C = ticker.tickCh
go ticker.run(parent, maxrate, counter)
return
return ticker
}
51 changes: 51 additions & 0 deletions ticker_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rate

import (
"sync"
"sync/atomic"
"testing"
"time"
Expand All @@ -19,6 +20,29 @@ func TestTickerClosing(t *testing.T) {
}
}

func TestTickerClosingWithWaiters(t *testing.T) {
maxrate := int32(time.Second / variance * 2)
ticker := NewTicker(&maxrate, nil)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
ticker.Wait()
}()
}
ticker.Wait()
ticker.Close()
wg.Wait()
select {
case _, ok := <-ticker.C:
if ok {
t.Error("got a tick")
}
default:
}
}

func TestNewTicker(t *testing.T) {
const n = 100
var counter uint64
Expand Down Expand Up @@ -120,3 +144,30 @@ func TestWait(t *testing.T) {
t.Error("counter is", counter, ", but expected", wantcounter)
}
}

func TestWaitTwice(t *testing.T) {
var counter, wantcounter uint64

now := time.Now()
maxrate := int32(time.Second / variance * 2)
ticker := NewTicker(&maxrate, &counter)

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()
ticker.Wait()
}()
ticker.Wait()

wg.Wait()

if d := time.Since(now); d > variance {
t.Errorf("%v > %v", d, variance)
}
ticker.Close()
if counter != wantcounter {
t.Error("counter is", counter, ", but expected", wantcounter)
}
}

0 comments on commit d81f0c0

Please sign in to comment.