Skip to content

Commit

Permalink
Lock when bumping clfs. Make sure to unlock mset lock before bumpCLFS.
Browse files Browse the repository at this point in the history
Locking order is clMu then mset lock.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Mar 7, 2024
1 parent b9c2d75 commit 677c69a
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions server/stream.go
Expand Up @@ -4396,7 +4396,9 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if traceOnly {
return
}
mset.clMu.Lock()
mset.clfs++
mset.clMu.Unlock()
}

// Apply the input subject transform if any
Expand Down Expand Up @@ -4427,8 +4429,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Bail here if sealed.
if isSealed {
outq := mset.outq
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond && outq != nil {
resp.PubAck = &PubAck{Stream: name}
resp.Error = ApiErrors[JSStreamSealedErr]
Expand Down Expand Up @@ -4495,8 +4497,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if !isClustered || traceOnly {
// Expected stream.
if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamNotMatchError()
Expand All @@ -4510,8 +4512,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Dedupe detection.
if msgId = getMsgId(hdr); msgId != _EMPTY_ {
if dde := mset.checkMsgId(msgId); dde != nil {
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
response := append(pubAck, strconv.FormatUint(dde.seq, 10)...)
response = append(response, ",\"duplicate\": true}"...)
Expand All @@ -4534,8 +4536,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
fseq, err = 0, nil
}
if err != nil || fseq != seq {
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamWrongLastSequenceError(fseq)
Expand All @@ -4549,8 +4551,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Expected last sequence.
if seq, exists := getExpectedLastSeq(hdr); exists && seq != mset.lseq {
mlseq := mset.lseq
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamWrongLastSequenceError(mlseq)
Expand All @@ -4566,8 +4568,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
if lmsgId != mset.lmsgId {
last := mset.lmsgId
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamWrongLastMsgIDError(last)
Expand All @@ -4580,8 +4582,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Check for any rollups.
if rollup := getRollup(hdr); rollup != _EMPTY_ {
if !mset.cfg.AllowRollup || mset.cfg.DenyPurge {
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamRollupFailedError(errors.New("rollup not permitted"))
Expand All @@ -4596,8 +4598,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
case JSMsgRollupAll:
rollupAll = true
default:
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
err := fmt.Errorf("rollup value invalid: %q", rollup)
if canRespond {
resp.PubAck = &PubAck{Stream: name}
Expand All @@ -4619,8 +4621,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,

// Check to see if we are over the max msg size.
if maxMsgSize >= 0 && (len(hdr)+len(msg)) > maxMsgSize {
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamMessageExceedsMaximumError()
Expand All @@ -4631,8 +4633,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}

if len(hdr) > math.MaxUint16 {
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamHeaderExceedsMaximumError()
Expand All @@ -4645,8 +4647,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Check to see if we have exceeded our limits.
if js.limitsExceeded(stype) {
s.resourcesExceededError()
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSInsufficientResourcesError()
Expand Down Expand Up @@ -4773,8 +4775,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
mset.store.FastState(&state)
mset.lseq = state.LastSeq
mset.lmsgId = olmsgId
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()

switch err {
case ErrMaxMsgs, ErrMaxBytes, ErrMaxMsgsPerSubject, ErrMsgTooLarge:
Expand Down

0 comments on commit 677c69a

Please sign in to comment.