Skip to content

Commit

Permalink
Merge 68f19eb into 89fe794
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Jul 4, 2019
2 parents 89fe794 + 68f19eb commit 2435bdc
Show file tree
Hide file tree
Showing 8 changed files with 398 additions and 6 deletions.
1 change: 1 addition & 0 deletions nats-streaming-server.go
Expand Up @@ -80,6 +80,7 @@ Streaming Server File Store Options:
--file_fds_limit <int> Store will try to use no more file descriptors than this given limit
--file_parallel_recovery <int> On startup, number of channels that can be recovered in parallel
--file_truncate_bad_eof <bool> Truncate files for which there is an unexpected EOF on recovery, dataloss may occur
--file_read_buffer_size <size> Size of messages read ahead buffer (0 to disable)
Streaming Server SQL Store Options:
--sql_driver <string> Name of the SQL Driver ("mysql" or "postgres")
Expand Down
14 changes: 12 additions & 2 deletions server/conf.go
Expand Up @@ -498,6 +498,11 @@ func parseFileOptions(itf interface{}, opts *Options) error {
return err
}
opts.FileStoreOpts.ParallelRecovery = int(v.(int64))
case "file_read_buffer_size", "read_buffer_size":
if err := checkType(k, reflect.Int64, v); err != nil {
return err
}
opts.FileStoreOpts.ReadBufferSize = int(v.(int64))
}
}
return nil
Expand Down Expand Up @@ -592,6 +597,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
fs.IntVar(&sopts.FileStoreOpts.CompactInterval, "file_compact_interval", stores.DefaultFileStoreOptions.CompactInterval, "stan.FileStoreOpts.CompactInterval")
fs.String("file_compact_min_size", fmt.Sprintf("%v", stores.DefaultFileStoreOptions.CompactMinFileSize), "stan.FileStoreOpts.CompactMinFileSize")
fs.String("file_buffer_size", fmt.Sprintf("%v", stores.DefaultFileStoreOptions.BufferSize), "stan.FileStoreOpts.BufferSize")
fs.String("file_read_buffer_size", fmt.Sprintf("%v", stores.DefaultFileStoreOptions.ReadBufferSize), "")
fs.BoolVar(&sopts.FileStoreOpts.DoCRC, "file_crc", stores.DefaultFileStoreOptions.DoCRC, "stan.FileStoreOpts.DoCRC")
fs.Int64Var(&sopts.FileStoreOpts.CRCPolynomial, "file_crc_poly", stores.DefaultFileStoreOptions.CRCPolynomial, "stan.FileStoreOpts.CRCPolynomial")
fs.BoolVar(&sopts.FileStoreOpts.DoSync, "file_sync", stores.DefaultFileStoreOptions.DoSync, "stan.FileStoreOpts.DoSync")
Expand Down Expand Up @@ -690,10 +696,14 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
sopts.MaxBytes, flagErr = getBytes(f)
case "file_compact_min_size":
sopts.FileStoreOpts.CompactMinFileSize, flagErr = getBytes(f)
case "file_buffer_size":
case "file_buffer_size", "file_read_buffer_size":
var i64 int64
i64, flagErr = getBytes(f)
sopts.FileStoreOpts.BufferSize = int(i64)
if f.Name == "file_buffer_size" {
sopts.FileStoreOpts.BufferSize = int(i64)
} else {
sopts.FileStoreOpts.ReadBufferSize = int(i64)
}
}
})
if flagErr != nil {
Expand Down
9 changes: 8 additions & 1 deletion server/conf_test.go
Expand Up @@ -117,6 +117,9 @@ func TestParseConfig(t *testing.T) {
if opts.FileStoreOpts.ParallelRecovery != 9 {
t.Fatalf("Expected ParallelRecovery to be 9, got %v", opts.FileStoreOpts.ParallelRecovery)
}
if opts.FileStoreOpts.ReadBufferSize != 10 {
t.Fatalf("Expected ReadBufferSize to be 10, got %v", opts.FileStoreOpts.ReadBufferSize)
}
if opts.MaxChannels != 11 {
t.Fatalf("Expected MaxChannels to be 11, got %v", opts.MaxChannels)
}
Expand Down Expand Up @@ -434,6 +437,7 @@ func TestParseWrongTypes(t *testing.T) {
expectFailureFor(t, "file:{compact_interval:false}", wrongTypeErr)
expectFailureFor(t, "file:{compact_min_size:false}", wrongTypeErr)
expectFailureFor(t, "file:{buffer_size:false}", wrongTypeErr)
expectFailureFor(t, "file:{read_buffer_size:false}", wrongTypeErr)
expectFailureFor(t, "file:{crc:123}", wrongTypeErr)
expectFailureFor(t, "file:{crc_poly:false}", wrongTypeErr)
expectFailureFor(t, "file:{sync:123}", wrongTypeErr)
Expand Down Expand Up @@ -545,7 +549,7 @@ func TestParseConfigureOptions(t *testing.T) {
}

// Test bytes values
sopts, _ = mustNotFail([]string{"-max_bytes", "100KB", "-mb", "100KB", "-file_compact_min_size", "200KB", "-file_buffer_size", "300KB"})
sopts, _ = mustNotFail([]string{"-max_bytes", "100KB", "-mb", "100KB", "-file_compact_min_size", "200KB", "-file_buffer_size", "300KB", "-file_read_buffer_size", "1MB"})
if sopts.MaxBytes != 100*1024 {
t.Fatalf("Expected max_bytes to be 100KB, got %v", sopts.MaxBytes)
}
Expand All @@ -555,6 +559,9 @@ func TestParseConfigureOptions(t *testing.T) {
if sopts.FileStoreOpts.BufferSize != 300*1024 {
t.Fatalf("Expected file_buffer_size to be 300KB, got %v", sopts.FileStoreOpts.BufferSize)
}
if sopts.FileStoreOpts.ReadBufferSize != 1024*1024 {
t.Fatalf("Expected file_read_buffer_size to be 1MB, got %v", sopts.FileStoreOpts.ReadBufferSize)
}

// Failures with bytes
expectToFail([]string{"-max_bytes", "12abc"}, "should be a size")
Expand Down
1 change: 1 addition & 0 deletions stores/common_test.go
Expand Up @@ -375,6 +375,7 @@ func TestMain(m *testing.M) {
var encryptionKey string

flag.BoolVar(&testFSDisableBufferWriters, "fs_no_buffer", false, "Disable use of buffer writers")
flag.BoolVar(&testFSDisableReadBuffer, "fs_no_read_buffer", false, "Disable use of read buffer")
flag.BoolVar(&testFSSetFDsLimit, "fs_set_fds_limit", false, "Set some FDs limit")
flag.BoolVar(&doSQL, "sql", true, "Set this to false if you don't want SQL to be tested")
test.AddSQLFlags(flag.CommandLine, &testSQLDriver, &testSQLSource, &testSQLSourceAdmin, &testSQLDatabaseName)
Expand Down
180 changes: 178 additions & 2 deletions stores/filestore.go
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -183,6 +184,11 @@ type FileStoreOptions struct {
// removed (the file is truncated at the beginning of the first incomplete
// record). Dataloss may occur.
TruncateUnexpectedEOF bool

// ReadBufferSize, if non zero, will cause the store to preload messages
// (up to this total size) when looking up a message. This is speeding
// things up when performing lookup of consecutive messages.
ReadBufferSize int
}

// This is an internal error to detect situations where we do
Expand All @@ -203,6 +209,7 @@ var DefaultFileStoreOptions = FileStoreOptions{
DoSync: true,
SliceMaxBytes: 64 * 1024 * 1024, // 64MB
ParallelRecovery: 1,
ReadBufferSize: 2 * 1024 * 1024, // 2MB
}

// BufferSize is a FileStore option that sets the size of the buffer used
Expand All @@ -217,6 +224,18 @@ func BufferSize(size int) FileStoreOption {
}
}

// ReadBufferSize is a FileStore option that sets the size of the buffer used
// during store reads. This can help improve read performance.
func ReadBufferSize(size int) FileStoreOption {
return func(o *FileStoreOptions) error {
if size < 0 {
return fmt.Errorf("read buffer size value must be a positive number")
}
o.ReadBufferSize = size
return nil
}
}

// CompactEnabled is a FileStore option that enables or disables file compaction.
// The value false will disable compaction.
func CompactEnabled(enabled bool) FileStoreOption {
Expand Down Expand Up @@ -377,6 +396,9 @@ func AllOptions(opts *FileStoreOptions) FileStoreOption {
if err := ParallelRecovery(opts.ParallelRecovery)(o); err != nil {
return err
}
if err := ReadBufferSize(opts.ReadBufferSize)(o); err != nil {
return err
}
o.CompactEnabled = opts.CompactEnabled
o.DoCRC = opts.DoCRC
o.DoSync = opts.DoSync
Expand Down Expand Up @@ -592,6 +614,7 @@ type FileMsgStore struct {
bkgTasksDone chan bool // signal the background tasks go routine to stop
bkgTasksWake chan bool // signal the background tasks go routine to get out of a sleep
allDone sync.WaitGroup
readBufSize int
}

// some variables based on constants but that we can change
Expand Down Expand Up @@ -1930,6 +1953,7 @@ func (fs *FileStore) newFileMsgStore(channelDirName, channel string, limits *Msg
channelName: channel,
bkgTasksDone: make(chan bool, 1),
bkgTasksWake: make(chan bool, 1),
readBufSize: fs.opts.ReadBufferSize,
}
ms.init(channel, fs.log, limits)

Expand Down Expand Up @@ -2503,6 +2527,10 @@ func (ms *FileMsgStore) readIndex(r io.Reader) (uint64, *msgIndex, error) {
if _, err := io.ReadFull(r, buf); err != nil {
return 0, nil, err
}
return ms.readIndexFromBuffer(buf)
}

func (ms *FileMsgStore) readIndexFromBuffer(buf []byte) (uint64, *msgIndex, error) {
mindex := &msgIndex{}
seq := util.ByteOrder.Uint64(buf)
mindex.offset = int64(util.ByteOrder.Uint64(buf[8:]))
Expand Down Expand Up @@ -3184,13 +3212,18 @@ func (ms *FileMsgStore) lookup(seq uint64) (*pb.MsgProto, error) {
if err != nil {
return nil, err
}
if ms.readBufSize > 0 {
msg, err = ms.readAheadMsgs(fslice, seq)
ms.unlockFiles(fslice)
return msg, err
}
msgIndex, err := ms.readMsgIndex(fslice, seq)
if msgIndex != nil {
file := fslice.file.handle
// Position file to message's offset. 0 means from start.
_, err = file.Seek(msgIndex.offset, io.SeekStart)
if err == nil {
ms.tmpMsgBuf, _, _, err = readRecord(file, ms.tmpMsgBuf, false, ms.fstore.crcTable, ms.fstore.opts.DoCRC)
ms.tmpMsgBuf, err = ms.readMsgRecord(file, ms.tmpMsgBuf, msgIndex.msgSize)
}
}
ms.unlockFiles(fslice)
Expand All @@ -3199,7 +3232,7 @@ func (ms *FileMsgStore) lookup(seq uint64) (*pb.MsgProto, error) {
}
// Recover this message
msg = &pb.MsgProto{}
err = msg.Unmarshal(ms.tmpMsgBuf[:msgIndex.msgSize])
err = msg.Unmarshal(ms.tmpMsgBuf[recordHeaderSize : recordHeaderSize+msgIndex.msgSize])
if err != nil {
return nil, err
}
Expand All @@ -3208,6 +3241,148 @@ func (ms *FileMsgStore) lookup(seq uint64) (*pb.MsgProto, error) {
return msg, nil
}

// This is equivalent to readRecord except that we know this is a message record
// and we know the expected message size so we can do a single read as opposed
// to read header + read payload.
func (ms *FileMsgStore) readMsgRecord(r io.Reader, buf []byte, expectedMsgSize uint32) ([]byte, error) {
totalSize := recordHeaderSize + int(expectedMsgSize)
buf = util.EnsureBufBigEnough(buf, totalSize)
if _, err := io.ReadFull(r, buf[:totalSize]); err != nil {
return buf, err
}
header := buf[:recordHeaderSize]
msgSize := util.ByteOrder.Uint32(header[:4])
if msgSize == 0 {
crc := util.ByteOrder.Uint32(header[4:recordHeaderSize])
if crc == 0 {
return buf, errNeedRewind
}
}
if msgSize != expectedMsgSize {
return buf, fmt.Errorf("expected size to be %v, got %v", expectedMsgSize, msgSize)
}
if ms.fstore.opts.DoCRC {
crc := util.ByteOrder.Uint32(header[4:recordHeaderSize])
// check CRC against what was stored
if c := crc32.Checksum(buf[recordHeaderSize:totalSize], ms.fstore.crcTable); c != crc {
return buf, fmt.Errorf("corrupted data, expected crc to be 0x%08x, got 0x%08x", crc, c)
}
}
return buf, nil
}

// Read from disk and returns the message for the given `seq` but perform
// a single disk I/O with large buffer to read possibly several subsequent
// messages. These are added to the cache which will speed up lookups for
// next message sequence.
func (ms *FileMsgStore) readAheadMsgs(fslice *fileSlice, seq uint64) (*pb.MsgProto, error) {

// Compute the offset in the index file itself.
idxFileOffset := 4 + (int64(seq-fslice.firstSeq)+int64(fslice.rmCount))*msgIndexRecSize
// Then position the file pointer of the index file.
if _, err := fslice.idxFile.handle.Seek(idxFileOffset, io.SeekStart); err != nil {
return nil, err
}

// Keep track of individual size of messages we are going to bulk read
var _msgSizes [2048]int
msgSizes := _msgSizes[:0]

// Keep track of total number of messages and size
totalMsgs := 0
totalMsgsSize := 0
firstMsgOffset := int64(0)

// Indexes are fixed size. Read a page (4K) worth of them.
// Loop if needed if total message size is below readBufSize.
idxsBuf := [128 * msgIndexRecSize]byte{}
idxSeq := seq
maxIdxs := int(fslice.lastSeq - seq + 1)
done := false
idxBufSize := cap(idxsBuf)
for iter := 0; !done; iter++ {
if (iter+1)*128 > maxIdxs {
idxBufSize = (maxIdxs - (iter * 128)) * msgIndexRecSize
// Will be done after reading this page of indexes
done = true
}
b, err := io.ReadFull(fslice.idxFile.handle, idxsBuf[:idxBufSize])
if err != nil {
return nil, err
}
roffset := 0
numIdxs := b / msgIndexRecSize
for i := 0; i < numIdxs; i++ {
seqInIndexFile, msgIndex, err := ms.readIndexFromBuffer(idxsBuf[roffset:])
if err != nil {
return nil, err
}
if seqInIndexFile != idxSeq {
return nil, fmt.Errorf("wrong sequence, wanted %v got %v", idxSeq, seqInIndexFile)
}
if msgIndex == nil {
return nil, nil
}
recSize := int(msgIndex.msgSize + recordHeaderSize)
// Stop when we are over the limit, but we need to read at least one message
if (iter > 0 || i > 0) && totalMsgsSize+recSize > ms.readBufSize {
done = true
break
}
if iter == 0 && i == 0 {
firstMsgOffset = msgIndex.offset
}
totalMsgs++
totalMsgsSize += recSize
msgSizes = append(msgSizes, int(msgIndex.msgSize))
roffset += msgIndexRecSize
idxSeq++
}
}

// Position the data file to the offset of the first message we need to read.
file := fslice.file.handle
if _, err := file.Seek(firstMsgOffset, io.SeekStart); err != nil {
return nil, err
}
// Use a buffer that can hold all the message(s)
ms.tmpMsgBuf = util.EnsureBufBigEnough(ms.tmpMsgBuf, totalMsgsSize)
if _, err := io.ReadFull(file, ms.tmpMsgBuf[:totalMsgsSize]); err != nil {
return nil, err
}

// We will return the first message that the lookup was for
var firstMsg *pb.MsgProto

checkCRC := ms.fstore.opts.DoCRC

// This is the read offset to move from message to message in the read buffer.
roffset := 0
for i := 0; i < totalMsgs; i++ {
msgSize := msgSizes[i]
payloadStart := roffset + recordHeaderSize
if checkCRC {
crc := util.ByteOrder.Uint32(ms.tmpMsgBuf[roffset+4 : roffset+recordHeaderSize])
// check CRC against what was stored
if c := crc32.Checksum(ms.tmpMsgBuf[payloadStart:payloadStart+msgSize], ms.fstore.crcTable); c != crc {
return nil, fmt.Errorf("corrupted data, expected crc to be 0x%08x, got 0x%08x", crc, c)
}
}
// Reconstruct message
msg := &pb.MsgProto{}
if err := msg.Unmarshal(ms.tmpMsgBuf[payloadStart : payloadStart+msgSize]); err != nil {
return nil, err
}
if i == 0 {
firstMsg = msg
}
ms.cache.add(seq, msg, false)
roffset += recordHeaderSize + msgSize
seq++
}
return firstMsg, nil
}

// Lookup returns the stored message with given sequence number.
func (ms *FileMsgStore) Lookup(seq uint64) (*pb.MsgProto, error) {
ms.Lock()
Expand Down Expand Up @@ -3401,6 +3576,7 @@ func (c *msgsCache) evict(now int64) {
// Bulk remove
c.seqMaps = make(map[uint64]*cachedMsg)
c.head, c.tail, c.tryEvict = nil, nil, 0
runtime.GC()
return
}
cMsg := c.head
Expand Down

0 comments on commit 2435bdc

Please sign in to comment.