Skip to content

Commit

Permalink
Merge pull request #560 from nats-io/test-by-start-reset
Browse files Browse the repository at this point in the history
[FIX] ordered consumer max_redeliver set to 1, and to use mem_storage
  • Loading branch information
aricart committed Aug 16, 2023
2 parents b9fd6d5 + 6622385 commit eb56c37
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 6 deletions.
2 changes: 2 additions & 0 deletions jetstream/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,8 @@ export class OrderedPullConsumerImpl implements Consumer {
// reset the consumer sequence as JetStream will renumber from 1
this.cursor.deliver_seq = 0;
const config = this.getConsumerOpts(seq);
config.max_deliver = 1;
config.mem_storage = true;
let ci;
// FIXME: replace with general jetstream retry logic
while (true) {
Expand Down
6 changes: 0 additions & 6 deletions jetstream/jsmstream_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,6 @@ export class ConsumersImpl implements Consumers {
}

checkVersion(): Promise<void> {
if (!this.notified) {
this.notified = true;
console.log(
`\u001B[33m >> consumers framework is beta functionality \u001B[0m`,
);
}
const fv = (this.api as ConsumerAPIImpl).nc.features.get(
Feature.JS_SIMPLIFICATION,
);
Expand Down
30 changes: 30 additions & 0 deletions jetstream/tests/consumersordered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -706,3 +706,33 @@ Deno.test("ordered - headers only", async () => {

await cleanup(ns, nc);
});

Deno.test("ordered - max deliver", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const js = nc.jetstream();

const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "test", subjects: ["test.*"] });

const oc = await js.consumers.get("test");
const ci = await oc.info();
assertExists(ci);
assertEquals(ci.config.max_deliver, 1);

await cleanup(ns, nc);
});

Deno.test("ordered - mem", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const js = nc.jetstream();

const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "test", subjects: ["test.*"] });

const oc = await js.consumers.get("test");
const ci = await oc.info();
assertExists(ci);
assertEquals(ci.config.mem_storage, true);

await cleanup(ns, nc);
});

0 comments on commit eb56c37

Please sign in to comment.