diff --git a/process/process.go b/process/process.go index 8b75c44a..8fd0225f 100644 --- a/process/process.go +++ b/process/process.go @@ -160,19 +160,14 @@ func (p *Process) Start() { p.broadcaster.Broadcast(resync) // Start the Process from previous state. - switch p.state.CurrentStep { - case StepNil, StepPropose: + if p.state.CurrentStep == StepNil || p.state.CurrentStep == StepPropose { p.startRound(p.state.CurrentRound) - case StepPrevote: - if numPrevotes >= 2*p.state.Prevotes.f+1 { - p.scheduleTimeoutPrevote(p.state.CurrentHeight, p.state.CurrentRound, p.timer.Timeout(StepPrevote, p.state.CurrentRound)) - } - case StepPrecommit: - if numPrecommits >= 2*p.state.Precommits.f+1 { - p.scheduleTimeoutPrecommit(p.state.CurrentHeight, p.state.CurrentRound, p.timer.Timeout(StepPrecommit, p.state.CurrentRound)) - } - default: - panic("unknown step value") + } + if numPrevotes >= 2*p.state.Prevotes.f+1 && p.state.CurrentStep == StepPrevote { + p.scheduleTimeoutPrevote(p.state.CurrentHeight, p.state.CurrentRound, p.timer.Timeout(StepPrevote, p.state.CurrentRound)) + } + if numPrecommits >= 2*p.state.Precommits.f+1 { + p.scheduleTimeoutPrecommit(p.state.CurrentHeight, p.state.CurrentRound, p.timer.Timeout(StepPrecommit, p.state.CurrentRound)) } } @@ -300,7 +295,7 @@ func (p *Process) handlePropose(propose *Propose) { p.syncLatestCommit(propose.latestCommit) p.logger.Debugf("received propose at height=%v and round=%v", propose.height, propose.round) - n, firstTime, _, _, _ := p.state.Proposals.Insert(propose) + _, firstTime, _, _, _ := p.state.Proposals.Insert(propose) // upon Propose{currentHeight, currentRound, block, -1} if propose.Height() == p.state.CurrentHeight && propose.Round() == p.state.CurrentRound && propose.ValidRound() == block.InvalidRound { @@ -333,7 +328,17 @@ func (p *Process) handlePropose(propose *Propose) { } } + // Resend our prevote from the valid round if it exists in case of missed + // messages. + if propose.ValidRound() > block.InvalidRound { + prevote := p.state.Prevotes.QueryByHeightRoundSignatory(propose.Height(), propose.ValidRound(), p.signatory) + if prevote != nil { + p.broadcaster.Broadcast(prevote) + } + } + // upon f+1 *{currentHeight, round, *, *} and round > currentRound + n := p.numberOfMessagesAtCurrentHeight(propose.Round()) if n > p.state.Prevotes.F() && propose.Height() == p.state.CurrentHeight && propose.Round() > p.state.CurrentRound { p.startRound(propose.Round()) } @@ -351,7 +356,7 @@ func (p *Process) handlePrevote(prevote *Prevote) { prevoteDebugStr = prevote.blockHash.String() } p.logger.Debugf("received prevote=%v at height=%v and round=%v", prevoteDebugStr, prevote.height, prevote.round) - n, _, _, firstTimeExceeding2F, firstTimeExceeding2FOnBlockHash := p.state.Prevotes.Insert(prevote) + _, _, _, firstTimeExceeding2F, firstTimeExceeding2FOnBlockHash := p.state.Prevotes.Insert(prevote) if firstTimeExceeding2F && prevote.Height() == p.state.CurrentHeight && prevote.Round() == p.state.CurrentRound && p.state.CurrentStep == StepPrevote { // upon 2f+1 Prevote{currentHeight, currentRound, *} while step = StepPrevote for the first time p.scheduleTimeoutPrevote(p.state.CurrentHeight, p.state.CurrentRound, p.timer.Timeout(StepPrevote, p.state.CurrentRound)) @@ -378,6 +383,7 @@ func (p *Process) handlePrevote(prevote *Prevote) { } // upon f+1 *{currentHeight, round, *, *} and round > currentRound + n := p.numberOfMessagesAtCurrentHeight(prevote.Round()) if n > p.state.Prevotes.F() && prevote.Height() == p.state.CurrentHeight && prevote.Round() > p.state.CurrentRound { p.startRound(prevote.Round()) } @@ -395,12 +401,13 @@ func (p *Process) handlePrecommit(precommit *Precommit) { } p.logger.Debugf("received precommit=%v at height=%v and round=%v", precommitDebugStr, precommit.height, precommit.round) // upon 2f+1 Precommit{currentHeight, currentRound, *} for the first time - n, _, _, firstTimeExceeding2F, _ := p.state.Precommits.Insert(precommit) + _, _, _, firstTimeExceeding2F, _ := p.state.Precommits.Insert(precommit) if firstTimeExceeding2F && precommit.Height() == p.state.CurrentHeight && precommit.Round() == p.state.CurrentRound { p.scheduleTimeoutPrecommit(p.state.CurrentHeight, p.state.CurrentRound, p.timer.Timeout(StepPrecommit, p.state.CurrentRound)) } // upon f+1 *{currentHeight, round, *, *} and round > currentRound + n := p.numberOfMessagesAtCurrentHeight(precommit.Round()) if n > p.state.Precommits.F() && precommit.Height() == p.state.CurrentHeight && precommit.Round() > p.state.CurrentRound { p.startRound(precommit.Round()) } @@ -594,6 +601,13 @@ func (p *Process) checkProposeInCurrentHeightWithPrecommits(round block.Round) { } } +func (p *Process) numberOfMessagesAtCurrentHeight(round block.Round) int { + numUniqueProposals := p.state.Proposals.QueryByHeightRound(p.state.CurrentHeight, round) + numUniquePrevotes := p.state.Prevotes.QueryByHeightRound(p.state.CurrentHeight, round) + numUniquePrecommits := p.state.Precommits.QueryByHeightRound(p.state.CurrentHeight, round) + return numUniqueProposals + numUniquePrevotes + numUniquePrecommits +} + func (p *Process) syncLatestCommit(latestCommit LatestCommit) { // Check that the latest commit is from the future if latestCommit.Block.Header().Height() <= p.state.CurrentHeight { diff --git a/process/process_test.go b/process/process_test.go index 725a5150..1c9ea47a 100644 --- a/process/process_test.go +++ b/process/process_test.go @@ -66,7 +66,7 @@ var _ = Describe("Process", func() { Context("when a new process is initialized", func() { Context("when the process is the proposer", func() { - Context("when validBlock is nil", func() { + Context("when the valid block is nil", func() { It("should propose a block generated proposer and broadcast it", func() { // Init a default process to be modified processOrigin := NewProcessOrigin(100) @@ -87,7 +87,7 @@ var _ = Describe("Process", func() { }) }) - Context("when validBlock isn't nil", func() { + Context("when the valid block is not nil", func() { It("should propose the valid block we have and broadcast it", func() { // Init a default process to be modified processOrigin := NewProcessOrigin(100) @@ -107,24 +107,25 @@ var _ = Describe("Process", func() { }) Context("when the process is not proposer", func() { - Context("when receive a propose from the proposer before the timeout expire", func() { + Context("when we receive a propose from the proposer before the timeout expires", func() { Context("when the block is valid", func() { It("should broadcast a prevote to the proposal", func() { - // Init a default process to be modified + // Initialise a default process. processOrigin := NewProcessOrigin(100) - // Replace the scheduler and start the process + // Replace the scheduler and start the process. privateKey := newEcdsaKey() scheduler := NewMockScheduler(id.NewSignatory(privateKey.PublicKey)) processOrigin.Scheduler = scheduler process := processOrigin.ToProcess() - // Generate a valid proposal + // Generate a valid proposal. message := NewPropose(1, 0, RandomBlock(block.Standard), block.InvalidRound) Expect(Sign(message, *privateKey)).NotTo(HaveOccurred()) process.HandleMessage(message) - // Expect the proposer broadcast a propose message with zero height and round + // Expect the proposer broadcasts a propose message with + // zero height and round. var propose Message Eventually(processOrigin.BroadcastMessages).Should(Receive(&propose)) proposal, ok := propose.(*Prevote) @@ -136,22 +137,23 @@ var _ = Describe("Process", func() { Context("when the block is invalid", func() { It("should broadcast a nil prevote", func() { - // Init a default process to be modified + // Initialise a default process. processOrigin := NewProcessOrigin(100) - // Replace the broadcaster and start the process + // Replace the broadcaster and start the process. privateKey := newEcdsaKey() scheduler := NewMockScheduler(id.NewSignatory(privateKey.PublicKey)) processOrigin.Scheduler = scheduler processOrigin.Validator = NewMockValidator(fmt.Errorf("")) process := processOrigin.ToProcess() - // Generate a invalid proposal + // Generate an invalid proposal. message := NewPropose(1, 0, RandomBlock(block.Standard), block.InvalidRound) Expect(Sign(message, *privateKey)).NotTo(HaveOccurred()) process.HandleMessage(message) - // Expect the proposer broadcast a propose message with zero height and round + // Ensure we receive a propose message with the zero + // height and round. var propose Message Eventually(processOrigin.BroadcastMessages).Should(Receive(&propose)) proposal, ok := propose.(*Prevote) @@ -160,6 +162,40 @@ var _ = Describe("Process", func() { Expect(proposal.Round()).Should(BeZero()) }) }) + + Context("when the valid block is not nil", func() { + It("should broadcast our prevote from that round", func() { + // Initialise a default process. + processOrigin := NewProcessOrigin(100) + + // Replace the broadcaster. + privateKey := newEcdsaKey() + scheduler := NewMockScheduler(id.NewSignatory(privateKey.PublicKey)) + processOrigin.Scheduler = scheduler + processOrigin.Validator = NewMockValidator(fmt.Errorf("")) + + // Insert a prevote for the valid round as this is the + // message we will be expected to resend later. + validRound := RandomRound() + prevote := NewPrevote(1, validRound, RandomBlock(block.Standard).Hash(), nil) + Expect(Sign(prevote, *processOrigin.PrivateKey)).ShouldNot(HaveOccurred()) + processOrigin.State.Prevotes.Insert(prevote) + + // Start the process. + process := processOrigin.ToProcess() + + // Generate a valid proposal with a valid round. + propose := NewPropose(1, 0, RandomBlock(block.Standard), validRound) + Expect(Sign(propose, *privateKey)).NotTo(HaveOccurred()) + process.HandleMessage(propose) + + // Ensure we broadcast a prevote message for the valid + // round. + Eventually(processOrigin.BroadcastMessages).Should(Receive(&prevote)) + Expect(prevote.Height()).Should(Equal(block.Height(1))) + Expect(prevote.Round()).Should(Equal(validRound)) + }) + }) }) Context("when we do not receive a propose during the timeout", func() { @@ -766,15 +802,6 @@ var _ = Describe("Process", func() { Expect(ok).Should(BeTrue()) }) - It("should panic if the step is invalid", func() { - processOrigin := NewProcessOrigin(100) - processOrigin.State.CurrentStep = Step(100) - process := processOrigin.ToProcess() - Expect(func() { - process.Start() - }).Should(Panic()) - }) - Context("when the process has messages from a previous height", func() { It("should resend the most recent proposal, prevote, and precommit", func() { processOrigin := NewProcessOrigin(100)