Skip to content

Commit

Permalink
[FEAT] added some kv extensions to delete subkeys as this is super us…
Browse files Browse the repository at this point in the history
…eful to some applications

[FIX] changed kv#keys() to omit keys that have been deleted
  • Loading branch information
aricart committed Sep 8, 2021
1 parent 9fd6418 commit 5703ba8
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 12 deletions.
38 changes: 32 additions & 6 deletions nats-base-client/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ export interface RoKV {
watch(opts?: { key?: string }): Promise<QueuedIterator<Entry>>;
close(): Promise<void>;
status(): Promise<KvStatus>;
keys(): Promise<string[]>;
keys(k?: string): Promise<string[]>;
}

export interface KV extends RoKV {
Expand Down Expand Up @@ -398,7 +398,7 @@ export class Bucket implements KV, RemoveKV {
}
}

async delete(k: string): Promise<void> {
async _delete(k: string): Promise<void> {
const ek = this.encodeKey(k);
this.validateKey(ek);
const ji = this.js as JetStreamClientImpl;
Expand All @@ -409,6 +409,30 @@ export class Bucket implements KV, RemoveKV {
await this.js.publish(this.subjectForKey(ek), Empty, { headers: h });
}

async delete(k: string): Promise<void> {
if (!this.hasWildcards(k)) {
return this._delete(k);
}
const keys = await this.keys(k);
if (keys.length === 0) {
return;
}
const d = deferred<void>();
const buf: Promise<void>[] = [];
for (const k of keys) {
buf.push(this._delete(k));
}
Promise.all(buf)
.then(() => {
d.resolve();
})
.catch((err) => {
d.reject(err);
});

return d;
}

consumerOn(k: string, history = false): Promise<ConsumerInfo> {
const ek = this.encodeKey(k);
this.validateSearchKey(k);
Expand Down Expand Up @@ -556,10 +580,10 @@ export class Bucket implements KV, RemoveKV {
return qi;
}

async keys(): Promise<string[]> {
async keys(k = ">"): Promise<string[]> {
const d = deferred<string[]>();
const keys: string[] = [];
const ci = await this.consumerOn(">", false);
const ci = await this.consumerOn(k, false);
if (ci.num_pending === 0) {
return Promise.resolve(keys);
}
Expand All @@ -577,8 +601,10 @@ export class Bucket implements KV, RemoveKV {
m.respond();
} else {
const jm = toJsMsg(m);
const key = this.decodeKey(jm.subject.substring(this.prefixLen));
keys.push(key);
if (jm.headers?.get(kvOperationHdr) !== "DEL") {
const key = this.decodeKey(jm.subject.substring(this.prefixLen));
keys.push(key);
}
m.respond();
const info = parseInfo(m.reply!);
if (info.pending === 0) {
Expand Down
6 changes: 2 additions & 4 deletions tests/ekv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ async function keys(bucket: Bucket): Promise<void> {
await b.put("x", "");

const keys = await b.keys();
assertEquals(keys.length, 4);
assertArrayIncludes(keys, ["a", "b", "c.c.c", "x"]);
assertEquals(keys.length, 3);
assertArrayIncludes(keys, ["a", "b", "x"]);
}

Deno.test("ekv - keys", async () => {
Expand Down Expand Up @@ -319,8 +319,6 @@ Deno.test("ekv - complex key", async () => {
jetstreamServerConf({}, true),
);

console.log("server running on port", nc.info!.port);

if (await notCompatible(ns, nc)) {
return;
}
Expand Down
72 changes: 72 additions & 0 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
ConsumerOpts,
consumerOpts,
createInbox,
deferred,
delay,
DeliverPolicy,
Empty,
Expand Down Expand Up @@ -2009,3 +2010,74 @@ Deno.test("jetstream - reuse consumer", async () => {

await cleanup(ns, nc);
});

Deno.test("jetstream - pull sub - multiple consumers", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { stream, subj } = await initStream(nc);
const jsm = await nc.jetstreamManager();
const js = nc.jetstream();
const buf: Promise<PubAck>[] = [];
for (let i = 0; i < 100; i++) {
buf.push(js.publish(subj, Empty));
}
await Promise.all(buf);

let ci = await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
});
assertEquals(ci.num_pending, 100);

let countA = 0;
let countB = 0;
const m = new Map<number, number>();

const opts = consumerOpts();
opts.durable("me");
opts.ackExplicit();
opts.deliverAll();
const subA = await js.pullSubscribe(subj, opts);
(async () => {
for await (const msg of subA) {
const v = m.get(msg.seq) ?? 0;
m.set(msg.seq, v + 1);
countA++;
msg.ack();
}
})().then();

const subB = await js.pullSubscribe(subj, opts);
(async () => {
for await (const msg of subB) {
const v = m.get(msg.seq) ?? 0;
m.set(msg.seq, v + 1);
countB++;
msg.ack();
}
})().then();

const done = deferred<void>();
const interval = setInterval(() => {
if (countA + countB < 100) {
subA.pull({ expires: 500, batch: 25 });
subB.pull({ expires: 500, batch: 25 });
} else {
clearInterval(interval);
done.resolve();
}
}, 25);

await done;

ci = await jsm.consumers.info(stream, "me");
assertEquals(ci.num_pending, 0);
assert(countA > 0);
assert(countB > 0);
assertEquals(countA + countB, 100);

for (let i = 1; i <= 100; i++) {
assertEquals(m.get(i), 1);
}

await cleanup(ns, nc);
});
28 changes: 26 additions & 2 deletions tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,8 @@ async function keys(b: Bucket): Promise<void> {
await b.put("x", Empty);

const keys = await b.keys();
assertEquals(keys.length, 4);
assertArrayIncludes(keys, ["a", "b", "c.c.c", "x"]);
assertEquals(keys.length, 3);
assertArrayIncludes(keys, ["a", "b", "x"]);
}

Deno.test("kv - keys", async () => {
Expand Down Expand Up @@ -585,3 +585,27 @@ Deno.test("kv - remove key", async () => {

await cleanup(ns, nc);
});

Deno.test("kv - remove subkey", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc)) {
return;
}
const b = await Bucket.create(nc, nuid.next()) as Bucket;
await b.put("a", Empty);
await b.put("a.b", Empty);
await b.put("a.c", Empty);

let keys = await b.keys();
assertEquals(keys.length, 3);
assertArrayIncludes(keys, ["a", "a.b", "a.c"]);

await b.delete("a.*");
keys = await b.keys();
assertEquals(keys.length, 1);
assertArrayIncludes(keys, ["a"]);

await cleanup(ns, nc);
});

0 comments on commit 5703ba8

Please sign in to comment.