-
Notifications
You must be signed in to change notification settings - Fork 9
/
electorconsul_runtime.go
154 lines (121 loc) · 3.07 KB
/
electorconsul_runtime.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// 实现文件 electorconsul 基于 consul 实现的选举
package electorconsul
import (
"context"
"errors"
"fmt"
"time"
"github.com/pojol/braid-go/components/depends/bconsul"
"github.com/pojol/braid-go/components/depends/blog"
"github.com/pojol/braid-go/module"
"github.com/pojol/braid-go/module/meta"
)
const (
// Name 选举器名称
Name = "ConsulElection"
)
var (
// ErrConfigConvert 配置转换失败
ErrConfigConvert = errors.New("[Elector] convert config error")
)
func (e *consulElection) Init() error {
sid, err := e.client.CreateSession(e.info.Name + "_lead")
if err != nil {
e.log.Warnf("[Elector] %v Dependency check error %v", e.info.Name, err.Error())
return fmt.Errorf("[Elector] %v Dependency check error %v", e.info.Name, "consul")
}
e.sessionID = sid
e.log.Infof("[Eleactor] create session succ id:%v\n", sid)
return nil
}
func BuildWithOption(info meta.ServiceInfo, opts ...Option) module.IElector {
p := Parm{
LockTick: time.Second * 2,
RefushSessionTick: time.Second * 5,
}
for _, opt := range opts {
opt(&p)
}
p.Pubsub.GetTopic(info.Name + "." + info.ID + "." + meta.TopicElectionChangeState)
return &consulElection{
parm: p,
ps: p.Pubsub,
client: p.ConsulCli,
log: p.Log,
}
}
type consulElection struct {
lockTicker *time.Ticker
refushTicker *time.Ticker
client *bconsul.Client
sessionID string
locked bool
log *blog.Logger
ps module.IPubsub
info meta.ServiceInfo
parm Parm
}
func (e *consulElection) watch() {
watchLock := func() {
defer func() {
if err := recover(); err != nil {
e.log.Errf("[Elector] watchLock err %v", err)
}
}()
if !e.locked {
succ, err := e.client.AcquireLock(e.info.Name, e.sessionID)
if err != nil {
e.log.Warnf("[Elector] acquire lock service %s err %v", e.info.Name, err.Error())
}
if succ {
e.locked = true
e.ps.GetTopic(meta.TopicElectionChangeState).Pub(context.TODO(), meta.EncodeStateChangeMsg(meta.EMaster, e.info.ID))
e.log.Infof("[Elector] acquire lock service %s, id %s", e.info.Name, e.sessionID)
} else {
e.ps.GetTopic(meta.TopicElectionChangeState).Pub(context.TODO(), meta.EncodeStateChangeMsg(meta.ESlave, e.info.ID))
}
}
}
// time.Millisecond * 2000
e.lockTicker = time.NewTicker(e.parm.LockTick)
for {
<-e.lockTicker.C
watchLock()
}
}
func (e *consulElection) refresh() {
refushSession := func() {
defer func() {
if err := recover(); err != nil {
e.log.Errf("[Elector] refresh err %v", err)
}
}()
err := e.client.RefreshSession(e.sessionID)
if err != nil {
// log
e.log.Warnf("[Elector] refresh session err %v", err.Error())
}
}
// time.Millisecond * 1000 * 5
e.refushTicker = time.NewTicker(e.parm.RefushSessionTick)
for {
<-e.refushTicker.C
if e.locked {
refushSession()
}
}
}
// Run session 状态检查
func (e *consulElection) Run() {
go func() {
e.refresh()
}()
go func() {
e.watch()
}()
}
// Close 释放锁,删除session
func (e *consulElection) Close() {
e.client.ReleaseLock(e.info.Name, e.sessionID)
e.client.DeleteSession(e.sessionID)
}