Skip to content

Commit

Permalink
[FEAT] new consumer options (#251)
Browse files Browse the repository at this point in the history
[FEAT] added ephemeral consumer `inactive_threshold`, (`inactiveEphemeralThreshold(millis) option`) 
[FEAT] added pull consumer options `max_batch` (`maxPullBatch(n)`), and `max_expires` (`maxPullRequestExpires(millis)`)
  • Loading branch information
aricart committed Jan 26, 2022
1 parent d9927b5 commit ac6f15c
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
deno-version: ${{ matrix.deno-version }}

- name: Set NATS Server Version
run: echo "NATS_VERSION=v2.7.0" >> $GITHUB_ENV
run: echo "NATS_VERSION=v2.7.1" >> $GITHUB_ENV

# this here because dns seems to be wedged on gha
- name: Add hosts to /etc/hosts
Expand Down
12 changes: 12 additions & 0 deletions nats-base-client/jsconsumeropts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,18 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder {
this.config.durable_name = durable;
this.isBind = true;
}

inactiveEphemeralThreshold(millis: number): void {
this.config.inactive_threshold = nanos(millis);
}

maxPullBatch(n: number): void {
this.config.max_batch = n;
}

maxPullRequestExpires(millis: number): void {
this.config.max_expires = nanos(millis);
}
}

export function isConsumerOptsBuilder(
Expand Down
9 changes: 9 additions & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,12 @@ export interface ConsumerOptsBuilder {
orderedConsumer(): void;
// binds to a consumer
bind(stream: string, durable: string): void;
// max number of messages to be delivered to a pull consumer (pull consumer only)
maxPullBatch(n: number): void;
// max amount of time before a pull request expires
maxPullRequestExpires(millis: number): void;
// max amount of time before an inactive ephemeral consumer is discarded
inactiveEphemeralThreshold(millis: number): void;
}

export interface Lister<T> {
Expand Down Expand Up @@ -788,6 +794,9 @@ export interface ConsumerUpdateConfig {
"max_waiting"?: number;
"headers_only"?: boolean;
"deliver_subject"?: string;
"max_batch"?: number;
"max_expires"?: Nanos;
"inactive_threshold"?: Nanos;
}

export interface Consumer {
Expand Down
29 changes: 29 additions & 0 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,35 @@ Deno.test("jetstream - max ack pending", async () => {
await cleanup(ns, nc);
});

Deno.test("jetstream - ephemeral options", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { stream } = await initStream(nc);
const jsm = await nc.jetstreamManager();
const v = await jsm.consumers.add(stream, {
inactive_threshold: nanos(1000),
ack_policy: AckPolicy.Explicit,
});
assertEquals(v.config.inactive_threshold, nanos(1000));
await cleanup(ns, nc);
});

Deno.test("jetstream - pull consumer options", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { stream } = await initStream(nc);
const jsm = await nc.jetstreamManager();
const v = await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
max_batch: 10,
max_expires: nanos(20000),
});

assertEquals(v.config.max_batch, 10);
assertEquals(v.config.max_expires, nanos(20000));

await cleanup(ns, nc);
});

Deno.test("jetstream - pull sub - attached iterator", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { stream, subj } = await initStream(nc);
Expand Down

0 comments on commit ac6f15c

Please sign in to comment.