From 787f6acf31aec48b6bb397e993b9f57ec3712cec Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 13 Sep 2023 15:35:34 -0700 Subject: [PATCH] Fix for a call into mb.recalculateFirstForSubj() from fs.firstSeqForSubj() that did not lock the mb properly. Signed-off-by: Derek Collison --- server/filestore.go | 7 ++--- server/filestore_test.go | 55 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 57659cc5e3..6562e4f8c8 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -3114,9 +3114,7 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { mb.mu.Unlock() return 0, err } - ss := mb.fss[subj] - mb.mu.Unlock() - if ss != nil { + if ss := mb.fss[subj]; ss != nil { // Adjust first if it was not where we thought it should be. if i != start { if info, ok := fs.psim[subj]; ok { @@ -3126,8 +3124,10 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { if ss.firstNeedsUpdate { mb.recalculateFirstForSubj(subj, ss.First, ss) } + mb.mu.Unlock() return ss.First, nil } + mb.mu.Unlock() } return 0, nil } @@ -6578,6 +6578,7 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si return } } + // Mark first as updated. ss.firstNeedsUpdate = false startSeq++ diff --git a/server/filestore_test.go b/server/filestore_test.go index b3f5a009cf..2be7eb0b28 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -29,6 +29,7 @@ import ( "os" "path/filepath" "reflect" + "sync" "testing" "time" @@ -5429,6 +5430,60 @@ func TestFileStoreSyncIntervals(t *testing.T) { checkSyncFlag(false) } +// https://github.com/nats-io/nats-server/issues/4529 +// Run this with --race and you will see the unlocked access that probably caused this. 
+func TestFileStoreRecalcFirstSequenceBug(t *testing.T) {
+	fcfg := FileStoreConfig{StoreDir: t.TempDir()}
+	fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*"}, MaxMsgsPer: 2, Storage: FileStorage})
+	require_NoError(t, err)
+	defer fs.Stop()
+
+	msg := bytes.Repeat([]byte("A"), 22)
+
+	for _, subj := range []string{"A", "A", "B", "B"} { // two messages per subject, matching MaxMsgsPer
+		fs.StoreMsg(subj, nil, msg)
+	}
+	// Helper: grab fs.lmb under fs.mu, then call clearCacheAndOffset on it while holding mb.mu.
+	clearLMBCache := func() {
+		fs.mu.RLock()
+		mb := fs.lmb
+		fs.mu.RUnlock()
+		mb.mu.Lock()
+		mb.clearCacheAndOffset()
+		mb.mu.Unlock()
+	}
+
+	clearLMBCache()
+
+	// Store a third "A" (MaxMsgsPer is 2) before the goroutines start; the result is not checked.
+	fs.StoreMsg("A", nil, msg)
+
+	var wg sync.WaitGroup
+	start := make(chan bool) // never sent on; closed below to release both goroutines together
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		<-start
+		for i := 0; i < 1_000; i++ {
+			fs.LoadLastMsg("A", nil) // result ignored; only the call path is exercised
+			clearLMBCache()
+		}
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		<-start
+		for i := 0; i < 1_000; i++ {
+			fs.StoreMsg("A", nil, msg) // runs concurrently with the LoadLastMsg loop above
+		}
+	}()
+
+	close(start)
+	wg.Wait() // no assertions follow; the goroutines only need to finish
+}
+
 ///////////////////////////////////////////////////////////////////////////
 // New WAL based architecture tests
 ///////////////////////////////////////////////////////////////////////////