forked from smallnest/rpcx
/
etcdv3_discovery.go
255 lines (220 loc) · 5.67 KB
/
etcdv3_discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
package client
import (
"strings"
"sync"
"time"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
etcd "github.com/smallnest/libkv-etcdv3-store"
"github.com/zouyapeng/rpcx/log"
)
// init calls etcd.Register when the package is loaded.
// NOTE(review): presumably this registers the etcd v3 backend with libkv so
// that libkv.NewStore(etcd.ETCDV3, ...) used by the constructors in this file
// can find it — confirm against the libkv-etcdv3-store package.
func init() {
etcd.Register()
}
// EtcdV3Discovery is an etcd v3 based service discovery.
// It always returns the registered servers in etcd.
type EtcdV3Discovery struct {
// basePath is the registry path that is listed and watched.
basePath string
// kv is the store used to list and watch basePath.
kv store.Store
// pairs holds the servers taken from the most recent list or watch event.
pairs []*KVPair
// chans are the channels handed out by WatchService.
chans []chan []*KVPair
// mu is held while chans is read or modified.
mu sync.Mutex
// -1 means it always retry to watch until etcd is ok, 0 means no retry.
RetriesAfterWatchFailed int
// filter, when non-nil, drops every pair for which it returns false.
filter ServiceDiscoveryFilter
// stopCh is closed by Close to make the watch goroutine return.
stopCh chan struct{}
}
// NewEtcdV3Discovery creates an etcd v3 store for etcdAddr with the given
// options and returns a discovery for basePath + "/" + servicePath built on it.
//
// It panics when the store cannot be created.
func NewEtcdV3Discovery(basePath string, servicePath string, etcdAddr []string, options *store.Config) ServiceDiscovery {
	kv, err := libkv.NewStore(etcd.ETCDV3, etcdAddr, options)
	if err != nil {
		log.Infof("cannot create store: %v", err)
		panic(err)
	}

	// The discovery itself is rooted at the service's own sub-path.
	servicePrefix := basePath + "/" + servicePath
	return NewEtcdV3DiscoveryStore(servicePrefix, kv)
}
// NewEtcdV3DiscoveryStore returns a new EtcdV3Discovery that uses the given
// store.
//
// A single trailing "/" is trimmed from basePath. The servers currently
// registered under basePath are listed once to seed the discovery, then a
// goroutine running watch is started to keep the list up to date.
// RetriesAfterWatchFailed is initialised to -1.
//
// It panics when the initial listing fails.
func NewEtcdV3DiscoveryStore(basePath string, kv store.Store) ServiceDiscovery {
	if len(basePath) > 1 && strings.HasSuffix(basePath, "/") {
		basePath = basePath[:len(basePath)-1]
	}

	d := &EtcdV3Discovery{
		basePath:                basePath,
		kv:                      kv,
		stopCh:                  make(chan struct{}),
		RetriesAfterWatchFailed: -1,
	}

	ps, err := kv.List(basePath)
	if err != nil {
		log.Errorf("cannot get services of from registry: %v, err: %v", basePath, err)
		panic(err)
	}

	pairs := make([]*KVPair, 0, len(ps))
	if len(ps) > 0 {
		// Keys may come back with or without a leading "/", independently of
		// how basePath was written, so the prefix follows the first key.
		prefix := strings.TrimPrefix(d.basePath, "/") + "/"
		if strings.HasPrefix(ps[0].Key, "/") {
			prefix = "/" + prefix
		}

		for _, p := range ps {
			// Skip the directory entry itself and any key that merely shares
			// basePath as a string prefix (for example a sibling service
			// "svc2" when listing "svc"), matching the check done in watch.
			if !strings.HasPrefix(p.Key, prefix) {
				continue
			}
			pair := &KVPair{Key: strings.TrimPrefix(p.Key, prefix), Value: string(p.Value)}
			if d.filter != nil && !d.filter(pair) {
				continue
			}
			pairs = append(pairs, pair)
		}
	}
	d.pairs = pairs

	go d.watch()
	return d
}
// NewEtcdV3DiscoveryTemplate builds an EtcdV3Discovery that only carries the
// base path and a store; it neither lists the registry nor starts a watch.
// Use Clone to obtain a working discovery for a concrete service path.
//
// A single trailing "/" is trimmed from basePath. It panics when the store
// cannot be created.
func NewEtcdV3DiscoveryTemplate(basePath string, etcdAddr []string, options *store.Config) ServiceDiscovery {
	if len(basePath) > 1 {
		basePath = strings.TrimSuffix(basePath, "/")
	}

	kv, err := libkv.NewStore(etcd.ETCDV3, etcdAddr, options)
	if err != nil {
		log.Infof("cannot create store: %v", err)
		panic(err)
	}

	tmpl := &EtcdV3Discovery{basePath: basePath, kv: kv}
	return tmpl
}
// Clone returns a new discovery for servicePath underneath this discovery's
// base path. The clone is built with NewEtcdV3DiscoveryStore and shares this
// discovery's store.
func (d *EtcdV3Discovery) Clone(servicePath string) ServiceDiscovery {
	clonedPath := d.basePath + "/" + servicePath
	return NewEtcdV3DiscoveryStore(clonedPath, d.kv)
}
// SetFilter sets the filter.
// It only stores the function: pairs already held by the discovery are not
// re-filtered, the filter is consulted when later pairs are processed.
func (d *EtcdV3Discovery) SetFilter(filter ServiceDiscoveryFilter) {
d.filter = filter
}
// GetServices returns the servers most recently recorded by this discovery.
//
// The watch goroutine replaces d.pairs while callers may be reading it, so
// the field is read while holding d.mu instead of bare. The returned slice is
// the discovery's own slice, not a copy.
func (d *EtcdV3Discovery) GetServices() []*KVPair {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.pairs
}
// WatchService creates a channel with a buffer of 10, registers it with the
// discovery and returns it. The watch goroutine sends the updated server list
// to every registered channel.
func (d *EtcdV3Discovery) WatchService() chan []*KVPair {
	watcher := make(chan []*KVPair, 10)

	d.mu.Lock()
	d.chans = append(d.chans, watcher)
	d.mu.Unlock()

	return watcher
}
// RemoveWatcher unregisters ch so that it no longer receives updates.
// Channels other than ch keep their relative order; ch itself is not closed.
func (d *EtcdV3Discovery) RemoveWatcher(ch chan []*KVPair) {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Build a fresh slice rather than filtering in place.
	var remaining []chan []*KVPair
	for i := range d.chans {
		if candidate := d.chans[i]; candidate != ch {
			remaining = append(remaining, candidate)
		}
	}
	d.chans = remaining
}
// watch keeps the discovery in sync with the registry until Close is called
// or the watch can no longer be established.
//
// It repeatedly opens a tree watch on d.basePath and, for every event,
// replaces d.pairs and forwards the new list to all registered watcher
// channels. When the watch channel yields nil the watch is re-opened.
// The store is closed when watch returns.
func (d *EtcdV3Discovery) watch() {
	defer d.kv.Close()

	for {
		events, ok := d.openWatchTree()
		if !ok {
			return
		}
		if stopped := d.consumeWatch(events); stopped {
			return
		}
	}
}

// openWatchTree opens a tree watch on d.basePath, retrying with exponential
// back-off (1s doubling up to 30s) according to RetriesAfterWatchFailed:
// a negative value retries forever, 0 gives up after the first failure and a
// positive value allows that many additional attempts.
//
// The second result is false when watching must stop, either because the
// retries are exhausted or because the discovery was closed while waiting.
func (d *EtcdV3Discovery) openWatchTree() (<-chan []*store.KVPair, bool) {
	const maxDelay = 30 * time.Second

	var delay time.Duration
	retry := d.RetriesAfterWatchFailed
	for {
		c, err := d.kv.WatchTree(d.basePath, nil)
		if err == nil {
			return c, true
		}

		// Bounded mode: 0 must really mean "no retry", so the budget is
		// checked before it is decremented.
		if d.RetriesAfterWatchFailed >= 0 {
			if retry <= 0 {
				log.Errorf("can't watch %s: %v", d.basePath, err)
				return nil, false
			}
			retry--
		}

		if delay == 0 {
			delay = 1 * time.Second
		} else {
			delay *= 2
		}
		if delay > maxDelay {
			delay = maxDelay
		}
		log.Warnf("can not watchtree (with retry %d, sleep %v): %s: %v", retry, delay, d.basePath, err)

		// Wait out the back-off, but return promptly if Close is called.
		timer := time.NewTimer(delay)
		select {
		case <-d.stopCh:
			timer.Stop()
			log.Info("discovery has been closed")
			return nil, false
		case <-timer.C:
		}
	}
}

// consumeWatch processes events from c until the discovery is closed
// (returns true) or c yields nil and the watch has to be re-opened
// (returns false).
func (d *EtcdV3Discovery) consumeWatch(c <-chan []*store.KVPair) bool {
	for {
		select {
		case <-d.stopCh:
			log.Info("discovery has been closed")
			return true
		case ps := <-c:
			if ps == nil {
				log.Warnf("rewatch %s", d.basePath)
				return false
			}

			pairs := d.pairsFromStore(ps) // latest servers

			d.mu.Lock()
			d.pairs = pairs
			for _, ch := range d.chans {
				// Each send runs in its own goroutine so that one slow
				// watcher cannot delay the others or this loop.
				go func(ch chan []*KVPair) {
					// NOTE(review): presumably guards against a send on a
					// channel its receiver has already closed — confirm.
					defer func() {
						_ = recover()
					}()

					select {
					case ch <- pairs:
					case <-time.After(time.Minute):
						log.Warn("chan is full and new change has been dropped")
					}
				}(ch)
			}
			d.mu.Unlock()
		}
	}
}

// pairsFromStore converts store entries into KVPairs whose keys are relative
// to d.basePath. Entries that are not strictly below d.basePath, and entries
// rejected by d.filter, are dropped. It returns nil when nothing is left.
func (d *EtcdV3Discovery) pairsFromStore(ps []*store.KVPair) []*KVPair {
	if len(ps) == 0 {
		return nil
	}

	// Keys may come back with or without a leading "/", independently of how
	// basePath was written, so the prefix follows the first key.
	prefix := strings.TrimPrefix(d.basePath, "/") + "/"
	if strings.HasPrefix(ps[0].Key, "/") {
		prefix = "/" + prefix
	}

	var pairs []*KVPair
	for _, p := range ps {
		// This also skips the directory entry itself, whose key is the
		// prefix without its trailing "/".
		if !strings.HasPrefix(p.Key, prefix) {
			continue
		}
		pair := &KVPair{Key: strings.TrimPrefix(p.Key, prefix), Value: string(p.Value)}
		if d.filter != nil && !d.filter(pair) {
			continue
		}
		pairs = append(pairs, pair)
	}
	return pairs
}
// Close signals the watch goroutine to stop by closing stopCh.
//
// It is safe to call Close more than once, and on a discovery whose stopCh
// was never created (as returned by NewEtcdV3DiscoveryTemplate); in both
// cases the extra call does nothing instead of panicking.
func (d *EtcdV3Discovery) Close() {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.stopCh == nil {
		return
	}
	select {
	case <-d.stopCh:
		// Already closed by an earlier call.
	default:
		close(d.stopCh)
	}
}