Skip to content

Commit

Permalink
[FEAT] [JETSTREAM] [KV] support multiple key filters on watchers/hist…
Browse files Browse the repository at this point in the history
…ory. Note this functionality requires server 2.10.0 or better
  • Loading branch information
aricart committed Apr 6, 2024
1 parent 1c061a9 commit 29b791b
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 9 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ lint:
deno lint --ignore=docs/

test: clean
deno test --allow-all --unstable --parallel --reload --quiet --coverage=coverage tests/ jetstream/tests
deno test --allow-all --parallel --reload --quiet --coverage=coverage tests/ jetstream/tests


testw: clean
deno test --allow-all --unstable --reload --parallel --watch --fail-fast tests/ jetstream/
deno test --allow-all --reload --parallel --watch --fail-fast tests/ jetstream/

cover:
deno coverage --unstable ./coverage --lcov > ./coverage/out.lcov
Expand Down
22 changes: 17 additions & 5 deletions jetstream/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -639,12 +639,16 @@ export class Bucket implements KV, KvRemove {
}

_buildCC(
k: string,
k: string | string[],
content: KvWatchInclude,
opts: Partial<ConsumerConfig> = {},
): Partial<ConsumerConfig> {
const ek = this.encodeKey(k);
this.validateSearchKey(k);
const a = !Array.isArray(k) ? [k] : k;
let filter_subjects: string[] | undefined = a.map((k) => {
const ek = this.encodeKey(k);
this.validateSearchKey(k);
return this.fullKeyName(ek);
});

let deliver_policy = DeliverPolicy.LastPerSubject;
if (content === KvWatchInclude.AllHistory) {
Expand All @@ -654,10 +658,17 @@ export class Bucket implements KV, KvRemove {
deliver_policy = DeliverPolicy.New;
}

let filter_subject: undefined | string = undefined;
if (filter_subjects.length === 1) {
filter_subject = filter_subjects[0];
filter_subjects = undefined;
}

return Object.assign({
deliver_policy,
"ack_policy": AckPolicy.None,
"filter_subject": this.fullKeyName(ek),
filter_subjects,
filter_subject,
"flow_control": true,
"idle_heartbeat": nanos(5 * 1000),
}, opts) as Partial<ConsumerConfig>;
Expand All @@ -668,7 +679,7 @@ export class Bucket implements KV, KvRemove {
}

async history(
opts: { key?: string; headers_only?: boolean } = {},
opts: { key?: string | string[]; headers_only?: boolean } = {},
): Promise<QueuedIterator<KvEntry>> {
const k = opts.key ?? ">";
const qi = new QueuedIteratorImpl<KvEntry>();
Expand Down Expand Up @@ -773,6 +784,7 @@ export class Bucket implements KV, KvRemove {
let count = 0;

const cc = this._buildCC(k, content, co);
console.log(cc);
const subj = cc.filter_subject!;
const copts = consumerOpts(cc);

Expand Down
57 changes: 57 additions & 0 deletions jetstream/tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,31 @@ Deno.test("kv - history", async () => {
await cleanup(ns, nc);
});

Deno.test("kv - history multiple keys", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}),
);
const n = nuid.next();
const js = nc.jetstream();
const bucket = await js.views.kv(n, { history: 2 });

await bucket.put("A", Empty);
await bucket.put("B", Empty);
await bucket.put("C", Empty);
await bucket.put("D", Empty);

const iter = await bucket.history({ key: ["A", "D"] });
const buf = [];
for await (const e of iter) {
buf.push(e.key);
}

assertEquals(buf.length, 2);
assertArrayIncludes(buf, ["A", "D"]);

await cleanup(ns, nc);
});

Deno.test("kv - cleanups/empty", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}),
Expand Down Expand Up @@ -1808,6 +1833,38 @@ Deno.test("kv - watch updates only", async () => {
await cleanup(ns, nc);
});

Deno.test("kv - watch multiple keys", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}));

const js = nc.jetstream();
const kv = await js.views.kv("K");

await kv.put("a", "a");
await kv.put("b", "b");
await kv.put("c", "c");

const d = deferred();
const iter = await kv.watch({
key: ["a", "c"],
initializedFn: () => {
d.resolve();
},
});

const notifications: string[] = [];
(async () => {
for await (const e of iter) {
notifications.push(e.key);
}
})().then();
await d;

assertEquals(notifications.length, 2);
assertArrayIncludes(notifications, ["a", "c"]);

await cleanup(ns, nc);
});

Deno.test("kv - watch history", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}));

Expand Down
6 changes: 4 additions & 2 deletions jetstream/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1132,8 +1132,9 @@ export enum KvWatchInclude {
export type KvWatchOptions = {
/**
* A key or wildcarded key following keys as if they were NATS subject names.
* Note you can specify multiple keys if running on server 2.10.x or better.
*/
key?: string;
key?: string | string[];
/**
* Notification should only include entry headers
*/
Expand Down Expand Up @@ -1173,9 +1174,10 @@ export interface RoKV {

/**
* Returns an iterator of the specified key's history (or all keys).
* Note you can specify multiple keys if running on server 2.10.x or better.
* @param opts
*/
history(opts?: { key?: string }): Promise<QueuedIterator<KvEntry>>;
history(opts?: { key?: string | string[] }): Promise<QueuedIterator<KvEntry>>;

/**
* Returns an iterator that will yield KvEntry updates as they happen.
Expand Down

0 comments on commit 29b791b

Please sign in to comment.