Skip to content

Commit

Permalink
Fix another flapping test
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Sep 19, 2017
1 parent 849caa0 commit 503c38b
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions server/server_redelivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,8 @@ func TestQueueRedeliveryOnStartup(t *testing.T) {
errCh <- fmt.Errorf("Unexpected new message %v into sub %d before getting all undelivered first", m.Sequence, id)
}
}
} else {
} else if atomic.LoadInt32(&restarted) == 1 {
// This is a redelivered message after server restart
if _, present := q.msgs[m.Sequence]; !present {
errCh <- fmt.Errorf("Unexpected message %v into sub %d", m.Sequence, id)
} else {
Expand All @@ -1125,20 +1126,18 @@ func TestQueueRedeliveryOnStartup(t *testing.T) {
}
}
}
// To reduce Travis test flapping, use a not too small AckWait for
// these 2 queue subs.
if _, err := sc.QueueSubscribe("foo", "queue",
newCb(1),
stan.MaxInflight(int(totalMsgs/2)),
stan.SetManualAckMode(),
stan.AckWait(ackWaitInMs(500))); err != nil {
stan.AckWait(ackWaitInMs(50))); err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
if _, err := sc.QueueSubscribe("foo", "queue",
newCb(2),
stan.MaxInflight(int(totalMsgs/2)),
stan.SetManualAckMode(),
stan.AckWait(ackWaitInMs(500))); err != nil {
stan.AckWait(ackWaitInMs(50))); err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
// Send more messages that can be accepted, both member should stall
Expand All @@ -1151,20 +1150,24 @@ func TestQueueRedeliveryOnStartup(t *testing.T) {
if err := Wait(ch); err != nil {
t.Fatal("Did not get our messages")
}
// Now stop server and wait more than AckWait before resarting
// Now stop server and wait more than AckWait before resarting.
s.Shutdown()
time.Sleep(600 * time.Millisecond)
atomic.StoreInt32(&restarted, 1)
// We need to make sure that the first redelivery on startup will
// actually send messages to original qsub. This happens only if
// the AckWait has elapsed. So make sure that we wait long enough.
time.Sleep(500 * time.Millisecond)
l := &trackDeliveredMsgs{newSeq: int(totalMsgs + 1), errCh: make(chan error, 1)}
opts.Trace = true
opts.CustomLogger = l
atomic.StoreInt32(&restarted, 1)
s = runServerWithOpts(t, opts, nil)
// Check that messages are delivered to members that
// originally got them. Wait for all messages to be redelivered
select {
case e := <-errCh:
t.Fatalf(e.Error())
case <-ch:
// All messages were redelivered
case <-time.After(time.Second):
t.Fatal("Did not get all redelivered messages")
}
Expand Down

0 comments on commit 503c38b

Please sign in to comment.