Skip to content

Commit

Permalink
Merge 916e51e into a40dd6d
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Apr 17, 2020
2 parents a40dd6d + 916e51e commit cac45af
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 6 deletions.
2 changes: 1 addition & 1 deletion server/server_run_test.go
Expand Up @@ -1148,7 +1148,7 @@ func TestDontExposeUserPassword(t *testing.T) {
waitFor(t, 2*time.Second, 15*time.Millisecond, func() error {
l.Lock()
for _, n := range l.notices {
if strings.Contains(n, "reconnected to NATS Server at") {
if strings.Contains(n, "general\" reconnected to NATS Server at") {
msg = n
l.Unlock()
return nil
Expand Down
19 changes: 14 additions & 5 deletions stores/filestore.go
Expand Up @@ -1344,13 +1344,13 @@ func (fs *FileStore) Recover() (*RecoveredState, error) {
// in APPEND mode to allow truncate to work).
fs.serverFile, err = fs.fm.createFile(serverFileName, os.O_RDWR|os.O_CREATE, nil)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to recover server file %q: %v", serverFileName, err)
}

// Open/Create the client file.
fs.clientsFile, err = fs.fm.createFile(clientsFileName, defaultFileFlags, nil)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to recover client file %q: %v", clientsFileName, err)
}

// Recover the server file.
Expand Down Expand Up @@ -3872,7 +3872,7 @@ func (ms *FileMsgStore) Empty() error {
////////////////////////////////////////////////////////////////////////////

// newFileSubStore returns a new instace of a file SubStore.
func (fs *FileStore) newFileSubStore(channel string, limits *SubStoreLimits, doRecover bool) (*FileSubStore, error) {
func (fs *FileStore) newFileSubStore(channel string, limits *SubStoreLimits, doRecover bool) (fss *FileSubStore, retErr error) {
ss := &FileSubStore{
fstore: fs,
fm: fs.fm,
Expand All @@ -3883,8 +3883,17 @@ func (fs *FileStore) newFileSubStore(channel string, limits *SubStoreLimits, doR
// Convert the CompactInterval in time.Duration
ss.compactItvl = time.Duration(ss.opts.CompactInterval) * time.Second

var err error
defer func() {
if retErr != nil {
action := "create"
if doRecover {
action = "recover"
}
retErr = fmt.Errorf("unable to %s subscription store for [%s]: %v", action, channel, retErr)
}
}()

var err error
fileName := filepath.Join(channel, subsFileName)
ss.file, err = fs.fm.createFile(fileName, defaultFileFlags, func() error {
ss.writer = nil
Expand All @@ -3905,7 +3914,7 @@ func (fs *FileStore) newFileSubStore(channel string, limits *SubStoreLimits, doR
if err := ss.recoverSubscriptions(); err != nil {
fs.fm.unlockFile(ss.file)
ss.Close()
return nil, fmt.Errorf("unable to recover subscription store for [%s]: %v", channel, err)
return nil, err
}
}
// Do not attempt to shrink unless the option is greater than the
Expand Down
41 changes: 41 additions & 0 deletions stores/filestore_sub_test.go
Expand Up @@ -15,10 +15,12 @@ package stores

import (
"hash/crc32"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"runtime"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -883,3 +885,42 @@ func TestFSSubscriptionsFileWithExtraZeros(t *testing.T) {
}
}
}

func TestFSSubscriptionsFileVersionError(t *testing.T) {
cleanupFSDatastore(t)
defer cleanupFSDatastore(t)

s := createDefaultFileStore(t)
defer s.Close()

c := storeCreateChannel(t, s, "foo")
ss := c.Subs
sub1 := &spb.SubState{
ClientID: "me",
Inbox: "inbox",
AckInbox: "ackInbox",
AckWaitInSecs: 10,
}
if err := ss.CreateSub(sub1); err != nil {
t.Fatalf("Error creating sub: %v", err)
}
ss.(*FileSubStore).RLock()
fname := ss.(*FileSubStore).file.name
ss.(*FileSubStore).RUnlock()

s.Close()

os.Remove(fname)
if err := ioutil.WriteFile(fname, []byte(""), 0666); err != nil {
t.Fatalf("Error writing file: %v", err)
}

s, err := NewFileStore(testLogger, testFSDefaultDatastore, nil)
if err != nil {
t.Fatalf("Error creating filestore: %v", err)
}
defer s.Close()
if _, err := s.Recover(); err == nil || !strings.Contains(err.Error(), "recover subscription store for [foo]") {
t.Fatalf("Unexpected error: %v", err)
}
}
43 changes: 43 additions & 0 deletions stores/filestore_test.go
Expand Up @@ -2045,3 +2045,46 @@ func TestFSAutoSync(t *testing.T) {
t.Fatalf("Subscription store was not sync'ed after new activity (sync count was %v, now %v)", ssSynced, n)
}
}

func TestFSServerAndClientFilesVersionError(t *testing.T) {
for _, test := range []struct {
name string
server bool
fname string
}{
{"server", true, serverFileName},
{"client", false, clientsFileName},
} {
t.Run(test.name, func(t *testing.T) {
cleanupFSDatastore(t)
defer cleanupFSDatastore(t)

s := createDefaultFileStore(t)
defer s.Close()

var fname string
s.Lock()
if test.server {
fname = s.serverFile.name
} else {
fname = s.clientsFile.name
}
s.Unlock()

s.Close()
os.Remove(fname)
if err := ioutil.WriteFile(fname, []byte(""), 0666); err != nil {
t.Fatalf("Error creating file: %v", err)
}

s, err := NewFileStore(testLogger, testFSDefaultDatastore, nil)
if err != nil {
t.Fatalf("Error creating file store: %v", err)
}
defer s.Close()
if _, err := s.Recover(); err == nil || !strings.Contains(err.Error(), fmt.Sprintf("unable to recover %s file %q", test.name, test.fname)) {
t.Fatalf("Unexpected error: %v", err)
}
})
}
}

0 comments on commit cac45af

Please sign in to comment.