From f4387ec74e4116db972606f010bcf1aae1140b83 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Tue, 10 Oct 2023 17:17:55 -0700
Subject: [PATCH 1/2] Fix compaction with compression and add an out-of-band
 compaction in syncBlocks to reclaim more space.

Signed-off-by: Derek Collison
---
 server/filestore.go      | 86 ++++++++++++++++++++++++++++++----------
 server/filestore_test.go | 69 ++++++++++++++++++++++++++++++++
 2 files changed, 133 insertions(+), 22 deletions(-)

diff --git a/server/filestore.go b/server/filestore.go
index 78e4481112..4621e5a8a9 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -3491,14 +3491,13 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
 	} else if !isEmpty {
 		// Out of order delete.
 		mb.dmap.Insert(seq)
-		// Check if <25% utilization and minimum size met.
-		if mb.rbytes > compactMinimum && !isLastBlock {
-			// Remove the interior delete records
-			rbytes := mb.rbytes - uint64(mb.dmap.Size()*emptyRecordLen)
-			if rbytes>>2 > mb.bytes {
-				mb.compact()
-				fs.kickFlushStateLoop()
-			}
+		// Make a simple check here, similar to Compact(): if we can save 50% and are over a certain threshold, compact inline.
+		// All other, more thorough cleanup will happen in the syncBlocks logic.
+		// Note that we do not have to store empty records for deleted messages, so don't use them in the calculation.
+		// TODO(dlc) - This should not be inline; it should kick the sync routine instead.
+		if mb.rbytes > compactMinimum && mb.bytes*2 < mb.rbytes && !isLastBlock {
+			mb.compact()
+			fs.kickFlushStateLoop()
 		}
 	}
 
@@ -3572,7 +3571,9 @@ func (mb *msgBlock) compact() {
 	}
 
 	buf := mb.cache.buf
-	nbuf := make([]byte, 0, len(buf))
+	nbuf := getMsgBlockBuf(len(buf))
+	// Recycle our nbuf when we are done.
+	defer recycleMsgBlockBuf(nbuf)
 
 	var le = binary.LittleEndian
 	var firstSet bool
@@ -3622,9 +3623,16 @@ func (mb *msgBlock) compact() {
 	}
 
 	// Handle compression
-	var err error
-	if nbuf, err = mb.cmp.Compress(nbuf); err != nil {
-		return
+	if mb.cmp != NoCompression {
+		cbuf, err := mb.cmp.Compress(nbuf)
+		if err != nil {
+			return
+		}
+		meta := &CompressionInfo{
+			Algorithm:    mb.cmp,
+			OriginalSize: uint64(len(nbuf)),
+		}
+		nbuf = append(meta.MarshalMetadata(), cbuf...)
 	}
 
 	// Check for encryption.
@@ -4701,6 +4709,24 @@ func (mb *msgBlock) decompressIfNeeded(buf []byte) ([]byte, error) {
 	}
 }
 
+// Lock should be held.
+func (mb *msgBlock) ensureRawBytesLoaded() error {
+	if mb.rbytes > 0 {
+		return nil
+	}
+	f, err := os.Open(mb.mfn)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	if fi, err := f.Stat(); fi != nil && err == nil {
+		mb.rbytes = uint64(fi.Size())
+	} else {
+		return err
+	}
+	return nil
+}
+
 // Sync msg and index files as needed. This is called from a timer.
 func (fs *fileStore) syncBlocks() {
 	fs.mu.RLock()
@@ -4709,8 +4735,10 @@
 	return
 	}
 	blks := append([]*msgBlock(nil), fs.blks...)
+	lmb := fs.lmb
 	fs.mu.RUnlock()
 
+	var shouldWriteState bool
 	for _, mb := range blks {
 		// Do actual sync. Hold lock for consistency.
 		mb.mu.Lock()
@@ -4722,24 +4750,33 @@
 		if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
 			mb.dirtyCloseWithRemove(false)
 		}
+		// Check if we should compact here as well.
+		// Do not compact the last mb.
+		if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.rbytes > mb.bytes {
+			mb.compact()
+			shouldWriteState = true
+		}
+
+		// Check if we need to sync. We will not hold lock during actual sync.
-		var fn string
-		if mb.needSync {
+		needSync := mb.needSync
+		if needSync {
 			// Flush anything that may be pending.
-			if mb.pendingWriteSizeLocked() > 0 {
-				mb.flushPendingMsgsLocked()
-			}
-			fn = mb.mfn
-			mb.needSync = false
+			mb.flushPendingMsgsLocked()
 		}
 		mb.mu.Unlock()
 
 		// Check if we need to sync.
 		// This is done not holding any locks.
-		if fn != _EMPTY_ {
-			if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil {
-				fd.Sync()
+		if needSync {
+			if fd, _ := os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms); fd != nil {
+				canClear := fd.Sync() == nil
 				fd.Close()
+				// Only clear sync flag on success.
+				if canClear {
+					mb.mu.Lock()
+					mb.needSync = false
+					mb.mu.Unlock()
+				}
 			}
 		}
 	}
@@ -4750,6 +4787,11 @@
 	syncAlways := fs.fcfg.SyncAlways
 	fs.mu.Unlock()
 
+	// Check if we should write out our state due to compaction of one or more msg blocks.
+	if shouldWriteState {
+		fs.writeFullState()
+	}
+
 	// Sync state file if we are not running with sync always.
 	if !syncAlways {
 		if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil {
 			fd.Sync()
diff --git a/server/filestore_test.go b/server/filestore_test.go
index d7fef22907..e7b7ec7031 100644
--- a/server/filestore_test.go
+++ b/server/filestore_test.go
@@ -6235,6 +6235,75 @@ func TestFileStoreFullStateMidBlockPastWAL(t *testing.T) {
 	})
 }
 
+func TestFileStoreCompactingBlocksOnSync(t *testing.T) {
+	testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
+		fcfg.BlockSize = 1000 // 20 msgs per block.
+		fcfg.SyncInterval = 100 * time.Millisecond
+		scfg := StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxMsgsPer: 1}
+
+		prf := func(context []byte) ([]byte, error) {
+			h := hmac.New(sha256.New, []byte("dlc22"))
+			if _, err := h.Write(context); err != nil {
+				return nil, err
+			}
+			return h.Sum(nil), nil
+		}
+		if fcfg.Cipher == NoCipher {
+			prf = nil
+		}
+
+		fs, err := newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
+		require_NoError(t, err)
+		defer fs.Stop()
+
+		// This yields an internal record length of 50 bytes. So 20 msgs per blk.
+		msg := bytes.Repeat([]byte("Z"), 19)
+		subjects := "ABCDEFGHIJKLMNOPQRST"
+		for _, subj := range subjects {
+			fs.StoreMsg(string(subj), nil, msg)
+		}
+		require_Equal(t, fs.numMsgBlocks(), 1)
+		total, reported, err := fs.Utilization()
+		require_NoError(t, err)
+
+		require_Equal(t, total, reported)
+
+		// Now start removing. Since the block is small, this should not kick in any inline compaction logic.
+		// Remove all interior messages, leaving 1 and 20, by overwriting subjects B-S (MaxMsgsPer is 1).
+		for i := 1; i < 19; i++ {
+			fs.StoreMsg(string(subjects[i]), nil, msg)
+		}
+		require_Equal(t, fs.numMsgBlocks(), 2)
+
+		blkUtil := func() (uint64, uint64) {
+			fs.mu.RLock()
+			fmb := fs.blks[0]
+			fs.mu.RUnlock()
+			fmb.mu.RLock()
+			defer fmb.mu.RUnlock()
+			return fmb.rbytes, fmb.bytes
+		}
+
+		total, reported = blkUtil()
+		require_Equal(t, reported, 100)
+		// Raw bytes will be 1000, but due to compression could be less.
+		if fcfg.Compression != NoCompression {
+			require_True(t, total > reported)
+		} else {
+			require_Equal(t, total, 1000)
+		}
+
+		// Make sure that when the sync interval kicks in, the block is compacted down to rbytes == 100.
+ checkFor(t, time.Second, 100*time.Millisecond, func() error { + if total, reported := blkUtil(); total <= reported { + return nil + } + return fmt.Errorf("Not compacted yet, raw %v vs reported %v", + friendlyBytes(total), friendlyBytes(reported)) + }) + }) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// From 842d600e3faae31ead3ab97e70b8747f27c626b2 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 11 Oct 2023 07:54:36 -0700 Subject: [PATCH 2/2] Grab blk fn while mb lock held Signed-off-by: Derek Collison --- server/filestore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 4621e5a8a9..54dd769c1b 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4758,7 +4758,7 @@ func (fs *fileStore) syncBlocks() { } // Check if we need to sync. We will not hold lock during actual sync. - needSync := mb.needSync + needSync, fn := mb.needSync, mb.mfn if needSync { // Flush anything that may be pending. mb.flushPendingMsgsLocked() @@ -4768,7 +4768,7 @@ func (fs *fileStore) syncBlocks() { // Check if we need to sync. // This is done not holding any locks. if needSync { - if fd, _ := os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms); fd != nil { + if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil { canClear := fd.Sync() == nil fd.Close() // Only clear sync flag on success.
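
---

A few illustrative sketches follow; none of them are part of the patch itself.

The inline check in removeMsg() replaces the old <25% utilization test (rbytes>>2 > mb.bytes) with a simpler 50% test (mb.bytes*2 < mb.rbytes) above a minimum block size. In isolation the decision looks like the sketch below; the real compactMinimum is defined in filestore.go, and the 8KB value here is only an assumed placeholder.

package main

import "fmt"

// Assumed placeholder; the real compactMinimum lives in filestore.go.
const compactMinimum = 8 * 1024

// shouldCompactInline mirrors the new heuristic: the block is over the
// minimum raw size, at most half of its raw bytes are live messages,
// and it is not the last (actively written) block.
func shouldCompactInline(rbytes, bytes uint64, isLastBlock bool) bool {
	return rbytes > compactMinimum && bytes*2 < rbytes && !isLastBlock
}

func main() {
	fmt.Println(shouldCompactInline(16*1024, 6*1024, false)) // true: at most 50% live
	fmt.Println(shouldCompactInline(16*1024, 9*1024, false)) // false: more than 50% live
	fmt.Println(shouldCompactInline(4*1024, 1*1024, false))  // false: under the minimum size
}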
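The compaction-with-compression fix itself hinges on framing: compact() previously rewrote the block through Compress() without the compression metadata header, so a later read through decompressIfNeeded() could not tell how the block was encoded. The fix prepends the marshaled CompressionInfo ahead of the compressed payload. The sketch below shows the framing idea in miniature, using gzip as a stand-in for the store's actual compression algorithm; frameCompressed and unframe are hypothetical helpers, not filestore APIs.

package main

import (
	"bytes"
	"compress/gzip"
	"encoding/binary"
	"fmt"
	"io"
)

// frameCompressed writes a small metadata header (algorithm id plus the
// original size as a uvarint) *before* the compressed payload, so a
// reader can tell how to decode the block.
func frameCompressed(algo byte, raw []byte) ([]byte, error) {
	var payload bytes.Buffer
	zw := gzip.NewWriter(&payload) // stand-in for the store's compressor
	if _, err := zw.Write(raw); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	hdr := make([]byte, 1, 1+binary.MaxVarintLen64)
	hdr[0] = algo
	hdr = binary.AppendUvarint(hdr, uint64(len(raw)))
	return append(hdr, payload.Bytes()...), nil
}

// unframe parses the header and inflates the payload that follows it.
func unframe(buf []byte) ([]byte, error) {
	if len(buf) < 2 {
		return nil, fmt.Errorf("short block")
	}
	_, n := binary.Uvarint(buf[1:])
	if n <= 0 {
		return nil, fmt.Errorf("bad header")
	}
	zr, err := gzip.NewReader(bytes.NewReader(buf[1+n:]))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	blk, _ := frameCompressed(1, bytes.Repeat([]byte("Z"), 1000))
	raw, err := unframe(blk)
	fmt.Println(len(raw), err) // 1000 <nil>
}

This is also why the new test only asserts total <= reported once compaction has run under compression: the framed, compressed block can end up smaller than the reported live-message bytes.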
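Finally, the syncBlocks() changes, together with PATCH 2/2, follow a pattern worth spelling out: snapshot what you need (the dirty flag and the file name) while holding the block lock, perform the fsync without the lock, and clear the flag only if the fsync succeeded so a failure is retried on the next timer tick. A condensed sketch of that pattern, with block as a stand-in for the real msgBlock:

package main

import (
	"os"
	"sync"
)

// block is a stand-in for msgBlock: a file-backed block with a dirty flag.
type block struct {
	mu       sync.Mutex
	fn       string // file name; must be captured under the lock (PATCH 2/2)
	needSync bool
}

func (b *block) syncIfNeeded() {
	// Snapshot under the lock; never hold it across the fsync.
	b.mu.Lock()
	needSync, fn := b.needSync, b.fn
	b.mu.Unlock()
	if !needSync {
		return
	}
	if fd, err := os.OpenFile(fn, os.O_RDWR, 0644); err == nil {
		canClear := fd.Sync() == nil
		fd.Close()
		// Only clear the flag on success, so a failed fsync is
		// retried on the next timer tick.
		if canClear {
			b.mu.Lock()
			b.needSync = false
			b.mu.Unlock()
		}
	}
}

func main() {
	f, _ := os.CreateTemp("", "blk")
	f.Close()
	defer os.Remove(f.Name())
	b := &block{fn: f.Name(), needSync: true}
	b.syncIfNeeded()
}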