Skip to content

Commit

Permalink
Merge pull request #662 from nats-io/kv-filtered-watcher
Browse files Browse the repository at this point in the history
allow internal kv watch consumers to use filtered consumer jetstream api
  • Loading branch information
aricart committed Mar 13, 2024
2 parents d045fd7 + e24dbde commit d0ba08f
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 2 deletions.
1 change: 0 additions & 1 deletion jetstream/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ export enum PubHeaders {

class ViewsImpl implements Views {
js: JetStreamClientImpl;
jsm?: JetStreamManager;
constructor(js: JetStreamClientImpl) {
this.js = js;
}
Expand Down
16 changes: 15 additions & 1 deletion jetstream/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ import {
} from "./jsapi_types.ts";
import { JsMsg } from "./jsmsg.ts";
import { NatsConnectionImpl } from "../nats-base-client/nats.ts";
import { PubHeaders } from "./jsclient.ts";
import { JetStreamClientImpl, PubHeaders } from "./jsclient.ts";
import { nuid } from "../nats-base-client/nuid.ts";

export function Base64KeyCodec(): KvCodec<string> {
return {
Expand Down Expand Up @@ -743,6 +744,15 @@ export class Bucket implements KV, KvRemove {
return qi;
}

canSetWatcherName(): boolean {
const jsi = this.js as JetStreamClientImpl;
const nci = jsi.nc as NatsConnectionImpl;
const { ok } = nci.features.get(
Feature.JS_NEW_CONSUMER_CREATE_API,
);
return ok;
}

async watch(
opts: KvWatchOptions = {},
): Promise<QueuedIterator<KvEntry>> {
Expand All @@ -765,6 +775,10 @@ export class Bucket implements KV, KvRemove {
const cc = this._buildCC(k, content, co);
const subj = cc.filter_subject!;
const copts = consumerOpts(cc);

if (this.canSetWatcherName()) {
copts.consumerName(nuid.next());
}
copts.bindStream(this.stream);
if (opts.resumeFromRevision && opts.resumeFromRevision > 0) {
copts.startSequence(opts.resumeFromRevision);
Expand Down
24 changes: 24 additions & 0 deletions jetstream/tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
parseSemVer,
QueuedIterator,
StringCodec,
syncIterator,
} from "../../nats-base-client/internal_mod.ts";

import {
Expand Down Expand Up @@ -2036,3 +2037,26 @@ Deno.test("kv - bind no info", async () => {

await cleanup(ns, nc);
});

Deno.test("kv - watcher will name and filter", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}

const js = nc.jetstream();
const kv = await js.views.kv("A");

const sub = syncIterator(nc.subscribe("$JS.API.>"));
const iter = await kv.watch({ key: "a.>" });

const m = await sub.next();
assert(m?.subject.startsWith("$JS.API.CONSUMER.CREATE.KV_A."));
assert(m?.subject.endsWith("$KV.A.a.>"));

iter.stop();

await cleanup(ns, nc);
});
11 changes: 11 additions & 0 deletions jetstream/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,12 @@ export interface ConsumerOptsBuilder {
* When set do not inherit the replica count from the stream but specifically set it to this amount
*/
numReplicas(n: number): this;

/**
* The name of the consumer
* @param n
*/
consumerName(n: string): this;
}

/**
Expand Down Expand Up @@ -1895,6 +1901,11 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder {
this.config.num_replicas = n;
return this;
}

consumerName(n: string) {
this.config.name = n;
return this;
}
}

export function consumerOpts(
Expand Down

0 comments on commit d0ba08f

Please sign in to comment.