forked from alibaba/sentinel-golang
-
Notifications
You must be signed in to change notification settings - Fork 0
/
etcdv3.go
134 lines (121 loc) · 3.79 KB
/
etcdv3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package etcdv3
import (
"context"
"time"
"github.com/sunduoyou/sentinel-golang/ext/datasource"
"github.com/sunduoyou/sentinel-golang/logging"
"github.com/sunduoyou/sentinel-golang/util"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/pkg/errors"
)
// Etcdv3DataSource is a datasource backed by a single key in etcd v3.
// It embeds datasource.Base and keeps the state needed to read the key
// and to watch it for changes.
type Etcdv3DataSource struct {
	datasource.Base

	// client is the etcd v3 client used for Get and Watch calls.
	client *clientv3.Client
	// propertyKey is the etcd key that is read and watched.
	propertyKey string
	// lastUpdatedRevision is the most recent etcd revision observed by
	// ReadSource or processWatchResponse; watch uses it as the start revision.
	lastUpdatedRevision int64

	// cancel cancels the context of the current watch on propertyKey.
	// It is assigned by watch and invoked by Close.
	cancel context.CancelFunc
	// closed is set to true by Close; watch checks it to decide whether to
	// keep re-establishing the watch on propertyKey.
	closed util.AtomicBool
}
// NewDataSource builds an Etcdv3DataSource bound to key and registers every
// handler in handlers on it.
//
// client is the etcd v3 client to use; a nil client is rejected with an error.
// The client is owned by the caller, who is expected to release it.
func NewDataSource(client *clientv3.Client, key string, handlers ...datasource.PropertyHandler) (*Etcdv3DataSource, error) {
	if client == nil {
		return nil, errors.New("The etcdv3 client is nil.")
	}

	source := new(Etcdv3DataSource)
	source.client = client
	source.propertyKey = key
	for i := range handlers {
		source.AddPropertyHandler(handlers[i])
	}
	return source, nil
}
// Initialize performs one initial read-and-dispatch of the property key and
// then starts the background watch goroutine.
//
// A failure of the initial load is only logged: the watch is still started and
// Initialize always returns nil.
func (s *Etcdv3DataSource) Initialize() error {
	if loadErr := s.doReadAndUpdate(); loadErr != nil {
		logging.Error(loadErr, "Fail to update data for key when execute Etcdv3DataSource.Initialize()", "propertyKey", s.propertyKey)
	}
	// The watch loop runs in its own goroutine, wrapped by util.RunWithRecover.
	go util.RunWithRecover(s.watch)
	return nil
}
// ReadSource fetches the current value of the property key from etcd.
//
// The Get call is bounded by a 10 second timeout. On success the revision
// reported in the response header is recorded in lastUpdatedRevision and the
// raw value bytes of the key are returned.
//
// An error is returned when the Get call fails (the underlying cause is
// wrapped so callers can inspect it) or when the key does not exist.
func (s *Etcdv3DataSource) ReadSource() ([]byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	resp, err := s.client.Get(ctx, s.propertyKey)
	if err != nil {
		// Keep the original cause; previously it was dropped, which hid
		// timeouts, auth failures and connectivity problems from the caller.
		return nil, errors.Wrapf(err, "Fail to get value for property key[%s]", s.propertyKey)
	}
	// Also check len(Kvs) so that indexing Kvs[0] below can never panic.
	if resp.Count == 0 || len(resp.Kvs) == 0 {
		return nil, errors.Errorf("The key[%s] is not existed in etcd server.", s.propertyKey)
	}

	revision := resp.Header.GetRevision()
	value := resp.Kvs[0].Value
	s.lastUpdatedRevision = revision
	logging.Info("[Etcdv3] Get the newest data for key", "propertyKey", s.propertyKey,
		"revision", revision, "value", value)
	return value, nil
}
// doReadAndUpdate reads the current value with ReadSource and passes it to
// s.Handle. It returns the read error if reading fails, otherwise whatever
// Handle returns.
func (s *Etcdv3DataSource) doReadAndUpdate() error {
	data, readErr := s.ReadSource()
	if readErr == nil {
		return s.Handle(data)
	}
	return readErr
}
// processWatchResponse handles one response received from the etcd watch
// channel.
//
// It first advances lastUpdatedRevision to the larger of the response's
// compact revision and header revision (never moving it backwards). If the
// response carries an error, the error is logged and the events are ignored.
// Otherwise each PUT event triggers a fresh read-and-dispatch of the key, and
// each DELETE event dispatches a nil value to the handlers. Failures while
// dispatching are logged, not returned.
func (s *Etcdv3DataSource) processWatchResponse(resp *clientv3.WatchResponse) {
	if compacted := resp.CompactRevision; compacted > s.lastUpdatedRevision {
		s.lastUpdatedRevision = compacted
	}
	if current := resp.Header.GetRevision(); current > s.lastUpdatedRevision {
		s.lastUpdatedRevision = current
	}

	if watchErr := resp.Err(); watchErr != nil {
		logging.Error(watchErr, "Watch on etcd endpoints occur error", "endpointd", s.client.Endpoints())
		return
	}

	for _, event := range resp.Events {
		switch event.Type {
		case mvccpb.PUT:
			// The value is re-read from etcd rather than taken from the event.
			if err := s.doReadAndUpdate(); err != nil {
				logging.Error(err, "Fail to execute doReadAndUpdate for PUT event")
			}
		case mvccpb.DELETE:
			// A deleted key is reported to the handlers as a nil source.
			if err := s.Handle(nil); err != nil {
				logging.Error(err, "Fail to execute doReadAndUpdate for DELETE event")
			}
		}
	}
}
// watch keeps a watch open on propertyKey and feeds every response to
// processWatchResponse. It blocks until the datasource is closed and is meant
// to run in its own goroutine (Initialize starts it).
//
// The first watch starts at lastUpdatedRevision, i.e. the revision recorded by
// the initial read. Whenever the watch channel is closed while the datasource
// is still open, watch waits one second and re-establishes the watch from
// lastUpdatedRevision+1 (or from the current revision if none was recorded).
//
// The context of a finished watch is always cancelled before a new one is
// created, and the closed flag is re-checked around the back-off so that a
// Close call arriving in that window cannot leave an uncancellable watch
// behind.
func (s *Etcdv3DataSource) watch() {
	ctx, cancel := context.WithCancel(context.Background())
	s.cancel = cancel
	// Close may have run before cancel was published; do not start watching then.
	if s.closed.Get() {
		cancel()
		return
	}
	// Add watch for propertyKey from lastUpdatedRevision updated after Initializing
	rch := s.client.Watch(ctx, s.propertyKey, clientv3.WithCreatedNotify(), clientv3.WithRev(s.lastUpdatedRevision))
	for {
		for resp := range rch {
			s.processWatchResponse(&resp)
		}
		// The channel is closed, so this watch is finished: release its context
		// before it gets replaced. Calling a CancelFunc more than once is a no-op.
		cancel()

		// Stop watching if datasource had been closed.
		if s.closed.Get() {
			return
		}
		time.Sleep(time.Second)
		// Close may have been called while sleeping.
		if s.closed.Get() {
			return
		}

		ctx, cancel = context.WithCancel(context.Background())
		s.cancel = cancel
		// Close may have cancelled the previous (already finished) context just
		// before the new cancel func was published; re-check so the new watch
		// is not left running.
		if s.closed.Get() {
			cancel()
			return
		}
		if s.lastUpdatedRevision > 0 {
			rch = s.client.Watch(ctx, s.propertyKey, clientv3.WithRev(s.lastUpdatedRevision+1))
		} else {
			rch = s.client.Watch(ctx, s.propertyKey)
		}
	}
}
// Close stops watching the property key. It marks the datasource as closed so
// the watch loop does not reconnect, then cancels the context of the current
// watch. It always returns nil.
//
// The etcd client itself is not closed here.
func (s *Etcdv3DataSource) Close() error {
	// stop to watch property key.
	s.closed.Set(true)
	// cancel is only assigned inside watch, which runs in a goroutine started
	// by Initialize. It is therefore still nil when Close is called before
	// Initialize (or before that goroutine has run); calling a nil func would
	// panic.
	if s.cancel != nil {
		s.cancel()
	}
	return nil
}