-
Notifications
You must be signed in to change notification settings - Fork 0
/
etcd.go
78 lines (65 loc) · 1.69 KB
/
etcd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package etcd
import (
"crypto/tls"
"fmt"
"time"
v3 "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"github.com/legionus/kavka/pkg/config"
"github.com/legionus/kavka/pkg/context"
)
// EtcdClient wraps an etcd v3 client. The *v3.Client is embedded, so its
// methods (for example Get) can be called directly on an EtcdClient.
type EtcdClient struct {
	*v3.Client
}
// useTLS reports whether the given serving info has a certificate file
// configured, i.e. whether its SecurityConfig.CertFile is non-empty.
func useTLS(servingInfo config.ServingInfo) bool {
	certFile := servingInfo.SecurityConfig.CertFile
	return len(certFile) != 0
}
// NewEtcdClient creates an etcd v3 client based on the provided config.
//
// When cfg.Etcd.Client has a certificate file configured (see useTLS), the
// certificate, key, client-cert-auth flag and trusted CA file are copied into
// a transport.TLSInfo and a client-side TLS configuration is built from it.
// Otherwise the client is created with a nil TLS configuration.
//
// The client dials cfg.Etcd.Client.URLs with a 3 second dial timeout. Any
// error from building the TLS configuration or from v3.New is returned as-is.
func NewEtcdClient(cfg *config.Config) (*EtcdClient, error) {
	var (
		clientTLSInfo transport.TLSInfo
		ctlscfg       *tls.Config
	)

	if useTLS(cfg.Etcd.Client) {
		clientTLSInfo.CertFile = cfg.Etcd.Client.CertFile
		clientTLSInfo.KeyFile = cfg.Etcd.Client.KeyFile
		clientTLSInfo.ClientCertAuth = cfg.Etcd.Client.CertAuth
		clientTLSInfo.TrustedCAFile = cfg.Etcd.Client.TrustedCAFile
	}

	if !clientTLSInfo.Empty() {
		// This is the dialing side of the connection, so the configuration
		// must come from ClientConfig. ServerConfig generates a tls.Config
		// meant for a listening server and is not appropriate for a client.
		var err error
		ctlscfg, err = clientTLSInfo.ClientConfig()
		if err != nil {
			return nil, err
		}
	}

	etcdClient, err := v3.New(v3.Config{
		Endpoints:   cfg.Etcd.Client.URLs,
		TLS:         ctlscfg,
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		return nil, err
	}

	return &EtcdClient{etcdClient}, nil
}
// GetClient returns the embedded etcd v3 client held by c.
func (c *EtcdClient) GetClient() *v3.Client {
	underlying := c.Client
	return underlying
}
// TestEtcdClient checks that the client can talk to etcd by issuing a Get for
// the key "/" until one call succeeds.
//
// It returns nil as soon as a Get succeeds. After each failed Get it sleeps
// 50ms and tries again; once the attempt index exceeds 100 (i.e. after 102
// failed attempts) it gives up and returns an error carrying the last Get
// failure.
func (c *EtcdClient) TestEtcdClient() error {
	const (
		maxAttemptIndex = 100
		retryDelay      = 50 * time.Millisecond
	)

	ctx := context.Background()
	for attempt := 0; ; attempt++ {
		if _, err := c.Get(ctx, "/"); err == nil {
			return nil
		} else if attempt > maxAttemptIndex {
			return fmt.Errorf("could not reach etcd: %v", err)
		}
		time.Sleep(retryDelay)
	}
}