forked from hashicorp/nomad
-
Notifications
You must be signed in to change notification settings - Fork 1
/
serf.go
266 lines (238 loc) · 7.51 KB
/
serf.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
package nomad
import (
"strings"
"sync/atomic"
"time"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
const (
	// StatusReap is a synthetic member status (-1) that is assigned to a
	// member while an EventMemberReap is being handled.
	StatusReap serf.MemberStatus = -1

	// maxPeerRetries caps the number of attempts made per server when
	// querying its peer status during bootstrap.
	maxPeerRetries = 6

	// peerRetryBase is the baseline retry interval; it is doubled for every
	// further attempt.
	peerRetryBase = time.Second
)
// serfEventHandler drains the serf event channel and dispatches every event
// to the matching handler. It loops until the shutdown channel fires.
func (s *Server) serfEventHandler() {
	for {
		select {
		case <-s.shutdownCh:
			return

		case event := <-s.eventCh:
			switch event.EventType() {
			case serf.EventMemberJoin:
				joined := event.(serf.MemberEvent)
				s.nodeJoin(joined)
				s.localMemberEvent(joined)

			case serf.EventMemberLeave, serf.EventMemberFailed:
				gone := event.(serf.MemberEvent)
				s.nodeFailed(gone)
				s.localMemberEvent(gone)

			case serf.EventMemberReap:
				s.localMemberEvent(event.(serf.MemberEvent))

			case serf.EventMemberUpdate, serf.EventUser, serf.EventQuery:
				// Deliberately ignored.

			default:
				s.logger.Printf("[WARN] nomad: unhandled serf event: %#v", event)
			}
		}
	}
}
// nodeJoin processes a serf join event. Every member that is a Nomad server
// is recorded in the per-region peer list (and in the local peer table when
// it belongs to our own region); non-server members are logged and skipped.
// While a bootstrap is still expected, each recorded server triggers a
// bootstrap check.
func (s *Server) nodeJoin(me serf.MemberEvent) {
	for _, member := range me.Members {
		isServer, srv := isNomadServer(member)
		if !isServer {
			s.logger.Printf("[WARN] nomad: non-server in gossip pool: %s", member.Name)
			continue
		}
		s.logger.Printf("[INFO] nomad: adding server %s", srv)

		s.peerLock.Lock()

		// Look for an existing entry with the same name in this region.
		regionPeers := s.peers[srv.Region]
		known := -1
		for i := range regionPeers {
			if regionPeers[i].Name == srv.Name {
				known = i
				break
			}
		}

		// Refresh the known entry in place, or add to the list if not known.
		if known >= 0 {
			regionPeers[known] = srv
		} else {
			s.peers[srv.Region] = append(regionPeers, srv)
		}

		// Servers of our own region are additionally tracked by address.
		if srv.Region == s.config.Region {
			s.localPeers[raft.ServerAddress(srv.Addr.String())] = srv
		}

		s.peerLock.Unlock()

		// If we are still expecting to bootstrap, this join may complete the set.
		if atomic.LoadInt32(&s.config.BootstrapExpect) != 0 {
			s.maybeBootstrap()
		}
	}
}
// maybeBootstrap attempts a live bootstrap of the Raft cluster once the
// expected number of servers for the local region is visible in serf.
//
// It returns without bootstrapping when the last Raft index cannot be read,
// when a peer advertises a conflicting expect value or bootstrap mode, or
// when fewer servers than BootstrapExpect are known. BootstrapExpect is reset
// to 0 (the value nodeJoin checks before calling this) when the local Raft
// log is non-empty, when a queried server already reports Raft peers, or
// after a bootstrap attempt regardless of its outcome.
//
// It panics if neither raftStore nor raftInmem is set.
func (s *Server) maybeBootstrap() {
	// Bootstrap can only be done if there are no committed logs. Reading the
	// last index is slightly cheaper than the full check that
	// BootstrapCluster will do, so this is a good pre-filter.
	var index uint64
	var err error
	switch {
	case s.raftStore != nil:
		index, err = s.raftStore.LastIndex()
	case s.raftInmem != nil:
		index, err = s.raftInmem.LastIndex()
	default:
		panic("neither raftInmem or raftStore is initialized")
	}
	if err != nil {
		s.logger.Printf("[ERR] nomad: failed to read last raft index: %v", err)
		return
	}

	// A non-zero index means logs exist already, so remove our expectation
	// of bootstrapping.
	if index != 0 {
		atomic.StoreInt32(&s.config.BootstrapExpect, 0)
		return
	}

	// Scan for all the known servers in our region.
	members := s.serf.Members()
	var servers []serverParts
	for _, member := range members {
		valid, p := isNomadServer(member)
		if !valid {
			continue
		}
		if p.Region != s.config.Region {
			continue
		}
		if p.Expect != 0 && p.Expect != int(atomic.LoadInt32(&s.config.BootstrapExpect)) {
			s.logger.Printf("[ERR] nomad: peer %v has a conflicting expect value. All nodes should expect the same number.", member)
			return
		}
		if p.Bootstrap {
			s.logger.Printf("[ERR] nomad: peer %v has bootstrap mode. Expect disabled.", member)
			return
		}
		servers = append(servers, *p)
	}

	// Skip if we haven't met the minimum expect count.
	if len(servers) < int(atomic.LoadInt32(&s.config.BootstrapExpect)) {
		return
	}

	// Query each of the servers and make sure they report no Raft peers.
	req := &structs.GenericRequest{
		QueryOptions: structs.QueryOptions{
			AllowStale: true,
		},
	}
	for _, server := range servers {
		var peers []string

		// Retry with exponential backoff to get peer status from this server.
		// NOTE(review): if every attempt fails, peers stays empty and the
		// server is treated the same as one that reports no Raft peers.
		for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
			rpcErr := s.connPool.RPC(s.config.Region, server.Addr, server.MajorVersion,
				"Status.Peers", req, &peers)
			if rpcErr == nil {
				break
			}
			nextRetry := (1 << attempt) * peerRetryBase
			s.logger.Printf("[ERR] nomad: Failed to confirm peer status for %s: %v. Retrying in "+
				"%v...", server.Name, rpcErr, nextRetry.String())
			time.Sleep(nextRetry)
		}

		// Found a node with some Raft peers, stop bootstrap since there's
		// evidence of an existing cluster. We should get folded in by the
		// existing servers if that's the case, so it's cleaner to sit as a
		// candidate with no peers so we don't cause spurious elections.
		// It's OK this is racy, because even with an initial bootstrap
		// as long as one peer runs bootstrap things will work, and if we
		// have multiple peers bootstrap in the same way, that's OK. We
		// just don't want a server added much later to do a live bootstrap
		// and interfere with the cluster. This isn't required for Raft's
		// correctness because no server in the existing cluster will vote
		// for this server, but it makes things much more stable.
		if len(peers) > 0 {
			s.logger.Printf("[INFO] nomad: Existing Raft peers reported by %s (%v), disabling bootstrap mode", server.Name, server.Addr)
			atomic.StoreInt32(&s.config.BootstrapExpect, 0)
			return
		}
	}

	// Attempt a live bootstrap! Each server's address is used as both its
	// Raft ID and its Raft address.
	var configuration raft.Configuration
	var addrs []string
	for _, server := range servers {
		addr := server.Addr.String()
		addrs = append(addrs, addr)
		peer := raft.Server{
			ID:      raft.ServerID(addr),
			Address: raft.ServerAddress(addr),
		}
		configuration.Servers = append(configuration.Servers, peer)
	}
	s.logger.Printf("[INFO] nomad: Found expected number of peers (%s), attempting to bootstrap cluster...",
		strings.Join(addrs, ","))
	future := s.raft.BootstrapCluster(configuration)
	if err := future.Error(); err != nil {
		s.logger.Printf("[ERR] nomad: Failed to bootstrap cluster: %v", err)
	}

	// Bootstrapping complete, or failed for some reason, don't enter this again.
	atomic.StoreInt32(&s.config.BootstrapExpect, 0)
}
// nodeFailed processes a serf leave or failure event. Every member that is a
// Nomad server is dropped from the per-region peer list and, when it belongs
// to our own region, from the local peer table. Non-server members are
// skipped.
func (s *Server) nodeFailed(me serf.MemberEvent) {
	for _, member := range me.Members {
		isServer, srv := isNomadServer(member)
		if !isServer {
			continue
		}
		s.logger.Printf("[INFO] nomad: removing server %s", srv)

		s.peerLock.Lock()

		// Swap-remove the first entry with a matching name, if there is one.
		regionPeers := s.peers[srv.Region]
		for idx, peer := range regionPeers {
			if peer.Name != srv.Name {
				continue
			}
			last := len(regionPeers) - 1
			regionPeers[idx] = regionPeers[last]
			regionPeers[last] = nil // clear the vacated slot
			regionPeers = regionPeers[:last]
			break
		}

		// Drop the region entirely when no known servers remain in it.
		if len(regionPeers) == 0 {
			delete(s.peers, srv.Region)
		} else {
			s.peers[srv.Region] = regionPeers
		}

		// Servers of our own region are also tracked by address.
		if srv.Region == s.config.Region {
			delete(s.localPeers, raft.ServerAddress(srv.Addr.String()))
		}

		s.peerLock.Unlock()
	}
}
// localMemberEvent queues the members of a serf event for reconciliation.
// It does nothing unless this server is currently the leader.
func (s *Server) localMemberEvent(me serf.MemberEvent) {
	// Only the leader queues members.
	if !s.IsLeader() {
		return
	}

	// Members of a reap event are queued with the StatusReap status.
	reaped := me.EventType() == serf.EventMemberReap

	for i := range me.Members {
		// Work on a copy so the event's own member list is left untouched.
		member := me.Members[i]
		if reaped {
			member.Status = StatusReap
		}

		// Non-blocking send: the member is dropped if the send would block.
		select {
		case s.reconcileCh <- member:
		default:
		}
	}
}