-
Notifications
You must be signed in to change notification settings - Fork 29
/
unmountedmonitor.go
420 lines (405 loc) · 16.4 KB
/
unmountedmonitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
package tools
// In /etc/hummingbird/andrewd-server.conf:
// [unmounted-monitor]
// initial_delay = 1 # seconds to wait between requests for the first pass
// pass_time_target = 600 # seconds to try to make subsequent passes take
// report_interval = 60 # seconds between progress reports
// state_retention = 86400 # seconds to retain state entries
// server_down_limit = 14400 # seconds a server can be down before removal
// device_unmounted_limit = 3600 # seconds a device can be unmounted before removal
// ignore_duration = 14400 # seconds to ignore a device's state change after updating its state in a ring
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"sync/atomic"
"time"
"github.com/troubling/hummingbird/common/ring"
"github.com/uber-go/tally"
"go.uber.org/zap"
)
// unmountedMonitor watches every ring device's recon diskusage endpoint,
// recording server up/down and device mounted/unmounted state, and removes
// servers or devices from the ring builders once they have been in a bad
// state longer than the configured limits.
type unmountedMonitor struct {
	aa *AutoAdmin
	// delay between each request; adjusted each pass to try to make passes last passTimeTarget
	delay time.Duration
	// passTimeTarget is the desired duration of one full pass.
	passTimeTarget time.Duration
	// reportInterval is how often progress is reported during a pass.
	reportInterval time.Duration
	// stateRetention is how long server/device state entries are retained.
	stateRetention time.Duration
	// serverDownLimit is how long a server can be down before removal.
	serverDownLimit time.Duration
	// deviceUnmountedLimit is how long a device can be unmounted before removal.
	deviceUnmountedLimit time.Duration
	// ignoreDuration is how long a device's state changes are ignored after
	// updating its state in a ring.
	ignoreDuration time.Duration
	// ignore maps an ip:port/device/type/policy key to the time until which
	// ring-builder updates for that key are suppressed.
	ignore map[string]time.Time
	// Metrics registered against the AutoAdmin metrics scope.
	passesMetric           tally.Timer
	queriesMetric          tally.Counter
	errorsMetric           tally.Counter
	serversUpMetric        tally.Counter
	serversDownMetric      tally.Counter
	devicesMountedMetric   tally.Counter
	devicesUnmountedMetric tally.Counter
	serverRemovals         tally.Counter
	deviceRemovals         tally.Counter
}
// newUnmountedMonitor builds an unmountedMonitor from the
// [unmounted-monitor] section of the andrewd server configuration and
// registers its metrics with the AutoAdmin metrics scope.
func newUnmountedMonitor(aa *AutoAdmin) *unmountedMonitor {
	// secs reads one [unmounted-monitor] setting as a duration in seconds.
	secs := func(key string, dflt int64) time.Duration {
		return time.Duration(aa.serverconf.GetInt("unmounted-monitor", key, dflt)) * time.Second
	}
	scope := aa.metricsScope
	um := &unmountedMonitor{
		aa:                     aa,
		delay:                  secs("initial_delay", 10),
		passTimeTarget:         secs("pass_time_target", 600),
		reportInterval:         secs("report_interval", 60),
		stateRetention:         secs("state_retention", 86400),
		serverDownLimit:        secs("server_down_limit", 14400),
		deviceUnmountedLimit:   secs("device_unmounted_limit", 3600),
		ignoreDuration:         secs("ignore_duration", 14400),
		ignore:                 map[string]time.Time{},
		passesMetric:           scope.Timer("unmounted_passes"),
		queriesMetric:          scope.Counter("unmounted_queries"),
		errorsMetric:           scope.Counter("unmounted_errors"),
		serversUpMetric:        scope.Counter("unmounted_server_up_hits"),
		serversDownMetric:      scope.Counter("unmounted_server_down_hits"),
		devicesMountedMetric:   scope.Counter("unmounted_device_mounted_hits"),
		devicesUnmountedMetric: scope.Counter("unmounted_device_unmounted_hits"),
		serverRemovals:         scope.Counter("unmounted_server_removals"),
		deviceRemovals:         scope.Counter("unmounted_device_removals"),
	}
	// Clamp nonsensical negative settings to one second.
	for _, d := range []*time.Duration{&um.delay, &um.passTimeTarget, &um.reportInterval} {
		if *d < 0 {
			*d = time.Second
		}
	}
	return um
}
// runForever repeatedly executes monitor passes, sleeping between them
// for the duration each pass requests, until a pass signals shutdown by
// returning a negative duration.
func (um *unmountedMonitor) runForever() {
	for sleepFor := um.runOnce(); sleepFor >= 0; sleepFor = um.runOnce() {
		time.Sleep(sleepFor)
	}
}
// runOnce performs a single monitoring pass: it queries the recon
// diskusage endpoint of every device in every ring, records server
// up/down and device mounted/unmounted observations, and triggers
// removals for anything bad for longer than the configured limits.
// It returns how long the caller should sleep before the next pass
// (never negative).
func (um *unmountedMonitor) runOnce() time.Duration {
	defer um.passesMetric.Start().Stop()
	// reconData mirrors one entry of the JSON array the /recon/diskusage
	// endpoint returns.
	type reconData struct {
		Device  string
		Mounted bool
		Size    int64
		Used    int64
	}
	start := time.Now()
	logger := um.aa.logger.With(zap.String("process", "unmounted monitor"))
	logger.Debug("starting pass")
	if err := um.aa.db.startProcessPass("unmounted monitor", "", 0); err != nil {
		logger.Error("startProcessPass", zap.Error(err))
	}
	// Counters shared with the progress-reporting goroutine below; always
	// accessed via sync/atomic.
	var delays int64
	var errors int64
	var serversUp int64
	var serversDown int64
	var devicesMounted int64
	var devicesUnmounted int64
	endpoints := um.reconUnmountedEndpoints()
	cancel := make(chan struct{})
	progressDone := make(chan struct{})
	// Progress reporter: logs and records a progress line every
	// reportInterval until cancel is closed.
	go func() {
		for {
			select {
			case <-cancel:
				close(progressDone)
				return
			case <-time.After(um.reportInterval):
				d := atomic.LoadInt64(&delays)
				e := atomic.LoadInt64(&errors)
				up := atomic.LoadInt64(&serversUp)
				down := atomic.LoadInt64(&serversDown)
				mounted := atomic.LoadInt64(&devicesMounted)
				unmounted := atomic.LoadInt64(&devicesUnmounted)
				var eta time.Duration
				if d > 0 {
					// Linear extrapolation from endpoints handled so far.
					eta = time.Duration(int64(time.Since(start)) / d * (int64(len(endpoints)) - d))
				}
				logger.Debug("progress", zap.Int64("endpoints so far", d), zap.Int("total endpoints", len(endpoints)), zap.Int64("errors", e), zap.Int64("servers up", up), zap.Int64("servers down", down), zap.Int64("mounted devices", mounted), zap.Int64("unmounted devices", unmounted), zap.String("eta", eta.String()))
				if err := um.aa.db.progressProcessPass("unmounted monitor", "", 0, fmt.Sprintf("%d of %d endpoints, %d errors, %d/%d servers up/down, %d/%d devices mounted/unmounted, eta %s", d, len(endpoints), e, up, down, mounted, unmounted, eta)); err != nil {
					logger.Error("progressProcessPass", zap.Error(err))
				}
			}
		}
	}()
	for url, endpoint := range endpoints {
		atomic.AddInt64(&delays, 1)
		um.queriesMetric.Inc(1)
		time.Sleep(um.delay)
		reconLogger := logger.With(zap.String("method", "GET"), zap.String("url", url))
		req, err := http.NewRequest("GET", url, nil)
		if err != nil {
			reconLogger.Error("http.NewRequest", zap.Error(err))
			atomic.AddInt64(&errors, 1)
			um.errorsMetric.Inc(1)
			continue
		}
		req.Header.Set("User-Agent", "Andrewd")
		resp, err := um.aa.client.Do(req)
		if err != nil {
			// A transport-level failure counts as the server being down.
			reconLogger.Error("Do", zap.Error(err))
			atomic.AddInt64(&serversDown, 1)
			um.serversDownMetric.Inc(1)
			um.serverDown(reconLogger, endpoint.ip, endpoint.port)
			continue
		}
		body, err := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			reconLogger.Error("Body", zap.Int("StatusCode", resp.StatusCode), zap.Error(err))
			atomic.AddInt64(&serversDown, 1)
			um.serversDownMetric.Inc(1)
			um.serverDown(reconLogger, endpoint.ip, endpoint.port)
			continue
		}
		if resp.StatusCode/100 != 2 {
			// err is necessarily nil here (the read above succeeded), so the
			// previous zap.Error(err) field was a no-op and has been dropped.
			reconLogger.Error("StatusCode", zap.Int("StatusCode", resp.StatusCode))
			atomic.AddInt64(&serversDown, 1)
			um.serversDownMetric.Inc(1)
			um.serverDown(reconLogger, endpoint.ip, endpoint.port)
			continue
		}
		var items []*reconData
		if err := json.Unmarshal(body, &items); err != nil {
			reconLogger.Error("JSON", zap.String("JSON", string(body)), zap.Error(err))
			atomic.AddInt64(&serversDown, 1)
			um.serversDownMetric.Inc(1)
			um.serverDown(reconLogger, endpoint.ip, endpoint.port)
			continue
		}
		atomic.AddInt64(&serversUp, 1)
		um.serversUpMetric.Inc(1)
		um.serverUp(reconLogger, endpoint.ip, endpoint.port)
		for _, item := range items {
			devLogger := reconLogger.With(zap.String("device", item.Device))
			if item.Mounted {
				atomic.AddInt64(&devicesMounted, 1)
				um.devicesMountedMetric.Inc(1)
				um.deviceMounted(devLogger, endpoint.ip, endpoint.port, item.Device, item.Size, item.Used)
			} else {
				atomic.AddInt64(&devicesUnmounted, 1)
				um.devicesUnmountedMetric.Inc(1)
				um.deviceUnmounted(devLogger, endpoint.ip, endpoint.port, item.Device)
			}
		}
	}
	close(cancel)
	<-progressDone
	// Adjust the per-request delay so the next pass takes about
	// passTimeTarget. Guard the division: with zero endpoints, delays
	// stays 0 and the unguarded division panics (integer divide by zero).
	if delays > 0 {
		um.delay = um.passTimeTarget / time.Duration(delays)
	}
	sleepFor := time.Until(start.Add(um.passTimeTarget))
	if sleepFor < 0 {
		sleepFor = 0
	}
	logger.Debug("pass complete", zap.Int64("errors", errors), zap.Int64("servers up", serversUp), zap.Int64("servers down", serversDown), zap.Int64("devices mounted", devicesMounted), zap.Int64("devices unmounted", devicesUnmounted), zap.String("next delay", um.delay.String()), zap.String("sleep for", sleepFor.String()))
	if err := um.aa.db.progressProcessPass("unmounted monitor", "", 0, fmt.Sprintf("%d of %d endpoints, %d errors, %d/%d servers up/down, %d/%d devices mounted/unmounted", delays, len(endpoints), errors, serversUp, serversDown, devicesMounted, devicesUnmounted)); err != nil {
		logger.Error("progressProcessPass", zap.Error(err))
	}
	if err := um.aa.db.completeProcessPass("unmounted monitor", "", 0); err != nil {
		logger.Error("completeProcessPass", zap.Error(err))
	}
	return sleepFor
}
// endpointIPPort identifies the server a recon URL belongs to.
type endpointIPPort struct {
	ip   string
	port int
}
// reconUnmountedEndpoints returns one /recon/diskusage URL per unique
// scheme://ip:port across the account, container, and all object-policy
// rings, mapped to the owning server's ip and port. Devices with a
// negative weight (marked for removal) are skipped.
func (um *unmountedMonitor) reconUnmountedEndpoints() map[string]*endpointIPPort {
	endpointMap := map[string]*endpointIPPort{}
	// addDevices records a recon endpoint for every active device in one
	// ring. The ring-lookup error used to be silently discarded, which
	// risked a nil-pointer panic on AllDevices; unavailable rings are now
	// skipped instead.
	addDevices := func(ryng ring.Ring, err error) {
		if err != nil || ryng == nil {
			return
		}
		for _, dev := range ryng.AllDevices() {
			if dev == nil || dev.Weight < 0 {
				continue
			}
			endpointMap[fmt.Sprintf("%s://%s:%d/recon/diskusage", dev.Scheme, dev.Ip, dev.Port)] = &endpointIPPort{ip: dev.Ip, port: dev.Port}
		}
	}
	for _, typ := range []string{"account", "container", "object"} {
		if typ == "object" {
			for _, policy := range um.aa.policies {
				addDevices(getRing("", typ, policy.Index))
			}
		} else {
			addDevices(getRing("", typ, 0))
		}
	}
	return endpointMap
}
// serverUp records an "up" observation for the given server, pruning state
// entries older than the retention window.
func (um *unmountedMonitor) serverUp(logger *zap.Logger, ip string, port int) {
	retainAfter := time.Now().Add(-um.stateRetention)
	err := um.aa.db.addServerState(ip, port, true, retainAfter)
	if err != nil {
		logger.Error("could not add server up state", zap.Error(err))
	}
}
// serverDown records a "down" observation for the given server and, once the
// server has been down for at least serverDownLimit, removes all of its
// devices from the ring builders.
func (um *unmountedMonitor) serverDown(logger *zap.Logger, ip string, port int) {
	var lastUp time.Time
	states, err := um.aa.db.serverStates(ip, port)
	if err != nil {
		logger.Error("could not retrieve server states", zap.Error(err))
		return
	}
	// Find the most recent "up" entry.
	// NOTE(review): this, and the states[0] check below, assume serverStates
	// returns entries newest-first — confirm against the db implementation.
	for _, entry := range states {
		if entry.state {
			lastUp = entry.recorded
			break
		}
	}
	// This is to counteract the "andrewd hasn't been running for a while"
	// scenario. If there are no entries since server_down_limit we'll make a
	// fake "up" entry. This does mean that the pass_time_target and the actual
	// pass times need to be faster than the server_down_limit.
	if len(states) < 1 || time.Since(states[0].recorded) > um.serverDownLimit {
		um.serverUp(logger, ip, port)
		lastUp = time.Now()
	}
	if err := um.aa.db.addServerState(ip, port, false, time.Now().Add(-um.stateRetention)); err != nil {
		logger.Error("could not add server down state", zap.Error(err))
		return
	}
	// If there were no "up" entries...
	if lastUp.IsZero() {
		// Just pretend it was last "up" at the time of the oldest entry.
		if len(states) > 0 {
			lastUp = states[len(states)-1].recorded
		} else {
			lastUp = time.Now()
		}
	}
	// Only act once the server has been down at least serverDownLimit.
	if time.Since(lastUp) < um.serverDownLimit {
		return
	}
	// Empty device string means "every device on this server".
	um.removeFromBuilders(logger, ip, port, "")
	um.serverRemovals.Inc(1)
}
// deviceMounted records a "mounted" observation, along with the device's
// size and usage, pruning state entries older than the retention window.
func (um *unmountedMonitor) deviceMounted(logger *zap.Logger, ip string, port int, device string, size, used int64) {
	logger.Debug("device mounted")
	retainAfter := time.Now().Add(-um.stateRetention)
	err := um.aa.db.addDeviceState(ip, port, device, true, retainAfter, size, used)
	if err != nil {
		logger.Error("could not add device mounted state", zap.Error(err))
	}
}
// deviceUnmounted records an "unmounted" observation for the device and,
// once it has been unmounted for at least deviceUnmountedLimit, removes it
// from the ring builders.
func (um *unmountedMonitor) deviceUnmounted(logger *zap.Logger, ip string, port int, device string) {
	logger.Debug("device unmounted")
	var lastUp time.Time
	var size int64
	var used int64
	states, err := um.aa.db.deviceStates(ip, port, device)
	if err != nil {
		logger.Error("could not retrieve device states", zap.Error(err))
		return
	}
	// Find the most recent "mounted" entry and its recorded usage figures.
	// NOTE(review): this, and the states[0] check below, assume deviceStates
	// returns entries newest-first — confirm against the db implementation.
	for _, entry := range states {
		if entry.state {
			lastUp = entry.recorded
			size = entry.size
			used = entry.used
			break
		}
	}
	// This is to counteract the "andrewd hasn't been running for a while"
	// scenario. If there are no entries since device_unmounted_limit we'll
	// make a fake mounted entry. This does mean that the pass_time_target and
	// the actual pass times need to be faster than the device_unmounted_limit.
	if len(states) < 1 || time.Since(states[0].recorded) > um.deviceUnmountedLimit {
		um.deviceMounted(logger, ip, port, device, size, used)
		lastUp = time.Now()
	}
	logger.Debug("device unmounted, last up initially", zap.String("last up", lastUp.String()))
	if err := um.aa.db.addDeviceState(ip, port, device, false, time.Now().Add(-um.stateRetention), 0, 0); err != nil {
		logger.Error("could not add device unmounted state", zap.Error(err))
	}
	// If there were no "up" entries...
	if lastUp.IsZero() {
		// Just pretend it was last "up" at the time of the oldest entry.
		if len(states) > 0 {
			lastUp = states[len(states)-1].recorded
		} else {
			lastUp = time.Now()
		}
		logger.Debug("device unmounted, last up assumed", zap.String("last up", lastUp.String()))
	}
	logger.Debug("device unmounted, duration check", zap.String("last up", lastUp.String()), zap.String("limit", um.deviceUnmountedLimit.String()), zap.String("elapsed", time.Since(lastUp).String()))
	// Only act once the device has been unmounted at least deviceUnmountedLimit.
	if time.Since(lastUp) < um.deviceUnmountedLimit {
		logger.Debug("device unmounted, but not long enough yet")
		return
	}
	logger.Debug("device unmounted, it's time to remove it")
	um.removeFromBuilders(logger, ip, port, device)
	um.deviceRemovals.Inc(1)
	logger.Debug("device unmounted, done trying to remove it")
}
// removeFromBuilders removes the device (or, when device is "", every
// matching device on the server) from the account, container, and all
// object-policy ring builders.
func (um *unmountedMonitor) removeFromBuilders(logger *zap.Logger, ip string, port int, device string) {
	for _, typ := range []string{"account", "container", "object"} {
		if typ != "object" {
			um.removeFromBuilder(logger, ip, port, device, typ, 0)
			continue
		}
		for _, policy := range um.aa.policies {
			um.removeFromBuilder(logger, ip, port, device, typ, policy.Index)
		}
	}
}
// removeFromBuilder removes the given device (or every matching device on
// the server, when device is "") from one ring builder, then saves and
// rebalances it. After acting — or after finding nothing to change — the
// ignore map suppresses further attempts on this key for ignoreDuration,
// giving the new rings time to propagate.
func (um *unmountedMonitor) removeFromBuilder(logger *zap.Logger, ip string, port int, device, typ string, policy int) {
	ignoreKey := fmt.Sprintf("%s:%d/%s/%s/%d", ip, port, device, typ, policy)
	if until, ok := um.ignore[ignoreKey]; ok && time.Now().Before(until) {
		logger.Debug("ignoring", zap.String("ignore key", ignoreKey), zap.String("until", until.String()))
		return
	}
	ringBuilder, ringBuilderFilePath, err := ring.GetRingBuilder(typ, policy)
	if err != nil {
		logger.Error("Could not find builder", zap.String("type", typ), zap.Int("policy", policy), zap.Error(err))
		return
	}
	ringBuilderLock, err := ring.LockBuilderPath(ringBuilderFilePath)
	if err != nil {
		logger.Error("Could not lock builder path", zap.String("type", typ), zap.Int("policy", policy), zap.String("ring builder file path", ringBuilderFilePath), zap.Error(err))
		return
	}
	defer ringBuilderLock.Close()
	// Re-read the builder now that the lock is held; it may have changed
	// between the first read and acquiring the lock.
	ringBuilder, ringBuilderFilePath, err = ring.GetRingBuilder(typ, policy)
	if err != nil {
		logger.Error("Could not find builder after lock", zap.String("type", typ), zap.Int("policy", policy), zap.Error(err))
		return
	}
	changed := false
	for _, dev := range ringBuilder.SearchDevs(-1, -1, ip, int64(port), "", -1, device, -1, "", "") {
		if dev.Weight < 0 {
			continue // already marked for removal
		}
		ringBuilder.RemoveDev(dev.Id, false)
		changed = true
		if device == "" {
			um.aa.db.addRingLog(typ, policy, fmt.Sprintf("server %s:%d down; removed device %s id:%d", ip, port, dev.Device, dev.Id))
		} else {
			um.aa.db.addRingLog(typ, policy, fmt.Sprintf("removed unmounted device %s id:%d on server %s:%d", dev.Device, dev.Id, ip, port))
		}
	}
	if !changed {
		um.ignore[ignoreKey] = time.Now().Add(um.ignoreDuration)
		logger.Debug("not changed; now ignoring", zap.String("ignore key", ignoreKey), zap.String("until", um.ignore[ignoreKey].String()))
		return
	}
	if err = ringBuilder.Save(ringBuilderFilePath); err != nil {
		// A failed save previously fell through and rebalanced the stale
		// on-disk builder while logging success ring-log entries. Bail out
		// instead; the key is left un-ignored so the removal is retried on
		// the next pass.
		logger.Error("Error while saving builder", zap.String("type", typ), zap.Int("policy", policy), zap.String("path", ringBuilderFilePath), zap.Error(err))
		return
	}
	if _, _, _, err = ring.Rebalance(ringBuilderFilePath, false, false, true); err != nil {
		logger.Error("Error while rebalancing", zap.String("type", typ), zap.Int("policy", policy), zap.String("path", ringBuilderFilePath), zap.Error(err))
	}
	if device == "" {
		um.aa.db.addRingLog(typ, policy, fmt.Sprintf("rebalanced due to downed server %s:%d", ip, port))
	} else {
		um.aa.db.addRingLog(typ, policy, fmt.Sprintf("rebalanced due to downed device %s on %s:%d", device, ip, port))
	}
	// NOTE: The ringmonitor.go will detect the above ring changes on disk and
	// initiate a fastscan for ringscan.go to push out the new rings.
	um.ignore[ignoreKey] = time.Now().Add(um.ignoreDuration)
	logger.Debug("changed; will now ignore", zap.String("ignore key", ignoreKey), zap.String("until", um.ignore[ignoreKey].String()))
}