forked from st3v/go-plugins
-
Notifications
You must be signed in to change notification settings - Fork 0
/
watcher.go
124 lines (108 loc) · 2.21 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package eureka
import (
"errors"
"time"
"github.com/hudl/fargo"
"github.com/micro/go-micro/registry"
)
// eurekaWatcher is the value newWatcher returns as a registry.Watcher.
// Results produced by the background goroutines are handed out through Next.
type eurekaWatcher struct {
	// conn is the fargo connection used to list apps and schedule updates.
	conn fargoConnection

	// exit is closed by Stop to signal every goroutine to shut down.
	exit chan bool

	// results carries registry events from watch to Next.
	results chan *registry.Result
}
// newWatcher builds a watcher on top of conn and immediately starts its
// polling goroutine. Both channels are unbuffered.
func newWatcher(conn fargoConnection) registry.Watcher {
	watcher := new(eurekaWatcher)
	watcher.conn = conn
	watcher.exit = make(chan bool)
	watcher.results = make(chan *registry.Result)

	// background discovery of applications; ends when exit is closed
	go watcher.poll()

	return watcher
}
// poll runs until the watcher is stopped. Every 10 seconds it lists the
// applications known to the connection and, for each application name it has
// not seen before, schedules app updates and starts a watch goroutine on the
// resulting channel.
//
// When e.exit is closed the loop returns; the ticker is stopped and the local
// done channel is closed, which is the stop signal handed to
// ScheduleAppUpdates and to every watch goroutine.
func (e *eurekaWatcher) poll() {
	// list service ticker
	t := time.NewTicker(time.Second * 10)
	// release the ticker's resources once polling ends
	defer t.Stop()

	done := make(chan struct{})
	defer close(done)

	// names of applications that already have an update subscription
	watched := make(map[string]struct{})

	for {
		select {
		case <-e.exit:
			return
		case <-t.C:
			apps, err := e.conn.GetApps()
			if err != nil {
				// best effort: the listing is retried on the next tick
				continue
			}

			for _, app := range apps {
				if _, ok := watched[app.Name]; ok {
					continue
				}

				ch := e.conn.ScheduleAppUpdates(app.Name, false, done)
				watched[app.Name] = struct{}{}
				go e.watch(ch, done)
			}
		}
	}
}
func (e *eurekaWatcher) watch(ch <-chan fargo.AppUpdate, done chan struct{}) {
for {
select {
// exit on exit
case <-e.exit:
return
// exit on done
case <-done:
return
// process updates
case u := <-ch:
if u.Err != nil {
continue
}
// process instances independently
for _, instance := range u.App.Instances {
var action string
switch instance.Status {
// update
case fargo.UP:
action = "update"
// delete
case fargo.OUTOFSERVICE, fargo.UNKNOWN, fargo.DOWN:
action = "delete"
// skip
default:
continue
}
// construct the service with a single node
service := appToService(&fargo.Application{
Name: u.App.Name,
Instances: []*fargo.Instance{instance},
})
if len(service) == 0 {
continue
}
// in case we get bounced during processing
// check exit channels
select {
// send the update
case e.results <- ®istry.Result{Action: action, Service: service[0]}:
case <-done:
return
case <-e.exit:
return
}
}
}
}
}
// Next blocks until a result is available on the results channel or the
// watcher has been stopped. After Stop it returns a nil result together with
// a "watcher stopped" error.
func (e *eurekaWatcher) Next() (*registry.Result, error) {
	select {
	case result := <-e.results:
		return result, nil
	case <-e.exit:
		return nil, errors.New("watcher stopped")
	}
}
// Stop shuts the watcher down by closing the exit channel, which the poll and
// watch goroutines and Next all select on.
//
// Repeated sequential calls are no-ops: closing an already-closed channel
// would panic, so the channel is only closed if it is still open. Concurrent
// calls to Stop are not synchronized.
func (e *eurekaWatcher) Stop() {
	select {
	case <-e.exit:
		// already stopped
	default:
		close(e.exit)
	}
}