-
Notifications
You must be signed in to change notification settings - Fork 0
/
registration_discovery.go
281 lines (254 loc) · 9.01 KB
/
registration_discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
package regdis
import (
"context"
"errors"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"log"
"strings"
"sync"
"time"
)
type etcdRegistrationDiscoveryImp struct {
callBackMap map[string]ServiceCallback //存储服务变化订阅的callback,key为serviceName,针对一个服务只能有一个订阅回调
callBackMapLock *sync.RWMutex
fundServiceMap map[string]map[string]Service //存储已经发现的服务,第一层key为service name,第二层key为service ID
fundServiceMapLock *sync.RWMutex
registerServiceMap map[string]*etcdServiceReg //存储已经注册的服务,key为serviceName:serviceId
registerMapLock *sync.RWMutex
savePath string //etcd存储路径
etcdClient *clientv3.Client //etcdreg client v3
findDisable bool //是否停用查找服务功能,停止find服务则不会watch etcd变动,但是则只能使用注册功能,不能使用查找和订阅功能
isInitFinish bool
initFinishCond *sync.Cond
timeOut time.Duration
}
func NewEtcdRegistrationDiscovery(endpoints []string, userName, password, savePath string, findDisable bool, timeOut time.Duration) (RegistrationAndDiscovery, error) {
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: timeOut,
Username: userName,
Password: password,
//DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
if err != nil {
return nil, err
}
//判断etcd是否能够连通
ctx, _ := context.WithTimeout(context.Background(), timeOut)
_, err = client.Status(ctx, client.Endpoints()[0])
if err != nil {
return nil, errors.New(fmt.Sprintf("connect etcd error %s", err.Error()))
}
etcdReg := &etcdRegistrationDiscoveryImp{
callBackMap: make(map[string]ServiceCallback),
callBackMapLock: &sync.RWMutex{},
registerServiceMap: make(map[string]*etcdServiceReg),
registerMapLock: &sync.RWMutex{},
fundServiceMap: make(map[string]map[string]Service),
fundServiceMapLock: &sync.RWMutex{},
etcdClient: client,
findDisable: findDisable,
savePath: fmt.Sprintf("/%s/", strings.Trim(savePath, "/")), //标准存储路径为/path/
initFinishCond: sync.NewCond(&sync.Mutex{}),
timeOut: timeOut,
}
//未停用查找功能,则初始化watch功能
if !etcdReg.findDisable {
go etcdReg.findAndWatch()
}
return etcdReg, nil
}
func (e *etcdRegistrationDiscoveryImp) findAndWatch() {
defer func() {
//初始化完成后,通知等待事件的
e.isInitFinish = true
e.initFinishCond.Broadcast()
}()
//启动时先从etcd读取一次数据
kv := clientv3.NewKV(e.etcdClient)
ctx, _ := context.WithTimeout(context.Background(), e.timeOut)
resp, err := kv.Get(ctx, e.savePath, clientv3.WithPrefix())
if err != nil {
log.Printf("query etcdreg error %s\n", err.Error())
return
}
//记录读取数据时的最大revision
maxRevision := resp.Header.Revision
e.fundServiceMapLock.Lock()
//将读取到的数据存储到发现列表
for _, kv := range resp.Kvs {
currSer, err := ParseServiceFromStr(string(kv.Value))
if err != nil {
log.Printf("findAndWatch parse.ParseServiceFromStr error %s--%v\n", err.Error(), kv.Value)
continue
}
if serMap, ok := e.fundServiceMap[currSer.ServiceName()]; ok {
serMap[currSer.ServiceID()] = currSer
} else {
serMap = map[string]Service{}
serMap[currSer.ServiceID()] = currSer
e.fundServiceMap[currSer.ServiceName()] = serMap
}
}
e.fundServiceMapLock.Unlock()
go e.watchLoop(maxRevision + 1)
}
func (e *etcdRegistrationDiscoveryImp) watchLoop(startWatchRevision int64) {
//从最前面查找到的数据的max revision + 1 开始watch,即watch上次查找后的对应目录的一切变动
//watch这里的context不能使用带有过期时间的
watchChan := e.etcdClient.Watch(context.TODO(), e.savePath, clientv3.WithPrefix(), clientv3.WithRev(startWatchRevision))
for {
select {
case ser := <-watchChan:
//为了方便去重,直接使用map,int值无实际用途
changeServiceNameList := map[string]int{}
e.fundServiceMapLock.Lock()
for _, event := range ser.Events {
//解析变动的可以,获取对应的serviceName、serviceID
//无论是put事件还是delete事件都有key,delete事件没有value
serName, serID, err := getServiceNameAndId(e.savePath, string(event.Kv.Key))
if err != nil {
log.Printf("getServiceNameAndId error %s\n", err.Error())
continue
}
//记录当前变动的服务名称
changeServiceNameList[serName] = 1
serMap, ok := e.fundServiceMap[serName]
if !ok {
serMap = make(map[string]Service)
}
//从字符串解析出service对象
switch event.Type {
case clientv3.EventTypePut:
log.Printf("新增服务id:%s\n", serID)
currSer, err := ParseServiceFromStr(string(event.Kv.Value))
if err != nil {
log.Printf("parse.ParseServiceFromStr error %s--%v\n", err.Error(), event.Kv.Value)
continue
}
//无需关心是新增还是修改,直接将新的ser对象存起来即可
serMap[serID] = currSer
case clientv3.EventTypeDelete: //删除事件,kv在prekv中
log.Printf("删除服务id:%s\n", serID)
delete(serMap, serID)
}
//处理后统一赋值
e.fundServiceMap[serName] = serMap
}
//检查订阅事件中的服务
e.callBackMapLock.RLock()
for k, v := range e.callBackMap {
//有变动则调用相应的函数
if _, ok := changeServiceNameList[k]; ok {
newSers := []Service{}
serMap, ok := e.fundServiceMap[k]
if ok {
for _, ser := range serMap {
newSers = append(newSers, ser)
}
}
//此处传输的是专门构造的slice,并发安全,直接新启用协程加快调度速度,减少锁占用时间
if v != nil {
go v(newSers)
}
}
}
e.callBackMapLock.RUnlock()
e.fundServiceMapLock.Unlock()
}
}
}
//向注册中心注册服务
func (e *etcdRegistrationDiscoveryImp) Register(ser Service) error {
e.registerMapLock.RLock()
if _, ok := e.registerServiceMap[getRegisterMapKey(ser)]; ok {
e.registerMapLock.RUnlock()
return errors.New("same name and same id service already register do not repeat")
}
e.registerMapLock.RUnlock()
etcdService := newEtcdServiceReg(ser, e.etcdClient, e.savePath, e.timeOut)
err := etcdService.Register()
if err != nil {
return err
}
//sync.Map非并发安全的,所以加锁,然后把注册的服务存入
e.registerMapLock.Lock()
defer e.registerMapLock.Unlock()
e.registerServiceMap[getRegisterMapKey(ser)] = etcdService
return nil
}
//从注册中心取消注册
func (e *etcdRegistrationDiscoveryImp) Deregister(ser Service) error {
e.registerMapLock.Lock()
defer e.registerMapLock.Unlock()
serKey := getRegisterMapKey(ser)
if curSer, ok := e.registerServiceMap[serKey]; ok {
err := curSer.Deregister()
if err != nil {
log.Println("Deregister error " + err.Error())
return err
}
delete(e.registerServiceMap, serKey)
}
return nil
}
//根据服务名查找服务列表
func (e *etcdRegistrationDiscoveryImp) Find(serviceName string) ([]Service, error) {
if e.findDisable {
return nil, errors.New("you hava set find disable can not use find")
}
//使用cond阻塞直到e.findAndWatch()初始化成功
e.initFinishCond.L.Lock()
for !e.isInitFinish {
e.initFinishCond.Wait()
}
e.initFinishCond.L.Unlock()
//因为已经启动了所有的service的监控,所以并不需要再次从etcd中查询,直接取本地存储值即可
e.fundServiceMapLock.RLock()
defer e.fundServiceMapLock.RUnlock()
serList := []Service{}
if serMap, ok := e.fundServiceMap[serviceName]; ok {
for _, v := range serMap {
serList = append(serList, v)
}
}
return serList, nil
}
//供外部使用,订阅某个服务的变化情况,一个服务只能有一个订阅回调
func (e *etcdRegistrationDiscoveryImp) Subscribe(serviceName string, callback ServiceCallback) error {
if e.findDisable {
return errors.New("you hava set find disable can not use subscribe")
}
e.callBackMapLock.Lock()
defer e.callBackMapLock.Unlock()
if _, ok := e.callBackMap[serviceName]; ok {
return errors.New("can not repeat subscribe the same service by name")
}
e.callBackMap[serviceName] = callback
return nil
}
//取消针对服务名的订阅
func (e *etcdRegistrationDiscoveryImp) Unsubscribe(serviceName string) error {
if e.findDisable {
return errors.New("you hava set find disable can not use subscribe , so do not need call unsubscribe")
}
e.callBackMapLock.Lock()
defer e.callBackMapLock.Unlock()
delete(e.callBackMap, serviceName)
return nil
}
func getRegisterMapKey(ser Service) string {
return fmt.Sprintf("%s:%s", ser.ServiceName(), ser.ServiceID())
}
//通过给定的etcd Key获取serviceName,serviceId
func getServiceNameAndId(savePath, key string) (serviceName, serviceId string, err error) {
str := strings.Replace(key, savePath, "", 1)
if str != "" && strings.Contains(str, ServiceNameAndIDSeparator) {
strArr := strings.Split(str, ServiceNameAndIDSeparator)
if len(strArr) == 2 {
return strArr[0], strArr[1], nil
}
}
return "", "", errors.New("error service value")
}