Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/worker/compute/executor: Propose promptly upon detecting discrepancy #5447

Merged
merged 5 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changelog/5447.bugfix.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
go/runtime: Fix zombie channel pipe leak on runtime restarts

Pipes created by a call to channels.Unwrap spawned new goroutines
that were not terminated during runtime restarts. These zombie
pipes also intercepted one value from the newly created pipes,
causing them to block indefinitely.
1 change: 1 addition & 0 deletions .changelog/5447.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/compute/executor: Propose promptly upon detecting discrepancy
3 changes: 3 additions & 0 deletions go/consensus/cometbft/apps/roothash/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
if err = commitment.VerifyExecutorCommitment(ctx, rtState.LastBlock, rtState.Runtime, rtState.Committee.ValidFor, &commit, msgGasAccountant, nl); err != nil { // nolint: gosec
ctx.Logger().Debug("failed to verify executor commitment",
"err", err,
"runtime_id", cc.ID,
"round", commit.Header.Header.Round,
)
return err
Expand All @@ -105,12 +106,14 @@
if err := rtState.CommitmentPool.AddVerifiedExecutorCommitment(rtState.Committee, &commit); err != nil { // nolint: gosec
ctx.Logger().Debug("failed to add executor commitment",
"err", err,
"runtime_id", cc.ID,

Check warning on line 109 in go/consensus/cometbft/apps/roothash/transactions.go

View check run for this annotation

Codecov / codecov/patch

go/consensus/cometbft/apps/roothash/transactions.go#L109

Added line #L109 was not covered by tests
"round", commit.Header.Header.Round,
)
return err
}

ctx.Logger().Debug("executor commitment added to pool",
"runtime_id", cc.ID,
"round", commit.Header.Header.Round,
"node_id", commit.NodeID,
"scheduler_id", commit.Header.SchedulerID,
Expand Down
2 changes: 1 addition & 1 deletion go/runtime/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Runtime interface {
Call(ctx context.Context, body *protocol.Body) (*protocol.Body, error)

// UpdateCapabilityTEE asks the runtime to update its CapabilityTEE with latest data.
UpdateCapabilityTEE(ctx context.Context) error
UpdateCapabilityTEE()

// WatchEvents subscribes to runtime status events.
WatchEvents(ctx context.Context) (<-chan *Event, pubsub.ClosableSubscription, error)
Expand Down
3 changes: 1 addition & 2 deletions go/runtime/host/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,7 @@
}

// Implements host.Runtime.
func (r *runtime) UpdateCapabilityTEE(context.Context) error {
return nil
func (r *runtime) UpdateCapabilityTEE() {

Check warning on line 188 in go/runtime/host/mock/mock.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/host/mock/mock.go#L188

Added line #L188 was not covered by tests
}

// Implements host.Runtime.
Expand Down
8 changes: 3 additions & 5 deletions go/runtime/host/multi/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,18 +207,16 @@
}

// UpdateCapabilityTEE implements host.Runtime.
func (agg *Aggregate) UpdateCapabilityTEE(ctx context.Context) error {
func (agg *Aggregate) UpdateCapabilityTEE() {

Check warning on line 210 in go/runtime/host/multi/multi.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/host/multi/multi.go#L210

Added line #L210 was not covered by tests
agg.l.RLock()
defer agg.l.RUnlock()

var err error
if agg.active != nil {
err = errors.Join(err, agg.active.host.UpdateCapabilityTEE(ctx))
agg.active.host.UpdateCapabilityTEE()

Check warning on line 215 in go/runtime/host/multi/multi.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/host/multi/multi.go#L215

Added line #L215 was not covered by tests
}
if agg.next != nil {
err = errors.Join(err, agg.next.host.UpdateCapabilityTEE(ctx))
agg.next.host.UpdateCapabilityTEE()

Check warning on line 218 in go/runtime/host/multi/multi.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/host/multi/multi.go#L218

Added line #L218 was not covered by tests
}
return err
}

// WatchEvents implements host.Runtime.
Expand Down
36 changes: 15 additions & 21 deletions go/runtime/host/sandbox/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"time"

"github.com/cenkalti/backoff/v4"
"github.com/eapache/channels"

"github.com/oasisprotocol/oasis-core/go/common"
cmnBackoff "github.com/oasisprotocol/oasis-core/go/common/backoff"
Expand Down Expand Up @@ -82,15 +81,15 @@
id := cfg.Bundle.Manifest.ID

r := &sandboxedRuntime{
cfg: p.cfg,
rtCfg: cfg,
id: id,
stopCh: make(chan struct{}),
quitCh: make(chan struct{}),
ctrlCh: make(chan interface{}, ctrlChannelBufferSize),
notifier: pubsub.NewBroker(false),
notifyUpdateCapabilityTEE: channels.NewRingChannel(1),
logger: p.cfg.Logger.With("runtime_id", id),
cfg: p.cfg,
rtCfg: cfg,
id: id,
stopCh: make(chan struct{}),
quitCh: make(chan struct{}),
ctrlCh: make(chan interface{}, ctrlChannelBufferSize),
notifier: pubsub.NewBroker(false),
notifyUpdateCapabilityTEECh: make(chan struct{}, 1),
logger: p.cfg.Logger.With("runtime_id", id),
}

return r, nil
Expand Down Expand Up @@ -124,8 +123,8 @@
conn protocol.Connection
notifier *pubsub.Broker

notifyUpdateCapabilityTEE *channels.RingChannel
capabilityTEE *node.CapabilityTEE
notifyUpdateCapabilityTEECh chan struct{}
capabilityTEE *node.CapabilityTEE

logger *logging.Logger
}
Expand Down Expand Up @@ -193,13 +192,11 @@
}

// Implements host.Runtime.
func (r *sandboxedRuntime) UpdateCapabilityTEE(ctx context.Context) error {
func (r *sandboxedRuntime) UpdateCapabilityTEE() {

Check warning on line 195 in go/runtime/host/sandbox/sandbox.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/host/sandbox/sandbox.go#L195

Added line #L195 was not covered by tests
select {
case <-ctx.Done():
return ctx.Err()
case r.notifyUpdateCapabilityTEE.In() <- struct{}{}:
case r.notifyUpdateCapabilityTEECh <- struct{}{}:
default:

Check warning on line 198 in go/runtime/host/sandbox/sandbox.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/host/sandbox/sandbox.go#L197-L198

Added lines #L197 - L198 were not covered by tests
}
return nil
}

// Implements host.Runtime.
Expand Down Expand Up @@ -407,15 +404,12 @@
return fmt.Errorf("version mismatch (runtime reported: %s bundle: %s)", *rtVersion, bndVersion)
}

notifyUpdateCapabilityTEECh := make(chan struct{})
channels.Unwrap(r.notifyUpdateCapabilityTEE, notifyUpdateCapabilityTEECh)

hp := &HostInitializerParams{
Runtime: r,
Version: *rtVersion,
Process: p,
Connection: pc,
NotifyUpdateCapabilityTEE: notifyUpdateCapabilityTEECh,
NotifyUpdateCapabilityTEE: r.notifyUpdateCapabilityTEECh,
}

// Perform configuration-specific host initialization.
Expand Down
6 changes: 1 addition & 5 deletions go/runtime/registry/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,11 +926,7 @@

n.logger.Debug("requesting the runtime to update CapabilityTEE")

if err = n.host.UpdateCapabilityTEE(n.ctx); err != nil {
n.logger.Error("failed to update runtime CapabilityTEE",
"err", err,
)
}
n.host.UpdateCapabilityTEE()

Check warning on line 929 in go/runtime/registry/host.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/registry/host.go#L929

Added line #L929 was not covered by tests
lastAttestationUpdateHeight = height
lastAttestationUpdate = time.Now()
}
Expand Down
67 changes: 31 additions & 36 deletions go/worker/compute/executor/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1668,42 +1668,37 @@ func (n *Node) roundWorker(ctx context.Context) {
}
}

for {
select {
case <-ctx.Done():
n.logger.Debug("exiting round, context canceled")
return
case ev := <-n.evCh:
// Handle an event.
n.handleEvent(ctx, ev)
case txs := <-n.txCh:
// Check any queued transactions.
n.handleNewCheckedTransactions(txs)
case txs := <-n.missingTxCh:
// Missing transactions fetched.
n.handleMissingTransactions(txs)
case ec := <-n.ecCh:
// Process observed executor commitments.
n.handleObservedExecutorCommitment(ctx, ec)
continue
case batch := <-n.processedBatchCh:
// Batch processing has finished.
n.handleProcessedBatch(ctx, batch)
case <-schedulerRankTicker.C:
// Change scheduler rank and try again.
schedulerRank++
n.logger.Debug("scheduler rank has changed",
"rank", schedulerRank,
)
case <-flushTimer.C:
// Force scheduling for primary transaction scheduler.
n.logger.Debug("scheduling is now forced")
flush = true
case <-n.reselectCh:
// Try again.
}

break
select {
case <-ctx.Done():
n.logger.Debug("exiting round, context canceled")
return
case ev := <-n.evCh:
// Handle an event.
n.handleEvent(ctx, ev)
case txs := <-n.txCh:
// Check any queued transactions.
n.handleNewCheckedTransactions(txs)
case txs := <-n.missingTxCh:
// Missing transactions fetched.
n.handleMissingTransactions(txs)
case ec := <-n.ecCh:
// Process observed executor commitments.
n.handleObservedExecutorCommitment(ctx, ec)
case batch := <-n.processedBatchCh:
// Batch processing has finished.
n.handleProcessedBatch(ctx, batch)
case <-schedulerRankTicker.C:
// Change scheduler rank and try again.
schedulerRank++
n.logger.Debug("scheduler rank has changed",
"rank", schedulerRank,
)
case <-flushTimer.C:
// Force scheduling for primary transaction scheduler.
n.logger.Debug("scheduling is now forced")
flush = true
case <-n.reselectCh:
// Try again.
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions go/worker/registration/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,11 @@ func (rp *roleProvider) SetAvailableWithCallback(hook RegisterNodeHook, cb Regis
rp.cb = cb
rp.Unlock()

rp.w.registerCh <- struct{}{}
// Notify worker that role provider has been updated.
select {
case rp.w.registerCh <- struct{}{}:
default:
}
}

func (rp *roleProvider) SetUnavailable() {
Expand Down Expand Up @@ -1079,7 +1083,7 @@ func New(
logger: logger,
consensus: consensus,
p2p: p2p,
registerCh: make(chan struct{}, 64),
registerCh: make(chan struct{}, 1),
}

w.storedDeregister = storedDeregister
Expand Down
Loading