-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
259 lines (217 loc) · 6 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
package etcdv3
import (
"context"
"crypto/tls"
"errors"
"time"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
)
var (
// ErrNoKey indicates a client method needs a key but receives none.
ErrNoKey = errors.New("no key provided")
// ErrNoValue indicates a client method needs a value but receives none.
ErrNoValue = errors.New("no value provided")
)
// Client is a wrapper around the etcd client.
type Client interface {
// GetEntries queries the given prefix in etcd and returns a slice
// containing the values of all keys found, recursively, underneath that
// prefix.
GetEntries(prefix string) ([]string, error)
// WatchPrefix watches the given prefix in etcd for changes. When a change
// is detected, it will signal on the passed channel. Clients are expected
// to call GetEntries to update themselves with the latest set of complete
// values. WatchPrefix will always send an initial sentinel value on the
// channel after establishing the watch, to ensure that clients always
// receive the latest set of values. WatchPrefix will block until the
// context passed to the NewClient constructor is terminated.
WatchPrefix(prefix string, ch chan struct{})
// Register a service with etcd.
Register(s Service) error
// Deregister a service with etcd.
Deregister(s Service) error
// LeaseID returns the lease id created for this service instance
LeaseID() int64
}
type client struct {
cli *clientv3.Client
ctx context.Context
kv clientv3.KV
// Watcher interface instance, used to leverage Watcher.Close()
watcher clientv3.Watcher
// watcher context
wctx context.Context
// watcher cancel func
wcf context.CancelFunc
// leaseID will be 0 (clientv3.NoLease) if a lease was not created
leaseID clientv3.LeaseID
hbch <-chan *clientv3.LeaseKeepAliveResponse
// Lease interface instance, used to leverage Lease.Close()
leaser clientv3.Lease
}
// ClientOptions defines options for the etcd client. All values are optional.
// If any duration is not specified, a default of 3 seconds will be used.
type ClientOptions struct {
Cert string
Key string
CACert string
DialTimeout time.Duration
DialKeepAlive time.Duration
// DialOptions is a list of dial options for the gRPC client (e.g., for interceptors).
// For example, pass grpc.WithBlock() to block until the underlying connection is up.
// Without this, Dial returns immediately and connecting the server happens in background.
DialOptions []grpc.DialOption
Username string
Password string
}
// NewClient returns Client with a connection to the named machines. It will
// return an error if a connection to the cluster cannot be made.
func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) {
if options.DialTimeout == 0 {
options.DialTimeout = 3 * time.Second
}
if options.DialKeepAlive == 0 {
options.DialKeepAlive = 3 * time.Second
}
var err error
var tlscfg *tls.Config
if options.Cert != "" && options.Key != "" {
tlsInfo := transport.TLSInfo{
CertFile: options.Cert,
KeyFile: options.Key,
TrustedCAFile: options.CACert,
}
tlscfg, err = tlsInfo.ClientConfig()
if err != nil {
return nil, err
}
}
cli, err := clientv3.New(clientv3.Config{
Context: ctx,
Endpoints: machines,
DialTimeout: options.DialTimeout,
DialKeepAliveTime: options.DialKeepAlive,
DialOptions: options.DialOptions,
TLS: tlscfg,
Username: options.Username,
Password: options.Password,
})
if err != nil {
return nil, err
}
return &client{
cli: cli,
ctx: ctx,
kv: clientv3.NewKV(cli),
}, nil
}
func (c *client) LeaseID() int64 { return int64(c.leaseID) }
// GetEntries implements the etcd Client interface.
func (c *client) GetEntries(key string) ([]string, error) {
resp, err := c.kv.Get(c.ctx, key, clientv3.WithPrefix())
if err != nil {
return nil, err
}
entries := make([]string, len(resp.Kvs))
for i, kv := range resp.Kvs {
entries[i] = string(kv.Value)
}
return entries, nil
}
// WatchPrefix implements the etcd Client interface.
func (c *client) WatchPrefix(prefix string, ch chan struct{}) {
c.wctx, c.wcf = context.WithCancel(c.ctx)
c.watcher = clientv3.NewWatcher(c.cli)
wch := c.watcher.Watch(c.wctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(0))
ch <- struct{}{}
for wr := range wch {
if wr.Canceled {
return
}
ch <- struct{}{}
}
}
func (c *client) Register(s Service) error {
var err error
if s.Key == "" {
return ErrNoKey
}
if s.Value == "" {
return ErrNoValue
}
if c.leaser != nil {
c.leaser.Close()
}
c.leaser = clientv3.NewLease(c.cli)
if c.watcher != nil {
c.watcher.Close()
}
c.watcher = clientv3.NewWatcher(c.cli)
if c.kv == nil {
c.kv = clientv3.NewKV(c.cli)
}
if s.TTL == nil {
s.TTL = NewTTLOption(time.Second*3, time.Second*10)
}
grantResp, err := c.leaser.Grant(c.ctx, int64(s.TTL.ttl.Seconds()))
if err != nil {
return err
}
c.leaseID = grantResp.ID
_, err = c.kv.Put(
c.ctx,
s.Key,
s.Value,
clientv3.WithLease(c.leaseID),
)
if err != nil {
return err
}
// this will keep the key alive 'forever' or until we revoke it or
// the context is canceled
c.hbch, err = c.leaser.KeepAlive(c.ctx, c.leaseID)
if err != nil {
return err
}
// discard the keepalive response, make etcd library not to complain
// fix bug #799
go func() {
for {
select {
case r := <-c.hbch:
// avoid dead loop when channel was closed
if r == nil {
return
}
case <-c.ctx.Done():
return
}
}
}()
return nil
}
func (c *client) Deregister(s Service) error {
defer c.close()
if s.Key == "" {
return ErrNoKey
}
if _, err := c.cli.Delete(c.ctx, s.Key, clientv3.WithIgnoreLease()); err != nil {
return err
}
return nil
}
// close will close any open clients and call
// the watcher cancel func
func (c *client) close() {
if c.leaser != nil {
c.leaser.Close()
}
if c.watcher != nil {
c.watcher.Close()
}
if c.wcf != nil {
c.wcf()
}
}