Skip to content

Commit

Permalink
fully channel based
Browse files Browse the repository at this point in the history
  • Loading branch information
linkdata committed May 7, 2024
1 parent d81f0c0 commit fa3072e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 20 deletions.
45 changes: 25 additions & 20 deletions ticker.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,28 @@
package rate

import (
"runtime"
"sync"
"sync/atomic"
"time"
)

type Ticker struct {
C <-chan struct{} // sends a struct{}{} at most maxrate times per second
tickCh chan struct{} // source for C
waitCh chan struct{} // source for Wait
counter *uint64
closing int32
stopped int32
tickCh chan struct{} // source for C, closed by runner
waitCh chan struct{} // source for Wait, closed by runner
mu sync.Mutex // protects closeCh
closeCh chan struct{} // channel signalling Close() is called
}

// Close stops the Ticker and frees resources.
//
// It is safe to call multiple times or concurrently.
func (ticker *Ticker) Close() {
if atomic.CompareAndSwapInt32(&ticker.closing, 0, 1) {
defer close(ticker.tickCh)
defer close(ticker.waitCh)
for atomic.LoadInt32(&ticker.stopped) != 1 {
select {
case <-ticker.waitCh:
default:
runtime.Gosched()
}
}
ticker.mu.Lock()
defer ticker.mu.Unlock()
if ticker.closeCh != nil {
close(ticker.closeCh)
ticker.closeCh = nil
}
}

Expand All @@ -37,10 +31,18 @@ func (ticker *Ticker) Wait() {
<-ticker.waitCh
}

// Closed returns true if the Ticker is closed.
func (ticker *Ticker) Closed() (yes bool) {
ticker.mu.Lock()
yes = ticker.closeCh == nil
ticker.mu.Unlock()
return
}

func (ticker *Ticker) run(parent <-chan struct{}, maxrate *int32, counter *uint64) {
defer func() {
atomic.StoreInt32(&ticker.stopped, 1)
ticker.Close()
close(ticker.waitCh)
close(ticker.tickCh)
}()
var rl Limiter
var timeCh <-chan time.Time
Expand All @@ -49,12 +51,13 @@ func (ticker *Ticker) run(parent <-chan struct{}, maxrate *int32, counter *uint6
parentCh := parent
tickCh := ticker.tickCh
waitCh := ticker.waitCh
closeCh := ticker.closeCh

if parent != nil {
tickCh = nil
}

for atomic.LoadInt32(&ticker.closing) == 0 {
for !ticker.Closed() {
select {
case tickCh <- struct{}{}:
// sent a tick to a consumer
Expand Down Expand Up @@ -90,6 +93,8 @@ func (ticker *Ticker) run(parent <-chan struct{}, maxrate *int32, counter *uint6
}
parentCh = nil
tickCh = ticker.tickCh
case <-closeCh:
return
}
}
}
Expand Down Expand Up @@ -120,7 +125,7 @@ func NewSubTicker(parent <-chan struct{}, maxrate *int32, counter *uint64) *Tick
ticker := &Ticker{
tickCh: make(chan struct{}),
waitCh: make(chan struct{}),
counter: counter,
closeCh: make(chan struct{}),
}
ticker.C = ticker.tickCh
go ticker.run(parent, maxrate, counter)
Expand Down
1 change: 1 addition & 0 deletions ticker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func TestNewSubTicker(t *testing.T) {
t.Errorf("%v != %v", x, n)
}
t1.Close()

// there can be at most one extra tick to read after t1.Close
if _, ok := <-t2.C; ok {
if _, ok := <-t2.C; ok {
Expand Down

0 comments on commit fa3072e

Please sign in to comment.