Skip to content

Commit

Permalink
Merge 9117aac into 196a4a3
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Jul 22, 2019
2 parents 196a4a3 + 9117aac commit efbb7bb
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 98 deletions.
23 changes: 23 additions & 0 deletions stores/cryptostore_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"reflect"
"strings"
"testing"
"time"

"github.com/nats-io/stan.go/pb"
)
Expand Down Expand Up @@ -493,3 +494,25 @@ func TestCryptoStoreMultipleCiphers(t *testing.T) {
}
}
}

// TestCryptoFileAutoSync creates a file store with auto-sync enabled, wraps
// it in a crypto store, adds some state and then waits long enough for the
// auto-sync to run. The test passes as long as nothing panics meanwhile.
func TestCryptoFileAutoSync(t *testing.T) {
	cleanupFSDatastore(t)
	defer cleanupFSDatastore(t)

	const syncInterval = 15 * time.Millisecond

	fileStore, _ := newFileStore(t, testFSDefaultDatastore, nil, AutoSync(syncInterval))
	cryptoStore, err := NewCryptoStore(fileStore, CryptoCipherAES, []byte("testkey"))
	if err != nil {
		t.Fatalf("Error creating store: %v", err)
	}
	defer cryptoStore.Close()

	// Give the store something to sync: one channel holding a single
	// message and a single subscription.
	channel := storeCreateChannel(t, cryptoStore, "foo")
	storeMsg(t, channel, "foo", 1, []byte("msg"))
	storeSub(t, channel, "foo")

	// Sleep longer than the auto-sync interval so that the sync gets a
	// chance to run.
	time.Sleep(50 * time.Millisecond)

	// Getting here without a panic is the success condition.
}
89 changes: 43 additions & 46 deletions stores/filestore.go
Expand Up @@ -510,7 +510,6 @@ type FileStore struct {
cliCompactTS time.Time
crcTable *crc32.Table
lockFile util.LockFile
autoSyncTimer *time.Timer
}

type subscription struct {
Expand Down Expand Up @@ -546,6 +545,9 @@ type FileSubStore struct {
activity bool // was there any write between two flush calls
writer io.Writer // this is either `bw` or `file` depending if buffer writer is used or not
shrinkTimer *time.Timer // timer associated with callback shrinking buffer when possible
syncTimer *time.Timer // timer associated with performing auto flush and disk sync
needSync bool // this is required to reduce sync'ing in case DoSync==false, but AutoSync>0
synced int64 // number of times the file is actually sync'ed
allDone sync.WaitGroup
}

Expand Down Expand Up @@ -636,6 +638,8 @@ type FileMsgStore struct {
bkgTasksWake chan bool // signal the background tasks go routine to get out of a sleep
allDone sync.WaitGroup
readBufSize int
needSync bool // this is required to reduce sync'ing when DoSync==false, but AutoSync>0
synced int64 // number of times the file is actually sync'ed
}

type bufferPool struct {
Expand Down Expand Up @@ -669,7 +673,6 @@ var (
bkgTasksSleepDuration = defaultBkgTasksSleepDuration
cacheTTL = int64(defaultCacheTTL)
sliceCloseInterval = defaultSliceCloseInterval
testAutoSync = int64(0)
)

// FileStoreTestSetBackgroundTaskInterval is used by tests to reduce the interval
Expand Down Expand Up @@ -1299,47 +1302,9 @@ func NewFileStore(log logger.Logger, rootDir string, limits *StoreLimits, option
os.Remove(truncateFName)
}

if fs.opts.AutoSync > 0 {
fs.autoSyncTimer = time.AfterFunc(fs.opts.AutoSync, fs.autoSync)
}

return fs, nil
}

// autoSync is the timer callback that periodically flushes and syncs the
// message and subscription stores of every channel on disk, and then re-arms
// the auto-sync timer with the configured AutoSync interval.
//
// If the store is found closed, either on entry or after the channels have
// been processed, it returns without re-arming the timer.
func (fs *FileStore) autoSync() {
	fs.Lock()
	if fs.closed {
		fs.Unlock()
		return
	}

	if len(fs.channels) > 0 {
		// Take a snapshot of the channels so that the store lock is not
		// held while each individual store is flushed and synced.
		channels := make([]*Channel, 0, len(fs.channels))
		for _, c := range fs.channels {
			channels = append(channels, c)
		}
		fs.Unlock()

		for _, c := range channels {
			// Test hook: when testAutoSync is set, pause before
			// processing each channel.
			if n := atomic.LoadInt64(&testAutoSync); n > 0 {
				time.Sleep(time.Duration(n))
			}
			// Use checked type assertions: a single-value assertion
			// panics when the dynamic type differs, and a panic in this
			// timer callback would bring the whole process down. Stores
			// that are not file-based (for instance when wrapped by
			// another store implementation) are skipped.
			if ms, ok := c.Msgs.(*FileMsgStore); ok {
				ms.autoSync()
			}
			if ss, ok := c.Subs.(*FileSubStore); ok {
				ss.autoSync()
			}
		}

		// The store may have been closed while the lock was released.
		fs.Lock()
		if fs.closed {
			fs.Unlock()
			return
		}
	}

	fs.autoSyncTimer.Reset(fs.opts.AutoSync)
	fs.Unlock()
}

type channelRecoveryCtx struct {
wg *sync.WaitGroup
poolCh chan struct{}
Expand Down Expand Up @@ -1902,9 +1867,6 @@ func (fs *FileStore) Close() error {

fm := fs.fm
lockFile := fs.lockFile
if fs.autoSyncTimer != nil {
fs.autoSyncTimer.Stop()
}
fs.Unlock()

if fm != nil {
Expand Down Expand Up @@ -2655,6 +2617,11 @@ func (ms *FileMsgStore) Store(m *pb.MsgProto) (uint64, error) {
}
}

// In case DoSync is false, but we have auto-sync.
// This allows the background task to auto-sync only when there
// has been new activity since the last sync.
ms.needSync = true

// Is there a gap in message sequence?
if ms.last > 0 && m.Sequence > ms.last+1 {
if err := ms.fillGaps(fslice, m); err != nil {
Expand Down Expand Up @@ -3189,6 +3156,9 @@ func (ms *FileMsgStore) backgroundTasks() {
nextExpiration := ms.expiration
lastCacheCheck := ms.timeTick
lastBufShrink := ms.timeTick
autoSyncInterval := int64(ms.fstore.opts.AutoSync)
doAutoSync := autoSyncInterval > 0
lastAutoSync := ms.timeTick
ms.RUnlock()

for {
Expand Down Expand Up @@ -3255,6 +3225,12 @@ func (ms *FileMsgStore) backgroundTasks() {
lastCacheCheck = timeTick
}

// Check for auto-sync
if doAutoSync && timeTick >= lastAutoSync+autoSyncInterval {
ms.autoSync()
lastAutoSync = timeTick
}

select {
case <-ms.bkgTasksDone:
return
Expand Down Expand Up @@ -3761,6 +3737,8 @@ func (ms *FileMsgStore) flush(fslice *fileSlice, forceSync bool) error {
if err := fslice.idxFile.handle.Sync(); err != nil {
return err
}
ms.needSync = false
ms.synced++
}
return nil
}
Expand All @@ -3783,7 +3761,7 @@ func (ms *FileMsgStore) Flush() error {
// Flushes and sync the message store on disk.
func (ms *FileMsgStore) autoSync() {
ms.Lock()
if !ms.closed && ms.writeSlice != nil {
if ms.needSync && !ms.closed && ms.writeSlice != nil {
if err := ms.lockFiles(ms.writeSlice); err == nil {
ms.flush(ms.writeSlice, true)
ms.unlockFiles(ms.writeSlice)
Expand Down Expand Up @@ -3893,6 +3871,12 @@ func (fs *FileStore) newFileSubStore(channel string, limits *SubStoreLimits, doR
} else {
fs.fm.unlockFile(ss.file)
}
if fs.opts.AutoSync > 0 {
ss.Lock()
ss.allDone.Add(1)
ss.syncTimer = time.AfterFunc(fs.opts.AutoSync, ss.autoSync)
ss.Unlock()
}
return ss, nil
}

Expand Down Expand Up @@ -4321,6 +4305,7 @@ func (ss *FileSubStore) writeRecord(w io.Writer, recType recordType, rec record)
}
// Indicate that we wrote something to the buffer/file
ss.activity = true
ss.needSync = true
switch recType {
case subRecNew:
ss.numRecs++
Expand Down Expand Up @@ -4358,7 +4343,11 @@ func (ss *FileSubStore) flush(forceSync bool) error {
}
}
if ss.opts.DoSync || forceSync {
return ss.file.handle.Sync()
if err := ss.file.handle.Sync(); err != nil {
return err
}
ss.needSync = false
ss.synced++
}
return nil
}
Expand All @@ -4379,10 +4368,13 @@ func (ss *FileSubStore) Flush() error {
func (ss *FileSubStore) autoSync() {
ss.Lock()
if !ss.closed {
if err := ss.lockFile(); err == nil {
if ss.needSync && ss.lockFile() == nil {
ss.flush(true)
ss.fm.unlockFile(ss.file)
}
ss.syncTimer.Reset(ss.fstore.opts.AutoSync)
} else {
ss.allDone.Done()
}
ss.Unlock()
}
Expand All @@ -4404,6 +4396,11 @@ func (ss *FileSubStore) Close() error {
ss.allDone.Done()
}
}
if ss.syncTimer != nil {
if ss.syncTimer.Stop() {
ss.allDone.Done()
}
}
ss.Unlock()

// Wait on timers/callbacks
Expand Down
101 changes: 49 additions & 52 deletions stores/filestore_test.go
Expand Up @@ -1974,24 +1974,40 @@ func TestFSAutoSync(t *testing.T) {

time.Sleep(150 * time.Millisecond)

ms.Lock()
notFlushed := ms.bw != nil && ms.bw.buf != nil && ms.bw.buf.Buffered() > 0
ms.Unlock()
if !notFlushed {
t.Fatalf("Message store should not have been flushed")
}

ss.Lock()
notFlushed = ss.bw != nil && ss.bw.buf != nil && ss.bw.buf.Buffered() > 0
ss.Unlock()
if !notFlushed {
t.Fatalf("Message store should not have been flushed")
// checkMsgStoreFlushed fails the test if the flushed state of the given
// message store does not match shouldBeFlushed, and returns the store's
// sync counter so callers can compare it between two points in time.
// The store is considered flushed only when it has a buffered writer and
// that writer reports zero buffered bytes; a store without a buffered
// writer is therefore reported as not flushed.
checkMsgStoreFlushed := func(t *testing.T, ms *FileMsgStore, shouldBeFlushed bool) int64 {
t.Helper()
// Read the buffer state and the sync counter under the store's lock.
ms.Lock()
flushed := ms.bw != nil && ms.bw.buf != nil && ms.bw.buf.Buffered() == 0
synced := ms.synced
ms.Unlock()
if shouldBeFlushed && !flushed {
t.Fatalf("Message store should have been flushed")
} else if !shouldBeFlushed && flushed {
t.Fatalf("Message store should not have been flushed")
}
return synced
}
checkMsgStoreFlushed(t, ms, false)

// checkSubStoreFlushed fails the test if the flushed state of the given
// subscription store does not match shouldBeFlushed, and returns the
// store's sync counter so callers can compare it between two points in time.
// The store is considered flushed only when it has a buffered writer and
// that writer reports zero buffered bytes; a store without a buffered
// writer is therefore reported as not flushed.
checkSubStoreFlushed := func(t *testing.T, ss *FileSubStore, shouldBeFlushed bool) int64 {
t.Helper()
// Read the buffer state and the sync counter under the store's lock.
ss.Lock()
flushed := ss.bw != nil && ss.bw.buf != nil && ss.bw.buf.Buffered() == 0
synced := ss.synced
ss.Unlock()
if shouldBeFlushed && !flushed {
t.Fatalf("Subscription store should have been flushed")
} else if !shouldBeFlushed && flushed {
t.Fatalf("Subscription store should not have been flushed")
}
return synced
}
checkSubStoreFlushed(t, ss, false)

s.Close()
cleanupFSDatastore(t)
// Verify that auto sync works
s = createDefaultFileStore(t, BufferSize(1024), AutoSync(100*time.Millisecond))
s = createDefaultFileStore(t, BufferSize(1024), AutoSync(15*time.Millisecond))
defer s.Close()

c = storeCreateChannel(t, s, "foo")
Expand All @@ -2001,50 +2017,31 @@ func TestFSAutoSync(t *testing.T) {
storeMsg(t, c, "foo", 1, []byte("hello"))
storeSub(t, c, "foo")

time.Sleep(150 * time.Millisecond)

ms.Lock()
notFlushed = ms.bw != nil && ms.bw.buf != nil && ms.bw.buf.Buffered() > 0
ms.Unlock()
if notFlushed {
t.Fatalf("Message store should have been flushed")
}

ss.Lock()
notFlushed = ss.bw != nil && ss.bw.buf != nil && ss.bw.buf.Buffered() > 0
ss.Unlock()
if notFlushed {
t.Fatalf("Message store should have been flushed")
}

s.Close()
time.Sleep(50 * time.Millisecond)

cleanupFSDatastore(t)
// Verify that auto sync works if a channel is removed
// Make the auto sync code wait for 300ms when getting ready to sync a msg/sub store.
atomic.StoreInt64(&testAutoSync, int64(300*time.Millisecond))
defer atomic.StoreInt64(&testAutoSync, 0)
msSynced := checkMsgStoreFlushed(t, ms, true)
ssSynced := checkSubStoreFlushed(t, ss, true)

s = createDefaultFileStore(t, BufferSize(1024), AutoSync(150*time.Millisecond))
defer s.Close()
// Check that without new activity, there is no unnecessary sync'ing.
time.Sleep(100 * time.Millisecond)

c = storeCreateChannel(t, s, "foo")
if n := checkMsgStoreFlushed(t, ms, true); n != msSynced {
t.Fatalf("Message store is unnecessarily sync'ed (sync count was %v, now %v)", msSynced, n)
}
if n := checkSubStoreFlushed(t, ss, true); n != ssSynced {
t.Fatalf("Subscription store is unnecessarily sync'ed (sync count was %v, now %v)", ssSynced, n)
}

storeMsg(t, c, "foo", 1, []byte("hello"))
subID := storeSub(t, c, "foo")
storeSubDelete(t, c, "foo", subID)
// Add new activity and check things are now updated.
storeMsg(t, c, "foo", 2, []byte("hello"))
storeSub(t, c, "foo")

// Wait for the autosync callback to pick up the channel's stores to
// perform an autosync. We have made the code wait 300ms before
// issuing the autoSync() call...
time.Sleep(200 * time.Millisecond)
time.Sleep(50 * time.Millisecond)

// Now delete channel
if err := s.DeleteChannel("foo"); err != nil {
t.Fatalf("Error deleting channel: %v", err)
if n := checkMsgStoreFlushed(t, ms, true); n == msSynced {
t.Fatalf("Message store was not sync'ed after new activity (sync count was %v, now %v)", msSynced, n)
}
if n := checkSubStoreFlushed(t, ss, true); n == ssSynced {
t.Fatalf("Subscription store was not sync'ed after new activity (sync count was %v, now %v)", ssSynced, n)
}

// Now wait for the autoSync callback to resume and make sure
// that we don't get a crash.
time.Sleep(300 * time.Millisecond)
}

0 comments on commit efbb7bb

Please sign in to comment.