Skip to content

Commit

Permalink
go/consensus/roothash: Track runtime proposer liveness
Browse files Browse the repository at this point in the history
The roothash application now monitors runtime proposer liveness, producing
statistics that runtimes can use to penalize proposers that fail to gather
sufficient commitments.
  • Loading branch information
peternose committed Aug 1, 2023
1 parent 4425f68 commit f017a52
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 23 deletions.
10 changes: 10 additions & 0 deletions .changelog/5334.feature.md
@@ -0,0 +1,10 @@
go/consensus/roothash: Track runtime proposer liveness

The roothash application now monitors the runtime proposer liveness, which
runtimes can utilize to penalize proposers with insufficient commitments.
To activate penalties for such nodes, the executor committee parameters
need to be updated by configuring the following setting:

- `MaxMissedProposalsPercent`: The maximum percentage of a node's proposed
  rounds in an epoch that may fail for the node to still be considered live.
  Zero means that all proposed rounds may fail (proposer liveness tracking
  is effectively disabled).
31 changes: 26 additions & 5 deletions go/consensus/cometbft/apps/roothash/liveness.go
Expand Up @@ -2,6 +2,7 @@ package roothash

import (
"fmt"
"math"

beacon "github.com/oasisprotocol/oasis-core/go/beacon/api"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
Expand Down Expand Up @@ -31,6 +32,7 @@ func processLivenessStatistics(ctx *tmapi.Context, epoch beacon.EpochTime, rtSta
if maxFailures == 0 {
maxFailures = 255
}
maxMissedProposalsPercent := uint64(rtState.Runtime.Executor.MaxMissedProposalsPercent)
slashParams := rtState.Runtime.Staking.Slashing[staking.SlashRuntimeLiveness]

ctx.Logger().Debug("evaluating node liveness",
Expand All @@ -41,14 +43,26 @@ func processLivenessStatistics(ctx *tmapi.Context, epoch beacon.EpochTime, rtSta
)

// Collect per node liveness statistics as a single node can have multiple roles.
goodRoundsPerNode := make(map[signature.PublicKey]uint64)
type Stats struct {
liveRounds uint64
finalizedProposals uint64
missedProposals uint64
}
statsPerNode := make(map[signature.PublicKey]*Stats)
for i, member := range rtState.ExecutorPool.Committee.Members {
goodRoundsPerNode[member.PublicKey] += rtState.LivenessStatistics.LiveRounds[i]
stats, ok := statsPerNode[member.PublicKey]
if !ok {
stats = &Stats{}
statsPerNode[member.PublicKey] = stats
}
stats.liveRounds += rtState.LivenessStatistics.LiveRounds[i]
stats.finalizedProposals += rtState.LivenessStatistics.FinalizedProposals[i]
stats.missedProposals += rtState.LivenessStatistics.MissedProposals[i]
}

// Penalize nodes that were not live enough.
regState := registryState.NewMutableState(ctx.State())
for nodeID, liveRounds := range goodRoundsPerNode {
for nodeID, stats := range statsPerNode {
status, err := regState.NodeStatus(ctx, nodeID)
if err != nil {
return fmt.Errorf("failed to retrieve status for node %s: %w", nodeID, err)
Expand All @@ -57,16 +71,23 @@ func processLivenessStatistics(ctx *tmapi.Context, epoch beacon.EpochTime, rtSta
continue
}

maxMissedProposals := ((stats.missedProposals + stats.finalizedProposals) * maxMissedProposalsPercent) / 100
if maxMissedProposalsPercent == 0 {
maxMissedProposals = math.MaxUint64
}

switch {
case liveRounds >= minLiveRounds:
case stats.liveRounds >= minLiveRounds && stats.missedProposals <= maxMissedProposals:
// Node is live.
status.RecordSuccess(rtState.Runtime.ID, epoch)
default:
// Node is faulty.
ctx.Logger().Debug("node deemed faulty",
"node_id", nodeID,
"live_rounds", liveRounds,
"live_rounds", stats.liveRounds,
"min_live_rounds", minLiveRounds,
"missed_proposals", stats.missedProposals,
"max_missed_proposals", maxMissedProposals,
)

status.RecordFailure(rtState.Runtime.ID, epoch)
Expand Down
66 changes: 64 additions & 2 deletions go/consensus/cometbft/apps/roothash/liveness_test.go
Expand Up @@ -39,6 +39,7 @@ func TestLivenessProcessing(t *testing.T) {
Executor: registry.ExecutorParameters{
MinLiveRoundsForEvaluation: 10,
MinLiveRoundsPercent: 90,
MaxMissedProposalsPercent: 0, // Disabled.
MaxLivenessFailures: 4,
},
}
Expand Down Expand Up @@ -74,8 +75,10 @@ func TestLivenessProcessing(t *testing.T) {
Round: 0,
},
LivenessStatistics: &roothash.LivenessStatistics{
TotalRounds: 100,
LiveRounds: []uint64{91},
TotalRounds: 100,
LiveRounds: []uint64{91}, // At least 90 required.
FinalizedProposals: []uint64{80},
MissedProposals: []uint64{21}, // At most 20 allowed.
},
}
err = roothashState.SetRuntimeState(ctx, rtState)
Expand Down Expand Up @@ -137,4 +140,63 @@ func TestLivenessProcessing(t *testing.T) {
require.NoError(err, "NodeStatus")
require.False(status.IsSuspended(runtime.ID, epoch), "node should not be suspended")
require.Len(status.Faults, 0, "there should be no faults")

// Start tracking proposer liveness.
rtState.Runtime.Executor.MaxMissedProposalsPercent = 20

// When node is proposing, everything should be left as is, no faults should be recorded.
rtState.LivenessStatistics.MissedProposals[0] = 20 // At most 20 allowed.
err = processLivenessStatistics(ctx, epoch, rtState)
require.NoError(err, "processLivenessStatistics")
status, err = registryState.NodeStatus(ctx, sk.Public())
require.NoError(err, "NodeStatus")
require.False(status.IsSuspended(runtime.ID, epoch), "node should not be suspended")

// When node is not proposing, it should be suspended, there should be one fault.
rtState.LivenessStatistics.MissedProposals[0] = 21 // At most 20 allowed.
err = processLivenessStatistics(ctx, epoch, rtState)
require.NoError(err, "processLivenessStatistics")
status, err = registryState.NodeStatus(ctx, sk.Public())
require.NoError(err, "NodeStatus")
require.True(status.IsSuspended(runtime.ID, epoch), "node should be suspended")
require.EqualValues(1, status.Faults[runtime.ID].Failures, "there should be one fault")
require.EqualValues(epoch+2, status.Faults[runtime.ID].SuspendedUntil, "suspension time should be set")

// Bump epoch so the node is no longer suspended.
epoch += 2

// When node is not proposing again, fault counter should increase.
rtState.LivenessStatistics.MissedProposals[0] = 21 // At most 20 allowed.
err = processLivenessStatistics(ctx, epoch, rtState)
require.NoError(err, "processLivenessStatistics")
status, err = registryState.NodeStatus(ctx, sk.Public())
require.NoError(err, "NodeStatus")
require.True(status.IsSuspended(runtime.ID, epoch), "node should be suspended")
require.EqualValues(2, status.Faults[runtime.ID].Failures, "there should be two faults")
require.EqualValues(epoch+4, status.Faults[runtime.ID].SuspendedUntil, "suspension time should be set")

// Bump epoch so the node is no longer suspended.
epoch += 4

// When node is proposing again, fault counter should decrease.
rtState.LivenessStatistics.MissedProposals[0] = 20 // At most 20 allowed.
err = processLivenessStatistics(ctx, epoch, rtState)
require.NoError(err, "processLivenessStatistics")
status, err = registryState.NodeStatus(ctx, sk.Public())
require.NoError(err, "NodeStatus")
require.True(status.IsSuspended(runtime.ID, epoch), "node should be suspended")
require.EqualValues(1, status.Faults[runtime.ID].Failures, "there should be one fault")
require.EqualValues(epoch+2, status.Faults[runtime.ID].SuspendedUntil, "suspension time should be set")

// Bump epoch so the node is no longer suspended.
epoch += 2

// When node is proposing again, fault counter should decrease.
rtState.LivenessStatistics.MissedProposals[0] = 0 // At most 20 allowed.
err = processLivenessStatistics(ctx, epoch, rtState)
require.NoError(err, "processLivenessStatistics")
status, err = registryState.NodeStatus(ctx, sk.Public())
require.NoError(err, "NodeStatus")
require.False(status.IsSuspended(runtime.ID, epoch), "node should not be suspended")
require.Len(status.Faults, 0, "there should be no faults")
}
11 changes: 11 additions & 0 deletions go/consensus/cometbft/apps/roothash/roothash.go
Expand Up @@ -540,6 +540,12 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits( //nolint: gocyclo
round := rtState.CurrentBlock.Header.Round + 1
pool := rtState.ExecutorPool

// Remember the index of the transaction scheduler within the committee.
schedulerIdx, err := pool.Committee.TransactionSchedulerIdx(pool.Round)
if err != nil {
return err
}

// Initialize per-epoch liveness statistics.
if rtState.LivenessStatistics == nil {
rtState.LivenessStatistics = roothash.NewLivenessStatistics(len(pool.Committee.Members))
Expand Down Expand Up @@ -573,7 +579,9 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits( //nolint: gocyclo
"round", round,
)

// Record that the round was finalized and that the scheduler received enough commitments.
livenessStats.TotalRounds++
livenessStats.FinalizedProposals[schedulerIdx]++

ec := commit.ToDDResult().(*commitment.ExecutorCommitment)

Expand Down Expand Up @@ -757,6 +765,9 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits( //nolint: gocyclo
default:
}

// Record that the scheduler did not receive enough commitments.
livenessStats.MissedProposals[schedulerIdx]++

// Something else went wrong, emit empty error block.
ctx.Logger().Debug("round failed",
"round", round,
Expand Down
7 changes: 7 additions & 0 deletions go/consensus/cometbft/apps/roothash/transactions.go
Expand Up @@ -95,6 +95,13 @@ func (app *rootHashApplication) executorProposerTimeout(
return err
}

// Record that the scheduler did not propose.
schedulerIdx, err := rtState.ExecutorPool.Committee.TransactionSchedulerIdx(rpt.Round)
if err != nil {
return err
}
rtState.LivenessStatistics.MissedProposals[schedulerIdx]++

// Timeout triggered by executor node, emit empty error block.
ctx.Logger().Debug("proposer round timeout",
"round", rpt.Round,
Expand Down
5 changes: 5 additions & 0 deletions go/registry/api/runtime.go
Expand Up @@ -101,6 +101,11 @@ type ExecutorParameters struct {
// penalized.
MinLiveRoundsPercent uint8 `json:"min_live_rounds_percent,omitempty"`

// MaxMissedProposalsPercent is the maximum percentage of proposed rounds in an epoch that
// can fail for a node to be considered live. Nodes not satisfying this may be penalized.
// Zero means that all proposed rounds can fail.
MaxMissedProposalsPercent uint8 `json:"max_missed_proposals_percent,omitempty"`

// MinLiveRoundsForEvaluation is the minimum number of live rounds in an epoch for the liveness
// calculations to be considered for evaluation.
MinLiveRoundsForEvaluation uint64 `json:"min_live_rounds_eval,omitempty"`
Expand Down
20 changes: 18 additions & 2 deletions go/roothash/api/liveness.go
Expand Up @@ -9,12 +9,28 @@ type LivenessStatistics struct {
// LiveRounds is a list of counters, specified in committee order (e.g. counter at index i has
// the value for node i in the committee).
LiveRounds []uint64 `json:"good_rounds"`

// FinalizedProposals is a list that records the number of finalized rounds when a node
// acted as a proposer.
//
// The list is ordered according to the committee arrangement (i.e., the counter at index i
// holds the value for the node at index i in the committee).
FinalizedProposals []uint64 `json:"finalized_proposals"`

// MissedProposals is a list that records the number of failed rounds when a node
// acted as a proposer.
//
// The list is ordered according to the committee arrangement (i.e., the counter at index i
// holds the value for the node at index i in the committee).
MissedProposals []uint64 `json:"missed_proposals"`
}

// NewLivenessStatistics creates a new instance of per-epoch liveness statistics.
//
// All per-node counters are allocated with numNodes entries in committee
// order, i.e. the counter at index i tracks the node at index i in the
// committee.
func NewLivenessStatistics(numNodes int) *LivenessStatistics {
	return &LivenessStatistics{
		TotalRounds:        0,
		LiveRounds:         make([]uint64, numNodes),
		FinalizedProposals: make([]uint64, numNodes),
		MissedProposals:    make([]uint64, numNodes),
	}
}
70 changes: 67 additions & 3 deletions go/roothash/tests/tester.go
Expand Up @@ -422,7 +422,11 @@ func (s *runtimeState) testSuccessfulRound(t *testing.T, backend api.Backend, co
require.NoError(err, "GetRuntimeState")
require.NotNil(state.LivenessStatistics, "liveness statistics should be set")
require.EqualValues(1, state.LivenessStatistics.TotalRounds)
require.Len(state.LivenessStatistics.LiveRounds, len(s.executorCommittee.workers)+len(s.executorCommittee.backupWorkers))

numNodes := len(s.executorCommittee.workers) + len(s.executorCommittee.backupWorkers)
require.Len(state.LivenessStatistics.LiveRounds, numNodes)
require.Len(state.LivenessStatistics.FinalizedProposals, numNodes)
require.Len(state.LivenessStatistics.MissedProposals, numNodes)

goodRoundsPerNode := make(map[signature.PublicKey]uint64)
for i, member := range state.ExecutorPool.Committee.Members {
Expand All @@ -435,6 +439,16 @@ func (s *runtimeState) testSuccessfulRound(t *testing.T, backend api.Backend, co
require.EqualValues(1, v, "LiveRounds(%s)", nodeID)
}

finalizedProposals := make([]uint64, numNodes)
missedProposals := make([]uint64, numNodes)

schedulerIdx, err := s.executorCommittee.committee.TransactionSchedulerIdx(header.Round - 1)
require.NoError(err, "TransactionSchedulerIdx")
finalizedProposals[schedulerIdx]++ // The first round has been finalized.

require.EqualValues(finalizedProposals, state.LivenessStatistics.FinalizedProposals, "there should be one finalized proposal")
require.EqualValues(missedProposals, state.LivenessStatistics.MissedProposals, "there should be no failed proposals")

// Nothing more to do after the block was received.
return
case <-time.After(recvTimeout):
Expand Down Expand Up @@ -521,6 +535,25 @@ WaitForRoundTimeoutBlocks:
require.NotNil(state.LivenessStatistics, "liveness statistics should be set")
require.EqualValues(1, state.LivenessStatistics.TotalRounds, "timed out round should not count for liveness")

numNodes := len(s.executorCommittee.workers) + len(s.executorCommittee.backupWorkers)
require.Len(state.LivenessStatistics.LiveRounds, numNodes)
require.Len(state.LivenessStatistics.FinalizedProposals, numNodes)
require.Len(state.LivenessStatistics.MissedProposals, numNodes)

finalizedProposals := make([]uint64, numNodes)
missedProposals := make([]uint64, numNodes)

schedulerIdx, err := s.executorCommittee.committee.TransactionSchedulerIdx(header.Round - 2)
require.NoError(err, "TransactionSchedulerIdx")
finalizedProposals[schedulerIdx]++ // The first round has been finalized.

schedulerIdx, err = s.executorCommittee.committee.TransactionSchedulerIdx(header.Round - 1)
require.NoError(err, "TransactionSchedulerIdx")
missedProposals[schedulerIdx]++ // The second round failed due to a timeout.

require.EqualValues(finalizedProposals, state.LivenessStatistics.FinalizedProposals, "there should be one finalized proposal")
require.EqualValues(missedProposals, state.LivenessStatistics.MissedProposals, "there should be one failed proposal")

// Nothing more to do after the block was received.
return
case <-time.After(recvTimeout):
Expand Down Expand Up @@ -678,9 +711,9 @@ WaitForProposerTimeoutBlocks:

tx := api.NewRequestProposerTimeoutTx(0, nil, s.rt.Runtime.ID, child.Header.Round)
err = consensusAPI.SignAndSubmitTx(ctx, consensus, timeoutNode.Signer, tx)
require.NoError(err, "ExectutorTimeout")
require.NoError(err, "ExecutorTimeout")

// Ensure that the round was finalized.
// Ensure that the round failed due to a proposer timeout.
for {
select {
case blk := <-ch:
Expand All @@ -695,6 +728,37 @@ WaitForProposerTimeoutBlocks:
require.EqualValues(child.Header.Round+1, header.Round, "block round")
require.EqualValues(block.RoundFailed, header.HeaderType, "block header type must be RoundFailed")

// Check that the liveness statistics were computed correctly.
state, err := backend.GetRuntimeState(ctx, &api.RuntimeRequest{
RuntimeID: header.Namespace,
Height: blk.Height,
})
require.NoError(err, "GetRuntimeState")
require.NotNil(state.LivenessStatistics, "liveness statistics should be set")

numNodes := len(s.executorCommittee.workers) + len(s.executorCommittee.backupWorkers)
require.Len(state.LivenessStatistics.LiveRounds, numNodes)
require.Len(state.LivenessStatistics.FinalizedProposals, numNodes)
require.Len(state.LivenessStatistics.MissedProposals, numNodes)

finalizedProposals := make([]uint64, numNodes)
missedProposals := make([]uint64, numNodes)

schedulerIdx, err := s.executorCommittee.committee.TransactionSchedulerIdx(header.Round - 3)
require.NoError(err, "TransactionSchedulerIdx")
finalizedProposals[schedulerIdx]++ // The first round has been finalized.

schedulerIdx, err = s.executorCommittee.committee.TransactionSchedulerIdx(header.Round - 2)
require.NoError(err, "TransactionSchedulerIdx")
missedProposals[schedulerIdx]++ // The second round failed due to a timeout.

schedulerIdx, err = s.executorCommittee.committee.TransactionSchedulerIdx(header.Round - 1)
require.NoError(err, "TransactionSchedulerIdx")
missedProposals[schedulerIdx]++ // The third round failed due to a proposer timeout.

require.EqualValues(finalizedProposals, state.LivenessStatistics.FinalizedProposals, "there should be one finalized proposal")
require.EqualValues(missedProposals, state.LivenessStatistics.MissedProposals, "there should be two failed proposals")

// Nothing more to do after the failed block was received.
return
case <-time.After(recvTimeout):
Expand Down

0 comments on commit f017a52

Please sign in to comment.