From acffa0668affd35e4156c7b38f5c974d0668b6ab Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 17 Sep 2023 19:49:01 -0700 Subject: [PATCH] Various fixes and improvements to tombstone and buffer gaps. Signed-off-by: Derek Collison --- server/filestore.go | 87 +++++++++++++++-------------- server/filestore_test.go | 115 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 162 insertions(+), 40 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 6562e4f8c8..8e054e1206 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1559,6 +1559,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { } if numBlocks := readU64(); numBlocks > 0 { + lastIndex := int(numBlocks - 1) fs.blks = make([]*msgBlock, 0, numBlocks) for i := 0; i < int(numBlocks); i++ { index, nbytes, fseq, fts, lseq, lts, numDeleted := uint32(readU64()), readU64(), readU64(), readI64(), readU64(), readI64(), readU64() @@ -1578,7 +1579,13 @@ func (fs *fileStore) recoverFullState() (rerr error) { mb.msgs -= numDeleted bi += n } - fs.addMsgBlock(mb) + // Only add in if not empty or the lmb. + if mb.msgs > 0 || i == lastIndex { + fs.addMsgBlock(mb) + } else { + // Mark dirty to cleanup. + fs.dirty++ + } } } @@ -2905,10 +2912,6 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in return err } - // Mark dirty here since we added in a new message. - // We do not kick the flusher, that happens on new msg block for write or Stop(). - fs.dirty++ - // Adjust top level tracking of per subject msg counts. if len(subj) > 0 { index := fs.lmb.index @@ -3249,9 +3252,13 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() { } } - // Now write updated index for all affected msgBlks. + // Expire the cache if we can. 
for mb := range blks { - mb.tryForceExpireCacheLocked() + mb.mu.Lock() + if mb.msgs > 0 { + mb.tryForceExpireCacheLocked() + } + mb.mu.Unlock() } } @@ -3483,6 +3490,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( // We will write a tombstone at the end. var firstSeqNeedsUpdate bool if isEmpty { + // This writes tombstone iff mb == lmb, so no need to do below. fs.removeMsgBlock(mb) firstSeqNeedsUpdate = seq == fs.state.FirstSeq } @@ -3498,11 +3506,12 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( // Check if we need to write a deleted record tombstone. // This is for user initiated removes or to hold the first seq // when the last block is empty. - if !viaLimits || (isEmpty && isLastBlock) { + + // If not via limits and not empty and last (empty writes tombstone above if last) write tombstone. + if !viaLimits && !(isEmpty && isLastBlock) { if lmb := fs.lmb; sm != nil && lmb != nil { lmb.writeTombstone(sm.seq, sm.ts) } - fs.kickFlushStateLoop() } if cb := fs.scb; cb != nil { @@ -3552,9 +3561,6 @@ func (mb *msgBlock) compact() { return mb.dmap.Exists(seq) } - // For skip msgs. - var smh [msgHdrSize]byte - for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { if index+msgHdrSize > lbuf { return @@ -3572,28 +3578,25 @@ func (mb *msgBlock) compact() { seq := le.Uint64(hdr[4:]) if !isDeleted(seq) { - // Normal message here. - nbuf = append(nbuf, buf[index:index+rl]...) - // Do not set based on tombstone. - if !firstSet && seq&tbit == 0 { - firstSet = true - mb.first.seq = seq - } - } else if firstSet { - // This is an interior delete that we need to make sure we have a placeholder for. - le.PutUint32(smh[0:], emptyRecordLen) - le.PutUint64(smh[4:], seq|ebit) - le.PutUint64(smh[12:], 0) - le.PutUint16(smh[20:], 0) - nbuf = append(nbuf, smh[:]...) - mb.hh.Reset() - mb.hh.Write(smh[4:20]) - checksum := mb.hh.Sum(nil) - nbuf = append(nbuf, checksum...) - } - // Always set last. 
- mb.last.seq = seq &^ ebit - + // Check for tombstones. + if seq&tbit != 0 { + // If we are last mb we should consider to keep these unless the tombstone reflects a seq in this mb. + if mb == mb.fs.lmb && seq < mb.first.seq { + nbuf = append(nbuf, buf[index:index+rl]...) + } + } else { + // Normal message here. + nbuf = append(nbuf, buf[index:index+rl]...) + if !firstSet { + firstSet = true + mb.first.seq = seq + } + } + } + // Always set last as long as not a tombstone. + if seq&tbit == 0 { + mb.last.seq = seq &^ ebit + } // Advance to next record. index += rl } @@ -4273,7 +4276,6 @@ func (mb *msgBlock) enableForWriting(fip bool) error { } // Helper function to place a delete tombstone. -// Lock should be held. func (mb *msgBlock) writeTombstone(seq uint64, ts int64) error { return mb.writeMsgRecord(emptyRecordLen, seq|tbit, _EMPTY_, nil, nil, ts, true) } @@ -4299,6 +4301,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte // Check if we are tracking per subject for our simple state. // Do this before changing the cache that would trigger a flush pending msgs call // if we needed to regenerate the per subject info. + // Note that tombstones have no subject so will not trigger here. if len(subj) > 0 && !mb.noTrack { if err := mb.ensurePerSubjectInfoLoaded(); err != nil { return err @@ -4808,7 +4811,7 @@ func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock { func (mb *msgBlock) indexCacheBuf(buf []byte) error { var le = binary.LittleEndian - var fseq, pseq uint64 + var fseq uint64 var idx []uint32 var index uint32 @@ -4865,14 +4868,13 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { // not introduce latency for first message from a newly loaded block. if seq >= mb.first.seq { // Track that we do not have holes. - // Not expected but did see it in the field. 
-			if pseq > 0 && seq != pseq+1 {
-				for dseq := pseq + 1; dseq < seq; dseq++ {
+			if slot := int(seq - mb.first.seq); slot != len(idx) {
+				// If we have a hole fill it.
+				for dseq := mb.first.seq + uint64(len(idx)); dseq < seq; dseq++ {
 					idx = append(idx, dbit)
 					mb.dmap.Insert(dseq)
 				}
 			}
-			pseq = seq
 			// Add to our index.
 			idx = append(idx, index)
 			mb.cache.lrl = uint32(rl)
@@ -4881,6 +4883,11 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
 				fseq = seq
 			}
 
+			// Make sure our dmap has this entry if it was erased.
+			if erased {
+				mb.dmap.Insert(seq)
+			}
+
 			// Handle FSS inline here.
 			if slen > 0 && !mb.noTrack && !erased && !mb.dmap.Exists(seq) {
 				bsubj := buf[index+msgHdrSize : index+msgHdrSize+uint32(slen)]
diff --git a/server/filestore_test.go b/server/filestore_test.go
index 72c1024698..a961ef9e11 100644
--- a/server/filestore_test.go
+++ b/server/filestore_test.go
@@ -5981,6 +5981,121 @@ func TestFileStoreSelectBlockWithFirstSeqRemovals(t *testing.T) {
 	})
 }
 
+func TestFileStoreMsgBlockHolesAndIndexing(t *testing.T) {
+	fs, err := newFileStore(
+		FileStoreConfig{StoreDir: t.TempDir()},
+		StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxMsgsPer: 1},
+	)
+	require_NoError(t, err)
+	defer fs.Stop()
+
+	// Grab the message block by hand and manipulate at that level.
+	mb := fs.getFirstBlock()
+	writeMsg := func(subj string, seq uint64) {
+		rl := fileStoreMsgSize(subj, nil, []byte(subj))
+		require_NoError(t, mb.writeMsgRecord(rl, seq, subj, nil, []byte(subj), time.Now().UnixNano(), true))
+	}
+	readMsg := func(seq uint64, expectedSubj string) {
+		// Clear cache so we load back in from disk and need to properly process any holes.
+ ld, tombs, err := mb.rebuildState() + require_NoError(t, err) + require_Equal(t, ld, nil) + require_Equal(t, len(tombs), 0) + fs.rebuildState(nil) + sm, _, err := mb.fetchMsg(seq, nil) + require_NoError(t, err) + require_Equal(t, sm.subj, expectedSubj) + require_True(t, bytes.Equal(sm.buf, []byte(expectedSubj))) + } + + writeMsg("A", 2) + require_Equal(t, mb.first.seq, 2) + require_Equal(t, mb.last.seq, 2) + + writeMsg("B", 4) + require_Equal(t, mb.first.seq, 2) + require_Equal(t, mb.last.seq, 4) + + writeMsg("C", 12) + + readMsg(4, "B") + require_True(t, mb.dmap.Exists(3)) + + readMsg(12, "C") + readMsg(2, "A") + + // Check that we get deleted for the right ones etc. + checkDeleted := func(seq uint64) { + _, _, err = mb.fetchMsg(seq, nil) + require_Error(t, err, ErrStoreMsgNotFound, errDeletedMsg) + mb.mu.RLock() + shouldExist, exists := seq >= mb.first.seq, mb.dmap.Exists(seq) + mb.mu.RUnlock() + if shouldExist { + require_True(t, exists) + } + } + checkDeleted(1) + checkDeleted(3) + for seq := 5; seq < 12; seq++ { + checkDeleted(uint64(seq)) + } +} + +func TestFileStoreMsgBlockCompactionAndHoles(t *testing.T) { + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxMsgsPer: 1}, + ) + require_NoError(t, err) + defer fs.Stop() + + msg := bytes.Repeat([]byte("Z"), 1024) + for _, subj := range []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J"} { + fs.StoreMsg(subj, nil, msg) + } + // Leave first one but delete the rest. + for seq := uint64(2); seq < 10; seq++ { + fs.RemoveMsg(seq) + } + require_Equal(t, fs.numMsgBlocks(), 1) + mb := fs.getFirstBlock() + require_NotNil(t, mb) + + _, ub, _ := fs.Utilization() + + // Do compaction, should remove all excess now. 
+ mb.mu.Lock() + mb.compact() + mb.mu.Unlock() + + ta, ua, _ := fs.Utilization() + require_Equal(t, ub, ua) + require_Equal(t, ta, ua) +} + +func TestFileStoreRemoveLastNoDoubleTombstones(t *testing.T) { + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxMsgsPer: 1}, + ) + require_NoError(t, err) + defer fs.Stop() + + fs.StoreMsg("A", nil, []byte("hello")) + fs.mu.Lock() + fs.removeMsgViaLimits(1) + fs.mu.Unlock() + + require_Equal(t, fs.numMsgBlocks(), 1) + mb := fs.getFirstBlock() + require_NotNil(t, mb) + mb.loadMsgs() + rbytes, _, err := fs.Utilization() + require_NoError(t, err) + require_Equal(t, rbytes, emptyRecordLen) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks ///////////////////////////////////////////////////////////////////////////