Skip to content

Commit

Permalink
notify: fix data race and send on close chan in receiveCh (#566)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed May 13, 2020
1 parent 99f01ac commit 8c515b7
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 18 deletions.
54 changes: 36 additions & 18 deletions pkg/notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,41 @@ func (n *Notifier) Notify() {
n.mu.RLock()
defer n.mu.RUnlock()
for _, receiver := range n.receivers {
signalNonBlocking(receiver.rec.c)
receiver.rec.signalNonBlocking()
}
}

func signalNonBlocking(ch chan struct{}) {
// Receiver is a receiver of notifier, including the receiver channel and stop receiver function.
type Receiver struct {
C <-chan struct{}
c chan struct{}
Stop func()
ticker *time.Ticker
closeCh chan struct{}
}

// returns true if the receiverCh should be closed
func (r *Receiver) signalNonBlocking() bool {
select {
case ch <- struct{}{}:
case <-r.closeCh:
return true
case r.c <- struct{}{}:
default:
}
return false
}

// Receiver is a receiver of notifier, including the receiver channel and stop receiver function.
type Receiver struct {
C <-chan struct{}
Stop func()
ticker *time.Ticker
c chan struct{}
func (r *Receiver) signalTickLoop() {
go func() {
loop:
for range r.ticker.C {
exit := r.signalNonBlocking()
if exit {
break loop
}
}
close(r.c)
}()
}

// NewReceiver creates a receiver
Expand All @@ -47,22 +65,22 @@ func (n *Notifier) NewReceiver(tickTime time.Duration) *Receiver {
currentIndex := n.maxIndex
n.maxIndex++
receiverCh := make(chan struct{}, 1)
closeCh := make(chan struct{})
var ticker *time.Ticker
if tickTime > 0 {
ticker = time.NewTicker(tickTime)
go func() {
for range ticker.C {
signalNonBlocking(receiverCh)
}
}()
}
rec := &Receiver{
C: receiverCh,
c: receiverCh,
Stop: func() {
n.remove(currentIndex)
},
ticker: ticker,
c: receiverCh,
ticker: ticker,
closeCh: closeCh,
}
if tickTime > 0 {
rec.signalTickLoop()
}
n.receivers = append(n.receivers, struct {
rec *Receiver
Expand All @@ -77,10 +95,10 @@ func (n *Notifier) remove(index int) {
for i, receiver := range n.receivers {
if receiver.index == index {
n.receivers = append(n.receivers[:i], n.receivers[i+1:]...)
close(receiver.rec.closeCh)
if receiver.rec.ticker != nil {
receiver.rec.ticker.Stop()
}
close(receiver.rec.c)
break
}
}
Expand All @@ -94,7 +112,7 @@ func (n *Notifier) Close() {
if receiver.rec.ticker != nil {
receiver.rec.ticker.Stop()
}
close(receiver.rec.c)
close(receiver.rec.closeCh)
}
n.receivers = nil
}
15 changes: 15 additions & 0 deletions pkg/notify/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,18 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
<-r5.C
r5.Stop()
}

func (s *notifySuite) TestContinusStop(c *check.C) {
notifier := new(Notifier)
n := 5000
receivers := make([]*Receiver, n)
for i := 0; i < n; i++ {
receivers[i] = notifier.NewReceiver(10 * time.Millisecond)
}
for i := 0; i < n; i++ {
<-receivers[i].C
}
for i := 0; i < n; i++ {
receivers[i].Stop()
}
}

0 comments on commit 8c515b7

Please sign in to comment.