From 8841432d03971347e87592c24e79e40aeaa0f0d1 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 30 Aug 2023 14:38:41 -0700 Subject: [PATCH] Allow 2.10 tombstones to be skipped and allow us to recover on downgrade from 2.10 to 2.9. Also fixed small bug that could set bad first seq. Signed-off-by: Derek Collison --- server/filestore.go | 62 ++++++++++++++++++-------- server/filestore_test.go | 96 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 139 insertions(+), 19 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index e5de6a973c..b95294bec3 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1081,6 +1081,12 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { seq := le.Uint64(hdr[4:]) ts := int64(le.Uint64(hdr[12:])) + // Check if this is a delete tombstone. + if seq&tbit != 0 { + index += rl + continue + } + // This is an old erased message, or a new one that we can track. if seq == 0 || seq&ebit != 0 || seq < mb.first.seq { seq = seq &^ ebit @@ -2392,6 +2398,12 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in } } } + } else if mb := fs.selectMsgBlock(fseq); mb != nil { + // If we are here we could not remove fseq from above, so rebuild. + var ld *LostStreamData + if ld, _ = mb.rebuildState(); ld != nil { + fs.rebuildStateLocked(ld) + } } } @@ -3048,7 +3060,8 @@ func (mb *msgBlock) compact() { if !isDeleted(seq) { // Normal message here. nbuf = append(nbuf, buf[index:index+rl]...) - if !firstSet { + // Do not set based on tombstone. + if !firstSet && seq&tbit == 0 { firstSet = true mb.first.seq = seq } @@ -3811,20 +3824,25 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte // Update write through cache. // Write to msg record. mb.cache.buf = append(mb.cache.buf, checksum...) - // Write index - mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit) mb.cache.lrl = uint32(rl) - if mb.cache.fseq == 0 { - mb.cache.fseq = seq - } // Set cache timestamp for last store. mb.lwts = ts // Decide if we write index info if flushing in place. writeIndex := ts-mb.lwits > wiThresh - // Accounting - mb.updateAccounting(seq, ts, rl) + // Only update index and do accounting if not a delete tombstone. + if seq&tbit == 0 { + // Strip ebit if set. + seq = seq &^ ebit + if mb.cache.fseq == 0 { + mb.cache.fseq = seq + } + // Write index + mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit) + // Accounting + mb.updateAccounting(seq, ts, rl) + } fch, werr := mb.fch, mb.werr @@ -3927,7 +3945,7 @@ func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) { seq = seq &^ ebit } - if mb.first.seq == 0 || mb.first.ts == 0 { + if mb.first.seq == 0 || mb.first.ts == 0 && seq >= mb.first.seq { mb.first.seq = seq mb.first.ts = ts } @@ -4112,6 +4130,12 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { return errCorruptState } + // Check for tombstones which we can skip in terms of indexing. + if seq&tbit != 0 { + index += rl + continue + } + // Clear erase bit. seq = seq &^ ebit @@ -4467,15 +4491,17 @@ var ( errNoMainKey = errors.New("encrypted store encountered with no main key") ) -// Used for marking messages that have had their checksums checked. -// Used to signal a message record with headers. -const hbit = 1 << 31 - -// Used for marking erased messages sequences. -const ebit = 1 << 63 - -// Used to mark a bad index as deleted. -const dbit = 1 << 30 +const ( + // Used for marking messages that have had their checksums checked. + // Used to signal a message record with headers. + hbit = 1 << 31 + // Used for marking erased messages sequences. + ebit = 1 << 63 + // Used for marking tombstone sequences. + tbit = 1 << 62 + // Used to mark a bad index as deleted. + dbit = 1 << 30 +) // Will do a lookup from cache. // Lock should be held. diff --git a/server/filestore_test.go b/server/filestore_test.go index a30f99426e..303e65cf3c 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5501,6 +5501,7 @@ func TestFileStoreNumPendingLargeNumBlks(t *testing.T) { } fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"zzz"}, Storage: FileStorage}) require_NoError(t, err) + defer fs.Stop() subj, msg := "zzz", bytes.Repeat([]byte("X"), 100) numMsgs := 10_000 @@ -5554,6 +5555,7 @@ func TestFileStoreRestoreEncryptedWithNoKeyFuncFails(t *testing.T) { prf, ) require_NoError(t, err) + defer fs.Stop() subj, msg := "zzz", bytes.Repeat([]byte("X"), 100) numMsgs := 100 @@ -5572,9 +5574,10 @@ func TestFileStoreRestoreEncryptedWithNoKeyFuncFails(t *testing.T) { require_Error(t, err, errNoMainKey) } -func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) { +func TestFileStoreRecalculateFirstForSubjBug(t *testing.T) { fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}) require_NoError(t, err) + defer fs.Stop() fs.StoreMsg("foo", nil, nil) // 1 fs.StoreMsg("bar", nil, nil) // 2 @@ -5607,6 +5610,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) { func TestFileStoreKeepWithDeletedMsgsBug(t *testing.T) { fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}) require_NoError(t, err) + defer fs.Stop() msg := bytes.Repeat([]byte("A"), 19) for i := 0; i < 5; i++ { @@ -5623,3 +5627,93 @@ func TestFileStoreKeepWithDeletedMsgsBug(t *testing.T) { require_NoError(t, err) require_True(t, n == 3) } + +// This is for 2.10 delete tombstones and backward compatibility if a user downgrades to 2.9.x +// TODO(dlc) - Can remove once merged into 2.10 codebase. +func TestFileStoreTombstoneBackwardCompatibility(t *testing.T) { + sd := t.TempDir() + fs, err := newFileStore(FileStoreConfig{StoreDir: sd}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + // We will test scenarios where tombstones are embedded in a filestore from a 2.10 system. + msgA := bytes.Repeat([]byte("A"), 22) + msgZ := bytes.Repeat([]byte("Z"), 22) + + fs.StoreMsg("A", nil, msgA) + fs.StoreMsg("B", nil, msgZ) + + mb := fs.getFirstBlock() + require_True(t, mb != nil) + + // >= 2.10 tombstone + mb.writeMsgRecord(emptyRecordLen, 2|tbit, _EMPTY_, nil, nil, time.Now().UnixNano(), true) + + // Put a real message behind it. + fs.StoreMsg("C", nil, msgA) + + checkState := func() { + state := fs.State() + require_True(t, state.Msgs == 3) + require_True(t, state.FirstSeq == 1) + require_True(t, state.LastSeq == 3) + require_True(t, state.NumSubjects == 3) + + sm, err := fs.LoadMsg(2, nil) + require_NoError(t, err) + require_True(t, bytes.Equal(sm.msg, msgZ)) + require_True(t, sm.subj == "B") + + sm, err = fs.LoadMsg(3, nil) + require_NoError(t, err) + require_True(t, bytes.Equal(sm.msg, msgA)) + require_True(t, sm.subj == "C") + } + + checkState() + fs.Stop() + + // Make sure we are good on recreate. + fs, err = newFileStore(FileStoreConfig{StoreDir: sd}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + checkState() + + // Now we will purge, place tombstone first, then add messages and check. + _, err = fs.Purge() + require_NoError(t, err) + + // >= 2.10 tombstone + mb.writeMsgRecord(emptyRecordLen, 22|tbit, _EMPTY_, nil, nil, time.Now().UnixNano(), true) + + fs.StoreMsg("A", nil, msgA) // seq 4 + fs.StoreMsg("B", nil, msgZ) // seq 5 + + checkPurgeState := func() { + state := fs.State() + require_True(t, state.Msgs == 2) + require_True(t, state.FirstSeq == 4) + require_True(t, state.LastSeq == 5) + require_True(t, state.NumSubjects == 2) + + sm, err := fs.LoadMsg(4, nil) + require_NoError(t, err) + require_True(t, bytes.Equal(sm.msg, msgA)) + require_True(t, sm.subj == "A") + + sm, err = fs.LoadMsg(5, nil) + require_NoError(t, err) + require_True(t, bytes.Equal(sm.msg, msgZ)) + require_True(t, sm.subj == "B") + } + + checkPurgeState() + + // Make sure we are good on recreate. + fs, err = newFileStore(FileStoreConfig{StoreDir: sd}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + checkPurgeState() +}