Skip to content

Commit

Permalink
fix: prevent data race conditions in committee (#452)
Browse files Browse the repository at this point in the history
* fix: fixing a data race condition

* test: adding test for validator consistency

* chore: removing unused codes

* fix: removing unnecessary locks

* test: fixing deadlock in consensus mock
  • Loading branch information
b00f committed May 6, 2023
1 parent b82e18f commit b91f686
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 148 deletions.
67 changes: 29 additions & 38 deletions committee/committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"container/list"
"fmt"
"sort"
"sync"

"github.com/pactus-project/pactus/crypto"
"github.com/pactus-project/pactus/types/validator"
Expand All @@ -14,21 +13,25 @@ import (
var _ Committee = &committee{}

type committee struct {
lk sync.RWMutex

committeeSize int
validatorList *list.List
proposerPos *list.Element
}

func cloneValidator(val *validator.Validator) *validator.Validator {
cloned := new(validator.Validator)
*cloned = *val
return cloned
}

func NewCommittee(validators []*validator.Validator, committeeSize int,
proposerAddress crypto.Address) (Committee, error) {
validatorList := list.New()
var proposerPos *list.Element

for _, v := range validators {
el := validatorList.PushBack(v)
if v.Address().EqualsTo(proposerAddress) {
for _, val := range validators {
el := validatorList.PushBack(cloneValidator(val))
if val.Address().EqualsTo(proposerAddress) {
proposerPos = el
}
}
Expand All @@ -45,9 +48,6 @@ func NewCommittee(validators []*validator.Validator, committeeSize int,
}

func (c *committee) TotalPower() int64 {
c.lk.RLock()
defer c.lk.RUnlock()

p := int64(0)
c.iterate(func(v *validator.Validator) (stop bool) {
p += v.Power()
Expand All @@ -57,24 +57,30 @@ func (c *committee) TotalPower() int64 {
}

func (c *committee) Update(lastRound int16, joined []*validator.Validator) {
c.lk.Lock()
defer c.lk.Unlock()

sort.SliceStable(joined, func(i, j int) bool {
return joined[i].Number() < joined[j].Number()
})

// First update validator list
// First update the validator list
for _, val := range joined {
committeeVal := c.find(val.Address())
if committeeVal == nil {
c.validatorList.InsertBefore(val, c.proposerPos)
c.validatorList.InsertBefore(cloneValidator(val), c.proposerPos)
} else {
*committeeVal = *val
committeeVal.UpdateLastJoinedHeight(val.LastJoinedHeight())

// Ensure that a validator's stake and bonding properties
// remain unchanged while they are part of the committee.
// Refer to the Bond executor for additional details.
if committeeVal.Stake() != val.Stake() ||
committeeVal.LastBondingHeight() != val.LastBondingHeight() ||
committeeVal.UnbondingHeight() != val.UnbondingHeight() {
panic("validators within the committee must be consistent")
}
}
}

// Now adjust the list
// Now, adjust the list
oldestFirst := make([]*list.Element, c.validatorList.Len())
i := 0
for e := c.validatorList.Front(); e != nil; e = e.Next() {
Expand Down Expand Up @@ -106,14 +112,13 @@ func (c *committee) Update(lastRound int16, joined []*validator.Validator) {
}
}

// Validators retrieves a list of all validators in the committee.
// A cloned instance of each validator is returned to avoid modification of the original objects.
func (c *committee) Validators() []*validator.Validator {
c.lk.Lock()
defer c.lk.Unlock()

vals := make([]*validator.Validator, c.validatorList.Len())
i := 0
c.iterate(func(v *validator.Validator) (stop bool) {
vals[i] = v
vals[i] = cloneValidator(v)
i++
return false
})
Expand All @@ -122,9 +127,6 @@ func (c *committee) Validators() []*validator.Validator {
}

func (c *committee) Contains(addr crypto.Address) bool {
c.lk.Lock()
defer c.lk.Unlock()

return c.find(addr) != nil
}

Expand All @@ -140,21 +142,16 @@ func (c *committee) find(addr crypto.Address) *validator.Validator {
return found
}

// IsProposer checks if the address is proposer for this height at this round.
// IsProposer checks if the given address is the proposer for the specified round.
func (c *committee) IsProposer(addr crypto.Address, round int16) bool {
c.lk.Lock()
defer c.lk.Unlock()

p := c.proposer(round)
return p.Address().EqualsTo(addr)
}

// Proposer returns proposer info for this height at this round.
// Proposer returns an instance of the proposer validator for the specified round.
// A cloned instance of the proposer is returned to avoid modification of the original object.
func (c *committee) Proposer(round int16) *validator.Validator {
c.lk.Lock()
defer c.lk.Unlock()

return c.proposer(round)
return cloneValidator(c.proposer(round))
}

func (c *committee) proposer(round int16) *validator.Validator {
Expand All @@ -170,9 +167,6 @@ func (c *committee) proposer(round int16) *validator.Validator {
}

func (c *committee) Committers() []int32 {
c.lk.RLock()
defer c.lk.RUnlock()

committers := make([]int32, c.validatorList.Len())
i := 0
c.iterate(func(v *validator.Validator) (stop bool) {
Expand All @@ -185,9 +179,6 @@ func (c *committee) Committers() []int32 {
}

func (c *committee) Size() int {
c.lk.RLock()
defer c.lk.RUnlock()

return c.validatorList.Len()
}

Expand Down
115 changes: 54 additions & 61 deletions committee/committee_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,6 @@ import (
"github.com/stretchr/testify/assert"
)

func copyValidator(val *validator.Validator) *validator.Validator {
clone := validator.NewValidator(val.PublicKey(), val.Number())
clone.UpdateLastBondingHeight(val.LastBondingHeight())
clone.UpdateLastJoinedHeight(val.LastJoinedHeight())
clone.UpdateUnbondingHeight(val.UnbondingHeight())

// Stake can't be changes as long as validator is inside committee
// Check Bond executor
return clone
}
func TestContains(t *testing.T) {
committee, signers := GenerateTestCommittee(21)
nonExist := crypto.GenerateTestAddress()
Expand All @@ -28,14 +18,14 @@ func TestContains(t *testing.T) {
}

func TestProposer(t *testing.T) {
committee, signers := GenerateTestCommittee(4)
committee, _ := GenerateTestCommittee(4)

assert.Equal(t, committee.Proposer(0).Address(), signers[0].Address())
assert.Equal(t, committee.Proposer(3).Address(), signers[3].Address())
assert.Equal(t, committee.Proposer(4).Address(), signers[0].Address())
assert.Equal(t, committee.Proposer(0).Number(), int32(0))
assert.Equal(t, committee.Proposer(3).Number(), int32(3))
assert.Equal(t, committee.Proposer(4).Number(), int32(0))

committee.Update(0, nil)
assert.Equal(t, committee.Proposer(0).Address(), signers[1].Address())
assert.Equal(t, committee.Proposer(0).Number(), int32(1))
}

func TestInvalidProposerJoinAndLeave(t *testing.T) {
Expand Down Expand Up @@ -91,6 +81,26 @@ func TestProposerMove(t *testing.T) {
assert.Equal(t, committee.Proposer(0).Number(), int32(1))
}

func TestValidatorConsistency(t *testing.T) {
val1, _ := validator.GenerateTestValidator(0)
val2, _ := validator.GenerateTestValidator(1)
val3, _ := validator.GenerateTestValidator(2)
val4, _ := validator.GenerateTestValidator(3)

committee, _ := NewCommittee([]*validator.Validator{val1, val2, val3, val4}, 4, val1.Address())

t.Run("Updating validators' stake, Should panic", func(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()

val1.AddToStake(1)
committee.Update(0, []*validator.Validator{val1})
})
}

func TestProposerJoin(t *testing.T) {
pub1, _ := bls.GenerateTestKeyPair()
pub2, _ := bls.GenerateTestKeyPair()
Expand Down Expand Up @@ -121,17 +131,15 @@ func TestProposerJoin(t *testing.T) {

// Height 1000
// Val1 is in the committee
val2Copy := copyValidator(val2)
val2Copy.UpdateLastJoinedHeight(1000)
committee.Update(0, []*validator.Validator{val2Copy})
val2.UpdateLastJoinedHeight(1000)
committee.Update(0, []*validator.Validator{val2})
assert.Equal(t, committee.Proposer(0).Number(), int32(2))
assert.Equal(t, committee.Committers(), []int32{1, 2, 3, 4})
assert.Equal(t, committee.Size(), 4)

// Height 1001
val5Copy := copyValidator(val5)
val5Copy.UpdateLastJoinedHeight(1001)
committee.Update(0, []*validator.Validator{val5Copy})
val5.UpdateLastJoinedHeight(1001)
committee.Update(0, []*validator.Validator{val5})
assert.Equal(t, committee.Proposer(0).Number(), int32(3))
assert.Equal(t, committee.Proposer(1).Number(), int32(4))
assert.Equal(t, committee.Committers(), []int32{1, 5, 2, 3, 4})
Expand All @@ -143,13 +151,10 @@ func TestProposerJoin(t *testing.T) {
assert.Equal(t, committee.Proposer(1).Number(), int32(5))

// Height 1003
val3Copy := copyValidator(val3)
val6Copy := copyValidator(val6)
val7Copy := copyValidator(val7)
val3Copy.UpdateLastJoinedHeight(1003)
val6Copy.UpdateLastJoinedHeight(1003)
val7Copy.UpdateLastJoinedHeight(1003)
committee.Update(1, []*validator.Validator{val7Copy, val3Copy, val6Copy})
val3.UpdateLastJoinedHeight(1003)
val6.UpdateLastJoinedHeight(1003)
val7.UpdateLastJoinedHeight(1003)
committee.Update(1, []*validator.Validator{val7, val3, val6})
assert.Equal(t, committee.Proposer(0).Number(), int32(2))
assert.Equal(t, committee.Committers(), []int32{6, 7, 1, 5, 2, 3, 4})
assert.Equal(t, committee.Size(), 7)
Expand Down Expand Up @@ -226,65 +231,53 @@ func TestProposerJoinAndLeave(t *testing.T) {
// +-+-+-+-+-+-+-+ +-+-+-+-+-+-+-+ +-+-+-+-+-+-+-+ +-+-+-+-+-+-+-+

// Height 1001
val8Copy := copyValidator(val8)
val8Copy.UpdateLastJoinedHeight(1001)
committee.Update(0, []*validator.Validator{val8Copy})
val8.UpdateLastJoinedHeight(1001)
committee.Update(0, []*validator.Validator{val8})
assert.Equal(t, committee.Proposer(0).Number(), int32(2))
assert.Equal(t, committee.Proposer(1).Number(), int32(3))
assert.Equal(t, committee.Proposer(2).Number(), int32(4))
assert.Equal(t, committee.Committers(), []int32{8, 2, 3, 4, 5, 6, 7})

// Height 1002
val3Copy := copyValidator(val3)
val3Copy.UpdateLastJoinedHeight(1002)
committee.Update(3, []*validator.Validator{val3Copy})
val3.UpdateLastJoinedHeight(1002)
committee.Update(3, []*validator.Validator{val3})
assert.Equal(t, committee.Proposer(0).Number(), int32(6))

// Height 1003
val2Copy := copyValidator(val2)
val9Copy := copyValidator(val9)
val2Copy.UpdateLastJoinedHeight(1003)
val9Copy.UpdateLastJoinedHeight(1003)
committee.Update(0, []*validator.Validator{val9Copy, val2Copy})
val2.UpdateLastJoinedHeight(1003)
val9.UpdateLastJoinedHeight(1003)
committee.Update(0, []*validator.Validator{val9, val2})
assert.Equal(t, committee.Proposer(0).Number(), int32(7))
assert.Equal(t, committee.Proposer(1).Number(), int32(8))
assert.Equal(t, committee.Committers(), []int32{8, 2, 3, 5, 9, 6, 7})

// Height 1004
valACopy := copyValidator(valA)
valACopy.UpdateLastJoinedHeight(1004)
committee.Update(1, []*validator.Validator{valACopy})
valA.UpdateLastJoinedHeight(1004)
committee.Update(1, []*validator.Validator{valA})
assert.Equal(t, committee.Proposer(0).Number(), int32(2))
assert.Equal(t, committee.Committers(), []int32{8, 2, 3, 9, 6, 10, 7})

// Height 1005
valBCopy := copyValidator(valB)
valCCopy := copyValidator(valC)
valBCopy.UpdateLastJoinedHeight(1005)
valCCopy.UpdateLastJoinedHeight(1005)
committee.Update(0, []*validator.Validator{valCCopy, valBCopy})
valB.UpdateLastJoinedHeight(1005)
valC.UpdateLastJoinedHeight(1005)
committee.Update(0, []*validator.Validator{valC, valB})
assert.Equal(t, committee.Proposer(0).Number(), int32(3))
assert.Equal(t, committee.Proposer(1).Number(), int32(9))
assert.Equal(t, committee.Proposer(2).Number(), int32(10))
assert.Equal(t, committee.Committers(), []int32{8, 11, 12, 2, 3, 9, 10})

// Height 1006
val1Copy := copyValidator(val1)
val1Copy.UpdateLastJoinedHeight(1006)
committee.Update(2, []*validator.Validator{val1Copy})
val1.UpdateLastJoinedHeight(1006)
committee.Update(2, []*validator.Validator{val1})
assert.Equal(t, committee.Proposer(0).Number(), int32(11))
assert.Equal(t, committee.Committers(), []int32{11, 12, 2, 1, 3, 9, 10})

// Height 1007
val2Copy = copyValidator(val2)
val3Copy = copyValidator(val3)
val5Copy := copyValidator(val5)
val6Copy := copyValidator(val6)
val2Copy.UpdateLastJoinedHeight(1007)
val3Copy.UpdateLastJoinedHeight(1007)
val5Copy.UpdateLastJoinedHeight(1007)
val6Copy.UpdateLastJoinedHeight(1007)
committee.Update(4, []*validator.Validator{val2Copy, val3Copy, val5Copy, val6Copy})
val2.UpdateLastJoinedHeight(1007)
val3.UpdateLastJoinedHeight(1007)
val5.UpdateLastJoinedHeight(1007)
val6.UpdateLastJoinedHeight(1007)
committee.Update(4, []*validator.Validator{val2, val3, val5, val6})
assert.Equal(t, committee.Proposer(0).Number(), int32(5))
assert.Equal(t, committee.Committers(), []int32{5, 6, 11, 12, 2, 1, 3})
}
Expand All @@ -298,8 +291,8 @@ func TestIsProposer(t *testing.T) {
committee, err := NewCommittee([]*validator.Validator{val1, val2, val3, val4}, 4, val1.Address())
assert.NoError(t, err)

assert.Equal(t, committee.Proposer(0).Address(), val1.Address())
assert.Equal(t, committee.Proposer(1).Address(), val2.Address())
assert.Equal(t, committee.Proposer(0).Number(), int32(0))
assert.Equal(t, committee.Proposer(1).Number(), int32(1))
assert.True(t, committee.IsProposer(val3.Address(), 2))
assert.False(t, committee.IsProposer(val4.Address(), 2))
assert.False(t, committee.IsProposer(crypto.GenerateTestAddress(), 2))
Expand Down
3 changes: 2 additions & 1 deletion consensus/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func (s *commitState) decide() {
}

s.logger.Info("block committed, schedule new height", "precommitQH", precommitQH)
// Now we can broadcast the committed block

// Now we can announce the committed block and certificate
s.announceNewBlock(s.height, certBlock, cert)

s.enterNewState(s.newHeightState)
Expand Down
6 changes: 4 additions & 2 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ func TestNotInCommittee(t *testing.T) {
store := store.MockingStore()

st, _ := state.LoadOrNewState(tGenDoc, []crypto.Signer{signer}, store, tTxPool, nil)
Cons := NewConsensus(testConfig(), st, signer, signer.Address(), make(chan message.Message, 100), newMediator())
Cons := NewConsensus(testConfig(), st, signer, signer.Address(), make(chan message.Message, 100),
newMediator())
cons := Cons.(*consensus)

testEnterNewHeight(cons)
Expand Down Expand Up @@ -515,7 +516,8 @@ func TestNonActiveValidator(t *testing.T) {
setup(t)

signer := bls.GenerateTestSigner()
Cons := NewConsensus(testConfig(), state.MockingState(), signer, signer.Address(), make(chan message.Message, 100), newMediator())
Cons := NewConsensus(testConfig(), state.MockingState(), signer, signer.Address(), make(chan message.Message, 100),
newMediator())
nonActiveCons := Cons.(*consensus)

t.Run("non-active instances should be in new-height state", func(t *testing.T) {
Expand Down

0 comments on commit b91f686

Please sign in to comment.