[FIXED] Check expected record size before loading the payload
Reverted addition of record_size_limit

But this change still addresses the memory usage caused by a corrupted
data message on recovery.

By using the expected record size from the index file when checking
that the last message matches the index information, we detect that
the record size stored in the index does not match the record size in
the ".dat" file, and therefore do not allocate the memory needed to
read the rest of the message.
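
To illustrate the idea, here is a minimal, self-contained Go sketch
(not the server's actual code): a reader validates the size field from
the record header against the size recorded in the index before
allocating the payload buffer. The 8-byte header layout with a
little-endian uint32 size field, and the readSizedRecord helper, are
assumptions made for this example only.

    package main

    import (
        "bytes"
        "encoding/binary"
        "fmt"
        "io"
    )

    // Assumed header layout: 4 bytes of record size + 4 bytes of CRC.
    const recordHeaderSize = 8

    // readSizedRecord reads one record; if expectedSize > 0 (known from
    // the index file), a mismatching size in the header is rejected
    // before any payload allocation takes place.
    func readSizedRecord(r io.Reader, expectedSize int) ([]byte, error) {
        header := make([]byte, recordHeaderSize)
        if _, err := io.ReadFull(r, header); err != nil {
            return nil, err
        }
        recSize := int(binary.LittleEndian.Uint32(header[:4]))
        if expectedSize > 0 && recSize != expectedSize {
            // A corrupted size field is caught here, so we never
            // allocate a huge buffer based on bogus on-disk data.
            return nil, fmt.Errorf("expected record size to be %v bytes, got %v bytes",
                expectedSize, recSize)
        }
        buf := make([]byte, recSize)
        if _, err := io.ReadFull(r, buf); err != nil {
            return nil, err
        }
        return buf, nil
    }

    func main() {
        // Simulate corruption: the header claims a 1 GiB payload.
        var rec bytes.Buffer
        header := make([]byte, recordHeaderSize)
        binary.LittleEndian.PutUint32(header[:4], 1<<30)
        rec.Write(header)

        // The index says this record should be 10 bytes.
        _, err := readSizedRecord(&rec, 10)
        fmt.Println(err)
    }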

The record_size_limit option that was added to solve that issue would
likely have caused problems if misused.

Resolves #1255

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic committed Jul 29, 2022
1 parent 2f0c582 commit 3983687
Showing 5 changed files with 15 additions and 75 deletions.
5 changes: 0 additions & 5 deletions server/conf.go
@@ -577,11 +577,6 @@ func parseFileOptions(itf interface{}, opts *Options) error {
return err
}
opts.FileStoreOpts.AutoSync = dur
case "record_size_limit":
if err := checkType(k, reflect.Int64, v); err != nil {
return err
}
opts.FileStoreOpts.RecordSizeLimit = int(v.(int64))
}
}
return nil
4 changes: 0 additions & 4 deletions server/conf_test.go
@@ -144,9 +144,6 @@ func TestParseConfig(t *testing.T) {
if opts.FileStoreOpts.AutoSync != 2*time.Minute {
t.Fatalf("Expected AutoSync to be 2minutes, got %v", opts.FileStoreOpts.AutoSync)
}
-if opts.FileStoreOpts.RecordSizeLimit != 1024 {
-t.Fatalf("Expected RecordSizeLimit to be 1024, got %v", opts.FileStoreOpts.RecordSizeLimit)
-}
if opts.MaxChannels != 11 {
t.Fatalf("Expected MaxChannels to be 11, got %v", opts.MaxChannels)
}
@@ -498,7 +495,6 @@ func TestParseWrongTypes(t *testing.T) {
expectFailureFor(t, "file:{parallel_recovery:false}", wrongTypeErr)
expectFailureFor(t, "file:{auto_sync:123}", wrongTypeErr)
expectFailureFor(t, "file:{auto_sync:\"1h:0m\"}", wrongTimeErr)
expectFailureFor(t, "file:{record_size_limit:true}", wrongTypeErr)
expectFailureFor(t, "cluster:{node_id:false}", wrongTypeErr)
expectFailureFor(t, "cluster:{bootstrap:1}", wrongTypeErr)
expectFailureFor(t, "cluster:{peers:1}", wrongTypeErr)
47 changes: 12 additions & 35 deletions stores/filestore.go
@@ -196,11 +196,6 @@ type FileStoreOptions struct {
// by setting DoSync to false.
// Setting AutoSync to any value <= 0 will disable auto sync.
AutoSync time.Duration
-
-// RecordSizeLimit defines the maxmimum size of a record that can be read
-// from disk. Should a record corruption occur, this will prevent the server
-// to allocate more memory than this value. Expressed in bytes.
-RecordSizeLimit int
}

// This is an internal error to detect situations where we do
@@ -337,15 +332,6 @@ func AutoSync(dur time.Duration) FileStoreOption {
}
}

-// RecordSizeLimit is a FileStore option that defines the maximum size of a record
-// that can be read from disk.
-func RecordSizeLimit(limit int) FileStoreOption {
-return func(o *FileStoreOptions) error {
-o.RecordSizeLimit = limit
-return nil
-}
-}
-
// SliceConfig is a FileStore option that allows the configuration of
// file slice limits and optional archive script file name.
func SliceConfig(maxMsgs int, maxBytes int64, maxAge time.Duration, script string) FileStoreOption {
@@ -433,9 +419,6 @@ func AllOptions(opts *FileStoreOptions) FileStoreOption {
if err := AutoSync(opts.AutoSync)(o); err != nil {
return err
}
-if err := RecordSizeLimit(opts.RecordSizeLimit)(o); err != nil {
-return err
-}
o.CompactEnabled = opts.CompactEnabled
o.DoCRC = opts.DoCRC
o.DoSync = opts.DoSync
@@ -814,7 +797,7 @@ func writeRecord(w io.Writer, buf []byte, recType recordType, rec record, recSiz
// hold the payload (expanding if necessary). Therefore, this call always
// return `buf`, regardless if there is an error or not.
// The caller is indicating if the record is supposed to be typed or not.
-func readRecord(r io.Reader, buf []byte, recTyped bool, crcTable *crc32.Table, checkCRC bool, limit int) ([]byte, int, recordType, error) {
+func readRecord(r io.Reader, buf []byte, recTyped bool, crcTable *crc32.Table, checkCRC bool, expectedSize int) ([]byte, int, recordType, error) {
_header := [recordHeaderSize]byte{}
header := _header[:]
if _, err := io.ReadFull(r, header); err != nil {
@@ -835,8 +818,8 @@ func readRecord(r io.Reader, buf []byte, recTyped bool, crcTable *crc32.Table, c
return buf, 0, 0, errNeedRewind
}
}
-if limit > 0 && recSize > limit {
-return buf, 0, 0, fmt.Errorf("record size %v is greater than limit of %v bytes", recSize, limit)
+if expectedSize > 0 && recSize != expectedSize {
+return buf, 0, 0, fmt.Errorf("expected record size to be %v bytes, got %v bytes", expectedSize, recSize)
}
// Now we are going to read the payload
buf = util.EnsureBufBigEnough(buf, recSize)
@@ -1584,10 +1567,6 @@ func (fs *FileStore) Init(info *spb.ServerInfo) error {
return nil
}
-
-func (fs *FileStore) readRecord(r io.Reader, buf []byte, recTyped bool) ([]byte, int, recordType, error) {
-return readRecord(r, buf, recTyped, fs.crcTable, fs.opts.DoCRC, fs.opts.RecordSizeLimit)
-}

// recoverClients reads the client files and returns an array of RecoveredClient
func (fs *FileStore) recoverClients() ([]*Client, error) {
var err error
@@ -1602,7 +1581,7 @@ func (fs *FileStore) recoverClients() ([]*Client, error) {
br := bufio.NewReaderSize(fs.clientsFile.handle, defaultBufSize)

for {
-buf, recSize, recType, err = fs.readRecord(br, buf, true)
+buf, recSize, recType, err = readRecord(br, buf, true, fs.crcTable, fs.opts.DoCRC, 0)
if err != nil {
switch err {
case io.EOF:
@@ -1653,7 +1632,7 @@ func (fs *FileStore) recoverClients() ([]*Client, error) {
// recoverServerInfo reads the server file and returns a ServerInfo structure
func (fs *FileStore) recoverServerInfo() (*spb.ServerInfo, error) {
info := &spb.ServerInfo{}
-buf, size, _, err := fs.readRecord(fs.serverFile.handle, nil, false)
+buf, size, _, err := readRecord(fs.serverFile.handle, nil, false, fs.crcTable, fs.opts.DoCRC, 0)
if err != nil {
if err == io.EOF {
// We are done, no state recovered
@@ -2396,6 +2375,10 @@ func (ms *FileMsgStore) recoverOneMsgFile(fslice *fileSlice, fseq int, useIdxFil
}
// No `else` here because in case of error recovering index file, we will do data file recovery
if !useIdxFile {
+// Get these from the file store object
+crcTable := ms.fstore.crcTable
+doCRC := ms.fstore.opts.DoCRC
+
// Create a buffered reader from the data file to speed-up recovery
br := bufio.NewReaderSize(fslice.file.handle, defaultBufSize)

@@ -2406,7 +2389,7 @@
offset = int64(4)

for {
-ms.tmpMsgBuf, msgSize, _, err = ms.fstore.readRecord(br, ms.tmpMsgBuf, false)
+ms.tmpMsgBuf, msgSize, _, err = readRecord(br, ms.tmpMsgBuf, false, crcTable, doCRC, 0)
if err != nil {
switch err {
case io.EOF:
@@ -2515,16 +2498,10 @@ func (ms *FileMsgStore) ensureLastMsgAndIndexMatch(fslice *fileSlice, seq uint64
if _, err := fd.Seek(index.offset, io.SeekStart); err != nil {
return fmt.Errorf("%s: unable to set position to %v", startErr, index.offset)
}
-// Since we want to force the check of the CRC, we can't use ms.fstore.readRecord()
-// here and have to call readRecord with appropriate options.
-ms.tmpMsgBuf, msgSize, _, err = readRecord(fd, ms.tmpMsgBuf, false, ms.fstore.crcTable, true, ms.fstore.opts.RecordSizeLimit)
+ms.tmpMsgBuf, msgSize, _, err = readRecord(fd, ms.tmpMsgBuf, false, ms.fstore.crcTable, true, int(index.msgSize))
if err != nil {
return fmt.Errorf("%s: unable to read last record: %v", startErr, err)
}
-if uint32(msgSize) != index.msgSize {
-return fmt.Errorf("%s: last message size in index is %v, data file is %v",
-startErr, index.msgSize, msgSize)
-}
// Recover this message
msg := &pb.MsgProto{}
if err := msg.Unmarshal(ms.tmpMsgBuf[:msgSize]); err != nil {
@@ -4037,7 +4014,7 @@ func (ss *FileSubStore) recoverSubscriptions() error {
br := bufio.NewReaderSize(ss.file.handle, defaultBufSize)

for {
-ss.tmpSubBuf, recSize, recType, err = ss.fstore.readRecord(br, ss.tmpSubBuf, true)
+ss.tmpSubBuf, recSize, recType, err = readRecord(br, ss.tmpSubBuf, true, ss.crcTable, ss.opts.DoCRC, 0)
if err != nil {
switch err {
case io.EOF:
33 changes: 3 additions & 30 deletions stores/filestore_test.go
@@ -578,7 +578,6 @@ func TestFSOptions(t *testing.T) {
ParallelRecovery: 5,
ReadBufferSize: 5 * 1024,
AutoSync: 2 * time.Minute,
-RecordSizeLimit: 1024 * 1024,
}
// Create the file with custom options
fs, err := NewFileStore(testLogger, testFSDefaultDatastore, &testDefaultStoreLimits,
@@ -595,7 +594,6 @@
ParallelRecovery(5),
ReadBufferSize(5*1024),
AutoSync(2*time.Minute),
-RecordSizeLimit(1024*1024),
)
if err != nil {
t.Fatalf("Unexpected error on file store create: %v", err)
@@ -1527,9 +1525,9 @@ func TestFSReadRecord(t *testing.T) {
copy(b[recordHeaderSize:], payload)
r.setErrToReturn(nil)
r.setContent(b)
-_, recSize, _, err = readRecord(r, buf, false, crc32.IEEETable, true, 2)
-if err == nil || !strings.Contains(err.Error(), "than limit of 2 bytes") {
-t.Fatalf("Expected limit of 2 bytes error, got %v", err)
+_, recSize, _, err = readRecord(r, buf, false, crc32.IEEETable, true, 5)
+if err == nil || !strings.Contains(err.Error(), "expected record size to be 5 bytes, got 10 bytes") {
+t.Fatalf("Expected record size of 5 bytes error, got %v", err)
}
if recSize != 0 {
t.Fatalf("Expected recSize to be 0, got %v", recSize)
@@ -2106,28 +2104,3 @@ func TestFSServerAndClientFilesVersionError(t *testing.T) {
})
}
}
-
-func TestFSRecordSizeLimit(t *testing.T) {
-cleanupFSDatastore(t)
-defer cleanupFSDatastore(t)
-
-s := createDefaultFileStore(t)
-defer s.Close()
-
-c := storeCreateChannel(t, s, "foo")
-// Big payload
-payload := make([]byte, 10*1024)
-storeMsg(t, c, "foo", 1, payload)
-
-s.Close()
-
-limits := testDefaultStoreLimits
-s, err := NewFileStore(testLogger, testFSDefaultDatastore, &limits, RecordSizeLimit(1024))
-if err != nil {
-t.Fatalf("Error creating file store: %v", err)
-}
-defer s.Close()
-if _, err = s.Recover(); err == nil || !strings.Contains(err.Error(), "limit of 1024 bytes") {
-t.Fatalf("Expected error about limit, got %v", err)
-}
-}
1 change: 0 additions & 1 deletion test/configs/test_parse.conf
@@ -74,7 +74,6 @@ streaming: {
parallel_recovery: 9
read_buffer_size: 10
auto_sync: "2m"
-record_size_limit: 1024
}

cluster: {
