From c9f92794327a52e13fe97d87def944c8f9a35af2 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Tue, 7 Nov 2023 16:06:54 -0600 Subject: [PATCH] [FEAT] exported consumer - this allows getting a Consumer without performing any JSM operations on the server --- README.md | 3 +- jetstream/internal_mod.ts | 1 + jetstream/jsmstream_api.ts | 14 ++++++ jetstream/mod.ts | 1 + jetstream/tests/consumers_test.ts | 72 +++++++++++++++++++++++++++++++ jetstream/types.ts | 17 +++++++- 6 files changed, 106 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 0e6b1171..28b5b1c7 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,8 @@ point into the JS Doc is the [NatsConnection](https://nats-io.github.io/nats.deno/interfaces/NatsConnection.html) all functionality starts with a connection. -**Check out [NATS by example](https://natsbyexample.com) - An evolving collection of runnable, cross-client reference examples for NATS.** +**Check out [NATS by example](https://natsbyexample.com) - An evolving +collection of runnable, cross-client reference examples for NATS.** ## Basics diff --git a/jetstream/internal_mod.ts b/jetstream/internal_mod.ts index 1764d54a..f5af95ac 100644 --- a/jetstream/internal_mod.ts +++ b/jetstream/internal_mod.ts @@ -140,6 +140,7 @@ export type { ConsumerMessages, ConsumerStatus, Expires, + ExportedConsumer, FetchBytes, FetchMessages, FetchOptions, diff --git a/jetstream/jsmstream_api.ts b/jetstream/jsmstream_api.ts index a16b846f..d08613e2 100644 --- a/jetstream/jsmstream_api.ts +++ b/jetstream/jsmstream_api.ts @@ -41,6 +41,7 @@ import { } from "../nats-base-client/core.ts"; import { ApiPagedRequest, + ConsumerInfo, ExternalStream, MsgDeleteRequest, MsgRequest, @@ -60,6 +61,7 @@ import { } from "./jsapi_types.ts"; import { Consumer, + ExportedConsumer, OrderedConsumerOptions, OrderedPullConsumerImpl, PullConsumerImpl, @@ -132,6 +134,18 @@ export class ConsumersImpl implements Consumers { }); } + async bind( + stream: string, + name: string, + ): Promise { + await this.checkVersion(); + const ci: ConsumerInfo = { + stream_name: stream, + name: name, + } as unknown as ConsumerInfo; + return Promise.resolve(new PullConsumerImpl(this.api, ci)); + } + async ordered( stream: string, opts?: Partial, diff --git a/jetstream/mod.ts b/jetstream/mod.ts index 53e4984d..1d942e3c 100644 --- a/jetstream/mod.ts +++ b/jetstream/mod.ts @@ -61,6 +61,7 @@ export type { DeliveryInfo, Destroyable, Expires, + ExportedConsumer, ExternalStream, FetchBytes, FetchMessages, diff --git a/jetstream/tests/consumers_test.ts b/jetstream/tests/consumers_test.ts index 3b4b7778..e79d4b4f 100644 --- a/jetstream/tests/consumers_test.ts +++ b/jetstream/tests/consumers_test.ts @@ -40,6 +40,7 @@ import { ConsumerDebugEvents, ConsumerEvents, ConsumerStatus, + PullConsumerImpl, PullConsumerMessagesImpl, } from "../consumer.ts"; @@ -593,3 +594,74 @@ Deno.test("consumers - inboxPrefix is respected", async () => { await done; await cleanup(ns, nc); }); + +Deno.test("consumers - bind", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await nc.jetstreamManager(); + await jsm.streams.add({ name: "messages", subjects: ["hello"] }); + + const js = nc.jetstream(); + await js.publish("hello"); + await js.publish("hello"); + + await jsm.consumers.add("messages", { + durable_name: "a", + deliver_policy: DeliverPolicy.All, + ack_policy: AckPolicy.Explicit, + ack_wait: nanos(3000), + max_waiting: 500, + }); + + await jsm.consumers.add("messages", { + durable_name: "b", + deliver_policy: DeliverPolicy.All, + ack_policy: AckPolicy.Explicit, + ack_wait: nanos(3000), + max_waiting: 500, + }); + + await jsm.consumers.add("messages", { + durable_name: "c", + deliver_policy: DeliverPolicy.All, + ack_policy: AckPolicy.Explicit, + ack_wait: nanos(3000), + max_waiting: 500, + }); + + jsm.consumers.info = () => { + throw new Error("exported is not allowed info"); + }; + + let consumer = await js.consumers.bind("messages", "a"); + let pci = consumer as PullConsumerImpl; + pci.info = () => { + throw new Error("exported is not allowed to info"); + }; + let iter = await consumer.consume({ max_messages: 2 }); + for await (const m of iter) { + m.ack(); + if (m.info.pending === 0) { + break; + } + } + + consumer = await js.consumers.bind("messages", "b"); + pci = consumer as PullConsumerImpl; + pci.info = () => { + throw new Error("exported is not allowed to info"); + }; + iter = await consumer.fetch({ max_messages: 2 }); + for await (const m of iter) { + m.ack(); + } + + consumer = await js.consumers.bind("messages", "c"); + pci = consumer as PullConsumerImpl; + pci.info = () => { + throw new Error("exported is not allowed to info"); + }; + assertExists(await consumer.next()); + assertExists(await consumer.next()); + + await cleanup(ns, nc); +}); diff --git a/jetstream/types.ts b/jetstream/types.ts index 587f667e..7ec77618 100644 --- a/jetstream/types.ts +++ b/jetstream/types.ts @@ -13,7 +13,11 @@ * limitations under the License. */ -import { Consumer, OrderedConsumerOptions } from "./consumer.ts"; +import { + Consumer, + ExportedConsumer, + OrderedConsumerOptions, +} from "./consumer.ts"; import { JetStreamOptions, MsgHdrs, @@ -465,6 +469,17 @@ export interface Consumers { stream: string, name?: string | Partial, ): Promise; + + /** + * Returns a Consumer configured for the specified stream and consumer name. Note there + * are no JSAPI lookups or verifications. + * @param stream + * @param name + */ + bind( + stream: string, + name: string, + ): Promise; } export interface ConsumerOpts {