Skip to content

Commit

Permalink
[IMPROVED] Do not hold filestore lock on msg block loads for firstSeq…
Browse files Browse the repository at this point in the history
…ForSubj. (#5363)

In cases where max per subject is utilized, once that condition is hit
storing a new message involves writing the new one, flushing it, then
determining the current first seq for the subject so we can remove it.
Determining the first sequence for the subject could involve walking
multiple blocks and loading them in as well, all while the filestore
lock is being held causing delays for other access to the filestore,
even for things like FastState().

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Apr 28, 2024
2 parents a72ea23 + 0cee693 commit d3a31dd
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
12 changes: 11 additions & 1 deletion server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3566,7 +3566,7 @@ func (fs *fileStore) rebuildFirst() {
// Optimized helper function to return first sequence.
// subj will always be publish subject here, meaning non-wildcard.
// We assume a fast check that this subj even exists already happened.
// Lock should be held.
// Write lock should be held.
func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
if len(fs.blks) == 0 {
return 0, nil
Expand All @@ -3583,12 +3583,18 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
if mb == nil {
continue
}
// If we need to load msgs here and we need to walk multiple blocks this
// could tie up the upper fs lock, so release while dealing with the block.
fs.mu.Unlock()

mb.mu.Lock()
var shouldExpire bool
if mb.fssNotLoaded() {
// Make sure we have fss loaded.
if err := mb.loadMsgsWithLock(); err != nil {
mb.mu.Unlock()
// Re-acquire fs lock
fs.mu.Lock()
return 0, err
}
shouldExpire = true
Expand All @@ -3604,6 +3610,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
mb.mu.Unlock()
// Re-acquire fs lock
fs.mu.Lock()
return ss.First, nil
}
// If we did not find it and we loaded this msgBlock try to expire as long as not the last.
Expand All @@ -3612,6 +3620,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
mb.tryForceExpireCacheLocked()
}
mb.mu.Unlock()
// Re-acquire fs lock
fs.mu.Lock()
}
return 0, nil
}
Expand Down
4 changes: 2 additions & 2 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5542,14 +5542,14 @@ func TestNoRaceJetStreamFileStoreLargeKVAccessTiming(t *testing.T) {

// time first seq lookup for both as well.
// Base will be first in this case.
fs.mu.RLock()
fs.mu.Lock()
start = time.Now()
fs.firstSeqForSubj(first)
base = time.Since(start)
start = time.Now()
fs.firstSeqForSubj(last)
slow = time.Since(start)
fs.mu.RUnlock()
fs.mu.Unlock()

if base > 100*time.Microsecond || slow > 200*time.Microsecond {
t.Fatalf("Took too long to look up last key by subject vs first: %v vs %v", base, slow)
Expand Down

0 comments on commit d3a31dd

Please sign in to comment.