-
Notifications
You must be signed in to change notification settings - Fork 0
/
etcd.go
135 lines (119 loc) · 3.05 KB
/
etcd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Package etcd provides an etcd-backed implementation of registry.Registry.
package etcd
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/trustasia-com/go-van/pkg/logx"
"github.com/trustasia-com/go-van/pkg/registry"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
)
// prefix is the root of every key this package builds: instance keys are
// formatted as "<prefix>/<service name>/<instance ID>".
const prefix = "/go-van/registry"
// NewRegistry returns a registry.Registry backed by an etcd v3 client.
//
// The lease TTL defaults to 15 seconds and every supplied option is applied on
// top of that default. The client is dialed against options.Addresses with
// options.TLS, a 5 second dial timeout and a blocking gRPC dial. When
// options.Context carries *authCreds under authKey{}, its username and
// password are copied into the client configuration.
//
// A client construction error is reported through logx.Fatalf; no error is
// returned to the caller.
func NewRegistry(opts ...registry.Option) registry.Registry {
	// Start from the defaults, then let each option override them.
	options := registry.Options{TTL: 15 * time.Second}
	for _, apply := range opts {
		apply(&options)
	}

	cfg := clientv3.Config{
		Endpoints:   options.Addresses,
		DialTimeout: 5 * time.Second,
		DialOptions: []grpc.DialOption{grpc.WithBlock()},
		TLS:         options.TLS,
	}

	// Optional username/password credentials travel in the options context.
	if options.Context != nil {
		if creds, ok := options.Context.Value(authKey{}).(*authCreds); ok {
			cfg.Username = creds.username
			cfg.Password = creds.password
		}
	}

	cli, err := clientv3.New(cfg)
	if err != nil {
		logx.Fatalf("etcd: new etcd client: %s", err)
	}
	return &etcdRegistry{options: options, client: cli}
}
// etcdRegistry is the value returned by NewRegistry; its methods talk to etcd
// through client.
type etcdRegistry struct {
	// options holds the settings assembled by NewRegistry; Register uses
	// options.TTL as the lease TTL.
	options registry.Options
	// client is the etcd v3 client used for every KV and lease call.
	client *clientv3.Client
	// leaseID is the lease granted by the most recent successful Register
	// call; keepAliveAsync reads it.
	leaseID clientv3.LeaseID
}
// Register writes ins to etcd and starts keeping its lease alive.
//
// The instance is JSON-encoded and stored under "<prefix>/<name>/<id>",
// attached to a freshly granted lease whose TTL is options.TTL truncated to
// whole seconds. On success the lease ID is remembered in r.leaseID and
// keepAliveAsync is started for it.
//
// If anything fails after the lease has been granted, the lease is revoked
// (best effort) so that a failed Register does not leave a lease or key behind
// until the TTL expires. Errors are returned unwrapped, exactly as produced by
// json.Marshal or the etcd client.
func (r *etcdRegistry) Register(ctx context.Context, ins *registry.Instance) error {
	key := fmt.Sprintf("%s/%s/%s", prefix, ins.Name, ins.ID)
	data, err := json.Marshal(ins)
	if err != nil {
		return err
	}

	lease, err := r.client.Grant(ctx, int64(r.options.TTL.Seconds()))
	if err != nil {
		return err
	}

	if _, err = r.client.Put(ctx, key, string(data), clientv3.WithLease(lease.ID)); err != nil {
		// Best-effort cleanup: the Put error is the one worth reporting, and
		// an unrevoked lease still expires on its own after the TTL.
		_, _ = r.client.Revoke(ctx, lease.ID)
		return err
	}

	r.leaseID = lease.ID
	if err = r.keepAliveAsync(ctx, ins); err != nil {
		// Without keep-alive the key would vanish after one TTL anyway, so
		// remove it now rather than report an error for a visible instance.
		_, _ = r.client.Revoke(ctx, lease.ID)
		return err
	}
	return nil
}
// Deregister removes the instance's key, "<prefix>/<name>/<id>", from etcd and
// returns the error from the delete call, if any.
//
// Only the key is deleted here; the lease recorded in r.leaseID is not revoked
// by this method.
func (r *etcdRegistry) Deregister(ctx context.Context, ins *registry.Instance) error {
	if _, err := r.client.Delete(ctx, fmt.Sprintf("%s/%s/%s", prefix, ins.Name, ins.ID)); err != nil {
		return err
	}
	return nil
}
// GetService get service from regsitry
func (r *etcdRegistry) GetService(ctx context.Context, name string) ([]*registry.Instance, error) {
key := fmt.Sprintf("%s/%s", prefix, name)
resp, err := r.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
return nil, err
}
var items []*registry.Instance
for _, kv := range resp.Kvs {
srv := ®istry.Instance{}
err = json.Unmarshal(kv.Value, srv)
if err != nil {
return nil, err
}
items = append(items, srv)
}
return items, nil
}
// Watch returns a watcher created by newWatcher for the key
// "<prefix>/<name>". The returned error is always nil.
//
// NOTE(review): the key has no trailing slash; if newWatcher matches by
// prefix, services whose names start with name would match too — confirm
// against newWatcher.
func (r *etcdRegistry) Watch(ctx context.Context, name string) (registry.Watcher, error) {
	w := newWatcher(ctx, prefix+"/"+name, r.client)
	return w, nil
}
// keepAliveAsync starts lease keep-alive for the lease currently stored in
// r.leaseID and returns once the keep-alive stream has been set up.
//
// A background goroutine drains the keep-alive responses. When the response
// channel is closed, the goroutine revokes that lease and calls Register again
// for ins, then exits. It returns the error from KeepAlive, if any; errors
// from the later revoke and re-register are discarded.
func (r *etcdRegistry) keepAliveAsync(ctx context.Context, ins *registry.Instance) error {
	// Snapshot the lease ID. r.leaseID is overwritten by every Register call,
	// so reading it later from the goroutine could revoke a lease belonging to
	// a different registration (and would be an unsynchronized read).
	leaseID := r.leaseID

	// The keep-alive stream uses context.TODO() rather than ctx, so it is not
	// bound to the lifetime of the caller's context.
	ch, err := r.client.KeepAlive(context.TODO(), leaseID)
	if err != nil {
		return err
	}

	go func() {
		// Responses carry nothing we need; just drain until the channel closes.
		for range ch {
		}

		// Keep-alive has stopped: drop the old lease and register afresh.
		// Nobody is left to receive these errors, so they are discarded.
		_, _ = r.client.Revoke(ctx, leaseID)
		_ = r.Register(ctx, ins)
	}()
	return nil
}