From e4f7287f452a9d0cff75947560bc2aad2cb12d9e Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Mon, 1 Feb 2021 23:07:15 -0800 Subject: [PATCH] js: Make consumer attach based on durable name, remove Attach Signed-off-by: Waldemar Quevedo --- js.go | 84 +++++++++++++++++++++++++++---------------------- jsm.go | 8 ++--- test/js_test.go | 56 ++++++++++++++++++++++++++++----- 3 files changed, 98 insertions(+), 50 deletions(-) diff --git a/js.go b/js.go index 2e35b189e..4fa1029ad 100644 --- a/js.go +++ b/js.go @@ -448,15 +448,17 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] return nil, ErrPullModeNotAllowed } - var err error - var stream, deliver string - var ccfg *ConsumerConfig - - // If we are attaching to an existing consumer. - shouldAttach := o.stream != _EMPTY_ && o.consumer != _EMPTY_ || o.cfg.DeliverSubject != _EMPTY_ - shouldCreate := !shouldAttach + var ( + err error + shouldCreate bool + ccfg *ConsumerConfig + deliver string + stream = o.stream + consumer = o.consumer + requiresAPI = (stream == _EMPTY_ && consumer == _EMPTY_) && o.cfg.DeliverSubject == _EMPTY_ + ) - if js.direct && shouldCreate { + if js.direct && requiresAPI { return nil, ErrDirectModeRequired } @@ -466,33 +468,48 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] } else { deliver = NewInbox() } - } else if shouldAttach { - info, err := js.getConsumerInfo(o.stream, o.consumer) + } else { + // Find the stream mapped to the subject. + stream, err = js.lookupStreamBySubject(subj) if err != nil { return nil, err } - ccfg = &info.Config - // Make sure this new subject matches or is a subset. - if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { - return nil, ErrSubjectMismatch + // With an explicit durable name, then can lookup + // the consumer to which it should be attaching to. + var info *ConsumerInfo + consumer = o.cfg.Durable + if consumer != _EMPTY_ { + // Only create in case there is no consumer already. + info, err = js.ConsumerInfo(stream, consumer) + if err != nil && err.Error() != `consumer not found` { + return nil, err + } } - if ccfg.DeliverSubject != _EMPTY_ { - deliver = ccfg.DeliverSubject + + if info != nil { + // Attach using the found consumer config. + ccfg = &info.Config + + // Make sure this new subject matches or is a subset. + if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { + return nil, ErrSubjectMismatch + } + + if ccfg.DeliverSubject != _EMPTY_ { + deliver = ccfg.DeliverSubject + } else { + deliver = NewInbox() + } } else { + shouldCreate = true deliver = NewInbox() + if !isPullMode { + cfg.DeliverSubject = deliver + } + // Do filtering always, server will clear as needed. + cfg.FilterSubject = subj } - } else { - stream, err = js.lookupStreamBySubject(subj) - if err != nil { - return nil, err - } - deliver = NewInbox() - if !isPullMode { - cfg.DeliverSubject = deliver - } - // Do filtering always, server will clear as needed. - cfg.FilterSubject = subj } var sub *Subscription @@ -502,7 +519,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] ocb := cb cb = func(m *Msg) { ocb(m); m.Ack() } } - sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js}) if err != nil { return nil, err @@ -564,8 +580,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] sub.jsi.consumer = info.Name sub.jsi.deliver = info.Config.DeliverSubject } else { - sub.jsi.stream = o.stream - sub.jsi.consumer = o.consumer + sub.jsi.stream = stream + sub.jsi.consumer = consumer if js.direct { sub.jsi.deliver = o.cfg.DeliverSubject } else { @@ -637,14 +653,6 @@ func Durable(name string) SubOpt { }) } -func Attach(stream, consumer string) SubOpt { - return subOptFn(func(opts *subOpts) error { - opts.stream = stream - opts.consumer = consumer - return nil - }) -} - func Pull(batchSize int) SubOpt { return subOptFn(func(opts *subOpts) error { if batchSize == 0 { diff --git a/jsm.go b/jsm.go index 5034ce4a3..36b7cd1d6 100644 --- a/jsm.go +++ b/jsm.go @@ -212,12 +212,12 @@ type consumerDeleteResponse struct { } // DeleteConsumer deletes a Consumer. -func (js *js) DeleteConsumer(stream, durable string) error { +func (js *js) DeleteConsumer(stream, consumer string) error { if stream == _EMPTY_ { return ErrStreamNameRequired } - dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, durable)) + dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer)) r, err := js.nc.Request(dcSubj, nil, js.wait) if err != nil { return err @@ -233,8 +233,8 @@ func (js *js) DeleteConsumer(stream, durable string) error { } // ConsumerInfo returns information about a Consumer. -func (js *js) ConsumerInfo(stream, durable string) (*ConsumerInfo, error) { - return js.getConsumerInfo(stream, durable) +func (js *js) ConsumerInfo(stream, consumer string) (*ConsumerInfo, error) { + return js.getConsumerInfo(stream, consumer) } // ConsumerLister fetches pages of ConsumerInfo objects. This object is not diff --git a/test/js_test.go b/test/js_test.go index 98f391bf5..ef67703fa 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -240,6 +240,26 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } + expectConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo { + t.Helper() + cl := js.NewConsumerLister("TEST") + if !cl.Next() { + if err := cl.Err(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + t.Fatalf("Unexpected consumer lister next") + } + p := cl.Page() + if len(p) != expected { + t.Fatalf("Expected %d consumers, got: %d", expected, len(p)) + } + if err := cl.Err(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + return p + } + // Create the stream using our client API. _, err = js.AddStream(&nats.StreamConfig{ Name: "TEST", @@ -343,6 +363,7 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Expected to have ack'd all %d messages, got ack floor of %d", toSend, info.AckFloor.Consumer) } sub.Unsubscribe() + expectConsumers(t, 3) // Now create a sync subscriber that is durable. dname := "derek" @@ -351,13 +372,32 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() + expectConsumers(t, 4) // Make sure we registered as a durable. if info, _ := sub.ConsumerInfo(); info.Config.Durable != dname { t.Fatalf("Expected durable name to be set to %q, got %q", dname, info.Config.Durable) } deliver := sub.Subject - sub.Unsubscribe() + + // Remove subscription, but do not delete consumer. + sub.Drain() + nc.Flush() + expectConsumers(t, 4) + + // Reattach using the same consumer. + sub, err = js.SubscribeSync("foo", nats.Durable(dname)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if deliver != sub.Subject { + t.Fatal("Expected delivery subject to be the same after reattach") + } + expectConsumers(t, 4) + + // Cleanup the consumer to be able to create again with a different delivery subject. + js.DeleteConsumer("TEST", dname) + expectConsumers(t, 3) // Create again and make sure that works and that we attach to the same durable with different delivery. sub, err = js.SubscribeSync("foo", nats.Durable(dname)) @@ -365,6 +405,7 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() + expectConsumers(t, 4) if deliver == sub.Subject { t.Fatalf("Expected delivery subject to be different then %q", deliver) @@ -372,7 +413,7 @@ func TestJetStreamSubscribe(t *testing.T) { deliver = sub.Subject // Now test that we can attach to an existing durable. - sub, err = js.SubscribeSync("foo", nats.Attach(mset.Name(), dname)) + sub, err = js.SubscribeSync("foo", nats.Durable(dname)) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -425,7 +466,7 @@ func TestJetStreamSubscribe(t *testing.T) { // Test that if we are attaching that the subjects will match up. rip from // above was created with a filtered subject of bar, so this should fail. - _, err = js.SubscribeSync("baz", nats.Attach(mset.Name(), "rip"), nats.Pull(batch)) + _, err = js.SubscribeSync("baz", nats.Durable("rip"), nats.Pull(batch)) if err != nats.ErrSubjectMismatch { t.Fatalf("Expected a %q error but got %q", nats.ErrSubjectMismatch, err) } @@ -435,7 +476,7 @@ func TestJetStreamSubscribe(t *testing.T) { js.Publish("bar", msg) } - sub, err = js.SubscribeSync("bar", nats.Attach(mset.Name(), "rip"), nats.Pull(batch)) + sub, err = js.SubscribeSync("bar", nats.Durable("rip"), nats.Pull(batch)) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -943,7 +984,7 @@ func TestJetStreamImportDirectOnly(t *testing.T) { var sub *nats.Subscription - waitForPending := func(n int) { + waitForPending := func(t *testing.T, n int) { t.Helper() timeout := time.Now().Add(2 * time.Second) for time.Now().Before(timeout) { @@ -961,7 +1002,7 @@ func TestJetStreamImportDirectOnly(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - waitForPending(toSend) + waitForPending(t, toSend) // Ack the messages from the push consumer. for i := 0; i < toSend; i++ { @@ -983,8 +1024,7 @@ func TestJetStreamImportDirectOnly(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - - waitForPending(batch) + waitForPending(t, batch) for i := 0; i < toSend; i++ { m, err := sub.NextMsg(100 * time.Millisecond)