Skip to content

Commit

Permalink
Allow filtered stream mirrors
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Jun 29, 2022
1 parent b0580cd commit 3e12a5e
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 10 deletions.
60 changes: 60 additions & 0 deletions server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11218,3 +11218,63 @@ func TestJetStreamClusterConsumerAndStreamNamesWithPathSeparators(t *testing.T)
_, err = js.AddConsumer("T", &nats.ConsumerConfig{Durable: `a\b`, AckPolicy: nats.AckExplicitPolicy})
require_Error(t, err, NewJSConsumerNameContainsPathSeparatorsError(), nats.ErrInvalidConsumerName)
}

func TestJetStreamClusterFilteredMirrors(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MSR", 5)
defer c.shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Origin
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar", "baz"},
})
require_NoError(t, err)

msg := bytes.Repeat([]byte("Z"), 3)
for i := 0; i < 100; i++ {
js.PublishAsync("foo", msg)
js.PublishAsync("bar", msg)
js.PublishAsync("baz", msg)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

// Create Mirror now.
_, err = js.AddStream(&nats.StreamConfig{
Name: "M",
Mirror: &nats.StreamSource{Name: "TEST", FilterSubject: "foo"},
})
require_NoError(t, err)

checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("M")
require_NoError(t, err)
if si.State.Msgs != 100 {
return fmt.Errorf("Expected 100 msgs, got state: %+v", si.State)
}
return nil
})

sub, err := js.PullSubscribe("foo", "d", nats.BindStream("M"))
require_NoError(t, err)

// Make sure we only have "foo" and that sequence numbers preserved.
sseq, dseq := uint64(1), uint64(1)
for _, m := range fetchMsgs(t, sub, 100, 5*time.Second) {
require_True(t, m.Subject == "foo")
meta, err := m.Metadata()
require_NoError(t, err)
require_True(t, meta.Sequence.Consumer == dseq)
dseq++
require_True(t, meta.Sequence.Stream == sseq)
sseq += 3
}

}
15 changes: 5 additions & 10 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,16 +1006,6 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if len(cfg.Sources) > 0 {
return StreamConfig{}, NewJSMirrorWithSourcesError()
}
if cfg.Mirror.FilterSubject != _EMPTY_ {
return StreamConfig{}, NewJSMirrorWithSubjectFiltersError()
}
if cfg.Mirror.OptStartSeq > 0 && cfg.Mirror.OptStartTime != nil {
return StreamConfig{}, NewJSMirrorWithStartSeqAndTimeError()
}
if cfg.Duplicates != time.Duration(0) {
return StreamConfig{}, NewJSStreamInvalidConfigError(
errors.New("stream mirrors do not make use of a de-duplication window"))
}
// We do not require other stream to exist anymore, but if we can see it check payloads.
exists, maxMsgSize, subs := hasStream(cfg.Mirror.Name)
if len(subs) > 0 {
Expand Down Expand Up @@ -2050,6 +2040,11 @@ func (mset *stream) setupMirrorConsumer() error {
req.Config.DeliverPolicy = DeliverAll
}

// Filters
if mset.cfg.Mirror.FilterSubject != _EMPTY_ {
req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject
}

respCh := make(chan *JSApiConsumerCreateResponse, 1)
reply := infoReplySubject()
crSub, err := mset.subscribeInternal(reply, func(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
Expand Down

0 comments on commit 3e12a5e

Please sign in to comment.