/
discovery.go
111 lines (103 loc) · 2.31 KB
/
discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package proxy
import (
disc "github.com/jeffjen/go-discovery"
log "github.com/Sirupsen/logrus"
etcd "github.com/coreos/etcd/client"
ctx "context"
"path"
)
var (
// retry is the package-level Backoff shared by every watchWorker call:
// Delay is invoked after a failed watch and Reset after a successful one.
retry = &Backoff{}
)
// watchWorker waits for a single event from watcher in a background goroutine
// and reports on the returned channel whether that event should trigger a
// refresh of the candidate list.
//
// Exactly one value is delivered per call:
//   - true when the event action is "set", "expire" or "delete" and the
//     affected node's parent directory equals key;
//   - false for any other event, or when watcher.Next returns an error (in
//     which case the shared retry backoff is applied before reporting).
//
// The channel is never closed.
func watchWorker(c ctx.Context, watcher etcd.Watcher, key string) <-chan bool {
	// Capacity 1 lets the goroutine always deliver its single result and
	// exit, even when the caller has stopped receiving (for example after
	// the context was cancelled). An unbuffered channel would leave the
	// goroutine blocked on the send forever.
	v := make(chan bool, 1)
	go func() {
		evt, err := watcher.Next(c)
		if err != nil {
			log.WithFields(log.Fields{"err": err}).Debug("watch")
			retry.Delay()
			v <- false
			return
		}

		retry.Reset()
		log.WithFields(log.Fields{"Action": evt.Action, "Key": evt.Node.Key}).Debug("key space event")

		switch evt.Action {
		case "set", "expire", "delete":
			// Only changes directly under the watched key are relevant.
			v <- key == path.Dir(evt.Node.Key)
		default:
			v <- false
		}
	}()
	return v
}
// obtainWorker starts a goroutine that, for every value received on the
// returned channel, calls obtain(d) and forwards the outcome on o: the node
// list on success, or nil after logging the error on failure.
//
// The returned channel has a buffer of 8; closing it ends the goroutine once
// the pending requests have been drained.
func obtainWorker(o chan<- []string, d *DiscOptions) chan<- bool {
	requests := make(chan bool, 8)
	go func() {
		for range requests {
			// result stays nil when obtain fails, which is what gets sent.
			var result []string
			if nodes, err := obtain(d); err != nil {
				log.WithFields(log.Fields{"err": err}).Debug("watch")
			} else {
				result = nodes
			}
			o <- result
		}
	}()
	return requests
}
// watch observes the key d.Service (recursively, starting after d.AfterIndex)
// on the etcd endpoints in d and, for every relevant change reported by
// watchWorker, asks an obtainWorker to fetch the current candidate list.
//
// Returns:
//   - output: receives the result of each fetch (nil when a fetch failed).
//     This channel is never closed by watch.
//   - stop: closed when the watch goroutine exits, either because the
//     watcher could not be created or because c was cancelled.
func watch(c ctx.Context, d *DiscOptions) (output <-chan []string, stop <-chan struct{}) {
	o, s := make(chan []string), make(chan struct{})
	go func() {
		defer close(s)

		watcher, err := disc.NewWatcher(&disc.WatcherOptions{
			Config:     etcd.Config{Endpoints: d.Endpoints},
			Key:        d.Service,
			AfterIndex: d.AfterIndex,
			Recursive:  true,
		})
		if err != nil {
			log.WithFields(log.Fields{"err": err}).Warning("watch")
			return
		}

		order := obtainWorker(o, d)
		// Closing order lets the obtain worker's range loop terminate.
		defer close(order)

		for {
			v := watchWorker(c, watcher, d.Service)
			select {
			case <-c.Done():
				return
			case expect, ok := <-v:
				if !ok {
					return
				}
				if !expect {
					continue
				}
				// Queue a refresh, but never block past cancellation: if
				// the obtain worker is stalled on an unread output channel
				// and the order buffer is full, a plain send would hang
				// here and c.Done() would never be observed.
				select {
				case order <- true:
				case <-c.Done():
					return
				}
			}
		}
	}()
	return o, s
}
// obtain performs a recursive Get of d.Service against the etcd endpoints in
// d and returns the base names of the non-directory nodes listed directly
// under the response node. The resulting list is logged at Info level.
//
// On success the slice is always non-nil (possibly empty); any error from
// creating the keys API or from the Get is returned unchanged with a nil
// slice.
func obtain(d *DiscOptions) ([]string, error) {
	kAPI, err := disc.NewKeysAPI(etcd.Config{Endpoints: d.Endpoints})
	if err != nil {
		return nil, err
	}

	opts := &etcd.GetOptions{Recursive: true}
	resp, err := kAPI.Get(ctx.Background(), d.Service, opts)
	if err != nil {
		return nil, err
	}

	// Deliberately a non-nil empty slice: obtainWorker sends nil on failure.
	to := []string{}
	for _, n := range resp.Node.Nodes {
		if n.Dir {
			continue
		}
		to = append(to, path.Base(n.Key))
	}

	log.WithFields(log.Fields{"To": to, "Service": d.Service}).Info("candidate")
	return to, nil
}