Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R] Add proposer to NewRound event and proposal info to CompleteProposal event #2767

Merged
merged 14 commits into from
Nov 15, 2018
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi

### FEATURES:

- [eventbus] create new event types for NewRound and CompleteProposal. add proposed block info to CompleteProposal event and proposer info to NewRound event
ebuchman marked this conversation as resolved.
Show resolved Hide resolved

### IMPROVEMENTS:

### BUG FIXES:
Expand Down
65 changes: 61 additions & 4 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,24 @@ func ensureNewVote(voteCh <-chan interface{}, height int64, round int) {
}

func ensureNewRound(roundCh <-chan interface{}, height int64, round int) {
ensureNewEvent(roundCh, height, round, ensureTimeout,
"Timeout expired while waiting for NewRound event")
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewRound event")
case ev := <-roundCh:
rs, ok := ev.(types.EventDataNewRound)
if !ok {
panic(
fmt.Sprintf(
"expected a EventDataNewRound, got %v.Wrong subscription channel?",
reflect.TypeOf(rs)))
}
if rs.Height != height {
ebuchman marked this conversation as resolved.
Show resolved Hide resolved
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
}
if rs.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
}
}
}

func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, timeout int64) {
Expand All @@ -416,8 +432,24 @@ func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, tim
}

func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) {
ensureNewEvent(proposalCh, height, round, ensureTimeout,
"Timeout expired while waiting for NewProposal event")
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewProposal event")
case ev := <-proposalCh:
rs, ok := ev.(types.EventDataCompleteProposal)
if !ok {
panic(
fmt.Sprintf(
"expected a EventDataCompleteProposal, got %v.Wrong subscription channel?",
reflect.TypeOf(rs)))
}
if rs.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
}
if rs.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
}
}
}

func ensureNewValidBlock(validBlockCh <-chan interface{}, height int64, round int) {
Expand Down Expand Up @@ -492,6 +524,31 @@ func ensureVote(voteCh <-chan interface{}, height int64, round int,
}
}

func ensureProposal(proposalCh <-chan interface{}, height int64, round int, propId types.BlockID) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What prompted the need for this if we already have ensureNewProposal?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allows us to also assert details about the proposal (eg we want to assert this specific block is proposed vs a block is proposed). I took the pattern from ensureNewVote (though it isnt actually used at the moment) vs ensureVote

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep - we probably want to replace ensureNewProposal with this eventually so we're always asserting the correct value. Fine for now.

select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewProposal event")
case ev := <-proposalCh:
rs, ok := ev.(types.EventDataCompleteProposal)
if !ok {
panic(
fmt.Sprintf(
"expected a EventDataCompleteProposal, got %v.Wrong subscription channel?",
reflect.TypeOf(rs)))
}
if rs.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
}
if rs.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
}
if !rs.BlockID.Equals(propId) {
panic("Proposed block does not match expected block")
}
}
}


func ensurePrecommit(voteCh <-chan interface{}, height int64, round int) {
ensureVote(voteCh, height, round, types.PrecommitType)
}
Expand Down
6 changes: 3 additions & 3 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,18 @@ func TestMempoolProgressInHigherRound(t *testing.T) {
}
startTestRound(cs, height, round)

ensureNewRoundStep(newRoundCh, height, round) // first round at first height
ensureNewRound(newRoundCh, height, round) // first round at first height
ensureNewEventOnChannel(newBlockCh) // first block gets committed

height = height + 1 // moving to the next height
round = 0

ensureNewRoundStep(newRoundCh, height, round) // first round at next height
ensureNewRound(newRoundCh, height, round) // first round at next height
deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round
ensureNewTimeout(timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds())

round = round + 1 // moving to the next round
ensureNewRoundStep(newRoundCh, height, round) // wait for the next round
ensureNewRound(newRoundCh, height, round) // wait for the next round
ensureNewEventOnChannel(newBlockCh) // now we can commit the block
}

Expand Down
6 changes: 3 additions & 3 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) {
cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping
cs.triggeredTimeoutPrecommit = false

cs.eventBus.PublishEventNewRound(cs.RoundStateEvent())
cs.eventBus.PublishEventNewRound(cs.NewRoundEvent())
cs.metrics.Rounds.Set(float64(round))

// Wait for txs to be available in the mempool
Expand Down Expand Up @@ -1408,7 +1408,7 @@ func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error {
return nil
}

// Verify POLRound, which must be -1 or in range [0, proposal.Round).
// Verify POLRound, which must be -1 or in range [0, proposal.Round).
if proposal.POLRound < -1 ||
(proposal.POLRound >= 0 && proposal.POLRound >= proposal.Round) {
return ErrInvalidProposalPOLRound
Expand Down Expand Up @@ -1466,7 +1466,7 @@ func (cs *ConsensusState) addProposalBlockPart(msg *BlockPartMessage, peerID p2p
}
// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
cs.eventBus.PublishEventCompleteProposal(cs.RoundStateEvent())
cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent())

// Update Valid* if we can.
prevotes := cs.Votes.Prevotes(cs.Round)
Expand Down
7 changes: 3 additions & 4 deletions consensus/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,8 @@ func TestStateBadProposal(t *testing.T) {
stateHash[0] = byte((stateHash[0] + 1) % 255)
propBlock.AppHash = stateHash
propBlockParts := propBlock.MakePartSet(partSize)
proposal := types.NewProposal(
vs2.Height, round, -1,
types.BlockID{propBlock.Hash(), propBlockParts.Header()})
blockID := types.BlockID{propBlock.Hash(), propBlockParts.Header()}
proposal := types.NewProposal(vs2.Height, round, -1, blockID)
if err := vs2.SignProposal(config.ChainID(), proposal); err != nil {
t.Fatal("failed to sign bad proposal", err)
}
Expand All @@ -213,7 +212,7 @@ func TestStateBadProposal(t *testing.T) {
startTestRound(cs1, height, round)

// wait for proposal
ensureNewProposal(proposalCh, height, round)
ensureProposal(proposalCh, height, round, blockID)

// wait for prevote
ensurePrevote(voteCh, height, round)
Expand Down
42 changes: 40 additions & 2 deletions consensus/types/round_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,56 @@ func (rs *RoundState) RoundStateSimple() RoundStateSimple {
}
}

// NewRoundEvent returns the RoundState with proposer information as an event.
func (rs *RoundState) NewRoundEvent() types.EventDataNewRound {
addr := rs.Validators.GetProposer().Address
idx, _ := rs.Validators.GetByAddress(addr)

ednr := types.EventDataNewRound{
HeightRoundStep: rs.heightRoundStep(),
Proposer: types.ValidatorInfo{
Address: addr,
Index: idx,
},
}
return ednr
}

// CompleteProposalEvent returns information about a proposed block as an event.
func (rs *RoundState) CompleteProposalEvent() types.EventDataCompleteProposal {
// We must construct BlockID from ProposalBlock and ProposalBlockParts
// cs.Proposal is not guaranteed to be set when this function is called
blockId := types.BlockID{
Hash: rs.ProposalBlock.Hash(),
PartsHeader: rs.ProposalBlockParts.Header(),
}

edcp := types.EventDataCompleteProposal{
ebuchman marked this conversation as resolved.
Show resolved Hide resolved
HeightRoundStep: rs.heightRoundStep(),
BlockID: blockId,
ebuchman marked this conversation as resolved.
Show resolved Hide resolved
}
return edcp
}

// RoundStateEvent returns the H/R/S of the RoundState as an event.
func (rs *RoundState) RoundStateEvent() types.EventDataRoundState {
// copy the RoundState.
// TODO: if we want to avoid this, we may need synchronous events after all
rsCopy := *rs
edrs := types.EventDataRoundState{
HeightRoundStep: rs.heightRoundStep(),
RoundState: &rsCopy,
}
return edrs
}

func (rs *RoundState) heightRoundStep() types.HeightRoundStep {
hrs := types.HeightRoundStep{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step.String(),
RoundState: &rsCopy,
}
return edrs
return hrs
}

// String returns a string
Expand Down
4 changes: 2 additions & 2 deletions types/event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ func (b *EventBus) PublishEventTimeoutWait(data EventDataRoundState) error {
return b.Publish(EventTimeoutWait, data)
}

func (b *EventBus) PublishEventNewRound(data EventDataRoundState) error {
func (b *EventBus) PublishEventNewRound(data EventDataNewRound) error {
return b.Publish(EventNewRound, data)
}

func (b *EventBus) PublishEventCompleteProposal(data EventDataRoundState) error {
func (b *EventBus) PublishEventCompleteProposal(data EventDataCompleteProposal) error {
return b.Publish(EventCompleteProposal, data)
}

Expand Down
4 changes: 2 additions & 2 deletions types/event_bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ func TestEventBusPublish(t *testing.T) {
require.NoError(t, err)
err = eventBus.PublishEventTimeoutWait(EventDataRoundState{})
require.NoError(t, err)
err = eventBus.PublishEventNewRound(EventDataRoundState{})
err = eventBus.PublishEventNewRound(EventDataNewRound{})
require.NoError(t, err)
err = eventBus.PublishEventCompleteProposal(EventDataRoundState{})
err = eventBus.PublishEventCompleteProposal(EventDataCompleteProposal{})
require.NoError(t, err)
err = eventBus.PublishEventPolka(EventDataRoundState{})
require.NoError(t, err)
Expand Down
27 changes: 25 additions & 2 deletions types/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func RegisterEventDatas(cdc *amino.Codec) {
cdc.RegisterConcrete(EventDataNewBlockHeader{}, "tendermint/event/NewBlockHeader", nil)
cdc.RegisterConcrete(EventDataTx{}, "tendermint/event/Tx", nil)
cdc.RegisterConcrete(EventDataRoundState{}, "tendermint/event/RoundState", nil)
cdc.RegisterConcrete(EventDataNewRound{}, "tendermint/event/NewRound", nil)
cdc.RegisterConcrete(EventDataCompleteProposal{}, "tendermint/event/CompleteProposal", nil)
cdc.RegisterConcrete(EventDataVote{}, "tendermint/event/Vote", nil)
cdc.RegisterConcrete(EventDataProposalHeartbeat{}, "tendermint/event/ProposalHeartbeat", nil)
cdc.RegisterConcrete(EventDataValidatorSetUpdates{}, "tendermint/event/ValidatorSetUpdates", nil)
Expand Down Expand Up @@ -70,16 +72,37 @@ type EventDataProposalHeartbeat struct {
Heartbeat *Heartbeat
}

// NOTE: This goes into the replay WAL
type EventDataRoundState struct {
type HeightRoundStep struct {
kevlubkcm marked this conversation as resolved.
Show resolved Hide resolved
Height int64 `json:"height"`
Round int `json:"round"`
Step string `json:"step"`
}

// NOTE: This goes into the replay WAL
type EventDataRoundState struct {
HeightRoundStep

// private, not exposed to websockets
RoundState interface{} `json:"-"`
}

type ValidatorInfo struct {
Address Address `json:"address"`
Index int `json:"index"`
}

type EventDataNewRound struct {
HeightRoundStep

Proposer ValidatorInfo `json:"proposer"`
}

type EventDataCompleteProposal struct {
HeightRoundStep

BlockID BlockID `json:"block_id"`
kevlubkcm marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add the ValidatorInfo to this event as well? Seems it would make sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had it there at first, but EventDataNewRound seems like the correct place to put it. Can add it back if you like.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries. Let's leave it as is for now - can always add it later.

}

type EventDataVote struct {
Vote *Vote
}
Expand Down