Skip to content

Commit

Permalink
node: prevent reobservation of unreliable messages (#1627)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikhofstadt committed Sep 26, 2022
1 parent 1febea0 commit 5993a23
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 3 deletions.
4 changes: 4 additions & 0 deletions node/pkg/common/chainlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type MessagePublication struct {
EmitterChain vaa.ChainID
EmitterAddress vaa.Address
Payload []byte

// Unreliable indicates if this message can be reobserved. If a message is considered unreliable it cannot be
// reobserved.
Unreliable bool
}

func (msg *MessagePublication) MessageID() []byte {
Expand Down
11 changes: 9 additions & 2 deletions node/pkg/processor/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,19 @@ func (p *Processor) handleCleanup(ctx context.Context) {
aggregationStateTimeout.Inc()
case !s.submitted && delta.Minutes() >= 5 && time.Since(s.lastRetry) >= retryTime:
// Poor observation has been unsubmitted for five minutes - clearly, something went wrong.
// If we have previously submitted an observation, we can make another attempt to get it over
// the finish line by sending a re-observation request to the network and rebroadcasting our
// If we have previously submitted an observation, and it was reliable, we can make another attempt to get
// it over the finish line by sending a re-observation request to the network and rebroadcasting our
// sig. If we do not have an observation, it means we either never observed it, or it got
// revived by a malfunctioning guardian node, in which case, we can't do anything about it
// and just delete it to keep our state nice and lean.
if s.ourMsg != nil {
// Unreliable observations cannot be resubmitted and can be considered failed after 5 minutes
if !s.ourObservation.IsReliable() {
p.logger.Info("expiring unsubmitted unreliable observation", zap.String("digest", hash), zap.Duration("delta", delta))
delete(p.state.signatures, hash)
aggregationStateTimeout.Inc()
break
}
p.logger.Info("resubmitting observation",
zap.String("digest", hash),
zap.Duration("delta", delta),
Expand Down
1 change: 1 addition & 0 deletions node/pkg/processor/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat
Sequence: k.Sequence,
ConsistencyLevel: k.ConsistencyLevel,
},
Unreliable: k.Unreliable,
}

// A governance message should never be emitted on-chain
Expand Down
2 changes: 2 additions & 0 deletions node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type (
// SigningMsg returns the hash of the signing body of the observation. This is used
// for signature generation and verification.
SigningMsg() ethcommon.Hash
// IsReliable returns whether this message is considered reliable meaning it can be reobserved.
IsReliable() bool
// HandleQuorum finishes processing the observation once a quorum of signatures have
// been received for it.
HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor)
Expand Down
5 changes: 5 additions & 0 deletions node/pkg/processor/vaa.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

type VAA struct {
vaa.VAA
Unreliable bool
}

func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
Expand Down Expand Up @@ -45,3 +46,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
p.attestationEvents.ReportVAAQuorum(signed)
p.state.signatures[hash].submitted = true
}

func (v *VAA) IsReliable() bool {
return !v.Unreliable
}
15 changes: 14 additions & 1 deletion node/pkg/solana/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ const (
postMessageInstructionNumAccounts = 9
postMessageInstructionID = 0x01
postMessageUnreliableInstructionID = 0x08
accountPrefixReliable = "msg"
accountPrefixUnreliable = "msu"
)

// PostMessageData represents the user-supplied, untrusted instruction data
Expand Down Expand Up @@ -499,7 +501,7 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Log
}

data := info.Value.Data.GetBinary()
if string(data[:3]) != "msg" && string(data[:3]) != "msu" {
if string(data[:3]) != accountPrefixReliable && string(data[:3]) != accountPrefixUnreliable {
p2p.DefaultRegistry.AddErrorCount(s.chainID, 1)
solanaConnectionErrors.WithLabelValues(s.networkName, string(s.commitment), "bad_account_data").Inc()
logger.Error("account is not a message account",
Expand Down Expand Up @@ -534,6 +536,16 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a
var txHash eth_common.Hash
copy(txHash[:], acc[:])

var reliable bool
switch string(data[:3]) {
case accountPrefixReliable:
reliable = true
case accountPrefixUnreliable:
reliable = false
default:
panic("invalid prefix")
}

observation := &common.MessagePublication{
TxHash: txHash,
Timestamp: time.Unix(int64(proposal.SubmissionTime), 0),
Expand All @@ -543,6 +555,7 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a
EmitterAddress: proposal.EmitterAddress,
Payload: proposal.Payload,
ConsistencyLevel: proposal.ConsistencyLevel,
Unreliable: !reliable,
}

solanaMessagesConfirmed.WithLabelValues(s.networkName).Inc()
Expand Down

0 comments on commit 5993a23

Please sign in to comment.