Skip to content

Commit

Permalink
Add message dat/idx file in case of "unable to verify file version" e…
Browse files Browse the repository at this point in the history
…rror

Missed from PR #1035

Resolves #1034

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Apr 17, 2020
1 parent c1a4776 commit 5abe9ec
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 3 deletions.
9 changes: 6 additions & 3 deletions stores/filestore.go
Expand Up @@ -2016,6 +2016,7 @@ func (fs *FileStore) newFileMsgStore(channelDirName, channel string, limits *Msg

// Use this variable for all errors below so we can do the cleanup
var err error
var fname string

// Recovery case
if doRecover {
Expand Down Expand Up @@ -2048,11 +2049,13 @@ func (fs *FileStore) newFileMsgStore(channelDirName, channel string, limits *Msg
if s, statErr := os.Stat(filepath.Join(channelDirName, idxFName)); s != nil && statErr == nil {
useIdxFile = true
}
datFile, err = ms.fm.createFile(filepath.Join(channel, fileName), defaultFileFlags, nil)
fname = filepath.Join(channel, fileName)
datFile, err = ms.fm.createFile(fname, defaultFileFlags, nil)
if err != nil {
break
}
idxFile, err = ms.fm.createFile(filepath.Join(channel, idxFName), defaultFileFlags, nil)
fname = filepath.Join(channel, idxFName)
idxFile, err = ms.fm.createFile(fname, defaultFileFlags, nil)
if err != nil {
ms.fm.unlockFile(datFile)
break
Expand Down Expand Up @@ -2125,7 +2128,7 @@ func (fs *FileStore) newFileMsgStore(channelDirName, channel string, limits *Msg
if doRecover {
action = "recover"
}
err = fmt.Errorf("unable to %s message store for [%s]: %v", action, channel, err)
err = fmt.Errorf("unable to %s message store for [%s](file: %q): %v", action, channel, fname, err)
return nil, err
}

Expand Down
52 changes: 52 additions & 0 deletions stores/filestore_msg_test.go
Expand Up @@ -2271,3 +2271,55 @@ func TestFSExpirationError(t *testing.T) {
t.Fatalf("Expected next expiration to be set to 5secs from now, got %v", time.Duration(nextExpiration))
}
}

func TestFSMsgsFileVersionError(t *testing.T) {
for _, test := range []struct {
name string
dat bool
suffix string
}{
{"data", true, datSuffix},
{"index", false, idxSuffix},
} {
t.Run(test.name, func(t *testing.T) {
cleanupFSDatastore(t)
defer cleanupFSDatastore(t)

s := createDefaultFileStore(t)
defer s.Close()

c := storeCreateChannel(t, s, "foo")
storeMsg(t, c, "foo", 1, []byte("hello"))
if err := c.Msgs.Flush(); err != nil {
t.Fatalf("Error flushing store: %v", err)
}
var fname string
ms := c.Msgs.(*FileMsgStore)
ms.Lock()
if test.dat {
fname = ms.files[1].file.name
} else {
fname = ms.files[1].idxFile.name
}
ms.Unlock()

s.Close()

os.Remove(fname)
if err := ioutil.WriteFile(fname, []byte(""), 0666); err != nil {
t.Fatalf("Error writing file: %v", err)
}

s, err := NewFileStore(testLogger, testFSDefaultDatastore, nil)
if err != nil {
t.Fatalf("Error creating filestore: %v", err)
}
defer s.Close()
if _, err := s.Recover(); err == nil ||
!strings.Contains(err.Error(), "unable to recover message store for [foo]") ||
!strings.Contains(err.Error(), fmt.Sprintf("%s1%s", msgFilesPrefix, test.suffix)) {
t.Fatalf("Unexpected error: %v", err)
}
})
}
}

0 comments on commit 5abe9ec

Please sign in to comment.