Skip to content

Commit

Permalink
Merge pull request #875 from nats-io/filestore_optimizations
Browse files Browse the repository at this point in the history
[ADDED] FileStore: read ahead capability
  • Loading branch information
kozlovic committed Jul 9, 2019
2 parents 89fe794 + 6eaeb57 commit 502d27d
Show file tree
Hide file tree
Showing 8 changed files with 499 additions and 8 deletions.
1 change: 1 addition & 0 deletions nats-streaming-server.go
Expand Up @@ -80,6 +80,7 @@ Streaming Server File Store Options:
--file_fds_limit <int> Store will try to use no more file descriptors than this given limit
--file_parallel_recovery <int> On startup, number of channels that can be recovered in parallel
--file_truncate_bad_eof <bool> Truncate files for which there is an unexpected EOF on recovery, dataloss may occur
--file_read_buffer_size <size> Size of messages read ahead buffer (0 to disable)
Streaming Server SQL Store Options:
--sql_driver <string> Name of the SQL Driver ("mysql" or "postgres")
Expand Down
10 changes: 10 additions & 0 deletions server/conf.go
Expand Up @@ -498,6 +498,11 @@ func parseFileOptions(itf interface{}, opts *Options) error {
return err
}
opts.FileStoreOpts.ParallelRecovery = int(v.(int64))
case "file_read_buffer_size", "read_buffer_size":
if err := checkType(k, reflect.Int64, v); err != nil {
return err
}
opts.FileStoreOpts.ReadBufferSize = int(v.(int64))
}
}
return nil
Expand Down Expand Up @@ -592,6 +597,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
fs.IntVar(&sopts.FileStoreOpts.CompactInterval, "file_compact_interval", stores.DefaultFileStoreOptions.CompactInterval, "stan.FileStoreOpts.CompactInterval")
fs.String("file_compact_min_size", fmt.Sprintf("%v", stores.DefaultFileStoreOptions.CompactMinFileSize), "stan.FileStoreOpts.CompactMinFileSize")
fs.String("file_buffer_size", fmt.Sprintf("%v", stores.DefaultFileStoreOptions.BufferSize), "stan.FileStoreOpts.BufferSize")
fs.String("file_read_buffer_size", fmt.Sprintf("%v", stores.DefaultFileStoreOptions.ReadBufferSize), "")
fs.BoolVar(&sopts.FileStoreOpts.DoCRC, "file_crc", stores.DefaultFileStoreOptions.DoCRC, "stan.FileStoreOpts.DoCRC")
fs.Int64Var(&sopts.FileStoreOpts.CRCPolynomial, "file_crc_poly", stores.DefaultFileStoreOptions.CRCPolynomial, "stan.FileStoreOpts.CRCPolynomial")
fs.BoolVar(&sopts.FileStoreOpts.DoSync, "file_sync", stores.DefaultFileStoreOptions.DoSync, "stan.FileStoreOpts.DoSync")
Expand Down Expand Up @@ -694,6 +700,10 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
var i64 int64
i64, flagErr = getBytes(f)
sopts.FileStoreOpts.BufferSize = int(i64)
case "file_read_buffer_size":
var i64 int64
i64, flagErr = getBytes(f)
sopts.FileStoreOpts.ReadBufferSize = int(i64)
}
})
if flagErr != nil {
Expand Down
9 changes: 8 additions & 1 deletion server/conf_test.go
Expand Up @@ -117,6 +117,9 @@ func TestParseConfig(t *testing.T) {
if opts.FileStoreOpts.ParallelRecovery != 9 {
t.Fatalf("Expected ParallelRecovery to be 9, got %v", opts.FileStoreOpts.ParallelRecovery)
}
if opts.FileStoreOpts.ReadBufferSize != 10 {
t.Fatalf("Expected ReadBufferSize to be 10, got %v", opts.FileStoreOpts.ReadBufferSize)
}
if opts.MaxChannels != 11 {
t.Fatalf("Expected MaxChannels to be 11, got %v", opts.MaxChannels)
}
Expand Down Expand Up @@ -434,6 +437,7 @@ func TestParseWrongTypes(t *testing.T) {
expectFailureFor(t, "file:{compact_interval:false}", wrongTypeErr)
expectFailureFor(t, "file:{compact_min_size:false}", wrongTypeErr)
expectFailureFor(t, "file:{buffer_size:false}", wrongTypeErr)
expectFailureFor(t, "file:{read_buffer_size:false}", wrongTypeErr)
expectFailureFor(t, "file:{crc:123}", wrongTypeErr)
expectFailureFor(t, "file:{crc_poly:false}", wrongTypeErr)
expectFailureFor(t, "file:{sync:123}", wrongTypeErr)
Expand Down Expand Up @@ -545,7 +549,7 @@ func TestParseConfigureOptions(t *testing.T) {
}

// Test bytes values
sopts, _ = mustNotFail([]string{"-max_bytes", "100KB", "-mb", "100KB", "-file_compact_min_size", "200KB", "-file_buffer_size", "300KB"})
sopts, _ = mustNotFail([]string{"-max_bytes", "100KB", "-mb", "100KB", "-file_compact_min_size", "200KB", "-file_buffer_size", "300KB", "-file_read_buffer_size", "1MB"})
if sopts.MaxBytes != 100*1024 {
t.Fatalf("Expected max_bytes to be 100KB, got %v", sopts.MaxBytes)
}
Expand All @@ -555,6 +559,9 @@ func TestParseConfigureOptions(t *testing.T) {
if sopts.FileStoreOpts.BufferSize != 300*1024 {
t.Fatalf("Expected file_buffer_size to be 300KB, got %v", sopts.FileStoreOpts.BufferSize)
}
if sopts.FileStoreOpts.ReadBufferSize != 1024*1024 {
t.Fatalf("Expected file_read_buffer_size to be 1MB, got %v", sopts.FileStoreOpts.ReadBufferSize)
}

// Failures with bytes
expectToFail([]string{"-max_bytes", "12abc"}, "should be a size")
Expand Down
1 change: 1 addition & 0 deletions stores/common_test.go
Expand Up @@ -375,6 +375,7 @@ func TestMain(m *testing.M) {
var encryptionKey string

flag.BoolVar(&testFSDisableBufferWriters, "fs_no_buffer", false, "Disable use of buffer writers")
flag.BoolVar(&testFSDisableReadBuffer, "fs_no_read_buffer", false, "Disable use of read buffer")
flag.BoolVar(&testFSSetFDsLimit, "fs_set_fds_limit", false, "Set some FDs limit")
flag.BoolVar(&doSQL, "sql", true, "Set this to false if you don't want SQL to be tested")
test.AddSQLFlags(flag.CommandLine, &testSQLDriver, &testSQLSource, &testSQLSourceAdmin, &testSQLDatabaseName)
Expand Down

0 comments on commit 502d27d

Please sign in to comment.