diff --git a/server/filestore.go b/server/filestore.go index 798dda82ebf..33a230dcb75 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -224,6 +224,8 @@ const ( magic = uint8(22) // Version version = uint8(1) + // New IndexInfo Version + newVersion = uint8(2) // hdrLen hdrLen = 2 // This is where we keep the streams. @@ -689,9 +691,6 @@ const ( emptyRecordLen = msgHdrSize + checksumSize ) -// This is the max room needed for index header. -const indexHdrSize = 7*binary.MaxVarintLen64 + hdrLen + checksumSize - // Lock should be held. func (fs *fileStore) noTrackSubjects() bool { return !(len(fs.psim) > 0 || len(fs.cfg.Subjects) > 0 || fs.cfg.Mirror != nil || len(fs.cfg.Sources) > 0) @@ -968,7 +967,7 @@ func (mb *msgBlock) convertToEncrypted() error { return err } if buf, err = os.ReadFile(mb.ifn); err == nil && len(buf) > 0 { - if err := checkHeader(buf); err != nil { + if err := checkNewHeader(buf); err != nil { return err } buf = mb.aek.Seal(buf[:0], mb.nonce, buf, nil) @@ -5024,11 +5023,12 @@ func (mb *msgBlock) writeIndexInfo() error { // Filestore lock and mb lock should be held. func (mb *msgBlock) writeIndexInfoLocked() error { // HEADER: magic version msgs bytes fseq fts lseq lts ndel checksum - var hdr [indexHdrSize]byte + // Make large enough to hold almost all possible maximum interior delete scenarios. + var hdr [42 * 1024]byte // Write header hdr[0] = magic - hdr[1] = version + hdr[1] = newVersion n := hdrLen n += binary.PutUvarint(hdr[n:], mb.msgs) @@ -5042,7 +5042,16 @@ func (mb *msgBlock) writeIndexInfoLocked() error { // Append a delete map if needed if !mb.dmap.IsEmpty() { - buf = append(buf, mb.genDeleteMap()...) + // Always attempt to tack it onto end. + dmap, err := mb.dmap.Encode(hdr[len(buf):]) + if err != nil { + return err + } + if len(dmap) < cap(hdr)-len(buf) { + buf = hdr[:len(buf)+len(dmap)] + } else { + buf = append(buf, dmap...) + } } // Open our FD if needed. @@ -5083,6 +5092,14 @@ func (mb *msgBlock) writeIndexInfoLocked() error { return err } +func checkNewHeader(hdr []byte) error { + if hdr == nil || len(hdr) < 2 || hdr[0] != magic || + (hdr[1] != version && hdr[1] != newVersion) { + return errCorruptState + } + return nil +} + // readIndexInfo will read in the index information for the message block. func (mb *msgBlock) readIndexInfo() error { buf, err := os.ReadFile(mb.ifn) @@ -5103,7 +5120,7 @@ func (mb *msgBlock) readIndexInfo() error { } } - if err := checkHeader(buf); err != nil { + if err := checkNewHeader(buf); err != nil { defer os.Remove(mb.ifn) return fmt.Errorf("bad index file") } @@ -5162,38 +5179,28 @@ func (mb *msgBlock) readIndexInfo() error { // Now check for presence of a delete map if dmapLen > 0 { - for i := 0; i < int(dmapLen); i++ { - seq := readSeq() - if seq == 0 { - break + // New version is encoded avl seqset. + if buf[1] == newVersion { + dmap, err := avl.Decode(buf[bi:]) + if err != nil { + return fmt.Errorf("could not decode avl dmap: %v", err) + } + mb.dmap = *dmap + } else { + // This is the old version. + for i := 0; i < int(dmapLen); i++ { + seq := readSeq() + if seq == 0 { + break + } + mb.dmap.Insert(seq + mb.first.seq) } - mb.dmap.Insert(seq + mb.first.seq) } } return nil } -func (mb *msgBlock) genDeleteMap() []byte { - if mb.dmap.IsEmpty() { - return nil - } - buf := make([]byte, mb.dmap.Size()*binary.MaxVarintLen64) - // We use first seq as an offset to cut down on size. - fseq, n := uint64(mb.first.seq), 0 - - mb.dmap.Range(func(seq uint64) bool { - // This is for lazy cleanup as the first sequence moves up. - if seq < fseq { - mb.dmap.Delete(seq) - } else { - n += binary.PutUvarint(buf[n:], seq-fseq) - } - return true - }) - return buf[:n] -} - func syncAndClose(mfd, ifd *os.File) { if mfd != nil { mfd.Sync() diff --git a/server/filestore_test.go b/server/filestore_test.go index b98c84f6307..b17ff4f089c 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5400,3 +5400,60 @@ func TestFileStoreSubjectsTotals(t *testing.T) { t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st)) } } + +func TestFileStoreNewWriteIndexInfo(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fcfg.BlockSize = defaultLargeBlockSize + + fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + // Fill a block. + numToFill := 254200 + for i := 0; i < numToFill; i++ { + _, _, err := fs.StoreMsg("A", nil, []byte("OK")) + require_NoError(t, err) + } + + // Maximize interior deletes for testing the new AVL sequence set. + for seq := uint64(2); seq < uint64(numToFill); seq++ { + removed, err := fs.RemoveMsg(seq) + require_NoError(t, err) + require_True(t, removed) + } + // Grab first block + fs.mu.RLock() + mb := fs.blks[0] + fs.mu.RUnlock() + + mb.mu.Lock() + start := time.Now() + require_NoError(t, mb.writeIndexInfoLocked()) + elapsed := time.Since(start) + require_True(t, elapsed < time.Millisecond) + fi, err := os.Stat(mb.ifn) + mb.mu.Unlock() + + require_NoError(t, err) + require_True(t, fi.Size() < 34*1024) // Just over 32k + + mb.mu.Lock() + mb.dmap.Empty() + err = mb.readIndexInfo() + numMsgs := mb.msgs + firstSeq := mb.first.seq + lastSeq := mb.last.seq + mb.mu.Unlock() + // Make sure consistent. + require_NoError(t, err) + require_True(t, numMsgs == 2) + require_True(t, firstSeq == 1) + require_True(t, lastSeq == uint64(numToFill)) + + fs.Stop() + fs, err = newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + }) +}