diff --git a/server/consumer.go b/server/consumer.go index 63e54673f52..193c6d01b2c 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2388,6 +2388,11 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) { func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) { o.mu.Lock() + if o.closed { + o.mu.Unlock() + return + } + var sagap uint64 var needSignal bool @@ -2495,20 +2500,25 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { var needAck bool var asflr, osseq uint64 var pending map[uint64]*Pending + o.mu.RLock() + defer o.mu.RUnlock() + + isFiltered := o.isFiltered() + if isFiltered && o.mset == nil { + return false + } // Check if we are filtered, and if so check if this is even applicable to us. - if o.isFiltered() && o.mset != nil { + if isFiltered { if subj == _EMPTY_ { var svp StoreMsg if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil { - o.mu.RUnlock() return false } subj = svp.subj } if !o.isFilteredMatch(subj) { - o.mu.RUnlock() return false } } @@ -2518,14 +2528,12 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { pending = o.pending } else { if o.store == nil { - o.mu.RUnlock() return false } state, err := o.store.BorrowState() if err != nil || state == nil { // Fall back to what we track internally for now. needAck := sseq > o.asflr && !o.isFiltered() - o.mu.RUnlock() return needAck } // If loading state as here, the osseq is +1. @@ -2545,7 +2553,6 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { } } - o.mu.RUnlock() return needAck } @@ -4555,3 +4562,45 @@ func (o *consumer) isMonitorRunning() bool { defer o.mu.Unlock() return o.inMonitor } + +// If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent. +func (o *consumer) checkStateForInterestStream() { + o.mu.Lock() + // See if we need to process this update if our parent stream is not a limits policy stream. 
+ mset := o.mset + shouldProcessState := mset != nil && o.retention != LimitsPolicy + if o.closed || !shouldProcessState { + o.mu.Unlock() + return + } + + state, err := o.store.State() + o.mu.Unlock() + + if err != nil { + return + } + + // We should make sure to update the acks. + var ss StreamState + mset.store.FastState(&ss) + + asflr := state.AckFloor.Stream + for seq := ss.FirstSeq; seq <= asflr; seq++ { + mset.ackMsg(o, seq) + } + + o.mu.RLock() + // See if we need to process this update if our parent stream is not a limits policy stream. + state, _ = o.store.State() + o.mu.RUnlock() + + // If we have pending, we will need to walk through to delivered in case we missed any of those acks as well. + if state != nil && len(state.Pending) > 0 { + for seq := state.AckFloor.Stream + 1; seq <= state.Delivered.Stream; seq++ { + if _, ok := state.Pending[seq]; !ok { + mset.ackMsg(o, seq) + } + } + } +} diff --git a/server/filestore.go b/server/filestore.go index 3b910e036be..8e1856ed5ea 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -106,6 +106,7 @@ type psi struct { } type fileStore struct { + srv *Server mu sync.RWMutex state StreamState ld *LostStreamData @@ -373,6 +374,12 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim return fs, nil } +func (fs *fileStore) registerServer(s *Server) { + fs.mu.Lock() + defer fs.mu.Unlock() + fs.srv = s +} + // Lock all existing message blocks. // Lock held on entry. func (fs *fileStore) lockAllMsgBlocks() { @@ -765,6 +772,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint32) (*msgBlock, e if ld, _ := mb.rebuildState(); ld != nil { fs.addLostData(ld) } + if mb.msgs > 0 && !mb.noTrack && fs.psim != nil { fs.populateGlobalPerSubjectInfo(mb) // Try to dump any state we needed on recovery. @@ -1036,7 +1044,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { rl &^= hbit dlen := int(rl) - msgHdrSize // Do some quick sanity checks here. 
- if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > rlBadThresh { + if dlen < 0 || int(slen) > (dlen-8) || dlen > int(rl) || rl > rlBadThresh { truncate(index) return gatherLost(lbuf - index), errBadMsg } @@ -2625,6 +2633,16 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( if secure && fs.prf != nil { secure = false } + + if fs.state.Msgs == 0 { + var err = ErrStoreEOF + if seq <= fs.state.LastSeq { + err = ErrStoreMsgNotFound + } + fsUnlock() + return false, err + } + mb := fs.selectMsgBlock(seq) if mb == nil { var err = ErrStoreEOF @@ -2637,8 +2655,8 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( mb.mu.Lock() - // See if the sequence number is still relevant. - if seq < mb.first.seq { + // See if we are closed or the sequence number is still relevant. + if mb.closed || seq < mb.first.seq { mb.mu.Unlock() fsUnlock() return false, nil @@ -2933,6 +2951,7 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) { if mb.cache == nil || slot >= len(mb.cache.idx) { return 0, 0, false, errPartialCache } + bi := mb.cache.idx[slot] ri, hashChecked := (bi &^ hbit), (bi&hbit) != 0 @@ -3911,11 +3930,12 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { dlen := int(rl) - msgHdrSize // Do some quick sanity checks here. - if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > 32*1024*1024 { + if dlen < 0 || int(slen) > dlen || dlen > int(rl) || index+rl > lbuf || rl > 32*1024*1024 { // This means something is off. // TODO(dlc) - Add into bad list? return errCorruptState } + // Clear erase bit. seq = seq &^ ebit // Adjust if we guessed wrong. @@ -4756,6 +4776,9 @@ func (mb *msgBlock) writeIndexInfoLocked() error { if err != nil { return err } + if fi, _ := ifd.Stat(); fi != nil { + mb.liwsz = fi.Size() + } mb.ifd = ifd } @@ -5175,7 +5198,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) { // but not including the seq parameter. 
// Will return the number of purged messages. func (fs *fileStore) Compact(seq uint64) (uint64, error) { - if seq == 0 || seq > fs.lastSeq() { + if seq == 0 { return fs.purge(seq) } @@ -5183,15 +5206,16 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { // We have to delete interior messages. fs.mu.Lock() + if lseq := fs.state.LastSeq; seq > lseq { + fs.mu.Unlock() + return fs.purge(seq) + } + smb := fs.selectMsgBlock(seq) if smb == nil { fs.mu.Unlock() return 0, nil } - if err := smb.loadMsgs(); err != nil { - fs.mu.Unlock() - return 0, err - } // All msgblocks up to this one can be thrown away. var deleted int @@ -5218,7 +5242,12 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { var isEmpty bool smb.mu.Lock() - // Since we loaded before we acquired our lock, double check here under lock that we have the messages loaded. + if smb.first.seq == seq { + isEmpty = smb.msgs == 0 + goto SKIP + } + + // Make sure we have the messages loaded. if smb.cacheNotLoaded() { if err = smb.loadMsgsWithLock(); err != nil { goto SKIP @@ -5229,7 +5258,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { if err == errDeletedMsg { // Update dmap. if len(smb.dmap) > 0 { - delete(smb.dmap, seq) + delete(smb.dmap, mseq) if len(smb.dmap) == 0 { smb.dmap = nil } @@ -5265,7 +5294,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { // Check if we should reclaim the head space from this block. // This will be optimistic only, so don't continue if we encounter any errors here. - if smb.bytes*2 < smb.rbytes { + if smb.rbytes > compactMinimum && smb.bytes*2 < smb.rbytes { var moff uint32 moff, _, _, err = smb.slotInfo(int(smb.first.seq - smb.cache.fseq)) if err != nil || moff >= uint32(len(smb.cache.buf)) { @@ -5299,13 +5328,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { } SKIP: - smb.mu.Unlock() - if !isEmpty { // Make sure to write out our index info. 
- smb.writeIndexInfo() + smb.writeIndexInfoLocked() } + smb.mu.Unlock() + if deleted > 0 { // Update block map. if fs.bim != nil { @@ -5329,7 +5358,7 @@ SKIP: cb := fs.scb fs.mu.Unlock() - if cb != nil { + if cb != nil && purged > 0 { cb(-int64(purged), -int64(bytes), 0, _EMPTY_) } @@ -5338,7 +5367,6 @@ SKIP: // Will completely reset our store. func (fs *fileStore) reset() error { - fs.mu.Lock() if fs.closed { fs.mu.Unlock() @@ -5352,14 +5380,12 @@ func (fs *fileStore) reset() error { var purged, bytes uint64 cb := fs.scb - if cb != nil { - for _, mb := range fs.blks { - mb.mu.Lock() - purged += mb.msgs - bytes += mb.bytes - mb.dirtyCloseWithRemove(true) - mb.mu.Unlock() - } + for _, mb := range fs.blks { + mb.mu.Lock() + purged += mb.msgs + bytes += mb.bytes + mb.dirtyCloseWithRemove(true) + mb.mu.Unlock() } // Reset @@ -6749,7 +6775,7 @@ func (o *consumerFileStore) Update(state *ConsumerState) error { defer o.mu.Unlock() // Check to see if this is an outdated update. - if state.Delivered.Consumer < o.state.Delivered.Consumer { + if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream { return nil } diff --git a/server/filestore_test.go b/server/filestore_test.go index 29e3a705a67..e3050018d9f 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -3668,7 +3668,7 @@ func TestFileStoreFetchPerf(t *testing.T) { // https://github.com/nats-io/nats-server/issues/2936 func TestFileStoreCompactReclaimHeadSpace(t *testing.T) { testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { - fcfg.BlockSize = 1024 * 1024 + fcfg.BlockSize = 4 * 1024 * 1024 fs, err := newFileStore( fcfg, @@ -3678,7 +3678,7 @@ func TestFileStoreCompactReclaimHeadSpace(t *testing.T) { defer fs.Stop() // Create random bytes for payload to test for corruption vs repeated. - msg := make([]byte, 16*1024) + msg := make([]byte, 64*1024) crand.Read(msg) // This gives us ~63 msgs in first and ~37 in second. 
@@ -5372,126 +5372,3 @@ func TestFileStoreSubjectsTotals(t *testing.T) { t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st)) } } - -func TestFileStoreNumPending(t *testing.T) { - // No need for all permutations here. - storeDir := t.TempDir() - fcfg := FileStoreConfig{ - StoreDir: storeDir, - BlockSize: 2 * 1024, // Create many blocks on purpose. - } - fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*.*.*.*"}, Storage: FileStorage}) - require_NoError(t, err) - defer fs.Stop() - - tokens := []string{"foo", "bar", "baz"} - genSubj := func() string { - return fmt.Sprintf("%s.%s.%s.%s", - tokens[rand.Intn(len(tokens))], - tokens[rand.Intn(len(tokens))], - tokens[rand.Intn(len(tokens))], - tokens[rand.Intn(len(tokens))], - ) - } - - for i := 0; i < 50_000; i++ { - subj := genSubj() - _, _, err := fs.StoreMsg(subj, nil, []byte("Hello World")) - require_NoError(t, err) - } - - state := fs.State() - - // Scan one by one for sanity check against other calculations. - sanityCheck := func(sseq uint64, filter string) SimpleState { - t.Helper() - var ss SimpleState - var smv StoreMsg - // For here we know 0 is invalid, set to 1. 
- if sseq == 0 { - sseq = 1 - } - for seq := sseq; seq <= state.LastSeq; seq++ { - sm, err := fs.LoadMsg(seq, &smv) - if err != nil { - t.Logf("Encountered error %v loading sequence: %d", err, seq) - continue - } - if subjectIsSubsetMatch(sm.subj, filter) { - ss.Msgs++ - ss.Last = seq - if ss.First == 0 || seq < ss.First { - ss.First = seq - } - } - } - return ss - } - - check := func(sseq uint64, filter string) { - t.Helper() - np, lvs := fs.NumPending(sseq, filter, false) - ss := fs.FilteredState(sseq, filter) - sss := sanityCheck(sseq, filter) - if lvs != state.LastSeq { - t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) - } - if ss.Msgs != np { - t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) - } - if ss != sss { - t.Fatalf("Failed sanity check, expected %+v got %+v", sss, ss) - } - } - - sanityCheckLastOnly := func(sseq uint64, filter string) SimpleState { - t.Helper() - var ss SimpleState - var smv StoreMsg - // For here we know 0 is invalid, set to 1. 
- if sseq == 0 { - sseq = 1 - } - seen := make(map[string]bool) - for seq := state.LastSeq; seq >= sseq; seq-- { - sm, err := fs.LoadMsg(seq, &smv) - if err != nil { - t.Logf("Encountered error %v loading sequence: %d", err, seq) - continue - } - if !seen[sm.subj] && subjectIsSubsetMatch(sm.subj, filter) { - ss.Msgs++ - if ss.Last == 0 { - ss.Last = seq - } - if ss.First == 0 || seq < ss.First { - ss.First = seq - } - seen[sm.subj] = true - } - } - return ss - } - - checkLastOnly := func(sseq uint64, filter string) { - t.Helper() - np, lvs := fs.NumPending(sseq, filter, true) - ss := sanityCheckLastOnly(sseq, filter) - if lvs != state.LastSeq { - t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) - } - if ss.Msgs != np { - t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) - } - } - - startSeqs := []uint64{0, 1, 2, 200, 444, 555, 2222, 8888, 12_345, 28_222, 33_456, 44_400, 49_999} - checkSubs := []string{"foo.>", "*.bar.>", "foo.bar.*.baz", "*.bar.>", "*.foo.bar.*", "foo.foo.bar.baz"} - - for _, filter := range checkSubs { - for _, start := range startSeqs { - check(start, filter) - checkLastOnly(start, filter) - } - } -} diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index e4e7ba7beaa..1da8cb2c6f1 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -351,7 +351,7 @@ func (s *Server) JetStreamClusterPeers() []string { defer js.mu.RUnlock() cc := js.cluster - if !cc.isLeader() { + if !cc.isLeader() || cc.meta == nil { return nil } peers := cc.meta.Peers() @@ -652,6 +652,8 @@ func (js *jetStream) setupMetaGroup() error { s.Errorf("Error creating filestore: %v", err) return err } + // Register our server. + fs.registerServer(s) cfg := &RaftConfig{Name: defaultMetaGroupName, Store: storeDir, Log: fs} @@ -785,7 +787,7 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool { // If we don't have a leader. 
if rg.node.GroupLeader() == _EMPTY_ { // Threshold for jetstream startup. - const startupThreshold = 5 * time.Second + const startupThreshold = 10 * time.Second if rg.node.HadPreviousLeader() { // Make sure we have been running long enough to intelligently determine this. @@ -1162,8 +1164,12 @@ func (js *jetStream) monitorCluster() { } aq.recycle(&ces) case isLeader = <-lch: + // For meta layer synchronize everyone to our state on becoming leader. + if isLeader { + n.SendSnapshot(js.metaSnapshot()) + } + // Process the change. js.processLeaderChange(isLeader) - if isLeader { s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil) // Install a snapshot as we become leader. @@ -1481,6 +1487,9 @@ func (js *jetStream) processAddPeer(peer string) { defer js.mu.Unlock() s, cc := js.srv, js.cluster + if cc == nil || cc.meta == nil { + return + } isLeader := cc.isLeader() // Now check if we are meta-leader. We will check for any re-assignments. @@ -1522,7 +1531,7 @@ func (js *jetStream) processAddPeer(peer string) { func (js *jetStream) processRemovePeer(peer string) { js.mu.Lock() s, cc := js.srv, js.cluster - if cc.meta == nil { + if cc == nil || cc.meta == nil { js.mu.Unlock() return } @@ -1586,6 +1595,9 @@ func (js *jetStream) removePeerFromStreamLocked(sa *streamAssignment, peer strin } s, cc, csa := js.srv, js.cluster, sa.copyGroup() + if cc == nil || cc.meta == nil { + return false + } replaced := cc.remapStreamAssignment(csa, peer) if !replaced { s.Warnf("JetStream cluster could not replace peer for stream '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name) @@ -1827,6 +1839,8 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor s.Errorf("Error creating filestore WAL: %v", err) return err } + // Register our server. 
+ fs.registerServer(s) store = fs } else { ms, err := newMemStore(&StreamConfig{Name: rg.Name, Storage: MemoryStorage}) @@ -2041,6 +2055,31 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps } defer stopDirectMonitoring() + // Check if we are interest based and if so and we have an active stream wait until we + // have the consumers attached. This can become important when a server has lots of assets + // since we process streams first then consumers as an asset class. + if mset != nil && mset.isInterestRetention() { + js.mu.RLock() + numExpectedConsumers := len(sa.consumers) + js.mu.RUnlock() + if mset.numConsumers() < numExpectedConsumers { + s.Debugf("Waiting for consumers for interest based stream '%s > %s'", accName, mset.name()) + // Wait up to 10s + const maxWaitTime = 10 * time.Second + const sleepTime = 250 * time.Millisecond + timeout := time.Now().Add(maxWaitTime) + for time.Now().Before(timeout) { + if mset.numConsumers() >= numExpectedConsumers { + break + } + time.Sleep(sleepTime) + } + if actual := mset.numConsumers(); actual < numExpectedConsumers { + s.Warnf("All consumers not online for '%s > %s': expected %d but only have %d", accName, mset.name(), numExpectedConsumers, actual) + } + } + } + // This is triggered during a scale up from R1 to clustered mode. We need the new followers to catchup, // similar to how we trigger the catchup mechanism post a backup/restore. // We can arrive here NOT being the leader, so we send the snapshot only if we are, and in this case @@ -2050,6 +2089,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps n.SendSnapshot(mset.stateSnapshot()) sendSnapshot = false } + for { select { case <-s.quitCh: @@ -2185,14 +2225,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps case <-t.C: doSnapshot() - // Check is we have preAcks left over if we have become the leader. 
- if isLeader { - mset.mu.Lock() - if mset.preAcks != nil { - mset.preAcks = nil - } - mset.mu.Unlock() - } case <-uch: // keep stream assignment current @@ -2532,6 +2564,19 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco if lseq-clfs < last { s.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d", mset.account(), mset.name(), lseq+1-clfs, last) + // Check for any preAcks in case we are interest based. + mset.mu.Lock() + seq := lseq + 1 - mset.clfs + var shouldAck bool + if len(mset.preAcks) > 0 { + if _, shouldAck = mset.preAcks[seq]; shouldAck { + delete(mset.preAcks, seq) + } + } + mset.mu.Unlock() + if shouldAck { + mset.ackMsg(nil, seq) + } continue } @@ -4223,6 +4268,8 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { if n.NeedSnapshot() { doSnapshot() } + // Check our state if we are under an interest based stream. + o.checkStateForInterestStream() continue } if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { @@ -4240,6 +4287,8 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { if recovering && !isLeader { js.setConsumerAssignmentRecovering(ca) } + + // Process the change. if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader { doSnapshot() } @@ -4348,6 +4397,7 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea } panic(err.Error()) } + if err = o.store.Update(state); err != nil { o.mu.RLock() s, acc, mset, name := o.srv, o.acc, o.mset, o.name @@ -4355,20 +4405,8 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea if s != nil && mset != nil { s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err) } - } else if state, err := o.store.State(); err == nil { - // See if we need to process this update if our parent stream is not a limits policy stream. 
- o.mu.RLock() - mset := o.mset - shouldProcessAcks := mset != nil && o.retention != LimitsPolicy - o.mu.RUnlock() - // We should make sure to update the acks. - if shouldProcessAcks { - var ss StreamState - mset.store.FastState(&ss) - for seq := ss.FirstSeq; seq <= state.AckFloor.Stream; seq++ { - mset.ackMsg(o, seq) - } - } + } else { + o.checkStateForInterestStream() } } else if e.Type == EntryRemovePeer { @@ -4465,13 +4503,19 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea func (o *consumer) processReplicatedAck(dseq, sseq uint64) { o.mu.Lock() + + mset := o.mset + if o.closed || mset == nil { + o.mu.Unlock() + return + } + // Update activity. o.lat = time.Now() // Do actual ack update to store. o.store.UpdateAcks(dseq, sseq) - mset := o.mset - if mset == nil || o.retention == LimitsPolicy { + if o.retention == LimitsPolicy { o.mu.Unlock() return } @@ -4710,6 +4754,9 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client defer js.mu.Unlock() s, cc := js.srv, js.cluster + if cc == nil || cc.meta == nil { + return + } // This should have been done already in processStreamAssignment, but in // case we have a code path that gets here with no processStreamAssignment, @@ -4783,6 +4830,9 @@ func (js *jetStream) processConsumerAssignmentResults(sub *subscription, c *clie defer js.mu.Unlock() s, cc := js.srv, js.cluster + if cc == nil || cc.meta == nil { + return + } if sa := js.streamAssignment(result.Account, result.Stream); sa != nil && sa.consumers != nil { if ca := sa.consumers[result.Consumer]; ca != nil && !ca.responded { @@ -5552,6 +5602,10 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su // Now process the request and proposal. 
js.mu.Lock() defer js.mu.Unlock() + meta := cc.meta + if meta == nil { + return + } var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}} @@ -5815,13 +5869,12 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su } sa := &streamAssignment{Group: rg, Sync: osa.Sync, Created: osa.Created, Config: newCfg, Subject: subject, Reply: reply, Client: ci} - cc.meta.Propose(encodeUpdateStreamAssignment(sa)) + meta.Propose(encodeUpdateStreamAssignment(sa)) // Process any staged consumers. for _, ca := range consumers { - cc.meta.Propose(encodeAddConsumerAssignment(ca)) + meta.Propose(encodeAddConsumerAssignment(ca)) } - } func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, acc *Account, stream, subject, reply string, rmsg []byte) { @@ -5833,6 +5886,10 @@ func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, acc *Account, st js.mu.Lock() defer js.mu.Unlock() + if cc.meta == nil { + return + } + osa := js.streamAssignment(acc.Name, stream) if osa == nil { var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}} @@ -5906,6 +5963,10 @@ func (s *Server) jsClusteredStreamRestoreRequest( js.mu.Lock() defer js.mu.Unlock() + if cc.meta == nil { + return + } + cfg := &req.Config resp := JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}} @@ -6277,6 +6338,10 @@ func (s *Server) jsClusteredConsumerDeleteRequest(ci *ClientInfo, acc *Account, js.mu.Lock() defer js.mu.Unlock() + if cc.meta == nil { + return + } + var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}} sa := js.streamAssignment(acc.Name, stream) @@ -6465,6 +6530,10 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec js.mu.Lock() defer js.mu.Unlock() + if cc.meta == nil { + return + } + // Lookup the stream assignment. 
sa := js.streamAssignment(acc.Name, stream) if sa == nil { @@ -7316,25 +7385,39 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) { // we are synched for the next message sequence properly. lastRequested := sreq.LastSeq checkFinalState := func() { - if mset != nil { - mset.mu.Lock() - var state StreamState + // Bail if no stream. + if mset == nil { + return + } + + mset.mu.Lock() + var state StreamState + mset.store.FastState(&state) + var didReset bool + firstExpected := lastRequested + 1 + if state.FirstSeq != firstExpected { + // Reset our notion of first. + mset.store.Compact(firstExpected) mset.store.FastState(&state) - var didReset bool - firstExpected := lastRequested + 1 - if state.FirstSeq != firstExpected { - // Reset our notion of first. - mset.store.Compact(firstExpected) - mset.store.FastState(&state) - // Make sure last is also correct in case this also moved. - mset.lseq = state.LastSeq - didReset = true - } - mset.mu.Unlock() - if didReset { - s.Warnf("Catchup for stream '%s > %s' resetting first sequence: %d on catchup complete", - mset.account(), mset.name(), firstExpected) - } + // Make sure last is also correct in case this also moved. 
+ mset.lseq = state.LastSeq + didReset = true + } + mset.mu.Unlock() + + if didReset { + s.Warnf("Catchup for stream '%s > %s' resetting first sequence: %d on catchup complete", + mset.account(), mset.name(), firstExpected) + } + + mset.mu.RLock() + consumers := make([]*consumer, 0, len(mset.consumers)) + for _, o := range mset.consumers { + consumers = append(consumers, o) + } + mset.mu.RUnlock() + for _, o := range consumers { + o.checkStateForInterestStream() } } @@ -7526,11 +7609,19 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) { return 0, errCatchupBadMsg } - mset.mu.RLock() + mset.mu.Lock() st := mset.cfg.Storage ddloaded := mset.ddloaded tierName := mset.tier - mset.mu.RUnlock() + + if len(mset.preAcks) > 0 { + if _, shouldSkip := mset.preAcks[seq]; shouldSkip { + delete(mset.preAcks, seq) + // Mark this to be skipped + subj, ts = _EMPTY_, 0 + } + } + mset.mu.Unlock() if mset.js.limitsExceeded(st) { return 0, NewJSInsufficientResourcesError() diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index b899f0e3de3..20566f013d0 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -971,6 +971,7 @@ func TestJetStreamClusterRestoreSingleConsumer(t *testing.T) { c.stopAll() c.restartAll() c.waitOnLeader() + c.waitOnStreamLeader("$G", "foo") s = c.randomServer() nc, js = jsClientConnect(t, s) @@ -3719,6 +3720,7 @@ func TestJetStreamClusterAccountPurge(t *testing.T) { resolver: { type: full dir: '%s/jwt' + timeout: "10ms" }`, ojwt, syspub, storeDir) }) defer c.shutdown() @@ -3891,7 +3893,6 @@ func TestJetStreamClusterAccountPurge(t *testing.T) { } c.restartAll() checkForDirs(t, 6, 4) - c.waitOnClusterReady() // unfortunately, this does not wait until leader is not catching up. 
purge(t) checkForDirs(t, 0, 0) c.stopAll() diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index bd530ca1002..c2d756242cd 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -4322,7 +4322,7 @@ func TestJetStreamClusterStreamReplicaUpdates(t *testing.T) { require_NoError(t, err) c.waitOnStreamLeader("$G", "TEST") - checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { si, err = js.StreamInfo("TEST") require_NoError(t, err) if len(si.Cluster.Replicas) != r-1 { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 5986ea4b99c..c0f028ba795 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -313,7 +313,7 @@ func TestJetStreamClusterDeleteConsumerWhileServerDown(t *testing.T) { // Restart. s = c.restartServer(s) - checkFor(t, time.Second, 200*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { hs := s.healthz(&HealthzOptions{ JSEnabledOnly: false, JSServerOnly: false, diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index a17038ca96c..2dd0612eef2 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -37,9 +37,9 @@ import ( func init() { // Speed up raft for tests. 
hbInterval = 50 * time.Millisecond - minElectionTimeout = 1 * time.Second - maxElectionTimeout = 3 * time.Second - lostQuorumInterval = time.Second + minElectionTimeout = 750 * time.Millisecond + maxElectionTimeout = 2500 * time.Millisecond + lostQuorumInterval = 500 * time.Millisecond lostQuorumCheck = 4 * hbInterval } @@ -509,7 +509,7 @@ func (sc *supercluster) leader() *Server { func (sc *supercluster) waitOnLeader() { sc.t.Helper() - expires := time.Now().Add(10 * time.Second) + expires := time.Now().Add(30 * time.Second) for time.Now().Before(expires) { for _, c := range sc.clusters { if leader := c.leader(); leader != nil { @@ -548,7 +548,7 @@ func (sc *supercluster) waitOnPeerCount(n int) { sc.t.Helper() sc.waitOnLeader() leader := sc.leader() - expires := time.Now().Add(20 * time.Second) + expires := time.Now().Add(30 * time.Second) for time.Now().Before(expires) { peers := leader.JetStreamClusterPeers() if len(peers) == n { @@ -1237,7 +1237,7 @@ func (c *cluster) waitOnPeerCount(n int) { func (c *cluster) waitOnConsumerLeader(account, stream, consumer string) { c.t.Helper() - expires := time.Now().Add(20 * time.Second) + expires := time.Now().Add(30 * time.Second) for time.Now().Before(expires) { if leader := c.consumerLeader(account, stream, consumer); leader != nil { time.Sleep(200 * time.Millisecond) @@ -1329,7 +1329,7 @@ func (c *cluster) waitOnServerHealthz(s *Server) { func (c *cluster) waitOnServerCurrent(s *Server) { c.t.Helper() - expires := time.Now().Add(20 * time.Second) + expires := time.Now().Add(30 * time.Second) for time.Now().Before(expires) { time.Sleep(100 * time.Millisecond) if !s.JetStreamEnabled() || s.JetStreamIsCurrent() { diff --git a/server/norace_test.go b/server/norace_test.go index f0791ba9440..711c6ef4446 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -5267,7 +5267,7 @@ func TestNoRaceJetStreamClusterDirectAccessAllPeersSubs(t *testing.T) { t.Fatalf("Expected to see messages increase, got %d", si.State.Msgs) 
} - checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { // Make sure they are all the same from a state perspective. // Leader will have the expected state. lmset, err := c.streamLeader("$G", "TEST").GlobalAccount().lookupStream("TEST") @@ -7001,3 +7001,437 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T default: } } + +func TestNoRaceJetStreamInterestStreamCheckInterestRaceBug(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + + numConsumers := 10 + for i := 0; i < numConsumers; i++ { + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err = js.Subscribe("foo", func(m *nats.Msg) { + m.Ack() + }, nats.Durable(fmt.Sprintf("C%d", i)), nats.ManualAck()) + require_NoError(t, err) + } + + numToSend := 10_000 + for i := 0; i < numToSend; i++ { + _, err := js.PublishAsync("foo", nil) + require_NoError(t, err) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(20 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // Wait til ackfloor is correct for all consumers. 
+ checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { + for _, s := range c.servers { + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + mset.mu.RLock() + defer mset.mu.RUnlock() + + require_True(t, len(mset.consumers) == numConsumers) + + for _, o := range mset.consumers { + state, err := o.store.State() + require_NoError(t, err) + if state.AckFloor.Stream != uint64(numToSend) { + return fmt.Errorf("Ackfloor not correct yet") + } + } + } + return nil + }) + + for _, s := range c.servers { + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + mset.mu.RLock() + defer mset.mu.RUnlock() + + state := mset.state() + require_True(t, state.Msgs == 0) + require_True(t, state.FirstSeq == uint64(numToSend+1)) + } +} + +func TestNoRaceJetStreamClusterInterestStreamConsistencyAfterRollingRestart(t *testing.T) { + // Uncomment to run. Needs to be on a big machine. Do not want as part of Travis tests atm. + skip(t) + + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + numStreams := 200 + numConsumersPer := 5 + numPublishers := 10 + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + qch := make(chan bool) + + var mm sync.Mutex + ackMap := make(map[string]map[uint64][]string) + + addAckTracking := func(seq uint64, stream, consumer string) { + mm.Lock() + defer mm.Unlock() + sam := ackMap[stream] + if sam == nil { + sam = make(map[uint64][]string) + ackMap[stream] = sam + } + sam[seq] = append(sam[seq], consumer) + } + + doPullSubscriber := func(stream, consumer, filter string) { + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + var err error + var sub *nats.Subscription + timeout := time.Now().Add(5 * time.Second) + for time.Now().Before(timeout) { + sub, err = js.PullSubscribe(filter, consumer, nats.BindStream(stream), nats.ManualAck()) + if err == nil { + break + } + } + if err != nil { + t.Logf("Error on pull subscriber: %v", err) + 
return + } + + for { + select { + case <-time.After(500 * time.Millisecond): + msgs, err := sub.Fetch(100, nats.MaxWait(time.Second)) + if err != nil { + continue + } + // Shuffle + rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] }) + for _, m := range msgs { + meta, err := m.Metadata() + require_NoError(t, err) + m.Ack() + addAckTracking(meta.Sequence.Stream, stream, consumer) + if meta.NumDelivered > 1 { + t.Logf("Got a msg redelivered %d for sequence %d on %q %q\n", meta.NumDelivered, meta.Sequence.Stream, stream, consumer) + } + } + case <-qch: + nc.Flush() + return + } + } + } + + // Setup + wg := sync.WaitGroup{} + for i := 0; i < numStreams; i++ { + wg.Add(1) + go func(stream string) { + defer wg.Done() + subj := fmt.Sprintf("%s.>", stream) + _, err := js.AddStream(&nats.StreamConfig{ + Name: stream, + Subjects: []string{subj}, + Replicas: 3, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + for i := 0; i < numConsumersPer; i++ { + consumer := fmt.Sprintf("C%d", i) + filter := fmt.Sprintf("%s.%d", stream, i) + _, err = js.AddConsumer(stream, &nats.ConsumerConfig{ + Durable: consumer, + FilterSubject: filter, + AckPolicy: nats.AckExplicitPolicy, + AckWait: 2 * time.Second, + })
 + require_NoError(t, err) + c.waitOnConsumerLeader(globalAccountName, stream, consumer) + go doPullSubscriber(stream, consumer, filter) + } + }(fmt.Sprintf("A-%d", i)) + } + wg.Wait() + + msg := make([]byte, 2*1024) // 2k payload + rand.Read(msg) + + // Controls if publishing is on or off. + var pubActive atomic.Bool + + doPublish := func() { + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + for { + select { + case <-time.After(100 * time.Millisecond): + if pubActive.Load() { + for i := 0; i < numStreams; i++ { + for j := 0; j < numConsumersPer; j++ { + subj := fmt.Sprintf("A-%d.%d", i, j) + // Don't care about errors here for this test. 
+ js.Publish(subj, msg) + } + } + } + case <-qch: + return + } + } + } + + pubActive.Store(true) + + for i := 0; i < numPublishers; i++ { + go doPublish() + } + + // Let run for a bit. + time.Sleep(20 * time.Second) + + // Do a rolling restart. + for _, s := range c.servers { + t.Logf("Shutdown %v\n", s) + s.Shutdown() + t.Logf("Restarting %v\n", s) + s = c.restartServer(s) + c.waitOnServerHealthz(s) + } + + // Let run for a bit longer. + time.Sleep(10 * time.Second) + + // Stop pubs. + pubActive.Store(false) + + // Let settle. + time.Sleep(10 * time.Second) + close(qch) + time.Sleep(20 * time.Second) + + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + minAckFloor := func(stream string) (uint64, string) { + var maf uint64 = math.MaxUint64 + var consumer string + for i := 0; i < numConsumersPer; i++ { + cname := fmt.Sprintf("C%d", i) + ci, err := js.ConsumerInfo(stream, cname) + require_NoError(t, err) + if ci.AckFloor.Stream < maf { + maf = ci.AckFloor.Stream + consumer = cname + } + } + return maf, consumer + } + + checkStreamAcks := func(stream string) { + mm.Lock() + defer mm.Unlock() + if sam := ackMap[stream]; sam != nil { + for seq := 1; ; seq++ { + acks := sam[uint64(seq)] + if acks == nil { + if sam[uint64(seq+1)] != nil { + t.Logf("Missing an ack on stream %q for sequence %d\n", stream, seq) + } else { + break + } + } + if len(acks) > 1 { + t.Fatalf("Multiple acks for %d which is not expected: %+v", seq, acks) + } + } + } + } + + // Now check all streams such that their first sequence is equal to the minimum of all consumers. + for i := 0; i < numStreams; i++ { + stream := fmt.Sprintf("A-%d", i) + si, err := js.StreamInfo(stream) + require_NoError(t, err) + + if maf, consumer := minAckFloor(stream); maf > si.State.FirstSeq { + t.Logf("\nBAD STATE DETECTED FOR %q, CHECKING OTHER SERVERS! 
ACK %d vs %+v LEADER %v, CL FOR %q %v\n", + stream, maf, si.State, c.streamLeader(globalAccountName, stream), consumer, c.consumerLeader(globalAccountName, stream, consumer)) + + checkStreamAcks(stream) + + for _, s := range c.servers { + mset, err := s.GlobalAccount().lookupStream(stream) + require_NoError(t, err) + state := mset.state() + t.Logf("Server %v Stream STATE %+v\n", s, state) + + var smv StoreMsg + if sm, err := mset.store.LoadMsg(state.FirstSeq, &smv); err == nil { + t.Logf("Subject for msg %d is %q", state.FirstSeq, sm.subj) + } else { + t.Logf("Could not retrieve msg for %d: %v", state.FirstSeq, err) + } + + if len(mset.preAcks) > 0 { + t.Logf("%v preAcks %+v\n", s, mset.preAcks) + } + + for _, o := range mset.consumers { + ostate, err := o.store.State() + require_NoError(t, err) + t.Logf("Consumer STATE for %q is %+v\n", o.name, ostate) + } + } + t.Fatalf("BAD STATE: ACKFLOOR > FIRST %d vs %d\n", maf, si.State.FirstSeq) + } + } +} + +func TestNoRaceFileStoreNumPending(t *testing.T) { + // No need for all permutations here. + storeDir := t.TempDir() + fcfg := FileStoreConfig{ + StoreDir: storeDir, + BlockSize: 2 * 1024, // Create many blocks on purpose. + } + fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*.*.*.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + tokens := []string{"foo", "bar", "baz"} + genSubj := func() string { + return fmt.Sprintf("%s.%s.%s.%s", + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + ) + } + + for i := 0; i < 50_000; i++ { + subj := genSubj() + _, _, err := fs.StoreMsg(subj, nil, []byte("Hello World")) + require_NoError(t, err) + } + + state := fs.State() + + // Scan one by one for sanity check against other calculations. 
+ sanityCheck := func(sseq uint64, filter string) SimpleState { + t.Helper() + var ss SimpleState + var smv StoreMsg + // For here we know 0 is invalid, set to 1. + if sseq == 0 { + sseq = 1 + } + for seq := sseq; seq <= state.LastSeq; seq++ { + sm, err := fs.LoadMsg(seq, &smv) + if err != nil { + t.Logf("Encountered error %v loading sequence: %d", err, seq) + continue + } + if subjectIsSubsetMatch(sm.subj, filter) { + ss.Msgs++ + ss.Last = seq + if ss.First == 0 || seq < ss.First { + ss.First = seq + } + } + } + return ss + } + + check := func(sseq uint64, filter string) { + t.Helper() + np, lvs := fs.NumPending(sseq, filter, false) + ss := fs.FilteredState(sseq, filter) + sss := sanityCheck(sseq, filter) + if lvs != state.LastSeq { + t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) + } + if ss.Msgs != np { + t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) + } + if ss != sss { + t.Fatalf("Failed sanity check, expected %+v got %+v", sss, ss) + } + } + + sanityCheckLastOnly := func(sseq uint64, filter string) SimpleState { + t.Helper() + var ss SimpleState + var smv StoreMsg + // For here we know 0 is invalid, set to 1. 
+ if sseq == 0 { + sseq = 1 + } + seen := make(map[string]bool) + for seq := state.LastSeq; seq >= sseq; seq-- { + sm, err := fs.LoadMsg(seq, &smv) + if err != nil { + t.Logf("Encountered error %v loading sequence: %d", err, seq) + continue + } + if !seen[sm.subj] && subjectIsSubsetMatch(sm.subj, filter) { + ss.Msgs++ + if ss.Last == 0 { + ss.Last = seq + } + if ss.First == 0 || seq < ss.First { + ss.First = seq + } + seen[sm.subj] = true + } + } + return ss + } + + checkLastOnly := func(sseq uint64, filter string) { + t.Helper() + np, lvs := fs.NumPending(sseq, filter, true) + ss := sanityCheckLastOnly(sseq, filter) + if lvs != state.LastSeq { + t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) + } + if ss.Msgs != np { + t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) + } + } + + startSeqs := []uint64{0, 1, 2, 200, 444, 555, 2222, 8888, 12_345, 28_222, 33_456, 44_400, 49_999} + checkSubs := []string{"foo.>", "*.bar.>", "foo.bar.*.baz", "*.bar.>", "*.foo.bar.*", "foo.foo.bar.baz"} + + for _, filter := range checkSubs { + for _, start := range startSeqs { + check(start, filter) + checkLastOnly(start, filter) + } + } +} diff --git a/server/raft.go b/server/raft.go index 8039d4b4232..11100b0e744 100644 --- a/server/raft.go +++ b/server/raft.go @@ -855,6 +855,11 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) { n.Lock() defer n.Unlock() + // Ignore if not applicable. This can happen during a reset. + if index > n.commit { + return 0, 0 + } + // Ignore if already applied. 
if index > n.applied { n.applied = index @@ -939,7 +944,7 @@ func (n *raft) InstallSnapshot(data []byte) error { var state StreamState n.wal.FastState(&state) - if n.applied == 0 || len(data) == 0 { + if n.applied == 0 { n.Unlock() return errNoSnapAvailable } @@ -1325,6 +1330,7 @@ func (n *raft) StepDown(preferred ...string) error { } stepdown := n.stepdown + prop := n.prop n.Unlock() if len(preferred) > 0 && maybeLeader == noLeader { @@ -1334,7 +1340,7 @@ func (n *raft) StepDown(preferred ...string) error { // If we have a new leader selected, transfer over to them. if maybeLeader != noLeader { n.debug("Selected %q for new leader", maybeLeader) - n.sendAppendEntry([]*Entry{{EntryLeaderTransfer, []byte(maybeLeader)}}) + prop.push(&Entry{EntryLeaderTransfer, []byte(maybeLeader)}) } // Force us to stepdown here. n.debug("Stepping down") @@ -2412,9 +2418,11 @@ func (n *raft) applyCommit(index uint64) error { if ae, err = n.loadEntry(index); err != nil { if err != ErrStoreClosed && err != ErrStoreEOF { if err == errBadMsg { - n.setWriteErrLocked(err) + n.warn("Got an error loading %d index: %v - will reset", index, err) + n.resetWAL() + } else { + n.warn("Got an error loading %d index: %v", index, err) } - n.warn("Got an error loading %d index: %v", index, err) } n.commit = original return errEntryLoadFailed @@ -2489,6 +2497,9 @@ func (n *raft) applyCommit(index uint64) error { // We pass these up as well. committed = append(committed, e) + + case EntryLeaderTransfer: + // No-op } } // Pass to the upper layers if we have normal entries. @@ -2852,15 +2863,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } } - // Ignore old terms. - if isNew && ae.term < n.term { - ar := &appendEntryResponse{n.term, n.pindex, n.id, false, _EMPTY_} - n.Unlock() - n.debug("AppendEntry ignoring old term") - n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf)) - return - } - // If we are catching up ignore old catchup subs. 
// This could happen when we stall or cancel a catchup. if !isNew && catchingUp && sub != n.catchup.sub { @@ -2896,6 +2898,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // If this term is greater than ours. if ae.term > n.term { + n.pterm = ae.pterm n.term = ae.term n.vote = noVote if isNew { @@ -2915,7 +2918,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.updateLeadChange(false) } - if ae.pterm != n.pterm || ae.pindex != n.pindex { + if (isNew && ae.pterm != n.pterm) || ae.pindex != n.pindex { // Check if this is a lower or equal index than what we were expecting. if ae.pindex <= n.pindex { n.debug("AppendEntry detected pindex less than ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex) @@ -3004,7 +3007,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } // Save to our WAL if we have entries. - if len(ae.entries) > 0 { + if ae.shouldStore() { // Only store if an original which will have sub != nil if sub != nil { if err := n.storeToWAL(ae); err != nil { @@ -3029,26 +3032,26 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.pterm = ae.term n.pindex = ae.pindex + 1 } + } - // Check to see if we have any related entries to process here. - for _, e := range ae.entries { - switch e.Type { - case EntryLeaderTransfer: - if isNew { - maybeLeader := string(e.Data) - if maybeLeader == n.id && !n.observer && !n.paused { - n.lxfer = true - n.xferCampaign() - } + // Check to see if we have any related entries to process here. 
+ for _, e := range ae.entries { + switch e.Type { + case EntryLeaderTransfer: + if isNew { + maybeLeader := string(e.Data) + if maybeLeader == n.id && !n.observer && !n.paused { + n.lxfer = true + n.xferCampaign() } - case EntryAddPeer: - if newPeer := string(e.Data); len(newPeer) == idLen { - // Track directly, but wait for commit to be official - if ps := n.peers[newPeer]; ps != nil { - ps.ts = time.Now().UnixNano() - } else { - n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, false} - } + } + case EntryAddPeer: + if newPeer := string(e.Data); len(newPeer) == idLen { + // Track directly, but wait for commit to be official + if ps := n.peers[newPeer]; ps != nil { + ps.ts = time.Now().UnixNano() + } else { + n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, false} } } } @@ -3133,6 +3136,11 @@ func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry { return &appendEntry{n.id, n.term, n.commit, n.pterm, n.pindex, entries, _EMPTY_, nil, nil} } +// Determine if we should store an entry. +func (ae *appendEntry) shouldStore() bool { + return ae != nil && len(ae.entries) > 0 && ae.entries[0].Type != EntryLeaderTransfer +} + // Store our append entry to our WAL. // lock should be held. func (n *raft) storeToWAL(ae *appendEntry) error { @@ -3188,7 +3196,7 @@ func (n *raft) sendAppendEntry(entries []*Entry) { } // If we have entries store this in our wal. 
- if len(entries) > 0 { + if ae.shouldStore() { if err := n.storeToWAL(ae); err != nil { return } @@ -3566,8 +3574,8 @@ func (n *raft) processVoteRequest(vr *voteRequest) error { if n.state != Follower { n.debug("Stepping down from candidate, detected higher term: %d vs %d", vr.term, n.term) n.stepdown.push(noLeader) + n.term = vr.term } - n.term = vr.term n.vote = noVote n.writeTermVote() } @@ -3576,6 +3584,7 @@ func (n *raft) processVoteRequest(vr *voteRequest) error { voteOk := n.vote == noVote || n.vote == vr.candidate if voteOk && (vr.lastTerm > n.pterm || vr.lastTerm == n.pterm && vr.lastIndex >= n.pindex) { vresp.granted = true + n.term = vr.term n.vote = vr.candidate n.writeTermVote() } else { diff --git a/server/stream.go b/server/stream.go index 34cb6119bd8..4e183f2b981 100644 --- a/server/stream.go +++ b/server/stream.go @@ -210,6 +210,7 @@ type stream struct { qch chan struct{} active bool ddloaded bool + closed bool // Mirror mirror *sourceInfo @@ -3306,6 +3307,8 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error { return err } mset.store = fs + // Register our server. + fs.registerServer(s) } mset.mu.Unlock() @@ -3676,7 +3679,7 @@ var ( func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64) error { mset.mu.Lock() c, s, store := mset.client, mset.srv, mset.store - if c == nil { + if mset.closed || c == nil { mset.mu.Unlock() return nil } @@ -4449,6 +4452,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { // Clean up consumers. mset.mu.Lock() + mset.closed = true var obs []*consumer for _, o := range mset.consumers { obs = append(obs, o) @@ -4628,6 +4632,12 @@ func (mset *stream) getPublicConsumers() []*consumer { return obs } +func (mset *stream) isInterestRetention() bool { + mset.mu.RLock() + defer mset.mu.RUnlock() + return mset.cfg.Retention != LimitsPolicy +} + // NumConsumers reports on number of active consumers for this stream. 
func (mset *stream) numConsumers() int { mset.mu.RLock() @@ -4817,6 +4827,7 @@ func (mset *stream) potentialFilteredConsumers() bool { return false } +// Write lock should be held here for the stream to avoid race conditions on state. func (mset *stream) checkInterest(seq uint64, obs *consumer) bool { var subj string if mset.potentialFilteredConsumers() { @@ -4838,14 +4849,23 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool { // ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy. func (mset *stream) ackMsg(o *consumer, seq uint64) { - if mset.cfg.Retention == LimitsPolicy { + if seq == 0 || mset.cfg.Retention == LimitsPolicy { + return + } + + // Don't make this RLock(). We need to have only 1 running at a time to gauge interest across all consumers. + mset.mu.Lock() + if mset.closed || mset.store == nil { + mset.mu.Unlock() return } - // Make sure this sequence is not below our first sequence. var state StreamState mset.store.FastState(&state) + + // Make sure this sequence is not below our first sequence. if seq < state.FirstSeq { + mset.mu.Unlock() return } @@ -4854,31 +4874,45 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) { case WorkQueuePolicy: // Normally we just remove a message when its ack'd here but if we have direct consumers // from sources and/or mirrors we need to make sure they have delivered the msg. - mset.mu.RLock() shouldRemove = mset.directs <= 0 || !mset.checkInterest(seq, o) - mset.mu.RUnlock() case InterestPolicy: - mset.mu.RLock() shouldRemove = !mset.checkInterest(seq, o) - mset.mu.RUnlock() } - if shouldRemove { - if _, err := mset.store.RemoveMsg(seq); err == ErrStoreEOF { - // This should be rare but I have seen it. - // The ack reached us before the actual msg. 
- var state StreamState - mset.store.FastState(&state) - if seq >= state.LastSeq { - mset.mu.Lock() - if mset.preAcks == nil { - mset.preAcks = make(map[uint64]struct{}) - } - mset.preAcks[seq] = struct{}{} - mset.mu.Unlock() - } + // Check for existing preAcks. These will override the concept of shouldRemove. + if len(mset.preAcks) > 0 { + if _, hasAck := mset.preAcks[seq]; hasAck { + delete(mset.preAcks, seq) + shouldRemove = true } } + + // If we should remove but we know this is beyond our last we can add to preAcks here. + // The ack reached us before the actual msg. + ackBeforeMsg := shouldRemove && seq > state.LastSeq + if ackBeforeMsg { + if mset.preAcks == nil { + mset.preAcks = make(map[uint64]struct{}) + } + mset.preAcks[seq] = struct{}{} + } + mset.mu.Unlock() + + // If nothing else to do. + if !shouldRemove || ackBeforeMsg { + return + } + + // If we are here we should attempt to remove. + if _, err := mset.store.RemoveMsg(seq); err == ErrStoreEOF { + // The ack reached us before the actual msg. + mset.mu.Lock() + if mset.preAcks == nil { + mset.preAcks = make(map[uint64]struct{}) + } + mset.preAcks[seq] = struct{}{} + mset.mu.Unlock() + } } // Snapshot creates a snapshot for the stream and possibly consumers. @@ -5076,37 +5110,17 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error return mset, nil } -// This is to check for dangling messages. +// This is to check for dangling messages on interest retention streams. // Issue https://github.com/nats-io/nats-server/issues/3612 func (mset *stream) checkForOrphanMsgs() { - // We need to grab the low water mark for all consumers. 
- var ackFloor uint64 mset.mu.RLock() + consumers := make([]*consumer, 0, len(mset.consumers)) for _, o := range mset.consumers { - o.mu.RLock() - if o.store != nil { - if state, err := o.store.BorrowState(); err == nil { - if ackFloor == 0 || state.AckFloor.Stream < ackFloor { - ackFloor = state.AckFloor.Stream - } - } - } - o.mu.RUnlock() + consumers = append(consumers, o) } - // Grabs stream state. - var state StreamState - mset.store.FastState(&state) - s, acc := mset.srv, mset.acc mset.mu.RUnlock() - - if ackFloor > state.FirstSeq { - req := &JSApiStreamPurgeRequest{Sequence: ackFloor + 1} - purged, err := mset.purge(req) - if err != nil { - s.Warnf("stream '%s > %s' could not auto purge orphaned messages: %v", acc, mset.name(), err) - } else { - s.Debugf("stream '%s > %s' auto purged %d messages", acc, mset.name(), purged) - } + for _, o := range consumers { + o.checkStateForInterestStream() } }