Skip to content

Commit

Permalink
Merge e4f7287 into 3797661
Browse files Browse the repository at this point in the history
  • Loading branch information
wallyqs committed Feb 2, 2021
2 parents 3797661 + e4f7287 commit 0734f21
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 50 deletions.
84 changes: 46 additions & 38 deletions js.go
Expand Up @@ -448,15 +448,17 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
return nil, ErrPullModeNotAllowed
}

var err error
var stream, deliver string
var ccfg *ConsumerConfig

// If we are attaching to an existing consumer.
shouldAttach := o.stream != _EMPTY_ && o.consumer != _EMPTY_ || o.cfg.DeliverSubject != _EMPTY_
shouldCreate := !shouldAttach
var (
err error
shouldCreate bool
ccfg *ConsumerConfig
deliver string
stream = o.stream
consumer = o.consumer
requiresAPI = (stream == _EMPTY_ && consumer == _EMPTY_) && o.cfg.DeliverSubject == _EMPTY_
)

if js.direct && shouldCreate {
if js.direct && requiresAPI {
return nil, ErrDirectModeRequired
}

Expand All @@ -466,33 +468,48 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
} else {
deliver = NewInbox()
}
} else if shouldAttach {
info, err := js.getConsumerInfo(o.stream, o.consumer)
} else {
// Find the stream mapped to the subject.
stream, err = js.lookupStreamBySubject(subj)
if err != nil {
return nil, err
}

ccfg = &info.Config
// Make sure this new subject matches or is a subset.
if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
return nil, ErrSubjectMismatch
// With an explicit durable name, then can lookup
// the consumer to which it should be attaching to.
var info *ConsumerInfo
consumer = o.cfg.Durable
if consumer != _EMPTY_ {
// Only create in case there is no consumer already.
info, err = js.ConsumerInfo(stream, consumer)
if err != nil && err.Error() != `consumer not found` {
return nil, err
}
}
if ccfg.DeliverSubject != _EMPTY_ {
deliver = ccfg.DeliverSubject

if info != nil {
// Attach using the found consumer config.
ccfg = &info.Config

// Make sure this new subject matches or is a subset.
if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
return nil, ErrSubjectMismatch
}

if ccfg.DeliverSubject != _EMPTY_ {
deliver = ccfg.DeliverSubject
} else {
deliver = NewInbox()
}
} else {
shouldCreate = true
deliver = NewInbox()
if !isPullMode {
cfg.DeliverSubject = deliver
}
// Do filtering always, server will clear as needed.
cfg.FilterSubject = subj
}
} else {
stream, err = js.lookupStreamBySubject(subj)
if err != nil {
return nil, err
}
deliver = NewInbox()
if !isPullMode {
cfg.DeliverSubject = deliver
}
// Do filtering always, server will clear as needed.
cfg.FilterSubject = subj
}

var sub *Subscription
Expand All @@ -502,7 +519,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
ocb := cb
cb = func(m *Msg) { ocb(m); m.Ack() }
}

sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js})
if err != nil {
return nil, err
Expand Down Expand Up @@ -564,8 +580,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
sub.jsi.consumer = info.Name
sub.jsi.deliver = info.Config.DeliverSubject
} else {
sub.jsi.stream = o.stream
sub.jsi.consumer = o.consumer
sub.jsi.stream = stream
sub.jsi.consumer = consumer
if js.direct {
sub.jsi.deliver = o.cfg.DeliverSubject
} else {
Expand Down Expand Up @@ -637,14 +653,6 @@ func Durable(name string) SubOpt {
})
}

func Attach(stream, consumer string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.stream = stream
opts.consumer = consumer
return nil
})
}

func Pull(batchSize int) SubOpt {
return subOptFn(func(opts *subOpts) error {
if batchSize == 0 {
Expand Down
8 changes: 4 additions & 4 deletions jsm.go
Expand Up @@ -212,12 +212,12 @@ type consumerDeleteResponse struct {
}

// DeleteConsumer deletes a Consumer.
func (js *js) DeleteConsumer(stream, durable string) error {
func (js *js) DeleteConsumer(stream, consumer string) error {
if stream == _EMPTY_ {
return ErrStreamNameRequired
}

dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, durable))
dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer))
r, err := js.nc.Request(dcSubj, nil, js.wait)
if err != nil {
return err
Expand All @@ -233,8 +233,8 @@ func (js *js) DeleteConsumer(stream, durable string) error {
}

// ConsumerInfo returns information about a Consumer.
func (js *js) ConsumerInfo(stream, durable string) (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, durable)
func (js *js) ConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, consumer)
}

// ConsumerLister fetches pages of ConsumerInfo objects. This object is not
Expand Down
56 changes: 48 additions & 8 deletions test/js_test.go
Expand Up @@ -240,6 +240,26 @@ func TestJetStreamSubscribe(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

expectConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
t.Helper()
cl := js.NewConsumerLister("TEST")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}

return p
}

// Create the stream using our client API.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Expand Down Expand Up @@ -343,6 +363,7 @@ func TestJetStreamSubscribe(t *testing.T) {
t.Fatalf("Expected to have ack'd all %d messages, got ack floor of %d", toSend, info.AckFloor.Consumer)
}
sub.Unsubscribe()
expectConsumers(t, 3)

// Now create a sync subscriber that is durable.
dname := "derek"
Expand All @@ -351,28 +372,48 @@ func TestJetStreamSubscribe(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
expectConsumers(t, 4)

// Make sure we registered as a durable.
if info, _ := sub.ConsumerInfo(); info.Config.Durable != dname {
t.Fatalf("Expected durable name to be set to %q, got %q", dname, info.Config.Durable)
}
deliver := sub.Subject
sub.Unsubscribe()

// Remove subscription, but do not delete consumer.
sub.Drain()
nc.Flush()
expectConsumers(t, 4)

// Reattach using the same consumer.
sub, err = js.SubscribeSync("foo", nats.Durable(dname))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if deliver != sub.Subject {
t.Fatal("Expected delivery subject to be the same after reattach")
}
expectConsumers(t, 4)

// Cleanup the consumer to be able to create again with a different delivery subject.
js.DeleteConsumer("TEST", dname)
expectConsumers(t, 3)

// Create again and make sure that works and that we attach to the same durable with different delivery.
sub, err = js.SubscribeSync("foo", nats.Durable(dname))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
expectConsumers(t, 4)

if deliver == sub.Subject {
t.Fatalf("Expected delivery subject to be different then %q", deliver)
}
deliver = sub.Subject

// Now test that we can attach to an existing durable.
sub, err = js.SubscribeSync("foo", nats.Attach(mset.Name(), dname))
sub, err = js.SubscribeSync("foo", nats.Durable(dname))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -425,7 +466,7 @@ func TestJetStreamSubscribe(t *testing.T) {

// Test that if we are attaching that the subjects will match up. rip from
// above was created with a filtered subject of bar, so this should fail.
_, err = js.SubscribeSync("baz", nats.Attach(mset.Name(), "rip"), nats.Pull(batch))
_, err = js.SubscribeSync("baz", nats.Durable("rip"), nats.Pull(batch))
if err != nats.ErrSubjectMismatch {
t.Fatalf("Expected a %q error but got %q", nats.ErrSubjectMismatch, err)
}
Expand All @@ -435,7 +476,7 @@ func TestJetStreamSubscribe(t *testing.T) {
js.Publish("bar", msg)
}

sub, err = js.SubscribeSync("bar", nats.Attach(mset.Name(), "rip"), nats.Pull(batch))
sub, err = js.SubscribeSync("bar", nats.Durable("rip"), nats.Pull(batch))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -943,7 +984,7 @@ func TestJetStreamImportDirectOnly(t *testing.T) {

var sub *nats.Subscription

waitForPending := func(n int) {
waitForPending := func(t *testing.T, n int) {
t.Helper()
timeout := time.Now().Add(2 * time.Second)
for time.Now().Before(timeout) {
Expand All @@ -961,7 +1002,7 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
waitForPending(toSend)
waitForPending(t, toSend)

// Ack the messages from the push consumer.
for i := 0; i < toSend; i++ {
Expand All @@ -983,8 +1024,7 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

waitForPending(batch)
waitForPending(t, batch)

for i := 0; i < toSend; i++ {
m, err := sub.NextMsg(100 * time.Millisecond)
Expand Down

0 comments on commit 0734f21

Please sign in to comment.