From 09c23e65ad77deb7d9d227653727e035732f192b Mon Sep 17 00:00:00 2001 From: Jaz Gulati Date: Thu, 19 Mar 2020 10:49:27 +1100 Subject: [PATCH 1/6] process: precommit timeout should be restarted regardless of step --- process/process.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/process/process.go b/process/process.go index 8b75c44a..8c1e8f71 100644 --- a/process/process.go +++ b/process/process.go @@ -160,19 +160,14 @@ func (p *Process) Start() { p.broadcaster.Broadcast(resync) // Start the Process from previous state. - switch p.state.CurrentStep { - case StepNil, StepPropose: + if p.state.CurrentStep == StepNil || p.state.CurrentStep == StepPropose { p.startRound(p.state.CurrentRound) - case StepPrevote: - if numPrevotes >= 2*p.state.Prevotes.f+1 { - p.scheduleTimeoutPrevote(p.state.CurrentHeight, p.state.CurrentRound, p.timer.Timeout(StepPrevote, p.state.CurrentRound)) - } - case StepPrecommit: - if numPrecommits >= 2*p.state.Precommits.f+1 { - p.scheduleTimeoutPrecommit(p.state.CurrentHeight, p.state.CurrentRound, p.timer.Timeout(StepPrecommit, p.state.CurrentRound)) - } - default: - panic("unknown step value") + } + if numPrevotes >= 2*p.state.Prevotes.f+1 && p.state.CurrentStep == StepPrevote { + p.scheduleTimeoutPrevote(p.state.CurrentHeight, p.state.CurrentRound, p.timer.Timeout(StepPrevote, p.state.CurrentRound)) + } + if numPrecommits >= 2*p.state.Precommits.f+1 { + p.scheduleTimeoutPrecommit(p.state.CurrentHeight, p.state.CurrentRound, p.timer.Timeout(StepPrecommit, p.state.CurrentRound)) } } From 7b50fe7232608dbcf810bbecccbf8649587e5ee6 Mon Sep 17 00:00:00 2001 From: Jaz Gulati Date: Thu, 19 Mar 2020 10:50:34 +1100 Subject: [PATCH 2/6] process: fix n calculation --- process/process.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/process/process.go b/process/process.go index 8c1e8f71..bde757d1 100644 --- a/process/process.go +++ b/process/process.go @@ -295,7 +295,7 @@ func (p *Process) handlePropose(propose *Propose) { p.syncLatestCommit(propose.latestCommit) p.logger.Debugf("received propose at height=%v and round=%v", propose.height, propose.round) - n, firstTime, _, _, _ := p.state.Proposals.Insert(propose) + _, firstTime, _, _, _ := p.state.Proposals.Insert(propose) // upon Propose{currentHeight, currentRound, block, -1} if propose.Height() == p.state.CurrentHeight && propose.Round() == p.state.CurrentRound && propose.ValidRound() == block.InvalidRound { @@ -329,6 +329,7 @@ func (p *Process) handlePropose(propose *Propose) { } // upon f+1 *{currentHeight, round, *, *} and round > currentRound + n := p.numberOfMessagesAtCurrentHeight(propose.Round()) if n > p.state.Prevotes.F() && propose.Height() == p.state.CurrentHeight && propose.Round() > p.state.CurrentRound { p.startRound(propose.Round()) } @@ -346,7 +347,7 @@ func (p *Process) handlePrevote(prevote *Prevote) { prevoteDebugStr = prevote.blockHash.String() } p.logger.Debugf("received prevote=%v at height=%v and round=%v", prevoteDebugStr, prevote.height, prevote.round) - n, _, _, firstTimeExceeding2F, firstTimeExceeding2FOnBlockHash := p.state.Prevotes.Insert(prevote) + _, _, _, firstTimeExceeding2F, firstTimeExceeding2FOnBlockHash := p.state.Prevotes.Insert(prevote) if firstTimeExceeding2F && prevote.Height() == p.state.CurrentHeight && prevote.Round() == p.state.CurrentRound && p.state.CurrentStep == StepPrevote { // upon 2f+1 Prevote{currentHeight, currentRound, *} while step = StepPrevote for the first time p.scheduleTimeoutPrevote(p.state.CurrentHeight, p.state.CurrentRound, p.timer.Timeout(StepPrevote, p.state.CurrentRound)) @@ -373,6 +374,7 @@ func (p *Process) handlePrevote(prevote *Prevote) { } // upon f+1 *{currentHeight, round, *, *} and round > currentRound + n := p.numberOfMessagesAtCurrentHeight(prevote.Round()) if n > p.state.Prevotes.F() && prevote.Height() == p.state.CurrentHeight && prevote.Round() > p.state.CurrentRound { p.startRound(prevote.Round()) } @@ -390,12 +392,13 @@ func (p *Process) handlePrecommit(precommit *Precommit) { } p.logger.Debugf("received precommit=%v at height=%v and round=%v", precommitDebugStr, precommit.height, precommit.round) // upon 2f+1 Precommit{currentHeight, currentRound, *} for the first time - n, _, _, firstTimeExceeding2F, _ := p.state.Precommits.Insert(precommit) + _, _, _, firstTimeExceeding2F, _ := p.state.Precommits.Insert(precommit) if firstTimeExceeding2F && precommit.Height() == p.state.CurrentHeight && precommit.Round() == p.state.CurrentRound { p.scheduleTimeoutPrecommit(p.state.CurrentHeight, p.state.CurrentRound, p.timer.Timeout(StepPrecommit, p.state.CurrentRound)) } // upon f+1 *{currentHeight, round, *, *} and round > currentRound + n := p.numberOfMessagesAtCurrentHeight(precommit.Round()) if n > p.state.Precommits.F() && precommit.Height() == p.state.CurrentHeight && precommit.Round() > p.state.CurrentRound { p.startRound(precommit.Round()) } @@ -589,6 +592,13 @@ func (p *Process) checkProposeInCurrentHeightWithPrecommits(round block.Round) { } } +func (p *Process) numberOfMessagesAtCurrentHeight(round block.Round) int { + numUniqueProposals := p.state.Proposals.QueryByHeightRound(p.state.CurrentHeight, round) + numUniquePrevotes := p.state.Prevotes.QueryByHeightRound(p.state.CurrentHeight, round) + numUniquePrecommits := p.state.Precommits.QueryByHeightRound(p.state.CurrentHeight, round) + return numUniqueProposals + numUniquePrevotes + numUniquePrecommits +} + func (p *Process) syncLatestCommit(latestCommit LatestCommit) { // Check that the latest commit is from the future if latestCommit.Block.Header().Height() <= p.state.CurrentHeight { From 273f1865cf0d82fdc4f9d1101265f0db10ea2a1e Mon Sep 17 00:00:00 2001 From: Jaz Gulati Date: Thu, 19 Mar 2020 10:50:57 +1100 Subject: [PATCH 3/6] process: resend prevote from valid round in case of missed messages --- process/process.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/process/process.go b/process/process.go index bde757d1..dc8f1a98 100644 --- a/process/process.go +++ b/process/process.go @@ -328,6 +328,12 @@ func (p *Process) handlePropose(propose *Propose) { } } + // Resend our prevote from the valid round in case of missed messages. + if propose.ValidRound() > block.InvalidRound { + prevote := p.state.Prevotes.QueryByHeightRoundSignatory(propose.Height(), propose.ValidRound(), p.signatory) + p.broadcaster.Broadcast(prevote) + } + // upon f+1 *{currentHeight, round, *, *} and round > currentRound n := p.numberOfMessagesAtCurrentHeight(propose.Round()) if n > p.state.Prevotes.F() && propose.Height() == p.state.CurrentHeight && propose.Round() > p.state.CurrentRound { From 12cbe76f79a87e0256b5bc6e5246e0871e2d00a4 Mon Sep 17 00:00:00 2001 From: Jaz Gulati Date: Thu, 19 Mar 2020 11:48:52 +1100 Subject: [PATCH 4/6] process_test: remove deprecated test --- process/process_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/process/process_test.go b/process/process_test.go index 725a5150..c9e98fb3 100644 --- a/process/process_test.go +++ b/process/process_test.go @@ -766,15 +766,6 @@ var _ = Describe("Process", func() { Expect(ok).Should(BeTrue()) }) - It("should panic if the step is invalid", func() { - processOrigin := NewProcessOrigin(100) - processOrigin.State.CurrentStep = Step(100) - process := processOrigin.ToProcess() - Expect(func() { - process.Start() - }).Should(Panic()) - }) - Context("when the process has messages from a previous height", func() { It("should resend the most recent proposal, prevote, and precommit", func() { processOrigin := NewProcessOrigin(100) From fb14926b7650f8ef3eb8a76dc82dd0a9c2dd8412 Mon Sep 17 00:00:00 2001 From: Jaz Gulati Date: Thu, 19 Mar 2020 11:58:59 +1100 Subject: [PATCH 5/6] process: only resend our prevote if it is non-nil --- process/process.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/process/process.go b/process/process.go index dc8f1a98..8fd0225f 100644 --- a/process/process.go +++ b/process/process.go @@ -328,10 +328,13 @@ func (p *Process) handlePropose(propose *Propose) { } } - // Resend our prevote from the valid round in case of missed messages. + // Resend our prevote from the valid round if it exists in case of missed + // messages. if propose.ValidRound() > block.InvalidRound { prevote := p.state.Prevotes.QueryByHeightRoundSignatory(propose.Height(), propose.ValidRound(), p.signatory) - p.broadcaster.Broadcast(prevote) + if prevote != nil { + p.broadcaster.Broadcast(prevote) + } } // upon f+1 *{currentHeight, round, *, *} and round > currentRound From 7e85aefedaa2e8b30fe199140eafc84c84b4c804 Mon Sep 17 00:00:00 2001 From: Jaz Gulati Date: Thu, 19 Mar 2020 12:39:50 +1100 Subject: [PATCH 6/6] process_test: test resending prevotes for old rounds --- process/process_test.go | 58 +++++++++++++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/process/process_test.go b/process/process_test.go index c9e98fb3..1c9ea47a 100644 --- a/process/process_test.go +++ b/process/process_test.go @@ -66,7 +66,7 @@ var _ = Describe("Process", func() { Context("when a new process is initialized", func() { Context("when the process is the proposer", func() { - Context("when validBlock is nil", func() { + Context("when the valid block is nil", func() { It("should propose a block generated proposer and broadcast it", func() { // Init a default process to be modified processOrigin := NewProcessOrigin(100) @@ -87,7 +87,7 @@ var _ = Describe("Process", func() { }) }) - Context("when validBlock isn't nil", func() { + Context("when the valid block is not nil", func() { It("should propose the valid block we have and broadcast it", func() { // Init a default process to be modified processOrigin := NewProcessOrigin(100) @@ -107,24 +107,25 @@ var _ = Describe("Process", func() { }) Context("when the process is not proposer", func() { - Context("when receive a propose from the proposer before the timeout expire", func() { + Context("when we receive a propose from the proposer before the timeout expires", func() { Context("when the block is valid", func() { It("should broadcast a prevote to the proposal", func() { - // Init a default process to be modified + // Initialise a default process. processOrigin := NewProcessOrigin(100) - // Replace the scheduler and start the process + // Replace the scheduler and start the process. privateKey := newEcdsaKey() scheduler := NewMockScheduler(id.NewSignatory(privateKey.PublicKey)) processOrigin.Scheduler = scheduler process := processOrigin.ToProcess() - // Generate a valid proposal + // Generate a valid proposal. message := NewPropose(1, 0, RandomBlock(block.Standard), block.InvalidRound) Expect(Sign(message, *privateKey)).NotTo(HaveOccurred()) process.HandleMessage(message) - // Expect the proposer broadcast a propose message with zero height and round + // Expect the proposer broadcasts a propose message with + // zero height and round. var propose Message Eventually(processOrigin.BroadcastMessages).Should(Receive(&propose)) proposal, ok := propose.(*Prevote) @@ -136,22 +137,23 @@ var _ = Describe("Process", func() { Context("when the block is invalid", func() { It("should broadcast a nil prevote", func() { - // Init a default process to be modified + // Initialise a default process. processOrigin := NewProcessOrigin(100) - // Replace the broadcaster and start the process + // Replace the broadcaster and start the process. privateKey := newEcdsaKey() scheduler := NewMockScheduler(id.NewSignatory(privateKey.PublicKey)) processOrigin.Scheduler = scheduler processOrigin.Validator = NewMockValidator(fmt.Errorf("")) process := processOrigin.ToProcess() - // Generate a invalid proposal + // Generate an invalid proposal. message := NewPropose(1, 0, RandomBlock(block.Standard), block.InvalidRound) Expect(Sign(message, *privateKey)).NotTo(HaveOccurred()) process.HandleMessage(message) - // Expect the proposer broadcast a propose message with zero height and round + // Ensure we receive a propose message with the zero + // height and round. var propose Message Eventually(processOrigin.BroadcastMessages).Should(Receive(&propose)) proposal, ok := propose.(*Prevote) @@ -160,6 +162,40 @@ var _ = Describe("Process", func() { Expect(proposal.Round()).Should(BeZero()) }) }) + + Context("when the valid block is not nil", func() { + It("should broadcast our prevote from that round", func() { + // Initialise a default process. + processOrigin := NewProcessOrigin(100) + + // Replace the broadcaster. + privateKey := newEcdsaKey() + scheduler := NewMockScheduler(id.NewSignatory(privateKey.PublicKey)) + processOrigin.Scheduler = scheduler + processOrigin.Validator = NewMockValidator(fmt.Errorf("")) + + // Insert a prevote for the valid round as this is the + // message we will be expected to resend later. + validRound := RandomRound() + prevote := NewPrevote(1, validRound, RandomBlock(block.Standard).Hash(), nil) + Expect(Sign(prevote, *processOrigin.PrivateKey)).ShouldNot(HaveOccurred()) + processOrigin.State.Prevotes.Insert(prevote) + + // Start the process. + process := processOrigin.ToProcess() + + // Generate a valid proposal with a valid round. + propose := NewPropose(1, 0, RandomBlock(block.Standard), validRound) + Expect(Sign(propose, *privateKey)).NotTo(HaveOccurred()) + process.HandleMessage(propose) + + // Ensure we broadcast a prevote message for the valid + // round. + Eventually(processOrigin.BroadcastMessages).Should(Receive(&prevote)) + Expect(prevote.Height()).Should(Equal(block.Height(1))) + Expect(prevote.Round()).Should(Equal(validRound)) + }) + }) }) Context("when we do not receive a propose during the timeout", func() {