diff --git a/jetstream/consumer.ts b/jetstream/consumer.ts index e0974f55..6ed6a863 100644 --- a/jetstream/consumer.ts +++ b/jetstream/consumer.ts @@ -239,6 +239,11 @@ export enum ConsumerDebugEvents { * have the format of `{msgsLeft: number, bytesLeft: number}`. */ Discard = "discard", + + /** + * Notifies that the current consumer will be reset + */ + Reset = "reset", /** * Notifies whenever there's a request for additional messages from the server. * This notification telegraphs the request options, which should be treated as @@ -1110,7 +1115,7 @@ export class OrderedPullConsumerImpl implements Consumer { } const dseq = m.info.deliverySequence; if (dseq !== this.cursor.deliver_seq + 1) { - this.reset(this.opts); + this.notifyOrderedResetAndReset(); return; } this.cursor.deliver_seq = dseq; @@ -1124,14 +1129,37 @@ export class OrderedPullConsumerImpl implements Consumer { }; } - async reset(opts: ConsumeOptions | FetchOptions = { - max_messages: 100, - expires: 30_000, - } as ConsumeMessages, fromFetch = false): Promise { - this.currentConsumer = await this.resetConsumer( - this.cursor.stream_seq + 1, - ); - if (this.iter === null) { + async reset( + opts: ConsumeOptions | FetchOptions = { + max_messages: 100, + expires: 30_000, + } as ConsumeMessages, + info?: Partial<{ fromFetch: boolean; orderedReset: boolean }>, + ): Promise { + info = info || {}; + // this is known to be directly related to a pull + const fromFetch = info.fromFetch || false; + // a sequence order caused the reset + const orderedReset = info.orderedReset || false; + + if (this.type === PullConsumerType.Fetch && orderedReset) { + // the fetch pull simply needs to end the iterator + this.iter?.src.stop(); + await this.iter?.closed(); + this.currentConsumer = null; + return; + } + + if (this.currentConsumer === null || orderedReset) { + this.currentConsumer = await this.resetConsumer( + this.cursor.stream_seq + 1, + ); + } + + // if we don't have an iterator, or it is a fetch request + // we create the iterator - otherwise this is a reset that is happening + // while the OC is active, so simply bind the new OC to current iterator. + if (this.iter === null || fromFetch) { this.iter = new OrderedConsumerMessages(); } this.consumer = new PullConsumerImpl(this.api, this.currentConsumer); @@ -1144,19 +1172,21 @@ export class OrderedPullConsumerImpl implements Consumer { msgs = await this.consumer.fetch(opts); } else if (this.type === PullConsumerType.Consume) { msgs = await this.consumer.consume(opts); - } else { - return Promise.reject("reset called with unset consumer type"); } const msgsImpl = msgs as PullConsumerMessagesImpl; msgsImpl.forOrderedConsumer = true; msgsImpl.resetHandler = () => { - this.reset(this.opts); + this.notifyOrderedResetAndReset(); }; this.iter.setSource(msgsImpl); - return this.iter; } - consume(opts: ConsumeOptions = { + notifyOrderedResetAndReset() { + this.iter?.notify(ConsumerDebugEvents.Reset, ""); + this.reset(this.opts, { orderedReset: true }); + } + + async consume(opts: ConsumeOptions = { max_messages: 100, expires: 30_000, } as ConsumeMessages): Promise { @@ -1178,10 +1208,11 @@ export class OrderedPullConsumerImpl implements Consumer { } this.type = PullConsumerType.Consume; this.opts = opts; - return this.reset(opts); + await this.reset(opts); + return this.iter!; } - fetch( + async fetch( opts: FetchOptions = { max_messages: 100, expires: 30_000 }, ): Promise { const copts = opts as ConsumeOptions; @@ -1208,8 +1239,8 @@ export class OrderedPullConsumerImpl implements Consumer { } this.type = PullConsumerType.Fetch; this.opts = opts; - this.iter = new OrderedConsumerMessages(); - return this.reset(opts, true); + await this.reset(opts, { fromFetch: true }); + return this.iter!; } async next( diff --git a/jetstream/tests/consumers_ordered_test.ts b/jetstream/tests/consumers_ordered_test.ts index 1d44a3f6..27bca953 100644 --- a/jetstream/tests/consumers_ordered_test.ts +++ b/jetstream/tests/consumers_ordered_test.ts @@ -21,7 +21,12 @@ import { assertRejects, assertStringIncludes, } from "https://deno.land/std@0.221.0/assert/mod.ts"; -import { DeliverPolicy, JsMsg } from "../mod.ts"; +import { + ConsumerDebugEvents, + ConsumerMessages, + DeliverPolicy, + JsMsg, +} from "../mod.ts"; import { OrderedConsumerMessages, OrderedPullConsumerImpl, @@ -89,87 +94,6 @@ Deno.test("ordered consumers - fetch", async () => { await cleanup(ns, nc); }); -Deno.test("ordered consumers - fetch reset", async () => { - const { ns, nc } = await setup(jetstreamServerConf()); - const js = nc.jetstream(); - - const jsm = await nc.jetstreamManager(); - await jsm.streams.add({ name: "test", subjects: ["test.*"] }); - await js.publish("test.a"); - await js.publish("test.b"); - await js.publish("test.c"); - - const oc = await js.consumers.get("test") as OrderedPullConsumerImpl; - assertExists(oc); - - const seen: number[] = new Array(3).fill(0); - let done = deferred(); - - const callback = (m: JsMsg) => { - const idx = m.seq - 1; - seen[idx]++; - // mess with the internals so we see these again - if (seen[idx] === 1) { - oc.cursor.deliver_seq--; - oc.cursor.stream_seq--; - } - iter.stop(); - done.resolve(); - }; - - let iter = await oc.fetch({ - max_messages: 1, - //@ts-ignore: callback not exposed - callback, - }); - await done; - done = deferred(); - - iter = await oc.fetch({ - max_messages: 1, - //@ts-ignore: callback not exposed - callback, - }); - await done; - done = deferred(); - - iter = await oc.fetch({ - max_messages: 1, - //@ts-ignore: callback not exposed - callback, - }); - await done; - done = deferred(); - - iter = await oc.fetch({ - max_messages: 1, - //@ts-ignore: callback not exposed - callback, - }); - await done; - done = deferred(); - - iter = await oc.fetch({ - max_messages: 1, - //@ts-ignore: callback not exposed - callback, - }); - await done; - done = deferred(); - - iter = await oc.fetch({ - max_messages: 1, - //@ts-ignore: callback not exposed - callback, - }); - await done; - - assertEquals(seen, [2, 2, 2]); - assertEquals(oc.serial, 6); - - await cleanup(ns, nc); -}); - Deno.test("ordered consumers - consume reset", async () => { const { ns, nc } = await setup(jetstreamServerConf()); const js = nc.jetstream(); @@ -978,3 +902,153 @@ Deno.test("ordered consumers - name prefix", async () => { await cleanup(ns, nc); }); + +Deno.test("ordered consumers - fetch reset", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await nc.jetstreamManager(); + + await jsm.streams.add({ name: "A", subjects: ["a"] }); + const js = nc.jetstream(); + await js.publish("a", JSON.stringify(1)); + + const c = await js.consumers.get("A") as OrderedPullConsumerImpl; + + let resets = 0; + function countResets(iter: ConsumerMessages): Promise { + return (async () => { + for await (const s of await iter.status()) { + if (s.type === ConsumerDebugEvents.Reset) { + resets++; + } + } + })(); + } + + // after the first message others will get published + let iter = await c.fetch({ max_messages: 10, expires: 3_000 }); + const first = countResets(iter); + const sequences = []; + for await (const m of iter) { + sequences.push(m.json()); + // mess with the internal state to cause a reset + if (m.seq === 1) { + c.cursor.deliver_seq = 3; + const buf = []; + for (let i = 2; i < 20; i++) { + buf.push(js.publish("a", JSON.stringify(i))); + } + await Promise.all(buf); + } + } + + iter = await c.fetch({ max_messages: 10, expires: 2_000 }); + const second = countResets(iter); + + const done = (async () => { + for await (const m of iter) { + sequences.push(m.json()); + } + })().catch(); + + await Promise.all([first, second, done]); + assertEquals(c.serial, 2); + assertEquals(resets, 1); + assertEquals(sequences, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + await cleanup(ns, nc); +}); + +Deno.test("ordered consumers - consume reset", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await nc.jetstreamManager(); + + await jsm.streams.add({ name: "A", subjects: ["a"] }); + const js = nc.jetstream(); + await js.publish("a", JSON.stringify(1)); + + let resets = 0; + function countResets(iter: ConsumerMessages): Promise { + return (async () => { + for await (const s of await iter.status()) { + if (s.type === ConsumerDebugEvents.Reset) { + resets++; + } + } + })(); + } + + const c = await js.consumers.get("A") as OrderedPullConsumerImpl; + + // after the first message others will get published + let iter = await c.consume({ max_messages: 11, expires: 5000 }); + countResets(iter).catch(); + const sequences = []; + for await (const m of iter) { + sequences.push(m.json()); + // mess with the internal state to cause a reset + if (m.seq === 1) { + c.cursor.deliver_seq = 3; + const buf = []; + for (let i = 2; i < 20; i++) { + buf.push(js.publish("a", JSON.stringify(i))); + } + await Promise.all(buf); + } + if (m.seq === 11) { + break; + } + } + + assertEquals(c.serial, 2); + assertEquals(resets, 1); + assertEquals(sequences, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + + await cleanup(ns, nc); +}); + +Deno.test("ordered consumers - next reset", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await nc.jetstreamManager(); + + await jsm.streams.add({ name: "A", subjects: ["a"] }); + const js = nc.jetstream(); + await js.publish("a", JSON.stringify(1)); + await js.publish("a", JSON.stringify(2)); + + const c = await js.consumers.get("A") as OrderedPullConsumerImpl; + + // get the first + let m = await c.next({ expires: 1000 }); + assertExists(m); + assertEquals(m.json(), 1); + + // force a reset + c.cursor.deliver_seq = 3; + await js.publish("a", JSON.stringify(2)); + + m = await c.next({ expires: 1000 }); + assertEquals(m, null); + assertEquals(c.serial, 1); + + await cleanup(ns, nc); +}); + +Deno.test("ordered consumers - next reset", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await nc.jetstreamManager(); + + await jsm.streams.add({ name: "A", subjects: ["a"] }); + const js = nc.jetstream(); + + await js.publish("a", JSON.stringify(1)); + await js.publish("a", JSON.stringify(2)); + + const c = await js.consumers.get("A") as OrderedPullConsumerImpl; + await c.next(); + await c.next(); + + assertEquals(c.serial, 1); + await c.info(); + assertEquals(c.serial, 1); + + await cleanup(ns, nc); +});