From 3983687b6971afa8c16fc38b588a4fb73ea76b7e Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 29 Jul 2022 16:19:01 -0600 Subject: [PATCH] [FIXED] Check expected record size before loading the payload Reverted addition of record_size_limit But still address the memory usage caused by a corrupted data message on recovery. By using the expected record size from the index file, when checking that the last message matches the index information, we would find out that the index's stored message record size does not match the record size in the ".dat" file and would not allocate the memory to read the rest of the message. The record_size_limit that was added to solve that issue would have likely caused a lot of issues if mis-used. Resolves #1255 Signed-off-by: Ivan Kozlovic --- server/conf.go | 5 ---- server/conf_test.go | 4 --- stores/filestore.go | 47 +++++++++--------------------------- stores/filestore_test.go | 33 +++---------------------- test/configs/test_parse.conf | 1 - 5 files changed, 15 insertions(+), 75 deletions(-) diff --git a/server/conf.go b/server/conf.go index 5538796a..a102e914 100644 --- a/server/conf.go +++ b/server/conf.go @@ -577,11 +577,6 @@ func parseFileOptions(itf interface{}, opts *Options) error { return err } opts.FileStoreOpts.AutoSync = dur - case "record_size_limit": - if err := checkType(k, reflect.Int64, v); err != nil { - return err - } - opts.FileStoreOpts.RecordSizeLimit = int(v.(int64)) } } return nil diff --git a/server/conf_test.go b/server/conf_test.go index 96c2b3d7..e54e1631 100644 --- a/server/conf_test.go +++ b/server/conf_test.go @@ -144,9 +144,6 @@ func TestParseConfig(t *testing.T) { if opts.FileStoreOpts.AutoSync != 2*time.Minute { t.Fatalf("Expected AutoSync to be 2minutes, got %v", opts.FileStoreOpts.AutoSync) } - if opts.FileStoreOpts.RecordSizeLimit != 1024 { - t.Fatalf("Expected RecordSizeLimit to be 1024, got %v", opts.FileStoreOpts.RecordSizeLimit) - } if opts.MaxChannels != 11 { t.Fatalf("Expected MaxChannels to be 11, got %v", opts.MaxChannels) } @@ -498,7 +495,6 @@ func TestParseWrongTypes(t *testing.T) { expectFailureFor(t, "file:{parallel_recovery:false}", wrongTypeErr) expectFailureFor(t, "file:{auto_sync:123}", wrongTypeErr) expectFailureFor(t, "file:{auto_sync:\"1h:0m\"}", wrongTimeErr) - expectFailureFor(t, "file:{record_size_limit:true}", wrongTypeErr) expectFailureFor(t, "cluster:{node_id:false}", wrongTypeErr) expectFailureFor(t, "cluster:{bootstrap:1}", wrongTypeErr) expectFailureFor(t, "cluster:{peers:1}", wrongTypeErr) diff --git a/stores/filestore.go b/stores/filestore.go index af2fd394..278a58d3 100644 --- a/stores/filestore.go +++ b/stores/filestore.go @@ -196,11 +196,6 @@ type FileStoreOptions struct { // by setting DoSync to false. // Setting AutoSync to any value <= 0 will disable auto sync. AutoSync time.Duration - - // RecordSizeLimit defines the maxmimum size of a record that can be read - // from disk. Should a record corruption occur, this will prevent the server - // to allocate more memory than this value. Expressed in bytes. - RecordSizeLimit int } // This is an internal error to detect situations where we do @@ -337,15 +332,6 @@ func AutoSync(dur time.Duration) FileStoreOption { } } -// RecordSizeLimit is a FileStore option that defines the maximum size of a record -// that can be read from disk. -func RecordSizeLimit(limit int) FileStoreOption { - return func(o *FileStoreOptions) error { - o.RecordSizeLimit = limit - return nil - } -} - // SliceConfig is a FileStore option that allows the configuration of // file slice limits and optional archive script file name. func SliceConfig(maxMsgs int, maxBytes int64, maxAge time.Duration, script string) FileStoreOption { @@ -433,9 +419,6 @@ func AllOptions(opts *FileStoreOptions) FileStoreOption { if err := AutoSync(opts.AutoSync)(o); err != nil { return err } - if err := RecordSizeLimit(opts.RecordSizeLimit)(o); err != nil { - return err - } o.CompactEnabled = opts.CompactEnabled o.DoCRC = opts.DoCRC o.DoSync = opts.DoSync @@ -814,7 +797,7 @@ func writeRecord(w io.Writer, buf []byte, recType recordType, rec record, recSiz // hold the payload (expanding if necessary). Therefore, this call always // return `buf`, regardless if there is an error or not. // The caller is indicating if the record is supposed to be typed or not. -func readRecord(r io.Reader, buf []byte, recTyped bool, crcTable *crc32.Table, checkCRC bool, limit int) ([]byte, int, recordType, error) { +func readRecord(r io.Reader, buf []byte, recTyped bool, crcTable *crc32.Table, checkCRC bool, expectedSize int) ([]byte, int, recordType, error) { _header := [recordHeaderSize]byte{} header := _header[:] if _, err := io.ReadFull(r, header); err != nil { @@ -835,8 +818,8 @@ func readRecord(r io.Reader, buf []byte, recTyped bool, crcTable *crc32.Table, c return buf, 0, 0, errNeedRewind } } - if limit > 0 && recSize > limit { - return buf, 0, 0, fmt.Errorf("record size %v is greater than limit of %v bytes", recSize, limit) + if expectedSize > 0 && recSize != expectedSize { + return buf, 0, 0, fmt.Errorf("expected record size to be %v bytes, got %v bytes", expectedSize, recSize) } // Now we are going to read the payload buf = util.EnsureBufBigEnough(buf, recSize) @@ -1584,10 +1567,6 @@ func (fs *FileStore) Init(info *spb.ServerInfo) error { return nil } -func (fs *FileStore) readRecord(r io.Reader, buf []byte, recTyped bool) ([]byte, int, recordType, error) { - return readRecord(r, buf, recTyped, fs.crcTable, fs.opts.DoCRC, fs.opts.RecordSizeLimit) -} - // recoverClients reads the client files and returns an array of RecoveredClient func (fs *FileStore) recoverClients() ([]*Client, error) { var err error @@ -1602,7 +1581,7 @@ func (fs *FileStore) recoverClients() ([]*Client, error) { br := bufio.NewReaderSize(fs.clientsFile.handle, defaultBufSize) for { - buf, recSize, recType, err = fs.readRecord(br, buf, true) + buf, recSize, recType, err = readRecord(br, buf, true, fs.crcTable, fs.opts.DoCRC, 0) if err != nil { switch err { case io.EOF: @@ -1653,7 +1632,7 @@ func (fs *FileStore) recoverClients() ([]*Client, error) { // recoverServerInfo reads the server file and returns a ServerInfo structure func (fs *FileStore) recoverServerInfo() (*spb.ServerInfo, error) { info := &spb.ServerInfo{} - buf, size, _, err := fs.readRecord(fs.serverFile.handle, nil, false) + buf, size, _, err := readRecord(fs.serverFile.handle, nil, false, fs.crcTable, fs.opts.DoCRC, 0) if err != nil { if err == io.EOF { // We are done, no state recovered @@ -2396,6 +2375,10 @@ func (ms *FileMsgStore) recoverOneMsgFile(fslice *fileSlice, fseq int, useIdxFil } // No `else` here because in case of error recovering index file, we will do data file recovery if !useIdxFile { + // Get these from the file store object + crcTable := ms.fstore.crcTable + doCRC := ms.fstore.opts.DoCRC + // Create a buffered reader from the data file to speed-up recovery br := bufio.NewReaderSize(fslice.file.handle, defaultBufSize) @@ -2406,7 +2389,7 @@ func (ms *FileMsgStore) recoverOneMsgFile(fslice *fileSlice, fseq int, useIdxFil offset = int64(4) for { - ms.tmpMsgBuf, msgSize, _, err = ms.fstore.readRecord(br, ms.tmpMsgBuf, false) + ms.tmpMsgBuf, msgSize, _, err = readRecord(br, ms.tmpMsgBuf, false, crcTable, doCRC, 0) if err != nil { switch err { case io.EOF: @@ -2515,16 +2498,10 @@ func (ms *FileMsgStore) ensureLastMsgAndIndexMatch(fslice *fileSlice, seq uint64 if _, err := fd.Seek(index.offset, io.SeekStart); err != nil { return fmt.Errorf("%s: unable to set position to %v", startErr, index.offset) } - // Since we want to force the check of the CRC, we can't use ms.fstore.readRecord() - // here and have to call readRecord with appropriate options. - ms.tmpMsgBuf, msgSize, _, err = readRecord(fd, ms.tmpMsgBuf, false, ms.fstore.crcTable, true, ms.fstore.opts.RecordSizeLimit) + ms.tmpMsgBuf, msgSize, _, err = readRecord(fd, ms.tmpMsgBuf, false, ms.fstore.crcTable, true, int(index.msgSize)) if err != nil { return fmt.Errorf("%s: unable to read last record: %v", startErr, err) } - if uint32(msgSize) != index.msgSize { - return fmt.Errorf("%s: last message size in index is %v, data file is %v", - startErr, index.msgSize, msgSize) - } // Recover this message msg := &pb.MsgProto{} if err := msg.Unmarshal(ms.tmpMsgBuf[:msgSize]); err != nil { @@ -4037,7 +4014,7 @@ func (ss *FileSubStore) recoverSubscriptions() error { br := bufio.NewReaderSize(ss.file.handle, defaultBufSize) for { - ss.tmpSubBuf, recSize, recType, err = ss.fstore.readRecord(br, ss.tmpSubBuf, true) + ss.tmpSubBuf, recSize, recType, err = readRecord(br, ss.tmpSubBuf, true, ss.crcTable, ss.opts.DoCRC, 0) if err != nil { switch err { case io.EOF: diff --git a/stores/filestore_test.go b/stores/filestore_test.go index 1c4af6c9..3a4ba01a 100644 --- a/stores/filestore_test.go +++ b/stores/filestore_test.go @@ -578,7 +578,6 @@ func TestFSOptions(t *testing.T) { ParallelRecovery: 5, ReadBufferSize: 5 * 1024, AutoSync: 2 * time.Minute, - RecordSizeLimit: 1024 * 1024, } // Create the file with custom options fs, err := NewFileStore(testLogger, testFSDefaultDatastore, &testDefaultStoreLimits, @@ -595,7 +594,6 @@ func TestFSOptions(t *testing.T) { ParallelRecovery(5), ReadBufferSize(5*1024), AutoSync(2*time.Minute), - RecordSizeLimit(1024*1024), ) if err != nil { t.Fatalf("Unexpected error on file store create: %v", err) @@ -1527,9 +1525,9 @@ func TestFSReadRecord(t *testing.T) { copy(b[recordHeaderSize:], payload) r.setErrToReturn(nil) r.setContent(b) - _, recSize, _, err = readRecord(r, buf, false, crc32.IEEETable, true, 2) - if err == nil || !strings.Contains(err.Error(), "than limit of 2 bytes") { - t.Fatalf("Expected limit of 2 bytes error, got %v", err) + _, recSize, _, err = readRecord(r, buf, false, crc32.IEEETable, true, 5) + if err == nil || !strings.Contains(err.Error(), "expected record size to be 5 bytes, got 10 bytes") { + t.Fatalf("Expected record size of 5 bytes error, got %v", err) } if recSize != 0 { t.Fatalf("Expected recSize to be 0, got %v", recSize) @@ -2106,28 +2104,3 @@ func TestFSServerAndClientFilesVersionError(t *testing.T) { }) } } - -func TestFSRecordSizeLimit(t *testing.T) { - cleanupFSDatastore(t) - defer cleanupFSDatastore(t) - - s := createDefaultFileStore(t) - defer s.Close() - - c := storeCreateChannel(t, s, "foo") - // Big payload - payload := make([]byte, 10*1024) - storeMsg(t, c, "foo", 1, payload) - - s.Close() - - limits := testDefaultStoreLimits - s, err := NewFileStore(testLogger, testFSDefaultDatastore, &limits, RecordSizeLimit(1024)) - if err != nil { - t.Fatalf("Error creating file store: %v", err) - } - defer s.Close() - if _, err = s.Recover(); err == nil || !strings.Contains(err.Error(), "limit of 1024 bytes") { - t.Fatalf("Expected error about limit, got %v", err) - } -} diff --git a/test/configs/test_parse.conf b/test/configs/test_parse.conf index ae9c1b1d..0dd3829d 100644 --- a/test/configs/test_parse.conf +++ b/test/configs/test_parse.conf @@ -74,7 +74,6 @@ streaming: { parallel_recovery: 9 read_buffer_size: 10 auto_sync: "2m" - record_size_limit: 1024 } cluster: {