From 9c5ee0afc2a13b011386020ff89c5911279496a2 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Mon, 15 May 2023 17:11:04 -0500 Subject: [PATCH] [WIP] stop strategy for consumer --- nats-base-client/consumer.ts | 3 +- nats-base-client/consumermessages.ts | 63 +++++++++++++++++++++- nats-base-client/types.ts | 24 ++++++--- tests/consumers_test.ts | 78 ++++++++++++++++++++++++++++ tests/consumersordered_test.ts | 38 +++++++++++++- tests/jetstream_test.ts | 4 +- 6 files changed, 195 insertions(+), 15 deletions(-) diff --git a/nats-base-client/consumer.ts b/nats-base-client/consumer.ts index c34494a7..d7c86bc4 100644 --- a/nats-base-client/consumer.ts +++ b/nats-base-client/consumer.ts @@ -23,6 +23,7 @@ import { ConsumerEvents, ConsumerInfo, ConsumerMessages, + ConsumeStop, DeliverPolicy, FetchMessages, FetchOptions, @@ -161,7 +162,7 @@ export type OrderedConsumerOptions = { opt_start_time: string; replay_policy: ReplayPolicy; inactive_threshold: number; -}; +} & ConsumeStop; export class OrderedPullConsumerImpl implements Consumer { api: ConsumerAPIImpl; diff --git a/nats-base-client/consumermessages.ts b/nats-base-client/consumermessages.ts index cfbe4d87..22e1e504 100644 --- a/nats-base-client/consumermessages.ts +++ b/nats-base-client/consumermessages.ts @@ -26,6 +26,8 @@ import { ConsumerCallbackFn, ConsumerDebugEvents, ConsumerEvents, + ConsumeStop, + ConsumeStopStrategy, Events, JsMsg, MsgHdrs, @@ -55,6 +57,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl timeout: Timeout | null; cleanupHandler?: () => void; listeners: QueuedIterator[]; + stopStrategy?: ConsumeStopStrategy; + stopStrategyFn?: (m: JsMsg) => boolean; + noMore: boolean; // callback: ConsumerCallbackFn; constructor( @@ -67,6 +72,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl this.opts = this.parseOptions(opts, refilling); this.callback = (opts as ConsumeCallback).callback || null; + this.noMore = false; this.noIterator = typeof this.callback === "function"; this.monitor = null; this.pong = null; @@ -77,6 +83,31 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl this.inbox = createInbox(); this.listeners = []; + const stopOpts = opts as ConsumeStop; + this.stopStrategy = stopOpts.strategy; + switch (this.stopStrategy) { + case ConsumeStopStrategy.NoMessages: + { + this.stopStrategyFn = (m): boolean => { + return m.info.pending === 0; + }; + } + break; + case ConsumeStopStrategy.Sequence: + { + if (typeof stopOpts.arg !== "number") { + throw new Error("stop strategy args is not a number"); + } + const seq = stopOpts.arg; + this.stopStrategyFn = (m): boolean => { + return m.seq >= seq; + }; + } + break; + default: + // nothing + } + const { max_messages, max_bytes, @@ -160,7 +191,8 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl } } else { // push the user message - this._push(toJsMsg(msg)); + const jsMsg = toJsMsg(msg); + this._push(jsMsg); this.received++; if (this.pending.msgs) { this.pending.msgs--; @@ -168,13 +200,23 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl if (this.pending.bytes) { this.pending.bytes -= (msg as MsgImpl).size(); } + // if we are just processing a stream, end when we get the last message + if (this.stopStrategyFn && this.refilling) { + this.noMore = this.stopStrategyFn(jsMsg); + } } // if we don't have pending bytes/messages we are done or starving if (this.pending.msgs === 0 && this.pending.bytes === 0) { this.pending.requests = 0; } - if (this.refilling) { + + if (this.noMore) { + // @ts-ignore: we are pushing the pull fn + this._push(() => { + this.stop(); + }); + } else if (this.refilling) { // FIXME: this could result in 1/4 = 0 if ( (max_messages && @@ -239,6 +281,23 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl } })(); + if (this.refilling && this.stopStrategyFn) { + // this is the initial pull if we have a consume, but we are + // checking pending, we want to verify that the info on the + // consumer is up-to-date, and whether we have any pending + // messages + this.consumer.info() + .then((ci) => { + if (ci.num_pending === 0) { + //@ts-ignore: this is a termination + this._push(() => { + this.stop(); + }); + } + }) + .catch(); + } + // this is the initial pull this.pull(this.pullOptions()); } diff --git a/nats-base-client/types.ts b/nats-base-client/types.ts index 7cd640b3..cd3d43cb 100644 --- a/nats-base-client/types.ts +++ b/nats-base-client/types.ts @@ -2647,10 +2647,6 @@ export interface ConsumerUpdateConfig { metadata?: Record; } -export type Ordered = { - ordered: true; -}; - export type NextOptions = Expires; export type ConsumeBytes = @@ -2659,19 +2655,21 @@ export type ConsumeBytes = & ThresholdBytes & Expires & IdleHeartbeat - & ConsumeCallback; + & ConsumeCallback + & ConsumeStop; export type ConsumeMessages = & Partial & ThresholdMessages & Expires & IdleHeartbeat - & ConsumeCallback; + & ConsumeCallback + & ConsumeStop; export type ConsumeOptions = ConsumeBytes | ConsumeMessages; /** - * Options for fetching + * Options for fetching bytes */ export type FetchBytes = & MaxBytes @@ -2680,7 +2678,7 @@ export type FetchBytes = & IdleHeartbeat; /** - * Options for a c + * Options for fetching messages */ export type FetchMessages = & Partial @@ -2738,6 +2736,16 @@ export type Expires = { expires?: number; }; +export enum ConsumeStopStrategy { + NoMessages, + Sequence, +} + +export type ConsumeStop = { + strategy?: ConsumeStopStrategy; + arg?: number; +}; + export type IdleHeartbeat = { /** * Number of milliseconds to wait for a server heartbeat when not actively receiving diff --git a/tests/consumers_test.ts b/tests/consumers_test.ts index 3dea2881..7f09354c 100644 --- a/tests/consumers_test.ts +++ b/tests/consumers_test.ts @@ -30,6 +30,7 @@ import { ConsumerEvents, ConsumerMessages, ConsumerStatus, + ConsumeStopStrategy, Empty, NatsConnection, PubAck, @@ -836,3 +837,80 @@ Deno.test("consumers - next", async () => { await cleanup(ns, nc); }); + +Deno.test("consumer - check pending initial", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const { stream } = await initStream(nc); + + const jsm = await nc.jetstreamManager(); + await jsm.consumers.add(stream, { + durable_name: stream, + ack_policy: AckPolicy.Explicit, + }); + + const js = nc.jetstream(); + const c = await js.consumers.get(stream, stream); + const iter = await c.consume({ strategy: ConsumeStopStrategy.NoMessages }); + await (async () => { + for await (const m of iter) { + m.ack(); + } + })().then(); + + assertEquals(iter.getProcessed(), 0); + + await cleanup(ns, nc); +}); + +Deno.test("consumer - check pending ordered initial", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const { stream } = await initStream(nc); + + const jsm = await nc.jetstreamManager(); + await jsm.consumers.add(stream, { + durable_name: stream, + ack_policy: AckPolicy.Explicit, + }); + + const js = nc.jetstream(); + const c = await js.consumers.get(stream); + const iter = await c.consume({ strategy: ConsumeStopStrategy.NoMessages }); + await (async () => { + for await (const m of iter) { + m.ack(); + } + })().then(); + + assertEquals(iter.getProcessed(), 0); + + await cleanup(ns, nc); +}); + +Deno.test("consumer - check pending", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const { stream, subj } = await initStream(nc); + + const jsm = await nc.jetstreamManager(); + await jsm.consumers.add(stream, { + durable_name: stream, + ack_policy: AckPolicy.Explicit, + }); + + const js = nc.jetstream(); + await Promise.all( + Array.from({ length: 10 }).map(() => { + return js.publish(subj); + }), + ); + + const c = await js.consumers.get(stream); + const iter = await c.consume({ strategy: ConsumeStopStrategy.NoMessages }); + await (async () => { + for await (const m of iter) { + m.ack(); + } + })().then(); + assertEquals(iter.getProcessed(), 10); + + await cleanup(ns, nc); +}); diff --git a/tests/consumersordered_test.ts b/tests/consumersordered_test.ts index df51e35e..81afc177 100644 --- a/tests/consumersordered_test.ts +++ b/tests/consumersordered_test.ts @@ -13,14 +13,24 @@ * limitations under the License. */ -import { cleanup, jetstreamServerConf, setup } from "./jstest_util.ts"; +import { + cleanup, + initStream, + jetstreamServerConf, + setup, +} from "./jstest_util.ts"; import { assertEquals, assertExists, assertRejects, } from "https://deno.land/std@0.125.0/testing/asserts.ts"; import { OrderedPullConsumerImpl } from "../nats-base-client/consumer.ts"; -import { DeliverPolicy, JsMsg } from "../nats-base-client/types.ts"; +import { + AckPolicy, + ConsumeStopStrategy, + DeliverPolicy, + JsMsg, +} from "../nats-base-client/types.ts"; import { deferred } from "../nats-base-client/mod.ts"; import { notCompatible } from "./helpers/mod.ts"; import { delay } from "../nats-base-client/util.ts"; @@ -608,3 +618,27 @@ Deno.test("ordered - next", async () => { await cleanup(ns, nc); }); + +Deno.test("consumer - check pending ordered initial", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const { stream } = await initStream(nc); + + const jsm = await nc.jetstreamManager(); + await jsm.consumers.add(stream, { + durable_name: stream, + ack_policy: AckPolicy.Explicit, + }); + + const js = nc.jetstream(); + const c = await js.consumers.get(stream); + const iter = await c.consume({ strategy: ConsumeStopStrategy.NoMessages }); + await (async () => { + for await (const _m of iter) { + // nothing + } + })().then(); + + assertEquals(iter.getProcessed(), 0); + + await cleanup(ns, nc); +}); diff --git a/tests/jetstream_test.ts b/tests/jetstream_test.ts index 4dd7fdd1..eb3f19ce 100644 --- a/tests/jetstream_test.ts +++ b/tests/jetstream_test.ts @@ -1125,7 +1125,7 @@ Deno.test("jetstream - fetch one - no wait breaks fast", async () => { await done; sw.mark(); - assert(25 > sw.duration()); + assertBetween(sw.duration(), 0, 500); assertEquals(batch.getReceived(), 1); await cleanup(ns, nc); }); @@ -1159,7 +1159,7 @@ Deno.test("jetstream - fetch none - cancel timers", async () => { await done; sw.mark(); - assert(25 > sw.duration()); + assertBetween(sw.duration(), 0, 500); assertEquals(batch.getReceived(), 0); await cleanup(ns, nc); });