-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce EventValidBlock for informing peers about wanted block #2652
Changes from 3 commits
8ec9a55
4fe1b89
339fdf2
fa68a91
108a9a8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -174,7 +174,7 @@ func (conR *ConsensusReactor) AddPeer(peer p2p.Peer) { | |
// Send our state to peer. | ||
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). | ||
if !conR.FastSync() { | ||
conR.sendNewRoundStepMessages(peer) | ||
conR.sendNewRoundStepMessage(peer) | ||
} | ||
} | ||
|
||
|
@@ -215,8 +215,8 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) | |
switch msg := msg.(type) { | ||
case *NewRoundStepMessage: | ||
ps.ApplyNewRoundStepMessage(msg) | ||
case *CommitStepMessage: | ||
ps.ApplyCommitStepMessage(msg) | ||
case *NewValidBlockMessage: | ||
ps.ApplyNewValidBlockMessage(msg) | ||
case *HasVoteMessage: | ||
ps.ApplyHasVoteMessage(msg) | ||
case *VoteSetMaj23Message: | ||
|
@@ -368,6 +368,11 @@ func (conR *ConsensusReactor) subscribeToBroadcastEvents() { | |
conR.broadcastNewRoundStepMessages(data.(*cstypes.RoundState)) | ||
}) | ||
|
||
conR.conS.evsw.AddListenerForEvent(subscriber, types.EventValidBlock, | ||
func(data tmevents.EventData) { | ||
conR.broadcastNewValidBlockMessage(data.(*cstypes.RoundState)) | ||
}) | ||
|
||
conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote, | ||
func(data tmevents.EventData) { | ||
conR.broadcastHasVoteMessage(data.(*types.Vote)) | ||
|
@@ -392,13 +397,18 @@ func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(hb *types.Heartb | |
} | ||
|
||
func (conR *ConsensusReactor) broadcastNewRoundStepMessages(rs *cstypes.RoundState) { | ||
nrsMsg, csMsg := makeRoundStepMessages(rs) | ||
if nrsMsg != nil { | ||
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg)) | ||
} | ||
if csMsg != nil { | ||
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(csMsg)) | ||
nrsMsg := makeRoundStepMessage(rs) | ||
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg)) | ||
} | ||
|
||
func (conR *ConsensusReactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) { | ||
csMsg := &NewValidBlockMessage{ | ||
Height: rs.Height, | ||
Round: rs.Round, | ||
BlockPartsHeader: rs.ProposalBlockParts.Header(), | ||
BlockParts: rs.ProposalBlockParts.BitArray(), | ||
} | ||
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(csMsg)) | ||
} | ||
|
||
// Broadcasts HasVoteMessage to peers that care. | ||
|
@@ -427,33 +437,21 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote) { | |
*/ | ||
} | ||
|
||
func makeRoundStepMessages(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { | ||
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) { | ||
nrsMsg = &NewRoundStepMessage{ | ||
Height: rs.Height, | ||
Round: rs.Round, | ||
Step: rs.Step, | ||
SecondsSinceStartTime: int(time.Since(rs.StartTime).Seconds()), | ||
LastCommitRound: rs.LastCommit.Round(), | ||
} | ||
if rs.Step == cstypes.RoundStepCommit { | ||
csMsg = &CommitStepMessage{ | ||
Height: rs.Height, | ||
BlockPartsHeader: rs.ProposalBlockParts.Header(), | ||
BlockParts: rs.ProposalBlockParts.BitArray(), | ||
} | ||
} | ||
return | ||
} | ||
|
||
func (conR *ConsensusReactor) sendNewRoundStepMessages(peer p2p.Peer) { | ||
func (conR *ConsensusReactor) sendNewRoundStepMessage(peer p2p.Peer) { | ||
rs := conR.conS.GetRoundState() | ||
nrsMsg, csMsg := makeRoundStepMessages(rs) | ||
if nrsMsg != nil { | ||
peer.Send(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg)) | ||
} | ||
if csMsg != nil { | ||
peer.Send(StateChannel, cdc.MustMarshalBinaryBare(csMsg)) | ||
} | ||
nrsMsg := makeRoundStepMessage(rs) | ||
peer.Send(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg)) | ||
} | ||
|
||
func (conR *ConsensusReactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) { | ||
|
@@ -524,6 +522,7 @@ OUTER_LOOP: | |
msg := &ProposalMessage{Proposal: rs.Proposal} | ||
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round) | ||
if peer.Send(DataChannel, cdc.MustMarshalBinaryBare(msg)) { | ||
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected! | ||
ebuchman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ps.SetHasProposal(rs.Proposal) | ||
} | ||
} | ||
|
@@ -964,7 +963,8 @@ func (ps *PeerState) SetHasProposal(proposal *types.Proposal) { | |
if ps.PRS.Height != proposal.Height || ps.PRS.Round != proposal.Round { | ||
return | ||
} | ||
if ps.PRS.Proposal { | ||
// ps.PRS.ProposalBlockParts is set due to NewValidBlockMessage | ||
if ps.PRS.Proposal || ps.PRS.ProposalBlockParts != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this really necessary? I believe the situation this captures is when they don't have a proposal, but they sent us NewValidBlockMessage. For this to run, then either they'd have to send us a proposal (which I think would be Byzantine) or we'd have to have tried to send it to them (should that ever happen?). I think it does make sense to have the check here, just hard to follow the circumstance where There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The key part here is |
||
return | ||
} | ||
|
||
|
@@ -1211,7 +1211,6 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { | |
// Just remember these values. | ||
psHeight := ps.PRS.Height | ||
psRound := ps.PRS.Round | ||
//psStep := ps.PRS.Step | ||
psCatchupCommitRound := ps.PRS.CatchupCommitRound | ||
psCatchupCommit := ps.PRS.CatchupCommit | ||
|
||
|
@@ -1252,12 +1251,12 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { | |
} | ||
} | ||
|
||
// ApplyCommitStepMessage updates the peer state for the new commit. | ||
func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) { | ||
// ApplyNewValidBlockMessage updates the peer state for the new valid block. | ||
func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) { | ||
ps.mtx.Lock() | ||
defer ps.mtx.Unlock() | ||
|
||
if ps.PRS.Height != msg.Height { | ||
if ps.PRS.Height != msg.Height && ps.PRS.Round != msg.Round { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if there was a commit in Round 2, but this peer never received the block from Round 2, and is already in Round 3. Then it sees the commit from round 2, and sends us NewValidBlockMessage with Round==2? Do we need special logic for when the NewValidBlockMessage is specifically for a commit, since the round may be different then? Hard to write a test for this right now :( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Excellent point, I missed this. We probably need to treat commit in a special way. Maybe we can just add flag to NewValidBlockMessage to say if it is commit or not. And yes, we don't have tests for this, but it will come soon :) |
||
return | ||
} | ||
|
||
|
@@ -1344,7 +1343,7 @@ type ConsensusMessage interface{} | |
func RegisterConsensusMessages(cdc *amino.Codec) { | ||
cdc.RegisterInterface((*ConsensusMessage)(nil), nil) | ||
cdc.RegisterConcrete(&NewRoundStepMessage{}, "tendermint/NewRoundStepMessage", nil) | ||
cdc.RegisterConcrete(&CommitStepMessage{}, "tendermint/CommitStep", nil) | ||
cdc.RegisterConcrete(&NewValidBlockMessage{}, "tendermint/NewValidBlockMessage", nil) | ||
cdc.RegisterConcrete(&ProposalMessage{}, "tendermint/Proposal", nil) | ||
cdc.RegisterConcrete(&ProposalPOLMessage{}, "tendermint/ProposalPOL", nil) | ||
cdc.RegisterConcrete(&BlockPartMessage{}, "tendermint/BlockPart", nil) | ||
|
@@ -1384,15 +1383,17 @@ func (m *NewRoundStepMessage) String() string { | |
//------------------------------------- | ||
|
||
// CommitStepMessage is sent when a block is committed. | ||
type CommitStepMessage struct { | ||
type NewValidBlockMessage struct { | ||
Height int64 | ||
Round int | ||
BlockPartsHeader types.PartSetHeader | ||
BlockParts *cmn.BitArray | ||
} | ||
|
||
// String returns a string representation. | ||
func (m *CommitStepMessage) String() string { | ||
return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts) | ||
func (m *NewValidBlockMessage) String() string { | ||
return fmt.Sprintf("[ValidBlockMessage H:%v R:%v BP:%v BA:%v]", | ||
m.Height, m.Round, m.BlockPartsHeader, m.BlockParts) | ||
} | ||
|
||
//------------------------------------- | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -904,13 +904,6 @@ func (cs *ConsensusState) defaultDecideProposal(height int64, round int) { | |
polRound, polBlockID := cs.Votes.POLInfo() | ||
proposal := types.NewProposal(height, round, blockParts.Header(), polRound, polBlockID) | ||
if err := cs.privValidator.SignProposal(cs.state.ChainID, proposal); err == nil { | ||
// Set fields | ||
/* fields set by setProposal and addBlockPart | ||
cs.Proposal = proposal | ||
cs.ProposalBlock = block | ||
cs.ProposalBlockParts = blockParts | ||
*/ | ||
|
||
// send proposal and block parts on internal msg queue | ||
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""}) | ||
for i := 0; i < blockParts.Total(); i++ { | ||
|
@@ -1240,6 +1233,8 @@ func (cs *ConsensusState) enterCommit(height int64, commitRound int) { | |
// Set up ProposalBlockParts and keep waiting. | ||
cs.ProposalBlock = nil | ||
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartsHeader) | ||
cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent()) | ||
cs.evsw.FireEvent(types.EventValidBlock, &cs.RoundState) | ||
} else { | ||
// We just need to keep waiting. | ||
} | ||
|
@@ -1420,11 +1415,6 @@ func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error { | |
return nil | ||
} | ||
|
||
// We don't care about the proposal if we're already in cstypes.RoundStepCommit. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line was pointed out as the reason #2567 wasn't a bug (ie. see #2567 (comment)). Now, instead of checking the commit step, we check if the ProposalBlockParts is already set, so we don't overwrite it. Is that right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was aware of #2567 and was convinced that we still have the same guarantee with the new condition. I will make one more check. |
||
if cstypes.RoundStepCommit <= cs.Step { | ||
return nil | ||
} | ||
|
||
// Verify POLRound, which must be -1 or between 0 and proposal.Round exclusive. | ||
if proposal.POLRound != -1 && | ||
(proposal.POLRound < 0 || proposal.Round <= proposal.POLRound) { | ||
|
@@ -1437,7 +1427,12 @@ func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error { | |
} | ||
|
||
cs.Proposal = proposal | ||
cs.ProposalBlockParts = types.NewPartSetFromHeader(proposal.BlockPartsHeader) | ||
// We don't update cs.ProposalBlockParts if it is already set. | ||
// This happens if we're already in cstypes.RoundStepCommit or if there is a valid block in the current round. | ||
// TODO: We can check if Proposal is for a different block as this is a sign of misbehavior! | ||
if cs.ProposalBlockParts == nil { | ||
cs.ProposalBlockParts = types.NewPartSetFromHeader(proposal.BlockPartsHeader) | ||
} | ||
cs.Logger.Info("Received proposal", "proposal", proposal) | ||
return nil | ||
} | ||
|
@@ -1616,16 +1611,26 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool, | |
|
||
// Update Valid* if we can. | ||
// NOTE: our proposal block may be nil or not what received a polka.. | ||
// TODO: we may want to still update the ValidBlock and obtain it via gossipping | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't we consider vote.Round < cs.Round anymore? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cs.ProposalBlock corresponds to the cs.Round. The spec also only cares about updating validBlock for the current round. |
||
if len(blockID.Hash) != 0 && | ||
(cs.ValidRound < vote.Round) && | ||
(vote.Round <= cs.Round) && | ||
cs.ProposalBlock.HashesTo(blockID.Hash) { | ||
|
||
cs.Logger.Info("Updating ValidBlock because of POL.", "validRound", cs.ValidRound, "POLRound", vote.Round) | ||
cs.ValidRound = vote.Round | ||
cs.ValidBlock = cs.ProposalBlock | ||
cs.ValidBlockParts = cs.ProposalBlockParts | ||
if len(blockID.Hash) != 0 && (cs.ValidRound < vote.Round) && (vote.Round == cs.Round) { | ||
|
||
if cs.ProposalBlock.HashesTo(blockID.Hash) { | ||
cs.Logger.Info( | ||
"Updating ValidBlock because of POL.", "validRound", cs.ValidRound, "POLRound", vote.Round) | ||
cs.ValidRound = vote.Round | ||
cs.ValidBlock = cs.ProposalBlock | ||
cs.ValidBlockParts = cs.ProposalBlockParts | ||
} else { | ||
cs.Logger.Info( | ||
"Valid block we don't know about. Set ProposalBlock=nil", | ||
"proposal", cs.ProposalBlock.Hash(), "blockId", blockID.Hash) | ||
// We're getting the wrong block. | ||
cs.ProposalBlock = nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We update |
||
} | ||
if !cs.ProposalBlockParts.HasHeader(blockID.PartsHeader) { | ||
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartsHeader) | ||
} | ||
cs.evsw.FireEvent(types.EventValidBlock, &cs.RoundState) | ||
cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent()) | ||
} | ||
} | ||
|
||
|
@@ -1634,7 +1639,8 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool, | |
// Round-skip if there is any 2/3+ of votes ahead of us | ||
cs.enterNewRound(height, vote.Round) | ||
} else if cs.Round == vote.Round && cstypes.RoundStepPrevote <= cs.Step { // current round | ||
if prevotes.HasTwoThirdsMajority() { | ||
blockID, ok := prevotes.TwoThirdsMajority() | ||
if ok && (cs.isProposalComplete() || len(blockID.Hash) == 0) { | ||
cs.enterPrecommit(height, vote.Round) | ||
} else if prevotes.HasTwoThirdsAny() { | ||
cs.enterPrevoteWait(height, vote.Round) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
broadcastNewRoundStepMessage