Skip to content

Commit

Permalink
Fixed another flapping test...
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Sep 20, 2017
1 parent d04eddc commit 7eda4df
Showing 1 changed file with 23 additions and 12 deletions.
35 changes: 23 additions & 12 deletions server/server_redelivery_test.go
Expand Up @@ -707,23 +707,30 @@ func TestPersistentStoreAckMsgRedeliveredToDifferentQueueSub(t *testing.T) {
s := runServerWithOpts(t, opts, nil)
defer shutdownRestartedServerOnTestExit(&s)

var err error
var sub2 stan.Subscription
var (
err error
sub2 stan.Subscription
sub2Mu sync.Mutex
)

errs := make(chan error, 10)
sub2Recv := make(chan bool)
redelivered := int32(0)
trackDelivered := int32(0)
cb := func(m *stan.Msg) {
if m.Redelivered {
if m.Sub != sub2 {
errs <- fmt.Errorf("Expected redelivered msg to be sent to sub2")
return
}
if atomic.AddInt32(&redelivered, 1) != 1 {
errs <- fmt.Errorf("Message redelivered after restart")
return
}
sub2Mu.Lock()
isSub2 := m.Sub == sub2
sub2Mu.Unlock()
if !isSub2 {
// We want the message to be redelivered to sub2, so wait
// for that to happen
return
}
sub2Recv <- true
} else {
if atomic.LoadInt32(&trackDelivered) == 1 {
Expand All @@ -740,21 +747,25 @@ func TestPersistentStoreAckMsgRedeliveredToDifferentQueueSub(t *testing.T) {

// Create a queue subscriber with manual ackMode that will
// not ack the message.
if _, err := sc.QueueSubscribe("foo", "g1", cb, stan.AckWait(ackWaitInMs(15)),
if _, err := sc.QueueSubscribe("foo", "g1", cb, stan.AckWait(ackWaitInMs(100)),
stan.SetManualAckMode()); err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
waitForNumSubs(t, s, clientName, 1)
// Send a message that we know is going to go to the first (and only)
// queue sub that will not ack this message
if err := sc.Publish("foo", []byte("msg")); err != nil {
t.Fatalf("Unexpected error on publish: %v", err)
}
sub2Mu.Lock()
// Create this subscriber that will receive and ack the message
sub2, err = sc.QueueSubscribe("foo", "g1", cb, stan.AckWait(ackWaitInMs(15)))
sub2, err = sc.QueueSubscribe("foo", "g1", cb)
sub2Mu.Unlock()
if err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
// Make sure these are registered.
waitForNumSubs(t, s, clientName, 2)
// Send a message
if err := sc.Publish("foo", []byte("msg")); err != nil {
t.Fatalf("Unexpected error on publish: %v", err)
}
// Wait for sub2 to receive the message.
select {
case <-sub2Recv:
Expand Down

0 comments on commit 7eda4df

Please sign in to comment.