Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize backup worker commit submission #5264

Merged
merged 3 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changelog/5264.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/worker/compute: Optimize backup worker commit submission

Backup compute workers now observe any gossiped commitments and pre-empt
consensus when it is obvious that there will be a discrepancy declared.
37 changes: 36 additions & 1 deletion go/consensus/api/submission.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"context"
"fmt"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -78,6 +79,9 @@ type submissionManager struct {
priceDiscovery PriceDiscovery
maxFee quantity.Quantity

noncesLock sync.Mutex
nonces map[staking.Address]uint64

logger *logging.Logger
}

Expand Down Expand Up @@ -131,12 +135,41 @@ func (m *submissionManager) EstimateGasAndSetFee(ctx context.Context, signer sig
return nil
}

func (m *submissionManager) getSignerNonce(ctx context.Context, signerAddr staking.Address) (uint64, error) {
m.noncesLock.Lock()
defer m.noncesLock.Unlock()

nonce, ok := m.nonces[signerAddr]
if !ok {
// Query latest nonce when one is not available.
var err error
nonce, err = m.backend.GetSignerNonce(ctx, &GetSignerNonceRequest{
AccountAddress: signerAddr,
Height: HeightLatest,
})
if err != nil {
return 0, err
}
}

m.nonces[signerAddr] = nonce + 1

return nonce, nil
}

func (m *submissionManager) clearSignerNonce(signerAddr staking.Address) {
m.noncesLock.Lock()
defer m.noncesLock.Unlock()

delete(m.nonces, signerAddr)
}

func (m *submissionManager) signAndSubmitTx(ctx context.Context, signer signature.Signer, tx *transaction.Transaction, withProof bool) (*transaction.SignedTransaction, *transaction.Proof, error) {
// Update transaction nonce.
var err error
signerAddr := staking.NewAddress(signer.Public())

tx.Nonce, err = m.backend.GetSignerNonce(ctx, &GetSignerNonceRequest{AccountAddress: signerAddr, Height: HeightLatest})
tx.Nonce, err = m.getSignerNonce(ctx, signerAddr)
if err != nil {
if errors.Is(err, ErrNoCommittedBlocks) {
// No committed blocks available, retry submission.
Expand Down Expand Up @@ -174,6 +207,7 @@ func (m *submissionManager) signAndSubmitTx(ctx context.Context, signer signatur
return nil, nil, err
case errors.Is(err, transaction.ErrInvalidNonce):
// Invalid nonce, retry submission.
m.clearSignerNonce(signerAddr)
m.logger.Debug("retrying transaction submission due to invalid nonce",
"account_address", signerAddr,
"nonce", tx.Nonce,
Expand Down Expand Up @@ -226,6 +260,7 @@ func NewSubmissionManager(backend ClientBackend, priceDiscovery PriceDiscovery,
sm := &submissionManager{
backend: backend,
priceDiscovery: priceDiscovery,
nonces: make(map[staking.Address]uint64),
logger: logging.GetLogger("consensus/submission"),
}
_ = sm.maxFee.FromUint64(maxFee)
Expand Down
9 changes: 7 additions & 2 deletions go/consensus/tendermint/apps/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
"github.com/oasisprotocol/oasis-core/go/consensus/api/transaction"
tmapi "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/api"
governanceApi "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/governance/api"
Expand All @@ -38,6 +39,8 @@ var _ tmapi.Application = (*rootHashApplication)(nil)
type rootHashApplication struct {
state tmapi.ApplicationState
md tmapi.MessageDispatcher

ecNotifier *pubsub.Broker
}

func (app *rootHashApplication) Name() string {
Expand Down Expand Up @@ -818,6 +821,8 @@ func (app *rootHashApplication) tryFinalizeBlock(
}

// New constructs a new roothash application instance.
func New() tmapi.Application {
return &rootHashApplication{}
func New(ecNotifier *pubsub.Broker) tmapi.Application {
return &rootHashApplication{
ecNotifier: ecNotifier,
}
}
7 changes: 7 additions & 0 deletions go/consensus/tendermint/apps/roothash/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ func (app *rootHashApplication) executorCommit(
cc *roothash.ExecutorCommit,
) (err error) {
if ctx.IsCheckOnly() {
// In case an executor commit notifier has been set up, push all commits into channel.
if app.ecNotifier != nil {
for _, ec := range cc.Commits {
ec := ec
peternose marked this conversation as resolved.
Show resolved Hide resolved
app.ecNotifier.Broadcast(&ec)
}
}
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions go/consensus/tendermint/apps/roothash/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestMessagesGasEstimation(t *testing.T) {

// Create a test message dispatcher that fakes gas estimation.
var md testMsgDispatcher
app := rootHashApplication{appState, &md}
app := rootHashApplication{appState, &md, nil}

// Generate a private key for the single node in this test.
sk, err := memorySigner.NewSigner(rand.Reader)
Expand Down Expand Up @@ -502,7 +502,7 @@ func TestEvidence(t *testing.T) {
err = expiredCommitment2.Sign(sk, runtime.ID)
require.NoError(err, "expiredCommitment2.Sign")
var md testMsgDispatcher
app := rootHashApplication{appState, &md}
app := rootHashApplication{appState, &md, nil}

ctx = appState.NewContext(abciAPI.ContextDeliverTx, now)
defer ctx.Close()
Expand Down Expand Up @@ -654,7 +654,7 @@ func TestSubmitMsg(t *testing.T) {
defer ctx.Close()

var md testMsgDispatcher
app := rootHashApplication{appState, &md}
app := rootHashApplication{appState, &md, nil}

// Generate a private key for the caller.
skCaller, err := memorySigner.NewSigner(rand.Reader)
Expand Down
18 changes: 17 additions & 1 deletion go/consensus/tendermint/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
app "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/roothash"
"github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
"github.com/oasisprotocol/oasis-core/go/roothash/api/message"
runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry"
)
Expand Down Expand Up @@ -78,6 +79,8 @@ type serviceClient struct {
trackedRuntime map[common.Namespace]*trackedRuntime

pruneHandler *pruneHandler

ecNotifier *pubsub.Broker
}

// Implements api.Backend.
Expand Down Expand Up @@ -236,6 +239,15 @@ func (sc *serviceClient) WatchEvents(ctx context.Context, id common.Namespace) (
return ch, sub, nil
}

// Implements api.Backend.
func (sc *serviceClient) WatchExecutorCommitments(ctx context.Context) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error) {
sub := sc.ecNotifier.Subscribe()
ch := make(chan *commitment.ExecutorCommitment)
sub.Unwrap(ch)

return ch, sub, nil
}

// Implements api.Backend.
func (sc *serviceClient) TrackRuntime(ctx context.Context, history api.BlockHistory) error {
sc.pruneHandler.trackRuntime(history)
Expand Down Expand Up @@ -851,8 +863,11 @@ func New(
dataDir string,
backend tmapi.Backend,
) (ServiceClient, error) {
// Create the general executor commitment notifier.
ecNotifier := pubsub.NewBroker(false)

// Initialize and register the tendermint service component.
a := app.New()
a := app.New(ecNotifier)
if err := backend.RegisterApplication(a); err != nil {
return nil, err
}
Expand All @@ -876,6 +891,7 @@ func New(
cmdCh: make(chan interface{}, runtimeRegistry.MaxRuntimeCount),
trackedRuntime: make(map[common.Namespace]*trackedRuntime),
pruneHandler: ph,
ecNotifier: ecNotifier,
}, nil
}

Expand Down
7 changes: 7 additions & 0 deletions go/roothash/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ type Backend interface {
// WatchEvents returns a stream of protocol events.
WatchEvents(ctx context.Context, runtimeID common.Namespace) (<-chan *Event, pubsub.ClosableSubscription, error)

// WatchExecutorCommitments returns a channel that produces a stream of executor commitments
// observed in the consensus layer P2P network.
//
// Note that these commitments may not have been processed by consensus, commitments may be
// received in any order and duplicates are possible.
WatchExecutorCommitments(ctx context.Context) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error)

// TrackRuntime adds a runtime the history of which should be tracked.
TrackRuntime(ctx context.Context, history BlockHistory) error

Expand Down
71 changes: 71 additions & 0 deletions go/roothash/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
cmnGrpc "github.com/oasisprotocol/oasis-core/go/common/grpc"
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
"github.com/oasisprotocol/oasis-core/go/roothash/api/message"
)

Expand Down Expand Up @@ -39,6 +40,8 @@ var (
methodWatchBlocks = serviceName.NewMethod("WatchBlocks", common.Namespace{})
// methodWatchEvents is the WatchEvents method.
methodWatchEvents = serviceName.NewMethod("WatchEvents", common.Namespace{})
// methodWatchExecutorCommitments is the WatchExecutorCommitments method.
methodWatchExecutorCommitments = serviceName.NewMethod("WatchExecutorCommitments", nil)

// serviceDesc is the gRPC service descriptor.
serviceDesc = grpc.ServiceDesc{
Expand Down Expand Up @@ -93,6 +96,11 @@ var (
Handler: handlerWatchEvents,
ServerStreams: true,
},
{
StreamName: methodWatchExecutorCommitments.ShortName(),
Handler: handlerWatchExecutorCommitments,
ServerStreams: true,
},
},
}
)
Expand Down Expand Up @@ -362,6 +370,34 @@ func handlerWatchEvents(srv interface{}, stream grpc.ServerStream) error {
}
}

func handlerWatchExecutorCommitments(srv interface{}, stream grpc.ServerStream) error {
if err := stream.RecvMsg(nil); err != nil {
return err
}

ctx := stream.Context()
ch, sub, err := srv.(Backend).WatchExecutorCommitments(ctx)
if err != nil {
return err
}
defer sub.Close()

for {
select {
case ec, ok := <-ch:
if !ok {
return nil
}

if err := stream.SendMsg(ec); err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
}
}
}

// RegisterService registers a new roothash service with the given gRPC server.
func RegisterService(server *grpc.Server, service Backend) {
server.RegisterService(&serviceDesc, service)
Expand Down Expand Up @@ -520,6 +556,41 @@ func (c *roothashClient) WatchEvents(ctx context.Context, runtimeID common.Names
return ch, sub, nil
}

func (c *roothashClient) WatchExecutorCommitments(ctx context.Context) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error) {
ctx, sub := pubsub.NewContextSubscription(ctx)

stream, err := c.conn.NewStream(ctx, &serviceDesc.Streams[1], methodWatchExecutorCommitments.FullName())
if err != nil {
return nil, nil, err
}
if err = stream.SendMsg(nil); err != nil {
return nil, nil, err
}
if err = stream.CloseSend(); err != nil {
return nil, nil, err
}

ch := make(chan *commitment.ExecutorCommitment)
go func() {
defer close(ch)

for {
var ec commitment.ExecutorCommitment
if serr := stream.RecvMsg(&ec); serr != nil {
return
}

select {
case ch <- &ec:
case <-ctx.Done():
return
}
}
}()

return ch, sub, nil
}

// NewRootHashClient creates a new gRPC roothash client service.
func NewRootHashClient(c *grpc.ClientConn) Backend {
return &roothashClient{
Expand Down