From 6d4304146f91473f4467673b3223e86fc101374b Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Tue, 28 Mar 2023 20:43:02 -0700
Subject: [PATCH 01/20] Bug fixes and general stability improvements.

1. Fixed a bug that would process a removal of a message after the message block was closed.
2. Improved removal of a non-existent message when we know the store is empty.
3. Improved last write index size tracking when opening the file descriptor after being closed.
4. Improved Compact() by not loading messages for the last block twice.
5. Improved Compact()'s determination of when to call purge by determining the last sequence under the write lock.
6. Improved Compact() by only compacting the underlying message block if it is over a certain size threshold.
7. Improved Compact() by writing the index file if needed while still holding the lock, avoiding an unnecessary re-lock.
8. Improved Compact() by not calling out to upper layers when no messages were purged.
9. Fixed a bug in Compact() that would not delete members from a block's delete map.
10. Fixed a bug in reset() when a callback was not registered (raft logs), which skipped msg block cleanup.
11. Improved consumer store Update() logic for determining when to skip an outdated update.

Signed-off-by: Derek Collison
---
 server/filestore.go | 76 ++++++++++++++++++++++++++++++---------------
 1 file changed, 51 insertions(+), 25 deletions(-)

diff --git a/server/filestore.go b/server/filestore.go
index 3b910e036be..59b80f7434e 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -106,6 +106,7 @@ type psi struct {
}

type fileStore struct {
+ srv *Server
mu sync.RWMutex
state StreamState
ld *LostStreamData
@@ -373,6 +374,12 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
return fs, nil
}

+func (fs *fileStore) registerServer(s *Server) {
+ fs.mu.Lock()
+ defer fs.mu.Unlock()
+ fs.srv = s
+}
+
// Lock all existing message blocks.
// Lock held on entry.
func (fs *fileStore) lockAllMsgBlocks() {
@@ -765,6 +772,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint32) (*msgBlock, e
if ld, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}
+
if mb.msgs > 0 && !mb.noTrack && fs.psim != nil {
fs.populateGlobalPerSubjectInfo(mb)
// Try to dump any state we needed on recovery.
@@ -2625,6 +2633,16 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
if secure && fs.prf != nil {
secure = false
}
+
+ if fs.state.Msgs == 0 {
+ var err = ErrStoreEOF
+ if seq <= fs.state.LastSeq {
+ err = ErrStoreMsgNotFound
+ }
+ fsUnlock()
+ return false, err
+ }
+
mb := fs.selectMsgBlock(seq)
if mb == nil {
var err = ErrStoreEOF
@@ -2637,8 +2655,8 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (

mb.mu.Lock()

- // See if the sequence number is still relevant.
- if seq < mb.first.seq {
+ // See if we are closed or the sequence number is still relevant.
+ if mb.closed || seq < mb.first.seq {
mb.mu.Unlock()
fsUnlock()
return false, nil
@@ -2933,6 +2951,7 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
if mb.cache == nil || slot >= len(mb.cache.idx) {
return 0, 0, false, errPartialCache
}
+
bi := mb.cache.idx[slot]
ri, hashChecked := (bi &^ hbit), (bi&hbit) != 0

@@ -3911,11 +3930,12 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
dlen := int(rl) - msgHdrSize
// Do some quick sanity checks here.
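// NOTE: rl is the full record length and dlen is the payload length after the
// message header is stripped. The check below rejects negative payload lengths,
// subject lengths larger than the payload, and, new in this patch, records whose
// stated length would run past the end of the loaded buffer (index+rl > lbuf),
// in addition to the existing 32MB per-record ceiling.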
- if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > 32*1024*1024 { + if dlen < 0 || int(slen) > dlen || dlen > int(rl) || index+rl > lbuf || rl > 32*1024*1024 { // This means something is off. // TODO(dlc) - Add into bad list? return errCorruptState } + // Clear erase bit. seq = seq &^ ebit // Adjust if we guessed wrong. @@ -4756,6 +4776,9 @@ func (mb *msgBlock) writeIndexInfoLocked() error { if err != nil { return err } + if fi, _ := ifd.Stat(); fi != nil { + mb.liwsz = fi.Size() + } mb.ifd = ifd } @@ -5175,7 +5198,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) { // but not including the seq parameter. // Will return the number of purged messages. func (fs *fileStore) Compact(seq uint64) (uint64, error) { - if seq == 0 || seq > fs.lastSeq() { + if seq == 0 { return fs.purge(seq) } @@ -5183,15 +5206,16 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { // We have to delete interior messages. fs.mu.Lock() + if lseq := fs.state.LastSeq; seq > lseq { + fs.mu.Unlock() + return fs.purge(seq) + } + smb := fs.selectMsgBlock(seq) if smb == nil { fs.mu.Unlock() return 0, nil } - if err := smb.loadMsgs(); err != nil { - fs.mu.Unlock() - return 0, err - } // All msgblocks up to this one can be thrown away. var deleted int @@ -5218,7 +5242,12 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { var isEmpty bool smb.mu.Lock() - // Since we loaded before we acquired our lock, double check here under lock that we have the messages loaded. + if smb.first.seq == seq { + isEmpty = smb.msgs == 0 + goto SKIP + } + + // Make sure we have the messages loaded. if smb.cacheNotLoaded() { if err = smb.loadMsgsWithLock(); err != nil { goto SKIP @@ -5229,7 +5258,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { if err == errDeletedMsg { // Update dmap. if len(smb.dmap) > 0 { - delete(smb.dmap, seq) + delete(smb.dmap, mseq) if len(smb.dmap) == 0 { smb.dmap = nil } @@ -5265,7 +5294,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { // Check if we should reclaim the head space from this block. // This will be optimistic only, so don't continue if we encounter any errors here. - if smb.bytes*2 < smb.rbytes { + if smb.rbytes > compactMinimum && smb.bytes*2 < smb.rbytes { var moff uint32 moff, _, _, err = smb.slotInfo(int(smb.first.seq - smb.cache.fseq)) if err != nil || moff >= uint32(len(smb.cache.buf)) { @@ -5299,13 +5328,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { } SKIP: - smb.mu.Unlock() - if !isEmpty { // Make sure to write out our index info. - smb.writeIndexInfo() + smb.writeIndexInfoLocked() } + smb.mu.Unlock() + if deleted > 0 { // Update block map. if fs.bim != nil { @@ -5329,7 +5358,7 @@ SKIP: cb := fs.scb fs.mu.Unlock() - if cb != nil { + if cb != nil && purged > 0 { cb(-int64(purged), -int64(bytes), 0, _EMPTY_) } @@ -5338,7 +5367,6 @@ SKIP: // Will completely reset our store. func (fs *fileStore) reset() error { - fs.mu.Lock() if fs.closed { fs.mu.Unlock() @@ -5352,14 +5380,12 @@ func (fs *fileStore) reset() error { var purged, bytes uint64 cb := fs.scb - if cb != nil { - for _, mb := range fs.blks { - mb.mu.Lock() - purged += mb.msgs - bytes += mb.bytes - mb.dirtyCloseWithRemove(true) - mb.mu.Unlock() - } + for _, mb := range fs.blks { + mb.mu.Lock() + purged += mb.msgs + bytes += mb.bytes + mb.dirtyCloseWithRemove(true) + mb.mu.Unlock() } // Reset @@ -6749,7 +6775,7 @@ func (o *consumerFileStore) Update(state *ConsumerState) error { defer o.mu.Unlock() // Check to see if this is an outdated update. 
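// NOTE: an update is treated as outdated, and silently ignored, if it would move
// either the delivered consumer sequence or, with this change, the stream ack floor
// backwards relative to what the store already holds.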
- if state.Delivered.Consumer < o.state.Delivered.Consumer {
+ if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream {
return nil
}

From 182bf6cbae70c13e5f9dac06fd3385d4f75c52b7 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Tue, 28 Mar 2023 21:00:49 -0700
Subject: [PATCH 02/20] Bug fixes and general stability improvements.

1. If reset, ignore Applied() indexes that are greater than our commit.
2. Improved StepDown() by placing the leader transfer at the back of the proposal queue if a preferred leader is given.
3. Improved handling of leadership transfer during StepDown().
4. Do not store EntryLeaderTransfer records on disk.
5. Remove unneeded processing of older terms.
6. If an append entry has a higher term, also inherit pterm.
7. Only inherit a candidate's term if we decide to vote for them.

Signed-off-by: Derek Collison
---
 server/filestore_test.go           |  4 +-
 server/jetstream_cluster_3_test.go |  2 +-
 server/raft.go                     | 71 ++++++++++++++++++++-----------------
 3 files changed, 42 insertions(+), 35 deletions(-)

diff --git a/server/filestore_test.go b/server/filestore_test.go
index 29e3a705a67..029cb4e9d80 100644
--- a/server/filestore_test.go
+++ b/server/filestore_test.go
@@ -3668,7 +3668,7 @@ func TestFileStoreFetchPerf(t *testing.T) {
// https://github.com/nats-io/nats-server/issues/2936
func TestFileStoreCompactReclaimHeadSpace(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
- fcfg.BlockSize = 1024 * 1024
+ fcfg.BlockSize = 4 * 1024 * 1024

fs, err := newFileStore(
fcfg,
@@ -3678,7 +3678,7 @@ func TestFileStoreCompactReclaimHeadSpace(t *testing.T) {
defer fs.Stop()

// Create random bytes for payload to test for corruption vs repeated.
- msg := make([]byte, 16*1024)
+ msg := make([]byte, 64*1024)
crand.Read(msg)

// This gives us ~63 msgs in first and ~37 in second.
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 5986ea4b99c..c0f028ba795 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -313,7 +313,7 @@ func TestJetStreamClusterDeleteConsumerWhileServerDown(t *testing.T) {
// Restart.
s = c.restartServer(s)

- checkFor(t, time.Second, 200*time.Millisecond, func() error {
+ checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
hs := s.healthz(&HealthzOptions{
JSEnabledOnly: false,
JSServerOnly: false,
diff --git a/server/raft.go b/server/raft.go
index 8039d4b4232..540f80a1eb9 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -855,6 +855,11 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) {
n.Lock()
defer n.Unlock()

+ // Ignore if not applicable. This can happen during a reset.
+ if index > n.commit {
+ return 0, 0
+ }
+
// Ignore if already applied.
if index > n.applied {
n.applied = index
@@ -1325,6 +1330,7 @@ func (n *raft) StepDown(preferred ...string) error {
}

stepdown := n.stepdown
+ prop := n.prop
n.Unlock()

if len(preferred) > 0 && maybeLeader == noLeader {
@@ -1334,7 +1340,7 @@ func (n *raft) StepDown(preferred ...string) error {
// If we have a new leader selected, transfer over to them.
if maybeLeader != noLeader {
n.debug("Selected %q for new leader", maybeLeader)
- n.sendAppendEntry([]*Entry{{EntryLeaderTransfer, []byte(maybeLeader)}})
+ prop.push(&Entry{EntryLeaderTransfer, []byte(maybeLeader)})
}
// Force us to stepdown here.
n.debug("Stepping down")
@@ -2489,6 +2495,9 @@ func (n *raft) applyCommit(index uint64) error {
// We pass these up as well.
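// Normal entries are collected into committed and handed to the upper layer below.
// The EntryLeaderTransfer case added here is a deliberate no-op on commit, since this
// patch stops storing leader-transfer records on disk.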
committed = append(committed, e) + + case EntryLeaderTransfer: + // No-op } } // Pass to the upper layers if we have normal entries. @@ -2852,15 +2861,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } } - // Ignore old terms. - if isNew && ae.term < n.term { - ar := &appendEntryResponse{n.term, n.pindex, n.id, false, _EMPTY_} - n.Unlock() - n.debug("AppendEntry ignoring old term") - n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf)) - return - } - // If we are catching up ignore old catchup subs. // This could happen when we stall or cancel a catchup. if !isNew && catchingUp && sub != n.catchup.sub { @@ -2896,6 +2896,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // If this term is greater than ours. if ae.term > n.term { + n.pterm = ae.pterm n.term = ae.term n.vote = noVote if isNew { @@ -2915,7 +2916,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.updateLeadChange(false) } - if ae.pterm != n.pterm || ae.pindex != n.pindex { + if (isNew && ae.pterm != n.pterm) || ae.pindex != n.pindex { // Check if this is a lower or equal index than what we were expecting. if ae.pindex <= n.pindex { n.debug("AppendEntry detected pindex less than ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex) @@ -3004,7 +3005,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } // Save to our WAL if we have entries. - if len(ae.entries) > 0 { + if ae.shouldStore() { // Only store if an original which will have sub != nil if sub != nil { if err := n.storeToWAL(ae); err != nil { @@ -3029,26 +3030,26 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.pterm = ae.term n.pindex = ae.pindex + 1 } + } - // Check to see if we have any related entries to process here. - for _, e := range ae.entries { - switch e.Type { - case EntryLeaderTransfer: - if isNew { - maybeLeader := string(e.Data) - if maybeLeader == n.id && !n.observer && !n.paused { - n.lxfer = true - n.xferCampaign() - } + // Check to see if we have any related entries to process here. + for _, e := range ae.entries { + switch e.Type { + case EntryLeaderTransfer: + if isNew { + maybeLeader := string(e.Data) + if maybeLeader == n.id && !n.observer && !n.paused { + n.lxfer = true + n.xferCampaign() } - case EntryAddPeer: - if newPeer := string(e.Data); len(newPeer) == idLen { - // Track directly, but wait for commit to be official - if ps := n.peers[newPeer]; ps != nil { - ps.ts = time.Now().UnixNano() - } else { - n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, false} - } + } + case EntryAddPeer: + if newPeer := string(e.Data); len(newPeer) == idLen { + // Track directly, but wait for commit to be official + if ps := n.peers[newPeer]; ps != nil { + ps.ts = time.Now().UnixNano() + } else { + n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, false} } } } @@ -3133,6 +3134,11 @@ func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry { return &appendEntry{n.id, n.term, n.commit, n.pterm, n.pindex, entries, _EMPTY_, nil, nil} } +// Determine if we should store an entry. +func (ae *appendEntry) shouldStore() bool { + return ae != nil && len(ae.entries) > 0 && ae.entries[0].Type != EntryLeaderTransfer +} + // Store our append entry to our WAL. // lock should be held. func (n *raft) storeToWAL(ae *appendEntry) error { @@ -3188,7 +3194,7 @@ func (n *raft) sendAppendEntry(entries []*Entry) { } // If we have entries store this in our wal. 
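// shouldStore(), defined above, skips empty batches as well as batches whose first
// entry is an EntryLeaderTransfer, so a transfer request is still sent to followers
// but is never written to the WAL.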
- if len(entries) > 0 { + if ae.shouldStore() { if err := n.storeToWAL(ae); err != nil { return } @@ -3566,8 +3572,8 @@ func (n *raft) processVoteRequest(vr *voteRequest) error { if n.state != Follower { n.debug("Stepping down from candidate, detected higher term: %d vs %d", vr.term, n.term) n.stepdown.push(noLeader) + n.term = vr.term } - n.term = vr.term n.vote = noVote n.writeTermVote() } @@ -3576,6 +3582,7 @@ func (n *raft) processVoteRequest(vr *voteRequest) error { voteOk := n.vote == noVote || n.vote == vr.candidate if voteOk && (vr.lastTerm > n.pterm || vr.lastTerm == n.pterm && vr.lastIndex >= n.pindex) { vresp.granted = true + n.term = vr.term n.vote = vr.candidate n.writeTermVote() } else { From e516c47a4b99c00a4695ce391432406684530153 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 28 Mar 2023 21:16:04 -0700 Subject: [PATCH 03/20] Improvements to consumers attached to an interest retention stream. 1. Do not process an ack if we are closed. 2. When checking for needing an ack for a given consumer, hold lock entire time. 3. During recovery and restarts we check if we need to replay acks to the parent stream. Signed-off-by: Derek Collison --- server/consumer.go | 61 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 63e54673f52..1eca88d9cc1 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2388,6 +2388,11 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) { func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) { o.mu.Lock() + if o.closed { + o.mu.Unlock() + return + } + var sagap uint64 var needSignal bool @@ -2495,37 +2500,40 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { var needAck bool var asflr, osseq uint64 var pending map[uint64]*Pending + o.mu.RLock() + defer o.mu.RUnlock() + isFiltered := o.isFiltered() // Check if we are filtered, and if so check if this is even applicable to us. - if o.isFiltered() && o.mset != nil { + if isFiltered && o.mset != nil { if subj == _EMPTY_ { var svp StoreMsg if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil { - o.mu.RUnlock() return false } subj = svp.subj } if !o.isFilteredMatch(subj) { - o.mu.RUnlock() return false } } + if isFiltered && o.mset == nil { + return false + } + if o.isLeader() { asflr, osseq = o.asflr, o.sseq pending = o.pending } else { if o.store == nil { - o.mu.RUnlock() return false } state, err := o.store.BorrowState() if err != nil || state == nil { // Fall back to what we track internally for now. needAck := sseq > o.asflr && !o.isFiltered() - o.mu.RUnlock() return needAck } // If loading state as here, the osseq is +1. @@ -2545,7 +2553,6 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { } } - o.mu.RUnlock() return needAck } @@ -4555,3 +4562,45 @@ func (o *consumer) isMonitorRunning() bool { defer o.mu.Unlock() return o.inMonitor } + +// If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent. +func (o *consumer) checkStateForInterestStream() { + o.mu.Lock() + // See if we need to process this update if our parent stream is not a limits policy stream. + mset := o.mset + shouldProcessState := mset != nil && o.retention != LimitsPolicy + if !shouldProcessState { + o.mu.Unlock() + return + } + + state, err := o.store.State() + o.mu.Unlock() + + if err != nil { + return + } + + // We should make sure to update the acks. 
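// Replay acks from the stream's first sequence up through this consumer's ack floor,
// so an interest-based parent stream can drop messages it was only retaining because
// acks from this consumer had not yet been applied (e.g. across recovery or a restart).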
+ var ss StreamState + mset.store.FastState(&ss) + + asflr := state.AckFloor.Stream + for seq := ss.FirstSeq; seq <= asflr; seq++ { + mset.ackMsg(o, seq) + } + + o.mu.RLock() + // See if we need to process this update if our parent stream is not a limits policy stream. + state, _ = o.store.State() + o.mu.RUnlock() + + // If we have pending, we will need to walk through to delivered in case we missed any of those acks as well. + if len(state.Pending) > 0 { + for seq := state.AckFloor.Stream + 1; seq <= state.Delivered.Stream; seq++ { + if _, ok := state.Pending[seq]; !ok { + mset.ackMsg(o, seq) + } + } + } +} From 5cabc365df2d6529b8c59c976127169569c5d754 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 28 Mar 2023 21:25:10 -0700 Subject: [PATCH 04/20] General improvements around handling interest retention. 1. During ackMsg processing hold write lock to block concurrent access. 2. Check for presence of preAcks before and force removal if present. 3. Rework check for orphan msgs on startup to use checkStateForInterestStream(). Signed-off-by: Derek Collison --- server/stream.go | 106 +++++++++++++++++++++++++++-------------------- 1 file changed, 60 insertions(+), 46 deletions(-) diff --git a/server/stream.go b/server/stream.go index 34cb6119bd8..2f01654c0e5 100644 --- a/server/stream.go +++ b/server/stream.go @@ -210,6 +210,7 @@ type stream struct { qch chan struct{} active bool ddloaded bool + closed bool // Mirror mirror *sourceInfo @@ -3306,6 +3307,8 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error { return err } mset.store = fs + // Register our server. + fs.registerServer(s) } mset.mu.Unlock() @@ -3676,7 +3679,7 @@ var ( func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64) error { mset.mu.Lock() c, s, store := mset.client, mset.srv, mset.store - if c == nil { + if mset.closed || c == nil { mset.mu.Unlock() return nil } @@ -4449,6 +4452,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { // Clean up consumers. mset.mu.Lock() + mset.closed = true var obs []*consumer for _, o := range mset.consumers { obs = append(obs, o) @@ -4628,6 +4632,12 @@ func (mset *stream) getPublicConsumers() []*consumer { return obs } +func (mset *stream) isInterestRetention() bool { + mset.mu.RLock() + defer mset.mu.RUnlock() + return mset.cfg.Retention != LimitsPolicy +} + // NumConsumers reports on number of active consumers for this stream. func (mset *stream) numConsumers() int { mset.mu.RLock() @@ -4817,6 +4827,7 @@ func (mset *stream) potentialFilteredConsumers() bool { return false } +// Write lock should be held here for the stream to avoid race conditions on state. func (mset *stream) checkInterest(seq uint64, obs *consumer) bool { var subj string if mset.potentialFilteredConsumers() { @@ -4838,14 +4849,23 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool { // ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy. func (mset *stream) ackMsg(o *consumer, seq uint64) { - if mset.cfg.Retention == LimitsPolicy { + if seq == 0 || mset.cfg.Retention == LimitsPolicy { + return + } + + // Don't make this RLock(). We need to have only 1 running at a time to gauge interest across all consumers. + mset.mu.Lock() + if mset.closed || mset.store == nil { + mset.mu.Unlock() return } - // Make sure this sequence is not below our first sequence. var state StreamState mset.store.FastState(&state) + + // Make sure this sequence is not below our first sequence. 
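// Anything below the stream's current first sequence has already been removed from
// the store, so an ack for it can be ignored.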
if seq < state.FirstSeq { + mset.mu.Unlock() return } @@ -4854,31 +4874,45 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) { case WorkQueuePolicy: // Normally we just remove a message when its ack'd here but if we have direct consumers // from sources and/or mirrors we need to make sure they have delivered the msg. - mset.mu.RLock() shouldRemove = mset.directs <= 0 || !mset.checkInterest(seq, o) - mset.mu.RUnlock() case InterestPolicy: - mset.mu.RLock() shouldRemove = !mset.checkInterest(seq, o) - mset.mu.RUnlock() } - if shouldRemove { - if _, err := mset.store.RemoveMsg(seq); err == ErrStoreEOF { - // This should be rare but I have seen it. - // The ack reached us before the actual msg. - var state StreamState - mset.store.FastState(&state) - if seq >= state.LastSeq { - mset.mu.Lock() - if mset.preAcks == nil { - mset.preAcks = make(map[uint64]struct{}) - } - mset.preAcks[seq] = struct{}{} - mset.mu.Unlock() - } + // Check for existing preAcks. These will override the concept of shouldRemove. + if len(mset.preAcks) > 0 { + if _, hasAck := mset.preAcks[seq]; hasAck { + delete(mset.preAcks, seq) + shouldRemove = true } } + + // If we should remove but we know this is beyond our last we can add to preAcks here. + // The ack reached us before the actual msg. + ackBeforeMsg := shouldRemove && seq > state.LastSeq + if ackBeforeMsg { + if mset.preAcks == nil { + mset.preAcks = make(map[uint64]struct{}) + } + mset.preAcks[seq] = struct{}{} + } + mset.mu.Unlock() + + // If nothing else to do. + if !shouldRemove || ackBeforeMsg { + return + } + + // If we are here we should attempt to remove. + if _, err := mset.store.RemoveMsg(seq); err == ErrStoreEOF { + // The ack reached us before the actual msg. + mset.mu.Lock() + if mset.preAcks == nil { + mset.preAcks = make(map[uint64]struct{}) + } + mset.preAcks[seq] = struct{}{} + mset.mu.Unlock() + } } // Snapshot creates a snapshot for the stream and possibly consumers. @@ -5076,37 +5110,17 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error return mset, nil } -// This is to check for dangling messages. +// This is to check for dangling messages on interest retention streams. // Issue https://github.com/nats-io/nats-server/issues/3612 func (mset *stream) checkForOrphanMsgs() { - // We need to grab the low water mark for all consumers. - var ackFloor uint64 mset.mu.RLock() + var consumers []*consumer for _, o := range mset.consumers { - o.mu.RLock() - if o.store != nil { - if state, err := o.store.BorrowState(); err == nil { - if ackFloor == 0 || state.AckFloor.Stream < ackFloor { - ackFloor = state.AckFloor.Stream - } - } - } - o.mu.RUnlock() + consumers = append(consumers, o) } - // Grabs stream state. - var state StreamState - mset.store.FastState(&state) - s, acc := mset.srv, mset.acc mset.mu.RUnlock() - - if ackFloor > state.FirstSeq { - req := &JSApiStreamPurgeRequest{Sequence: ackFloor + 1} - purged, err := mset.purge(req) - if err != nil { - s.Warnf("stream '%s > %s' could not auto purge orphaned messages: %v", acc, mset.name(), err) - } else { - s.Debugf("stream '%s > %s' auto purged %d messages", acc, mset.name(), purged) - } + for _, o := range consumers { + o.checkStateForInterestStream() } } From 71af150448182be5998c1c77c80dbcd239774122 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 28 Mar 2023 21:37:41 -0700 Subject: [PATCH 05/20] General improvements to interest based stream processing when acks arrive before the actual msgs. 1. 
If we are retention based, make sure our consumers are running before entering into monitorStream logic. 2. If we skip messages and are interest based, make sure we check for a preAck state. 3. On finalization of recovery for consumers have them check against the interest based stream. 4. Do not process ack state updates if consumer is closed and shutting down. 5. When processing final state for a stream after upper layer catchup, check all attached consumers for ack skew. 6. During catchup of stream messages consult preAck state and skip messages as needed. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 152 +++++++++++++++++++++++++----------- 1 file changed, 106 insertions(+), 46 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index e4e7ba7beaa..e40c1bb0fdf 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -652,6 +652,8 @@ func (js *jetStream) setupMetaGroup() error { s.Errorf("Error creating filestore: %v", err) return err } + // Register our server. + fs.registerServer(s) cfg := &RaftConfig{Name: defaultMetaGroupName, Store: storeDir, Log: fs} @@ -785,7 +787,7 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool { // If we don't have a leader. if rg.node.GroupLeader() == _EMPTY_ { // Threshold for jetstream startup. - const startupThreshold = 5 * time.Second + const startupThreshold = 10 * time.Second if rg.node.HadPreviousLeader() { // Make sure we have been running long enough to intelligently determine this. @@ -1162,8 +1164,12 @@ func (js *jetStream) monitorCluster() { } aq.recycle(&ces) case isLeader = <-lch: + // For meta layer synchronize everyone to our state on becoming leader. + if isLeader { + n.SendSnapshot(js.metaSnapshot()) + } + // Process the change. js.processLeaderChange(isLeader) - if isLeader { s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil) // Install a snapshot as we become leader. @@ -1827,6 +1833,8 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor s.Errorf("Error creating filestore WAL: %v", err) return err } + // Register our server. + fs.registerServer(s) store = fs } else { ms, err := newMemStore(&StreamConfig{Name: rg.Name, Storage: MemoryStorage}) @@ -2041,6 +2049,31 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps } defer stopDirectMonitoring() + // Check if we are interest based and if so and we have an active stream wait until we + // have the consumers attached. This can become important when a server has lots of assets + // since we process streams first then consumers as an asset class. + if mset != nil && mset.isInterestRetention() { + js.mu.RLock() + numExpectedConsumers := len(sa.consumers) + js.mu.RUnlock() + if mset.numConsumers() < numExpectedConsumers { + s.Debugf("Waiting for consumers for interest based stream '%s > %s'", accName, mset.name()) + // Wait up to 10s + const maxWaitTime = 10 * time.Second + const sleepTime = 250 * time.Millisecond + timeout := time.Now().Add(maxWaitTime) + for time.Now().Before(timeout) { + if mset.numConsumers() >= numExpectedConsumers { + break + } + time.Sleep(sleepTime) + } + if actual := mset.numConsumers(); actual < numExpectedConsumers { + s.Warnf("All consumers not online for '%s > %s': expected %d but only have %d", accName, mset.name(), numExpectedConsumers, actual) + } + } + } + // This is triggered during a scale up from R1 to clustered mode. 
We need the new followers to catchup, // similar to how we trigger the catchup mechanism post a backup/restore. // We can arrive here NOT being the leader, so we send the snapshot only if we are, and in this case @@ -2050,6 +2083,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps n.SendSnapshot(mset.stateSnapshot()) sendSnapshot = false } + for { select { case <-s.quitCh: @@ -2185,14 +2219,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps case <-t.C: doSnapshot() - // Check is we have preAcks left over if we have become the leader. - if isLeader { - mset.mu.Lock() - if mset.preAcks != nil { - mset.preAcks = nil - } - mset.mu.Unlock() - } case <-uch: // keep stream assignment current @@ -2532,6 +2558,19 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco if lseq-clfs < last { s.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d", mset.account(), mset.name(), lseq+1-clfs, last) + // Check for any preAcks in case we are interest based. + mset.mu.Lock() + seq := lseq + 1 - mset.clfs + var shouldAck bool + if len(mset.preAcks) > 0 { + if _, shouldAck = mset.preAcks[seq]; shouldAck { + delete(mset.preAcks, seq) + } + } + mset.mu.Unlock() + if shouldAck { + mset.ackMsg(nil, seq) + } continue } @@ -4223,6 +4262,8 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { if n.NeedSnapshot() { doSnapshot() } + // Check our state if we are under an interest based stream. + o.checkStateForInterestStream() continue } if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { @@ -4240,6 +4281,8 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { if recovering && !isLeader { js.setConsumerAssignmentRecovering(ca) } + + // Process the change. if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader { doSnapshot() } @@ -4348,6 +4391,7 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea } panic(err.Error()) } + if err = o.store.Update(state); err != nil { o.mu.RLock() s, acc, mset, name := o.srv, o.acc, o.mset, o.name @@ -4355,20 +4399,8 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea if s != nil && mset != nil { s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err) } - } else if state, err := o.store.State(); err == nil { - // See if we need to process this update if our parent stream is not a limits policy stream. - o.mu.RLock() - mset := o.mset - shouldProcessAcks := mset != nil && o.retention != LimitsPolicy - o.mu.RUnlock() - // We should make sure to update the acks. - if shouldProcessAcks { - var ss StreamState - mset.store.FastState(&ss) - for seq := ss.FirstSeq; seq <= state.AckFloor.Stream; seq++ { - mset.ackMsg(o, seq) - } - } + } else { + o.checkStateForInterestStream() } } else if e.Type == EntryRemovePeer { @@ -4465,13 +4497,19 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea func (o *consumer) processReplicatedAck(dseq, sseq uint64) { o.mu.Lock() + + mset := o.mset + if o.closed || mset == nil { + o.mu.Unlock() + return + } + // Update activity. o.lat = time.Now() // Do actual ack update to store. 
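// UpdateAcks records the delivered and stream sequence pair in the consumer store.
// Any stream-level handling below only matters for interest or workqueue retention,
// so limits-based streams return right after this.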
o.store.UpdateAcks(dseq, sseq) - mset := o.mset - if mset == nil || o.retention == LimitsPolicy { + if o.retention == LimitsPolicy { o.mu.Unlock() return } @@ -7316,25 +7354,39 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) { // we are synched for the next message sequence properly. lastRequested := sreq.LastSeq checkFinalState := func() { - if mset != nil { - mset.mu.Lock() - var state StreamState + // Bail if no stream. + if mset == nil { + return + } + + mset.mu.Lock() + var state StreamState + mset.store.FastState(&state) + var didReset bool + firstExpected := lastRequested + 1 + if state.FirstSeq != firstExpected { + // Reset our notion of first. + mset.store.Compact(firstExpected) mset.store.FastState(&state) - var didReset bool - firstExpected := lastRequested + 1 - if state.FirstSeq != firstExpected { - // Reset our notion of first. - mset.store.Compact(firstExpected) - mset.store.FastState(&state) - // Make sure last is also correct in case this also moved. - mset.lseq = state.LastSeq - didReset = true - } - mset.mu.Unlock() - if didReset { - s.Warnf("Catchup for stream '%s > %s' resetting first sequence: %d on catchup complete", - mset.account(), mset.name(), firstExpected) - } + // Make sure last is also correct in case this also moved. + mset.lseq = state.LastSeq + didReset = true + } + mset.mu.Unlock() + + if didReset { + s.Warnf("Catchup for stream '%s > %s' resetting first sequence: %d on catchup complete", + mset.account(), mset.name(), firstExpected) + } + + mset.mu.RLock() + var consumers []*consumer + for _, o := range mset.consumers { + consumers = append(consumers, o) + } + mset.mu.RUnlock() + for _, o := range consumers { + o.checkStateForInterestStream() } } @@ -7526,11 +7578,19 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) { return 0, errCatchupBadMsg } - mset.mu.RLock() + mset.mu.Lock() st := mset.cfg.Storage ddloaded := mset.ddloaded tierName := mset.tier - mset.mu.RUnlock() + + if len(mset.preAcks) > 0 { + if _, shouldSkip := mset.preAcks[seq]; shouldSkip { + delete(mset.preAcks, seq) + // Mark this to be skipped + subj, ts = _EMPTY_, 0 + } + } + mset.mu.Unlock() if mset.js.limitsExceeded(st) { return 0, NewJSInsufficientResourcesError() From 0d9f707b4b21f52fd2e1e5305e1bf32dcbde4203 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 28 Mar 2023 21:50:49 -0700 Subject: [PATCH 06/20] Additional tests to stress interest based streams with pull subscribers during rolling restarts. 
Signed-off-by: Derek Collison --- server/norace_test.go | 311 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 311 insertions(+) diff --git a/server/norace_test.go b/server/norace_test.go index f0791ba9440..ab323c79d32 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -7001,3 +7001,314 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T default: } } + +func TestNoRaceJetStreamInterestStreamCheckInterestRaceBug(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + + numConsumers := 10 + for i := 0; i < numConsumers; i++ { + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err = js.Subscribe("foo", func(m *nats.Msg) { + m.Ack() + }, nats.Durable(fmt.Sprintf("C%d", i)), nats.ManualAck()) + require_NoError(t, err) + } + + numToSend := 10_000 + for i := 0; i < numToSend; i++ { + _, err := js.PublishAsync("foo", nil) + require_NoError(t, err) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(20 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // Wait til ackfloor is correct for all consumers. + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { + for _, s := range c.servers { + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + mset.mu.RLock() + defer mset.mu.RUnlock() + + require_True(t, len(mset.consumers) == numConsumers) + + for _, o := range mset.consumers { + state, err := o.store.State() + require_NoError(t, err) + if state.AckFloor.Stream != uint64(numToSend) { + return fmt.Errorf("Ackfloor not correct yet") + } + } + } + return nil + }) + + for _, s := range c.servers { + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + mset.mu.RLock() + defer mset.mu.RUnlock() + + state := mset.state() + require_True(t, state.Msgs == 0) + require_True(t, state.FirstSeq == uint64(numToSend+1)) + } +} + +func TestNoRaceJetStreamClusterInterestStreamConsistencyAfterRollingRestart(t *testing.T) { + // Uncomment to run. Needs to be on a big machine. Do not want as part of Travis tests atm. 
+ skip(t) + + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + numStreams := 200 + numConsumersPer := 5 + numPublishers := 10 + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + qch := make(chan bool) + + var mm sync.Mutex + ackMap := make(map[string]map[uint64][]string) + + addAckTracking := func(seq uint64, stream, consumer string) { + mm.Lock() + defer mm.Unlock() + sam := ackMap[stream] + if sam == nil { + sam = make(map[uint64][]string) + ackMap[stream] = sam + } + sam[seq] = append(sam[seq], consumer) + } + + doPullSubscriber := func(stream, consumer, filter string) { + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + var err error + var sub *nats.Subscription + timeout := time.Now().Add(5 * time.Second) + for time.Now().Before(timeout) { + sub, err = js.PullSubscribe(filter, consumer, nats.BindStream(stream), nats.ManualAck()) + if err == nil { + break + } + } + if err != nil { + t.Logf("Error on pull subscriber: %v", err) + return + } + + for { + select { + case <-time.After(500 * time.Millisecond): + msgs, err := sub.Fetch(100, nats.MaxWait(time.Second)) + if err != nil { + continue + } + // Shuffleraf + rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] }) + for _, m := range msgs { + meta, err := m.Metadata() + require_NoError(t, err) + m.Ack() + addAckTracking(meta.Sequence.Stream, stream, consumer) + if meta.NumDelivered > 1 { + t.Logf("Got a msg redelivered %d for sequence %d on %q %q\n", meta.NumDelivered, meta.Sequence.Stream, stream, consumer) + } + } + case <-qch: + nc.Flush() + return + } + } + } + + // Setup + wg := sync.WaitGroup{} + for i := 0; i < numStreams; i++ { + wg.Add(1) + go func(stream string) { + defer wg.Done() + subj := fmt.Sprintf("%s.>", stream) + _, err := js.AddStream(&nats.StreamConfig{ + Name: stream, + Subjects: []string{subj}, + Replicas: 3, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + for i := 0; i < numConsumersPer; i++ { + consumer := fmt.Sprintf("C%d", i) + filter := fmt.Sprintf("%s.%d", stream, i) + _, err = js.AddConsumer(stream, &nats.ConsumerConfig{ + Durable: consumer, + FilterSubject: filter, + AckPolicy: nats.AckExplicitPolicy, + AckWait: 2 * time.Second, + }) + require_NoError(t, err) + c.waitOnConsumerLeader(globalAccountName, stream, consumer) + go doPullSubscriber(stream, consumer, filter) + } + }(fmt.Sprintf("A-%d", i)) + } + wg.Wait() + + msg := make([]byte, 2*1024) // 2k payload + rand.Read(msg) + + // Controls if publishing is on or off. + var pubActive atomic.Bool + + doPublish := func() { + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + for { + select { + case <-time.After(100 * time.Millisecond): + if pubActive.Load() { + for i := 0; i < numStreams; i++ { + for j := 0; j < numConsumersPer; j++ { + subj := fmt.Sprintf("A-%d.%d", i, j) + // Don't care about errors here for this test. + js.Publish(subj, msg) + } + } + } + case <-qch: + return + } + } + } + + pubActive.Store(true) + + for i := 0; i < numPublishers; i++ { + go doPublish() + } + + // Let run for a bit. + time.Sleep(20 * time.Second) + + // Do a rolling restart. + for _, s := range c.servers { + t.Logf("Shutdown %v\n", s) + s.Shutdown() + t.Logf("Restarting %v\n", s) + s = c.restartServer(s) + c.waitOnServerHealthz(s) + } + + // Let run for a bit longer. + time.Sleep(10 * time.Second) + + // Stop pubs. + pubActive.Store(false) + + // Let settle. 
+ time.Sleep(10 * time.Second) + close(qch) + time.Sleep(20 * time.Second) + + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + minAckFloor := func(stream string) (uint64, string) { + var maf uint64 = math.MaxUint64 + var consumer string + for i := 0; i < numConsumersPer; i++ { + cname := fmt.Sprintf("C%d", i) + ci, err := js.ConsumerInfo(stream, cname) + require_NoError(t, err) + if ci.AckFloor.Stream < maf { + maf = ci.AckFloor.Stream + consumer = cname + } + } + return maf, consumer + } + + checkStreamAcks := func(stream string) { + mm.Lock() + defer mm.Unlock() + if sam := ackMap[stream]; sam != nil { + for seq := 1; ; seq++ { + acks := sam[uint64(seq)] + if acks == nil { + if sam[uint64(seq+1)] != nil { + t.Logf("Missing an ack on stream %q for sequence %d\n", stream, seq) + } else { + break + } + } + if len(acks) > 1 { + t.Fatalf("Multiple acks for %d which is not expected: %+v", seq, acks) + } + } + } + } + + // Now check all streams such that their first sequence is equal to the minimum of all consumers. + for i := 0; i < numStreams; i++ { + stream := fmt.Sprintf("A-%d", i) + si, err := js.StreamInfo(stream) + require_NoError(t, err) + + if maf, consumer := minAckFloor(stream); maf > si.State.FirstSeq { + t.Logf("\nBAD STATE DETECTED FOR %q, CHECKING OTHER SERVERS! ACK %d vs %+v LEADER %v, CL FOR %q %v\n", + stream, maf, si.State, c.streamLeader(globalAccountName, stream), consumer, c.consumerLeader(globalAccountName, stream, consumer)) + + checkStreamAcks(stream) + + for _, s := range c.servers { + mset, err := s.GlobalAccount().lookupStream(stream) + require_NoError(t, err) + state := mset.state() + t.Logf("Server %v Stream STATE %+v\n", s, state) + + var smv StoreMsg + if sm, err := mset.store.LoadMsg(state.FirstSeq, &smv); err == nil { + t.Logf("Subject for msg %d is %q", state.FirstSeq, sm.subj) + } else { + t.Logf("Could not retrieve msg for %d: %v", state.FirstSeq, err) + } + + if len(mset.preAcks) > 0 { + t.Logf("%v preAcks %+v\n", s, mset.preAcks) + } + + for _, o := range mset.consumers { + ostate, err := o.store.State() + require_NoError(t, err) + t.Logf("Consumer STATE for %q is %+v\n", o.name, ostate) + } + } + t.Fatalf("BAD STATE: ACKFLOOR > FIRST %d vs %d\n", maf, si.State.FirstSeq) + } + } +} From 52fbac644ca46b7aaedaa6ef28c34084911d3548 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 28 Mar 2023 23:07:38 -0700 Subject: [PATCH 07/20] Since we no longer store leaderTransfers, which is proper, some tests were getting and advantage on that after server restart. This change speeds up raft layer more to avoid timeouts. Signed-off-by: Derek Collison --- server/jetstream_helpers_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index a17038ca96c..b671602fa84 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -37,8 +37,8 @@ import ( func init() { // Speed up raft for tests. hbInterval = 50 * time.Millisecond - minElectionTimeout = 1 * time.Second - maxElectionTimeout = 3 * time.Second + minElectionTimeout = 250 * time.Millisecond + maxElectionTimeout = 1 * time.Second lostQuorumInterval = time.Second lostQuorumCheck = 4 * hbInterval } From e97ddcd14fcd002478cb758621277f16f773edb1 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 28 Mar 2023 23:40:54 -0700 Subject: [PATCH 08/20] Tweak tests due to changes, make test timeouts uniform. 
Signed-off-by: Derek Collison --- server/jetstream_cluster_1_test.go | 1 + server/jetstream_cluster_2_test.go | 2 +- server/jetstream_helpers_test.go | 14 +++++++------- server/norace_test.go | 2 +- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index b899f0e3de3..7afe0ae0905 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -971,6 +971,7 @@ func TestJetStreamClusterRestoreSingleConsumer(t *testing.T) { c.stopAll() c.restartAll() c.waitOnLeader() + c.waitOnStreamLeader("$G", "foo") s = c.randomServer() nc, js = jsClientConnect(t, s) diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index bd530ca1002..c2d756242cd 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -4322,7 +4322,7 @@ func TestJetStreamClusterStreamReplicaUpdates(t *testing.T) { require_NoError(t, err) c.waitOnStreamLeader("$G", "TEST") - checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { si, err = js.StreamInfo("TEST") require_NoError(t, err) if len(si.Cluster.Replicas) != r-1 { diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index b671602fa84..2dd0612eef2 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -37,9 +37,9 @@ import ( func init() { // Speed up raft for tests. hbInterval = 50 * time.Millisecond - minElectionTimeout = 250 * time.Millisecond - maxElectionTimeout = 1 * time.Second - lostQuorumInterval = time.Second + minElectionTimeout = 750 * time.Millisecond + maxElectionTimeout = 2500 * time.Millisecond + lostQuorumInterval = 500 * time.Millisecond lostQuorumCheck = 4 * hbInterval } @@ -509,7 +509,7 @@ func (sc *supercluster) leader() *Server { func (sc *supercluster) waitOnLeader() { sc.t.Helper() - expires := time.Now().Add(10 * time.Second) + expires := time.Now().Add(30 * time.Second) for time.Now().Before(expires) { for _, c := range sc.clusters { if leader := c.leader(); leader != nil { @@ -548,7 +548,7 @@ func (sc *supercluster) waitOnPeerCount(n int) { sc.t.Helper() sc.waitOnLeader() leader := sc.leader() - expires := time.Now().Add(20 * time.Second) + expires := time.Now().Add(30 * time.Second) for time.Now().Before(expires) { peers := leader.JetStreamClusterPeers() if len(peers) == n { @@ -1237,7 +1237,7 @@ func (c *cluster) waitOnPeerCount(n int) { func (c *cluster) waitOnConsumerLeader(account, stream, consumer string) { c.t.Helper() - expires := time.Now().Add(20 * time.Second) + expires := time.Now().Add(30 * time.Second) for time.Now().Before(expires) { if leader := c.consumerLeader(account, stream, consumer); leader != nil { time.Sleep(200 * time.Millisecond) @@ -1329,7 +1329,7 @@ func (c *cluster) waitOnServerHealthz(s *Server) { func (c *cluster) waitOnServerCurrent(s *Server) { c.t.Helper() - expires := time.Now().Add(20 * time.Second) + expires := time.Now().Add(30 * time.Second) for time.Now().Before(expires) { time.Sleep(100 * time.Millisecond) if !s.JetStreamEnabled() || s.JetStreamIsCurrent() { diff --git a/server/norace_test.go b/server/norace_test.go index ab323c79d32..b6b137dc056 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -5267,7 +5267,7 @@ func TestNoRaceJetStreamClusterDirectAccessAllPeersSubs(t *testing.T) { t.Fatalf("Expected to see messages increase, got %d", si.State.Msgs) } - checkFor(t, 2*time.Second, 
100*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { // Make sure they are all the same from a state perspective. // Leader will have the expected state. lmset, err := c.streamLeader("$G", "TEST").GlobalAccount().lookupStream("TEST") From c4da37ecc7e3120427983498886b4db1e96ebfaf Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 29 Mar 2023 07:35:00 -0700 Subject: [PATCH 09/20] Make sure consumer is valid and state was returned Signed-off-by: Derek Collison --- server/consumer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 1eca88d9cc1..ff4449afa21 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4569,7 +4569,7 @@ func (o *consumer) checkStateForInterestStream() { // See if we need to process this update if our parent stream is not a limits policy stream. mset := o.mset shouldProcessState := mset != nil && o.retention != LimitsPolicy - if !shouldProcessState { + if o.closed || !shouldProcessState { o.mu.Unlock() return } @@ -4596,7 +4596,7 @@ func (o *consumer) checkStateForInterestStream() { o.mu.RUnlock() // If we have pending, we will need to walk through to delivered in case we missed any of those acks as well. - if len(state.Pending) > 0 { + if state != nil && len(state.Pending) > 0 { for seq := state.AckFloor.Stream + 1; seq <= state.Delivered.Stream; seq++ { if _, ok := state.Pending[seq]; !ok { mset.ackMsg(o, seq) From 35d1a7747aa54907261f9207adc4ba15096c819b Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 29 Mar 2023 07:59:41 -0700 Subject: [PATCH 10/20] Snapshots of no length can hold state as well Signed-off-by: Derek Collison --- server/raft.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/raft.go b/server/raft.go index 540f80a1eb9..64df8226f2c 100644 --- a/server/raft.go +++ b/server/raft.go @@ -944,7 +944,7 @@ func (n *raft) InstallSnapshot(data []byte) error { var state StreamState n.wal.FastState(&state) - if n.applied == 0 || len(data) == 0 { + if n.applied == 0 { n.Unlock() return errNoSnapAvailable } From a9a4df859fad6d1606d0223f8a8522ae838deb5b Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 29 Mar 2023 11:01:51 -0700 Subject: [PATCH 11/20] Fix for flapping test Signed-off-by: Derek Collison --- server/jetstream_cluster_1_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 7afe0ae0905..20566f013d0 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -3720,6 +3720,7 @@ func TestJetStreamClusterAccountPurge(t *testing.T) { resolver: { type: full dir: '%s/jwt' + timeout: "10ms" }`, ojwt, syspub, storeDir) }) defer c.shutdown() @@ -3892,7 +3893,6 @@ func TestJetStreamClusterAccountPurge(t *testing.T) { } c.restartAll() checkForDirs(t, 6, 4) - c.waitOnClusterReady() // unfortunately, this does not wait until leader is not catching up. 
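// Per the inline comment above, waitOnClusterReady does not guarantee the leader has
// finished catching up, so the call is dropped here; the flapping is presumably
// addressed instead by the shorter resolver timeout added to the test config above.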
purge(t) checkForDirs(t, 0, 0) c.stopAll() From ddfa5cdfec1b7dc4af32d3fca8a66fc58506146f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 29 Mar 2023 11:05:47 -0700 Subject: [PATCH 12/20] Additional protection for bad state when rebuilding a message block Signed-off-by: Derek Collison --- server/filestore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/filestore.go b/server/filestore.go index 59b80f7434e..8e1856ed5ea 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1044,7 +1044,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { rl &^= hbit dlen := int(rl) - msgHdrSize // Do some quick sanity checks here. - if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > rlBadThresh { + if dlen < 0 || int(slen) > (dlen-8) || dlen > int(rl) || rl > rlBadThresh { truncate(index) return gatherLost(lbuf - index), errBadMsg } From 6c3e64b83b6cedf28a598a0e83071e230d945adb Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 29 Mar 2023 13:56:04 -0700 Subject: [PATCH 13/20] Always make sure cluster and meta raft node available when needed Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index e40c1bb0fdf..815eced53d8 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -351,7 +351,7 @@ func (s *Server) JetStreamClusterPeers() []string { defer js.mu.RUnlock() cc := js.cluster - if !cc.isLeader() { + if !cc.isLeader() || cc.meta == nil { return nil } peers := cc.meta.Peers() @@ -1487,6 +1487,9 @@ func (js *jetStream) processAddPeer(peer string) { defer js.mu.Unlock() s, cc := js.srv, js.cluster + if cc == nil || cc.meta == nil { + return + } isLeader := cc.isLeader() // Now check if we are meta-leader. We will check for any re-assignments. 
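// Note that this handler, and the cluster handlers below, now return early when cc or
// cc.meta is nil, since the meta raft node may not be available (for example during
// shutdown).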
@@ -1528,7 +1531,7 @@ func (js *jetStream) processAddPeer(peer string) { func (js *jetStream) processRemovePeer(peer string) { js.mu.Lock() s, cc := js.srv, js.cluster - if cc.meta == nil { + if cc == nil || cc.meta == nil { js.mu.Unlock() return } @@ -1592,6 +1595,9 @@ func (js *jetStream) removePeerFromStreamLocked(sa *streamAssignment, peer strin } s, cc, csa := js.srv, js.cluster, sa.copyGroup() + if cc == nil || cc.meta == nil { + return false + } replaced := cc.remapStreamAssignment(csa, peer) if !replaced { s.Warnf("JetStream cluster could not replace peer for stream '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name) @@ -4748,6 +4754,9 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client defer js.mu.Unlock() s, cc := js.srv, js.cluster + if cc == nil || cc.meta == nil { + return + } // This should have been done already in processStreamAssignment, but in // case we have a code path that gets here with no processStreamAssignment, @@ -4821,6 +4830,9 @@ func (js *jetStream) processConsumerAssignmentResults(sub *subscription, c *clie defer js.mu.Unlock() s, cc := js.srv, js.cluster + if cc == nil || cc.meta == nil { + return + } if sa := js.streamAssignment(result.Account, result.Stream); sa != nil && sa.consumers != nil { if ca := sa.consumers[result.Consumer]; ca != nil && !ca.responded { @@ -5591,6 +5603,10 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su js.mu.Lock() defer js.mu.Unlock() + if cc.meta == nil { + return + } + var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}} osa := js.streamAssignment(acc.Name, cfg.Name) @@ -5871,6 +5887,10 @@ func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, acc *Account, st js.mu.Lock() defer js.mu.Unlock() + if cc.meta == nil { + return + } + osa := js.streamAssignment(acc.Name, stream) if osa == nil { var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}} @@ -5944,6 +5964,10 @@ func (s *Server) jsClusteredStreamRestoreRequest( js.mu.Lock() defer js.mu.Unlock() + if cc.meta == nil { + return + } + cfg := &req.Config resp := JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}} @@ -6315,6 +6339,10 @@ func (s *Server) jsClusteredConsumerDeleteRequest(ci *ClientInfo, acc *Account, js.mu.Lock() defer js.mu.Unlock() + if cc.meta == nil { + return + } + var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}} sa := js.streamAssignment(acc.Name, stream) @@ -6503,6 +6531,10 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec js.mu.Lock() defer js.mu.Unlock() + if cc.meta == nil { + return + } + // Lookup the stream assignment. 
sa := js.streamAssignment(acc.Name, stream) if sa == nil { From e2746934903e4c4ad517b3ae4ec12c2ee24bd4d1 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 29 Mar 2023 14:07:14 -0700 Subject: [PATCH 14/20] On bad or corrupt message load during commit, reset WAL vs mark write error Signed-off-by: Derek Collison --- server/raft.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/raft.go b/server/raft.go index 64df8226f2c..11100b0e744 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2418,9 +2418,11 @@ func (n *raft) applyCommit(index uint64) error { if ae, err = n.loadEntry(index); err != nil { if err != ErrStoreClosed && err != ErrStoreEOF { if err == errBadMsg { - n.setWriteErrLocked(err) + n.warn("Got an error loading %d index: %v - will reset", index, err) + n.resetWAL() + } else { + n.warn("Got an error loading %d index: %v", index, err) } - n.warn("Got an error loading %d index: %v", index, err) } n.commit = original return errEntryLoadFailed From 2b89fea9b0ebd2c47105915b1de76c5357fae83f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 29 Mar 2023 14:46:49 -0700 Subject: [PATCH 15/20] Double check here if the jetstream cluster was shutdown when we released the lock Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 815eced53d8..5d3888890e4 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5647,6 +5647,13 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su return } + // Since we release lock above for the config update recheck here to make sure we did not shutdown. + if cc.meta == nil { + resp.Error = NewJSClusterNotActiveError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return + } + // Make copy so to not change original. 
rg := osa.copyGroup().Group From c77872b519b43b24aa9859ab2448e072d4670fbc Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 29 Mar 2023 15:29:38 -0700 Subject: [PATCH 16/20] Update server/jetstream_cluster.go Pre-allocate Co-authored-by: Neil --- server/jetstream_cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 5d3888890e4..3cd0e0ebfe1 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7419,7 +7419,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) { } mset.mu.RLock() - var consumers []*consumer + consumers := make([]*consumer, 0, len(mset.consumers)) for _, o := range mset.consumers { consumers = append(consumers, o) } From 152b25c3142528fd151fe1094c19f6caac556a5f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 29 Mar 2023 15:29:51 -0700 Subject: [PATCH 17/20] Update server/stream.go Pre-allocate Co-authored-by: Neil --- server/stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/stream.go b/server/stream.go index 2f01654c0e5..4e183f2b981 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5114,7 +5114,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error // Issue https://github.com/nats-io/nats-server/issues/3612 func (mset *stream) checkForOrphanMsgs() { mset.mu.RLock() - var consumers []*consumer + consumers := make([]*consumer, 0, len(mset.consumers)) for _, o := range mset.consumers { consumers = append(consumers, o) } From 9a714e7d7d32cb0644e7b1a0c110802251c20303 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 29 Mar 2023 15:47:54 -0700 Subject: [PATCH 18/20] Update based on review feedback Signed-off-by: Derek Collison --- server/consumer.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index ff4449afa21..193c6d01b2c 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2505,8 +2505,12 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { defer o.mu.RUnlock() isFiltered := o.isFiltered() + if isFiltered && o.mset == nil { + return false + } + // Check if we are filtered, and if so check if this is even applicable to us. - if isFiltered && o.mset != nil { + if isFiltered { if subj == _EMPTY_ { var svp StoreMsg if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil { @@ -2519,10 +2523,6 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { } } - if isFiltered && o.mset == nil { - return false - } - if o.isLeader() { asflr, osseq = o.asflr, o.sseq pending = o.pending From ade0e9d2958a275bbfef051d1d2a0a168a6d5c0e Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 29 Mar 2023 16:51:17 -0700 Subject: [PATCH 19/20] Snapshot meta for this function to use in case it gets removed out from underneath of us. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3cd0e0ebfe1..1da8cb2c6f1 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5602,8 +5602,8 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su // Now process the request and proposal. 
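// cc.meta is captured into a local variable up front so the Propose calls at the end
// of this function still target the snapshotted node even if cc.meta is removed out
// from underneath us while the request is being processed.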
js.mu.Lock() defer js.mu.Unlock() - - if cc.meta == nil { + meta := cc.meta + if meta == nil { return } @@ -5647,13 +5647,6 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su return } - // Since we release lock above for the config update recheck here to make sure we did not shutdown. - if cc.meta == nil { - resp.Error = NewJSClusterNotActiveError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) - return - } - // Make copy so to not change original. rg := osa.copyGroup().Group @@ -5876,13 +5869,12 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su } sa := &streamAssignment{Group: rg, Sync: osa.Sync, Created: osa.Created, Config: newCfg, Subject: subject, Reply: reply, Client: ci} - cc.meta.Propose(encodeUpdateStreamAssignment(sa)) + meta.Propose(encodeUpdateStreamAssignment(sa)) // Process any staged consumers. for _, ca := range consumers { - cc.meta.Propose(encodeAddConsumerAssignment(ca)) + meta.Propose(encodeAddConsumerAssignment(ca)) } - } func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, acc *Account, stream, subject, reply string, rmsg []byte) { From c546828359f11c8bcc9492b4c48bd6c9422ea436 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 29 Mar 2023 16:56:04 -0700 Subject: [PATCH 20/20] Moved log running test to NoRace suite Signed-off-by: Derek Collison --- server/filestore_test.go | 123 --------------------------------------- server/norace_test.go | 123 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 123 insertions(+), 123 deletions(-) diff --git a/server/filestore_test.go b/server/filestore_test.go index 029cb4e9d80..e3050018d9f 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5372,126 +5372,3 @@ func TestFileStoreSubjectsTotals(t *testing.T) { t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st)) } } - -func TestFileStoreNumPending(t *testing.T) { - // No need for all permutations here. - storeDir := t.TempDir() - fcfg := FileStoreConfig{ - StoreDir: storeDir, - BlockSize: 2 * 1024, // Create many blocks on purpose. - } - fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*.*.*.*"}, Storage: FileStorage}) - require_NoError(t, err) - defer fs.Stop() - - tokens := []string{"foo", "bar", "baz"} - genSubj := func() string { - return fmt.Sprintf("%s.%s.%s.%s", - tokens[rand.Intn(len(tokens))], - tokens[rand.Intn(len(tokens))], - tokens[rand.Intn(len(tokens))], - tokens[rand.Intn(len(tokens))], - ) - } - - for i := 0; i < 50_000; i++ { - subj := genSubj() - _, _, err := fs.StoreMsg(subj, nil, []byte("Hello World")) - require_NoError(t, err) - } - - state := fs.State() - - // Scan one by one for sanity check against other calculations. - sanityCheck := func(sseq uint64, filter string) SimpleState { - t.Helper() - var ss SimpleState - var smv StoreMsg - // For here we know 0 is invalid, set to 1. 
- if sseq == 0 { - sseq = 1 - } - for seq := sseq; seq <= state.LastSeq; seq++ { - sm, err := fs.LoadMsg(seq, &smv) - if err != nil { - t.Logf("Encountered error %v loading sequence: %d", err, seq) - continue - } - if subjectIsSubsetMatch(sm.subj, filter) { - ss.Msgs++ - ss.Last = seq - if ss.First == 0 || seq < ss.First { - ss.First = seq - } - } - } - return ss - } - - check := func(sseq uint64, filter string) { - t.Helper() - np, lvs := fs.NumPending(sseq, filter, false) - ss := fs.FilteredState(sseq, filter) - sss := sanityCheck(sseq, filter) - if lvs != state.LastSeq { - t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) - } - if ss.Msgs != np { - t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) - } - if ss != sss { - t.Fatalf("Failed sanity check, expected %+v got %+v", sss, ss) - } - } - - sanityCheckLastOnly := func(sseq uint64, filter string) SimpleState { - t.Helper() - var ss SimpleState - var smv StoreMsg - // For here we know 0 is invalid, set to 1. - if sseq == 0 { - sseq = 1 - } - seen := make(map[string]bool) - for seq := state.LastSeq; seq >= sseq; seq-- { - sm, err := fs.LoadMsg(seq, &smv) - if err != nil { - t.Logf("Encountered error %v loading sequence: %d", err, seq) - continue - } - if !seen[sm.subj] && subjectIsSubsetMatch(sm.subj, filter) { - ss.Msgs++ - if ss.Last == 0 { - ss.Last = seq - } - if ss.First == 0 || seq < ss.First { - ss.First = seq - } - seen[sm.subj] = true - } - } - return ss - } - - checkLastOnly := func(sseq uint64, filter string) { - t.Helper() - np, lvs := fs.NumPending(sseq, filter, true) - ss := sanityCheckLastOnly(sseq, filter) - if lvs != state.LastSeq { - t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) - } - if ss.Msgs != np { - t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) - } - } - - startSeqs := []uint64{0, 1, 2, 200, 444, 555, 2222, 8888, 12_345, 28_222, 33_456, 44_400, 49_999} - checkSubs := []string{"foo.>", "*.bar.>", "foo.bar.*.baz", "*.bar.>", "*.foo.bar.*", "foo.foo.bar.baz"} - - for _, filter := range checkSubs { - for _, start := range startSeqs { - check(start, filter) - checkLastOnly(start, filter) - } - } -} diff --git a/server/norace_test.go b/server/norace_test.go index b6b137dc056..711c6ef4446 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -7312,3 +7312,126 @@ func TestNoRaceJetStreamClusterInterestStreamConsistencyAfterRollingRestart(t *t } } } + +func TestNoRaceFileStoreNumPending(t *testing.T) { + // No need for all permutations here. + storeDir := t.TempDir() + fcfg := FileStoreConfig{ + StoreDir: storeDir, + BlockSize: 2 * 1024, // Create many blocks on purpose. + } + fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*.*.*.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + tokens := []string{"foo", "bar", "baz"} + genSubj := func() string { + return fmt.Sprintf("%s.%s.%s.%s", + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + ) + } + + for i := 0; i < 50_000; i++ { + subj := genSubj() + _, _, err := fs.StoreMsg(subj, nil, []byte("Hello World")) + require_NoError(t, err) + } + + state := fs.State() + + // Scan one by one for sanity check against other calculations. + sanityCheck := func(sseq uint64, filter string) SimpleState { + t.Helper() + var ss SimpleState + var smv StoreMsg + // For here we know 0 is invalid, set to 1. 
+ if sseq == 0 { + sseq = 1 + } + for seq := sseq; seq <= state.LastSeq; seq++ { + sm, err := fs.LoadMsg(seq, &smv) + if err != nil { + t.Logf("Encountered error %v loading sequence: %d", err, seq) + continue + } + if subjectIsSubsetMatch(sm.subj, filter) { + ss.Msgs++ + ss.Last = seq + if ss.First == 0 || seq < ss.First { + ss.First = seq + } + } + } + return ss + } + + check := func(sseq uint64, filter string) { + t.Helper() + np, lvs := fs.NumPending(sseq, filter, false) + ss := fs.FilteredState(sseq, filter) + sss := sanityCheck(sseq, filter) + if lvs != state.LastSeq { + t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) + } + if ss.Msgs != np { + t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) + } + if ss != sss { + t.Fatalf("Failed sanity check, expected %+v got %+v", sss, ss) + } + } + + sanityCheckLastOnly := func(sseq uint64, filter string) SimpleState { + t.Helper() + var ss SimpleState + var smv StoreMsg + // For here we know 0 is invalid, set to 1. + if sseq == 0 { + sseq = 1 + } + seen := make(map[string]bool) + for seq := state.LastSeq; seq >= sseq; seq-- { + sm, err := fs.LoadMsg(seq, &smv) + if err != nil { + t.Logf("Encountered error %v loading sequence: %d", err, seq) + continue + } + if !seen[sm.subj] && subjectIsSubsetMatch(sm.subj, filter) { + ss.Msgs++ + if ss.Last == 0 { + ss.Last = seq + } + if ss.First == 0 || seq < ss.First { + ss.First = seq + } + seen[sm.subj] = true + } + } + return ss + } + + checkLastOnly := func(sseq uint64, filter string) { + t.Helper() + np, lvs := fs.NumPending(sseq, filter, true) + ss := sanityCheckLastOnly(sseq, filter) + if lvs != state.LastSeq { + t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) + } + if ss.Msgs != np { + t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) + } + } + + startSeqs := []uint64{0, 1, 2, 200, 444, 555, 2222, 8888, 12_345, 28_222, 33_456, 44_400, 49_999} + checkSubs := []string{"foo.>", "*.bar.>", "foo.bar.*.baz", "*.bar.>", "*.foo.bar.*", "foo.foo.bar.baz"} + + for _, filter := range checkSubs { + for _, start := range startSeqs { + check(start, filter) + checkLastOnly(start, filter) + } + } +}
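A note on the pattern behind patches 15 and 19 above: patch 15 re-checked cc.meta after the lock had been dropped and re-acquired around the config update, and patch 19 replaces that re-check by snapshotting the pointer once under the lock and proposing through the local copy. The sketch below illustrates that idiom, along with the slice pre-allocation used in patches 16 and 17. It is a minimal standalone illustration with made-up stand-in types (clusterState, metaGroup, consumer), not nats-server code.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// metaGroup stands in for the Raft meta group handle.
type metaGroup struct{}

func (m *metaGroup) Propose(data []byte) { fmt.Printf("proposed %d bytes\n", len(data)) }

type consumer struct{ name string }

type clusterState struct {
	mu        sync.RWMutex
	meta      *metaGroup           // cleared on shutdown
	consumers map[string]*consumer // registered consumers
}

func (c *clusterState) updateRequest(payload []byte) error {
	c.mu.Lock()
	// Snapshot the pointer while holding the lock; use the local copy from here on.
	meta := c.meta
	if meta == nil {
		c.mu.Unlock()
		return errors.New("cluster not active")
	}
	// The real code drops and re-takes the lock around a config check; the
	// shared field could be cleared in that window, but the snapshot stays valid.
	c.mu.Unlock()
	c.mu.Lock()
	defer c.mu.Unlock()

	meta.Propose(payload)
	return nil
}

// copyConsumers pre-allocates the destination slice to the map size so the
// copy made under the read lock does not repeatedly grow the backing array.
func (c *clusterState) copyConsumers() []*consumer {
	c.mu.RLock()
	defer c.mu.RUnlock()
	consumers := make([]*consumer, 0, len(c.consumers))
	for _, o := range c.consumers {
		consumers = append(consumers, o)
	}
	return consumers
}

func main() {
	c := &clusterState{
		meta:      &metaGroup{},
		consumers: map[string]*consumer{"c1": {name: "c1"}},
	}
	if err := c.updateRequest([]byte("update stream")); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println("copied consumers:", len(c.copyConsumers()))
}

Holding a local reference is what lets patch 19 drop the explicit nil re-check that patch 15 had added: the later Propose calls go through the snapshot, so a concurrent shutdown that clears the shared field no longer affects this function.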