diff --git a/server/conf.go b/server/conf.go
index 2331746e..1f5c9d2b 100644
--- a/server/conf.go
+++ b/server/conf.go
@@ -696,14 +696,14 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
 			sopts.MaxBytes, flagErr = getBytes(f)
 		case "file_compact_min_size":
 			sopts.FileStoreOpts.CompactMinFileSize, flagErr = getBytes(f)
-		case "file_buffer_size", "file_read_buffer_size":
+		case "file_buffer_size":
 			var i64 int64
 			i64, flagErr = getBytes(f)
-			if f.Name == "file_buffer_size" {
-				sopts.FileStoreOpts.BufferSize = int(i64)
-			} else {
-				sopts.FileStoreOpts.ReadBufferSize = int(i64)
-			}
+			sopts.FileStoreOpts.BufferSize = int(i64)
+		case "file_read_buffer_size":
+			var i64 int64
+			i64, flagErr = getBytes(f)
+			sopts.FileStoreOpts.ReadBufferSize = int(i64)
 		}
 	})
 	if flagErr != nil {
diff --git a/stores/filestore.go b/stores/filestore.go
index 432500f9..1a6d88df 100644
--- a/stores/filestore.go
+++ b/stores/filestore.go
@@ -23,7 +23,6 @@ import (
 	"os"
 	"os/exec"
 	"path/filepath"
-	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -186,8 +185,9 @@ type FileStoreOptions struct {
 	TruncateUnexpectedEOF bool
 
 	// ReadBufferSize, if non zero, will cause the store to preload messages
-	// (up to this total size) when looking up a message. This is speeding
-	// things up when performing lookup of consecutive messages.
+	// (up to this total size) when looking up a message. We expect that the
+	// client will be asking for the following sequential messages, so this
+	// is a read-ahead optimization.
 	ReadBufferSize int
 }
 
@@ -617,6 +617,28 @@ type FileMsgStore struct {
 	readBufSize int
 }
 
+type bufferPool struct {
+	p sync.Pool
+}
+
+// Get returns a pointer to a slice of at least `needed` capacity.
+// For the reason why we use a pointer to a slice, see https://staticcheck.io/docs/checks#SA6002
+func (bp *bufferPool) Get(needed int) *[]byte {
+	pBuf, _ := bp.p.Get().(*[]byte)
+	if pBuf != nil && cap(*pBuf) >= needed {
+		return pBuf
+	}
+	buf := make([]byte, needed)
+	return &buf
+}
+
+// Put returns the pointer to the slice to the pool.
+func (bp *bufferPool) Put(pBuf *[]byte) {
+	bp.p.Put(pBuf)
+}
+
+var fsReadBufPool = &bufferPool{}
+
 // some variables based on constants but that we can change
 // for tests puposes.
 var (
@@ -2712,7 +2734,7 @@ func (ms *FileMsgStore) Store(m *pb.MsgProto) (uint64, error) {
 	}
 	ms.last = seq
 	ms.lastMsg = m
-	ms.cache.add(seq, m, true)
+	ms.cache.add(seq, m, true, false)
 	ms.wOffset += int64(recSize)
 
 	// For size, add the message record size, the record header and the size
@@ -3199,7 +3221,7 @@ func (ms *FileMsgStore) lookup(seq uint64) (*pb.MsgProto, error) {
 		bm := ms.bufferedMsgs[seq]
 		if bm != nil {
 			msg = bm.msg
-			ms.cache.add(seq, msg, false)
+			ms.cache.add(seq, msg, false, false)
 		}
 	}
 	// If not, we need to read it from disk...
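Aside on the bufferPool introduced above: sync.Pool stores interface{} values, so pooling a plain []byte would allocate a fresh interface box on every Put, which is what staticcheck's SA6002 warns about; pooling a *[]byte avoids that. Below is a minimal standalone sketch of the pattern, including the deferred-Put idiom used later in readAheadMsgs (illustrative program, not part of the patch):

package main

import (
	"fmt"
	"sync"
)

type bufferPool struct{ p sync.Pool }

// Get returns a pointer to a slice with at least `needed` capacity,
// reusing a pooled slice when one is large enough.
func (bp *bufferPool) Get(needed int) *[]byte {
	pBuf, _ := bp.p.Get().(*[]byte)
	if pBuf != nil && cap(*pBuf) >= needed {
		return pBuf
	}
	buf := make([]byte, needed)
	return &buf
}

// Put returns the pointer to the slice to the pool.
func (bp *bufferPool) Put(pBuf *[]byte) { bp.p.Put(pBuf) }

func main() {
	pool := &bufferPool{}
	pBuf := pool.Get(64)
	// The deferred closure reads pBuf at call time, so if pBuf is later
	// reassigned to a bigger buffer, the bigger one is what gets pooled.
	defer func() { pool.Put(pBuf) }()
	buf := *pBuf
	n := copy(buf, "hello")
	fmt.Printf("%s (cap %d)\n", buf[:n], cap(buf))
}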
@@ -3212,7 +3234,7 @@ func (ms *FileMsgStore) lookup(seq uint64) (*pb.MsgProto, error) {
 		if err != nil {
 			return nil, err
 		}
-		if ms.readBufSize > 0 {
+		if ms.readBufSize > 0 && seq != fslice.lastSeq {
 			msg, err = ms.readAheadMsgs(fslice, seq)
 			ms.unlockFiles(fslice)
 			return msg, err
@@ -3236,7 +3258,7 @@ func (ms *FileMsgStore) lookup(seq uint64) (*pb.MsgProto, error) {
 		if err != nil {
 			return nil, err
 		}
-		ms.cache.add(seq, msg, false)
+		ms.cache.add(seq, msg, false, false)
 	}
 	return msg, nil
 }
@@ -3284,6 +3306,26 @@ func (ms *FileMsgStore) readAheadMsgs(fslice *fileSlice, seq uint64) (*pb.MsgPro
 		return nil, err
 	}
 
+	var pBuf *[]byte
+	pBuf = fsReadBufPool.Get(ms.readBufSize)
+	defer func() {
+		fsReadBufPool.Put(pBuf)
+	}()
+	buf := *pBuf
+
+	numIdx := ms.readBufSize / msgIndexRecSize
+	if numIdx == 0 {
+		numIdx = 1
+	}
+	if maxIdx := int(fslice.lastSeq - seq + 1); maxIdx < numIdx {
+		numIdx = maxIdx
+	}
+	idxBufSize := numIdx * msgIndexRecSize
+
+	if _, err := io.ReadFull(fslice.idxFile.handle, buf[:idxBufSize]); err != nil {
+		return nil, err
+	}
+
 	// Keep track of individual size of messages we are going to bulk read
 	var _msgSizes [2048]int
 	msgSizes := _msgSizes[:0]
@@ -3293,51 +3335,40 @@ func (ms *FileMsgStore) readAheadMsgs(fslice *fileSlice, seq uint64) (*pb.MsgPro
 	totalMsgsSize := 0
 	firstMsgOffset := int64(0)
 
-	// Indexes are fixed size. Read a page (4K) worth of them.
-	// Loop if needed if total message size is below readBufSize.
-	idxsBuf := [128 * msgIndexRecSize]byte{}
+	roffset := 0
 	idxSeq := seq
-	maxIdxs := int(fslice.lastSeq - seq + 1)
-	done := false
-	idxBufSize := cap(idxsBuf)
-	for iter := 0; !done; iter++ {
-		if (iter+1)*128 > maxIdxs {
-			idxBufSize = (maxIdxs - (iter * 128)) * msgIndexRecSize
-			// Will be done after reading this page of indexes
-			done = true
-		}
-		b, err := io.ReadFull(fslice.idxFile.handle, idxsBuf[:idxBufSize])
+	for i := 0; i < numIdx; i++ {
+		seqInIndexFile, msgIndex, err := ms.readIndexFromBuffer(buf[roffset:])
 		if err != nil {
 			return nil, err
 		}
-		roffset := 0
-		numIdxs := b / msgIndexRecSize
-		for i := 0; i < numIdxs; i++ {
-			seqInIndexFile, msgIndex, err := ms.readIndexFromBuffer(idxsBuf[roffset:])
-			if err != nil {
-				return nil, err
-			}
-			if seqInIndexFile != idxSeq {
-				return nil, fmt.Errorf("wrong sequence, wanted %v got %v", idxSeq, seqInIndexFile)
-			}
-			if msgIndex == nil {
-				return nil, nil
-			}
-			recSize := int(msgIndex.msgSize + recordHeaderSize)
-			// Stop when we are over the limit, but we need to read at least one message
-			if (iter > 0 || i > 0) && totalMsgsSize+recSize > ms.readBufSize {
-				done = true
-				break
-			}
-			if iter == 0 && i == 0 {
-				firstMsgOffset = msgIndex.offset
-			}
-			totalMsgs++
-			totalMsgsSize += recSize
-			msgSizes = append(msgSizes, int(msgIndex.msgSize))
-			roffset += msgIndexRecSize
-			idxSeq++
+		if seqInIndexFile != idxSeq {
+			return nil, fmt.Errorf("wrong sequence, wanted %v got %v", idxSeq, seqInIndexFile)
 		}
+		if msgIndex == nil {
+			return nil, nil
+		}
+		recSize := int(msgIndex.msgSize + recordHeaderSize)
+		// Stop when we are over the limit, but we need to read at least one message
+		if i > 0 && totalMsgsSize+recSize > ms.readBufSize {
+			break
+		}
+		if i == 0 {
+			firstMsgOffset = msgIndex.offset
+		}
+		totalMsgs++
+		totalMsgsSize += recSize
+		msgSizes = append(msgSizes, int(msgIndex.msgSize))
+		roffset += msgIndexRecSize
+		idxSeq++
+	}
+
+	// Case where the buffer is not big enough to hold a single message...
+	if totalMsgsSize > cap(buf) {
+		// Get a slice of the proper size. The top-level defer function
+		// will ensure that this one is put back into the pool.
+		pBuf = fsReadBufPool.Get(totalMsgsSize)
+		buf = *pBuf
 	}
 
 	// Position the data file to the offset of the first message we need to read.
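For clarity, the index sizing at the top of readAheadMsgs reduces to the following helper (a sketch with made-up numbers; msgIndexRecSize is the store's fixed per-message index record size, and `remainingSeqs` corresponds to fslice.lastSeq - seq + 1):

package main

import "fmt"

// indexReadSize mirrors the sizing logic above: read as many index
// records as fit in the read buffer, at least one, and never past the
// slice's last sequence.
func indexReadSize(readBufSize, msgIndexRecSize, remainingSeqs int) (numIdx, idxBufSize int) {
	numIdx = readBufSize / msgIndexRecSize
	if numIdx == 0 {
		numIdx = 1
	}
	if remainingSeqs < numIdx {
		numIdx = remainingSeqs
	}
	return numIdx, numIdx * msgIndexRecSize
}

func main() {
	// With a hypothetical 1KB read buffer and 32-byte index records, we
	// would read up to 32 indexes, capped by the 5 remaining sequences.
	fmt.Println(indexReadSize(1024, 32, 5)) // 5 160
}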
@@ -3345,9 +3376,7 @@ func (ms *FileMsgStore) readAheadMsgs(fslice *fileSlice, seq uint64) (*pb.MsgPro
 	if _, err := file.Seek(firstMsgOffset, io.SeekStart); err != nil {
 		return nil, err
 	}
-	// Use a buffer that can hold all the message(s)
-	ms.tmpMsgBuf = util.EnsureBufBigEnough(ms.tmpMsgBuf, totalMsgsSize)
-	if _, err := io.ReadFull(file, ms.tmpMsgBuf[:totalMsgsSize]); err != nil {
+	if _, err := io.ReadFull(file, buf[:totalMsgsSize]); err != nil {
 		return nil, err
 	}
 
@@ -3356,28 +3385,28 @@ func (ms *FileMsgStore) readAheadMsgs(fslice *fileSlice, seq uint64) (*pb.MsgPro
 
 	checkCRC := ms.fstore.opts.DoCRC
 
-	// This is the read offset to move from message to message in the read buffer.
-	roffset := 0
+	roffset = 0
 	for i := 0; i < totalMsgs; i++ {
 		msgSize := msgSizes[i]
 		payloadStart := roffset + recordHeaderSize
+		payloadEnd := payloadStart + msgSize
 		if checkCRC {
-			crc := util.ByteOrder.Uint32(ms.tmpMsgBuf[roffset+4 : roffset+recordHeaderSize])
+			crc := util.ByteOrder.Uint32(buf[roffset+4 : roffset+recordHeaderSize])
 			// check CRC against what was stored
-			if c := crc32.Checksum(ms.tmpMsgBuf[payloadStart:payloadStart+msgSize], ms.fstore.crcTable); c != crc {
+			if c := crc32.Checksum(buf[payloadStart:payloadEnd], ms.fstore.crcTable); c != crc {
 				return nil, fmt.Errorf("corrupted data, expected crc to be 0x%08x, got 0x%08x", crc, c)
 			}
 		}
 		// Reconstruct message
 		msg := &pb.MsgProto{}
-		if err := msg.Unmarshal(ms.tmpMsgBuf[payloadStart : payloadStart+msgSize]); err != nil {
+		if err := msg.Unmarshal(buf[payloadStart:payloadEnd]); err != nil {
 			return nil, err
 		}
 		if i == 0 {
 			firstMsg = msg
 		}
-		ms.cache.add(seq, msg, false)
-		roffset += recordHeaderSize + msgSize
+		ms.cache.add(seq, msg, false, true)
+		roffset += msgSize + recordHeaderSize
 		seq++
 	}
 	return firstMsg, nil
@@ -3506,7 +3535,14 @@ func (ms *FileMsgStore) initCache() {
 
 // add adds a message to the cache.
 // Store write lock is assumed held on entry
-func (c *msgsCache) add(seq uint64, msg *pb.MsgProto, isNew bool) {
+func (c *msgsCache) add(seq uint64, msg *pb.MsgProto, isNew, checkPresent bool) {
+	if checkPresent {
+		// If the message is already in the cache, msgsCache.get() will move
+		// it to the end of the list and update its expiration.
+		if c.get(seq) != nil {
+			return
+		}
+	}
 	exp := cacheTTL
 	if isNew {
 		exp += msg.Timestamp
@@ -3576,7 +3612,6 @@ func (c *msgsCache) evict(now int64) {
 		// Bulk remove
 		c.seqMaps = make(map[uint64]*cachedMsg)
 		c.head, c.tail, c.tryEvict = nil, nil, 0
-		runtime.GC()
 		return
 	}
 	cMsg := c.head
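To illustrate why readAheadMsgs passes checkPresent=true: read-ahead may try to re-add messages that an earlier lookup already cached, and get() doubles as a recency refresh. A toy sketch of that fast path follows (hypothetical types, not the store's actual msgsCache):

package main

import "fmt"

// toyCache loosely mimics the msgsCache behavior relevant here: get()
// reports presence AND refreshes the entry's recency, so add() with
// checkPresent=true can return early for already-cached entries.
type toyCache struct {
	entries map[uint64]string
	order   []uint64 // least recently used first
}

func (c *toyCache) get(seq uint64) (string, bool) {
	v, ok := c.entries[seq]
	if !ok {
		return "", false
	}
	// Move seq to the end of order, marking it most recently used.
	for i, s := range c.order {
		if s == seq {
			c.order = append(append(c.order[:i], c.order[i+1:]...), seq)
			break
		}
	}
	return v, true
}

func (c *toyCache) add(seq uint64, v string, checkPresent bool) {
	if checkPresent {
		if _, ok := c.get(seq); ok {
			return // already cached; get() refreshed its position
		}
	}
	c.entries[seq] = v
	c.order = append(c.order, seq)
}

func main() {
	c := &toyCache{entries: map[uint64]string{}}
	c.add(1, "a", false)
	c.add(2, "b", false)
	c.add(1, "a", true) // read-ahead hit: no duplicate, just a refresh
	fmt.Println(c.order) // [2 1]
}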
diff --git a/stores/filestore_msg_test.go b/stores/filestore_msg_test.go
index d8365457..6fdc2d84 100644
--- a/stores/filestore_msg_test.go
+++ b/stores/filestore_msg_test.go
@@ -1919,6 +1919,70 @@ func TestFSReadBuffer(t *testing.T) {
 			t.Fatalf("Expected content to be %q, got %q", i, m.Data)
 		}
 	}
+
+	// Empty cache
+	ms.Lock()
+	ms.cache.empty()
+	ms.Unlock()
+
+	// Lookup message 8, which should load 9 and 10 too
+	msgStoreLookup(t, c.Msgs, 8)
+	for i := uint64(8); i <= 10; i++ {
+		ms.Lock()
+		m := ms.cache.get(i)
+		ms.Unlock()
+		if m == nil {
+			t.Fatalf("Expected msg seq %v to be in cache, it was not", i)
+		}
+		if string(m.Data) != fmt.Sprintf("%v", i) {
+			t.Fatalf("Expected content to be %q, got %q", i, m.Data)
+		}
+	}
+
+	// Now lookup message 1, ensure that the cache has only 10 elements...
+	msgStoreLookup(t, c.Msgs, 1)
+	ms.Lock()
+	sizeCache := 0
+	for cur := ms.cache.head; cur != nil; cur = cur.next {
+		sizeCache++
+	}
+	ms.Unlock()
+	if sizeCache != 10 {
+		t.Fatalf("Expected cache size to be 10, got %v", sizeCache)
+	}
+
+	s.Close()
+	cleanupFSDatastore(t)
+
+	// Restart the test, but now with a read buffer that is too small for a single message
+	s, err = NewFileStore(testLogger, testFSDefaultDatastore, &limits, BufferSize(0), ReadBufferSize(500))
+	if err != nil {
+		t.Fatalf("Error creating store: %v", err)
+	}
+	defer s.Close()
+
+	payload := make([]byte, 600)
+	c = storeCreateChannel(t, s, "foo")
+	for i := uint64(1); i <= 2; i++ {
+		storeMsg(t, c, "foo", i, payload)
+	}
+	c.Msgs.Flush()
+
+	// Force the cache to be emptied
+	ms = c.Msgs.(*FileMsgStore)
+	ms.Lock()
+	ms.cache.empty()
+	ms.Unlock()
+
+	// Lookup first message
+	msgStoreLookup(t, c.Msgs, 1)
+	// Ensure that the other message is not in the cache
+	ms.Lock()
+	m = ms.cache.get(2)
+	ms.Unlock()
+	if m != nil {
+		t.Fatalf("Expected msg seq 2 to not be in the cache, got %v", m)
+	}
 }
 
 func TestFSReadMsgRecord(t *testing.T) {
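For reference, a hedged sketch of how a caller would opt into the read-ahead from Go; the nil logger, datastore path, and the 2MB value are placeholders, and a zero ReadBufferSize leaves lookup behavior unchanged:

package main

import (
	"log"

	"github.com/nats-io/nats-streaming-server/stores"
)

func main() {
	limits := stores.DefaultStoreLimits
	// ReadBufferSize bounds how many bytes of consecutive messages a
	// single lookup preloads into the message cache.
	s, err := stores.NewFileStore(nil, "datastore", &limits,
		stores.ReadBufferSize(2*1024*1024))
	if err != nil {
		log.Fatal(err)
	}
	defer s.Close()
}

The same setting is exposed on the command line as the file_read_buffer_size flag handled in server/conf.go above.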