-
Notifications
You must be signed in to change notification settings - Fork 0
/
discover.go
157 lines (128 loc) · 3.49 KB
/
discover.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package sylvain
import (
"context"
"fmt"
"log"
"math/rand"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
// ServerDiscoverOption is a functional option that mutates the Server being
// assembled by NewNamedServerDiscover before it is published.
type ServerDiscoverOption func(svr *Server)
//type ServerDiscoverHandler func(svr *Server)
// NamedServerDiscover holds the state of one published server: the lease it
// was registered under, the server description, the client used to talk to
// the store, and the ticker that paces lease refreshes.
type NamedServerDiscover struct {
// leaseID is the ID of the lease granted in publishWithLeaseKey; keepAlive
// refreshes it and Close revokes it.
leaseID clientv3.LeaseID
// svr is the server description that is serialized and stored under svr.Name.
svr *Server
// clt is the client used for every lease, key and watch call.
clt *Client
// ticker drives the keepAlive loop; Close stops it.
ticker *time.Ticker
// leaseKeepAlive <-chan *clientv3.LeaseKeepAliveResponse
// handler is called to notify listeners when the service data changes.
//handler ServerDiscoverHandler
}
// NewNamedServerDiscover assembles a Server whose ip comes from getHostIp,
// applies opts to it in order, derives its Addr via GetAddr, and publishes it
// through clt by calling publishWithLeaseKey with a lease value of 5.
//
// The returned discover owns a 3-second ticker that paces lease refreshes.
// publishWithLeaseKey panics on failure, so this constructor does too.
func NewNamedServerDiscover(clt *Client, opts ...ServerDiscoverOption) *NamedServerDiscover {
	server := new(Server)
	server.ip = getHostIp()
	for i := range opts {
		opts[i](server)
	}
	// Addr is computed only after the options ran so it reflects their changes.
	server.Addr = server.GetAddr()

	d := new(NamedServerDiscover)
	d.svr = server
	d.clt = clt
	d.ticker = time.NewTicker(3 * time.Second)

	d.publishWithLeaseKey(5)
	return d
}
// publishWithLeaseKey pushes the server information into etcd: it requests a
// lease with the given lease value, stores the serialized server under
// discover.svr.Name attached to that lease, and then starts the keepAlive and
// monitor goroutines.
//
// The grant and put share a single 5-second timeout. The method panics if the
// lease cannot be granted, the server cannot be serialized, or the key cannot
// be written.
func (discover *NamedServerDiscover) publishWithLeaseKey(lease int64) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	grant, err := discover.clt.Grant(ctx, lease)
	if err != nil {
		panic(err)
	}
	discover.leaseID = grant.ID

	// Fail like the other steps do instead of silently publishing an empty or
	// partial value that readers could not decode.
	serverBuf, marshalErr := discover.svr.MarshalBinary()
	if marshalErr != nil {
		panic(marshalErr)
	}

	_, err = discover.clt.Put(ctx, discover.svr.Name, string(serverBuf), clientv3.WithLease(grant.ID))
	if err != nil {
		panic(err)
	}

	go discover.keepAlive()
	go discover.monitor()
}
// keepAlive keeps the server alive in etcd by refreshing the lease once per
// tick of discover.ticker. Each refresh gets its own 5-second timeout.
//
// A failed refresh is logged and retried on the next tick rather than
// panicking: this runs in a background goroutine, and Close revokes the lease
// and closes the client while a refresh may still be in flight, so a panic
// here could take the whole process down during an orderly shutdown or on a
// transient error.
//
// NOTE: time.Ticker.Stop does not close the ticker channel, so after the
// ticker is stopped this goroutine stays blocked on the channel.
func (discover *NamedServerDiscover) keepAlive() {
	for range discover.ticker.C {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		_, err := discover.clt.KeepAliveOnce(ctx, discover.leaseID)
		// Release the context before inspecting the error so it is freed on
		// every path.
		cancel()
		if err != nil {
			log.Printf("keep alive lease %v: %v", discover.leaseID, err)
		}
	}
}
// monitor watches every etcd key that starts with "server" and prints each
// watch response to standard output. It returns once the watch channel is
// closed.
func (discover *NamedServerDiscover) monitor() {
	// Open the watch exactly once. Calling Watch inside the receive loop
	// would start a brand-new watcher for every response received and abandon
	// the previous one, leaking watchers for as long as the client lives.
	watchCh := discover.clt.Watch(context.Background(), "server", clientv3.WithPrefix())
	for response := range watchCh {
		fmt.Println(response)
	}
}
// ElectionServerEndpoint elects a server: it looks up the entries stored under
// the key for name, decodes them, and returns the Addr of one picked uniformly
// at random.
//
// It returns "" when the lookup fails, when nothing is stored under the key,
// or when none of the stored values can be decoded. Lookup and decode
// failures are logged.
func (discover *NamedServerDiscover) ElectionServerEndpoint(name string) string {
	key := serverNamed(name)

	// Bound the lookup like the other etcd calls in this type so an
	// unreachable store cannot block the caller forever.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	response, err := discover.clt.Get(ctx, key)
	if err != nil {
		// The response must not be touched when Get fails.
		log.Printf("election: get %q: %v", key, err)
		return ""
	}

	// Decode every stored entry, skipping values that cannot be decoded so a
	// single corrupt entry cannot be elected.
	servers := make([]*Server, 0, len(response.Kvs))
	for _, kv := range response.Kvs {
		server := new(Server)
		if decodeErr := server.UnmarshalBinary(kv.Value); decodeErr != nil {
			log.Printf("election: decode %q: %v", kv.Key, decodeErr)
			continue
		}
		servers = append(servers, server)
	}
	if len(servers) == 0 {
		return ""
	}

	// rand.Intn(n) already yields an index in [0, n), so every server has
	// the same probability of being chosen.
	return servers[rand.Intn(len(servers))].Addr
}
// Close tears the registration down: it stops the ticker, revokes the lease,
// deletes the key stored under discover.svr.Name, and finally calls Close on
// the client (ignoring its error).
//
// Revoke and Delete share one 5-second timeout. Close panics if either of
// them fails, in which case the client is not closed.
func (discover *NamedServerDiscover) Close() {
	discover.ticker.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

	_, revokeErr := discover.clt.Revoke(ctx, discover.leaseID)
	if revokeErr != nil {
		panic(revokeErr)
	}

	log.Println("Delete server", discover.svr.Name)
	_, deleteErr := discover.clt.Delete(ctx, discover.svr.Name)
	if deleteErr != nil {
		panic(deleteErr)
	}

	cancel()
	_ = discover.clt.Close()
}
// serverNamed returns the key used for the server called name: the name
// prefixed with "server/".
func serverNamed(name string) string {
	const keyPrefix = "server/"
	return fmt.Sprintf("%s%s", keyPrefix, name)
}
// WithServerNamed returns an option that sets the server's Name to the key
// produced by serverNamed(name).
func WithServerNamed(name string) ServerDiscoverOption {
	key := serverNamed(name)
	return func(s *Server) { s.Name = key }
}
// WithServerPort returns an option that stores port in the server's port
// field.
func WithServerPort(port int) ServerDiscoverOption {
	setPort := func(s *Server) {
		s.port = port
	}
	return setPort
}