diff --git a/server/server_redelivery_test.go b/server/server_redelivery_test.go index 3e5d61f7..e39a5b62 100644 --- a/server/server_redelivery_test.go +++ b/server/server_redelivery_test.go @@ -1116,7 +1116,7 @@ func TestQueueRedeliveryOnStartup(t *testing.T) { ch := make(chan bool, 1) errCh := make(chan error, 4) - skipCh := make(chan bool, 1) + skipCh := make(chan bool, 2) restarted := int32(0) totalMsgs := int32(10) delivered := int32(0) @@ -1181,15 +1181,15 @@ func TestQueueRedeliveryOnStartup(t *testing.T) { } } // Wait for all messages to be received - if err := Wait(ch); err != nil { - t.Fatal("Did not get our messages") - } select { + case <-ch: case <-skipCh: // If we have a redelivery before the server restart // (can happen on Travis because of timing), no point // in continuing this test. return + case <-time.After(5 * time.Second): + t.Fatal("Did not receive all our messages") default: } // Now stop server and wait more than AckWait before resarting. diff --git a/stores/common_msg_test.go b/stores/common_msg_test.go index 5efc3194..26db14b6 100644 --- a/stores/common_msg_test.go +++ b/stores/common_msg_test.go @@ -6,7 +6,6 @@ import ( "fmt" "reflect" "runtime" - "sync/atomic" "testing" "time" @@ -301,53 +300,6 @@ func TestCSMaxMsgs(t *testing.T) { } } -func dumpFSState(t *testing.T, s Store, format string, args ...interface{}) { - fs, ok := s.(*FileStore) - if !ok { - stackFatalf(t, format, args...) - return - } - fs.RLock() - channels := fs.channels - fs.RUnlock() - for name, c := range channels { - fmt.Printf("== Channel %q ==\n", name) - ms := c.Msgs.(*FileMsgStore) - ms.RLock() - nextExp := ms.expiration - totalCount := ms.totalCount - first := ms.first - last := ms.last - lenWakeChan := len(ms.bkgTasksWake) - timeTick := atomic.LoadInt64(&ms.timeTick) - ms.RUnlock() - fmt.Printf(" Now : %v\n", time.Now()) - fmt.Printf(" TimeTick : %v\n", time.Unix(0, timeTick)) - fmt.Printf(" Next Expiration : %v\n", time.Unix(0, nextExp)) - fmt.Printf(" Sleep interval : %v\n", bkgTasksSleepDuration) - fmt.Printf(" Total Count : %v\n", totalCount) - fmt.Printf(" First : %v\n", first) - fmt.Printf(" Last : %v\n", last) - fmt.Printf(" Len wakeup chan : %v\n", lenWakeChan) - fmt.Printf(" -- Messages --\n") - for idx := first; idx <= last; idx++ { - m, err := ms.Lookup(idx) - if err != nil { - fmt.Printf(" Msg %2d : Error during lookup: %v\n", idx, err) - } else if m == nil { - fmt.Printf(" Msg %2d : Lookup returned nil\n", idx) - } else { - fmt.Printf(" Msg %2d : %v\n", idx, time.Unix(0, m.Timestamp)) - } - } - fmt.Printf(" --------------\n") - } - buf := make([]byte, 1024*1024) - n := runtime.Stack(buf, true) - fmt.Printf("Go-routines:\n%s\n", string(buf[:n])) - stackFatalf(t, format, args...) -} - func TestCSMaxAge(t *testing.T) { for _, st := range testStores { st := st @@ -382,13 +334,13 @@ func TestCSMaxAge(t *testing.T) { expectedLast := uint64(15) first, last := msgStoreFirstAndLastSequence(t, cs.Msgs) if first != expectedFirst || last != expectedLast { - dumpFSState(t, s, "Expected first/last to be %v/%v, got %v/%v", + t.Fatalf("Expected first/last to be %v/%v, got %v/%v", expectedFirst, expectedLast, first, last) } // Wait more and all should be gone. time.Sleep(250 * time.Millisecond) if n, _ := msgStoreState(t, cs.Msgs); n != 0 { - dumpFSState(t, s, "All messages should have expired, got %v", n) + t.Fatalf("All messages should have expired, got %v", n) } // We are going to set a limit of MaxMsgs to 1 on top @@ -411,14 +363,25 @@ func TestCSMaxAge(t *testing.T) { time.Sleep(60 * time.Millisecond) // Ensure there is still 1 message... if n, _ := msgStoreState(t, cs.Msgs); n != 1 { - dumpFSState(t, s, "There should be 1 message, got %v", n) + t.Fatalf("There should be 1 message, got %v", n) } // ...which should be m2: this should not fail msgStoreLookup(t, cs.Msgs, m2.Sequence) // Again, wait more and second message should not be gone time.Sleep(100 * time.Millisecond) - if n, _ := msgStoreState(t, cs.Msgs); n != 0 { - dumpFSState(t, s, "All messages should have expired, got %v", n) + timeout := time.Now().Add(2 * time.Second) + ok := false + for time.Now().Before(timeout) { + if n, _ := msgStoreState(t, cs.Msgs); n != 0 { + time.Sleep(15 * time.Millisecond) + } else { + ok = true + break + } + } + if !ok { + n, _ := msgStoreState(t, cs.Msgs) + t.Fatalf("All messages should have expired, got %v", n) } if st.name == TypeMemory {