Skip to content

Commit

Permalink
added internal functionality to delete key(s) from a KV (#193)
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Sep 8, 2021
1 parent a4e0d49 commit 9fd6418
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
43 changes: 42 additions & 1 deletion nats-base-client/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ const validKeyRe = /^[-/=.\w]+$/;
const validSearchKey = /^[-/=.>*\w]+$/;
const validBucketRe = /^[-\w]+$/;

export interface RemoveKV {
remove(k: string): Promise<void>;
}

export interface RoKV {
get(k: string): Promise<Entry | null>;
history(opts?: { key?: string }): Promise<QueuedIterator<Entry>>;
Expand Down Expand Up @@ -210,7 +214,7 @@ export function validateBucket(name: string) {
}
}

export class Bucket implements KV {
export class Bucket implements KV, RemoveKV {
jsm: JetStreamManager;
js: JetStreamClient;
stream!: string;
Expand Down Expand Up @@ -424,6 +428,43 @@ export class Bucket implements KV {
return this.jsm.consumers.add(this.stream, opts);
}

async remove(k: string): Promise<void> {
const ci = await this.consumerOn(k, true);
if (ci.num_pending === 0) {
await this.jsm.consumers.delete(this.stream, ci.name);
return;
} else {
const ji = this.js as JetStreamClientImpl;
const nc = ji.nc;
const buf: Promise<boolean>[] = [];
const sub = nc.subscribe(ci.config.deliver_subject!, {
callback: (err, msg) => {
if (err === null) {
err = checkJsError(msg);
}
if (err) {
sub.unsubscribe();
return;
}
if (isFlowControlMsg(msg) || isHeartbeatMsg(msg)) {
msg.respond();
return;
}
const jm = toJsMsg(msg);
buf.push(this.jsm.streams.deleteMessage(this.stream, jm.seq));
if (jm.info.pending === 0) {
sub.unsubscribe();
}
jm.ack();
},
});
if (buf.length) {
await Promise.all(buf);
}
await sub.closed;
}
}

async history(opts: { key?: string } = {}): Promise<QueuedIterator<Entry>> {
const k = opts.key ?? ">";
const ci = await this.consumerOn(k, true);
Expand Down
25 changes: 25 additions & 0 deletions tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -560,3 +560,28 @@ Deno.test("kv - complex key", async () => {

await cleanup(ns, nc);
});

Deno.test("kv - remove key", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc)) {
return;
}
const sc = StringCodec();
const b = await Bucket.create(nc, nuid.next()) as Bucket;

await b.put("a.b", sc.encode("ab"));
let v = await b.get("a.b");
assert(v);
assertEquals(sc.decode(v.value), "ab");

await b.remove("a.b");
v = await b.get("a.b");
assertEquals(v, null);

const status = await b.status();
assertEquals(status.values, 0);

await cleanup(ns, nc);
});

0 comments on commit 9fd6418

Please sign in to comment.