Skip to content

Commit

Permalink
[FIX] when specifying a bind, if the consumer didn't specify deliver_…
Browse files Browse the repository at this point in the history
…to, it was trapped as being a pull consumer, when the expected is that it will be what the bind returns.
  • Loading branch information
aricart committed Mar 4, 2022
1 parent afc6da4 commit 973b06c
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 2 deletions.
2 changes: 1 addition & 1 deletion nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ export class JetStreamClientImpl extends BaseApiClient
const cso = await this._processOptions(subject, opts);
// this effectively requires deliver subject to be specified
// as an option otherwise we have a pull consumer
if (!cso.config.deliver_subject) {
if (!cso.isBind && !cso.config.deliver_subject) {
throw new Error(
"consumer info specifies a pull consumer - deliver_subject is required",
);
Expand Down
89 changes: 89 additions & 0 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2899,3 +2899,92 @@ Deno.test("jetstream - bind example", async () => {

await cleanup(ns, nc);
});

Deno.test("jetstream - test events stream", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const js = nc.jetstream();
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
name: "events",
subjects: ["events.>"],
});

const sub = await js.subscribe("events.>", {
stream: "events",
config: {
ack_policy: AckPolicy.Explicit,
deliver_policy: DeliverPolicy.All,
deliver_subject: "foo",
durable_name: "me",
filter_subject: "events.>",
},
callbackFn: (err: NatsError | null, msg: JsMsg | null) => {
msg?.ack();
},
});

await js.publish("events.a");
await js.publish("events.b");
await delay(2000);
await cleanup(ns, nc);
});

Deno.test("jetstream - sub set error", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const js = nc.jetstream();
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
name: "events",
subjects: ["events.>"],
});

const opts = consumerOpts();
opts.durable("me");
opts.manualAck();
opts.ackExplicit();

const sub = await js.pullSubscribe("events.>", opts);

(async () => {
for await (const m of sub) {
console.log("\n", m.subject);
m.ack();
}
})();

await js.publish("events.a");
await js.publish("events.b");

sub.pull({ batch: 10, expires: 2000 });

const ci = await jsm.consumers.info("events", "me");
console.log(ci);

await delay(3000);
await cleanup(ns, nc);
});

Deno.test("jetstream - bind without consumer should fail", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const js = nc.jetstream();
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
name: "events",
subjects: ["events.>"],
});

const opts = consumerOpts();
opts.manualAck();
opts.ackExplicit();
opts.bind("events", "hello");

await assertRejects(
async () => {
await js.subscribe("events.>", opts);
},
Error,
"unable to bind - durable consumer hello doesn't exist in events",
);

await cleanup(ns, nc);
});
1 change: 0 additions & 1 deletion tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,6 @@ Deno.test("jsm - jetstream error info", async () => {
const ne = err as NatsError;
assert(ne.isJetStreamError());
const jerr = ne.jsError();
console.log(jerr);
assert(jerr);
assertEquals(jerr.code, 500);
assertEquals(jerr.err_code, 10074);
Expand Down

0 comments on commit 973b06c

Please sign in to comment.