[ADDED] FileStore: RecordSizeLimit to limit single record read size
In case of memory corruption, it is possible that the record size
is far greater than it should be, which would cause the server to
allocate a buffer of the wrong size in the attempt to read the
record. This new option limits how big the buffer needed to read
a record from disk can be.

Resolves #1255

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic committed Jul 15, 2022
1 parent 3520b47 commit f7c403b
Showing 2 changed files with 76 additions and 17 deletions.
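For reference, here is a minimal sketch of how a caller could enable the new option when creating a file store. It assumes this repository's stores and logger packages (DefaultStoreLimits and the logger.Logger interface are not shown in this diff); the openStore wrapper, the "datastore" path, and the 1MB value are purely illustrative.

package main

import (
	"github.com/nats-io/nats-streaming-server/logger"
	"github.com/nats-io/nats-streaming-server/stores"
)

// openStore is a sketch showing how the new RecordSizeLimit option could be
// passed to NewFileStore alongside the store limits.
func openStore(l logger.Logger) (*stores.FileStore, *stores.RecoveredState, error) {
	limits := stores.DefaultStoreLimits
	fs, err := stores.NewFileStore(l, "datastore", &limits,
		// Refuse to read any record whose encoded size exceeds 1MB, so a
		// corrupted size header cannot trigger an oversized buffer allocation.
		stores.RecordSizeLimit(1024*1024))
	if err != nil {
		return nil, nil, err
	}
	// With the limit set, Recover() fails fast with a "greater than limit"
	// error instead of attempting the oversized allocation.
	state, err := fs.Recover()
	if err != nil {
		fs.Close()
		return nil, nil, err
	}
	return fs, state, nil
}

The limit only affects how records are read back from disk; writing records is unchanged.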
39 changes: 29 additions & 10 deletions stores/filestore.go
@@ -196,6 +196,11 @@ type FileStoreOptions struct {
// by setting DoSync to false.
// Setting AutoSync to any value <= 0 will disable auto sync.
AutoSync time.Duration

// RecordSizeLimit defines the maximum size of a record that can be read
// from disk. Should a record corruption occur, this will prevent the server
// from allocating more memory than this value. Expressed in bytes.
RecordSizeLimit int
}

// This is an internal error to detect situations where we do
@@ -332,6 +337,15 @@ func AutoSync(dur time.Duration) FileStoreOption {
}
}

// RecordSizeLimit is a FileStore option that defines the maximum size of a record
// that can be read from disk.
func RecordSizeLimit(limit int) FileStoreOption {
return func(o *FileStoreOptions) error {
o.RecordSizeLimit = limit
return nil
}
}

// SliceConfig is a FileStore option that allows the configuration of
// file slice limits and optional archive script file name.
func SliceConfig(maxMsgs int, maxBytes int64, maxAge time.Duration, script string) FileStoreOption {
@@ -797,7 +811,7 @@ func writeRecord(w io.Writer, buf []byte, recType recordType, rec record, recSiz
// hold the payload (expanding if necessary). Therefore, this call always
// returns `buf`, regardless of whether there is an error or not.
// The caller is indicating if the record is supposed to be typed or not.
func readRecord(r io.Reader, buf []byte, recTyped bool, crcTable *crc32.Table, checkCRC bool) ([]byte, int, recordType, error) {
func readRecord(r io.Reader, buf []byte, recTyped bool, crcTable *crc32.Table, checkCRC bool, limit int) ([]byte, int, recordType, error) {
_header := [recordHeaderSize]byte{}
header := _header[:]
if _, err := io.ReadFull(r, header); err != nil {
@@ -818,6 +832,9 @@ func readRecord(r io.Reader, buf []byte, recTyped bool, crcTable *crc32.Table, c
return buf, 0, 0, errNeedRewind
}
}
if limit > 0 && recSize > limit {
return buf, 0, 0, fmt.Errorf("record size %v is greater than limit of %v bytes", recSize, limit)
}
// Now we are going to read the payload
buf = util.EnsureBufBigEnough(buf, recSize)
if _, err := io.ReadFull(r, buf[:recSize]); err != nil {
@@ -1564,6 +1581,10 @@ func (fs *FileStore) Init(info *spb.ServerInfo) error {
return nil
}

func (fs *FileStore) readRecord(r io.Reader, buf []byte, recTyped bool) ([]byte, int, recordType, error) {
return readRecord(r, buf, recTyped, fs.crcTable, fs.opts.DoCRC, fs.opts.RecordSizeLimit)
}

// recoverClients reads the client files and returns an array of RecoveredClient
func (fs *FileStore) recoverClients() ([]*Client, error) {
var err error
@@ -1578,7 +1599,7 @@ func (fs *FileStore) recoverClients() ([]*Client, error) {
br := bufio.NewReaderSize(fs.clientsFile.handle, defaultBufSize)

for {
buf, recSize, recType, err = readRecord(br, buf, true, fs.crcTable, fs.opts.DoCRC)
buf, recSize, recType, err = fs.readRecord(br, buf, true)
if err != nil {
switch err {
case io.EOF:
@@ -1629,7 +1650,7 @@ func (fs *FileStore) recoverClients() ([]*Client, error) {
// recoverServerInfo reads the server file and returns a ServerInfo structure
func (fs *FileStore) recoverServerInfo() (*spb.ServerInfo, error) {
info := &spb.ServerInfo{}
buf, size, _, err := readRecord(fs.serverFile.handle, nil, false, fs.crcTable, fs.opts.DoCRC)
buf, size, _, err := fs.readRecord(fs.serverFile.handle, nil, false)
if err != nil {
if err == io.EOF {
// We are done, no state recovered
@@ -2372,10 +2393,6 @@ func (ms *FileMsgStore) recoverOneMsgFile(fslice *fileSlice, fseq int, useIdxFil
}
// No `else` here because in case of error recovering index file, we will do data file recovery
if !useIdxFile {
// Get these from the file store object
crcTable := ms.fstore.crcTable
doCRC := ms.fstore.opts.DoCRC

// Create a buffered reader from the data file to speed-up recovery
br := bufio.NewReaderSize(fslice.file.handle, defaultBufSize)

@@ -2386,7 +2403,7 @@ func (ms *FileMsgStore) recoverOneMsgFile(fslice *fileSlice, fseq int, useIdxFil
offset = int64(4)

for {
ms.tmpMsgBuf, msgSize, _, err = readRecord(br, ms.tmpMsgBuf, false, crcTable, doCRC)
ms.tmpMsgBuf, msgSize, _, err = ms.fstore.readRecord(br, ms.tmpMsgBuf, false)
if err != nil {
switch err {
case io.EOF:
@@ -2495,7 +2512,9 @@ func (ms *FileMsgStore) ensureLastMsgAndIndexMatch(fslice *fileSlice, seq uint64
if _, err := fd.Seek(index.offset, io.SeekStart); err != nil {
return fmt.Errorf("%s: unable to set position to %v", startErr, index.offset)
}
ms.tmpMsgBuf, msgSize, _, err = readRecord(fd, ms.tmpMsgBuf, false, ms.fstore.crcTable, true)
// Since we want to force the CRC check, we can't use ms.fstore.readRecord()
// here and have to call readRecord() directly with the appropriate options.
ms.tmpMsgBuf, msgSize, _, err = readRecord(fd, ms.tmpMsgBuf, false, ms.fstore.crcTable, true, ms.fstore.opts.RecordSizeLimit)
if err != nil {
return fmt.Errorf("%s: unable to read last record: %v", startErr, err)
}
@@ -4015,7 +4034,7 @@ func (ss *FileSubStore) recoverSubscriptions() error {
br := bufio.NewReaderSize(ss.file.handle, defaultBufSize)

for {
ss.tmpSubBuf, recSize, recType, err = readRecord(br, ss.tmpSubBuf, true, ss.crcTable, ss.opts.DoCRC)
ss.tmpSubBuf, recSize, recType, err = ss.fstore.readRecord(br, ss.tmpSubBuf, true)
if err != nil {
switch err {
case io.EOF:
54 changes: 47 additions & 7 deletions stores/filestore_test.go
@@ -1397,7 +1397,7 @@ func TestFSReadRecord(t *testing.T) {
// Reader returns an error
errReturned := fmt.Errorf("Fake error")
r.setErrToReturn(errReturned)
retBuf, recSize, recType, err = readRecord(r, buf, false, crc32.IEEETable, true)
retBuf, recSize, recType, err = readRecord(r, buf, false, crc32.IEEETable, true, 0)
if !strings.Contains(err.Error(), errReturned.Error()) {
t.Fatalf("Expected error %v, got: %v", errReturned, err)
}
@@ -1417,7 +1417,7 @@ func TestFSReadRecord(t *testing.T) {
util.ByteOrder.PutUint32(header, 0)
r.setErrToReturn(nil)
r.setContent(header)
retBuf, recSize, recType, err = readRecord(r, buf, false, crc32.IEEETable, true)
retBuf, recSize, recType, err = readRecord(r, buf, false, crc32.IEEETable, true, 0)
if err == nil {
t.Fatal("Expected error got none")
}
@@ -1437,7 +1437,7 @@ func TestFSReadRecord(t *testing.T) {
copy(b[recordHeaderSize:], []byte("hello"))
r.setErrToReturn(nil)
r.setContent(b)
retBuf, recSize, recType, err = readRecord(r, buf, false, crc32.IEEETable, true)
retBuf, recSize, recType, err = readRecord(r, buf, false, crc32.IEEETable, true, 0)
if err == nil {
t.Fatal("Expected error got none")
}
@@ -1452,7 +1452,7 @@ func TestFSReadRecord(t *testing.T) {
}
// Not asking for CRC should return ok
r.setContent(b)
retBuf, recSize, recType, err = readRecord(r, buf, false, crc32.IEEETable, false)
retBuf, recSize, recType, err = readRecord(r, buf, false, crc32.IEEETable, false, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -1474,7 +1474,7 @@ func TestFSReadRecord(t *testing.T) {
copy(b[recordHeaderSize:], payload)
r.setErrToReturn(nil)
r.setContent(b)
retBuf, recSize, recType, err = readRecord(r, buf, false, crc32.IEEETable, true)
retBuf, recSize, recType, err = readRecord(r, buf, false, crc32.IEEETable, true, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -1492,7 +1492,7 @@ func TestFSReadRecord(t *testing.T) {
util.ByteOrder.PutUint32(b, 1<<24|10) // reuse previous buf
r.setErrToReturn(nil)
r.setContent(b)
retBuf, recSize, recType, err = readRecord(r, buf, true, crc32.IEEETable, true)
retBuf, recSize, recType, err = readRecord(r, buf, true, crc32.IEEETable, true, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -1512,10 +1512,25 @@ func TestFSReadRecord(t *testing.T) {
}
// Don't call setContent since this would reset the read position
r.content = b
_, _, _, err = readRecord(r, buf, true, crc32.IEEETable, true)
_, _, _, err = readRecord(r, buf, true, crc32.IEEETable, true, 0)
if err != errNeedRewind {
t.Fatalf("Expected error %v, got %v", errNeedRewind, err)
}

// Check that the record size limit is enforced
b = make([]byte, recordHeaderSize+10)
util.ByteOrder.PutUint32(b, uint32(len(payload)))
util.ByteOrder.PutUint32(b[4:recordHeaderSize], crc32.ChecksumIEEE(payload))
copy(b[recordHeaderSize:], payload)
r.setErrToReturn(nil)
r.setContent(b)
_, recSize, _, err = readRecord(r, buf, false, crc32.IEEETable, true, 2)
if err == nil || !strings.Contains(err.Error(), "than limit of 2 bytes") {
t.Fatalf("Expected limit of 2 bytes error, got %v", err)
}
if recSize != 0 {
t.Fatalf("Expected recSize to be 0, got %v", recSize)
}
}

type testWriter struct {
Expand Down Expand Up @@ -2088,3 +2103,28 @@ func TestFSServerAndClientFilesVersionError(t *testing.T) {
})
}
}

func TestFSRecordSizeLimit(t *testing.T) {
cleanupFSDatastore(t)
defer cleanupFSDatastore(t)

s := createDefaultFileStore(t)
defer s.Close()

c := storeCreateChannel(t, s, "foo")
// Big payload
payload := make([]byte, 10*1024)
storeMsg(t, c, "foo", 1, payload)

s.Close()

limits := testDefaultStoreLimits
s, err := NewFileStore(testLogger, testFSDefaultDatastore, &limits, RecordSizeLimit(1024))
if err != nil {
t.Fatalf("Error creating file store: %v", err)
}
defer s.Close()
if _, err = s.Recover(); err == nil || !strings.Contains(err.Error(), "limit of 1024 bytes") {
t.Fatalf("Expected error about limit, got %v", err)
}
}
