Skip to content

Commit

Permalink
Merge pull request #2246 from nats-io/wq-retention
Browse files Browse the repository at this point in the history
Fix for #2243. We were not allowing replicated acks processing for work queues.
  • Loading branch information
derekcollison committed May 24, 2021
2 parents 11539ec + 8888ab5 commit 9d86788
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 6 deletions.
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2850,7 +2850,7 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) {
o.mu.RLock()

mset := o.mset
if mset == nil || mset.cfg.Retention != InterestPolicy {
if mset == nil || mset.cfg.Retention == LimitsPolicy {
o.mu.RUnlock()
return
}
Expand Down
60 changes: 55 additions & 5 deletions server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1979,11 +1979,6 @@ func TestJetStreamClusterInterestRetention(t *testing.T) {
m.Ack()

waitForZero := func() {
js, err := nc.JetStream(nats.MaxWait(50 * time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("foo")
if err != nil {
Expand Down Expand Up @@ -2014,6 +2009,61 @@ func TestJetStreamClusterInterestRetention(t *testing.T) {
waitForZero()
}

// https://github.com/nats-io/nats-server/issues/2243
func TestJetStreamClusterWorkQueueRetention(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "FOO",
Subjects: []string{"foo.*"},
Replicas: 2,
Retention: nats.WorkQueuePolicy,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

sub, err := js.PullSubscribe("foo.test", "test")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if _, err = js.Publish("foo.test", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
si, err := js.StreamInfo("FOO")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 1 {
t.Fatalf("Expected 1 msg, got state: %+v", si.State)
}

// Fetch from our pull consumer and ack.
for _, m := range fetchMsgs(t, sub, 1, 5*time.Second) {
m.Ack()
}

// Make sure the messages are removed.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("FOO")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 0 {
return fmt.Errorf("Expected 0 msgs, got state: %+v", si.State)
}
return nil
})

}

func TestJetStreamClusterMirrorAndSourceWorkQueues(t *testing.T) {
c := createJetStreamClusterExplicit(t, "WQ", 3)
defer c.shutdown()
Expand Down

0 comments on commit 9d86788

Please sign in to comment.