This repository has been archived by the owner on Apr 23, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 36
/
setting.go
156 lines (123 loc) · 3.49 KB
/
setting.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package destination
import (
"log"
"math/rand"
"sync"
"time"
)
type (
// ServersStr represent location that accepts logs (e.g. "srv1:8123" or "=srv1*100 =srv2*100 =srv3*100")
ServersStr string
// ServerHostPort is a full host name with port, e.g. "srv1:8123"
ServerHostPort string
// Server is a single entry in cluster specification
Server struct {
HostPort ServerHostPort
Weight uint32
}
// Map represents routing map config
Map map[ServersStr]*Setting
// Setting represents group of settings for specific destination Server
Setting struct {
Default bool
Servers []Server
Tables []string
// state, all is protected by mu
mu sync.Mutex
curServerIdx int
curMaxWeight uint32
brokenHosts map[ServerHostPort]time.Time
stopCh chan struct{}
}
)
// NewSetting must be used to initialize Setting struct
func NewSetting() *Setting {
return &Setting{
brokenHosts: make(map[ServerHostPort]time.Time),
stopCh: make(chan struct{}),
}
}
// Init sets up current state based on public settings
func (s *Setting) Init() {
s.mu.Lock()
defer s.mu.Unlock()
s.curServerIdx = -1 // curServerIdx is incremented first, so we need to set it to -1 in order to get 0 first
s.recalcMaxWeight()
}
// Destroy stops health check goroutines, if any
func (s *Setting) Destroy() {
close(s.stopCh)
}
// ChooseNextServer returns next server from the list or ok=false, which means that there are no available hosts
func (s *Setting) ChooseNextServer() (srv ServerHostPort, ok bool) {
s.mu.Lock()
defer s.mu.Unlock()
cnt := len(s.Servers)
for i := 0; i < cnt; i++ {
s.curServerIdx = (s.curServerIdx + 1) % cnt
el := s.Servers[s.curServerIdx]
if _, ok := s.brokenHosts[el.HostPort]; ok {
continue
}
if s.curMaxWeight > 0 && uint32(rand.Intn(int(s.curMaxWeight))+1) > el.Weight {
continue
}
return el.HostPort, true
}
return "", false
}
type aliveCheckFunc func(ServerHostPort) error
// TempDisableHost marks provided host as temporarily disabled (it will not be returned in ChooseNextServer()).
// Provided callback checkCb is used to periodically check that server went live again.
func (s *Setting) TempDisableHost(srv ServerHostPort, aliveCheck aliveCheckFunc) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.brokenHosts[srv]; ok {
return
}
log.Printf("Temporarily disabled %s", srv)
s.brokenHosts[srv] = time.Now()
s.recalcMaxWeight()
go s.tryRestoreHostLoop(srv, aliveCheck)
}
func (s *Setting) tryRestoreHostLoop(srv ServerHostPort, aliveCheck aliveCheckFunc) {
for {
select {
case <-time.After(time.Second*30 + time.Duration(rand.Intn(30000))*time.Millisecond):
case <-s.stopCh:
return
}
log.Printf("Checking whether %s is alive", srv)
err := aliveCheck(srv)
if err == nil {
s.enableHost(srv)
return
}
log.Printf("Health check failed for %s: %s", srv, err.Error())
}
}
// enableHost allows specified server to be used again
func (s *Setting) enableHost(srv ServerHostPort) {
s.mu.Lock()
defer s.mu.Unlock()
log.Printf("Server %s is back alive", srv)
delete(s.brokenHosts, srv)
s.recalcMaxWeight()
}
// assumes that mu.Lock() is held
func (s *Setting) recalcMaxWeight() {
s.curMaxWeight = 0
// shuffle servers so that we do not kill servers in order
for i := range s.Servers {
j := rand.Intn(i + 1)
s.Servers[i], s.Servers[j] = s.Servers[j], s.Servers[i]
}
for _, el := range s.Servers {
if _, ok := s.brokenHosts[el.HostPort]; ok {
continue
}
if el.Weight > s.curMaxWeight {
s.curMaxWeight = el.Weight
}
}
}