Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] TermWithReason method on JetStream message #1539

Merged
merged 2 commits into from Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions go_test.mod
Expand Up @@ -5,8 +5,8 @@ go 1.19
require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.17.4
github.com/nats-io/nats-server/v2 v2.10.7
github.com/nats-io/nkeys v0.4.6
github.com/nats-io/nats-server/v2 v2.10.9
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.2.1
golang.org/x/text v0.14.0
Expand All @@ -16,7 +16,7 @@ require (
require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
)
16 changes: 8 additions & 8 deletions go_test.sum
Expand Up @@ -16,21 +16,21 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.7 h1:f5VDy+GMu7JyuFA0Fef+6TfulfCs5nBTgq7MMkFJx5Y=
github.com/nats-io/nats-server/v2 v2.10.7/go.mod h1:V2JHOvPiPdtfDXTuEUsthUnCvSDeFrK4Xn9hRo6du7c=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
github.com/nats-io/nats-server/v2 v2.10.9 h1:VEW43Zz+p+9lARtiPM9ctd6ckun+92ZT2T17HWtwiFI=
github.com/nats-io/nats-server/v2 v2.10.9/go.mod h1:oorGiV9j3BOLLO3ejQe+U7pfAGyPo+ppD7rpgNF6KTQ=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
Expand Down
25 changes: 24 additions & 1 deletion jetstream/message.go
Expand Up @@ -75,6 +75,15 @@ type (
// Term tells the server to not redeliver this message, regardless of
// the value of MaxDeliver.
Term() error

// TermWithReason tells the server to not redeliver this message, regardless of
// the value of MaxDeliver. The provided reason will be included in JetStream
// advisory event sent by the server.
//
// Note: This will only work with JetStream servers >= 2.10.4.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would result in a timeout error right since implementation uses ackReply?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But ackReply does a simple publish on a JS reply subject, it does not wait for server ack (only DoubleAck does that). So it would simply work as if the message was not terminated and will eventually be redelivered.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, i always get confused with this method name, you're right.

// For older servers, TermWithReason will be ignored by the server and the message
// will not be terminated.
TermWithReason(reason string) error
}

// MsgMetadata is the JetStream metadata associated with received messages.
Expand Down Expand Up @@ -123,7 +132,8 @@ type (
}

ackOpts struct {
nakDelay time.Duration
nakDelay time.Duration
termReason string
}

ackType []byte
Expand Down Expand Up @@ -305,6 +315,17 @@ func (m *jetStreamMsg) Term() error {
return m.ackReply(context.Background(), ackTerm, false, ackOpts{})
}

// TermWithReason tells the server to not redeliver this message, regardless of
// the value of MaxDeliver. The provided reason will be included in JetStream
// advisory event sent by the server.
//
// Note: This will only work with JetStream servers >= 2.10.4.
// For older servers, TermWithReason will be ignored by the server and the message
// will not be terminated.
func (m *jetStreamMsg) TermWithReason(reason string) error {
return m.ackReply(context.Background(), ackTerm, false, ackOpts{termReason: reason})
}

func (m *jetStreamMsg) ackReply(ctx context.Context, ackType ackType, sync bool, opts ackOpts) error {
err := m.checkReply()
if err != nil {
Expand All @@ -329,6 +350,8 @@ func (m *jetStreamMsg) ackReply(ctx context.Context, ackType ackType, sync bool,
var body []byte
if opts.nakDelay > 0 {
body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, opts.nakDelay.Nanoseconds()))
} else if opts.termReason != "" {
body = []byte(fmt.Sprintf("%s %s", ackType, opts.termReason))
} else {
body = ackType
}
Expand Down
37 changes: 37 additions & 0 deletions jetstream/test/message_test.go
Expand Up @@ -328,6 +328,43 @@ func TestAckVariants(t *testing.T) {
t.Fatalf("Invalid ack body: %q", string(ack.Data))
}
})
t.Run("term with reason", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
srv, nc, js, c := setup(ctx, t)
defer shutdownJSServerAndRemoveStorage(t, srv)
defer nc.Close()

if _, err := js.Publish(ctx, "FOO.1", []byte("msg")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
msgs, err := c.Fetch(1)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
msg := <-msgs.Messages()
if msg == nil {
t.Fatalf("No messages available")
}
if err := msgs.Error(); err != nil {
t.Fatalf("unexpected error during fetch: %v", err)
}
sub, err := nc.SubscribeSync(msg.Reply())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if err := msg.TermWithReason("with reason"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ack, err := sub.NextMsgWithContext(ctx)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if string(ack.Data) != "+TERM with reason" {
t.Fatalf("Invalid ack body: %q", string(ack.Data))
}
})
t.Run("in progress", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down
27 changes: 17 additions & 10 deletions test/js_test.go
Expand Up @@ -3100,10 +3100,12 @@ func TestAccountInfo(t *testing.T) {
`,
expected: &nats.AccountInfo{
Tier: nats.Tier{
Memory: 0,
Store: 0,
Streams: 0,
Consumers: 0,
Memory: 0,
Store: 0,
Streams: 0,
Consumers: 0,
ReservedMemory: 0,
ReservedStore: 0,
Limits: nats.AccountLimits{
MaxMemory: -1,
MaxStore: -1,
Expand Down Expand Up @@ -3144,10 +3146,12 @@ func TestAccountInfo(t *testing.T) {
`,
expected: &nats.AccountInfo{
Tier: nats.Tier{
Memory: 0,
Store: 0,
Streams: 0,
Consumers: 0,
Memory: 0,
Store: 0,
Streams: 0,
Consumers: 0,
ReservedMemory: 0,
ReservedStore: 0,
Limits: nats.AccountLimits{
MaxMemory: 67108864,
MaxStore: 33554432,
Expand Down Expand Up @@ -3209,7 +3213,7 @@ func TestAccountInfo(t *testing.T) {
}

if !reflect.DeepEqual(test.expected, info) {
t.Fatalf("Account info does not match; expected: %v; got: %v", test.expected, info)
t.Fatalf("Account info does not match; expected: %+v; got: %+v", test.expected, info)
}
_, err = js.AddStream(&nats.StreamConfig{Name: "FOO", MaxBytes: 1024})
if err != nil {
Expand All @@ -3229,9 +3233,12 @@ func TestAccountInfo(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// ignore reserved store in comparison since this is dynamically
// assigned by the server
info.ReservedStore = test.expected.ReservedStore

if !reflect.DeepEqual(test.expected, info) {
t.Fatalf("Account info does not match; expected: %v; got: %v", test.expected, info)
t.Fatalf("Account info does not match; expected: %+v; got: %+v", test.expected, info)
}
})
}
Expand Down