Skip to content

Commit

Permalink
Ensure file path is correct during stream restore
Browse files Browse the repository at this point in the history
Also had to change all references from `path.` to `filepath.` when
dealing with files, so that it works properly on Windows.

Fixed also lots of tests to defer the shutdown of the server
after the removal of the storage, and fixed some config files
directories to use the single quote `'` to surround the file path,
again to work on Windows.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Mar 9, 2022
1 parent 0cb0f6d commit b412869
Show file tree
Hide file tree
Showing 23 changed files with 467 additions and 496 deletions.
87 changes: 43 additions & 44 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"io/ioutil"
"net"
"os"
"path"
"path/filepath"
"runtime"
"sort"
Expand Down Expand Up @@ -296,8 +295,8 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
fs.fip = !fcfg.AsyncFlush

// Check if this is a new setup.
mdir := path.Join(fcfg.StoreDir, msgDir)
odir := path.Join(fcfg.StoreDir, consumerDir)
mdir := filepath.Join(fcfg.StoreDir, msgDir)
odir := filepath.Join(fcfg.StoreDir, consumerDir)
if err := os.MkdirAll(mdir, defaultDirPerms); err != nil {
return nil, fmt.Errorf("could not create message storage directory - %v", err)
}
Expand All @@ -321,7 +320,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}

// Write our meta data iff does not exist.
meta := path.Join(fcfg.StoreDir, JetStreamMetaFile)
meta := filepath.Join(fcfg.StoreDir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) {
if err := fs.writeStreamMeta(); err != nil {
return nil, err
Expand All @@ -331,7 +330,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// If we expect to be encrypted check that what we are restoring is not plaintext.
// This can happen on snapshot restores or conversions.
if fs.prf != nil {
keyFile := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
if _, err := os.Stat(keyFile); err != nil && os.IsNotExist(err) {
if err := fs.writeStreamMeta(); err != nil {
return nil, err
Expand Down Expand Up @@ -454,7 +453,7 @@ func (fs *fileStore) writeStreamMeta() error {
return err
}
fs.aek = key
keyFile := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -463,7 +462,7 @@ func (fs *fileStore) writeStreamMeta() error {
}
}

meta := path.Join(fs.fcfg.StoreDir, JetStreamMetaFile)
meta := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -484,7 +483,7 @@ func (fs *fileStore) writeStreamMeta() error {
fs.hh.Reset()
fs.hh.Write(b)
checksum := hex.EncodeToString(fs.hh.Sum(nil))
sum := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum)
sum := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum)
if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil {
return err
}
Expand All @@ -503,10 +502,10 @@ const indexHdrSize = 7*binary.MaxVarintLen64 + hdrLen + checksumSize
func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, error) {
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire}

mdir := path.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = path.Join(mdir, fi.Name())
mb.ifn = path.Join(mdir, fmt.Sprintf(indexScan, index))
mb.sfn = path.Join(mdir, fmt.Sprintf(fssScan, index))
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = filepath.Join(mdir, fi.Name())
mb.ifn = filepath.Join(mdir, fmt.Sprintf(indexScan, index))
mb.sfn = filepath.Join(mdir, fmt.Sprintf(fssScan, index))

if mb.hh == nil {
key := sha256.Sum256(fs.hashKeyForBlock(index))
Expand All @@ -517,7 +516,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, e

// Check if encryption is enabled.
if fs.prf != nil {
ekey, err := ioutil.ReadFile(path.Join(mdir, fmt.Sprintf(keyScan, mb.index)))
ekey, err := ioutil.ReadFile(filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index)))
if err != nil {
// We do not seem to have keys even though we should. Could be a plaintext conversion.
// Create the keys and we will double check below.
Expand Down Expand Up @@ -863,12 +862,12 @@ func (fs *fileStore) recoverMsgs() error {
defer fs.mu.Unlock()

// Check for any left over purged messages.
pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir)
if _, err := os.Stat(pdir); err == nil {
os.RemoveAll(pdir)
}

mdir := path.Join(fs.fcfg.StoreDir, msgDir)
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
fis, err := ioutil.ReadDir(mdir)
if err != nil {
return errNotReadable
Expand Down Expand Up @@ -916,21 +915,21 @@ func (fs *fileStore) recoverMsgs() error {

// We had a bug that would leave fss files around during a snapshot.
// Clean them up here if we see them.
if fms, err := filepath.Glob(path.Join(mdir, fssScanAll)); err == nil && len(fms) > 0 {
if fms, err := filepath.Glob(filepath.Join(mdir, fssScanAll)); err == nil && len(fms) > 0 {
for _, fn := range fms {
os.Remove(fn)
}
}
// Same bug for keyfiles but for these we just need to identify orphans.
if kms, err := filepath.Glob(path.Join(mdir, keyScanAll)); err == nil && len(kms) > 0 {
if kms, err := filepath.Glob(filepath.Join(mdir, keyScanAll)); err == nil && len(kms) > 0 {
valid := make(map[uint64]bool)
for _, mb := range fs.blks {
valid[mb.index] = true
}
for _, fn := range kms {
var index uint64
shouldRemove := true
if n, err := fmt.Sscanf(path.Base(fn), keyScan, &index); err == nil && n == 1 && valid[index] {
if n, err := fmt.Sscanf(filepath.Base(fn), keyScan, &index); err == nil && n == 1 && valid[index] {
shouldRemove = false
}
if shouldRemove {
Expand Down Expand Up @@ -1516,16 +1515,16 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
}
mb.hh = hh

mdir := path.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = path.Join(mdir, fmt.Sprintf(blkScan, mb.index))
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = filepath.Join(mdir, fmt.Sprintf(blkScan, mb.index))
mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms)
if err != nil {
mb.dirtyCloseWithRemove(true)
return nil, fmt.Errorf("Error creating msg block file [%q]: %v", mb.mfn, err)
}
mb.mfd = mfd

mb.ifn = path.Join(mdir, fmt.Sprintf(indexScan, mb.index))
mb.ifn = filepath.Join(mdir, fmt.Sprintf(indexScan, mb.index))
ifd, err := os.OpenFile(mb.ifn, os.O_CREATE|os.O_RDWR, defaultFilePerms)
if err != nil {
mb.dirtyCloseWithRemove(true)
Expand All @@ -1534,7 +1533,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
mb.ifd = ifd

// For subject based info.
mb.sfn = path.Join(mdir, fmt.Sprintf(fssScan, mb.index))
mb.sfn = filepath.Join(mdir, fmt.Sprintf(fssScan, mb.index))

// Check if encryption is enabled.
if fs.prf != nil {
Expand Down Expand Up @@ -1576,8 +1575,8 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error {
return err
}
mb.aek, mb.bek, mb.seed, mb.nonce = key, bek, seed, encrypted[:key.NonceSize()]
mdir := path.Join(fs.fcfg.StoreDir, msgDir)
keyFile := path.Join(mdir, fmt.Sprintf(keyScan, mb.index))
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
keyFile := filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index))
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
Expand Down Expand Up @@ -2119,7 +2118,7 @@ func (mb *msgBlock) compact() {
mb.closeFDsLocked()

// We will write to a new file and mv/rename it in case of failure.
mfn := path.Join(path.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
mfn := filepath.Join(filepath.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
defer os.Remove(mfn)
if err := ioutil.WriteFile(mfn, nbuf, defaultFilePerms); err != nil {
return
Expand Down Expand Up @@ -4061,8 +4060,8 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {

// Move the msgs directory out of the way, will delete out of band.
// FIXME(dlc) - These can error and we need to change api above to propagate?
mdir := path.Join(fs.fcfg.StoreDir, msgDir)
pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir)
// If purge directory still exists then we need to wait
// in place and remove since rename would fail.
if _, err := os.Stat(pdir); err == nil {
Expand Down Expand Up @@ -4593,7 +4592,7 @@ func (fs *fileStore) Delete() error {
}
fs.Purge()

pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir)
// If purge directory still exists then we need to wait
// in place and remove since rename would fail.
if _, err := os.Stat(pdir); err == nil {
Expand Down Expand Up @@ -4820,13 +4819,13 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, state *StreamState, includ
o.mu.Unlock()

// Write all the consumer files.
if writeFile(path.Join(odirPre, JetStreamMetaFile), meta) != nil {
if writeFile(filepath.Join(odirPre, JetStreamMetaFile), meta) != nil {
return
}
if writeFile(path.Join(odirPre, JetStreamMetaFileSum), sum) != nil {
if writeFile(filepath.Join(odirPre, JetStreamMetaFileSum), sum) != nil {
return
}
writeFile(path.Join(odirPre, consumerState), state)
writeFile(filepath.Join(odirPre, consumerState), state)
}
}

Expand Down Expand Up @@ -4909,7 +4908,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
if cfg == nil || name == _EMPTY_ {
return nil, fmt.Errorf("bad consumer config")
}
odir := path.Join(fs.fcfg.StoreDir, consumerDir, name)
odir := filepath.Join(fs.fcfg.StoreDir, consumerDir, name)
if err := os.MkdirAll(odir, defaultDirPerms); err != nil {
return nil, fmt.Errorf("could not create consumer directory - %v", err)
}
Expand All @@ -4920,7 +4919,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
prf: fs.prf,
name: name,
odir: odir,
ifn: path.Join(odir, consumerState),
ifn: filepath.Join(odir, consumerState),
}
key := sha256.Sum256([]byte(fs.cfg.Name + "/" + name))
hh, err := highwayhash.New64(key[:])
Expand All @@ -4931,7 +4930,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt

// Check for encryption.
if o.prf != nil {
if ekey, err := ioutil.ReadFile(path.Join(odir, JetStreamMetaFileKey)); err == nil {
if ekey, err := ioutil.ReadFile(filepath.Join(odir, JetStreamMetaFileKey)); err == nil {
// Recover key encryption key.
rb, err := fs.prf([]byte(fs.cfg.Name + tsep + o.name))
if err != nil {
Expand All @@ -4953,7 +4952,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
}

// Write our meta data iff does not exist.
meta := path.Join(odir, JetStreamMetaFile)
meta := filepath.Join(odir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) {
csi.Created = time.Now().UTC()
if err := o.writeConsumerMeta(); err != nil {
Expand All @@ -4964,7 +4963,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
// If we expect to be encrypted check that what we are restoring is not plaintext.
// This can happen on snapshot restores or conversions.
if o.prf != nil {
keyFile := path.Join(odir, JetStreamMetaFileKey)
keyFile := filepath.Join(odir, JetStreamMetaFileKey)
if _, err := os.Stat(keyFile); err != nil && os.IsNotExist(err) {
if err := o.writeConsumerMeta(); err != nil {
return nil, err
Expand Down Expand Up @@ -5418,7 +5417,7 @@ func (o *consumerFileStore) updateConfig(cfg ConsumerConfig) error {
// Write out the consumer meta data, i.e. state.
// Lock should be held.
func (cfs *consumerFileStore) writeConsumerMeta() error {
meta := path.Join(cfs.odir, JetStreamMetaFile)
meta := filepath.Join(cfs.odir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -5430,7 +5429,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
return err
}
cfs.aek = key
keyFile := path.Join(cfs.odir, JetStreamMetaFileKey)
keyFile := filepath.Join(cfs.odir, JetStreamMetaFileKey)
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -5456,7 +5455,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
cfs.hh.Reset()
cfs.hh.Write(b)
checksum := hex.EncodeToString(cfs.hh.Sum(nil))
sum := path.Join(cfs.odir, JetStreamMetaFileSum)
sum := filepath.Join(cfs.odir, JetStreamMetaFileSum)
if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil {
return err
}
Expand Down Expand Up @@ -5787,7 +5786,7 @@ type templateFileStore struct {
}

func newTemplateFileStore(storeDir string) *templateFileStore {
tdir := path.Join(storeDir, tmplsDir)
tdir := filepath.Join(storeDir, tmplsDir)
key := sha256.Sum256([]byte("templates"))
hh, err := highwayhash.New64(key[:])
if err != nil {
Expand All @@ -5797,11 +5796,11 @@ func newTemplateFileStore(storeDir string) *templateFileStore {
}

func (ts *templateFileStore) Store(t *streamTemplate) error {
dir := path.Join(ts.dir, t.Name)
dir := filepath.Join(ts.dir, t.Name)
if err := os.MkdirAll(dir, defaultDirPerms); err != nil {
return fmt.Errorf("could not create templates storage directory for %q- %v", t.Name, err)
}
meta := path.Join(dir, JetStreamMetaFile)
meta := filepath.Join(dir, JetStreamMetaFile)
if _, err := os.Stat(meta); (err != nil && !os.IsNotExist(err)) || err == nil {
return err
}
Expand All @@ -5818,13 +5817,13 @@ func (ts *templateFileStore) Store(t *streamTemplate) error {
ts.hh.Reset()
ts.hh.Write(b)
checksum := hex.EncodeToString(ts.hh.Sum(nil))
sum := path.Join(dir, JetStreamMetaFileSum)
sum := filepath.Join(dir, JetStreamMetaFileSum)
if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil {
return err
}
return nil
}

func (ts *templateFileStore) Delete(t *streamTemplate) error {
return os.RemoveAll(path.Join(ts.dir, t.Name))
return os.RemoveAll(filepath.Join(ts.dir, t.Name))
}

0 comments on commit b412869

Please sign in to comment.