-
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Register.go
108 lines (93 loc) · 2.45 KB
/
Register.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package worker
import (
"context"
"github.com/etcd-io/etcd/clientv3"
"github.com/m9rco/exile/kernel/common"
"github.com/m9rco/exile/kernel/utils"
"time"
)
// Register bundles the etcd handles and the local address that a worker needs
// to announce itself in etcd. The registration key is built as
// common.JOB_WORKER_DIR followed by localIP.
type Register struct {
	// client is the underlying etcd v3 client that kv and lease are derived from.
	client *clientv3.Client
	// kv is the key-value API used to write the worker's registration key.
	kv clientv3.KV
	// lease is the lease API used to grant and renew the registration lease.
	lease clientv3.Lease

	// localIP is this machine's address as reported by utils.GetLocalIP.
	localIP string
}
// initialize the InitRegister
func InitRegister() (err error) {
var (
config clientv3.Config
client *clientv3.Client
localIp string
configureSource interface{}
configure utils.IniParser
RegisterSev Register
)
if configureSource, err = common.Manage.GetPrototype("configure"); err != nil {
return
}
configure = configureSource.(utils.IniParser)
config = clientv3.Config{
Endpoints: []string{configure.GetString("etcd", "endpoints")},
DialTimeout: time.Duration(configure.GetInt64("etcd", "dial_timeout")) * time.Millisecond,
}
if client, err = clientv3.New(config); err != nil {
return
}
// get local ip address
if localIp, err = utils.GetLocalIP(); err != nil {
return
}
// get etcd.KV and etcd.Lease apis
common.Manage.SetSingleton("Register", Register{
client: client,
kv: clientv3.NewKV(client),
lease: clientv3.NewLease(client),
localIP: localIp,
})
RegisterSev = common.Manage.GetSingleton("JobManager").(Register)
go RegisterSev.keepOnline()
return
}
// keepOnline keeps this worker registered in etcd for as long as the process
// runs. Each attempt is delegated to registerOnce; when an attempt ends, either
// on an error or because lease renewal stopped, it waits one second and starts
// over with a fresh lease. It never returns, so it must run in its own
// goroutine.
func (register *Register) keepOnline() {
	for {
		register.registerOnce()
		time.Sleep(1 * time.Second)
	}
}

// registerOnce performs one registration attempt: it grants a 10-second lease,
// starts automatic renewal, writes the key common.JOB_WORKER_DIR+localIP bound
// to that lease, and then blocks draining renewal responses. It returns when
// any step fails or when the renewal channel yields nil (which is also what a
// closed channel delivers). Errors are not reported; the caller simply retries.
func (register *Register) registerOnce() {
	// One cancellable context covers the whole attempt so that returning for
	// any reason also stops the keep-alive stream started below. Previously the
	// keep-alive used an uncancellable context and kept renewing abandoned
	// leases after a failed Put.
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	leaseGrantResp, err := register.lease.Grant(ctx, 10)
	if err != nil {
		return
	}

	// Automatic renewal of the lease.
	keepAliveChan, err := register.lease.KeepAlive(ctx, leaseGrantResp.ID)
	if err != nil {
		return
	}

	// Register this worker under the lease.
	if _, err = register.kv.Put(ctx, common.JOB_WORKER_DIR+register.localIP, "", clientv3.WithLease(leaseGrantResp.ID)); err != nil {
		return
	}

	// Drain renewal responses until renewal stops.
	for keepAliveResp := range keepAliveChan {
		if keepAliveResp == nil {
			return
		}
	}
}