Skip to content

Commit

Permalink
[FIX] improved ordered consumer - api calls on the consumer no longer…
Browse files Browse the repository at this point in the history
… re-create the consumer
  • Loading branch information
aricart committed May 18, 2024
1 parent 8fe83cc commit e266cc2
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 100 deletions.
67 changes: 49 additions & 18 deletions jetstream/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ export enum ConsumerDebugEvents {
* have the format of `{msgsLeft: number, bytesLeft: number}`.
*/
Discard = "discard",

/**
* Notifies that the current consumer will be reset
*/
Reset = "reset",
/**
* Notifies whenever there's a request for additional messages from the server.
* This notification telegraphs the request options, which should be treated as
Expand Down Expand Up @@ -1110,7 +1115,7 @@ export class OrderedPullConsumerImpl implements Consumer {
}
const dseq = m.info.deliverySequence;
if (dseq !== this.cursor.deliver_seq + 1) {
this.reset(this.opts);
this.notifyOrderedResetAndReset();
return;
}
this.cursor.deliver_seq = dseq;
Expand All @@ -1124,14 +1129,37 @@ export class OrderedPullConsumerImpl implements Consumer {
};
}

async reset(opts: ConsumeOptions | FetchOptions = {
max_messages: 100,
expires: 30_000,
} as ConsumeMessages, fromFetch = false): Promise<ConsumerMessages> {
this.currentConsumer = await this.resetConsumer(
this.cursor.stream_seq + 1,
);
if (this.iter === null) {
async reset(
opts: ConsumeOptions | FetchOptions = {
max_messages: 100,
expires: 30_000,
} as ConsumeMessages,
info?: Partial<{ fromFetch: boolean; orderedReset: boolean }>,
): Promise<void> {
info = info || {};
// this is known to be directly related to a pull
const fromFetch = info.fromFetch || false;
// a sequence order caused the reset
const orderedReset = info.orderedReset || false;

if (this.type === PullConsumerType.Fetch && orderedReset) {
// the fetch pull simply needs to end the iterator
this.iter?.src.stop();
await this.iter?.closed();
this.currentConsumer = null;
return;
}

if (this.currentConsumer === null || orderedReset) {
this.currentConsumer = await this.resetConsumer(
this.cursor.stream_seq + 1,
);
}

// if we don't have an iterator, or it is a fetch request
// we create the iterator - otherwise this is a reset that is happening
// while the OC is active, so simply bind the new OC to current iterator.
if (this.iter === null || fromFetch) {
this.iter = new OrderedConsumerMessages();
}
this.consumer = new PullConsumerImpl(this.api, this.currentConsumer);
Expand All @@ -1144,19 +1172,21 @@ export class OrderedPullConsumerImpl implements Consumer {
msgs = await this.consumer.fetch(opts);
} else if (this.type === PullConsumerType.Consume) {
msgs = await this.consumer.consume(opts);
} else {
return Promise.reject("reset called with unset consumer type");
}
const msgsImpl = msgs as PullConsumerMessagesImpl;
msgsImpl.forOrderedConsumer = true;
msgsImpl.resetHandler = () => {
this.reset(this.opts);
this.notifyOrderedResetAndReset();
};
this.iter.setSource(msgsImpl);
return this.iter;
}

consume(opts: ConsumeOptions = {
notifyOrderedResetAndReset() {
this.iter?.notify(ConsumerDebugEvents.Reset, "");
this.reset(this.opts, { orderedReset: true });
}

async consume(opts: ConsumeOptions = {
max_messages: 100,
expires: 30_000,
} as ConsumeMessages): Promise<ConsumerMessages> {
Expand All @@ -1178,10 +1208,11 @@ export class OrderedPullConsumerImpl implements Consumer {
}
this.type = PullConsumerType.Consume;
this.opts = opts;
return this.reset(opts);
await this.reset(opts);
return this.iter!;
}

fetch(
async fetch(
opts: FetchOptions = { max_messages: 100, expires: 30_000 },
): Promise<ConsumerMessages> {
const copts = opts as ConsumeOptions;
Expand All @@ -1208,8 +1239,8 @@ export class OrderedPullConsumerImpl implements Consumer {
}
this.type = PullConsumerType.Fetch;
this.opts = opts;
this.iter = new OrderedConsumerMessages();
return this.reset(opts, true);
await this.reset(opts, { fromFetch: true });
return this.iter!;
}

async next(
Expand Down
238 changes: 156 additions & 82 deletions jetstream/tests/consumers_ordered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ import {
assertRejects,
assertStringIncludes,
} from "https://deno.land/std@0.221.0/assert/mod.ts";
import { DeliverPolicy, JsMsg } from "../mod.ts";
import {
ConsumerDebugEvents,
ConsumerMessages,
DeliverPolicy,
JsMsg,
} from "../mod.ts";
import {
OrderedConsumerMessages,
OrderedPullConsumerImpl,
Expand Down Expand Up @@ -89,87 +94,6 @@ Deno.test("ordered consumers - fetch", async () => {
await cleanup(ns, nc);
});

Deno.test("ordered consumers - fetch reset", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const js = nc.jetstream();

const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "test", subjects: ["test.*"] });
await js.publish("test.a");
await js.publish("test.b");
await js.publish("test.c");

const oc = await js.consumers.get("test") as OrderedPullConsumerImpl;
assertExists(oc);

const seen: number[] = new Array(3).fill(0);
let done = deferred();

const callback = (m: JsMsg) => {
const idx = m.seq - 1;
seen[idx]++;
// mess with the internals so we see these again
if (seen[idx] === 1) {
oc.cursor.deliver_seq--;
oc.cursor.stream_seq--;
}
iter.stop();
done.resolve();
};

let iter = await oc.fetch({
max_messages: 1,
//@ts-ignore: callback not exposed
callback,
});
await done;
done = deferred();

iter = await oc.fetch({
max_messages: 1,
//@ts-ignore: callback not exposed
callback,
});
await done;
done = deferred();

iter = await oc.fetch({
max_messages: 1,
//@ts-ignore: callback not exposed
callback,
});
await done;
done = deferred();

iter = await oc.fetch({
max_messages: 1,
//@ts-ignore: callback not exposed
callback,
});
await done;
done = deferred();

iter = await oc.fetch({
max_messages: 1,
//@ts-ignore: callback not exposed
callback,
});
await done;
done = deferred();

iter = await oc.fetch({
max_messages: 1,
//@ts-ignore: callback not exposed
callback,
});
await done;

assertEquals(seen, [2, 2, 2]);
assertEquals(oc.serial, 6);

await cleanup(ns, nc);
});

Deno.test("ordered consumers - consume reset", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const js = nc.jetstream();
Expand Down Expand Up @@ -978,3 +902,153 @@ Deno.test("ordered consumers - name prefix", async () => {

await cleanup(ns, nc);
});

Deno.test("ordered consumers - fetch reset", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();

await jsm.streams.add({ name: "A", subjects: ["a"] });
const js = nc.jetstream();
await js.publish("a", JSON.stringify(1));

const c = await js.consumers.get("A") as OrderedPullConsumerImpl;

let resets = 0;
function countResets(iter: ConsumerMessages): Promise<void> {
return (async () => {
for await (const s of await iter.status()) {
if (s.type === ConsumerDebugEvents.Reset) {
resets++;
}
}
})();
}

// after the first message others will get published
let iter = await c.fetch({ max_messages: 10, expires: 3_000 });
const first = countResets(iter);
const sequences = [];
for await (const m of iter) {
sequences.push(m.json());
// mess with the internal state to cause a reset
if (m.seq === 1) {
c.cursor.deliver_seq = 3;
const buf = [];
for (let i = 2; i < 20; i++) {
buf.push(js.publish("a", JSON.stringify(i)));
}
await Promise.all(buf);
}
}

iter = await c.fetch({ max_messages: 10, expires: 2_000 });
const second = countResets(iter);

const done = (async () => {
for await (const m of iter) {
sequences.push(m.json());
}
})().catch();

await Promise.all([first, second, done]);
assertEquals(c.serial, 2);
assertEquals(resets, 1);
assertEquals(sequences, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
await cleanup(ns, nc);
});

Deno.test("ordered consumers - consume reset", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();

await jsm.streams.add({ name: "A", subjects: ["a"] });
const js = nc.jetstream();
await js.publish("a", JSON.stringify(1));

let resets = 0;
function countResets(iter: ConsumerMessages): Promise<void> {
return (async () => {
for await (const s of await iter.status()) {
if (s.type === ConsumerDebugEvents.Reset) {
resets++;
}
}
})();
}

const c = await js.consumers.get("A") as OrderedPullConsumerImpl;

// after the first message others will get published
let iter = await c.consume({ max_messages: 11, expires: 5000 });
countResets(iter).catch();
const sequences = [];
for await (const m of iter) {
sequences.push(m.json());
// mess with the internal state to cause a reset
if (m.seq === 1) {
c.cursor.deliver_seq = 3;
const buf = [];
for (let i = 2; i < 20; i++) {
buf.push(js.publish("a", JSON.stringify(i)));
}
await Promise.all(buf);
}
if (m.seq === 11) {
break;
}
}

assertEquals(c.serial, 2);
assertEquals(resets, 1);
assertEquals(sequences, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);

await cleanup(ns, nc);
});

Deno.test("ordered consumers - next reset", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();

await jsm.streams.add({ name: "A", subjects: ["a"] });
const js = nc.jetstream();
await js.publish("a", JSON.stringify(1));
await js.publish("a", JSON.stringify(2));

const c = await js.consumers.get("A") as OrderedPullConsumerImpl;

// get the first
let m = await c.next({ expires: 1000 });
assertExists(m);
assertEquals(m.json(), 1);

// force a reset
c.cursor.deliver_seq = 3;
await js.publish("a", JSON.stringify(2));

m = await c.next({ expires: 1000 });
assertEquals(m, null);
assertEquals(c.serial, 1);

await cleanup(ns, nc);
});

Deno.test("ordered consumers - next reset", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();

await jsm.streams.add({ name: "A", subjects: ["a"] });
const js = nc.jetstream();

await js.publish("a", JSON.stringify(1));
await js.publish("a", JSON.stringify(2));

const c = await js.consumers.get("A") as OrderedPullConsumerImpl;
await c.next();
await c.next();

assertEquals(c.serial, 1);
await c.info();
assertEquals(c.serial, 1);

await cleanup(ns, nc);
});

0 comments on commit e266cc2

Please sign in to comment.