diff --git a/server/filestore.go b/server/filestore.go index 6d071e93bc..8be56a6448 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1032,7 +1032,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { fd = mb.mfd } else { fd, err = os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms) - if err != nil { + if err == nil { defer fd.Close() } } @@ -1078,6 +1078,26 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { return gatherLost(lbuf - index), errBadMsg } + // Check for checksum failures before additional processing. + data := buf[index+msgHdrSize : index+rl] + if hh := mb.hh; hh != nil { + hh.Reset() + hh.Write(hdr[4:20]) + hh.Write(data[:slen]) + if hasHeaders { + hh.Write(data[slen+4 : dlen-recordHashSize]) + } else { + hh.Write(data[slen : dlen-recordHashSize]) + } + checksum := hh.Sum(nil) + if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) { + truncate(index) + return gatherLost(lbuf - index), errBadMsg + } + copy(mb.lchk[0:], checksum) + } + + // Grab our sequence and timestamp. seq := le.Uint64(hdr[4:]) ts := int64(le.Uint64(hdr[12:])) @@ -1114,29 +1134,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { _, deleted = mb.dmap[seq] } - // Always set last. - mb.last.seq = seq - mb.last.ts = ts - if !deleted { - data := buf[index+msgHdrSize : index+rl] - if hh := mb.hh; hh != nil { - hh.Reset() - hh.Write(hdr[4:20]) - hh.Write(data[:slen]) - if hasHeaders { - hh.Write(data[slen+4 : dlen-recordHashSize]) - } else { - hh.Write(data[slen : dlen-recordHashSize]) - } - checksum := hh.Sum(nil) - if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) { - truncate(index) - return gatherLost(lbuf - index), errBadMsg - } - copy(mb.lchk[0:], checksum) - } - if firstNeedsSet { firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts } @@ -1162,6 +1160,11 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { mb.fssNeedsWrite = true } } + + // Always set last + mb.last.seq = seq + mb.last.ts = ts + // Advance to next record. index += rl } @@ -4646,7 +4649,7 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store dlen := int(rl) - msgHdrSize slen := int(le.Uint16(hdr[20:])) // Simple sanity check. - if dlen < 0 || slen > dlen || int(rl) > len(buf) { + if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || int(rl) > len(buf) { return nil, errBadMsg } data := buf[msgHdrSize : msgHdrSize+dlen] diff --git a/server/filestore_test.go b/server/filestore_test.go index 303e65cf3c..0c4fd4c99c 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -1278,7 +1278,10 @@ func TestFileStoreBitRot(t *testing.T) { // Now twiddle some bits. fs.mu.Lock() lmb := fs.lmb - contents, _ := os.ReadFile(lmb.mfn) + contents, err := os.ReadFile(lmb.mfn) + require_NoError(t, err) + require_True(t, len(contents) > 0) + var index int for { index = rand.Intn(len(contents)) @@ -1296,6 +1299,10 @@ func TestFileStoreBitRot(t *testing.T) { if len(ld.Msgs) > 0 { break } + // If our bitrot caused us to not be able to recover any messages we can break as well. + if state := fs.State(); state.Msgs == 0 { + break + } // Fail the test if we have tried the 10 times and still did not // get any corruption report. if i == 9 { @@ -1314,7 +1321,10 @@ func TestFileStoreBitRot(t *testing.T) { // checkMsgs will repair the underlying store, so checkMsgs should be clean now. if ld := fs.checkMsgs(); ld != nil { - t.Fatalf("Expected no errors restoring checked and fixed filestore, got %+v", ld) + // If we have no msgs left this will report the head msgs as lost again. + if state := fs.State(); state.Msgs > 0 { + t.Fatalf("Expected no errors restoring checked and fixed filestore, got %+v", ld) + } } }) } diff --git a/server/norace_test.go b/server/norace_test.go index 59892d002d..e3073e7ce6 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -3809,6 +3809,7 @@ func TestNoRaceJetStreamClusterStreamReset(t *testing.T) { // Simulate a low level write error on our consumer and make sure we can recover etc. cl = c.consumerLeader("$G", "TEST", "d1") + require_True(t, cl != nil) mset, err = cl.GlobalAccount().lookupStream("TEST") if err != nil { t.Fatalf("Unexpected error: %v", err)