From b826bef0de09e3a1de97b0e3ce053a56dafdb5e1 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 17 Aug 2023 17:22:43 +0100 Subject: [PATCH] Recover in consumer assignment when asset already existed Signed-off-by: Neil Twigg --- server/consumer.go | 3 ++- server/jetstream_cluster.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 2ee3896a018..304fb32638b 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -214,7 +214,8 @@ var ( // Calculate accurate replicas for the consumer config with the parent stream config. func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int { if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas { - if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy { + if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy && consCfg.Replicas == 0 { + // Matches old-school ephemerals only, where the replica count is 0. return 1 } return strCfg.Replicas diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 12b44b33cb4..c31902ad9c5 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4001,7 +4001,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state var didCreate, isConfigUpdate, needsLocalResponse bool if o == nil { // Add in the consumer if needed. - if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false); err == nil { + if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, wasExisting); err == nil { didCreate = true } } else {