Skip to content

Commit

Permalink
Merge pull request #469 from nats-io/multiple-consumer-filters2
Browse files Browse the repository at this point in the history
[FEAT] multiple consumer filters
  • Loading branch information
aricart committed Jan 19, 2023
2 parents 64da87c + b870cc0 commit e5bea9a
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 16 deletions.
21 changes: 16 additions & 5 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 The NATS Authors
* Copyright 2022-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -410,9 +410,6 @@ export class JetStreamClientImpl extends BaseApiClient
if (cso.ordered) {
throw new Error("pull subscribers cannot be be ordered");
}
if (!cso.attached) {
cso.config.filter_subject = subject;
}
if (cso.config.deliver_subject) {
throw new Error(
"consumer info specifies deliver_subject - pull consumers cannot have deliver_subject set",
Expand Down Expand Up @@ -587,7 +584,11 @@ export class JetStreamClientImpl extends BaseApiClient
}
}

if (!jsi.attached) {
if (
!jsi.attached && jsi.config.filter_subject === undefined &&
jsi.config.filter_subjects === undefined
) {
// if no filter specified, we set the subject as the filter
jsi.config.filter_subject = subject;
}

Expand Down Expand Up @@ -642,6 +643,16 @@ export class JetStreamClientImpl extends BaseApiClient
}, jsi.config);

const ci = await this.api.add(jsi.stream, jsi.config);
if (
Array.isArray(
jsi.config.filter_subjects && !Array.isArray(ci.config.filter_subjects),
)
) {
// server didn't honor `filter_subjects`
throw new Error(
`jetstream server doesn't support consumers with multiple filter subjects`,
);
}
jsi.name = ci.name;
jsi.config = ci.config;
jsi.last = ci;
Expand Down
31 changes: 28 additions & 3 deletions nats-base-client/jsconsumeropts.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 The NATS Authors
* Copyright 2021-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -46,6 +46,7 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder {
max?: number;
qname?: string;
isBind?: boolean;
filters?: string[];

constructor(opts?: Partial<ConsumerConfig>) {
this.stream = "";
Expand All @@ -56,7 +57,18 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder {

getOpts(): ConsumerOpts {
const o = {} as ConsumerOpts;
o.config = this.config;
o.config = Object.assign({}, this.config);
if (o.config.filter_subject) {
this.filterSubject(o.config.filter_subject);
o.config.filter_subject = undefined;
}
if (o.config.filter_subjects) {
o.config.filter_subjects?.forEach((v) => {
this.filterSubject(v);
});
o.config.filter_subjects = undefined;
}

o.mack = this.mack;
o.stream = this.stream;
o.callbackFn = this.callbackFn;
Expand All @@ -65,6 +77,18 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder {
o.ordered = this.ordered;
o.config.ack_policy = o.ordered ? AckPolicy.None : o.config.ack_policy;
o.isBind = o.isBind || false;

if (this.filters) {
switch (this.filters.length) {
case 0:
break;
case 1:
o.config.filter_subject = this.filters[0];
break;
default:
o.config.filter_subjects = this.filters;
}
}
return o;
}

Expand Down Expand Up @@ -155,7 +179,8 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder {
}

filterSubject(s: string) {
this.config.filter_subject = s;
this.filters = this.filters || [];
this.filters.push(s);
return this;
}

Expand Down
14 changes: 12 additions & 2 deletions nats-base-client/jsmconsumer_api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 The NATS Authors
* Copyright 2021-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -65,7 +65,7 @@ export class ConsumerAPIImpl extends BaseApiClient implements ConsumerAPI {
}

const nci = this.nc as NatsConnectionImpl;
const { min, ok: newAPI } = nci.features.get(
let { min, ok: newAPI } = nci.features.get(
Feature.JS_NEW_CONSUMER_CREATE_API,
);

Expand All @@ -82,6 +82,16 @@ export class ConsumerAPIImpl extends BaseApiClient implements ConsumerAPI {

let subj;
let consumerName = "";
// new api doesn't support multiple filter subjects
// this delayed until here because the consumer in an update could have
// been created with the new API, and have a `name`
if (Array.isArray(cfg.filter_subjects)) {
const { min, ok } = nci.features.get(Feature.JS_MULTIPLE_CONSUMER_FILTER);
if (!ok) {
throw new Error(`consumer 'filter_subjects' requires server ${min}`);
}
newAPI = false;
}
if (newAPI) {
consumerName = cfg.name ?? cfg.durable_name ?? "";
}
Expand Down
17 changes: 17 additions & 0 deletions nats-base-client/semver.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2022-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

export type SemVer = { major: number; minor: number; micro: number };
export function parseSemVer(
s = "",
Expand Down Expand Up @@ -28,6 +43,7 @@ export enum Feature {
JS_PULL_MAX_BYTES = "js_pull_max_bytes",
JS_NEW_CONSUMER_CREATE_API = "js_new_consumer_create",
JS_ALLOW_DIRECT = "js_allow_direct",
JS_MULTIPLE_CONSUMER_FILTER = "js_multiple_consumer_filter",
}

type FeatureVersion = {
Expand Down Expand Up @@ -76,6 +92,7 @@ export class Features {
this.set(Feature.JS_PULL_MAX_BYTES, "2.8.3");
this.set(Feature.JS_NEW_CONSUMER_CREATE_API, "2.9.0");
this.set(Feature.JS_ALLOW_DIRECT, "2.9.0");
this.set(Feature.JS_MULTIPLE_CONSUMER_FILTER, "2.10.0");

this.disabled.forEach((f) => {
this.features.delete(f);
Expand Down
15 changes: 11 additions & 4 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,7 @@ export interface ConsumerOptsBuilder {
maxDeliver(max: number): this;
/**
* Consumer should filter the messages to those that match the specified filter.
* This api can be called multiple times.
* @param s
*/
filterSubject(s: string): this;
Expand Down Expand Up @@ -2420,10 +2421,6 @@ export interface ConsumerConfig extends ConsumerUpdateConfig {
* The consumer name
*/
name?: string;
/**
* Deliver only messages that match the subject filter
*/
"filter_subject"?: string;
/**
* For push consumers this will regularly send an empty mess with Status header 100
* and a reply subject, consumers must reply to these messages to control
Expand Down Expand Up @@ -2521,6 +2518,16 @@ export interface ConsumerUpdateConfig {
* Force the consumer state to be kept in memory rather than inherit the setting from the stream
*/
"mem_storage"?: boolean;
/**
* Deliver only messages that match the subject filter
* This is exclusive of {@link filter_subjects}
*/
"filter_subject"?: string;
/**
* Deliver only messages that match the specified filters.
* This is exclusive of {@link filter_subject}.
*/
"filter_subjects"?: string[];
}

export interface Consumer {
Expand Down
123 changes: 122 additions & 1 deletion tests/jetstream_test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 The NATS Authors
* Copyright 2021-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -52,7 +52,9 @@ import {
StringCodec,
} from "../nats-base-client/internal_mod.ts";
import {
assertArrayIncludes,
assertEquals,
assertExists,
assertIsError,
assertRejects,
assertThrows,
Expand Down Expand Up @@ -4230,3 +4232,122 @@ Deno.test("jetstream - push heartbeat callback", async () => {
await reconnected;
await cleanup(ns, nc);
});

Deno.test("jetstream - consumer opt multi subject filter", () => {
let opts = new ConsumerOptsBuilderImpl();
opts.filterSubject("foo");
let co = opts.getOpts();
assertEquals(co.config.filter_subject, "foo");

opts.filterSubject("bar");
co = opts.getOpts();
assertEquals(co.config.filter_subject, undefined);
assertExists(co.config.filter_subjects);
assertArrayIncludes(co.config.filter_subjects, ["foo", "bar"]);
});

Deno.test("jetstream - push multi-subject filter", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.10.0")) {
return;
}
const name = nuid.next();
const jsm = await nc.jetstreamManager();
const js = nc.jetstream();
await jsm.streams.add({ name, subjects: [`a.>`] });

const opts = consumerOpts()
.durable("me")
.ackExplicit()
.filterSubject("a.b")
.filterSubject("a.c")
.deliverTo(createInbox())
.callback((_err, msg) => {
msg?.ack();
});

const sub = await js.subscribe("a.>", opts);
const ci = await sub.consumerInfo();
assertExists(ci.config.filter_subjects);
assertArrayIncludes(ci.config.filter_subjects, ["a.b", "a.c"]);

await cleanup(ns, nc);
});

Deno.test("jetstream - pull multi-subject filter", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.10.0")) {
return;
}
const name = nuid.next();
const jsm = await nc.jetstreamManager();
const js = nc.jetstream();
await jsm.streams.add({ name, subjects: [`a.>`] });

const opts = consumerOpts()
.durable("me")
.ackExplicit()
.filterSubject("a.b")
.filterSubject("a.c")
.callback((_err, msg) => {
msg?.ack();
});

const sub = await js.pullSubscribe("a.>", opts);
const ci = await sub.consumerInfo();
assertExists(ci.config.filter_subjects);
assertArrayIncludes(ci.config.filter_subjects, ["a.b", "a.c"]);

await cleanup(ns, nc);
});

Deno.test("jetstream - push single filter", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.10.0")) {
return;
}
const name = nuid.next();
const jsm = await nc.jetstreamManager();
const js = nc.jetstream();
await jsm.streams.add({ name, subjects: [`a.>`] });

const opts = consumerOpts()
.durable("me")
.ackExplicit()
.filterSubject("a.b")
.deliverTo(createInbox())
.callback((_err, msg) => {
msg?.ack();
});

const sub = await js.subscribe("a.>", opts);
const ci = await sub.consumerInfo();
assertEquals(ci.config.filter_subject, "a.b");

await cleanup(ns, nc);
});

Deno.test("jetstream - pull single filter", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.10.0")) {
return;
}
const name = nuid.next();
const jsm = await nc.jetstreamManager();
const js = nc.jetstream();
await jsm.streams.add({ name, subjects: [`a.>`] });

const opts = consumerOpts()
.durable("me")
.ackExplicit()
.filterSubject("a.b")
.callback((_err, msg) => {
msg?.ack();
});

const sub = await js.pullSubscribe("a.>", opts);
const ci = await sub.consumerInfo();
assertEquals(ci.config.filter_subject, "a.b");

await cleanup(ns, nc);
});

0 comments on commit e5bea9a

Please sign in to comment.