Skip to content

Commit

Permalink
Merge pull request #608 from ripienaar/consumer_max_ack_pending
Browse files Browse the repository at this point in the history
support consumer MaxAckPending
  • Loading branch information
ripienaar committed Nov 16, 2020
2 parents 0482788 + 1412c4f commit be5005e
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 17 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.14

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.1.8-0.20201103213111-0965a20b516d
github.com/nats-io/nats-server/v2 v2.1.8-0.20201115145023-f61fa8529a0f
github.com/nats-io/nkeys v0.2.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71 h1:nexMtK
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad h1:oRb9MIi1Y4N5cTZWciqH68aVNt1e+o4N2uRnjVzv/UE=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201103213111-0965a20b516d h1:zNXqQAfBphOwTdcjnd+6wM4dmMKpW5u3a9vyBhtcEIc=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201103213111-0965a20b516d/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201115145023-f61fa8529a0f h1:BURI+N+9gQDk0JEkuUm2byL+AoZ0tm+wYEA2UT+xq6A=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201115145023-f61fa8529a0f/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
Expand Down
1 change: 1 addition & 0 deletions jetstream_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ type ConsumerConfig struct {
ReplayPolicy ReplayPolicy `json:"replay_policy"`
SampleFrequency string `json:"sample_freq,omitempty"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
}

type consumerConfig struct {
Expand Down
28 changes: 14 additions & 14 deletions jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestMsg_ParseJSMsgMetadata(t *testing.T) {
}

if meta.StreamSeq != 2 {
t.Fatalf("Expected 2 got %q", meta.StreamSeq)
t.Fatalf("Expected 2 got %q", meta.Stream)
}

if meta.ConsumerSeq != 3 {
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestMsg_Ack(t *testing.T) {
t.Fatalf("received invalid 'msg 1': %q", msg.Data)
}

if cons.Info().AckFloor.StreamSeq != 0 {
if cons.Info().AckFloor.Stream != 0 {
t.Fatalf("first message was already acked")
}

Expand All @@ -190,7 +190,7 @@ func TestMsg_Ack(t *testing.T) {
t.Fatalf("ack failed: %s", err)
}

if cons.Info().AckFloor.StreamSeq != 1 {
if cons.Info().AckFloor.Stream != 1 {
t.Fatalf("first message was not acked")
}
}
Expand All @@ -209,7 +209,7 @@ func TestMsg_Nak(t *testing.T) {
t.Fatalf("received invalid 'msg 1': %q", msg.Data)
}

if cons.Info().AckFloor.StreamSeq != 0 {
if cons.Info().AckFloor.Stream != 0 {
t.Fatalf("first message was already acked")
}

Expand All @@ -218,7 +218,7 @@ func TestMsg_Nak(t *testing.T) {
t.Fatalf("ack failed: %s", err)
}

if cons.Info().AckFloor.StreamSeq != 0 {
if cons.Info().AckFloor.Stream != 0 {
t.Fatalf("first message was acked")
}
}
Expand All @@ -237,7 +237,7 @@ func TestMsg_AckTerm(t *testing.T) {
t.Fatalf("received invalid 'msg 1': %q", msg.Data)
}

if cons.Info().AckFloor.StreamSeq != 0 {
if cons.Info().AckFloor.Stream != 0 {
t.Fatalf("first message was already acked")
}

Expand All @@ -246,7 +246,7 @@ func TestMsg_AckTerm(t *testing.T) {
t.Fatalf("ack failed: %s", err)
}

if cons.Info().AckFloor.StreamSeq != 1 {
if cons.Info().AckFloor.Stream != 1 {
t.Fatalf("first message was not acked")
}
}
Expand All @@ -265,7 +265,7 @@ func TestMsg_AckProgress(t *testing.T) {
t.Fatalf("received invalid 'msg 1': %q", msg.Data)
}

if cons.Info().AckFloor.StreamSeq != 0 {
if cons.Info().AckFloor.Stream != 0 {
t.Fatalf("first message was already acked")
}

Expand All @@ -274,7 +274,7 @@ func TestMsg_AckProgress(t *testing.T) {
t.Fatalf("ack failed: %s", err)
}

if cons.Info().AckFloor.StreamSeq != 0 {
if cons.Info().AckFloor.Stream != 0 {
t.Fatalf("first message was acked")
}

Expand All @@ -283,7 +283,7 @@ func TestMsg_AckProgress(t *testing.T) {
t.Fatalf("ack failed: %s", err)
}

if cons.Info().AckFloor.StreamSeq != 1 {
if cons.Info().AckFloor.Stream != 1 {
t.Fatalf("first message was not acked")
}
}
Expand All @@ -303,14 +303,14 @@ func TestMsg_AckAndFetch(t *testing.T) {
}

for i := 1; i < 20; i++ {
if cons.Info().AckFloor.StreamSeq == uint64(i) {
if cons.Info().AckFloor.Stream == uint64(i) {
t.Fatalf("message %d was already acked", i)
}
msg, err = msg.AckAndFetch()
if err != nil {
t.Fatalf("ack failed: %s", err)
}
if cons.Info().AckFloor.StreamSeq != uint64(i) {
if cons.Info().AckFloor.Stream != uint64(i) {
t.Fatalf("message %d was not acked", i)
}
if !bytes.Equal(msg.Data, []byte(fmt.Sprintf("msg %d", i+1))) {
Expand Down Expand Up @@ -343,7 +343,7 @@ func TestMsg_AckNext(t *testing.T) {
t.Fatalf("received invalid 'msg 1': %q", msg.Data)
}

if cons.Info().AckFloor.StreamSeq != 0 {
if cons.Info().AckFloor.Stream != 0 {
t.Fatalf("first message was already acked")
}

Expand All @@ -364,7 +364,7 @@ func TestMsg_AckNext(t *testing.T) {
}
}

if cons.Info().AckFloor.StreamSeq != 1 {
if cons.Info().AckFloor.Stream != 1 {
t.Fatalf("first message was not acked")
}
}

0 comments on commit be5005e

Please sign in to comment.