Skip to content

Commit

Permalink
add logic to propagate extended commits (#8433)
Browse files Browse the repository at this point in the history
  • Loading branch information
cmwaters committed Oct 24, 2022
1 parent c095798 commit 574fc51
Show file tree
Hide file tree
Showing 16 changed files with 345 additions and 161 deletions.
60 changes: 44 additions & 16 deletions blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,20 @@ func (pool *BlockPool) IsCaughtUp() bool {
return isCaughtUp
}

// PeekTwoBlocks returns blocks at pool.height and pool.height+1.
// We need to see the second block's Commit to validate the first block.
// So we peek two blocks at a time.
// PeekTwoBlocks returns blocks at pool.height and pool.height+1. We need to
// see the second block's Commit to validate the first block. So we peek two
// blocks at a time. We return an extended commit, containing vote extensions
// and their associated signatures, as this is critical to consensus in ABCI++
// as we switch from block sync to consensus mode.
//
// The caller will verify the commit.
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
func (pool *BlockPool) PeekTwoBlocks() (first, second *types.Block, firstExtCommit *types.ExtendedCommit) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

if r := pool.requesters[pool.height]; r != nil {
first = r.getBlock()
firstExtCommit = r.getExtendedCommit()
}
if r := pool.requesters[pool.height+1]; r != nil {
second = r.getBlock()
Expand All @@ -203,7 +207,8 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block)
}

// PopRequest pops the first block at pool.height.
// It must have been validated by 'second'.Commit from PeekTwoBlocks().
// It must have been validated by the second Commit from PeekTwoBlocks.
// TODO(thane): (?) and its corresponding ExtendedCommit.
func (pool *BlockPool) PopRequest() {
pool.mtx.Lock()
defer pool.mtx.Unlock()
Expand Down Expand Up @@ -240,12 +245,22 @@ func (pool *BlockPool) RedoRequest(height int64) p2p.ID {
return peerID
}

// AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it.
// TODO: ensure that blocks come in order for each peer.
func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) {
// AddBlock validates that the block comes from the peer it was expected from
// and calls the requester to store it.
//
// This requires an extended commit at the same height as the supplied block -
// the block contains the last commit, but we need the latest commit in case we
// need to switch over from block sync to consensus at this height. If the
// height of the extended commit and the height of the block do not match, we
// do not add the block and return an error.
func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, extCommit *types.ExtendedCommit, blockSize int) error {
pool.mtx.Lock()
defer pool.mtx.Unlock()

if block.Height != extCommit.Height {
return fmt.Errorf("heights don't match, not adding block (block height: %d, commit height: %d)", block.Height, extCommit.Height)
}

requester := pool.requesters[block.Height]
if requester == nil {
pool.Logger.Info(
Expand All @@ -263,19 +278,22 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
if diff > maxDiffBetweenCurrentAndReceivedBlockHeight {
pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID)
}
return
return fmt.Errorf("peer sent us a block we didn't expect (peer: %s, current height: %d, block height: %d)", peerID, pool.height, block.Height)
}

if requester.setBlock(block, peerID) {
if requester.setBlock(block, extCommit, peerID) {
atomic.AddInt32(&pool.numPending, -1)
peer := pool.peers[peerID]
if peer != nil {
peer.decrPending(blockSize)
}
} else {
pool.Logger.Info("invalid peer", "peer", peerID, "blockHeight", block.Height)
pool.sendError(errors.New("invalid peer"), peerID)
err := errors.New("requester is different or block already exists")
pool.sendError(err, peerID)
return fmt.Errorf("%w (peer: %s, requester: %s, block height: %d)", err, peerID, requester.getPeerID(), block.Height)
}

return nil
}

// MaxPeerHeight returns the highest reported height.
Expand Down Expand Up @@ -424,6 +442,7 @@ func (pool *BlockPool) debug() string {
} else {
str += fmt.Sprintf("H(%v):", h)
str += fmt.Sprintf("B?(%v) ", pool.requesters[h].block != nil)
str += fmt.Sprintf("C?(%v) ", pool.requesters[h].extCommit != nil)
}
}
return str
Expand Down Expand Up @@ -512,9 +531,10 @@ type bpRequester struct {
gotBlockCh chan struct{}
redoCh chan p2p.ID // redo may send multitime, add peerId to identify repeat

mtx tmsync.Mutex
peerID p2p.ID
block *types.Block
mtx tmsync.Mutex
peerID p2p.ID
block *types.Block
extCommit *types.ExtendedCommit
}

func newBPRequester(pool *BlockPool, height int64) *bpRequester {
Expand All @@ -537,13 +557,14 @@ func (bpr *bpRequester) OnStart() error {
}

// Returns true if the peer matches and block doesn't already exist.
func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
func (bpr *bpRequester) setBlock(block *types.Block, extCommit *types.ExtendedCommit, peerID p2p.ID) bool {
bpr.mtx.Lock()
if bpr.block != nil || bpr.peerID != peerID {
bpr.mtx.Unlock()
return false
}
bpr.block = block
bpr.extCommit = extCommit
bpr.mtx.Unlock()

select {
Expand All @@ -559,6 +580,12 @@ func (bpr *bpRequester) getBlock() *types.Block {
return bpr.block
}

// getExtendedCommit returns the extended commit currently stored on this
// requester. The field is read while holding bpr.mtx. The returned pointer
// may be nil, for example after reset has cleared it.
func (bpr *bpRequester) getExtendedCommit() *types.ExtendedCommit {
	bpr.mtx.Lock()
	stored := bpr.extCommit
	bpr.mtx.Unlock()

	return stored
}

func (bpr *bpRequester) getPeerID() p2p.ID {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
Expand All @@ -576,6 +603,7 @@ func (bpr *bpRequester) reset() {

bpr.peerID = ""
bpr.block = nil
bpr.extCommit = nil
}

// Tells bpRequester to pick another peer and try again.
Expand Down
9 changes: 6 additions & 3 deletions blocksync/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ func (p testPeer) runInputRoutine() {
// Request desired, pretend like we got the block immediately.
func (p testPeer) simulateInput(input inputData) {
block := &types.Block{Header: types.Header{Height: input.request.Height}}
input.pool.AddBlock(input.request.PeerID, block, 123)
extCommit := &types.ExtendedCommit{
Height: input.request.Height,
}
_ = input.pool.AddBlock(input.request.PeerID, block, extCommit, 123)
// TODO: uncommenting this creates a race which is detected by:
// https://github.com/golang/go/blob/2bd767b1022dd3254bcec469f0ee164024726486/src/testing/testing.go#L854-L856
// see: https://github.com/tendermint/tendermint/issues/3390#issue-418379890
Expand Down Expand Up @@ -112,7 +115,7 @@ func TestBlockPoolBasic(t *testing.T) {
if !pool.IsRunning() {
return
}
first, second := pool.PeekTwoBlocks()
first, second, _ := pool.PeekTwoBlocks()
if first != nil && second != nil {
pool.PopRequest()
} else {
Expand Down Expand Up @@ -171,7 +174,7 @@ func TestBlockPoolTimeout(t *testing.T) {
if !pool.IsRunning() {
return
}
first, second := pool.PeekTwoBlocks()
first, second, _ := pool.PeekTwoBlocks()
if first != nil && second != nil {
pool.PopRequest()
} else {
Expand Down
77 changes: 57 additions & 20 deletions blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Reactor struct {
initialState sm.State

blockExec *sm.BlockExecutor
store *store.BlockStore
store sm.BlockStore
pool *BlockPool
blockSync bool

Expand Down Expand Up @@ -172,34 +172,44 @@ func (bcR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// respondToPeer loads a block and sends it to the requesting peer,
// if we have it. Otherwise, we'll respond saying we don't have it.
func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
src p2p.Peer) (queued bool) {
src p2p.Peer) error {

block := bcR.store.LoadBlock(msg.Height)
if block != nil {
extCommit := bcR.store.LoadBlockExtendedCommit(msg.Height)
if extCommit == nil {
return fmt.Errorf("found block in store without extended commit: %v", block)
}
bl, err := block.ToProto()
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
return fmt.Errorf("failed to convert block to protobuf: %w", err)
}

msgBytes, err := EncodeMsg(&bcproto.BlockResponse{Block: bl})
msgBytes, err := EncodeMsg(&bcproto.BlockResponse{
Block: bl,
ExtCommit: extCommit.ToProto(),
})
if err != nil {
bcR.Logger.Error("could not marshal msg", "err", err)
return false
return fmt.Errorf("could not marshal msg: %w", err)
}

return src.TrySend(BlocksyncChannel, msgBytes)
if !src.TrySend(BlocksyncChannel, msgBytes) {
return fmt.Errorf("unable to queue blocksync message at height %d to peer %v", msg.Height, src)
}
return nil
}

bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)

msgBytes, err := EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
return fmt.Errorf("could not convert msg to protobuf: %w", err)
}

return src.TrySend(BlocksyncChannel, msgBytes)
if !src.TrySend(BlocksyncChannel, msgBytes) {
return fmt.Errorf("unable to queue blocksync message at height %d to peer %v", msg.Height, src)
}
return nil
}

// Receive implements Reactor by handling 4 types of messages (look below).
Expand All @@ -223,20 +233,30 @@ func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
case *bcproto.BlockRequest:
bcR.respondToPeer(msg, src)
case *bcproto.BlockResponse:
bi, err := types.BlockFromProto(msg.Block)
block, err := types.BlockFromProto(msg.Block)
if err != nil {
bcR.Logger.Error("Block content is invalid", "err", err)
return
}
bcR.pool.AddBlock(src.ID(), bi, len(msgBytes))
extCommit, err := types.ExtendedCommitFromProto(msg.ExtCommit)
if err != nil {
bcR.Logger.Error("failed to convert extended commit from proto",
"peer", src,
"err", err)
return
}

if err := bcR.pool.AddBlock(src.ID(), block, extCommit, block.Size()); err != nil {
bcR.Logger.Error("failed to add block", "err", err)
}
case *bcproto.StatusRequest:
// Send peer our state.
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
})
if err != nil {
bcR.Logger.Error("could not convert msg to protobut", "err", err)
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return
}
src.TrySend(BlocksyncChannel, msgBytes)
Expand Down Expand Up @@ -317,7 +337,20 @@ FOR_LOOP:
outbound, inbound, _ := bcR.Switch.NumPeers()
bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
"outbound", outbound, "inbound", inbound)
if bcR.pool.IsCaughtUp() {
switch {
// TODO(sergio) Might be needed for implementing the upgrading solution. Remove after that
//case state.LastBlockHeight > 0 && r.store.LoadBlockExtCommit(state.LastBlockHeight) == nil:
case state.LastBlockHeight > 0 && blocksSynced == 0:
// Having state-synced, we need to blocksync at least one block
bcR.Logger.Info(
"no seen commit yet",
"height", height,
"last_block_height", state.LastBlockHeight,
"initial_height", state.InitialHeight,
"max_peer_height", bcR.pool.MaxPeerHeight(),
)
continue FOR_LOOP
case bcR.pool.IsCaughtUp():
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
Expand Down Expand Up @@ -349,10 +382,13 @@ FOR_LOOP:
// routine.

// See if there are any blocks to sync.
first, second := bcR.pool.PeekTwoBlocks()
// bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
if first == nil || second == nil {
// We need both to sync the first block.
first, second, extCommit := bcR.pool.PeekTwoBlocks()
if first == nil || second == nil || extCommit == nil {
if first != nil && extCommit == nil {
// See https://github.com/tendermint/tendermint/pull/8433#discussion_r866790631
panic(fmt.Errorf("peeked first block without extended commit at height %d - possible node store corruption", first.Height))
}
// We need the first block, the second block and the first block's
// extended commit before we can sync the first block.
continue FOR_LOOP
} else {
// Try again quickly next loop.
Expand All @@ -372,6 +408,7 @@ FOR_LOOP:
// NOTE: we can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary.
// TODO(sergio): Should we also validate against the extended commit?
err = state.Validators.VerifyCommitLight(
chainID, firstID, first.Height, second.LastCommit)

Expand Down Expand Up @@ -402,7 +439,7 @@ FOR_LOOP:
bcR.pool.PopRequest()

// TODO: batch saves so we don't persist to disk every block
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
bcR.store.SaveBlockWithExtendedCommit(first, firstParts, extCommit)

// TODO: same thing for app - but we would need a way to
// get the hash without persisting the state
Expand Down
48 changes: 26 additions & 22 deletions blocksync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,40 +104,44 @@ func newReactor(
panic(err)
}

// The commit we are building for the current height.
seenExtCommit := &types.ExtendedCommit{}

// let's add some blocks in
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil)
if blockHeight > 1 {
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
lastBlock := blockStore.LoadBlock(blockHeight - 1)

vote, err := types.MakeVote(
lastBlock.Header.Height,
lastBlockMeta.BlockID,
state.Validators,
privVals[0],
lastBlock.Header.ChainID,
time.Now(),
)
if err != nil {
panic(err)
}
lastCommit = types.NewCommit(vote.Height, vote.Round,
lastBlockMeta.BlockID, []types.CommitSig{vote.CommitSig()})
}

lastCommit := seenExtCommit.Clone().ToCommit()
thisBlock := state.MakeBlock(blockHeight, nil, lastCommit, nil, state.Validators.Proposer.Address)

thisParts, err := thisBlock.MakePartSet(types.BlockPartSizeBytes)
require.NoError(t, err)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}

vote, err := types.MakeVote(
thisBlock.Header.Height,
blockID,
state.Validators,
privVals[0],
thisBlock.Header.ChainID,
time.Now(),
)
if err != nil {
panic(err)
}
seenExtCommit = &types.ExtendedCommit{
Height: vote.Height,
Round: vote.Round,
BlockID: blockID,
ExtendedSignatures: []types.ExtendedCommitSig{vote.ExtendedCommitSig()},
}

blockStore.SaveBlockWithExtendedCommit(thisBlock, thisParts, seenExtCommit)
state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
if err != nil {
panic(fmt.Errorf("error apply block: %w", err))
}

blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
if err = stateStore.Save(state); err != nil {
panic(err)
}
}

bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync)
Expand Down
Loading

0 comments on commit 574fc51

Please sign in to comment.