Updates based on comments
Still reading the indexes from the file, but in a single I/O operation.
Tried to keep the indexes in memory to avoid that read and did not see
a dramatic advantage (at least in a simple test). Will revisit
that later.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic committed Jul 9, 2019
1 parent 68f19eb commit 6eaeb57
Showing 3 changed files with 164 additions and 65 deletions.
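
The core of the change is the index read in readAheadMsgs (stores/filestore.go below): instead of reading index records a 4K page at a time inside a loop, the store now computes how many fixed-size index records fit in the configured read buffer, caps that at the number of messages remaining in the file slice (and floors it at one), and fetches them with a single io.ReadFull call. A minimal runnable sketch of that pattern follows; the record size and the readIndexes helper are illustrative only and are not taken verbatim from the commit.

package main

import (
    "bytes"
    "fmt"
    "io"
)

// msgIndexRecSize stands in for the fixed index-record size used in
// filestore.go; the value here is illustrative only.
const msgIndexRecSize = 24

// readIndexes fills buf with up to len(buf)/msgIndexRecSize index records in
// one io.ReadFull call, capped at `remaining` (records left in the file
// slice) and floored at one record. It returns the number of records read.
// buf is assumed to hold at least one record.
func readIndexes(r io.Reader, buf []byte, remaining int) (int, error) {
    numIdx := len(buf) / msgIndexRecSize
    if numIdx == 0 {
        numIdx = 1 // always read at least one record
    }
    if remaining < numIdx {
        numIdx = remaining // don't read past the last message of the slice
    }
    if _, err := io.ReadFull(r, buf[:numIdx*msgIndexRecSize]); err != nil {
        return 0, err
    }
    return numIdx, nil
}

func main() {
    idxFile := bytes.NewReader(make([]byte, 10*msgIndexRecSize)) // fake index file
    buf := make([]byte, 4*msgIndexRecSize)                       // the "read buffer"
    n, err := readIndexes(idxFile, buf, 10)
    fmt.Println(n, err) // 4 <nil>: one I/O call fetched four index records
}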
12 changes: 6 additions & 6 deletions server/conf.go
@@ -696,14 +696,14 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
sopts.MaxBytes, flagErr = getBytes(f)
case "file_compact_min_size":
sopts.FileStoreOpts.CompactMinFileSize, flagErr = getBytes(f)
case "file_buffer_size", "file_read_buffer_size":
case "file_buffer_size":
var i64 int64
i64, flagErr = getBytes(f)
if f.Name == "file_buffer_size" {
sopts.FileStoreOpts.BufferSize = int(i64)
} else {
sopts.FileStoreOpts.ReadBufferSize = int(i64)
}
sopts.FileStoreOpts.BufferSize = int(i64)
case "file_read_buffer_size":
var i64 int64
i64, flagErr = getBytes(f)
sopts.FileStoreOpts.ReadBufferSize = int(i64)
}
})
if flagErr != nil {
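The conf.go change above splits the previously combined case into separate cases for file_buffer_size and file_read_buffer_size. Below is a self-contained sketch of the same flag-visiting pattern; the options struct, the flag registration, and the plain strconv parsing are simplified stand-ins for the server's own options and its getBytes helper.

package main

import (
    "flag"
    "fmt"
    "strconv"
)

// Simplified stand-in for the server's FileStoreOptions fields.
type fileStoreOpts struct {
    BufferSize     int
    ReadBufferSize int
}

func main() {
    fs := flag.NewFlagSet("stan", flag.ExitOnError)
    fs.String("file_buffer_size", "0", "File write buffer size")
    fs.String("file_read_buffer_size", "0", "File read-ahead buffer size")
    _ = fs.Parse([]string{"-file_read_buffer_size", "2097152"})

    var opts fileStoreOpts
    // Visit only the flags that were explicitly set, and dispatch on the
    // flag name: one case per option, as in the updated ConfigureOptions.
    fs.Visit(func(f *flag.Flag) {
        i64, err := strconv.ParseInt(f.Value.String(), 10, 64)
        if err != nil {
            return
        }
        switch f.Name {
        case "file_buffer_size":
            opts.BufferSize = int(i64)
        case "file_read_buffer_size":
            opts.ReadBufferSize = int(i64)
        }
    })
    fmt.Printf("%+v\n", opts) // {BufferSize:0 ReadBufferSize:2097152}
}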
153 changes: 94 additions & 59 deletions stores/filestore.go
@@ -23,7 +23,6 @@ import (
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
@@ -186,8 +185,9 @@ type FileStoreOptions struct {
TruncateUnexpectedEOF bool

// ReadBufferSize, if non zero, will cause the store to preload messages
// (up to this total size) when looking up a message. This is speeding
// things up when performing lookup of consecutive messages.
// (up to this total size) when looking up a message. We expect that the
// client will be asking for the following sequential messages, so this
// is a read ahead optimization.
ReadBufferSize int
}

@@ -617,6 +617,28 @@ type FileMsgStore struct {
readBufSize int
}

type bufferPool struct {
p sync.Pool
}

// Get returns a pointer to a slice of at least `needed` capacity.
// For the reason why we use a pointer to a slice, see https://staticcheck.io/docs/checks#SA6002
func (bp *bufferPool) Get(needed int) *[]byte {
pBuf, _ := bp.p.Get().(*[]byte)
if pBuf != nil && cap(*pBuf) >= needed {
return pBuf
}
buf := make([]byte, needed)
return &buf
}

// Put returns the pointer to the slice back to the pool.
func (bp *bufferPool) Put(pBuf *[]byte) {
bp.p.Put(pBuf)
}

var fsReadBufPool = &bufferPool{}

// some variables based on constants but that we can change
// for test purposes.
var (
@@ -2712,7 +2734,7 @@ func (ms *FileMsgStore) Store(m *pb.MsgProto) (uint64, error) {
}
ms.last = seq
ms.lastMsg = m
ms.cache.add(seq, m, true)
ms.cache.add(seq, m, true, false)
ms.wOffset += int64(recSize)

// For size, add the message record size, the record header and the size
@@ -3199,7 +3221,7 @@ func (ms *FileMsgStore) lookup(seq uint64) (*pb.MsgProto, error) {
bm := ms.bufferedMsgs[seq]
if bm != nil {
msg = bm.msg
ms.cache.add(seq, msg, false)
ms.cache.add(seq, msg, false, false)
}
}
// If not, we need to read it from disk...
@@ -3212,7 +3234,7 @@ func (ms *FileMsgStore) lookup(seq uint64) (*pb.MsgProto, error) {
if err != nil {
return nil, err
}
if ms.readBufSize > 0 {
if ms.readBufSize > 0 && seq != fslice.lastSeq {
msg, err = ms.readAheadMsgs(fslice, seq)
ms.unlockFiles(fslice)
return msg, err
@@ -3236,7 +3258,7 @@ func (ms *FileMsgStore) lookup(seq uint64) (*pb.MsgProto, error) {
if err != nil {
return nil, err
}
ms.cache.add(seq, msg, false)
ms.cache.add(seq, msg, false, false)
}
return msg, nil
}
@@ -3284,6 +3306,26 @@ func (ms *FileMsgStore) readAheadMsgs(fslice *fileSlice, seq uint64) (*pb.MsgPro
return nil, err
}

var pBuf *[]byte
pBuf = fsReadBufPool.Get(ms.readBufSize)
defer func() {
fsReadBufPool.Put(pBuf)
}()
buf := *pBuf

numIdx := ms.readBufSize / msgIndexRecSize
if numIdx == 0 {
numIdx = 1
}
if maxIdx := int(fslice.lastSeq - seq + 1); maxIdx < numIdx {
numIdx = maxIdx
}
idxBufSize := numIdx * msgIndexRecSize

if _, err := io.ReadFull(fslice.idxFile.handle, buf[:idxBufSize]); err != nil {
return nil, err
}

// Keep track of individual size of messages we are going to bulk read
var _msgSizes [2048]int
msgSizes := _msgSizes[:0]
@@ -3293,61 +3335,48 @@ func (ms *FileMsgStore) readAheadMsgs(fslice *fileSlice, seq uint64) (*pb.MsgPro
totalMsgsSize := 0
firstMsgOffset := int64(0)

// Indexes are fixed size. Read a page (4K) worth of them.
// Loop if needed if total message size is below readBufSize.
idxsBuf := [128 * msgIndexRecSize]byte{}
roffset := 0
idxSeq := seq
maxIdxs := int(fslice.lastSeq - seq + 1)
done := false
idxBufSize := cap(idxsBuf)
for iter := 0; !done; iter++ {
if (iter+1)*128 > maxIdxs {
idxBufSize = (maxIdxs - (iter * 128)) * msgIndexRecSize
// Will be done after reading this page of indexes
done = true
}
b, err := io.ReadFull(fslice.idxFile.handle, idxsBuf[:idxBufSize])
for i := 0; i < numIdx; i++ {
seqInIndexFile, msgIndex, err := ms.readIndexFromBuffer(buf[roffset:])
if err != nil {
return nil, err
}
roffset := 0
numIdxs := b / msgIndexRecSize
for i := 0; i < numIdxs; i++ {
seqInIndexFile, msgIndex, err := ms.readIndexFromBuffer(idxsBuf[roffset:])
if err != nil {
return nil, err
}
if seqInIndexFile != idxSeq {
return nil, fmt.Errorf("wrong sequence, wanted %v got %v", idxSeq, seqInIndexFile)
}
if msgIndex == nil {
return nil, nil
}
recSize := int(msgIndex.msgSize + recordHeaderSize)
// Stop when we are over the limit, but we need to read at least one message
if (iter > 0 || i > 0) && totalMsgsSize+recSize > ms.readBufSize {
done = true
break
}
if iter == 0 && i == 0 {
firstMsgOffset = msgIndex.offset
}
totalMsgs++
totalMsgsSize += recSize
msgSizes = append(msgSizes, int(msgIndex.msgSize))
roffset += msgIndexRecSize
idxSeq++
if seqInIndexFile != idxSeq {
return nil, fmt.Errorf("wrong sequence, wanted %v got %v", idxSeq, seqInIndexFile)
}
if msgIndex == nil {
return nil, nil
}
recSize := int(msgIndex.msgSize + recordHeaderSize)
// Stop when we are over the limit, but we need to read at least one message
if i > 0 && totalMsgsSize+recSize > ms.readBufSize {
break
}
if i == 0 {
firstMsgOffset = msgIndex.offset
}
totalMsgs++
totalMsgsSize += recSize
msgSizes = append(msgSizes, int(msgIndex.msgSize))
roffset += msgIndexRecSize
idxSeq++
}

// Case where buffer is not big enough to hold single message...
if totalMsgsSize > cap(buf) {
// Get a slice of proper size. The top-level defer function
// will ensure that this one is put back into the pool.
pBuf = fsReadBufPool.Get(totalMsgsSize)
buf = *pBuf
}

// Position the data file to the offset of the first message we need to read.
file := fslice.file.handle
if _, err := file.Seek(firstMsgOffset, io.SeekStart); err != nil {
return nil, err
}
// Use a buffer that can hold all the message(s)
ms.tmpMsgBuf = util.EnsureBufBigEnough(ms.tmpMsgBuf, totalMsgsSize)
if _, err := io.ReadFull(file, ms.tmpMsgBuf[:totalMsgsSize]); err != nil {
if _, err := io.ReadFull(file, buf[:totalMsgsSize]); err != nil {
return nil, err
}

@@ -3356,28 +3385,28 @@ func (ms *FileMsgStore) readAheadMsgs(fslice *fileSlice, seq uint64) (*pb.MsgPro

checkCRC := ms.fstore.opts.DoCRC

// This is the read offset to move from message to message in the read buffer.
roffset := 0
roffset = 0
for i := 0; i < totalMsgs; i++ {
msgSize := msgSizes[i]
payloadStart := roffset + recordHeaderSize
payloadEnd := payloadStart + msgSize
if checkCRC {
crc := util.ByteOrder.Uint32(ms.tmpMsgBuf[roffset+4 : roffset+recordHeaderSize])
crc := util.ByteOrder.Uint32(buf[roffset+4 : roffset+recordHeaderSize])
// check CRC against what was stored
if c := crc32.Checksum(ms.tmpMsgBuf[payloadStart:payloadStart+msgSize], ms.fstore.crcTable); c != crc {
if c := crc32.Checksum(buf[payloadStart:payloadEnd], ms.fstore.crcTable); c != crc {
return nil, fmt.Errorf("corrupted data, expected crc to be 0x%08x, got 0x%08x", crc, c)
}
}
// Reconstruct message
msg := &pb.MsgProto{}
if err := msg.Unmarshal(ms.tmpMsgBuf[payloadStart : payloadStart+msgSize]); err != nil {
if err := msg.Unmarshal(buf[payloadStart:payloadEnd]); err != nil {
return nil, err
}
if i == 0 {
firstMsg = msg
}
ms.cache.add(seq, msg, false)
roffset += recordHeaderSize + msgSize
ms.cache.add(seq, msg, false, true)
roffset += msgSize + recordHeaderSize
seq++
}
return firstMsg, nil
@@ -3506,7 +3535,14 @@ func (ms *FileMsgStore) initCache() {

// add adds a message to the cache.
// Store write lock is assumed held on entry
func (c *msgsCache) add(seq uint64, msg *pb.MsgProto, isNew bool) {
func (c *msgsCache) add(seq uint64, msg *pb.MsgProto, isNew, checkPresent bool) {
if checkPresent {
// If the message is already in the cache, msgsCache.get() will move
// it to the end of the list and update its expiration.
if c.get(seq) != nil {
return
}
}
exp := cacheTTL
if isNew {
exp += msg.Timestamp
@@ -3576,7 +3612,6 @@ func (c *msgsCache) evict(now int64) {
// Bulk remove
c.seqMaps = make(map[uint64]*cachedMsg)
c.head, c.tail, c.tryEvict = nil, nil, 0
runtime.GC()
return
}
cMsg := c.head
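One subtlety in readAheadMsgs above: Put is deferred inside a closure rather than as a direct defer fsReadBufPool.Put(pBuf), so that if the messages do not fit in the pooled read buffer and pBuf is re-pointed at a larger slice, it is that larger slice that goes back to the pool. Below is a runnable sketch of that capture behavior, using the same pool pattern added in this commit; the function names and sizes are illustrative only.

package main

import (
    "fmt"
    "sync"
)

// Same pooling pattern as the bufferPool added in filestore.go: buffers are
// kept as *[]byte to avoid the extra allocation flagged by staticcheck SA6002.
type bufferPool struct{ p sync.Pool }

func (bp *bufferPool) Get(needed int) *[]byte {
    pBuf, _ := bp.p.Get().(*[]byte)
    if pBuf != nil && cap(*pBuf) >= needed {
        return pBuf
    }
    buf := make([]byte, needed)
    return &buf
}

func (bp *bufferPool) Put(pBuf *[]byte) { bp.p.Put(pBuf) }

var pool = &bufferPool{}

// readSomething mimics the readAheadMsgs flow: start with a buffer sized for
// the configured read-ahead, grow it if the data does not fit, and let the
// deferred closure return whichever buffer pBuf points to at return time.
func readSomething(readBufSize, totalMsgsSize int) {
    pBuf := pool.Get(readBufSize)
    defer func() {
        // The closure reads pBuf when the function returns, so a grown
        // buffer (not the original one) is what goes back into the pool.
        pool.Put(pBuf)
    }()
    buf := *pBuf
    if totalMsgsSize > cap(buf) {
        pBuf = pool.Get(totalMsgsSize)
        buf = *pBuf
    }
    fmt.Println("used a buffer of cap", cap(buf))
}

func main() {
    readSomething(512, 4096) // grows; the 4096-cap buffer is what gets pooled
    readSomething(512, 100)  // may reuse the larger pooled buffer
}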
64 changes: 64 additions & 0 deletions stores/filestore_msg_test.go
@@ -1919,6 +1919,70 @@ func TestFSReadBuffer(t *testing.T) {
t.Fatalf("Expected content to be %q, got %q", i, m.Data)
}
}

// Empty cache
ms.Lock()
ms.cache.empty()
ms.Unlock()

// Lookup message 8, which should load 9 and 10 too
msgStoreLookup(t, c.Msgs, 8)
for i := uint64(8); i <= 10; i++ {
ms.Lock()
m := ms.cache.get(i)
ms.Unlock()
if m == nil {
t.Fatalf("Expected msg seq %v to be in cache, it was not", i)
}
if string(m.Data) != fmt.Sprintf("%v", i) {
t.Fatalf("Expected content to be %q, got %q", i, m.Data)
}
}

// Now lookup message 1, ensure that cache has only 10 elements...
msgStoreLookup(t, c.Msgs, 1)
ms.Lock()
sizeCache := 0
for cur := ms.cache.head; cur != nil; cur = cur.next {
sizeCache++
}
ms.Unlock()
if sizeCache != 10 {
t.Fatalf("Expected cache size to be 10, got %v", sizeCache)
}

s.Close()
cleanupFSDatastore(t)

// Restart test but now with a read buffer that is too small for a single message
s, err = NewFileStore(testLogger, testFSDefaultDatastore, &limits, BufferSize(0), ReadBufferSize(500))
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
defer s.Close()

payload := make([]byte, 600)
c = storeCreateChannel(t, s, "foo")
for i := uint64(1); i <= 2; i++ {
storeMsg(t, c, "foo", i, payload)
}
c.Msgs.Flush()

// Force empty of cache
ms = c.Msgs.(*FileMsgStore)
ms.Lock()
ms.cache.empty()
ms.Unlock()

// Lookup first message
msgStoreLookup(t, c.Msgs, 1)
// Ensure that other message is not in the cache
ms.Lock()
m = ms.cache.get(2)
ms.Unlock()
if m != nil {
t.Fatalf("Expected msg seq 2 to not be in the cache, got %v", m)
}
}

func TestFSReadMsgRecord(t *testing.T) {
