forked from flynn/flynn
/
discoverd_wrapper.go
93 lines (81 loc) · 2.21 KB
/
discoverd_wrapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package main
import (
"errors"
"time"
discoverd "github.com/flynn/flynn/discoverd/client"
"github.com/flynn/flynn/pkg/shutdown"
"github.com/flynn/flynn/pkg/stream"
"gopkg.in/inconshreveable/log15.v2"
)
// serviceName is the service discovery name used by Register, both when
// registering this process and when looking up the service's leader.
const serviceName = "cluster-monitor"
// newDiscoverdWrapper builds a discoverdWrapper for the given address and
// logger. The leader channel it allocates is unbuffered, so a send on it
// blocks until a receiver is ready.
func newDiscoverdWrapper(addr string, logger log15.Logger) *discoverdWrapper {
	w := new(discoverdWrapper)
	w.addr = addr
	w.logger = logger
	w.leader = make(chan bool)
	return w
}
// discoverdWrapper bundles the state used by Register and LeaderCh: the
// address to register, the channel leadership results are delivered on, and
// the logger.
type discoverdWrapper struct {
// addr is the address handed to discoverd.AddServiceAndRegister by Register.
addr string
// leader receives true when a reported leader's address equals this
// instance's own address, and false otherwise.
leader chan bool
// logger is the base logger; Register derives its own child logger from it.
logger log15.Logger
}
// Register adds this process to service discovery under serviceName using
// d.addr, subscribes to the service's leader events, and reports whether this
// instance is the current leader.
//
// The registration handle is closed via shutdown.BeforeExit. A background
// goroutine is started that, for every non-nil leader event, sends on d.leader
// whether the leader's address equals this instance's own address; when the
// event channel is closed it re-subscribes, retrying every 100ms until a
// subscription succeeds. The goroutine is never stopped by this method.
//
// Register returns the first value sent on d.leader, or an error if
// registration fails, the initial subscription fails, or no leader value
// arrives within 30 seconds.
func (d *discoverdWrapper) Register() (bool, error) {
	log := d.logger.New("fn", "discoverd.Register")

	log.Info("registering with service discovery")
	hb, err := discoverd.AddServiceAndRegister(serviceName, d.addr)
	if err != nil {
		log.Error("error registering with service discovery", "err", err)
		return false, err
	}
	shutdown.BeforeExit(func() { hb.Close() })

	// Tag all subsequent log lines with the address we registered as.
	self := hb.Addr()
	log = log.New("self.addr", self)

	svc := discoverd.NewService(serviceName)

	// events and leaderStream are replaced on every (re)subscription; after
	// the initial subscribe they are only touched by the watcher goroutine.
	var (
		events       chan *discoverd.Instance
		leaderStream stream.Stream
	)

	// subscribe opens a fresh leader stream backed by a new events channel.
	subscribe := func() error {
		log.Info("connecting service leader stream")
		events = make(chan *discoverd.Instance)
		var subErr error
		leaderStream, subErr = svc.Leaders(events)
		if subErr != nil {
			log.Error("error connecting service leader stream", "err", subErr)
		}
		return subErr
	}
	if err := subscribe(); err != nil {
		return false, err
	}

	// watch forwards leadership changes to d.leader forever, re-subscribing
	// whenever the events channel is closed.
	watch := func() {
		for {
			for inst := range events {
				if inst == nil {
					// a nil leader indicates there are no instances for
					// the service, ignore and wait for an actual leader
					log.Warn("received nil leader event")
					continue
				}
				log.Info("received leader event", "leader.addr", inst.Addr)
				d.leader <- inst.Addr == self
			}
			log.Warn("service leader stream disconnected", "err", leaderStream.Err())

			// Keep retrying until a new stream is established, then resume
			// draining the new events channel in the outer loop.
			for subscribe() != nil {
				time.Sleep(100 * time.Millisecond)
			}
		}
	}
	go watch()

	// Block until the first leadership result, but give up after 30 seconds.
	deadline := time.After(30 * time.Second)
	select {
	case isLeader := <-d.leader:
		return isLeader, nil
	case <-deadline:
		return false, errors.New("timed out waiting for current service leader")
	}
}
// LeaderCh returns the channel on which leadership results are delivered:
// true when a reported leader's address equals this instance's own address,
// false otherwise.
//
// Only one receiver can consume from this channel at a time: each value sent
// is delivered to a single receiver. Register itself receives the first value.
func (d *discoverdWrapper) LeaderCh() chan bool {
return d.leader
}