diff --git a/nats-base-client/consumer.ts b/nats-base-client/consumer.ts index c34494a7..d7c86bc4 100644 --- a/nats-base-client/consumer.ts +++ b/nats-base-client/consumer.ts @@ -23,6 +23,7 @@ import { ConsumerEvents, ConsumerInfo, ConsumerMessages, + ConsumeStop, DeliverPolicy, FetchMessages, FetchOptions, @@ -161,7 +162,7 @@ export type OrderedConsumerOptions = { opt_start_time: string; replay_policy: ReplayPolicy; inactive_threshold: number; -}; +} & ConsumeStop; export class OrderedPullConsumerImpl implements Consumer { api: ConsumerAPIImpl; diff --git a/nats-base-client/consumermessages.ts b/nats-base-client/consumermessages.ts index cfbe4d87..8125f616 100644 --- a/nats-base-client/consumermessages.ts +++ b/nats-base-client/consumermessages.ts @@ -26,6 +26,8 @@ import { ConsumerCallbackFn, ConsumerDebugEvents, ConsumerEvents, + ConsumeStop, + ConsumeStopStrategy, Events, JsMsg, MsgHdrs, @@ -55,6 +57,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl timeout: Timeout | null; cleanupHandler?: () => void; listeners: QueuedIterator[]; + stopStrategy?: ConsumeStopStrategy; + stopStrategyFn?: (m: JsMsg) => boolean; + noMore: boolean; // callback: ConsumerCallbackFn; constructor( @@ -67,6 +72,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl this.opts = this.parseOptions(opts, refilling); this.callback = (opts as ConsumeCallback).callback || null; + this.noMore = false; this.noIterator = typeof this.callback === "function"; this.monitor = null; this.pong = null; @@ -77,6 +83,31 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl this.inbox = createInbox(); this.listeners = []; + const stopOpts = opts as ConsumeStop; + this.stopStrategy = stopOpts.strategy; + switch (this.stopStrategy) { + case ConsumeStopStrategy.NoMessages: + { + this.stopStrategyFn = (m): boolean => { + return m.info.pending === 0; + }; + } + break; + case ConsumeStopStrategy.Sequence: + { + if (typeof stopOpts.arg !== "number") { + throw new Error("stop strategy args is not a number"); + } + const seq = stopOpts.arg; + this.stopStrategyFn = (m): boolean => { + return m.seq >= seq; + }; + } + break; + default: + // nothing + } + const { max_messages, max_bytes, @@ -102,6 +133,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl this.sub = c.api.nc.subscribe(this.inbox, { callback: (err, msg) => { + if (this.noMore) { + return; + } if (err) { // this is possibly only a permissions error which means // that the server rejected (eliminating the sub) @@ -144,10 +178,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl // we got a bad request - no progress here if (code === 400) { const error = toErr(); - //@ts-ignore: fn - this._push(() => { - this.stop(error); - }); + this.stop(error); } else if (code === 409 && description === "consumer deleted") { const error = toErr(); this.stop(error); @@ -160,7 +191,8 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl } } else { // push the user message - this._push(toJsMsg(msg)); + const jsMsg = toJsMsg(msg); + this._push(jsMsg); this.received++; if (this.pending.msgs) { this.pending.msgs--; @@ -168,13 +200,20 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl if (this.pending.bytes) { this.pending.bytes -= (msg as MsgImpl).size(); } + // if we are just processing a stream, end when we get the last message + if (this.stopStrategyFn) { + this.noMore = this.stopStrategyFn(jsMsg); + } } // if we don't have pending bytes/messages we are done or starving if (this.pending.msgs === 0 && this.pending.bytes === 0) { this.pending.requests = 0; } - if (this.refilling) { + + if (this.noMore) { + this.stop(); + } else if (this.refilling) { // FIXME: this could result in 1/4 = 0 if ( (max_messages && @@ -239,6 +278,20 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl } })(); + if (this.refilling && this.stopStrategyFn) { + // this is the initial pull if we have a consume, but we are + // checking pending, we want to verify that the info on the + // consumer is up-to-date, and whether we have any pending + // messages + this.consumer.info() + .then((ci) => { + if (ci.num_pending === 0) { + this.stop(); + } + }) + .catch(); + } + // this is the initial pull this.pull(this.pullOptions()); } diff --git a/nats-base-client/types.ts b/nats-base-client/types.ts index 7cd640b3..cd3d43cb 100644 --- a/nats-base-client/types.ts +++ b/nats-base-client/types.ts @@ -2647,10 +2647,6 @@ export interface ConsumerUpdateConfig { metadata?: Record; } -export type Ordered = { - ordered: true; -}; - export type NextOptions = Expires; export type ConsumeBytes = @@ -2659,19 +2655,21 @@ export type ConsumeBytes = & ThresholdBytes & Expires & IdleHeartbeat - & ConsumeCallback; + & ConsumeCallback + & ConsumeStop; export type ConsumeMessages = & Partial & ThresholdMessages & Expires & IdleHeartbeat - & ConsumeCallback; + & ConsumeCallback + & ConsumeStop; export type ConsumeOptions = ConsumeBytes | ConsumeMessages; /** - * Options for fetching + * Options for fetching bytes */ export type FetchBytes = & MaxBytes @@ -2680,7 +2678,7 @@ export type FetchBytes = & IdleHeartbeat; /** - * Options for a c + * Options for fetching messages */ export type FetchMessages = & Partial @@ -2738,6 +2736,16 @@ export type Expires = { expires?: number; }; +export enum ConsumeStopStrategy { + NoMessages, + Sequence, +} + +export type ConsumeStop = { + strategy?: ConsumeStopStrategy; + arg?: number; +}; + export type IdleHeartbeat = { /** * Number of milliseconds to wait for a server heartbeat when not actively receiving diff --git a/tests/consumers_test.ts b/tests/consumers_test.ts index 3dea2881..40e452a2 100644 --- a/tests/consumers_test.ts +++ b/tests/consumers_test.ts @@ -25,11 +25,13 @@ import { } from "https://deno.land/std@0.75.0/testing/asserts.ts"; import { AckPolicy, + ConsumeOptions, Consumer, ConsumerDebugEvents, ConsumerEvents, ConsumerMessages, ConsumerStatus, + ConsumeStopStrategy, Empty, NatsConnection, PubAck, @@ -836,3 +838,58 @@ Deno.test("consumers - next", async () => { await cleanup(ns, nc); }); + +async function testConsumerStop( + conf: ConsumeOptions, + count: number, +): Promise { + const { ns, nc } = await setup(jetstreamServerConf()); + const { stream, subj } = await initStream(nc); + + const jsm = await nc.jetstreamManager(); + await jsm.consumers.add(stream, { + durable_name: stream, + ack_policy: AckPolicy.Explicit, + }); + + const js = nc.jetstream(); + await Promise.all( + Array.from({ length: count }).map(() => { + return js.publish(subj); + }), + ); + + const c = await js.consumers.get(stream, stream); + const iter = await c.consume(conf); + await (async () => { + for await (const m of iter) { + m.ack(); + } + })().then(); + + await cleanup(ns, nc); + + return iter; +} + +Deno.test("consumer - stopStrategy empty", async () => { + const messages = await testConsumerStop({ + strategy: ConsumeStopStrategy.NoMessages, + }, 0); + assertEquals(messages.getProcessed(), 0); +}); + +Deno.test("consumer - stopStrategy all ", async () => { + const messages = await testConsumerStop({ + strategy: ConsumeStopStrategy.NoMessages, + }, 10); + assertEquals(messages.getProcessed(), 10); +}); + +Deno.test("consumer - stopStrategy sequence", async () => { + const messages = await testConsumerStop({ + strategy: ConsumeStopStrategy.Sequence, + arg: 5, + }, 10); + assertEquals(messages.getProcessed(), 5); +}); diff --git a/tests/consumersordered_test.ts b/tests/consumersordered_test.ts index df51e35e..ac8c1c4e 100644 --- a/tests/consumersordered_test.ts +++ b/tests/consumersordered_test.ts @@ -13,14 +13,26 @@ * limitations under the License. */ -import { cleanup, jetstreamServerConf, setup } from "./jstest_util.ts"; +import { + cleanup, + initStream, + jetstreamServerConf, + setup, +} from "./jstest_util.ts"; import { assertEquals, assertExists, assertRejects, } from "https://deno.land/std@0.125.0/testing/asserts.ts"; import { OrderedPullConsumerImpl } from "../nats-base-client/consumer.ts"; -import { DeliverPolicy, JsMsg } from "../nats-base-client/types.ts"; +import { + AckPolicy, + ConsumeOptions, + ConsumerMessages, + ConsumeStopStrategy, + DeliverPolicy, + JsMsg, +} from "../nats-base-client/types.ts"; import { deferred } from "../nats-base-client/mod.ts"; import { notCompatible } from "./helpers/mod.ts"; import { delay } from "../nats-base-client/util.ts"; @@ -608,3 +620,58 @@ Deno.test("ordered - next", async () => { await cleanup(ns, nc); }); + +async function testOrderedConsumerStop( + conf: ConsumeOptions, + count: number, +): Promise { + const { ns, nc } = await setup(jetstreamServerConf()); + const { stream, subj } = await initStream(nc); + + const jsm = await nc.jetstreamManager(); + await jsm.consumers.add(stream, { + durable_name: stream, + ack_policy: AckPolicy.Explicit, + }); + + const js = nc.jetstream(); + await Promise.all( + Array.from({ length: count }).map(() => { + return js.publish(subj); + }), + ); + + const c = await js.consumers.get(stream); + const iter = await c.consume(conf); + await (async () => { + for await (const m of iter) { + m.ack(); + } + })().then(); + + await cleanup(ns, nc); + + return iter; +} + +Deno.test("ordered - stopStrategy empty", async () => { + const messages = await testOrderedConsumerStop({ + strategy: ConsumeStopStrategy.NoMessages, + }, 0); + assertEquals(messages.getProcessed(), 0); +}); + +Deno.test("ordered - stopStrategy all ", async () => { + const messages = await testOrderedConsumerStop({ + strategy: ConsumeStopStrategy.NoMessages, + }, 10); + assertEquals(messages.getProcessed(), 10); +}); + +Deno.test("ordered - stopStrategy sequence", async () => { + const messages = await testOrderedConsumerStop({ + strategy: ConsumeStopStrategy.Sequence, + arg: 5, + }, 10); + assertEquals(messages.getProcessed(), 5); +}); diff --git a/tests/jetstream_test.ts b/tests/jetstream_test.ts index 4dd7fdd1..eb3f19ce 100644 --- a/tests/jetstream_test.ts +++ b/tests/jetstream_test.ts @@ -1125,7 +1125,7 @@ Deno.test("jetstream - fetch one - no wait breaks fast", async () => { await done; sw.mark(); - assert(25 > sw.duration()); + assertBetween(sw.duration(), 0, 500); assertEquals(batch.getReceived(), 1); await cleanup(ns, nc); }); @@ -1159,7 +1159,7 @@ Deno.test("jetstream - fetch none - cancel timers", async () => { await done; sw.mark(); - assert(25 > sw.duration()); + assertBetween(sw.duration(), 0, 500); assertEquals(batch.getReceived(), 0); await cleanup(ns, nc); });