diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 349f0fd7ac..2e5e533738 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -6352,651 +6352,3 @@ Consume3: t.Errorf("Consumers to same stream are at different sequences: %d vs %d", a, b) } } - -func TestJetStreamClusterWorkQueueStreamDiscardNewDesync(t *testing.T) { - t.Run("max msgs", func(t *testing.T) { - testJetStreamClusterWorkQueueStreamDiscardNewDesync(t, &nats.StreamConfig{ - Name: "WQTEST_MM", - Subjects: []string{"messages.*"}, - Replicas: 3, - MaxAge: 10 * time.Minute, - MaxMsgs: 100, - Retention: nats.WorkQueuePolicy, - Discard: nats.DiscardNew, - }) - }) - t.Run("max bytes", func(t *testing.T) { - testJetStreamClusterWorkQueueStreamDiscardNewDesync(t, &nats.StreamConfig{ - Name: "WQTEST_MB", - Subjects: []string{"messages.*"}, - Replicas: 3, - MaxAge: 10 * time.Minute, - MaxBytes: 1 * 1024 * 1024, - Retention: nats.WorkQueuePolicy, - Discard: nats.DiscardNew, - }) - }) -} - -func testJetStreamClusterWorkQueueStreamDiscardNewDesync(t *testing.T, sc *nats.StreamConfig) { - conf := ` - listen: 127.0.0.1:-1 - server_name: %s - jetstream: { - store_dir: '%s', - } - cluster { - name: %s - listen: 127.0.0.1:%d - routes = [%s] - } - system_account: sys - no_auth_user: js - accounts { - sys { - users = [ - { user: sys, pass: sys } - ] - } - js { - jetstream = enabled - users = [ - { user: js, pass: js } - ] - } - }` - c := createJetStreamClusterWithTemplate(t, conf, sc.Name, 3) - defer c.shutdown() - - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() - - cnc, cjs := jsClientConnect(t, c.randomServer()) - defer cnc.Close() - - _, err := js.AddStream(sc) - require_NoError(t, err) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - psub, err := cjs.PullSubscribe("messages.*", "consumer") - require_NoError(t, err) - - stepDown := func() { - _, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, sc.Name), nil, time.Second) - } - - // Messages will be produced and consumed in parallel, then once there are - // enough errors a leader election will be triggered. - var ( - wg sync.WaitGroup - received uint64 - errCh = make(chan error, 100_000) - receivedMap = make(map[string]*nats.Msg) - ) - wg.Add(1) - go func() { - tick := time.NewTicker(20 * time.Millisecond) - for { - select { - case <-ctx.Done(): - wg.Done() - return - case <-tick.C: - msgs, err := psub.Fetch(10, nats.MaxWait(200*time.Millisecond)) - if err != nil { - // The consumer will continue to timeout here eventually. - continue - } - for _, msg := range msgs { - received++ - receivedMap[msg.Subject] = msg - msg.Ack() - } - } - } - }() - - shouldDrop := make(map[string]error) - wg.Add(1) - go func() { - payload := []byte(strings.Repeat("A", 1024)) - tick := time.NewTicker(1 * time.Millisecond) - for i := 1; ; i++ { - select { - case <-ctx.Done(): - wg.Done() - return - case <-tick.C: - subject := fmt.Sprintf("messages.%d", i) - _, err := js.Publish(subject, payload, nats.RetryAttempts(0)) - if err != nil { - errCh <- err - } - // Capture the messages that have failed. - if err != nil { - shouldDrop[subject] = err - } - } - } - }() - - // Collect enough errors to cause things to get out of sync. - var errCount int -Setup: - for { - select { - case err = <-errCh: - errCount++ - if errCount%500 == 0 { - stepDown() - } else if errCount >= 2000 { - // Stop both producing and consuming. - cancel() - break Setup - } - case <-time.After(5 * time.Second): - // Unblock the test and continue. - cancel() - break Setup - } - } - - // Both goroutines should be exiting now.. - wg.Wait() - - // Let acks propagate for stream checks. - time.Sleep(250 * time.Millisecond) - - // Check messages that ought to have been dropped. - for subject := range receivedMap { - found, ok := shouldDrop[subject] - if ok { - t.Errorf("Should have dropped message published on %q since got error: %v", subject, found) - } - } -} - -// https://github.com/nats-io/nats-server/issues/5071 -func TestJetStreamClusterStreamPlacementDistribution(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 5) - defer c.shutdown() - - s := c.randomNonLeader() - nc, js := jsClientConnect(t, s) - defer nc.Close() - - for i := 1; i <= 10; i++ { - _, err := js.AddStream(&nats.StreamConfig{ - Name: fmt.Sprintf("TEST:%d", i), - Subjects: []string{fmt.Sprintf("foo.%d.*", i)}, - Replicas: 3, - }) - require_NoError(t, err) - } - - // 10 streams, 3 replicas div 5 servers. - expectedStreams := 10 * 3 / 5 - for _, s := range c.servers { - jsz, err := s.Jsz(nil) - require_NoError(t, err) - require_Equal(t, jsz.Streams, expectedStreams) - } -} - -func TestJetStreamClusterConsumerPauseViaConfig(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 3) - defer c.shutdown() - - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Replicas: 3, - }) - require_NoError(t, err) - - jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ - Name: "my_consumer", - Replicas: 3, - }) - - sub, err := js.PullSubscribe("foo", "", nats.Bind("TEST", "my_consumer")) - require_NoError(t, err) - - stepdown := func() { - t.Helper() - _, err := nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "my_consumer"), nil, time.Second) - require_NoError(t, err) - c.waitOnConsumerLeader(globalAccountName, "TEST", "my_consumer") - } - - publish := func(wait time.Duration) { - t.Helper() - for i := 0; i < 5; i++ { - _, err = js.Publish("foo", []byte("OK")) - require_NoError(t, err) - } - msgs, err := sub.Fetch(5, nats.MaxWait(wait)) - require_NoError(t, err) - require_Equal(t, len(msgs), 5) - } - - // This should be fast as there's no deadline. - publish(time.Second) - - // Now we're going to set the deadline. - deadline := jsTestPause_PauseConsumer(t, nc, "TEST", "my_consumer", time.Now().Add(time.Second*3)) - c.waitOnAllCurrent() - - // It will now take longer than 3 seconds. - publish(time.Second * 5) - require_True(t, time.Now().After(deadline)) - - // The next set of publishes after the deadline should now be fast. - publish(time.Second) - - // We'll kick the leader, but since we're after the deadline, this - // should still be fast. - stepdown() - publish(time.Second) - - // Now we're going to do an update and then immediately kick the - // leader. The pause should still be in effect afterwards. - deadline = jsTestPause_PauseConsumer(t, nc, "TEST", "my_consumer", time.Now().Add(time.Second*3)) - c.waitOnAllCurrent() - publish(time.Second * 5) - require_True(t, time.Now().After(deadline)) - - // The next set of publishes after the deadline should now be fast. - publish(time.Second) -} - -func TestJetStreamClusterConsumerPauseViaEndpoint(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 3) - defer c.shutdown() - - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"push", "pull"}, - Replicas: 3, - }) - require_NoError(t, err) - - t.Run("PullConsumer", func(t *testing.T) { - _, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ - Name: "pull_consumer", - }) - require_NoError(t, err) - - sub, err := js.PullSubscribe("pull", "", nats.Bind("TEST", "pull_consumer")) - require_NoError(t, err) - - // This should succeed as there's no pause, so it definitely - // shouldn't take more than a second. - for i := 0; i < 10; i++ { - _, err = js.Publish("pull", []byte("OK")) - require_NoError(t, err) - } - msgs, err := sub.Fetch(10, nats.MaxWait(time.Second)) - require_NoError(t, err) - require_Equal(t, len(msgs), 10) - - // Now we'll pause the consumer for 3 seconds. - deadline := time.Now().Add(time.Second * 3) - require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "pull_consumer", deadline).Equal(deadline)) - c.waitOnAllCurrent() - - // This should fail as we'll wait for only half of the deadline. - for i := 0; i < 10; i++ { - _, err = js.Publish("pull", []byte("OK")) - require_NoError(t, err) - } - _, err = sub.Fetch(10, nats.MaxWait(time.Until(deadline)/2)) - require_Error(t, err, nats.ErrTimeout) - - // This should succeed after a short wait, and when we're done, - // we should be after the deadline. - msgs, err = sub.Fetch(10) - require_NoError(t, err) - require_Equal(t, len(msgs), 10) - require_True(t, time.Now().After(deadline)) - - // This should succeed as there's no pause, so it definitely - // shouldn't take more than a second. - for i := 0; i < 10; i++ { - _, err = js.Publish("pull", []byte("OK")) - require_NoError(t, err) - } - msgs, err = sub.Fetch(10, nats.MaxWait(time.Second)) - require_NoError(t, err) - require_Equal(t, len(msgs), 10) - - require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "pull_consumer", time.Time{}).Equal(time.Time{})) - c.waitOnAllCurrent() - - // This should succeed as there's no pause, so it definitely - // shouldn't take more than a second. - for i := 0; i < 10; i++ { - _, err = js.Publish("pull", []byte("OK")) - require_NoError(t, err) - } - msgs, err = sub.Fetch(10, nats.MaxWait(time.Second)) - require_NoError(t, err) - require_Equal(t, len(msgs), 10) - }) - - t.Run("PushConsumer", func(t *testing.T) { - ch := make(chan *nats.Msg, 100) - _, err = js.ChanSubscribe("push", ch, nats.BindStream("TEST"), nats.ConsumerName("push_consumer")) - require_NoError(t, err) - - // This should succeed as there's no pause, so it definitely - // shouldn't take more than a second. - for i := 0; i < 10; i++ { - _, err = js.Publish("push", []byte("OK")) - require_NoError(t, err) - } - for i := 0; i < 10; i++ { - msg := require_ChanRead(t, ch, time.Second) - require_NotEqual(t, msg, nil) - } - - // Now we'll pause the consumer for 3 seconds. - deadline := time.Now().Add(time.Second * 3) - require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "push_consumer", deadline).Equal(deadline)) - c.waitOnAllCurrent() - - // This should succeed after a short wait, and when we're done, - // we should be after the deadline. - for i := 0; i < 10; i++ { - _, err = js.Publish("push", []byte("OK")) - require_NoError(t, err) - } - for i := 0; i < 10; i++ { - msg := require_ChanRead(t, ch, time.Second*5) - require_NotEqual(t, msg, nil) - require_True(t, time.Now().After(deadline)) - } - - // This should succeed as there's no pause, so it definitely - // shouldn't take more than a second. - for i := 0; i < 10; i++ { - _, err = js.Publish("push", []byte("OK")) - require_NoError(t, err) - } - for i := 0; i < 10; i++ { - msg := require_ChanRead(t, ch, time.Second) - require_NotEqual(t, msg, nil) - } - - require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "push_consumer", time.Time{}).Equal(time.Time{})) - c.waitOnAllCurrent() - - // This should succeed as there's no pause, so it definitely - // shouldn't take more than a second. - for i := 0; i < 10; i++ { - _, err = js.Publish("push", []byte("OK")) - require_NoError(t, err) - } - for i := 0; i < 10; i++ { - msg := require_ChanRead(t, ch, time.Second) - require_NotEqual(t, msg, nil) - } - }) -} - -func TestJetStreamClusterConsumerPauseTimerFollowsLeader(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 3) - defer c.shutdown() - - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Replicas: 3, - }) - require_NoError(t, err) - - deadline := time.Now().Add(time.Hour) - jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ - Name: "my_consumer", - PauseUntil: &deadline, - Replicas: 3, - }) - - for i := 0; i < 10; i++ { - c.waitOnConsumerLeader(globalAccountName, "TEST", "my_consumer") - c.waitOnAllCurrent() - - for _, s := range c.servers { - stream, err := s.gacc.lookupStream("TEST") - require_NoError(t, err) - - consumer := stream.lookupConsumer("my_consumer") - require_NotEqual(t, consumer, nil) - - isLeader := s.JetStreamIsConsumerLeader(globalAccountName, "TEST", "my_consumer") - - consumer.mu.RLock() - hasTimer := consumer.uptmr != nil - consumer.mu.RUnlock() - - require_Equal(t, isLeader, hasTimer) - } - - _, err = nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "my_consumer"), nil, time.Second) - require_NoError(t, err) - } -} - -func TestJetStreamClusterConsumerPauseHeartbeats(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 3) - defer c.shutdown() - - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Replicas: 3, - }) - require_NoError(t, err) - - deadline := time.Now().Add(time.Hour) - dsubj := "deliver_subj" - - ci := jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ - Name: "my_consumer", - PauseUntil: &deadline, - Heartbeat: time.Millisecond * 100, - DeliverSubject: dsubj, - }) - require_True(t, ci.Config.PauseUntil.Equal(deadline)) - - ch := make(chan *nats.Msg, 10) - _, err = nc.ChanSubscribe(dsubj, ch) - require_NoError(t, err) - - for i := 0; i < 20; i++ { - msg := require_ChanRead(t, ch, time.Millisecond*200) - require_Equal(t, msg.Header.Get("Status"), "100") - require_Equal(t, msg.Header.Get("Description"), "Idle Heartbeat") - } -} - -func TestJetStreamClusterConsumerPauseAdvisories(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 3) - defer c.shutdown() - - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() - - checkAdvisory := func(msg *nats.Msg, shouldBePaused bool, deadline time.Time) { - t.Helper() - var advisory JSConsumerPauseAdvisory - require_NoError(t, json.Unmarshal(msg.Data, &advisory)) - require_Equal(t, advisory.Stream, "TEST") - require_Equal(t, advisory.Consumer, "my_consumer") - require_Equal(t, advisory.Paused, shouldBePaused) - require_True(t, advisory.PauseUntil.Equal(deadline)) - } - - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Replicas: 3, - }) - require_NoError(t, err) - - ch := make(chan *nats.Msg, 10) - _, err = nc.ChanSubscribe(JSAdvisoryConsumerPausePre+".TEST.my_consumer", ch) - require_NoError(t, err) - - deadline := time.Now().Add(time.Second) - jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ - Name: "my_consumer", - PauseUntil: &deadline, - Replicas: 3, - }) - - // First advisory should tell us that the consumer was paused - // on creation. - msg := require_ChanRead(t, ch, time.Second*2) - checkAdvisory(msg, true, deadline) - require_Len(t, len(ch), 0) // Should only receive one advisory. - - // The second one for the unpause. - msg = require_ChanRead(t, ch, time.Second*2) - checkAdvisory(msg, false, deadline) - require_Len(t, len(ch), 0) // Should only receive one advisory. - - // Now we'll pause the consumer for a second using the API. - deadline = time.Now().Add(time.Second) - require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "my_consumer", deadline).Equal(deadline)) - - // Third advisory should tell us about the pause via the API. - msg = require_ChanRead(t, ch, time.Second*2) - checkAdvisory(msg, true, deadline) - require_Len(t, len(ch), 0) // Should only receive one advisory. - - // Finally that should unpause. - msg = require_ChanRead(t, ch, time.Second*2) - checkAdvisory(msg, false, deadline) - require_Len(t, len(ch), 0) // Should only receive one advisory. - - // Now we're going to set the deadline into the future so we can - // see what happens when we kick leaders or restart. - deadline = time.Now().Add(time.Hour) - require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "my_consumer", deadline).Equal(deadline)) - - // Setting the deadline should have generated an advisory. - msg = require_ChanRead(t, ch, time.Second) - checkAdvisory(msg, true, deadline) - require_Len(t, len(ch), 0) // Should only receive one advisory. - - // Try to kick the consumer leader. - srv := c.consumerLeader(globalAccountName, "TEST", "my_consumer") - srv.JetStreamStepdownConsumer(globalAccountName, "TEST", "my_consumer") - c.waitOnConsumerLeader(globalAccountName, "TEST", "my_consumer") - - // This shouldn't have generated an advisory. - require_NoChanRead(t, ch, time.Second) -} - -func TestJetStreamClusterConsumerPauseSurvivesRestart(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 3) - defer c.shutdown() - - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() - - checkTimer := func(s *Server) { - stream, err := s.gacc.lookupStream("TEST") - require_NoError(t, err) - - consumer := stream.lookupConsumer("my_consumer") - require_NotEqual(t, consumer, nil) - - consumer.mu.RLock() - timer := consumer.uptmr - consumer.mu.RUnlock() - require_True(t, timer != nil) - } - - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Replicas: 3, - }) - require_NoError(t, err) - - deadline := time.Now().Add(time.Hour) - jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ - Name: "my_consumer", - PauseUntil: &deadline, - Replicas: 3, - }) - - // First try with just restarting the consumer leader. - srv := c.consumerLeader(globalAccountName, "TEST", "my_consumer") - srv.Shutdown() - c.restartServer(srv) - c.waitOnAllCurrent() - c.waitOnConsumerLeader(globalAccountName, "TEST", "my_consumer") - leader := c.consumerLeader(globalAccountName, "TEST", "my_consumer") - require_True(t, leader != nil) - checkTimer(leader) - - // Then try restarting the entire cluster. - c.stopAll() - c.restartAllSamePorts() - c.waitOnAllCurrent() - c.waitOnConsumerLeader(globalAccountName, "TEST", "my_consumer") - leader = c.consumerLeader(globalAccountName, "TEST", "my_consumer") - require_True(t, leader != nil) - checkTimer(leader) -} - -func TestJetStreamClusterStreamPedantic(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 3) - defer c.shutdown() - - nc, _ := jsClientConnect(t, c.randomServer()) - defer nc.Close() - - _, err := addStreamPedanticWithError(t, nc, &StreamRequest{ - StreamConfig: StreamConfig{ - Name: "TEST", - MaxAge: 1 * time.Minute, - Duplicates: 1 * time.Hour, - Storage: FileStorage, - }, - Pedantic: true, - }) - require_Error(t, err) - fmt.Printf("Error: %v\n", err.Description) - - _, err = addStreamPedanticWithError(t, nc, &StreamRequest{ - StreamConfig: StreamConfig{ - Name: "TEST", - MaxAge: 1 * time.Hour, - Duplicates: 1 * time.Minute, - Storage: FileStorage, - }, - Pedantic: true, - }) - if err != nil { - t.Fatalf("Error: %v", err) - } - -} diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 25faa9e599..0e2f920357 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -21,6 +21,7 @@ import ( "fmt" "math/rand" "sort" + "strconv" "strings" "sync" "sync/atomic"