Skip to content

Commit

Permalink
Merge pull request #507 from nats-io/fix_flapper
Browse files Browse the repository at this point in the history
Fix flappers
  • Loading branch information
kozlovic committed Mar 9, 2018
2 parents 08c1b25 + 8fa7112 commit 5ed80b6
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 57 deletions.
8 changes: 4 additions & 4 deletions server/server_redelivery_test.go
Expand Up @@ -1116,7 +1116,7 @@ func TestQueueRedeliveryOnStartup(t *testing.T) {

ch := make(chan bool, 1)
errCh := make(chan error, 4)
skipCh := make(chan bool, 1)
skipCh := make(chan bool, 2)
restarted := int32(0)
totalMsgs := int32(10)
delivered := int32(0)
Expand Down Expand Up @@ -1181,15 +1181,15 @@ func TestQueueRedeliveryOnStartup(t *testing.T) {
}
}
// Wait for all messages to be received
if err := Wait(ch); err != nil {
t.Fatal("Did not get our messages")
}
select {
case <-ch:
case <-skipCh:
// If we have a redelivery before the server restart
// (can happen on Travis because of timing), no point
// in continuing this test.
return
case <-time.After(5 * time.Second):
t.Fatal("Did not receive all our messages")
default:
}
// Now stop server and wait more than AckWait before resarting.
Expand Down
69 changes: 16 additions & 53 deletions stores/common_msg_test.go
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"reflect"
"runtime"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -301,53 +300,6 @@ func TestCSMaxMsgs(t *testing.T) {
}
}

func dumpFSState(t *testing.T, s Store, format string, args ...interface{}) {
fs, ok := s.(*FileStore)
if !ok {
stackFatalf(t, format, args...)
return
}
fs.RLock()
channels := fs.channels
fs.RUnlock()
for name, c := range channels {
fmt.Printf("== Channel %q ==\n", name)
ms := c.Msgs.(*FileMsgStore)
ms.RLock()
nextExp := ms.expiration
totalCount := ms.totalCount
first := ms.first
last := ms.last
lenWakeChan := len(ms.bkgTasksWake)
timeTick := atomic.LoadInt64(&ms.timeTick)
ms.RUnlock()
fmt.Printf(" Now : %v\n", time.Now())
fmt.Printf(" TimeTick : %v\n", time.Unix(0, timeTick))
fmt.Printf(" Next Expiration : %v\n", time.Unix(0, nextExp))
fmt.Printf(" Sleep interval : %v\n", bkgTasksSleepDuration)
fmt.Printf(" Total Count : %v\n", totalCount)
fmt.Printf(" First : %v\n", first)
fmt.Printf(" Last : %v\n", last)
fmt.Printf(" Len wakeup chan : %v\n", lenWakeChan)
fmt.Printf(" -- Messages --\n")
for idx := first; idx <= last; idx++ {
m, err := ms.Lookup(idx)
if err != nil {
fmt.Printf(" Msg %2d : Error during lookup: %v\n", idx, err)
} else if m == nil {
fmt.Printf(" Msg %2d : Lookup returned nil\n", idx)
} else {
fmt.Printf(" Msg %2d : %v\n", idx, time.Unix(0, m.Timestamp))
}
}
fmt.Printf(" --------------\n")
}
buf := make([]byte, 1024*1024)
n := runtime.Stack(buf, true)
fmt.Printf("Go-routines:\n%s\n", string(buf[:n]))
stackFatalf(t, format, args...)
}

func TestCSMaxAge(t *testing.T) {
for _, st := range testStores {
st := st
Expand Down Expand Up @@ -382,13 +334,13 @@ func TestCSMaxAge(t *testing.T) {
expectedLast := uint64(15)
first, last := msgStoreFirstAndLastSequence(t, cs.Msgs)
if first != expectedFirst || last != expectedLast {
dumpFSState(t, s, "Expected first/last to be %v/%v, got %v/%v",
t.Fatalf("Expected first/last to be %v/%v, got %v/%v",
expectedFirst, expectedLast, first, last)
}
// Wait more and all should be gone.
time.Sleep(250 * time.Millisecond)
if n, _ := msgStoreState(t, cs.Msgs); n != 0 {
dumpFSState(t, s, "All messages should have expired, got %v", n)
t.Fatalf("All messages should have expired, got %v", n)
}

// We are going to set a limit of MaxMsgs to 1 on top
Expand All @@ -411,14 +363,25 @@ func TestCSMaxAge(t *testing.T) {
time.Sleep(60 * time.Millisecond)
// Ensure there is still 1 message...
if n, _ := msgStoreState(t, cs.Msgs); n != 1 {
dumpFSState(t, s, "There should be 1 message, got %v", n)
t.Fatalf("There should be 1 message, got %v", n)
}
// ...which should be m2: this should not fail
msgStoreLookup(t, cs.Msgs, m2.Sequence)
// Again, wait more and second message should not be gone
time.Sleep(100 * time.Millisecond)
if n, _ := msgStoreState(t, cs.Msgs); n != 0 {
dumpFSState(t, s, "All messages should have expired, got %v", n)
timeout := time.Now().Add(2 * time.Second)
ok := false
for time.Now().Before(timeout) {
if n, _ := msgStoreState(t, cs.Msgs); n != 0 {
time.Sleep(15 * time.Millisecond)
} else {
ok = true
break
}
}
if !ok {
n, _ := msgStoreState(t, cs.Msgs)
t.Fatalf("All messages should have expired, got %v", n)
}

if st.name == TypeMemory {
Expand Down

0 comments on commit 5ed80b6

Please sign in to comment.