diff --git a/server/consumer.go b/server/consumer.go
index 960ddc0656..e0b1e04c4b 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -47,23 +47,24 @@ type ConsumerInfo struct {
 }
 
 type ConsumerConfig struct {
-	Durable         string        `json:"durable_name,omitempty"`
-	Description     string        `json:"description,omitempty"`
-	DeliverPolicy   DeliverPolicy `json:"deliver_policy"`
-	OptStartSeq     uint64        `json:"opt_start_seq,omitempty"`
-	OptStartTime    *time.Time    `json:"opt_start_time,omitempty"`
-	AckPolicy       AckPolicy     `json:"ack_policy"`
-	AckWait         time.Duration `json:"ack_wait,omitempty"`
-	MaxDeliver      int           `json:"max_deliver,omitempty"`
-	FilterSubject   string        `json:"filter_subject,omitempty"`
-	ReplayPolicy    ReplayPolicy  `json:"replay_policy"`
-	RateLimit       uint64        `json:"rate_limit_bps,omitempty"` // Bits per sec
-	SampleFrequency string        `json:"sample_freq,omitempty"`
-	MaxWaiting      int           `json:"max_waiting,omitempty"`
-	MaxAckPending   int           `json:"max_ack_pending,omitempty"`
-	Heartbeat       time.Duration `json:"idle_heartbeat,omitempty"`
-	FlowControl     bool          `json:"flow_control,omitempty"`
-	HeadersOnly     bool          `json:"headers_only,omitempty"`
+	Durable         string          `json:"durable_name,omitempty"`
+	Description     string          `json:"description,omitempty"`
+	DeliverPolicy   DeliverPolicy   `json:"deliver_policy"`
+	OptStartSeq     uint64          `json:"opt_start_seq,omitempty"`
+	OptStartTime    *time.Time      `json:"opt_start_time,omitempty"`
+	AckPolicy       AckPolicy       `json:"ack_policy"`
+	AckWait         time.Duration   `json:"ack_wait,omitempty"`
+	MaxDeliver      int             `json:"max_deliver,omitempty"`
+	BackOff         []time.Duration `json:"backoff,omitempty"`
+	FilterSubject   string          `json:"filter_subject,omitempty"`
+	ReplayPolicy    ReplayPolicy    `json:"replay_policy"`
+	RateLimit       uint64          `json:"rate_limit_bps,omitempty"` // Bits per sec
+	SampleFrequency string          `json:"sample_freq,omitempty"`
+	MaxWaiting      int             `json:"max_waiting,omitempty"`
+	MaxAckPending   int             `json:"max_ack_pending,omitempty"`
+	Heartbeat       time.Duration   `json:"idle_heartbeat,omitempty"`
+	FlowControl     bool            `json:"flow_control,omitempty"`
+	HeadersOnly     bool            `json:"headers_only,omitempty"`
 
 	// Pull based options.
 	MaxRequestBatch int `json:"max_batch,omitempty"`
@@ -92,6 +93,11 @@ type CreateConsumerRequest struct {
 	Config ConsumerConfig `json:"config"`
 }
 
+// ConsumerNakOptions is for optional NAK values, e.g. delay.
+type ConsumerNakOptions struct {
+	Delay time.Duration `json:"delay"`
+}
+
 // DeliverPolicy determines how the consumer should select the first message to deliver.
 type DeliverPolicy int
 
@@ -297,6 +303,10 @@ func setConsumerConfigDefaults(config *ConsumerConfig) {
 	if config.MaxDeliver == 0 {
 		config.MaxDeliver = -1
 	}
+	// If BackOff was specified, it overrides AckWait: the first backoff value becomes the initial ack wait.
+	if len(config.BackOff) > 0 {
+		config.AckWait = config.BackOff[0]
+	}
 	// Set proper default for max ack pending if we are ack explicit and none has been set.
 	if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 {
 		config.MaxAckPending = JsDefaultMaxAckPending
@@ -326,6 +336,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
 	// Make sure we have sane defaults.
 	setConsumerConfigDefaults(config)
 
+	// If BackOff is defined, check that MaxDeliver is within range, etc.
+	if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver <= lbo {
+		return nil, NewJSConsumerMaxDeliverBackoffError()
+	}
+
 	if len(config.Description) > JSMaxDescriptionLen {
 		return nil, NewJSConsumerDescriptionTooLongError(JSMaxDescriptionLen)
 	}
@@ -1336,7 +1351,7 @@ func (o *consumer) updateDeliverSubjectLocked(newDeliver string) {
 func configsEqualSansDelivery(a, b ConsumerConfig) bool {
 	// These were copied in so can set Delivery here.
 	a.DeliverSubject, b.DeliverSubject = _EMPTY_, _EMPTY_
-	return a == b
+	return reflect.DeepEqual(a, b)
 }
 
 // Helper to send a reply to an ack.
@@ -1369,8 +1384,8 @@ func (o *consumer) processAck(_ *subscription, c *client, acc *Account, subject,
 		o.processNextMsgReq(nil, c, acc, subject, reply, msg[len(AckNext):])
 		c.pa.hdr = phdr
 		skipAckReply = true
-	case bytes.Equal(msg, AckNak):
-		o.processNak(sseq, dseq)
+	case bytes.HasPrefix(msg, AckNak):
+		o.processNak(sseq, dseq, dc, msg)
 	case bytes.Equal(msg, AckProgress):
 		o.progressUpdate(sseq)
 	case bytes.Equal(msg, AckTerm):
@@ -1592,7 +1607,7 @@ func (o *consumer) checkPendingRequests() {
 }
 
 // Process a NAK.
-func (o *consumer) processNak(sseq, dseq uint64) {
+func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
 	o.mu.Lock()
 	defer o.mu.Unlock()
 
@@ -1606,6 +1621,44 @@ func (o *consumer) processNak(sseq, dseq uint64) {
 			return
 		}
 	}
+	// Check to see if we have delays attached.
+	if len(nak) > len(AckNak) {
+		arg := bytes.TrimSpace(nak[len(AckNak):])
+		if len(arg) > 0 {
+			var d time.Duration
+			var err error
+			if arg[0] == '{' {
+				var nd ConsumerNakOptions
+				if err = json.Unmarshal(arg, &nd); err == nil {
+					d = nd.Delay
+				}
+			} else {
+				d, err = time.ParseDuration(string(arg))
+			}
+			if err != nil {
+				// Treat this as a normal NAK.
+				o.srv.Warnf("JetStream consumer '%s > %s > %s' bad NAK delay value: %q", o.acc.Name, o.stream, o.name, arg)
+			} else {
+				// We have a parsed duration that the user wants us to wait before retrying.
+				// Make sure we are not on the rdq.
+				o.removeFromRedeliverQueue(sseq)
+				if p, ok := o.pending[sseq]; ok {
+					// now - AckWait would have just expired, so offset from there.
+					p.Timestamp = time.Now().Add(-o.cfg.AckWait).Add(d).UnixNano()
+					// Update the store system, which will update followers as well.
+					o.updateDelivered(p.Sequence, sseq, dc, p.Timestamp)
+					if o.ptmr != nil {
+						// Want checkPending to run and figure out the next timer ttl.
+						// TODO(dlc) - We could optimize this maybe a bit more and track when we expect the timer to fire.
+						o.ptmr.Reset(10 * time.Millisecond)
+					}
+				}
+				// Nothing else for us to do now, so return.
+				return
+			}
+		}
+	}
+
 	// If already queued up also ignore.
 	if !o.onRedeliverQueue(sseq) {
 		o.addToRedeliverQueue(sseq)
@@ -1709,8 +1762,8 @@ func (o *consumer) applyState(state *ConsumerState) {
 	// Setup tracking timer if we have restored pending.
 	if len(o.pending) > 0 && o.ptmr == nil {
 		// This is on startup or leader change. We want to check pending
-		// sooner in case there are inconsistencies etc. Pick between 1-5 secs.
-		delay := time.Second + time.Duration(rand.Int63n(4000))*time.Millisecond
+		// sooner in case there are inconsistencies etc. Pick between 500ms - 1.5s.
+		delay := 500*time.Millisecond + time.Duration(rand.Int63n(1000))*time.Millisecond
 		// If normal is lower than this just use that.
 		if o.cfg.AckWait < delay {
 			delay = o.ackWait(0)
@@ -3065,14 +3118,21 @@ func (o *consumer) checkPending() {
 			shouldUpdateState = true
 			continue
 		}
-		elapsed := now - p.Timestamp
-		if elapsed >= ttl {
+		elapsed, deadline := now-p.Timestamp, ttl
+		if len(o.cfg.BackOff) > 0 && o.rdc != nil {
+			dc := int(o.rdc[p.Sequence])
+			if dc >= len(o.cfg.BackOff) {
+				dc = len(o.cfg.BackOff) - 1
+			}
+			deadline = int64(o.cfg.BackOff[dc])
+		}
+		if elapsed >= deadline {
 			if !o.onRedeliverQueue(seq) {
 				expired = append(expired, seq)
 			}
-		} else if ttl-elapsed < next {
+		} else if deadline-elapsed < next {
 			// Update when we should fire next.
-			next = ttl - elapsed
+			next = deadline - elapsed
 		}
 	}
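With this change a `-NAK` reply can carry an optional redelivery delay: either a bare `time.ParseDuration` string, or, when the payload after the token starts with `{`, JSON unmarshaled into `ConsumerNakOptions`. A minimal sketch of building both payload forms, using only the standard library (the literal `-NAK` token stands in for the server's `AckNak` constant):

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

func main() {
	// Bare duration form, e.g. "-NAK 200ms".
	fmt.Printf("%s %s\n", "-NAK", 200*time.Millisecond)

	// JSON form mirroring ConsumerNakOptions, e.g. `-NAK {"delay":1000000000}`.
	// Delay marshals as nanoseconds since time.Duration is an int64.
	payload, _ := json.Marshal(struct {
		Delay time.Duration `json:"delay"`
	}{Delay: time.Second})
	fmt.Printf("%s %s\n", "-NAK", payload)
}
```

Either form is sent as the reply to the delivered message; on a parse failure the server logs a warning and falls back to a plain NAK, as processNak above shows.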
diff --git a/server/errors.json b/server/errors.json
index 7f114f9924..a0b510e794 100644
--- a/server/errors.json
+++ b/server/errors.json
@@ -1138,5 +1138,15 @@
     "help": "",
     "url": "",
     "deprecates": ""
+  },
+  {
+    "constant": "JSConsumerMaxDeliverBackoffErr",
+    "code": 400,
+    "error_code": 10116,
+    "description": "max deliver is required to be \u003e length of backoff values",
+    "comment": "",
+    "help": "",
+    "url": "",
+    "deprecates": ""
   }
 ]
\ No newline at end of file
diff --git a/server/filestore.go b/server/filestore.go
index f24d835f2e..099d7bc2fd 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -4884,7 +4884,7 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
 	}
 
 	// On restarts the old leader may get a replay from the raft logs that are old.
-	if dseq <= o.state.Delivered.Consumer {
+	if dseq <= o.state.AckFloor.Consumer {
 		return nil
 	}
 
diff --git a/server/filestore_test.go b/server/filestore_test.go
index ffce4a6165..e9f4a5e131 100644
--- a/server/filestore_test.go
+++ b/server/filestore_test.go
@@ -1409,7 +1409,7 @@ func TestFileStoreMeta(t *testing.T) {
 	if err := json.Unmarshal(buf, &oconfig2); err != nil {
 		t.Fatalf("Error unmarshalling: %v", err)
 	}
-	if oconfig2 != oconfig {
+	if !reflect.DeepEqual(oconfig2, oconfig) {
 		t.Fatalf("Consumer configs not equal, got %+v vs %+v", oconfig2, oconfig)
 	}
 	checksum, err = ioutil.ReadFile(ometasum)
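In `checkPending` above, the redelivery deadline for a pending message comes from `BackOff` indexed by that message's redelivery count (`o.rdc`), clamped to the last entry; with no `BackOff` configured, the plain `AckWait`-derived ttl applies. A standalone sketch of that selection — the helper name is illustrative, not a server function:

```go
package main

import (
	"fmt"
	"time"
)

// backoffDeadline mirrors the clamped lookup in checkPending: the redelivery
// count indexes into the BackOff list, and any count past the end reuses the
// final value. Without BackOff, the consumer's AckWait applies.
func backoffDeadline(backoff []time.Duration, ackWait time.Duration, redeliveries int) time.Duration {
	if len(backoff) == 0 {
		return ackWait
	}
	dc := redeliveries
	if dc >= len(backoff) {
		dc = len(backoff) - 1
	}
	return backoff[dc]
}

func main() {
	bo := []time.Duration{25 * time.Millisecond, 100 * time.Millisecond, 250 * time.Millisecond}
	for dc := 0; dc < 5; dc++ {
		// Prints 25ms, 100ms, 250ms, then 250ms for every further redelivery.
		fmt.Println(dc, backoffDeadline(bo, 30*time.Second, dc))
	}
}
```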
diff --git a/server/jetstream_api.go b/server/jetstream_api.go
index 78e17fa569..c2abda6c3f 100644
--- a/server/jetstream_api.go
+++ b/server/jetstream_api.go
@@ -3129,6 +3129,13 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj
 	// Make sure we have sane defaults.
 	setConsumerConfigDefaults(&req.Config)
 
+	// If BackOff is defined, check that MaxDeliver is within range, etc.
+	if lbo := len(req.Config.BackOff); lbo > 0 && req.Config.MaxDeliver <= lbo {
+		resp.Error = NewJSConsumerMaxDeliverBackoffError()
+		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
+		return
+	}
+
 	// Determine if we should proceed here when we are in clustered mode.
 	if s.JetStreamIsClustered() {
 		if req.Config.Direct {
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index f68b9314f1..4ceb97aec3 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -2052,7 +2052,7 @@ func (s *Server) sendStreamLostQuorumAdvisory(mset *stream) {
 		return
 	}
 
-	s.Warnf("JetStream cluster stream '%s > %s' has NO quorum, stalled.", acc.GetName(), stream)
+	s.Warnf("JetStream cluster stream '%s > %s' has NO quorum, stalled", acc.GetName(), stream)
 
 	subj := JSAdvisoryStreamQuorumLostPre + "." + stream
 	adv := &JSStreamQuorumLostAdvisory{
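Both `addConsumerWithAssignment` and `jsConsumerCreate` enforce `MaxDeliver > len(BackOff)`: the first delivery plus one redelivery per backoff step must fit inside the delivery budget, or the request fails with error code 10116. A hedged sketch of a passing request body, using local mirror types rather than the server's definitions (the JSON tags match the wire format shown above; `ack_policy` marshaling as the string "explicit" is an assumption of this mirror):

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Local mirrors of the server's CreateConsumerRequest/ConsumerConfig, kept
// just to show the wire shape; the real types live in server/consumer.go.
type consumerConfig struct {
	Durable    string          `json:"durable_name,omitempty"`
	AckPolicy  string          `json:"ack_policy"`
	MaxDeliver int             `json:"max_deliver,omitempty"`
	BackOff    []time.Duration `json:"backoff,omitempty"`
}

type createConsumerRequest struct {
	Stream string         `json:"stream_name"`
	Config consumerConfig `json:"config"`
}

func main() {
	req := createConsumerRequest{
		Stream: "TEST",
		Config: consumerConfig{
			Durable:   "dlc",
			AckPolicy: "explicit",
			// Three backoff steps, so MaxDeliver must be at least 4;
			// anything <= 3 would be rejected with ErrCode 10116.
			MaxDeliver: 4,
			BackOff:    []time.Duration{time.Second, 5 * time.Second, 30 * time.Second},
		},
	}
	b, _ := json.Marshal(req)
	fmt.Println(string(b))
}
```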
diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go
index 4637198a4b..64e78afbd9 100644
--- a/server/jetstream_cluster_test.go
+++ b/server/jetstream_cluster_test.go
@@ -10189,6 +10189,177 @@ func TestJetStreamClusterEphemeralPullConsumerServerShutdown(t *testing.T) {
 	}
 }
 
+func TestJetStreamClusterNAKBackoffs(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "JSC", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Replicas: 2,
+		Subjects: []string{"foo"},
+	})
+	require_NoError(t, err)
+
+	_, err = js.Publish("foo", []byte("NAK"))
+	require_NoError(t, err)
+
+	sub, err := js.SubscribeSync("foo", nats.Durable("dlc"), nats.AckWait(5*time.Second), nats.ManualAck())
+	require_NoError(t, err)
+	defer sub.Unsubscribe()
+
+	checkSubsPending(t, sub, 1)
+	m, err := sub.NextMsg(0)
+	require_NoError(t, err)
+
+	// A default NAK will redeliver almost immediately.
+	// We can now append a parseable duration string, after whitespace, to the NAK proto.
+	start := time.Now()
+	dnak := []byte(fmt.Sprintf("%s 200ms", AckNak))
+	m.Respond(dnak)
+	checkSubsPending(t, sub, 1)
+	elapsed := time.Since(start)
+	if elapsed < 200*time.Millisecond {
+		t.Fatalf("Redelivered too quickly, expected ~200ms but got %v", elapsed)
+	}
+	if elapsed > time.Second {
+		t.Fatalf("Took too long to redeliver, expected ~200ms but got %v", elapsed)
+	}
+
+	// Now let's delay and make sure that is honored when a new consumer leader takes over.
+	m, err = sub.NextMsg(0)
+	require_NoError(t, err)
+	dnak = []byte(fmt.Sprintf("%s 1s", AckNak))
+	start = time.Now()
+	m.Respond(dnak)
+	// Wait for NAK state to propagate.
+	time.Sleep(100 * time.Millisecond)
+	// Ask the leader to step down.
+	_, err = nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "dlc"), nil, time.Second)
+	require_NoError(t, err)
+	c.waitOnConsumerLeader("$G", "TEST", "dlc")
+	checkSubsPending(t, sub, 1)
+	elapsed = time.Since(start)
+	if elapsed < time.Second {
+		t.Fatalf("Redelivered too quickly, expected ~1s but got %v", elapsed)
+	}
+	if elapsed > 2*time.Second {
+		t.Fatalf("Took too long to redeliver, expected ~1s but got %v", elapsed)
+	}
+
+	// Test the JSON version.
+	delay, err := json.Marshal(&ConsumerNakOptions{Delay: 20 * time.Millisecond})
+	require_NoError(t, err)
+	dnak = []byte(fmt.Sprintf("%s %s", AckNak, delay))
+	m, err = sub.NextMsg(0)
+	require_NoError(t, err)
+	start = time.Now()
+	m.Respond(dnak)
+	checkSubsPending(t, sub, 1)
+	elapsed = time.Since(start)
+	if elapsed < 20*time.Millisecond {
+		t.Fatalf("Redelivered too quickly, expected ~20ms but got %v", elapsed)
+	}
+	if elapsed > 100*time.Millisecond {
+		t.Fatalf("Took too long to redeliver, expected ~20ms but got %v", elapsed)
+	}
+}
+
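Client libraries wrap this proto so applications do not build the payload by hand; nats.go, for instance, ships a `NakWithDelay` helper (its pairing with this server release is an assumption — check the client version you depend on). A usage sketch:

```go
package main

import (
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	js, _ := nc.JetStream()
	sub, _ := js.SubscribeSync("foo", nats.Durable("dlc"), nats.ManualAck())

	if m, err := sub.NextMsg(2 * time.Second); err == nil {
		// Equivalent to replying `-NAK {"delay":5000000000}`: the server
		// will hold off redelivering this message for ~5s.
		m.NakWithDelay(5 * time.Second)
	}
}
```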
+func TestJetStreamClusterRedeliverBackoffs(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "JSC", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Replicas: 2,
+		Subjects: []string{"foo"},
+	})
+	require_NoError(t, err)
+
+	// Test when BackOff is configured along with AckWait and MaxDeliver.
+	// Currently BackOff overrides AckWait, and MaxDeliver must be at least len(BackOff)+1.
+	ccReq := &CreateConsumerRequest{
+		Stream: "TEST",
+		Config: ConsumerConfig{
+			Durable:        "dlc",
+			DeliverSubject: "x",
+			AckPolicy:      AckExplicit,
+			AckWait:        30 * time.Second,
+			MaxDeliver:     2,
+			BackOff:        []time.Duration{25 * time.Millisecond, 100 * time.Millisecond, 250 * time.Millisecond},
+		},
+	}
+	req, err := json.Marshal(ccReq)
+	require_NoError(t, err)
+	resp, err := nc.Request(fmt.Sprintf(JSApiDurableCreateT, "TEST", "dlc"), req, time.Second)
+	require_NoError(t, err)
+	var ccResp JSApiConsumerCreateResponse
+	err = json.Unmarshal(resp.Data, &ccResp)
+	require_NoError(t, err)
+	if ccResp.Error == nil || ccResp.Error.ErrCode != 10116 {
+		t.Fatalf("Expected an error when MaxDeliver is <= len(BackOff), got %+v", ccResp.Error)
+	}
+
+	// Set MaxDeliver to 6.
+	ccReq.Config.MaxDeliver = 6
+	req, err = json.Marshal(ccReq)
+	require_NoError(t, err)
+	resp, err = nc.Request(fmt.Sprintf(JSApiDurableCreateT, "TEST", "dlc"), req, time.Second)
+	require_NoError(t, err)
+	ccResp.Error = nil
+	err = json.Unmarshal(resp.Data, &ccResp)
+	require_NoError(t, err)
+	if ccResp.Error != nil {
+		t.Fatalf("Unexpected error: %+v", ccResp.Error)
+	}
+	if cfg := ccResp.ConsumerInfo.Config; cfg.AckWait != 25*time.Millisecond || cfg.MaxDeliver != 6 {
+		t.Fatalf("Expected AckWait to be the first BackOff value (25ms) and MaxDeliver to be 6, got %+v", cfg)
+	}
+
+	var received []time.Time
+	var mu sync.Mutex
+
+	sub, err := nc.Subscribe("x", func(m *nats.Msg) {
+		mu.Lock()
+		received = append(received, time.Now())
+		mu.Unlock()
+	})
+	require_NoError(t, err)
+
+	// Send a message.
+	start := time.Now()
+	_, err = js.Publish("foo", []byte("m22"))
+	require_NoError(t, err)
+
+	checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
+		mu.Lock()
+		nr := len(received)
+		mu.Unlock()
+		if nr >= 6 {
+			return nil
+		}
+		return fmt.Errorf("Only seen %d of 6", nr)
+	})
+	sub.Unsubscribe()
+
+	expected := ccReq.Config.BackOff
+	// We expect deliveries to run out to MaxDeliver (6), so fill in two additional entries of the final backoff.
+	expected = append(expected, 250*time.Millisecond, 250*time.Millisecond)
+	for i, tr := range received[1:] {
+		d := tr.Sub(start)
+		// Adjust start for the next calculation.
+		start = start.Add(d)
+		if d < expected[i] || d > expected[i]*2 {
+			t.Fatalf("Timing is off for %d, expected ~%v, but got %v", i, expected[i], d)
+		}
+	}
+}
+
 // Support functions
 
 // Used to setup superclusters for tests.
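The padding at the end of the test generalizes: once redeliveries outrun the `BackOff` list, every further wait reuses the final value, so the full expected schedule is `BackOff` extended with its last entry out to `MaxDeliver-1` redeliveries. A small sketch of that expansion — the helper name is illustrative:

```go
package main

import (
	"fmt"
	"time"
)

// redeliverySchedule expands a BackOff list into the expected wait before
// each redelivery, one entry per redelivery up to maxDeliver-1 (the first
// delivery is immediate and consumes no backoff slot).
func redeliverySchedule(backoff []time.Duration, maxDeliver int) []time.Duration {
	sched := make([]time.Duration, 0, maxDeliver-1)
	for i := 0; i < maxDeliver-1; i++ {
		if i < len(backoff) {
			sched = append(sched, backoff[i])
		} else {
			sched = append(sched, backoff[len(backoff)-1])
		}
	}
	return sched
}

func main() {
	bo := []time.Duration{25 * time.Millisecond, 100 * time.Millisecond, 250 * time.Millisecond}
	// With MaxDeliver 6 this yields [25ms 100ms 250ms 250ms 250ms]: the same
	// five waits the test checks between its six deliveries.
	fmt.Println(redeliverySchedule(bo, 6))
}
```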
diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go
index 654b06243c..9887787041 100644
--- a/server/jetstream_errors_generated.go
+++ b/server/jetstream_errors_generated.go
@@ -104,6 +104,9 @@ const (
 	// JSConsumerInvalidSamplingErrF failed to parse consumer sampling configuration: {err}
 	JSConsumerInvalidSamplingErrF ErrorIdentifier = 10095
 
+	// JSConsumerMaxDeliverBackoffErr max deliver is required to be > length of backoff values
+	JSConsumerMaxDeliverBackoffErr ErrorIdentifier = 10116
+
 	// JSConsumerMaxPendingAckPolicyRequiredErr consumer requires ack policy for max ack pending
 	JSConsumerMaxPendingAckPolicyRequiredErr ErrorIdentifier = 10082
 
@@ -383,6 +386,7 @@ var (
 		JSConsumerInvalidDeliverSubject:          {Code: 400, ErrCode: 10112, Description: "invalid push consumer deliver subject"},
 		JSConsumerInvalidPolicyErrF:              {Code: 400, ErrCode: 10094, Description: "{err}"},
 		JSConsumerInvalidSamplingErrF:            {Code: 400, ErrCode: 10095, Description: "failed to parse consumer sampling configuration: {err}"},
+		JSConsumerMaxDeliverBackoffErr:           {Code: 400, ErrCode: 10116, Description: "max deliver is required to be > length of backoff values"},
 		JSConsumerMaxPendingAckPolicyRequiredErr: {Code: 400, ErrCode: 10082, Description: "consumer requires ack policy for max ack pending"},
 		JSConsumerMaxRequestBatchNegativeErr:     {Code: 400, ErrCode: 10114, Description: "consumer max request batch needs to be > 0"},
 		JSConsumerMaxRequestExpiresToSmall:       {Code: 400, ErrCode: 10115, Description: "consumer max request expires needs to be >= 1ms"},
@@ -843,6 +847,16 @@ func NewJSConsumerInvalidSamplingError(err error, opts ...ErrorOption) *ApiError
 	}
 }
 
+// NewJSConsumerMaxDeliverBackoffError creates a new JSConsumerMaxDeliverBackoffErr error: "max deliver is required to be > length of backoff values"
+func NewJSConsumerMaxDeliverBackoffError(opts ...ErrorOption) *ApiError {
+	eopts := parseOpts(opts)
+	if ae, ok := eopts.err.(*ApiError); ok {
+		return ae
+	}
+
+	return ApiErrors[JSConsumerMaxDeliverBackoffErr]
+}
+
 // NewJSConsumerMaxPendingAckPolicyRequiredError creates a new JSConsumerMaxPendingAckPolicyRequiredErr error: "consumer requires ack policy for max ack pending"
 func NewJSConsumerMaxPendingAckPolicyRequiredError(opts ...ErrorOption) *ApiError {
 	eopts := parseOpts(opts)