/
registry.go
108 lines (90 loc) · 2.9 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package etcd
import (
"bytes"
"context"
"github.com/smartwalle/etcd4go"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/resolver"
"path/filepath"
"sync"
)
// kDefaultScheme is the scheme NewRegistry passes to NewRegistryWithScheme
// when the caller does not choose one.
const kDefaultScheme = "etcd"
// Registry registers services in etcd and resolves them for gRPC.
//
// It is handed to resolver.Register as a builder (Build, Scheme) and Build
// returns the Registry itself as the resolver (ResolveNow, Close), so a
// single Registry value is shared by every connection built from it.
type Registry struct {
// scheme is the value reported by Scheme and used as the prefix of every
// path produced by buildPath.
scheme string
// client wraps the etcd v3 client passed to the constructor.
client *etcd4go.Client
// mu guards watchers.
mu *sync.Mutex
// watchers holds the watcher created by Build for each path, keyed by the
// path being watched.
watchers map[string]*etcd4go.Watcher
}
// NewRegistry creates a Registry for client using the default scheme
// (kDefaultScheme). It is shorthand for NewRegistryWithScheme and therefore
// has the same side effect of registering the result with gRPC.
func NewRegistry(client *clientv3.Client) *Registry {
	registry := NewRegistryWithScheme(kDefaultScheme, client)
	return registry
}
// NewRegistryWithScheme creates a Registry that reports scheme from Scheme
// and talks to etcd through client, then registers it with gRPC by calling
// resolver.Register.
//
// NOTE(review): resolver.Register mutates process-wide gRPC state; the gRPC
// documentation asks for it to be called during initialization only, and a
// later registration under the same scheme replaces the earlier one.
func NewRegistryWithScheme(scheme string, client *clientv3.Client) *Registry {
	registry := &Registry{
		scheme:   scheme,
		client:   etcd4go.NewClient(client),
		mu:       new(sync.Mutex),
		watchers: make(map[string]*etcd4go.Watcher),
	}
	resolver.Register(registry)
	return registry
}
// Build starts a prefix watch for target and pushes the currently known
// addresses to cc.
//
// The watched prefix is buildPath(target.Authority, target.Endpoint). The
// watcher is stored in this.watchers under that prefix, and every later watch
// event pushes a refreshed address list to cc (see watch). opts is not used.
//
// The returned resolver is the Registry itself and the error is always nil.
//
// NOTE(review): because the Registry is returned as the resolver, calling
// Close on the value returned here closes every watcher held by the Registry,
// not only the one created by this call. Building twice for the same prefix
// also replaces the earlier map entry without closing it.
func (this *Registry) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	prefix := this.buildPath(target.Authority, target.Endpoint)

	// The watch runs on context.Background(); within this file the watcher is
	// only ever stopped by Close.
	w := this.client.Watch(context.Background(), prefix, this.watch(cc), clientv3.WithPrefix())

	this.mu.Lock()
	this.update(cc, w.Values())
	this.watchers[prefix] = w
	this.mu.Unlock()

	return this, nil
}
// watch returns the callback handed to the etcd4go watcher for cc.
//
// The callback ignores the details of the individual event (event, key, path
// and value) and instead re-reads the watcher's full value set, pushing it to
// cc through update.
func (this *Registry) watch(cc resolver.ClientConn) func(watcher *etcd4go.Watcher, event, key, path string, value []byte) {
	onEvent := func(w *etcd4go.Watcher, _, _, _ string, _ []byte) {
		this.update(cc, w.Values())
	}
	return onEvent
}
// update replaces the address list of cc with the values found in paths.
//
// Each map value is converted to a string and used as the Addr of one
// resolver.Address; the map keys are ignored. The addresses follow map
// iteration order, so their order is unspecified. An empty or nil paths
// results in an empty (non-nil) address list being sent.
func (this *Registry) update(cc resolver.ClientConn, paths map[string][]byte) {
	addrs := make([]resolver.Address, len(paths))
	i := 0
	for _, raw := range paths {
		addrs[i] = resolver.Address{Addr: string(raw)}
		i++
	}
	cc.UpdateState(resolver.State{Addresses: addrs})
}
// Scheme returns the scheme this Registry was constructed with. It is the
// same value buildPath uses as the prefix of every path.
func (this *Registry) Scheme() string {
return this.scheme
}
// ResolveNow does nothing and ignores option. Address updates are pushed to
// the ClientConn by the watch callback installed in Build, so there is no
// on-demand resolution step here.
func (this *Registry) ResolveNow(option resolver.ResolveNowOptions) {
}
// Close closes every watcher currently held by the Registry and forgets it.
//
// Entries are removed from this.watchers once closed, so a later Close does
// not close the same watcher again and closed watchers are not retained for
// the lifetime of the Registry. Calling Close when no watchers are held is a
// no-op. The Registry stays usable: Build may add new watchers afterwards.
//
// NOTE(review): Build returns the Registry itself as the resolver, so this
// method closes the watchers of all connections built from this Registry,
// not just the one being shut down.
func (this *Registry) Close() {
	this.mu.Lock()
	// Deferred so the mutex is released even if a watcher's Close panics.
	defer this.mu.Unlock()

	for key, watcher := range this.watchers {
		if watcher != nil {
			watcher.Close()
		}
		// Deleting the current entry while ranging over a map is permitted.
		delete(this.watchers, key)
	}
}
// Register stores addr in etcd under the path built from domain, service and
// node (see BuildPath), passing ttl through to the etcd4go client.
//
// It returns the key and error reported by the client; the client's first
// return value is discarded. ttl is presumably a lease time-to-live in
// seconds — confirm against etcd4go.
func (this *Registry) Register(ctx context.Context, domain, service, node, addr string, ttl int64) (key string, err error) {
	var path = this.BuildPath(domain, service, node)
	_, key, err = this.client.Register(ctx, path, addr, ttl)
	return key, err
}
// Unregister asks the etcd4go client to unregister the path built from
// domain, service and node (see BuildPath) and returns the client's error
// unchanged.
func (this *Registry) Unregister(ctx context.Context, domain, service, node string) (err error) {
	var path = this.BuildPath(domain, service, node)
	err = this.client.Unregister(ctx, path)
	return err
}
// BuildPath returns the path used for a node of a service in a domain, in
// the form "<scheme>://<domain>/<service>/<node>/". It is the exported,
// fixed-arity front end of buildPath.
func (this *Registry) BuildPath(domain, service, node string) string {
	var path = this.buildPath(domain, service, node)
	return path
}
// buildPath joins paths into a single cleaned, slash-separated path and
// returns it as "<scheme>://<path>/".
//
// A leading "/" left by the join is dropped, and a trailing "/" is appended
// whenever the joined path is non-empty. With no usable elements the result
// is just "<scheme>://". For example, with scheme "etcd":
//
//	buildPath("domain", "service", "node") == "etcd://domain/service/node/"
//	buildPath()                            == "etcd://"
func (this *Registry) buildPath(paths ...string) string {
	// filepath.Join uses the operating system's separator; convert it to "/"
	// so the result is identical on every platform and the '/' checks below
	// apply.
	var nPath = filepath.ToSlash(filepath.Join(paths...))
	if len(nPath) > 0 && nPath[0] == '/' {
		nPath = nPath[1:]
	}

	var buf bytes.Buffer
	buf.WriteString(this.scheme)
	buf.WriteString("://")
	buf.WriteString(nPath)
	if len(nPath) > 0 && nPath[len(nPath)-1] != '/' {
		buf.WriteString("/")
	}
	return buf.String()
}