From 4a1c1df1c182e61876d75e4f850ce7d66408b2dc Mon Sep 17 00:00:00 2001 From: yihuang Date: Fri, 19 May 2023 17:53:08 +0800 Subject: [PATCH] feat: make handshake cancelable (#857) it'll make the handshake work with graceful shutdown(see: https://github.com/cosmos/cosmos-sdk/issues/16202) handshake could be a long running process if there are many local blocks to replay, for example we use it to do profiling. Hope we can backport this to 0.34.x. --- #### PR checklist - [ ] Tests written/updated - [ ] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog) - [ ] Updated relevant documentation (`docs/` or `spec/`) and code comments --- .../857-make-handshake-cancelable.md | 1 + consensus/replay.go | 31 ++++++++++++++++--- node/node.go | 19 +++++++++++- node/setup.go | 3 +- 4 files changed, 48 insertions(+), 6 deletions(-) create mode 100644 .changelog/unreleased/improvements/857-make-handshake-cancelable.md diff --git a/.changelog/unreleased/improvements/857-make-handshake-cancelable.md b/.changelog/unreleased/improvements/857-make-handshake-cancelable.md new file mode 100644 index 0000000000..16b447f6d2 --- /dev/null +++ b/.changelog/unreleased/improvements/857-make-handshake-cancelable.md @@ -0,0 +1 @@ +- `[node]` Make handshake cancelable ([cometbft/cometbft\#857](https://github.com/cometbft/cometbft/pull/857)) diff --git a/consensus/replay.go b/consensus/replay.go index 6496693950..b8e457fa51 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -240,9 +240,14 @@ func (h *Handshaker) NBlocks() int { // TODO: retry the handshake/replay if it fails ? func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { + return h.HandshakeWithContext(context.TODO(), proxyApp) +} + +// HandshakeWithContext is cancellable version of Handshake +func (h *Handshaker) HandshakeWithContext(ctx context.Context, proxyApp proxy.AppConns) error { // Handshake is done via ABCI Info on the query conn. - res, err := proxyApp.Query().Info(context.TODO(), proxy.RequestInfo) + res, err := proxyApp.Query().Info(ctx, proxy.RequestInfo) if err != nil { return fmt.Errorf("error calling Info: %v", err) } @@ -266,7 +271,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { } // Replay blocks up to the latest in the blockstore. - appHash, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp) + appHash, err = h.ReplayBlocksWithContext(ctx, h.initialState, appHash, blockHeight, proxyApp) if err != nil { return fmt.Errorf("error on replay: %v", err) } @@ -287,6 +292,17 @@ func (h *Handshaker) ReplayBlocks( appHash []byte, appBlockHeight int64, proxyApp proxy.AppConns, +) ([]byte, error) { + return h.ReplayBlocksWithContext(context.TODO(), state, appHash, appBlockHeight, proxyApp) +} + +// ReplayBlocksWithContext is cancellable version of ReplayBlocks. +func (h *Handshaker) ReplayBlocksWithContext( + ctx context.Context, + state sm.State, + appHash []byte, + appBlockHeight int64, + proxyApp proxy.AppConns, ) ([]byte, error) { storeBlockBase := h.store.Base() storeBlockHeight := h.store.Height() @@ -391,7 +407,7 @@ func (h *Handshaker) ReplayBlocks( // Either the app is asking for replay, or we're all synced up. if appBlockHeight < storeBlockHeight { // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) - return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false) + return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, false) } else if appBlockHeight == storeBlockHeight { // We're good! @@ -406,7 +422,7 @@ func (h *Handshaker) ReplayBlocks( case appBlockHeight < stateBlockHeight: // the app is further behind than it should be, so replay blocks // but leave the last block to go through the WAL - return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true) + return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, true) case appBlockHeight == stateBlockHeight: // We haven't run Commit (both the state and app are one block behind), @@ -443,6 +459,7 @@ func (h *Handshaker) ReplayBlocks( } func (h *Handshaker) replayBlocks( + ctx context.Context, state sm.State, proxyApp proxy.AppConns, appBlockHeight, @@ -469,6 +486,12 @@ func (h *Handshaker) replayBlocks( firstBlock = state.InitialHeight } for i := firstBlock; i <= finalBlock; i++ { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + h.logger.Info("Applying block", "height", i) block := h.store.LoadBlock(i) // Extra check to ensure the app was not changed in a way it shouldn't have. diff --git a/node/node.go b/node/node.go index fa295a9459..71df9393a8 100644 --- a/node/node.go +++ b/node/node.go @@ -143,6 +143,23 @@ func NewNode(config *cfg.Config, metricsProvider MetricsProvider, logger log.Logger, options ...Option, +) (*Node, error) { + return NewNodeWithContext(context.TODO(), config, privValidator, + nodeKey, clientCreator, genesisDocProvider, dbProvider, + metricsProvider, logger, options...) +} + +// NewNodeWithContext is cancellable version of NewNode. +func NewNodeWithContext(ctx context.Context, + config *cfg.Config, + privValidator types.PrivValidator, + nodeKey *p2p.NodeKey, + clientCreator proxy.ClientCreator, + genesisDocProvider GenesisDocProvider, + dbProvider cfg.DBProvider, + metricsProvider MetricsProvider, + logger log.Logger, + options ...Option, ) (*Node, error) { blockStore, stateDB, err := initDBs(config, dbProvider) if err != nil { @@ -207,7 +224,7 @@ func NewNode(config *cfg.Config, // and replays any blocks as necessary to sync CometBFT with the app. consensusLogger := logger.With("module", "consensus") if !stateSync { - if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil { + if err := doHandshake(ctx, stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil { return nil, err } diff --git a/node/setup.go b/node/setup.go index 39df1fba93..2acb8325d0 100644 --- a/node/setup.go +++ b/node/setup.go @@ -167,6 +167,7 @@ func createAndStartIndexerService( } func doHandshake( + ctx context.Context, stateStore sm.Store, state sm.State, blockStore sm.BlockStore, @@ -178,7 +179,7 @@ func doHandshake( handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc) handshaker.SetLogger(consensusLogger) handshaker.SetEventBus(eventBus) - if err := handshaker.Handshake(proxyApp); err != nil { + if err := handshaker.HandshakeWithContext(ctx, proxyApp); err != nil { return fmt.Errorf("error during handshake: %v", err) } return nil