Skip to content

Commit

Permalink
Add more cases to TestJetStreamClusterBusyStreams
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed May 20, 2024
1 parent e10cf16 commit 8cdd6c0
Showing 1 changed file with 118 additions and 51 deletions.
169 changes: 118 additions & 51 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1619,7 +1619,7 @@ NextServer:
}

func TestJetStreamClusterBusyStreams(t *testing.T) {
t.Skip("Too long for CI at the moment")
t.Skip()
type streamSetup struct {
config *nats.StreamConfig
consumers []*nats.ConsumerConfig
Expand Down Expand Up @@ -1722,7 +1722,8 @@ func TestJetStreamClusterBusyStreams(t *testing.T) {
}

for _, subject := range subjects {
_, err := js.Publish(subject, payload, nats.AckWait(200*time.Millisecond))
msgID := nats.MsgId(fmt.Sprintf("n:%d", n))
_, err := js.Publish(subject, payload, nats.AckWait(200*time.Millisecond), msgID)
if err == nil {
if nn := n.Add(1); int(nn) >= test.producerMsgs {
return
Expand Down Expand Up @@ -2062,58 +2063,124 @@ func TestJetStreamClusterBusyStreams(t *testing.T) {
})
})

t.Run("R3F/streams:30/limits", func(t *testing.T) {
testDuration := 3 * time.Minute
totalStreams := 30
consumersPerStream := 5
streams := make([]*streamSetup, totalStreams)
for i := 0; i < totalStreams; i++ {
name := fmt.Sprintf("test:%d", i)
st := &streamSetup{
config: &nats.StreamConfig{
Name: name,
Subjects: []string{fmt.Sprintf("test.%d.*", i)},
Replicas: 3,
Retention: nats.LimitsPolicy,
},
consumers: make([]*nats.ConsumerConfig, 0),
}
for j := 0; j < consumersPerStream; j++ {
subject := fmt.Sprintf("test.%d.%d", i, j)
name := fmt.Sprintf("A:%d:%d", i, j)
cc := &nats.ConsumerConfig{
Name: name,
Durable: name,
FilterSubject: subject,
AckPolicy: nats.AckExplicitPolicy,
t.Run("rollouts", func(t *testing.T) {
shared := func(t *testing.T, sc *nats.StreamConfig, tp *testParams) func(t *testing.T) {
return func(t *testing.T) {
testDuration := 3 * time.Minute
totalStreams := 30
consumersPerStream := 5
streams := make([]*streamSetup, totalStreams)
for i := 0; i < totalStreams; i++ {
name := fmt.Sprintf("test:%d", i)
st := &streamSetup{
config: &nats.StreamConfig{
Name: name,
Subjects: []string{fmt.Sprintf("test.%d.*", i)},
Replicas: 3,
Discard: sc.Discard,
Retention: sc.Retention,
Storage: sc.Storage,
MaxMsgs: sc.MaxMsgs,
MaxBytes: sc.MaxBytes,
MaxAge: sc.MaxAge,
},
consumers: make([]*nats.ConsumerConfig, 0),
}
for j := 0; j < consumersPerStream; j++ {
subject := fmt.Sprintf("test.%d.%d", i, j)
name := fmt.Sprintf("A:%d:%d", i, j)
cc := &nats.ConsumerConfig{
Name: name,
Durable: name,
FilterSubject: subject,
AckPolicy: nats.AckExplicitPolicy,
}
st.consumers = append(st.consumers, cc)
st.subjects = append(st.subjects, subject)
}
streams[i] = st
}
expect := func(t *testing.T, nc *nats.Conn, js nats.JetStreamContext, c *cluster) {
time.Sleep(testDuration + 1*time.Minute)
accName := "js"
for i := 0; i < totalStreams; i++ {
streamName := fmt.Sprintf("test:%d", i)
checkMsgsEqual(t, c, accName, streamName)
}
}
st.consumers = append(st.consumers, cc)
st.subjects = append(st.subjects, subject)
test(t, &testParams{
cluster: t.Name(),
streams: streams,
producers: 10,
consumers: 10,
restarts: tp.restarts,
rolloutRestart: tp.rolloutRestart,
ldmRestart: tp.ldmRestart,
checkHealthz: tp.checkHealthz,
restartWait: tp.restartWait,
expect: expect,
duration: testDuration,
producerMsgSize: 1024,
producerMsgs: 100_000,
})
}
streams[i] = st
}
expect := func(t *testing.T, nc *nats.Conn, js nats.JetStreamContext, c *cluster) {
time.Sleep(testDuration + 1*time.Minute)
accName := "js"
for i := 0; i < totalStreams; i++ {
streamName := fmt.Sprintf("test:%d", i)
checkMsgsEqual(t, c, accName, streamName)
}
for prefix, st := range map[string]nats.StorageType{"R3F": nats.FileStorage, "R3M": nats.MemoryStorage} {
t.Run(prefix, func(t *testing.T) {
for rolloutType, params := range map[string]*testParams{
// Rollouts using graceful restarts and checking healthz.
"ldm": &testParams{
restarts: 1,
rolloutRestart: true,
ldmRestart: true,
checkHealthz: true,
restartWait: 45 * time.Second,
},
// Non graceful restarts calling Shutdown, but using healthz on startup.
"term": &testParams{
restarts: 1,
rolloutRestart: true,
ldmRestart: false,
checkHealthz: true,
restartWait: 45 * time.Second,
},
} {
t.Run(rolloutType, func(t *testing.T) {
t.Run("limits", shared(t, &nats.StreamConfig{
Retention: nats.LimitsPolicy,
Storage: st,
}, params))
t.Run("wq", shared(t, &nats.StreamConfig{
Retention: nats.WorkQueuePolicy,
Storage: st,
}, params))
t.Run("interest", shared(t, &nats.StreamConfig{
Retention: nats.InterestPolicy,
Storage: st,
}, params))
t.Run("limits:dn:max-per-subject", shared(t, &nats.StreamConfig{
Retention: nats.LimitsPolicy,
Storage: st,
MaxMsgsPerSubject: 1,
Discard: nats.DiscardNew,
}, params))
t.Run("wq:dn:max-msgs", shared(t, &nats.StreamConfig{
Retention: nats.WorkQueuePolicy,
Storage: st,
MaxMsgs: 10_000,
Discard: nats.DiscardNew,
}, params))
t.Run("wq:dn-per-subject:max-msgs", shared(t, &nats.StreamConfig{
Retention: nats.WorkQueuePolicy,
Storage: st,
MaxMsgs: 10_000,
MaxMsgsPerSubject: 100,
Discard: nats.DiscardNew,
DiscardNewPerSubject: true,
}, params))
})
}
})
}
test(t, &testParams{
cluster: t.Name(),
streams: streams,
producers: 10,
consumers: 10,
restarts: 1,
rolloutRestart: true,
ldmRestart: true,
checkHealthz: true,
restartWait: 45 * time.Second,
expect: expect,
duration: testDuration,
producerMsgSize: 1024,
producerMsgs: 100_000,
})
})
}

0 comments on commit 8cdd6c0

Please sign in to comment.