Skip to content

Commit

Permalink
Merge 30afcc4 into fd8baca
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Jul 15, 2019
2 parents fd8baca + 30afcc4 commit 4f6acfb
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 12 deletions.
2 changes: 2 additions & 0 deletions README.md
Expand Up @@ -1440,6 +1440,7 @@ Streaming Server File Store Options:
--file_parallel_recovery <int> On startup, number of channels that can be recovered in parallel
--file_truncate_bad_eof <bool> Truncate files for which there is an unexpected EOF on recovery, dataloss may occur
--file_read_buffer_size <size> Size of messages read ahead buffer (0 to disable)
--file_auto_sync <duration> Interval at which the store should be automatically flushed and sync'ed on disk (<= 0 to disable)
Streaming Server SQL Store Options:
--sql_driver <string> Name of the SQL Driver ("mysql" or "postgres")
Expand Down Expand Up @@ -1633,6 +1634,7 @@ File Options Configuration:
| file_descriptors_limit | Channels translate to sub-directories under the file store's root directory. Each channel needs several files to maintain the state so the need for file descriptors increase with the number of channels. This option instructs the store to limit the concurrent use of file descriptors. Note that this is a soft limit and there may be cases when the store will use more than this number. A value of 0 means no limit. Setting a limit will probably have a performance impact | Number >= 0 | `file_descriptors_limit: 100` |
| parallel_recovery | When the server starts, the recovery of channels (directories) is done sequentially. However, when using SSDs, it may be worth setting this value to something higher than 1 to perform channels recovery in parallel | Number >= 1 | `parallel_recovery: 4` |
| read_buffer_size | Size of buffers used to read ahead from message stores. This can significantly speed up sending messages to consumers after messages have been published. Default is 2MB. Set to 0 to disable | Bytes | `read_buffer_size: 2MB` |
| auto_sync | Interval at which the store should be automatically flushed and sync'ed on disk. Default is every minute. Set to <=0 to disable | Duration | `auto_sync: "2m"` |

Cluster Configuration:

Expand Down
1 change: 1 addition & 0 deletions nats-streaming-server.go
Expand Up @@ -81,6 +81,7 @@ Streaming Server File Store Options:
--file_parallel_recovery <int> On startup, number of channels that can be recovered in parallel
--file_truncate_bad_eof <bool> Truncate files for which there is an unexpected EOF on recovery, dataloss may occur
--file_read_buffer_size <size> Size of messages read ahead buffer (0 to disable)
--file_auto_sync <duration> Interval at which the store should be automatically flushed and sync'ed on disk (<= 0 to disable)
Streaming Server SQL Store Options:
--sql_driver <string> Name of the SQL Driver ("mysql" or "postgres")
Expand Down
10 changes: 10 additions & 0 deletions server/conf.go
Expand Up @@ -503,6 +503,15 @@ func parseFileOptions(itf interface{}, opts *Options) error {
return err
}
opts.FileStoreOpts.ReadBufferSize = int(v.(int64))
case "file_auto_sync", "auto_sync":
if err := checkType(k, reflect.String, v); err != nil {
return err
}
dur, err := time.ParseDuration(v.(string))
if err != nil {
return err
}
opts.FileStoreOpts.AutoSync = dur
}
}
return nil
Expand Down Expand Up @@ -608,6 +617,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
fs.Int64Var(&sopts.FileStoreOpts.FileDescriptorsLimit, "file_fds_limit", stores.DefaultFileStoreOptions.FileDescriptorsLimit, "stan.FileStoreOpts.FileDescriptorsLimit")
fs.IntVar(&sopts.FileStoreOpts.ParallelRecovery, "file_parallel_recovery", stores.DefaultFileStoreOptions.ParallelRecovery, "stan.FileStoreOpts.ParallelRecovery")
fs.BoolVar(&sopts.FileStoreOpts.TruncateUnexpectedEOF, "file_truncate_bad_eof", stores.DefaultFileStoreOptions.TruncateUnexpectedEOF, "Truncate files for which there is an unexpected EOF on recovery, dataloss may occur")
fs.DurationVar(&sopts.FileStoreOpts.AutoSync, "file_auto_sync", stores.DefaultFileStoreOptions.AutoSync, "Interval at which the store should be automatically flushed and sync'ed on disk (<= 0 to disable)")
fs.IntVar(&sopts.IOBatchSize, "io_batch_size", DefaultIOBatchSize, "stan.IOBatchSize")
fs.Int64Var(&sopts.IOSleepTime, "io_sleep_time", DefaultIOSleepTime, "stan.IOSleepTime")
fs.StringVar(&sopts.FTGroupName, "ft_group", "", "stan.FTGroupName")
Expand Down
5 changes: 5 additions & 0 deletions server/conf_test.go
Expand Up @@ -120,6 +120,9 @@ func TestParseConfig(t *testing.T) {
if opts.FileStoreOpts.ReadBufferSize != 10 {
t.Fatalf("Expected ReadBufferSize to be 10, got %v", opts.FileStoreOpts.ReadBufferSize)
}
if opts.FileStoreOpts.AutoSync != 2*time.Minute {
t.Fatalf("Expected AutoSync to be 2minutes, got %v", opts.FileStoreOpts.AutoSync)
}
if opts.MaxChannels != 11 {
t.Fatalf("Expected MaxChannels to be 11, got %v", opts.MaxChannels)
}
Expand Down Expand Up @@ -448,6 +451,8 @@ func TestParseWrongTypes(t *testing.T) {
expectFailureFor(t, "file:{slice_archive_script:123}", wrongTypeErr)
expectFailureFor(t, "file:{fds_limit:false}", wrongTypeErr)
expectFailureFor(t, "file:{parallel_recovery:false}", wrongTypeErr)
expectFailureFor(t, "file:{auto_sync:123}", wrongTypeErr)
expectFailureFor(t, "file:{auto_sync:\"1h:0m\"}", wrongTimeErr)
expectFailureFor(t, "cluster:{node_id:false}", wrongTypeErr)
expectFailureFor(t, "cluster:{bootstrap:1}", wrongTypeErr)
expectFailureFor(t, "cluster:{peers:1}", wrongTypeErr)
Expand Down
109 changes: 98 additions & 11 deletions stores/filestore.go
Expand Up @@ -189,6 +189,13 @@ type FileStoreOptions struct {
// client will be asking for the following sequential messages, so this
// is a read ahead optimization.
ReadBufferSize int

// AutoSync defines how often the store will flush and sync the files in
// the background. The default is set to 60 seconds.
// This is useful when a file sync is not desired for each Flush() call
// by setting DoSync to false.
// Setting AutoSync to any value <= 0 will disable auto sync.
AutoSync time.Duration
}

// This is an internal error to detect situations where we do
Expand All @@ -210,6 +217,7 @@ var DefaultFileStoreOptions = FileStoreOptions{
SliceMaxBytes: 64 * 1024 * 1024, // 64MB
ParallelRecovery: 1,
ReadBufferSize: 2 * 1024 * 1024, // 2MB
AutoSync: time.Minute,
}

// BufferSize is a FileStore option that sets the size of the buffer used
Expand Down Expand Up @@ -315,6 +323,15 @@ func DoSync(enableFileSync bool) FileStoreOption {
}
}

// AutoSync is a FileStore option controlling how frequently each store is
// flushed and fsync'ed on disk in the background.
// Passing a duration <= 0 disables the periodic sync entirely.
func AutoSync(dur time.Duration) FileStoreOption {
	return func(opts *FileStoreOptions) error {
		opts.AutoSync = dur
		return nil
	}
}

// SliceConfig is a FileStore option that allows the configuration of
// file slice limits and optional archive script file name.
func SliceConfig(maxMsgs int, maxBytes int64, maxAge time.Duration, script string) FileStoreOption {
Expand Down Expand Up @@ -399,6 +416,9 @@ func AllOptions(opts *FileStoreOptions) FileStoreOption {
if err := ReadBufferSize(opts.ReadBufferSize)(o); err != nil {
return err
}
if err := AutoSync(opts.AutoSync)(o); err != nil {
return err
}
o.CompactEnabled = opts.CompactEnabled
o.DoCRC = opts.DoCRC
o.DoSync = opts.DoSync
Expand Down Expand Up @@ -490,6 +510,7 @@ type FileStore struct {
cliCompactTS time.Time
crcTable *crc32.Table
lockFile util.LockFile
autoSyncTimer *time.Timer
}

type subscription struct {
Expand Down Expand Up @@ -648,6 +669,7 @@ var (
bkgTasksSleepDuration = defaultBkgTasksSleepDuration
cacheTTL = int64(defaultCacheTTL)
sliceCloseInterval = defaultSliceCloseInterval
testAutoSync = int64(0)
)

// FileStoreTestSetBackgroundTaskInterval is used by tests to reduce the interval
Expand Down Expand Up @@ -1277,9 +1299,47 @@ func NewFileStore(log logger.Logger, rootDir string, limits *StoreLimits, option
os.Remove(truncateFName)
}

if fs.opts.AutoSync > 0 {
fs.autoSyncTimer = time.AfterFunc(fs.opts.AutoSync, fs.autoSync)
}

return fs, nil
}

// autoSync will periodically flush and sync msgs and sub stores on disk.
// It is scheduled via time.AfterFunc from NewFileStore when the AutoSync
// option is > 0, and re-arms the timer itself at the end of each run.
func (fs *FileStore) autoSync() {
	fs.Lock()
	// The store may have been closed after the timer fired but before we
	// acquired the lock; Close() stops the timer, but a callback can
	// already be in flight.
	if fs.closed {
		fs.Unlock()
		return
	}

	if len(fs.channels) > 0 {
		// Snapshot the channels so that the store lock is not held while
		// each channel's files are flushed/sync'ed (which may be slow).
		channels := make([]*Channel, 0, len(fs.channels))
		for _, c := range fs.channels {
			channels = append(channels, c)
		}
		fs.Unlock()

		for _, c := range channels {
			// Test hook: tests set testAutoSync to artificially slow
			// down this loop.
			if n := atomic.LoadInt64(&testAutoSync); n > 0 {
				time.Sleep(time.Duration(n))
			}
			c.Msgs.(*FileMsgStore).autoSync()
			c.Subs.(*FileSubStore).autoSync()
		}

		fs.Lock()
		// The store may have been closed while syncing without the lock
		// held; do not re-arm the timer in that case.
		if fs.closed {
			fs.Unlock()
			return
		}
	}

	fs.autoSyncTimer.Reset(fs.opts.AutoSync)
	fs.Unlock()
}

type channelRecoveryCtx struct {
wg *sync.WaitGroup
poolCh chan struct{}
Expand Down Expand Up @@ -1842,6 +1902,9 @@ func (fs *FileStore) Close() error {

fm := fs.fm
lockFile := fs.lockFile
if fs.autoSyncTimer != nil {
fs.autoSyncTimer.Stop()
}
fs.Unlock()

if fm != nil {
Expand Down Expand Up @@ -2775,7 +2838,7 @@ processErr:

func (ms *FileMsgStore) fillGaps(fslice *fileSlice, upToMsg *pb.MsgProto) error {
// flush possible buffered messages.
if err := ms.flush(fslice); err != nil {
if err := ms.flush(fslice, false); err != nil {
return err
}

Expand Down Expand Up @@ -3657,7 +3720,7 @@ func (ms *FileMsgStore) Close() error {
if ms.writeSlice != nil {
// Flush current file slice where writes happen
ms.lockFiles(ms.writeSlice)
err = ms.flush(ms.writeSlice)
err = ms.flush(ms.writeSlice, true)
ms.unlockFiles(ms.writeSlice)
}
// Remove/close all file slices
Expand All @@ -3676,7 +3739,7 @@ func (ms *FileMsgStore) Close() error {
return err
}

func (ms *FileMsgStore) flush(fslice *fileSlice) error {
func (ms *FileMsgStore) flush(fslice *fileSlice, forceSync bool) error {
if ms.bw != nil && ms.bw.buf != nil && ms.bw.buf.Buffered() > 0 {
if err := ms.bw.buf.Flush(); err != nil {
return err
Expand All @@ -3691,7 +3754,7 @@ func (ms *FileMsgStore) flush(fslice *fileSlice) error {
return err
}
}
if ms.fstore.opts.DoSync {
if ms.fstore.opts.DoSync || forceSync {
if err := fslice.file.handle.Sync(); err != nil {
return err
}
Expand All @@ -3709,14 +3772,26 @@ func (ms *FileMsgStore) Flush() error {
if ms.writeSlice != nil {
err = ms.lockFiles(ms.writeSlice)
if err == nil {
err = ms.flush(ms.writeSlice)
err = ms.flush(ms.writeSlice, false)
ms.unlockFiles(ms.writeSlice)
}
}
ms.Unlock()
return err
}

// autoSync flushes and fsyncs the message store on disk. It is invoked by
// the file store's background auto-sync timer; the flush error is
// intentionally dropped since this is a best-effort periodic sync and the
// next Flush()/autoSync() will retry.
func (ms *FileMsgStore) autoSync() {
	ms.Lock()
	defer ms.Unlock()
	if ms.closed || ms.writeSlice == nil {
		return
	}
	if err := ms.lockFiles(ms.writeSlice); err != nil {
		return
	}
	ms.flush(ms.writeSlice, true)
	ms.unlockFiles(ms.writeSlice)
}

// Empty implements the MsgStore interface
func (ms *FileMsgStore) Empty() error {
ms.Lock()
Expand Down Expand Up @@ -3781,7 +3856,7 @@ func (fs *FileStore) newFileSubStore(channel string, limits *SubStoreLimits, doR
fileName := filepath.Join(channel, subsFileName)
ss.file, err = fs.fm.createFile(fileName, defaultFileFlags, func() error {
ss.writer = nil
return ss.flush()
return ss.flush(false)
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -4270,9 +4345,9 @@ func (ss *FileSubStore) writeRecord(w io.Writer, recType recordType, rec record)
return nil
}

func (ss *FileSubStore) flush() error {
func (ss *FileSubStore) flush(forceSync bool) error {
// Skip this if nothing was written since the last flush
if !ss.activity {
if !ss.activity && !forceSync {
return nil
}
// Reset this now
Expand All @@ -4282,7 +4357,7 @@ func (ss *FileSubStore) flush() error {
return err
}
}
if ss.opts.DoSync {
if ss.opts.DoSync || forceSync {
return ss.file.handle.Sync()
}
return nil
Expand All @@ -4293,13 +4368,25 @@ func (ss *FileSubStore) Flush() error {
ss.Lock()
err := ss.lockFile()
if err == nil {
err = ss.flush()
err = ss.flush(false)
ss.fm.unlockFile(ss.file)
}
ss.Unlock()
return err
}

// autoSync flushes and fsyncs the subscription store on disk. It is invoked
// by the file store's background auto-sync timer; the flush error is
// intentionally dropped since this is a best-effort periodic sync.
func (ss *FileSubStore) autoSync() {
	ss.Lock()
	defer ss.Unlock()
	if ss.closed {
		return
	}
	if err := ss.lockFile(); err != nil {
		return
	}
	ss.flush(true)
	ss.fm.unlockFile(ss.file)
}

// Close closes this store
func (ss *FileSubStore) Close() error {
ss.Lock()
Expand All @@ -4326,7 +4413,7 @@ func (ss *FileSubStore) Close() error {
var err error
if ss.fm.remove(ss.file) {
if ss.file.handle != nil {
err = ss.flush()
err = ss.flush(true)
err = util.CloseFile(err, ss.file.handle)
}
}
Expand Down

0 comments on commit 4f6acfb

Please sign in to comment.