Skip to content

Commit

Permalink
added a test to verify the reuse of the consumer (#192)
Browse files Browse the repository at this point in the history
* Added a test to verify the reuse of the consumer
  • Loading branch information
aricart committed Sep 3, 2021
1 parent ddf5a41 commit dad900f
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
deno-version: ${{ matrix.deno-version }}

- name: Set NATS Server Version
run: echo "NATS_VERSION=v2.3.3" >> $GITHUB_ENV
run: echo "NATS_VERSION=v2.4.0" >> $GITHUB_ENV

- name: Get nats-server
run: |
Expand Down
2 changes: 0 additions & 2 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,6 @@ export class JetStreamClientImpl extends BaseApiClient

if (!jsi.attached) {
jsi.config.filter_subject = subject;
// jsi.config.deliver_subject = jsi.config.deliver_subject ??
// createInbox(this.nc.options.inboxPrefix);
}

jsi.deliver = jsi.config.deliver_subject ||
Expand Down
45 changes: 45 additions & 0 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import {
NatsError,
nuid,
QueuedIterator,
RetentionPolicy,
StringCodec,
} from "../nats-base-client/internal_mod.ts";
import {
Expand Down Expand Up @@ -1964,3 +1965,47 @@ Deno.test("jetstream - cleanup", async () => {

await cleanup(ns, nc);
});

Deno.test("jetstream - reuse consumer", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const id = nuid.next();
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
subjects: [`${id}.*`],
name: id,
retention: RetentionPolicy.Workqueue,
});

await jsm.consumers.add(id, {
"durable_name": "X",
"deliver_subject": "out",
"deliver_policy": DeliverPolicy.All,
"ack_policy": AckPolicy.Explicit,
"deliver_group": "queuea",
});

// second create should be OK, since it is idempotent
await jsm.consumers.add(id, {
"durable_name": "X",
"deliver_subject": "out",
"deliver_policy": DeliverPolicy.All,
"ack_policy": AckPolicy.Explicit,
"deliver_group": "queuea",
});

const js = nc.jetstream();
const opts = consumerOpts();
opts.ackExplicit();
opts.durable("X");
opts.deliverAll();
opts.deliverTo("out2");
opts.queue("queuea");

const sub = await js.subscribe(`${id}.*`, opts);
const ci = await sub.consumerInfo();
// the deliver subject we specified should be ignored
// the one specified by the consumer is used
assertEquals(ci.config.deliver_subject, "out");

await cleanup(ns, nc);
});

0 comments on commit dad900f

Please sign in to comment.