Skip to content

Commit

Permalink
Support a general nak_delay on NATS Jetstream input and a header to d…
Browse files Browse the repository at this point in the history
…elay a msg for a custom period of time.

Signed-off-by: Marco Amador <amador.marco@gmail.com>
  • Loading branch information
mfamador committed Apr 30, 2024
1 parent 51d971a commit 10320aa
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 17 deletions.
63 changes: 46 additions & 17 deletions internal/impl/nats/input_jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ You can access these metadata fields using
Description("The maximum number of outstanding acks to be allowed before consuming is halted.").
Advanced().
Default(1024)).
Field(service.NewDurationField("nak_delay").
Description("An optional delay duration on redelivering a message when negatively acknowledged.").
Example("1m").
Advanced().
Optional()).
Field(service.NewStringField("nak_delay_until_header").
Description("An optional header name on which will come a unix epoch timestamp in seconds until when the message delivery should be delayed. By default is `nak_delay_until`").
Advanced().
Default("nak_delay_until")).
Fields(connectionTailFields()...).
Field(inputTracingDocs())
}
Expand All @@ -102,16 +111,18 @@ func init() {
//------------------------------------------------------------------------------

type jetStreamReader struct {
connDetails connectionDetails
deliverOpt nats.SubOpt
subject string
queue string
stream string
bind bool
pull bool
durable string
ackWait time.Duration
maxAckPending int
connDetails connectionDetails
deliverOpt nats.SubOpt
subject string
queue string
stream string
bind bool
pull bool
durable string
ackWait time.Duration
nakDelay time.Duration
nakDelayUntilHeader string
maxAckPending int

log *service.Logger

Expand Down Expand Up @@ -197,6 +208,15 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou
}
}

if conf.Contains("nak_delay") {
if j.nakDelay, err = conf.FieldDuration("nak_delay"); err != nil {
return nil, err
}
}
if j.nakDelayUntilHeader, err = conf.FieldString("nak_delay_until_header"); err != nil {
return nil, err
}

if j.maxAckPending, err = conf.FieldInt("max_ack_pending"); err != nil {
return nil, err
}
Expand Down Expand Up @@ -315,14 +335,13 @@ func (j *jetStreamReader) Read(ctx context.Context) (*service.Message, service.A
if natsSub == nil {
return nil, nil, service.ErrNotConnected
}

if !j.pull {
nmsg, err := natsSub.NextMsgWithContext(ctx)
if err != nil {
// TODO: Any errors need capturing here to signal a lost connection?
return nil, nil, err
}
return convertMessage(nmsg)
return j.convertMessage(nmsg)
}

for {
Expand All @@ -343,7 +362,7 @@ func (j *jetStreamReader) Read(ctx context.Context) (*service.Message, service.A
if len(msgs) == 0 {
continue
}
return convertMessage(msgs[0])
return j.convertMessage(msgs[0])
}
}

Expand All @@ -360,7 +379,7 @@ func (j *jetStreamReader) Close(ctx context.Context) error {
return nil
}

func convertMessage(m *nats.Msg) (*service.Message, service.AckFunc, error) {
func (j *jetStreamReader) convertMessage(m *nats.Msg) (*service.Message, service.AckFunc, error) {
msg := service.NewMessage(m.Data)
msg.MetaSet("nats_subject", m.Subject)

Expand All @@ -382,9 +401,19 @@ func convertMessage(m *nats.Msg) (*service.Message, service.AckFunc, error) {
}

return msg, func(ctx context.Context, res error) error {
if res == nil {
return m.Ack()
if res != nil {
if val, ok := m.Header[j.nakDelayUntilHeader]; ok {
if unixTime, err := strconv.ParseInt(val[0], 10, 64); err != nil {
j.log.Warnf("error parsing unix epoch time from header %s: %s error: %v", j.nakDelayUntilHeader, val[0], err)
return m.Nak()
} else {
return m.NakWithDelay(time.Unix(unixTime, 0).Sub(time.Now().UTC()))
}
} else if j.nakDelay > 0 {
return m.NakWithDelay(j.nakDelay)
}
return m.Nak()
}
return m.Nak()
return m.Ack()
}, nil
}
23 changes: 23 additions & 0 deletions website/docs/components/inputs/nats_jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ input:
deliver: all
ack_wait: 30s
max_ack_pending: 1024
nak_delay: 1m # No default (optional)
nak_delay_until_header: nak_delay_until
tls:
enabled: false
skip_cert_verify: false
Expand Down Expand Up @@ -241,6 +243,27 @@ The maximum number of outstanding acks to be allowed before consuming is halted.
Type: `int`
Default: `1024`

### `nak_delay`

An optional delay duration on redelivering a message when negatively acknowledged.


Type: `string`

```yml
# Examples

nak_delay: 1m
```

### `nak_delay_until_header`

An optional header name on which will come a unix epoch timestamp in seconds until when the message delivery should be delayed. By default is `nak_delay_until`


Type: `string`
Default: `"nak_delay_until"`

### `tls`

Custom TLS settings can be used to override system defaults.
Expand Down

0 comments on commit 10320aa

Please sign in to comment.