package watch

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"time"

	"github.com/square/p2/pkg/health"
	"github.com/square/p2/pkg/kp"
	"github.com/square/p2/pkg/logging"
	"github.com/square/p2/pkg/pods"
	"github.com/square/p2/pkg/preparer"
	"github.com/square/p2/pkg/types"
	"github.com/square/p2/pkg/util/param"
)

// These constants should probably all be something the p2 user can set
// in their preparer config...

// Duration between health checks.
const HEALTHCHECK_INTERVAL = 1 * time.Second

// Maximum allowed time for a single check, in seconds.
var HEALTHCHECK_TIMEOUT = param.Int64("healthcheck_timeout", 5)

// This package watches the consul reality store to track the services
// running on a node. MonitorPodHealth is the manager method: it tracks
// the reality store and manages a health-checking goroutine for each
// service in the reality store.
// PodWatch houses a pod's manifest, a channel to kill the
// pod's goroutine if the pod is removed from the reality
// tree, and a bool that indicates whether or not the pod
// has a running MonitorHealth go routine
type PodWatch struct {
manifest pods.Manifest
updater kp.HealthUpdater
statusChecker StatusChecker
// For tracking/controlling the go routine that performs health checks
// on the pod associated with this PodWatch
shutdownCh chan bool
logger *logging.Logger
}
// StatusChecker holds all the data required to perform
// a status check on a particular service
type StatusChecker struct {
ID types.PodID
Node string
URI string
Client *http.Client
}

// MonitorPodHealth is meant to be a long-running goroutine.
// It reads from the consul store to determine which services
// should be running on the host, runs a MonitorHealth routine
// for each of those services, and kills the routines of services
// that should no longer be running.
func MonitorPodHealth(config *preparer.PreparerConfig, logger *logging.Logger, shutdownCh chan struct{}) {
store, err := config.GetStore()
if err != nil {
// A bad config should have already produced a nice, user-friendly error message.
logger.WithError(err).Fatalln("error creating health monitor KV store")
}
healthManager := store.NewHealthManager(config.NodeName, *logger)
	// If GetClient fails it means the certfile/keyfile/cafile were
	// invalid or did not exist, so it makes sense to throw a fatal error.
client, err := config.GetClient(time.Duration(*HEALTHCHECK_TIMEOUT) * time.Second)
if err != nil {
logger.WithError(err).Fatalln("failed to get http client for this preparer")
}
node := config.NodeName
	podWatches := []PodWatch{}
watchQuitCh := make(chan struct{})
watchErrCh := make(chan error)
watchPodCh := make(chan []kp.ManifestResult)
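	// WatchPods streams this node's reality tree: manifest results
	// arrive on watchPodCh and errors on watchErrCh.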
go store.WatchPods(
kp.REALITY_TREE,
node,
watchQuitCh,
watchErrCh,
watchPodCh,
)
	for {
		select {
		case results := <-watchPodCh:
			// Check whether pods have been added or removed:
			// start a monitor routine for each new pod and kill
			// the routines of pods that have been removed.
			podWatches = updatePods(healthManager, client, podWatches, results, node, logger)
		case err := <-watchErrCh:
			logger.WithError(err).Errorln("there was an error reading reality manifests for health monitor")
		case <-shutdownCh:
			for _, pod := range podWatches {
				pod.shutdownCh <- true
			}
			healthManager.Close()
			close(watchQuitCh)
			return
		}
	}
}
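
// A minimal sketch of how a caller might wire this up (hypothetical
// names; a real caller would supply its own preparer config and logger):
//
//	quit := make(chan struct{})
//	go watch.MonitorPodHealth(preparerConfig, &logger, quit)
//	// ... later, on shutdown:
//	close(quit)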

// updatePods compares the services currently being monitored with the
// services that should be monitored, and returns the reconciled set of
// PodWatches.
func updatePods(
healthManager kp.HealthManager,
client *http.Client,
current []PodWatch,
reality []kp.ManifestResult,
node string,
logger *logging.Logger,
) []PodWatch {
newCurrent := []PodWatch{}
// for pod in current if pod not in reality: kill
for _, pod := range current {
inReality := false
for _, man := range reality {
if man.Manifest.ID() == pod.manifest.ID() &&
man.Manifest.GetStatusHTTP() == pod.manifest.GetStatusHTTP() &&
man.Manifest.GetStatusPath() == pod.manifest.GetStatusPath() &&
man.Manifest.GetStatusPort() == pod.manifest.GetStatusPort() {
inReality = true
break
}
}
		// If this podwatch is not in the reality store, kill its goroutine;
		// otherwise keep it in newCurrent.
		if !inReality {
pod.shutdownCh <- true
} else {
newCurrent = append(newCurrent, pod)
}
}
// for pod in reality if pod not in current: create podwatch and
// append to current
for _, man := range reality {
missing := true
for _, pod := range newCurrent {
if man.Manifest.ID() == pod.manifest.ID() {
missing = false
break
}
}
		// If a manifest is in reality but not in current, create a PodWatch
		// with that manifest and add it to newCurrent.
if missing {
sc := StatusChecker{
ID: man.Manifest.ID(),
Node: node,
Client: client,
}
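			// A status port of 0 means the manifest defines no status check;
			// otherwise GetStatusHTTP chooses between plain HTTP and HTTPS.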
if man.Manifest.GetStatusPort() == 0 {
sc.URI = ""
} else if man.Manifest.GetStatusHTTP() {
sc.URI = fmt.Sprintf("http://%s:%d%s", node, man.Manifest.GetStatusPort(), man.Manifest.GetStatusPath())
} else {
sc.URI = fmt.Sprintf("https://%s:%d%s", node, man.Manifest.GetStatusPort(), man.Manifest.GetStatusPath())
}
newPod := PodWatch{
manifest: man.Manifest,
updater: healthManager.NewUpdater(man.Manifest.ID(), string(man.Manifest.ID())),
statusChecker: sc,
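				// Buffered so updatePods can signal shutdown without
				// blocking while the monitor is mid-check.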
shutdownCh: make(chan bool, 1),
logger: logger,
}
			// Each health monitor runs in its own goroutine with its own statusChecker.
go newPod.MonitorHealth()
newCurrent = append(newCurrent, newPod)
}
}
return newCurrent
}

// MonitorHealth is a long-running goroutine that lives for as long as
// the service it is monitoring. Every HEALTHCHECK_INTERVAL it performs
// a health check and writes the result to consul.
func (p *PodWatch) MonitorHealth() {
for {
select {
case <-time.After(HEALTHCHECK_INTERVAL):
p.checkHealth()
case <-p.shutdownCh:
p.updater.Close()
return
}
}
}

func (p *PodWatch) checkHealth() {
	res, err := p.statusChecker.Check()
	if err != nil {
		p.logger.WithError(err).Warningln("health check failed")
		return
	}
	p.updater.PutHealth(resToKPRes(res))
}

// Check performs the configured status check, if any, and creates a
// health.Result for this node/service from the outcome.
func (sc *StatusChecker) Check() (health.Result, error) {
	if sc.URI != "" {
		return sc.resultFromCheck(sc.StatusCheck())
	}

	// With no status check defined, report the service as passing.
	// "unknown" is probably more accurate, but automated tools can't handle an app that is
	// always non-"passing". For instance, p2-replicate by default waits for a node to
	// become "passing" before it considers the deployment a success.
	//
	// TODO: P2 has the capacity to check whether the app's process is running. This would
	// make a great default status check! However, that information isn't easily accessible
	// over here in the watch package. It would take a lot of refactoring to make this
	// happen.
	return health.Result{
		ID:      sc.ID,
		Node:    sc.Node,
		Service: string(sc.ID),
		Status:  health.Passing,
		Output:  "(no health check defined)",
	}, nil
}

// resultFromCheck converts the response (or error) from a status check
// into a health.Result for this node/service.
func (sc *StatusChecker) resultFromCheck(resp *http.Response, err error) (health.Result, error) {
res := health.Result{
ID: sc.ID,
Node: sc.Node,
Service: string(sc.ID),
}
if err != nil || resp == nil {
res.Status = health.Critical
if err != nil {
res.Output = err.Error()
}
return res, nil
}
res.Output, err = getBody(resp)
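	// Any 2xx response counts as passing; everything else is critical.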
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
res.Status = health.Passing
} else {
res.Status = health.Critical
}
return res, err
}

// StatusCheck performs the HTTP GET against the service's status URI.
func (sc *StatusChecker) StatusCheck() (*http.Response, error) {
return sc.Client.Get(sc.URI)
}
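
// For example, with hypothetical values, a pod on node "host1" whose
// manifest declares status port 8000 and status path "/_status" would
// be checked with:
//
//	GET http://host1:8000/_status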

// getBody reads the full response body and returns it as a string.
func getBody(resp *http.Response) (string, error) {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(body), nil
}

// resToKPRes converts a health.Result to the kp.WatchResult form
// expected by the health updater.
func resToKPRes(res health.Result) kp.WatchResult {
return kp.WatchResult{
Service: res.Service,
Node: res.Node,
Id: res.ID,
Status: string(res.Status),
Output: res.Output,
}
}