diff --git a/nats-base-client/jsclient.ts b/nats-base-client/jsclient.ts index 02c99b10..bfb2f9f1 100644 --- a/nats-base-client/jsclient.ts +++ b/nats-base-client/jsclient.ts @@ -369,6 +369,7 @@ export class JetStreamClientImpl extends BaseApiClient so.dispatchedFn = autoAckJsMsg; } so.max = jsi.max || 0; + so.queue = jsi.queue; return so; } diff --git a/nats-base-client/jsconsumeropts.ts b/nats-base-client/jsconsumeropts.ts index da11b619..0a66f695 100644 --- a/nats-base-client/jsconsumeropts.ts +++ b/nats-base-client/jsconsumeropts.ts @@ -42,6 +42,7 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder { stream: string; callbackFn?: JsMsgCallback; max?: number; + qname?: string; constructor(opts?: Partial) { this.stream = ""; @@ -58,6 +59,7 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder { o.stream = this.stream; o.callbackFn = this.callbackFn; o.max = this.max; + o.queue = this.qname; return o; } @@ -129,6 +131,10 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder { callback(fn: JsMsgCallback) { this.callbackFn = fn; } + + queue(n: string) { + this.qname = n; + } } export function isConsumerOptsBuilder( diff --git a/nats-base-client/types.ts b/nats-base-client/types.ts index 42bd45d7..4b900e44 100644 --- a/nats-base-client/types.ts +++ b/nats-base-client/types.ts @@ -321,6 +321,7 @@ export interface ConsumerOpts { // standard max?: number; + queue?: string; debug?: boolean; } @@ -355,6 +356,8 @@ export interface ConsumerOptsBuilder { maxWaiting(max: number): void; // standard nats subscribe option for the maximum number of messages to receive on the subscription maxMessages(max: number): void; + // standard nats queue group option + queue(n: string): void; // callback to process messages (or iterator is returned) callback(fn: JsMsgCallback): void; } diff --git a/tests/jetstream_test.ts b/tests/jetstream_test.ts index 44261394..ffc54e97 100644 --- a/tests/jetstream_test.ts +++ b/tests/jetstream_test.ts @@ -1556,3 +1556,35 @@ Deno.test("jetstream - JSON", async () => { } await cleanup(ns, nc); }); + +Deno.test("jetstream - qsub", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + const { subj } = await initStream(nc); + const js = nc.jetstream(); + + const opts = consumerOpts(); + opts.queue("q"); + opts.durable("n"); + opts.deliverTo("here"); + opts.callback((err, m) => { + if (m) { + m.ack(); + } + }); + + const sub = await js.subscribe(subj, opts); + const sub2 = await js.subscribe(subj, opts); + + for (let i = 0; i < 100; i++) { + await js.publish(subj, Empty); + } + await nc.flush(); + await sub.drain(); + await sub2.drain(); + + assert(sub.getProcessed() > 0); + assert(sub2.getProcessed() > 0); + assertEquals(sub.getProcessed() + sub2.getProcessed(), 100); + + await cleanup(ns, nc); +});