From 21e3d7a51213cfc1373bc5c39f0bd7e037f3f55f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 27 Jul 2023 21:43:15 -0700 Subject: [PATCH 1/2] This is a fix for a bad msg blk detected in the field that had sequence holes. The stream had max msgs per subject of one and only one subject but had lots of messages. The stream did not recover correctly, and upon further inspection we determined that a msg blk had holes, which should not be possible. We now detect the holes and deal with the situation appropriately. Heavily tested on the data dump from the field. Signed-off-by: Derek Collison --- server/filestore.go | 101 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 78 insertions(+), 23 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 7140c24241..11a6859cac 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -971,6 +971,10 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) { func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { startLastSeq := mb.last.seq + // Remove the .fss file and clear any cache we have set. + mb.clearCacheAndOffset() + mb.removePerSubjectInfoLocked() + buf, err := mb.loadBlock(nil) if err != nil || len(buf) == 0 { var ld *LostStreamData @@ -996,9 +1000,6 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { mb.last.seq, mb.last.ts = 0, 0 firstNeedsSet := true - // Remove the .fss file from disk. - mb.removePerSubjectInfoLocked() - // Check if we need to decrypt. if mb.bek != nil && len(buf) > 0 { // Recreate to reset counter. @@ -1070,12 +1071,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { rl &^= hbit dlen := int(rl) - msgHdrSize // Do some quick sanity checks here. 
- if dlen < 0 || int(slen) > (dlen-8) || dlen > int(rl) || rl > rlBadThresh { - truncate(index) - return gatherLost(lbuf - index), errBadMsg - } - - if index+rl > lbuf { + if dlen < 0 || int(slen) > (dlen-8) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh { truncate(index) return gatherLost(lbuf - index), errBadMsg } @@ -1091,15 +1087,17 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { addToDmap(seq) } index += rl - mb.last.seq = seq - mb.last.ts = ts + if seq >= mb.first.seq { + mb.last.seq = seq + mb.last.ts = ts + } continue } // This is for when we have index info that adjusts for deleted messages // at the head. So the first.seq will be already set here. If this is larger // replace what we have with this seq. - if firstNeedsSet && seq > mb.first.seq { + if firstNeedsSet && seq >= mb.first.seq { firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts } @@ -1165,6 +1163,11 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { mb.last.seq = mb.first.seq - 1 } + // Update our fss file if needed. + if len(mb.fss) > 0 { + mb.writePerSubjectInfo() + } + return nil, nil } @@ -2598,14 +2601,42 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() { fs.scb = nil defer func() { fs.scb = cb }() + var numMsgs uint64 + // collect all that are not correct. needAttention := make(map[string]*psi) for subj, psi := range fs.psim { + numMsgs += psi.total if psi.total > maxMsgsPer { needAttention[subj] = psi } } + // We had an issue with a use case where psim (and hence fss) were correct but idx was not and was not properly being caught. + // So do a quick sanity check here. If we detect a skew do a rebuild then re-check. + if numMsgs != fs.state.Msgs { + // Clear any global subject state. 
+ fs.psim = make(map[string]*psi) + for _, mb := range fs.blks { + mb.removeIndexFile() + ld, err := mb.rebuildState() + mb.writeIndexInfo() + if err != nil && ld != nil { + fs.addLostData(ld) + } + fs.populateGlobalPerSubjectInfo(mb) + } + // Rebuild fs state too. + fs.rebuildStateLocked(nil) + // Need to redo blocks that need attention. + needAttention = make(map[string]*psi) + for subj, psi := range fs.psim { + if psi.total > maxMsgsPer { + needAttention[subj] = psi + } + } + } + // Collect all the msgBlks we alter. blks := make(map[*msgBlock]struct{}) @@ -3050,8 +3081,7 @@ func (mb *msgBlock) compact() { return } - // Close cache and index file and wipe delete map, then rebuild. - mb.clearCacheAndOffset() + // Remove index file and wipe delete map, then rebuild. mb.removeIndexFileLocked() mb.deleteDmap() mb.rebuildStateLocked() @@ -3077,6 +3107,11 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) { bi := mb.cache.idx[slot] ri, hashChecked := (bi &^ hbit), (bi&hbit) != 0 + // If this is a deleted slot return here. + if bi == dbit { + return 0, 0, false, errDeletedMsg + } + // Determine record length var rl uint32 if len(mb.cache.idx) > slot+1 { @@ -4022,7 +4057,7 @@ func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock { func (mb *msgBlock) indexCacheBuf(buf []byte) error { var le = binary.LittleEndian - var fseq uint64 + var fseq, pseq uint64 var idx []uint32 var index uint32 @@ -4055,7 +4090,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { dlen := int(rl) - msgHdrSize // Do some quick sanity checks here. - if dlen < 0 || int(slen) > dlen || dlen > int(rl) || index+rl > lbuf || rl > 32*1024*1024 { + if dlen < 0 || int(slen) > (dlen-8) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh { // This means something is off. // TODO(dlc) - Add into bad list? return errCorruptState @@ -4063,15 +4098,31 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { // Clear erase bit. 
seq = seq &^ ebit - // Adjust if we guessed wrong. - if seq != 0 && seq < fseq { - fseq = seq - } + // We defer checksum checks to individual msg cache lookups to amortorize costs and // not introduce latency for first message from a newly loaded block. - idx = append(idx, index) - mb.cache.lrl = uint32(rl) - index += mb.cache.lrl + if seq >= mb.first.seq { + // Track that we do not have holes. + // Not expected but did see it in the field. + if pseq > 0 && seq != pseq+1 { + if mb.dmap == nil { + mb.dmap = make(map[uint64]struct{}) + } + for dseq := pseq + 1; dseq < seq; dseq++ { + idx = append(idx, dbit) + mb.dmap[dseq] = struct{}{} + } + } + pseq = seq + + idx = append(idx, index) + mb.cache.lrl = uint32(rl) + // Adjust if we guessed wrong. + if seq != 0 && seq < fseq { + fseq = seq + } + } + index += rl } mb.cache.buf = buf mb.cache.idx = idx @@ -4407,6 +4458,9 @@ const hbit = 1 << 31 // Used for marking erased messages sequences. const ebit = 1 << 63 +// Used to mark a bad index as deleted. +const dbit = 1 << 30 + // Will do a lookup from cache. // Lock should be held. func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) { @@ -4417,6 +4471,7 @@ func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) { // If we have a delete map check it. 
if mb.dmap != nil { if _, ok := mb.dmap[seq]; ok { + mb.llts = time.Now().UnixNano() return nil, errDeletedMsg } } From 92430513d3f7b95c1b3e3d22a355ba2081633ba6 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 28 Jul 2023 04:35:33 -0700 Subject: [PATCH 2/2] Add in const for msg record hash size Signed-off-by: Derek Collison --- server/filestore.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 11a6859cac..a8607a0878 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -275,6 +275,8 @@ const ( wiThresh = int64(30 * time.Second) // Time threshold to write index info for non FIFO cases winfThresh = int64(2 * time.Second) + // Checksum size for hash for msg records. + recordHashSize = 8 ) func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) { @@ -1071,7 +1073,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { rl &^= hbit dlen := int(rl) - msgHdrSize // Do some quick sanity checks here. - if dlen < 0 || int(slen) > (dlen-8) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh { + if dlen < 0 || int(slen) > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh { truncate(index) return gatherLost(lbuf - index), errBadMsg } @@ -1117,12 +1119,12 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { hh.Write(hdr[4:20]) hh.Write(data[:slen]) if hasHeaders { - hh.Write(data[slen+4 : dlen-8]) + hh.Write(data[slen+4 : dlen-recordHashSize]) } else { - hh.Write(data[slen : dlen-8]) + hh.Write(data[slen : dlen-recordHashSize]) } checksum := hh.Sum(nil) - if !bytes.Equal(checksum, data[len(data)-8:]) { + if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) { truncate(index) return gatherLost(lbuf - index), errBadMsg } @@ -4090,7 +4092,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { dlen := int(rl) - msgHdrSize // Do some quick sanity checks here. 
- if dlen < 0 || int(slen) > (dlen-8) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh { + if dlen < 0 || int(slen) > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh { // This means something is off. // TODO(dlc) - Add into bad list? return errCorruptState @@ -4614,9 +4616,9 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store hh.Write(hdr[4:20]) hh.Write(data[:slen]) if hasHeaders { - hh.Write(data[slen+4 : dlen-8]) + hh.Write(data[slen+4 : dlen-recordHashSize]) } else { - hh.Write(data[slen : dlen-8]) + hh.Write(data[slen : dlen-recordHashSize]) } if !bytes.Equal(hh.Sum(nil), data[len(data)-8:]) { return nil, errBadMsg