From aeef0eff5383b20807d1cd7b2eb8e493337d1245 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 27 Sep 2023 16:22:15 -0700 Subject: [PATCH] Add in warnings for filestore recover state if happy path fails. Signed-off-by: Derek Collison --- server/filestore.go | 34 ++++++++++++++---- server/filestore_test.go | 69 +++++++++++++++++++++++++++++++++++++ server/jetstream_cluster.go | 9 ++--- server/stream.go | 6 ++-- 4 files changed, 101 insertions(+), 17 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index eb4c694052..ef170eba4d 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -60,6 +60,9 @@ type FileStoreConfig struct { Cipher StoreCipher // Compression is the algorithm to use when compressing. Compression StoreCompression + + // Internal reference to our server. + srv *Server } // FileStreamInfo allows us to remember created time. @@ -387,6 +390,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim qch: make(chan struct{}), fch: make(chan struct{}, 1), fsld: make(chan struct{}), + srv: fcfg.srv, } // Set flush in place to AsyncFlush which by default is false. @@ -527,12 +531,6 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim return fs, nil } -func (fs *fileStore) registerServer(s *Server) { - fs.mu.Lock() - defer fs.mu.Unlock() - fs.srv = s -} - // Lock all existing message blocks. // Lock held on entry. func (fs *fileStore) lockAllMsgBlocks() { @@ -1436,6 +1434,16 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { return nil, tombstones, nil } +// For doing warn logging. +// Lock should be held. +func (fs *fileStore) warn(format string, args ...any) { + // No-op if no server configured. + if fs.srv == nil { + return + } + fs.srv.Warnf(fmt.Sprintf("Filestore [%s] %s", fs.cfg.Name, format), args...) +} + // recoverFullState will attempt to receover our last full state and re-process any state changes // that happened afterwards. func (fs *fileStore) recoverFullState() (rerr error) { @@ -1455,12 +1463,16 @@ func (fs *fileStore) recoverFullState() (rerr error) { dios <- struct{}{} if err != nil { + if !os.IsNotExist(err) { + fs.warn("Could not read stream state file: %v", err) + } return err } const minLen = 32 if len(buf) < minLen { os.Remove(fn) + fs.warn("Stream state too short (%d bytes)", len(buf)) return errCorruptState } @@ -1471,6 +1483,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { fs.hh.Write(buf) if !bytes.Equal(h, fs.hh.Sum(nil)) { os.Remove(fn) + fs.warn("Stream state checksum did not match") return errCorruptState } @@ -1482,6 +1495,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { ns := fs.aek.NonceSize() buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:], nil) if err != nil { + fs.warn("Stream state error reading encryption key: %v", err) return err } } @@ -1489,6 +1503,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { if buf[0] != fullStateMagic || buf[1] != fullStateVersion { os.Remove(fn) + fs.warn("Stream state magic and version mismatch") return errCorruptState } @@ -1543,6 +1558,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { if lsubj := int(readU64()); lsubj > 0 { if bi+lsubj > len(buf) { os.Remove(fn) + fs.warn("Stream state bad subject len (%d)", lsubj) return errCorruptState } subj := fs.subjString(buf[bi : bi+lsubj]) @@ -1573,6 +1589,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { dmap, n, err := avl.Decode(buf[bi:]) if err != nil { os.Remove(fn) + fs.warn("Stream state error decoding avl dmap: %v", err) return errCorruptState } mb.dmap = *dmap @@ -1605,6 +1622,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { // Check if we had any errors. if bi < 0 { os.Remove(fn) + fs.warn("Stream state has no checksum present") return errCorruptState } @@ -1621,9 +1639,9 @@ func (fs *fileStore) recoverFullState() (rerr error) { if ld, _, _ := mb.rebuildState(); ld != nil { fs.addLostData(ld) } + fs.warn("Stream state detected prior state, could not locate msg block %d", blkIndex) return errPriorState } - if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched { // Remove the last message block since we will re-process below. fs.removeMsgBlockFromList(mb) @@ -1644,12 +1662,14 @@ func (fs *fileStore) recoverFullState() (rerr error) { return nil } os.Remove(fn) + fs.warn("Stream state could not recover msg block %d", bi) return err } if nmb != nil { // Check if we have to account for a partial message block. if !matched && mb != nil && mb.index == nmb.index { if err := fs.adjustAccounting(mb, nmb); err != nil { + fs.warn("Stream state could not adjust accounting: %v", err) return err } } diff --git a/server/filestore_test.go b/server/filestore_test.go index a961ef9e11..05e082043d 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -6096,6 +6096,75 @@ func TestFileStoreRemoveLastNoDoubleTombstones(t *testing.T) { require_Equal(t, rbytes, emptyRecordLen) } +func TestFileStoreFullStateMultiBlockPastWAL(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fcfg.BlockSize = 100 + scfg := StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage} + + prf := func(context []byte) ([]byte, error) { + h := hmac.New(sha256.New, []byte("dlc22")) + if _, err := h.Write(context); err != nil { + return nil, err + } + return h.Sum(nil), nil + } + if fcfg.Cipher == NoCipher { + prf = nil + } + + fs, err := newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil) + require_NoError(t, err) + defer fs.Stop() + + // This yields an internal record length of 50 bytes. So 2 msgs per blk. + msgLen := 19 + msgA := bytes.Repeat([]byte("A"), msgLen) + msgZ := bytes.Repeat([]byte("Z"), msgLen) + + // Store 2 msgs + fs.StoreMsg("A", nil, msgA) + fs.StoreMsg("B", nil, msgZ) + require_Equal(t, fs.numMsgBlocks(), 1) + fs.Stop() + + // Grab the state from this stop. + sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile) + buf, err := os.ReadFile(sfile) + require_NoError(t, err) + + fs, err = newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil) + require_NoError(t, err) + defer fs.Stop() + + // Store 2 more msgs and delete 2 & 4, then another 2 msgs. + fs.StoreMsg("C", nil, msgA) + fs.StoreMsg("D", nil, msgZ) + fs.StoreMsg("E", nil, msgA) + fs.StoreMsg("F", nil, msgZ) + fs.StoreMsg("G", nil, msgA) + fs.StoreMsg("H", nil, msgZ) + require_Equal(t, fs.numMsgBlocks(), 4) + state := fs.State() + fs.Stop() + + // Put back old stream state. + // This will test that we properly walk multiple blocks past where we snapshotted state. + fs.Stop() + err = os.WriteFile(sfile, buf, defaultFilePerms) + require_NoError(t, err) + + fs, err = newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil) + require_NoError(t, err) + defer fs.Stop() + + if newState := fs.State(); !reflect.DeepEqual(state, newState) { + t.Fatalf("Restore state does not match:\n%+v\n%+v", + state, newState) + } + require_True(t, !state.FirstTime.IsZero()) + }) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 6a94329a3c..b5c02bd5fb 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -755,7 +755,7 @@ func (js *jetStream) setupMetaGroup() error { storeDir := filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, defaultMetaGroupName) fs, err := newFileStoreWithCreated( - FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false}, + FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false, srv: s}, StreamConfig{Name: defaultMetaGroupName, Storage: FileStorage}, time.Now().UTC(), s.jsKeyGen(s.getOpts().JetStreamKey, defaultMetaGroupName), @@ -766,9 +766,6 @@ func (js *jetStream) setupMetaGroup() error { return err } - // Register our server. - fs.registerServer(s) - cfg := &RaftConfig{Name: defaultMetaGroupName, Store: storeDir, Log: fs} // If we are soliciting leafnode connections and we are sharing a system account and do not disable it with a hint, @@ -2032,7 +2029,7 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor var store StreamStore if storage == FileStorage { fs, err := newFileStoreWithCreated( - FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute}, + FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute, srv: s}, StreamConfig{Name: rg.Name, Storage: FileStorage}, time.Now().UTC(), s.jsKeyGen(s.getOpts().JetStreamKey, rg.Name), @@ -2042,8 +2039,6 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor s.Errorf("Error creating filestore WAL: %v", err) return err } - // Register our server. - fs.registerServer(s) store = fs } else { ms, err := newMemStore(&StreamConfig{Name: rg.Name, Storage: MemoryStorage}) diff --git a/server/stream.go b/server/stream.go index b907e8de5d..80afc26ca6 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3665,14 +3665,14 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error { fsCfg.Cipher = s.getOpts().JetStreamCipher } oldprf := s.jsKeyGen(s.getOpts().JetStreamOldKey, mset.acc.Name) - fs, err := newFileStoreWithCreated(*fsCfg, mset.cfg, mset.created, prf, oldprf) + cfg := *fsCfg + cfg.srv = s + fs, err := newFileStoreWithCreated(cfg, mset.cfg, mset.created, prf, oldprf) if err != nil { mset.mu.Unlock() return err } mset.store = fs - // Register our server. - fs.registerServer(s) } // This will fire the callback but we do not require the lock since md will be 0 here. mset.store.RegisterStorageUpdates(mset.storeUpdates)