Skip to content

Commit

Permalink
fix: better handle reset of vote metrics (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
MattKetmo committed Mar 27, 2024
1 parent be2c27d commit 3e33d8f
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 36 deletions.
12 changes: 7 additions & 5 deletions pkg/watcher/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ func (w *BlockWatcher) OnNodeStart(ctx context.Context, node *rpc.Node) error {

blockResp, err := node.Client.Block(ctx, nil)
if err != nil {
log.Warn().Err(err).Msg("failed to get latest block")
log.Warn().Err(err).
Str("node", node.Redacted()).
Msg("failed to get latest block")
} else {
w.handleNodeBlock(ctx, node, blockResp.Block)
w.handleNodeBlock(node, blockResp.Block)
}

// Ticker to sync validator set
Expand All @@ -70,7 +72,7 @@ func (w *BlockWatcher) OnNodeStart(ctx context.Context, node *rpc.Node) error {
return
case <-ticker.C:
if err := w.syncValidatorSet(ctx, node); err != nil {
log.Error().Err(err).Msg("failed to sync validator set")
log.Error().Err(err).Str("node", node.Redacted()).Msg("failed to sync validator set")
}
}
}
Expand All @@ -88,7 +90,7 @@ func (w *BlockWatcher) OnNewBlock(ctx context.Context, node *rpc.Node, evt *ctyp
blockEvent := evt.Data.(types.EventDataNewBlock)
block := blockEvent.Block

w.handleNodeBlock(ctx, node, block)
w.handleNodeBlock(node, block)

return nil
}
Expand All @@ -104,7 +106,7 @@ func (w *BlockWatcher) OnValidatorSetUpdates(ctx context.Context, node *rpc.Node
return nil
}

func (w *BlockWatcher) handleNodeBlock(ctx context.Context, node *rpc.Node, block *types.Block) {
func (w *BlockWatcher) handleNodeBlock(node *rpc.Node, block *types.Block) {
validatorSet := w.getValidatorSet()

if len(validatorSet) != block.LastCommit.Size() {
Expand Down
4 changes: 3 additions & 1 deletion pkg/watcher/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ func (w *UpgradeWatcher) Start(ctx context.Context) error {
if node == nil {
log.Warn().Msg("no node available to fetch upgrade plan")
} else if err := w.fetchUpgrade(ctx, node); err != nil {
log.Error().Err(err).Msg("failed to fetch upgrade plan")
log.Error().Err(err).
Str("node", node.Redacted()).
Msg("failed to fetch upgrade plan")
}

select {
Expand Down
4 changes: 3 additions & 1 deletion pkg/watcher/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ func (w *ValidatorsWatcher) Start(ctx context.Context) error {
if node == nil {
log.Warn().Msg("no node available to fetch validators")
} else if err := w.fetchValidators(ctx, node); err != nil {
log.Error().Err(err).Msg("failed to fetch staking validators")
log.Error().Err(err).
Str("node", node.Redacted()).
Msg("failed to fetch staking validators")
}

select {
Expand Down
84 changes: 55 additions & 29 deletions pkg/watcher/votes.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func (w *VotesWatcher) Start(ctx context.Context) error {
if node == nil {
log.Warn().Msg("no node available to fetch proposals")
} else if err := w.fetchProposals(ctx, node); err != nil {
log.Error().Err(err).Msg("failed to fetch pending proposals")
log.Error().Err(err).
Str("node", node.Redacted()).
Msg("failed to fetch pending proposals")
}

select {
Expand All @@ -55,17 +57,37 @@ func (w *VotesWatcher) Start(ctx context.Context) error {
}

func (w *VotesWatcher) fetchProposals(ctx context.Context, node *rpc.Node) error {
w.metrics.Vote.Reset()
var (
votes map[uint64]map[TrackedValidator]bool
err error
)

switch w.options.GovModuleVersion {
case "v1beta1":
return w.fetchProposalsV1Beta1(ctx, node)
votes, err = w.fetchProposalsV1Beta1(ctx, node)
default: // v1
return w.fetchProposalsV1(ctx, node)
votes, err = w.fetchProposalsV1(ctx, node)
}

if err != nil {
return err
}

w.metrics.Vote.Reset()
for proposalId, votes := range votes {
for validator, voted := range votes {
w.metrics.Vote.
WithLabelValues(node.ChainID(), validator.Address, validator.Name, fmt.Sprintf("%d", proposalId)).
Set(metrics.BoolToFloat64(voted))
}
}

return nil
}

func (w *VotesWatcher) fetchProposalsV1(ctx context.Context, node *rpc.Node) error {
func (w *VotesWatcher) fetchProposalsV1(ctx context.Context, node *rpc.Node) (map[uint64]map[TrackedValidator]bool, error) {
votes := make(map[uint64]map[TrackedValidator]bool)

clientCtx := (client.Context{}).WithClient(node.Client)
queryClient := gov.NewQueryClient(clientCtx)

Expand All @@ -74,13 +96,14 @@ func (w *VotesWatcher) fetchProposalsV1(ctx context.Context, node *rpc.Node) err
ProposalStatus: gov.StatusVotingPeriod,
})
if err != nil {
return fmt.Errorf("failed to get proposals: %w", err)
return votes, fmt.Errorf("failed to fetch proposals in voting period: %w", err)
}

chainID := node.ChainID()

// For each proposal, fetch validators vote
for _, proposal := range proposalsResp.GetProposals() {
votes[proposal.Id] = make(map[TrackedValidator]bool)
w.metrics.ProposalEndTime.WithLabelValues(chainID, fmt.Sprintf("%d", proposal.Id)).Set(float64(proposal.VotingEndTime.Unix()))

for _, validator := range w.validators {
Expand All @@ -95,34 +118,29 @@ func (w *VotesWatcher) fetchProposalsV1(ctx context.Context, node *rpc.Node) err
})

if isInvalidArgumentError(err) {
w.handleVoteV1(chainID, validator, proposal.Id, nil)
votes[proposal.Id][validator] = false
} else if err != nil {
return fmt.Errorf("failed to get validator vote for proposal %d: %w", proposal.Id, err)
return votes, fmt.Errorf("failed to get validator vote for proposal %d: %w", proposal.Id, err)
} else {
vote := voteResp.GetVote()
w.handleVoteV1(chainID, validator, proposal.Id, vote.Options)
voted := false
for _, option := range vote.Options {
if option.Option != gov.OptionEmpty {
voted = true
break
}
}
votes[proposal.Id][validator] = voted
}
}
}

return nil
return votes, nil
}

func (w *VotesWatcher) handleVoteV1(chainID string, validator TrackedValidator, proposalId uint64, votes []*gov.WeightedVoteOption) {
voted := false
for _, option := range votes {
if option.Option != gov.OptionEmpty {
voted = true
break
}
}
func (w *VotesWatcher) fetchProposalsV1Beta1(ctx context.Context, node *rpc.Node) (map[uint64]map[TrackedValidator]bool, error) {
votes := make(map[uint64]map[TrackedValidator]bool)

w.metrics.Vote.
WithLabelValues(chainID, validator.Address, validator.Name, fmt.Sprintf("%d", proposalId)).
Set(metrics.BoolToFloat64(voted))
}

func (w *VotesWatcher) fetchProposalsV1Beta1(ctx context.Context, node *rpc.Node) error {
clientCtx := (client.Context{}).WithClient(node.Client)
queryClient := govbeta.NewQueryClient(clientCtx)

Expand All @@ -131,13 +149,14 @@ func (w *VotesWatcher) fetchProposalsV1Beta1(ctx context.Context, node *rpc.Node
ProposalStatus: govbeta.StatusVotingPeriod,
})
if err != nil {
return fmt.Errorf("failed to get proposals: %w", err)
return votes, fmt.Errorf("failed to fetch proposals in voting period: %w", err)
}

chainID := node.ChainID()

// For each proposal, fetch validators vote
for _, proposal := range proposalsResp.GetProposals() {
votes[proposal.ProposalId] = make(map[TrackedValidator]bool)
w.metrics.ProposalEndTime.WithLabelValues(chainID, fmt.Sprintf("%d", proposal.ProposalId)).Set(float64(proposal.VotingEndTime.Unix()))

for _, validator := range w.validators {
Expand All @@ -152,17 +171,24 @@ func (w *VotesWatcher) fetchProposalsV1Beta1(ctx context.Context, node *rpc.Node
})

if isInvalidArgumentError(err) {
w.handleVoteV1Beta1(chainID, validator, proposal.ProposalId, nil)
votes[proposal.ProposalId][validator] = false
} else if err != nil {
return fmt.Errorf("failed to get validator vote for proposal %d: %w", proposal.ProposalId, err)
return votes, fmt.Errorf("failed to get validator vote for proposal %d: %w", proposal.ProposalId, err)
} else {
vote := voteResp.GetVote()
w.handleVoteV1Beta1(chainID, validator, proposal.ProposalId, vote.Options)
voted := false
for _, option := range vote.Options {
if option.Option != govbeta.OptionEmpty {
voted = true
break
}
}
votes[proposal.ProposalId][validator] = voted
}
}
}

return nil
return votes, nil
}

func (w *VotesWatcher) handleVoteV1Beta1(chainID string, validator TrackedValidator, proposalId uint64, votes []govbeta.WeightedVoteOption) {
Expand Down

0 comments on commit 3e33d8f

Please sign in to comment.