diff --git a/server/consumer.go b/server/consumer.go index e1696f5772..ad3b8849c0 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2569,9 +2569,9 @@ func (o *consumer) isFiltered() bool { // `isFiltered` need to be performant, so we do // as any checks as possible to avoid unnecessary work. // Here we avoid iteration over slices if there is only one subject in stream - // and one equal filter for the consumer. - if len(mset.cfg.Subjects) == 1 && len(o.subjf) == 1 && mset.cfg.Subjects[0] == o.subjf[0].subject { - return true + // and one filter for the consumer. + if len(mset.cfg.Subjects) == 1 && len(o.subjf) == 1 { + return mset.cfg.Subjects[0] != o.subjf[0].subject } // if the list is not equal length, we can return early, as this is filtered. diff --git a/server/jetstream_test.go b/server/jetstream_test.go index a777af68cf..54566aba84 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -20573,3 +20573,64 @@ func TestJetStreamConsumerAckFloorWithExpired(t *testing.T) { require_True(t, ci.NumPending == 0) require_True(t, ci.NumRedelivered == 0) } + +func TestJetStreamConsumerIsFiltered(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + acc := s.GlobalAccount() + + tests := []struct { + name string + streamSubjects []string + filters []string + isFiltered bool + }{ + { + name: "single_subject", + streamSubjects: []string{"one"}, + filters: []string{"one"}, + isFiltered: false, + }, + { + name: "single_subject_filtered", + streamSubjects: []string{"one.>"}, + filters: []string{"one.filter"}, + isFiltered: true, + }, + { + name: "multi_subject_non_filtered", + streamSubjects: []string{"multi", "foo", "bar.>"}, + filters: []string{"multi", "bar.>", "foo"}, + isFiltered: false, + }, + { + name: "multi_subject_filtered_wc", + streamSubjects: []string{"events", "data"}, + filters: []string{"data"}, + isFiltered: true, + }, + { + name: "multi_subject_filtered", + streamSubjects: []string{"machines", "floors"}, + filters: []string{"machines"}, + isFiltered: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + mset, err := acc.addStream(&StreamConfig{ + Name: test.name, + Subjects: test.streamSubjects, + }) + require_NoError(t, err) + + o, err := mset.addConsumer(&ConsumerConfig{ + FilterSubjects: test.filters, + Durable: test.name, + }) + require_NoError(t, err) + + require_True(t, o.isFiltered() == test.isFiltered) + }) + } +}