diff --git a/server/consumer.go b/server/consumer.go
index d9fa230170..ea6fbbe85b 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -3092,7 +3092,10 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
 				o.notifyDeliveryExceeded(seq, dc-1)
 			}
 			// Make sure to remove from pending.
-			delete(o.pending, seq)
+			if p, ok := o.pending[seq]; ok && p != nil {
+				delete(o.pending, seq)
+				o.updateDelivered(p.Sequence, seq, dc, p.Timestamp)
+			}
 			continue
 		}
 		if seq > 0 {
diff --git a/server/filestore.go b/server/filestore.go
index 7b167aefb9..2857abec5b 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -6946,20 +6946,28 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
 		}
 		if dc > 1 {
+			if maxdc := uint64(o.cfg.MaxDeliver); maxdc > 0 && dc > maxdc {
+				// Make sure to remove from pending.
+				delete(o.state.Pending, sseq)
+			}
 			if o.state.Redelivered == nil {
 				o.state.Redelivered = make(map[uint64]uint64)
 			}
 			// Only update if greater then what we already have.
-			if o.state.Redelivered[sseq] < dc {
+			if o.state.Redelivered[sseq] < dc-1 {
 				o.state.Redelivered[sseq] = dc - 1
 			}
 		}
 	} else {
 		// For AckNone just update delivered and ackfloor at the same time.
-		o.state.Delivered.Consumer = dseq
-		o.state.Delivered.Stream = sseq
-		o.state.AckFloor.Consumer = dseq
-		o.state.AckFloor.Stream = sseq
+		if dseq > o.state.Delivered.Consumer {
+			o.state.Delivered.Consumer = dseq
+			o.state.AckFloor.Consumer = dseq
+		}
+		if sseq > o.state.Delivered.Stream {
+			o.state.Delivered.Stream = sseq
+			o.state.AckFloor.Stream = sseq
+		}
 	}
 
 	// Make sure we flush to disk.
 	o.kickFlusher()
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index eb6d578902..c7ac26fe52 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -5156,3 +5156,122 @@ func TestJetStreamClusterDurableConsumerInactiveThresholdLeaderSwitch(t *testing
 		}
 	}
 }
+
+func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"*"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	// send 50 msgs
+	for i := 0; i < 50; i++ {
+		_, err := js.Publish("foo", []byte("ok"))
+		require_NoError(t, err)
+	}
+
+	// File based.
+	_, err = js.Subscribe("foo",
+		func(msg *nats.Msg) {},
+		nats.Durable("file"),
+		nats.ManualAck(),
+		nats.MaxDeliver(1),
+		nats.AckWait(time.Second),
+		nats.MaxAckPending(10),
+	)
+	require_NoError(t, err)
+
+	// Let first batch retry and expire.
+	time.Sleep(1200 * time.Millisecond)
+
+	cia, err := js.ConsumerInfo("TEST", "file")
+	require_NoError(t, err)
+
+	// Make sure followers will have exact same state.
+	_, err = nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "file"), nil, time.Second)
+	require_NoError(t, err)
+	c.waitOnConsumerLeader(globalAccountName, "TEST", "file")
+
+	cib, err := js.ConsumerInfo("TEST", "file")
+	require_NoError(t, err)
+
+	// Want to compare sans cluster details which we know will change due to leader change.
+	// Also last activity for delivered can be slightly off so nil out as well.
+	checkConsumerInfo := func(a, b *nats.ConsumerInfo) {
+		t.Helper()
+		a.Cluster, b.Cluster = nil, nil
+		a.Delivered.Last, b.Delivered.Last = nil, nil
+		if !reflect.DeepEqual(a, b) {
+			t.Fatalf("ConsumerInfo do not match\n\t%+v\n\t%+v", a, b)
+		}
+	}
+
+	checkConsumerInfo(cia, cib)
+
+	// Memory based.
+	_, err = js.Subscribe("foo",
+		func(msg *nats.Msg) {},
+		nats.Durable("mem"),
+		nats.ManualAck(),
+		nats.MaxDeliver(1),
+		nats.AckWait(time.Second),
+		nats.MaxAckPending(10),
+		nats.ConsumerMemoryStorage(),
+	)
+	require_NoError(t, err)
+
+	// Let first batch retry and expire.
+	time.Sleep(1200 * time.Millisecond)
+
+	cia, err = js.ConsumerInfo("TEST", "mem")
+	require_NoError(t, err)
+
+	// Make sure followers will have exact same state.
+	_, err = nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "mem"), nil, time.Second)
+	require_NoError(t, err)
+	c.waitOnConsumerLeader(globalAccountName, "TEST", "mem")
+
+	cib, err = js.ConsumerInfo("TEST", "mem")
+	require_NoError(t, err)
+
+	checkConsumerInfo(cia, cib)
+
+	// Now file based but R1 and server restart.
+	_, err = js.Subscribe("foo",
+		func(msg *nats.Msg) {},
+		nats.Durable("r1"),
+		nats.ManualAck(),
+		nats.MaxDeliver(1),
+		nats.AckWait(time.Second),
+		nats.MaxAckPending(10),
+		nats.ConsumerReplicas(1),
+	)
+	require_NoError(t, err)
+
+	// Let first batch retry and expire.
+	time.Sleep(1200 * time.Millisecond)
+
+	cia, err = js.ConsumerInfo("TEST", "r1")
+	require_NoError(t, err)
+
+	cl := c.consumerLeader(globalAccountName, "TEST", "r1")
+	cl.Shutdown()
+	cl.WaitForShutdown()
+	cl = c.restartServer(cl)
+	c.waitOnServerCurrent(cl)
+
+	cib, err = js.ConsumerInfo("TEST", "r1")
+	require_NoError(t, err)
+
+	// Created can skew a small bit due to server restart, this is expected.
+	now := time.Now()
+	cia.Created, cib.Created = now, now
+	checkConsumerInfo(cia, cib)
+}
diff --git a/server/memstore.go b/server/memstore.go
index b35e3d294d..b2ed14aac8 100644
--- a/server/memstore.go
+++ b/server/memstore.go
@@ -1307,17 +1307,28 @@ func (o *consumerMemStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) erro
 		}
 		if dc > 1 {
+			if maxdc := uint64(o.cfg.MaxDeliver); maxdc > 0 && dc > maxdc {
+				// Make sure to remove from pending.
+				delete(o.state.Pending, sseq)
+			}
 			if o.state.Redelivered == nil {
 				o.state.Redelivered = make(map[uint64]uint64)
 			}
-			o.state.Redelivered[sseq] = dc - 1
+			// Only update if greater then what we already have.
+			if o.state.Redelivered[sseq] < dc-1 {
+				o.state.Redelivered[sseq] = dc - 1
+			}
 		}
 	} else {
 		// For AckNone just update delivered and ackfloor at the same time.
-		o.state.Delivered.Consumer = dseq
-		o.state.Delivered.Stream = sseq
-		o.state.AckFloor.Consumer = dseq
-		o.state.AckFloor.Stream = sseq
+		if dseq > o.state.Delivered.Consumer {
+			o.state.Delivered.Consumer = dseq
+			o.state.AckFloor.Consumer = dseq
+		}
+		if sseq > o.state.Delivered.Stream {
+			o.state.Delivered.Stream = sseq
+			o.state.AckFloor.Stream = sseq
+		}
 	}
 
 	return nil
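
For illustration, here is a minimal standalone sketch of the two guards this patch adds to UpdateDelivered. The types and the updateDelivered helper below are hypothetical simplifications, not the server's actual consumerMemStore/consumerFileStore: they only show the shape of the change, namely that a message whose delivery count exceeds MaxDeliver is dropped from pending, and that delivered/ack-floor sequences only ever move forward, so a stale or replayed update cannot regress consumer state.

// Illustration only: simplified stand-ins for the server's consumer state.
package main

import "fmt"

type sequencePair struct {
	Consumer, Stream uint64
}

type consumerState struct {
	Delivered   sequencePair
	AckFloor    sequencePair
	Pending     map[uint64]struct{}
	Redelivered map[uint64]uint64
}

// updateDelivered mirrors the patched logic: dseq/sseq are the consumer and
// stream sequences, dc the delivery count, maxDeliver the consumer's limit.
func updateDelivered(s *consumerState, dseq, sseq, dc, maxDeliver uint64) {
	if len(s.Pending) > 0 {
		if dc > 1 {
			// Guard 1: past MaxDeliver the message can never be acked,
			// so it must not linger in pending.
			if maxDeliver > 0 && dc > maxDeliver {
				delete(s.Pending, sseq)
			}
			if s.Redelivered == nil {
				s.Redelivered = make(map[uint64]uint64)
			}
			// Only update if greater than what we already have.
			if s.Redelivered[sseq] < dc-1 {
				s.Redelivered[sseq] = dc - 1
			}
		}
	} else {
		// Guard 2: monotonic updates only, so an out-of-order or replayed
		// update cannot move delivered or the ack floor backward.
		if dseq > s.Delivered.Consumer {
			s.Delivered.Consumer, s.AckFloor.Consumer = dseq, dseq
		}
		if sseq > s.Delivered.Stream {
			s.Delivered.Stream, s.AckFloor.Stream = sseq, sseq
		}
	}
}

func main() {
	s := &consumerState{Pending: map[uint64]struct{}{7: {}}}
	// Second delivery of stream seq 7 with MaxDeliver=1: the message is
	// dropped from pending and its redelivery count recorded.
	updateDelivered(s, 8, 7, 2, 1)
	fmt.Println(len(s.Pending), s.Redelivered[7]) // 0 1

	// With nothing pending, delivered/ack floor advance monotonically;
	// a stale (lower) sequence replayed out of order is ignored.
	updateDelivered(s, 10, 50, 1, 1)
	updateDelivered(s, 9, 40, 1, 1) // stale, must not regress the floor
	fmt.Println(s.Delivered.Consumer, s.AckFloor.Stream) // 10 50
}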