Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure file path is correct during stream restore #2917

Merged
merged 1 commit into from Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
87 changes: 43 additions & 44 deletions server/filestore.go
Expand Up @@ -29,7 +29,6 @@ import (
"io/ioutil"
"net"
"os"
"path"
"path/filepath"
"runtime"
"sort"
Expand Down Expand Up @@ -296,8 +295,8 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
fs.fip = !fcfg.AsyncFlush

// Check if this is a new setup.
mdir := path.Join(fcfg.StoreDir, msgDir)
odir := path.Join(fcfg.StoreDir, consumerDir)
mdir := filepath.Join(fcfg.StoreDir, msgDir)
odir := filepath.Join(fcfg.StoreDir, consumerDir)
if err := os.MkdirAll(mdir, defaultDirPerms); err != nil {
return nil, fmt.Errorf("could not create message storage directory - %v", err)
}
Expand All @@ -321,7 +320,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}

// Write our meta data iff does not exist.
meta := path.Join(fcfg.StoreDir, JetStreamMetaFile)
meta := filepath.Join(fcfg.StoreDir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) {
if err := fs.writeStreamMeta(); err != nil {
return nil, err
Expand All @@ -331,7 +330,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// If we expect to be encrypted check that what we are restoring is not plaintext.
// This can happen on snapshot restores or conversions.
if fs.prf != nil {
keyFile := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
if _, err := os.Stat(keyFile); err != nil && os.IsNotExist(err) {
if err := fs.writeStreamMeta(); err != nil {
return nil, err
Expand Down Expand Up @@ -454,7 +453,7 @@ func (fs *fileStore) writeStreamMeta() error {
return err
}
fs.aek = key
keyFile := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -463,7 +462,7 @@ func (fs *fileStore) writeStreamMeta() error {
}
}

meta := path.Join(fs.fcfg.StoreDir, JetStreamMetaFile)
meta := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -484,7 +483,7 @@ func (fs *fileStore) writeStreamMeta() error {
fs.hh.Reset()
fs.hh.Write(b)
checksum := hex.EncodeToString(fs.hh.Sum(nil))
sum := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum)
sum := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum)
if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil {
return err
}
Expand All @@ -503,10 +502,10 @@ const indexHdrSize = 7*binary.MaxVarintLen64 + hdrLen + checksumSize
func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, error) {
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire}

mdir := path.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = path.Join(mdir, fi.Name())
mb.ifn = path.Join(mdir, fmt.Sprintf(indexScan, index))
mb.sfn = path.Join(mdir, fmt.Sprintf(fssScan, index))
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = filepath.Join(mdir, fi.Name())
mb.ifn = filepath.Join(mdir, fmt.Sprintf(indexScan, index))
mb.sfn = filepath.Join(mdir, fmt.Sprintf(fssScan, index))

if mb.hh == nil {
key := sha256.Sum256(fs.hashKeyForBlock(index))
Expand All @@ -517,7 +516,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, e

// Check if encryption is enabled.
if fs.prf != nil {
ekey, err := ioutil.ReadFile(path.Join(mdir, fmt.Sprintf(keyScan, mb.index)))
ekey, err := ioutil.ReadFile(filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index)))
if err != nil {
// We do not seem to have keys even though we should. Could be a plaintext conversion.
// Create the keys and we will double check below.
Expand Down Expand Up @@ -863,12 +862,12 @@ func (fs *fileStore) recoverMsgs() error {
defer fs.mu.Unlock()

// Check for any left over purged messages.
pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir)
if _, err := os.Stat(pdir); err == nil {
os.RemoveAll(pdir)
}

mdir := path.Join(fs.fcfg.StoreDir, msgDir)
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
fis, err := ioutil.ReadDir(mdir)
if err != nil {
return errNotReadable
Expand Down Expand Up @@ -916,21 +915,21 @@ func (fs *fileStore) recoverMsgs() error {

// We had a bug that would leave fss files around during a snapshot.
// Clean them up here if we see them.
if fms, err := filepath.Glob(path.Join(mdir, fssScanAll)); err == nil && len(fms) > 0 {
if fms, err := filepath.Glob(filepath.Join(mdir, fssScanAll)); err == nil && len(fms) > 0 {
for _, fn := range fms {
os.Remove(fn)
}
}
// Same bug for keyfiles but for these we just need to identify orphans.
if kms, err := filepath.Glob(path.Join(mdir, keyScanAll)); err == nil && len(kms) > 0 {
if kms, err := filepath.Glob(filepath.Join(mdir, keyScanAll)); err == nil && len(kms) > 0 {
valid := make(map[uint64]bool)
for _, mb := range fs.blks {
valid[mb.index] = true
}
for _, fn := range kms {
var index uint64
shouldRemove := true
if n, err := fmt.Sscanf(path.Base(fn), keyScan, &index); err == nil && n == 1 && valid[index] {
if n, err := fmt.Sscanf(filepath.Base(fn), keyScan, &index); err == nil && n == 1 && valid[index] {
shouldRemove = false
}
if shouldRemove {
Expand Down Expand Up @@ -1516,16 +1515,16 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
}
mb.hh = hh

mdir := path.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = path.Join(mdir, fmt.Sprintf(blkScan, mb.index))
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = filepath.Join(mdir, fmt.Sprintf(blkScan, mb.index))
mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms)
if err != nil {
mb.dirtyCloseWithRemove(true)
return nil, fmt.Errorf("Error creating msg block file [%q]: %v", mb.mfn, err)
}
mb.mfd = mfd

mb.ifn = path.Join(mdir, fmt.Sprintf(indexScan, mb.index))
mb.ifn = filepath.Join(mdir, fmt.Sprintf(indexScan, mb.index))
ifd, err := os.OpenFile(mb.ifn, os.O_CREATE|os.O_RDWR, defaultFilePerms)
if err != nil {
mb.dirtyCloseWithRemove(true)
Expand All @@ -1534,7 +1533,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
mb.ifd = ifd

// For subject based info.
mb.sfn = path.Join(mdir, fmt.Sprintf(fssScan, mb.index))
mb.sfn = filepath.Join(mdir, fmt.Sprintf(fssScan, mb.index))

// Check if encryption is enabled.
if fs.prf != nil {
Expand Down Expand Up @@ -1576,8 +1575,8 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error {
return err
}
mb.aek, mb.bek, mb.seed, mb.nonce = key, bek, seed, encrypted[:key.NonceSize()]
mdir := path.Join(fs.fcfg.StoreDir, msgDir)
keyFile := path.Join(mdir, fmt.Sprintf(keyScan, mb.index))
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
keyFile := filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index))
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
Expand Down Expand Up @@ -2119,7 +2118,7 @@ func (mb *msgBlock) compact() {
mb.closeFDsLocked()

// We will write to a new file and mv/rename it in case of failure.
mfn := path.Join(path.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
mfn := filepath.Join(filepath.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
defer os.Remove(mfn)
if err := ioutil.WriteFile(mfn, nbuf, defaultFilePerms); err != nil {
return
Expand Down Expand Up @@ -4061,8 +4060,8 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {

// Move the msgs directory out of the way, will delete out of band.
// FIXME(dlc) - These can error and we need to change api above to propagate?
mdir := path.Join(fs.fcfg.StoreDir, msgDir)
pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir)
// If purge directory still exists then we need to wait
// in place and remove since rename would fail.
if _, err := os.Stat(pdir); err == nil {
Expand Down Expand Up @@ -4593,7 +4592,7 @@ func (fs *fileStore) Delete() error {
}
fs.Purge()

pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir)
// If purge directory still exists then we need to wait
// in place and remove since rename would fail.
if _, err := os.Stat(pdir); err == nil {
Expand Down Expand Up @@ -4820,13 +4819,13 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, state *StreamState, includ
o.mu.Unlock()

// Write all the consumer files.
if writeFile(path.Join(odirPre, JetStreamMetaFile), meta) != nil {
if writeFile(filepath.Join(odirPre, JetStreamMetaFile), meta) != nil {
return
}
if writeFile(path.Join(odirPre, JetStreamMetaFileSum), sum) != nil {
if writeFile(filepath.Join(odirPre, JetStreamMetaFileSum), sum) != nil {
return
}
writeFile(path.Join(odirPre, consumerState), state)
writeFile(filepath.Join(odirPre, consumerState), state)
}
}

Expand Down Expand Up @@ -4909,7 +4908,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
if cfg == nil || name == _EMPTY_ {
return nil, fmt.Errorf("bad consumer config")
}
odir := path.Join(fs.fcfg.StoreDir, consumerDir, name)
odir := filepath.Join(fs.fcfg.StoreDir, consumerDir, name)
if err := os.MkdirAll(odir, defaultDirPerms); err != nil {
return nil, fmt.Errorf("could not create consumer directory - %v", err)
}
Expand All @@ -4920,7 +4919,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
prf: fs.prf,
name: name,
odir: odir,
ifn: path.Join(odir, consumerState),
ifn: filepath.Join(odir, consumerState),
}
key := sha256.Sum256([]byte(fs.cfg.Name + "/" + name))
hh, err := highwayhash.New64(key[:])
Expand All @@ -4931,7 +4930,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt

// Check for encryption.
if o.prf != nil {
if ekey, err := ioutil.ReadFile(path.Join(odir, JetStreamMetaFileKey)); err == nil {
if ekey, err := ioutil.ReadFile(filepath.Join(odir, JetStreamMetaFileKey)); err == nil {
// Recover key encryption key.
rb, err := fs.prf([]byte(fs.cfg.Name + tsep + o.name))
if err != nil {
Expand All @@ -4953,7 +4952,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
}

// Write our meta data iff does not exist.
meta := path.Join(odir, JetStreamMetaFile)
meta := filepath.Join(odir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) {
csi.Created = time.Now().UTC()
if err := o.writeConsumerMeta(); err != nil {
Expand All @@ -4964,7 +4963,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
// If we expect to be encrypted check that what we are restoring is not plaintext.
// This can happen on snapshot restores or conversions.
if o.prf != nil {
keyFile := path.Join(odir, JetStreamMetaFileKey)
keyFile := filepath.Join(odir, JetStreamMetaFileKey)
if _, err := os.Stat(keyFile); err != nil && os.IsNotExist(err) {
if err := o.writeConsumerMeta(); err != nil {
return nil, err
Expand Down Expand Up @@ -5418,7 +5417,7 @@ func (o *consumerFileStore) updateConfig(cfg ConsumerConfig) error {
// Write out the consumer meta data, i.e. state.
// Lock should be held.
func (cfs *consumerFileStore) writeConsumerMeta() error {
meta := path.Join(cfs.odir, JetStreamMetaFile)
meta := filepath.Join(cfs.odir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -5430,7 +5429,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
return err
}
cfs.aek = key
keyFile := path.Join(cfs.odir, JetStreamMetaFileKey)
keyFile := filepath.Join(cfs.odir, JetStreamMetaFileKey)
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -5456,7 +5455,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
cfs.hh.Reset()
cfs.hh.Write(b)
checksum := hex.EncodeToString(cfs.hh.Sum(nil))
sum := path.Join(cfs.odir, JetStreamMetaFileSum)
sum := filepath.Join(cfs.odir, JetStreamMetaFileSum)
if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil {
return err
}
Expand Down Expand Up @@ -5787,7 +5786,7 @@ type templateFileStore struct {
}

func newTemplateFileStore(storeDir string) *templateFileStore {
tdir := path.Join(storeDir, tmplsDir)
tdir := filepath.Join(storeDir, tmplsDir)
key := sha256.Sum256([]byte("templates"))
hh, err := highwayhash.New64(key[:])
if err != nil {
Expand All @@ -5797,11 +5796,11 @@ func newTemplateFileStore(storeDir string) *templateFileStore {
}

func (ts *templateFileStore) Store(t *streamTemplate) error {
dir := path.Join(ts.dir, t.Name)
dir := filepath.Join(ts.dir, t.Name)
if err := os.MkdirAll(dir, defaultDirPerms); err != nil {
return fmt.Errorf("could not create templates storage directory for %q- %v", t.Name, err)
}
meta := path.Join(dir, JetStreamMetaFile)
meta := filepath.Join(dir, JetStreamMetaFile)
if _, err := os.Stat(meta); (err != nil && !os.IsNotExist(err)) || err == nil {
return err
}
Expand All @@ -5818,13 +5817,13 @@ func (ts *templateFileStore) Store(t *streamTemplate) error {
ts.hh.Reset()
ts.hh.Write(b)
checksum := hex.EncodeToString(ts.hh.Sum(nil))
sum := path.Join(dir, JetStreamMetaFileSum)
sum := filepath.Join(dir, JetStreamMetaFileSum)
if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil {
return err
}
return nil
}

func (ts *templateFileStore) Delete(t *streamTemplate) error {
return os.RemoveAll(path.Join(ts.dir, t.Name))
return os.RemoveAll(filepath.Join(ts.dir, t.Name))
}