Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Add in warnings for filestore recover state if happy path fails. #4599

Merged
merged 1 commit into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type FileStoreConfig struct {
Cipher StoreCipher
// Compression is the algorithm to use when compressing.
Compression StoreCompression

// Internal reference to our server.
srv *Server
}

// FileStreamInfo allows us to remember created time.
Expand Down Expand Up @@ -387,6 +390,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
qch: make(chan struct{}),
fch: make(chan struct{}, 1),
fsld: make(chan struct{}),
srv: fcfg.srv,
}

// Set flush in place to AsyncFlush which by default is false.
Expand Down Expand Up @@ -527,12 +531,6 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
return fs, nil
}

func (fs *fileStore) registerServer(s *Server) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.srv = s
}

// Lock all existing message blocks.
// Lock held on entry.
func (fs *fileStore) lockAllMsgBlocks() {
Expand Down Expand Up @@ -1436,6 +1434,16 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
return nil, tombstones, nil
}

// For doing warn logging.
// Lock should be held.
func (fs *fileStore) warn(format string, args ...any) {
// No-op if no server configured.
if fs.srv == nil {
return
}
fs.srv.Warnf(fmt.Sprintf("Filestore [%s] %s", fs.cfg.Name, format), args...)
}

// recoverFullState will attempt to receover our last full state and re-process any state changes
// that happened afterwards.
func (fs *fileStore) recoverFullState() (rerr error) {
Expand All @@ -1455,12 +1463,16 @@ func (fs *fileStore) recoverFullState() (rerr error) {
dios <- struct{}{}

if err != nil {
if !os.IsNotExist(err) {
fs.warn("Could not read stream state file: %v", err)
}
return err
}

const minLen = 32
if len(buf) < minLen {
os.Remove(fn)
fs.warn("Stream state too short (%d bytes)", len(buf))
return errCorruptState
}

Expand All @@ -1471,6 +1483,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
fs.hh.Write(buf)
if !bytes.Equal(h, fs.hh.Sum(nil)) {
os.Remove(fn)
fs.warn("Stream state checksum did not match")
return errCorruptState
}

Expand All @@ -1482,13 +1495,15 @@ func (fs *fileStore) recoverFullState() (rerr error) {
ns := fs.aek.NonceSize()
buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:], nil)
if err != nil {
fs.warn("Stream state error reading encryption key: %v", err)
return err
}
}
}

if buf[0] != fullStateMagic || buf[1] != fullStateVersion {
os.Remove(fn)
fs.warn("Stream state magic and version mismatch")
return errCorruptState
}

Expand Down Expand Up @@ -1543,6 +1558,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
if lsubj := int(readU64()); lsubj > 0 {
if bi+lsubj > len(buf) {
os.Remove(fn)
fs.warn("Stream state bad subject len (%d)", lsubj)
return errCorruptState
}
subj := fs.subjString(buf[bi : bi+lsubj])
Expand Down Expand Up @@ -1573,6 +1589,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
dmap, n, err := avl.Decode(buf[bi:])
if err != nil {
os.Remove(fn)
fs.warn("Stream state error decoding avl dmap: %v", err)
return errCorruptState
}
mb.dmap = *dmap
Expand Down Expand Up @@ -1605,6 +1622,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
// Check if we had any errors.
if bi < 0 {
os.Remove(fn)
fs.warn("Stream state has no checksum present")
return errCorruptState
}

Expand All @@ -1621,9 +1639,9 @@ func (fs *fileStore) recoverFullState() (rerr error) {
if ld, _, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}
fs.warn("Stream state detected prior state, could not locate msg block %d", blkIndex)
return errPriorState
}

if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
Expand All @@ -1644,12 +1662,14 @@ func (fs *fileStore) recoverFullState() (rerr error) {
return nil
}
os.Remove(fn)
fs.warn("Stream state could not recover msg block %d", bi)
return err
}
if nmb != nil {
// Check if we have to account for a partial message block.
if !matched && mb != nil && mb.index == nmb.index {
if err := fs.adjustAccounting(mb, nmb); err != nil {
fs.warn("Stream state could not adjust accounting: %v", err)
return err
}
}
Expand Down
69 changes: 69 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6096,6 +6096,75 @@ func TestFileStoreRemoveLastNoDoubleTombstones(t *testing.T) {
require_Equal(t, rbytes, emptyRecordLen)
}

func TestFileStoreFullStateMultiBlockPastWAL(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fcfg.BlockSize = 100
scfg := StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}

prf := func(context []byte) ([]byte, error) {
h := hmac.New(sha256.New, []byte("dlc22"))
if _, err := h.Write(context); err != nil {
return nil, err
}
return h.Sum(nil), nil
}
if fcfg.Cipher == NoCipher {
prf = nil
}

fs, err := newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
require_NoError(t, err)
defer fs.Stop()

// This yields an internal record length of 50 bytes. So 2 msgs per blk.
msgLen := 19
msgA := bytes.Repeat([]byte("A"), msgLen)
msgZ := bytes.Repeat([]byte("Z"), msgLen)

// Store 2 msgs
fs.StoreMsg("A", nil, msgA)
fs.StoreMsg("B", nil, msgZ)
require_Equal(t, fs.numMsgBlocks(), 1)
fs.Stop()

// Grab the state from this stop.
sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile)
buf, err := os.ReadFile(sfile)
require_NoError(t, err)

fs, err = newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
require_NoError(t, err)
defer fs.Stop()

// Store 2 more msgs and delete 2 & 4, then another 2 msgs.
fs.StoreMsg("C", nil, msgA)
fs.StoreMsg("D", nil, msgZ)
fs.StoreMsg("E", nil, msgA)
fs.StoreMsg("F", nil, msgZ)
fs.StoreMsg("G", nil, msgA)
fs.StoreMsg("H", nil, msgZ)
require_Equal(t, fs.numMsgBlocks(), 4)
state := fs.State()
fs.Stop()

// Put back old stream state.
// This will test that we properly walk multiple blocks past where we snapshotted state.
fs.Stop()
err = os.WriteFile(sfile, buf, defaultFilePerms)
require_NoError(t, err)

fs, err = newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
require_NoError(t, err)
defer fs.Stop()

if newState := fs.State(); !reflect.DeepEqual(state, newState) {
t.Fatalf("Restore state does not match:\n%+v\n%+v",
state, newState)
}
require_True(t, !state.FirstTime.IsZero())
})
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down
9 changes: 2 additions & 7 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ func (js *jetStream) setupMetaGroup() error {
storeDir := filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, defaultMetaGroupName)

fs, err := newFileStoreWithCreated(
FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false},
FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false, srv: s},
StreamConfig{Name: defaultMetaGroupName, Storage: FileStorage},
time.Now().UTC(),
s.jsKeyGen(s.getOpts().JetStreamKey, defaultMetaGroupName),
Expand All @@ -766,9 +766,6 @@ func (js *jetStream) setupMetaGroup() error {
return err
}

// Register our server.
fs.registerServer(s)

cfg := &RaftConfig{Name: defaultMetaGroupName, Store: storeDir, Log: fs}

// If we are soliciting leafnode connections and we are sharing a system account and do not disable it with a hint,
Expand Down Expand Up @@ -2032,7 +2029,7 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor
var store StreamStore
if storage == FileStorage {
fs, err := newFileStoreWithCreated(
FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute},
FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute, srv: s},
StreamConfig{Name: rg.Name, Storage: FileStorage},
time.Now().UTC(),
s.jsKeyGen(s.getOpts().JetStreamKey, rg.Name),
Expand All @@ -2042,8 +2039,6 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor
s.Errorf("Error creating filestore WAL: %v", err)
return err
}
// Register our server.
fs.registerServer(s)
store = fs
} else {
ms, err := newMemStore(&StreamConfig{Name: rg.Name, Storage: MemoryStorage})
Expand Down
6 changes: 3 additions & 3 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3665,14 +3665,14 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
fsCfg.Cipher = s.getOpts().JetStreamCipher
}
oldprf := s.jsKeyGen(s.getOpts().JetStreamOldKey, mset.acc.Name)
fs, err := newFileStoreWithCreated(*fsCfg, mset.cfg, mset.created, prf, oldprf)
cfg := *fsCfg
cfg.srv = s
fs, err := newFileStoreWithCreated(cfg, mset.cfg, mset.created, prf, oldprf)
if err != nil {
mset.mu.Unlock()
return err
}
mset.store = fs
// Register our server.
fs.registerServer(s)
}
// This will fire the callback but we do not require the lock since md will be 0 here.
mset.store.RegisterStorageUpdates(mset.storeUpdates)
Expand Down