Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consensus): improve consensus alghorithm #1329

Merged
merged 7 commits into from
Jun 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ linters-settings:
- name: "get-return"
disabled: true

- name: "confusing-naming"
disabled: true

- name: "function-result-limit"
disabled: true

Expand Down
6 changes: 3 additions & 3 deletions committee/committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,18 @@ func (c *committee) find(addr crypto.Address) *validator.Validator {

// IsProposer checks if the given address is the proposer for the specified round.
func (c *committee) IsProposer(addr crypto.Address, round int16) bool {
p := c.getProposer(round)
p := c.proposer(round)

return p.Address() == addr
}

// Proposer returns an instance of the proposer validator for the specified round.
// A cloned instance of the proposer is returned to avoid modification of the original object.
func (c *committee) Proposer(round int16) *validator.Validator {
return c.getProposer(round).Clone()
return c.proposer(round).Clone()
}

func (c *committee) getProposer(round int16) *validator.Validator {
func (c *committee) proposer(round int16) *validator.Validator {
pos := c.proposerPos
for i := 0; i < int(round); i++ {
pos = pos.Next
Expand Down
11 changes: 6 additions & 5 deletions consensus/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import "time"
type Config struct {
ChangeProposerTimeout time.Duration `toml:"-"`
ChangeProposerDelta time.Duration `toml:"-"`
QueryVoteTimeout time.Duration `toml:"-"`
MinimumAvailabilityScore float64 `toml:"-"`
}

func DefaultConfig() *Config {
return &Config{
ChangeProposerTimeout: 8 * time.Second,
ChangeProposerDelta: 4 * time.Second,
ChangeProposerTimeout: 5 * time.Second,
ChangeProposerDelta: 5 * time.Second,
QueryVoteTimeout: 5 * time.Second,
MinimumAvailabilityScore: 0.9,
}
}
Expand All @@ -38,7 +40,6 @@ func (conf *Config) BasicCheck() error {
}

func (conf *Config) CalculateChangeProposerTimeout(round int16) time.Duration {
return time.Duration(
conf.ChangeProposerTimeout.Milliseconds()+conf.ChangeProposerDelta.Milliseconds()*int64(round),
) * time.Millisecond
return conf.ChangeProposerTimeout +
conf.ChangeProposerDelta*time.Duration(round)
}
90 changes: 59 additions & 31 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,7 @@
cs.lk.Lock()
defer cs.lk.Unlock()

cs.doMoveToNewHeight()
if cs.active {
cs.queryProposal()
cs.queryVotes()
}
cs.moveToNewHeight()
}

func (cs *consensus) String() string {
Expand Down Expand Up @@ -184,10 +180,10 @@
cs.lk.Lock()
defer cs.lk.Unlock()

cs.doMoveToNewHeight()
cs.moveToNewHeight()
}

func (cs *consensus) doMoveToNewHeight() {
func (cs *consensus) moveToNewHeight() {
stateHeight := cs.bcState.LastBlockHeight()
if cs.height != stateHeight+1 {
cs.enterNewState(cs.newHeightState)
Expand All @@ -205,6 +201,23 @@
}()
}

func (cs *consensus) handleTimeout(t *ticker) {
cs.lk.Lock()
defer cs.lk.Unlock()

cs.logger.Trace("handle ticker", "ticker", t)

// Old tickers might be triggered now. Ignore them.
if cs.height != t.Height || cs.round != t.Round {
cs.logger.Trace("stale ticker", "ticker", t)

Check warning on line 212 in consensus/consensus.go

View check run for this annotation

Codecov / codecov/patch

consensus/consensus.go#L212

Added line #L212 was not covered by tests

return

Check warning on line 214 in consensus/consensus.go

View check run for this annotation

Codecov / codecov/patch

consensus/consensus.go#L214

Added line #L214 was not covered by tests
}

cs.logger.Debug("timer expired", "ticker", t)
cs.currentState.onTimeout(t)
}

func (cs *consensus) SetProposal(p *proposal.Proposal) {
cs.lk.Lock()
defer cs.lk.Unlock()
Expand All @@ -222,7 +235,13 @@
}

if p.Round() < cs.round {
cs.logger.Trace("expired round", "proposal", p)
cs.logger.Trace("proposal for expired round", "proposal", p)

return
}

if err := p.BasicCheck(); err != nil {
cs.logger.Warn("invalid proposal", "proposal", p, "error", err)

return
}
Expand Down Expand Up @@ -266,23 +285,6 @@
cs.currentState.onSetProposal(p)
}

func (cs *consensus) handleTimeout(t *ticker) {
cs.lk.Lock()
defer cs.lk.Unlock()

cs.logger.Trace("handle ticker", "ticker", t)

// Old tickers might be triggered now. Ignore them.
if cs.height != t.Height || cs.round != t.Round {
cs.logger.Trace("stale ticker", "ticker", t)

return
}

cs.logger.Debug("timer expired", "ticker", t)
cs.currentState.onTimeout(t)
}

func (cs *consensus) AddVote(v *vote.Vote) {
cs.lk.Lock()
defer cs.lk.Unlock()
Expand All @@ -299,6 +301,12 @@
return
}

if v.Round() < cs.round {
cs.logger.Trace("vote for expired round", "vote", v)

return
}

if v.Type() == vote.VoteTypeCPPreVote ||
v.Type() == vote.VoteTypeCPMainVote ||
v.Type() == vote.VoteTypeCPDecided {
Expand All @@ -318,13 +326,26 @@
cs.logger.Info("new vote added", "vote", v)

cs.currentState.onAddVote(v)

if v.Type() == vote.VoteTypeCPDecided {
if v.Round() > cs.round {
cs.changeProposer.cpDecide(v.Round(), v.CPValue())
}
}
}
}

func (cs *consensus) proposer(round int16) *validator.Validator {
return cs.bcState.Proposer(round)
}

func (cs *consensus) IsProposer() bool {
cs.lk.RLock()
defer cs.lk.RUnlock()

Check warning on line 344 in consensus/consensus.go

View check run for this annotation

Codecov / codecov/patch

consensus/consensus.go#L343-L344

Added lines #L343 - L344 were not covered by tests

return cs.isProposer()

Check warning on line 346 in consensus/consensus.go

View check run for this annotation

Codecov / codecov/patch

consensus/consensus.go#L346

Added line #L346 was not covered by tests
}

func (cs *consensus) isProposer() bool {
return cs.proposer(cs.round).Address() == cs.valKey.Address()
}
Expand Down Expand Up @@ -377,7 +398,7 @@

func (cs *consensus) queryProposal() {
cs.broadcaster(cs.valKey.Address(),
message.NewQueryProposalMessage(cs.height, cs.valKey.Address()))
message.NewQueryProposalMessage(cs.height, cs.round, cs.valKey.Address()))
}

// queryVotes is an anti-entropy mechanism to retrieve missed votes
Expand Down Expand Up @@ -462,14 +483,21 @@
defer cs.lk.RUnlock()

votes := []*vote.Vote{}
if round == cs.round {
switch {
case round < cs.round:
// Past round: Only broadcast cp:decided votes
vs := cs.log.CPDecidedVoteSet(round)
votes = append(votes, vs.AllVotes()...)

case round == cs.round:
// Current round
m := cs.log.RoundMessages(round)
votes = append(votes, m.AllVotes()...)
} else {
// Only broadcast cp:decided votes
vs := cs.log.CPDecidedVoteVoteSet(round)
votes = append(votes, vs.AllVotes()...)

case round > cs.round:
// Future round
}

if len(votes) == 0 {
return nil
}
Expand Down
32 changes: 23 additions & 9 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ func testConfig() *Config {
return &Config{
ChangeProposerTimeout: 1 * time.Hour, // Disabling timers
ChangeProposerDelta: 1 * time.Hour, // Disabling timers
QueryVoteTimeout: 1 * time.Hour, // Disabling timers
}
}

func setup(t *testing.T) *testData {
t.Helper()
queryVoteInitialTimeout = 2 * time.Hour

return setupWithSeed(t, testsuite.GenerateSeed())
}
Expand All @@ -89,8 +89,10 @@ func setupWithSeed(t *testing.T, seed int64) *testData {
params := param.DefaultParams()
params.CommitteeSize = 4

// to prevent triggering timers before starting the tests to avoid double entries for new heights in some tests.
getTime := util.RoundNow(params.BlockIntervalInSecond).Add(time.Duration(params.BlockIntervalInSecond) * time.Second)
// To prevent triggering timers before starting the tests and
// avoid double entries for new heights in some tests.
getTime := util.RoundNow(params.BlockIntervalInSecond).
Add(time.Duration(params.BlockIntervalInSecond) * time.Second)
genDoc := genesis.MakeGenesis(getTime, accs, vals, params)
stX, err := state.LoadOrNewState(genDoc, []*bls.ValidatorKey{valKeys[tIndexX]},
store.MockingStore(ts), txPool, nil)
Expand Down Expand Up @@ -401,8 +403,7 @@ func TestStart(t *testing.T) {
td := setup(t)

td.consX.Start()
td.shouldPublishQueryProposal(t, td.consX, 1)
td.shouldPublishQueryVote(t, td.consX, 1, 0)
td.checkHeightRound(t, td.consX, 1, 0)
}

func TestNotInCommittee(t *testing.T) {
Expand Down Expand Up @@ -474,14 +475,14 @@ func TestConsensusAddVote(t *testing.T) {
v6, _ := td.GenerateTestPrepareVote(1, 0)
td.consP.AddVote(v6)

assert.True(t, td.consP.HasVote(v1.Hash())) // previous round
assert.True(t, td.consP.HasVote(v2.Hash())) // next round
assert.False(t, td.consP.HasVote(v1.Hash())) // previous round
assert.True(t, td.consP.HasVote(v2.Hash())) // next round
assert.True(t, td.consP.HasVote(v3.Hash()))
assert.True(t, td.consP.HasVote(v4.Hash()))
assert.False(t, td.consP.HasVote(v5.Hash())) // valid votes for the next height
assert.False(t, td.consP.HasVote(v6.Hash())) // invalid votes

assert.Equal(t, td.consP.AllVotes(), []*vote.Vote{v1, v3, v4})
assert.Equal(t, td.consP.AllVotes(), []*vote.Vote{v3, v4})
assert.NotContains(t, td.consP.AllVotes(), v2)
}

Expand Down Expand Up @@ -604,6 +605,9 @@ func TestPickRandomVote(t *testing.T) {

rndVote1 := td.consP.PickRandomVote(1)
assert.Equal(t, rndVote1.Type(), vote.VoteTypePrepare)

rndVote2 := td.consP.PickRandomVote(2)
assert.Nil(t, rndVote2)
}

func TestSetProposalFromPreviousRound(t *testing.T) {
Expand Down Expand Up @@ -724,7 +728,17 @@ func TestProposalWithBigRound(t *testing.T) {

p := td.makeProposal(t, 1, util.MaxInt16)
td.consP.SetProposal(p)
assert.Equal(t, td.consP.log.RoundProposal(util.MaxInt16), p)
assert.Nil(t, td.consP.Proposal())
}

func TestInvalidProposal(t *testing.T) {
td := setup(t)

td.enterNewHeight(td.consP)

p := td.makeProposal(t, 1, 0)
p.SetSignature(nil) // Make proposal invalid
td.consP.SetProposal(p)
assert.Nil(t, td.consP.Proposal())
}

Expand Down
17 changes: 7 additions & 10 deletions consensus/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,25 +310,22 @@ func (cp *changeProposer) checkJust(v *vote.Vote) error {
}
}

func (cp *changeProposer) strongTermination() {
cpDecided := cp.log.CPDecidedVoteVoteSet(cp.round)
func (cp *changeProposer) cpStrongTermination() {
cpDecided := cp.log.CPDecidedVoteSet(cp.round)
if cpDecided.HasAnyVoteFor(cp.cpRound, vote.CPValueNo) {
cp.cpDecide(vote.CPValueNo)
cp.cpDecide(cp.round, vote.CPValueNo)
} else if cpDecided.HasAnyVoteFor(cp.cpRound, vote.CPValueYes) {
cp.cpDecide(vote.CPValueYes)
cp.cpDecide(cp.round, vote.CPValueYes)
}
}

func (cp *changeProposer) cpDecide(cpValue vote.CPValue) {
func (cp *changeProposer) cpDecide(round int16, cpValue vote.CPValue) {
if cpValue == vote.CPValueYes {
cp.round++
cp.round = round + 1
cp.cpDecided = 1
cp.enterNewState(cp.proposeState)
} else if cpValue == vote.CPValueNo {
roundProposal := cp.log.RoundProposal(cp.round)
if roundProposal == nil {
cp.queryProposal()
}
cp.round = round
cp.cpDecided = 0
cp.enterNewState(cp.prepareState)
}
Expand Down
6 changes: 3 additions & 3 deletions consensus/cp_decide.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (s *cpDecideState) decide() {
QCert: cert,
}
s.signAddCPDecidedVote(hash.UndefHash, s.cpRound, vote.CPValueYes, just)
s.cpDecide(vote.CPValueYes)
s.cpDecide(s.round, vote.CPValueYes)
} else if cpMainVotes.HasQuorumVotesFor(s.cpRound, vote.CPValueNo) {
// decided for no and proceeds to the next round
s.logger.Info("binary agreement decided", "value", 0, "round", s.cpRound)
Expand All @@ -37,7 +37,7 @@ func (s *cpDecideState) decide() {
QCert: cert,
}
s.signAddCPDecidedVote(*s.cpWeakValidity, s.cpRound, vote.CPValueNo, just)
s.cpDecide(vote.CPValueNo)
s.cpDecide(s.round, vote.CPValueNo)
} else {
// conflicting votes
s.logger.Debug("conflicting main votes", "round", s.cpRound)
Expand All @@ -52,7 +52,7 @@ func (s *cpDecideState) onAddVote(v *vote.Vote) {
s.decide()
}

s.strongTermination()
s.cpStrongTermination()
}

func (*cpDecideState) name() string {
Expand Down
2 changes: 1 addition & 1 deletion consensus/cp_mainvote.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *cpMainVoteState) onAddVote(v *vote.Vote) {
s.decide()
}

s.strongTermination()
s.cpStrongTermination()
}

func (*cpMainVoteState) name() string {
Expand Down
Loading
Loading