/
health.go
337 lines (298 loc) · 9.39 KB
/
health.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
package etcdcli
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/component-base/metrics/legacyregistry"
klog "k8s.io/klog/v2"
)
// init registers the package-level raftTerms collector with the legacy
// metrics registry when the package is initialised.
func init() {
legacyregistry.RawMustRegister(raftTerms)
}
// raftTermsMetricName is the name of the metric under which the observed
// raft terms are exposed.
const raftTermsMetricName = "etcd_debugging_raft_terms_total"

// raftTerms is the package-wide raft term collector. Its methods guard the
// terms map with the collector's own lock, so it can be used from multiple
// goroutines. The lock field is left at its zero value, which is an unlocked
// RWMutex.
var raftTerms = &raftTermsCollector{
	desc: prometheus.NewDesc(
		raftTermsMetricName,
		"Number of etcd raft terms as observed by each member.",
		[]string{"member"},
		prometheus.Labels{},
	),
	terms: make(map[string]uint64),
}
// healthCheck is the outcome of a health probe against a single etcd member.
type healthCheck struct {
// Member is the etcd member this result refers to.
Member *etcdserverpb.Member
// Healthy is true only when the health request against the member succeeded.
Healthy bool
// Took is the duration of the health request rendered as a string; it stays
// empty when no request was issued (e.g. unstarted member or client failure).
Took string
// Error holds the reason the member was reported unhealthy, if one is known.
Error error
}
// memberHealth is a list of health check results, one entry per etcd member.
type memberHealth []healthCheck
// GetMemberHealth probes every member in etcdMembers and returns one
// healthCheck per member, in input order.
//
// Members that have not started (no client URLs) are reported unhealthy
// without being contacted. All other members are probed one after another,
// each probe being bounded by a 30s timeout; a member that does not answer in
// time is reported unhealthy with a timeout error.
//
// As a side effect, raft term entries cached in raftTerms for member names
// that no longer appear in etcdMembers are forgotten.
func GetMemberHealth(ctx context.Context, etcdMembers []*etcdserverpb.Member) memberHealth {
	const defaultTimeout = 30 * time.Second

	results := memberHealth{}
	for _, member := range etcdMembers {
		if !HasStarted(member) {
			results = append(results, healthCheck{Member: member, Healthy: false})
			continue
		}
		results = append(results, checkMemberHealthWithTimeout(ctx, member, defaultTimeout))
	}

	// Purge any unknown members from the raft term metrics collector.
	known := make(map[string]struct{}, len(etcdMembers))
	for _, member := range etcdMembers {
		known[member.Name] = struct{}{}
	}
	for _, cachedMember := range raftTerms.List() {
		if _, ok := known[cachedMember]; !ok {
			// Forget is a map deletion underneath, which is idempotent and under a lock.
			raftTerms.Forget(cachedMember)
		}
	}

	return results
}

// checkMemberHealthWithTimeout runs checkSingleMemberHealth for member in a
// separate goroutine and waits at most timeout for its result, returning an
// unhealthy result with a timeout error otherwise.
func checkMemberHealthWithTimeout(ctx context.Context, member *etcdserverpb.Member, timeout time.Duration) healthCheck {
	// The channel is buffered so that a late reply can still be delivered
	// after this function has given up waiting; the goroutine then exits
	// instead of blocking forever on the send.
	resChan := make(chan healthCheck, 1)

	// member is passed as an argument rather than captured so the goroutine
	// never observes a loop variable that has since moved on (pre-Go 1.22).
	go func(m *etcdserverpb.Member) {
		ctxTimeout, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		resChan <- checkSingleMemberHealth(ctxTimeout, m)
	}(member)

	// An explicit timer is stopped on return so that a fast reply does not
	// leave a pending timer behind for the full timeout.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case res := <-resChan:
		return res
	case <-timer.C:
		return healthCheck{
			Member:  member,
			Healthy: false,
			Error: fmt.Errorf("%s timeout waiting for member %s to respond to health check",
				timeout, member.Name),
		}
	}
}
// checkSingleMemberHealth probes one etcd member by creating a client against
// its first client URL and issuing a Get for the "health" key.
//
// The returned healthCheck is Healthy only when the Get succeeds; otherwise
// Error describes whether client creation or the request itself failed. On
// success the raft term from the response header, when present, is recorded
// in raftTerms under the member's name. The client is closed before returning.
func checkSingleMemberHealth(ctx context.Context, member *etcdserverpb.Member) healthCheck {
	// Learner members do not support the member list call used for the
	// connection test, so the test is skipped for them; their connection is
	// exercised by the health request below instead.
	skipConnectionTest := member.IsLearner

	cli, err := newEtcdClientWithClientOpts([]string{member.ClientURLs[0]}, skipConnectionTest)
	if err != nil {
		return healthCheck{
			Member:  member,
			Healthy: false,
			Error:   fmt.Errorf("create client failure: %w", err),
		}
	}
	defer func() {
		if closeErr := cli.Close(); closeErr != nil {
			klog.Errorf("error closing etcd client for GetMemberHealth: %v", closeErr)
		}
	}()

	started := time.Now()
	var resp *clientv3.GetResponse
	switch {
	case member.IsLearner:
		// Learner members only support serializable (without consensus) read requests.
		resp, err = cli.Get(ctx, "health", clientv3.WithSerializable())
	default:
		// Linearized request to verify health of a voting member.
		resp, err = cli.Get(ctx, "health")
	}

	result := healthCheck{Member: member, Healthy: false, Took: time.Since(started).String()}
	if err != nil {
		klog.Errorf("health check for member (%v) failed: err(%v)", member.Name, err)
		result.Error = fmt.Errorf("health check failed: %w", err)
		return result
	}

	if resp.Header != nil {
		// TODO(thomas): this is a somewhat misplaced side-effect that is safe to call from multiple goroutines
		raftTerms.Set(member.Name, resp.Header.RaftTerm)
	}
	result.Healthy = true
	return result
}
// Status renders a one-line, comma-separated summary of the health results.
//
// When every member is healthy the summary is just the member count.
// Otherwise it starts with "<healthy> of <total> members are available" and
// is followed by one entry per member that has not started or is unhealthy.
func (h memberHealth) Status() string {
	total := len(h)
	healthy := len(h.GetHealthyMembers())
	if healthy == total {
		return fmt.Sprintf("%d members are available", total)
	}

	parts := []string{fmt.Sprintf("%d of %d members are available", healthy, total)}
	for _, check := range h {
		// An unstarted member is reported as such, never additionally as unhealthy.
		if !HasStarted(check.Member) {
			parts = append(parts, fmt.Sprintf("%s has not started", GetMemberNameOrHost(check.Member)))
		} else if !check.Healthy {
			parts = append(parts, fmt.Sprintf("%s is unhealthy", check.Member.Name))
		}
	}
	return strings.Join(parts, ", ")
}
// GetHealthyMembers returns the members whose health check is marked healthy,
// in their original order. The result is nil when there are none.
func (h memberHealth) GetHealthyMembers() []*etcdserverpb.Member {
	var healthy []*etcdserverpb.Member
	for i := range h {
		if !h[i].Healthy {
			continue
		}
		healthy = append(healthy, h[i].Member)
	}
	return healthy
}
// GetUnhealthyMembers returns the members whose health check is not marked
// healthy, in their original order. The result is nil when there are none.
func (h memberHealth) GetUnhealthyMembers() []*etcdserverpb.Member {
	var unhealthy []*etcdserverpb.Member
	for i := range h {
		if h[i].Healthy {
			continue
		}
		unhealthy = append(unhealthy, h[i].Member)
	}
	return unhealthy
}
// GetUnstartedMembers returns the members for which HasStarted reports false,
// in their original order. The result is nil when there are none.
func (h memberHealth) GetUnstartedMembers() []*etcdserverpb.Member {
	var unstarted []*etcdserverpb.Member
	for i := range h {
		if HasStarted(h[i].Member) {
			continue
		}
		unstarted = append(unstarted, h[i].Member)
	}
	return unstarted
}
// GetUnhealthyMemberNames returns the name (or host, as resolved by
// GetMemberNameOrHost) of every member whose check is not marked healthy.
// The result is an empty, non-nil slice when there are none.
func GetUnhealthyMemberNames(memberHealth []healthCheck) []string {
	names := make([]string, 0)
	for i := range memberHealth {
		if check := memberHealth[i]; !check.Healthy {
			names = append(names, GetMemberNameOrHost(check.Member))
		}
	}
	return names
}
// GetHealthyMemberNames returns the Name of every member whose check is
// marked healthy. The result is an empty, non-nil slice when there are none.
func GetHealthyMemberNames(memberHealth []healthCheck) []string {
	names := make([]string, 0)
	for i := range memberHealth {
		if check := memberHealth[i]; check.Healthy {
			names = append(names, check.Member.Name)
		}
	}
	return names
}
// GetUnstartedMemberNames returns the name (or host, as resolved by
// GetMemberNameOrHost) of every member for which HasStarted reports false.
// The result is an empty, non-nil slice when there are none.
func GetUnstartedMemberNames(memberHealth []healthCheck) []string {
	names := make([]string, 0)
	for i := range memberHealth {
		if check := memberHealth[i]; !HasStarted(check.Member) {
			names = append(names, GetMemberNameOrHost(check.Member))
		}
	}
	return names
}
// HasStarted reports whether the etcd member has started, which is determined
// by the member having at least one client URL.
func HasStarted(member *etcdserverpb.Member) bool {
	return len(member.ClientURLs) > 0
}
// IsQuorumFaultTolerant reports whether the etcd cluster described by
// memberHealth can tolerate the loss of a single member, as commonly happens
// during a new static pod revision.
//
// It returns true only when both the total member count and the healthy member
// count exceed the quorum computed by MinimumTolerableQuorum. Every negative
// outcome, including a failure to compute the quorum, is logged as an error.
func IsQuorumFaultTolerant(memberHealth []healthCheck) bool {
	totalMembers := len(memberHealth)
	quorum, err := MinimumTolerableQuorum(totalMembers)
	if err != nil {
		klog.Errorf("etcd cluster could not determine minimum quorum required. total number of members is %v. minimum quorum required is %v: %v", totalMembers, quorum, err)
		return false
	}

	healthyMembers := len(GetHealthyMemberNames(memberHealth))

	// Even with every member healthy there is no spare member beyond quorum.
	if totalMembers <= quorum {
		klog.Errorf("etcd cluster has quorum of %d which is not fault tolerant: %+v", quorum, memberHealth)
		return false
	}
	// Losing one more healthy member would drop the cluster below quorum.
	if healthyMembers <= quorum {
		klog.Errorf("etcd cluster has quorum of %d and %d healthy members which is not fault tolerant: %+v", quorum, healthyMembers, memberHealth)
		return false
	}
	return true
}
// IsQuorumFaultTolerantErr performs the same check as IsQuorumFaultTolerant
// but returns the reason as an error instead of logging it.
//
// It returns nil when both the total member count and the healthy member count
// exceed the quorum computed by MinimumTolerableQuorum; a failure to compute
// the quorum is wrapped in the returned error.
func IsQuorumFaultTolerantErr(memberHealth []healthCheck) error {
	totalMembers := len(memberHealth)
	quorum, err := MinimumTolerableQuorum(totalMembers)
	if err != nil {
		return fmt.Errorf("etcd cluster could not determine minimum quorum required. total number of members is %v. minimum quorum required is %v: %w", totalMembers, quorum, err)
	}

	healthyMembers := len(GetHealthyMemberNames(memberHealth))

	// Even with every member healthy there is no spare member beyond quorum.
	if totalMembers <= quorum {
		return fmt.Errorf("etcd cluster has quorum of %d which is not fault tolerant: %+v", quorum, memberHealth)
	}
	// Losing one more healthy member would drop the cluster below quorum.
	if healthyMembers <= quorum {
		return fmt.Errorf("etcd cluster has quorum of %d and %d healthy members which is not fault tolerant: %+v", quorum, healthyMembers, memberHealth)
	}
	return nil
}
// IsClusterHealthy reports whether the given health results contain no
// unhealthy member.
func IsClusterHealthy(memberHealth memberHealth) bool {
	return len(memberHealth.GetUnhealthyMembers()) == 0
}
// raftTermsCollector is a Prometheus collector to re-expose raft terms as a counter.
type raftTermsCollector struct {
// desc is the descriptor sent by Describe and attached to every metric
// emitted by Collect.
desc *prometheus.Desc
// terms maps a member name to the raft term last recorded for it via Set.
terms map[string]uint64
// lock guards terms: Set and Forget take it for writing, List and Collect
// for reading.
lock sync.RWMutex
}
// Describe sends the collector's single metric descriptor to ch.
func (c *raftTermsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.desc
}
// Set records value as the raft term for member, replacing any previously
// stored value. The write lock is held for the duration of the update.
func (c *raftTermsCollector) Set(member string, value uint64) {
c.lock.Lock()
defer c.lock.Unlock()
c.terms[member] = value
}
// Forget removes the raft term stored for member. Deleting a member that has
// no entry is a no-op. The write lock is held for the duration of the removal.
func (c *raftTermsCollector) Forget(member string) {
c.lock.Lock()
defer c.lock.Unlock()
delete(c.terms, member)
}
// List returns the names of all members that currently have a raft term
// recorded. The order follows map iteration and is therefore unspecified;
// the result is nil when no terms are recorded. The read lock is held while
// the names are copied.
func (c *raftTermsCollector) List() []string {
c.lock.RLock()
defer c.lock.RUnlock()
var members []string
for member := range c.terms {
members = append(members, member)
}
return members
}
// Collect sends one counter-typed constant metric per recorded member to ch,
// using the member name as the label value and the stored raft term as the
// metric value. The read lock is held while the metrics are emitted.
func (c *raftTermsCollector) Collect(ch chan<- prometheus.Metric) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	for name, term := range c.terms {
		metric := prometheus.MustNewConstMetric(c.desc, prometheus.CounterValue, float64(term), name)
		ch <- metric
	}
}
// MinimumTolerableQuorum returns the quorum size for a cluster with the given
// number of members, computed as members/2 + 1 using integer division.
// It returns 0 and an error when members is not positive.
func MinimumTolerableQuorum(members int) (int, error) {
	if members < 1 {
		return 0, fmt.Errorf("invalid etcd member length: %v", members)
	}
	quorum := members/2 + 1
	return quorum, nil
}