From bb404c5c5e18ae2260429f3c32b076557301bb22 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Wed, 6 Mar 2024 20:41:22 -0800 Subject: [PATCH 1/2] Add test that shows clfs behavior on dups Signed-off-by: Waldemar Quevedo --- server/jetstream_cluster_3_test.go | 120 +++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 62fd24f186..a3cdc4d25a 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -6968,3 +6968,123 @@ func TestJetStreamClusterConsumerPauseSurvivesRestart(t *testing.T) { require_True(t, leader != nil) checkTimer(leader) } + +func TestJetStreamClusterCLFSOnDuplicates(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + nc2, js2 := jsClientConnect(t, c.randomServer()) + defer nc2.Close() + + streamName := "TESTW" + _, err := js.AddStream(&nats.StreamConfig{ + Name: streamName, + Subjects: []string{"foo"}, + Replicas: 3, + Storage: nats.FileStorage, + MaxAge: 3 * time.Minute, + Duplicates: 2 * time.Minute, + }) + require_NoError(t, err) + + // Give the stream to be ready. + time.Sleep(3 * time.Second) + + var wg sync.WaitGroup + + // The test will be successful if it runs for this long without dup issues. + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + + go func() { + tick := time.NewTicker(10 * time.Second) + for { + select { + case <-ctx.Done(): + wg.Done() + return + case <-tick.C: + c.streamLeader(globalAccountName, streamName).JetStreamStepdownStream(globalAccountName, streamName) + } + } + }() + wg.Add(1) + + for i := 0; i < 5; i++ { + go func(i int) { + var err error + sub, err := js2.PullSubscribe("foo", fmt.Sprintf("A:%d", i)) + require_NoError(t, err) + + for { + select { + case <-ctx.Done(): + wg.Done() + return + default: + } + + msgs, err := sub.Fetch(100, nats.MaxWait(200*time.Millisecond)) + if err != nil { + continue + } + for _, msg := range msgs { + msg.Ack() + } + } + }(i) + wg.Add(1) + } + + // Sync producer that only does a couple of duplicates, cancel the test + // if we get too many errors without responses. + errCh := make(chan error, 10) + go func() { + // Try sync publishes normally in this state and see if it times out. + for i := 0; ; i++ { + select { + case <-ctx.Done(): + wg.Done() + return + default: + } + + var succeeded bool + var failures int + for n := 0; n < 10; n++ { + _, err := js.Publish("foo", []byte("test"), nats.MsgId(fmt.Sprintf("sync:checking:%d", i)), nats.RetryAttempts(30), nats.AckWait(500*time.Millisecond)) + if err != nil { + failures++ + continue + } + succeeded = true + } + if !succeeded { + errCh <- fmt.Errorf("Too many publishes failed with timeout: failures=%d, i=%d", failures, i) + } + } + }() + wg.Add(1) + +Loop: + for n := uint64(0); true; n++ { + select { + case <-ctx.Done(): + break Loop + case e := <-errCh: + t.Error(e) + break Loop + default: + } + // Cause a lot of duplicates very fast until producer stalls. + for i := 0; i < 128; i++ { + msgID := nats.MsgId(fmt.Sprintf("id.%d.%d", n, i)) + js.PublishAsync("foo", []byte("test"), msgID, nats.RetryAttempts(10)) + } + } + cancel() + wg.Wait() +} From efb27727aed0f9a57b99faaa4d7171c9d32dd882 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 7 Mar 2024 12:05:43 +0000 Subject: [PATCH 2/2] Tweaks from review Signed-off-by: Neil Twigg --- server/jetstream_cluster_3_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index a3cdc4d25a..77e16de9cb 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -6990,8 +6990,7 @@ func TestJetStreamClusterCLFSOnDuplicates(t *testing.T) { }) require_NoError(t, err) - // Give the stream to be ready. - time.Sleep(3 * time.Second) + c.waitOnStreamLeader(globalAccountName, streamName) var wg sync.WaitGroup @@ -7055,7 +7054,12 @@ func TestJetStreamClusterCLFSOnDuplicates(t *testing.T) { var succeeded bool var failures int for n := 0; n < 10; n++ { - _, err := js.Publish("foo", []byte("test"), nats.MsgId(fmt.Sprintf("sync:checking:%d", i)), nats.RetryAttempts(30), nats.AckWait(500*time.Millisecond)) + _, err := js.Publish( + "foo", []byte("test"), + nats.MsgId(fmt.Sprintf("sync:checking:%d", i)), + nats.RetryAttempts(30), + nats.AckWait(500*time.Millisecond), + ) if err != nil { failures++ continue