/
watcher.go
75 lines (63 loc) · 1.42 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package etcd
import (
"context"
"encoding/json"
"errors"
"path"
"github.com/iobrother/zoo/core/registry"
clientv3 "go.etcd.io/etcd/client/v3"
)
type watcher struct {
w clientv3.WatchChan
client *clientv3.Client
ctx context.Context
cancel func()
}
func newWatcher(ctx context.Context, c *clientv3.Client, basePath, service string) *watcher {
w := &watcher{
client: c,
}
w.ctx, w.cancel = context.WithCancel(ctx)
watchPath := path.Join(basePath, service) + "/"
w.w = c.Watch(ctx, watchPath, clientv3.WithPrefix(), clientv3.WithPrevKV())
return w
}
func decode(b []byte) *registry.Service {
var s *registry.Service
json.Unmarshal(b, &s)
return s
}
func (w *watcher) Next() (*registry.Result, error) {
for rsp := range w.w {
if rsp.Err() != nil {
return nil, rsp.Err()
}
if rsp.Canceled {
return nil, errors.New("could not get next")
}
for _, e := range rsp.Events {
service := decode(e.Kv.Value)
var action string
switch e.Type {
case clientv3.EventTypePut:
if e.IsCreate() {
action = "create"
} else if e.IsModify() {
action = "update"
}
case clientv3.EventTypeDelete:
action = "delete"
service = decode(e.PrevKv.Value)
}
if service == nil {
continue
}
return ®istry.Result{Action: action, Service: service}, nil
}
}
return nil, errors.New("could not get next")
}
func (w *watcher) Stop() {
w.cancel()
_ = w.client.Close()
}