Skip to content

Commit

Permalink
[FEAT] added support for Consumers to set mem_storage.
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Jul 27, 2022
1 parent 02b0add commit fa40d3d
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 0 deletions.
5 changes: 5 additions & 0 deletions nats-base-client/jsconsumeropts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,11 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder {
this.config.max_expires = nanos(millis);
return this;
}

memory() {
this.config.mem_storage = true;
return this;
}
}

export function isConsumerOptsBuilder(
Expand Down
6 changes: 6 additions & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,12 @@ export interface ConsumerOptsBuilder {
* @param millis
*/
inactiveEphemeralThreshold(millis: number): this;

/**
* Force the consumer state to be kept in memory rather than inherit the setting from
* the Stream
*/
memory(): this;
}

/**
Expand Down
35 changes: 35 additions & 0 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3510,3 +3510,38 @@ Deno.test("jetstream - republish", async () => {

await cleanup(ns, nc);
});

Deno.test("jetstream - mem_storage consumer option", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
if (await notCompatible(ns, nc, "2.9.0")) {
return;
}

const { stream, subj } = await initStream(nc);
const jsm = await nc.jetstreamManager();
const si = await jsm.streams.info(stream);
assertEquals(si.config.storage, StorageType.File);

const opts = consumerOpts();
opts.ackExplicit();
opts.durable("opts");
opts.memory();

const js = nc.jetstream();
const sub = await js.pullSubscribe(subj, opts);
let ci = await sub.consumerInfo();
assertEquals(ci.config.mem_storage, true);

ci.config.mem_storage = false;
ci = await jsm.consumers.update(stream, "opts", ci.config);
assertEquals(ci.config.mem_storage, undefined);

ci = await jsm.consumers.add(stream, {
durable_name: "dopts",
mem_storage: true,
ack_policy: AckPolicy.Explicit,
});
assertEquals(ci.config.mem_storage, true);

await cleanup(ns, nc);
});

0 comments on commit fa40d3d

Please sign in to comment.