Skip to content

Commit

Permalink
resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
mfamador committed Jun 4, 2024
1 parent 814b290 commit 8affb70
Showing 1 changed file with 46 additions and 17 deletions.
63 changes: 46 additions & 17 deletions internal/impl/nats/input_jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ xref:configuration:interpolation.adoc#bloblang-queries[function interpolation].
Description("The maximum number of outstanding acks to be allowed before consuming is halted.").
Advanced().
Default(1024)).
Field(service.NewDurationField("nak_delay").
Description("An optional delay duration on redelivering the messages when negatively acknowledged.").
Example("1m").
Advanced().
Optional()).
Field(service.NewStringField("nak_delay_until_header").
Description("An optional header name on which will come a unix epoch timestamp in seconds until when the message delivery should be delayed. By default is `nak_delay_until`").
Advanced().
Default("nak_delay_until")).
Fields(connectionTailFields()...).
Field(inputTracingDocs())
}
Expand All @@ -101,16 +110,18 @@ func init() {
//------------------------------------------------------------------------------

type jetStreamReader struct {
connDetails connectionDetails
deliverOpt nats.SubOpt
subject string
queue string
stream string
bind bool
pull bool
durable string
ackWait time.Duration
maxAckPending int
connDetails connectionDetails
deliverOpt nats.SubOpt
subject string
queue string
stream string
bind bool
pull bool
durable string
ackWait time.Duration
nakDelay time.Duration
nakDelayUntilHeader string
maxAckPending int

log *service.Logger

Expand Down Expand Up @@ -196,6 +207,15 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou
}
}

if conf.Contains("nak_delay") {
if j.nakDelay, err = conf.FieldDuration("nak_delay"); err != nil {
return nil, err
}
}
if j.nakDelayUntilHeader, err = conf.FieldString("nak_delay_until_header"); err != nil {
return nil, err
}

if j.maxAckPending, err = conf.FieldInt("max_ack_pending"); err != nil {
return nil, err
}
Expand Down Expand Up @@ -314,14 +334,13 @@ func (j *jetStreamReader) Read(ctx context.Context) (*service.Message, service.A
if natsSub == nil {
return nil, nil, service.ErrNotConnected
}

if !j.pull {
nmsg, err := natsSub.NextMsgWithContext(ctx)
if err != nil {
// TODO: Any errors need capturing here to signal a lost connection?
return nil, nil, err
}
return convertMessage(nmsg)
return j.convertMessage(nmsg)
}

for {
Expand All @@ -342,7 +361,7 @@ func (j *jetStreamReader) Read(ctx context.Context) (*service.Message, service.A
if len(msgs) == 0 {
continue
}
return convertMessage(msgs[0])
return j.convertMessage(msgs[0])
}
}

Expand All @@ -359,7 +378,7 @@ func (j *jetStreamReader) Close(ctx context.Context) error {
return nil
}

func convertMessage(m *nats.Msg) (*service.Message, service.AckFunc, error) {
func (j *jetStreamReader) convertMessage(m *nats.Msg) (*service.Message, service.AckFunc, error) {
msg := service.NewMessage(m.Data)
msg.MetaSet("nats_subject", m.Subject)

Expand All @@ -381,9 +400,19 @@ func convertMessage(m *nats.Msg) (*service.Message, service.AckFunc, error) {
}

return msg, func(ctx context.Context, res error) error {
if res == nil {
return m.Ack()
if res != nil {
if val, ok := m.Header[j.nakDelayUntilHeader]; ok {
if unixTime, err := strconv.ParseInt(val[0], 10, 64); err != nil {
j.log.Warnf("error parsing unix epoch time from header %s: %s error: %v", j.nakDelayUntilHeader, val[0], err)
return m.Nak()
} else {
return m.NakWithDelay(time.Unix(unixTime, 0).Sub(time.Now().UTC()))
}
} else if j.nakDelay > 0 {
return m.NakWithDelay(j.nakDelay)
}
return m.Nak()
}
return m.Nak()
return m.Ack()
}, nil
}

0 comments on commit 8affb70

Please sign in to comment.