Skip to content

Commit

Permalink
[ADDED] TermWithReason method on JetStream message (#1539)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Jan 30, 2024
1 parent 224a544 commit 73dc520
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 23 deletions.
8 changes: 4 additions & 4 deletions go_test.mod
Expand Up @@ -5,8 +5,8 @@ go 1.19
require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.17.4
github.com/nats-io/nats-server/v2 v2.10.7
github.com/nats-io/nkeys v0.4.6
github.com/nats-io/nats-server/v2 v2.10.9
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.2.1
golang.org/x/text v0.14.0
Expand All @@ -16,7 +16,7 @@ require (
require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
)
16 changes: 8 additions & 8 deletions go_test.sum
Expand Up @@ -16,21 +16,21 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.7 h1:f5VDy+GMu7JyuFA0Fef+6TfulfCs5nBTgq7MMkFJx5Y=
github.com/nats-io/nats-server/v2 v2.10.7/go.mod h1:V2JHOvPiPdtfDXTuEUsthUnCvSDeFrK4Xn9hRo6du7c=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
github.com/nats-io/nats-server/v2 v2.10.9 h1:VEW43Zz+p+9lARtiPM9ctd6ckun+92ZT2T17HWtwiFI=
github.com/nats-io/nats-server/v2 v2.10.9/go.mod h1:oorGiV9j3BOLLO3ejQe+U7pfAGyPo+ppD7rpgNF6KTQ=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
Expand Down
25 changes: 24 additions & 1 deletion jetstream/message.go
Expand Up @@ -75,6 +75,15 @@ type (
// Term tells the server to not redeliver this message, regardless of
// the value of MaxDeliver.
Term() error

// TermWithReason tells the server to not redeliver this message, regardless of
// the value of MaxDeliver. The provided reason will be included in JetStream
// advisory event sent by the server.
//
// Note: This will only work with JetStream servers >= 2.10.4.
// For older servers, TermWithReason will be ignored by the server and the message
// will not be terminated.
TermWithReason(reason string) error
}

// MsgMetadata is the JetStream metadata associated with received messages.
Expand Down Expand Up @@ -123,7 +132,8 @@ type (
}

ackOpts struct {
nakDelay time.Duration
nakDelay time.Duration
termReason string
}

ackType []byte
Expand Down Expand Up @@ -305,6 +315,17 @@ func (m *jetStreamMsg) Term() error {
return m.ackReply(context.Background(), ackTerm, false, ackOpts{})
}

// TermWithReason tells the server to not redeliver this message, regardless of
// the value of MaxDeliver. The provided reason will be included in JetStream
// advisory event sent by the server.
//
// Note: This will only work with JetStream servers >= 2.10.4.
// For older servers, TermWithReason will be ignored by the server and the message
// will not be terminated.
func (m *jetStreamMsg) TermWithReason(reason string) error {
return m.ackReply(context.Background(), ackTerm, false, ackOpts{termReason: reason})
}

func (m *jetStreamMsg) ackReply(ctx context.Context, ackType ackType, sync bool, opts ackOpts) error {
err := m.checkReply()
if err != nil {
Expand All @@ -329,6 +350,8 @@ func (m *jetStreamMsg) ackReply(ctx context.Context, ackType ackType, sync bool,
var body []byte
if opts.nakDelay > 0 {
body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, opts.nakDelay.Nanoseconds()))
} else if opts.termReason != "" {
body = []byte(fmt.Sprintf("%s %s", ackType, opts.termReason))
} else {
body = ackType
}
Expand Down
37 changes: 37 additions & 0 deletions jetstream/test/message_test.go
Expand Up @@ -328,6 +328,43 @@ func TestAckVariants(t *testing.T) {
t.Fatalf("Invalid ack body: %q", string(ack.Data))
}
})
t.Run("term with reason", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
srv, nc, js, c := setup(ctx, t)
defer shutdownJSServerAndRemoveStorage(t, srv)
defer nc.Close()

if _, err := js.Publish(ctx, "FOO.1", []byte("msg")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
msgs, err := c.Fetch(1)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
msg := <-msgs.Messages()
if msg == nil {
t.Fatalf("No messages available")
}
if err := msgs.Error(); err != nil {
t.Fatalf("unexpected error during fetch: %v", err)
}
sub, err := nc.SubscribeSync(msg.Reply())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if err := msg.TermWithReason("with reason"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ack, err := sub.NextMsgWithContext(ctx)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if string(ack.Data) != "+TERM with reason" {
t.Fatalf("Invalid ack body: %q", string(ack.Data))
}
})
t.Run("in progress", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down
27 changes: 17 additions & 10 deletions test/js_test.go
Expand Up @@ -3100,10 +3100,12 @@ func TestAccountInfo(t *testing.T) {
`,
expected: &nats.AccountInfo{
Tier: nats.Tier{
Memory: 0,
Store: 0,
Streams: 0,
Consumers: 0,
Memory: 0,
Store: 0,
Streams: 0,
Consumers: 0,
ReservedMemory: 0,
ReservedStore: 0,
Limits: nats.AccountLimits{
MaxMemory: -1,
MaxStore: -1,
Expand Down Expand Up @@ -3144,10 +3146,12 @@ func TestAccountInfo(t *testing.T) {
`,
expected: &nats.AccountInfo{
Tier: nats.Tier{
Memory: 0,
Store: 0,
Streams: 0,
Consumers: 0,
Memory: 0,
Store: 0,
Streams: 0,
Consumers: 0,
ReservedMemory: 0,
ReservedStore: 0,
Limits: nats.AccountLimits{
MaxMemory: 67108864,
MaxStore: 33554432,
Expand Down Expand Up @@ -3209,7 +3213,7 @@ func TestAccountInfo(t *testing.T) {
}

if !reflect.DeepEqual(test.expected, info) {
t.Fatalf("Account info does not match; expected: %v; got: %v", test.expected, info)
t.Fatalf("Account info does not match; expected: %+v; got: %+v", test.expected, info)
}
_, err = js.AddStream(&nats.StreamConfig{Name: "FOO", MaxBytes: 1024})
if err != nil {
Expand All @@ -3229,9 +3233,12 @@ func TestAccountInfo(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// ignore reserved store in comparison since this is dynamically
// assigned by the server
info.ReservedStore = test.expected.ReservedStore

if !reflect.DeepEqual(test.expected, info) {
t.Fatalf("Account info does not match; expected: %v; got: %v", test.expected, info)
t.Fatalf("Account info does not match; expected: %+v; got: %+v", test.expected, info)
}
})
}
Expand Down

0 comments on commit 73dc520

Please sign in to comment.