forked from hashicorp/consul
/
flood.go
66 lines (54 loc) · 1.33 KB
/
flood.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package consul
import (
"time"
"github.com/hashicorp/consul/agent/consul/servers"
"github.com/hashicorp/serf/serf"
)
// FloodNotify wakes every registered Flood goroutine so it can react to a
// change that may have affected it. The wake-up is best effort: a goroutine
// whose channel cannot accept the signal right now is skipped, so this method
// never blocks on a receiver.
func (s *Server) FloodNotify() {
	s.floodLock.RLock()
	defer s.floodLock.RUnlock()

	wake := struct{}{}
	for i := range s.floodCh {
		select {
		case s.floodCh[i] <- wake:
			// Signal handed off.
		default:
			// Channel not ready to receive; move on rather than stall while
			// holding the read lock.
		}
	}
}
// Flood is a long-running goroutine that floods servers from the LAN to the
// given global Serf instance, such as the WAN. It registers a private
// notification channel with the server and then calls servers.FloodJoins each
// time the SerfFloodInterval ticker fires or FloodNotify signals that channel.
// It returns once either the LAN Serf instance or the global Serf instance
// reports shutdown, removing its channel from the server on the way out.
//
// portFn is passed through unchanged to servers.FloodJoins.
func (s *Server) Flood(portFn servers.FloodPortFn, global *serf.Serf) {
	// A buffer of one lets FloodNotify's non-blocking send latch a pending
	// notification while this goroutine is busy inside FloodJoins. With an
	// unbuffered channel that signal would be dropped and the change would
	// have to wait for the next tick. Repeated notifications coalesce into a
	// single pending wake-up.
	floodCh := make(chan struct{}, 1)

	s.floodLock.Lock()
	s.floodCh = append(s.floodCh, floodCh)
	s.floodLock.Unlock()

	ticker := time.NewTicker(s.config.SerfFloodInterval)
	defer ticker.Stop()

	// Unregister our channel when the loop exits.
	defer func() {
		s.floodLock.Lock()
		defer s.floodLock.Unlock()

		for i, ch := range s.floodCh {
			if ch == floodCh {
				s.floodCh = append(s.floodCh[:i], s.floodCh[i+1:]...)
				return
			}
		}
		// The channel was appended above, so failing to find it means the
		// registered set was modified inconsistently.
		panic("flood channels out of sync")
	}()

	for {
		select {
		case <-s.serfLAN.ShutdownCh():
			return
		case <-global.ShutdownCh():
			return
		case <-ticker.C:
			// Periodic flood.
		case <-floodCh:
			// Flood requested via FloodNotify.
		}

		servers.FloodJoins(s.logger, portFn, s.config.Datacenter, s.serfLAN, global)
	}
}