From 7d041da3c88eaa4216e426d7ec6183c27c579026 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 10 Sep 2023 11:07:27 -0700 Subject: [PATCH] Fix for datarace on clfs Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 9 +++------ server/stream.go | 14 +++++++++++++- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 9c4cf1067b..30a0384c7a 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2892,9 +2892,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco } } else if isRecovering { // On recovery, reset CLFS/FAILED. - mset.mu.Lock() - mset.clfs = ss.Failed - mset.mu.Unlock() + mset.setCLFS(ss.Failed) } } else if e.Type == EntryRemovePeer { js.mu.RLock() @@ -7268,7 +7266,7 @@ func (mset *stream) stateSnapshot() []byte { func (mset *stream) stateSnapshotLocked() []byte { // Decide if we can support the new style of stream snapshots. if mset.supportsBinarySnapshotLocked() { - snap, _ := mset.store.EncodedStreamState(mset.clfs) + snap, _ := mset.store.EncodedStreamState(mset.getCLFS()) return snap } @@ -7470,10 +7468,9 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ mset.clMu.Lock() if mset.clseq == 0 || mset.clseq < lseq { // Re-capture - lseq, clfs = mset.lastSeqAndCLFS() + lseq, clfs = mset.lseq, mset.clfs mset.clseq = lseq + clfs } - esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), mset.compressOK) mset.clseq++ diff --git a/server/stream.go b/server/stream.go index 015b9b2d6a..e673b15746 100644 --- a/server/stream.go +++ b/server/stream.go @@ -979,7 +979,7 @@ func (mset *stream) rebuildDedupe() { func (mset *stream) lastSeqAndCLFS() (uint64, uint64) { mset.mu.RLock() defer mset.mu.RUnlock() - return mset.lseq, mset.clfs + return mset.lseq, mset.getCLFS() } func (mset *stream) clearCLFS() uint64 { @@ -990,6 +990,18 @@ func (mset *stream) clearCLFS() uint64 { return clfs } +func (mset *stream) getCLFS() uint64 { + mset.clMu.Lock() + defer mset.clMu.Unlock() + return mset.clfs +} + +func (mset *stream) setCLFS(clfs uint64) { + mset.clMu.Lock() + mset.clfs = clfs + mset.clMu.Unlock() +} + func (mset *stream) lastSeq() uint64 { mset.mu.RLock() lseq := mset.lseq