diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 0013e5db899..e2bb9465dae 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -1619,7 +1619,6 @@ NextServer: } func TestJetStreamClusterBusyStreams(t *testing.T) { - t.Skip("Too long for CI at the moment") type streamSetup struct { config *nats.StreamConfig consumers []*nats.ConsumerConfig @@ -1864,22 +1863,24 @@ func TestJetStreamClusterBusyStreams(t *testing.T) { } checkMsgsEqual := func(t *testing.T, c *cluster, accountName, streamName string) { state := getStreamDetails(t, c, accountName, streamName).State - var msets []*stream + msets := make(map[*Server]*stream) for _, s := range c.servers { acc, err := s.LookupAccount(accountName) require_NoError(t, err) mset, err := acc.lookupStream(streamName) require_NoError(t, err) - msets = append(msets, mset) + msets[s] = mset } for seq := state.FirstSeq; seq <= state.LastSeq; seq++ { var msgId string var smv StoreMsg - for _, mset := range msets { + for replica, mset := range msets { mset.mu.RLock() sm, err := mset.store.LoadMsg(seq, &smv) mset.mu.RUnlock() - require_NoError(t, err) + if err != nil { + t.Fatalf("Unexpected error loading message (seq=%d) from stream %q on replica %q: %v", seq, streamName, replica, err) + } if msgId == _EMPTY_ { msgId = string(sm.hdr) } else if msgId != string(sm.hdr) { @@ -1888,7 +1889,7 @@ func TestJetStreamClusterBusyStreams(t *testing.T) { } } } - checkConsumer := func(t *testing.T, c *cluster, accountName, streamName, consumerName string) { + checkConsumer := func(t *testing.T, c *cluster, accountName, streamName string) { t.Helper() var leader string for _, s := range c.servers { @@ -2036,7 +2037,7 @@ func TestJetStreamClusterBusyStreams(t *testing.T) { if gotMsgs != expectedMsgs { t.Errorf("stream with sources has %v messages, but total sourced messages should be %v", gotMsgs, expectedMsgs) } - checkConsumer(t, c, accName, streamName, "A") + checkConsumer(t, c, accName, streamName) checkMsgsEqual(t, c, accName, streamName) } test(t, &testParams{