forked from streamingfast/node-manager
/
monitor.go
96 lines (79 loc) · 2.28 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package manageos
import (
"time"
"github.com/dfuse-io/dmetrics"
"go.uber.org/atomic"
)
type Readiness interface {
IsReady() bool
}
type MetricsAndReadinessManager struct {
headBlockChan chan *headBlock
headBlockTimeDrift *dmetrics.HeadTimeDrift
headBlockNumber *dmetrics.HeadBlockNum
readinessProbe *atomic.Bool
// ReadinessMaxLatency is the max delta between head block time and
// now before /healthz starts returning success
readinessMaxLatency time.Duration
}
func NewMetricsAndReadinessManager(headBlockTimeDrift *dmetrics.HeadTimeDrift, headBlockNumber *dmetrics.HeadBlockNum, readinessMaxLatency time.Duration) *MetricsAndReadinessManager {
return &MetricsAndReadinessManager{
headBlockChan: make(chan *headBlock, 1), // just for non-blocking, saving a few nanoseconds here
readinessProbe: atomic.NewBool(false),
headBlockTimeDrift: headBlockTimeDrift,
headBlockNumber: headBlockNumber,
readinessMaxLatency: readinessMaxLatency,
}
}
func (m *MetricsAndReadinessManager) setReadinessProbeOn() {
if m.readinessProbe.CAS(false, true) {
//m.Logger.Info("nodeos superviser is now assumed to be ready")
}
}
func (m *MetricsAndReadinessManager) setReadinessProbeOff() {
if m.readinessProbe.CAS(true, false) {
//m.Logger.Info("nodeos superviser is not ready anymore")
}
}
func (m *MetricsAndReadinessManager) IsReady() bool {
return m.readinessProbe.Load()
}
func (m *MetricsAndReadinessManager) Launch() {
var lastSeenBlock *headBlock
for {
select {
case block := <-m.headBlockChan:
lastSeenBlock = block
case <-time.After(time.Second):
}
if lastSeenBlock == nil {
continue
}
// metrics
if m.headBlockNumber != nil {
m.headBlockNumber.SetUint64(lastSeenBlock.Num)
}
if m.headBlockTimeDrift != nil {
m.headBlockTimeDrift.SetBlockTime(lastSeenBlock.Time)
}
// readiness
if m.readinessMaxLatency == 0 || time.Since(lastSeenBlock.Time) < m.readinessMaxLatency {
m.setReadinessProbeOn()
} else {
m.setReadinessProbeOff()
}
}
}
func (m *MetricsAndReadinessManager) UpdateHeadBlock(num uint64, ID string, t time.Time) {
m.headBlockChan <- &headBlock{
ID: ID,
Num: num,
Time: t,
}
}
type headBlock struct {
ID string
Num uint64
Time time.Time
}
type HeadBlockUpdater func(uint64, string, time.Time)