Skip to content

Commit

Permalink
Merge pull request #2812 from nats-io/backoff
Browse files Browse the repository at this point in the history
Consumer Ack/Nak Backoffs
  • Loading branch information
derekcollison committed Jan 24, 2022
2 parents 89435d5 + 6be9925 commit 5e71c90
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 30 deletions.
114 changes: 87 additions & 27 deletions server/consumer.go
Expand Up @@ -47,23 +47,24 @@ type ConsumerInfo struct {
}

type ConsumerConfig struct {
Durable string `json:"durable_name,omitempty"`
Description string `json:"description,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`
Durable string `json:"durable_name,omitempty"`
Description string `json:"description,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
BackOff []time.Duration `json:"backoff,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`

// Pull based options.
MaxRequestBatch int `json:"max_batch,omitempty"`
Expand Down Expand Up @@ -92,6 +93,11 @@ type CreateConsumerRequest struct {
Config ConsumerConfig `json:"config"`
}

// ConsumerNakOptions is for optional NAK values, e.g. delay.
// It is decoded from the JSON object that may follow the NAK token in an ack payload.
type ConsumerNakOptions struct {
// Delay is how long the sender wants the server to wait before retrying the NAK'd message.
Delay time.Duration `json:"delay"`
}

// DeliverPolicy determines how the consumer should select the first message to deliver.
type DeliverPolicy int

Expand Down Expand Up @@ -297,6 +303,10 @@ func setConsumerConfigDefaults(config *ConsumerConfig) {
if config.MaxDeliver == 0 {
config.MaxDeliver = -1
}
// If BackOff was specified that will override the AckWait and the MaxDeliver.
if len(config.BackOff) > 0 {
config.AckWait = config.BackOff[0]
}
// Set proper default for max ack pending if we are ack explicit and none has been set.
if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 {
config.MaxAckPending = JsDefaultMaxAckPending
Expand Down Expand Up @@ -326,6 +336,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Make sure we have sane defaults.
setConsumerConfigDefaults(config)

// Check if we have a BackOff defined that MaxDeliver is within range etc.
if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver <= lbo {
return nil, NewJSConsumerMaxDeliverBackoffError()
}

if len(config.Description) > JSMaxDescriptionLen {
return nil, NewJSConsumerDescriptionTooLongError(JSMaxDescriptionLen)
}
Expand Down Expand Up @@ -1336,7 +1351,7 @@ func (o *consumer) updateDeliverSubjectLocked(newDeliver string) {
// configsEqualSansDelivery reports whether two consumer configs match once
// their DeliverSubject fields are disregarded.
func configsEqualSansDelivery(a, b ConsumerConfig) bool {
// These were copied in so can set Delivery here.
// Both configs are passed by value, so resetting DeliverSubject only touches the local copies.
a.DeliverSubject, b.DeliverSubject = _EMPTY_, _EMPTY_
return a == b
// ConsumerConfig carries a slice field (BackOff), so the comparison goes through reflect.DeepEqual.
return reflect.DeepEqual(a, b)
}

// Helper to send a reply to an ack.
Expand Down Expand Up @@ -1369,8 +1384,8 @@ func (o *consumer) processAck(_ *subscription, c *client, acc *Account, subject,
o.processNextMsgReq(nil, c, acc, subject, reply, msg[len(AckNext):])
c.pa.hdr = phdr
skipAckReply = true
case bytes.Equal(msg, AckNak):
o.processNak(sseq, dseq)
case bytes.HasPrefix(msg, AckNak):
o.processNak(sseq, dseq, dc, msg)
case bytes.Equal(msg, AckProgress):
o.progressUpdate(sseq)
case bytes.Equal(msg, AckTerm):
Expand Down Expand Up @@ -1592,7 +1607,7 @@ func (o *consumer) checkPendingRequests() {
}

// Process a NAK.
func (o *consumer) processNak(sseq, dseq uint64) {
func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
o.mu.Lock()
defer o.mu.Unlock()

Expand All @@ -1606,6 +1621,44 @@ func (o *consumer) processNak(sseq, dseq uint64) {
return
}
}
// Check to see if we have delays attached.
if len(nak) > len(AckNak) {
arg := bytes.TrimSpace(nak[len(AckNak):])
if len(arg) > 0 {
var d time.Duration
var err error
if arg[0] == '{' {
var nd ConsumerNakOptions
if err = json.Unmarshal(arg, &nd); err == nil {
d = nd.Delay
}
} else {
d, err = time.ParseDuration(string(arg))
}
if err != nil {
// Treat this as normal NAK.
o.srv.Warnf("JetStream consumer '%s > %s > %s' bad NAK delay value: %q", o.acc.Name, o.stream, o.name, arg)
} else {
// We have a parsed duration that the user wants us to wait before retrying.
// Make sure we are not on the rdq.
o.removeFromRedeliverQueue(sseq)
if p, ok := o.pending[sseq]; ok {
// now - ackWait is expired now, so offset from there.
p.Timestamp = time.Now().Add(-o.cfg.AckWait).Add(d).UnixNano()
// Update store system which will update followers as well.
o.updateDelivered(p.Sequence, sseq, dc, p.Timestamp)
if o.ptmr != nil {
// Want checkPending to run and figure out the next timer ttl.
// TODO(dlc) - We could optimize this maybe a bit more and track when we expect the timer to fire.
o.ptmr.Reset(10 * time.Millisecond)
}
}
// Nothing else for us to do now so return.
return
}
}
}

// If already queued up also ignore.
if !o.onRedeliverQueue(sseq) {
o.addToRedeliverQueue(sseq)
Expand Down Expand Up @@ -1709,8 +1762,8 @@ func (o *consumer) applyState(state *ConsumerState) {
// Setup tracking timer if we have restored pending.
if len(o.pending) > 0 && o.ptmr == nil {
// This is on startup or leader change. We want to check pending
// sooner in case there are inconsistencies etc. Pick between 1-5 secs.
delay := time.Second + time.Duration(rand.Int63n(4000))*time.Millisecond
// sooner in case there are inconsistencies etc. Pick between 500ms - 1.5s
delay := 500*time.Millisecond + time.Duration(rand.Int63n(1000))*time.Millisecond
// If normal is lower than this just use that.
if o.cfg.AckWait < delay {
delay = o.ackWait(0)
Expand Down Expand Up @@ -3065,14 +3118,21 @@ func (o *consumer) checkPending() {
shouldUpdateState = true
continue
}
elapsed := now - p.Timestamp
if elapsed >= ttl {
elapsed, deadline := now-p.Timestamp, ttl
if len(o.cfg.BackOff) > 0 && o.rdc != nil {
dc := int(o.rdc[p.Sequence])
if dc >= len(o.cfg.BackOff) {
dc = len(o.cfg.BackOff) - 1
}
deadline = int64(o.cfg.BackOff[dc])
}
if elapsed >= deadline {
if !o.onRedeliverQueue(seq) {
expired = append(expired, seq)
}
} else if ttl-elapsed < next {
} else if deadline-elapsed < next {
// Update when we should fire next.
next = ttl - elapsed
next = deadline - elapsed
}
}

Expand Down
10 changes: 10 additions & 0 deletions server/errors.json
Expand Up @@ -1138,5 +1138,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerMaxDeliverBackoffErr",
"code": 400,
"error_code": 10116,
"description": "max deliver is required to be \u003e length of backoff values",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
2 changes: 1 addition & 1 deletion server/filestore.go
Expand Up @@ -4884,7 +4884,7 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
}

// On restarts the old leader may get a replay from the raft logs that are old.
if dseq <= o.state.Delivered.Consumer {
if dseq <= o.state.AckFloor.Consumer {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion server/filestore_test.go
Expand Up @@ -1409,7 +1409,7 @@ func TestFileStoreMeta(t *testing.T) {
if err := json.Unmarshal(buf, &oconfig2); err != nil {
t.Fatalf("Error unmarshalling: %v", err)
}
if oconfig2 != oconfig {
if !reflect.DeepEqual(oconfig2, oconfig) {
t.Fatalf("Consumer configs not equal, got %+v vs %+v", oconfig2, oconfig)
}
checksum, err = ioutil.ReadFile(ometasum)
Expand Down
7 changes: 7 additions & 0 deletions server/jetstream_api.go
Expand Up @@ -3129,6 +3129,13 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj
// Make sure we have sane defaults.
setConsumerConfigDefaults(&req.Config)

// Check if we have a BackOff defined that MaxDeliver is within range etc.
if lbo := len(req.Config.BackOff); lbo > 0 && req.Config.MaxDeliver <= lbo {
resp.Error = NewJSConsumerMaxDeliverBackoffError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
if req.Config.Direct {
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
Expand Up @@ -2052,7 +2052,7 @@ func (s *Server) sendStreamLostQuorumAdvisory(mset *stream) {
return
}

s.Warnf("JetStream cluster stream '%s > %s' has NO quorum, stalled.", acc.GetName(), stream)
s.Warnf("JetStream cluster stream '%s > %s' has NO quorum, stalled", acc.GetName(), stream)

subj := JSAdvisoryStreamQuorumLostPre + "." + stream
adv := &JSStreamQuorumLostAdvisory{
Expand Down

0 comments on commit 5e71c90

Please sign in to comment.