Skip to content

Commit

Permalink
go/worker/compute: Optimize backup worker commit submission
Browse files Browse the repository at this point in the history
Backup compute workers now observe any gossiped commitments and pre-empt
consensus when it is obvious that there will be a discrepancy declared.
  • Loading branch information
kostko committed May 19, 2023
1 parent 7c32876 commit ca3bb10
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 53 deletions.
4 changes: 4 additions & 0 deletions .changelog/5264.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/worker/compute: Optimize backup worker commit submission

Backup compute workers now observe any gossiped commitments and pre-empt
consensus when it is obvious that there will be a discrepancy declared.
115 changes: 115 additions & 0 deletions go/worker/compute/executor/committee/discrepancy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package committee

import (
"github.com/oasisprotocol/oasis-core/go/common/crash"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
)

// HandleNewEventLocked implements NodeHooks.
// Guarded by n.commonNode.CrossNode.
func (n *Node) HandleNewEventLocked(ev *roothash.Event) {
switch {
case ev.ExecutionDiscrepancyDetected != nil:
n.logger.Warn("execution discrepancy detected")

crash.Here(crashPointDiscrepancyDetectedAfter)

discrepancyDetectedCount.With(n.getMetricLabels()).Inc()

// If the node is not a backup worker in this epoch, no need to do anything. Also if the
// node is an executor worker in this epoch, then it has already processed and submitted
// a commitment, so no need to do anything.
epoch := n.commonNode.Group.GetEpochSnapshot()
if !epoch.IsExecutorBackupWorker() || epoch.IsExecutorWorker() {
return
}

// Make sure that the runtime has synced this consensus block.
if rt := n.commonNode.GetHostedRuntime(); rt != nil {
err := rt.ConsensusSync(n.roundCtx, uint64(ev.Height))
if err != nil {
n.logger.Warn("failed to ask the runtime to sync the latest consensus block",
"err", err,
"height", ev.Height,
)
}
}

var state StateWaitingForEvent
switch s := n.state.(type) {
case StateWaitingForBatch:
// Discrepancy detected event received before the batch. We need to
// record the received event and keep waiting for the batch.
s.pendingEvent = ev.ExecutionDiscrepancyDetected
n.transitionLocked(s)
return
case StateWaitingForEvent:
state = s
default:
n.logger.Warn("ignoring received discrepancy event in incorrect state",
"state", s,
)
return
}

// Backup worker, start processing a batch.
n.logger.Info("backup worker activating and processing batch")
n.startProcessingBatchLocked(state.batch)
}
}

func (n *Node) handleObservedExecutorCommitment(ec *commitment.ExecutorCommitment) {
n.commonNode.CrossNode.Lock()
defer n.commonNode.CrossNode.Unlock()

// Don't do anything if we are not a backup worker or we are an executor worker.
es := n.commonNode.Group.GetEpochSnapshot()
if !es.IsExecutorBackupWorker() || es.IsExecutorWorker() {
return
}

n.logger.Debug("observed executor commitment",
"commitment", ec,
)

// Make sure the executor commitment is for the next round.
currentRound := n.commonNode.CurrentBlock.Header.Round
nextRound := currentRound + 1
if ec.Header.Round != nextRound {
n.logger.Debug("observed executor commitment is not for the next round",
"ec_round", ec.Header.Round,
"next_round", nextRound,
)
return
}

// Initialize the pool if needed.
if n.commitPool.Round != currentRound {
n.commitPool.Runtime = es.GetRuntime()
n.commitPool.Committee = es.GetExecutorCommittee().Committee
n.commitPool.ResetCommitments(currentRound)
}

// TODO: Handle equivocation detection.

err := n.commitPool.AddExecutorCommitment(n.ctx, n.commonNode.CurrentBlock, es, ec, nil)
if err != nil {
n.logger.Debug("ignoring bad observed executor commitment",
"err", err,
"node_id", ec.NodeID,
)
return
}

// In case observed commits indicate a discrepancy, preempt consensus and immediately handle.
if _, err = n.commitPool.ProcessCommitments(false); err == commitment.ErrDiscrepancyDetected {
n.logger.Warn("observed commitments indicate discrepancy")

n.HandleNewEventLocked(&roothash.Event{
ExecutionDiscrepancyDetected: &roothash.ExecutionDiscrepancyDetectedEvent{
Timeout: false,
},
})
}
}
69 changes: 16 additions & 53 deletions go/worker/compute/executor/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ type Node struct { // nolint: maligned
roundCtx context.Context
roundCancelCtx context.CancelFunc

commitPool commitment.Pool

storage storage.LocalBackend
txSync txsync.Client

Expand Down Expand Up @@ -1235,59 +1237,6 @@ func (n *Node) signAndSubmitCommitment(roundCtx context.Context, ec *commitment.
return nil
}

// HandleNewEventLocked implements NodeHooks.
// Guarded by n.commonNode.CrossNode.
func (n *Node) HandleNewEventLocked(ev *roothash.Event) {
switch {
case ev.ExecutionDiscrepancyDetected != nil:
n.logger.Warn("execution discrepancy detected")

crash.Here(crashPointDiscrepancyDetectedAfter)

discrepancyDetectedCount.With(n.getMetricLabels()).Inc()

// If the node is not a backup worker in this epoch, no need to do anything. Also if the
// node is an executor worker in this epoch, then it has already processed and submitted
// a commitment, so no need to do anything.
epoch := n.commonNode.Group.GetEpochSnapshot()
if !epoch.IsExecutorBackupWorker() || epoch.IsExecutorWorker() {
return
}

// Make sure that the runtime has synced this consensus block.
if rt := n.commonNode.GetHostedRuntime(); rt != nil {
err := rt.ConsensusSync(n.roundCtx, uint64(ev.Height))
if err != nil {
n.logger.Warn("failed to ask the runtime to sync the latest consensus block",
"err", err,
"height", ev.Height,
)
}
}

var state StateWaitingForEvent
switch s := n.state.(type) {
case StateWaitingForBatch:
// Discrepancy detected event received before the batch. We need to
// record the received event and keep waiting for the batch.
s.pendingEvent = ev.ExecutionDiscrepancyDetected
n.transitionLocked(s)
return
case StateWaitingForEvent:
state = s
default:
n.logger.Warn("ignoring received discrepancy event in incorrect state",
"state", s,
)
return
}

// Backup worker, start processing a batch.
n.logger.Info("backup worker activating and processing batch")
n.startProcessingBatchLocked(state.batch)
}
}

// Guarded by n.commonNode.CrossNode.
func (n *Node) handleProposalLocked(batch *unresolvedBatch) error {
n.logger.Debug("handling a new batch proposal",
Expand Down Expand Up @@ -1519,6 +1468,17 @@ func (n *Node) worker() {
schedSub, schedCh := n.commonNode.TxPool.WatchScheduler()
defer schedSub.Close()

// Subscribe to gossiped executor commitments.
ecCh, ecSub, err := n.commonNode.Consensus.RootHash().WatchExecutorCommitments(n.ctx)
if err != nil {
n.logger.Error("failed to subscribe to executor commitments",
"err", err,
)
close(n.initCh)
return
}
defer ecSub.Close()

// We are initialized.
close(n.initCh)

Expand Down Expand Up @@ -1562,6 +1522,9 @@ func (n *Node) worker() {
case txs := <-txCh:
// Check any queued transactions.
n.handleNewCheckedTransactions(txs)
case ec := <-ecCh:
// Process observed executor commitments.
n.handleObservedExecutorCommitment(ec)
case <-n.reselect:
// Recalculate select set.
}
Expand Down

0 comments on commit ca3bb10

Please sign in to comment.