diff --git a/server/consumer.go b/server/consumer.go index b229b9fdbc..304fb32638 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -213,14 +213,14 @@ var ( // Calculate accurate replicas for the consumer config with the parent stream config. func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int { - if consCfg.Replicas == 0 { - if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy { + if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas { + if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy && consCfg.Replicas == 0 { + // Matches old-school ephemerals only, where the replica count is 0. return 1 } return strCfg.Replicas - } else { - return consCfg.Replicas } + return consCfg.Replicas } // Consumer is a jetstream consumer. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 6af4fa42ba..1734f09624 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4001,7 +4001,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state var didCreate, isConfigUpdate, needsLocalResponse bool if o == nil { // Add in the consumer if needed. - if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false); err == nil { + if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, wasExisting); err == nil { didCreate = true } } else { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index fbf46524ce..eaa412cc57 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -4985,3 +4985,44 @@ func TestJetStreamClusterStreamFailTrackingSnapshots(t *testing.T) { t.Fatalf("Expected no errors, got %d", len(errCh)) } } + +func TestJetStreamClusterOrphanConsumerSubjects(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.>", "bar.>"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "consumer_foo", + Durable: "consumer_foo", + FilterSubject: "foo.something", + }) + require_NoError(t, err) + + for _, replicas := range []int{3, 1, 3} { + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"bar.>"}, + Replicas: replicas, + }) + require_NoError(t, err) + c.waitOnAllCurrent() + } + + c.waitOnStreamLeader("$G", "TEST") + c.waitOnConsumerLeader("$G", "TEST", "consumer_foo") + + info, err := js.ConsumerInfo("TEST", "consumer_foo") + require_NoError(t, err) + require_True(t, info.Cluster != nil) + require_NotEqual(t, info.Cluster.Leader, "") + require_Equal(t, len(info.Cluster.Replicas), 2) +} diff --git a/server/test_test.go b/server/test_test.go index 64717f901e..50594710a3 100644 --- a/server/test_test.go +++ b/server/test_test.go @@ -14,7 +14,6 @@ package server import ( - "bytes" "fmt" "math/rand" "net/url" @@ -112,16 +111,16 @@ func require_Error(t *testing.T, err error, expected ...error) { t.Fatalf("Expected one of %v, got '%v'", expected, err) } -func require_Equal(t *testing.T, a, b string) { +func require_Equal[T comparable](t *testing.T, a, b T) { t.Helper() - if strings.Compare(a, b) != 0 { + if a != b { t.Fatalf("require equal, but got: %v != %v", a, b) } } -func require_NotEqual(t *testing.T, a, b [32]byte) { +func require_NotEqual[T comparable](t *testing.T, a, b T) { t.Helper() - if bytes.Equal(a[:], b[:]) { + if a == b { t.Fatalf("require not equal, but got: %v != %v", a, b) } }