Skip to content

Commit

Permalink
Adding more info about failed check
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed May 21, 2024
1 parent 185e0fa commit 39a9af0
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1619,7 +1619,6 @@ NextServer:
}

func TestJetStreamClusterBusyStreams(t *testing.T) {
t.Skip("Too long for CI at the moment")
type streamSetup struct {
config *nats.StreamConfig
consumers []*nats.ConsumerConfig
Expand Down Expand Up @@ -1864,22 +1863,24 @@ func TestJetStreamClusterBusyStreams(t *testing.T) {
}
checkMsgsEqual := func(t *testing.T, c *cluster, accountName, streamName string) {
state := getStreamDetails(t, c, accountName, streamName).State
var msets []*stream
msets := make(map[*Server]*stream)
for _, s := range c.servers {
acc, err := s.LookupAccount(accountName)
require_NoError(t, err)
mset, err := acc.lookupStream(streamName)
require_NoError(t, err)
msets = append(msets, mset)
msets[s] = mset
}
for seq := state.FirstSeq; seq <= state.LastSeq; seq++ {
var msgId string
var smv StoreMsg
for _, mset := range msets {
for replica, mset := range msets {
mset.mu.RLock()
sm, err := mset.store.LoadMsg(seq, &smv)
mset.mu.RUnlock()
require_NoError(t, err)
if err != nil {
t.Fatalf("Unexpected error loading message (seq=%d) from stream %q on replica %q: %v", seq, streamName, replica, err)
}
if msgId == _EMPTY_ {
msgId = string(sm.hdr)
} else if msgId != string(sm.hdr) {
Expand All @@ -1888,7 +1889,7 @@ func TestJetStreamClusterBusyStreams(t *testing.T) {
}
}
}
checkConsumer := func(t *testing.T, c *cluster, accountName, streamName, consumerName string) {
checkConsumer := func(t *testing.T, c *cluster, accountName, streamName string) {
t.Helper()
var leader string
for _, s := range c.servers {
Expand Down Expand Up @@ -2036,7 +2037,7 @@ func TestJetStreamClusterBusyStreams(t *testing.T) {
if gotMsgs != expectedMsgs {
t.Errorf("stream with sources has %v messages, but total sourced messages should be %v", gotMsgs, expectedMsgs)
}
checkConsumer(t, c, accName, streamName, "A")
checkConsumer(t, c, accName, streamName)
checkMsgsEqual(t, c, accName, streamName)
}
test(t, &testParams{
Expand Down

0 comments on commit 39a9af0

Please sign in to comment.