-
Notifications
You must be signed in to change notification settings - Fork 0
/
etcd.go
168 lines (154 loc) · 3.93 KB
/
etcd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package etcd
import (
"context"
clientv3 "go.etcd.io/etcd/client/v3"
"time"
)
type EtcdConfig struct {
Endpoints []string `json:"endpoints"`
DialTimeout time.Duration `json:"dialTimeout"`
}
type EtcdClient struct {
config *EtcdConfig
etcdCli *clientv3.Client
lease clientv3.Lease
}
type LockResult struct {
IsLock bool
LeaseID clientv3.LeaseID
Cancel context.CancelFunc
}
func InitEtcdClient(c *EtcdConfig) (*EtcdClient, error) {
config := clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: c.DialTimeout,
}
cli, err := clientv3.New(config)
if err != nil {
return nil, err
}
lease := clientv3.NewLease(cli)
return &EtcdClient{
config: c,
etcdCli: cli,
lease: lease,
}, nil
}
func (cli *EtcdClient) Close() error {
return cli.etcdCli.Close()
}
func (cli *EtcdClient) Put(key, metadata string, timeout time.Duration) error {
if timeout <= 0 {
timeout = 1 * time.Second
}
ctx, _ := context.WithTimeout(context.Background(), timeout)
_, err := cli.etcdCli.Put(ctx, key, metadata)
if err != nil {
return err
}
return nil
}
func (cli *EtcdClient) GetMetadata(key string, timeout time.Duration) (string, error) {
if timeout <= 0 {
timeout = 1 * time.Second
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
resp, err := cli.etcdCli.Get(ctx, key)
cancel()
if err != nil {
return "", err
}
return string(resp.Kvs[0].Value), nil
}
func (cli *EtcdClient) GetNodes(key string, timeout time.Duration) ([]string, error) {
if timeout <= 0 {
timeout = 1 * time.Second
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
resp, err := cli.etcdCli.Get(ctx, key, clientv3.WithPrefix())
cancel()
if err != nil {
return nil, err
}
var nodes []string
for _, item := range resp.Kvs {
nodes = append(nodes, string(item.Key))
}
return nodes, nil
}
func (cli *EtcdClient) Update(key, update string, timeout time.Duration) (string, error) {
if timeout <= 0 {
timeout = 1 * time.Second
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
resp, err := cli.etcdCli.Put(ctx, key, update, clientv3.WithPrevKV())
cancel()
if err != nil {
return "", err
}
return string(resp.PrevKv.Value), nil
}
func (cli *EtcdClient) Delete(key string, timeout time.Duration) error {
if timeout <= 0 {
timeout = 1 * time.Second
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
_, err := cli.etcdCli.Delete(ctx, key)
cancel()
if err != nil {
return err
}
return nil
}
func (cli *EtcdClient) Watch(key string, cb func(int32, string, string)) {
go func() {
watchKeys := cli.etcdCli.Watch(context.Background(), key)
for resp := range watchKeys {
for _, item := range resp.Events {
cb(int32(item.Type), string(item.Kv.Key), string(item.Kv.Value))
}
}
}()
}
// etcd 分布式锁
func (cli *EtcdClient) Lock(key, val string, ttl int64) (*LockResult, error) {
// 取消续租,释放租约
release := func(leaseID clientv3.LeaseID, cancel context.CancelFunc) {
cancel()
_, _ = cli.lease.Revoke(context.Background(), leaseID)
}
// 创建租约
leaseResp, err := cli.lease.Grant(context.Background(), ttl)
if err != nil {
return nil, err
}
leaseID := leaseResp.ID
// 自动续租
ctx, cancel := context.WithCancel(context.Background())
if _, err = cli.lease.KeepAlive(ctx, leaseID); err != nil {
release(leaseID, cancel)
return nil, err
}
txn := clientv3.NewKV(cli.etcdCli).Txn(context.Background())
txn = txn.If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
Then(clientv3.OpPut(key, val, clientv3.WithLease(leaseID))).
Else()
txnRest, err := txn.Commit()
if err != nil {
release(leaseID, cancel)
return nil, err
}
if !txnRest.Succeeded {
release(leaseID, cancel)
return &LockResult{IsLock: false}, nil
}
return &LockResult{
IsLock: true,
LeaseID: leaseID,
Cancel: cancel,
}, err
}
func (cli *EtcdClient) Unlock(res *LockResult) {
res.Cancel()
_, _ = cli.lease.Revoke(context.Background(), res.LeaseID)
}