-
Notifications
You must be signed in to change notification settings - Fork 681
/
cleanup.go
312 lines (286 loc) · 11.6 KB
/
cleanup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
// nolint:unparam // this will be refactored in https://github.com/wormhole-foundation/wormhole/pull/1953
package processor
import (
	"context"
	"encoding/hex"
	"errors"
	"fmt"
	"time"

	"github.com/certusone/wormhole/node/pkg/common"
	"github.com/certusone/wormhole/node/pkg/db"
	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/wormhole-foundation/wormhole/sdk/vaa"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)
// Prometheus metrics tracking the lifecycle of aggregation state entries
// (per-message signature sets held in p.state.signatures). All counters are
// incremented from handleCleanup as entries settle, expire, or are retried.
var (
	// Gauge: current size of the aggregation state map, refreshed on each cleanup pass.
	aggregationStateEntries = promauto.NewGauge(
		prometheus.GaugeOpts{
			Name: "wormhole_aggregation_state_entries",
			Help: "Current number of aggregation state entries (including unexpired succeed ones)",
		})
	// Counter: submitted entries deleted after their retention window.
	aggregationStateExpiration = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "wormhole_aggregation_state_expirations_total",
			Help: "Total number of expired submitted aggregation states",
		})
	// Counter: entries expired because a quorum VAA was already stored locally
	// (the cluster reached consensus without our signature).
	aggregationStateLate = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "wormhole_aggregation_state_late_total",
			Help: "Total number of late aggregation states (cluster achieved consensus without us)",
		})
	// Counter: entries dropped after the retry limits elapsed without quorum.
	aggregationStateTimeout = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "wormhole_aggregation_state_timeout_total",
			Help: "Total number of aggregation states expired due to timeout after exhausting retries",
		})
	// Counter: entries for which a re-observation request / rebroadcast was issued.
	aggregationStateRetries = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "wormhole_aggregation_state_retries_total",
			Help: "Total number of aggregation states queued for resubmission",
		})
	// Counter: entries deleted because we never produced a matching local observation.
	aggregationStateUnobserved = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "wormhole_aggregation_state_unobserved_total",
			Help: "Total number of aggregation states expired due to no matching local message observations",
		})
	// CounterVec: per-guardian signature presence, labeled by guardian address,
	// origin, and whether the signature arrived within the settlement window.
	aggregationStateFulfillment = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "wormhole_aggregation_state_settled_signatures_total",
			Help: "Total number of signatures produced by a validator, counted after waiting a fixed amount of time",
		}, []string{"addr", "origin", "status"})
)
const (
	// settlementTime is how long an entry may keep collecting signatures before
	// handleCleanup considers it settled and records per-guardian metrics.
	settlementTime = time.Second * 30
	// retryLimitOurs defines how long this Guardian will keep an observation in the local state before discarding it.
	// Observations from other Guardians can take up to 24h to arrive if they are held in their Governor. Therefore, this value should be greater than 24h.
	retryLimitOurs = time.Hour * 30
	// retryLimitNotOurs bounds retention of entries we never observed ourselves.
	retryLimitNotOurs = time.Hour
)
var (
	// FirstRetryMinWait is the minimum age an unsubmitted observation must reach
	// before handleCleanup makes the first resubmission / re-observation attempt.
	// NOTE(review): exported as a mutable package var, presumably so tests can
	// shorten the wait — confirm before changing to a const.
	FirstRetryMinWait = time.Minute * 5
)
// handleCleanup handles periodic retransmissions and cleanup of observations.
//
// One pass walks every aggregation state entry and, based on its age and
// submission status, either (a) marks it settled and records per-guardian
// signature metrics, (b) deletes it (submitted long ago, retry limits
// exhausted, or never observed locally), or (c) requests re-observation and
// rebroadcasts our signature. It finishes by pruning pythnet VAA cache
// entries older than one hour. The ctx parameter is accepted but not used
// within this function body.
func (p *Processor) handleCleanup(ctx context.Context) {
	p.logger.Info("aggregation state summary", zap.Int("cached", len(p.state.signatures)))
	aggregationStateEntries.Set(float64(len(p.state.signatures)))

	for hash, s := range p.state.signatures {
		// Age of this entry since the first observation we recorded for it.
		delta := time.Since(s.firstObserved)

		if !s.submitted && s.ourObservation != nil && delta > settlementTime {
			// Expire pending VAAs post settlement time if we have a stored quorum VAA.
			//
			// This occurs when we observed a message after the cluster has already reached
			// consensus on it, causing us to never achieve quorum.
			if ourVaa, ok := s.ourObservation.(*VAA); ok {
				if p.haveSignedVAA(*db.VaaIDFromVAA(&ourVaa.VAA)) {
					// If we have a stored quorum VAA, we can safely expire the state.
					//
					// This is a rare case, and we can safely expire the state, since we
					// have a quorum VAA.
					p.logger.Info("Expiring late VAA",
						zap.String("message_id", ourVaa.VAA.MessageID()),
						zap.String("digest", hash),
						zap.Duration("delta", delta),
					)
					aggregationStateLate.Inc()
					delete(p.state.signatures, hash)
					continue
				}
			}
		}

		// The cases below are ordered: settlement bookkeeping first, then the
		// various expiration paths, then the retry path, and finally the
		// nil-state (never locally observed) path.
		switch {
		case !s.settled && delta > settlementTime:
			// After 30 seconds, the observation is considered settled - it's unlikely that more observations will
			// arrive, barring special circumstances. This is a better time to count misses than submission,
			// because we submit right when we quorum rather than waiting for all observations to arrive.
			s.settled = true

			// Use either the most recent (in case of a observation we haven't seen) or stored gs, if available.
			var gs *common.GuardianSet
			if s.gs != nil {
				gs = s.gs
			} else {
				gs = p.gs
			}

			hasSigs := len(s.signatures)
			wantSigs := vaa.CalculateQuorum(len(gs.Keys))
			quorum := hasSigs >= wantSigs

			var chain vaa.ChainID
			if s.ourObservation != nil {
				chain = s.ourObservation.GetEmitterChain()
			}

			if p.logger.Level().Enabled(zapcore.DebugLevel) {
				p.logger.Debug("observation considered settled",
					zap.String("message_id", s.LoggingID()),
					zap.String("digest", hash),
					zap.Duration("delta", delta),
					zap.Int("have_sigs", hasSigs),
					zap.Int("required_sigs", wantSigs),
					zap.Bool("quorum", quorum),
					zap.Stringer("emitter_chain", chain),
				)
			}

			// Record, per guardian in the set, whether its signature arrived
			// within the settlement window.
			for _, k := range gs.Keys {
				if _, ok := s.signatures[k]; ok {
					aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "present").Inc()
				} else {
					aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "missing").Inc()
				}
			}
		case s.submitted && delta.Hours() >= 1:
			// We could delete submitted observations right away, but then we'd lose context about additional (late)
			// observation that come in. Therefore, keep it for a reasonable amount of time.
			// If a very late observation arrives after cleanup, a nil aggregation state will be created
			// and then expired after a while (as noted in observation.go, this can be abused by a byzantine guardian).
			if p.logger.Level().Enabled(zapcore.DebugLevel) {
				p.logger.Debug("expiring submitted observation",
					zap.String("message_id", s.LoggingID()),
					zap.String("digest", hash),
					zap.Duration("delta", delta),
				)
			}
			delete(p.state.signatures, hash)
			aggregationStateExpiration.Inc()
		case !s.submitted && ((s.ourMsg != nil && delta > retryLimitOurs) || (s.ourMsg == nil && delta > retryLimitNotOurs)):
			// Clearly, this horse is dead and continued beatings won't bring it closer to quorum.
			p.logger.Info("expiring unsubmitted observation after exhausting retries",
				zap.String("message_id", s.LoggingID()),
				zap.String("digest", hash),
				zap.Duration("delta", delta),
				zap.Bool("weObserved", s.ourMsg != nil),
			)
			delete(p.state.signatures, hash)
			aggregationStateTimeout.Inc()
		case !s.submitted && delta >= FirstRetryMinWait && time.Since(s.nextRetry) >= 0:
			// Poor observation has been unsubmitted for five minutes - clearly, something went wrong.
			// If we have previously submitted an observation, and it was reliable, we can make another attempt to get
			// it over the finish line by sending a re-observation request to the network and rebroadcasting our
			// sig. If we do not have an observation, it means we either never observed it, or it got
			// revived by a malfunctioning guardian node, in which case, we can't do anything about it
			// and just delete it to keep our state nice and lean.
			if s.ourMsg != nil {
				// Unreliable observations cannot be resubmitted and can be considered failed after 5 minutes
				if !s.ourObservation.IsReliable() {
					p.logger.Info("expiring unsubmitted unreliable observation",
						zap.String("message_id", s.LoggingID()),
						zap.String("digest", hash),
						zap.Duration("delta", delta),
					)
					delete(p.state.signatures, hash)
					aggregationStateTimeout.Inc()
					break
				}

				// Reobservation requests should not be resubmitted but we will keep waiting for more observations.
				if s.ourObservation.IsReobservation() {
					if p.logger.Level().Enabled(zapcore.DebugLevel) {
						p.logger.Debug("not submitting reobservation request for reobservation",
							zap.String("message_id", s.LoggingID()),
							zap.String("digest", hash),
							zap.Duration("delta", delta),
						)
					}
					break
				}

				// If we have already stored this VAA, there is no reason for us to request reobservation.
				alreadyInDB, err := p.signedVaaAlreadyInDB(hash, s)
				if err != nil {
					// A DB check failure is non-fatal: fall through and request
					// reobservation anyway (alreadyInDB is false here).
					p.logger.Error("failed to check if observation is already in DB, requesting reobservation",
						zap.String("message_id", s.LoggingID()),
						zap.String("hash", hash),
						zap.Error(err))
				}

				if alreadyInDB {
					if p.logger.Level().Enabled(zapcore.DebugLevel) {
						p.logger.Debug("observation already in DB, not requesting reobservation",
							zap.String("message_id", s.LoggingID()),
							zap.String("digest", hash),
						)
					}
				} else {
					p.logger.Info("resubmitting observation",
						zap.String("message_id", s.LoggingID()),
						zap.String("digest", hash),
						zap.Duration("delta", delta),
						zap.String("firstObserved", s.firstObserved.String()),
					)
					// Ask the rest of the network to re-observe the transaction...
					req := &gossipv1.ObservationRequest{
						ChainId: uint32(s.ourObservation.GetEmitterChain()),
						TxHash:  s.txHash,
					}
					if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil {
						p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err))
					}
					// ...and rebroadcast our own signed observation, then schedule
					// the next retry with a backoff based on the retry count.
					p.gossipSendC <- s.ourMsg
					s.retryCtr++
					s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr))
					aggregationStateRetries.Inc()
				}
			} else {
				// For nil state entries, we log the quorum to determine whether the
				// network reached consensus without us. We don't know the correct guardian
				// set, so we simply use the most recent one.
				hasSigs := len(s.signatures)
				wantSigs := vaa.CalculateQuorum(len(p.gs.Keys))

				if p.logger.Level().Enabled(zapcore.DebugLevel) {
					p.logger.Debug("expiring unsubmitted nil observation",
						zap.String("message_id", s.LoggingID()),
						zap.String("digest", hash),
						zap.Duration("delta", delta),
						zap.Int("have_sigs", hasSigs),
						zap.Int("required_sigs", wantSigs),
						zap.Bool("quorum", hasSigs >= wantSigs),
					)
				}
				delete(p.state.signatures, hash)
				aggregationStateUnobserved.Inc()
			}
		}
	}

	// Clean up old pythnet VAAs.
	oldestTime := time.Now().Add(-time.Hour)
	for key, pe := range p.pythnetVaas {
		if pe.updateTime.Before(oldestTime) {
			delete(p.pythnetVaas, key)
		}
	}
}
// signedVaaAlreadyInDB checks if the VAA is already in the DB. If it is, it makes sure the hash matches.
//
// Returns (false, nil) when we have no observation to derive a VAA ID from, or
// when the VAA is simply not stored yet. Returns (true, nil) only when a stored
// VAA exists and its signing digest matches hash. Any lookup, unmarshal, or
// digest-mismatch condition is reported as an error so the caller can decide
// whether to request reobservation anyway.
func (p *Processor) signedVaaAlreadyInDB(hash string, s *state) (bool, error) {
	if s.ourObservation == nil {
		// Without our own observation we cannot derive the message ID to look up.
		p.logger.Debug("unable to check if VAA is already in DB, no observation", zap.String("digest", hash))
		return false, nil
	}

	vaaID, err := db.VaaIDFromString(s.ourObservation.MessageID())
	if err != nil {
		return false, fmt.Errorf(`failed to generate VAA ID from message id "%s": %w`, s.ourObservation.MessageID(), err)
	}

	vb, err := p.db.GetSignedVAABytes(*vaaID)
	if err != nil {
		// Not-found is the expected "not yet stored" case, not a failure.
		// Use errors.Is rather than == so wrapped sentinels still match.
		if errors.Is(err, db.ErrVAANotFound) {
			if p.logger.Level().Enabled(zapcore.DebugLevel) {
				p.logger.Debug("VAA not in DB",
					zap.String("message_id", s.ourObservation.MessageID()),
					zap.String("digest", hash),
				)
			}
			return false, nil
		}
		return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, s.ourObservation.MessageID(), err)
	}

	v, err := vaa.Unmarshal(vb)
	if err != nil {
		return false, fmt.Errorf("failed to unmarshal VAA: %w", err)
	}

	// Compare the stored VAA's signing digest against the digest of the state
	// entry; a mismatch means our local state disagrees with what was persisted.
	oldHash := hex.EncodeToString(v.SigningDigest().Bytes())
	if hash != oldHash {
		// Gate debug logging via Level().Enabled for consistency with the rest
		// of this file (the original used Core().Enabled only here).
		if p.logger.Level().Enabled(zapcore.DebugLevel) {
			p.logger.Debug("VAA already in DB but hash is different",
				zap.String("message_id", s.ourObservation.MessageID()),
				zap.String("old_hash", oldHash),
				zap.String("new_hash", hash))
		}
		return false, fmt.Errorf("hash mismatch in_db: %s, new: %s", oldHash, hash)
	}

	return true, nil
}