Skip to content

Commit

Permalink
BlockStore holds extended commit
Browse files Browse the repository at this point in the history
  • Loading branch information
sergio-mena committed Apr 15, 2022
1 parent 4ebd084 commit 8d504d4
Show file tree
Hide file tree
Showing 13 changed files with 506 additions and 108 deletions.
32 changes: 24 additions & 8 deletions internal/blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,13 @@ func (pool *BlockPool) IsCaughtUp() bool {
// We need to see the second block's Commit to validate the first block.
// So we peek two blocks at a time.
// The caller will verify the commit.
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block, firstExtCommit *types.ExtendedCommit) {
pool.mtx.RLock()
defer pool.mtx.RUnlock()

if r := pool.requesters[pool.height]; r != nil {
first = r.getBlock()
firstExtCommit = r.getExtendedCommit()
}
if r := pool.requesters[pool.height+1]; r != nil {
second = r.getBlock()
Expand All @@ -222,7 +223,7 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block)
}

// PopRequest pops the first block at pool.height.
// It must have been validated by 'second'.Commit from PeekTwoBlocks().
// It must have been validated by 'second'.Commit from PeekTwoBlocks(), TODO: (?) and its corresponding ExtendedCommit.
func (pool *BlockPool) PopRequest() {
pool.mtx.Lock()
defer pool.mtx.Unlock()
Expand Down Expand Up @@ -268,10 +269,15 @@ func (pool *BlockPool) RedoRequest(height int64) types.NodeID {

// AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it.
// TODO: ensure that blocks come in order for each peer.
func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, blockSize int) {
func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, extCommit *types.ExtendedCommit, blockSize int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

if block.Height != extCommit.Height {
pool.logger.Error("heights don't match, not adding block", "block_height", block.Height, "commit_height", extCommit.Height)
return
}

requester := pool.requesters[block.Height]
if requester == nil {
pool.logger.Error("peer sent us a block we didn't expect",
Expand All @@ -286,7 +292,7 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, blockSi
return
}

if requester.setBlock(block, peerID) {
if requester.setBlock(block, extCommit, peerID) {
atomic.AddInt32(&pool.numPending, -1)
peer := pool.peers[peerID]
if peer != nil {
Expand Down Expand Up @@ -460,6 +466,7 @@ func (pool *BlockPool) debug() string {
} else {
str += fmt.Sprintf("H(%v):", h)
str += fmt.Sprintf("B?(%v) ", pool.requesters[h].block != nil)
str += fmt.Sprintf("C?(%v) ", pool.requesters[h].extCommit != nil)
}
}
return str
Expand Down Expand Up @@ -548,9 +555,10 @@ type bpRequester struct {
gotBlockCh chan struct{}
redoCh chan types.NodeID // redo may send multitime, add peerId to identify repeat

mtx sync.Mutex
peerID types.NodeID
block *types.Block
mtx sync.Mutex
peerID types.NodeID
block *types.Block
extCommit *types.ExtendedCommit
}

func newBPRequester(logger log.Logger, pool *BlockPool, height int64) *bpRequester {
Expand All @@ -576,13 +584,14 @@ func (bpr *bpRequester) OnStart(ctx context.Context) error {
func (*bpRequester) OnStop() {}

// Returns true if the peer matches and block doesn't already exist.
func (bpr *bpRequester) setBlock(block *types.Block, peerID types.NodeID) bool {
func (bpr *bpRequester) setBlock(block *types.Block, extCommit *types.ExtendedCommit, peerID types.NodeID) bool {
bpr.mtx.Lock()
if bpr.block != nil || bpr.peerID != peerID {
bpr.mtx.Unlock()
return false
}
bpr.block = block
bpr.extCommit = extCommit
bpr.mtx.Unlock()

select {
Expand All @@ -598,6 +607,12 @@ func (bpr *bpRequester) getBlock() *types.Block {
return bpr.block
}

func (bpr *bpRequester) getExtendedCommit() *types.ExtendedCommit {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.extCommit
}

func (bpr *bpRequester) getPeerID() types.NodeID {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
Expand All @@ -615,6 +630,7 @@ func (bpr *bpRequester) reset() {

bpr.peerID = ""
bpr.block = nil
bpr.extCommit = nil
}

// Tells bpRequester to pick another peer and try again.
Expand Down
42 changes: 33 additions & 9 deletions internal/blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Reactor struct {
stateStore sm.Store

blockExec *sm.BlockExecutor
store *store.BlockStore
store sm.BlockStore
pool *BlockPool
consReactor consensusReactor
blockSync *atomicBool
Expand Down Expand Up @@ -186,15 +186,24 @@ func (r *Reactor) OnStop() {
func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh *p2p.Channel) error {
block := r.store.LoadBlock(msg.Height)
if block != nil {
extCommit := r.store.LoadBlockExtCommit(msg.Height)
if extCommit == nil {
r.logger.Error("peer requesting a block; we have the block but not its extended commit (%v)", block)
return fmt.Errorf("blockstore has block but not extended commit %v", block)
}
blockProto, err := block.ToProto()
if err != nil {
r.logger.Error("failed to convert msg to protobuf", "err", err)
r.logger.Error("failed to convert block to protobuf", "err", err)
return err
}
extCommitProto := extCommit.ToProto()

return blockSyncCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &bcproto.BlockResponse{Block: blockProto},
To: peerID,
Message: &bcproto.BlockResponse{
Block: blockProto,
ExtCommit: extCommitProto,
},
})
}

Expand Down Expand Up @@ -236,8 +245,15 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
"err", err)
return err
}
extCommit, err := types.ExtendedCommitFromProto(msg.ExtCommit)
if err != nil {
r.logger.Error("failed to convert extended commit from proto",
"peer", envelope.From,
"err", err)
return err
}

r.pool.AddBlock(envelope.From, block, block.Size())
r.pool.AddBlock(envelope.From, block, extCommit, block.Size())

case *bcproto.StatusRequest:
return blockSyncCh.Send(ctx, p2p.Envelope{
Expand Down Expand Up @@ -448,6 +464,10 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
)

switch {
case r.pool.startHeight > state.InitialHeight && blocksSynced == 0:
//If we have state-synced, we need to blocksync at least one block
continue

case r.pool.IsCaughtUp():
r.logger.Info("switching to consensus reactor", "height", height)

Expand Down Expand Up @@ -490,9 +510,12 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
// TODO: Uncouple from request routine.

// see if there are any blocks to sync
first, second := r.pool.PeekTwoBlocks()
if first == nil || second == nil {
// we need both to sync the first block
first, second, extCommit := r.pool.PeekTwoBlocks()
if first == nil || second == nil || extCommit == nil {
if first != nil && extCommit == nil {
r.logger.Error("peeked a block without extended commit", "height", first.Height)
}
// we need all to sync the first block
continue
} else {
// try again quickly next loop
Expand All @@ -517,6 +540,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
// NOTE: We can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary.
// TODO Should we also validate against the extended commit?
if err = state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit); err != nil {
err = fmt.Errorf("invalid last commit: %w", err)
r.logger.Error(
Expand Down Expand Up @@ -549,7 +573,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
r.pool.PopRequest()

// TODO: batch saves so we do not persist to disk every block
r.store.SaveBlock(first, firstParts, second.LastCommit)
r.store.SaveBlock(first, firstParts, extCommit)

var err error

Expand Down
6 changes: 3 additions & 3 deletions internal/consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,10 +794,10 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh
// catchup logic -- if peer is lagging by more than 1, send Commit
blockStoreBase := r.state.blockStore.Base()
if blockStoreBase > 0 && prs.Height != 0 && rs.Height >= prs.Height+2 && prs.Height >= blockStoreBase {
// Load the block commit for prs.Height, which contains precommit
// Load the block's extended commit for prs.Height, which contains precommit
// signatures for prs.Height.
if commit := r.state.blockStore.LoadBlockCommit(prs.Height); commit != nil {
if ok, err := r.pickSendVote(ctx, ps, commit, voteCh); err != nil {
if extCommit := r.state.blockStore.LoadBlockExtCommit(prs.Height); extCommit != nil {
if ok, err := r.pickSendVote(ctx, ps, extCommit, voteCh); err != nil {
return
} else if ok {
logger.Debug("picked Catchup commit to send", "height", prs.Height)
Expand Down
22 changes: 9 additions & 13 deletions internal/consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,19 +696,15 @@ func (cs *State) sendInternalMessage(ctx context.Context, mi msgInfo) {
// Reconstruct LastCommit from SeenCommit, which we saved along with the block,
// (which happens even before saving the state)
func (cs *State) reconstructLastCommit(state sm.State) {
commit := cs.blockStore.LoadSeenCommit()
if commit == nil || commit.Height != state.LastBlockHeight {
commit = cs.blockStore.LoadBlockCommit(state.LastBlockHeight)
}

if commit == nil {
extCommit := cs.blockStore.LoadBlockExtCommit(state.LastBlockHeight)
if extCommit == nil {
panic(fmt.Sprintf(
"failed to reconstruct last commit; commit for height %v not found",
state.LastBlockHeight,
))
}

lastPrecommits := types.CommitToVoteSet(state.ChainID, commit, state.LastValidators)
lastPrecommits := extCommit.ToVoteSet(state.ChainID, state.LastValidators)
if !lastPrecommits.HasTwoThirdsMajority() {
panic("failed to reconstruct last commit; does not have +2/3 maj")
}
Expand Down Expand Up @@ -1401,16 +1397,17 @@ func (cs *State) createProposalBlock(ctx context.Context) (*types.Block, error)
return nil, errors.New("entered createProposalBlock with privValidator being nil")
}

var commit *types.Commit
//TODO: wouldn't it be easier if CreateProposalBlock accepted cs.LastCommit directly?
var extCommit *types.ExtendedCommit
switch {
case cs.Height == cs.state.InitialHeight:
// We're creating a proposal for the first block.
// The commit is empty, but not nil.
commit = types.NewCommit(0, 0, types.BlockID{}, nil)
extCommit = types.NewExtendedCommit(0, 0, types.BlockID{}, nil)

case cs.LastCommit.HasTwoThirdsMajority():
// Make the commit from LastCommit
commit = cs.LastCommit.MakeCommit()
extCommit = cs.LastCommit.MakeExtendedCommit()

default: // This shouldn't happen.
cs.logger.Error("propose step; cannot propose anything without commit for the previous block")
Expand All @@ -1426,7 +1423,7 @@ func (cs *State) createProposalBlock(ctx context.Context) (*types.Block, error)

proposerAddr := cs.privValidatorPubKey.Address()

ret, err := cs.blockExec.CreateProposalBlock(ctx, cs.Height, cs.state, commit, proposerAddr, cs.LastCommit.GetVotes())
ret, err := cs.blockExec.CreateProposalBlock(ctx, cs.Height, cs.state, extCommit, proposerAddr)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -1924,8 +1921,7 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) {
// NOTE: the seenCommit is local justification to commit this block,
// but may differ from the LastCommit included in the next block
precommits := cs.Votes.Precommits(cs.CommitRound)
seenCommit := precommits.MakeCommit()
cs.blockStore.SaveBlock(block, blockParts, seenCommit)
cs.blockStore.SaveBlock(block, blockParts, precommits.MakeExtendedCommit())
} else {
// Happens during replay if we already saved the block but didn't commit
logger.Debug("calling finalizeCommit on already stored block", "height", block.Height)
Expand Down
48 changes: 16 additions & 32 deletions internal/state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/encoding"
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/internal/eventbus"
Expand Down Expand Up @@ -87,9 +86,8 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
ctx context.Context,
height int64,
state State,
commit *types.Commit,
extCommit *types.ExtendedCommit,
proposerAddr []byte,
votes []*types.Vote,
) (*types.Block, error) {

maxBytes := state.ConsensusParams.Block.MaxBytes
Expand All @@ -101,15 +99,15 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
maxDataBytes := types.MaxDataBytes(maxBytes, evSize, state.Validators.Size())

txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas)
commit := extCommit.StripExtensions()
block := state.MakeBlock(height, txs, commit, evidence, proposerAddr)

localLastCommit := buildLastCommitInfo(block, blockExec.store, state.InitialHeight)
rpp, err := blockExec.appClient.PrepareProposal(
ctx,
abci.RequestPrepareProposal{
MaxTxBytes: maxDataBytes,
Txs: block.Txs.ToSliceOfBytes(),
LocalLastCommit: extendedCommitInfo(localLastCommit, votes),
LocalLastCommit: extendedCommitInfo(*extCommit),
ByzantineValidators: block.Evidence.ToABCI(),
Height: block.Height,
Time: block.Time,
Expand Down Expand Up @@ -422,43 +420,29 @@ func buildLastCommitInfo(block *types.Block, store Store, initialHeight int64) a
}
}

//TODO reword
// extendedCommitInfo expects a CommitInfo struct along with all of the
// original votes relating to that commit, including their vote extensions. The
// order of votes does not matter.
func extendedCommitInfo(c abci.CommitInfo, votes []*types.Vote) abci.ExtendedCommitInfo {
if len(c.Votes) != len(votes) {
panic(fmt.Sprintf("extendedCommitInfo: number of votes from commit differ from the number of votes supplied (%d != %d)", len(c.Votes), len(votes)))
}
votesByVal := make(map[string]*types.Vote)
for _, vote := range votes {
if vote != nil {
valAddr := vote.ValidatorAddress.String()
if _, ok := votesByVal[valAddr]; ok {
panic(fmt.Sprintf("extendedCommitInfo: found duplicate vote for validator with address %s", valAddr))
}
votesByVal[valAddr] = vote
}
}
vs := make([]abci.ExtendedVoteInfo, len(c.Votes))
for i := range vs {
func extendedCommitInfo(extCommit types.ExtendedCommit) abci.ExtendedCommitInfo {
vs := make([]abci.ExtendedVoteInfo, len(extCommit.ExtendedSignatures))
for i, ecs := range extCommit.ExtendedSignatures {
var ext []byte
// votes[i] will be nil if c.Votes[i].SignedLastBlock is false
if c.Votes[i].SignedLastBlock {
valAddr := crypto.Address(c.Votes[i].Validator.Address).String()
vote, ok := votesByVal[valAddr]
if !ok || vote == nil {
panic(fmt.Sprintf("extendedCommitInfo: validator with address %s signed last block, but could not find vote for it", valAddr))
}
ext = vote.Extension
if ecs.ForBlock() {
ext = ecs.VoteExtension
}
vs[i] = abci.ExtendedVoteInfo{
Validator: c.Votes[i].Validator,
SignedLastBlock: c.Votes[i].SignedLastBlock,
Validator: abci.Validator{
Address: ecs.ValidatorAddress,
//TODO: Important: why do we need power here?
Power: 0,
},
SignedLastBlock: ecs.ForBlock(),
VoteExtension: ext,
}
}
return abci.ExtendedCommitInfo{
Round: c.Round,
Round: extCommit.Round,
Votes: vs,
}
}
Expand Down
3 changes: 2 additions & 1 deletion internal/state/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type BlockStore interface {
LoadBlockMeta(height int64) *types.BlockMeta
LoadBlock(height int64) *types.Block

SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit)
SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit)

PruneBlocks(height int64) (uint64, error)

Expand All @@ -36,6 +36,7 @@ type BlockStore interface {

LoadBlockCommit(height int64) *types.Commit
LoadSeenCommit() *types.Commit
LoadBlockExtCommit(height int64) *types.ExtendedCommit
}

//-----------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 8d504d4

Please sign in to comment.