diff --git a/server/consumer.go b/server/consumer.go index cdac8ccda1..8364b9c199 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3237,6 +3237,69 @@ func (o *consumer) hbTimer() (time.Duration, *time.Timer) { return o.cfg.Heartbeat, time.NewTimer(o.cfg.Heartbeat) } +// Check here for conditions when our ack floor may have drifted below the streams first sequence. +// In general this is accounted for in normal operations, but if the consumer misses the signal from +// the stream it will not clear the message and move the ack state. +// Should only be called from consumer leader. +func (o *consumer) checkAckFloor() { + o.mu.RLock() + mset, closed, asflr := o.mset, o.closed, o.asflr + o.mu.RUnlock() + + if closed || mset == nil { + return + } + + var ss StreamState + mset.store.FastState(&ss) + + // If our floor is equal or greater that is normal and nothing for us to do. + if ss.FirstSeq == 0 || asflr >= ss.FirstSeq-1 { + return + } + + // Process all messages that no longer exist. + for seq := asflr + 1; seq < ss.FirstSeq; seq++ { + // Check if this message was pending. + o.mu.RLock() + p, isPending := o.pending[seq] + var rdc uint64 = 1 + if o.rdc != nil { + rdc = o.rdc[seq] + } + o.mu.RUnlock() + // If it was pending for us, get rid of it. + if isPending { + o.processTerm(seq, p.Sequence, rdc) + } + } + + // Do one final check here. + o.mu.Lock() + defer o.mu.Unlock() + + // If we are here, and this should be rare, we still are off with our ack floor. + // We will set it explicitly to 1 behind our current lowest in pending, or if + // pending is empty, to our current delivered -1. + if o.asflr < ss.FirstSeq-1 { + var psseq, pdseq uint64 + for seq, p := range o.pending { + if psseq == 0 || seq < psseq { + psseq, pdseq = seq, p.Sequence + } + } + // If we still have none, set to current delivered -1. + if psseq == 0 { + psseq, pdseq = o.sseq-1, o.dseq-1 + // If still not adjusted. + if psseq < ss.FirstSeq-1 { + psseq, pdseq = ss.FirstSeq-1, ss.FirstSeq-1 + } + } + o.asflr, o.adflr = psseq, pdseq + } +} + func (o *consumer) processInboundAcks(qch chan struct{}) { // Grab the server lock to watch for server quit. o.mu.RLock() @@ -3244,6 +3307,12 @@ func (o *consumer) processInboundAcks(qch chan struct{}) { hasInactiveThresh := o.cfg.InactiveThreshold > 0 o.mu.RUnlock() + // We will check this on entry and periodically. + o.checkAckFloor() + + // How often we will check for ack floor drift. + var ackFloorCheck = 30 * time.Second + for { select { case <-o.ackMsgs.ch: @@ -3257,6 +3326,8 @@ func (o *consumer) processInboundAcks(qch chan struct{}) { if hasInactiveThresh { o.suppressDeletion() } + case <-time.After(ackFloorCheck): + o.checkAckFloor() case <-qch: return case <-s.quitCh: diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 52e43d631e..d9b2d09434 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3511,3 +3511,145 @@ func TestJetStreamNoLeadersDuringLameDuck(t *testing.T) { } } } + +// If a consumer has not been registered (possible in heavily loaded systems with lots of assets) +// it could miss the signal of a message going away. If that message was pending and expires the +// ack floor could fall below the stream first sequence. This test will force that condition and +// make sure the system resolves itself. +func TestJetStreamConsumerAckFloorDrift(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + Replicas: 3, + MaxAge: 200 * time.Millisecond, + MaxMsgs: 10, + }) + require_NoError(t, err) + + sub, err := js.PullSubscribe("foo", "C") + require_NoError(t, err) + + for i := 0; i < 10; i++ { + sendStreamMsg(t, nc, "foo", "HELLO") + } + + // No-op but will surface as delivered. + _, err = sub.Fetch(10) + require_NoError(t, err) + + // We will grab the state with delivered being 10 and ackfloor being 0 directly. + cl := c.consumerLeader(globalAccountName, "TEST", "C") + require_NotNil(t, cl) + + mset, err := cl.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("C") + require_NotNil(t, o) + o.mu.RLock() + state, err := o.store.State() + o.mu.RUnlock() + require_NoError(t, err) + require_NotNil(t, state) + + // Now let messages expire. + checkFor(t, time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + if si.State.Msgs == 0 { + return nil + } + return fmt.Errorf("stream still has msgs") + }) + + // Set state to ackfloor of 5 and no pending. + state.AckFloor.Consumer = 5 + state.AckFloor.Stream = 5 + state.Pending = nil + + // Now put back the state underneath of the consumers. + for _, s := range c.servers { + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("C") + require_NotNil(t, o) + o.mu.Lock() + err = o.setStoreState(state) + cfs := o.store.(*consumerFileStore) + o.mu.Unlock() + require_NoError(t, err) + // The lower layer will ignore, so set more directly. + cfs.mu.Lock() + cfs.state = *state + cfs.mu.Unlock() + // Also snapshot to remove any raft entries that could affect it. + snap, err := o.store.EncodedState() + require_NoError(t, err) + require_NoError(t, o.raftNode().InstallSnapshot(snap)) + } + + cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C") + c.waitOnConsumerLeader(globalAccountName, "TEST", "C") + + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + ci, err := js.ConsumerInfo("TEST", "C") + require_NoError(t, err) + // Make sure we catch this and adjust. + if ci.AckFloor.Stream == 10 && ci.AckFloor.Consumer == 10 { + return nil + } + return fmt.Errorf("AckFloor not correct, expected 10, got %+v", ci.AckFloor) + }) +} + +func TestJetStreamClusterInterestStreamFilteredConsumersWithNoInterest(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R5S", 5) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + Retention: nats.InterestPolicy, + Replicas: 3, + }) + require_NoError(t, err) + + // Create three subscribers. + ackCb := func(m *nats.Msg) { m.Ack() } + + _, err = js.Subscribe("foo", ackCb, nats.BindStream("TEST"), nats.ManualAck()) + require_NoError(t, err) + + _, err = js.Subscribe("bar", ackCb, nats.BindStream("TEST"), nats.ManualAck()) + require_NoError(t, err) + + _, err = js.Subscribe("baz", ackCb, nats.BindStream("TEST"), nats.ManualAck()) + require_NoError(t, err) + + // Now send 100 messages, randomly picking foo or bar, but never baz. + for i := 0; i < 100; i++ { + if rand.Intn(2) > 0 { + sendStreamMsg(t, nc, "foo", "HELLO") + } else { + sendStreamMsg(t, nc, "bar", "WORLD") + } + } + + // Messages are expected to go to 0. + checkFor(t, time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + if si.State.Msgs == 0 { + return nil + } + return fmt.Errorf("stream still has msgs") + }) +}