diff --git a/server/conf.go b/server/conf.go
index 5538796a..a102e914 100644
--- a/server/conf.go
+++ b/server/conf.go
@@ -577,11 +577,6 @@ func parseFileOptions(itf interface{}, opts *Options) error {
                 return err
             }
             opts.FileStoreOpts.AutoSync = dur
-        case "record_size_limit":
-            if err := checkType(k, reflect.Int64, v); err != nil {
-                return err
-            }
-            opts.FileStoreOpts.RecordSizeLimit = int(v.(int64))
         }
     }
     return nil
diff --git a/server/conf_test.go b/server/conf_test.go
index 96c2b3d7..e54e1631 100644
--- a/server/conf_test.go
+++ b/server/conf_test.go
@@ -144,9 +144,6 @@ func TestParseConfig(t *testing.T) {
     if opts.FileStoreOpts.AutoSync != 2*time.Minute {
         t.Fatalf("Expected AutoSync to be 2minutes, got %v", opts.FileStoreOpts.AutoSync)
     }
-    if opts.FileStoreOpts.RecordSizeLimit != 1024 {
-        t.Fatalf("Expected RecordSizeLimit to be 1024, got %v", opts.FileStoreOpts.RecordSizeLimit)
-    }
     if opts.MaxChannels != 11 {
         t.Fatalf("Expected MaxChannels to be 11, got %v", opts.MaxChannels)
     }
@@ -498,7 +495,6 @@ func TestParseWrongTypes(t *testing.T) {
     expectFailureFor(t, "file:{parallel_recovery:false}", wrongTypeErr)
     expectFailureFor(t, "file:{auto_sync:123}", wrongTypeErr)
     expectFailureFor(t, "file:{auto_sync:\"1h:0m\"}", wrongTimeErr)
-    expectFailureFor(t, "file:{record_size_limit:true}", wrongTypeErr)
     expectFailureFor(t, "cluster:{node_id:false}", wrongTypeErr)
     expectFailureFor(t, "cluster:{bootstrap:1}", wrongTypeErr)
     expectFailureFor(t, "cluster:{peers:1}", wrongTypeErr)
diff --git a/stores/filestore.go b/stores/filestore.go
index af2fd394..278a58d3 100644
--- a/stores/filestore.go
+++ b/stores/filestore.go
@@ -196,11 +196,6 @@ type FileStoreOptions struct {
     // by setting DoSync to false.
     // Setting AutoSync to any value <= 0 will disable auto sync.
     AutoSync time.Duration
-
-    // RecordSizeLimit defines the maxmimum size of a record that can be read
-    // from disk. Should a record corruption occur, this will prevent the server
-    // to allocate more memory than this value. Expressed in bytes.
-    RecordSizeLimit int
 }
 
 // This is an internal error to detect situations where we do
@@ -337,15 +332,6 @@ func AutoSync(dur time.Duration) FileStoreOption {
     }
 }
 
-// RecordSizeLimit is a FileStore option that defines the maximum size of a record
-// that can be read from disk.
-func RecordSizeLimit(limit int) FileStoreOption {
-    return func(o *FileStoreOptions) error {
-        o.RecordSizeLimit = limit
-        return nil
-    }
-}
-
 // SliceConfig is a FileStore option that allows the configuration of
 // file slice limits and optional archive script file name.
 func SliceConfig(maxMsgs int, maxBytes int64, maxAge time.Duration, script string) FileStoreOption {
@@ -433,9 +419,6 @@ func AllOptions(opts *FileStoreOptions) FileStoreOption {
         if err := AutoSync(opts.AutoSync)(o); err != nil {
             return err
         }
-        if err := RecordSizeLimit(opts.RecordSizeLimit)(o); err != nil {
-            return err
-        }
         o.CompactEnabled = opts.CompactEnabled
         o.DoCRC = opts.DoCRC
         o.DoSync = opts.DoSync
@@ -814,7 +797,7 @@ func writeRecord(w io.Writer, buf []byte, recType recordType, rec record, recSiz
 // hold the payload (expanding if necessary). Therefore, this call always
 // return `buf`, regardless if there is an error or not.
 // The caller is indicating if the record is supposed to be typed or not.
-func readRecord(r io.Reader, buf []byte, recTyped bool, crcTable *crc32.Table, checkCRC bool, limit int) ([]byte, int, recordType, error) {
+func readRecord(r io.Reader, buf []byte, recTyped bool, crcTable *crc32.Table, checkCRC bool, expectedSize int) ([]byte, int, recordType, error) {
     _header := [recordHeaderSize]byte{}
     header := _header[:]
     if _, err := io.ReadFull(r, header); err != nil {
@@ -835,8 +818,8 @@ func readRecord(r io.Reader, buf []byte, recTyped bool, crcTable *crc32.Table, c
             return buf, 0, 0, errNeedRewind
         }
     }
-    if limit > 0 && recSize > limit {
-        return buf, 0, 0, fmt.Errorf("record size %v is greater than limit of %v bytes", recSize, limit)
+    if expectedSize > 0 && recSize != expectedSize {
+        return buf, 0, 0, fmt.Errorf("expected record size to be %v bytes, got %v bytes", expectedSize, recSize)
     }
     // Now we are going to read the payload
     buf = util.EnsureBufBigEnough(buf, recSize)
@@ -1584,10 +1567,6 @@ func (fs *FileStore) Init(info *spb.ServerInfo) error {
     return nil
 }
 
-func (fs *FileStore) readRecord(r io.Reader, buf []byte, recTyped bool) ([]byte, int, recordType, error) {
-    return readRecord(r, buf, recTyped, fs.crcTable, fs.opts.DoCRC, fs.opts.RecordSizeLimit)
-}
-
 // recoverClients reads the client files and returns an array of RecoveredClient
 func (fs *FileStore) recoverClients() ([]*Client, error) {
     var err error
@@ -1602,7 +1581,7 @@ func (fs *FileStore) recoverClients() ([]*Client, error) {
     br := bufio.NewReaderSize(fs.clientsFile.handle, defaultBufSize)
 
     for {
-        buf, recSize, recType, err = fs.readRecord(br, buf, true)
+        buf, recSize, recType, err = readRecord(br, buf, true, fs.crcTable, fs.opts.DoCRC, 0)
         if err != nil {
             switch err {
             case io.EOF:
@@ -1653,7 +1632,7 @@ func (fs *FileStore) recoverClients() ([]*Client, error) {
 // recoverServerInfo reads the server file and returns a ServerInfo structure
 func (fs *FileStore) recoverServerInfo() (*spb.ServerInfo, error) {
     info := &spb.ServerInfo{}
-    buf, size, _, err := fs.readRecord(fs.serverFile.handle, nil, false)
+    buf, size, _, err := readRecord(fs.serverFile.handle, nil, false, fs.crcTable, fs.opts.DoCRC, 0)
     if err != nil {
         if err == io.EOF {
             // We are done, no state recovered
@@ -2396,6 +2375,10 @@ func (ms *FileMsgStore) recoverOneMsgFile(fslice *fileSlice, fseq int, useIdxFil
     }
     // No `else` here because in case of error recovering index file, we will do data file recovery
     if !useIdxFile {
+        // Get these from the file store object
+        crcTable := ms.fstore.crcTable
+        doCRC := ms.fstore.opts.DoCRC
+
         // Create a buffered reader from the data file to speed-up recovery
         br := bufio.NewReaderSize(fslice.file.handle, defaultBufSize)
 
@@ -2406,7 +2389,7 @@ func (ms *FileMsgStore) recoverOneMsgFile(fslice *fileSlice, fseq int, useIdxFil
 
         offset = int64(4)
         for {
-            ms.tmpMsgBuf, msgSize, _, err = ms.fstore.readRecord(br, ms.tmpMsgBuf, false)
+            ms.tmpMsgBuf, msgSize, _, err = readRecord(br, ms.tmpMsgBuf, false, crcTable, doCRC, 0)
             if err != nil {
                 switch err {
                 case io.EOF:
@@ -2515,16 +2498,10 @@ func (ms *FileMsgStore) ensureLastMsgAndIndexMatch(fslice *fileSlice, seq uint64
     if _, err := fd.Seek(index.offset, io.SeekStart); err != nil {
         return fmt.Errorf("%s: unable to set position to %v", startErr, index.offset)
     }
-    // Since we want to force the check of the CRC, we can't use ms.fstore.readRecord()
-    // here and have to call readRecord with appropriate options.
-    ms.tmpMsgBuf, msgSize, _, err = readRecord(fd, ms.tmpMsgBuf, false, ms.fstore.crcTable, true, ms.fstore.opts.RecordSizeLimit)
+    ms.tmpMsgBuf, msgSize, _, err = readRecord(fd, ms.tmpMsgBuf, false, ms.fstore.crcTable, true, int(index.msgSize))
     if err != nil {
         return fmt.Errorf("%s: unable to read last record: %v", startErr, err)
     }
-    if uint32(msgSize) != index.msgSize {
-        return fmt.Errorf("%s: last message size in index is %v, data file is %v",
-            startErr, index.msgSize, msgSize)
-    }
     // Recover this message
     msg := &pb.MsgProto{}
     if err := msg.Unmarshal(ms.tmpMsgBuf[:msgSize]); err != nil {
@@ -4037,7 +4014,7 @@ func (ss *FileSubStore) recoverSubscriptions() error {
     br := bufio.NewReaderSize(ss.file.handle, defaultBufSize)
 
     for {
-        ss.tmpSubBuf, recSize, recType, err = ss.fstore.readRecord(br, ss.tmpSubBuf, true)
+        ss.tmpSubBuf, recSize, recType, err = readRecord(br, ss.tmpSubBuf, true, ss.crcTable, ss.opts.DoCRC, 0)
         if err != nil {
             switch err {
             case io.EOF:
diff --git a/stores/filestore_test.go b/stores/filestore_test.go
index 1c4af6c9..3a4ba01a 100644
--- a/stores/filestore_test.go
+++ b/stores/filestore_test.go
@@ -578,7 +578,6 @@ func TestFSOptions(t *testing.T) {
         ParallelRecovery: 5,
         ReadBufferSize:   5 * 1024,
         AutoSync:         2 * time.Minute,
-        RecordSizeLimit:  1024 * 1024,
     }
     // Create the file with custom options
     fs, err := NewFileStore(testLogger, testFSDefaultDatastore, &testDefaultStoreLimits,
@@ -595,7 +594,6 @@ func TestFSOptions(t *testing.T) {
         ParallelRecovery(5),
         ReadBufferSize(5*1024),
         AutoSync(2*time.Minute),
-        RecordSizeLimit(1024*1024),
     )
     if err != nil {
         t.Fatalf("Unexpected error on file store create: %v", err)
@@ -1527,9 +1525,9 @@ func TestFSReadRecord(t *testing.T) {
     copy(b[recordHeaderSize:], payload)
     r.setErrToReturn(nil)
     r.setContent(b)
-    _, recSize, _, err = readRecord(r, buf, false, crc32.IEEETable, true, 2)
-    if err == nil || !strings.Contains(err.Error(), "than limit of 2 bytes") {
-        t.Fatalf("Expected limit of 2 bytes error, got %v", err)
+    _, recSize, _, err = readRecord(r, buf, false, crc32.IEEETable, true, 5)
+    if err == nil || !strings.Contains(err.Error(), "expected record size to be 5 bytes, got 10 bytes") {
+        t.Fatalf("Expected record size of 5 bytes error, got %v", err)
     }
     if recSize != 0 {
         t.Fatalf("Expected recSize to be 0, got %v", recSize)
@@ -2106,28 +2104,3 @@ func TestFSServerAndClientFilesVersionError(t *testing.T) {
         })
     }
 }
-
-func TestFSRecordSizeLimit(t *testing.T) {
-    cleanupFSDatastore(t)
-    defer cleanupFSDatastore(t)
-
-    s := createDefaultFileStore(t)
-    defer s.Close()
-
-    c := storeCreateChannel(t, s, "foo")
-    // Big payload
-    payload := make([]byte, 10*1024)
-    storeMsg(t, c, "foo", 1, payload)
-
-    s.Close()
-
-    limits := testDefaultStoreLimits
-    s, err := NewFileStore(testLogger, testFSDefaultDatastore, &limits, RecordSizeLimit(1024))
-    if err != nil {
-        t.Fatalf("Error creating file store: %v", err)
-    }
-    defer s.Close()
-    if _, err = s.Recover(); err == nil || !strings.Contains(err.Error(), "limit of 1024 bytes") {
-        t.Fatalf("Expected error about limit, got %v", err)
-    }
-}
diff --git a/test/configs/test_parse.conf b/test/configs/test_parse.conf
index ae9c1b1d..0dd3829d 100644
--- a/test/configs/test_parse.conf
+++ b/test/configs/test_parse.conf
@@ -74,7 +74,6 @@ streaming: {
     parallel_recovery: 9
     read_buffer_size: 10
     auto_sync: "2m"
-    record_size_limit: 1024
 }
 
 cluster: {
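Note on the readRecord change above: the last parameter moves from a best-effort upper bound (`limit`, previously fed by the now-removed `RecordSizeLimit` option) to an exact expected size. A caller that already knows the record's size from the index passes it directly (as `ensureLastMsgAndIndexMatch` now does with `int(index.msgSize)`, which also makes the post-read size comparison redundant), while callers that have no expectation pass `0` to disable the check. The sketch below illustrates this contract; it is a simplified stand-in, not the server's readRecord: `readSizedRecord` is a hypothetical helper, and the 4-byte size + 4-byte IEEE CRC header layout is an assumption made for the example.

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "hash/crc32"
    "io"
)

// readSizedRecord reads one size-prefixed, CRC-protected record.
// When expectedSize > 0, the size read from the header must match it
// exactly; 0 skips the check (the behavior recovery loops rely on).
func readSizedRecord(r io.Reader, expectedSize int) ([]byte, error) {
    var header [8]byte // assumed layout: 4 bytes size, 4 bytes CRC-32
    if _, err := io.ReadFull(r, header[:]); err != nil {
        return nil, err
    }
    recSize := int(binary.BigEndian.Uint32(header[:4]))
    if expectedSize > 0 && recSize != expectedSize {
        return nil, fmt.Errorf("expected record size to be %v bytes, got %v bytes", expectedSize, recSize)
    }
    payload := make([]byte, recSize)
    if _, err := io.ReadFull(r, payload); err != nil {
        return nil, err
    }
    if got, want := crc32.ChecksumIEEE(payload), binary.BigEndian.Uint32(header[4:]); got != want {
        return nil, fmt.Errorf("corrupted record, expected crc 0x%08x, got 0x%08x", want, got)
    }
    return payload, nil
}

func main() {
    // Build a 10-byte record, then read it back with a wrong expected
    // size, mirroring the updated TestFSReadRecord case (expected 5, got 10).
    payload := []byte("0123456789")
    var buf bytes.Buffer
    var header [8]byte
    binary.BigEndian.PutUint32(header[:4], uint32(len(payload)))
    binary.BigEndian.PutUint32(header[4:], crc32.ChecksumIEEE(payload))
    buf.Write(header[:])
    buf.Write(payload)

    if _, err := readSizedRecord(bytes.NewReader(buf.Bytes()), 5); err != nil {
        fmt.Println(err) // expected record size to be 5 bytes, got 10 bytes
    }
}

Because the exact-size check also bounds the allocation for the one caller that validates a known record, dropping the global `RecordSizeLimit` knob loses no protection there, while sequential recovery scans (which pass 0) keep their previous behavior.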