Skip to content

Commit

Permalink
feat: make handshake cancelable (cometbft#857)
Browse files Browse the repository at this point in the history
It'll make the handshake work with graceful shutdown (see: cosmos/cosmos-sdk#16202).

The handshake can be a long-running process if there are many local blocks to replay; for example, we use it to do profiling.

Hope we can backport this to 0.34.x.

---

#### PR checklist

- [ ] Tests written/updated
- [ ] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog)
- [ ] Updated relevant documentation (`docs/` or `spec/`) and code comments
  • Loading branch information
yihuang committed Jun 21, 2023
1 parent fda9f51 commit 4a1c1df
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- `[node]` Make handshake cancelable ([cometbft/cometbft\#857](https://github.com/cometbft/cometbft/pull/857))
31 changes: 27 additions & 4 deletions consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,14 @@ func (h *Handshaker) NBlocks() int {

// Handshake runs the handshake by delegating to HandshakeWithContext with
// context.TODO(), so a handshake started through this method is not tied to
// a caller-supplied context. Use HandshakeWithContext directly when the
// handshake must be cancellable.
//
// TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
return h.HandshakeWithContext(context.TODO(), proxyApp)
}

// HandshakeWithContext is a cancellable version of Handshake.
func (h *Handshaker) HandshakeWithContext(ctx context.Context, proxyApp proxy.AppConns) error {

// Handshake is done via ABCI Info on the query conn.
res, err := proxyApp.Query().Info(context.TODO(), proxy.RequestInfo)
res, err := proxyApp.Query().Info(ctx, proxy.RequestInfo)
if err != nil {
return fmt.Errorf("error calling Info: %v", err)
}
Expand All @@ -266,7 +271,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
}

// Replay blocks up to the latest in the blockstore.
appHash, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
appHash, err = h.ReplayBlocksWithContext(ctx, h.initialState, appHash, blockHeight, proxyApp)
if err != nil {
return fmt.Errorf("error on replay: %v", err)
}
Expand All @@ -287,6 +292,17 @@ func (h *Handshaker) ReplayBlocks(
appHash []byte,
appBlockHeight int64,
proxyApp proxy.AppConns,
) ([]byte, error) {
return h.ReplayBlocksWithContext(context.TODO(), state, appHash, appBlockHeight, proxyApp)
}

// ReplayBlocksWithContext is a cancellable version of ReplayBlocks.
func (h *Handshaker) ReplayBlocksWithContext(
ctx context.Context,
state sm.State,
appHash []byte,
appBlockHeight int64,
proxyApp proxy.AppConns,
) ([]byte, error) {
storeBlockBase := h.store.Base()
storeBlockHeight := h.store.Height()
Expand Down Expand Up @@ -391,7 +407,7 @@ func (h *Handshaker) ReplayBlocks(
// Either the app is asking for replay, or we're all synced up.
if appBlockHeight < storeBlockHeight {
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false)
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, false)

} else if appBlockHeight == storeBlockHeight {
// We're good!
Expand All @@ -406,7 +422,7 @@ func (h *Handshaker) ReplayBlocks(
case appBlockHeight < stateBlockHeight:
// the app is further behind than it should be, so replay blocks
// but leave the last block to go through the WAL
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true)
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, true)

case appBlockHeight == stateBlockHeight:
// We haven't run Commit (both the state and app are one block behind),
Expand Down Expand Up @@ -443,6 +459,7 @@ func (h *Handshaker) ReplayBlocks(
}

func (h *Handshaker) replayBlocks(
ctx context.Context,
state sm.State,
proxyApp proxy.AppConns,
appBlockHeight,
Expand All @@ -469,6 +486,12 @@ func (h *Handshaker) replayBlocks(
firstBlock = state.InitialHeight
}
for i := firstBlock; i <= finalBlock; i++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

h.logger.Info("Applying block", "height", i)
block := h.store.LoadBlock(i)
// Extra check to ensure the app was not changed in a way it shouldn't have.
Expand Down
19 changes: 18 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,23 @@ func NewNode(config *cfg.Config,
metricsProvider MetricsProvider,
logger log.Logger,
options ...Option,
) (*Node, error) {
return NewNodeWithContext(context.TODO(), config, privValidator,
nodeKey, clientCreator, genesisDocProvider, dbProvider,
metricsProvider, logger, options...)
}

// NewNodeWithContext is a cancellable version of NewNode.
func NewNodeWithContext(ctx context.Context,
config *cfg.Config,
privValidator types.PrivValidator,
nodeKey *p2p.NodeKey,
clientCreator proxy.ClientCreator,
genesisDocProvider GenesisDocProvider,
dbProvider cfg.DBProvider,
metricsProvider MetricsProvider,
logger log.Logger,
options ...Option,
) (*Node, error) {
blockStore, stateDB, err := initDBs(config, dbProvider)
if err != nil {
Expand Down Expand Up @@ -207,7 +224,7 @@ func NewNode(config *cfg.Config,
// and replays any blocks as necessary to sync CometBFT with the app.
consensusLogger := logger.With("module", "consensus")
if !stateSync {
if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
if err := doHandshake(ctx, stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
return nil, err
}

Expand Down
3 changes: 2 additions & 1 deletion node/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func createAndStartIndexerService(
}

func doHandshake(
ctx context.Context,
stateStore sm.Store,
state sm.State,
blockStore sm.BlockStore,
Expand All @@ -178,7 +179,7 @@ func doHandshake(
handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
handshaker.SetLogger(consensusLogger)
handshaker.SetEventBus(eventBus)
if err := handshaker.Handshake(proxyApp); err != nil {
if err := handshaker.HandshakeWithContext(ctx, proxyApp); err != nil {
return fmt.Errorf("error during handshake: %v", err)
}
return nil
Expand Down

0 comments on commit 4a1c1df

Please sign in to comment.