diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 063aa716d6..fb13ba90cb 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -1619,7 +1619,7 @@ NextServer: } func TestJetStreamClusterBusyStreams(t *testing.T) { - t.Skip("Too long for CI at the moment") + t.Skip() type streamSetup struct { config *nats.StreamConfig consumers []*nats.ConsumerConfig @@ -1722,7 +1722,8 @@ func TestJetStreamClusterBusyStreams(t *testing.T) { } for _, subject := range subjects { - _, err := js.Publish(subject, payload, nats.AckWait(200*time.Millisecond)) + msgID := nats.MsgId(fmt.Sprintf("n:%d", n)) + _, err := js.Publish(subject, payload, nats.AckWait(200*time.Millisecond), msgID) if err == nil { if nn := n.Add(1); int(nn) >= test.producerMsgs { return @@ -2062,58 +2063,124 @@ func TestJetStreamClusterBusyStreams(t *testing.T) { }) }) - t.Run("R3F/streams:30/limits", func(t *testing.T) { - testDuration := 3 * time.Minute - totalStreams := 30 - consumersPerStream := 5 - streams := make([]*streamSetup, totalStreams) - for i := 0; i < totalStreams; i++ { - name := fmt.Sprintf("test:%d", i) - st := &streamSetup{ - config: &nats.StreamConfig{ - Name: name, - Subjects: []string{fmt.Sprintf("test.%d.*", i)}, - Replicas: 3, - Retention: nats.LimitsPolicy, - }, - consumers: make([]*nats.ConsumerConfig, 0), - } - for j := 0; j < consumersPerStream; j++ { - subject := fmt.Sprintf("test.%d.%d", i, j) - name := fmt.Sprintf("A:%d:%d", i, j) - cc := &nats.ConsumerConfig{ - Name: name, - Durable: name, - FilterSubject: subject, - AckPolicy: nats.AckExplicitPolicy, + t.Run("rollouts", func(t *testing.T) { + shared := func(t *testing.T, sc *nats.StreamConfig, tp *testParams) func(t *testing.T) { + return func(t *testing.T) { + testDuration := 3 * time.Minute + totalStreams := 30 + consumersPerStream := 5 + streams := make([]*streamSetup, totalStreams) + for i := 0; i < totalStreams; i++ { + name := fmt.Sprintf("test:%d", i) + st := &streamSetup{ + config: &nats.StreamConfig{ + Name: name, + Subjects: []string{fmt.Sprintf("test.%d.*", i)}, + Replicas: 3, + Discard: sc.Discard, + Retention: sc.Retention, + Storage: sc.Storage, + MaxMsgs: sc.MaxMsgs, + MaxBytes: sc.MaxBytes, + MaxAge: sc.MaxAge, + }, + consumers: make([]*nats.ConsumerConfig, 0), + } + for j := 0; j < consumersPerStream; j++ { + subject := fmt.Sprintf("test.%d.%d", i, j) + name := fmt.Sprintf("A:%d:%d", i, j) + cc := &nats.ConsumerConfig{ + Name: name, + Durable: name, + FilterSubject: subject, + AckPolicy: nats.AckExplicitPolicy, + } + st.consumers = append(st.consumers, cc) + st.subjects = append(st.subjects, subject) + } + streams[i] = st + } + expect := func(t *testing.T, nc *nats.Conn, js nats.JetStreamContext, c *cluster) { + time.Sleep(testDuration + 1*time.Minute) + accName := "js" + for i := 0; i < totalStreams; i++ { + streamName := fmt.Sprintf("test:%d", i) + checkMsgsEqual(t, c, accName, streamName) + } } - st.consumers = append(st.consumers, cc) - st.subjects = append(st.subjects, subject) + test(t, &testParams{ + cluster: t.Name(), + streams: streams, + producers: 10, + consumers: 10, + restarts: tp.restarts, + rolloutRestart: tp.rolloutRestart, + ldmRestart: tp.ldmRestart, + checkHealthz: tp.checkHealthz, + restartWait: tp.restartWait, + expect: expect, + duration: testDuration, + producerMsgSize: 1024, + producerMsgs: 100_000, + }) } - streams[i] = st } - expect := func(t *testing.T, nc *nats.Conn, js nats.JetStreamContext, c *cluster) { - time.Sleep(testDuration + 1*time.Minute) - accName := "js" - for i := 0; i < totalStreams; i++ { - streamName := fmt.Sprintf("test:%d", i) - checkMsgsEqual(t, c, accName, streamName) - } + for prefix, st := range map[string]nats.StorageType{"R3F": nats.FileStorage, "R3M": nats.MemoryStorage} { + t.Run(prefix, func(t *testing.T) { + for rolloutType, params := range map[string]*testParams{ + // Rollouts using graceful restarts and checking healthz. + "ldm": &testParams{ + restarts: 1, + rolloutRestart: true, + ldmRestart: true, + checkHealthz: true, + restartWait: 45 * time.Second, + }, + // Non graceful restarts calling Shutdown, but using healthz on startup. + "term": &testParams{ + restarts: 1, + rolloutRestart: true, + ldmRestart: false, + checkHealthz: true, + restartWait: 45 * time.Second, + }, + } { + t.Run(rolloutType, func(t *testing.T) { + t.Run("limits", shared(t, &nats.StreamConfig{ + Retention: nats.LimitsPolicy, + Storage: st, + }, params)) + t.Run("wq", shared(t, &nats.StreamConfig{ + Retention: nats.WorkQueuePolicy, + Storage: st, + }, params)) + t.Run("interest", shared(t, &nats.StreamConfig{ + Retention: nats.InterestPolicy, + Storage: st, + }, params)) + t.Run("limits:dn:max-per-subject", shared(t, &nats.StreamConfig{ + Retention: nats.LimitsPolicy, + Storage: st, + MaxMsgsPerSubject: 1, + Discard: nats.DiscardNew, + }, params)) + t.Run("wq:dn:max-msgs", shared(t, &nats.StreamConfig{ + Retention: nats.WorkQueuePolicy, + Storage: st, + MaxMsgs: 10_000, + Discard: nats.DiscardNew, + }, params)) + t.Run("wq:dn-per-subject:max-msgs", shared(t, &nats.StreamConfig{ + Retention: nats.WorkQueuePolicy, + Storage: st, + MaxMsgs: 10_000, + MaxMsgsPerSubject: 100, + Discard: nats.DiscardNew, + DiscardNewPerSubject: true, + }, params)) + }) + } + }) } - test(t, &testParams{ - cluster: t.Name(), - streams: streams, - producers: 10, - consumers: 10, - restarts: 1, - rolloutRestart: true, - ldmRestart: true, - checkHealthz: true, - restartWait: 45 * time.Second, - expect: expect, - duration: testDuration, - producerMsgSize: 1024, - producerMsgs: 100_000, - }) }) }