Skip to content

Commit

Permalink
rework the healing start/stop to remove a race where a stop would not…
Browse files Browse the repository at this point in the history
… stop the healer.

This could happen if the Stop method is called during a heal, this causes the quit message to be ignored
  • Loading branch information
Nils Dijk committed Sep 5, 2016
1 parent f1b7c45 commit a2b640f
Showing 1 changed file with 25 additions and 29 deletions.
54 changes: 25 additions & 29 deletions swim/heal_via_discover_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package swim
import (
"errors"
"math/rand"
"sync"
"time"

log "github.com/uber-common/bark"
Expand All @@ -42,8 +43,9 @@ type discoverProviderHealer struct {
period time.Duration

previousHostListSize int
quit chan struct{}
started chan struct{}

runningMu sync.Mutex
running chan bool

logger log.Logger

Expand All @@ -56,48 +58,42 @@ func newDiscoverProviderHealer(n *Node, baseProbability float64, period time.Dur
baseProbabillity: baseProbability,
period: period,
logger: logging.Logger("healer").WithField("local", n.Address()),
started: make(chan struct{}, 1),
quit: make(chan struct{}),
rand: rand.New(rand.NewSource(n.clock.Now().UnixNano())),
}
}

// Start the partition healing loop
func (h *discoverProviderHealer) Start() {
// check if started channel is already filled
// if not, we start a new loop
select {
case h.started <- struct{}{}:
default:
h.runningMu.Lock()
defer h.runningMu.Unlock()

if h.running != nil {
// if the channel is not nil we are already running
return
}

go func() {
for {
// attempt heal with the pro
if h.rand.Float64() < h.Probability() {
h.Heal()
}

// loop or quit
select {
case <-h.node.clock.After(h.period):
case <-h.quit:
return
}
h.running = schedule(func() {
// attempt heal with the probability
if h.rand.Float64() < h.Probability() {
h.Heal()
}
}()
}, func() time.Duration {
return h.period
}, h.node.clock)
}

// Stop the partition healing loop.
func (h *discoverProviderHealer) Stop() {
// if started, consume and send quit signal
// if not started this is noop
select {
case <-h.started:
h.quit <- struct{}{}
default:
h.runningMu.Lock()
defer h.runningMu.Unlock()

if h.running == nil {
// if the channel is nil we are not running
return
}

close(h.running)
h.running = nil
}

// Probability returns the probability when a heal should be attempted
Expand Down

0 comments on commit a2b640f

Please sign in to comment.