forked from Conflux-Chain/confura
-
Notifications
You must be signed in to change notification settings - Fork 14
/
node_status.go
213 lines (176 loc) · 6.11 KB
/
node_status.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package node
import (
"encoding/json"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/scroll-tech/rpc-gateway/util/metrics"
"github.com/sirupsen/logrus"
ring "github.com/zealws/golang-ring"
)
// HealthMonitor is implemented by any objects that support to monitor full node health.
// Implementations receive callbacks from Status.Update as each node's health
// state transitions; they are expected to aggregate per-node reports into an
// overall cluster view.
type HealthMonitor interface {
// HealthyEpoch returns the healthy epoch number among full nodes.
// Usually, it is the middle epoch number of all full nodes, and is used as
// the reference point to decide whether an individual node has fallen behind.
HealthyEpoch() uint64
// ReportEpoch fired when epoch changes.
ReportEpoch(nodeName string, epoch uint64)
// ReportUnhealthy fired when full node becomes unhealthy or unrecovered for a long time.
// remind is true when this is a periodic reminder for an already-unhealthy node
// rather than the initial transition into the unhealthy state.
ReportUnhealthy(nodeName string, remind bool, reason error)
// ReportHealthy fired when full node becomes healthy.
ReportHealthy(nodeName string)
}
// Status represents the node status, including current epoch number and health status.
type Status struct {
nodeName string // display name used in reports and JSON output
metric *statusMetrics // names of the latency/availability metrics for this node
latestStateEpoch uint64 // last epoch number successfully fetched from the node
successCounter uint64 // consecutive heartbeat successes; reset to 0 on any failure
failureCounter uint64 // consecutive heartbeat failures; reset to 0 on any success
unhealthy bool // true while the node is considered unhealthy
unhealthReportAt time.Time // when unhealthiness was last reported (zero value when healthy)
latestHeartBeatErrs *ring.Ring // ring buffer retaining the most recent heartbeat errors
}
// NewStatus creates a Status for the named node within the given group,
// registering its latency/availability metric names and allocating a ring
// buffer that keeps the most recent heartbeat errors (twice the configured
// failure threshold, so the history spans beyond one unhealthy transition).
func NewStatus(group Group, nodeName string) Status {
	errBuf := &ring.Ring{}
	errBuf.SetCapacity(int(2 * cfg.Monitor.Unhealth.Failures))

	space, groupName := group.Space(), group.String()
	return Status{
		nodeName: nodeName,
		metric: newStatusMetrics(
			metrics.Registry.Nodes.NodeLatency(space, groupName, nodeName),
			metrics.Registry.Nodes.NodeAvailability(space, groupName, nodeName),
		),
		latestHeartBeatErrs: errBuf,
	}
}
// Update heartbeats with node and updates health status.
// It first performs one heartbeat (refreshing counters/metrics), then
// reconciles this node's health state with the monitor — the order matters,
// since updateHealth reads the counters heartbeat just wrote.
func (s *Status) Update(n Node, monitor HealthMonitor) {
s.heartbeat(n)
s.updateHealth(monitor)
}
// MarshalJSON implements json.Marshaler, rendering a human-readable snapshot
// of the node status: availability as a percentage, latency percentiles in
// milliseconds, the raw counters, and the latest heartbeat error messages.
func (s *Status) MarshalJSON() ([]byte, error) {
	// Local view type so formatting concerns stay out of the Status struct.
	type statusView struct {
		NodeName            string   `json:"nodeName"`
		Availability        string   `json:"availability"`
		MeanLatency         string   `json:"meanLatency"`
		P99Latency          string   `json:"P99Latency"`
		P75Latency          string   `json:"P75Latency"`
		LatestStateEpoch    uint64   `json:"latestStateEpoch"`
		SuccessCounter      uint64   `json:"successCounter"`
		FailureCounter      uint64   `json:"failureCounter"`
		Unhealthy           bool     `json:"unhealthy"`
		UnhealthReportAt    string   `json:"unhealthReportAt"`
		LatestHeartBeatErrs []string `json:"latestHeartBeatErrs"`
	}

	latency := metrics.GetOrRegisterHistogram(s.metric.latency).Snapshot()
	availability := metrics.GetOrRegisterTimeWindowPercentageDefault(s.metric.availability).Value()

	view := statusView{
		NodeName:         s.nodeName,
		Availability:     fmt.Sprintf("%.2f%%", availability),
		MeanLatency:      fmt.Sprintf("%.2f(ms)", latency.Mean()/1e6),
		P99Latency:       fmt.Sprintf("%.2f(ms)", latency.Percentile(0.99)/1e6),
		P75Latency:       fmt.Sprintf("%.2f(ms)", latency.Percentile(0.75)/1e6),
		LatestStateEpoch: s.latestStateEpoch,
		SuccessCounter:   s.successCounter,
		FailureCounter:   s.failureCounter,
		Unhealthy:        s.unhealthy,
		UnhealthReportAt: s.unhealthReportAt.Format(time.RFC3339),
	}

	// Keep the nil slice when there are no errors so the field marshals as
	// `null`, exactly as before.
	for _, v := range s.latestHeartBeatErrs.Values() {
		view.LatestHeartBeatErrs = append(view.LatestHeartBeatErrs, v.(error).Error())
	}

	return json.Marshal(&view)
}
// heartbeat performs one liveness probe against the node: it fetches the
// latest epoch number, records latency/availability metrics, and maintains
// the consecutive success/failure counters (each reset by the other's event).
func (s *Status) heartbeat(n Node) {
	start := time.Now()
	epoch, err := n.LatestEpochNumber()
	s.metric.update(start, err)

	if err == nil { // happy path: refresh epoch and bump the success streak
		s.latestStateEpoch = epoch
		s.failureCounter = 0
		s.successCounter++
		return
	}

	// Probe failed: bump the failure streak and retain the error for the
	// JSON status report.
	s.failureCounter++
	s.successCounter = 0
	logrus.WithFields(logrus.Fields{
		"status": s, "reqTime": start,
	}).WithError(err).Info("Failed to heartbeat with node")
	s.latestHeartBeatErrs.Enqueue(err)
}
// updateHealth reconciles this node's health state against the monitor's
// healthy epoch and fires the appropriate report callback. The four cases:
// stay healthy, turn unhealthy, recover, or remain unhealthy (with periodic
// reminders).
func (s *Status) updateHealth(monitor HealthMonitor) {
	reason := s.checkHealth(monitor.HealthyEpoch())

	switch {
	case !s.unhealthy && reason == nil:
		// Healthy and staying healthy: just publish the current epoch.
		monitor.ReportEpoch(s.nodeName, s.latestStateEpoch)

	case !s.unhealthy:
		// Transition into the unhealthy state.
		s.unhealthy = true
		s.unhealthReportAt = time.Now()
		monitor.ReportUnhealthy(s.nodeName, false, reason)

	case reason == nil:
		// Currently unhealthy but passing checks: declare recovery only
		// after N consecutive successes, to avoid flapping.
		if s.successCounter >= cfg.Monitor.Recover.SuccessCounter {
			s.unhealthy = false
			s.unhealthReportAt = time.Time{}
			monitor.ReportHealthy(s.nodeName)
		}

	default:
		// Still unhealthy: remind every configured interval, even if the
		// node occasionally succeeded in between.
		remindTime := s.unhealthReportAt.Add(cfg.Monitor.Recover.RemindInterval)
		if now := time.Now(); now.After(remindTime) {
			monitor.ReportUnhealthy(s.nodeName, true, reason)
			s.unhealthReportAt = now
		}
	}
}
// checkHealth evaluates the collected node information against the configured
// unhealth thresholds and returns a non-nil reason on the first violated
// check (RPC failure streak, epoch lag, or latency percentile), nil if the
// node passes all of them.
func (s *Status) checkHealth(targetEpoch uint64) error {
	// Too many consecutive RPC failures?
	if s.failureCounter >= cfg.Monitor.Unhealth.Failures {
		return errors.Errorf("RPC failures (%v)", s.failureCounter)
	}

	// Epoch fallen too far behind the cluster's healthy epoch?
	// (Addition keeps the comparison safe for unsigned arithmetic.)
	if s.latestStateEpoch+cfg.Monitor.Unhealth.EpochsFallBehind < targetEpoch {
		return errors.Errorf("Epoch fall behind (%v)", targetEpoch-s.latestStateEpoch)
	}

	// Heartbeat latency percentile above the configured ceiling?
	snapshot := metrics.GetOrRegisterHistogram(s.metric.latency).Snapshot()
	observed := time.Duration(snapshot.Percentile(cfg.Monitor.Unhealth.LatencyPercentile))
	if observed > cfg.Monitor.Unhealth.MaxLatency {
		return errors.Errorf("Latency too high (%v)", observed)
	}

	return nil
}
// Close releases resources held by the status by unregistering its metrics
// from the registry, so they stop accumulating samples for a retired node.
func (s *Status) Close() {
s.metric.unregisterAll()
}
// statusMetrics holds the registry names of the per-node metrics tracked for
// a Status; the metric objects themselves are looked up on demand via the
// metrics package.
type statusMetrics struct {
latency string // ping latency via cfx_epochNumber/eth_blockNumber
availability string // node availability percent
}
// newStatusMetrics bundles the registry names used to track a node's
// heartbeat latency and availability.
func newStatusMetrics(latency, availability string) *statusMetrics {
	return &statusMetrics{latency: latency, availability: availability}
}
// update records the outcome of one heartbeat: availability is marked for
// every attempt, while latency is sampled only on success (failed probes
// would skew the distribution).
func (sm *statusMetrics) update(start time.Time, err error) {
	succeeded := err == nil
	if succeeded {
		metrics.GetOrRegisterHistogram(sm.latency).Update(time.Since(start).Nanoseconds())
	}
	metrics.GetOrRegisterTimeWindowPercentageDefault(sm.availability).Mark(succeeded)
}
// unregisterAll removes both node metrics from the registry.
func (sm *statusMetrics) unregisterAll() {
	for _, name := range []string{sm.latency, sm.availability} {
		metrics.InfuraRegistry.Unregister(name)
	}
}