From 8a2ca5f87bb400948f1221a0f538a59f7d077eed Mon Sep 17 00:00:00 2001 From: aricart Date: Thu, 19 Jan 2023 17:11:06 -0400 Subject: [PATCH 1/3] [FEAT] consumer filter_subjects support - this feature is exclusive of the filter_subject but allows a consumer to filter multiple subjects in a stream. To set, specify filter_subjects instead of filter_subject or if using consumer options builder filterSubject() can be called multiple times. [CHANGE] filter_subject (and filter_subjects) can be updated, the server removed this restriction --- nats-base-client/jsclient.ts | 21 +++- nats-base-client/jsconsumeropts.ts | 31 +++++- nats-base-client/jsmconsumer_api.ts | 14 ++- nats-base-client/semver.ts | 17 +++ nats-base-client/types.ts | 15 ++- tests/jetstream_test.ts | 125 ++++++++++++++++++++- tests/jsm_test.ts | 166 +++++++++++++++++++++++++++- 7 files changed, 373 insertions(+), 16 deletions(-) diff --git a/nats-base-client/jsclient.ts b/nats-base-client/jsclient.ts index 11cd3d33..28c443e2 100644 --- a/nats-base-client/jsclient.ts +++ b/nats-base-client/jsclient.ts @@ -1,5 +1,5 @@ /* - * Copyright 2022 The NATS Authors + * Copyright 2022-2023 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -410,9 +410,6 @@ export class JetStreamClientImpl extends BaseApiClient if (cso.ordered) { throw new Error("pull subscribers cannot be be ordered"); } - if (!cso.attached) { - cso.config.filter_subject = subject; - } if (cso.config.deliver_subject) { throw new Error( "consumer info specifies deliver_subject - pull consumers cannot have deliver_subject set", @@ -587,7 +584,11 @@ export class JetStreamClientImpl extends BaseApiClient } } - if (!jsi.attached) { + if ( + !jsi.attached && jsi.config.filter_subject === undefined && + jsi.config.filter_subjects === undefined + ) { + // if no filter specified, we set the subject as the filter jsi.config.filter_subject = subject; } @@ -642,6 +643,16 @@ export class JetStreamClientImpl extends BaseApiClient }, jsi.config); const ci = await this.api.add(jsi.stream, jsi.config); + if ( + Array.isArray( + jsi.config.filter_subjects && !Array.isArray(ci.config.filter_subjects), + ) + ) { + // server didn't honor `filter_subjects` + throw new Error( + `jetstream server doesn't support consumers with multiple filter subjects`, + ); + } jsi.name = ci.name; jsi.config = ci.config; jsi.last = ci; diff --git a/nats-base-client/jsconsumeropts.ts b/nats-base-client/jsconsumeropts.ts index 5b350d06..ce56067e 100644 --- a/nats-base-client/jsconsumeropts.ts +++ b/nats-base-client/jsconsumeropts.ts @@ -1,5 +1,5 @@ /* - * Copyright 2021 The NATS Authors + * Copyright 2021-2023 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -46,6 +46,7 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder { max?: number; qname?: string; isBind?: boolean; + filters?: string[]; constructor(opts?: Partial) { this.stream = ""; @@ -56,7 +57,18 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder { getOpts(): ConsumerOpts { const o = {} as ConsumerOpts; - o.config = this.config; + o.config = Object.assign({}, this.config); + if (o.config.filter_subject) { + this.filterSubject(o.config.filter_subject); + o.config.filter_subject = undefined; + } + if (o.config.filter_subjects) { + o.config.filter_subjects?.forEach((v) => { + this.filterSubject(v); + }); + o.config.filter_subjects = undefined; + } + o.mack = this.mack; o.stream = this.stream; o.callbackFn = this.callbackFn; @@ -65,6 +77,18 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder { o.ordered = this.ordered; o.config.ack_policy = o.ordered ? AckPolicy.None : o.config.ack_policy; o.isBind = o.isBind || false; + + if (this.filters) { + switch (this.filters.length) { + case 0: + break; + case 1: + o.config.filter_subject = this.filters[0]; + break; + default: + o.config.filter_subjects = this.filters; + } + } return o; } @@ -155,7 +179,8 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder { } filterSubject(s: string) { - this.config.filter_subject = s; + this.filters = this.filters || []; + this.filters.push(s); return this; } diff --git a/nats-base-client/jsmconsumer_api.ts b/nats-base-client/jsmconsumer_api.ts index 93833aeb..b622fa74 100644 --- a/nats-base-client/jsmconsumer_api.ts +++ b/nats-base-client/jsmconsumer_api.ts @@ -1,5 +1,5 @@ /* - * Copyright 2021 The NATS Authors + * Copyright 2021-2023 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -65,7 +65,7 @@ export class ConsumerAPIImpl extends BaseApiClient implements ConsumerAPI { } const nci = this.nc as NatsConnectionImpl; - const { min, ok: newAPI } = nci.features.get( + let { min, ok: newAPI } = nci.features.get( Feature.JS_NEW_CONSUMER_CREATE_API, ); @@ -82,6 +82,16 @@ export class ConsumerAPIImpl extends BaseApiClient implements ConsumerAPI { let subj; let consumerName = ""; + // new api doesn't support multiple filter subjects + // this delayed until here because the consumer in an update could have + // been created with the new API, and have a `name` + if (Array.isArray(cfg.filter_subjects)) { + const { min, ok } = nci.features.get(Feature.JS_MULTIPLE_CONSUMER_FILTER); + if (!ok) { + throw new Error(`consumer 'filter_subjects' requires server ${min}`); + } + newAPI = false; + } if (newAPI) { consumerName = cfg.name ?? cfg.durable_name ?? ""; } diff --git a/nats-base-client/semver.ts b/nats-base-client/semver.ts index 7a70a46c..69a844de 100644 --- a/nats-base-client/semver.ts +++ b/nats-base-client/semver.ts @@ -1,3 +1,18 @@ +/* + * Copyright 2022-2023 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + export type SemVer = { major: number; minor: number; micro: number }; export function parseSemVer( s = "", @@ -28,6 +43,7 @@ export enum Feature { JS_PULL_MAX_BYTES = "js_pull_max_bytes", JS_NEW_CONSUMER_CREATE_API = "js_new_consumer_create", JS_ALLOW_DIRECT = "js_allow_direct", + JS_MULTIPLE_CONSUMER_FILTER = "js_multiple_consumer_filter", } type FeatureVersion = { @@ -76,6 +92,7 @@ export class Features { this.set(Feature.JS_PULL_MAX_BYTES, "2.8.3"); this.set(Feature.JS_NEW_CONSUMER_CREATE_API, "2.9.0"); this.set(Feature.JS_ALLOW_DIRECT, "2.9.0"); + this.set(Feature.JS_MULTIPLE_CONSUMER_FILTER, "2.10.0"); this.disabled.forEach((f) => { this.features.delete(f); diff --git a/nats-base-client/types.ts b/nats-base-client/types.ts index 8e1c5e56..5993490b 100644 --- a/nats-base-client/types.ts +++ b/nats-base-client/types.ts @@ -1173,6 +1173,7 @@ export interface ConsumerOptsBuilder { maxDeliver(max: number): this; /** * Consumer should filter the messages to those that match the specified filter. + * This api can be called multiple times. * @param s */ filterSubject(s: string): this; @@ -2420,10 +2421,6 @@ export interface ConsumerConfig extends ConsumerUpdateConfig { * The consumer name */ name?: string; - /** - * Deliver only messages that match the subject filter - */ - "filter_subject"?: string; /** * For push consumers this will regularly send an empty mess with Status header 100 * and a reply subject, consumers must reply to these messages to control @@ -2521,6 +2518,16 @@ export interface ConsumerUpdateConfig { * Force the consumer state to be kept in memory rather than inherit the setting from the stream */ "mem_storage"?: boolean; + /** + * Deliver only messages that match the subject filter + * This is exclusive of {@link filter_subjects} + */ + "filter_subject"?: string; + /** + * Deliver only messages that match the specified filters. + * This is exclusive of {@link filter_subject}. + */ + "filter_subjects"?: string[]; } export interface Consumer { diff --git a/tests/jetstream_test.ts b/tests/jetstream_test.ts index 2a8ac980..01beb1e7 100644 --- a/tests/jetstream_test.ts +++ b/tests/jetstream_test.ts @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 The NATS Authors + * Copyright 2021-2023 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -75,6 +75,10 @@ import { isHeartbeatMsg, Js409Errors, } from "../nats-base-client/jsutil.ts"; +import { + assertArrayIncludes, + assertExists, +} from "https://deno.land/std@0.173.0/testing/asserts.ts"; function callbackConsume(debug = false): JsMsgCallback { return (err: NatsError | null, jm: JsMsg | null) => { @@ -4230,3 +4234,122 @@ Deno.test("jetstream - push heartbeat callback", async () => { await reconnected; await cleanup(ns, nc); }); + +Deno.test("jetstream - consumer opt multi subject filter", () => { + let opts = new ConsumerOptsBuilderImpl(); + opts.filterSubject("foo"); + let co = opts.getOpts(); + assertEquals(co.config.filter_subject, "foo"); + + opts.filterSubject("bar"); + co = opts.getOpts(); + assertEquals(co.config.filter_subject, undefined); + assertExists(co.config.filter_subjects); + assertArrayIncludes(co.config.filter_subjects, ["foo", "bar"]); +}); + +Deno.test("jetstream - push multi-subject filter", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + if (await notCompatible(ns, nc, "2.10.0")) { + return; + } + const name = nuid.next(); + const jsm = await nc.jetstreamManager(); + const js = nc.jetstream(); + await jsm.streams.add({ name, subjects: [`a.>`] }); + + const opts = consumerOpts() + .durable("me") + .ackExplicit() + .filterSubject("a.b") + .filterSubject("a.c") + .deliverTo(createInbox()) + .callback((_err, msg) => { + msg?.ack(); + }); + + const sub = await js.subscribe("a.>", opts); + const ci = await sub.consumerInfo(); + assertExists(ci.config.filter_subjects); + assertArrayIncludes(ci.config.filter_subjects, ["a.b", "a.c"]); + + await cleanup(ns, nc); +}); + +Deno.test("jetstream - pull multi-subject filter", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + if (await notCompatible(ns, nc, "2.10.0")) { + return; + } + const name = nuid.next(); + const jsm = await nc.jetstreamManager(); + const js = nc.jetstream(); + await jsm.streams.add({ name, subjects: [`a.>`] }); + + const opts = consumerOpts() + .durable("me") + .ackExplicit() + .filterSubject("a.b") + .filterSubject("a.c") + .callback((_err, msg) => { + msg?.ack(); + }); + + const sub = await js.pullSubscribe("a.>", opts); + const ci = await sub.consumerInfo(); + assertExists(ci.config.filter_subjects); + assertArrayIncludes(ci.config.filter_subjects, ["a.b", "a.c"]); + + await cleanup(ns, nc); +}); + +Deno.test("jetstream - push single filter", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + if (await notCompatible(ns, nc, "2.10.0")) { + return; + } + const name = nuid.next(); + const jsm = await nc.jetstreamManager(); + const js = nc.jetstream(); + await jsm.streams.add({ name, subjects: [`a.>`] }); + + const opts = consumerOpts() + .durable("me") + .ackExplicit() + .filterSubject("a.b") + .deliverTo(createInbox()) + .callback((_err, msg) => { + msg?.ack(); + }); + + const sub = await js.subscribe("a.>", opts); + const ci = await sub.consumerInfo(); + assertEquals(ci.config.filter_subject, "a.b"); + + await cleanup(ns, nc); +}); + +Deno.test("jetstream - pull single filter", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + if (await notCompatible(ns, nc, "2.10.0")) { + return; + } + const name = nuid.next(); + const jsm = await nc.jetstreamManager(); + const js = nc.jetstream(); + await jsm.streams.add({ name, subjects: [`a.>`] }); + + const opts = consumerOpts() + .durable("me") + .ackExplicit() + .filterSubject("a.b") + .callback((_err, msg) => { + msg?.ack(); + }); + + const sub = await js.pullSubscribe("a.>", opts); + const ci = await sub.consumerInfo(); + assertEquals(ci.config.filter_subject, "a.b"); + + await cleanup(ns, nc); +}); diff --git a/tests/jsm_test.ts b/tests/jsm_test.ts index 6d0808de..a42f34fe 100644 --- a/tests/jsm_test.ts +++ b/tests/jsm_test.ts @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 The NATS Authors + * Copyright 2021-2 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -1849,3 +1849,167 @@ Deno.test("jsm - remap domain", () => { { name: "b", external: { api: "$JS.b.API" } }, ]); }); + +Deno.test("jsm - filter_subjects", async () => { + const { ns, nc } = await setup( + jetstreamServerConf({}, true), + ); + if (await notCompatible(ns, nc, "2.10.0")) { + return; + } + + const jsm = await nc.jetstreamManager(); + const name = nuid.next(); + await jsm.streams.add({ name, subjects: [`a.>`] }); + const ci = await jsm.consumers.add(name, { + durable_name: "dur", + filter_subjects: [ + "a.b", + "a.c", + ], + ack_policy: AckPolicy.Explicit, + }); + assertEquals(ci.config.filter_subject, undefined); + assert(Array.isArray(ci.config.filter_subjects)); + assertArrayIncludes(ci.config.filter_subjects, ["a.b", "a.c"]); + await cleanup(ns, nc); +}); + +Deno.test("jsm - filter_subjects rejects filter_subject", async () => { + const { ns, nc } = await setup( + jetstreamServerConf({}, true), + ); + if (await notCompatible(ns, nc, "2.10.0")) { + return; + } + + const jsm = await nc.jetstreamManager(); + const name = nuid.next(); + await jsm.streams.add({ name, subjects: [`a.>`] }); + await assertRejects( + async () => { + await jsm.consumers.add(name, { + durable_name: "dur", + filter_subject: "a.a", + filter_subjects: [ + "a.b", + "a.c", + ], + ack_policy: AckPolicy.Explicit, + }); + }, + Error, + "consumer cannot have both filtersubject and filtersubjects specified", + ); + await cleanup(ns, nc); +}); + +Deno.test("jsm - update filter_subject", async () => { + const { ns, nc } = await setup( + jetstreamServerConf({}, true), + ); + if (await notCompatible(ns, nc, "2.10.0")) { + return; + } + const jsm = await nc.jetstreamManager(); + const name = nuid.next(); + await jsm.streams.add({ name, subjects: [`a.>`] }); + + let ci = await jsm.consumers.add(name, { + durable_name: "dur", + filter_subject: "a.x", + ack_policy: AckPolicy.Explicit, + }); + assertEquals(ci.config.filter_subject, "a.x"); + + ci = await jsm.consumers.update(name, "dur", { + filter_subject: "a.y", + }); + assertEquals(ci.config.filter_subject, "a.y"); + + await cleanup(ns, nc); +}); + +Deno.test("jsm - update filter_subjects", async () => { + const { ns, nc } = await setup( + jetstreamServerConf({}, true), + ); + if (await notCompatible(ns, nc, "2.10.0")) { + return; + } + const jsm = await nc.jetstreamManager(); + const name = nuid.next(); + await jsm.streams.add({ name, subjects: [`a.>`] }); + + let ci = await jsm.consumers.add(name, { + durable_name: "dur", + filter_subjects: ["a.x"], + ack_policy: AckPolicy.Explicit, + }); + assertArrayIncludes(ci.config.filter_subjects!, ["a.x"]); + + ci = await jsm.consumers.update(name, "dur", { + filter_subjects: ["a.x", "a.y"], + }); + assertArrayIncludes(ci.config.filter_subjects!, ["a.x", "a.y"]); + + await cleanup(ns, nc); +}); + +Deno.test("jsm - update from filter_subject to filter_subjects", async () => { + const { ns, nc } = await setup( + jetstreamServerConf({}, true), + ); + if (await notCompatible(ns, nc, "2.10.0")) { + return; + } + const jsm = await nc.jetstreamManager(); + const name = nuid.next(); + await jsm.streams.add({ name, subjects: [`a.>`] }); + + let ci = await jsm.consumers.add(name, { + durable_name: "dur", + filter_subject: "a.x", + ack_policy: AckPolicy.Explicit, + }); + assertEquals(ci.config.filter_subject, "a.x"); + + // fail if not removing filter_subject + await assertRejects( + async () => { + await jsm.consumers.update(name, "dur", { + filter_subjects: ["a.x", "a.y"], + }); + }, + Error, + "consumer cannot have both filtersubject and filtersubjects specified", + ); + // now switch it + ci = await jsm.consumers.update(name, "dur", { + filter_subject: "", + filter_subjects: ["a.x", "a.y"], + }); + assertEquals(ci.config.filter_subject, undefined); + assertArrayIncludes(ci.config.filter_subjects!, ["a.x", "a.y"]); + + // fail if not removing filter_subjects + await assertRejects( + async () => { + await jsm.consumers.update(name, "dur", { + filter_subject: "a.x", + }); + }, + Error, + "consumer cannot have both filtersubject and filtersubjects specified", + ); + + // and from filter_subjects back + ci = await jsm.consumers.update(name, "dur", { + filter_subject: "a.x", + filter_subjects: [], + }); + assertEquals(ci.config.filter_subject, "a.x"); + assertEquals(ci.config.filter_subjects!, undefined); + + await cleanup(ns, nc); +}); From f7d430bb9806bd8c8709a6b2f4b3dba94ae0574e Mon Sep 17 00:00:00 2001 From: aricart Date: Thu, 19 Jan 2023 17:20:21 -0400 Subject: [PATCH 2/3] [FEAT] consumer filter_subjects support - this feature is exclusive of the filter_subject but allows a consumer to filter multiple subjects in a stream. To set, specify filter_subjects instead of filter_subject or if using consumer options builder filterSubject() can be called multiple times. [CHANGE] filter_subject (and filter_subjects) can be updated, the server removed this restriction --- tests/jsm_test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/jsm_test.ts b/tests/jsm_test.ts index a42f34fe..da111e1b 100644 --- a/tests/jsm_test.ts +++ b/tests/jsm_test.ts @@ -1,5 +1,5 @@ /* - * Copyright 2021-2 The NATS Authors + * Copyright 2021-2023 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at From b870cc017ed305c1afa407127d715ac6701ab7be Mon Sep 17 00:00:00 2001 From: aricart Date: Thu, 19 Jan 2023 17:22:54 -0400 Subject: [PATCH 3/3] cleaned up test import --- tests/jetstream_test.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/jetstream_test.ts b/tests/jetstream_test.ts index 01beb1e7..c001a44d 100644 --- a/tests/jetstream_test.ts +++ b/tests/jetstream_test.ts @@ -52,7 +52,9 @@ import { StringCodec, } from "../nats-base-client/internal_mod.ts"; import { + assertArrayIncludes, assertEquals, + assertExists, assertIsError, assertRejects, assertThrows, @@ -75,10 +77,6 @@ import { isHeartbeatMsg, Js409Errors, } from "../nats-base-client/jsutil.ts"; -import { - assertArrayIncludes, - assertExists, -} from "https://deno.land/std@0.173.0/testing/asserts.ts"; function callbackConsume(debug = false): JsMsgCallback { return (err: NatsError | null, jm: JsMsg | null) => {