Skip to content

Commit

Permalink
Added DeleteConsumer() subscription option
Browse files Browse the repository at this point in the history
If set, the JS consumer will be deleted:
- On Unsubscribe(), if error occurs, error is returned.
- After Drain (connection and/or subscription) completes. If error
occurs there, error is reported through async error cb.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Aug 11, 2021
1 parent 00b033f commit edbc1cd
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 75 deletions.
56 changes: 26 additions & 30 deletions js.go
Expand Up @@ -822,7 +822,7 @@ type jsSub struct {
deliver string
pull bool
durable bool
delCons bool
dc bool // Delete JS consumer

// Ordered consumers
ordered bool
Expand All @@ -841,30 +841,20 @@ type jsSub struct {
fcd uint64
}

func (sub *Subscription) jsiUnsubscribe(jsi *jsSub, drainMode bool) error {
// Deletes the JS Consumer.
// No connection nor subscription lock must be held on entry.
func (sub *Subscription) deleteConsumer() error {
sub.mu.Lock()
durable, delCons := jsi.durable, jsi.delCons
jsi := sub.jsi
if jsi == nil {
sub.mu.Unlock()
return nil
}
stream, consumer := jsi.stream, jsi.consumer
js := jsi.js
if jsi.hbc != nil {
jsi.hbc.Stop()
jsi.hbc = nil
}
sub.mu.Unlock()

// Delete the JS consumer only if the library created the JS consumer,
// in which case delCons==true, but even then, in drain mode if this
// is a durable, do not delete.
if !delCons || (drainMode && durable) {
return nil
}
// We don't want to possibly fail a drain because we were not able to
// delete the consumer.
err := js.DeleteConsumer(stream, consumer)
if drainMode {
return nil
}
return err
return js.DeleteConsumer(stream, consumer)
}

// SubOpt configures options for subscribing to JetStream consumers.
Expand Down Expand Up @@ -1117,6 +1107,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
dseq: 1,
pull: isPullMode,
nms: nms,
dc: o.dc,
}

sub, err = nc.subscribe(deliver, queue, cb, ch, isSync, jsi)
Expand Down Expand Up @@ -1222,15 +1213,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}
return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
}
} else {
// Since we created the JS consumer internally, mark that we should
// delete it on Unsubscribe().
sub.mu.Lock()
sub.jsi.delCons = true
} else if consumer == _EMPTY_ {
// Update our consumer name here which is filled in when we create the consumer.
if consumer == _EMPTY_ {
sub.jsi.consumer = info.Name
}
sub.mu.Lock()
sub.jsi.consumer = info.Name
sub.mu.Unlock()
}
}
Expand Down Expand Up @@ -1451,8 +1437,7 @@ func (sub *Subscription) activityCheck() {

if !active && !closed {
nc.mu.Lock()
errCB := nc.Opts.AsyncErrorCB
if errCB != nil {
if errCB := nc.Opts.AsyncErrorCB; errCB != nil {
nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
}
nc.mu.Unlock()
Expand Down Expand Up @@ -1578,6 +1563,8 @@ type subOpts struct {
mack bool
// For an ordered consumer.
ordered bool
// User wants the library to delete the JS consumer on sub.Unsubscribe()
dc bool
}

// OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages.
Expand Down Expand Up @@ -1775,6 +1762,15 @@ func IdleHeartbeat(duration time.Duration) SubOpt {
})
}

// DeleteConsumer instructs the library to delete the JetStream consumer
// when calling Subscription.Unsubscribe().
func DeleteConsumer() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.dc = true
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down
41 changes: 29 additions & 12 deletions nats.go
Expand Up @@ -3808,6 +3808,12 @@ func (nc *Conn) removeSub(s *Subscription) {
}
s.mch = nil

// If JS subscription the stop HB timer.
if jsi := s.jsi; jsi != nil && jsi.hbc != nil {
jsi.hbc.Stop()
jsi.hbc = nil
}

// Mark as invalid
s.closed = true
if s.pCond != nil {
Expand Down Expand Up @@ -3878,6 +3884,7 @@ func (s *Subscription) Unsubscribe() error {
s.mu.Lock()
conn := s.conn
closed := s.closed
dc := s.jsi != nil && s.jsi.dc
s.mu.Unlock()
if conn == nil || conn.IsClosed() {
return ErrConnectionClosed
Expand All @@ -3888,7 +3895,13 @@ func (s *Subscription) Unsubscribe() error {
if conn.IsDraining() {
return ErrConnectionDraining
}
return conn.unsubscribe(s, 0, false)
if err := conn.unsubscribe(s, 0, false); err != nil {
return err
}
if dc {
return s.deleteConsumer()
}
return nil
}

// checkDrained will watch for a subscription to be fully drained
Expand All @@ -3902,6 +3915,12 @@ func (nc *Conn) checkDrained(sub *Subscription) {
// is correct and the server will not send additional information.
nc.Flush()

sub.mu.Lock()
// For JS subscriptions, check if we are going to delete the
// JS consumer when drain completes.
dc := sub.jsi != nil && sub.jsi.dc
sub.mu.Unlock()

// Once we are here we just wait for Pending to reach 0 or
// any other state to exit this go routine.
for {
Expand All @@ -3921,6 +3940,15 @@ func (nc *Conn) checkDrained(sub *Subscription) {
nc.mu.Lock()
nc.removeSub(sub)
nc.mu.Unlock()
if dc {
if err := sub.deleteConsumer(); err != nil {
nc.mu.Lock()
if errCB := nc.Opts.AsyncErrorCB; errCB != nil {
nc.ach.push(func() { errCB(nc, sub, err) })
}
nc.mu.Unlock()
}
}
return
}

Expand Down Expand Up @@ -3959,17 +3987,6 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
sub.mu.Unlock()
}

// For JetStream consumers, need to clean up ephemeral consumers.
sub.mu.Lock()
jsi := sub.jsi
sub.mu.Unlock()
if jsi != nil && maxStr == _EMPTY_ {
err := sub.jsiUnsubscribe(jsi, drainMode)
if err != nil {
return err
}
}

nc.mu.Lock()
// ok here, but defer is expensive
defer nc.mu.Unlock()
Expand Down

0 comments on commit edbc1cd

Please sign in to comment.