Skip to content

Commit

Permalink
Allow 2.10 tombstones to be skipped and allow us to recover on downgr…
Browse files Browse the repository at this point in the history
…ade from 2.10 to 2.9.

Also fixed small bug that could set bad first seq.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Aug 30, 2023
1 parent d6e7106 commit 8841432
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 19 deletions.
62 changes: 44 additions & 18 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,12 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
seq := le.Uint64(hdr[4:])
ts := int64(le.Uint64(hdr[12:]))

// Check if this is a delete tombstone.
if seq&tbit != 0 {
index += rl
continue
}

// This is an old erased message, or a new one that we can track.
if seq == 0 || seq&ebit != 0 || seq < mb.first.seq {
seq = seq &^ ebit
Expand Down Expand Up @@ -2392,6 +2398,12 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
}
}
}
} else if mb := fs.selectMsgBlock(fseq); mb != nil {
// If we are here we could not remove fseq from above, so rebuild.
var ld *LostStreamData
if ld, _ = mb.rebuildState(); ld != nil {
fs.rebuildStateLocked(ld)
}
}
}

Expand Down Expand Up @@ -3048,7 +3060,8 @@ func (mb *msgBlock) compact() {
if !isDeleted(seq) {
// Normal message here.
nbuf = append(nbuf, buf[index:index+rl]...)
if !firstSet {
// Do not set based on tombstone.
if !firstSet && seq&tbit == 0 {
firstSet = true
mb.first.seq = seq
}
Expand Down Expand Up @@ -3811,20 +3824,25 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
// Update write through cache.
// Write to msg record.
mb.cache.buf = append(mb.cache.buf, checksum...)
// Write index
mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit)
mb.cache.lrl = uint32(rl)
if mb.cache.fseq == 0 {
mb.cache.fseq = seq
}

// Set cache timestamp for last store.
mb.lwts = ts
// Decide if we write index info if flushing in place.
writeIndex := ts-mb.lwits > wiThresh

// Accounting
mb.updateAccounting(seq, ts, rl)
// Only update index and do accounting if not a delete tombstone.
if seq&tbit == 0 {
// Strip ebit if set.
seq = seq &^ ebit
if mb.cache.fseq == 0 {
mb.cache.fseq = seq
}
// Write index
mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit)
// Accounting
mb.updateAccounting(seq, ts, rl)
}

fch, werr := mb.fch, mb.werr

Expand Down Expand Up @@ -3927,7 +3945,7 @@ func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) {
seq = seq &^ ebit
}

if mb.first.seq == 0 || mb.first.ts == 0 {
if mb.first.seq == 0 || mb.first.ts == 0 && seq >= mb.first.seq {
mb.first.seq = seq
mb.first.ts = ts
}
Expand Down Expand Up @@ -4112,6 +4130,12 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
return errCorruptState
}

// Check for tombstones which we can skip in terms of indexing.
if seq&tbit != 0 {
index += rl
continue
}

// Clear erase bit.
seq = seq &^ ebit

Expand Down Expand Up @@ -4467,15 +4491,17 @@ var (
errNoMainKey = errors.New("encrypted store encountered with no main key")
)

// Used for marking messages that have had their checksums checked.
// Used to signal a message record with headers.
const hbit = 1 << 31

// Used for marking erased messages sequences.
const ebit = 1 << 63

// Used to mark a bad index as deleted.
const dbit = 1 << 30
const (
// Used for marking messages that have had their checksums checked.
// Used to signal a message record with headers.
hbit = 1 << 31
// Used for marking erased messages sequences.
ebit = 1 << 63
// Used for marking tombstone sequences.
tbit = 1 << 62
// Used to mark a bad index as deleted.
dbit = 1 << 30
)

// Will do a lookup from cache.
// Lock should be held.
Expand Down
96 changes: 95 additions & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5501,6 +5501,7 @@ func TestFileStoreNumPendingLargeNumBlks(t *testing.T) {
}
fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"zzz"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

subj, msg := "zzz", bytes.Repeat([]byte("X"), 100)
numMsgs := 10_000
Expand Down Expand Up @@ -5554,6 +5555,7 @@ func TestFileStoreRestoreEncryptedWithNoKeyFuncFails(t *testing.T) {
prf,
)
require_NoError(t, err)
defer fs.Stop()

subj, msg := "zzz", bytes.Repeat([]byte("X"), 100)
numMsgs := 100
Expand All @@ -5572,9 +5574,10 @@ func TestFileStoreRestoreEncryptedWithNoKeyFuncFails(t *testing.T) {
require_Error(t, err, errNoMainKey)
}

func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
func TestFileStoreRecalculateFirstForSubjBug(t *testing.T) {
fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

fs.StoreMsg("foo", nil, nil) // 1
fs.StoreMsg("bar", nil, nil) // 2
Expand Down Expand Up @@ -5607,6 +5610,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
func TestFileStoreKeepWithDeletedMsgsBug(t *testing.T) {
fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := bytes.Repeat([]byte("A"), 19)
for i := 0; i < 5; i++ {
Expand All @@ -5623,3 +5627,93 @@ func TestFileStoreKeepWithDeletedMsgsBug(t *testing.T) {
require_NoError(t, err)
require_True(t, n == 3)
}

// This is for 2.10 delete tombstones and backward compatibility if a user downgrades to 2.9.x
// TODO(dlc) - Can remove once merged into 2.10 codebase.
func TestFileStoreTombstoneBackwardCompatibility(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(FileStoreConfig{StoreDir: sd}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

// We will test scenarios where tombstones are embedded in a filestore from a 2.10 system.
msgA := bytes.Repeat([]byte("A"), 22)
msgZ := bytes.Repeat([]byte("Z"), 22)

fs.StoreMsg("A", nil, msgA)
fs.StoreMsg("B", nil, msgZ)

mb := fs.getFirstBlock()
require_True(t, mb != nil)

// >= 2.10 tombstone
mb.writeMsgRecord(emptyRecordLen, 2|tbit, _EMPTY_, nil, nil, time.Now().UnixNano(), true)

// Put a real message behind it.
fs.StoreMsg("C", nil, msgA)

checkState := func() {
state := fs.State()
require_True(t, state.Msgs == 3)
require_True(t, state.FirstSeq == 1)
require_True(t, state.LastSeq == 3)
require_True(t, state.NumSubjects == 3)

sm, err := fs.LoadMsg(2, nil)
require_NoError(t, err)
require_True(t, bytes.Equal(sm.msg, msgZ))
require_True(t, sm.subj == "B")

sm, err = fs.LoadMsg(3, nil)
require_NoError(t, err)
require_True(t, bytes.Equal(sm.msg, msgA))
require_True(t, sm.subj == "C")
}

checkState()
fs.Stop()

// Make sure we are good on recreate.
fs, err = newFileStore(FileStoreConfig{StoreDir: sd}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

checkState()

// Now we will purge, place tombstone first, then add messages and check.
_, err = fs.Purge()
require_NoError(t, err)

// >= 2.10 tombstone
mb.writeMsgRecord(emptyRecordLen, 22|tbit, _EMPTY_, nil, nil, time.Now().UnixNano(), true)

fs.StoreMsg("A", nil, msgA) // seq 4
fs.StoreMsg("B", nil, msgZ) // seq 5

checkPurgeState := func() {
state := fs.State()
require_True(t, state.Msgs == 2)
require_True(t, state.FirstSeq == 4)
require_True(t, state.LastSeq == 5)
require_True(t, state.NumSubjects == 2)

sm, err := fs.LoadMsg(4, nil)
require_NoError(t, err)
require_True(t, bytes.Equal(sm.msg, msgA))
require_True(t, sm.subj == "A")

sm, err = fs.LoadMsg(5, nil)
require_NoError(t, err)
require_True(t, bytes.Equal(sm.msg, msgZ))
require_True(t, sm.subj == "B")
}

checkPurgeState()

// Make sure we are good on recreate.
fs, err = newFileStore(FileStoreConfig{StoreDir: sd}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

checkPurgeState()
}

0 comments on commit 8841432

Please sign in to comment.