Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node: governor publish gossip #1538

Merged
merged 7 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 14 additions & 10 deletions node/pkg/governor/governor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,20 @@ func (ce *chainEntry) isBigTransfer(value uint64) bool {
}

type ChainGovernor struct {
db db.GovernorDB
logger *zap.Logger
mutex sync.Mutex
tokens map[tokenKey]*tokenEntry
tokensByCoinGeckoId map[string][]*tokenEntry
chains map[vaa.ChainID]*chainEntry
msgsToPublish []*common.MessagePublication
dayLengthInMinutes int
coinGeckoQuery string
env int
db db.GovernorDB
logger *zap.Logger
mutex sync.Mutex
tokens map[tokenKey]*tokenEntry
tokensByCoinGeckoId map[string][]*tokenEntry
chains map[vaa.ChainID]*chainEntry
msgsToPublish []*common.MessagePublication
dayLengthInMinutes int
coinGeckoQuery string
env int
nextStatusPublishTime time.Time
nextConfigPublishTime time.Time
statusPublishCounter int64
configPublishCounter int64
}

func NewChainGovernor(
Expand Down
168 changes: 167 additions & 1 deletion node/pkg/governor/governor_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,22 @@
// - This is a single metric that indicates the total number of enqueued VAAs across all chains. This provides a quick check if
// anything is currently being limited.

// The chain governor also publishes the following messages to the gossip network
//
// SignedChainGovernorConfig
// - Published once every five minutes.
// - Contains a list of configured chains, along with the daily limit, big transaction size and current price.
//
// - SignedChainGovernorStatus
// - Published once a minute.
// - Contains a list of configured chains along with their remaining available notional value, the number of enqueued VAAs
// and information on zero or more enqueued VAAs.
// - Only the first 20 enqueued VAAs are include, to constrain the message size.

package governor

import (
"crypto/ecdsa"
"fmt"
"sort"
"time"
Expand All @@ -73,8 +86,13 @@ import (
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"

ethCommon "github.com/ethereum/go-ethereum/common"
ethCrypto "github.com/ethereum/go-ethereum/crypto"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"google.golang.org/protobuf/proto"
)

// Admin command to display status to the log.
Expand Down Expand Up @@ -378,7 +396,7 @@ var (
})
)

func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat) {
func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat, sendC chan []byte, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) {
gov.mutex.Lock()
defer gov.mutex.Unlock()

Expand Down Expand Up @@ -431,4 +449,152 @@ func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat) {
}

metricTotalEnqueuedVAAs.Set(float64(totalPending))

if startTime.After(gov.nextConfigPublishTime) {
gov.publishConfig(hb, sendC, gk, ourAddr)
gov.nextConfigPublishTime = startTime.Add(time.Minute * time.Duration(5))
}

if startTime.After(gov.nextStatusPublishTime) {
gov.publishStatus(hb, sendC, startTime, gk, ourAddr)
gov.nextStatusPublishTime = startTime.Add(time.Minute)
}
}

var governorMessagePrefix = []byte("governor|")

func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan []byte, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) {
chains := make([]*gossipv1.ChainGovernorConfig_Chain, 0)
for _, ce := range gov.chains {
chains = append(chains, &gossipv1.ChainGovernorConfig_Chain{
ChainId: uint32(ce.emitterChainId),
NotionalLimit: ce.dailyLimit,
BigTransactionSize: ce.bigTransactionSize,
})
}

tokens := make([]*gossipv1.ChainGovernorConfig_Token, 0)
for tk, te := range gov.tokens {
price, _ := te.price.Float32()
bruce-riley marked this conversation as resolved.
Show resolved Hide resolved
tokens = append(tokens, &gossipv1.ChainGovernorConfig_Token{
OriginChainId: uint32(tk.chain),
OriginAddress: "0x" + tk.addr.String(),
Price: price,
})
}

gov.configPublishCounter += 1
bruce-riley marked this conversation as resolved.
Show resolved Hide resolved
payload := &gossipv1.ChainGovernorConfig{
NodeName: hb.NodeName,
Counter: gov.configPublishCounter,
Timestamp: hb.Timestamp,
Chains: chains,
Tokens: tokens,
}

b, err := proto.Marshal(payload)
if err != nil {
gov.logger.Error("cgov: failed to marshal config message", zap.Error(err))
return
}

digest := ethCrypto.Keccak256Hash(append(governorMessagePrefix, b...))

sig, err := ethCrypto.Sign(digest.Bytes(), gk)
if err != nil {
panic(err)
}

msg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedChainGovernorConfig{
SignedChainGovernorConfig: &gossipv1.SignedChainGovernorConfig{
Config: b,
Signature: sig,
GuardianAddr: ourAddr.Bytes(),
}}}

b, err = proto.Marshal(&msg)
if err != nil {
panic(err)
}

sendC <- b
}

func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan []byte, startTime time.Time, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) {
chains := make([]*gossipv1.ChainGovernorStatus_Chain, 0)
numEnqueued := 0
for _, ce := range gov.chains {
value := sumValue(ce.transfers, startTime)
if value >= ce.dailyLimit {
value = 0
} else {
value = ce.dailyLimit - value
}

enqueuedVaas := make([]*gossipv1.ChainGovernorStatus_EnqueuedVAA, 0)
for _, pe := range ce.pending {
value, err := computeValue(pe.amount, pe.token)
if err != nil {
gov.logger.Error("cgov: failed to compute value of pending transfer", zap.String("msgID", pe.dbData.Msg.MessageIDString()), zap.Error(err))
value = 0
}

bruce-riley marked this conversation as resolved.
Show resolved Hide resolved
if numEnqueued < 20 {
numEnqueued = numEnqueued + 1
enqueuedVaas = append(enqueuedVaas, &gossipv1.ChainGovernorStatus_EnqueuedVAA{
Sequence: pe.dbData.Msg.Sequence,
ReleaseTime: uint32(pe.dbData.ReleaseTime.Unix()),
NotionalValue: value,
TxHash: pe.dbData.Msg.TxHash.String(),
})
}
}

emitter := gossipv1.ChainGovernorStatus_Emitter{
EmitterAddress: "0x" + ce.emitterAddr.String(),
TotalEnqueuedVaas: uint64(len(ce.pending)),
EnqueuedVaas: enqueuedVaas,
}

chains = append(chains, &gossipv1.ChainGovernorStatus_Chain{
ChainId: uint32(ce.emitterChainId),
RemainingAvailableNotional: value,
Emitters: []*gossipv1.ChainGovernorStatus_Emitter{&emitter},
})
}

gov.statusPublishCounter += 1
bruce-riley marked this conversation as resolved.
Show resolved Hide resolved
payload := &gossipv1.ChainGovernorStatus{
NodeName: hb.NodeName,
Counter: gov.statusPublishCounter,
Timestamp: hb.Timestamp,
Chains: chains,
}

b, err := proto.Marshal(payload)
if err != nil {
gov.logger.Error("cgov: failed to marshal status message", zap.Error(err))
return
}

digest := ethCrypto.Keccak256Hash(append(governorMessagePrefix, b...))

sig, err := ethCrypto.Sign(digest.Bytes(), gk)
if err != nil {
panic(err)
}

msg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedChainGovernorStatus{
SignedChainGovernorStatus: &gossipv1.SignedChainGovernorStatus{
Status: b,
Signature: sig,
GuardianAddr: ourAddr.Bytes(),
}}}

b, err = proto.Marshal(&msg)
if err != nil {
panic(err)
}

sendC <- b
}
6 changes: 5 additions & 1 deletion node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func Run(obsvC chan *gossipv1.SignedObservation, obsvReqC chan *gossipv1.Observa
collectNodeMetrics(ourAddr, h.ID(), heartbeat)

if gov != nil {
gov.CollectMetrics(heartbeat)
gov.CollectMetrics(heartbeat, sendC, gk, ourAddr)
}

b, err := proto.Marshal(heartbeat)
Expand Down Expand Up @@ -407,6 +407,10 @@ func Run(obsvC chan *gossipv1.SignedObservation, obsvReqC chan *gossipv1.Observa

obsvReqC <- r
}
case *gossipv1.GossipMessage_SignedChainGovernorConfig:
logger.Debug("cgov: received config message")
case *gossipv1.GossipMessage_SignedChainGovernorStatus:
logger.Debug("cgov: received status message")
default:
p2pMessagesReceived.WithLabelValues("unknown").Inc()
logger.Warn("received unknown message type (running outdated software?)",
Expand Down
72 changes: 72 additions & 0 deletions proto/gossip/v1/gossip.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ message GossipMessage {
SignedObservationRequest signed_observation_request = 5;
SignedBatchObservation signed_batch_observation = 6;
SignedBatchVAAWithQuorum signed_batch_vaa_with_quorum = 7;
SignedChainGovernorConfig signed_chain_governor_config = 8;
SignedChainGovernorStatus signed_chain_governor_status = 9;
}
}

Expand Down Expand Up @@ -145,3 +147,73 @@ message SignedBatchObservation {
message SignedBatchVAAWithQuorum {
bytes batch_vaa = 1;
}

// This message is published every five minutes.
message SignedChainGovernorConfig {
// Serialized ChainGovernorConfig message.
bytes config = 1;

// ECDSA signature using the node's guardian key.
bytes signature = 2;

// Guardian address that signed this payload (truncated Eth address).
bytes guardian_addr = 3;
}

message ChainGovernorConfig {
message Chain {
uint32 chain_id = 1;
uint64 notional_limit = 2;
uint64 big_transaction_size = 3;
}

message Token {
uint32 origin_chain_id = 1;
string origin_address = 2; // human-readable hex-encoded (leading 0x)
float price = 3;
}

string node_name = 1;
int64 counter = 2;
int64 timestamp = 3;
repeated Chain chains = 4;
repeated Token tokens = 5;
}

// This message is published every minute.
message SignedChainGovernorStatus {
// Serialized ChainGovernorStatus message.
bytes status = 1;

// ECDSA signature using the node's guardian key.
bytes signature = 2;

// Guardian address that signed this payload (truncated Eth address).
bytes guardian_addr = 3;
}

message ChainGovernorStatus {
message EnqueuedVAA {
uint64 sequence = 1; // Chain and emitter address are assumed.
uint32 release_time = 2;
uint64 notional_value = 3;
string tx_hash = 4;
}

message Emitter {
string emitter_address = 1; // human-readable hex-encoded (leading 0x)
uint64 total_enqueued_vaas = 2;
repeated EnqueuedVAA enqueued_vaas = 3; // Only the first 20 will be included.
}

message Chain {
uint32 chain_id = 1;
uint64 remaining_available_notional = 2;
repeated Emitter emitters = 3;
}

string node_name = 1;
bruce-riley marked this conversation as resolved.
Show resolved Hide resolved
int64 counter = 2;
int64 timestamp = 3;
repeated Chain chains = 4;
}