Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] FileStore: Empty() should remove all message files (dat/idx) #1011

Merged
merged 1 commit into from Jan 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 6 additions & 7 deletions stores/filestore.go
Expand Up @@ -3831,24 +3831,23 @@ func (ms *FileMsgStore) Empty() error {
defer ms.Unlock()

var err error
// Remove/close all file slices
// Close all file slices
for sliceID, slice := range ms.files {
ms.fm.remove(slice.file)
ms.fm.remove(slice.idxFile)
if slice.file.handle != nil {
err = util.CloseFile(err, slice.file.handle)
}
if lerr := os.Remove(slice.file.name); lerr != nil && err == nil {
err = lerr
}
if slice.idxFile.handle != nil {
err = util.CloseFile(err, slice.idxFile.handle)
}
if lerr := os.Remove(slice.idxFile.name); lerr != nil && err == nil {
err = lerr
}
delete(ms.files, sliceID)
}
// Remove all message files (dat and idx) present
msgfiles, _ := filepath.Glob(filepath.Join(ms.fm.rootDir, ms.channelName, msgFilesPrefix+"*.*"))
for _, f := range msgfiles {
os.Remove(f)
}
// Reset generic counters
ms.empty()
// FileMsgStore specific
Expand Down
34 changes: 34 additions & 0 deletions stores/filestore_msg_test.go
Expand Up @@ -1745,6 +1745,40 @@ func TestFSRecoverEmptyIndexMsgFile(t *testing.T) {
}
}

func TestFSEmptyRemovesAllMsgsFiles(t *testing.T) {
cleanupFSDatastore(t)
defer cleanupFSDatastore(t)

s := createDefaultFileStore(t)
defer s.Close()

c := storeCreateChannel(t, s, "foo")
ms := c.Msgs
storeMsg(t, c, "foo", 1, []byte("msg"))
ms.(*FileMsgStore).RLock()
datname := ms.(*FileMsgStore).writeSlice.file.name
idxname := ms.(*FileMsgStore).writeSlice.idxFile.name
ms.(*FileMsgStore).RUnlock()
s.Close()

os.Remove(datname)

s, rs := openDefaultFileStore(t)
defer s.Close()

c = getRecoveredChannel(t, rs, "foo")
if n, b := msgStoreState(t, c.Msgs); n != 0 || b != 0 {
t.Fatalf("Unexpected state: %v - %v", n, b)
}
if err := c.Msgs.Empty(); err != nil {
t.Fatalf("Error on empty: %v", err)
}
// Make sure we have remove the index file too.
if fi, err := os.Stat(idxname); err == nil || fi != nil {
t.Fatalf("Expected index file to be gone, it was not")
}
}

func TestFSGetSeqFromTimestamp(t *testing.T) {
cleanupFSDatastore(t)
defer cleanupFSDatastore(t)
Expand Down