diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 46bc5aa9fc..768f762d24 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -21853,3 +21853,78 @@ func TestJetStreamSyncInterval(t *testing.T) { }) } } + +func TestJetStreamFilteredSubjectUsesNewConsumerCreateSubject(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, _ := jsClientConnect(t, s) + defer nc.Close() + + extEndpoint := make(chan *nats.Msg, 1) + normalEndpoint := make(chan *nats.Msg, 1) + + _, err := nc.ChanSubscribe(JSApiConsumerCreateEx, extEndpoint) + require_NoError(t, err) + + _, err = nc.ChanSubscribe(JSApiConsumerCreate, normalEndpoint) + require_NoError(t, err) + + testStreamSource := func(name string, shouldBeExtended bool, ss StreamSource) { + t.Run(name, func(t *testing.T) { + req := StreamConfig{ + Name: name, + Storage: MemoryStorage, + Subjects: []string{fmt.Sprintf("foo.%s", name)}, + Sources: []*StreamSource{&ss}, + } + reqJson, err := json.Marshal(req) + require_NoError(t, err) + + _, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, name), reqJson, time.Second) + require_NoError(t, err) + + select { + case <-time.After(time.Second * 5): + t.Fatalf("Timed out waiting for receive consumer create") + case <-extEndpoint: + if !shouldBeExtended { + t.Fatalf("Expected normal consumer create, got extended") + } + case <-normalEndpoint: + if shouldBeExtended { + t.Fatalf("Expected extended consumer create, got normal") + } + } + }) + } + + testStreamSource("OneFilterSubject", true, StreamSource{ + Name: "source", + FilterSubject: "bar.>", + }) + + testStreamSource("OneTransform", true, StreamSource{ + Name: "source", + SubjectTransforms: []SubjectTransformConfig{ + { + Source: "bar.one.>", + Destination: "bar.two.>", + }, + }, + }) + + testStreamSource("TwoTransforms", false, StreamSource{ + Name: "source", + SubjectTransforms: []SubjectTransformConfig{ + { + Source: "bar.one.>", + Destination: "bar.two.>", + }, + { + Source: "baz.one.>", + Destination: "baz.two.>", + }, + }, + }) +} diff --git a/server/stream.go b/server/stream.go index 5ce722b09e..43f8f6485c 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2893,6 +2893,13 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T if req.Config.FilterSubject != _EMPTY_ { req.Config.Name = fmt.Sprintf("src-%s", createConsumerName()) subject = fmt.Sprintf(JSApiConsumerCreateExT, si.name, req.Config.Name, req.Config.FilterSubject) + } else if len(req.Config.FilterSubjects) == 1 { + req.Config.Name = fmt.Sprintf("src-%s", createConsumerName()) + // It is necessary to switch to using FilterSubject here as the extended consumer + // create API checks for it, so as to not accidentally allow multiple filtered subjects. + req.Config.FilterSubject = req.Config.FilterSubjects[0] + req.Config.FilterSubjects = nil + subject = fmt.Sprintf(JSApiConsumerCreateExT, si.name, req.Config.Name, req.Config.FilterSubject) } else { subject = fmt.Sprintf(JSApiConsumerCreateT, si.name) }