From 65f1926329dcee584ccb6aaefc9c9d57c827a708 Mon Sep 17 00:00:00 2001
From: Rohit Narurkar
Date: Fri, 17 Jul 2020 02:55:06 +0530
Subject: [PATCH] refactor with killed replicas

---
 replica/replica_test.go | 750 ++++++++++++++++------------------------
 1 file changed, 306 insertions(+), 444 deletions(-)

diff --git a/replica/replica_test.go b/replica/replica_test.go
index cdc26383..7b8461d3 100644
--- a/replica/replica_test.go
+++ b/replica/replica_test.go
@@ -31,209 +31,221 @@ import (
 // - Precommit: yes-to-malformed, no-to-valid, missing, fork-attempt, out-of-turn
 
 var _ = Describe("Replica", func() {
-  Context("with sufficient replicas online for consensus to progress", func() {
-    setup := func(
-      seed int64,
-      f, n uint8,
-      targetHeight process.Height,
-      mq *[]Message,
-      commits *map[uint8]map[process.Height]process.Value,
-    ) (
-      *rand.Rand,
-      Scenario,
-      bool,
-      *sync.Mutex,
-      chan struct{},
-      chan bool,
-      []*replica.Replica,
-      []context.Context,
-      []context.CancelFunc,
-      context.CancelFunc,
-    ) {
-      // create private keys for the signatories participating in consensus
-      // rounds, and get their signatories
-      privKeys := make([]*id.PrivKey, 3*f+1)
-      signatories := make([]id.Signatory, 3*f+1)
-      for i := range privKeys {
-        privKeys[i] = id.NewPrivKey()
-        signatories[i] = privKeys[i].Signatory()
-        (*commits)[uint8(i)] = make(map[process.Height]process.Value)
-      }
-
-      // construct the scenario for this test case
-      scenario := Scenario{seed, f, n, signatories, []Message{}}
-
-      // whether we are running the test in replay mode
-      replayMode := readBoolEnvVar("REPLAY_MODE")
-
-      // fetch from the dump file in that case
-      if replayMode {
-        scenario = readFromFile("failure.dump")
-        seed = scenario.seed
-        f = scenario.f
-        n = scenario.n
-        signatories = scenario.signatories
-      }
-
-      // random number generator
-      r := rand.New(rand.NewSource(seed))
+  setup := func(
+    seed int64,
+    f, n, completion uint8,
+    targetHeight process.Height,
+    mq *[]Message,
+    commits *map[uint8]map[process.Height]process.Value,
+  ) (
+    *rand.Rand,
+    Scenario,
+    bool,
+    *sync.Mutex,
+    chan struct{},
+    chan bool,
+    []*replica.Replica,
+    []context.Context,
+    []context.CancelFunc,
+    context.CancelFunc,
+  ) {
+    // create private keys for the signatories participating in consensus
+    // rounds, and get their signatories
+    privKeys := make([]*id.PrivKey, 3*f+1)
+    signatories := make([]id.Signatory, 3*f+1)
+    for i := range privKeys {
+      privKeys[i] = id.NewPrivKey()
+      signatories[i] = privKeys[i].Signatory()
+      (*commits)[uint8(i)] = make(map[process.Height]process.Value)
+    }
 
-      // signals to denote processing of messages
-      mqSignal := make(chan struct{}, n*n)
-      mqSignal <- struct{}{}
+    // construct the scenario for this test case
+    scenario := Scenario{seed, f, n, completion, signatories, []Message{}}
 
-      // signal to denote completion of consensus target
-      completionSignal := make(chan bool, n)
+    // whether we are running the test in replay mode
+    replayMode := readBoolEnvVar("REPLAY_MODE")
 
-      // mutex to synchronize access to the mq slice for all the replicas
-      var mqMutex = &sync.Mutex{}
+    // fetch from the dump file in that case
+    if replayMode {
+      scenario = readFromFile("failure.dump")
+      seed = scenario.seed
+      f = scenario.f
+      n = scenario.n
+      completion = scenario.completion
+      signatories = scenario.signatories
+    }
 
-      // instantiate the replicas
-      replicas := make([]*replica.Replica, n)
-      for i := range replicas {
-        replicaIndex := uint8(i)
-        replicas[i] = replica.New(
-          replica.DefaultOptions().
-            WithTimerOptions(
-              timer.DefaultOptions().
-                WithTimeout(500*time.Millisecond),
-            ),
-          signatories[i],
-          signatories,
-          // Proposer
-          processutil.MockProposer{
-            MockValue: func() process.Value {
-              v := processutil.RandomGoodValue(r)
-              return v
-            },
+    // random number generator
+    r := rand.New(rand.NewSource(seed))
+
+    // signals to denote processing of messages
+    mqSignal := make(chan struct{}, n*n)
+    mqSignal <- struct{}{}
+
+    // signal to denote completion of consensus target
+    completionSignal := make(chan bool, completion)
+
+    // mutex to synchronize access to the mq slice for all the replicas
+    var mqMutex = &sync.Mutex{}
+
+    // instantiate the replicas
+    replicas := make([]*replica.Replica, n)
+    for i := range replicas {
+      replicaIndex := uint8(i)
+      replicas[i] = replica.New(
+        replica.DefaultOptions().
+          WithTimerOptions(
+            timer.DefaultOptions().
+              WithTimeout(500*time.Millisecond),
+          ),
+        signatories[i],
+        signatories,
+        // Proposer
+        processutil.MockProposer{
+          MockValue: func() process.Value {
+            // the shared *rand.Rand is guarded by the same mutex as mq
+            mqMutex.Lock()
+            v := processutil.RandomGoodValue(r)
+            mqMutex.Unlock()
+            return v
           },
-            // Validator
-            processutil.MockValidator{
-              MockValid: func(process.Value) bool {
-                return true
-              },
+        },
+        // Validator
+        processutil.MockValidator{
+          MockValid: func(process.Value) bool {
+            return true
           },
-            // Committer
-            processutil.CommitterCallback{
-              Callback: func(height process.Height, value process.Value) {
-                // add to the map of commits
-                mqMutex.Lock()
-                (*commits)[replicaIndex][height] = value
-                mqMutex.Unlock()
+        },
+        // Committer
+        processutil.CommitterCallback{
+          Callback: func(height process.Height, value process.Value) {
+            // add to the map of commits
+            mqMutex.Lock()
+            (*commits)[replicaIndex][height] = value
+            mqMutex.Unlock()
 
-                // signal for completion if this is the target height
-                if height == targetHeight {
-                  completionSignal <- true
-                }
-              },
+            // signal for completion if this is the target height
+            if height == targetHeight {
+              completionSignal <- true
+            }
           },
-            // Catcher
-            nil,
-            // Broadcaster
-            processutil.BroadcasterCallbacks{
-              BroadcastProposeCallback: func(propose process.Propose) {
-                mqMutex.Lock()
-                for j := uint8(0); j < n; j++ {
-                  *mq = append(*mq, Message{
-                    to:          j,
-                    messageType: 1,
-                    value:       propose,
-                  })
-                }
-                mqMutex.Unlock()
-              },
-              BroadcastPrevoteCallback: func(prevote process.Prevote) {
-                mqMutex.Lock()
-                for j := uint8(0); j < n; j++ {
-                  *mq = append(*mq, Message{
-                    to:          j,
-                    messageType: 2,
-                    value:       prevote,
-                  })
-                }
-                mqMutex.Unlock()
-              },
-              BroadcastPrecommitCallback: func(precommit process.Precommit) {
-                mqMutex.Lock()
-                for j := uint8(0); j < n; j++ {
-                  *mq = append(*mq, Message{
-                    to:          j,
-                    messageType: 3,
-                    value:       precommit,
-                  })
-                }
-                mqMutex.Unlock()
-              },
-            },
-            // Flusher
-            func() {
-              mqSignal <- struct{}{}
-            },
-          )
-        }
+        },
+        // Catcher
+        nil,
+        // Broadcaster
+        processutil.BroadcasterCallbacks{
+          BroadcastProposeCallback: func(propose process.Propose) {
+            for j := uint8(0); j < n; j++ {
+              mqMutex.Lock()
+              *mq = append(*mq, Message{
+                to:          j,
+                messageType: 1,
+                value:       propose,
+              })
+              mqMutex.Unlock()
+            }
+          },
+          BroadcastPrevoteCallback: func(prevote process.Prevote) {
+            for j := uint8(0); j < n; j++ {
+              mqMutex.Lock()
+              *mq = append(*mq, Message{
+                to:          j,
+                messageType: 2,
+                value:       prevote,
+              })
+              mqMutex.Unlock()
+            }
+          },
+          BroadcastPrecommitCallback: func(precommit process.Precommit) {
+            for j := uint8(0); j < n; j++ {
+              mqMutex.Lock()
+              *mq = append(*mq, Message{
+                to:          j,
+                messageType: 3,
+                value:       precommit,
+              })
+              mqMutex.Unlock()
+            }
+          },
+        },
+        // Flusher
+        func() {
+          mqSignal <- struct{}{}
+        },
+      )
+    }
 
-        // global context within which all replicas will run
-        ctx, cancel := context.WithCancel(context.Background())
+    // global context within which all replicas will run
+    ctx, cancel := context.WithCancel(context.Background())
 
-        // individual replica contexts and functions to kill/cancel them
-        replicaCtxs, replicaCtxCancels := make([]context.Context, n), make([]context.CancelFunc, n)
-        for i := range replicaCtxs {
-          replicaCtxs[i], replicaCtxCancels[i] = context.WithCancel(ctx)
-        }
-
-        return r, scenario, replayMode, mqMutex, mqSignal, completionSignal, replicas, replicaCtxs, replicaCtxCancels, cancel
+    // individual replica contexts and functions to kill/cancel them
+    replicaCtxs, replicaCtxCancels := make([]context.Context, n), make([]context.CancelFunc, n)
+    for i := range replicaCtxs {
+      replicaCtxs[i], replicaCtxCancels[i] = context.WithCancel(ctx)
     }
 
-      play := func(
-        scenario *Scenario,
-        timeout time.Duration,
-        mq *[]Message,
-        mqMutex *sync.Mutex,
-        mqSignal chan struct{},
-        completionSignal chan bool,
-        replicas []*replica.Replica,
-        successFn func(),
-        failureFn func(*Scenario),
-        inspectFn func(*Scenario),
-      ) {
-        isRunning := true
-        for timeoutSignal := time.After(timeout); isRunning; {
-          select {
-          case <-timeoutSignal:
-            failureFn(scenario)
-          case <-mqSignal:
-            // the consensus target has been achieved on every replica
-            if len(completionSignal) == int(scenario.n) {
-              successFn()
-              isRunning = false
-              continue
-            }
+    return r, scenario, replayMode, mqMutex, mqSignal, completionSignal, replicas, replicaCtxs, replicaCtxCancels, cancel
+  }
 
-            // synchronously get the length of mq
-            mqMutex.Lock()
-            mqLen := len(*mq)
-            mqMutex.Unlock()
+  play := func(
+    scenario *Scenario,
+    timeout time.Duration,
+    mq *[]Message,
+    mqMutex *sync.Mutex,
+    mqSignal chan struct{},
+    completionSignal chan bool,
+    replicas []*replica.Replica,
+    killedReplicas *map[uint8]bool,
+    successFn func(),
+    failureFn func(*Scenario),
+    inspectFn func(*Scenario),
+  ) {
+    isRunning := true
+    for timeoutSignal := time.After(timeout); isRunning; {
+      select {
+      case <-timeoutSignal:
+        failureFn(scenario)
+      case <-mqSignal:
+        // the consensus target has been achieved on the required number of replicas
+        if len(completionSignal) == int(scenario.completion) {
+          mqMutex.Lock()
+          successFn()
+          mqMutex.Unlock()
+          isRunning = false
+          continue
+        }
 
-            // ignore if it was a spurious signal
-            if mqLen == 0 {
-              continue
-            }
+        // synchronously get the length of mq
+        mqMutex.Lock()
+        mqLen := len(*mq)
+        mqMutex.Unlock()
 
-            // pop the first message off the slice
-            mqMutex.Lock()
-            m := (*mq)[0]
-            *mq = (*mq)[1:]
-            mqMutex.Unlock()
+        // ignore if it was a spurious signal
+        if mqLen == 0 {
+          continue
+        }
 
-            // ignore if it is a nil message
-            if m.value == nil {
-              continue
-            }
+        // pop the first message off the slice
+        mqMutex.Lock()
+        m := (*mq)[0]
+        *mq = (*mq)[1:]
+        mqMutex.Unlock()
 
-            // append the message to message history
-            scenario.messages = append(scenario.messages, m)
+        // ignore if it is a nil message
+        if m.value == nil {
+          continue
+        }
 
-            // handle the message
+        // append the message to message history
+        scenario.messages = append(scenario.messages, m)
+
+        // check whether the recipient replica has been killed
+        mqMutex.Lock()
+        _, isKilled := (*killedReplicas)[uint8(m.to)]
+        mqMutex.Unlock()
+
+        // handle the message only if the recipient is still alive
+        if !isKilled {
           time.Sleep(1 * time.Millisecond)
           replica := replicas[m.to]
          switch value := m.value.(type) {
@@ -247,26 +259,34 @@ var _ = Describe("Replica", func() {
            panic(fmt.Errorf("non-exhaustive pattern: message.value has type %T", value))
          }
 
+          mqMutex.Lock()
          inspectFn(scenario)
+          mqMutex.Unlock()
+        } else {
+          // drop the message, but re-arm the signal ourselves, since
+          // no Flusher will fire for a killed recipient
+          mqSignal <- struct{}{}
        }
      }
    }
+  }
 
-      replay := func(
-        scenario *Scenario,
-        mqSignal chan struct{},
-        completionSignal chan bool,
-        replicas []*replica.Replica,
-        successFn func(),
-      ) {
-        // dummy goroutine to keep consuming the mqSignal
-        go func() {
-          for range mqSignal {
-          }
-        }()
+  replay := func(
+    scenario *Scenario,
+    mqSignal chan struct{},
+    completionSignal chan bool,
+    replicas []*replica.Replica,
+    killedReplicas *map[uint8]bool,
+    successFn func(),
+    inspectFn func(*Scenario),
+  ) {
+    // dummy goroutine to keep consuming the mqSignal
+    go func() {
+      for range mqSignal {
+      }
+    }()
 
-        // handle every message in the messages history
-        for _, message := range scenario.messages {
+    // handle every message in the messages history
+    for _, message := range scenario.messages {
+      if _, isKilled := (*killedReplicas)[uint8(message.to)]; !isKilled {
        time.Sleep(1 * time.Millisecond)
        recipient := replicas[message.to]
        switch value := message.value.(type) {
@@ -280,14 +300,18 @@ var _ = Describe("Replica", func() {
          panic(fmt.Errorf("non-exhaustive pattern: message.value has type %T", value))
        }
 
-        // exit replay if the consensus target has been achieved
-        if len(completionSignal) == int(scenario.n) {
-          successFn()
-          break
-        }
+        inspectFn(scenario)
+      }
+
+      // exit replay if the consensus target has been achieved
+      if len(completionSignal) == int(scenario.completion) {
+        successFn()
+        break
      }
    }
+  }
 
+  Context("with sufficient replicas online for consensus to progress", func() {
    Context("with 3f+1 replicas online", func() {
      It("should be able to reach consensus", func() {
        // randomness seed
@@ -296,17 +320,21 @@ var _ = Describe("Replica", func() {
        f := uint8(3)
        // number of replicas online
        n := 3*f + 1
+        // number of replicas to signal completion
+        completion := n
        // target height of consensus to mark the test as succeeded
        targetHeight := process.Height(30)
        // dynamic slice to hold the messages being sent between replicas
        mq := []Message{}
        // commits from replicas
        commits := make(map[uint8]map[process.Height]process.Value)
+        // map to keep a record of which replica was killed
+        killedReplicas := make(map[uint8]bool)
 
        // setup the test scenario
-        _, scenario, replayMode, mqMutex, mqSignal, completionSignal, replicas, replicaCtxs, replicaCtxCancels, cancel := setup(seed, f, n, targetHeight, &mq, &commits)
+        _, scenario, replayMode, mqMutex, mqSignal, completionSignal, replicas, replicaCtxs, replicaCtxCancels, cancel := setup(seed, f, n, completion, targetHeight, &mq, &commits)
 
-        // Run all of the replicas in independent background goroutines.
+        // Run all of the replicas in independent background goroutines
        for i := range replicas {
          go replicas[i].Run(replicaCtxs[i])
        }
@@ -325,19 +353,13 @@ var _ = Describe("Replica", func() {
          replicaCtxCancels[i]()
        }
 
-        // synchronously fetch the first replica's commits
-        mqMutex.Lock()
+        // fetch the first replica's commits
        referenceCommits := commits[0]
-        mqMutex.Unlock()
 
        // ensure that all replicas have the same commits
        for j := uint8(0); j < n; j++ {
          for h := process.Height(1); h <= targetHeight; {
-            mqMutex.Lock()
-            commit := commits[j][h]
-            mqMutex.Unlock()
-
-            Expect(commit).To(Equal(referenceCommits[h]))
+            Expect(commits[j][h]).To(Equal(referenceCommits[h]))
            h++
          }
        }
@@ -349,11 +371,11 @@ var _ = Describe("Replica", func() {
        if !replayMode {
          timeout := 15 * time.Second
 
-          play(&scenario, timeout, &mq, mqMutex, mqSignal, completionSignal, replicas, successFn, failureFn, inspectFn)
+          play(&scenario, timeout, &mq, mqMutex, mqSignal, completionSignal, replicas, &killedReplicas, successFn, failureFn, inspectFn)
        }
 
        if replayMode {
-          replay(&scenario, mqSignal, completionSignal, replicas, successFn)
+          replay(&scenario, mqSignal, completionSignal, replicas, &killedReplicas, successFn, inspectFn)
        }
      })
    })
@@ -366,14 +388,19 @@ var _ = Describe("Replica", func() {
        f := uint8(3)
        // number of replicas online
        n := 2*f + 1
+        // number of replicas that should signal completion
+        completion := n
        // target height of consensus to mark the test as succeeded
        targetHeight := process.Height(30)
        // dynamic slice to hold the messages being sent between replicas
        mq := []Message{}
        // commits from replicas
        commits := make(map[uint8]map[process.Height]process.Value)
+        // map to keep a record of which replica was killed
+        killedReplicas := make(map[uint8]bool)
+
        // setup the test scenario
-        _, scenario, replayMode, mqMutex, mqSignal, completionSignal, replicas, replicaCtxs, replicaCtxCancels, cancel := setup(seed, f, n, targetHeight, &mq, &commits)
+        _, scenario, replayMode, mqMutex, mqSignal, completionSignal, replicas, replicaCtxs, replicaCtxCancels, cancel := setup(seed, f, n, completion, targetHeight, &mq, &commits)
 
        // Run all of the replicas in independent background goroutines.
        for i := range replicas {
          go replicas[i].Run(replicaCtxs[i])
        }
@@ -394,19 +421,13 @@ var _ = Describe("Replica", func() {
          replicaCtxCancels[i]()
        }
 
-        // synchronously fetch the first replica's commits
-        mqMutex.Lock()
+        // fetch the first replica's commits
        referenceCommits := commits[0]
-        mqMutex.Unlock()
 
        // ensure that all replicas have the same commits
        for j := uint8(0); j < n; j++ {
          for h := process.Height(1); h <= targetHeight; {
-            mqMutex.Lock()
-            commit := commits[j][h]
-            mqMutex.Unlock()
-
-            Expect(commit).To(Equal(referenceCommits[h]))
+            Expect(commits[j][h]).To(Equal(referenceCommits[h]))
            h++
          }
        }
@@ -418,11 +439,11 @@ var _ = Describe("Replica", func() {
        if !replayMode {
          timeout := 35 * time.Second
 
-          play(&scenario, timeout, &mq, mqMutex, mqSignal, completionSignal, replicas, successFn, failureFn, inspectFn)
+          play(&scenario, timeout, &mq, mqMutex, mqSignal, completionSignal, replicas, &killedReplicas, successFn, failureFn, inspectFn)
        }
 
        if replayMode {
-          replay(&scenario, mqSignal, completionSignal, replicas, successFn)
+          replay(&scenario, mqSignal, completionSignal, replicas, &killedReplicas, successFn, inspectFn)
        }
      })
    })
@@ -431,261 +452,92 @@ var _ = Describe("Replica", func() {
    Context("with 3f+1 replicas online, f replicas slowly go offline", func() {
      It("should be able to reach consensus", func() {
        // randomness seed
-        rSeed := time.Now().UnixNano()
-        r := rand.New(rand.NewSource(rSeed))
-
+        seed := time.Now().UnixNano()
        // f is the maximum no. of adversaries
-        // n is the number of honest replicas online
-        // h is the target minimum consensus height
        f := uint8(3)
+        // n is the number of honest replicas online
        n := 3*f + 1
-        targetHeight := process.Height(12)
-
+        // completion is the number of replicas that should signal completion
+        completion := 2*f + 1
+        // target height of consensus to mark the test as succeeded
+        targetHeight := process.Height(30)
+        // dynamic slice to hold the messages being sent between replicas
+        mq := []Message{}
        // commits from replicas
        commits := make(map[uint8]map[process.Height]process.Value)
-
-        // setup private keys for the replicas
-        // and their signatories
-        privKeys := make([]*id.PrivKey, n)
-        signatories := make([]id.Signatory, n)
-        for i := range privKeys {
-          privKeys[i] = id.NewPrivKey()
-          signatories[i] = privKeys[i].Signatory()
-          commits[uint8(i)] = make(map[process.Height]process.Value)
-        }
-
-        // every replica sends this signal when they reach the target
-        // consensus height
-        completionSignal := make(chan bool, n)
-
-        // replicas randomly send this signal that kills them
-        // this is to simulate a behaviour where f replicas go offline
+        // map to keep a record of which replica was killed
        killedReplicas := make(map[uint8]bool)
-        killSignal := make(chan uint8, f)
 
-        // lastKilled stores the last height at which a replica was killed
-        // killInterval is the number of blocks progressed after which
-        // a replica should be killed
-        lastKilled := process.Height(0)
-        killInterval := process.Height(2)
+        // setup the test scenario
+        r, scenario, replayMode, mqMutex, mqSignal, completionSignal, replicas, replicaCtxs, replicaCtxCancels, cancel := setup(seed, f, n, completion, targetHeight, &mq, &commits)
 
-        // slice of messages to be broadcasted between replicas.
-        // messages from mq are popped and handled whenever mqSignal is signalled
-        mq := []Message{}
-        mqMutex := &sync.Mutex{}
-        mqSignal := make(chan struct{}, n*n)
-        mqSignal <- struct{}{}
-
-        // build replicas
-        replicas := make([]*replica.Replica, n)
+        // Run all of the replicas in independent background goroutines
        for i := range replicas {
-          replicaIndex := uint8(i)
-
-          replicas[i] = replica.New(
-            replica.DefaultOptions().
-              WithTimerOptions(
-                timer.DefaultOptions().
-                  WithTimeout(1*time.Second),
-              ),
-            signatories[i],
-            signatories,
-            // Proposer
-            processutil.MockProposer{
-              MockValue: func() process.Value {
-                v := processutil.RandomGoodValue(r)
-                return v
-              },
-            },
-            // Validator
-            processutil.MockValidator{
-              MockValid: func(process.Value) bool {
-                return true
-              },
-            },
-            // Committer
-            processutil.CommitterCallback{
-              Callback: func(height process.Height, value process.Value) {
-                // add commit to the commits map
-                commits[replicaIndex][height] = value
-
-                // signal for completion if this is the target height
-                if height == targetHeight {
-                  completionSignal <- true
-                  return
-                }
-
-                // kill this replica if we've progressed the killInterval number
-                // of blocks, and there are more than 2f+1 replica alive
-                mqMutex.Lock()
-                if height-lastKilled > killInterval {
-                  if len(killedReplicas) < int(f) {
-                    killSignal <- replicaIndex
-                    lastKilled = height
-                  }
-                }
-                mqMutex.Unlock()
-              },
-            },
-            // Catcher
-            nil,
-            // Broadcaster
-            processutil.BroadcasterCallbacks{
-              BroadcastProposeCallback: func(propose process.Propose) {
-                mqMutex.Lock()
-                for j := uint8(0); j < n; j++ {
-                  if _, ok := killedReplicas[j]; !ok {
-                    mq = append(mq, Message{
-                      to:    j,
-                      value: propose,
-                    })
-                  }
-                }
-                mqMutex.Unlock()
-              },
-              BroadcastPrevoteCallback: func(prevote process.Prevote) {
-                mqMutex.Lock()
-                for j := uint8(0); j < n; j++ {
-                  if _, ok := killedReplicas[j]; !ok {
-                    mq = append(mq, Message{
-                      to:    j,
-                      value: prevote,
-                    })
-                  }
-                }
-                mqMutex.Unlock()
-              },
-              BroadcastPrecommitCallback: func(precommit process.Precommit) {
-                mqMutex.Lock()
-                for j := uint8(0); j < n; j++ {
-                  if _, ok := killedReplicas[j]; !ok {
-                    mq = append(mq, Message{
-                      to:    j,
-                      value: precommit,
-                    })
-                  }
-                }
-                mqMutex.Unlock()
-              },
-            },
-            // Flusher
-            func() {
-              mqSignal <- struct{}{}
-            },
-          )
-        }
-
-        // Create a global context that can be used to cancel the running of all
-        // replicas.
-        ctx, cancel := context.WithCancel(context.Background())
-        defer cancel()
-
-        // in case the test fails to proceed to completion, we have a signal
-        // to mark it as failed
-        // this test should take 30 seconds to complete
-        failTestSignal := make(chan bool, 1)
-        go func() {
-          time.Sleep(60 * time.Second)
-          failTestSignal <- true
-        }()
-
-        // From the global context, create per-replica contexts that can be used to
-        // cancel replicas independently of one another. This is useful when we want
-        // to simulate replicas "crashing" and "restarting" during consensus.
-        replicaCtxs, replicaCtxCancels := make([]context.Context, n), make([]context.CancelFunc, n)
-        for i := range replicaCtxs {
-          replicaCtxs[i], replicaCtxCancels[i] = context.WithCancel(ctx)
        }
 
-        // Run all of the replicas in independent background goroutines.
-        for i := range replicas {
-          go replicas[i].Run(replicaCtxs[i])
+          go replicas[i].Run(replicaCtxs[i])
        }
 
-        completion := func() {
+        // callback function called on test failure
+        failureFn := func(scenario *Scenario) {
+          cancel()
+          dumpToFile("failure.dump", *scenario)
+          Fail("test failed to complete within the expected timeframe")
+        }
+
+        // callback function called on test success
+        successFn := func() {
          // cancel the replica contexts
          for i := range replicaCtxs {
            replicaCtxCancels[i]()
          }
 
-          // ensure that all replicas have the same commits
-          for i := uint8(0); i < n; i++ {
-            if _, ok := killedReplicas[i]; !ok {
-              // from the first replica that's alive
-              referenceCommits := commits[i]
-              for j := i + 1; j < n; j++ {
-                for h := process.Height(1); h <= targetHeight; {
-                  Expect(commits[j][h]).To(Equal(referenceCommits[h]))
-                  h++
-                }
-              }
-              break
-            }
+          // find the first replica that stayed alive until the consensus
+          // target (named aliveID so that it does not shadow the id package)
+          aliveID := uint8(0)
+          for _, ok := killedReplicas[aliveID]; ok; _, ok = killedReplicas[aliveID] {
+            aliveID++
          }
-        }
-
-        failTest := func() {
-          cancel()
-          Fail("test failed to complete within the expected timeframe")
-        }
+          referenceCommits := commits[aliveID]
 
-        time.Sleep(100 * time.Millisecond)
-        isRunning := true
-        for isRunning {
-          select {
-          // if the expected time frame is over, we fail the test
-          case _ = <-failTestSignal:
-            failTest()
-          // else continue watching out for the mqSignal
-          case _ = <-mqSignal:
-            // this means the target consensus has been reached on every replica
-            if len(completionSignal) == int(2*f+1) {
-              completion()
-              isRunning = false
+          // ensure that every replica that stayed alive has the same commits
+          for j := aliveID + 1; j < n; j++ {
+            // ignore if the replica was killed midway
+            if _, ok := killedReplicas[j]; ok {
              continue
            }
 
-            // kill a replica if there has been a kill signal
-            if len(killSignal) > 0 {
-              killIndex := <-killSignal
-              replicaCtxCancels[killIndex]()
-              killedReplicas[killIndex] = true
-              continue
+            // commits are the same across all heights
+            for h := process.Height(1); h <= targetHeight; {
+              Expect(commits[j][h]).To(Equal(referenceCommits[h]))
+              h++
            }
+          }
+        }
 
-            mqMutex.Lock()
-            mqLen := len(mq)
-            mqMutex.Unlock()
-
-            // ignore if there isn't any message
-            if mqLen == 0 {
-              continue
+        // callback function called on every message processed
+        inspectFn := func(scenario *Scenario) {
+          // we wish to kill at most f replicas
+          if len(killedReplicas) < int(f) {
+            if r.Float64() < 0.01 {
+              // pick a random replica to kill
+              target := r.Intn(int(n))
+
+              // if it is not yet killed
+              if _, isKilled := killedReplicas[uint8(target)]; !isKilled {
+                // kill the replica by cancelling its context
+                killedReplicas[uint8(target)] = true
+                replicaCtxCancels[target]()
+              }
            }
+          }
+        }
 
-            // pop the first message
-            mqMutex.Lock()
-            m := mq[0]
-            mq = mq[1:]
-            mqMutex.Unlock()
+        if !replayMode {
+          timeout := 30 * time.Second
 
-            // ignore if its a nil message
-            if m.value == nil {
-              continue
-            }
+          play(&scenario, timeout, &mq, mqMutex, mqSignal, completionSignal, replicas, &killedReplicas, successFn, failureFn, inspectFn)
+        }
 
-            // handle the message
-            time.Sleep(1 * time.Millisecond)
-            replica := replicas[m.to]
-            switch value := m.value.(type) {
-            case process.Propose:
-              replica.Propose(context.Background(), value)
-            case process.Prevote:
-              replica.Prevote(context.Background(), value)
-            case process.Precommit:
-              replica.Precommit(context.Background(), value)
-            default:
-              panic(fmt.Errorf("non-exhaustive pattern: message.value has type %T", value))
-            }
-          }
+        if replayMode {
+          replay(&scenario, mqSignal, completionSignal, replicas, &killedReplicas, successFn, inspectFn)
        }
      })
    })
@@ -1518,6 +1370,7 @@ type Scenario struct {
  seed        int64
  f           uint8
  n           uint8
+  completion  uint8
  signatories []id.Signatory
  messages    []Message
 }
@@ -1526,6 +1379,7 @@ func (s Scenario) SizeHint() int {
  return surge.SizeHint(s.seed) +
    surge.SizeHint(s.f) +
    surge.SizeHint(s.n) +
+    surge.SizeHint(s.completion) +
    surge.SizeHint(s.signatories) +
    surge.SizeHint(s.messages)
 }
@@ -1543,6 +1397,10 @@ func (s Scenario) Marshal(buf []byte, rem int) ([]byte, int, error) {
  if err != nil {
    return buf, rem, fmt.Errorf("marshaling n=%v: %v", s.n, err)
  }
+  buf, rem, err = surge.Marshal(s.completion, buf, rem)
+  if err != nil {
+    return buf, rem, fmt.Errorf("marshaling completion=%v: %v", s.completion, err)
+  }
  buf, rem, err = surge.Marshal(s.signatories, buf, rem)
  if err != nil {
    return buf, rem, fmt.Errorf("marshaling signatories=%v: %v", s.signatories, err)
  }
@@ -1568,6 +1426,10 @@ func (s *Scenario) Unmarshal(buf []byte, rem int) ([]byte, int, error) {
  if err != nil {
    return buf, rem, fmt.Errorf("unmarshaling n: %v", err)
  }
+  buf, rem, err = surge.Unmarshal(&s.completion, buf, rem)
+  if err != nil {
+    return buf, rem, fmt.Errorf("unmarshaling completion: %v", err)
+  }
  buf, rem, err = surge.Unmarshal(&s.signatories, buf, rem)
  if err != nil {
    return buf, rem, fmt.Errorf("unmarshaling signatories: %v", err)
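
The refactored play loop drives every replica from a single message pump: a replica's Flusher re-arms mqSignal after each handled message, so when a message is addressed to a killed replica the loop must re-arm the signal itself or the pump would stall. Below is a minimal, self-contained sketch of that drop-but-resignal pattern; the Message type and the inline handler are stand-ins for the test's types, not the repository's API.

package main

import "fmt"

// Message is a stand-in for the test's Message type.
type Message struct {
	to    uint8
	value interface{}
}

func main() {
	mq := []Message{{to: 0, value: "propose"}, {to: 1, value: "prevote"}, {to: 0, value: "precommit"}}
	killedReplicas := map[uint8]bool{1: true}

	// buffered so that neither branch below ever blocks
	mqSignal := make(chan struct{}, 4)
	mqSignal <- struct{}{}

	for len(mq) > 0 {
		<-mqSignal

		// pop the first message off the slice
		m := mq[0]
		mq = mq[1:]

		if killedReplicas[m.to] {
			// recipient is dead: drop the message, but re-arm the
			// signal ourselves, since no Flusher will fire for it
			mqSignal <- struct{}{}
			continue
		}

		// a live recipient handles the message; its Flusher would
		// normally send the next signal, which we emulate here
		fmt.Printf("replica %d handles %v\n", m.to, m.value)
		mqSignal <- struct{}{}
	}
}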
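The new inspectFn centralises the kill policy that the old test spread across the Committer callback and the select loop: after each handled message there is a roughly 1% chance of cancelling the context of one randomly chosen, not-yet-killed replica, capped at f kills so that 2f+1 replicas always survive. A compact sketch of just that policy, with assumed values for n and f:

package main

import (
	"context"
	"fmt"
	"math/rand"
)

func main() {
	const (
		n = 10 // total replicas, i.e. 3f+1
		f = 3  // at most f replicas may be killed
	)
	r := rand.New(rand.NewSource(1))
	killed := map[int]bool{}

	cancels := make([]context.CancelFunc, n)
	for i := range cancels {
		_, cancels[i] = context.WithCancel(context.Background())
	}

	// simulate inspecting a long stream of handled messages
	for msg := 0; msg < 10000 && len(killed) < f; msg++ {
		// each message gives a 1% chance to kill one replica
		if r.Float64() < 0.01 {
			target := r.Intn(n)
			if !killed[target] {
				killed[target] = true
				cancels[target]() // the replica's Run loop exits
				fmt.Printf("killed replica %d after message %d\n", target, msg)
			}
		}
	}
}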
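Because a failure.dump written during play is read back by replay, the new completion field must occupy the same position in Scenario's Marshal and Unmarshal. A hypothetical round-trip helper, written against only the method signatures visible in this diff, makes that invariant easy to test; sizing the buffer from SizeHint is an assumption about how callers allocate, not code from the repository.

// roundTrip marshals a Scenario into a buffer sized by SizeHint and
// unmarshals it back; a field-order mismatch between Marshal and
// Unmarshal surfaces as a corrupted or failed round trip.
func roundTrip(s Scenario) (Scenario, error) {
	buf := make([]byte, s.SizeHint())
	if _, _, err := s.Marshal(buf, len(buf)); err != nil {
		return Scenario{}, err
	}
	out := Scenario{}
	if _, _, err := out.Unmarshal(buf, len(buf)); err != nil {
		return Scenario{}, err
	}
	return out, nil
}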