Skip to content

Commit

Permalink
[FIXED] Possible lock inversion and incorrect delete of JS consumer
Browse files Browse the repository at this point in the history
- Some refactoring to prevent lock inversion (no connection publish
should be done under the subscription lock).
- Remove used of jsSub mutex since so far everything can be done
under the protection of the subscription's lock.
- Attempt to delete JS consumer on Unsubscribe *only* if the library
called AddConsumer and got a success.

Resolves #775
Resolves #776

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Aug 10, 2021
1 parent d1955c8 commit 00b033f
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 114 deletions.
157 changes: 72 additions & 85 deletions js.go
Expand Up @@ -812,7 +812,6 @@ type nextRequest struct {

// jsSub includes JetStream subscription info.
type jsSub struct {
mu sync.RWMutex
js *js

// For pull subscribers, this is the next message subject to send requests to.
Expand All @@ -823,7 +822,7 @@ type jsSub struct {
deliver string
pull bool
durable bool
attached bool
delCons bool

// Ordered consumers
ordered bool
Expand All @@ -838,27 +837,34 @@ type jsSub struct {
active bool
fc bool
cmeta string
fcs map[uint64]string
fcr string
fcd uint64
}

func (jsi *jsSub) unsubscribe(drainMode bool) error {
jsi.mu.Lock()
durable, attached := jsi.durable, jsi.attached
func (sub *Subscription) jsiUnsubscribe(jsi *jsSub, drainMode bool) error {
sub.mu.Lock()
durable, delCons := jsi.durable, jsi.delCons
stream, consumer := jsi.stream, jsi.consumer
js := jsi.js
if jsi.hbc != nil {
jsi.hbc.Stop()
jsi.hbc = nil
}
jsi.mu.Unlock()
sub.mu.Unlock()

if drainMode && (durable || attached) {
// Skip deleting consumer for durables/attached
// consumers when using drain mode.
// Delete the JS consumer only if the library created the JS consumer,
// in which case delCons==true, but even then, in drain mode if this
// is a durable, do not delete.
if !delCons || (drainMode && durable) {
return nil
}

return js.DeleteConsumer(stream, consumer)
// We don't want to possibly fail a drain because we were not able to
// delete the consumer.
err := js.DeleteConsumer(stream, consumer)
if drainMode {
return nil
}
return err
}

// SubOpt configures options for subscribing to JetStream consumers.
Expand Down Expand Up @@ -941,7 +947,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
ccfg *ConsumerConfig
info *ConsumerInfo
deliver string
attached bool
stream = o.stream
consumer = o.consumer
isDurable = o.cfg.Durable != _EMPTY_
Expand Down Expand Up @@ -1019,7 +1024,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
case info != nil:
// Attach using the found consumer config.
ccfg = &info.Config
attached = true

// Make sure this new subject matches or is a subset.
if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
Expand Down Expand Up @@ -1104,7 +1108,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
stream: stream,
consumer: consumer,
durable: isDurable,
attached: attached,
deliver: deliver,
hbs: hasHeartbeats,
hbi: o.cfg.Heartbeat,
Expand Down Expand Up @@ -1196,9 +1199,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
return nil, ErrSubjectMismatch
}

// Update attached status.
jsi.attached = true

// Use the deliver subject from latest consumer config to attach.
if info.Config.DeliverSubject != _EMPTY_ {
// We can't reuse the channel, so if one was passed, we need to create a new one.
Expand All @@ -1222,10 +1222,15 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}
return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
}
} else if consumer == _EMPTY_ {
// Update our consumer name here which is filled in when we create the consumer.
} else {
// Since we created the JS consumer internally, mark that we should
// delete it on Unsubscribe().
sub.mu.Lock()
sub.jsi.consumer = info.Name
sub.jsi.delCons = true
// Update our consumer name here which is filled in when we create the consumer.
if consumer == _EMPTY_ {
sub.jsi.consumer = info.Name
}
sub.mu.Unlock()
}
}
Expand Down Expand Up @@ -1267,10 +1272,11 @@ func isControlMessage(msg *Msg) bool {
return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg
}

func (jsi *jsSub) trackSequences(reply string) {
jsi.mu.Lock()
jsi.cmeta = reply
jsi.mu.Unlock()
// Keeps track of the incoming message's reply subject so that the consumer's
// state (deliver sequence, etc..) can be checked against heartbeats.
// Runs under the subscription lock
func (sub *Subscription) trackSequences(reply string) {
sub.jsi.cmeta = reply
}

// Check to make sure messages are arriving in order.
Expand All @@ -1290,16 +1296,12 @@ func (sub *Subscription) checkOrderedMsgs(m *Msg) bool {
sseq, dseq := uint64(parseNum(tokens[5])), uint64(parseNum(tokens[6]))

jsi := sub.jsi
jsi.mu.Lock()
if dseq != jsi.dseq {
rseq := jsi.sseq + 1
jsi.mu.Unlock()
sub.resetOrderedConsumer(rseq)
sub.resetOrderedConsumer(jsi.sseq + 1)
return true
}
// Update our tracking here.
jsi.dseq, jsi.sseq = dseq+1, sseq
jsi.mu.Unlock()
return false
}

Expand Down Expand Up @@ -1328,8 +1330,7 @@ func (sub *Subscription) applyNewSID() (osid int64) {
// Lock should be held.
func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
nc := sub.conn
closed := sub.closed
if sub.jsi == nil || nc == nil || closed {
if sub.jsi == nil || nc == nil || sub.closed {
return
}

Expand All @@ -1340,8 +1341,8 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
newDeliver := nc.newInbox()
sub.Subject = newDeliver

// Snapshot jsi under sub lock here.
jsi := sub.jsi
// Snapshot the new sid under sub lock.
nsid := sub.sid

// We are still in the low level readloop for the connection so we need
// to spin a go routine to try to create the new consumer.
Expand All @@ -1351,7 +1352,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
// This is done here in this go routine to prevent lock inversion.
nc.mu.Lock()
nc.bw.appendString(fmt.Sprintf(unsubProto, osid, _EMPTY_))
nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, sub.sid))
nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, nsid))
nc.kickFlusher()
nc.mu.Unlock()

Expand All @@ -1360,11 +1361,12 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
nc.unsubscribe(sub, 0, true)
}

jsi.mu.Lock()
sub.mu.Lock()
jsi := sub.jsi
// Reset some items in jsi.
jsi.dseq = 1
jsi.cmeta = _EMPTY_
jsi.fcs = nil
jsi.fcr, jsi.fcd = _EMPTY_, 0
jsi.deliver = newDeliver
// Reset consumer request for starting policy.
cfg := jsi.ccreq.Config
Expand All @@ -1375,7 +1377,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
ccSubj := fmt.Sprintf(apiConsumerCreateT, jsi.stream)
j, err := json.Marshal(jsi.ccreq)
js := jsi.js
jsi.mu.Unlock()
sub.mu.Unlock()

if err != nil {
pushErr(err)
Expand Down Expand Up @@ -1403,86 +1405,71 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
return
}

jsi.mu.Lock()
sub.mu.Lock()
jsi.consumer = cinfo.Name
jsi.mu.Unlock()
sub.mu.Unlock()
}()
}

// checkForFlowControlResponse will check to see if we should send a flow control response
// based on the delivered index.
// Lock should be held.
func (sub *Subscription) checkForFlowControlResponse(delivered uint64) {
jsi, nc := sub.jsi, sub.conn
if jsi == nil {
return
}

jsi.mu.Lock()
defer jsi.mu.Unlock()

if len(jsi.fcs) == 0 {
return
}

if reply := jsi.fcs[delivered]; reply != _EMPTY_ {
delete(jsi.fcs, delivered)
nc.Publish(reply, nil)
// based on the subscription current delivered index and the target.
// Runs under subscription lock
func (sub *Subscription) checkForFlowControlResponse() string {
// Caller has verified that there is a sub.jsi and fc
jsi := sub.jsi
if jsi.fcd == sub.delivered {
fcr := jsi.fcr
jsi.fcr, jsi.fcd = _EMPTY_, 0
return fcr
}
return _EMPTY_
}

// Record an inbound flow control message.
func (jsi *jsSub) scheduleFlowControlResponse(dfuture uint64, reply string) {
jsi.mu.Lock()
if jsi.fcs == nil {
jsi.fcs = make(map[uint64]string)
}
jsi.fcs[dfuture] = reply
jsi.mu.Unlock()
// Runs under subscription lock
func (sub *Subscription) scheduleFlowControlResponse(dfuture uint64, reply string) {
jsi := sub.jsi
jsi.fcr, jsi.fcd = reply, dfuture
}

// Checks for activity from our consumer.
// If we do not think we are active send an async error.
func (sub *Subscription) activityCheck() {
sub.mu.Lock()
jsi := sub.jsi
if jsi == nil {
sub.mu.Unlock()
return
}

jsi.mu.Lock()
active := jsi.active
jsi.hbc.Reset(jsi.hbi)
jsi.active = false
jsi.mu.Unlock()

if !active {
sub.mu.Lock()
nc := sub.conn
closed := sub.closed
sub.mu.Unlock()
nc := sub.conn
closed := sub.closed
sub.mu.Unlock()

if !closed {
nc.mu.Lock()
errCB := nc.Opts.AsyncErrorCB
if errCB != nil {
nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
}
nc.mu.Unlock()
if !active && !closed {
nc.mu.Lock()
errCB := nc.Opts.AsyncErrorCB
if errCB != nil {
nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
}
nc.mu.Unlock()
}
}

// scheduleHeartbeatCheck sets up the timer check to make sure we are active
// or receiving idle heartbeats..
func (sub *Subscription) scheduleHeartbeatCheck() {
sub.mu.Lock()
defer sub.mu.Unlock()

jsi := sub.jsi
if jsi == nil {
return
}

jsi.mu.Lock()
defer jsi.mu.Unlock()

if jsi.hbc == nil {
jsi.hbc = time.AfterFunc(jsi.hbi*hbcThresh, sub.activityCheck)
} else {
Expand All @@ -1503,10 +1490,10 @@ func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) {
// checkForSequenceMismatch will make sure we have not missed any messages since last seen.
func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) {
// Process heartbeat received, get latest control metadata if present.
jsi.mu.Lock()
s.mu.Lock()
ctrl, ordered := jsi.cmeta, jsi.ordered
jsi.active = true
jsi.mu.Unlock()
s.mu.Unlock()

if ctrl == _EMPTY_ {
return
Expand Down

0 comments on commit 00b033f

Please sign in to comment.