Skip to content

Commit

Permalink
js: Add cleanup of consumers on unsubscribe
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Jan 24, 2021
1 parent 58bf69a commit 42cb549
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 2 deletions.
9 changes: 7 additions & 2 deletions js.go
Expand Up @@ -503,7 +503,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
cb = func(m *Msg) { ocb(m); m.Ack() }
}

sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js})
sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js, attached: o.attached})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -533,7 +533,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
}

var ccSubj string
if cfg.Durable != _EMPTY_ {
isDurable := cfg.Durable != _EMPTY_
if isDurable {
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
} else {
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
Expand Down Expand Up @@ -563,6 +564,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
sub.jsi.stream = info.Stream
sub.jsi.consumer = info.Name
sub.jsi.deliver = info.Config.DeliverSubject
sub.jsi.durable = isDurable
} else {
sub.jsi.stream = o.stream
sub.jsi.consumer = o.consumer
Expand Down Expand Up @@ -624,6 +626,8 @@ type subOpts struct {
mack bool
// For creating or updating.
cfg *ConsumerConfig
// attached marks that a subscription was created using attach.
attached bool
}

func Durable(name string) SubOpt {
Expand All @@ -641,6 +645,7 @@ func Attach(stream, consumer string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.stream = stream
opts.consumer = consumer
opts.attached = true
return nil
})
}
Expand Down
30 changes: 30 additions & 0 deletions nats.go
Expand Up @@ -542,6 +542,30 @@ type jsSub struct {
stream string
deliver string
pull int
attached bool
durable bool
}

// unsubscribe deletes an ephemeral consumer from a JetStream subscription
// unless it is attached/direct only.
func (jsi *jsSub) unsubscribe() {
if jsi.attached || jsi.durable {
return
}

// Skip if in direct mode as well.
js := jsi.js
if js.direct {
return
}

err := js.DeleteConsumer(jsi.stream, jsi.consumer)
if err != nil {
nc := js.nc
nc.mu.Lock()
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) })
nc.mu.Unlock()
}
}

// Msg is a structure used by Subscribers and PublishMsg().
Expand Down Expand Up @@ -3515,6 +3539,7 @@ func (s *Subscription) Unsubscribe() error {
s.mu.Lock()
conn := s.conn
closed := s.closed
jsi := s.jsi
s.mu.Unlock()
if conn == nil || conn.IsClosed() {
return ErrConnectionClosed
Expand All @@ -3525,6 +3550,11 @@ func (s *Subscription) Unsubscribe() error {
if conn.IsDraining() {
return ErrConnectionDraining
}

if jsi != nil {
jsi.unsubscribe()
}

return conn.unsubscribe(s, 0, false)
}

Expand Down
121 changes: 121 additions & 0 deletions test/js_test.go
Expand Up @@ -1507,3 +1507,124 @@ func TestJetStreamSubscribe_AckDupInProgress(t *testing.T) {
t.Logf("Expected to receive multiple acks, got: %v", len(pings))
}
}

func TestJetStream_UnsubscribeDeletesConsumer(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

_, err = js.AddStream(&nats.StreamConfig{
Name: "foo",
Subjects: []string{"foo.A", "foo.B", "foo.C"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
js.Publish("foo.A", []byte("A"))
js.Publish("foo.B", []byte("B"))
js.Publish("foo.C", []byte("C"))

subA, err := js.SubscribeSync("foo.A")
if err != nil {
t.Fatal(err)
}
subB, err := js.SubscribeSync("foo.B", nats.Durable("B"))
if err != nil {
t.Fatal(err)
}

// There should be two consumers.
fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
t.Helper()
cl := js.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}

return p
}
fetchConsumers(t, 2)

t.Run("ephemeral consumer is deleted", func(t *testing.T) {
err = subA.Unsubscribe()
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

// Only subB should remain.
for _, ci := range fetchConsumers(t, 1) {
ciB, err := subB.ConsumerInfo()
if err != nil {
t.Fatal(err)
}
if ci.Name != ciB.Name {
t.Fatalf("Expected %v, got: %v", ciB.Name, ci.Name)
}
}
})

t.Run("attached consumer not deleted", func(t *testing.T) {
if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{
Durable: "wq",
AckPolicy: nats.AckExplicitPolicy,
// Need to specify filter subject here otherwise
// would get messages from foo.A as well.
FilterSubject: "foo.C",
}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Now test that we can attach to an existing durable.
subC, err := js.SubscribeSync("foo.C", nats.Attach("foo", "wq"), nats.Pull(1))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fetchConsumers(t, 2)

msg, err := subC.NextMsg(2 * time.Second)
if err != nil {
t.Errorf("Unexpected error getting message: %v", err)
}
got := string(msg.Data)
expected := "C"
if got != expected {
t.Errorf("Expected %v, got %v", expected, got)
}

// On unsubscribe there should still be 2 consumers.
subC.Unsubscribe()
fetchConsumers(t, 2)
})

t.Run("durable consumer not deleted", func(t *testing.T) {
err = subB.Unsubscribe()
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
fetchConsumers(t, 2)
})
}

0 comments on commit 42cb549

Please sign in to comment.