From 22be518d4509c75bbd1c1f67ee44c7ad8226ba61 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 13 Sep 2023 15:35:34 -0700 Subject: [PATCH] Fix for a call into fs.recalculateFirstForSubj() from fs.recalculateFirstForSubj() that did not lock the mb properly. Signed-off-by: Derek Collison --- server/filestore.go | 7 ++--- server/filestore_test.go | 55 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 34ef6ad82f..48d6fdb9ff 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2571,9 +2571,7 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { mb.mu.Unlock() return 0, err } - ss := mb.fss[subj] - mb.mu.Unlock() - if ss != nil { + if ss := mb.fss[subj]; ss != nil { // Adjust first if it was not where we thought it should be. if i != start { if info, ok := fs.psim[subj]; ok { @@ -2583,8 +2581,10 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { if ss.firstNeedsUpdate { mb.recalculateFirstForSubj(subj, ss.First, ss) } + mb.mu.Unlock() return ss.First, nil } + mb.mu.Unlock() } return 0, nil } @@ -5957,6 +5957,7 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si return } } + // Mark first as updated. ss.firstNeedsUpdate = false startSeq++ diff --git a/server/filestore_test.go b/server/filestore_test.go index a1c3560311..a25a16c429 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -31,6 +31,7 @@ import ( "path/filepath" "reflect" "strings" + "sync" "testing" "time" @@ -5823,3 +5824,57 @@ func TestFileStoreErrPartialLoadOnSyncClose(t *testing.T) { _, err = fs.LoadMsg(1, nil) require_NoError(t, err) } + +// https://github.com/nats-io/nats-server/issues/4529 +// Run this wuth --race and you will see the unlocked access that probably caused this. +func TestFileStoreRecalcFirstSequenceBug(t *testing.T) { + fcfg := FileStoreConfig{StoreDir: t.TempDir()} + fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*"}, MaxMsgsPer: 2, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + msg := bytes.Repeat([]byte("A"), 22) + + for _, subj := range []string{"A", "A", "B", "B"} { + fs.StoreMsg(subj, nil, msg) + } + // Make sure the buffer is cleared. + clearLMBCache := func() { + fs.mu.RLock() + mb := fs.lmb + fs.mu.RUnlock() + mb.mu.Lock() + mb.clearCacheAndOffset() + mb.mu.Unlock() + } + + clearLMBCache() + + // Do first here. + fs.StoreMsg("A", nil, msg) + + var wg sync.WaitGroup + start := make(chan bool) + + wg.Add(1) + go func() { + defer wg.Done() + <-start + for i := 0; i < 1_000; i++ { + fs.LoadLastMsg("A", nil) + clearLMBCache() + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + <-start + for i := 0; i < 1_000; i++ { + fs.StoreMsg("A", nil, msg) + } + }() + + close(start) + wg.Wait() +}