-
Notifications
You must be signed in to change notification settings - Fork 5
/
health.go
97 lines (83 loc) · 2.72 KB
/
health.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package embetcd
import (
"context"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"path"
"sync"
"time"
cli "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/types"
)
// memberHealth tracks a single cluster member: when it was first discovered,
// when it was last observed to be healthy, and the client used to probe it.
type memberHealth struct {
	// Name is the name of the member in the cluster.
	Name string
	// Discovered is the time the member was first discovered; it is used to
	// account for member startup time.
	Discovered time.Time
	// LastHealthy is the last time the member was seen healthy.
	LastHealthy time.Time
	// Client is used to check the member's health directly. It is created
	// lazily on the first health check and cached here for reuse.
	Client *Client
	// ClientURLs are the client URLs of the member.
	ClientURLs []string
}
// healthCheckViaMemberHealth checks
func healthCheckViaMemberHealth(timeout context.Context, m *memberHealth) (healthKey bool) {
if m.ClientURLs != nil && len(m.ClientURLs) > 0 {
// create a new client
var err error
if m.Client == nil {
m.Client, err = NewClient(cli.Config{Endpoints: m.ClientURLs[:], Context: timeout})
}
// set client endpoints
if err == nil && m.Client != nil {
m.Client.SetEndpoints(m.ClientURLs[:]...)
if _, err := m.Client.Get(timeout, "health"); err == nil || err == rpctypes.ErrPermissionDenied {
healthKey = true
}
}
}
return healthKey
}
// isMemberHealthy reports whether member m appears healthy. Two probes run
// concurrently, both bounded by DefaultDialTimeout and by ctx:
//   - a direct health check against the member's own client URLs, and
//   - a lookup of the member's key ("members/<name>") through clusterClient.
//
// The member is considered healthy if either probe succeeds. A nil
// clusterClient skips the key lookup.
func isMemberHealthy(ctx context.Context, m *memberHealth, clusterClient *Client) bool {
	timeout, cancel := context.WithTimeout(ctx, DefaultDialTimeout)
	defer cancel()

	// memberKey is written only by the goroutine below and read after
	// wg.Wait, which orders the write before the read.
	var memberKey bool
	var wg sync.WaitGroup
	if clusterClient != nil {
		wg.Add(1)
		go func() {
			defer wg.Done()
			resp, err := clusterClient.Get(timeout, path.Join("members", m.Name))
			memberKey = err == nil && len(resp.Kvs) > 0
		}()
	}

	// probe the member directly on this goroutine while the key lookup runs
	healthKey := healthCheckViaMemberHealth(timeout, m)

	wg.Wait()
	return healthKey || memberKey
}
// updateClusterHealthStats checks the health of each member and update times last seen healthy
func updateClusterHealthStats(ctx context.Context, clusterClient *Client, cache map[types.ID]*memberHealth) {
// create a wait group to wait for health status from each member of the cluster
wg := sync.WaitGroup{}
// get the cluster member health concurrently
for _, m := range cache {
// fetch the health of the member in a separate go routine
wg.Add(1)
go func(ctx context.Context, m *memberHealth, clusterClient *Client) {
if isMemberHealthy(ctx, m, clusterClient) {
m.LastHealthy = time.Now()
}
wg.Done()
}(ctx, m, clusterClient)
}
wg.Wait()
}