From 3e12a5e6365bea8f49d9d4559f1cc67bf6966499 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 29 Jun 2022 08:03:29 -0700 Subject: [PATCH] Allow filtered stream mirrors Signed-off-by: Derek Collison --- server/jetstream_cluster_test.go | 60 ++++++++++++++++++++++++++++++++ server/stream.go | 15 +++----- 2 files changed, 65 insertions(+), 10 deletions(-) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 025ed847232..a5ebb6f46bf 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -11218,3 +11218,63 @@ func TestJetStreamClusterConsumerAndStreamNamesWithPathSeparators(t *testing.T) _, err = js.AddConsumer("T", &nats.ConsumerConfig{Durable: `a\b`, AckPolicy: nats.AckExplicitPolicy}) require_Error(t, err, NewJSConsumerNameContainsPathSeparatorsError(), nats.ErrInvalidConsumerName) } + +func TestJetStreamClusterFilteredMirrors(t *testing.T) { + c := createJetStreamClusterExplicit(t, "MSR", 5) + defer c.shutdown() + + // Client for API requests. + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Origin + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar", "baz"}, + }) + require_NoError(t, err) + + msg := bytes.Repeat([]byte("Z"), 3) + for i := 0; i < 100; i++ { + js.PublishAsync("foo", msg) + js.PublishAsync("bar", msg) + js.PublishAsync("baz", msg) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // Create Mirror now. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "M", + Mirror: &nats.StreamSource{Name: "TEST", FilterSubject: "foo"}, + }) + require_NoError(t, err) + + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("M") + require_NoError(t, err) + if si.State.Msgs != 100 { + return fmt.Errorf("Expected 100 msgs, got state: %+v", si.State) + } + return nil + }) + + sub, err := js.PullSubscribe("foo", "d", nats.BindStream("M")) + require_NoError(t, err) + + // Make sure we only have "foo" and that sequence numbers preserved. + sseq, dseq := uint64(1), uint64(1) + for _, m := range fetchMsgs(t, sub, 100, 5*time.Second) { + require_True(t, m.Subject == "foo") + meta, err := m.Metadata() + require_NoError(t, err) + require_True(t, meta.Sequence.Consumer == dseq) + dseq++ + require_True(t, meta.Sequence.Stream == sseq) + sseq += 3 + } + +} diff --git a/server/stream.go b/server/stream.go index d77813136e4..bde3566a198 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1006,16 +1006,6 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi if len(cfg.Sources) > 0 { return StreamConfig{}, NewJSMirrorWithSourcesError() } - if cfg.Mirror.FilterSubject != _EMPTY_ { - return StreamConfig{}, NewJSMirrorWithSubjectFiltersError() - } - if cfg.Mirror.OptStartSeq > 0 && cfg.Mirror.OptStartTime != nil { - return StreamConfig{}, NewJSMirrorWithStartSeqAndTimeError() - } - if cfg.Duplicates != time.Duration(0) { - return StreamConfig{}, NewJSStreamInvalidConfigError( - errors.New("stream mirrors do not make use of a de-duplication window")) - } // We do not require other stream to exist anymore, but if we can see it check payloads. exists, maxMsgSize, subs := hasStream(cfg.Mirror.Name) if len(subs) > 0 { @@ -2050,6 +2040,11 @@ func (mset *stream) setupMirrorConsumer() error { req.Config.DeliverPolicy = DeliverAll } + // Filters + if mset.cfg.Mirror.FilterSubject != _EMPTY_ { + req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject + } + respCh := make(chan *JSApiConsumerCreateResponse, 1) reply := infoReplySubject() crSub, err := mset.subscribeInternal(reply, func(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {