Skip to content

Commit

Permalink
[FIXED] Possible panic on FileStore.Close() with expiration
Browse files Browse the repository at this point in the history
On store close, the files were closed before the expiration routine
was shutdown, which could cause that routine to still try to
open files that have been removed from the internal file manager.

Resolves #365
  • Loading branch information
kozlovic committed Aug 9, 2017
1 parent a02c43a commit 28d4412
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 10 deletions.
29 changes: 19 additions & 10 deletions stores/filestore.go
Expand Up @@ -1901,7 +1901,9 @@ func (ms *FileMsgStore) doLockFiles(fslice *fileSlice, onlyIndexFile bool) error
}
idxWasOpened, err = ms.fm.lockFile(fslice.idxFile)
if err != nil {
ms.fm.unlockFile(fslice.file)
if !datWasOpened {
ms.fm.unlockFile(fslice.file)
}
return err
}
if !onlyIndexFile {
Expand Down Expand Up @@ -2976,6 +2978,16 @@ func (ms *FileMsgStore) Close() error {
}

ms.closed = true

// Signal the background tasks go-routine to exit
ms.bkgTasksDone <- true

ms.Unlock()

// Wait on go routines/timers to finish
ms.allDone.Wait()

ms.Lock()
var err error
if ms.writeSlice != nil {
// Flush current file slice where writes happen
Expand All @@ -2994,14 +3006,8 @@ func (ms *FileMsgStore) Close() error {
err = util.CloseFile(err, slice.idxFile.handle)
}
}
// Signal the background tasks go-routine to exit
ms.bkgTasksDone <- true

ms.Unlock()

// Wait on go routines/timers to finish
ms.allDone.Wait()

return err
}

Expand Down Expand Up @@ -3583,6 +3589,12 @@ func (ss *FileSubStore) Close() error {
ss.allDone.Done()
}
}
ss.Unlock()

// Wait on timers/callbacks
ss.allDone.Wait()

ss.Lock()
var err error
if ss.fm.remove(ss.file) {
if ss.file.handle != nil {
Expand All @@ -3592,8 +3604,5 @@ func (ss *FileSubStore) Close() error {
}
ss.Unlock()

// Wait on timers/callbacks
ss.allDone.Wait()

return err
}
23 changes: 23 additions & 0 deletions stores/filestore_msg_test.go
Expand Up @@ -1412,3 +1412,26 @@ func TestFSMsgStoreBackgroundTaskCrash(t *testing.T) {
time.Sleep(50 * time.Millisecond)
// It should not have crashed.
}

func TestFSPanicOnStoreCloseWhileMsgsExpire(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)

limits := testDefaultStoreLimits
limits.MaxAge = 30 * time.Millisecond

fs, _, err := newFileStore(t, defaultDataStore, &limits)
if err != nil {
t.Fatalf("Unable to create store: %v", err)
}
defer fs.Close()

cs := storeCreateChannel(t, fs, "foo")

for i := 0; i < 100; i++ {
storeMsg(t, cs, "foo", []byte("msg"))
}

time.Sleep(30 * time.Millisecond)
fs.Close()
}

0 comments on commit 28d4412

Please sign in to comment.