forked from YotpoLtd/resec
/
reconciler.go
445 lines (363 loc) · 12.9 KB
/
reconciler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
package reconciler
import (
"encoding/json"
"fmt"
"os"
"sync"
"syscall"
"time"
"github.com/bep/debounce"
"github.com/seatgeek/resec/resec/consul"
"github.com/seatgeek/resec/resec/redis"
"github.com/seatgeek/resec/resec/state"
log "github.com/sirupsen/logrus"
"gopkg.in/d4l3k/messagediff.v1"
)
// resultType names the outcome of one reconcile evaluation. evaluate()
// produces these values and apply() maps each one to the actions taken
// against Consul and Redis.
type resultType string

// Possible outcomes of evaluating the cached Consul and Redis state.
const (
	// ResultMissingState means Redis or Consul has not reported its initial state yet.
	ResultMissingState = resultType("missing_state")
	// ResultConsulNotHealthy means the cached Consul state reports unhealthy.
	ResultConsulNotHealthy = resultType("consul_not_healthy")
	// ResultRedisNotHealthy means the cached Redis state reports unhealthy.
	ResultRedisNotHealthy = resultType("redis_not_healthy")
	// ResultUpdateService means Redis already has the desired role; only the Consul service is refreshed.
	ResultUpdateService = resultType("consul_update_service")
	// ResultRunAsMaster means this instance should be master but Redis is not configured as one.
	ResultRunAsMaster = resultType("run_as_master")
	// ResultRunAsSlave means this instance should be a slave but is not following the elected master.
	ResultRunAsSlave = resultType("run_as_slave")
	// ResultNoMasterElected means this instance should be a slave but no master is known in Consul.
	ResultNoMasterElected = resultType("no_master_elected")
	// ResultMasterSyncInProgress means the slave is still syncing from the master.
	ResultMasterSyncInProgress = resultType("master_sync_in_progress")
	// ResultMasterLinkDown means the slave's link to the master has been down for too long.
	ResultMasterLinkDown = resultType("master_link_down")
	// ResultUnknown is the initial state and the fallback when no other outcome applies.
	ResultUnknown = resultType("unknown")
)
// Reconciler takes a stream of state changes happening to Consul and Redis
// and decides which actions should be taken in response.
//
// The embedded sync.Mutex guards consulState and redisState: stateReader
// replaces them while holding it, and evaluate reads them while holding it.
// Because the mutex is embedded, Lock/Unlock are part of *Reconciler's
// exported method set.
type Reconciler struct {
consulCommandCh chan<- consul.Command // Write-only channel to request Consul actions to be taken
consulState state.Consul // Latest (cached) Consul state
consulStateCh <-chan state.Consul // Read-only channel to get Consul state updates
debugSignalCh chan os.Signal // signal channel (OS / signal debug state); Run dumps Consul+Redis state on SIGUSR1 and the reconciler on SIGUSR2
forceReconcileInterval time.Duration // How often we should force reconcile
logger *log.Entry // reconciler logger
reconcileCh chan interface{} // Channel to trigger a reconcile loop; received by Run, sent to by stateReader
redisCommandCh chan<- redis.Command // Write-only channel to request Redis actions to be taken
redisState state.Redis // Latest (cached) Redis state
redisStateCh <-chan state.Redis // Read-only channel to get Redis state updates
signalCh chan os.Signal // signal channel (OS / signal shutdown)
stopCh chan interface{} // stop channel (internal shutdown); closed by stop() once Consul and Redis report stopped
sync.Mutex // guards consulState and redisState (see the type comment)
}
// sendRedisCommand builds a redis.Command for cmd, attaching the cached
// Consul state, and pushes it onto the Redis command channel. The send
// blocks if the channel is not ready to receive.
func (r *Reconciler) sendRedisCommand(cmd redis.CommandName) {
	command := redis.NewCommand(cmd, r.consulState)
	r.redisCommandCh <- command
}
// sendConsulCommand builds a consul.Command for cmd, attaching the cached
// Redis state, and pushes it onto the Consul command channel. The send
// blocks if the channel is not ready to receive.
func (r *Reconciler) sendConsulCommand(cmd consul.CommandName) {
	command := consul.NewCommand(cmd, r.redisState)
	r.consulCommandCh <- command
}
// Run starts the reconciler and blocks until shutdown has completed.
//
// It starts the internal state reader and asks the Consul and Redis
// connections to start. It then loops over:
//   - shutdown signals (signalCh), which start a graceful stop exactly once,
//   - debug signals (debugSignalCh), which dump cached state to the log,
//   - the periodic ticker and reconcile requests (reconcileCh), which both
//     trigger a debounced evaluate+apply pass,
//   - stopCh being closed, which ends the loop.
func (r *Reconciler) Run() {
	// Default state
	currentState := ResultUnknown

	// Configure logger
	r.logger = log.WithField("system", "reconciler").WithField("state", currentState)

	// Fire up our internal state reader, consuming updates from Consul and Redis
	go r.stateReader()

	// Start the Consul reader
	r.sendConsulCommand(consul.StartCommand)

	// Start the Redis reader
	r.sendRedisCommand(redis.StartCommand)

	// How long to wait between forced reconciles (e.g. to keep TTL happy).
	// Stop the ticker when Run returns so it does not outlive the loop.
	ticker := time.NewTicker(r.forceReconcileInterval)
	defer ticker.Stop()

	// Debounce reconciler update events if they happen in rapid succession
	debounced := debounce.New(100 * time.Millisecond)

	// The debounced callback runs on a timer goroutine, not on this loop.
	// Serialize it so two overlapping callbacks cannot interleave their
	// updates of currentState and r.logger.
	var reconcileMu sync.Mutex
	reconcile := func() {
		reconcileMu.Lock()
		defer reconcileMu.Unlock()

		// Once shutdown has completed (stopCh closed), don't send further
		// commands to Consul/Redis.
		select {
		case <-r.stopCh:
			return
		default:
		}

		newState := r.evaluate()

		// always apply the state, even if its the same (for cases like ResultUpdateService)
		r.apply(newState)

		// If there are no state change, we're done
		if currentState == newState {
			return
		}

		// Update the reconciler logger to reflect the new state
		r.logger = r.logger.WithField("state", newState)

		// Log the reconciler state change
		r.logger.
			WithField("old_state", currentState).
			Infof("Reconciler state transitioned from '%s' to '%s'", currentState, newState)

		// Update current state
		currentState = newState
	}

	stopping := false
	for {
		select {
		// signal handler
		case <-r.signalCh:
			// stop() closes stopCh; starting it a second time would re-send
			// the stop commands and could close the channel twice.
			if stopping {
				r.logger.Warn("Caught signal, shutdown already in progress")
				continue
			}
			stopping = true
			fmt.Println("")
			r.logger.Warn("Caught signal, stopping reconciler loop")
			go r.stop()

		case sig := <-r.debugSignalCh:
			if sig == syscall.SIGUSR1 {
				r.logger.WithField("dump_state", "consul").Warn(r.prettyPrint(r.consulState))
				r.logger.WithField("dump_state", "redis").Warn(r.prettyPrint(r.redisState))
			}

			if sig == syscall.SIGUSR2 {
				r.logger.WithField("dump_state", "reconciler").Warn(r.prettyPrint(r))
			}

		// Force a periodic reconcile. Trigger it directly rather than sending
		// on reconcileCh: this loop is the receiver of that channel, so a send
		// from here blocks forever when the channel has no free buffer slot.
		case <-ticker.C:
			debounced(reconcile)

		// stop the infinite loop
		case <-r.stopCh:
			r.logger.Info("Shutdown requested, stopping reconciler loop")
			return

		// evaluate state and reconcile the systems under control
		case <-r.reconcileCh:
			debounced(reconcile)
		}
	}
}
// apply performs the actions associated with an evaluation result. Some
// states are only logged. Others send commands to the Redis and/or Consul
// connections. Unrecognized states are logged as errors.
func (r *Reconciler) apply(state resultType) {
	switch state {
	// States where we only report and take no action.
	case ResultMissingState:
		r.logger.Debug("Not ready to reconcile yet, missing initial state")
	case ResultConsulNotHealthy:
		r.logger.Debug("Can't reconcile, Consul is not healthy")
	case ResultNoMasterElected:
		r.logger.Warn("Currently no master Redis is elected in Consul catalog, can't enslave local Redis")

	// Steady state: only refresh the Consul service.
	case ResultUpdateService:
		r.sendConsulCommand(consul.UpdateServiceCommand)

	// Role changes for the local Redis.
	case ResultRunAsMaster:
		r.logger.Info("Configure Redis as master")
		r.sendRedisCommand(redis.RunAsMasterCommand)
		r.sendConsulCommand(consul.RegisterServiceCommand)
	case ResultRunAsSlave:
		r.logger.Info("Reconfigure Redis as slave")
		r.sendRedisCommand(redis.RunAsSlaveCommand)

	// States in which the local Redis is taken out of the Consul service.
	case ResultRedisNotHealthy:
		r.logger.Debug("Redis is not healthy, deregister Consul service and don't do any further changes")
		r.sendConsulCommand(consul.ReleaseLockCommand)
		r.sendConsulCommand(consul.DeregisterServiceCommand)
	case ResultMasterSyncInProgress:
		r.logger.Warn("Master sync in progress, can't serve traffic")
		r.sendConsulCommand(consul.DeregisterServiceCommand)
	case ResultMasterLinkDown:
		r.logger.Warn("Master link is down, can't serve traffic")
		r.sendConsulCommand(consul.DeregisterServiceCommand)

	default:
		r.logger.Errorf("Unknown state: %s", state)
	}
}
// evaluate inspects the cached Consul and Redis state and returns what the
// reconciler should do next. The reconciler lock is held for the whole
// evaluation, so stateReader cannot replace the state half-way through.
//
// Checks run in priority order: missing initial state, Consul health, Redis
// health, then the master- or slave-specific checks.
func (r *Reconciler) evaluate() resultType {
	// Make sure the state doesn't change half-way through our evaluation
	r.Lock()
	defer r.Unlock()
	defer r.timeTrack(time.Now(), "reconciler")

	switch {
	// Nothing can be decided until both sides reported their initial state.
	case r.missingInitialState():
		return ResultMissingState

	// Without a healthy Consul the catalog can't be updated, so the
	// topology must not be changed.
	case r.consulState.IsUnhealhy():
		return ResultConsulNotHealthy

	// An unhealthy Redis can't be reconfigured; the only option left is to
	// step down as leader (if we are) and remove our Consul service.
	case r.redisState.IsUnhealthy():
		return ResultRedisNotHealthy

	// The Consul lock is held: this instance should be master.
	case r.consulState.IsMaster():
		if r.redisState.IsRedisMaster() {
			// Already master, just refresh the Consul service.
			return ResultUpdateService
		}
		return ResultRunAsMaster

	// The Consul lock is not held: this instance should be a slave.
	case r.consulState.IsSlave():
		switch {
		case r.consulState.NoMasterElected():
			// nothing to follow yet
			return ResultNoMasterElected
		case r.notSlaveOfCurrentMaster():
			return ResultRunAsSlave
		case r.isMasterSyncInProgress():
			// wait for the sync from the master to complete
			return ResultMasterSyncInProgress
		case r.isMasterLinkDown() && r.isMasterLinkDownTooLong():
			return ResultMasterLinkDown
		default:
			// everything is fine, update service ttl
			return ResultUpdateService
		}

	default:
		return ResultUnknown
	}
}
// stateReader consumes Redis and Consul state updates until stopCh is closed
// or either state channel is closed. When an update differs from the cached
// state, the cache is replaced under the reconciler lock and a reconcile is
// requested on reconcileCh.
//
// The reconcile request is sent only after the lock has been released. That
// way a reconcileCh that is not ready to receive cannot keep the lock held and
// block evaluate(). The send also watches stopCh, so this goroutine cannot
// hang on it once shutdown has completed.
func (r *Reconciler) stateReader() {
	for {
		changed := false

		select {
		// stop the infinite loop
		case <-r.stopCh:
			r.logger.Info("Shutdown requested, stopping state loop")
			return

		// New redis state change
		case redisState, ok := <-r.redisStateCh:
			if !ok {
				r.logger.Error("Redis state channel was closed, shutting down")
				return
			}

			r.Lock()
			r.logger.Debug("New Redis state")
			if r.diffState(r.redisState, redisState) {
				r.redisState = redisState
				changed = true
			}
			r.Unlock()

		// New Consul state change
		case consulState, ok := <-r.consulStateCh:
			if !ok {
				r.logger.Error("Consul state channel was closed, shutting down")
				return
			}

			r.Lock()
			r.logger.Debug("New Consul state")
			if r.diffState(r.consulState, consulState) {
				r.consulState = consulState
				changed = true
			}
			r.Unlock()
		}

		if !changed {
			continue
		}

		select {
		case r.reconcileCh <- true:
		case <-r.stopCh:
			r.logger.Info("Shutdown requested, stopping state loop")
			return
		}
	}
}
// isMasterSyncInProgress reports whether the slave is currently running a
// full sync from the Redis master. This also covers the initial sync
// triggered by issuing a SLAVEOF command.
func (r *Reconciler) isMasterSyncInProgress() bool {
	syncing := r.redisState.Info.MasterSyncInProgress
	return syncing
}
// isMasterLinkDown reports whether the slave has lost its connection to the
// Redis master, i.e. the cached Redis info does not report the master link
// as up.
func (r *Reconciler) isMasterLinkDown() bool {
	return !r.redisState.Info.MasterLinkUp
}
// isMasterLinkDownTooLong reports whether the slave has been disconnected
// from the Redis master for longer than the allowed threshold (10s).
func (r *Reconciler) isMasterLinkDownTooLong() bool {
	// TODO(jippi): make 10s configurable
	const maxMasterLinkDown = 10 * time.Second

	downFor := r.redisState.Info.MasterLinkDownSince
	return downFor > maxMasterLinkDown
}
// missingInitialState reports whether Redis or Consul has not yet delivered
// its initial state (Ready is false). Until both have, no decisions can be
// made about the Redis under management. Redis is checked first; the first
// missing side is logged at warn level.
func (r *Reconciler) missingInitialState() bool {
	switch {
	case !r.redisState.Ready:
		r.logger.Warn("Redis still missing initial state")
		return true
	case !r.consulState.Ready:
		r.logger.Warn("Consul still missing initial state")
		return true
	default:
		return false
	}
}
// notSlaveOfCurrentMaster reports whether the Redis under management is NOT
// replicating from the master currently elected in Consul. It returns true
// in three cases:
//   - Redis considers itself a master,
//   - its master host differs from Consul's MasterAddr,
//   - its master port differs from Consul's MasterPort.
//
// It returns false only when both host and port match.
func (r *Reconciler) notSlaveOfCurrentMaster() bool {
	logger := r.logger.WithField("check", "isSlaveOfCurrentMaster")

	// if Redis thinks it's master, it can't be a slave of another node
	if r.redisState.IsRedisMaster() {
		logger.Debug("isRedismaster() == true")
		return true
	}

	// if the host doesn't match consul state, it's not slave (of the right node)
	if r.redisState.Info.MasterHost != r.consulState.MasterAddr {
		logger.Debugf("'master_host=%s' do not match expected master host %s", r.redisState.Info.MasterHost, r.consulState.MasterAddr)
		return true
	}

	// if the port doesn't match consul state, it's not slave (of the right node)
	if r.redisState.Info.MasterPort != r.consulState.MasterPort {
		logger.Debugf("'master_port=%d' do not match expected master port %d", r.redisState.Info.MasterPort, r.consulState.MasterPort)
		return true
	}

	// looks good
	return false
}
// stop asks Consul and Redis to shut down gracefully. It then checks the
// cached state once per second until both report Stopped, and finally closes
// stopCh so Run and stateReader exit. If shutdown does not complete within
// one minute, it gives up via logger.Fatal (logrus exits the process by
// default).
//
// The Stopped flags are read under the reconciler lock because stateReader
// replaces redisState/consulState concurrently. Once a side has been seen as
// stopped, it stays counted as stopped.
func (r *Reconciler) stop() {
	r.logger.Debug("Consul Cleanup started ")
	r.sendConsulCommand(consul.StopConsulCommand)

	r.logger.Debug("Redis Cleanup started ")
	r.sendRedisCommand(redis.StopCommand)

	// monitor cleanup process from state
	timeout := time.NewTimer(1 * time.Minute)
	defer timeout.Stop()
	interval := time.NewTicker(1 * time.Second)
	defer interval.Stop()

	redisStopped, consulStopped := false, false
	for !redisStopped || !consulStopped {
		select {
		case <-timeout.C:
			r.logger.Fatal("Did not gracefully shut down within 60s, hard quitting")
		case <-interval.C:
			r.Lock()
			redisStopped = redisStopped || r.redisState.Stopped
			consulStopped = consulStopped || r.consulState.Stopped
			r.Unlock()
		}
	}

	r.logger.Debug("Cleanup completed")
	close(r.stopCh)
}
// timeTrack logs at debug level how much time has elapsed since start,
// labelled with name. Intended usage: defer r.timeTrack(time.Now(), "label").
func (r *Reconciler) timeTrack(start time.Time, name string) {
	r.logger.Debugf("%s took %s", name, time.Since(start))
}
// diffState deep-compares a and b using messagediff. When they are equal it
// returns false. Otherwise it logs every added, removed and modified path at
// debug level and returns true.
func (r *Reconciler) diffState(a, b interface{}) bool {
	diff, equal := messagediff.DeepDiff(a, b)
	if !equal {
		for p, v := range diff.Added {
			r.logger.Debugf("added: %s = %#v", p.String(), v)
		}
		for p, v := range diff.Removed {
			r.logger.Debugf("removed: %s = %#v", p.String(), v)
		}
		for p, v := range diff.Modified {
			r.logger.Debugf("modified: %s = %#v", p.String(), v)
		}
	}

	return !equal
}
// prettyPrint returns data as tab-indented JSON. It is used to dump the
// reconciler's internal state when a debug signal is received. On an encoding
// error it logs the error and returns an empty string.
func (r *Reconciler) prettyPrint(data interface{}) string {
	encoded, err := json.MarshalIndent(data, "", "\t")
	if err == nil {
		return string(encoded)
	}

	r.logger.Error(err)
	return ""
}
// MarshalJSON implements json.Marshaler for the reconciler. It exposes only
// the cached Consul and Redis state and the forced reconcile interval. It is
// used when the process receives the debug signal that dumps the reconciler.
func (r *Reconciler) MarshalJSON() ([]byte, error) {
	snapshot := map[string]interface{}{
		"consulState":            r.consulState,
		"forceReconcileInterval": r.forceReconcileInterval,
		"redisState":             r.redisState,
	}
	return json.Marshal(snapshot)
}