Skip to content

Commit

Permalink
go/worker/compute: Optimize backup worker commit submission
Browse files Browse the repository at this point in the history
Backup compute workers now observe any gossiped commitments and pre-empt
consensus when it is obvious that there will be a discrepancy declared.
  • Loading branch information
kostko committed May 22, 2023
1 parent a0bbd8f commit 236d095
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 58 deletions.
4 changes: 4 additions & 0 deletions .changelog/5264.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/worker/compute: Optimize backup worker commit submission

Backup compute workers now observe any gossiped commitments and pre-empt
consensus when it is obvious that there will be a discrepancy declared.
115 changes: 115 additions & 0 deletions go/worker/compute/executor/committee/discrepancy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package committee

import (
"github.com/oasisprotocol/oasis-core/go/common/crash"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
)

func (n *Node) handleDiscrepancyLocked(height uint64) {
n.logger.Warn("execution discrepancy detected")

crash.Here(crashPointDiscrepancyDetectedAfter)

discrepancyDetectedCount.With(n.getMetricLabels()).Inc()

// If the node is not a backup worker in this epoch, no need to do anything. Also if the
// node is an executor worker in this epoch, then it has already processed and submitted
// a commitment, so no need to do anything.
epoch := n.commonNode.Group.GetEpochSnapshot()
if !epoch.IsExecutorBackupWorker() || epoch.IsExecutorWorker() {
return
}

// Make sure that the runtime has synced this consensus block.
if rt := n.commonNode.GetHostedRuntime(); rt != nil {
err := rt.ConsensusSync(n.roundCtx, height)
if err != nil {
n.logger.Warn("failed to ask the runtime to sync the latest consensus block",
"err", err,
"height", height,
)
}
}

var state StateWaitingForEvent
switch s := n.state.(type) {
case StateWaitingForBatch:
// Discrepancy detected event received before the batch. We need to remember that there was
// a discrepancy and keep waiting for the batch.
s.discrepancyDetected = true
n.transitionLocked(s)
return
case StateWaitingForEvent:
state = s
default:
n.logger.Warn("ignoring received discrepancy event in incorrect state",
"state", s,
)
return
}

// Backup worker, start processing a batch.
n.logger.Info("backup worker activating and processing batch")
n.startProcessingBatchLocked(state.batch)
}

// HandleNewEventLocked implements NodeHooks.
// Guarded by n.commonNode.CrossNode.
func (n *Node) HandleNewEventLocked(ev *roothash.Event) {
switch {
case ev.ExecutionDiscrepancyDetected != nil:
n.handleDiscrepancyLocked(uint64(ev.Height))
}
}

func (n *Node) handleObservedExecutorCommitment(ec *commitment.ExecutorCommitment) {
n.commonNode.CrossNode.Lock()
defer n.commonNode.CrossNode.Unlock()

// Don't do anything if we are not a backup worker or we are an executor worker.
es := n.commonNode.Group.GetEpochSnapshot()
if !es.IsExecutorBackupWorker() || es.IsExecutorWorker() {
return
}

n.logger.Debug("observed executor commitment",
"commitment", ec,
)

// Make sure the executor commitment is for the next round.
currentRound := n.commonNode.CurrentBlock.Header.Round
nextRound := currentRound + 1
if ec.Header.Round != nextRound {
n.logger.Debug("observed executor commitment is not for the next round",
"ec_round", ec.Header.Round,
"next_round", nextRound,
)
return
}

// Initialize the pool if needed.
if n.commitPool.Round != currentRound {
n.commitPool.Runtime = es.GetRuntime()
n.commitPool.Committee = es.GetExecutorCommittee().Committee
n.commitPool.ResetCommitments(currentRound)
}

// TODO: Handle equivocation detection.

err := n.commitPool.AddExecutorCommitment(n.ctx, n.commonNode.CurrentBlock, es, ec, nil)
if err != nil {
n.logger.Debug("ignoring bad observed executor commitment",
"err", err,
"node_id", ec.NodeID,
)
return
}

// In case observed commits indicate a discrepancy, preempt consensus and immediately handle.
if _, err = n.commitPool.ProcessCommitments(false); err == commitment.ErrDiscrepancyDetected {
n.logger.Warn("observed commitments indicate discrepancy")

n.handleDiscrepancyLocked(uint64(n.commonNode.CurrentBlockHeight))
}
}
71 changes: 17 additions & 54 deletions go/worker/compute/executor/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ type Node struct { // nolint: maligned
roundCtx context.Context
roundCancelCtx context.CancelFunc

commitPool commitment.Pool

storage storage.LocalBackend
txSync txsync.Client

Expand Down Expand Up @@ -743,7 +745,7 @@ func (n *Node) maybeStartProcessingBatchLocked(batch *unresolvedBatch) {
case epoch.IsExecutorBackupWorker():
// Backup worker, wait for discrepancy event.
state, ok := n.state.(StateWaitingForBatch)
if ok && state.pendingEvent != nil {
if ok && state.discrepancyDetected {
// We have already received a discrepancy event, start processing immediately.
n.logger.Info("already received a discrepancy event, start processing batch")
n.startProcessingBatchLocked(batch)
Expand Down Expand Up @@ -1249,59 +1251,6 @@ func (n *Node) signAndSubmitCommitment(roundCtx context.Context, ec *commitment.
return nil
}

// HandleNewEventLocked implements NodeHooks.
// Guarded by n.commonNode.CrossNode.
func (n *Node) HandleNewEventLocked(ev *roothash.Event) {
switch {
case ev.ExecutionDiscrepancyDetected != nil:
n.logger.Warn("execution discrepancy detected")

crash.Here(crashPointDiscrepancyDetectedAfter)

discrepancyDetectedCount.With(n.getMetricLabels()).Inc()

// If the node is not a backup worker in this epoch, no need to do anything. Also if the
// node is an executor worker in this epoch, then it has already processed and submitted
// a commitment, so no need to do anything.
epoch := n.commonNode.Group.GetEpochSnapshot()
if !epoch.IsExecutorBackupWorker() || epoch.IsExecutorWorker() {
return
}

// Make sure that the runtime has synced this consensus block.
if rt := n.commonNode.GetHostedRuntime(); rt != nil {
err := rt.ConsensusSync(n.roundCtx, uint64(ev.Height))
if err != nil {
n.logger.Warn("failed to ask the runtime to sync the latest consensus block",
"err", err,
"height", ev.Height,
)
}
}

var state StateWaitingForEvent
switch s := n.state.(type) {
case StateWaitingForBatch:
// Discrepancy detected event received before the batch. We need to
// record the received event and keep waiting for the batch.
s.pendingEvent = ev.ExecutionDiscrepancyDetected
n.transitionLocked(s)
return
case StateWaitingForEvent:
state = s
default:
n.logger.Warn("ignoring received discrepancy event in incorrect state",
"state", s,
)
return
}

// Backup worker, start processing a batch.
n.logger.Info("backup worker activating and processing batch")
n.startProcessingBatchLocked(state.batch)
}
}

// Guarded by n.commonNode.CrossNode.
func (n *Node) handleProposalLocked(batch *unresolvedBatch) error {
n.logger.Debug("handling a new batch proposal",
Expand Down Expand Up @@ -1533,6 +1482,17 @@ func (n *Node) worker() {
schedSub, schedCh := n.commonNode.TxPool.WatchScheduler()
defer schedSub.Close()

// Subscribe to gossiped executor commitments.
ecCh, ecSub, err := n.commonNode.Consensus.RootHash().WatchExecutorCommitments(n.ctx)
if err != nil {
n.logger.Error("failed to subscribe to executor commitments",
"err", err,
)
close(n.initCh)
return
}
defer ecSub.Close()

// We are initialized.
close(n.initCh)

Expand Down Expand Up @@ -1576,6 +1536,9 @@ func (n *Node) worker() {
case txs := <-txCh:
// Check any queued transactions.
n.handleNewCheckedTransactions(txs)
case ec := <-ecCh:
// Process observed executor commitments.
n.handleObservedExecutorCommitment(ec)
case <-n.reselect:
// Recalculate select set.
}
Expand Down
7 changes: 3 additions & 4 deletions go/worker/compute/executor/committee/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
"github.com/oasisprotocol/oasis-core/go/runtime/transaction"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
Expand Down Expand Up @@ -109,9 +108,9 @@ func (s StateNotReady) String() string {

// StateWaitingForBatch is the waiting for batch state.
type StateWaitingForBatch struct {
// Pending execute discrepancy detected event in case the node is a
// backup worker and the event was received before the batch.
pendingEvent *roothash.ExecutionDiscrepancyDetectedEvent
// Whether a discrepancy has been detected in case the node is a backup worker and the event has
// been received before the batch.
discrepancyDetected bool
}

// Name returns the name of the state.
Expand Down

0 comments on commit 236d095

Please sign in to comment.