Skip to content

Commit

Permalink
Introduce EventValidBlock for informing peer about wanted block
Browse files Browse the repository at this point in the history
  • Loading branch information
Zarko Milosevic committed Oct 19, 2018
1 parent ed107d0 commit 3111d10
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 55 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ BUG FIXES:
timeoutPrecommit before starting next round
- [consensus] [\#1745](https://github.com/tendermint/tendermint/issues/1745) wait for
Proposal or timeoutProposal before entering prevote
- [consensus] [\#2583](https://github.com/tendermint/tendermint/issues/2583) ensure valid
block property with faulty proposer
- [evidence] \#2515 fix db iter leak (@goolAdapter)
- [common/bit_array] Fixed a bug in the `Or` function
- [common/bit_array] Fixed a bug in the `Sub` function (@james-ray)
Expand Down
2 changes: 1 addition & 1 deletion consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
// Send our state to peer.
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !br.reactor.fastSync {
br.reactor.sendNewRoundStepMessages(peer)
br.reactor.sendNewRoundStepMessage(peer)
}
}
func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
Expand Down
5 changes: 5 additions & 0 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,11 @@ func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) {
"Timeout expired while waiting for NewProposal event")
}

func ensureNewValidBlock(validBlockCh <-chan interface{}, height int64, round int) {
ensureNewEvent(validBlockCh, height, round, ensureTimeout,
"Timeout expired while waiting for NewValidBlock event")
}

func ensureNewBlock(blockCh <-chan interface{}, height int64) {
select {
case <-time.After(ensureTimeout):
Expand Down
69 changes: 35 additions & 34 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (conR *ConsensusReactor) AddPeer(peer p2p.Peer) {
// Send our state to peer.
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !conR.FastSync() {
conR.sendNewRoundStepMessages(peer)
conR.sendNewRoundStepMessage(peer)
}
}

Expand Down Expand Up @@ -215,8 +215,8 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
switch msg := msg.(type) {
case *NewRoundStepMessage:
ps.ApplyNewRoundStepMessage(msg)
case *CommitStepMessage:
ps.ApplyCommitStepMessage(msg)
case *NewValidBlockMessage:
ps.ApplyNewValidBlockMessage(msg)
case *HasVoteMessage:
ps.ApplyHasVoteMessage(msg)
case *VoteSetMaj23Message:
Expand Down Expand Up @@ -368,6 +368,11 @@ func (conR *ConsensusReactor) subscribeToBroadcastEvents() {
conR.broadcastNewRoundStepMessages(data.(*cstypes.RoundState))
})

conR.conS.evsw.AddListenerForEvent(subscriber, types.EventValidBlock,
func(data tmevents.EventData) {
conR.broadcastNewValidBlockMessage(data.(*cstypes.RoundState))
})

conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote,
func(data tmevents.EventData) {
conR.broadcastHasVoteMessage(data.(*types.Vote))
Expand All @@ -392,13 +397,18 @@ func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(hb *types.Heartb
}

func (conR *ConsensusReactor) broadcastNewRoundStepMessages(rs *cstypes.RoundState) {
nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil {
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg))
}
if csMsg != nil {
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(csMsg))
nrsMsg := makeRoundStepMessage(rs)
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg))
}

func (conR *ConsensusReactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
csMsg := &NewValidBlockMessage{
Height: rs.Height,
Round: rs.Round,
BlockPartsHeader: rs.ProposalBlockParts.Header(),
BlockParts: rs.ProposalBlockParts.BitArray(),
}
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(csMsg))
}

// Broadcasts HasVoteMessage to peers that care.
Expand Down Expand Up @@ -427,33 +437,21 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote) {
*/
}

func makeRoundStepMessages(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) {
nrsMsg = &NewRoundStepMessage{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step,
SecondsSinceStartTime: int(time.Since(rs.StartTime).Seconds()),
LastCommitRound: rs.LastCommit.Round(),
}
if rs.Step == cstypes.RoundStepCommit {
csMsg = &CommitStepMessage{
Height: rs.Height,
BlockPartsHeader: rs.ProposalBlockParts.Header(),
BlockParts: rs.ProposalBlockParts.BitArray(),
}
}
return
}

func (conR *ConsensusReactor) sendNewRoundStepMessages(peer p2p.Peer) {
func (conR *ConsensusReactor) sendNewRoundStepMessage(peer p2p.Peer) {
rs := conR.conS.GetRoundState()
nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil {
peer.Send(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg))
}
if csMsg != nil {
peer.Send(StateChannel, cdc.MustMarshalBinaryBare(csMsg))
}
nrsMsg := makeRoundStepMessage(rs)
peer.Send(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg))
}

func (conR *ConsensusReactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
Expand Down Expand Up @@ -524,6 +522,7 @@ OUTER_LOOP:
msg := &ProposalMessage{Proposal: rs.Proposal}
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, cdc.MustMarshalBinaryBare(msg)) {
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
ps.SetHasProposal(rs.Proposal)
}
}
Expand Down Expand Up @@ -964,7 +963,8 @@ func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
if ps.PRS.Height != proposal.Height || ps.PRS.Round != proposal.Round {
return
}
if ps.PRS.Proposal {
// ps.PRS.ProposalBlockParts is set due to NewValidBlockMessage
if ps.PRS.Proposal || ps.PRS.ProposalBlockParts != nil {
return
}

Expand Down Expand Up @@ -1211,7 +1211,6 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
// Just remember these values.
psHeight := ps.PRS.Height
psRound := ps.PRS.Round
//psStep := ps.PRS.Step
psCatchupCommitRound := ps.PRS.CatchupCommitRound
psCatchupCommit := ps.PRS.CatchupCommit

Expand Down Expand Up @@ -1252,12 +1251,12 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
}
}

// ApplyCommitStepMessage updates the peer state for the new commit.
func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
// ApplyNewValidBlockMessage updates the peer state for the new valid block.
func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()

if ps.PRS.Height != msg.Height {
if ps.PRS.Height != msg.Height && ps.PRS.Round != msg.Round {
return
}

Expand Down Expand Up @@ -1344,7 +1343,7 @@ type ConsensusMessage interface{}
func RegisterConsensusMessages(cdc *amino.Codec) {
cdc.RegisterInterface((*ConsensusMessage)(nil), nil)
cdc.RegisterConcrete(&NewRoundStepMessage{}, "tendermint/NewRoundStepMessage", nil)
cdc.RegisterConcrete(&CommitStepMessage{}, "tendermint/CommitStep", nil)
cdc.RegisterConcrete(&NewValidBlockMessage{}, "tendermint/NewValidBlockMessage", nil)
cdc.RegisterConcrete(&ProposalMessage{}, "tendermint/Proposal", nil)
cdc.RegisterConcrete(&ProposalPOLMessage{}, "tendermint/ProposalPOL", nil)
cdc.RegisterConcrete(&BlockPartMessage{}, "tendermint/BlockPart", nil)
Expand Down Expand Up @@ -1384,15 +1383,17 @@ func (m *NewRoundStepMessage) String() string {
//-------------------------------------

// CommitStepMessage is sent when a block is committed.
type CommitStepMessage struct {
type NewValidBlockMessage struct {
Height int64
Round int
BlockPartsHeader types.PartSetHeader
BlockParts *cmn.BitArray
}

// String returns a string representation.
func (m *CommitStepMessage) String() string {
return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
func (m *NewValidBlockMessage) String() string {
return fmt.Sprintf("[ValidBlockMessage H:%v R:%v BP:%v BA:%v]",
m.Height, m.Round, m.BlockPartsHeader, m.BlockParts)
}

//-------------------------------------
Expand Down
48 changes: 28 additions & 20 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,13 +904,6 @@ func (cs *ConsensusState) defaultDecideProposal(height int64, round int) {
polRound, polBlockID := cs.Votes.POLInfo()
proposal := types.NewProposal(height, round, blockParts.Header(), polRound, polBlockID)
if err := cs.privValidator.SignProposal(cs.state.ChainID, proposal); err == nil {
// Set fields
/* fields set by setProposal and addBlockPart
cs.Proposal = proposal
cs.ProposalBlock = block
cs.ProposalBlockParts = blockParts
*/

// send proposal and block parts on internal msg queue
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
for i := 0; i < blockParts.Total(); i++ {
Expand Down Expand Up @@ -1240,6 +1233,8 @@ func (cs *ConsensusState) enterCommit(height int64, commitRound int) {
// Set up ProposalBlockParts and keep waiting.
cs.ProposalBlock = nil
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartsHeader)
cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent())
cs.evsw.FireEvent(types.EventValidBlock, &cs.RoundState)
} else {
// We just need to keep waiting.
}
Expand Down Expand Up @@ -1420,8 +1415,10 @@ func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error {
return nil
}

// We don't care about the proposal if we're already in cstypes.RoundStepCommit.
if cstypes.RoundStepCommit <= cs.Step {
// We don't care about the proposal if cs.ProposalBlockParts is already set.
// This happens if we're already in cstypes.RoundStepCommit or if there is a valid block in the current round.
// TODO: We can check if Proposal is for a different block as this is a sign of misbehavior!
if cs.ProposalBlockParts != nil {
return nil
}

Expand Down Expand Up @@ -1616,16 +1613,26 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool,

// Update Valid* if we can.
// NOTE: our proposal block may be nil or not what received a polka..
// TODO: we may want to still update the ValidBlock and obtain it via gossipping
if len(blockID.Hash) != 0 &&
(cs.ValidRound < vote.Round) &&
(vote.Round <= cs.Round) &&
cs.ProposalBlock.HashesTo(blockID.Hash) {

cs.Logger.Info("Updating ValidBlock because of POL.", "validRound", cs.ValidRound, "POLRound", vote.Round)
cs.ValidRound = vote.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
if len(blockID.Hash) != 0 && (cs.ValidRound < vote.Round) && (vote.Round == cs.Round) {

if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.Logger.Info(
"Updating ValidBlock because of POL.", "validRound", cs.ValidRound, "POLRound", vote.Round)
cs.ValidRound = vote.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
} else {
cs.Logger.Info(
"Valid block we don't know about. Set ProposalBlock=nil",
"proposal", cs.ProposalBlock.Hash(), "blockId", blockID.Hash)
// We're getting the wrong block.
cs.ProposalBlock = nil
}
if !cs.ProposalBlockParts.HasHeader(blockID.PartsHeader) {
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartsHeader)
}
cs.evsw.FireEvent(types.EventValidBlock, &cs.RoundState)
cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent())
}
}

Expand All @@ -1634,7 +1641,8 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool,
// Round-skip if there is any 2/3+ of votes ahead of us
cs.enterNewRound(height, vote.Round)
} else if cs.Round == vote.Round && cstypes.RoundStepPrevote <= cs.Step { // current round
if prevotes.HasTwoThirdsMajority() {
blockID, ok := prevotes.TwoThirdsMajority()
if ok && (cs.isProposalComplete() || len(blockID.Hash) == 0) {
cs.enterPrecommit(height, vote.Round)
} else if prevotes.HasTwoThirdsAny() {
cs.enterPrevoteWait(height, vote.Round)
Expand Down

0 comments on commit 3111d10

Please sign in to comment.