Skip to content

Commit

Permalink
Merge pull request #585 from ripienaar/js_ack_helpers
Browse files Browse the repository at this point in the history
add JetStream ack helpers
  • Loading branch information
ripienaar committed Oct 5, 2020
2 parents 42c4036 + f70aadb commit efb1e55
Show file tree
Hide file tree
Showing 5 changed files with 728 additions and 6 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.14

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.1.8-0.20200617224755-fa744fdcdaa3
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad
github.com/nats-io/nkeys v0.2.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 h1:RnGotxlghqR5D2KDAu4TyuLqyjuylOsJiAFhXvMvQIc=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/jwt/v2 v2.0.0-20200602193336-473d698956ed h1:nnV8Mw23aNwNpKuQWuVBEuAqyBOEY21hLWKpVdNr6dQ=
github.com/nats-io/jwt/v2 v2.0.0-20200602193336-473d698956ed/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02 h1:WloZv3SCb55D/rOHYy1rWBXLrj3BYc9zw8VIq6X54lI=
github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71 h1:nexMtKbOeM+w3vGQMNF0BEt+2xZDmVCtYXql2Ym+RWg=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200617224755-fa744fdcdaa3 h1:aDJ5IrBlq4KHBgwWZtKXi1lvY2EkRYOiC2KQfdLTJL8=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200617224755-fa744fdcdaa3/go.mod h1:uXGA6y1uxwW755SK+LoDZggh+UUVsbVoxh8ZG8MqbsI=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad h1:oRb9MIi1Y4N5cTZWciqH68aVNt1e+o4N2uRnjVzv/UE=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
Expand All @@ -41,6 +41,8 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 h1:HmbHVPwrPEKPGLAcHSrMe6+hqSUlvZU0rab6x5EXfGU=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand Down
356 changes: 356 additions & 0 deletions jetstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,356 @@
package nats

import (
"bytes"
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
)

var (
AckAck = []byte("+ACK")
AckNak = []byte("-NAK")
AckProgress = []byte("+WPI")
AckNext = []byte("+NXT")
AckTerm = []byte("+TERM")
)

// JetStreamMsgMetaData is metadata related to a JetStream originated message
type JetStreamMsgMetaData struct {
Stream string
Consumer string
Parsed bool
Delivered int
StreamSeq int
ConsumerSeq int
TimeStamp time.Time
}

func (m *Msg) JetStreamMetaData() (*JetStreamMsgMetaData, error) {
var err error

if m.jsMeta != nil && m.jsMeta.Parsed {
return m.jsMeta, nil
}

m.jsMeta, err = m.parseJSMsgMetadata()

return m.jsMeta, err
}

func (m *Msg) parseJSMsgMetadata() (*JetStreamMsgMetaData, error) {
if m.jsMeta != nil {
return m.jsMeta, nil
}

if len(m.Reply) == 0 {
return nil, ErrNotJSMessage
}

meta := &JetStreamMsgMetaData{}

tsa := [32]string{}
parts := tsa[:0]
start := 0
btsep := byte('.')
for i := 0; i < len(m.Reply); i++ {
if m.Reply[i] == btsep {
parts = append(parts, m.Reply[start:i])
start = i + 1
}
}
parts = append(parts, m.Reply[start:])

if len(parts) != 8 || parts[0] != "$JS" || parts[1] != "ACK" {
return nil, ErrNotJSMessage
}

var err error

meta.Stream = parts[2]
meta.Consumer = parts[3]
meta.Delivered, err = strconv.Atoi(parts[4])
if err != nil {
return nil, ErrNotJSMessage
}

meta.StreamSeq, err = strconv.Atoi(parts[5])
if err != nil {
return nil, ErrNotJSMessage
}

meta.ConsumerSeq, err = strconv.Atoi(parts[6])
if err != nil {
return nil, ErrNotJSMessage
}

tsi, err := strconv.Atoi(parts[7])
if err != nil {
return nil, ErrNotJSMessage
}
meta.TimeStamp = time.Unix(0, int64(tsi))

meta.Parsed = true

return meta, nil
}

const jsStreamUnspecified = "not.set"

type jsAcKOpts struct {
str string // stream to expect a ack from
}

type jsOpts struct {
timeout time.Duration
ctx context.Context

ack jsAcKOpts
}

func newJsOpts() *jsOpts {
return &jsOpts{ack: jsAcKOpts{str: jsStreamUnspecified}}
}

func (j *jsOpts) context(dftl time.Duration) (context.Context, context.CancelFunc) {
if j.ctx != nil {
return context.WithCancel(j.ctx)
}

if j.timeout == 0 {
j.timeout = dftl
}

return context.WithTimeout(context.Background(), j.timeout)
}

// AckOption configures the various JetStream message acknowledgement helpers
type AckOption func(opts *jsOpts) error

// PublishOption configures publishing messages
type PublishOption func(opts *jsOpts) error

// PublishExpectsStream waits for an ack after publishing and ensure it's from a specific stream, empty arguments waits for any valid acknowledgement
func PublishExpectsStream(stream ...string) PublishOption {
return func(opts *jsOpts) error {
switch len(stream) {
case 0:
opts.ack.str = ""
case 1:
opts.ack.str = stream[0]
if !isValidJSName(opts.ack.str) {
return ErrInvalidStreamName
}
default:
return ErrMultiStreamUnsupported
}

return nil
}
}

// PublishStreamTimeout sets the period of time to wait for JetStream to acknowledge receipt, defaults to JetStreamTimeout option
func PublishStreamTimeout(t time.Duration) PublishOption {
return func(opts *jsOpts) error {
opts.timeout = t
return nil
}
}

// PublishCtx sets an interrupt context for waiting on a stream to reply
func PublishCtx(ctx context.Context) PublishOption {
return func(opts *jsOpts) error {
opts.ctx = ctx
return nil
}
}

// AckWaitDuration waits for confirmation from the JetStream server
func AckWaitDuration(d time.Duration) AckOption {
return func(opts *jsOpts) error {
opts.timeout = d
return nil
}
}

func (m *Msg) jsAck(body []byte, opts ...AckOption) error {
if m.Reply == "" {
return ErrMsgNoReply
}

if m == nil || m.Sub == nil {
return ErrMsgNotBound
}

m.Sub.mu.Lock()
nc := m.Sub.conn
m.Sub.mu.Unlock()

var err error
var aopts *jsOpts

if len(opts) > 0 {
aopts = newJsOpts()
for _, f := range opts {
if err = f(aopts); err != nil {
return err
}
}
}

if aopts == nil || aopts.timeout == 0 {
return m.Respond(body)
}

_, err = nc.Request(m.Reply, body, aopts.timeout)

return err
}

// Ack acknowledges a JetStream messages received from a Consumer, indicating the message
// should not be received again later
func (m *Msg) Ack(opts ...AckOption) error {
return m.jsAck(AckAck, opts...)
}

// Nak acknowledges a JetStream message received from a Consumer, indicating that the message
// is not completely processed and should be sent again later
func (m *Msg) Nak(opts ...AckOption) error {
return m.jsAck(AckNak, opts...)
}

// AckProgress acknowledges a Jetstream message received from a Consumer, indicating that work is
// ongoing and further processing time is required equal to the configured AckWait of the Consumer
func (m *Msg) AckProgress(opts ...AckOption) error {
return m.jsAck(AckProgress, opts...)
}

// AckNext performs an Ack() and request that the next message be sent to subject ib
func (m *Msg) AckNext(ib string) error {
return m.RespondMsg(&Msg{Subject: m.Reply, Reply: ib, Data: AckNext})
}

// AckAndFetch performs an AckNext() and returns the next message from the stream
func (m *Msg) AckAndFetch(opts ...AckOption) (*Msg, error) {
if m.Reply == "" {
return nil, ErrMsgNoReply
}

if m == nil || m.Sub == nil {
return nil, ErrMsgNotBound
}

m.Sub.mu.Lock()
nc := m.Sub.conn
m.Sub.mu.Unlock()

var err error

aopts := newJsOpts()
for _, f := range opts {
if err = f(aopts); err != nil {
return nil, err
}
}

ctx, cancel := aopts.context(nc.Opts.JetStreamTimeout)
defer cancel()

sub, err := nc.SubscribeSync(NewInbox())
if err != nil {
return nil, err
}
sub.AutoUnsubscribe(1)
defer sub.Unsubscribe()

err = m.RespondMsg(&Msg{Reply: sub.Subject, Data: AckNext, Subject: m.Reply})
if err != nil {
return nil, err
}
nc.Flush()

return sub.NextMsgWithContext(ctx)
}

// AckTerm acknowledges a message received from JetStream indicating the message will not be processed
// and should not be sent to another consumer
func (m *Msg) AckTerm(opts ...AckOption) error {
return m.jsAck(AckTerm, opts...)
}

// JetStreamPublishAck metadata received from JetStream when publishing messages
type JetStreamPublishAck struct {
Stream string `json:"stream"`
Sequence int `json:"seq"`
}

// ParsePublishAck parses the publish acknowledgement sent by JetStream
func ParsePublishAck(m []byte) (*JetStreamPublishAck, error) {
if bytes.HasPrefix([]byte("-ERR"), m) {
if len(m) > 7 {
return nil, fmt.Errorf(string(m[6 : len(m)-1]))
}

return nil, fmt.Errorf(string(m))
}

if !bytes.HasPrefix(m, []byte("+OK {")) {
return nil, fmt.Errorf("invalid JetStream Ack: %v", string(m))
}

ack := &JetStreamPublishAck{}
err := json.Unmarshal(m[3:], ack)
return ack, err
}

func isValidJSName(n string) bool {
return !(n == "" || strings.ContainsAny(n, ">*. "))
}

func (nc *Conn) jsPublish(subj string, data []byte, opts []PublishOption) error {
var err error
var aopts *jsOpts

if len(opts) > 0 {
aopts = newJsOpts()
for _, f := range opts {
if err = f(aopts); err != nil {
return err
}
}
}

if aopts == nil || aopts.timeout == 0 && aopts.ctx == nil && aopts.ack.str == jsStreamUnspecified {
return nc.publish(subj, _EMPTY_, nil, data)
}

ctx, cancel := aopts.context(nc.Opts.JetStreamTimeout)
defer cancel()

resp, err := nc.RequestWithContext(ctx, subj, data)
if err != nil {
return err
}

ack, err := ParsePublishAck(resp.Data)
if err != nil {
return err
}

if ack.Stream == "" || ack.Sequence == 0 {
return ErrInvalidJSAck
}

if aopts.ack.str == jsStreamUnspecified || aopts.ack.str == "" {
return nil
}

if ack.Stream == aopts.ack.str {
return nil
}

return fmt.Errorf("received ack from stream %q", ack.Stream)
}
Loading

0 comments on commit efb1e55

Please sign in to comment.