diff --git a/server/consumer.go b/server/consumer.go index fafb7b5a57a..45ad681509f 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1735,7 +1735,12 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { o.mu.Lock() // When we're done with signaling, we can replace the subjects. - o.subjf = newSubjf + // If filters were removed, set `o.subjf` to nil. + if len(newSubjf) == 0 { + o.subjf = nil + } else { + o.subjf = newSubjf + } } // Record new config for others that do not need special handling. @@ -3247,7 +3252,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { store := o.mset.store // If no filters are specified, optimize to fetch just non-filtered messages. - if o.subjf == nil { + if len(o.subjf) == 0 { // Grab next message applicable to us. // We will unlock here in case lots of contention, e.g. WQ. o.mu.Unlock() @@ -3308,7 +3313,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { // even if len == 0 or 1. // TODO(tp): we should have sort based off generics for server // to avoid reflection. - if o.subjf != nil && len(o.subjf) > 1 { + if len(o.subjf) > 1 { sort.Slice(o.subjf, func(i, j int) bool { if o.subjf[j].pmsg != nil && o.subjf[i].pmsg == nil { return false diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index d13765b5b26..6c030f73876 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -28,6 +28,56 @@ import ( "github.com/nats-io/nats.go" ) +func TestJetStreamConsumerMultipleFiltersRemoveFilters(t *testing.T) { + + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + mset, err := acc.addStream(&StreamConfig{ + Name: "TEST", + Retention: LimitsPolicy, + Subjects: []string{"one", "two", "three"}, + MaxAge: time.Second * 90, + }) + require_NoError(t, err) + + _, err = mset.addConsumer(&ConsumerConfig{ + Durable: "consumer", + FilterSubjects: []string{"one", "two"}, + }) + require_NoError(t, err) + + sendStreamMsg(t, nc, "one", "data") + sendStreamMsg(t, nc, "two", "data") + sendStreamMsg(t, nc, "three", "data") + + consumer, err := js.PullSubscribe("", "consumer", nats.Bind("TEST", "consumer")) + require_NoError(t, err) + + msgs, err := consumer.Fetch(1) + require_NoError(t, err) + require_True(t, len(msgs) == 1) + + _, err = mset.addConsumer(&ConsumerConfig{ + Durable: "consumer", + FilterSubjects: []string{}, + }) + require_NoError(t, err) + + msgs, err = consumer.Fetch(1) + require_NoError(t, err) + require_True(t, len(msgs) == 1) + + msgs, err = consumer.Fetch(1) + require_NoError(t, err) + require_True(t, len(msgs) == 1) + +} + func TestJetStreamConsumerMultipleFiltersRace(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown()