Skip to content

Commit

Permalink
modify the committer to return the new signatories set
Browse files Browse the repository at this point in the history
  • Loading branch information
tok-kkk committed Sep 9, 2021
1 parent 72ccbf5 commit 0c4e34f
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 58 deletions.
34 changes: 20 additions & 14 deletions mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,28 @@ func New(opts Options) MessageQueue {
// have heights up to (and including) the given height. The appropriate callback
// will be called for every message that is consumed. All consumed messages will
// be dropped from the MessageQueue.
func (mq *MessageQueue) Consume(h process.Height, propose func(process.Propose), prevote func(process.Prevote), precommit func(process.Precommit)) (n int) {
func (mq *MessageQueue) Consume(h process.Height, propose func(process.Propose), prevote func(process.Prevote), precommit func(process.Precommit), procsAllowed map[id.Signatory]bool) (n int) {
for from, q := range mq.queuesByPid {
for len(q) > 0 {
if q[0] == nil || height(q[0]) > h {
break
}
switch msg := q[0].(type) {
case process.Propose:
propose(msg)
case process.Prevote:
prevote(msg)
case process.Precommit:
precommit(msg)
}
n++
q = q[1:]
func() {
defer func() {
n++
q = q[1:]
}()

if ok := procsAllowed[from]; !ok {
return
}

switch msg := q[0].(type) {
case process.Propose:
propose(msg)
case process.Prevote:
prevote(msg)
case process.Precommit:
precommit(msg)
}
}()
}
mq.queuesByPid[from] = q
}
Expand Down
37 changes: 26 additions & 11 deletions mq/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ var _ = Describe("MQ", func() {
proposeCallback,
prevoteCallback,
precommitCallback,
map[id.Signatory]bool{},
)

Expect(n).To(Equal(0))
Expand All @@ -124,6 +125,8 @@ var _ = Describe("MQ", func() {
sender := id.NewPrivKey().Signatory()
lowerHeight := process.Height(r.Int63())
higherHeight := lowerHeight + 1 + process.Height(r.Intn(100))
procsAllowed := map[id.Signatory]bool{}
procsAllowed[sender] = true

// send msg1
msg1 := randomMsg(r, sender, lowerHeight, processutil.RandomRound(r))
Expand Down Expand Up @@ -199,12 +202,12 @@ var _ = Describe("MQ", func() {

// cannot consume msgs of height less than lowerHeight
evenLowerHeight := lowerHeight - 1 - process.Height(r.Intn(100))
n := queue.Consume(evenLowerHeight, proposeCallback, prevoteCallback, precommitCallback)
n := queue.Consume(evenLowerHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
Expect(n).To(Equal(0))
Expect(i).To(Equal(0))

// consume all messages
n = queue.Consume(higherHeight, proposeCallback, prevoteCallback, precommitCallback)
n = queue.Consume(higherHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
Expect(n).To(Equal(2))
Expect(i).To(Equal(2))

Expand All @@ -222,6 +225,9 @@ var _ = Describe("MQ", func() {
loop := func() bool {
sender := id.NewPrivKey().Signatory()
height := process.Height(r.Int63())
procsAllowed := map[id.Signatory]bool{}
procsAllowed[sender] = true

// at the most 20 rounds
rounds := make([]process.Round, 1+r.Intn(20))
for t := 0; t < cap(rounds); t++ {
Expand Down Expand Up @@ -280,12 +286,12 @@ var _ = Describe("MQ", func() {

// cannot consume msgs of height less than lowerHeight
lowerHeight := height - 1 - process.Height(r.Intn(100))
n := queue.Consume(lowerHeight, proposeCallback, prevoteCallback, precommitCallback)
n := queue.Consume(lowerHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
Expect(n).To(Equal(0))
Expect(t).To(Equal(0))

// consume all messages
n = queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback)
n = queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
Expect(n).To(Equal(cap(rounds)))
Expect(t).To(Equal(cap(rounds)))

Expand All @@ -303,6 +309,8 @@ var _ = Describe("MQ", func() {
loop := func() bool {
sender := id.NewPrivKey().Signatory()
minHeight, maxHeight, msgsCount := insertRandomMessages(&queue, sender)
procsAllowed := map[id.Signatory]bool{}
procsAllowed[sender] = true

// we should first consume msg1 and then msg2
prevHeight := process.Height(-1)
Expand Down Expand Up @@ -369,12 +377,12 @@ var _ = Describe("MQ", func() {
}

// cannot consume msgs of height less than the min height
n := queue.Consume(minHeight-1, proposeCallback, prevoteCallback, precommitCallback)
n := queue.Consume(minHeight-1, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
Expect(n).To(Equal(0))
Expect(i).To(Equal(0))

// consume all messages
n = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback)
n = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
Expect(n).To(Equal(msgsCount))
Expect(i).To(Equal(msgsCount))

Expand All @@ -392,6 +400,8 @@ var _ = Describe("MQ", func() {

loop := func() bool {
sender := id.NewPrivKey().Signatory()
procsAllowed := map[id.Signatory]bool{}
procsAllowed[sender] = true
_, maxHeight, _ := insertRandomMessages(&queue, sender)
thresholdHeight := process.Height(r.Intn(int(maxHeight)))
queue.DropMessagesBelowHeight(thresholdHeight)
Expand All @@ -406,7 +416,7 @@ var _ = Describe("MQ", func() {
Expect(precommit.Height >= thresholdHeight).To(BeTrue())
}

_ = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback)
_ = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
return true
}
Expect(quick.Check(loop, nil)).To(Succeed())
Expand All @@ -418,13 +428,15 @@ var _ = Describe("MQ", func() {
loop := func() bool {
opts := mq.DefaultOptions().WithMaxCapacity(1)
queue := mq.New(opts)
procsAllowed := map[id.Signatory]bool{}

// insert a msg
originalSender := id.NewPrivKey().Signatory()
originalMsg := processutil.RandomPropose(r)
originalMsg.From = originalSender
originalMsg.Height = process.Height(1)
originalMsg.Round = process.Round(1)
procsAllowed[originalMsg.From] = true
queue.InsertPropose(originalMsg)

// any message in height > 1 or (height = 1 || round > 1) will be dropped
Expand All @@ -434,11 +446,12 @@ var _ = Describe("MQ", func() {
msg.From = id.NewPrivKey().Signatory()
msg.Height = process.Height(1)
msg.Round = process.Round(2)
procsAllowed[msg.From] = true
queue.InsertPropose(msg)

// so consuming will only return the first msg
proposeCallback := func(propose process.Propose) {}
n := queue.Consume(process.Height(1), proposeCallback, nil, nil)
n := queue.Consume(process.Height(1), proposeCallback, nil, nil, procsAllowed)
Expect(n).To(Equal(2))

// re-insert the original msg
Expand All @@ -458,7 +471,7 @@ var _ = Describe("MQ", func() {
Expect(propose.Round).To(Equal(originalMsg.Round))
Expect(propose.From).To(Equal(originalSender))
}
n = queue.Consume(process.Height(1), proposeCallback, nil, nil)
n = queue.Consume(process.Height(1), proposeCallback, nil, nil, procsAllowed)
Expect(n).To(Equal(1))

// re-insert the original msg
Expand All @@ -477,7 +490,7 @@ var _ = Describe("MQ", func() {
Expect(propose.Round).To(Equal(msg.Round))
Expect(propose.From).To(Equal(originalSender))
}
n = queue.Consume(process.Height(1), proposeCallback, nil, nil)
n = queue.Consume(process.Height(1), proposeCallback, nil, nil, procsAllowed)
Expect(n).To(Equal(1))

return true
Expand All @@ -497,6 +510,8 @@ var _ = Describe("MQ", func() {
// msgsCount > c
sender := id.NewPrivKey().Signatory()
height := process.Height(1)
procsAllowed := map[id.Signatory]bool{}
procsAllowed[sender] = true
msgsCount := c + 5 + r.Intn(20)
rounds := make([]process.Round, msgsCount)
msgs := make([]interface{}, msgsCount)
Expand Down Expand Up @@ -553,7 +568,7 @@ var _ = Describe("MQ", func() {
i++
}

n := queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback)
n := queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback, procsAllowed)
Expect(n).To(Equal(c))
Expect(i).To(Equal(c))

Expand Down
16 changes: 14 additions & 2 deletions process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type Validator interface {
// new Value implies that all correct Processes agree on this Value at this
// Height, and will never revert.
type Committer interface {
Commit(Height, Value)
Commit(Height, Value) (uint64, Scheduler)
}

// A Catcher is used to catch bad behaviour in other Processes. For example,
Expand Down Expand Up @@ -278,6 +278,12 @@ func (p *Process) Start() {
p.StartRound(0)
}

func (p *Process) StartWithNewSignatories(f uint64, scheduler Scheduler) {
p.f = f
p.scheduler = scheduler
p.StartRound(0)
}

// StartRound will progress the Process to a new Round. It does not assume that
// the Height has changed. Since this changes the current Round and the current
// Step, most of the condition methods will be retried at the end (by way of
Expand Down Expand Up @@ -694,7 +700,13 @@ func (p *Process) tryCommitUponSufficientPrecommits(round Round) {
}
}
if precommitsForValue >= int(2*p.f+1) {
p.committer.Commit(p.CurrentHeight, propose.Value)
f, scheduler := p.committer.Commit(p.CurrentHeight, propose.Value)
if f != 0 {
p.f = f
}
if scheduler != nil {
p.scheduler = scheduler
}
p.CurrentHeight++

// Reset lockedRound, lockedValue, validRound, and validValue to initial
Expand Down
24 changes: 16 additions & 8 deletions process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,9 +820,10 @@ var _ = Describe("Process", func() {
BroadcastPrecommitCallback: nil,
}
committer := processutil.CommitterCallback{
Callback: func(height process.Height, value process.Value) {
Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) {
// we should not commit to the nil value proposal
Expect(true).To(BeFalse())
return 0, nil
},
}
p := process.New(whoami, f, nil, nil, nil, nil, broadcaster, committer, nil)
Expand Down Expand Up @@ -2674,10 +2675,11 @@ var _ = Describe("Process", func() {
f := 5 + (r.Int() % 10)
acknowledge := false
committer := processutil.CommitterCallback{
Callback: func(height process.Height, value process.Value) {
Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) {
Expect(height).To(Equal(currentHeight))
Expect(value).To(Equal(proposedValue))
acknowledge = true
return 0, nil
},
}
scheduledProposer := id.NewPrivKey().Signatory()
Expand Down Expand Up @@ -2737,10 +2739,11 @@ var _ = Describe("Process", func() {
f := 5 + (r.Int() % 10)
acknowledge := false
committer := processutil.CommitterCallback{
Callback: func(height process.Height, value process.Value) {
Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) {
Expect(height).To(Equal(currentHeight))
Expect(value).To(Equal(proposedValue))
acknowledge = true
return 0, nil
},
}

Expand Down Expand Up @@ -2795,8 +2798,9 @@ var _ = Describe("Process", func() {
whoami := id.NewPrivKey().Signatory()
f := 5 + (r.Int() % 10)
committer := processutil.CommitterCallback{
Callback: func(height process.Height, value process.Value) {
Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) {
Fail("unexpectedly received a commit")
return 0, nil
},
}
broadcaster := processutil.BroadcasterCallbacks{
Expand Down Expand Up @@ -2860,8 +2864,9 @@ var _ = Describe("Process", func() {
whoami := id.NewPrivKey().Signatory()
f := 5 + (r.Int() % 10)
committer := processutil.CommitterCallback{
Callback: func(height process.Height, value process.Value) {
Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) {
Fail("unexpectedly received a commit")
return 0, nil
},
}
scheduler := scheduler.NewRoundRobin([]id.Signatory{id.NewPrivKey().Signatory()})
Expand Down Expand Up @@ -2910,8 +2915,9 @@ var _ = Describe("Process", func() {
whoami := id.NewPrivKey().Signatory()
f := 5 + (r.Int() % 10)
committer := processutil.CommitterCallback{
Callback: func(height process.Height, value process.Value) {
Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) {
Fail("unexpectedly received a commit")
return 0, nil
},
}
validator := processutil.MockValidator{MockValid: func(process.Height, process.Round, process.Value) bool { return false }}
Expand Down Expand Up @@ -2962,10 +2968,11 @@ var _ = Describe("Process", func() {
acknowledge := false
proposedValue := processutil.RandomGoodValue(r)
committer := processutil.CommitterCallback{
Callback: func(height process.Height, value process.Value) {
Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) {
Expect(height).To(Equal(currentHeight))
Expect(value).To(Equal(proposedValue))
acknowledge = true
return 0, nil
},
}
p := process.New(whoami, f, nil, nil, nil, nil, nil, committer, nil)
Expand Down Expand Up @@ -3009,8 +3016,9 @@ var _ = Describe("Process", func() {
f := 5 + (r.Int() % 10)
proposedValue := processutil.RandomGoodValue(r)
committer := processutil.CommitterCallback{
Callback: func(height process.Height, value process.Value) {
Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) {
Fail("unexpectedly received a commit")
return 0, nil
},
}
p := process.New(whoami, f, nil, nil, nil, nil, nil, committer, nil)
Expand Down
8 changes: 4 additions & 4 deletions process/processutil/processutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ func (broadcaster BroadcasterCallbacks) BroadcastPrecommit(precommit process.Pre
// CommitterCallback provides a callback function to test the Committer
// behaviour required by a Process
type CommitterCallback struct {
Callback func(process.Height, process.Value)
Callback func(process.Height, process.Value) (uint64, process.Scheduler)
}

// Commit passes the commitment parameters height and round to the commit callback, if present
func (committer CommitterCallback) Commit(height process.Height, value process.Value) {
func (committer CommitterCallback) Commit(height process.Height, value process.Value) (uint64, process.Scheduler) {
if committer.Callback == nil {
return
return 0, nil
}
committer.Callback(height, value)
return committer.Callback(height, value)
}

// MockProposer is a mock implementation of the Proposer interface
Expand Down
Loading

0 comments on commit 0c4e34f

Please sign in to comment.