From f017a5226ee16cd57a64f84dbeebbb27d73ecb29 Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Mon, 31 Jul 2023 12:35:41 +0200 Subject: [PATCH] go/consensus/roothash: Track runtime proposer liveness The roothash application now monitors the runtime proposer liveness, which runtimes can utilize to penalize proposers with insufficient commitments. --- .changelog/5334.feature.md | 10 +++ .../cometbft/apps/roothash/liveness.go | 31 ++++++-- .../cometbft/apps/roothash/liveness_test.go | 66 ++++++++++++++++- .../cometbft/apps/roothash/roothash.go | 11 +++ .../cometbft/apps/roothash/transactions.go | 7 ++ go/registry/api/runtime.go | 5 ++ go/roothash/api/liveness.go | 20 +++++- go/roothash/tests/tester.go | 70 ++++++++++++++++++- go/scheduler/api/api.go | 36 +++++++--- 9 files changed, 233 insertions(+), 23 deletions(-) create mode 100644 .changelog/5334.feature.md diff --git a/.changelog/5334.feature.md b/.changelog/5334.feature.md new file mode 100644 index 00000000000..407c414ed18 --- /dev/null +++ b/.changelog/5334.feature.md @@ -0,0 +1,10 @@ +go/consensus/roothash: Track runtime proposer liveness + +The roothash application now monitors the runtime proposer liveness, which +runtimes can utilize to penalize proposers with insufficient commitments. +To activate penalties for such nodes, the executor committee parameters +need to be updated by configuring the following setting: + +- `MaxMissedProposalsPercent`: The maximum percentage of proposed rounds + in an epoch that can fail for a node to be considered live. Zero means + that all proposed rounds can fail. 
diff --git a/go/consensus/cometbft/apps/roothash/liveness.go b/go/consensus/cometbft/apps/roothash/liveness.go index e25bd57a649..b5edd4c9695 100644 --- a/go/consensus/cometbft/apps/roothash/liveness.go +++ b/go/consensus/cometbft/apps/roothash/liveness.go @@ -2,6 +2,7 @@ package roothash import ( "fmt" + "math" beacon "github.com/oasisprotocol/oasis-core/go/beacon/api" "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" @@ -31,6 +32,7 @@ func processLivenessStatistics(ctx *tmapi.Context, epoch beacon.EpochTime, rtSta if maxFailures == 0 { maxFailures = 255 } + maxMissedProposalsPercent := uint64(rtState.Runtime.Executor.MaxMissedProposalsPercent) slashParams := rtState.Runtime.Staking.Slashing[staking.SlashRuntimeLiveness] ctx.Logger().Debug("evaluating node liveness", @@ -41,14 +43,26 @@ func processLivenessStatistics(ctx *tmapi.Context, epoch beacon.EpochTime, rtSta ) // Collect per node liveness statistics as a single node can have multiple roles. - goodRoundsPerNode := make(map[signature.PublicKey]uint64) + type Stats struct { + liveRounds uint64 + finalizedProposals uint64 + missedProposals uint64 + } + statsPerNode := make(map[signature.PublicKey]*Stats) for i, member := range rtState.ExecutorPool.Committee.Members { - goodRoundsPerNode[member.PublicKey] += rtState.LivenessStatistics.LiveRounds[i] + stats, ok := statsPerNode[member.PublicKey] + if !ok { + stats = &Stats{} + statsPerNode[member.PublicKey] = stats + } + stats.liveRounds += rtState.LivenessStatistics.LiveRounds[i] + stats.finalizedProposals += rtState.LivenessStatistics.FinalizedProposals[i] + stats.missedProposals += rtState.LivenessStatistics.MissedProposals[i] } // Penalize nodes that were not live enough. 
regState := registryState.NewMutableState(ctx.State()) - for nodeID, liveRounds := range goodRoundsPerNode { + for nodeID, stats := range statsPerNode { status, err := regState.NodeStatus(ctx, nodeID) if err != nil { return fmt.Errorf("failed to retrieve status for node %s: %w", nodeID, err) @@ -57,16 +71,23 @@ func processLivenessStatistics(ctx *tmapi.Context, epoch beacon.EpochTime, rtSta continue } + maxMissedProposals := ((stats.missedProposals + stats.finalizedProposals) * maxMissedProposalsPercent) / 100 + if maxMissedProposalsPercent == 0 { + maxMissedProposals = math.MaxUint64 + } + switch { - case liveRounds >= minLiveRounds: + case stats.liveRounds >= minLiveRounds && stats.missedProposals <= maxMissedProposals: // Node is live. status.RecordSuccess(rtState.Runtime.ID, epoch) default: // Node is faulty. ctx.Logger().Debug("node deemed faulty", "node_id", nodeID, - "live_rounds", liveRounds, + "live_rounds", stats.liveRounds, "min_live_rounds", minLiveRounds, + "missed_proposals", stats.missedProposals, + "max_missed_proposals", maxMissedProposals, ) status.RecordFailure(rtState.Runtime.ID, epoch) diff --git a/go/consensus/cometbft/apps/roothash/liveness_test.go b/go/consensus/cometbft/apps/roothash/liveness_test.go index 74973cf069a..e436b3cbde6 100644 --- a/go/consensus/cometbft/apps/roothash/liveness_test.go +++ b/go/consensus/cometbft/apps/roothash/liveness_test.go @@ -39,6 +39,7 @@ func TestLivenessProcessing(t *testing.T) { Executor: registry.ExecutorParameters{ MinLiveRoundsForEvaluation: 10, MinLiveRoundsPercent: 90, + MaxMissedProposalsPercent: 0, // Disabled. MaxLivenessFailures: 4, }, } @@ -74,8 +75,10 @@ func TestLivenessProcessing(t *testing.T) { Round: 0, }, LivenessStatistics: &roothash.LivenessStatistics{ - TotalRounds: 100, - LiveRounds: []uint64{91}, + TotalRounds: 100, + LiveRounds: []uint64{91}, // At least 90 required. + FinalizedProposals: []uint64{80}, + MissedProposals: []uint64{21}, // At most 20 allowed. 
}, } err = roothashState.SetRuntimeState(ctx, rtState) @@ -137,4 +140,63 @@ func TestLivenessProcessing(t *testing.T) { require.NoError(err, "NodeStatus") require.False(status.IsSuspended(runtime.ID, epoch), "node should not be suspended") require.Len(status.Faults, 0, "there should be no faults") + + // Start tracking proposer liveness. + rtState.Runtime.Executor.MaxMissedProposalsPercent = 20 + + // When node is proposing, everything should be left as is, no faults should be recorded. + rtState.LivenessStatistics.MissedProposals[0] = 20 // At most 20 allowed. + err = processLivenessStatistics(ctx, epoch, rtState) + require.NoError(err, "processLivenessStatistics") + status, err = registryState.NodeStatus(ctx, sk.Public()) + require.NoError(err, "NodeStatus") + require.False(status.IsSuspended(runtime.ID, epoch), "node should not be suspended") + + // When node is not proposing, it should be suspended, there should be one fault. + rtState.LivenessStatistics.MissedProposals[0] = 21 // At most 20 allowed. + err = processLivenessStatistics(ctx, epoch, rtState) + require.NoError(err, "processLivenessStatistics") + status, err = registryState.NodeStatus(ctx, sk.Public()) + require.NoError(err, "NodeStatus") + require.True(status.IsSuspended(runtime.ID, epoch), "node should be suspended") + require.EqualValues(1, status.Faults[runtime.ID].Failures, "there should be one fault") + require.EqualValues(epoch+2, status.Faults[runtime.ID].SuspendedUntil, "suspension time should be set") + + // Bump epoch so the node is no longer suspended. + epoch += 2 + + // When node is not proposing again, fault counter should increase. + rtState.LivenessStatistics.MissedProposals[0] = 21 // At most 20 allowed. 
+ err = processLivenessStatistics(ctx, epoch, rtState) + require.NoError(err, "processLivenessStatistics") + status, err = registryState.NodeStatus(ctx, sk.Public()) + require.NoError(err, "NodeStatus") + require.True(status.IsSuspended(runtime.ID, epoch), "node should be suspended") + require.EqualValues(2, status.Faults[runtime.ID].Failures, "there should be two faults") + require.EqualValues(epoch+4, status.Faults[runtime.ID].SuspendedUntil, "suspension time should be set") + + // Bump epoch so the node is no longer suspended. + epoch += 4 + + // When node is proposing again, fault counter should decrease. + rtState.LivenessStatistics.MissedProposals[0] = 20 // At most 20 allowed. + err = processLivenessStatistics(ctx, epoch, rtState) + require.NoError(err, "processLivenessStatistics") + status, err = registryState.NodeStatus(ctx, sk.Public()) + require.NoError(err, "NodeStatus") + require.True(status.IsSuspended(runtime.ID, epoch), "node should be suspended") + require.EqualValues(1, status.Faults[runtime.ID].Failures, "there should be one fault") + require.EqualValues(epoch+2, status.Faults[runtime.ID].SuspendedUntil, "suspension time should be set") + + // Bump epoch so the node is no longer suspended. + epoch += 2 + + // When node is proposing again, the last fault should be cleared. + rtState.LivenessStatistics.MissedProposals[0] = 0 // At most 20 allowed. 
+ err = processLivenessStatistics(ctx, epoch, rtState) + require.NoError(err, "processLivenessStatistics") + status, err = registryState.NodeStatus(ctx, sk.Public()) + require.NoError(err, "NodeStatus") + require.False(status.IsSuspended(runtime.ID, epoch), "node should not be suspended") + require.Len(status.Faults, 0, "there should be no faults") } diff --git a/go/consensus/cometbft/apps/roothash/roothash.go b/go/consensus/cometbft/apps/roothash/roothash.go index 9daccc89924..ed13848dafe 100644 --- a/go/consensus/cometbft/apps/roothash/roothash.go +++ b/go/consensus/cometbft/apps/roothash/roothash.go @@ -540,6 +540,12 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits( //nolint: gocyclo round := rtState.CurrentBlock.Header.Round + 1 pool := rtState.ExecutorPool + // Remember the index of the transaction scheduler within the committee. + schedulerIdx, err := pool.Committee.TransactionSchedulerIdx(pool.Round) + if err != nil { + return err + } + // Initialize per-epoch liveness statistics. if rtState.LivenessStatistics == nil { rtState.LivenessStatistics = roothash.NewLivenessStatistics(len(pool.Committee.Members)) @@ -573,7 +579,9 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits( //nolint: gocyclo "round", round, ) + // Record that the round was finalized and that the scheduler received enough commitments. livenessStats.TotalRounds++ + livenessStats.FinalizedProposals[schedulerIdx]++ ec := commit.ToDDResult().(*commitment.ExecutorCommitment) @@ -757,6 +765,9 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits( //nolint: gocyclo default: } + // Record that the scheduler did not receive enough commitments. + livenessStats.MissedProposals[schedulerIdx]++ + // Something else went wrong, emit empty error block. 
ctx.Logger().Debug("round failed", "round", round, diff --git a/go/consensus/cometbft/apps/roothash/transactions.go b/go/consensus/cometbft/apps/roothash/transactions.go index 967ba36e6e0..485bab6889b 100644 --- a/go/consensus/cometbft/apps/roothash/transactions.go +++ b/go/consensus/cometbft/apps/roothash/transactions.go @@ -95,6 +95,13 @@ func (app *rootHashApplication) executorProposerTimeout( return err } + // Record that the scheduler did not propose. + schedulerIdx, err := rtState.ExecutorPool.Committee.TransactionSchedulerIdx(rpt.Round) + if err != nil { + return err + } + rtState.LivenessStatistics.MissedProposals[schedulerIdx]++ + // Timeout triggered by executor node, emit empty error block. ctx.Logger().Debug("proposer round timeout", "round", rpt.Round, diff --git a/go/registry/api/runtime.go b/go/registry/api/runtime.go index e5ea894bdf5..891aac6eb89 100644 --- a/go/registry/api/runtime.go +++ b/go/registry/api/runtime.go @@ -101,6 +101,11 @@ type ExecutorParameters struct { // penalized. MinLiveRoundsPercent uint8 `json:"min_live_rounds_percent,omitempty"` + // MaxMissedProposalsPercent is the maximum percentage of proposed rounds in an epoch that + // can fail for a node to be considered live. Nodes not satisfying this may be penalized. + // Zero means that all proposed rounds can fail. + MaxMissedProposalsPercent uint8 `json:"max_missed_proposals_percent,omitempty"` + // MinLiveRoundsForEvaluation is the minimum number of live rounds in an epoch for the liveness // calculations to be considered for evaluation. MinLiveRoundsForEvaluation uint64 `json:"min_live_rounds_eval,omitempty"` diff --git a/go/roothash/api/liveness.go b/go/roothash/api/liveness.go index f94f446864e..5d35b556b1c 100644 --- a/go/roothash/api/liveness.go +++ b/go/roothash/api/liveness.go @@ -9,12 +9,28 @@ type LivenessStatistics struct { // LiveRounds is a list of counters, specified in committee order (e.g. counter at index i has // the value for node i in the committee). 
LiveRounds []uint64 `json:"good_rounds"` + + // FinalizedProposals is a list that records the number of finalized rounds when a node + // acted as a proposer. + // + // The list is ordered according to the committee arrangement (i.e., the counter at index i + // holds the value for the node at index i in the committee). + FinalizedProposals []uint64 `json:"finalized_proposals"` + + // MissedProposals is a list that records the number of failed rounds when a node + // acted as a proposer. + // + // The list is ordered according to the committee arrangement (i.e., the counter at index i + // holds the value for the node at index i in the committee). + MissedProposals []uint64 `json:"missed_proposals"` } // NewLivenessStatistics creates a new instance of per-epoch liveness statistics. func NewLivenessStatistics(numNodes int) *LivenessStatistics { return &LivenessStatistics{ - TotalRounds: 0, - LiveRounds: make([]uint64, numNodes), + TotalRounds: 0, + LiveRounds: make([]uint64, numNodes), + FinalizedProposals: make([]uint64, numNodes), + MissedProposals: make([]uint64, numNodes), } } diff --git a/go/roothash/tests/tester.go b/go/roothash/tests/tester.go index 5a91b1d5232..02fd45d3e9a 100644 --- a/go/roothash/tests/tester.go +++ b/go/roothash/tests/tester.go @@ -422,7 +422,11 @@ func (s *runtimeState) testSuccessfulRound(t *testing.T, backend api.Backend, co require.NoError(err, "GetRuntimeState") require.NotNil(state.LivenessStatistics, "liveness statistics should be set") require.EqualValues(1, state.LivenessStatistics.TotalRounds) - require.Len(state.LivenessStatistics.LiveRounds, len(s.executorCommittee.workers)+len(s.executorCommittee.backupWorkers)) + + numNodes := len(s.executorCommittee.workers) + len(s.executorCommittee.backupWorkers) + require.Len(state.LivenessStatistics.LiveRounds, numNodes) + require.Len(state.LivenessStatistics.FinalizedProposals, numNodes) + require.Len(state.LivenessStatistics.MissedProposals, numNodes) goodRoundsPerNode := 
make(map[signature.PublicKey]uint64) for i, member := range state.ExecutorPool.Committee.Members { @@ -435,6 +439,16 @@ func (s *runtimeState) testSuccessfulRound(t *testing.T, backend api.Backend, co require.EqualValues(1, v, "LiveRounds(%s)", nodeID) } + finalizedProposals := make([]uint64, numNodes) + missedProposals := make([]uint64, numNodes) + + schedulerIdx, err := s.executorCommittee.committee.TransactionSchedulerIdx(header.Round - 1) + require.NoError(err, "TransactionSchedulerIdx") + finalizedProposals[schedulerIdx]++ // The first round has been finalized. + + require.EqualValues(finalizedProposals, state.LivenessStatistics.FinalizedProposals, "there should be one finalized proposal") + require.EqualValues(missedProposals, state.LivenessStatistics.MissedProposals, "there should be no failed proposals") + // Nothing more to do after the block was received. return case <-time.After(recvTimeout): @@ -521,6 +535,25 @@ WaitForRoundTimeoutBlocks: require.NotNil(state.LivenessStatistics, "liveness statistics should be set") require.EqualValues(1, state.LivenessStatistics.TotalRounds, "timed out round should not count for liveness") + numNodes := len(s.executorCommittee.workers) + len(s.executorCommittee.backupWorkers) + require.Len(state.LivenessStatistics.LiveRounds, numNodes) + require.Len(state.LivenessStatistics.FinalizedProposals, numNodes) + require.Len(state.LivenessStatistics.MissedProposals, numNodes) + + finalizedProposals := make([]uint64, numNodes) + missedProposals := make([]uint64, numNodes) + + schedulerIdx, err := s.executorCommittee.committee.TransactionSchedulerIdx(header.Round - 2) + require.NoError(err, "TransactionSchedulerIdx") + finalizedProposals[schedulerIdx]++ // The first round has been finalized. + + schedulerIdx, err = s.executorCommittee.committee.TransactionSchedulerIdx(header.Round - 1) + require.NoError(err, "TransactionSchedulerIdx") + missedProposals[schedulerIdx]++ // The second round failed due to a timeout. 
+ + require.EqualValues(finalizedProposals, state.LivenessStatistics.FinalizedProposals, "there should be one finalized proposal") + require.EqualValues(missedProposals, state.LivenessStatistics.MissedProposals, "there should be one failed proposal") + // Nothing more to do after the block was received. return case <-time.After(recvTimeout): @@ -678,9 +711,9 @@ WaitForProposerTimeoutBlocks: tx := api.NewRequestProposerTimeoutTx(0, nil, s.rt.Runtime.ID, child.Header.Round) err = consensusAPI.SignAndSubmitTx(ctx, consensus, timeoutNode.Signer, tx) - require.NoError(err, "ExectutorTimeout") + require.NoError(err, "ExecutorTimeout") - // Ensure that the round was finalized. + // Ensure that the round failed due to a proposer timeout. for { select { case blk := <-ch: @@ -695,6 +728,37 @@ WaitForProposerTimeoutBlocks: require.EqualValues(child.Header.Round+1, header.Round, "block round") require.EqualValues(block.RoundFailed, header.HeaderType, "block header type must be RoundFailed") + // Check that the liveness statistics were computed correctly. + state, err := backend.GetRuntimeState(ctx, &api.RuntimeRequest{ + RuntimeID: header.Namespace, + Height: blk.Height, + }) + require.NoError(err, "GetRuntimeState") + require.NotNil(state.LivenessStatistics, "liveness statistics should be set") + + numNodes := len(s.executorCommittee.workers) + len(s.executorCommittee.backupWorkers) + require.Len(state.LivenessStatistics.LiveRounds, numNodes) + require.Len(state.LivenessStatistics.FinalizedProposals, numNodes) + require.Len(state.LivenessStatistics.MissedProposals, numNodes) + + finalizedProposals := make([]uint64, numNodes) + missedProposals := make([]uint64, numNodes) + + schedulerIdx, err := s.executorCommittee.committee.TransactionSchedulerIdx(header.Round - 3) + require.NoError(err, "TransactionSchedulerIdx") + finalizedProposals[schedulerIdx]++ // The first round has been finalized. 
+ + schedulerIdx, err = s.executorCommittee.committee.TransactionSchedulerIdx(header.Round - 2) + require.NoError(err, "TransactionSchedulerIdx") + missedProposals[schedulerIdx]++ // The second round failed due to a timeout. + + schedulerIdx, err = s.executorCommittee.committee.TransactionSchedulerIdx(header.Round - 1) + require.NoError(err, "TransactionSchedulerIdx") + missedProposals[schedulerIdx]++ // The third round failed due to a proposer timeout. + + require.EqualValues(finalizedProposals, state.LivenessStatistics.FinalizedProposals, "there should be one finalized proposal") + require.EqualValues(missedProposals, state.LivenessStatistics.MissedProposals, "there should be two failed proposals") + // Nothing more to do after the failed block was received. return case <-time.After(recvTimeout): diff --git a/go/scheduler/api/api.go b/go/scheduler/api/api.go index 75afcfd6af6..0891b383561 100644 --- a/go/scheduler/api/api.go +++ b/go/scheduler/api/api.go @@ -149,28 +149,42 @@ type Committee struct { ValidFor beacon.EpochTime `json:"valid_for"` } -// Workers returns committee nodes with Worker role. -func (c *Committee) Workers() []*CommitteeNode { - var workers []*CommitteeNode +// TransactionSchedulerIdx returns the index of the transaction scheduler +// within the committee for the provided round. +func (c *Committee) TransactionSchedulerIdx(round uint64) (int, error) { + var ( + total uint64 + worker uint64 + ) + for _, member := range c.Members { if member.Role != RoleWorker { continue } - workers = append(workers, member) + total++ + } + + for idx, member := range c.Members { + if member.Role != RoleWorker { + continue + } + if worker == round%total { + return idx, nil + } + worker++ } - return workers + + return 0, fmt.Errorf("no workers in committee") } // TransactionScheduler returns the transaction scheduler of the committee // based on the provided round. 
func (c *Committee) TransactionScheduler(round uint64) (*CommitteeNode, error) { - workers := c.Workers() - numNodes := uint64(len(workers)) - if numNodes == 0 { - return nil, fmt.Errorf("no workers in committee") + idx, err := c.TransactionSchedulerIdx(round) + if err != nil { + return nil, err } - schedulerIdx := round % numNodes - return workers[schedulerIdx], nil + return c.Members[idx], nil } // String returns a string representation of a Committee.