Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node: prevent reobservation of unreliable messages #1627

Merged
merged 1 commit into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions node/pkg/common/chainlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type MessagePublication struct {
EmitterChain vaa.ChainID
EmitterAddress vaa.Address
Payload []byte

// Unreliable marks a message that cannot be reobserved. A message with Unreliable set to false is considered
// reliable and can be reobserved.
Unreliable bool
}

func (msg *MessagePublication) MessageID() []byte {
Expand Down
11 changes: 9 additions & 2 deletions node/pkg/processor/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,19 @@ func (p *Processor) handleCleanup(ctx context.Context) {
aggregationStateTimeout.Inc()
case !s.submitted && delta.Minutes() >= 5:
// Poor observation has been unsubmitted for five minutes - clearly, something went wrong.
// If we have previously submitted an observation, we can make another attempt to get it over
// the finish line by sending a re-observation request to the network and rebroadcasting our
// If we have previously submitted an observation, and it was reliable, we can make another attempt to get
// it over the finish line by sending a re-observation request to the network and rebroadcasting our
// sig. If we do not have an observation, it means we either never observed it, or it got
// revived by a malfunctioning guardian node, in which case, we can't do anything about it
// and just delete it to keep our state nice and lean.
if s.ourMsg != nil {
// Unreliable observations cannot be resubmitted and can be considered failed after 5 minutes
if !s.ourObservation.IsReliable() {
p.logger.Info("expiring unsubmitted unreliable observation", zap.String("digest", hash), zap.Duration("delta", delta))
delete(p.state.signatures, hash)
aggregationStateTimeout.Inc()
break
}
p.logger.Info("resubmitting observation",
zap.String("digest", hash),
zap.Duration("delta", delta),
Expand Down
1 change: 1 addition & 0 deletions node/pkg/processor/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat
Sequence: k.Sequence,
ConsistencyLevel: k.ConsistencyLevel,
},
Unreliable: k.Unreliable,
}

// A governance message should never be emitted on-chain
Expand Down
2 changes: 2 additions & 0 deletions node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type (
// SigningMsg returns the hash of the signing body of the observation. This is used
// for signature generation and verification.
SigningMsg() ethcommon.Hash
// IsReliable returns whether this message is considered reliable, meaning it can be reobserved.
IsReliable() bool
// HandleQuorum finishes processing the observation once a quorum of signatures have
// been received for it.
HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor)
Expand Down
5 changes: 5 additions & 0 deletions node/pkg/processor/vaa.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

// VAA embeds vaa.VAA and carries an additional reliability flag alongside it.
type VAA struct {
vaa.VAA
// Unreliable is the flag that IsReliable inverts: when it is true, IsReliable reports false.
Unreliable bool
}

func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
Expand Down Expand Up @@ -45,3 +46,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
p.attestationEvents.ReportVAAQuorum(signed)
p.state.signatures[hash].submitted = true
}

// IsReliable reports whether this VAA is reliable, i.e. whether its Unreliable flag is not set.
func (v *VAA) IsReliable() bool {
return !v.Unreliable
}
15 changes: 14 additions & 1 deletion node/pkg/solana/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ const (
postMessageInstructionNumAccounts = 9
postMessageInstructionID = 0x01
postMessageUnreliableInstructionID = 0x08
accountPrefixReliable = "msg"
accountPrefixUnreliable = "msu"
)

// PostMessageData represents the user-supplied, untrusted instruction data
Expand Down Expand Up @@ -499,7 +501,7 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Log
}

data := info.Value.Data.GetBinary()
if string(data[:3]) != "msg" && string(data[:3]) != "msu" {
if string(data[:3]) != accountPrefixReliable && string(data[:3]) != accountPrefixUnreliable {
p2p.DefaultRegistry.AddErrorCount(s.chainID, 1)
solanaConnectionErrors.WithLabelValues(s.networkName, string(s.commitment), "bad_account_data").Inc()
logger.Error("account is not a message account",
Expand Down Expand Up @@ -534,6 +536,16 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a
var txHash eth_common.Hash
copy(txHash[:], acc[:])

var reliable bool
switch string(data[:3]) {
case accountPrefixReliable:
reliable = true
case accountPrefixUnreliable:
reliable = false
default:
panic("invalid prefix")
hendrikhofstadt marked this conversation as resolved.
Show resolved Hide resolved
}

observation := &common.MessagePublication{
TxHash: txHash,
Timestamp: time.Unix(int64(proposal.SubmissionTime), 0),
Expand All @@ -543,6 +555,7 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a
EmitterAddress: proposal.EmitterAddress,
Payload: proposal.Payload,
ConsistencyLevel: proposal.ConsistencyLevel,
Unreliable: !reliable,
}

solanaMessagesConfirmed.WithLabelValues(s.networkName).Inc()
Expand Down