Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce EventValidBlock for informing peers about wanted block #2652

Merged
merged 5 commits into from
Oct 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
timeoutPrecommit before starting next round
- [consensus] [\#1745](https://github.com/tendermint/tendermint/issues/1745) Wait for
Proposal or timeoutProposal before entering prevote
- [consensus] [\#2583](https://github.com/tendermint/tendermint/issues/2583) ensure valid
block property with faulty proposer
- [consensus] [\#2642](https://github.com/tendermint/tendermint/issues/2642) Only propose ValidBlock, not LockedBlock
- [consensus] [\#2642](https://github.com/tendermint/tendermint/issues/2642) Initialized ValidRound and LockedRound to -1
- [consensus] [\#1637](https://github.com/tendermint/tendermint/issues/1637) Limit the amount of evidence that can be included in a
Expand Down
2 changes: 1 addition & 1 deletion consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
// Send our state to peer.
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !br.reactor.fastSync {
br.reactor.sendNewRoundStepMessages(peer)
br.reactor.sendNewRoundStepMessage(peer)
}
}
func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
Expand Down
5 changes: 5 additions & 0 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,11 @@ func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) {
"Timeout expired while waiting for NewProposal event")
}

func ensureNewValidBlock(validBlockCh <-chan interface{}, height int64, round int) {
ensureNewEvent(validBlockCh, height, round, ensureTimeout,
"Timeout expired while waiting for NewValidBlock event")
}

func ensureNewBlock(blockCh <-chan interface{}, height int64) {
select {
case <-time.After(ensureTimeout):
Expand Down
87 changes: 50 additions & 37 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (conR *ConsensusReactor) AddPeer(peer p2p.Peer) {
// Send our state to peer.
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !conR.FastSync() {
conR.sendNewRoundStepMessages(peer)
conR.sendNewRoundStepMessage(peer)
}
}

Expand Down Expand Up @@ -215,8 +215,8 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
switch msg := msg.(type) {
case *NewRoundStepMessage:
ps.ApplyNewRoundStepMessage(msg)
case *CommitStepMessage:
ps.ApplyCommitStepMessage(msg)
case *NewValidBlockMessage:
ps.ApplyNewValidBlockMessage(msg)
case *HasVoteMessage:
ps.ApplyHasVoteMessage(msg)
case *VoteSetMaj23Message:
Expand Down Expand Up @@ -365,7 +365,12 @@ func (conR *ConsensusReactor) subscribeToBroadcastEvents() {
const subscriber = "consensus-reactor"
conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep,
func(data tmevents.EventData) {
conR.broadcastNewRoundStepMessages(data.(*cstypes.RoundState))
conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState))
})

conR.conS.evsw.AddListenerForEvent(subscriber, types.EventValidBlock,
func(data tmevents.EventData) {
conR.broadcastNewValidBlockMessage(data.(*cstypes.RoundState))
})

conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote,
Expand All @@ -391,14 +396,20 @@ func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(hb *types.Heartb
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(msg))
}

func (conR *ConsensusReactor) broadcastNewRoundStepMessages(rs *cstypes.RoundState) {
nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil {
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg))
}
if csMsg != nil {
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(csMsg))
func (conR *ConsensusReactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
nrsMsg := makeRoundStepMessage(rs)
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg))
}

func (conR *ConsensusReactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
csMsg := &NewValidBlockMessage{
Height: rs.Height,
Round: rs.Round,
BlockPartsHeader: rs.ProposalBlockParts.Header(),
BlockParts: rs.ProposalBlockParts.BitArray(),
IsCommit: rs.Step == cstypes.RoundStepCommit,
}
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(csMsg))
}

// Broadcasts HasVoteMessage to peers that care.
Expand Down Expand Up @@ -427,33 +438,21 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote) {
*/
}

func makeRoundStepMessages(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) {
nrsMsg = &NewRoundStepMessage{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step,
Height: rs.Height,
Round: rs.Round,
Step: rs.Step,
SecondsSinceStartTime: int(time.Since(rs.StartTime).Seconds()),
LastCommitRound: rs.LastCommit.Round(),
}
if rs.Step == cstypes.RoundStepCommit {
csMsg = &CommitStepMessage{
Height: rs.Height,
BlockPartsHeader: rs.ProposalBlockParts.Header(),
BlockParts: rs.ProposalBlockParts.BitArray(),
}
}
return
}

func (conR *ConsensusReactor) sendNewRoundStepMessages(peer p2p.Peer) {
func (conR *ConsensusReactor) sendNewRoundStepMessage(peer p2p.Peer) {
rs := conR.conS.GetRoundState()
nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil {
peer.Send(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg))
}
if csMsg != nil {
peer.Send(StateChannel, cdc.MustMarshalBinaryBare(csMsg))
}
nrsMsg := makeRoundStepMessage(rs)
peer.Send(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg))
}

func (conR *ConsensusReactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
Expand Down Expand Up @@ -524,6 +523,7 @@ OUTER_LOOP:
msg := &ProposalMessage{Proposal: rs.Proposal}
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, cdc.MustMarshalBinaryBare(msg)) {
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
ebuchman marked this conversation as resolved.
Show resolved Hide resolved
ps.SetHasProposal(rs.Proposal)
}
}
Expand Down Expand Up @@ -964,11 +964,18 @@ func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
if ps.PRS.Height != proposal.Height || ps.PRS.Round != proposal.Round {
return
}

if ps.PRS.Proposal {
return
}

ps.PRS.Proposal = true

// ps.PRS.ProposalBlockParts is set due to NewValidBlockMessage
if ps.PRS.ProposalBlockParts != nil {
return
}

ps.PRS.ProposalBlockPartsHeader = proposal.BlockPartsHeader
ps.PRS.ProposalBlockParts = cmn.NewBitArray(proposal.BlockPartsHeader.Total)
ps.PRS.ProposalPOLRound = proposal.POLRound
Expand Down Expand Up @@ -1211,7 +1218,6 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
// Just remember these values.
psHeight := ps.PRS.Height
psRound := ps.PRS.Round
//psStep := ps.PRS.Step
psCatchupCommitRound := ps.PRS.CatchupCommitRound
psCatchupCommit := ps.PRS.CatchupCommit

Expand Down Expand Up @@ -1252,15 +1258,19 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
}
}

// ApplyCommitStepMessage updates the peer state for the new commit.
func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
// ApplyNewValidBlockMessage updates the peer state for the new valid block.
func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()

if ps.PRS.Height != msg.Height {
return
}

if ps.PRS.Round != msg.Round && !msg.IsCommit {
return
}

ps.PRS.ProposalBlockPartsHeader = msg.BlockPartsHeader
ps.PRS.ProposalBlockParts = msg.BlockParts
}
Expand Down Expand Up @@ -1344,7 +1354,7 @@ type ConsensusMessage interface{}
func RegisterConsensusMessages(cdc *amino.Codec) {
cdc.RegisterInterface((*ConsensusMessage)(nil), nil)
cdc.RegisterConcrete(&NewRoundStepMessage{}, "tendermint/NewRoundStepMessage", nil)
cdc.RegisterConcrete(&CommitStepMessage{}, "tendermint/CommitStep", nil)
cdc.RegisterConcrete(&NewValidBlockMessage{}, "tendermint/NewValidBlockMessage", nil)
cdc.RegisterConcrete(&ProposalMessage{}, "tendermint/Proposal", nil)
cdc.RegisterConcrete(&ProposalPOLMessage{}, "tendermint/ProposalPOL", nil)
cdc.RegisterConcrete(&BlockPartMessage{}, "tendermint/BlockPart", nil)
Expand Down Expand Up @@ -1384,15 +1394,18 @@ func (m *NewRoundStepMessage) String() string {
//-------------------------------------

// CommitStepMessage is sent when a block is committed.
type CommitStepMessage struct {
type NewValidBlockMessage struct {
Height int64
Round int
BlockPartsHeader types.PartSetHeader
BlockParts *cmn.BitArray
IsCommit bool
}

// String returns a string representation.
func (m *CommitStepMessage) String() string {
return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
func (m *NewValidBlockMessage) String() string {
return fmt.Sprintf("[ValidBlockMessage H:%v R:%v BP:%v BA:%v IsCommit:%v]",
m.Height, m.Round, m.BlockPartsHeader, m.BlockParts, m.IsCommit)
}

//-------------------------------------
Expand Down
63 changes: 31 additions & 32 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,13 +904,6 @@ func (cs *ConsensusState) defaultDecideProposal(height int64, round int) {
polRound, polBlockID := cs.Votes.POLInfo()
proposal := types.NewProposal(height, round, blockParts.Header(), polRound, polBlockID)
if err := cs.privValidator.SignProposal(cs.state.ChainID, proposal); err == nil {
// Set fields
/* fields set by setProposal and addBlockPart
cs.Proposal = proposal
cs.ProposalBlock = block
cs.ProposalBlockParts = blockParts
*/

// send proposal and block parts on internal msg queue
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
for i := 0; i < blockParts.Total(); i++ {
Expand Down Expand Up @@ -994,14 +987,6 @@ func (cs *ConsensusState) enterPrevote(height int64, round int) {
cs.newStep()
}()

// fire event for how we got here
if cs.isProposalComplete() {
cs.eventBus.PublishEventCompleteProposal(cs.RoundStateEvent())
} else {
// we received +2/3 prevotes for a future round
// TODO: catchup event?
}

cs.Logger.Info(fmt.Sprintf("enterPrevote(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))

// Sign and broadcast vote as necessary
Expand Down Expand Up @@ -1240,6 +1225,8 @@ func (cs *ConsensusState) enterCommit(height int64, commitRound int) {
// Set up ProposalBlockParts and keep waiting.
cs.ProposalBlock = nil
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartsHeader)
cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent())
cs.evsw.FireEvent(types.EventValidBlock, &cs.RoundState)
} else {
// We just need to keep waiting.
}
Expand Down Expand Up @@ -1420,11 +1407,6 @@ func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error {
return nil
}

// We don't care about the proposal if we're already in cstypes.RoundStepCommit.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line was pointed out as the reason #2567 wasn't a bug (ie. see #2567 (comment)).

Now, instead of checking the commit step, we check if the ProposalBlockParts is already set, so we don't overwrite it. Is that right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was aware of #2567 and was convinced that we still have the same guarantee with the new condition. I will make one more check.

if cstypes.RoundStepCommit <= cs.Step {
return nil
}

// Verify POLRound, which must be -1 or between 0 and proposal.Round exclusive.
if proposal.POLRound != -1 &&
(proposal.POLRound < 0 || proposal.Round <= proposal.POLRound) {
Expand All @@ -1437,7 +1419,12 @@ func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error {
}

cs.Proposal = proposal
cs.ProposalBlockParts = types.NewPartSetFromHeader(proposal.BlockPartsHeader)
// We don't update cs.ProposalBlockParts if it is already set.
// This happens if we're already in cstypes.RoundStepCommit or if there is a valid block in the current round.
// TODO: We can check if Proposal is for a different block as this is a sign of misbehavior!
if cs.ProposalBlockParts == nil {
cs.ProposalBlockParts = types.NewPartSetFromHeader(proposal.BlockPartsHeader)
}
cs.Logger.Info("Received proposal", "proposal", proposal)
return nil
}
Expand Down Expand Up @@ -1478,6 +1465,7 @@ func (cs *ConsensusState) addProposalBlockPart(msg *BlockPartMessage, peerID p2p
}
// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
cs.eventBus.PublishEventCompleteProposal(cs.RoundStateEvent())

// Update Valid* if we can.
prevotes := cs.Votes.Prevotes(cs.Round)
Expand Down Expand Up @@ -1616,16 +1604,26 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool,

// Update Valid* if we can.
// NOTE: our proposal block may be nil or not what received a polka..
// TODO: we may want to still update the ValidBlock and obtain it via gossipping
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we consider vote.Round < cs.Round anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cs.ProposalBlock corresponds to the cs.Round. The spec also only cares about updating validBlock for the current round.

if len(blockID.Hash) != 0 &&
(cs.ValidRound < vote.Round) &&
(vote.Round <= cs.Round) &&
cs.ProposalBlock.HashesTo(blockID.Hash) {

cs.Logger.Info("Updating ValidBlock because of POL.", "validRound", cs.ValidRound, "POLRound", vote.Round)
cs.ValidRound = vote.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
if len(blockID.Hash) != 0 && (cs.ValidRound < vote.Round) && (vote.Round == cs.Round) {

if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.Logger.Info(
"Updating ValidBlock because of POL.", "validRound", cs.ValidRound, "POLRound", vote.Round)
cs.ValidRound = vote.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
} else {
cs.Logger.Info(
"Valid block we don't know about. Set ProposalBlock=nil",
"proposal", cs.ProposalBlock.Hash(), "blockId", blockID.Hash)
// We're getting the wrong block.
cs.ProposalBlock = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about cs.ValidRound and cs.ValidBlockParts? Shouldn't we reset these too?

Copy link
Contributor Author

@milosevic milosevic Oct 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We update cs.ValidRound, cs.ValidBlockParts and cs.ValidBlock only when we have the corresponding block. Therefore, at this point we can only "tell" gossip layer that we need some block, and then update ValidX variables when block arrives (if it arrives on time).

}
if !cs.ProposalBlockParts.HasHeader(blockID.PartsHeader) {
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartsHeader)
}
cs.evsw.FireEvent(types.EventValidBlock, &cs.RoundState)
cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent())
}
}

Expand All @@ -1634,7 +1632,8 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool,
// Round-skip if there is any 2/3+ of votes ahead of us
cs.enterNewRound(height, vote.Round)
} else if cs.Round == vote.Round && cstypes.RoundStepPrevote <= cs.Step { // current round
if prevotes.HasTwoThirdsMajority() {
blockID, ok := prevotes.TwoThirdsMajority()
if ok && (cs.isProposalComplete() || len(blockID.Hash) == 0) {
cs.enterPrecommit(height, vote.Round)
} else if prevotes.HasTwoThirdsAny() {
cs.enterPrevoteWait(height, vote.Round)
Expand Down