diff --git a/jetstream/consumer.ts b/jetstream/consumer.ts index e39b283a..e0974f55 100644 --- a/jetstream/consumer.ts +++ b/jetstream/consumer.ts @@ -22,7 +22,7 @@ import { } from "../nats-base-client/util.ts"; import { ConsumerAPI, ConsumerAPIImpl } from "./jsmconsumer_api.ts"; import { nuid } from "../nats-base-client/nuid.ts"; -import { isHeartbeatMsg } from "./jsutil.ts"; +import { isHeartbeatMsg, minValidation } from "./jsutil.ts"; import { QueuedIteratorImpl } from "../nats-base-client/queued_iterator.ts"; import { createInbox, @@ -948,6 +948,7 @@ export class PullConsumerImpl implements Consumer { * {@link ConsumerUpdateConfig} */ export type OrderedConsumerOptions = { + name_prefix: string; filterSubjects: string[] | string; deliver_policy: DeliverPolicy; opt_start_seq: number; @@ -981,6 +982,11 @@ export class OrderedPullConsumerImpl implements Consumer { this.stream = stream; this.cursor = { stream_seq: 1, deliver_seq: 0 }; this.namePrefix = nuid.next(); + if (typeof opts.name_prefix === "string") { + // make sure the prefix is valid + minValidation("name_prefix", opts.name_prefix); + this.namePrefix = opts.name_prefix + this.namePrefix; + } this.serial = 0; this.currentConsumer = null; this.userCallback = null; diff --git a/jetstream/tests/consumers_ordered_test.ts b/jetstream/tests/consumers_ordered_test.ts index bfe80942..1d44a3f6 100644 --- a/jetstream/tests/consumers_ordered_test.ts +++ b/jetstream/tests/consumers_ordered_test.ts @@ -15,6 +15,7 @@ import { initStream } from "./jstest_util.ts"; import { + assert, assertEquals, assertExists, assertRejects, @@ -947,3 +948,33 @@ Deno.test("ordered consumers - bind is rejected", async () => { await cleanup(ns, nc); }); + +Deno.test("ordered consumers - name prefix", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + + const jsm = await nc.jetstreamManager(); + await jsm.streams.add({ name: "A", subjects: ["a"] }); + + const js = nc.jetstream(); + const c = await js.consumers.get("A", { name_prefix: "hello" }); + const ci = await c.info(true); + assert(ci.name.startsWith("hello")); + + await assertRejects( + () => { + return js.consumers.get("A", { name_prefix: "" }); + }, + Error, + "name_prefix name required", + ); + + await assertRejects( + () => { + return js.consumers.get("A", { name_prefix: "one.two" }); + }, + Error, + "invalid name_prefix name - name_prefix name cannot contain '.'", + ); + + await cleanup(ns, nc); +});